diff --git a/examples/basic_usage.go b/examples/basic_usage.go deleted file mode 100644 index e95e78d..0000000 --- a/examples/basic_usage.go +++ /dev/null @@ -1,151 +0,0 @@ -package main - -import ( - "context" - "log" - "time" - - "github.com/DeBrosOfficial/network/pkg/client" -) - -func main() { - // Create client configuration - config := client.DefaultClientConfig("example_app") - config.BootstrapPeers = []string{ - "/ip4/127.0.0.1/tcp/4001/p2p/QmBootstrap1", - } - - // Create network client - networkClient, err := client.NewClient(config) - if err != nil { - log.Fatalf("Failed to create network client: %v", err) - } - - // Connect to network - if err := networkClient.Connect(); err != nil { - log.Fatalf("Failed to connect to network: %v", err) - } - defer networkClient.Disconnect() - - log.Printf("Connected to network successfully!") - - // Example: Database operations - demonstrateDatabase(networkClient) - - // Example: Pub/Sub messaging - demonstratePubSub(networkClient) - - // Example: Network information - demonstrateNetworkInfo(networkClient) - - log.Printf("Example completed successfully!") -} - -func demonstrateDatabase(client client.NetworkClient) { - ctx := context.Background() - db := client.Database() - - log.Printf("=== Database Operations ===") - - // Create a table - schema := ` - CREATE TABLE IF NOT EXISTS messages ( - id INTEGER PRIMARY KEY, - content TEXT NOT NULL, - timestamp DATETIME DEFAULT CURRENT_TIMESTAMP - ) - ` - if err := db.CreateTable(ctx, schema); err != nil { - log.Printf("Error creating table: %v", err) - return - } - log.Printf("Table created successfully") - - // Insert some data - insertSQL := "INSERT INTO messages (content) VALUES (?)" - result, err := db.Query(ctx, insertSQL, "Hello, distributed world!") - if err != nil { - log.Printf("Error inserting data: %v", err) - return - } - log.Printf("Data inserted, result: %+v", result) - - // Query data - selectSQL := "SELECT * FROM messages" - result, err = db.Query(ctx, selectSQL) - if err != nil { - log.Printf("Error querying data: %v", err) - return - } - log.Printf("Query result: %+v", result) -} - -func demonstratePubSub(client client.NetworkClient) { - ctx := context.Background() - pubsub := client.PubSub() - - log.Printf("=== Pub/Sub Operations ===") - - // Subscribe to a topic - topic := "notifications" - handler := func(topic string, data []byte) error { - log.Printf("Received message on topic '%s': %s", topic, string(data)) - return nil - } - - if err := pubsub.Subscribe(ctx, topic, handler); err != nil { - log.Printf("Error subscribing: %v", err) - return - } - log.Printf("Subscribed to topic: %s", topic) - - // Publish a message - message := []byte("Hello from pub/sub!") - if err := pubsub.Publish(ctx, topic, message); err != nil { - log.Printf("Error publishing: %v", err) - return - } - log.Printf("Message published") - - // Wait a bit for message delivery - time.Sleep(time.Millisecond * 100) - - // List topics - topics, err := pubsub.ListTopics(ctx) - if err != nil { - log.Printf("Error listing topics: %v", err) - return - } - log.Printf("Subscribed topics: %v", topics) -} - -func demonstrateNetworkInfo(client client.NetworkClient) { - ctx := context.Background() - network := client.Network() - - log.Printf("=== Network Information ===") - - // Get network status - status, err := network.GetStatus(ctx) - if err != nil { - log.Printf("Error getting status: %v", err) - return - } - log.Printf("Network status: %+v", status) - - // Get peers - peers, err := network.GetPeers(ctx) - if err != nil { - log.Printf("Error getting peers: %v", err) - return - } - log.Printf("Connected peers: %+v", peers) - - // Get client health - health, err := client.Health() - if err != nil { - log.Printf("Error getting health: %v", err) - return - } - log.Printf("Client health: %+v", health) -} diff --git a/examples/sdk-typescript/README.md b/examples/sdk-typescript/README.md deleted file mode 100644 index 1e78797..0000000 --- a/examples/sdk-typescript/README.md +++ /dev/null @@ -1,23 +0,0 @@ -# DeBros Gateway TypeScript SDK (Minimal Example) - -Minimal, dependency-light wrapper around the HTTP Gateway. - -Usage: - -```bash -npm i -export GATEWAY_BASE_URL=http://127.0.0.1:6001 -export GATEWAY_API_KEY=your_api_key -``` - -```ts -import { GatewayClient } from './src/client'; - -const c = new GatewayClient(process.env.GATEWAY_BASE_URL!, process.env.GATEWAY_API_KEY!); -await c.createTable('CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)'); -await c.transaction([ - 'INSERT INTO users (id,name) VALUES (1,\'Alice\')' -]); -const res = await c.query('SELECT name FROM users WHERE id = ?', [1]); -console.log(res.rows); -``` diff --git a/examples/sdk-typescript/package.json b/examples/sdk-typescript/package.json deleted file mode 100644 index 3aac5f0..0000000 --- a/examples/sdk-typescript/package.json +++ /dev/null @@ -1,17 +0,0 @@ -{ - "name": "debros-gateway-sdk", - "version": "0.1.0", - "private": true, - "type": "module", - "main": "dist/index.js", - "types": "dist/index.d.ts", - "scripts": { - "build": "tsc -p tsconfig.json" - }, - "dependencies": { - "isomorphic-ws": "^5.0.0" - }, - "devDependencies": { - "typescript": "^5.5.4" - } -} diff --git a/examples/sdk-typescript/src/client.ts b/examples/sdk-typescript/src/client.ts deleted file mode 100644 index bf80606..0000000 --- a/examples/sdk-typescript/src/client.ts +++ /dev/null @@ -1,154 +0,0 @@ -import WebSocket from "isomorphic-ws"; - -export class GatewayClient { - constructor( - private baseUrl: string, - private apiKey: string, - private http = fetch - ) {} - - private headers(json = true): Record { - const h: Record = { "X-API-Key": this.apiKey }; - if (json) h["Content-Type"] = "application/json"; - return h; - } - - // Database - async createTable(schema: string): Promise { - const r = await this.http(`${this.baseUrl}/v1/rqlite/create-table`, { - method: "POST", - headers: this.headers(), - body: JSON.stringify({ schema }), - }); - if (!r.ok) throw new Error(`createTable failed: ${r.status}`); - } - - async dropTable(table: string): Promise { - const r = await this.http(`${this.baseUrl}/v1/rqlite/drop-table`, { - method: "POST", - headers: this.headers(), - body: JSON.stringify({ table }), - }); - if (!r.ok) throw new Error(`dropTable failed: ${r.status}`); - } - - async query(sql: string, args: any[] = []): Promise<{ rows: T[] }> { - const r = await this.http(`${this.baseUrl}/v1/rqlite/query`, { - method: "POST", - headers: this.headers(), - body: JSON.stringify({ sql, args }), - }); - if (!r.ok) throw new Error(`query failed: ${r.status}`); - return r.json(); - } - - async transaction(statements: string[]): Promise { - const r = await this.http(`${this.baseUrl}/v1/rqlite/transaction`, { - method: "POST", - headers: this.headers(), - body: JSON.stringify({ statements }), - }); - if (!r.ok) throw new Error(`transaction failed: ${r.status}`); - } - - async schema(): Promise { - const r = await this.http(`${this.baseUrl}/v1/rqlite/schema`, { - headers: this.headers(false), - }); - if (!r.ok) throw new Error(`schema failed: ${r.status}`); - return r.json(); - } - - // Storage - async put(key: string, value: Uint8Array | string): Promise { - const body = - typeof value === "string" ? new TextEncoder().encode(value) : value; - const r = await this.http( - `${this.baseUrl}/v1/storage/put?key=${encodeURIComponent(key)}`, - { - method: "POST", - headers: { "X-API-Key": this.apiKey }, - body, - } - ); - if (!r.ok) throw new Error(`put failed: ${r.status}`); - } - - async get(key: string): Promise { - const r = await this.http( - `${this.baseUrl}/v1/storage/get?key=${encodeURIComponent(key)}`, - { - headers: { "X-API-Key": this.apiKey }, - } - ); - if (!r.ok) throw new Error(`get failed: ${r.status}`); - const buf = new Uint8Array(await r.arrayBuffer()); - return buf; - } - - async exists(key: string): Promise { - const r = await this.http( - `${this.baseUrl}/v1/storage/exists?key=${encodeURIComponent(key)}`, - { - headers: this.headers(false), - } - ); - if (!r.ok) throw new Error(`exists failed: ${r.status}`); - const j = await r.json(); - return !!j.exists; - } - - async list(prefix = ""): Promise { - const r = await this.http( - `${this.baseUrl}/v1/storage/list?prefix=${encodeURIComponent(prefix)}`, - { - headers: this.headers(false), - } - ); - if (!r.ok) throw new Error(`list failed: ${r.status}`); - const j = await r.json(); - return j.keys || []; - } - - async delete(key: string): Promise { - const r = await this.http(`${this.baseUrl}/v1/storage/delete`, { - method: "POST", - headers: this.headers(), - body: JSON.stringify({ key }), - }); - if (!r.ok) throw new Error(`delete failed: ${r.status}`); - } - - // PubSub (minimal) - subscribe( - topic: string, - onMessage: (data: Uint8Array) => void - ): { close: () => void } { - const url = new URL(`${this.baseUrl.replace(/^http/, "ws")}/v1/pubsub/ws`); - url.searchParams.set("topic", topic); - const ws = new WebSocket(url.toString(), { - headers: { "X-API-Key": this.apiKey }, - } as any); - ws.binaryType = "arraybuffer"; - ws.onmessage = (ev: any) => { - const data = - ev.data instanceof ArrayBuffer - ? new Uint8Array(ev.data) - : new TextEncoder().encode(String(ev.data)); - onMessage(data); - }; - return { close: () => ws.close() }; - } - - async publish(topic: string, data: Uint8Array | string): Promise { - const bytes = - typeof data === "string" ? new TextEncoder().encode(data) : data; - const b64 = Buffer.from(bytes).toString("base64"); - const r = await this.http(`${this.baseUrl}/v1/pubsub/publish`, { - method: "POST", - headers: this.headers(), - body: JSON.stringify({ topic, data_base64: b64 }), - }); - if (!r.ok) throw new Error(`publish failed: ${r.status}`); - } -} diff --git a/examples/sdk-typescript/tsconfig.json b/examples/sdk-typescript/tsconfig.json deleted file mode 100644 index 80e7492..0000000 --- a/examples/sdk-typescript/tsconfig.json +++ /dev/null @@ -1,12 +0,0 @@ -{ - "compilerOptions": { - "target": "ES2020", - "module": "ES2020", - "declaration": true, - "outDir": "dist", - "rootDir": "src", - "strict": true, - "moduleResolution": "Node" - }, - "include": ["src/**/*"] -} diff --git a/pkg/discovery/discovery.go b/pkg/discovery/discovery.go index d022828..ef8da35 100644 --- a/pkg/discovery/discovery.go +++ b/pkg/discovery/discovery.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "io" + "strconv" "time" "github.com/libp2p/go-libp2p/core/host" @@ -114,9 +115,46 @@ func (d *Manager) handlePeerExchangeStream(s network.Stream) { continue } + // Filter addresses to only include configured listen addresses, not ephemeral ports + // Ephemeral ports are typically > 32768, so we filter those out + filteredAddrs := make([]multiaddr.Multiaddr, 0) + for _, addr := range addrs { + // Extract TCP port from multiaddr + port, err := addr.ValueForProtocol(multiaddr.P_TCP) + if err == nil { + portNum, err := strconv.Atoi(port) + if err == nil { + // Only include ports that are reasonable (not ephemeral ports > 32768) + // Common LibP2P ports are typically < 10000 + if portNum > 0 && portNum <= 32767 { + filteredAddrs = append(filteredAddrs, addr) + } else { + d.logger.Debug("Filtering out ephemeral port address", + zap.String("peer_id", pid.String()[:8]+"..."), + zap.String("addr", addr.String()), + zap.Int("port", portNum)) + } + } else { + // If we can't parse port, include it anyway (might be non-TCP) + filteredAddrs = append(filteredAddrs, addr) + } + } else { + // If no TCP port found, include it anyway (might be non-TCP) + filteredAddrs = append(filteredAddrs, addr) + } + } + + // If no addresses remain after filtering, skip this peer + if len(filteredAddrs) == 0 { + d.logger.Debug("No valid addresses after filtering ephemeral ports", + zap.String("peer_id", pid.String()[:8]+"..."), + zap.Int("original_count", len(addrs))) + continue + } + // Convert addresses to strings - addrStrs := make([]string, len(addrs)) - for i, addr := range addrs { + addrStrs := make([]string, len(filteredAddrs)) + for i, addr := range filteredAddrs { addrStrs[i] = addr.String() }