From 7e4f930ae30827edcab643e6cdf8f1be81aac59e Mon Sep 17 00:00:00 2001
From: pablo
Date: Mon, 14 Jul 2025 11:14:36 +0300
Subject: [PATCH] feat: store rate limit
---
.gitignore | 2 +
migrations/001_create_ratelimit_state.sql | 7 +
ratelimit/store/memory.nim | 21 +++
ratelimit/store/sqlite.nim | 78 +++++++++
ratelimit/store/store.nim | 13 ++
ratelimit/token_bucket.nim | 198 ++++++++++++++++++++++
tests/test_sqlite_store.nim | 40 +++++
7 files changed, 359 insertions(+)
create mode 100644 migrations/001_create_ratelimit_state.sql
create mode 100644 ratelimit/store/memory.nim
create mode 100644 ratelimit/store/sqlite.nim
create mode 100644 ratelimit/store/store.nim
create mode 100644 ratelimit/token_bucket.nim
create mode 100644 tests/test_sqlite_store.nim
diff --git a/.gitignore b/.gitignore
index 3a79a35..4ee6ee2 100644
--- a/.gitignore
+++ b/.gitignore
@@ -23,5 +23,7 @@ chat_sdk/*
!*.nim
apps/*
!*.nim
+tests/*
+!*.nim
nimble.develop
nimble.paths
diff --git a/migrations/001_create_ratelimit_state.sql b/migrations/001_create_ratelimit_state.sql
new file mode 100644
index 0000000..11431ab
--- /dev/null
+++ b/migrations/001_create_ratelimit_state.sql
@@ -0,0 +1,7 @@
+ -- will only exist one row in the table
+ CREATE TABLE IF NOT EXISTS bucket_state (
+ id INTEGER PRIMARY KEY,
+ budget INTEGER NOT NULL,
+ budget_cap INTEGER NOT NULL,
+ last_time_full_seconds INTEGER NOT NULL
+ )
\ No newline at end of file
diff --git a/ratelimit/store/memory.nim b/ratelimit/store/memory.nim
new file mode 100644
index 0000000..ef19de1
--- /dev/null
+++ b/ratelimit/store/memory.nim
@@ -0,0 +1,21 @@
+import std/times
+import ./store
+import chronos
+
+# Memory Implementation
+type MemoryRateLimitStore* = ref object
+ bucketState: BucketState
+
+proc newMemoryRateLimitStore*(): MemoryRateLimitStore =
+ result = MemoryRateLimitStore()
+ result.bucketState =
+ BucketState(budget: 10, budgetCap: 10, lastTimeFull: Moment.now())
+
+proc saveBucketState*(
+ store: MemoryRateLimitStore, bucketState: BucketState
+): Future[bool] {.async.} =
+ store.bucketState = bucketState
+ return true
+
+proc loadBucketState*(store: MemoryRateLimitStore): Future[BucketState] {.async.} =
+ return store.bucketState
diff --git a/ratelimit/store/sqlite.nim b/ratelimit/store/sqlite.nim
new file mode 100644
index 0000000..d8aaf38
--- /dev/null
+++ b/ratelimit/store/sqlite.nim
@@ -0,0 +1,78 @@
+import std/times
+import std/strutils
+import ./store
+import chronos
+import db_connector/db_sqlite
+
+# SQLite Implementation
+type SqliteRateLimitStore* = ref object
+ db: DbConn
+ dbPath: string
+
+proc newSqliteRateLimitStore*(dbPath: string = ":memory:"): SqliteRateLimitStore =
+ result = SqliteRateLimitStore(dbPath: dbPath)
+ result.db = open(dbPath, "", "", "")
+
+ # Create table if it doesn't exist
+ result.db.exec(
+ sql"""
+ CREATE TABLE IF NOT EXISTS bucket_state (
+ id INTEGER PRIMARY KEY,
+ budget INTEGER NOT NULL,
+ budget_cap INTEGER NOT NULL,
+ last_time_full_seconds INTEGER NOT NULL
+ )
+ """
+ )
+
+ # Insert default state if table is empty
+ let count = result.db.getValue(sql"SELECT COUNT(*) FROM bucket_state").parseInt()
+ if count == 0:
+ let defaultTimeSeconds = Moment.now().epochSeconds()
+ result.db.exec(
+ sql"""
+ INSERT INTO bucket_state (id, budget, budget_cap, last_time_full_seconds)
+ VALUES (1, 10, 10, ?)
+ """,
+ defaultTimeSeconds,
+ )
+
+proc close*(store: SqliteRateLimitStore) =
+ if store.db != nil:
+ store.db.close()
+
+proc saveBucketState*(
+ store: SqliteRateLimitStore, bucketState: BucketState
+): Future[bool] {.async.} =
+ try:
+ # Convert Moment to Unix seconds for storage
+ let lastTimeSeconds = bucketState.lastTimeFull.epochSeconds()
+ store.db.exec(
+ sql"""
+ UPDATE bucket_state
+ SET budget = ?, budget_cap = ?, last_time_full_seconds = ?
+ WHERE id = 1
+ """,
+ bucketState.budget,
+ bucketState.budgetCap,
+ lastTimeSeconds,
+ )
+ return true
+ except:
+ return false
+
+proc loadBucketState*(store: SqliteRateLimitStore): Future[BucketState] {.async.} =
+ let row = store.db.getRow(
+ sql"""
+ SELECT budget, budget_cap, last_time_full_seconds
+ FROM bucket_state
+ WHERE id = 1
+ """
+ )
+ # Convert Unix seconds back to Moment (seconds precission)
+ let unixSeconds = row[2].parseInt().int64
+ let lastTimeFull = Moment.init(unixSeconds, chronos.seconds(1))
+
+ return BucketState(
+ budget: row[0].parseInt(), budgetCap: row[1].parseInt(), lastTimeFull: lastTimeFull
+ )
diff --git a/ratelimit/store/store.nim b/ratelimit/store/store.nim
new file mode 100644
index 0000000..fab578d
--- /dev/null
+++ b/ratelimit/store/store.nim
@@ -0,0 +1,13 @@
+import std/[times, deques]
+import chronos
+
+type
+ BucketState* = object
+ budget*: int
+ budgetCap*: int
+ lastTimeFull*: Moment
+
+ RateLimitStoreConcept* =
+ concept s
+ s.saveBucketState(BucketState) is Future[bool]
+ s.loadBucketState() is Future[BucketState]
diff --git a/ratelimit/token_bucket.nim b/ratelimit/token_bucket.nim
new file mode 100644
index 0000000..447569a
--- /dev/null
+++ b/ratelimit/token_bucket.nim
@@ -0,0 +1,198 @@
+{.push raises: [].}
+
+import chronos, std/math, std/options
+
+const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25
+
+## This is an extract from chronos/rate_limit.nim due to the found bug in the original implementation.
+## Unfortunately that bug cannot be solved without harm the original features of TokenBucket class.
+## So, this current shortcut is used to enable move ahead with nwaku rate limiter implementation.
+## ref: https://github.com/status-im/nim-chronos/issues/500
+##
+## This version of TokenBucket is different from the original one in chronos/rate_limit.nim in many ways:
+## - It has a new mode called `Compensating` which is the default mode.
+## Compensation is calculated as the not used bucket capacity in the last measured period(s) in average.
+## or up until maximum the allowed compansation treshold (Currently it is const 25%).
+## Also compensation takes care of the proper time period calculation to avoid non-usage periods that can lead to
+## overcompensation.
+## - Strict mode is also available which will only replenish when time period is over but also will fill
+## the bucket to the max capacity.
+
+type
+ ReplenishMode* = enum
+ Strict
+ Compensating
+
+ TokenBucket* = ref object
+ budget: int ## Current number of tokens in the bucket
+ budgetCap: int ## Bucket capacity
+ lastTimeFull: Moment
+ ## This timer measures the proper periodizaiton of the bucket refilling
+ fillDuration: Duration ## Refill period
+ case replenishMode*: ReplenishMode
+ of Strict:
+ ## In strict mode, the bucket is refilled only till the budgetCap
+ discard
+ of Compensating:
+ ## This is the default mode.
+ maxCompensation: float
+
+func periodDistance(bucket: TokenBucket, currentTime: Moment): float =
+ ## notice fillDuration cannot be zero by design
+ ## period distance is a float number representing the calculated period time
+ ## since the last time bucket was refilled.
+ return
+ nanoseconds(currentTime - bucket.lastTimeFull).float /
+ nanoseconds(bucket.fillDuration).float
+
+func getUsageAverageSince(bucket: TokenBucket, distance: float): float =
+ if distance == 0.float:
+ ## in case there is zero time difference than the usage percentage is 100%
+ return 1.0
+
+ ## budgetCap can never be zero
+ ## usage average is calculated as a percentage of total capacity available over
+ ## the measured period
+ return bucket.budget.float / bucket.budgetCap.float / distance
+
+proc calcCompensation(bucket: TokenBucket, averageUsage: float): int =
+ # if we already fully used or even overused the tokens, there is no place for compensation
+ if averageUsage >= 1.0:
+ return 0
+
+ ## compensation is the not used bucket capacity in the last measured period(s) in average.
+ ## or maximum the allowed compansation treshold
+ let compensationPercent =
+ min((1.0 - averageUsage) * bucket.budgetCap.float, bucket.maxCompensation)
+ return trunc(compensationPercent).int
+
+func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool =
+ return currentTime - bucket.lastTimeFull >= bucket.fillDuration
+
+## Update will take place if bucket is empty and trying to consume tokens.
+## It checks if the bucket can be replenished as refill duration is passed or not.
+## - strict mode:
+proc updateStrict(bucket: TokenBucket, currentTime: Moment) =
+ if bucket.fillDuration == default(Duration):
+ bucket.budget = min(bucket.budgetCap, bucket.budget)
+ return
+
+ if not periodElapsed(bucket, currentTime):
+ return
+
+ bucket.budget = bucket.budgetCap
+ bucket.lastTimeFull = currentTime
+
+## - compensating - ballancing load:
+## - between updates we calculate average load (current bucket capacity / number of periods till last update)
+## - gives the percentage load used recently
+## - with this we can replenish bucket up to 100% + calculated leftover from previous period (caped with max treshold)
+proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) =
+ if bucket.fillDuration == default(Duration):
+ bucket.budget = min(bucket.budgetCap, bucket.budget)
+ return
+
+ # do not replenish within the same period
+ if not periodElapsed(bucket, currentTime):
+ return
+
+ let distance = bucket.periodDistance(currentTime)
+ let recentAvgUsage = bucket.getUsageAverageSince(distance)
+ let compensation = bucket.calcCompensation(recentAvgUsage)
+
+ bucket.budget = bucket.budgetCap + compensation
+ bucket.lastTimeFull = currentTime
+
+proc update(bucket: TokenBucket, currentTime: Moment) =
+ if bucket.replenishMode == ReplenishMode.Compensating:
+ updateWithCompensation(bucket, currentTime)
+ else:
+ updateStrict(bucket, currentTime)
+
+## Returns the available capacity of the bucket: (budget, budgetCap)
+proc getAvailableCapacity*(
+ bucket: TokenBucket, currentTime: Moment
+): tuple[budget: int, budgetCap: int] =
+ if periodElapsed(bucket, currentTime):
+ case bucket.replenishMode
+ of ReplenishMode.Strict:
+ return (bucket.budgetCap, bucket.budgetCap)
+ of ReplenishMode.Compensating:
+ let distance = bucket.periodDistance(currentTime)
+ let recentAvgUsage = bucket.getUsageAverageSince(distance)
+ let compensation = bucket.calcCompensation(recentAvgUsage)
+ let availableBudget = bucket.budgetCap + compensation
+ return (availableBudget, bucket.budgetCap)
+ return (bucket.budget, bucket.budgetCap)
+
+proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
+ ## If `tokens` are available, consume them,
+ ## Otherwhise, return false.
+
+ if bucket.budget >= bucket.budgetCap:
+ bucket.lastTimeFull = now
+
+ if bucket.budget >= tokens:
+ bucket.budget -= tokens
+ return true
+
+ bucket.update(now)
+
+ if bucket.budget >= tokens:
+ bucket.budget -= tokens
+ return true
+ else:
+ return false
+
+proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
+ ## Add `tokens` to the budget (capped to the bucket capacity)
+ bucket.budget += tokens
+ bucket.update(now)
+
+proc new*(
+ T: type[TokenBucket],
+ budgetCap: int,
+ fillDuration: Duration = 1.seconds,
+ mode: ReplenishMode = ReplenishMode.Compensating,
+): T =
+ assert not isZero(fillDuration)
+ assert budgetCap != 0
+
+ ## Create different mode TokenBucket
+ case mode
+ of ReplenishMode.Strict:
+ return T(
+ budget: budgetCap,
+ budgetCap: budgetCap,
+ fillDuration: fillDuration,
+ lastTimeFull: Moment.now(),
+ replenishMode: mode,
+ )
+ of ReplenishMode.Compensating:
+ T(
+ budget: budgetCap,
+ budgetCap: budgetCap,
+ fillDuration: fillDuration,
+ lastTimeFull: Moment.now(),
+ replenishMode: mode,
+ maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT,
+ )
+
+proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket =
+ T.new(capacity, period, ReplenishMode.Strict)
+
+proc newCompensating*(
+ T: type[TokenBucket], capacity: int, period: Duration
+): TokenBucket =
+ T.new(capacity, period, ReplenishMode.Compensating)
+
+func `$`*(b: TokenBucket): string {.inline.} =
+ if isNil(b):
+ return "nil"
+ return $b.budgetCap & "/" & $b.fillDuration
+
+func `$`*(ob: Option[TokenBucket]): string {.inline.} =
+ if ob.isNone():
+ return "no-limit"
+
+ return $ob.get()
diff --git a/tests/test_sqlite_store.nim b/tests/test_sqlite_store.nim
new file mode 100644
index 0000000..98dbe08
--- /dev/null
+++ b/tests/test_sqlite_store.nim
@@ -0,0 +1,40 @@
+{.used.}
+
+import testutils/unittests
+import ../ratelimit/store/sqlite
+import ../ratelimit/store/store
+import chronos
+
+suite "SqliteRateLimitStore Tests":
+ asyncTest "newSqliteRateLimitStore - creates store with default values":
+ ## Given & When
+ let now = Moment.now()
+ let store = newSqliteRateLimitStore()
+ defer:
+ store.close()
+
+ ## Then
+ let bucketState = await store.loadBucketState()
+ check bucketState.budget == 10
+ check bucketState.budgetCap == 10
+ check bucketState.lastTimeFull.epochSeconds() == now.epochSeconds()
+
+ asyncTest "saveBucketState and loadBucketState - state persistence":
+ ## Given
+ let now = Moment.now()
+ let store = newSqliteRateLimitStore()
+ defer:
+ store.close()
+
+ let newBucketState = BucketState(budget: 5, budgetCap: 20, lastTimeFull: now)
+
+ ## When
+ let saveResult = await store.saveBucketState(newBucketState)
+ let loadedState = await store.loadBucketState()
+
+ ## Then
+ check saveResult == true
+ check loadedState.budget == newBucketState.budget
+ check loadedState.budgetCap == newBucketState.budgetCap
+ check loadedState.lastTimeFull.epochSeconds() ==
+ newBucketState.lastTimeFull.epochSeconds()