fix: pr comments

This commit is contained in:
pablo 2025-07-16 18:48:30 +03:00
parent b9e80bcc37
commit 6b514f6f21
No known key found for this signature in database
GPG Key ID: 78F35FCC60FDC63A
2 changed files with 47 additions and 95 deletions

View File

@ -1,5 +1,4 @@
import std/[times, deques, options] import std/[times, deques, options]
# import ./token_bucket
import waku/common/rate_limit/token_bucket import waku/common/rate_limit/token_bucket
import chronos import chronos
@ -19,34 +18,35 @@ type
Critical = 0 Critical = 0
Normal = 1 Normal = 1
Optional = 2 Optional = 2
MsgIdMsg = tuple[msgId: string, msg: T]
Serializable* = Serializable* =
concept x concept x
x.toBytes() is seq[byte] x.toBytes() is seq[byte]
MessageSender*[T: Serializable] = MsgIdMsg[T: Serializable] = tuple[msgId: string, msg: T]
proc(msgs: seq[tuple[msgId: string, msg: T]]) {.async.}
MessageSender*[T: Serializable] = proc(msgs: seq[MsgIdMsg[T]]) {.async.}
RateLimitManager*[T: Serializable] = ref object RateLimitManager*[T: Serializable] = ref object
bucket: TokenBucket bucket: TokenBucket
sender: MessageSender[T] sender: MessageSender[T]
running: bool queueCritical: Deque[seq[MsgIdMsg[T]]]
queueCritical: Deque[tuple[msgId: string, msg: T]] queueNormal: Deque[seq[MsgIdMsg[T]]]
queueNormal: Deque[seq[tuple[msgId: string, msg: T]]]
sleepDuration: chronos.Duration sleepDuration: chronos.Duration
pxQueueHandleLoop: Future[void]
proc new*[T: Serializable]( proc new*[T: Serializable](
M: type[RateLimitManager[T]],
sender: MessageSender[T], sender: MessageSender[T],
capacity: int = 100, capacity: int = 100,
duration: chronos.Duration = chronos.minutes(10), duration: chronos.Duration = chronos.minutes(10),
sleepDuration: chronos.Duration = chronos.milliseconds(1000), sleepDuration: chronos.Duration = chronos.milliseconds(1000),
): RateLimitManager[T] = ): M =
RateLimitManager[T]( M(
bucket: TokenBucket.newStrict(capacity, duration), bucket: TokenBucket.newStrict(capacity, duration),
sender: sender, sender: sender,
running: false, queueCritical: Deque[seq[MsgIdMsg[T]]](),
queueCritical: Deque[seq[tuple[msgId: string, msg: T]]](), queueNormal: Deque[seq[MsgIdMsg[T]]](),
queueNormal: Deque[seq[tuple[msgId: string, msg: T]]](),
sleepDuration: sleepDuration, sleepDuration: sleepDuration,
) )
@ -65,7 +65,7 @@ proc getCapacityState[T: Serializable](
proc passToSender[T: Serializable]( proc passToSender[T: Serializable](
manager: RateLimitManager[T], manager: RateLimitManager[T],
msgs: seq[tuple[msgId: string, msg: T]], msgs: seq[MsgIdMsg[T]],
now: Moment, now: Moment,
priority: Priority, priority: Priority,
): Future[SendResult] {.async.} = ): Future[SendResult] {.async.} =
@ -114,7 +114,7 @@ proc processNormalQueue[T: Serializable](
proc sendOrEnqueue*[T: Serializable]( proc sendOrEnqueue*[T: Serializable](
manager: RateLimitManager[T], manager: RateLimitManager[T],
msgs: seq[tuple[msgId: string, msg: T]], msgs: seq[MsgIdMsg[T]],
priority: Priority, priority: Priority,
now: Moment = Moment.now(), now: Moment = Moment.now(),
): Future[SendResult] {.async.} = ): Future[SendResult] {.async.} =
@ -149,11 +149,9 @@ proc sendOrEnqueue*[T: Serializable](
proc getEnqueued*[T: Serializable]( proc getEnqueued*[T: Serializable](
manager: RateLimitManager[T] manager: RateLimitManager[T]
): tuple[ ): tuple[critical: seq[MsgIdMsg[T]], normal: seq[MsgIdMsg[T]]] =
critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]] var criticalMsgs: seq[MsgIdMsg[T]]
] = var normalMsgs: seq[MsgIdMsg[T]]
var criticalMsgs: seq[tuple[msgId: string, msg: T]]
var normalMsgs: seq[tuple[msgId: string, msg: T]]
for batch in manager.queueCritical: for batch in manager.queueCritical:
criticalMsgs.add(batch) criticalMsgs.add(batch)
@ -163,30 +161,36 @@ proc getEnqueued*[T: Serializable](
return (criticalMsgs, normalMsgs) return (criticalMsgs, normalMsgs)
proc start*[T: Serializable]( proc startQueueHandleLoop*[T: Serializable](
manager: RateLimitManager[T], manager: RateLimitManager[T],
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(), Moment.now(),
) {.async.} = ) {.async.} =
manager.running = true
while true: while true:
try: try:
let now = nowProvider() let now = nowProvider()
await manager.processCriticalQueue(now) await manager.processCriticalQueue(now)
await manager.processNormalQueue(now) await manager.processNormalQueue(now)
# configurable sleep duration for processing queued messages
except Exception as e: except Exception as e:
echo "Error in queue processing: ", e.msg echo "Error in queue processing: ", e.msg
# configurable sleep duration for processing queued messages
await sleepAsync(manager.sleepDuration) await sleepAsync(manager.sleepDuration)
proc stop*[T: Serializable](manager: RateLimitManager[T]) = proc start*[T: Serializable](
manager.running = false manager: RateLimitManager[T],
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(),
) {.async.} =
manager.pxQueueHandleLoop = manager.startQueueHandleLoop(nowProvider)
proc stop*[T: Serializable](manager: RateLimitManager[T]) {.async.} =
if not isNil(manager.pxQueueHandleLoop):
await manager.pxQueueHandleLoop.cancelAndWait()
func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} = func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} =
if isNil(b): if isNil(b):
return "nil" return "nil"
return return
"RateLimitManager(running: " & $b.running & ", critical: " & $b.queueCritical.len & "RateLimitManager(critical: " & $b.queueCritical.len & ", normal: " &
", normal: " & $b.queueNormal.len & ")" $b.queueNormal.len & ")"

