mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-27 11:04:12 +00:00
Merge branch '0.115.0' of github-debros:DeBrosOfficial/network into 0.115.0
This commit is contained in:
commit
6657c90e36
@ -11,6 +11,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/logging"
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
"github.com/DeBrosOfficial/network/pkg/wireguard"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@ -57,7 +58,7 @@ func (n *Node) registerDNSNode(ctx context.Context) error {
|
||||
`
|
||||
|
||||
db := n.rqliteAdapter.GetSQLDB()
|
||||
_, err = db.ExecContext(ctx, query, nodeID, ipAddress, internalIP, region)
|
||||
_, err = rqlite.SafeExecContext(db, ctx, query, nodeID, ipAddress, internalIP, region)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to register DNS node: %w", err)
|
||||
}
|
||||
@ -112,7 +113,7 @@ func (n *Node) updateDNSHeartbeat(ctx context.Context) error {
|
||||
|
||||
query := `UPDATE dns_nodes SET last_seen = datetime('now'), updated_at = datetime('now') WHERE id = ?`
|
||||
db := n.rqliteAdapter.GetSQLDB()
|
||||
_, err := db.ExecContext(ctx, query, nodeID)
|
||||
_, err := rqlite.SafeExecContext(db, ctx, query, nodeID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update DNS heartbeat: %w", err)
|
||||
}
|
||||
@ -177,7 +178,7 @@ func (n *Node) ensureBaseDNSRecords(ctx context.Context) error {
|
||||
query := `INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by, is_active, created_at, updated_at)
|
||||
VALUES (?, 'A', ?, 300, 'system', 'system', TRUE, datetime('now'), datetime('now'))
|
||||
ON CONFLICT(fqdn, record_type, value) DO NOTHING`
|
||||
if _, err := db.ExecContext(ctx, query, r.fqdn, r.value); err != nil {
|
||||
if _, err := rqlite.SafeExecContext(db, ctx, query, r.fqdn, r.value); err != nil {
|
||||
n.logger.ComponentWarn(logging.ComponentNode, "Failed to ensure DNS record",
|
||||
zap.String("fqdn", r.fqdn), zap.Error(err))
|
||||
}
|
||||
@ -219,7 +220,7 @@ func (n *Node) ensureSOAAndNSRecords(ctx context.Context, baseDomain string) {
|
||||
// Create SOA record
|
||||
soaValue := fmt.Sprintf("ns1.%s. admin.%s. %d 3600 1800 604800 300",
|
||||
baseDomain, baseDomain, time.Now().Unix())
|
||||
if _, err := db.ExecContext(ctx,
|
||||
if _, err := rqlite.SafeExecContext(db, ctx,
|
||||
`INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by, is_active, created_at, updated_at)
|
||||
VALUES (?, 'SOA', ?, 300, 'system', 'system', TRUE, datetime('now'), datetime('now'))
|
||||
ON CONFLICT(fqdn, record_type, value) DO NOTHING`,
|
||||
@ -231,7 +232,7 @@ func (n *Node) ensureSOAAndNSRecords(ctx context.Context, baseDomain string) {
|
||||
// Create NS records (ns1, ns2, ns3)
|
||||
for i := 1; i <= 3; i++ {
|
||||
nsValue := fmt.Sprintf("ns%d.%s.", i, baseDomain)
|
||||
if _, err := db.ExecContext(ctx,
|
||||
if _, err := rqlite.SafeExecContext(db, ctx,
|
||||
`INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by, is_active, created_at, updated_at)
|
||||
VALUES (?, 'NS', ?, 300, 'system', 'system', TRUE, datetime('now'), datetime('now'))
|
||||
ON CONFLICT(fqdn, record_type, value) DO NOTHING`,
|
||||
@ -257,7 +258,7 @@ func (n *Node) claimNameserverSlot(ctx context.Context, domain, ipAddress string
|
||||
|
||||
if err == nil {
|
||||
// Already claimed — update IP if changed
|
||||
if _, err := db.ExecContext(ctx,
|
||||
if _, err := rqlite.SafeExecContext(db, ctx,
|
||||
`UPDATE dns_nameservers SET ip_address = ?, updated_at = datetime('now') WHERE hostname = ? AND domain = ?`,
|
||||
ipAddress, existingHostname, domain,
|
||||
); err != nil {
|
||||
@ -265,7 +266,7 @@ func (n *Node) claimNameserverSlot(ctx context.Context, domain, ipAddress string
|
||||
}
|
||||
// Ensure the glue A record matches
|
||||
nsFQDN := existingHostname + "." + domain + "."
|
||||
if _, err := db.ExecContext(ctx,
|
||||
if _, err := rqlite.SafeExecContext(db, ctx,
|
||||
`INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by, is_active, created_at, updated_at)
|
||||
VALUES (?, 'A', ?, 300, 'system', 'system', TRUE, datetime('now'), datetime('now'))
|
||||
ON CONFLICT(fqdn, record_type, value) DO NOTHING`,
|
||||
@ -278,7 +279,7 @@ func (n *Node) claimNameserverSlot(ctx context.Context, domain, ipAddress string
|
||||
|
||||
// Try to claim an available slot
|
||||
for _, hostname := range []string{"ns1", "ns2", "ns3"} {
|
||||
result, err := db.ExecContext(ctx,
|
||||
result, err := rqlite.SafeExecContext(db, ctx,
|
||||
`INSERT INTO dns_nameservers (hostname, node_id, ip_address, domain) VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(hostname) DO NOTHING`,
|
||||
hostname, nodeID, ipAddress, domain,
|
||||
@ -290,7 +291,7 @@ func (n *Node) claimNameserverSlot(ctx context.Context, domain, ipAddress string
|
||||
if rows > 0 {
|
||||
// Successfully claimed this slot — create glue record
|
||||
nsFQDN := hostname + "." + domain + "."
|
||||
if _, err := db.ExecContext(ctx,
|
||||
if _, err := rqlite.SafeExecContext(db, ctx,
|
||||
`INSERT INTO dns_records (fqdn, record_type, value, ttl, namespace, created_by, is_active, created_at, updated_at)
|
||||
VALUES (?, 'A', ?, 300, 'system', 'system', TRUE, datetime('now'), datetime('now'))
|
||||
ON CONFLICT(fqdn, record_type, value) DO NOTHING`,
|
||||
@ -347,27 +348,27 @@ func (n *Node) cleanupStaleNodeRecords(ctx context.Context) {
|
||||
}
|
||||
|
||||
// Mark node as inactive
|
||||
if _, err := db.ExecContext(ctx, `UPDATE dns_nodes SET status = 'inactive', updated_at = datetime('now') WHERE id = ?`, nodeID); err != nil {
|
||||
if _, err := rqlite.SafeExecContext(db, ctx, `UPDATE dns_nodes SET status = 'inactive', updated_at = datetime('now') WHERE id = ?`, nodeID); err != nil {
|
||||
n.logger.ComponentWarn(logging.ComponentNode, "Failed to mark node inactive", zap.String("node_id", nodeID), zap.Error(err))
|
||||
}
|
||||
|
||||
// Remove the dead node's A records from round-robin
|
||||
for _, f := range fqdnsToClean {
|
||||
if _, err := db.ExecContext(ctx, `DELETE FROM dns_records WHERE fqdn = ? AND record_type = 'A' AND value = ? AND namespace = 'system'`, f, ip); err != nil {
|
||||
if _, err := rqlite.SafeExecContext(db, ctx, `DELETE FROM dns_records WHERE fqdn = ? AND record_type = 'A' AND value = ? AND namespace = 'system'`, f, ip); err != nil {
|
||||
n.logger.ComponentWarn(logging.ComponentNode, "Failed to remove stale DNS record",
|
||||
zap.String("fqdn", f), zap.String("ip", ip), zap.Error(err))
|
||||
}
|
||||
}
|
||||
|
||||
// Release any NS slot held by this dead node
|
||||
if _, err := db.ExecContext(ctx, `DELETE FROM dns_nameservers WHERE node_id = ?`, nodeID); err != nil {
|
||||
if _, err := rqlite.SafeExecContext(db, ctx, `DELETE FROM dns_nameservers WHERE node_id = ?`, nodeID); err != nil {
|
||||
n.logger.ComponentWarn(logging.ComponentNode, "Failed to release NS slot", zap.String("node_id", nodeID), zap.Error(err))
|
||||
}
|
||||
|
||||
// Remove glue records for this node's IP (ns1.domain., ns2.domain., ns3.domain.)
|
||||
for _, ns := range []string{"ns1", "ns2", "ns3"} {
|
||||
nsFQDN := ns + "." + baseDomain + "."
|
||||
if _, err := db.ExecContext(ctx,
|
||||
if _, err := rqlite.SafeExecContext(db, ctx,
|
||||
`DELETE FROM dns_records WHERE fqdn = ? AND record_type = 'A' AND value = ? AND namespace = 'system'`,
|
||||
nsFQDN, ip,
|
||||
); err != nil {
|
||||
@ -484,7 +485,7 @@ func cleanupPrivateIPRecords(ctx context.Context, db *sql.DB, logger *logging.Co
|
||||
AND (value LIKE '10.%' OR value LIKE '172.16.%' OR value LIKE '172.17.%' OR value LIKE '172.18.%'
|
||||
OR value LIKE '172.19.%' OR value LIKE '172.2_.%' OR value LIKE '172.30.%' OR value LIKE '172.31.%'
|
||||
OR value LIKE '192.168.%' OR value = '127.0.0.1')`
|
||||
result, err := db.ExecContext(ctx, query)
|
||||
result, err := rqlite.SafeExecContext(db, ctx, query)
|
||||
if err != nil {
|
||||
logger.ComponentWarn(logging.ComponentNode, "Failed to clean up private IP DNS records", zap.Error(err))
|
||||
return
|
||||
|
||||
@ -12,6 +12,7 @@ import (
|
||||
|
||||
"github.com/DeBrosOfficial/network/pkg/environments/production"
|
||||
"github.com/DeBrosOfficial/network/pkg/logging"
|
||||
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
@ -166,13 +167,13 @@ func (n *Node) ensureWireGuardSelfRegistered(ctx context.Context) {
|
||||
// Clean up stale entries for this public IP with a different node_id.
|
||||
// This prevents ghost peers from previous installs or from the temporary
|
||||
// "node-10.0.0.X" ID that the join handler creates.
|
||||
if _, err := db.ExecContext(ctx,
|
||||
if _, err := rqlite.SafeExecContext(db, ctx,
|
||||
"DELETE FROM wireguard_peers WHERE public_ip = ? AND node_id != ?",
|
||||
publicIP, nodeID); err != nil {
|
||||
n.logger.ComponentWarn(logging.ComponentNode, "Failed to clean stale WG entries", zap.Error(err))
|
||||
}
|
||||
|
||||
_, err = db.ExecContext(ctx,
|
||||
_, err = rqlite.SafeExecContext(db, ctx,
|
||||
"INSERT OR REPLACE INTO wireguard_peers (node_id, wg_ip, public_key, public_ip, wg_port, ipfs_peer_id) VALUES (?, ?, ?, ?, ?, ?)",
|
||||
nodeID, wgIP, localPubKey, publicIP, 51820, ipfsPeerID)
|
||||
if err != nil {
|
||||
|
||||
@ -35,7 +35,14 @@ func (c *client) Query(ctx context.Context, dest any, query string, args ...any)
|
||||
}
|
||||
|
||||
// Exec runs a write statement (INSERT/UPDATE/DELETE).
|
||||
func (c *client) Exec(ctx context.Context, query string, args ...any) (sql.Result, error) {
|
||||
// Includes panic recovery because the gorqlite stdlib driver can panic
|
||||
// with "index out of range" when RQLite is temporarily unavailable.
|
||||
func (c *client) Exec(ctx context.Context, query string, args ...any) (result sql.Result, err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = fmt.Errorf("gorqlite panic (ExecContext): %v", r)
|
||||
}
|
||||
}()
|
||||
return c.db.ExecContext(ctx, query, args...)
|
||||
}
|
||||
|
||||
|
||||
19
pkg/rqlite/safe_exec.go
Normal file
19
pkg/rqlite/safe_exec.go
Normal file
@ -0,0 +1,19 @@
|
||||
package rqlite
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// SafeExecContext wraps db.ExecContext with panic recovery.
|
||||
// The gorqlite stdlib driver can panic with "index out of range" when
|
||||
// RQLite is temporarily unavailable. This converts the panic to an error.
|
||||
func SafeExecContext(db *sql.DB, ctx context.Context, query string, args ...interface{}) (result sql.Result, err error) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
err = fmt.Errorf("gorqlite panic (ExecContext): %v", r)
|
||||
}
|
||||
}()
|
||||
return db.ExecContext(ctx, query, args...)
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user