diff --git a/ratelimit/ratelimit.nim b/ratelimit/ratelimit.nim index 9bd1bf6..3111b1b 100644 --- a/ratelimit/ratelimit.nim +++ b/ratelimit/ratelimit.nim @@ -4,10 +4,10 @@ import waku/common/rate_limit/token_bucket import chronos type - CapacityState = enum - Normal = 0 - AlmostNone = 1 - None = 2 + CapacityState {.pure.} = enum + Normal + AlmostNone + None SendResult* = enum PassedToSender = 0 @@ -19,23 +19,23 @@ type Critical = 0 Normal = 1 Optional = 2 - + MsgIdMsg = tuple[msgId: string, msg: T] Serializable* = concept x x.toBytes() is seq[byte] MessageSender*[T: Serializable] = - proc(msgs: seq[tuple[msgId: string, msg: T]]): Future[void] {.async.} + proc(msgs: seq[tuple[msgId: string, msg: T]]) {.async.} RateLimitManager*[T: Serializable] = ref object bucket: TokenBucket sender: MessageSender[T] running: bool - queueCritical: Deque[seq[tuple[msgId: string, msg: T]]] + queueCritical: Deque[tuple[msgId: string, msg: T]] queueNormal: Deque[seq[tuple[msgId: string, msg: T]]] sleepDuration: chronos.Duration -proc newRateLimitManager*[T: Serializable]( +proc new*[T: Serializable]( sender: MessageSender[T], capacity: int = 100, duration: chronos.Duration = chronos.minutes(10), @@ -167,19 +167,19 @@ proc start*[T: Serializable]( manager: RateLimitManager[T], nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(), -): Future[void] {.async.} = +) {.async.} = manager.running = true - while manager.running: + while true: try: let now = nowProvider() await manager.processCriticalQueue(now) await manager.processNormalQueue(now) # configurable sleep duration for processing queued messages - await sleepAsync(manager.sleepDuration) except Exception as e: echo "Error in queue processing: ", e.msg - await sleepAsync(manager.sleepDuration) + + await sleepAsync(manager.sleepDuration) proc stop*[T: Serializable](manager: RateLimitManager[T]) = manager.running = false