From 527bdde50aaf34c545ddfeb64770a44bfb3b486e Mon Sep 17 00:00:00 2001
From: pablo
Date: Mon, 23 Jun 2025 14:48:59 +0300
Subject: [PATCH] feat: rate limit (wip: no storage)
---
.gitignore | 7 +-
chat_sdk.nimble | 7 +-
config.nims | 4 +
ratelimit/ratelimit.nim | 183 ++++++++++++++++++++++++++++++
tests/test_ratelimit.nim | 233 +++++++++++++++++++++++++++++++++++++++
5 files changed, 431 insertions(+), 3 deletions(-)
create mode 100644 config.nims
create mode 100644 ratelimit/ratelimit.nim
create mode 100644 tests/test_ratelimit.nim
diff --git a/.gitignore b/.gitignore
index dbf16ae..d71d057 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
+# IDE
+.vscode/
+
# Nim
nimcache/
*.exe
@@ -10,4 +13,6 @@ nimcache/
*.a
# OS
-.DS_Store
\ No newline at end of file
+.DS_Store
+nimble.develop
+nimble.paths
diff --git a/chat_sdk.nimble b/chat_sdk.nimble
index a64bb4f..186ec7b 100644
--- a/chat_sdk.nimble
+++ b/chat_sdk.nimble
@@ -7,9 +7,12 @@ license = "MIT"
srcDir = "src"
-# Dependencies
-requires "nim >= 2.0.0"
+### Dependencies
+requires "nim >= 2.2.4",
+ "chronicles",
+ "chronos",
+ "db_connector"
task buildSharedLib, "Build shared library for C bindings":
exec "nim c --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim"
diff --git a/config.nims b/config.nims
new file mode 100644
index 0000000..8ee48d2
--- /dev/null
+++ b/config.nims
@@ -0,0 +1,4 @@
+# begin Nimble config (version 2)
+when withDir(thisDir(), system.fileExists("nimble.paths")):
+ include "nimble.paths"
+# end Nimble config
diff --git a/ratelimit/ratelimit.nim b/ratelimit/ratelimit.nim
new file mode 100644
index 0000000..e2482e4
--- /dev/null
+++ b/ratelimit/ratelimit.nim
@@ -0,0 +1,183 @@
+import std/[times, deques]
+import chronos
+
+type
+ MessagePriority* = enum
+ Optional = 0
+ Normal = 1
+ Critical = 2
+
+ QueuedMessage*[T] = object
+ messageId*: string
+ msg*: T
+ priority*: MessagePriority
+ queuedAt*: float
+
+ MessageSender*[T] = proc(messageId: string, msg: T): Future[bool] {.async.}
+
+ RateLimitManager*[T] = ref object
+ messageCount*: int = 100 # Default to 100 messages
+ epochDurationSec*: int = 600 # Default to 10 minutes
+ currentCount*: int
+ currentEpoch*: int64
+ criticalQueue*: Deque[QueuedMessage[T]]
+ normalQueue*: Deque[QueuedMessage[T]]
+ optionalQueue*: Deque[QueuedMessage[T]]
+ sender*: MessageSender[T]
+ isRunning*: bool
+ sendTask*: Future[void]
+
+proc getCurrentEpoch(epochDurationSec: int): int64 =
+ int64(epochTime() / float(epochDurationSec))
+
+proc newRateLimitManager*[T](messageCount: int, epochDurationSec: int, sender: MessageSender[T]): RateLimitManager[T] =
+ RateLimitManager[T](
+ messageCount: messageCount,
+ epochDurationSec: epochDurationSec,
+ currentCount: 0,
+ currentEpoch: getCurrentEpoch(epochDurationSec),
+ criticalQueue: initDeque[QueuedMessage[T]](),
+ normalQueue: initDeque[QueuedMessage[T]](),
+ optionalQueue: initDeque[QueuedMessage[T]](),
+ sender: sender,
+ isRunning: false
+ )
+
+proc updateEpochIfNeeded[T](manager: RateLimitManager[T]) =
+ let newEpoch = getCurrentEpoch(manager.epochDurationSec)
+ if newEpoch > manager.currentEpoch:
+ manager.currentEpoch = newEpoch
+ manager.currentCount = 0
+
+proc getUsagePercent[T](manager: RateLimitManager[T]): float =
+ if manager.messageCount == 0:
+ return 1.0
+ float(manager.currentCount) / float(manager.messageCount)
+
+proc queueForSend*[T](manager: RateLimitManager[T], messageId: string, msg: T, priority: MessagePriority) =
+ manager.updateEpochIfNeeded()
+
+ let queuedMsg = QueuedMessage[T](
+ messageId: messageId,
+ msg: msg,
+ priority: priority,
+ queuedAt: epochTime()
+ )
+
+ let usagePercent = manager.getUsagePercent()
+
+ if usagePercent >= 1.0:
+ # Quota exhausted - queue critical on top, queue normal, drop optional
+ case priority:
+ of Critical:
+ manager.criticalQueue.addLast(queuedMsg)
+ of Normal:
+ manager.normalQueue.addLast(queuedMsg)
+ of Optional:
+ discard # Drop optional messages when quota exhausted
+ elif usagePercent >= 0.7:
+ # Low quota - send critical, queue normal, drop optional
+ case priority:
+ of Critical:
+ manager.criticalQueue.addLast(queuedMsg)
+ of Normal:
+ manager.normalQueue.addLast(queuedMsg)
+ of Optional:
+ discard # Drop optional messages when quota low
+ else:
+ # Normal operation - queue all messages
+ case priority:
+ of Critical:
+ manager.criticalQueue.addLast(queuedMsg)
+ of Normal:
+ manager.normalQueue.addLast(queuedMsg)
+ of Optional:
+ manager.optionalQueue.addLast(queuedMsg)
+
+proc getNextMessage[T](manager: RateLimitManager[T]): QueuedMessage[T] =
+ # Priority order: Critical -> Normal -> Optional
+ if manager.criticalQueue.len > 0:
+ return manager.criticalQueue.popFirst()
+ elif manager.normalQueue.len > 0:
+ return manager.normalQueue.popFirst()
+ elif manager.optionalQueue.len > 0:
+ return manager.optionalQueue.popFirst()
+ else:
+ raise newException(ValueError, "No messages in queue")
+
+proc hasMessages[T](manager: RateLimitManager[T]): bool =
+ manager.criticalQueue.len > 0 or manager.normalQueue.len > 0 or manager.optionalQueue.len > 0
+
+proc sendLoop*[T](manager: RateLimitManager[T]): Future[void] {.async.} =
+ manager.isRunning = true
+
+ while manager.isRunning:
+ try:
+ manager.updateEpochIfNeeded()
+
+ while manager.hasMessages() and manager.currentCount < manager.messageCount:
+ let msg = manager.getNextMessage()
+
+ try:
+ let sent = await manager.sender(msg.messageId, msg.msg)
+ if sent:
+ manager.currentCount += 1
+ else:
+ # Re-queue failed message at beginning of appropriate queue
+ case msg.priority:
+ of Critical:
+ manager.criticalQueue.addFirst(msg)
+ of Normal:
+ manager.normalQueue.addFirst(msg)
+ of Optional:
+ manager.optionalQueue.addFirst(msg)
+ break # Stop trying to send more messages if one fails
+ except:
+ # Re-queue on exception
+ case msg.priority:
+ of Critical:
+ manager.criticalQueue.addFirst(msg)
+ of Normal:
+ manager.normalQueue.addFirst(msg)
+ of Optional:
+ manager.optionalQueue.addFirst(msg)
+ break
+
+ # Small delay between messages TODO not sure if needed
+ await sleepAsync(chronos.milliseconds(10))
+
+ # Wait before next processing cycle TODO not completely sure if this is the way
+ await sleepAsync(chronos.milliseconds(100))
+ except CancelledError:
+ break
+ except:
+ await sleepAsync(chronos.seconds(1)) # Wait longer on error
+
+proc start*[T](manager: RateLimitManager[T]): Future[void] {.async.} =
+ if not manager.isRunning:
+ manager.sendTask = manager.sendLoop()
+
+proc stop*[T](manager: RateLimitManager[T]): Future[void] {.async.} =
+ if manager.isRunning:
+ manager.isRunning = false
+ if not manager.sendTask.isNil:
+ manager.sendTask.cancelSoon()
+ try:
+ await manager.sendTask
+ except CancelledError:
+ discard
+
+proc getQueueStatus*[T](manager: RateLimitManager[T]): tuple[critical: int, normal: int, optional: int, total: int] =
+ (
+ critical: manager.criticalQueue.len,
+ normal: manager.normalQueue.len,
+ optional: manager.optionalQueue.len,
+ total: manager.criticalQueue.len + manager.normalQueue.len + manager.optionalQueue.len
+ )
+
+proc getCurrentQuota*[T](manager: RateLimitManager[T]): tuple[total: int, used: int, remaining: int] =
+ (
+ total: manager.messageCount,
+ used: manager.currentCount,
+ remaining: max(0, manager.messageCount - manager.currentCount)
+ )
\ No newline at end of file
diff --git a/tests/test_ratelimit.nim b/tests/test_ratelimit.nim
new file mode 100644
index 0000000..0598b3f
--- /dev/null
+++ b/tests/test_ratelimit.nim
@@ -0,0 +1,233 @@
+{.used.}
+
+import std/[sequtils, times, random], testutils/unittests
+import ../ratelimit/ratelimit
+import chronos
+
+randomize()
+
+# Test message types
+type
+ TestMessage = object
+ content: string
+ id: int
+
+suite "Rate Limit Manager":
+ setup:
+ ## Given
+ let epochDuration = 60
+ var sentMessages: seq[string]
+
+ proc testSender(messageId: string, msg: string): Future[bool] {.async.} =
+ sentMessages.add(msg)
+ await sleepAsync(chronos.milliseconds(10))
+ return true
+
+ asyncTest "basic message queueing and sending":
+ ## Given
+ let capacity = 10
+ var manager = newRateLimitManager(capacity, epochDuration, testSender)
+ await manager.start()
+
+ ## When
+ manager.queueForSend("msg1", "Hello", Critical)
+ manager.queueForSend("msg2", "World", Normal)
+
+ await sleepAsync(chronos.milliseconds(200))
+ await manager.stop()
+
+ ## Then
+ check:
+ sentMessages.len == 2
+ sentMessages[0] == "Hello"
+ sentMessages[1] == "World"
+
+ asyncTest "priority ordering - critical first, then normal, then optional":
+ ## Given
+ let capacity = 10
+ var manager = newRateLimitManager(capacity, epochDuration, testSender)
+ await manager.start()
+
+ ## When - queue messages in mixed priority order
+ manager.queueForSend("normal1", "Normal1", Normal)
+ manager.queueForSend("critical1", "Critical1", Critical)
+ manager.queueForSend("optional1", "Optional1", Optional)
+ manager.queueForSend("critical2", "Critical2", Critical)
+ manager.queueForSend("normal2", "Normal2", Normal)
+
+ await sleepAsync(chronos.milliseconds(300))
+ await manager.stop()
+
+ ## Then - critical messages should be sent first
+ check:
+ sentMessages.len == 5
+ sentMessages[0] == "Critical1" # First critical
+ sentMessages[1] == "Critical2" # Second critical
+ sentMessages[2] == "Normal1" # Then normal messages
+ sentMessages[3] == "Normal2"
+ sentMessages[4] == "Optional1" # Finally optional
+
+ asyncTest "rate limiting - respects message quota per epoch":
+ ## Given
+ let capacity = 3 # Only 3 messages allowed
+ var manager = newRateLimitManager(capacity, epochDuration, testSender)
+ await manager.start()
+
+ ## When - queue more messages than the limit
+ for i in 1..5:
+ manager.queueForSend("msg" & $i, "Message" & $i, Normal)
+
+ await sleepAsync(chronos.milliseconds(300))
+
+ ## Then
+ let quota = manager.getCurrentQuota()
+ let queueStatus = manager.getQueueStatus()
+
+ check:
+ quota.used == 3
+ quota.remaining == 0
+ sentMessages.len == 3
+ queueStatus.total == 2 # 2 messages should be queued
+
+ await manager.stop()
+
+ asyncTest "optional message dropping at high usage":
+ ## Given
+ let capacity = 10
+ var manager = newRateLimitManager(capacity, epochDuration, testSender)
+ await manager.start()
+
+ # Fill to 80% capacity to trigger optional dropping
+ for i in 1..8:
+ manager.queueForSend("fill" & $i, "Fill" & $i, Normal)
+
+ await sleepAsync(chronos.milliseconds(200))
+
+ ## When - add messages at high usage
+ manager.queueForSend("critical", "Critical", Critical)
+ manager.queueForSend("normal", "Normal", Normal)
+ manager.queueForSend("optional", "Optional", Optional) # Should be dropped
+
+ await sleepAsync(chronos.milliseconds(200))
+
+ ## Then
+ let quota = manager.getCurrentQuota()
+ let optionalSent = sentMessages.anyIt(it == "Optional")
+
+ check:
+ quota.used == 10
+ not optionalSent # Optional message should not be sent
+
+ await manager.stop()
+
+ asyncTest "quota tracking - accurate total, used, and remaining counts":
+ ## Given
+ let capacity = 5
+ var manager = newRateLimitManager(capacity, epochDuration, testSender)
+ await manager.start()
+
+ ## When - initially no messages sent
+ let initialQuota = manager.getCurrentQuota()
+
+ ## Then - should show full quota available
+ check:
+ initialQuota.total == 5
+ initialQuota.used == 0
+ initialQuota.remaining == 5
+
+ ## When - send some messages
+ manager.queueForSend("msg1", "Message1", Normal)
+ manager.queueForSend("msg2", "Message2", Normal)
+
+ await sleepAsync(chronos.milliseconds(200))
+
+ ## Then - quota should be updated
+ let afterQuota = manager.getCurrentQuota()
+ check:
+ afterQuota.used == 2
+ afterQuota.remaining == 3
+
+ await manager.stop()
+
+ asyncTest "queue status tracking - by priority levels":
+ ## Given
+ let capacity = 2 # Small limit to force queueing
+ var manager = newRateLimitManager(capacity, epochDuration, testSender)
+ await manager.start()
+
+ ## When - fill quota and add more messages
+ manager.queueForSend("msg1", "Message1", Critical)
+ manager.queueForSend("msg2", "Message2", Normal)
+ manager.queueForSend("msg3", "Message3", Critical) # Should be queued
+ manager.queueForSend("msg4", "Message4", Normal) # Should be queued
+ manager.queueForSend("msg5", "Message5", Optional) # Should be queued
+
+ await sleepAsync(chronos.milliseconds(100))
+
+ ## Then
+ let queueStatus = manager.getQueueStatus()
+
+ check:
+ queueStatus.total >= 2 # At least some messages queued
+ queueStatus.critical >= 1 # Critical messages in queue
+ queueStatus.normal >= 1 # Normal messages in queue
+
+ await manager.stop()
+
+suite "Generic Type Support":
+ setup:
+ ## Given
+ let epochDuration = 60
+ var sentCustomMessages: seq[TestMessage]
+
+ proc testCustomSender(messageId: string, msg: TestMessage): Future[bool] {.async.} =
+ sentCustomMessages.add(msg)
+ await sleepAsync(chronos.milliseconds(10))
+ return true
+
+ asyncTest "custom message types - TestMessage objects":
+ ## Given
+ let capacity = 5
+ var manager = newRateLimitManager(capacity, epochDuration, testCustomSender)
+ await manager.start()
+
+ ## When
+ let testMsg = TestMessage(content: "Test content", id: 42)
+ manager.queueForSend("custom1", testMsg, Normal)
+
+ await sleepAsync(chronos.milliseconds(200))
+ await manager.stop()
+
+ ## Then
+ check:
+ sentCustomMessages.len == 1
+ sentCustomMessages[0].content == "Test content"
+ sentCustomMessages[0].id == 42
+
+ asyncTest "integer message types":
+ ## Given
+ let capacity = 5
+ var sentInts: seq[int]
+
+ proc testIntSender(messageId: string, msg: int): Future[bool] {.async.} =
+ sentInts.add(msg)
+ await sleepAsync(chronos.milliseconds(10))
+ return true
+
+ var manager = newRateLimitManager(capacity, epochDuration, testIntSender)
+ await manager.start()
+
+ ## When
+ manager.queueForSend("int1", 42, Critical)
+ manager.queueForSend("int2", 100, Normal)
+ manager.queueForSend("int3", 999, Optional)
+
+ await sleepAsync(chronos.milliseconds(200))
+ await manager.stop()
+
+ ## Then
+ check:
+ sentInts.len == 3
+ sentInts[0] == 42 # Critical sent first
+ sentInts[1] == 100 # Normal sent second
+ sentInts[2] == 999 # Optional sent last
\ No newline at end of file