feat: enhance configuration generation in initFullStack

- Added steps to generate node2.yaml, node3.yaml, and gateway.yaml configurations during the full stack initialization process.
- Implemented checks to prevent overwriting existing configuration files unless the --force flag is used.
- Improved output messages to provide clear feedback on generated configurations and bootstrap information.
This commit is contained in:
anonpenguin23 2025-10-27 09:00:49 +02:00
parent 030ea88cf6
commit dbf8660941
No known key found for this signature in database
GPG Key ID: 1CBB1FE35AFBEE30
8 changed files with 141 additions and 24 deletions

View File

@ -21,7 +21,7 @@ test-e2e:
.PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports
VERSION := 0.52.10 VERSION := 0.52.12
COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown)
DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ)
LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)'

View File

@ -309,12 +309,65 @@ func initFullStack(force bool) {
} }
fmt.Printf("✅ Generated bootstrap config: %s\n", bootstrapPath) fmt.Printf("✅ Generated bootstrap config: %s\n", bootstrapPath)
// Generate node2, node3, gateway configs... // Step 3: Generate node2.yaml
// (keeping implementation similar to original) node2Name := "node2.yaml"
node2Path := filepath.Join(debrosDir, node2Name)
if !force {
if _, err := os.Stat(node2Path); err == nil {
fmt.Fprintf(os.Stderr, "Node2 config already exists at %s (use --force to overwrite)\n", node2Path)
os.Exit(1)
}
}
node2Content := GenerateNodeConfig(node2Name, "", 4002, 5002, 7002, "localhost:7001", bootstrapMultiaddr)
if err := os.WriteFile(node2Path, []byte(node2Content), 0644); err != nil {
fmt.Fprintf(os.Stderr, "Failed to write node2 config: %v\n", err)
os.Exit(1)
}
fmt.Printf("✅ Generated node2 config: %s\n", node2Path)
// Step 4: Generate node3.yaml
node3Name := "node3.yaml"
node3Path := filepath.Join(debrosDir, node3Name)
if !force {
if _, err := os.Stat(node3Path); err == nil {
fmt.Fprintf(os.Stderr, "Node3 config already exists at %s (use --force to overwrite)\n", node3Path)
os.Exit(1)
}
}
node3Content := GenerateNodeConfig(node3Name, "", 4003, 5003, 7003, "localhost:7001", bootstrapMultiaddr)
if err := os.WriteFile(node3Path, []byte(node3Content), 0644); err != nil {
fmt.Fprintf(os.Stderr, "Failed to write node3 config: %v\n", err)
os.Exit(1)
}
fmt.Printf("✅ Generated node3 config: %s\n", node3Path)
// Step 5: Generate gateway.yaml
gatewayName := "gateway.yaml"
gatewayPath := filepath.Join(debrosDir, gatewayName)
if !force {
if _, err := os.Stat(gatewayPath); err == nil {
fmt.Fprintf(os.Stderr, "Gateway config already exists at %s (use --force to overwrite)\n", gatewayPath)
os.Exit(1)
}
}
gatewayContent := GenerateGatewayConfig(bootstrapMultiaddr)
if err := os.WriteFile(gatewayPath, []byte(gatewayContent), 0644); err != nil {
fmt.Fprintf(os.Stderr, "Failed to write gateway config: %v\n", err)
os.Exit(1)
}
fmt.Printf("✅ Generated gateway config: %s\n", gatewayPath)
fmt.Printf("\n" + strings.Repeat("=", 60) + "\n") fmt.Printf("\n" + strings.Repeat("=", 60) + "\n")
fmt.Printf("✅ Full network stack initialized successfully!\n") fmt.Printf("✅ Full network stack initialized successfully!\n")
fmt.Printf(strings.Repeat("=", 60) + "\n") fmt.Printf(strings.Repeat("=", 60) + "\n")
fmt.Printf("\nBootstrap Peer ID: %s\n", bootstrapInfo.PeerID.String())
fmt.Printf("Bootstrap Multiaddr: %s\n", bootstrapMultiaddr)
fmt.Printf("\nGenerated configs:\n")
fmt.Printf(" - %s\n", bootstrapPath)
fmt.Printf(" - %s\n", node2Path)
fmt.Printf(" - %s\n", node3Path)
fmt.Printf(" - %s\n", gatewayPath)
fmt.Printf("\nStart the network with: make dev\n")
} }
// GenerateNodeConfig generates a node configuration // GenerateNodeConfig generates a node configuration

