[node] support self.cache=nil in SQLiteStore

also fix a discrepancy where cli option `--cache-size` is documented as
`0 disables the cache` in `codex/conf.nim`, but previously the value `0` would
result in a cache being constructed with default parameter values for
`CacheStore.new()`

Closes #180
This commit is contained in:
Michael Bradley, Jr 2022-08-04 18:51:05 -05:00 committed by Michael Bradley
parent b7df2d151c
commit 2ecf750959
3 changed files with 217 additions and 195 deletions

View File

@ -102,11 +102,11 @@ proc new*(T: type CodexServer, config: CodexConf): T =
.withTcpTransport({ServerFlags.ReuseAddr})
.build()
let cache =
if config.cacheSize > 0:
CacheStore.new(cacheSize = config.cacheSize * MiB)
else:
CacheStore.new()
var
cache: CacheStore
if config.cacheSize > 0:
cache = CacheStore.new(cacheSize = config.cacheSize * MiB)
let
discoveryBootstrapNodes = config.bootstrapNodes

View File

@ -70,17 +70,21 @@ method getBlock*(
## Save a copy to the cache if present in the database but not in the cache
##
trace "Getting block from cache or database", cid
if not self.cache.isNil:
trace "Getting block from cache or database", cid
else:
trace "Getting block from database", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success cid.emptyBlock.some
without cachedBlkOpt =? await self.cache.getBlock(cid), error:
trace "Unable to read block from cache", cid, error = error.msg
if not self.cache.isNil:
without cachedBlkOpt =? await self.cache.getBlock(cid), error:
trace "Unable to read block from cache", cid, error = error.msg
if cachedBlkOpt.isSome:
return success cachedBlkOpt
if cachedBlkOpt.isSome:
return success cachedBlkOpt
without blkKey =? blockKey(cid), error:
return failure error
@ -96,11 +100,12 @@ method getBlock*(
trace "Unable to construct block from data", cid, error = error.msg
return failure error
let
putCachedRes = await self.cache.putBlock(blk)
if not self.cache.isNil:
let
putCachedRes = await self.cache.putBlock(blk)
if putCachedRes.isErr:
trace "Unable to store block in cache", cid, error = putCachedRes.error.msg
if putCachedRes.isErr:
trace "Unable to store block in cache", cid, error = putCachedRes.error.msg
return success blk.some
@ -111,7 +116,10 @@ method putBlock*(
## Save a copy to the cache
##
trace "Putting block into database and cache", cid = blk.cid
if not self.cache.isNil:
trace "Putting block into database and cache", cid = blk.cid
else:
trace "Putting block into database", cid = blk.cid
if blk.isEmpty:
trace "Empty block, ignoring"
@ -127,11 +135,12 @@ method putBlock*(
trace "Unable to store block in database", key = blkKey.id, error = putRes.error.msg
return failure putRes.error
let
putCachedRes = await self.cache.putBlock(blk)
if not self.cache.isNil:
let
putCachedRes = await self.cache.putBlock(blk)
if putCachedRes.isErr:
trace "Unable to store block in cache", cid = blk.cid, error = putCachedRes.error.msg
if putCachedRes.isErr:
trace "Unable to store block in cache", cid = blk.cid, error = putCachedRes.error.msg
return success()
@ -141,17 +150,21 @@ method delBlock*(
## Delete a block from the database and cache
##
trace "Deleting block from cache and database", cid
if not self.cache.isNil:
trace "Deleting block from cache and database", cid
else:
trace "Deleting block from database", cid
if cid.isEmpty:
trace "Empty block, ignoring"
return success()
let
delCachedRes = await self.cache.delBlock(cid)
if not self.cache.isNil:
let
delCachedRes = await self.cache.delBlock(cid)
if delCachedRes.isErr:
trace "Unable to delete block from cache", cid, error = delCachedRes.error.msg
if delCachedRes.isErr:
trace "Unable to delete block from cache", cid, error = delCachedRes.error.msg
without blkKey =? blockKey(cid), error:
return failure error

View File

@ -15,211 +15,220 @@ import pkg/codex/stores
import ../helpers
suite "SQLite Store":
randomize()
var
store: SQLiteStore
let
repoDir = getAppDir() / "repo"
proc randomBlock(): bt.Block =
let
blockRes = bt.Block.new(($genOid()).toBytes)
require(blockRes.isOk)
blockRes.get
var
newBlock: bt.Block
setup:
removeDir(repoDir)
require(not dirExists(repoDir))
store = SQLiteStore.new(repoDir)
newBlock = randomBlock()
teardown:
if not store.isNil: await store.close
store = nil
removeDir(repoDir)
require(not dirExists(repoDir))
test "putBlock":
let
blkKeyRes = blockKey(newBlock.cid)
assert blkKeyRes.isOk
let
blkKey = blkKeyRes.get
proc runSuite(cache: bool) =
suite "SQLite Store " & (if cache: "(cache enabled)" else: "(cache disabled)"):
randomize()
var
# bypass cache
store: SQLiteStore
let
repoDir = getAppDir() / "repo"
proc randomBlock(): bt.Block =
let
blockRes = bt.Block.new(($genOid()).toBytes)
require(blockRes.isOk)
blockRes.get
var
newBlock: bt.Block
setup:
removeDir(repoDir)
require(not dirExists(repoDir))
if cache:
store = SQLiteStore.new(repoDir)
else:
store = SQLiteStore.new(repoDir, nil)
newBlock = randomBlock()
teardown:
if not store.isNil: await store.close
store = nil
removeDir(repoDir)
require(not dirExists(repoDir))
test "putBlock":
let
blkKeyRes = blockKey(newBlock.cid)
assert blkKeyRes.isOk
let
blkKey = blkKeyRes.get
var
# bypass enabled cache
containsRes = await store.datastore.contains(blkKey)
assert containsRes.isOk
assert not containsRes.get
let
putRes = await store.putBlock(newBlock)
check: putRes.isOk
# bypass enabled cache
containsRes = await store.datastore.contains(blkKey)
assert containsRes.isOk
assert not containsRes.get
assert containsRes.isOk
let
putRes = await store.putBlock(newBlock)
check: containsRes.get
check: putRes.isOk
test "getBlock":
var
r = rand(100)
# bypass cache
containsRes = await store.datastore.contains(blkKey)
# put `r` number of random blocks before putting newBlock
if r > 0:
for _ in 0..r:
let
b = randomBlock()
kRes = blockKey(b.cid)
assert containsRes.isOk
assert kRes.isOk
check: containsRes.get
let
# bypass enabled cache
pRes = await store.datastore.put(kRes.get, b.data)
assert pRes.isOk
let
blkKeyRes = blockKey(newBlock.cid)
assert blkKeyRes.isOk
var
# bypass enabled cache
putRes = await store.datastore.put(blkKeyRes.get, newBlock.data)
assert putRes.isOk
test "getBlock":
var
r = rand(100)
# put `r` number of random blocks before putting newBlock
if r > 0:
for _ in 0..r:
let
b = randomBlock()
kRes = blockKey(b.cid)
# put `r` number of random blocks after putting newBlock
if r > 0:
for _ in 0..r:
let
b = randomBlock()
kRes = blockKey(b.cid)
assert kRes.isOk
assert kRes.isOk
let
# bypass cache
pRes = await store.datastore.put(kRes.get, b.data)
let
# bypass enabled cache
pRes = await store.datastore.put(kRes.get, b.data)
assert pRes.isOk
assert pRes.isOk
let
blkKeyRes = blockKey(newBlock.cid)
var
# get from database
getRes = await store.getBlock(newBlock.cid)
assert blkKeyRes.isOk
check: getRes.isOk
var
# bypass cache
putRes = await store.datastore.put(blkKeyRes.get, newBlock.data)
var
blkOpt = getRes.get
assert putRes.isOk
check:
blkOpt.isSome
blkOpt.get == newBlock
r = rand(100)
# put `r` number of random blocks after putting newBlock
if r > 0:
for _ in 0..r:
let
b = randomBlock()
kRes = blockKey(b.cid)
assert kRes.isOk
let
# bypass cache
pRes = await store.datastore.put(kRes.get, b.data)
assert pRes.isOk
var
# get from database
# get from enabled cache
getRes = await store.getBlock(newBlock.cid)
check: getRes.isOk
check: getRes.isOk
var
blkOpt = getRes.get
check:
blkOpt.isSome
blkOpt.get == newBlock
check:
blkOpt.isSome
blkOpt.get == newBlock
# get from cache
getRes = await store.getBlock(newBlock.cid)
check: getRes.isOk
blkOpt = getRes.get
check:
blkOpt.isSome
blkOpt.get == newBlock
test "fail getBlock":
let
getRes = await store.getBlock(newBlock.cid)
assert getRes.isOk
let
blkOpt = getRes.get
check: blkOpt.isNone
test "hasBlock":
let
putRes = await store.putBlock(newBlock)
assert putRes.isOk
let
hasRes = await store.hasBlock(newBlock.cid)
check:
hasRes.isOk
hasRes.get
await newBlock.cid in store
test "fail hasBlock":
let
hasRes = await store.hasBlock(newBlock.cid)
check:
hasRes.isOk
not hasRes.get
not (await newBlock.cid in store)
test "listBlocks":
var
newBlocks: seq[bt.Block]
for _ in 0..99:
test "fail getBlock":
let
b = randomBlock()
pRes = await store.putBlock(b)
getRes = await store.getBlock(newBlock.cid)
assert pRes.isOk
assert getRes.isOk
newBlocks.add(b)
let
blkOpt = getRes.get
var
called = 0
cids = toHashSet(newBlocks.mapIt(it.cid))
check: blkOpt.isNone
let
onBlock = proc(cid: Cid) {.async, gcsafe.} =
check: cid in cids
if cid in cids:
inc called
cids.excl(cid)
listRes = await store.listBlocks(onBlock)
test "hasBlock":
let
putRes = await store.putBlock(newBlock)
check:
listRes.isOk
called == newBlocks.len
assert putRes.isOk
test "delBlock":
let
putRes = await store.putBlock(newBlock)
let
hasRes = await store.hasBlock(newBlock.cid)
assert putRes.isOk
assert (await newBlock.cid in store)
check:
hasRes.isOk
hasRes.get
await newBlock.cid in store
let
delRes = await store.delBlock(newBlock.cid)
test "fail hasBlock":
let
hasRes = await store.hasBlock(newBlock.cid)
check:
delRes.isOk
not (await newBlock.cid in store)
check:
hasRes.isOk
not hasRes.get
not (await newBlock.cid in store)
test "listBlocks":
var
newBlocks: seq[bt.Block]
for _ in 0..99:
let
b = randomBlock()
pRes = await store.putBlock(b)
assert pRes.isOk
newBlocks.add(b)
var
called = 0
cids = toHashSet(newBlocks.mapIt(it.cid))
let
onBlock = proc(cid: Cid) {.async, gcsafe.} =
check: cid in cids
if cid in cids:
inc called
cids.excl(cid)
listRes = await store.listBlocks(onBlock)
check:
listRes.isOk
called == newBlocks.len
test "delBlock":
let
putRes = await store.putBlock(newBlock)
assert putRes.isOk
assert (await newBlock.cid in store)
let
delRes = await store.delBlock(newBlock.cid)
check:
delRes.isOk
not (await newBlock.cid in store)
runSuite(cache = true)
runSuite(cache = false)