diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index 687243e6f..c2f23b2d9 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -52,6 +52,10 @@ type DropConn* = proc(peer: PubSubPeer) {.gcsafe, raises: [].} # have to pass peer as it's unknown during init OnEvent* = proc(peer: PubSubPeer, event: PubSubPeerEvent) {.gcsafe, raises: [].} + Ttlmessage* = object + msg*: seq[byte] + ttl*: Moment + RpcMessageQueue* = ref object # Tracks async tasks for sending high-priority peer-published messages. sendPriorityQueue: Deque[Future[void]] @@ -59,6 +63,8 @@ type nonPriorityQueue: AsyncQueue[seq[byte]] # Task for processing non-priority message queue. sendNonPriorityTask: Future[void] + # The max duration a message to be relayed can wait to be sent before it is dropped. The default is 500ms. + maxDurationInNonPriorityQueue: Duration PubSubPeer* = ref object of RootObj getConn*: GetConn # callback to establish a new send connection @@ -300,7 +306,7 @@ proc sendEncoded*(p: PubSubPeer, msg: seq[byte], isHighPriority: bool = false) { when defined(libp2p_expensive_metrics): libp2p_gossipsub_priority_queue_size.inc(labelValues = [$p.peerId]) else: - await p.rpcmessagequeue.nonPriorityQueue.addLast(msg) + await p.rpcmessagequeue.nonPriorityQueue.addLast(Ttlmessage(msg: msg, ttl: Moment.now())) when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.inc(labelValues = [$p.peerId]) trace "message queued", p, msg = shortLog(msg) @@ -386,7 +392,9 @@ proc sendNonPriorityTask(p: PubSubPeer) {.async.} = await p.rpcmessagequeue.sendPriorityQueue[^1] when defined(libp2p_expensive_metrics): libp2p_gossipsub_non_priority_queue_size.dec(labelValues = [$p.peerId]) - await p.sendMsg(msg) + if Moment.now() - ttlMsg.ttl >= p.rpcmessagequeue.maxDurationInNonPriorityQueue: + continue + await p.sendMsg(ttlMsg.msg) proc startSendNonPriorityTask(p: PubSubPeer) = debug "starting sendNonPriorityTask", p @@ -404,10 +412,11 @@ proc stopSendNonPriorityTask*(p: PubSubPeer) = libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) -proc new(T: typedesc[RpcMessageQueue]): T = +proc new(T: typedesc[RpcMessageQueue], maxDurationInNonPriorityQueue = 500.milliseconds): T = return T( sendPriorityQueue: initDeque[Future[void]](), - nonPriorityQueue: newAsyncQueue[seq[byte]](), + nonPriorityQueue: newAsyncQueue[Ttlmessage](), + maxDurationInNonPriorityQueue: maxDurationInNonPriorityQueue, ) proc new*(