View File

@ -158,6 +158,8 @@ func (c *Client) Connect() error {
var ps *libp2ppubsub.PubSub var ps *libp2ppubsub.PubSub
ps, err = libp2ppubsub.NewGossipSub(context.Background(), h, ps, err = libp2ppubsub.NewGossipSub(context.Background(), h,
libp2ppubsub.WithPeerExchange(true), libp2ppubsub.WithPeerExchange(true),
libp2ppubsub.WithFloodPublish(true), // Ensure messages reach all peers, not just mesh
libp2ppubsub.WithDirectPeers(nil), // Enable direct peer connections
) )
if err != nil { if err != nil {
h.Close() h.Close()

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/DeBrosOfficial/network/pkg/client" "github.com/DeBrosOfficial/network/pkg/client"
"go.uber.org/zap"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
@ -63,18 +64,28 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request)
// Subscribe to the topic; push data into msgs with simple per-connection de-dup // Subscribe to the topic; push data into msgs with simple per-connection de-dup
recent := make(map[string]time.Time) recent := make(map[string]time.Time)
h := func(_ string, data []byte) error { h := func(_ string, data []byte) error {
g.logger.ComponentInfo("gateway", "pubsub ws: received message",
zap.String("topic", topic),
zap.Int("data_len", len(data)))
// Drop duplicates seen in the last 2 seconds // Drop duplicates seen in the last 2 seconds
sum := sha256.Sum256(data) sum := sha256.Sum256(data)
key := hex.EncodeToString(sum[:]) key := hex.EncodeToString(sum[:])
if t, ok := recent[key]; ok && time.Since(t) < 2*time.Second { if t, ok := recent[key]; ok && time.Since(t) < 2*time.Second {
g.logger.ComponentInfo("gateway", "pubsub ws: dropping duplicate",
zap.String("topic", topic))
return nil return nil
} }
recent[key] = time.Now() recent[key] = time.Now()
select { select {
case msgs <- data: case msgs <- data:
g.logger.ComponentInfo("gateway", "pubsub ws: forwarded to client",
zap.String("topic", topic))
return nil return nil
default: default:
// Drop if client is slow to avoid blocking network // Drop if client is slow to avoid blocking network
g.logger.ComponentWarn("gateway", "pubsub ws: client slow, dropping message",
zap.String("topic", topic))
return nil return nil
} }
} }
@ -100,8 +111,19 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request)
close(done) close(done)
return return
} }
// Format message as JSON envelope with data (base64 encoded), timestamp, and topic
// This matches the SDK's Message interface: {data: string, timestamp: number, topic: string}
envelope := map[string]interface{}{
"data": base64.StdEncoding.EncodeToString(b),
"timestamp": time.Now().UnixMilli(),
"topic": topic,
}
envelopeJSON, err := json.Marshal(envelope)
if err != nil {
continue
}
conn.SetWriteDeadline(time.Now().Add(30 * time.Second)) conn.SetWriteDeadline(time.Now().Add(30 * time.Second))
if err := conn.WriteMessage(websocket.BinaryMessage, b); err != nil { if err := conn.WriteMessage(websocket.TextMessage, envelopeJSON); err != nil {
close(done) close(done)
return return
} }
@ -160,10 +182,23 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) {
writeError(w, http.StatusBadRequest, "invalid base64 data") writeError(w, http.StatusBadRequest, "invalid base64 data")
return return
} }
g.logger.ComponentInfo("gateway", "pubsub publish: publishing message",
zap.String("topic", body.Topic),
zap.String("namespace", ns),
zap.Int("data_len", len(data)))
if err := g.client.PubSub().Publish(client.WithInternalAuth(r.Context()), body.Topic, data); err != nil { if err := g.client.PubSub().Publish(client.WithInternalAuth(r.Context()), body.Topic, data); err != nil {
g.logger.ComponentWarn("gateway", "pubsub publish: failed",
zap.String("topic", body.Topic),
zap.Error(err))
writeError(w, http.StatusInternalServerError, err.Error()) writeError(w, http.StatusInternalServerError, err.Error())
return return
} }
g.logger.ComponentInfo("gateway", "pubsub publish: message published successfully",
zap.String("topic", body.Topic))
// rely on libp2p to deliver to WS subscribers // rely on libp2p to deliver to WS subscribers
writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) writeJSON(w, http.StatusOK, map[string]any{"status": "ok"})
} }

View File

