fix tests and split out discovery tests

This commit is contained in:
Dmitriy Ryajov 2022-04-19 19:19:17 -06:00
parent 06e043871c
commit c3df822724
No known key found for this signature in database
GPG Key ID: DA8C680CE7C657A4
3 changed files with 225 additions and 89 deletions

View File

@ -0,0 +1,192 @@
import std/sequtils
import std/sugar
import std/algorithm
import std/tables
import pkg/asynctest
import pkg/chronos
import pkg/stew/byteutils
import pkg/libp2p
import pkg/libp2p/errors
import pkg/dagger/rng
import pkg/dagger/stores
import pkg/dagger/blockexchange
import pkg/dagger/chunker
import pkg/dagger/blocktype as bt
import ./mockdiscovery
import ../../helpers
import ../../examples
suite "Block Advertising and Discovery":
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var
switch: seq[Switch]
blockexc: seq[NetworkStore]
blocks: seq[bt.Block]
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.new(chunk).tryGet())
teardown:
switch = @[]
blockexc = @[]
test "Should discover want list":
let
s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
discovery = MockDiscovery.new(s.peerInfo, 0.Port)
wallet = WalletRef.example
network = BlockExcNetwork.new(s)
localStore = CacheStore.new(blocks.mapIt( it ))
engine = BlockExcEngine.new(localStore, wallet, network, discovery)
s.mount(network)
switch.add(s)
await allFuturesThrowing(
switch.mapIt( it.start() )
)
let
pendingBlocks = blocks.mapIt(
engine.pendingBlocks.getWantHandle(it.cid)
)
await engine.start() # fire up discovery loop
discovery.findBlockProvidersHandler =
proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] =
engine.resolveBlocks(blocks.filterIt( it.cid == cid ))
test "Should advertise have blocks":
let
s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
discovery = MockDiscovery.new(s.peerInfo, 0.Port)
wallet = WalletRef.example
network = BlockExcNetwork.new(s)
localStore = CacheStore.new(blocks.mapIt( it ))
engine = BlockExcEngine.new(localStore, wallet, network, discovery)
s.mount(network)
switch.add(s)
await allFuturesThrowing(
switch.mapIt( it.start() )
)
let
advertised = initTable.collect:
for b in blocks: {b.cid: newFuture[void]()}
discovery.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) =
if cid in advertised and not advertised[cid].finished():
advertised[cid].complete()
await engine.start() # fire up advertise loop
await allFuturesThrowing(
allFinished(toSeq(advertised.values)))
suite "E2E - Multiple Nodes Discovery":
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var
switch: seq[Switch]
blockexc: seq[NetworkStore]
blocks: seq[bt.Block]
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.new(chunk).tryGet())
for _ in 0..<4:
let
s = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
discovery = MockDiscovery.new(s.peerInfo, 0.Port)
wallet = WalletRef.example
network = BlockExcNetwork.new(s)
localStore = CacheStore.new(blocks.mapIt( it ))
engine = BlockExcEngine.new(localStore, wallet, network, discovery)
networkStore = NetworkStore.new(engine, localStore)
s.mount(network)
switch.add(s)
blockexc.add(networkStore)
teardown:
switch = @[]
blockexc = @[]
test "Should not launch discovery request if we are already connected":
await allFuturesThrowing(
blockexc.mapIt( it.engine.start() ) &
switch.mapIt( it.start() )
)
await blockexc[0].engine.blocksHandler(switch[1].peerInfo.peerId, blocks)
MockDiscovery(blockexc[0].engine.discovery)
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] =
check false
await connectNodes(switch)
let blk = await blockexc[1].engine.requestBlock(blocks[0].cid)
await allFuturesThrowing(
blockexc.mapIt( it.engine.stop() ) &
switch.mapIt( it.stop() )
)
test "E2E - Should advertise and discover blocks":
# Distribute the blocks amongst 1..3
# Ask 0 to download everything without connecting him beforehand
var advertised: Table[Cid, SignedPeerRecord]
MockDiscovery(blockexc[1].engine.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) =
advertised.add(cid, switch[1].peerInfo.signedPeerRecord)
MockDiscovery(blockexc[2].engine.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) =
advertised.add(cid, switch[2].peerInfo.signedPeerRecord)
MockDiscovery(blockexc[3].engine.discovery)
.publishProvideHandler = proc(d: MockDiscovery, cid: Cid) =
advertised.add(cid, switch[3].peerInfo.signedPeerRecord)
await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5])
await blockexc[2].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[4..10])
await blockexc[3].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[10..15])
MockDiscovery(blockexc[0].engine.discovery)
.findBlockProvidersHandler = proc(d: MockDiscovery, cid: Cid): seq[SignedPeerRecord] =
if cid in advertised:
result.add(advertised[cid])
let futs = collect(newSeq):
for b in blocks:
blockexc[0].engine.requestBlock(b.cid)
await allFuturesThrowing(
switch.mapIt( it.start() ) &
blockexc.mapIt( it.engine.start() )
)
await allFutures(futs)
await allFuturesThrowing(
blockexc.mapIt( it.engine.stop() ) &
switch.mapIt( it.stop() )
)

View File

@ -1,5 +1,4 @@
import std/sequtils
import std/sugar
import std/algorithm
import pkg/asynctest
@ -8,7 +7,6 @@ import pkg/stew/byteutils
import pkg/libp2p
import pkg/libp2p/errors
import pkg/libp2pdht/discv5/protocol as discv5
import pkg/dagger/rng
import pkg/dagger/stores
@ -38,6 +36,7 @@ suite "NetworkStore engine - 2 nodes":
engine1, engine2: BlockExcEngine
localStore1, localStore2: BlockStore
discovery1, discovery2: Discovery
pendingBlocks1, pendingBlocks2: seq[Future[bt.Block]]
setup:
while true:
@ -86,8 +85,8 @@ suite "NetworkStore engine - 2 nodes":
)
# initialize our want lists
for b in blocks2: discard blockexc1.engine.discoverBlock(b.cid)
for b in blocks1: discard blockexc2.engine.discoverBlock(b.cid)
pendingBlocks1 = blocks2.mapIt( blockexc1.engine.pendingBlocks.getWantHandle( it.cid ) )
pendingBlocks2 = blocks1.mapIt( blockexc2.engine.pendingBlocks.getWantHandle( it.cid ) )
pricing1.address = wallet1.address
pricing2.address = wallet2.address
@ -98,7 +97,7 @@ suite "NetworkStore engine - 2 nodes":
switch2.peerInfo.peerId,
switch2.peerInfo.addrs)
await sleepAsync(100.milliseconds) # give some time to exchange lists
await sleepAsync(1.seconds) # give some time to exchange lists
peerCtx2 = blockexc1.engine.getPeerCtx(peerId2)
peerCtx1 = blockexc2.engine.getPeerCtx(peerId1)
@ -113,12 +112,18 @@ suite "NetworkStore engine - 2 nodes":
check not isNil(peerCtx1)
check not isNil(peerCtx2)
await allFuturesThrowing(
allFinished(pendingBlocks1))
await allFuturesThrowing(
allFinished(pendingBlocks2))
check:
peerCtx1.peerHave.mapIt( $it ).sorted(cmp[string]) ==
toSeq(blockexc2.engine.runningDiscoveries.keys()).mapIt( $it ).sorted(cmp[string])
pendingBlocks2.mapIt( $it.read.cid ).sorted(cmp[string])
peerCtx2.peerHave.mapIt( $it ).sorted(cmp[string]) ==
toSeq(blockexc1.engine.runningDiscoveries.keys()).mapIt( $it ).sorted(cmp[string])
pendingBlocks1.mapIt( $it.read.cid ).sorted(cmp[string])
test "exchanges accounts on connect":
check peerCtx1.account.?address == pricing1.address.some
@ -175,7 +180,8 @@ suite "NetworkStore engine - 2 nodes":
check wallet2.balance(channel, Asset) > 0
suite "NetworkStore - multiple nodes":
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
let
chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var
switch: seq[Switch]
@ -213,10 +219,9 @@ suite "NetworkStore - multiple nodes":
engine = downloader.engine
# Add blocks from 1st peer to want list
for b in blocks[0..3]:
discard engine.discoverBlock(b.cid)
for b in blocks[12..15]:
discard engine.discoverBlock(b.cid)
let
pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) )
pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid ))
await allFutures(
blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) ))
@ -230,12 +235,16 @@ suite "NetworkStore - multiple nodes":
await connectNodes(switch)
await sleepAsync(1.seconds)
await allFuturesThrowing(
allFinished(pendingBlocks1),
allFinished(pendingBlocks2))
check:
engine.peers[0].peerHave.mapIt($it).sorted(cmp[string]) ==
blocks[0..3].mapIt( it.cid ).mapIt($it).sorted(cmp[string])
blocks[0..3].mapIt( $(it.cid) ).sorted(cmp[string])
engine.peers[3].peerHave.mapIt($it).sorted(cmp[string]) ==
blocks[12..15].mapIt( it.cid ).mapIt($it).sorted(cmp[string])
blocks[12..15].mapIt( $(it.cid) ).sorted(cmp[string])
test "should exchange blocks with multiple nodes":
let
@ -243,10 +252,9 @@ suite "NetworkStore - multiple nodes":
engine = downloader.engine
# Add blocks from 1st peer to want list
for b in blocks[0..3]:
discard engine.discoverBlock(b.cid)
for b in blocks[12..15]:
discard engine.discoverBlock(b.cid)
let
pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) )
pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid ))
await allFutures(
blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) ))
@ -260,74 +268,9 @@ suite "NetworkStore - multiple nodes":
await connectNodes(switch)
await sleepAsync(1.seconds)
let wantListBlocks = await allFinished(
blocks[0..3].mapIt( downloader.getBlock(it.cid) ))
check wantListBlocks.mapIt( !it.read ) == blocks[0..3]
suite "NetworkStore - discovery":
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
var
switch: seq[Switch]
blockexc: seq[NetworkStore]
blocks: seq[bt.Block]
setup:
while true:
let chunk = await chunker.getBytes()
if chunk.len <= 0:
break
blocks.add(bt.Block.new(chunk).tryGet())
for e in generateNodes(4):
switch.add(e.switch)
blockexc.add(e.blockexc)
await e.blockexc.engine.start()
await allFuturesThrowing(
switch.mapIt( it.start() )
)
allFinished(pendingBlocks1),
allFinished(pendingBlocks2))
teardown:
await allFuturesThrowing(
switch.mapIt( it.stop() )
)
switch = @[]
blockexc = @[]
test "Shouldn't launch discovery request if we are already connected":
await blockexc[0].engine.blocksHandler(switch[1].peerInfo.peerId, blocks)
blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] =
check false
await connectNodes(switch)
let blk = await blockexc[1].engine.requestBlock(blocks[0].cid)
test "E2E discovery":
# Distribute the blocks amongst 1..3
# Ask 0 to download everything without connecting him beforehand
var advertised: Table[Cid, SignedPeerRecord]
blockexc[1].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) =
advertised[cid] = switch[1].peerInfo.signedPeerRecord
blockexc[2].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) =
advertised[cid] = switch[2].peerInfo.signedPeerRecord
blockexc[3].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) =
advertised[cid] = switch[3].peerInfo.signedPeerRecord
await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5])
await blockexc[2].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[4..10])
await blockexc[3].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[10..15])
blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] =
if cid in advertised:
result.add(advertised[cid])
let futs = collect(newSeq):
for b in blocks:
blockexc[0].engine.requestBlock(b.cid)
await allFutures(futs)
check pendingBlocks1.mapIt( it.read ) == blocks[0..3]
check pendingBlocks2.mapIt( it.read ) == blocks[12..15]

View File

@ -66,8 +66,9 @@ suite "NetworkStore engine basic":
wallet,
network,
discovery)
for b in blocks:
discard engine.discoverBlock(b.cid)
discard engine.pendingBlocks.getWantHandle(b.cid)
engine.setupPeer(peerId)
await done
@ -171,7 +172,7 @@ suite "NetworkStore engine handlers":
test "stores blocks in local store":
let pending = blocks.mapIt(
engine.pendingBlocks.addOrAwait( it.cid )
engine.pendingBlocks.getWantHandle( it.cid )
)
await engine.blocksHandler(peerId, blocks)