diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index a944f0b..2ce2651 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -73,9 +73,9 @@ ## This might be a concern for mobile devices. import - std/[tables, sets, options, math, sequtils], + std/[tables, sets, options, math, sequtils, algorithm], stew/shims/net as stewNet, json_serialization/std/net, - stew/endians2, chronicles, chronos, stint, bearssl, + stew/endians2, chronicles, chronos, stint, bearssl, metrics, eth/[rlp, keys, async_utils], types, encoding, node, routing_table, enr, random2, sessions @@ -85,14 +85,19 @@ export options {.push raises: [Defect].} +declarePublicGauge discovery_message_requests, + "Discovery protocol message requests", labels = ["response"] + logScope: topics = "discv5" const alpha = 3 ## Kademlia concurrency factor - lookupRequestLimit = 3 - findNodeResultLimit = 15 # applies in FINDNODE handler - maxNodesPerMessage = 3 + lookupRequestLimit = 3 ## Amount of distances requested in a single Findnode + ## message for a lookup or query + findNodeResultLimit = 16 ## Maximum amount of ENRs in the total Nodes messages + ## that will be processed + maxNodesPerMessage = 3 ## Maximum amount of ENRs per individual Nodes message lookupInterval = 60.seconds ## Interval of launching a random lookup to ## populate the routing table. go-ethereum seems to do 3 runs every 30 ## minutes. Trinity starts one every minute. @@ -113,7 +118,7 @@ type routingTable: RoutingTable codec*: Codec awaitedMessages: Table[(NodeId, RequestId), Future[Option[Message]]] - lookupLoop: Future[void] + queryLoop: Future[void] revalidateLoop: Future[void] bootstrapRecords*: seq[Record] rng*: ref BrHmacDrbgContext @@ -494,12 +499,24 @@ proc verifyNodesRecords*(enrs: openarray[Record], fromNode: Node, ## Verify and convert ENRs to a sequence of nodes. Only ENRs that pass ## verification will be added. 
ENRs are verified for duplicates, invalid ## addresses and invalid distances. - # TODO: - # - Should we fail and ignore values on first invalid Node? - # - Should we limit the amount of nodes? The discovery v5 specification holds - # no limit on the amount that can be returned. var seen: HashSet[Node] + var count = 0 for r in enrs: + # Check and allow for processing of maximum `findNodeResultLimit` ENRs + # returned. This limitation is required so no huge lists of invalid ENRs + # are processed for no reason, and for not overwhelming a routing table + # with nodes from a malicious actor. + # The discovery v5 specification specifies no limit on the amount of ENRs + # that can be returned, but clients usually stick with the bucket size limit + # as in original Kademlia. Because of this it is chosen not to fail + # immediately, but still process maximum `findNodeResultLimit`. + if count >= findNodeResultLimit: + debug "Response on findnode returned too many ENRs", enrs = enrs.len(), + limit = findNodeResultLimit, sender = fromNode.record.toURI + break + + count.inc() + let node = newNode(r) if node.isOk(): let n = node.get() @@ -566,6 +583,7 @@ proc sendMessage*[T: SomeMessage](d: Protocol, toNode: Node, m: T): d.registerRequest(toNode, message, nonce) trace "Send message packet", dstId = toNode.id, address, kind = messageKind(T) d.send(toNode, data) + discovery_message_requests.inc() return reqId proc ping*(d: Protocol, toNode: Node): @@ -582,6 +600,7 @@ proc ping*(d: Protocol, toNode: Node): return ok(resp.get().pong) else: d.replaceNode(toNode) + discovery_message_requests.inc(labelValues = ["timed_out"]) return err("Pong message not received in time") proc findNode*(d: Protocol, toNode: Node, distances: seq[uint32]): @@ -599,6 +618,7 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint32]): return ok(res) else: d.replaceNode(toNode) + discovery_message_requests.inc(labelValues = ["timed_out"]) return err(nodes.error) proc talkreq*(d: Protocol, toNode: 
Node, protocol, request: seq[byte]): @@ -615,6 +635,7 @@ proc talkreq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): return ok(resp.get().talkresp) else: d.replaceNode(toNode) + discovery_message_requests.inc(labelValues = ["timed_out"]) return err("Talk response message not received in time") proc lookupDistances(target, dest: NodeId): seq[uint32] {.raises: [Defect].} = @@ -631,38 +652,40 @@ proc lookupDistances(target, dest: NodeId): seq[uint32] {.raises: [Defect].} = proc lookupWorker(d: Protocol, destNode: Node, target: NodeId): Future[seq[Node]] {.async, raises: [Exception, Defect].} = let dists = lookupDistances(target, destNode.id) - var i = 0 - # TODO: We can make use of the multiple distances here now. - while i < lookupRequestLimit and result.len < findNodeResultLimit: - let r = await d.findNode(destNode, @[dists[i]]) - # TODO: Handle failures better. E.g. stop on different failures than timeout - if r.isOk: - # TODO: I guess it makes sense to limit here also to `findNodeResultLimit`? - result.add(r[]) - inc i - for n in result: - discard d.addNode(n) + # Instead of doing max `lookupRequestLimit` findNode requests, make use + # of the discv5.1 functionality to request nodes for multiple distances. + let r = await d.findNode(destNode, dists) + if r.isOk: + result.add(r[]) + + # Attempt to add all nodes discovered + for n in result: + discard d.addNode(n) proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] {.async, raises: [Exception, Defect].} = ## Perform a lookup for the given target, return the closest n nodes to the ## target. Maximum value for n is `BUCKET_SIZE`. - # TODO: Sort the returned nodes on distance - # Also use unseen nodes as a form of validation. 
- result = d.routingTable.neighbours(target, BUCKET_SIZE, seenOnly = false) - var asked = initHashSet[NodeId]() - asked.incl(d.localNode.id) - var seen = asked - for node in result: + # `closestNodes` holds the k closest nodes to target found, sorted by distance + # Unvalidated nodes are used for requests as a form of validation. + var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE, + seenOnly = false) + + var asked, seen = initHashSet[NodeId]() + asked.incl(d.localNode.id) # No need to ask our own node + seen.incl(d.localNode.id) # No need to discover our own node + for node in closestNodes: seen.incl(node.id) var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha) while true: var i = 0 - while i < result.len and pendingQueries.len < alpha: - let n = result[i] + # Doing `alpha` amount of requests at once as long as closer non queried + # nodes are discovered. + while i < closestNodes.len and pendingQueries.len < alpha: + let n = closestNodes[i] if not asked.containsOrIncl(n.id): pendingQueries.add(d.lookupWorker(n, target)) inc i @@ -679,24 +702,80 @@ proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]] if index != -1: pendingQueries.del(index) else: - error "Resulting query should have beeen in the pending queries" + error "Resulting query should have been in the pending queries" let nodes = query.read + # TODO: Remove node on timed-out query? 
for n in nodes: if not seen.containsOrIncl(n.id): - if result.len < BUCKET_SIZE: - result.add(n) + # If it wasn't seen before, insert node while remaining sorted + closestNodes.insert(n, closestNodes.lowerBound(n, + proc(x: Node, n: Node): int = + cmp(distanceTo(x, target), distanceTo(n, target)) + )) -proc lookupRandom*(d: Protocol): Future[seq[Node]] + if closestNodes.len > BUCKET_SIZE: + closestNodes.del(closestNodes.high()) + + return closestNodes + +proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] + {.async, raises: [Exception, Defect].} = + ## Query k nodes for the given target, returns all nodes found, including the + ## nodes queried. + ## + ## This will take k nodes from the routing table closest to target and + ## query them for nodes closest to target. If there are less than k nodes in + ## the routing table, nodes returned by the first queries will be used. + var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false) + + var asked, seen = initHashSet[NodeId]() + asked.incl(d.localNode.id) # No need to ask our own node + seen.incl(d.localNode.id) # No need to discover our own node + for node in queryBuffer: + seen.incl(node.id) + + var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha) + + while true: + var i = 0 + while i < min(queryBuffer.len, k) and pendingQueries.len < alpha: + let n = queryBuffer[i] + if not asked.containsOrIncl(n.id): + pendingQueries.add(d.lookupWorker(n, target)) + inc i + + trace "discv5 pending queries", total = pendingQueries.len + + if pendingQueries.len == 0: + break + + let query = await one(pendingQueries) + trace "Got discv5 lookup query response" + + let index = pendingQueries.find(query) + if index != -1: + pendingQueries.del(index) + else: + error "Resulting query should have been in the pending queries" + + let nodes = query.read + # TODO: Remove node on timed-out query? 
+ for n in nodes: + if not seen.containsOrIncl(n.id): + queryBuffer.add(n) + + return queryBuffer + +proc queryRandom*(d: Protocol): Future[seq[Node]] {.async, raises:[Exception, Defect].} = - ## Perform a lookup for a random target, return the closest n nodes to the - ## target. Maximum value for n is `BUCKET_SIZE`. + ## Perform a query for a random target, return all nodes discovered. var id: NodeId var buf: array[sizeof(id), byte] brHmacDrbgGenerate(d.rng[], buf) copyMem(addr id, addr buf[0], sizeof(id)) - return await d.lookup(id) + return await d.query(id) proc resolve*(d: Protocol, id: NodeId): Future[Option[Node]] {.async, raises: [Exception, Defect].} = @@ -747,19 +826,19 @@ proc revalidateLoop(d: Protocol) {.async, raises: [Exception, Defect].} = except CancelledError: trace "revalidateLoop canceled" -proc lookupLoop(d: Protocol) {.async, raises: [Exception, Defect].} = +proc queryLoop(d: Protocol) {.async, raises: [Exception, Defect].} = # TODO: General Exception raised. try: - # lookup self (neighbour nodes) - let selfLookup = await d.lookup(d.localNode.id) - trace "Discovered nodes in self lookup", nodes = selfLookup + # query target self (neighbour nodes) + let selfQuery = await d.query(d.localNode.id) + trace "Discovered nodes in self target query", nodes = selfQuery.len while true: - let randomLookup = await d.lookupRandom() - trace "Discovered nodes in random lookup", nodes = randomLookup + let randomQuery = await d.queryRandom() + trace "Discovered nodes in random target query", nodes = randomQuery.len debug "Total nodes in discv5 routing table", total = d.routingTable.len() await sleepAsync(lookupInterval) except CancelledError: - trace "lookupLoop canceled" + trace "queryLoop canceled" proc newProtocol*(privKey: PrivateKey, externalIp: Option[ValidIpAddress], tcpPort, udpPort: Port, @@ -822,7 +901,7 @@ proc open*(d: Protocol) {.raises: [Exception, Defect].} = debug "Bootstrap node could not be added", uri = toURI(record) proc start*(d: Protocol) 
{.raises: [Exception, Defect].} = - d.lookupLoop = lookupLoop(d) + d.queryLoop = queryLoop(d) d.revalidateLoop = revalidateLoop(d) proc close*(d: Protocol) {.raises: [Exception, Defect].} = @@ -831,8 +910,8 @@ proc close*(d: Protocol) {.raises: [Exception, Defect].} = debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: d.revalidateLoop.cancel() - if not d.lookupLoop.isNil: - d.lookupLoop.cancel() + if not d.queryLoop.isNil: + d.queryLoop.cancel() d.transp.close() @@ -842,7 +921,7 @@ proc closeWait*(d: Protocol) {.async, raises: [Exception, Defect].} = debug "Closing discovery node", node = d.localNode if not d.revalidateLoop.isNil: await d.revalidateLoop.cancelAndWait() - if not d.lookupLoop.isNil: - await d.lookupLoop.cancelAndWait() + if not d.queryLoop.isNil: + await d.queryLoop.cancelAndWait() await d.transp.closeWait() diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim index 98ecebe..00cb68c 100644 --- a/eth/p2p/discoveryv5/routing_table.nim +++ b/eth/p2p/discoveryv5/routing_table.nim @@ -87,7 +87,7 @@ const DefaultTableIpLimits* = TableIpLimits(tableIpLimit: DefaultTableIpLimit, bucketIpLimit: DefaultBucketIpLimit) -proc distanceTo(n: Node, id: NodeId): UInt256 = +proc distanceTo*(n: Node, id: NodeId): UInt256 = ## Calculate the distance to a NodeId. n.id xor id diff --git a/tests/p2p/test_discoveryv5.nim b/tests/p2p/test_discoveryv5.nim index 98cef69..782ba81 100644 --- a/tests/p2p/test_discoveryv5.nim +++ b/tests/p2p/test_discoveryv5.nim @@ -256,8 +256,7 @@ procSuite "Discovery v5 Tests": let target = nodes[i] let discovered = await nodes[nodeCount-1].lookup(target.localNode.id) debug "Lookup result", target = target.localNode, discovered - # if lookUp would return ordered on distance we could check discovered[0] - check discovered.contains(target.localNode) + check discovered[0] == target.localNode for node in nodes: await node.closeWait()