Merge pull request #9 from waku-org/feat/expose-data

Feat/expose data
This commit is contained in:
Pablo Lopez 2025-09-02 07:37:23 +03:00 committed by GitHub
commit ef6add52be
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 150 additions and 29 deletions

View File

@ -10,4 +10,10 @@ CREATE TABLE IF NOT EXISTS ratelimit_queues (
batch_id INTEGER NOT NULL,
created_at INTEGER NOT NULL,
PRIMARY KEY (queue_type, batch_id, msg_id)
);
-- Latest persisted delivery status per message, one row per msg_id.
-- Upserted in a transaction by the store's updateMessageStatuses proc;
-- status stores a MessageStatus enum member name as text (read back via
-- parseEnum in getMessageStatus); updated_at is a unix timestamp.
CREATE TABLE IF NOT EXISTS ratelimit_message_status (
msg_id TEXT PRIMARY KEY,
"status" TEXT NOT NULL,
updated_at INTEGER NOT NULL
);

View File

@ -1,4 +1,4 @@
import std/[times, options]
import std/[times, options, tables, sequtils]
# TODO: move to waku's, chronos' or a lib token_bucket once it is decided where this will live
import ./token_bucket
# import waku/common/rate_limit/token_bucket
@ -12,12 +12,6 @@ type
AlmostNone
None
SendResult* {.pure.} = enum
PassedToSender
Enqueued
Dropped
DroppedBatchTooLarge
Priority* {.pure.} = enum
Critical
Normal
@ -76,31 +70,50 @@ proc getCapacityState[T](
else:
return CapacityState.Normal
proc updateStatuses[T](
    manager: RateLimitManager[T],
    msgs: seq[tuple[msgId: string, msg: T]],
    status: MessageStatus,
): Future[MessageStatus] {.async.} =
  ## Persists `status` for every message in `msgs` and echoes the status
  ## back to the caller so call sites can `return` it directly.
  var ids = newSeqOfCap[string](msgs.len)
  for entry in msgs:
    ids.add entry.msgId
  # TODO log failed to update message statuses (if it occurs) think of a logging strategy
  discard await manager.store.updateMessageStatuses(ids, status)
  return status
proc pushToQueueUpdatingStatus[T](
    manager: RateLimitManager[T],
    queueType: QueueType,
    msgs: seq[tuple[msgId: string, msg: T]],
): Future[MessageStatus] {.async.} =
  ## Enqueues `msgs` on `queueType`, then records and returns the resulting
  ## status: `Enqueued` when the store accepted the batch,
  ## `DroppedFailedToEnqueue` otherwise.
  let pushed = await manager.store.pushToQueue(queueType, msgs)
  var outcome = MessageStatus.DroppedFailedToEnqueue
  if pushed:
    outcome = MessageStatus.Enqueued
  return await manager.updateStatuses(msgs, outcome)
# NOTE(review): this region is a rendered diff — pre-change and post-change
# lines are interleaved below (SendResult lines are the old version,
# MessageStatus lines the new). Reconcile before treating this as source.
proc passToSender[T](
manager: RateLimitManager[T],
msgs: seq[tuple[msgId: string, msg: T]],
now: Moment,
priority: Priority,
# old signature (removed): returned SendResult
): Future[SendResult] {.async.} =
# new signature (added): returns the persisted MessageStatus instead
): Future[MessageStatus] {.async.} =
let count = msgs.len
# Try to take `count` tokens from the rate-limit bucket at time `now`.
let consumed = manager.bucket.tryConsume(count, now)
if not consumed:
# Not enough budget: route by priority instead of sending.
case priority
of Priority.Critical:
# old (removed): enqueue without persisting a per-message status
discard await manager.store.pushToQueue(QueueType.Critical, msgs)
return SendResult.Enqueued
# new (added): enqueue AND persist Enqueued/DroppedFailedToEnqueue
return await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
of Priority.Normal:
# old (removed):
discard await manager.store.pushToQueue(QueueType.Normal, msgs)
return SendResult.Enqueued
# new (added):
return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
of Priority.Optional:
# old (removed): drop silently
return SendResult.Dropped
# new (added): drop and persist Dropped status
return await manager.updateStatuses(msgs, MessageStatus.Dropped)
# Budget available: snapshot bucket state so it survives restarts.
let (budget, budgetCap, lastTimeFull) = manager.bucket.getAvailableCapacity(now)
discard await manager.store.saveBucketState(
BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull)
)
# Hand the batch to the user-supplied sender callback.
await manager.sender(msgs)
# old (removed):
return SendResult.PassedToSender
# new (added): persist PassedToSender for each message
return await manager.updateStatuses(msgs, MessageStatus.PassedToSender)
# NOTE(review): diff fragment — the hunk header below elides most of the
# proc body (pop-from-queue / capacity-check lines are not visible here).
proc processCriticalQueue[T](
manager: RateLimitManager[T], now: Moment
@ -119,7 +132,8 @@ proc processCriticalQueue[T](
# Capacity available: forward the popped batch with critical priority.
discard await manager.passToSender(msgs, now, Priority.Critical)
else:
# Put back to critical queue (add to front not possible, so we add to back and exit)
# old (removed): re-push without persisting a status
discard await manager.store.pushToQueue(QueueType.Critical, msgs)
# I can safely discard the return since the status will be persisted
discard await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
break
# NOTE(review): diff fragment — the hunk header below elides the start of
# the body (signature tail and the pop from the normal queue are not shown).
proc processNormalQueue[T](
@ -134,10 +148,12 @@ proc processNormalQueue[T](
let msgs = maybeMsgs.get()
# Re-check bucket capacity for this batch size before sending.
let capacityState = manager.getCapacityState(now, msgs.len)
if capacityState == CapacityState.Normal:
# I can safely discard the return since the status will be persisted
discard await manager.passToSender(msgs, now, Priority.Normal)
else:
# Put back to normal queue (add to front not possible, so we add to back and exit)
# old (removed): re-push without persisting a status
discard await manager.store.pushToQueue(QueueType.Normal, msgs)
# I can safely discard the return since the status will be persisted
discard await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
break
# NOTE(review): diff fragment — hunk headers elide the `manager` parameter
# line and the CapacityState.Normal / AlmostNone branch boundaries; old
# (SendResult) and new (MessageStatus) lines are interleaved below.
proc sendOrEnqueue*[T](
@ -145,11 +161,11 @@ proc sendOrEnqueue*[T](
msgs: seq[tuple[msgId: string, msg: T]],
priority: Priority,
now: Moment = Moment.now(),
# old signature (removed):
): Future[SendResult] {.async.} =
# new signature (added): callers now receive the persisted MessageStatus
): Future[MessageStatus] {.async.} =
let (_, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
# Reject batches that are >= 30% of the bucket cap up front.
if msgs.len.float / budgetCap.float >= 0.3:
# drop batch if it's too large to avoid starvation
# old (removed):
return SendResult.DroppedBatchTooLarge
# new (added): also persists the DroppedBatchTooLarge status
return await manager.updateStatuses(msgs, MessageStatus.DroppedBatchTooLarge)
let capacityState = manager.getCapacityState(now, msgs.len)
case capacityState
@ -160,22 +176,19 @@ proc sendOrEnqueue*[T](
of Priority.Critical:
# Critical traffic may still attempt a direct send in this state.
return await manager.passToSender(msgs, now, priority)
of Priority.Normal:
# old (removed):
discard await manager.store.pushToQueue(QueueType.Normal, msgs)
return SendResult.Enqueued
# new (added):
return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
of Priority.Optional:
# old (removed):
return SendResult.Dropped
# new (added):
return await manager.updateStatuses(msgs, MessageStatus.Dropped)
of CapacityState.None:
# No budget at all: everything is queued or dropped by priority.
case priority
of Priority.Critical:
# old (removed):
discard await manager.store.pushToQueue(QueueType.Critical, msgs)
return SendResult.Enqueued
# new (added):
return await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs)
of Priority.Normal:
# old (removed):
discard await manager.store.pushToQueue(QueueType.Normal, msgs)
return SendResult.Enqueued
# new (added):
return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs)
of Priority.Optional:
# old (removed):
return SendResult.Dropped
# new (added):
return await manager.updateStatuses(msgs, MessageStatus.Dropped)
# NOTE(review): diff pair — the export marker `*` was removed in this
# change, narrowing queueHandleLoop from public to module-private. Body is
# elided by the surrounding hunks.
proc queueHandleLoop*[T](
proc queueHandleLoop[T](
manager: RateLimitManager[T],
# Injectable clock (defaults to Moment.now()) — presumably for tests.
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(),
@ -202,6 +215,17 @@ proc stop*[T](manager: RateLimitManager[T]) {.async.} =
if not isNil(manager.pxQueueHandleLoop):
await manager.pxQueueHandleLoop.cancelAndWait()
proc getQuota*[T](
    manager: RateLimitManager[T], now: Moment = Moment.now()
): tuple[budget: int, budgetCap: int] =
  ## Reports the bucket's remaining token budget and its cap as of `now`.
  ## The bucket's last-time-full timestamp is intentionally not exposed.
  let (remaining, cap, _) = manager.bucket.getAvailableCapacity(now)
  (budget: remaining, budgetCap: cap)
proc getMessageStatus*[T](
    manager: RateLimitManager[T], msgId: string
): Future[Option[MessageStatus]] {.async.} =
  ## Thin async passthrough: reads the persisted status for `msgId` from
  ## the backing store; `none` when no status has been recorded.
  let stored = await manager.store.getMessageStatus(msgId)
  return stored
func `$`*[T](b: RateLimitManager[T]): string {.inline.} =
if isNil(b):
return "nil"

View File

@ -11,7 +11,7 @@ type
normalLength: int
nextBatchId: int
BucketState* {.pure} = object
BucketState* {.pure.} = object
budget*: int
budgetCap*: int
lastTimeFull*: Moment
@ -20,11 +20,18 @@ type
Critical = "critical"
Normal = "normal"
# Per-message lifecycle status persisted in ratelimit_message_status
# (stored as the member's text name; read back with parseEnum).
MessageStatus* {.pure.} = enum
PassedToSender ## batch was handed to the sender callback
Enqueued ## pushToQueue accepted the batch
Dropped ## optional-priority message dropped under capacity pressure
DroppedBatchTooLarge ## batch was >= 30% of the bucket cap (sendOrEnqueue)
DroppedFailedToEnqueue ## pushToQueue reported failure
const BUCKET_STATE_KEY = "rate_limit_bucket_state"
## TODO find a way to make these procs async
proc new*[T](M: type[RateLimitStore[T]], db: DbConn): Future[M] {.async} =
proc new*[T](M: type[RateLimitStore[T]], db: DbConn): Future[M] {.async.} =
result = M(db: db, criticalLength: 0, normalLength: 0, nextBatchId: 1)
# Initialize cached lengths from database
@ -192,6 +199,36 @@ proc popFromQueue*[T](
except:
return none(seq[tuple[msgId: string, msg: T]])
proc updateMessageStatuses*[T](
    store: RateLimitStore[T], messageIds: seq[string], status: MessageStatus
): Future[bool] {.async.} =
  ## Upserts `status` (with the current unix timestamp) for every id in
  ## `messageIds` inside a single transaction.
  ## Returns true on success, false when the transaction failed and was
  ## rolled back; never raises for database errors.
  if messageIds.len == 0:
    return true # nothing to write — skip the empty transaction
  try:
    # One timestamp for the whole batch keeps rows consistent.
    let now = times.getTime().toUnix()
    store.db.exec(sql"BEGIN TRANSACTION")
    for msgId in messageIds:
      store.db.exec(
        sql"INSERT INTO ratelimit_message_status (msg_id, status, updated_at) VALUES (?, ?, ?) ON CONFLICT(msg_id) DO UPDATE SET status = excluded.status, updated_at = excluded.updated_at",
        msgId,
        status,
        now,
      )
    store.db.exec(sql"COMMIT")
    return true
  except CatchableError:
    # Was a bare `except:` which also swallowed Defects (programmer bugs);
    # only recoverable database errors belong here.
    try:
      store.db.exec(sql"ROLLBACK")
    except CatchableError:
      # A failed rollback must not escape and mask the `false` result.
      discard
    return false
proc getMessageStatus*[T](
    store: RateLimitStore[T], messageId: string
): Future[Option[MessageStatus]] {.async.} =
  ## Looks up the persisted status for `messageId`.
  ## Returns `none` when no row exists (getValue yields "" in that case).
  ## NOTE(review): parseEnum raises on an unrecognized stored value —
  ## confirm that propagating is the intended contract.
  let raw = store.db.getValue(
    sql"SELECT status FROM ratelimit_message_status WHERE msg_id = ?", messageId
  )
  result =
    if raw.len == 0:
      none(MessageStatus)
    else:
      some(parseEnum[MessageStatus](raw))
proc getQueueLength*[T](store: RateLimitStore[T], queueType: QueueType): int =
case queueType
of QueueType.Critical:

View File

@ -50,6 +50,12 @@ suite "Queue RateLimitManager":
sentMessages[0].msgId == "msg1"
sentMessages[0].msg == "Hello World"
# Check message status
let status = await manager.getMessageStatus("msg1")
check:
status.isSome()
status.get() == MessageStatus.PassedToSender
asyncTest "sendOrEnqueue - multiple messages":
## Given
let store = await RateLimitStore[string].new(db)
@ -71,6 +77,15 @@ suite "Queue RateLimitManager":
sentMessages[1].msgId == "msg2"
sentMessages[1].msg == "Second"
# Check message statuses
let status1 = await manager.getMessageStatus("msg1")
let status2 = await manager.getMessageStatus("msg2")
check:
status1.isSome()
status1.get() == MessageStatus.PassedToSender
status2.isSome()
status2.get() == MessageStatus.PassedToSender
asyncTest "start and stop - drop large batch":
## Given
let store = await RateLimitStore[string].new(db)
@ -132,6 +147,18 @@ suite "Queue RateLimitManager":
r10 == PassedToSender
r11 == Enqueued
# Check message statuses
let status1 = await manager.getMessageStatus("msg1")
let status10 = await manager.getMessageStatus("msg10")
let status11 = await manager.getMessageStatus("msg11")
check:
status1.isSome()
status1.get() == MessageStatus.PassedToSender
status10.isSome()
status10.get() == MessageStatus.PassedToSender
status11.isSome()
status11.get() == MessageStatus.Enqueued
asyncTest "enqueue - enqueue normal on 70% capacity":
## Given
let store = await RateLimitStore[string].new(db)
@ -177,6 +204,21 @@ suite "Queue RateLimitManager":
r11 == PassedToSender
r12 == Dropped
# Check message statuses for different outcomes
let statusSent = await manager.getMessageStatus("msg1")
let statusEnqueued = await manager.getMessageStatus("msg8")
let statusCritical = await manager.getMessageStatus("msg11")
let statusDropped = await manager.getMessageStatus("msg12")
check:
statusSent.isSome()
statusSent.get() == MessageStatus.PassedToSender
statusEnqueued.isSome()
statusEnqueued.get() == MessageStatus.Enqueued
statusCritical.isSome()
statusCritical.get() == MessageStatus.PassedToSender
statusDropped.isSome()
statusDropped.get() == MessageStatus.Dropped
asyncTest "enqueue - process queued messages":
## Given
let store = await RateLimitStore[string].new(db)
@ -256,3 +298,15 @@ suite "Queue RateLimitManager":
sentMessages[11].msgId == "8"
sentMessages[12].msgId == "9"
sentMessages[13].msgId == "10"
# Check message statuses after queue processing
let statusProcessed8 = await manager.getMessageStatus("8")
let statusProcessed15 = await manager.getMessageStatus("15")
let statusDropped12 = await manager.getMessageStatus("12")
check:
statusProcessed8.isSome()
statusProcessed8.get() == MessageStatus.PassedToSender
statusProcessed15.isSome()
statusProcessed15.get() == MessageStatus.PassedToSender
statusDropped12.isSome()
statusDropped12.get() == MessageStatus.Dropped