mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 06:23:00 +00:00
Small refactor: code cleanup, removed dead code and legacy compatibility shims.
This commit is contained in:
parent
888df0385e
commit
eddf0553b7
@ -14,10 +14,6 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// For transition, alias main.GatewayConfig to pkg/gateway.Config
|
||||
// server.go will be removed; this keeps compatibility until then.
|
||||
type GatewayConfig = gateway.Config
|
||||
|
||||
func getEnvDefault(key, def string) string {
|
||||
if v := os.Getenv(key); strings.TrimSpace(v) != "" {
|
||||
return v
|
||||
|
||||
@ -53,13 +53,17 @@ func HandleStop() {
|
||||
// Reset failed state for any services that might be in failed state
|
||||
resetArgs := []string{"reset-failed"}
|
||||
resetArgs = append(resetArgs, services...)
|
||||
exec.Command("systemctl", resetArgs...).Run()
|
||||
if err := exec.Command("systemctl", resetArgs...).Run(); err != nil {
|
||||
fmt.Printf(" ⚠️ Warning: Failed to reset-failed state: %v\n", err)
|
||||
}
|
||||
|
||||
// Wait again after reset-failed
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
// Stop again to ensure they're stopped
|
||||
exec.Command("systemctl", stopArgs...).Run()
|
||||
if err := exec.Command("systemctl", stopArgs...).Run(); err != nil {
|
||||
fmt.Printf(" ⚠️ Warning: Second stop attempt had errors: %v\n", err)
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
hadError := false
|
||||
|
||||
@ -60,10 +60,6 @@ func ParseFlags(args []string) (*Flags, error) {
|
||||
fs.IntVar(&flags.AnyoneBandwidth, "anyone-bandwidth", 30, "Limit relay to N% of VPS bandwidth (0=unlimited, runs speedtest)")
|
||||
fs.IntVar(&flags.AnyoneAccounting, "anyone-accounting", 0, "Monthly data cap for relay in GB (0=unlimited)")
|
||||
|
||||
// Support legacy flags for backwards compatibility
|
||||
nightly := fs.Bool("nightly", false, "Use nightly branch (deprecated, use --branch nightly)")
|
||||
main := fs.Bool("main", false, "Use main branch (deprecated, use --branch main)")
|
||||
|
||||
if err := fs.Parse(args); err != nil {
|
||||
if err == flag.ErrHelp {
|
||||
return nil, err
|
||||
@ -71,14 +67,6 @@ func ParseFlags(args []string) (*Flags, error) {
|
||||
return nil, fmt.Errorf("failed to parse flags: %w", err)
|
||||
}
|
||||
|
||||
// Handle legacy flags
|
||||
if *nightly {
|
||||
flags.Branch = "nightly"
|
||||
}
|
||||
if *main {
|
||||
flags.Branch = "main"
|
||||
}
|
||||
|
||||
// Set nameserver if explicitly provided
|
||||
if *nameserver {
|
||||
flags.Nameserver = nameserver
|
||||
|
||||
@ -10,6 +10,8 @@ import (
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/constants"
|
||||
)
|
||||
|
||||
var ErrServiceNotFound = errors.New("service not found")
|
||||
@ -22,15 +24,15 @@ type PortSpec struct {
|
||||
|
||||
var ServicePorts = map[string][]PortSpec{
|
||||
"debros-gateway": {
|
||||
{Name: "Gateway API", Port: 6001},
|
||||
{Name: "Gateway API", Port: constants.GatewayAPIPort},
|
||||
},
|
||||
"debros-olric": {
|
||||
{Name: "Olric HTTP", Port: 3320},
|
||||
{Name: "Olric Memberlist", Port: 3322},
|
||||
{Name: "Olric HTTP", Port: constants.OlricHTTPPort},
|
||||
{Name: "Olric Memberlist", Port: constants.OlricMemberlistPort},
|
||||
},
|
||||
"debros-node": {
|
||||
{Name: "RQLite HTTP", Port: 5001},
|
||||
{Name: "RQLite Raft", Port: 7001},
|
||||
{Name: "RQLite HTTP", Port: constants.RQLiteHTTPPort},
|
||||
{Name: "RQLite Raft", Port: constants.RQLiteRaftPort},
|
||||
},
|
||||
"debros-ipfs": {
|
||||
{Name: "IPFS API", Port: 4501},
|
||||
@ -48,12 +50,12 @@ func DefaultPorts() []PortSpec {
|
||||
{Name: "IPFS Swarm", Port: 4001},
|
||||
{Name: "IPFS API", Port: 4501},
|
||||
{Name: "IPFS Gateway", Port: 8080},
|
||||
{Name: "Gateway API", Port: 6001},
|
||||
{Name: "RQLite HTTP", Port: 5001},
|
||||
{Name: "RQLite Raft", Port: 7001},
|
||||
{Name: "Gateway API", Port: constants.GatewayAPIPort},
|
||||
{Name: "RQLite HTTP", Port: constants.RQLiteHTTPPort},
|
||||
{Name: "RQLite Raft", Port: constants.RQLiteRaftPort},
|
||||
{Name: "IPFS Cluster API", Port: 9094},
|
||||
{Name: "Olric HTTP", Port: 3320},
|
||||
{Name: "Olric Memberlist", Port: 3322},
|
||||
{Name: "Olric HTTP", Port: constants.OlricHTTPPort},
|
||||
{Name: "Olric Memberlist", Port: constants.OlricMemberlistPort},
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
9
pkg/constants/capacity.go
Normal file
9
pkg/constants/capacity.go
Normal file
@ -0,0 +1,9 @@
|
||||
package constants
|
||||
|
||||
// Node capacity limits shared by the deployment scheduler and namespace
// scheduling so both agree on per-node resource budgets.
const (
	// MaxDeploymentsPerNode caps how many deployments may be scheduled on one node.
	MaxDeploymentsPerNode = 100
	// MaxMemoryMB is the assumed memory budget per node (8 GB).
	MaxMemoryMB = 8192
	// MaxCPUPercent is the CPU budget per node; 400% corresponds to 4 cores.
	MaxCPUPercent = 400
	// MaxPortsPerNode is the size of the user-deployment port range (~10k ports).
	MaxPortsPerNode = 9900
)
|
||||
11
pkg/constants/ports.go
Normal file
11
pkg/constants/ports.go
Normal file
@ -0,0 +1,11 @@
|
||||
package constants
|
||||
|
||||
// Well-known service ports used across the network, centralized here so
// every package references a single source of truth.
const (
	// WireGuardPort is the UDP listen port for the wg0 tunnel.
	WireGuardPort = 51820
	// RQLiteHTTPPort is RQLite's HTTP API port.
	RQLiteHTTPPort = 5001
	// RQLiteRaftPort is RQLite's Raft consensus port.
	RQLiteRaftPort = 7001
	// OlricHTTPPort is Olric's HTTP API port.
	OlricHTTPPort = 3320
	// OlricMemberlistPort is Olric's memberlist gossip port.
	OlricMemberlistPort = 3322
	// GatewayAPIPort is the public gateway HTTP API port.
	GatewayAPIPort = 6001
)
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
"github.com/DeBrosOfficial/network/pkg/constants"
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -270,7 +271,7 @@ func (hnm *HomeNodeManager) getNodeCapacity(ctx context.Context, nodeID string)
|
||||
AllocatedPorts: allocatedPorts,
|
||||
AvailablePorts: availablePorts,
|
||||
UsedMemoryMB: totalMemoryMB,
|
||||
AvailableMemoryMB: 8192 - totalMemoryMB, // Assume 8GB per node (make configurable later)
|
||||
AvailableMemoryMB: constants.MaxMemoryMB - totalMemoryMB,
|
||||
UsedCPUPercent: totalCPUPercent,
|
||||
Score: score,
|
||||
}
|
||||
@ -331,12 +332,10 @@ func (hnm *HomeNodeManager) getNodeResourceUsage(ctx context.Context, nodeID str
|
||||
|
||||
// calculateCapacityScore calculates a 0.0-1.0 score (higher is better)
|
||||
func (hnm *HomeNodeManager) calculateCapacityScore(deploymentCount, allocatedPorts, availablePorts, usedMemoryMB, usedCPUPercent int) float64 {
|
||||
const (
|
||||
maxDeployments = 100 // Max deployments per node
|
||||
maxMemoryMB = 8192 // 8GB
|
||||
maxCPUPercent = 400 // 400% = 4 cores
|
||||
maxPorts = 9900 // ~10k ports available
|
||||
)
|
||||
maxDeployments := constants.MaxDeploymentsPerNode
|
||||
maxMemoryMB := constants.MaxMemoryMB
|
||||
maxCPUPercent := constants.MaxCPUPercent
|
||||
maxPorts := constants.MaxPortsPerNode
|
||||
|
||||
// Calculate individual component scores (0.0 to 1.0)
|
||||
deploymentScore := 1.0 - (float64(deploymentCount) / float64(maxDeployments))
|
||||
|
||||
@ -3,6 +3,7 @@ package deployments
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
@ -216,21 +217,6 @@ func isConflictError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
// RQLite returns constraint violation errors as strings containing "UNIQUE constraint failed"
|
||||
errStr := err.Error()
|
||||
return contains(errStr, "UNIQUE") || contains(errStr, "constraint") || contains(errStr, "conflict")
|
||||
}
|
||||
|
||||
// contains checks if a string contains a substring (case-insensitive)
|
||||
func contains(s, substr string) bool {
|
||||
return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && findSubstring(s, substr))
|
||||
}
|
||||
|
||||
func findSubstring(s, substr string) bool {
|
||||
for i := 0; i <= len(s)-len(substr); i++ {
|
||||
if s[i:i+len(substr)] == substr {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
return strings.Contains(errStr, "UNIQUE") || strings.Contains(errStr, "constraint") || strings.Contains(errStr, "conflict")
|
||||
}
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
@ -410,7 +411,7 @@ func TestContains(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
result := contains(tt.s, tt.substr)
|
||||
result := strings.Contains(tt.s, tt.substr)
|
||||
if result != tt.expected {
|
||||
t.Errorf("contains(%q, %q) = %v, expected %v", tt.s, tt.substr, result, tt.expected)
|
||||
}
|
||||
|
||||
@ -429,7 +429,9 @@ func (sg *SecretGenerator) SaveConfig(filename string, content string) error {
|
||||
}
|
||||
|
||||
// Fix ownership
|
||||
exec.Command("chown", "debros:debros", configPath).Run()
|
||||
if err := exec.Command("chown", "debros:debros", configPath).Run(); err != nil {
|
||||
fmt.Printf("Warning: failed to chown %s to debros:debros: %v\n", configPath, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -3,7 +3,6 @@ package production
|
||||
import (
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
|
||||
"gopkg.in/yaml.v3"
|
||||
)
|
||||
@ -15,10 +14,7 @@ type NodePreferences struct {
|
||||
AnyoneClient bool `yaml:"anyone_client"`
|
||||
}
|
||||
|
||||
const (
|
||||
preferencesFile = "preferences.yaml"
|
||||
legacyBranchFile = ".branch"
|
||||
)
|
||||
const preferencesFile = "preferences.yaml"
|
||||
|
||||
// SavePreferences saves node preferences to disk
|
||||
func SavePreferences(oramaDir string, prefs *NodePreferences) error {
|
||||
@ -38,10 +34,6 @@ func SavePreferences(oramaDir string, prefs *NodePreferences) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// Also save branch to legacy .branch file for backward compatibility
|
||||
legacyPath := filepath.Join(oramaDir, legacyBranchFile)
|
||||
os.WriteFile(legacyPath, []byte(prefs.Branch), 0644)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -53,7 +45,7 @@ func LoadPreferences(oramaDir string) *NodePreferences {
|
||||
Nameserver: false,
|
||||
}
|
||||
|
||||
// Try to load from preferences.yaml first
|
||||
// Try to load from preferences.yaml
|
||||
path := filepath.Join(oramaDir, preferencesFile)
|
||||
if data, err := os.ReadFile(path); err == nil {
|
||||
if err := yaml.Unmarshal(data, prefs); err == nil {
|
||||
@ -61,15 +53,6 @@ func LoadPreferences(oramaDir string) *NodePreferences {
|
||||
}
|
||||
}
|
||||
|
||||
// Fall back to legacy .branch file
|
||||
legacyPath := filepath.Join(oramaDir, legacyBranchFile)
|
||||
if data, err := os.ReadFile(legacyPath); err == nil {
|
||||
branch := strings.TrimSpace(string(data))
|
||||
if branch != "" {
|
||||
prefs.Branch = branch
|
||||
}
|
||||
}
|
||||
|
||||
return prefs
|
||||
}
|
||||
|
||||
|
||||
@ -5,6 +5,7 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strings"
|
||||
@ -234,31 +235,15 @@ func isPrivateOrLocalHost(host string) bool {
|
||||
}
|
||||
|
||||
// Check for localhost variants
|
||||
if host == "localhost" || host == "::1" {
|
||||
return true
|
||||
}
|
||||
|
||||
// Check common private ranges (basic check)
|
||||
if strings.HasPrefix(host, "10.") ||
|
||||
strings.HasPrefix(host, "192.168.") ||
|
||||
strings.HasPrefix(host, "172.16.") ||
|
||||
strings.HasPrefix(host, "172.17.") ||
|
||||
strings.HasPrefix(host, "172.18.") ||
|
||||
strings.HasPrefix(host, "172.19.") ||
|
||||
strings.HasPrefix(host, "172.20.") ||
|
||||
strings.HasPrefix(host, "172.21.") ||
|
||||
strings.HasPrefix(host, "172.22.") ||
|
||||
strings.HasPrefix(host, "172.23.") ||
|
||||
strings.HasPrefix(host, "172.24.") ||
|
||||
strings.HasPrefix(host, "172.25.") ||
|
||||
strings.HasPrefix(host, "172.26.") ||
|
||||
strings.HasPrefix(host, "172.27.") ||
|
||||
strings.HasPrefix(host, "172.28.") ||
|
||||
strings.HasPrefix(host, "172.29.") ||
|
||||
strings.HasPrefix(host, "172.30.") ||
|
||||
strings.HasPrefix(host, "172.31.") {
|
||||
if host == "localhost" {
|
||||
return true
|
||||
}
|
||||
|
||||
// Parse as IP and use standard library checks
|
||||
ip := net.ParseIP(host)
|
||||
if ip == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast()
|
||||
}
|
||||
|
||||
@ -1111,35 +1111,6 @@ func (g *Gateway) getDeploymentByDomain(ctx context.Context, domain string) (*de
|
||||
}
|
||||
}
|
||||
|
||||
// Legacy format: {name}.node-{shortID}.{baseDomain} (backwards compatibility)
|
||||
if len(parts) == 2 && strings.HasPrefix(parts[1], "node-") {
|
||||
deploymentName := parts[0]
|
||||
shortNodeID := parts[1] // e.g., "node-kv4la8"
|
||||
|
||||
// Query by name and matching short node ID
|
||||
query := `
|
||||
SELECT id, namespace, name, type, port, content_cid, status, home_node_id
|
||||
FROM deployments
|
||||
WHERE name = ?
|
||||
AND ('node-' || substr(home_node_id, 9, 6) = ? OR home_node_id = ?)
|
||||
AND status = 'active'
|
||||
LIMIT 1
|
||||
`
|
||||
result, err := db.Query(internalCtx, query, deploymentName, shortNodeID, shortNodeID)
|
||||
if err == nil && len(result.Rows) > 0 {
|
||||
row := result.Rows[0]
|
||||
return &deployments.Deployment{
|
||||
ID: getString(row[0]),
|
||||
Namespace: getString(row[1]),
|
||||
Name: getString(row[2]),
|
||||
Type: deployments.DeploymentType(getString(row[3])),
|
||||
Port: getInt(row[4]),
|
||||
ContentCID: getString(row[5]),
|
||||
Status: deployments.DeploymentStatus(getString(row[6])),
|
||||
HomeNodeID: getString(row[7]),
|
||||
}, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Try custom domain from deployment_domains table
|
||||
|
||||
@ -9,6 +9,7 @@ import (
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/wireguard"
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
"github.com/libp2p/go-libp2p/core/peer"
|
||||
"github.com/multiformats/go-multiaddr"
|
||||
@ -337,16 +338,21 @@ func (pd *PeerDiscovery) updateHeartbeat(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// GetWireGuardIP detects the local WireGuard IP address using the wg0 network
|
||||
// interface or the WireGuard config file. It does not require a PeerDiscovery
|
||||
// instance and can be called from anywhere in the gateway package.
|
||||
// interface, the 'ip' command, or the WireGuard config file.
|
||||
// It does not require a PeerDiscovery instance and can be called from anywhere
|
||||
// in the gateway package.
|
||||
func GetWireGuardIP() (string, error) {
|
||||
// Method 1: Use 'ip addr show wg0' command (works without root)
|
||||
ip, err := getWireGuardIPFromCommand()
|
||||
if err == nil {
|
||||
// Method 1: Use net.InterfaceByName (shared implementation)
|
||||
if ip, err := wireguard.GetIP(); err == nil {
|
||||
return ip, nil
|
||||
}
|
||||
|
||||
// Method 2: Try to read from WireGuard config file (requires root, may fail)
|
||||
// Method 2: Use 'ip addr show wg0' command (works without root)
|
||||
if ip, err := getWireGuardIPFromCommand(); err == nil {
|
||||
return ip, nil
|
||||
}
|
||||
|
||||
// Method 3: Try to read from WireGuard config file (requires root, may fail)
|
||||
configPath := "/etc/wireguard/wg0.conf"
|
||||
data, err := os.ReadFile(configPath)
|
||||
if err == nil {
|
||||
@ -359,7 +365,6 @@ func GetWireGuardIP() (string, error) {
|
||||
parts := strings.Split(line, "=")
|
||||
if len(parts) == 2 {
|
||||
addrWithCIDR := strings.TrimSpace(parts[1])
|
||||
// Remove /24 suffix
|
||||
ip := strings.Split(addrWithCIDR, "/")[0]
|
||||
ip = strings.TrimSpace(ip)
|
||||
return ip, nil
|
||||
|
||||
@ -61,7 +61,9 @@ func (cm *ClusterConfigManager) UpdateAllClusterPeers() error {
|
||||
func (cm *ClusterConfigManager) RepairPeerConfiguration() error {
|
||||
cm.logger.Info("Attempting to repair IPFS Cluster peer configuration")
|
||||
|
||||
_ = cm.FixIPFSConfigAddresses()
|
||||
if err := cm.FixIPFSConfigAddresses(); err != nil {
|
||||
cm.logger.Warn("Failed to fix IPFS config addresses during repair", zap.Error(err))
|
||||
}
|
||||
|
||||
peers, err := cm.DiscoverClusterPeersFromGateway()
|
||||
if err != nil {
|
||||
@ -72,7 +74,9 @@ func (cm *ClusterConfigManager) RepairPeerConfiguration() error {
|
||||
peerAddrs = append(peerAddrs, p.Multiaddress)
|
||||
}
|
||||
if len(peerAddrs) > 0 {
|
||||
_ = cm.UpdatePeerAddresses(peerAddrs)
|
||||
if err := cm.UpdatePeerAddresses(peerAddrs); err != nil {
|
||||
cm.logger.Warn("Failed to update peer addresses during repair", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -893,21 +893,35 @@ func (cm *ClusterManager) GetClusterStatus(ctx context.Context, clusterID string
|
||||
ClusterID: cluster.ID,
|
||||
}
|
||||
|
||||
// Check individual service status
|
||||
// TODO: Actually check each service's health
|
||||
if cluster.Status == ClusterStatusReady {
|
||||
status.RQLiteReady = true
|
||||
status.OlricReady = true
|
||||
status.GatewayReady = true
|
||||
status.DNSReady = true
|
||||
}
|
||||
|
||||
// Get node list
|
||||
// Check individual service status by inspecting cluster nodes
|
||||
nodes, err := cm.getClusterNodes(ctx, clusterID)
|
||||
if err == nil {
|
||||
runningCount := 0
|
||||
hasRQLite := false
|
||||
hasOlric := false
|
||||
hasGateway := false
|
||||
|
||||
for _, node := range nodes {
|
||||
status.Nodes = append(status.Nodes, node.NodeID)
|
||||
if node.Status == NodeStatusRunning {
|
||||
runningCount++
|
||||
}
|
||||
if node.RQLiteHTTPPort > 0 {
|
||||
hasRQLite = true
|
||||
}
|
||||
if node.OlricHTTPPort > 0 {
|
||||
hasOlric = true
|
||||
}
|
||||
if node.GatewayHTTPPort > 0 {
|
||||
hasGateway = true
|
||||
}
|
||||
}
|
||||
|
||||
allRunning := len(nodes) > 0 && runningCount == len(nodes)
|
||||
status.RQLiteReady = allRunning && hasRQLite
|
||||
status.OlricReady = allRunning && hasOlric
|
||||
status.GatewayReady = allRunning && hasGateway
|
||||
status.DNSReady = allRunning
|
||||
}
|
||||
|
||||
if cluster.ErrorMessage != "" {
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
"github.com/DeBrosOfficial/network/pkg/constants"
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -176,12 +177,10 @@ func (cns *ClusterNodeSelector) getNodeCapacity(ctx context.Context, nodeID, ipA
|
||||
}
|
||||
|
||||
// Calculate available capacity
|
||||
const (
|
||||
maxDeployments = 100
|
||||
maxPorts = 9900 // User deployment port range
|
||||
maxMemoryMB = 8192 // 8GB
|
||||
maxCPUPercent = 400 // 4 cores
|
||||
)
|
||||
maxDeployments := constants.MaxDeploymentsPerNode
|
||||
maxPorts := constants.MaxPortsPerNode
|
||||
maxMemoryMB := constants.MaxMemoryMB
|
||||
maxCPUPercent := constants.MaxCPUPercent
|
||||
|
||||
availablePorts := maxPorts - allocatedPorts
|
||||
if availablePorts < 0 {
|
||||
|
||||
@ -3,6 +3,7 @@ package namespace
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/client"
|
||||
@ -369,19 +370,5 @@ func isConflictError(err error) bool {
|
||||
return false
|
||||
}
|
||||
errStr := err.Error()
|
||||
return contains(errStr, "UNIQUE") || contains(errStr, "constraint") || contains(errStr, "conflict")
|
||||
}
|
||||
|
||||
// contains checks if a string contains a substring (case-insensitive)
|
||||
func contains(s, substr string) bool {
|
||||
return len(s) >= len(substr) && (s == substr || len(s) > len(substr) && findSubstring(s, substr))
|
||||
}
|
||||
|
||||
func findSubstring(s, substr string) bool {
|
||||
for i := 0; i <= len(s)-len(substr); i++ {
|
||||
if s[i:i+len(substr)] == substr {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
return strings.Contains(errStr, "UNIQUE") || strings.Contains(errStr, "constraint") || strings.Contains(errStr, "conflict")
|
||||
}
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@ -269,7 +270,7 @@ func TestContains(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.s+"_"+tt.substr, func(t *testing.T) {
|
||||
result := contains(tt.s, tt.substr)
|
||||
result := strings.Contains(tt.s, tt.substr)
|
||||
if result != tt.expected {
|
||||
t.Errorf("contains(%q, %q) = %v, want %v", tt.s, tt.substr, result, tt.expected)
|
||||
}
|
||||
|
||||
@ -1,25 +1,9 @@
|
||||
package namespace
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
import "github.com/DeBrosOfficial/network/pkg/wireguard"
|
||||
|
||||
// getWireGuardIP returns the IPv4 address of the wg0 interface.
|
||||
// Used as a fallback when Olric BindAddr is empty or 0.0.0.0.
|
||||
func getWireGuardIP() (string, error) {
|
||||
iface, err := net.InterfaceByName("wg0")
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("wg0 interface not found: %w", err)
|
||||
}
|
||||
addrs, err := iface.Addrs()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to get wg0 addresses: %w", err)
|
||||
}
|
||||
for _, addr := range addrs {
|
||||
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
|
||||
return ipnet.IP.String(), nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("no IPv4 address on wg0")
|
||||
return wireguard.GetIP()
|
||||
}
|
||||
|
||||
@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/logging"
|
||||
"github.com/DeBrosOfficial/network/pkg/wireguard"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -414,20 +415,7 @@ func (n *Node) isNameserverNode(ctx context.Context) bool {
|
||||
|
||||
// getWireGuardIP returns the IPv4 address assigned to the wg0 interface, if any
|
||||
func (n *Node) getWireGuardIP() (string, error) {
|
||||
iface, err := net.InterfaceByName("wg0")
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
addrs, err := iface.Addrs()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
for _, addr := range addrs {
|
||||
if ipnet, ok := addr.(*net.IPNet); ok && ipnet.IP.To4() != nil {
|
||||
return ipnet.IP.String(), nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("no IPv4 address on wg0")
|
||||
return wireguard.GetIP()
|
||||
}
|
||||
|
||||
// getNodeIPAddress attempts to determine the node's external IP address
|
||||
|
||||
@ -47,7 +47,9 @@ func (r *RQLiteManager) waitForMinClusterSizeBeforeStart(ctx context.Context, rq
|
||||
return nil
|
||||
}
|
||||
|
||||
_ = r.discoveryService.TriggerPeerExchange(ctx)
|
||||
if err := r.discoveryService.TriggerPeerExchange(ctx); err != nil {
|
||||
r.logger.Warn("Failed to trigger peer exchange before cluster wait", zap.Error(err))
|
||||
}
|
||||
|
||||
checkInterval := 2 * time.Second
|
||||
for {
|
||||
@ -92,7 +94,9 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql
|
||||
return fmt.Errorf("discovery service not available")
|
||||
}
|
||||
|
||||
_ = r.discoveryService.TriggerPeerExchange(ctx)
|
||||
if err := r.discoveryService.TriggerPeerExchange(ctx); err != nil {
|
||||
r.logger.Warn("Failed to trigger peer exchange during pre-start discovery", zap.Error(err))
|
||||
}
|
||||
time.Sleep(1 * time.Second)
|
||||
r.discoveryService.TriggerSync()
|
||||
time.Sleep(2 * time.Second)
|
||||
@ -123,7 +127,9 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql
|
||||
zap.Int("discovered_peers", discoveredPeers),
|
||||
zap.Int("min_cluster_size", r.config.MinClusterSize))
|
||||
// Still write peers.json with just ourselves - better than nothing
|
||||
_ = r.discoveryService.ForceWritePeersJSON()
|
||||
if err := r.discoveryService.ForceWritePeersJSON(); err != nil {
|
||||
r.logger.Warn("Failed to write single-node peers.json fallback", zap.Error(err))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -137,8 +143,12 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql
|
||||
}
|
||||
|
||||
if ourLogIndex == 0 && maxPeerIndex > 0 {
|
||||
_ = r.clearRaftState(rqliteDataDir)
|
||||
_ = r.discoveryService.ForceWritePeersJSON()
|
||||
if err := r.clearRaftState(rqliteDataDir); err != nil {
|
||||
r.logger.Warn("Failed to clear raft state during pre-start discovery", zap.Error(err))
|
||||
}
|
||||
if err := r.discoveryService.ForceWritePeersJSON(); err != nil {
|
||||
r.logger.Warn("Failed to write peers.json after clearing raft state", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -150,7 +160,9 @@ func (r *RQLiteManager) performPreStartClusterDiscovery(ctx context.Context, rql
|
||||
|
||||
// recoverCluster restarts RQLite using peers.json
|
||||
func (r *RQLiteManager) recoverCluster(ctx context.Context, peersJSONPath string) error {
|
||||
_ = r.Stop()
|
||||
if err := r.Stop(); err != nil {
|
||||
r.logger.Warn("Failed to stop RQLite during cluster recovery", zap.Error(err))
|
||||
}
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
rqliteDataDir, err := r.rqliteDataDirPath()
|
||||
@ -187,10 +199,14 @@ func (r *RQLiteManager) recoverFromSplitBrain(ctx context.Context) error {
|
||||
}
|
||||
|
||||
if ourIndex == 0 && maxPeerIndex > 0 {
|
||||
_ = r.clearRaftState(rqliteDataDir)
|
||||
if err := r.clearRaftState(rqliteDataDir); err != nil {
|
||||
r.logger.Warn("Failed to clear raft state during split-brain recovery", zap.Error(err))
|
||||
}
|
||||
r.discoveryService.TriggerPeerExchange(ctx)
|
||||
time.Sleep(1 * time.Second)
|
||||
_ = r.discoveryService.ForceWritePeersJSON()
|
||||
if err := r.discoveryService.ForceWritePeersJSON(); err != nil {
|
||||
r.logger.Warn("Failed to write peers.json during split-brain recovery", zap.Error(err))
|
||||
}
|
||||
return r.recoverCluster(ctx, filepath.Join(rqliteDataDir, "raft", "peers.json"))
|
||||
}
|
||||
|
||||
@ -265,7 +281,9 @@ func (r *RQLiteManager) startHealthMonitoring(ctx context.Context) {
|
||||
return
|
||||
case <-ticker.C:
|
||||
if r.isInSplitBrainState() {
|
||||
_ = r.recoverFromSplitBrain(ctx)
|
||||
if err := r.recoverFromSplitBrain(ctx); err != nil {
|
||||
r.logger.Warn("Split-brain recovery attempt failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
85
pkg/serverless/cache/module_cache.go
vendored
85
pkg/serverless/cache/module_cache.go
vendored
@ -3,14 +3,21 @@ package cache
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/tetratelabs/wazero"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// cacheEntry wraps a compiled module with access tracking for LRU eviction.
|
||||
type cacheEntry struct {
|
||||
module wazero.CompiledModule
|
||||
lastAccessed time.Time
|
||||
}
|
||||
|
||||
// ModuleCache manages compiled WASM module caching.
|
||||
type ModuleCache struct {
|
||||
modules map[string]wazero.CompiledModule
|
||||
modules map[string]*cacheEntry
|
||||
mu sync.RWMutex
|
||||
capacity int
|
||||
logger *zap.Logger
|
||||
@ -19,7 +26,7 @@ type ModuleCache struct {
|
||||
// NewModuleCache creates a new ModuleCache.
|
||||
func NewModuleCache(capacity int, logger *zap.Logger) *ModuleCache {
|
||||
return &ModuleCache{
|
||||
modules: make(map[string]wazero.CompiledModule),
|
||||
modules: make(map[string]*cacheEntry),
|
||||
capacity: capacity,
|
||||
logger: logger,
|
||||
}
|
||||
@ -27,15 +34,20 @@ func NewModuleCache(capacity int, logger *zap.Logger) *ModuleCache {
|
||||
|
||||
// Get retrieves a compiled module from the cache.
|
||||
func (c *ModuleCache) Get(wasmCID string) (wazero.CompiledModule, bool) {
|
||||
c.mu.RLock()
|
||||
defer c.mu.RUnlock()
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
module, exists := c.modules[wasmCID]
|
||||
return module, exists
|
||||
entry, exists := c.modules[wasmCID]
|
||||
if !exists {
|
||||
return nil, false
|
||||
}
|
||||
|
||||
entry.lastAccessed = time.Now()
|
||||
return entry.module, true
|
||||
}
|
||||
|
||||
// Set stores a compiled module in the cache.
|
||||
// If the cache is full, it evicts the oldest module.
|
||||
// If the cache is full, it evicts the least recently used module.
|
||||
func (c *ModuleCache) Set(wasmCID string, module wazero.CompiledModule) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
@ -50,7 +62,10 @@ func (c *ModuleCache) Set(wasmCID string, module wazero.CompiledModule) {
|
||||
c.evictOldest()
|
||||
}
|
||||
|
||||
c.modules[wasmCID] = module
|
||||
c.modules[wasmCID] = &cacheEntry{
|
||||
module: module,
|
||||
lastAccessed: time.Now(),
|
||||
}
|
||||
|
||||
c.logger.Debug("Module cached",
|
||||
zap.String("wasm_cid", wasmCID),
|
||||
@ -63,8 +78,8 @@ func (c *ModuleCache) Delete(ctx context.Context, wasmCID string) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
if module, exists := c.modules[wasmCID]; exists {
|
||||
_ = module.Close(ctx)
|
||||
if entry, exists := c.modules[wasmCID]; exists {
|
||||
_ = entry.module.Close(ctx)
|
||||
delete(c.modules, wasmCID)
|
||||
c.logger.Debug("Module removed from cache", zap.String("wasm_cid", wasmCID))
|
||||
}
|
||||
@ -97,8 +112,8 @@ func (c *ModuleCache) Clear(ctx context.Context) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
|
||||
for cid, module := range c.modules {
|
||||
if err := module.Close(ctx); err != nil {
|
||||
for cid, entry := range c.modules {
|
||||
if err := entry.module.Close(ctx); err != nil {
|
||||
c.logger.Warn("Failed to close cached module during clear",
|
||||
zap.String("cid", cid),
|
||||
zap.Error(err),
|
||||
@ -106,7 +121,7 @@ func (c *ModuleCache) Clear(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
|
||||
c.modules = make(map[string]wazero.CompiledModule)
|
||||
c.modules = make(map[string]*cacheEntry)
|
||||
c.logger.Debug("Module cache cleared")
|
||||
}
|
||||
|
||||
@ -118,16 +133,23 @@ func (c *ModuleCache) GetStats() (size int, capacity int) {
|
||||
return len(c.modules), c.capacity
|
||||
}
|
||||
|
||||
// evictOldest removes the oldest module from cache.
|
||||
// evictOldest removes the least recently accessed module from cache.
|
||||
// Must be called with mu held.
|
||||
func (c *ModuleCache) evictOldest() {
|
||||
// Simple LRU: just remove the first one we find
|
||||
// In production, you'd want proper LRU tracking
|
||||
for cid, module := range c.modules {
|
||||
_ = module.Close(context.Background())
|
||||
delete(c.modules, cid)
|
||||
c.logger.Debug("Evicted module from cache", zap.String("wasm_cid", cid))
|
||||
break
|
||||
var oldestCID string
|
||||
var oldestTime time.Time
|
||||
|
||||
for cid, entry := range c.modules {
|
||||
if oldestCID == "" || entry.lastAccessed.Before(oldestTime) {
|
||||
oldestCID = cid
|
||||
oldestTime = entry.lastAccessed
|
||||
}
|
||||
}
|
||||
|
||||
if oldestCID != "" {
|
||||
_ = c.modules[oldestCID].module.Close(context.Background())
|
||||
delete(c.modules, oldestCID)
|
||||
c.logger.Debug("Evicted LRU module from cache", zap.String("wasm_cid", oldestCID))
|
||||
}
|
||||
}
|
||||
|
||||
@ -135,12 +157,13 @@ func (c *ModuleCache) evictOldest() {
|
||||
// The compute function is called with the lock released to avoid blocking.
|
||||
func (c *ModuleCache) GetOrCompute(wasmCID string, compute func() (wazero.CompiledModule, error)) (wazero.CompiledModule, error) {
|
||||
// Try to get from cache first
|
||||
c.mu.RLock()
|
||||
if module, exists := c.modules[wasmCID]; exists {
|
||||
c.mu.RUnlock()
|
||||
return module, nil
|
||||
c.mu.Lock()
|
||||
if entry, exists := c.modules[wasmCID]; exists {
|
||||
entry.lastAccessed = time.Now()
|
||||
c.mu.Unlock()
|
||||
return entry.module, nil
|
||||
}
|
||||
c.mu.RUnlock()
|
||||
c.mu.Unlock()
|
||||
|
||||
// Compute the module (without holding the lock)
|
||||
module, err := compute()
|
||||
@ -153,9 +176,10 @@ func (c *ModuleCache) GetOrCompute(wasmCID string, compute func() (wazero.Compil
|
||||
defer c.mu.Unlock()
|
||||
|
||||
// Double-check (another goroutine might have added it)
|
||||
if existingModule, exists := c.modules[wasmCID]; exists {
|
||||
if entry, exists := c.modules[wasmCID]; exists {
|
||||
_ = module.Close(context.Background()) // Discard our compilation
|
||||
return existingModule, nil
|
||||
entry.lastAccessed = time.Now()
|
||||
return entry.module, nil
|
||||
}
|
||||
|
||||
// Evict if cache is full
|
||||
@ -163,7 +187,10 @@ func (c *ModuleCache) GetOrCompute(wasmCID string, compute func() (wazero.Compil
|
||||
c.evictOldest()
|
||||
}
|
||||
|
||||
c.modules[wasmCID] = module
|
||||
c.modules[wasmCID] = &cacheEntry{
|
||||
module: module,
|
||||
lastAccessed: time.Now(),
|
||||
}
|
||||
|
||||
c.logger.Debug("Module compiled and cached",
|
||||
zap.String("wasm_cid", wasmCID),
|
||||
|
||||
@ -3,6 +3,7 @@ package serverless
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
@ -249,7 +250,7 @@ func (i *Invoker) isRetryable(err error) bool {
|
||||
|
||||
// Retry execution errors (could be transient)
|
||||
var execErr *ExecutionError
|
||||
if ok := errorAs(err, &execErr); ok {
|
||||
if errors.As(err, &execErr) {
|
||||
return true
|
||||
}
|
||||
|
||||
@ -347,22 +348,6 @@ type DLQMessage struct {
|
||||
CallerWallet string `json:"caller_wallet,omitempty"`
|
||||
}
|
||||
|
||||
// errorAs is a helper to avoid import of errors package.
|
||||
func errorAs(err error, target interface{}) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
// Simple type assertion for our custom error types
|
||||
switch t := target.(type) {
|
||||
case **ExecutionError:
|
||||
if e, ok := err.(*ExecutionError); ok {
|
||||
*t = e
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// -----------------------------------------------------------------------------
|
||||
// Batch Invocation (for future use)
|
||||
// -----------------------------------------------------------------------------
|
||||
|
||||
@ -82,12 +82,9 @@ func GetTLSConfig() *tls.Config {
|
||||
MinVersion: tls.VersionTLS12,
|
||||
}
|
||||
|
||||
// If we have a CA cert pool, use it
|
||||
// If we have a CA cert pool, use it for verifying self-signed certs
|
||||
if caCertPool != nil {
|
||||
config.RootCAs = caCertPool
|
||||
} else if len(trustedDomains) > 0 {
|
||||
// Fallback: skip verification if trusted domains are configured but no CA pool
|
||||
config.InsecureSkipVerify = true
|
||||
}
|
||||
|
||||
return config
|
||||
@ -103,11 +100,12 @@ func NewHTTPClient(timeout time.Duration) *http.Client {
|
||||
}
|
||||
}
|
||||
|
||||
// NewHTTPClientForDomain creates an HTTP client configured for a specific domain
|
||||
// NewHTTPClientForDomain creates an HTTP client configured for a specific domain.
|
||||
// Only skips TLS verification for explicitly trusted domains when no CA cert is available.
|
||||
func NewHTTPClientForDomain(timeout time.Duration, hostname string) *http.Client {
|
||||
tlsConfig := GetTLSConfig()
|
||||
|
||||
// If this domain is in trusted list and we don't have a CA pool, allow insecure
|
||||
// Only skip TLS for explicitly trusted domains when no CA pool is configured
|
||||
if caCertPool == nil && ShouldSkipTLSVerify(hostname) {
|
||||
tlsConfig.InsecureSkipVerify = true
|
||||
}
|
||||
|
||||
24
pkg/wireguard/ip.go
Normal file
24
pkg/wireguard/ip.go
Normal file
@ -0,0 +1,24 @@
|
||||
package wireguard
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
)
|
||||
|
||||
// GetIP returns the IPv4 address assigned to the wg0 WireGuard interface.
// It returns an error when the interface does not exist, its addresses
// cannot be listed, or no IPv4 address is configured on it.
func GetIP() (string, error) {
	iface, err := net.InterfaceByName("wg0")
	if err != nil {
		return "", fmt.Errorf("wg0 interface not found: %w", err)
	}

	addrs, err := iface.Addrs()
	if err != nil {
		return "", fmt.Errorf("failed to get wg0 addresses: %w", err)
	}

	// Scan the interface addresses and report the first IPv4 one.
	for _, addr := range addrs {
		ipnet, ok := addr.(*net.IPNet)
		if !ok {
			continue
		}
		if v4 := ipnet.IP.To4(); v4 != nil {
			return v4.String(), nil
		}
	}

	return "", fmt.Errorf("no IPv4 address on wg0")
}
|
||||
Loading…
x
Reference in New Issue
Block a user