import std/[times, options, tables, sequtils]

# TODO: move to waku's, chronos' or a lib token_bucket once decided where this will live
import ./token_bucket
# import waku/common/rate_limit/token_bucket
import ./store
import chronos
import db_connector/db_sqlite

type
  ## Classification of the budget that would remain after sending a batch.
  CapacityState {.pure.} = enum
    Normal ## comfortable headroom remains
    AlmostNone ## less than 30% of capacity would remain
    None ## the batch would overdraw the budget (or capacity is degenerate)

  ## Caller-supplied priority; decides between queueing and dropping when
  ## capacity is low or exhausted.
  Priority* {.pure.} = enum
    Critical
    Normal
    Optional

  ## A message paired with the caller-assigned id used for status tracking.
  MsgIdMsg[T] = tuple[msgId: string, msg: T]

  ## Callback that actually delivers a batch once it passes rate limiting.
  MessageSender*[T] = proc(msgs: seq[MsgIdMsg[T]]) {.async.}

  RateLimitManager*[T] = ref object
    store: RateLimitStore[T] ## persistence for queues, statuses, bucket state
    bucket: TokenBucket ## in-memory token bucket enforcing the rate limit
    sender: MessageSender[T]
    sleepDuration: chronos.Duration ## pause between queue-draining passes
    pxQueueHandleLoop: Future[void] ## background loop created by start()

proc new*[T](
    M: type[RateLimitManager[T]],
    store: RateLimitStore[T],
    sender: MessageSender[T],
    capacity: int = 100,
    duration: chronos.Duration = chronos.minutes(10),
    sleepDuration: chronos.Duration = chronos.milliseconds(1000),
): Future[RateLimitManager[T]] {.async.} =
  ## Creates a manager, restoring the persisted bucket state when one exists,
  ## otherwise initializing (and persisting) a bucket at full `capacity`.
  var current = await store.loadBucketState()
  if current.isNone():
    # initialize bucket state with full capacity
    current = some(
      BucketState(budget: capacity, budgetCap: capacity, lastTimeFull: Moment.now())
    )
    # TODO: log a failed save once a logging strategy is decided
    discard await store.saveBucketState(current.get())
  return RateLimitManager[T](
    store: store,
    bucket: TokenBucket.new(
      current.get().budgetCap,
      duration,
      ReplenishMode.Strict,
      current.get().budget,
      current.get().lastTimeFull,
    ),
    sender: sender,
    sleepDuration: sleepDuration,
  )

proc getCapacityState[T](
    manager: RateLimitManager[T], now: Moment, count: int = 1
): CapacityState =
  ## Classifies how much budget would remain after consuming `count` tokens
  ## at time `now` (does not actually consume anything).
  let (budget, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
  if budgetCap <= 0:
    # Guard against NaN from 0/0 below: with no capacity at all nothing can
    # ever be sent, so report no capacity instead of silently passing traffic.
    return CapacityState.None
  let countAfter = budget - count
  let ratio = countAfter.float / budgetCap.float
  if ratio < 0.0:
    return CapacityState.None
  elif ratio < 0.3:
    return CapacityState.AlmostNone
  else:
    return CapacityState.Normal

proc updateStatuses[T](
    manager: RateLimitManager[T], msgs: seq[MsgIdMsg[T]], status: MessageStatus
): Future[MessageStatus] {.async.} =
  ## Persists `status` for every message in `msgs` and returns it.
  let msgIds: seq[string] = msgs.mapIt(it.msgId)
  # TODO log failed to update message statuses (if it occurs);
  # think of a logging strategy
  discard await manager.store.updateMessageStatuses(msgIds, status)
  return status

proc pushToQueueUpdatingStatus[T](
    manager: RateLimitManager[T], queueType: QueueType, msgs: seq[MsgIdMsg[T]]
): Future[MessageStatus] {.async.} =
  ## Pushes to the queue and updates the status of the messages
  let success = await manager.store.pushToQueue(queueType, msgs)
  let status =
    if success:
      MessageStatus.Enqueued
    else:
      MessageStatus.DroppedFailedToEnqueue
  return await manager.updateStatuses(msgs, status)

proc passToSender[T](
    manager: RateLimitManager[T],
    msgs: seq[MsgIdMsg[T]],
    now: Moment,
    priority: Priority,
): Future[MessageStatus] {.async.} =
  ## Consumes budget for the batch and hands it to the sender. If the budget
  ## is insufficient, the batch is queued (Critical/Normal) or dropped
  ## (Optional) instead. Returns the persisted status.
  let count = msgs.len
  let consumed = manager.bucket.tryConsume(count, now)
  if not consumed:
    case priority
    of Priority.Critical:
      return await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
    of Priority.Normal:
      return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
    of Priority.Optional:
      return await manager.updateStatuses(msgs, MessageStatus.Dropped)
  # Persist the reduced budget so a restart resumes with the correct state.
  let (budget, budgetCap, lastTimeFull) = manager.bucket.getAvailableCapacity(now)
  discard await manager.store.saveBucketState(
    BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull)
  )
  await manager.sender(msgs)
  return await manager.updateStatuses(msgs, MessageStatus.PassedToSender)

