From b21ff2d1cdb42750133cf7865052574c26141e17 Mon Sep 17 00:00:00 2001
From: pablo
Date: Mon, 1 Sep 2025 11:50:41 +0300
Subject: [PATCH] feat: expose getQuota and getMessageStatus to query remaining
 quota and message status
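
The SendResult enum is replaced by MessageStatus (moved to store.nim and
extended with DroppedFailedToEnqueue), and every sent, enqueued or dropped
batch now has its per-message status persisted in a new
ratelimit_message_status table. getQuota returns the current token-bucket
budget and capacity; getMessageStatus looks up the persisted status of a
single message by id.

Rough usage sketch (illustrative only: the import paths, the concrete
message type and the already-constructed manager below are assumptions,
not part of this patch):

    import std/options
    import chronos
    import ./ratelimit/ratelimit_manager

    proc example(manager: RateLimitManager[string]) {.async.} =
      # send or enqueue a batch; the returned MessageStatus is also
      # persisted per msgId
      let status = await manager.sendOrEnqueue(
        @[(msgId: "msg-1", msg: "payload")], Priority.Normal
      )
      echo "send result: ", status

      # remaining budget vs. total capacity of the token bucket
      let (budget, budgetCap) = manager.getQuota()
      echo "quota: ", budget, "/", budgetCap

      # look up the persisted status of an individual message later on
      let maybeStatus = await manager.getMessageStatus("msg-1")
      if maybeStatus.isSome():
        echo "msg-1 status: ", maybeStatus.get()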
---
migrations/001_create_ratelimit_state.sql | 6 ++
ratelimit/ratelimit_manager.nim | 78 +++++++++++++++--------
ratelimit/store.nim | 37 +++++++++++
3 files changed, 94 insertions(+), 27 deletions(-)
diff --git a/migrations/001_create_ratelimit_state.sql b/migrations/001_create_ratelimit_state.sql
index 293c6ee..af73ba2 100644
--- a/migrations/001_create_ratelimit_state.sql
+++ b/migrations/001_create_ratelimit_state.sql
@@ -10,4 +10,10 @@ CREATE TABLE IF NOT EXISTS ratelimit_queues (
batch_id INTEGER NOT NULL,
created_at INTEGER NOT NULL,
PRIMARY KEY (queue_type, batch_id, msg_id)
+);
+
+CREATE TABLE IF NOT EXISTS ratelimit_message_status (
+ msg_id TEXT PRIMARY KEY,
+ "status" INTEGER NOT NULL,
+ updated_at INTEGER NOT NULL
);
\ No newline at end of file
diff --git a/ratelimit/ratelimit_manager.nim b/ratelimit/ratelimit_manager.nim
index a30b5be..ef366a4 100644
--- a/ratelimit/ratelimit_manager.nim
+++ b/ratelimit/ratelimit_manager.nim
@@ -1,4 +1,4 @@
-import std/[times, options]
+import std/[times, options, tables, sequtils]
# TODO: move to waku's, chronos' or a lib token_bucket once it's decided where this will live
import ./token_bucket
# import waku/common/rate_limit/token_bucket
@@ -12,12 +12,6 @@ type
AlmostNone
None
- SendResult* {.pure.} = enum
- PassedToSender
- Enqueued
- Dropped
- DroppedBatchTooLarge
-
Priority* {.pure.} = enum
Critical
Normal
@@ -76,31 +70,50 @@ proc getCapacityState[T](
else:
return CapacityState.Normal
+proc updateStatuses[T](
+ manager: RateLimitManager[T],
+ msgs: seq[tuple[msgId: string, msg: T]],
+ status: MessageStatus,
+): Future[MessageStatus] {.async.} =
+ let msgIds: seq[string] = msgs.mapIt(it.msgId)
+ # TODO: log when updating message statuses fails; decide on a logging strategy
+ discard await manager.store.updateMessageStatuses(msgIds, status)
+ return status
+
+proc pushToQueueUpdatingStatus[T](
+ manager: RateLimitManager[T],
+ queueType: QueueType,
+ msgs: seq[tuple[msgId: string, msg: T]],
+): Future[MessageStatus] {.async.} =
+ ## Pushes to the queue and updates the status of the messages
+ let success = await manager.store.pushToQueue(queueType, msgs)
+ let status =
+ if success: MessageStatus.Enqueued else: MessageStatus.DroppedFailedToEnqueue
+ return await manager.updateStatuses(msgs, status)
+
proc passToSender[T](
manager: RateLimitManager[T],
msgs: seq[tuple[msgId: string, msg: T]],
now: Moment,
priority: Priority,
-): Future[SendResult] {.async.} =
+): Future[MessageStatus] {.async.} =
let count = msgs.len
let consumed = manager.bucket.tryConsume(count, now)
if not consumed:
case priority
of Priority.Critical:
- discard await manager.store.pushToQueue(QueueType.Critical, msgs)
- return SendResult.Enqueued
+ return await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
of Priority.Normal:
- discard await manager.store.pushToQueue(QueueType.Normal, msgs)
- return SendResult.Enqueued
+ return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
of Priority.Optional:
- return SendResult.Dropped
+ return await manager.updateStatuses(msgs, MessageStatus.Dropped)
let (budget, budgetCap, lastTimeFull) = manager.bucket.getAvailableCapacity(now)
discard await manager.store.saveBucketState(
BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull)
)
await manager.sender(msgs)
- return SendResult.PassedToSender
+ return await manager.updateStatuses(msgs, MessageStatus.PassedToSender)
proc processCriticalQueue[T](
manager: RateLimitManager[T], now: Moment
@@ -119,7 +132,8 @@ proc processCriticalQueue[T](
discard await manager.passToSender(msgs, now, Priority.Critical)
else:
# Put back to critical queue (add to front not possible, so we add to back and exit)
- discard await manager.store.pushToQueue(QueueType.Critical, msgs)
+ # The returned status can safely be discarded since pushToQueueUpdatingStatus persists it
+ discard await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
break
proc processNormalQueue[T](
@@ -134,10 +148,12 @@ proc processNormalQueue[T](
let msgs = maybeMsgs.get()
let capacityState = manager.getCapacityState(now, msgs.len)
if capacityState == CapacityState.Normal:
+ # The returned status can safely be discarded since passToSender persists it
discard await manager.passToSender(msgs, now, Priority.Normal)
else:
# Put back to normal queue (add to front not possible, so we add to back and exit)
- discard await manager.store.pushToQueue(QueueType.Normal, msgs)
+ # The returned status can safely be discarded since pushToQueueUpdatingStatus persists it
+ discard await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
break
proc sendOrEnqueue*[T](
@@ -145,11 +161,11 @@ proc sendOrEnqueue*[T](
msgs: seq[tuple[msgId: string, msg: T]],
priority: Priority,
now: Moment = Moment.now(),
-): Future[SendResult] {.async.} =
+): Future[MessageStatus] {.async.} =
let (_, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
if msgs.len.float / budgetCap.float >= 0.3:
# drop batch if it's too large to avoid starvation
- return SendResult.DroppedBatchTooLarge
+ return await manager.updateStatuses(msgs, MessageStatus.DroppedBatchTooLarge)
let capacityState = manager.getCapacityState(now, msgs.len)
case capacityState
@@ -160,22 +176,19 @@ proc sendOrEnqueue*[T](
of Priority.Critical:
return await manager.passToSender(msgs, now, priority)
of Priority.Normal:
- discard await manager.store.pushToQueue(QueueType.Normal, msgs)
- return SendResult.Enqueued
+ return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
of Priority.Optional:
- return SendResult.Dropped
+ return await manager.updateStatuses(msgs, MessageStatus.Dropped)
of CapacityState.None:
case priority
of Priority.Critical:
- discard await manager.store.pushToQueue(QueueType.Critical, msgs)
- return SendResult.Enqueued
+ return await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
of Priority.Normal:
- discard await manager.store.pushToQueue(QueueType.Normal, msgs)
- return SendResult.Enqueued
+ return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
of Priority.Optional:
- return SendResult.Dropped
+ return await manager.updateStatuses(msgs, MessageStatus.Dropped)
-proc queueHandleLoop*[T](
+proc queueHandleLoop[T](
manager: RateLimitManager[T],
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(),
@@ -202,6 +215,17 @@ proc stop*[T](manager: RateLimitManager[T]) {.async.} =
if not isNil(manager.pxQueueHandleLoop):
await manager.pxQueueHandleLoop.cancelAndWait()
+proc getQuota*[T](
+ manager: RateLimitManager[T], now: Moment = Moment.now()
+): tuple[budget: int, budgetCap: int] =
+ let (budget, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
+ return (budget, budgetCap)
+
+proc getMessageStatus*[T](
+ manager: RateLimitManager[T], msgId: string
+): Future[Option[MessageStatus]] {.async.} =
+ return await manager.store.getMessageStatus(msgId)
+
func `$`*[T](b: RateLimitManager[T]): string {.inline.} =
if isNil(b):
return "nil"
diff --git a/ratelimit/store.nim b/ratelimit/store.nim
index 42fd152..d41783e 100644
--- a/ratelimit/store.nim
+++ b/ratelimit/store.nim
@@ -20,6 +20,13 @@ type
Critical = "critical"
Normal = "normal"
+ MessageStatus* {.pure.} = enum
+ PassedToSender
+ Enqueued
+ Dropped
+ DroppedBatchTooLarge
+ DroppedFailedToEnqueue
+
const BUCKET_STATE_KEY = "rate_limit_bucket_state"
## TODO find a way to make these procs async
@@ -192,6 +199,36 @@ proc popFromQueue*[T](
except:
return none(seq[tuple[msgId: string, msg: T]])
+proc updateMessageStatuses*[T](
+ store: RateLimitStore[T], messageIds: seq[string], status: MessageStatus
+): Future[bool] {.async.} =
+ try:
+ let now = times.getTime().toUnix()
+ store.db.exec(sql"BEGIN TRANSACTION")
+ for msgId in messageIds:
+ store.db.exec(
+ sql"INSERT INTO ratelimit_message_status (msg_id, status, updated_at) VALUES (?, ?, ?) ON CONFLICT(msg_id) DO UPDATE SET status = excluded.status, updated_at = excluded.updated_at",
+ msgId,
+ ord(status), # store the ordinal so it matches the INTEGER column
+ now,
+ )
+ store.db.exec(sql"COMMIT")
+ return true
+ except:
+ store.db.exec(sql"ROLLBACK")
+ return false
+
+proc getMessageStatus*[T](
+ store: RateLimitStore[T], messageId: string
+): Future[Option[MessageStatus]] {.async.} =
+ let statusStr = store.db.getValue(
+ sql"SELECT status FROM ratelimit_message_status WHERE msg_id = ?", messageId
+ )
+ if statusStr == "":
+ return none(MessageStatus)
+
+ return some(MessageStatus(parseInt(statusStr))) # parseInt from std/strutils
+
proc getQueueLength*[T](store: RateLimitStore[T], queueType: QueueType): int =
case queueType
of QueueType.Critical: