diff --git a/.gitignore b/.gitignore index 5aed900..3a79a35 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# IDE +.vscode/ + # Nim nimcache/ *.exe @@ -19,4 +22,6 @@ nimcache/ chat_sdk/* !*.nim apps/* -!*.nim \ No newline at end of file +!*.nim +nimble.develop +nimble.paths diff --git a/chat_sdk.nimble b/chat_sdk.nimble index e52efb9..b7d1da0 100644 --- a/chat_sdk.nimble +++ b/chat_sdk.nimble @@ -1,21 +1,19 @@ # Package -version = "0.0.1" -author = "Waku Chat Team" -description = "Chat features over Waku" -license = "MIT" -srcDir = "src" +version = "0.0.1" +author = "Waku Chat Team" +description = "Chat features over Waku" +license = "MIT" +srcDir = "src" - -# Dependencies - -requires "nim >= 2.0.0" +### Dependencies +requires "nim >= 2.2.4", "chronicles", "chronos", "db_connector", "waku" task buildSharedLib, "Build shared library for C bindings": - exec "nim c --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim" + exec "nim c --mm:refc --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim" task buildStaticLib, "Build static library for C bindings": - exec "nim c --app:staticLib --out:../library/c-bindings/libchatsdk.a chat_sdk/chat_sdk.nim" + exec "nim c --mm:refc --app:staticLib --out:../library/c-bindings/libchatsdk.a chat_sdk/chat_sdk.nim" task migrate, "Run database migrations": - exec "nim c -r apps/run_migration.nim" \ No newline at end of file + exec "nim c -r apps/run_migration.nim" diff --git a/config.nims b/config.nims new file mode 100644 index 0000000..8ee48d2 --- /dev/null +++ b/config.nims @@ -0,0 +1,4 @@ +# begin Nimble config (version 2) +when withDir(thisDir(), system.fileExists("nimble.paths")): + include "nimble.paths" +# end Nimble config diff --git a/ratelimit/rate_limit_manager.nim b/ratelimit/rate_limit_manager.nim new file mode 100644 index 0000000..c8f51f9 --- /dev/null +++ b/ratelimit/rate_limit_manager.nim @@ -0,0 +1,196 @@ +import std/[times, deques, options] +import waku/common/rate_limit/token_bucket +import chronos + +type + CapacityState {.pure.} = enum + Normal + AlmostNone + None + + SendResult* {.pure.} = enum + PassedToSender + Enqueued + Dropped + DroppedBatchTooLarge + + Priority* {.pure.} = enum + Critical + Normal + Optional + + Serializable* = + concept x + x.toBytes() is seq[byte] + + MsgIdMsg[T: Serializable] = tuple[msgId: string, msg: T] + + MessageSender*[T: Serializable] = proc(msgs: seq[MsgIdMsg[T]]) {.async.} + + RateLimitManager*[T: Serializable] = ref object + bucket: TokenBucket + sender: MessageSender[T] + queueCritical: Deque[seq[MsgIdMsg[T]]] + queueNormal: Deque[seq[MsgIdMsg[T]]] + sleepDuration: chronos.Duration + pxQueueHandleLoop: Future[void] + +proc new*[T: Serializable]( + M: type[RateLimitManager[T]], + sender: MessageSender[T], + capacity: int = 100, + duration: chronos.Duration = chronos.minutes(10), + sleepDuration: chronos.Duration = chronos.milliseconds(1000), +): M = + M( + bucket: TokenBucket.newStrict(capacity, duration), + sender: sender, + queueCritical: Deque[seq[MsgIdMsg[T]]](), + queueNormal: Deque[seq[MsgIdMsg[T]]](), + sleepDuration: sleepDuration, + ) + +proc getCapacityState[T: Serializable]( + manager: RateLimitManager[T], now: Moment, count: int = 1 +): CapacityState = + let (budget, budgetCap) = manager.bucket.getAvailableCapacity(now) + let countAfter = budget - count + let ratio = countAfter.float / budgetCap.float + if ratio < 0.0: + return CapacityState.None + elif ratio < 0.3: + return CapacityState.AlmostNone + else: + return CapacityState.Normal + +proc passToSender[T: Serializable]( + manager: RateLimitManager[T], + msgs: sink seq[MsgIdMsg[T]], + now: Moment, + priority: Priority, +): Future[SendResult] {.async.} = + let count = msgs.len + let capacity = manager.bucket.tryConsume(count, now) + if not capacity: + case priority + of Priority.Critical: + manager.queueCritical.addLast(msgs) + return SendResult.Enqueued + of Priority.Normal: + manager.queueNormal.addLast(msgs) + return SendResult.Enqueued + of Priority.Optional: + return SendResult.Dropped + await manager.sender(msgs) + return SendResult.PassedToSender + +proc processCriticalQueue[T: Serializable]( + manager: RateLimitManager[T], now: Moment +) {.async.} = + while manager.queueCritical.len > 0: + let msgs = manager.queueCritical.popFirst() + let capacityState = manager.getCapacityState(now, msgs.len) + if capacityState == CapacityState.Normal: + discard await manager.passToSender(msgs, now, Priority.Critical) + elif capacityState == CapacityState.AlmostNone: + discard await manager.passToSender(msgs, now, Priority.Critical) + else: + # add back to critical queue + manager.queueCritical.addFirst(msgs) + break + +proc processNormalQueue[T: Serializable]( + manager: RateLimitManager[T], now: Moment +) {.async.} = + while manager.queueNormal.len > 0: + let msgs = manager.queueNormal.popFirst() + let capacityState = manager.getCapacityState(now, msgs.len) + if capacityState == CapacityState.Normal: + discard await manager.passToSender(msgs, now, Priority.Normal) + else: + # add back to critical queue + manager.queueNormal.addFirst(msgs) + break + +proc sendOrEnqueue*[T: Serializable]( + manager: RateLimitManager[T], + msgs: seq[MsgIdMsg[T]], + priority: Priority, + now: Moment = Moment.now(), +): Future[SendResult] {.async.} = + let (_, budgetCap) = manager.bucket.getAvailableCapacity(now) + if msgs.len.float / budgetCap.float >= 0.3: + # drop batch if it's too large to avoid starvation + return SendResult.DroppedBatchTooLarge + + let capacityState = manager.getCapacityState(now, msgs.len) + case capacityState + of CapacityState.Normal: + return await manager.passToSender(msgs, now, priority) + of CapacityState.AlmostNone: + case priority + of Priority.Critical: + return await manager.passToSender(msgs, now, priority) + of Priority.Normal: + manager.queueNormal.addLast(msgs) + return SendResult.Enqueued + of Priority.Optional: + return SendResult.Dropped + of CapacityState.None: + case priority + of Priority.Critical: + manager.queueCritical.addLast(msgs) + return SendResult.Enqueued + of Priority.Normal: + manager.queueNormal.addLast(msgs) + return SendResult.Enqueued + of Priority.Optional: + return SendResult.Dropped + +proc getEnqueued*[T: Serializable]( + manager: RateLimitManager[T] +): tuple[critical: seq[MsgIdMsg[T]], normal: seq[MsgIdMsg[T]]] = + var criticalMsgs: seq[MsgIdMsg[T]] + var normalMsgs: seq[MsgIdMsg[T]] + + for batch in manager.queueCritical: + criticalMsgs.add(batch) + + for batch in manager.queueNormal: + normalMsgs.add(batch) + + return (criticalMsgs, normalMsgs) + +proc queueHandleLoop[T: Serializable]( + manager: RateLimitManager[T], + nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = + Moment.now(), +) {.async.} = + while true: + try: + let now = nowProvider() + await manager.processCriticalQueue(now) + await manager.processNormalQueue(now) + except Exception as e: + echo "Error in queue processing: ", e.msg + + # configurable sleep duration for processing queued messages + await sleepAsync(manager.sleepDuration) + +proc start*[T: Serializable]( + manager: RateLimitManager[T], + nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = + Moment.now(), +) {.async.} = + manager.pxQueueHandleLoop = manager.queueHandleLoop(nowProvider) + +proc stop*[T: Serializable](manager: RateLimitManager[T]) {.async.} = + if not isNil(manager.pxQueueHandleLoop): + await manager.pxQueueHandleLoop.cancelAndWait() + +func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} = + if isNil(b): + return "nil" + return + "RateLimitManager(critical: " & $b.queueCritical.len & ", normal: " & + $b.queueNormal.len & ")" diff --git a/tests/test_rate_limit_manager.nim b/tests/test_rate_limit_manager.nim new file mode 100644 index 0000000..6c34c54 --- /dev/null +++ b/tests/test_rate_limit_manager.nim @@ -0,0 +1,259 @@ +import testutils/unittests +import ../ratelimit/rate_limit_manager +import chronos + +# Implement the Serializable concept for string +proc toBytes*(s: string): seq[byte] = + cast[seq[byte]](s) + +suite "Queue RateLimitManager": + setup: + var sentMessages: seq[tuple[msgId: string, msg: string]] + var senderCallCount: int = 0 + + # Create a mock sender + proc mockSender( + msgs: seq[tuple[msgId: string, msg: string]] + ): Future[void] {.async.} = + senderCallCount.inc() + for msg in msgs: + sentMessages.add(msg) + await sleepAsync(chronos.milliseconds(10)) + + asyncTest "sendOrEnqueue - immediate send when capacity available": + ## Given + let manager = RateLimitManager[string].new( + mockSender, capacity = 10, duration = chronos.milliseconds(100) + ) + let testMsg = "Hello World" + + ## When + let res = await manager.sendOrEnqueue(@[("msg1", testMsg)], Critical) + + ## Then + check: + res == PassedToSender + senderCallCount == 1 + sentMessages.len == 1 + sentMessages[0].msgId == "msg1" + sentMessages[0].msg == "Hello World" + + asyncTest "sendOrEnqueue - multiple messages": + ## Given + let manager = RateLimitManager[string].new( + mockSender, capacity = 10, duration = chronos.milliseconds(100) + ) + + ## When + let res = + await manager.sendOrEnqueue(@[("msg1", "First"), ("msg2", "Second")], Normal) + + ## Then + check: + res == PassedToSender + senderCallCount == 1 + sentMessages.len == 2 + sentMessages[0].msgId == "msg1" + sentMessages[0].msg == "First" + sentMessages[1].msgId == "msg2" + sentMessages[1].msg == "Second" + + asyncTest "start and stop - drop large batch": + ## Given + let manager = RateLimitManager[string].new( + mockSender, + capacity = 2, + duration = chronos.milliseconds(100), + sleepDuration = chronos.milliseconds(30), + ) + + let largeBatch1 = @[("msg1", "First"), ("msg2", "Second"), ("msg3", "Third")] + let largeBatch2 = @[("msg4", "Fourth"), ("msg5", "Fifth")] + + discard await manager.sendOrEnqueue(largeBatch1, Normal) + discard await manager.sendOrEnqueue(largeBatch2, Critical) + + asyncTest "enqueue - enqueue critical only when exceeded": + ## Given + let manager = RateLimitManager[string].new( + mockSender, + capacity = 10, + duration = chronos.milliseconds(100), + sleepDuration = chronos.milliseconds(100), + ) + var now = Moment.now() + await manager.start( + nowProvider = proc(): Moment = + now + ) + defer: + await manager.stop() + + let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Critical, now) + let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Critical, now) + let r3 = await manager.sendOrEnqueue(@[("msg3", "3")], Critical, now) + let r4 = await manager.sendOrEnqueue(@[("msg4", "4")], Critical, now) + let r5 = await manager.sendOrEnqueue(@[("msg5", "5")], Critical, now) + let r6 = await manager.sendOrEnqueue(@[("msg6", "6")], Critical, now) + let r7 = await manager.sendOrEnqueue(@[("msg7", "7")], Critical, now) + let r8 = await manager.sendOrEnqueue(@[("msg8", "8")], Critical, now) + let r9 = await manager.sendOrEnqueue(@[("msg9", "9")], Critical, now) + let r10 = await manager.sendOrEnqueue(@[("msg10", "10")], Critical, now) + # will enqueue to critical queue + let r11 = await manager.sendOrEnqueue(@[("msg11", "11")], Critical, now) + + check: + r1 == PassedToSender + r2 == PassedToSender + r3 == PassedToSender + r4 == PassedToSender + r5 == PassedToSender + r6 == PassedToSender + r7 == PassedToSender + r8 == PassedToSender + r9 == PassedToSender + r10 == PassedToSender + r11 == Enqueued + + let (critical, normal) = manager.getEnqueued() + check: + critical.len == 1 + normal.len == 0 + critical[0].msgId == "msg11" + + asyncTest "enqueue - enqueue normal on 70% capacity": + ## Given + let manager = RateLimitManager[string].new( + mockSender, + capacity = 10, + duration = chronos.milliseconds(100), + sleepDuration = chronos.milliseconds(100), + ) + var now = Moment.now() + await manager.start( + nowProvider = proc(): Moment = + now + ) + defer: + await manager.stop() + + let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Normal, now) + let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Normal, now) + let r3 = await manager.sendOrEnqueue(@[("msg3", "3")], Normal, now) + let r4 = await manager.sendOrEnqueue(@[("msg4", "4")], Normal, now) + let r5 = await manager.sendOrEnqueue(@[("msg5", "5")], Normal, now) + let r6 = await manager.sendOrEnqueue(@[("msg6", "6")], Normal, now) + let r7 = await manager.sendOrEnqueue(@[("msg7", "7")], Normal, now) + let r8 = await manager.sendOrEnqueue(@[("msg8", "8")], Normal, now) + let r9 = await manager.sendOrEnqueue(@[("msg9", "9")], Normal, now) + let r10 = await manager.sendOrEnqueue(@[("msg10", "10")], Normal, now) + let r11 = await manager.sendOrEnqueue(@[("msg11", "11")], Critical, now) + let r12 = await manager.sendOrEnqueue(@[("msg12", "12")], Optional, now) + + check: + r1 == PassedToSender + r2 == PassedToSender + r3 == PassedToSender + r4 == PassedToSender + r5 == PassedToSender + r6 == PassedToSender + r7 == PassedToSender + r8 == Enqueued + r9 == Enqueued + r10 == Enqueued + r11 == PassedToSender + r12 == Dropped + + let (critical, normal) = manager.getEnqueued() + check: + critical.len == 0 + normal.len == 3 + normal[0].msgId == "msg8" + normal[1].msgId == "msg9" + normal[2].msgId == "msg10" + + asyncTest "enqueue - process queued messages": + ## Given + let manager = RateLimitManager[string].new( + mockSender, + capacity = 10, + duration = chronos.milliseconds(200), + sleepDuration = chronos.milliseconds(200), + ) + type MomentRef = ref object + value: Moment + + var now = Moment.now() + var nowRef = MomentRef(value: now) + await manager.start( + nowProvider = proc(): Moment = + return nowRef.value + ) + defer: + await manager.stop() + + let r1 = await manager.sendOrEnqueue(@[("1", "val_1")], Normal, now) + let r2 = await manager.sendOrEnqueue(@[("2", "val_2")], Normal, now) + let r3 = await manager.sendOrEnqueue(@[("3", "val_3")], Normal, now) + let r4 = await manager.sendOrEnqueue(@[("4", "val_4")], Normal, now) + let r5 = await manager.sendOrEnqueue(@[("5", "val_5")], Normal, now) + let r6 = await manager.sendOrEnqueue(@[("6", "val_6")], Normal, now) + let r7 = await manager.sendOrEnqueue(@[("7", "val_7")], Normal, now) + let r8 = await manager.sendOrEnqueue(@[("8", "val_8")], Normal, now) + let r9 = await manager.sendOrEnqueue(@[("9", "val_9")], Normal, now) + let r10 = await manager.sendOrEnqueue(@[("10", "val_10")], Normal, now) + let r11 = await manager.sendOrEnqueue(@[("11", "val_11")], Critical, now) + let r12 = await manager.sendOrEnqueue(@[("12", "val_12")], Optional, now) + let r13 = await manager.sendOrEnqueue(@[("13", "val_13")], Critical, now) + let r14 = await manager.sendOrEnqueue(@[("14", "val_14")], Critical, now) + let r15 = await manager.sendOrEnqueue(@[("15", "val_15")], Critical, now) + + check: + r1 == PassedToSender + r2 == PassedToSender + r3 == PassedToSender + r4 == PassedToSender + r5 == PassedToSender + r6 == PassedToSender + r7 == PassedToSender + r8 == Enqueued + r9 == Enqueued + r10 == Enqueued + r11 == PassedToSender + r12 == Dropped + r13 == PassedToSender + r14 == PassedToSender + r15 == Enqueued + + var (critical, normal) = manager.getEnqueued() + check: + critical.len == 1 + normal.len == 3 + normal[0].msgId == "8" + normal[1].msgId == "9" + normal[2].msgId == "10" + critical[0].msgId == "15" + + nowRef.value = now + chronos.milliseconds(250) + await sleepAsync(chronos.milliseconds(250)) + + (critical, normal) = manager.getEnqueued() + check: + critical.len == 0 + normal.len == 0 + senderCallCount == 14 + sentMessages.len == 14 + sentMessages[0].msgId == "1" + sentMessages[1].msgId == "2" + sentMessages[2].msgId == "3" + sentMessages[3].msgId == "4" + sentMessages[4].msgId == "5" + sentMessages[5].msgId == "6" + sentMessages[6].msgId == "7" + sentMessages[7].msgId == "11" + sentMessages[8].msgId == "13" + sentMessages[9].msgId == "14" + sentMessages[10].msgId == "15" + sentMessages[11].msgId == "8" + sentMessages[12].msgId == "9" + sentMessages[13].msgId == "10"