diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim index 007c1a5b..291eea14 100644 --- a/codex/blockexchange/engine/engine.nim +++ b/codex/blockexchange/engine/engine.nim @@ -17,12 +17,14 @@ import pkg/chronos import pkg/libp2p/[cid, switch, multihash, multicodec] import pkg/metrics import pkg/stint +import pkg/questionable import ../../stores/blockstore import ../../blocktype import ../../utils import ../../merkletree import ../../logutils +import ../../manifest import ../protobuf/blockexc import ../protobuf/presence @@ -293,19 +295,28 @@ proc cancelBlocks(b: BlockExcEngine, addrs: seq[BlockAddress]) {.async.} = if failed.len > 0: trace "Failed to send block request cancellations to peers", peers = failed.len +proc getAnnouceCids(blocksDelivery: seq[BlockDelivery]): seq[Cid] = + var cids = initHashSet[Cid]() + for bd in blocksDelivery: + if bd.address.leaf: + cids.incl(bd.address.treeCid) + else: + without isM =? bd.address.cid.isManifest, err: + warn "Unable to determine if cid is manifest" + continue + if isM: + cids.incl(bd.address.cid) + return cids.toSeq + proc resolveBlocks*(b: BlockExcEngine, blocksDelivery: seq[BlockDelivery]) {.async.} = trace "Resolving blocks", blocks = blocksDelivery.len b.pendingBlocks.resolve(blocksDelivery) await b.scheduleTasks(blocksDelivery) - var cids = initHashSet[Cid]() - for bd in blocksDelivery: - cids.incl(bd.blk.cid) - if bd.address.leaf: - cids.incl(bd.address.treeCid) - + let announceCids = getAnnouceCids(blocksDelivery) await b.cancelBlocks(blocksDelivery.mapIt(it.address)) - b.discovery.queueProvideBlocksReq(cids.toSeq) + + b.discovery.queueProvideBlocksReq(announceCids) proc resolveBlocks*(b: BlockExcEngine, blocks: seq[Block]) {.async.} = await b.resolveBlocks( diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim index 96571cf3..5bf02b1b 100644 --- a/tests/codex/blockexchange/engine/testengine.nim +++ b/tests/codex/blockexchange/engine/testengine.nim @@ -15,6 +15,7 @@ import pkg/codex/chunker import pkg/codex/discovery import pkg/codex/blocktype import pkg/codex/utils/asyncheapqueue +import pkg/codex/manifest import ../../../asynctest import ../../helpers @@ -389,6 +390,51 @@ asyncchecksuite "NetworkStore engine handlers": discard await allFinished(pending) await allFuturesThrowing(cancellations.values().toSeq) + test "resolveBlocks should queue manifest CIDs for discovery": + engine.network = BlockExcNetwork( + request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc)) + + let + manifest = Manifest.new( + treeCid = Cid.example, + blockSize = 123.NBytes, + datasetSize = 234.NBytes + ) + + let manifestBlk = Block.new(data = manifest.encode().tryGet(), codec = ManifestCodec).tryGet() + let blks = @[manifestBlk] + + await engine.resolveBlocks(blks) + + check: + manifestBlk.cid in engine.discovery.advertiseQueue + + test "resolveBlocks should queue tree CIDs for discovery": + engine.network = BlockExcNetwork( + request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc)) + + let + tCid = Cid.example + delivery = BlockDelivery(blk: Block.example, address: BlockAddress(leaf: true, treeCid: tCid)) + + await engine.resolveBlocks(@[delivery]) + + check: + tCid in engine.discovery.advertiseQueue + + test "resolveBlocks should not queue non-manifest non-tree CIDs for discovery": + engine.network = BlockExcNetwork( + request: BlockExcRequest(sendWantCancellations: NopSendWantCancellationsProc)) + + let + blkCid = Cid.example + delivery = BlockDelivery(blk: Block.example, address: BlockAddress(leaf: false, cid: blkCid)) + + await engine.resolveBlocks(@[delivery]) + + check: + blkCid notin engine.discovery.advertiseQueue + asyncchecksuite "Task Handler": var rng: Rng