mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 06:43:01 +00:00
Bug fixing
This commit is contained in:
parent
79a489d650
commit
859c30fcd9
@ -534,24 +534,25 @@ func (g *Gateway) domainRoutingMiddleware(next http.Handler) http.Handler {
|
||||
return
|
||||
}
|
||||
|
||||
// Skip API paths (they should use JWT/API key auth)
|
||||
if strings.HasPrefix(r.URL.Path, "/v1/") || strings.HasPrefix(r.URL.Path, "/.well-known/") {
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// Check for namespace gateway domain: ns-{namespace}.{baseDomain}
|
||||
// Check for namespace gateway domain FIRST (before API path skip)
|
||||
// Namespace subdomains (ns-{name}.{baseDomain}) must be proxied to namespace gateways
|
||||
// regardless of path — including /v1/ paths
|
||||
suffix := "." + baseDomain
|
||||
if strings.HasSuffix(host, suffix) {
|
||||
subdomain := strings.TrimSuffix(host, suffix)
|
||||
if strings.HasPrefix(subdomain, "ns-") {
|
||||
// This is a namespace gateway request
|
||||
namespaceName := strings.TrimPrefix(subdomain, "ns-")
|
||||
g.handleNamespaceGatewayRequest(w, r, namespaceName)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Skip API paths (they should use JWT/API key auth on the main gateway)
|
||||
if strings.HasPrefix(r.URL.Path, "/v1/") || strings.HasPrefix(r.URL.Path, "/.well-known/") {
|
||||
next.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// Check if deployment handlers are available
|
||||
if g.deploymentService == nil || g.staticHandler == nil {
|
||||
next.ServeHTTP(w, r)
|
||||
|
||||
@ -6,7 +6,9 @@ import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
@ -575,7 +577,7 @@ func (cm *ClusterManager) sendStopRequest(ctx context.Context, nodeIP, action, n
|
||||
// createDNSRecords creates DNS records for the namespace gateway
|
||||
func (cm *ClusterManager) createDNSRecords(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock) error {
|
||||
// Create A records for ns-{namespace}.{baseDomain} pointing to all 3 nodes
|
||||
fqdn := fmt.Sprintf("ns-%s.%s", cluster.NamespaceName, cm.baseDomain)
|
||||
fqdn := fmt.Sprintf("ns-%s.%s.", cluster.NamespaceName, cm.baseDomain)
|
||||
|
||||
for i, node := range nodes {
|
||||
query := `
|
||||
@ -1023,6 +1025,283 @@ func (cm *ClusterManager) provisionClusterAsync(cluster *NamespaceCluster, names
|
||||
)
|
||||
}
|
||||
|
||||
// RestoreLocalClusters restores namespace cluster processes that should be running on this node.
|
||||
// Called on node startup to re-spawn RQLite, Olric, and Gateway processes for clusters
|
||||
// that were previously provisioned and assigned to this node.
|
||||
func (cm *ClusterManager) RestoreLocalClusters(ctx context.Context) error {
|
||||
if cm.localNodeID == "" {
|
||||
return fmt.Errorf("local node ID not set")
|
||||
}
|
||||
|
||||
cm.logger.Info("Checking for namespace clusters to restore", zap.String("local_node_id", cm.localNodeID))
|
||||
|
||||
// Find all ready clusters that have this node assigned
|
||||
type clusterNodeInfo struct {
|
||||
ClusterID string `db:"namespace_cluster_id"`
|
||||
NamespaceName string `db:"namespace_name"`
|
||||
NodeID string `db:"node_id"`
|
||||
Role string `db:"role"`
|
||||
}
|
||||
var assignments []clusterNodeInfo
|
||||
query := `
|
||||
SELECT DISTINCT cn.namespace_cluster_id, c.namespace_name, cn.node_id, cn.role
|
||||
FROM namespace_cluster_nodes cn
|
||||
JOIN namespace_clusters c ON cn.namespace_cluster_id = c.id
|
||||
WHERE cn.node_id = ? AND c.status = 'ready'
|
||||
`
|
||||
if err := cm.db.Query(ctx, &assignments, query, cm.localNodeID); err != nil {
|
||||
return fmt.Errorf("failed to query local cluster assignments: %w", err)
|
||||
}
|
||||
|
||||
if len(assignments) == 0 {
|
||||
cm.logger.Info("No namespace clusters to restore on this node")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Group by cluster
|
||||
clusterNamespaces := make(map[string]string) // clusterID -> namespaceName
|
||||
for _, a := range assignments {
|
||||
clusterNamespaces[a.ClusterID] = a.NamespaceName
|
||||
}
|
||||
|
||||
cm.logger.Info("Found namespace clusters to restore",
|
||||
zap.Int("count", len(clusterNamespaces)),
|
||||
zap.String("local_node_id", cm.localNodeID),
|
||||
)
|
||||
|
||||
// Get local node's WireGuard IP
|
||||
type nodeIPInfo struct {
|
||||
InternalIP string `db:"internal_ip"`
|
||||
}
|
||||
var localNodeInfo []nodeIPInfo
|
||||
ipQuery := `SELECT COALESCE(internal_ip, ip_address) as internal_ip FROM dns_nodes WHERE id = ? LIMIT 1`
|
||||
if err := cm.db.Query(ctx, &localNodeInfo, ipQuery, cm.localNodeID); err != nil || len(localNodeInfo) == 0 {
|
||||
cm.logger.Warn("Could not determine local node IP, skipping restore", zap.Error(err))
|
||||
return fmt.Errorf("failed to get local node IP: %w", err)
|
||||
}
|
||||
localIP := localNodeInfo[0].InternalIP
|
||||
|
||||
for clusterID, namespaceName := range clusterNamespaces {
|
||||
if err := cm.restoreClusterOnNode(ctx, clusterID, namespaceName, localIP); err != nil {
|
||||
cm.logger.Error("Failed to restore namespace cluster",
|
||||
zap.String("namespace", namespaceName),
|
||||
zap.String("cluster_id", clusterID),
|
||||
zap.Error(err),
|
||||
)
|
||||
// Continue restoring other clusters
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// restoreClusterOnNode restores all processes for a single cluster on this node
|
||||
func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, namespaceName, localIP string) error {
|
||||
cm.logger.Info("Restoring namespace cluster processes",
|
||||
zap.String("namespace", namespaceName),
|
||||
zap.String("cluster_id", clusterID),
|
||||
)
|
||||
|
||||
// Get port allocation for this node
|
||||
var portBlocks []PortBlock
|
||||
portQuery := `SELECT * FROM namespace_port_allocations WHERE namespace_cluster_id = ? AND node_id = ?`
|
||||
if err := cm.db.Query(ctx, &portBlocks, portQuery, clusterID, cm.localNodeID); err != nil || len(portBlocks) == 0 {
|
||||
return fmt.Errorf("no port allocation found for cluster %s on node %s", clusterID, cm.localNodeID)
|
||||
}
|
||||
pb := &portBlocks[0]
|
||||
|
||||
// Get all nodes in this cluster (for join addresses and peer addresses)
|
||||
allNodes, err := cm.getClusterNodes(ctx, clusterID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get cluster nodes: %w", err)
|
||||
}
|
||||
|
||||
// Get all nodes' IPs and port allocations
|
||||
type nodePortInfo struct {
|
||||
NodeID string `db:"node_id"`
|
||||
InternalIP string `db:"internal_ip"`
|
||||
RQLiteHTTPPort int `db:"rqlite_http_port"`
|
||||
RQLiteRaftPort int `db:"rqlite_raft_port"`
|
||||
OlricHTTPPort int `db:"olric_http_port"`
|
||||
OlricMemberlistPort int `db:"olric_memberlist_port"`
|
||||
}
|
||||
var allNodePorts []nodePortInfo
|
||||
allPortsQuery := `
|
||||
SELECT pa.node_id, COALESCE(dn.internal_ip, dn.ip_address) as internal_ip,
|
||||
pa.rqlite_http_port, pa.rqlite_raft_port, pa.olric_http_port, pa.olric_memberlist_port
|
||||
FROM namespace_port_allocations pa
|
||||
JOIN dns_nodes dn ON pa.node_id = dn.id
|
||||
WHERE pa.namespace_cluster_id = ?
|
||||
`
|
||||
if err := cm.db.Query(ctx, &allNodePorts, allPortsQuery, clusterID); err != nil {
|
||||
return fmt.Errorf("failed to get all node ports: %w", err)
|
||||
}
|
||||
|
||||
// 1. Restore RQLite
|
||||
if !cm.rqliteSpawner.IsInstanceRunning(pb.RQLiteHTTPPort) {
|
||||
hasExistingData := cm.rqliteSpawner.HasExistingData(namespaceName, cm.localNodeID)
|
||||
|
||||
if hasExistingData {
|
||||
// Write peers.json for Raft cluster recovery (official RQLite mechanism).
|
||||
// When all nodes restart simultaneously, Raft can't form quorum from stale state.
|
||||
// peers.json tells rqlited the correct voter list so it can hold a fresh election.
|
||||
var peers []rqlite.RaftPeer
|
||||
for _, np := range allNodePorts {
|
||||
raftAddr := fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort)
|
||||
peers = append(peers, rqlite.RaftPeer{
|
||||
ID: raftAddr,
|
||||
Address: raftAddr,
|
||||
NonVoter: false,
|
||||
})
|
||||
}
|
||||
dataDir := cm.rqliteSpawner.GetDataDir(namespaceName, cm.localNodeID)
|
||||
if err := cm.rqliteSpawner.WritePeersJSON(dataDir, peers); err != nil {
|
||||
cm.logger.Error("Failed to write peers.json", zap.String("namespace", namespaceName), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// Build join addresses for first-time joins (no existing data)
|
||||
var joinAddrs []string
|
||||
isLeader := false
|
||||
if !hasExistingData {
|
||||
// Deterministic leader selection: sort all node IDs and pick the first one.
|
||||
// Every node independently computes the same result — no coordination needed.
|
||||
// The elected leader bootstraps the cluster; followers use -join with retries
|
||||
// to wait for the leader to become ready (up to 5 minutes).
|
||||
sortedNodeIDs := make([]string, 0, len(allNodePorts))
|
||||
for _, np := range allNodePorts {
|
||||
sortedNodeIDs = append(sortedNodeIDs, np.NodeID)
|
||||
}
|
||||
sort.Strings(sortedNodeIDs)
|
||||
electedLeaderID := sortedNodeIDs[0]
|
||||
|
||||
if cm.localNodeID == electedLeaderID {
|
||||
isLeader = true
|
||||
cm.logger.Info("Deterministic leader election: this node is the leader",
|
||||
zap.String("namespace", namespaceName),
|
||||
zap.String("node_id", cm.localNodeID))
|
||||
} else {
|
||||
// Follower: join the elected leader's raft address
|
||||
for _, np := range allNodePorts {
|
||||
if np.NodeID == electedLeaderID {
|
||||
joinAddrs = append(joinAddrs, fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort))
|
||||
break
|
||||
}
|
||||
}
|
||||
cm.logger.Info("Deterministic leader election: this node is a follower",
|
||||
zap.String("namespace", namespaceName),
|
||||
zap.String("node_id", cm.localNodeID),
|
||||
zap.String("leader_id", electedLeaderID),
|
||||
zap.Strings("join_addrs", joinAddrs))
|
||||
}
|
||||
}
|
||||
|
||||
rqliteCfg := rqlite.InstanceConfig{
|
||||
Namespace: namespaceName,
|
||||
NodeID: cm.localNodeID,
|
||||
HTTPPort: pb.RQLiteHTTPPort,
|
||||
RaftPort: pb.RQLiteRaftPort,
|
||||
HTTPAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteHTTPPort),
|
||||
RaftAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteRaftPort),
|
||||
JoinAddresses: joinAddrs,
|
||||
IsLeader: isLeader,
|
||||
}
|
||||
|
||||
if _, err := cm.rqliteSpawner.SpawnInstance(ctx, rqliteCfg); err != nil {
|
||||
cm.logger.Error("Failed to restore RQLite", zap.String("namespace", namespaceName), zap.Error(err))
|
||||
} else {
|
||||
cm.logger.Info("Restored RQLite instance", zap.String("namespace", namespaceName), zap.Int("port", pb.RQLiteHTTPPort))
|
||||
}
|
||||
} else {
|
||||
cm.logger.Info("RQLite already running", zap.String("namespace", namespaceName), zap.Int("port", pb.RQLiteHTTPPort))
|
||||
}
|
||||
|
||||
// 2. Restore Olric
|
||||
olricRunning := false
|
||||
conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", pb.OlricMemberlistPort), 2*time.Second)
|
||||
if err == nil {
|
||||
conn.Close()
|
||||
olricRunning = true
|
||||
}
|
||||
|
||||
if !olricRunning {
|
||||
var peers []string
|
||||
for _, np := range allNodePorts {
|
||||
if np.NodeID != cm.localNodeID {
|
||||
peers = append(peers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricMemberlistPort))
|
||||
}
|
||||
}
|
||||
|
||||
olricCfg := olric.InstanceConfig{
|
||||
Namespace: namespaceName,
|
||||
NodeID: cm.localNodeID,
|
||||
HTTPPort: pb.OlricHTTPPort,
|
||||
MemberlistPort: pb.OlricMemberlistPort,
|
||||
BindAddr: localIP,
|
||||
AdvertiseAddr: localIP,
|
||||
PeerAddresses: peers,
|
||||
}
|
||||
|
||||
if _, err := cm.olricSpawner.SpawnInstance(ctx, olricCfg); err != nil {
|
||||
cm.logger.Error("Failed to restore Olric", zap.String("namespace", namespaceName), zap.Error(err))
|
||||
} else {
|
||||
cm.logger.Info("Restored Olric instance", zap.String("namespace", namespaceName), zap.Int("port", pb.OlricHTTPPort))
|
||||
}
|
||||
} else {
|
||||
cm.logger.Info("Olric already running", zap.String("namespace", namespaceName), zap.Int("port", pb.OlricMemberlistPort))
|
||||
}
|
||||
|
||||
// 3. Restore Gateway
|
||||
// Check if any cluster node has the gateway role (gateway may have been skipped during provisioning)
|
||||
hasGateway := false
|
||||
for _, node := range allNodes {
|
||||
if node.Role == NodeRoleGateway {
|
||||
hasGateway = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if hasGateway {
|
||||
gwRunning := false
|
||||
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/health", pb.GatewayHTTPPort))
|
||||
if err == nil {
|
||||
resp.Body.Close()
|
||||
gwRunning = true
|
||||
}
|
||||
|
||||
if !gwRunning {
|
||||
// Build olric server addresses
|
||||
var olricServers []string
|
||||
for _, np := range allNodePorts {
|
||||
if np.NodeID == cm.localNodeID {
|
||||
olricServers = append(olricServers, fmt.Sprintf("localhost:%d", np.OlricHTTPPort))
|
||||
} else {
|
||||
olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort))
|
||||
}
|
||||
}
|
||||
|
||||
gwCfg := gateway.InstanceConfig{
|
||||
Namespace: namespaceName,
|
||||
NodeID: cm.localNodeID,
|
||||
HTTPPort: pb.GatewayHTTPPort,
|
||||
BaseDomain: cm.baseDomain,
|
||||
RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
|
||||
OlricServers: olricServers,
|
||||
}
|
||||
|
||||
if _, err := cm.gatewaySpawner.SpawnInstance(ctx, gwCfg); err != nil {
|
||||
cm.logger.Error("Failed to restore Gateway", zap.String("namespace", namespaceName), zap.Error(err))
|
||||
} else {
|
||||
cm.logger.Info("Restored Gateway instance", zap.String("namespace", namespaceName), zap.Int("port", pb.GatewayHTTPPort))
|
||||
}
|
||||
} else {
|
||||
cm.logger.Info("Gateway already running", zap.String("namespace", namespaceName), zap.Int("port", pb.GatewayHTTPPort))
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetClusterStatusByID returns the full status of a cluster by ID.
|
||||
// This method is part of the ClusterProvisioner interface used by the gateway.
|
||||
// It returns a generic struct that matches the interface definition in auth/handlers.go.
|
||||
|
||||
@ -6,6 +6,7 @@ import (
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/gateway"
|
||||
namespacehandlers "github.com/DeBrosOfficial/network/pkg/gateway/handlers/namespace"
|
||||
@ -91,6 +92,15 @@ func (n *Node) startHTTPGateway(ctx context.Context) error {
|
||||
n.logger.ComponentInfo(logging.ComponentNode, "Namespace cluster provisioning enabled",
|
||||
zap.String("base_domain", clusterCfg.BaseDomain),
|
||||
zap.String("base_data_dir", baseDataDir))
|
||||
|
||||
// Restore previously-running namespace cluster processes in background
|
||||
go func() {
|
||||
// Wait for main RQLite to be ready before querying cluster assignments
|
||||
time.Sleep(10 * time.Second)
|
||||
if err := clusterManager.RestoreLocalClusters(ctx); err != nil {
|
||||
n.logger.ComponentError(logging.ComponentNode, "Failed to restore namespace clusters", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
go func() {
|
||||
|
||||
@ -2,6 +2,7 @@ package rqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"os"
|
||||
@ -12,6 +13,13 @@ import (
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// RaftPeer represents a peer entry in RQLite's peers.json recovery file
|
||||
type RaftPeer struct {
|
||||
ID string `json:"id"`
|
||||
Address string `json:"address"`
|
||||
NonVoter bool `json:"non_voter"`
|
||||
}
|
||||
|
||||
// InstanceConfig contains configuration for spawning a RQLite instance
|
||||
type InstanceConfig struct {
|
||||
Namespace string // Namespace this instance belongs to
|
||||
@ -80,6 +88,9 @@ func (is *InstanceSpawner) SpawnInstance(ctx context.Context, cfg InstanceConfig
|
||||
for _, addr := range cfg.JoinAddresses {
|
||||
args = append(args, "-join", addr)
|
||||
}
|
||||
// Retry joining for up to 5 minutes (default is 5 attempts / 3s = 15s which is too short
|
||||
// when all namespace nodes restart simultaneously and the leader isn't ready yet)
|
||||
args = append(args, "-join-attempts", "30", "-join-interval", "10s")
|
||||
}
|
||||
|
||||
// Data directory must be the last argument
|
||||
@ -139,7 +150,9 @@ func (is *InstanceSpawner) waitForReady(ctx context.Context, httpPort int) error
|
||||
url := fmt.Sprintf("http://localhost:%d/status", httpPort)
|
||||
client := &http.Client{Timeout: 2 * time.Second}
|
||||
|
||||
deadline := time.Now().Add(30 * time.Second)
|
||||
// 6 minutes: must exceed the join retry window (30 attempts * 10s = 5min)
|
||||
// so we don't kill followers that are still waiting for the leader
|
||||
deadline := time.Now().Add(6 * time.Minute)
|
||||
for time.Now().Before(deadline) {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@ -237,6 +250,42 @@ func (is *InstanceSpawner) IsInstanceRunning(httpPort int) bool {
|
||||
return resp.StatusCode == http.StatusOK
|
||||
}
|
||||
|
||||
// HasExistingData checks if a RQLite instance has existing data (raft.db indicates prior startup)
|
||||
func (is *InstanceSpawner) HasExistingData(namespace, nodeID string) bool {
|
||||
dataDir := is.GetDataDir(namespace, nodeID)
|
||||
if _, err := os.Stat(filepath.Join(dataDir, "raft.db")); err == nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// WritePeersJSON writes a peers.json recovery file into the Raft directory.
|
||||
// This is RQLite's official mechanism for recovering a cluster when all nodes are down.
|
||||
// On startup, rqlited reads this file, overwrites the Raft peer configuration,
|
||||
// and renames it to peers.info after recovery.
|
||||
func (is *InstanceSpawner) WritePeersJSON(dataDir string, peers []RaftPeer) error {
|
||||
raftDir := filepath.Join(dataDir, "raft")
|
||||
if err := os.MkdirAll(raftDir, 0755); err != nil {
|
||||
return fmt.Errorf("failed to create raft directory: %w", err)
|
||||
}
|
||||
|
||||
data, err := json.MarshalIndent(peers, "", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal peers.json: %w", err)
|
||||
}
|
||||
|
||||
peersPath := filepath.Join(raftDir, "peers.json")
|
||||
if err := os.WriteFile(peersPath, data, 0644); err != nil {
|
||||
return fmt.Errorf("failed to write peers.json: %w", err)
|
||||
}
|
||||
|
||||
is.logger.Info("Wrote peers.json for cluster recovery",
|
||||
zap.String("path", peersPath),
|
||||
zap.Int("peer_count", len(peers)),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetDataDir returns the data directory path for a namespace RQLite instance
|
||||
func (is *InstanceSpawner) GetDataDir(namespace, nodeID string) string {
|
||||
return filepath.Join(is.baseDataDir, namespace, "rqlite", nodeID)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user