# Mirror of https://github.com/logos-messaging/nim-chat-sdk.git
# Synced 2026-01-02 14:13:07 +00:00 · 313 lines · 11 KiB · Nim
import testutils/unittests
|
|
import ../ratelimit/ratelimit_manager
|
|
import ../ratelimit/store
|
|
import chronos
|
|
import db_connector/db_sqlite
|
|
import ../chat_sdk/migration
|
|
import std/[os, options]
|
|
|
|
# Path of the SQLite database file that `setup` opens for every test and that
# `teardown` removes again. It is never reassigned, so it is a compile-time
# constant instead of a mutable global.
const dbName = "test_ratelimit_manager.db"
|
|
|
|
suite "Queue RateLimitManager":
|
|
setup:
  # Open the test database and bring its schema up to date.
  let db = open(dbName, "", "", "")
  runMigrations(db)

  # Bookkeeping filled in by `mockSender` so that tests can inspect what was
  # handed to the sender and how many times it was invoked.
  var
    sentMessages: seq[tuple[msgId: string, msg: string]]
    senderCallCount = 0

  proc mockSender(
      msgs: seq[tuple[msgId: string, msg: string]]
  ): Future[void] {.async.} =
    ## Mock sender: counts the call, records the whole batch in order and
    ## then suspends for 10 ms.
    inc senderCallCount
    sentMessages.add(msgs)
    await sleepAsync(chronos.milliseconds(10))
|
|
|
|
teardown:
  # Close the connection (when there is one) before deleting the database
  # file it points at.
  if not db.isNil:
    db.close()
  if dbName.fileExists:
    dbName.removeFile()
|
|
|
|
asyncTest "sendOrEnqueue - immediate send when capacity available":
  ## A single Critical message sent to a manager with capacity 10 is expected
  ## to reach the sender in one call and to be recorded as PassedToSender.

  ## Given
  let store: RateLimitStore[string] = await RateLimitStore[string].new(db)
  let manager = await RateLimitManager[string].new(
    store, mockSender, capacity = 10, duration = chronos.milliseconds(100)
  )
  let testMsg = "Hello World"

  ## When
  let res = await manager.sendOrEnqueue(@[("msg1", testMsg)], Critical)

  ## Then
  check:
    res == PassedToSender
    senderCallCount == 1
    # Compare the whole sequence: indexing `sentMessages[0]` after a failed
    # length check would raise instead of reporting a test failure.
    sentMessages == @[(msgId: "msg1", msg: testMsg)]

  # Check message status.
  # Compare the Option as a whole: `check` keeps going after a failed
  # `isSome()`, and a following `get()` on `none` would raise a defect
  # instead of producing an ordinary check failure.
  let status = await manager.getMessageStatus("msg1")
  check status == some(MessageStatus.PassedToSender)
|
|
|
|
asyncTest "sendOrEnqueue - multiple messages":
  ## A two-message Normal batch sent to a manager with capacity 10 is
  ## expected to reach the sender in a single call, in order, with both
  ## messages recorded as PassedToSender.

  ## Given
  let store = await RateLimitStore[string].new(db)
  let manager = await RateLimitManager[string].new(
    store, mockSender, capacity = 10, duration = chronos.milliseconds(100)
  )

  ## When
  let res =
    await manager.sendOrEnqueue(@[("msg1", "First"), ("msg2", "Second")], Normal)

  ## Then
  check:
    res == PassedToSender
    senderCallCount == 1
    # Whole-sequence comparison avoids indexing past the end when fewer
    # messages than expected were recorded.
    sentMessages == @[
      (msgId: "msg1", msg: "First"), (msgId: "msg2", msg: "Second")
    ]

  # Check message statuses.
  # Comparing against `some(...)` reports a missing status as a check
  # failure; `get()` on `none` would raise instead.
  let status1 = await manager.getMessageStatus("msg1")
  let status2 = await manager.getMessageStatus("msg2")
  check:
    status1 == some(MessageStatus.PassedToSender)
    status2 == some(MessageStatus.PassedToSender)
