diff --git a/libp2p/protocols/pubsub/gossipsub/scoring.nim b/libp2p/protocols/pubsub/gossipsub/scoring.nim index ee4f34da0..391535726 100644 --- a/libp2p/protocols/pubsub/gossipsub/scoring.nim +++ b/libp2p/protocols/pubsub/gossipsub/scoring.nim @@ -92,6 +92,7 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 = proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} = try: await g.switch.disconnect(peer.peerId) + peer.stopProcessingMessages() except CatchableError as exc: # Never cancelled trace "Failed to close connection", peer, error = exc.name, msg = exc.msg diff --git a/libp2p/protocols/pubsub/pubsubpeer.nim b/libp2p/protocols/pubsub/pubsubpeer.nim index b92c05dff..ed67709bd 100644 --- a/libp2p/protocols/pubsub/pubsubpeer.nim +++ b/libp2p/protocols/pubsub/pubsubpeer.nim @@ -74,7 +74,8 @@ type behaviourPenalty*: float64 # the eventual penalty score overheadRateLimitOpt*: Opt[TokenBucket] - rpcmessagequeue*: RpcMessageQueue + rpcmessagequeue: RpcMessageQueue + queueProcessingTask: Future[void] RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void] {.gcsafe, raises: [].} @@ -346,9 +347,9 @@ proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Opt[seq[byte]] else: return Opt.none(seq[byte]) -proc processMessages(p: PubSubPeer, queue: RpcMessageQueue) {.async.} = +proc processMessages(p: PubSubPeer) {.async.} = while true: - let m = queue.getMessage(p) + let m = p.rpcmessagequeue.getMessage(p) m.withValue(msg): if p.sendConn == nil: # Wait for a send conn to be setup. `connectOnce` will @@ -376,6 +377,17 @@ proc processMessages(p: PubSubPeer, queue: RpcMessageQueue) {.async.} = # Include a delay or a mechanism to prevent this loop from consuming too much CPU await sleepAsync(10) # For example, a 10 ms sleep +proc startProcessingMessages(p: PubSubPeer) = + if p.queueProcessingTask.isNil: + p.queueProcessingTask = p.processMessages() + +proc stopProcessingMessages*(p: PubSubPeer) = + if not p.queueProcessingTask.isNil: + p.queueProcessingTask.cancel() + p.rpcmessagequeue.clear() + libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0) + libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0) + proc new*( T: typedesc[PubSubPeer], peerId: PeerId, @@ -397,4 +409,4 @@ proc new*( ) result.sentIHaves.addFirst(default(HashSet[MessageId])) result.heDontWants.addFirst(default(HashSet[MessageId])) - asyncSpawn result.processMessages(result.rpcmessagequeue) + result.startProcessingMessages() diff --git a/libp2p/protocols/pubsub/rpcmessagequeue.nim b/libp2p/protocols/pubsub/rpcmessagequeue.nim index 3abb492fa..f90f7e315 100644 --- a/libp2p/protocols/pubsub/rpcmessagequeue.nim +++ b/libp2p/protocols/pubsub/rpcmessagequeue.nim @@ -48,3 +48,7 @@ proc getNonPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Opt[seq[byte]] = Opt.none(seq[byte]) else: Opt.none(seq[byte]) + +proc clear*(rpcMessageQueue: RpcMessageQueue) = + rpcMessageQueue.priorityQueue.clear() + rpcMessageQueue.nonPriorityQueue.clear() \ No newline at end of file