fix: serialize using flatty

pablo 2025-08-12 12:09:57 +03:00
parent 1039d379db
commit b50240942f
No known key found for this signature in database
GPG Key ID: 78F35FCC60FDC63A
5 changed files with 47 additions and 91 deletions
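In short: instead of the per-type `Serializable` concept with hand-written `toBytes`/`fromBytes` helpers, queued payloads are now flattened with flatty and base64-encoded before being written to the `msg_data` column, and decoded again on the way out. A minimal sketch of that round trip, assuming flatty's `toFlatty`/`fromFlatty` and std/base64's `encode`/`decode` as used in the diff below (the `ChatMessage` type is made up for illustration):

import std/base64
import flatty

type
  ChatMessage = object   # illustrative payload type, not part of this commit
    id: string
    body: string

let original = ChatMessage(id: "msg1", body: "Hello")

# Write path (pushToQueue): flatten to a compact binary string,
# then base64-encode it so SQLite can store it as plain text.
let stored = encode(original.toFlatty())

# Read path (popFromQueue): base64-decode, then rebuild the value.
let restored = decode(stored).fromFlatty(ChatMessage)
assert restored.id == original.id and restored.body == original.body

Keeping the stored value base64 text also removes the old char-by-char conversions between seq[byte] and string around the SQLite column.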

View File

@@ -7,9 +7,7 @@ license = "MIT"
 srcDir = "src"
 
 ### Dependencies
-requires "nim >= 2.2.4",
-  "chronicles", "chronos", "db_connector",
-  "https://github.com/waku-org/token_bucket.git"
+requires "nim >= 2.2.4", "chronicles", "chronos", "db_connector", "flatty"
 
 task buildSharedLib, "Build shared library for C bindings":
   exec "nim c --mm:refc --app:lib --out:../library/c-bindings/libchatsdk.so chat_sdk/chat_sdk.nim"

View File

@@ -23,18 +23,18 @@ type
     Normal
     Optional
 
-  MsgIdMsg[T: Serializable] = tuple[msgId: string, msg: T]
-  MessageSender*[T: Serializable] = proc(msgs: seq[MsgIdMsg[T]]) {.async.}
-  RateLimitManager*[T: Serializable] = ref object
+  MsgIdMsg[T] = tuple[msgId: string, msg: T]
+  MessageSender*[T] = proc(msgs: seq[MsgIdMsg[T]]) {.async.}
+  RateLimitManager*[T] = ref object
     store: RateLimitStore[T]
     bucket: TokenBucket
     sender: MessageSender[T]
     sleepDuration: chronos.Duration
     pxQueueHandleLoop: Future[void]
 
-proc new*[T: Serializable](
+proc new*[T](
     M: type[RateLimitManager[T]],
     store: RateLimitStore[T],
     sender: MessageSender[T],
@@ -63,7 +63,7 @@ proc new*[T: Serializable](
     sleepDuration: sleepDuration,
   )
 
-proc getCapacityState[T: Serializable](
+proc getCapacityState[T](
     manager: RateLimitManager[T], now: Moment, count: int = 1
 ): CapacityState =
   let (budget, budgetCap, _) = manager.bucket.getAvailableCapacity(now)
@@ -76,7 +76,7 @@ proc getCapacityState[T: Serializable](
   else:
     return CapacityState.Normal
 
-proc passToSender[T: Serializable](
+proc passToSender[T](
     manager: RateLimitManager[T],
     msgs: seq[tuple[msgId: string, msg: T]],
     now: Moment,
@@ -87,10 +87,10 @@ proc passToSender[T: Serializable](
   if not consumed:
     case priority
     of Priority.Critical:
-      discard await manager.store.addToQueue(QueueType.Critical, msgs)
+      discard await manager.store.pushToQueue(QueueType.Critical, msgs)
       return SendResult.Enqueued
     of Priority.Normal:
-      discard await manager.store.addToQueue(QueueType.Normal, msgs)
+      discard await manager.store.pushToQueue(QueueType.Normal, msgs)
       return SendResult.Enqueued
     of Priority.Optional:
       return SendResult.Dropped
@@ -102,7 +102,7 @@ proc passToSender[T: Serializable](
   await manager.sender(msgs)
   return SendResult.PassedToSender
 
-proc processCriticalQueue[T: Serializable](
+proc processCriticalQueue[T](
     manager: RateLimitManager[T], now: Moment
 ): Future[void] {.async.} =
   while manager.store.getQueueLength(QueueType.Critical) > 0:
@@ -119,10 +119,10 @@ proc processCriticalQueue[T: Serializable](
       discard await manager.passToSender(msgs, now, Priority.Critical)
     else:
       # Put back to critical queue (add to front not possible, so we add to back and exit)
-      discard await manager.store.addToQueue(QueueType.Critical, msgs)
+      discard await manager.store.pushToQueue(QueueType.Critical, msgs)
       break
 
-proc processNormalQueue[T: Serializable](
+proc processNormalQueue[T](
     manager: RateLimitManager[T], now: Moment
 ): Future[void] {.async.} =
   while manager.store.getQueueLength(QueueType.Normal) > 0:
@@ -137,10 +137,10 @@ proc processNormalQueue[T: Serializable](
       discard await manager.passToSender(msgs, now, Priority.Normal)
     else:
       # Put back to normal queue (add to front not possible, so we add to back and exit)
-      discard await manager.store.addToQueue(QueueType.Normal, msgs)
+      discard await manager.store.pushToQueue(QueueType.Normal, msgs)
       break
 
-proc sendOrEnqueue*[T: Serializable](
+proc sendOrEnqueue*[T](
     manager: RateLimitManager[T],
     msgs: seq[tuple[msgId: string, msg: T]],
     priority: Priority,
@@ -160,22 +160,22 @@ proc sendOrEnqueue*[T: Serializable](
     of Priority.Critical:
       return await manager.passToSender(msgs, now, priority)
     of Priority.Normal:
-      discard await manager.store.addToQueue(QueueType.Normal, msgs)
+      discard await manager.store.pushToQueue(QueueType.Normal, msgs)
       return SendResult.Enqueued
     of Priority.Optional:
       return SendResult.Dropped
   of CapacityState.None:
     case priority
     of Priority.Critical:
-      discard await manager.store.addToQueue(QueueType.Critical, msgs)
+      discard await manager.store.pushToQueue(QueueType.Critical, msgs)
       return SendResult.Enqueued
     of Priority.Normal:
-      discard await manager.store.addToQueue(QueueType.Normal, msgs)
+      discard await manager.store.pushToQueue(QueueType.Normal, msgs)
       return SendResult.Enqueued
     of Priority.Optional:
       return SendResult.Dropped
 
-proc queueHandleLoop*[T: Serializable](
+proc queueHandleLoop*[T](
     manager: RateLimitManager[T],
     nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
       Moment.now(),
@@ -191,18 +191,18 @@ proc queueHandleLoop*[T: Serializable](
     # configurable sleep duration for processing queued messages
     await sleepAsync(manager.sleepDuration)
 
-proc start*[T: Serializable](
+proc start*[T](
     manager: RateLimitManager[T],
     nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
       Moment.now(),
 ) {.async.} =
   manager.pxQueueHandleLoop = queueHandleLoop(manager, nowProvider)
 
-proc stop*[T: Serializable](manager: RateLimitManager[T]) {.async.} =
+proc stop*[T](manager: RateLimitManager[T]) {.async.} =
   if not isNil(manager.pxQueueHandleLoop):
     await manager.pxQueueHandleLoop.cancelAndWait()
 
-func `$`*[T: Serializable](b: RateLimitManager[T]): string {.inline.} =
+func `$`*[T](b: RateLimitManager[T]): string {.inline.} =
   if isNil(b):
     return "nil"
   return

View File

@@ -1,20 +1,10 @@
-import std/[times, strutils, json, options]
+import std/[times, strutils, json, options, base64]
 import db_connector/db_sqlite
 import chronos
+import flatty
 
-# Generic deserialization function for basic types
-proc fromBytesImpl(bytes: seq[byte], T: typedesc[string]): string =
-  # Convert each byte back to a character
-  result = newString(bytes.len)
-  for i, b in bytes:
-    result[i] = char(b)
-
 type
-  Serializable* =
-    concept x
-      x.toBytes() is seq[byte]
-
-  RateLimitStore*[T: Serializable] = ref object
+  RateLimitStore*[T] = ref object
     db: DbConn
     dbPath: string
     criticalLength: int
@@ -32,7 +22,7 @@ type
 const BUCKET_STATE_KEY = "rate_limit_bucket_state"
 
-proc new*[T: Serializable](M: type[RateLimitStore[T]], db: DbConn): M =
+proc new*[T](M: type[RateLimitStore[T]], db: DbConn): M =
   result = M(db: db, criticalLength: 0, normalLength: 0, nextBatchId: 1)
 
   # Initialize cached lengths from database
@@ -66,7 +56,7 @@ proc new*[T: Serializable](M: type[RateLimitStore[T]], db: DbConn): M =
   return result
 
-proc saveBucketState*[T: Serializable](
+proc saveBucketState*[T](
     store: RateLimitStore[T], bucketState: BucketState
 ): Future[bool] {.async.} =
   try:
@@ -88,7 +78,7 @@ proc saveBucketState*[T: Serializable](
   except:
     return false
 
-proc loadBucketState*[T: Serializable](
+proc loadBucketState*[T](
     store: RateLimitStore[T]
 ): Future[Option[BucketState]] {.async.} =
   let jsonStr =
@@ -108,7 +98,7 @@ proc loadBucketState*[T: Serializable](
     )
   )
 
-proc pushToQueue*[T: Serializable](
+proc pushToQueue*[T](
     store: RateLimitStore[T],
     queueType: QueueType,
     msgs: seq[tuple[msgId: string, msg: T]],
@@ -123,18 +113,13 @@ proc pushToQueue*[T: Serializable](
   store.db.exec(sql"BEGIN TRANSACTION")
   try:
     for msg in msgs:
-      # Consistent serialization for all types
-      let msgBytes = msg.msg.toBytes()
-      # Convert seq[byte] to string for SQLite storage (each byte becomes a character)
-      var binaryStr = newString(msgBytes.len)
-      for i, b in msgBytes:
-        binaryStr[i] = char(b)
+      let serialized = msg.msg.toFlatty()
+      let msgData = encode(serialized)
       store.db.exec(
         sql"INSERT INTO ratelimit_queues (queue_type, msg_id, msg_data, batch_id, created_at) VALUES (?, ?, ?, ?, ?)",
         queueTypeStr,
         msg.msgId,
-        binaryStr,
+        msgData,
         batchId,
         now,
       )
@@ -153,7 +138,7 @@ proc pushToQueue*[T: Serializable](
   except:
     return false
 
-proc popFromQueue*[T: Serializable](
+proc popFromQueue*[T](
     store: RateLimitStore[T], queueType: QueueType
 ): Future[Option[seq[tuple[msgId: string, msg: T]]]] {.async.} =
   try:
@@ -182,19 +167,10 @@ proc popFromQueue*[T: Serializable](
     var msgs: seq[tuple[msgId: string, msg: T]]
     for row in rows:
       let msgIdStr = row[0]
-      let msgData = row[1] # SQLite returns BLOB as string where each char is a byte
-      # Convert string back to seq[byte] properly (each char in string is a byte)
-      var msgBytes: seq[byte]
-      for c in msgData:
-        msgBytes.add(byte(c))
-
-      # Generic deserialization - works for any type that implements fromBytes
-      when T is string:
-        let msg = fromBytesImpl(msgBytes, T)
-        msgs.add((msgId: msgIdStr, msg: msg))
-      else:
-        # For other types, they need to provide their own fromBytes in the calling context
-        let msg = fromBytes(msgBytes, T)
-        msgs.add((msgId: msgIdStr, msg: msg))
+      let msgDataB64 = row[1]
+      let serialized = decode(msgDataB64)
+      let msg = serialized.fromFlatty(T)
+      msgs.add((msgId: msgIdStr, msg: msg))
 
     # Delete the batch from database
@@ -214,9 +190,7 @@ proc popFromQueue*[T: Serializable](
   except:
     return none(seq[tuple[msgId: string, msg: T]])
 
-proc getQueueLength*[T: Serializable](
-    store: RateLimitStore[T], queueType: QueueType
-): int =
+proc getQueueLength*[T](store: RateLimitStore[T], queueType: QueueType): int =
   case queueType
   of QueueType.Critical:
     return store.criticalLength

View File

@@ -6,10 +6,6 @@ import db_connector/db_sqlite
 import ../chat_sdk/migration
 import std/[os, options]
 
-# Implement the Serializable concept for string
-proc toBytes*(s: string): seq[byte] =
-  cast[seq[byte]](s)
-
 var dbName = "test_ratelimit_manager.db"
 
 suite "Queue RateLimitManager":

View File

@@ -3,23 +3,11 @@ import ../ratelimit/store
 import chronos
 import db_connector/db_sqlite
 import ../chat_sdk/migration
-import std/[options, os]
+import std/[options, os, json]
+import flatty
 
 const dbName = "test_store.db"
 
-# Implement the Serializable concept for string (for testing)
-proc toBytes*(s: string): seq[byte] =
-  # Convert each character to a byte
-  result = newSeq[byte](s.len)
-  for i, c in s:
-    result[i] = byte(c)
-
-proc fromBytes*(bytes: seq[byte], T: typedesc[string]): string =
-  # Convert each byte back to a character
-  result = newString(bytes.len)
-  for i, b in bytes:
-    result[i] = char(b)
-
 suite "SqliteRateLimitStore Tests":
   setup:
     let db = open(dbName, "", "", "")
@@ -81,7 +69,7 @@ suite "SqliteRateLimitStore Tests":
     let msgs = @[("msg1", "Hello"), ("msg2", "World")]
 
     ## When
-    let addResult = await store.addToQueue(QueueType.Critical, msgs)
+    let addResult = await store.pushToQueue(QueueType.Critical, msgs)
 
     ## Then
     check addResult == true
@@ -110,11 +98,11 @@ suite "SqliteRateLimitStore Tests":
     let batch3 = @[("msg3", "Third")]
 
     ## When - Add batches
-    let result1 = await store.addToQueue(QueueType.Normal, batch1)
+    let result1 = await store.pushToQueue(QueueType.Normal, batch1)
     check result1 == true
-    let result2 = await store.addToQueue(QueueType.Normal, batch2)
+    let result2 = await store.pushToQueue(QueueType.Normal, batch2)
     check result2 == true
-    let result3 = await store.addToQueue(QueueType.Normal, batch3)
+    let result3 = await store.pushToQueue(QueueType.Normal, batch3)
     check result3 == true
 
     ## Then - Check lengths
@@ -147,9 +135,9 @@ suite "SqliteRateLimitStore Tests":
     let normalMsgs = @[("norm1", "Normal Message")]
 
     ## When
-    let critResult = await store.addToQueue(QueueType.Critical, criticalMsgs)
+    let critResult = await store.pushToQueue(QueueType.Critical, criticalMsgs)
     check critResult == true
-    let normResult = await store.addToQueue(QueueType.Normal, normalMsgs)
+    let normResult = await store.pushToQueue(QueueType.Normal, normalMsgs)
     check normResult == true
 
     ## Then
@@ -180,7 +168,7 @@ suite "SqliteRateLimitStore Tests":
     block:
       let store1 = RateLimitStore[string].new(db)
-      let addResult = await store1.addToQueue(QueueType.Critical, msgs)
+      let addResult = await store1.pushToQueue(QueueType.Critical, msgs)
       check addResult == true
       check store1.getQueueLength(QueueType.Critical) == 1
@@ -205,7 +193,7 @@ suite "SqliteRateLimitStore Tests":
       largeBatch.add(("msg" & $i, "Message " & $i))
 
     ## When
-    let addResult = await store.addToQueue(QueueType.Normal, largeBatch)
+    let addResult = await store.pushToQueue(QueueType.Normal, largeBatch)
 
     ## Then
     check addResult == true
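
Note on the dropped generic constraint: because flatty derives the flat encoding from the type itself, the `[T: Serializable]` bound and the per-type `toBytes`/`fromBytes` helpers removed above are no longer needed, so any plain Nim object should work as a queue payload. A hedged usage sketch under that assumption (the `Receipt` type and `demo.db` path are hypothetical, and table creation via the migration module is omitted):

import chronos
import db_connector/db_sqlite
import ../ratelimit/store   # module path as used in the tests above

type
  Receipt = object   # hypothetical payload type, not part of this commit
    msgId: string
    deliveredAt: int64

proc demo() {.async.} =
  # Assumes the ratelimit_queues table already exists (see ../chat_sdk/migration).
  let db = open("demo.db", "", "", "")
  let store = RateLimitStore[Receipt].new(db)   # no `Serializable` bound required anymore
  let msgs = @[(msgId: "r1", msg: Receipt(msgId: "m1", deliveredAt: 123))]
  discard await store.pushToQueue(QueueType.Normal, msgs)

waitFor demo()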