mirror of
https://github.com/logos-messaging/nim-chat-sdk.git
synced 2026-01-02 14:13:07 +00:00
feat: rate limit (wip: no storage)
This commit is contained in:
parent
1dd9cd9ac8
commit
527bdde50a
7
.gitignore
vendored
7
.gitignore
vendored
@ -1,3 +1,6 @@
|
|||||||
|
# IDE
|
||||||
|
.vscode/
|
||||||
|
|
||||||
# Nim
|
# Nim
|
||||||
nimcache/
|
nimcache/
|
||||||
*.exe
|
*.exe
|
||||||
@ -10,4 +13,6 @@ nimcache/
|
|||||||
*.a
|
*.a
|
||||||
|
|
||||||
# OS
|
# OS
|
||||||
.DS_Store
|
.DS_Store
|
||||||
|
nimble.develop
|
||||||
|
nimble.paths
|
||||||
|
|||||||
@ -7,9 +7,12 @@ license = "MIT"
|
|||||||
srcDir = "src"
|
srcDir = "src"
|
||||||
|
|
||||||
|
|
||||||
# Dependencies
|
|
||||||
|
|
||||||
requires "nim >= 2.0.0"
|
### Dependencies
|
||||||
|
requires "nim >= 2.2.4",
|
||||||
|
"chronicles",
|
||||||
|
"chronos",
|
||||||
|
"db_connector"
|
||||||
|
|
||||||
task buildSharedLib, "Build shared library for C bindings":
|
task buildSharedLib, "Build shared library for C bindings":
|
||||||
exec "nim c --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim"
|
exec "nim c --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim"
|
||||||
|
|||||||
4
config.nims
Normal file
4
config.nims
Normal file
@ -0,0 +1,4 @@
|
|||||||
|
# begin Nimble config (version 2)
|
||||||
|
when withDir(thisDir(), system.fileExists("nimble.paths")):
|
||||||
|
include "nimble.paths"
|
||||||
|
# end Nimble config
|
||||||
183
ratelimit/ratelimit.nim
Normal file
183
ratelimit/ratelimit.nim
Normal file
@ -0,0 +1,183 @@
|
|||||||
|
import std/[times, deques]
|
||||||
|
import chronos
|
||||||
|
|
||||||
|
type
|
||||||
|
MessagePriority* = enum
|
||||||
|
Optional = 0
|
||||||
|
Normal = 1
|
||||||
|
Critical = 2
|
||||||
|
|
||||||
|
QueuedMessage*[T] = object
|
||||||
|
messageId*: string
|
||||||
|
msg*: T
|
||||||
|
priority*: MessagePriority
|
||||||
|
queuedAt*: float
|
||||||
|
|
||||||
|
MessageSender*[T] = proc(messageId: string, msg: T): Future[bool] {.async.}
|
||||||
|
|
||||||
|
RateLimitManager*[T] = ref object
|
||||||
|
messageCount*: int = 100 # Default to 100 messages
|
||||||
|
epochDurationSec*: int = 600 # Default to 10 minutes
|
||||||
|
currentCount*: int
|
||||||
|
currentEpoch*: int64
|
||||||
|
criticalQueue*: Deque[QueuedMessage[T]]
|
||||||
|
normalQueue*: Deque[QueuedMessage[T]]
|
||||||
|
optionalQueue*: Deque[QueuedMessage[T]]
|
||||||
|
sender*: MessageSender[T]
|
||||||
|
isRunning*: bool
|
||||||
|
sendTask*: Future[void]
|
||||||
|
|
||||||
|
proc getCurrentEpoch(epochDurationSec: int): int64 =
|
||||||
|
int64(epochTime() / float(epochDurationSec))
|
||||||
|
|
||||||
|
proc newRateLimitManager*[T](messageCount: int, epochDurationSec: int, sender: MessageSender[T]): RateLimitManager[T] =
|
||||||
|
RateLimitManager[T](
|
||||||
|
messageCount: messageCount,
|
||||||
|
epochDurationSec: epochDurationSec,
|
||||||
|
currentCount: 0,
|
||||||
|
currentEpoch: getCurrentEpoch(epochDurationSec),
|
||||||
|
criticalQueue: initDeque[QueuedMessage[T]](),
|
||||||
|
normalQueue: initDeque[QueuedMessage[T]](),
|
||||||
|
optionalQueue: initDeque[QueuedMessage[T]](),
|
||||||
|
sender: sender,
|
||||||
|
isRunning: false
|
||||||
|
)
|
||||||
|
|
||||||
|
proc updateEpochIfNeeded[T](manager: RateLimitManager[T]) =
|
||||||
|
let newEpoch = getCurrentEpoch(manager.epochDurationSec)
|
||||||
|
if newEpoch > manager.currentEpoch:
|
||||||
|
manager.currentEpoch = newEpoch
|
||||||
|
manager.currentCount = 0
|
||||||
|
|
||||||
|
proc getUsagePercent[T](manager: RateLimitManager[T]): float =
|
||||||
|
if manager.messageCount == 0:
|
||||||
|
return 1.0
|
||||||
|
float(manager.currentCount) / float(manager.messageCount)
|
||||||
|
|
||||||
|
proc queueForSend*[T](manager: RateLimitManager[T], messageId: string, msg: T, priority: MessagePriority) =
|
||||||
|
manager.updateEpochIfNeeded()
|
||||||
|
|
||||||
|
let queuedMsg = QueuedMessage[T](
|
||||||
|
messageId: messageId,
|
||||||
|
msg: msg,
|
||||||
|
priority: priority,
|
||||||
|
queuedAt: epochTime()
|
||||||
|
)
|
||||||
|
|
||||||
|
let usagePercent = manager.getUsagePercent()
|
||||||
|
|
||||||
|
if usagePercent >= 1.0:
|
||||||
|
# Quota exhausted - queue critical on top, queue normal, drop optional
|
||||||
|
case priority:
|
||||||
|
of Critical:
|
||||||
|
manager.criticalQueue.addLast(queuedMsg)
|
||||||
|
of Normal:
|
||||||
|
manager.normalQueue.addLast(queuedMsg)
|
||||||
|
of Optional:
|
||||||
|
discard # Drop optional messages when quota exhausted
|
||||||
|
elif usagePercent >= 0.7:
|
||||||
|
# Low quota - send critical, queue normal, drop optional
|
||||||
|
case priority:
|
||||||
|
of Critical:
|
||||||
|
manager.criticalQueue.addLast(queuedMsg)
|
||||||
|
of Normal:
|
||||||
|
manager.normalQueue.addLast(queuedMsg)
|
||||||
|
of Optional:
|
||||||
|
discard # Drop optional messages when quota low
|
||||||
|
else:
|
||||||
|
# Normal operation - queue all messages
|
||||||
|
case priority:
|
||||||
|
of Critical:
|
||||||
|
manager.criticalQueue.addLast(queuedMsg)
|
||||||
|
of Normal:
|
||||||
|
manager.normalQueue.addLast(queuedMsg)
|
||||||
|
of Optional:
|
||||||
|
manager.optionalQueue.addLast(queuedMsg)
|
||||||
|
|
||||||
|
proc getNextMessage[T](manager: RateLimitManager[T]): QueuedMessage[T] =
|
||||||
|
# Priority order: Critical -> Normal -> Optional
|
||||||
|
if manager.criticalQueue.len > 0:
|
||||||
|
return manager.criticalQueue.popFirst()
|
||||||
|
elif manager.normalQueue.len > 0:
|
||||||
|
return manager.normalQueue.popFirst()
|
||||||
|
elif manager.optionalQueue.len > 0:
|
||||||
|
return manager.optionalQueue.popFirst()
|
||||||
|
else:
|
||||||
|
raise newException(ValueError, "No messages in queue")
|
||||||
|
|
||||||
|
proc hasMessages[T](manager: RateLimitManager[T]): bool =
|
||||||
|
manager.criticalQueue.len > 0 or manager.normalQueue.len > 0 or manager.optionalQueue.len > 0
|
||||||
|
|
||||||
|
proc sendLoop*[T](manager: RateLimitManager[T]): Future[void] {.async.} =
|
||||||
|
manager.isRunning = true
|
||||||
|
|
||||||
|
while manager.isRunning:
|
||||||
|
try:
|
||||||
|
manager.updateEpochIfNeeded()
|
||||||
|
|
||||||
|
while manager.hasMessages() and manager.currentCount < manager.messageCount:
|
||||||
|
let msg = manager.getNextMessage()
|
||||||
|
|
||||||
|
try:
|
||||||
|
let sent = await manager.sender(msg.messageId, msg.msg)
|
||||||
|
if sent:
|
||||||
|
manager.currentCount += 1
|
||||||
|
else:
|
||||||
|
# Re-queue failed message at beginning of appropriate queue
|
||||||
|
case msg.priority:
|
||||||
|
of Critical:
|
||||||
|
manager.criticalQueue.addFirst(msg)
|
||||||
|
of Normal:
|
||||||
|
manager.normalQueue.addFirst(msg)
|
||||||
|
of Optional:
|
||||||
|
manager.optionalQueue.addFirst(msg)
|
||||||
|
break # Stop trying to send more messages if one fails
|
||||||
|
except:
|
||||||
|
# Re-queue on exception
|
||||||
|
case msg.priority:
|
||||||
|
of Critical:
|
||||||
|
manager.criticalQueue.addFirst(msg)
|
||||||
|
of Normal:
|
||||||
|
manager.normalQueue.addFirst(msg)
|
||||||
|
of Optional:
|
||||||
|
manager.optionalQueue.addFirst(msg)
|
||||||
|
break
|
||||||
|
|
||||||
|
# Small delay between messages TODO not sure if needed
|
||||||
|
await sleepAsync(chronos.milliseconds(10))
|
||||||
|
|
||||||
|
# Wait before next processing cycle TODO not completely sure if this is the way
|
||||||
|
await sleepAsync(chronos.milliseconds(100))
|
||||||
|
except CancelledError:
|
||||||
|
break
|
||||||
|
except:
|
||||||
|
await sleepAsync(chronos.seconds(1)) # Wait longer on error
|
||||||
|
|
||||||
|
proc start*[T](manager: RateLimitManager[T]): Future[void] {.async.} =
|
||||||
|
if not manager.isRunning:
|
||||||
|
manager.sendTask = manager.sendLoop()
|
||||||
|
|
||||||
|
proc stop*[T](manager: RateLimitManager[T]): Future[void] {.async.} =
|
||||||
|
if manager.isRunning:
|
||||||
|
manager.isRunning = false
|
||||||
|
if not manager.sendTask.isNil:
|
||||||
|
manager.sendTask.cancelSoon()
|
||||||
|
try:
|
||||||
|
await manager.sendTask
|
||||||
|
except CancelledError:
|
||||||
|
discard
|
||||||
|
|
||||||
|
proc getQueueStatus*[T](manager: RateLimitManager[T]): tuple[critical: int, normal: int, optional: int, total: int] =
|
||||||
|
(
|
||||||
|
critical: manager.criticalQueue.len,
|
||||||
|
normal: manager.normalQueue.len,
|
||||||
|
optional: manager.optionalQueue.len,
|
||||||
|
total: manager.criticalQueue.len + manager.normalQueue.len + manager.optionalQueue.len
|
||||||
|
)
|
||||||
|
|
||||||
|
proc getCurrentQuota*[T](manager: RateLimitManager[T]): tuple[total: int, used: int, remaining: int] =
|
||||||
|
(
|
||||||
|
total: manager.messageCount,
|
||||||
|
used: manager.currentCount,
|
||||||
|
remaining: max(0, manager.messageCount - manager.currentCount)
|
||||||
|
)
|
||||||
233
tests/test_ratelimit.nim
Normal file
233
tests/test_ratelimit.nim
Normal file
@ -0,0 +1,233 @@
|
|||||||
|
{.used.}
|
||||||
|
|
||||||
|
import std/[sequtils, times, random], testutils/unittests
|
||||||
|
import ../ratelimit/ratelimit
|
||||||
|
import chronos
|
||||||
|
|
||||||
|
randomize()
|
||||||
|
|
||||||
|
# Test message types
|
||||||
|
type
|
||||||
|
TestMessage = object
|
||||||
|
content: string
|
||||||
|
id: int
|
||||||
|
|
||||||
|
suite "Rate Limit Manager":
|
||||||
|
setup:
|
||||||
|
## Given
|
||||||
|
let epochDuration = 60
|
||||||
|
var sentMessages: seq[string]
|
||||||
|
|
||||||
|
proc testSender(messageId: string, msg: string): Future[bool] {.async.} =
|
||||||
|
sentMessages.add(msg)
|
||||||
|
await sleepAsync(chronos.milliseconds(10))
|
||||||
|
return true
|
||||||
|
|
||||||
|
asyncTest "basic message queueing and sending":
|
||||||
|
## Given
|
||||||
|
let capacity = 10
|
||||||
|
var manager = newRateLimitManager(capacity, epochDuration, testSender)
|
||||||
|
await manager.start()
|
||||||
|
|
||||||
|
## When
|
||||||
|
manager.queueForSend("msg1", "Hello", Critical)
|
||||||
|
manager.queueForSend("msg2", "World", Normal)
|
||||||
|
|
||||||
|
await sleepAsync(chronos.milliseconds(200))
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
sentMessages.len == 2
|
||||||
|
sentMessages[0] == "Hello"
|
||||||
|
sentMessages[1] == "World"
|
||||||
|
|
||||||
|
asyncTest "priority ordering - critical first, then normal, then optional":
|
||||||
|
## Given
|
||||||
|
let capacity = 10
|
||||||
|
var manager = newRateLimitManager(capacity, epochDuration, testSender)
|
||||||
|
await manager.start()
|
||||||
|
|
||||||
|
## When - queue messages in mixed priority order
|
||||||
|
manager.queueForSend("normal1", "Normal1", Normal)
|
||||||
|
manager.queueForSend("critical1", "Critical1", Critical)
|
||||||
|
manager.queueForSend("optional1", "Optional1", Optional)
|
||||||
|
manager.queueForSend("critical2", "Critical2", Critical)
|
||||||
|
manager.queueForSend("normal2", "Normal2", Normal)
|
||||||
|
|
||||||
|
await sleepAsync(chronos.milliseconds(300))
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
## Then - critical messages should be sent first
|
||||||
|
check:
|
||||||
|
sentMessages.len == 5
|
||||||
|
sentMessages[0] == "Critical1" # First critical
|
||||||
|
sentMessages[1] == "Critical2" # Second critical
|
||||||
|
sentMessages[2] == "Normal1" # Then normal messages
|
||||||
|
sentMessages[3] == "Normal2"
|
||||||
|
sentMessages[4] == "Optional1" # Finally optional
|
||||||
|
|
||||||
|
asyncTest "rate limiting - respects message quota per epoch":
|
||||||
|
## Given
|
||||||
|
let capacity = 3 # Only 3 messages allowed
|
||||||
|
var manager = newRateLimitManager(capacity, epochDuration, testSender)
|
||||||
|
await manager.start()
|
||||||
|
|
||||||
|
## When - queue more messages than the limit
|
||||||
|
for i in 1..5:
|
||||||
|
manager.queueForSend("msg" & $i, "Message" & $i, Normal)
|
||||||
|
|
||||||
|
await sleepAsync(chronos.milliseconds(300))
|
||||||
|
|
||||||
|
## Then
|
||||||
|
let quota = manager.getCurrentQuota()
|
||||||
|
let queueStatus = manager.getQueueStatus()
|
||||||
|
|
||||||
|
check:
|
||||||
|
quota.used == 3
|
||||||
|
quota.remaining == 0
|
||||||
|
sentMessages.len == 3
|
||||||
|
queueStatus.total == 2 # 2 messages should be queued
|
||||||
|
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
asyncTest "optional message dropping at high usage":
|
||||||
|
## Given
|
||||||
|
let capacity = 10
|
||||||
|
var manager = newRateLimitManager(capacity, epochDuration, testSender)
|
||||||
|
await manager.start()
|
||||||
|
|
||||||
|
# Fill to 80% capacity to trigger optional dropping
|
||||||
|
for i in 1..8:
|
||||||
|
manager.queueForSend("fill" & $i, "Fill" & $i, Normal)
|
||||||
|
|
||||||
|
await sleepAsync(chronos.milliseconds(200))
|
||||||
|
|
||||||
|
## When - add messages at high usage
|
||||||
|
manager.queueForSend("critical", "Critical", Critical)
|
||||||
|
manager.queueForSend("normal", "Normal", Normal)
|
||||||
|
manager.queueForSend("optional", "Optional", Optional) # Should be dropped
|
||||||
|
|
||||||
|
await sleepAsync(chronos.milliseconds(200))
|
||||||
|
|
||||||
|
## Then
|
||||||
|
let quota = manager.getCurrentQuota()
|
||||||
|
let optionalSent = sentMessages.anyIt(it == "Optional")
|
||||||
|
|
||||||
|
check:
|
||||||
|
quota.used == 10
|
||||||
|
not optionalSent # Optional message should not be sent
|
||||||
|
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
asyncTest "quota tracking - accurate total, used, and remaining counts":
|
||||||
|
## Given
|
||||||
|
let capacity = 5
|
||||||
|
var manager = newRateLimitManager(capacity, epochDuration, testSender)
|
||||||
|
await manager.start()
|
||||||
|
|
||||||
|
## When - initially no messages sent
|
||||||
|
let initialQuota = manager.getCurrentQuota()
|
||||||
|
|
||||||
|
## Then - should show full quota available
|
||||||
|
check:
|
||||||
|
initialQuota.total == 5
|
||||||
|
initialQuota.used == 0
|
||||||
|
initialQuota.remaining == 5
|
||||||
|
|
||||||
|
## When - send some messages
|
||||||
|
manager.queueForSend("msg1", "Message1", Normal)
|
||||||
|
manager.queueForSend("msg2", "Message2", Normal)
|
||||||
|
|
||||||
|
await sleepAsync(chronos.milliseconds(200))
|
||||||
|
|
||||||
|
## Then - quota should be updated
|
||||||
|
let afterQuota = manager.getCurrentQuota()
|
||||||
|
check:
|
||||||
|
afterQuota.used == 2
|
||||||
|
afterQuota.remaining == 3
|
||||||
|
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
asyncTest "queue status tracking - by priority levels":
|
||||||
|
## Given
|
||||||
|
let capacity = 2 # Small limit to force queueing
|
||||||
|
var manager = newRateLimitManager(capacity, epochDuration, testSender)
|
||||||
|
await manager.start()
|
||||||
|
|
||||||
|
## When - fill quota and add more messages
|
||||||
|
manager.queueForSend("msg1", "Message1", Critical)
|
||||||
|
manager.queueForSend("msg2", "Message2", Normal)
|
||||||
|
manager.queueForSend("msg3", "Message3", Critical) # Should be queued
|
||||||
|
manager.queueForSend("msg4", "Message4", Normal) # Should be queued
|
||||||
|
manager.queueForSend("msg5", "Message5", Optional) # Should be queued
|
||||||
|
|
||||||
|
await sleepAsync(chronos.milliseconds(100))
|
||||||
|
|
||||||
|
## Then
|
||||||
|
let queueStatus = manager.getQueueStatus()
|
||||||
|
|
||||||
|
check:
|
||||||
|
queueStatus.total >= 2 # At least some messages queued
|
||||||
|
queueStatus.critical >= 1 # Critical messages in queue
|
||||||
|
queueStatus.normal >= 1 # Normal messages in queue
|
||||||
|
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
suite "Generic Type Support":
|
||||||
|
setup:
|
||||||
|
## Given
|
||||||
|
let epochDuration = 60
|
||||||
|
var sentCustomMessages: seq[TestMessage]
|
||||||
|
|
||||||
|
proc testCustomSender(messageId: string, msg: TestMessage): Future[bool] {.async.} =
|
||||||
|
sentCustomMessages.add(msg)
|
||||||
|
await sleepAsync(chronos.milliseconds(10))
|
||||||
|
return true
|
||||||
|
|
||||||
|
asyncTest "custom message types - TestMessage objects":
|
||||||
|
## Given
|
||||||
|
let capacity = 5
|
||||||
|
var manager = newRateLimitManager(capacity, epochDuration, testCustomSender)
|
||||||
|
await manager.start()
|
||||||
|
|
||||||
|
## When
|
||||||
|
let testMsg = TestMessage(content: "Test content", id: 42)
|
||||||
|
manager.queueForSend("custom1", testMsg, Normal)
|
||||||
|
|
||||||
|
await sleepAsync(chronos.milliseconds(200))
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
sentCustomMessages.len == 1
|
||||||
|
sentCustomMessages[0].content == "Test content"
|
||||||
|
sentCustomMessages[0].id == 42
|
||||||
|
|
||||||
|
asyncTest "integer message types":
|
||||||
|
## Given
|
||||||
|
let capacity = 5
|
||||||
|
var sentInts: seq[int]
|
||||||
|
|
||||||
|
proc testIntSender(messageId: string, msg: int): Future[bool] {.async.} =
|
||||||
|
sentInts.add(msg)
|
||||||
|
await sleepAsync(chronos.milliseconds(10))
|
||||||
|
return true
|
||||||
|
|
||||||
|
var manager = newRateLimitManager(capacity, epochDuration, testIntSender)
|
||||||
|
await manager.start()
|
||||||
|
|
||||||
|
## When
|
||||||
|
manager.queueForSend("int1", 42, Critical)
|
||||||
|
manager.queueForSend("int2", 100, Normal)
|
||||||
|
manager.queueForSend("int3", 999, Optional)
|
||||||
|
|
||||||
|
await sleepAsync(chronos.milliseconds(200))
|
||||||
|
await manager.stop()
|
||||||
|
|
||||||
|
## Then
|
||||||
|
check:
|
||||||
|
sentInts.len == 3
|
||||||
|
sentInts[0] == 42 # Critical sent first
|
||||||
|
sentInts[1] == 100 # Normal sent second
|
||||||
|
sentInts[2] == 999 # Optional sent last
|
||||||
Loading…
x
Reference in New Issue
Block a user