diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 1525ff32..64aa624c 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -349,21 +349,24 @@ proc blocksDeliveryHandler*( var validatedBlocksDelivery: seq[BlockDelivery] for bd in blocksDelivery: + logScope: + peer = peer + address = bd.address if err =? b.validateBlockDelivery(bd).errorOption: - warn "Block validation failed", address = bd.address, msg = err.msg + warn "Block validation failed", msg = err.msg continue if err =? (await b.localStore.putBlock(bd.blk)).errorOption: - error "Unable to store block", address = bd.address, err = err.msg + error "Unable to store block", err = err.msg continue if bd.address.leaf: without proof =? bd.proof: - error "Proof expected for a leaf block delivery", address = bd.address + error "Proof expected for a leaf block delivery" continue if err =? (await b.localStore.putBlockCidAndProof(bd.address.treeCid, bd.address.index, bd.blk.cid, proof)).errorOption: - error "Unable to store proof and cid for a block", address = bd.address + error "Unable to store proof and cid for a block" continue validatedBlocksDelivery.add(bd) @@ -398,11 +401,11 @@ proc wantListHandler*( logScope: peer = peerCtx.id - # cid = e.cid + address = e.address wantType = $e.wantType if idx < 0: # updating entry - trace "Processing new want list entry", address = e.address + trace "Processing new want list entry" let have = await e.address in b.localStore @@ -414,21 +417,21 @@ proc wantListHandler*( codex_block_exchange_want_have_lists_received.inc() if not have and e.sendDontHave: - trace "Adding dont have entry to presence response", address = e.address + trace "Adding dont have entry to presence response" presence.add( BlockPresence( address: e.address, `type`: BlockPresenceType.DontHave, price: price)) elif have and e.wantType == WantType.WantHave: - trace "Adding have entry to presence response", address = e.address + trace "Adding have entry to presence response" presence.add( BlockPresence( address: e.address, `type`: BlockPresenceType.Have, price: price)) elif e.wantType == WantType.WantBlock: - trace "Added entry to peer's want blocks list", address = e.address + trace "Added entry to peer's want blocks list" peerCtx.peerWants.add(e) codex_block_exchange_want_block_lists_received.inc() else: diff --git a/codex/erasure/erasure.nim b/codex/erasure/erasure.nim index 987bff48..35109a99 100644 --- a/codex/erasure/erasure.nim +++ b/codex/erasure/erasure.nim @@ -104,8 +104,12 @@ proc getPendingBlocks( proc genNext(): Future[(?!bt.Block, int)] {.async.} = let completedFut = await one(pendingBlocks) - pendingBlocks.del(pendingBlocks.find(completedFut)) - return await completedFut + if (let i = pendingBlocks.find(completedFut); i >= 0): + pendingBlocks.del(i) + return await completedFut + else: + let (_, index) = await completedFut + raise newException(CatchableError, "Future for block id not found, tree cid: " & $manifest.treeCid & ", index: " & $index) Iter.new(genNext, isFinished) @@ -128,7 +132,7 @@ proc prepareEncodingData( for fut in pendingBlocksIter: let (blkOrErr, idx) = await fut without blk =? blkOrErr, err: - warn "Failed retreiving a block", idx, treeCid = manifest.treeCid + warn "Failed retreiving a block", treeCid = manifest.treeCid, idx, msg = err.msg continue let pos = indexToPos(params.steps, idx, step) @@ -180,7 +184,7 @@ proc prepareDecodingData( let (blkOrErr, idx) = await fut without blk =? blkOrErr, err: - trace "Failed retreiving a block", idx, treeCid = encoded.treeCid + trace "Failed retreiving a block", idx, treeCid = encoded.treeCid, msg = err.msg continue let @@ -368,7 +372,6 @@ proc decode*( data = seq[seq[byte]].new() parityData = seq[seq[byte]].new() recovered = newSeqWith[seq[byte]](encoded.ecK, newSeq[byte](encoded.blockSize.int)) - resolved = 0 data[].setLen(encoded.ecK) # set len to K parityData[].setLen(encoded.ecM) # set len to M diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index dea54d55..731ace05 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -78,26 +78,27 @@ func available*(self: RepoStore, bytes: uint): bool = return bytes < self.available() proc encode(cidAndProof: (Cid, MerkleProof)): seq[byte] = + ## Encodes a tuple of cid and merkle proof in a following format: + ## | 8-bytes | n-bytes | remaining bytes | + ## | n | cid | proof | + ## + ## where n is a size of cid + ## let (cid, proof) = cidAndProof cidBytes = cid.data.buffer proofBytes = proof.encode + n = cidBytes.len + nBytes = n.uint64.toBytesBE - var buf = newSeq[byte](1 + cidBytes.len + proofBytes.len) - - buf[0] = cid.data.buffer.len.byte # cid shouldnt be more than 255 bytes? - buf[1..cidBytes.len] = cidBytes - buf[cidBytes.len + 1..^1] = proofBytes - - buf + @nBytes & cidBytes & proofBytes proc decode(_: type (Cid, MerkleProof), data: seq[byte]): ?!(Cid, MerkleProof) = - let cidLen = data[0].int - + let + n = uint64.fromBytesBE(data[0..