238 lines
6.5 KiB
Nim
238 lines
6.5 KiB
Nim
import std/sequtils
|
|
import std/sugar
|
|
import std/tables
|
|
|
|
import pkg/asynctest
|
|
|
|
import pkg/chronos
|
|
import pkg/chronicles
|
|
import pkg/libp2p
|
|
|
|
import pkg/dagger/rng
|
|
import pkg/dagger/stores
|
|
import pkg/dagger/blockexchange
|
|
import pkg/dagger/chunker
|
|
import pkg/dagger/blocktype as bt
|
|
import pkg/dagger/blockexchange/engine
|
|
|
|
import ../../helpers/mockdiscovery
|
|
|
|
import ../../helpers
|
|
import ../../examples
|
|
|
|
suite "Test Discovery Engine":
|
|
let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
|
|
|
|
var
|
|
blocks: seq[bt.Block]
|
|
switch: Switch
|
|
peerStore: PeerCtxStore
|
|
blockDiscovery: MockDiscovery
|
|
pendingBlocks: PendingBlocksManager
|
|
localStore: CacheStore
|
|
network: BlockExcNetwork
|
|
|
|
setup:
|
|
while true:
|
|
let chunk = await chunker.getBytes()
|
|
if chunk.len <= 0:
|
|
break
|
|
|
|
blocks.add(bt.Block.new(chunk).tryGet())
|
|
|
|
switch = newStandardSwitch(transportFlags = {ServerFlags.ReuseAddr})
|
|
network = BlockExcNetwork.new(switch)
|
|
peerStore = PeerCtxStore.new()
|
|
pendingBlocks = PendingBlocksManager.new()
|
|
blockDiscovery = MockDiscovery.new()
|
|
|
|
test "Should Query Wants":
|
|
var
|
|
localStore = CacheStore.new()
|
|
discoveryEngine = DiscoveryEngine.new(
|
|
localStore,
|
|
peerStore,
|
|
network,
|
|
blockDiscovery,
|
|
pendingBlocks,
|
|
discoveryLoopSleep = 100.millis)
|
|
wants = blocks.mapIt( pendingBlocks.getWantHandle(it.cid) )
|
|
|
|
blockDiscovery.findBlockProvidersHandler =
|
|
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
|
|
pendingBlocks.resolve(blocks.filterIt( it.cid == cid))
|
|
|
|
await discoveryEngine.start()
|
|
await allFuturesThrowing(allFinished(wants)).wait(1.seconds)
|
|
await discoveryEngine.stop()
|
|
|
|
test "Should Advertise Haves":
|
|
var
|
|
localStore = CacheStore.new(blocks.mapIt( it ))
|
|
discoveryEngine = DiscoveryEngine.new(
|
|
localStore,
|
|
peerStore,
|
|
network,
|
|
blockDiscovery,
|
|
pendingBlocks,
|
|
discoveryLoopSleep = 100.millis)
|
|
haves = collect(initTable):
|
|
for b in blocks:
|
|
{ b.cid: newFuture[void]() }
|
|
|
|
blockDiscovery.publishProvideHandler =
|
|
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
|
|
if not haves[cid].finished:
|
|
haves[cid].complete
|
|
|
|
await discoveryEngine.start()
|
|
await allFuturesThrowing(
|
|
allFinished(toSeq(haves.values))).wait(1.seconds)
|
|
await discoveryEngine.stop()
|
|
|
|
test "Should queue discovery request":
|
|
var
|
|
localStore = CacheStore.new()
|
|
discoveryEngine = DiscoveryEngine.new(
|
|
localStore,
|
|
peerStore,
|
|
network,
|
|
blockDiscovery,
|
|
pendingBlocks,
|
|
discoveryLoopSleep = 100.millis)
|
|
want = newFuture[void]()
|
|
|
|
blockDiscovery.findBlockProvidersHandler =
|
|
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
|
|
check cid == blocks[0].cid
|
|
if not want.finished:
|
|
want.complete()
|
|
|
|
await discoveryEngine.start()
|
|
discoveryEngine.queueFindBlocksReq(@[blocks[0].cid])
|
|
await want.wait(1.seconds)
|
|
await discoveryEngine.stop()
|
|
|
|
test "Should queue advertise request":
|
|
var
|
|
localStore = CacheStore.new(@[blocks[0]])
|
|
discoveryEngine = DiscoveryEngine.new(
|
|
localStore,
|
|
peerStore,
|
|
network,
|
|
blockDiscovery,
|
|
pendingBlocks,
|
|
discoveryLoopSleep = 100.millis)
|
|
have = newFuture[void]()
|
|
|
|
blockDiscovery.publishProvideHandler =
|
|
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
|
|
check cid == blocks[0].cid
|
|
if not have.finished:
|
|
have.complete()
|
|
|
|
await discoveryEngine.start()
|
|
await have.wait(1.seconds)
|
|
await discoveryEngine.stop()
|
|
|
|
test "Should not request more than minPeersPerBlock":
|
|
var
|
|
localStore = CacheStore.new()
|
|
minPeers = 2
|
|
discoveryEngine = DiscoveryEngine.new(
|
|
localStore,
|
|
peerStore,
|
|
network,
|
|
blockDiscovery,
|
|
pendingBlocks,
|
|
discoveryLoopSleep = 5.minutes,
|
|
minPeersPerBlock = minPeers)
|
|
want = newAsyncEvent()
|
|
|
|
blockDiscovery.findBlockProvidersHandler =
|
|
proc(d: MockDiscovery, cid: Cid): Future[seq[SignedPeerRecord]] {.async, gcsafe.} =
|
|
check cid == blocks[0].cid
|
|
check peerStore.len < minPeers
|
|
var
|
|
peerCtx = BlockExcPeerCtx(id: PeerID.example)
|
|
|
|
peerCtx.peerPrices[cid] = 0.u256
|
|
peerStore.add(peerCtx)
|
|
want.fire()
|
|
|
|
await discoveryEngine.start()
|
|
while peerStore.len < minPeers:
|
|
discoveryEngine.queueFindBlocksReq(@[blocks[0].cid])
|
|
await want.wait()
|
|
want.clear()
|
|
|
|
check peerStore.len == minPeers
|
|
await discoveryEngine.stop()
|
|
|
|
test "Should not request if there is already an inflight discovery request":
|
|
var
|
|
localStore = CacheStore.new()
|
|
discoveryEngine = DiscoveryEngine.new(
|
|
localStore,
|
|
peerStore,
|
|
network,
|
|
blockDiscovery,
|
|
pendingBlocks,
|
|
discoveryLoopSleep = 100.millis,
|
|
concurrentDiscReqs = 2)
|
|
reqs = newFuture[void]()
|
|
count = 0
|
|
|
|
blockDiscovery.findBlockProvidersHandler =
|
|
proc(d: MockDiscovery, cid: Cid):
|
|
Future[seq[SignedPeerRecord]] {.gcsafe, async.} =
|
|
check cid == blocks[0].cid
|
|
if count > 0:
|
|
check false
|
|
count.inc
|
|
|
|
await reqs # queue the request
|
|
|
|
await discoveryEngine.start()
|
|
discoveryEngine.queueFindBlocksReq(@[blocks[0].cid])
|
|
await sleepAsync(200.millis)
|
|
|
|
discoveryEngine.queueFindBlocksReq(@[blocks[0].cid])
|
|
await sleepAsync(200.millis)
|
|
|
|
reqs.complete()
|
|
await discoveryEngine.stop()
|
|
|
|
test "Should not request if there is already an inflight advertise request":
|
|
var
|
|
localStore = CacheStore.new()
|
|
discoveryEngine = DiscoveryEngine.new(
|
|
localStore,
|
|
peerStore,
|
|
network,
|
|
blockDiscovery,
|
|
pendingBlocks,
|
|
discoveryLoopSleep = 100.millis,
|
|
concurrentAdvReqs = 2)
|
|
reqs = newFuture[void]()
|
|
count = 0
|
|
|
|
blockDiscovery.publishProvideHandler =
|
|
proc(d: MockDiscovery, cid: Cid) {.async, gcsafe.} =
|
|
check cid == blocks[0].cid
|
|
if count > 0:
|
|
check false
|
|
count.inc
|
|
|
|
await reqs # queue the request
|
|
|
|
await discoveryEngine.start()
|
|
discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid])
|
|
await sleepAsync(200.millis)
|
|
|
|
discoveryEngine.queueProvideBlocksReq(@[blocks[0].cid])
|
|
await sleepAsync(200.millis)
|
|
|
|
reqs.complete()
|
|
await discoveryEngine.stop()
|