From 259a9adcffec79ab7a11943f014f304bcf3dbef1 Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Tue, 19 Apr 2022 19:12:44 -0600 Subject: [PATCH] rework discovery with async queues --- dagger/blockexchange/engine.nim | 428 +++++++++++++++----------------- 1 file changed, 201 insertions(+), 227 deletions(-) diff --git a/dagger/blockexchange/engine.nim b/dagger/blockexchange/engine.nim index 7be6167e..a33e59ad 100644 --- a/dagger/blockexchange/engine.nim +++ b/dagger/blockexchange/engine.nim @@ -7,7 +7,8 @@ ## This file may not be copied, modified, or distributed except according to ## those terms. -import std/[sequtils, sets, tables, sugar] +import std/sequtils +import std/sets import pkg/chronos import pkg/chronicles @@ -15,7 +16,7 @@ import pkg/libp2p import ../stores/blockstore import ../blocktype as bt -import ../utils/asyncheapqueue +import ../utils import ../discovery import ./protobuf/blockexc @@ -32,31 +33,19 @@ logScope: topics = "dagger blockexc engine" const - DefaultBlockTimeout* = 5.minutes DefaultMaxPeersPerRequest* = 10 DefaultTaskQueueSize = 100 DefaultConcurrentTasks = 10 DefaultMaxRetries = 3 - - # Current advertisement is meant to be more efficient than - # correct, so blocks could be advertised more slowly than that - # Put some margin - BlockAdvertisementFrequency = 30.minutes + DefaultConcurrentDiscRequests = 10 + DefaultConcurrentAdvertRequests = 10 + DefaultDiscoveryTimeout = 1.minutes + DefaultMaxQueriedBlocksCache = 1000 type TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.} TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.} - BlockDiscovery* = ref object - discoveredProvider: AsyncEvent - discoveryLoop: Future[void] - toDiscover: Cid - treatedPeer: HashSet[PeerId] - inflightIWant: HashSet[PeerId] - gotIWantResponse: AsyncEvent - provides: seq[PeerId] - lastDhtQuery: Moment - BlockExcEngine* = ref object of RootObj localStore*: BlockStore # where we localStore blocks for this instance network*: BlockExcNetwork # network interface @@ -70,12 +59,15 @@ type peersPerRequest: int # max number of peers to request from wallet*: WalletRef # nitro wallet for micropayments pricing*: ?Pricing # optional bandwidth pricing - advertisedBlocks: seq[Cid] - advertisedIndex: int - advertisementFrequency: Duration - runningDiscoveries*: Table[Cid, BlockDiscovery] - blockAdded: AsyncEvent - discovery*: Discovery + discovery*: Discovery # Discovery interface + concurrentAdvReqs: int # Concurent advertise requests + advertiseLoop*: Future[void] # Advertise loop task handle + advertiseQueue*: AsyncQueue[Cid] # Advertise queue + advertiseTasks*: seq[Future[void]] # Advertise tasks + concurrentDiscReqs: int # Concurent discovery requests + discoveryLoop*: Future[void] # Discovery loop task handle + discoveryTasks*: seq[Future[void]] # Discovery tasks + discoveryQueue*: AsyncQueue[Cid] # Discovery queue Pricing* = object address*: EthAddress @@ -100,7 +92,76 @@ proc scheduleTask(b: BlockExcEngine, task: BlockExcPeerCtx): bool {.gcsafe} = b.taskQueue.pushOrUpdateNoWait(task).isOk() proc blockexcTaskRunner(b: BlockExcEngine): Future[void] {.gcsafe.} -proc advertiseLoop(b: BlockExcEngine): Future[void] {.gcsafe.} + +proc discoveryLoopRunner(b: BlockExcEngine) {.async.} = + while b.blockexcRunning: + for cid in toSeq(b.pendingBlocks.wantList): + try: + await b.discoveryQueue.put(cid) + except CatchableError as exc: + trace "Exception in discovery loop", exc = exc.msg + + trace "About to sleep, number of wanted blocks", wanted = b.pendingBlocks.len + await sleepAsync(30.seconds) + +proc advertiseLoopRunner*(b: BlockExcEngine) {.async.} = + proc onBlock(blk: bt.Block) {.async.} = + try: + await b.advertiseQueue.put(blk.cid) + except CatchableError as exc: + trace "Exception listing blocks", exc = exc.msg + + while b.blockexcRunning: + await b.localStore.listBlocks(onBlock) + await sleepAsync(30.seconds) + + trace "Exiting advertise task loop" + +proc advertiseTaskRunner(b: BlockExcEngine) {.async.} = + ## Run advertise tasks + ## + + while b.blockexcRunning: + try: + let cid = await b.advertiseQueue.get() + await b.discovery.provideBlock(cid) + except CatchableError as exc: + trace "Exception in advertise task runner", exc = exc.msg + + trace "Exiting advertise task runner" + +proc discoveryTaskRunner(b: BlockExcEngine) {.async.} = + ## Run discovery tasks + ## + + while b.blockexcRunning: + try: + let + cid = await b.discoveryQueue.get() + providers = await b.discovery + .findBlockProviders(cid) + .wait(DefaultDiscoveryTimeout) + + await allFuturesThrowing( + allFinished(providers.mapIt( b.network.dialPeer(it.data) ))) + except CatchableError as exc: + trace "Exception in discovery task runner", exc = exc.msg + + trace "Exiting discovery task runner" + +proc queueFindBlocksReq(b: BlockExcEngine, cids: seq[Cid]) {.async.} = + try: + for cid in cids: + await b.discoveryQueue.put(cid) + except CatchableError as exc: + trace "Exception queueing discovery request", exc = exc.msg + +proc queueProvideBlocksReq(b: BlockExcEngine, cids: seq[Cid]) {.async.} = + try: + for cid in cids: + await b.advertiseQueue.put(cid) + except CatchableError as exc: + trace "Exception queueing discovery request", exc = exc.msg proc start*(b: BlockExcEngine) {.async.} = ## Start the blockexc task @@ -116,13 +177,14 @@ proc start*(b: BlockExcEngine) {.async.} = for i in 0.. 0 + peers.keepItIf( + it != blockPeer + ) debug "Requesting block from peer", providerCount = discovery.provides.len, peer = discovery.provides[0], cid # request block b.network.request.sendWantList( - discovery.provides[0], + blockPeer.id, @[cid], wantType = WantType.wantBlock) # we want this remote to send us a block - #TODO substract the discovery time - return await blk.wait(timeout) + if peers.len == 0: + trace "Not enough peers to send want list to", cid = $cid + asyncSpawn b.queueFindBlocksReq(@[cid]) + return blk # no peers to send wants to + + # filter out the peer we've already requested from + let stop = min(peers.high, b.peersPerRequest) + trace "Sending want list requests to remaining peers", count = stop + 1 + for p in peers[0..stop]: + if cid notin p.peerHave: + # just send wants + b.network.request.sendWantList( + p.id, + @[cid], + wantType = WantType.wantHave) # we only want to know if the peer has the block + + return blk proc blockPresenceHandler*( b: BlockExcEngine, @@ -298,18 +293,25 @@ proc blockPresenceHandler*( ## Handle block presence ## + trace "Received presence update for peer", peer let peerCtx = b.getPeerCtx(peer) + if isNil(peerCtx): + return for blk in blocks: if presence =? Presence.init(blk): - if not isNil(peerCtx): - peerCtx.updatePresence(presence) - if presence.cid in b.runningDiscoveries: - let bd = b.runningDiscoveries[presence.cid] - if not presence.have: - bd.inflightIWant.excl(peer) - bd.treatedPeer.incl(peer) - bd.gotIWantResponse.fire() + peerCtx.updatePresence(presence) + + let + cids = toSeq(b.pendingBlocks.wantList).filterIt( + it in peerCtx.peerHave + ) + + if cids.len > 0: + b.network.request.sendWantList( + peerCtx.id, + cids, + wantType = WantType.wantBlock) # we want this remote to send us a block proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = trace "Schedule a task for new blocks" @@ -330,21 +332,11 @@ proc resolveBlocks*(b: BlockExcEngine, blocks: seq[bt.Block]) = ## and schedule any new task to be ran ## - trace "Resolving blocks" + trace "Resolving blocks", blocks = blocks.len - var gotNewBlocks = false - for bl in blocks: - if bl.cid notin b.advertisedBlocks: #TODO that's very slow, maybe a ordered hashset instead - #TODO could do some smarter ordering here (insert it just before b.advertisedIndex, or similar) - b.advertisedBlocks.add(bl.cid) - asyncSpawn b.discovery.publishProvide(bl.cid) - gotNewBlocks = true - - if gotNewBlocks: - b.pendingBlocks.resolve(blocks) - b.scheduleTasks(blocks) - - b.blockAdded.fire() + b.pendingBlocks.resolve(blocks) + b.scheduleTasks(blocks) + asyncCheck b.queueProvideBlocksReq(blocks.mapIt( it.cid )) proc payForBlocks(engine: BlockExcEngine, peer: BlockExcPeerCtx, @@ -420,14 +412,20 @@ proc wantListHandler*( if not b.scheduleTask(peerCtx): trace "Unable to schedule task for peer", peer -proc accountHandler*(engine: BlockExcEngine, peer: PeerID, account: Account) {.async.} = +proc accountHandler*( + engine: BlockExcEngine, + peer: PeerID, + account: Account) {.async.} = let context = engine.getPeerCtx(peer) if context.isNil: return context.account = account.some -proc paymentHandler*(engine: BlockExcEngine, peer: PeerId, payment: SignedState) {.async.} = +proc paymentHandler*( + engine: BlockExcEngine, + peer: PeerId, + payment: SignedState) {.async.} = without context =? engine.getPeerCtx(peer).option and account =? context.account: return @@ -450,13 +448,8 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) = )) # broadcast our want list, the other peer will do the same - let wantList = collect(newSeqOfCap(b.runningDiscoveries.len)): - for cid, bd in b.runningDiscoveries: - bd.inflightIWant.incl(peer) - cid - - if wantList.len > 0: - b.network.request.sendWantList(peer, wantList, full = true, sendDontHave = true) + if b.pendingBlocks.len > 0: + b.network.request.sendWantList(peer, toSeq(b.pendingBlocks.wantList), full = true) if address =? b.pricing.?address: b.network.request.sendAccount(peer, Account(address: address)) @@ -470,31 +463,6 @@ proc dropPeer*(b: BlockExcEngine, peer: PeerID) = # drop the peer from the peers table b.peers.keepItIf( it.id != peer ) -proc advertiseLoop(b: BlockExcEngine) {.async, gcsafe.} = - while true: - if b.advertisedIndex >= b.advertisedBlocks.len: - b.advertisedIndex = 0 - b.advertisementFrequency = BlockAdvertisementFrequency - - # check that we still have this block. - while - b.advertisedIndex < b.advertisedBlocks.len and - not(b.localStore.contains(b.advertisedBlocks[b.advertisedIndex])): - b.advertisedBlocks.delete(b.advertisedIndex) - - #publish it - if b.advertisedIndex < b.advertisedBlocks.len: - asyncSpawn b.discovery.publishProvide(b.advertisedBlocks[b.advertisedIndex]) - - inc b.advertisedIndex - let toSleep = - if b.advertisedBlocks.len > 0: - b.advertisementFrequency div b.advertisedBlocks.len - else: - 30.minutes - await sleepAsync(toSleep) or b.blockAdded.wait() - b.blockAdded.clear() - proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = trace "Handling task for peer", peer = task.id @@ -516,6 +484,7 @@ proc taskHandler*(b: BlockExcEngine, task: BlockExcPeerCtx) {.gcsafe, async.} = .mapIt(!it.read) if blocks.len > 0: + trace "Sending blocks to peer", peer = task.id, blocks = blocks.len b.network.request.sendBlocks( task.id, blocks) @@ -558,19 +527,25 @@ proc new*( discovery: Discovery, concurrentTasks = DefaultConcurrentTasks, maxRetries = DefaultMaxRetries, - peersPerRequest = DefaultMaxPeersPerRequest): T = + peersPerRequest = DefaultMaxPeersPerRequest, + concurrentAdvReqs = DefaultConcurrentAdvertRequests, + concurrentDiscReqs = DefaultConcurrentDiscRequests): T = - let engine = BlockExcEngine( - localStore: localStore, - pendingBlocks: PendingBlocksManager.new(), - blockAdded: newAsyncEvent(), - peersPerRequest: peersPerRequest, - network: network, - wallet: wallet, - concurrentTasks: concurrentTasks, - maxRetries: maxRetries, - discovery: discovery, - taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize)) + let + engine = BlockExcEngine( + localStore: localStore, + pendingBlocks: PendingBlocksManager.new(), + peersPerRequest: peersPerRequest, + network: network, + wallet: wallet, + concurrentTasks: concurrentTasks, + concurrentAdvReqs: concurrentAdvReqs, + concurrentDiscReqs: concurrentDiscReqs, + maxRetries: maxRetries, + taskQueue: newAsyncHeapQueue[BlockExcPeerCtx](DefaultTaskQueueSize), + discovery: discovery, + advertiseQueue: newAsyncQueue[Cid](DefaultTaskQueueSize), + discoveryQUeue: newAsyncQueue[Cid](DefaultTaskQueueSize)) proc peerEventHandler(peerId: PeerID, event: PeerEvent) {.async.} = if event.kind == PeerEventKind.Joined: @@ -608,7 +583,6 @@ proc new*( onBlocks: blocksHandler, onPresence: blockPresenceHandler, onAccount: accountHandler, - onPayment: paymentHandler - ) + onPayment: paymentHandler) return engine