import std/[times, deques, options] # import ./token_bucket import waku/common/rate_limit/token_bucket import chronos type CapacityState = enum Normal = 0 AlmostNone = 1 None = 2 SendResult* = enum PassedToSender = 0 Enqueued = 1 Dropped = 2 DroppedBatchTooLarge = 3 Priority* = enum Critical = 0 Normal = 1 Optional = 2 Serializable* = concept x x.toBytes() is seq[byte] MessageSender*[T: Serializable] = proc(msgs: seq[tuple[msgId: string, msg: T]]): Future[void] {.async.} RateLimitManager*[T: Serializable] = ref object bucket: TokenBucket sender: MessageSender[T] running: bool queueCritical: Deque[seq[tuple[msgId: string, msg: T]]] queueNormal: Deque[seq[tuple[msgId: string, msg: T]]] sleepDuration: chronos.Duration proc newRateLimitManager*[T: Serializable]( sender: MessageSender[T], capacity: int = 100, duration: chronos.Duration = chronos.minutes(10), sleepDuration: chronos.Duration = chronos.milliseconds(1000), ): RateLimitManager[T] = RateLimitManager[T]( bucket: TokenBucket.newStrict(capacity, duration), sender: sender, running: false, queueCritical: Deque[seq[tuple[msgId: string, msg: T]]](), queueNormal: Deque[seq[tuple[msgId: string, msg: T]]](), sleepDuration: sleepDuration, ) proc getCapacityState[T: Serializable]( manager: RateLimitManager[T], now: Moment, count: int = 1 ): CapacityState = let (budget, budgetCap) = manager.bucket.getAvailableCapacity(now) let countAfter = budget - count let ratio = countAfter.float / budgetCap.float if ratio < 0.0: return CapacityState.None elif ratio < 0.3: return CapacityState.AlmostNone else: return CapacityState.Normal proc passToSender[T: Serializable]( manager: RateLimitManager[T], msgs: seq[tuple[msgId: string, msg: T]], now: Moment, priority: Priority, ): Future[SendResult] {.async.} = let count = msgs.len let capacity = manager.bucket.tryConsume(count, now) if not capacity: case priority of Priority.Critical: manager.queueCritical.addLast(msgs) return SendResult.Enqueued of Priority.Normal: manager.queueNormal.addLast(msgs) return SendResult.Enqueued of Priority.Optional: return SendResult.Dropped await manager.sender(msgs) return SendResult.PassedToSender proc processCriticalQueue[T: Serializable]( manager: RateLimitManager[T], now: Moment ): Future[void] {.async.} = while manager.queueCritical.len > 0: let msgs = manager.queueCritical.popFirst() let capacityState = manager.getCapacityState(now, msgs.len) if capacityState == CapacityState.Normal: discard await manager.passToSender(msgs, now, Priority.Critical) elif capacityState == CapacityState.AlmostNone: discard await manager.passToSender(msgs, now, Priority.Critical) else: # add back to critical queue manager.queueCritical.addFirst(msgs) break proc processNormalQueue[T: Serializable]( manager: RateLimitManager[T], now: Moment ): Future[void] {.async.} = while manager.queueNormal.len > 0: let msgs = manager.queueNormal.popFirst() let capacityState = manager.getCapacityState(now, msgs.len) if capacityState == CapacityState.Normal: discard await manager.passToSender(msgs, now, Priority.Normal) else: # add back to critical queue manager.queueNormal.addFirst(msgs) break proc sendOrEnqueue*[T: Serializable]( manager: RateLimitManager[T], msgs: seq[tuple[msgId: string, msg: T]], priority: Priority, now: Moment = Moment.now(), ): Future[SendResult] {.async.} = let (_, budgetCap) = manager.bucket.getAvailableCapacity(now) if msgs.len.float / budgetCap.float >= 0.3: # drop batch if it's too large to avoid starvation return SendResult.DroppedBatchTooLarge let capacityState = manager.getCapacityState(now, msgs.len) case capacityState of CapacityState.Normal: return await manager.passToSender(msgs, now, priority) of CapacityState.AlmostNone: case priority of Priority.Critical: return await manager.passToSender(msgs, now, priority) of Priority.Normal: manager.queueNormal.addLast(msgs) return SendResult.Enqueued of Priority.Optional: return SendResult.Dropped of CapacityState.None: case priority of Priority.Critical: manager.queueCritical.addLast(msgs) return SendResult.Enqueued of Priority.Normal: manager.queueNormal.addLast(msgs) return SendResult.Enqueued of Priority.Optional: return SendResult.Dropped proc getEnqueued*[T: Serializable]( manager: RateLimitManager[T] ): tuple[ critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]] ] = var criticalMsgs: seq[tuple[msgId: string, msg: T]] var normalMsgs: seq[tuple[msgId: string, msg: T]] for batch in manager.queueCritical: criticalMsgs.add(batch) for batch in manager.queueNormal: normalMsgs.add(batch) return (criticalMsgs, normalMsgs) proc start*[T: Serializable]( manager: RateLimitManager[T], nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(), ): Future[void] {.async.} = manager.running = true while manager.running: try: let now = nowProvider() await manager.processCriticalQueue(now) await manager.processNormalQueue(now) # configurable sleep duration for processing queued messages await sleepAsync(manager.sleepDuration) except Exception as e: echo "Error in queue processing: ", e.msg await sleepAsync(manager.sleepDuration) proc stop*[T: Serializable](manager: RateLimitManager[T]) = manager.running = false func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} = if isNil(b): return "nil" return "RateLimitManager(running: " & $b.running & ", critical: " & $b.queueCritical.len & ", normal: " & $b.queueNormal.len & ")"