diff --git a/dagger/blockexchange/engine.nim b/dagger/blockexchange/engine.nim index 7be6167e..6bd30ef1 100644 --- a/dagger/blockexchange/engine.nim +++ b/dagger/blockexchange/engine.nim @@ -49,13 +49,12 @@ type BlockDiscovery* = ref object discoveredProvider: AsyncEvent + running: AsyncEvent discoveryLoop: Future[void] toDiscover: Cid - treatedPeer: HashSet[PeerId] inflightIWant: HashSet[PeerId] gotIWantResponse: AsyncEvent provides: seq[PeerId] - lastDhtQuery: Moment BlockExcEngine* = ref object of RootObj localStore*: BlockStore # where we localStore blocks for this instance @@ -147,57 +146,50 @@ proc stop*(b: BlockExcEngine) {.async.} = trace "NetworkStore stopped" -proc discoverOnDht(b: BlockExcEngine, bd: BlockDiscovery) {.async.} = - bd.lastDhtQuery = Moment.fromNow(10.hours) - defer: bd.lastDhtQuery = Moment.now() - - let discoveredProviders = await b.discovery.findBlockProviders(bd.toDiscover) - - for peer in discoveredProviders: - asyncSpawn b.network.dialPeer(peer.data) - -proc discoverLoop(b: BlockExcEngine, bd: BlockDiscovery) {.async.} = - # First, try connected peers - # After a percent of peers declined, or a timeout passed, query DHT - # rinse & repeat - # - # TODO add a global timeout - - debug "starting block discovery", cid=bd.toDiscover - - bd.gotIWantResponse.fire() - while true: - # wait for iwant replies +proc lowInflight(bd: BlockDiscovery) {.async.} = + while bd.inflightIWant.len > 3: await bd.gotIWantResponse.wait() bd.gotIWantResponse.clear() - var foundPeerNew = false - for p in b.peers: - if bd.toDiscover in p.peerHave and p.id notin bd.treatedPeer: - bd.provides.add(p.id) - bd.treatedPeer.incl(p.id) - bd.inflightIWant.excl(p.id) - foundPeerNew = true +proc discoveredPeer(bd: BlockDiscovery, peer: PeerId, hasBlock: bool) = + bd.inflightIWant.excl(peer) + if hasBlock and peer notin bd.provides: + bd.provides.add(peer) + bd.discoveredProvider.fire() + bd.running.clear() + bd.gotIWantResponse.fire() - if foundPeerNew: - bd.discoveredProvider.fire() - continue +proc resume(bd: BlockDiscovery) = bd.running.fire() - trace "asking peers", cid=bd.toDiscover, peers=b.peers.len, treated=bd.treatedPeer.len, inflight=bd.inflightIWant.len - for p in b.peers: - if p.id notin bd.treatedPeer and p.id notin bd.inflightIWant: - # just send wants - bd.inflightIWant.incl(p.id) - b.network.request.sendWantList( - p.id, - @[bd.toDiscover], - wantType = WantType.wantHave, - sendDontHave = true) +proc discoverLoop(b: BlockExcEngine, bd: BlockDiscovery) {.async.} = + debug "starting block discovery", cid=bd.toDiscover - if bd.inflightIWant.len < 3 and #TODO or a timeout - bd.lastDhtQuery < Moment.now() - 5.seconds: - #start query - asyncSpawn b.discoverOnDht(bd) + # Check current peers + for p in b.peers: + if bd.toDiscover in p.peerHave: + bd.provides.add(p.id) + else: + b.network.request.sendWantList( + p.id, + @[bd.toDiscover], + wantType = WantType.wantHave, + sendDontHave = true) + bd.inflightIWant.incl(p.id) + + if bd.provides.len > 0: + bd.discoveredProvider.fire() + bd.running.clear() + + await bd.lowInflight() or sleepAsync(500.milliseconds) + + while true: + await bd.running.wait() + debug "asking peers", cid=bd.toDiscover + #start query + let discoveredProviders = await b.discovery.findBlockProviders(bd.toDiscover) + for peer in discoveredProviders: + asyncSpawn b.network.dialPeer(peer.data) + await sleepAsync(30.seconds) proc discoverBlock*(b: BlockExcEngine, cid: Cid): BlockDiscovery = if cid in b.runningDiscoveries: @@ -207,7 +199,9 @@ proc discoverBlock*(b: BlockExcEngine, cid: Cid): BlockDiscovery = toDiscover: cid, discoveredProvider: newAsyncEvent(), gotIWantResponse: newAsyncEvent(), + running: newAsyncEvent(), ) + result.running.fire() result.discoveryLoop = b.discoverLoop(result) b.runningDiscoveries[cid] = result return result @@ -217,6 +211,52 @@ proc stopDiscovery(b: BlockExcEngine, cid: Cid) = b.runningDiscoveries[cid].discoveryLoop.cancel() b.runningDiscoveries.del(cid) +proc blockGetter( + b: BlockExcEngine, + cid: Cid) {.async.} = + let + timeoutFut = sleepAsync(DefaultBlockTimeout) + blk = b.pendingBlocks.addOrAwait(cid) + discovery = b.discoverBlock(cid) + + defer: b.stopDiscovery(cid) + + # TODO handle cancellation + # need to count how much people are waiting for the pending + # block, and stop this if it hit 0 + + while not blk.finished: + await timeoutFut or blk or discovery.discoveredProvider.wait() + + if timeoutFut.finished: + # Timeout passed, fail the block + blk.cancel() + return + + if blk.finished: + # a peer sent us the block out of the blue + return + + # We got a provider + # In reality, we could keep discovering until we find a suitable price, etc + while discovery.provides.len > 0: + let provider = discovery.provides.pop() + + debug "Requesting block from peer", providerCount = discovery.provides.len + 1, + peer = provider, cid + # request block + b.network.request.sendWantList( + provider, + @[cid], + wantType = WantType.wantBlock) # we want this remote to send us a block + + if await blk.withTimeout(1.seconds): + # got the block + return + + # No discovered peer sent us the block, restart discovery + discovery.resume() + proc requestBlock*( b: BlockExcEngine, cid: Cid, @@ -237,59 +277,11 @@ proc requestBlock*( # be careful, don't give back control to main loop here # otherwise, the block might slip in - if cid in b.pendingBlocks: - return await b.pendingBlocks.blocks[cid].wait(timeout) + if cid notin b.pendingBlocks: + # We are the first one to request this block, so we handle it + asyncSpawn b.blockGetter(cid) - # We are the first one to request this block, so we handle it - let - timeoutFut = sleepAsync(timeout) - blk = b.pendingBlocks.addOrAwait(cid) - discovery = b.discoverBlock(cid) - - # Just take the first discovered peer - try: - await timeoutFut or blk or discovery.discoveredProvider.wait() - discovery.discoveredProvider.clear() - except CancelledError as exc: - #TODO also wrong, same issue as below - blk.cancel() - b.stopDiscovery(cid) - raise exc - - if timeoutFut.finished: - # TODO this is wrong, because other user may rely on us - # to handle this block. This proc should be asyncSpawned - # - # Other people may be using the discovery or blk - # so don't kill them - blk.cancel() - b.stopDiscovery(cid) - raise newException(AsyncTimeoutError, "") - - if blk.finished: - # a peer sent us the block out of the blue, why not - b.stopDiscovery(cid) - return await blk - - # We got a provider - # Currently, we just ask him for the block, and hope he gives it to us - # - # In reality, we could keep discovering until we find a suitable price, etc - b.stopDiscovery(cid) - timeoutFut.cancel() - - assert discovery.provides.len > 0 - - debug "Requesting block from peer", providerCount = discovery.provides.len, - peer = discovery.provides[0], cid - # request block - b.network.request.sendWantList( - discovery.provides[0], - @[cid], - wantType = WantType.wantBlock) # we want this remote to send us a block - - #TODO substract the discovery time - return await blk.wait(timeout) + return await b.pendingBlocks.addOrAwait(cid).wait(timeout) proc blockPresenceHandler*( b: BlockExcEngine, @@ -306,10 +298,7 @@ proc blockPresenceHandler*( peerCtx.updatePresence(presence) if presence.cid in b.runningDiscoveries: let bd = b.runningDiscoveries[presence.cid] - if not presence.have: - bd.inflightIWant.excl(peer) - bd.treatedPeer.incl(peer) - bd.gotIWantResponse.fire() + bd.discoveredPeer(peer, presence.have) proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) = trace "Schedule a task for new blocks" @@ -456,7 +445,7 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) = cid if wantList.len > 0: - b.network.request.sendWantList(peer, wantList, full = true, sendDontHave = true) + b.network.request.sendWantList(peer, wantList, full = true) if address =? b.pricing.?address: b.network.request.sendAccount(peer, Account(address: address)) diff --git a/tests/dagger/blockexc/testblockexc.nim b/tests/dagger/blockexc/testblockexc.nim index 361c772f..736abd7f 100644 --- a/tests/dagger/blockexc/testblockexc.nim +++ b/tests/dagger/blockexc/testblockexc.nim @@ -280,7 +280,7 @@ suite "NetworkStore - discovery": blocks.add(bt.Block.new(chunk).tryGet()) - for e in generateNodes(4): + for e in generateNodes(5): switch.add(e.switch) blockexc.add(e.blockexc) await e.blockexc.engine.start() @@ -299,7 +299,7 @@ suite "NetworkStore - discovery": test "Shouldn't launch discovery request if we are already connected": await blockexc[0].engine.blocksHandler(switch[1].peerInfo.peerId, blocks) - blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] = + blockexc[1].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] = check false await connectNodes(switch) let blk = await blockexc[1].engine.requestBlock(blocks[0].cid)