From b21ff2d1cdb42750133cf7865052574c26141e17 Mon Sep 17 00:00:00 2001
From: pablo
Date: Mon, 1 Sep 2025 11:50:41 +0300
Subject: [PATCH 1/2] feat: expose getQuota and getMessageStatus to query
 remaining quota and message status
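
A minimal usage sketch of the two new entry points (illustrative only: it
assumes an already-constructed RateLimitManager[string] named `manager`, a
message id "msg1" previously handed to sendOrEnqueue, and an enclosing async
context):

    let (budget, budgetCap) = manager.getQuota()
    let status = await manager.getMessageStatus("msg1")
    if status.isSome() and status.get() == MessageStatus.Enqueued:
      echo "msg1 still queued; ", budget, "/", budgetCap, " tokens available"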
---
migrations/001_create_ratelimit_state.sql | 6 ++
ratelimit/ratelimit_manager.nim | 78 +++++++++++++++--------
ratelimit/store.nim | 37 +++++++++++
3 files changed, 94 insertions(+), 27 deletions(-)
diff --git a/migrations/001_create_ratelimit_state.sql b/migrations/001_create_ratelimit_state.sql
index 293c6ee..af73ba2 100644
--- a/migrations/001_create_ratelimit_state.sql
+++ b/migrations/001_create_ratelimit_state.sql
@@ -10,4 +10,10 @@ CREATE TABLE IF NOT EXISTS ratelimit_queues (
batch_id INTEGER NOT NULL,
created_at INTEGER NOT NULL,
PRIMARY KEY (queue_type, batch_id, msg_id)
+);
+
+CREATE TABLE IF NOT EXISTS ratelimit_message_status (
+ msg_id TEXT PRIMARY KEY,
+ "status" INTEGER NOT NULL,
+ updated_at INTEGER NOT NULL
);
\ No newline at end of file
diff --git a/ratelimit/ratelimit_manager.nim b/ratelimit/ratelimit_manager.nim
index a30b5be..ef366a4 100644
--- a/ratelimit/ratelimit_manager.nim
+++ b/ratelimit/ratelimit_manager.nim
@@ -1,4 +1,4 @@
-import std/[times, options]
+import std/[times, options, tables, sequtils]
# TODO: move to waku's, chronos' or a lib token_bucket once decided where this will live
import ./token_bucket
# import waku/common/rate_limit/token_bucket
@@ -12,12 +12,6 @@ type
AlmostNone
None
- SendResult* {.pure.} = enum
- PassedToSender
- Enqueued
- Dropped
- DroppedBatchTooLarge
-
Priority* {.pure.} = enum
Critical
Normal
@@ -76,31 +70,50 @@ proc getCapacityState[T](
else:
return CapacityState.Normal
+proc updateStatuses[T](
+ manager: RateLimitManager[T],
+ msgs: seq[tuple[msgId: string, msg: T]],
+ status: MessageStatus,
+): Future[MessageStatus] {.async.} =
+ let msgIds: seq[string] = msgs.mapIt(it.msgId)
+ # TODO: log failures to update message statuses, if they occur (a logging strategy is still to be decided)
+ discard await manager.store.updateMessageStatuses(msgIds, status)
+ return status
+
+proc pushToQueueUpdatingStatus[T](
+ manager: RateLimitManager[T],
+ queueType: QueueType,
+ msgs: seq[tuple[msgId: string, msg: T]],
+): Future[MessageStatus] {.async.} =
+ ## Pushes to the queue and updates the status of the messages
+ let success = await manager.store.pushToQueue(queueType, msgs)
+ let status =
+ if success: MessageStatus.Enqueued else: MessageStatus.DroppedFailedToEnqueue
+ return await manager.updateStatuses(msgs, status)
+
proc passToSender[T](
manager: RateLimitManager[T],
msgs: seq[tuple[msgId: string, msg: T]],
now: Moment,
priority: Priority,
-): Future[SendResult] {.async.} =
+): Future[MessageStatus] {.async.} =
let count = msgs.len
let consumed = manager.bucket.tryConsume(count, now)
if not consumed:
case priority
of Priority.Critical:
- discard await manager.store.pushToQueue(QueueType.Critical, msgs)
- return SendResult.Enqueued
+ return await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
of Priority.Normal:
- discard await manager.store.pushToQueue(QueueType.Normal, msgs)
- return SendResult.Enqueued
+ return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
of Priority.Optional:
- return SendResult.Dropped
+ return await manager.updateStatuses(msgs, MessageStatus.Dropped)
let (budget, budgetCap, lastTimeFull) = manager.bucket.getAvailableCapacity(now)
discard await manager.store.saveBucketState(
BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull)
)
await manager.sender(msgs)
- return SendResult.PassedToSender
+ return await manager.updateStatuses(msgs, MessageStatus.PassedToSender)
proc processCriticalQueue[T](
manager: RateLimitManager[T], now: Moment
@@ -119,7 +132,8 @@ proc processCriticalQueue[T](
discard await manager.passToSender(msgs, now, Priority.Critical)
else:
# Put back to critical queue (add to front not possible, so we add to back and exit)
- discard await manager.store.pushToQueue(QueueType.Critical, msgs)
+ # The returned status can be safely discarded since it is persisted
+ discard await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
break
proc processNormalQueue[T](
@@ -134,10 +148,12 @@ proc processNormalQueue[T](
let msgs = maybeMsgs.get()
let capacityState = manager.getCapacityState(now, msgs.len)
if capacityState == CapacityState.Normal:
+ # The returned status can be safely discarded since it is persisted
discard await manager.passToSender(msgs, now, Priority.Normal)
else:
# Put back to normal queue (add to front not possible, so we add to back and exit)
- discard await manager.store.pushToQueue(QueueType.Normal, msgs)
+ # The returned status can be safely discarded since it is persisted
+ discard await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
break
proc sendOrEnqueue*[T](
@@ -145,11 +161,11 @@ proc sendOrEnqueue*[T](
msgs: seq[tuple[msgId: string, msg: T]],
priority: Priority,
now: Moment = Moment.now(),
-): Future[SendResult] {.async.} =
+): Future[MessageStatus] {.async.} =
let (_, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
if msgs.len.float / budgetCap.float >= 0.3:
# drop batch if it's too large to avoid starvation
- return SendResult.DroppedBatchTooLarge
+ return await manager.updateStatuses(msgs, MessageStatus.DroppedBatchTooLarge)
let capacityState = manager.getCapacityState(now, msgs.len)
case capacityState
@@ -160,22 +176,19 @@ proc sendOrEnqueue*[T](
of Priority.Critical:
return await manager.passToSender(msgs, now, priority)
of Priority.Normal:
- discard await manager.store.pushToQueue(QueueType.Normal, msgs)
- return SendResult.Enqueued
+ return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
of Priority.Optional:
- return SendResult.Dropped
+ return MessageStatus.Dropped
of CapacityState.None:
case priority
of Priority.Critical:
- discard await manager.store.pushToQueue(QueueType.Critical, msgs)
- return SendResult.Enqueued
+ return await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
of Priority.Normal:
- discard await manager.store.pushToQueue(QueueType.Normal, msgs)
- return SendResult.Enqueued
+ return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
of Priority.Optional:
- return SendResult.Dropped
+ return await manager.updateStatuses(msgs, MessageStatus.Dropped)
-proc queueHandleLoop*[T](
+proc queueHandleLoop[T](
manager: RateLimitManager[T],
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(),
@@ -202,6 +215,17 @@ proc stop*[T](manager: RateLimitManager[T]) {.async.} =
if not isNil(manager.pxQueueHandleLoop):
await manager.pxQueueHandleLoop.cancelAndWait()
+proc getQuota*[T](
+ manager: RateLimitManager[T], now: Moment = Moment.now()
+): tuple[budget: int, budgetCap: int] =
+ let (budget, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
+ return (budget, budgetCap)
+
+proc getMessageStatus*[T](
+ manager: RateLimitManager[T], msgId: string
+): Future[Option[MessageStatus]] {.async.} =
+ return await manager.store.getMessageStatus(msgId)
+
func `$`*[T](b: RateLimitManager[T]): string {.inline.} =
if isNil(b):
return "nil"
diff --git a/ratelimit/store.nim b/ratelimit/store.nim
index 42fd152..d41783e 100644
--- a/ratelimit/store.nim
+++ b/ratelimit/store.nim
@@ -20,6 +20,13 @@ type
Critical = "critical"
Normal = "normal"
+ MessageStatus* {.pure.} = enum
+ PassedToSender
+ Enqueued
+ Dropped
+ DroppedBatchTooLarge
+ DroppedFailedToEnqueue
+
const BUCKET_STATE_KEY = "rate_limit_bucket_state"
## TODO find a way to make these procs async
@@ -192,6 +199,36 @@ proc popFromQueue*[T](
except:
return none(seq[tuple[msgId: string, msg: T]])
+proc updateMessageStatuses*[T](
+ store: RateLimitStore[T], messageIds: seq[string], status: MessageStatus
+): Future[bool] {.async.} =
+ try:
+ let now = times.getTime().toUnix()
+ store.db.exec(sql"BEGIN TRANSACTION")
+ for msgId in messageIds:
+ store.db.exec(
+ sql"INSERT INTO ratelimit_message_status (msg_id, status, updated_at) VALUES (?, ?, ?) ON CONFLICT(msg_id) DO UPDATE SET status = excluded.status, updated_at = excluded.updated_at",
+ msgId,
+ status,
+ now,
+ )
+ store.db.exec(sql"COMMIT")
+ return true
+ except:
+ store.db.exec(sql"ROLLBACK")
+ return false
+
+proc getMessageStatus*[T](
+ store: RateLimitStore[T], messageId: string
+): Future[Option[MessageStatus]] {.async.} =
+ let statusStr = store.db.getValue(
+ sql"SELECT status FROM ratelimit_message_status WHERE msg_id = ?", messageId
+ )
+ if statusStr == "":
+ return none(MessageStatus)
+
+ return some(MessageStatus.parse(statusStr))
+
proc getQueueLength*[T](store: RateLimitStore[T], queueType: QueueType): int =
case queueType
of QueueType.Critical:
From f71cf7b701961081595be78604710e123b29d3f1 Mon Sep 17 00:00:00 2001
From: pablo
Date: Mon, 1 Sep 2025 12:07:39 +0300
Subject: [PATCH 2/2] fix: store message status as text and add tests for
 getMessageStatus
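
The status column now stores the enum's name as TEXT, so reading it back is a
plain parseEnum call. A small round-trip sketch (illustrative only, assuming
MessageStatus from ratelimit/store.nim is in scope):

    import std/strutils

    let stored = $MessageStatus.Enqueued          # written to the TEXT column
    let parsed = parseEnum[MessageStatus](stored) # read back by getMessageStatus
    assert parsed == MessageStatus.Enqueued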
---
migrations/001_create_ratelimit_state.sql | 2 +-
ratelimit/ratelimit_manager.nim | 2 +-
ratelimit/store.nim | 6 +--
tests/test_ratelimit_manager.nim | 54 +++++++++++++++++++++++
4 files changed, 59 insertions(+), 5 deletions(-)
diff --git a/migrations/001_create_ratelimit_state.sql b/migrations/001_create_ratelimit_state.sql
index af73ba2..c761148 100644
--- a/migrations/001_create_ratelimit_state.sql
+++ b/migrations/001_create_ratelimit_state.sql
@@ -14,6 +14,6 @@ CREATE TABLE IF NOT EXISTS ratelimit_queues (
CREATE TABLE IF NOT EXISTS ratelimit_message_status (
msg_id TEXT PRIMARY KEY,
- "status" INTEGER NOT NULL,
+ "status" TEXT NOT NULL,
updated_at INTEGER NOT NULL
);
\ No newline at end of file
diff --git a/ratelimit/ratelimit_manager.nim b/ratelimit/ratelimit_manager.nim
index ef366a4..4b51e66 100644
--- a/ratelimit/ratelimit_manager.nim
+++ b/ratelimit/ratelimit_manager.nim
@@ -178,7 +178,7 @@ proc sendOrEnqueue*[T](
of Priority.Normal:
return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
of Priority.Optional:
- return MessageStatus.Dropped
+ return await manager.updateStatuses(msgs, MessageStatus.Dropped)
of CapacityState.None:
case priority
of Priority.Critical:
diff --git a/ratelimit/store.nim b/ratelimit/store.nim
index d41783e..3cde84e 100644
--- a/ratelimit/store.nim
+++ b/ratelimit/store.nim
@@ -11,7 +11,7 @@ type
normalLength: int
nextBatchId: int
- BucketState* {.pure} = object
+ BucketState* {.pure.} = object
budget*: int
budgetCap*: int
lastTimeFull*: Moment
@@ -31,7 +31,7 @@ const BUCKET_STATE_KEY = "rate_limit_bucket_state"
## TODO find a way to make these procs async
-proc new*[T](M: type[RateLimitStore[T]], db: DbConn): Future[M] {.async} =
+proc new*[T](M: type[RateLimitStore[T]], db: DbConn): Future[M] {.async.} =
result = M(db: db, criticalLength: 0, normalLength: 0, nextBatchId: 1)
# Initialize cached lengths from database
@@ -227,7 +227,7 @@ proc getMessageStatus*[T](
if statusStr == "":
return none(MessageStatus)
- return some(MessageStatus.parse(statusStr))
+ return some(parseEnum[MessageStatus](statusStr))
proc getQueueLength*[T](store: RateLimitStore[T], queueType: QueueType): int =
case queueType
diff --git a/tests/test_ratelimit_manager.nim b/tests/test_ratelimit_manager.nim
index 50a2000..e239746 100644
--- a/tests/test_ratelimit_manager.nim
+++ b/tests/test_ratelimit_manager.nim
@@ -50,6 +50,12 @@ suite "Queue RateLimitManager":
sentMessages[0].msgId == "msg1"
sentMessages[0].msg == "Hello World"
+ # Check message status
+ let status = await manager.getMessageStatus("msg1")
+ check:
+ status.isSome()
+ status.get() == MessageStatus.PassedToSender
+
asyncTest "sendOrEnqueue - multiple messages":
## Given
let store = await RateLimitStore[string].new(db)
@@ -71,6 +77,15 @@ suite "Queue RateLimitManager":
sentMessages[1].msgId == "msg2"
sentMessages[1].msg == "Second"
+ # Check message statuses
+ let status1 = await manager.getMessageStatus("msg1")
+ let status2 = await manager.getMessageStatus("msg2")
+ check:
+ status1.isSome()
+ status1.get() == MessageStatus.PassedToSender
+ status2.isSome()
+ status2.get() == MessageStatus.PassedToSender
+
asyncTest "start and stop - drop large batch":
## Given
let store = await RateLimitStore[string].new(db)
@@ -132,6 +147,18 @@ suite "Queue RateLimitManager":
r10 == PassedToSender
r11 == Enqueued
+ # Check message statuses
+ let status1 = await manager.getMessageStatus("msg1")
+ let status10 = await manager.getMessageStatus("msg10")
+ let status11 = await manager.getMessageStatus("msg11")
+ check:
+ status1.isSome()
+ status1.get() == MessageStatus.PassedToSender
+ status10.isSome()
+ status10.get() == MessageStatus.PassedToSender
+ status11.isSome()
+ status11.get() == MessageStatus.Enqueued
+
asyncTest "enqueue - enqueue normal on 70% capacity":
## Given
let store = await RateLimitStore[string].new(db)
@@ -177,6 +204,21 @@ suite "Queue RateLimitManager":
r11 == PassedToSender
r12 == Dropped
+ # Check message statuses for different outcomes
+ let statusSent = await manager.getMessageStatus("msg1")
+ let statusEnqueued = await manager.getMessageStatus("msg8")
+ let statusCritical = await manager.getMessageStatus("msg11")
+ let statusDropped = await manager.getMessageStatus("msg12")
+ check:
+ statusSent.isSome()
+ statusSent.get() == MessageStatus.PassedToSender
+ statusEnqueued.isSome()
+ statusEnqueued.get() == MessageStatus.Enqueued
+ statusCritical.isSome()
+ statusCritical.get() == MessageStatus.PassedToSender
+ statusDropped.isSome()
+ statusDropped.get() == MessageStatus.Dropped
+
asyncTest "enqueue - process queued messages":
## Given
let store = await RateLimitStore[string].new(db)
@@ -256,3 +298,15 @@ suite "Queue RateLimitManager":
sentMessages[11].msgId == "8"
sentMessages[12].msgId == "9"
sentMessages[13].msgId == "10"
+
+ # Check message statuses after queue processing
+ let statusProcessed8 = await manager.getMessageStatus("8")
+ let statusProcessed15 = await manager.getMessageStatus("15")
+ let statusDropped12 = await manager.getMessageStatus("12")
+ check:
+ statusProcessed8.isSome()
+ statusProcessed8.get() == MessageStatus.PassedToSender
+ statusProcessed15.isSome()
+ statusProcessed15.get() == MessageStatus.PassedToSender
+ statusDropped12.isSome()
+ statusDropped12.get() == MessageStatus.Dropped