drop old msgs to be relayed

This commit is contained in:
Diego 2024-01-15 18:40:14 +01:00
parent b16ec00327
commit a10b8af737
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806

View File

@ -52,6 +52,10 @@ type
DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init
OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].}
Ttlmessage* = object
msg*: seq[byte]
ttl*: Moment
RpcMessageQueue* = ref object
# Tracks async tasks for sending high-priority peer-published messages.
sendPriorityQueue: Deque[Future[void]]
@ -59,6 +63,8 @@ type
nonPriorityQueue: AsyncQueue[seq[byte]]
# Task for processing non-priority message queue.
sendNonPriorityTask: Future[void]
# The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms.
maxDurationInNonPriorityQueue: Duration
PubSubPeer* = ref object of RootObj
getConn*: GetConn # callback to establish a new send connection
@ -300,7 +306,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) {
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId])
else:
await p.rpcmessagequeue.nonPriorityQueue.addLast(msg)
await p.rpcmessagequeue.nonPriorityQueue.addLast(Ttlmessage(msg: msg, ttl: Moment.now()))
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId])
trace "message queued", p, msg = shortLog(msg)
@ -386,7 +392,9 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} =
await p.rpcmessagequeue.sendPriorityQueue[^1]
when defined(libp2p_expensive_metrics):
libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId])
await p.sendMsg(msg)
if Moment.now() - ttlMsg.ttl >= p.rpcmessagequeue.maxDurationInNonPriorityQueue:
continue
await p.sendMsg(ttlMsg.msg)
proc startSendNonPriorityTask(p: PubSubPeer) =
debug "starting sendNonPriorityTask", p
@ -404,10 +412,11 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) =
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
proc new(T: typedesc[RpcMessageQueue]): T =
proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 500.milliseconds): T =
return T(
sendPriorityQueue: initDeque[Future[void]](),
nonPriorityQueue: newAsyncQueue[seq[byte]](),
nonPriorityQueue: newAsyncQueue[Ttlmessage](),
maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue,
)
proc new*(