Add query proc and use this for recurrent queries

This query proc is similar to the original (faulty) lookup proc.
But as we don't need to look for specific targets, it can be
used still as it gives a quicker and broader search resulting
in more nodes.
This commit is contained in:
kdeme 2020-12-17 15:22:48 +01:00
parent e1acc1ae2d
commit a506739b23
No known key found for this signature in database
GPG Key ID: 4E8DD21420AF43F5
1 changed files with 64 additions and 17 deletions

View File

@ -116,7 +116,7 @@ type
routingTable: RoutingTable routingTable: RoutingTable
codec*: Codec codec*: Codec
awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]] awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]]
lookupLoop: Future[void] queryLoop: Future[void]
revalidateLoop: Future[void] revalidateLoop: Future[void]
bootstrapRecords*: seq[Record] bootstrapRecords*: seq[Record]
rng*: ref BrHmacDrbgContext rng*: ref BrHmacDrbgContext
@ -710,16 +710,63 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]]
return closestNodes return closestNodes
proc lookupRandom*(d: Protocol): Future[seq[Node]] proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
{.async, raises: [Exception, Defect].} =
## Query k nodes for the given target, returns all nodes found, including the
## nodes queried.
##
## This will take k nodes from the routing table closest to target and
## query them for nodes closest to target. If there are less than k nodes in
## the routing table, nodes returned by the first queries will be used.
var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false)
var asked, seen = initHashSet[NodeId]()
asked.incl(d.localNode.id) # No need to ask our own node
seen.incl(d.localNode.id) # No need to discover our own node
for node in queryBuffer:
seen.incl(node.id)
var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha)
while true:
var i = 0
while i < min(queryBuffer.len, k) and pendingQueries.len < alpha:
let n = queryBuffer[i]
if not asked.containsOrIncl(n.id):
pendingQueries.add(d.lookupWorker(n, target))
inc i
trace "discv5 pending queries", total = pendingQueries.len
if pendingQueries.len == 0:
break
let query = await one(pendingQueries)
trace "Got discv5 lookup query response"
let index = pendingQueries.find(query)
if index != -1:
pendingQueries.del(index)
else:
error "Resulting query should have been in the pending queries"
let nodes = query.read
# TODO: Remove node on timed-out query?
for n in nodes:
if not seen.containsOrIncl(n.id):
queryBuffer.add(n)
return queryBuffer
proc queryRandom*(d: Protocol): Future[seq[Node]]
{.async, raises:[Exception, Defect].} = {.async, raises:[Exception, Defect].} =
## Perform a lookup for a random target, return the closest n nodes to the ## Perform a query for a random target, return all nodes discovered.
## target. Maximum value for n is `BUCKET_SIZE`.
var id: NodeId var id: NodeId
var buf: array[sizeof(id), byte] var buf: array[sizeof(id), byte]
brHmacDrbgGenerate(d.rng[], buf) brHmacDrbgGenerate(d.rng[], buf)
copyMem(addr id, addr buf[0], sizeof(id)) copyMem(addr id, addr buf[0], sizeof(id))
return await d.lookup(id) return await d.query(id)
proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]]
{.async, raises: [Exception, Defect].} = {.async, raises: [Exception, Defect].} =
@ -770,19 +817,19 @@ proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
except CancelledError: except CancelledError:
trace "revalidateLoop canceled" trace "revalidateLoop canceled"
proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} = proc queryLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
# TODO: General Exception raised. # TODO: General Exception raised.
try: try:
# lookup self (neighbour nodes) # query target self (neighbour nodes)
let selfLookup = await d.lookup(d.localNode.id) let selfQuery = await d.query(d.localNode.id)
trace "Discovered nodes in self lookup", nodes = selfLookup trace "Discovered nodes in self target query", nodes = selfQuery.len
while true: while true:
let randomLookup = await d.lookupRandom() let randomQuery = await d.queryRandom()
trace "Discovered nodes in random lookup", nodes = randomLookup trace "Discovered nodes in random target query", nodes = randomQuery.len
debug "Total nodes in discv5 routing table", total = d.routingTable.len() debug "Total nodes in discv5 routing table", total = d.routingTable.len()
await sleepAsync(lookupInterval) await sleepAsync(lookupInterval)
except CancelledError: except CancelledError:
trace "lookupLoop canceled" trace "queryLoop canceled"
proc newProtocol*(privKey: PrivateKey, proc newProtocol*(privKey: PrivateKey,
externalIp: Option[ValidIpAddress], tcpPort, udpPort: Port, externalIp: Option[ValidIpAddress], tcpPort, udpPort: Port,
@ -845,7 +892,7 @@ proc open*(d: Protocol) {.raises: [Exception, Defect].} =
debug "Bootstrap node could not be added", uri = toURI(record) debug "Bootstrap node could not be added", uri = toURI(record)
proc start*(d: Protocol) {.raises: [Exception, Defect].} = proc start*(d: Protocol) {.raises: [Exception, Defect].} =
d.lookupLoop = lookupLoop(d) d.queryLoop = queryLoop(d)
d.revalidateLoop = revalidateLoop(d) d.revalidateLoop = revalidateLoop(d)
proc close*(d: Protocol) {.raises: [Exception, Defect].} = proc close*(d: Protocol) {.raises: [Exception, Defect].} =
@ -854,8 +901,8 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} =
debug "Closing discovery node", node = d.localNode debug "Closing discovery node", node = d.localNode
if not d.revalidateLoop.isNil: if not d.revalidateLoop.isNil:
d.revalidateLoop.cancel() d.revalidateLoop.cancel()
if not d.lookupLoop.isNil: if not d.queryLoop.isNil:
d.lookupLoop.cancel() d.queryLoop.cancel()
d.transp.close() d.transp.close()
@ -865,7 +912,7 @@ proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} =
debug "Closing discovery node", node = d.localNode debug "Closing discovery node", node = d.localNode
if not d.revalidateLoop.isNil: if not d.revalidateLoop.isNil:
await d.revalidateLoop.cancelAndWait() await d.revalidateLoop.cancelAndWait()
if not d.lookupLoop.isNil: if not d.queryLoop.isNil:
await d.lookupLoop.cancelAndWait() await d.queryLoop.cancelAndWait()
await d.transp.closeWait() await d.transp.closeWait()