|
|
|
|
asyncTest "start and stop - drop large batch":
  ## Given: a manager with capacity 2.
  let store = await RateLimitStore[string].new(db)
  let manager = await RateLimitManager[string].new(
    store,
    mockSender,
    capacity = 2,
    duration = chronos.milliseconds(100),
    sleepDuration = chronos.milliseconds(30),
  )

  ## When
  # NOTE(review): both results are discarded and nothing is asserted, and
  # despite the test name the manager is never started or stopped here.
  # As written this only verifies that the two calls complete.
  # TODO confirm the intended outcomes and add checks.

  # Three messages: one more than the configured capacity.
  discard await manager.sendOrEnqueue(
    @[("msg1", "First"), ("msg2", "Second"), ("msg3", "Third")], Normal
  )
  # Two messages: exactly the configured capacity.
  discard await manager.sendOrEnqueue(
    @[("msg4", "Fourth"), ("msg5", "Fifth")], Critical
  )
|
|
|
|
asyncTest "enqueue - enqueue critical only when exceeded":
  ## With capacity 10 and a frozen clock, ten Critical messages are expected
  ## to be passed to the sender and the eleventh to be enqueued.

  ## Given
  let store = await RateLimitStore[string].new(db)
  let manager = await RateLimitManager[string].new(
    store,
    mockSender,
    capacity = 10,
    duration = chronos.milliseconds(100),
    sleepDuration = chronos.milliseconds(100),
  )
  # The clock handed to the manager stays at this instant for the whole
  # test; it is never reassigned, hence `let`.
  let now = Moment.now()
  await manager.start(
    nowProvider = proc(): Moment =
      now
  )
  defer:
    await manager.stop()

  ## When / Then
  # msg1 .. msg10 (payloads "1" .. "10"), one message per call.
  for i in 1 .. 10:
    let res = await manager.sendOrEnqueue(@[("msg" & $i, $i)], Critical, now)
    check res == PassedToSender

  # will enqueue to critical queue
  let r11 = await manager.sendOrEnqueue(@[("msg11", "11")], Critical, now)
  check r11 == Enqueued

  # Check message statuses.
  # Comparing against `some(...)` reports a missing status as a check
  # failure; `get()` on `none` would raise instead.
  let status1 = await manager.getMessageStatus("msg1")
  let status10 = await manager.getMessageStatus("msg10")
  let status11 = await manager.getMessageStatus("msg11")
  check:
    status1 == some(MessageStatus.PassedToSender)
    status10 == some(MessageStatus.PassedToSender)
    status11 == some(MessageStatus.Enqueued)
|
|
|
|
asyncTest "enqueue - enqueue normal on 70% capacity":
  ## With capacity 10 and a frozen clock, the first seven Normal messages
  ## are expected to be passed to the sender and the next three enqueued;
  ## a following Critical message still passes and an Optional one is
  ## dropped.

  ## Given
  let store = await RateLimitStore[string].new(db)
  let manager = await RateLimitManager[string].new(
    store,
    mockSender,
    capacity = 10,
    duration = chronos.milliseconds(100),
    sleepDuration = chronos.milliseconds(100),
  )
  # The clock handed to the manager stays at this instant for the whole
  # test; it is never reassigned, hence `let`.
  let now = Moment.now()
  await manager.start(
    nowProvider = proc(): Moment =
      now
  )
  defer:
    await manager.stop()

  ## When / Then
  # msg1 .. msg7: Normal messages that go straight to the sender.
  for i in 1 .. 7:
    let res = await manager.sendOrEnqueue(@[("msg" & $i, $i)], Normal, now)
    check res == PassedToSender

  # msg8 .. msg10: Normal messages that are queued.
  for i in 8 .. 10:
    let res = await manager.sendOrEnqueue(@[("msg" & $i, $i)], Normal, now)
    check res == Enqueued

  let r11 = await manager.sendOrEnqueue(@[("msg11", "11")], Critical, now)
  let r12 = await manager.sendOrEnqueue(@[("msg12", "12")], Optional, now)
  check:
    r11 == PassedToSender
    r12 == Dropped

  # Check message statuses for different outcomes.
  # Comparing against `some(...)` reports a missing status as a check
  # failure; `get()` on `none` would raise instead.
  let statusSent = await manager.getMessageStatus("msg1")
  let statusEnqueued = await manager.getMessageStatus("msg8")
  let statusCritical = await manager.getMessageStatus("msg11")
  let statusDropped = await manager.getMessageStatus("msg12")
  check:
    statusSent == some(MessageStatus.PassedToSender)
    statusEnqueued == some(MessageStatus.Enqueued)
    statusCritical == some(MessageStatus.PassedToSender)
    statusDropped == some(MessageStatus.Dropped)
