mirror of
https://github.com/logos-messaging/nim-chat-sdk.git
synced 2026-01-07 08:33:11 +00:00
feat: store rate limit
This commit is contained in:
parent
f8e2ace1df
commit
7e4f930ae3
2
.gitignore
vendored
2
.gitignore
vendored
@ -23,5 +23,7 @@ chat_sdk/*
|
||||
!*.nim
|
||||
apps/*
|
||||
!*.nim
|
||||
tests/*
|
||||
!*.nim
|
||||
nimble.develop
|
||||
nimble.paths
|
||||
|
||||
7
migrations/001_create_ratelimit_state.sql
Normal file
7
migrations/001_create_ratelimit_state.sql
Normal file
@ -0,0 +1,7 @@
|
||||
-- will only exist one row in the table
|
||||
CREATE TABLE IF NOT EXISTS bucket_state (
|
||||
id INTEGER PRIMARY KEY,
|
||||
budget INTEGER NOT NULL,
|
||||
budget_cap INTEGER NOT NULL,
|
||||
last_time_full_seconds INTEGER NOT NULL
|
||||
)
|
||||
21
ratelimit/store/memory.nim
Normal file
21
ratelimit/store/memory.nim
Normal file
@ -0,0 +1,21 @@
|
||||
import std/times
|
||||
import ./store
|
||||
import chronos
|
||||
|
||||
# Memory Implementation
|
||||
type MemoryRateLimitStore* = ref object
|
||||
bucketState: BucketState
|
||||
|
||||
proc newMemoryRateLimitStore*(): MemoryRateLimitStore =
|
||||
result = MemoryRateLimitStore()
|
||||
result.bucketState =
|
||||
BucketState(budget: 10, budgetCap: 10, lastTimeFull: Moment.now())
|
||||
|
||||
proc saveBucketState*(
|
||||
store: MemoryRateLimitStore, bucketState: BucketState
|
||||
): Future[bool] {.async.} =
|
||||
store.bucketState = bucketState
|
||||
return true
|
||||
|
||||
proc loadBucketState*(store: MemoryRateLimitStore): Future[BucketState] {.async.} =
|
||||
return store.bucketState
|
||||
78
ratelimit/store/sqlite.nim
Normal file
78
ratelimit/store/sqlite.nim
Normal file
@ -0,0 +1,78 @@
|
||||
import std/times
|
||||
import std/strutils
|
||||
import ./store
|
||||
import chronos
|
||||
import db_connector/db_sqlite
|
||||
|
||||
# SQLite Implementation
|
||||
type SqliteRateLimitStore* = ref object
|
||||
db: DbConn
|
||||
dbPath: string
|
||||
|
||||
proc newSqliteRateLimitStore*(dbPath: string = ":memory:"): SqliteRateLimitStore =
|
||||
result = SqliteRateLimitStore(dbPath: dbPath)
|
||||
result.db = open(dbPath, "", "", "")
|
||||
|
||||
# Create table if it doesn't exist
|
||||
result.db.exec(
|
||||
sql"""
|
||||
CREATE TABLE IF NOT EXISTS bucket_state (
|
||||
id INTEGER PRIMARY KEY,
|
||||
budget INTEGER NOT NULL,
|
||||
budget_cap INTEGER NOT NULL,
|
||||
last_time_full_seconds INTEGER NOT NULL
|
||||
)
|
||||
"""
|
||||
)
|
||||
|
||||
# Insert default state if table is empty
|
||||
let count = result.db.getValue(sql"SELECT COUNT(*) FROM bucket_state").parseInt()
|
||||
if count == 0:
|
||||
let defaultTimeSeconds = Moment.now().epochSeconds()
|
||||
result.db.exec(
|
||||
sql"""
|
||||
INSERT INTO bucket_state (id, budget, budget_cap, last_time_full_seconds)
|
||||
VALUES (1, 10, 10, ?)
|
||||
""",
|
||||
defaultTimeSeconds,
|
||||
)
|
||||
|
||||
proc close*(store: SqliteRateLimitStore) =
|
||||
if store.db != nil:
|
||||
store.db.close()
|
||||
|
||||
proc saveBucketState*(
|
||||
store: SqliteRateLimitStore, bucketState: BucketState
|
||||
): Future[bool] {.async.} =
|
||||
try:
|
||||
# Convert Moment to Unix seconds for storage
|
||||
let lastTimeSeconds = bucketState.lastTimeFull.epochSeconds()
|
||||
store.db.exec(
|
||||
sql"""
|
||||
UPDATE bucket_state
|
||||
SET budget = ?, budget_cap = ?, last_time_full_seconds = ?
|
||||
WHERE id = 1
|
||||
""",
|
||||
bucketState.budget,
|
||||
bucketState.budgetCap,
|
||||
lastTimeSeconds,
|
||||
)
|
||||
return true
|
||||
except:
|
||||
return false
|
||||
|
||||
proc loadBucketState*(store: SqliteRateLimitStore): Future[BucketState] {.async.} =
|
||||
let row = store.db.getRow(
|
||||
sql"""
|
||||
SELECT budget, budget_cap, last_time_full_seconds
|
||||
FROM bucket_state
|
||||
WHERE id = 1
|
||||
"""
|
||||
)
|
||||
# Convert Unix seconds back to Moment (seconds precission)
|
||||
let unixSeconds = row[2].parseInt().int64
|
||||
let lastTimeFull = Moment.init(unixSeconds, chronos.seconds(1))
|
||||
|
||||
return BucketState(
|
||||
budget: row[0].parseInt(), budgetCap: row[1].parseInt(), lastTimeFull: lastTimeFull
|
||||
)
|
||||
13
ratelimit/store/store.nim
Normal file
13
ratelimit/store/store.nim
Normal file
@ -0,0 +1,13 @@
|
||||
import std/[times, deques]
|
||||
import chronos
|
||||
|
||||
type
|
||||
BucketState* = object
|
||||
budget*: int
|
||||
budgetCap*: int
|
||||
lastTimeFull*: Moment
|
||||
|
||||
RateLimitStoreConcept* =
|
||||
concept s
|
||||
s.saveBucketState(BucketState) is Future[bool]
|
||||
s.loadBucketState() is Future[BucketState]
|
||||
198
ratelimit/token_bucket.nim
Normal file
198
ratelimit/token_bucket.nim
Normal file
@ -0,0 +1,198 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import chronos, std/math, std/options
|
||||
|
||||
const BUDGET_COMPENSATION_LIMIT_PERCENT = 0.25
|
||||
|
||||
## This is an extract from chronos/rate_limit.nim due to the found bug in the original implementation.
|
||||
## Unfortunately that bug cannot be solved without harm the original features of TokenBucket class.
|
||||
## So, this current shortcut is used to enable move ahead with nwaku rate limiter implementation.
|
||||
## ref: https://github.com/status-im/nim-chronos/issues/500
|
||||
##
|
||||
## This version of TokenBucket is different from the original one in chronos/rate_limit.nim in many ways:
|
||||
## - It has a new mode called `Compensating` which is the default mode.
|
||||
## Compensation is calculated as the not used bucket capacity in the last measured period(s) in average.
|
||||
## or up until maximum the allowed compansation treshold (Currently it is const 25%).
|
||||
## Also compensation takes care of the proper time period calculation to avoid non-usage periods that can lead to
|
||||
## overcompensation.
|
||||
## - Strict mode is also available which will only replenish when time period is over but also will fill
|
||||
## the bucket to the max capacity.
|
||||
|
||||
type
|
||||
ReplenishMode* = enum
|
||||
Strict
|
||||
Compensating
|
||||
|
||||
TokenBucket* = ref object
|
||||
budget: int ## Current number of tokens in the bucket
|
||||
budgetCap: int ## Bucket capacity
|
||||
lastTimeFull: Moment
|
||||
## This timer measures the proper periodizaiton of the bucket refilling
|
||||
fillDuration: Duration ## Refill period
|
||||
case replenishMode*: ReplenishMode
|
||||
of Strict:
|
||||
## In strict mode, the bucket is refilled only till the budgetCap
|
||||
discard
|
||||
of Compensating:
|
||||
## This is the default mode.
|
||||
maxCompensation: float
|
||||
|
||||
func periodDistance(bucket: TokenBucket, currentTime: Moment): float =
|
||||
## notice fillDuration cannot be zero by design
|
||||
## period distance is a float number representing the calculated period time
|
||||
## since the last time bucket was refilled.
|
||||
return
|
||||
nanoseconds(currentTime - bucket.lastTimeFull).float /
|
||||
nanoseconds(bucket.fillDuration).float
|
||||
|
||||
func getUsageAverageSince(bucket: TokenBucket, distance: float): float =
|
||||
if distance == 0.float:
|
||||
## in case there is zero time difference than the usage percentage is 100%
|
||||
return 1.0
|
||||
|
||||
## budgetCap can never be zero
|
||||
## usage average is calculated as a percentage of total capacity available over
|
||||
## the measured period
|
||||
return bucket.budget.float / bucket.budgetCap.float / distance
|
||||
|
||||
proc calcCompensation(bucket: TokenBucket, averageUsage: float): int =
|
||||
# if we already fully used or even overused the tokens, there is no place for compensation
|
||||
if averageUsage >= 1.0:
|
||||
return 0
|
||||
|
||||
## compensation is the not used bucket capacity in the last measured period(s) in average.
|
||||
## or maximum the allowed compansation treshold
|
||||
let compensationPercent =
|
||||
min((1.0 - averageUsage) * bucket.budgetCap.float, bucket.maxCompensation)
|
||||
return trunc(compensationPercent).int
|
||||
|
||||
func periodElapsed(bucket: TokenBucket, currentTime: Moment): bool =
|
||||
return currentTime - bucket.lastTimeFull >= bucket.fillDuration
|
||||
|
||||
## Update will take place if bucket is empty and trying to consume tokens.
|
||||
## It checks if the bucket can be replenished as refill duration is passed or not.
|
||||
## - strict mode:
|
||||
proc updateStrict(bucket: TokenBucket, currentTime: Moment) =
|
||||
if bucket.fillDuration == default(Duration):
|
||||
bucket.budget = min(bucket.budgetCap, bucket.budget)
|
||||
return
|
||||
|
||||
if not periodElapsed(bucket, currentTime):
|
||||
return
|
||||
|
||||
bucket.budget = bucket.budgetCap
|
||||
bucket.lastTimeFull = currentTime
|
||||
|
||||
## - compensating - ballancing load:
|
||||
## - between updates we calculate average load (current bucket capacity / number of periods till last update)
|
||||
## - gives the percentage load used recently
|
||||
## - with this we can replenish bucket up to 100% + calculated leftover from previous period (caped with max treshold)
|
||||
proc updateWithCompensation(bucket: TokenBucket, currentTime: Moment) =
|
||||
if bucket.fillDuration == default(Duration):
|
||||
bucket.budget = min(bucket.budgetCap, bucket.budget)
|
||||
return
|
||||
|
||||
# do not replenish within the same period
|
||||
if not periodElapsed(bucket, currentTime):
|
||||
return
|
||||
|
||||
let distance = bucket.periodDistance(currentTime)
|
||||
let recentAvgUsage = bucket.getUsageAverageSince(distance)
|
||||
let compensation = bucket.calcCompensation(recentAvgUsage)
|
||||
|
||||
bucket.budget = bucket.budgetCap + compensation
|
||||
bucket.lastTimeFull = currentTime
|
||||
|
||||
proc update(bucket: TokenBucket, currentTime: Moment) =
|
||||
if bucket.replenishMode == ReplenishMode.Compensating:
|
||||
updateWithCompensation(bucket, currentTime)
|
||||
else:
|
||||
updateStrict(bucket, currentTime)
|
||||
|
||||
## Returns the available capacity of the bucket: (budget, budgetCap)
|
||||
proc getAvailableCapacity*(
|
||||
bucket: TokenBucket, currentTime: Moment
|
||||
): tuple[budget: int, budgetCap: int] =
|
||||
if periodElapsed(bucket, currentTime):
|
||||
case bucket.replenishMode
|
||||
of ReplenishMode.Strict:
|
||||
return (bucket.budgetCap, bucket.budgetCap)
|
||||
of ReplenishMode.Compensating:
|
||||
let distance = bucket.periodDistance(currentTime)
|
||||
let recentAvgUsage = bucket.getUsageAverageSince(distance)
|
||||
let compensation = bucket.calcCompensation(recentAvgUsage)
|
||||
let availableBudget = bucket.budgetCap + compensation
|
||||
return (availableBudget, bucket.budgetCap)
|
||||
return (bucket.budget, bucket.budgetCap)
|
||||
|
||||
proc tryConsume*(bucket: TokenBucket, tokens: int, now = Moment.now()): bool =
|
||||
## If `tokens` are available, consume them,
|
||||
## Otherwhise, return false.
|
||||
|
||||
if bucket.budget >= bucket.budgetCap:
|
||||
bucket.lastTimeFull = now
|
||||
|
||||
if bucket.budget >= tokens:
|
||||
bucket.budget -= tokens
|
||||
return true
|
||||
|
||||
bucket.update(now)
|
||||
|
||||
if bucket.budget >= tokens:
|
||||
bucket.budget -= tokens
|
||||
return true
|
||||
else:
|
||||
return false
|
||||
|
||||
proc replenish*(bucket: TokenBucket, tokens: int, now = Moment.now()) =
|
||||
## Add `tokens` to the budget (capped to the bucket capacity)
|
||||
bucket.budget += tokens
|
||||
bucket.update(now)
|
||||
|
||||
proc new*(
|
||||
T: type[TokenBucket],
|
||||
budgetCap: int,
|
||||
fillDuration: Duration = 1.seconds,
|
||||
mode: ReplenishMode = ReplenishMode.Compensating,
|
||||
): T =
|
||||
assert not isZero(fillDuration)
|
||||
assert budgetCap != 0
|
||||
|
||||
## Create different mode TokenBucket
|
||||
case mode
|
||||
of ReplenishMode.Strict:
|
||||
return T(
|
||||
budget: budgetCap,
|
||||
budgetCap: budgetCap,
|
||||
fillDuration: fillDuration,
|
||||
lastTimeFull: Moment.now(),
|
||||
replenishMode: mode,
|
||||
)
|
||||
of ReplenishMode.Compensating:
|
||||
T(
|
||||
budget: budgetCap,
|
||||
budgetCap: budgetCap,
|
||||
fillDuration: fillDuration,
|
||||
lastTimeFull: Moment.now(),
|
||||
replenishMode: mode,
|
||||
maxCompensation: budgetCap.float * BUDGET_COMPENSATION_LIMIT_PERCENT,
|
||||
)
|
||||
|
||||
proc newStrict*(T: type[TokenBucket], capacity: int, period: Duration): TokenBucket =
|
||||
T.new(capacity, period, ReplenishMode.Strict)
|
||||
|
||||
proc newCompensating*(
|
||||
T: type[TokenBucket], capacity: int, period: Duration
|
||||
): TokenBucket =
|
||||
T.new(capacity, period, ReplenishMode.Compensating)
|
||||
|
||||
func `$`*(b: TokenBucket): string {.inline.} =
|
||||
if isNil(b):
|
||||
return "nil"
|
||||
return $b.budgetCap & "/" & $b.fillDuration
|
||||
|
||||
func `$`*(ob: Option[TokenBucket]): string {.inline.} =
|
||||
if ob.isNone():
|
||||
return "no-limit"
|
||||
|
||||
return $ob.get()
|
||||
40
tests/test_sqlite_store.nim
Normal file
40
tests/test_sqlite_store.nim
Normal file
@ -0,0 +1,40 @@
|
||||
{.used.}
|
||||
|
||||
import testutils/unittests
|
||||
import ../ratelimit/store/sqlite
|
||||
import ../ratelimit/store/store
|
||||
import chronos
|
||||
|
||||
suite "SqliteRateLimitStore Tests":
|
||||
asyncTest "newSqliteRateLimitStore - creates store with default values":
|
||||
## Given & When
|
||||
let now = Moment.now()
|
||||
let store = newSqliteRateLimitStore()
|
||||
defer:
|
||||
store.close()
|
||||
|
||||
## Then
|
||||
let bucketState = await store.loadBucketState()
|
||||
check bucketState.budget == 10
|
||||
check bucketState.budgetCap == 10
|
||||
check bucketState.lastTimeFull.epochSeconds() == now.epochSeconds()
|
||||
|
||||
asyncTest "saveBucketState and loadBucketState - state persistence":
|
||||
## Given
|
||||
let now = Moment.now()
|
||||
let store = newSqliteRateLimitStore()
|
||||
defer:
|
||||
store.close()
|
||||
|
||||
let newBucketState = BucketState(budget: 5, budgetCap: 20, lastTimeFull: now)
|
||||
|
||||
## When
|
||||
let saveResult = await store.saveBucketState(newBucketState)
|
||||
let loadedState = await store.loadBucketState()
|
||||
|
||||
## Then
|
||||
check saveResult == true
|
||||
check loadedState.budget == newBucketState.budget
|
||||
check loadedState.budgetCap == newBucketState.budgetCap
|
||||
check loadedState.lastTimeFull.epochSeconds() ==
|
||||
newBucketState.lastTimeFull.epochSeconds()
|
||||
Loading…
x
Reference in New Issue
Block a user