View File

@ -1,9 +1,6 @@
{.used.}
import testutils/unittests import testutils/unittests
import ../ratelimit/ratelimit import ../ratelimit/rate_limit_manager
import chronos import chronos
import strutils
# Implement the Serializable concept for string # Implement the Serializable concept for string
proc toBytes*(s: string): seq[byte] = proc toBytes*(s: string): seq[byte] =
@ -25,7 +22,7 @@ suite "Queue RateLimitManager":
asyncTest "sendOrEnqueue - immediate send when capacity available": asyncTest "sendOrEnqueue - immediate send when capacity available":
## Given ## Given
let manager = newRateLimitManager[string]( let manager = RateLimitManager[string].new(
mockSender, capacity = 10, duration = chronos.milliseconds(100) mockSender, capacity = 10, duration = chronos.milliseconds(100)
) )
let testMsg = "Hello World" let testMsg = "Hello World"
@ -43,7 +40,7 @@ suite "Queue RateLimitManager":
asyncTest "sendOrEnqueue - multiple messages": asyncTest "sendOrEnqueue - multiple messages":
## Given ## Given
let manager = newRateLimitManager[string]( let manager = RateLimitManager[string].new(
mockSender, capacity = 10, duration = chronos.milliseconds(100) mockSender, capacity = 10, duration = chronos.milliseconds(100)
) )
@ -61,40 +58,9 @@ suite "Queue RateLimitManager":
sentMessages[1].msgId == "msg2" sentMessages[1].msgId == "msg2"
sentMessages[1].msg == "Second" sentMessages[1].msg == "Second"
asyncTest "start and stop - basic functionality":
## Given
let manager = newRateLimitManager[string](
mockSender,
capacity = 10,
duration = chronos.milliseconds(100),
sleepDuration = chronos.milliseconds(50),
)
## When - start the manager
let startFut = manager.start()
# Give it some time to start
await sleepAsync(chronos.milliseconds(20))
## Then - manager should be running (check via string representation)
check:
"running: true" in $manager
not startFut.finished() # should still be running
## When - stop the manager
manager.stop()
# Give it time to stop (should happen within one sleep cycle)
await sleepAsync(chronos.milliseconds(60))
## Then - manager should be stopped
check:
"running: false" in $manager
startFut.finished() # should have completed
asyncTest "start and stop - drop large batch": asyncTest "start and stop - drop large batch":
## Given ## Given
let manager = newRateLimitManager[string]( let manager = RateLimitManager[string].new(
mockSender, mockSender,
capacity = 2, capacity = 2,
duration = chronos.milliseconds(100), duration = chronos.milliseconds(100),
@ -109,17 +75,19 @@ suite "Queue RateLimitManager":
asyncTest "enqueue - enqueue critical only when exceeded": asyncTest "enqueue - enqueue critical only when exceeded":
## Given ## Given
let manager = newRateLimitManager[string]( let manager = RateLimitManager[string].new(
mockSender, mockSender,
capacity = 10, capacity = 10,
duration = chronos.milliseconds(100), duration = chronos.milliseconds(100),
sleepDuration = chronos.milliseconds(100), sleepDuration = chronos.milliseconds(100),
) )
var now = Moment.now() var now = Moment.now()
let startFut = manager.start( await manager.start(
nowProvider = proc(): Moment = nowProvider = proc(): Moment =
now now
) )
defer:
await manager.stop()
let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Critical, now) let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Critical, now)
let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Critical, now) let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Critical, now)
@ -153,27 +121,21 @@ suite "Queue RateLimitManager":
normal.len == 0 normal.len == 0
critical[0].msgId == "msg11" critical[0].msgId == "msg11"
# Give it time to stop
manager.stop()
await sleepAsync(chronos.milliseconds(150))
## Then - should be stopped
check:
"running: false" in $manager
startFut.finished()
asyncTest "enqueue - enqueue normal on 70% capacity": asyncTest "enqueue - enqueue normal on 70% capacity":
## Given ## Given
let manager = newRateLimitManager[string]( let manager = RateLimitManager[string].new(
mockSender, mockSender,
capacity = 10, capacity = 10,
duration = chronos.milliseconds(100), duration = chronos.milliseconds(100),
sleepDuration = chronos.milliseconds(100), sleepDuration = chronos.milliseconds(100),
) )
var now = Moment.now() var now = Moment.now()
let startFut = manager.start( await manager.start(
nowProvider = proc(): Moment = nowProvider = proc(): Moment =
now now
) )
defer:
await manager.stop()
let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Normal, now) let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Normal, now)
let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Normal, now) let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Normal, now)
@ -210,17 +172,9 @@ suite "Queue RateLimitManager":
normal[1].msgId == "msg9" normal[1].msgId == "msg9"
normal[2].msgId == "msg10" normal[2].msgId == "msg10"
# Give it time to stop
manager.stop()
await sleepAsync(chronos.milliseconds(150))
## Then - should be stopped
check:
"running: false" in $manager
startFut.finished()
asyncTest "enqueue - process queued messages": asyncTest "enqueue - process queued messages":
## Given ## Given
let manager = newRateLimitManager[string]( let manager = RateLimitManager[string].new(
mockSender, mockSender,
capacity = 10, capacity = 10,
duration = chronos.milliseconds(200), duration = chronos.milliseconds(200),
@ -231,10 +185,12 @@ suite "Queue RateLimitManager":
var now = Moment.now() var now = Moment.now()
var nowRef = MomentRef(value: now) var nowRef = MomentRef(value: now)
let startFut = manager.start( await manager.start(
nowProvider = proc(): Moment = nowProvider = proc(): Moment =
return nowRef.value return nowRef.value
) )
defer:
await manager.stop()
let r1 = await manager.sendOrEnqueue(@[("1", "val_1")], Normal, now) let r1 = await manager.sendOrEnqueue(@[("1", "val_1")], Normal, now)
let r2 = await manager.sendOrEnqueue(@[("2", "val_2")], Normal, now) let r2 = await manager.sendOrEnqueue(@[("2", "val_2")], Normal, now)
@ -301,11 +257,3 @@ suite "Queue RateLimitManager":
sentMessages[11].msgId == "8" sentMessages[11].msgId == "8"
sentMessages[12].msgId == "9" sentMessages[12].msgId == "9"
sentMessages[13].msgId == "10" sentMessages[13].msgId == "10"
# Give it time to stop
manager.stop()
await sleepAsync(chronos.milliseconds(250))
## Then - should be stopped
check:
"running: false" in $manager
startFut.finished()