From a506739b232623870ab780c81effbd3bd664511d Mon Sep 17 00:00:00 2001 From: kdeme Date: Thu, 17 Dec 2020 15:22:48 +0100 Subject: [PATCH] Add query proc and use this for recurrent queries This query proc is similar to the original (faulty) lookup proc. But as we don't need to look for specific targets, it can be used still as it gives a quicker and broader search resulting in more nodes. --- eth/p2p/discoveryv5/protocol.nim | 81 +++++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 17 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 1655c63..fd54c5e 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -116,7 +116,7 @@ type routingTable: RoutingTable codec*: Codec awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]] - lookupLoop: Future[void] + queryLoop: Future[void] revalidateLoop: Future[void] bootstrapRecords*: seq[Record] rng*: ref BrHmacDrbgContext @@ -710,16 +710,63 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] return closestNodes -proc lookupRandom*(d: Protocol): Future[seq[Node]] +proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] + {.async, raises: [Exception, Defect].} = + ## Query k nodes for the given target, returns all nodes found, including the + ## nodes queried. + ## + ## This will take k nodes from the routing table closest to target and + ## query them for nodes closest to target. If there are less than k nodes in + ## the routing table, nodes returned by the first queries will be used. + var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false) + + var asked, seen = initHashSet[NodeId]() + asked.incl(d.localNode.id) # No need to ask our own node + seen.incl(d.localNode.id) # No need to discover our own node + for node in queryBuffer: + seen.incl(node.id) + + var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha) + + while true: + var i = 0 + while i < min(queryBuffer.len, k) and pendingQueries.len < alpha: + let n = queryBuffer[i] + if not asked.containsOrIncl(n.id): + pendingQueries.add(d.lookupWorker(n, target)) + inc i + + trace "discv5 pending queries", total = pendingQueries.len + + if pendingQueries.len == 0: + break + + let query = await one(pendingQueries) + trace "Got discv5 lookup query response" + + let index = pendingQueries.find(query) + if index != -1: + pendingQueries.del(index) + else: + error "Resulting query should have been in the pending queries" + + let nodes = query.read + # TODO: Remove node on timed-out query? + for n in nodes: + if not seen.containsOrIncl(n.id): + queryBuffer.add(n) + + return queryBuffer + +proc queryRandom*(d: Protocol): Future[seq[Node]] {.async, raises:[Exception, Defect].} = - ## Perform a lookup for a random target, return the closest n nodes to the - ## target. Maximum value for n is `BUCKET_SIZE`. + ## Perform a query for a random target, return all nodes discovered. var id: NodeId var buf: array[sizeof(id), byte] brHmacDrbgGenerate(d.rng[], buf) copyMem(addr id, addr buf[0], sizeof(id)) - return await d.lookup(id) + return await d.query(id) proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async, raises: [Exception, Defect].} = @@ -770,19 +817,19 @@ proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} = except CancelledError: trace "revalidateLoop canceled" -proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} = +proc queryLoop(d: Protocol) {.async, raises: [Exception, Defect].} = # TODO: General Exception raised. try: - # lookup self (neighbour nodes) - let selfLookup = await d.lookup(d.localNode.id) - trace "Discovered nodes in self lookup", nodes = selfLookup + # query target self (neighbour nodes) + let selfQuery = await d.query(d.localNode.id) + trace "Discovered nodes in self target query", nodes = selfQuery.len while true: - let randomLookup = await d.lookupRandom() - trace "Discovered nodes in random lookup", nodes = randomLookup + let randomQuery = await d.queryRandom() + trace "Discovered nodes in random target query", nodes = randomQuery.len debug "Total nodes in discv5 routing table", total = d.routingTable.len() await sleepAsync(lookupInterval) except CancelledError: - trace "lookupLoop canceled" + trace "queryLoop canceled" proc newProtocol*(privKey: PrivateKey, externalIp: Option[ValidIpAddress], tcpPort, udpPort: Port, @@ -845,7 +892,7 @@ proc open*(d: Protocol) {.raises: [Exception, Defect].} = debug "Bootstrap node could not be added", uri = toURI(record) proc start*(d: Protocol) {.raises: [Exception, Defect].} = - d.lookupLoop = lookupLoop(d) + d.queryLoop = queryLoop(d) d.revalidateLoop = revalidateLoop(d) proc close*(d: Protocol) {.raises: [Exception, Defect].} = @@ -854,8 +901,8 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} = debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: d.revalidateLoop.cancel() - if not d.lookupLoop.isNil: - d.lookupLoop.cancel() + if not d.queryLoop.isNil: + d.queryLoop.cancel() d.transp.close() @@ -865,7 +912,7 @@ proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} = debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: await d.revalidateLoop.cancelAndWait() - if not d.lookupLoop.isNil: - await d.lookupLoop.cancelAndWait() + if not d.queryLoop.isNil: + await d.queryLoop.cancelAndWait() await d.transp.closeWait()