diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 69fda741..7d9415d6 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -200,7 +200,7 @@ proc blockPresenceHandler*( .filter do(cid: Cid) -> bool: not b.peers.anyIt( cid in it.peerHave )) -proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = +proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = trace "Schedule a task for new blocks" let @@ -209,17 +209,18 @@ proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = # schedule any new peers to provide blocks to for p in b.peers: for c in cids: # for each cid - # schedule a peer if it wants at least one - # cid and we have it in our local store - if c in p.peerWants and c in b.localStore: - if b.scheduleTask(p): - trace "Task scheduled for peer", peer = p.id - else: - trace "Unable to schedule task for peer", peer = p.id + # schedule a peer if it wants at least one cid + # and we have it in our local store + if c in p.peerWants: + if await (c in b.localStore): + if b.scheduleTask(p): + trace "Task scheduled for peer", peer = p.id + else: + trace "Unable to schedule task for peer", peer = p.id - break # do next peer + break # do next peer -proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = +proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = ## Resolve pending blocks from the pending blocks manager ## and schedule any new task to be ran ## @@ -227,7 +228,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = trace "Resolving blocks", blocks = blocks.len b.pendingBlocks.resolve(blocks) - b.scheduleTasks(blocks) + await b.scheduleTasks(blocks) b.discovery.queueProvideBlocksReq(blocks.mapIt( it.cid )) proc payForBlocks(engine: BlockExcEngine, @@ -254,7 +255,7 @@ proc blocksHandler*( trace "Unable to store block", cid = blk.cid continue - b.resolveBlocks(blocks) + await b.resolveBlocks(blocks) let peerCtx = b.peers.get(peer) if peerCtx != nil: b.payForBlocks(peerCtx, blocks) @@ -289,8 +290,9 @@ proc wantListHandler*( # peer might want to ask for the same cid with # different want params - if e.sendDontHave and e.cid notin b.localStore: - dontHaves.add(e.cid) + if e.sendDontHave: + if not(await e.cid in b.localStore): + dontHaves.add(e.cid) # send don't have's to remote if dontHaves.len > 0: @@ -393,7 +395,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = for e in task.peerWants: if e.wantType == WantType.wantHave: var presence = Presence(cid: e.cid) - presence.have = b.localStore.hasblock(presence.cid) + presence.have = await (presence.cid in b.localStore) if presence.have and price =? b.pricing.?price: presence.price = price wants.add(BlockPresence.init(presence)) diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index a7439f8d..9625f833 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -47,11 +47,11 @@ method delBlock*( raiseAssert("Not implemented!") -method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} = +method hasBlock*(s: BlockStore, cid: Cid): Future[?!bool] {.base.} = ## Check if the block exists in the blockstore ## - return false + raiseAssert("Not implemented!") method listBlocks*(s: BlockStore, onBlock: OnBlock): Future[void] {.base.} = ## Get the list of blocks in the BlockStore. This is an intensive operation @@ -59,5 +59,9 @@ method listBlocks*(s: BlockStore, onBlock: OnBlock): Future[void] {.base.} = raiseAssert("Not implemented!") -proc contains*(s: BlockStore, blk: Cid): bool = - s.hasBlock(blk) +proc contains*(s: BlockStore, blk: Cid): Future[bool] {.async.} = + ## Check if the block exists in the blockstore. + ## Return false if error encountered + ## + + return (await s.hasBlock(blk)) |? false diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index e92ec50b..5e04d337 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -55,18 +55,30 @@ method getBlock*( trace "Empty block, ignoring" return cid.emptyBlock.success - return self.cache[cid].catch() +# if cid notin self.cache: +# return Block.failure("Block not found") # TODO: return nil -method hasBlock*(self: CacheStore, cid: Cid): bool = - ## check if the block exists + try: + echo "cachestore: before exception" + let x = self.cache[cid] + echo "cachestore: after exception" + return x.success + except CatchableError as e: + echo "cachestore: exception catched" + return Block.failure(e) + +# return self.cache[cid].catch() + +method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} = + ## Check if the block exists in the blockstore ## - trace "Checking for block presence in cache", cid + trace "Checking CacheStore for block presence", cid if cid.isEmpty: trace "Empty block, ignoring" - return true + return true.success - cid in self.cache + return (cid in self.cache).success method listBlocks*(s: CacheStore, onBlock: OnBlock) {.async.} = for cid in toSeq(s.cache.keys): diff --git a/codex/stores/fsstore.nim b/codex/stores/fsstore.nim index 5ec1bead..721664f3 100644 --- a/codex/stores/fsstore.nim +++ b/codex/stores/fsstore.nim @@ -43,24 +43,31 @@ method getBlock*( ## Get a block from the stores ## + trace "Getting block from filestore", cid if cid.isEmpty: trace "Empty block, ignoring" return cid.emptyBlock.success - if cid in self.cache: - return await self.cache.getBlock(cid) - - if cid notin self: - return Block.failure("Couldn't find block in fs store") + # Try to get this block from the cache + let cachedBlock = await self.cache.getBlock(cid) + if cachedBlock.isOK: # TODO: check for success and non-emptiness + return cachedBlock + # Read file contents var data: seq[byte] - let path = self.blockPath(cid) - if ( - let res = io2.readFile(path, data); - res.isErr): - let error = io2.ioErrorMsg(res.error) - trace "Cannot read file from fs store", path , error - return Block.failure("Cannot read file from fs store") + let + path = self.blockPath(cid) + res = io2.readFile(path, data) + + # TODO: If file doesn't exist - return empty block, + # other I/O errors are signaled as failures + if res.isErr: + if not isFile(path): + return Block.failure("Couldn't find block in filestore") + else: + let error = io2.ioErrorMsg(res.error) + trace "Cannot read file from filestore", path, error + return Block.failure("Cannot read file from filestore") return Block.new(cid, data) @@ -74,15 +81,16 @@ method putBlock*( trace "Empty block, ignoring" return true - if blk.cid in self: + let path = self.blockPath(blk.cid) + if isFile(path): return true # if directory exists it wont fail - if io2.createPath(self.blockPath(blk.cid).parentDir).isErr: - trace "Unable to create block prefix dir", dir = self.blockPath(blk.cid).parentDir + let dir = path.parentDir + if io2.createPath(dir).isErr: + trace "Unable to create block prefix dir", dir return false - let path = self.blockPath(blk.cid) if ( let res = io2.writeFile(path, blk.data); res.isErr): @@ -117,16 +125,16 @@ method delBlock*( return await self.cache.delBlock(cid) -method hasBlock*(self: FSStore, cid: Cid): bool = +method hasBlock*(self: FSStore, cid: Cid): Future[?!bool] {.async.} = ## Check if the block exists in the blockstore ## - trace "Checking for block existence", cid + trace "Checking filestore for block existence", cid if cid.isEmpty: trace "Empty block, ignoring" - return true + return true.success - self.blockPath(cid).isFile() + return self.blockPath(cid).isFile().success method listBlocks*(self: FSStore, onBlock: OnBlock) {.async.} = debug "Listing all blocks in store" diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 5ce90d36..8c08c917 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -61,7 +61,7 @@ method putBlock*( if not (await self.localStore.putBlock(blk)): return false - self.engine.resolveBlocks(@[blk]) + await self.engine.resolveBlocks(@[blk]) return true method delBlock*( @@ -75,13 +75,12 @@ method delBlock*( {.pop.} -method hasBlock*( - self: NetworkStore, - cid: Cid): bool = +method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} = ## Check if the block exists in the blockstore ## - self.localStore.hasBlock(cid) + trace "Checking NetworkStore for block existence", cid + return await self.localStore.hasBlock(cid) proc new*( T: type NetworkStore, diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index e71b78b3..3e63e674 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -82,7 +82,7 @@ suite "Block Advertising and Discovery": blockDiscovery.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} = - engine.resolveBlocks(blocks.filterIt( it.cid == cid )) + await engine.resolveBlocks(blocks.filterIt( it.cid == cid )) await allFuturesThrowing( allFinished(pendingBlocks)) diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index f898bb79..8e436c1c 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -122,7 +122,8 @@ suite "NetworkStore engine - 2 nodes": .pushOrUpdateNoWait(peerCtx1).isOk await sleepAsync(100.millis) - check nodeCmps1.localStore.hasBlock(blk.cid) + let present = await nodeCmps1.localStore.hasBlock(blk.cid) + check present.tryGet() test "Should get blocks from remote": let blocks = await allFinished( diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index e6193e70..9d41a8c3 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -242,7 +242,8 @@ suite "NetworkStore engine handlers": let resolved = await allFinished(pending) check resolved.mapIt( it.read ) == blocks for b in blocks: - check engine.localStore.hasBlock(b.cid) + let present = await engine.localStore.hasBlock(b.cid) + check present.tryGet() test "Should send payments for received blocks": let account = Account(address: EthAddress.example) diff --git a/tests/codex/stores/testcachestore.nim b/tests/codex/stores/testcachestore.nim index 0a0d2fc6..f4a826d2 100644 --- a/tests/codex/stores/testcachestore.nim +++ b/tests/codex/stores/testcachestore.nim @@ -23,9 +23,11 @@ suite "Cache Store tests": store = CacheStore.new() test "constructor": + echo "testcachestore: before exception" # cache size cannot be smaller than chunk size expect ValueError: discard CacheStore.new(cacheSize = 1, chunkSize = 2) + echo "testcachestore: after exception" store = CacheStore.new(cacheSize = 100, chunkSize = 1) check store.currentSize == 0 @@ -50,7 +52,7 @@ suite "Cache Store tests": check: await store.putBlock(newBlock1) - newBlock1.cid in store + (await store.hasBlock(newBlock1.cid)).tryGet() # block size bigger than entire cache store = CacheStore.new(cacheSize = 99, chunkSize = 98) @@ -62,9 +64,9 @@ suite "Cache Store tests": cacheSize = 200, chunkSize = 1) check: - not store.hasBlock(newBlock1.cid) - store.hasBlock(newBlock2.cid) - store.hasBlock(newBlock3.cid) + not (await store.hasBlock(newBlock1.cid)).tryGet() + (await store.hasBlock(newBlock2.cid)).tryGet() + (await store.hasBlock(newBlock2.cid)).tryGet() store.currentSize == newBlock2.data.len + newBlock3.data.len # 200 test "getBlock": @@ -85,11 +87,14 @@ suite "Cache Store tests": test "hasBlock": let store = CacheStore.new(@[newBlock]) - - check store.hasBlock(newBlock.cid) + check: + (await store.hasBlock(newBlock.cid)).tryGet() + await newBlock.cid in store test "fail hasBlock": - check not store.hasBlock(newBlock.cid) + check: + not (await store.hasBlock(newBlock.cid)).tryGet() + not (await newBlock.cid in store) test "delBlock": # empty cache @@ -103,10 +108,12 @@ suite "Cache Store tests": store = CacheStore.new(@[newBlock1, newBlock2, newBlock3]) check: store.currentSize == 300 + (await store.delBlock(newBlock2.cid)).tryGet() + check: store.currentSize == 200 - newBlock2.cid notin store + not (await store.hasBlock(newBlock2.cid)).tryGet() test "listBlocks": discard await store.putBlock(newBlock1) @@ -114,7 +121,7 @@ suite "Cache Store tests": var listed = false await store.listBlocks( proc(cid: Cid) {.gcsafe, async.} = - check cid in store + check (await store.hasBlock(cid)).tryGet() listed = true ) diff --git a/tests/codex/stores/testfsstore.nim b/tests/codex/stores/testfsstore.nim index 4089623e..146f3313 100644 --- a/tests/codex/stores/testfsstore.nim +++ b/tests/codex/stores/testfsstore.nim @@ -33,9 +33,11 @@ suite "FS Store": removeDir(repoDir) test "putBlock": - check await store.putBlock(newBlock) - check fileExists(store.blockPath(newBlock.cid)) - check newBlock.cid in store + check: + await store.putBlock(newBlock) + fileExists(store.blockPath(newBlock.cid)) + (await store.hasBlock(newBlock.cid)).tryGet() + await newBlock.cid in store test "getBlock": createDir(store.blockPath(newBlock.cid).parentDir) @@ -51,7 +53,14 @@ suite "FS Store": createDir(store.blockPath(newBlock.cid).parentDir) writeFile(store.blockPath(newBlock.cid), newBlock.data) - check store.hasBlock(newBlock.cid) + check: + (await store.hasBlock(newBlock.cid)).tryGet() + await newBlock.cid in store + + test "fail hasBlock": + check: + not (await store.hasBlock(newBlock.cid)).tryGet() + not (await newBlock.cid in store) test "listBlocks": createDir(store.blockPath(newBlock.cid).parentDir) @@ -61,9 +70,6 @@ suite "FS Store": proc(cid: Cid) {.gcsafe, async.} = check cid == newBlock.cid) - test "fail hasBlock": - check not store.hasBlock(newBlock.cid) - test "delBlock": createDir(store.blockPath(newBlock.cid).parentDir) writeFile(store.blockPath(newBlock.cid), newBlock.data) diff --git a/tests/codex/testerasure.nim b/tests/codex/testerasure.nim index 26e22475..5f5e828b 100644 --- a/tests/codex/testerasure.nim +++ b/tests/codex/testerasure.nim @@ -78,7 +78,8 @@ suite "Erasure encode/decode": decoded.len == encoded.originalLen for d in dropped: - check d in store + let present = await store.hasBlock(d) + check present.tryGet() test "Should not tolerate loosing more than M data blocks in a single random column": const @@ -103,7 +104,8 @@ suite "Erasure encode/decode": decoded = (await erasure.decode(encoded)).tryGet() for d in dropped: - check d notin store + let present = await store.hasBlock(d) + check not present.tryGet() test "Should tolerate loosing M data blocks in M random columns": const @@ -130,7 +132,8 @@ suite "Erasure encode/decode": discard (await erasure.decode(encoded)).tryGet() for d in manifest: - check d in store + let present = await store.hasBlock(d) + check present.tryGet() test "Should not tolerate loosing more than M data blocks in M random columns": const @@ -179,7 +182,8 @@ suite "Erasure encode/decode": discard (await erasure.decode(encoded)).tryGet() for d in manifest: - check d in store + let present = await store.hasBlock(d) + check present.tryGet() test "Should tolerate loosing M (a.k.a row) contiguous parity blocks": const @@ -194,7 +198,8 @@ suite "Erasure encode/decode": discard (await erasure.decode(encoded)).tryGet() for d in manifest: - check d in store + let present = await store.hasBlock(d) + check present.tryGet() test "handles edge case of 0 parity blocks": const diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 98cbff01..b7fc89cc 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -85,7 +85,7 @@ suite "Test Node": manifestCid = (await storeFut).tryGet() check: - manifestCid in localStore + (await localStore.hasBlock(manifestCid)).tryGet() var manifestBlock = (await localStore.getBlock(manifestCid)).tryGet()