|
|
|
|
asyncTest "enqueue - process queued messages":
  ## Fills a capacity-10 manager under a frozen clock, then moves the clock
  ## forward and expects the queued messages to be delivered: the queued
  ## Critical message first, then the queued Normal ones in their original
  ## order.

  ## Given
  let store = await RateLimitStore[string].new(db)
  let manager = await RateLimitManager[string].new(
    store,
    mockSender,
    capacity = 10,
    duration = chronos.milliseconds(200),
    sleepDuration = chronos.milliseconds(200),
  )

  # Mutable clock shared with the manager: the `nowProvider` closure reads
  # `nowRef.value`, which the test moves forward further down.
  type MomentRef = ref object
    value: Moment

  # Neither binding is reassigned (only the field behind the ref changes),
  # hence `let`.
  let now = Moment.now()
  let nowRef = MomentRef(value: now)
  await manager.start(
    nowProvider = proc(): Moment =
      return nowRef.value
  )
  defer:
    await manager.stop()

  ## When / Then: first wave, clock frozen at `now`
  # "1" .. "7": Normal messages that go straight to the sender.
  for i in 1 .. 7:
    let res = await manager.sendOrEnqueue(@[($i, "val_" & $i)], Normal, now)
    check res == PassedToSender

  # "8" .. "10": Normal messages that are queued.
  for i in 8 .. 10:
    let res = await manager.sendOrEnqueue(@[($i, "val_" & $i)], Normal, now)
    check res == Enqueued

  let r11 = await manager.sendOrEnqueue(@[("11", "val_11")], Critical, now)
  let r12 = await manager.sendOrEnqueue(@[("12", "val_12")], Optional, now)
  let r13 = await manager.sendOrEnqueue(@[("13", "val_13")], Critical, now)
  let r14 = await manager.sendOrEnqueue(@[("14", "val_14")], Critical, now)
  let r15 = await manager.sendOrEnqueue(@[("15", "val_15")], Critical, now)
  check:
    r11 == PassedToSender
    r12 == Dropped
    r13 == PassedToSender
    r14 == PassedToSender
    r15 == Enqueued

  # Compare the ids as one sequence: indexing `sentMessages[i]` after a
  # failed length check would raise instead of reporting a test failure.
  var idsAfterFirstWave: seq[string]
  for sent in sentMessages:
    idsAfterFirstWave.add(sent.msgId)
  check:
    senderCallCount == 10 # 10 messages passed to sender
    idsAfterFirstWave == @["1", "2", "3", "4", "5", "6", "7", "11", "13", "14"]

  ## When: the injected clock moves 250 ms ahead (past the 200 ms duration)
  ## and the manager gets 250 ms of real time to work through its queues.
  # NOTE(review): this depends on wall-clock timing (200 ms sleepDuration
  # plus the mock sender's 10 ms per call inside a 250 ms wait); the margin
  # looks small - confirm it is not flaky on slow machines.
  nowRef.value = now + chronos.milliseconds(250)
  await sleepAsync(chronos.milliseconds(250))

  ## Then: "15" (Critical) is delivered before "8", "9", "10" (Normal).
  var idsAfterDrain: seq[string]
  for sent in sentMessages:
    idsAfterDrain.add(sent.msgId)
  check:
    senderCallCount == 14
    idsAfterDrain ==
      @[
        "1", "2", "3", "4", "5", "6", "7", "11", "13", "14", "15", "8", "9",
        "10",
      ]

  # Check message statuses after queue processing.
  # Comparing against `some(...)` reports a missing status as a check
  # failure; `get()` on `none` would raise instead.
  let statusProcessed8 = await manager.getMessageStatus("8")
  let statusProcessed15 = await manager.getMessageStatus("15")
  let statusDropped12 = await manager.getMessageStatus("12")
  check:
    statusProcessed8 == some(MessageStatus.PassedToSender)
    statusProcessed15 == some(MessageStatus.PassedToSender)
    statusDropped12 == some(MessageStatus.Dropped)
|