diff --git a/codexdht/private/eth/p2p/discoveryv5/node.nim b/codexdht/private/eth/p2p/discoveryv5/node.nim index 1bfacf8..2f3d6c9 100644 --- a/codexdht/private/eth/p2p/discoveryv5/node.nim +++ b/codexdht/private/eth/p2p/discoveryv5/node.nim @@ -22,6 +22,7 @@ export stint const avgSmoothingFactor = 0.9 + seenSmoothingFactor = 0.9 type NodeId* = UInt256 @@ -41,9 +42,10 @@ type pubkey*: PublicKey address*: Option[Address] record*: SignedPeerRecord - seen*: bool ## Indicates if there was at least one successful + seen*: float ## Indicates if there was at least one successful ## request-response with this node, or if the nde was verified - ## through the underlying transport mechanisms. + ## through the underlying transport mechanisms. After first contact + ## it tracks how reliable is the communication with the node. stats*: Stats # traffic measurements and statistics func toNodeId*(pid: PeerId): NodeId = @@ -193,6 +195,18 @@ func shortLog*(address: Address): string = chronicles.formatIt(Address): shortLog(it) +func registerSeen*(n:Node, seen = true) = + ## Register event of seeing (getting message from) or not seeing (missing message) node + ## Note: interpretation might depend on NAT type + if n.seen == 0: # first time seeing the node + n.seen = 1 + else: + n.seen = seenSmoothingFactor * n.seen + (1.0 - seenSmoothingFactor) * seen.float + +func alreadySeen*(n:Node) : bool = + ## Was the node seen at least once? + n.seen > 0 + # collecting performane metrics func registerRtt*(n: Node, rtt: Duration) = ## register an RTT measurement diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 0ac76d9..77afee3 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -133,6 +133,10 @@ const MaxProvidersEntries* = 1_000_000 # one million records MaxProvidersPerEntry* = 20 # providers per entry ## call + FindnodeSeenThreshold = 1.0 ## threshold used as findnode response filter + LookupSeenThreshold = 0.0 ## threshold used for lookup nodeset selection + QuerySeenThreshold = 0.0 ## threshold used for query nodeset selection + NoreplyRemoveThreshold = 0.5 ## remove node on no reply if 'seen' is below this value func shortLog*(record: SignedPeerRecord): string = ## Returns compact string representation of ``SignedPeerRecord``. @@ -249,14 +253,14 @@ proc randomNodes*(d: Protocol, maxAmount: int, d.randomNodes(maxAmount, proc(x: Node): bool = x.record.contains(enrField)) proc neighbours*(d: Protocol, id: NodeId, k: int = BUCKET_SIZE, - seenOnly = false): seq[Node] = + seenThreshold = 0.0): seq[Node] = ## Return up to k neighbours (closest node ids) of the given node id. - d.routingTable.neighbours(id, k, seenOnly) + d.routingTable.neighbours(id, k, seenThreshold) proc neighboursAtDistances*(d: Protocol, distances: seq[uint16], - k: int = BUCKET_SIZE, seenOnly = false): seq[Node] = + k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] = ## Return up to k neighbours (closest node ids) at given distances. - d.routingTable.neighboursAtDistances(distances, k, seenOnly) + d.routingTable.neighboursAtDistances(distances, k, seenThreshold) proc nodesDiscovered*(d: Protocol): int = d.routingTable.len @@ -344,7 +348,7 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address, # TODO: Still deduplicate also? if fn.distances.all(proc (x: uint16): bool = return x <= 256): d.sendNodes(fromId, fromAddr, reqId, - d.routingTable.neighboursAtDistances(fn.distances, seenOnly = true, k = FindNodeResultLimit)) + d.routingTable.neighboursAtDistances(fn.distances, FindNodeResultLimit, FindnodeSeenThreshold)) else: # At least one invalid distance, but the polite node we are, still respond # with empty nodes. @@ -353,7 +357,7 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address, proc handleFindNodeFast(d: Protocol, fromId: NodeId, fromAddr: Address, fnf: FindNodeFastMessage, reqId: RequestId) = d.sendNodes(fromId, fromAddr, reqId, - d.routingTable.neighbours(fnf.target, seenOnly = true, k = FindNodeFastResultLimit)) + d.routingTable.neighbours(fnf.target, FindNodeFastResultLimit, FindnodeSeenThreshold)) # TODO: if known, maybe we should add exact target even if not yet "seen" proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address, @@ -449,9 +453,9 @@ proc registerTalkProtocol*(d: Protocol, protocolId: seq[byte], else: ok() -proc replaceNode(d: Protocol, n: Node) = +proc replaceNode(d: Protocol, n: Node, forceRemoveBelow = 1.0) = if n.record notin d.bootstrapRecords: - d.routingTable.replaceNode(n) + d.routingTable.replaceNode(n, forceRemoveBelow) else: # For now we never remove bootstrap nodes. It might make sense to actually # do so and to retry them only in case we drop to a really low amount of @@ -550,16 +554,20 @@ proc ping*(d: Protocol, toNode: Node): # trace "ping RTT:", rtt, node = toNode toNode.registerRtt(rtt) + d.routingTable.setJustSeen(toNode, resp.isSome()) if resp.isSome(): if resp.get().kind == pong: - d.routingTable.setJustSeen(toNode) return ok(resp.get().pong) else: d.replaceNode(toNode) dht_message_requests_outgoing.inc(labelValues = ["invalid_response"]) return err("Invalid response to ping message") else: - d.replaceNode(toNode) + # A ping (or the pong) was lost, what should we do? Previous implementation called + # d.replaceNode(toNode) immediately, which removed the node. This is too aggressive, + # especially if we have a temporary network outage. Although bootstrap nodes are protected + # from being removed, everything else would slowly be removed. + d.replaceNode(toNode, NoreplyRemoveThreshold) dht_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("Pong message not received in time") @@ -573,9 +581,9 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]): msg = FindNodeMessage(distances: distances) nodes = await d.waitNodeResponses(toNode, msg) + d.routingTable.setJustSeen(toNode, nodes.isOk) if nodes.isOk: let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit, distances) - d.routingTable.setJustSeen(toNode) return ok(res) else: trace "findNode nodes not OK." @@ -592,9 +600,9 @@ proc findNodeFast*(d: Protocol, toNode: Node, target: NodeId): msg = FindNodeFastMessage(target: target) nodes = await d.waitNodeResponses(toNode, msg) + d.routingTable.setJustSeen(toNode, nodes.isOk) if nodes.isOk: let res = verifyNodesRecords(nodes.get(), toNode, FindNodeFastResultLimit) - d.routingTable.setJustSeen(toNode) return ok(res) else: d.replaceNode(toNode) @@ -614,16 +622,17 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): # trace "talk RTT:", rtt, node = toNode toNode.registerRtt(rtt) + d.routingTable.setJustSeen(toNode, resp.isSome()) if resp.isSome(): if resp.get().kind == talkResp: - d.routingTable.setJustSeen(toNode) return ok(resp.get().talkResp.response) else: d.replaceNode(toNode) dht_message_requests_outgoing.inc(labelValues = ["invalid_response"]) return err("Invalid response to talk request message") else: - d.replaceNode(toNode) + # remove on loss only if there is a replacement + d.replaceNode(toNode, NoreplyRemoveThreshold) dht_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("Talk response message not received in time") @@ -664,7 +673,7 @@ proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]] # `closestNodes` holds the k closest nodes to target found, sorted by distance # Unvalidated nodes are used for requests as a form of validation. var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE, - seenOnly = false) + LookupSeenThreshold) var asked, seen = initHashSet[NodeId]() asked.incl(d.localNode.id) # No need to ask our own node @@ -742,9 +751,9 @@ proc sendGetProviders(d: Protocol, toNode: Node, let resp = await d.waitResponse(toNode, msg) + d.routingTable.setJustSeen(toNode, resp.isSome()) if resp.isSome(): if resp.get().kind == MessageKind.providers: - d.routingTable.setJustSeen(toNode) return ok(resp.get().provs) else: # TODO: do we need to do something when there is an invalid response? @@ -752,8 +761,8 @@ proc sendGetProviders(d: Protocol, toNode: Node, dht_message_requests_outgoing.inc(labelValues = ["invalid_response"]) return err("Invalid response to GetProviders message") else: - # TODO: do we need to do something when there is no response? - d.replaceNode(toNode) + # remove on loss only if there is a replacement + d.replaceNode(toNode, NoreplyRemoveThreshold) dht_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("GetProviders response message not received in time") @@ -827,7 +836,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] ## This will take k nodes from the routing table closest to target and ## query them for nodes closest to target. If there are less than k nodes in ## the routing table, nodes returned by the first queries will be used. - var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false) + var queryBuffer = d.routingTable.neighbours(target, k, QuerySeenThreshold) var asked, seen = initHashSet[NodeId]() asked.incl(d.localNode.id) # No need to ask our own node @@ -1047,7 +1056,8 @@ proc debugPrintLoop(d: Protocol) {.async.} = debug "bucket", depth = b.getDepth, len = b.nodes.len, standby = b.replacementLen for n in b.nodes: - debug "node", n, rttMin = n.stats.rttMin.int, rttAvg = n.stats.rttAvg.int + debug "node", n, rttMin = n.stats.rttMin.int, rttAvg = n.stats.rttAvg.int, + reliability = n.seen.round(3) # bandwidth estimates are based on limited information, so not logging it yet to avoid confusion # trace "node", n, bwMaxMbps = (n.stats.bwMax / 1e6).round(3), bwAvgMbps = (n.stats.bwAvg / 1e6).round(3) diff --git a/codexdht/private/eth/p2p/discoveryv5/routing_table.nim b/codexdht/private/eth/p2p/discoveryv5/routing_table.nim index dcfa75c..3ad0b8c 100644 --- a/codexdht/private/eth/p2p/discoveryv5/routing_table.nim +++ b/codexdht/private/eth/p2p/discoveryv5/routing_table.nim @@ -218,7 +218,7 @@ proc remove(k: KBucket, n: Node): bool = let i = k.nodes.find(n) if i != -1: dht_routing_table_nodes.dec() - if k.nodes[i].seen: + if alreadySeen(k.nodes[i]): dht_routing_table_nodes.dec(labelValues = ["seen"]) k.nodes.delete(i) trace "removed node:", node = n @@ -431,27 +431,31 @@ proc addNode*(r: var RoutingTable, n: Node): NodeStatus = proc removeNode*(r: var RoutingTable, n: Node) = ## Remove the node `n` from the routing table. + ## No replemennt added, even if there is in replacement cache. let b = r.bucketForNode(n.id) if b.remove(n): ipLimitDec(r, b, n) -proc replaceNode*(r: var RoutingTable, n: Node) = +proc replaceNode*(r: var RoutingTable, n: Node, forceRemoveBelow = 1.0) = ## Replace node `n` with last entry in the replacement cache. If there are - ## no entries in the replacement cache, node `n` will simply be removed. - # TODO: Kademlia paper recommends here to not remove nodes if there are no - # replacements. However, that would require a bit more complexity in the - # revalidation as you don't want to try pinging that node all the time. + ## no entries in the replacement cache, node `n` will either be removed + ## or kept based on `forceRemoveBelow`. Default: remove. + ## Note: Kademlia paper recommends here to not remove nodes if there are no + ## replacements. This might mean pinging nodes that are not reachable, but + ## also avoids being too agressive because UDP losses or temporary network + ## failures. let b = r.bucketForNode(n.id) - if b.remove(n): - debug "Node removed from routing table", n - ipLimitDec(r, b, n) + if (b.replacementCache.len > 0 or n.seen <= forceRemoveBelow): + if b.remove(n): + debug "Node removed from routing table", n + ipLimitDec(r, b, n) - if b.replacementCache.len > 0: - # Nodes in the replacement cache are already included in the ip limits. - let rn = b.replacementCache[high(b.replacementCache)] - b.add(rn) - b.replacementCache.delete(high(b.replacementCache)) - debug "Node added to routing table from replacement cache", node=rn + if b.replacementCache.len > 0: + # Nodes in the replacement cache are already included in the ip limits. + let rn = b.replacementCache[high(b.replacementCache)] + b.add(rn) + b.replacementCache.delete(high(b.replacementCache)) + debug "Node added to routing table from replacement cache", node=rn proc getNode*(r: RoutingTable, id: NodeId): Option[Node] = ## Get the `Node` with `id` as `NodeId` from the routing table. @@ -472,16 +476,16 @@ proc nodesByDistanceTo(r: RoutingTable, k: KBucket, id: NodeId): seq[Node] = sortedByIt(k.nodes, r.distance(it.id, id)) proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE, - seenOnly = false): seq[Node] = + seenThreshold = 0.0): seq[Node] = ## Return up to k neighbours of the given node id. - ## When seenOnly is set to true, only nodes that have been contacted - ## previously successfully will be selected. + ## When seenThreshold is set, only nodes that have been contacted + ## previously successfully and were seen enough recently will be selected. result = newSeqOfCap[Node](k * 2) block addNodes: for bucket in r.bucketsByDistanceTo(id): for n in r.nodesByDistanceTo(bucket, id): - # Only provide actively seen nodes when `seenOnly` set. - if not seenOnly or n.seen: + # Avoid nodes with 'seen' value below threshold + if n.seen >= seenThreshold: result.add(n) if result.len == k * 2: break addNodes @@ -493,22 +497,22 @@ proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE, result.setLen(k) proc neighboursAtDistance*(r: RoutingTable, distance: uint16, - k: int = BUCKET_SIZE, seenOnly = false): seq[Node] = + k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] = ## Return up to k neighbours at given logarithmic distance. - result = r.neighbours(r.idAtDistance(r.localNode.id, distance), k, seenOnly) + result = r.neighbours(r.idAtDistance(r.localNode.id, distance), k, seenThreshold) # This is a bit silly, first getting closest nodes then to only keep the ones # that are exactly the requested distance. keepIf(result, proc(n: Node): bool = r.logDistance(n.id, r.localNode.id) == distance) proc neighboursAtDistances*(r: RoutingTable, distances: seq[uint16], - k: int = BUCKET_SIZE, seenOnly = false): seq[Node] = + k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] = ## Return up to k neighbours at given logarithmic distances. # TODO: This will currently return nodes with neighbouring distances on the # first one prioritize. It might end up not including all the node distances # requested. Need to rework the logic here and not use the neighbours call. if distances.len > 0: result = r.neighbours(r.idAtDistance(r.localNode.id, distances[0]), k, - seenOnly) + seenThreshold) # This is a bit silly, first getting closest nodes then to only keep the ones # that are exactly the requested distances. keepIf(result, proc(n: Node): bool = @@ -525,18 +529,19 @@ proc moveRight[T](arr: var openArray[T], a, b: int) = shallowCopy(arr[i + 1], arr[i]) shallowCopy(arr[a], t) -proc setJustSeen*(r: RoutingTable, n: Node) = - ## Move `n` to the head (most recently seen) of its bucket. +proc setJustSeen*(r: RoutingTable, n: Node, seen = true) = + ## If seen, move `n` to the head (most recently seen) of its bucket. ## If `n` is not in the routing table, do nothing. let b = r.bucketForNode(n.id) - let idx = b.nodes.find(n) - if idx >= 0: - if idx != 0: - b.nodes.moveRight(0, idx - 1) + if seen: + let idx = b.nodes.find(n) + if idx >= 0: + if idx != 0: + b.nodes.moveRight(0, idx - 1) - if not n.seen: - b.nodes[0].seen = true - dht_routing_table_nodes.inc(labelValues = ["seen"]) + if not alreadySeen(n): # first time seeing the node + dht_routing_table_nodes.inc(labelValues = ["seen"]) + n.registerSeen(seen) proc nodeToRevalidate*(r: RoutingTable): Node = ## Return a node to revalidate. The least recently seen node from a random diff --git a/codexdht/private/eth/p2p/discoveryv5/transport.nim b/codexdht/private/eth/p2p/discoveryv5/transport.nim index 2caa73e..04fe20c 100644 --- a/codexdht/private/eth/p2p/discoveryv5/transport.nim +++ b/codexdht/private/eth/p2p/discoveryv5/transport.nim @@ -231,7 +231,8 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) = if node.address.isSome() and a == node.address.get(): # TODO: maybe here we could verify that the address matches what we were # sending the 'whoareyou' message to. In that case, we can set 'seen' - node.seen = true + # TODO: verify how this works with restrictive NAT and firewall scenarios. + node.registerSeen() if t.client.addNode(node): trace "Added new node to routing table after handshake", node, tablesize=t.client.nodesDiscovered() discard t.sendPending(node) diff --git a/tests/dht/test_helper.nim b/tests/dht/test_helper.nim index 317399c..3d92b96 100644 --- a/tests/dht/test_helper.nim +++ b/tests/dht/test_helper.nim @@ -101,7 +101,7 @@ proc nodesAtDistanceUniqueIp*( proc addSeenNode*(d: discv5_protocol.Protocol, n: Node): bool = # Add it as a seen node, warning: for testing convenience only! - n.seen = true + n.registerSeen() d.addNode(n) func udpExample*(_: type MultiAddress): MultiAddress =