From 69bd359287fc4a75cafb50e91464ec0997cb1b8a Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Thu, 29 Sep 2022 22:16:59 -0400 Subject: [PATCH] Persist Dht providers (#257) * providers store integration --- codex/codex.nim | 12 +- codex/discovery.nim | 7 +- codex/namespaces.nim | 19 ++ codex/stores.nim | 12 +- codex/stores/sqlitestore.nim | 255 ------------------------- tests/codex/stores/testsqlitestore.nim | 222 --------------------- tests/codex/teststores.nim | 1 - vendor/asynctest | 2 +- vendor/nim-datastore | 2 +- vendor/nim-libp2p-dht | 2 +- 10 files changed, 38 insertions(+), 496 deletions(-) create mode 100644 codex/namespaces.nim delete mode 100644 codex/stores/sqlitestore.nim delete mode 100644 tests/codex/stores/testsqlitestore.nim diff --git a/codex/codex.nim b/codex/codex.nim index 282c5e0f..458a63ff 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -125,11 +125,14 @@ proc new*(T: type CodexServer, config: CodexConf): T = let discoveryBootstrapNodes = config.bootstrapNodes + discoveryStore = Datastore(SQLiteDatastore.new( + config.dataDir / "dht") + .expect("Should not fail!")) blockDiscovery = Discovery.new( switch.peerInfo, discoveryPort = config.discoveryPort, - bootstrapNodes = discoveryBootstrapNodes - ) + bootstrapNodes = discoveryBootstrapNodes, + store = discoveryStore) wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) @@ -141,7 +144,7 @@ proc new*(T: type CodexServer, config: CodexConf): T = msg: "Unable to create data directory for block store: " & repoDir) let - localStore = SQLiteStore.new(repoDir, cache = cache) + localStore = FSStore.new(repoDir, cache = cache) peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) @@ -161,5 +164,4 @@ proc new*(T: type CodexServer, config: CodexConf): T = T( config: config, codexNode: codexNode, - restServer: restServer, - ) + restServer: restServer) diff --git a/codex/discovery.nim b/codex/discovery.nim index 3c46767d..71dd7d7b 100644 --- a/codex/discovery.nim +++ b/codex/discovery.nim @@ -39,7 +39,8 @@ proc new*( localInfo: PeerInfo, discoveryPort = 0.Port, bootstrapNodes: seq[SignedPeerRecord] = @[], - ): T = + store: Datastore = SQLiteDatastore.new(Memory) + .expect("Should not fail!")): T = T( protocol: newProtocol( @@ -47,8 +48,8 @@ proc new*( bindPort = discoveryPort, record = localInfo.signedPeerRecord, bootstrapRecords = bootstrapNodes, - rng = Rng.instance() - ), + rng = Rng.instance(), + providers = ProvidersManager.new(store)), localInfo: localInfo) proc toNodeId*(cid: Cid): NodeId = diff --git a/codex/namespaces.nim b/codex/namespaces.nim new file mode 100644 index 00000000..6d675a1b --- /dev/null +++ b/codex/namespaces.nim @@ -0,0 +1,19 @@ +## Nim-Codex +## Copyright (c) 2022 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import std/os + +const + CodexRepoNamespace* = "/repo" # repository namespace, blocks and manifests are subkeys + CodexBlocksNamespace* = CodexRepoNamespace / "blocks" # blocks namespace + CodexManifestNamespace* = CodexRepoNamespace / "manifests" # manifest namespace + CodexBlocksPersistNamespace* = # Cid's of persisted blocks goes here + CodexMetaNamespace / "blocks" / "persist" + CodexBlocksTtlNamespace* = # Cid TTL + CodexMetaNamespace / "blocks" / "ttl" diff --git a/codex/stores.nim b/codex/stores.nim index c84e806c..5b18f1f2 100644 --- a/codex/stores.nim +++ b/codex/stores.nim @@ -1,8 +1,6 @@ -import ./stores/[ - cachestore, - blockstore, - networkstore, - fsstore, - sqlitestore] +import ./stores/cachestore +import ./stores/blockstore +import ./stores/networkstore +import ./stores/fsstore -export cachestore, blockstore, networkstore, fsstore, sqlitestore +export cachestore, blockstore, networkstore, fsstore diff --git a/codex/stores/sqlitestore.nim b/codex/stores/sqlitestore.nim deleted file mode 100644 index 7dae2e6c..00000000 --- a/codex/stores/sqlitestore.nim +++ /dev/null @@ -1,255 +0,0 @@ -## Nim-Codex -## Copyright (c) 2021 Status Research & Development GmbH -## Licensed under either of -## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) -## * MIT license ([LICENSE-MIT](LICENSE-MIT)) -## at your option. -## This file may not be copied, modified, or distributed except according to -## those terms. - -import pkg/upraises - -push: {.upraises: [].} - -import pkg/chronos -import pkg/chronicles -import pkg/datastore/sqlite_datastore -import pkg/libp2p -import pkg/questionable -import pkg/questionable/results -import pkg/sqlite3_abi - -import ./blockstore -import ./cachestore - -export blockstore, sqlite_datastore - -logScope: - topics = "codex sqlitestore" - -type - ListBlocksQueryResponse = string - - ListBlocksQueryStmt = SQLiteStmt[(string), void] - - SQLiteStore* = ref object of BlockStore - cache: BlockStore - datastore: SQLiteDatastore - -const - listBlocksQueryStmtStr = """ - SELECT """ & idColName & """ FROM """ & tableName & """; - """ - - listBlocksQueryStmtIdCol = 0 - -proc new*( - T: type SQLiteStore, - repoDir: string, - cache: BlockStore = CacheStore.new()): T = - - let - datastoreRes = SQLiteDatastore.new(repoDir) - - if datastoreRes.isErr: - raise (ref Defect)(msg: datastoreRes.error.msg) - - T(cache: cache, datastore: datastoreRes.get) - -proc datastore*(self: SQLiteStore): SQLiteDatastore = - self.datastore - -proc blockKey*(blockCid: Cid): ?!Key = - let - keyRes = Key.init($blockCid) - - if keyRes.isErr: - trace "Unable to construct CID from key", cid = blockCid, error = keyRes.error.msg - - keyRes - -method getBlock*( - self: SQLiteStore, - cid: Cid): Future[?!Block] {.async.} = - ## Get a block from the cache or database. - ## Save a copy to the cache if present in the database but not in the cache - ## - - if not self.cache.isNil: - trace "Getting block from cache or database", cid - else: - trace "Getting block from database", cid - - if cid.isEmpty: - trace "Empty block, ignoring" - return success cid.emptyBlock - - if not self.cache.isNil: - let - cachedBlockRes = await self.cache.getBlock(cid) - - if cachedBlockRes.isOk: - return success cachedBlockRes.get - else: - trace "Unable to read block from cache", cid, error = cachedBlockRes.error.msg - - without blkKey =? blockKey(cid), error: - return failure error - - without dataOpt =? await self.datastore.get(blkKey), error: - trace "Error requesting block from database", key = blkKey.id, error = error.msg - return failure error - - without data =? dataOpt: - return failure (ref BlockNotFoundError)(msg: "Block not in database") - - without blk =? Block.new(cid, data), error: - trace "Unable to construct block from data", cid, error = error.msg - return failure error - - if not self.cache.isNil: - let - putCachedRes = await self.cache.putBlock(blk) - - if putCachedRes.isErr: - trace "Unable to store block in cache", cid, error = putCachedRes.error.msg - - return success blk - -method putBlock*( - self: SQLiteStore, - blk: Block): Future[?!void] {.async.} = - ## Write a block's contents to the database with key based on blk.cid. - ## Save a copy to the cache - ## - - if not self.cache.isNil: - trace "Putting block into database and cache", cid = blk.cid - else: - trace "Putting block into database", cid = blk.cid - - if blk.isEmpty: - trace "Empty block, ignoring" - return success() - - without blkKey =? blockKey(blk.cid), error: - return failure error - - let - putRes = await self.datastore.put(blkKey, blk.data) - - if putRes.isErr: - trace "Unable to store block in database", key = blkKey.id, error = putRes.error.msg - return failure putRes.error - - if not self.cache.isNil: - let - putCachedRes = await self.cache.putBlock(blk) - - if putCachedRes.isErr: - trace "Unable to store block in cache", cid = blk.cid, error = putCachedRes.error.msg - - return success() - -method delBlock*( - self: SQLiteStore, - cid: Cid): Future[?!void] {.async.} = - ## Delete a block from the cache and database - ## - - if not self.cache.isNil: - trace "Deleting block from cache and database", cid - else: - trace "Deleting block from database", cid - - if cid.isEmpty: - trace "Empty block, ignoring" - return success() - - if not self.cache.isNil: - let - delCachedRes = await self.cache.delBlock(cid) - - if delCachedRes.isErr: - trace "Unable to delete block from cache", cid, error = delCachedRes.error.msg - - without blkKey =? blockKey(cid), error: - return failure error - - let - delRes = await self.datastore.delete(blkKey) - - if delRes.isErr: - trace "Unable to delete block from database", key = blkKey.id, error = delRes.error.msg - return failure delRes.error - - return success() - -method hasBlock*( - self: SQLiteStore, - cid: Cid): Future[?!bool] {.async.} = - ## Check if a block exists in the database - ## - - trace "Checking database for block existence", cid - - if cid.isEmpty: - trace "Empty block, ignoring" - return true.success - - without blkKey =? blockKey(cid), error: - return failure error - - return await self.datastore.contains(blkKey) - -iterator listBlocksQuery(self: SQLiteStore): ListBlocksQueryResponse = - without listBlocksQueryStmt =? ListBlocksQueryStmt.prepare(self.datastore.env, - listBlocksQueryStmtStr), error: - - raise (ref Defect)(msg: error.msg) - - let - s = RawStmtPtr(listBlocksQueryStmt) - - defer: - discard sqlite3_reset(s) - s.dispose - - while true: - let - v = sqlite3_step(s) - - case v - of SQLITE_ROW: - yield $sqlite3_column_text_not_null(s, listBlocksQueryStmtIdCol) - of SQLITE_DONE: - break - else: - raise (ref Defect)(msg: $sqlite3_errstr(v)) - -method listBlocks*( - self: SQLiteStore, - onBlock: OnBlock): Future[?!void] {.async.} = - ## Process list of all blocks in the database via callback. - ## This is an intensive operation - ## - - for id in self.listBlocksQuery(): - let - # keys stored in id column of SQLiteDatastore are serialized Key - # instances that start with "/", so drop the first character - cidRes = Cid.init(id[1..^1]) - - if cidRes.isOk: - await onBlock(cidRes.get) - else: - trace "Unable to construct CID from key", key = id, error = $cidRes.error - - return success() - -method close*(self: SQLiteStore): Future[void] {.async.} = - ## Close the underlying cache and SQLite datastore - ## - - if not self.cache.isNil: await self.cache.close - if not self.datastore.isNil: self.datastore.close diff --git a/tests/codex/stores/testsqlitestore.nim b/tests/codex/stores/testsqlitestore.nim deleted file mode 100644 index 2e43eb92..00000000 --- a/tests/codex/stores/testsqlitestore.nim +++ /dev/null @@ -1,222 +0,0 @@ -import std/oids -import std/options -import std/os -import std/random -import std/sequtils -import std/sets - -import pkg/asynctest -import pkg/chronos -import pkg/stew/byteutils - -import pkg/codex/chunker -import pkg/codex/blocktype as bt -import pkg/codex/stores - -import ../helpers - -proc runSuite(cache: bool) = - suite "SQLite Store " & (if cache: "(cache enabled)" else: "(cache disabled)"): - randomize() - - var - store: SQLiteStore - - let - repoDir = getAppDir() / "repo" - - proc randomBlock(): bt.Block = - let - blockRes = bt.Block.new(($genOid()).toBytes) - - require(blockRes.isOk) - blockRes.get - - var - newBlock: bt.Block - - setup: - removeDir(repoDir) - require(not dirExists(repoDir)) - createDir(repoDir) - - if cache: - store = SQLiteStore.new(repoDir) - else: - store = SQLiteStore.new(repoDir, cache = nil) - - newBlock = randomBlock() - - teardown: - if not store.isNil: await store.close - store = nil - removeDir(repoDir) - require(not dirExists(repoDir)) - - test "putBlock": - let - blkKeyRes = blockKey(newBlock.cid) - - assert blkKeyRes.isOk - - let - blkKey = blkKeyRes.get - - var - # bypass enabled cache - containsRes = await store.datastore.contains(blkKey) - - assert containsRes.isOk - assert not containsRes.get - - let - putRes = await store.putBlock(newBlock) - - check: putRes.isOk - - # bypass enabled cache - containsRes = await store.datastore.contains(blkKey) - - assert containsRes.isOk - - check: containsRes.get - - test "getBlock": - var - r = rand(100) - - # put `r` number of random blocks before putting newBlock - if r > 0: - for _ in 0..r: - let - b = randomBlock() - kRes = blockKey(b.cid) - - assert kRes.isOk - - let - # bypass enabled cache - pRes = await store.datastore.put(kRes.get, b.data) - - assert pRes.isOk - - let - blkKeyRes = blockKey(newBlock.cid) - - assert blkKeyRes.isOk - - var - # bypass enabled cache - putRes = await store.datastore.put(blkKeyRes.get, newBlock.data) - - assert putRes.isOk - - r = rand(100) - - # put `r` number of random blocks after putting newBlock - if r > 0: - for _ in 0..r: - let - b = randomBlock() - kRes = blockKey(b.cid) - - assert kRes.isOk - - let - # bypass enabled cache - pRes = await store.datastore.put(kRes.get, b.data) - - assert pRes.isOk - - var - # get from database - getRes = await store.getBlock(newBlock.cid) - - check: - getRes.isOk - getRes.get == newBlock - - # get from enabled cache - getRes = await store.getBlock(newBlock.cid) - - check: - getRes.isOk - getRes.get == newBlock - - test "fail getBlock": - let - blkRes = await store.getBlock(newBlock.cid) - - check: - blkRes.isErr - blkRes.error of BlockNotFoundError - - test "hasBlock": - let - putRes = await store.putBlock(newBlock) - - assert putRes.isOk - - let - hasRes = await store.hasBlock(newBlock.cid) - - check: - hasRes.isOk - hasRes.get - await newBlock.cid in store - - test "fail hasBlock": - let - hasRes = await store.hasBlock(newBlock.cid) - - check: - hasRes.isOk - not hasRes.get - not (await newBlock.cid in store) - - test "listBlocks": - var - newBlocks: seq[bt.Block] - - for _ in 0..99: - let - b = randomBlock() - pRes = await store.putBlock(b) - - assert pRes.isOk - - newBlocks.add(b) - - var - called = 0 - cids = toHashSet(newBlocks.mapIt(it.cid)) - - let - onBlock = proc(cid: Cid) {.async, gcsafe.} = - check: cid in cids - if cid in cids: - inc called - cids.excl(cid) - - listRes = await store.listBlocks(onBlock) - - check: - listRes.isOk - called == newBlocks.len - - test "delBlock": - let - putRes = await store.putBlock(newBlock) - - assert putRes.isOk - assert (await newBlock.cid in store) - - let - delRes = await store.delBlock(newBlock.cid) - - check: - delRes.isOk - not (await newBlock.cid in store) - -runSuite(cache = true) -runSuite(cache = false) diff --git a/tests/codex/teststores.nim b/tests/codex/teststores.nim index 6c2cfd33..3e83116c 100644 --- a/tests/codex/teststores.nim +++ b/tests/codex/teststores.nim @@ -1,5 +1,4 @@ import ./stores/testcachestore import ./stores/testfsstore -import ./stores/testsqlitestore {.warning[UnusedImport]: off.} diff --git a/vendor/asynctest b/vendor/asynctest index 5347c59b..a236a5f0 160000 --- a/vendor/asynctest +++ b/vendor/asynctest @@ -1 +1 @@ -Subproject commit 5347c59b4b057443a014722aa40800cd8bb95c69 +Subproject commit a236a5f0f3031573ac2cb082b63dbf6e170e06e7 diff --git a/vendor/nim-datastore b/vendor/nim-datastore index 2769ce1d..f5dadd93 160000 --- a/vendor/nim-datastore +++ b/vendor/nim-datastore @@ -1 +1 @@ -Subproject commit 2769ce1de21e595e712fd1df7a195c34cd5d18de +Subproject commit f5dadd93be77bdd84e95b56e109636a51b979d0f diff --git a/vendor/nim-libp2p-dht b/vendor/nim-libp2p-dht index 39c0ffc9..69ae7c20 160000 --- a/vendor/nim-libp2p-dht +++ b/vendor/nim-libp2p-dht @@ -1 +1 @@ -Subproject commit 39c0ffc970bc40d7f9f6282fd037b6bf621ffc5c +Subproject commit 69ae7c2012d5ae89eab5ed3d7813daedba4018d9