orama/pkg/gateway/request_log_batcher.go
2026-02-13 14:33:11 +02:00

193 lines
4.8 KiB
Go

package gateway
import (
"context"
"fmt"
"strings"
"sync"
"time"
"github.com/DeBrosOfficial/network/pkg/client"
"github.com/DeBrosOfficial/network/pkg/logging"
"go.uber.org/zap"
)
// requestLogEntry holds a single request log to be batched.
type requestLogEntry struct {
method string
path string
statusCode int
bytesOut int
durationMs int64
ip string
apiKey string // raw API key (resolved to ID at flush time in batch)
}
// requestLogBatcher aggregates request logs and flushes them to RQLite in bulk
// instead of issuing 3 DB writes per request (INSERT log + SELECT api_key_id + UPDATE last_used).
type requestLogBatcher struct {
gw *Gateway
entries []requestLogEntry
mu sync.Mutex
interval time.Duration
maxBatch int
stopCh chan struct{}
}
func newRequestLogBatcher(gw *Gateway, interval time.Duration, maxBatch int) *requestLogBatcher {
b := &requestLogBatcher{
gw: gw,
entries: make([]requestLogEntry, 0, maxBatch),
interval: interval,
maxBatch: maxBatch,
stopCh: make(chan struct{}),
}
go b.run()
return b
}
// Add enqueues a log entry. If the buffer is full, it triggers an early flush.
func (b *requestLogBatcher) Add(entry requestLogEntry) {
b.mu.Lock()
b.entries = append(b.entries, entry)
needsFlush := len(b.entries) >= b.maxBatch
b.mu.Unlock()
if needsFlush {
go b.flush()
}
}
// run is the background loop that flushes logs periodically.
func (b *requestLogBatcher) run() {
ticker := time.NewTicker(b.interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
b.flush()
case <-b.stopCh:
b.flush() // final flush on stop
return
}
}
}
// flush writes all buffered log entries to RQLite in a single batch.
func (b *requestLogBatcher) flush() {
b.mu.Lock()
if len(b.entries) == 0 {
b.mu.Unlock()
return
}
batch := b.entries
b.entries = make([]requestLogEntry, 0, b.maxBatch)
b.mu.Unlock()
if b.gw.client == nil {
return
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
db := b.gw.client.Database()
// Collect unique API keys that need ID resolution
apiKeySet := make(map[string]struct{})
for _, e := range batch {
if e.apiKey != "" {
apiKeySet[e.apiKey] = struct{}{}
}
}
// Batch-resolve API key IDs in a single query
apiKeyIDs := make(map[string]int64)
if len(apiKeySet) > 0 {
keys := make([]string, 0, len(apiKeySet))
for k := range apiKeySet {
keys = append(keys, k)
}
placeholders := make([]string, len(keys))
args := make([]interface{}, len(keys))
for i, k := range keys {
placeholders[i] = "?"
args[i] = k
}
q := fmt.Sprintf("SELECT id, key FROM api_keys WHERE key IN (%s)", strings.Join(placeholders, ","))
res, err := db.Query(client.WithInternalAuth(ctx), q, args...)
if err == nil && res != nil {
for _, row := range res.Rows {
if len(row) >= 2 {
var id int64
switch v := row[0].(type) {
case float64:
id = int64(v)
case int64:
id = v
}
if key, ok := row[1].(string); ok && id > 0 {
apiKeyIDs[key] = id
}
}
}
}
}
// Build batch INSERT for request_logs
if len(batch) > 0 {
var sb strings.Builder
sb.WriteString("INSERT INTO request_logs (method, path, status_code, bytes_out, duration_ms, ip, api_key_id) VALUES ")
args := make([]interface{}, 0, len(batch)*7)
for i, e := range batch {
if i > 0 {
sb.WriteString(", ")
}
sb.WriteString("(?, ?, ?, ?, ?, ?, ?)")
var apiKeyID interface{} = nil
if e.apiKey != "" {
if id, ok := apiKeyIDs[e.apiKey]; ok {
apiKeyID = id
}
}
args = append(args, e.method, e.path, e.statusCode, e.bytesOut, e.durationMs, e.ip, apiKeyID)
}
if _, err := db.Query(client.WithInternalAuth(ctx), sb.String(), args...); err != nil && b.gw.logger != nil {
b.gw.logger.ComponentWarn(logging.ComponentGeneral, "failed to flush request logs", zap.Error(err))
}
}
// Batch UPDATE last_used_at for all API keys seen in this batch
if len(apiKeyIDs) > 0 {
ids := make([]string, 0, len(apiKeyIDs))
args := make([]interface{}, 0, len(apiKeyIDs))
for _, id := range apiKeyIDs {
ids = append(ids, "?")
args = append(args, id)
}
q := fmt.Sprintf("UPDATE api_keys SET last_used_at = CURRENT_TIMESTAMP WHERE id IN (%s)", strings.Join(ids, ","))
if _, err := db.Query(client.WithInternalAuth(ctx), q, args...); err != nil && b.gw.logger != nil {
b.gw.logger.ComponentWarn(logging.ComponentGeneral, "failed to update api key last_used_at", zap.Error(err))
}
}
if b.gw.logger != nil {
b.gw.logger.ComponentDebug(logging.ComponentGeneral, "request logs flushed",
zap.Int("count", len(batch)),
zap.Int("api_keys", len(apiKeyIDs)),
)
}
}
// Stop signals the batcher to stop and flush remaining entries.
func (b *requestLogBatcher) Stop() {
close(b.stopCh)
}