fix aggressive node removal from on first packet loss

UDP packets get lost easily. We can't just remove
nodes from the routing table at first loss, as it can
create issues in small networks and in cases of temporary
connection failures.

Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com>
This commit is contained in:
Csaba Kiraly 2024-10-07 21:39:54 +02:00
parent c1d2ea410d
commit e1c1089e4f
No known key found for this signature in database
GPG Key ID: 0FE274EE8C95166E
2 changed files with 29 additions and 20 deletions

View File

@ -449,9 +449,9 @@ proc registerTalkProtocol*(d: Protocol, protocolId: seq[byte],
else:
ok()
proc replaceNode(d: Protocol, n: Node) =
proc replaceNode(d: Protocol, n: Node, removeIfNoReplacement = true) =
if n.record notin d.bootstrapRecords:
d.routingTable.replaceNode(n)
d.routingTable.replaceNode(n, removeIfNoReplacement)
else:
# For now we never remove bootstrap nodes. It might make sense to actually
# do so and to retry them only in case we drop to a really low amount of
@ -559,7 +559,11 @@ proc ping*(d: Protocol, toNode: Node):
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to ping message")
else:
d.replaceNode(toNode)
# A ping (or the pong) was lost, what should we do? Previous implementation called
# d.replaceNode(toNode) immediately, which removed the node. This is too aggressive,
# especially if we have a temporary network outage. Although bootstrap nodes are protected
# from being removed, everything else would slowly be removed.
d.replaceNode(toNode, removeIfNoReplacement = false)
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Pong message not received in time")
@ -623,7 +627,8 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to talk request message")
else:
d.replaceNode(toNode)
# remove on loss only if there is a replacement
d.replaceNode(toNode, removeIfNoReplacement = false)
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Talk response message not received in time")
@ -752,8 +757,8 @@ proc sendGetProviders(d: Protocol, toNode: Node,
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to GetProviders message")
else:
# TODO: do we need to do something when there is no response?
d.replaceNode(toNode)
# remove on loss only if there is a replacement
d.replaceNode(toNode, removeIfNoReplacement = false)
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("GetProviders response message not received in time")

View File

@ -431,27 +431,31 @@ proc addNode*(r: var RoutingTable, n: Node): NodeStatus =
proc removeNode*(r: var RoutingTable, n: Node) =
## Remove the node `n` from the routing table.
## No replemennt added, even if there is in replacement cache.
let b = r.bucketForNode(n.id)
if b.remove(n):
ipLimitDec(r, b, n)
proc replaceNode*(r: var RoutingTable, n: Node) =
proc replaceNode*(r: var RoutingTable, n: Node, removeIfNoReplacement = true) =
## Replace node `n` with last entry in the replacement cache. If there are
## no entries in the replacement cache, node `n` will simply be removed.
# TODO: Kademlia paper recommends here to not remove nodes if there are no
# replacements. However, that would require a bit more complexity in the
# revalidation as you don't want to try pinging that node all the time.
## no entries in the replacement cache, node `n` will either be removed
## or kept based on `removeIfNoReplacement`.
## Note: Kademlia paper recommends here to not remove nodes if there are no
## replacements. This might mean pinging nodes that are not reachable, but
## also avoids being too agressive because UDP losses or temporary network
## failures.
let b = r.bucketForNode(n.id)
if b.remove(n):
debug "Node removed from routing table", n
ipLimitDec(r, b, n)
if (b.replacementCache.len > 0 or removeIfNoReplacement):
if b.remove(n):
debug "Node removed from routing table", n
ipLimitDec(r, b, n)
if b.replacementCache.len > 0:
# Nodes in the replacement cache are already included in the ip limits.
let rn = b.replacementCache[high(b.replacementCache)]
b.add(rn)
b.replacementCache.delete(high(b.replacementCache))
debug "Node added to routing table from replacement cache", node=rn
if b.replacementCache.len > 0:
# Nodes in the replacement cache are already included in the ip limits.
let rn = b.replacementCache[high(b.replacementCache)]
b.add(rn)
b.replacementCache.delete(high(b.replacementCache))
debug "Node added to routing table from replacement cache", node=rn
proc getNode*(r: RoutingTable, id: NodeId): Option[Node] =
## Get the `Node` with `id` as `NodeId` from the routing table.