fix: usage of nwaku's token_bucket

This commit is contained in:
pablo 2025-07-06 13:35:33 +03:00
parent f04e6ae7aa
commit 1d8ad697c2
No known key found for this signature in database
GPG Key ID: 78F35FCC60FDC63A
3 changed files with 455 additions and 378 deletions

View File

@ -12,10 +12,11 @@ srcDir = "src"
requires "nim >= 2.2.4", requires "nim >= 2.2.4",
"chronicles", "chronicles",
"chronos", "chronos",
"db_connector" "db_connector",
"waku"
task buildSharedLib, "Build shared library for C bindings": task buildSharedLib, "Build shared library for C bindings":
exec "nim c --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim" exec "nim c --mm:refc --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim"
task buildStaticLib, "Build static library for C bindings": task buildStaticLib, "Build static library for C bindings":
exec "nim c --app:staticLib --out:../library/c-bindings/libchatsdk.a chat_sdk/chat_sdk.nim" exec "nim c --mm:refc --app:staticLib --out:../library/c-bindings/libchatsdk.a chat_sdk/chat_sdk.nim"

View File

@ -1,183 +1,192 @@
import std/[times, deques] import std/[times, deques, options]
# import ./token_bucket
import waku/common/rate_limit/token_bucket
import chronos import chronos
type type
Serializable* = concept x CapacityState = enum
x.toBytes() is seq[byte] Normal = 0
AlmostNone = 1
None = 2
MessagePriority* = enum SendResult* = enum
PassedToSender = 0
Enqueued = 1
Dropped = 2
DroppedBatchTooLarge = 3
Priority* = enum
Critical = 0 Critical = 0
Normal = 1 Normal = 1
Optional = 2 Optional = 2
QueuedMessage*[T: Serializable] = ref object of RootObj Serializable* =
messageId*: string concept x
msg*: T x.toBytes() is seq[byte]
priority*: MessagePriority
queuedAt*: float
MessageSender*[T: Serializable] = proc(messageId: string, msg: T): Future[bool] {.async.} MessageSender*[T: Serializable] =
proc(msgs: seq[tuple[msgId: string, msg: T]]): Future[void] {.async.}
RateLimitManager*[T: Serializable] = ref object RateLimitManager*[T: Serializable] = ref object
messageCount*: int = 100 # Default to 100 messages bucket: TokenBucket
epochDurationSec*: int = 600 # Default to 10 minutes sender: MessageSender[T]
currentCount*: int running: bool
lastResetTime*: float queueCritical: Deque[seq[tuple[msgId: string, msg: T]]]
criticalQueue*: Deque[QueuedMessage[T]] queueNormal: Deque[seq[tuple[msgId: string, msg: T]]]
normalQueue*: Deque[QueuedMessage[T]] sleepDuration: chronos.Duration
optionalQueue*: Deque[QueuedMessage[T]]
sender*: MessageSender[T]
isRunning*: bool
sendTask*: Future[void]
proc newRateLimitManager*[T: Serializable](messageCount: int, epochDurationSec: int, sender: MessageSender[T]): RateLimitManager[T] = proc newRateLimitManager*[T: Serializable](
sender: MessageSender[T],
capacity: int = 100,
duration: chronos.Duration = chronos.minutes(10),
sleepDuration: chronos.Duration = chronos.milliseconds(1000),
): RateLimitManager[T] =
RateLimitManager[T]( RateLimitManager[T](
messageCount: messageCount, bucket: TokenBucket.newStrict(capacity, duration),
epochDurationSec: epochDurationSec,
currentCount: 0,
lastResetTime: epochTime(),
criticalQueue: initDeque[QueuedMessage[T]](),
normalQueue: initDeque[QueuedMessage[T]](),
optionalQueue: initDeque[QueuedMessage[T]](),
sender: sender, sender: sender,
isRunning: false running: false,
queueCritical: Deque[seq[tuple[msgId: string, msg: T]]](),
queueNormal: Deque[seq[tuple[msgId: string, msg: T]]](),
sleepDuration: sleepDuration,
) )
proc updateEpochIfNeeded[T: Serializable](manager: RateLimitManager[T]) = proc getCapacityState[T: Serializable](
let now = epochTime() manager: RateLimitManager[T], now: Moment, count: int = 1
if now - manager.lastResetTime >= float(manager.epochDurationSec): ): CapacityState =
manager.lastResetTime = now let (budget, budgetCap) = manager.bucket.getAvailableCapacity(now)
manager.currentCount = 0 let countAfter = budget - count
let ratio = countAfter.float / budgetCap.float
proc getUsagePercent[T: Serializable](manager: RateLimitManager[T]): float = if ratio < 0.0:
if manager.messageCount == 0: return CapacityState.None
return 1.0 elif ratio < 0.3:
float(manager.currentCount) / float(manager.messageCount) return CapacityState.AlmostNone
proc queueForSend*[T: Serializable](manager: RateLimitManager[T], messageId: string, msg: T, priority: MessagePriority) =
manager.updateEpochIfNeeded()
let queuedMsg = QueuedMessage[T](
messageId: messageId,
msg: msg,
priority: priority,
queuedAt: epochTime()
)
let usagePercent = manager.getUsagePercent()
if usagePercent >= 1.0:
# Quota exhausted - queue critical on top, queue normal, drop optional
case priority:
of Critical:
manager.criticalQueue.addLast(queuedMsg)
of Normal:
manager.normalQueue.addLast(queuedMsg)
of Optional:
discard # Drop optional messages when quota exhausted
elif usagePercent >= 0.7:
# Low quota - send critical, queue normal, drop optional
case priority:
of Critical:
manager.criticalQueue.addLast(queuedMsg)
of Normal:
manager.normalQueue.addLast(queuedMsg)
of Optional:
discard # Drop optional messages when quota low
else: else:
# Normal operation - queue all messages return CapacityState.Normal
case priority:
of Critical:
manager.criticalQueue.addLast(queuedMsg)
of Normal:
manager.normalQueue.addLast(queuedMsg)
of Optional:
manager.optionalQueue.addLast(queuedMsg)
proc getNextMessage[T: Serializable](manager: RateLimitManager[T]): QueuedMessage[T] = proc passToSender[T: Serializable](
# Priority order: Critical -> Normal -> Optional manager: RateLimitManager[T],
if manager.criticalQueue.len > 0: msgs: seq[tuple[msgId: string, msg: T]],
return manager.criticalQueue.popFirst() now: Moment,
elif manager.normalQueue.len > 0: priority: Priority,
return manager.normalQueue.popFirst() ): Future[SendResult] {.async.} =
elif manager.optionalQueue.len > 0: let count = msgs.len
return manager.optionalQueue.popFirst() let capacity = manager.bucket.tryConsume(count, now)
else: if not capacity:
raise newException(ValueError, "No messages in queue") case priority
of Priority.Critical:
manager.queueCritical.addLast(msgs)
return SendResult.Enqueued
of Priority.Normal:
manager.queueNormal.addLast(msgs)
return SendResult.Enqueued
of Priority.Optional:
return SendResult.Dropped
await manager.sender(msgs)
return SendResult.PassedToSender
proc hasMessages[T: Serializable](manager: RateLimitManager[T]): bool = proc processCriticalQueue[T: Serializable](
manager.criticalQueue.len > 0 or manager.normalQueue.len > 0 or manager.optionalQueue.len > 0 manager: RateLimitManager[T], now: Moment
): Future[void] {.async.} =
proc sendLoop*[T: Serializable](manager: RateLimitManager[T]): Future[void] {.async.} = while manager.queueCritical.len > 0:
manager.isRunning = true let msgs = manager.queueCritical.popFirst()
let capacityState = manager.getCapacityState(now, msgs.len)
while manager.isRunning: if capacityState == CapacityState.Normal:
try: discard await manager.passToSender(msgs, now, Priority.Critical)
manager.updateEpochIfNeeded() elif capacityState == CapacityState.AlmostNone:
discard await manager.passToSender(msgs, now, Priority.Critical)
while manager.hasMessages() and manager.currentCount < manager.messageCount: else:
let msg = manager.getNextMessage() # add back to critical queue
manager.queueCritical.addFirst(msgs)
try:
let sent = await manager.sender(msg.messageId, msg.msg)
if sent:
manager.currentCount += 1
else:
# Re-queue failed message at beginning of appropriate queue
case msg.priority:
of Critical:
manager.criticalQueue.addFirst(msg)
of Normal:
manager.normalQueue.addFirst(msg)
of Optional:
manager.optionalQueue.addFirst(msg)
break # Stop trying to send more messages if one fails
except:
# Re-queue on exception
case msg.priority:
of Critical:
manager.criticalQueue.addFirst(msg)
of Normal:
manager.normalQueue.addFirst(msg)
of Optional:
manager.optionalQueue.addFirst(msg)
break
# Small delay between messages TODO not sure if needed
await sleepAsync(chronos.milliseconds(10))
# Wait before next processing cycle TODO not completely sure if this is the way
await sleepAsync(chronos.milliseconds(100))
except CancelledError:
break break
except:
await sleepAsync(chronos.seconds(1)) # Wait longer on error
proc start*[T: Serializable](manager: RateLimitManager[T]): Future[void] {.async.} = proc processNormalQueue[T: Serializable](
if not manager.isRunning: manager: RateLimitManager[T], now: Moment
manager.sendTask = manager.sendLoop() ): Future[void] {.async.} =
while manager.queueNormal.len > 0:
let msgs = manager.queueNormal.popFirst()
let capacityState = manager.getCapacityState(now, msgs.len)
if capacityState == CapacityState.Normal:
discard await manager.passToSender(msgs, now, Priority.Normal)
else:
# add back to critical queue
manager.queueNormal.addFirst(msgs)
break
proc stop*[T: Serializable](manager: RateLimitManager[T]): Future[void] {.async.} = proc sendOrEnqueue*[T: Serializable](
if manager.isRunning: manager: RateLimitManager[T],
manager.isRunning = false msgs: seq[tuple[msgId: string, msg: T]],
if not manager.sendTask.isNil: priority: Priority,
manager.sendTask.cancelSoon() now: Moment = Moment.now(),
try: ): Future[SendResult] {.async.} =
await manager.sendTask let (_, budgetCap) = manager.bucket.getAvailableCapacity(now)
except CancelledError: if msgs.len.float / budgetCap.float >= 0.3:
discard # drop batch if it's too large to avoid starvation
return SendResult.DroppedBatchTooLarge
proc getQueueStatus*[T: Serializable](manager: RateLimitManager[T]): tuple[critical: int, normal: int, optional: int, total: int] = let capacityState = manager.getCapacityState(now, msgs.len)
( case capacityState
critical: manager.criticalQueue.len, of CapacityState.Normal:
normal: manager.normalQueue.len, return await manager.passToSender(msgs, now, priority)
optional: manager.optionalQueue.len, of CapacityState.AlmostNone:
total: manager.criticalQueue.len + manager.normalQueue.len + manager.optionalQueue.len case priority
) of Priority.Critical:
return await manager.passToSender(msgs, now, priority)
of Priority.Normal:
manager.queueNormal.addLast(msgs)
return SendResult.Enqueued
of Priority.Optional:
return SendResult.Dropped
of CapacityState.None:
case priority
of Priority.Critical:
manager.queueCritical.addLast(msgs)
return SendResult.Enqueued
of Priority.Normal:
manager.queueNormal.addLast(msgs)
return SendResult.Enqueued
of Priority.Optional:
return SendResult.Dropped
proc getCurrentQuota*[T: Serializable](manager: RateLimitManager[T]): tuple[total: int, used: int, remaining: int] = proc getEnqueued*[T: Serializable](
( manager: RateLimitManager[T]
total: manager.messageCount, ): tuple[
used: manager.currentCount, critical: seq[tuple[msgId: string, msg: T]], normal: seq[tuple[msgId: string, msg: T]]
remaining: max(0, manager.messageCount - manager.currentCount) ] =
) var criticalMsgs: seq[tuple[msgId: string, msg: T]]
var normalMsgs: seq[tuple[msgId: string, msg: T]]
for batch in manager.queueCritical:
criticalMsgs.add(batch)
for batch in manager.queueNormal:
normalMsgs.add(batch)
return (criticalMsgs, normalMsgs)
proc start*[T: Serializable](
manager: RateLimitManager[T],
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(),
): Future[void] {.async.} =
manager.running = true
while manager.running:
try:
let now = nowProvider()
await manager.processCriticalQueue(now)
await manager.processNormalQueue(now)
# configurable sleep duration for processing queued messages
await sleepAsync(manager.sleepDuration)
except Exception as e:
echo "Error in queue processing: ", e.msg
await sleepAsync(manager.sleepDuration)
proc stop*[T: Serializable](manager: RateLimitManager[T]) =
manager.running = false
func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} =
if isNil(b):
return "nil"
return
"RateLimitManager(running: " & $b.running & ", critical: " & $b.queueCritical.len &
", normal: " & $b.queueNormal.len & ")"

