From 6ec942a195d3e3fcdff5e5fa39a458f06f4028aa Mon Sep 17 00:00:00 2001 From: kdeme Date: Sun, 22 Mar 2020 22:14:10 +0100 Subject: [PATCH] discv5: quick fix for leaking pending requests --- eth/p2p/discoveryv5/protocol.nim | 48 +++++++++++++++++++++----------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index d7dc0ae..80ec19d 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -287,13 +287,27 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): Future[seq[Node]] else: break -proc findNode*(d: Protocol, toNode: Node, distance: uint32): Future[seq[Node]] {.async.} = +# TODO: This could be improved to do the clean-up immediatily in case a non +# whoareyou response does arrive, but we would need to store the AuthTag +# somewhere +proc registerRequest(d: Protocol, n: Node, packet: seq[byte], nonce: AuthTag) = + let request = PendingRequest(node: n, packet: packet) + if not d.pendingRequests.hasKeyOrPut(nonce, request): + sleepAsync(responseTimeout).addCallback() do(data: pointer): + d.pendingRequests.del(nonce) + +proc sendFindNode(d: Protocol, toNode: Node, distance: uint32): RequestId = let reqId = newRequestId() let packet = encodePacket(FindNodePacket(distance: distance), reqId) let (data, nonce) = d.codec.encodeEncrypted(toNode.id, toNode.address, packet, challenge = nil) - d.pendingRequests[nonce] = PendingRequest(node: toNode, packet: packet) + d.registerRequest(toNode, packet, nonce) + d.send(toNode, data) + return reqId + +proc findNode*(d: Protocol, toNode: Node, distance: uint32): Future[seq[Node]] {.async.} = + let reqId = sendFindNode(d, toNode, distance) result = await d.waitNodes(toNode, reqId) proc lookupDistances(target, dest: NodeId): seq[uint32] = @@ -380,56 +394,56 @@ proc processClient(transp: DatagramTransport, debug "Receive failed", exception = e.name, msg = e.msg, stacktrace = e.getStackTrace() -proc ping(p: Protocol, toNode: Node): RequestId = +proc sendPing(d: Protocol, toNode: Node): RequestId = let reqId = newRequestId() - ping = PingPacket(enrSeq: p.localNode.record.seqNum) + ping = PingPacket(enrSeq: d.localNode.record.seqNum) packet = encodePacket(ping, reqId) - (data, nonce) = p.codec.encodeEncrypted(toNode.id, toNode.address, packet, + (data, nonce) = d.codec.encodeEncrypted(toNode.id, toNode.address, packet, challenge = nil) - p.pendingRequests[nonce] = PendingRequest(node: toNode, packet: packet) - p.send(toNode, data) + d.registerRequest(toNode, packet, nonce) + d.send(toNode, data) return reqId -proc revalidateNode(p: Protocol, n: Node) +proc revalidateNode(d: Protocol, n: Node) {.async, raises:[Defect, Exception].} = # TODO: Exception trace "Ping to revalidate node", node = $n - let reqId = p.ping(n) + let reqId = d.sendPing(n) - let resp = await p.waitPacket(n, reqId) + let resp = await d.waitPacket(n, reqId) if resp.isSome and resp.get.kind == pong: let pong = resp.get.pong if pong.enrSeq > n.record.seqNum: # TODO: Request new ENR discard - p.routingTable.setJustSeen(n) + d.routingTable.setJustSeen(n) trace "Revalidated node", node = $n else: # For now we never remove bootstrap nodes. It might make sense to actually # do so and to retry them only in case we drop to a really low amount of # peers in the DHT - if n notin p.bootstrapNodes: + if n notin d.bootstrapNodes: trace "Revalidation of node failed, removing node", node = $n - p.routingTable.removeNode(n) + d.routingTable.removeNode(n) # Remove shared secrets when removing the node from routing table. # This might be to direct, so we could keep these longer. But better # would be to simply not remove the nodes immediatly but only after x # amount of failures. - discard p.codec.db.deleteKeys(n.id, n.address) + discard d.codec.db.deleteKeys(n.id, n.address) -proc revalidateLoop(p: Protocol) {.async.} = +proc revalidateLoop(d: Protocol) {.async.} = try: # TODO: We need to handle actual errors still, which might just allow to # continue the loop. However, currently `revalidateNode` raises a general # `Exception` making this rather hard. while true: await sleepAsync(rand(10 * 1000).milliseconds) - let n = p.routingTable.nodeToRevalidate() + let n = d.routingTable.nodeToRevalidate() if not n.isNil: # TODO: Should we do these in parallel and/or async to be certain of how # often nodes are revalidated? - await p.revalidateNode(n) + await d.revalidateNode(n) except CancelledError: trace "revalidateLoop canceled"