[node] add SQLiteStore backend

Closes #138
This commit is contained in:
Michael Bradley, Jr 2022-07-22 18:38:49 -05:00 committed by Michael Bradley
parent 92d83bb1d2
commit 75666d01bf
14 changed files with 472 additions and 23 deletions

View File

@ -118,7 +118,7 @@ proc new*(T: type CodexServer, config: CodexConf): T =
wallet = WalletRef.new(EthPrivateKey.random()) wallet = WalletRef.new(EthPrivateKey.random())
network = BlockExcNetwork.new(switch) network = BlockExcNetwork.new(switch)
localStore = FSStore.new(config.dataDir / "repo", cache = cache) localStore = SQLiteStore.new(config.dataDir / "repo", cache = cache)
peerStore = PeerCtxStore.new() peerStore = PeerCtxStore.new()
pendingBlocks = PendingBlocksManager.new() pendingBlocks = PendingBlocksManager.new()
discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks) discovery = DiscoveryEngine.new(localStore, peerStore, network, blockDiscovery, pendingBlocks)

View File

@ -365,3 +365,6 @@ proc stop*(node: CodexNodeRef) {.async.} =
if contracts =? node.contracts: if contracts =? node.contracts:
await contracts.stop() await contracts.stop()
if not node.blockStore.isNil:
await node.blockStore.close

View File

@ -2,6 +2,7 @@ import ./stores/[
cachestore, cachestore,
blockstore, blockstore,
networkstore, networkstore,
fsstore] fsstore,
sqlitestore]
export cachestore, blockstore, networkstore, fsstore export cachestore, blockstore, networkstore, fsstore, sqlitestore

View File

@ -54,6 +54,13 @@ method listBlocks*(self: BlockStore, onBlock: OnBlock): Future[?!void] {.base.}
raiseAssert("Not implemented!") raiseAssert("Not implemented!")
method close*(self: Blockstore): Future[void] {.base.} =
## Close the blockstore, cleaning up resources managed by it.
## For some implementations this may be a no-op
##
raiseAssert("Not implemented!")
proc contains*(self: BlockStore, blk: Cid): Future[bool] {.async.} = proc contains*(self: BlockStore, blk: Cid): Future[bool] {.async.} =
## Check if the block exists in the blockstore. ## Check if the block exists in the blockstore.
## Return false if error encountered ## Return false if error encountered

View File

