From 6b514f6f216ead66e3a30dfebcb7470f7e3967cd Mon Sep 17 00:00:00 2001
From: pablo
Date: Wed, 16 Jul 2025 18:48:30 +0300
Subject: [PATCH] fix: pr comments
---
.../{ratelimit.nim => rate_limit_manager.nim} | 58 +++++++------
...elimit.nim => test_rate_limit_manager.nim} | 84 ++++---------------
2 files changed, 47 insertions(+), 95 deletions(-)
rename ratelimit/{ratelimit.nim => rate_limit_manager.nim} (80%)
rename tests/{test_ratelimit.nim => test_rate_limit_manager.nim} (81%)
diff --git a/ratelimit/ratelimit.nim b/ratelimit/rate_limit_manager.nim
similarity index 80%
rename from ratelimit/ratelimit.nim
rename to ratelimit/rate_limit_manager.nim
index 3111b1b..80c2362 100644
--- a/ratelimit/ratelimit.nim
+++ b/ratelimit/rate_limit_manager.nim
@@ -1,5 +1,4 @@
import std/[times, deques, options]
-# import ./token_bucket
import waku/common/rate_limit/token_bucket
import chronos
@@ -19,34 +18,35 @@ type
Critical = 0
Normal = 1
Optional = 2
- MsgIdMsg = tuple[msgId: string, msg: T]
+
Serializable* =
concept x
x.toBytes() is seq[byte]
- MessageSender*[T: Serializable] =
- proc(msgs: seq[tuple[msgId: string, msg: T]]) {.async.}
+ MsgIdMsg[T: Serializable] = tuple[msgId: string, msg: T]
+
+ MessageSender*[T: Serializable] = proc(msgs: seq[MsgIdMsg[T]]) {.async.}
RateLimitManager*[T: Serializable] = ref object
bucket: TokenBucket
sender: MessageSender[T]
- running: bool
- queueCritical: Deque[tuple[msgId: string, msg: T]]
- queueNormal: Deque[seq[tuple[msgId: string, msg: T]]]
+ queueCritical: Deque[seq[MsgIdMsg[T]]]
+ queueNormal: Deque[seq[MsgIdMsg[T]]]
sleepDuration: chronos.Duration
+ pxQueueHandleLoop: Future[void]
proc new*[T: Serializable](
+ M: type[RateLimitManager[T]],
sender: MessageSender[T],
capacity: int = 100,
duration: chronos.Duration = chronos.minutes(10),
sleepDuration: chronos.Duration = chronos.milliseconds(1000),
-): RateLimitManager[T] =
- RateLimitManager[T](
+): M =
+ M(
bucket: TokenBucket.newStrict(capacity, duration),
sender: sender,
- running: false,
- queueCritical: Deque[seq[tuple[msgId: string, msg: T]]](),
- queueNormal: Deque[seq[tuple[msgId: string, msg: T]]](),
+ queueCritical: Deque[seq[MsgIdMsg[T]]](),
+ queueNormal: Deque[seq[MsgIdMsg[T]]](),
sleepDuration: sleepDuration,
)
@@ -65,7 +65,7 @@ proc getCapacityState[T: Serializable](
proc passToSender[T: Serializable](
manager: RateLimitManager[T],
- msgs: seq[tuple[msgId: string, msg: T]],
+ msgs: seq[MsgIdMsg[T]],
now: Moment,
priority: Priority,
): Future[SendResult] {.async.} =
@@ -114,7 +114,7 @@ proc processNormalQueue[T: Serializable](
proc sendOrEnqueue*[T: Serializable](
manager: RateLimitManager[T],
- msgs: seq[tuple[msgId: string, msg: T]],
+ msgs: seq[MsgIdMsg[T]],
priority: Priority,
now: Moment = Moment.now(),
): Future[SendResult] {.async.} =
@@ -149,11 +149,9 @@ proc sendOrEnqueue*[T: Serializable](
proc getEnqueued*[T: Serializable](
manager: RateLimitManager[T]
-): tuple[
- critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]]
-] =
- var criticalMsgs: seq[tuple[msgId: string, msg: T]]
- var normalMsgs: seq[tuple[msgId: string, msg: T]]
+): tuple[critical: seq[MsgIdMsg[T]], normal: seq[MsgIdMsg[T]]] =
+ var criticalMsgs: seq[MsgIdMsg[T]]
+ var normalMsgs: seq[MsgIdMsg[T]]
for batch in manager.queueCritical:
criticalMsgs.add(batch)
@@ -163,30 +161,36 @@ proc getEnqueued*[T: Serializable](
return (criticalMsgs, normalMsgs)
-proc start*[T: Serializable](
+proc startQueueHandleLoop*[T: Serializable](
manager: RateLimitManager[T],
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(),
) {.async.} =
- manager.running = true
while true:
try:
let now = nowProvider()
await manager.processCriticalQueue(now)
await manager.processNormalQueue(now)
-
- # configurable sleep duration for processing queued messages
except Exception as e:
echo "Error in queue processing: ", e.msg
+ # configurable sleep duration for processing queued messages
await sleepAsync(manager.sleepDuration)
-proc stop*[T: Serializable](manager: RateLimitManager[T]) =
- manager.running = false
+proc start*[T: Serializable](
+ manager: RateLimitManager[T],
+ nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
+ Moment.now(),
+) {.async.} =
+ manager.pxQueueHandleLoop = manager.startQueueHandleLoop(nowProvider)
+
+proc stop*[T: Serializable](manager: RateLimitManager[T]) {.async.} =
+ if not isNil(manager.pxQueueHandleLoop):
+ await manager.pxQueueHandleLoop.cancelAndWait()
func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} =
if isNil(b):
return "nil"
return
- "RateLimitManager(running: " & $b.running & ", critical: " & $b.queueCritical.len &
- ", normal: " & $b.queueNormal.len & ")"
+ "RateLimitManager(critical: " & $b.queueCritical.len & ", normal: " &
+ $b.queueNormal.len & ")"
diff --git a/tests/test_ratelimit.nim b/tests/test_rate_limit_manager.nim
similarity index 81%
rename from tests/test_ratelimit.nim
rename to tests/test_rate_limit_manager.nim
index f38ddbf..6c34c54 100644
--- a/tests/test_ratelimit.nim
+++ b/tests/test_rate_limit_manager.nim
@@ -1,9 +1,6 @@
-{.used.}
-
import testutils/unittests
-import ../ratelimit/ratelimit
+import ../ratelimit/rate_limit_manager
import chronos
-import strutils
# Implement the Serializable concept for string
proc toBytes*(s: string): seq[byte] =
@@ -25,7 +22,7 @@ suite "Queue RateLimitManager":
asyncTest "sendOrEnqueue - immediate send when capacity available":
## Given
- let manager = newRateLimitManager[string](
+ let manager = RateLimitManager[string].new(
mockSender, capacity = 10, duration = chronos.milliseconds(100)
)
let testMsg = "Hello World"
@@ -43,7 +40,7 @@ suite "Queue RateLimitManager":
asyncTest "sendOrEnqueue - multiple messages":
## Given
- let manager = newRateLimitManager[string](
+ let manager = RateLimitManager[string].new(
mockSender, capacity = 10, duration = chronos.milliseconds(100)
)
@@ -61,40 +58,9 @@ suite "Queue RateLimitManager":
sentMessages[1].msgId == "msg2"
sentMessages[1].msg == "Second"
- asyncTest "start and stop - basic functionality":
- ## Given
- let manager = newRateLimitManager[string](
- mockSender,
- capacity = 10,
- duration = chronos.milliseconds(100),
- sleepDuration = chronos.milliseconds(50),
- )
-
- ## When - start the manager
- let startFut = manager.start()
-
- # Give it some time to start
- await sleepAsync(chronos.milliseconds(20))
-
- ## Then - manager should be running (check via string representation)
- check:
- "running: true" in $manager
- not startFut.finished() # should still be running
-
- ## When - stop the manager
- manager.stop()
-
- # Give it time to stop (should happen within one sleep cycle)
- await sleepAsync(chronos.milliseconds(60))
-
- ## Then - manager should be stopped
- check:
- "running: false" in $manager
- startFut.finished() # should have completed
-
asyncTest "start and stop - drop large batch":
## Given
- let manager = newRateLimitManager[string](
+ let manager = RateLimitManager[string].new(
mockSender,
capacity = 2,
duration = chronos.milliseconds(100),
@@ -109,17 +75,19 @@ suite "Queue RateLimitManager":
asyncTest "enqueue - enqueue critical only when exceeded":
## Given
- let manager = newRateLimitManager[string](
+ let manager = RateLimitManager[string].new(
mockSender,
capacity = 10,
duration = chronos.milliseconds(100),
sleepDuration = chronos.milliseconds(100),
)
var now = Moment.now()
- let startFut = manager.start(
+ await manager.start(
nowProvider = proc(): Moment =
now
)
+ defer:
+ await manager.stop()
let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Critical, now)
let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Critical, now)
@@ -153,27 +121,21 @@ suite "Queue RateLimitManager":
normal.len == 0
critical[0].msgId == "msg11"
- # Give it time to stop
- manager.stop()
- await sleepAsync(chronos.milliseconds(150))
- ## Then - should be stopped
- check:
- "running: false" in $manager
- startFut.finished()
-
asyncTest "enqueue - enqueue normal on 70% capacity":
## Given
- let manager = newRateLimitManager[string](
+ let manager = RateLimitManager[string].new(
mockSender,
capacity = 10,
duration = chronos.milliseconds(100),
sleepDuration = chronos.milliseconds(100),
)
var now = Moment.now()
- let startFut = manager.start(
+ await manager.start(
nowProvider = proc(): Moment =
now
)
+ defer:
+ await manager.stop()
let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Normal, now)
let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Normal, now)
@@ -210,17 +172,9 @@ suite "Queue RateLimitManager":
normal[1].msgId == "msg9"
normal[2].msgId == "msg10"
- # Give it time to stop
- manager.stop()
- await sleepAsync(chronos.milliseconds(150))
- ## Then - should be stopped
- check:
- "running: false" in $manager
- startFut.finished()
-
asyncTest "enqueue - process queued messages":
## Given
- let manager = newRateLimitManager[string](
+ let manager = RateLimitManager[string].new(
mockSender,
capacity = 10,
duration = chronos.milliseconds(200),
@@ -231,10 +185,12 @@ suite "Queue RateLimitManager":
var now = Moment.now()
var nowRef = MomentRef(value: now)
- let startFut = manager.start(
+ await manager.start(
nowProvider = proc(): Moment =
return nowRef.value
)
+ defer:
+ await manager.stop()
let r1 = await manager.sendOrEnqueue(@[("1", "val_1")], Normal, now)
let r2 = await manager.sendOrEnqueue(@[("2", "val_2")], Normal, now)
@@ -301,11 +257,3 @@ suite "Queue RateLimitManager":
sentMessages[11].msgId == "8"
sentMessages[12].msgId == "9"
sentMessages[13].msgId == "10"
-
- # Give it time to stop
- manager.stop()
- await sleepAsync(chronos.milliseconds(250))
- ## Then - should be stopped
- check:
- "running: false" in $manager
- startFut.finished()