diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 1655c63..fd54c5e 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -116,7 +116,7 @@ type routingTable: RoutingTable codec*: Codec awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]] - lookupLoop: Future[void] + queryLoop: Future[void] revalidateLoop: Future[void] bootstrapRecords*: seq[Record] rng*: ref BrHmacDrbgContext @@ -710,16 +710,63 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] return closestNodes -proc lookupRandom*(d: Protocol): Future[seq[Node]] +proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] + {.async, raises: [Exception, Defect].} = + ## Query k nodes for the given target, returns all nodes found, including the + ## nodes queried. + ## + ## This will take k nodes from the routing table closest to target and + ## query them for nodes closest to target. If there are less than k nodes in + ## the routing table, nodes returned by the first queries will be used. + var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false) + + var asked, seen = initHashSet[NodeId]() + asked.incl(d.localNode.id) # No need to ask our own node + seen.incl(d.localNode.id) # No need to discover our own node + for node in queryBuffer: + seen.incl(node.id) + + var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha) + + while true: + var i = 0 + while i < min(queryBuffer.len, k) and pendingQueries.len < alpha: + let n = queryBuffer[i] + if not asked.containsOrIncl(n.id): + pendingQueries.add(d.lookupWorker(n, target)) + inc i + + trace "discv5 pending queries", total = pendingQueries.len + + if pendingQueries.len == 0: + break + + let query = await one(pendingQueries) + trace "Got discv5 lookup query response" + + let index = pendingQueries.find(query) + if index != -1: + pendingQueries.del(index) + else: + error "Resulting query should have been in the pending queries" + + let nodes = query.read + # TODO: Remove node on timed-out query? + for n in nodes: + if not seen.containsOrIncl(n.id): + queryBuffer.add(n) + + return queryBuffer + +proc queryRandom*(d: Protocol): Future[seq[Node]] {.async, raises:[Exception, Defect].} = - ## Perform a lookup for a random target, return the closest n nodes to the - ## target. Maximum value for n is `BUCKET_SIZE`. + ## Perform a query for a random target, return all nodes discovered. var id: NodeId var buf: array[sizeof(id), byte] brHmacDrbgGenerate(d.rng[], buf) copyMem(addr id, addr buf[0], sizeof(id)) - return await d.lookup(id) + return await d.query(id) proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async, raises: [Exception, Defect].} = @@ -770,19 +817,19 @@ proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} = except CancelledError: trace "revalidateLoop canceled" -proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} = +proc queryLoop(d: Protocol) {.async, raises: [Exception, Defect].} = # TODO: General Exception raised. try: - # lookup self (neighbour nodes) - let selfLookup = await d.lookup(d.localNode.id) - trace "Discovered nodes in self lookup", nodes = selfLookup + # query target self (neighbour nodes) + let selfQuery = await d.query(d.localNode.id) + trace "Discovered nodes in self target query", nodes = selfQuery.len while true: - let randomLookup = await d.lookupRandom() - trace "Discovered nodes in random lookup", nodes = randomLookup + let randomQuery = await d.queryRandom() + trace "Discovered nodes in random target query", nodes = randomQuery.len debug "Total nodes in discv5 routing table", total = d.routingTable.len() await sleepAsync(lookupInterval) except CancelledError: - trace "lookupLoop canceled" + trace "queryLoop canceled" proc newProtocol*(privKey: PrivateKey, externalIp: Option[ValidIpAddress], tcpPort, udpPort: Port, @@ -845,7 +892,7 @@ proc open*(d: Protocol) {.raises: [Exception, Defect].} = debug "Bootstrap node could not be added", uri = toURI(record) proc start*(d: Protocol) {.raises: [Exception, Defect].} = - d.lookupLoop = lookupLoop(d) + d.queryLoop = queryLoop(d) d.revalidateLoop = revalidateLoop(d) proc close*(d: Protocol) {.raises: [Exception, Defect].} = @@ -854,8 +901,8 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} = debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: d.revalidateLoop.cancel() - if not d.lookupLoop.isNil: - d.lookupLoop.cancel() + if not d.queryLoop.isNil: + d.queryLoop.cancel() d.transp.close() @@ -865,7 +912,7 @@ proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} = debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: await d.revalidateLoop.cancelAndWait() - if not d.lookupLoop.isNil: - await d.lookupLoop.cancelAndWait() + if not d.queryLoop.isNil: + await d.queryLoop.cancelAndWait() await d.transp.closeWait()