nim-chat-sdk/ratelimit/ratelimit_manager.nim

211 lines
6.8 KiB
Nim
Raw Normal View History

2025-08-04 11:48:20 +03:00
import std/[times, options]
2025-08-04 09:04:52 +03:00
# TODO: move to waku's, chronos' or a lib token_bucket once decided where this will live
2025-07-16 15:51:26 +03:00
import ./token_bucket
# import waku/common/rate_limit/token_bucket
2025-08-04 11:31:44 +03:00
import ./store
2025-06-23 14:48:59 +03:00
import chronos
2025-08-04 11:48:20 +03:00
import db_connector/db_sqlite
2025-06-23 14:48:59 +03:00
type
  # Internal classification of how much budget would remain after sending a
  # batch, expressed as a share of the bucket's total capacity.
  CapacityState {.pure.} = enum
    Normal ## at least 30% of the capacity would remain
    AlmostNone ## between 0% and 30% of the capacity would remain
    None ## the batch does not fit into the remaining budget

  # Outcome reported to callers that submit a batch.
  SendResult* {.pure.} = enum
    PassedToSender ## tokens were consumed and the sender callback was awaited
    Enqueued ## the batch was pushed to a store queue for later processing
    Dropped ## an optional batch was discarded for lack of capacity
    DroppedBatchTooLarge ## the batch alone is 30% or more of the capacity

  # Importance of a batch; decides whether it is sent, queued or dropped when
  # capacity is tight.
  Priority* {.pure.} = enum
    Critical
    Normal
    Optional

  # A message paired with its identifier.
  MsgIdMsg[T] = tuple[msgId: string, msg: T]

  # Async callback that receives a batch of messages to send.
  MessageSender*[T] = proc(msgs: seq[MsgIdMsg[T]]) {.async.}

  # Rate limiter combining a token bucket, a store (bucket state and message
  # queues) and a sender callback.
  RateLimitManager*[T] = ref object
    store: RateLimitStore[T] ## bucket-state persistence and message queues
    bucket: TokenBucket ## token bucket that meters outgoing messages
    sender: MessageSender[T] ## callback invoked with batches allowed through
    sleepDuration: chronos.Duration ## pause between queue-processing rounds
    pxQueueHandleLoop: Future[void] ## handle of the background queue loop
2025-07-06 13:35:33 +03:00
2025-08-12 12:09:57 +03:00
proc new*[T](
    M: type[RateLimitManager[T]],
    store: RateLimitStore[T],
    sender: MessageSender[T],
    capacity: int = 100,
    duration: chronos.Duration = chronos.minutes(10),
    sleepDuration: chronos.Duration = chronos.milliseconds(1000),
): Future[RateLimitManager[T]] {.async.} =
  ## Creates a rate limit manager on top of `store`.
  ##
  ## The bucket state is loaded from `store`. When the store has none, a full
  ## bucket of `capacity` tokens is created and saved to the store.
  ##
  ## - `sender`: callback awaited with every batch that is allowed through.
  ## - `capacity`: budget of the bucket; only used when no state is stored.
  ## - `duration`: period handed to the token bucket.
  ## - `sleepDuration`: pause between rounds of the queue handling loop.
  var state: BucketState
  let stored = await store.loadBucketState()
  if stored.isSome():
    state = stored.get()
  else:
    # Nothing persisted yet: start from a full bucket and persist it.
    state = BucketState(
      budget: capacity, budgetCap: capacity, lastTimeFull: Moment.now()
    )
    discard await store.saveBucketState(state)

  let bucket = TokenBucket.new(
    state.budgetCap,
    duration,
    ReplenishMode.Strict,
    state.budget,
    state.lastTimeFull,
  )

  return RateLimitManager[T](
    store: store, bucket: bucket, sender: sender, sleepDuration: sleepDuration
  )
