diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 24396745..e2826ced 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -379,8 +379,8 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = # Extract succesfully received blocks let blocks = blockFuts - .filterIt(it.completed and it.read.isOk and it.read.get.isSome) - .mapIt(it.read.get.get) + .filterIt(it.completed and it.read.isOk) + .mapIt(it.read.get) if blocks.len > 0: trace "Sending blocks to peer", peer = task.id, blocks = blocks.len diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 01401f2b..c8407484 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -114,14 +114,10 @@ proc encode*( for j in 0..= encoded.K: diff --git a/codex/node.nim b/codex/node.nim index 39df9c5e..7bad41e6 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -107,7 +107,7 @@ proc fetchBatched*( await allFuturesThrowing(allFinished(blocks)) if not onBatch.isNil: - await onBatch(blocks.mapIt( it.read.get.get )) + await onBatch(blocks.mapIt( it.read.get )) except CancelledError as exc: raise exc except CatchableError as exc: diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index 6109ca4e..bda25c23 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -24,7 +24,7 @@ type OnBlock* = proc(cid: Cid): Future[void] {.upraises: [], gcsafe.} BlockStore* = ref object of RootObj -method getBlock*(self: BlockStore, cid: Cid): Future[?! (? Block)] {.base.} = +method getBlock*(self: BlockStore, cid: Cid): Future[?!Block] {.base.} = ## Get a block from the blockstore ## diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index c3091b40..9a90ef04 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -43,24 +43,24 @@ const DefaultCacheSizeMiB* = 100 DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes -method getBlock*(self: CacheStore, cid: Cid): Future[?! (? Block)] {.async.} = +method getBlock*(self: CacheStore, cid: Cid): Future[?!Block] {.async.} = ## Get a block from the stores ## trace "Getting block from cache", cid + if cid.isEmpty: trace "Empty block, ignoring" - return cid.emptyBlock.some.success + return success cid.emptyBlock if cid notin self.cache: - return Block.none.success + return failure "Block not in cache" try: - let blk = self.cache[cid] - return blk.some.success + return success self.cache[cid] except CatchableError as exc: - trace "Exception requesting block", cid, exc = exc.msg - return failure(exc) + trace "Error requesting block from cache", cid, error = exc.msg + return failure exc method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} = ## Check if the block exists in the blockstore diff --git a/codex/stores/fsstore.nim b/codex/stores/fsstore.nim index bae8d16c..e44e804a 100644 --- a/codex/stores/fsstore.nim +++ b/codex/stores/fsstore.nim @@ -37,47 +37,70 @@ type template blockPath*(self: FSStore, cid: Cid): string = self.repoDir / ($cid)[^self.postfixLen..^1] / $cid -method getBlock*(self: FSStore, cid: Cid): Future[?! (? Block)] {.async.} = - ## Get a block from the stores +method getBlock*(self: FSStore, cid: Cid): Future[?!Block] {.async.} = + ## Get a block from the cache or filestore. + ## Save a copy to the cache if present in the filestore but not in the cache ## - trace "Getting block from filestore", cid + if not self.cache.isNil: + trace "Getting block from cache or filestore", cid + else: + trace "Getting block from filestore", cid + if cid.isEmpty: trace "Empty block, ignoring" - return cid.emptyBlock.some.success + return success cid.emptyBlock - let cachedBlock = await self.cache.getBlock(cid) - if cachedBlock.isErr: - return cachedBlock - if cachedBlock.get.isSome: - trace "Retrieved block from cache", cid - return cachedBlock + if not self.cache.isNil: + let + cachedBlockRes = await self.cache.getBlock(cid) + + if not cachedBlockRes.isErr: + return cachedBlockRes + else: + trace "Unable to read block from cache", cid, error = cachedBlockRes.error.msg # Read file contents - var data: seq[byte] + var + data: seq[byte] + let path = self.blockPath(cid) res = io2.readFile(path, data) if res.isErr: if not isFile(path): # May be, check instead that "res.error == ERROR_FILE_NOT_FOUND" ? - return Block.none.success + return failure "Block not in filestore" else: - let error = io2.ioErrorMsg(res.error) - trace "Cannot read file from filestore", path, error - return failure("Cannot read file from filestore") + let + error = io2.ioErrorMsg(res.error) - without var blk =? Block.new(cid, data), error: - return error.failure + trace "Error requesting block from filestore", path, error + return failure "Error requesting block from filestore: " & error - # TODO: add block to the cache - return blk.some.success + without blk =? Block.new(cid, data), error: + trace "Unable to construct block from data", cid, error = error.msg + return failure error + + if not self.cache.isNil: + let + putCachedRes = await self.cache.putBlock(blk) + + if putCachedRes.isErr: + trace "Unable to store block in cache", cid, error = putCachedRes.error.msg + + return success blk method putBlock*(self: FSStore, blk: Block): Future[?!void] {.async.} = - ## Write block contents to file with name based on blk.cid, - ## save second copy to the cache + ## Write a block's contents to a file with name based on blk.cid. + ## Save a copy to the cache ## + if not self.cache.isNil: + trace "Putting block into filestore and cache", cid = blk.cid + else: + trace "Putting block into filestore", cid = blk.cid + if blk.isEmpty: trace "Empty block, ignoring" return success() @@ -98,20 +121,35 @@ method putBlock*(self: FSStore, blk: Block): Future[?!void] {.async.} = trace "Unable to store block", path, cid = blk.cid, error return failure("Unable to store block") - if isErr (await self.cache.putBlock(blk)): - trace "Unable to store block in cache", cid = blk.cid + if not self.cache.isNil: + let + putCachedRes = await self.cache.putBlock(blk) + + if putCachedRes.isErr: + trace "Unable to store block in cache", cid = blk.cid, error = putCachedRes.error.msg return success() method delBlock*(self: FSStore, cid: Cid): Future[?!void] {.async.} = - ## Delete a block from the blockstore + ## Delete a block from the cache and filestore ## - trace "Deleting block from filestore", cid + if not self.cache.isNil: + trace "Deleting block from cache and filestore", cid + else: + trace "Deleting block from filestore", cid + if cid.isEmpty: trace "Empty block, ignoring" return success() + if not self.cache.isNil: + let + delCachedRes = await self.cache.delBlock(cid) + + if delCachedRes.isErr: + trace "Unable to delete block from cache", cid, error = delCachedRes.error.msg + let path = self.blockPath(cid) res = io2.removeFile(path) @@ -121,10 +159,10 @@ method delBlock*(self: FSStore, cid: Cid): Future[?!void] {.async.} = trace "Unable to delete block", path, cid, error return error.failure - return await self.cache.delBlock(cid) + return success() method hasBlock*(self: FSStore, cid: Cid): Future[?!bool] {.async.} = - ## Check if the block exists in the blockstore + ## Check if a block exists in the filestore ## trace "Checking filestore for block existence", cid @@ -135,7 +173,8 @@ method hasBlock*(self: FSStore, cid: Cid): Future[?!bool] {.async.} = return self.blockPath(cid).isFile().success method listBlocks*(self: FSStore, onBlock: OnBlock): Future[?!void] {.async.} = - ## Get the list of blocks in the BlockStore. This is an intensive operation + ## Process list of all blocks in the filestore via callback. + ## This is an intensive operation ## trace "Listing all blocks in filestore" diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 0097b285..592f1fc3 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -31,28 +31,27 @@ type engine*: BlockExcEngine # blockexc decision engine localStore*: BlockStore # local block store -method getBlock*(self: NetworkStore, cid: Cid): Future[?! (? bt.Block)] {.async.} = +method getBlock*(self: NetworkStore, cid: Cid): Future[?!bt.Block] {.async.} = ## Get a block from a remote peer ## - trace "Getting block from network store", cid + trace "Getting block from local store or network", cid - let blk = await self.localStore.getBlock(cid) - if blk.isErr: - return blk - if blk.get.isSome: - trace "Retrieved block from local store", cid - return blk + if not (await cid in self.localStore): + trace "Block not in local store", cid - trace "Block not found in local store", cid - try: - # TODO: What if block isn't available in the engine too? - let blk = await self.engine.requestBlock(cid) - # TODO: add block to the local store - return blk.some.success - except CatchableError as exc: - trace "Exception requesting block", cid, exc = exc.msg - return failure(exc) + try: + # TODO: What if block isn't available in the engine too? + let + blk = await self.engine.requestBlock(cid) + + # TODO: add block to the local store + return success blk + except CatchableError as exc: + trace "Error requesting block from network", cid, error = exc.msg + return failure exc + + return await self.localStore.getBlock(cid) method putBlock*(self: NetworkStore, blk: bt.Block): Future[?!void] {.async.} = ## Store block locally and notify the network diff --git a/codex/stores/sqlitestore.nim b/codex/stores/sqlitestore.nim index 03d697bf..61856a5c 100644 --- a/codex/stores/sqlitestore.nim +++ b/codex/stores/sqlitestore.nim @@ -11,8 +11,6 @@ import pkg/upraises push: {.upraises: [].} -import std/options - import pkg/chronos import pkg/chronicles import pkg/datastore/sqlite_datastore @@ -72,7 +70,7 @@ proc blockKey*(blockCid: Cid): ?!Key = method getBlock*( self: SQLiteStore, - cid: Cid): Future[?!(?Block)] {.async.} = + cid: Cid): Future[?!Block] {.async.} = ## Get a block from the cache or database. ## Save a copy to the cache if present in the database but not in the cache ## @@ -84,24 +82,26 @@ method getBlock*( if cid.isEmpty: trace "Empty block, ignoring" - return success cid.emptyBlock.some + return success cid.emptyBlock if not self.cache.isNil: - without cachedBlkOpt =? await self.cache.getBlock(cid), error: - trace "Unable to read block from cache", cid, error = error.msg + let + cachedBlockRes = await self.cache.getBlock(cid) - if cachedBlkOpt.isSome: - return success cachedBlkOpt + if not cachedBlockRes.isErr: + return cachedBlockRes + else: + trace "Unable to read block from cache", cid, error = cachedBlockRes.error.msg without blkKey =? blockKey(cid), error: return failure error without dataOpt =? await self.datastore.get(blkKey), error: - trace "Unable to read block from database", key = blkKey.id, error = error.msg + trace "Error requesting block from database", key = blkKey.id, error = error.msg return failure error without data =? dataOpt: - return success Block.none + return failure "Block not in database" without blk =? Block.new(cid, data), error: trace "Unable to construct block from data", cid, error = error.msg @@ -114,7 +114,7 @@ method getBlock*( if putCachedRes.isErr: trace "Unable to store block in cache", cid, error = putCachedRes.error.msg - return success blk.some + return success blk method putBlock*( self: SQLiteStore, @@ -154,7 +154,7 @@ method putBlock*( method delBlock*( self: SQLiteStore, cid: Cid): Future[?!void] {.async.} = - ## Delete a block from the database and cache + ## Delete a block from the cache and database ## if not self.cache.isNil: diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index 7371bf66..0f91cf63 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -126,7 +126,7 @@ suite "NetworkStore engine - 2 nodes": test "Should get blocks from remote": let blocks = await allFinished( blocks2.mapIt( nodeCmps1.networkStore.getBlock(it.cid) )) - check blocks.mapIt( it.read().tryGet().get() ) == blocks2 + check blocks.mapIt( it.read().tryGet() ) == blocks2 test "Remote should send blocks when available": let blk = bt.Block.new("Block 1".toBytes).tryGet() diff --git a/tests/codex/helpers.nim b/tests/codex/helpers.nim index 0b92f8df..475bf7a1 100644 --- a/tests/codex/helpers.nim +++ b/tests/codex/helpers.nim @@ -47,7 +47,7 @@ proc corruptBlocks*( pos.add(i) var - blk = (await store.getBlock(manifest[i])).tryGet().get() + blk = (await store.getBlock(manifest[i])).tryGet() bytePos: seq[int] while true: diff --git a/tests/codex/stores/testcachestore.nim b/tests/codex/stores/testcachestore.nim index c1fd2d03..a7027f48 100644 --- a/tests/codex/stores/testcachestore.nim +++ b/tests/codex/stores/testcachestore.nim @@ -72,11 +72,11 @@ suite "Cache Store": store = CacheStore.new(@[newBlock]) let blk = await store.getBlock(newBlock.cid) - check blk.tryGet().get() == newBlock + check blk.tryGet() == newBlock test "fail getBlock": let blk = await store.getBlock(newBlock.cid) - check blk.tryGet().isNone() + check blk.isErr test "hasBlock": let store = CacheStore.new(@[newBlock]) diff --git a/tests/codex/stores/testfsstore.nim b/tests/codex/stores/testfsstore.nim index a13cbf8b..175cfae7 100644 --- a/tests/codex/stores/testfsstore.nim +++ b/tests/codex/stores/testfsstore.nim @@ -16,62 +16,71 @@ import pkg/codex/blocktype as bt import ../helpers -suite "FS Store": - var - store: FSStore - repoDir: string - newBlock = bt.Block.new("New Block".toBytes()).tryGet() +proc runSuite(cache: bool) = + suite "FS Store " & (if cache: "(cache enabled)" else: "(cache disabled)"): + var + store: FSStore + repoDir: string + newBlock = bt.Block.new("New Block".toBytes()).tryGet() - setup: - repoDir = getAppDir() / "repo" - createDir(repoDir) - store = FSStore.new(repoDir) + setup: + repoDir = getAppDir() / "repo" + createDir(repoDir) - teardown: - removeDir(repoDir) + if cache: + store = FSStore.new(repoDir) + else: + store = FSStore.new(repoDir, postfixLen = 2, cache = nil) - test "putBlock": - (await store.putBlock(newBlock)).tryGet() - check: - fileExists(store.blockPath(newBlock.cid)) - (await store.hasBlock(newBlock.cid)).tryGet() - await newBlock.cid in store + teardown: + removeDir(repoDir) - test "getBlock": - createDir(store.blockPath(newBlock.cid).parentDir) - writeFile(store.blockPath(newBlock.cid), newBlock.data) - let blk = await store.getBlock(newBlock.cid) - check blk.tryGet().get() == newBlock + test "putBlock": + (await store.putBlock(newBlock)).tryGet() + check: + fileExists(store.blockPath(newBlock.cid)) + (await store.hasBlock(newBlock.cid)).tryGet() + await newBlock.cid in store - test "fail getBlock": - let blk = await store.getBlock(newBlock.cid) - check blk.tryGet().isNone + test "getBlock": + createDir(store.blockPath(newBlock.cid).parentDir) + writeFile(store.blockPath(newBlock.cid), newBlock.data) + let blk = await store.getBlock(newBlock.cid) + check blk.tryGet() == newBlock - test "hasBlock": - createDir(store.blockPath(newBlock.cid).parentDir) - writeFile(store.blockPath(newBlock.cid), newBlock.data) + test "fail getBlock": + let blk = await store.getBlock(newBlock.cid) + check blk.isErr - check: - (await store.hasBlock(newBlock.cid)).tryGet() - await newBlock.cid in store + test "hasBlock": + createDir(store.blockPath(newBlock.cid).parentDir) + writeFile(store.blockPath(newBlock.cid), newBlock.data) - test "fail hasBlock": - check: - not (await store.hasBlock(newBlock.cid)).tryGet() - not (await newBlock.cid in store) + check: + (await store.hasBlock(newBlock.cid)).tryGet() + await newBlock.cid in store - test "listBlocks": - createDir(store.blockPath(newBlock.cid).parentDir) - writeFile(store.blockPath(newBlock.cid), newBlock.data) + test "fail hasBlock": + check: + not (await store.hasBlock(newBlock.cid)).tryGet() + not (await newBlock.cid in store) - (await store.listBlocks( - proc(cid: Cid) {.gcsafe, async.} = - check cid == newBlock.cid - )).tryGet() + test "listBlocks": + createDir(store.blockPath(newBlock.cid).parentDir) + writeFile(store.blockPath(newBlock.cid), newBlock.data) - test "delBlock": - createDir(store.blockPath(newBlock.cid).parentDir) - writeFile(store.blockPath(newBlock.cid), newBlock.data) + (await store.listBlocks( + proc(cid: Cid) {.gcsafe, async.} = + check cid == newBlock.cid + )).tryGet() - (await store.delBlock(newBlock.cid)).tryGet() - check not fileExists(store.blockPath(newBlock.cid)) + test "delBlock": + createDir(store.blockPath(newBlock.cid).parentDir) + writeFile(store.blockPath(newBlock.cid), newBlock.data) + + (await store.delBlock(newBlock.cid)).tryGet() + + check not fileExists(store.blockPath(newBlock.cid)) + +runSuite(cache = true) +runSuite(cache = false) diff --git a/tests/codex/stores/testsqlitestore.nim b/tests/codex/stores/testsqlitestore.nim index 7d8975e1..7d488bda 100644 --- a/tests/codex/stores/testsqlitestore.nim +++ b/tests/codex/stores/testsqlitestore.nim @@ -43,7 +43,7 @@ proc runSuite(cache: bool) = if cache: store = SQLiteStore.new(repoDir) else: - store = SQLiteStore.new(repoDir, nil) + store = SQLiteStore.new(repoDir, cache = nil) newBlock = randomBlock() @@ -132,37 +132,19 @@ proc runSuite(cache: bool) = # get from database getRes = await store.getBlock(newBlock.cid) - check: getRes.isOk - - var - blkOpt = getRes.get - check: - blkOpt.isSome - blkOpt.get == newBlock + getRes.isOk + getRes.get == newBlock # get from enabled cache getRes = await store.getBlock(newBlock.cid) - check: getRes.isOk - - blkOpt = getRes.get - check: - blkOpt.isSome - blkOpt.get == newBlock + getRes.isOk + getRes.get == newBlock test "fail getBlock": - let - getRes = await store.getBlock(newBlock.cid) - - assert getRes.isOk - - let - blkOpt = getRes.get - - check: blkOpt.isNone - + check: (await store.getBlock(newBlock.cid)).isErr test "hasBlock": let diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 4d0c35af..b9eef95c 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -158,7 +158,7 @@ suite "Test Node": (await localStore.hasBlock(manifestCid)).tryGet() var - manifestBlock = (await localStore.getBlock(manifestCid)).tryGet().get() + manifestBlock = (await localStore.getBlock(manifestCid)).tryGet() localManifest = Manifest.decode(manifestBlock).tryGet() check: