Merge pull request #318 from status-im/lookup-adjust

Fix lookup to sort and query closest nodes
Kim De Mey 2020-12-18 14:24:50 +01:00 committed by GitHub
commit a945eacfd3
3 changed files with 130 additions and 52 deletions

View File

@ -73,9 +73,9 @@
## This might be a concern for mobile devices.
import
std/[tables, sets, options, math, sequtils],
std/[tables, sets, options, math, sequtils, algorithm],
stew/shims/net as stewNet, json_serialization/std/net,
stew/endians2, chronicles, chronos, stint, bearssl,
stew/endians2, chronicles, chronos, stint, bearssl, metrics,
eth/[rlp, keys, async_utils],
types, encoding, node, routing_table, enr, random2, sessions
@ -85,14 +85,19 @@ export options
{.push raises: [Defect].}
declarePublicGauge discovery_message_requests,
"Discovery protocol message requests", labels = ["response"]
logScope:
topics = "discv5"
const
alpha = 3 ## Kademlia concurrency factor
lookupRequestLimit = 3
findNodeResultLimit = 15 # applies in FINDNODE handler
maxNodesPerMessage = 3
lookupRequestLimit = 3 ## Number of distances requested in a single FindNode
## message during a lookup or query
findNodeResultLimit = 16 ## Maximum number of ENRs, over all Nodes messages
## in a response, that will be processed
maxNodesPerMessage = 3 ## Maximum number of ENRs per individual Nodes message
lookupInterval = 60.seconds ## Interval at which a random lookup is launched
## to populate the routing table. go-ethereum seems to do 3 runs every 30
## minutes. Trinity starts one every minute.
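
As an aside on `maxNodesPerMessage`: capping each Nodes message at that many
ENRs amounts to chunking the result set. A minimal sketch of such chunking,
with ints standing in for ENRs (illustrative helper, not code from this
commit):

import std/sequtils

const maxNodesPerMessage = 3

proc chunks[T](s: seq[T], size: int): seq[seq[T]] =
  # Walk the sequence in steps of `size`, one chunk per Nodes message.
  var i = 0
  while i < s.len:
    result.add(s[i ..< min(i + size, s.len)])
    i += size

when isMainModule:
  let enrs = toSeq(1 .. 8) # stand-ins for ENRs
  doAssert chunks(enrs, maxNodesPerMessage) ==
    @[@[1, 2, 3], @[4, 5, 6], @[7, 8]]
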
@ -113,7 +118,7 @@ type
routingTable: RoutingTable
codec*: Codec
awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]]
lookupLoop: Future[void]
queryLoop: Future[void]
revalidateLoop: Future[void]
bootstrapRecords*: seq[Record]
rng*: ref BrHmacDrbgContext
@ -494,12 +499,24 @@ proc verifyNodesRecords*(enrs: openarray[Record], fromNode: Node,
## Verify and convert ENRs to a sequence of nodes. Only ENRs that pass
## verification will be added. ENRs are verified for duplicates, invalid
## addresses and invalid distances.
# TODO:
# - Should we fail and ignore values on first invalid Node?
# - Should we limit the amount of nodes? The discovery v5 specification holds
# no limit on the amount that can be returned.
var seen: HashSet[Node]
var count = 0
for r in enrs:
# Process at most `findNodeResultLimit` of the returned ENRs. This limit
# avoids pointlessly processing huge lists of invalid ENRs and keeps a
# malicious actor from overwhelming the routing table with nodes.
# The discovery v5 specification sets no limit on the amount of ENRs that
# can be returned, but clients usually stick to the bucket size limit as in
# original Kademlia. Because of this it is chosen not to fail immediately,
# but to still process at most `findNodeResultLimit` ENRs.
if count >= findNodeResultLimit:
debug "Response on findnode returned too many ENRs", enrs = enrs.len(),
limit = findNodeResultLimit, sender = fromNode.record.toURI
break
count.inc()
let node = newNode(r)
if node.isOk():
let n = node.get()
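
Both the `seen` set here and the `asked`/`seen` sets in `lookup` and `query`
below track duplicates; the latter use `std/sets.containsOrIncl`, which
returns true when the key is already present and inserts it otherwise. A
minimal sketch:

import std/sets

var seen = initHashSet[int]()
doAssert not seen.containsOrIncl(1) # 1 was new; it is now tracked
doAssert seen.containsOrIncl(1)     # second time around: already seen
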
@ -566,6 +583,7 @@ proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T):
d.registerRequest(toNode, message, nonce)
trace "Send message packet", dstId = toNode.id, address, kind = messageKind(T)
d.send(toNode, data)
discovery_message_requests.inc()
return reqId
proc ping*(d: Protocol, toNode: Node):
@ -582,6 +600,7 @@ proc ping*(d: Protocol, toNode: Node):
return ok(resp.get().pong)
else:
d.replaceNode(toNode)
discovery_message_requests.inc(labelValues = ["timed_out"])
return err("Pong message not received in time")
proc findNode*(d: Protocol, toNode: Node, distances: seq[uint32]):
@ -599,6 +618,7 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint32]):
return ok(res)
else:
d.replaceNode(toNode)
discovery_message_requests.inc(labelValues = ["timed_out"])
return err(nodes.error)
proc talkreq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
@ -615,6 +635,7 @@ proc talkreq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
return ok(resp.get().talkresp)
else:
d.replaceNode(toNode)
discovery_message_requests.inc(labelValues = ["timed_out"])
return err("Talk response message not received in time")
proc lookupDistances(target, dest: NodeId): seq[uint32] {.raises: [Defect].} =
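
The body of `lookupDistances` falls outside this diff. Conceptually it yields
the log-distance between `target` and `dest`, padded with adjacent distances
until `lookupRequestLimit` entries are collected. A simplified sketch with a
plain integer standing in for the computed log-distance (not the committed
implementation):

const lookupRequestLimit = 3

proc lookupDistancesSketch(td: uint32): seq[uint32] =
  # Start from the exact log-distance, then add neighbouring distances.
  result.add(td)
  var i = 1'u32
  while result.len < lookupRequestLimit:
    if td + i <= 256:
      result.add(td + i)
    if result.len < lookupRequestLimit and td >= i + 1:
      result.add(td - i)
    inc i

when isMainModule:
  doAssert lookupDistancesSketch(256) == @[256'u32, 255, 254]
  doAssert lookupDistancesSketch(10) == @[10'u32, 11, 9]
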
@ -631,38 +652,40 @@ proc lookupDistances(target, dest: NodeId): seq[uint32] {.raises: [Defect].} =
proc lookupWorker(d: Protocol, destNode: Node, target: NodeId):
Future[seq[Node]] {.async, raises: [Exception, Defect].} =
let dists = lookupDistances(target, destNode.id)
var i = 0
# TODO: We can make use of the multiple distances here now.
while i < lookupRequestLimit and result.len < findNodeResultLimit:
let r = await d.findNode(destNode, @[dists[i]])
# TODO: Handle failures better. E.g. stop on different failures than timeout
if r.isOk:
# TODO: I guess it makes sense to limit here also to `findNodeResultLimit`?
result.add(r[])
inc i
for n in result:
discard d.addNode(n)
# Instead of doing max `lookupRequestLimit` findNode requests, make use
# of the discv5.1 functionality to request nodes for multiple distances.
let r = await d.findNode(destNode, dists)
if r.isOk:
result.add(r[])
# Attempt to add all nodes discovered
for n in result:
discard d.addNode(n)
proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]]
{.async, raises: [Exception, Defect].} =
## Perform a lookup for the given target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`.
# TODO: Sort the returned nodes on distance
# Also use unseen nodes as a form of validation.
result = d.routingTable.neighbours(target, BUCKET_SIZE, seenOnly = false)
var asked = initHashSet[NodeId]()
asked.incl(d.localNode.id)
var seen = asked
for node in result:
# `closestNodes` holds the k closest nodes to target found, sorted by distance
# Not yet validated nodes are also queried; a reply then doubles as validation.
var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE,
seenOnly = false)
var asked, seen = initHashSet[NodeId]()
asked.incl(d.localNode.id) # No need to ask our own node
seen.incl(d.localNode.id) # No need to discover our own node
for node in closestNodes:
seen.incl(node.id)
var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha)
while true:
var i = 0
while i < result.len and pendingQueries.len < alpha:
let n = result[i]
# Do `alpha` requests at once, as long as closer nodes that have not yet
# been queried keep being discovered.
while i < closestNodes.len and pendingQueries.len < alpha:
let n = closestNodes[i]
if not asked.containsOrIncl(n.id):
pendingQueries.add(d.lookupWorker(n, target))
inc i
@ -679,24 +702,80 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]]
if index != -1:
pendingQueries.del(index)
else:
error "Resulting query should have beeen in the pending queries"
error "Resulting query should have been in the pending queries"
let nodes = query.read
# TODO: Remove node on timed-out query?
for n in nodes:
if not seen.containsOrIncl(n.id):
if result.len < BUCKET_SIZE:
result.add(n)
# If it wasn't seen before, insert node while remaining sorted
closestNodes.insert(n, closestNodes.lowerBound(n,
proc(x: Node, n: Node): int =
cmp(distanceTo(x, target), distanceTo(n, target))
))
proc lookupRandom*(d: Protocol): Future[seq[Node]]
if closestNodes.len > BUCKET_SIZE:
closestNodes.del(closestNodes.high())
return closestNodes
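
The sorted insert above uses `std/algorithm.lowerBound` (hence the new
`algorithm` import): it returns the first index at which the element can be
inserted without breaking the sort order. A self-contained sketch, with ints
standing in for distances to the target:

import std/algorithm

var closest = @[1, 4, 9] # sorted "distances"
let n = 5
closest.insert(n, closest.lowerBound(n, cmp[int]))
doAssert closest == @[1, 4, 5, 9]
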
proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
{.async, raises: [Exception, Defect].} =
## Query k nodes for the given target; return all nodes found, including the
## nodes queried.
##
## This will take the k nodes from the routing table closest to target and
## query them for nodes closest to target. If there are fewer than k nodes in
## the routing table, nodes returned by the first queries will be used.
var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false)
var asked, seen = initHashSet[NodeId]()
asked.incl(d.localNode.id) # No need to ask our own node
seen.incl(d.localNode.id) # No need to discover our own node
for node in queryBuffer:
seen.incl(node.id)
var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha)
while true:
var i = 0
while i < min(queryBuffer.len, k) and pendingQueries.len < alpha:
let n = queryBuffer[i]
if not asked.containsOrIncl(n.id):
pendingQueries.add(d.lookupWorker(n, target))
inc i
trace "discv5 pending queries", total = pendingQueries.len
if pendingQueries.len == 0:
break
let query = await one(pendingQueries)
trace "Got discv5 lookup query response"
let index = pendingQueries.find(query)
if index != -1:
pendingQueries.del(index)
else:
error "Resulting query should have been in the pending queries"
let nodes = query.read
# TODO: Remove node on timed-out query?
for n in nodes:
if not seen.containsOrIncl(n.id):
queryBuffer.add(n)
return queryBuffer
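
Both `lookup` and `query` share this dispatch pattern: keep at most `alpha`
requests in flight and react to whichever completes first via chronos' `one`.
A runnable sketch of just that pattern, with a stand-in request proc:

import chronos

const alpha = 3

proc fakeRequest(i: int): Future[int] {.async.} =
  # Stand-in for lookupWorker: completes after a varying delay.
  await sleepAsync(milliseconds(10 * i))
  return i

proc run() {.async.} =
  var pending = newSeqOfCap[Future[int]](alpha)
  for i in 1 .. alpha:
    pending.add(fakeRequest(i))
  while pending.len > 0:
    let fut = await one(pending) # first future to complete
    pending.del(pending.find(fut))
    echo "completed: ", fut.read()

waitFor run()
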
proc queryRandom*(d: Protocol): Future[seq[Node]]
{.async, raises:[Exception, Defect].} =
## Perform a lookup for a random target, return the closest n nodes to the
## target. Maximum value for n is `BUCKET_SIZE`.
## Perform a query for a random target, return all nodes discovered.
var id: NodeId
var buf: array[sizeof(id), byte]
brHmacDrbgGenerate(d.rng[], buf)
copyMem(addr id, addr buf[0], sizeof(id))
return await d.lookup(id)
return await d.query(id)
proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]]
{.async, raises: [Exception, Defect].} =
@ -747,19 +826,19 @@ proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
except CancelledError:
trace "revalidateLoop canceled"
proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
proc queryLoop(d: Protocol) {.async, raises: [Exception, Defect].} =
# TODO: General Exception raised.
try:
# lookup self (neighbour nodes)
let selfLookup = await d.lookup(d.localNode.id)
trace "Discovered nodes in self lookup", nodes = selfLookup
# query with self as target (neighbour nodes)
let selfQuery = await d.query(d.localNode.id)
trace "Discovered nodes in self target query", nodes = selfQuery.len
while true:
let randomLookup = await d.lookupRandom()
trace "Discovered nodes in random lookup", nodes = randomLookup
let randomQuery = await d.queryRandom()
trace "Discovered nodes in random target query", nodes = randomQuery.len
debug "Total nodes in discv5 routing table", total = d.routingTable.len()
await sleepAsync(lookupInterval)
except CancelledError:
trace "lookupLoop canceled"
trace "queryLoop canceled"
proc newProtocol*(privKey: PrivateKey,
externalIp: Option[ValidIpAddress], tcpPort, udpPort: Port,
@ -822,7 +901,7 @@ proc open*(d: Protocol) {.raises: [Exception, Defect].} =
debug "Bootstrap node could not be added", uri = toURI(record)
proc start*(d: Protocol) {.raises: [Exception, Defect].} =
d.lookupLoop = lookupLoop(d)
d.queryLoop = queryLoop(d)
d.revalidateLoop = revalidateLoop(d)
proc close*(d: Protocol) {.raises: [Exception, Defect].} =
@ -831,8 +910,8 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} =
debug "Closing discovery node", node = d.localNode
if not d.revalidateLoop.isNil:
d.revalidateLoop.cancel()
if not d.lookupLoop.isNil:
d.lookupLoop.cancel()
if not d.queryLoop.isNil:
d.queryLoop.cancel()
d.transp.close()
@ -842,7 +921,7 @@ proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} =
debug "Closing discovery node", node = d.localNode
if not d.revalidateLoop.isNil:
await d.revalidateLoop.cancelAndWait()
if not d.lookupLoop.isNil:
await d.lookupLoop.cancelAndWait()
if not d.queryLoop.isNil:
await d.queryLoop.cancelAndWait()
await d.transp.closeWait()

View File

@ -87,7 +87,7 @@ const
DefaultTableIpLimits* = TableIpLimits(tableIpLimit: DefaultTableIpLimit,
bucketIpLimit: DefaultBucketIpLimit)
proc distanceTo(n: Node, id: NodeId): UInt256 =
proc distanceTo*(n: Node, id: NodeId): UInt256 =
## Calculate the distance to a NodeId.
n.id xor id
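
Exporting `distanceTo` makes the XOR metric available to the sorted insert in
`lookup`. A small worked example with small integers (`UInt256` behaves the
same, bitwise):

let id = 0b1100
doAssert (0b1101 xor id) == 0b0001 # lowest bit differs: distance 1, close
doAssert (0b0100 xor id) == 0b1000 # high bit differs: distance 8, far
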

View File

@ -256,8 +256,7 @@ procSuite "Discovery v5 Tests":
let target = nodes[i]
let discovered = await nodes[nodeCount-1].lookup(target.localNode.id)
debug "Lookup result", target = target.localNode, discovered
# if lookUp would return ordered on distance we could check discovered[0]
check discovered.contains(target.localNode)
check discovered[0] == target.localNode
for node in nodes:
await node.closeWait()
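
The strengthened check holds because `lookup` now returns nodes sorted by
distance, so the target itself must sit at index 0. The underlying property,
sketched with ints as distances:

import std/algorithm

let discovered = @[1, 3, 7]            # distances of returned nodes to target
doAssert discovered.isSorted(cmp[int]) # the order lookup now guarantees
doAssert discovered[0] == min(discovered)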