diff --git a/.gitignore b/.gitignore
index dbf16ae..d71d057 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,3 +1,6 @@
+# IDE
+.vscode/
+
 # Nim
 nimcache/
 *.exe
@@ -10,4 +13,6 @@ nimcache/
 *.a
 
 # OS
-.DS_Store
\ No newline at end of file
+.DS_Store
+nimble.develop
+nimble.paths
diff --git a/chat_sdk.nimble b/chat_sdk.nimble
index a64bb4f..186ec7b 100644
--- a/chat_sdk.nimble
+++ b/chat_sdk.nimble
@@ -7,9 +7,12 @@
 license = "MIT"
 srcDir = "src"
 
-# Dependencies
-requires "nim >= 2.0.0"
+### Dependencies
+requires "nim >= 2.2.4",
+  "chronicles",
+  "chronos",
+  "db_connector"
 
 task buildSharedLib, "Build shared library for C bindings":
   exec "nim c --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim"
 
diff --git a/config.nims b/config.nims
new file mode 100644
index 0000000..8ee48d2
--- /dev/null
+++ b/config.nims
@@ -0,0 +1,4 @@
+# begin Nimble config (version 2)
+when withDir(thisDir(), system.fileExists("nimble.paths")):
+  include "nimble.paths"
+# end Nimble config
diff --git a/ratelimit/ratelimit.nim b/ratelimit/ratelimit.nim
new file mode 100644
index 0000000..e2482e4
--- /dev/null
+++ b/ratelimit/ratelimit.nim
@@ -0,0 +1,183 @@
+import std/[times, deques]
+import chronos
+
+type
+  MessagePriority* = enum
+    Optional = 0
+    Normal = 1
+    Critical = 2
+
+  QueuedMessage*[T] = object
+    messageId*: string
+    msg*: T
+    priority*: MessagePriority
+    queuedAt*: float
+
+  MessageSender*[T] = proc(messageId: string, msg: T): Future[bool] {.async.}
+
+  RateLimitManager*[T] = ref object
+    messageCount*: int = 100     # Default to 100 messages
+    epochDurationSec*: int = 600 # Default to 10 minutes
+    currentCount*: int
+    currentEpoch*: int64
+    criticalQueue*: Deque[QueuedMessage[T]]
+    normalQueue*: Deque[QueuedMessage[T]]
+    optionalQueue*: Deque[QueuedMessage[T]]
+    sender*: MessageSender[T]
+    isRunning*: bool
+    sendTask*: Future[void]
+
+proc getCurrentEpoch(epochDurationSec: int): int64 =
+  int64(epochTime() / float(epochDurationSec))
+
+proc newRateLimitManager*[T](messageCount: int, epochDurationSec: int, sender: MessageSender[T]): RateLimitManager[T] =
+  RateLimitManager[T](
+    messageCount: messageCount,
+    epochDurationSec: epochDurationSec,
+    currentCount: 0,
+    currentEpoch: getCurrentEpoch(epochDurationSec),
+    criticalQueue: initDeque[QueuedMessage[T]](),
+    normalQueue: initDeque[QueuedMessage[T]](),
+    optionalQueue: initDeque[QueuedMessage[T]](),
+    sender: sender,
+    isRunning: false
+  )
+
+proc updateEpochIfNeeded[T](manager: RateLimitManager[T]) =
+  let newEpoch = getCurrentEpoch(manager.epochDurationSec)
+  if newEpoch > manager.currentEpoch:
+    manager.currentEpoch = newEpoch
+    manager.currentCount = 0
+
+proc getUsagePercent[T](manager: RateLimitManager[T]): float =
+  if manager.messageCount == 0:
+    return 1.0
+  float(manager.currentCount) / float(manager.messageCount)
+
+proc queueForSend*[T](manager: RateLimitManager[T], messageId: string, msg: T, priority: MessagePriority) =
+  manager.updateEpochIfNeeded()
+
+  let queuedMsg = QueuedMessage[T](
+    messageId: messageId,
+    msg: msg,
+    priority: priority,
+    queuedAt: epochTime()
+  )
+
+  let usagePercent = manager.getUsagePercent()
+
+  if usagePercent >= 1.0:
+    # Quota exhausted - queue critical and normal, drop optional
+    case priority:
+    of Critical:
+      manager.criticalQueue.addLast(queuedMsg)
+    of Normal:
+      manager.normalQueue.addLast(queuedMsg)
+    of Optional:
+      discard # Drop optional messages when quota exhausted
+  elif usagePercent >= 0.7:
+    # Low quota (>= 70% used) - queue critical and normal, drop optional
+    case priority:
+    of Critical:
+      manager.criticalQueue.addLast(queuedMsg)
+    of Normal:
+      manager.normalQueue.addLast(queuedMsg)
+    of Optional:
+      discard # Drop optional messages when quota is low
+  else:
+    # Normal operation - queue all messages
+    case priority:
+    of Critical:
+      manager.criticalQueue.addLast(queuedMsg)
+    of Normal:
+      manager.normalQueue.addLast(queuedMsg)
+    of Optional:
+      manager.optionalQueue.addLast(queuedMsg)
+
+proc getNextMessage[T](manager: RateLimitManager[T]): QueuedMessage[T] =
+  # Priority order: Critical -> Normal -> Optional
+  if manager.criticalQueue.len > 0:
+    return manager.criticalQueue.popFirst()
+  elif manager.normalQueue.len > 0:
+    return manager.normalQueue.popFirst()
+  elif manager.optionalQueue.len > 0:
+    return manager.optionalQueue.popFirst()
+  else:
+    raise newException(ValueError, "No messages in queue")
+
+proc hasMessages[T](manager: RateLimitManager[T]): bool =
+  manager.criticalQueue.len > 0 or manager.normalQueue.len > 0 or manager.optionalQueue.len > 0
+
+proc sendLoop*[T](manager: RateLimitManager[T]): Future[void] {.async.} =
+  manager.isRunning = true
+
+  while manager.isRunning:
+    try:
+      manager.updateEpochIfNeeded()
+
+      while manager.hasMessages() and manager.currentCount < manager.messageCount:
+        let msg = manager.getNextMessage()
+
+        try:
+          let sent = await manager.sender(msg.messageId, msg.msg)
+          if sent:
+            manager.currentCount += 1
+          else:
+            # Re-queue failed message at the front of its queue
+            case msg.priority:
+            of Critical:
+              manager.criticalQueue.addFirst(msg)
+            of Normal:
+              manager.normalQueue.addFirst(msg)
+            of Optional:
+              manager.optionalQueue.addFirst(msg)
+            break # Stop trying to send more messages if one fails
+        except CatchableError:
+          # Re-queue on exception
+          case msg.priority:
+          of Critical:
+            manager.criticalQueue.addFirst(msg)
+          of Normal:
+            manager.normalQueue.addFirst(msg)
+          of Optional:
+            manager.optionalQueue.addFirst(msg)
+          break
+
+        # Small delay between messages (TODO: confirm whether this is needed)
+        await sleepAsync(chronos.milliseconds(10))
+
+      # Wait before the next processing cycle (TODO: revisit this polling interval)
+      await sleepAsync(chronos.milliseconds(100))
+    except CancelledError:
+      break
+    except CatchableError:
+      await sleepAsync(chronos.seconds(1)) # Wait longer on error
+
+proc start*[T](manager: RateLimitManager[T]): Future[void] {.async.} =
+  if not manager.isRunning:
+    manager.sendTask = manager.sendLoop()
+
+proc stop*[T](manager: RateLimitManager[T]): Future[void] {.async.} =
+  if manager.isRunning:
+    manager.isRunning = false
+    if not manager.sendTask.isNil:
+      manager.sendTask.cancelSoon()
+      try:
+        await manager.sendTask
+      except CancelledError:
+        discard
+
+proc getQueueStatus*[T](manager: RateLimitManager[T]): tuple[critical: int, normal: int, optional: int, total: int] =
+  (
+    critical: manager.criticalQueue.len,
+    normal: manager.normalQueue.len,
+    optional: manager.optionalQueue.len,
+    total: manager.criticalQueue.len + manager.normalQueue.len + manager.optionalQueue.len
+  )
+
+proc getCurrentQuota*[T](manager: RateLimitManager[T]): tuple[total: int, used: int, remaining: int] =
+  (
+    total: manager.messageCount,
+    used: manager.currentCount,
+    remaining: max(0, manager.messageCount - manager.currentCount)
+  )
\ No newline at end of file
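
For reviewers, a minimal usage sketch of the new rate limiter follows (illustration only, not part of the patch). It assumes the module is imported from the repository root as ratelimit/ratelimit; the send stub, message IDs, payloads, and delays are made up for the example:

import chronos
import ratelimit/ratelimit

# Illustrative transport stub: pretend every message is delivered.
proc send(messageId: string, msg: string): Future[bool] {.async.} =
  echo "sending ", messageId, ": ", msg
  return true

proc main() {.async.} =
  # 100 messages per 600-second epoch, matching the defaults in the module.
  var manager = newRateLimitManager(100, 600, send)
  await manager.start()

  manager.queueForSend("id-1", "hello", Critical)
  manager.queueForSend("id-2", "telemetry", Optional)

  # Give the background send loop a moment to drain the queues.
  await sleepAsync(chronos.milliseconds(500))
  await manager.stop()

waitFor main()
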
diff --git a/tests/test_ratelimit.nim b/tests/test_ratelimit.nim
new file mode 100644
index 0000000..0598b3f
--- /dev/null
+++ b/tests/test_ratelimit.nim
@@ -0,0 +1,233 @@
+{.used.}
+
+import std/[sequtils, times, random], testutils/unittests
+import ../ratelimit/ratelimit
+import chronos
+
+randomize()
+
+# Test message types
+type
+  TestMessage = object
+    content: string
+    id: int
+
+suite "Rate Limit Manager":
+  setup:
+    ## Given
+    let epochDuration = 60
+    var sentMessages: seq[string]
+
+    proc testSender(messageId: string, msg: string): Future[bool] {.async.} =
+      sentMessages.add(msg)
+      await sleepAsync(chronos.milliseconds(10))
+      return true
+
+  asyncTest "basic message queueing and sending":
+    ## Given
+    let capacity = 10
+    var manager = newRateLimitManager(capacity, epochDuration, testSender)
+    await manager.start()
+
+    ## When
+    manager.queueForSend("msg1", "Hello", Critical)
+    manager.queueForSend("msg2", "World", Normal)
+
+    await sleepAsync(chronos.milliseconds(200))
+    await manager.stop()
+
+    ## Then
+    check:
+      sentMessages.len == 2
+      sentMessages[0] == "Hello"
+      sentMessages[1] == "World"
+
+  asyncTest "priority ordering - critical first, then normal, then optional":
+    ## Given
+    let capacity = 10
+    var manager = newRateLimitManager(capacity, epochDuration, testSender)
+    await manager.start()
+
+    ## When - queue messages in mixed priority order
+    manager.queueForSend("normal1", "Normal1", Normal)
+    manager.queueForSend("critical1", "Critical1", Critical)
+    manager.queueForSend("optional1", "Optional1", Optional)
+    manager.queueForSend("critical2", "Critical2", Critical)
+    manager.queueForSend("normal2", "Normal2", Normal)
+
+    await sleepAsync(chronos.milliseconds(300))
+    await manager.stop()
+
+    ## Then - critical messages should be sent first
+    check:
+      sentMessages.len == 5
+      sentMessages[0] == "Critical1" # First critical
+      sentMessages[1] == "Critical2" # Second critical
+      sentMessages[2] == "Normal1"   # Then normal messages
+      sentMessages[3] == "Normal2"
+      sentMessages[4] == "Optional1" # Finally optional
+
+  asyncTest "rate limiting - respects message quota per epoch":
+    ## Given
+    let capacity = 3 # Only 3 messages allowed
+    var manager = newRateLimitManager(capacity, epochDuration, testSender)
+    await manager.start()
+
+    ## When - queue more messages than the limit
+    for i in 1..5:
+      manager.queueForSend("msg" & $i, "Message" & $i, Normal)
+
+    await sleepAsync(chronos.milliseconds(300))
+
+    ## Then
+    let quota = manager.getCurrentQuota()
+    let queueStatus = manager.getQueueStatus()
+
+    check:
+      quota.used == 3
+      quota.remaining == 0
+      sentMessages.len == 3
+      queueStatus.total == 2 # 2 messages should be queued
+
+    await manager.stop()
+
+  asyncTest "optional message dropping at high usage":
+    ## Given
+    let capacity = 10
+    var manager = newRateLimitManager(capacity, epochDuration, testSender)
+    await manager.start()
+
+    # Fill to 80% capacity to trigger optional dropping
+    for i in 1..8:
+      manager.queueForSend("fill" & $i, "Fill" & $i, Normal)
+
+    await sleepAsync(chronos.milliseconds(200))
+
+    ## When - add messages at high usage
+    manager.queueForSend("critical", "Critical", Critical)
+    manager.queueForSend("normal", "Normal", Normal)
+    manager.queueForSend("optional", "Optional", Optional) # Should be dropped
+
+    await sleepAsync(chronos.milliseconds(200))
+
+    ## Then
+    let quota = manager.getCurrentQuota()
+    let optionalSent = sentMessages.anyIt(it == "Optional")
+
+    check:
+      quota.used == 10
+      not optionalSent # Optional message should not be sent
+
+    await manager.stop()
+
+  asyncTest "quota tracking - accurate total, used, and remaining counts":
+    ## Given
+    let capacity = 5
+    var manager = newRateLimitManager(capacity, epochDuration, testSender)
+    await manager.start()
+
+    ## When - initially no messages sent
+    let initialQuota = manager.getCurrentQuota()
+
+    ## Then - should show full quota available
+    check:
+      initialQuota.total == 5
+      initialQuota.used == 0
+      initialQuota.remaining == 5
+
+    ## When - send some messages
+    manager.queueForSend("msg1", "Message1", Normal)
+    manager.queueForSend("msg2", "Message2", Normal)
+
+    await sleepAsync(chronos.milliseconds(200))
+
+    ## Then - quota should be updated
+    let afterQuota = manager.getCurrentQuota()
+    check:
+      afterQuota.used == 2
+      afterQuota.remaining == 3
+
+    await manager.stop()
+
+  asyncTest "queue status tracking - by priority levels":
+    ## Given
+    let capacity = 2 # Small limit to force queueing
+    var manager = newRateLimitManager(capacity, epochDuration, testSender)
+    await manager.start()
+
+    ## When - fill quota and add more messages
+    manager.queueForSend("msg1", "Message1", Critical)
+    manager.queueForSend("msg2", "Message2", Normal)
+    manager.queueForSend("msg3", "Message3", Critical) # Should be queued
+    manager.queueForSend("msg4", "Message4", Normal)   # Should be queued
+    manager.queueForSend("msg5", "Message5", Optional) # Should be queued
+
+    await sleepAsync(chronos.milliseconds(100))
+
+    ## Then
+    let queueStatus = manager.getQueueStatus()
+
+    check:
+      queueStatus.total >= 2    # At least some messages queued
+      queueStatus.critical >= 1 # Critical messages in queue
+      queueStatus.normal >= 1   # Normal messages in queue
+
+    await manager.stop()
+
+suite "Generic Type Support":
+  setup:
+    ## Given
+    let epochDuration = 60
+    var sentCustomMessages: seq[TestMessage]
+
+    proc testCustomSender(messageId: string, msg: TestMessage): Future[bool] {.async.} =
+      sentCustomMessages.add(msg)
+      await sleepAsync(chronos.milliseconds(10))
+      return true
+
+  asyncTest "custom message types - TestMessage objects":
+    ## Given
+    let capacity = 5
+    var manager = newRateLimitManager(capacity, epochDuration, testCustomSender)
+    await manager.start()
+
+    ## When
+    let testMsg = TestMessage(content: "Test content", id: 42)
+    manager.queueForSend("custom1", testMsg, Normal)
+
+    await sleepAsync(chronos.milliseconds(200))
+    await manager.stop()
+
+    ## Then
+    check:
+      sentCustomMessages.len == 1
+      sentCustomMessages[0].content == "Test content"
+      sentCustomMessages[0].id == 42
+
+  asyncTest "integer message types":
+    ## Given
+    let capacity = 5
+    var sentInts: seq[int]
+
+    proc testIntSender(messageId: string, msg: int): Future[bool] {.async.} =
+      sentInts.add(msg)
+      await sleepAsync(chronos.milliseconds(10))
+      return true
+
+    var manager = newRateLimitManager(capacity, epochDuration, testIntSender)
+    await manager.start()
+
+    ## When
+    manager.queueForSend("int1", 42, Critical)
+    manager.queueForSend("int2", 100, Normal)
+    manager.queueForSend("int3", 999, Optional)
+
+    await sleepAsync(chronos.milliseconds(200))
+    await manager.stop()
+
+    ## Then
+    check:
+      sentInts.len == 3
+      sentInts[0] == 42  # Critical sent first
+      sentInts[1] == 100 # Normal sent second
+      sentInts[2] == 999 # Optional sent last
\ No newline at end of file
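
A possible follow-up test, not included in this patch, could exercise the re-queue path in sendLoop with a sender that fails on its first attempt. The flakySender name, capacities, and timings below are illustrative; the sketch assumes the same imports and asyncTest helper as the suites above:

suite "Failure handling (sketch)":
  asyncTest "failed sends are re-queued and retried":
    ## Given - a sender that fails once, then succeeds
    var attempts = 0
    var delivered: seq[string]

    proc flakySender(messageId: string, msg: string): Future[bool] {.async.} =
      attempts += 1
      if attempts == 1:
        return false # first attempt fails, so the message should be re-queued
      delivered.add(msg)
      return true

    var manager = newRateLimitManager(5, 60, flakySender)
    await manager.start()

    ## When
    manager.queueForSend("msg1", "retry me", Critical)

    await sleepAsync(chronos.milliseconds(400))
    await manager.stop()

    ## Then - the message eventually goes through exactly once
    check:
      attempts >= 2
      delivered == @["retry me"]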