From 68c9b7b3ad91a5632fb02aa7c32caee1777184e6 Mon Sep 17 00:00:00 2001 From: kdeme Date: Tue, 15 Dec 2020 13:24:57 +0100 Subject: [PATCH 1/5] Fix lookup to sort and query closest nodes --- eth/p2p/discoveryv5/protocol.nim | 38 ++++++++++++++++++--------- eth/p2p/discoveryv5/routing_table.nim | 2 +- tests/p2p/test_discoveryv5.nim | 3 +-- 3 files changed, 28 insertions(+), 15 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index a944f0b..fe538cc 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -73,7 +73,7 @@ ## This might be a concern for mobile devices. import - std/[tables, sets, options, math, sequtils], + std/[tables, sets, options, math, sequtils, algorithm], stew/shims/net as stewNet, json_serialization/std/net, stew/endians2, chronicles, chronos, stint, bearssl, eth/[rlp, keys, async_utils], @@ -633,6 +633,7 @@ proc lookupWorker(d: Protocol, destNode: Node, target: NodeId): let dists = lookupDistances(target, destNode.id) var i = 0 # TODO: We can make use of the multiple distances here now. + # Do findNode requests with different distances until we hit limits. while i < lookupRequestLimit and result.len < findNodeResultLimit: let r = await d.findNode(destNode, @[dists[i]]) # TODO: Handle failures better. E.g. stop on different failures than timeout @@ -648,21 +649,25 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async, raises: [Exception, Defect].} = ## Perform a lookup for the given target, return the closest n nodes to the ## target. Maximum value for n is `BUCKET_SIZE`. - # TODO: Sort the returned nodes on distance - # Also use unseen nodes as a form of validation. - result = d.routingTable.neighbours(target, BUCKET_SIZE, seenOnly = false) - var asked = initHashSet[NodeId]() - asked.incl(d.localNode.id) - var seen = asked - for node in result: + # `closestNodes` holds the k closest nodes to target found, sorted by distance + # Unvalidated nodes are used for requests as a form of validation. + var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE, + seenOnly = false) + + var asked, seen = initHashSet[NodeId]() + asked.incl(d.localNode.id) # No need to ask our own node + seen.incl(d.localNode.id) # No need to discover our own node + for node in closestNodes: seen.incl(node.id) var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha) while true: var i = 0 - while i < result.len and pendingQueries.len < alpha: - let n = result[i] + # Doing `alpha` amount of requests at once as long as closer non queried + # nodes are discovered. + while i < closestNodes.len and pendingQueries.len < alpha: + let n = closestNodes[i] if not asked.containsOrIncl(n.id): pendingQueries.add(d.lookupWorker(n, target)) inc i @@ -682,10 +687,19 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] error "Resulting query should have beeen in the pending queries" let nodes = query.read + # TODO: Remove node on timed-out query? for n in nodes: if not seen.containsOrIncl(n.id): - if result.len < BUCKET_SIZE: - result.add(n) + # If it wasn't seen before, insert node while remaining sorted + closestNodes.insert(n, closestNodes.lowerBound(n, + proc(x: Node, n: Node): int = + cmp(distanceTo(x, target), distanceTo(n, target)) + )) + + if closestNodes.len > BUCKET_SIZE: + closestNodes.del(closestNodes.high()) + + return closestNodes proc lookupRandom*(d: Protocol): Future[seq[Node]] {.async, raises:[Exception, Defect].} = diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim index 98ecebe..00cb68c 100644 --- a/eth/p2p/discoveryv5/routing_table.nim +++ b/eth/p2p/discoveryv5/routing_table.nim @@ -87,7 +87,7 @@ const DefaultTableIpLimits* = TableIpLimits(tableIpLimit: DefaultTableIpLimit, bucketIpLimit: DefaultBucketIpLimit) -proc distanceTo(n: Node, id: NodeId): UInt256 = +proc distanceTo*(n: Node, id: NodeId): UInt256 = ## Calculate the distance to a NodeId. n.id xor id diff --git a/tests/p2p/test_discoveryv5.nim b/tests/p2p/test_discoveryv5.nim index 98cef69..782ba81 100644 --- a/tests/p2p/test_discoveryv5.nim +++ b/tests/p2p/test_discoveryv5.nim @@ -256,8 +256,7 @@ procSuite "Discovery v5 Tests": let target = nodes[i] let discovered = await nodes[nodeCount-1].lookup(target.localNode.id) debug "Lookup result", target = target.localNode, discovered - # if lookUp would return ordered on distance we could check discovered[0] - check discovered.contains(target.localNode) + check discovered[0] == target.localNode for node in nodes: await node.closeWait() From e1acc1ae2d8197eb5ba2d883ac933f649c173dbf Mon Sep 17 00:00:00 2001 From: kdeme Date: Wed, 16 Dec 2020 00:07:49 +0100 Subject: [PATCH 2/5] Avoid sending more requests to node that timed out + request metrics --- eth/p2p/discoveryv5/protocol.nim | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index fe538cc..1655c63 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -75,7 +75,7 @@ import std/[tables, sets, options, math, sequtils, algorithm], stew/shims/net as stewNet, json_serialization/std/net, - stew/endians2, chronicles, chronos, stint, bearssl, + stew/endians2, chronicles, chronos, stint, bearssl, metrics, eth/[rlp, keys, async_utils], types, encoding, node, routing_table, enr, random2, sessions @@ -85,13 +85,16 @@ export options {.push raises: [Defect].} +declarePublicGauge discovery_message_requests, + "Discovery protocol message requests", labels = ["response"] + logScope: topics = "discv5" const alpha = 3 ## Kademlia concurrency factor lookupRequestLimit = 3 - findNodeResultLimit = 15 # applies in FINDNODE handler + findNodeResultLimit = 16 # applies in FINDNODE handler maxNodesPerMessage = 3 lookupInterval = 60.seconds ## Interval of launching a random lookup to ## populate the routing table. go-ethereum seems to do 3 runs every 30 @@ -566,6 +569,7 @@ proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T): d.registerRequest(toNode, message, nonce) trace "Send message packet", dstId = toNode.id, address, kind = messageKind(T) d.send(toNode, data) + discovery_message_requests.inc() return reqId proc ping*(d: Protocol, toNode: Node): @@ -582,6 +586,7 @@ proc ping*(d: Protocol, toNode: Node): return ok(resp.get().pong) else: d.replaceNode(toNode) + discovery_message_requests.inc(labelValues = ["timed_out"]) return err("Pong message not received in time") proc findNode*(d: Protocol, toNode: Node, distances: seq[uint32]): @@ -599,6 +604,7 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint32]): return ok(res) else: d.replaceNode(toNode) + discovery_message_requests.inc(labelValues = ["timed_out"]) return err(nodes.error) proc talkreq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): @@ -615,6 +621,7 @@ proc talkreq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): return ok(resp.get().talkresp) else: d.replaceNode(toNode) + discovery_message_requests.inc(labelValues = ["timed_out"]) return err("Talk response message not received in time") proc lookupDistances(target, dest: NodeId): seq[uint32] {.raises: [Defect].} = @@ -640,7 +647,9 @@ proc lookupWorker(d: Protocol, destNode: Node, target: NodeId): if r.isOk: # TODO: I guess it makes sense to limit here also to `findNodeResultLimit`? result.add(r[]) - inc i + inc i + else: + break for n in result: discard d.addNode(n) @@ -684,7 +693,7 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] if index != -1: pendingQueries.del(index) else: - error "Resulting query should have beeen in the pending queries" + error "Resulting query should have been in the pending queries" let nodes = query.read # TODO: Remove node on timed-out query? From a506739b232623870ab780c81effbd3bd664511d Mon Sep 17 00:00:00 2001 From: kdeme Date: Thu, 17 Dec 2020 15:22:48 +0100 Subject: [PATCH 3/5] Add query proc and use this for recurrent queries This query proc is similar to the original (faulty) lookup proc. But as we don't need to look for specific targets, it can be used still as it gives a quicker and broader search resulting in more nodes. --- eth/p2p/discoveryv5/protocol.nim | 81 +++++++++++++++++++++++++------- 1 file changed, 64 insertions(+), 17 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 1655c63..fd54c5e 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -116,7 +116,7 @@ type routingTable: RoutingTable codec*: Codec awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]] - lookupLoop: Future[void] + queryLoop: Future[void] revalidateLoop: Future[void] bootstrapRecords*: seq[Record] rng*: ref BrHmacDrbgContext @@ -710,16 +710,63 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] return closestNodes -proc lookupRandom*(d: Protocol): Future[seq[Node]] +proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] + {.async, raises: [Exception, Defect].} = + ## Query k nodes for the given target, returns all nodes found, including the + ## nodes queried. + ## + ## This will take k nodes from the routing table closest to target and + ## query them for nodes closest to target. If there are less than k nodes in + ## the routing table, nodes returned by the first queries will be used. + var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false) + + var asked, seen = initHashSet[NodeId]() + asked.incl(d.localNode.id) # No need to ask our own node + seen.incl(d.localNode.id) # No need to discover our own node + for node in queryBuffer: + seen.incl(node.id) + + var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha) + + while true: + var i = 0 + while i < min(queryBuffer.len, k) and pendingQueries.len < alpha: + let n = queryBuffer[i] + if not asked.containsOrIncl(n.id): + pendingQueries.add(d.lookupWorker(n, target)) + inc i + + trace "discv5 pending queries", total = pendingQueries.len + + if pendingQueries.len == 0: + break + + let query = await one(pendingQueries) + trace "Got discv5 lookup query response" + + let index = pendingQueries.find(query) + if index != -1: + pendingQueries.del(index) + else: + error "Resulting query should have been in the pending queries" + + let nodes = query.read + # TODO: Remove node on timed-out query? + for n in nodes: + if not seen.containsOrIncl(n.id): + queryBuffer.add(n) + + return queryBuffer + +proc queryRandom*(d: Protocol): Future[seq[Node]] {.async, raises:[Exception, Defect].} = - ## Perform a lookup for a random target, return the closest n nodes to the - ## target. Maximum value for n is `BUCKET_SIZE`. + ## Perform a query for a random target, return all nodes discovered. var id: NodeId var buf: array[sizeof(id), byte] brHmacDrbgGenerate(d.rng[], buf) copyMem(addr id, addr buf[0], sizeof(id)) - return await d.lookup(id) + return await d.query(id) proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async, raises: [Exception, Defect].} = @@ -770,19 +817,19 @@ proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} = except CancelledError: trace "revalidateLoop canceled" -proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} = +proc queryLoop(d: Protocol) {.async, raises: [Exception, Defect].} = # TODO: General Exception raised. try: - # lookup self (neighbour nodes) - let selfLookup = await d.lookup(d.localNode.id) - trace "Discovered nodes in self lookup", nodes = selfLookup + # query target self (neighbour nodes) + let selfQuery = await d.query(d.localNode.id) + trace "Discovered nodes in self target query", nodes = selfQuery.len while true: - let randomLookup = await d.lookupRandom() - trace "Discovered nodes in random lookup", nodes = randomLookup + let randomQuery = await d.queryRandom() + trace "Discovered nodes in random target query", nodes = randomQuery.len debug "Total nodes in discv5 routing table", total = d.routingTable.len() await sleepAsync(lookupInterval) except CancelledError: - trace "lookupLoop canceled" + trace "queryLoop canceled" proc newProtocol*(privKey: PrivateKey, externalIp: Option[ValidIpAddress], tcpPort, udpPort: Port, @@ -845,7 +892,7 @@ proc open*(d: Protocol) {.raises: [Exception, Defect].} = debug "Bootstrap node could not be added", uri = toURI(record) proc start*(d: Protocol) {.raises: [Exception, Defect].} = - d.lookupLoop = lookupLoop(d) + d.queryLoop = queryLoop(d) d.revalidateLoop = revalidateLoop(d) proc close*(d: Protocol) {.raises: [Exception, Defect].} = @@ -854,8 +901,8 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} = debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: d.revalidateLoop.cancel() - if not d.lookupLoop.isNil: - d.lookupLoop.cancel() + if not d.queryLoop.isNil: + d.queryLoop.cancel() d.transp.close() @@ -865,7 +912,7 @@ proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} = debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: await d.revalidateLoop.cancelAndWait() - if not d.lookupLoop.isNil: - await d.lookupLoop.cancelAndWait() + if not d.queryLoop.isNil: + await d.queryLoop.cancelAndWait() await d.transp.closeWait() From 21423fca42a3abf0a0924cd2fba0ae2116a95258 Mon Sep 17 00:00:00 2001 From: kdeme Date: Thu, 17 Dec 2020 15:42:04 +0100 Subject: [PATCH 4/5] Use of discv5.1 findnode request with multiple distances --- eth/p2p/discoveryv5/protocol.nim | 23 +++++++++-------------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index fd54c5e..cf75a09 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -638,21 +638,16 @@ proc lookupDistances(target, dest: NodeId): seq[uint32] {.raises: [Defect].} = proc lookupWorker(d: Protocol, destNode: Node, target: NodeId): Future[seq[Node]] {.async, raises: [Exception, Defect].} = let dists = lookupDistances(target, destNode.id) - var i = 0 - # TODO: We can make use of the multiple distances here now. - # Do findNode requests with different distances until we hit limits. - while i < lookupRequestLimit and result.len < findNodeResultLimit: - let r = await d.findNode(destNode, @[dists[i]]) - # TODO: Handle failures better. E.g. stop on different failures than timeout - if r.isOk: - # TODO: I guess it makes sense to limit here also to `findNodeResultLimit`? - result.add(r[]) - inc i - else: - break - for n in result: - discard d.addNode(n) + # Instead of doing max `lookupRequestLimit` findNode requests, make use + # of the discv5.1 functionality to request nodes for multiple distances. + let r = await d.findNode(destNode, dists) + if r.isOk: + result.add(r[]) + + # Attempt to add all nodes discovered + for n in result: + discard d.addNode(n) proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async, raises: [Exception, Defect].} = From 12ec608efd8a96cbaf2c82ad463a9b6a5129b6d4 Mon Sep 17 00:00:00 2001 From: kdeme Date: Thu, 17 Dec 2020 17:06:42 +0100 Subject: [PATCH 5/5] Put a limit on the nodes returned on a findnode request --- eth/p2p/discoveryv5/protocol.nim | 28 +++++++++++++++++++++------- 1 file changed, 21 insertions(+), 7 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index cf75a09..2ce2651 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -93,9 +93,11 @@ logScope: const alpha = 3 ## Kademlia concurrency factor - lookupRequestLimit = 3 - findNodeResultLimit = 16 # applies in FINDNODE handler - maxNodesPerMessage = 3 + lookupRequestLimit = 3 ## Amount of distances requested in a single Findnode + ## message for a lookup or query + findNodeResultLimit = 16 ## Maximum amount of ENRs in the total Nodes messages + ## that will be processed + maxNodesPerMessage = 3 ## Maximum amount of ENRs per individual Nodes message lookupInterval = 60.seconds ## Interval of launching a random lookup to ## populate the routing table. go-ethereum seems to do 3 runs every 30 ## minutes. Trinity starts one every minute. @@ -497,12 +499,24 @@ proc verifyNodesRecords*(enrs: openarray[Record], fromNode: Node, ## Verify and convert ENRs to a sequence of nodes. Only ENRs that pass ## verification will be added. ENRs are verified for duplicates, invalid ## addresses and invalid distances. - # TODO: - # - Should we fail and ignore values on first invalid Node? - # - Should we limit the amount of nodes? The discovery v5 specification holds - # no limit on the amount that can be returned. var seen: HashSet[Node] + var count = 0 for r in enrs: + # Check and allow for processing of maximum `findNodeResultLimit` ENRs + # returned. This limitation is required so no huge lists of invalid ENRs + # are processed for no reason, and for not overwhelming a routing table + # with nodes from a malicious actor. + # The discovery v5 specification specifies no limit on the amount of ENRs + # that can be returned, but clients usually stick with the bucket size limit + # as in original Kademlia. Because of this it is chosen not to fail + # immediatly, but still process maximum `findNodeResultLimit`. + if count >= findNodeResultLimit: + debug "Response on findnode returned too many ENRs", enrs = enrs.len(), + limit = findNodeResultLimit, sender = fromNode.record.toURI + break + + count.inc() + let node = newNode(r) if node.isOk(): let n = node.get()