nim-chat-sdk/ratelimit/ratelimit_manager.nim
2025-08-12 12:09:57 +03:00

211 lines
6.8 KiB
Nim

import std/[times, options]
# TODO: move to waku's, chronos' or a lib tocken_bucket once decided where this will live
import ./token_bucket
# import waku/common/rate_limit/token_bucket
import ./store
import chronos
import db_connector/db_sqlite
type
CapacityState {.pure.} = enum
Normal
AlmostNone
None
SendResult* {.pure.} = enum
PassedToSender
Enqueued
Dropped
DroppedBatchTooLarge
Priority* {.pure.} = enum
Critical
Normal
Optional
MsgIdMsg[T] = tuple[msgId: string, msg: T]
MessageSender*[T] = proc(msgs: seq[MsgIdMsg[T]]) {.async.}
RateLimitManager*[T] = ref object
store: RateLimitStore[T]
bucket: TokenBucket
sender: MessageSender[T]
sleepDuration: chronos.Duration
pxQueueHandleLoop: Future[void]
proc new*[T](
M: type[RateLimitManager[T]],
store: RateLimitStore[T],
sender: MessageSender[T],
capacity: int = 100,
duration: chronos.Duration = chronos.minutes(10),
sleepDuration: chronos.Duration = chronos.milliseconds(1000),
): Future[RateLimitManager[T]] {.async.} =
var current = await store.loadBucketState()
if current.isNone():
# initialize bucket state with full capacity
current = some(
BucketState(budget: capacity, budgetCap: capacity, lastTimeFull: Moment.now())
)
discard await store.saveBucketState(current.get())
return RateLimitManager[T](
store: store,
bucket: TokenBucket.new(
current.get().budgetCap,
duration,
ReplenishMode.Strict,
current.get().budget,
current.get().lastTimeFull,
),
sender: sender,
sleepDuration: sleepDuration,
)
proc getCapacityState[T](
manager: RateLimitManager[T], now: Moment, count: int = 1
): CapacityState =
let (budget, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
let countAfter = budget - count
let ratio = countAfter.float / budgetCap.float
if ratio < 0.0:
return CapacityState.None
elif ratio < 0.3:
return CapacityState.AlmostNone
else:
return CapacityState.Normal
proc passToSender[T](
manager: RateLimitManager[T],
msgs: seq[tuple[msgId: string, msg: T]],
now: Moment,
priority: Priority,
): Future[SendResult] {.async.} =
let count = msgs.len
let consumed = manager.bucket.tryConsume(count, now)
if not consumed:
case priority
of Priority.Critical:
discard await manager.store.pushToQueue(QueueType.Critical, msgs)
return SendResult.Enqueued
of Priority.Normal:
discard await manager.store.pushToQueue(QueueType.Normal, msgs)
return SendResult.Enqueued
of Priority.Optional:
return SendResult.Dropped
let (budget, budgetCap, lastTimeFull) = manager.bucket.getAvailableCapacity(now)
discard await manager.store.saveBucketState(
BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull)
)
await manager.sender(msgs)
return SendResult.PassedToSender
proc processCriticalQueue[T](
manager: RateLimitManager[T], now: Moment
): Future[void] {.async.} =
while manager.store.getQueueLength(QueueType.Critical) > 0:
# Peek at the next batch by getting it, but we'll handle putting it back if needed
let maybeMsgs = await manager.store.popFromQueue(QueueType.Critical)
if maybeMsgs.isNone():
break
let msgs = maybeMsgs.get()
let capacityState = manager.getCapacityState(now, msgs.len)
if capacityState == CapacityState.Normal:
discard await manager.passToSender(msgs, now, Priority.Critical)
elif capacityState == CapacityState.AlmostNone:
discard await manager.passToSender(msgs, now, Priority.Critical)
else:
# Put back to critical queue (add to front not possible, so we add to back and exit)
discard await manager.store.pushToQueue(QueueType.Critical, msgs)
break
proc processNormalQueue[T](
manager: RateLimitManager[T], now: Moment
): Future[void] {.async.} =
while manager.store.getQueueLength(QueueType.Normal) > 0:
# Peek at the next batch by getting it, but we'll handle putting it back if needed
let maybeMsgs = await manager.store.popFromQueue(QueueType.Normal)
if maybeMsgs.isNone():
break
let msgs = maybeMsgs.get()
let capacityState = manager.getCapacityState(now, msgs.len)
if capacityState == CapacityState.Normal:
discard await manager.passToSender(msgs, now, Priority.Normal)
else:
# Put back to normal queue (add to front not possible, so we add to back and exit)
discard await manager.store.pushToQueue(QueueType.Normal, msgs)
break
proc sendOrEnqueue*[T](
manager: RateLimitManager[T],
msgs: seq[tuple[msgId: string, msg: T]],
priority: Priority,
now: Moment = Moment.now(),
): Future[SendResult] {.async.} =
let (_, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
if msgs.len.float / budgetCap.float >= 0.3:
# drop batch if it's too large to avoid starvation
return SendResult.DroppedBatchTooLarge
let capacityState = manager.getCapacityState(now, msgs.len)
case capacityState
of CapacityState.Normal:
return await manager.passToSender(msgs, now, priority)
of CapacityState.AlmostNone:
case priority
of Priority.Critical:
return await manager.passToSender(msgs, now, priority)
of Priority.Normal:
discard await manager.store.pushToQueue(QueueType.Normal, msgs)
return SendResult.Enqueued
of Priority.Optional:
return SendResult.Dropped
of CapacityState.None:
case priority
of Priority.Critical:
discard await manager.store.pushToQueue(QueueType.Critical, msgs)
return SendResult.Enqueued
of Priority.Normal:
discard await manager.store.pushToQueue(QueueType.Normal, msgs)
return SendResult.Enqueued
of Priority.Optional:
return SendResult.Dropped
proc queueHandleLoop*[T](
manager: RateLimitManager[T],
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(),
) {.async.} =
while true:
try:
let now = nowProvider()
await manager.processCriticalQueue(now)
await manager.processNormalQueue(now)
except Exception as e:
echo "Error in queue processing: ", e.msg
# configurable sleep duration for processing queued messages
await sleepAsync(manager.sleepDuration)
proc start*[T](
manager: RateLimitManager[T],
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(),
) {.async.} =
manager.pxQueueHandleLoop = queueHandleLoop(manager, nowProvider)
proc stop*[T](manager: RateLimitManager[T]) {.async.} =
if not isNil(manager.pxQueueHandleLoop):
await manager.pxQueueHandleLoop.cancelAndWait()
func `$`*[T](b: RateLimitManager[T]): string {.inline.} =
if isNil(b):
return "nil"
return
"RateLimitManager(critical: " & $b.store.getQueueLength(QueueType.Critical) &
", normal: " & $b.store.getQueueLength(QueueType.Normal) & ")"