From ce59dbd4a21ffe7d0eb9e1fc81c2d9d7cbc0fb30 Mon Sep 17 00:00:00 2001
From: Tanguy
Date: Wed, 20 Apr 2022 14:28:11 +0200
Subject: [PATCH] Revert "Cleanup engine and rework discovery (#87)"

This reverts commit 4740ffc144916b8a47a1c386e2e8eda728355ce9.
---
 config.nims | 4 +
 dagger/blockexchange/engine.nim | 430 ++++++++++--------
 dagger/blockexchange/network.nim | 39 +-
 dagger/blockexchange/networkpeer.nim | 2 +-
 dagger/blockexchange/peercontext.nim | 12 +-
 dagger/blockexchange/pendingblocks.nim | 36 +-
 dagger/dagger.nim | 2 +-
 dagger/discovery.nim | 36 +-
 dagger/erasure/erasure.nim | 2 +-
 dagger/node.nim | 30 +-
 dagger/rest/api.nim | 7 +-
 dagger/stores/blockstore.nim | 3 +-
 dagger/stores/cachestore.nim | 5 +-
 dagger/stores/fsstore.nim | 26 +-
 dagger/stores/networkstore.nim | 6 +-
 dagger/utils.nim | 4 -
 tests/config.nims | 1 +
 .../blockexc/discovery/testdiscovery.nim | 192 --------
 tests/dagger/blockexc/testblockexc.nim | 119 +++--
 tests/dagger/blockexc/testengine.nim | 5 +-
 tests/dagger/helpers/nodeutils.nim | 8 +-
 .../discovery => }/mockdiscovery.nim | 27 +-
 tests/dagger/stores/testcachestore.nim | 12 -
 tests/dagger/stores/testfsstore.nim | 6 +-
 tests/dagger/testblockexc.nim | 1 -
 25 files changed, 417 insertions(+), 598 deletions(-)
 delete mode 100644 dagger/utils.nim
 create mode 100644 tests/config.nims
 delete mode 100644 tests/dagger/blockexc/discovery/testdiscovery.nim
 rename tests/dagger/{blockexc/discovery => }/mockdiscovery.nim (60%)

diff --git a/config.nims b/config.nims
index a906bd99..35023252 100644
--- a/config.nims
+++ b/config.nims
@@ -68,6 +68,10 @@ switch("warning", "ObservableStores:off")
 switch("warning", "LockLevel:off")
 switch("define", "libp2p_pki_schemes=secp256k1")
+#TODO this define infects everything in this folder; ideally it would only
+# apply to dagger.nim, but since dagger.nims is used for other purposes
+# we can't put it there, and dagger.cfg doesn't work
+switch("define", "chronicles_sinks=textlines[dynamic],json[dynamic]")
 
 # begin Nimble config (version 1)
 when system.fileExists("nimble.paths"):

diff --git a/dagger/blockexchange/engine.nim b/dagger/blockexchange/engine.nim
index 4a86c63a..7be6167e 100644
--- a/dagger/blockexchange/engine.nim
+++ b/dagger/blockexchange/engine.nim
@@ -7,8 +7,7 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
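As background for the chronicles_sinks define above: chronicles consumes it as a compile-time string define, which is why it has to be set via switch("define", ...) in config.nims rather than at runtime. A minimal sketch of the underlying Nim mechanism, where `logSinks` is a hypothetical stand-in for the define chronicles reads internally (illustration only, not part of the commit):

# Sketch: how a -d:name=value define set in config.nims becomes visible
# to Nim code at compile time. `logSinks` is a hypothetical name.
const logSinks {.strdefine.} = "textlines[dynamic]"  # default when undefined

when isMainModule:
  echo "configured sinks: ", logSinks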
-import std/sequtils -import std/sets +import std/[sequtils, sets, tables, sugar] import pkg/chronos import pkg/chronicles @@ -16,7 +15,7 @@ import pkg/libp2p import ../stores/blockstore import ../blocktype as bt -import ../utils +import ../utils/asyncheapqueue import ../discovery import ./protobuf/blockexc @@ -33,19 +32,31 @@ logScope: topics = "dagger blockexc engine" const + DefaultBlockTimeout* = 5.minutes DefaultMaxPeersPerRequest* = 10 DefaultTaskQueueSize = 100 DefaultConcurrentTasks = 10 DefaultMaxRetries = 3 - DefaultConcurrentDiscRequests = 10 - DefaultConcurrentAdvertRequests = 10 - DefaultDiscoveryTimeout = 1.minutes - DefaultMaxQueriedBlocksCache = 1000 + + # Current advertisement is meant to be more efficient than + # correct, so blocks could be advertised more slowly than that + # Put some margin + BlockAdvertisementFrequency = 30.minutes type TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.} + BlockDiscovery* = ref object + discoveredProvider: AsyncEvent + discoveryLoop: Future[void] + toDiscover: Cid + treatedPeer: HashSet[PeerId] + inflightIWant: HashSet[PeerId] + gotIWantResponse: AsyncEvent + provides: seq[PeerId] + lastDhtQuery: Moment + BlockExcEngine* = ref object of RootObj localStore*: BlockStore # where we localStore blocks for this instance network*: BlockExcNetwork # network interface @@ -59,15 +70,12 @@ type peersPerRequest: int # max number of peers to request from wallet*: WalletRef # nitro wallet for micropayments pricing*: ?Pricing # optional bandwidth pricing - discovery*: Discovery # Discovery interface - concurrentAdvReqs: int # Concurrent advertise requests - advertiseLoop*: Future[void] # Advertise loop task handle - advertiseQueue*: AsyncQueue[Cid] # Advertise queue - advertiseTasks*: seq[Future[void]] # Advertise tasks - concurrentDiscReqs: int # Concurrent discovery requests - discoveryLoop*: Future[void] # Discovery loop task handle - discoveryTasks*: seq[Future[void]] # Discovery tasks - discoveryQueue*: AsyncQueue[Cid] # Discovery queue + advertisedBlocks: seq[Cid] + advertisedIndex: int + advertisementFrequency: Duration + runningDiscoveries*: Table[Cid, BlockDiscovery] + blockAdded: AsyncEvent + discovery*: Discovery Pricing* = object address*: EthAddress @@ -92,76 +100,7 @@ proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} = b.taskQueue.pushOrUpdateNoWait(task).isOk() proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.} - -proc discoveryLoopRunner(b: BlockExcEngine) {.async.} = - while b.blockexcRunning: - for cid in toSeq(b.pendingBlocks.wantList): - try: - await b.discoveryQueue.put(cid) - except CatchableError as exc: - trace "Exception in discovery loop", exc = exc.msg - - trace "About to sleep, number of wanted blocks", wanted = b.pendingBlocks.len - await sleepAsync(30.seconds) - -proc advertiseLoopRunner*(b: BlockExcEngine) {.async.} = - proc onBlock(cid: Cid) {.async.} = - try: - await b.advertiseQueue.put(cid) - except CatchableError as exc: - trace "Exception listing blocks", exc = exc.msg - - while b.blockexcRunning: - await b.localStore.listBlocks(onBlock) - await sleepAsync(30.seconds) - - trace "Exiting advertise task loop" - -proc advertiseTaskRunner(b: BlockExcEngine) {.async.} = - ## Run advertise tasks - ## - - while b.blockexcRunning: - try: - let cid = await b.advertiseQueue.get() - await b.discovery.provideBlock(cid) - except CatchableError as exc: - trace "Exception in advertise task runner", exc = 
exc.msg - - trace "Exiting advertise task runner" - -proc discoveryTaskRunner(b: BlockExcEngine) {.async.} = - ## Run discovery tasks - ## - - while b.blockexcRunning: - try: - let - cid = await b.discoveryQueue.get() - providers = await b.discovery - .findBlockProviders(cid) - .wait(DefaultDiscoveryTimeout) - - await allFuturesThrowing( - allFinished(providers.mapIt( b.network.dialPeer(it.data) ))) - except CatchableError as exc: - trace "Exception in discovery task runner", exc = exc.msg - - trace "Exiting discovery task runner" - -proc queueFindBlocksReq(b: BlockExcEngine, cids: seq[Cid]) {.async.} = - try: - for cid in cids: - await b.discoveryQueue.put(cid) - except CatchableError as exc: - trace "Exception queueing discovery request", exc = exc.msg - -proc queueProvideBlocksReq(b: BlockExcEngine, cids: seq[Cid]) {.async.} = - try: - for cid in cids: - await b.advertiseQueue.put(cid) - except CatchableError as exc: - trace "Exception queueing discovery request", exc = exc.msg +proc advertiseLoop(b: BlockExcEngine): Future[void] {.gcsafe.} proc start*(b: BlockExcEngine) {.async.} = ## Start the blockexc task @@ -177,14 +116,13 @@ proc start*(b: BlockExcEngine) {.async.} = for i in 0.. 0 + debug "Requesting block from peer", providerCount = discovery.provides.len, + peer = discovery.provides[0], cid # request block b.network.request.sendWantList( - blockPeer.id, + discovery.provides[0], @[cid], wantType = WantType.wantBlock) # we want this remote to send us a block - if peers.len == 0: - trace "Not enough peers to send want list to", cid = $cid - asyncSpawn b.queueFindBlocksReq(@[cid]) - return blk # no peers to send wants to - - # filter out the peer we've already requested from - let stop = min(peers.high, b.peersPerRequest) - trace "Sending want list requests to remaining peers", count = stop + 1 - for p in peers[0..stop]: - if cid notin p.peerHave: - # just send wants - b.network.request.sendWantList( - p.id, - @[cid], - wantType = WantType.wantHave) # we only want to know if the peer has the block - - return blk + #TODO substract the discovery time + return await blk.wait(timeout) proc blockPresenceHandler*( b: BlockExcEngine, @@ -291,25 +298,18 @@ proc blockPresenceHandler*( ## Handle block presence ## - trace "Received presence update for peer", peer let peerCtx = b.getPeerCtx(peer) - if isNil(peerCtx): - return for blk in blocks: if presence =? 
Presence.init(blk): - peerCtx.updatePresence(presence) - - let - cids = toSeq(b.pendingBlocks.wantList).filterIt( - it in peerCtx.peerHave - ) - - if cids.len > 0: - b.network.request.sendWantList( - peerCtx.id, - cids, - wantType = WantType.wantBlock) # we want this remote to send us a block + if not isNil(peerCtx): + peerCtx.updatePresence(presence) + if presence.cid in b.runningDiscoveries: + let bd = b.runningDiscoveries[presence.cid] + if not presence.have: + bd.inflightIWant.excl(peer) + bd.treatedPeer.incl(peer) + bd.gotIWantResponse.fire() proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = trace "Schedule a task for new blocks" @@ -330,11 +330,21 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = ## and schedule any new task to be ran ## - trace "Resolving blocks", blocks = blocks.len + trace "Resolving blocks" - b.pendingBlocks.resolve(blocks) - b.scheduleTasks(blocks) - asyncSpawn b.queueProvideBlocksReq(blocks.mapIt( it.cid )) + var gotNewBlocks = false + for bl in blocks: + if bl.cid notin b.advertisedBlocks: #TODO that's very slow, maybe a ordered hashset instead + #TODO could do some smarter ordering here (insert it just before b.advertisedIndex, or similar) + b.advertisedBlocks.add(bl.cid) + asyncSpawn b.discovery.publishProvide(bl.cid) + gotNewBlocks = true + + if gotNewBlocks: + b.pendingBlocks.resolve(blocks) + b.scheduleTasks(blocks) + + b.blockAdded.fire() proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, @@ -410,20 +420,14 @@ proc wantListHandler*( if not b.scheduleTask(peerCtx): trace "Unable to schedule task for peer", peer -proc accountHandler*( - engine: BlockExcEngine, - peer: PeerID, - account: Account) {.async.} = +proc accountHandler*(engine: BlockExcEngine, peer: PeerID, account: Account) {.async.} = let context = engine.getPeerCtx(peer) if context.isNil: return context.account = account.some -proc paymentHandler*( - engine: BlockExcEngine, - peer: PeerId, - payment: SignedState) {.async.} = +proc paymentHandler*(engine: BlockExcEngine, peer: PeerId, payment: SignedState) {.async.} = without context =? engine.getPeerCtx(peer).option and account =? context.account: return @@ -446,8 +450,13 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) = )) # broadcast our want list, the other peer will do the same - if b.pendingBlocks.len > 0: - b.network.request.sendWantList(peer, toSeq(b.pendingBlocks.wantList), full = true) + let wantList = collect(newSeqOfCap(b.runningDiscoveries.len)): + for cid, bd in b.runningDiscoveries: + bd.inflightIWant.incl(peer) + cid + + if wantList.len > 0: + b.network.request.sendWantList(peer, wantList, full = true, sendDontHave = true) if address =? b.pricing.?address: b.network.request.sendAccount(peer, Account(address: address)) @@ -461,6 +470,31 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerID) = # drop the peer from the peers table b.peers.keepItIf( it.id != peer ) +proc advertiseLoop(b: BlockExcEngine) {.async, gcsafe.} = + while true: + if b.advertisedIndex >= b.advertisedBlocks.len: + b.advertisedIndex = 0 + b.advertisementFrequency = BlockAdvertisementFrequency + + # check that we still have this block. 
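The "check" referred to above is the while loop that follows: stale Cids are pruned from advertisedBlocks before the current index is published, and the sleep at the bottom of the loop paces publication so one full pass over the list spans roughly BlockAdvertisementFrequency. A minimal sketch of that pacing rule (assumes chronos Durations; `pacingInterval` is a name invented for illustration, not part of the commit):

import pkg/chronos

const BlockAdvertisementFrequency = 30.minutes

proc pacingInterval(blockCount: int): Duration =
  ## Sleep between two consecutive advertisements so that a full pass
  ## over the advertised list takes one frequency window.
  if blockCount > 0:
    BlockAdvertisementFrequency div blockCount
  else:
    30.minutes  # nothing to advertise yet; idle

when isMainModule:
  echo pacingInterval(60)  # 60 blocks -> one advertisement every 30 seconds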
+ while + b.advertisedIndex < b.advertisedBlocks.len and + not(b.localStore.contains(b.advertisedBlocks[b.advertisedIndex])): + b.advertisedBlocks.delete(b.advertisedIndex) + + #publish it + if b.advertisedIndex < b.advertisedBlocks.len: + asyncSpawn b.discovery.publishProvide(b.advertisedBlocks[b.advertisedIndex]) + + inc b.advertisedIndex + let toSleep = + if b.advertisedBlocks.len > 0: + b.advertisementFrequency div b.advertisedBlocks.len + else: + 30.minutes + await sleepAsync(toSleep) or b.blockAdded.wait() + b.blockAdded.clear() + proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = trace "Handling task for peer", peer = task.id @@ -482,7 +516,6 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = .mapIt(!it.read) if blocks.len > 0: - trace "Sending blocks to peer", peer = task.id, blocks = blocks.len b.network.request.sendBlocks( task.id, blocks) @@ -525,25 +558,19 @@ proc new*( discovery: Discovery, concurrentTasks = DefaultConcurrentTasks, maxRetries = DefaultMaxRetries, - peersPerRequest = DefaultMaxPeersPerRequest, - concurrentAdvReqs = DefaultConcurrentAdvertRequests, - concurrentDiscReqs = DefaultConcurrentDiscRequests): T = + peersPerRequest = DefaultMaxPeersPerRequest): T = - let - engine = BlockExcEngine( - localStore: localStore, - pendingBlocks: PendingBlocksManager.new(), - peersPerRequest: peersPerRequest, - network: network, - wallet: wallet, - concurrentTasks: concurrentTasks, - concurrentAdvReqs: concurrentAdvReqs, - concurrentDiscReqs: concurrentDiscReqs, - maxRetries: maxRetries, - taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), - discovery: discovery, - advertiseQueue: newAsyncQueue[Cid](DefaultTaskQueueSize), - discoveryQUeue: newAsyncQueue[Cid](DefaultTaskQueueSize)) + let engine = BlockExcEngine( + localStore: localStore, + pendingBlocks: PendingBlocksManager.new(), + blockAdded: newAsyncEvent(), + peersPerRequest: peersPerRequest, + network: network, + wallet: wallet, + concurrentTasks: concurrentTasks, + maxRetries: maxRetries, + discovery: discovery, + taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize)) proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: @@ -581,6 +608,7 @@ proc new*( onBlocks: blocksHandler, onPresence: blockPresenceHandler, onAccount: accountHandler, - onPayment: paymentHandler) + onPayment: paymentHandler + ) return engine diff --git a/dagger/blockexchange/network.nim b/dagger/blockexchange/network.nim index af212c26..3f8729c6 100644 --- a/dagger/blockexchange/network.nim +++ b/dagger/blockexchange/network.nim @@ -120,16 +120,14 @@ proc broadcastWantList*( trace "Sending want list to peer", peer = id, `type` = $wantType, len = cids.len - let - wantList = makeWantList( - cids, - priority, - cancel, - wantType, - full, - sendDontHave) - b.peers.withValue(id, peer): - peer[].broadcast(Message(wantlist: wantList)) + let wantList = makeWantList( + cids, + priority, + cancel, + wantType, + full, + sendDontHave) + b.peers[id].broadcast(Message(wantlist: wantList)) proc handleBlocks( b: BlockExcNetwork, @@ -155,7 +153,9 @@ proc handleBlocks( b.handlers.onBlocks(peer.id, blks) -template makeBlocks*(blocks: seq[bt.Block]): seq[pb.Block] = +template makeBlocks*( + blocks: seq[bt.Block]): + seq[pb.Block] = var blks: seq[pb.Block] for blk in blocks: blks.add(pb.Block( @@ -176,8 +176,7 @@ proc broadcastBlocks*( return trace "Sending blocks to peer", peer = id, len = blocks.len - b.peers.withValue(id, 
peer): - peer[].broadcast(pb.Message(payload: makeBlocks(blocks))) + b.peers[id].broadcast(pb.Message(payload: makeBlocks(blocks))) proc handleBlockPresence( b: BlockExcNetwork, @@ -203,8 +202,7 @@ proc broadcastBlockPresence*( return trace "Sending presence to peer", peer = id - b.peers.withValue(id, peer): - peer[].broadcast(Message(blockPresences: @presence)) + b.peers[id].broadcast(Message(blockPresences: presence)) proc handleAccount(network: BlockExcNetwork, peer: NetworkPeer, @@ -220,8 +218,7 @@ proc broadcastAccount*(network: BlockExcNetwork, return let message = Message(account: AccountMessage.init(account)) - network.peers.withValue(id, peer): - peer[].broadcast(message) + network.peers[id].broadcast(message) proc broadcastPayment*(network: BlockExcNetwork, id: PeerId, @@ -230,8 +227,7 @@ proc broadcastPayment*(network: BlockExcNetwork, return let message = Message(payment: StateChannelUpdate.init(payment)) - network.peers.withValue(id, peer): - peer[].broadcast(message) + network.peers[id].broadcast(message) proc handlePayment(network: BlockExcNetwork, peer: NetworkPeer, @@ -265,7 +261,7 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerID): NetworkPeer = ## if peer in b.peers: - return b.peers.getOrDefault(peer, nil) + return b.peers[peer] var getConn = proc(): Future[Connection] {.async.} = try: @@ -367,7 +363,8 @@ proc new*( sendBlocks: sendBlocks, sendPresence: sendPresence, sendAccount: sendAccount, - sendPayment: sendPayment) + sendPayment: sendPayment + ) b.init() return b diff --git a/dagger/blockexchange/networkpeer.nim b/dagger/blockexchange/networkpeer.nim index 2ef8d473..269291cc 100644 --- a/dagger/blockexchange/networkpeer.nim +++ b/dagger/blockexchange/networkpeer.nim @@ -17,7 +17,7 @@ import ./protobuf/blockexc logScope: topics = "dagger blockexc networkpeer" -const MaxMessageSize = 100 * 1024 * 1024 # manifest files can be big +const MaxMessageSize = 8 * 1024 * 1024 type RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.} diff --git a/dagger/blockexchange/peercontext.nim b/dagger/blockexchange/peercontext.nim index 2fb24216..99c7b15f 100644 --- a/dagger/blockexchange/peercontext.nim +++ b/dagger/blockexchange/peercontext.nim @@ -13,12 +13,12 @@ export payments, nitro type BlockExcPeerCtx* = ref object of RootObj id*: PeerID - peerPrices*: Table[Cid, UInt256] # remote peer have list including price - peerWants*: seq[Entry] # remote peers want lists - exchanged*: int # times peer has exchanged with us - lastExchange*: Moment # last time peer has exchanged with us - account*: ?Account # ethereum account of this peer - paymentChannel*: ?ChannelId # payment channel id + peerPrices*: Table[Cid, UInt256] # remote peer have list including price + peerWants*: seq[Entry] # remote peers want lists + exchanged*: int # times peer has exchanged with us + lastExchange*: Moment # last time peer has exchanged with us + account*: ?Account # ethereum account of this peer + paymentChannel*: ?ChannelId # payment channel id proc peerHave*(context: BlockExcPeerCtx): seq[Cid] = toSeq(context.peerPrices.keys) diff --git a/dagger/blockexchange/pendingblocks.nim b/dagger/blockexchange/pendingblocks.nim index 3d504f9b..3b426bb8 100644 --- a/dagger/blockexchange/pendingblocks.nim +++ b/dagger/blockexchange/pendingblocks.nim @@ -8,11 +8,6 @@ ## those terms. 
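The pendingblocks changes below reduce the manager to a table of outstanding futures keyed by Cid, so concurrent requests for the same block share a single future that is completed once the block arrives. A minimal self-contained sketch of that pattern (illustration only: string keys stand in for Cids, and only the addOrAwait/resolve pair from the diff is mirrored):

import std/tables
import pkg/chronos

type PendingBlocks = ref object
  blocks: Table[string, Future[string]]  # a string key stands in for a Cid

proc addOrAwait(p: PendingBlocks, cid: string): Future[string] =
  ## Return the future already tracking `cid`, or register a new one.
  if cid notin p.blocks:
    p.blocks[cid] = newFuture[string]("pending." & cid)
  p.blocks[cid]

proc resolve(p: PendingBlocks, cid, blk: string) =
  ## Complete and drop the pending future for `cid`, if any.
  if cid in p.blocks:
    let fut = p.blocks[cid]
    if not fut.finished:
      fut.complete(blk)
    p.blocks.del(cid)

when isMainModule:
  let p = PendingBlocks(blocks: initTable[string, Future[string]]())
  let fut = p.addOrAwait("cid1")
  p.resolve("cid1", "block payload")
  assert waitFor(fut) == "block payload"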
import std/tables -import std/sequtils - -import pkg/upraises - -push: {.upraises: [].} import pkg/questionable import pkg/chronicles @@ -24,22 +19,18 @@ import ../blocktype logScope: topics = "dagger blockexc pendingblocks" -const - DefaultBlockTimeout* = 10.minutes - type PendingBlocksManager* = ref object of RootObj blocks*: Table[Cid, Future[Block]] # pending Block requests -proc getWantHandle*( +proc addOrAwait*( p: PendingBlocksManager, - cid: Cid, - timeout = DefaultBlockTimeout): Future[Block] {.async.} = + cid: Cid): Future[Block] {.async.} = ## Add an event for a block ## if cid notin p.blocks: - p.blocks[cid] = newFuture[Block]().wait(timeout) + p.blocks[cid] = newFuture[Block]() trace "Adding pending future for block", cid try: @@ -61,11 +52,11 @@ proc resolve*( for blk in blocks: # resolve any pending blocks if blk.cid in p.blocks: - p.blocks.withValue(blk.cid, pending): - if not pending[].finished: - trace "Resolving block", cid = $blk.cid - pending[].complete(blk) - p.blocks.del(blk.cid) + let pending = p.blocks[blk.cid] + if not pending.finished: + trace "Resolving block", cid = $blk.cid + pending.complete(blk) + p.blocks.del(blk.cid) proc pending*( p: PendingBlocksManager, @@ -75,17 +66,6 @@ proc contains*( p: PendingBlocksManager, cid: Cid): bool = p.pending(cid) -iterator wantList*(p: PendingBlocksManager): Cid = - for k in p.blocks.keys: - yield k - -iterator wantHandles*(p: PendingBlocksManager): Future[Block] = - for v in p.blocks.values: - yield v - -func len*(p: PendingBlocksManager): int = - p.blocks.len - func new*(T: type PendingBlocksManager): T = T( blocks: initTable[Cid, Future[Block]]() diff --git a/dagger/dagger.nim b/dagger/dagger.nim index 48ee6eac..db9a0222 100644 --- a/dagger/dagger.nim +++ b/dagger/dagger.nim @@ -123,7 +123,7 @@ proc new*(T: type DaggerServer, config: DaggerConf): T = erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) daggerNode = DaggerNodeRef.new(switch, store, engine, erasure, discovery) restServer = RestServerRef.new( - daggerNode.initRestApi(config), + daggerNode.initRestApi(), initTAddress("127.0.0.1" , config.apiPort), bufferSize = (1024 * 64), maxRequestBodySize = int.high) diff --git a/dagger/discovery.nim b/dagger/discovery.nim index 96c961a9..60d9f999 100644 --- a/dagger/discovery.nim +++ b/dagger/discovery.nim @@ -8,20 +8,18 @@ ## those terms. import pkg/chronos -import pkg/chronicles import pkg/libp2p import pkg/questionable import pkg/questionable/results import pkg/stew/shims/net import pkg/libp2pdht/discv5/protocol as discv5 -import ./rng -import ./errors +import rng export discv5 type - Discovery* = ref object of RootObj + Discovery* = ref object protocol: discv5.Protocol localInfo: PeerInfo @@ -57,33 +55,15 @@ proc toDiscoveryId*(cid: Cid): NodeId = ## To discovery id readUintBE[256](keccak256.digest(cid.data.buffer).data) -method findBlockProviders*( +proc findBlockProviders*( d: Discovery, - cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} = - ## Find block providers - ## + cid: Cid): Future[seq[SignedPeerRecord]] {.async.} = + return (await d.protocol.getProviders(cid.toDiscoveryId())).get() - trace "Finding providers for block", cid = $cid - without providers =? 
- (await d.protocol.getProviders(cid.toDiscoveryId())).mapFailure, error: - trace "Error finding providers for block", cid = $cid, error = error.msg +proc publishProvide*(d: Discovery, cid: Cid) {.async.} = + let bid = cid.toDiscoveryId() + discard await d.protocol.addProvider(bid, d.localInfo.signedPeerRecord) - return providers - -method provideBlock*(d: Discovery, cid: Cid) {.async, base.} = - ## Provide a bock Cid - ## - - trace "Providing block", cid = $cid - let - nodes = await d.protocol.addProvider( - cid.toDiscoveryId(), - d.localInfo.signedPeerRecord) - - if nodes.len <= 0: - trace "Couldn't provide to any nodes!" - - trace "Provided to nodes", nodes = nodes.len proc start*(d: Discovery) {.async.} = d.protocol.updateRecord(d.localInfo.signedPeerRecord).expect("updating SPR") diff --git a/dagger/erasure/erasure.nim b/dagger/erasure/erasure.nim index cf52ba94..bb8970c4 100644 --- a/dagger/erasure/erasure.nim +++ b/dagger/erasure/erasure.nim @@ -109,7 +109,7 @@ proc encode*( # TODO: this is a tight blocking loop so we sleep here to allow # other events to be processed, this should be addressed # by threading - await sleepAsync(100.millis) + await sleepAsync(10.millis) for j in 0.. 0 suite "NetworkStore - multiple nodes": - let - chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) + let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) var switch: seq[Switch] @@ -219,9 +213,10 @@ suite "NetworkStore - multiple nodes": engine = downloader.engine # Add blocks from 1st peer to want list - let - pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) - pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) + for b in blocks[0..3]: + discard engine.discoverBlock(b.cid) + for b in blocks[12..15]: + discard engine.discoverBlock(b.cid) await allFutures( blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) @@ -235,16 +230,12 @@ suite "NetworkStore - multiple nodes": await connectNodes(switch) await sleepAsync(1.seconds) - await allFuturesThrowing( - allFinished(pendingBlocks1), - allFinished(pendingBlocks2)) - check: engine.peers[0].peerHave.mapIt($it).sorted(cmp[string]) == - blocks[0..3].mapIt( $(it.cid) ).sorted(cmp[string]) + blocks[0..3].mapIt( it.cid ).mapIt($it).sorted(cmp[string]) engine.peers[3].peerHave.mapIt($it).sorted(cmp[string]) == - blocks[12..15].mapIt( $(it.cid) ).sorted(cmp[string]) + blocks[12..15].mapIt( it.cid ).mapIt($it).sorted(cmp[string]) test "should exchange blocks with multiple nodes": let @@ -252,9 +243,10 @@ suite "NetworkStore - multiple nodes": engine = downloader.engine # Add blocks from 1st peer to want list - let - pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) - pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) + for b in blocks[0..3]: + discard engine.discoverBlock(b.cid) + for b in blocks[12..15]: + discard engine.discoverBlock(b.cid) await allFutures( blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) @@ -268,9 +260,74 @@ suite "NetworkStore - multiple nodes": await connectNodes(switch) await sleepAsync(1.seconds) - await allFuturesThrowing( - allFinished(pendingBlocks1), - allFinished(pendingBlocks2)) + let wantListBlocks = await allFinished( + blocks[0..3].mapIt( downloader.getBlock(it.cid) )) + check wantListBlocks.mapIt( !it.read ) == blocks[0..3] - check pendingBlocks1.mapIt( it.read ) == blocks[0..3] - check pendingBlocks2.mapIt( it.read ) == 
blocks[12..15] +suite "NetworkStore - discovery": + let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) + + var + switch: seq[Switch] + blockexc: seq[NetworkStore] + blocks: seq[bt.Block] + + setup: + while true: + let chunk = await chunker.getBytes() + if chunk.len <= 0: + break + + blocks.add(bt.Block.new(chunk).tryGet()) + + for e in generateNodes(4): + switch.add(e.switch) + blockexc.add(e.blockexc) + await e.blockexc.engine.start() + + await allFuturesThrowing( + switch.mapIt( it.start() ) + ) + + teardown: + await allFuturesThrowing( + switch.mapIt( it.stop() ) + ) + + switch = @[] + blockexc = @[] + + test "Shouldn't launch discovery request if we are already connected": + await blockexc[0].engine.blocksHandler(switch[1].peerInfo.peerId, blocks) + blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] = + check false + await connectNodes(switch) + let blk = await blockexc[1].engine.requestBlock(blocks[0].cid) + + test "E2E discovery": + # Distribute the blocks amongst 1..3 + # Ask 0 to download everything without connecting him beforehand + + var advertised: Table[Cid, SignedPeerRecord] + + blockexc[1].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) = + advertised[cid] = switch[1].peerInfo.signedPeerRecord + + blockexc[2].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) = + advertised[cid] = switch[2].peerInfo.signedPeerRecord + + blockexc[3].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) = + advertised[cid] = switch[3].peerInfo.signedPeerRecord + + await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5]) + await blockexc[2].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[4..10]) + await blockexc[3].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[10..15]) + + blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] = + if cid in advertised: + result.add(advertised[cid]) + + let futs = collect(newSeq): + for b in blocks: + blockexc[0].engine.requestBlock(b.cid) + await allFutures(futs) diff --git a/tests/dagger/blockexc/testengine.nim b/tests/dagger/blockexc/testengine.nim index d741a120..9940c9c6 100644 --- a/tests/dagger/blockexc/testengine.nim +++ b/tests/dagger/blockexc/testengine.nim @@ -66,9 +66,8 @@ suite "NetworkStore engine basic": wallet, network, discovery) - for b in blocks: - discard engine.pendingBlocks.getWantHandle(b.cid) + discard engine.discoverBlock(b.cid) engine.setupPeer(peerId) await done @@ -172,7 +171,7 @@ suite "NetworkStore engine handlers": test "stores blocks in local store": let pending = blocks.mapIt( - engine.pendingBlocks.getWantHandle( it.cid ) + engine.pendingBlocks.addOrAwait( it.cid ) ) await engine.blocksHandler(peerId, blocks) diff --git a/tests/dagger/helpers/nodeutils.nim b/tests/dagger/helpers/nodeutils.nim index f190746f..e1ce5027 100644 --- a/tests/dagger/helpers/nodeutils.nim +++ b/tests/dagger/helpers/nodeutils.nim @@ -11,8 +11,10 @@ import ../examples proc generateNodes*( num: Natural, - blocks: openArray[bt.Block] = []): - seq[tuple[switch: Switch, blockexc: NetworkStore]] = + blocks: openArray[bt.Block] = [], + secureManagers: openarray[SecureProtocol] = [ + SecureProtocol.Noise, + ]): seq[tuple[switch: Switch, blockexc: NetworkStore]] = for i in 0..
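The discovery tests above stub the DHT through assignable proc fields on the mock (findBlockProviders_var, publishProvide_var), so each test decides what discovery "returns" without touching the engine. A minimal sketch of that override pattern with simplified types (illustration only; the real MockDiscovery lives in tests/dagger/mockdiscovery.nim):

type
  MockDiscovery = ref object
    # Overridable hook; a test assigns a closure to control the result.
    findBlockProviders_var: proc(cid: string): seq[string] {.gcsafe.}

proc findBlockProviders(d: MockDiscovery, cid: string): seq[string] =
  ## Delegate to the test-installed hook when present.
  if not isNil(d.findBlockProviders_var):
    return d.findBlockProviders_var(cid)

when isMainModule:
  let d = MockDiscovery()
  d.findBlockProviders_var = proc(cid: string): seq[string] =
    @["peer-for-" & cid]
  assert d.findBlockProviders("cid1") == @["peer-for-cid1"]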