mirror of
https://github.com/DeBrosOfficial/orama.git
synced 2026-03-17 14:36:58 +00:00
189 lines
4.5 KiB
Go
189 lines
4.5 KiB
Go
package gateway
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/DeBrosOfficial/network/pkg/client"
|
|
"github.com/DeBrosOfficial/network/pkg/logging"
|
|
"go.uber.org/zap"
|
|
)
|
|
|
|
// requestLogEntry holds a single request log to be batched.
type requestLogEntry struct {
	method     string // HTTP method of the request
	path       string // request path as logged
	statusCode int    // HTTP status code returned to the client
	bytesOut   int    // number of response bytes written
	durationMs int64  // request handling duration in milliseconds
	ip         string // client IP address
	apiKey     string // raw API key (resolved to ID at flush time in batch)
}
|
|
|
|
// requestLogBatcher aggregates request logs and flushes them to RQLite in bulk
// instead of issuing 3 DB writes per request (INSERT log + SELECT api_key_id + UPDATE last_used).
type requestLogBatcher struct {
	gw       *Gateway          // owning gateway; provides the DB client and logger
	entries  []requestLogEntry // pending entries, guarded by mu
	mu       sync.Mutex        // protects entries
	interval time.Duration     // period of the background flush ticker
	maxBatch int               // buffer size that triggers an early flush
	stopCh   chan struct{}     // closed by Stop to terminate the run loop
}
|
|
|
|
func newRequestLogBatcher(gw *Gateway, interval time.Duration, maxBatch int) *requestLogBatcher {
|
|
b := &requestLogBatcher{
|
|
gw: gw,
|
|
entries: make([]requestLogEntry, 0, maxBatch),
|
|
interval: interval,
|
|
maxBatch: maxBatch,
|
|
stopCh: make(chan struct{}),
|
|
}
|
|
go b.run()
|
|
return b
|
|
}
|
|
|
|
// Add enqueues a log entry. If the buffer is full, it triggers an early flush.
|
|
func (b *requestLogBatcher) Add(entry requestLogEntry) {
|
|
b.mu.Lock()
|
|
b.entries = append(b.entries, entry)
|
|
needsFlush := len(b.entries) >= b.maxBatch
|
|
b.mu.Unlock()
|
|
|
|
if needsFlush {
|
|
go b.flush()
|
|
}
|
|
}
|
|
|
|
// run is the background loop that flushes logs periodically.
|
|
func (b *requestLogBatcher) run() {
|
|
ticker := time.NewTicker(b.interval)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
b.flush()
|
|
case <-b.stopCh:
|
|
b.flush() // final flush on stop
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// flush writes all buffered log entries to RQLite in a single batch.
|
|
func (b *requestLogBatcher) flush() {
|
|
b.mu.Lock()
|
|
if len(b.entries) == 0 {
|
|
b.mu.Unlock()
|
|
return
|
|
}
|
|
batch := b.entries
|
|
b.entries = make([]requestLogEntry, 0, b.maxBatch)
|
|
b.mu.Unlock()
|
|
|
|
if b.gw.client == nil {
|
|
return
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
|
defer cancel()
|
|
|
|
db := b.gw.client.Database()
|
|
|
|
// Collect unique API keys that need ID resolution
|
|
apiKeySet := make(map[string]struct{})
|
|
for _, e := range batch {
|
|
if e.apiKey != "" {
|
|
apiKeySet[e.apiKey] = struct{}{}
|
|
}
|
|
}
|
|
|
|
// Batch-resolve API key IDs in a single query
|
|
apiKeyIDs := make(map[string]int64)
|
|
if len(apiKeySet) > 0 {
|
|
keys := make([]string, 0, len(apiKeySet))
|
|
for k := range apiKeySet {
|
|
keys = append(keys, k)
|
|
}
|
|
|
|
placeholders := make([]string, len(keys))
|
|
args := make([]interface{}, len(keys))
|
|
for i, k := range keys {
|
|
placeholders[i] = "?"
|
|
args[i] = k
|
|
}
|
|
|
|
q := fmt.Sprintf("SELECT id, key FROM api_keys WHERE key IN (%s)", strings.Join(placeholders, ","))
|
|
res, err := db.Query(client.WithInternalAuth(ctx), q, args...)
|
|
if err == nil && res != nil {
|
|
for _, row := range res.Rows {
|
|
if len(row) >= 2 {
|
|
var id int64
|
|
switch v := row[0].(type) {
|
|
case float64:
|
|
id = int64(v)
|
|
case int64:
|
|
id = v
|
|
}
|
|
if key, ok := row[1].(string); ok && id > 0 {
|
|
apiKeyIDs[key] = id
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Build batch INSERT for request_logs
|
|
if len(batch) > 0 {
|
|
var sb strings.Builder
|
|
sb.WriteString("INSERT INTO request_logs (method, path, status_code, bytes_out, duration_ms, ip, api_key_id) VALUES ")
|
|
|
|
args := make([]interface{}, 0, len(batch)*7)
|
|
for i, e := range batch {
|
|
if i > 0 {
|
|
sb.WriteString(", ")
|
|
}
|
|
sb.WriteString("(?, ?, ?, ?, ?, ?, ?)")
|
|
|
|
var apiKeyID interface{} = nil
|
|
if e.apiKey != "" {
|
|
if id, ok := apiKeyIDs[e.apiKey]; ok {
|
|
apiKeyID = id
|
|
}
|
|
}
|
|
args = append(args, e.method, e.path, e.statusCode, e.bytesOut, e.durationMs, e.ip, apiKeyID)
|
|
}
|
|
|
|
_, _ = db.Query(client.WithInternalAuth(ctx), sb.String(), args...)
|
|
}
|
|
|
|
// Batch UPDATE last_used_at for all API keys seen in this batch
|
|
if len(apiKeyIDs) > 0 {
|
|
ids := make([]string, 0, len(apiKeyIDs))
|
|
args := make([]interface{}, 0, len(apiKeyIDs))
|
|
for _, id := range apiKeyIDs {
|
|
ids = append(ids, "?")
|
|
args = append(args, id)
|
|
}
|
|
|
|
q := fmt.Sprintf("UPDATE api_keys SET last_used_at = CURRENT_TIMESTAMP WHERE id IN (%s)", strings.Join(ids, ","))
|
|
_, _ = db.Query(client.WithInternalAuth(ctx), q, args...)
|
|
}
|
|
|
|
if b.gw.logger != nil {
|
|
b.gw.logger.ComponentDebug(logging.ComponentGeneral, "request logs flushed",
|
|
zap.Int("count", len(batch)),
|
|
zap.Int("api_keys", len(apiKeyIDs)),
|
|
)
|
|
}
|
|
}
|
|
|
|
// Stop signals the batcher to stop and flush remaining entries.
//
// NOTE(review): Stop must be called at most once — a second call would close
// stopCh again and panic. Also, the final flush happens on the background
// run goroutine; Stop returns immediately without waiting for it, so callers
// shutting the process down may want to confirm this ordering is acceptable.
func (b *requestLogBatcher) Stop() {
	close(b.stopCh)
}
|