diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index f69bcd2..2e2c41c 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -685,7 +685,11 @@ proc lookupWorker(d: Protocol, destNode: Node, target: NodeId, fast: bool): # Attempt to add all nodes discovered for n in result: if d.addNode(n): - d.trackedFutures.add(d.removeIfClientMode(n)) + let fut = d.removeIfClientMode(n) + fut.addCallback(proc(data: pointer) = + d.trackedFutures.remove(fut) + ) + d.trackedFutures.add(fut) proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]] {.async.} = ## Perform a lookup for the given target, return the closest n nodes to the @@ -971,9 +975,6 @@ proc populateTable*(d: Protocol) {.async.} = total = d.routingTable.len() proc revalidateNode*(d: Protocol, n: Node) {.async.} = - # Prune completed futures to avoid unbounded growth - d.trackedFutures.keepItIf(not it.finished) - let pong = await d.ping(n) if pong.isOk(): @@ -1252,8 +1253,6 @@ proc closeWait*(d: Protocol) {.async.} = if not d.ipMajorityLoop.isNil: await d.ipMajorityLoop.cancelAndWait() - for fut in d.trackedFutures: - if not fut.finished: - await fut.cancelAndWait() + d.trackedFutures.cancelTracked() await d.transport.closeWait() diff --git a/codexdht/private/eth/p2p/discoveryv5/transport.nim b/codexdht/private/eth/p2p/discoveryv5/transport.nim index d6ce8b0..afd4d06 100644 --- a/codexdht/private/eth/p2p/discoveryv5/transport.nim +++ b/codexdht/private/eth/p2p/discoveryv5/transport.nim @@ -238,7 +238,10 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) = # We keep adding the node in the line above in order to not break anything. # Then we remove the node if it using client mode. # The operation is async because the check is done over TalkProtocol. - t.client.trackedFutures.add(t.client.removeIfClientMode(node)) + let fut = t.client.removeIfClientMode(node) + fut.addCallback(proc(data: pointer) = + t.client.trackedFutures.remove(fut)) + t.client.trackedFutures.add(fut) discard t.sendPending(node) else: