Fix timeout and delete (#86)

* use unix time for ttl

* don't remove all entries on peer removal

* cleanup questionable tuple destructure

* ignore vscode

* fix endians decoding

* allow removing by peerId

* invalidate cache by peerId on remove

* update test
This commit is contained in:
Dmitriy Ryajov 2023-11-17 16:01:16 -06:00 committed by GitHub
parent 91b2eaec89
commit dd4985435a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 163 additions and 97 deletions

2
.gitignore vendored
View File

@ -12,4 +12,4 @@ vendor/*
NimBinaries NimBinaries
.update.timestamp .update.timestamp
*.dSYM *.dSYM
*.vscode/* .vscode/*

View File

@ -55,3 +55,10 @@ iterator items*[K, V](lru: LRUCache[K, V]): V =
for item in lru.list: for item in lru.list:
yield item[1] yield item[1]
iterator keys*[K, V](lru: LRUCache[K, V]): K =
## Get cached keys - this doesn't touch the cache
##
for item in lru.table.keys:
yield item

View File

@ -11,6 +11,7 @@ import std/sequtils
import pkg/chronicles import pkg/chronicles
import pkg/libp2p import pkg/libp2p
import pkg/questionable
import ../node import ../node
import ../lru import ../lru
@ -35,22 +36,21 @@ type
func add*( func add*(
self: var ProvidersCache, self: var ProvidersCache,
id: NodeId, id: NodeId,
provider: SignedPeerRecord) = record: SignedPeerRecord) =
## Add providers for an id
## to the cache
if self.disable: if self.disable:
return return
var providers = without var providers =? self.cache.get(id):
if id notin self.cache: providers = Providers.init(self.maxProviders.int)
Providers.init(self.maxProviders.int)
else:
self.cache.get(id).get()
let let
peerId = provider.data.peerId peerId = record.data.peerId
trace "Adding provider to cache", id, peerId trace "Adding provider record to cache", id, peerId
providers.put(peerId, provider) providers.put(peerId, record)
self.cache.put(id, providers) self.cache.put(id, providers)
proc get*( proc get*(
@ -58,14 +58,13 @@ proc get*(
id: NodeId, id: NodeId,
start = 0, start = 0,
stop = MaxProvidersPerEntry.int): seq[SignedPeerRecord] = stop = MaxProvidersPerEntry.int): seq[SignedPeerRecord] =
## Get providers for an id
## from the cache
if self.disable: if self.disable:
return return
if id in self.cache: if recs =? self.cache.get(id):
let
recs = self.cache.get(id).get
let let
providers = toSeq(recs)[start..<min(recs.len, stop)] providers = toSeq(recs)[start..<min(recs.len, stop)]
@ -74,23 +73,40 @@ proc get*(
func remove*( func remove*(
self: var ProvidersCache, self: var ProvidersCache,
id: NodeId,
peerId: PeerId) = peerId: PeerId) =
## Remove a provider record from an id
## from the cache
##
if self.disable: if self.disable:
return return
if id notin self.cache: for id in self.cache.keys:
if var providers =? self.cache.get(id):
trace "Removing provider from cache", id, peerId
providers.del(peerId)
self.cache.put(id, providers)
func remove*(
self: var ProvidersCache,
id: NodeId,
peerId: PeerId) =
## Remove a provider record from an id
## from the cache
##
if self.disable:
return return
var if var providers =? self.cache.get(id):
providers = self.cache.get(id).get() trace "Removing record from cache", id
providers.del(peerId)
trace "Removing provider from cache", id self.cache.put(id, providers)
providers.del(peerId)
self.cache.put(id, providers)
func drop*(self: var ProvidersCache, id: NodeId) = func drop*(self: var ProvidersCache, id: NodeId) =
## Drop all the providers for an entry
##
if self.disable: if self.disable:
return return

View File

@ -9,6 +9,7 @@
import std/options import std/options
import std/sequtils import std/sequtils
from std/times import now, utc, toTime, toUnix
import pkg/stew/endians2 import pkg/stew/endians2
import pkg/chronos import pkg/chronos
@ -29,9 +30,6 @@ proc cleanupExpired*(
batchSize = ExpiredCleanupBatch) {.async.} = batchSize = ExpiredCleanupBatch) {.async.} =
trace "Cleaning up expired records" trace "Cleaning up expired records"
let
now = Moment.now()
let let
q = Query.init(CidKey, limit = batchSize) q = Query.init(CidKey, limit = batchSize)
@ -48,15 +46,18 @@ proc cleanupExpired*(
var var
keys = newSeq[Key]() keys = newSeq[Key]()
let
now = times.now().utc().toTime().toUnix()
for item in iter: for item in iter:
if (key, data) =? (await item) and key.isSome: if (maybeKey, data) =? (await item) and key =? maybeKey:
let let
expired = Moment.init(uint64.fromBytesBE(data).int64, Microsecond) expired = endians2.fromBytesBE(uint64, data).int64
if now >= expired: if now >= expired:
trace "Found expired record", key trace "Found expired record", key
keys.add(key.get) keys.add(key)
without pairs =? key.get.fromCidKey(), err: without pairs =? key.fromCidKey(), err:
trace "Error extracting parts from cid key", key trace "Error extracting parts from cid key", key
continue continue
@ -92,9 +93,9 @@ proc cleanupOrphaned*(
trace "Batch cleaned up", size = batchSize trace "Batch cleaned up", size = batchSize
count.inc count.inc
if (key, _) =? (await item) and key.isSome: if (maybeKey, _) =? (await item) and key =? maybeKey:
without peerId =? key.get.fromProvKey(), err: without peerId =? key.fromProvKey(), err:
trace "Error extracting parts from cid key", key = key.get trace "Error extracting parts from cid key", key
continue continue
without cidKey =? (CidKey / "*" / $peerId), err: without cidKey =? (CidKey / "*" / $peerId), err:
@ -121,7 +122,7 @@ proc cleanupOrphaned*(
trace "Peer not orphaned, skipping", peerId trace "Peer not orphaned, skipping", peerId
continue continue
if err =? (await store.delete(key.get)).errorOption: if err =? (await store.delete(key)).errorOption:
trace "Error deleting orphaned peer", err = err.msg trace "Error deleting orphaned peer", err = err.msg
continue continue

View File

@ -7,6 +7,7 @@
import std/sequtils import std/sequtils
import std/strutils import std/strutils
from std/times import now, utc, toTime, toUnix
import pkg/stew/endians2 import pkg/stew/endians2
import pkg/datastore import pkg/datastore
@ -57,30 +58,30 @@ proc getProvByKey*(self: ProvidersManager, key: Key): Future[?!SignedPeerRecord]
proc add*( proc add*(
self: ProvidersManager, self: ProvidersManager,
cid: NodeId, id: NodeId,
provider: SignedPeerRecord, provider: SignedPeerRecord,
ttl = ZeroDuration): Future[?!void] {.async.} = ttl = ZeroDuration): Future[?!void] {.async.} =
let let
peerId = provider.data.peerId peerId = provider.data.peerId
trace "Adding provider to persistent store", cid, peerId trace "Adding provider to persistent store", id, peerId
without provKey =? makeProviderKey(peerId), err: without provKey =? makeProviderKey(peerId), err:
trace "Error creating key from provider record", err = err.msg trace "Error creating key from provider record", err = err.msg
return failure err.msg return failure err.msg
without cidKey =? makeCidKey(cid, peerId), err: without cidKey =? makeCidKey(id, peerId), err:
trace "Error creating key from content id", err = err.msg trace "Error creating key from content id", err = err.msg
return failure err.msg return failure err.msg
let let
now = times.now().utc().toTime().toUnix()
expires = expires =
if ttl > ZeroDuration: if ttl > ZeroDuration:
ttl ttl.seconds + now
else: else:
Moment.fromNow(self.ttl) - ZeroMoment self.ttl.seconds + now
ttl = endians2.toBytesBE(expires.uint64)
ttl = endians2.toBytesBE(expires.microseconds.uint64)
bytes: seq[byte] = bytes: seq[byte] =
if existing =? (await self.getProvByKey(provKey)) and if existing =? (await self.getProvByKey(provKey)) and
@ -94,17 +95,17 @@ proc add*(
bytes bytes
if bytes.len > 0: if bytes.len > 0:
trace "Adding or updating provider record", cid, peerId trace "Adding or updating provider record", id, peerId
if err =? (await self.store.put(provKey, bytes)).errorOption: if err =? (await self.store.put(provKey, bytes)).errorOption:
trace "Unable to store provider with key", key = provKey, err = err.msg trace "Unable to store provider with key", key = provKey, err = err.msg
trace "Adding or updating cid", cid, key = cidKey, ttl = expires.minutes trace "Adding or updating id", id, key = cidKey, ttl = expires.seconds
if err =? (await self.store.put(cidKey, @ttl)).errorOption: if err =? (await self.store.put(cidKey, @ttl)).errorOption:
trace "Unable to store provider with key", key = cidKey, err = err.msg trace "Unable to store provider with key", key = cidKey, err = err.msg
return return
self.cache.add(cid, provider) self.cache.add(id, provider)
trace "Provider for cid added", cidKey, provKey trace "Provider for id added", cidKey, provKey
return success() return success()
proc get*( proc get*(
@ -137,12 +138,10 @@ proc get*(
trace "Cleaning up query iterator" trace "Cleaning up query iterator"
discard (await cidIter.dispose()) discard (await cidIter.dispose())
var keys: seq[Key]
for item in cidIter: for item in cidIter:
# TODO: =? doesn't support tuples # TODO: =? doesn't support tuples
if pair =? (await item) and pair.key.isSome: if (maybeKey, val) =? (await item) and key =? maybeKey:
let
(key, val) = (pair.key.get, pair.data)
without pairs =? key.fromCidKey() and without pairs =? key.fromCidKey() and
provKey =? makeProviderKey(pairs.peerId), err: provKey =? makeProviderKey(pairs.peerId), err:
trace "Error creating key from provider record", err = err.msg trace "Error creating key from provider record", err = err.msg
@ -151,17 +150,24 @@ proc get*(
trace "Querying provider key", key = provKey trace "Querying provider key", key = provKey
without data =? (await self.store.get(provKey)): without data =? (await self.store.get(provKey)):
trace "Error getting provider", key = provKey trace "Error getting provider", key = provKey
keys.add(key)
continue continue
without provider =? SignedPeerRecord.decode(data).mapErr(mapFailure), err: without provider =? SignedPeerRecord.decode(data).mapErr(mapFailure), err:
trace "Unable to decode provider from store", err = err.msg trace "Unable to decode provider from store", err = err.msg
keys.add(key)
continue continue
trace "Retrieved provider with key", key = provKey trace "Retrieved provider with key", key = provKey
providers.add(provider) providers.add(provider)
self.cache.add(id, provider) self.cache.add(id, provider)
trace "Retrieved providers from persistent store", cid = id, len = providers.len trace "Deleting keys without provider from store", len = keys.len
if keys.len > 0 and err =? (await self.store.delete(keys)).errorOption:
trace "Error deleting records from persistent store", err = err.msg
return failure err
trace "Retrieved providers from persistent store", id = id, len = providers.len
return success providers return success providers
proc contains*( proc contains*(
@ -179,8 +185,8 @@ proc contains*(self: ProvidersManager, peerId: PeerId): Future[bool] {.async.} =
return (await self.store.has(provKey)) |? false return (await self.store.has(provKey)) |? false
proc contains*(self: ProvidersManager, cid: NodeId): Future[bool] {.async.} = proc contains*(self: ProvidersManager, id: NodeId): Future[bool] {.async.} =
without cidKey =? (CidKey / $cid), err: without cidKey =? (CidKey / $id), err:
return false return false
let let
@ -197,15 +203,15 @@ proc contains*(self: ProvidersManager, cid: NodeId): Future[bool] {.async.} =
discard (await iter.dispose()) discard (await iter.dispose())
for item in iter: for item in iter:
if pair =? (await item) and pair.key.isSome: if (key, _) =? (await item) and key.isSome:
return true return true
return false return false
proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} = proc remove*(self: ProvidersManager, id: NodeId): Future[?!void] {.async.} =
self.cache.drop(cid) self.cache.drop(id)
without cidKey =? (CidKey / $cid), err: without cidKey =? (CidKey / $id), err:
return failure(err.msg) return failure(err.msg)
let let
@ -225,16 +231,14 @@ proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} =
keys: seq[Key] keys: seq[Key]
for item in iter: for item in iter:
if pair =? (await item) and pair.key.isSome: if (maybeKey, _) =? (await item) and key =? maybeKey:
let
key = pair.key.get()
keys.add(key) keys.add(key)
without pairs =? key.fromCidKey, err: without pairs =? key.fromCidKey, err:
trace "Unable to parse peer id from key", key trace "Unable to parse peer id from key", key
return failure err return failure err
self.cache.remove(cid, pairs.peerId) self.cache.remove(id, pairs.peerId)
trace "Deleted record from store", key trace "Deleted record from store", key
if keys.len > 0 and err =? (await self.store.delete(keys)).errorOption: if keys.len > 0 and err =? (await self.store.delete(keys)).errorOption:
@ -243,57 +247,60 @@ proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} =
return success() return success()
proc remove*(self: ProvidersManager, peerId: PeerId): Future[?!void] {.async.} = proc remove*(
without cidKey =? (CidKey / "*" / $peerId), err: self: ProvidersManager,
return failure err peerId: PeerId,
entries = false): Future[?!void] {.async.} =
let if entries:
q = Query.init(cidKey) without cidKey =? (CidKey / "*" / $peerId), err:
block:
without iter =? (await self.store.query(q)), err:
trace "Unable to obtain record for key", key = cidKey
return failure err return failure err
defer: let
if not isNil(iter): q = Query.init(cidKey)
trace "Cleaning up query iterator"
discard (await iter.dispose())
var block:
keys: seq[Key] without iter =? (await self.store.query(q)), err:
trace "Unable to obtain record for key", key = cidKey
return failure err
for item in iter: defer:
if pair =? (await item) and pair.key.isSome: if not isNil(iter):
let trace "Cleaning up query iterator"
key = pair.key.get() discard (await iter.dispose())
keys.add(key) var
keys: seq[Key]
let for item in iter:
parts = key.id.split(datastore.Separator) if (maybeKey, _) =? (await item) and key =? maybeKey:
keys.add(key)
self.cache.remove(NodeId.fromHex(parts[2]), peerId) let
parts = key.id.split(datastore.Separator)
if keys.len > 0 and err =? (await self.store.delete(keys)).errorOption: if keys.len > 0 and err =? (await self.store.delete(keys)).errorOption:
trace "Error deleting record from persistent store", err = err.msg trace "Error deleting record from persistent store", err = err.msg
return failure err return failure err
trace "Deleted records from store" trace "Deleted records from store"
without provKey =? makeProviderKey(peerId), err: without provKey =? peerId.makeProviderKey, err:
return failure err return failure err
trace "Removing provider from cache", peerId
self.cache.remove(peerId)
trace "Removing provider record", key = provKey trace "Removing provider record", key = provKey
return (await self.store.delete(provKey)) return (await self.store.delete(provKey))
proc remove*( proc remove*(
self: ProvidersManager, self: ProvidersManager,
cid: NodeId, id: NodeId,
peerId: PeerId): Future[?!void] {.async.} = peerId: PeerId): Future[?!void] {.async.} =
self.cache.remove(cid, peerId) self.cache.remove(id, peerId)
without cidKey =? makeCidKey(cid, peerId), err: without cidKey =? makeCidKey(id, peerId), err:
trace "Error creating key from content id", err = err.msg trace "Error creating key from content id", err = err.msg
return failure err.msg return failure err.msg

View File

@ -4,6 +4,7 @@ import std/sequtils
import pkg/chronos import pkg/chronos
import pkg/asynctest import pkg/asynctest
import pkg/datastore import pkg/datastore
from pkg/libp2p import PeerId
import codexdht/dht import codexdht/dht
import codexdht/private/eth/p2p/discoveryv5/spr import codexdht/private/eth/p2p/discoveryv5/spr
@ -100,10 +101,10 @@ suite "Test Providers Manager multiple":
not (await manager.contains(nodeIds[49])) not (await manager.contains(nodeIds[49]))
not (await manager.contains(nodeIds[99])) not (await manager.contains(nodeIds[99]))
test "Should remove by PeerId": test "Should remove by PeerId with associated keys":
(await (manager.remove(providers[0].data.peerId))).tryGet (await (manager.remove(providers[0].data.peerId, true))).tryGet
(await (manager.remove(providers[5].data.peerId))).tryGet (await (manager.remove(providers[5].data.peerId, true))).tryGet
(await (manager.remove(providers[9].data.peerId))).tryGet (await (manager.remove(providers[9].data.peerId, true))).tryGet
for id in nodeIds: for id in nodeIds:
check: check:
@ -116,6 +117,22 @@ suite "Test Providers Manager multiple":
not (await manager.contains(providers[5].data.peerId)) not (await manager.contains(providers[5].data.peerId))
not (await manager.contains(providers[9].data.peerId)) not (await manager.contains(providers[9].data.peerId))
test "Should not return keys without provider":
for id in nodeIds:
check:
(await manager.get(id)).tryGet.len == 10
for provider in providers:
(await (manager.remove(provider.data.peerId))).tryGet
for id in nodeIds:
check:
(await manager.get(id)).tryGet.len == 0
for provider in providers:
check:
not (await manager.contains(provider.data.peerId))
suite "Test providers with cache": suite "Test providers with cache":
let let
rng = newRng() rng = newRng()
@ -164,9 +181,9 @@ suite "Test providers with cache":
not (await manager.contains(nodeIds[99])) not (await manager.contains(nodeIds[99]))
test "Should remove by PeerId": test "Should remove by PeerId":
(await (manager.remove(providers[0].data.peerId))).tryGet (await (manager.remove(providers[0].data.peerId, true))).tryGet
(await (manager.remove(providers[5].data.peerId))).tryGet (await (manager.remove(providers[5].data.peerId, true))).tryGet
(await (manager.remove(providers[9].data.peerId))).tryGet (await (manager.remove(providers[9].data.peerId, true))).tryGet
for id in nodeIds: for id in nodeIds:
check: check:
@ -218,6 +235,24 @@ suite "Test Provider Maintenance":
for id in nodeIds: for id in nodeIds:
check: (await manager.get(id)).tryGet.len == 0 check: (await manager.get(id)).tryGet.len == 0
test "Should not cleanup unexpired":
let
unexpired = PrivateKey.example(rng).toSignedPeerRecord()
(await manager.add(nodeIds[0], unexpired, ttl = 1.minutes)).tryGet
await sleepAsync(500.millis)
await manager.store.cleanupExpired()
let
unexpiredProvs = (await manager.get(nodeIds[0])).tryGet
check:
unexpiredProvs.len == 1
await (unexpired.data.peerId in manager)
(await manager.remove(nodeIds[0])).tryGet
test "Should cleanup orphaned": test "Should cleanup orphaned":
for id in nodeIds: for id in nodeIds:
check: (await manager.get(id)).tryGet.len == 0 check: (await manager.get(id)).tryGet.len == 0