View File

@ -1,244 +1,311 @@
{.used.} {.used.}
import std/[sequtils, times, random], testutils/unittests import testutils/unittests
import ../ratelimit/ratelimit import ../ratelimit/ratelimit
import chronos import chronos
import strutils
randomize() # Implement the Serializable concept for string
# Test message types
type
TestMessage = object
content: string
id: int
# Implement Serializable for test types
proc toBytes*(s: string): seq[byte] = proc toBytes*(s: string): seq[byte] =
cast[seq[byte]](s) cast[seq[byte]](s)
proc toBytes*(msg: TestMessage): seq[byte] = suite "Queue RateLimitManager":
result = toBytes(msg.content)
result.add(cast[seq[byte]](@[byte(msg.id)]))
proc toBytes*(i: int): seq[byte] =
cast[seq[byte]](@[byte(i)])
suite "Rate Limit Manager":
setup: setup:
## Given var sentMessages: seq[tuple[msgId: string, msg: string]]
let epochDuration = 60 var senderCallCount: int = 0
var sentMessages: seq[string]
proc testSender(messageId: string, msg: string): Future[bool] {.async.} = # Create a mock sender
sentMessages.add(msg) proc mockSender(
msgs: seq[tuple[msgId: string, msg: string]]
): Future[void] {.async.} =
senderCallCount.inc()
for msg in msgs:
sentMessages.add(msg)
await sleepAsync(chronos.milliseconds(10)) await sleepAsync(chronos.milliseconds(10))
return true
asyncTest "basic message queueing and sending": asyncTest "sendOrEnqueue - immediate send when capacity available":
## Given ## Given
let capacity = 10 let manager = newRateLimitManager[string](
var manager = newRateLimitManager(capacity, epochDuration, testSender) mockSender, capacity = 10, duration = chronos.milliseconds(100)
await manager.start() )
let testMsg = "Hello World"
## When ## When
manager.queueForSend("msg1", "Hello", Critical) let res = await manager.sendOrEnqueue(@[("msg1", testMsg)], Critical)
manager.queueForSend("msg2", "World", Normal)
await sleepAsync(chronos.milliseconds(200))
await manager.stop()
## Then ## Then
check: check:
res == PassedToSender
senderCallCount == 1
sentMessages.len == 1
sentMessages[0].msgId == "msg1"
sentMessages[0].msg == "Hello World"
asyncTest "sendOrEnqueue - multiple messages":
## Given
let manager = newRateLimitManager[string](
mockSender, capacity = 10, duration = chronos.milliseconds(100)
)
## When
let res =
await manager.sendOrEnqueue(@[("msg1", "First"), ("msg2", "Second")], Normal)
## Then
check:
res == PassedToSender
senderCallCount == 1
sentMessages.len == 2 sentMessages.len == 2
sentMessages[0] == "Hello" sentMessages[0].msgId == "msg1"
sentMessages[1] == "World" sentMessages[0].msg == "First"
sentMessages[1].msgId == "msg2"
sentMessages[1].msg == "Second"
asyncTest "priority ordering - critical first, then normal, then optional": asyncTest "start and stop - basic functionality":
## Given ## Given
let capacity = 10 let manager = newRateLimitManager[string](
var manager = newRateLimitManager(capacity, epochDuration, testSender) mockSender,
await manager.start() capacity = 10,
duration = chronos.milliseconds(100),
sleepDuration = chronos.milliseconds(50),
)
## When - queue messages in mixed priority order ## When - start the manager
manager.queueForSend("normal1", "Normal1", Normal) let startFut = manager.start()
manager.queueForSend("critical1", "Critical1", Critical)
manager.queueForSend("optional1", "Optional1", Optional)
manager.queueForSend("critical2", "Critical2", Critical)
manager.queueForSend("normal2", "Normal2", Normal)
await sleepAsync(chronos.milliseconds(300)) # Give it some time to start
await manager.stop() await sleepAsync(chronos.milliseconds(20))
## Then - critical messages should be sent first ## Then - manager should be running (check via string representation)
check: check:
sentMessages.len == 5 "running: true" in $manager
sentMessages[0] == "Critical1" # First critical not startFut.finished() # should still be running
sentMessages[1] == "Critical2" # Second critical
sentMessages[2] == "Normal1" # Then normal messages
sentMessages[3] == "Normal2"
sentMessages[4] == "Optional1" # Finally optional
asyncTest "rate limiting - respects message quota per epoch": ## When - stop the manager
manager.stop()
# Give it time to stop (should happen within one sleep cycle)
await sleepAsync(chronos.milliseconds(60))
## Then - manager should be stopped
check:
"running: false" in $manager
startFut.finished() # should have completed
asyncTest "start and stop - drop large batch":
## Given ## Given
let capacity = 3 # Only 3 messages allowed let manager = newRateLimitManager[string](
var manager = newRateLimitManager(capacity, epochDuration, testSender) mockSender,
await manager.start() capacity = 2,
duration = chronos.milliseconds(100),
sleepDuration = chronos.milliseconds(30),
)
## When - queue more messages than the limit let largeBatch1 = @[("msg1", "First"), ("msg2", "Second"), ("msg3", "Third")]
for i in 1..5: let largeBatch2 = @[("msg4", "Fourth"), ("msg5", "Fifth")]
manager.queueForSend("msg" & $i, "Message" & $i, Normal)
await sleepAsync(chronos.milliseconds(300)) discard await manager.sendOrEnqueue(largeBatch1, Normal)
discard await manager.sendOrEnqueue(largeBatch2, Critical)
## Then asyncTest "enqueue - enqueue critical only when exceeded":
let quota = manager.getCurrentQuota() ## Given
let queueStatus = manager.getQueueStatus() let manager = newRateLimitManager[string](
mockSender,
capacity = 10,
duration = chronos.milliseconds(100),
sleepDuration = chronos.milliseconds(100),
)
var now = Moment.now()
let startFut = manager.start(
nowProvider = proc(): Moment =
now
)
let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Critical, now)
let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Critical, now)
let r3 = await manager.sendOrEnqueue(@[("msg3", "3")], Critical, now)
let r4 = await manager.sendOrEnqueue(@[("msg4", "4")], Critical, now)
let r5 = await manager.sendOrEnqueue(@[("msg5", "5")], Critical, now)
let r6 = await manager.sendOrEnqueue(@[("msg6", "6")], Critical, now)
let r7 = await manager.sendOrEnqueue(@[("msg7", "7")], Critical, now)
let r8 = await manager.sendOrEnqueue(@[("msg8", "8")], Critical, now)
let r9 = await manager.sendOrEnqueue(@[("msg9", "9")], Critical, now)
let r10 = await manager.sendOrEnqueue(@[("msg10", "10")], Critical, now)
# will enqueue to critical queue
let r11 = await manager.sendOrEnqueue(@[("msg11", "11")], Critical, now)
check: check:
quota.used == 3 r1 == PassedToSender
quota.remaining == 0 r2 == PassedToSender
sentMessages.len == 3 r3 == PassedToSender
queueStatus.total == 2 # 2 messages should be queued r4 == PassedToSender
r5 == PassedToSender
r6 == PassedToSender
r7 == PassedToSender
r8 == PassedToSender
r9 == PassedToSender
r10 == PassedToSender
r11 == Enqueued
await manager.stop() let (critical, normal) = manager.getEnqueued()
check:
critical.len == 1
normal.len == 0
critical[0].msgId == "msg11"
asyncTest "optional message dropping at high usage": # Give it time to stop
manager.stop()
await sleepAsync(chronos.milliseconds(150))
## Then - should be stopped
check:
"running: false" in $manager
startFut.finished()
asyncTest "enqueue - enqueue normal on 70% capacity":
## Given ## Given
let capacity = 10 let manager = newRateLimitManager[string](
var manager = newRateLimitManager(capacity, epochDuration, testSender) mockSender,
await manager.start() capacity = 10,
duration = chronos.milliseconds(100),
sleepDuration = chronos.milliseconds(100),
)
var now = Moment.now()
let startFut = manager.start(
nowProvider = proc(): Moment =
now
)
# Fill to 80% capacity to trigger optional dropping let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Normal, now)
for i in 1..8: let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Normal, now)
manager.queueForSend("fill" & $i, "Fill" & $i, Normal) let r3 = await manager.sendOrEnqueue(@[("msg3", "3")], Normal, now)
let r4 = await manager.sendOrEnqueue(@[("msg4", "4")], Normal, now)
await sleepAsync(chronos.milliseconds(200)) let r5 = await manager.sendOrEnqueue(@[("msg5", "5")], Normal, now)
let r6 = await manager.sendOrEnqueue(@[("msg6", "6")], Normal, now)
## When - add messages at high usage let r7 = await manager.sendOrEnqueue(@[("msg7", "7")], Normal, now)
manager.queueForSend("critical", "Critical", Critical) let r8 = await manager.sendOrEnqueue(@[("msg8", "8")], Normal, now)
manager.queueForSend("normal", "Normal", Normal) let r9 = await manager.sendOrEnqueue(@[("msg9", "9")], Normal, now)
manager.queueForSend("optional", "Optional", Optional) # Should be dropped let r10 = await manager.sendOrEnqueue(@[("msg10", "10")], Normal, now)
let r11 = await manager.sendOrEnqueue(@[("msg11", "11")], Critical, now)
await sleepAsync(chronos.milliseconds(200)) let r12 = await manager.sendOrEnqueue(@[("msg12", "12")], Optional, now)
## Then
let quota = manager.getCurrentQuota()
let optionalSent = sentMessages.anyIt(it == "Optional")
check: check:
quota.used == 10 r1 == PassedToSender
not optionalSent # Optional message should not be sent r2 == PassedToSender
r3 == PassedToSender
r4 == PassedToSender
r5 == PassedToSender
r6 == PassedToSender
r7 == PassedToSender
r8 == Enqueued
r9 == Enqueued
r10 == Enqueued
r11 == PassedToSender
r12 == Dropped
await manager.stop() let (critical, normal) = manager.getEnqueued()
asyncTest "quota tracking - accurate total, used, and remaining counts":
## Given
let capacity = 5
var manager = newRateLimitManager(capacity, epochDuration, testSender)
await manager.start()
## When - initially no messages sent
let initialQuota = manager.getCurrentQuota()
## Then - should show full quota available
check: check:
initialQuota.total == 5 critical.len == 0
initialQuota.used == 0 normal.len == 3
initialQuota.remaining == 5 normal[0].msgId == "msg8"
normal[1].msgId == "msg9"
normal[2].msgId == "msg10"
## When - send some messages # Give it time to stop
manager.queueForSend("msg1", "Message1", Normal) manager.stop()
manager.queueForSend("msg2", "Message2", Normal) await sleepAsync(chronos.milliseconds(150))
## Then - should be stopped
await sleepAsync(chronos.milliseconds(200))
## Then - quota should be updated
let afterQuota = manager.getCurrentQuota()
check: check:
afterQuota.used == 2 "running: false" in $manager
afterQuota.remaining == 3 startFut.finished()
await manager.stop() asyncTest "enqueue - process queued messages":
asyncTest "queue status tracking - by priority levels":
## Given ## Given
let capacity = 2 # Small limit to force queueing let manager = newRateLimitManager[string](
var manager = newRateLimitManager(capacity, epochDuration, testSender) mockSender,
await manager.start() capacity = 10,
duration = chronos.milliseconds(200),
sleepDuration = chronos.milliseconds(200),
)
type MomentRef = ref object
value: Moment
## When - fill quota and add more messages var now = Moment.now()
manager.queueForSend("msg1", "Message1", Critical) var nowRef = MomentRef(value: now)
manager.queueForSend("msg2", "Message2", Normal) let startFut = manager.start(
manager.queueForSend("msg3", "Message3", Critical) # Should be queued nowProvider = proc(): Moment =
manager.queueForSend("msg4", "Message4", Normal) # Should be queued return nowRef.value
manager.queueForSend("msg5", "Message5", Optional) # Should be queued )
await sleepAsync(chronos.milliseconds(100)) let r1 = await manager.sendOrEnqueue(@[("1", "val_1")], Normal, now)
let r2 = await manager.sendOrEnqueue(@[("2", "val_2")], Normal, now)
## Then let r3 = await manager.sendOrEnqueue(@[("3", "val_3")], Normal, now)
let queueStatus = manager.getQueueStatus() let r4 = await manager.sendOrEnqueue(@[("4", "val_4")], Normal, now)
let r5 = await manager.sendOrEnqueue(@[("5", "val_5")], Normal, now)
let r6 = await manager.sendOrEnqueue(@[("6", "val_6")], Normal, now)
let r7 = await manager.sendOrEnqueue(@[("7", "val_7")], Normal, now)
let r8 = await manager.sendOrEnqueue(@[("8", "val_8")], Normal, now)
let r9 = await manager.sendOrEnqueue(@[("9", "val_9")], Normal, now)
let r10 = await manager.sendOrEnqueue(@[("10", "val_10")], Normal, now)
let r11 = await manager.sendOrEnqueue(@[("11", "val_11")], Critical, now)
let r12 = await manager.sendOrEnqueue(@[("12", "val_12")], Optional, now)
let r13 = await manager.sendOrEnqueue(@[("13", "val_13")], Critical, now)
let r14 = await manager.sendOrEnqueue(@[("14", "val_14")], Critical, now)
let r15 = await manager.sendOrEnqueue(@[("15", "val_15")], Critical, now)
check: check:
queueStatus.total >= 2 # At least some messages queued r1 == PassedToSender
queueStatus.critical >= 1 # Critical messages in queue r2 == PassedToSender
queueStatus.normal >= 1 # Normal messages in queue r3 == PassedToSender
r4 == PassedToSender
r5 == PassedToSender
r6 == PassedToSender
r7 == PassedToSender
r8 == Enqueued
r9 == Enqueued
r10 == Enqueued
r11 == PassedToSender
r12 == Dropped
r13 == PassedToSender
r14 == PassedToSender
r15 == Enqueued
await manager.stop() var (critical, normal) = manager.getEnqueued()
suite "Generic Type Support":
setup:
## Given
let epochDuration = 60
var sentCustomMessages: seq[TestMessage]
proc testCustomSender(messageId: string, msg: TestMessage): Future[bool] {.async.} =
sentCustomMessages.add(msg)
await sleepAsync(chronos.milliseconds(10))
return true
asyncTest "custom message types - TestMessage objects":
## Given
let capacity = 5
var manager = newRateLimitManager(capacity, epochDuration, testCustomSender)
await manager.start()
## When
let testMsg = TestMessage(content: "Test content", id: 42)
manager.queueForSend("custom1", testMsg, Normal)
await sleepAsync(chronos.milliseconds(200))
await manager.stop()
## Then
check: check:
sentCustomMessages.len == 1 critical.len == 1
sentCustomMessages[0].content == "Test content" normal.len == 3
sentCustomMessages[0].id == 42 normal[0].msgId == "8"
normal[1].msgId == "9"
normal[2].msgId == "10"
critical[0].msgId == "15"
asyncTest "integer message types": nowRef.value = now + chronos.milliseconds(250)
## Given await sleepAsync(chronos.milliseconds(250))
let capacity = 5
var sentInts: seq[int]
proc testIntSender(messageId: string, msg: int): Future[bool] {.async.} = (critical, normal) = manager.getEnqueued()
sentInts.add(msg)
await sleepAsync(chronos.milliseconds(10))
return true
var manager = newRateLimitManager(capacity, epochDuration, testIntSender)
await manager.start()
## When
manager.queueForSend("int1", 42, Critical)
manager.queueForSend("int2", 100, Normal)
manager.queueForSend("int3", 999, Optional)
await sleepAsync(chronos.milliseconds(200))
await manager.stop()
## Then
check: check:
sentInts.len == 3 critical.len == 0
sentInts[0] == 42 # Critical sent first normal.len == 0
sentInts[1] == 100 # Normal sent second senderCallCount == 14
sentInts[2] == 999 # Optional sent last sentMessages.len == 14
sentMessages[0].msgId == "1"
sentMessages[1].msgId == "2"
sentMessages[2].msgId == "3"
sentMessages[3].msgId == "4"
sentMessages[4].msgId == "5"
sentMessages[5].msgId == "6"
sentMessages[6].msgId == "7"
sentMessages[7].msgId == "11"
sentMessages[8].msgId == "13"
sentMessages[9].msgId == "14"
sentMessages[10].msgId == "15"
sentMessages[11].msgId == "8"
sentMessages[12].msgId == "9"
sentMessages[13].msgId == "10"
# Give it time to stop
manager.stop()
await sleepAsync(chronos.milliseconds(250))
## Then - should be stopped
check:
"running: false" in $manager
startFut.finished()