mirror of
https://github.com/logos-messaging/nim-chat-sdk.git
synced 2026-01-03 22:53:12 +00:00
235 lines
8.2 KiB
Nim
235 lines
8.2 KiB
Nim
import std/[times, options, tables, sequtils]
|
|
# TODO: move to waku's, chronos' or a standalone token_bucket lib once it is decided where this will live
|
|
import ./token_bucket
|
|
# import waku/common/rate_limit/token_bucket
|
|
import ./store
|
|
import chronos
|
|
import db_connector/db_sqlite
|
|
|
|
type
  CapacityState {.pure.} = enum ## Bucket headroom left after a prospective send.
    Normal ## At least 30% of the budget cap would remain.
    AlmostNone ## Less than 30% of the budget cap would remain.
    None ## The send would push the budget below zero.

  Priority* {.pure.} = enum ## Caller-assigned importance of a batch.
    Critical ## Sent while any budget is left; queued otherwise.
    Normal ## Sent only with normal headroom; queued otherwise.
    Optional ## Sent only with normal headroom; dropped otherwise.

  # A message payload paired with the identifier used to track its status.
  MsgIdMsg[T] = tuple[msgId: string, msg: T]

  # Callback that actually delivers a batch once the rate limiter lets it pass.
  MessageSender*[T] = proc(msgs: seq[MsgIdMsg[T]]) {.async.}

  RateLimitManager*[T] = ref object
    ## Gatekeeper that forwards, queues or drops message batches depending on
    ## the remaining token-bucket budget and the batch priority.
    store: RateLimitStore[T] # persists bucket state, queues and statuses
    bucket: TokenBucket # in-memory budget accounting
    sender: MessageSender[T] # invoked for batches that may be sent
    sleepDuration: chronos.Duration # pause between queue-processing rounds
    pxQueueHandleLoop: Future[void] # background loop; nil until `start`
|
|
|
|
proc new*[T](
    M: type[RateLimitManager[T]],
    store: RateLimitStore[T],
    sender: MessageSender[T],
    capacity: int = 100,
    duration: chronos.Duration = chronos.minutes(10),
    sleepDuration: chronos.Duration = chronos.milliseconds(1000),
): Future[RateLimitManager[T]] {.async.} =
  ## Builds a manager whose token bucket is restored from `store`.
  ##
  ## When the store holds no bucket state yet, a full bucket of `capacity`
  ## tokens is created and persisted first. `duration` is handed to the token
  ## bucket; `sleepDuration` is the pause between queue-processing rounds.
  var maybeState = await store.loadBucketState()
  if not maybeState.isSome():
    # First run: start with a full budget and persist it right away.
    let fresh =
      BucketState(budget: capacity, budgetCap: capacity, lastTimeFull: Moment.now())
    maybeState = some(fresh)
    discard await store.saveBucketState(maybeState.get())

  let state = maybeState.get()
  let restoredBucket = TokenBucket.new(
    state.budgetCap, duration, ReplenishMode.Strict, state.budget, state.lastTimeFull
  )

  return RateLimitManager[T](
    store: store, bucket: restoredBucket, sender: sender, sleepDuration: sleepDuration
  )
|
|
|
|
proc getCapacityState[T](
    manager: RateLimitManager[T], now: Moment, count: int = 1
): CapacityState =
  ## Classifies the headroom that would remain after consuming `count` tokens
  ## at `now`, as a fraction of the bucket's budget cap:
  ## below zero -> `None`, below 0.3 -> `AlmostNone`, otherwise `Normal`.
  let (available, cap, _) = manager.bucket.getAvailableCapacity(now)
  let remainingRatio = float(available - count) / float(cap)
  result =
    if remainingRatio < 0.0:
      CapacityState.None
    elif remainingRatio < 0.3:
      CapacityState.AlmostNone
    else:
      CapacityState.Normal
|
|
|
|
proc updateStatuses[T](
    manager: RateLimitManager[T],
    msgs: seq[tuple[msgId: string, msg: T]],
    status: MessageStatus,
): Future[MessageStatus] {.async.} =
  ## Records `status` in the store for every message of the batch and hands
  ## the same `status` back so callers can return it directly.
  # TODO: decide on a logging strategy and report failed status updates;
  # for now the store's result is ignored.
  discard await manager.store.updateMessageStatuses(msgs.mapIt(it.msgId), status)
  return status
