diff --git a/migrations/001_create_ratelimit_state.sql b/migrations/001_create_ratelimit_state.sql index af73ba2..c761148 100644 --- a/migrations/001_create_ratelimit_state.sql +++ b/migrations/001_create_ratelimit_state.sql @@ -14,6 +14,6 @@ CREATE TABLE IF NOT EXISTS ratelimit_queues ( CREATE TABLE IF NOT EXISTS ratelimit_message_status ( msg_id TEXT PRIMARY KEY, - "status" INTEGER NOT NULL, + "status" TEXT NOT NULL, updated_at INTEGER NOT NULL ); \ No newline at end of file diff --git a/ratelimit/ratelimit_manager.nim b/ratelimit/ratelimit_manager.nim index ef366a4..4b51e66 100644 --- a/ratelimit/ratelimit_manager.nim +++ b/ratelimit/ratelimit_manager.nim @@ -178,7 +178,7 @@ proc sendOrEnqueue*[T]( of Priority.Normal: return await manager.pushToQueueUpdatingStatus(QueueType.Normal, msgs) of Priority.Optional: - return MessageStatus.Dropped + return await manager.updateStatuses(msgs, MessageStatus.Dropped) of CapacityState.None: case priority of Priority.Critical: diff --git a/ratelimit/store.nim b/ratelimit/store.nim index d41783e..3cde84e 100644 --- a/ratelimit/store.nim +++ b/ratelimit/store.nim @@ -11,7 +11,7 @@ type normalLength: int nextBatchId: int - BucketState* {.pure} = object + BucketState* {.pure.} = object budget*: int budgetCap*: int lastTimeFull*: Moment @@ -31,7 +31,7 @@ const BUCKET_STATE_KEY = "rate_limit_bucket_state" ## TODO find a way to make these procs async -proc new*[T](M: type[RateLimitStore[T]], db: DbConn): Future[M] {.async} = +proc new*[T](M: type[RateLimitStore[T]], db: DbConn): Future[M] {.async.} = result = M(db: db, criticalLength: 0, normalLength: 0, nextBatchId: 1) # Initialize cached lengths from database @@ -227,7 +227,7 @@ proc getMessageStatus*[T]( if statusStr == "": return none(MessageStatus) - return some(MessageStatus.parse(statusStr)) + return some(parseEnum[MessageStatus](statusStr)) proc getQueueLength*[T](store: RateLimitStore[T], queueType: QueueType): int = case queueType diff --git a/tests/test_ratelimit_manager.nim b/tests/test_ratelimit_manager.nim index 50a2000..e239746 100644 --- a/tests/test_ratelimit_manager.nim +++ b/tests/test_ratelimit_manager.nim @@ -50,6 +50,12 @@ suite "Queue RateLimitManager": sentMessages[0].msgId == "msg1" sentMessages[0].msg == "Hello World" + # Check message status + let status = await manager.getMessageStatus("msg1") + check: + status.isSome() + status.get() == MessageStatus.PassedToSender + asyncTest "sendOrEnqueue - multiple messages": ## Given let store = await RateLimitStore[string].new(db) @@ -71,6 +77,15 @@ suite "Queue RateLimitManager": sentMessages[1].msgId == "msg2" sentMessages[1].msg == "Second" + # Check message statuses + let status1 = await manager.getMessageStatus("msg1") + let status2 = await manager.getMessageStatus("msg2") + check: + status1.isSome() + status1.get() == MessageStatus.PassedToSender + status2.isSome() + status2.get() == MessageStatus.PassedToSender + asyncTest "start and stop - drop large batch": ## Given let store = await RateLimitStore[string].new(db) @@ -132,6 +147,18 @@ suite "Queue RateLimitManager": r10 == PassedToSender r11 == Enqueued + # Check message statuses + let status1 = await manager.getMessageStatus("msg1") + let status10 = await manager.getMessageStatus("msg10") + let status11 = await manager.getMessageStatus("msg11") + check: + status1.isSome() + status1.get() == MessageStatus.PassedToSender + status10.isSome() + status10.get() == MessageStatus.PassedToSender + status11.isSome() + status11.get() == MessageStatus.Enqueued + asyncTest "enqueue - enqueue normal on 70% capacity": ## Given let store = await RateLimitStore[string].new(db) @@ -177,6 +204,21 @@ suite "Queue RateLimitManager": r11 == PassedToSender r12 == Dropped + # Check message statuses for different outcomes + let statusSent = await manager.getMessageStatus("msg1") + let statusEnqueued = await manager.getMessageStatus("msg8") + let statusCritical = await manager.getMessageStatus("msg11") + let statusDropped = await manager.getMessageStatus("msg12") + check: + statusSent.isSome() + statusSent.get() == MessageStatus.PassedToSender + statusEnqueued.isSome() + statusEnqueued.get() == MessageStatus.Enqueued + statusCritical.isSome() + statusCritical.get() == MessageStatus.PassedToSender + statusDropped.isSome() + statusDropped.get() == MessageStatus.Dropped + asyncTest "enqueue - process queued messages": ## Given let store = await RateLimitStore[string].new(db) @@ -256,3 +298,15 @@ suite "Queue RateLimitManager": sentMessages[11].msgId == "8" sentMessages[12].msgId == "9" sentMessages[13].msgId == "10" + + # Check message statuses after queue processing + let statusProcessed8 = await manager.getMessageStatus("8") + let statusProcessed15 = await manager.getMessageStatus("15") + let statusDropped12 = await manager.getMessageStatus("12") + check: + statusProcessed8.isSome() + statusProcessed8.get() == MessageStatus.PassedToSender + statusProcessed15.isSome() + statusProcessed15.get() == MessageStatus.PassedToSender + statusDropped12.isSome() + statusDropped12.get() == MessageStatus.Dropped