import std/[times, deques, options]
import waku/common/rate_limit/token_bucket
import chronos

type
  CapacityState {.pure.} = enum
    ## Coarse view of the remaining rate-limit budget.
    Normal     ## plenty of budget would remain after the send
    AlmostNone ## less than 30% of capacity would remain
    None       ## not enough budget for the requested batch

  SendResult* {.pure.} = enum
    PassedToSender       ## delivered to the sender callback
    Enqueued             ## rate-limited; queued for later delivery
    Dropped              ## rate-limited; discarded (Optional priority only)
    DroppedBatchTooLarge ## batch alone would consume >= 30% of capacity

  Priority* {.pure.} = enum
    Critical ## never dropped; queued when budget is exhausted
    Normal   ## queued once budget runs low
    Optional ## dropped as soon as budget runs low

  Serializable* = concept x
    x.toBytes() is seq[byte]

  MsgIdMsg[T: Serializable] = tuple[msgId: string, msg: T]

  MessageSender*[T: Serializable] = proc(msgs: seq[MsgIdMsg[T]]) {.async.}

  RateLimitManager*[T: Serializable] = ref object
    bucket: TokenBucket                    # strict token bucket enforcing the limit
    sender: MessageSender[T]               # downstream delivery callback
    queueCritical: Deque[seq[MsgIdMsg[T]]] # batches awaiting budget, highest priority
    queueNormal: Deque[seq[MsgIdMsg[T]]]   # batches awaiting budget, normal priority
    sleepDuration: chronos.Duration        # pause between queue-draining passes
    pxQueueHandleLoop: Future[void]        # handle of the background draining loop

proc new*[T: Serializable](
    M: type[RateLimitManager[T]],
    sender: MessageSender[T],
    capacity: int = 100,
    duration: chronos.Duration = chronos.minutes(10),
    sleepDuration: chronos.Duration = chronos.milliseconds(1000),
): M =
  ## Creates a manager that allows at most `capacity` messages per `duration`,
  ## delivering them through `sender`. Queued (rate-limited) batches are
  ## retried every `sleepDuration` by the loop started via `start`.
  M(
    bucket: TokenBucket.newStrict(capacity, duration),
    sender: sender,
    queueCritical: Deque[seq[MsgIdMsg[T]]](),
    queueNormal: Deque[seq[MsgIdMsg[T]]](),
    sleepDuration: sleepDuration,
  )

proc getCapacityState[T: Serializable](
    manager: RateLimitManager[T], now: Moment, count: int = 1
): CapacityState =
  ## Classifies how much budget would remain after sending `count` messages.
  ## Does not consume any tokens.
  let (budget, budgetCap) = manager.bucket.getAvailableCapacity(now)
  if budgetCap <= 0:
    # Guard the float division below: with a zero-capacity bucket the ratio
    # would be NaN, every comparison would be false, and we would wrongly
    # report Normal. A bucket with no capacity can never accept a send.
    return CapacityState.None
  let countAfter = budget - count
  let ratio = countAfter.float / budgetCap.float
  if ratio < 0.0:
    return CapacityState.None
  elif ratio < 0.3:
    return CapacityState.AlmostNone
  else:
    return CapacityState.Normal

proc passToSender[T: Serializable](
    manager: RateLimitManager[T],
    msgs: seq[MsgIdMsg[T]],
    now: Moment,
    priority: Priority,
): Future[SendResult] {.async.} =
  ## Tries to consume `msgs.len` tokens and deliver the batch. On insufficient
  ## budget the batch is enqueued (Critical/Normal) or dropped (Optional).
  ## NOTE(review): on failure the batch is appended with `addLast`, so a batch
  ## popped by a queue processor that loses the race re-enters at the back —
  ## confirm this reordering is acceptable.
  let count = msgs.len
  let capacity = manager.bucket.tryConsume(count, now)
  if not capacity:
    case priority
    of Priority.Critical:
      manager.queueCritical.addLast(msgs)
      return SendResult.Enqueued
    of Priority.Normal:
      manager.queueNormal.addLast(msgs)
      return SendResult.Enqueued
    of Priority.Optional:
      return SendResult.Dropped
  await manager.sender(msgs)
  return SendResult.PassedToSender

proc processCriticalQueue[T: Serializable](
    manager: RateLimitManager[T], now: Moment
): Future[void] {.async.} =
  ## Drains the critical queue while any budget remains. Critical batches are
  ## sent even when the remaining budget is almost exhausted.
  while manager.queueCritical.len > 0:
    let msgs = manager.queueCritical.popFirst()
    let capacityState = manager.getCapacityState(now, msgs.len)
    if capacityState != CapacityState.None:
      discard await manager.passToSender(msgs, now, Priority.Critical)
    else:
      # No budget left: put the batch back at the front to preserve order.
      manager.queueCritical.addFirst(msgs)
      break

