nim-eth/eth/p2p/discoveryv5/routing_table.nim
Kim De Mey 9a467225fb
Add routing table metrics + tweaks + fixes (#261)
- routing table metrics + option in dcli
- only forward "seen" nodes on a findNode request
- setJustSeen & replace on ping AND findnode
- self lookup only at start
- revalidate 10x more
- use bitsPerHop (b) of 5
- small fix in resolve
- small fix in bucket split
2020-06-30 13:35:15 +02:00

375 lines
14 KiB
Nim

import
std/[algorithm, times, sequtils, bitops, random, sets, options],
stint, chronicles, metrics,
node
export options
{.push raises: [Defect].}
declarePublicGauge routing_table_nodes,
"Discovery routing table nodes", labels = ["state"]
type
RoutingTable* = object
thisNode: Node
buckets: seq[KBucket]
bitsPerHop: int ## This value indicates how many bits (at minimum) you get
## closer to finding your target per query. Practically, it tells you also
## how often your "not in range" branch will split off. Setting this to 1
## is the basic, non accelerated version, which will never split off the
## not in range branch and which will result in log base2 n hops per lookup.
## Setting it higher will increase the amount of splitting on a not in range
## branch (thus holding more nodes with a better keyspace coverage) and this
## will result in an improvement of log base(2^b) n hops per lookup.
KBucket = ref object
istart, iend: NodeId ## Range of NodeIds this KBucket covers. This is not a
## simple logarithmic distance as buckets can be split over a prefix that
## does not cover the `thisNode` id.
nodes: seq[Node] ## Node entries of the KBucket. Sorted according to last
## time seen. First entry (head) is considered the most recently seen node
## and the last entry (tail) is considered the least recently seen node.
## Here "seen" means a successful request-response. This can also not have
## occured yet.
replacementCache: seq[Node] ## Nodes that could not be added to the `nodes`
## seq as it is full and without stale nodes. This is practically a small
## LRU cache.
lastUpdated: float ## epochTime of last update to `nodes` in the KBucket.
const
BUCKET_SIZE* = 16
REPLACEMENT_CACHE_SIZE* = 8
ID_SIZE = 256
proc distanceTo(n: Node, id: NodeId): UInt256 =
## Calculate the distance to a NodeId.
n.id xor id
proc logDist*(a, b: NodeId): uint32 =
## Calculate the logarithmic distance between two `NodeId`s.
##
## According the specification, this is the log base 2 of the distance. But it
## is rather the log base 2 of the distance + 1, as else the 0 value can not
## be used (e.g. by FindNode call to return peer its own ENR)
## For NodeId of 256 bits, range is 0-256.
let a = a.toBytes
let b = b.toBytes
var lz = 0
for i in countdown(a.len - 1, 0):
let x = a[i] xor b[i]
if x == 0:
lz += 8
else:
lz += bitops.countLeadingZeroBits(x)
break
return uint32(a.len * 8 - lz)
proc newKBucket(istart, iend: NodeId): KBucket =
result.new()
result.istart = istart
result.iend = iend
result.nodes = @[]
result.replacementCache = @[]
proc midpoint(k: KBucket): NodeId =
k.istart + (k.iend - k.istart) div 2.u256
proc distanceTo(k: KBucket, id: NodeId): UInt256 = k.midpoint xor id
proc nodesByDistanceTo(k: KBucket, id: NodeId): seq[Node] =
sortedByIt(k.nodes, it.distanceTo(id))
proc len(k: KBucket): int {.inline.} = k.nodes.len
proc tail(k: KBucket): Node {.inline.} = k.nodes[high(k.nodes)]
proc add(k: KBucket, n: Node): Node =
## Try to add the given node to this bucket.
##
## If the node is already present, nothing is done, as the node should only
## be moved in case of a new succesful request-reponse.
##
## If the node is not already present and the bucket has fewer than k entries,
## it is inserted as the last entry of the bucket (least recently seen node),
## and nil is returned.
##
## If the bucket is full, the node at the last entry of the bucket (least
## recently seen), which should be evicted if it fails to respond to a ping,
## is returned.
##
## Reasoning here is that adding nodes will happen for a big part from
## lookups, which do not necessarily return nodes that are (still) reachable.
## So, more trust is put in the own ordering and newly additions are added
## as least recently seen (in fact they are never seen yet from this node its
## perspective).
## However, in discovery v5 it can be that a node is added after a incoming
## request, and considering a handshake that needs to be done, it is likely
## that this node is reachable. An additional `addSeen` proc could be created
## for this,
k.lastUpdated = epochTime()
let nodeIdx = k.nodes.find(n)
if nodeIdx != -1:
return nil
elif k.len < BUCKET_SIZE:
k.nodes.add(n)
routing_table_nodes.inc()
return nil
else:
return k.tail
proc addReplacement(k: KBucket, n: Node) =
## Add the node to the tail of the replacement cache of the KBucket.
##
## If the replacement cache is full, the oldest (first entry) node will be
## removed. If the node is already in the replacement cache, it will be moved
## to the tail.
let nodeIdx = k.replacementCache.find(n)
if nodeIdx != -1:
k.replacementCache.delete(nodeIdx)
k.replacementCache.add(n)
else:
doAssert(k.replacementCache.len <= REPLACEMENT_CACHE_SIZE)
if k.replacementCache.len == REPLACEMENT_CACHE_SIZE:
k.replacementCache.delete(0)
k.replacementCache.add(n)
proc removeNode(k: KBucket, n: Node) =
let i = k.nodes.find(n)
if i != -1:
k.nodes.delete(i)
routing_table_nodes.dec()
proc split(k: KBucket): tuple[lower, upper: KBucket] =
## Split at the median id
let splitid = k.midpoint
result.lower = newKBucket(k.istart, splitid)
result.upper = newKBucket(splitid + 1.u256, k.iend)
for node in k.nodes:
let bucket = if node.id <= splitid: result.lower else: result.upper
bucket.nodes.add(node)
for node in k.replacementCache:
let bucket = if node.id <= splitid: result.lower else: result.upper
bucket.replacementCache.add(node)
proc inRange(k: KBucket, n: Node): bool {.inline.} =
k.istart <= n.id and n.id <= k.iend
proc contains(k: KBucket, n: Node): bool = n in k.nodes
proc binaryGetBucketForNode*(buckets: openarray[KBucket],
id: NodeId): KBucket =
## Given a list of ordered buckets, returns the bucket for a given `NodeId`.
## Returns nil if no bucket in range for given `id` is found.
let bucketPos = lowerBound(buckets, id) do(a: KBucket, b: NodeId) -> int:
cmp(a.iend, b)
# Prevent cases where `lowerBound` returns an out of range index e.g. at empty
# openarray, or when the id is out range for all buckets in the openarray.
if bucketPos < buckets.len:
let bucket = buckets[bucketPos]
if bucket.istart <= id and id <= bucket.iend:
result = bucket
proc computeSharedPrefixBits(nodes: openarray[NodeId]): int =
## Count the number of prefix bits shared by all nodes.
if nodes.len < 2:
return ID_SIZE
var mask = zero(UInt256)
let one = one(UInt256)
for i in 1 .. ID_SIZE:
mask = mask or (one shl (ID_SIZE - i))
let reference = nodes[0] and mask
for j in 1 .. nodes.high:
if (nodes[j] and mask) != reference: return i - 1
for n in nodes:
echo n.toHex()
# Reaching this would mean that all node ids are equal
doAssert(false, "Unable to calculate number of shared prefix bits")
proc init*(r: var RoutingTable, thisNode: Node, bitsPerHop = 8) {.inline.} =
r.thisNode = thisNode
r.buckets = @[newKBucket(0.u256, high(Uint256))]
r.bitsPerHop = bitsPerHop
randomize() # for later `randomNodes` selection
proc splitBucket(r: var RoutingTable, index: int) =
let bucket = r.buckets[index]
let (a, b) = bucket.split()
r.buckets[index] = a
r.buckets.insert(b, index + 1)
proc bucketForNode(r: RoutingTable, id: NodeId): KBucket =
result = binaryGetBucketForNode(r.buckets, id)
doAssert(not result.isNil(),
"Routing table should always cover the full id space")
proc removeNode*(r: var RoutingTable, n: Node) =
## Remove the node `n` from the routing table.
r.bucketForNode(n.id).removeNode(n)
proc addNode*(r: var RoutingTable, n: Node): Node =
## Try to add the node to the routing table.
##
## First, an attempt will be done to add the node to the bucket in its range.
## If this fails, the bucket will be split if it is eligable for splitting.
## If so, a new attempt will be done to add the node. If not, the node will be
## added to the replacement cache.
if n == r.thisNode:
# warn "Trying to add ourselves to the routing table", node = n
return
let bucket = r.bucketForNode(n.id)
let evictionCandidate = bucket.add(n)
if not evictionCandidate.isNil:
# Split if the bucket has the local node in its range or if the depth is not
# congruent to 0 mod `bitsPerHop`
#
# Calculate the prefix shared by all nodes in the bucket's range, not the
# ones actually in the bucket.
let depth = computeSharedPrefixBits(@[bucket.istart, bucket.iend])
if bucket.inRange(r.thisNode) or
(depth mod r.bitsPerHop != 0 and depth != ID_SIZE):
r.splitBucket(r.buckets.find(bucket))
return r.addNode(n) # retry adding
else:
# When bucket doesn't get split the node is added to the replacement cache
bucket.addReplacement(n)
# Nothing added, return evictionCandidate
return evictionCandidate
proc replaceNode*(r: var RoutingTable, n: Node) =
## Replace node `n` with last entry in the replacement cache. If there are
## no entries in the replacement cache, node `n` will simply be removed.
# TODO: Kademlia paper recommends here to not remove nodes if there are no
# replacements. However, that would require a bit more complexity in the
# revalidation as you don't want to try pinging that node all the time.
let b = r.bucketForNode(n.id)
let idx = b.nodes.find(n)
if idx != -1:
routing_table_nodes.dec()
if b.nodes[idx].seen:
routing_table_nodes.dec(labelValues = ["seen"])
b.nodes.delete(idx)
if b.replacementCache.len > 0:
b.nodes.add(b.replacementCache[high(b.replacementCache)])
routing_table_nodes.inc()
b.replacementCache.delete(high(b.replacementCache))
proc getNode*(r: RoutingTable, id: NodeId): Option[Node] =
let b = r.bucketForNode(id)
for n in b.nodes:
if n.id == id:
return some(n)
proc contains*(r: RoutingTable, n: Node): bool = n in r.bucketForNode(n.id)
proc bucketsByDistanceTo(r: RoutingTable, id: NodeId): seq[KBucket] =
sortedByIt(r.buckets, it.distanceTo(id))
proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE,
seenOnly = false): seq[Node] =
## Return up to k neighbours of the given node.
result = newSeqOfCap[Node](k * 2)
block addNodes:
for bucket in r.bucketsByDistanceTo(id):
for n in bucket.nodesByDistanceTo(id):
# Only provide actively seen nodes when `seenOnly` set
if not seenOnly or n.seen:
result.add(n)
if result.len == k * 2:
break addNodes
# TODO: is this sort still needed? Can we get nodes closer from the "next"
# bucket?
result = sortedByIt(result, it.distanceTo(id))
if result.len > k:
result.setLen(k)
proc idAtDistance*(id: NodeId, dist: uint32): NodeId =
## Calculate the "lowest" `NodeId` for given logarithmic distance.
## A logarithmic distance obviously covers a whole range of distances and thus
## potential `NodeId`s.
# xor the NodeId with 2^(d - 1) or one could say, calculate back the leading
# zeroes and xor those` with the id.
id xor (1.stuint(256) shl (dist.int - 1))
proc neighboursAtDistance*(r: RoutingTable, distance: uint32,
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
result = r.neighbours(idAtDistance(r.thisNode.id, distance), k, seenOnly)
# This is a bit silly, first getting closest nodes then to only keep the ones
# that are exactly the requested distance.
keepIf(result, proc(n: Node): bool = logDist(n.id, r.thisNode.id) == distance)
proc len*(r: RoutingTable): int =
for b in r.buckets: result += b.len
proc moveRight[T](arr: var openarray[T], a, b: int) =
## In `arr` move elements in range [a, b] right by 1.
var t: T
shallowCopy(t, arr[b + 1])
for i in countdown(b, a):
shallowCopy(arr[i + 1], arr[i])
shallowCopy(arr[a], t)
proc setJustSeen*(r: RoutingTable, n: Node) =
## Move `n` to the head (most recently seen) of its bucket.
## If `n` is not in the routing table, do nothing.
let b = r.bucketForNode(n.id)
let idx = b.nodes.find(n)
if idx >= 0:
if idx != 0:
b.nodes.moveRight(0, idx - 1)
b.lastUpdated = epochTime()
if not n.seen:
b.nodes[0].seen = true
routing_table_nodes.inc(labelValues = ["seen"])
proc nodeToRevalidate*(r: RoutingTable): Node =
## Return a node to revalidate. The least recently seen node from a random
## bucket is selected.
var buckets = r.buckets
shuffle(buckets)
# TODO: Should we prioritize less-recently-updated buckets instead? Could use
# `lastUpdated` for this, but it would probably make more sense to only update
# that value on revalidation then and rename it to `lastValidated`.
for b in buckets:
if b.len > 0:
return b.nodes[^1]
proc randomNodes*(r: RoutingTable, maxAmount: int,
pred: proc(x: Node): bool {.gcsafe, noSideEffect.} = nil): seq[Node] =
var maxAmount = maxAmount
let sz = r.len
if maxAmount > sz:
debug "Less peers in routing table than maximum requested",
requested = maxAmount, present = sz
maxAmount = sz
result = newSeqOfCap[Node](maxAmount)
var seen = initHashSet[Node]()
# This is a rather inefficient way of randomizing nodes from all buckets, but even if we
# iterate over all nodes in the routing table, the time it takes would still be
# insignificant compared to the time it takes for the network roundtrips when connecting
# to nodes.
# However, "time it takes" might not be relevant, as there might be no point
# in providing more `randomNodes` as the routing table might not have anything
# new to provide. And there is no way for the calling code to know this. So
# while it will take less total time compared to e.g. an (async)
# randomLookup, the time might be wasted as all nodes are possibly seen
# already.
while len(seen) < maxAmount:
# TODO: Is it important to get a better random source for these sample calls?
let bucket = sample(r.buckets)
if bucket.nodes.len != 0:
let node = sample(bucket.nodes)
if node notin seen:
seen.incl(node)
if pred.isNil() or node.pred:
result.add(node)