From 1d8ad697c272e66a7324b0a79c2e89aa5634a83d Mon Sep 17 00:00:00 2001
From: pablo
Date: Sun, 6 Jul 2025 13:35:33 +0300
Subject: [PATCH] fix: use nwaku's token_bucket for rate limiting
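
Replace the in-house epoch counter and per-priority message queues with
nwaku's TokenBucket (waku/common/rate_limit/token_bucket). The sender
callback now receives batches of (msgId, msg) tuples, sendOrEnqueue
decides whether a batch is passed to the sender, enqueued or dropped
based on remaining bucket capacity, and start()/stop() drive a
background loop that drains the critical and normal queues. The nimble
file gains a waku dependency and the library builds switch to --mm:refc.

A minimal usage sketch (not part of this patch; proc names are
illustrative, and the module path and the string toBytes overload
mirror the tests):

    import chronos
    import ../ratelimit/ratelimit

    # string satisfies the Serializable concept via this overload
    proc toBytes*(s: string): seq[byte] =
      cast[seq[byte]](s)

    # batch sender invoked by the rate limiter
    proc send(msgs: seq[tuple[msgId: string, msg: string]]): Future[void] {.async.} =
      for m in msgs:
        echo "sending ", m.msgId

    proc demo() {.async.} =
      let manager = newRateLimitManager[string](
        send, capacity = 100, duration = chronos.minutes(10)
      )
      asyncSpawn manager.start()   # background draining of queued batches
      let res = await manager.sendOrEnqueue(@[("id-1", "hello")], Critical)
      echo res                     # PassedToSender, Enqueued or Dropped
      manager.stop()

    waitFor demo()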
---
chat_sdk.nimble | 7 +-
ratelimit/ratelimit.nim | 327 ++++++++++++-------------
tests/test_ratelimit.nim | 499 ++++++++++++++++++++++-----------------
3 files changed, 455 insertions(+), 378 deletions(-)
diff --git a/chat_sdk.nimble b/chat_sdk.nimble
index 186ec7b..cd2c463 100644
--- a/chat_sdk.nimble
+++ b/chat_sdk.nimble
@@ -12,10 +12,11 @@ srcDir = "src"
requires "nim >= 2.2.4",
"chronicles",
"chronos",
- "db_connector"
+ "db_connector",
+ "waku"
task buildSharedLib, "Build shared library for C bindings":
- exec "nim c --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim"
+ exec "nim c --mm:refc --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim"
task buildStaticLib, "Build static library for C bindings":
- exec "nim c --app:staticLib --out:../library/c-bindings/libchatsdk.a chat_sdk/chat_sdk.nim"
+ exec "nim c --mm:refc --app:staticLib --out:../library/c-bindings/libchatsdk.a chat_sdk/chat_sdk.nim"
diff --git a/ratelimit/ratelimit.nim b/ratelimit/ratelimit.nim
index d634d82..9bd1bf6 100644
--- a/ratelimit/ratelimit.nim
+++ b/ratelimit/ratelimit.nim
@@ -1,183 +1,192 @@
-import std/[times, deques]
+import std/[times, deques, options]
+import waku/common/rate_limit/token_bucket
import chronos
type
- Serializable* = concept x
- x.toBytes() is seq[byte]
+ CapacityState = enum
+ Normal = 0
+ AlmostNone = 1
+ None = 2
- MessagePriority* = enum
+ SendResult* = enum
+ PassedToSender = 0
+ Enqueued = 1
+ Dropped = 2
+ DroppedBatchTooLarge = 3
+
+ Priority* = enum
Critical = 0
Normal = 1
Optional = 2
- QueuedMessage*[T: Serializable] = ref object of RootObj
- messageId*: string
- msg*: T
- priority*: MessagePriority
- queuedAt*: float
+ Serializable* =
+ concept x
+ x.toBytes() is seq[byte]
- MessageSender*[T: Serializable] = proc(messageId: string, msg: T): Future[bool] {.async.}
+ MessageSender*[T: Serializable] =
+ proc(msgs: seq[tuple[msgId: string, msg: T]]): Future[void] {.async.}
RateLimitManager*[T: Serializable] = ref object
- messageCount*: int = 100 # Default to 100 messages
- epochDurationSec*: int = 600 # Default to 10 minutes
- currentCount*: int
- lastResetTime*: float
- criticalQueue*: Deque[QueuedMessage[T]]
- normalQueue*: Deque[QueuedMessage[T]]
- optionalQueue*: Deque[QueuedMessage[T]]
- sender*: MessageSender[T]
- isRunning*: bool
- sendTask*: Future[void]
+ bucket: TokenBucket
+ sender: MessageSender[T]
+ running: bool
+ queueCritical: Deque[seq[tuple[msgId: string, msg: T]]]
+ queueNormal: Deque[seq[tuple[msgId: string, msg: T]]]
+ sleepDuration: chronos.Duration
-proc newRateLimitManager*[T: Serializable](messageCount: int, epochDurationSec: int, sender: MessageSender[T]): RateLimitManager[T] =
+proc newRateLimitManager*[T: Serializable](
+ sender: MessageSender[T],
+ capacity: int = 100,
+ duration: chronos.Duration = chronos.minutes(10),
+ sleepDuration: chronos.Duration = chronos.milliseconds(1000),
+): RateLimitManager[T] =
RateLimitManager[T](
- messageCount: messageCount,
- epochDurationSec: epochDurationSec,
- currentCount: 0,
- lastResetTime: epochTime(),
- criticalQueue: initDeque[QueuedMessage[T]](),
- normalQueue: initDeque[QueuedMessage[T]](),
- optionalQueue: initDeque[QueuedMessage[T]](),
+ bucket: TokenBucket.newStrict(capacity, duration),
sender: sender,
- isRunning: false
+ running: false,
+ queueCritical: Deque[seq[tuple[msgId: string, msg: T]]](),
+ queueNormal: Deque[seq[tuple[msgId: string, msg: T]]](),
+ sleepDuration: sleepDuration,
)
-proc updateEpochIfNeeded[T: Serializable](manager: RateLimitManager[T]) =
- let now = epochTime()
- if now - manager.lastResetTime >= float(manager.epochDurationSec):
- manager.lastResetTime = now
- manager.currentCount = 0
-
-proc getUsagePercent[T: Serializable](manager: RateLimitManager[T]): float =
- if manager.messageCount == 0:
- return 1.0
- float(manager.currentCount) / float(manager.messageCount)
-
-proc queueForSend*[T: Serializable](manager: RateLimitManager[T], messageId: string, msg: T, priority: MessagePriority) =
- manager.updateEpochIfNeeded()
-
- let queuedMsg = QueuedMessage[T](
- messageId: messageId,
- msg: msg,
- priority: priority,
- queuedAt: epochTime()
- )
-
- let usagePercent = manager.getUsagePercent()
-
- if usagePercent >= 1.0:
- # Quota exhausted - queue critical on top, queue normal, drop optional
- case priority:
- of Critical:
- manager.criticalQueue.addLast(queuedMsg)
- of Normal:
- manager.normalQueue.addLast(queuedMsg)
- of Optional:
- discard # Drop optional messages when quota exhausted
- elif usagePercent >= 0.7:
- # Low quota - send critical, queue normal, drop optional
- case priority:
- of Critical:
- manager.criticalQueue.addLast(queuedMsg)
- of Normal:
- manager.normalQueue.addLast(queuedMsg)
- of Optional:
- discard # Drop optional messages when quota low
+proc getCapacityState[T: Serializable](
+ manager: RateLimitManager[T], now: Moment, count: int = 1
+): CapacityState =
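+  # Classify how much budget would remain after sending `count` messages:
+  # below zero -> None, below 30% of capacity -> AlmostNone, otherwise Normal.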
+ let (budget, budgetCap) = manager.bucket.getAvailableCapacity(now)
+ let countAfter = budget - count
+ let ratio = countAfter.float / budgetCap.float
+ if ratio < 0.0:
+ return CapacityState.None
+ elif ratio < 0.3:
+ return CapacityState.AlmostNone
else:
- # Normal operation - queue all messages
- case priority:
- of Critical:
- manager.criticalQueue.addLast(queuedMsg)
- of Normal:
- manager.normalQueue.addLast(queuedMsg)
- of Optional:
- manager.optionalQueue.addLast(queuedMsg)
+ return CapacityState.Normal
-proc getNextMessage[T: Serializable](manager: RateLimitManager[T]): QueuedMessage[T] =
- # Priority order: Critical -> Normal -> Optional
- if manager.criticalQueue.len > 0:
- return manager.criticalQueue.popFirst()
- elif manager.normalQueue.len > 0:
- return manager.normalQueue.popFirst()
- elif manager.optionalQueue.len > 0:
- return manager.optionalQueue.popFirst()
- else:
- raise newException(ValueError, "No messages in queue")
+proc passToSender[T: Serializable](
+ manager: RateLimitManager[T],
+ msgs: seq[tuple[msgId: string, msg: T]],
+ now: Moment,
+ priority: Priority,
+): Future[SendResult] {.async.} =
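+  # Try to consume `count` tokens; if the bucket cannot cover the batch, fall
+  # back to the per-priority policy: queue critical/normal, drop optional.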
+ let count = msgs.len
+ let capacity = manager.bucket.tryConsume(count, now)
+ if not capacity:
+ case priority
+ of Priority.Critical:
+ manager.queueCritical.addLast(msgs)
+ return SendResult.Enqueued
+ of Priority.Normal:
+ manager.queueNormal.addLast(msgs)
+ return SendResult.Enqueued
+ of Priority.Optional:
+ return SendResult.Dropped
+ await manager.sender(msgs)
+ return SendResult.PassedToSender
-proc hasMessages[T: Serializable](manager: RateLimitManager[T]): bool =
- manager.criticalQueue.len > 0 or manager.normalQueue.len > 0 or manager.optionalQueue.len > 0
-
-proc sendLoop*[T: Serializable](manager: RateLimitManager[T]): Future[void] {.async.} =
- manager.isRunning = true
-
- while manager.isRunning:
- try:
- manager.updateEpochIfNeeded()
-
- while manager.hasMessages() and manager.currentCount < manager.messageCount:
- let msg = manager.getNextMessage()
-
- try:
- let sent = await manager.sender(msg.messageId, msg.msg)
- if sent:
- manager.currentCount += 1
- else:
- # Re-queue failed message at beginning of appropriate queue
- case msg.priority:
- of Critical:
- manager.criticalQueue.addFirst(msg)
- of Normal:
- manager.normalQueue.addFirst(msg)
- of Optional:
- manager.optionalQueue.addFirst(msg)
- break # Stop trying to send more messages if one fails
- except:
- # Re-queue on exception
- case msg.priority:
- of Critical:
- manager.criticalQueue.addFirst(msg)
- of Normal:
- manager.normalQueue.addFirst(msg)
- of Optional:
- manager.optionalQueue.addFirst(msg)
- break
-
- # Small delay between messages TODO not sure if needed
- await sleepAsync(chronos.milliseconds(10))
-
- # Wait before next processing cycle TODO not completely sure if this is the way
- await sleepAsync(chronos.milliseconds(100))
- except CancelledError:
+proc processCriticalQueue[T: Serializable](
+ manager: RateLimitManager[T], now: Moment
+): Future[void] {.async.} =
+ while manager.queueCritical.len > 0:
+ let msgs = manager.queueCritical.popFirst()
+ let capacityState = manager.getCapacityState(now, msgs.len)
+    if capacityState in {CapacityState.Normal, CapacityState.AlmostNone}:
+      discard await manager.passToSender(msgs, now, Priority.Critical)
+ else:
+ # add back to critical queue
+ manager.queueCritical.addFirst(msgs)
break
- except:
- await sleepAsync(chronos.seconds(1)) # Wait longer on error
-proc start*[T: Serializable](manager: RateLimitManager[T]): Future[void] {.async.} =
- if not manager.isRunning:
- manager.sendTask = manager.sendLoop()
+proc processNormalQueue[T: Serializable](
+ manager: RateLimitManager[T], now: Moment
+): Future[void] {.async.} =
+ while manager.queueNormal.len > 0:
+ let msgs = manager.queueNormal.popFirst()
+ let capacityState = manager.getCapacityState(now, msgs.len)
+ if capacityState == CapacityState.Normal:
+ discard await manager.passToSender(msgs, now, Priority.Normal)
+ else:
+      # add back to normal queue
+ manager.queueNormal.addFirst(msgs)
+ break
-proc stop*[T: Serializable](manager: RateLimitManager[T]): Future[void] {.async.} =
- if manager.isRunning:
- manager.isRunning = false
- if not manager.sendTask.isNil:
- manager.sendTask.cancelSoon()
- try:
- await manager.sendTask
- except CancelledError:
- discard
+proc sendOrEnqueue*[T: Serializable](
+ manager: RateLimitManager[T],
+ msgs: seq[tuple[msgId: string, msg: T]],
+ priority: Priority,
+ now: Moment = Moment.now(),
+): Future[SendResult] {.async.} =
+ let (_, budgetCap) = manager.bucket.getAvailableCapacity(now)
+ if msgs.len.float / budgetCap.float >= 0.3:
+ # drop batch if it's too large to avoid starvation
+ return SendResult.DroppedBatchTooLarge
-proc getQueueStatus*[T: Serializable](manager: RateLimitManager[T]): tuple[critical: int, normal: int, optional: int, total: int] =
- (
- critical: manager.criticalQueue.len,
- normal: manager.normalQueue.len,
- optional: manager.optionalQueue.len,
- total: manager.criticalQueue.len + manager.normalQueue.len + manager.optionalQueue.len
- )
+ let capacityState = manager.getCapacityState(now, msgs.len)
+ case capacityState
+ of CapacityState.Normal:
+ return await manager.passToSender(msgs, now, priority)
+ of CapacityState.AlmostNone:
+ case priority
+ of Priority.Critical:
+ return await manager.passToSender(msgs, now, priority)
+ of Priority.Normal:
+ manager.queueNormal.addLast(msgs)
+ return SendResult.Enqueued
+ of Priority.Optional:
+ return SendResult.Dropped
+ of CapacityState.None:
+ case priority
+ of Priority.Critical:
+ manager.queueCritical.addLast(msgs)
+ return SendResult.Enqueued
+ of Priority.Normal:
+ manager.queueNormal.addLast(msgs)
+ return SendResult.Enqueued
+ of Priority.Optional:
+ return SendResult.Dropped
-proc getCurrentQuota*[T: Serializable](manager: RateLimitManager[T]): tuple[total: int, used: int, remaining: int] =
- (
- total: manager.messageCount,
- used: manager.currentCount,
- remaining: max(0, manager.messageCount - manager.currentCount)
- )
\ No newline at end of file
+proc getEnqueued*[T: Serializable](
+ manager: RateLimitManager[T]
+): tuple[
+ critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]]
+] =
+ var criticalMsgs: seq[tuple[msgId: string, msg: T]]
+ var normalMsgs: seq[tuple[msgId: string, msg: T]]
+
+ for batch in manager.queueCritical:
+ criticalMsgs.add(batch)
+
+ for batch in manager.queueNormal:
+ normalMsgs.add(batch)
+
+ return (criticalMsgs, normalMsgs)
+
+proc start*[T: Serializable](
+ manager: RateLimitManager[T],
+ nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
+ Moment.now(),
+): Future[void] {.async.} =
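+  # nowProvider is injectable so tests can drive the bucket's clock
+  # deterministically; it defaults to Moment.now().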
+ manager.running = true
+ while manager.running:
+ try:
+ let now = nowProvider()
+ await manager.processCriticalQueue(now)
+ await manager.processNormalQueue(now)
+
+      # pause before the next queue-processing pass (sleepDuration is configurable)
+ await sleepAsync(manager.sleepDuration)
+ except Exception as e:
+ echo "Error in queue processing: ", e.msg
+ await sleepAsync(manager.sleepDuration)
+
+proc stop*[T: Serializable](manager: RateLimitManager[T]) =
+ manager.running = false
+
+func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} =
+ if isNil(b):
+ return "nil"
+ return
+ "RateLimitManager(running: " & $b.running & ", critical: " & $b.queueCritical.len &
+ ", normal: " & $b.queueNormal.len & ")"
diff --git a/tests/test_ratelimit.nim b/tests/test_ratelimit.nim
index e5f7e5e..f38ddbf 100644
--- a/tests/test_ratelimit.nim
+++ b/tests/test_ratelimit.nim
@@ -1,244 +1,311 @@
{.used.}
-import std/[sequtils, times, random], testutils/unittests
+import testutils/unittests
import ../ratelimit/ratelimit
import chronos
+import strutils
-randomize()
-
-# Test message types
-type
- TestMessage = object
- content: string
- id: int
-
-# Implement Serializable for test types
+# Implement the Serializable concept for string
proc toBytes*(s: string): seq[byte] =
cast[seq[byte]](s)
-proc toBytes*(msg: TestMessage): seq[byte] =
- result = toBytes(msg.content)
- result.add(cast[seq[byte]](@[byte(msg.id)]))
-
-proc toBytes*(i: int): seq[byte] =
- cast[seq[byte]](@[byte(i)])
-
-suite "Rate Limit Manager":
+suite "Queue RateLimitManager":
setup:
- ## Given
- let epochDuration = 60
- var sentMessages: seq[string]
-
- proc testSender(messageId: string, msg: string): Future[bool] {.async.} =
- sentMessages.add(msg)
- await sleepAsync(chronos.milliseconds(10))
- return true
+ var sentMessages: seq[tuple[msgId: string, msg: string]]
+ var senderCallCount: int = 0
- asyncTest "basic message queueing and sending":
+ # Create a mock sender
+ proc mockSender(
+ msgs: seq[tuple[msgId: string, msg: string]]
+ ): Future[void] {.async.} =
+ senderCallCount.inc()
+ for msg in msgs:
+ sentMessages.add(msg)
+ await sleepAsync(chronos.milliseconds(10))
+
+ asyncTest "sendOrEnqueue - immediate send when capacity available":
## Given
- let capacity = 10
- var manager = newRateLimitManager(capacity, epochDuration, testSender)
- await manager.start()
-
+ let manager = newRateLimitManager[string](
+ mockSender, capacity = 10, duration = chronos.milliseconds(100)
+ )
+ let testMsg = "Hello World"
+
## When
- manager.queueForSend("msg1", "Hello", Critical)
- manager.queueForSend("msg2", "World", Normal)
-
- await sleepAsync(chronos.milliseconds(200))
- await manager.stop()
-
+ let res = await manager.sendOrEnqueue(@[("msg1", testMsg)], Critical)
+
## Then
check:
+ res == PassedToSender
+ senderCallCount == 1
+ sentMessages.len == 1
+ sentMessages[0].msgId == "msg1"
+ sentMessages[0].msg == "Hello World"
+
+ asyncTest "sendOrEnqueue - multiple messages":
+ ## Given
+ let manager = newRateLimitManager[string](
+ mockSender, capacity = 10, duration = chronos.milliseconds(100)
+ )
+
+ ## When
+ let res =
+ await manager.sendOrEnqueue(@[("msg1", "First"), ("msg2", "Second")], Normal)
+
+ ## Then
+ check:
+ res == PassedToSender
+ senderCallCount == 1
sentMessages.len == 2
- sentMessages[0] == "Hello"
- sentMessages[1] == "World"
+ sentMessages[0].msgId == "msg1"
+ sentMessages[0].msg == "First"
+ sentMessages[1].msgId == "msg2"
+ sentMessages[1].msg == "Second"
- asyncTest "priority ordering - critical first, then normal, then optional":
+ asyncTest "start and stop - basic functionality":
## Given
- let capacity = 10
- var manager = newRateLimitManager(capacity, epochDuration, testSender)
- await manager.start()
-
- ## When - queue messages in mixed priority order
- manager.queueForSend("normal1", "Normal1", Normal)
- manager.queueForSend("critical1", "Critical1", Critical)
- manager.queueForSend("optional1", "Optional1", Optional)
- manager.queueForSend("critical2", "Critical2", Critical)
- manager.queueForSend("normal2", "Normal2", Normal)
-
- await sleepAsync(chronos.milliseconds(300))
- await manager.stop()
-
- ## Then - critical messages should be sent first
- check:
- sentMessages.len == 5
- sentMessages[0] == "Critical1" # First critical
- sentMessages[1] == "Critical2" # Second critical
- sentMessages[2] == "Normal1" # Then normal messages
- sentMessages[3] == "Normal2"
- sentMessages[4] == "Optional1" # Finally optional
+ let manager = newRateLimitManager[string](
+ mockSender,
+ capacity = 10,
+ duration = chronos.milliseconds(100),
+ sleepDuration = chronos.milliseconds(50),
+ )
- asyncTest "rate limiting - respects message quota per epoch":
- ## Given
- let capacity = 3 # Only 3 messages allowed
- var manager = newRateLimitManager(capacity, epochDuration, testSender)
- await manager.start()
-
- ## When - queue more messages than the limit
- for i in 1..5:
- manager.queueForSend("msg" & $i, "Message" & $i, Normal)
-
- await sleepAsync(chronos.milliseconds(300))
-
- ## Then
- let quota = manager.getCurrentQuota()
- let queueStatus = manager.getQueueStatus()
-
- check:
- quota.used == 3
- quota.remaining == 0
- sentMessages.len == 3
- queueStatus.total == 2 # 2 messages should be queued
-
- await manager.stop()
+ ## When - start the manager
+ let startFut = manager.start()
- asyncTest "optional message dropping at high usage":
- ## Given
- let capacity = 10
- var manager = newRateLimitManager(capacity, epochDuration, testSender)
- await manager.start()
-
- # Fill to 80% capacity to trigger optional dropping
- for i in 1..8:
- manager.queueForSend("fill" & $i, "Fill" & $i, Normal)
-
- await sleepAsync(chronos.milliseconds(200))
-
- ## When - add messages at high usage
- manager.queueForSend("critical", "Critical", Critical)
- manager.queueForSend("normal", "Normal", Normal)
- manager.queueForSend("optional", "Optional", Optional) # Should be dropped
-
- await sleepAsync(chronos.milliseconds(200))
-
- ## Then
- let quota = manager.getCurrentQuota()
- let optionalSent = sentMessages.anyIt(it == "Optional")
-
- check:
- quota.used == 10
- not optionalSent # Optional message should not be sent
-
- await manager.stop()
+ # Give it some time to start
+ await sleepAsync(chronos.milliseconds(20))
- asyncTest "quota tracking - accurate total, used, and remaining counts":
- ## Given
- let capacity = 5
- var manager = newRateLimitManager(capacity, epochDuration, testSender)
- await manager.start()
-
- ## When - initially no messages sent
- let initialQuota = manager.getCurrentQuota()
-
- ## Then - should show full quota available
+ ## Then - manager should be running (check via string representation)
check:
- initialQuota.total == 5
- initialQuota.used == 0
- initialQuota.remaining == 5
-
- ## When - send some messages
- manager.queueForSend("msg1", "Message1", Normal)
- manager.queueForSend("msg2", "Message2", Normal)
-
- await sleepAsync(chronos.milliseconds(200))
-
- ## Then - quota should be updated
- let afterQuota = manager.getCurrentQuota()
- check:
- afterQuota.used == 2
- afterQuota.remaining == 3
-
- await manager.stop()
+ "running: true" in $manager
+ not startFut.finished() # should still be running
- asyncTest "queue status tracking - by priority levels":
- ## Given
- let capacity = 2 # Small limit to force queueing
- var manager = newRateLimitManager(capacity, epochDuration, testSender)
- await manager.start()
-
- ## When - fill quota and add more messages
- manager.queueForSend("msg1", "Message1", Critical)
- manager.queueForSend("msg2", "Message2", Normal)
- manager.queueForSend("msg3", "Message3", Critical) # Should be queued
- manager.queueForSend("msg4", "Message4", Normal) # Should be queued
- manager.queueForSend("msg5", "Message5", Optional) # Should be queued
-
- await sleepAsync(chronos.milliseconds(100))
-
- ## Then
- let queueStatus = manager.getQueueStatus()
-
- check:
- queueStatus.total >= 2 # At least some messages queued
- queueStatus.critical >= 1 # Critical messages in queue
- queueStatus.normal >= 1 # Normal messages in queue
-
- await manager.stop()
+ ## When - stop the manager
+ manager.stop()
-suite "Generic Type Support":
- setup:
- ## Given
- let epochDuration = 60
- var sentCustomMessages: seq[TestMessage]
-
- proc testCustomSender(messageId: string, msg: TestMessage): Future[bool] {.async.} =
- sentCustomMessages.add(msg)
- await sleepAsync(chronos.milliseconds(10))
- return true
+ # Give it time to stop (should happen within one sleep cycle)
+ await sleepAsync(chronos.milliseconds(60))
- asyncTest "custom message types - TestMessage objects":
- ## Given
- let capacity = 5
- var manager = newRateLimitManager(capacity, epochDuration, testCustomSender)
- await manager.start()
-
- ## When
- let testMsg = TestMessage(content: "Test content", id: 42)
- manager.queueForSend("custom1", testMsg, Normal)
-
- await sleepAsync(chronos.milliseconds(200))
- await manager.stop()
-
- ## Then
+ ## Then - manager should be stopped
check:
- sentCustomMessages.len == 1
- sentCustomMessages[0].content == "Test content"
- sentCustomMessages[0].id == 42
+ "running: false" in $manager
+ startFut.finished() # should have completed
- asyncTest "integer message types":
+ asyncTest "start and stop - drop large batch":
## Given
- let capacity = 5
- var sentInts: seq[int]
-
- proc testIntSender(messageId: string, msg: int): Future[bool] {.async.} =
- sentInts.add(msg)
- await sleepAsync(chronos.milliseconds(10))
- return true
-
- var manager = newRateLimitManager(capacity, epochDuration, testIntSender)
- await manager.start()
-
- ## When
- manager.queueForSend("int1", 42, Critical)
- manager.queueForSend("int2", 100, Normal)
- manager.queueForSend("int3", 999, Optional)
-
- await sleepAsync(chronos.milliseconds(200))
- await manager.stop()
-
- ## Then
+ let manager = newRateLimitManager[string](
+ mockSender,
+ capacity = 2,
+ duration = chronos.milliseconds(100),
+ sleepDuration = chronos.milliseconds(30),
+ )
+
+ let largeBatch1 = @[("msg1", "First"), ("msg2", "Second"), ("msg3", "Third")]
+ let largeBatch2 = @[("msg4", "Fourth"), ("msg5", "Fifth")]
+
+    ## When
+    let r1 = await manager.sendOrEnqueue(largeBatch1, Normal)
+    let r2 = await manager.sendOrEnqueue(largeBatch2, Critical)
+
+    ## Then - both batches exceed 30% of the bucket capacity and are rejected
+    check:
+      r1 == DroppedBatchTooLarge
+      r2 == DroppedBatchTooLarge
+      senderCallCount == 0
+
+ asyncTest "enqueue - enqueue critical only when exceeded":
+ ## Given
+ let manager = newRateLimitManager[string](
+ mockSender,
+ capacity = 10,
+ duration = chronos.milliseconds(100),
+ sleepDuration = chronos.milliseconds(100),
+ )
+ var now = Moment.now()
+ let startFut = manager.start(
+ nowProvider = proc(): Moment =
+ now
+ )
+
+ let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Critical, now)
+ let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Critical, now)
+ let r3 = await manager.sendOrEnqueue(@[("msg3", "3")], Critical, now)
+ let r4 = await manager.sendOrEnqueue(@[("msg4", "4")], Critical, now)
+ let r5 = await manager.sendOrEnqueue(@[("msg5", "5")], Critical, now)
+ let r6 = await manager.sendOrEnqueue(@[("msg6", "6")], Critical, now)
+ let r7 = await manager.sendOrEnqueue(@[("msg7", "7")], Critical, now)
+ let r8 = await manager.sendOrEnqueue(@[("msg8", "8")], Critical, now)
+ let r9 = await manager.sendOrEnqueue(@[("msg9", "9")], Critical, now)
+ let r10 = await manager.sendOrEnqueue(@[("msg10", "10")], Critical, now)
+ # will enqueue to critical queue
+ let r11 = await manager.sendOrEnqueue(@[("msg11", "11")], Critical, now)
+
check:
- sentInts.len == 3
- sentInts[0] == 42 # Critical sent first
- sentInts[1] == 100 # Normal sent second
- sentInts[2] == 999 # Optional sent last
\ No newline at end of file
+ r1 == PassedToSender
+ r2 == PassedToSender
+ r3 == PassedToSender
+ r4 == PassedToSender
+ r5 == PassedToSender
+ r6 == PassedToSender
+ r7 == PassedToSender
+ r8 == PassedToSender
+ r9 == PassedToSender
+ r10 == PassedToSender
+ r11 == Enqueued
+
+ let (critical, normal) = manager.getEnqueued()
+ check:
+ critical.len == 1
+ normal.len == 0
+ critical[0].msgId == "msg11"
+
+ # Give it time to stop
+ manager.stop()
+ await sleepAsync(chronos.milliseconds(150))
+ ## Then - should be stopped
+ check:
+ "running: false" in $manager
+ startFut.finished()
+
+ asyncTest "enqueue - enqueue normal on 70% capacity":
+ ## Given
+ let manager = newRateLimitManager[string](
+ mockSender,
+ capacity = 10,
+ duration = chronos.milliseconds(100),
+ sleepDuration = chronos.milliseconds(100),
+ )
+ var now = Moment.now()
+ let startFut = manager.start(
+ nowProvider = proc(): Moment =
+ now
+ )
+
+ let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Normal, now)
+ let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Normal, now)
+ let r3 = await manager.sendOrEnqueue(@[("msg3", "3")], Normal, now)
+ let r4 = await manager.sendOrEnqueue(@[("msg4", "4")], Normal, now)
+ let r5 = await manager.sendOrEnqueue(@[("msg5", "5")], Normal, now)
+ let r6 = await manager.sendOrEnqueue(@[("msg6", "6")], Normal, now)
+ let r7 = await manager.sendOrEnqueue(@[("msg7", "7")], Normal, now)
+ let r8 = await manager.sendOrEnqueue(@[("msg8", "8")], Normal, now)
+ let r9 = await manager.sendOrEnqueue(@[("msg9", "9")], Normal, now)
+ let r10 = await manager.sendOrEnqueue(@[("msg10", "10")], Normal, now)
+ let r11 = await manager.sendOrEnqueue(@[("msg11", "11")], Critical, now)
+ let r12 = await manager.sendOrEnqueue(@[("msg12", "12")], Optional, now)
+
+ check:
+ r1 == PassedToSender
+ r2 == PassedToSender
+ r3 == PassedToSender
+ r4 == PassedToSender
+ r5 == PassedToSender
+ r6 == PassedToSender
+ r7 == PassedToSender
+ r8 == Enqueued
+ r9 == Enqueued
+ r10 == Enqueued
+ r11 == PassedToSender
+ r12 == Dropped
+
+ let (critical, normal) = manager.getEnqueued()
+ check:
+ critical.len == 0
+ normal.len == 3
+ normal[0].msgId == "msg8"
+ normal[1].msgId == "msg9"
+ normal[2].msgId == "msg10"
+
+ # Give it time to stop
+ manager.stop()
+ await sleepAsync(chronos.milliseconds(150))
+ ## Then - should be stopped
+ check:
+ "running: false" in $manager
+ startFut.finished()
+
+ asyncTest "enqueue - process queued messages":
+ ## Given
+ let manager = newRateLimitManager[string](
+ mockSender,
+ capacity = 10,
+ duration = chronos.milliseconds(200),
+ sleepDuration = chronos.milliseconds(200),
+ )
+ type MomentRef = ref object
+ value: Moment
+
+ var now = Moment.now()
+ var nowRef = MomentRef(value: now)
+ let startFut = manager.start(
+ nowProvider = proc(): Moment =
+ return nowRef.value
+ )
+
+ let r1 = await manager.sendOrEnqueue(@[("1", "val_1")], Normal, now)
+ let r2 = await manager.sendOrEnqueue(@[("2", "val_2")], Normal, now)
+ let r3 = await manager.sendOrEnqueue(@[("3", "val_3")], Normal, now)
+ let r4 = await manager.sendOrEnqueue(@[("4", "val_4")], Normal, now)
+ let r5 = await manager.sendOrEnqueue(@[("5", "val_5")], Normal, now)
+ let r6 = await manager.sendOrEnqueue(@[("6", "val_6")], Normal, now)
+ let r7 = await manager.sendOrEnqueue(@[("7", "val_7")], Normal, now)
+ let r8 = await manager.sendOrEnqueue(@[("8", "val_8")], Normal, now)
+ let r9 = await manager.sendOrEnqueue(@[("9", "val_9")], Normal, now)
+ let r10 = await manager.sendOrEnqueue(@[("10", "val_10")], Normal, now)
+ let r11 = await manager.sendOrEnqueue(@[("11", "val_11")], Critical, now)
+ let r12 = await manager.sendOrEnqueue(@[("12", "val_12")], Optional, now)
+ let r13 = await manager.sendOrEnqueue(@[("13", "val_13")], Critical, now)
+ let r14 = await manager.sendOrEnqueue(@[("14", "val_14")], Critical, now)
+ let r15 = await manager.sendOrEnqueue(@[("15", "val_15")], Critical, now)
+
+ check:
+ r1 == PassedToSender
+ r2 == PassedToSender
+ r3 == PassedToSender
+ r4 == PassedToSender
+ r5 == PassedToSender
+ r6 == PassedToSender
+ r7 == PassedToSender
+ r8 == Enqueued
+ r9 == Enqueued
+ r10 == Enqueued
+ r11 == PassedToSender
+ r12 == Dropped
+ r13 == PassedToSender
+ r14 == PassedToSender
+ r15 == Enqueued
+
+ var (critical, normal) = manager.getEnqueued()
+ check:
+ critical.len == 1
+ normal.len == 3
+ normal[0].msgId == "8"
+ normal[1].msgId == "9"
+ normal[2].msgId == "10"
+ critical[0].msgId == "15"
+
+ nowRef.value = now + chronos.milliseconds(250)
+ await sleepAsync(chronos.milliseconds(250))
+
+ (critical, normal) = manager.getEnqueued()
+ check:
+ critical.len == 0
+ normal.len == 0
+ senderCallCount == 14
+ sentMessages.len == 14
+ sentMessages[0].msgId == "1"
+ sentMessages[1].msgId == "2"
+ sentMessages[2].msgId == "3"
+ sentMessages[3].msgId == "4"
+ sentMessages[4].msgId == "5"
+ sentMessages[5].msgId == "6"
+ sentMessages[6].msgId == "7"
+ sentMessages[7].msgId == "11"
+ sentMessages[8].msgId == "13"
+ sentMessages[9].msgId == "14"
+ sentMessages[10].msgId == "15"
+ sentMessages[11].msgId == "8"
+ sentMessages[12].msgId == "9"
+ sentMessages[13].msgId == "10"
+
+ # Give it time to stop
+ manager.stop()
+ await sleepAsync(chronos.milliseconds(250))
+ ## Then - should be stopped
+ check:
+ "running: false" in $manager
+ startFut.finished()