proc processNormalQueue[T: Serializable](
    manager: RateLimitManager[T], now: Moment
): Future[void] {.async.} =
  ## Drains the normal queue, but only while budget is comfortably available
  ## (Normal state), leaving the low-budget headroom for critical traffic.
  while manager.queueNormal.len > 0:
    let msgs = manager.queueNormal.popFirst()
    let capacityState = manager.getCapacityState(now, msgs.len)
    if capacityState == CapacityState.Normal:
      discard await manager.passToSender(msgs, now, Priority.Normal)
    else:
      # add back to normal queue (front, to preserve order) and stop draining
      manager.queueNormal.addFirst(msgs)
      break

proc sendOrEnqueue*[T: Serializable](
    manager: RateLimitManager[T],
    msgs: seq[MsgIdMsg[T]],
    priority: Priority,
    now: Moment = Moment.now(),
): Future[SendResult] {.async.} =
  ## Entry point: sends `msgs` immediately when budget allows, otherwise
  ## enqueues or drops the batch according to `priority`. Returns what
  ## happened to the batch.
  let (_, budgetCap) = manager.bucket.getAvailableCapacity(now)
  if msgs.len.float / budgetCap.float >= 0.3:
    # drop batch if it's too large to avoid starvation
    return SendResult.DroppedBatchTooLarge
  let capacityState = manager.getCapacityState(now, msgs.len)
  case capacityState
  of CapacityState.Normal:
    return await manager.passToSender(msgs, now, priority)
  of CapacityState.AlmostNone:
    # Low budget: only critical traffic may still send directly.
    case priority
    of Priority.Critical:
      return await manager.passToSender(msgs, now, priority)
    of Priority.Normal:
      manager.queueNormal.addLast(msgs)
      return SendResult.Enqueued
    of Priority.Optional:
      return SendResult.Dropped
  of CapacityState.None:
    case priority
    of Priority.Critical:
      manager.queueCritical.addLast(msgs)
      return SendResult.Enqueued
    of Priority.Normal:
      manager.queueNormal.addLast(msgs)
      return SendResult.Enqueued
    of Priority.Optional:
      return SendResult.Dropped

proc getEnqueued*[T: Serializable](
    manager: RateLimitManager[T]
): tuple[critical: seq[MsgIdMsg[T]], normal: seq[MsgIdMsg[T]]] =
  ## Returns flattened snapshots of both pending queues (for inspection);
  ## the queues themselves are left untouched.
  var criticalMsgs: seq[MsgIdMsg[T]]
  var normalMsgs: seq[MsgIdMsg[T]]
  for batch in manager.queueCritical:
    criticalMsgs.add(batch)
  for batch in manager.queueNormal:
    normalMsgs.add(batch)
  return (criticalMsgs, normalMsgs)

proc startQueueHandleLoop*[T: Serializable](
    manager: RateLimitManager[T],
    nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
      Moment.now(),
) {.async.} =
  ## Background loop that periodically drains queued batches — critical queue
  ## first, then normal. `nowProvider` is injectable for testing.
  while true:
    try:
      let now = nowProvider()
      await manager.processCriticalQueue(now)
      await manager.processNormalQueue(now)
    except CancelledError:
      # Must propagate so `stop`'s cancelAndWait can terminate the loop;
      # the former `except Exception` swallowed cancellation (and Defects)
      # and kept the loop running forever.
      raise
    except CatchableError as e:
      echo "Error in queue processing: ", e.msg
    # configurable sleep duration for processing queued messages
    await sleepAsync(manager.sleepDuration)

proc start*[T: Serializable](
    manager: RateLimitManager[T],
    nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
      Moment.now(),
) {.async.} =
  ## Launches the queue-draining loop and keeps its future for `stop`.
  manager.pxQueueHandleLoop = manager.startQueueHandleLoop(nowProvider)

proc stop*[T: Serializable](manager: RateLimitManager[T]) {.async.} =
  ## Cancels the queue-draining loop (if started) and waits for it to finish.
  if not isNil(manager.pxQueueHandleLoop):
    await manager.pxQueueHandleLoop.cancelAndWait()

func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} =
  ## Debug representation: pending batch counts per queue.
  if isNil(b):
    return "nil"
  return
    "RateLimitManager(critical: " & $b.queueCritical.len & ", normal: " &
    $b.queueNormal.len & ")"