|
|
|
|
proc pushToQueueUpdatingStatus[T](
    manager: RateLimitManager[T],
    queueType: QueueType,
    msgs: seq[tuple[msgId: string, msg: T]],
): Future[MessageStatus] {.async.} =
  ## Appends the batch to the given queue and persists the resulting status:
  ## `Enqueued` when the store accepted it, `DroppedFailedToEnqueue` otherwise.
  let enqueued = await manager.store.pushToQueue(queueType, msgs)
  if enqueued:
    return await manager.updateStatuses(msgs, MessageStatus.Enqueued)
  else:
    return await manager.updateStatuses(msgs, MessageStatus.DroppedFailedToEnqueue)
|
|
|
|
proc passToSender[T](
    manager: RateLimitManager[T],
    msgs: seq[tuple[msgId: string, msg: T]],
    now: Moment,
    priority: Priority,
): Future[MessageStatus] {.async.} =
  ## Tries to consume one token per message and deliver the batch.
  ##
  ## On success the new bucket state is persisted, the sender callback is
  ## awaited and the batch is marked `PassedToSender`. When the bucket refuses
  ## the consumption, critical and normal batches are queued and optional
  ## batches are marked `Dropped`.
  if manager.bucket.tryConsume(msgs.len, now):
    # Persist the budget before sending so the consumption is recorded.
    let (budget, budgetCap, lastTimeFull) = manager.bucket.getAvailableCapacity(now)
    let snapshot =
      BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull)
    discard await manager.store.saveBucketState(snapshot)
    await manager.sender(msgs)
    return await manager.updateStatuses(msgs, MessageStatus.PassedToSender)

  # Not enough tokens: fall back according to the batch priority.
  case priority
  of Priority.Critical:
    return await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
  of Priority.Normal:
    return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
  of Priority.Optional:
    return await manager.updateStatuses(msgs, MessageStatus.Dropped)
|
|
|
|
proc processCriticalQueue[T](
    manager: RateLimitManager[T], now: Moment
): Future[void] {.async.} =
  ## Drains the critical queue for as long as the bucket has any budget left
  ## for the next batch (capacity state `Normal` or `AlmostNone`).
  while manager.store.getQueueLength(QueueType.Critical) > 0:
    # The store offers no peek, so the batch is popped and re-queued if it
    # cannot be sent.
    let popped = await manager.store.popFromQueue(QueueType.Critical)
    if popped.isNone():
      break

    let batch = popped.get()
    if manager.getCapacityState(now, batch.len) == CapacityState.None:
      # Out of budget: put the batch back (only appending is possible) and
      # stop. The returned status is already persisted, so it is discarded.
      discard await manager.pushToQueueUpdatingStatus(QueueType.Critical, batch)
      break

    discard await manager.passToSender(batch, now, Priority.Critical)
|
|
|
|
proc processNormalQueue[T](
    manager: RateLimitManager[T], now: Moment
): Future[void] {.async.} =
  ## Drains the normal queue while sending the next batch would still leave
  ## the bucket in the `Normal` capacity state.
  while manager.store.getQueueLength(QueueType.Normal) > 0:
    # The store offers no peek, so the batch is popped and re-queued if it
    # cannot be sent.
    let popped = await manager.store.popFromQueue(QueueType.Normal)
    if popped.isNone():
      break

    let batch = popped.get()
    if manager.getCapacityState(now, batch.len) != CapacityState.Normal:
      # Headroom too low: put the batch back (only appending is possible) and
      # stop. The returned status is already persisted, so it is discarded.
      discard await manager.pushToQueueUpdatingStatus(QueueType.Normal, batch)
      break

    # The returned status is already persisted, so it is discarded.
    discard await manager.passToSender(batch, now, Priority.Normal)