@ -306,6 +306,8 @@ func (n *Node) startLibP2P() error {
// Initialize pubsub // Initialize pubsub
ps, err := libp2ppubsub.NewGossipSub(context.Background(), h, ps, err := libp2ppubsub.NewGossipSub(context.Background(), h,
libp2ppubsub.WithPeerExchange(true), libp2ppubsub.WithPeerExchange(true),
libp2ppubsub.WithFloodPublish(true), // Ensure messages reach all peers, not just mesh
libp2ppubsub.WithDirectPeers(nil), // Enable direct peer connections
) )
if err != nil { if err != nil {
return fmt.Errorf("failed to create pubsub: %w", err) return fmt.Errorf("failed to create pubsub: %w", err)

View File

@ -2,6 +2,7 @@ package pubsub
import ( import (
"context" "context"
"log"
"time" "time"
pubsub "github.com/libp2p/go-libp2p-pubsub" pubsub "github.com/libp2p/go-libp2p-pubsub"
@ -31,23 +32,50 @@ func (m *Manager) announceTopicInterest(topicName string) {
} }
// forceTopicPeerDiscovery uses a simple strategy to announce presence on the topic. // forceTopicPeerDiscovery uses a simple strategy to announce presence on the topic.
// It publishes a lightweight discovery ping and returns quickly. // It publishes lightweight discovery pings continuously to maintain mesh health.
func (m *Manager) forceTopicPeerDiscovery(topicName string, topic *pubsub.Topic) { func (m *Manager) forceTopicPeerDiscovery(topicName string, topic *pubsub.Topic) {
// If pubsub already reports peers for this topic, do nothing. log.Printf("[PUBSUB] Starting continuous peer discovery for topic: %s", topicName)
peers := topic.ListPeers()
if len(peers) > 0 { // Initial aggressive discovery phase (10 attempts)
return for attempt := 0; attempt < 10; attempt++ {
peers := topic.ListPeers()
if len(peers) > 0 {
log.Printf("[PUBSUB] Topic %s: Found %d peers in initial discovery", topicName, len(peers))
break
}
log.Printf("[PUBSUB] Topic %s: Initial attempt %d, sending discovery ping", topicName, attempt+1)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
discoveryMsg := []byte("PEER_DISCOVERY_PING")
_ = topic.Publish(ctx, discoveryMsg)
cancel()
delay := time.Duration(100*(attempt+1)) * time.Millisecond
if delay > 2*time.Second {
delay = 2 * time.Second
}
time.Sleep(delay)
} }
// Send a short-lived discovery ping to the topic to announce presence. // Continuous maintenance phase - keep pinging every 15 seconds
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) ticker := time.NewTicker(15 * time.Second)
defer cancel() defer ticker.Stop()
discoveryMsg := []byte("PEER_DISCOVERY_PING") for i := 0; i < 20; i++ { // Run for ~5 minutes total
_ = topic.Publish(ctx, discoveryMsg) <-ticker.C
peers := topic.ListPeers()
// Wait briefly to allow peers to respond via pubsub peer exchange if len(peers) == 0 {
time.Sleep(300 * time.Millisecond) log.Printf("[PUBSUB] Topic %s: No peers, sending maintenance ping", topicName)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
discoveryMsg := []byte("PEER_DISCOVERY_PING")
_ = topic.Publish(ctx, discoveryMsg)
cancel()
}
}
log.Printf("[PUBSUB] Topic %s: Peer discovery maintenance completed", topicName)
} }
// monitorTopicPeers periodically checks topic peer connectivity and stops once peers are found. // monitorTopicPeers periodically checks topic peer connectivity and stops once peers are found.

View File

@ -82,9 +82,6 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa
} }
}() }()
// Force peer discovery for this topic (application-agnostic)
go m.announceTopicInterest(namespacedTopic)
return nil return nil
} }

View File

@ -137,8 +137,8 @@ func (r *RQLiteManager) Start(ctx context.Context) error {
r.cmd = exec.Command("rqlited", args...) r.cmd = exec.Command("rqlited", args...)
// Enable debug logging of RQLite process to help diagnose issues // Enable debug logging of RQLite process to help diagnose issues
// r.cmd.Stdout = os.Stdout r.cmd.Stdout = os.Stdout
// r.cmd.Stderr = os.Stderr r.cmd.Stderr = os.Stderr
if err := r.cmd.Start(); err != nil { if err := r.cmd.Start(); err != nil {
return fmt.Errorf("failed to start RQLite: %w", err) return fmt.Errorf("failed to start RQLite: %w", err)