clean up after peer is disconnected

This commit is contained in:
Diego 2024-01-17 17:38:25 +01:00
parent a649c3162b
commit aa29406c96
No known key found for this signature in database
GPG Key ID: C9DAC9BF68D1F806
3 changed files with 21 additions and 4 deletions

View File

@ -92,6 +92,7 @@ proc colocationFactor(g: GossipSub, peer: PubSubPeer): float64 =
proc disconnectPeer*(g: GossipSub, peer: PubSubPeer) {.async.} =
try:
await g.switch.disconnect(peer.peerId)
peer.stopProcessingMessages()
except CatchableError as exc: # Never cancelled
trace "Failed to close connection", peer, error = exc.name, msg = exc.msg

View File

@ -74,7 +74,8 @@ type
behaviourPenalty*: float64 # the eventual penalty score
overheadRateLimitOpt*: Opt[TokenBucket]
rpcmessagequeue*: RpcMessageQueue
rpcmessagequeue: RpcMessageQueue
queueProcessingTask: Future[void]
RPCHandler* = proc(peer: PubSubPeer, data: seq[byte]): Future[void]
{.gcsafe, raises: [].}
@ -346,9 +347,9 @@ proc getMessage(rpcMessageQueue: RpcMessageQueue, p: PubSubPeer): Opt[seq[byte]]
else:
return Opt.none(seq[byte])
proc processMessages(p: PubSubPeer, queue: RpcMessageQueue) {.async.} =
proc processMessages(p: PubSubPeer) {.async.} =
while true:
let m = queue.getMessage(p)
let m = p.rpcmessagequeue.getMessage(p)
m.withValue(msg):
if p.sendConn == nil:
# Wait for a send conn to be setup. `connectOnce` will
@ -376,6 +377,17 @@ proc processMessages(p: PubSubPeer, queue: RpcMessageQueue) {.async.} =
# Include a delay or a mechanism to prevent this loop from consuming too much CPU
await sleepAsync(10) # For example, a 10 ms sleep
proc startProcessingMessages(p: PubSubPeer) =
if p.queueProcessingTask.isNil:
p.queueProcessingTask = p.processMessages()
proc stopProcessingMessages*(p: PubSubPeer) =
if not p.queueProcessingTask.isNil:
p.queueProcessingTask.cancel()
p.rpcmessagequeue.clear()
libp2p_gossipsub_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
libp2p_gossipsub_non_priority_queue_size.set(labelValues = [$p.peerId], value = 0)
proc new*(
T: typedesc[PubSubPeer],
peerId: PeerId,
@ -397,4 +409,4 @@ proc new*(
)
result.sentIHaves.addFirst(default(HashSet[MessageId]))
result.heDontWants.addFirst(default(HashSet[MessageId]))
asyncSpawn result.processMessages(result.rpcmessagequeue)
result.startProcessingMessages()

View File

@ -48,3 +48,7 @@ proc getNonPriorityMessage*(rpcMessageQueue: RpcMessageQueue): Opt[seq[byte]] =
Opt.none(seq[byte])
else:
Opt.none(seq[byte])
proc clear*(rpcMessageQueue: RpcMessageQueue) =
rpcMessageQueue.priorityQueue.clear()
rpcMessageQueue.nonPriorityQueue.clear()