mirror of https://github.com/status-im/nim-eth.git
discv5: quick fix for leaking pending requests
This commit is contained in:
parent
713f2e3bff
commit
6ec942a195
|
@ -287,13 +287,27 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): Future[seq[Node]]
|
||||||
else:
|
else:
|
||||||
break
|
break
|
||||||
|
|
||||||
proc findNode*(d: Protocol, toNode: Node, distance: uint32): Future[seq[Node]] {.async.} =
|
# TODO: This could be improved to do the clean-up immediatily in case a non
|
||||||
|
# whoareyou response does arrive, but we would need to store the AuthTag
|
||||||
|
# somewhere
|
||||||
|
proc registerRequest(d: Protocol, n: Node, packet: seq[byte], nonce: AuthTag) =
|
||||||
|
let request = PendingRequest(node: n, packet: packet)
|
||||||
|
if not d.pendingRequests.hasKeyOrPut(nonce, request):
|
||||||
|
sleepAsync(responseTimeout).addCallback() do(data: pointer):
|
||||||
|
d.pendingRequests.del(nonce)
|
||||||
|
|
||||||
|
proc sendFindNode(d: Protocol, toNode: Node, distance: uint32): RequestId =
|
||||||
let reqId = newRequestId()
|
let reqId = newRequestId()
|
||||||
let packet = encodePacket(FindNodePacket(distance: distance), reqId)
|
let packet = encodePacket(FindNodePacket(distance: distance), reqId)
|
||||||
let (data, nonce) = d.codec.encodeEncrypted(toNode.id, toNode.address, packet,
|
let (data, nonce) = d.codec.encodeEncrypted(toNode.id, toNode.address, packet,
|
||||||
challenge = nil)
|
challenge = nil)
|
||||||
d.pendingRequests[nonce] = PendingRequest(node: toNode, packet: packet)
|
d.registerRequest(toNode, packet, nonce)
|
||||||
|
|
||||||
d.send(toNode, data)
|
d.send(toNode, data)
|
||||||
|
return reqId
|
||||||
|
|
||||||
|
proc findNode*(d: Protocol, toNode: Node, distance: uint32): Future[seq[Node]] {.async.} =
|
||||||
|
let reqId = sendFindNode(d, toNode, distance)
|
||||||
result = await d.waitNodes(toNode, reqId)
|
result = await d.waitNodes(toNode, reqId)
|
||||||
|
|
||||||
proc lookupDistances(target, dest: NodeId): seq[uint32] =
|
proc lookupDistances(target, dest: NodeId): seq[uint32] =
|
||||||
|
@ -380,56 +394,56 @@ proc processClient(transp: DatagramTransport,
|
||||||
debug "Receive failed", exception = e.name, msg = e.msg,
|
debug "Receive failed", exception = e.name, msg = e.msg,
|
||||||
stacktrace = e.getStackTrace()
|
stacktrace = e.getStackTrace()
|
||||||
|
|
||||||
proc ping(p: Protocol, toNode: Node): RequestId =
|
proc sendPing(d: Protocol, toNode: Node): RequestId =
|
||||||
let
|
let
|
||||||
reqId = newRequestId()
|
reqId = newRequestId()
|
||||||
ping = PingPacket(enrSeq: p.localNode.record.seqNum)
|
ping = PingPacket(enrSeq: d.localNode.record.seqNum)
|
||||||
packet = encodePacket(ping, reqId)
|
packet = encodePacket(ping, reqId)
|
||||||
(data, nonce) = p.codec.encodeEncrypted(toNode.id, toNode.address, packet,
|
(data, nonce) = d.codec.encodeEncrypted(toNode.id, toNode.address, packet,
|
||||||
challenge = nil)
|
challenge = nil)
|
||||||
p.pendingRequests[nonce] = PendingRequest(node: toNode, packet: packet)
|
d.registerRequest(toNode, packet, nonce)
|
||||||
p.send(toNode, data)
|
d.send(toNode, data)
|
||||||
return reqId
|
return reqId
|
||||||
|
|
||||||
proc revalidateNode(p: Protocol, n: Node)
|
proc revalidateNode(d: Protocol, n: Node)
|
||||||
{.async, raises:[Defect, Exception].} = # TODO: Exception
|
{.async, raises:[Defect, Exception].} = # TODO: Exception
|
||||||
trace "Ping to revalidate node", node = $n
|
trace "Ping to revalidate node", node = $n
|
||||||
let reqId = p.ping(n)
|
let reqId = d.sendPing(n)
|
||||||
|
|
||||||
let resp = await p.waitPacket(n, reqId)
|
let resp = await d.waitPacket(n, reqId)
|
||||||
if resp.isSome and resp.get.kind == pong:
|
if resp.isSome and resp.get.kind == pong:
|
||||||
let pong = resp.get.pong
|
let pong = resp.get.pong
|
||||||
if pong.enrSeq > n.record.seqNum:
|
if pong.enrSeq > n.record.seqNum:
|
||||||
# TODO: Request new ENR
|
# TODO: Request new ENR
|
||||||
discard
|
discard
|
||||||
|
|
||||||
p.routingTable.setJustSeen(n)
|
d.routingTable.setJustSeen(n)
|
||||||
trace "Revalidated node", node = $n
|
trace "Revalidated node", node = $n
|
||||||
else:
|
else:
|
||||||
# For now we never remove bootstrap nodes. It might make sense to actually
|
# For now we never remove bootstrap nodes. It might make sense to actually
|
||||||
# do so and to retry them only in case we drop to a really low amount of
|
# do so and to retry them only in case we drop to a really low amount of
|
||||||
# peers in the DHT
|
# peers in the DHT
|
||||||
if n notin p.bootstrapNodes:
|
if n notin d.bootstrapNodes:
|
||||||
trace "Revalidation of node failed, removing node", node = $n
|
trace "Revalidation of node failed, removing node", node = $n
|
||||||
p.routingTable.removeNode(n)
|
d.routingTable.removeNode(n)
|
||||||
# Remove shared secrets when removing the node from routing table.
|
# Remove shared secrets when removing the node from routing table.
|
||||||
# This might be to direct, so we could keep these longer. But better
|
# This might be to direct, so we could keep these longer. But better
|
||||||
# would be to simply not remove the nodes immediatly but only after x
|
# would be to simply not remove the nodes immediatly but only after x
|
||||||
# amount of failures.
|
# amount of failures.
|
||||||
discard p.codec.db.deleteKeys(n.id, n.address)
|
discard d.codec.db.deleteKeys(n.id, n.address)
|
||||||
|
|
||||||
proc revalidateLoop(p: Protocol) {.async.} =
|
proc revalidateLoop(d: Protocol) {.async.} =
|
||||||
try:
|
try:
|
||||||
# TODO: We need to handle actual errors still, which might just allow to
|
# TODO: We need to handle actual errors still, which might just allow to
|
||||||
# continue the loop. However, currently `revalidateNode` raises a general
|
# continue the loop. However, currently `revalidateNode` raises a general
|
||||||
# `Exception` making this rather hard.
|
# `Exception` making this rather hard.
|
||||||
while true:
|
while true:
|
||||||
await sleepAsync(rand(10 * 1000).milliseconds)
|
await sleepAsync(rand(10 * 1000).milliseconds)
|
||||||
let n = p.routingTable.nodeToRevalidate()
|
let n = d.routingTable.nodeToRevalidate()
|
||||||
if not n.isNil:
|
if not n.isNil:
|
||||||
# TODO: Should we do these in parallel and/or async to be certain of how
|
# TODO: Should we do these in parallel and/or async to be certain of how
|
||||||
# often nodes are revalidated?
|
# often nodes are revalidated?
|
||||||
await p.revalidateNode(n)
|
await d.revalidateNode(n)
|
||||||
except CancelledError:
|
except CancelledError:
|
||||||
trace "revalidateLoop canceled"
|
trace "revalidateLoop canceled"
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue