diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 24396745..e2826ced 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -379,8 +379,8 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = # Extract succesfully received blocks let blocks = blockFuts - .filterIt(it.completed and it.read.isOk and it.read.get.isSome) - .mapIt(it.read.get.get) + .filterIt(it.completed and it.read.isOk) + .mapIt(it.read.get) if blocks.len > 0: trace "Sending blocks to peer", peer = task.id, blocks = blocks.len diff --git a/codex/blocktype.nim b/codex/blocktype.nim index 1b41d2b9..e974b6a6 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -29,6 +29,18 @@ type cid*: Cid data*: seq[byte] + BlockErrorKind* = enum + BlockConstructErr, + BlockKeyErr, + BlockNetReqErr, + BlockNotFoundErr, + BlockReadErr + + BlockError* = object of CodexError + kind*: BlockErrorKind + + BlockResult* = Result[Block, ref BlockError] + template EmptyCid*: untyped = var emptyCid {.global, threadvar.}: diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 01401f2b..ea89edc4 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -114,13 +114,15 @@ proc encode*( for j in 0..= encoded.K: trace "Retrieved parity block", cid = blk.cid, idx shallowCopy(parityData[idx - encoded.K], if blk.isEmpty: emptyBlock else: blk.data) diff --git a/codex/node.nim b/codex/node.nim index 39df9c5e..24b8fd88 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -73,14 +73,13 @@ proc fetchManifest*( return failure "CID has invalid content type for manifest" trace "Received retrieval request", cid - without blkOrNone =? await node.blockStore.getBlock(cid), error: - return failure(error) - without blk =? blkOrNone: - trace "Block not found", cid - return failure("Block not found") + let + getRes = await node.blockStore.getBlock(cid) - without manifest =? Manifest.decode(blk): + if getRes.isErr: return failure getRes.error + + without manifest =? Manifest.decode(getRes.get): return failure( newException(CodexError, "Unable to decode as manifest")) @@ -107,7 +106,7 @@ proc fetchBatched*( await allFuturesThrowing(allFinished(blocks)) if not onBatch.isNil: - await onBatch(blocks.mapIt( it.read.get.get )) + await onBatch(blocks.mapIt( it.read.get )) except CancelledError as exc: raise exc except CatchableError as exc: diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index 6109ca4e..fa7eaaaa 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -24,7 +24,7 @@ type OnBlock* = proc(cid: Cid): Future[void] {.upraises: [], gcsafe.} BlockStore* = ref object of RootObj -method getBlock*(self: BlockStore, cid: Cid): Future[?! (? Block)] {.base.} = +method getBlock*(self: BlockStore, cid: Cid): Future[BlockResult] {.base.} = ## Get a block from the blockstore ## diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index c3091b40..bec1ea4e 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -43,24 +43,24 @@ const DefaultCacheSizeMiB* = 100 DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes -method getBlock*(self: CacheStore, cid: Cid): Future[?! (? Block)] {.async.} = +method getBlock*(self: CacheStore, cid: Cid): Future[BlockResult] {.async.} = ## Get a block from the stores ## trace "Getting block from cache", cid + if cid.isEmpty: trace "Empty block, ignoring" - return cid.emptyBlock.some.success + return ok cid.emptyBlock if cid notin self.cache: - return Block.none.success + return failure (ref BlockError)(kind: BlockNotFoundErr, msg: "Block not in cache") try: - let blk = self.cache[cid] - return blk.some.success + return ok self.cache[cid] except CatchableError as exc: - trace "Exception requesting block", cid, exc = exc.msg - return failure(exc) + trace "Error requesting block from cache", cid, error = exc.msg + return failure (ref BlockError)(kind: BlockReadErr, msg: exc.msg) method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} = ## Check if the block exists in the blockstore diff --git a/codex/stores/fsstore.nim b/codex/stores/fsstore.nim index bae8d16c..d81390a0 100644 --- a/codex/stores/fsstore.nim +++ b/codex/stores/fsstore.nim @@ -37,47 +37,71 @@ type template blockPath*(self: FSStore, cid: Cid): string = self.repoDir / ($cid)[^self.postfixLen..^1] / $cid -method getBlock*(self: FSStore, cid: Cid): Future[?! (? Block)] {.async.} = - ## Get a block from the stores +method getBlock*(self: FSStore, cid: Cid): Future[BlockResult] {.async.} = + ## Get a block from the cache or filestore. + ## Save a copy to the cache if present in the filestore but not in the cache ## - trace "Getting block from filestore", cid + if not self.cache.isNil: + trace "Getting block from cache or filestore", cid + else: + trace "Getting block from filestore", cid + if cid.isEmpty: trace "Empty block, ignoring" - return cid.emptyBlock.some.success + return ok cid.emptyBlock - let cachedBlock = await self.cache.getBlock(cid) - if cachedBlock.isErr: - return cachedBlock - if cachedBlock.get.isSome: - trace "Retrieved block from cache", cid - return cachedBlock + if not self.cache.isNil: + let + cachedBlockRes = await self.cache.getBlock(cid) + + if not cachedBlockRes.isErr: + return ok cachedBlockRes.get + else: + trace "Unable to read block from cache", cid, error = cachedBlockRes.error.msg # Read file contents - var data: seq[byte] + var + data: seq[byte] + let path = self.blockPath(cid) res = io2.readFile(path, data) if res.isErr: if not isFile(path): # May be, check instead that "res.error == ERROR_FILE_NOT_FOUND" ? - return Block.none.success + return failure (ref BlockError)(kind: BlockNotFoundErr, msg: "Block not in filestore") else: - let error = io2.ioErrorMsg(res.error) - trace "Cannot read file from filestore", path, error - return failure("Cannot read file from filestore") + let + error = io2.ioErrorMsg(res.error) - without var blk =? Block.new(cid, data), error: - return error.failure + trace "Error requesting block from filestore", path, error + return failure (ref BlockError)( + kind: BlockReadErr, msg: "Error requesting block from filestore: " & error) - # TODO: add block to the cache - return blk.some.success + without blk =? Block.new(cid, data), error: + trace "Unable to construct block from data", cid, error = error.msg + return failure (ref BlockError)(kind: BlockConstructErr, msg: error.msg) + + if not self.cache.isNil: + let + putCachedRes = await self.cache.putBlock(blk) + + if putCachedRes.isErr: + trace "Unable to store block in cache", cid, error = putCachedRes.error.msg + + return ok blk method putBlock*(self: FSStore, blk: Block): Future[?!void] {.async.} = - ## Write block contents to file with name based on blk.cid, - ## save second copy to the cache + ## Write a block's contents to a file with name based on blk.cid. + ## Save a copy to the cache ## + if not self.cache.isNil: + trace "Putting block into filestore and cache", cid = blk.cid + else: + trace "Putting block into filestore", cid = blk.cid + if blk.isEmpty: trace "Empty block, ignoring" return success() @@ -98,20 +122,35 @@ method putBlock*(self: FSStore, blk: Block): Future[?!void] {.async.} = trace "Unable to store block", path, cid = blk.cid, error return failure("Unable to store block") - if isErr (await self.cache.putBlock(blk)): - trace "Unable to store block in cache", cid = blk.cid + if not self.cache.isNil: + let + putCachedRes = await self.cache.putBlock(blk) + + if putCachedRes.isErr: + trace "Unable to store block in cache", cid = blk.cid, error = putCachedRes.error.msg return success() method delBlock*(self: FSStore, cid: Cid): Future[?!void] {.async.} = - ## Delete a block from the blockstore + ## Delete a block from the cache and filestore ## - trace "Deleting block from filestore", cid + if not self.cache.isNil: + trace "Deleting block from cache and filestore", cid + else: + trace "Deleting block from filestore", cid + if cid.isEmpty: trace "Empty block, ignoring" return success() + if not self.cache.isNil: + let + delCachedRes = await self.cache.delBlock(cid) + + if delCachedRes.isErr: + trace "Unable to delete block from cache", cid, error = delCachedRes.error.msg + let path = self.blockPath(cid) res = io2.removeFile(path) @@ -121,10 +160,10 @@ method delBlock*(self: FSStore, cid: Cid): Future[?!void] {.async.} = trace "Unable to delete block", path, cid, error return error.failure - return await self.cache.delBlock(cid) + return success() method hasBlock*(self: FSStore, cid: Cid): Future[?!bool] {.async.} = - ## Check if the block exists in the blockstore + ## Check if a block exists in the filestore ## trace "Checking filestore for block existence", cid @@ -135,7 +174,8 @@ method hasBlock*(self: FSStore, cid: Cid): Future[?!bool] {.async.} = return self.blockPath(cid).isFile().success method listBlocks*(self: FSStore, onBlock: OnBlock): Future[?!void] {.async.} = - ## Get the list of blocks in the BlockStore. This is an intensive operation + ## Process list of all blocks in the filestore via callback. + ## This is an intensive operation ## trace "Listing all blocks in filestore" diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 0097b285..995a89c3 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -31,28 +31,27 @@ type engine*: BlockExcEngine # blockexc decision engine localStore*: BlockStore # local block store -method getBlock*(self: NetworkStore, cid: Cid): Future[?! (? bt.Block)] {.async.} = +method getBlock*(self: NetworkStore, cid: Cid): Future[BlockResult] {.async.} = ## Get a block from a remote peer ## - trace "Getting block from network store", cid + trace "Getting block from local store or network", cid - let blk = await self.localStore.getBlock(cid) - if blk.isErr: - return blk - if blk.get.isSome: - trace "Retrieved block from local store", cid - return blk + let + getRes = await self.localStore.getBlock(cid) - trace "Block not found in local store", cid - try: - # TODO: What if block isn't available in the engine too? - let blk = await self.engine.requestBlock(cid) - # TODO: add block to the local store - return blk.some.success - except CatchableError as exc: - trace "Exception requesting block", cid, exc = exc.msg - return failure(exc) + if getRes.isErr: + if getRes.error.kind != BlockNotFoundErr: return getRes + trace "Block not in local store", cid + try: + # TODO: What if block isn't available in the engine too? + return ok await self.engine.requestBlock(cid) + # TODO: add block to the local store + except CatchableError as exc: + trace "Error requesting block from network", cid, error = exc.msg + return failure (ref BlockError)(kind: BlockNetReqErr, msg: exc.msg) + + return getRes method putBlock*(self: NetworkStore, blk: bt.Block): Future[?!void] {.async.} = ## Store block locally and notify the network diff --git a/codex/stores/sqlitestore.nim b/codex/stores/sqlitestore.nim index 03d697bf..dae4dd82 100644 --- a/codex/stores/sqlitestore.nim +++ b/codex/stores/sqlitestore.nim @@ -11,8 +11,6 @@ import pkg/upraises push: {.upraises: [].} -import std/options - import pkg/chronos import pkg/chronicles import pkg/datastore/sqlite_datastore @@ -72,7 +70,7 @@ proc blockKey*(blockCid: Cid): ?!Key = method getBlock*( self: SQLiteStore, - cid: Cid): Future[?!(?Block)] {.async.} = + cid: Cid): Future[BlockResult] {.async.} = ## Get a block from the cache or database. ## Save a copy to the cache if present in the database but not in the cache ## @@ -84,28 +82,30 @@ method getBlock*( if cid.isEmpty: trace "Empty block, ignoring" - return success cid.emptyBlock.some + return ok cid.emptyBlock if not self.cache.isNil: - without cachedBlkOpt =? await self.cache.getBlock(cid), error: - trace "Unable to read block from cache", cid, error = error.msg + let + cachedBlockRes = await self.cache.getBlock(cid) - if cachedBlkOpt.isSome: - return success cachedBlkOpt + if not cachedBlockRes.isErr: + return ok cachedBlockRes.get + else: + trace "Unable to read block from cache", cid, error = cachedBlockRes.error.msg without blkKey =? blockKey(cid), error: - return failure error + return failure (ref BlockError)(kind: BlockKeyErr, msg: error.msg) without dataOpt =? await self.datastore.get(blkKey), error: - trace "Unable to read block from database", key = blkKey.id, error = error.msg - return failure error + trace "Error requesting block from database", key = blkKey.id, error = error.msg + return failure (ref BlockError)(kind: BlockReadErr, msg: error.msg) without data =? dataOpt: - return success Block.none + return failure (ref BlockError)(kind: BlockNotFoundErr, msg: "Block not in database") without blk =? Block.new(cid, data), error: trace "Unable to construct block from data", cid, error = error.msg - return failure error + return failure (ref BlockError)(kind: BlockConstructErr, msg: error.msg) if not self.cache.isNil: let @@ -114,7 +114,7 @@ method getBlock*( if putCachedRes.isErr: trace "Unable to store block in cache", cid, error = putCachedRes.error.msg - return success blk.some + return ok blk method putBlock*( self: SQLiteStore, @@ -154,7 +154,7 @@ method putBlock*( method delBlock*( self: SQLiteStore, cid: Cid): Future[?!void] {.async.} = - ## Delete a block from the database and cache + ## Delete a block from the cache and database ## if not self.cache.isNil: diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 54959924..2c10a755 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -81,10 +81,14 @@ method readOnce*( readBytes = min(nbytes - read, self.manifest.blockSize - blockOffset) # Read contents of block `blockNum` - without blkOrNone =? await self.store.getBlock(self.manifest[blockNum]), error: - raise newLPStreamReadError(error) - without blk =? blkOrNone: - raise newLPStreamReadError("Block not found") + let + getRes = await self.store.getBlock(self.manifest[blockNum]) + + if getRes.isErr: raise newLPStreamReadError(getRes.error) + + let + blk = getRes.get + trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index 7371bf66..0f91cf63 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -126,7 +126,7 @@ suite "NetworkStore engine - 2 nodes": test "Should get blocks from remote": let blocks = await allFinished( blocks2.mapIt( nodeCmps1.networkStore.getBlock(it.cid) )) - check blocks.mapIt( it.read().tryGet().get() ) == blocks2 + check blocks.mapIt( it.read().tryGet() ) == blocks2 test "Remote should send blocks when available": let blk = bt.Block.new("Block 1".toBytes).tryGet() diff --git a/tests/codex/helpers.nim b/tests/codex/helpers.nim index 0b92f8df..475bf7a1 100644 --- a/tests/codex/helpers.nim +++ b/tests/codex/helpers.nim @@ -47,7 +47,7 @@ proc corruptBlocks*( pos.add(i) var - blk = (await store.getBlock(manifest[i])).tryGet().get() + blk = (await store.getBlock(manifest[i])).tryGet() bytePos: seq[int] while true: diff --git a/tests/codex/stores/testcachestore.nim b/tests/codex/stores/testcachestore.nim index c1fd2d03..13e2c74c 100644 --- a/tests/codex/stores/testcachestore.nim +++ b/tests/codex/stores/testcachestore.nim @@ -72,11 +72,13 @@ suite "Cache Store": store = CacheStore.new(@[newBlock]) let blk = await store.getBlock(newBlock.cid) - check blk.tryGet().get() == newBlock + check blk.tryGet() == newBlock test "fail getBlock": let blk = await store.getBlock(newBlock.cid) - check blk.tryGet().isNone() + check: + blk.isErr + blk.error.kind == BlockNotFoundErr test "hasBlock": let store = CacheStore.new(@[newBlock]) diff --git a/tests/codex/stores/testfsstore.nim b/tests/codex/stores/testfsstore.nim index a13cbf8b..c81d004d 100644 --- a/tests/codex/stores/testfsstore.nim +++ b/tests/codex/stores/testfsstore.nim @@ -16,62 +16,73 @@ import pkg/codex/blocktype as bt import ../helpers -suite "FS Store": - var - store: FSStore - repoDir: string - newBlock = bt.Block.new("New Block".toBytes()).tryGet() +proc runSuite(cache: bool) = + suite "FS Store " & (if cache: "(cache enabled)" else: "(cache disabled)"): + var + store: FSStore + repoDir: string + newBlock = bt.Block.new("New Block".toBytes()).tryGet() - setup: - repoDir = getAppDir() / "repo" - createDir(repoDir) - store = FSStore.new(repoDir) + setup: + repoDir = getAppDir() / "repo" + createDir(repoDir) - teardown: - removeDir(repoDir) + if cache: + store = FSStore.new(repoDir) + else: + store = FSStore.new(repoDir, postfixLen = 2, cache = nil) - test "putBlock": - (await store.putBlock(newBlock)).tryGet() - check: - fileExists(store.blockPath(newBlock.cid)) - (await store.hasBlock(newBlock.cid)).tryGet() - await newBlock.cid in store + teardown: + removeDir(repoDir) - test "getBlock": - createDir(store.blockPath(newBlock.cid).parentDir) - writeFile(store.blockPath(newBlock.cid), newBlock.data) - let blk = await store.getBlock(newBlock.cid) - check blk.tryGet().get() == newBlock + test "putBlock": + (await store.putBlock(newBlock)).tryGet() + check: + fileExists(store.blockPath(newBlock.cid)) + (await store.hasBlock(newBlock.cid)).tryGet() + await newBlock.cid in store - test "fail getBlock": - let blk = await store.getBlock(newBlock.cid) - check blk.tryGet().isNone + test "getBlock": + createDir(store.blockPath(newBlock.cid).parentDir) + writeFile(store.blockPath(newBlock.cid), newBlock.data) + let blk = await store.getBlock(newBlock.cid) + check blk.tryGet() == newBlock - test "hasBlock": - createDir(store.blockPath(newBlock.cid).parentDir) - writeFile(store.blockPath(newBlock.cid), newBlock.data) + test "fail getBlock": + let blk = await store.getBlock(newBlock.cid) + check: + blk.isErr + blk.error.kind == BlockNotFoundErr - check: - (await store.hasBlock(newBlock.cid)).tryGet() - await newBlock.cid in store + test "hasBlock": + createDir(store.blockPath(newBlock.cid).parentDir) + writeFile(store.blockPath(newBlock.cid), newBlock.data) - test "fail hasBlock": - check: - not (await store.hasBlock(newBlock.cid)).tryGet() - not (await newBlock.cid in store) + check: + (await store.hasBlock(newBlock.cid)).tryGet() + await newBlock.cid in store - test "listBlocks": - createDir(store.blockPath(newBlock.cid).parentDir) - writeFile(store.blockPath(newBlock.cid), newBlock.data) + test "fail hasBlock": + check: + not (await store.hasBlock(newBlock.cid)).tryGet() + not (await newBlock.cid in store) - (await store.listBlocks( - proc(cid: Cid) {.gcsafe, async.} = - check cid == newBlock.cid - )).tryGet() + test "listBlocks": + createDir(store.blockPath(newBlock.cid).parentDir) + writeFile(store.blockPath(newBlock.cid), newBlock.data) - test "delBlock": - createDir(store.blockPath(newBlock.cid).parentDir) - writeFile(store.blockPath(newBlock.cid), newBlock.data) + (await store.listBlocks( + proc(cid: Cid) {.gcsafe, async.} = + check cid == newBlock.cid + )).tryGet() - (await store.delBlock(newBlock.cid)).tryGet() - check not fileExists(store.blockPath(newBlock.cid)) + test "delBlock": + createDir(store.blockPath(newBlock.cid).parentDir) + writeFile(store.blockPath(newBlock.cid), newBlock.data) + + (await store.delBlock(newBlock.cid)).tryGet() + + check not fileExists(store.blockPath(newBlock.cid)) + +runSuite(cache = true) +runSuite(cache = false) diff --git a/tests/codex/stores/testsqlitestore.nim b/tests/codex/stores/testsqlitestore.nim index 7d8975e1..8e012bd6 100644 --- a/tests/codex/stores/testsqlitestore.nim +++ b/tests/codex/stores/testsqlitestore.nim @@ -43,7 +43,7 @@ proc runSuite(cache: bool) = if cache: store = SQLiteStore.new(repoDir) else: - store = SQLiteStore.new(repoDir, nil) + store = SQLiteStore.new(repoDir, cache = nil) newBlock = randomBlock() @@ -132,37 +132,24 @@ proc runSuite(cache: bool) = # get from database getRes = await store.getBlock(newBlock.cid) - check: getRes.isOk - - var - blkOpt = getRes.get - check: - blkOpt.isSome - blkOpt.get == newBlock + getRes.isOk + getRes.get == newBlock # get from enabled cache getRes = await store.getBlock(newBlock.cid) - check: getRes.isOk - - blkOpt = getRes.get - check: - blkOpt.isSome - blkOpt.get == newBlock + getRes.isOk + getRes.get == newBlock test "fail getBlock": let - getRes = await store.getBlock(newBlock.cid) - - assert getRes.isOk - - let - blkOpt = getRes.get - - check: blkOpt.isNone + blkRes = await store.getBlock(newBlock.cid) + check: + blkRes.isErr + blkRes.error.kind == BlockNotFoundErr test "hasBlock": let diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 4d0c35af..b9eef95c 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -158,7 +158,7 @@ suite "Test Node": (await localStore.hasBlock(manifestCid)).tryGet() var - manifestBlock = (await localStore.getBlock(manifestCid)).tryGet().get() + manifestBlock = (await localStore.getBlock(manifestCid)).tryGet() localManifest = Manifest.decode(manifestBlock).tryGet() check: