From 9379c7c66260ed98cf5f2493b97e18a80061f7c4 Mon Sep 17 00:00:00 2001 From: Ben Bierens <39762930+benbierens@users.noreply.github.com> Date: Thu, 9 May 2024 10:03:35 +0200 Subject: [PATCH] Advertise manifests and trees (#797) * wip update advertise loop to announce only manifests and trees * fixes e2e tests * fixes tests for discovery engine --------- Co-authored-by: Dmitriy Ryajov --- codex/blockexchange/engine/discovery.nim | 26 +++- .../blockexchange/discovery/testdiscovery.nim | 126 +++++++++--------- .../discovery/testdiscoveryengine.nim | 45 +++---- 3 files changed, 102 insertions(+), 95 deletions(-) diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 5434a5e6..eb68bce8 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -11,6 +11,7 @@ import std/sequtils import pkg/chronos import pkg/libp2p/cid +import pkg/libp2p/multicodec import pkg/metrics import pkg/questionable import pkg/questionable/results @@ -25,6 +26,7 @@ import ../../utils import ../../discovery import ../../stores/blockstore import ../../logutils +import ../../manifest logScope: topics = "codex discoveryengine" @@ -76,14 +78,32 @@ proc discoveryQueueLoop(b: DiscoveryEngine) {.async.} = await sleepAsync(b.discoveryLoopSleep) +proc advertiseBlock(b: DiscoveryEngine, cid: Cid) {.async.} = + without isM =? cid.isManifest, err: + warn "Unable to determine if cid is manifest" + return + + if isM: + without blk =? await b.localStore.getBlock(cid), err: + error "Error retrieving manifest block", cid, err = err.msg + return + + without manifest =? Manifest.decode(blk), err: + error "Unable to decode as manifest", err = err.msg + return + + # announce manifest cid and tree cid + await b.advertiseQueue.put(cid) + await b.advertiseQueue.put(manifest.treeCid) + proc advertiseQueueLoop(b: DiscoveryEngine) {.async.} = while b.discEngineRunning: if cids =? await b.localStore.listBlocks(blockType = b.advertiseType): trace "Begin iterating blocks..." for c in cids: if cid =? await c: - await b.advertiseQueue.put(cid) - await sleepAsync(50.millis) + b.advertiseBlock(cid) + await sleepAsync(100.millis) trace "Iterating blocks finished." await sleepAsync(b.advertiseLoopSleep) @@ -248,7 +268,7 @@ proc new*( discoveryLoopSleep = DefaultDiscoveryLoopSleep, advertiseLoopSleep = DefaultAdvertiseLoopSleep, minPeersPerBlock = DefaultMinPeersPerBlock, - advertiseType = BlockType.Both + advertiseType = BlockType.Manifest ): DiscoveryEngine = ## Create a discovery engine instance for advertising services ## diff --git a/tests/codex/blockexchange/discovery/testdiscovery.nim b/tests/codex/blockexchange/discovery/testdiscovery.nim index 28d315f4..9ba29e5d 100644 --- a/tests/codex/blockexchange/discovery/testdiscovery.nim +++ b/tests/codex/blockexchange/discovery/testdiscovery.nim @@ -50,7 +50,7 @@ asyncchecksuite "Block Advertising and Discovery": blockDiscovery = MockDiscovery.new() wallet = WalletRef.example network = BlockExcNetwork.new(switch) - localStore = CacheStore.new(blocks.mapIt( it )) + localStore = CacheStore.new(blocks.mapIt(it)) peerStore = PeerCtxStore.new() pendingBlocks = PendingBlocksManager.new() @@ -92,57 +92,40 @@ asyncchecksuite "Block Advertising and Discovery": blockDiscovery.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async.} = - await engine.resolveBlocks(blocks.filterIt( it.cid == cid )) + await engine.resolveBlocks(blocks.filterIt(it.cid == cid)) await allFuturesThrowing( allFinished(pendingBlocks)) await engine.stop() - test "Should advertise both manifests and blocks": + test "Should advertise both manifests and trees": let + cids = @[manifest.cid.tryGet, manifest.treeCid] advertised = initTable.collect: - for b in (blocks & manifestBlock): {b.cid: newFuture[void]()} + for cid in cids: {cid: newFuture[void]()} blockDiscovery .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = if cid in advertised and not advertised[cid].finished(): advertised[cid].complete() - discovery.advertiseType = BlockType.Both - await engine.start() # fire up advertise loop + await engine.start() await allFuturesThrowing( allFinished(toSeq(advertised.values))) await engine.stop() - test "Should advertise local manifests": + test "Should not advertise local blocks": let - advertised = newFuture[Cid]() + blockCids = blocks.mapIt(it.cid) blockDiscovery .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = - check manifestBlock.cid == cid - advertised.complete(cid) + check: + cid notin blockCids - discovery.advertiseType = BlockType.Manifest - await engine.start() # fire up advertise loop - check (await advertised.wait(10.millis)) == manifestBlock.cid - await engine.stop() - - test "Should advertise local blocks": - let - advertised = initTable.collect: - for b in blocks: {b.cid: newFuture[void]()} - - blockDiscovery - .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async.} = - if cid in advertised and not advertised[cid].finished(): - advertised[cid].complete() - - discovery.advertiseType = BlockType.Block - await engine.start() # fire up advertise loop - await allFuturesThrowing( - allFinished(toSeq(advertised.values))) + await engine.start() + await sleepAsync(3.seconds) await engine.stop() test "Should not launch discovery if remote peer has block": @@ -165,7 +148,7 @@ asyncchecksuite "Block Advertising and Discovery": proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] = check false - await engine.start() # fire up discovery loop + await engine.start() engine.pendingBlocks.resolve(blocks.mapIt(BlockDelivery(blk: it, address: it.address))) await allFuturesThrowing( @@ -173,23 +156,33 @@ asyncchecksuite "Block Advertising and Discovery": await engine.stop() -asyncchecksuite "E2E - Multiple Nodes Discovery": - let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) +proc asBlock(m: Manifest): bt.Block = + let mdata = m.encode().tryGet() + bt.Block.new(data = mdata, codec = ManifestCodec).tryGet() +asyncchecksuite "E2E - Multiple Nodes Discovery": var switch: seq[Switch] blockexc: seq[NetworkStore] - blocks: seq[bt.Block] + manifests: seq[Manifest] + mBlocks: seq[bt.Block] + trees: seq[CodexTree] setup: - while true: - let chunk = await chunker.getBytes() - if chunk.len <= 0: - break - - blocks.add(bt.Block.new(chunk).tryGet()) - for _ in 0..<4: + let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) + var blocks = newSeq[bt.Block]() + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.new(chunk).tryGet()) + let (manifest, tree) = makeManifestAndTree(blocks).tryGet() + manifests.add(manifest) + mBlocks.add(manifest.asBlock()) + trees.add(tree) + let s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) blockDiscovery = MockDiscovery.new() @@ -223,9 +216,12 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": teardown: switch = @[] blockexc = @[] + manifests = @[] + mBlocks = @[] + trees = @[] test "E2E - Should advertise and discover blocks": - # Distribute the blocks amongst 1..3 + # Distribute the manifests and trees amongst 1..3 # Ask 0 to download everything without connecting him beforehand var advertised: Table[Cid, SignedPeerRecord] @@ -242,14 +238,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised[cid] = switch[3].peerInfo.signedPeerRecord - discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid) + await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))]) - discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid) + await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))]) - discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid) + await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))]) MockDiscovery(blockexc[0].engine.discovery.discovery) .findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): @@ -258,22 +254,22 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": result.add(advertised[cid]) let futs = collect(newSeq): - for b in blocks: - blockexc[0].engine.requestBlock(b.cid) + for m in mBlocks[0..2]: + blockexc[0].engine.requestBlock(m.cid) await allFuturesThrowing( - switch.mapIt( it.start() ) & - blockexc.mapIt( it.engine.start() )).wait(10.seconds) + switch.mapIt(it.start()) & + blockexc.mapIt(it.engine.start())).wait(10.seconds) await allFutures(futs).wait(10.seconds) await allFuturesThrowing( - blockexc.mapIt( it.engine.stop() ) & - switch.mapIt( it.stop() )).wait(10.seconds) + blockexc.mapIt(it.engine.stop()) & + switch.mapIt(it.stop())).wait(10.seconds) test "E2E - Should advertise and discover blocks with peers already connected": # Distribute the blocks amongst 1..3 - # Ask 0 to download everything without connecting him beforehand + # Ask 0 to download everything *WITH* connecting him beforehand var advertised: Table[Cid, SignedPeerRecord] @@ -289,14 +285,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": .publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid): Future[void] {.async.} = advertised[cid] = switch[3].peerInfo.signedPeerRecord - discard blocks[0..5].mapIt(blockexc[1].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[0..5].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[1].engine.pendingBlocks.getWantHandle(mBlocks[0].cid) + await blockexc[1].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[0], address: BlockAddress(leaf: false, cid: mBlocks[0].cid))]) - discard blocks[4..10].mapIt(blockexc[2].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[4..10].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[2].engine.pendingBlocks.getWantHandle(mBlocks[1].cid) + await blockexc[2].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[1], address: BlockAddress(leaf: false, cid: mBlocks[1].cid))]) - discard blocks[10..15].mapIt(blockexc[3].engine.pendingBlocks.getWantHandle(it.address)) - await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, blocks[10..15].mapIt(BlockDelivery(blk: it, address: it.address))) + discard blockexc[3].engine.pendingBlocks.getWantHandle(mBlocks[2].cid) + await blockexc[3].engine.blocksDeliveryHandler(switch[0].peerInfo.peerId, @[BlockDelivery(blk: mBlocks[2], address: BlockAddress(leaf: false, cid: mBlocks[2].cid))]) MockDiscovery(blockexc[0].engine.discovery.discovery) .findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): @@ -305,14 +301,14 @@ asyncchecksuite "E2E - Multiple Nodes Discovery": return @[advertised[cid]] let - futs = blocks.mapIt( blockexc[0].engine.requestBlock( it.cid ) ) + futs = mBlocks[0..2].mapIt(blockexc[0].engine.requestBlock(it.cid)) await allFuturesThrowing( - switch.mapIt( it.start() ) & - blockexc.mapIt( it.engine.start() )).wait(10.seconds) + switch.mapIt(it.start()) & + blockexc.mapIt(it.engine.start())).wait(10.seconds) await allFutures(futs).wait(10.seconds) await allFuturesThrowing( - blockexc.mapIt( it.engine.stop() ) & - switch.mapIt( it.stop() )).wait(10.seconds) + blockexc.mapIt(it.engine.stop()) & + switch.mapIt(it.stop())).wait(10.seconds) diff --git a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim index 3984fe6a..ff6b60d2 100644 --- a/tests/codex/blockexchange/discovery/testdiscoveryengine.nim +++ b/tests/codex/blockexchange/discovery/testdiscoveryengine.nim @@ -10,17 +10,26 @@ import pkg/codex/blockexchange import pkg/codex/chunker import pkg/codex/blocktype as bt import pkg/codex/blockexchange/engine +import pkg/codex/manifest +import pkg/codex/merkletree import ../../../asynctest import ../../helpers import ../../helpers/mockdiscovery import ../../examples +proc asBlock(m: Manifest): bt.Block = + let mdata = m.encode().tryGet() + bt.Block.new(data = mdata, codec = ManifestCodec).tryGet() + asyncchecksuite "Test Discovery Engine": let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) var blocks: seq[bt.Block] + manifest: Manifest + tree: CodexTree + manifestBlock: bt.Block switch: Switch peerStore: PeerCtxStore blockDiscovery: MockDiscovery @@ -35,6 +44,10 @@ asyncchecksuite "Test Discovery Engine": blocks.add(bt.Block.new(chunk).tryGet()) + (manifest, tree) = makeManifestAndTree(blocks).tryGet() + manifestBlock = manifest.asBlock() + blocks.add(manifestBlock) + switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) network = BlockExcNetwork.new(switch) peerStore = PeerCtxStore.new() @@ -51,11 +64,11 @@ asyncchecksuite "Test Discovery Engine": blockDiscovery, pendingBlocks, discoveryLoopSleep = 100.millis) - wants = blocks.mapIt( pendingBlocks.getWantHandle(it.cid) ) + wants = blocks.mapIt(pendingBlocks.getWantHandle(it.cid) ) blockDiscovery.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} = - pendingBlocks.resolve(blocks.filterIt( it.cid == cid).mapIt(BlockDelivery(blk: it, address: it.address))) + pendingBlocks.resolve(blocks.filterIt(it.cid == cid).mapIt(BlockDelivery(blk: it, address: it.address))) await discoveryEngine.start() await allFuturesThrowing(allFinished(wants)).wait(1.seconds) @@ -63,7 +76,7 @@ asyncchecksuite "Test Discovery Engine": test "Should Advertise Haves": var - localStore = CacheStore.new(blocks.mapIt( it )) + localStore = CacheStore.new(blocks.mapIt(it)) discoveryEngine = DiscoveryEngine.new( localStore, peerStore, @@ -72,8 +85,8 @@ asyncchecksuite "Test Discovery Engine": pendingBlocks, discoveryLoopSleep = 100.millis) haves = collect(initTable): - for b in blocks: - { b.cid: newFuture[void]() } + for cid in @[manifestBlock.cid, manifest.treeCid]: + { cid: newFuture[void]() } blockDiscovery.publishBlockProvideHandler = proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = @@ -108,28 +121,6 @@ asyncchecksuite "Test Discovery Engine": await want.wait(1.seconds) await discoveryEngine.stop() - test "Should queue advertise request": - var - localStore = CacheStore.new(@[blocks[0]]) - discoveryEngine = DiscoveryEngine.new( - localStore, - peerStore, - network, - blockDiscovery, - pendingBlocks, - discoveryLoopSleep = 100.millis) - have = newFuture[void]() - - blockDiscovery.publishBlockProvideHandler = - proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} = - check cid == blocks[0].cid - if not have.finished: - have.complete() - - await discoveryEngine.start() - await have.wait(1.seconds) - await discoveryEngine.stop() - test "Should not request more than minPeersPerBlock": var localStore = CacheStore.new()