From dd4985435a7ec9f5f8d83758ba70a79d8cf5ba00 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Fri, 17 Nov 2023 16:01:16 -0600 Subject: [PATCH] Fix timeout and delete (#86) * use unix time for ttl * don't remove all entries on peer removal * cleanup questionable tuple destructure * ignore vscode * fix endians decoding * allow removing by peerId * invalidate cache by peerId on remove * update test --- .gitignore | 2 +- codexdht/private/eth/p2p/discoveryv5/lru.nim | 7 + .../eth/p2p/discoveryv5/providers/cache.nim | 58 ++++++--- .../p2p/discoveryv5/providers/maintenance.nim | 23 ++-- .../eth/p2p/discoveryv5/providers/manager.nim | 121 +++++++++--------- tests/dht/test_providermngr.nim | 49 ++++++- 6 files changed, 163 insertions(+), 97 deletions(-) diff --git a/.gitignore b/.gitignore index 0314fb6..8bda8b1 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,4 @@ vendor/* NimBinaries .update.timestamp *.dSYM -*.vscode/* +.vscode/* diff --git a/codexdht/private/eth/p2p/discoveryv5/lru.nim b/codexdht/private/eth/p2p/discoveryv5/lru.nim index 6fbfdf6..2fa8a11 100644 --- a/codexdht/private/eth/p2p/discoveryv5/lru.nim +++ b/codexdht/private/eth/p2p/discoveryv5/lru.nim @@ -55,3 +55,10 @@ iterator items*[K, V](lru: LRUCache[K, V]): V = for item in lru.list: yield item[1] + +iterator keys*[K, V](lru: LRUCache[K, V]): K = + ## Get cached keys - this doesn't touch the cache + ## + + for item in lru.table.keys: + yield item diff --git a/codexdht/private/eth/p2p/discoveryv5/providers/cache.nim b/codexdht/private/eth/p2p/discoveryv5/providers/cache.nim index 4f0da5c..78c0ee2 100644 --- a/codexdht/private/eth/p2p/discoveryv5/providers/cache.nim +++ b/codexdht/private/eth/p2p/discoveryv5/providers/cache.nim @@ -11,6 +11,7 @@ import std/sequtils import pkg/chronicles import pkg/libp2p +import pkg/questionable import ../node import ../lru @@ -35,22 +36,21 @@ type func add*( self: var ProvidersCache, id: NodeId, - provider: SignedPeerRecord) = + record: SignedPeerRecord) = + ## Add providers for an id + ## to the cache if self.disable: return - var providers = - if id notin self.cache: - Providers.init(self.maxProviders.int) - else: - self.cache.get(id).get() + without var providers =? self.cache.get(id): + providers = Providers.init(self.maxProviders.int) let - peerId = provider.data.peerId + peerId = record.data.peerId - trace "Adding provider to cache", id, peerId - providers.put(peerId, provider) + trace "Adding provider record to cache", id, peerId + providers.put(peerId, record) self.cache.put(id, providers) proc get*( @@ -58,14 +58,13 @@ proc get*( id: NodeId, start = 0, stop = MaxProvidersPerEntry.int): seq[SignedPeerRecord] = + ## Get providers for an id + ## from the cache if self.disable: return - if id in self.cache: - let - recs = self.cache.get(id).get - + if recs =? self.cache.get(id): let providers = toSeq(recs)[start..= expired: trace "Found expired record", key - keys.add(key.get) - without pairs =? key.get.fromCidKey(), err: + keys.add(key) + without pairs =? key.fromCidKey(), err: trace "Error extracting parts from cid key", key continue @@ -92,9 +93,9 @@ proc cleanupOrphaned*( trace "Batch cleaned up", size = batchSize count.inc - if (key, _) =? (await item) and key.isSome: - without peerId =? key.get.fromProvKey(), err: - trace "Error extracting parts from cid key", key = key.get + if (maybeKey, _) =? (await item) and key =? maybeKey: + without peerId =? key.fromProvKey(), err: + trace "Error extracting parts from cid key", key continue without cidKey =? (CidKey / "*" / $peerId), err: @@ -121,7 +122,7 @@ proc cleanupOrphaned*( trace "Peer not orphaned, skipping", peerId continue - if err =? (await store.delete(key.get)).errorOption: + if err =? (await store.delete(key)).errorOption: trace "Error deleting orphaned peer", err = err.msg continue diff --git a/codexdht/private/eth/p2p/discoveryv5/providers/manager.nim b/codexdht/private/eth/p2p/discoveryv5/providers/manager.nim index e4e6a7d..44ba094 100644 --- a/codexdht/private/eth/p2p/discoveryv5/providers/manager.nim +++ b/codexdht/private/eth/p2p/discoveryv5/providers/manager.nim @@ -7,6 +7,7 @@ import std/sequtils import std/strutils +from std/times import now, utc, toTime, toUnix import pkg/stew/endians2 import pkg/datastore @@ -57,30 +58,30 @@ proc getProvByKey*(self: ProvidersManager, key: Key): Future[?!SignedPeerRecord] proc add*( self: ProvidersManager, - cid: NodeId, + id: NodeId, provider: SignedPeerRecord, ttl = ZeroDuration): Future[?!void] {.async.} = let peerId = provider.data.peerId - trace "Adding provider to persistent store", cid, peerId + trace "Adding provider to persistent store", id, peerId without provKey =? makeProviderKey(peerId), err: trace "Error creating key from provider record", err = err.msg return failure err.msg - without cidKey =? makeCidKey(cid, peerId), err: + without cidKey =? makeCidKey(id, peerId), err: trace "Error creating key from content id", err = err.msg return failure err.msg let + now = times.now().utc().toTime().toUnix() expires = if ttl > ZeroDuration: - ttl + ttl.seconds + now else: - Moment.fromNow(self.ttl) - ZeroMoment - - ttl = endians2.toBytesBE(expires.microseconds.uint64) + self.ttl.seconds + now + ttl = endians2.toBytesBE(expires.uint64) bytes: seq[byte] = if existing =? (await self.getProvByKey(provKey)) and @@ -94,17 +95,17 @@ proc add*( bytes if bytes.len > 0: - trace "Adding or updating provider record", cid, peerId + trace "Adding or updating provider record", id, peerId if err =? (await self.store.put(provKey, bytes)).errorOption: trace "Unable to store provider with key", key = provKey, err = err.msg - trace "Adding or updating cid", cid, key = cidKey, ttl = expires.minutes + trace "Adding or updating id", id, key = cidKey, ttl = expires.seconds if err =? (await self.store.put(cidKey, @ttl)).errorOption: trace "Unable to store provider with key", key = cidKey, err = err.msg return - self.cache.add(cid, provider) - trace "Provider for cid added", cidKey, provKey + self.cache.add(id, provider) + trace "Provider for id added", cidKey, provKey return success() proc get*( @@ -137,12 +138,10 @@ proc get*( trace "Cleaning up query iterator" discard (await cidIter.dispose()) + var keys: seq[Key] for item in cidIter: # TODO: =? doesn't support tuples - if pair =? (await item) and pair.key.isSome: - let - (key, val) = (pair.key.get, pair.data) - + if (maybeKey, val) =? (await item) and key =? maybeKey: without pairs =? key.fromCidKey() and provKey =? makeProviderKey(pairs.peerId), err: trace "Error creating key from provider record", err = err.msg @@ -151,17 +150,24 @@ proc get*( trace "Querying provider key", key = provKey without data =? (await self.store.get(provKey)): trace "Error getting provider", key = provKey + keys.add(key) continue without provider =? SignedPeerRecord.decode(data).mapErr(mapFailure), err: trace "Unable to decode provider from store", err = err.msg + keys.add(key) continue trace "Retrieved provider with key", key = provKey providers.add(provider) self.cache.add(id, provider) - trace "Retrieved providers from persistent store", cid = id, len = providers.len + trace "Deleting keys without provider from store", len = keys.len + if keys.len > 0 and err =? (await self.store.delete(keys)).errorOption: + trace "Error deleting records from persistent store", err = err.msg + return failure err + + trace "Retrieved providers from persistent store", id = id, len = providers.len return success providers proc contains*( @@ -179,8 +185,8 @@ proc contains*(self: ProvidersManager, peerId: PeerId): Future[bool] {.async.} = return (await self.store.has(provKey)) |? false -proc contains*(self: ProvidersManager, cid: NodeId): Future[bool] {.async.} = - without cidKey =? (CidKey / $cid), err: +proc contains*(self: ProvidersManager, id: NodeId): Future[bool] {.async.} = + without cidKey =? (CidKey / $id), err: return false let @@ -197,15 +203,15 @@ proc contains*(self: ProvidersManager, cid: NodeId): Future[bool] {.async.} = discard (await iter.dispose()) for item in iter: - if pair =? (await item) and pair.key.isSome: + if (key, _) =? (await item) and key.isSome: return true return false -proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} = +proc remove*(self: ProvidersManager, id: NodeId): Future[?!void] {.async.} = - self.cache.drop(cid) - without cidKey =? (CidKey / $cid), err: + self.cache.drop(id) + without cidKey =? (CidKey / $id), err: return failure(err.msg) let @@ -225,16 +231,14 @@ proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} = keys: seq[Key] for item in iter: - if pair =? (await item) and pair.key.isSome: - let - key = pair.key.get() + if (maybeKey, _) =? (await item) and key =? maybeKey: keys.add(key) without pairs =? key.fromCidKey, err: trace "Unable to parse peer id from key", key return failure err - self.cache.remove(cid, pairs.peerId) + self.cache.remove(id, pairs.peerId) trace "Deleted record from store", key if keys.len > 0 and err =? (await self.store.delete(keys)).errorOption: @@ -243,57 +247,60 @@ proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} = return success() -proc remove*(self: ProvidersManager, peerId: PeerId): Future[?!void] {.async.} = - without cidKey =? (CidKey / "*" / $peerId), err: - return failure err +proc remove*( + self: ProvidersManager, + peerId: PeerId, + entries = false): Future[?!void] {.async.} = - let - q = Query.init(cidKey) - - block: - without iter =? (await self.store.query(q)), err: - trace "Unable to obtain record for key", key = cidKey + if entries: + without cidKey =? (CidKey / "*" / $peerId), err: return failure err - defer: - if not isNil(iter): - trace "Cleaning up query iterator" - discard (await iter.dispose()) + let + q = Query.init(cidKey) - var - keys: seq[Key] + block: + without iter =? (await self.store.query(q)), err: + trace "Unable to obtain record for key", key = cidKey + return failure err - for item in iter: - if pair =? (await item) and pair.key.isSome: - let - key = pair.key.get() + defer: + if not isNil(iter): + trace "Cleaning up query iterator" + discard (await iter.dispose()) - keys.add(key) + var + keys: seq[Key] - let - parts = key.id.split(datastore.Separator) + for item in iter: + if (maybeKey, _) =? (await item) and key =? maybeKey: + keys.add(key) - self.cache.remove(NodeId.fromHex(parts[2]), peerId) + let + parts = key.id.split(datastore.Separator) - if keys.len > 0 and err =? (await self.store.delete(keys)).errorOption: - trace "Error deleting record from persistent store", err = err.msg - return failure err + if keys.len > 0 and err =? (await self.store.delete(keys)).errorOption: + trace "Error deleting record from persistent store", err = err.msg + return failure err - trace "Deleted records from store" + trace "Deleted records from store" - without provKey =? makeProviderKey(peerId), err: + without provKey =? peerId.makeProviderKey, err: return failure err + trace "Removing provider from cache", peerId + self.cache.remove(peerId) + trace "Removing provider record", key = provKey return (await self.store.delete(provKey)) proc remove*( self: ProvidersManager, - cid: NodeId, + id: NodeId, peerId: PeerId): Future[?!void] {.async.} = - self.cache.remove(cid, peerId) - without cidKey =? makeCidKey(cid, peerId), err: + self.cache.remove(id, peerId) + without cidKey =? makeCidKey(id, peerId), err: trace "Error creating key from content id", err = err.msg return failure err.msg diff --git a/tests/dht/test_providermngr.nim b/tests/dht/test_providermngr.nim index 607e316..6577c44 100644 --- a/tests/dht/test_providermngr.nim +++ b/tests/dht/test_providermngr.nim @@ -4,6 +4,7 @@ import std/sequtils import pkg/chronos import pkg/asynctest import pkg/datastore +from pkg/libp2p import PeerId import codexdht/dht import codexdht/private/eth/p2p/discoveryv5/spr @@ -100,10 +101,10 @@ suite "Test Providers Manager multiple": not (await manager.contains(nodeIds[49])) not (await manager.contains(nodeIds[99])) - test "Should remove by PeerId": - (await (manager.remove(providers[0].data.peerId))).tryGet - (await (manager.remove(providers[5].data.peerId))).tryGet - (await (manager.remove(providers[9].data.peerId))).tryGet + test "Should remove by PeerId with associated keys": + (await (manager.remove(providers[0].data.peerId, true))).tryGet + (await (manager.remove(providers[5].data.peerId, true))).tryGet + (await (manager.remove(providers[9].data.peerId, true))).tryGet for id in nodeIds: check: @@ -116,6 +117,22 @@ suite "Test Providers Manager multiple": not (await manager.contains(providers[5].data.peerId)) not (await manager.contains(providers[9].data.peerId)) + test "Should not return keys without provider": + for id in nodeIds: + check: + (await manager.get(id)).tryGet.len == 10 + + for provider in providers: + (await (manager.remove(provider.data.peerId))).tryGet + + for id in nodeIds: + check: + (await manager.get(id)).tryGet.len == 0 + + for provider in providers: + check: + not (await manager.contains(provider.data.peerId)) + suite "Test providers with cache": let rng = newRng() @@ -164,9 +181,9 @@ suite "Test providers with cache": not (await manager.contains(nodeIds[99])) test "Should remove by PeerId": - (await (manager.remove(providers[0].data.peerId))).tryGet - (await (manager.remove(providers[5].data.peerId))).tryGet - (await (manager.remove(providers[9].data.peerId))).tryGet + (await (manager.remove(providers[0].data.peerId, true))).tryGet + (await (manager.remove(providers[5].data.peerId, true))).tryGet + (await (manager.remove(providers[9].data.peerId, true))).tryGet for id in nodeIds: check: @@ -218,6 +235,24 @@ suite "Test Provider Maintenance": for id in nodeIds: check: (await manager.get(id)).tryGet.len == 0 + test "Should not cleanup unexpired": + let + unexpired = PrivateKey.example(rng).toSignedPeerRecord() + + (await manager.add(nodeIds[0], unexpired, ttl = 1.minutes)).tryGet + + await sleepAsync(500.millis) + await manager.store.cleanupExpired() + + let + unexpiredProvs = (await manager.get(nodeIds[0])).tryGet + + check: + unexpiredProvs.len == 1 + await (unexpired.data.peerId in manager) + + (await manager.remove(nodeIds[0])).tryGet + test "Should cleanup orphaned": for id in nodeIds: check: (await manager.get(id)).tryGet.len == 0