From e1c1089e4f171c6784d0175daa80374df804807d Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 7 Oct 2024 21:39:54 +0200 Subject: [PATCH 1/6] fix aggressive node removal from on first packet loss UDP packets get lost easily. We can't just remove nodes from the routing table at first loss, as it can create issues in small networks and in cases of temporary connection failures. Signed-off-by: Csaba Kiraly --- .../private/eth/p2p/discoveryv5/protocol.nim | 17 ++++++---- .../eth/p2p/discoveryv5/routing_table.nim | 32 +++++++++++-------- 2 files changed, 29 insertions(+), 20 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 0ac76d9..40e8af0 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -449,9 +449,9 @@ proc registerTalkProtocol*(d: Protocol, protocolId: seq[byte], else: ok() -proc replaceNode(d: Protocol, n: Node) = +proc replaceNode(d: Protocol, n: Node, removeIfNoReplacement = true) = if n.record notin d.bootstrapRecords: - d.routingTable.replaceNode(n) + d.routingTable.replaceNode(n, removeIfNoReplacement) else: # For now we never remove bootstrap nodes. It might make sense to actually # do so and to retry them only in case we drop to a really low amount of @@ -559,7 +559,11 @@ proc ping*(d: Protocol, toNode: Node): dht_message_requests_outgoing.inc(labelValues = ["invalid_response"]) return err("Invalid response to ping message") else: - d.replaceNode(toNode) + # A ping (or the pong) was lost, what should we do? Previous implementation called + # d.replaceNode(toNode) immediately, which removed the node. This is too aggressive, + # especially if we have a temporary network outage. Although bootstrap nodes are protected + # from being removed, everything else would slowly be removed. + d.replaceNode(toNode, removeIfNoReplacement = false) dht_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("Pong message not received in time") @@ -623,7 +627,8 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): dht_message_requests_outgoing.inc(labelValues = ["invalid_response"]) return err("Invalid response to talk request message") else: - d.replaceNode(toNode) + # remove on loss only if there is a replacement + d.replaceNode(toNode, removeIfNoReplacement = false) dht_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("Talk response message not received in time") @@ -752,8 +757,8 @@ proc sendGetProviders(d: Protocol, toNode: Node, dht_message_requests_outgoing.inc(labelValues = ["invalid_response"]) return err("Invalid response to GetProviders message") else: - # TODO: do we need to do something when there is no response? - d.replaceNode(toNode) + # remove on loss only if there is a replacement + d.replaceNode(toNode, removeIfNoReplacement = false) dht_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("GetProviders response message not received in time") diff --git a/codexdht/private/eth/p2p/discoveryv5/routing_table.nim b/codexdht/private/eth/p2p/discoveryv5/routing_table.nim index dcfa75c..b0ffa9f 100644 --- a/codexdht/private/eth/p2p/discoveryv5/routing_table.nim +++ b/codexdht/private/eth/p2p/discoveryv5/routing_table.nim @@ -431,27 +431,31 @@ proc addNode*(r: var RoutingTable, n: Node): NodeStatus = proc removeNode*(r: var RoutingTable, n: Node) = ## Remove the node `n` from the routing table. + ## No replemennt added, even if there is in replacement cache. let b = r.bucketForNode(n.id) if b.remove(n): ipLimitDec(r, b, n) -proc replaceNode*(r: var RoutingTable, n: Node) = +proc replaceNode*(r: var RoutingTable, n: Node, removeIfNoReplacement = true) = ## Replace node `n` with last entry in the replacement cache. If there are - ## no entries in the replacement cache, node `n` will simply be removed. - # TODO: Kademlia paper recommends here to not remove nodes if there are no - # replacements. However, that would require a bit more complexity in the - # revalidation as you don't want to try pinging that node all the time. + ## no entries in the replacement cache, node `n` will either be removed + ## or kept based on `removeIfNoReplacement`. + ## Note: Kademlia paper recommends here to not remove nodes if there are no + ## replacements. This might mean pinging nodes that are not reachable, but + ## also avoids being too agressive because UDP losses or temporary network + ## failures. let b = r.bucketForNode(n.id) - if b.remove(n): - debug "Node removed from routing table", n - ipLimitDec(r, b, n) + if (b.replacementCache.len > 0 or removeIfNoReplacement): + if b.remove(n): + debug "Node removed from routing table", n + ipLimitDec(r, b, n) - if b.replacementCache.len > 0: - # Nodes in the replacement cache are already included in the ip limits. - let rn = b.replacementCache[high(b.replacementCache)] - b.add(rn) - b.replacementCache.delete(high(b.replacementCache)) - debug "Node added to routing table from replacement cache", node=rn + if b.replacementCache.len > 0: + # Nodes in the replacement cache are already included in the ip limits. + let rn = b.replacementCache[high(b.replacementCache)] + b.add(rn) + b.replacementCache.delete(high(b.replacementCache)) + debug "Node added to routing table from replacement cache", node=rn proc getNode*(r: RoutingTable, id: NodeId): Option[Node] = ## Get the `Node` with `id` as `NodeId` from the routing table. From 02bc12e639625fd153dfc3881ee9d9badda11ada Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 14 Oct 2024 15:33:29 +0200 Subject: [PATCH 2/6] change node seen flag to an exponential moving average keep defaults as before Signed-off-by: Csaba Kiraly # Conflicts: # codexdht/private/eth/p2p/discoveryv5/node.nim # codexdht/private/eth/p2p/discoveryv5/routing_table.nim --- codexdht/private/eth/p2p/discoveryv5/node.nim | 18 ++++++++- .../private/eth/p2p/discoveryv5/protocol.nim | 19 +++++---- .../eth/p2p/discoveryv5/routing_table.nim | 39 ++++++++++--------- .../private/eth/p2p/discoveryv5/transport.nim | 3 +- tests/dht/test_helper.nim | 2 +- 5 files changed, 50 insertions(+), 31 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/node.nim b/codexdht/private/eth/p2p/discoveryv5/node.nim index 1bfacf8..2f3d6c9 100644 --- a/codexdht/private/eth/p2p/discoveryv5/node.nim +++ b/codexdht/private/eth/p2p/discoveryv5/node.nim @@ -22,6 +22,7 @@ export stint const avgSmoothingFactor = 0.9 + seenSmoothingFactor = 0.9 type NodeId* = UInt256 @@ -41,9 +42,10 @@ type pubkey*: PublicKey address*: Option[Address] record*: SignedPeerRecord - seen*: bool ## Indicates if there was at least one successful + seen*: float ## Indicates if there was at least one successful ## request-response with this node, or if the nde was verified - ## through the underlying transport mechanisms. + ## through the underlying transport mechanisms. After first contact + ## it tracks how reliable is the communication with the node. stats*: Stats # traffic measurements and statistics func toNodeId*(pid: PeerId): NodeId = @@ -193,6 +195,18 @@ func shortLog*(address: Address): string = chronicles.formatIt(Address): shortLog(it) +func registerSeen*(n:Node, seen = true) = + ## Register event of seeing (getting message from) or not seeing (missing message) node + ## Note: interpretation might depend on NAT type + if n.seen == 0: # first time seeing the node + n.seen = 1 + else: + n.seen = seenSmoothingFactor * n.seen + (1.0 - seenSmoothingFactor) * seen.float + +func alreadySeen*(n:Node) : bool = + ## Was the node seen at least once? + n.seen > 0 + # collecting performane metrics func registerRtt*(n: Node, rtt: Duration) = ## register an RTT measurement diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 40e8af0..9c61a27 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -133,6 +133,9 @@ const MaxProvidersEntries* = 1_000_000 # one million records MaxProvidersPerEntry* = 20 # providers per entry ## call + FindnodeSeenThreshold = 1.0 ## threshold used as findnode response filter + LookupSeenThreshold = 0.0 ## threshold used for lookup nodeset selection + QuerySeenThreshold = 0.0 ## threshold used for query nodeset selection func shortLog*(record: SignedPeerRecord): string = ## Returns compact string representation of ``SignedPeerRecord``. @@ -249,14 +252,14 @@ proc randomNodes*(d: Protocol, maxAmount: int, d.randomNodes(maxAmount, proc(x: Node): bool = x.record.contains(enrField)) proc neighbours*(d: Protocol, id: NodeId, k: int = BUCKET_SIZE, - seenOnly = false): seq[Node] = + seenThreshold = 0.0): seq[Node] = ## Return up to k neighbours (closest node ids) of the given node id. - d.routingTable.neighbours(id, k, seenOnly) + d.routingTable.neighbours(id, k, seenThreshold) proc neighboursAtDistances*(d: Protocol, distances: seq[uint16], - k: int = BUCKET_SIZE, seenOnly = false): seq[Node] = + k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] = ## Return up to k neighbours (closest node ids) at given distances. - d.routingTable.neighboursAtDistances(distances, k, seenOnly) + d.routingTable.neighboursAtDistances(distances, k, seenThreshold) proc nodesDiscovered*(d: Protocol): int = d.routingTable.len @@ -344,7 +347,7 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address, # TODO: Still deduplicate also? if fn.distances.all(proc (x: uint16): bool = return x <= 256): d.sendNodes(fromId, fromAddr, reqId, - d.routingTable.neighboursAtDistances(fn.distances, seenOnly = true, k = FindNodeResultLimit)) + d.routingTable.neighboursAtDistances(fn.distances, FindNodeResultLimit, FindnodeSeenThreshold)) else: # At least one invalid distance, but the polite node we are, still respond # with empty nodes. @@ -353,7 +356,7 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address, proc handleFindNodeFast(d: Protocol, fromId: NodeId, fromAddr: Address, fnf: FindNodeFastMessage, reqId: RequestId) = d.sendNodes(fromId, fromAddr, reqId, - d.routingTable.neighbours(fnf.target, seenOnly = true, k = FindNodeFastResultLimit)) + d.routingTable.neighbours(fnf.target, FindNodeFastResultLimit, FindnodeSeenThreshold)) # TODO: if known, maybe we should add exact target even if not yet "seen" proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address, @@ -669,7 +672,7 @@ proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]] # `closestNodes` holds the k closest nodes to target found, sorted by distance # Unvalidated nodes are used for requests as a form of validation. var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE, - seenOnly = false) + LookupSeenThreshold) var asked, seen = initHashSet[NodeId]() asked.incl(d.localNode.id) # No need to ask our own node @@ -832,7 +835,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] ## This will take k nodes from the routing table closest to target and ## query them for nodes closest to target. If there are less than k nodes in ## the routing table, nodes returned by the first queries will be used. - var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false) + var queryBuffer = d.routingTable.neighbours(target, k, QuerySeenThreshold) var asked, seen = initHashSet[NodeId]() asked.incl(d.localNode.id) # No need to ask our own node diff --git a/codexdht/private/eth/p2p/discoveryv5/routing_table.nim b/codexdht/private/eth/p2p/discoveryv5/routing_table.nim index b0ffa9f..b2c8a1d 100644 --- a/codexdht/private/eth/p2p/discoveryv5/routing_table.nim +++ b/codexdht/private/eth/p2p/discoveryv5/routing_table.nim @@ -218,7 +218,7 @@ proc remove(k: KBucket, n: Node): bool = let i = k.nodes.find(n) if i != -1: dht_routing_table_nodes.dec() - if k.nodes[i].seen: + if alreadySeen(k.nodes[i]): dht_routing_table_nodes.dec(labelValues = ["seen"]) k.nodes.delete(i) trace "removed node:", node = n @@ -476,16 +476,16 @@ proc nodesByDistanceTo(r: RoutingTable, k: KBucket, id: NodeId): seq[Node] = sortedByIt(k.nodes, r.distance(it.id, id)) proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE, - seenOnly = false): seq[Node] = + seenThreshold = 0.0): seq[Node] = ## Return up to k neighbours of the given node id. - ## When seenOnly is set to true, only nodes that have been contacted - ## previously successfully will be selected. + ## When seenThreshold is set, only nodes that have been contacted + ## previously successfully and were seen enough recently will be selected. result = newSeqOfCap[Node](k * 2) block addNodes: for bucket in r.bucketsByDistanceTo(id): for n in r.nodesByDistanceTo(bucket, id): - # Only provide actively seen nodes when `seenOnly` set. - if not seenOnly or n.seen: + # Avoid nodes with 'seen' value below threshold + if n.seen >= seenThreshold: result.add(n) if result.len == k * 2: break addNodes @@ -497,22 +497,22 @@ proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE, result.setLen(k) proc neighboursAtDistance*(r: RoutingTable, distance: uint16, - k: int = BUCKET_SIZE, seenOnly = false): seq[Node] = + k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] = ## Return up to k neighbours at given logarithmic distance. - result = r.neighbours(r.idAtDistance(r.localNode.id, distance), k, seenOnly) + result = r.neighbours(r.idAtDistance(r.localNode.id, distance), k, seenThreshold) # This is a bit silly, first getting closest nodes then to only keep the ones # that are exactly the requested distance. keepIf(result, proc(n: Node): bool = r.logDistance(n.id, r.localNode.id) == distance) proc neighboursAtDistances*(r: RoutingTable, distances: seq[uint16], - k: int = BUCKET_SIZE, seenOnly = false): seq[Node] = + k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] = ## Return up to k neighbours at given logarithmic distances. # TODO: This will currently return nodes with neighbouring distances on the # first one prioritize. It might end up not including all the node distances # requested. Need to rework the logic here and not use the neighbours call. if distances.len > 0: result = r.neighbours(r.idAtDistance(r.localNode.id, distances[0]), k, - seenOnly) + seenThreshold) # This is a bit silly, first getting closest nodes then to only keep the ones # that are exactly the requested distances. keepIf(result, proc(n: Node): bool = @@ -529,18 +529,19 @@ proc moveRight[T](arr: var openArray[T], a, b: int) = shallowCopy(arr[i + 1], arr[i]) shallowCopy(arr[a], t) -proc setJustSeen*(r: RoutingTable, n: Node) = - ## Move `n` to the head (most recently seen) of its bucket. +proc setJustSeen*(r: RoutingTable, n: Node, seen = true) = + ## If seen, move `n` to the head (most recently seen) of its bucket. ## If `n` is not in the routing table, do nothing. let b = r.bucketForNode(n.id) - let idx = b.nodes.find(n) - if idx >= 0: - if idx != 0: - b.nodes.moveRight(0, idx - 1) + if seen: + let idx = b.nodes.find(n) + if idx >= 0: + if idx != 0: + b.nodes.moveRight(0, idx - 1) - if not n.seen: - b.nodes[0].seen = true - dht_routing_table_nodes.inc(labelValues = ["seen"]) + if not alreadySeen(n): # first time seeing the node + dht_routing_table_nodes.inc(labelValues = ["seen"]) + n.registerSeen(seen) proc nodeToRevalidate*(r: RoutingTable): Node = ## Return a node to revalidate. The least recently seen node from a random diff --git a/codexdht/private/eth/p2p/discoveryv5/transport.nim b/codexdht/private/eth/p2p/discoveryv5/transport.nim index 2caa73e..04fe20c 100644 --- a/codexdht/private/eth/p2p/discoveryv5/transport.nim +++ b/codexdht/private/eth/p2p/discoveryv5/transport.nim @@ -231,7 +231,8 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) = if node.address.isSome() and a == node.address.get(): # TODO: maybe here we could verify that the address matches what we were # sending the 'whoareyou' message to. In that case, we can set 'seen' - node.seen = true + # TODO: verify how this works with restrictive NAT and firewall scenarios. + node.registerSeen() if t.client.addNode(node): trace "Added new node to routing table after handshake", node, tablesize=t.client.nodesDiscovered() discard t.sendPending(node) diff --git a/tests/dht/test_helper.nim b/tests/dht/test_helper.nim index 317399c..3d92b96 100644 --- a/tests/dht/test_helper.nim +++ b/tests/dht/test_helper.nim @@ -101,7 +101,7 @@ proc nodesAtDistanceUniqueIp*( proc addSeenNode*(d: discv5_protocol.Protocol, n: Node): bool = # Add it as a seen node, warning: for testing convenience only! - n.seen = true + n.registerSeen() d.addNode(n) func udpExample*(_: type MultiAddress): MultiAddress = From 7507e99c96ba7eaeb9de2566460a2d9ecf93e918 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 14 Oct 2024 09:19:01 +0200 Subject: [PATCH 3/6] register "not seen" when missing replies Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/protocol.nim | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 9c61a27..9e7d060 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -553,9 +553,9 @@ proc ping*(d: Protocol, toNode: Node): # trace "ping RTT:", rtt, node = toNode toNode.registerRtt(rtt) + d.routingTable.setJustSeen(toNode, resp.isSome()) if resp.isSome(): if resp.get().kind == pong: - d.routingTable.setJustSeen(toNode) return ok(resp.get().pong) else: d.replaceNode(toNode) @@ -580,9 +580,9 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]): msg = FindNodeMessage(distances: distances) nodes = await d.waitNodeResponses(toNode, msg) + d.routingTable.setJustSeen(toNode, nodes.isOk) if nodes.isOk: let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit, distances) - d.routingTable.setJustSeen(toNode) return ok(res) else: trace "findNode nodes not OK." @@ -599,9 +599,9 @@ proc findNodeFast*(d: Protocol, toNode: Node, target: NodeId): msg = FindNodeFastMessage(target: target) nodes = await d.waitNodeResponses(toNode, msg) + d.routingTable.setJustSeen(toNode, nodes.isOk) if nodes.isOk: let res = verifyNodesRecords(nodes.get(), toNode, FindNodeFastResultLimit) - d.routingTable.setJustSeen(toNode) return ok(res) else: d.replaceNode(toNode) @@ -621,9 +621,9 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): # trace "talk RTT:", rtt, node = toNode toNode.registerRtt(rtt) + d.routingTable.setJustSeen(toNode, resp.isSome()) if resp.isSome(): if resp.get().kind == talkResp: - d.routingTable.setJustSeen(toNode) return ok(resp.get().talkResp.response) else: d.replaceNode(toNode) @@ -750,9 +750,9 @@ proc sendGetProviders(d: Protocol, toNode: Node, let resp = await d.waitResponse(toNode, msg) + d.routingTable.setJustSeen(toNode, resp.isSome()) if resp.isSome(): if resp.get().kind == MessageKind.providers: - d.routingTable.setJustSeen(toNode) return ok(resp.get().provs) else: # TODO: do we need to do something when there is an invalid response? From 6310c50ce00e9a1028425bae41d84be312560207 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 14 Oct 2024 15:35:10 +0200 Subject: [PATCH 4/6] introduce NoreplyRemoveThreshold Signed-off-by: Csaba Kiraly # Conflicts: # codexdht/private/eth/p2p/discoveryv5/protocol.nim --- codexdht/private/eth/p2p/discoveryv5/protocol.nim | 11 ++++++----- .../private/eth/p2p/discoveryv5/routing_table.nim | 6 +++--- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 9e7d060..999a9cf 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -136,6 +136,7 @@ const FindnodeSeenThreshold = 1.0 ## threshold used as findnode response filter LookupSeenThreshold = 0.0 ## threshold used for lookup nodeset selection QuerySeenThreshold = 0.0 ## threshold used for query nodeset selection + NoreplyRemoveThreshold = 0.0 ## remove node on no reply if 'seen' is below this value func shortLog*(record: SignedPeerRecord): string = ## Returns compact string representation of ``SignedPeerRecord``. @@ -452,9 +453,9 @@ proc registerTalkProtocol*(d: Protocol, protocolId: seq[byte], else: ok() -proc replaceNode(d: Protocol, n: Node, removeIfNoReplacement = true) = +proc replaceNode(d: Protocol, n: Node, forceRemoveBelow = 1.0) = if n.record notin d.bootstrapRecords: - d.routingTable.replaceNode(n, removeIfNoReplacement) + d.routingTable.replaceNode(n, forceRemoveBelow) else: # For now we never remove bootstrap nodes. It might make sense to actually # do so and to retry them only in case we drop to a really low amount of @@ -566,7 +567,7 @@ proc ping*(d: Protocol, toNode: Node): # d.replaceNode(toNode) immediately, which removed the node. This is too aggressive, # especially if we have a temporary network outage. Although bootstrap nodes are protected # from being removed, everything else would slowly be removed. - d.replaceNode(toNode, removeIfNoReplacement = false) + d.replaceNode(toNode, NoreplyRemoveThreshold) dht_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("Pong message not received in time") @@ -631,7 +632,7 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]): return err("Invalid response to talk request message") else: # remove on loss only if there is a replacement - d.replaceNode(toNode, removeIfNoReplacement = false) + d.replaceNode(toNode, NoreplyRemoveThreshold) dht_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("Talk response message not received in time") @@ -761,7 +762,7 @@ proc sendGetProviders(d: Protocol, toNode: Node, return err("Invalid response to GetProviders message") else: # remove on loss only if there is a replacement - d.replaceNode(toNode, removeIfNoReplacement = false) + d.replaceNode(toNode, NoreplyRemoveThreshold) dht_message_requests_outgoing.inc(labelValues = ["no_response"]) return err("GetProviders response message not received in time") diff --git a/codexdht/private/eth/p2p/discoveryv5/routing_table.nim b/codexdht/private/eth/p2p/discoveryv5/routing_table.nim index b2c8a1d..3ad0b8c 100644 --- a/codexdht/private/eth/p2p/discoveryv5/routing_table.nim +++ b/codexdht/private/eth/p2p/discoveryv5/routing_table.nim @@ -436,16 +436,16 @@ proc removeNode*(r: var RoutingTable, n: Node) = if b.remove(n): ipLimitDec(r, b, n) -proc replaceNode*(r: var RoutingTable, n: Node, removeIfNoReplacement = true) = +proc replaceNode*(r: var RoutingTable, n: Node, forceRemoveBelow = 1.0) = ## Replace node `n` with last entry in the replacement cache. If there are ## no entries in the replacement cache, node `n` will either be removed - ## or kept based on `removeIfNoReplacement`. + ## or kept based on `forceRemoveBelow`. Default: remove. ## Note: Kademlia paper recommends here to not remove nodes if there are no ## replacements. This might mean pinging nodes that are not reachable, but ## also avoids being too agressive because UDP losses or temporary network ## failures. let b = r.bucketForNode(n.id) - if (b.replacementCache.len > 0 or removeIfNoReplacement): + if (b.replacementCache.len > 0 or n.seen <= forceRemoveBelow): if b.remove(n): debug "Node removed from routing table", n ipLimitDec(r, b, n) From fee5a9ced29022adcac9dd9d914439b3818a5bc3 Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Mon, 14 Oct 2024 10:11:37 +0200 Subject: [PATCH 5/6] set NoreplyRemoveThreshold to 0.5 Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/protocol.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index 999a9cf..b7a23b9 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -136,7 +136,7 @@ const FindnodeSeenThreshold = 1.0 ## threshold used as findnode response filter LookupSeenThreshold = 0.0 ## threshold used for lookup nodeset selection QuerySeenThreshold = 0.0 ## threshold used for query nodeset selection - NoreplyRemoveThreshold = 0.0 ## remove node on no reply if 'seen' is below this value + NoreplyRemoveThreshold = 0.5 ## remove node on no reply if 'seen' is below this value func shortLog*(record: SignedPeerRecord): string = ## Returns compact string representation of ``SignedPeerRecord``. From 1a344f1fd77cbcab43acb532fd80de82e97b88db Mon Sep 17 00:00:00 2001 From: Csaba Kiraly Date: Tue, 15 Oct 2024 18:17:49 +0200 Subject: [PATCH 6/6] log reliability based on loss statistics Signed-off-by: Csaba Kiraly --- codexdht/private/eth/p2p/discoveryv5/protocol.nim | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/codexdht/private/eth/p2p/discoveryv5/protocol.nim b/codexdht/private/eth/p2p/discoveryv5/protocol.nim index b7a23b9..77afee3 100644 --- a/codexdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/codexdht/private/eth/p2p/discoveryv5/protocol.nim @@ -1056,7 +1056,8 @@ proc debugPrintLoop(d: Protocol) {.async.} = debug "bucket", depth = b.getDepth, len = b.nodes.len, standby = b.replacementLen for n in b.nodes: - debug "node", n, rttMin = n.stats.rttMin.int, rttAvg = n.stats.rttAvg.int + debug "node", n, rttMin = n.stats.rttMin.int, rttAvg = n.stats.rttAvg.int, + reliability = n.seen.round(3) # bandwidth estimates are based on limited information, so not logging it yet to avoid confusion # trace "node", n, bwMaxMbps = (n.stats.bwMax / 1e6).round(3), bwAvgMbps = (n.stats.bwAvg / 1e6).round(3)