diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 2ce2651..0295561 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -98,9 +98,8 @@ const findNodeResultLimit = 16 ## Maximum amount of ENRs in the total Nodes messages ## that will be processed maxNodesPerMessage = 3 ## Maximum amount of ENRs per individual Nodes message - lookupInterval = 60.seconds ## Interval of launching a random lookup to - ## populate the routing table. go-ethereum seems to do 3 runs every 30 - ## minutes. Trinity starts one every minute. + refreshInterval = 5.minutes ## Interval of launching a random query to + ## refresh the routing table. revalidateMax = 10000 ## Revalidation of a peer is done between 0 and this ## value in milliseconds handshakeTimeout* = 2.seconds ## timeout for the reply on the @@ -118,8 +117,9 @@ type routingTable: RoutingTable codec*: Codec awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]] - queryLoop: Future[void] + refreshLoop: Future[void] revalidateLoop: Future[void] + lastLookup: chronos.Moment bootstrapRecords*: seq[Record] rng*: ref BrHmacDrbgContext @@ -717,6 +717,7 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] if closestNodes.len > BUCKET_SIZE: closestNodes.del(closestNodes.high()) + d.lastLookup = now(chronos.Moment) return closestNodes proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] @@ -765,6 +766,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] if not seen.containsOrIncl(n.id): queryBuffer.add(n) + d.lastLookup = now(chronos.Moment) return queryBuffer proc queryRandom*(d: Protocol): Future[seq[Node]] @@ -777,6 +779,18 @@ proc queryRandom*(d: Protocol): Future[seq[Node]] return await d.query(id) +proc queryRandom*(d: Protocol, enrField: (string, seq[byte])): + Future[seq[Node]] {.async, raises:[Exception, Defect].} = + ## Perform a query for a random target, return all nodes discovered which + ## contain enrField. + let nodes = await d.queryRandom() + var filtered: seq[Node] + for n in nodes: + if n.record.contains(enrField): + filtered.add(n) + + return filtered + proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async, raises: [Exception, Defect].} = ## Resolve a `Node` based on provided `NodeId`. @@ -816,6 +830,8 @@ proc revalidateNode*(d: Protocol, n: Node) discard d.addNode(nodes[][0]) proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} = + ## Loop which revalidates the nodes in the routing table by sending the ping + ## message. # TODO: General Exception raised. try: while true: @@ -826,19 +842,24 @@ proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} = except CancelledError: trace "revalidateLoop canceled" -proc queryLoop(d: Protocol) {.async, raises: [Exception, Defect].} = +proc refreshLoop(d: Protocol) {.async, raises: [Exception, Defect].} = + ## Loop that refreshes the routing table by starting a random query in case + ## no queries were done since `refreshInterval` or more. # TODO: General Exception raised. try: - # query target self (neighbour nodes) + # start with a query target self (neighbour nodes) let selfQuery = await d.query(d.localNode.id) trace "Discovered nodes in self target query", nodes = selfQuery.len while true: - let randomQuery = await d.queryRandom() - trace "Discovered nodes in random target query", nodes = randomQuery.len - debug "Total nodes in discv5 routing table", total = d.routingTable.len() - await sleepAsync(lookupInterval) + let currentTime = now(chronos.Moment) + if currentTime > (d.lastLookup + refreshInterval): + let randomQuery = await d.queryRandom() + trace "Discovered nodes in random target query", nodes = randomQuery.len + debug "Total nodes in discv5 routing table", total = d.routingTable.len() + + await sleepAsync(refreshInterval) except CancelledError: - trace "queryLoop canceled" + trace "refreshLoop canceled" proc newProtocol*(privKey: PrivateKey, externalIp: Option[ValidIpAddress], tcpPort, udpPort: Port, @@ -901,7 +922,7 @@ proc open*(d: Protocol) {.raises: [Exception, Defect].} = debug "Bootstrap node could not be added", uri = toURI(record) proc start*(d: Protocol) {.raises: [Exception, Defect].} = - d.queryLoop = queryLoop(d) + d.refreshLoop = refreshLoop(d) d.revalidateLoop = revalidateLoop(d) proc close*(d: Protocol) {.raises: [Exception, Defect].} = @@ -910,8 +931,8 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} = debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: d.revalidateLoop.cancel() - if not d.queryLoop.isNil: - d.queryLoop.cancel() + if not d.refreshLoop.isNil: + d.refreshLoop.cancel() d.transp.close() @@ -921,7 +942,7 @@ proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} = debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: await d.revalidateLoop.cancelAndWait() - if not d.queryLoop.isNil: - await d.queryLoop.cancelAndWait() + if not d.refreshLoop.isNil: + await d.refreshLoop.cancelAndWait() await d.transp.closeWait()