From c646b9c2b909ace798347afb3b91cb3c858b5bc1 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 12 Sep 2022 15:45:49 -0600 Subject: [PATCH 1/5] providers LRU - avoid unchecked growth --- libp2pdht/private/eth/p2p/discoveryv5/lru.nim | 14 +- .../private/eth/p2p/discoveryv5/protocol.nim | 142 +++++++++--------- 2 files changed, 83 insertions(+), 73 deletions(-) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/lru.nim b/libp2pdht/private/eth/p2p/discoveryv5/lru.nim index d4ffb41..3d6bdad 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/lru.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/lru.nim @@ -2,10 +2,12 @@ import std/[tables, lists, options] {.push raises: [Defect].} +export tables, lists, options + type LRUCache*[K, V] = object of RootObj list: DoublyLinkedList[(K, V)] # Head is MRU k:v and tail is LRU k:v - table: Table[K, DoublyLinkedNode[(K, V)]] # DoublyLinkedNode is alraedy ref + table: Table[K, DoublyLinkedNode[(K, V)]] # DoublyLinkedNode is already ref capacity: int func init*[K, V](T: type LRUCache[K, V], capacity: int): LRUCache[K, V] = @@ -39,3 +41,13 @@ func del*[K, V](lru: var LRUCache[K, V], key: K) = func len*[K, V](lru: LRUCache[K, V]): int = lru.table.len + +proc contains*[K, V](lru: LRUCache[K, V], k: K): bool = + k in lru.table + +iterator items*[K, V](lru: LRUCache[K, V]): V = + ## This will not increment LRU/MRU access stats + ## + + for item in lru.list: + yield item[1] diff --git a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim index c26dae6..178c25b 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim @@ -79,7 +79,7 @@ import stew/[base64, endians2, results], chronicles, chronicles/chronos_tools, chronos, chronos/timer, stint, bearssl, metrics, libp2p/[crypto/crypto, routing_record], - "."/[transport, messages, messages_encoding, node, routing_table, spr, random2, ip_vote, nodes_verification] + "."/[transport, messages, messages_encoding, node, routing_table, spr, random2, ip_vote, nodes_verification, lru] import nimcrypto except toHex @@ -98,20 +98,22 @@ logScope: topics = "discv5" const - alpha = 3 ## Kademlia concurrency factor - lookupRequestLimit = 3 ## Amount of distances requested in a single Findnode + Alpha = 3 ## Kademlia concurrency factor + LookupRequestLimit = 3 ## Amount of distances requested in a single Findnode ## message for a lookup or query - findNodeResultLimit = 16 ## Maximum amount of SPRs in the total Nodes messages + FindNodeResultLimit = 16 ## Maximum amount of SPRs in the total Nodes messages ## that will be processed - maxNodesPerMessage = 3 ## Maximum amount of SPRs per individual Nodes message - refreshInterval = 5.minutes ## Interval of launching a random query to + MaxNodesPerMessage = 3 ## Maximum amount of SPRs per individual Nodes message + RefreshInterval = 5.minutes ## Interval of launching a random query to ## refresh the routing table. - revalidateMax = 10000 ## Revalidation of a peer is done between 0 and this + RevalidateMax = 10000 ## Revalidation of a peer is done between 0 and this ## value in milliseconds - ipMajorityInterval = 5.minutes ## Interval for checking the latest IP:Port + IpMajorityInterval = 5.minutes ## Interval for checking the latest IP:Port ## majority and updating this when SPR auto update is set. 
- initialLookups = 1 ## Amount of lookups done when populating the routing table - responseTimeout* = 4.seconds ## timeout for the response of a request-response + InitialLookups = 1 ## Amount of lookups done when populating the routing table + ResponseTimeout* = 4.seconds ## timeout for the response of a request-response + MaxProvidersEntries* = 1_000_000 # one million records + MaxProvidersPerEntry* = 20 # providers per entry ## call type @@ -119,6 +121,9 @@ type tableIpLimits*: TableIpLimits bitsPerHop*: int + ProvidersCache = LRUCache[PeerId, SignedPeerRecord] + ItemsCache = LRUCache[NodeId, ProvidersCache] + Protocol* = ref object localNode*: Node privateKey: PrivateKey @@ -135,7 +140,7 @@ type talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of # overkill here, use sequence rng*: ref BrHmacDrbgContext - providers: Table[NodeId, seq[SignedPeerRecord]] + providers: ItemsCache TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte] {.gcsafe, raises: [Defect].} @@ -267,11 +272,11 @@ proc sendNodes(d: Protocol, toId: NodeId, toAddr: Address, reqId: RequestId, # TODO: Do the total calculation based on the max UDP packet size we want to # send and the SPR size of all (max 16) nodes. # Which UDP packet size to take? 1280? 576? - message.total = ceil(nodes.len / maxNodesPerMessage).uint32 + message.total = ceil(nodes.len / MaxNodesPerMessage).uint32 for i in 0 ..< nodes.len: message.sprs.add(nodes[i].record) - if message.sprs.len == maxNodesPerMessage: + if message.sprs.len == MaxNodesPerMessage: d.sendNodes(toId, toAddr, message, reqId) message.sprs.setLen(0) @@ -329,23 +334,36 @@ proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address, d.sendResponse(fromId, fromAddr, talkresp, reqId) proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) = - trace "adding provider to local db", cid=cId, spr=prov.data - p.providers.mgetOrPut(cId, @[]).add(prov) + trace "adding provider to local db", n=p.localNode, cId, prov -proc handleAddProvider(d: Protocol, fromId: NodeId, fromAddr: Address, - addProvider: AddProviderMessage, reqId: RequestId) = + var providers = + if p.providers.get(cId).isNone: + ProvidersCache.init(MaxProvidersPerEntry) + else: + p.providers.get(cId).get() + + providers.put(prov.data.peerId, prov) + p.providers.put(cId, providers) + +proc handleAddProvider( + d: Protocol, + fromId: NodeId, + fromAddr: Address, + addProvider: AddProviderMessage, + reqId: RequestId) = d.addProviderLocal(addProvider.cId, addProvider.prov) proc handleGetProviders(d: Protocol, fromId: NodeId, fromAddr: Address, getProviders: GetProvidersMessage, reqId: RequestId) = #TODO: add checks, add signed version - let provs = d.providers.getOrDefault(getProviders.cId) - trace "providers:", prov=provs.mapIt(it.data) + let provs = d.providers.get(getProviders.cId) + if provs.isSome: + trace "providers:", provs - ##TODO: handle multiple messages - let response = ProvidersMessage(total: 1, provs: provs) - d.sendResponse(fromId, fromAddr, response, reqId) + ##TODO: handle multiple messages + let response = ProvidersMessage(total: 1, provs: toSeq(provs.get())) + d.sendResponse(fromId, fromAddr, response, reqId) proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, message: Message) = @@ -406,7 +424,7 @@ proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId): result = newFuture[Option[Message]]("waitMessage") let res = result let key = (fromNode.id, reqId) - 
sleepAsync(responseTimeout).addCallback() do(data: pointer): + sleepAsync(ResponseTimeout).addCallback() do(data: pointer): d.awaitedMessages.del(key) if not res.finished: res.complete(none(Message)) @@ -422,12 +440,12 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId): ## Same counts for out of order receival. var op = await d.waitMessage(fromNode, reqId) if op.isSome: - if op.get.kind == nodes: + if op.get.kind == MessageKind.nodes: var res = op.get.nodes.sprs let total = op.get.nodes.total for i in 1 ..< total: op = await d.waitMessage(fromNode, reqId) - if op.isSome and op.get.kind == nodes: + if op.isSome and op.get.kind == MessageKind.nodes: res.add(op.get.nodes.sprs) else: # No error on this as we received some nodes. @@ -498,7 +516,7 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]): let nodes = await d.waitNodes(toNode, reqId) if nodes.isOk: - let res = verifyNodesRecords(nodes.get(), toNode, findNodeResultLimit, distances) + let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit, distances) d.routingTable.setJustSeen(toNode) return ok(res) else: @@ -515,7 +533,7 @@ proc findNodeFast*(d: Protocol, toNode: Node, target: NodeId): let nodes = await d.waitNodes(toNode, reqId) if nodes.isOk: - let res = verifyNodesRecords(nodes.get(), toNode, findNodeResultLimit) + let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit) d.routingTable.setJustSeen(toNode) return ok(res) else: @@ -550,7 +568,7 @@ proc lookupDistances*(target, dest: NodeId): seq[uint16] = let tdAsInt = int(td) result.add(td) var i = 1 - while result.len < lookupRequestLimit: + while result.len < LookupRequestLimit: if tdAsInt + i < 256: result.add(td + uint16(i)) if tdAsInt - i > 0: @@ -561,7 +579,7 @@ proc lookupWorker(d: Protocol, destNode: Node, target: NodeId): Future[seq[Node]] {.async.} = let dists = lookupDistances(target, destNode.id) - # Instead of doing max `lookupRequestLimit` findNode requests, make use + # Instead of doing max `LookupRequestLimit` findNode requests, make use # of the discv5.1 functionality to request nodes for multiple distances. let r = await d.findNode(destNode, dists) if r.isOk: @@ -597,13 +615,13 @@ proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]] for node in closestNodes: seen.incl(node.id) - var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha) + var pendingQueries = newSeqOfCap[Future[seq[Node]]](Alpha) while true: var i = 0 - # Doing `alpha` amount of requests at once as long as closer non queried + # Doing `Alpha` amount of requests at once as long as closer non queried # nodes are discovered. 
- while i < closestNodes.len and pendingQueries.len < alpha: + while i < closestNodes.len and pendingQueries.len < Alpha: let n = closestNodes[i] if not asked.containsOrIncl(n.id): if fast: @@ -693,7 +711,7 @@ proc getProvidersLocal*( ): seq[SignedPeerRecord] {.raises: [KeyError,Defect].} = return - if (cId in d.providers): d.providers[cId] + if (cId in d.providers): toSeq(d.providers.get(cId).get()) else: @[] proc getProviders*( @@ -752,11 +770,11 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]] for node in queryBuffer: seen.incl(node.id) - var pendingQueries = newSeqOfCap[Future[seq[Node]]](alpha) + var pendingQueries = newSeqOfCap[Future[seq[Node]]](Alpha) while true: var i = 0 - while i < min(queryBuffer.len, k) and pendingQueries.len < alpha: + while i < min(queryBuffer.len, k) and pendingQueries.len < Alpha: let n = queryBuffer[i] if not asked.containsOrIncl(n.id): pendingQueries.add(d.lookupWorker(n, target)) @@ -847,8 +865,8 @@ proc populateTable*(d: Protocol) {.async.} = let selfQuery = await d.query(d.localNode.id) trace "Discovered nodes in self target query", nodes = selfQuery.len - # `initialLookups` random queries - for i in 0.. (d.lastLookup + refreshInterval): + if currentTime > (d.lastLookup + RefreshInterval): let randomQuery = await d.queryRandom() trace "Discovered nodes in random target query", nodes = randomQuery.len debug "Total nodes in discv5 routing table", total = d.routingTable.len() - await sleepAsync(refreshInterval) + await sleepAsync(RefreshInterval) except CancelledError: trace "refreshLoop canceled" @@ -944,7 +962,7 @@ proc ipMajorityLoop(d: Protocol) {.async.} = debug "Discovered external address matches current address", majority, current = d.localNode.address - await sleepAsync(ipMajorityInterval) + await sleepAsync(IpMajorityInterval) except CancelledError: trace "ipMajorityLoop canceled" @@ -1009,42 +1027,22 @@ proc newProtocol*( # TODO Consider whether this should be a Defect doAssert rng != nil, "RNG initialization failed" + let + routingTable = RoutingTable.init( + node, + config.bitsPerHop, + config.tableIpLimits, + rng) + result = Protocol( privateKey: privKey, localNode: node, bootstrapRecords: @bootstrapRecords, ipVote: IpVote.init(), enrAutoUpdate: enrAutoUpdate, - routingTable: RoutingTable.init( - node, config.bitsPerHop, config.tableIpLimits, rng), - rng: rng) - - result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng) - -proc newProtocol*( - privKey: PrivateKey, - bindPort: Port, - record: SignedPeerRecord, - bootstrapRecords: openArray[SignedPeerRecord] = [], - bindIp = IPv4_any(), - config = defaultDiscoveryConfig, - rng = newRng()): - Protocol = - info "Discovery SPR initialized", seqNum = record.seqNum, uri = toURI(record) - let node = newNode(record).expect("Properly initialized record") - - # TODO Consider whether this should be a Defect - doAssert rng != nil, "RNG initialization failed" - - result = Protocol( - privateKey: privKey, - localNode: node, - bootstrapRecords: @bootstrapRecords, - ipVote: IpVote.init(), - enrAutoUpdate: false, #TODO this should be removed from nim-libp2p-dht - routingTable: RoutingTable.init( - node, config.bitsPerHop, config.tableIpLimits, rng), - rng: rng) + routingTable: routingTable, + rng: rng, + providers: ItemsCache.init(MaxProvidersEntries)) result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng) From fca255b2f733eece9b3e7f0ddb11bf753425f58f Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 12 Sep 2022 15:46:24 -0600 
Subject: [PATCH 2/5] use `pure` enums --- .../private/eth/p2p/discoveryv5/messages.nim | 22 +++++++++---------- tests/discv5/test_discoveryv5_encoding.nim | 4 ++-- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/messages.nim b/libp2pdht/private/eth/p2p/discoveryv5/messages.nim index 50114e8..a27cedc 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/messages.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/messages.nim @@ -22,7 +22,7 @@ import export providers_messages type - MessageKind* = enum + MessageKind* {.pure.} = enum # TODO This is needed only to make Nim 1.2.6 happy # Without it, the `MessageKind` type cannot be used as # a discriminator in case objects. @@ -116,16 +116,16 @@ type discard template messageKind*(T: typedesc[SomeMessage]): MessageKind = - when T is PingMessage: ping - elif T is PongMessage: pong - elif T is FindNodeMessage: findNode - elif T is FindNodeFastMessage: findNodeFast - elif T is NodesMessage: nodes - elif T is TalkReqMessage: talkReq - elif T is TalkRespMessage: talkResp - elif T is AddProviderMessage: addProvider - elif T is GetProvidersMessage: getProviders - elif T is ProvidersMessage: providers + when T is PingMessage: MessageKind.ping + elif T is PongMessage: MessageKind.pong + elif T is FindNodeMessage: MessageKind.findNode + elif T is FindNodeFastMessage: MessageKind.findNodeFast + elif T is NodesMessage: MessageKind.nodes + elif T is TalkReqMessage: MessageKind.talkReq + elif T is TalkRespMessage: MessageKind.talkResp + elif T is AddProviderMessage: MessageKind.addProvider + elif T is GetProvidersMessage: MessageKind.getProviders + elif T is ProvidersMessage: MessageKind.providers proc hash*(reqId: RequestId): Hash = hash(reqId.id) diff --git a/tests/discv5/test_discoveryv5_encoding.nim b/tests/discv5/test_discoveryv5_encoding.nim index 863dd66..22cf991 100644 --- a/tests/discv5/test_discoveryv5_encoding.nim +++ b/tests/discv5/test_discoveryv5_encoding.nim @@ -89,7 +89,7 @@ suite "Discovery v5.1 Protocol Message Encodings": let message = decoded.get() check: message.reqId == reqId - message.kind == nodes + message.kind == MessageKind.nodes message.nodes.total == total message.nodes.sprs.len() == 0 @@ -111,7 +111,7 @@ suite "Discovery v5.1 Protocol Message Encodings": let message = decoded.get() check: message.reqId == reqId - message.kind == nodes + message.kind == MessageKind.nodes message.nodes.total == total message.nodes.sprs.len() == 2 message.nodes.sprs[0] == s1 From 23e20a2f1c2ce8c08236333cf41dd41d98b1fd2a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 12 Sep 2022 18:41:44 -0600 Subject: [PATCH 3/5] bad merge --- .../private/eth/p2p/discoveryv5/protocol.nim | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim index 178c25b..ae8cf81 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim @@ -1046,6 +1046,33 @@ proc newProtocol*( result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng) +proc newProtocol*( + privKey: PrivateKey, + bindPort: Port, + record: SignedPeerRecord, + bootstrapRecords: openArray[SignedPeerRecord] = [], + bindIp = IPv4_any(), + config = defaultDiscoveryConfig, + rng = newRng()): + Protocol = + info "Discovery SPR initialized", seqNum = record.seqNum, uri = toURI(record) + let node = newNode(record).expect("Properly initialized record") + + # TODO 
Consider whether this should be a Defect + doAssert rng != nil, "RNG initialization failed" + + result = Protocol( + privateKey: privKey, + localNode: node, + bootstrapRecords: @bootstrapRecords, + ipVote: IpVote.init(), + enrAutoUpdate: false, #TODO this should be removed from nim-libp2p-dht + routingTable: RoutingTable.init( + node, config.bitsPerHop, config.tableIpLimits, rng), + rng: rng) + + result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng) + proc open*(d: Protocol) {.raises: [Defect, CatchableError].} = info "Starting discovery node", node = d.localNode From f84bc647cef66684cc9b2e2d997cbbc6b1aa6e7f Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 12 Sep 2022 21:08:26 -0600 Subject: [PATCH 4/5] don't touch the cache when checking for id --- libp2pdht/private/eth/p2p/discoveryv5/protocol.nim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim index ae8cf81..a9bdedf 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim @@ -337,7 +337,7 @@ proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) = trace "adding provider to local db", n=p.localNode, cId, prov var providers = - if p.providers.get(cId).isNone: + if cId notin p.providers: ProvidersCache.init(MaxProvidersPerEntry) else: p.providers.get(cId).get() From f5afe784c5b6c6850f827fad30a0ba1898e04cbf Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Mon, 12 Sep 2022 21:08:55 -0600 Subject: [PATCH 5/5] prevent npe when table is 0 --- libp2pdht/private/eth/p2p/discoveryv5/lru.nim | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/lru.nim b/libp2pdht/private/eth/p2p/discoveryv5/lru.nim index 3d6bdad..6fbfdf6 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/lru.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/lru.nim @@ -11,6 +11,7 @@ type capacity: int func init*[K, V](T: type LRUCache[K, V], capacity: int): LRUCache[K, V] = + doAssert capacity > 0, "Capacity should be greater than 0!" LRUCache[K, V](capacity: capacity) # Table and list init is done default func get*[K, V](lru: var LRUCache[K, V], key: K): Option[V] = @@ -27,7 +28,7 @@ func put*[K, V](lru: var LRUCache[K, V], key: K, value: V) = if not node.isNil: lru.list.remove(node) else: - if lru.table.len >= lru.capacity: + if lru.len > 0 and lru.table.len >= lru.capacity: lru.table.del(lru.list.tail.value[0]) lru.list.remove(lru.list.tail) @@ -43,10 +44,13 @@ func len*[K, V](lru: LRUCache[K, V]): int = lru.table.len proc contains*[K, V](lru: LRUCache[K, V], k: K): bool = + ## Check for cached item - this doesn't touch the cache + ## + k in lru.table iterator items*[K, V](lru: LRUCache[K, V]): V = - ## This will not increment LRU/MRU access stats + ## Get cached items - this doesn't touch the cache ## for item in lru.list:
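
---

For reference, below is a minimal usage sketch of the bounded providers cache introduced in PATCH 1/5 and hardened in PATCH 5/5. It is not part of the patches: the import path matches the repository layout above, but the constants, the `string`/`int` key and value types, and the `addProvider` helper are illustrative stand-ins for `MaxProvidersEntries`, `MaxProvidersPerEntry`, `NodeId`, `PeerId`, `SignedPeerRecord`, and `addProviderLocal`.

# Sketch only: exercises the LRUCache API added in these patches with
# simplified types; not the real DHT provider records.
import std/[options, sequtils]
import libp2pdht/private/eth/p2p/discoveryv5/lru

const
  MaxEntries = 4    # stand-in for MaxProvidersEntries
  MaxPerEntry = 2   # stand-in for MaxProvidersPerEntry

type
  ProvidersCache = LRUCache[string, int]            # providers kept per content id
  ItemsCache = LRUCache[string, ProvidersCache]     # content id -> bounded provider set

proc addProvider(cache: var ItemsCache, cId: string, prov: int) =
  # Mirrors addProviderLocal: fetch (or create) the per-id cache, add the
  # provider, then write the per-id cache back so its recency is updated.
  var providers =
    if cId notin cache: ProvidersCache.init(MaxPerEntry)
    else: cache.get(cId).get()
  providers.put($prov, prov)
  cache.put(cId, providers)

when isMainModule:
  var cache = ItemsCache.init(MaxEntries)
  for i in 0 ..< 10:
    cache.addProvider("cid-" & $(i mod 5), i)
  # Both dimensions stay bounded: at most MaxEntries content ids are kept and
  # each holds at most MaxPerEntry providers; least-recently-used ids are evicted.
  assert cache.len <= MaxEntries
  if "cid-4" in cache:
    assert toSeq(cache.get("cid-4").get()).len <= MaxPerEntry

The nesting is the point of the series: the outer cache caps how many content ids are tracked and the inner caches cap providers per id, so the local provider store can no longer grow without bound while lookups and eviction stay O(1).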