diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 1c11fbde..e42851c5 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -349,21 +349,24 @@ proc blocksDeliveryHandler*( var validatedBlocksDelivery: seq[BlockDelivery] for bd in blocksDelivery: + logScope: + peer = peer + address = bd.address if err =? b.validateBlockDelivery(bd).errorOption: - warn "Block validation failed", address = bd.address, msg = err.msg + warn "Block validation failed", msg = err.msg continue if err =? (await b.localStore.putBlock(bd.blk)).errorOption: - error "Unable to store block", address = bd.address, err = err.msg + error "Unable to store block", err = err.msg continue if bd.address.leaf: without proof =? bd.proof: - error "Proof expected for a leaf block delivery", address = bd.address + error "Proof expected for a leaf block delivery" continue if err =? (await b.localStore.putBlockCidAndProof(bd.address.treeCid, bd.address.index, bd.blk.cid, proof)).errorOption: - error "Unable to store proof and cid for a block", address = bd.address + error "Unable to store proof and cid for a block" continue validatedBlocksDelivery.add(bd) @@ -398,11 +401,11 @@ proc wantListHandler*( logScope: peer = peerCtx.id - # cid = e.cid + address = e.address wantType = $e.wantType if idx < 0: # updating entry - trace "Processing new want list entry", address = e.address + trace "Processing new want list entry" let have = await e.address in b.localStore @@ -414,21 +417,21 @@ proc wantListHandler*( codex_block_exchange_want_have_lists_received.inc() if not have and e.sendDontHave: - trace "Adding dont have entry to presence response", address = e.address + trace "Adding dont have entry to presence response" presence.add( BlockPresence( address: e.address, `type`: BlockPresenceType.DontHave, price: price)) elif have and e.wantType == WantType.WantHave: - trace "Adding have entry to presence response", address = e.address + trace "Adding have entry to presence response" presence.add( BlockPresence( address: e.address, `type`: BlockPresenceType.Have, price: price)) elif e.wantType == WantType.WantBlock: - trace "Added entry to peer's want blocks list", address = e.address + trace "Added entry to peer's want blocks list" peerCtx.peerWants.add(e) codex_block_exchange_want_block_lists_received.inc() else: diff --git a/codex/blocktype.nim b/codex/blocktype.nim index 8666766b..a26a3157 100644 --- a/codex/blocktype.nim +++ b/codex/blocktype.nim @@ -96,7 +96,7 @@ func new*( codec = multiCodec("raw") ): ?!Block = ## creates a new block for both storage and network IO - ## + ## let hash = ? MultiHash.digest($mcodec, data).mapFailure @@ -132,7 +132,7 @@ func new*( proc emptyCid*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!Cid = ## Returns cid representing empty content, given cid version, hash codec and data codec - ## + ## const Sha256 = multiCodec("sha2-256") @@ -140,17 +140,17 @@ proc emptyCid*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!C DagPB = multiCodec("dag-pb") DagJson = multiCodec("dag-json") - var index {.global, threadvar.}: Table[(CIDv0, Sha256, DagPB), Result[Cid, CidError]] + var index {.global, threadvar.}: Table[(CidVersion, MultiCodec, MultiCodec), Cid] once: index = { # source https://ipld.io/specs/codecs/dag-pb/fixtures/cross-codec/#dagpb_empty - (CIDv0, Sha256, DagPB): Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n"), - (CIDv1, Sha256, DagPB): Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi"), # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku - (CIDv1, Sha256, DagJson): Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP"), # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta - (CIDv1, Sha256, Raw): Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW"), + (CIDv0, Sha256, DagPB): ? Cid.init("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n").mapFailure, + (CIDv1, Sha256, DagPB): ? Cid.init("zdj7Wkkhxcu2rsiN6GUyHCLsSLL47kdUNfjbFqBUUhMFTZKBi").mapFailure, # base36: bafybeihdwdcefgh4dqkjv67uzcmw7ojee6xedzdetojuzjevtenxquvyku + (CIDv1, Sha256, DagJson): ? Cid.init("z4EBG9jGUWMVxX9deANWX7iPyExLswe2akyF7xkNAaYgugvnhmP").mapFailure, # base36: baguqeera6mfu3g6n722vx7dbitpnbiyqnwah4ddy4b5c3rwzxc5pntqcupta + (CIDv1, Sha256, Raw): ? Cid.init("zb2rhmy65F3REf8SZp7De11gxtECBGgUKaLdiDj7MCGCHxbDW").mapFailure, }.toTable - index[(version, hcodec, dcodec)].catch.flatMap((a: Result[Cid, CidError]) => a.mapFailure) + index[(version, hcodec, dcodec)].catch proc emptyDigest*(version: CidVersion, hcodec: MultiCodec, dcodec: MultiCodec): ?!MultiHash = emptyCid(version, hcodec, dcodec) @@ -161,11 +161,11 @@ proc emptyBlock*(version: CidVersion, hcodec: MultiCodec): ?!Block = .flatMap((cid: Cid) => Block.new(cid = cid, data = @[])) proc emptyBlock*(cid: Cid): ?!Block = - cid.mhash.mapFailure.flatMap((mhash: MultiHash) => + cid.mhash.mapFailure.flatMap((mhash: MultiHash) => emptyBlock(cid.cidver, mhash.mcodec)) proc isEmpty*(cid: Cid): bool = - success(cid) == cid.mhash.mapFailure.flatMap((mhash: MultiHash) => + success(cid) == cid.mhash.mapFailure.flatMap((mhash: MultiHash) => emptyCid(cid.cidver, mhash.mcodec, cid.mcodec)) proc isEmpty*(blk: Block): bool = diff --git a/codex/codex.nim b/codex/codex.nim index 7ab56c3a..bc0d53ff 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -231,8 +231,6 @@ proc new*( wallet = WalletRef.new(EthPrivateKey.random()) network = BlockExcNetwork.new(switch) - treeReader = TreeReader.new() - repoData = case config.repoKind of repoFS: Datastore(FSDatastore.new($config.dataDir, depth = 5) .expect("Should create repo file data store!")) @@ -243,7 +241,6 @@ proc new*( repoDs = repoData, metaDs = SQLiteDatastore.new(config.dataDir / CodexMetaNamespace) .expect("Should create meta data store!"), - treeReader = treeReader, quotaMaxBytes = config.storageQuota.uint, blockTtl = config.blockTtl) diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 987bff48..1bab089a 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -102,13 +102,17 @@ proc getPendingBlocks( proc isFinished(): bool = pendingBlocks.len == 0 - proc genNext(): Future[(?!bt.Block, int)] {.async.} = + proc genNext(): Future[(?!bt.Block, int)] {.async.} = let completedFut = await one(pendingBlocks) - pendingBlocks.del(pendingBlocks.find(completedFut)) - return await completedFut + if (let i = pendingBlocks.find(completedFut); i >= 0): + pendingBlocks.del(i) + return await completedFut + else: + let (_, index) = await completedFut + raise newException(CatchableError, "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index) Iter.new(genNext, isFinished) - + proc prepareEncodingData( self: Erasure, manifest: Manifest, @@ -128,9 +132,9 @@ proc prepareEncodingData( for fut in pendingBlocksIter: let (blkOrErr, idx) = await fut without blk =? blkOrErr, err: - warn "Failed retreiving a block", idx, treeCid = manifest.treeCid + warn "Failed retreiving a block", treeCid = manifest.treeCid, idx, msg = err.msg continue - + let pos = indexToPos(params.steps, idx, step) shallowCopy(data[pos], if blk.isEmpty: emptyBlock else: blk.data) cids[idx] = blk.cid @@ -164,7 +168,7 @@ proc prepareDecodingData( ## `emptyBlock` - the empty block to be used for padding ## - let + let indicies = toSeq(countup(step, encoded.blocksCount - 1, encoded.steps)) pendingBlocksIter = self.getPendingBlocks(encoded, indicies) @@ -180,7 +184,7 @@ proc prepareDecodingData( let (blkOrErr, idx) = await fut without blk =? blkOrErr, err: - trace "Failed retreiving a block", idx, treeCid = encoded.treeCid + trace "Failed retreiving a block", idx, treeCid = encoded.treeCid, msg = err.msg continue let @@ -368,7 +372,6 @@ proc decode*( data = seq[seq[byte]].new() parityData = seq[seq[byte]].new() recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int)) - resolved = 0 data[].setLen(encoded.ecK) # set len to K parityData[].setLen(encoded.ecM) # set len to M diff --git a/codex/merkletree/merkletree.nim b/codex/merkletree/merkletree.nim index 3bfb9071..74ddba6d 100644 --- a/codex/merkletree/merkletree.nim +++ b/codex/merkletree/merkletree.nim @@ -46,7 +46,7 @@ type ########################################################### func computeTreeHeight(leavesCount: int): int = - if isPowerOfTwo(leavesCount): + if isPowerOfTwo(leavesCount): fastLog2(leavesCount) + 1 else: fastLog2(leavesCount) + 2 @@ -84,16 +84,16 @@ proc init*( proc addDataBlock*(self: var MerkleTreeBuilder, dataBlock: openArray[byte]): ?!void = ## Hashes the data block and adds the result of hashing to a buffer - ## + ## let oldLen = self.buffer.len self.buffer.setLen(oldLen + self.digestSize) digestFn(self.mcodec, self.buffer, oldLen, dataBlock) proc addLeaf*(self: var MerkleTreeBuilder, leaf: MultiHash): ?!void = if leaf.mcodec != self.mcodec or leaf.size != self.digestSize: - return failure("Expected mcodec to be " & $self.mcodec & " and digest size to be " & + return failure("Expected mcodec to be " & $self.mcodec & " and digest size to be " & $self.digestSize & " but was " & $leaf.mcodec & " and " & $leaf.size) - + let oldLen = self.buffer.len self.buffer.setLen(oldLen + self.digestSize) self.buffer[oldLen..= self.leavesCount: return failure("Index " & $index & " out of range [0.." & $(self.leavesCount - 1) & "]" ) - + success(self.nodeBufferToMultiHash(index)) proc getLeafCid*(self: MerkleTree, index: Natural, version = CIDv1, dataCodec = multiCodec("raw")): ?!Cid = @@ -216,7 +216,7 @@ proc height*(self: MerkleTree): Natural = proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof = ## Extracts proof from a tree for a given index - ## + ## ## Given a tree built from data blocks A, B and C ## H5 ## / \ @@ -230,7 +230,7 @@ proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof = ## - 0,[H1, H4] for data block A ## - 1,[H0, H4] for data block B ## - 2,[0x00, H3] for data block C - ## + ## if index >= self.leavesCount: return failure("Index " & $index & " out of range [0.." & $(self.leavesCount - 1) & "]" ) @@ -250,7 +250,7 @@ proc getProof*(self: MerkleTree, index: Natural): ?!MerkleProof = var dummyValue = if level.index == 0: zero else: one if siblingIndex < level.offset + level.width: - proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] = + proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] = self.nodesBuffer[siblingIndex * self.digestSize..<(siblingIndex + 1) * self.digestSize] else: proofNodesBuffer[level.index * self.digestSize..<(level.index + 1) * self.digestSize] = dummyValue @@ -281,9 +281,9 @@ proc init*( if totalNodes * digestSize == nodesBuffer.len: success( MerkleTree( - mcodec: mcodec, - digestSize: digestSize, - leavesCount: leavesCount, + mcodec: mcodec, + digestSize: digestSize, + leavesCount: leavesCount, nodesBuffer: nodesBuffer ) ) @@ -296,24 +296,28 @@ proc init*( ): ?!MerkleTree = without leaf =? leaves.?[0]: return failure("At least one leaf is required") - + var builder = ? MerkleTreeBuilder.init(mcodec = leaf.mcodec) for l in leaves: - if err =? builder.addLeaf(l).errorOption: - return failure(err) - + let res = builder.addLeaf(l) + if res.isErr: + return failure(res.error) + builder.build() proc init*( T: type MerkleTree, cids: openArray[Cid] ): ?!MerkleTree = - let leaves = collect: - for idx, cid in cids: - without mhash =? cid.mhash.mapFailure, errx: - return failure(errx) - mhash + var leaves = newSeq[MultiHash]() + + for cid in cids: + let res = cid.mhash.mapFailure + if res.isErr: + return failure(res.error) + else: + leaves.add(res.value) MerkleTree.init(leaves) @@ -341,7 +345,7 @@ proc verifyLeaf*(self: MerkleProof, leaf: MultiHash, treeRoot: MultiHash): ?!boo else: concatBuf[0..^1] = self.nodesBuffer[offset..<(offset + self.digestSize)] & digestBuf ? digestFn(self.mcodec, digestBuf, 0, concatBuf) - + let computedRoot = ? MultiHash.init(self.mcodec, digestBuf).mapFailure success(computedRoot == treeRoot) @@ -365,8 +369,8 @@ proc `$`*(self: MerkleProof): string = ", nodes: " & $self.nodes func `==`*(a, b: MerkleProof): bool = - (a.index == b.index) and - (a.mcodec == b.mcodec) and + (a.index == b.index) and + (a.mcodec == b.mcodec) and (a.digestSize == b.digestSize) and (a.nodesBuffer == b.nodesBuffer) @@ -381,11 +385,11 @@ proc init*( let mcodec = nodes[0].mcodec digestSize = nodes[0].size - + var nodesBuffer = newSeq[byte](nodes.len * digestSize) for nodeIndex, node in nodes: nodesBuffer[nodeIndex * digestSize..<(nodeIndex + 1) * digestSize] = node.data.buffer[node.dpos.. node.blockStore.getBlock(BlockAddress.init(manifest.treeCid, i))) for batchNum in 0.. items[i]) @@ -75,7 +81,7 @@ proc fromRange*[U, V, S: Ordinal](_: type Iter, a: U, b: V, step: S = 1): Iter[U let u = i inc(i, step) u - + proc isFinished(): bool = (step > 0 and i > b) or (step < 0 and i < b) diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index dd18993e..23cc1996 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -242,8 +242,13 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised[cid] = switch[3].peerInfo.signedPeerRecord + discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address)) await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address))) + + discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address)) await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address))) + + discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address)) await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address))) MockDiscovery(blockexc[0].engine.discovery.discovery) @@ -284,8 +289,13 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised[cid] = switch[3].peerInfo.signedPeerRecord + discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address)) await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address))) + + discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address)) await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address))) + + discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address)) await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address))) MockDiscovery(blockexc[0].engine.discovery.discovery) diff --git a/tests/codex/testerasure.nim b/tests/codex/testerasure.nim index 901507f3..41732a1a 100644 --- a/tests/codex/testerasure.nim +++ b/tests/codex/testerasure.nim @@ -29,7 +29,7 @@ asyncchecksuite "Erasure encode/decode": metaDs = SQLiteDatastore.new(Memory).tryGet() rng = Rng.instance() chunker = RandomChunker.new(rng, size = dataSetSize, chunkSize = BlockSize) - store = CacheStore.new(cacheSize = (dataSetSize * 8), chunkSize = BlockSize) + store = RepoStore.new(repoDs, metaDs) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) manifest = await storeDataGetManifest(store, chunker) diff --git a/tests/codex/testnode.nim b/tests/codex/testnode.nim index 0d85fdb6..57e52558 100644 --- a/tests/codex/testnode.nim +++ b/tests/codex/testnode.nim @@ -102,8 +102,7 @@ asyncchecksuite "Test Node": fetched = (await node.fetchManifest(manifestBlock.cid)).tryGet() check: - fetched.cid == manifest.cid - # fetched.blocks == manifest.blocks + fetched == manifest test "Block Batching": let