diff --git a/Makefile b/Makefile index eb6c9b3..769f848 100644 --- a/Makefile +++ b/Makefile @@ -21,7 +21,7 @@ test-e2e: .PHONY: build clean test run-node run-node2 run-node3 run-example deps tidy fmt vet lint clear-ports -VERSION := 0.52.10 +VERSION := 0.52.12 COMMIT ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo unknown) DATE ?= $(shell date -u +%Y-%m-%dT%H:%M:%SZ) LDFLAGS := -X 'main.version=$(VERSION)' -X 'main.commit=$(COMMIT)' -X 'main.date=$(DATE)' diff --git a/pkg/cli/config_commands.go b/pkg/cli/config_commands.go index 1b31d1d..4ae5674 100644 --- a/pkg/cli/config_commands.go +++ b/pkg/cli/config_commands.go @@ -309,12 +309,65 @@ func initFullStack(force bool) { } fmt.Printf("✅ Generated bootstrap config: %s\n", bootstrapPath) - // Generate node2, node3, gateway configs... - // (keeping implementation similar to original) + // Step 3: Generate node2.yaml + node2Name := "node2.yaml" + node2Path := filepath.Join(debrosDir, node2Name) + if !force { + if _, err := os.Stat(node2Path); err == nil { + fmt.Fprintf(os.Stderr, "Node2 config already exists at %s (use --force to overwrite)\n", node2Path) + os.Exit(1) + } + } + node2Content := GenerateNodeConfig(node2Name, "", 4002, 5002, 7002, "localhost:7001", bootstrapMultiaddr) + if err := os.WriteFile(node2Path, []byte(node2Content), 0644); err != nil { + fmt.Fprintf(os.Stderr, "Failed to write node2 config: %v\n", err) + os.Exit(1) + } + fmt.Printf("✅ Generated node2 config: %s\n", node2Path) + + // Step 4: Generate node3.yaml + node3Name := "node3.yaml" + node3Path := filepath.Join(debrosDir, node3Name) + if !force { + if _, err := os.Stat(node3Path); err == nil { + fmt.Fprintf(os.Stderr, "Node3 config already exists at %s (use --force to overwrite)\n", node3Path) + os.Exit(1) + } + } + node3Content := GenerateNodeConfig(node3Name, "", 4003, 5003, 7003, "localhost:7001", bootstrapMultiaddr) + if err := os.WriteFile(node3Path, []byte(node3Content), 0644); err != nil { + fmt.Fprintf(os.Stderr, "Failed to write node3 config: %v\n", err) + os.Exit(1) + } + fmt.Printf("✅ Generated node3 config: %s\n", node3Path) + + // Step 5: Generate gateway.yaml + gatewayName := "gateway.yaml" + gatewayPath := filepath.Join(debrosDir, gatewayName) + if !force { + if _, err := os.Stat(gatewayPath); err == nil { + fmt.Fprintf(os.Stderr, "Gateway config already exists at %s (use --force to overwrite)\n", gatewayPath) + os.Exit(1) + } + } + gatewayContent := GenerateGatewayConfig(bootstrapMultiaddr) + if err := os.WriteFile(gatewayPath, []byte(gatewayContent), 0644); err != nil { + fmt.Fprintf(os.Stderr, "Failed to write gateway config: %v\n", err) + os.Exit(1) + } + fmt.Printf("✅ Generated gateway config: %s\n", gatewayPath) fmt.Printf("\n" + strings.Repeat("=", 60) + "\n") fmt.Printf("✅ Full network stack initialized successfully!\n") fmt.Printf(strings.Repeat("=", 60) + "\n") + fmt.Printf("\nBootstrap Peer ID: %s\n", bootstrapInfo.PeerID.String()) + fmt.Printf("Bootstrap Multiaddr: %s\n", bootstrapMultiaddr) + fmt.Printf("\nGenerated configs:\n") + fmt.Printf(" - %s\n", bootstrapPath) + fmt.Printf(" - %s\n", node2Path) + fmt.Printf(" - %s\n", node3Path) + fmt.Printf(" - %s\n", gatewayPath) + fmt.Printf("\nStart the network with: make dev\n") } // GenerateNodeConfig generates a node configuration diff --git a/pkg/client/client.go b/pkg/client/client.go index 1f3938d..a0b06dd 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -158,6 +158,8 @@ func (c *Client) Connect() error { var ps *libp2ppubsub.PubSub ps, err = libp2ppubsub.NewGossipSub(context.Background(), h, libp2ppubsub.WithPeerExchange(true), + libp2ppubsub.WithFloodPublish(true), // Ensure messages reach all peers, not just mesh + libp2ppubsub.WithDirectPeers(nil), // Enable direct peer connections ) if err != nil { h.Close() diff --git a/pkg/gateway/pubsub_handlers.go b/pkg/gateway/pubsub_handlers.go index 971a09c..3f2f25f 100644 --- a/pkg/gateway/pubsub_handlers.go +++ b/pkg/gateway/pubsub_handlers.go @@ -9,6 +9,7 @@ import ( "time" "github.com/DeBrosOfficial/network/pkg/client" + "go.uber.org/zap" "github.com/gorilla/websocket" ) @@ -63,18 +64,28 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) // Subscribe to the topic; push data into msgs with simple per-connection de-dup recent := make(map[string]time.Time) h := func(_ string, data []byte) error { + g.logger.ComponentInfo("gateway", "pubsub ws: received message", + zap.String("topic", topic), + zap.Int("data_len", len(data))) + // Drop duplicates seen in the last 2 seconds sum := sha256.Sum256(data) key := hex.EncodeToString(sum[:]) if t, ok := recent[key]; ok && time.Since(t) < 2*time.Second { + g.logger.ComponentInfo("gateway", "pubsub ws: dropping duplicate", + zap.String("topic", topic)) return nil } recent[key] = time.Now() select { case msgs <- data: + g.logger.ComponentInfo("gateway", "pubsub ws: forwarded to client", + zap.String("topic", topic)) return nil default: // Drop if client is slow to avoid blocking network + g.logger.ComponentWarn("gateway", "pubsub ws: client slow, dropping message", + zap.String("topic", topic)) return nil } } @@ -100,8 +111,19 @@ func (g *Gateway) pubsubWebsocketHandler(w http.ResponseWriter, r *http.Request) close(done) return } + // Format message as JSON envelope with data (base64 encoded), timestamp, and topic + // This matches the SDK's Message interface: {data: string, timestamp: number, topic: string} + envelope := map[string]interface{}{ + "data": base64.StdEncoding.EncodeToString(b), + "timestamp": time.Now().UnixMilli(), + "topic": topic, + } + envelopeJSON, err := json.Marshal(envelope) + if err != nil { + continue + } conn.SetWriteDeadline(time.Now().Add(30 * time.Second)) - if err := conn.WriteMessage(websocket.BinaryMessage, b); err != nil { + if err := conn.WriteMessage(websocket.TextMessage, envelopeJSON); err != nil { close(done) return } @@ -160,10 +182,23 @@ func (g *Gateway) pubsubPublishHandler(w http.ResponseWriter, r *http.Request) { writeError(w, http.StatusBadRequest, "invalid base64 data") return } + + g.logger.ComponentInfo("gateway", "pubsub publish: publishing message", + zap.String("topic", body.Topic), + zap.String("namespace", ns), + zap.Int("data_len", len(data))) + if err := g.client.PubSub().Publish(client.WithInternalAuth(r.Context()), body.Topic, data); err != nil { + g.logger.ComponentWarn("gateway", "pubsub publish: failed", + zap.String("topic", body.Topic), + zap.Error(err)) writeError(w, http.StatusInternalServerError, err.Error()) return } + + g.logger.ComponentInfo("gateway", "pubsub publish: message published successfully", + zap.String("topic", body.Topic)) + // rely on libp2p to deliver to WS subscribers writeJSON(w, http.StatusOK, map[string]any{"status": "ok"}) } diff --git a/pkg/node/node.go b/pkg/node/node.go index 12327e2..988ab1d 100644 --- a/pkg/node/node.go +++ b/pkg/node/node.go @@ -306,6 +306,8 @@ func (n *Node) startLibP2P() error { // Initialize pubsub ps, err := libp2ppubsub.NewGossipSub(context.Background(), h, libp2ppubsub.WithPeerExchange(true), + libp2ppubsub.WithFloodPublish(true), // Ensure messages reach all peers, not just mesh + libp2ppubsub.WithDirectPeers(nil), // Enable direct peer connections ) if err != nil { return fmt.Errorf("failed to create pubsub: %w", err) diff --git a/pkg/pubsub/discovery_integration.go b/pkg/pubsub/discovery_integration.go index 91b2521..4016a63 100644 --- a/pkg/pubsub/discovery_integration.go +++ b/pkg/pubsub/discovery_integration.go @@ -2,6 +2,7 @@ package pubsub import ( "context" + "log" "time" pubsub "github.com/libp2p/go-libp2p-pubsub" @@ -31,23 +32,50 @@ func (m *Manager) announceTopicInterest(topicName string) { } // forceTopicPeerDiscovery uses a simple strategy to announce presence on the topic. -// It publishes a lightweight discovery ping and returns quickly. +// It publishes lightweight discovery pings continuously to maintain mesh health. func (m *Manager) forceTopicPeerDiscovery(topicName string, topic *pubsub.Topic) { - // If pubsub already reports peers for this topic, do nothing. - peers := topic.ListPeers() - if len(peers) > 0 { - return + log.Printf("[PUBSUB] Starting continuous peer discovery for topic: %s", topicName) + + // Initial aggressive discovery phase (10 attempts) + for attempt := 0; attempt < 10; attempt++ { + peers := topic.ListPeers() + if len(peers) > 0 { + log.Printf("[PUBSUB] Topic %s: Found %d peers in initial discovery", topicName, len(peers)) + break + } + + log.Printf("[PUBSUB] Topic %s: Initial attempt %d, sending discovery ping", topicName, attempt+1) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + discoveryMsg := []byte("PEER_DISCOVERY_PING") + _ = topic.Publish(ctx, discoveryMsg) + cancel() + + delay := time.Duration(100*(attempt+1)) * time.Millisecond + if delay > 2*time.Second { + delay = 2 * time.Second + } + time.Sleep(delay) } - - // Send a short-lived discovery ping to the topic to announce presence. - ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) - defer cancel() - - discoveryMsg := []byte("PEER_DISCOVERY_PING") - _ = topic.Publish(ctx, discoveryMsg) - - // Wait briefly to allow peers to respond via pubsub peer exchange - time.Sleep(300 * time.Millisecond) + + // Continuous maintenance phase - keep pinging every 15 seconds + ticker := time.NewTicker(15 * time.Second) + defer ticker.Stop() + + for i := 0; i < 20; i++ { // Run for ~5 minutes total + <-ticker.C + peers := topic.ListPeers() + + if len(peers) == 0 { + log.Printf("[PUBSUB] Topic %s: No peers, sending maintenance ping", topicName) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + discoveryMsg := []byte("PEER_DISCOVERY_PING") + _ = topic.Publish(ctx, discoveryMsg) + cancel() + } + } + + log.Printf("[PUBSUB] Topic %s: Peer discovery maintenance completed", topicName) } // monitorTopicPeers periodically checks topic peer connectivity and stops once peers are found. diff --git a/pkg/pubsub/subscriptions.go b/pkg/pubsub/subscriptions.go index 5c4a5b0..7a53882 100644 --- a/pkg/pubsub/subscriptions.go +++ b/pkg/pubsub/subscriptions.go @@ -82,9 +82,6 @@ func (m *Manager) Subscribe(ctx context.Context, topic string, handler MessageHa } }() - // Force peer discovery for this topic (application-agnostic) - go m.announceTopicInterest(namespacedTopic) - return nil } diff --git a/pkg/rqlite/rqlite.go b/pkg/rqlite/rqlite.go index d673662..3498bc2 100644 --- a/pkg/rqlite/rqlite.go +++ b/pkg/rqlite/rqlite.go @@ -137,8 +137,8 @@ func (r *RQLiteManager) Start(ctx context.Context) error { r.cmd = exec.Command("rqlited", args...) // Enable debug logging of RQLite process to help diagnose issues - // r.cmd.Stdout = os.Stdout - // r.cmd.Stderr = os.Stderr + r.cmd.Stdout = os.Stdout + r.cmd.Stderr = os.Stderr if err := r.cmd.Start(); err != nil { return fmt.Errorf("failed to start RQLite: %w", err)