Persist Dht providers (#257)

* providers store integration
This commit is contained in:
Dmitriy Ryajov 2022-09-29 22:16:59 -04:00 committed by GitHub
parent 90c818bdda
commit 69bd359287
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 38 additions and 496 deletions

View File

@ -125,11 +125,14 @@ proc new*(T: type CodexServer, config: CodexConf): T =
let let
discoveryBootstrapNodes = config.bootstrapNodes discoveryBootstrapNodes = config.bootstrapNodes
discoveryStore = Datastore(SQLiteDatastore.new(
config.dataDir / "dht")
.expect("Should not fail!"))
blockDiscovery = Discovery.new( blockDiscovery = Discovery.new(
switch.peerInfo, switch.peerInfo,
discoveryPort = config.discoveryPort, discoveryPort = config.discoveryPort,
bootstrapNodes = discoveryBootstrapNodes bootstrapNodes = discoveryBootstrapNodes,
) store = discoveryStore)
wallet = WalletRef.new(EthPrivateKey.random()) wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch) network = BlockExcNetwork.new(switch)
@ -141,7 +144,7 @@ proc new*(T: type CodexServer, config: CodexConf): T =
msg: "Unable to create data directory for block store: " & repoDir) msg: "Unable to create data directory for block store: " & repoDir)
let let
localStore = SQLiteStore.new(repoDir, cache = cache) localStore = FSStore.new(repoDir, cache = cache)
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)
@ -161,5 +164,4 @@ proc new*(T: type CodexServer, config: CodexConf): T =
T( T(
config: config, config: config,
codexNode: codexNode, codexNode: codexNode,
restServer: restServer, restServer: restServer)
)

View File