@ -12,7 +12,6 @@ import pkg/upraises
push: {.upraises: [].} push: {.upraises: [].}
import std/options import std/options
import pkg/chronicles import pkg/chronicles
@ -132,6 +131,12 @@ method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} =
return success() return success()
method close*(self: CacheStore): Future[void] {.async.} =
## Close the blockstore, a no-op for this implementation
##
discard
func new*( func new*(
_: type CacheStore, _: type CacheStore,
blocks: openArray[Block] = [], blocks: openArray[Block] = [],

View File

@ -146,19 +146,16 @@ method listBlocks*(self: FSStore, onBlock: OnBlock): Future[?!void] {.async.} =
for (fkind, filename) in folderPath.walkDir(relative = true): for (fkind, filename) in folderPath.walkDir(relative = true):
if fkind != pcFile: continue if fkind != pcFile: continue
let cid = Cid.init(filename) let cid = Cid.init(filename)
if cid.isOk: if cid.isOk: await onBlock(cid.get())
# getting a weird `Error: unhandled exception: index 1 not in 0 .. 0 [IndexError]`
# compilation error if using different syntax/construct bellow
try:
await onBlock(cid.get())
except CancelledError as exc:
trace "Cancelling list blocks"
raise exc
except CatchableError as exc:
trace "Couldn't get block", cid = $(cid.get())
return success() return success()
method close*(self: FSStore): Future[void] {.async.} =
## Close the underlying cache
##
if not self.cache.isNil: await self.cache.close
proc new*( proc new*(
T: type FSStore, T: type FSStore,
repoDir: string, repoDir: string,

View File

@ -83,6 +83,12 @@ method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} =
trace "Checking network store for block existence", cid trace "Checking network store for block existence", cid
return await self.localStore.hasBlock(cid) return await self.localStore.hasBlock(cid)
method close*(self: NetworkStore): Future[void] {.async.} =
## Close the underlying local blockstore
##
if not self.localStore.isNil: await self.localStore.close
proc new*( proc new*(
T: type NetworkStore, T: type NetworkStore,
engine: BlockExcEngine, engine: BlockExcEngine,

View File

@ -0,0 +1,209 @@
## Nim-Codex
## Copyright (c) 2021 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import pkg/upraises
push: {.upraises: [].}
import std/options
import pkg/chronos
import pkg/chronicles
import pkg/datastore/sqlite_datastore
import pkg/libp2p
import pkg/questionable
import pkg/questionable/results
import ./blockstore
import ./cachestore
export blockstore, sqlite_datastore
logScope:
topics = "codex sqlitestore"
type
SQLiteStore* = ref object of BlockStore
cache: BlockStore
datastore: SQLiteDatastore
const
allBlocks = when (let keyRes = Key.init("*"); true):
if keyRes.isOk: Query.init(keyRes.get)
else: raise (ref Defect)(msg: keyRes.error.msg)
proc new*(
T: type SQLiteStore,
repoDir: string,
cache: BlockStore = CacheStore.new()): T =
let
datastoreRes = SQLiteDatastore.new(repoDir)
if datastoreRes.isErr:
raise (ref Defect)(msg: datastoreRes.error.msg)
T(cache: cache, datastore: datastoreRes.get)
proc datastore*(self: SQLiteStore): SQLiteDatastore =
self.datastore
proc blockKey*(blockCid: Cid): ?!Key =
let
keyRes = Key.init($blockCid)
if keyRes.isErr:
trace "Unable to construct CID from key", cid = blockCid, error = keyRes.error.msg
keyRes
method getBlock*(
self: SQLiteStore,
cid: Cid): Future[?!(?Block)] {.async.} =
## Get a block from the cache or database.
## Save a copy to the cache if present in the database but not in the cache
##
trace "Getting block from cache or database", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success cid.emptyBlock.some
without cachedBlkOpt =? await self.cache.getBlock(cid), error:
trace "Unable to read block from cache", cid, error = error.msg
if cachedBlkOpt.isSome:
return success cachedBlkOpt
without blkKey =? blockKey(cid), error:
return failure error
without dataOpt =? await self.datastore.get(blkKey), error:
trace "Unable to read block from database", key = blkKey.id, error = error.msg
return failure error
without data =? dataOpt:
return success Block.none
without blk =? Block.new(cid, data), error:
trace "Unable to construct block from data", cid, error = error.msg
return failure error
let
putCachedRes = await self.cache.putBlock(blk)
if putCachedRes.isErr:
trace "Unable to store block in cache", cid, error = putCachedRes.error.msg
return success blk.some
method putBlock*(
self: SQLiteStore,
blk: Block): Future[?!void] {.async.} =
## Write a block's contents to the database with key based on blk.cid.
## Save a copy to the cache
##
trace "Putting block into database and cache", cid = blk.cid
if blk.isEmpty:
trace "Empty block, ignoring"
return success()
without blkKey =? blockKey(blk.cid), error:
return failure error
let
putRes = await self.datastore.put(blkKey, blk.data)
if putRes.isErr:
trace "Unable to store block in database", key = blkKey.id, error = putRes.error.msg
return failure putRes.error
let
putCachedRes = await self.cache.putBlock(blk)
if putCachedRes.isErr:
trace "Unable to store block in cache", cid = blk.cid, error = putCachedRes.error.msg
return success()
method delBlock*(
self: SQLiteStore,
cid: Cid): Future[?!void] {.async.} =
## Delete a block from the database and cache
##
trace "Deleting block from cache and database", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success()
let
delCachedRes = await self.cache.delBlock(cid)
if delCachedRes.isErr:
trace "Unable to delete block from cache", cid, error = delCachedRes.error.msg
without blkKey =? blockKey(cid), error:
return failure error
let
delRes = await self.datastore.delete(blkKey)
if delRes.isErr:
trace "Unable to delete block from database", key = blkKey.id, error = delRes.error.msg
return failure delRes.error
return success()
method hasBlock*(
self: SQLiteStore,
cid: Cid): Future[?!bool] {.async.} =
## Check if a block exists in the database
##
trace "Checking database for block existence", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return true.success
without blkKey =? blockKey(cid), error:
return failure error
return await self.datastore.contains(blkKey)
method listBlocks*(
self: SQLiteStore,
onBlock: OnBlock): Future[?!void] {.async.} =
## Process list of all blocks in the database via callback.
## This is an intensive operation
##
for kd in self.datastore.query(allBlocks):
let
(key, _) = await kd
cidRes = Cid.init(key.name)
if cidRes.isOk:
await onBlock(cidRes.get)
else:
trace "Unable to construct CID from key", key = key.id, error = $cidRes.error
return success()
method close*(self: SQLiteStore): Future[void] {.async.} =
## Close the underlying cache and SQLite datastore
##
if not self.cache.isNil: await self.cache.close
if not self.datastore.isNil: self.datastore.close

View File

@ -48,7 +48,6 @@ suite "Storage Proofs Network":
store: BlockStore store: BlockStore
ssk: st.SecretKey ssk: st.SecretKey
spk: st.PublicKey spk: st.PublicKey
repoDir: string
stpstore: st.StpStore stpstore: st.StpStore
porMsg: PorMessage porMsg: PorMessage
cid: Cid cid: Cid

View File

@ -18,7 +18,6 @@ const
suite "Test PoR store": suite "Test PoR store":
let let
(path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name
blocks = toSeq([1, 5, 10, 14, 20, 12, 22]) # TODO: maybe make them random blocks = toSeq([1, 5, 10, 14, 20, 12, 22]) # TODO: maybe make them random
var var
@ -58,7 +57,7 @@ suite "Test PoR store":
tags = blocks.mapIt( tags = blocks.mapIt(
Tag(idx: it, tag: porMsg.authenticators[it]) ) Tag(idx: it, tag: porMsg.authenticators[it]) )
repoDir = path.parentDir / "stp" repoDir = getAppDir() / "stp"
createDir(repoDir) createDir(repoDir)
stpstore = st.StpStore.init(repoDir) stpstore = st.StpStore.init(repoDir)

View File

@ -11,7 +11,7 @@ import pkg/codex/chunker
import ../helpers import ../helpers
suite "Cache Store tests": suite "Cache Store":
var var
newBlock, newBlock1, newBlock2, newBlock3: Block newBlock, newBlock1, newBlock2, newBlock3: Block
store: CacheStore store: CacheStore

View File

@ -17,16 +17,13 @@ import pkg/codex/blocktype as bt
import ../helpers import ../helpers
suite "FS Store": suite "FS Store":
let
(path, _, _) = instantiationInfo(-2, fullPaths = true) # get this file's name
var var
store: FSStore store: FSStore
repoDir: string repoDir: string
newBlock = bt.Block.new("New Block".toBytes()).tryGet() newBlock = bt.Block.new("New Block".toBytes()).tryGet()
setup: setup:
repoDir = path.parentDir / "repo" repoDir = getAppDir() / "repo"
createDir(repoDir) createDir(repoDir)
store = FSStore.new(repoDir) store = FSStore.new(repoDir)

View File

@ -0,0 +1,225 @@
import std/oids
import std/options
import std/os
import std/random
import std/sequtils
import std/sets
import pkg/asynctest
import pkg/chronos
import pkg/stew/byteutils
import pkg/codex/chunker
import pkg/codex/blocktype as bt
import pkg/codex/stores
import ../helpers
suite "SQLite Store":
randomize()
var
store: SQLiteStore
let
repoDir = getAppDir() / "repo"
proc randomBlock(): bt.Block =
let
blockRes = bt.Block.new(($genOid()).toBytes)
require(blockRes.isOk)
blockRes.get
var
newBlock: bt.Block
setup:
removeDir(repoDir)
require(not dirExists(repoDir))
store = SQLiteStore.new(repoDir)
newBlock = randomBlock()
teardown:
if not store.isNil: await store.close
store = nil
removeDir(repoDir)
require(not dirExists(repoDir))
test "putBlock":
let
blkKeyRes = blockKey(newBlock.cid)
assert blkKeyRes.isOk
let
blkKey = blkKeyRes.get
var
# bypass cache
containsRes = await store.datastore.contains(blkKey)
assert containsRes.isOk
assert not containsRes.get
let
putRes = await store.putBlock(newBlock)
check: putRes.isOk
# bypass cache
containsRes = await store.datastore.contains(blkKey)
assert containsRes.isOk
check: containsRes.get
test "getBlock":
var
r = rand(100)
# put `r` number of random blocks before putting newBlock
if r > 0:
for _ in 0..r:
let
b = randomBlock()
kRes = blockKey(b.cid)
assert kRes.isOk
let
# bypass cache
pRes = await store.datastore.put(kRes.get, b.data)
assert pRes.isOk
let
blkKeyRes = blockKey(newBlock.cid)
assert blkKeyRes.isOk
var
# bypass cache
putRes = await store.datastore.put(blkKeyRes.get, newBlock.data)
assert putRes.isOk
r = rand(100)
# put `r` number of random blocks after putting newBlock
if r > 0:
for _ in 0..r:
let
b = randomBlock()
kRes = blockKey(b.cid)
assert kRes.isOk
let
# bypass cache
pRes = await store.datastore.put(kRes.get, b.data)
assert pRes.isOk
var
# get from database
getRes = await store.getBlock(newBlock.cid)
check: getRes.isOk
var
blkOpt = getRes.get
check:
blkOpt.isSome
blkOpt.get == newBlock
# get from cache
getRes = await store.getBlock(newBlock.cid)
check: getRes.isOk
blkOpt = getRes.get
check:
blkOpt.isSome
blkOpt.get == newBlock
test "fail getBlock":
let
getRes = await store.getBlock(newBlock.cid)
assert getRes.isOk
let
blkOpt = getRes.get
check: blkOpt.isNone
test "hasBlock":
let
putRes = await store.putBlock(newBlock)
assert putRes.isOk
let
hasRes = await store.hasBlock(newBlock.cid)
check:
hasRes.isOk
hasRes.get
await newBlock.cid in store
test "fail hasBlock":
let
hasRes = await store.hasBlock(newBlock.cid)
check:
hasRes.isOk
not hasRes.get
not (await newBlock.cid in store)
test "listBlocks":
var
newBlocks: seq[bt.Block]
for _ in 0..99:
let
b = randomBlock()
pRes = await store.putBlock(b)
assert pRes.isOk
newBlocks.add(b)
var
called = 0
cids = toHashSet(newBlocks.mapIt(it.cid))
let
onBlock = proc(cid: Cid) {.async, gcsafe.} =
check: cid in cids
if cid in cids:
inc called
cids.excl(cid)
listRes = await store.listBlocks(onBlock)
check:
listRes.isOk
called == newBlocks.len
test "delBlock":
let
putRes = await store.putBlock(newBlock)
assert putRes.isOk
assert (await newBlock.cid in store)
let
delRes = await store.delBlock(newBlock.cid)
check:
delRes.isOk
not (await newBlock.cid in store)

View File

@ -1,4 +1,5 @@
import ./stores/testfsstore
import ./stores/testcachestore import ./stores/testcachestore
import ./stores/testfsstore
import ./stores/testsqlitestore
{.warning[UnusedImport]: off.} {.warning[UnusedImport]: off.}