diff --git a/migrations/001_create_ratelimit_state.sql b/migrations/001_create_ratelimit_state.sql index 293c6ee..c761148 100644 --- a/migrations/001_create_ratelimit_state.sql +++ b/migrations/001_create_ratelimit_state.sql @@ -10,4 +10,10 @@ CREATE TABLE IF NOT EXISTS ratelimit_queues ( batch_id INTEGER NOT NULL, created_at INTEGER NOT NULL, PRIMARY KEY (queue_type, batch_id, msg_id) +); + +CREATE TABLE IF NOT EXISTS ratelimit_message_status ( + msg_id TEXT PRIMARY KEY, + "status" TEXT NOT NULL, + updated_at INTEGER NOT NULL ); \ No newline at end of file diff --git a/ratelimit/ratelimit_manager.nim b/ratelimit/ratelimit_manager.nim index a30b5be..4b51e66 100644 --- a/ratelimit/ratelimit_manager.nim +++ b/ratelimit/ratelimit_manager.nim @@ -1,4 +1,4 @@ -import std/[times, options] +import std/[times, options, tables, sequtils] # TODO: move to waku's, chronos' or a lib tocken_bucket once decided where this will live import ./token_bucket # import waku/common/rate_limit/token_bucket @@ -12,12 +12,6 @@ type AlmostNone None - SendResult* {.pure.} = enum - PassedToSender - Enqueued - Dropped - DroppedBatchTooLarge - Priority* {.pure.} = enum Critical Normal @@ -76,31 +70,50 @@ proc getCapacityState[T]( else: return CapacityState.Normal +proc updateStatuses[T]( + manager: RateLimitManager[T], + msgs: seq[tuple[msgId: string, msg: T]], + status: MessageStatus, +): Future[MessageStatus] {.async.} = + let msgIds: seq[string] = msgs.mapIt(it.msgId) + # TODO log failed to update message statuses (if it occurs) think of a logging strategy + discard await manager.store.updateMessageStatuses(msgIds, status) + return status + +proc pushToQueueUpdatingStatus[T]( + manager: RateLimitManager[T], + queueType: QueueType, + msgs: seq[tuple[msgId: string, msg: T]], +): Future[MessageStatus] {.async.} = + ## Pushes to the queue and updates the status of the messages + let success = await manager.store.pushToQueue(queueType, msgs) + let status = + if success: MessageStatus.Enqueued else: MessageStatus.DroppedFailedToEnqueue + return await manager.updateStatuses(msgs, status) + proc passToSender[T]( manager: RateLimitManager[T], msgs: seq[tuple[msgId: string, msg: T]], now: Moment, priority: Priority, -): Future[SendResult] {.async.} = +): Future[MessageStatus] {.async.} = let count = msgs.len let consumed = manager.bucket.tryConsume(count, now) if not consumed: case priority of Priority.Critical: - discard await manager.store.pushToQueue(QueueType.Critical, msgs) - return SendResult.Enqueued + return await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs) of Priority.Normal: - discard await manager.store.pushToQueue(QueueType.Normal, msgs) - return SendResult.Enqueued + return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs) of Priority.Optional: - return SendResult.Dropped + return await manager.updateStatuses(msgs, MessageStatus.Dropped) let (budget, budgetCap, lastTimeFull) = manager.bucket.getAvailableCapacity(now) discard await manager.store.saveBucketState( BucketState(budget: budget, budgetCap: budgetCap, lastTimeFull: lastTimeFull) ) await manager.sender(msgs) - return SendResult.PassedToSender + return await manager.updateStatuses(msgs, MessageStatus.PassedToSender) proc processCriticalQueue[T]( manager: RateLimitManager[T], now: Moment @@ -119,7 +132,8 @@ proc processCriticalQueue[T]( discard await manager.passToSender(msgs, now, Priority.Critical) else: # Put back to critical queue (add to front not possible, so we add to back and exit) - discard await manager.store.pushToQueue(QueueType.Critical, msgs) + # I can safely discard the return since the status will be persisted + discard await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs) break proc processNormalQueue[T]( @@ -134,10 +148,12 @@ proc processNormalQueue[T]( let msgs = maybeMsgs.get() let capacityState = manager.getCapacityState(now, msgs.len) if capacityState == CapacityState.Normal: + # I can safely discard the return since the status will be persisted discard await manager.passToSender(msgs, now, Priority.Normal) else: # Put back to normal queue (add to front not possible, so we add to back and exit) - discard await manager.store.pushToQueue(QueueType.Normal, msgs) + # I can safely discard the return since the status will be persisted + discard await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs) break proc sendOrEnqueue*[T]( @@ -145,11 +161,11 @@ proc sendOrEnqueue*[T]( msgs: seq[tuple[msgId: string, msg: T]], priority: Priority, now: Moment = Moment.now(), -): Future[SendResult] {.async.} = +): Future[MessageStatus] {.async.} = let (_, budgetCap, _) = manager.bucket.getAvailableCapacity(now) if msgs.len.float / budgetCap.float >= 0.3: # drop batch if it's too large to avoid starvation - return SendResult.DroppedBatchTooLarge + return await manager.updateStatuses(msgs, MessageStatus.DroppedBatchTooLarge) let capacityState = manager.getCapacityState(now, msgs.len) case capacityState @@ -160,22 +176,19 @@ proc sendOrEnqueue*[T]( of Priority.Critical: return await manager.passToSender(msgs, now, priority) of Priority.Normal: - discard await manager.store.pushToQueue(QueueType.Normal, msgs) - return SendResult.Enqueued + return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs) of Priority.Optional: - return SendResult.Dropped + return await manager.updateStatuses(msgs, MessageStatus.Dropped) of CapacityState.None: case priority of Priority.Critical: - discard await manager.store.pushToQueue(QueueType.Critical, msgs) - return SendResult.Enqueued + return await manager.pushToQueueUpdatingStatus(QueueType.Critical, msgs) of Priority.Normal: - discard await manager.store.pushToQueue(QueueType.Normal, msgs) - return SendResult.Enqueued + return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs) of Priority.Optional: - return SendResult.Dropped + return await manager.updateStatuses(msgs, MessageStatus.Dropped) -proc queueHandleLoop*[T]( +proc queueHandleLoop[T]( manager: RateLimitManager[T], nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = Moment.now(), @@ -202,6 +215,17 @@ proc stop*[T](manager: RateLimitManager[T]) {.async.} = if not isNil(manager.pxQueueHandleLoop): await manager.pxQueueHandleLoop.cancelAndWait() +proc getQuota*[T]( + manager: RateLimitManager[T], now: Moment = Moment.now() +): tuple[budget: int, budgetCap: int] = + let (budget, budgetCap, _) = manager.bucket.getAvailableCapacity(now) + return (budget, budgetCap) + +proc getMessageStatus*[T]( + manager: RateLimitManager[T], msgId: string +): Future[Option[MessageStatus]] {.async.} = + return await manager.store.getMessageStatus(msgId) + func `$`*[T](b: RateLimitManager[T]): string {.inline.} = if isNil(b): return "nil" diff --git a/ratelimit/store.nim b/ratelimit/store.nim index 42fd152..3cde84e 100644 --- a/ratelimit/store.nim +++ b/ratelimit/store.nim @@ -11,7 +11,7 @@ type normalLength: int nextBatchId: int - BucketState* {.pure} = object + BucketState* {.pure.} = object budget*: int budgetCap*: int lastTimeFull*: Moment @@ -20,11 +20,18 @@ type Critical = "critical" Normal = "normal" + MessageStatus* {.pure.} = enum + PassedToSender + Enqueued + Dropped + DroppedBatchTooLarge + DroppedFailedToEnqueue + const BUCKET_STATE_KEY = "rate_limit_bucket_state" ## TODO find a way to make these procs async -proc new*[T](M: type[RateLimitStore[T]], db: DbConn): Future[M] {.async} = +proc new*[T](M: type[RateLimitStore[T]], db: DbConn): Future[M] {.async.} = result = M(db: db, criticalLength: 0, normalLength: 0, nextBatchId: 1) # Initialize cached lengths from database @@ -192,6 +199,36 @@ proc popFromQueue*[T]( except: return none(seq[tuple[msgId: string, msg: T]]) +proc updateMessageStatuses*[T]( + store: RateLimitStore[T], messageIds: seq[string], status: MessageStatus +): Future[bool] {.async.} = + try: + let now = times.getTime().toUnix() + store.db.exec(sql"BEGIN TRANSACTION") + for msgId in messageIds: + store.db.exec( + sql"INSERT INTO ratelimit_message_status (msg_id, status, updated_at) VALUES (?, ?, ?) ON CONFLICT(msg_id) DO UPDATE SET status = excluded.status, updated_at = excluded.updated_at", + msgId, + status, + now, + ) + store.db.exec(sql"COMMIT") + return true + except: + store.db.exec(sql"ROLLBACK") + return false + +proc getMessageStatus*[T]( + store: RateLimitStore[T], messageId: string +): Future[Option[MessageStatus]] {.async.} = + let statusStr = store.db.getValue( + sql"SELECT status FROM ratelimit_message_status WHERE msg_id = ?", messageId + ) + if statusStr == "": + return none(MessageStatus) + + return some(parseEnum[MessageStatus](statusStr)) + proc getQueueLength*[T](store: RateLimitStore[T], queueType: QueueType): int = case queueType of QueueType.Critical: diff --git a/tests/test_ratelimit_manager.nim b/tests/test_ratelimit_manager.nim index 50a2000..e239746 100644 --- a/tests/test_ratelimit_manager.nim +++ b/tests/test_ratelimit_manager.nim @@ -50,6 +50,12 @@ suite "Queue RateLimitManager": sentMessages[0].msgId == "msg1" sentMessages[0].msg == "Hello World" + # Check message status + let status = await manager.getMessageStatus("msg1") + check: + status.isSome() + status.get() == MessageStatus.PassedToSender + asyncTest "sendOrEnqueue - multiple messages": ## Given let store = await RateLimitStore[string].new(db) @@ -71,6 +77,15 @@ suite "Queue RateLimitManager": sentMessages[1].msgId == "msg2" sentMessages[1].msg == "Second" + # Check message statuses + let status1 = await manager.getMessageStatus("msg1") + let status2 = await manager.getMessageStatus("msg2") + check: + status1.isSome() + status1.get() == MessageStatus.PassedToSender + status2.isSome() + status2.get() == MessageStatus.PassedToSender + asyncTest "start and stop - drop large batch": ## Given let store = await RateLimitStore[string].new(db) @@ -132,6 +147,18 @@ suite "Queue RateLimitManager": r10 == PassedToSender r11 == Enqueued + # Check message statuses + let status1 = await manager.getMessageStatus("msg1") + let status10 = await manager.getMessageStatus("msg10") + let status11 = await manager.getMessageStatus("msg11") + check: + status1.isSome() + status1.get() == MessageStatus.PassedToSender + status10.isSome() + status10.get() == MessageStatus.PassedToSender + status11.isSome() + status11.get() == MessageStatus.Enqueued + asyncTest "enqueue - enqueue normal on 70% capacity": ## Given let store = await RateLimitStore[string].new(db) @@ -177,6 +204,21 @@ suite "Queue RateLimitManager": r11 == PassedToSender r12 == Dropped + # Check message statuses for different outcomes + let statusSent = await manager.getMessageStatus("msg1") + let statusEnqueued = await manager.getMessageStatus("msg8") + let statusCritical = await manager.getMessageStatus("msg11") + let statusDropped = await manager.getMessageStatus("msg12") + check: + statusSent.isSome() + statusSent.get() == MessageStatus.PassedToSender + statusEnqueued.isSome() + statusEnqueued.get() == MessageStatus.Enqueued + statusCritical.isSome() + statusCritical.get() == MessageStatus.PassedToSender + statusDropped.isSome() + statusDropped.get() == MessageStatus.Dropped + asyncTest "enqueue - process queued messages": ## Given let store = await RateLimitStore[string].new(db) @@ -256,3 +298,15 @@ suite "Queue RateLimitManager": sentMessages[11].msgId == "8" sentMessages[12].msgId == "9" sentMessages[13].msgId == "10" + + # Check message statuses after queue processing + let statusProcessed8 = await manager.getMessageStatus("8") + let statusProcessed15 = await manager.getMessageStatus("15") + let statusDropped12 = await manager.getMessageStatus("12") + check: + statusProcessed8.isSome() + statusProcessed8.get() == MessageStatus.PassedToSender + statusProcessed15.isSome() + statusProcessed15.get() == MessageStatus.PassedToSender + statusDropped12.isSome() + statusDropped12.get() == MessageStatus.Dropped