@ -39,7 +39,8 @@ proc new*(
localInfo: PeerInfo, localInfo: PeerInfo,
discoveryPort = 0.Port, discoveryPort = 0.Port,
bootstrapNodes: seq[SignedPeerRecord] = @[], bootstrapNodes: seq[SignedPeerRecord] = @[],
): T = store: Datastore = SQLiteDatastore.new(Memory)
.expect("Should not fail!")): T =
T( T(
protocol: newProtocol( protocol: newProtocol(
@ -47,8 +48,8 @@ proc new*(
bindPort = discoveryPort, bindPort = discoveryPort,
record = localInfo.signedPeerRecord, record = localInfo.signedPeerRecord,
bootstrapRecords = bootstrapNodes, bootstrapRecords = bootstrapNodes,
rng = Rng.instance() rng = Rng.instance(),
), providers = ProvidersManager.new(store)),
localInfo: localInfo) localInfo: localInfo)
proc toNodeId*(cid: Cid): NodeId = proc toNodeId*(cid: Cid): NodeId =

19
codex/namespaces.nim Normal file
View File

@ -0,0 +1,19 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/os
const
CodexRepoNamespace* = "/repo" # repository namespace, blocks and manifests are subkeys
CodexBlocksNamespace* = CodexRepoNamespace / "blocks" # blocks namespace
CodexManifestNamespace* = CodexRepoNamespace / "manifests" # manifest namespace
CodexBlocksPersistNamespace* = # Cid's of persisted blocks goes here
CodexMetaNamespace / "blocks" / "persist"
CodexBlocksTtlNamespace* = # Cid TTL
CodexMetaNamespace / "blocks" / "ttl"

View File

@ -1,8 +1,6 @@
import ./stores/[ import ./stores/cachestore
cachestore, import ./stores/blockstore
blockstore, import ./stores/networkstore
networkstore, import ./stores/fsstore
fsstore,
sqlitestore]
export cachestore, blockstore, networkstore, fsstore, sqlitestore export cachestore, blockstore, networkstore, fsstore

View File

@ -1,255 +0,0 @@
## Nim-Codex
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/upraises
push: {.upraises: [].}
import pkg/chronos
import pkg/chronicles
import pkg/datastore/sqlite_datastore
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import pkg/sqlite3_abi
import ./blockstore
import ./cachestore
export blockstore, sqlite_datastore
logScope:
topics = "codex sqlitestore"
type
ListBlocksQueryResponse = string
ListBlocksQueryStmt = SQLiteStmt[(string), void]
SQLiteStore* = ref object of BlockStore
cache: BlockStore
datastore: SQLiteDatastore
const
listBlocksQueryStmtStr = """
SELECT """ & idColName & """ FROM """ & tableName & """;
"""
listBlocksQueryStmtIdCol = 0
proc new*(
T: type SQLiteStore,
repoDir: string,
cache: BlockStore = CacheStore.new()): T =
let
datastoreRes = SQLiteDatastore.new(repoDir)
if datastoreRes.isErr:
raise (ref Defect)(msg: datastoreRes.error.msg)
T(cache: cache, datastore: datastoreRes.get)
proc datastore*(self: SQLiteStore): SQLiteDatastore =
self.datastore
proc blockKey*(blockCid: Cid): ?!Key =
let
keyRes = Key.init($blockCid)
if keyRes.isErr:
trace "Unable to construct CID from key", cid = blockCid, error = keyRes.error.msg
keyRes
method getBlock*(
self: SQLiteStore,
cid: Cid): Future[?!Block] {.async.} =
## Get a block from the cache or database.
## Save a copy to the cache if present in the database but not in the cache
##
if not self.cache.isNil:
trace "Getting block from cache or database", cid
else:
trace "Getting block from database", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success cid.emptyBlock
if not self.cache.isNil:
let
cachedBlockRes = await self.cache.getBlock(cid)
if cachedBlockRes.isOk:
return success cachedBlockRes.get
else:
trace "Unable to read block from cache", cid, error = cachedBlockRes.error.msg
without blkKey =? blockKey(cid), error:
return failure error
without dataOpt =? await self.datastore.get(blkKey), error:
trace "Error requesting block from database", key = blkKey.id, error = error.msg
return failure error
without data =? dataOpt:
return failure (ref BlockNotFoundError)(msg: "Block not in database")
without blk =? Block.new(cid, data), error:
trace "Unable to construct block from data", cid, error = error.msg
return failure error
if not self.cache.isNil:
let
putCachedRes = await self.cache.putBlock(blk)
if putCachedRes.isErr:
trace "Unable to store block in cache", cid, error = putCachedRes.error.msg
return success blk
method putBlock*(
self: SQLiteStore,
blk: Block): Future[?!void] {.async.} =
## Write a block's contents to the database with key based on blk.cid.
## Save a copy to the cache
##
if not self.cache.isNil:
trace "Putting block into database and cache", cid = blk.cid
else:
trace "Putting block into database", cid = blk.cid
if blk.isEmpty:
trace "Empty block, ignoring"
return success()
without blkKey =? blockKey(blk.cid), error:
return failure error
let
putRes = await self.datastore.put(blkKey, blk.data)
if putRes.isErr:
trace "Unable to store block in database", key = blkKey.id, error = putRes.error.msg
return failure putRes.error
if not self.cache.isNil:
let
putCachedRes = await self.cache.putBlock(blk)
if putCachedRes.isErr:
trace "Unable to store block in cache", cid = blk.cid, error = putCachedRes.error.msg
return success()
method delBlock*(
self: SQLiteStore,
cid: Cid): Future[?!void] {.async.} =
## Delete a block from the cache and database
##
if not self.cache.isNil:
trace "Deleting block from cache and database", cid
else:
trace "Deleting block from database", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success()
if not self.cache.isNil:
let
delCachedRes = await self.cache.delBlock(cid)
if delCachedRes.isErr:
trace "Unable to delete block from cache", cid, error = delCachedRes.error.msg
without blkKey =? blockKey(cid), error:
return failure error
let
delRes = await self.datastore.delete(blkKey)
if delRes.isErr:
trace "Unable to delete block from database", key = blkKey.id, error = delRes.error.msg
return failure delRes.error
return success()
method hasBlock*(
self: SQLiteStore,
cid: Cid): Future[?!bool] {.async.} =
## Check if a block exists in the database
##
trace "Checking database for block existence", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return true.success
without blkKey =? blockKey(cid), error:
return failure error
return await self.datastore.contains(blkKey)
iterator listBlocksQuery(self: SQLiteStore): ListBlocksQueryResponse =
without listBlocksQueryStmt =? ListBlocksQueryStmt.prepare(self.datastore.env,
listBlocksQueryStmtStr), error:
raise (ref Defect)(msg: error.msg)
let
s = RawStmtPtr(listBlocksQueryStmt)
defer:
discard sqlite3_reset(s)
s.dispose
while true:
let
v = sqlite3_step(s)
case v
of SQLITE_ROW:
yield $sqlite3_column_text_not_null(s, listBlocksQueryStmtIdCol)
of SQLITE_DONE:
break
else:
raise (ref Defect)(msg: $sqlite3_errstr(v))
method listBlocks*(
self: SQLiteStore,
onBlock: OnBlock): Future[?!void] {.async.} =
## Process list of all blocks in the database via callback.
## This is an intensive operation
##
for id in self.listBlocksQuery():
let
# keys stored in id column of SQLiteDatastore are serialized Key
# instances that start with "/", so drop the first character
cidRes = Cid.init(id[1..^1])
if cidRes.isOk:
await onBlock(cidRes.get)
else:
trace "Unable to construct CID from key", key = id, error = $cidRes.error
return success()
method close*(self: SQLiteStore): Future[void] {.async.} =
## Close the underlying cache and SQLite datastore
##
if not self.cache.isNil: await self.cache.close
if not self.datastore.isNil: self.datastore.close

View File

@ -1,222 +0,0 @@
import std/oids
import std/options
import std/os
import std/random
import std/sequtils
import std/sets
import pkg/asynctest
import pkg/chronos
import pkg/stew/byteutils
import pkg/codex/chunker
import pkg/codex/blocktype as bt
import pkg/codex/stores
import ../helpers
proc runSuite(cache: bool) =
suite "SQLite Store " & (if cache: "(cache enabled)" else: "(cache disabled)"):
randomize()
var
store: SQLiteStore
let
repoDir = getAppDir() / "repo"
proc randomBlock(): bt.Block =
let
blockRes = bt.Block.new(($genOid()).toBytes)
require(blockRes.isOk)
blockRes.get
var
newBlock: bt.Block
setup:
removeDir(repoDir)
require(not dirExists(repoDir))
createDir(repoDir)
if cache:
store = SQLiteStore.new(repoDir)
else:
store = SQLiteStore.new(repoDir, cache = nil)
newBlock = randomBlock()
teardown:
if not store.isNil: await store.close
store = nil
removeDir(repoDir)
require(not dirExists(repoDir))
test "putBlock":
let
blkKeyRes = blockKey(newBlock.cid)
assert blkKeyRes.isOk
let
blkKey = blkKeyRes.get
var
# bypass enabled cache
containsRes = await store.datastore.contains(blkKey)
assert containsRes.isOk
assert not containsRes.get
let
putRes = await store.putBlock(newBlock)
check: putRes.isOk
# bypass enabled cache
containsRes = await store.datastore.contains(blkKey)
assert containsRes.isOk
check: containsRes.get
test "getBlock":
var
r = rand(100)
# put `r` number of random blocks before putting newBlock
if r > 0:
for _ in 0..r:
let
b = randomBlock()
kRes = blockKey(b.cid)
assert kRes.isOk
let
# bypass enabled cache
pRes = await store.datastore.put(kRes.get, b.data)
assert pRes.isOk
let
blkKeyRes = blockKey(newBlock.cid)
assert blkKeyRes.isOk
var
# bypass enabled cache
putRes = await store.datastore.put(blkKeyRes.get, newBlock.data)
assert putRes.isOk
r = rand(100)
# put `r` number of random blocks after putting newBlock
if r > 0:
for _ in 0..r:
let
b = randomBlock()
kRes = blockKey(b.cid)
assert kRes.isOk
let
# bypass enabled cache
pRes = await store.datastore.put(kRes.get, b.data)
assert pRes.isOk
var
# get from database
getRes = await store.getBlock(newBlock.cid)
check:
getRes.isOk
getRes.get == newBlock
# get from enabled cache
getRes = await store.getBlock(newBlock.cid)
check:
getRes.isOk
getRes.get == newBlock
test "fail getBlock":
let
blkRes = await store.getBlock(newBlock.cid)
check:
blkRes.isErr
blkRes.error of BlockNotFoundError
test "hasBlock":
let
putRes = await store.putBlock(newBlock)
assert putRes.isOk
let
hasRes = await store.hasBlock(newBlock.cid)
check:
hasRes.isOk
hasRes.get
await newBlock.cid in store
test "fail hasBlock":
let
hasRes = await store.hasBlock(newBlock.cid)
check:
hasRes.isOk
not hasRes.get
not (await newBlock.cid in store)
test "listBlocks":
var
newBlocks: seq[bt.Block]
for _ in 0..99:
let
b = randomBlock()
pRes = await store.putBlock(b)
assert pRes.isOk
newBlocks.add(b)
var
called = 0
cids = toHashSet(newBlocks.mapIt(it.cid))
let
onBlock = proc(cid: Cid) {.async, gcsafe.} =
check: cid in cids
if cid in cids:
inc called
cids.excl(cid)
listRes = await store.listBlocks(onBlock)
check:
listRes.isOk
called == newBlocks.len
test "delBlock":
let
putRes = await store.putBlock(newBlock)
assert putRes.isOk
assert (await newBlock.cid in store)
let
delRes = await store.delBlock(newBlock.cid)
check:
delRes.isOk
not (await newBlock.cid in store)
runSuite(cache = true)
runSuite(cache = false)

View File

@ -1,5 +1,4 @@
import ./stores/testcachestore import ./stores/testcachestore
import ./stores/testfsstore import ./stores/testfsstore
import ./stores/testsqlitestore
{.warning[UnusedImport]: off.} {.warning[UnusedImport]: off.}

2
vendor/asynctest vendored

@ -1 +1 @@
Subproject commit 5347c59b4b057443a014722aa40800cd8bb95c69 Subproject commit a236a5f0f3031573ac2cb082b63dbf6e170e06e7

@ -1 +1 @@
Subproject commit 2769ce1de21e595e712fd1df7a195c34cd5d18de Subproject commit f5dadd93be77bdd84e95b56e109636a51b979d0f

@ -1 +1 @@
Subproject commit 39c0ffc970bc40d7f9f6282fd037b6bf621ffc5c Subproject commit 69ae7c2012d5ae89eab5ed3d7813daedba4018d9