Cleanup discovery

This commit is contained in:
Tanguy 2022-04-21 13:26:22 +02:00
parent ce59dbd4a2
commit 9eb3b61c6e
No known key found for this signature in database
GPG Key ID: 7DD8EC6B6CE6C45E
2 changed files with 95 additions and 106 deletions

View File

@ -49,13 +49,12 @@ type
BlockDiscovery* = ref object
discoveredProvider: AsyncEvent
running: AsyncEvent
discoveryLoop: Future[void]
toDiscover: Cid
treatedPeer: HashSet[PeerId]
inflightIWant: HashSet[PeerId]
gotIWantResponse: AsyncEvent
provides: seq[PeerId]
lastDhtQuery: Moment
BlockExcEngine* = ref object of RootObj
localStore*: BlockStore # where we localStore blocks for this instance
@ -147,57 +146,50 @@ proc stop*(b: BlockExcEngine) {.async.} =
trace "NetworkStore stopped"
proc discoverOnDht(b: BlockExcEngine, bd: BlockDiscovery) {.async.} =
bd.lastDhtQuery = Moment.fromNow(10.hours)
defer: bd.lastDhtQuery = Moment.now()
let discoveredProviders = await b.discovery.findBlockProviders(bd.toDiscover)
for peer in discoveredProviders:
asyncSpawn b.network.dialPeer(peer.data)
proc discoverLoop(b: BlockExcEngine, bd: BlockDiscovery) {.async.} =
# First, try connected peers
# After a percent of peers declined, or a timeout passed, query DHT
# rinse & repeat
#
# TODO add a global timeout
debug "starting block discovery", cid=bd.toDiscover
bd.gotIWantResponse.fire()
while true:
# wait for iwant replies
proc lowInflight(bd: BlockDiscovery) {.async.} =
while bd.inflightIWant.len > 3:
await bd.gotIWantResponse.wait()
bd.gotIWantResponse.clear()
var foundPeerNew = false
for p in b.peers:
if bd.toDiscover in p.peerHave and p.id notin bd.treatedPeer:
bd.provides.add(p.id)
bd.treatedPeer.incl(p.id)
bd.inflightIWant.excl(p.id)
foundPeerNew = true
proc discoveredPeer(bd: BlockDiscovery, peer: PeerId, hasBlock: bool) =
bd.inflightIWant.excl(peer)
if hasBlock and peer notin bd.provides:
bd.provides.add(peer)
bd.discoveredProvider.fire()
bd.running.clear()
bd.gotIWantResponse.fire()
if foundPeerNew:
bd.discoveredProvider.fire()
continue
proc resume(bd: BlockDiscovery) = bd.running.fire()
trace "asking peers", cid=bd.toDiscover, peers=b.peers.len, treated=bd.treatedPeer.len, inflight=bd.inflightIWant.len
for p in b.peers:
if p.id notin bd.treatedPeer and p.id notin bd.inflightIWant:
# just send wants
bd.inflightIWant.incl(p.id)
b.network.request.sendWantList(
p.id,
@[bd.toDiscover],
wantType = WantType.wantHave,
sendDontHave = true)
proc discoverLoop(b: BlockExcEngine, bd: BlockDiscovery) {.async.} =
debug "starting block discovery", cid=bd.toDiscover
if bd.inflightIWant.len < 3 and #TODO or a timeout
bd.lastDhtQuery < Moment.now() - 5.seconds:
#start query
asyncSpawn b.discoverOnDht(bd)
# Check current peers
for p in b.peers:
if bd.toDiscover in p.peerHave:
bd.provides.add(p.id)
else:
b.network.request.sendWantList(
p.id,
@[bd.toDiscover],
wantType = WantType.wantHave,
sendDontHave = true)
bd.inflightIWant.incl(p.id)
if bd.provides.len > 0:
bd.discoveredProvider.fire()
bd.running.clear()
await bd.lowInflight() or sleepAsync(500.milliseconds)
while true:
await bd.running.wait()
debug "asking peers", cid=bd.toDiscover
#start query
let discoveredProviders = await b.discovery.findBlockProviders(bd.toDiscover)
for peer in discoveredProviders:
asyncSpawn b.network.dialPeer(peer.data)
await sleepAsync(30.seconds)
proc discoverBlock*(b: BlockExcEngine, cid: Cid): BlockDiscovery =
if cid in b.runningDiscoveries:
@ -207,7 +199,9 @@ proc discoverBlock*(b: BlockExcEngine, cid: Cid): BlockDiscovery =
toDiscover: cid,
discoveredProvider: newAsyncEvent(),
gotIWantResponse: newAsyncEvent(),
running: newAsyncEvent(),
)
result.running.fire()
result.discoveryLoop = b.discoverLoop(result)
b.runningDiscoveries[cid] = result
return result
@ -217,6 +211,52 @@ proc stopDiscovery(b: BlockExcEngine, cid: Cid) =
b.runningDiscoveries[cid].discoveryLoop.cancel()
b.runningDiscoveries.del(cid)
proc blockGetter(
b: BlockExcEngine,
cid: Cid) {.async.} =
let
timeoutFut = sleepAsync(DefaultBlockTimeout)
blk = b.pendingBlocks.addOrAwait(cid)
discovery = b.discoverBlock(cid)
defer: b.stopDiscovery(cid)
# TODO handle cancellation
# need to count how much people are waiting for the pending
# block, and stop this if it hit 0
while not blk.finished:
await timeoutFut or blk or discovery.discoveredProvider.wait()
if timeoutFut.finished:
# Timeout passed, fail the block
blk.cancel()
return
if blk.finished:
# a peer sent us the block out of the blue
return
# We got a provider
# In reality, we could keep discovering until we find a suitable price, etc
while discovery.provides.len > 0:
let provider = discovery.provides.pop()
debug "Requesting block from peer", providerCount = discovery.provides.len + 1,
peer = provider, cid
# request block
b.network.request.sendWantList(
provider,
@[cid],
wantType = WantType.wantBlock) # we want this remote to send us a block
if await blk.withTimeout(1.seconds):
# got the block
return
# No discovered peer sent us the block, restart discovery
discovery.resume()
proc requestBlock*(
b: BlockExcEngine,
cid: Cid,
@ -237,59 +277,11 @@ proc requestBlock*(
# be careful, don't give back control to main loop here
# otherwise, the block might slip in
if cid in b.pendingBlocks:
return await b.pendingBlocks.blocks[cid].wait(timeout)
if cid notin b.pendingBlocks:
# We are the first one to request this block, so we handle it
asyncSpawn b.blockGetter(cid)
# We are the first one to request this block, so we handle it
let
timeoutFut = sleepAsync(timeout)
blk = b.pendingBlocks.addOrAwait(cid)
discovery = b.discoverBlock(cid)
# Just take the first discovered peer
try:
await timeoutFut or blk or discovery.discoveredProvider.wait()
discovery.discoveredProvider.clear()
except CancelledError as exc:
#TODO also wrong, same issue as below
blk.cancel()
b.stopDiscovery(cid)
raise exc
if timeoutFut.finished:
# TODO this is wrong, because other user may rely on us
# to handle this block. This proc should be asyncSpawned
#
# Other people may be using the discovery or blk
# so don't kill them
blk.cancel()
b.stopDiscovery(cid)
raise newException(AsyncTimeoutError, "")
if blk.finished:
# a peer sent us the block out of the blue, why not
b.stopDiscovery(cid)
return await blk
# We got a provider
# Currently, we just ask him for the block, and hope he gives it to us
#
# In reality, we could keep discovering until we find a suitable price, etc
b.stopDiscovery(cid)
timeoutFut.cancel()
assert discovery.provides.len > 0
debug "Requesting block from peer", providerCount = discovery.provides.len,
peer = discovery.provides[0], cid
# request block
b.network.request.sendWantList(
discovery.provides[0],
@[cid],
wantType = WantType.wantBlock) # we want this remote to send us a block
#TODO substract the discovery time
return await blk.wait(timeout)
return await b.pendingBlocks.addOrAwait(cid).wait(timeout)
proc blockPresenceHandler*(
b: BlockExcEngine,
@ -306,10 +298,7 @@ proc blockPresenceHandler*(
peerCtx.updatePresence(presence)
if presence.cid in b.runningDiscoveries:
let bd = b.runningDiscoveries[presence.cid]
if not presence.have:
bd.inflightIWant.excl(peer)
bd.treatedPeer.incl(peer)
bd.gotIWantResponse.fire()
bd.discoveredPeer(peer, presence.have)
proc scheduleTasks(b: BlockExcEngine, blocks: seq[bt.Block]) =
trace "Schedule a task for new blocks"
@ -456,7 +445,7 @@ proc setupPeer*(b: BlockExcEngine, peer: PeerID) =
cid
if wantList.len > 0:
b.network.request.sendWantList(peer, wantList, full = true, sendDontHave = true)
b.network.request.sendWantList(peer, wantList, full = true)
if address =? b.pricing.?address:
b.network.request.sendAccount(peer, Account(address: address))

View File

@ -280,7 +280,7 @@ suite "NetworkStore - discovery":
blocks.add(bt.Block.new(chunk).tryGet())
for e in generateNodes(4):
for e in generateNodes(5):
switch.add(e.switch)
blockexc.add(e.blockexc)
await e.blockexc.engine.start()
@ -299,7 +299,7 @@ suite "NetworkStore - discovery":
test "Shouldn't launch discovery request if we are already connected":
await blockexc[0].engine.blocksHandler(switch[1].peerInfo.peerId, blocks)
blockexc[0].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] =
blockexc[1].engine.discovery.findBlockProviders_var = proc(d: Discovery, cid: Cid): seq[SignedPeerRecord] =
check false
await connectNodes(switch)
let blk = await blockexc[1].engine.requestBlock(blocks[0].cid)