From ba19465892ac15e0892469738178b4b961cf1a2b Mon Sep 17 00:00:00 2001 From: kdeme Date: Tue, 23 Jun 2020 16:11:58 +0200 Subject: [PATCH] Functional replacement cache --- eth/p2p/discoveryv5/protocol.nim | 2 +- eth/p2p/discoveryv5/routing_table.nim | 65 +++++++++++++++----- tests/p2p/test_routing_table.nim | 86 ++++++++++++++++++++++++++- 3 files changed, 134 insertions(+), 19 deletions(-) diff --git a/eth/p2p/discoveryv5/protocol.nim b/eth/p2p/discoveryv5/protocol.nim index 0689e33..4228b98 100644 --- a/eth/p2p/discoveryv5/protocol.nim +++ b/eth/p2p/discoveryv5/protocol.nim @@ -666,7 +666,7 @@ proc revalidateNode*(d: Protocol, n: Node) # peers in the DHT if n.record notin d.bootstrapRecords: trace "Revalidation of node failed, removing node", record = n.record - d.routingTable.removeNode(n) + d.routingTable.replaceNode(n) # Remove shared secrets when removing the node from routing table. # This might be to direct, so we could keep these longer. But better # would be to simply not remove the nodes immediatly but only after x diff --git a/eth/p2p/discoveryv5/routing_table.nim b/eth/p2p/discoveryv5/routing_table.nim index 3329a12..d6a46eb 100644 --- a/eth/p2p/discoveryv5/routing_table.nim +++ b/eth/p2p/discoveryv5/routing_table.nim @@ -3,6 +3,8 @@ import stint, chronicles, node +export options + {.push raises: [Defect].} type @@ -28,7 +30,8 @@ type ## Here "seen" means a successful request-response. This can also not have ## occured yet. replacementCache: seq[Node] ## Nodes that could not be added to the `nodes` - ## seq as it is full and without stale nodes. + ## seq as it is full and without stale nodes. This is practically a small + ## LRU cache. lastUpdated: float ## epochTime of last update to the KBucket # TODO: Should this update be about changes made only in `nodes`? @@ -87,12 +90,9 @@ proc add(k: KBucket, n: Node): Node = ## it is inserted as the last entry of the bucket (least recently seen node), ## and nil is returned. ## - ## If the bucket is full, the node is added to the bucket's replacement cache - ## and the node at the last entry of the bucket (least recently seen), which - ## should be evicted if it fails to respond to a ping, is returned. - ## - ## If the replacement cache is also full, the node at the last entry of the - ## bucket is returned. The new node is nowhere stored and thus lost. + ## If the bucket is full, the node at the last entry of the bucket (least + ## recently seen), which should be evicted if it fails to respond to a ping, + ## is returned. ## ## Reasoning here is that adding nodes will happen for a big part from ## lookups, which do not necessarily return nodes that are (still) reachable. @@ -110,12 +110,25 @@ proc add(k: KBucket, n: Node): Node = elif k.len < BUCKET_SIZE: k.nodes.add(n) return nil - elif k.replacementCache.len < REPLACEMENT_CACHE_SIZE: - k.replacementCache.add(n) - return k.tail else: return k.tail +proc addReplacement(k: KBucket, n: Node) = + ## Add the node to the tail of the replacement cache of the KBucket. + ## + ## If the replacement cache is full, the oldest (first entry) node will be + ## removed. If the node is already in the replacement cache, it will be moved + ## to the tail. + let nodeIdx = k.replacementCache.find(n) + if nodeIdx != -1: + k.replacementCache.delete(nodeIdx) + k.replacementCache.add(n) + else: + doAssert(k.replacementCache.len <= REPLACEMENT_CACHE_SIZE) + if k.replacementCache.len == REPLACEMENT_CACHE_SIZE: + k.replacementCache.delete(0) + k.replacementCache.add(n) + proc removeNode(k: KBucket, n: Node) = let i = k.nodes.find(n) if i != -1: k.nodes.delete(i) @@ -189,9 +202,16 @@ proc bucketForNode(r: RoutingTable, id: NodeId): KBucket = binaryGetBucketForNode(r.buckets, id) proc removeNode*(r: var RoutingTable, n: Node) = + ## Remove the node `n` from the routing table. r.bucketForNode(n.id).removeNode(n) proc addNode*(r: var RoutingTable, n: Node): Node = + ## Try to add the node to the routing table. + ## + ## First, an attempt will be done to add the node to the bucket in its range. + ## If this fails, the bucket will be split if it is eligable for splitting. + ## If so, a new attempt will be done to add the node. If not, the node will be + ## added to the replacement cache. if n == r.thisNode: # warn "Trying to add ourselves to the routing table", node = n return @@ -204,15 +224,30 @@ proc addNode*(r: var RoutingTable, n: Node): Node = # Calculate the prefix shared by all nodes in the bucket's range, not the # ones actually in the bucket. let depth = computeSharedPrefixBits(@[bucket.istart, bucket.iend]) - # TODO: Shouldn't the adding to replacement cache be done only if the bucket - # doesn't get split? if bucket.inRange(r.thisNode) or (depth mod r.bitsPerHop != 0 and depth != ID_SIZE): r.splitBucket(r.buckets.find(bucket)) - return r.addNode(n) # retry + return r.addNode(n) # retry adding + else: + # When bucket doesn't get split the node is added to the replacement cache + bucket.addReplacement(n) - # Nothing added, ping evictionCandidate - return evictionCandidate + # Nothing added, return evictionCandidate + return evictionCandidate + +proc replaceNode*(r: var RoutingTable, n: Node) = + ## Replace node `n` with last entry in the replacement cache. If there are + ## no entries in the replacement cache, node `n` will simply be removed. + # TODO: Kademlia paper recommends here to not remove nodes if there are no + # replacements. However, that would require a bit more complexity in the + # revalidation as you don't want to try pinging that node all the time. + let b = r.bucketForNode(n.id) + let idx = b.nodes.find(n) + if idx != -1: + b.nodes.delete(idx) + if b.replacementCache.len > 0: + b.nodes.add(b.replacementCache[high(b.replacementCache)]) + b.replacementCache.delete(high(b.replacementCache)) proc getNode*(r: RoutingTable, id: NodeId): Option[Node] = let b = r.bucketForNode(id) diff --git a/tests/p2p/test_routing_table.nim b/tests/p2p/test_routing_table.nim index d9a6ea0..fc320d1 100644 --- a/tests/p2p/test_routing_table.nim +++ b/tests/p2p/test_routing_table.nim @@ -1,6 +1,6 @@ import - unittest, stew/shims/net, stint, - eth/keys, eth/p2p/discoveryv5/[routing_table, node], + unittest, + eth/p2p/discoveryv5/[routing_table, node], ./discv5_test_helper suite "Routing Table Tests": @@ -53,7 +53,7 @@ suite "Routing Table Tests": # Add 16 more nodes with only 1 bit shared prefix with previous 16. This # should cause the initial bucket to split and and fill the second bucket # with the 16 new entries. - for n in 0..<16: + for n in 0.. Split only the branch in range of own id + table.init(node, 1) + + # create a full bucket + let bucketNodes = node.nodesAtDistance(256, BUCKET_SIZE) + for n in bucketNodes: + check table.addNode(n) == nil + + # create a full replacement cache + let replacementNodes = node.nodesAtDistance(256, REPLACEMENT_CACHE_SIZE) + for n in replacementNodes: + check table.addNode(n) != nil + + # Add one more node to replacement (would drop first one) + let lastNode = node.nodeAtDistance(256) + check table.addNode(lastNode) != nil + + # This should replace the last node in the bucket, with the last one of + # the replacement cache. + table.replaceNode(table.nodeToRevalidate()) + block: + # Should return the last node of the replacement cache successfully. + let result = table.getNode(lastNode.id) + check: + result.isSome() + result.get() == lastNode + block: + # This node should be removed + check (table.getNode(bucketNodes[bucketNodes.high].id)).isNone() + + test "Empty replacement cache": + let node = generateNode() + var table: RoutingTable + + # bitsPerHop = 1 -> Split only the branch in range of own id + table.init(node, 1) + + # create a full bucket TODO: no need to store bucketNodes + let bucketNodes = node.nodesAtDistance(256, BUCKET_SIZE) + for n in bucketNodes: + check table.addNode(n) == nil + + table.replaceNode(table.nodeToRevalidate()) + # This node should still be removed + check (table.getNode(bucketNodes[bucketNodes.high].id)).isNone() + + test "Double replacement": + let node = generateNode() + var table: RoutingTable + + # bitsPerHop = 1 -> Split only the branch in range of own id + table.init(node, 1) + + # create a full bucket + let bucketNodes = node.nodesAtDistance(256, BUCKET_SIZE) + for n in bucketNodes: + check table.addNode(n) == nil + + # create a full replacement cache + let replacementNodes = node.nodesAtDistance(256, REPLACEMENT_CACHE_SIZE) + for n in replacementNodes: + check table.addNode(n) != nil + + check table.addNode(replacementNodes[0]) != nil + + table.replaceNode(table.nodeToRevalidate()) + block: + # Should return the last node of the replacement cache successfully. + let result = table.getNode(replacementNodes[0].id) + check: + result.isSome() + result.get() == replacementNodes[0] + block: + # This node should be removed + check (table.getNode(bucketNodes[bucketNodes.high].id)).isNone()