Apply suggestions from code review

Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
This commit is contained in:
Pablo Lopez 2025-07-16 16:10:23 +03:00 committed by GitHub
parent f8e2ace1df
commit b9e80bcc37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -4,10 +4,10 @@ import waku/common/rate_limit/token_bucket
import chronos import chronos
type type
CapacityState = enum CapacityState {.pure.} = enum
Normal = 0 Normal
AlmostNone = 1 AlmostNone
None = 2 None
SendResult* = enum SendResult* = enum
PassedToSender = 0 PassedToSender = 0
@ -19,23 +19,23 @@ type
Critical = 0 Critical = 0
Normal = 1 Normal = 1
Optional = 2 Optional = 2
MsgIdMsg = tuple[msgId: string, msg: T]
Serializable* = Serializable* =
concept x concept x
x.toBytes() is seq[byte] x.toBytes() is seq[byte]
MessageSender*[T: Serializable] = MessageSender*[T: Serializable] =
proc(msgs: seq[tuple[msgId: string, msg: T]]): Future[void] {.async.} proc(msgs: seq[tuple[msgId: string, msg: T]]) {.async.}
RateLimitManager*[T: Serializable] = ref object RateLimitManager*[T: Serializable] = ref object
bucket: TokenBucket bucket: TokenBucket
sender: MessageSender[T] sender: MessageSender[T]
running: bool running: bool
queueCritical: Deque[seq[tuple[msgId: string, msg: T]]] queueCritical: Deque[tuple[msgId: string, msg: T]]
queueNormal: Deque[seq[tuple[msgId: string, msg: T]]] queueNormal: Deque[seq[tuple[msgId: string, msg: T]]]
sleepDuration: chronos.Duration sleepDuration: chronos.Duration
proc newRateLimitManager*[T: Serializable]( proc new*[T: Serializable](
sender: MessageSender[T], sender: MessageSender[T],
capacity: int = 100, capacity: int = 100,
duration: chronos.Duration = chronos.minutes(10), duration: chronos.Duration = chronos.minutes(10),
@ -167,19 +167,19 @@ proc start*[T: Serializable](
manager: RateLimitManager[T], manager: RateLimitManager[T],
nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} = nowProvider: proc(): Moment {.gcsafe.} = proc(): Moment {.gcsafe.} =
Moment.now(), Moment.now(),
): Future[void] {.async.} = ) {.async.} =
manager.running = true manager.running = true
while manager.running: while true:
try: try:
let now = nowProvider() let now = nowProvider()
await manager.processCriticalQueue(now) await manager.processCriticalQueue(now)
await manager.processNormalQueue(now) await manager.processNormalQueue(now)
# configurable sleep duration for processing queued messages # configurable sleep duration for processing queued messages
await sleepAsync(manager.sleepDuration)
except Exception as e: except Exception as e:
echo "Error in queue processing: ", e.msg echo "Error in queue processing: ", e.msg
await sleepAsync(manager.sleepDuration)
await sleepAsync(manager.sleepDuration)
proc stop*[T: Serializable](manager: RateLimitManager[T]) = proc stop*[T: Serializable](manager: RateLimitManager[T]) =
manager.running = false manager.running = false