mirror of
https://github.com/DeBrosOfficial/network.git
synced 2026-01-30 12:43:04 +00:00
274 lines
7.9 KiB
Go
274 lines
7.9 KiB
Go
package deployments
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/DeBrosOfficial/network/pkg/client"
|
|
"github.com/DeBrosOfficial/network/pkg/rqlite"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// ReplicaManager manages deployment replicas across nodes
|
|
type ReplicaManager struct {
|
|
db rqlite.Client
|
|
homeNodeMgr *HomeNodeManager
|
|
portAllocator *PortAllocator
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// NewReplicaManager creates a new replica manager
|
|
func NewReplicaManager(db rqlite.Client, homeNodeMgr *HomeNodeManager, portAllocator *PortAllocator, logger *zap.Logger) *ReplicaManager {
|
|
return &ReplicaManager{
|
|
db: db,
|
|
homeNodeMgr: homeNodeMgr,
|
|
portAllocator: portAllocator,
|
|
logger: logger,
|
|
}
|
|
}
|
|
|
|
// SelectReplicaNodes picks additional nodes for replicas, excluding the primary node.
|
|
// Returns up to count node IDs.
|
|
func (rm *ReplicaManager) SelectReplicaNodes(ctx context.Context, primaryNodeID string, count int) ([]string, error) {
|
|
internalCtx := client.WithInternalAuth(ctx)
|
|
|
|
activeNodes, err := rm.homeNodeMgr.getActiveNodes(internalCtx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get active nodes: %w", err)
|
|
}
|
|
|
|
// Filter out the primary node
|
|
var candidates []string
|
|
for _, nodeID := range activeNodes {
|
|
if nodeID != primaryNodeID {
|
|
candidates = append(candidates, nodeID)
|
|
}
|
|
}
|
|
|
|
if len(candidates) == 0 {
|
|
return nil, nil // No additional nodes available
|
|
}
|
|
|
|
// Calculate capacity scores and pick the best ones
|
|
capacities, err := rm.homeNodeMgr.calculateNodeCapacities(internalCtx, candidates)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to calculate capacities: %w", err)
|
|
}
|
|
|
|
// Sort by score descending (simple selection)
|
|
selected := make([]string, 0, count)
|
|
for i := 0; i < count && i < len(capacities); i++ {
|
|
best := rm.homeNodeMgr.selectBestNode(capacities)
|
|
if best == nil {
|
|
break
|
|
}
|
|
selected = append(selected, best.NodeID)
|
|
// Remove selected from capacities
|
|
remaining := make([]*NodeCapacity, 0, len(capacities)-1)
|
|
for _, c := range capacities {
|
|
if c.NodeID != best.NodeID {
|
|
remaining = append(remaining, c)
|
|
}
|
|
}
|
|
capacities = remaining
|
|
}
|
|
|
|
rm.logger.Info("Selected replica nodes",
|
|
zap.String("primary", primaryNodeID),
|
|
zap.Strings("replicas", selected),
|
|
zap.Int("requested", count),
|
|
)
|
|
|
|
return selected, nil
|
|
}
|
|
|
|
// CreateReplica inserts a replica record for a deployment on a specific node.
|
|
func (rm *ReplicaManager) CreateReplica(ctx context.Context, deploymentID, nodeID string, port int, isPrimary bool) error {
|
|
internalCtx := client.WithInternalAuth(ctx)
|
|
|
|
query := `
|
|
INSERT INTO deployment_replicas (deployment_id, node_id, port, status, is_primary, created_at, updated_at)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(deployment_id, node_id) DO UPDATE SET
|
|
port = excluded.port,
|
|
status = excluded.status,
|
|
is_primary = excluded.is_primary,
|
|
updated_at = excluded.updated_at
|
|
`
|
|
|
|
now := time.Now()
|
|
_, err := rm.db.Exec(internalCtx, query, deploymentID, nodeID, port, ReplicaStatusActive, isPrimary, now, now)
|
|
if err != nil {
|
|
return &DeploymentError{
|
|
Message: fmt.Sprintf("failed to create replica for deployment %s on node %s", deploymentID, nodeID),
|
|
Cause: err,
|
|
}
|
|
}
|
|
|
|
rm.logger.Info("Created deployment replica",
|
|
zap.String("deployment_id", deploymentID),
|
|
zap.String("node_id", nodeID),
|
|
zap.Int("port", port),
|
|
zap.Bool("is_primary", isPrimary),
|
|
)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetReplicas returns all replicas for a deployment.
|
|
func (rm *ReplicaManager) GetReplicas(ctx context.Context, deploymentID string) ([]Replica, error) {
|
|
internalCtx := client.WithInternalAuth(ctx)
|
|
|
|
type replicaRow struct {
|
|
DeploymentID string `db:"deployment_id"`
|
|
NodeID string `db:"node_id"`
|
|
Port int `db:"port"`
|
|
Status string `db:"status"`
|
|
IsPrimary bool `db:"is_primary"`
|
|
}
|
|
|
|
var rows []replicaRow
|
|
query := `SELECT deployment_id, node_id, port, status, is_primary FROM deployment_replicas WHERE deployment_id = ?`
|
|
err := rm.db.Query(internalCtx, &rows, query, deploymentID)
|
|
if err != nil {
|
|
return nil, &DeploymentError{
|
|
Message: "failed to query replicas",
|
|
Cause: err,
|
|
}
|
|
}
|
|
|
|
replicas := make([]Replica, len(rows))
|
|
for i, row := range rows {
|
|
replicas[i] = Replica{
|
|
DeploymentID: row.DeploymentID,
|
|
NodeID: row.NodeID,
|
|
Port: row.Port,
|
|
Status: ReplicaStatus(row.Status),
|
|
IsPrimary: row.IsPrimary,
|
|
}
|
|
}
|
|
|
|
return replicas, nil
|
|
}
|
|
|
|
// GetActiveReplicaNodes returns node IDs of all active replicas for a deployment.
|
|
func (rm *ReplicaManager) GetActiveReplicaNodes(ctx context.Context, deploymentID string) ([]string, error) {
|
|
internalCtx := client.WithInternalAuth(ctx)
|
|
|
|
type nodeRow struct {
|
|
NodeID string `db:"node_id"`
|
|
}
|
|
|
|
var rows []nodeRow
|
|
query := `SELECT node_id FROM deployment_replicas WHERE deployment_id = ? AND status = ?`
|
|
err := rm.db.Query(internalCtx, &rows, query, deploymentID, ReplicaStatusActive)
|
|
if err != nil {
|
|
return nil, &DeploymentError{
|
|
Message: "failed to query active replicas",
|
|
Cause: err,
|
|
}
|
|
}
|
|
|
|
nodes := make([]string, len(rows))
|
|
for i, row := range rows {
|
|
nodes[i] = row.NodeID
|
|
}
|
|
|
|
return nodes, nil
|
|
}
|
|
|
|
// IsReplicaNode checks if the given node is an active replica for the deployment.
|
|
func (rm *ReplicaManager) IsReplicaNode(ctx context.Context, deploymentID, nodeID string) (bool, error) {
|
|
internalCtx := client.WithInternalAuth(ctx)
|
|
|
|
type countRow struct {
|
|
Count int `db:"c"`
|
|
}
|
|
|
|
var rows []countRow
|
|
query := `SELECT COUNT(*) as c FROM deployment_replicas WHERE deployment_id = ? AND node_id = ? AND status = ?`
|
|
err := rm.db.Query(internalCtx, &rows, query, deploymentID, nodeID, ReplicaStatusActive)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
return len(rows) > 0 && rows[0].Count > 0, nil
|
|
}
|
|
|
|
// GetReplicaPort returns the port allocated for a deployment on a specific node.
|
|
func (rm *ReplicaManager) GetReplicaPort(ctx context.Context, deploymentID, nodeID string) (int, error) {
|
|
internalCtx := client.WithInternalAuth(ctx)
|
|
|
|
type portRow struct {
|
|
Port int `db:"port"`
|
|
}
|
|
|
|
var rows []portRow
|
|
query := `SELECT port FROM deployment_replicas WHERE deployment_id = ? AND node_id = ? AND status = ? LIMIT 1`
|
|
err := rm.db.Query(internalCtx, &rows, query, deploymentID, nodeID, ReplicaStatusActive)
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
if len(rows) == 0 {
|
|
return 0, fmt.Errorf("no active replica found for deployment %s on node %s", deploymentID, nodeID)
|
|
}
|
|
|
|
return rows[0].Port, nil
|
|
}
|
|
|
|
// UpdateReplicaStatus updates the status of a specific replica.
|
|
func (rm *ReplicaManager) UpdateReplicaStatus(ctx context.Context, deploymentID, nodeID string, status ReplicaStatus) error {
|
|
internalCtx := client.WithInternalAuth(ctx)
|
|
|
|
query := `UPDATE deployment_replicas SET status = ?, updated_at = ? WHERE deployment_id = ? AND node_id = ?`
|
|
_, err := rm.db.Exec(internalCtx, query, status, time.Now(), deploymentID, nodeID)
|
|
if err != nil {
|
|
return &DeploymentError{
|
|
Message: fmt.Sprintf("failed to update replica status for %s on %s", deploymentID, nodeID),
|
|
Cause: err,
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RemoveReplicas deletes all replica records for a deployment.
|
|
func (rm *ReplicaManager) RemoveReplicas(ctx context.Context, deploymentID string) error {
|
|
internalCtx := client.WithInternalAuth(ctx)
|
|
|
|
query := `DELETE FROM deployment_replicas WHERE deployment_id = ?`
|
|
_, err := rm.db.Exec(internalCtx, query, deploymentID)
|
|
if err != nil {
|
|
return &DeploymentError{
|
|
Message: "failed to remove replicas",
|
|
Cause: err,
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetNodeIP retrieves the IP address for a node from dns_nodes.
|
|
func (rm *ReplicaManager) GetNodeIP(ctx context.Context, nodeID string) (string, error) {
|
|
internalCtx := client.WithInternalAuth(ctx)
|
|
|
|
type nodeRow struct {
|
|
IPAddress string `db:"ip_address"`
|
|
}
|
|
|
|
var rows []nodeRow
|
|
query := `SELECT ip_address FROM dns_nodes WHERE id = ? LIMIT 1`
|
|
err := rm.db.Query(internalCtx, &rows, query, nodeID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
if len(rows) == 0 {
|
|
return "", fmt.Errorf("node not found: %s", nodeID)
|
|
}
|
|
|
|
return rows[0].IPAddress, nil
|
|
}
|