[stores] update getBlock return type
change return type for `method getBlock` from `Future[?!(?Block)]` to `Future[?!Block]` use `type BlockNotFoundError = object of CodexError` to differentiate between "block not found in the store" and other errors also make some logic and error handling/messages more consistent across BlockStore implementations closes #177 closes #182 closes #210 alternative to #205, #209
This commit is contained in:
parent
e0e2d7b583
commit
3d823dcbc6
|
@ -379,8 +379,8 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
|
|||
|
||||
# Extract succesfully received blocks
|
||||
let blocks = blockFuts
|
||||
.filterIt(it.completed and it.read.isOk and it.read.get.isSome)
|
||||
.mapIt(it.read.get.get)
|
||||
.filterIt(it.completed and it.read.isOk)
|
||||
.mapIt(it.read.get)
|
||||
|
||||
if blocks.len > 0:
|
||||
trace "Sending blocks to peer", peer = task.id, blocks = blocks.len
|
||||
|
|
|
@ -29,6 +29,8 @@ type
|
|||
cid*: Cid
|
||||
data*: seq[byte]
|
||||
|
||||
BlockNotFoundError* = object of CodexError
|
||||
|
||||
template EmptyCid*: untyped =
|
||||
var
|
||||
emptyCid {.global, threadvar.}:
|
||||
|
|
|
@ -114,13 +114,9 @@ proc encode*(
|
|||
for j in 0..<blocks:
|
||||
let idx = blockIdx[j]
|
||||
if idx < manifest.len:
|
||||
without blkOrNone =? await dataBlocks[j], error:
|
||||
without blk =? (await dataBlocks[j]), error:
|
||||
trace "Unable to retrieve block", error = error.msg
|
||||
return error.failure
|
||||
|
||||
without blk =? blkOrNone:
|
||||
trace "Block not found", cid = encoded[idx]
|
||||
return failure("Block not found")
|
||||
return failure error
|
||||
|
||||
trace "Encoding block", cid = blk.cid, pos = idx
|
||||
shallowCopy(data[j], blk.data)
|
||||
|
@ -211,12 +207,8 @@ proc decode*(
|
|||
|
||||
idxPendingBlocks.del(idxPendingBlocks.find(done))
|
||||
|
||||
without blkOrNone =? (await done), error:
|
||||
trace "Failed retrieving block", exc = error.msg
|
||||
return error.failure
|
||||
|
||||
without blk =? blkOrNone:
|
||||
trace "Block not found"
|
||||
without blk =? (await done), error:
|
||||
trace "Failed retrieving block", error = error.msg
|
||||
continue
|
||||
|
||||
if idx >= encoded.K:
|
||||
|
|
|
@ -73,12 +73,9 @@ proc fetchManifest*(
|
|||
return failure "CID has invalid content type for manifest"
|
||||
|
||||
trace "Received retrieval request", cid
|
||||
without blkOrNone =? await node.blockStore.getBlock(cid), error:
|
||||
return failure(error)
|
||||
|
||||
without blk =? blkOrNone:
|
||||
trace "Block not found", cid
|
||||
return failure("Block not found")
|
||||
without blk =? await node.blockStore.getBlock(cid), error:
|
||||
return failure error
|
||||
|
||||
without manifest =? Manifest.decode(blk):
|
||||
return failure(
|
||||
|
@ -107,7 +104,7 @@ proc fetchBatched*(
|
|||
|
||||
await allFuturesThrowing(allFinished(blocks))
|
||||
if not onBatch.isNil:
|
||||
await onBatch(blocks.mapIt( it.read.get.get ))
|
||||
await onBatch(blocks.mapIt( it.read.get ))
|
||||
except CancelledError as exc:
|
||||
raise exc
|
||||
except CatchableError as exc:
|
||||
|
|
|
@ -24,7 +24,7 @@ type
|
|||
OnBlock* = proc(cid: Cid): Future[void] {.upraises: [], gcsafe.}
|
||||
BlockStore* = ref object of RootObj
|
||||
|
||||
method getBlock*(self: BlockStore, cid: Cid): Future[?! (? Block)] {.base.} =
|
||||
method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} =
|
||||
## Get a block from the blockstore
|
||||
##
|
||||
|
||||
|
|
|
@ -43,24 +43,24 @@ const
|
|||
DefaultCacheSizeMiB* = 100
|
||||
DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes
|
||||
|
||||
method getBlock*(self: CacheStore, cid: Cid): Future[?! (? Block)] {.async.} =
|
||||
method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} =
|
||||
## Get a block from the stores
|
||||
##
|
||||
|
||||
trace "Getting block from cache", cid
|
||||
|
||||
if cid.isEmpty:
|
||||
trace "Empty block, ignoring"
|
||||
return cid.emptyBlock.some.success
|
||||
return success cid.emptyBlock
|
||||
|
||||
if cid notin self.cache:
|
||||
return Block.none.success
|
||||
return failure (ref BlockNotFoundError)(msg: "Block not in cache")
|
||||
|
||||
try:
|
||||
let blk = self.cache[cid]
|
||||
return blk.some.success
|
||||
return success self.cache[cid]
|
||||
except CatchableError as exc:
|
||||
trace "Exception requesting block", cid, exc = exc.msg
|
||||
return failure(exc)
|
||||
trace "Error requesting block from cache", cid, error = exc.msg
|
||||
return failure exc
|
||||
|
||||
method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} =
|
||||
## Check if the block exists in the blockstore
|
||||
|
|
|
@ -37,47 +37,70 @@ type
|
|||
template blockPath*(self: FSStore, cid: Cid): string =
|
||||
self.repoDir / ($cid)[^self.postfixLen..^1] / $cid
|
||||
|
||||
method getBlock*(self: FSStore, cid: Cid): Future[?! (? Block)] {.async.} =
|
||||
## Get a block from the stores
|
||||
method getBlock*(self: FSStore, cid: Cid): Future[?!Block] {.async.} =
|
||||
## Get a block from the cache or filestore.
|
||||
## Save a copy to the cache if present in the filestore but not in the cache
|
||||
##
|
||||
|
||||
if not self.cache.isNil:
|
||||
trace "Getting block from cache or filestore", cid
|
||||
else:
|
||||
trace "Getting block from filestore", cid
|
||||
|
||||
if cid.isEmpty:
|
||||
trace "Empty block, ignoring"
|
||||
return cid.emptyBlock.some.success
|
||||
return success cid.emptyBlock
|
||||
|
||||
let cachedBlock = await self.cache.getBlock(cid)
|
||||
if cachedBlock.isErr:
|
||||
return cachedBlock
|
||||
if cachedBlock.get.isSome:
|
||||
trace "Retrieved block from cache", cid
|
||||
return cachedBlock
|
||||
if not self.cache.isNil:
|
||||
let
|
||||
cachedBlockRes = await self.cache.getBlock(cid)
|
||||
|
||||
if not cachedBlockRes.isErr:
|
||||
return success cachedBlockRes.get
|
||||
else:
|
||||
trace "Unable to read block from cache", cid, error = cachedBlockRes.error.msg
|
||||
|
||||
# Read file contents
|
||||
var data: seq[byte]
|
||||
var
|
||||
data: seq[byte]
|
||||
|
||||
let
|
||||
path = self.blockPath(cid)
|
||||
res = io2.readFile(path, data)
|
||||
|
||||
if res.isErr:
|
||||
if not isFile(path): # May be, check instead that "res.error == ERROR_FILE_NOT_FOUND" ?
|
||||
return Block.none.success
|
||||
return failure (ref BlockNotFoundError)(msg: "Block not in filestore")
|
||||
else:
|
||||
let error = io2.ioErrorMsg(res.error)
|
||||
trace "Cannot read file from filestore", path, error
|
||||
return failure("Cannot read file from filestore")
|
||||
let
|
||||
error = io2.ioErrorMsg(res.error)
|
||||
|
||||
without var blk =? Block.new(cid, data), error:
|
||||
return error.failure
|
||||
trace "Error requesting block from filestore", path, error
|
||||
return failure "Error requesting block from filestore: " & error
|
||||
|
||||
# TODO: add block to the cache
|
||||
return blk.some.success
|
||||
without blk =? Block.new(cid, data), error:
|
||||
trace "Unable to construct block from data", cid, error = error.msg
|
||||
return failure error
|
||||
|
||||
if not self.cache.isNil:
|
||||
let
|
||||
putCachedRes = await self.cache.putBlock(blk)
|
||||
|
||||
if putCachedRes.isErr:
|
||||
trace "Unable to store block in cache", cid, error = putCachedRes.error.msg
|
||||
|
||||
return success blk
|
||||
|
||||
method putBlock*(self: FSStore, blk: Block): Future[?!void] {.async.} =
|
||||
## Write block contents to file with name based on blk.cid,
|
||||
## save second copy to the cache
|
||||
## Write a block's contents to a file with name based on blk.cid.
|
||||
## Save a copy to the cache
|
||||
##
|
||||
|
||||
if not self.cache.isNil:
|
||||
trace "Putting block into filestore and cache", cid = blk.cid
|
||||
else:
|
||||
trace "Putting block into filestore", cid = blk.cid
|
||||
|
||||
if blk.isEmpty:
|
||||
trace "Empty block, ignoring"
|
||||
return success()
|
||||
|
@ -98,20 +121,35 @@ method putBlock*(self: FSStore, blk: Block): Future[?!void] {.async.} =
|
|||
trace "Unable to store block", path, cid = blk.cid, error
|
||||
return failure("Unable to store block")
|
||||
|
||||
if isErr (await self.cache.putBlock(blk)):
|
||||
trace "Unable to store block in cache", cid = blk.cid
|
||||
if not self.cache.isNil:
|
||||
let
|
||||
putCachedRes = await self.cache.putBlock(blk)
|
||||
|
||||
if putCachedRes.isErr:
|
||||
trace "Unable to store block in cache", cid = blk.cid, error = putCachedRes.error.msg
|
||||
|
||||
return success()
|
||||
|
||||
method delBlock*(self: FSStore, cid: Cid): Future[?!void] {.async.} =
|
||||
## Delete a block from the blockstore
|
||||
## Delete a block from the cache and filestore
|
||||
##
|
||||
|
||||
if not self.cache.isNil:
|
||||
trace "Deleting block from cache and filestore", cid
|
||||
else:
|
||||
trace "Deleting block from filestore", cid
|
||||
|
||||
if cid.isEmpty:
|
||||
trace "Empty block, ignoring"
|
||||
return success()
|
||||
|
||||
if not self.cache.isNil:
|
||||
let
|
||||
delCachedRes = await self.cache.delBlock(cid)
|
||||
|
||||
if delCachedRes.isErr:
|
||||
trace "Unable to delete block from cache", cid, error = delCachedRes.error.msg
|
||||
|
||||
let
|
||||
path = self.blockPath(cid)
|
||||
res = io2.removeFile(path)
|
||||
|
@ -121,10 +159,10 @@ method delBlock*(self: FSStore, cid: Cid): Future[?!void] {.async.} =
|
|||
trace "Unable to delete block", path, cid, error
|
||||
return error.failure
|
||||
|
||||
return await self.cache.delBlock(cid)
|
||||
return success()
|
||||
|
||||
method hasBlock*(self: FSStore, cid: Cid): Future[?!bool] {.async.} =
|
||||
## Check if the block exists in the blockstore
|
||||
## Check if a block exists in the filestore
|
||||
##
|
||||
|
||||
trace "Checking filestore for block existence", cid
|
||||
|
@ -135,7 +173,8 @@ method hasBlock*(self: FSStore, cid: Cid): Future[?!bool] {.async.} =
|
|||
return self.blockPath(cid).isFile().success
|
||||
|
||||
method listBlocks*(self: FSStore, onBlock: OnBlock): Future[?!void] {.async.} =
|
||||
## Get the list of blocks in the BlockStore. This is an intensive operation
|
||||
## Process list of all blocks in the filestore via callback.
|
||||
## This is an intensive operation
|
||||
##
|
||||
|
||||
trace "Listing all blocks in filestore"
|
||||
|
|
|
@ -31,28 +31,20 @@ type
|
|||
engine*: BlockExcEngine # blockexc decision engine
|
||||
localStore*: BlockStore # local block store
|
||||
|
||||
method getBlock*(self: NetworkStore, cid: Cid): Future[?! (? bt.Block)] {.async.} =
|
||||
method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.async.} =
|
||||
## Get a block from a remote peer
|
||||
##
|
||||
|
||||
trace "Getting block from network store", cid
|
||||
trace "Getting block from local store or network", cid
|
||||
|
||||
let blk = await self.localStore.getBlock(cid)
|
||||
if blk.isErr:
|
||||
return blk
|
||||
if blk.get.isSome:
|
||||
trace "Retrieved block from local store", cid
|
||||
return blk
|
||||
|
||||
trace "Block not found in local store", cid
|
||||
try:
|
||||
without blk =? await self.localStore.getBlock(cid), error:
|
||||
if not (error of BlockNotFoundError): return failure error
|
||||
trace "Block not in local store", cid
|
||||
# TODO: What if block isn't available in the engine too?
|
||||
let blk = await self.engine.requestBlock(cid)
|
||||
# TODO: add block to the local store
|
||||
return blk.some.success
|
||||
except CatchableError as exc:
|
||||
trace "Exception requesting block", cid, exc = exc.msg
|
||||
return failure(exc)
|
||||
# TODO: add retrieved block to the local store
|
||||
return (await self.engine.requestBlock(cid)).catch
|
||||
|
||||
return success blk
|
||||
|
||||
method putBlock*(self: NetworkStore, blk: bt.Block): Future[?!void] {.async.} =
|
||||
## Store block locally and notify the network
|
||||
|
|
|
@ -11,8 +11,6 @@ import pkg/upraises
|
|||
|
||||
push: {.upraises: [].}
|
||||
|
||||
import std/options
|
||||
|
||||
import pkg/chronos
|
||||
import pkg/chronicles
|
||||
import pkg/datastore/sqlite_datastore
|
||||
|
@ -72,7 +70,7 @@ proc blockKey*(blockCid: Cid): ?!Key =
|
|||
|
||||
method getBlock*(
|
||||
self: SQLiteStore,
|
||||
cid: Cid): Future[?!(?Block)] {.async.} =
|
||||
cid: Cid): Future[?!Block] {.async.} =
|
||||
## Get a block from the cache or database.
|
||||
## Save a copy to the cache if present in the database but not in the cache
|
||||
##
|
||||
|
@ -84,24 +82,26 @@ method getBlock*(
|
|||
|
||||
if cid.isEmpty:
|
||||
trace "Empty block, ignoring"
|
||||
return success cid.emptyBlock.some
|
||||
return success cid.emptyBlock
|
||||
|
||||
if not self.cache.isNil:
|
||||
without cachedBlkOpt =? await self.cache.getBlock(cid), error:
|
||||
trace "Unable to read block from cache", cid, error = error.msg
|
||||
let
|
||||
cachedBlockRes = await self.cache.getBlock(cid)
|
||||
|
||||
if cachedBlkOpt.isSome:
|
||||
return success cachedBlkOpt
|
||||
if cachedBlockRes.isOk:
|
||||
return success cachedBlockRes.get
|
||||
else:
|
||||
trace "Unable to read block from cache", cid, error = cachedBlockRes.error.msg
|
||||
|
||||
without blkKey =? blockKey(cid), error:
|
||||
return failure error
|
||||
|
||||
without dataOpt =? await self.datastore.get(blkKey), error:
|
||||
trace "Unable to read block from database", key = blkKey.id, error = error.msg
|
||||
trace "Error requesting block from database", key = blkKey.id, error = error.msg
|
||||
return failure error
|
||||
|
||||
without data =? dataOpt:
|
||||
return success Block.none
|
||||
return failure (ref BlockNotFoundError)(msg: "Block not in database")
|
||||
|
||||
without blk =? Block.new(cid, data), error:
|
||||
trace "Unable to construct block from data", cid, error = error.msg
|
||||
|
@ -114,7 +114,7 @@ method getBlock*(
|
|||
if putCachedRes.isErr:
|
||||
trace "Unable to store block in cache", cid, error = putCachedRes.error.msg
|
||||
|
||||
return success blk.some
|
||||
return success blk
|
||||
|
||||
method putBlock*(
|
||||
self: SQLiteStore,
|
||||
|
@ -154,7 +154,7 @@ method putBlock*(
|
|||
method delBlock*(
|
||||
self: SQLiteStore,
|
||||
cid: Cid): Future[?!void] {.async.} =
|
||||
## Delete a block from the database and cache
|
||||
## Delete a block from the cache and database
|
||||
##
|
||||
|
||||
if not self.cache.isNil:
|
||||
|
|
|
@ -81,10 +81,9 @@ method readOnce*(
|
|||
readBytes = min(nbytes - read, self.manifest.blockSize - blockOffset)
|
||||
|
||||
# Read contents of block `blockNum`
|
||||
without blkOrNone =? await self.store.getBlock(self.manifest[blockNum]), error:
|
||||
without blk =? await self.store.getBlock(self.manifest[blockNum]), error:
|
||||
raise newLPStreamReadError(error)
|
||||
without blk =? blkOrNone:
|
||||
raise newLPStreamReadError("Block not found")
|
||||
|
||||
trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset
|
||||
|
||||
# Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf
|
||||
|
|
|
@ -126,7 +126,7 @@ suite "NetworkStore engine - 2 nodes":
|
|||
test "Should get blocks from remote":
|
||||
let blocks = await allFinished(
|
||||
blocks2.mapIt( nodeCmps1.networkStore.getBlock(it.cid) ))
|
||||
check blocks.mapIt( it.read().tryGet().get() ) == blocks2
|
||||
check blocks.mapIt( it.read().tryGet() ) == blocks2
|
||||
|
||||
test "Remote should send blocks when available":
|
||||
let blk = bt.Block.new("Block 1".toBytes).tryGet()
|
||||
|
|
|
@ -47,7 +47,7 @@ proc corruptBlocks*(
|
|||
|
||||
pos.add(i)
|
||||
var
|
||||
blk = (await store.getBlock(manifest[i])).tryGet().get()
|
||||
blk = (await store.getBlock(manifest[i])).tryGet()
|
||||
bytePos: seq[int]
|
||||
|
||||
while true:
|
||||
|
|
|
@ -72,11 +72,13 @@ suite "Cache Store":
|
|||
store = CacheStore.new(@[newBlock])
|
||||
|
||||
let blk = await store.getBlock(newBlock.cid)
|
||||
check blk.tryGet().get() == newBlock
|
||||
check blk.tryGet() == newBlock
|
||||
|
||||
test "fail getBlock":
|
||||
let blk = await store.getBlock(newBlock.cid)
|
||||
check blk.tryGet().isNone()
|
||||
check:
|
||||
blk.isErr
|
||||
blk.error of BlockNotFoundError
|
||||
|
||||
test "hasBlock":
|
||||
let store = CacheStore.new(@[newBlock])
|
||||
|
|
|
@ -16,7 +16,8 @@ import pkg/codex/blocktype as bt
|
|||
|
||||
import ../helpers
|
||||
|
||||
suite "FS Store":
|
||||
proc runSuite(cache: bool) =
|
||||
suite "FS Store " & (if cache: "(cache enabled)" else: "(cache disabled)"):
|
||||
var
|
||||
store: FSStore
|
||||
repoDir: string
|
||||
|
@ -25,7 +26,11 @@ suite "FS Store":
|
|||
setup:
|
||||
repoDir = getAppDir() / "repo"
|
||||
createDir(repoDir)
|
||||
|
||||
if cache:
|
||||
store = FSStore.new(repoDir)
|
||||
else:
|
||||
store = FSStore.new(repoDir, postfixLen = 2, cache = nil)
|
||||
|
||||
teardown:
|
||||
removeDir(repoDir)
|
||||
|
@ -41,11 +46,13 @@ suite "FS Store":
|
|||
createDir(store.blockPath(newBlock.cid).parentDir)
|
||||
writeFile(store.blockPath(newBlock.cid), newBlock.data)
|
||||
let blk = await store.getBlock(newBlock.cid)
|
||||
check blk.tryGet().get() == newBlock
|
||||
check blk.tryGet() == newBlock
|
||||
|
||||
test "fail getBlock":
|
||||
let blk = await store.getBlock(newBlock.cid)
|
||||
check blk.tryGet().isNone
|
||||
check:
|
||||
blk.isErr
|
||||
blk.error of BlockNotFoundError
|
||||
|
||||
test "hasBlock":
|
||||
createDir(store.blockPath(newBlock.cid).parentDir)
|
||||
|
@ -74,4 +81,8 @@ suite "FS Store":
|
|||
writeFile(store.blockPath(newBlock.cid), newBlock.data)
|
||||
|
||||
(await store.delBlock(newBlock.cid)).tryGet()
|
||||
|
||||
check not fileExists(store.blockPath(newBlock.cid))
|
||||
|
||||
runSuite(cache = true)
|
||||
runSuite(cache = false)
|
||||
|
|
|
@ -43,7 +43,7 @@ proc runSuite(cache: bool) =
|
|||
if cache:
|
||||
store = SQLiteStore.new(repoDir)
|
||||
else:
|
||||
store = SQLiteStore.new(repoDir, nil)
|
||||
store = SQLiteStore.new(repoDir, cache = nil)
|
||||
|
||||
newBlock = randomBlock()
|
||||
|
||||
|
@ -132,37 +132,24 @@ proc runSuite(cache: bool) =
|
|||
# get from database
|
||||
getRes = await store.getBlock(newBlock.cid)
|
||||
|
||||
check: getRes.isOk
|
||||
|
||||
var
|
||||
blkOpt = getRes.get
|
||||
|
||||
check:
|
||||
blkOpt.isSome
|
||||
blkOpt.get == newBlock
|
||||
getRes.isOk
|
||||
getRes.get == newBlock
|
||||
|
||||
# get from enabled cache
|
||||
getRes = await store.getBlock(newBlock.cid)
|
||||
|
||||
check: getRes.isOk
|
||||
|
||||
blkOpt = getRes.get
|
||||
|
||||
check:
|
||||
blkOpt.isSome
|
||||
blkOpt.get == newBlock
|
||||
getRes.isOk
|
||||
getRes.get == newBlock
|
||||
|
||||
test "fail getBlock":
|
||||
let
|
||||
getRes = await store.getBlock(newBlock.cid)
|
||||
|
||||
assert getRes.isOk
|
||||
|
||||
let
|
||||
blkOpt = getRes.get
|
||||
|
||||
check: blkOpt.isNone
|
||||
blkRes = await store.getBlock(newBlock.cid)
|
||||
|
||||
check:
|
||||
blkRes.isErr
|
||||
blkRes.error of BlockNotFoundError
|
||||
|
||||
test "hasBlock":
|
||||
let
|
||||
|
|
|
@ -158,7 +158,7 @@ suite "Test Node":
|
|||
(await localStore.hasBlock(manifestCid)).tryGet()
|
||||
|
||||
var
|
||||
manifestBlock = (await localStore.getBlock(manifestCid)).tryGet().get()
|
||||
manifestBlock = (await localStore.getBlock(manifestCid)).tryGet()
|
||||
localManifest = Manifest.decode(manifestBlock).tryGet()
|
||||
|
||||
check:
|
||||
|
|
Loading…
Reference in New Issue