anonpenguin23 d10f8c35bb feat(gateway): implement persistent webhooks and namespace sequencing
- Add migrations for per-namespace publish sequences and persistent WebSocket function settings
- Integrate PersistentWSManager and WSBridge into the gateway dependency graph
- Upgrade serverless engine to use a multi-tier rate limiter
- Update JWT claims to support custom application-defined fields
2026-05-04 11:38:19 +03:00

214 lines
6.9 KiB
Go

package hostfunctions
import (
"bytes"
"context"
"encoding/json"
"fmt"
"strconv"
"github.com/DeBrosOfficial/network/pkg/rqlite"
"github.com/DeBrosOfficial/network/pkg/serverless"
)
// DBQuery executes a SELECT query and returns JSON-encoded results.
func (h *HostFunctions) DBQuery(ctx context.Context, query string, args []interface{}) ([]byte, error) {
if h.db == nil {
return nil, &serverless.HostFunctionError{Function: "db_query", Cause: serverless.ErrDatabaseUnavailable}
}
var results []map[string]interface{}
if err := h.db.Query(ctx, &results, query, args...); err != nil {
return nil, &serverless.HostFunctionError{Function: "db_query", Cause: err}
}
data, err := json.Marshal(results)
if err != nil {
return nil, &serverless.HostFunctionError{Function: "db_query", Cause: fmt.Errorf("failed to marshal results: %w", err)}
}
return data, nil
}
// DBExecute executes an INSERT/UPDATE/DELETE query and returns affected rows.
func (h *HostFunctions) DBExecute(ctx context.Context, query string, args []interface{}) (int64, error) {
if h.db == nil {
return 0, &serverless.HostFunctionError{Function: "db_execute", Cause: serverless.ErrDatabaseUnavailable}
}
result, err := h.db.Exec(ctx, query, args...)
if err != nil {
return 0, &serverless.HostFunctionError{Function: "db_execute", Cause: err}
}
affected, _ := result.RowsAffected()
return affected, nil
}
// dbTransactionRequest is the WASM-side shape for db_transaction input.
type dbTransactionRequest struct {
Ops []rqlite.BatchOp `json:"ops"`
}
// DBTransaction executes ops as a single atomic batch.
// Input is JSON: {"ops": [{"kind":"exec"|"query","sql":"...","args":[...]}, ...]}
// Output is JSON: BatchResult — caller checks `committed` to know if writes landed.
//
// Returns an error only for setup/validation problems. A rolled-back batch is
// communicated via committed=false in the returned JSON; that's not a Go error.
func (h *HostFunctions) DBTransaction(ctx context.Context, opsJSON []byte) ([]byte, error) {
if h.db == nil {
return nil, &serverless.HostFunctionError{Function: "db_transaction", Cause: serverless.ErrDatabaseUnavailable}
}
var req dbTransactionRequest
if err := json.Unmarshal(opsJSON, &req); err != nil {
return nil, &serverless.HostFunctionError{
Function: "db_transaction",
Cause: fmt.Errorf("invalid json: %w", err),
}
}
if len(req.Ops) == 0 {
return nil, &serverless.HostFunctionError{
Function: "db_transaction",
Cause: fmt.Errorf("ops required"),
}
}
if len(req.Ops) > rqlite.MaxBatchOps {
return nil, &serverless.HostFunctionError{
Function: "db_transaction",
Cause: fmt.Errorf("too many ops: max %d", rqlite.MaxBatchOps),
}
}
res, err := h.db.Batch(ctx, req.Ops)
// Always return the structured result, even on rollback — caller wants the
// per-op detail to know which op failed.
if res == nil {
// Unrecoverable setup failure (no native conn). Surface as Go error.
return nil, &serverless.HostFunctionError{Function: "db_transaction", Cause: err}
}
out, mErr := json.Marshal(res)
if mErr != nil {
return nil, &serverless.HostFunctionError{
Function: "db_transaction",
Cause: fmt.Errorf("marshal result: %w", mErr),
}
}
// Rollback errors are encoded in the JSON; do NOT propagate as Go error.
// Only true setup/transport errors after the result was built warrant returning err.
_ = err // intentionally swallowed — committed=false carries the signal
return out, nil
}
// execAndPublishResult is the JSON wire shape returned to WASM callers.
type execAndPublishResult struct {
Results []rqlite.OpResult `json:"results"`
Committed bool `json:"committed"`
FailedIndex int `json:"failed_index,omitempty"`
Seq int64 `json:"seq,omitempty"`
Published bool `json:"published,omitempty"`
PublishError string `json:"publish_error,omitempty"`
}
// ExecAndPublish runs ops atomically (with a seq increment in the same batch)
// and, if committed, publishes data with `{{seq}}` substituted for the
// assigned per-namespace sequence number.
//
// Failure modes (each communicated in the JSON, not as Go error):
// - Rollback: committed=false, failed_index points to the failing user op
// - Publish failed but commit succeeded: committed=true, published=false,
// publish_error is set. Writes are durable; caller can retry the publish.
// - Both succeeded: committed=true, published=true.
//
// Returns a Go error only on setup failures (no DB, bad JSON, no namespace).
func (h *HostFunctions) ExecAndPublish(
ctx context.Context, opsJSON []byte, topic string, dataTemplate []byte,
) ([]byte, error) {
if h.db == nil {
return nil, &serverless.HostFunctionError{
Function: "exec_and_publish",
Cause: serverless.ErrDatabaseUnavailable,
}
}
if h.pubsub == nil {
return nil, &serverless.HostFunctionError{
Function: "exec_and_publish",
Cause: fmt.Errorf("pubsub not available"),
}
}
if topic == "" {
return nil, &serverless.HostFunctionError{
Function: "exec_and_publish",
Cause: fmt.Errorf("topic required"),
}
}
// Resolve namespace from invocation context — server-trusted.
h.invCtxLock.RLock()
ns := ""
if h.invCtx != nil {
ns = h.invCtx.Namespace
}
h.invCtxLock.RUnlock()
if ns == "" {
return nil, &serverless.HostFunctionError{
Function: "exec_and_publish",
Cause: fmt.Errorf("no namespace in invocation context"),
}
}
var req dbTransactionRequest
if err := json.Unmarshal(opsJSON, &req); err != nil {
return nil, &serverless.HostFunctionError{
Function: "exec_and_publish",
Cause: fmt.Errorf("invalid ops json: %w", err),
}
}
if len(req.Ops) > rqlite.MaxBatchOps {
return nil, &serverless.HostFunctionError{
Function: "exec_and_publish",
Cause: fmt.Errorf("too many ops: max %d", rqlite.MaxBatchOps),
}
}
batchRes, seq, batchErr := h.db.BatchWithSeq(ctx, ns, req.Ops)
out := execAndPublishResult{}
if batchRes != nil {
out.Results = batchRes.Results
out.Committed = batchRes.Committed
out.FailedIndex = batchRes.FailedIndex
}
// On rollback or pre-publish error, return without publishing.
if batchErr != nil || !out.Committed {
// On a true rollback batchErr may be non-nil; that's already encoded
// in the result. Don't surface as Go error — caller reads `committed`.
_ = batchErr
buf, mErr := json.Marshal(out)
if mErr != nil {
return nil, &serverless.HostFunctionError{Function: "exec_and_publish", Cause: mErr}
}
return buf, nil
}
out.Seq = seq
// Substitute {{seq}} in the data template, then publish.
finalData := bytes.ReplaceAll(
dataTemplate,
[]byte("{{seq}}"),
[]byte(strconv.FormatInt(seq, 10)),
)
if err := h.pubsub.Publish(ctx, topic, finalData); err != nil {
out.PublishError = err.Error()
} else {
out.Published = true
}
buf, mErr := json.Marshal(out)
if mErr != nil {
return nil, &serverless.HostFunctionError{Function: "exec_and_publish", Cause: mErr}
}
return buf, nil
}