mirror of
https://github.com/logos-messaging/nim-chat-sdk.git
synced 2026-01-07 16:43:11 +00:00
Merge pull request #3 from waku-org/feat/rate-limit-no-storage
feat: rate limit (wip: no storage)
This commit is contained in:
commit
c8a3c6ca1d
7
.gitignore
vendored
7
.gitignore
vendored
@ -1,3 +1,6 @@
|
|||||||
|
# IDE
|
||||||
|
.vscode/
|
||||||
|
|
||||||
# Nim
|
# Nim
|
||||||
nimcache/
|
nimcache/
|
||||||
*.exe
|
*.exe
|
||||||
@ -19,4 +22,6 @@ nimcache/
|
|||||||
chat_sdk/*
|
chat_sdk/*
|
||||||
!*.nim
|
!*.nim
|
||||||
apps/*
|
apps/*
|
||||||
!*.nim
|
!*.nim
|
||||||
|
nimble.develop
|
||||||
|
nimble.paths
|
||||||
|
|||||||
@ -1,21 +1,19 @@
|
|||||||
# Package
|
# Package
|
||||||
|
|
||||||
version = "0.0.1"
|
version = "0.0.1"
|
||||||
author = "Waku Chat Team"
|
author = "Waku Chat Team"
|
||||||
description = "Chat features over Waku"
|
description = "Chat features over Waku"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
srcDir = "src"
|
srcDir = "src"
|
||||||
|
|
||||||
|
### Dependencies
|
||||||
# Dependencies
|
requires "nim >= 2.2.4", "chronicles", "chronos", "db_connector", "waku"
|
||||||
|
|
||||||
requires "nim >= 2.0.0"
|
|
||||||
|
|
||||||
task buildSharedLib, "Build shared library for C bindings":
|
task buildSharedLib, "Build shared library for C bindings":
|
||||||
exec "nim c --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim"
|
exec "nim c --mm:refc --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim"
|
||||||
|
|
||||||
task buildStaticLib, "Build static library for C bindings":
|
task buildStaticLib, "Build static library for C bindings":
|
||||||
exec "nim c --app:staticLib --out:../library/c-bindings/libchatsdk.a chat_sdk/chat_sdk.nim"
|
exec "nim c --mm:refc --app:staticLib --out:../library/c-bindings/libchatsdk.a chat_sdk/chat_sdk.nim"
|
||||||
|
|
||||||
task migrate, "Run database migrations":
|
task migrate, "Run database migrations":
|
||||||
exec "nim c -r apps/run_migration.nim"
|
exec "nim c -r apps/run_migration.nim"
|
||||||
|
|||||||
4
config.nims
Normal file
4
config.nims
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
# begin Nimble config (version 2)
|
||||||
|
when withDir(thisDir(), system.fileExists("nimble.paths")):
|
||||||
|
include "nimble.paths"
|
||||||
|
# end Nimble config
|
||||||
196
ratelimit/rate_limit_manager.nim
Normal file
196
ratelimit/rate_limit_manager.nim
Normal file
@ -0,0 +1,196 @@
|
|||||||
|
import std/[times, deques, options]
|
||||||
|
import waku/common/rate_limit/token_bucket
|
||||||
|
import chronos
|
||||||
|
|
||||||
|
type
|
||||||
|
CapacityState {.pure.} = enum
|
||||||
|
Normal
|
||||||
|
AlmostNone
|
||||||
|
None
|
||||||
|
|
||||||
|
SendResult* {.pure.} = enum
|
||||||
|
PassedToSender
|
||||||
|
Enqueued
|
||||||
|
Dropped
|
||||||
|
DroppedBatchTooLarge
|
||||||
|
|
||||||
|
Priority* {.pure.} = enum
|
||||||
|
Critical
|
||||||
|
Normal
|
||||||
|
Optional
|
||||||
|
|
||||||
|
Serializable* =
|
||||||
|
concept x
|
||||||
|
x.toBytes() is seq[byte]
|
||||||
|
|
||||||
|
MsgIdMsg[T: Serializable] = tuple[msgId: string, msg: T]
|
||||||
|
|
||||||
|
MessageSender*[T: Serializable] = proc(msgs: seq[MsgIdMsg[T]]) {.async.}
|
||||||
|
|
||||||
|
RateLimitManager*[T: Serializable] = ref object
|
||||||
|
bucket: TokenBucket
|
||||||
|
sender: MessageSender[T]
|
||||||
|
queueCritical: Deque[seq[MsgIdMsg[T]]]
|
||||||
|
queueNormal: Deque[seq[MsgIdMsg[T]]]
|
||||||
|
sleepDuration: chronos.Duration
|
||||||
|
pxQueueHandleLoop: Future[void]
|
||||||
|
|
||||||
|
proc new*[T: Serializable](
|
||||||
|
M: type[RateLimitManager[T]],
|
||||||
|
sender: MessageSender[T],
|
||||||
|
capacity: int = 100,
|
||||||
|
duration: chronos.Duration = chronos.minutes(10),
|
||||||
|
sleepDuration: chronos.Duration = chronos.milliseconds(1000),
|
||||||
|
): M =
|
||||||
|
M(
|
||||||
|
bucket: TokenBucket.newStrict(capacity, duration),
|
||||||
|
sender: sender,
|
||||||
|
queueCritical: Deque[seq[MsgIdMsg[T]]](),
|
||||||
|
queueNormal: Deque[seq[MsgIdMsg[T]]](),
|
||||||
|
sleepDuration: sleepDuration,
|
||||||
|
)
|
||||||
|
|
||||||
|
proc getCapacityState[T: Serializable](
|
||||||
|
manager: RateLimitManager[T], now: Moment, count: int = 1
|
||||||
|
): CapacityState =
|
||||||
|
let (budget, budgetCap) = manager.bucket.getAvailableCapacity(now)
|
||||||
|
let countAfter = budget - count
|
||||||
|
let ratio = countAfter.float / budgetCap.float
|
||||||
|
if ratio < 0.0:
|
||||||
|
return CapacityState.None
|
||||||
|
elif ratio < 0.3:
|
||||||
|
return CapacityState.AlmostNone
|
||||||
|
else:
|
||||||
|
return CapacityState.Normal
|
||||||
|
|
||||||
|
proc passToSender[T: Serializable](
|
||||||
|
manager: RateLimitManager[T],
|
||||||
|
msgs: sink seq[MsgIdMsg[T]],
|
||||||
|
now: Moment,
|
||||||
|
priority: Priority,
|
||||||
|
): Future[SendResult] {.async.} =
|
||||||
|
let count = msgs.len
|
||||||
|
let capacity = manager.bucket.tryConsume(count, now)
|
||||||
|
if not capacity:
|
||||||
|
case priority
|
||||||
|
of Priority.Critical:
|
||||||
|
manager.queueCritical.addLast(msgs)
|
||||||
|
return SendResult.Enqueued
|
||||||
|
of Priority.Normal:
|
||||||
|
manager.queueNormal.addLast(msgs)
|
||||||
|
return SendResult.Enqueued
|
||||||
|
of Priority.Optional:
|
||||||
|
return SendResult.Dropped
|
||||||
|
await manager.sender(msgs)
|
||||||
|
return SendResult.PassedToSender
|
||||||
|
|
||||||
|
proc processCriticalQueue[T: Serializable](
|
||||||
|
manager: RateLimitManager[T], now: Moment
|
||||||
|
) {.async.} =
|
||||||
|
while manager.queueCritical.len > 0:
|
||||||
|
let msgs = manager.queueCritical.popFirst()
|
||||||
|
let capacityState = manager.getCapacityState(now, msgs.len)
|
||||||
|
if capacityState == CapacityState.Normal:
|
||||||
|
discard await manager.passToSender(msgs, now, Priority.Critical)
|
||||||
|
elif capacityState == CapacityState.AlmostNone:
|
||||||
|
discard await manager.passToSender(msgs, now, Priority.Critical)
|
||||||
|
else:
|
||||||
|
# add back to critical queue
|
||||||
|
manager.queueCritical.addFirst(msgs)
|
||||||
|
break
|
||||||
|
|
||||||
|
proc processNormalQueue[T: Serializable](
|
||||||
|
manager: RateLimitManager[T], now: Moment
|
||||||
|
) {.async.} =
|
||||||
|
while manager.queueNormal.len > 0:
|
||||||
|
let msgs = manager.queueNormal.popFirst()
|
||||||
|
let capacityState = manager.getCapacityState(now, msgs.len)
|
||||||
|
if capacityState == CapacityState.Normal:
|
||||||
|
discard await manager.passToSender(msgs, now, Priority.Normal)
|
||||||
|
else:
|
||||||
|
# add back to critical queue
|
||||||
|
manager.queueNormal.addFirst(msgs)
|
||||||
|
break
|
||||||
|
|
||||||
|
proc sendOrEnqueue*[T: Serializable](
|
||||||
|
manager: RateLimitManager[T],
|
||||||
|
msgs: seq[MsgIdMsg[T]],
|
||||||
|
priority: Priority,
|
||||||
|
now: Moment = Moment.now(),
|
||||||
|
): Future[SendResult] {.async.} =
|
||||||
|
let (_, budgetCap) = manager.bucket.getAvailableCapacity(now)
|
||||||
|
if msgs.len.float / budgetCap.float >= 0.3:
|
||||||
|
# drop batch if it's too large to avoid starvation
|
||||||
|
return SendResult.DroppedBatchTooLarge
|
||||||
|
|
||||||
|
let capacityState = manager.getCapacityState(now, msgs.len)
|
||||||
|
case capacityState
|
||||||
|
of CapacityState.Normal:
|
||||||
|
return await manager.passToSender(msgs, now, priority)
|
||||||
|
of CapacityState.AlmostNone:
|
||||||
|
case priority
|
||||||
|
of Priority.Critical:
|
||||||
|
return await manager.passToSender(msgs, now, priority)
|
||||||
|
of Priority.Normal:
|
||||||
|
manager.queueNormal.addLast(msgs)
|
||||||
|
return SendResult.Enqueued
|
||||||
|
of Priority.Optional:
|
||||||
|
return SendResult.Dropped
|
||||||
|
of CapacityState.None:
|
||||||
|
case priority
|
||||||
|
of Priority.Critical:
|
||||||
|
manager.queueCritical.addLast(msgs)
|
||||||
|
return SendResult.Enqueued
|
||||||
|
of Priority.Normal:
|
||||||
|
manager.queueNormal.addLast(msgs)
|
||||||
|
return SendResult.Enqueued
|
||||||
|
of Priority.Optional:
|
||||||
|
return SendResult.Dropped
|
||||||
|
|
||||||
|
proc getEnqueued*[T: Serializable](
|
||||||
|
manager: RateLimitManager[T]
|
||||||
|
): tuple[critical: seq[MsgIdMsg[T]], normal: seq[MsgIdMsg[T]]] =
|
||||||
|
var criticalMsgs: seq[MsgIdMsg[T]]
|
||||||
|
var normalMsgs: seq[MsgIdMsg[T]]
|
||||||
|
|
||||||
|
for batch in manager.queueCritical:
|
||||||
|
criticalMsgs.add(batch)
|
||||||
|
|
||||||
|
for batch in manager.queueNormal:
|
||||||
|
normalMsgs.add(batch)
|
||||||
|
|
||||||
|
return (criticalMsgs, normalMsgs)
|
||||||
|
|
||||||
|
proc queueHandleLoop[T: Serializable](
|
||||||
|
manager: RateLimitManager[T],
|
||||||
|
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
|
||||||
|
Moment.now(),
|
||||||
|
) {.async.} =
|
||||||
|
while true:
|
||||||
|
try:
|
||||||
|
let now = nowProvider()
|
||||||
|
await manager.processCriticalQueue(now)
|
||||||
|
await manager.processNormalQueue(now)
|
||||||
|
except Exception as e:
|
||||||
|
echo "Error in queue processing: ", e.msg
|
||||||
|
|
||||||
|
# configurable sleep duration for processing queued messages
|
||||||
|
await sleepAsync(manager.sleepDuration)
|
||||||
|
|
||||||
|
proc start*[T: Serializable](
|
||||||
|
manager: RateLimitManager[T],
|
||||||
|
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
|
||||||
|
Moment.now(),
|
||||||
|
) {.async.} =
|
||||||
|
manager.pxQueueHandleLoop = manager.queueHandleLoop(nowProvider)
|
||||||
|
|
||||||
|
proc stop*[T: Serializable](manager: RateLimitManager[T]) {.async.} =
|
||||||
|
if not isNil(manager.pxQueueHandleLoop):
|
||||||
|
await manager.pxQueueHandleLoop.cancelAndWait()
|
||||||
|
|
||||||
|
func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} =
|
||||||
|
if isNil(b):
|
||||||
|
return "nil"
|
||||||
|
return
|
||||||
|
"RateLimitManager(critical: " & $b.queueCritical.len & ", normal: " &
|
||||||
|
$b.queueNormal.len & ")"
|
||||||
259
tests/test_rate_limit_manager.nim
Normal file
259
tests/test_rate_limit_manager.nim
Normal file
@ -0,0 +1,259 @@
|
|||||||
|
import testutils/unittests
|
||||||
|
import ../ratelimit/rate_limit_manager
|
||||||
|
import chronos
|
||||||
|
|
||||||
|
# Implement the Serializable concept for string
|
||||||
|
proc toBytes*(s: string): seq[byte] =
|
||||||
|
cast[seq[byte]](s)
|
||||||
|
|
||||||
|
suite "Queue RateLimitManager":
|
||||||
|
setup:
|
||||||
|
var sentMessages: seq[tuple[msgId: string, msg: string]]
|
||||||
|
var senderCallCount: int = 0
|
||||||
|
|
||||||
|
# Create a mock sender
|
||||||
|
proc mockSender(
|
||||||
|
msgs: seq[tuple[msgId: string, msg: string]]
|
||||||
|
): Future[void] {.async.} =
|
||||||
|
senderCallCount.inc()
|
||||||
|
for msg in msgs:
|
||||||
|
sentMessages.add(msg)
|
||||||
|
await sleepAsync(chronos.milliseconds(10))
|
||||||
|
|
||||||
|
asyncTest "sendOrEnqueue - immediate send when capacity available":
|
||||||
|
## Given
|
||||||
|
let manager = RateLimitManager[string].new(
|
||||||
|
mockSender, capacity = 10, duration = chronos.milliseconds(100)
|
||||||
|
)
|
||||||
|
let testMsg = "Hello World"
|
||||||
|
|
||||||
|
## When
|
||||||
|
let res = await manager.sendOrEnqueue(@[("msg1", testMsg)], Critical)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
res == PassedToSender
|
||||||
|
senderCallCount == 1
|
||||||
|
sentMessages.len == 1
|
||||||
|
sentMessages[0].msgId == "msg1"
|
||||||
|
sentMessages[0].msg == "Hello World"
|
||||||
|
|
||||||
|
asyncTest "sendOrEnqueue - multiple messages":
|
||||||
|
## Given
|
||||||
|
let manager = RateLimitManager[string].new(
|
||||||
|
mockSender, capacity = 10, duration = chronos.milliseconds(100)
|
||||||
|
)
|
||||||
|
|
||||||
|
## When
|
||||||
|
let res =
|
||||||
|
await manager.sendOrEnqueue(@[("msg1", "First"), ("msg2", "Second")], Normal)
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
res == PassedToSender
|
||||||
|
senderCallCount == 1
|
||||||
|
sentMessages.len == 2
|
||||||
|
sentMessages[0].msgId == "msg1"
|
||||||
|
sentMessages[0].msg == "First"
|
||||||
|
sentMessages[1].msgId == "msg2"
|
||||||
|
sentMessages[1].msg == "Second"
|
||||||
|
|
||||||
|
asyncTest "start and stop - drop large batch":
|
||||||
|
## Given
|
||||||
|
let manager = RateLimitManager[string].new(
|
||||||
|
mockSender,
|
||||||
|
capacity = 2,
|
||||||
|
duration = chronos.milliseconds(100),
|
||||||
|
sleepDuration = chronos.milliseconds(30),
|
||||||
|
)
|
||||||
|
|
||||||
|
let largeBatch1 = @[("msg1", "First"), ("msg2", "Second"), ("msg3", "Third")]
|
||||||
|
let largeBatch2 = @[("msg4", "Fourth"), ("msg5", "Fifth")]
|
||||||
|
|
||||||
|
discard await manager.sendOrEnqueue(largeBatch1, Normal)
|
||||||
|
discard await manager.sendOrEnqueue(largeBatch2, Critical)
|
||||||
|
|
||||||
|
asyncTest "enqueue - enqueue critical only when exceeded":
|
||||||
|
## Given
|
||||||
|
let manager = RateLimitManager[string].new(
|
||||||
|
mockSender,
|
||||||
|
capacity = 10,
|
||||||
|
duration = chronos.milliseconds(100),
|
||||||
|
sleepDuration = chronos.milliseconds(100),
|
||||||
|
)
|
||||||
|
var now = Moment.now()
|
||||||
|
await manager.start(
|
||||||
|
nowProvider = proc(): Moment =
|
||||||
|
now
|
||||||
|
)
|
||||||
|
defer:
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Critical, now)
|
||||||
|
let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Critical, now)
|
||||||
|
let r3 = await manager.sendOrEnqueue(@[("msg3", "3")], Critical, now)
|
||||||
|
let r4 = await manager.sendOrEnqueue(@[("msg4", "4")], Critical, now)
|
||||||
|
let r5 = await manager.sendOrEnqueue(@[("msg5", "5")], Critical, now)
|
||||||
|
let r6 = await manager.sendOrEnqueue(@[("msg6", "6")], Critical, now)
|
||||||
|
let r7 = await manager.sendOrEnqueue(@[("msg7", "7")], Critical, now)
|
||||||
|
let r8 = await manager.sendOrEnqueue(@[("msg8", "8")], Critical, now)
|
||||||
|
let r9 = await manager.sendOrEnqueue(@[("msg9", "9")], Critical, now)
|
||||||
|
let r10 = await manager.sendOrEnqueue(@[("msg10", "10")], Critical, now)
|
||||||
|
# will enqueue to critical queue
|
||||||
|
let r11 = await manager.sendOrEnqueue(@[("msg11", "11")], Critical, now)
|
||||||
|
|
||||||
|
check:
|
||||||
|
r1 == PassedToSender
|
||||||
|
r2 == PassedToSender
|
||||||
|
r3 == PassedToSender
|
||||||
|
r4 == PassedToSender
|
||||||
|
r5 == PassedToSender
|
||||||
|
r6 == PassedToSender
|
||||||
|
r7 == PassedToSender
|
||||||
|
r8 == PassedToSender
|
||||||
|
r9 == PassedToSender
|
||||||
|
r10 == PassedToSender
|
||||||
|
r11 == Enqueued
|
||||||
|
|
||||||
|
let (critical, normal) = manager.getEnqueued()
|
||||||
|
check:
|
||||||
|
critical.len == 1
|
||||||
|
normal.len == 0
|
||||||
|
critical[0].msgId == "msg11"
|
||||||
|
|
||||||
|
asyncTest "enqueue - enqueue normal on 70% capacity":
|
||||||
|
## Given
|
||||||
|
let manager = RateLimitManager[string].new(
|
||||||
|
mockSender,
|
||||||
|
capacity = 10,
|
||||||
|
duration = chronos.milliseconds(100),
|
||||||
|
sleepDuration = chronos.milliseconds(100),
|
||||||
|
)
|
||||||
|
var now = Moment.now()
|
||||||
|
await manager.start(
|
||||||
|
nowProvider = proc(): Moment =
|
||||||
|
now
|
||||||
|
)
|
||||||
|
defer:
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
let r1 = await manager.sendOrEnqueue(@[("msg1", "1")], Normal, now)
|
||||||
|
let r2 = await manager.sendOrEnqueue(@[("msg2", "2")], Normal, now)
|
||||||
|
let r3 = await manager.sendOrEnqueue(@[("msg3", "3")], Normal, now)
|
||||||
|
let r4 = await manager.sendOrEnqueue(@[("msg4", "4")], Normal, now)
|
||||||
|
let r5 = await manager.sendOrEnqueue(@[("msg5", "5")], Normal, now)
|
||||||
|
let r6 = await manager.sendOrEnqueue(@[("msg6", "6")], Normal, now)
|
||||||
|
let r7 = await manager.sendOrEnqueue(@[("msg7", "7")], Normal, now)
|
||||||
|
let r8 = await manager.sendOrEnqueue(@[("msg8", "8")], Normal, now)
|
||||||
|
let r9 = await manager.sendOrEnqueue(@[("msg9", "9")], Normal, now)
|
||||||
|
let r10 = await manager.sendOrEnqueue(@[("msg10", "10")], Normal, now)
|
||||||
|
let r11 = await manager.sendOrEnqueue(@[("msg11", "11")], Critical, now)
|
||||||
|
let r12 = await manager.sendOrEnqueue(@[("msg12", "12")], Optional, now)
|
||||||
|
|
||||||
|
check:
|
||||||
|
r1 == PassedToSender
|
||||||
|
r2 == PassedToSender
|
||||||
|
r3 == PassedToSender
|
||||||
|
r4 == PassedToSender
|
||||||
|
r5 == PassedToSender
|
||||||
|
r6 == PassedToSender
|
||||||
|
r7 == PassedToSender
|
||||||
|
r8 == Enqueued
|
||||||
|
r9 == Enqueued
|
||||||
|
r10 == Enqueued
|
||||||
|
r11 == PassedToSender
|
||||||
|
r12 == Dropped
|
||||||
|
|
||||||
|
let (critical, normal) = manager.getEnqueued()
|
||||||
|
check:
|
||||||
|
critical.len == 0
|
||||||
|
normal.len == 3
|
||||||
|
normal[0].msgId == "msg8"
|
||||||
|
normal[1].msgId == "msg9"
|
||||||
|
normal[2].msgId == "msg10"
|
||||||
|
|
||||||
|
asyncTest "enqueue - process queued messages":
|
||||||
|
## Given
|
||||||
|
let manager = RateLimitManager[string].new(
|
||||||
|
mockSender,
|
||||||
|
capacity = 10,
|
||||||
|
duration = chronos.milliseconds(200),
|
||||||
|
sleepDuration = chronos.milliseconds(200),
|
||||||
|
)
|
||||||
|
type MomentRef = ref object
|
||||||
|
value: Moment
|
||||||
|
|
||||||
|
var now = Moment.now()
|
||||||
|
var nowRef = MomentRef(value: now)
|
||||||
|
await manager.start(
|
||||||
|
nowProvider = proc(): Moment =
|
||||||
|
return nowRef.value
|
||||||
|
)
|
||||||
|
defer:
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
let r1 = await manager.sendOrEnqueue(@[("1", "val_1")], Normal, now)
|
||||||
|
let r2 = await manager.sendOrEnqueue(@[("2", "val_2")], Normal, now)
|
||||||
|
let r3 = await manager.sendOrEnqueue(@[("3", "val_3")], Normal, now)
|
||||||
|
let r4 = await manager.sendOrEnqueue(@[("4", "val_4")], Normal, now)
|
||||||
|
let r5 = await manager.sendOrEnqueue(@[("5", "val_5")], Normal, now)
|
||||||
|
let r6 = await manager.sendOrEnqueue(@[("6", "val_6")], Normal, now)
|
||||||
|
let r7 = await manager.sendOrEnqueue(@[("7", "val_7")], Normal, now)
|
||||||
|
let r8 = await manager.sendOrEnqueue(@[("8", "val_8")], Normal, now)
|
||||||
|
let r9 = await manager.sendOrEnqueue(@[("9", "val_9")], Normal, now)
|
||||||
|
let r10 = await manager.sendOrEnqueue(@[("10", "val_10")], Normal, now)
|
||||||
|
let r11 = await manager.sendOrEnqueue(@[("11", "val_11")], Critical, now)
|
||||||
|
let r12 = await manager.sendOrEnqueue(@[("12", "val_12")], Optional, now)
|
||||||
|
let r13 = await manager.sendOrEnqueue(@[("13", "val_13")], Critical, now)
|
||||||
|
let r14 = await manager.sendOrEnqueue(@[("14", "val_14")], Critical, now)
|
||||||
|
let r15 = await manager.sendOrEnqueue(@[("15", "val_15")], Critical, now)
|
||||||
|
|
||||||
|
check:
|
||||||
|
r1 == PassedToSender
|
||||||
|
r2 == PassedToSender
|
||||||
|
r3 == PassedToSender
|
||||||
|
r4 == PassedToSender
|
||||||
|
r5 == PassedToSender
|
||||||
|
r6 == PassedToSender
|
||||||
|
r7 == PassedToSender
|
||||||
|
r8 == Enqueued
|
||||||
|
r9 == Enqueued
|
||||||
|
r10 == Enqueued
|
||||||
|
r11 == PassedToSender
|
||||||
|
r12 == Dropped
|
||||||
|
r13 == PassedToSender
|
||||||
|
r14 == PassedToSender
|
||||||
|
r15 == Enqueued
|
||||||
|
|
||||||
|
var (critical, normal) = manager.getEnqueued()
|
||||||
|
check:
|
||||||
|
critical.len == 1
|
||||||
|
normal.len == 3
|
||||||
|
normal[0].msgId == "8"
|
||||||
|
normal[1].msgId == "9"
|
||||||
|
normal[2].msgId == "10"
|
||||||
|
critical[0].msgId == "15"
|
||||||
|
|
||||||
|
nowRef.value = now + chronos.milliseconds(250)
|
||||||
|
await sleepAsync(chronos.milliseconds(250))
|
||||||
|
|
||||||
|
(critical, normal) = manager.getEnqueued()
|
||||||
|
check:
|
||||||
|
critical.len == 0
|
||||||
|
normal.len == 0
|
||||||
|
senderCallCount == 14
|
||||||
|
sentMessages.len == 14
|
||||||
|
sentMessages[0].msgId == "1"
|
||||||
|
sentMessages[1].msgId == "2"
|
||||||
|
sentMessages[2].msgId == "3"
|
||||||
|
sentMessages[3].msgId == "4"
|
||||||
|
sentMessages[4].msgId == "5"
|
||||||
|
sentMessages[5].msgId == "6"
|
||||||
|
sentMessages[6].msgId == "7"
|
||||||
|
sentMessages[7].msgId == "11"
|
||||||
|
sentMessages[8].msgId == "13"
|
||||||
|
sentMessages[9].msgId == "14"
|
||||||
|
sentMessages[10].msgId == "15"
|
||||||
|
sentMessages[11].msgId == "8"
|
||||||
|
sentMessages[12].msgId == "9"
|
||||||
|
sentMessages[13].msgId == "10"
|
||||||
Loading…
x
Reference in New Issue
Block a user