mirror of
https://github.com/logos-storage/logos-storage-nim-dht.git
synced 2026-01-02 13:33:08 +00:00
Merge pull request #99 from codex-storage/fix-removal
add link reliability metrics, fix aggressive node removal on first packet loss
This commit is contained in:
commit
ee4e2102d9
@ -22,6 +22,7 @@ export stint
|
|||||||
|
|
||||||
const
|
const
|
||||||
avgSmoothingFactor = 0.9
|
avgSmoothingFactor = 0.9
|
||||||
|
seenSmoothingFactor = 0.9
|
||||||
|
|
||||||
type
|
type
|
||||||
NodeId* = UInt256
|
NodeId* = UInt256
|
||||||
@ -41,9 +42,10 @@ type
|
|||||||
pubkey*: PublicKey
|
pubkey*: PublicKey
|
||||||
address*: Option[Address]
|
address*: Option[Address]
|
||||||
record*: SignedPeerRecord
|
record*: SignedPeerRecord
|
||||||
seen*: bool ## Indicates if there was at least one successful
|
seen*: float ## Indicates if there was at least one successful
|
||||||
## request-response with this node, or if the nde was verified
|
## request-response with this node, or if the nde was verified
|
||||||
## through the underlying transport mechanisms.
|
## through the underlying transport mechanisms. After first contact
|
||||||
|
## it tracks how reliable is the communication with the node.
|
||||||
stats*: Stats # traffic measurements and statistics
|
stats*: Stats # traffic measurements and statistics
|
||||||
|
|
||||||
func toNodeId*(pid: PeerId): NodeId =
|
func toNodeId*(pid: PeerId): NodeId =
|
||||||
@ -193,6 +195,18 @@ func shortLog*(address: Address): string =
|
|||||||
|
|
||||||
chronicles.formatIt(Address): shortLog(it)
|
chronicles.formatIt(Address): shortLog(it)
|
||||||
|
|
||||||
|
func registerSeen*(n:Node, seen = true) =
|
||||||
|
## Register event of seeing (getting message from) or not seeing (missing message) node
|
||||||
|
## Note: interpretation might depend on NAT type
|
||||||
|
if n.seen == 0: # first time seeing the node
|
||||||
|
n.seen = 1
|
||||||
|
else:
|
||||||
|
n.seen = seenSmoothingFactor * n.seen + (1.0 - seenSmoothingFactor) * seen.float
|
||||||
|
|
||||||
|
func alreadySeen*(n:Node) : bool =
|
||||||
|
## Was the node seen at least once?
|
||||||
|
n.seen > 0
|
||||||
|
|
||||||
# collecting performane metrics
|
# collecting performane metrics
|
||||||
func registerRtt*(n: Node, rtt: Duration) =
|
func registerRtt*(n: Node, rtt: Duration) =
|
||||||
## register an RTT measurement
|
## register an RTT measurement
|
||||||
|
|||||||
@ -133,6 +133,10 @@ const
|
|||||||
MaxProvidersEntries* = 1_000_000 # one million records
|
MaxProvidersEntries* = 1_000_000 # one million records
|
||||||
MaxProvidersPerEntry* = 20 # providers per entry
|
MaxProvidersPerEntry* = 20 # providers per entry
|
||||||
## call
|
## call
|
||||||
|
FindnodeSeenThreshold = 1.0 ## threshold used as findnode response filter
|
||||||
|
LookupSeenThreshold = 0.0 ## threshold used for lookup nodeset selection
|
||||||
|
QuerySeenThreshold = 0.0 ## threshold used for query nodeset selection
|
||||||
|
NoreplyRemoveThreshold = 0.5 ## remove node on no reply if 'seen' is below this value
|
||||||
|
|
||||||
func shortLog*(record: SignedPeerRecord): string =
|
func shortLog*(record: SignedPeerRecord): string =
|
||||||
## Returns compact string representation of ``SignedPeerRecord``.
|
## Returns compact string representation of ``SignedPeerRecord``.
|
||||||
@ -249,14 +253,14 @@ proc randomNodes*(d: Protocol, maxAmount: int,
|
|||||||
d.randomNodes(maxAmount, proc(x: Node): bool = x.record.contains(enrField))
|
d.randomNodes(maxAmount, proc(x: Node): bool = x.record.contains(enrField))
|
||||||
|
|
||||||
proc neighbours*(d: Protocol, id: NodeId, k: int = BUCKET_SIZE,
|
proc neighbours*(d: Protocol, id: NodeId, k: int = BUCKET_SIZE,
|
||||||
seenOnly = false): seq[Node] =
|
seenThreshold = 0.0): seq[Node] =
|
||||||
## Return up to k neighbours (closest node ids) of the given node id.
|
## Return up to k neighbours (closest node ids) of the given node id.
|
||||||
d.routingTable.neighbours(id, k, seenOnly)
|
d.routingTable.neighbours(id, k, seenThreshold)
|
||||||
|
|
||||||
proc neighboursAtDistances*(d: Protocol, distances: seq[uint16],
|
proc neighboursAtDistances*(d: Protocol, distances: seq[uint16],
|
||||||
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
|
k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] =
|
||||||
## Return up to k neighbours (closest node ids) at given distances.
|
## Return up to k neighbours (closest node ids) at given distances.
|
||||||
d.routingTable.neighboursAtDistances(distances, k, seenOnly)
|
d.routingTable.neighboursAtDistances(distances, k, seenThreshold)
|
||||||
|
|
||||||
proc nodesDiscovered*(d: Protocol): int = d.routingTable.len
|
proc nodesDiscovered*(d: Protocol): int = d.routingTable.len
|
||||||
|
|
||||||
@ -344,7 +348,7 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
|
|||||||
# TODO: Still deduplicate also?
|
# TODO: Still deduplicate also?
|
||||||
if fn.distances.all(proc (x: uint16): bool = return x <= 256):
|
if fn.distances.all(proc (x: uint16): bool = return x <= 256):
|
||||||
d.sendNodes(fromId, fromAddr, reqId,
|
d.sendNodes(fromId, fromAddr, reqId,
|
||||||
d.routingTable.neighboursAtDistances(fn.distances, seenOnly = true, k = FindNodeResultLimit))
|
d.routingTable.neighboursAtDistances(fn.distances, FindNodeResultLimit, FindnodeSeenThreshold))
|
||||||
else:
|
else:
|
||||||
# At least one invalid distance, but the polite node we are, still respond
|
# At least one invalid distance, but the polite node we are, still respond
|
||||||
# with empty nodes.
|
# with empty nodes.
|
||||||
@ -353,7 +357,7 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
|
|||||||
proc handleFindNodeFast(d: Protocol, fromId: NodeId, fromAddr: Address,
|
proc handleFindNodeFast(d: Protocol, fromId: NodeId, fromAddr: Address,
|
||||||
fnf: FindNodeFastMessage, reqId: RequestId) =
|
fnf: FindNodeFastMessage, reqId: RequestId) =
|
||||||
d.sendNodes(fromId, fromAddr, reqId,
|
d.sendNodes(fromId, fromAddr, reqId,
|
||||||
d.routingTable.neighbours(fnf.target, seenOnly = true, k = FindNodeFastResultLimit))
|
d.routingTable.neighbours(fnf.target, FindNodeFastResultLimit, FindnodeSeenThreshold))
|
||||||
# TODO: if known, maybe we should add exact target even if not yet "seen"
|
# TODO: if known, maybe we should add exact target even if not yet "seen"
|
||||||
|
|
||||||
proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
|
proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
|
||||||
@ -449,9 +453,9 @@ proc registerTalkProtocol*(d: Protocol, protocolId: seq[byte],
|
|||||||
else:
|
else:
|
||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc replaceNode(d: Protocol, n: Node) =
|
proc replaceNode(d: Protocol, n: Node, forceRemoveBelow = 1.0) =
|
||||||
if n.record notin d.bootstrapRecords:
|
if n.record notin d.bootstrapRecords:
|
||||||
d.routingTable.replaceNode(n)
|
d.routingTable.replaceNode(n, forceRemoveBelow)
|
||||||
else:
|
else:
|
||||||
# For now we never remove bootstrap nodes. It might make sense to actually
|
# For now we never remove bootstrap nodes. It might make sense to actually
|
||||||
# do so and to retry them only in case we drop to a really low amount of
|
# do so and to retry them only in case we drop to a really low amount of
|
||||||
@ -550,16 +554,20 @@ proc ping*(d: Protocol, toNode: Node):
|
|||||||
# trace "ping RTT:", rtt, node = toNode
|
# trace "ping RTT:", rtt, node = toNode
|
||||||
toNode.registerRtt(rtt)
|
toNode.registerRtt(rtt)
|
||||||
|
|
||||||
|
d.routingTable.setJustSeen(toNode, resp.isSome())
|
||||||
if resp.isSome():
|
if resp.isSome():
|
||||||
if resp.get().kind == pong:
|
if resp.get().kind == pong:
|
||||||
d.routingTable.setJustSeen(toNode)
|
|
||||||
return ok(resp.get().pong)
|
return ok(resp.get().pong)
|
||||||
else:
|
else:
|
||||||
d.replaceNode(toNode)
|
d.replaceNode(toNode)
|
||||||
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
|
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
|
||||||
return err("Invalid response to ping message")
|
return err("Invalid response to ping message")
|
||||||
else:
|
else:
|
||||||
d.replaceNode(toNode)
|
# A ping (or the pong) was lost, what should we do? Previous implementation called
|
||||||
|
# d.replaceNode(toNode) immediately, which removed the node. This is too aggressive,
|
||||||
|
# especially if we have a temporary network outage. Although bootstrap nodes are protected
|
||||||
|
# from being removed, everything else would slowly be removed.
|
||||||
|
d.replaceNode(toNode, NoreplyRemoveThreshold)
|
||||||
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
|
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
|
||||||
return err("Pong message not received in time")
|
return err("Pong message not received in time")
|
||||||
|
|
||||||
@ -573,9 +581,9 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
|
|||||||
msg = FindNodeMessage(distances: distances)
|
msg = FindNodeMessage(distances: distances)
|
||||||
nodes = await d.waitNodeResponses(toNode, msg)
|
nodes = await d.waitNodeResponses(toNode, msg)
|
||||||
|
|
||||||
|
d.routingTable.setJustSeen(toNode, nodes.isOk)
|
||||||
if nodes.isOk:
|
if nodes.isOk:
|
||||||
let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit, distances)
|
let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit, distances)
|
||||||
d.routingTable.setJustSeen(toNode)
|
|
||||||
return ok(res)
|
return ok(res)
|
||||||
else:
|
else:
|
||||||
trace "findNode nodes not OK."
|
trace "findNode nodes not OK."
|
||||||
@ -592,9 +600,9 @@ proc findNodeFast*(d: Protocol, toNode: Node, target: NodeId):
|
|||||||
msg = FindNodeFastMessage(target: target)
|
msg = FindNodeFastMessage(target: target)
|
||||||
nodes = await d.waitNodeResponses(toNode, msg)
|
nodes = await d.waitNodeResponses(toNode, msg)
|
||||||
|
|
||||||
|
d.routingTable.setJustSeen(toNode, nodes.isOk)
|
||||||
if nodes.isOk:
|
if nodes.isOk:
|
||||||
let res = verifyNodesRecords(nodes.get(), toNode, FindNodeFastResultLimit)
|
let res = verifyNodesRecords(nodes.get(), toNode, FindNodeFastResultLimit)
|
||||||
d.routingTable.setJustSeen(toNode)
|
|
||||||
return ok(res)
|
return ok(res)
|
||||||
else:
|
else:
|
||||||
d.replaceNode(toNode)
|
d.replaceNode(toNode)
|
||||||
@ -614,16 +622,17 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
|
|||||||
# trace "talk RTT:", rtt, node = toNode
|
# trace "talk RTT:", rtt, node = toNode
|
||||||
toNode.registerRtt(rtt)
|
toNode.registerRtt(rtt)
|
||||||
|
|
||||||
|
d.routingTable.setJustSeen(toNode, resp.isSome())
|
||||||
if resp.isSome():
|
if resp.isSome():
|
||||||
if resp.get().kind == talkResp:
|
if resp.get().kind == talkResp:
|
||||||
d.routingTable.setJustSeen(toNode)
|
|
||||||
return ok(resp.get().talkResp.response)
|
return ok(resp.get().talkResp.response)
|
||||||
else:
|
else:
|
||||||
d.replaceNode(toNode)
|
d.replaceNode(toNode)
|
||||||
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
|
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
|
||||||
return err("Invalid response to talk request message")
|
return err("Invalid response to talk request message")
|
||||||
else:
|
else:
|
||||||
d.replaceNode(toNode)
|
# remove on loss only if there is a replacement
|
||||||
|
d.replaceNode(toNode, NoreplyRemoveThreshold)
|
||||||
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
|
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
|
||||||
return err("Talk response message not received in time")
|
return err("Talk response message not received in time")
|
||||||
|
|
||||||
@ -664,7 +673,7 @@ proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]]
|
|||||||
# `closestNodes` holds the k closest nodes to target found, sorted by distance
|
# `closestNodes` holds the k closest nodes to target found, sorted by distance
|
||||||
# Unvalidated nodes are used for requests as a form of validation.
|
# Unvalidated nodes are used for requests as a form of validation.
|
||||||
var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE,
|
var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE,
|
||||||
seenOnly = false)
|
LookupSeenThreshold)
|
||||||
|
|
||||||
var asked, seen = initHashSet[NodeId]()
|
var asked, seen = initHashSet[NodeId]()
|
||||||
asked.incl(d.localNode.id) # No need to ask our own node
|
asked.incl(d.localNode.id) # No need to ask our own node
|
||||||
@ -742,9 +751,9 @@ proc sendGetProviders(d: Protocol, toNode: Node,
|
|||||||
let
|
let
|
||||||
resp = await d.waitResponse(toNode, msg)
|
resp = await d.waitResponse(toNode, msg)
|
||||||
|
|
||||||
|
d.routingTable.setJustSeen(toNode, resp.isSome())
|
||||||
if resp.isSome():
|
if resp.isSome():
|
||||||
if resp.get().kind == MessageKind.providers:
|
if resp.get().kind == MessageKind.providers:
|
||||||
d.routingTable.setJustSeen(toNode)
|
|
||||||
return ok(resp.get().provs)
|
return ok(resp.get().provs)
|
||||||
else:
|
else:
|
||||||
# TODO: do we need to do something when there is an invalid response?
|
# TODO: do we need to do something when there is an invalid response?
|
||||||
@ -752,8 +761,8 @@ proc sendGetProviders(d: Protocol, toNode: Node,
|
|||||||
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
|
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
|
||||||
return err("Invalid response to GetProviders message")
|
return err("Invalid response to GetProviders message")
|
||||||
else:
|
else:
|
||||||
# TODO: do we need to do something when there is no response?
|
# remove on loss only if there is a replacement
|
||||||
d.replaceNode(toNode)
|
d.replaceNode(toNode, NoreplyRemoveThreshold)
|
||||||
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
|
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
|
||||||
return err("GetProviders response message not received in time")
|
return err("GetProviders response message not received in time")
|
||||||
|
|
||||||
@ -827,7 +836,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
|
|||||||
## This will take k nodes from the routing table closest to target and
|
## This will take k nodes from the routing table closest to target and
|
||||||
## query them for nodes closest to target. If there are less than k nodes in
|
## query them for nodes closest to target. If there are less than k nodes in
|
||||||
## the routing table, nodes returned by the first queries will be used.
|
## the routing table, nodes returned by the first queries will be used.
|
||||||
var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false)
|
var queryBuffer = d.routingTable.neighbours(target, k, QuerySeenThreshold)
|
||||||
|
|
||||||
var asked, seen = initHashSet[NodeId]()
|
var asked, seen = initHashSet[NodeId]()
|
||||||
asked.incl(d.localNode.id) # No need to ask our own node
|
asked.incl(d.localNode.id) # No need to ask our own node
|
||||||
@ -1047,7 +1056,8 @@ proc debugPrintLoop(d: Protocol) {.async.} =
|
|||||||
debug "bucket", depth = b.getDepth,
|
debug "bucket", depth = b.getDepth,
|
||||||
len = b.nodes.len, standby = b.replacementLen
|
len = b.nodes.len, standby = b.replacementLen
|
||||||
for n in b.nodes:
|
for n in b.nodes:
|
||||||
debug "node", n, rttMin = n.stats.rttMin.int, rttAvg = n.stats.rttAvg.int
|
debug "node", n, rttMin = n.stats.rttMin.int, rttAvg = n.stats.rttAvg.int,
|
||||||
|
reliability = n.seen.round(3)
|
||||||
# bandwidth estimates are based on limited information, so not logging it yet to avoid confusion
|
# bandwidth estimates are based on limited information, so not logging it yet to avoid confusion
|
||||||
# trace "node", n, bwMaxMbps = (n.stats.bwMax / 1e6).round(3), bwAvgMbps = (n.stats.bwAvg / 1e6).round(3)
|
# trace "node", n, bwMaxMbps = (n.stats.bwMax / 1e6).round(3), bwAvgMbps = (n.stats.bwAvg / 1e6).round(3)
|
||||||
|
|
||||||
|
|||||||
@ -218,7 +218,7 @@ proc remove(k: KBucket, n: Node): bool =
|
|||||||
let i = k.nodes.find(n)
|
let i = k.nodes.find(n)
|
||||||
if i != -1:
|
if i != -1:
|
||||||
dht_routing_table_nodes.dec()
|
dht_routing_table_nodes.dec()
|
||||||
if k.nodes[i].seen:
|
if alreadySeen(k.nodes[i]):
|
||||||
dht_routing_table_nodes.dec(labelValues = ["seen"])
|
dht_routing_table_nodes.dec(labelValues = ["seen"])
|
||||||
k.nodes.delete(i)
|
k.nodes.delete(i)
|
||||||
trace "removed node:", node = n
|
trace "removed node:", node = n
|
||||||
@ -431,27 +431,31 @@ proc addNode*(r: var RoutingTable, n: Node): NodeStatus =
|
|||||||
|
|
||||||
proc removeNode*(r: var RoutingTable, n: Node) =
|
proc removeNode*(r: var RoutingTable, n: Node) =
|
||||||
## Remove the node `n` from the routing table.
|
## Remove the node `n` from the routing table.
|
||||||
|
## No replemennt added, even if there is in replacement cache.
|
||||||
let b = r.bucketForNode(n.id)
|
let b = r.bucketForNode(n.id)
|
||||||
if b.remove(n):
|
if b.remove(n):
|
||||||
ipLimitDec(r, b, n)
|
ipLimitDec(r, b, n)
|
||||||
|
|
||||||
proc replaceNode*(r: var RoutingTable, n: Node) =
|
proc replaceNode*(r: var RoutingTable, n: Node, forceRemoveBelow = 1.0) =
|
||||||
## Replace node `n` with last entry in the replacement cache. If there are
|
## Replace node `n` with last entry in the replacement cache. If there are
|
||||||
## no entries in the replacement cache, node `n` will simply be removed.
|
## no entries in the replacement cache, node `n` will either be removed
|
||||||
# TODO: Kademlia paper recommends here to not remove nodes if there are no
|
## or kept based on `forceRemoveBelow`. Default: remove.
|
||||||
# replacements. However, that would require a bit more complexity in the
|
## Note: Kademlia paper recommends here to not remove nodes if there are no
|
||||||
# revalidation as you don't want to try pinging that node all the time.
|
## replacements. This might mean pinging nodes that are not reachable, but
|
||||||
|
## also avoids being too agressive because UDP losses or temporary network
|
||||||
|
## failures.
|
||||||
let b = r.bucketForNode(n.id)
|
let b = r.bucketForNode(n.id)
|
||||||
if b.remove(n):
|
if (b.replacementCache.len > 0 or n.seen <= forceRemoveBelow):
|
||||||
debug "Node removed from routing table", n
|
if b.remove(n):
|
||||||
ipLimitDec(r, b, n)
|
debug "Node removed from routing table", n
|
||||||
|
ipLimitDec(r, b, n)
|
||||||
|
|
||||||
if b.replacementCache.len > 0:
|
if b.replacementCache.len > 0:
|
||||||
# Nodes in the replacement cache are already included in the ip limits.
|
# Nodes in the replacement cache are already included in the ip limits.
|
||||||
let rn = b.replacementCache[high(b.replacementCache)]
|
let rn = b.replacementCache[high(b.replacementCache)]
|
||||||
b.add(rn)
|
b.add(rn)
|
||||||
b.replacementCache.delete(high(b.replacementCache))
|
b.replacementCache.delete(high(b.replacementCache))
|
||||||
debug "Node added to routing table from replacement cache", node=rn
|
debug "Node added to routing table from replacement cache", node=rn
|
||||||
|
|
||||||
proc getNode*(r: RoutingTable, id: NodeId): Option[Node] =
|
proc getNode*(r: RoutingTable, id: NodeId): Option[Node] =
|
||||||
## Get the `Node` with `id` as `NodeId` from the routing table.
|
## Get the `Node` with `id` as `NodeId` from the routing table.
|
||||||
@ -472,16 +476,16 @@ proc nodesByDistanceTo(r: RoutingTable, k: KBucket, id: NodeId): seq[Node] =
|
|||||||
sortedByIt(k.nodes, r.distance(it.id, id))
|
sortedByIt(k.nodes, r.distance(it.id, id))
|
||||||
|
|
||||||
proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE,
|
proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE,
|
||||||
seenOnly = false): seq[Node] =
|
seenThreshold = 0.0): seq[Node] =
|
||||||
## Return up to k neighbours of the given node id.
|
## Return up to k neighbours of the given node id.
|
||||||
## When seenOnly is set to true, only nodes that have been contacted
|
## When seenThreshold is set, only nodes that have been contacted
|
||||||
## previously successfully will be selected.
|
## previously successfully and were seen enough recently will be selected.
|
||||||
result = newSeqOfCap[Node](k * 2)
|
result = newSeqOfCap[Node](k * 2)
|
||||||
block addNodes:
|
block addNodes:
|
||||||
for bucket in r.bucketsByDistanceTo(id):
|
for bucket in r.bucketsByDistanceTo(id):
|
||||||
for n in r.nodesByDistanceTo(bucket, id):
|
for n in r.nodesByDistanceTo(bucket, id):
|
||||||
# Only provide actively seen nodes when `seenOnly` set.
|
# Avoid nodes with 'seen' value below threshold
|
||||||
if not seenOnly or n.seen:
|
if n.seen >= seenThreshold:
|
||||||
result.add(n)
|
result.add(n)
|
||||||
if result.len == k * 2:
|
if result.len == k * 2:
|
||||||
break addNodes
|
break addNodes
|
||||||
@ -493,22 +497,22 @@ proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE,
|
|||||||
result.setLen(k)
|
result.setLen(k)
|
||||||
|
|
||||||
proc neighboursAtDistance*(r: RoutingTable, distance: uint16,
|
proc neighboursAtDistance*(r: RoutingTable, distance: uint16,
|
||||||
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
|
k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] =
|
||||||
## Return up to k neighbours at given logarithmic distance.
|
## Return up to k neighbours at given logarithmic distance.
|
||||||
result = r.neighbours(r.idAtDistance(r.localNode.id, distance), k, seenOnly)
|
result = r.neighbours(r.idAtDistance(r.localNode.id, distance), k, seenThreshold)
|
||||||
# This is a bit silly, first getting closest nodes then to only keep the ones
|
# This is a bit silly, first getting closest nodes then to only keep the ones
|
||||||
# that are exactly the requested distance.
|
# that are exactly the requested distance.
|
||||||
keepIf(result, proc(n: Node): bool = r.logDistance(n.id, r.localNode.id) == distance)
|
keepIf(result, proc(n: Node): bool = r.logDistance(n.id, r.localNode.id) == distance)
|
||||||
|
|
||||||
proc neighboursAtDistances*(r: RoutingTable, distances: seq[uint16],
|
proc neighboursAtDistances*(r: RoutingTable, distances: seq[uint16],
|
||||||
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
|
k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] =
|
||||||
## Return up to k neighbours at given logarithmic distances.
|
## Return up to k neighbours at given logarithmic distances.
|
||||||
# TODO: This will currently return nodes with neighbouring distances on the
|
# TODO: This will currently return nodes with neighbouring distances on the
|
||||||
# first one prioritize. It might end up not including all the node distances
|
# first one prioritize. It might end up not including all the node distances
|
||||||
# requested. Need to rework the logic here and not use the neighbours call.
|
# requested. Need to rework the logic here and not use the neighbours call.
|
||||||
if distances.len > 0:
|
if distances.len > 0:
|
||||||
result = r.neighbours(r.idAtDistance(r.localNode.id, distances[0]), k,
|
result = r.neighbours(r.idAtDistance(r.localNode.id, distances[0]), k,
|
||||||
seenOnly)
|
seenThreshold)
|
||||||
# This is a bit silly, first getting closest nodes then to only keep the ones
|
# This is a bit silly, first getting closest nodes then to only keep the ones
|
||||||
# that are exactly the requested distances.
|
# that are exactly the requested distances.
|
||||||
keepIf(result, proc(n: Node): bool =
|
keepIf(result, proc(n: Node): bool =
|
||||||
@ -525,18 +529,19 @@ proc moveRight[T](arr: var openArray[T], a, b: int) =
|
|||||||
shallowCopy(arr[i + 1], arr[i])
|
shallowCopy(arr[i + 1], arr[i])
|
||||||
shallowCopy(arr[a], t)
|
shallowCopy(arr[a], t)
|
||||||
|
|
||||||
proc setJustSeen*(r: RoutingTable, n: Node) =
|
proc setJustSeen*(r: RoutingTable, n: Node, seen = true) =
|
||||||
## Move `n` to the head (most recently seen) of its bucket.
|
## If seen, move `n` to the head (most recently seen) of its bucket.
|
||||||
## If `n` is not in the routing table, do nothing.
|
## If `n` is not in the routing table, do nothing.
|
||||||
let b = r.bucketForNode(n.id)
|
let b = r.bucketForNode(n.id)
|
||||||
let idx = b.nodes.find(n)
|
if seen:
|
||||||
if idx >= 0:
|
let idx = b.nodes.find(n)
|
||||||
if idx != 0:
|
if idx >= 0:
|
||||||
b.nodes.moveRight(0, idx - 1)
|
if idx != 0:
|
||||||
|
b.nodes.moveRight(0, idx - 1)
|
||||||
|
|
||||||
if not n.seen:
|
if not alreadySeen(n): # first time seeing the node
|
||||||
b.nodes[0].seen = true
|
dht_routing_table_nodes.inc(labelValues = ["seen"])
|
||||||
dht_routing_table_nodes.inc(labelValues = ["seen"])
|
n.registerSeen(seen)
|
||||||
|
|
||||||
proc nodeToRevalidate*(r: RoutingTable): Node =
|
proc nodeToRevalidate*(r: RoutingTable): Node =
|
||||||
## Return a node to revalidate. The least recently seen node from a random
|
## Return a node to revalidate. The least recently seen node from a random
|
||||||
|
|||||||
@ -231,7 +231,8 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
|
|||||||
if node.address.isSome() and a == node.address.get():
|
if node.address.isSome() and a == node.address.get():
|
||||||
# TODO: maybe here we could verify that the address matches what we were
|
# TODO: maybe here we could verify that the address matches what we were
|
||||||
# sending the 'whoareyou' message to. In that case, we can set 'seen'
|
# sending the 'whoareyou' message to. In that case, we can set 'seen'
|
||||||
node.seen = true
|
# TODO: verify how this works with restrictive NAT and firewall scenarios.
|
||||||
|
node.registerSeen()
|
||||||
if t.client.addNode(node):
|
if t.client.addNode(node):
|
||||||
trace "Added new node to routing table after handshake", node, tablesize=t.client.nodesDiscovered()
|
trace "Added new node to routing table after handshake", node, tablesize=t.client.nodesDiscovered()
|
||||||
discard t.sendPending(node)
|
discard t.sendPending(node)
|
||||||
|
|||||||
@ -101,7 +101,7 @@ proc nodesAtDistanceUniqueIp*(
|
|||||||
|
|
||||||
proc addSeenNode*(d: discv5_protocol.Protocol, n: Node): bool =
|
proc addSeenNode*(d: discv5_protocol.Protocol, n: Node): bool =
|
||||||
# Add it as a seen node, warning: for testing convenience only!
|
# Add it as a seen node, warning: for testing convenience only!
|
||||||
n.seen = true
|
n.registerSeen()
|
||||||
d.addNode(n)
|
d.addNode(n)
|
||||||
|
|
||||||
func udpExample*(_: type MultiAddress): MultiAddress =
|
func udpExample*(_: type MultiAddress): MultiAddress =
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user