mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 08:36:57 +00:00
Bug fixing
This commit is contained in:
parent
859c30fcd9
commit
11d5c1b19a
@ -589,24 +589,23 @@ func (g *Gateway) domainRoutingMiddleware(next http.Handler) http.Handler {
|
|||||||
// handleNamespaceGatewayRequest proxies requests to a namespace's dedicated gateway cluster
|
// handleNamespaceGatewayRequest proxies requests to a namespace's dedicated gateway cluster
|
||||||
// This enables physical isolation where each namespace has its own RQLite, Olric, and Gateway
|
// This enables physical isolation where each namespace has its own RQLite, Olric, and Gateway
|
||||||
func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.Request, namespaceName string) {
|
func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.Request, namespaceName string) {
|
||||||
// Look up namespace cluster gateway IPs from DNS records
|
// Look up namespace cluster gateway using internal (WireGuard) IPs for inter-node proxying
|
||||||
db := g.client.Database()
|
db := g.client.Database()
|
||||||
internalCtx := client.WithInternalAuth(r.Context())
|
internalCtx := client.WithInternalAuth(r.Context())
|
||||||
|
|
||||||
baseDomain := "dbrs.space"
|
// Single query: get internal IP + gateway port from cluster tables
|
||||||
if g.cfg != nil && g.cfg.BaseDomain != "" {
|
query := `
|
||||||
baseDomain = g.cfg.BaseDomain
|
SELECT COALESCE(dn.internal_ip, dn.ip_address), npa.gateway_http_port
|
||||||
}
|
FROM namespace_port_allocations npa
|
||||||
|
JOIN namespace_clusters nc ON npa.namespace_cluster_id = nc.id
|
||||||
// Query DNS records for the namespace gateway
|
JOIN dns_nodes dn ON npa.node_id = dn.id
|
||||||
fqdn := "ns-" + namespaceName + "." + baseDomain + "."
|
WHERE nc.namespace_name = ? AND nc.status = 'ready'
|
||||||
query := `SELECT value FROM dns_records WHERE fqdn = ? AND record_type = 'A' AND is_active = TRUE ORDER BY RANDOM() LIMIT 1`
|
ORDER BY RANDOM() LIMIT 1
|
||||||
result, err := db.Query(internalCtx, query, fqdn)
|
`
|
||||||
|
result, err := db.Query(internalCtx, query, namespaceName)
|
||||||
if err != nil || result == nil || len(result.Rows) == 0 {
|
if err != nil || result == nil || len(result.Rows) == 0 {
|
||||||
// No gateway found for this namespace
|
|
||||||
g.logger.ComponentWarn(logging.ComponentGeneral, "namespace gateway not found",
|
g.logger.ComponentWarn(logging.ComponentGeneral, "namespace gateway not found",
|
||||||
zap.String("namespace", namespaceName),
|
zap.String("namespace", namespaceName),
|
||||||
zap.String("fqdn", fqdn),
|
|
||||||
)
|
)
|
||||||
http.Error(w, "Namespace gateway not found", http.StatusNotFound)
|
http.Error(w, "Namespace gateway not found", http.StatusNotFound)
|
||||||
return
|
return
|
||||||
@ -617,22 +616,9 @@ func (g *Gateway) handleNamespaceGatewayRequest(w http.ResponseWriter, r *http.R
|
|||||||
http.Error(w, "Namespace gateway not available", http.StatusServiceUnavailable)
|
http.Error(w, "Namespace gateway not available", http.StatusServiceUnavailable)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
gatewayPort := 10004
|
||||||
// Get the gateway port from namespace_port_allocations
|
if p := getInt(result.Rows[0][1]); p > 0 {
|
||||||
// Gateway HTTP port is port_start + 4
|
gatewayPort = p
|
||||||
portQuery := `
|
|
||||||
SELECT npa.gateway_http_port
|
|
||||||
FROM namespace_port_allocations npa
|
|
||||||
JOIN namespace_clusters nc ON npa.namespace_cluster_id = nc.id
|
|
||||||
WHERE nc.namespace_name = ?
|
|
||||||
LIMIT 1
|
|
||||||
`
|
|
||||||
portResult, err := db.Query(internalCtx, portQuery, namespaceName)
|
|
||||||
gatewayPort := 10004 // Default to first namespace's gateway port
|
|
||||||
if err == nil && portResult != nil && len(portResult.Rows) > 0 {
|
|
||||||
if p := getInt(portResult.Rows[0][0]); p > 0 {
|
|
||||||
gatewayPort = p
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Proxy request to the namespace gateway
|
// Proxy request to the namespace gateway
|
||||||
|
|||||||
@ -8,6 +8,8 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
"net"
|
"net"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
"sort"
|
"sort"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
@ -1299,6 +1301,257 @@ func (cm *ClusterManager) restoreClusterOnNode(ctx context.Context, clusterID, n
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Save local state to disk for future restarts without DB dependency
|
||||||
|
var stateNodes []ClusterLocalStateNode
|
||||||
|
for _, np := range allNodePorts {
|
||||||
|
stateNodes = append(stateNodes, ClusterLocalStateNode{
|
||||||
|
NodeID: np.NodeID,
|
||||||
|
InternalIP: np.InternalIP,
|
||||||
|
RQLiteHTTPPort: np.RQLiteHTTPPort,
|
||||||
|
RQLiteRaftPort: np.RQLiteRaftPort,
|
||||||
|
OlricHTTPPort: np.OlricHTTPPort,
|
||||||
|
OlricMemberlistPort: np.OlricMemberlistPort,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
localState := &ClusterLocalState{
|
||||||
|
ClusterID: clusterID,
|
||||||
|
NamespaceName: namespaceName,
|
||||||
|
LocalNodeID: cm.localNodeID,
|
||||||
|
LocalIP: localIP,
|
||||||
|
LocalPorts: ClusterLocalStatePorts{
|
||||||
|
RQLiteHTTPPort: pb.RQLiteHTTPPort,
|
||||||
|
RQLiteRaftPort: pb.RQLiteRaftPort,
|
||||||
|
OlricHTTPPort: pb.OlricHTTPPort,
|
||||||
|
OlricMemberlistPort: pb.OlricMemberlistPort,
|
||||||
|
GatewayHTTPPort: pb.GatewayHTTPPort,
|
||||||
|
},
|
||||||
|
AllNodes: stateNodes,
|
||||||
|
HasGateway: hasGateway,
|
||||||
|
BaseDomain: cm.baseDomain,
|
||||||
|
SavedAt: time.Now(),
|
||||||
|
}
|
||||||
|
if err := cm.saveLocalState(localState); err != nil {
|
||||||
|
cm.logger.Warn("Failed to save cluster local state", zap.String("namespace", namespaceName), zap.Error(err))
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ClusterLocalState is persisted to disk so namespace processes can be restored
|
||||||
|
// without querying the main RQLite cluster (which may not have a leader yet on cold start).
|
||||||
|
type ClusterLocalState struct {
|
||||||
|
ClusterID string `json:"cluster_id"`
|
||||||
|
NamespaceName string `json:"namespace_name"`
|
||||||
|
LocalNodeID string `json:"local_node_id"`
|
||||||
|
LocalIP string `json:"local_ip"`
|
||||||
|
LocalPorts ClusterLocalStatePorts `json:"local_ports"`
|
||||||
|
AllNodes []ClusterLocalStateNode `json:"all_nodes"`
|
||||||
|
HasGateway bool `json:"has_gateway"`
|
||||||
|
BaseDomain string `json:"base_domain"`
|
||||||
|
SavedAt time.Time `json:"saved_at"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ClusterLocalStatePorts struct {
|
||||||
|
RQLiteHTTPPort int `json:"rqlite_http_port"`
|
||||||
|
RQLiteRaftPort int `json:"rqlite_raft_port"`
|
||||||
|
OlricHTTPPort int `json:"olric_http_port"`
|
||||||
|
OlricMemberlistPort int `json:"olric_memberlist_port"`
|
||||||
|
GatewayHTTPPort int `json:"gateway_http_port"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type ClusterLocalStateNode struct {
|
||||||
|
NodeID string `json:"node_id"`
|
||||||
|
InternalIP string `json:"internal_ip"`
|
||||||
|
RQLiteHTTPPort int `json:"rqlite_http_port"`
|
||||||
|
RQLiteRaftPort int `json:"rqlite_raft_port"`
|
||||||
|
OlricHTTPPort int `json:"olric_http_port"`
|
||||||
|
OlricMemberlistPort int `json:"olric_memberlist_port"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// saveLocalState writes cluster state to disk for fast recovery without DB queries.
|
||||||
|
func (cm *ClusterManager) saveLocalState(state *ClusterLocalState) error {
|
||||||
|
dir := filepath.Join(cm.baseDataDir, state.NamespaceName)
|
||||||
|
if err := os.MkdirAll(dir, 0755); err != nil {
|
||||||
|
return fmt.Errorf("failed to create state dir: %w", err)
|
||||||
|
}
|
||||||
|
data, err := json.MarshalIndent(state, "", " ")
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal state: %w", err)
|
||||||
|
}
|
||||||
|
path := filepath.Join(dir, "cluster-state.json")
|
||||||
|
if err := os.WriteFile(path, data, 0644); err != nil {
|
||||||
|
return fmt.Errorf("failed to write state file: %w", err)
|
||||||
|
}
|
||||||
|
cm.logger.Info("Saved cluster local state", zap.String("namespace", state.NamespaceName), zap.String("path", path))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadLocalState reads cluster state from disk.
|
||||||
|
func loadLocalState(path string) (*ClusterLocalState, error) {
|
||||||
|
data, err := os.ReadFile(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
var state ClusterLocalState
|
||||||
|
if err := json.Unmarshal(data, &state); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse state file: %w", err)
|
||||||
|
}
|
||||||
|
return &state, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// RestoreLocalClustersFromDisk restores namespace processes using local state files,
|
||||||
|
// avoiding any dependency on the main RQLite cluster being available.
|
||||||
|
// Returns the number of namespaces restored, or -1 if no state files were found.
|
||||||
|
func (cm *ClusterManager) RestoreLocalClustersFromDisk(ctx context.Context) (int, error) {
|
||||||
|
pattern := filepath.Join(cm.baseDataDir, "*", "cluster-state.json")
|
||||||
|
matches, err := filepath.Glob(pattern)
|
||||||
|
if err != nil {
|
||||||
|
return -1, fmt.Errorf("failed to glob state files: %w", err)
|
||||||
|
}
|
||||||
|
if len(matches) == 0 {
|
||||||
|
return -1, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
cm.logger.Info("Found local cluster state files", zap.Int("count", len(matches)))
|
||||||
|
|
||||||
|
restored := 0
|
||||||
|
for _, path := range matches {
|
||||||
|
state, err := loadLocalState(path)
|
||||||
|
if err != nil {
|
||||||
|
cm.logger.Error("Failed to load cluster state file", zap.String("path", path), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := cm.restoreClusterFromState(ctx, state); err != nil {
|
||||||
|
cm.logger.Error("Failed to restore cluster from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
restored++
|
||||||
|
}
|
||||||
|
return restored, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// restoreClusterFromState restores all processes for a cluster using local state (no DB queries).
|
||||||
|
func (cm *ClusterManager) restoreClusterFromState(ctx context.Context, state *ClusterLocalState) error {
|
||||||
|
cm.logger.Info("Restoring namespace cluster from local state",
|
||||||
|
zap.String("namespace", state.NamespaceName),
|
||||||
|
zap.String("cluster_id", state.ClusterID),
|
||||||
|
)
|
||||||
|
|
||||||
|
pb := &state.LocalPorts
|
||||||
|
localIP := state.LocalIP
|
||||||
|
|
||||||
|
// 1. Restore RQLite
|
||||||
|
if !cm.rqliteSpawner.IsInstanceRunning(pb.RQLiteHTTPPort) {
|
||||||
|
hasExistingData := cm.rqliteSpawner.HasExistingData(state.NamespaceName, cm.localNodeID)
|
||||||
|
|
||||||
|
if hasExistingData {
|
||||||
|
var peers []rqlite.RaftPeer
|
||||||
|
for _, np := range state.AllNodes {
|
||||||
|
raftAddr := fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort)
|
||||||
|
peers = append(peers, rqlite.RaftPeer{ID: raftAddr, Address: raftAddr, NonVoter: false})
|
||||||
|
}
|
||||||
|
dataDir := cm.rqliteSpawner.GetDataDir(state.NamespaceName, cm.localNodeID)
|
||||||
|
if err := cm.rqliteSpawner.WritePeersJSON(dataDir, peers); err != nil {
|
||||||
|
cm.logger.Error("Failed to write peers.json", zap.String("namespace", state.NamespaceName), zap.Error(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
var joinAddrs []string
|
||||||
|
isLeader := false
|
||||||
|
if !hasExistingData {
|
||||||
|
sortedNodeIDs := make([]string, 0, len(state.AllNodes))
|
||||||
|
for _, np := range state.AllNodes {
|
||||||
|
sortedNodeIDs = append(sortedNodeIDs, np.NodeID)
|
||||||
|
}
|
||||||
|
sort.Strings(sortedNodeIDs)
|
||||||
|
electedLeaderID := sortedNodeIDs[0]
|
||||||
|
|
||||||
|
if cm.localNodeID == electedLeaderID {
|
||||||
|
isLeader = true
|
||||||
|
} else {
|
||||||
|
for _, np := range state.AllNodes {
|
||||||
|
if np.NodeID == electedLeaderID {
|
||||||
|
joinAddrs = append(joinAddrs, fmt.Sprintf("%s:%d", np.InternalIP, np.RQLiteRaftPort))
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
rqliteCfg := rqlite.InstanceConfig{
|
||||||
|
Namespace: state.NamespaceName,
|
||||||
|
NodeID: cm.localNodeID,
|
||||||
|
HTTPPort: pb.RQLiteHTTPPort,
|
||||||
|
RaftPort: pb.RQLiteRaftPort,
|
||||||
|
HTTPAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteHTTPPort),
|
||||||
|
RaftAdvAddress: fmt.Sprintf("%s:%d", localIP, pb.RQLiteRaftPort),
|
||||||
|
JoinAddresses: joinAddrs,
|
||||||
|
IsLeader: isLeader,
|
||||||
|
}
|
||||||
|
if _, err := cm.rqliteSpawner.SpawnInstance(ctx, rqliteCfg); err != nil {
|
||||||
|
cm.logger.Error("Failed to restore RQLite from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
|
||||||
|
} else {
|
||||||
|
cm.logger.Info("Restored RQLite instance from state", zap.String("namespace", state.NamespaceName))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 2. Restore Olric
|
||||||
|
conn, err := net.DialTimeout("tcp", fmt.Sprintf("localhost:%d", pb.OlricMemberlistPort), 2*time.Second)
|
||||||
|
if err == nil {
|
||||||
|
conn.Close()
|
||||||
|
} else {
|
||||||
|
var peers []string
|
||||||
|
for _, np := range state.AllNodes {
|
||||||
|
if np.NodeID != cm.localNodeID {
|
||||||
|
peers = append(peers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricMemberlistPort))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
olricCfg := olric.InstanceConfig{
|
||||||
|
Namespace: state.NamespaceName,
|
||||||
|
NodeID: cm.localNodeID,
|
||||||
|
HTTPPort: pb.OlricHTTPPort,
|
||||||
|
MemberlistPort: pb.OlricMemberlistPort,
|
||||||
|
BindAddr: localIP,
|
||||||
|
AdvertiseAddr: localIP,
|
||||||
|
PeerAddresses: peers,
|
||||||
|
}
|
||||||
|
if _, err := cm.olricSpawner.SpawnInstance(ctx, olricCfg); err != nil {
|
||||||
|
cm.logger.Error("Failed to restore Olric from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
|
||||||
|
} else {
|
||||||
|
cm.logger.Info("Restored Olric instance from state", zap.String("namespace", state.NamespaceName))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 3. Restore Gateway
|
||||||
|
if state.HasGateway {
|
||||||
|
resp, err := http.Get(fmt.Sprintf("http://localhost:%d/v1/health", pb.GatewayHTTPPort))
|
||||||
|
if err == nil {
|
||||||
|
resp.Body.Close()
|
||||||
|
} else {
|
||||||
|
var olricServers []string
|
||||||
|
for _, np := range state.AllNodes {
|
||||||
|
if np.NodeID == cm.localNodeID {
|
||||||
|
olricServers = append(olricServers, fmt.Sprintf("localhost:%d", np.OlricHTTPPort))
|
||||||
|
} else {
|
||||||
|
olricServers = append(olricServers, fmt.Sprintf("%s:%d", np.InternalIP, np.OlricHTTPPort))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
gwCfg := gateway.InstanceConfig{
|
||||||
|
Namespace: state.NamespaceName,
|
||||||
|
NodeID: cm.localNodeID,
|
||||||
|
HTTPPort: pb.GatewayHTTPPort,
|
||||||
|
BaseDomain: state.BaseDomain,
|
||||||
|
RQLiteDSN: fmt.Sprintf("http://localhost:%d", pb.RQLiteHTTPPort),
|
||||||
|
OlricServers: olricServers,
|
||||||
|
}
|
||||||
|
if _, err := cm.gatewaySpawner.SpawnInstance(ctx, gwCfg); err != nil {
|
||||||
|
cm.logger.Error("Failed to restore Gateway from state", zap.String("namespace", state.NamespaceName), zap.Error(err))
|
||||||
|
} else {
|
||||||
|
cm.logger.Info("Restored Gateway instance from state", zap.String("namespace", state.NamespaceName))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -93,13 +93,35 @@ func (n *Node) startHTTPGateway(ctx context.Context) error {
|
|||||||
zap.String("base_domain", clusterCfg.BaseDomain),
|
zap.String("base_domain", clusterCfg.BaseDomain),
|
||||||
zap.String("base_data_dir", baseDataDir))
|
zap.String("base_data_dir", baseDataDir))
|
||||||
|
|
||||||
// Restore previously-running namespace cluster processes in background
|
// Restore previously-running namespace cluster processes in background.
|
||||||
|
// First try local state files (no DB dependency), then fall back to DB query with retries.
|
||||||
go func() {
|
go func() {
|
||||||
// Wait for main RQLite to be ready before querying cluster assignments
|
time.Sleep(5 * time.Second)
|
||||||
time.Sleep(10 * time.Second)
|
|
||||||
if err := clusterManager.RestoreLocalClusters(ctx); err != nil {
|
// Try disk-based restore first (instant, no DB needed)
|
||||||
n.logger.ComponentError(logging.ComponentNode, "Failed to restore namespace clusters", zap.Error(err))
|
restored, err := clusterManager.RestoreLocalClustersFromDisk(ctx)
|
||||||
|
if err != nil {
|
||||||
|
n.logger.ComponentWarn(logging.ComponentNode, "Disk-based namespace restore failed", zap.Error(err))
|
||||||
}
|
}
|
||||||
|
if restored > 0 {
|
||||||
|
n.logger.ComponentInfo(logging.ComponentNode, "Restored namespace clusters from local state",
|
||||||
|
zap.Int("count", restored))
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// No state files found — fall back to DB query with retries
|
||||||
|
n.logger.ComponentInfo(logging.ComponentNode, "No local state files, falling back to DB restore")
|
||||||
|
time.Sleep(5 * time.Second)
|
||||||
|
for attempt := 1; attempt <= 12; attempt++ {
|
||||||
|
if err := clusterManager.RestoreLocalClusters(ctx); err == nil {
|
||||||
|
return
|
||||||
|
} else {
|
||||||
|
n.logger.ComponentWarn(logging.ComponentNode, "Namespace cluster restore failed, retrying",
|
||||||
|
zap.Int("attempt", attempt), zap.Error(err))
|
||||||
|
}
|
||||||
|
time.Sleep(10 * time.Second)
|
||||||
|
}
|
||||||
|
n.logger.ComponentError(logging.ComponentNode, "Failed to restore namespace clusters after all retries")
|
||||||
}()
|
}()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user