change node seen flag to an exponential moving average
keep defaults as before Signed-off-by: Csaba Kiraly <csaba.kiraly@gmail.com> # Conflicts: # codexdht/private/eth/p2p/discoveryv5/node.nim # codexdht/private/eth/p2p/discoveryv5/routing_table.nim
This commit is contained in:
parent
e1c1089e4f
commit
02bc12e639
|
@ -22,6 +22,7 @@ export stint
|
|||
|
||||
const
|
||||
avgSmoothingFactor = 0.9
|
||||
seenSmoothingFactor = 0.9
|
||||
|
||||
type
|
||||
NodeId* = UInt256
|
||||
|
@ -41,9 +42,10 @@ type
|
|||
pubkey*: PublicKey
|
||||
address*: Option[Address]
|
||||
record*: SignedPeerRecord
|
||||
seen*: bool ## Indicates if there was at least one successful
|
||||
seen*: float ## Indicates if there was at least one successful
|
||||
## request-response with this node, or if the nde was verified
|
||||
## through the underlying transport mechanisms.
|
||||
## through the underlying transport mechanisms. After first contact
|
||||
## it tracks how reliable is the communication with the node.
|
||||
stats*: Stats # traffic measurements and statistics
|
||||
|
||||
func toNodeId*(pid: PeerId): NodeId =
|
||||
|
@ -193,6 +195,18 @@ func shortLog*(address: Address): string =
|
|||
|
||||
chronicles.formatIt(Address): shortLog(it)
|
||||
|
||||
func registerSeen*(n:Node, seen = true) =
|
||||
## Register event of seeing (getting message from) or not seeing (missing message) node
|
||||
## Note: interpretation might depend on NAT type
|
||||
if n.seen == 0: # first time seeing the node
|
||||
n.seen = 1
|
||||
else:
|
||||
n.seen = seenSmoothingFactor * n.seen + (1.0 - seenSmoothingFactor) * seen.float
|
||||
|
||||
func alreadySeen*(n:Node) : bool =
|
||||
## Was the node seen at least once?
|
||||
n.seen > 0
|
||||
|
||||
# collecting performane metrics
|
||||
func registerRtt*(n: Node, rtt: Duration) =
|
||||
## register an RTT measurement
|
||||
|
|
|
@ -133,6 +133,9 @@ const
|
|||
MaxProvidersEntries* = 1_000_000 # one million records
|
||||
MaxProvidersPerEntry* = 20 # providers per entry
|
||||
## call
|
||||
FindnodeSeenThreshold = 1.0 ## threshold used as findnode response filter
|
||||
LookupSeenThreshold = 0.0 ## threshold used for lookup nodeset selection
|
||||
QuerySeenThreshold = 0.0 ## threshold used for query nodeset selection
|
||||
|
||||
func shortLog*(record: SignedPeerRecord): string =
|
||||
## Returns compact string representation of ``SignedPeerRecord``.
|
||||
|
@ -249,14 +252,14 @@ proc randomNodes*(d: Protocol, maxAmount: int,
|
|||
d.randomNodes(maxAmount, proc(x: Node): bool = x.record.contains(enrField))
|
||||
|
||||
proc neighbours*(d: Protocol, id: NodeId, k: int = BUCKET_SIZE,
|
||||
seenOnly = false): seq[Node] =
|
||||
seenThreshold = 0.0): seq[Node] =
|
||||
## Return up to k neighbours (closest node ids) of the given node id.
|
||||
d.routingTable.neighbours(id, k, seenOnly)
|
||||
d.routingTable.neighbours(id, k, seenThreshold)
|
||||
|
||||
proc neighboursAtDistances*(d: Protocol, distances: seq[uint16],
|
||||
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
|
||||
k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] =
|
||||
## Return up to k neighbours (closest node ids) at given distances.
|
||||
d.routingTable.neighboursAtDistances(distances, k, seenOnly)
|
||||
d.routingTable.neighboursAtDistances(distances, k, seenThreshold)
|
||||
|
||||
proc nodesDiscovered*(d: Protocol): int = d.routingTable.len
|
||||
|
||||
|
@ -344,7 +347,7 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
|
|||
# TODO: Still deduplicate also?
|
||||
if fn.distances.all(proc (x: uint16): bool = return x <= 256):
|
||||
d.sendNodes(fromId, fromAddr, reqId,
|
||||
d.routingTable.neighboursAtDistances(fn.distances, seenOnly = true, k = FindNodeResultLimit))
|
||||
d.routingTable.neighboursAtDistances(fn.distances, FindNodeResultLimit, FindnodeSeenThreshold))
|
||||
else:
|
||||
# At least one invalid distance, but the polite node we are, still respond
|
||||
# with empty nodes.
|
||||
|
@ -353,7 +356,7 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
|
|||
proc handleFindNodeFast(d: Protocol, fromId: NodeId, fromAddr: Address,
|
||||
fnf: FindNodeFastMessage, reqId: RequestId) =
|
||||
d.sendNodes(fromId, fromAddr, reqId,
|
||||
d.routingTable.neighbours(fnf.target, seenOnly = true, k = FindNodeFastResultLimit))
|
||||
d.routingTable.neighbours(fnf.target, FindNodeFastResultLimit, FindnodeSeenThreshold))
|
||||
# TODO: if known, maybe we should add exact target even if not yet "seen"
|
||||
|
||||
proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
|
||||
|
@ -669,7 +672,7 @@ proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]]
|
|||
# `closestNodes` holds the k closest nodes to target found, sorted by distance
|
||||
# Unvalidated nodes are used for requests as a form of validation.
|
||||
var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE,
|
||||
seenOnly = false)
|
||||
LookupSeenThreshold)
|
||||
|
||||
var asked, seen = initHashSet[NodeId]()
|
||||
asked.incl(d.localNode.id) # No need to ask our own node
|
||||
|
@ -832,7 +835,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
|
|||
## This will take k nodes from the routing table closest to target and
|
||||
## query them for nodes closest to target. If there are less than k nodes in
|
||||
## the routing table, nodes returned by the first queries will be used.
|
||||
var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false)
|
||||
var queryBuffer = d.routingTable.neighbours(target, k, QuerySeenThreshold)
|
||||
|
||||
var asked, seen = initHashSet[NodeId]()
|
||||
asked.incl(d.localNode.id) # No need to ask our own node
|
||||
|
|
|
@ -218,7 +218,7 @@ proc remove(k: KBucket, n: Node): bool =
|
|||
let i = k.nodes.find(n)
|
||||
if i != -1:
|
||||
dht_routing_table_nodes.dec()
|
||||
if k.nodes[i].seen:
|
||||
if alreadySeen(k.nodes[i]):
|
||||
dht_routing_table_nodes.dec(labelValues = ["seen"])
|
||||
k.nodes.delete(i)
|
||||
trace "removed node:", node = n
|
||||
|
@ -476,16 +476,16 @@ proc nodesByDistanceTo(r: RoutingTable, k: KBucket, id: NodeId): seq[Node] =
|
|||
sortedByIt(k.nodes, r.distance(it.id, id))
|
||||
|
||||
proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE,
|
||||
seenOnly = false): seq[Node] =
|
||||
seenThreshold = 0.0): seq[Node] =
|
||||
## Return up to k neighbours of the given node id.
|
||||
## When seenOnly is set to true, only nodes that have been contacted
|
||||
## previously successfully will be selected.
|
||||
## When seenThreshold is set, only nodes that have been contacted
|
||||
## previously successfully and were seen enough recently will be selected.
|
||||
result = newSeqOfCap[Node](k * 2)
|
||||
block addNodes:
|
||||
for bucket in r.bucketsByDistanceTo(id):
|
||||
for n in r.nodesByDistanceTo(bucket, id):
|
||||
# Only provide actively seen nodes when `seenOnly` set.
|
||||
if not seenOnly or n.seen:
|
||||
# Avoid nodes with 'seen' value below threshold
|
||||
if n.seen >= seenThreshold:
|
||||
result.add(n)
|
||||
if result.len == k * 2:
|
||||
break addNodes
|
||||
|
@ -497,22 +497,22 @@ proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE,
|
|||
result.setLen(k)
|
||||
|
||||
proc neighboursAtDistance*(r: RoutingTable, distance: uint16,
|
||||
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
|
||||
k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] =
|
||||
## Return up to k neighbours at given logarithmic distance.
|
||||
result = r.neighbours(r.idAtDistance(r.localNode.id, distance), k, seenOnly)
|
||||
result = r.neighbours(r.idAtDistance(r.localNode.id, distance), k, seenThreshold)
|
||||
# This is a bit silly, first getting closest nodes then to only keep the ones
|
||||
# that are exactly the requested distance.
|
||||
keepIf(result, proc(n: Node): bool = r.logDistance(n.id, r.localNode.id) == distance)
|
||||
|
||||
proc neighboursAtDistances*(r: RoutingTable, distances: seq[uint16],
|
||||
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
|
||||
k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] =
|
||||
## Return up to k neighbours at given logarithmic distances.
|
||||
# TODO: This will currently return nodes with neighbouring distances on the
|
||||
# first one prioritize. It might end up not including all the node distances
|
||||
# requested. Need to rework the logic here and not use the neighbours call.
|
||||
if distances.len > 0:
|
||||
result = r.neighbours(r.idAtDistance(r.localNode.id, distances[0]), k,
|
||||
seenOnly)
|
||||
seenThreshold)
|
||||
# This is a bit silly, first getting closest nodes then to only keep the ones
|
||||
# that are exactly the requested distances.
|
||||
keepIf(result, proc(n: Node): bool =
|
||||
|
@ -529,18 +529,19 @@ proc moveRight[T](arr: var openArray[T], a, b: int) =
|
|||
shallowCopy(arr[i + 1], arr[i])
|
||||
shallowCopy(arr[a], t)
|
||||
|
||||
proc setJustSeen*(r: RoutingTable, n: Node) =
|
||||
## Move `n` to the head (most recently seen) of its bucket.
|
||||
proc setJustSeen*(r: RoutingTable, n: Node, seen = true) =
|
||||
## If seen, move `n` to the head (most recently seen) of its bucket.
|
||||
## If `n` is not in the routing table, do nothing.
|
||||
let b = r.bucketForNode(n.id)
|
||||
if seen:
|
||||
let idx = b.nodes.find(n)
|
||||
if idx >= 0:
|
||||
if idx != 0:
|
||||
b.nodes.moveRight(0, idx - 1)
|
||||
|
||||
if not n.seen:
|
||||
b.nodes[0].seen = true
|
||||
if not alreadySeen(n): # first time seeing the node
|
||||
dht_routing_table_nodes.inc(labelValues = ["seen"])
|
||||
n.registerSeen(seen)
|
||||
|
||||
proc nodeToRevalidate*(r: RoutingTable): Node =
|
||||
## Return a node to revalidate. The least recently seen node from a random
|
||||
|
|
|
@ -231,7 +231,8 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
|
|||
if node.address.isSome() and a == node.address.get():
|
||||
# TODO: maybe here we could verify that the address matches what we were
|
||||
# sending the 'whoareyou' message to. In that case, we can set 'seen'
|
||||
node.seen = true
|
||||
# TODO: verify how this works with restrictive NAT and firewall scenarios.
|
||||
node.registerSeen()
|
||||
if t.client.addNode(node):
|
||||
trace "Added new node to routing table after handshake", node, tablesize=t.client.nodesDiscovered()
|
||||
discard t.sendPending(node)
|
||||
|
|
|
@ -101,7 +101,7 @@ proc nodesAtDistanceUniqueIp*(
|
|||
|
||||
proc addSeenNode*(d: discv5_protocol.Protocol, n: Node): bool =
|
||||
# Add it as a seen node, warning: for testing convenience only!
|
||||
n.seen = true
|
||||
n.registerSeen()
|
||||
d.addNode(n)
|
||||
|
||||
func udpExample*(_: type MultiAddress): MultiAddress =
|
||||
|
|
Loading…
Reference in New Issue