From 7e35b329b464e5d93b64e598a211052b341bed32 Mon Sep 17 00:00:00 2001
From: kdeme
Date: Fri, 5 Jun 2020 16:43:18 +0200
Subject: [PATCH 1/7] Add limit to the replacement cache

---
 eth/p2p/discoveryv5/routing_table.nim | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim
index d63a930..59d5648 100644
--- a/eth/p2p/discoveryv5/routing_table.nim
+++ b/eth/p2p/discoveryv5/routing_table.nim
@@ -18,6 +18,7 @@ type
 
 const
   BUCKET_SIZE* = 16
+  REPLACEMENT_CACHE_SIZE* = 8
   BITS_PER_HOP = 8
   ID_SIZE = 256
 
@@ -64,14 +65,18 @@ proc head(k: KBucket): Node {.inline.} = k.nodes[0]
 
 proc add(k: KBucket, n: Node): Node =
   ## Try to add the given node to this bucket.
-  ## If the node is already present, it is moved to the tail of the list, and we return nil.
+  ## If the node is already present, it is moved to the tail of the list, and
+  ## nil is returned.
 
-  ## If the node is not already present and the bucket has fewer than k entries, it is inserted
-  ## at the tail of the list, and we return nil.
+  ## If the node is not already present and the bucket has fewer than k entries,
+  ## it is inserted at the tail of the list, and nil is returned.
 
-  ## If the bucket is full, we add the node to the bucket's replacement cache and return the
-  ## node at the head of the list (i.e. the least recently seen), which should be evicted if it
-  ## fails to respond to a ping.
+  ## If the bucket is full, the node is added to the bucket's replacement cache
+  ## and the node at the head of the list (i.e. the least recently seen), which
+  ## should be evicted if it fails to respond to a ping, is returned.
+
+  ## If the replacement cache is also full, the node at the head of the
+  ## list is returned. The new node is not stored anywhere and is thus lost.
   k.lastUpdated = epochTime()
   let nodeIdx = k.nodes.find(n)
   if nodeIdx != -1:
@@ -79,9 +84,11 @@ proc add(k: KBucket, n: Node): Node =
     k.nodes.delete(nodeIdx)
     k.nodes.add(n)
   elif k.len < BUCKET_SIZE:
     k.nodes.add(n)
-  else:
+  elif k.replacementCache.len < REPLACEMENT_CACHE_SIZE:
     k.replacementCache.add(n)
     return k.head
+  else:
+    return k.head
   return nil
 
 proc removeNode(k: KBucket, n: Node) =
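The hunks above bound the replacement cache at REPLACEMENT_CACHE_SIZE entries and keep returning the bucket head as the eviction candidate once both bucket and cache are full. A standalone sketch of that decision chain, using plain ints instead of Node refs, -1 instead of nil and deliberately tiny constants (illustrative only, not code from this series):

  const
    bucketSize = 2
    cacheSize = 2

  proc add(nodes, cache: var seq[int], n: int): int =
    ## Mirrors the decision chain of KBucket.add, for illustration only.
    if n in nodes:
      return -1                       # already known
    elif nodes.len < bucketSize:
      nodes.add(n); return -1         # stored in the bucket
    elif cache.len < cacheSize:
      cache.add(n); return nodes[0]   # cached, head is the eviction candidate
    else:
      return nodes[0]                 # cache full as well: n is dropped

  var nodes, cache: seq[int]
  doAssert add(nodes, cache, 1) == -1
  doAssert add(nodes, cache, 2) == -1
  doAssert add(nodes, cache, 3) == 1   # bucket full: 3 goes to the cache
  doAssert add(nodes, cache, 4) == 1
  doAssert add(nodes, cache, 5) == 1   # cache full too: 5 is lost
  doAssert cache == @[3, 4]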
From 6c85a48b4c29d0e7bf29c764a4d022a9208fe1bd Mon Sep 17 00:00:00 2001
From: kdeme
Date: Fri, 5 Jun 2020 22:56:23 +0200
Subject: [PATCH 2/7] Fix bucket ordering and add comments on this

---
 eth/p2p/discoveryv5/routing_table.nim | 73 ++++++++++++++++++---------
 1 file changed, 49 insertions(+), 24 deletions(-)

diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim
index 59d5648..7a71337 100644
--- a/eth/p2p/discoveryv5/routing_table.nim
+++ b/eth/p2p/discoveryv5/routing_table.nim
@@ -11,10 +11,18 @@ type
     buckets: seq[KBucket]
 
   KBucket = ref object
-    istart, iend: NodeId
-    nodes: seq[Node]
-    replacementCache: seq[Node]
-    lastUpdated: float # epochTime
+    istart, iend: NodeId ## Range of NodeIds this KBucket covers. This is not a
+    ## simple logarithmic distance as buckets can be split over a prefix that
+    ## does not cover the `thisNode` id.
+    nodes: seq[Node] ## Node entries of the KBucket. Sorted according to last
+    ## time seen. The first entry (head) is considered the most recently seen
+    ## node and the last entry (tail) is considered the least recently seen
+    ## node. Here "seen" means a successful request-response; this may also
+    ## not have occurred yet.
+    replacementCache: seq[Node] ## Nodes that could not be added to the `nodes`
+    ## seq as it is full and without stale nodes.
+    lastUpdated: float ## epochTime of last update to the KBucket
+    # TODO: Should this update be about changes made only in `nodes`?
 
 const
   BUCKET_SIZE* = 16
@@ -60,36 +68,46 @@ proc nodesByDistanceTo(k: KBucket, id: NodeId): seq[Node] =
   sortedByIt(k.nodes, it.distanceTo(id))
 
 proc len(k: KBucket): int {.inline.} = k.nodes.len
-proc head(k: KBucket): Node {.inline.} = k.nodes[0]
+proc tail(k: KBucket): Node {.inline.} = k.nodes[high(k.nodes)]
 
 proc add(k: KBucket, n: Node): Node =
   ## Try to add the given node to this bucket.
-
-  ## If the node is already present, it is moved to the tail of the list, and
-  ## nil is returned.
-
+  ##
+  ## If the node is already present, nothing is done, as the node should only
+  ## be moved in case of a new successful request-response.
+  ##
   ## If the node is not already present and the bucket has fewer than k entries,
-  ## it is inserted at the tail of the list, and nil is returned.
+  ## it is inserted as the last entry of the bucket (least recently seen node),
+  ## and nil is returned.
+  ##
   ## If the bucket is full, the node is added to the bucket's replacement cache
-  ## and the node at the head of the list (i.e. the least recently seen), which
+  ## and the node at the last entry of the bucket (least recently seen), which
   ## should be evicted if it fails to respond to a ping, is returned.
-
-  ## If the replacement cache is also full, the node at the head of the
-  ## list is returned. The new node is not stored anywhere and is thus lost.
-  k.lastUpdated = epochTime()
+  ##
+  ## If the replacement cache is also full, the node at the last entry of the
+  ## bucket is returned. The new node is not stored anywhere and is thus lost.
+  ##
+  ## Reasoning here is that adding nodes will happen for a big part from
+  ## lookups, which do not necessarily return nodes that are (still) reachable.
+  ## So, more trust is put in our own ordering and new additions are added
+  ## as least recently seen (in fact they have not been seen yet from this
+  ## node's perspective).
+  ## However, in discovery v5 it can be that a node is added after an incoming
+  ## request, and considering a handshake that needs to be done, it is likely
+  ## that this node is reachable. An additional `addSeen` proc could be created
+  ## for this.
+  k.lastUpdated = epochTime() # TODO: only when an actual update is done?
   let nodeIdx = k.nodes.find(n)
   if nodeIdx != -1:
-    k.nodes.delete(nodeIdx)
-    k.nodes.add(n)
+    return nil
   elif k.len < BUCKET_SIZE:
     k.nodes.add(n)
+    return nil
   elif k.replacementCache.len < REPLACEMENT_CACHE_SIZE:
     k.replacementCache.add(n)
-    return k.head
+    return k.tail
   else:
-    return k.head
-  return nil
+    return k.tail
 
 proc removeNode(k: KBucket, n: Node) =
   let i = k.nodes.find(n)
@@ -238,13 +256,14 @@ proc moveRight[T](arr: var openarray[T], a, b: int) {.inline.} =
   shallowCopy(arr[a], t)
 
 proc setJustSeen*(r: RoutingTable, n: Node) =
-  # Move `n` to front of its bucket
+  ## Move `n` to the head (most recently seen) of its bucket.
  let b = r.bucketForNode(n.id)
   let idx = b.nodes.find(n)
+  # TODO: This assert might be troublesome if we start using it for every
+  # message response & then ping a node that is not in our table (e.g. in tests)
   doAssert(idx >= 0)
   if idx != 0:
     b.nodes.moveRight(0, idx - 1)
-    b.nodes[0] = n
   b.lastUpdated = epochTime()
 
 proc nodeToRevalidate*(r: RoutingTable): Node =
@@ -267,10 +286,16 @@ proc randomNodes*(r: RoutingTable, maxAmount: int,
   result = newSeqOfCap[Node](maxAmount)
   var seen = initHashSet[Node]()
 
-  # This is a rather inneficient way of randomizing nodes from all buckets, but even if we
+  # This is a rather inefficient way of randomizing nodes from all buckets, but even if we
   # iterate over all nodes in the routing table, the time it takes would still be
   # insignificant compared to the time it takes for the network roundtrips when connecting
   # to nodes.
+  # However, "time it takes" might not be relevant, as there might be no point
+  # in providing more `randomNodes` when the routing table has nothing new to
+  # provide, and there is no way for the calling code to know this. So while it
+  # will take less total time compared to e.g. an (async) randomLookup, the
+  # time might be wasted as all nodes are possibly seen already.
   while len(seen) < maxAmount:
     # TODO: Is it important to get a better random source for these sample calls?
     let bucket = sample(r.buckets)
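With this patch the head of `nodes` is the most recently seen entry and the tail the least recently seen one; `setJustSeen` rotates an existing entry to the front via `moveRight`. The same rotation on a plain seq[int], as a self-contained illustration (not code from this series):

  proc setJustSeenDemo(entries: var seq[int], n: int) =
    ## Move an existing entry to index 0 by shifting everything before it
    ## one position to the right (same idea as moveRight in the patch).
    let idx = entries.find(n)
    if idx > 0:
      for i in countdown(idx, 1):
        entries[i] = entries[i - 1]
      entries[0] = n

  var entries = @[10, 20, 30, 40]   # 10 = most recently seen, 40 = least
  entries.setJustSeenDemo(30)
  doAssert entries == @[30, 10, 20, 40]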
From 2d7b3440f2842ca1d434024cc9f46375decca378 Mon Sep 17 00:00:00 2001
From: kdeme
Date: Wed, 17 Jun 2020 13:51:30 +0200
Subject: [PATCH 3/7] make bitsPerHop configurable + add routing table tests

---
 eth.nimble                            |  3 +-
 eth/p2p/discoveryv5/routing_table.nim | 19 ++++++---
 tests/p2p/discv5_test_helper.nim      | 55 ++++++++++++++++++++++++
 tests/p2p/p2p_test_helper.nim         | 12 ------
 tests/p2p/test_discoveryv5.nim        | 59 +++-----------------------
 tests/p2p/test_protocol_handlers.nim  |  3 +-
 tests/p2p/test_routing_table.nim      | 61 +++++++++++++++++++++++++++
 tests/p2p/test_shh_connect.nim        |  4 +-
 tests/p2p/tserver.nim                 |  2 +-
 9 files changed, 142 insertions(+), 76 deletions(-)
 create mode 100644 tests/p2p/discv5_test_helper.nim
 create mode 100644 tests/p2p/test_routing_table.nim

diff --git a/eth.nimble b/eth.nimble
index 5caf7a1..b9ecb13 100644
--- a/eth.nimble
+++ b/eth.nimble
@@ -50,7 +50,8 @@ proc runP2pTests() =
       "test_protocol_handlers",
       "test_enr",
       "test_discoveryv5",
-      "test_discv5_encoding"
+      "test_discv5_encoding",
+      "test_routing_table"
     ]:
     runTest("tests/p2p/" & filename)
 
diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim
index 7a71337..1016e19 100644
--- a/eth/p2p/discoveryv5/routing_table.nim
+++ b/eth/p2p/discoveryv5/routing_table.nim
@@ -9,6 +9,14 @@ type
   RoutingTable* = object
     thisNode: Node
     buckets: seq[KBucket]
+    bitsPerHop: int ## This value indicates how many bits (at minimum) you get
+    ## closer to finding your target per query. Practically, it also tells you
+    ## how often the "not in range" branch will split off. Setting this to 1 is
+    ## the basic, non-accelerated version, which will never split off the
+    ## not-in-range branch and which results in log base 2 of n hops per lookup.
+    ## Setting it higher increases the amount of splitting on the not-in-range
+    ## branch (thus holding more nodes with better keyspace coverage), which
+    ## results in an improvement to log base 2^b of n hops per lookup.
 
   KBucket = ref object
     istart, iend: NodeId ## Range of NodeIds this KBucket covers. This is not a
@@ -27,7 +35,6 @@ type
 const
   BUCKET_SIZE* = 16
   REPLACEMENT_CACHE_SIZE* = 8
-  BITS_PER_HOP = 8
   ID_SIZE = 256
 
 proc distanceTo(n: Node, id: NodeId): UInt256 =
@@ -165,9 +172,10 @@ proc computeSharedPrefixBits(nodes: openarray[Node]): int =
 
   doAssert(false, "Unable to calculate number of shared prefix bits")
 
-proc init*(r: var RoutingTable, thisNode: Node) {.inline.} =
+proc init*(r: var RoutingTable, thisNode: Node, bitsPerHop = 8) {.inline.} =
   r.thisNode = thisNode
   r.buckets = @[newKBucket(0.u256, high(Uint256))]
+  r.bitsPerHop = bitsPerHop
   randomize() # for later `randomNodes` selection
 
 proc splitBucket(r: var RoutingTable, index: int) =
@@ -189,13 +197,14 @@ proc addNode*(r: var RoutingTable, n: Node): Node =
   let bucket = r.bucketForNode(n.id)
   let evictionCandidate = bucket.add(n)
   if not evictionCandidate.isNil:
-    # Split if the bucket has the local node in its range or if the depth is not congruent
-    # to 0 mod BITS_PER_HOP
+    # Split if the bucket has the local node in its range or if the depth is not
+    # congruent to 0 mod `bitsPerHop`
     let depth = computeSharedPrefixBits(bucket.nodes)
     # TODO: Shouldn't the adding to replacement cache be done only if the bucket
     # doesn't get split?
-    if bucket.inRange(r.thisNode) or (depth mod BITS_PER_HOP != 0 and depth != ID_SIZE):
+    if bucket.inRange(r.thisNode) or
+        (depth mod r.bitsPerHop != 0 and depth != ID_SIZE):
       r.splitBucket(r.buckets.find(bucket))
       return r.addNode(n) # retry
 
diff --git a/tests/p2p/discv5_test_helper.nim b/tests/p2p/discv5_test_helper.nim
new file mode 100644
index 0000000..9cb2978
--- /dev/null
+++ b/tests/p2p/discv5_test_helper.nim
@@ -0,0 +1,55 @@
+import
+  testutils/unittests, stew/shims/net, nimcrypto,
+  eth/[keys, rlp, trie/db],
+  eth/p2p/discoveryv5/[discovery_db, enr, node, types, routing_table, encoding],
+  eth/p2p/discoveryv5/protocol as discv5_protocol
+
+
+proc localAddress*(port: int): Address =
+  Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(port))
+
+proc initDiscoveryNode*(privKey: PrivateKey, address: Address,
+                        bootstrapRecords: openarray[Record] = [],
+                        localEnrFields: openarray[FieldPair] = []):
+                        discv5_protocol.Protocol =
+  var db = DiscoveryDB.init(newMemoryDB())
+  result = newProtocol(privKey, db,
+                       some(address.ip),
+                       address.port, address.port,
+                       bootstrapRecords = bootstrapRecords,
+                       localEnrFields = localEnrFields)
+
+  result.open()
+
+proc nodeIdInNodes*(id: NodeId, nodes: openarray[Node]): bool =
+  for n in nodes:
+    if id == n.id: return true
+
+# Create a random packet with a specific node id each time
+proc randomPacket*(tag: PacketTag): seq[byte] =
+  var
+    authTag: AuthTag
+    msg: array[44, byte]
+
+  check randomBytes(authTag) == authTag.len
+  check randomBytes(msg) == msg.len
+  result.add(tag)
+  result.add(rlp.encode(authTag))
+  result.add(msg)
+
+proc generateNode*(privKey = PrivateKey.random()[], port: int = 20302,
+                   localEnrFields: openarray[FieldPair] = []): Node =
+  let port = Port(port)
+  let enr = enr.Record.init(1, privKey, some(ValidIpAddress.init("127.0.0.1")),
+                            port, port, localEnrFields).expect(
+                            "Properly initialized private key")
+  result = newNode(enr).expect("Properly initialized node")
+
+proc nodeAtDistance*(n: Node, d: uint32): Node =
+  while true:
+    let node = generateNode()
+    if logDist(n.id, node.id) == d:
+      return node
+
+proc nodesAtDistance*(n: Node, d: uint32, amount: int): seq[Node] =
+  for i in 0..<amount:
+    result.add(nodeAtDistance(n, d))

+    # bitsPerHop = 1 -> Split only the branch in range of own id
+    table.init(node, 1)
+
+    for j in 0..5'u32:
+      for i in 0..
+    # bitsPerHop = 1 -> Split only the branch in range of own id
+    table.init(node, 1)
+
+    # Add 16 nodes, distance 256
+    for i in 0..
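The `bitsPerHop` comment above claims lookups drop from about log base 2 of n hops to about log base 2^b of n hops. A quick standalone check of that arithmetic for a million-node network (illustrative only, not code from this series):

  import std/math

  let n = 1_000_000.0
  doAssert round(log2(n)) == 20.0         # bitsPerHop = 1: roughly log2(n) hops
  doAssert round(log2(n) / 5.0) == 4.0    # bitsPerHop = 5: roughly log2(n)/5 hops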
From: kdeme
Date: Mon, 22 Jun 2020 16:46:58 +0200
Subject: [PATCH 4/7] Fix depth calculation for bucket splitting

---
 eth/p2p/discoveryv5/routing_table.nim | 15 ++++++++------
 tests/p2p/test_routing_table.nim      | 28 ++++++++++++++++-----------
 2 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim
index 1016e19..3329a12 100644
--- a/eth/p2p/discoveryv5/routing_table.nim
+++ b/eth/p2p/discoveryv5/routing_table.nim
@@ -153,7 +153,7 @@ proc binaryGetBucketForNode(buckets: openarray[KBucket],
   if result.isNil:
     raise (ref Defect)(msg: "No bucket found for node with id " & $id)
 
-proc computeSharedPrefixBits(nodes: openarray[Node]): int =
+proc computeSharedPrefixBits(nodes: openarray[NodeId]): int =
   ## Count the number of prefix bits shared by all nodes.
   if nodes.len < 2:
     return ID_SIZE
@@ -163,13 +163,14 @@
   for i in 1 .. ID_SIZE:
     mask = mask or (one shl (ID_SIZE - i))
-    let reference = nodes[0].id and mask
+    let reference = nodes[0] and mask
     for j in 1 .. nodes.high:
-      if (nodes[j].id and mask) != reference: return i - 1
+      if (nodes[j] and mask) != reference: return i - 1
 
   for n in nodes:
-    echo n.id.toHex()
+    echo n.toHex()
 
+  # Reaching this would mean that all node ids are equal
   doAssert(false, "Unable to calculate number of shared prefix bits")
 
 proc init*(r: var RoutingTable, thisNode: Node, bitsPerHop = 8) {.inline.} =
@@ -199,8 +200,10 @@ proc addNode*(r: var RoutingTable, n: Node): Node =
   if not evictionCandidate.isNil:
     # Split if the bucket has the local node in its range or if the depth is not
     # congruent to 0 mod `bitsPerHop`
-
-    let depth = computeSharedPrefixBits(bucket.nodes)
+    #
+    # Calculate the prefix shared by all nodes in the bucket's range, not the
+    # ones actually in the bucket.
+    let depth = computeSharedPrefixBits(@[bucket.istart, bucket.iend])
     # TODO: Shouldn't the adding to replacement cache be done only if the bucket
     # doesn't get split?
     if bucket.inRange(r.thisNode) or
 
diff --git a/tests/p2p/test_routing_table.nim b/tests/p2p/test_routing_table.nim
index 373489c..d9a6ea0 100644
--- a/tests/p2p/test_routing_table.nim
+++ b/tests/p2p/test_routing_table.nim
@@ -40,22 +40,28 @@ suite "Routing Table Tests":
     let node = generateNode()
     var table: RoutingTable
 
-    # bitsPerHop = 2, allow not in range branch to split once.
+    # bitsPerHop = 2, allow not in range branch to split once (2 buckets).
     table.init(node, 2)
 
-    # Add 16 nodes, distance 256
-    for i in 0..
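Patch 4 derives the split depth from the bucket's boundary ids rather than from whichever nodes currently sit in the bucket. The same prefix count on a reduced 8-bit id space, as a self-contained illustration (the real code works on 256-bit NodeIds; this is not the patch's code):

  proc sharedPrefixBits(a, b: uint8): int =
    ## 8-bit analogue of computeSharedPrefixBits, for illustration.
    for i in 1 .. 8:
      let mask = 0xFF'u8 shl (8 - i)
      if (a and mask) != (b and mask):
        return i - 1
    return 8

  # A bucket covering the lower half of the id space shares 1 prefix bit:
  doAssert sharedPrefixBits(0b0000_0000'u8, 0b0111_1111'u8) == 1
  # After splitting that bucket once, its lower half shares 2 bits:
  doAssert sharedPrefixBits(0b0000_0000'u8, 0b0011_1111'u8) == 2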
From: kdeme
Date: Tue, 23 Jun 2020 16:11:58 +0200
Subject: [PATCH 5/7] Functional replacement cache

---
 eth/p2p/discoveryv5/protocol.nim      |  2 +-
 eth/p2p/discoveryv5/routing_table.nim | 65 +++++++++++++++-----
 tests/p2p/test_routing_table.nim      | 86 ++++++++++++++++++++++++++-
 3 files changed, 134 insertions(+), 19 deletions(-)

diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim
index 0689e33..4228b98 100644
--- a/eth/p2p/discoveryv5/protocol.nim
+++ b/eth/p2p/discoveryv5/protocol.nim
@@ -666,7 +666,7 @@ proc revalidateNode*(d: Protocol, n: Node)
     # peers in the DHT
     if n.record notin d.bootstrapRecords:
       trace "Revalidation of node failed, removing node", record = n.record
-      d.routingTable.removeNode(n)
+      d.routingTable.replaceNode(n)
       # Remove shared secrets when removing the node from routing table.
       # This might be to direct, so we could keep these longer. But better
       # would be to simply not remove the nodes immediatly but only after x
 
diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim
index 3329a12..d6a46eb 100644
--- a/eth/p2p/discoveryv5/routing_table.nim
+++ b/eth/p2p/discoveryv5/routing_table.nim
@@ -3,6 +3,8 @@
 import
   stint, chronicles, node
 
+export options
+
 {.push raises: [Defect].}
 
 type
@@ -28,7 +30,8 @@ type
     replacementCache: seq[Node] ## Nodes that could not be added to the `nodes`
-    ## seq as it is full and without stale nodes.
+    ## seq as it is full and without stale nodes. This is practically a small
+    ## LRU cache.
     lastUpdated: float ## epochTime of last update to the KBucket
     # TODO: Should this update be about changes made only in `nodes`?
 
@@ -87,12 +90,9 @@ proc add(k: KBucket, n: Node): Node =
   ## it is inserted as the last entry of the bucket (least recently seen node),
   ## and nil is returned.
   ##
-  ## If the bucket is full, the node is added to the bucket's replacement cache
-  ## and the node at the last entry of the bucket (least recently seen), which
-  ## should be evicted if it fails to respond to a ping, is returned.
-  ##
-  ## If the replacement cache is also full, the node at the last entry of the
-  ## bucket is returned. The new node is not stored anywhere and is thus lost.
+  ## If the bucket is full, the node at the last entry of the bucket (least
+  ## recently seen), which should be evicted if it fails to respond to a ping,
+  ## is returned.
   ##
   ## Reasoning here is that adding nodes will happen for a big part from
   ## lookups, which do not necessarily return nodes that are (still) reachable.
@@ -110,12 +110,25 @@ proc add(k: KBucket, n: Node): Node =
   elif k.len < BUCKET_SIZE:
     k.nodes.add(n)
     return nil
-  elif k.replacementCache.len < REPLACEMENT_CACHE_SIZE:
-    k.replacementCache.add(n)
-    return k.tail
   else:
     return k.tail
 
+proc addReplacement(k: KBucket, n: Node) =
+  ## Add the node to the tail of the replacement cache of the KBucket.
+  ##
+  ## If the replacement cache is full, the oldest (first entry) node will be
+  ## removed. If the node is already in the replacement cache, it will be moved
+  ## to the tail.
+  let nodeIdx = k.replacementCache.find(n)
+  if nodeIdx != -1:
+    k.replacementCache.delete(nodeIdx)
+    k.replacementCache.add(n)
+  else:
+    doAssert(k.replacementCache.len <= REPLACEMENT_CACHE_SIZE)
+    if k.replacementCache.len == REPLACEMENT_CACHE_SIZE:
+      k.replacementCache.delete(0)
+    k.replacementCache.add(n)
+
 proc removeNode(k: KBucket, n: Node) =
   let i = k.nodes.find(n)
   if i != -1: k.nodes.delete(i)
@@ -189,9 +202,16 @@ proc bucketForNode(r: RoutingTable, id: NodeId): KBucket =
   binaryGetBucketForNode(r.buckets, id)
 
 proc removeNode*(r: var RoutingTable, n: Node) =
+  ## Remove the node `n` from the routing table.
   r.bucketForNode(n.id).removeNode(n)
 
 proc addNode*(r: var RoutingTable, n: Node): Node =
+  ## Try to add the node to the routing table.
+  ##
+  ## First, an attempt will be made to add the node to the bucket in its range.
+  ## If this fails, the bucket will be split if it is eligible for splitting.
+  ## If so, a new attempt will be made to add the node. If not, the node will
+  ## be added to the replacement cache.
   if n == r.thisNode:
     # warn "Trying to add ourselves to the routing table", node = n
     return
@@ -204,15 +224,30 @@ proc addNode*(r: var RoutingTable, n: Node): Node =
     # Calculate the prefix shared by all nodes in the bucket's range, not the
     # ones actually in the bucket.
     let depth = computeSharedPrefixBits(@[bucket.istart, bucket.iend])
-    # TODO: Shouldn't the adding to replacement cache be done only if the bucket
-    # doesn't get split?
     if bucket.inRange(r.thisNode) or
         (depth mod r.bitsPerHop != 0 and depth != ID_SIZE):
       r.splitBucket(r.buckets.find(bucket))
-      return r.addNode(n) # retry
+      return r.addNode(n) # retry adding
+    else:
+      # When the bucket doesn't get split the node is added to the replacement cache
+      bucket.addReplacement(n)
 
-  # Nothing added, ping evictionCandidate
-  return evictionCandidate
+    # Nothing added, return evictionCandidate
+    return evictionCandidate
+
+proc replaceNode*(r: var RoutingTable, n: Node) =
+  ## Replace node `n` with the last entry in the replacement cache. If there
+  ## are no entries in the replacement cache, node `n` will simply be removed.
+  # TODO: The Kademlia paper recommends not removing nodes if there are no
+  # replacements. However, that would require a bit more complexity in the
+  # revalidation as you don't want to try pinging that node all the time.
+  let b = r.bucketForNode(n.id)
+  let idx = b.nodes.find(n)
+  if idx != -1:
+    b.nodes.delete(idx)
+    if b.replacementCache.len > 0:
+      b.nodes.add(b.replacementCache[high(b.replacementCache)])
+      b.replacementCache.delete(high(b.replacementCache))
 
 proc getNode*(r: RoutingTable, id: NodeId): Option[Node] =
   let b = r.bucketForNode(id)
 
diff --git a/tests/p2p/test_routing_table.nim b/tests/p2p/test_routing_table.nim
index d9a6ea0..fc320d1 100644
--- a/tests/p2p/test_routing_table.nim
+++ b/tests/p2p/test_routing_table.nim
@@ -1,6 +1,6 @@
 import
-  unittest, stew/shims/net, stint,
-  eth/keys, eth/p2p/discoveryv5/[routing_table, node],
+  unittest,
+  eth/p2p/discoveryv5/[routing_table, node],
   ./discv5_test_helper
 
 suite "Routing Table Tests":
@@ -53,7 +53,7 @@ suite "Routing Table Tests":
     # Add 16 more nodes with only 1 bit shared prefix with previous 16. This
    # should cause the initial bucket to split and and fill the second bucket
     # with the 16 new entries.
-    for n in 0..<16:
+    for n in 0..

+    # bitsPerHop = 1 -> Split only the branch in range of own id
+    table.init(node, 1)
+
+    # create a full bucket
+    let bucketNodes = node.nodesAtDistance(256, BUCKET_SIZE)
+    for n in bucketNodes:
+      check table.addNode(n) == nil
+
+    # create a full replacement cache
+    let replacementNodes = node.nodesAtDistance(256, REPLACEMENT_CACHE_SIZE)
+    for n in replacementNodes:
+      check table.addNode(n) != nil
+
+    # Add one more node to replacement (would drop first one)
+    let lastNode = node.nodeAtDistance(256)
+    check table.addNode(lastNode) != nil
+
+    # This should replace the last node in the bucket, with the last one of
+    # the replacement cache.
+    table.replaceNode(table.nodeToRevalidate())
+    block:
+      # Should return the last node of the replacement cache successfully.
+      let result = table.getNode(lastNode.id)
+      check:
+        result.isSome()
+        result.get() == lastNode
+    block:
+      # This node should be removed
+      check (table.getNode(bucketNodes[bucketNodes.high].id)).isNone()
+
+  test "Empty replacement cache":
+    let node = generateNode()
+    var table: RoutingTable
+
+    # bitsPerHop = 1 -> Split only the branch in range of own id
+    table.init(node, 1)
+
+    # create a full bucket TODO: no need to store bucketNodes
+    let bucketNodes = node.nodesAtDistance(256, BUCKET_SIZE)
+    for n in bucketNodes:
+      check table.addNode(n) == nil
+
+    table.replaceNode(table.nodeToRevalidate())
+    # This node should still be removed
+    check (table.getNode(bucketNodes[bucketNodes.high].id)).isNone()
+
+  test "Double replacement":
+    let node = generateNode()
+    var table: RoutingTable
+
+    # bitsPerHop = 1 -> Split only the branch in range of own id
+    table.init(node, 1)
+
+    # create a full bucket
+    let bucketNodes = node.nodesAtDistance(256, BUCKET_SIZE)
+    for n in bucketNodes:
+      check table.addNode(n) == nil
+
+    # create a full replacement cache
+    let replacementNodes = node.nodesAtDistance(256, REPLACEMENT_CACHE_SIZE)
+    for n in replacementNodes:
+      check table.addNode(n) != nil
+
+    check table.addNode(replacementNodes[0]) != nil
+
+    table.replaceNode(table.nodeToRevalidate())
+    block:
+      # Should return the last node of the replacement cache successfully.
+      let result = table.getNode(replacementNodes[0].id)
+      check:
+        result.isSome()
+        result.get() == replacementNodes[0]
+    block:
+      # This node should be removed
+      check (table.getNode(bucketNodes[bucketNodes.high].id)).isNone()
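The replacement cache from patch 5 acts as a small LRU list: a re-added node moves to the tail, and a full cache drops its oldest entry. A standalone sketch of exactly that behaviour, with ints in place of Node refs and a cache size of 3 (illustrative only, not the patch's code):

  const cacheSize = 3

  proc addReplacementDemo(cache: var seq[int], n: int) =
    let idx = cache.find(n)
    if idx != -1:
      cache.delete(idx)          # duplicate: will be re-added at the tail
    elif cache.len == cacheSize:
      cache.delete(0)            # full: drop the oldest entry
    cache.add(n)                 # newest entry lives at the tail

  var cache: seq[int]
  for n in [1, 2, 3]: cache.addReplacementDemo(n)
  cache.addReplacementDemo(1)    # duplicate moves to the tail
  doAssert cache == @[2, 3, 1]
  cache.addReplacementDemo(4)    # full cache drops the oldest (2)
  doAssert cache == @[3, 1, 4]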
From 5ffe6bb8ff4bb384d2ccf8fb64e518140254de2c Mon Sep 17 00:00:00 2001
From: kdeme
Date: Tue, 23 Jun 2020 17:54:12 +0200
Subject: [PATCH 6/7] Add more routing table kbucket tests

---
 tests/p2p/test_routing_table.nim | 109 ++++++++++++++++++++++++++++++-
 1 file changed, 108 insertions(+), 1 deletion(-)

diff --git a/tests/p2p/test_routing_table.nim b/tests/p2p/test_routing_table.nim
index fc320d1..b5dfe29 100644
--- a/tests/p2p/test_routing_table.nim
+++ b/tests/p2p/test_routing_table.nim
@@ -100,6 +100,30 @@ suite "Routing Table Tests":
       # This node should be removed
       check (table.getNode(bucketNodes[bucketNodes.high].id)).isNone()
 
+  test "Empty bucket":
+    let node = generateNode()
+    var table: RoutingTable
+
+    # bitsPerHop = 1 -> Split only the branch in range of own id
+    table.init(node, 1)
+
+    check table.nodeToRevalidate().isNil()
+
+    # try to replace a non-existing node
+    table.replaceNode(generateNode())
+    check table.len == 0
+
+    let addedNode = generateNode()
+    check table.addNode(addedNode) == nil
+    check table.len == 1
+
+    # try to replace a non-existing node
+    table.replaceNode(generateNode())
+    check table.len == 1
+
+    table.replaceNode(addedNode)
+    check table.len == 0
+
   test "Empty replacement cache":
@@ -116,7 +140,37 @@ suite "Routing Table Tests":
     # This node should still be removed
     check (table.getNode(bucketNodes[bucketNodes.high].id)).isNone()
 
-  test "Double replacement":
+  test "Double add":
+    let node = generateNode()
+    var table: RoutingTable
+
+    # bitsPerHop = 1 -> Split only the branch in range of own id
+    table.init(node, 1)
+
+    let doubleNode = node.nodeAtDistance(256)
+    # Try to add the node twice
+    check table.addNode(doubleNode) == nil
+    check table.addNode(doubleNode) == nil
+
+    for n in 0..

+    # bitsPerHop = 1 -> Split only the branch in range of own id
+    table.init(node, 1)
+
+    # create a full bucket
+    let bucketNodes = node.nodesAtDistance(256, BUCKET_SIZE)
+    for n in bucketNodes:
+      check table.addNode(n) == nil
+
+    # swap seen order
+    for n in bucketNodes:
+      table.setJustSeen(n)
+
+    for n in bucketNodes:
+      table.replaceNode(table.nodeToRevalidate())
+      check (table.getNode(n.id)).isNone()
+
+  test "Just seen replacement":
+    let node = generateNode()
+    var table: RoutingTable
+
+    # bitsPerHop = 1 -> Split only the branch in range of own id
+    table.init(node, 1)
+
+    # create a full bucket
+    let bucketNodes = node.nodesAtDistance(256, BUCKET_SIZE)
+    for n in bucketNodes:
+      check table.addNode(n) == nil
+
+    # create a full replacement cache
+    let replacementNodes = node.nodesAtDistance(256, REPLACEMENT_CACHE_SIZE)
+    for n in replacementNodes:
+      check table.addNode(n) != nil
+
+    for i in countdown(replacementNodes.high, 0):
+      table.replaceNode(table.nodeToRevalidate())
+      table.setJustSeen(replacementNodes[i])
+
+    for n in replacementNodes:
+      let result = table.getNode(n.id)
+      check:
+        result.isSome()
+        result.get() == n
+
+    for i in 0..
From: kdeme
Date: Wed, 24 Jun 2020 12:29:59 +0200
Subject: [PATCH 7/7] routing table: resolve some TODOs

---
 eth/p2p/discoveryv5/routing_table.nim | 44 ++++++++++++++-------------
 1 file changed, 23 insertions(+), 21 deletions(-)

diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim
index d6a46eb..ec741d6 100644
--- a/eth/p2p/discoveryv5/routing_table.nim
+++ b/eth/p2p/discoveryv5/routing_table.nim
@@ -32,8 +32,7 @@ type
     replacementCache: seq[Node] ## Nodes that could not be added to the `nodes`
     ## seq as it is full and without stale nodes. This is practically a small
     ## LRU cache.
-    lastUpdated: float ## epochTime of last update to the KBucket
-    # TODO: Should this update be about changes made only in `nodes`?
+    lastUpdated: float ## epochTime of last update to `nodes` in the KBucket.
 
 const
   BUCKET_SIZE* = 16
@@ -103,7 +102,7 @@ proc add(k: KBucket, n: Node): Node =
   ## request, and considering a handshake that needs to be done, it is likely
   ## that this node is reachable. An additional `addSeen` proc could be created
   ## for this.
-  k.lastUpdated = epochTime() # TODO: only when an actual update is done?
+  k.lastUpdated = epochTime()
   let nodeIdx = k.nodes.find(n)
   if nodeIdx != -1:
     return nil
@@ -150,22 +149,20 @@ proc inRange(k: KBucket, n: Node): bool {.inline.} =
 
 proc contains(k: KBucket, n: Node): bool = n in k.nodes
 
-proc binaryGetBucketForNode(buckets: openarray[KBucket],
-                            id: NodeId): KBucket {.inline.} =
+proc binaryGetBucketForNode*(buckets: openarray[KBucket],
+                             id: NodeId): KBucket =
-  ## Given a list of ordered buckets, returns the bucket for a given node.
+  ## Given a list of ordered buckets, returns the bucket for a given `NodeId`.
+  ## Returns nil if no bucket in range for the given `id` is found.
   let bucketPos = lowerBound(buckets, id) do(a: KBucket, b: NodeId) -> int:
     cmp(a.iend, b)
-  # Prevents edge cases where bisect_left returns an out of range index
+
+  # Prevent cases where `lowerBound` returns an out of range index, e.g. for an
+  # empty openarray, or when the id is out of range for all buckets.
   if bucketPos < buckets.len:
     let bucket = buckets[bucketPos]
     if bucket.istart <= id and id <= bucket.iend:
       result = bucket
 
-  # TODO: Is this really an error that should occur? Feels a lot like a work-
-  # around to another problem. Set to Defect for now.
-  if result.isNil:
-    raise (ref Defect)(msg: "No bucket found for node with id " & $id)
-
 proc computeSharedPrefixBits(nodes: openarray[NodeId]): int =
   ## Count the number of prefix bits shared by all nodes.
   if nodes.len < 2:
     return ID_SIZE
 
@@ -199,7 +196,9 @@ proc splitBucket(r: var RoutingTable, index: int) =
   r.buckets.insert(b, index + 1)
 
 proc bucketForNode(r: RoutingTable, id: NodeId): KBucket =
-  binaryGetBucketForNode(r.buckets, id)
+  result = binaryGetBucketForNode(r.buckets, id)
+  doAssert(not result.isNil(),
+    "Routing table should always cover the full id space")
 
 proc removeNode*(r: var RoutingTable, n: Node) =
   ## Remove the node `n` from the routing table.
@@ -294,7 +293,7 @@ proc neighboursAtDistance*(r: RoutingTable, distance: uint32,
 proc len*(r: RoutingTable): int =
   for b in r.buckets: result += b.len
 
-proc moveRight[T](arr: var openarray[T], a, b: int) {.inline.} =
+proc moveRight[T](arr: var openarray[T], a, b: int) =
   ## In `arr` move elements in range [a, b] right by 1.
   var t: T
   shallowCopy(t, arr[b + 1])
@@ -304,19 +303,22 @@ proc moveRight[T](arr: var openarray[T], a, b: int) =
 
 proc setJustSeen*(r: RoutingTable, n: Node) =
   ## Move `n` to the head (most recently seen) of its bucket.
+  ## If `n` is not in the routing table, do nothing.
   let b = r.bucketForNode(n.id)
   let idx = b.nodes.find(n)
-  # TODO: This assert might be troublesome if we start using it for every
-  # message response & then ping a node that is not in our table (e.g. in tests)
-  doAssert(idx >= 0)
-  if idx != 0:
-    b.nodes.moveRight(0, idx - 1)
-  b.lastUpdated = epochTime()
+  if idx >= 0:
+    if idx != 0:
+      b.nodes.moveRight(0, idx - 1)
+    b.lastUpdated = epochTime()
 
 proc nodeToRevalidate*(r: RoutingTable): Node =
+  ## Return a node to revalidate. The least recently seen node from a random
+  ## bucket is selected.
   var buckets = r.buckets
   shuffle(buckets)
-  # TODO: Should we prioritize less-recently-updated buckets instead?
+  # TODO: Should we prioritize less-recently-updated buckets instead? Could use
+  # `lastUpdated` for this, but it would probably make more sense to only update
+  # that value on revalidation, and rename it to `lastValidated`.
   for b in buckets:
     if b.len > 0:
       return b.nodes[^1]
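Patch 7 turns the "bucket not found" case into a nil return, checked with a doAssert in the single internal caller, `bucketForNode`. The underlying lookup pattern, `lowerBound` over the ordered bucket end points plus an explicit range check, can be shown standalone with integer ranges (illustrative only; plain tuples stand in for KBucket and the cmp closure mirrors the one in the patch):

  import std/algorithm

  type BucketRange = tuple[istart, iend: int]

  proc getBucketIdx(buckets: openArray[BucketRange], id: int): int =
    ## Returns the index of the bucket covering `id`, or -1 if none does.
    let pos = buckets.lowerBound(id,
      proc(a: BucketRange, b: int): int = cmp(a.iend, b))
    if pos < buckets.len and buckets[pos].istart <= id and id <= buckets[pos].iend:
      return pos
    return -1

  let buckets = [(istart: 0, iend: 127), (istart: 128, iend: 255)]
  doAssert buckets.getBucketIdx(200) == 1
  doAssert buckets.getBucketIdx(300) == -1  # no Defect raised: the caller decides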