diff --git a/dagger/blockexchange/network.nim b/dagger/blockexchange/network.nim index e046e2bd..19830c5c 100644 --- a/dagger/blockexchange/network.nim +++ b/dagger/blockexchange/network.nim @@ -131,7 +131,7 @@ proc broadcastWantList*( proc handleBlocks( b: BlockExcNetwork, peer: NetworkPeer, - blocks: seq[auto]): Future[void] = + blocks: seq[pb.Block]): Future[void] = ## Handle incoming blocks ## @@ -141,13 +141,14 @@ proc handleBlocks( trace "Handling blocks for peer", peer = peer.id var blks: seq[bt.Block] - for blk in blocks: - when blk is pb.Block: - blks.add(bt.Block.init(Cid.init(blk.prefix).get(), blk.data)) - elif blk is seq[byte]: - blks.add(bt.Block.init(Cid.init(blk).get(), blk)) - else: - error("Invalid block type") + for blob in blocks: + without cid =? Cid.init(blob.prefix): + trace "Unable to initialize Cid from protobuf message" + + without blk =? bt.Block.init(cid, blob.data, verify = true): + trace "Unable to initialize Block from data" + + blks.add(blk) b.handlers.onBlocks(peer.id, blks) diff --git a/dagger/blocktype.nim b/dagger/blocktype.nim index 1d53e967..6cf0c240 100644 --- a/dagger/blocktype.nim +++ b/dagger/blocktype.nim @@ -11,6 +11,10 @@ import pkg/libp2p import pkg/stew/byteutils +import pkg/questionable +import pkg/questionable/results + +import ./errors type Block* = object of RootObj @@ -25,18 +29,32 @@ func init*( T: type Block, data: openArray[byte] = [], version = CIDv1, - hcodec = multiCodec("sha2-256"), - codec = multiCodec("raw")): T = - let hash = MultiHash.digest($hcodec, data).get() - Block( - cid: Cid.init(version, codec, hash).get(), + mcodec = multiCodec("sha2-256"), + codec = multiCodec("raw")): ?!T = + + let + hash = ? MultiHash.digest($mcodec, data).mapFailure + cid = ? Cid.init(version, codec, hash).mapFailure + + success Block( + cid: cid, data: @data) func init*( T: type Block, cid: Cid, - data: openArray[byte] = [], - verify: bool = false): T = - Block( - cid: cid, - data: @data) + data: openArray[byte], + verify: bool = false): ?!T = + + let + mhash = ? cid.mhash.mapFailure + b = ? Block.init( + data = @data, + version = cid.cidver, + codec = cid.mcodec, + mcodec = mhash.mcodec) + + if verify and cid != b.cid: + return failure "Cid's don't match!" + + success b diff --git a/dagger/node.nim b/dagger/node.nim index 2877de65..de397e11 100644 --- a/dagger/node.nim +++ b/dagger/node.nim @@ -133,8 +133,8 @@ proc store*( chunk.len > 0): trace "Got data from stream", len = chunk.len - let - blk = bt.Block.init(chunk) + without blk =? bt.Block.init(chunk): + return failure("Unable to init block from chunk!") blockManifest.put(blk.cid) if not (await node.blockStore.putBlock(blk)): @@ -154,7 +154,10 @@ proc store*( newException(DaggerError, "Could not generate dataset manifest!")) # Store as a dag-pb block - let manifest = bt.Block.init(data = data, codec = ManifestCodec) + without manifest =? bt.Block.init(data = data, codec = ManifestCodec): + trace "Unable to init block from manifest data!" + return failure("Unable to init block from manifest data!") + if not (await node.blockStore.putBlock(manifest)): trace "Unable to store manifest", cid = manifest.cid return failure("Unable to store manifest " & $manifest.cid) @@ -165,8 +168,8 @@ proc store*( return failure(cid.error.msg) trace "Stored data", manifestCid = manifest.cid, - contentCid = !cid, - blocks = blockManifest.len + contentCid = !cid, + blocks = blockManifest.len return manifest.cid.success diff --git a/dagger/stores/fsstore.nim b/dagger/stores/fsstore.nim index 4c3d7651..b7b0f770 100644 --- a/dagger/stores/fsstore.nim +++ b/dagger/stores/fsstore.nim @@ -54,7 +54,7 @@ method getBlock*( trace "Cannot read file from fs store", path , error return Block.failure("Cannot read file from fs store") - return Block.init(cid, data).success + return Block.init(cid, data) method putBlock*( self: FSStore, diff --git a/tests/dagger/blockexc/testblockexc.nim b/tests/dagger/blockexc/testblockexc.nim index 855dbb6b..f57ddb1f 100644 --- a/tests/dagger/blockexc/testblockexc.nim +++ b/tests/dagger/blockexc/testblockexc.nim @@ -41,14 +41,14 @@ suite "NetworkStore engine - 2 nodes": if chunk.len <= 0: break - blocks1.add(bt.Block.init(chunk)) + blocks1.add(bt.Block.init(chunk).get()) while true: let chunk = await chunker2.getBytes() if chunk.len <= 0: break - blocks2.add(bt.Block.init(chunk)) + blocks2.add(bt.Block.init(chunk).get()) switch1 = newStandardSwitch() switch2 = newStandardSwitch() @@ -119,7 +119,7 @@ suite "NetworkStore engine - 2 nodes": check peerCtx2.account.?address == pricing2.address.some test "should send want-have for block": - let blk = bt.Block.init("Block 1".toBytes) + let blk = bt.Block.init("Block 1".toBytes).get() check await blockexc2.engine.localStore.putBlock(blk) let entry = Entry( @@ -144,7 +144,7 @@ suite "NetworkStore engine - 2 nodes": check blocks.mapIt( !it.read ) == blocks2 test "remote should send blocks when available": - let blk = bt.Block.init("Block 1".toBytes) + let blk = bt.Block.init("Block 1".toBytes).get() # should fail retrieving block from remote check not await blockexc1.getBlock(blk.cid) @@ -183,7 +183,7 @@ suite "NetworkStore - multiple nodes": if chunk.len <= 0: break - blocks.add(bt.Block.init(chunk)) + blocks.add(bt.Block.init(chunk).get()) for e in generateNodes(5): switch.add(e.switch) diff --git a/tests/dagger/blockexc/testengine.nim b/tests/dagger/blockexc/testengine.nim index 1f743d52..43d7bffd 100644 --- a/tests/dagger/blockexc/testengine.nim +++ b/tests/dagger/blockexc/testengine.nim @@ -20,8 +20,8 @@ import ../examples suite "NetworkStore engine basic": let rng = Rng.instance() - seckey = PrivateKey.random(rng[]).tryGet() - peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet() + seckey = PrivateKey.random(rng[]).get() + peerId = PeerID.init(seckey.getPublicKey().get()).get() chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) wallet = WalletRef.example @@ -35,7 +35,7 @@ suite "NetworkStore engine basic": if chunk.len <= 0: break - blocks.add(bt.Block.init(chunk)) + blocks.add(bt.Block.init(chunk).get()) done = newFuture[void]() @@ -87,8 +87,8 @@ suite "NetworkStore engine basic": suite "NetworkStore engine handlers": let rng = Rng.instance() - seckey = PrivateKey.random(rng[]).tryGet() - peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet() + seckey = PrivateKey.random(rng[]).get() + peerId = PeerID.init(seckey.getPublicKey().get()).get() chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) wallet = WalletRef.example @@ -104,7 +104,7 @@ suite "NetworkStore engine handlers": if chunk.len <= 0: break - blocks.add(bt.Block.init(chunk)) + blocks.add(bt.Block.init(chunk).get()) done = newFuture[void]() engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork()) @@ -228,15 +228,15 @@ suite "Task Handler": if chunk.len <= 0: break - blocks.add(bt.Block.init(chunk)) + blocks.add(bt.Block.init(chunk).get()) done = newFuture[void]() engine = BlockExcEngine.new(MemoryStore.new(), wallet, BlockExcNetwork()) peersCtx = @[] for i in 0..3: - let seckey = PrivateKey.random(rng[]).tryGet() - peers.add(PeerID.init(seckey.getPublicKey().tryGet()).tryGet()) + let seckey = PrivateKey.random(rng[]).get() + peers.add(PeerID.init(seckey.getPublicKey().get()).get()) peersCtx.add(BlockExcPeerCtx( id: peers[i] @@ -282,7 +282,7 @@ suite "Task Handler": test "Should send presence": let present = blocks - let missing = @[bt.Block.init("missing".toBytes)] + let missing = @[bt.Block.init("missing".toBytes).get()] let price = (!engine.pricing).price proc sendPresence(id: PeerID, presence: seq[BlockPresence]) = diff --git a/tests/dagger/blockexc/testnetwork.nim b/tests/dagger/blockexc/testnetwork.nim index 6a2e9cbc..a4f94e8b 100644 --- a/tests/dagger/blockexc/testnetwork.nim +++ b/tests/dagger/blockexc/testnetwork.nim @@ -20,8 +20,8 @@ import ../examples suite "NetworkStore network": let rng = Rng.instance() - seckey = PrivateKey.random(rng[]).tryGet() - peerId = PeerID.init(seckey.getPublicKey().tryGet()).tryGet() + seckey = PrivateKey.random(rng[]).get() + peerId = PeerID.init(seckey.getPublicKey().get()).get() chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256) var @@ -40,7 +40,7 @@ suite "NetworkStore network": if chunk.len <= 0: break - blocks.add(bt.Block.init(chunk)) + blocks.add(bt.Block.init(chunk).get()) done = newFuture[void]() buffer = BufferStream.new() @@ -156,7 +156,7 @@ suite "NetworkStore Network - e2e": if chunk.len <= 0: break - blocks.add(bt.Block.init(chunk)) + blocks.add(bt.Block.init(chunk).get()) done = newFuture[void]() switch1 = newStandardSwitch() diff --git a/tests/dagger/examples.nim b/tests/dagger/examples.nim index ef647b20..fceb40a4 100644 --- a/tests/dagger/examples.nim +++ b/tests/dagger/examples.nim @@ -42,7 +42,7 @@ proc example*(_: type Pricing): Pricing = proc example*(_: type Block): Block = let length = rand(4096) let bytes = newSeqWith(length, rand(uint8)) - Block.init(bytes) + Block.init(bytes).tryGet proc example*(_: type PeerId): PeerID = let key = PrivateKey.random(Rng.instance[]).get diff --git a/tests/dagger/stores/testfsstore.nim b/tests/dagger/stores/testfsstore.nim index 9bcc3f5e..c4a27a14 100644 --- a/tests/dagger/stores/testfsstore.nim +++ b/tests/dagger/stores/testfsstore.nim @@ -23,7 +23,7 @@ suite "FS Store": var store: FSStore repoDir: string - newBlock = Block.init("New Block".toBytes()) + newBlock = Block.init("New Block".toBytes()).get() setup: repoDir = path.parentDir / "repo" diff --git a/tests/dagger/stores/testmemorystore.nim b/tests/dagger/stores/testmemorystore.nim index d84aa802..36fd8fed 100644 --- a/tests/dagger/stores/testmemorystore.nim +++ b/tests/dagger/stores/testmemorystore.nim @@ -15,7 +15,7 @@ import ../helpers suite "Memory Store tests": test "putBlock": let - newBlock = Block.init("New Block".toBytes()) + newBlock = Block.init("New Block".toBytes()).get() store = MemoryStore.new() check await store.putBlock(newBlock) @@ -23,7 +23,7 @@ suite "Memory Store tests": test "getBlock": let - newBlock = Block.init("New Block".toBytes()) + newBlock = Block.init("New Block".toBytes()).get() store = MemoryStore.new(@[newBlock]) let blk = await store.getBlock(newBlock.cid) @@ -32,7 +32,7 @@ suite "Memory Store tests": test "fail getBlock": let - newBlock = Block.init("New Block".toBytes()) + newBlock = Block.init("New Block".toBytes()).get() store = MemoryStore.new(@[]) let blk = await store.getBlock(newBlock.cid) @@ -40,21 +40,21 @@ suite "Memory Store tests": test "hasBlock": let - newBlock = Block.init("New Block".toBytes()) + newBlock = Block.init("New Block".toBytes()).get() store = MemoryStore.new(@[newBlock]) check store.hasBlock(newBlock.cid) test "fail hasBlock": let - newBlock = Block.init("New Block".toBytes()) + newBlock = Block.init("New Block".toBytes()).get() store = MemoryStore.new(@[]) check not store.hasBlock(newBlock.cid) test "delBlock": let - newBlock = Block.init("New Block".toBytes()) + newBlock = Block.init("New Block".toBytes()).get() store = MemoryStore.new(@[newBlock]) check await store.delBlock(newBlock.cid) diff --git a/tests/dagger/testmanifest.nim b/tests/dagger/testmanifest.nim index b8735d82..d2f4cf33 100644 --- a/tests/dagger/testmanifest.nim +++ b/tests/dagger/testmanifest.nim @@ -17,13 +17,13 @@ suite "Manifest": test "Should produce valid tree hash checksum": without var manifest =? BlocksManifest.init( blocks = @[ - Block.init("Block 1".toBytes).cid, - Block.init("Block 2".toBytes).cid, - Block.init("Block 3".toBytes).cid, - Block.init("Block 4".toBytes).cid, - Block.init("Block 5".toBytes).cid, - Block.init("Block 6".toBytes).cid, - Block.init("Block 7".toBytes).cid, + Block.init("Block 1".toBytes).get().cid, + Block.init("Block 2".toBytes).get().cid, + Block.init("Block 3".toBytes).get().cid, + Block.init("Block 4".toBytes).get().cid, + Block.init("Block 5".toBytes).get().cid, + Block.init("Block 6".toBytes).get().cid, + Block.init("Block 7".toBytes).get().cid, ]): fail() @@ -42,7 +42,9 @@ suite "Manifest": test "Should encode/decode to/from manifest": let - blocks = (0..<1000).mapIt( Block.init(("Block " & $it).toBytes).cid ) + blocks = (0..<1000).mapIt( + Block.init(("Block " & $it).toBytes).get().cid + ) var manifest = BlocksManifest.init(blocks).get() diff --git a/tests/dagger/testnode.nim b/tests/dagger/testnode.nim index eb3e40e7..59e00d85 100644 --- a/tests/dagger/testnode.nim +++ b/tests/dagger/testnode.nim @@ -56,27 +56,27 @@ suite "Test Node": storeFut = node.store(stream) var - manifest = BlocksManifest.init().tryGet() + manifest = BlocksManifest.init().get() try: while ( let chunk = await chunker.getBytes(); chunk.len > 0): await stream.pushData(chunk) - manifest.put(bt.Block.init(chunk).cid) + manifest.put(bt.Block.init(chunk).get().cid) finally: await stream.pushEof() await stream.close() let - manifestCid = (await storeFut).tryGet() + manifestCid = (await storeFut).get() check: manifestCid in localStore var manifestBlock = (await localStore.getBlock(manifestCid)).get() - localManifest = BlocksManifest.init(manifestBlock).tryGet() + localManifest = BlocksManifest.init(manifestBlock).get() check: manifest.len == localManifest.len @@ -84,7 +84,7 @@ suite "Test Node": test "Retrieve Data Stream": var - manifest = BlocksManifest.init().tryGet() + manifest = BlocksManifest.init().get() original: seq[byte] while ( @@ -92,7 +92,7 @@ suite "Test Node": chunk.len > 0): let - blk = bt.Block.init(chunk) + blk = bt.Block.init(chunk).get() original &= chunk check await localStore.putBlock(blk) @@ -100,8 +100,8 @@ suite "Test Node": let manifestBlock = bt.Block.init( - manifest.encode().tryGet(), - codec = ManifestCodec) + manifest.encode().get(), + codec = ManifestCodec).get() check await localStore.putBlock(manifestBlock) @@ -125,7 +125,7 @@ suite "Test Node": test "Retrieve One Block": let testString = "Block 1" - blk = bt.Block.init(testString.toBytes) + blk = bt.Block.init(testString.toBytes).get() var stream = BufferStream.new()