mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 18:36:57 +00:00
Replaces plaintext password-based SSH authentication (sshpass) across the entire Go CLI with wallet-derived ed25519 keys via RootWallet. - Add `rw vault ssh agent-load` command to RootWallet CLI for SSH agent forwarding in push fanout - Create wallet.go bridge: PrepareNodeKeys resolves keys from `rw vault ssh get --priv`, writes temp PEMs (0600), zero-overwrites on cleanup - Remove Password field from Node struct, update config parser to new 3-field format (env|user@host|role) - Remove all sshpass branches from inspector/ssh.go and remotessh/ssh.go, require SSHKey on all SSH paths - Add WithAgentForward() option to RunSSHStreaming for hub fanout - Add PrepareNodeKeys + defer cleanup to all 7 entry points: inspect, monitor, push, upgrade, clean, recover, install - Update push fanout to use SSH agent forwarding instead of sshpass on hub - Delete install/ssh.go duplicate, replace with remotessh calls - Create nodes.conf from remote-nodes.conf (topology only, no secrets) - Update all config defaults and help text from remote-nodes.conf to nodes.conf - Use StrictHostKeyChecking=accept-new consistently everywhere
262 lines
7.2 KiB
Go
262 lines
7.2 KiB
Go
package push
|
|
|
|
import (
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
|
|
"github.com/DeBrosOfficial/network/pkg/cli/remotessh"
|
|
"github.com/DeBrosOfficial/network/pkg/inspector"
|
|
)
|
|
|
|
// Flags carries the options accepted by the push command.
type Flags struct {
	// Env is the target environment (devnet, testnet).
	Env string
	// Node is an optional single node IP to push to.
	Node string
	// Direct requests a sequential upload to each node (no fanout).
	Direct bool
}
|
|
|
|
// Handle is the entry point for the push command.
|
|
func Handle(args []string) {
|
|
flags, err := parseFlags(args)
|
|
if err != nil {
|
|
if err == flag.ErrHelp {
|
|
return
|
|
}
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
|
|
if err := execute(flags); err != nil {
|
|
fmt.Fprintf(os.Stderr, "Error: %v\n", err)
|
|
os.Exit(1)
|
|
}
|
|
}
|
|
|
|
func parseFlags(args []string) (*Flags, error) {
|
|
fs := flag.NewFlagSet("push", flag.ContinueOnError)
|
|
fs.SetOutput(os.Stderr)
|
|
|
|
flags := &Flags{}
|
|
fs.StringVar(&flags.Env, "env", "", "Target environment (devnet, testnet) [required]")
|
|
fs.StringVar(&flags.Node, "node", "", "Push to a single node IP only")
|
|
fs.BoolVar(&flags.Direct, "direct", false, "Upload directly to each node (no hub fanout)")
|
|
|
|
if err := fs.Parse(args); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if flags.Env == "" {
|
|
return nil, fmt.Errorf("--env is required\nUsage: orama node push --env <devnet|testnet>")
|
|
}
|
|
|
|
return flags, nil
|
|
}
|
|
|
|
func execute(flags *Flags) error {
|
|
// Find archive
|
|
archivePath := findNewestArchive()
|
|
if archivePath == "" {
|
|
return fmt.Errorf("no binary archive found in /tmp/ (run `orama build` first)")
|
|
}
|
|
|
|
info, _ := os.Stat(archivePath)
|
|
fmt.Printf("Archive: %s (%s)\n", filepath.Base(archivePath), formatBytes(info.Size()))
|
|
|
|
// Resolve nodes
|
|
nodes, err := remotessh.LoadEnvNodes(flags.Env)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Prepare wallet-derived SSH keys
|
|
cleanup, err := remotessh.PrepareNodeKeys(nodes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer cleanup()
|
|
|
|
// Filter to single node if specified
|
|
if flags.Node != "" {
|
|
nodes = remotessh.FilterByIP(nodes, flags.Node)
|
|
if len(nodes) == 0 {
|
|
return fmt.Errorf("node %s not found in %s environment", flags.Node, flags.Env)
|
|
}
|
|
}
|
|
|
|
fmt.Printf("Environment: %s (%d nodes)\n\n", flags.Env, len(nodes))
|
|
|
|
if flags.Direct || len(nodes) == 1 {
|
|
return pushDirect(archivePath, nodes)
|
|
}
|
|
|
|
// Load keys into ssh-agent for fanout forwarding
|
|
if err := remotessh.LoadAgentKeys(nodes); err != nil {
|
|
return fmt.Errorf("load agent keys for fanout: %w", err)
|
|
}
|
|
|
|
return pushFanout(archivePath, nodes)
|
|
}
|
|
|
|
// pushDirect uploads the archive to each node sequentially.
|
|
func pushDirect(archivePath string, nodes []inspector.Node) error {
|
|
remotePath := "/tmp/" + filepath.Base(archivePath)
|
|
|
|
for i, node := range nodes {
|
|
fmt.Printf("[%d/%d] Pushing to %s...\n", i+1, len(nodes), node.Host)
|
|
|
|
if err := remotessh.UploadFile(node, archivePath, remotePath); err != nil {
|
|
return fmt.Errorf("upload to %s failed: %w", node.Host, err)
|
|
}
|
|
|
|
if err := extractOnNode(node, remotePath); err != nil {
|
|
return fmt.Errorf("extract on %s failed: %w", node.Host, err)
|
|
}
|
|
|
|
fmt.Printf(" ✓ %s done\n\n", node.Host)
|
|
}
|
|
|
|
fmt.Printf("✓ Push complete (%d nodes)\n", len(nodes))
|
|
return nil
|
|
}
|
|
|
|
// pushFanout uploads to a hub node, then fans out to all others via agent forwarding.
|
|
func pushFanout(archivePath string, nodes []inspector.Node) error {
|
|
hub := remotessh.PickHubNode(nodes)
|
|
remotePath := "/tmp/" + filepath.Base(archivePath)
|
|
|
|
// Step 1: Upload to hub
|
|
fmt.Printf("[hub] Uploading to %s...\n", hub.Host)
|
|
if err := remotessh.UploadFile(hub, archivePath, remotePath); err != nil {
|
|
return fmt.Errorf("upload to hub %s failed: %w", hub.Host, err)
|
|
}
|
|
|
|
if err := extractOnNode(hub, remotePath); err != nil {
|
|
return fmt.Errorf("extract on hub %s failed: %w", hub.Host, err)
|
|
}
|
|
fmt.Printf(" ✓ hub %s done\n\n", hub.Host)
|
|
|
|
// Step 2: Fan out from hub to remaining nodes in parallel (via agent forwarding)
|
|
remaining := make([]inspector.Node, 0, len(nodes)-1)
|
|
for _, n := range nodes {
|
|
if n.Host != hub.Host {
|
|
remaining = append(remaining, n)
|
|
}
|
|
}
|
|
|
|
if len(remaining) == 0 {
|
|
fmt.Printf("✓ Push complete (1 node)\n")
|
|
return nil
|
|
}
|
|
|
|
fmt.Printf("[fanout] Distributing from %s to %d nodes...\n", hub.Host, len(remaining))
|
|
|
|
var wg sync.WaitGroup
|
|
errors := make([]error, len(remaining))
|
|
|
|
for i, target := range remaining {
|
|
wg.Add(1)
|
|
go func(idx int, target inspector.Node) {
|
|
defer wg.Done()
|
|
|
|
// SCP from hub to target (agent forwarding serves the key)
|
|
scpCmd := fmt.Sprintf("scp -o StrictHostKeyChecking=accept-new -o ConnectTimeout=10 %s %s@%s:%s",
|
|
remotePath, target.User, target.Host, remotePath)
|
|
|
|
if err := remotessh.RunSSHStreaming(hub, scpCmd, remotessh.WithAgentForward()); err != nil {
|
|
errors[idx] = fmt.Errorf("fanout to %s failed: %w", target.Host, err)
|
|
return
|
|
}
|
|
|
|
if err := extractOnNodeVia(hub, target, remotePath); err != nil {
|
|
errors[idx] = fmt.Errorf("extract on %s failed: %w", target.Host, err)
|
|
return
|
|
}
|
|
|
|
fmt.Printf(" ✓ %s done\n", target.Host)
|
|
}(i, target)
|
|
}
|
|
|
|
wg.Wait()
|
|
|
|
// Check for errors
|
|
var failed []string
|
|
for i, err := range errors {
|
|
if err != nil {
|
|
fmt.Fprintf(os.Stderr, " ✗ %s: %v\n", remaining[i].Host, err)
|
|
failed = append(failed, remaining[i].Host)
|
|
}
|
|
}
|
|
|
|
if len(failed) > 0 {
|
|
return fmt.Errorf("push failed on %d node(s): %s", len(failed), strings.Join(failed, ", "))
|
|
}
|
|
|
|
fmt.Printf("\n✓ Push complete (%d nodes)\n", len(nodes))
|
|
return nil
|
|
}
|
|
|
|
// extractOnNode extracts the archive on a remote node.
|
|
func extractOnNode(node inspector.Node, remotePath string) error {
|
|
sudo := remotessh.SudoPrefix(node)
|
|
cmd := fmt.Sprintf("%smkdir -p /opt/orama && %star xzf %s -C /opt/orama && %srm -f %s",
|
|
sudo, sudo, remotePath, sudo, remotePath)
|
|
return remotessh.RunSSHStreaming(node, cmd)
|
|
}
|
|
|
|
// extractOnNodeVia extracts the archive on a target node by SSHing through the hub.
|
|
// Uses agent forwarding so the hub can authenticate to the target.
|
|
func extractOnNodeVia(hub, target inspector.Node, remotePath string) error {
|
|
sudo := remotessh.SudoPrefix(target)
|
|
extractCmd := fmt.Sprintf("%smkdir -p /opt/orama && %star xzf %s -C /opt/orama && %srm -f %s",
|
|
sudo, sudo, remotePath, sudo, remotePath)
|
|
|
|
// SSH from hub to target to extract (agent forwarding serves the key)
|
|
sshCmd := fmt.Sprintf("ssh -o StrictHostKeyChecking=accept-new -o ConnectTimeout=10 %s@%s '%s'",
|
|
target.User, target.Host, extractCmd)
|
|
|
|
return remotessh.RunSSHStreaming(hub, sshCmd, remotessh.WithAgentForward())
|
|
}
|
|
|
|
// findNewestArchive finds the newest binary archive in /tmp/.
// It returns the archive's full path, or "" when /tmp cannot be read or
// holds no matching archive.
func findNewestArchive() string {
	return newestArchiveIn("/tmp")
}

// newestArchiveIn returns the path of the most recently modified archive in
// dir, or "" if dir cannot be read or contains none.
//
// An archive is a non-directory entry whose name starts with "orama-",
// contains "-linux-" and ends with ".tar.gz". Entries whose metadata cannot
// be read are skipped.
func newestArchiveIn(dir string) string {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return ""
	}

	var (
		best     string
		bestInfo os.FileInfo
	)
	for _, entry := range entries {
		// A directory that happens to match the name pattern is not an archive.
		if entry.IsDir() {
			continue
		}

		name := entry.Name()
		if !strings.HasPrefix(name, "orama-") || !strings.Contains(name, "-linux-") || !strings.HasSuffix(name, ".tar.gz") {
			continue
		}

		info, err := entry.Info()
		if err != nil {
			continue
		}

		// Compare full timestamps so archives built within the same second
		// are still ordered, and accept the first candidate unconditionally
		// so very old mtimes are not ignored.
		if best == "" || info.ModTime().After(bestInfo.ModTime()) {
			best = filepath.Join(dir, name)
			bestInfo = info
		}
	}

	return best
}
|
|
|
|
// formatBytes renders a byte count for display using 1024-based steps.
// Values below 1024 are printed as whole bytes ("512 B"); larger values are
// printed with one decimal and a K/M/G/T/P/E prefix ("1.5 KB").
func formatBytes(b int64) string {
	const (
		unit     = 1024
		prefixes = "KMGTPE"
	)

	if b < unit {
		return fmt.Sprintf("%d B", b)
	}

	// Grow the divisor until the quotient drops below one unit; tier then
	// indexes the matching prefix.
	divisor := int64(unit)
	tier := 0
	for b/divisor >= unit {
		divisor *= unit
		tier++
	}

	scaled := float64(b) / float64(divisor)
	return fmt.Sprintf("%.1f %cB", scaled, prefixes[tier])
}
|