Discv5 ip limits for routing table (#308)

* Add ip limits to routing table and routing table buckets

* Fix order of ip limit check and duplicate check for replacement

* Fix ip limit for node with updated ip in ENR

* Fix bug where address wouldn't update on ENR update

and update some comments

* Reuse some add/remove code in routing table

* Fix seen bug on ENR update in routing table

* Rework addNode to make sure to do address check always

and adjust some logs.

* More documentation on the ip limits in routing table [skip ci]
This commit is contained in:
Kim De Mey 2020-11-26 18:20:15 +01:00 committed by GitHub
parent b88fef203b
commit b4c1391be9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 605 additions and 146 deletions

27
eth/net/utils.nim Normal file
View File

@ -0,0 +1,27 @@
import
std/[tables, hashes],
stew/shims/net as stewNet
{.push raises: [Defect].}
type
IpLimits* = object
limit*: uint
ips: Table[ValidIpAddress, uint]
proc hash(ip: ValidIpAddress): Hash = hash($ip)
proc inc*(ipLimits: var IpLimits, ip: ValidIpAddress): bool =
let val = ipLimits.ips.getOrDefault(ip, 0)
if val < ipLimits.limit:
ipLimits.ips[ip] = val + 1
true
else:
false
proc dec*(ipLimits: var IpLimits, ip: ValidIpAddress) =
let val = ipLimits.ips.getOrDefault(ip, 0)
if val == 1:
ipLimits.ips.del(ip)
elif val > 1:
ipLimits.ips[ip] = val - 1

View File

@ -46,6 +46,19 @@ func newNode*(r: Record): Result[Node, cstring] =
ok(Node(id: pk.get().toNodeId(), pubkey: pk.get(), record: r,
address: none(Address)))
proc updateNode*(n: Node, pk: PrivateKey, ip: Option[ValidIpAddress],
tcpPort, udpPort: Port, extraFields: openarray[FieldPair] = []):
Result[void, cstring] =
? n.record.update(pk, ip, tcpPort, udpPort, extraFields)
if ip.isSome():
let a = Address(ip: ip.get(), port: Port(udpPort))
n.address = some(a)
else:
n.address = none(Address)
ok()
func hash*(n: Node): hashes.Hash = hash(n.pubkey.toRaw)
func `==`*(a, b: Node): bool =
(a.isNil and b.isNil) or

View File

@ -127,11 +127,12 @@ type
proc addNode*(d: Protocol, node: Node): bool =
## Add `Node` to discovery routing table.
##
## Returns false only if `Node` is not eligable for adding (no Address).
if node.address.isSome():
# Only add nodes with an address to the routing table
discard d.routingTable.addNode(node)
## Returns true only when `Node` was added as a new entry to a bucket in the
## routing table.
if d.routingTable.addNode(node) == Added:
return true
else:
return false
proc addNode*(d: Protocol, r: Record): bool =
## Add `Node` from a `Record` to discovery routing table.
@ -393,10 +394,10 @@ proc receive*(d: Protocol, a: Address, packet: openArray[byte]) {.gcsafe,
# Not filling table with nodes without correct IP in the ENR
# TODO: Should we care about this???
if node.address.isSome() and a == node.address.get():
debug "Adding new node to routing table", node
discard d.addNode(node)
if d.addNode(node):
trace "Added new node to routing table after handshake", node
else:
debug "Packet decoding error", error = decoded.error, address = a
trace "Packet decoding error", error = decoded.error, address = a
# TODO: Not sure why but need to pop the raises here as it is apparently not
# enough to put it in the raises pragma of `processClient` and other async procs.
@ -641,7 +642,7 @@ proc lookupWorker(d: Protocol, destNode: Node, target: NodeId):
inc i
for n in result:
discard d.routingTable.addNode(n)
discard d.addNode(n)
proc lookup*(d: Protocol, target: NodeId): Future[seq[Node]]
{.async, raises: [Exception, Defect].} =
@ -760,7 +761,9 @@ proc newProtocol*(privKey: PrivateKey,
localEnrFields: openarray[(string, seq[byte])] = [],
bootstrapRecords: openarray[Record] = [],
previousRecord = none[enr.Record](),
bindIp = IPv4_any(), rng = newRng()):
bindIp = IPv4_any(),
tableIpLimits = DefaultTableIpLimits,
rng = newRng()):
Protocol {.raises: [Defect].} =
# TODO: Tried adding bindPort = udpPort as parameter but that gave
# "Error: internal error: environment misses: udpPort" in nim-beacon-chain.
@ -793,7 +796,7 @@ proc newProtocol*(privKey: PrivateKey,
bootstrapRecords: @bootstrapRecords,
rng: rng)
result.routingTable.init(node, 5, rng)
result.routingTable.init(node, DefaultBitsPerHop, tableIpLimits, rng)
proc open*(d: Protocol) {.raises: [Exception, Defect].} =
info "Starting discovery node", node = d.localNode,
@ -808,8 +811,10 @@ proc open*(d: Protocol) {.raises: [Exception, Defect].} =
d.transp = newDatagramTransport(processClient, udata = d, local = ta)
for record in d.bootstrapRecords:
debug "Adding bootstrap node", uri = toURI(record)
discard d.addNode(record)
if d.addNode(record):
debug "Added bootstrap node", uri = toURI(record)
else:
debug "Bootstrap node could not be added", uri = toURI(record)
proc start*(d: Protocol) {.raises: [Exception, Defect].} =
d.lookupLoop = lookupLoop(d)

View File

@ -1,7 +1,8 @@
import
std/[algorithm, times, sequtils, bitops, sets, options],
stint, chronicles, metrics, bearssl,
node, random2
stint, chronicles, metrics, bearssl, chronos, stew/shims/net as stewNet,
../../net/utils,
node, random2, enr
export options
@ -22,6 +23,8 @@ type
## Setting it higher will increase the amount of splitting on a not in range
## branch (thus holding more nodes with a better keyspace coverage) and this
## will result in an improvement of log base(2^b) n hops per lookup.
ipLimits: IpLimits ## IP limits for total routing table: all buckets and
## replacement caches.
rng: ref BrHmacDrbgContext
KBucket = ref object
@ -37,12 +40,52 @@ type
## seq as it is full and without stale nodes. This is practically a small
## LRU cache.
lastUpdated: float ## epochTime of last update to `nodes` in the KBucket.
ipLimits: IpLimits ## IP limits for bucket: node entries and replacement
## cache entries combined.
## The routing table IP limits are applied on both the total table, and on the
## individual buckets. In each case, the active node entries, but also the
## entries waiting in the replacement cache are accounted for. This way, the
## replacement cache can't get filled with nodes that then can't be added due
## to the limits that apply.
##
## As entries are not verified (=contacted) immediately before or on entry, it
## is possible that a malicious node could fill (poison) the routing table or
## a specific bucket with ENRs with IPs it does not control. The effect of
## this would be that a node that actually owns the IP could have a difficult
## time getting its ENR distrubuted in the DHT and as a consequence would
## not be reached from the outside as much (or at all). However, that node can
## still search and find nodes to connect to. So it would practically be a
## similar situation as a node that is not reachable behind the NAT because
## port mapping is not set up properly.
## There is the possiblity to set the IP limit on verified (=contacted) nodes
## only, but that would allow for lookups to be done on a higher set of nodes
## owned by the same identity. This is a worse alternative.
## Next, doing lookups only on verified nodes would slow down discovery start
## up.
TableIpLimits* = object
tableIpLimit*: uint
bucketIpLimit*: uint
NodeStatus* = enum
Added
LocalNode
Existing
IpLimitReached
ReplacementAdded
ReplacementExisting
NoAddress
const
BUCKET_SIZE* = 16 ## Maximum amount of nodes per bucket
REPLACEMENT_CACHE_SIZE* = 8 ## Maximum amount of nodes per replacement cache
## of a bucket
ID_SIZE = 256
DefaultBitsPerHop* = 5
DefaultBucketIpLimit* = 2'u
DefaultTableIpLimit* = 10'u
DefaultTableIpLimits* = TableIpLimits(tableIpLimit: DefaultTableIpLimit,
bucketIpLimit: DefaultBucketIpLimit)
proc distanceTo(n: Node, id: NodeId): UInt256 =
## Calculate the distance to a NodeId.
@ -67,12 +110,13 @@ proc logDist*(a, b: NodeId): uint32 =
break
return uint32(a.len * 8 - lz)
proc newKBucket(istart, iend: NodeId): KBucket =
proc newKBucket(istart, iend: NodeId, bucketIpLimit: uint): KBucket =
result.new()
result.istart = istart
result.iend = iend
result.nodes = @[]
result.replacementCache = @[]
result.ipLimits.limit = bucketIpLimit
proc midpoint(k: KBucket): NodeId =
k.istart + (k.iend - k.istart) div 2.u256
@ -84,79 +128,64 @@ proc nodesByDistanceTo(k: KBucket, id: NodeId): seq[Node] =
proc len(k: KBucket): int {.inline.} = k.nodes.len
proc tail(k: KBucket): Node {.inline.} = k.nodes[high(k.nodes)]
proc add(k: KBucket, n: Node): Node =
## Try to add the given node to this bucket.
##
## If the node is already present, nothing is done, as the node should only
## be moved in case of a new succesful request-reponse.
##
## If the node is not already present and the bucket has fewer than k entries,
## it is inserted as the last entry of the bucket (least recently seen node),
## and nil is returned.
##
## If the bucket is full, the node at the last entry of the bucket (least
## recently seen), which should be evicted if it fails to respond to a ping,
## is returned.
##
## Reasoning here is that adding nodes will happen for a big part from
## lookups, which do not necessarily return nodes that are (still) reachable.
## So, more trust is put in the own ordering and newly additions are added
## as least recently seen (in fact they are never seen yet from this node its
## perspective).
## However, in discovery v5 it can be that a node is added after a incoming
## request, and considering a handshake that needs to be done, it is likely
## that this node is reachable. An additional `addSeen` proc could be created
## for this.
k.lastUpdated = epochTime()
let nodeIdx = k.nodes.find(n)
if nodeIdx != -1:
if k.nodes[nodeIdx].record.seqNum < n.record.seqNum:
# In case of a newer record, it gets replaced.
k.nodes[nodeIdx].record = n.record
return nil
elif k.len < BUCKET_SIZE:
proc ipLimitInc(r: var RoutingTable, b: KBucket, n: Node): bool =
## Check if the ip limits of the routing table and the bucket are reached for
## the specified `Node` its ip.
## When one of the ip limits is reached return false, else increment them and
## return true.
let ip = n.address.get().ip # Node from table should always have an address
# Check ip limit for bucket
if not b.ipLimits.inc(ip):
return false
# Check ip limit for routing table
if not r.ipLimits.inc(ip):
b.iplimits.dec(ip)
return false
return true
proc ipLimitDec(r: var RoutingTable, b: KBucket, n: Node) =
## Decrement the ip limits of the routing table and the bucket for the
## specified `Node` its ip.
let ip = n.address.get().ip # Node from table should always have an address
b.ipLimits.dec(ip)
r.ipLimits.dec(ip)
proc add(k: KBucket, n: Node) =
k.nodes.add(n)
routing_table_nodes.inc()
return nil
else:
return k.tail
proc addReplacement(k: KBucket, n: Node) =
## Add the node to the tail of the replacement cache of the KBucket.
##
## If the replacement cache is full, the oldest (first entry) node will be
## removed. If the node is already in the replacement cache, it will be moved
## to the tail.
let nodeIdx = k.replacementCache.find(n)
if nodeIdx != -1:
if k.replacementCache[nodeIdx].record.seqNum <= n.record.seqNum:
# In case the record sequence number is higher or the same, the node gets
# moved to the tail.
k.replacementCache.delete(nodeIdx)
k.replacementCache.add(n)
else:
doAssert(k.replacementCache.len <= REPLACEMENT_CACHE_SIZE)
if k.replacementCache.len == REPLACEMENT_CACHE_SIZE:
k.replacementCache.delete(0)
k.replacementCache.add(n)
proc removeNode(k: KBucket, n: Node) =
proc remove(k: KBucket, n: Node): bool =
let i = k.nodes.find(n)
if i != -1:
k.nodes.delete(i)
routing_table_nodes.dec()
if k.nodes[i].seen:
routing_table_nodes.dec(labelValues = ["seen"])
k.nodes.delete(i)
true
else:
false
proc split(k: KBucket): tuple[lower, upper: KBucket] =
## Split the kbucket `k` at the median id.
let splitid = k.midpoint
result.lower = newKBucket(k.istart, splitid)
result.upper = newKBucket(splitid + 1.u256, k.iend)
result.lower = newKBucket(k.istart, splitid, k.ipLimits.limit)
result.upper = newKBucket(splitid + 1.u256, k.iend, k.ipLimits.limit)
for node in k.nodes:
let bucket = if node.id <= splitid: result.lower else: result.upper
bucket.nodes.add(node)
# Ip limits got reset because of the newKBuckets, so there is the need to
# increment again for each added node. It should however never fail as the
# previous bucket had the same limits.
doAssert(bucket.ipLimits.inc(node.address.get().ip),
"IpLimit increment should work as all buckets have the same limits")
for node in k.replacementCache:
let bucket = if node.id <= splitid: result.lower else: result.upper
bucket.replacementCache.add(node)
doAssert(bucket.ipLimits.inc(node.address.get().ip),
"IpLimit increment should work as all buckets have the same limits")
proc inRange(k: KBucket, n: Node): bool {.inline.} =
k.istart <= n.id and n.id <= k.iend
@ -197,13 +226,14 @@ proc computeSharedPrefixBits(nodes: openarray[NodeId]): int =
# Reaching this would mean that all node ids are equal.
doAssert(false, "Unable to calculate number of shared prefix bits")
proc init*(r: var RoutingTable, thisNode: Node, bitsPerHop = 5,
rng: ref BrHmacDrbgContext) {.inline.} =
proc init*(r: var RoutingTable, thisNode: Node, bitsPerHop = DefaultBitsPerHop,
ipLimits = DefaultTableIpLimits, rng: ref BrHmacDrbgContext) {.inline.} =
## Initialize the routing table for provided `Node` and bitsPerHop value.
## `bitsPerHop` is default set to 5 as recommended by original Kademlia paper.
r.thisNode = thisNode
r.buckets = @[newKBucket(0.u256, high(Uint256))]
r.buckets = @[newKBucket(0.u256, high(Uint256), ipLimits.bucketIpLimit)]
r.bitsPerHop = bitsPerHop
r.ipLimits.limit = ipLimits.tableIpLimit
r.rng = rng
proc splitBucket(r: var RoutingTable, index: int) =
@ -217,39 +247,122 @@ proc bucketForNode(r: RoutingTable, id: NodeId): KBucket =
doAssert(not result.isNil(),
"Routing table should always cover the full id space")
proc removeNode*(r: var RoutingTable, n: Node) =
## Remove the node `n` from the routing table.
r.bucketForNode(n.id).removeNode(n)
proc addReplacement(r: var RoutingTable, k: KBucket, n: Node): NodeStatus =
## Add the node to the tail of the replacement cache of the KBucket.
##
## If the replacement cache is full, the oldest (first entry) node will be
## removed. If the node is already in the replacement cache, it will be moved
## to the tail.
## When the IP of the node has reached the IP limits for the bucket or the
## total routing table, the node will not be added to the replacement cache.
let nodeIdx = k.replacementCache.find(n)
if nodeIdx != -1:
if k.replacementCache[nodeIdx].record.seqNum <= n.record.seqNum:
# In case the record sequence number is higher or the same, the new node
# gets moved to the tail.
if k.replacementCache[nodeIdx].address.get().ip != n.address.get().ip:
if not ipLimitInc(r, k, n):
return IpLimitReached
ipLimitDec(r, k, k.replacementCache[nodeIdx])
k.replacementCache.delete(nodeIdx)
k.replacementCache.add(n)
return ReplacementExisting
elif not ipLimitInc(r, k, n):
return IpLimitReached
else:
doAssert(k.replacementCache.len <= REPLACEMENT_CACHE_SIZE)
proc addNode*(r: var RoutingTable, n: Node): Node =
if k.replacementCache.len == REPLACEMENT_CACHE_SIZE:
# Remove ip from limits for the to be deleted node.
ipLimitDec(r, k, k.replacementCache[0])
k.replacementCache.delete(0)
k.replacementCache.add(n)
return ReplacementAdded
proc addNode*(r: var RoutingTable, n: Node): NodeStatus =
## Try to add the node to the routing table.
##
## First, an attempt will be done to add the node to the bucket in its range.
## If this fails, the bucket will be split if it is eligable for splitting.
## If so, a new attempt will be done to add the node. If not, the node will be
## added to the replacement cache.
##
## In case the node was already in the table, it will be updated if it has a
## newer record.
## When the IP of the node has reached the IP limits for the bucket or the
## total routing table, the node will not be added to the bucket, nor its
## replacement cache.
# Don't allow nodes without an address field in the ENR to be added.
# This could also be reworked by having another Node type that always has an
# address.
if n.address.isNone():
return NoAddress
if n == r.thisNode:
# warn "Trying to add ourselves to the routing table", node = n
return
return LocalNode
let bucket = r.bucketForNode(n.id)
let evictionCandidate = bucket.add(n)
if not evictionCandidate.isNil:
# Split if the bucket has the local node in its range or if the depth is not
# congruent to 0 mod `bitsPerHop`
#
## Check if the node is already present. If so, check if the record requires
## updating.
let nodeIdx = bucket.nodes.find(n)
if nodeIdx != -1:
if bucket.nodes[nodeIdx].record.seqNum < n.record.seqNum:
# In case of a newer record, it gets replaced.
if bucket.nodes[nodeIdx].address.get().ip != n.address.get().ip:
if not ipLimitInc(r, bucket, n):
return IpLimitReached
ipLimitDec(r, bucket, bucket.nodes[nodeIdx])
# Copy over the seen status, we trust here that after the ENR update the
# node will still be reachable, but it might not be the case.
n.seen = bucket.nodes[nodeIdx].seen
bucket.nodes[nodeIdx] = n
return Existing
# If the bucket has fewer than `BUCKET_SIZE` entries, it is inserted as the
# last entry of the bucket (least recently seen node). If the bucket is
# full, it might get split and adding is retried, else it is added as a
# replacement.
# Reasoning here is that adding nodes will happen for a big part from
# lookups, which do not necessarily return nodes that are (still) reachable.
# So, more trust is put in the own ordering by actually contacting peers and
# newly additions are added as least recently seen (in fact they have not been
# seen yet from our node its perspective).
# However, in discovery v5 a node can also be added after a incoming request
# if a handshake is done and an ENR is provided, and considering that this
# handshake needs to be done, it is more likely that this node is reachable.
# However, it is not certain and depending on different NAT mechanisms and
# timers it might still fail. For this reason we currently do not add a way to
# immediately add nodes to the most recently seen spot.
if bucket.len < BUCKET_SIZE:
if not ipLimitInc(r, bucket, n):
return IpLimitReached
bucket.add(n)
else:
# Bucket must be full, but lets see if it should be split the bucket.
# Calculate the prefix shared by all nodes in the bucket's range, not the
# ones actually in the bucket.
let depth = computeSharedPrefixBits(@[bucket.istart, bucket.iend])
# Split if the bucket has the local node in its range or if the depth is not
# congruent to 0 mod `bitsPerHop`
if bucket.inRange(r.thisNode) or
(depth mod r.bitsPerHop != 0 and depth != ID_SIZE):
r.splitBucket(r.buckets.find(bucket))
return r.addNode(n) # retry adding
else:
# When bucket doesn't get split the node is added to the replacement cache
bucket.addReplacement(n)
return r.addReplacement(bucket, n)
# Nothing added, return evictionCandidate
return evictionCandidate
proc removeNode*(r: var RoutingTable, n: Node) =
## Remove the node `n` from the routing table.
let b = r.bucketForNode(n.id)
if b.remove(n):
ipLimitDec(r, b, n)
proc replaceNode*(r: var RoutingTable, n: Node) =
## Replace node `n` with last entry in the replacement cache. If there are
@ -258,16 +371,12 @@ proc replaceNode*(r: var RoutingTable, n: Node) =
# replacements. However, that would require a bit more complexity in the
# revalidation as you don't want to try pinging that node all the time.
let b = r.bucketForNode(n.id)
let idx = b.nodes.find(n)
if idx != -1:
routing_table_nodes.dec()
if b.nodes[idx].seen:
routing_table_nodes.dec(labelValues = ["seen"])
b.nodes.delete(idx)
if b.remove(n):
ipLimitDec(r, b, n)
if b.replacementCache.len > 0:
b.nodes.add(b.replacementCache[high(b.replacementCache)])
routing_table_nodes.inc()
# Nodes in the replacement cache are already included in the ip limits.
b.add(b.replacementCache[high(b.replacementCache)])
b.replacementCache.delete(high(b.replacementCache))
proc getNode*(r: RoutingTable, id: NodeId): Option[Node] =

View File

@ -1,9 +1,11 @@
import
stew/shims/net, bearssl,
stew/shims/net, bearssl, chronos,
eth/keys,
eth/p2p/discoveryv5/[enr, node, routing_table],
eth/p2p/discoveryv5/protocol as discv5_protocol
export net
proc localAddress*(port: int): Address =
Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(port))
@ -13,12 +15,17 @@ proc initDiscoveryNode*(rng: ref BrHmacDrbgContext, privKey: PrivateKey,
localEnrFields: openarray[(string, seq[byte])] = [],
previousRecord = none[enr.Record]()):
discv5_protocol.Protocol =
# set bucketIpLimit to allow bucket split
let tableIpLimits = TableIpLimits(tableIpLimit: 1000, bucketIpLimit: 24)
result = newProtocol(privKey,
some(address.ip),
address.port, address.port,
bootstrapRecords = bootstrapRecords,
localEnrFields = localEnrFields,
previousRecord = previousRecord, rng = rng)
previousRecord = previousRecord,
tableIpLimits = tableIpLimits,
rng = rng)
result.open()
@ -27,22 +34,39 @@ proc nodeIdInNodes*(id: NodeId, nodes: openarray[Node]): bool =
if id == n.id: return true
proc generateNode*(privKey: PrivateKey, port: int = 20302,
ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1"),
localEnrFields: openarray[FieldPair] = []): Node =
let port = Port(port)
let enr = enr.Record.init(1, privKey, some(ValidIpAddress.init("127.0.0.1")),
let enr = enr.Record.init(1, privKey, some(ip),
port, port, localEnrFields).expect("Properly intialized private key")
result = newNode(enr).expect("Properly initialized node")
proc nodeAtDistance*(n: Node, rng: var BrHmacDrbgContext, d: uint32): Node =
proc nodeAndPrivKeyAtDistance*(n: Node, rng: var BrHmacDrbgContext, d: uint32,
ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): (Node, PrivateKey) =
while true:
let node = generateNode(PrivateKey.random(rng))
let pk = PrivateKey.random(rng)
let node = generateNode(pk, ip = ip)
if logDist(n.id, node.id) == d:
return node
return (node, pk)
proc nodeAtDistance*(n: Node, rng: var BrHmacDrbgContext, d: uint32,
ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): Node =
let (node, _) = n.nodeAndPrivKeyAtDistance(rng, d, ip)
node
proc nodesAtDistance*(
n: Node, rng: var BrHmacDrbgContext, d: uint32, amount: int): seq[Node] =
n: Node, rng: var BrHmacDrbgContext, d: uint32, amount: int,
ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): seq[Node] =
for i in 0..<amount:
result.add(nodeAtDistance(n, rng, d))
result.add(nodeAtDistance(n, rng, d, ip))
proc nodesAtDistanceUniqueIp*(
n: Node, rng: var BrHmacDrbgContext, d: uint32, amount: int,
ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): seq[Node] =
var ta = initTAddress(ip, Port(0))
for i in 0..<amount:
ta.inc()
result.add(nodeAtDistance(n, rng, d, ValidIpAddress.init(ta.address())))
proc addSeenNode*(d: discv5_protocol.Protocol, n: Node): bool =
# Add it as a seen node, warning: for testing convenience only!

View File

@ -319,7 +319,7 @@ procSuite "Discovery v5 Tests":
n.get().record.seqNum == targetSeqNum
# Add the updated version
check mainNode.addNode(n.get())
discard mainNode.addNode(n.get())
# Update seqNum in ENR again, ping lookupNode to be added in routing table,
# close targetNode, resolve should lookup, check if we get updated ENR.

View File

@ -1,93 +1,108 @@
import
std/unittest,
bearssl, eth/keys, eth/p2p/discoveryv5/[routing_table, node],
bearssl, eth/keys, eth/p2p/discoveryv5/[routing_table, node, enr],
./discv5_test_helper
suite "Routing Table Tests":
let rng = newRng()
# Used for testing. Could also at runtime check whether the address is the
# loopback address as these are only allowed to be added when coming from
# another loopback nodes, however that check is done in the protocol code and
# thus independent of routing_table.
let ipLimits = TableIpLimits(tableIpLimit: 200,
bucketIpLimit: BUCKET_SIZE + REPLACEMENT_CACHE_SIZE + 1)
test "Add local node":
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
table.init(node, 1, ipLimits, rng = rng)
check table.addNode(node) == LocalNode
test "Bucket splitting in range branch b=1":
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
# bitsPerHop = 1 -> Split only the branch in range of own id
table.init(node, 1, rng)
table.init(node, 1, ipLimits, rng = rng)
for j in 0..5'u32:
for i in 0..<BUCKET_SIZE:
check table.addNode(node.nodeAtDistance(rng[], 256-j)) == nil
check table.addNode(node.nodeAtDistance(rng[], 256-j)) != nil
check table.addNode(node.nodeAtDistance(rng[], 256-j)) == Added
check table.addNode(node.nodeAtDistance(rng[], 256-j)) == ReplacementAdded
test "Bucket splitting off range branch b=1":
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
# bitsPerHop = 1 -> Split only the branch in range of own id
table.init(node, 1, rng)
table.init(node, 1, ipLimits, rng = rng)
# Add 16 nodes, distance 256
for i in 0..<BUCKET_SIZE:
check table.addNode(node.nodeAtDistance(rng[], 256)) == nil
check table.addNode(node.nodeAtDistance(rng[], 256)) == Added
# This should split the bucket in the distance 256 branch, and the distance
# <=255 branch. But not add the node, as distance 256 bucket is already full
# and b=1 will not allow it to spit any further
check table.addNode(node.nodeAtDistance(rng[], 256)) != nil
check table.addNode(node.nodeAtDistance(rng[], 256)) == ReplacementAdded
# This add should be allowed as it is on the branch where the own node's id
# id belongs to.
check table.addNode(node.nodeAtDistance(rng[], 255)) == nil
check table.addNode(node.nodeAtDistance(rng[], 255)) == Added
test "Bucket splitting off range branch b=2":
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
# bitsPerHop = 2, allow not in range branch to split once (2 buckets).
table.init(node, 2, rng)
table.init(node, 2, ipLimits, rng = rng)
# Add 16 nodes, distance 256 from `node`, but all with 2 bits shared prefix
# among themselves.
let firstNode = node.nodeAtDistance(rng[], 256)
check table.addNode(firstNode) == nil
check table.addNode(firstNode) == Added
for n in 1..<BUCKET_SIZE:
check table.addNode(firstNode.nodeAtDistance(rng[], 254)) == nil
check table.addNode(firstNode.nodeAtDistance(rng[], 254)) == Added
# Add 16 more nodes with only 1 bit shared prefix with previous 16. This
# should cause the initial bucket to split and and fill the second bucket
# with the 16 new entries.
for n in 0..<BUCKET_SIZE:
check table.addNode(firstNode.nodeAtDistance(rng[], 255)) == nil
check table.addNode(firstNode.nodeAtDistance(rng[], 255)) == Added
# Adding another should fail as both buckets will be full and not be
# allowed to split another time.
check table.addNode(node.nodeAtDistance(rng[], 256)) != nil
check table.addNode(node.nodeAtDistance(rng[], 256)) == ReplacementAdded
# And also when targetting one of the two specific buckets.
check table.addNode(firstNode.nodeAtDistance(rng[], 255)) != nil
check table.addNode(firstNode.nodeAtDistance(rng[], 254)) != nil
check table.addNode(firstNode.nodeAtDistance(rng[], 255)) == ReplacementAdded
check table.addNode(firstNode.nodeAtDistance(rng[], 254)) == ReplacementAdded
# This add should be allowed as it is on the branch where the own node's id
# id belongs to.
check table.addNode(node.nodeAtDistance(rng[], 255)) == nil
check table.addNode(node.nodeAtDistance(rng[], 255)) == Added
test "Replacement cache":
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
# bitsPerHop = 1 -> Split only the branch in range of own id
table.init(node, 1, rng)
table.init(node, 1, ipLimits, rng = rng)
# create a full bucket
let bucketNodes = node.nodesAtDistance(rng[], 256, BUCKET_SIZE)
for n in bucketNodes:
check table.addNode(n) == nil
check table.addNode(n) == Added
# create a full replacement cache
let replacementNodes = node.nodesAtDistance(rng[], 256, REPLACEMENT_CACHE_SIZE)
for n in replacementNodes:
check table.addNode(n) != nil
check table.addNode(n) == ReplacementAdded
# Add one more node to replacement (would drop first one)
let lastNode = node.nodeAtDistance(rng[], 256)
check table.addNode(lastNode) != nil
check table.addNode(lastNode) == ReplacementAdded
# This should replace the last node in the bucket, with the last one of
# the replacement cache.
@ -107,7 +122,7 @@ suite "Routing Table Tests":
var table: RoutingTable
# bitsPerHop = 1 -> Split only the branch in range of own id
table.init(node, 1, rng)
table.init(node, 1, ipLimits, rng = rng)
check table.nodeToRevalidate().isNil()
@ -116,7 +131,7 @@ suite "Routing Table Tests":
check table.len == 0
let addedNode = generateNode(PrivateKey.random(rng[]))
check table.addNode(addedNode) == nil
check table.addNode(addedNode) == Added
check table.len == 1
# try to replace not existing node
@ -131,12 +146,12 @@ suite "Routing Table Tests":
var table: RoutingTable
# bitsPerHop = 1 -> Split only the branch in range of own id
table.init(node, 1, rng)
table.init(node, 1, ipLimits, rng = rng)
# create a full bucket TODO: no need to store bucketNodes
let bucketNodes = node.nodesAtDistance(rng[], 256, BUCKET_SIZE)
for n in bucketNodes:
check table.addNode(n) == nil
check table.addNode(n) == Added
table.replaceNode(table.nodeToRevalidate())
# This node should still be removed
@ -147,19 +162,19 @@ suite "Routing Table Tests":
var table: RoutingTable
# bitsPerHop = 1 -> Split only the branch in range of own id
table.init(node, 1, rng)
table.init(node, 1, ipLimits, rng = rng)
let doubleNode = node.nodeAtDistance(rng[], 256)
# Try to add the node twice
check table.addNode(doubleNode) == nil
check table.addNode(doubleNode) == nil
check table.addNode(doubleNode) == Added
check table.addNode(doubleNode) == Existing
for n in 0..<BUCKET_SIZE-1:
check table.addNode(node.nodeAtDistance(rng[], 256)) == nil
check table.addNode(node.nodeAtDistance(rng[], 256)) == Added
check table.addNode(node.nodeAtDistance(rng[], 256)) != nil
check table.addNode(node.nodeAtDistance(rng[], 256)) == ReplacementAdded
# Check when adding again once the bucket is full
check table.addNode(doubleNode) == nil
check table.addNode(doubleNode) == Existing
# Test if its order is preserved, there is one node in replacement cache
# which is why we run `BUCKET_SIZE` times.
@ -177,19 +192,19 @@ suite "Routing Table Tests":
var table: RoutingTable
# bitsPerHop = 1 -> Split only the branch in range of own id
table.init(node, 1, rng)
table.init(node, 1, ipLimits, rng = rng)
# create a full bucket
let bucketNodes = node.nodesAtDistance(rng[], 256, BUCKET_SIZE)
for n in bucketNodes:
check table.addNode(n) == nil
check table.addNode(n) == Added
# create a full replacement cache
let replacementNodes = node.nodesAtDistance(rng[], 256, REPLACEMENT_CACHE_SIZE)
for n in replacementNodes:
check table.addNode(n) != nil
check table.addNode(n) == ReplacementAdded
check table.addNode(replacementNodes[0]) != nil
check table.addNode(replacementNodes[0]) == ReplacementExisting
table.replaceNode(table.nodeToRevalidate())
block:
@ -207,12 +222,12 @@ suite "Routing Table Tests":
var table: RoutingTable
# bitsPerHop = 1 -> Split only the branch in range of own id
table.init(node, 1, rng)
table.init(node, 1, ipLimits, rng = rng)
# create a full bucket
let bucketNodes = node.nodesAtDistance(rng[], 256, BUCKET_SIZE)
for n in bucketNodes:
check table.addNode(n) == nil
check table.addNode(n) == Added
# swap seen order
for n in bucketNodes:
@ -227,17 +242,17 @@ suite "Routing Table Tests":
var table: RoutingTable
# bitsPerHop = 1 -> Split only the branch in range of own id
table.init(node, 1, rng)
table.init(node, 1, ipLimits, rng = rng)
# create a full bucket
let bucketNodes = node.nodesAtDistance(rng[], 256, BUCKET_SIZE)
for n in bucketNodes:
check table.addNode(n) == nil
check table.addNode(n) == Added
# create a full replacement cache
let replacementNodes = node.nodesAtDistance(rng[], 256, REPLACEMENT_CACHE_SIZE)
for n in replacementNodes:
check table.addNode(n) != nil
check table.addNode(n) == ReplacementAdded
for i in countdown(replacementNodes.high, 0):
table.replaceNode(table.nodeToRevalidate())
@ -254,3 +269,269 @@ suite "Routing Table Tests":
check:
result.isSome()
result.get() == bucketNodes[i]
test "Ip limits on bucket":
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
# bitsPerHop = 1 -> Split only the branch in range of own id
table.init(node, 1, DefaultTableIpLimits, rng = rng)
block: # First bucket
let sameIpNodes = node.nodesAtDistance(rng[], 256,
int(DefaultTableIpLimits.bucketIpLimit))
for n in sameIpNodes:
check table.addNode(n) == Added
# Try to add a node, which should fail due to ip bucket limit
let anotherSameIpNode = node.nodeAtDistance(rng[], 256)
check table.addNode(anotherSameIpNode) == IpLimitReached
# Remove one and try add again
table.replaceNode(table.nodeToRevalidate())
check table.addNode(anotherSameIpNode) == Added
# Further fill the bucket with nodes with different ip.
let diffIpNodes = node.nodesAtDistanceUniqueIp(rng[], 256,
int(BUCKET_SIZE - DefaultTableIpLimits.bucketIpLimit),
ValidIpAddress.init("192.168.0.1"))
for n in diffIpNodes:
check table.addNode(n) == Added
block: # Second bucket
# Try to add another node with the same IP, but different distance.
# This should split the bucket and add it.
let anotherSameIpNode = node.nodeAtDistance(rng[], 255)
check table.addNode(anotherSameIpNode) == Added
# Add more nodes with different ip and distance 255 to get in the new bucket
let diffIpNodes = node.nodesAtDistanceUniqueIp(rng[], 255,
int(BUCKET_SIZE - DefaultTableIpLimits.bucketIpLimit - 1),
ValidIpAddress.init("192.168.1.1"))
for n in diffIpNodes:
check table.addNode(n) == Added
let sameIpNodes = node.nodesAtDistance(rng[], 255,
int(DefaultTableIpLimits.bucketIpLimit - 1))
for n in sameIpNodes:
check table.addNode(n) == Added
# Adding in another one should fail again
let anotherSameIpNode2 = node.nodeAtDistance(rng[], 255)
check table.addNode(anotherSameIpNode2) == IpLimitReached
test "Ip limits on routing table":
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
# bitsPerHop = 1 -> Split only the branch in range of own id
table.init(node, 1, DefaultTableIpLimits, rng = rng)
let amount = uint32(DefaultTableIpLimits.tableIpLimit div
DefaultTableIpLimits.bucketIpLimit)
# Fill `amount` of buckets, each with 14 nodes with different ips and 2
# with equal ones.
for j in 0..<amount:
let nodes = node.nodesAtDistanceUniqueIp(rng[], 256 - j,
int(BUCKET_SIZE - DefaultTableIpLimits.bucketIpLimit),
ValidIpAddress.init("192.168.0.1"))
for n in nodes:
check table.addNode(n) == Added
let sameIpNodes = node.nodesAtDistance(rng[], 256 - j,
int(DefaultTableIpLimits.bucketIpLimit))
for n in sameIpNodes:
check table.addNode(n) == Added
# Add a node with a different IP, should work and split a bucket once more.
let anotherDiffIpNode = node.nodeAtDistance(rng[], 256 - amount,
ValidIpAddress.init("192.168.1.1"))
check table.addNode(anotherDiffIpNode) == Added
let amountLeft = int(DefaultTableIpLimits.tableIpLimit mod
DefaultTableIpLimits.bucketIpLimit)
let sameIpNodes = node.nodesAtDistance(rng[], 256 - amount, amountLeft)
for n in sameIpNodes:
check table.addNode(n) == Added
# Add a node with same ip to this fresh bucket, should fail because of total
# ip limit of routing table is reached.
let anotherSameIpNode = node.nodeAtDistance(rng[], 256 - amount)
check table.addNode(anotherSameIpNode) == IpLimitReached
test "Ip limits on replacement cache":
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
table.init(node, 1, DefaultTableIpLimits, rng = rng)
let diffIpNodes = node.nodesAtDistanceUniqueIp(rng[], 256,
int(BUCKET_SIZE - DefaultTableIpLimits.bucketIpLimit + 1),
ValidIpAddress.init("192.168.0.1"))
for n in diffIpNodes:
check table.addNode(n) == Added
let sameIpNodes = node.nodesAtDistance(rng[], 256,
int(DefaultTableIpLimits.bucketIpLimit - 1))
for n in sameIpNodes:
check table.addNode(n) == Added
let anotherSameIpNode1 = node.nodeAtDistance(rng[], 256)
check table.addNode(anotherSameIpNode1) == ReplacementAdded
let anotherSameIpNode2 = node.nodeAtDistance(rng[], 256)
check table.addNode(anotherSameIpNode2) == IpLimitReached
block: # Replace node to see if the first one becomes available
table.replaceNode(table.nodeToRevalidate())
let res = table.getNode(anotherSameIpNode1.id)
check:
res.isSome()
res.get() == anotherSameIpNode1
table.getNode(anotherSameIpNode2.id).isNone()
block: # Replace again to see if the first one never becomes available
table.replaceNode(table.nodeToRevalidate())
check:
table.getNode(anotherSameIpNode1.id).isNone()
table.getNode(anotherSameIpNode2.id).isNone()
test "Ip limits on replacement cache: deletion":
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
table.init(node, 1, DefaultTableIpLimits, rng = rng)
block: # Fill bucket
let sameIpNodes = node.nodesAtDistance(rng[], 256,
int(DefaultTableIpLimits.bucketIpLimit - 1))
for n in sameIpNodes:
check table.addNode(n) == Added
let diffIpNodes = node.nodesAtDistanceUniqueIp(rng[], 256,
int(BUCKET_SIZE - DefaultTableIpLimits.bucketIpLimit + 1),
ValidIpAddress.init("192.168.0.1"))
for n in diffIpNodes:
check table.addNode(n) == Added
block: # Fill bucket replacement cache
let sameIpNode = node.nodeAtDistance(rng[], 256)
check table.addNode(sameIpNode) == ReplacementAdded
let diffIpNodes = node.nodesAtDistanceUniqueIp(rng[], 256,
int(REPLACEMENT_CACHE_SIZE - 1),
ValidIpAddress.init("192.168.1.1"))
for n in diffIpNodes:
check table.addNode(n) == ReplacementAdded
# Try to add node to replacement, but limit is reached
let sameIpNode = node.nodeAtDistance(rng[], 256)
check table.addNode(sameIpNode) == IpLimitReached
# Add one with different ip, to remove the first
let diffIpNode = node.nodeAtDistance(rng[], 256,
ValidIpAddress.init("192.168.2.1"))
check table.addNode(diffIpNode) == ReplacementAdded
# Now the add should work
check table.addNode(sameIpNode) == ReplacementAdded
test "Ip limits on replacement cache: double add":
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
table.init(node, 1, DefaultTableIpLimits, rng = rng)
# Fill bucket
let diffIpNodes = node.nodesAtDistanceUniqueIp(rng[], 256, BUCKET_SIZE,
ValidIpAddress.init("192.168.0.1"))
for n in diffIpNodes:
check table.addNode(n) == Added
# Test if double add does not account for the ip limits.
for i in 0..<DefaultTableIpLimits.bucketIpLimit:
let sameIpNode = node.nodeAtDistance(rng[], 256)
check table.addNode(sameIpNode) == ReplacementAdded
# Add it again
check table.addNode(sameIpNode) == ReplacementExisting
let sameIpNode = node.nodeAtDistance(rng[], 256)
check table.addNode(sameIpNode) == IpLimitReached
test "Ip limits on bucket: double add with new ip":
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
table.init(node, 1, DefaultTableIpLimits, rng = rng)
let pk = PrivateKey.random(rng[])
let sameIpNode1 = generateNode(pk)
check table.addNode(sameIpNode1) == Added
let updatedNode1 = generateNode(pk)
# Need to do an update to get seqNum increased
let updated = updatedNode1.updateNode(pk,
some(ValidIpAddress.init("192.168.0.1")), Port(9000), Port(9000))
check updated.isOk()
check table.addNode(updatedNode1) == Existing
let sameIpNodes = node.nodesAtDistance(rng[], 256,
int(DefaultTableIpLimits.bucketIpLimit))
for n in sameIpNodes:
check table.addNode(n) == Added
check table.len == int(DefaultTableIpLimits.bucketIpLimit) + 1
test "Ip limits on replacement cache: double add with new ip":
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
table.init(node, 1, DefaultTableIpLimits, rng = rng)
# Fill bucket
let diffIpNodes = node.nodesAtDistanceUniqueIp(rng[], 256, BUCKET_SIZE,
ValidIpAddress.init("192.168.0.1"))
for n in diffIpNodes:
check table.addNode(n) == Added
let (sameIpNode1, pk) = node.nodeAndPrivKeyAtDistance(rng[], 256)
check table.addNode(sameIpNode1) == ReplacementAdded
# For replacements we don't need to get seqNum increased as the node will
# still get pushed in front of the queue.
let updatedNode1 = generateNode(pk, ip = ValidIpAddress.init("192.168.1.1"))
check table.addNode(updatedNode1) == ReplacementExisting
let sameIpNodes = node.nodesAtDistance(rng[], 256,
int(DefaultTableIpLimits.bucketIpLimit))
for n in sameIpNodes:
check table.addNode(n) == ReplacementAdded
test "Ip limits on bucket: even more adds with new ip":
# This tests against an issue where the ip of the nodes would not get updated
let node = generateNode(PrivateKey.random(rng[]))
var table: RoutingTable
table.init(node, 1, DefaultTableIpLimits, rng = rng)
let pk = PrivateKey.random(rng[])
let sameIpNode1 = generateNode(pk)
check table.addNode(sameIpNode1) == Added
let updatedNode1 = generateNode(pk)
for i in 0..<DefaultTableIpLimits.bucketIpLimit + 1:
# Need to do an update to get seqNum increased
let updated = updatedNode1.updateNode(pk,
some(ValidIpAddress.init("192.168.0.1")), Port(9000+i), Port(9000+i))
check updated.isOk()
check table.addNode(updatedNode1) == Existing
let sameIpNodes = node.nodesAtDistance(rng[], 256,
int(DefaultTableIpLimits.bucketIpLimit))
for n in sameIpNodes:
check table.addNode(n) == Added
check table.len == int(DefaultTableIpLimits.bucketIpLimit) + 1