From 4740ffc144916b8a47a1c386e2e8eda728355ce9 Mon Sep 17 00:00:00 2001
From: Dmitriy Ryajov
Date: Tue, 19 Apr 2022 21:46:44 -0600
Subject: [PATCH] Cleanup engine and rework discovery (#87)

* rework discovery with async queues
* misc style changes
* increase max message size for large manifests
* use upraises and avoid exceptions on key access
* increase sleep time to 100 millis
* pass config
* make list blocks trigger a callback on each block
* check for nil on start/stop
* fix tests and split out discovery tests
* don't auto mount network
* add list block tests
* add discovery tests
* rework mock discovery
* move discovery mock to disc dir
* don't force logging syncs
* don't force mock discovery on all tests
* rework discovery with methods
* add top level utils file
* don't use asyncCheck
* don't pass entire blocks to list blocks callback
* spelling
---
 config.nims                              |   4 -
 dagger/blockexchange/engine.nim          | 430 ++++++++----------
 dagger/blockexchange/network.nim         |  39 +-
 dagger/blockexchange/networkpeer.nim     |   2 +-
 dagger/blockexchange/peercontext.nim     |  12 +-
 dagger/blockexchange/pendingblocks.nim   |  36 +-
 dagger/dagger.nim                        |   2 +-
 dagger/discovery.nim                     |  36 +-
 dagger/erasure/erasure.nim               |   2 +-
 dagger/node.nim                          |  30 +-
 dagger/rest/api.nim                      |   7 +-
 dagger/stores/blockstore.nim             |   3 +-
 dagger/stores/cachestore.nim             |   5 +-
 dagger/stores/fsstore.nim                |  26 +-
 dagger/stores/networkstore.nim           |   6 +-
 dagger/utils.nim                         |   4 +
 tests/config.nims                        |   1 -
 .../discovery}/mockdiscovery.nim         |  27 +-
 .../blockexc/discovery/testdiscovery.nim | 192 ++++++++
 tests/dagger/blockexc/testblockexc.nim   | 117 ++---
 tests/dagger/blockexc/testengine.nim     |   5 +-
 tests/dagger/helpers/nodeutils.nim       |   8 +-
 tests/dagger/stores/testcachestore.nim   |  12 +
 tests/dagger/stores/testfsstore.nim      |   6 +-
 tests/dagger/testblockexc.nim            |   1 +
 25 files changed, 597 insertions(+), 416 deletions(-)
 create mode 100644 dagger/utils.nim
 delete mode 100644 tests/config.nims
 rename tests/dagger/{ => blockexc/discovery}/mockdiscovery.nim (60%)
 create mode 100644 tests/dagger/blockexc/discovery/testdiscovery.nim

diff --git a/config.nims b/config.nims
index 35023252..a906bd99 100644
--- a/config.nims
+++ b/config.nims
@@ -68,10 +68,6 @@ switch("warning", "ObservableStores:off")
 switch("warning", "LockLevel:off")
 switch("define", "libp2p_pki_schemes=secp256k1")
 
-#TODO this infects everything in this folder, ideally it would only
-# apply to dagger.nim, but since dagger.nims is used for other purpose
-# we can't use it. And dagger.cfg doesn't work
-switch("define", "chronicles_sinks=textlines[dynamic],json[dynamic]")
 
 # begin Nimble config (version 1)
 when system.fileExists("nimble.paths"):
diff --git a/dagger/blockexchange/engine.nim b/dagger/blockexchange/engine.nim
index 7be6167e..4a86c63a 100644
--- a/dagger/blockexchange/engine.nim
+++ b/dagger/blockexchange/engine.nim
@@ -7,7 +7,8 @@
 ## This file may not be copied, modified, or distributed except according to
 ## those terms.
 
-import std/[sequtils, sets, tables, sugar] +import std/sequtils +import std/sets import pkg/chronos import pkg/chronicles @@ -15,7 +16,7 @@ import pkg/libp2p import ../stores/blockstore import ../blocktype as bt -import ../utils/asyncheapqueue +import ../utils import ../discovery import ./protobuf/blockexc @@ -32,31 +33,19 @@ logScope: topics = "dagger blockexc engine" const - DefaultBlockTimeout* = 5.minutes DefaultMaxPeersPerRequest* = 10 DefaultTaskQueueSize = 100 DefaultConcurrentTasks = 10 DefaultMaxRetries = 3 - - # Current advertisement is meant to be more efficient than - # correct, so blocks could be advertised more slowly than that - # Put some margin - BlockAdvertisementFrequency = 30.minutes + DefaultConcurrentDiscRequests = 10 + DefaultConcurrentAdvertRequests = 10 + DefaultDiscoveryTimeout = 1.minutes + DefaultMaxQueriedBlocksCache = 1000 type TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.} - BlockDiscovery* = ref object - discoveredProvider: AsyncEvent - discoveryLoop: Future[void] - toDiscover: Cid - treatedPeer: HashSet[PeerId] - inflightIWant: HashSet[PeerId] - gotIWantResponse: AsyncEvent - provides: seq[PeerId] - lastDhtQuery: Moment - BlockExcEngine* = ref object of RootObj localStore*: BlockStore # where we localStore blocks for this instance network*: BlockExcNetwork # network interface @@ -70,12 +59,15 @@ type peersPerRequest: int # max number of peers to request from wallet*: WalletRef # nitro wallet for micropayments pricing*: ?Pricing # optional bandwidth pricing - advertisedBlocks: seq[Cid] - advertisedIndex: int - advertisementFrequency: Duration - runningDiscoveries*: Table[Cid, BlockDiscovery] - blockAdded: AsyncEvent - discovery*: Discovery + discovery*: Discovery # Discovery interface + concurrentAdvReqs: int # Concurrent advertise requests + advertiseLoop*: Future[void] # Advertise loop task handle + advertiseQueue*: AsyncQueue[Cid] # Advertise queue + advertiseTasks*: seq[Future[void]] # Advertise tasks + concurrentDiscReqs: int # Concurrent discovery requests + discoveryLoop*: Future[void] # Discovery loop task handle + discoveryTasks*: seq[Future[void]] # Discovery tasks + discoveryQueue*: AsyncQueue[Cid] # Discovery queue Pricing* = object address*: EthAddress @@ -100,7 +92,76 @@ proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} = b.taskQueue.pushOrUpdateNoWait(task).isOk() proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.} -proc advertiseLoop(b: BlockExcEngine): Future[void] {.gcsafe.} + +proc discoveryLoopRunner(b: BlockExcEngine) {.async.} = + while b.blockexcRunning: + for cid in toSeq(b.pendingBlocks.wantList): + try: + await b.discoveryQueue.put(cid) + except CatchableError as exc: + trace "Exception in discovery loop", exc = exc.msg + + trace "About to sleep, number of wanted blocks", wanted = b.pendingBlocks.len + await sleepAsync(30.seconds) + +proc advertiseLoopRunner*(b: BlockExcEngine) {.async.} = + proc onBlock(cid: Cid) {.async.} = + try: + await b.advertiseQueue.put(cid) + except CatchableError as exc: + trace "Exception listing blocks", exc = exc.msg + + while b.blockexcRunning: + await b.localStore.listBlocks(onBlock) + await sleepAsync(30.seconds) + + trace "Exiting advertise task loop" + +proc advertiseTaskRunner(b: BlockExcEngine) {.async.} = + ## Run advertise tasks + ## + + while b.blockexcRunning: + try: + let cid = await b.advertiseQueue.get() + await b.discovery.provideBlock(cid) + except CatchableError 
as exc: + trace "Exception in advertise task runner", exc = exc.msg + + trace "Exiting advertise task runner" + +proc discoveryTaskRunner(b: BlockExcEngine) {.async.} = + ## Run discovery tasks + ## + + while b.blockexcRunning: + try: + let + cid = await b.discoveryQueue.get() + providers = await b.discovery + .findBlockProviders(cid) + .wait(DefaultDiscoveryTimeout) + + await allFuturesThrowing( + allFinished(providers.mapIt( b.network.dialPeer(it.data) ))) + except CatchableError as exc: + trace "Exception in discovery task runner", exc = exc.msg + + trace "Exiting discovery task runner" + +proc queueFindBlocksReq(b: BlockExcEngine, cids: seq[Cid]) {.async.} = + try: + for cid in cids: + await b.discoveryQueue.put(cid) + except CatchableError as exc: + trace "Exception queueing discovery request", exc = exc.msg + +proc queueProvideBlocksReq(b: BlockExcEngine, cids: seq[Cid]) {.async.} = + try: + for cid in cids: + await b.advertiseQueue.put(cid) + except CatchableError as exc: + trace "Exception queueing discovery request", exc = exc.msg proc start*(b: BlockExcEngine) {.async.} = ## Start the blockexc task @@ -116,13 +177,14 @@ proc start*(b: BlockExcEngine) {.async.} = for i in 0.. 0 + peers.keepItIf( + it != blockPeer + ) - debug "Requesting block from peer", providerCount = discovery.provides.len, - peer = discovery.provides[0], cid # request block b.network.request.sendWantList( - discovery.provides[0], + blockPeer.id, @[cid], wantType = WantType.wantBlock) # we want this remote to send us a block - #TODO substract the discovery time - return await blk.wait(timeout) + if peers.len == 0: + trace "Not enough peers to send want list to", cid = $cid + asyncSpawn b.queueFindBlocksReq(@[cid]) + return blk # no peers to send wants to + + # filter out the peer we've already requested from + let stop = min(peers.high, b.peersPerRequest) + trace "Sending want list requests to remaining peers", count = stop + 1 + for p in peers[0..stop]: + if cid notin p.peerHave: + # just send wants + b.network.request.sendWantList( + p.id, + @[cid], + wantType = WantType.wantHave) # we only want to know if the peer has the block + + return blk proc blockPresenceHandler*( b: BlockExcEngine, @@ -298,18 +291,25 @@ proc blockPresenceHandler*( ## Handle block presence ## + trace "Received presence update for peer", peer let peerCtx = b.getPeerCtx(peer) + if isNil(peerCtx): + return for blk in blocks: if presence =? 
Presence.init(blk): - if not isNil(peerCtx): - peerCtx.updatePresence(presence) - if presence.cid in b.runningDiscoveries: - let bd = b.runningDiscoveries[presence.cid] - if not presence.have: - bd.inflightIWant.excl(peer) - bd.treatedPeer.incl(peer) - bd.gotIWantResponse.fire() + peerCtx.updatePresence(presence) + + let + cids = toSeq(b.pendingBlocks.wantList).filterIt( + it in peerCtx.peerHave + ) + + if cids.len > 0: + b.network.request.sendWantList( + peerCtx.id, + cids, + wantType = WantType.wantBlock) # we want this remote to send us a block proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = trace "Schedule a task for new blocks" @@ -330,21 +330,11 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = ## and schedule any new task to be ran ## - trace "Resolving blocks" + trace "Resolving blocks", blocks = blocks.len - var gotNewBlocks = false - for bl in blocks: - if bl.cid notin b.advertisedBlocks: #TODO that's very slow, maybe a ordered hashset instead - #TODO could do some smarter ordering here (insert it just before b.advertisedIndex, or similar) - b.advertisedBlocks.add(bl.cid) - asyncSpawn b.discovery.publishProvide(bl.cid) - gotNewBlocks = true - - if gotNewBlocks: - b.pendingBlocks.resolve(blocks) - b.scheduleTasks(blocks) - - b.blockAdded.fire() + b.pendingBlocks.resolve(blocks) + b.scheduleTasks(blocks) + asyncSpawn b.queueProvideBlocksReq(blocks.mapIt( it.cid )) proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, @@ -420,14 +410,20 @@ proc wantListHandler*( if not b.scheduleTask(peerCtx): trace "Unable to schedule task for peer", peer -proc accountHandler*(engine: BlockExcEngine, peer: PeerID, account: Account) {.async.} = +proc accountHandler*( + engine: BlockExcEngine, + peer: PeerID, + account: Account) {.async.} = let context = engine.getPeerCtx(peer) if context.isNil: return context.account = account.some -proc paymentHandler*(engine: BlockExcEngine, peer: PeerId, payment: SignedState) {.async.} = +proc paymentHandler*( + engine: BlockExcEngine, + peer: PeerId, + payment: SignedState) {.async.} = without context =? engine.getPeerCtx(peer).option and account =? context.account: return @@ -450,13 +446,8 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) = )) # broadcast our want list, the other peer will do the same - let wantList = collect(newSeqOfCap(b.runningDiscoveries.len)): - for cid, bd in b.runningDiscoveries: - bd.inflightIWant.incl(peer) - cid - - if wantList.len > 0: - b.network.request.sendWantList(peer, wantList, full = true, sendDontHave = true) + if b.pendingBlocks.len > 0: + b.network.request.sendWantList(peer, toSeq(b.pendingBlocks.wantList), full = true) if address =? b.pricing.?address: b.network.request.sendAccount(peer, Account(address: address)) @@ -470,31 +461,6 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerID) = # drop the peer from the peers table b.peers.keepItIf( it.id != peer ) -proc advertiseLoop(b: BlockExcEngine) {.async, gcsafe.} = - while true: - if b.advertisedIndex >= b.advertisedBlocks.len: - b.advertisedIndex = 0 - b.advertisementFrequency = BlockAdvertisementFrequency - - # check that we still have this block. 
- while - b.advertisedIndex < b.advertisedBlocks.len and - not(b.localStore.contains(b.advertisedBlocks[b.advertisedIndex])): - b.advertisedBlocks.delete(b.advertisedIndex) - - #publish it - if b.advertisedIndex < b.advertisedBlocks.len: - asyncSpawn b.discovery.publishProvide(b.advertisedBlocks[b.advertisedIndex]) - - inc b.advertisedIndex - let toSleep = - if b.advertisedBlocks.len > 0: - b.advertisementFrequency div b.advertisedBlocks.len - else: - 30.minutes - await sleepAsync(toSleep) or b.blockAdded.wait() - b.blockAdded.clear() - proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = trace "Handling task for peer", peer = task.id @@ -516,6 +482,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = .mapIt(!it.read) if blocks.len > 0: + trace "Sending blocks to peer", peer = task.id, blocks = blocks.len b.network.request.sendBlocks( task.id, blocks) @@ -558,19 +525,25 @@ proc new*( discovery: Discovery, concurrentTasks = DefaultConcurrentTasks, maxRetries = DefaultMaxRetries, - peersPerRequest = DefaultMaxPeersPerRequest): T = + peersPerRequest = DefaultMaxPeersPerRequest, + concurrentAdvReqs = DefaultConcurrentAdvertRequests, + concurrentDiscReqs = DefaultConcurrentDiscRequests): T = - let engine = BlockExcEngine( - localStore: localStore, - pendingBlocks: PendingBlocksManager.new(), - blockAdded: newAsyncEvent(), - peersPerRequest: peersPerRequest, - network: network, - wallet: wallet, - concurrentTasks: concurrentTasks, - maxRetries: maxRetries, - discovery: discovery, - taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize)) + let + engine = BlockExcEngine( + localStore: localStore, + pendingBlocks: PendingBlocksManager.new(), + peersPerRequest: peersPerRequest, + network: network, + wallet: wallet, + concurrentTasks: concurrentTasks, + concurrentAdvReqs: concurrentAdvReqs, + concurrentDiscReqs: concurrentDiscReqs, + maxRetries: maxRetries, + taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), + discovery: discovery, + advertiseQueue: newAsyncQueue[Cid](DefaultTaskQueueSize), + discoveryQUeue: newAsyncQueue[Cid](DefaultTaskQueueSize)) proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: @@ -608,7 +581,6 @@ proc new*( onBlocks: blocksHandler, onPresence: blockPresenceHandler, onAccount: accountHandler, - onPayment: paymentHandler - ) + onPayment: paymentHandler) return engine diff --git a/dagger/blockexchange/network.nim b/dagger/blockexchange/network.nim index 3f8729c6..af212c26 100644 --- a/dagger/blockexchange/network.nim +++ b/dagger/blockexchange/network.nim @@ -120,14 +120,16 @@ proc broadcastWantList*( trace "Sending want list to peer", peer = id, `type` = $wantType, len = cids.len - let wantList = makeWantList( - cids, - priority, - cancel, - wantType, - full, - sendDontHave) - b.peers[id].broadcast(Message(wantlist: wantList)) + let + wantList = makeWantList( + cids, + priority, + cancel, + wantType, + full, + sendDontHave) + b.peers.withValue(id, peer): + peer[].broadcast(Message(wantlist: wantList)) proc handleBlocks( b: BlockExcNetwork, @@ -153,9 +155,7 @@ proc handleBlocks( b.handlers.onBlocks(peer.id, blks) -template makeBlocks*( - blocks: seq[bt.Block]): - seq[pb.Block] = +template makeBlocks*(blocks: seq[bt.Block]): seq[pb.Block] = var blks: seq[pb.Block] for blk in blocks: blks.add(pb.Block( @@ -176,7 +176,8 @@ proc broadcastBlocks*( return trace "Sending blocks to peer", peer = id, len = blocks.len - 
b.peers[id].broadcast(pb.Message(payload: makeBlocks(blocks))) + b.peers.withValue(id, peer): + peer[].broadcast(pb.Message(payload: makeBlocks(blocks))) proc handleBlockPresence( b: BlockExcNetwork, @@ -202,7 +203,8 @@ proc broadcastBlockPresence*( return trace "Sending presence to peer", peer = id - b.peers[id].broadcast(Message(blockPresences: presence)) + b.peers.withValue(id, peer): + peer[].broadcast(Message(blockPresences: @presence)) proc handleAccount(network: BlockExcNetwork, peer: NetworkPeer, @@ -218,7 +220,8 @@ proc broadcastAccount*(network: BlockExcNetwork, return let message = Message(account: AccountMessage.init(account)) - network.peers[id].broadcast(message) + network.peers.withValue(id, peer): + peer[].broadcast(message) proc broadcastPayment*(network: BlockExcNetwork, id: PeerId, @@ -227,7 +230,8 @@ proc broadcastPayment*(network: BlockExcNetwork, return let message = Message(payment: StateChannelUpdate.init(payment)) - network.peers[id].broadcast(message) + network.peers.withValue(id, peer): + peer[].broadcast(message) proc handlePayment(network: BlockExcNetwork, peer: NetworkPeer, @@ -261,7 +265,7 @@ proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerID): NetworkPeer = ## if peer in b.peers: - return b.peers[peer] + return b.peers.getOrDefault(peer, nil) var getConn = proc(): Future[Connection] {.async.} = try: @@ -363,8 +367,7 @@ proc new*( sendBlocks: sendBlocks, sendPresence: sendPresence, sendAccount: sendAccount, - sendPayment: sendPayment - ) + sendPayment: sendPayment) b.init() return b diff --git a/dagger/blockexchange/networkpeer.nim b/dagger/blockexchange/networkpeer.nim index 269291cc..2ef8d473 100644 --- a/dagger/blockexchange/networkpeer.nim +++ b/dagger/blockexchange/networkpeer.nim @@ -17,7 +17,7 @@ import ./protobuf/blockexc logScope: topics = "dagger blockexc networkpeer" -const MaxMessageSize = 8 * 1024 * 1024 +const MaxMessageSize = 100 * 1024 * 1024 # manifest files can be big type RPCHandler* = proc(peer: NetworkPeer, msg: Message): Future[void] {.gcsafe.} diff --git a/dagger/blockexchange/peercontext.nim b/dagger/blockexchange/peercontext.nim index 99c7b15f..2fb24216 100644 --- a/dagger/blockexchange/peercontext.nim +++ b/dagger/blockexchange/peercontext.nim @@ -13,12 +13,12 @@ export payments, nitro type BlockExcPeerCtx* = ref object of RootObj id*: PeerID - peerPrices*: Table[Cid, UInt256] # remote peer have list including price - peerWants*: seq[Entry] # remote peers want lists - exchanged*: int # times peer has exchanged with us - lastExchange*: Moment # last time peer has exchanged with us - account*: ?Account # ethereum account of this peer - paymentChannel*: ?ChannelId # payment channel id + peerPrices*: Table[Cid, UInt256] # remote peer have list including price + peerWants*: seq[Entry] # remote peers want lists + exchanged*: int # times peer has exchanged with us + lastExchange*: Moment # last time peer has exchanged with us + account*: ?Account # ethereum account of this peer + paymentChannel*: ?ChannelId # payment channel id proc peerHave*(context: BlockExcPeerCtx): seq[Cid] = toSeq(context.peerPrices.keys) diff --git a/dagger/blockexchange/pendingblocks.nim b/dagger/blockexchange/pendingblocks.nim index 3b426bb8..3d504f9b 100644 --- a/dagger/blockexchange/pendingblocks.nim +++ b/dagger/blockexchange/pendingblocks.nim @@ -8,6 +8,11 @@ ## those terms. 
import std/tables +import std/sequtils + +import pkg/upraises + +push: {.upraises: [].} import pkg/questionable import pkg/chronicles @@ -19,18 +24,22 @@ import ../blocktype logScope: topics = "dagger blockexc pendingblocks" +const + DefaultBlockTimeout* = 10.minutes + type PendingBlocksManager* = ref object of RootObj blocks*: Table[Cid, Future[Block]] # pending Block requests -proc addOrAwait*( +proc getWantHandle*( p: PendingBlocksManager, - cid: Cid): Future[Block] {.async.} = + cid: Cid, + timeout = DefaultBlockTimeout): Future[Block] {.async.} = ## Add an event for a block ## if cid notin p.blocks: - p.blocks[cid] = newFuture[Block]() + p.blocks[cid] = newFuture[Block]().wait(timeout) trace "Adding pending future for block", cid try: @@ -52,11 +61,11 @@ proc resolve*( for blk in blocks: # resolve any pending blocks if blk.cid in p.blocks: - let pending = p.blocks[blk.cid] - if not pending.finished: - trace "Resolving block", cid = $blk.cid - pending.complete(blk) - p.blocks.del(blk.cid) + p.blocks.withValue(blk.cid, pending): + if not pending[].finished: + trace "Resolving block", cid = $blk.cid + pending[].complete(blk) + p.blocks.del(blk.cid) proc pending*( p: PendingBlocksManager, @@ -66,6 +75,17 @@ proc contains*( p: PendingBlocksManager, cid: Cid): bool = p.pending(cid) +iterator wantList*(p: PendingBlocksManager): Cid = + for k in p.blocks.keys: + yield k + +iterator wantHandles*(p: PendingBlocksManager): Future[Block] = + for v in p.blocks.values: + yield v + +func len*(p: PendingBlocksManager): int = + p.blocks.len + func new*(T: type PendingBlocksManager): T = T( blocks: initTable[Cid, Future[Block]]() diff --git a/dagger/dagger.nim b/dagger/dagger.nim index db9a0222..48ee6eac 100644 --- a/dagger/dagger.nim +++ b/dagger/dagger.nim @@ -123,7 +123,7 @@ proc new*(T: type DaggerServer, config: DaggerConf): T = erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) daggerNode = DaggerNodeRef.new(switch, store, engine, erasure, discovery) restServer = RestServerRef.new( - daggerNode.initRestApi(), + daggerNode.initRestApi(config), initTAddress("127.0.0.1" , config.apiPort), bufferSize = (1024 * 64), maxRequestBodySize = int.high) diff --git a/dagger/discovery.nim b/dagger/discovery.nim index 60d9f999..96c961a9 100644 --- a/dagger/discovery.nim +++ b/dagger/discovery.nim @@ -8,18 +8,20 @@ ## those terms. import pkg/chronos +import pkg/chronicles import pkg/libp2p import pkg/questionable import pkg/questionable/results import pkg/stew/shims/net import pkg/libp2pdht/discv5/protocol as discv5 -import rng +import ./rng +import ./errors export discv5 type - Discovery* = ref object + Discovery* = ref object of RootObj protocol: discv5.Protocol localInfo: PeerInfo @@ -55,15 +57,33 @@ proc toDiscoveryId*(cid: Cid): NodeId = ## To discovery id readUintBE[256](keccak256.digest(cid.data.buffer).data) -proc findBlockProviders*( +method findBlockProviders*( d: Discovery, - cid: Cid): Future[seq[SignedPeerRecord]] {.async.} = - return (await d.protocol.getProviders(cid.toDiscoveryId())).get() + cid: Cid): Future[seq[SignedPeerRecord]] {.async, base.} = + ## Find block providers + ## -proc publishProvide*(d: Discovery, cid: Cid) {.async.} = - let bid = cid.toDiscoveryId() - discard await d.protocol.addProvider(bid, d.localInfo.signedPeerRecord) + trace "Finding providers for block", cid = $cid + without providers =? 
+ (await d.protocol.getProviders(cid.toDiscoveryId())).mapFailure, error: + trace "Error finding providers for block", cid = $cid, error = error.msg + return providers + +method provideBlock*(d: Discovery, cid: Cid) {.async, base.} = + ## Provide a bock Cid + ## + + trace "Providing block", cid = $cid + let + nodes = await d.protocol.addProvider( + cid.toDiscoveryId(), + d.localInfo.signedPeerRecord) + + if nodes.len <= 0: + trace "Couldn't provide to any nodes!" + + trace "Provided to nodes", nodes = nodes.len proc start*(d: Discovery) {.async.} = d.protocol.updateRecord(d.localInfo.signedPeerRecord).expect("updating SPR") diff --git a/dagger/erasure/erasure.nim b/dagger/erasure/erasure.nim index bb8970c4..cf52ba94 100644 --- a/dagger/erasure/erasure.nim +++ b/dagger/erasure/erasure.nim @@ -109,7 +109,7 @@ proc encode*( # TODO: this is a tight blocking loop so we sleep here to allow # other events to be processed, this should be addressed # by threading - await sleepAsync(10.millis) + await sleepAsync(100.millis) for j in 0.. 0 suite "NetworkStore - multiple nodes": - let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) + let + chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) var switch: seq[Switch] @@ -213,10 +219,9 @@ suite "NetworkStore - multiple nodes": engine = downloader.engine # Add blocks from 1st peer to want list - for b in blocks[0..3]: - discard engine.discoverBlock(b.cid) - for b in blocks[12..15]: - discard engine.discoverBlock(b.cid) + let + pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) + pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) await allFutures( blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) @@ -230,12 +235,16 @@ suite "NetworkStore - multiple nodes": await connectNodes(switch) await sleepAsync(1.seconds) + await allFuturesThrowing( + allFinished(pendingBlocks1), + allFinished(pendingBlocks2)) + check: engine.peers[0].peerHave.mapIt($it).sorted(cmp[string]) == - blocks[0..3].mapIt( it.cid ).mapIt($it).sorted(cmp[string]) + blocks[0..3].mapIt( $(it.cid) ).sorted(cmp[string]) engine.peers[3].peerHave.mapIt($it).sorted(cmp[string]) == - blocks[12..15].mapIt( it.cid ).mapIt($it).sorted(cmp[string]) + blocks[12..15].mapIt( $(it.cid) ).sorted(cmp[string]) test "should exchange blocks with multiple nodes": let @@ -243,10 +252,9 @@ suite "NetworkStore - multiple nodes": engine = downloader.engine # Add blocks from 1st peer to want list - for b in blocks[0..3]: - discard engine.discoverBlock(b.cid) - for b in blocks[12..15]: - discard engine.discoverBlock(b.cid) + let + pendingBlocks1 = blocks[0..3].mapIt( engine.pendingBlocks.getWantHandle( it.cid ) ) + pendingBlocks2 = blocks[12..15].mapIt( engine.pendingBlocks.getWantHandle( it.cid )) await allFutures( blocks[0..3].mapIt( blockexc[0].engine.localStore.putBlock(it) )) @@ -260,74 +268,9 @@ suite "NetworkStore - multiple nodes": await connectNodes(switch) await sleepAsync(1.seconds) - let wantListBlocks = await allFinished( - blocks[0..3].mapIt( downloader.getBlock(it.cid) )) - check wantListBlocks.mapIt( !it.read ) == blocks[0..3] - -suite "NetworkStore - discovery": - let chunker = RandomChunker.new(Rng.instance(), size = 4096, chunkSize = 256) - - var - switch: seq[Switch] - blockexc: seq[NetworkStore] - blocks: seq[bt.Block] - - setup: - while true: - let chunk = await chunker.getBytes() - if chunk.len <= 0: - break - - blocks.add(bt.Block.new(chunk).tryGet()) - - for e in 
generateNodes(4): - switch.add(e.switch) - blockexc.add(e.blockexc) - await e.blockexc.engine.start() - await allFuturesThrowing( - switch.mapIt( it.start() ) - ) + allFinished(pendingBlocks1), + allFinished(pendingBlocks2)) - teardown: - await allFuturesThrowing( - switch.mapIt( it.stop() ) - ) - - switch = @[] - blockexc = @[] - - test "Shouldn't launch discovery request if we are already connected": - await blockexc[0].engine.blocksHandler(switch[1].peerInfo.peerId, blocks) - blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] = - check false - await connectNodes(switch) - let blk = await blockexc[1].engine.requestBlock(blocks[0].cid) - - test "E2E discovery": - # Distribute the blocks amongst 1..3 - # Ask 0 to download everything without connecting him beforehand - - var advertised: Table[Cid, SignedPeerRecord] - - blockexc[1].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) = - advertised[cid] = switch[1].peerInfo.signedPeerRecord - - blockexc[2].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) = - advertised[cid] = switch[2].peerInfo.signedPeerRecord - - blockexc[3].engine.discovery.publishProvide_var = proc(d: Discovery, cid: Cid) = - advertised[cid] = switch[3].peerInfo.signedPeerRecord - - await blockexc[1].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[0..5]) - await blockexc[2].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[4..10]) - await blockexc[3].engine.blocksHandler(switch[0].peerInfo.peerId, blocks[10..15]) - - blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] = - if cid in advertised: - result.add(advertised[cid]) - - let futs = collect(newSeq): - for b in blocks: - blockexc[0].engine.requestBlock(b.cid) - await allFutures(futs) + check pendingBlocks1.mapIt( it.read ) == blocks[0..3] + check pendingBlocks2.mapIt( it.read ) == blocks[12..15] diff --git a/tests/dagger/blockexc/testengine.nim b/tests/dagger/blockexc/testengine.nim index 9940c9c6..d741a120 100644 --- a/tests/dagger/blockexc/testengine.nim +++ b/tests/dagger/blockexc/testengine.nim @@ -66,8 +66,9 @@ suite "NetworkStore engine basic": wallet, network, discovery) + for b in blocks: - discard engine.discoverBlock(b.cid) + discard engine.pendingBlocks.getWantHandle(b.cid) engine.setupPeer(peerId) await done @@ -171,7 +172,7 @@ suite "NetworkStore engine handlers": test "stores blocks in local store": let pending = blocks.mapIt( - engine.pendingBlocks.addOrAwait( it.cid ) + engine.pendingBlocks.getWantHandle( it.cid ) ) await engine.blocksHandler(peerId, blocks) diff --git a/tests/dagger/helpers/nodeutils.nim b/tests/dagger/helpers/nodeutils.nim index e1ce5027..f190746f 100644 --- a/tests/dagger/helpers/nodeutils.nim +++ b/tests/dagger/helpers/nodeutils.nim @@ -11,10 +11,8 @@ import ../examples proc generateNodes*( num: Natural, - blocks: openArray[bt.Block] = [], - secureManagers: openarray[SecureProtocol] = [ - SecureProtocol.Noise, - ]): seq[tuple[switch: Switch, blockexc: NetworkStore]] = + blocks: openArray[bt.Block] = []): + seq[tuple[switch: Switch, blockexc: NetworkStore]] = for i in 0..
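
Note on the pattern this patch introduces: the per-block BlockDiscovery objects are replaced by two bounded chronos AsyncQueues (discoveryQueue and advertiseQueue) that a fixed pool of task runners drains, capping concurrent DHT requests (DefaultConcurrentDiscRequests / DefaultConcurrentAdvertRequests). The standalone sketch below only illustrates that producer/consumer shape; plain strings stand in for Cid and provide() stands in for discovery.provideBlock, so the names and sizes here are assumptions, not the patch's API.

import pkg/chronos

const Workers = 3                        # the patch defaults to 10 concurrent requests

proc provide(cid: string) {.async.} =
  ## stand-in for discovery.provideBlock(cid)
  echo "providing ", cid
  await sleepAsync(10.milliseconds)

proc worker(queue: AsyncQueue[string]) {.async.} =
  ## drains the queue forever, mirroring advertiseTaskRunner
  while true:
    let cid = await queue.get()          # suspends until a CID is queued
    try:
      await provide(cid)
    except CancelledError as exc:
      raise exc                          # let cancellation stop the worker
    except CatchableError as exc:
      echo "provide failed: ", exc.msg   # log and keep the worker alive

proc main() {.async.} =
  let queue = newAsyncQueue[string](100) # bounded, like DefaultTaskQueueSize
  var workers: seq[Future[void]]
  for _ in 0 ..< Workers:
    workers.add worker(queue)
  for cid in ["cid-a", "cid-b", "cid-c", "cid-d"]:
    await queue.put(cid)                 # put() suspends when the queue is full
  await sleepAsync(50.milliseconds)      # give the workers time to drain
  for w in workers:
    await w.cancelAndWait()

waitFor main()

Because the queue is bounded (DefaultTaskQueueSize in the patch), put() applies back-pressure: the advertise and discovery loops suspend instead of building an unbounded backlog when the workers fall behind.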
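
The other structural change is that Discovery becomes a ref object of RootObj with base methods (findBlockProviders, provideBlock), so the MockDiscovery moved under tests/dagger/blockexc/discovery can override behaviour through dynamic dispatch instead of swapping proc variables. A minimal sketch of that dispatch pattern, assuming simplified types (a string in place of Cid, no DHT calls):

import pkg/chronos

type
  Discovery = ref object of RootObj
  MockDiscovery = ref object of Discovery
    provided: seq[string]              # records what the engine advertised

method provideBlock(d: Discovery, cid: string) {.async, base.} =
  discard                              # the real object talks to the DHT here

method provideBlock(d: MockDiscovery, cid: string) {.async.} =
  d.provided.add(cid)                  # tests only record the call

proc main() {.async.} =
  let disc: Discovery = MockDiscovery()
  await disc.provideBlock("cid-a")     # dynamic dispatch picks the mock
  echo MockDiscovery(disc).provided    # -> @["cid-a"]

waitFor main()

This is the reason the patch declares findBlockProviders and provideBlock with {.base.} in dagger/discovery.nim: the engine only ever calls the base interface, and the test suite substitutes behaviour by subclassing.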