From 75666d01bf76cd95ac99a48a75ba13052d902b84 Mon Sep 17 00:00:00 2001 From: "Michael Bradley, Jr" Date: Fri, 22 Jul 2022 18:38:49 -0500 Subject: [PATCH] [node] add SQLiteStore backend Closes #138 --- codex/codex.nim | 2 +- codex/node.nim | 3 + codex/stores.nim | 5 +- codex/stores/blockstore.nim | 7 + codex/stores/cachestore.nim | 7 +- codex/stores/fsstore.nim | 17 +- codex/stores/networkstore.nim | 6 + codex/stores/sqlitestore.nim | 209 +++++++++++++++++++ tests/codex/storageproofs/testnetwork.nim | 1 - tests/codex/storageproofs/teststpstore.nim | 3 +- tests/codex/stores/testcachestore.nim | 2 +- tests/codex/stores/testfsstore.nim | 5 +- tests/codex/stores/testsqlitestore.nim | 225 +++++++++++++++++++++ tests/codex/teststores.nim | 3 +- 14 files changed, 472 insertions(+), 23 deletions(-) create mode 100644 codex/stores/sqlitestore.nim create mode 100644 tests/codex/stores/testsqlitestore.nim diff --git a/codex/codex.nim b/codex/codex.nim index 27baa2a2..939dbb9c 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -118,7 +118,7 @@ proc new*(T: type CodexServer, config: CodexConf): T = wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) - localStore = FSStore.new(config.dataDir / "repo", cache = cache) + localStore = SQLiteStore.new(config.dataDir / "repo", cache = cache) peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) diff --git a/codex/node.nim b/codex/node.nim index 0dc1305d..196e1df4 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -365,3 +365,6 @@ proc stop*(node: CodexNodeRef) {.async.} = if contracts =? node.contracts: await contracts.stop() + + if not node.blockStore.isNil: + await node.blockStore.close diff --git a/codex/stores.nim b/codex/stores.nim index 86947906..c84e806c 100644 --- a/codex/stores.nim +++ b/codex/stores.nim @@ -2,6 +2,7 @@ import ./stores/[ cachestore, blockstore, networkstore, - fsstore] + fsstore, + sqlitestore] -export cachestore, blockstore, networkstore, fsstore +export cachestore, blockstore, networkstore, fsstore, sqlitestore diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index 478c818e..6109ca4e 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -54,6 +54,13 @@ method listBlocks*(self: BlockStore, onBlock: OnBlock): Future[?!void] {.base.} raiseAssert("Not implemented!") +method close*(self: Blockstore): Future[void] {.base.} = + ## Close the blockstore, cleaning up resources managed by it. + ## For some implementations this may be a no-op + ## + + raiseAssert("Not implemented!") + proc contains*(self: BlockStore, blk: Cid): Future[bool] {.async.} = ## Check if the block exists in the blockstore. ## Return false if error encountered diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index 0dd7b7b3..c3091b40 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -12,7 +12,6 @@ import pkg/upraises push: {.upraises: [].} - import std/options import pkg/chronicles @@ -132,6 +131,12 @@ method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} = return success() +method close*(self: CacheStore): Future[void] {.async.} = + ## Close the blockstore, a no-op for this implementation + ## + + discard + func new*( _: type CacheStore, blocks: openArray[Block] = [], diff --git a/codex/stores/fsstore.nim b/codex/stores/fsstore.nim index bb1630d1..bae8d16c 100644 --- a/codex/stores/fsstore.nim +++ b/codex/stores/fsstore.nim @@ -146,19 +146,16 @@ method listBlocks*(self: FSStore, onBlock: OnBlock): Future[?!void] {.async.} = for (fkind, filename) in folderPath.walkDir(relative = true): if fkind != pcFile: continue let cid = Cid.init(filename) - if cid.isOk: - # getting a weird `Error: unhandled exception: index 1 not in 0 .. 0 [IndexError]` - # compilation error if using different syntax/construct bellow - try: - await onBlock(cid.get()) - except CancelledError as exc: - trace "Cancelling list blocks" - raise exc - except CatchableError as exc: - trace "Couldn't get block", cid = $(cid.get()) + if cid.isOk: await onBlock(cid.get()) return success() +method close*(self: FSStore): Future[void] {.async.} = + ## Close the underlying cache + ## + + if not self.cache.isNil: await self.cache.close + proc new*( T: type FSStore, repoDir: string, diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 7d64a451..0097b285 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -83,6 +83,12 @@ method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} = trace "Checking network store for block existence", cid return await self.localStore.hasBlock(cid) +method close*(self: NetworkStore): Future[void] {.async.} = + ## Close the underlying local blockstore + ## + + if not self.localStore.isNil: await self.localStore.close + proc new*( T: type NetworkStore, engine: BlockExcEngine, diff --git a/codex/stores/sqlitestore.nim b/codex/stores/sqlitestore.nim new file mode 100644 index 00000000..a824cc56 --- /dev/null +++ b/codex/stores/sqlitestore.nim @@ -0,0 +1,209 @@ +## Nim-Codex +## Copyright (c) 2021 Status Research & Development GmbH +## Licensed under either of +## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +## * MIT license ([LICENSE-MIT](LICENSE-MIT)) +## at your option. +## This file may not be copied, modified, or distributed except according to +## those terms. + +import pkg/upraises + +push: {.upraises: [].} + +import std/options + +import pkg/chronos +import pkg/chronicles +import pkg/datastore/sqlite_datastore +import pkg/libp2p +import pkg/questionable +import pkg/questionable/results + +import ./blockstore +import ./cachestore + +export blockstore, sqlite_datastore + +logScope: + topics = "codex sqlitestore" + +type + SQLiteStore* = ref object of BlockStore + cache: BlockStore + datastore: SQLiteDatastore + +const + allBlocks = when (let keyRes = Key.init("*"); true): + if keyRes.isOk: Query.init(keyRes.get) + else: raise (ref Defect)(msg: keyRes.error.msg) + +proc new*( + T: type SQLiteStore, + repoDir: string, + cache: BlockStore = CacheStore.new()): T = + + let + datastoreRes = SQLiteDatastore.new(repoDir) + + if datastoreRes.isErr: + raise (ref Defect)(msg: datastoreRes.error.msg) + + T(cache: cache, datastore: datastoreRes.get) + +proc datastore*(self: SQLiteStore): SQLiteDatastore = + self.datastore + +proc blockKey*(blockCid: Cid): ?!Key = + let + keyRes = Key.init($blockCid) + + if keyRes.isErr: + trace "Unable to construct CID from key", cid = blockCid, error = keyRes.error.msg + + keyRes + +method getBlock*( + self: SQLiteStore, + cid: Cid): Future[?!(?Block)] {.async.} = + ## Get a block from the cache or database. + ## Save a copy to the cache if present in the database but not in the cache + ## + + trace "Getting block from cache or database", cid + + if cid.isEmpty: + trace "Empty block, ignoring" + return success cid.emptyBlock.some + + without cachedBlkOpt =? await self.cache.getBlock(cid), error: + trace "Unable to read block from cache", cid, error = error.msg + + if cachedBlkOpt.isSome: + return success cachedBlkOpt + + without blkKey =? blockKey(cid), error: + return failure error + + without dataOpt =? await self.datastore.get(blkKey), error: + trace "Unable to read block from database", key = blkKey.id, error = error.msg + return failure error + + without data =? dataOpt: + return success Block.none + + without blk =? Block.new(cid, data), error: + trace "Unable to construct block from data", cid, error = error.msg + return failure error + + let + putCachedRes = await self.cache.putBlock(blk) + + if putCachedRes.isErr: + trace "Unable to store block in cache", cid, error = putCachedRes.error.msg + + return success blk.some + +method putBlock*( + self: SQLiteStore, + blk: Block): Future[?!void] {.async.} = + ## Write a block's contents to the database with key based on blk.cid. + ## Save a copy to the cache + ## + + trace "Putting block into database and cache", cid = blk.cid + + if blk.isEmpty: + trace "Empty block, ignoring" + return success() + + without blkKey =? blockKey(blk.cid), error: + return failure error + + let + putRes = await self.datastore.put(blkKey, blk.data) + + if putRes.isErr: + trace "Unable to store block in database", key = blkKey.id, error = putRes.error.msg + return failure putRes.error + + let + putCachedRes = await self.cache.putBlock(blk) + + if putCachedRes.isErr: + trace "Unable to store block in cache", cid = blk.cid, error = putCachedRes.error.msg + + return success() + +method delBlock*( + self: SQLiteStore, + cid: Cid): Future[?!void] {.async.} = + ## Delete a block from the database and cache + ## + + trace "Deleting block from cache and database", cid + + if cid.isEmpty: + trace "Empty block, ignoring" + return success() + + let + delCachedRes = await self.cache.delBlock(cid) + + if delCachedRes.isErr: + trace "Unable to delete block from cache", cid, error = delCachedRes.error.msg + + without blkKey =? blockKey(cid), error: + return failure error + + let + delRes = await self.datastore.delete(blkKey) + + if delRes.isErr: + trace "Unable to delete block from database", key = blkKey.id, error = delRes.error.msg + return failure delRes.error + + return success() + +method hasBlock*( + self: SQLiteStore, + cid: Cid): Future[?!bool] {.async.} = + ## Check if a block exists in the database + ## + + trace "Checking database for block existence", cid + + if cid.isEmpty: + trace "Empty block, ignoring" + return true.success + + without blkKey =? blockKey(cid), error: + return failure error + + return await self.datastore.contains(blkKey) + +method listBlocks*( + self: SQLiteStore, + onBlock: OnBlock): Future[?!void] {.async.} = + ## Process list of all blocks in the database via callback. + ## This is an intensive operation + ## + + for kd in self.datastore.query(allBlocks): + let + (key, _) = await kd + cidRes = Cid.init(key.name) + + if cidRes.isOk: + await onBlock(cidRes.get) + else: + trace "Unable to construct CID from key", key = key.id, error = $cidRes.error + + return success() + +method close*(self: SQLiteStore): Future[void] {.async.} = + ## Close the underlying cache and SQLite datastore + ## + + if not self.cache.isNil: await self.cache.close + if not self.datastore.isNil: self.datastore.close diff --git a/tests/codex/storageproofs/testnetwork.nim b/tests/codex/storageproofs/testnetwork.nim index f3e67954..ad59a95f 100644 --- a/tests/codex/storageproofs/testnetwork.nim +++ b/tests/codex/storageproofs/testnetwork.nim @@ -48,7 +48,6 @@ suite "Storage Proofs Network": store: BlockStore ssk: st.SecretKey spk: st.PublicKey - repoDir: string stpstore: st.StpStore porMsg: PorMessage cid: Cid diff --git a/tests/codex/storageproofs/teststpstore.nim b/tests/codex/storageproofs/teststpstore.nim index bca3739a..49f6ac31 100644 --- a/tests/codex/storageproofs/teststpstore.nim +++ b/tests/codex/storageproofs/teststpstore.nim @@ -18,7 +18,6 @@ const suite "Test PoR store": let - (path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name blocks = toSeq([1, 5, 10, 14, 20, 12, 22]) # TODO: maybe make them random var @@ -58,7 +57,7 @@ suite "Test PoR store": tags = blocks.mapIt( Tag(idx: it, tag: porMsg.authenticators[it]) ) - repoDir = path.parentDir / "stp" + repoDir = getAppDir() / "stp" createDir(repoDir) stpstore = st.StpStore.init(repoDir) diff --git a/tests/codex/stores/testcachestore.nim b/tests/codex/stores/testcachestore.nim index 000ebf58..c1fd2d03 100644 --- a/tests/codex/stores/testcachestore.nim +++ b/tests/codex/stores/testcachestore.nim @@ -11,7 +11,7 @@ import pkg/codex/chunker import ../helpers -suite "Cache Store tests": +suite "Cache Store": var newBlock, newBlock1, newBlock2, newBlock3: Block store: CacheStore diff --git a/tests/codex/stores/testfsstore.nim b/tests/codex/stores/testfsstore.nim index 2ec7b165..a13cbf8b 100644 --- a/tests/codex/stores/testfsstore.nim +++ b/tests/codex/stores/testfsstore.nim @@ -17,16 +17,13 @@ import pkg/codex/blocktype as bt import ../helpers suite "FS Store": - let - (path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name - var store: FSStore repoDir: string newBlock = bt.Block.new("New Block".toBytes()).tryGet() setup: - repoDir = path.parentDir / "repo" + repoDir = getAppDir() / "repo" createDir(repoDir) store = FSStore.new(repoDir) diff --git a/tests/codex/stores/testsqlitestore.nim b/tests/codex/stores/testsqlitestore.nim new file mode 100644 index 00000000..53c6429e --- /dev/null +++ b/tests/codex/stores/testsqlitestore.nim @@ -0,0 +1,225 @@ +import std/oids +import std/options +import std/os +import std/random +import std/sequtils +import std/sets + +import pkg/asynctest +import pkg/chronos +import pkg/stew/byteutils + +import pkg/codex/chunker +import pkg/codex/blocktype as bt +import pkg/codex/stores + +import ../helpers + +suite "SQLite Store": + randomize() + + var + store: SQLiteStore + + let + repoDir = getAppDir() / "repo" + + proc randomBlock(): bt.Block = + let + blockRes = bt.Block.new(($genOid()).toBytes) + + require(blockRes.isOk) + blockRes.get + + var + newBlock: bt.Block + + setup: + removeDir(repoDir) + require(not dirExists(repoDir)) + store = SQLiteStore.new(repoDir) + newBlock = randomBlock() + + teardown: + if not store.isNil: await store.close + store = nil + removeDir(repoDir) + require(not dirExists(repoDir)) + + test "putBlock": + let + blkKeyRes = blockKey(newBlock.cid) + + assert blkKeyRes.isOk + + let + blkKey = blkKeyRes.get + + var + # bypass cache + containsRes = await store.datastore.contains(blkKey) + + assert containsRes.isOk + assert not containsRes.get + + let + putRes = await store.putBlock(newBlock) + + check: putRes.isOk + + # bypass cache + containsRes = await store.datastore.contains(blkKey) + + assert containsRes.isOk + + check: containsRes.get + + test "getBlock": + var + r = rand(100) + + # put `r` number of random blocks before putting newBlock + if r > 0: + for _ in 0..r: + let + b = randomBlock() + kRes = blockKey(b.cid) + + assert kRes.isOk + + let + # bypass cache + pRes = await store.datastore.put(kRes.get, b.data) + + assert pRes.isOk + + let + blkKeyRes = blockKey(newBlock.cid) + + assert blkKeyRes.isOk + + var + # bypass cache + putRes = await store.datastore.put(blkKeyRes.get, newBlock.data) + + assert putRes.isOk + + r = rand(100) + + # put `r` number of random blocks after putting newBlock + if r > 0: + for _ in 0..r: + let + b = randomBlock() + kRes = blockKey(b.cid) + + assert kRes.isOk + + let + # bypass cache + pRes = await store.datastore.put(kRes.get, b.data) + + assert pRes.isOk + + var + # get from database + getRes = await store.getBlock(newBlock.cid) + + check: getRes.isOk + + var + blkOpt = getRes.get + + check: + blkOpt.isSome + blkOpt.get == newBlock + + # get from cache + getRes = await store.getBlock(newBlock.cid) + + check: getRes.isOk + + blkOpt = getRes.get + + check: + blkOpt.isSome + blkOpt.get == newBlock + + test "fail getBlock": + let + getRes = await store.getBlock(newBlock.cid) + + assert getRes.isOk + + let + blkOpt = getRes.get + + check: blkOpt.isNone + + + test "hasBlock": + let + putRes = await store.putBlock(newBlock) + + assert putRes.isOk + + let + hasRes = await store.hasBlock(newBlock.cid) + + check: + hasRes.isOk + hasRes.get + await newBlock.cid in store + + test "fail hasBlock": + let + hasRes = await store.hasBlock(newBlock.cid) + + check: + hasRes.isOk + not hasRes.get + not (await newBlock.cid in store) + + test "listBlocks": + var + newBlocks: seq[bt.Block] + + for _ in 0..99: + let + b = randomBlock() + pRes = await store.putBlock(b) + + assert pRes.isOk + + newBlocks.add(b) + + var + called = 0 + cids = toHashSet(newBlocks.mapIt(it.cid)) + + let + onBlock = proc(cid: Cid) {.async, gcsafe.} = + check: cid in cids + if cid in cids: + inc called + cids.excl(cid) + + listRes = await store.listBlocks(onBlock) + + check: + listRes.isOk + called == newBlocks.len + + test "delBlock": + let + putRes = await store.putBlock(newBlock) + + assert putRes.isOk + assert (await newBlock.cid in store) + + let + delRes = await store.delBlock(newBlock.cid) + + check: + delRes.isOk + not (await newBlock.cid in store) diff --git a/tests/codex/teststores.nim b/tests/codex/teststores.nim index af77a058..6c2cfd33 100644 --- a/tests/codex/teststores.nim +++ b/tests/codex/teststores.nim @@ -1,4 +1,5 @@ -import ./stores/testfsstore import ./stores/testcachestore +import ./stores/testfsstore +import ./stores/testsqlitestore {.warning[UnusedImport]: off.}