mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 09:36:56 +00:00
Bug fixing
This commit is contained in:
parent
156de7eb19
commit
d85ed032f8
155
docs/DEVNET_INSTALL.md
Normal file
155
docs/DEVNET_INSTALL.md
Normal file
@ -0,0 +1,155 @@
|
||||
# Devnet Installation Commands
|
||||
|
||||
This document contains example installation commands for a multi-node devnet cluster.
|
||||
|
||||
**Wallet:** `<YOUR_WALLET_ADDRESS>`
|
||||
**Contact:** `@anon: <YOUR_WALLET_ADDRESS>`
|
||||
|
||||
## Node Configuration
|
||||
|
||||
| Node | Role | Nameserver | Anyone Relay |
|
||||
|------|------|------------|--------------|
|
||||
| ns1 | Genesis | Yes | No |
|
||||
| ns2 | Nameserver | Yes | Yes (relay-1) |
|
||||
| ns3 | Nameserver | Yes | Yes (relay-2) |
|
||||
| node4 | Worker | No | Yes (relay-3) |
|
||||
| node5 | Worker | No | Yes (relay-4) |
|
||||
| node6 | Worker | No | No |
|
||||
|
||||
**Note:** Store credentials securely (not in version control).
|
||||
|
||||
## MyFamily Fingerprints
|
||||
|
||||
If running multiple Anyone relays, configure MyFamily with all your relay fingerprints:
|
||||
```
|
||||
<FINGERPRINT_1>,<FINGERPRINT_2>,<FINGERPRINT_3>,...
|
||||
```
|
||||
|
||||
## Installation Order
|
||||
|
||||
Install nodes **one at a time**, waiting for each to complete before starting the next:
|
||||
|
||||
1. ns1 (genesis, no Anyone relay)
|
||||
2. ns2 (nameserver + relay)
|
||||
3. ns3 (nameserver + relay)
|
||||
4. node4 (non-nameserver + relay)
|
||||
5. node5 (non-nameserver + relay)
|
||||
6. node6 (non-nameserver, no relay)
|
||||
|
||||
## ns1 - Genesis Node (No Anyone Relay)
|
||||
|
||||
```bash
|
||||
# SSH: <user>@<ns1-ip>
|
||||
|
||||
sudo orama install --no-pull --pre-built \
|
||||
--vps-ip <ns1-ip> \
|
||||
--domain <your-domain.com> \
|
||||
--base-domain <your-domain.com> \
|
||||
--nameserver
|
||||
```
|
||||
|
||||
After ns1 is installed, generate invite tokens:
|
||||
```bash
|
||||
orama invite --expiry 24h
|
||||
```
|
||||
|
||||
## ns2 - Nameserver + Relay
|
||||
|
||||
```bash
|
||||
# SSH: <user>@<ns2-ip>
|
||||
|
||||
sudo orama install --no-pull --pre-built \
|
||||
--join http://<ns1-ip> --token <TOKEN> \
|
||||
--vps-ip <ns2-ip> \
|
||||
--domain <your-domain.com> \
|
||||
--base-domain <your-domain.com> \
|
||||
--nameserver \
|
||||
--anyone-relay --anyone-migrate \
|
||||
--anyone-nickname <relay-name> \
|
||||
--anyone-wallet <wallet-address> \
|
||||
--anyone-contact "<contact-info>" \
|
||||
--anyone-family "<fingerprint1>,<fingerprint2>,..."
|
||||
```
|
||||
|
||||
## ns3 - Nameserver + Relay
|
||||
|
||||
```bash
|
||||
# SSH: <user>@<ns3-ip>
|
||||
|
||||
sudo orama install --no-pull --pre-built \
|
||||
--join http://<ns1-ip> --token <TOKEN> \
|
||||
--vps-ip <ns3-ip> \
|
||||
--domain <your-domain.com> \
|
||||
--base-domain <your-domain.com> \
|
||||
--nameserver \
|
||||
--anyone-relay --anyone-migrate \
|
||||
--anyone-nickname <relay-name> \
|
||||
--anyone-wallet <wallet-address> \
|
||||
--anyone-contact "<contact-info>" \
|
||||
--anyone-family "<fingerprint1>,<fingerprint2>,..."
|
||||
```
|
||||
|
||||
## node4 - Non-Nameserver + Relay
|
||||
|
||||
```bash
|
||||
# SSH: <user>@<node4-ip>
|
||||
|
||||
sudo orama install --no-pull --pre-built \
|
||||
--join http://<ns1-ip> --token <TOKEN> \
|
||||
--vps-ip <node4-ip> \
|
||||
--domain node4.<your-domain.com> \
|
||||
--base-domain <your-domain.com> \
|
||||
--skip-checks \
|
||||
--anyone-relay --anyone-migrate \
|
||||
--anyone-nickname <relay-name> \
|
||||
--anyone-wallet <wallet-address> \
|
||||
--anyone-contact "<contact-info>" \
|
||||
--anyone-family "<fingerprint1>,<fingerprint2>,..."
|
||||
```
|
||||
|
||||
## node5 - Non-Nameserver + Relay
|
||||
|
||||
```bash
|
||||
# SSH: <user>@<node5-ip>
|
||||
|
||||
sudo orama install --no-pull --pre-built \
|
||||
--join http://<ns1-ip> --token <TOKEN> \
|
||||
--vps-ip <node5-ip> \
|
||||
--domain node5.<your-domain.com> \
|
||||
--base-domain <your-domain.com> \
|
||||
--skip-checks \
|
||||
--anyone-relay --anyone-migrate \
|
||||
--anyone-nickname <relay-name> \
|
||||
--anyone-wallet <wallet-address> \
|
||||
--anyone-contact "<contact-info>" \
|
||||
--anyone-family "<fingerprint1>,<fingerprint2>,..."
|
||||
```
|
||||
|
||||
## node6 - Non-Nameserver (No Anyone Relay)
|
||||
|
||||
```bash
|
||||
# SSH: <user>@<node6-ip>
|
||||
|
||||
sudo orama install --no-pull --pre-built \
|
||||
--join http://<ns1-ip> --token <TOKEN> \
|
||||
--vps-ip <node6-ip> \
|
||||
--domain node6.<your-domain.com> \
|
||||
--base-domain <your-domain.com> \
|
||||
--skip-checks
|
||||
```
|
||||
|
||||
## Verification
|
||||
|
||||
After all nodes are installed, verify cluster health:
|
||||
|
||||
```bash
|
||||
# Check RQLite cluster (from any node)
|
||||
curl -s http://localhost:5001/status | jq -r '.store.raft.state, .store.raft.num_peers'
|
||||
# Should show: Leader (on one node) and N-1 peers
|
||||
|
||||
# Check gateway health
|
||||
curl -s http://localhost:6001/health
|
||||
|
||||
# Check Anyone relay (on nodes with relays)
|
||||
systemctl status debros-anyone-relay
|
||||
```
|
||||
@ -119,10 +119,78 @@ ssh ubuntu@<ip> "sudo mv /tmp/orama /usr/local/bin/orama && sudo chmod +x /usr/l
|
||||
ssh ubuntu@<ip> "sudo orama upgrade --branch <branch> --restart"
|
||||
```
|
||||
|
||||
### Deploying to All 3 Nodes
|
||||
### Upgrading a Multi-Node Cluster (CRITICAL)
|
||||
|
||||
**NEVER restart all nodes simultaneously.** RQLite uses Raft consensus and requires a majority (quorum) to function. Restarting all nodes at once can cause cluster splits where nodes elect different leaders or form isolated clusters.
|
||||
|
||||
#### Safe Upgrade Procedure (Rolling Restart)
|
||||
|
||||
Always upgrade nodes **one at a time**, waiting for each to rejoin before proceeding:
|
||||
|
||||
```bash
|
||||
# 1. Build and deploy archives to ALL nodes first (don't restart yet)
|
||||
make build-linux-all
|
||||
./scripts/generate-source-archive.sh
|
||||
|
||||
# Copy to all nodes
|
||||
for ip in <ip1> <ip2> <ip3> ...; do
|
||||
scp /tmp/network-source.tar.gz ubuntu@$ip:/tmp/
|
||||
ssh ubuntu@$ip 'sudo bash -s' < scripts/extract-deploy.sh
|
||||
done
|
||||
|
||||
# 2. Upgrade nodes ONE AT A TIME (rolling restart)
|
||||
# Start with follower nodes, do the leader LAST
|
||||
|
||||
# Check which node is the RQLite leader
|
||||
ssh ubuntu@<any-node> 'curl -s http://localhost:5001/status | jq -r .store.raft.state'
|
||||
|
||||
# Upgrade a follower node
|
||||
ssh ubuntu@<follower-ip> 'sudo orama upgrade --no-pull --pre-built --restart'
|
||||
|
||||
# Wait for it to rejoin (check from any healthy node)
|
||||
ssh ubuntu@<leader-ip> 'curl -s http://localhost:5001/status | jq -r .store.raft.num_peers'
|
||||
# Should show the expected number of peers
|
||||
|
||||
# Repeat for each follower, then upgrade the leader last
|
||||
```
|
||||
|
||||
#### What NOT to Do
|
||||
|
||||
- **DON'T** stop all nodes, replace binaries, then start all nodes
|
||||
- **DON'T** run `orama upgrade --restart` on multiple nodes in parallel
|
||||
- **DON'T** clear RQLite data directories unless doing a full cluster rebuild
|
||||
- **DON'T** use `systemctl stop debros-node` on multiple nodes simultaneously
|
||||
|
||||
#### Recovery from Cluster Split
|
||||
|
||||
If nodes get stuck in "Candidate" state or show "leader not found" errors:
|
||||
|
||||
1. Identify which node has the most recent data (usually the old leader)
|
||||
2. Keep that node running as the new leader
|
||||
3. On each other node, clear RQLite data and restart:
|
||||
```bash
|
||||
sudo orama prod stop
|
||||
sudo rm -rf /home/debros/.orama/data/rqlite
|
||||
sudo systemctl start debros-node
|
||||
```
|
||||
4. The node should automatically rejoin using its configured `rqlite_join_address`
|
||||
|
||||
If automatic rejoin fails, the node may have started without the `-join` flag. Check:
|
||||
```bash
|
||||
ps aux | grep rqlited
|
||||
# Should include: -join 10.0.0.1:7001 (or similar)
|
||||
```
|
||||
|
||||
If `-join` is missing, the node bootstrapped standalone. You'll need to either:
|
||||
- Restart debros-node (it should detect empty data and use join)
|
||||
- Or do a full cluster rebuild from CLEAN_NODE.md
|
||||
|
||||
### Deploying to Multiple Nodes
|
||||
|
||||
To deploy to all nodes, repeat steps 3-5 (dev) or 3-4 (production) for each VPS IP.
|
||||
|
||||
**Important:** When using `--restart`, do nodes one at a time (see "Upgrading a Multi-Node Cluster" above).
|
||||
|
||||
### CLI Flags Reference
|
||||
|
||||
#### `orama install`
|
||||
@ -171,6 +239,26 @@ To deploy to all nodes, repeat steps 3-5 (dev) or 3-4 (production) for each VPS
|
||||
| `--pre-built` | Skip all Go compilation, use pre-built binaries already on disk |
|
||||
| `--restart` | Restart all services after upgrade |
|
||||
|
||||
#### `orama prod` (Service Management)
|
||||
|
||||
Use these commands to manage services on production nodes:
|
||||
|
||||
```bash
|
||||
# Stop all services (debros-node, coredns, caddy)
|
||||
sudo orama prod stop
|
||||
|
||||
# Start all services
|
||||
sudo orama prod start
|
||||
|
||||
# Restart all services
|
||||
sudo orama prod restart
|
||||
|
||||
# Check service status
|
||||
sudo orama prod status
|
||||
```
|
||||
|
||||
**Note:** Always use `orama prod stop` instead of manually running `systemctl stop`. The CLI ensures all related services (including CoreDNS and Caddy on nameserver nodes) are handled correctly.
|
||||
|
||||
### Node Join Flow
|
||||
|
||||
```bash
|
||||
|
||||
@ -12,6 +12,7 @@ type Flags struct {
|
||||
RestartServices bool
|
||||
NoPull bool
|
||||
PreBuilt bool
|
||||
SkipChecks bool
|
||||
Branch string
|
||||
Nameserver *bool // Pointer so we can detect if explicitly set vs default
|
||||
|
||||
@ -37,6 +38,7 @@ func ParseFlags(args []string) (*Flags, error) {
|
||||
fs.BoolVar(&flags.RestartServices, "restart", false, "Automatically restart services after upgrade")
|
||||
fs.BoolVar(&flags.NoPull, "no-pull", false, "Skip source download, use existing /home/debros/src")
|
||||
fs.BoolVar(&flags.PreBuilt, "pre-built", false, "Skip building binaries on VPS, use pre-built binaries already in /home/debros/bin and /usr/local/bin")
|
||||
fs.BoolVar(&flags.SkipChecks, "skip-checks", false, "Skip minimum resource checks (RAM/CPU)")
|
||||
fs.StringVar(&flags.Branch, "branch", "", "Git branch to use (uses saved preference if not specified)")
|
||||
|
||||
// Nameserver flag - use pointer to detect if explicitly set
|
||||
|
||||
@ -44,7 +44,7 @@ func NewOrchestrator(flags *Flags) *Orchestrator {
|
||||
isNameserver = *flags.Nameserver
|
||||
}
|
||||
|
||||
setup := production.NewProductionSetup(oramaHome, os.Stdout, flags.Force, branch, flags.NoPull, false, flags.PreBuilt)
|
||||
setup := production.NewProductionSetup(oramaHome, os.Stdout, flags.Force, branch, flags.NoPull, flags.SkipChecks, flags.PreBuilt)
|
||||
setup.SetNameserver(isNameserver)
|
||||
|
||||
// Configure Anyone relay if enabled
|
||||
|
||||
@ -506,6 +506,76 @@ func (g *Gateway) startOlricReconnectLoop(cfg olric.Config) {
|
||||
}()
|
||||
}
|
||||
|
||||
// Cache handler wrappers - these check cacheHandlers dynamically to support
|
||||
// background Olric reconnection. Without these, cache routes won't work if
|
||||
// Olric wasn't available at gateway startup but connected later.
|
||||
|
||||
func (g *Gateway) cacheHealthHandler(w http.ResponseWriter, r *http.Request) {
|
||||
g.olricMu.RLock()
|
||||
handlers := g.cacheHandlers
|
||||
g.olricMu.RUnlock()
|
||||
if handlers == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "cache service unavailable")
|
||||
return
|
||||
}
|
||||
handlers.HealthHandler(w, r)
|
||||
}
|
||||
|
||||
func (g *Gateway) cacheGetHandler(w http.ResponseWriter, r *http.Request) {
|
||||
g.olricMu.RLock()
|
||||
handlers := g.cacheHandlers
|
||||
g.olricMu.RUnlock()
|
||||
if handlers == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "cache service unavailable")
|
||||
return
|
||||
}
|
||||
handlers.GetHandler(w, r)
|
||||
}
|
||||
|
||||
func (g *Gateway) cacheMGetHandler(w http.ResponseWriter, r *http.Request) {
|
||||
g.olricMu.RLock()
|
||||
handlers := g.cacheHandlers
|
||||
g.olricMu.RUnlock()
|
||||
if handlers == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "cache service unavailable")
|
||||
return
|
||||
}
|
||||
handlers.MultiGetHandler(w, r)
|
||||
}
|
||||
|
||||
func (g *Gateway) cachePutHandler(w http.ResponseWriter, r *http.Request) {
|
||||
g.olricMu.RLock()
|
||||
handlers := g.cacheHandlers
|
||||
g.olricMu.RUnlock()
|
||||
if handlers == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "cache service unavailable")
|
||||
return
|
||||
}
|
||||
handlers.SetHandler(w, r)
|
||||
}
|
||||
|
||||
func (g *Gateway) cacheDeleteHandler(w http.ResponseWriter, r *http.Request) {
|
||||
g.olricMu.RLock()
|
||||
handlers := g.cacheHandlers
|
||||
g.olricMu.RUnlock()
|
||||
if handlers == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "cache service unavailable")
|
||||
return
|
||||
}
|
||||
handlers.DeleteHandler(w, r)
|
||||
}
|
||||
|
||||
func (g *Gateway) cacheScanHandler(w http.ResponseWriter, r *http.Request) {
|
||||
g.olricMu.RLock()
|
||||
handlers := g.cacheHandlers
|
||||
g.olricMu.RUnlock()
|
||||
if handlers == nil {
|
||||
writeError(w, http.StatusServiceUnavailable, "cache service unavailable")
|
||||
return
|
||||
}
|
||||
handlers.ScanHandler(w, r)
|
||||
}
|
||||
|
||||
// namespaceClusterStatusHandler handles GET /v1/namespace/status?id={cluster_id}
|
||||
// This endpoint is public (no API key required) to allow polling during provisioning.
|
||||
func (g *Gateway) namespaceClusterStatusHandler(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
@ -6,7 +6,9 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/gateway"
|
||||
"github.com/DeBrosOfficial/network/pkg/olric"
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
"go.uber.org/zap"
|
||||
@ -14,7 +16,7 @@ import (
|
||||
|
||||
// SpawnRequest represents a request to spawn or stop a namespace instance
|
||||
type SpawnRequest struct {
|
||||
Action string `json:"action"` // "spawn-rqlite", "spawn-olric", "stop-rqlite", "stop-olric"
|
||||
Action string `json:"action"` // "spawn-rqlite", "spawn-olric", "spawn-gateway", "stop-rqlite", "stop-olric", "stop-gateway"
|
||||
Namespace string `json:"namespace"`
|
||||
NodeID string `json:"node_id"`
|
||||
|
||||
@ -32,6 +34,16 @@ type SpawnRequest struct {
|
||||
OlricBindAddr string `json:"olric_bind_addr,omitempty"`
|
||||
OlricAdvertiseAddr string `json:"olric_advertise_addr,omitempty"`
|
||||
OlricPeerAddresses []string `json:"olric_peer_addresses,omitempty"`
|
||||
|
||||
// Gateway config (when action = "spawn-gateway")
|
||||
GatewayHTTPPort int `json:"gateway_http_port,omitempty"`
|
||||
GatewayBaseDomain string `json:"gateway_base_domain,omitempty"`
|
||||
GatewayRQLiteDSN string `json:"gateway_rqlite_dsn,omitempty"`
|
||||
GatewayOlricServers []string `json:"gateway_olric_servers,omitempty"`
|
||||
IPFSClusterAPIURL string `json:"ipfs_cluster_api_url,omitempty"`
|
||||
IPFSAPIURL string `json:"ipfs_api_url,omitempty"`
|
||||
IPFSTimeout string `json:"ipfs_timeout,omitempty"`
|
||||
IPFSReplicationFactor int `json:"ipfs_replication_factor,omitempty"`
|
||||
}
|
||||
|
||||
// SpawnResponse represents the response from a spawn/stop request
|
||||
@ -46,16 +58,18 @@ type SpawnResponse struct {
|
||||
type SpawnHandler struct {
|
||||
rqliteSpawner *rqlite.InstanceSpawner
|
||||
olricSpawner *olric.InstanceSpawner
|
||||
gatewaySpawner *gateway.InstanceSpawner
|
||||
logger *zap.Logger
|
||||
rqliteInstances map[string]*rqlite.Instance // key: "namespace:nodeID"
|
||||
rqliteInstanceMu sync.RWMutex
|
||||
}
|
||||
|
||||
// NewSpawnHandler creates a new spawn handler
|
||||
func NewSpawnHandler(rs *rqlite.InstanceSpawner, os *olric.InstanceSpawner, logger *zap.Logger) *SpawnHandler {
|
||||
func NewSpawnHandler(rs *rqlite.InstanceSpawner, os *olric.InstanceSpawner, gs *gateway.InstanceSpawner, logger *zap.Logger) *SpawnHandler {
|
||||
return &SpawnHandler{
|
||||
rqliteSpawner: rs,
|
||||
olricSpawner: os,
|
||||
gatewaySpawner: gs,
|
||||
logger: logger.With(zap.String("component", "namespace-spawn-handler")),
|
||||
rqliteInstances: make(map[string]*rqlite.Instance),
|
||||
}
|
||||
@ -165,6 +179,46 @@ func (h *SpawnHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
|
||||
|
||||
case "spawn-gateway":
|
||||
// Parse IPFS timeout if provided
|
||||
var ipfsTimeout time.Duration
|
||||
if req.IPFSTimeout != "" {
|
||||
var err error
|
||||
ipfsTimeout, err = time.ParseDuration(req.IPFSTimeout)
|
||||
if err != nil {
|
||||
h.logger.Warn("Invalid IPFS timeout, using default", zap.String("timeout", req.IPFSTimeout), zap.Error(err))
|
||||
ipfsTimeout = 60 * time.Second
|
||||
}
|
||||
}
|
||||
|
||||
cfg := gateway.InstanceConfig{
|
||||
Namespace: req.Namespace,
|
||||
NodeID: req.NodeID,
|
||||
HTTPPort: req.GatewayHTTPPort,
|
||||
BaseDomain: req.GatewayBaseDomain,
|
||||
RQLiteDSN: req.GatewayRQLiteDSN,
|
||||
OlricServers: req.GatewayOlricServers,
|
||||
IPFSClusterAPIURL: req.IPFSClusterAPIURL,
|
||||
IPFSAPIURL: req.IPFSAPIURL,
|
||||
IPFSTimeout: ipfsTimeout,
|
||||
IPFSReplicationFactor: req.IPFSReplicationFactor,
|
||||
}
|
||||
instance, err := h.gatewaySpawner.SpawnInstance(ctx, cfg)
|
||||
if err != nil {
|
||||
h.logger.Error("Failed to spawn Gateway instance", zap.Error(err))
|
||||
writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()})
|
||||
return
|
||||
}
|
||||
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true, PID: instance.PID})
|
||||
|
||||
case "stop-gateway":
|
||||
if err := h.gatewaySpawner.StopInstance(ctx, req.Namespace, req.NodeID); err != nil {
|
||||
h.logger.Error("Failed to stop Gateway instance", zap.Error(err))
|
||||
writeSpawnResponse(w, http.StatusInternalServerError, SpawnResponse{Error: err.Error()})
|
||||
return
|
||||
}
|
||||
writeSpawnResponse(w, http.StatusOK, SpawnResponse{Success: true})
|
||||
|
||||
default:
|
||||
writeSpawnResponse(w, http.StatusBadRequest, SpawnResponse{Error: fmt.Sprintf("unknown action: %s", req.Action)})
|
||||
}
|
||||
|
||||
@ -89,15 +89,14 @@ func (g *Gateway) Routes() http.Handler {
|
||||
// anon proxy (authenticated users only)
|
||||
mux.HandleFunc("/v1/proxy/anon", g.anonProxyHandler)
|
||||
|
||||
// cache endpoints (Olric)
|
||||
if g.cacheHandlers != nil {
|
||||
mux.HandleFunc("/v1/cache/health", g.cacheHandlers.HealthHandler)
|
||||
mux.HandleFunc("/v1/cache/get", g.cacheHandlers.GetHandler)
|
||||
mux.HandleFunc("/v1/cache/mget", g.cacheHandlers.MultiGetHandler)
|
||||
mux.HandleFunc("/v1/cache/put", g.cacheHandlers.SetHandler)
|
||||
mux.HandleFunc("/v1/cache/delete", g.cacheHandlers.DeleteHandler)
|
||||
mux.HandleFunc("/v1/cache/scan", g.cacheHandlers.ScanHandler)
|
||||
}
|
||||
// cache endpoints (Olric) - always register, check handler dynamically
|
||||
// This allows cache routes to work after background Olric reconnection
|
||||
mux.HandleFunc("/v1/cache/health", g.cacheHealthHandler)
|
||||
mux.HandleFunc("/v1/cache/get", g.cacheGetHandler)
|
||||
mux.HandleFunc("/v1/cache/mget", g.cacheMGetHandler)
|
||||
mux.HandleFunc("/v1/cache/put", g.cachePutHandler)
|
||||
mux.HandleFunc("/v1/cache/delete", g.cacheDeleteHandler)
|
||||
mux.HandleFunc("/v1/cache/scan", g.cacheScanHandler)
|
||||
|
||||
// storage endpoints (IPFS)
|
||||
if g.storageHandlers != nil {
|
||||
|
||||
@ -458,7 +458,7 @@ func (cm *ClusterManager) startOlricCluster(ctx context.Context, cluster *Namesp
|
||||
return instances, nil
|
||||
}
|
||||
|
||||
// startGatewayCluster starts Gateway instances on all nodes
|
||||
// startGatewayCluster starts Gateway instances on all nodes (locally or remotely)
|
||||
func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *NamespaceCluster, nodes []NodeCapacity, portBlocks []*PortBlock, rqliteInstances []*rqlite.Instance, olricInstances []*olric.OlricInstance) ([]*gateway.GatewayInstance, error) {
|
||||
instances := make([]*gateway.GatewayInstance, len(nodes))
|
||||
|
||||
@ -470,7 +470,7 @@ func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *Name
|
||||
|
||||
// Start all Gateway instances
|
||||
for i, node := range nodes {
|
||||
// Connect to local RQLite instance
|
||||
// Connect to local RQLite instance on each node
|
||||
rqliteDSN := fmt.Sprintf("http://localhost:%d", portBlocks[i].RQLiteHTTPPort)
|
||||
|
||||
cfg := gateway.InstanceConfig{
|
||||
@ -486,11 +486,19 @@ func (cm *ClusterManager) startGatewayCluster(ctx context.Context, cluster *Name
|
||||
IPFSReplicationFactor: cm.ipfsReplicationFactor,
|
||||
}
|
||||
|
||||
instance, err := cm.gatewaySpawner.SpawnInstance(ctx, cfg)
|
||||
var instance *gateway.GatewayInstance
|
||||
var err error
|
||||
if node.NodeID == cm.localNodeID {
|
||||
cm.logger.Info("Spawning Gateway locally", zap.String("node", node.NodeID))
|
||||
instance, err = cm.gatewaySpawner.SpawnInstance(ctx, cfg)
|
||||
} else {
|
||||
cm.logger.Info("Spawning Gateway remotely", zap.String("node", node.NodeID), zap.String("ip", node.InternalIP))
|
||||
instance, err = cm.spawnGatewayRemote(ctx, node.InternalIP, cfg)
|
||||
}
|
||||
if err != nil {
|
||||
// Stop previously started instances
|
||||
for j := 0; j < i; j++ {
|
||||
cm.gatewaySpawner.StopInstance(ctx, cluster.NamespaceName, nodes[j].NodeID)
|
||||
cm.stopGatewayOnNode(ctx, nodes[j].NodeID, nodes[j].InternalIP, cluster.NamespaceName)
|
||||
}
|
||||
return nil, fmt.Errorf("failed to start Gateway on node %s: %w", node.NodeID, err)
|
||||
}
|
||||
@ -549,6 +557,40 @@ func (cm *ClusterManager) spawnOlricRemote(ctx context.Context, nodeIP string, c
|
||||
}, nil
|
||||
}
|
||||
|
||||
// spawnGatewayRemote sends a spawn-gateway request to a remote node
|
||||
func (cm *ClusterManager) spawnGatewayRemote(ctx context.Context, nodeIP string, cfg gateway.InstanceConfig) (*gateway.GatewayInstance, error) {
|
||||
ipfsTimeout := ""
|
||||
if cfg.IPFSTimeout > 0 {
|
||||
ipfsTimeout = cfg.IPFSTimeout.String()
|
||||
}
|
||||
|
||||
resp, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{
|
||||
"action": "spawn-gateway",
|
||||
"namespace": cfg.Namespace,
|
||||
"node_id": cfg.NodeID,
|
||||
"gateway_http_port": cfg.HTTPPort,
|
||||
"gateway_base_domain": cfg.BaseDomain,
|
||||
"gateway_rqlite_dsn": cfg.RQLiteDSN,
|
||||
"gateway_olric_servers": cfg.OlricServers,
|
||||
"ipfs_cluster_api_url": cfg.IPFSClusterAPIURL,
|
||||
"ipfs_api_url": cfg.IPFSAPIURL,
|
||||
"ipfs_timeout": ipfsTimeout,
|
||||
"ipfs_replication_factor": cfg.IPFSReplicationFactor,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &gateway.GatewayInstance{
|
||||
Namespace: cfg.Namespace,
|
||||
NodeID: cfg.NodeID,
|
||||
HTTPPort: cfg.HTTPPort,
|
||||
BaseDomain: cfg.BaseDomain,
|
||||
RQLiteDSN: cfg.RQLiteDSN,
|
||||
OlricServers: cfg.OlricServers,
|
||||
PID: resp.PID,
|
||||
}, nil
|
||||
}
|
||||
|
||||
// spawnResponse represents the JSON response from a spawn request
|
||||
type spawnResponse struct {
|
||||
Success bool `json:"success"`
|
||||
@ -615,6 +657,15 @@ func (cm *ClusterManager) stopOlricOnNode(ctx context.Context, nodeID, nodeIP, n
|
||||
}
|
||||
}
|
||||
|
||||
// stopGatewayOnNode stops a Gateway instance on a node (local or remote)
|
||||
func (cm *ClusterManager) stopGatewayOnNode(ctx context.Context, nodeID, nodeIP, namespace string) {
|
||||
if nodeID == cm.localNodeID {
|
||||
cm.gatewaySpawner.StopInstance(ctx, namespace, nodeID)
|
||||
} else {
|
||||
cm.sendStopRequest(ctx, nodeIP, "stop-gateway", namespace, nodeID)
|
||||
}
|
||||
}
|
||||
|
||||
// sendStopRequest sends a stop request to a remote node
|
||||
func (cm *ClusterManager) sendStopRequest(ctx context.Context, nodeIP, action, namespace, nodeID string) {
|
||||
_, err := cm.sendSpawnRequest(ctx, nodeIP, map[string]interface{}{
|
||||
|
||||
@ -82,7 +82,8 @@ func (n *Node) startHTTPGateway(ctx context.Context) error {
|
||||
// Wire spawn handler for distributed namespace instance spawning
|
||||
rqliteSpawner := rqlitepkg.NewInstanceSpawner(baseDataDir, n.logger.Logger)
|
||||
olricSpawner := olricpkg.NewInstanceSpawner(baseDataDir, n.logger.Logger)
|
||||
spawnHandler := namespacehandlers.NewSpawnHandler(rqliteSpawner, olricSpawner, n.logger.Logger)
|
||||
gatewaySpawner := gateway.NewInstanceSpawner(baseDataDir, n.logger.Logger)
|
||||
spawnHandler := namespacehandlers.NewSpawnHandler(rqliteSpawner, olricSpawner, gatewaySpawner, n.logger.Logger)
|
||||
apiGateway.SetSpawnHandler(spawnHandler)
|
||||
|
||||
// Wire namespace delete handler
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user