|
|
|
|
proc sendOrEnqueue*[T](
    manager: RateLimitManager[T],
    msgs: seq[tuple[msgId: string, msg: T]],
    priority: Priority,
    now: Moment = Moment.now(),
): Future[MessageStatus] {.async.} =
  ## Entry point for outgoing batches: sends, queues or drops `msgs` and
  ## returns the status that was recorded for them.
  ##
  ## * Batches of 30% of the budget cap or more are rejected with
  ##   `DroppedBatchTooLarge`.
  ## * With `Normal` headroom every priority is passed to the sender.
  ## * With `AlmostNone` headroom only critical batches are sent.
  ## * Otherwise critical and normal batches are queued and optional batches
  ##   are marked `Dropped`.
  let (_, cap, _) = manager.bucket.getAvailableCapacity(now)
  let batchShare = msgs.len.float / cap.float
  if batchShare >= 0.3:
    # A single oversized batch would starve everything else.
    return await manager.updateStatuses(msgs, MessageStatus.DroppedBatchTooLarge)

  let headroom = manager.getCapacityState(now, msgs.len)
  if headroom == CapacityState.Normal:
    return await manager.passToSender(msgs, now, priority)

  # From here on headroom is AlmostNone or None.
  case priority
  of Priority.Critical:
    if headroom == CapacityState.AlmostNone:
      return await manager.passToSender(msgs, now, priority)
    return await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
  of Priority.Normal:
    return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
  of Priority.Optional:
    return await manager.updateStatuses(msgs, MessageStatus.Dropped)
|
|
|
|
proc queueHandleLoop[T](
    manager: RateLimitManager[T],
    nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
      Moment.now(),
) {.async.} =
  ## Background loop that periodically retries queued batches.
  ##
  ## Each round takes one timestamp from `nowProvider`, processes the critical
  ## queue and then the normal queue, and sleeps for `manager.sleepDuration`.
  ## The loop only ends when its future is cancelled.
  while true:
    try:
      let now = nowProvider()
      await manager.processCriticalQueue(now)
      await manager.processNormalQueue(now)
    except CancelledError as e:
      # Cancellation must propagate, otherwise `stop` could never end the
      # loop while a queue is being processed.
      raise e
    except CatchableError as e:
      # Recoverable failure: report it and retry on the next round. Defects
      # are deliberately not caught.
      echo "Error in queue processing: ", e.msg

    # configurable sleep duration for processing queued messages
    await sleepAsync(manager.sleepDuration)
|
|
|
|
proc start*[T](
    manager: RateLimitManager[T],
    nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
      Moment.now(),
) {.async.} =
  ## Launches the background queue-processing loop and remembers its future
  ## so that `stop` can cancel it. `nowProvider` supplies the timestamp used
  ## for each processing round.
  ##
  ## Calling `start` while a loop is still running is a no-op. Replacing the
  ## stored future would orphan the running loop, which could then no longer
  ## be cancelled and would process the queues concurrently with the new one.
  let running = manager.pxQueueHandleLoop
  if not isNil(running) and not running.finished():
    return

  manager.pxQueueHandleLoop = queueHandleLoop(manager, nowProvider)
|
|
|
|
proc stop*[T](manager: RateLimitManager[T]) {.async.} =
  ## Cancels the background queue-processing loop, if one was ever started,
  ## and waits for the cancellation to complete.
  let loopFut = manager.pxQueueHandleLoop
  if isNil(loopFut):
    return
  await loopFut.cancelAndWait()
|
|
|
|
proc getQuota*[T](
    manager: RateLimitManager[T], now: Moment = Moment.now()
): tuple[budget: int, budgetCap: int] =
  ## Reports the bucket's currently available budget and its cap at `now`.
  let (available, cap, _) = manager.bucket.getAvailableCapacity(now)
  result = (budget: available, budgetCap: cap)
|
|
|
|
proc getMessageStatus*[T](
    manager: RateLimitManager[T], msgId: string
): Future[Option[MessageStatus]] {.async.} =
  ## Looks up the status recorded in the store for `msgId` and returns
  ## whatever the store reports.
  let recorded = await manager.store.getMessageStatus(msgId)
  return recorded
|
|
|
|
func `$`*[T](b: RateLimitManager[T]): string {.inline.} =
  ## Human-readable summary: the lengths of the critical and normal queues,
  ## or "nil" for a nil manager.
  if isNil(b):
    result = "nil"
  else:
    result = "RateLimitManager(critical: "
    result.add $b.store.getQueueLength(QueueType.Critical)
    result.add ", normal: "
    result.add $b.store.getQueueLength(QueueType.Normal)
    result.add ")"
|