diff --git a/ratelimit/ratelimit.nim b/ratelimit/rate_limit_manager.nim similarity index 80% rename from ratelimit/ratelimit.nim rename to ratelimit/rate_limit_manager.nim index 3111b1b..80c2362 100644 --- a/ratelimit/ratelimit.nim +++ b/ratelimit/rate_limit_manager.nim @@ -1,5 +1,4 @@ import std/[times, deques, options] -# import ./token_bucket import waku/common/rate_limit/token_bucket import chronos @@ -19,34 +18,35 @@ type Critical = 0 Normal = 1 Optional = 2 - MsgIdMsg = tuple[msgId: string, msg: T] + Serializable* = concept x x.toBytes() is seq[byte] - MessageSender*[T: Serializable] = - proc(msgs: seq[tuple[msgId: string, msg: T]]) {.async.} + MsgIdMsg[T: Serializable] = tuple[msgId: string, msg: T] + + MessageSender*[T: Serializable] = proc(msgs: seq[MsgIdMsg[T]]) {.async.} RateLimitManager*[T: Serializable] = ref object bucket: TokenBucket sender: MessageSender[T] - running: bool - queueCritical: Deque[tuple[msgId: string, msg: T]] - queueNormal: Deque[seq[tuple[msgId: string, msg: T]]] + queueCritical: Deque[seq[MsgIdMsg[T]]] + queueNormal: Deque[seq[MsgIdMsg[T]]] sleepDuration: chronos.Duration + pxQueueHandleLoop: Future[void] proc new*[T: Serializable]( + M: type[RateLimitManager[T]], sender: MessageSender[T], capacity: int = 100, duration: chronos.Duration = chronos.minutes(10), sleepDuration: chronos.Duration = chronos.milliseconds(1000), -): RateLimitManager[T] = - RateLimitManager[T]( +): M = + M( bucket: TokenBucket.newStrict(capacity, duration), sender: sender, - running: false, - queueCritical: Deque[seq[tuple[msgId: string, msg: T]]](), - queueNormal: Deque[seq[tuple[msgId: string, msg: T]]](), + queueCritical: Deque[seq[MsgIdMsg[T]]](), + queueNormal: Deque[seq[MsgIdMsg[T]]](), sleepDuration: sleepDuration, ) @@ -65,7 +65,7 @@ proc getCapacityState[T: Serializable]( proc passToSender[T: Serializable]( manager: RateLimitManager[T], - msgs: seq[tuple[msgId: string, msg: T]], + msgs: seq[MsgIdMsg[T]], now: Moment, priority: Priority, ): Future[SendResult] {.async.} = @@ -114,7 +114,7 @@ proc processNormalQueue[T: Serializable]( proc sendOrEnqueue*[T: Serializable]( manager: RateLimitManager[T], - msgs: seq[tuple[msgId: string, msg: T]], + msgs: seq[MsgIdMsg[T]], priority: Priority, now: Moment = Moment.now(), ): Future[SendResult] {.async.} = @@ -149,11 +149,9 @@ proc sendOrEnqueue*[T: Serializable]( proc getEnqueued*[T: Serializable]( manager: RateLimitManager[T] -): tuple[ - critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]] -] = - var criticalMsgs: seq[tuple[msgId: string, msg: T]] - var normalMsgs: seq[tuple[msgId: string, msg: T]] +): tuple[critical: seq[MsgIdMsg[T]], normal: seq[MsgIdMsg[T]]] = + var criticalMsgs: seq[MsgIdMsg[T]] + var normalMsgs: seq[MsgIdMsg[T]] for batch in manager.queueCritical: criticalMsgs.add(batch) @@ -163,30 +161,36 @@ proc getEnqueued*[T: Serializable]( return (criticalMsgs, normalMsgs) -proc start*[T: Serializable]( +proc startQueueHandleLoop*[T: Serializable]( manager: RateLimitManager[T], nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(), ) {.async.} = - manager.running = true while true: try: let now = nowProvider() await manager.processCriticalQueue(now) await manager.processNormalQueue(now) - - # configurable sleep duration for processing queued messages except Exception as e: echo "Error in queue processing: ", e.msg + # configurable sleep duration for processing queued messages await sleepAsync(manager.sleepDuration) -proc stop*[T: Serializable](manager: RateLimitManager[T]) = - manager.running = false +proc start*[T: Serializable]( + manager: RateLimitManager[T], + nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = + Moment.now(), +) {.async.} = + manager.pxQueueHandleLoop = manager.startQueueHandleLoop(nowProvider) + +proc stop*[T: Serializable](manager: RateLimitManager[T]) {.async.} = + if not isNil(manager.pxQueueHandleLoop): + await manager.pxQueueHandleLoop.cancelAndWait() func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} = if isNil(b): return "nil" return - "RateLimitManager(running: " & $b.running & ", critical: " & $b.queueCritical.len & - ", normal: " & $b.queueNormal.len & ")" + "RateLimitManager(critical: " & $b.queueCritical.len & ", normal: " & + $b.queueNormal.len & ")" diff --git a/tests/test_ratelimit.nim b/tests/test_rate_limit_manager.nim similarity index 81% rename from tests/test_ratelimit.nim rename to tests/test_rate_limit_manager.nim index f38ddbf..6c34c54 100644 --- a/tests/test_ratelimit.nim +++ b/tests/test_rate_limit_manager.nim @@ -1,9 +1,6 @@ -{.used.} - import testutils/unittests -import ../ratelimit/ratelimit +import ../ratelimit/rate_limit_manager import chronos -import strutils # Implement the Serializable concept for string proc toBytes*(s: string): seq[byte] = @@ -25,7 +22,7 @@ suite "Queue RateLimitManager": asyncTest "sendOrEnqueue - immediate send when capacity available": ## Given - let manager = newRateLimitManager[string]( + let manager = RateLimitManager[string].new( mockSender, capacity = 10, duration = chronos.milliseconds(100) ) let testMsg = "Hello World" @@ -43,7 +40,7 @@ suite "Queue RateLimitManager": asyncTest "sendOrEnqueue - multiple messages": ## Given - let manager = newRateLimitManager[string]( + let manager = RateLimitManager[string].new( mockSender, capacity = 10, duration = chronos.milliseconds(100) ) @@ -61,40 +58,9 @@ suite "Queue RateLimitManager": sentMessages[1].msgId == "msg2" sentMessages[1].msg == "Second" - asyncTest "start and stop - basic functionality": - ## Given - let manager = newRateLimitManager[string]( - mockSender, - capacity = 10, - duration = chronos.milliseconds(100), - sleepDuration = chronos.milliseconds(50), - ) - - ## When - start the manager - let startFut = manager.start() - - # Give it some time to start - await sleepAsync(chronos.milliseconds(20)) - - ## Then - manager should be running (check via string representation) - check: - "running: true" in $manager - not startFut.finished() # should still be running - - ## When - stop the manager - manager.stop() - - # Give it time to stop (should happen within one sleep cycle) - await sleepAsync(chronos.milliseconds(60)) - - ## Then - manager should be stopped - check: - "running: false" in $manager - startFut.finished() # should have completed - asyncTest "start and stop - drop large batch": ## Given - let manager = newRateLimitManager[string]( + let manager = RateLimitManager[string].new( mockSender, capacity = 2, duration = chronos.milliseconds(100), @@ -109,17 +75,19 @@ suite "Queue RateLimitManager": asyncTest "enqueue - enqueue critical only when exceeded": ## Given - let manager = newRateLimitManager[string]( + let manager = RateLimitManager[string].new( mockSender, capacity = 10, duration = chronos.milliseconds(100), sleepDuration = chronos.milliseconds(100), ) var now = Moment.now() - let startFut = manager.start( + await manager.start( nowProvider = proc(): Moment = now ) + defer: + await manager.stop() let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Critical, now) let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Critical, now) @@ -153,27 +121,21 @@ suite "Queue RateLimitManager": normal.len == 0 critical[0].msgId == "msg11" - # Give it time to stop - manager.stop() - await sleepAsync(chronos.milliseconds(150)) - ## Then - should be stopped - check: - "running: false" in $manager - startFut.finished() - asyncTest "enqueue - enqueue normal on 70% capacity": ## Given - let manager = newRateLimitManager[string]( + let manager = RateLimitManager[string].new( mockSender, capacity = 10, duration = chronos.milliseconds(100), sleepDuration = chronos.milliseconds(100), ) var now = Moment.now() - let startFut = manager.start( + await manager.start( nowProvider = proc(): Moment = now ) + defer: + await manager.stop() let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Normal, now) let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Normal, now) @@ -210,17 +172,9 @@ suite "Queue RateLimitManager": normal[1].msgId == "msg9" normal[2].msgId == "msg10" - # Give it time to stop - manager.stop() - await sleepAsync(chronos.milliseconds(150)) - ## Then - should be stopped - check: - "running: false" in $manager - startFut.finished() - asyncTest "enqueue - process queued messages": ## Given - let manager = newRateLimitManager[string]( + let manager = RateLimitManager[string].new( mockSender, capacity = 10, duration = chronos.milliseconds(200), @@ -231,10 +185,12 @@ suite "Queue RateLimitManager": var now = Moment.now() var nowRef = MomentRef(value: now) - let startFut = manager.start( + await manager.start( nowProvider = proc(): Moment = return nowRef.value ) + defer: + await manager.stop() let r1 = await manager.sendOrEnqueue(@[("1", "val_1")], Normal, now) let r2 = await manager.sendOrEnqueue(@[("2", "val_2")], Normal, now) @@ -301,11 +257,3 @@ suite "Queue RateLimitManager": sentMessages[11].msgId == "8" sentMessages[12].msgId == "9" sentMessages[13].msgId == "10" - - # Give it time to stop - manager.stop() - await sleepAsync(chronos.milliseconds(250)) - ## Then - should be stopped - check: - "running: false" in $manager - startFut.finished()