2025-08-12 12:09:57 +03:00
proc getCapacityState[T](
    manager: RateLimitManager[T], now: Moment, count: int = 1
): CapacityState =
  ## Classifies the budget that would be left at `now` after spending `count`
  ## tokens, as a share of the bucket's capacity:
  ## below 0 is `None`, below 0.3 is `AlmostNone`, anything else is `Normal`.
  let (available, cap, _) = manager.bucket.getAvailableCapacity(now)
  let remainingShare = float(available - count) / float(cap)
  result =
    if remainingShare < 0.0:
      CapacityState.None
    elif remainingShare < 0.3:
      CapacityState.AlmostNone
    else:
      CapacityState.Normal
2025-08-12 12:09:57 +03:00
proc passToSender[T](
    manager: RateLimitManager[T],
    msgs: seq[tuple[msgId: string, msg: T]],
    now: Moment,
    priority: Priority,
): Future[SendResult] {.async.} =
  ## Tries to consume one token per message at `now` and hand `msgs` to the
  ## sender callback.
  ##
  ## On success the bucket state is saved to the store, the sender is awaited
  ## and `PassedToSender` is returned. When the tokens cannot be consumed,
  ## critical and normal batches are pushed to their queue (`Enqueued`) and
  ## optional batches are discarded (`Dropped`).
  if manager.bucket.tryConsume(msgs.len, now):
    # Persist the post-consumption state before invoking the sender.
    let (budget, budgetCap, lastTimeFull) =
      manager.bucket.getAvailableCapacity(now)
    let snapshot =
      BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull)
    discard await manager.store.saveBucketState(snapshot)
    await manager.sender(msgs)
    return SendResult.PassedToSender

  # No tokens available: queue or drop according to priority.
  case priority
  of Priority.Optional:
    return SendResult.Dropped
  of Priority.Critical:
    discard await manager.store.pushToQueue(QueueType.Critical, msgs)
  of Priority.Normal:
    discard await manager.store.pushToQueue(QueueType.Normal, msgs)
  return SendResult.Enqueued
2025-08-12 12:09:57 +03:00
proc processCriticalQueue[T](
    manager: RateLimitManager[T], now: Moment
): Future[void] {.async.} =
  ## Drains the critical queue for as long as the bucket can take the next
  ## batch at `now`.
  ##
  ## Critical batches are sent in both the `Normal` and `AlmostNone` capacity
  ## states. The first batch that does not fit is pushed back and processing
  ## stops.
  while manager.store.getQueueLength(QueueType.Critical) > 0:
    # The batch is popped to inspect it; it is pushed back below if it
    # cannot be sent.
    let popped = await manager.store.popFromQueue(QueueType.Critical)
    if popped.isNone():
      break
    let batch = popped.get()

    if manager.getCapacityState(now, batch.len) == CapacityState.None:
      # Put back to critical queue (add to front not possible, so we add to
      # back and exit)
      discard await manager.store.pushToQueue(QueueType.Critical, batch)
      break

    discard await manager.passToSender(batch, now, Priority.Critical)
2025-06-23 14:48:59 +03:00
2025-08-12 12:09:57 +03:00
proc processNormalQueue[T](
    manager: RateLimitManager[T], now: Moment
): Future[void] {.async.} =
  ## Drains the normal queue for as long as sending the next batch at `now`
  ## keeps the bucket in the `Normal` capacity state.
  ##
  ## The first batch that would leave the bucket in any other state is pushed
  ## back and processing stops.
  while manager.store.getQueueLength(QueueType.Normal) > 0:
    # The batch is popped to inspect it; it is pushed back below if it
    # cannot be sent.
    let popped = await manager.store.popFromQueue(QueueType.Normal)
    if popped.isNone():
      break
    let batch = popped.get()

    if manager.getCapacityState(now, batch.len) != CapacityState.Normal:
      # Put back to normal queue (add to front not possible, so we add to
      # back and exit)
      discard await manager.store.pushToQueue(QueueType.Normal, batch)
      break

    discard await manager.passToSender(batch, now, Priority.Normal)
