diff --git a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim
index 862dccd..041fecd 100644
--- a/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim
+++ b/libp2pdht/private/eth/p2p/discoveryv5/protocol.nim
@@ -75,17 +75,30 @@
 import
   std/[tables, sets, options, math, sequtils, algorithm, strutils],
-  stew/shims/net as stewNet, json_serialization/std/net,
-  stew/[base64, endians2, results], chronicles, chronicles/chronos_tools, chronos, chronos/timer, stint, bearssl,
-  metrics,
-  libp2p/[crypto/crypto, routing_record],
-  transport, messages, messages_encoding, node,
-  routing_table, spr, random2, ip_vote, nodes_verification,
-  providersmngr
+  stew/shims/net as stewNet,
+  json_serialization/std/net,
+  stew/[base64, endians2, results],
+  pkg/[chronicles, chronicles/chronos_tools],
+  pkg/chronos,
+  pkg/stint,
+  pkg/bearssl,
+  pkg/metrics
+
+import "."/[
+  messages,
+  messages_encoding,
+  node,
+  routing_table,
+  spr,
+  random2,
+  ip_vote,
+  nodes_verification,
+  providers,
+  transport]

 import nimcrypto except toHex

-export options, results, node, spr, providersmngr
+export options, results, node, spr, providers

 declareCounter discovery_message_requests_outgoing,
   "Discovery protocol outgoing message requests", labels = ["response"]
@@ -710,7 +723,7 @@ proc sendGetProviders(d: Protocol, toNode: Node,
     resp = await d.waitMessage(toNode, reqId)

   if resp.isSome():
-    if resp.get().kind == providers:
+    if resp.get().kind == MessageKind.providers:
       d.routingTable.setJustSeen(toNode)
       return ok(resp.get().provs)
     else:
@@ -740,7 +753,7 @@ proc getProviders*(
     d: Protocol,
     cId: NodeId,
     maxitems: int = 5,
-    timeout: timer.Duration = chronos.milliseconds(5000)
+    timeout: Duration = 5000.milliseconds
   ): Future[DiscResult[seq[SignedPeerRecord]]] {.async.} =

   # What providers do we know about?
@@ -1121,23 +1134,12 @@ proc open*(d: Protocol) {.raises: [Defect, CatchableError].} =

   d.seedTable()

-proc start*(d: Protocol) =
+proc start*(d: Protocol) {.async.} =
   d.refreshLoop = refreshLoop(d)
   d.revalidateLoop = revalidateLoop(d)
   d.ipMajorityLoop = ipMajorityLoop(d)

-proc close*(d: Protocol) =
-  doAssert(not d.transport.closed)
-
-  debug "Closing discovery node", node = d.localNode
-  if not d.revalidateLoop.isNil:
-    d.revalidateLoop.cancel()
-  if not d.refreshLoop.isNil:
-    d.refreshLoop.cancel()
-  if not d.ipMajorityLoop.isNil:
-    d.ipMajorityLoop.cancel()
-
-  d.transport.close()
+  await d.providers.start()

 proc closeWait*(d: Protocol) {.async.} =
   doAssert(not d.transport.closed)
diff --git a/libp2pdht/private/eth/p2p/discoveryv5/providers.nim b/libp2pdht/private/eth/p2p/discoveryv5/providers.nim
new file mode 100644
index 0000000..5e0c2f7
--- /dev/null
+++ b/libp2pdht/private/eth/p2p/discoveryv5/providers.nim
@@ -0,0 +1,5 @@
+import ./providers/cache
+import ./providers/maintenance
+import ./providers/manager
+
+export cache, maintenance, manager
diff --git a/libp2pdht/private/eth/p2p/discoveryv5/providers/cache.nim b/libp2pdht/private/eth/p2p/discoveryv5/providers/cache.nim
new file mode 100644
index 0000000..4f0da5c
--- /dev/null
+++ b/libp2pdht/private/eth/p2p/discoveryv5/providers/cache.nim
@@ -0,0 +1,108 @@
+# codex-dht - Codex DHT
+# Copyright (c) 2022 Status Research & Development GmbH
+# Licensed and distributed under either of
+#   * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
+#   * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
+# at your option.
+# This file may not be copied, modified, or distributed except according to those terms.
+
+{.push raises: [Defect].}
+
+import std/sequtils
+
+import pkg/chronicles
+import pkg/libp2p
+
+import ../node
+import ../lru
+import ./common
+
+const
+  MaxProvidersEntries* = 1000'u # one thousand records
+  MaxProvidersPerEntry* = 200'u # providers per entry
+
+logScope:
+  topics = "discv5 providers cache"
+
+type
+  Providers* = LRUCache[PeerId, SignedPeerRecord]
+  ItemsCache* = LRUCache[NodeId, Providers]
+
+  ProvidersCache* = object
+    disable: bool
+    cache*: ItemsCache
+    maxProviders*: int
+
+func add*(
+  self: var ProvidersCache,
+  id: NodeId,
+  provider: SignedPeerRecord) =
+
+  if self.disable:
+    return
+
+  var providers =
+    if id notin self.cache:
+      Providers.init(self.maxProviders.int)
+    else:
+      self.cache.get(id).get()
+
+  let
+    peerId = provider.data.peerId
+
+  trace "Adding provider to cache", id, peerId
+  providers.put(peerId, provider)
+  self.cache.put(id, providers)
+
+proc get*(
+  self: var ProvidersCache,
+  id: NodeId,
+  start = 0,
+  stop = MaxProvidersPerEntry.int): seq[SignedPeerRecord] =
+
+  if self.disable:
+    return
+
+  if id in self.cache:
+    let
+      recs = self.cache.get(id).get
+
+    let
+      providers = toSeq(recs)[start..= expired:
+          trace "Found expired record", key
+          keys.add(key)
+        without pairs =? key.fromCidKey(), err:
+          trace "Error extracting parts from cid key", key
+          continue
+
+      if keys.len >= batchSize:
+        break
+
+  if err =? (await store.delete(keys)).errorOption:
+    trace "Error cleaning up batch, records left intact!", size = keys.len, err = err.msg
+
+  trace "Cleaned up expired records", size = keys.len
+
+proc cleanupOrphaned*(
+  store: Datastore,
+  batchSize = ExpiredCleanupBatch) {.async.} =
+  trace "Cleaning up orphaned records"
+
+  let
+    providersQuery = Query.init(ProvidersKey)
+
+  block:
+    without iter =? (await store.query(providersQuery)), err:
+      trace "Unable to obtain record for key"
+      return
+
+    defer:
+      if not isNil(iter):
+        trace "Cleaning up query iterator"
+        discard (await iter.dispose())
+
+    var count = 0
+    for item in iter:
+      if count >= batchSize:
+        trace "Batch cleaned up", size = batchSize
+
+      count.inc
+      if pair =? (await item) and pair.key.isSome:
+        let
+          key = pair.key.get()
+
+        without peerId =? key.fromProvKey(), err:
+          trace "Error extracting parts from cid key", key
+          continue
+
+        without cidKey =? (CidKey / "*" / $peerId), err:
+          trace "Error building cid key", err = err.msg
+          continue
+
+        without cidIter =? (await store.query(Query.init(cidKey, limit = 1))), err:
+          trace "Error querying key", cidKey
+          continue
+
+        let
+          res = (await allFinished(toSeq(cidIter)))
+            .filterIt( it.completed )
+            .mapIt( it.read.get )
+            .filterIt( it.key.isSome ).len
+
+        if not isNil(cidIter):
+          trace "Disposing cid iter"
+          discard (await cidIter.dispose())
+
+        if res > 0:
+          trace "Peer not orphaned, skipping", peerId
+          continue
+
+        if err =? (await store.delete(key)).errorOption:
+          trace "Error deleting orphaned peer", err = err.msg
+          continue
+
+        trace "Cleaned up orphaned peer", peerId
diff --git a/libp2pdht/private/eth/p2p/discoveryv5/providersmngr.nim b/libp2pdht/private/eth/p2p/discoveryv5/providers/manager.nim
similarity index 52%
rename from libp2pdht/private/eth/p2p/discoveryv5/providersmngr.nim
rename to libp2pdht/private/eth/p2p/discoveryv5/providers/manager.nim
index d1e50d7..9f78eef 100644
--- a/libp2pdht/private/eth/p2p/discoveryv5/providersmngr.nim
+++ b/libp2pdht/private/eth/p2p/discoveryv5/providers/manager.nim
@@ -14,122 +14,37 @@
 import pkg/libp2p
 import pkg/chronicles
 import pkg/stew/results as rs
 import pkg/stew/byteutils
+import pkg/questionable
 import pkg/questionable/results

 {.push raises: [Defect].}

-import ./lru
-import ./node
+import ./maintenance
+import ./cache
+import ./common
+import ../spr

-export node, lru, datastore
+export cache, datastore

 logScope:
   topics = "discv5 providers manager"

 const
-  DefaultProviderTTL = 24.hours
-
-  ProvidersKey* = Key.init("/providers").tryGet # keys is of the form /providers/peerid = provider
-  CidKey* = Key.init("/cids").tryGet # keys is of the form /cids/cid/peerid/ttl = ttl
-
-  MaxProvidersEntries* = 1000'u # one thousand records
-  MaxProvidersPerEntry* = 200'u # providers per entry
-
-  ZeroMoment = Moment.init(0, Nanosecond) # for conversion between Duration and Moment
+  DefaultProviderTTL* = 24.hours

 type
-  ProvidersCache* = LRUCache[PeerId, SignedPeerRecord]
-  ItemsCache* = LRUCache[NodeId, ProvidersCache]
-
   ProvidersManager* = ref object of RootObj
     store*: Datastore
-    providers*: ItemsCache
+    cache*: ProvidersCache
     ttl*: Duration
     maxItems*: uint
     maxProviders*: uint
     disableCache*: bool
-
-proc mapFailure[T](err: T): ref CatchableError =
-  newException(CatchableError, $err)
-
-proc makeProviderKey(peerId: PeerId): ?!Key =
-  (ProvidersKey / $peerId)
-
-proc makeCidKey(cid: NodeId, peerId: PeerId): ?!Key =
-  (CidKey / cid.toHex / $peerId / "ttl")
-
-proc peerIdFromCidKey(key: string): ?!PeerId =
-  let
-    parts = key.split("/")
-
-  if parts.len == 5:
-    return PeerId.init(parts[3]).mapErr(mapFailure)
-
-  return failure("Unable to extract peer id from key")
-
-func addCache*(
-  self: ProvidersManager,
-  cid: NodeId,
-  provider: SignedPeerRecord) =
-
-  if self.disableCache:
-    return
-
-  var providers =
-    if cid notin self.providers:
-      ProvidersCache.init(self.maxProviders.int)
-    else:
-      self.providers.get(cid).get()
-
-  let
-    peerId = provider.data.peerId
-
-  trace "Adding provider to cache", cid, peerId
-  providers.put(peerId, provider)
-  self.providers.put(cid, providers)
-
-func getCache*(
-  self: ProvidersManager,
-  cid: NodeId,
-  limit = MaxProvidersPerEntry.int): seq[SignedPeerRecord] =
-
-  if self.disableCache:
-    return
-
-  if cid in self.providers:
-    let
-      recs = self.providers.get(cid).get
-      providers = toSeq(recs)[0.. 0:
     return success provs

   without cidKey =? (CidKey / id.toHex), err:
     return failure err.msg

-  without cidIter =?
-    (await self.store.query(Query.init(cidKey, limit = limit))), err:
-    return failure err.msg
-
-  defer:
-    discard (await cidIter.dispose())
-
-  trace "Querying providers from persistent store", cid = id, key = cidKey
+  trace "Querying providers from persistent store", id, key = cidKey

   var providers: seq[SignedPeerRecord]
-  for item in cidIter:
-    # TODO: =? doesn't support tuples
-    if pair =? (await item) and pair.key.isSome:
-      let
-        (key, val) = (pair.key.get, pair.data)
+  block:
+    without cidIter =?
+      (await self.store.query(Query.init(cidKey, offset = start, limit = stop))), err:
+      return failure err.msg

-      without peerId =? key.id.peerIdFromCidKey() and
-        provKey =? makeProviderKey(peerId), err:
-        trace "Error creating key from provider record", err = err.msg
-        continue
+    defer:
+      if not isNil(cidIter):
+        trace "Cleaning up query iterator"
+        discard (await cidIter.dispose())

-      trace "Querying provider key", key = provKey
-      without data =? (await self.store.get(provKey)):
-        trace "Error getting provider", key = provKey
-        continue
+    for item in cidIter:
+      # TODO: =? doesn't support tuples
+      if pair =? (await item) and pair.key.isSome:
+        let
+          (key, val) = (pair.key.get, pair.data)

-      without provider =? self.decode(data), err:
-        trace "Unable to decode provider from store", err = err.msg
-        continue
+        without pairs =? key.fromCidKey() and
+          provKey =? makeProviderKey(pairs.peerId), err:
+          trace "Error creating key from provider record", err = err.msg
+          continue

-      trace "Retrieved provider with key", key = provKey
-      providers.add(provider)
-      self.addCache(id, provider)
+        trace "Querying provider key", key = provKey
+        without data =? (await self.store.get(provKey)):
+          trace "Error getting provider", key = provKey
+          continue

-  trace "Retrieved providers from persistent store", cid = id, len = providers.len
+        without provider =? SignedPeerRecord.decode(data).mapErr(mapFailure), err:
+          trace "Unable to decode provider from store", err = err.msg
+          continue
+
+        trace "Retrieved provider with key", key = provKey
+        providers.add(provider)
+        self.cache.add(id, provider)
+
+  trace "Retrieved providers from persistent store", cid = id, len = providers.len

   return success providers

 proc contains*(
@@ -265,38 +185,40 @@ proc contains*(self: ProvidersManager, cid: NodeId): Future[bool] {.async.} =
   let
     q = Query.init(cidKey, limit = 1)

-  without iter =? (await self.store.query(q)), err:
-    trace "Unable to obtain record for key", key = cidKey
-    return false
+  block:
+    without iter =? (await self.store.query(q)), err:
+      trace "Unable to obtain record for key", key = cidKey
+      return false

-  defer:
-    trace "Cleaning up query iterator"
-    discard (await iter.dispose())
+    defer:
+      if not isNil(iter):
+        trace "Cleaning up query iterator"
+        discard (await iter.dispose())

-  for item in iter:
-    if pair =? (await item) and pair.key.isSome:
-      return true
+    for item in iter:
+      if pair =? (await item) and pair.key.isSome:
+        return true

   return false

 proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} =
-  if cid in self.providers:
-    self.providers.del(cid)
+  self.cache.drop(cid)

   without cidKey =? (CidKey / $cid), err:
     return failure(err.msg)

   let
     q = Query.init(cidKey)

-  without iter =? (await self.store.query(q)), err:
-    trace "Unable to obtain record for key", key = cidKey
-    return failure(err.msg)
-
   block:
+    without iter =? (await self.store.query(q)), err:
+      trace "Unable to obtain record for key", key = cidKey
+      return failure(err.msg)
+
     defer:
-      trace "Cleaning up query iterator"
-      discard (await iter.dispose())
+      if not isNil(iter):
+        trace "Cleaning up query iterator"
+        discard (await iter.dispose())

     for item in iter:
       if pair =? (await item) and pair.key.isSome:
@@ -305,11 +227,11 @@ proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} =
           trace "Error deleting record from persistent store", err = err.msg
           continue

-        without peerId =? key.id.peerIdFromCidKey, err:
+        without pairs =? key.fromCidKey, err:
           trace "Unable to parse peer id from key", key
           continue

-        self.removeCache(cid, peerId)
+        self.cache.remove(cid, pairs.peerId)

         trace "Deleted record from store", key

   return success()
@@ -321,14 +243,15 @@ proc remove*(self: ProvidersManager, peerId: PeerId): Future[?!void] {.async.} =
   let
     q = Query.init(cidKey)

-  without iter =? (await self.store.query(q)), err:
-    trace "Unable to obtain record for key", key = cidKey
-    return failure(err.msg)
-
   block:
+    without iter =? (await self.store.query(q)), err:
+      trace "Unable to obtain record for key", key = cidKey
+      return failure(err.msg)
+
     defer:
-      trace "Cleaning up query iterator"
-      discard (await iter.dispose())
+      if not isNil(iter):
+        trace "Cleaning up query iterator"
+        discard (await iter.dispose())

     for item in iter:
       if pair =? (await item) and pair.key.isSome:
@@ -344,7 +267,7 @@ proc remove*(self: ProvidersManager, peerId: PeerId): Future[?!void] {.async.} =
         let
           parts = key.id.split(datastore.Separator)

-        self.removeCache(NodeId.fromHex(parts[2]), peerId)
+        self.cache.remove(NodeId.fromHex(parts[2]), peerId)

   without provKey =? makeProviderKey(peerId), err:
     return failure(err.msg)
@@ -357,31 +280,64 @@ proc remove*(
     cid: NodeId,
     peerId: PeerId): Future[?!void] {.async.} =

-  self.removeCache(cid, peerId)
-
+  self.cache.remove(cid, peerId)
   without cidKey =? makeCidKey(cid, peerId), err:
     trace "Error creating key from content id", err = err.msg
     return failure err.msg

   return (await self.store.delete(cidKey))

+proc cleanupExpiredLoop(self: ProvidersManager) {.async.} =
+  try:
+    while self.started:
+      await self.store.cleanupExpired(self.batchSize)
+      await sleepAsync(self.cleanupInterval)
+  except CancelledError as exc:
+    trace "Cancelled expired cleanup job", err = exc.msg
+  except CatchableError as exc:
+    trace "Exception in expired cleanup job", err = exc.msg
+    raiseAssert "Exception in expired cleanup job"
+
+proc cleanupOrphanedLoop(self: ProvidersManager) {.async.} =
+  try:
+    while self.started:
+      await self.store.cleanupOrphaned(self.batchSize)
+      await sleepAsync(self.cleanupInterval)
+  except CancelledError as exc:
+    trace "Cancelled orphaned cleanup job", err = exc.msg
+  except CatchableError as exc:
+    trace "Exception in orphaned cleanup job", err = exc.msg
+    raiseAssert "Exception in orphaned cleanup job"
+
+proc start*(self: ProvidersManager) {.async.} =
+  self.started = true
+  self.expiredLoop = self.cleanupExpiredLoop
+  self.orphanedLoop = self.cleanupOrphanedLoop
+
+proc stop*(self: ProvidersManager) {.async.} =
+  await self.expiredLoop.cancelAndWait()
+  await self.orphanedLoop.cancelAndWait()
+  self.started = false
+
 func new*(
   T: type ProvidersManager,
   store: Datastore,
   disableCache = false,
   ttl = DefaultProviderTTL,
   maxItems = MaxProvidersEntries,
-  maxProviders = MaxProvidersPerEntry): T =
+  maxProviders = MaxProvidersPerEntry,
+  batchSize = ExpiredCleanupBatch,
+  cleanupInterval = CleanupInterval): T =

-  var
-    self = T(
-      store: store,
-      ttl: ttl,
-      maxItems: maxItems,
-      maxProviders: maxProviders,
-      disableCache: disableCache)
-
-  if not disableCache:
-    self.providers = ItemsCache.init(maxItems.int)
-
-  self
+  T(
+    store: store,
+    ttl: ttl,
+    maxItems: maxItems,
+    maxProviders: maxProviders,
+    disableCache: disableCache,
+    batchSize: batchSize,
+    cleanupInterval: cleanupInterval,
+    cache: ProvidersCache.init(
+      size = maxItems,
+      maxProviders = maxProviders,
+      disable = disableCache))
diff --git a/tests/dht/test_providermngr.nim b/tests/dht/test_providermngr.nim
index 5cb4aa7..7ec742c 100644
--- a/tests/dht/test_providermngr.nim
+++ b/tests/dht/test_providermngr.nim
@@ -8,8 +8,9 @@
 import pkg/libp2p

 import libp2pdht/dht
 import libp2pdht/private/eth/p2p/discoveryv5/spr
-import libp2pdht/private/eth/p2p/discoveryv5/providersmngr
+import libp2pdht/private/eth/p2p/discoveryv5/providers
 import libp2pdht/discv5/node
+import libp2pdht/private/eth/p2p/discoveryv5/lru
 import ./test_helper

 suite "Test Providers Manager simple":
@@ -154,13 +155,13 @@ suite "Test providers with cache":
     (await (manager.remove(nodeIds[99]))).tryGet

     check:
-      nodeIds[0] notin manager.providers
+      nodeIds[0] notin manager.cache.cache
       not (await manager.contains(nodeIds[0]))

-      nodeIds[49] notin manager.providers
+      nodeIds[49] notin manager.cache.cache
       not (await manager.contains(nodeIds[49]))

-      nodeIds[99] notin manager.providers
+      nodeIds[99] notin manager.cache.cache
       not (await manager.contains(nodeIds[99]))

   test "Should remove by PeerId":
@@ -170,16 +171,59 @@ suite "Test providers with cache":

     for id in nodeIds:
       check:
-        providers[0].data.peerId notin manager.providers.get(id).get
+        providers[0].data.peerId notin manager.cache.cache.get(id).get
         not (await manager.contains(id, providers[0].data.peerId))

-        providers[5].data.peerId notin manager.providers.get(id).get
+        providers[5].data.peerId notin manager.cache.cache.get(id).get
         not (await manager.contains(id, providers[5].data.peerId))

-        providers[9].data.peerId notin manager.providers.get(id).get
+        providers[9].data.peerId notin manager.cache.cache.get(id).get
         not (await manager.contains(id, providers[9].data.peerId))

     check:
       not (await manager.contains(providers[0].data.peerId))
       not (await manager.contains(providers[5].data.peerId))
       not (await manager.contains(providers[9].data.peerId))
+
+suite "Test Provider Maintenance":
+  let
+    rng = newRng()
+    privKeys = (0..<10).mapIt( PrivateKey.example(rng) )
+    providers = privKeys.mapIt( it.toSignedPeerRecord() )
+    nodeIds = (0..<100).mapIt( NodeId.example(rng) )
+
+  var
+    ds: SQLiteDatastore
+    manager: ProvidersManager
+
+  setupAll:
+    ds = SQLiteDatastore.new(Memory).tryGet()
+    manager = ProvidersManager.new(ds, disableCache = true)
+
+    for id in nodeIds:
+      for p in providers:
+        (await manager.add(id, p, ttl = 1.millis)).tryGet
+
+  teardownAll:
+    (await ds.close()).tryGet()
+    ds = nil
+    manager = nil
+
+  test "Should cleanup expired":
+    for id in nodeIds:
+      check: (await manager.get(id)).tryGet.len == 10
+
+    await sleepAsync(500.millis)
+    await manager.store.cleanupExpired()
+
+    for id in nodeIds:
+      check: (await manager.get(id)).tryGet.len == 0
+
+  test "Should cleanup orphaned":
+    for id in nodeIds:
+      check: (await manager.get(id)).tryGet.len == 0
+
+    await manager.store.cleanupOrphaned()
+
+    for p in providers:
+      check: not (await manager.contains(p.data.peerId))
diff --git a/tests/dht/test_providers.nim b/tests/dht/test_providers.nim
index 25e9454..0889c08 100644
--- a/tests/dht/test_providers.nim
+++ b/tests/dht/test_providers.nim
@@ -35,7 +35,7 @@ proc bootstrapNodes(

   for i in 0.. 0:
       await sleepAsync(chronos.milliseconds(delay))
diff --git a/tests/discv5/test_discoveryv5.nim b/tests/discv5/test_discoveryv5.nim
index 8d71535..78030cc 100644
--- a/tests/discv5/test_discoveryv5.nim
+++ b/tests/discv5/test_discoveryv5.nim
@@ -293,7 +293,7 @@ suite "Discovery v5 Tests":
     let bootNode = initDiscoveryNode(rng, PrivateKey.example(rng), localAddress(20301))
-    bootNode.start()
+    await bootNode.start()

     var nodes = newSeqOfCap[discv5_protocol.Protocol](nodeCount)
     nodes.add(bootNode)
@@ -312,7 +312,7 @@ suite "Discovery v5 Tests":
       # check (await n.ping(t.localNode)).isOk()

     for i in 1 ..< nodeCount:
-      nodes[i].start()
+      await nodes[i].start()

     for i in 0..
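
# ---------------------------------------------------------------------------
# Usage sketch (not part of the patch above): how the reworked providers
# subsystem is expected to be driven after this change. ProvidersManager.new,
# start/stop, add/get, the `store` field, and the store-level
# cleanupExpired/cleanupOrphaned calls are taken from the diff and its tests;
# the NodeId/PrivateKey `example` helpers and `toSignedPeerRecord` are assumed
# to come from tests/dht/test_helper, and the in-memory SQLite datastore
# mirrors the test setup. A minimal sketch under those assumptions, not a
# definitive API reference.
# ---------------------------------------------------------------------------
import pkg/chronos
import pkg/datastore
import libp2pdht/private/eth/p2p/discoveryv5/providers
import ./test_helper # assumed helper module providing example() and toSignedPeerRecord()

proc providersRoundTrip() {.async.} =
  let
    rng = newRng()
    ds = SQLiteDatastore.new(Memory).tryGet()
    manager = ProvidersManager.new(ds) # cache enabled, default TTL and cleanup settings

  # start() now spawns the expired/orphaned cleanup loops added in maintenance.nim,
  # which is why Protocol.start became async and awaits d.providers.start()
  await manager.start()

  let
    cid = NodeId.example(rng)
    provider = PrivateKey.example(rng).toSignedPeerRecord()

  # records are persisted under /cids/cid/peerid/ttl and mirrored in the LRU cache
  (await manager.add(cid, provider, ttl = 5.minutes)).tryGet
  doAssert (await manager.get(cid)).tryGet.len == 1

  # expired and orphaned records can also be swept explicitly, as the tests do
  await manager.store.cleanupExpired()
  await manager.store.cleanupOrphaned()

  await manager.stop()
  (await ds.close()).tryGet()

when isMainModule:
  waitFor providersRoundTrip()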