From c3df8227240220476bd24159cdad1d00efdb39dc Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 19 Apr 2022 19:19:17 -0600 Subject: [PATCH] fix tests and split out discovery tests --- .../blockexc/discovery/testdiscovery.nim | 192 ++++++++++++++++++ tests/dagger/blockexc/testblockexc.nim | 117 +++-------- tests/dagger/blockexc/testengine.nim | 5 +- 3 files changed, 225 insertions(+), 89 deletions(-) create mode 100644 tests/dagger/blockexc/discovery/testdiscovery.nim diff --git a/tests/dagger/blockexc/discovery/testdiscovery.nim b/tests/dagger/blockexc/discovery/testdiscovery.nim new file mode 100644 index 00000000..bd596a56 --- /dev/null +++ b/tests/dagger/blockexc/discovery/testdiscovery.nim @@ -0,0 +1,192 @@ +import std/sequtils +import std/sugar +import std/algorithm +import std/tables + +import pkg/asynctest +import pkg/chronos +import pkg/stew/byteutils + +import pkg/libp2p +import pkg/libp2p/errors + +import pkg/dagger/rng +import pkg/dagger/stores +import pkg/dagger/blockexchange +import pkg/dagger/chunker +import pkg/dagger/blocktype as bt + +import ./mockdiscovery + +import ../../helpers +import ../../examples + +suite "Block Advertising and Discovery": + let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) + + var + switch: seq[Switch] + blockexc: seq[NetworkStore] + blocks: seq[bt.Block] + + setup: + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.new(chunk).tryGet()) + + teardown: + switch = @[] + blockexc = @[] + + test "Should discover want list": + let + s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) + discovery = MockDiscovery.new(s.peerInfo, 0.Port) + wallet = WalletRef.example + network = BlockExcNetwork.new(s) + localStore = CacheStore.new(blocks.mapIt( it )) + engine = BlockExcEngine.new(localStore, wallet, network, discovery) + + s.mount(network) + switch.add(s) + + await allFuturesThrowing( + switch.mapIt( it.start() ) + ) + + let + pendingBlocks = blocks.mapIt( + engine.pendingBlocks.getWantHandle(it.cid) + ) + + await engine.start() # fire up discovery loop + discovery.findBlockProvidersHandler = + proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] = + engine.resolveBlocks(blocks.filterIt( it.cid == cid )) + + test "Should advertise have blocks": + let + s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) + discovery = MockDiscovery.new(s.peerInfo, 0.Port) + wallet = WalletRef.example + network = BlockExcNetwork.new(s) + localStore = CacheStore.new(blocks.mapIt( it )) + engine = BlockExcEngine.new(localStore, wallet, network, discovery) + + s.mount(network) + switch.add(s) + + await allFuturesThrowing( + switch.mapIt( it.start() ) + ) + + let + advertised = initTable.collect: + for b in blocks: {b.cid: newFuture[void]()} + + discovery.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) = + if cid in advertised and not advertised[cid].finished(): + advertised[cid].complete() + + await engine.start() # fire up advertise loop + await allFuturesThrowing( + allFinished(toSeq(advertised.values))) + +suite "E2E - Multiple Nodes Discovery": + let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) + + var + switch: seq[Switch] + blockexc: seq[NetworkStore] + blocks: seq[bt.Block] + + setup: + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.new(chunk).tryGet()) + + for _ in 0..<4: + let + s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr}) + discovery = MockDiscovery.new(s.peerInfo, 0.Port) + wallet = WalletRef.example + network = BlockExcNetwork.new(s) + localStore = CacheStore.new(blocks.mapIt( it )) + engine = BlockExcEngine.new(localStore, wallet, network, discovery) + networkStore = NetworkStore.new(engine, localStore) + + s.mount(network) + switch.add(s) + blockexc.add(networkStore) + + teardown: + switch = @[] + blockexc = @[] + + test "Should not launch discovery request if we are already connected": + await allFuturesThrowing( + blockexc.mapIt( it.engine.start() ) & + switch.mapIt( it.start() ) + ) + + await blockexc[0].engine.blocksHandler(switch[1].peerInfo.peerId, blocks) + MockDiscovery(blockexc[0].engine.discovery) + .findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] = + check false + + await connectNodes(switch) + let blk = await blockexc[1].engine.requestBlock(blocks[0].cid) + + await allFuturesThrowing( + blockexc.mapIt( it.engine.stop() ) & + switch.mapIt( it.stop() ) + ) + + test "E2E - Should advertise and discover blocks": + # Distribute the blocks amongst 1..3 + # Ask 0 to download everything without connecting him beforehand + + var advertised: Table[Cid, SignedPeerRecord] + + MockDiscovery(blockexc[1].engine.discovery) + .publishProvideHandler = proc(d: MockDiscovery, cid: Cid) = + advertised.add(cid, switch[1].peerInfo.signedPeerRecord) + + MockDiscovery(blockexc[2].engine.discovery) + .publishProvideHandler = proc(d: MockDiscovery, cid: Cid) = + advertised.add(cid, switch[2].peerInfo.signedPeerRecord) + + MockDiscovery(blockexc[3].engine.discovery) + .publishProvideHandler = proc(d: MockDiscovery, cid: Cid) = + advertised.add(cid, switch[3].peerInfo.signedPeerRecord) + + await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5]) + await blockexc[2].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[4..10]) + await blockexc[3].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[10..15]) + + MockDiscovery(blockexc[0].engine.discovery) + .findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] = + if cid in advertised: + result.add(advertised[cid]) + + let futs = collect(newSeq): + for b in blocks: + blockexc[0].engine.requestBlock(b.cid) + + await allFuturesThrowing( + switch.mapIt( it.start() ) & + blockexc.mapIt( it.engine.start() ) + ) + + await allFutures(futs) + + await allFuturesThrowing( + blockexc.mapIt( it.engine.stop() ) & + switch.mapIt( it.stop() ) + ) diff --git a/tests/dagger/blockexc/testblockexc.nim b/tests/dagger/blockexc/testblockexc.nim index 361c772f..d80f643b 100644 --- a/tests/dagger/blockexc/testblockexc.nim +++ b/tests/dagger/blockexc/testblockexc.nim @@ -1,5 +1,4 @@ import std/sequtils -import std/sugar import std/algorithm import pkg/asynctest @@ -8,7 +7,6 @@ import pkg/stew/byteutils import pkg/libp2p import pkg/libp2p/errors -import pkg/libp2pdht/discv5/protocol as discv5 import pkg/dagger/rng import pkg/dagger/stores @@ -38,6 +36,7 @@ suite "NetworkStore engine - 2 nodes": engine1, engine2: BlockExcEngine localStore1, localStore2: BlockStore discovery1, discovery2: Discovery + pendingBlocks1, pendingBlocks2: seq[Future[bt.Block]] setup: while true: @@ -86,8 +85,8 @@ suite "NetworkStore engine - 2 nodes": ) # initialize our want lists - for b in blocks2: discard blockexc1.engine.discoverBlock(b.cid) - for b in blocks1: discard blockexc2.engine.discoverBlock(b.cid) + pendingBlocks1 = blocks2.mapIt( blockexc1.engine.pendingBlocks.getWantHandle( it.cid ) ) + pendingBlocks2 = blocks1.mapIt( blockexc2.engine.pendingBlocks.getWantHandle( it.cid ) ) pricing1.address = wallet1.address pricing2.address = wallet2.address @@ -98,7 +97,7 @@ suite "NetworkStore engine - 2 nodes": switch2.peerInfo.peerId, switch2.peerInfo.addrs) - await sleepAsync(100.milliseconds) # give some time to exchange lists + await sleepAsync(1.seconds) # give some time to exchange lists peerCtx2 = blockexc1.engine.getPeerCtx(peerId2) peerCtx1 = blockexc2.engine.getPeerCtx(peerId1) @@ -113,12 +112,18 @@ suite "NetworkStore engine - 2 nodes": check not isNil(peerCtx1) check not isNil(peerCtx2) + await allFuturesThrowing( + allFinished(pendingBlocks1)) + + await allFuturesThrowing( + allFinished(pendingBlocks2)) + check: peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) == - toSeq(blockexc2.engine.runningDiscoveries.keys()).mapIt( $it ).sorted(cmp[string]) + pendingBlocks2.mapIt( $it.read.cid ).sorted(cmp[string]) peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) == - toSeq(blockexc1.engine.runningDiscoveries.keys()).mapIt( $it ).sorted(cmp[string]) + pendingBlocks1.mapIt( $it.read.cid ).sorted(cmp[string]) test "exchanges accounts on connect": check peerCtx1.account.?address == pricing1.address.some @@ -175,7 +180,8 @@ suite "NetworkStore engine - 2 nodes": check wallet2.balance(channel, Asset) > 0 suite "NetworkStore - multiple nodes": - let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) + let + chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) var switch: seq[Switch] @@ -213,10 +219,9 @@ suite "NetworkStore - multiple nodes": engine = downloader.engine # Add blocks from 1st peer to want list - for b in blocks[0..3]: - discard engine.discoverBlock(b.cid) - for b in blocks[12..15]: - discard engine.discoverBlock(b.cid) + let + pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) + pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) await allFutures( blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) @@ -230,12 +235,16 @@ suite "NetworkStore - multiple nodes": await connectNodes(switch) await sleepAsync(1.seconds) + await allFuturesThrowing( + allFinished(pendingBlocks1), + allFinished(pendingBlocks2)) + check: engine.peers[0].peerHave.mapIt($it).sorted(cmp[string]) == - blocks[0..3].mapIt( it.cid ).mapIt($it).sorted(cmp[string]) + blocks[0..3].mapIt( $(it.cid) ).sorted(cmp[string]) engine.peers[3].peerHave.mapIt($it).sorted(cmp[string]) == - blocks[12..15].mapIt( it.cid ).mapIt($it).sorted(cmp[string]) + blocks[12..15].mapIt( $(it.cid) ).sorted(cmp[string]) test "should exchange blocks with multiple nodes": let @@ -243,10 +252,9 @@ suite "NetworkStore - multiple nodes": engine = downloader.engine # Add blocks from 1st peer to want list - for b in blocks[0..3]: - discard engine.discoverBlock(b.cid) - for b in blocks[12..15]: - discard engine.discoverBlock(b.cid) + let + pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) + pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) await allFutures( blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) @@ -260,74 +268,9 @@ suite "NetworkStore - multiple nodes": await connectNodes(switch) await sleepAsync(1.seconds) - let wantListBlocks = await allFinished( - blocks[0..3].mapIt( downloader.getBlock(it.cid) )) - check wantListBlocks.mapIt( !it.read ) == blocks[0..3] - -suite "NetworkStore - discovery": - let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) - - var - switch: seq[Switch] - blockexc: seq[NetworkStore] - blocks: seq[bt.Block] - - setup: - while true: - let chunk = await chunker.getBytes() - if chunk.len <= 0: - break - - blocks.add(bt.Block.new(chunk).tryGet()) - - for e in generateNodes(4): - switch.add(e.switch) - blockexc.add(e.blockexc) - await e.blockexc.engine.start() - await allFuturesThrowing( - switch.mapIt( it.start() ) - ) + allFinished(pendingBlocks1), + allFinished(pendingBlocks2)) - teardown: - await allFuturesThrowing( - switch.mapIt( it.stop() ) - ) - - switch = @[] - blockexc = @[] - - test "Shouldn't launch discovery request if we are already connected": - await blockexc[0].engine.blocksHandler(switch[1].peerInfo.peerId, blocks) - blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] = - check false - await connectNodes(switch) - let blk = await blockexc[1].engine.requestBlock(blocks[0].cid) - - test "E2E discovery": - # Distribute the blocks amongst 1..3 - # Ask 0 to download everything without connecting him beforehand - - var advertised: Table[Cid, SignedPeerRecord] - - blockexc[1].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) = - advertised[cid] = switch[1].peerInfo.signedPeerRecord - - blockexc[2].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) = - advertised[cid] = switch[2].peerInfo.signedPeerRecord - - blockexc[3].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) = - advertised[cid] = switch[3].peerInfo.signedPeerRecord - - await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5]) - await blockexc[2].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[4..10]) - await blockexc[3].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[10..15]) - - blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] = - if cid in advertised: - result.add(advertised[cid]) - - let futs = collect(newSeq): - for b in blocks: - blockexc[0].engine.requestBlock(b.cid) - await allFutures(futs) + check pendingBlocks1.mapIt( it.read ) == blocks[0..3] + check pendingBlocks2.mapIt( it.read ) == blocks[12..15] diff --git a/tests/dagger/blockexc/testengine.nim b/tests/dagger/blockexc/testengine.nim index 9940c9c6..d741a120 100644 --- a/tests/dagger/blockexc/testengine.nim +++ b/tests/dagger/blockexc/testengine.nim @@ -66,8 +66,9 @@ suite "NetworkStore engine basic": wallet, network, discovery) + for b in blocks: - discard engine.discoverBlock(b.cid) + discard engine.pendingBlocks.getWantHandle(b.cid) engine.setupPeer(peerId) await done @@ -171,7 +172,7 @@ suite "NetworkStore engine handlers": test "stores blocks in local store": let pending = blocks.mapIt( - engine.pendingBlocks.addOrAwait( it.cid ) + engine.pendingBlocks.getWantHandle( it.cid ) ) await engine.blocksHandler(peerId, blocks)