2025-08-12 12:09:57 +03:00
proc sendOrEnqueue*[T](
    manager: RateLimitManager[T],
    msgs: seq[tuple[msgId: string, msg: T]],
    priority: Priority,
    now: Moment = Moment.now(),
): Future[SendResult] {.async.} =
  ## Sends `msgs` right away, queues them or drops them, depending on the
  ## remaining capacity at `now` and on `priority`.
  ##
  ## - A batch whose size is 30% or more of the bucket capacity is rejected
  ##   with `DroppedBatchTooLarge`.
  ## - `Normal` capacity: every priority is passed to the sender.
  ## - `AlmostNone` capacity: critical batches are passed to the sender,
  ##   normal ones are queued, optional ones are dropped.
  ## - `None` capacity: critical and normal batches are queued, optional ones
  ##   are dropped.
  let (_, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
  let batchShare = float(msgs.len) / float(budgetCap)
  if batchShare >= 0.3:
    # drop batch if it's too large to avoid starvation
    return SendResult.DroppedBatchTooLarge

  let state = manager.getCapacityState(now, msgs.len)
  let sendNow =
    state == CapacityState.Normal or
    (state == CapacityState.AlmostNone and priority == Priority.Critical)
  if sendNow:
    return await manager.passToSender(msgs, now, priority)

  # Capacity is tight or exhausted and the batch may not be sent right away.
  case priority
  of Priority.Optional:
    return SendResult.Dropped
  of Priority.Critical:
    # Only reached in the `None` state; `AlmostNone` was sent above.
    discard await manager.store.pushToQueue(QueueType.Critical, msgs)
  of Priority.Normal:
    discard await manager.store.pushToQueue(QueueType.Normal, msgs)
  return SendResult.Enqueued
2025-08-12 12:09:57 +03:00
proc queueHandleLoop*[T](
    manager: RateLimitManager[T],
    nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
      Moment.now(),
) {.async.} =
  ## Endless background loop that processes the critical queue and then the
  ## normal queue, sleeping `manager.sleepDuration` between rounds.
  ##
  ## - `nowProvider`: clock used for each round; defaults to `Moment.now()`.
  ##
  ## Errors raised while processing a round are reported and the loop goes
  ## on. Cancellation is propagated so that the loop terminates when its
  ## future is cancelled.
  while true:
    try:
      let now = nowProvider()
      await manager.processCriticalQueue(now)
      await manager.processNormalQueue(now)
    except CancelledError as e:
      # Cancellation arriving in the middle of a round must end the loop;
      # swallowing it here would leave the loop running after a cancel
      # request.
      raise e
    except Exception as e:
      # Kept broad on purpose: `nowProvider` carries no `raises` annotation.
      echo "Error in queue processing: ", e.msg

    # configurable sleep duration for processing queued messages
    await sleepAsync(manager.sleepDuration)
2025-07-06 13:35:33 +03:00
2025-08-12 12:09:57 +03:00
proc start*[T](
    manager: RateLimitManager[T],
    nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
      Moment.now(),
) {.async.} =
  ## Starts the background queue handling loop and stores its future on the
  ## manager.
  ##
  ## - `nowProvider`: clock handed to the loop; defaults to `Moment.now()`.
  ##
  ## Calling `start` while a loop is still running is a no-op. Replacing the
  ## stored future would leave the earlier loop running with no handle left
  ## to cancel it.
  let running = manager.pxQueueHandleLoop
  if not isNil(running) and not running.finished():
    return
  manager.pxQueueHandleLoop = queueHandleLoop(manager, nowProvider)
2025-07-16 18:48:30 +03:00
2025-08-12 12:09:57 +03:00
proc stop*[T](manager: RateLimitManager[T]) {.async.} =
  ## Cancels the background queue handling loop, if one was ever started, and
  ## waits for the cancellation to finish.
  let loopFut = manager.pxQueueHandleLoop
  if isNil(loopFut):
    return
  await loopFut.cancelAndWait()
2025-07-06 13:35:33 +03:00
2025-08-12 12:09:57 +03:00
func `$`*[T](b: RateLimitManager[T]): string {.inline.} =
  ## Renders the manager as text showing the lengths of its critical and
  ## normal queues; a nil manager renders as "nil".
  if isNil(b):
    return "nil"
  let criticalLen = b.store.getQueueLength(QueueType.Critical)
  let normalLen = b.store.getQueueLength(QueueType.Normal)
  return
    "RateLimitManager(critical: " & $criticalLen & ", normal: " & $normalLen & ")"