From d669e344bc48aacf605f09482dd07e63b5a42d43 Mon Sep 17 00:00:00 2001
From: Dmitriy Ryajov
Date: Thu, 12 May 2022 15:52:03 -0600
Subject: [PATCH] Engine (#89)

* rework discovery with async queues
* increase max message size for large manifests
* increase sleep time to 100 millis
* pass config
* check for nil on start/stop
* fix tests and split out discovery tests
* don't auto mount network
* add discovery tests
* rework mock discovery
* move discovery mock to disc dir
* don't force logging syncs
* don't force mock discovery on all tests
* rework discovery with methods
* add top level utils file
* don't use asyncCheck
* don't pass entire blocks to list blocks callback
* spelling
* - don't send want reqs to peers reporting the cid
  - Don't request blocks directly on presence update, use `requestBlock`
* bug, nodes should not have blocks in local store
* Add failing test
* prefetch blocks so that download isn't serial
* if request already pending, return the handle
* fire discovery if no peers report block as have
* only query discovery if not enough nodes for cid
* wrap async req in template
* use non awaiting version of queue routines
* rework E2E tests as unittest
* re-add chronicles sinks

Co-authored-by: Tanguy
---
 dagger/blockexchange/engine.nim          | 462 +++++++++---------
 dagger/blockexchange/networkpeer.nim     |   2 +-
 dagger/dagger.nim                        |   2 +-
 dagger/discovery.nim                     |  36 +-
 dagger/erasure/erasure.nim               |   2 +-
 dagger/node.nim                          |  43 +-
 dagger/por/por.nim                       |   6 +-
 dagger/rest/api.nim                      |   7 +-
 dagger/utils.nim                         |   4 +
 tests/config.nims                        |   1 -
 .../discovery}/mockdiscovery.nim         |  27 +-
 .../blockexc/discovery/testdiscovery.nim | 232 +++++++++
 tests/dagger/blockexc/testblockexc.nim   | 117 ++---
 tests/dagger/blockexc/testengine.nim     |   5 +-
 tests/dagger/helpers/nodeutils.nim       |   8 +-
 tests/dagger/testblockexc.nim            |   1 +
 16 files changed, 592 insertions(+), 363 deletions(-)
 create mode 100644 dagger/utils.nim
 delete mode 100644 tests/config.nims
 rename tests/dagger/{ => blockexc/discovery}/mockdiscovery.nim (60%)
 create mode 100644 tests/dagger/blockexc/discovery/testdiscovery.nim
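Before the file-by-file changes, a note on the central idea: this PR swaps the old per-block BlockDiscovery state machines for two shared AsyncQueue[Cid]s, each drained by a fixed pool of worker tasks. The toy program below shows that queue/worker-pool shape in isolation. It is a sketch with illustrative names (ints standing in for CIDs), assuming only pkg/chronos, the same library the engine builds on.

import pkg/chronos

proc demo() {.async.} =
  let queue = newAsyncQueue[int](100)  # bounded, like DefaultTaskQueueSize
  var workers: seq[Future[void]]

  proc worker(id: int) {.async.} =
    while true:
      let item = await queue.get()     # suspends until an item arrives
      echo "worker ", id, " handling ", item

  # a fixed pool of concurrent consumers, mirroring the
  # advertise/discovery task runners added in engine.nim below
  for i in 0..<3:
    workers.add(worker(i))

  # producers enqueue work without waiting on any consumer
  for i in 0..<9:
    await queue.put(i)

  await sleepAsync(10.millis)          # let the pool drain the queue
  for w in workers:
    await w.cancelAndWait()

waitFor demo()

Keeping the queue bounded means producers back off when consumers fall behind, which is the same back-pressure reasoning behind the engine's DefaultTaskQueueSize constant below.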
diff --git a/dagger/blockexchange/engine.nim b/dagger/blockexchange/engine.nim
index bdb186c1..e30ba674 100644
--- a/dagger/blockexchange/engine.nim
+++ b/dagger/blockexchange/engine.nim
@@ -7,7 +7,8 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
-import std/[sequtils, sets, tables, sugar]
+import std/sequtils
+import std/sets
 
 import pkg/chronos
 import pkg/chronicles
@@ -15,7 +16,7 @@ import pkg/libp2p
 
 import ../stores/blockstore
 import ../blocktype as bt
-import ../utils/asyncheapqueue
+import ../utils
 import ../discovery
 
 import ./protobuf/blockexc
@@ -32,31 +33,20 @@ logScope:
   topics = "dagger blockexc engine"
 
 const
-  DefaultBlockTimeout* = 5.minutes
   DefaultMaxPeersPerRequest* = 10
   DefaultTaskQueueSize = 100
   DefaultConcurrentTasks = 10
   DefaultMaxRetries = 3
-
-  # Current advertisement is meant to be more efficient than
-  # correct, so blocks could be advertised more slowly than that
-  # Put some margin
-  BlockAdvertisementFrequency = 30.minutes
+  DefaultConcurrentDiscRequests = 10
+  DefaultConcurrentAdvertRequests = 10
+  DefaultDiscoveryTimeout = 1.minutes
+  DefaultMaxQueriedBlocksCache = 1000
+  DefaultMinPeersPerBlock = 3
 
 type
   TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
   TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.}
 
-  BlockDiscovery* = ref object
-    discoveredProvider: AsyncEvent
-    discoveryLoop: Future[void]
-    toDiscover: Cid
-    treatedPeer: HashSet[PeerId]
-    inflightIWant: HashSet[PeerId]
-    gotIWantResponse: AsyncEvent
-    provides: seq[PeerId]
-    lastDhtQuery: Moment
-
   BlockExcEngine* = ref object of RootObj
     localStore*: BlockStore # where we localStore blocks for this instance
     network*: BlockExcNetwork # network interface
@@ -70,12 +60,16 @@ type
     peersPerRequest: int # max number of peers to request from
     wallet*: WalletRef # nitro wallet for micropayments
     pricing*: ?Pricing # optional bandwidth pricing
-    advertisedBlocks: seq[Cid]
-    advertisedIndex: int
-    advertisementFrequency: Duration
-    runningDiscoveries*: Table[Cid, BlockDiscovery]
-    blockAdded: AsyncEvent
-    discovery*: Discovery
+    discovery*: Discovery # Discovery interface
+    concurrentAdvReqs: int # Concurrent advertise requests
+    advertiseLoop*: Future[void] # Advertise loop task handle
+    advertiseQueue*: AsyncQueue[Cid] # Advertise queue
+    advertiseTasks*: seq[Future[void]] # Advertise tasks
+    concurrentDiscReqs: int # Concurrent discovery requests
+    discoveryLoop*: Future[void] # Discovery loop task handle
+    discoveryTasks*: seq[Future[void]] # Discovery tasks
+    discoveryQueue*: AsyncQueue[Cid] # Discovery queue
+    minPeersPerBlock*: int # Min number of peers with a block before we stop querying discovery
 
   Pricing* = object
     address*: EthAddress
@@ -100,7 +94,95 @@ proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} =
   b.taskQueue.pushOrUpdateNoWait(task).isOk()
 
 proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.}
-proc advertiseLoop(b: BlockExcEngine): Future[void] {.gcsafe.}
+
+proc discoveryLoopRunner(b: BlockExcEngine) {.async.} =
+  while b.blockexcRunning:
+    for cid in toSeq(b.pendingBlocks.wantList):
+      try:
+        await b.discoveryQueue.put(cid)
+      except CatchableError as exc:
+        trace "Exception in discovery loop", exc = exc.msg
+
+    trace "About to sleep, number of wanted blocks", wanted = b.pendingBlocks.len
+    await sleepAsync(30.seconds)
+
+proc advertiseLoopRunner*(b: BlockExcEngine) {.async.} =
+  proc onBlock(cid: Cid) {.async.} =
+    try:
+      await b.advertiseQueue.put(cid)
+    except CatchableError as exc:
+      trace "Exception listing blocks", exc = exc.msg
+
+  while b.blockexcRunning:
+    await b.localStore.listBlocks(onBlock)
+    await sleepAsync(30.seconds)
+
+  trace "Exiting advertise task loop"
+
+proc advertiseTaskRunner(b: BlockExcEngine) {.async.} =
+  ## Run advertise tasks
+  ##
+
+  while b.blockexcRunning:
+    try:
+      let
+        cid = await b.advertiseQueue.get()
+
+      await b.discovery.provideBlock(cid)
+    except CatchableError as exc:
+      trace "Exception in advertise task runner", exc = exc.msg
+
+  trace "Exiting advertise task runner"
+
+proc discoveryTaskRunner(b: BlockExcEngine) {.async.} =
+  ## Run discovery tasks
+  ##
+
+  while b.blockexcRunning:
+    try:
+      let
+        cid = await b.discoveryQueue.get()
+        haves = b.peers.filterIt(
+          it.peerHave.anyIt( it == cid )
+        )
+
+      trace "Got peers for block", cid = $cid, count = haves.len
+      let
+        providers =
+          if haves.len < b.minPeersPerBlock:
+            await b.discovery
+              .findBlockProviders(cid)
+              .wait(DefaultDiscoveryTimeout)
+          else:
+            @[]
+
+      checkFutures providers.mapIt( b.network.dialPeer(it.data) )
+    except CatchableError as exc:
+      trace "Exception in discovery task runner", exc = exc.msg
+
+  trace "Exiting discovery task runner"
+
+template queueFindBlocksReq(b: BlockExcEngine, cids: seq[Cid]) =
+  proc queueReq() {.async.} =
+    try:
+      for cid in cids:
+        if cid notin b.discoveryQueue:
+          trace "Queueing find block request", cid = $cid
+          await b.discoveryQueue.put(cid)
+    except CatchableError as exc:
+      trace "Exception queueing discovery request", exc = exc.msg
+
+  asyncSpawn queueReq()
+
+template queueProvideBlocksReq(b: BlockExcEngine, cids: seq[Cid]) =
+  proc queueReq() {.async.} =
+    try:
+      for cid in cids:
+        if cid notin b.advertiseQueue:
+          trace "Queueing provide block request", cid = $cid
+          await b.advertiseQueue.put(cid)
+    except CatchableError as exc:
+      trace "Exception queueing provide request", exc = exc.msg
+
+  asyncSpawn queueReq()
 
 proc start*(b: BlockExcEngine) {.async.} =
   ## Start the blockexc task
@@ -116,14 +198,14 @@ proc start*(b: BlockExcEngine) {.async.} =
 
   for i in 0..
+  peers.keepItIf(
+    it != blockPeer and cid notin it.peerHave
+  )
 
-  debug "Requesting block from peer", providerCount = discovery.provides.len,
-    peer = discovery.provides[0], cid
   # request block
   b.network.request.sendWantList(
-    discovery.provides[0],
+    blockPeer.id,
     @[cid],
     wantType = WantType.wantBlock) # we want this remote to send us a block
 
-  #TODO substract the discovery time
-  return await blk.wait(timeout)
+  if peers.len == 0:
+    trace "Not enough peers to send want list to", cid = $cid
+    b.queueFindBlocksReq(@[cid])
+    return blk # no peers to send wants to
+
+  # filter out the peer we've already requested from
+  let stop = min(peers.high, b.peersPerRequest)
+  trace "Sending want list requests to remaining peers", count = stop + 1
+  for p in peers[0..stop]:
+    if cid notin p.peerHave:
+      # just send wants
+      b.network.request.sendWantList(
+        p.id,
+        @[cid],
+        wantType = WantType.wantHave) # we only want to know if the peer has the block
+
+  return blk
 
 proc blockPresenceHandler*(
   b: BlockExcEngine,
@@ -299,18 +318,33 @@
   ## Handle block presence
   ##
 
+  trace "Received presence update for peer", peer
   let peerCtx = b.getPeerCtx(peer)
+  if isNil(peerCtx):
+    return
 
   for blk in blocks:
     if presence =? Presence.init(blk):
-      if not isNil(peerCtx):
-        peerCtx.updatePresence(presence)
-        if presence.cid in b.runningDiscoveries:
-          let bd = b.runningDiscoveries[presence.cid]
-          if not presence.have:
-            bd.inflightIWant.excl(peer)
-            bd.treatedPeer.incl(peer)
-            bd.gotIWantResponse.fire()
+      peerCtx.updatePresence(presence)
+
+  var
+    cids = toSeq(b.pendingBlocks.wantList).filterIt(
+      it in peerCtx.peerHave
+    )
+
+  trace "Received presence update for cids", peer, cids = $cids
+
+  if cids.len > 0:
+    b.network.request.sendWantList(
+      peer,
+      cids,
+      wantType = WantType.wantBlock) # we want this remote to send us a block
+
+  # if none of the connected peers report our wants in their have list,
+  # fire up discovery
+  b.queueFindBlocksReq(toSeq(b.pendingBlocks.wantList)
+    .filter(proc(cid: Cid): bool =
+      (not b.peers.anyIt( cid in it.peerHave ))))
 
 proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) =
   trace "Schedule a task for new blocks"
@@ -331,21 +365,11 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) =
   ## and schedule any new task to be run
   ##
 
-  trace "Resolving blocks"
+  trace "Resolving blocks", blocks = blocks.len
 
-  var gotNewBlocks = false
-  for bl in blocks:
-    if bl.cid notin b.advertisedBlocks: #TODO that's very slow, maybe a ordered hashset instead
-      #TODO could do some smarter ordering here (insert it just before b.advertisedIndex, or similar)
-      b.advertisedBlocks.add(bl.cid)
-      asyncSpawn b.discovery.publishProvide(bl.cid)
-      gotNewBlocks = true
-
-  if gotNewBlocks:
-    b.pendingBlocks.resolve(blocks)
-    b.scheduleTasks(blocks)
-
-  b.blockAdded.fire()
+  b.pendingBlocks.resolve(blocks)
+  b.scheduleTasks(blocks)
+  b.queueProvideBlocksReq(blocks.mapIt( it.cid ))
 
 proc payForBlocks(engine: BlockExcEngine,
                   peer: BlockExcPeerCtx,
@@ -421,14 +445,20 @@
     if not b.scheduleTask(peerCtx):
       trace "Unable to schedule task for peer", peer
 
-proc accountHandler*(engine: BlockExcEngine, peer: PeerID, account: Account) {.async.} =
+proc accountHandler*(
+  engine: BlockExcEngine,
+  peer: PeerID,
+  account: Account) {.async.} =
   let context = engine.getPeerCtx(peer)
   if context.isNil:
     return
 
   context.account = account.some
 
-proc paymentHandler*(engine: BlockExcEngine, peer: PeerId, payment: SignedState) {.async.} =
+proc paymentHandler*(
+  engine: BlockExcEngine,
+  peer: PeerId,
+  payment: SignedState) {.async.} =
   without context =? engine.getPeerCtx(peer).option and
           account =? context.account:
     return
@@ -451,13 +481,8 @@
     ))
 
   # broadcast our want list, the other peer will do the same
-  let wantList = collect(newSeqOfCap(b.runningDiscoveries.len)):
-    for cid, bd in b.runningDiscoveries:
-      bd.inflightIWant.incl(peer)
-      cid
-
-  if wantList.len > 0:
-    b.network.request.sendWantList(peer, wantList, full = true, sendDontHave = true)
+  if b.pendingBlocks.len > 0:
+    b.network.request.sendWantList(peer, toSeq(b.pendingBlocks.wantList), full = true)
 
   if address =? b.pricing.?address:
     b.network.request.sendAccount(peer, Account(address: address))
@@ -471,31 +496,6 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerID) =
   # drop the peer from the peers table
   b.peers.keepItIf( it.id != peer )
 
-proc advertiseLoop(b: BlockExcEngine) {.async, gcsafe.} =
-  while true:
-    if b.advertisedIndex >= b.advertisedBlocks.len:
-      b.advertisedIndex = 0
-      b.advertisementFrequency = BlockAdvertisementFrequency
-
-    # check that we still have this block.
-    while
-      b.advertisedIndex < b.advertisedBlocks.len and
-      not(b.localStore.contains(b.advertisedBlocks[b.advertisedIndex])):
-      b.advertisedBlocks.delete(b.advertisedIndex)
-
-    #publish it
-    if b.advertisedIndex < b.advertisedBlocks.len:
-      asyncSpawn b.discovery.publishProvide(b.advertisedBlocks[b.advertisedIndex])
-
-    inc b.advertisedIndex
-    let toSleep =
-      if b.advertisedBlocks.len > 0:
-        b.advertisementFrequency div b.advertisedBlocks.len
-      else:
-        30.minutes
-    await sleepAsync(toSleep) or b.blockAdded.wait()
-    b.blockAdded.clear()
-
 proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
   trace "Handling task for peer", peer = task.id
 
@@ -517,6 +517,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} =
       .mapIt(!it.read)
 
     if blocks.len > 0:
+      trace "Sending blocks to peer", peer = task.id, blocks = blocks.len
       b.network.request.sendBlocks(
         task.id,
         blocks)
@@ -559,19 +560,27 @@ proc new*(
     discovery: Discovery,
     concurrentTasks = DefaultConcurrentTasks,
    maxRetries = DefaultMaxRetries,
-    peersPerRequest = DefaultMaxPeersPerRequest): T =
+    peersPerRequest = DefaultMaxPeersPerRequest,
+    concurrentAdvReqs = DefaultConcurrentAdvertRequests,
+    concurrentDiscReqs = DefaultConcurrentDiscRequests,
+    minPeersPerBlock = DefaultMinPeersPerBlock): T =
 
-  let engine = BlockExcEngine(
-    localStore: localStore,
-    pendingBlocks: PendingBlocksManager.new(),
-    blockAdded: newAsyncEvent(),
-    peersPerRequest: peersPerRequest,
-    network: network,
-    wallet: wallet,
-    concurrentTasks: concurrentTasks,
-    maxRetries: maxRetries,
-    discovery: discovery,
-    taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize))
+  let
+    engine = BlockExcEngine(
+      localStore: localStore,
+      pendingBlocks: PendingBlocksManager.new(),
+      peersPerRequest: peersPerRequest,
+      network: network,
+      wallet: wallet,
+      concurrentTasks: concurrentTasks,
+      concurrentAdvReqs: concurrentAdvReqs,
+      concurrentDiscReqs: concurrentDiscReqs,
+      maxRetries: maxRetries,
+      taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize),
+      discovery: discovery,
+      advertiseQueue: newAsyncQueue[Cid](DefaultTaskQueueSize),
+      discoveryQueue: newAsyncQueue[Cid](DefaultTaskQueueSize),
+      minPeersPerBlock: minPeersPerBlock)
 
   proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} =
     if event.kind == PeerEventKind.Joined:
@@ -609,7 +618,6 @@ proc new*(
     onBlocks: blocksHandler,
     onPresence: blockPresenceHandler,
     onAccount: accountHandler,
-    onPayment: paymentHandler
-  )
+    onPayment: paymentHandler)
 
   return engine
diff --git a/dagger/blockexchange/networkpeer.nim b/dagger/blockexchange/networkpeer.nim
index 269291cc..2ef8d473 100644
--- a/dagger/blockexchange/networkpeer.nim
+++ b/dagger/blockexchange/networkpeer.nim
@@ -17,7 +17,7 @@ import ./protobuf/blockexc
 logScope:
   topics = "dagger blockexc networkpeer"
 
-const MaxMessageSize = 8 * 1024 * 1024
+const MaxMessageSize = 100 * 1024 * 1024 # manifest files can be big
 
 type
   RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.}
diff --git a/dagger/dagger.nim b/dagger/dagger.nim
index aebb593c..cf92ab82 100644
--- a/dagger/dagger.nim
+++ b/dagger/dagger.nim
@@ -129,7 +129,7 @@ proc new*(T: type DaggerServer, config: DaggerConf): T =
     )
     daggerNode = DaggerNodeRef.new(switch, store, engine, erasure, discovery, contracts)
     restServer = RestServerRef.new(
-      daggerNode.initRestApi(),
+      daggerNode.initRestApi(config),
      initTAddress("127.0.0.1" , config.apiPort),
       bufferSize = (1024 * 64),
       maxRequestBodySize = int.high)
diff --git a/dagger/discovery.nim b/dagger/discovery.nim
index 60d9f999..96c961a9 100644
--- a/dagger/discovery.nim
+++ b/dagger/discovery.nim
@@ -8,18 +8,20 @@
 ## those terms.
 
 import pkg/chronos
+import pkg/chronicles
 import pkg/libp2p
 import pkg/questionable
 import pkg/questionable/results
 import pkg/stew/shims/net
 import pkg/libp2pdht/discv5/protocol as discv5
 
-import rng
+import ./rng
+import ./errors
 
 export discv5
 
 type
-  Discovery* = ref object
+  Discovery* = ref object of RootObj
     protocol: discv5.Protocol
     localInfo: PeerInfo
 
@@ -55,15 +57,33 @@ proc toDiscoveryId*(cid: Cid): NodeId =
   ## To discovery id
   readUintBE[256](keccak256.digest(cid.data.buffer).data)
 
-proc findBlockProviders*(
+method findBlockProviders*(
     d: Discovery,
-    cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
-  return (await d.protocol.getProviders(cid.toDiscoveryId())).get()
+    cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} =
+  ## Find block providers
+  ##
 
-proc publishProvide*(d: Discovery, cid: Cid) {.async.} =
-  let bid = cid.toDiscoveryId()
-  discard await d.protocol.addProvider(bid, d.localInfo.signedPeerRecord)
+  trace "Finding providers for block", cid = $cid
+  without providers =?
+    (await d.protocol.getProviders(cid.toDiscoveryId())).mapFailure, error:
+    trace "Error finding providers for block", cid = $cid, error = error.msg
 
+  return providers
+
+method provideBlock*(d: Discovery, cid: Cid) {.async, base.} =
+  ## Provide a block Cid
+  ##
+
+  trace "Providing block", cid = $cid
+  let
+    nodes = await d.protocol.addProvider(
+      cid.toDiscoveryId(),
+      d.localInfo.signedPeerRecord)
+
+  if nodes.len <= 0:
+    trace "Couldn't provide to any nodes!"
+
+  trace "Provided to nodes", nodes = nodes.len
 
 proc start*(d: Discovery) {.async.} =
   d.protocol.updateRecord(d.localInfo.signedPeerRecord).expect("updating SPR")
diff --git a/dagger/erasure/erasure.nim b/dagger/erasure/erasure.nim
index bb8970c4..cf52ba94 100644
--- a/dagger/erasure/erasure.nim
+++ b/dagger/erasure/erasure.nim
@@ -109,7 +109,7 @@ proc encode*(
       # TODO: this is a tight blocking loop so we sleep here to allow
       # other events to be processed, this should be addressed
      # by threading
-      await sleepAsync(10.millis)
+      await sleepAsync(100.millis)
 
       for j in 0..
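Because findBlockProviders and provideBlock are declared above as overridable base methods, the discovery tests listed in the diffstat can swap in a test double without touching the engine; the renamed mockdiscovery.nim serves that purpose. Below is a minimal sketch of the shape such a mock can take. The handler field names and the import path are illustrative assumptions, not the patch's exact contents.

import pkg/chronos
import pkg/libp2p

import dagger/discovery  # path assumed; adjust to the repo layout

type
  MockDiscovery* = ref object of Discovery
    findBlockProvidersHandler*: proc(d: MockDiscovery, cid: Cid):
      Future[seq[SignedPeerRecord]] {.gcsafe.}
    publishProvideHandler*: proc(d: MockDiscovery, cid: Cid):
      Future[void] {.gcsafe.}

method findBlockProviders*(
    d: MockDiscovery,
    cid: Cid): Future[seq[SignedPeerRecord]] {.async.} =
  ## Return whatever the test injected instead of querying the DHT
  if not isNil(d.findBlockProvidersHandler):
    return await d.findBlockProvidersHandler(d, cid)

method provideBlock*(d: MockDiscovery, cid: Cid) {.async.} =
  ## Record the advertisement instead of touching the network
  if not isNil(d.publishProvideHandler):
    await d.publishProvideHandler(d, cid)

A test then assigns the handlers to capture advertised CIDs or to serve canned provider records, which is exactly the style the deleted inline E2E discovery tests below used with their _var fields.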
diff --git a/tests/dagger/blockexc/testblockexc.nim b/tests/dagger/blockexc/testblockexc.nim
--- a/tests/dagger/blockexc/testblockexc.nim
+++ b/tests/dagger/blockexc/testblockexc.nim
 suite "NetworkStore - multiple nodes":
-  let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
+  let
+    chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
 
   var
     switch: seq[Switch]
@@ -213,10 +219,9 @@ suite "NetworkStore - multiple nodes":
       engine = downloader.engine
 
     # Add blocks from 1st peer to want list
-    for b in blocks[0..3]:
-      discard engine.discoverBlock(b.cid)
-    for b in blocks[12..15]:
-      discard engine.discoverBlock(b.cid)
+    let
+      pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) )
+      pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid ))
 
     await allFutures(
       blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) ))
@@ -230,12 +235,16 @@ suite "NetworkStore - multiple nodes":
     await connectNodes(switch)
     await sleepAsync(1.seconds)
 
+    await allFuturesThrowing(
+      allFinished(pendingBlocks1),
+      allFinished(pendingBlocks2))
+
     check:
       engine.peers[0].peerHave.mapIt($it).sorted(cmp[string]) ==
-        blocks[0..3].mapIt( it.cid ).mapIt($it).sorted(cmp[string])
+        blocks[0..3].mapIt( $(it.cid) ).sorted(cmp[string])
 
       engine.peers[3].peerHave.mapIt($it).sorted(cmp[string]) ==
-        blocks[12..15].mapIt( it.cid ).mapIt($it).sorted(cmp[string])
+        blocks[12..15].mapIt( $(it.cid) ).sorted(cmp[string])
 
   test "should exchange blocks with multiple nodes":
     let
      downloader = blockexc[4]
       engine = downloader.engine
 
     # Add blocks from 1st peer to want list
-    for b in blocks[0..3]:
-      discard engine.discoverBlock(b.cid)
-    for b in blocks[12..15]:
-      discard engine.discoverBlock(b.cid)
+    let
+      pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) )
+      pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid ))
 
     await allFutures(
       blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) ))
@@ -260,74 +268,9 @@ suite "NetworkStore - multiple nodes":
     await connectNodes(switch)
     await sleepAsync(1.seconds)
 
-    let wantListBlocks = await allFinished(
-      blocks[0..3].mapIt( downloader.getBlock(it.cid) ))
-
-    check wantListBlocks.mapIt( !it.read ) == blocks[0..3]
-
-suite "NetworkStore - discovery":
-  let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256)
-
-  var
-    switch: seq[Switch]
-    blockexc: seq[NetworkStore]
-    blocks: seq[bt.Block]
-
-  setup:
-    while true:
-      let chunk = await chunker.getBytes()
-      if chunk.len <= 0:
-        break
-
-      blocks.add(bt.Block.new(chunk).tryGet())
-
-    for e in generateNodes(4):
-      switch.add(e.switch)
-      blockexc.add(e.blockexc)
-      await e.blockexc.engine.start()
-
-    await allFuturesThrowing(
-      switch.mapIt( it.start() )
-    )
-
-  teardown:
-    await allFuturesThrowing(
-      switch.mapIt( it.stop() )
-    )
-
-    switch = @[]
-    blockexc = @[]
-
-  test "Shouldn't launch discovery request if we are already connected":
-    await blockexc[0].engine.blocksHandler(switch[1].peerInfo.peerId, blocks)
-    blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] =
-      check false
-    await connectNodes(switch)
-    let blk = await blockexc[1].engine.requestBlock(blocks[0].cid)
-
-  test "E2E discovery":
-    # Distribute the blocks amongst 1..3
-    # Ask 0 to download everything without connecting him beforehand
-
-    var advertised: Table[Cid, SignedPeerRecord]
-
-    blockexc[1].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) =
-      advertised[cid] = switch[1].peerInfo.signedPeerRecord
-    blockexc[2].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) =
-      advertised[cid] = switch[2].peerInfo.signedPeerRecord
-
-    blockexc[3].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) =
-      advertised[cid] = switch[3].peerInfo.signedPeerRecord
-
-    await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5])
-    await blockexc[2].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[4..10])
-    await blockexc[3].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[10..15])
-
-    blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] =
-      if cid in advertised:
-        result.add(advertised[cid])
-
-    let futs = collect(newSeq):
-      for b in blocks:
-        blockexc[0].engine.requestBlock(b.cid)
-    await allFutures(futs)
+    await allFuturesThrowing(
+      allFinished(pendingBlocks1),
+      allFinished(pendingBlocks2))
+
+    check pendingBlocks1.mapIt( it.read ) == blocks[0..3]
+    check pendingBlocks2.mapIt( it.read ) == blocks[12..15]
diff --git a/tests/dagger/blockexc/testengine.nim b/tests/dagger/blockexc/testengine.nim
index 9940c9c6..d741a120 100644
--- a/tests/dagger/blockexc/testengine.nim
+++ b/tests/dagger/blockexc/testengine.nim
@@ -66,8 +66,9 @@ suite "NetworkStore engine basic":
       wallet,
       network,
       discovery)
+
     for b in blocks:
-      discard engine.discoverBlock(b.cid)
+      discard engine.pendingBlocks.getWantHandle(b.cid)
 
     engine.setupPeer(peerId)
     await done
@@ -171,7 +172,7 @@ suite "NetworkStore engine handlers":
   test "stores blocks in local store":
     let pending = blocks.mapIt(
-      engine.pendingBlocks.addOrAwait( it.cid )
+      engine.pendingBlocks.getWantHandle( it.cid )
     )
 
     await engine.blocksHandler(peerId, blocks)
diff --git a/tests/dagger/helpers/nodeutils.nim b/tests/dagger/helpers/nodeutils.nim
index e1ce5027..f190746f 100644
--- a/tests/dagger/helpers/nodeutils.nim
+++ b/tests/dagger/helpers/nodeutils.nim
@@ -11,10 +11,8 @@ import ../examples
 
 proc generateNodes*(
   num: Natural,
-  blocks: openArray[bt.Block] = [],
-  secureManagers: openarray[SecureProtocol] = [
-    SecureProtocol.Noise,
-  ]): seq[tuple[switch: Switch, blockexc: NetworkStore]] =
+  blocks: openArray[bt.Block] = []):
+  seq[tuple[switch: Switch, blockexc: NetworkStore]] =
 
  for i in 0..
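A closing note on the queueFindBlocksReq/queueProvideBlocksReq templates in engine.nim above: they spawn a fire-and-forget task and rely on AsyncQueue's membership check (the cid notin b.discoveryQueue test) to avoid queueing duplicate requests. The same idiom as a standalone generic helper; the proc name is mine, and only pkg/chronos is assumed.

import pkg/chronos

proc queueUnique*[T](queue: AsyncQueue[T], items: seq[T]) =
  ## Fire-and-forget enqueue that skips items already queued,
  ## mirroring the engine's queueFindBlocksReq/queueProvideBlocksReq
  proc queueReq() {.async.} =
    try:
      for item in items:
        if item notin queue:  # AsyncQueue defines `contains`
          await queue.put(item)
    except CatchableError as exc:
      echo "failed to queue item: ", exc.msg

  asyncSpawn queueReq()

when isMainModule:
  let q = newAsyncQueue[int](10)
  q.queueUnique(@[1, 2, 2, 3])      # the duplicate 2 is skipped
  waitFor sleepAsync(10.millis)     # give the spawned task time to run

Deduplicating at enqueue time keeps a flood of presence updates or repeated want-list misses from multiplying into redundant DHT lookups, which is also why the engine checks queue membership before every put.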