mirror of
https://github.com/logos-storage/logos-storage-nim-dht.git
synced 2026-01-05 23:13:10 +00:00
Adding maintenance routines to cleanup expired and orphaned records (#52)
* add maintenance routines and split out cache * start provider manager * export provider manager * add maintenance tests * start is now async
This commit is contained in:
parent
69ae7c2012
commit
4b9fa0356e
@ -75,17 +75,30 @@
|
|||||||
|
|
||||||
import
|
import
|
||||||
std/[tables, sets, options, math, sequtils, algorithm, strutils],
|
std/[tables, sets, options, math, sequtils, algorithm, strutils],
|
||||||
stew/shims/net as stewNet, json_serialization/std/net,
|
stew/shims/net as stewNet,
|
||||||
stew/[base64, endians2, results], chronicles, chronicles/chronos_tools, chronos, chronos/timer, stint, bearssl,
|
json_serialization/std/net,
|
||||||
metrics,
|
stew/[base64, endians2, results],
|
||||||
libp2p/[crypto/crypto, routing_record],
|
pkg/[chronicles, chronicles/chronos_tools],
|
||||||
transport, messages, messages_encoding, node,
|
pkg/chronos,
|
||||||
routing_table, spr, random2, ip_vote, nodes_verification,
|
pkg/stint,
|
||||||
providersmngr
|
pkg/bearssl,
|
||||||
|
pkg/metrics
|
||||||
|
|
||||||
|
import "."/[
|
||||||
|
messages,
|
||||||
|
messages_encoding,
|
||||||
|
node,
|
||||||
|
routing_table,
|
||||||
|
spr,
|
||||||
|
random2,
|
||||||
|
ip_vote,
|
||||||
|
nodes_verification,
|
||||||
|
providers,
|
||||||
|
transport]
|
||||||
|
|
||||||
import nimcrypto except toHex
|
import nimcrypto except toHex
|
||||||
|
|
||||||
export options, results, node, spr, providersmngr
|
export options, results, node, spr, providers
|
||||||
|
|
||||||
declareCounter discovery_message_requests_outgoing,
|
declareCounter discovery_message_requests_outgoing,
|
||||||
"Discovery protocol outgoing message requests", labels = ["response"]
|
"Discovery protocol outgoing message requests", labels = ["response"]
|
||||||
@ -710,7 +723,7 @@ proc sendGetProviders(d: Protocol, toNode: Node,
|
|||||||
resp = await d.waitMessage(toNode, reqId)
|
resp = await d.waitMessage(toNode, reqId)
|
||||||
|
|
||||||
if resp.isSome():
|
if resp.isSome():
|
||||||
if resp.get().kind == providers:
|
if resp.get().kind == MessageKind.providers:
|
||||||
d.routingTable.setJustSeen(toNode)
|
d.routingTable.setJustSeen(toNode)
|
||||||
return ok(resp.get().provs)
|
return ok(resp.get().provs)
|
||||||
else:
|
else:
|
||||||
@ -740,7 +753,7 @@ proc getProviders*(
|
|||||||
d: Protocol,
|
d: Protocol,
|
||||||
cId: NodeId,
|
cId: NodeId,
|
||||||
maxitems: int = 5,
|
maxitems: int = 5,
|
||||||
timeout: timer.Duration = chronos.milliseconds(5000)
|
timeout: Duration = 5000.milliseconds
|
||||||
): Future[DiscResult[seq[SignedPeerRecord]]] {.async.} =
|
): Future[DiscResult[seq[SignedPeerRecord]]] {.async.} =
|
||||||
|
|
||||||
# What providers do we know about?
|
# What providers do we know about?
|
||||||
@ -1121,23 +1134,12 @@ proc open*(d: Protocol) {.raises: [Defect, CatchableError].} =
|
|||||||
|
|
||||||
d.seedTable()
|
d.seedTable()
|
||||||
|
|
||||||
proc start*(d: Protocol) =
|
proc start*(d: Protocol) {.async.} =
|
||||||
d.refreshLoop = refreshLoop(d)
|
d.refreshLoop = refreshLoop(d)
|
||||||
d.revalidateLoop = revalidateLoop(d)
|
d.revalidateLoop = revalidateLoop(d)
|
||||||
d.ipMajorityLoop = ipMajorityLoop(d)
|
d.ipMajorityLoop = ipMajorityLoop(d)
|
||||||
|
|
||||||
proc close*(d: Protocol) =
|
await d.providers.start()
|
||||||
doAssert(not d.transport.closed)
|
|
||||||
|
|
||||||
debug "Closing discovery node", node = d.localNode
|
|
||||||
if not d.revalidateLoop.isNil:
|
|
||||||
d.revalidateLoop.cancel()
|
|
||||||
if not d.refreshLoop.isNil:
|
|
||||||
d.refreshLoop.cancel()
|
|
||||||
if not d.ipMajorityLoop.isNil:
|
|
||||||
d.ipMajorityLoop.cancel()
|
|
||||||
|
|
||||||
d.transport.close()
|
|
||||||
|
|
||||||
proc closeWait*(d: Protocol) {.async.} =
|
proc closeWait*(d: Protocol) {.async.} =
|
||||||
doAssert(not d.transport.closed)
|
doAssert(not d.transport.closed)
|
||||||
|
|||||||
5
libp2pdht/private/eth/p2p/discoveryv5/providers.nim
Normal file
5
libp2pdht/private/eth/p2p/discoveryv5/providers.nim
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
import ./providers/cache
|
||||||
|
import ./providers/maintenance
|
||||||
|
import ./providers/manager
|
||||||
|
|
||||||
|
export cache, maintenance, manager
|
||||||
108
libp2pdht/private/eth/p2p/discoveryv5/providers/cache.nim
Normal file
108
libp2pdht/private/eth/p2p/discoveryv5/providers/cache.nim
Normal file
@ -0,0 +1,108 @@
|
|||||||
|
# codex-dht - Codex DHT
|
||||||
|
# Copyright (c) 2022 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import std/sequtils
|
||||||
|
|
||||||
|
import pkg/chronicles
|
||||||
|
import pkg/libp2p
|
||||||
|
|
||||||
|
import ../node
|
||||||
|
import ../lru
|
||||||
|
import ./common
|
||||||
|
|
||||||
|
const
|
||||||
|
MaxProvidersEntries* = 1000'u # one thousand records
|
||||||
|
MaxProvidersPerEntry* = 200'u # providers per entry
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "discv5 providers cache"
|
||||||
|
|
||||||
|
type
|
||||||
|
Providers* = LRUCache[PeerId, SignedPeerRecord]
|
||||||
|
ItemsCache* = LRUCache[NodeId, Providers]
|
||||||
|
|
||||||
|
ProvidersCache* = object
|
||||||
|
disable: bool
|
||||||
|
cache*: ItemsCache
|
||||||
|
maxProviders*: int
|
||||||
|
|
||||||
|
func add*(
|
||||||
|
self: var ProvidersCache,
|
||||||
|
id: NodeId,
|
||||||
|
provider: SignedPeerRecord) =
|
||||||
|
|
||||||
|
if self.disable:
|
||||||
|
return
|
||||||
|
|
||||||
|
var providers =
|
||||||
|
if id notin self.cache:
|
||||||
|
Providers.init(self.maxProviders.int)
|
||||||
|
else:
|
||||||
|
self.cache.get(id).get()
|
||||||
|
|
||||||
|
let
|
||||||
|
peerId = provider.data.peerId
|
||||||
|
|
||||||
|
trace "Adding provider to cache", id, peerId
|
||||||
|
providers.put(peerId, provider)
|
||||||
|
self.cache.put(id, providers)
|
||||||
|
|
||||||
|
proc get*(
|
||||||
|
self: var ProvidersCache,
|
||||||
|
id: NodeId,
|
||||||
|
start = 0,
|
||||||
|
stop = MaxProvidersPerEntry.int): seq[SignedPeerRecord] =
|
||||||
|
|
||||||
|
if self.disable:
|
||||||
|
return
|
||||||
|
|
||||||
|
if id in self.cache:
|
||||||
|
let
|
||||||
|
recs = self.cache.get(id).get
|
||||||
|
|
||||||
|
let
|
||||||
|
providers = toSeq(recs)[start..<min(recs.len, stop)]
|
||||||
|
|
||||||
|
trace "Providers already cached", id, len = providers.len
|
||||||
|
return providers
|
||||||
|
|
||||||
|
func remove*(
|
||||||
|
self: var ProvidersCache,
|
||||||
|
id: NodeId,
|
||||||
|
peerId: PeerId) =
|
||||||
|
|
||||||
|
if self.disable:
|
||||||
|
return
|
||||||
|
|
||||||
|
if id notin self.cache:
|
||||||
|
return
|
||||||
|
|
||||||
|
var
|
||||||
|
providers = self.cache.get(id).get()
|
||||||
|
|
||||||
|
trace "Removing provider from cache", id
|
||||||
|
providers.del(peerId)
|
||||||
|
self.cache.put(id, providers)
|
||||||
|
|
||||||
|
func drop*(self: var ProvidersCache, id: NodeId) =
|
||||||
|
if self.disable:
|
||||||
|
return
|
||||||
|
|
||||||
|
self.cache.del(id)
|
||||||
|
|
||||||
|
func init*(
|
||||||
|
T: type ProvidersCache,
|
||||||
|
size = MaxProvidersEntries,
|
||||||
|
maxProviders = MaxProvidersEntries,
|
||||||
|
disable = false): T =
|
||||||
|
|
||||||
|
T(
|
||||||
|
cache: ItemsCache.init(size.int),
|
||||||
|
maxProviders: maxProviders.int,
|
||||||
|
disable: disable)
|
||||||
58
libp2pdht/private/eth/p2p/discoveryv5/providers/common.nim
Normal file
58
libp2pdht/private/eth/p2p/discoveryv5/providers/common.nim
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
# codex-dht - Codex DHT
|
||||||
|
# Copyright (c) 2022 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import std/sequtils
|
||||||
|
import std/strutils
|
||||||
|
|
||||||
|
import pkg/chronos
|
||||||
|
import pkg/libp2p
|
||||||
|
import pkg/datastore
|
||||||
|
import pkg/questionable
|
||||||
|
import pkg/questionable/results
|
||||||
|
|
||||||
|
import ../node
|
||||||
|
|
||||||
|
export node, results
|
||||||
|
|
||||||
|
const
|
||||||
|
ProvidersKey* = Key.init("/providers").tryGet # keys is of the form /providers/peerid = provider
|
||||||
|
CidKey* = Key.init("/cids").tryGet # keys is of the form /cids/cid/peerid/ttl = ttl
|
||||||
|
|
||||||
|
ZeroMoment* = Moment.init(0, Nanosecond) # for conversion between Duration and Moment
|
||||||
|
|
||||||
|
proc mapFailure*[T](err: T): ref CatchableError =
|
||||||
|
newException(CatchableError, $err)
|
||||||
|
|
||||||
|
proc makeProviderKey*(peerId: PeerId): ?!Key =
|
||||||
|
(ProvidersKey / $peerId)
|
||||||
|
|
||||||
|
proc makeCidKey*(cid: NodeId, peerId: PeerId): ?!Key =
|
||||||
|
(CidKey / cid.toHex / $peerId / "ttl")
|
||||||
|
|
||||||
|
proc fromCidKey*(key: Key): ?!tuple[id: NodeId, peerId: PeerId] =
|
||||||
|
let
|
||||||
|
parts = key.id.split(datastore.Separator)
|
||||||
|
|
||||||
|
if parts.len == 5:
|
||||||
|
let
|
||||||
|
peerId = ?PeerId.init(parts[3]).mapErr(mapFailure)
|
||||||
|
id = ?NodeId.fromHex(parts[2]).catch
|
||||||
|
|
||||||
|
return success (id, peerId)
|
||||||
|
|
||||||
|
return failure("Unable to extract peer id from key")
|
||||||
|
|
||||||
|
proc fromProvKey*(key: Key): ?!PeerId =
|
||||||
|
let
|
||||||
|
parts = key.id.split(datastore.Separator)
|
||||||
|
|
||||||
|
if parts.len != 3:
|
||||||
|
return failure("Can't find peer id in key")
|
||||||
|
|
||||||
|
return success ?PeerId.init(parts[^1]).mapErr(mapFailure)
|
||||||
129
libp2pdht/private/eth/p2p/discoveryv5/providers/maintenance.nim
Normal file
129
libp2pdht/private/eth/p2p/discoveryv5/providers/maintenance.nim
Normal file
@ -0,0 +1,129 @@
|
|||||||
|
# codex-dht - Codex DHT
|
||||||
|
# Copyright (c) 2022 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
|
import std/options
|
||||||
|
import std/sequtils
|
||||||
|
|
||||||
|
import pkg/chronos
|
||||||
|
import pkg/libp2p
|
||||||
|
import pkg/datastore
|
||||||
|
import pkg/chronicles
|
||||||
|
import pkg/questionable
|
||||||
|
import pkg/questionable/results
|
||||||
|
|
||||||
|
import ./common
|
||||||
|
|
||||||
|
const
|
||||||
|
ExpiredCleanupBatch* = 1000
|
||||||
|
CleanupInterval* = 5.minutes
|
||||||
|
|
||||||
|
proc cleanupExpired*(
|
||||||
|
store: Datastore,
|
||||||
|
batchSize = ExpiredCleanupBatch) {.async.} =
|
||||||
|
trace "Cleaning up expired records"
|
||||||
|
|
||||||
|
let
|
||||||
|
now = Moment.now()
|
||||||
|
|
||||||
|
let
|
||||||
|
q = Query.init(CidKey)
|
||||||
|
|
||||||
|
block:
|
||||||
|
without iter =? (await store.query(q)), err:
|
||||||
|
trace "Unable to obtain record for key", err = err.msg
|
||||||
|
return
|
||||||
|
|
||||||
|
defer:
|
||||||
|
if not isNil(iter):
|
||||||
|
trace "Cleaning up query iterator"
|
||||||
|
discard (await iter.dispose())
|
||||||
|
|
||||||
|
var
|
||||||
|
keys = newSeq[Key]()
|
||||||
|
|
||||||
|
for item in iter:
|
||||||
|
if pair =? (await item) and pair.key.isSome:
|
||||||
|
let
|
||||||
|
(key, data) = (pair.key.get(), pair.data)
|
||||||
|
expired = Moment.init(uint64.fromBytesBE(data).int64, Microsecond)
|
||||||
|
|
||||||
|
if now >= expired:
|
||||||
|
trace "Found expired record", key
|
||||||
|
keys.add(key)
|
||||||
|
without pairs =? key.fromCidKey(), err:
|
||||||
|
trace "Error extracting parts from cid key", key
|
||||||
|
continue
|
||||||
|
|
||||||
|
if keys.len >= batchSize:
|
||||||
|
break
|
||||||
|
|
||||||
|
if err =? (await store.delete(keys)).errorOption:
|
||||||
|
trace "Error cleaning up batch, records left intact!", size = keys.len, err = err.msg
|
||||||
|
|
||||||
|
trace "Cleaned up expired records", size = keys.len
|
||||||
|
|
||||||
|
proc cleanupOrphaned*(
|
||||||
|
store: Datastore,
|
||||||
|
batchSize = ExpiredCleanupBatch) {.async.} =
|
||||||
|
trace "Cleaning up orphaned records"
|
||||||
|
|
||||||
|
let
|
||||||
|
providersQuery = Query.init(ProvidersKey)
|
||||||
|
|
||||||
|
block:
|
||||||
|
without iter =? (await store.query(providersQuery)), err:
|
||||||
|
trace "Unable to obtain record for key"
|
||||||
|
return
|
||||||
|
|
||||||
|
defer:
|
||||||
|
if not isNil(iter):
|
||||||
|
trace "Cleaning up query iterator"
|
||||||
|
discard (await iter.dispose())
|
||||||
|
|
||||||
|
var count = 0
|
||||||
|
for item in iter:
|
||||||
|
if count >= batchSize:
|
||||||
|
trace "Batch cleaned up", size = batchSize
|
||||||
|
|
||||||
|
count.inc
|
||||||
|
if pair =? (await item) and pair.key.isSome:
|
||||||
|
let
|
||||||
|
key = pair.key.get()
|
||||||
|
|
||||||
|
without peerId =? key.fromProvKey(), err:
|
||||||
|
trace "Error extracting parts from cid key", key
|
||||||
|
continue
|
||||||
|
|
||||||
|
without cidKey =? (CidKey / "*" / $peerId), err:
|
||||||
|
trace "Error building cid key", err = err.msg
|
||||||
|
continue
|
||||||
|
|
||||||
|
without cidIter =? (await store.query(Query.init(cidKey, limit = 1))), err:
|
||||||
|
trace "Error querying key", cidKey
|
||||||
|
continue
|
||||||
|
|
||||||
|
let
|
||||||
|
res = (await allFinished(toSeq(cidIter)))
|
||||||
|
.filterIt( it.completed )
|
||||||
|
.mapIt( it.read.get )
|
||||||
|
.filterIt( it.key.isSome ).len
|
||||||
|
|
||||||
|
if not isNil(cidIter):
|
||||||
|
trace "Disposing cid iter"
|
||||||
|
discard (await cidIter.dispose())
|
||||||
|
|
||||||
|
if res > 0:
|
||||||
|
trace "Peer not orphaned, skipping", peerId
|
||||||
|
continue
|
||||||
|
|
||||||
|
if err =? (await store.delete(key)).errorOption:
|
||||||
|
trace "Error deleting orphaned peer", err = err.msg
|
||||||
|
continue
|
||||||
|
|
||||||
|
trace "Cleaned up orphaned peer", peerId
|
||||||
@ -14,122 +14,37 @@ import pkg/libp2p
|
|||||||
import pkg/chronicles
|
import pkg/chronicles
|
||||||
import pkg/stew/results as rs
|
import pkg/stew/results as rs
|
||||||
import pkg/stew/byteutils
|
import pkg/stew/byteutils
|
||||||
|
import pkg/questionable
|
||||||
import pkg/questionable/results
|
import pkg/questionable/results
|
||||||
|
|
||||||
{.push raises: [Defect].}
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
import ./lru
|
import ./maintenance
|
||||||
import ./node
|
import ./cache
|
||||||
|
import ./common
|
||||||
|
import ../spr
|
||||||
|
|
||||||
export node, lru, datastore
|
export cache, datastore
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "discv5 providers manager"
|
topics = "discv5 providers manager"
|
||||||
|
|
||||||
const
|
const
|
||||||
DefaultProviderTTL = 24.hours
|
DefaultProviderTTL* = 24.hours
|
||||||
|
|
||||||
ProvidersKey* = Key.init("/providers").tryGet # keys is of the form /providers/peerid = provider
|
|
||||||
CidKey* = Key.init("/cids").tryGet # keys is of the form /cids/cid/peerid/ttl = ttl
|
|
||||||
|
|
||||||
MaxProvidersEntries* = 1000'u # one thousand records
|
|
||||||
MaxProvidersPerEntry* = 200'u # providers per entry
|
|
||||||
|
|
||||||
ZeroMoment = Moment.init(0, Nanosecond) # for conversion between Duration and Moment
|
|
||||||
|
|
||||||
type
|
type
|
||||||
ProvidersCache* = LRUCache[PeerId, SignedPeerRecord]
|
|
||||||
ItemsCache* = LRUCache[NodeId, ProvidersCache]
|
|
||||||
|
|
||||||
ProvidersManager* = ref object of RootObj
|
ProvidersManager* = ref object of RootObj
|
||||||
store*: Datastore
|
store*: Datastore
|
||||||
providers*: ItemsCache
|
cache*: ProvidersCache
|
||||||
ttl*: Duration
|
ttl*: Duration
|
||||||
maxItems*: uint
|
maxItems*: uint
|
||||||
maxProviders*: uint
|
maxProviders*: uint
|
||||||
disableCache*: bool
|
disableCache*: bool
|
||||||
|
expiredLoop*: Future[void]
|
||||||
proc mapFailure[T](err: T): ref CatchableError =
|
orphanedLoop*: Future[void]
|
||||||
newException(CatchableError, $err)
|
started*: bool
|
||||||
|
batchSize*: int
|
||||||
proc makeProviderKey(peerId: PeerId): ?!Key =
|
cleanupInterval*: Duration
|
||||||
(ProvidersKey / $peerId)
|
|
||||||
|
|
||||||
proc makeCidKey(cid: NodeId, peerId: PeerId): ?!Key =
|
|
||||||
(CidKey / cid.toHex / $peerId / "ttl")
|
|
||||||
|
|
||||||
proc peerIdFromCidKey(key: string): ?!PeerId =
|
|
||||||
let
|
|
||||||
parts = key.split("/")
|
|
||||||
|
|
||||||
if parts.len == 5:
|
|
||||||
return PeerId.init(parts[3]).mapErr(mapFailure)
|
|
||||||
|
|
||||||
return failure("Unable to extract peer id from key")
|
|
||||||
|
|
||||||
func addCache*(
|
|
||||||
self: ProvidersManager,
|
|
||||||
cid: NodeId,
|
|
||||||
provider: SignedPeerRecord) =
|
|
||||||
|
|
||||||
if self.disableCache:
|
|
||||||
return
|
|
||||||
|
|
||||||
var providers =
|
|
||||||
if cid notin self.providers:
|
|
||||||
ProvidersCache.init(self.maxProviders.int)
|
|
||||||
else:
|
|
||||||
self.providers.get(cid).get()
|
|
||||||
|
|
||||||
let
|
|
||||||
peerId = provider.data.peerId
|
|
||||||
|
|
||||||
trace "Adding provider to cache", cid, peerId
|
|
||||||
providers.put(peerId, provider)
|
|
||||||
self.providers.put(cid, providers)
|
|
||||||
|
|
||||||
func getCache*(
|
|
||||||
self: ProvidersManager,
|
|
||||||
cid: NodeId,
|
|
||||||
limit = MaxProvidersPerEntry.int): seq[SignedPeerRecord] =
|
|
||||||
|
|
||||||
if self.disableCache:
|
|
||||||
return
|
|
||||||
|
|
||||||
if cid in self.providers:
|
|
||||||
let
|
|
||||||
recs = self.providers.get(cid).get
|
|
||||||
providers = toSeq(recs)[0..<min(recs.len, limit)]
|
|
||||||
|
|
||||||
trace "Providers already cached", cid, len = providers.len
|
|
||||||
return providers
|
|
||||||
|
|
||||||
func removeCache*(
|
|
||||||
self: ProvidersManager,
|
|
||||||
cid: NodeId,
|
|
||||||
peerId: PeerId) =
|
|
||||||
|
|
||||||
if self.disableCache:
|
|
||||||
return
|
|
||||||
|
|
||||||
if cid notin self.providers:
|
|
||||||
return
|
|
||||||
|
|
||||||
var
|
|
||||||
providers = self.providers.get(cid).get()
|
|
||||||
|
|
||||||
trace "Removing provider from cache", cid
|
|
||||||
providers.del(peerId)
|
|
||||||
self.providers.put(cid, providers)
|
|
||||||
|
|
||||||
proc decode(
|
|
||||||
self: ProvidersManager,
|
|
||||||
bytes: seq[byte]): ?!SignedPeerRecord =
|
|
||||||
|
|
||||||
let
|
|
||||||
provider = ?SignedPeerRecord.decode(bytes).mapErr(mapFailure)
|
|
||||||
|
|
||||||
return success provider
|
|
||||||
|
|
||||||
proc getProvByKey*(self: ProvidersManager, key: Key): Future[?!SignedPeerRecord] {.async.} =
|
proc getProvByKey*(self: ProvidersManager, key: Key): Future[?!SignedPeerRecord] {.async.} =
|
||||||
|
|
||||||
@ -137,7 +52,7 @@ proc getProvByKey*(self: ProvidersManager, key: Key): Future[?!SignedPeerRecord]
|
|||||||
trace "No provider in store"
|
trace "No provider in store"
|
||||||
return failure("No no provider in store")
|
return failure("No no provider in store")
|
||||||
|
|
||||||
return self.decode(bytes)
|
return SignedPeerRecord.decode(bytes).mapErr(mapFailure)
|
||||||
|
|
||||||
proc add*(
|
proc add*(
|
||||||
self: ProvidersManager,
|
self: ProvidersManager,
|
||||||
@ -187,60 +102,65 @@ proc add*(
|
|||||||
trace "Unable to store provider with key", key = cidKey, err = err.msg
|
trace "Unable to store provider with key", key = cidKey, err = err.msg
|
||||||
return
|
return
|
||||||
|
|
||||||
self.addCache(cid, provider)
|
self.cache.add(cid, provider)
|
||||||
|
|
||||||
trace "Provider for cid added", cidKey, provKey
|
trace "Provider for cid added", cidKey, provKey
|
||||||
return success()
|
return success()
|
||||||
|
|
||||||
proc get*(
|
proc get*(
|
||||||
self: ProvidersManager,
|
self: ProvidersManager,
|
||||||
id: NodeId,
|
id: NodeId,
|
||||||
limit = MaxProvidersPerEntry.int): Future[?!seq[SignedPeerRecord]] {.async.} =
|
start = 0,
|
||||||
trace "Retrieving providers from persistent store", cid = id
|
stop = MaxProvidersPerEntry.int): Future[?!seq[SignedPeerRecord]] {.async.} =
|
||||||
|
trace "Retrieving providers from persistent store", id
|
||||||
|
|
||||||
|
let
|
||||||
|
provs = self.cache.get(id, start = start, stop = stop)
|
||||||
|
|
||||||
let provs = self.getCache(id, limit)
|
|
||||||
if provs.len > 0:
|
if provs.len > 0:
|
||||||
return success provs
|
return success provs
|
||||||
|
|
||||||
without cidKey =? (CidKey / id.toHex), err:
|
without cidKey =? (CidKey / id.toHex), err:
|
||||||
return failure err.msg
|
return failure err.msg
|
||||||
|
|
||||||
without cidIter =?
|
trace "Querying providers from persistent store", id, key = cidKey
|
||||||
(await self.store.query(Query.init(cidKey, limit = limit))), err:
|
|
||||||
return failure err.msg
|
|
||||||
|
|
||||||
defer:
|
|
||||||
discard (await cidIter.dispose())
|
|
||||||
|
|
||||||
trace "Querying providers from persistent store", cid = id, key = cidKey
|
|
||||||
var
|
var
|
||||||
providers: seq[SignedPeerRecord]
|
providers: seq[SignedPeerRecord]
|
||||||
|
|
||||||
for item in cidIter:
|
block:
|
||||||
# TODO: =? doesn't support tuples
|
without cidIter =?
|
||||||
if pair =? (await item) and pair.key.isSome:
|
(await self.store.query(Query.init(cidKey, offset = start, limit = stop))), err:
|
||||||
let
|
return failure err.msg
|
||||||
(key, val) = (pair.key.get, pair.data)
|
|
||||||
|
|
||||||
without peerId =? key.id.peerIdFromCidKey() and
|
defer:
|
||||||
provKey =? makeProviderKey(peerId), err:
|
if not isNil(cidIter):
|
||||||
trace "Error creating key from provider record", err = err.msg
|
trace "Cleaning up query iterator"
|
||||||
continue
|
discard (await cidIter.dispose())
|
||||||
|
|
||||||
trace "Querying provider key", key = provKey
|
for item in cidIter:
|
||||||
without data =? (await self.store.get(provKey)):
|
# TODO: =? doesn't support tuples
|
||||||
trace "Error getting provider", key = provKey
|
if pair =? (await item) and pair.key.isSome:
|
||||||
continue
|
let
|
||||||
|
(key, val) = (pair.key.get, pair.data)
|
||||||
|
|
||||||
without provider =? self.decode(data), err:
|
without pairs =? key.fromCidKey() and
|
||||||
trace "Unable to decode provider from store", err = err.msg
|
provKey =? makeProviderKey(pairs.peerId), err:
|
||||||
continue
|
trace "Error creating key from provider record", err = err.msg
|
||||||
|
continue
|
||||||
|
|
||||||
trace "Retrieved provider with key", key = provKey
|
trace "Querying provider key", key = provKey
|
||||||
providers.add(provider)
|
without data =? (await self.store.get(provKey)):
|
||||||
self.addCache(id, provider)
|
trace "Error getting provider", key = provKey
|
||||||
|
continue
|
||||||
|
|
||||||
trace "Retrieved providers from persistent store", cid = id, len = providers.len
|
without provider =? SignedPeerRecord.decode(data).mapErr(mapFailure), err:
|
||||||
|
trace "Unable to decode provider from store", err = err.msg
|
||||||
|
continue
|
||||||
|
|
||||||
|
trace "Retrieved provider with key", key = provKey
|
||||||
|
providers.add(provider)
|
||||||
|
self.cache.add(id, provider)
|
||||||
|
|
||||||
|
trace "Retrieved providers from persistent store", cid = id, len = providers.len
|
||||||
return success providers
|
return success providers
|
||||||
|
|
||||||
proc contains*(
|
proc contains*(
|
||||||
@ -265,38 +185,40 @@ proc contains*(self: ProvidersManager, cid: NodeId): Future[bool] {.async.} =
|
|||||||
let
|
let
|
||||||
q = Query.init(cidKey, limit = 1)
|
q = Query.init(cidKey, limit = 1)
|
||||||
|
|
||||||
without iter =? (await self.store.query(q)), err:
|
block:
|
||||||
trace "Unable to obtain record for key", key = cidKey
|
without iter =? (await self.store.query(q)), err:
|
||||||
return false
|
trace "Unable to obtain record for key", key = cidKey
|
||||||
|
return false
|
||||||
|
|
||||||
defer:
|
defer:
|
||||||
trace "Cleaning up query iterator"
|
if not isNil(iter):
|
||||||
discard (await iter.dispose())
|
trace "Cleaning up query iterator"
|
||||||
|
discard (await iter.dispose())
|
||||||
|
|
||||||
for item in iter:
|
for item in iter:
|
||||||
if pair =? (await item) and pair.key.isSome:
|
if pair =? (await item) and pair.key.isSome:
|
||||||
return true
|
return true
|
||||||
|
|
||||||
return false
|
return false
|
||||||
|
|
||||||
proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} =
|
proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} =
|
||||||
if cid in self.providers:
|
|
||||||
self.providers.del(cid)
|
|
||||||
|
|
||||||
|
self.cache.drop(cid)
|
||||||
without cidKey =? (CidKey / $cid), err:
|
without cidKey =? (CidKey / $cid), err:
|
||||||
return failure(err.msg)
|
return failure(err.msg)
|
||||||
|
|
||||||
let
|
let
|
||||||
q = Query.init(cidKey)
|
q = Query.init(cidKey)
|
||||||
|
|
||||||
without iter =? (await self.store.query(q)), err:
|
|
||||||
trace "Unable to obtain record for key", key = cidKey
|
|
||||||
return failure(err.msg)
|
|
||||||
|
|
||||||
block:
|
block:
|
||||||
|
without iter =? (await self.store.query(q)), err:
|
||||||
|
trace "Unable to obtain record for key", key = cidKey
|
||||||
|
return failure(err.msg)
|
||||||
|
|
||||||
defer:
|
defer:
|
||||||
trace "Cleaning up query iterator"
|
if not isNil(iter):
|
||||||
discard (await iter.dispose())
|
trace "Cleaning up query iterator"
|
||||||
|
discard (await iter.dispose())
|
||||||
|
|
||||||
for item in iter:
|
for item in iter:
|
||||||
if pair =? (await item) and pair.key.isSome:
|
if pair =? (await item) and pair.key.isSome:
|
||||||
@ -305,11 +227,11 @@ proc remove*(self: ProvidersManager, cid: NodeId): Future[?!void] {.async.} =
|
|||||||
trace "Error deleting record from persistent store", err = err.msg
|
trace "Error deleting record from persistent store", err = err.msg
|
||||||
continue
|
continue
|
||||||
|
|
||||||
without peerId =? key.id.peerIdFromCidKey, err:
|
without pairs =? key.fromCidKey, err:
|
||||||
trace "Unable to parse peer id from key", key
|
trace "Unable to parse peer id from key", key
|
||||||
continue
|
continue
|
||||||
|
|
||||||
self.removeCache(cid, peerId)
|
self.cache.remove(cid, pairs.peerId)
|
||||||
trace "Deleted record from store", key
|
trace "Deleted record from store", key
|
||||||
|
|
||||||
return success()
|
return success()
|
||||||
@ -321,14 +243,15 @@ proc remove*(self: ProvidersManager, peerId: PeerId): Future[?!void] {.async.} =
|
|||||||
let
|
let
|
||||||
q = Query.init(cidKey)
|
q = Query.init(cidKey)
|
||||||
|
|
||||||
without iter =? (await self.store.query(q)), err:
|
|
||||||
trace "Unable to obtain record for key", key = cidKey
|
|
||||||
return failure(err.msg)
|
|
||||||
|
|
||||||
block:
|
block:
|
||||||
|
without iter =? (await self.store.query(q)), err:
|
||||||
|
trace "Unable to obtain record for key", key = cidKey
|
||||||
|
return failure(err.msg)
|
||||||
|
|
||||||
defer:
|
defer:
|
||||||
trace "Cleaning up query iterator"
|
if not isNil(iter):
|
||||||
discard (await iter.dispose())
|
trace "Cleaning up query iterator"
|
||||||
|
discard (await iter.dispose())
|
||||||
|
|
||||||
for item in iter:
|
for item in iter:
|
||||||
if pair =? (await item) and pair.key.isSome:
|
if pair =? (await item) and pair.key.isSome:
|
||||||
@ -344,7 +267,7 @@ proc remove*(self: ProvidersManager, peerId: PeerId): Future[?!void] {.async.} =
|
|||||||
let
|
let
|
||||||
parts = key.id.split(datastore.Separator)
|
parts = key.id.split(datastore.Separator)
|
||||||
|
|
||||||
self.removeCache(NodeId.fromHex(parts[2]), peerId)
|
self.cache.remove(NodeId.fromHex(parts[2]), peerId)
|
||||||
|
|
||||||
without provKey =? makeProviderKey(peerId), err:
|
without provKey =? makeProviderKey(peerId), err:
|
||||||
return failure(err.msg)
|
return failure(err.msg)
|
||||||
@ -357,31 +280,64 @@ proc remove*(
|
|||||||
cid: NodeId,
|
cid: NodeId,
|
||||||
peerId: PeerId): Future[?!void] {.async.} =
|
peerId: PeerId): Future[?!void] {.async.} =
|
||||||
|
|
||||||
self.removeCache(cid, peerId)
|
self.cache.remove(cid, peerId)
|
||||||
|
|
||||||
without cidKey =? makeCidKey(cid, peerId), err:
|
without cidKey =? makeCidKey(cid, peerId), err:
|
||||||
trace "Error creating key from content id", err = err.msg
|
trace "Error creating key from content id", err = err.msg
|
||||||
return failure err.msg
|
return failure err.msg
|
||||||
|
|
||||||
return (await self.store.delete(cidKey))
|
return (await self.store.delete(cidKey))
|
||||||
|
|
||||||
|
proc cleanupExpiredLoop(self: ProvidersManager) {.async.} =
|
||||||
|
try:
|
||||||
|
while self.started:
|
||||||
|
await self.store.cleanupExpired(self.batchSize)
|
||||||
|
await sleepAsync(self.cleanupInterval)
|
||||||
|
except CancelledError as exc:
|
||||||
|
trace "Cancelled expired cleanup job", err = exc.msg
|
||||||
|
except CatchableError as exc:
|
||||||
|
trace "Exception in expired cleanup job", err = exc.msg
|
||||||
|
raiseAssert "Exception in expired cleanup job"
|
||||||
|
|
||||||
|
proc cleanupOrphanedLoop(self: ProvidersManager) {.async.} =
|
||||||
|
try:
|
||||||
|
while self.started:
|
||||||
|
await self.store.cleanupOrphaned(self.batchSize)
|
||||||
|
await sleepAsync(self.cleanupInterval)
|
||||||
|
except CancelledError as exc:
|
||||||
|
trace "Cancelled orphaned cleanup job", err = exc.msg
|
||||||
|
except CatchableError as exc:
|
||||||
|
trace "Exception in orphaned cleanup job", err = exc.msg
|
||||||
|
raiseAssert "Exception in orphaned cleanup job"
|
||||||
|
|
||||||
|
proc start*(self: ProvidersManager) {.async.} =
|
||||||
|
self.started = true
|
||||||
|
self.expiredLoop = self.cleanupExpiredLoop
|
||||||
|
self.orphanedLoop = self.cleanupOrphanedLoop
|
||||||
|
|
||||||
|
proc stop*(self: ProvidersManager) {.async.} =
|
||||||
|
await self.expiredLoop.cancelAndWait()
|
||||||
|
await self.orphanedLoop.cancelAndWait()
|
||||||
|
self.started = false
|
||||||
|
|
||||||
func new*(
|
func new*(
|
||||||
T: type ProvidersManager,
|
T: type ProvidersManager,
|
||||||
store: Datastore,
|
store: Datastore,
|
||||||
disableCache = false,
|
disableCache = false,
|
||||||
ttl = DefaultProviderTTL,
|
ttl = DefaultProviderTTL,
|
||||||
maxItems = MaxProvidersEntries,
|
maxItems = MaxProvidersEntries,
|
||||||
maxProviders = MaxProvidersPerEntry): T =
|
maxProviders = MaxProvidersPerEntry,
|
||||||
|
batchSize = ExpiredCleanupBatch,
|
||||||
|
cleanupInterval = CleanupInterval): T =
|
||||||
|
|
||||||
var
|
T(
|
||||||
self = T(
|
store: store,
|
||||||
store: store,
|
ttl: ttl,
|
||||||
ttl: ttl,
|
maxItems: maxItems,
|
||||||
maxItems: maxItems,
|
maxProviders: maxProviders,
|
||||||
maxProviders: maxProviders,
|
disableCache: disableCache,
|
||||||
disableCache: disableCache)
|
batchSize: batchSize,
|
||||||
|
cleanupInterval: cleanupInterval,
|
||||||
if not disableCache:
|
cache: ProvidersCache.init(
|
||||||
self.providers = ItemsCache.init(maxItems.int)
|
size = maxItems,
|
||||||
|
maxProviders = maxProviders,
|
||||||
self
|
disable = disableCache))
|
||||||
@ -8,8 +8,9 @@ import pkg/libp2p
|
|||||||
|
|
||||||
import libp2pdht/dht
|
import libp2pdht/dht
|
||||||
import libp2pdht/private/eth/p2p/discoveryv5/spr
|
import libp2pdht/private/eth/p2p/discoveryv5/spr
|
||||||
import libp2pdht/private/eth/p2p/discoveryv5/providersmngr
|
import libp2pdht/private/eth/p2p/discoveryv5/providers
|
||||||
import libp2pdht/discv5/node
|
import libp2pdht/discv5/node
|
||||||
|
import libp2pdht/private/eth/p2p/discoveryv5/lru
|
||||||
import ./test_helper
|
import ./test_helper
|
||||||
|
|
||||||
suite "Test Providers Manager simple":
|
suite "Test Providers Manager simple":
|
||||||
@ -154,13 +155,13 @@ suite "Test providers with cache":
|
|||||||
(await (manager.remove(nodeIds[99]))).tryGet
|
(await (manager.remove(nodeIds[99]))).tryGet
|
||||||
|
|
||||||
check:
|
check:
|
||||||
nodeIds[0] notin manager.providers
|
nodeIds[0] notin manager.cache.cache
|
||||||
not (await manager.contains(nodeIds[0]))
|
not (await manager.contains(nodeIds[0]))
|
||||||
|
|
||||||
nodeIds[49] notin manager.providers
|
nodeIds[49] notin manager.cache.cache
|
||||||
not (await manager.contains(nodeIds[49]))
|
not (await manager.contains(nodeIds[49]))
|
||||||
|
|
||||||
nodeIds[99] notin manager.providers
|
nodeIds[99] notin manager.cache.cache
|
||||||
not (await manager.contains(nodeIds[99]))
|
not (await manager.contains(nodeIds[99]))
|
||||||
|
|
||||||
test "Should remove by PeerId":
|
test "Should remove by PeerId":
|
||||||
@ -170,16 +171,59 @@ suite "Test providers with cache":
|
|||||||
|
|
||||||
for id in nodeIds:
|
for id in nodeIds:
|
||||||
check:
|
check:
|
||||||
providers[0].data.peerId notin manager.providers.get(id).get
|
providers[0].data.peerId notin manager.cache.cache.get(id).get
|
||||||
not (await manager.contains(id, providers[0].data.peerId))
|
not (await manager.contains(id, providers[0].data.peerId))
|
||||||
|
|
||||||
providers[5].data.peerId notin manager.providers.get(id).get
|
providers[5].data.peerId notin manager.cache.cache.get(id).get
|
||||||
not (await manager.contains(id, providers[5].data.peerId))
|
not (await manager.contains(id, providers[5].data.peerId))
|
||||||
|
|
||||||
providers[9].data.peerId notin manager.providers.get(id).get
|
providers[9].data.peerId notin manager.cache.cache.get(id).get
|
||||||
not (await manager.contains(id, providers[9].data.peerId))
|
not (await manager.contains(id, providers[9].data.peerId))
|
||||||
|
|
||||||
check:
|
check:
|
||||||
not (await manager.contains(providers[0].data.peerId))
|
not (await manager.contains(providers[0].data.peerId))
|
||||||
not (await manager.contains(providers[5].data.peerId))
|
not (await manager.contains(providers[5].data.peerId))
|
||||||
not (await manager.contains(providers[9].data.peerId))
|
not (await manager.contains(providers[9].data.peerId))
|
||||||
|
|
||||||
|
suite "Test Provider Maintenance":
|
||||||
|
let
|
||||||
|
rng = newRng()
|
||||||
|
privKeys = (0..<10).mapIt( PrivateKey.example(rng) )
|
||||||
|
providers = privKeys.mapIt( it.toSignedPeerRecord() )
|
||||||
|
nodeIds = (0..<100).mapIt( NodeId.example(rng) )
|
||||||
|
|
||||||
|
var
|
||||||
|
ds: SQLiteDatastore
|
||||||
|
manager: ProvidersManager
|
||||||
|
|
||||||
|
setupAll:
|
||||||
|
ds = SQLiteDatastore.new(Memory).tryGet()
|
||||||
|
manager = ProvidersManager.new(ds, disableCache = true)
|
||||||
|
|
||||||
|
for id in nodeIds:
|
||||||
|
for p in providers:
|
||||||
|
(await manager.add(id, p, ttl = 1.millis)).tryGet
|
||||||
|
|
||||||
|
teardownAll:
|
||||||
|
(await ds.close()).tryGet()
|
||||||
|
ds = nil
|
||||||
|
manager = nil
|
||||||
|
|
||||||
|
test "Should cleanup expired":
|
||||||
|
for id in nodeIds:
|
||||||
|
check: (await manager.get(id)).tryGet.len == 10
|
||||||
|
|
||||||
|
await sleepAsync(500.millis)
|
||||||
|
await manager.store.cleanupExpired()
|
||||||
|
|
||||||
|
for id in nodeIds:
|
||||||
|
check: (await manager.get(id)).tryGet.len == 0
|
||||||
|
|
||||||
|
test "Should cleanup orphaned":
|
||||||
|
for id in nodeIds:
|
||||||
|
check: (await manager.get(id)).tryGet.len == 0
|
||||||
|
|
||||||
|
await manager.store.cleanupOrphaned()
|
||||||
|
|
||||||
|
for p in providers:
|
||||||
|
check: not (await manager.contains(p.data.peerId))
|
||||||
|
|||||||
@ -35,7 +35,7 @@ proc bootstrapNodes(
|
|||||||
for i in 0..<nodecount:
|
for i in 0..<nodecount:
|
||||||
let privKey = PrivateKey.example(rng)
|
let privKey = PrivateKey.example(rng)
|
||||||
let node = initDiscoveryNode(rng, privKey, localAddress(20302 + i), bootnodes)
|
let node = initDiscoveryNode(rng, privKey, localAddress(20302 + i), bootnodes)
|
||||||
node.start()
|
await node.start()
|
||||||
result.add((node, privKey))
|
result.add((node, privKey))
|
||||||
if delay > 0:
|
if delay > 0:
|
||||||
await sleepAsync(chronos.milliseconds(delay))
|
await sleepAsync(chronos.milliseconds(delay))
|
||||||
|
|||||||
@ -293,7 +293,7 @@ suite "Discovery v5 Tests":
|
|||||||
|
|
||||||
let bootNode =
|
let bootNode =
|
||||||
initDiscoveryNode(rng, PrivateKey.example(rng), localAddress(20301))
|
initDiscoveryNode(rng, PrivateKey.example(rng), localAddress(20301))
|
||||||
bootNode.start()
|
await bootNode.start()
|
||||||
|
|
||||||
var nodes = newSeqOfCap[discv5_protocol.Protocol](nodeCount)
|
var nodes = newSeqOfCap[discv5_protocol.Protocol](nodeCount)
|
||||||
nodes.add(bootNode)
|
nodes.add(bootNode)
|
||||||
@ -312,7 +312,7 @@ suite "Discovery v5 Tests":
|
|||||||
# check (await n.ping(t.localNode)).isOk()
|
# check (await n.ping(t.localNode)).isOk()
|
||||||
|
|
||||||
for i in 1 ..< nodeCount:
|
for i in 1 ..< nodeCount:
|
||||||
nodes[i].start()
|
await nodes[i].start()
|
||||||
|
|
||||||
for i in 0..<nodeCount-1:
|
for i in 0..<nodeCount-1:
|
||||||
let target = nodes[i]
|
let target = nodes[i]
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user