From 527bdde50aaf34c545ddfeb64770a44bfb3b486e Mon Sep 17 00:00:00 2001 From: pablo Date: Mon, 23 Jun 2025 14:48:59 +0300 Subject: [PATCH 01/10] feat: rate limit (wip: no storage) --- .gitignore | 7 +- chat_sdk.nimble | 7 +- config.nims | 4 + ratelimit/ratelimit.nim | 183 ++++++++++++++++++++++++++++++ tests/test_ratelimit.nim | 233 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 431 insertions(+), 3 deletions(-) create mode 100644 config.nims create mode 100644 ratelimit/ratelimit.nim create mode 100644 tests/test_ratelimit.nim diff --git a/.gitignore b/.gitignore index dbf16ae..d71d057 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# IDE +.vscode/ + # Nim nimcache/ *.exe @@ -10,4 +13,6 @@ nimcache/ *.a # OS -.DS_Store \ No newline at end of file +.DS_Store +nimble.develop +nimble.paths diff --git a/chat_sdk.nimble b/chat_sdk.nimble index a64bb4f..186ec7b 100644 --- a/chat_sdk.nimble +++ b/chat_sdk.nimble @@ -7,9 +7,12 @@ license = "MIT" srcDir = "src" -# Dependencies -requires "nim >= 2.0.0" +### Dependencies +requires "nim >= 2.2.4", + "chronicles", + "chronos", + "db_connector" task buildSharedLib, "Build shared library for C bindings": exec "nim c --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim" diff --git a/config.nims b/config.nims new file mode 100644 index 0000000..8ee48d2 --- /dev/null +++ b/config.nims @@ -0,0 +1,4 @@ +# begin Nimble config (version 2) +when withDir(thisDir(), system.fileExists("nimble.paths")): + include "nimble.paths" +# end Nimble config diff --git a/ratelimit/ratelimit.nim b/ratelimit/ratelimit.nim new file mode 100644 index 0000000..e2482e4 --- /dev/null +++ b/ratelimit/ratelimit.nim @@ -0,0 +1,183 @@ +import std/[times, deques] +import chronos + +type + MessagePriority* = enum + Optional = 0 + Normal = 1 + Critical = 2 + + QueuedMessage*[T] = object + messageId*: string + msg*: T + priority*: MessagePriority + queuedAt*: float + + MessageSender*[T] = proc(messageId: string, msg: T): Future[bool] {.async.} + + RateLimitManager*[T] = ref object + messageCount*: int = 100 # Default to 100 messages + epochDurationSec*: int = 600 # Default to 10 minutes + currentCount*: int + currentEpoch*: int64 + criticalQueue*: Deque[QueuedMessage[T]] + normalQueue*: Deque[QueuedMessage[T]] + optionalQueue*: Deque[QueuedMessage[T]] + sender*: MessageSender[T] + isRunning*: bool + sendTask*: Future[void] + +proc getCurrentEpoch(epochDurationSec: int): int64 = + int64(epochTime() / float(epochDurationSec)) + +proc newRateLimitManager*[T](messageCount: int, epochDurationSec: int, sender: MessageSender[T]): RateLimitManager[T] = + RateLimitManager[T]( + messageCount: messageCount, + epochDurationSec: epochDurationSec, + currentCount: 0, + currentEpoch: getCurrentEpoch(epochDurationSec), + criticalQueue: initDeque[QueuedMessage[T]](), + normalQueue: initDeque[QueuedMessage[T]](), + optionalQueue: initDeque[QueuedMessage[T]](), + sender: sender, + isRunning: false + ) + +proc updateEpochIfNeeded[T](manager: RateLimitManager[T]) = + let newEpoch = getCurrentEpoch(manager.epochDurationSec) + if newEpoch > manager.currentEpoch: + manager.currentEpoch = newEpoch + manager.currentCount = 0 + +proc getUsagePercent[T](manager: RateLimitManager[T]): float = + if manager.messageCount == 0: + return 1.0 + float(manager.currentCount) / float(manager.messageCount) + +proc queueForSend*[T](manager: RateLimitManager[T], messageId: string, msg: T, priority: MessagePriority) = + manager.updateEpochIfNeeded() + + let queuedMsg = QueuedMessage[T]( + messageId: messageId, + msg: msg, + priority: priority, + queuedAt: epochTime() + ) + + let usagePercent = manager.getUsagePercent() + + if usagePercent >= 1.0: + # Quota exhausted - queue critical on top, queue normal, drop optional + case priority: + of Critical: + manager.criticalQueue.addLast(queuedMsg) + of Normal: + manager.normalQueue.addLast(queuedMsg) + of Optional: + discard # Drop optional messages when quota exhausted + elif usagePercent >= 0.7: + # Low quota - send critical, queue normal, drop optional + case priority: + of Critical: + manager.criticalQueue.addLast(queuedMsg) + of Normal: + manager.normalQueue.addLast(queuedMsg) + of Optional: + discard # Drop optional messages when quota low + else: + # Normal operation - queue all messages + case priority: + of Critical: + manager.criticalQueue.addLast(queuedMsg) + of Normal: + manager.normalQueue.addLast(queuedMsg) + of Optional: + manager.optionalQueue.addLast(queuedMsg) + +proc getNextMessage[T](manager: RateLimitManager[T]): QueuedMessage[T] = + # Priority order: Critical -> Normal -> Optional + if manager.criticalQueue.len > 0: + return manager.criticalQueue.popFirst() + elif manager.normalQueue.len > 0: + return manager.normalQueue.popFirst() + elif manager.optionalQueue.len > 0: + return manager.optionalQueue.popFirst() + else: + raise newException(ValueError, "No messages in queue") + +proc hasMessages[T](manager: RateLimitManager[T]): bool = + manager.criticalQueue.len > 0 or manager.normalQueue.len > 0 or manager.optionalQueue.len > 0 + +proc sendLoop*[T](manager: RateLimitManager[T]): Future[void] {.async.} = + manager.isRunning = true + + while manager.isRunning: + try: + manager.updateEpochIfNeeded() + + while manager.hasMessages() and manager.currentCount < manager.messageCount: + let msg = manager.getNextMessage() + + try: + let sent = await manager.sender(msg.messageId, msg.msg) + if sent: + manager.currentCount += 1 + else: + # Re-queue failed message at beginning of appropriate queue + case msg.priority: + of Critical: + manager.criticalQueue.addFirst(msg) + of Normal: + manager.normalQueue.addFirst(msg) + of Optional: + manager.optionalQueue.addFirst(msg) + break # Stop trying to send more messages if one fails + except: + # Re-queue on exception + case msg.priority: + of Critical: + manager.criticalQueue.addFirst(msg) + of Normal: + manager.normalQueue.addFirst(msg) + of Optional: + manager.optionalQueue.addFirst(msg) + break + + # Small delay between messages TODO not sure if needed + await sleepAsync(chronos.milliseconds(10)) + + # Wait before next processing cycle TODO not completely sure if this is the way + await sleepAsync(chronos.milliseconds(100)) + except CancelledError: + break + except: + await sleepAsync(chronos.seconds(1)) # Wait longer on error + +proc start*[T](manager: RateLimitManager[T]): Future[void] {.async.} = + if not manager.isRunning: + manager.sendTask = manager.sendLoop() + +proc stop*[T](manager: RateLimitManager[T]): Future[void] {.async.} = + if manager.isRunning: + manager.isRunning = false + if not manager.sendTask.isNil: + manager.sendTask.cancelSoon() + try: + await manager.sendTask + except CancelledError: + discard + +proc getQueueStatus*[T](manager: RateLimitManager[T]): tuple[critical: int, normal: int, optional: int, total: int] = + ( + critical: manager.criticalQueue.len, + normal: manager.normalQueue.len, + optional: manager.optionalQueue.len, + total: manager.criticalQueue.len + manager.normalQueue.len + manager.optionalQueue.len + ) + +proc getCurrentQuota*[T](manager: RateLimitManager[T]): tuple[total: int, used: int, remaining: int] = + ( + total: manager.messageCount, + used: manager.currentCount, + remaining: max(0, manager.messageCount - manager.currentCount) + ) \ No newline at end of file diff --git a/tests/test_ratelimit.nim b/tests/test_ratelimit.nim new file mode 100644 index 0000000..0598b3f --- /dev/null +++ b/tests/test_ratelimit.nim @@ -0,0 +1,233 @@ +{.used.} + +import std/[sequtils, times, random], testutils/unittests +import ../ratelimit/ratelimit +import chronos + +randomize() + +# Test message types +type + TestMessage = object + content: string + id: int + +suite "Rate Limit Manager": + setup: + ## Given + let epochDuration = 60 + var sentMessages: seq[string] + + proc testSender(messageId: string, msg: string): Future[bool] {.async.} = + sentMessages.add(msg) + await sleepAsync(chronos.milliseconds(10)) + return true + + asyncTest "basic message queueing and sending": + ## Given + let capacity = 10 + var manager = newRateLimitManager(capacity, epochDuration, testSender) + await manager.start() + + ## When + manager.queueForSend("msg1", "Hello", Critical) + manager.queueForSend("msg2", "World", Normal) + + await sleepAsync(chronos.milliseconds(200)) + await manager.stop() + + ## Then + check: + sentMessages.len == 2 + sentMessages[0] == "Hello" + sentMessages[1] == "World" + + asyncTest "priority ordering - critical first, then normal, then optional": + ## Given + let capacity = 10 + var manager = newRateLimitManager(capacity, epochDuration, testSender) + await manager.start() + + ## When - queue messages in mixed priority order + manager.queueForSend("normal1", "Normal1", Normal) + manager.queueForSend("critical1", "Critical1", Critical) + manager.queueForSend("optional1", "Optional1", Optional) + manager.queueForSend("critical2", "Critical2", Critical) + manager.queueForSend("normal2", "Normal2", Normal) + + await sleepAsync(chronos.milliseconds(300)) + await manager.stop() + + ## Then - critical messages should be sent first + check: + sentMessages.len == 5 + sentMessages[0] == "Critical1" # First critical + sentMessages[1] == "Critical2" # Second critical + sentMessages[2] == "Normal1" # Then normal messages + sentMessages[3] == "Normal2" + sentMessages[4] == "Optional1" # Finally optional + + asyncTest "rate limiting - respects message quota per epoch": + ## Given + let capacity = 3 # Only 3 messages allowed + var manager = newRateLimitManager(capacity, epochDuration, testSender) + await manager.start() + + ## When - queue more messages than the limit + for i in 1..5: + manager.queueForSend("msg" & $i, "Message" & $i, Normal) + + await sleepAsync(chronos.milliseconds(300)) + + ## Then + let quota = manager.getCurrentQuota() + let queueStatus = manager.getQueueStatus() + + check: + quota.used == 3 + quota.remaining == 0 + sentMessages.len == 3 + queueStatus.total == 2 # 2 messages should be queued + + await manager.stop() + + asyncTest "optional message dropping at high usage": + ## Given + let capacity = 10 + var manager = newRateLimitManager(capacity, epochDuration, testSender) + await manager.start() + + # Fill to 80% capacity to trigger optional dropping + for i in 1..8: + manager.queueForSend("fill" & $i, "Fill" & $i, Normal) + + await sleepAsync(chronos.milliseconds(200)) + + ## When - add messages at high usage + manager.queueForSend("critical", "Critical", Critical) + manager.queueForSend("normal", "Normal", Normal) + manager.queueForSend("optional", "Optional", Optional) # Should be dropped + + await sleepAsync(chronos.milliseconds(200)) + + ## Then + let quota = manager.getCurrentQuota() + let optionalSent = sentMessages.anyIt(it == "Optional") + + check: + quota.used == 10 + not optionalSent # Optional message should not be sent + + await manager.stop() + + asyncTest "quota tracking - accurate total, used, and remaining counts": + ## Given + let capacity = 5 + var manager = newRateLimitManager(capacity, epochDuration, testSender) + await manager.start() + + ## When - initially no messages sent + let initialQuota = manager.getCurrentQuota() + + ## Then - should show full quota available + check: + initialQuota.total == 5 + initialQuota.used == 0 + initialQuota.remaining == 5 + + ## When - send some messages + manager.queueForSend("msg1", "Message1", Normal) + manager.queueForSend("msg2", "Message2", Normal) + + await sleepAsync(chronos.milliseconds(200)) + + ## Then - quota should be updated + let afterQuota = manager.getCurrentQuota() + check: + afterQuota.used == 2 + afterQuota.remaining == 3 + + await manager.stop() + + asyncTest "queue status tracking - by priority levels": + ## Given + let capacity = 2 # Small limit to force queueing + var manager = newRateLimitManager(capacity, epochDuration, testSender) + await manager.start() + + ## When - fill quota and add more messages + manager.queueForSend("msg1", "Message1", Critical) + manager.queueForSend("msg2", "Message2", Normal) + manager.queueForSend("msg3", "Message3", Critical) # Should be queued + manager.queueForSend("msg4", "Message4", Normal) # Should be queued + manager.queueForSend("msg5", "Message5", Optional) # Should be queued + + await sleepAsync(chronos.milliseconds(100)) + + ## Then + let queueStatus = manager.getQueueStatus() + + check: + queueStatus.total >= 2 # At least some messages queued + queueStatus.critical >= 1 # Critical messages in queue + queueStatus.normal >= 1 # Normal messages in queue + + await manager.stop() + +suite "Generic Type Support": + setup: + ## Given + let epochDuration = 60 + var sentCustomMessages: seq[TestMessage] + + proc testCustomSender(messageId: string, msg: TestMessage): Future[bool] {.async.} = + sentCustomMessages.add(msg) + await sleepAsync(chronos.milliseconds(10)) + return true + + asyncTest "custom message types - TestMessage objects": + ## Given + let capacity = 5 + var manager = newRateLimitManager(capacity, epochDuration, testCustomSender) + await manager.start() + + ## When + let testMsg = TestMessage(content: "Test content", id: 42) + manager.queueForSend("custom1", testMsg, Normal) + + await sleepAsync(chronos.milliseconds(200)) + await manager.stop() + + ## Then + check: + sentCustomMessages.len == 1 + sentCustomMessages[0].content == "Test content" + sentCustomMessages[0].id == 42 + + asyncTest "integer message types": + ## Given + let capacity = 5 + var sentInts: seq[int] + + proc testIntSender(messageId: string, msg: int): Future[bool] {.async.} = + sentInts.add(msg) + await sleepAsync(chronos.milliseconds(10)) + return true + + var manager = newRateLimitManager(capacity, epochDuration, testIntSender) + await manager.start() + + ## When + manager.queueForSend("int1", 42, Critical) + manager.queueForSend("int2", 100, Normal) + manager.queueForSend("int3", 999, Optional) + + await sleepAsync(chronos.milliseconds(200)) + await manager.stop() + + ## Then + check: + sentInts.len == 3 + sentInts[0] == 42 # Critical sent first + sentInts[1] == 100 # Normal sent second + sentInts[2] == 999 # Optional sent last \ No newline at end of file From 836a04e6ba320d8df231cd4f792bef14fabb2580 Mon Sep 17 00:00:00 2001 From: Pablo Lopez Date: Thu, 26 Jun 2025 22:45:47 +0300 Subject: [PATCH 02/10] fix: update criticality Co-authored-by: fryorcraken <110212804+fryorcraken@users.noreply.github.com> --- ratelimit/ratelimit.nim | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ratelimit/ratelimit.nim b/ratelimit/ratelimit.nim index e2482e4..18b3a1f 100644 --- a/ratelimit/ratelimit.nim +++ b/ratelimit/ratelimit.nim @@ -3,9 +3,9 @@ import chronos type MessagePriority* = enum - Optional = 0 + Critical = 0 Normal = 1 - Critical = 2 + Optional = 2 QueuedMessage*[T] = object messageId*: string From f04e6ae7aa3ba58793569b04734072d14af573b4 Mon Sep 17 00:00:00 2001 From: pablo Date: Fri, 27 Jun 2025 08:50:48 +0300 Subject: [PATCH 03/10] fix: add ref object and serializable --- ratelimit/ratelimit.nim | 44 ++++++++++++++++++++-------------------- tests/test_ratelimit.nim | 11 ++++++++++ 2 files changed, 33 insertions(+), 22 deletions(-) diff --git a/ratelimit/ratelimit.nim b/ratelimit/ratelimit.nim index 18b3a1f..d634d82 100644 --- a/ratelimit/ratelimit.nim +++ b/ratelimit/ratelimit.nim @@ -2,24 +2,27 @@ import std/[times, deques] import chronos type + Serializable* = concept x + x.toBytes() is seq[byte] + MessagePriority* = enum Critical = 0 Normal = 1 Optional = 2 - QueuedMessage*[T] = object + QueuedMessage*[T: Serializable] = ref object of RootObj messageId*: string msg*: T priority*: MessagePriority queuedAt*: float - MessageSender*[T] = proc(messageId: string, msg: T): Future[bool] {.async.} + MessageSender*[T: Serializable] = proc(messageId: string, msg: T): Future[bool] {.async.} - RateLimitManager*[T] = ref object + RateLimitManager*[T: Serializable] = ref object messageCount*: int = 100 # Default to 100 messages epochDurationSec*: int = 600 # Default to 10 minutes currentCount*: int - currentEpoch*: int64 + lastResetTime*: float criticalQueue*: Deque[QueuedMessage[T]] normalQueue*: Deque[QueuedMessage[T]] optionalQueue*: Deque[QueuedMessage[T]] @@ -27,15 +30,12 @@ type isRunning*: bool sendTask*: Future[void] -proc getCurrentEpoch(epochDurationSec: int): int64 = - int64(epochTime() / float(epochDurationSec)) - -proc newRateLimitManager*[T](messageCount: int, epochDurationSec: int, sender: MessageSender[T]): RateLimitManager[T] = +proc newRateLimitManager*[T: Serializable](messageCount: int, epochDurationSec: int, sender: MessageSender[T]): RateLimitManager[T] = RateLimitManager[T]( messageCount: messageCount, epochDurationSec: epochDurationSec, currentCount: 0, - currentEpoch: getCurrentEpoch(epochDurationSec), + lastResetTime: epochTime(), criticalQueue: initDeque[QueuedMessage[T]](), normalQueue: initDeque[QueuedMessage[T]](), optionalQueue: initDeque[QueuedMessage[T]](), @@ -43,18 +43,18 @@ proc newRateLimitManager*[T](messageCount: int, epochDurationSec: int, sender: M isRunning: false ) -proc updateEpochIfNeeded[T](manager: RateLimitManager[T]) = - let newEpoch = getCurrentEpoch(manager.epochDurationSec) - if newEpoch > manager.currentEpoch: - manager.currentEpoch = newEpoch +proc updateEpochIfNeeded[T: Serializable](manager: RateLimitManager[T]) = + let now = epochTime() + if now - manager.lastResetTime >= float(manager.epochDurationSec): + manager.lastResetTime = now manager.currentCount = 0 -proc getUsagePercent[T](manager: RateLimitManager[T]): float = +proc getUsagePercent[T: Serializable](manager: RateLimitManager[T]): float = if manager.messageCount == 0: return 1.0 float(manager.currentCount) / float(manager.messageCount) -proc queueForSend*[T](manager: RateLimitManager[T], messageId: string, msg: T, priority: MessagePriority) = +proc queueForSend*[T: Serializable](manager: RateLimitManager[T], messageId: string, msg: T, priority: MessagePriority) = manager.updateEpochIfNeeded() let queuedMsg = QueuedMessage[T]( @@ -94,7 +94,7 @@ proc queueForSend*[T](manager: RateLimitManager[T], messageId: string, msg: T, p of Optional: manager.optionalQueue.addLast(queuedMsg) -proc getNextMessage[T](manager: RateLimitManager[T]): QueuedMessage[T] = +proc getNextMessage[T: Serializable](manager: RateLimitManager[T]): QueuedMessage[T] = # Priority order: Critical -> Normal -> Optional if manager.criticalQueue.len > 0: return manager.criticalQueue.popFirst() @@ -105,10 +105,10 @@ proc getNextMessage[T](manager: RateLimitManager[T]): QueuedMessage[T] = else: raise newException(ValueError, "No messages in queue") -proc hasMessages[T](manager: RateLimitManager[T]): bool = +proc hasMessages[T: Serializable](manager: RateLimitManager[T]): bool = manager.criticalQueue.len > 0 or manager.normalQueue.len > 0 or manager.optionalQueue.len > 0 -proc sendLoop*[T](manager: RateLimitManager[T]): Future[void] {.async.} = +proc sendLoop*[T: Serializable](manager: RateLimitManager[T]): Future[void] {.async.} = manager.isRunning = true while manager.isRunning: @@ -153,11 +153,11 @@ proc sendLoop*[T](manager: RateLimitManager[T]): Future[void] {.async.} = except: await sleepAsync(chronos.seconds(1)) # Wait longer on error -proc start*[T](manager: RateLimitManager[T]): Future[void] {.async.} = +proc start*[T: Serializable](manager: RateLimitManager[T]): Future[void] {.async.} = if not manager.isRunning: manager.sendTask = manager.sendLoop() -proc stop*[T](manager: RateLimitManager[T]): Future[void] {.async.} = +proc stop*[T: Serializable](manager: RateLimitManager[T]): Future[void] {.async.} = if manager.isRunning: manager.isRunning = false if not manager.sendTask.isNil: @@ -167,7 +167,7 @@ proc stop*[T](manager: RateLimitManager[T]): Future[void] {.async.} = except CancelledError: discard -proc getQueueStatus*[T](manager: RateLimitManager[T]): tuple[critical: int, normal: int, optional: int, total: int] = +proc getQueueStatus*[T: Serializable](manager: RateLimitManager[T]): tuple[critical: int, normal: int, optional: int, total: int] = ( critical: manager.criticalQueue.len, normal: manager.normalQueue.len, @@ -175,7 +175,7 @@ proc getQueueStatus*[T](manager: RateLimitManager[T]): tuple[critical: int, norm total: manager.criticalQueue.len + manager.normalQueue.len + manager.optionalQueue.len ) -proc getCurrentQuota*[T](manager: RateLimitManager[T]): tuple[total: int, used: int, remaining: int] = +proc getCurrentQuota*[T: Serializable](manager: RateLimitManager[T]): tuple[total: int, used: int, remaining: int] = ( total: manager.messageCount, used: manager.currentCount, diff --git a/tests/test_ratelimit.nim b/tests/test_ratelimit.nim index 0598b3f..e5f7e5e 100644 --- a/tests/test_ratelimit.nim +++ b/tests/test_ratelimit.nim @@ -12,6 +12,17 @@ type content: string id: int +# Implement Serializable for test types +proc toBytes*(s: string): seq[byte] = + cast[seq[byte]](s) + +proc toBytes*(msg: TestMessage): seq[byte] = + result = toBytes(msg.content) + result.add(cast[seq[byte]](@[byte(msg.id)])) + +proc toBytes*(i: int): seq[byte] = + cast[seq[byte]](@[byte(i)]) + suite "Rate Limit Manager": setup: ## Given From 1d8ad697c272e66a7324b0a79c2e89aa5634a83d Mon Sep 17 00:00:00 2001 From: pablo Date: Sun, 6 Jul 2025 13:35:33 +0300 Subject: [PATCH 04/10] fix: usage of nwaku's token_bucket --- chat_sdk.nimble | 7 +- ratelimit/ratelimit.nim | 327 ++++++++++++------------- tests/test_ratelimit.nim | 499 ++++++++++++++++++++++----------------- 3 files changed, 455 insertions(+), 378 deletions(-) diff --git a/chat_sdk.nimble b/chat_sdk.nimble index 186ec7b..cd2c463 100644 --- a/chat_sdk.nimble +++ b/chat_sdk.nimble @@ -12,10 +12,11 @@ srcDir = "src" requires "nim >= 2.2.4", "chronicles", "chronos", - "db_connector" + "db_connector", + "waku" task buildSharedLib, "Build shared library for C bindings": - exec "nim c --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim" + exec "nim c --mm:refc --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim" task buildStaticLib, "Build static library for C bindings": - exec "nim c --app:staticLib --out:../library/c-bindings/libchatsdk.a chat_sdk/chat_sdk.nim" + exec "nim c --mm:refc --app:staticLib --out:../library/c-bindings/libchatsdk.a chat_sdk/chat_sdk.nim" diff --git a/ratelimit/ratelimit.nim b/ratelimit/ratelimit.nim index d634d82..9bd1bf6 100644 --- a/ratelimit/ratelimit.nim +++ b/ratelimit/ratelimit.nim @@ -1,183 +1,192 @@ -import std/[times, deques] +import std/[times, deques, options] +# import ./token_bucket +import waku/common/rate_limit/token_bucket import chronos type - Serializable* = concept x - x.toBytes() is seq[byte] + CapacityState = enum + Normal = 0 + AlmostNone = 1 + None = 2 - MessagePriority* = enum + SendResult* = enum + PassedToSender = 0 + Enqueued = 1 + Dropped = 2 + DroppedBatchTooLarge = 3 + + Priority* = enum Critical = 0 Normal = 1 Optional = 2 - QueuedMessage*[T: Serializable] = ref object of RootObj - messageId*: string - msg*: T - priority*: MessagePriority - queuedAt*: float + Serializable* = + concept x + x.toBytes() is seq[byte] - MessageSender*[T: Serializable] = proc(messageId: string, msg: T): Future[bool] {.async.} + MessageSender*[T: Serializable] = + proc(msgs: seq[tuple[msgId: string, msg: T]]): Future[void] {.async.} RateLimitManager*[T: Serializable] = ref object - messageCount*: int = 100 # Default to 100 messages - epochDurationSec*: int = 600 # Default to 10 minutes - currentCount*: int - lastResetTime*: float - criticalQueue*: Deque[QueuedMessage[T]] - normalQueue*: Deque[QueuedMessage[T]] - optionalQueue*: Deque[QueuedMessage[T]] - sender*: MessageSender[T] - isRunning*: bool - sendTask*: Future[void] + bucket: TokenBucket + sender: MessageSender[T] + running: bool + queueCritical: Deque[seq[tuple[msgId: string, msg: T]]] + queueNormal: Deque[seq[tuple[msgId: string, msg: T]]] + sleepDuration: chronos.Duration -proc newRateLimitManager*[T: Serializable](messageCount: int, epochDurationSec: int, sender: MessageSender[T]): RateLimitManager[T] = +proc newRateLimitManager*[T: Serializable]( + sender: MessageSender[T], + capacity: int = 100, + duration: chronos.Duration = chronos.minutes(10), + sleepDuration: chronos.Duration = chronos.milliseconds(1000), +): RateLimitManager[T] = RateLimitManager[T]( - messageCount: messageCount, - epochDurationSec: epochDurationSec, - currentCount: 0, - lastResetTime: epochTime(), - criticalQueue: initDeque[QueuedMessage[T]](), - normalQueue: initDeque[QueuedMessage[T]](), - optionalQueue: initDeque[QueuedMessage[T]](), + bucket: TokenBucket.newStrict(capacity, duration), sender: sender, - isRunning: false + running: false, + queueCritical: Deque[seq[tuple[msgId: string, msg: T]]](), + queueNormal: Deque[seq[tuple[msgId: string, msg: T]]](), + sleepDuration: sleepDuration, ) -proc updateEpochIfNeeded[T: Serializable](manager: RateLimitManager[T]) = - let now = epochTime() - if now - manager.lastResetTime >= float(manager.epochDurationSec): - manager.lastResetTime = now - manager.currentCount = 0 - -proc getUsagePercent[T: Serializable](manager: RateLimitManager[T]): float = - if manager.messageCount == 0: - return 1.0 - float(manager.currentCount) / float(manager.messageCount) - -proc queueForSend*[T: Serializable](manager: RateLimitManager[T], messageId: string, msg: T, priority: MessagePriority) = - manager.updateEpochIfNeeded() - - let queuedMsg = QueuedMessage[T]( - messageId: messageId, - msg: msg, - priority: priority, - queuedAt: epochTime() - ) - - let usagePercent = manager.getUsagePercent() - - if usagePercent >= 1.0: - # Quota exhausted - queue critical on top, queue normal, drop optional - case priority: - of Critical: - manager.criticalQueue.addLast(queuedMsg) - of Normal: - manager.normalQueue.addLast(queuedMsg) - of Optional: - discard # Drop optional messages when quota exhausted - elif usagePercent >= 0.7: - # Low quota - send critical, queue normal, drop optional - case priority: - of Critical: - manager.criticalQueue.addLast(queuedMsg) - of Normal: - manager.normalQueue.addLast(queuedMsg) - of Optional: - discard # Drop optional messages when quota low +proc getCapacityState[T: Serializable]( + manager: RateLimitManager[T], now: Moment, count: int = 1 +): CapacityState = + let (budget, budgetCap) = manager.bucket.getAvailableCapacity(now) + let countAfter = budget - count + let ratio = countAfter.float / budgetCap.float + if ratio < 0.0: + return CapacityState.None + elif ratio < 0.3: + return CapacityState.AlmostNone else: - # Normal operation - queue all messages - case priority: - of Critical: - manager.criticalQueue.addLast(queuedMsg) - of Normal: - manager.normalQueue.addLast(queuedMsg) - of Optional: - manager.optionalQueue.addLast(queuedMsg) + return CapacityState.Normal -proc getNextMessage[T: Serializable](manager: RateLimitManager[T]): QueuedMessage[T] = - # Priority order: Critical -> Normal -> Optional - if manager.criticalQueue.len > 0: - return manager.criticalQueue.popFirst() - elif manager.normalQueue.len > 0: - return manager.normalQueue.popFirst() - elif manager.optionalQueue.len > 0: - return manager.optionalQueue.popFirst() - else: - raise newException(ValueError, "No messages in queue") +proc passToSender[T: Serializable]( + manager: RateLimitManager[T], + msgs: seq[tuple[msgId: string, msg: T]], + now: Moment, + priority: Priority, +): Future[SendResult] {.async.} = + let count = msgs.len + let capacity = manager.bucket.tryConsume(count, now) + if not capacity: + case priority + of Priority.Critical: + manager.queueCritical.addLast(msgs) + return SendResult.Enqueued + of Priority.Normal: + manager.queueNormal.addLast(msgs) + return SendResult.Enqueued + of Priority.Optional: + return SendResult.Dropped + await manager.sender(msgs) + return SendResult.PassedToSender -proc hasMessages[T: Serializable](manager: RateLimitManager[T]): bool = - manager.criticalQueue.len > 0 or manager.normalQueue.len > 0 or manager.optionalQueue.len > 0 - -proc sendLoop*[T: Serializable](manager: RateLimitManager[T]): Future[void] {.async.} = - manager.isRunning = true - - while manager.isRunning: - try: - manager.updateEpochIfNeeded() - - while manager.hasMessages() and manager.currentCount < manager.messageCount: - let msg = manager.getNextMessage() - - try: - let sent = await manager.sender(msg.messageId, msg.msg) - if sent: - manager.currentCount += 1 - else: - # Re-queue failed message at beginning of appropriate queue - case msg.priority: - of Critical: - manager.criticalQueue.addFirst(msg) - of Normal: - manager.normalQueue.addFirst(msg) - of Optional: - manager.optionalQueue.addFirst(msg) - break # Stop trying to send more messages if one fails - except: - # Re-queue on exception - case msg.priority: - of Critical: - manager.criticalQueue.addFirst(msg) - of Normal: - manager.normalQueue.addFirst(msg) - of Optional: - manager.optionalQueue.addFirst(msg) - break - - # Small delay between messages TODO not sure if needed - await sleepAsync(chronos.milliseconds(10)) - - # Wait before next processing cycle TODO not completely sure if this is the way - await sleepAsync(chronos.milliseconds(100)) - except CancelledError: +proc processCriticalQueue[T: Serializable]( + manager: RateLimitManager[T], now: Moment +): Future[void] {.async.} = + while manager.queueCritical.len > 0: + let msgs = manager.queueCritical.popFirst() + let capacityState = manager.getCapacityState(now, msgs.len) + if capacityState == CapacityState.Normal: + discard await manager.passToSender(msgs, now, Priority.Critical) + elif capacityState == CapacityState.AlmostNone: + discard await manager.passToSender(msgs, now, Priority.Critical) + else: + # add back to critical queue + manager.queueCritical.addFirst(msgs) break - except: - await sleepAsync(chronos.seconds(1)) # Wait longer on error -proc start*[T: Serializable](manager: RateLimitManager[T]): Future[void] {.async.} = - if not manager.isRunning: - manager.sendTask = manager.sendLoop() +proc processNormalQueue[T: Serializable]( + manager: RateLimitManager[T], now: Moment +): Future[void] {.async.} = + while manager.queueNormal.len > 0: + let msgs = manager.queueNormal.popFirst() + let capacityState = manager.getCapacityState(now, msgs.len) + if capacityState == CapacityState.Normal: + discard await manager.passToSender(msgs, now, Priority.Normal) + else: + # add back to critical queue + manager.queueNormal.addFirst(msgs) + break -proc stop*[T: Serializable](manager: RateLimitManager[T]): Future[void] {.async.} = - if manager.isRunning: - manager.isRunning = false - if not manager.sendTask.isNil: - manager.sendTask.cancelSoon() - try: - await manager.sendTask - except CancelledError: - discard +proc sendOrEnqueue*[T: Serializable]( + manager: RateLimitManager[T], + msgs: seq[tuple[msgId: string, msg: T]], + priority: Priority, + now: Moment = Moment.now(), +): Future[SendResult] {.async.} = + let (_, budgetCap) = manager.bucket.getAvailableCapacity(now) + if msgs.len.float / budgetCap.float >= 0.3: + # drop batch if it's too large to avoid starvation + return SendResult.DroppedBatchTooLarge -proc getQueueStatus*[T: Serializable](manager: RateLimitManager[T]): tuple[critical: int, normal: int, optional: int, total: int] = - ( - critical: manager.criticalQueue.len, - normal: manager.normalQueue.len, - optional: manager.optionalQueue.len, - total: manager.criticalQueue.len + manager.normalQueue.len + manager.optionalQueue.len - ) + let capacityState = manager.getCapacityState(now, msgs.len) + case capacityState + of CapacityState.Normal: + return await manager.passToSender(msgs, now, priority) + of CapacityState.AlmostNone: + case priority + of Priority.Critical: + return await manager.passToSender(msgs, now, priority) + of Priority.Normal: + manager.queueNormal.addLast(msgs) + return SendResult.Enqueued + of Priority.Optional: + return SendResult.Dropped + of CapacityState.None: + case priority + of Priority.Critical: + manager.queueCritical.addLast(msgs) + return SendResult.Enqueued + of Priority.Normal: + manager.queueNormal.addLast(msgs) + return SendResult.Enqueued + of Priority.Optional: + return SendResult.Dropped -proc getCurrentQuota*[T: Serializable](manager: RateLimitManager[T]): tuple[total: int, used: int, remaining: int] = - ( - total: manager.messageCount, - used: manager.currentCount, - remaining: max(0, manager.messageCount - manager.currentCount) - ) \ No newline at end of file +proc getEnqueued*[T: Serializable]( + manager: RateLimitManager[T] +): tuple[ + critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]] +] = + var criticalMsgs: seq[tuple[msgId: string, msg: T]] + var normalMsgs: seq[tuple[msgId: string, msg: T]] + + for batch in manager.queueCritical: + criticalMsgs.add(batch) + + for batch in manager.queueNormal: + normalMsgs.add(batch) + + return (criticalMsgs, normalMsgs) + +proc start*[T: Serializable]( + manager: RateLimitManager[T], + nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = + Moment.now(), +): Future[void] {.async.} = + manager.running = true + while manager.running: + try: + let now = nowProvider() + await manager.processCriticalQueue(now) + await manager.processNormalQueue(now) + + # configurable sleep duration for processing queued messages + await sleepAsync(manager.sleepDuration) + except Exception as e: + echo "Error in queue processing: ", e.msg + await sleepAsync(manager.sleepDuration) + +proc stop*[T: Serializable](manager: RateLimitManager[T]) = + manager.running = false + +func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} = + if isNil(b): + return "nil" + return + "RateLimitManager(running: " & $b.running & ", critical: " & $b.queueCritical.len & + ", normal: " & $b.queueNormal.len & ")" diff --git a/tests/test_ratelimit.nim b/tests/test_ratelimit.nim index e5f7e5e..f38ddbf 100644 --- a/tests/test_ratelimit.nim +++ b/tests/test_ratelimit.nim @@ -1,244 +1,311 @@ {.used.} -import std/[sequtils, times, random], testutils/unittests +import testutils/unittests import ../ratelimit/ratelimit import chronos +import strutils -randomize() - -# Test message types -type - TestMessage = object - content: string - id: int - -# Implement Serializable for test types +# Implement the Serializable concept for string proc toBytes*(s: string): seq[byte] = cast[seq[byte]](s) -proc toBytes*(msg: TestMessage): seq[byte] = - result = toBytes(msg.content) - result.add(cast[seq[byte]](@[byte(msg.id)])) - -proc toBytes*(i: int): seq[byte] = - cast[seq[byte]](@[byte(i)]) - -suite "Rate Limit Manager": +suite "Queue RateLimitManager": setup: - ## Given - let epochDuration = 60 - var sentMessages: seq[string] - - proc testSender(messageId: string, msg: string): Future[bool] {.async.} = - sentMessages.add(msg) - await sleepAsync(chronos.milliseconds(10)) - return true + var sentMessages: seq[tuple[msgId: string, msg: string]] + var senderCallCount: int = 0 - asyncTest "basic message queueing and sending": + # Create a mock sender + proc mockSender( + msgs: seq[tuple[msgId: string, msg: string]] + ): Future[void] {.async.} = + senderCallCount.inc() + for msg in msgs: + sentMessages.add(msg) + await sleepAsync(chronos.milliseconds(10)) + + asyncTest "sendOrEnqueue - immediate send when capacity available": ## Given - let capacity = 10 - var manager = newRateLimitManager(capacity, epochDuration, testSender) - await manager.start() - + let manager = newRateLimitManager[string]( + mockSender, capacity = 10, duration = chronos.milliseconds(100) + ) + let testMsg = "Hello World" + ## When - manager.queueForSend("msg1", "Hello", Critical) - manager.queueForSend("msg2", "World", Normal) - - await sleepAsync(chronos.milliseconds(200)) - await manager.stop() - + let res = await manager.sendOrEnqueue(@[("msg1", testMsg)], Critical) + ## Then check: + res == PassedToSender + senderCallCount == 1 + sentMessages.len == 1 + sentMessages[0].msgId == "msg1" + sentMessages[0].msg == "Hello World" + + asyncTest "sendOrEnqueue - multiple messages": + ## Given + let manager = newRateLimitManager[string]( + mockSender, capacity = 10, duration = chronos.milliseconds(100) + ) + + ## When + let res = + await manager.sendOrEnqueue(@[("msg1", "First"), ("msg2", "Second")], Normal) + + ## Then + check: + res == PassedToSender + senderCallCount == 1 sentMessages.len == 2 - sentMessages[0] == "Hello" - sentMessages[1] == "World" + sentMessages[0].msgId == "msg1" + sentMessages[0].msg == "First" + sentMessages[1].msgId == "msg2" + sentMessages[1].msg == "Second" - asyncTest "priority ordering - critical first, then normal, then optional": + asyncTest "start and stop - basic functionality": ## Given - let capacity = 10 - var manager = newRateLimitManager(capacity, epochDuration, testSender) - await manager.start() - - ## When - queue messages in mixed priority order - manager.queueForSend("normal1", "Normal1", Normal) - manager.queueForSend("critical1", "Critical1", Critical) - manager.queueForSend("optional1", "Optional1", Optional) - manager.queueForSend("critical2", "Critical2", Critical) - manager.queueForSend("normal2", "Normal2", Normal) - - await sleepAsync(chronos.milliseconds(300)) - await manager.stop() - - ## Then - critical messages should be sent first - check: - sentMessages.len == 5 - sentMessages[0] == "Critical1" # First critical - sentMessages[1] == "Critical2" # Second critical - sentMessages[2] == "Normal1" # Then normal messages - sentMessages[3] == "Normal2" - sentMessages[4] == "Optional1" # Finally optional + let manager = newRateLimitManager[string]( + mockSender, + capacity = 10, + duration = chronos.milliseconds(100), + sleepDuration = chronos.milliseconds(50), + ) - asyncTest "rate limiting - respects message quota per epoch": - ## Given - let capacity = 3 # Only 3 messages allowed - var manager = newRateLimitManager(capacity, epochDuration, testSender) - await manager.start() - - ## When - queue more messages than the limit - for i in 1..5: - manager.queueForSend("msg" & $i, "Message" & $i, Normal) - - await sleepAsync(chronos.milliseconds(300)) - - ## Then - let quota = manager.getCurrentQuota() - let queueStatus = manager.getQueueStatus() - - check: - quota.used == 3 - quota.remaining == 0 - sentMessages.len == 3 - queueStatus.total == 2 # 2 messages should be queued - - await manager.stop() + ## When - start the manager + let startFut = manager.start() - asyncTest "optional message dropping at high usage": - ## Given - let capacity = 10 - var manager = newRateLimitManager(capacity, epochDuration, testSender) - await manager.start() - - # Fill to 80% capacity to trigger optional dropping - for i in 1..8: - manager.queueForSend("fill" & $i, "Fill" & $i, Normal) - - await sleepAsync(chronos.milliseconds(200)) - - ## When - add messages at high usage - manager.queueForSend("critical", "Critical", Critical) - manager.queueForSend("normal", "Normal", Normal) - manager.queueForSend("optional", "Optional", Optional) # Should be dropped - - await sleepAsync(chronos.milliseconds(200)) - - ## Then - let quota = manager.getCurrentQuota() - let optionalSent = sentMessages.anyIt(it == "Optional") - - check: - quota.used == 10 - not optionalSent # Optional message should not be sent - - await manager.stop() + # Give it some time to start + await sleepAsync(chronos.milliseconds(20)) - asyncTest "quota tracking - accurate total, used, and remaining counts": - ## Given - let capacity = 5 - var manager = newRateLimitManager(capacity, epochDuration, testSender) - await manager.start() - - ## When - initially no messages sent - let initialQuota = manager.getCurrentQuota() - - ## Then - should show full quota available + ## Then - manager should be running (check via string representation) check: - initialQuota.total == 5 - initialQuota.used == 0 - initialQuota.remaining == 5 - - ## When - send some messages - manager.queueForSend("msg1", "Message1", Normal) - manager.queueForSend("msg2", "Message2", Normal) - - await sleepAsync(chronos.milliseconds(200)) - - ## Then - quota should be updated - let afterQuota = manager.getCurrentQuota() - check: - afterQuota.used == 2 - afterQuota.remaining == 3 - - await manager.stop() + "running: true" in $manager + not startFut.finished() # should still be running - asyncTest "queue status tracking - by priority levels": - ## Given - let capacity = 2 # Small limit to force queueing - var manager = newRateLimitManager(capacity, epochDuration, testSender) - await manager.start() - - ## When - fill quota and add more messages - manager.queueForSend("msg1", "Message1", Critical) - manager.queueForSend("msg2", "Message2", Normal) - manager.queueForSend("msg3", "Message3", Critical) # Should be queued - manager.queueForSend("msg4", "Message4", Normal) # Should be queued - manager.queueForSend("msg5", "Message5", Optional) # Should be queued - - await sleepAsync(chronos.milliseconds(100)) - - ## Then - let queueStatus = manager.getQueueStatus() - - check: - queueStatus.total >= 2 # At least some messages queued - queueStatus.critical >= 1 # Critical messages in queue - queueStatus.normal >= 1 # Normal messages in queue - - await manager.stop() + ## When - stop the manager + manager.stop() -suite "Generic Type Support": - setup: - ## Given - let epochDuration = 60 - var sentCustomMessages: seq[TestMessage] - - proc testCustomSender(messageId: string, msg: TestMessage): Future[bool] {.async.} = - sentCustomMessages.add(msg) - await sleepAsync(chronos.milliseconds(10)) - return true + # Give it time to stop (should happen within one sleep cycle) + await sleepAsync(chronos.milliseconds(60)) - asyncTest "custom message types - TestMessage objects": - ## Given - let capacity = 5 - var manager = newRateLimitManager(capacity, epochDuration, testCustomSender) - await manager.start() - - ## When - let testMsg = TestMessage(content: "Test content", id: 42) - manager.queueForSend("custom1", testMsg, Normal) - - await sleepAsync(chronos.milliseconds(200)) - await manager.stop() - - ## Then + ## Then - manager should be stopped check: - sentCustomMessages.len == 1 - sentCustomMessages[0].content == "Test content" - sentCustomMessages[0].id == 42 + "running: false" in $manager + startFut.finished() # should have completed - asyncTest "integer message types": + asyncTest "start and stop - drop large batch": ## Given - let capacity = 5 - var sentInts: seq[int] - - proc testIntSender(messageId: string, msg: int): Future[bool] {.async.} = - sentInts.add(msg) - await sleepAsync(chronos.milliseconds(10)) - return true - - var manager = newRateLimitManager(capacity, epochDuration, testIntSender) - await manager.start() - - ## When - manager.queueForSend("int1", 42, Critical) - manager.queueForSend("int2", 100, Normal) - manager.queueForSend("int3", 999, Optional) - - await sleepAsync(chronos.milliseconds(200)) - await manager.stop() - - ## Then + let manager = newRateLimitManager[string]( + mockSender, + capacity = 2, + duration = chronos.milliseconds(100), + sleepDuration = chronos.milliseconds(30), + ) + + let largeBatch1 = @[("msg1", "First"), ("msg2", "Second"), ("msg3", "Third")] + let largeBatch2 = @[("msg4", "Fourth"), ("msg5", "Fifth")] + + discard await manager.sendOrEnqueue(largeBatch1, Normal) + discard await manager.sendOrEnqueue(largeBatch2, Critical) + + asyncTest "enqueue - enqueue critical only when exceeded": + ## Given + let manager = newRateLimitManager[string]( + mockSender, + capacity = 10, + duration = chronos.milliseconds(100), + sleepDuration = chronos.milliseconds(100), + ) + var now = Moment.now() + let startFut = manager.start( + nowProvider = proc(): Moment = + now + ) + + let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Critical, now) + let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Critical, now) + let r3 = await manager.sendOrEnqueue(@[("msg3", "3")], Critical, now) + let r4 = await manager.sendOrEnqueue(@[("msg4", "4")], Critical, now) + let r5 = await manager.sendOrEnqueue(@[("msg5", "5")], Critical, now) + let r6 = await manager.sendOrEnqueue(@[("msg6", "6")], Critical, now) + let r7 = await manager.sendOrEnqueue(@[("msg7", "7")], Critical, now) + let r8 = await manager.sendOrEnqueue(@[("msg8", "8")], Critical, now) + let r9 = await manager.sendOrEnqueue(@[("msg9", "9")], Critical, now) + let r10 = await manager.sendOrEnqueue(@[("msg10", "10")], Critical, now) + # will enqueue to critical queue + let r11 = await manager.sendOrEnqueue(@[("msg11", "11")], Critical, now) + check: - sentInts.len == 3 - sentInts[0] == 42 # Critical sent first - sentInts[1] == 100 # Normal sent second - sentInts[2] == 999 # Optional sent last \ No newline at end of file + r1 == PassedToSender + r2 == PassedToSender + r3 == PassedToSender + r4 == PassedToSender + r5 == PassedToSender + r6 == PassedToSender + r7 == PassedToSender + r8 == PassedToSender + r9 == PassedToSender + r10 == PassedToSender + r11 == Enqueued + + let (critical, normal) = manager.getEnqueued() + check: + critical.len == 1 + normal.len == 0 + critical[0].msgId == "msg11" + + # Give it time to stop + manager.stop() + await sleepAsync(chronos.milliseconds(150)) + ## Then - should be stopped + check: + "running: false" in $manager + startFut.finished() + + asyncTest "enqueue - enqueue normal on 70% capacity": + ## Given + let manager = newRateLimitManager[string]( + mockSender, + capacity = 10, + duration = chronos.milliseconds(100), + sleepDuration = chronos.milliseconds(100), + ) + var now = Moment.now() + let startFut = manager.start( + nowProvider = proc(): Moment = + now + ) + + let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Normal, now) + let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Normal, now) + let r3 = await manager.sendOrEnqueue(@[("msg3", "3")], Normal, now) + let r4 = await manager.sendOrEnqueue(@[("msg4", "4")], Normal, now) + let r5 = await manager.sendOrEnqueue(@[("msg5", "5")], Normal, now) + let r6 = await manager.sendOrEnqueue(@[("msg6", "6")], Normal, now) + let r7 = await manager.sendOrEnqueue(@[("msg7", "7")], Normal, now) + let r8 = await manager.sendOrEnqueue(@[("msg8", "8")], Normal, now) + let r9 = await manager.sendOrEnqueue(@[("msg9", "9")], Normal, now) + let r10 = await manager.sendOrEnqueue(@[("msg10", "10")], Normal, now) + let r11 = await manager.sendOrEnqueue(@[("msg11", "11")], Critical, now) + let r12 = await manager.sendOrEnqueue(@[("msg12", "12")], Optional, now) + + check: + r1 == PassedToSender + r2 == PassedToSender + r3 == PassedToSender + r4 == PassedToSender + r5 == PassedToSender + r6 == PassedToSender + r7 == PassedToSender + r8 == Enqueued + r9 == Enqueued + r10 == Enqueued + r11 == PassedToSender + r12 == Dropped + + let (critical, normal) = manager.getEnqueued() + check: + critical.len == 0 + normal.len == 3 + normal[0].msgId == "msg8" + normal[1].msgId == "msg9" + normal[2].msgId == "msg10" + + # Give it time to stop + manager.stop() + await sleepAsync(chronos.milliseconds(150)) + ## Then - should be stopped + check: + "running: false" in $manager + startFut.finished() + + asyncTest "enqueue - process queued messages": + ## Given + let manager = newRateLimitManager[string]( + mockSender, + capacity = 10, + duration = chronos.milliseconds(200), + sleepDuration = chronos.milliseconds(200), + ) + type MomentRef = ref object + value: Moment + + var now = Moment.now() + var nowRef = MomentRef(value: now) + let startFut = manager.start( + nowProvider = proc(): Moment = + return nowRef.value + ) + + let r1 = await manager.sendOrEnqueue(@[("1", "val_1")], Normal, now) + let r2 = await manager.sendOrEnqueue(@[("2", "val_2")], Normal, now) + let r3 = await manager.sendOrEnqueue(@[("3", "val_3")], Normal, now) + let r4 = await manager.sendOrEnqueue(@[("4", "val_4")], Normal, now) + let r5 = await manager.sendOrEnqueue(@[("5", "val_5")], Normal, now) + let r6 = await manager.sendOrEnqueue(@[("6", "val_6")], Normal, now) + let r7 = await manager.sendOrEnqueue(@[("7", "val_7")], Normal, now) + let r8 = await manager.sendOrEnqueue(@[("8", "val_8")], Normal, now) + let r9 = await manager.sendOrEnqueue(@[("9", "val_9")], Normal, now) + let r10 = await manager.sendOrEnqueue(@[("10", "val_10")], Normal, now) + let r11 = await manager.sendOrEnqueue(@[("11", "val_11")], Critical, now) + let r12 = await manager.sendOrEnqueue(@[("12", "val_12")], Optional, now) + let r13 = await manager.sendOrEnqueue(@[("13", "val_13")], Critical, now) + let r14 = await manager.sendOrEnqueue(@[("14", "val_14")], Critical, now) + let r15 = await manager.sendOrEnqueue(@[("15", "val_15")], Critical, now) + + check: + r1 == PassedToSender + r2 == PassedToSender + r3 == PassedToSender + r4 == PassedToSender + r5 == PassedToSender + r6 == PassedToSender + r7 == PassedToSender + r8 == Enqueued + r9 == Enqueued + r10 == Enqueued + r11 == PassedToSender + r12 == Dropped + r13 == PassedToSender + r14 == PassedToSender + r15 == Enqueued + + var (critical, normal) = manager.getEnqueued() + check: + critical.len == 1 + normal.len == 3 + normal[0].msgId == "8" + normal[1].msgId == "9" + normal[2].msgId == "10" + critical[0].msgId == "15" + + nowRef.value = now + chronos.milliseconds(250) + await sleepAsync(chronos.milliseconds(250)) + + (critical, normal) = manager.getEnqueued() + check: + critical.len == 0 + normal.len == 0 + senderCallCount == 14 + sentMessages.len == 14 + sentMessages[0].msgId == "1" + sentMessages[1].msgId == "2" + sentMessages[2].msgId == "3" + sentMessages[3].msgId == "4" + sentMessages[4].msgId == "5" + sentMessages[5].msgId == "6" + sentMessages[6].msgId == "7" + sentMessages[7].msgId == "11" + sentMessages[8].msgId == "13" + sentMessages[9].msgId == "14" + sentMessages[10].msgId == "15" + sentMessages[11].msgId == "8" + sentMessages[12].msgId == "9" + sentMessages[13].msgId == "10" + + # Give it time to stop + manager.stop() + await sleepAsync(chronos.milliseconds(250)) + ## Then - should be stopped + check: + "running: false" in $manager + startFut.finished() From b9e80bcc373d817bfe5e4b6038b682ba5df3e248 Mon Sep 17 00:00:00 2001 From: Pablo Lopez Date: Wed, 16 Jul 2025 16:10:23 +0300 Subject: [PATCH 05/10] Apply suggestions from code review Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> --- ratelimit/ratelimit.nim | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/ratelimit/ratelimit.nim b/ratelimit/ratelimit.nim index 9bd1bf6..3111b1b 100644 --- a/ratelimit/ratelimit.nim +++ b/ratelimit/ratelimit.nim @@ -4,10 +4,10 @@ import waku/common/rate_limit/token_bucket import chronos type - CapacityState = enum - Normal = 0 - AlmostNone = 1 - None = 2 + CapacityState {.pure.} = enum + Normal + AlmostNone + None SendResult* = enum PassedToSender = 0 @@ -19,23 +19,23 @@ type Critical = 0 Normal = 1 Optional = 2 - + MsgIdMsg = tuple[msgId: string, msg: T] Serializable* = concept x x.toBytes() is seq[byte] MessageSender*[T: Serializable] = - proc(msgs: seq[tuple[msgId: string, msg: T]]): Future[void] {.async.} + proc(msgs: seq[tuple[msgId: string, msg: T]]) {.async.} RateLimitManager*[T: Serializable] = ref object bucket: TokenBucket sender: MessageSender[T] running: bool - queueCritical: Deque[seq[tuple[msgId: string, msg: T]]] + queueCritical: Deque[tuple[msgId: string, msg: T]] queueNormal: Deque[seq[tuple[msgId: string, msg: T]]] sleepDuration: chronos.Duration -proc newRateLimitManager*[T: Serializable]( +proc new*[T: Serializable]( sender: MessageSender[T], capacity: int = 100, duration: chronos.Duration = chronos.minutes(10), @@ -167,19 +167,19 @@ proc start*[T: Serializable]( manager: RateLimitManager[T], nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(), -): Future[void] {.async.} = +) {.async.} = manager.running = true - while manager.running: + while true: try: let now = nowProvider() await manager.processCriticalQueue(now) await manager.processNormalQueue(now) # configurable sleep duration for processing queued messages - await sleepAsync(manager.sleepDuration) except Exception as e: echo "Error in queue processing: ", e.msg - await sleepAsync(manager.sleepDuration) + + await sleepAsync(manager.sleepDuration) proc stop*[T: Serializable](manager: RateLimitManager[T]) = manager.running = false From 6b514f6f216ead66e3a30dfebcb7470f7e3967cd Mon Sep 17 00:00:00 2001 From: pablo Date: Wed, 16 Jul 2025 18:48:30 +0300 Subject: [PATCH 06/10] fix: pr comments --- .../{ratelimit.nim => rate_limit_manager.nim} | 58 +++++++------ ...elimit.nim => test_rate_limit_manager.nim} | 84 ++++--------------- 2 files changed, 47 insertions(+), 95 deletions(-) rename ratelimit/{ratelimit.nim => rate_limit_manager.nim} (80%) rename tests/{test_ratelimit.nim => test_rate_limit_manager.nim} (81%) diff --git a/ratelimit/ratelimit.nim b/ratelimit/rate_limit_manager.nim similarity index 80% rename from ratelimit/ratelimit.nim rename to ratelimit/rate_limit_manager.nim index 3111b1b..80c2362 100644 --- a/ratelimit/ratelimit.nim +++ b/ratelimit/rate_limit_manager.nim @@ -1,5 +1,4 @@ import std/[times, deques, options] -# import ./token_bucket import waku/common/rate_limit/token_bucket import chronos @@ -19,34 +18,35 @@ type Critical = 0 Normal = 1 Optional = 2 - MsgIdMsg = tuple[msgId: string, msg: T] + Serializable* = concept x x.toBytes() is seq[byte] - MessageSender*[T: Serializable] = - proc(msgs: seq[tuple[msgId: string, msg: T]]) {.async.} + MsgIdMsg[T: Serializable] = tuple[msgId: string, msg: T] + + MessageSender*[T: Serializable] = proc(msgs: seq[MsgIdMsg[T]]) {.async.} RateLimitManager*[T: Serializable] = ref object bucket: TokenBucket sender: MessageSender[T] - running: bool - queueCritical: Deque[tuple[msgId: string, msg: T]] - queueNormal: Deque[seq[tuple[msgId: string, msg: T]]] + queueCritical: Deque[seq[MsgIdMsg[T]]] + queueNormal: Deque[seq[MsgIdMsg[T]]] sleepDuration: chronos.Duration + pxQueueHandleLoop: Future[void] proc new*[T: Serializable]( + M: type[RateLimitManager[T]], sender: MessageSender[T], capacity: int = 100, duration: chronos.Duration = chronos.minutes(10), sleepDuration: chronos.Duration = chronos.milliseconds(1000), -): RateLimitManager[T] = - RateLimitManager[T]( +): M = + M( bucket: TokenBucket.newStrict(capacity, duration), sender: sender, - running: false, - queueCritical: Deque[seq[tuple[msgId: string, msg: T]]](), - queueNormal: Deque[seq[tuple[msgId: string, msg: T]]](), + queueCritical: Deque[seq[MsgIdMsg[T]]](), + queueNormal: Deque[seq[MsgIdMsg[T]]](), sleepDuration: sleepDuration, ) @@ -65,7 +65,7 @@ proc getCapacityState[T: Serializable]( proc passToSender[T: Serializable]( manager: RateLimitManager[T], - msgs: seq[tuple[msgId: string, msg: T]], + msgs: seq[MsgIdMsg[T]], now: Moment, priority: Priority, ): Future[SendResult] {.async.} = @@ -114,7 +114,7 @@ proc processNormalQueue[T: Serializable]( proc sendOrEnqueue*[T: Serializable]( manager: RateLimitManager[T], - msgs: seq[tuple[msgId: string, msg: T]], + msgs: seq[MsgIdMsg[T]], priority: Priority, now: Moment = Moment.now(), ): Future[SendResult] {.async.} = @@ -149,11 +149,9 @@ proc sendOrEnqueue*[T: Serializable]( proc getEnqueued*[T: Serializable]( manager: RateLimitManager[T] -): tuple[ - critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]] -] = - var criticalMsgs: seq[tuple[msgId: string, msg: T]] - var normalMsgs: seq[tuple[msgId: string, msg: T]] +): tuple[critical: seq[MsgIdMsg[T]], normal: seq[MsgIdMsg[T]]] = + var criticalMsgs: seq[MsgIdMsg[T]] + var normalMsgs: seq[MsgIdMsg[T]] for batch in manager.queueCritical: criticalMsgs.add(batch) @@ -163,30 +161,36 @@ proc getEnqueued*[T: Serializable]( return (criticalMsgs, normalMsgs) -proc start*[T: Serializable]( +proc startQueueHandleLoop*[T: Serializable]( manager: RateLimitManager[T], nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(), ) {.async.} = - manager.running = true while true: try: let now = nowProvider() await manager.processCriticalQueue(now) await manager.processNormalQueue(now) - - # configurable sleep duration for processing queued messages except Exception as e: echo "Error in queue processing: ", e.msg + # configurable sleep duration for processing queued messages await sleepAsync(manager.sleepDuration) -proc stop*[T: Serializable](manager: RateLimitManager[T]) = - manager.running = false +proc start*[T: Serializable]( + manager: RateLimitManager[T], + nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = + Moment.now(), +) {.async.} = + manager.pxQueueHandleLoop = manager.startQueueHandleLoop(nowProvider) + +proc stop*[T: Serializable](manager: RateLimitManager[T]) {.async.} = + if not isNil(manager.pxQueueHandleLoop): + await manager.pxQueueHandleLoop.cancelAndWait() func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} = if isNil(b): return "nil" return - "RateLimitManager(running: " & $b.running & ", critical: " & $b.queueCritical.len & - ", normal: " & $b.queueNormal.len & ")" + "RateLimitManager(critical: " & $b.queueCritical.len & ", normal: " & + $b.queueNormal.len & ")" diff --git a/tests/test_ratelimit.nim b/tests/test_rate_limit_manager.nim similarity index 81% rename from tests/test_ratelimit.nim rename to tests/test_rate_limit_manager.nim index f38ddbf..6c34c54 100644 --- a/tests/test_ratelimit.nim +++ b/tests/test_rate_limit_manager.nim @@ -1,9 +1,6 @@ -{.used.} - import testutils/unittests -import ../ratelimit/ratelimit +import ../ratelimit/rate_limit_manager import chronos -import strutils # Implement the Serializable concept for string proc toBytes*(s: string): seq[byte] = @@ -25,7 +22,7 @@ suite "Queue RateLimitManager": asyncTest "sendOrEnqueue - immediate send when capacity available": ## Given - let manager = newRateLimitManager[string]( + let manager = RateLimitManager[string].new( mockSender, capacity = 10, duration = chronos.milliseconds(100) ) let testMsg = "Hello World" @@ -43,7 +40,7 @@ suite "Queue RateLimitManager": asyncTest "sendOrEnqueue - multiple messages": ## Given - let manager = newRateLimitManager[string]( + let manager = RateLimitManager[string].new( mockSender, capacity = 10, duration = chronos.milliseconds(100) ) @@ -61,40 +58,9 @@ suite "Queue RateLimitManager": sentMessages[1].msgId == "msg2" sentMessages[1].msg == "Second" - asyncTest "start and stop - basic functionality": - ## Given - let manager = newRateLimitManager[string]( - mockSender, - capacity = 10, - duration = chronos.milliseconds(100), - sleepDuration = chronos.milliseconds(50), - ) - - ## When - start the manager - let startFut = manager.start() - - # Give it some time to start - await sleepAsync(chronos.milliseconds(20)) - - ## Then - manager should be running (check via string representation) - check: - "running: true" in $manager - not startFut.finished() # should still be running - - ## When - stop the manager - manager.stop() - - # Give it time to stop (should happen within one sleep cycle) - await sleepAsync(chronos.milliseconds(60)) - - ## Then - manager should be stopped - check: - "running: false" in $manager - startFut.finished() # should have completed - asyncTest "start and stop - drop large batch": ## Given - let manager = newRateLimitManager[string]( + let manager = RateLimitManager[string].new( mockSender, capacity = 2, duration = chronos.milliseconds(100), @@ -109,17 +75,19 @@ suite "Queue RateLimitManager": asyncTest "enqueue - enqueue critical only when exceeded": ## Given - let manager = newRateLimitManager[string]( + let manager = RateLimitManager[string].new( mockSender, capacity = 10, duration = chronos.milliseconds(100), sleepDuration = chronos.milliseconds(100), ) var now = Moment.now() - let startFut = manager.start( + await manager.start( nowProvider = proc(): Moment = now ) + defer: + await manager.stop() let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Critical, now) let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Critical, now) @@ -153,27 +121,21 @@ suite "Queue RateLimitManager": normal.len == 0 critical[0].msgId == "msg11" - # Give it time to stop - manager.stop() - await sleepAsync(chronos.milliseconds(150)) - ## Then - should be stopped - check: - "running: false" in $manager - startFut.finished() - asyncTest "enqueue - enqueue normal on 70% capacity": ## Given - let manager = newRateLimitManager[string]( + let manager = RateLimitManager[string].new( mockSender, capacity = 10, duration = chronos.milliseconds(100), sleepDuration = chronos.milliseconds(100), ) var now = Moment.now() - let startFut = manager.start( + await manager.start( nowProvider = proc(): Moment = now ) + defer: + await manager.stop() let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Normal, now) let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Normal, now) @@ -210,17 +172,9 @@ suite "Queue RateLimitManager": normal[1].msgId == "msg9" normal[2].msgId == "msg10" - # Give it time to stop - manager.stop() - await sleepAsync(chronos.milliseconds(150)) - ## Then - should be stopped - check: - "running: false" in $manager - startFut.finished() - asyncTest "enqueue - process queued messages": ## Given - let manager = newRateLimitManager[string]( + let manager = RateLimitManager[string].new( mockSender, capacity = 10, duration = chronos.milliseconds(200), @@ -231,10 +185,12 @@ suite "Queue RateLimitManager": var now = Moment.now() var nowRef = MomentRef(value: now) - let startFut = manager.start( + await manager.start( nowProvider = proc(): Moment = return nowRef.value ) + defer: + await manager.stop() let r1 = await manager.sendOrEnqueue(@[("1", "val_1")], Normal, now) let r2 = await manager.sendOrEnqueue(@[("2", "val_2")], Normal, now) @@ -301,11 +257,3 @@ suite "Queue RateLimitManager": sentMessages[11].msgId == "8" sentMessages[12].msgId == "9" sentMessages[13].msgId == "10" - - # Give it time to stop - manager.stop() - await sleepAsync(chronos.milliseconds(250)) - ## Then - should be stopped - check: - "running: false" in $manager - startFut.finished() From 408ee4bd3ff9d08f20e10f6a006f85da4eccde39 Mon Sep 17 00:00:00 2001 From: pablo Date: Wed, 16 Jul 2025 19:06:10 +0300 Subject: [PATCH 07/10] fix: pure --- ratelimit/rate_limit_manager.nim | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/ratelimit/rate_limit_manager.nim b/ratelimit/rate_limit_manager.nim index 80c2362..5aaacc5 100644 --- a/ratelimit/rate_limit_manager.nim +++ b/ratelimit/rate_limit_manager.nim @@ -8,16 +8,16 @@ type AlmostNone None - SendResult* = enum - PassedToSender = 0 - Enqueued = 1 - Dropped = 2 - DroppedBatchTooLarge = 3 + SendResult* {.pure.} = enum + PassedToSender + Enqueued + Dropped + DroppedBatchTooLarge - Priority* = enum - Critical = 0 - Normal = 1 - Optional = 2 + Priority* {.pure.} = enum + Critical + Normal + Optional Serializable* = concept x From 8c6d5ab16fc18974d7fb11974371b13a491a0f97 Mon Sep 17 00:00:00 2001 From: Pablo Lopez Date: Wed, 16 Jul 2025 20:07:37 +0300 Subject: [PATCH 08/10] Apply suggestion from @Ivansete-status Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> --- ratelimit/rate_limit_manager.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratelimit/rate_limit_manager.nim b/ratelimit/rate_limit_manager.nim index 5aaacc5..d7e6997 100644 --- a/ratelimit/rate_limit_manager.nim +++ b/ratelimit/rate_limit_manager.nim @@ -65,7 +65,7 @@ proc getCapacityState[T: Serializable]( proc passToSender[T: Serializable]( manager: RateLimitManager[T], - msgs: seq[MsgIdMsg[T]], + msgs: sink seq[MsgIdMsg[T]], now: Moment, priority: Priority, ): Future[SendResult] {.async.} = From 5ee473459064232ad0486a5171398bd35b014578 Mon Sep 17 00:00:00 2001 From: Pablo Lopez Date: Wed, 16 Jul 2025 20:09:13 +0300 Subject: [PATCH 09/10] Apply suggestions from code review Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> --- ratelimit/rate_limit_manager.nim | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ratelimit/rate_limit_manager.nim b/ratelimit/rate_limit_manager.nim index d7e6997..aceca08 100644 --- a/ratelimit/rate_limit_manager.nim +++ b/ratelimit/rate_limit_manager.nim @@ -86,7 +86,7 @@ proc passToSender[T: Serializable]( proc processCriticalQueue[T: Serializable]( manager: RateLimitManager[T], now: Moment -): Future[void] {.async.} = +) {.async.} = while manager.queueCritical.len > 0: let msgs = manager.queueCritical.popFirst() let capacityState = manager.getCapacityState(now, msgs.len) @@ -101,7 +101,7 @@ proc processCriticalQueue[T: Serializable]( proc processNormalQueue[T: Serializable]( manager: RateLimitManager[T], now: Moment -): Future[void] {.async.} = +) {.async.} = while manager.queueNormal.len > 0: let msgs = manager.queueNormal.popFirst() let capacityState = manager.getCapacityState(now, msgs.len) @@ -161,7 +161,7 @@ proc getEnqueued*[T: Serializable]( return (criticalMsgs, normalMsgs) -proc startQueueHandleLoop*[T: Serializable]( +proc queueHandleLoop[T: Serializable]( manager: RateLimitManager[T], nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(), From dab7f970dd6e629bb9e473363c1452db2522ff0a Mon Sep 17 00:00:00 2001 From: pablo Date: Wed, 16 Jul 2025 20:12:04 +0300 Subject: [PATCH 10/10] fix: merge --- ratelimit/rate_limit_manager.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ratelimit/rate_limit_manager.nim b/ratelimit/rate_limit_manager.nim index aceca08..c8f51f9 100644 --- a/ratelimit/rate_limit_manager.nim +++ b/ratelimit/rate_limit_manager.nim @@ -182,7 +182,7 @@ proc start*[T: Serializable]( nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(), ) {.async.} = - manager.pxQueueHandleLoop = manager.startQueueHandleLoop(nowProvider) + manager.pxQueueHandleLoop = manager.queueHandleLoop(nowProvider) proc stop*[T: Serializable](manager: RateLimitManager[T]) {.async.} = if not isNil(manager.pxQueueHandleLoop):