From 69ae7c2012d5ae89eab5ed3d7813daedba4018d9 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 29 Sep 2022 19:49:55 -0400 Subject: [PATCH] Persist providers (#49) * initial providers manager implementation * misc license update * spelling * rename provider tests * add provider tests * reexport provider manager * only update provider record if it changed * small bug in getProvByKey * pass providers to both constructors * fix records retrieval * disable cache during tests * avoid redefining data * adding back tests with cache * use `.errorOption` * proper err msg unpacking --- libp2pdht.nimble | 4 +- .../private/eth/p2p/discoveryv5/encoding.nim | 7 +- .../private/eth/p2p/discoveryv5/ip_vote.nim | 2 +- .../private/eth/p2p/discoveryv5/messages.nim | 4 +- .../eth/p2p/discoveryv5/messages_encoding.nim | 2 +- .../private/eth/p2p/discoveryv5/node.nim | 4 +- .../private/eth/p2p/discoveryv5/protocol.nim | 90 ++-- .../eth/p2p/discoveryv5/providersmngr.nim | 387 ++++++++++++++++++ .../eth/p2p/discoveryv5/routing_table.nim | 4 +- .../private/eth/p2p/discoveryv5/sessions.nim | 4 +- .../private/eth/p2p/discoveryv5/transport.nim | 2 +- tests/dht/test_providermngr.nim | 185 +++++++++ tests/dht/test_providers.nim | 4 +- tests/discv5/test_discoveryv5.nim | 2 +- tests/testAll.nim | 4 +- 15 files changed, 648 insertions(+), 57 deletions(-) create mode 100644 libp2pdht/private/eth/p2p/discoveryv5/providersmngr.nim create mode 100644 tests/dht/test_providermngr.nim diff --git a/libp2pdht.nimble b/libp2pdht.nimble index 54cd671..f098058 100644 --- a/libp2pdht.nimble +++ b/libp2pdht.nimble @@ -19,7 +19,9 @@ requires "nim >= 1.2.0", "secp256k1 >= 0.5.2 & < 0.6.0", "stew#head", "stint", - "asynctest >= 0.3.1 & < 0.4.0" + "asynctest >= 0.3.1 & < 0.4.0", + "https://github.com/status-im/nim-datastore#head", + "questionable" task coverage, "generates code coverage report": var (output, exitCode) = gorgeEx("which lcov") diff --git a/libp2pdht/private/eth/p2p/discoveryv5/encoding.nim b/libp2pdht/private/eth/p2p/discoveryv5/encoding.nim index 8cf2fed..a7bc4b7 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/encoding.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/encoding.nim @@ -1,5 +1,5 @@ -# nim-eth - Node Discovery Protocol v5 -# Copyright (c) 2020-2021 Status Research & Development GmbH +# codex-dht - Codex DHT +# Copyright (c) 2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -336,8 +336,7 @@ proc encodeHandshakePacket*(rng: var BrHmacDrbgContext, c: var Codec, # Add SPR of sequence number is newer if whoareyouData.recordSeq < c.localNode.record.seqNum: let encoded = ? c.localNode.record.encode.mapErr((e: CryptoError) => - ("Failed to encode local node's SignedPeerRecord: " & - $e).cstring) + ("Failed to encode local node's SignedPeerRecord: " & $e).cstring) authdata.add(encoded) let secrets = ? deriveKeys( diff --git a/libp2pdht/private/eth/p2p/discoveryv5/ip_vote.nim b/libp2pdht/private/eth/p2p/discoveryv5/ip_vote.nim index 20e8609..8a57f8f 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/ip_vote.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/ip_vote.nim @@ -1,4 +1,4 @@ -# nim-eth - Node Discovery Protocol v5 +# codex-dht - Codex DHT # Copyright (c) 2021 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). diff --git a/libp2pdht/private/eth/p2p/discoveryv5/messages.nim b/libp2pdht/private/eth/p2p/discoveryv5/messages.nim index a27cedc..9573e08 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/messages.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/messages.nim @@ -1,5 +1,5 @@ -# nim-eth - Node Discovery Protocol v5 -# Copyright (c) 2020-2021 Status Research & Development GmbH +# codex-dht - Codex DHT +# Copyright (c) 2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). diff --git a/libp2pdht/private/eth/p2p/discoveryv5/messages_encoding.nim b/libp2pdht/private/eth/p2p/discoveryv5/messages_encoding.nim index 74ee990..6de6596 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/messages_encoding.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/messages_encoding.nim @@ -1,4 +1,4 @@ -# nim-eth - Node Discovery Protocol v5 +# codex-dht - Codex DHT # Copyright (c) 2020-2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). diff --git a/libp2pdht/private/eth/p2p/discoveryv5/node.nim b/libp2pdht/private/eth/p2p/discoveryv5/node.nim index 0ee7327..8e1b153 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/node.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/node.nim @@ -1,5 +1,5 @@ -# nim-eth - Node Discovery Protocol v5 -# Copyright (c) 2020-2021 Status Research & Development GmbH +# codex-dht - Codex DHT +# Copyright (c) 2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). diff --git a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim index 1e03779..862dccd 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim @@ -1,5 +1,5 @@ -# nim-eth - Node Discovery Protocol v5 -# Copyright (c) 2020-2021 Status Research & Development GmbH +# codex-dht - Codex DHT +# Copyright (c) 2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -79,11 +79,13 @@ import stew/[base64, endians2, results], chronicles, chronicles/chronos_tools, chronos, chronos/timer, stint, bearssl, metrics, libp2p/[crypto/crypto, routing_record], - "."/[transport, messages, messages_encoding, node, routing_table, spr, random2, ip_vote, nodes_verification, lru] + transport, messages, messages_encoding, node, + routing_table, spr, random2, ip_vote, nodes_verification, + providersmngr import nimcrypto except toHex -export options, results, node, spr +export options, results, node, spr, providersmngr declareCounter discovery_message_requests_outgoing, "Discovery protocol outgoing message requests", labels = ["response"] @@ -141,9 +143,6 @@ type tableIpLimits*: TableIpLimits bitsPerHop*: int - ProvidersCache = LRUCache[PeerId, SignedPeerRecord] - ItemsCache = LRUCache[NodeId, ProvidersCache] - Protocol* = ref object localNode*: Node privateKey: PrivateKey @@ -160,7 +159,7 @@ type talkProtocols*: Table[seq[byte], TalkProtocol] # TODO: Table is a bit of # overkill here, use sequence rng*: ref BrHmacDrbgContext - providers: ItemsCache + providers: ProvidersManager TalkProtocolHandler* = proc(p: TalkProtocol, request: seq[byte], fromId: NodeId, fromUdpAddress: Address): seq[byte] {.gcsafe, raises: [Defect].} @@ -258,7 +257,7 @@ proc updateRecord*( newSpr = spr.get() seqNo = d.localNode.record.seqNum - info "Updated discovery SPR", uri = toURI(newSpr) + info "Updated discovery SPR", uri = newSpr.toURI() d.localNode.record = newSpr d.localNode.record.data.seqNo = seqNo @@ -353,17 +352,11 @@ proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address, kind = MessageKind.talkresp d.sendResponse(fromId, fromAddr, talkresp, reqId) -proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) = +proc addProviderLocal(p: Protocol, cId: NodeId, prov: SignedPeerRecord) {.async.} = trace "adding provider to local db", n = p.localNode, cId, prov - var providers = - if cId notin p.providers: - ProvidersCache.init(MaxProvidersPerEntry) - else: - p.providers.get(cId).get() - - providers.put(prov.data.peerId, prov) - p.providers.put(cId, providers) + if (let res = (await p.providers.add(cid, prov)); res.isErr): + trace "Unable to add provider", cid, peerId = prov.data.peerId proc handleAddProvider( d: Protocol, @@ -371,23 +364,26 @@ proc handleAddProvider( fromAddr: Address, addProvider: AddProviderMessage, reqId: RequestId) = - d.addProviderLocal(addProvider.cId, addProvider.prov) + asyncSpawn d.addProviderLocal(addProvider.cId, addProvider.prov) proc handleGetProviders( d: Protocol, fromId: NodeId, fromAddr: Address, getProviders: GetProvidersMessage, - reqId: RequestId) = + reqId: RequestId) {.async.} = #TODO: add checks, add signed version - let provs = d.providers.get(getProviders.cId) - if provs.isSome: - trace "providers:", providers = toSeq(provs.get()) + let + provs = await d.providers.get(getProviders.cId) - ##TODO: handle multiple messages - let response = ProvidersMessage(total: 1, provs: toSeq(provs.get())) - d.sendResponse(fromId, fromAddr, response, reqId) + if provs.isErr: + trace "Unable to get providers", cid = getProviders.cId, err = provs.error.msg + return + + ##TODO: handle multiple messages + let response = ProvidersMessage(total: 1, provs: provs.get) + d.sendResponse(fromId, fromAddr, response, reqId) proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, message: Message) = @@ -410,7 +406,7 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address, d.handleAddProvider(srcId, fromAddr, message.addProvider, message.reqId) of getProviders: discovery_message_requests_incoming.inc() - d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId) + asyncSpawn d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId) of regTopic, topicQuery: discovery_message_requests_incoming.inc() discovery_message_requests_incoming.inc(labelValues = ["no_response"]) @@ -698,7 +694,7 @@ proc addProvider*( if toNode != d.localNode: discard d.sendRequest(toNode, AddProviderMessage(cId: cId, prov: pr)) else: - d.addProviderLocal(cId, pr) + asyncSpawn d.addProviderLocal(cId, pr) return res @@ -732,11 +728,13 @@ proc getProvidersLocal*( d: Protocol, cId: NodeId, maxitems: int = 5, - ): seq[SignedPeerRecord] {.raises: [KeyError,Defect].} = + ): Future[seq[SignedPeerRecord]] {.async.} = - return - if (cId in d.providers): toSeq(d.providers.get(cId).get()) - else: @[] + let provs = await d.providers.get(cId) + if provs.isErr: + trace "Unable to get local providers", cId, err = provs.error.msg + + return provs.get proc getProviders*( d: Protocol, @@ -746,7 +744,7 @@ proc getProviders*( ): Future[DiscResult[seq[SignedPeerRecord]]] {.async.} = # What providers do we know about? - var res = d.getProvidersLocal(cId, maxitems) + var res = await d.getProvidersLocal(cId, maxitems) trace "local providers:", prov = res.mapIt(it) let nodesNearby = await d.lookup(cId) @@ -1014,7 +1012,8 @@ proc newProtocol*( bindIp = IPv4_any(), enrAutoUpdate = false, config = defaultDiscoveryConfig, - rng = newRng()): + rng = newRng(), + providers: ProvidersManager = nil): Protocol = # TODO: Tried adding bindPort = udpPort as parameter but that gave # "Error: internal error: environment misses: udpPort" in nim-beacon-chain. @@ -1058,6 +1057,13 @@ proc newProtocol*( config.tableIpLimits, rng) + providers = + if providers.isNil: + # TODO: There should be a passthrough datastore + ProvidersManager.new(SQLiteDatastore.new(Memory).expect("Should not fail!")) + else: + providers + result = Protocol( privateKey: privKey, localNode: node, @@ -1066,7 +1072,7 @@ proc newProtocol*( enrAutoUpdate: enrAutoUpdate, routingTable: routingTable, rng: rng, - providers: ItemsCache.init(MaxProvidersEntries)) + providers: providers) result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng) @@ -1077,7 +1083,8 @@ proc newProtocol*( bootstrapRecords: openArray[SignedPeerRecord] = [], bindIp = IPv4_any(), config = defaultDiscoveryConfig, - rng = newRng()): + rng = newRng(), + providers: ProvidersManager = nil): Protocol = info "Discovery SPR initialized", seqNum = record.seqNum, uri = toURI(record) let node = newNode(record).expect("Properly initialized record") @@ -1085,6 +1092,14 @@ proc newProtocol*( # TODO Consider whether this should be a Defect doAssert rng != nil, "RNG initialization failed" + let + providers = + if providers.isNil: + # TODO: There should be a passthrough datastore + ProvidersManager.new(SQLiteDatastore.new(Memory).expect("Should not fail!")) + else: + providers + result = Protocol( privateKey: privKey, localNode: node, @@ -1093,7 +1108,8 @@ proc newProtocol*( enrAutoUpdate: false, #TODO this should be removed from nim-libp2p-dht routingTable: RoutingTable.init( node, config.bitsPerHop, config.tableIpLimits, rng), - rng: rng) + rng: rng, + providers: providers) result.transport = newTransport(result, privKey, node, bindPort, bindIp, rng) diff --git a/libp2pdht/private/eth/p2p/discoveryv5/providersmngr.nim b/libp2pdht/private/eth/p2p/discoveryv5/providersmngr.nim new file mode 100644 index 0000000..d1e50d7 --- /dev/null +++ b/libp2pdht/private/eth/p2p/discoveryv5/providersmngr.nim @@ -0,0 +1,387 @@ +# codex-dht - Codex DHT +# Copyright (c) 2022 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +import std/sequtils +import std/strutils + +import pkg/datastore +import pkg/chronos +import pkg/libp2p +import pkg/chronicles +import pkg/stew/results as rs +import pkg/stew/byteutils +import pkg/questionable/results + +{.push raises: [Defect].} + +import ./lru +import ./node + +export node, lru, datastore + +logScope: + topics = "discv5 providers manager" + +const + DefaultProviderTTL = 24.hours + + ProvidersKey* = Key.init("/providers").tryGet # keys is of the form /providers/peerid = provider + CidKey* = Key.init("/cids").tryGet # keys is of the form /cids/cid/peerid/ttl = ttl + + MaxProvidersEntries* = 1000'u # one thousand records + MaxProvidersPerEntry* = 200'u # providers per entry + + ZeroMoment = Moment.init(0, Nanosecond) # for conversion between Duration and Moment + +type + ProvidersCache* = LRUCache[PeerId, SignedPeerRecord] + ItemsCache* = LRUCache[NodeId, ProvidersCache] + + ProvidersManager* = ref object of RootObj + store*: Datastore + providers*: ItemsCache + ttl*: Duration + maxItems*: uint + maxProviders*: uint + disableCache*: bool + +proc mapFailure[T](err: T): ref CatchableError = + newException(CatchableError, $err) + +proc makeProviderKey(peerId: PeerId): ?!Key = + (ProvidersKey / $peerId) + +proc makeCidKey(cid: NodeId, peerId: PeerId): ?!Key = + (CidKey / cid.toHex / $peerId / "ttl") + +proc peerIdFromCidKey(key: string): ?!PeerId = + let + parts = key.split("/") + + if parts.len == 5: + return PeerId.init(parts[3]).mapErr(mapFailure) + + return failure("Unable to extract peer id from key") + +func addCache*( + self: ProvidersManager, + cid: NodeId, + provider: SignedPeerRecord) = + + if self.disableCache: + return + + var providers = + if cid notin self.providers: + ProvidersCache.init(self.maxProviders.int) + else: + self.providers.get(cid).get() + + let + peerId = provider.data.peerId + + trace "Adding provider to cache", cid, peerId + providers.put(peerId, provider) + self.providers.put(cid, providers) + +func getCache*( + self: ProvidersManager, + cid: NodeId, + limit = MaxProvidersPerEntry.int): seq[SignedPeerRecord] = + + if self.disableCache: + return + + if cid in self.providers: + let + recs = self.providers.get(cid).get + providers = toSeq(recs)[0.. ZeroDuration: + ttl + else: + Moment.fromNow(self.ttl) - ZeroMoment + + ttl = expires.microseconds.uint64.toBytesBE + + bytes: seq[byte] = + if existing =? (await self.getProvByKey(provKey)) and + existing.data.seqNo >= provider.data.seqNo: + trace "Provider with same seqNo already exist", seqNo = $provider.data.seqNo + @[] + else: + without bytes =? provider.envelope.encode: + trace "Enable to encode provider" + return failure "Unable to encode provider" + bytes + + if bytes.len > 0: + trace "Adding or updating provider record", cid, peerId + if err =? (await self.store.put(provKey, bytes)).errorOption: + trace "Unable to store provider with key", key = provKey, err = err.msg + + trace "Adding or updating cid", cid, key = cidKey, ttl = expires.minutes + if err =? (await self.store.put(cidKey, @ttl)).errorOption: + trace "Unable to store provider with key", key = cidKey, err = err.msg + return + + self.addCache(cid, provider) + + trace "Provider for cid added", cidKey, provKey + return success() + +proc get*( + self: ProvidersManager, + id: NodeId, + limit = MaxProvidersPerEntry.int): Future[?!seq[SignedPeerRecord]] {.async.} = + trace "Retrieving providers from persistent store", cid = id + + let provs = self.getCache(id, limit) + if provs.len > 0: + return success provs + + without cidKey =? (CidKey / id.toHex), err: + return failure err.msg + + without cidIter =? + (await self.store.query(Query.init(cidKey, limit = limit))), err: + return failure err.msg + + defer: + discard (await cidIter.dispose()) + + trace "Querying providers from persistent store", cid = id, key = cidKey + var + providers: seq[SignedPeerRecord] + + for item in cidIter: + # TODO: =? doesn't support tuples + if pair =? (await item) and pair.key.isSome: + let + (key, val) = (pair.key.get, pair.data) + + without peerId =? key.id.peerIdFromCidKey() and + provKey =? makeProviderKey(peerId), err: + trace "Error creating key from provider record", err = err.msg + continue + + trace "Querying provider key", key = provKey + without data =? (await self.store.get(provKey)): + trace "Error getting provider", key = provKey + continue + + without provider =? self.decode(data), err: + trace "Unable to decode provider from store", err = err.msg + continue + + trace "Retrieved provider with key", key = provKey + providers.add(provider) + self.addCache(id, provider) + + trace "Retrieved providers from persistent store", cid = id, len = providers.len + return success providers + +proc contains*( + self: ProvidersManager, + id: NodeId, + peerId: PeerId): Future[bool] {.async.} = + without key =? makeCidKey(id, peerId), err: + return false + + return (await self.store.contains(key)) |? false + +proc contains*(self: ProvidersManager, peerId: PeerId): Future[bool] {.async.} = + without provKey =? makeProviderKey(peerId), err: + return false + + return (await self.store.contains(provKey)) |? false + +proc contains*(self: ProvidersManager, cid: NodeId): Future[bool] {.async.} = + without cidKey =? (CidKey / $cid), err: + return false + + let + q = Query.init(cidKey, limit = 1) + + without iter =? (await self.store.query(q)), err: + trace "Unable to obtain record for key", key = cidKey + return false + + defer: + trace "Cleaning up query iterator" + discard (await iter.dispose()) + + for item in iter: + if pair =? (await item) and pair.key.isSome: + return true + + return false + +proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} = + if cid in self.providers: + self.providers.del(cid) + + without cidKey =? (CidKey / $cid), err: + return failure(err.msg) + + let + q = Query.init(cidKey) + + without iter =? (await self.store.query(q)), err: + trace "Unable to obtain record for key", key = cidKey + return failure(err.msg) + + block: + defer: + trace "Cleaning up query iterator" + discard (await iter.dispose()) + + for item in iter: + if pair =? (await item) and pair.key.isSome: + let key = pair.key.get() + if err =? (await self.store.delete(key)).errorOption: + trace "Error deleting record from persistent store", err = err.msg + continue + + without peerId =? key.id.peerIdFromCidKey, err: + trace "Unable to parse peer id from key", key + continue + + self.removeCache(cid, peerId) + trace "Deleted record from store", key + + return success() + +proc remove*(self: ProvidersManager, peerId: PeerId): Future[?!void] {.async.} = + without cidKey =? (CidKey / "*" / $peerId), err: + return failure(err.msg) + + let + q = Query.init(cidKey) + + without iter =? (await self.store.query(q)), err: + trace "Unable to obtain record for key", key = cidKey + return failure(err.msg) + + block: + defer: + trace "Cleaning up query iterator" + discard (await iter.dispose()) + + for item in iter: + if pair =? (await item) and pair.key.isSome: + let + key = pair.key.get() + + if err =? (await self.store.delete(key)).errorOption: + trace "Error deleting record from persistent store", err = err.msg + continue + + trace "Deleted record from store", key + + let + parts = key.id.split(datastore.Separator) + + self.removeCache(NodeId.fromHex(parts[2]), peerId) + + without provKey =? makeProviderKey(peerId), err: + return failure(err.msg) + + trace "Removing provider record", key = provKey + return (await self.store.delete(provKey)) + +proc remove*( + self: ProvidersManager, + cid: NodeId, + peerId: PeerId): Future[?!void] {.async.} = + + self.removeCache(cid, peerId) + + without cidKey =? makeCidKey(cid, peerId), err: + trace "Error creating key from content id", err = err.msg + return failure err.msg + + return (await self.store.delete(cidKey)) + +func new*( + T: type ProvidersManager, + store: Datastore, + disableCache = false, + ttl = DefaultProviderTTL, + maxItems = MaxProvidersEntries, + maxProviders = MaxProvidersPerEntry): T = + + var + self = T( + store: store, + ttl: ttl, + maxItems: maxItems, + maxProviders: maxProviders, + disableCache: disableCache) + + if not disableCache: + self.providers = ItemsCache.init(maxItems.int) + + self diff --git a/libp2pdht/private/eth/p2p/discoveryv5/routing_table.nim b/libp2pdht/private/eth/p2p/discoveryv5/routing_table.nim index eb80210..059241e 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/routing_table.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/routing_table.nim @@ -1,5 +1,5 @@ -# nim-eth - Node Discovery Protocol v5 -# Copyright (c) 2020-2021 Status Research & Development GmbH +# codex-dht - Codex DHT +# Copyright (c) 2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). diff --git a/libp2pdht/private/eth/p2p/discoveryv5/sessions.nim b/libp2pdht/private/eth/p2p/discoveryv5/sessions.nim index 5b52d17..46a5b3d 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/sessions.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/sessions.nim @@ -1,5 +1,5 @@ -# nim-eth - Node Discovery Protocol v5 -# Copyright (c) 2020-2021 Status Research & Development GmbH +# codex-dht - Codex DHT +# Copyright (c) 2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). diff --git a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim index 3fd1d84..3d7a1e4 100644 --- a/libp2pdht/private/eth/p2p/discoveryv5/transport.nim +++ b/libp2pdht/private/eth/p2p/discoveryv5/transport.nim @@ -1,4 +1,4 @@ -# Copyright (c) 2020-2021 Status Research & Development GmbH +# Copyright (c) 2022 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). diff --git a/tests/dht/test_providermngr.nim b/tests/dht/test_providermngr.nim new file mode 100644 index 0000000..5cb4aa7 --- /dev/null +++ b/tests/dht/test_providermngr.nim @@ -0,0 +1,185 @@ + +import std/sequtils + +import pkg/chronos +import pkg/asynctest +import pkg/datastore +import pkg/libp2p + +import libp2pdht/dht +import libp2pdht/private/eth/p2p/discoveryv5/spr +import libp2pdht/private/eth/p2p/discoveryv5/providersmngr +import libp2pdht/discv5/node +import ./test_helper + +suite "Test Providers Manager simple": + let + ds = SQLiteDatastore.new(Memory).tryGet() + manager = ProvidersManager.new(ds, disableCache = true) + rng = newRng() + privKey = PrivateKey.example(rng) + provider = privKey.toSignedPeerRecord() + nodeId = NodeId.example(rng) + + teardownAll: + (await ds.close()).tryGet() + + test "Should add provider": + (await manager.add(nodeId, provider)).tryGet + + test "Should get provider": + let + prov = (await manager.get(nodeId)).tryGet + + check prov[0] == provider + + test "Should check provider presence": + check: + (await manager.contains(nodeId)) + (await manager.contains(provider.data.peerId)) + (await manager.contains(nodeId, provider.data.peerId)) + + test "Should update provider with newer seqno": + var + updated = provider + + updated.incSeqNo(privKey).tryGet + (await manager.add(nodeId, updated)).tryGet + let prov = (await manager.get(nodeId)).tryGet + check prov[0] == updated + + test "Should remove single record by NodeId and PeerId": + check: + (await manager.contains(nodeId)) + (await manager.contains(provider.data.peerId)) + + (await (manager.remove(nodeId, provider.data.peerId))).tryGet + + check: + not (await manager.contains(nodeId, provider.data.peerId)) + +suite "Test Providers Manager multiple": + let + rng = newRng() + privKeys = (0..<10).mapIt( PrivateKey.example(rng) ) + providers = privKeys.mapIt( it.toSignedPeerRecord() ) + nodeIds = (0..<100).mapIt( NodeId.example(rng) ) + + var + ds: SQLiteDatastore + manager: ProvidersManager + + setup: + ds = SQLiteDatastore.new(Memory).tryGet() + manager = ProvidersManager.new(ds, disableCache = true) + + for id in nodeIds: + for p in providers: + (await manager.add(id, p)).tryGet + + teardown: + (await ds.close()).tryGet() + ds = nil + manager = nil + + test "Should retrieve multiple records": + for id in nodeIds: + check: (await manager.get(id)).tryGet.len == 10 + + test "Should retrieve multiple records with limit": + for id in nodeIds: + check: (await manager.get(id, 5)).tryGet.len == 5 + + test "Should remove by NodeId": + (await (manager.remove(nodeIds[0]))).tryGet + (await (manager.remove(nodeIds[49]))).tryGet + (await (manager.remove(nodeIds[99]))).tryGet + + check: + not (await manager.contains(nodeIds[0])) + not (await manager.contains(nodeIds[49])) + not (await manager.contains(nodeIds[99])) + + test "Should remove by PeerId": + (await (manager.remove(providers[0].data.peerId))).tryGet + (await (manager.remove(providers[5].data.peerId))).tryGet + (await (manager.remove(providers[9].data.peerId))).tryGet + + for id in nodeIds: + check: + not (await manager.contains(id, providers[0].data.peerId)) + not (await manager.contains(id, providers[5].data.peerId)) + not (await manager.contains(id, providers[9].data.peerId)) + + check: + not (await manager.contains(providers[0].data.peerId)) + not (await manager.contains(providers[5].data.peerId)) + not (await manager.contains(providers[9].data.peerId)) + +suite "Test providers with cache": + let + rng = newRng() + privKeys = (0..<10).mapIt( PrivateKey.example(rng) ) + providers = privKeys.mapIt( it.toSignedPeerRecord() ) + nodeIds = (0..<100).mapIt( NodeId.example(rng) ) + + var + ds: SQLiteDatastore + manager: ProvidersManager + + setup: + ds = SQLiteDatastore.new(Memory).tryGet() + manager = ProvidersManager.new(ds) + + for id in nodeIds: + for p in providers: + (await manager.add(id, p)).tryGet + + teardown: + (await ds.close()).tryGet() + ds = nil + manager = nil + + test "Should retrieve multiple records": + for id in nodeIds: + check: (await manager.get(id)).tryGet.len == 10 + + test "Should retrieve multiple records with limit": + for id in nodeIds: + check: (await manager.get(id, 5)).tryGet.len == 5 + + test "Should remove by NodeId": + (await (manager.remove(nodeIds[0]))).tryGet + (await (manager.remove(nodeIds[49]))).tryGet + (await (manager.remove(nodeIds[99]))).tryGet + + check: + nodeIds[0] notin manager.providers + not (await manager.contains(nodeIds[0])) + + nodeIds[49] notin manager.providers + not (await manager.contains(nodeIds[49])) + + nodeIds[99] notin manager.providers + not (await manager.contains(nodeIds[99])) + + test "Should remove by PeerId": + (await (manager.remove(providers[0].data.peerId))).tryGet + (await (manager.remove(providers[5].data.peerId))).tryGet + (await (manager.remove(providers[9].data.peerId))).tryGet + + for id in nodeIds: + check: + providers[0].data.peerId notin manager.providers.get(id).get + not (await manager.contains(id, providers[0].data.peerId)) + + providers[5].data.peerId notin manager.providers.get(id).get + not (await manager.contains(id, providers[5].data.peerId)) + + providers[9].data.peerId notin manager.providers.get(id).get + not (await manager.contains(id, providers[9].data.peerId)) + + check: + not (await manager.contains(providers[0].data.peerId)) + not (await manager.contains(providers[5].data.peerId)) + not (await manager.contains(providers[9].data.peerId)) diff --git a/tests/dht/test_providers.nim b/tests/dht/test_providers.nim index 09d3f03..25e9454 100644 --- a/tests/dht/test_providers.nim +++ b/tests/dht/test_providers.nim @@ -99,7 +99,7 @@ suite "Providers Tests: node alone": debug "---- STARTING CHECKS ---" check (addedTo.len == 1) check (addedTo[0].id == node0.localNode.id) - check (node0.getProvidersLocal(targetId)[0].data.peerId == peerRec0.peerId) + check ((await node0.getProvidersLocal(targetId))[0].data.peerId == peerRec0.peerId) test "Node in isolation should retrieve": @@ -224,7 +224,7 @@ suite "Providers Tests: 20 nodes": debug "Providers:", providers check (providers.len == 1 and providers[0].data.peerId == peerRec0.peerId) - test "20 nodes, retieve after bootnode dies": + test "20 nodes, retrieve after bootnodes dies": debug "---- KILLING BOOTSTRAP NODE ---" let (node0, _) = nodes[0] let (node18, _) = nodes[^2] diff --git a/tests/discv5/test_discoveryv5.nim b/tests/discv5/test_discoveryv5.nim index 61177b1..8d71535 100644 --- a/tests/discv5/test_discoveryv5.nim +++ b/tests/discv5/test_discoveryv5.nim @@ -5,7 +5,7 @@ import chronos, chronicles, stint, asynctest, stew/shims/net, stew/byteutils, bearssl, libp2p/crypto/crypto, - libp2pdht/discv5/[transport, spr, node, routing_table, encoding, sessions, messages, nodes_verification], + libp2pdht/discv5/[transport, spr, node, routing_table, encoding, sessions, nodes_verification], libp2pdht/discv5/crypto as dhtcrypto, libp2pdht/discv5/protocol as discv5_protocol, ../dht/test_helper diff --git a/tests/testAll.nim b/tests/testAll.nim index ecb7977..f9fc17c 100644 --- a/tests/testAll.nim +++ b/tests/testAll.nim @@ -1,3 +1,5 @@ import - ./dht/test_providers, + ./dht/[test_providers, test_providermngr], ./discv5/[test_discoveryv5, test_discoveryv5_encoding] + +{.warning[UnusedImport]: off.}