proc processCriticalQueue[T](
    manager: RateLimitManager[T], now: Moment
): Future[void] {.async.} =
  ## Drains the critical queue while any capacity remains. Critical batches
  ## may also use the AlmostNone band (unlike normal batches).
  while manager.store.getQueueLength(QueueType.Critical) > 0:
    # Peek at the next batch by getting it, but we'll handle putting it back if needed
    let maybeMsgs = await manager.store.popFromQueue(QueueType.Critical)
    if maybeMsgs.isNone():
      break
    let msgs = maybeMsgs.get()
    let capacityState = manager.getCapacityState(now, msgs.len)
    if capacityState != CapacityState.None:
      # Normal and AlmostNone are treated identically for critical traffic.
      # I can safely discard the return since the status will be persisted
      discard await manager.passToSender(msgs, now, Priority.Critical)
    else:
      # Put back to critical queue (add to front not possible, so we add to back and exit)
      # I can safely discard the return since the status will be persisted
      discard await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
      break

proc processNormalQueue[T](
    manager: RateLimitManager[T], now: Moment
): Future[void] {.async.} =
  ## Drains the normal queue, but only while capacity is comfortably Normal;
  ## the AlmostNone band is reserved for critical traffic.
  while manager.store.getQueueLength(QueueType.Normal) > 0:
    # Peek at the next batch by getting it, but we'll handle putting it back if needed
    let maybeMsgs = await manager.store.popFromQueue(QueueType.Normal)
    if maybeMsgs.isNone():
      break
    let msgs = maybeMsgs.get()
    let capacityState = manager.getCapacityState(now, msgs.len)
    if capacityState == CapacityState.Normal:
      # I can safely discard the return since the status will be persisted
      discard await manager.passToSender(msgs, now, Priority.Normal)
    else:
      # Put back to normal queue (add to front not possible, so we add to back and exit)
      # I can safely discard the return since the status will be persisted
      discard await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
      break

proc sendOrEnqueue*[T](
    manager: RateLimitManager[T],
    msgs: seq[MsgIdMsg[T]],
    priority: Priority,
    now: Moment = Moment.now(),
): Future[MessageStatus] {.async.} =
  ## Entry point: sends the batch immediately when capacity allows, otherwise
  ## queues or drops it depending on `priority`. Returns the persisted status.
  let (_, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
  if msgs.len.float / budgetCap.float >= 0.3:
    # drop batch if it's too large to avoid starvation
    return await manager.updateStatuses(msgs, MessageStatus.DroppedBatchTooLarge)
  let capacityState = manager.getCapacityState(now, msgs.len)
  case capacityState
  of CapacityState.Normal:
    return await manager.passToSender(msgs, now, priority)
  of CapacityState.AlmostNone:
    case priority
    of Priority.Critical:
      return await manager.passToSender(msgs, now, priority)
    of Priority.Normal:
      return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
    of Priority.Optional:
      # Bugfix: persist the Dropped status. Previously the bare enum value was
      # returned without recording it in the store, unlike every other branch.
      return await manager.updateStatuses(msgs, MessageStatus.Dropped)
  of CapacityState.None:
    case priority
    of Priority.Critical:
      return await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
    of Priority.Normal:
      return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
    of Priority.Optional:
      return await manager.updateStatuses(msgs, MessageStatus.Dropped)

proc queueHandleLoop[T](
    manager: RateLimitManager[T],
    nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(),
) {.async.} =
  ## Background loop: drains the critical queue, then the normal queue, then
  ## sleeps for `sleepDuration`. Runs until the future is cancelled.
  while true:
    try:
      let now = nowProvider()
      await manager.processCriticalQueue(now)
      await manager.processNormalQueue(now)
    except CancelledError:
      # Bugfix: propagate cancellation. The previous `except Exception`
      # swallowed CancelledError, so stop()'s cancelAndWait() could never
      # terminate this loop.
      raise
    except CatchableError as e:
      # Narrowed from `Exception` so Defects are not silently swallowed.
      # TODO: replace echo with a proper logger once a strategy is decided.
      echo "Error in queue processing: ", e.msg
    # configurable sleep duration for processing queued messages
    await sleepAsync(manager.sleepDuration)

proc start*[T](
    manager: RateLimitManager[T],
    nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(),
) {.async.} =
  ## Starts the background queue-handling loop. `nowProvider` is injectable
  ## so tests can control time.
  manager.pxQueueHandleLoop = queueHandleLoop(manager, nowProvider)

proc stop*[T](manager: RateLimitManager[T]) {.async.} =
  ## Cancels the background loop (if started) and waits for it to finish.
  if not isNil(manager.pxQueueHandleLoop):
    await manager.pxQueueHandleLoop.cancelAndWait()

proc getQuota*[T](
    manager: RateLimitManager[T], now: Moment = Moment.now()
): tuple[budget: int, budgetCap: int] =
  ## Returns the currently available budget and the total capacity at `now`.
  let (budget, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
  return (budget, budgetCap)

proc getMessageStatus*[T](
    manager: RateLimitManager[T], msgId: string
): Future[Option[MessageStatus]] {.async.} =
  ## Looks up the persisted status of a message by its id.
  return await manager.store.getMessageStatus(msgId)

func `$`*[T](b: RateLimitManager[T]): string {.inline.} =
  ## Human-readable summary of the manager's queue lengths.
  if isNil(b):
    return "nil"
  return
    "RateLimitManager(critical: " & $b.store.getQueueLength(QueueType.Critical) &
    ", normal: " & $b.store.getQueueLength(QueueType.Normal) & ")"