mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 05:13:01 +00:00
Bored of fixing bugs
This commit is contained in:
parent
0c4af88388
commit
f972358e78
@ -77,6 +77,7 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
|
|||||||
ListenAddr string `yaml:"listen_addr"`
|
ListenAddr string `yaml:"listen_addr"`
|
||||||
ClientNamespace string `yaml:"client_namespace"`
|
ClientNamespace string `yaml:"client_namespace"`
|
||||||
RQLiteDSN string `yaml:"rqlite_dsn"`
|
RQLiteDSN string `yaml:"rqlite_dsn"`
|
||||||
|
GlobalRQLiteDSN string `yaml:"global_rqlite_dsn"`
|
||||||
Peers []string `yaml:"bootstrap_peers"`
|
Peers []string `yaml:"bootstrap_peers"`
|
||||||
EnableHTTPS bool `yaml:"enable_https"`
|
EnableHTTPS bool `yaml:"enable_https"`
|
||||||
DomainName string `yaml:"domain_name"`
|
DomainName string `yaml:"domain_name"`
|
||||||
@ -113,6 +114,7 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
|
|||||||
ClientNamespace: "default",
|
ClientNamespace: "default",
|
||||||
BootstrapPeers: nil,
|
BootstrapPeers: nil,
|
||||||
RQLiteDSN: "",
|
RQLiteDSN: "",
|
||||||
|
GlobalRQLiteDSN: "",
|
||||||
EnableHTTPS: false,
|
EnableHTTPS: false,
|
||||||
DomainName: "",
|
DomainName: "",
|
||||||
TLSCacheDir: "",
|
TLSCacheDir: "",
|
||||||
@ -133,6 +135,9 @@ func parseGatewayConfig(logger *logging.ColoredLogger) *gateway.Config {
|
|||||||
if v := strings.TrimSpace(y.RQLiteDSN); v != "" {
|
if v := strings.TrimSpace(y.RQLiteDSN); v != "" {
|
||||||
cfg.RQLiteDSN = v
|
cfg.RQLiteDSN = v
|
||||||
}
|
}
|
||||||
|
if v := strings.TrimSpace(y.GlobalRQLiteDSN); v != "" {
|
||||||
|
cfg.GlobalRQLiteDSN = v
|
||||||
|
}
|
||||||
if len(y.Peers) > 0 {
|
if len(y.Peers) > 0 {
|
||||||
var peers []string
|
var peers []string
|
||||||
for _, p := range y.Peers {
|
for _, p := range y.Peers {
|
||||||
|
|||||||
@ -150,10 +150,11 @@ func IsServiceMasked(service string) (bool, error) {
|
|||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetProductionServices returns a list of all DeBros production service names that exist
|
// GetProductionServices returns a list of all DeBros production service names that exist,
|
||||||
|
// including both global services and namespace-specific services
|
||||||
func GetProductionServices() []string {
|
func GetProductionServices() []string {
|
||||||
// Unified service names (no bootstrap/node distinction)
|
// Global/default service names
|
||||||
allServices := []string{
|
globalServices := []string{
|
||||||
"debros-gateway",
|
"debros-gateway",
|
||||||
"debros-node",
|
"debros-node",
|
||||||
"debros-olric",
|
"debros-olric",
|
||||||
@ -163,15 +164,34 @@ func GetProductionServices() []string {
|
|||||||
"debros-anyone-relay",
|
"debros-anyone-relay",
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter to only existing services by checking if unit file exists
|
|
||||||
var existing []string
|
var existing []string
|
||||||
for _, svc := range allServices {
|
|
||||||
|
// Add existing global services
|
||||||
|
for _, svc := range globalServices {
|
||||||
unitPath := filepath.Join("/etc/systemd/system", svc+".service")
|
unitPath := filepath.Join("/etc/systemd/system", svc+".service")
|
||||||
if _, err := os.Stat(unitPath); err == nil {
|
if _, err := os.Stat(unitPath); err == nil {
|
||||||
existing = append(existing, svc)
|
existing = append(existing, svc)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Also discover namespace-specific services (debros-*@<namespace>.service)
|
||||||
|
// These are created when namespaces are provisioned and need to be restarted too
|
||||||
|
systemdDir := "/etc/systemd/system"
|
||||||
|
entries, err := os.ReadDir(systemdDir)
|
||||||
|
if err == nil {
|
||||||
|
for _, entry := range entries {
|
||||||
|
name := entry.Name()
|
||||||
|
// Look for debros-*@*.service pattern (namespace services)
|
||||||
|
if strings.HasPrefix(name, "debros-") &&
|
||||||
|
strings.Contains(name, "@") &&
|
||||||
|
strings.HasSuffix(name, ".service") {
|
||||||
|
// Extract service name without .service extension
|
||||||
|
serviceName := strings.TrimSuffix(name, ".service")
|
||||||
|
existing = append(existing, serviceName)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return existing
|
return existing
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -13,6 +13,10 @@ type Config struct {
|
|||||||
// If empty, defaults to "http://localhost:4001".
|
// If empty, defaults to "http://localhost:4001".
|
||||||
RQLiteDSN string
|
RQLiteDSN string
|
||||||
|
|
||||||
|
// Global RQLite DSN for API key validation (for namespace gateways)
|
||||||
|
// If empty, uses RQLiteDSN (for main/global gateways)
|
||||||
|
GlobalRQLiteDSN string
|
||||||
|
|
||||||
// HTTPS configuration
|
// HTTPS configuration
|
||||||
EnableHTTPS bool // Enable HTTPS with ACME (Let's Encrypt)
|
EnableHTTPS bool // Enable HTTPS with ACME (Let's Encrypt)
|
||||||
DomainName string // Domain name for HTTPS certificate
|
DomainName string // Domain name for HTTPS certificate
|
||||||
|
|||||||
@ -52,6 +52,9 @@ type Gateway struct {
|
|||||||
ormClient rqlite.Client
|
ormClient rqlite.Client
|
||||||
ormHTTP *rqlite.HTTPGateway
|
ormHTTP *rqlite.HTTPGateway
|
||||||
|
|
||||||
|
// Global RQLite client for API key validation (namespace gateways only)
|
||||||
|
authClient client.NetworkClient
|
||||||
|
|
||||||
// Olric cache client
|
// Olric cache client
|
||||||
olricClient *olric.Client
|
olricClient *olric.Client
|
||||||
olricMu sync.RWMutex
|
olricMu sync.RWMutex
|
||||||
@ -237,6 +240,33 @@ func New(logger *logging.ColoredLogger, cfg *Config) (*Gateway, error) {
|
|||||||
presenceMembers: make(map[string][]PresenceMember),
|
presenceMembers: make(map[string][]PresenceMember),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Create separate auth client for global RQLite if GlobalRQLiteDSN is provided
|
||||||
|
// This allows namespace gateways to validate API keys against the global database
|
||||||
|
if cfg.GlobalRQLiteDSN != "" && cfg.GlobalRQLiteDSN != cfg.RQLiteDSN {
|
||||||
|
logger.ComponentInfo(logging.ComponentGeneral, "Creating global auth client...",
|
||||||
|
zap.String("global_dsn", cfg.GlobalRQLiteDSN),
|
||||||
|
)
|
||||||
|
|
||||||
|
// Create client config for global namespace
|
||||||
|
authCfg := client.DefaultClientConfig("default") // Use "default" namespace for global
|
||||||
|
authCfg.DatabaseEndpoints = []string{cfg.GlobalRQLiteDSN}
|
||||||
|
if len(cfg.BootstrapPeers) > 0 {
|
||||||
|
authCfg.BootstrapPeers = cfg.BootstrapPeers
|
||||||
|
}
|
||||||
|
|
||||||
|
authClient, err := client.NewClient(authCfg)
|
||||||
|
if err != nil {
|
||||||
|
logger.ComponentWarn(logging.ComponentGeneral, "Failed to create global auth client", zap.Error(err))
|
||||||
|
} else {
|
||||||
|
if err := authClient.Connect(); err != nil {
|
||||||
|
logger.ComponentWarn(logging.ComponentGeneral, "Failed to connect global auth client", zap.Error(err))
|
||||||
|
} else {
|
||||||
|
gw.authClient = authClient
|
||||||
|
logger.ComponentInfo(logging.ComponentGeneral, "Global auth client connected")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Initialize handler instances
|
// Initialize handler instances
|
||||||
gw.pubsubHandlers = pubsubhandlers.NewPubSubHandlers(deps.Client, logger)
|
gw.pubsubHandlers = pubsubhandlers.NewPubSubHandlers(deps.Client, logger)
|
||||||
|
|
||||||
|
|||||||
@ -40,7 +40,12 @@ func (h *Handlers) DownloadHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
ctx := r.Context()
|
ctx := r.Context()
|
||||||
|
|
||||||
|
h.logger.ComponentDebug(logging.ComponentGeneral, "Starting CID retrieval",
|
||||||
|
zap.String("cid", path),
|
||||||
|
zap.String("namespace", namespace))
|
||||||
|
|
||||||
// Check if namespace owns this CID (namespace isolation)
|
// Check if namespace owns this CID (namespace isolation)
|
||||||
|
h.logger.ComponentDebug(logging.ComponentGeneral, "Checking CID ownership", zap.String("cid", path))
|
||||||
hasAccess, err := h.checkCIDOwnership(ctx, path, namespace)
|
hasAccess, err := h.checkCIDOwnership(ctx, path, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logger.ComponentError(logging.ComponentGeneral, "failed to check CID ownership",
|
h.logger.ComponentError(logging.ComponentGeneral, "failed to check CID ownership",
|
||||||
@ -55,12 +60,18 @@ func (h *Handlers) DownloadHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.logger.ComponentDebug(logging.ComponentGeneral, "CID ownership check passed", zap.String("cid", path))
|
||||||
|
|
||||||
// Get IPFS API URL from config
|
// Get IPFS API URL from config
|
||||||
ipfsAPIURL := h.config.IPFSAPIURL
|
ipfsAPIURL := h.config.IPFSAPIURL
|
||||||
if ipfsAPIURL == "" {
|
if ipfsAPIURL == "" {
|
||||||
ipfsAPIURL = "http://localhost:5001"
|
ipfsAPIURL = "http://localhost:5001"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.logger.ComponentDebug(logging.ComponentGeneral, "Fetching content from IPFS",
|
||||||
|
zap.String("cid", path),
|
||||||
|
zap.String("ipfs_api_url", ipfsAPIURL))
|
||||||
|
|
||||||
reader, err := h.ipfsClient.Get(ctx, path, ipfsAPIURL)
|
reader, err := h.ipfsClient.Get(ctx, path, ipfsAPIURL)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
h.logger.ComponentError(logging.ComponentGeneral, "failed to get content from IPFS",
|
h.logger.ComponentError(logging.ComponentGeneral, "failed to get content from IPFS",
|
||||||
@ -77,6 +88,9 @@ func (h *Handlers) DownloadHandler(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
defer reader.Close()
|
defer reader.Close()
|
||||||
|
|
||||||
|
h.logger.ComponentDebug(logging.ComponentGeneral, "Successfully retrieved content from IPFS, starting stream",
|
||||||
|
zap.String("cid", path))
|
||||||
|
|
||||||
// Set headers for file download
|
// Set headers for file download
|
||||||
w.Header().Set("Content-Type", "application/octet-stream")
|
w.Header().Set("Content-Type", "application/octet-stream")
|
||||||
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", path))
|
w.Header().Set("Content-Disposition", fmt.Sprintf("attachment; filename=%s", path))
|
||||||
|
|||||||
@ -3,11 +3,13 @@ package storage
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"io"
|
"io"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/DeBrosOfficial/network/pkg/gateway/ctxkeys"
|
"github.com/DeBrosOfficial/network/pkg/gateway/ctxkeys"
|
||||||
"github.com/DeBrosOfficial/network/pkg/ipfs"
|
"github.com/DeBrosOfficial/network/pkg/ipfs"
|
||||||
"github.com/DeBrosOfficial/network/pkg/logging"
|
"github.com/DeBrosOfficial/network/pkg/logging"
|
||||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||||
|
"go.uber.org/zap"
|
||||||
)
|
)
|
||||||
|
|
||||||
// IPFSClient defines the interface for interacting with IPFS.
|
// IPFSClient defines the interface for interacting with IPFS.
|
||||||
@ -82,13 +84,28 @@ func (h *Handlers) checkCIDOwnership(ctx context.Context, cid, namespace string)
|
|||||||
return true, nil // Allow access in test mode
|
return true, nil // Allow access in test mode
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add 5-second timeout to prevent hanging on slow RQLite queries
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
h.logger.ComponentDebug(logging.ComponentGeneral, "Querying RQLite for CID ownership",
|
||||||
|
zap.String("cid", cid),
|
||||||
|
zap.String("namespace", namespace))
|
||||||
|
|
||||||
query := `SELECT COUNT(*) as count FROM ipfs_content_ownership WHERE cid = ? AND namespace = ?`
|
query := `SELECT COUNT(*) as count FROM ipfs_content_ownership WHERE cid = ? AND namespace = ?`
|
||||||
|
|
||||||
var result []map[string]interface{}
|
var result []map[string]interface{}
|
||||||
if err := h.db.Query(ctx, &result, query, cid, namespace); err != nil {
|
if err := h.db.Query(ctx, &result, query, cid, namespace); err != nil {
|
||||||
|
h.logger.ComponentError(logging.ComponentGeneral, "RQLite ownership query failed",
|
||||||
|
zap.Error(err),
|
||||||
|
zap.String("cid", cid))
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
h.logger.ComponentDebug(logging.ComponentGeneral, "RQLite ownership query completed",
|
||||||
|
zap.String("cid", cid),
|
||||||
|
zap.Int("result_count", len(result)))
|
||||||
|
|
||||||
if len(result) == 0 {
|
if len(result) == 0 {
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@ -77,7 +77,9 @@ type InstanceConfig struct {
|
|||||||
HTTPPort int // HTTP API port
|
HTTPPort int // HTTP API port
|
||||||
BaseDomain string // Base domain (e.g., "orama-devnet.network")
|
BaseDomain string // Base domain (e.g., "orama-devnet.network")
|
||||||
RQLiteDSN string // RQLite connection DSN (e.g., "http://localhost:10000")
|
RQLiteDSN string // RQLite connection DSN (e.g., "http://localhost:10000")
|
||||||
|
GlobalRQLiteDSN string // Global RQLite DSN for API key validation (empty = use RQLiteDSN)
|
||||||
OlricServers []string // Olric server addresses
|
OlricServers []string // Olric server addresses
|
||||||
|
OlricTimeout time.Duration // Timeout for Olric operations
|
||||||
NodePeerID string // Physical node's peer ID for home node management
|
NodePeerID string // Physical node's peer ID for home node management
|
||||||
DataDir string // Data directory for deployments, SQLite, etc.
|
DataDir string // Data directory for deployments, SQLite, etc.
|
||||||
// IPFS configuration for storage endpoints
|
// IPFS configuration for storage endpoints
|
||||||
@ -94,6 +96,7 @@ type GatewayYAMLConfig struct {
|
|||||||
ListenAddr string `yaml:"listen_addr"`
|
ListenAddr string `yaml:"listen_addr"`
|
||||||
ClientNamespace string `yaml:"client_namespace"`
|
ClientNamespace string `yaml:"client_namespace"`
|
||||||
RQLiteDSN string `yaml:"rqlite_dsn"`
|
RQLiteDSN string `yaml:"rqlite_dsn"`
|
||||||
|
GlobalRQLiteDSN string `yaml:"global_rqlite_dsn,omitempty"`
|
||||||
BootstrapPeers []string `yaml:"bootstrap_peers,omitempty"`
|
BootstrapPeers []string `yaml:"bootstrap_peers,omitempty"`
|
||||||
EnableHTTPS bool `yaml:"enable_https,omitempty"`
|
EnableHTTPS bool `yaml:"enable_https,omitempty"`
|
||||||
DomainName string `yaml:"domain_name,omitempty"`
|
DomainName string `yaml:"domain_name,omitempty"`
|
||||||
@ -277,6 +280,7 @@ func (is *InstanceSpawner) generateConfig(configPath string, cfg InstanceConfig,
|
|||||||
ListenAddr: fmt.Sprintf(":%d", cfg.HTTPPort),
|
ListenAddr: fmt.Sprintf(":%d", cfg.HTTPPort),
|
||||||
ClientNamespace: cfg.Namespace,
|
ClientNamespace: cfg.Namespace,
|
||||||
RQLiteDSN: cfg.RQLiteDSN,
|
RQLiteDSN: cfg.RQLiteDSN,
|
||||||
|
GlobalRQLiteDSN: cfg.GlobalRQLiteDSN,
|
||||||
OlricServers: cfg.OlricServers,
|
OlricServers: cfg.OlricServers,
|
||||||
// Note: DomainName is used for HTTPS/TLS, not needed for namespace gateways in dev mode
|
// Note: DomainName is used for HTTPS/TLS, not needed for namespace gateways in dev mode
|
||||||
DomainName: cfg.BaseDomain,
|
DomainName: cfg.BaseDomain,
|
||||||
@ -285,6 +289,10 @@ func (is *InstanceSpawner) generateConfig(configPath string, cfg InstanceConfig,
|
|||||||
IPFSAPIURL: cfg.IPFSAPIURL,
|
IPFSAPIURL: cfg.IPFSAPIURL,
|
||||||
IPFSReplicationFactor: cfg.IPFSReplicationFactor,
|
IPFSReplicationFactor: cfg.IPFSReplicationFactor,
|
||||||
}
|
}
|
||||||
|
// Set Olric timeout if provided
|
||||||
|
if cfg.OlricTimeout > 0 {
|
||||||
|
gatewayCfg.OlricTimeout = cfg.OlricTimeout.String()
|
||||||
|
}
|
||||||
// Set IPFS timeout if provided
|
// Set IPFS timeout if provided
|
||||||
if cfg.IPFSTimeout > 0 {
|
if cfg.IPFSTimeout > 0 {
|
||||||
gatewayCfg.IPFSTimeout = cfg.IPFSTimeout.String()
|
gatewayCfg.IPFSTimeout = cfg.IPFSTimeout.String()
|
||||||
|
|||||||
@ -277,7 +277,13 @@ func (g *Gateway) authMiddleware(next http.Handler) http.Handler {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Look up API key in DB and derive namespace
|
// Look up API key in DB and derive namespace
|
||||||
db := g.client.Database()
|
// Use authClient for namespace gateways (validates against global RQLite)
|
||||||
|
// Otherwise use regular client for global gateways
|
||||||
|
authClient := g.client
|
||||||
|
if g.authClient != nil {
|
||||||
|
authClient = g.authClient
|
||||||
|
}
|
||||||
|
db := authClient.Database()
|
||||||
// Use internal auth for DB validation (auth not established yet)
|
// Use internal auth for DB validation (auth not established yet)
|
||||||
internalCtx := client.WithInternalAuth(r.Context())
|
internalCtx := client.WithInternalAuth(r.Context())
|
||||||
// Join to namespaces to resolve name in one query
|
// Join to namespaces to resolve name in one query
|
||||||
|
|||||||
@ -24,8 +24,9 @@ import (
|
|||||||
|
|
||||||
// ClusterManagerConfig contains configuration for the cluster manager
|
// ClusterManagerConfig contains configuration for the cluster manager
|
||||||
type ClusterManagerConfig struct {
|
type ClusterManagerConfig struct {
|
||||||
BaseDomain string // Base domain for namespace gateways (e.g., "orama-devnet.network")
|
BaseDomain string // Base domain for namespace gateways (e.g., "orama-devnet.network")
|
||||||
BaseDataDir string // Base directory for namespace data (e.g., "~/.orama/data/namespaces")
|
BaseDataDir string // Base directory for namespace data (e.g., "~/.orama/data/namespaces")
|
||||||
|
GlobalRQLiteDSN string // Global RQLite DSN for API key validation (e.g., "http://localhost:4001")
|
||||||
// IPFS configuration for namespace gateways (defaults used if not set)
|
// IPFS configuration for namespace gateways (defaults used if not set)
|
||||||
IPFSClusterAPIURL string // IPFS Cluster API URL (default: "http://localhost:9094")
|
IPFSClusterAPIURL string // IPFS Cluster API URL (default: "http://localhost:9094")
|
||||||
IPFSAPIURL string // IPFS API URL (default: "http://localhost:5001")
|
IPFSAPIURL string // IPFS API URL (default: "http://localhost:5001")
|
||||||
@ -41,9 +42,10 @@ type ClusterManager struct {
|
|||||||
rqliteSpawner *rqlite.InstanceSpawner
|
rqliteSpawner *rqlite.InstanceSpawner
|
||||||
olricSpawner *olric.InstanceSpawner
|
olricSpawner *olric.InstanceSpawner
|
||||||
gatewaySpawner *gateway.InstanceSpawner
|
gatewaySpawner *gateway.InstanceSpawner
|
||||||
logger *zap.Logger
|
logger *zap.Logger
|
||||||
baseDomain string
|
baseDomain string
|
||||||
baseDataDir string
|
baseDataDir string
|
||||||
|
globalRQLiteDSN string // Global RQLite DSN for namespace gateway auth
|
||||||
|
|
||||||
// IPFS configuration for namespace gateways
|
// IPFS configuration for namespace gateways
|
||||||
ipfsClusterAPIURL string
|
ipfsClusterAPIURL string
|
||||||
@ -99,6 +101,7 @@ func NewClusterManager(
|
|||||||
gatewaySpawner: gatewaySpawner,
|
gatewaySpawner: gatewaySpawner,
|
||||||
baseDomain: cfg.BaseDomain,
|
baseDomain: cfg.BaseDomain,
|
||||||
baseDataDir: cfg.BaseDataDir,
|
baseDataDir: cfg.BaseDataDir,
|
||||||
|
globalRQLiteDSN: cfg.GlobalRQLiteDSN,
|
||||||
ipfsClusterAPIURL: ipfsClusterAPIURL,
|
ipfsClusterAPIURL: ipfsClusterAPIURL,
|
||||||
ipfsAPIURL: ipfsAPIURL,
|
ipfsAPIURL: ipfsAPIURL,
|
||||||
ipfsTimeout: ipfsTimeout,
|
ipfsTimeout: ipfsTimeout,
|
||||||
@ -146,6 +149,7 @@ func NewClusterManagerWithComponents(
|
|||||||
gatewaySpawner: gatewaySpawner,
|
gatewaySpawner: gatewaySpawner,
|
||||||
baseDomain: cfg.BaseDomain,
|
baseDomain: cfg.BaseDomain,
|
||||||
baseDataDir: cfg.BaseDataDir,
|
baseDataDir: cfg.BaseDataDir,
|
||||||
|
globalRQLiteDSN: cfg.GlobalRQLiteDSN,
|
||||||
ipfsClusterAPIURL: ipfsClusterAPIURL,
|
ipfsClusterAPIURL: ipfsClusterAPIURL,
|
||||||
ipfsAPIURL: ipfsAPIURL,
|
ipfsAPIURL: ipfsAPIURL,
|
||||||
ipfsTimeout: ipfsTimeout,
|
ipfsTimeout: ipfsTimeout,
|
||||||
@ -479,7 +483,9 @@ func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *Name
|
|||||||
HTTPPort: portBlocks[i].GatewayHTTPPort,
|
HTTPPort: portBlocks[i].GatewayHTTPPort,
|
||||||
BaseDomain: cm.baseDomain,
|
BaseDomain: cm.baseDomain,
|
||||||
RQLiteDSN: rqliteDSN,
|
RQLiteDSN: rqliteDSN,
|
||||||
|
GlobalRQLiteDSN: cm.globalRQLiteDSN,
|
||||||
OlricServers: olricServers,
|
OlricServers: olricServers,
|
||||||
|
OlricTimeout: 30 * time.Second,
|
||||||
IPFSClusterAPIURL: cm.ipfsClusterAPIURL,
|
IPFSClusterAPIURL: cm.ipfsClusterAPIURL,
|
||||||
IPFSAPIURL: cm.ipfsAPIURL,
|
IPFSAPIURL: cm.ipfsAPIURL,
|
||||||
IPFSTimeout: cm.ipfsTimeout,
|
IPFSTimeout: cm.ipfsTimeout,
|
||||||
@ -683,35 +689,73 @@ func (cm *ClusterManager) sendStopRequest(ctx context.Context, nodeIP, action, n
|
|||||||
}
|
}
|
||||||
|
|
||||||
// createDNSRecords creates DNS records for the namespace gateway.
|
// createDNSRecords creates DNS records for the namespace gateway.
|
||||||
// All namespace nodes get DNS A records since all nodes now run Caddy
|
// Creates A records for ALL nameservers (not just cluster nodes) so that any nameserver
|
||||||
// and can serve TLS for ns-{namespace}.{baseDomain} subdomains.
|
// can receive requests and proxy them to the namespace cluster via internal routing.
|
||||||
func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) error {
|
func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) error {
|
||||||
fqdn := fmt.Sprintf("ns-%s.%s.", cluster.NamespaceName, cm.baseDomain)
|
fqdn := fmt.Sprintf("ns-%s.%s.", cluster.NamespaceName, cm.baseDomain)
|
||||||
|
|
||||||
|
// Query for ALL nameserver IPs (not just the selected cluster nodes)
|
||||||
|
// This ensures DNS round-robins across all nameservers, even those not hosting the cluster
|
||||||
|
type nameserverIP struct {
|
||||||
|
IPAddress string `db:"ip_address"`
|
||||||
|
}
|
||||||
|
var nameservers []nameserverIP
|
||||||
|
nameserverQuery := `
|
||||||
|
SELECT DISTINCT ip_address
|
||||||
|
FROM dns_nameservers
|
||||||
|
WHERE domain = ?
|
||||||
|
ORDER BY hostname
|
||||||
|
`
|
||||||
|
err := cm.db.Query(ctx, &nameservers, nameserverQuery, cm.baseDomain)
|
||||||
|
if err != nil {
|
||||||
|
cm.logger.Error("Failed to query nameservers for DNS records",
|
||||||
|
zap.String("domain", cm.baseDomain),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
return fmt.Errorf("failed to query nameservers: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var nameserverIPs []string
|
||||||
|
for _, ns := range nameservers {
|
||||||
|
if ns.IPAddress != "" {
|
||||||
|
nameserverIPs = append(nameserverIPs, ns.IPAddress)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fallback: if no nameservers found in dns_nameservers table, use cluster node IPs
|
||||||
|
// This maintains backwards compatibility with clusters created before nameserver tracking
|
||||||
|
if len(nameserverIPs) == 0 {
|
||||||
|
cm.logger.Warn("No nameservers found in dns_nameservers table, falling back to cluster node IPs",
|
||||||
|
zap.String("domain", cm.baseDomain),
|
||||||
|
)
|
||||||
|
for _, node := range nodes {
|
||||||
|
nameserverIPs = append(nameserverIPs, node.IPAddress)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
recordCount := 0
|
recordCount := 0
|
||||||
for i, node := range nodes {
|
for _, ip := range nameserverIPs {
|
||||||
query := `
|
query := `
|
||||||
INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by)
|
INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by)
|
||||||
VALUES (?, 'A', ?, 300, ?, 'system')
|
VALUES (?, 'A', ?, 300, ?, 'system')
|
||||||
`
|
`
|
||||||
_, err := cm.db.Exec(ctx, query, fqdn, node.IPAddress, cluster.NamespaceName)
|
_, err := cm.db.Exec(ctx, query, fqdn, ip, cluster.NamespaceName)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cm.logger.Warn("Failed to create DNS record",
|
cm.logger.Warn("Failed to create DNS record",
|
||||||
zap.String("fqdn", fqdn),
|
zap.String("fqdn", fqdn),
|
||||||
zap.String("ip", node.IPAddress),
|
zap.String("ip", ip),
|
||||||
zap.Error(err),
|
zap.Error(err),
|
||||||
)
|
)
|
||||||
} else {
|
} else {
|
||||||
cm.logger.Info("Created DNS A record",
|
cm.logger.Info("Created DNS A record for nameserver",
|
||||||
zap.String("fqdn", fqdn),
|
zap.String("fqdn", fqdn),
|
||||||
zap.String("ip", node.IPAddress),
|
zap.String("ip", ip),
|
||||||
zap.Int("gateway_port", portBlocks[i].GatewayHTTPPort),
|
|
||||||
)
|
)
|
||||||
recordCount++
|
recordCount++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cm.logEvent(ctx, cluster.ID, EventDNSCreated, "", fmt.Sprintf("DNS records created for %s (%d records)", fqdn, recordCount), nil)
|
cm.logEvent(ctx, cluster.ID, EventDNSCreated, "", fmt.Sprintf("DNS records created for %s (%d nameserver records)", fqdn, recordCount), nil)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1393,7 +1437,9 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n
|
|||||||
HTTPPort: pb.GatewayHTTPPort,
|
HTTPPort: pb.GatewayHTTPPort,
|
||||||
BaseDomain: cm.baseDomain,
|
BaseDomain: cm.baseDomain,
|
||||||
RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
|
RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
|
||||||
|
GlobalRQLiteDSN: cm.globalRQLiteDSN,
|
||||||
OlricServers: olricServers,
|
OlricServers: olricServers,
|
||||||
|
OlricTimeout: 30 * time.Second,
|
||||||
IPFSClusterAPIURL: cm.ipfsClusterAPIURL,
|
IPFSClusterAPIURL: cm.ipfsClusterAPIURL,
|
||||||
IPFSAPIURL: cm.ipfsAPIURL,
|
IPFSAPIURL: cm.ipfsAPIURL,
|
||||||
IPFSTimeout: cm.ipfsTimeout,
|
IPFSTimeout: cm.ipfsTimeout,
|
||||||
@ -1648,7 +1694,9 @@ func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *Cl
|
|||||||
HTTPPort: pb.GatewayHTTPPort,
|
HTTPPort: pb.GatewayHTTPPort,
|
||||||
BaseDomain: state.BaseDomain,
|
BaseDomain: state.BaseDomain,
|
||||||
RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
|
RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
|
||||||
|
GlobalRQLiteDSN: cm.globalRQLiteDSN,
|
||||||
OlricServers: olricServers,
|
OlricServers: olricServers,
|
||||||
|
OlricTimeout: 30 * time.Second,
|
||||||
IPFSClusterAPIURL: cm.ipfsClusterAPIURL,
|
IPFSClusterAPIURL: cm.ipfsClusterAPIURL,
|
||||||
IPFSAPIURL: cm.ipfsAPIURL,
|
IPFSAPIURL: cm.ipfsAPIURL,
|
||||||
IPFSTimeout: cm.ipfsTimeout,
|
IPFSTimeout: cm.ipfsTimeout,
|
||||||
|
|||||||
@ -72,8 +72,9 @@ func (n *Node) startHTTPGateway(ctx context.Context) error {
|
|||||||
if ormClient := apiGateway.GetORMClient(); ormClient != nil {
|
if ormClient := apiGateway.GetORMClient(); ormClient != nil {
|
||||||
baseDataDir := filepath.Join(os.ExpandEnv(n.config.Node.DataDir), "..", "data", "namespaces")
|
baseDataDir := filepath.Join(os.ExpandEnv(n.config.Node.DataDir), "..", "data", "namespaces")
|
||||||
clusterCfg := namespace.ClusterManagerConfig{
|
clusterCfg := namespace.ClusterManagerConfig{
|
||||||
BaseDomain: n.config.HTTPGateway.BaseDomain,
|
BaseDomain: n.config.HTTPGateway.BaseDomain,
|
||||||
BaseDataDir: baseDataDir,
|
BaseDataDir: baseDataDir,
|
||||||
|
GlobalRQLiteDSN: gwCfg.RQLiteDSN, // Pass global RQLite DSN for namespace gateway auth
|
||||||
}
|
}
|
||||||
clusterManager := namespace.NewClusterManager(ormClient, clusterCfg, n.logger.Logger)
|
clusterManager := namespace.NewClusterManager(ormClient, clusterCfg, n.logger.Logger)
|
||||||
clusterManager.SetLocalNodeID(gwCfg.NodePeerID)
|
clusterManager.SetLocalNodeID(gwCfg.NodePeerID)
|
||||||
|
|||||||
@ -86,8 +86,9 @@ type InstanceConfig struct {
|
|||||||
|
|
||||||
// OlricConfig represents the Olric YAML configuration structure
|
// OlricConfig represents the Olric YAML configuration structure
|
||||||
type OlricConfig struct {
|
type OlricConfig struct {
|
||||||
Server OlricServerConfig `yaml:"server"`
|
Server OlricServerConfig `yaml:"server"`
|
||||||
Memberlist OlricMemberlistConfig `yaml:"memberlist"`
|
Memberlist OlricMemberlistConfig `yaml:"memberlist"`
|
||||||
|
PartitionCount uint64 `yaml:"partitionCount"` // Number of partitions (default: 256, we use 12 for namespace isolation)
|
||||||
}
|
}
|
||||||
|
|
||||||
// OlricServerConfig represents the server section of Olric config
|
// OlricServerConfig represents the server section of Olric config
|
||||||
@ -269,6 +270,10 @@ func (is *InstanceSpawner) generateConfig(configPath string, cfg InstanceConfig)
|
|||||||
BindPort: cfg.MemberlistPort,
|
BindPort: cfg.MemberlistPort,
|
||||||
Peers: cfg.PeerAddresses,
|
Peers: cfg.PeerAddresses,
|
||||||
},
|
},
|
||||||
|
// Use 12 partitions for namespace Olric instances (vs 256 default)
|
||||||
|
// This gives perfect distribution for 2-6 nodes and 20x faster scans
|
||||||
|
// 12 partitions × 2 (primary+replica) = 24 network calls (~0.6s vs 12s)
|
||||||
|
PartitionCount: 12,
|
||||||
}
|
}
|
||||||
|
|
||||||
data, err := yaml.Marshal(olricCfg)
|
data, err := yaml.Marshal(olricCfg)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user