diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index dacfb213..ecfaf3a4 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -83,7 +83,7 @@ proc advertiseQueueLoop*(b: DiscoveryEngine) {.async.} = trace "Exception listing blocks", exc = exc.msg while b.discEngineRunning: - await b.localStore.listBlocks(onBlock) + discard await b.localStore.listBlocks(onBlock) trace "About to sleep advertise loop", sleep = b.advertiseLoopSleep await sleepAsync(b.advertiseLoopSleep) diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 69fda741..964b38eb 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -9,6 +9,8 @@ import std/sequtils import std/sets +import std/options +import std/algorithm import pkg/chronos import pkg/chronicles @@ -200,7 +202,7 @@ proc blockPresenceHandler*( .filter do(cid: Cid) -> bool: not b.peers.anyIt( cid in it.peerHave )) -proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = +proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = trace "Schedule a task for new blocks" let @@ -209,17 +211,18 @@ proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = # schedule any new peers to provide blocks to for p in b.peers: for c in cids: # for each cid - # schedule a peer if it wants at least one - # cid and we have it in our local store - if c in p.peerWants and c in b.localStore: - if b.scheduleTask(p): - trace "Task scheduled for peer", peer = p.id - else: - trace "Unable to schedule task for peer", peer = p.id + # schedule a peer if it wants at least one cid + # and we have it in our local store + if c in p.peerWants: + if await (c in b.localStore): + if b.scheduleTask(p): + trace "Task scheduled for peer", peer = p.id + else: + trace "Unable to schedule task for peer", peer = p.id - break # do next peer + break # do next peer -proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = +proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) {.async.} = ## Resolve pending blocks from the pending blocks manager ## and schedule any new task to be ran ## @@ -227,7 +230,7 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = trace "Resolving blocks", blocks = blocks.len b.pendingBlocks.resolve(blocks) - b.scheduleTasks(blocks) + await b.scheduleTasks(blocks) b.discovery.queueProvideBlocksReq(blocks.mapIt( it.cid )) proc payForBlocks(engine: BlockExcEngine, @@ -250,11 +253,10 @@ proc blocksHandler*( trace "Got blocks from peer", peer, len = blocks.len for blk in blocks: - if not (await b.localStore.putBlock(blk)): + if isErr (await b.localStore.putBlock(blk)): trace "Unable to store block", cid = blk.cid - continue - b.resolveBlocks(blocks) + await b.resolveBlocks(blocks) let peerCtx = b.peers.get(peer) if peerCtx != nil: b.payForBlocks(peerCtx, blocks) @@ -289,8 +291,9 @@ proc wantListHandler*( # peer might want to ask for the same cid with # different want params - if e.sendDontHave and e.cid notin b.localStore: - dontHaves.add(e.cid) + if e.sendDontHave: + if not(await e.cid in b.localStore): + dontHaves.add(e.cid) # send don't have's to remote if dontHaves.len > 0: @@ -358,22 +361,25 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerID) = proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = trace "Handling task for peer", peer = task.id - var wantsBlocks = newAsyncHeapQueue[Entry](queueType = QueueType.Max) - # get blocks and wants to send to the remote - for e in task.peerWants: - if e.wantType == WantType.wantBlock: - await wantsBlocks.push(e) + # PART 1: Send to the peer blocks he wants to get, + # if they present in our local store # TODO: There should be all sorts of accounting of # bytes sent/received here + + var wantsBlocks = task.peerWants.filterIt(it.wantType == WantType.wantBlock) + if wantsBlocks.len > 0: + wantsBlocks.sort(SortOrder.Descending) + let blockFuts = await allFinished(wantsBlocks.mapIt( b.localStore.getBlock(it.cid) )) + # Extract succesfully received blocks let blocks = blockFuts - .filterIt((not it.failed) and it.read.isOk) - .mapIt(!it.read) + .filterIt(it.completed and it.read.isOk and it.read.get.isSome) + .mapIt(it.read.get.get) if blocks.len > 0: trace "Sending blocks to peer", peer = task.id, blocks = blocks.len @@ -381,11 +387,15 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = task.id, blocks) - # Remove successfully sent blocks - task.peerWants.keepIf( - proc(e: Entry): bool = - not blocks.anyIt( it.cid == e.cid ) - ) + # Remove successfully sent blocks + task.peerWants.keepIf( + proc(e: Entry): bool = + not blocks.anyIt( it.cid == e.cid ) + ) + + + # PART 2: Send to the peer prices of the blocks he wants to discover, + # if they present in our local store var wants: seq[BlockPresence] # do not remove wants from the queue unless @@ -393,7 +403,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = for e in task.peerWants: if e.wantType == WantType.wantHave: var presence = Presence(cid: e.cid) - presence.have = b.localStore.hasblock(presence.cid) + presence.have = await (presence.cid in b.localStore) if presence.have and price =? b.pricing.?price: presence.price = price wants.add(BlockPresence.init(presence)) diff --git a/codex/blocktype.nim b/codex/blocktype.nim index f16d155f..081acdc2 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -21,6 +21,14 @@ import pkg/questionable/results import ./errors +const + BlockSize* = 31 * 64 * 4 # block size + +type + Block* = ref object of RootObj + cid*: Cid + data*: seq[byte] + template EmptyCid*: untyped = var emptyCid {.global, threadvar.}: @@ -90,14 +98,6 @@ template EmptyBlock*: untyped = emptyBlock -const - BlockSize* = 31 * 64 * 4 # block size - -type - Block* = ref object of RootObj - cid*: Cid - data*: seq[byte] - proc isEmpty*(cid: Cid): bool = cid == EmptyCid[cid.cidver] .catch diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 87fc6a17..01401f2b 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -114,10 +114,14 @@ proc encode*( for j in 0..= encoded.K: @@ -242,7 +248,7 @@ proc decode*( return failure(error) trace "Recovered block", cid = blk.cid - if not (await self.store.putBlock(blk)): + if isErr (await self.store.putBlock(blk)): trace "Unable to store block!", cid = blk.cid return failure("Unable to store block!") except CancelledError as exc: diff --git a/codex/node.nim b/codex/node.nim index b234e07e..4bd3550c 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -66,9 +66,12 @@ proc retrieve*( cid: Cid): Future[?!LPStream] {.async.} = trace "Received retrieval request", cid - without blk =? await node.blockStore.getBlock(cid): - return failure( - newException(CodexError, "Couldn't retrieve block for Cid!")) + without blkOrNone =? await node.blockStore.getBlock(cid), error: + return failure(error) + + without blk =? blkOrNone: + trace "Block not found", cid + return failure("Block not found") if manifest =? Manifest.decode(blk.data, blk.cid): @@ -134,7 +137,7 @@ proc store*( return failure("Unable to init block from chunk!") blockManifest.add(blk.cid) - if not (await node.blockStore.putBlock(blk)): + if isErr (await node.blockStore.putBlock(blk)): # trace "Unable to store block", cid = blk.cid return failure("Unable to store block " & $blk.cid) @@ -155,7 +158,7 @@ proc store*( trace "Unable to init block from manifest data!" return failure("Unable to init block from manifest data!") - if not (await node.blockStore.putBlock(manifest)): + if isErr (await node.blockStore.putBlock(manifest)): trace "Unable to store manifest", cid = manifest.cid return failure("Unable to store manifest " & $manifest.cid) @@ -187,13 +190,17 @@ proc store(node: CodexNodeRef, cids: seq[Cid]): Future[?!void] {.async.} = proc store(node: CodexNodeRef, cid: Cid): Future[?!void] {.async.} = ## Retrieves dataset from the network, and stores it locally - if node.blockstore.hasBlock(cid): + without present =? await node.blockstore.hasBlock(cid): + return failure newException(CodexError, "Unable to find block " & $cid) + if present: return success() - without blk =? await node.blockstore.getBlock(cid): + without blkOrNone =? await node.blockstore.getBlock(cid): + return failure newException(CodexError, "Unable to retrieve block " & $cid) + without blk =? blkOrNone: return failure newException(CodexError, "Unable to retrieve block " & $cid) - if not (await node.blockstore.putBlock(blk)): + if isErr (await node.blockstore.putBlock(blk)): return failure newException(CodexError, "Unable to store block " & $cid) if manifest =? Manifest.decode(blk.data, blk.cid): @@ -224,10 +231,14 @@ proc requestStorage*(self: CodexNodeRef, trace "Purchasing not available" return failure "Purchasing not available" - without blk =? (await self.blockStore.getBlock(cid)), error: + without blkOrNone =? (await self.blockStore.getBlock(cid)), error: trace "Unable to retrieve manifest block", cid return failure(error) + without blk =? blkOrNone: + trace "Manifest block not found", cid + return failure("Manifest block not found") + without mc =? blk.cid.contentType(): trace "Couldn't identify Cid!", cid return failure("Couldn't identify Cid! " & $cid) @@ -254,7 +265,7 @@ proc requestStorage*(self: CodexNodeRef, trace "Unable to create block from encoded manifest" return failure(error) - if not (await self.blockStore.putBlock(encodedBlk)): + if isErr (await self.blockStore.putBlock(encodedBlk)): trace "Unable to store encoded manifest block", cid = encodedBlk.cid return failure("Unable to store encoded manifest block") diff --git a/codex/stores/blockstore.nim b/codex/stores/blockstore.nim index a7439f8d..478c818e 100644 --- a/codex/stores/blockstore.nim +++ b/codex/stores/blockstore.nim @@ -13,6 +13,7 @@ push: {.upraises: [].} import pkg/chronos import pkg/libp2p +import pkg/questionable import pkg/questionable/results import ../blocktype @@ -23,41 +24,39 @@ type OnBlock* = proc(cid: Cid): Future[void] {.upraises: [], gcsafe.} BlockStore* = ref object of RootObj -method getBlock*( - b: BlockStore, - cid: Cid): Future[?!Block] {.base.} = - ## Get a block from the stores +method getBlock*(self: BlockStore, cid: Cid): Future[?! (? Block)] {.base.} = + ## Get a block from the blockstore ## raiseAssert("Not implemented!") -method putBlock*( - s: BlockStore, - blk: Block): Future[bool] {.base.} = +method putBlock*(self: BlockStore, blk: Block): Future[?!void] {.base.} = ## Put a block to the blockstore ## raiseAssert("Not implemented!") -method delBlock*( - s: BlockStore, - cid: Cid): Future[?!void] {.base.} = +method delBlock*(self: BlockStore, cid: Cid): Future[?!void] {.base.} = ## Delete a block from the blockstore ## raiseAssert("Not implemented!") -method hasBlock*(s: BlockStore, cid: Cid): bool {.base.} = +method hasBlock*(self: BlockStore, cid: Cid): Future[?!bool] {.base.} = ## Check if the block exists in the blockstore ## - return false + raiseAssert("Not implemented!") -method listBlocks*(s: BlockStore, onBlock: OnBlock): Future[void] {.base.} = +method listBlocks*(self: BlockStore, onBlock: OnBlock): Future[?!void] {.base.} = ## Get the list of blocks in the BlockStore. This is an intensive operation ## raiseAssert("Not implemented!") -proc contains*(s: BlockStore, blk: Cid): bool = - s.hasBlock(blk) +proc contains*(self: BlockStore, blk: Cid): Future[bool] {.async.} = + ## Check if the block exists in the blockstore. + ## Return false if error encountered + ## + + return (await self.hasBlock(blk)) |? false diff --git a/codex/stores/cachestore.nim b/codex/stores/cachestore.nim index e92ec50b..0dd7b7b3 100644 --- a/codex/stores/cachestore.nim +++ b/codex/stores/cachestore.nim @@ -44,34 +44,45 @@ const DefaultCacheSizeMiB* = 100 DefaultCacheSize* = DefaultCacheSizeMiB * MiB # bytes -method getBlock*( - self: CacheStore, - cid: Cid): Future[?!Block] {.async.} = +method getBlock*(self: CacheStore, cid: Cid): Future[?! (? Block)] {.async.} = ## Get a block from the stores ## trace "Getting block from cache", cid if cid.isEmpty: trace "Empty block, ignoring" - return cid.emptyBlock.success + return cid.emptyBlock.some.success - return self.cache[cid].catch() + if cid notin self.cache: + return Block.none.success -method hasBlock*(self: CacheStore, cid: Cid): bool = - ## check if the block exists + try: + let blk = self.cache[cid] + return blk.some.success + except CatchableError as exc: + trace "Exception requesting block", cid, exc = exc.msg + return failure(exc) + +method hasBlock*(self: CacheStore, cid: Cid): Future[?!bool] {.async.} = + ## Check if the block exists in the blockstore ## - trace "Checking for block presence in cache", cid + trace "Checking CacheStore for block presence", cid if cid.isEmpty: trace "Empty block, ignoring" - return true + return true.success - cid in self.cache + return (cid in self.cache).success + +method listBlocks*(s: CacheStore, onBlock: OnBlock): Future[?!void] {.async.} = + ## Get the list of blocks in the BlockStore. This is an intensive operation + ## -method listBlocks*(s: CacheStore, onBlock: OnBlock) {.async.} = for cid in toSeq(s.cache.keys): await onBlock(cid) + return success() + func putBlockSync(self: CacheStore, blk: Block): bool = let blkSize = blk.data.len # in bytes @@ -94,22 +105,19 @@ func putBlockSync(self: CacheStore, blk: Block): bool = self.currentSize += blkSize return true -method putBlock*( - self: CacheStore, - blk: Block): Future[bool] {.async.} = +method putBlock*(self: CacheStore, blk: Block): Future[?!void] {.async.} = ## Put a block to the blockstore ## trace "Storing block in cache", cid = blk.cid if blk.isEmpty: trace "Empty block, ignoring" - return true + return success() - return self.putBlockSync(blk) + discard self.putBlockSync(blk) + return success() -method delBlock*( - self: CacheStore, - cid: Cid): Future[?!void] {.async.} = +method delBlock*(self: CacheStore, cid: Cid): Future[?!void] {.async.} = ## Delete a block from the blockstore ## diff --git a/codex/stores/fsstore.nim b/codex/stores/fsstore.nim index 5ec1bead..bb1630d1 100644 --- a/codex/stores/fsstore.nim +++ b/codex/stores/fsstore.nim @@ -37,67 +37,73 @@ type template blockPath*(self: FSStore, cid: Cid): string = self.repoDir / ($cid)[^self.postfixLen..^1] / $cid -method getBlock*( - self: FSStore, - cid: Cid): Future[?!Block] {.async.} = +method getBlock*(self: FSStore, cid: Cid): Future[?! (? Block)] {.async.} = ## Get a block from the stores ## + trace "Getting block from filestore", cid if cid.isEmpty: trace "Empty block, ignoring" - return cid.emptyBlock.success + return cid.emptyBlock.some.success - if cid in self.cache: - return await self.cache.getBlock(cid) - - if cid notin self: - return Block.failure("Couldn't find block in fs store") + let cachedBlock = await self.cache.getBlock(cid) + if cachedBlock.isErr: + return cachedBlock + if cachedBlock.get.isSome: + trace "Retrieved block from cache", cid + return cachedBlock + # Read file contents var data: seq[byte] - let path = self.blockPath(cid) - if ( - let res = io2.readFile(path, data); - res.isErr): - let error = io2.ioErrorMsg(res.error) - trace "Cannot read file from fs store", path , error - return Block.failure("Cannot read file from fs store") + let + path = self.blockPath(cid) + res = io2.readFile(path, data) - return Block.new(cid, data) + if res.isErr: + if not isFile(path): # May be, check instead that "res.error == ERROR_FILE_NOT_FOUND" ? + return Block.none.success + else: + let error = io2.ioErrorMsg(res.error) + trace "Cannot read file from filestore", path, error + return failure("Cannot read file from filestore") -method putBlock*( - self: FSStore, - blk: Block): Future[bool] {.async.} = - ## Put a block to the blockstore + without var blk =? Block.new(cid, data), error: + return error.failure + + # TODO: add block to the cache + return blk.some.success + +method putBlock*(self: FSStore, blk: Block): Future[?!void] {.async.} = + ## Write block contents to file with name based on blk.cid, + ## save second copy to the cache ## if blk.isEmpty: trace "Empty block, ignoring" - return true - - if blk.cid in self: - return true - - # if directory exists it wont fail - if io2.createPath(self.blockPath(blk.cid).parentDir).isErr: - trace "Unable to create block prefix dir", dir = self.blockPath(blk.cid).parentDir - return false + return success() let path = self.blockPath(blk.cid) - if ( - let res = io2.writeFile(path, blk.data); - res.isErr): + if isFile(path): + return success() + + # If directory exists createPath wont fail + let dir = path.parentDir + if io2.createPath(dir).isErr: + trace "Unable to create block prefix dir", dir + return failure("Unable to create block prefix dir") + + let res = io2.writeFile(path, blk.data) + if res.isErr: let error = io2.ioErrorMsg(res.error) trace "Unable to store block", path, cid = blk.cid, error - return false + return failure("Unable to store block") - if not (await self.cache.putBlock(blk)): + if isErr (await self.cache.putBlock(blk)): trace "Unable to store block in cache", cid = blk.cid - return true + return success() -method delBlock*( - self: FSStore, - cid: Cid): Future[?!void] {.async.} = +method delBlock*(self: FSStore, cid: Cid): Future[?!void] {.async.} = ## Delete a block from the blockstore ## @@ -111,33 +117,35 @@ method delBlock*( res = io2.removeFile(path) if res.isErr: - let errmsg = io2.ioErrorMsg(res.error) - trace "Unable to delete block", path, cid, errmsg - return errmsg.failure + let error = io2.ioErrorMsg(res.error) + trace "Unable to delete block", path, cid, error + return error.failure return await self.cache.delBlock(cid) -method hasBlock*(self: FSStore, cid: Cid): bool = +method hasBlock*(self: FSStore, cid: Cid): Future[?!bool] {.async.} = ## Check if the block exists in the blockstore ## - trace "Checking for block existence", cid + trace "Checking filestore for block existence", cid if cid.isEmpty: trace "Empty block, ignoring" - return true + return true.success - self.blockPath(cid).isFile() + return self.blockPath(cid).isFile().success -method listBlocks*(self: FSStore, onBlock: OnBlock) {.async.} = - debug "Listing all blocks in store" +method listBlocks*(self: FSStore, onBlock: OnBlock): Future[?!void] {.async.} = + ## Get the list of blocks in the BlockStore. This is an intensive operation + ## + + trace "Listing all blocks in filestore" for (pkind, folderPath) in self.repoDir.walkDir(): if pkind != pcDir: continue - let baseName = basename(folderPath) - if baseName.len != self.postfixLen: continue + if len(folderPath.basename) != self.postfixLen: continue - for (fkind, filePath) in folderPath.walkDir(false): + for (fkind, filename) in folderPath.walkDir(relative = true): if fkind != pcFile: continue - let cid = Cid.init(basename(filePath)) + let cid = Cid.init(filename) if cid.isOk: # getting a weird `Error: unhandled exception: index 1 not in 0 .. 0 [IndexError]` # compilation error if using different syntax/construct bellow @@ -149,6 +157,8 @@ method listBlocks*(self: FSStore, onBlock: OnBlock) {.async.} = except CatchableError as exc: trace "Couldn't get block", cid = $(cid.get()) + return success() + proc new*( T: type FSStore, repoDir: string, diff --git a/codex/stores/networkstore.nim b/codex/stores/networkstore.nim index 5ce90d36..7d64a451 100644 --- a/codex/stores/networkstore.nim +++ b/codex/stores/networkstore.nim @@ -31,57 +31,57 @@ type engine*: BlockExcEngine # blockexc decision engine localStore*: BlockStore # local block store -method getBlock*( - self: NetworkStore, - cid: Cid): Future[?!bt.Block] {.async.} = +method getBlock*(self: NetworkStore, cid: Cid): Future[?! (? bt.Block)] {.async.} = ## Get a block from a remote peer ## - trace "Getting block", cid - without var blk =? (await self.localStore.getBlock(cid)): - trace "Couldn't get from local store", cid - try: - blk = await self.engine.requestBlock(cid) - except CatchableError as exc: - trace "Exception requesting block", cid, exc = exc.msg - return failure(exc.msg) + trace "Getting block from network store", cid - trace "Retrieved block from local store", cid - return blk.success + let blk = await self.localStore.getBlock(cid) + if blk.isErr: + return blk + if blk.get.isSome: + trace "Retrieved block from local store", cid + return blk -method putBlock*( - self: NetworkStore, - blk: bt.Block): Future[bool] {.async.} = - ## Store block locally and notify the - ## network + trace "Block not found in local store", cid + try: + # TODO: What if block isn't available in the engine too? + let blk = await self.engine.requestBlock(cid) + # TODO: add block to the local store + return blk.some.success + except CatchableError as exc: + trace "Exception requesting block", cid, exc = exc.msg + return failure(exc) + +method putBlock*(self: NetworkStore, blk: bt.Block): Future[?!void] {.async.} = + ## Store block locally and notify the network ## - trace "Puting block", cid = blk.cid + trace "Puting block into network store", cid = blk.cid - if not (await self.localStore.putBlock(blk)): - return false + let res = await self.localStore.putBlock(blk) + if res.isErr: + return res - self.engine.resolveBlocks(@[blk]) - return true + await self.engine.resolveBlocks(@[blk]) + return success() -method delBlock*( - self: NetworkStore, - cid: Cid): Future[?!void] = +method delBlock*(self: NetworkStore, cid: Cid): Future[?!void] = ## Delete a block from the blockstore ## - trace "Deleting block from networkstore", cid + trace "Deleting block from network store", cid return self.localStore.delBlock(cid) {.pop.} -method hasBlock*( - self: NetworkStore, - cid: Cid): bool = +method hasBlock*(self: NetworkStore, cid: Cid): Future[?!bool] {.async.} = ## Check if the block exists in the blockstore ## - self.localStore.hasBlock(cid) + trace "Checking network store for block existence", cid + return await self.localStore.hasBlock(cid) proc new*( T: type NetworkStore, diff --git a/codex/streams/storestream.nim b/codex/streams/storestream.nim index 83958af9..54959924 100644 --- a/codex/streams/storestream.nim +++ b/codex/streams/storestream.nim @@ -7,6 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. +import std/options + import pkg/upraises push: {.upraises: [].} @@ -52,36 +54,40 @@ proc `size=`*(self: StoreStream, size: int) {.error: "Setting the size is forbidden".} = discard +method atEof*(self: StoreStream): bool = + self.offset >= self.size + method readOnce*( self: StoreStream, pbytes: pointer, nbytes: int): Future[int] {.async.} = + ## Read `nbytes` from current position in the StoreStream into output buffer pointed by `pbytes`. + ## Return how many bytes were actually read before EOF was encountered. + ## Raise exception if we are already at EOF. + trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.len if self.atEof: raise newLPStreamEOFError() - var - read = 0 - - trace "Reading from manifest", cid = self.manifest.cid.get(), blocks = self.manifest.len + # The loop iterates over blocks in the StoreStream, + # reading them and copying their data into outbuf + var read = 0 # Bytes read so far, and thus write offset in the outbuf while read < nbytes and not self.atEof: + # Compute from the current stream position `self.offset` the block num/offset to read + # Compute how many bytes to read from this block let - pos = self.offset div self.manifest.blockSize - blk = (await self.store.getBlock(self.manifest[pos])).tryGet() + blockNum = self.offset div self.manifest.blockSize + blockOffset = self.offset mod self.manifest.blockSize + readBytes = min(nbytes - read, self.manifest.blockSize - blockOffset) - blockOffset = - if self.offset >= self.manifest.blockSize: - self.offset mod self.manifest.blockSize - else: - self.offset + # Read contents of block `blockNum` + without blkOrNone =? await self.store.getBlock(self.manifest[blockNum]), error: + raise newLPStreamReadError(error) + without blk =? blkOrNone: + raise newLPStreamReadError("Block not found") + trace "Reading bytes from store stream", blockNum, cid = blk.cid, bytes = readBytes, blockOffset - readBytes = - if (nbytes - read) >= (self.manifest.blockSize - blockOffset): - self.manifest.blockSize - blockOffset - else: - min(nbytes - read, self.manifest.blockSize) - - trace "Reading bytes from store stream", pos, cid = blk.cid, bytes = readBytes, blockOffset = blockOffset + # Copy `readBytes` bytes starting at `blockOffset` from the block into the outbuf copyMem( pbytes.offset(read), if blk.isEmpty: @@ -90,14 +96,12 @@ method readOnce*( blk.data[blockOffset].addr, readBytes) + # Update current positions in the stream and outbuf self.offset += readBytes read += readBytes return read -method atEof*(self: StoreStream): bool = - self.offset >= self.manifest.len * self.manifest.blockSize - method closeImpl*(self: StoreStream) {.async.} = try: trace "Closing StoreStream" diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index e71b78b3..3e63e674 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -82,7 +82,7 @@ suite "Block Advertising and Discovery": blockDiscovery.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} = - engine.resolveBlocks(blocks.filterIt( it.cid == cid )) + await engine.resolveBlocks(blocks.filterIt( it.cid == cid )) await allFuturesThrowing( allFinished(pendingBlocks)) diff --git a/tests/codex/blockexchange/engine/testblockexc.nim b/tests/codex/blockexchange/engine/testblockexc.nim index b8c5f0a9..7371bf66 100644 --- a/tests/codex/blockexchange/engine/testblockexc.nim +++ b/tests/codex/blockexchange/engine/testblockexc.nim @@ -106,7 +106,7 @@ suite "NetworkStore engine - 2 nodes": test "Should send want-have for block": let blk = bt.Block.new("Block 1".toBytes).tryGet() - check await nodeCmps2.localStore.putBlock(blk) + (await nodeCmps2.localStore.putBlock(blk)).tryGet() let entry = Entry( `block`: blk.cid.data.buffer, @@ -121,12 +121,12 @@ suite "NetworkStore engine - 2 nodes": .taskQueue .pushOrUpdateNoWait(peerCtx1).isOk - check eventually nodeCmps1.localStore.hasBlock(blk.cid) + check eventually (await nodeCmps1.localStore.hasBlock(blk.cid)).tryGet() test "Should get blocks from remote": let blocks = await allFinished( blocks2.mapIt( nodeCmps1.networkStore.getBlock(it.cid) )) - check blocks.mapIt( !it.read ) == blocks2 + check blocks.mapIt( it.read().tryGet().get() ) == blocks2 test "Remote should send blocks when available": let blk = bt.Block.new("Block 1".toBytes).tryGet() @@ -137,7 +137,7 @@ suite "NetworkStore engine - 2 nodes": # second trigger blockexc to resolve any pending requests # for the block - check await nodeCmps2.networkStore.putBlock(blk) + (await nodeCmps2.networkStore.putBlock(blk)).tryGet() # should succeed retrieving block from remote check await nodeCmps1.networkStore.getBlock(blk.cid) @@ -196,14 +196,8 @@ suite "NetworkStore - multiple nodes": pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) - await allFutures( - blocks[0..3].mapIt( networkStore[0].engine.localStore.putBlock(it) )) - await allFutures( - blocks[4..7].mapIt( networkStore[1].engine.localStore.putBlock(it) )) - await allFutures( - blocks[8..11].mapIt( networkStore[2].engine.localStore.putBlock(it) )) - await allFutures( - blocks[12..15].mapIt( networkStore[3].engine.localStore.putBlock(it) )) + for i in 0..15: + (await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet() await connectNodes(switch) await sleepAsync(1.seconds) @@ -232,14 +226,8 @@ suite "NetworkStore - multiple nodes": pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) - await allFutures( - blocks[0..3].mapIt( networkStore[0].engine.localStore.putBlock(it) )) - await allFutures( - blocks[4..7].mapIt( networkStore[1].engine.localStore.putBlock(it) )) - await allFutures( - blocks[8..11].mapIt( networkStore[2].engine.localStore.putBlock(it) )) - await allFutures( - blocks[12..15].mapIt( networkStore[3].engine.localStore.putBlock(it) )) + for i in 0..15: + (await networkStore[i div 4].engine.localStore.putBlock(blocks[i])).tryGet() await connectNodes(switch) await sleepAsync(1.seconds) diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index e6193e70..5fa2b803 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -227,8 +227,8 @@ suite "NetworkStore engine handlers": sendPresence: sendPresence )) - check await engine.localStore.putBlock(blocks[0]) - check await engine.localStore.putBlock(blocks[1]) + (await engine.localStore.putBlock(blocks[0])).tryGet() + (await engine.localStore.putBlock(blocks[1])).tryGet() await engine.wantListHandler(peerId, wantList) await done @@ -242,7 +242,8 @@ suite "NetworkStore engine handlers": let resolved = await allFinished(pending) check resolved.mapIt( it.read ) == blocks for b in blocks: - check engine.localStore.hasBlock(b.cid) + let present = await engine.localStore.hasBlock(b.cid) + check present.tryGet() test "Should send payments for received blocks": let account = Account(address: EthAddress.example) @@ -355,7 +356,7 @@ suite "Task Handler": blks[0].cid == blocks[1].cid for blk in blocks: - check await engine.localStore.putBlock(blk) + (await engine.localStore.putBlock(blk)).tryGet() engine.network.request.sendBlocks = sendBlocks # second block to send by priority @@ -393,7 +394,7 @@ suite "Task Handler": ] for blk in blocks: - check await engine.localStore.putBlock(blk) + (await engine.localStore.putBlock(blk)).tryGet() engine.network.request.sendPresence = sendPresence # have block diff --git a/tests/codex/helpers.nim b/tests/codex/helpers.nim index 7dc22877..0b92f8df 100644 --- a/tests/codex/helpers.nim +++ b/tests/codex/helpers.nim @@ -1,3 +1,5 @@ +import std/options + import pkg/chronos import pkg/libp2p import pkg/libp2p/varint @@ -17,7 +19,7 @@ export randomchunker, nodeutils, mockdiscovery, eventually # is changed here, because blocks are now `ref` # types. This is only in tests!!! func `==`*(a, b: bt.Block): bool = - (a.cid == b.cid) and (a.data == b. data) + (a.cid == b.cid) and (a.data == b.data) proc lenPrefix*(msg: openArray[byte]): seq[byte] = ## Write `msg` with a varint-encoded length prefix @@ -45,7 +47,7 @@ proc corruptBlocks*( pos.add(i) var - blk = (await store.getBlock(manifest[i])).tryGet() + blk = (await store.getBlock(manifest[i])).tryGet().get() bytePos: seq[int] while true: diff --git a/tests/codex/storageproofs/testnetwork.nim b/tests/codex/storageproofs/testnetwork.nim index 10357ba1..f3e67954 100644 --- a/tests/codex/storageproofs/testnetwork.nim +++ b/tests/codex/storageproofs/testnetwork.nim @@ -65,12 +65,9 @@ suite "Storage Proofs Network": let chunk = await chunker.getBytes(); chunk.len > 0): - let - blk = bt.Block.new(chunk).tryGet() - + let blk = bt.Block.new(chunk).tryGet() manifest.add(blk.cid) - if not (await store.putBlock(blk)): - raise newException(CatchableError, "Unable to store block " & $blk.cid) + (await store.putBlock(blk)).tryGet() cid = manifest.cid.tryGet() por = await PoR.init( diff --git a/tests/codex/storageproofs/testpor.nim b/tests/codex/storageproofs/testpor.nim index 5b9c8481..b58e9590 100644 --- a/tests/codex/storageproofs/testpor.nim +++ b/tests/codex/storageproofs/testpor.nim @@ -36,12 +36,9 @@ suite "BLS PoR": let chunk = await chunker.getBytes(); chunk.len > 0): - let - blk = bt.Block.new(chunk).tryGet() - + let blk = bt.Block.new(chunk).tryGet() manifest.add(blk.cid) - if not (await store.putBlock(blk)): - raise newException(CatchableError, "Unable to store block " & $blk.cid) + (await store.putBlock(blk)).tryGet() test "Test PoR without corruption": let @@ -97,12 +94,9 @@ suite "Test Serialization": let chunk = await chunker.getBytes(); chunk.len > 0): - let - blk = bt.Block.new(chunk).tryGet() - + let blk = bt.Block.new(chunk).tryGet() manifest.add(blk.cid) - if not (await store.putBlock(blk)): - raise newException(CatchableError, "Unable to store block " & $blk.cid) + (await store.putBlock(blk)).tryGet() (spk, ssk) = st.keyGen() por = await PoR.init( diff --git a/tests/codex/storageproofs/teststpstore.nim b/tests/codex/storageproofs/teststpstore.nim index 545ff68f..bca3739a 100644 --- a/tests/codex/storageproofs/teststpstore.nim +++ b/tests/codex/storageproofs/teststpstore.nim @@ -44,12 +44,9 @@ suite "Test PoR store": let chunk = await chunker.getBytes(); chunk.len > 0): - let - blk = bt.Block.new(chunk).tryGet() - + let blk = bt.Block.new(chunk).tryGet() manifest.add(blk.cid) - if not (await store.putBlock(blk)): - raise newException(CatchableError, "Unable to store block " & $blk.cid) + (await store.putBlock(blk)).tryGet() cid = manifest.cid.tryGet() por = await PoR.init( diff --git a/tests/codex/stores/testcachestore.nim b/tests/codex/stores/testcachestore.nim index 0a0d2fc6..000ebf58 100644 --- a/tests/codex/stores/testcachestore.nim +++ b/tests/codex/stores/testcachestore.nim @@ -1,4 +1,5 @@ import std/strutils +import std/options import pkg/chronos import pkg/asynctest @@ -48,13 +49,13 @@ suite "Cache Store tests": test "putBlock": - check: - await store.putBlock(newBlock1) - newBlock1.cid in store + (await store.putBlock(newBlock1)).tryGet() + check (await store.hasBlock(newBlock1.cid)).tryGet() # block size bigger than entire cache store = CacheStore.new(cacheSize = 99, chunkSize = 98) - check not await store.putBlock(newBlock1) + (await store.putBlock(newBlock1)).tryGet() + check not (await store.hasBlock(newBlock1.cid)).tryGet() # block being added causes removal of LRU block store = CacheStore.new( @@ -62,60 +63,63 @@ suite "Cache Store tests": cacheSize = 200, chunkSize = 1) check: - not store.hasBlock(newBlock1.cid) - store.hasBlock(newBlock2.cid) - store.hasBlock(newBlock3.cid) + not (await store.hasBlock(newBlock1.cid)).tryGet() + (await store.hasBlock(newBlock2.cid)).tryGet() + (await store.hasBlock(newBlock2.cid)).tryGet() store.currentSize == newBlock2.data.len + newBlock3.data.len # 200 test "getBlock": store = CacheStore.new(@[newBlock]) let blk = await store.getBlock(newBlock.cid) - - check: - blk.isOk - blk.get == newBlock + check blk.tryGet().get() == newBlock test "fail getBlock": let blk = await store.getBlock(newBlock.cid) - - check: - blk.isErr - blk.error of system.KeyError + check blk.tryGet().isNone() test "hasBlock": let store = CacheStore.new(@[newBlock]) - - check store.hasBlock(newBlock.cid) + check: + (await store.hasBlock(newBlock.cid)).tryGet() + await newBlock.cid in store test "fail hasBlock": - check not store.hasBlock(newBlock.cid) + check: + not (await store.hasBlock(newBlock.cid)).tryGet() + not (await newBlock.cid in store) test "delBlock": # empty cache (await store.delBlock(newBlock1.cid)).tryGet() + check not (await store.hasBlock(newBlock1.cid)).tryGet() + + (await store.putBlock(newBlock1)).tryGet() + check (await store.hasBlock(newBlock1.cid)).tryGet() # successfully deleted - discard await store.putBlock(newBlock1) (await store.delBlock(newBlock1.cid)).tryGet() + check not (await store.hasBlock(newBlock1.cid)).tryGet() # deletes item should decrement size store = CacheStore.new(@[newBlock1, newBlock2, newBlock3]) check: store.currentSize == 300 + (await store.delBlock(newBlock2.cid)).tryGet() + check: store.currentSize == 200 - newBlock2.cid notin store + not (await store.hasBlock(newBlock2.cid)).tryGet() test "listBlocks": - discard await store.putBlock(newBlock1) + (await store.putBlock(newBlock1)).tryGet() var listed = false - await store.listBlocks( + (await store.listBlocks( proc(cid: Cid) {.gcsafe, async.} = - check cid in store + check (await store.hasBlock(cid)).tryGet() listed = true - ) + )).tryGet() check listed diff --git a/tests/codex/stores/testfsstore.nim b/tests/codex/stores/testfsstore.nim index 4089623e..2ec7b165 100644 --- a/tests/codex/stores/testfsstore.nim +++ b/tests/codex/stores/testfsstore.nim @@ -1,4 +1,5 @@ import std/os +import std/options import pkg/questionable import pkg/questionable/results @@ -33,36 +34,43 @@ suite "FS Store": removeDir(repoDir) test "putBlock": - check await store.putBlock(newBlock) - check fileExists(store.blockPath(newBlock.cid)) - check newBlock.cid in store + (await store.putBlock(newBlock)).tryGet() + check: + fileExists(store.blockPath(newBlock.cid)) + (await store.hasBlock(newBlock.cid)).tryGet() + await newBlock.cid in store test "getBlock": createDir(store.blockPath(newBlock.cid).parentDir) writeFile(store.blockPath(newBlock.cid), newBlock.data) let blk = await store.getBlock(newBlock.cid) - check blk.option == newBlock.some + check blk.tryGet().get() == newBlock test "fail getBlock": let blk = await store.getBlock(newBlock.cid) - check blk.isErr + check blk.tryGet().isNone test "hasBlock": createDir(store.blockPath(newBlock.cid).parentDir) writeFile(store.blockPath(newBlock.cid), newBlock.data) - check store.hasBlock(newBlock.cid) + check: + (await store.hasBlock(newBlock.cid)).tryGet() + await newBlock.cid in store + + test "fail hasBlock": + check: + not (await store.hasBlock(newBlock.cid)).tryGet() + not (await newBlock.cid in store) test "listBlocks": createDir(store.blockPath(newBlock.cid).parentDir) writeFile(store.blockPath(newBlock.cid), newBlock.data) - await store.listBlocks( + (await store.listBlocks( proc(cid: Cid) {.gcsafe, async.} = - check cid == newBlock.cid) - - test "fail hasBlock": - check not store.hasBlock(newBlock.cid) + check cid == newBlock.cid + )).tryGet() test "delBlock": createDir(store.blockPath(newBlock.cid).parentDir) diff --git a/tests/codex/testerasure.nim b/tests/codex/testerasure.nim index 26e22475..4ece1823 100644 --- a/tests/codex/testerasure.nim +++ b/tests/codex/testerasure.nim @@ -37,7 +37,7 @@ suite "Erasure encode/decode": let blk = bt.Block.new(chunk).tryGet() manifest.add(blk.cid) - check (await store.putBlock(blk)) + (await store.putBlock(blk)).tryGet() proc encode(buffers, parity: int): Future[Manifest] {.async.} = let @@ -78,7 +78,8 @@ suite "Erasure encode/decode": decoded.len == encoded.originalLen for d in dropped: - check d in store + let present = await store.hasBlock(d) + check present.tryGet() test "Should not tolerate loosing more than M data blocks in a single random column": const @@ -103,7 +104,8 @@ suite "Erasure encode/decode": decoded = (await erasure.decode(encoded)).tryGet() for d in dropped: - check d notin store + let present = await store.hasBlock(d) + check not present.tryGet() test "Should tolerate loosing M data blocks in M random columns": const @@ -130,7 +132,8 @@ suite "Erasure encode/decode": discard (await erasure.decode(encoded)).tryGet() for d in manifest: - check d in store + let present = await store.hasBlock(d) + check present.tryGet() test "Should not tolerate loosing more than M data blocks in M random columns": const @@ -179,7 +182,8 @@ suite "Erasure encode/decode": discard (await erasure.decode(encoded)).tryGet() for d in manifest: - check d in store + let present = await store.hasBlock(d) + check present.tryGet() test "Should tolerate loosing M (a.k.a row) contiguous parity blocks": const @@ -194,7 +198,8 @@ suite "Erasure encode/decode": discard (await erasure.decode(encoded)).tryGet() for d in manifest: - check d in store + let present = await store.hasBlock(d) + check present.tryGet() test "handles edge case of 0 parity blocks": const diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 98cbff01..71947cd8 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -85,10 +85,10 @@ suite "Test Node": manifestCid = (await storeFut).tryGet() check: - manifestCid in localStore + (await localStore.hasBlock(manifestCid)).tryGet() var - manifestBlock = (await localStore.getBlock(manifestCid)).tryGet() + manifestBlock = (await localStore.getBlock(manifestCid)).tryGet().get() localManifest = Manifest.decode(manifestBlock.data).tryGet() check: @@ -104,20 +104,18 @@ suite "Test Node": let chunk = await chunker.getBytes(); chunk.len > 0): - let - blk = bt.Block.new(chunk).tryGet() - + let blk = bt.Block.new(chunk).tryGet() original &= chunk - check await localStore.putBlock(blk) + (await localStore.putBlock(blk)).tryGet() manifest.add(blk.cid) let manifestBlock = bt.Block.new( - manifest.encode().tryGet(), - codec = DagPBCodec) - .tryGet() + manifest.encode().tryGet(), + codec = DagPBCodec + ).tryGet() - check await localStore.putBlock(manifestBlock) + (await localStore.putBlock(manifestBlock)).tryGet() let stream = (await node.retrieve(manifestBlock.cid)).tryGet() var data: seq[byte] @@ -125,9 +123,7 @@ suite "Test Node": var buf = newSeq[byte](BlockSize) res = await stream.readOnce(addr buf[0], BlockSize div 2) - buf.setLen(res) - data &= buf check data == original @@ -137,7 +133,7 @@ suite "Test Node": testString = "Block 1" blk = bt.Block.new(testString.toBytes).tryGet() - check (await localStore.putBlock(blk)) + (await localStore.putBlock(blk)).tryGet() let stream = (await node.retrieve(blk.cid)).tryGet() var data = newSeq[byte](testString.len) diff --git a/tests/codex/teststorestream.nim b/tests/codex/teststorestream.nim index 0a82785e..45311f31 100644 --- a/tests/codex/teststorestream.nim +++ b/tests/codex/teststorestream.nim @@ -16,6 +16,13 @@ suite "StoreStream": store: BlockStore stream: StoreStream + # Check that `buf` contains `size` bytes with values start, start+1... + proc sequential_bytes(buf: seq[byte], size: int, start: int): bool = + for i in 0.. blockSize": var buf = newSeq[byte](11) + n = 0 while not stream.atEof: - let - read = (await stream.readOnce(addr buf[0], buf.len)) + let read = (await stream.readOnce(addr buf[0], buf.len)) - if stream.atEof.not: + if not stream.atEof: check read == 11 else: check read == 1 + check sequential_bytes(buf,read,n) + n += read + test "Read exact bytes within block boundary": var buf = newSeq[byte](5) await stream.readExactly(addr buf[0], 5) - check buf == [byte 0, 1, 2, 3, 4] + check sequential_bytes(buf,5,0) test "Read exact bytes outside of block boundary": var buf = newSeq[byte](15) await stream.readExactly(addr buf[0], 15) - check buf == [byte 0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14] + check sequential_bytes(buf,15,0)