diff --git a/codex/blockexchange/engine/engine.nim b/codex/blockexchange/engine/engine.nim
index 4acc87ab..453dea4f 100644
--- a/codex/blockexchange/engine/engine.nim
+++ b/codex/blockexchange/engine/engine.nim
@@ -64,6 +64,10 @@ declareCounter(codex_block_exchange_blocks_sent, "codex blockexchange blocks sen
 declareCounter(
   codex_block_exchange_blocks_received, "codex blockexchange blocks received"
 )
+declareCounter(
+  codex_block_exchange_spurious_blocks_received,
+  "codex blockexchange unrequested/duplicate blocks received",
+)
 
 const
   DefaultMaxPeersPerRequest* = 10
@@ -80,13 +84,15 @@ const
 type
   TaskHandler* = proc(task: BlockExcPeerCtx): Future[void] {.gcsafe.}
   TaskScheduler* = proc(task: BlockExcPeerCtx): bool {.gcsafe.}
+  PeerSelector* =
+    proc(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx {.gcsafe, raises: [].}
 
   BlockExcEngine* = ref object of RootObj
     localStore*: BlockStore # Local block store for this instance
-    network*: BlockExcNetwork # Petwork interface
+    network*: BlockExcNetwork # Network interface
     peers*: PeerCtxStore # Peers we're currently actively exchanging with
     taskQueue*: AsyncHeapQueue[BlockExcPeerCtx]
-      # Peers we're currently processing tasks for
+    selectPeer*: PeerSelector # Strategy for picking the peer to request a block from
     concurrentTasks: int # Number of concurrent peers we're serving at any given time
     trackedFutures: TrackedFutures # Tracks futures of blockexc tasks
     blockexcRunning: bool # Indicates if the blockexc task is running
@@ -205,8 +211,19 @@ proc searchForNewPeers(self: BlockExcEngine, cid: Cid) =
   else:
     trace "Not searching for new peers, rate limit not expired", cid = cid
 
-proc randomPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
-  Rng.instance.sample(peers)
+proc evictPeer(self: BlockExcEngine, peer: PeerId) =
+  ## Cleanup disconnected peer
+  ##
+
+  trace "Evicting disconnected/departed peer", peer
+
+  let peerCtx = self.peers.get(peer)
+  if not peerCtx.isNil:
+    for address in peerCtx.blocksRequested:
+      self.pendingBlocks.clearRequest(address, peer.some)
+
+  # drop the peer from the peers table
+  self.peers.remove(peer)
 
 proc downloadInternal(
     self: BlockExcEngine, address: BlockAddress
@@ -245,23 +262,34 @@ proc downloadInternal(
       # control what happens and how many neighbors we get like this.
       self.searchForNewPeers(address.cidOrTreeCid)
 
-      # We wait for a bit and then retry. Since we've shipped our wantlists to
-      # connected peers, they might reply and we might request the block in the
-      # meantime as part of `blockPresenceHandler`.
+      # We now wait for a bit and then retry. If the handle gets completed in the
+      # meantime (the presence handler might have requested and received the
+      # block while we waited), we are done.
       await handle or sleepAsync(self.pendingBlocks.retryInterval)
 
-      # If we already got the block, we're done. Otherwise, we'll go for another
-      # cycle, potentially refreshing knowledge on peers again, and looking up
-      # the DHT again.
       if handle.finished:
         break
+      # If we still don't have the block, we'll go for another cycle.
       trace "No peers for block, will retry shortly"
       continue
 
-    let scheduledPeer = peers.with.randomPeer
-    self.pendingBlocks.setInFlight(address, true)
-    scheduledPeer.blockRequested(address)
-    await self.sendWantBlock(@[address], scheduledPeer)
+    # Once again, it might happen that the block was requested from a peer
+    # in the meantime. If so, we don't need to do anything. Otherwise,
+    # we'll be the ones placing the request.
+    let scheduledPeer =
+      if not self.pendingBlocks.isRequested(address):
+        let peer = self.selectPeer(peers.with)
+        self.pendingBlocks.markRequested(address, peer.id)
+        peer.blockRequested(address)
+        trace "Request block from block retry loop"
+        await self.sendWantBlock(@[address], peer)
+        peer
+      else:
+        let peerId = self.pendingBlocks.getRequestPeer(address).get()
+        self.peers.get(peerId)
+    assert not scheduledPeer.isNil
+
+    # Parks until either the block is received, or the peer times out.
     let activityTimer = scheduledPeer.activityTimer()
     await handle or activityTimer # TODO: or peerDropped
     activityTimer.cancel()
@@ -275,8 +303,11 @@ proc downloadInternal(
         break
       else:
         # If the peer timed out, retries immediately.
-        trace "Dropping timed out peer.", peer = scheduledPeer.id
-        # TODO: disconnect peer
+        trace "Peer timed out during block request", peer = scheduledPeer.id
+        await self.network.dropPeer(scheduledPeer.id)
+        # Evict the peer immediately, or we may end up picking it again on the
+        # next retry.
+        self.evictPeer(scheduledPeer.id)
   except CancelledError as exc:
     trace "Block download cancelled"
     if not handle.finished:
@@ -286,7 +317,7 @@ proc downloadInternal(
     if not handle.finished:
       handle.fail(exc)
   finally:
-    self.pendingBlocks.setInFlight(address, false)
+    self.pendingBlocks.clearRequest(address)
 
 proc requestBlocks*(
     self: BlockExcEngine, addresses: seq[BlockAddress]
@@ -375,12 +406,12 @@ proc blockPresenceHandler*(
   let ourWantCids = ourWantList.filterIt(
     it in peerHave and not self.pendingBlocks.retriesExhausted(it) and
-      not self.pendingBlocks.isInFlight(it)
+      not self.pendingBlocks.isRequested(it)
   ).toSeq
 
   for address in ourWantCids:
-    self.pendingBlocks.setInFlight(address, true)
     self.pendingBlocks.decRetries(address)
+    self.pendingBlocks.markRequested(address, peer)
     peerCtx.blockRequested(address)
 
   if ourWantCids.len > 0:
@@ -388,6 +419,8 @@ proc blockPresenceHandler*(
     # FIXME: this will result in duplicate requests for blocks
     if err =? catch(await self.sendWantBlock(ourWantCids, peerCtx)).errorOption:
       warn "Failed to send wantBlock to peer", peer, err = err.msg
+      for address in ourWantCids:
+        self.pendingBlocks.clearRequest(address, peer.some)
 
 proc scheduleTasks(
     self: BlockExcEngine, blocksDelivery: seq[BlockDelivery]
@@ -417,18 +450,17 @@ proc cancelBlocks(
   ## Tells neighboring peers that we're no longer interested in a block.
   ##
 
-  let addrSet = toHashSet(addrs)
-  var pendingCancellations: Table[PeerId, HashSet[BlockAddress]]
+  let blocksDelivered = toHashSet(addrs)
+  var scheduledCancellations: Table[PeerId, HashSet[BlockAddress]]
 
   if self.peers.len == 0:
     return
 
-  trace "Sending block request cancellations to peers",
-    addrs, peers = self.peers.peerIds
-
-  proc processPeer(
+  proc dispatchCancellations(
     entry: tuple[peerId: PeerId, addresses: HashSet[BlockAddress]]
   ): Future[PeerId] {.async: (raises: [CancelledError]).} =
+    trace "Sending block request cancellations to peer",
+      peer = entry.peerId, addresses = entry.addresses.len
     await self.network.request.sendWantCancellations(
       peer = entry.peerId, addresses = entry.addresses.toSeq
     )
@@ -437,21 +469,22 @@ proc cancelBlocks(
   try:
     for peerCtx in self.peers.peers.values:
-      # Have we requested any of the blocks we're cancelling to this peer?
-      let intersection = peerCtx.blocksRequested.intersection(addrSet)
+      # Do we have pending requests to this peer for any of the blocks that
+      # were just delivered?
+      let intersection = peerCtx.blocksRequested.intersection(blocksDelivered)
       if intersection.len > 0:
-        pendingCancellations[peerCtx.id] = intersection
+        # If so, schedule a cancellation.
+        scheduledCancellations[peerCtx.id] = intersection
 
-    # If so, dispatches cancellations.
     let (succeededFuts, failedFuts) = await allFinishedFailed[PeerId](
-      toSeq(pendingCancellations.pairs).map(processPeer)
+      toSeq(scheduledCancellations.pairs).map(dispatchCancellations)
     )
 
     (await allFinished(succeededFuts)).mapIt(it.read).apply do(peerId: PeerId):
       let ctx = self.peers.get(peerId)
       if not ctx.isNil:
        ctx.cleanPresence(addrs)
-        for address in pendingCancellations[peerId]:
+        for address in scheduledCancellations[peerId]:
          ctx.blockRequestCancelled(address)
 
    if failedFuts.len > 0:
@@ -535,6 +568,12 @@ proc blocksDeliveryHandler*(
       address = bd.address
 
     try:
+      # Unknown peers and unrequested blocks are dropped with a warning.
+      if peerCtx == nil or not peerCtx.blockReceived(bd.address):
+        warn "Dropping unrequested or duplicate block received from peer"
+        codex_block_exchange_spurious_blocks_received.inc()
+        continue
+
       if err =? self.validateBlockDelivery(bd).errorOption:
         warn "Block validation failed", msg = err.msg
         continue
@@ -554,9 +593,6 @@ proc blocksDeliveryHandler*(
       ).errorOption:
         warn "Unable to store proof and cid for a block"
         continue
-
-      if peerCtx != nil:
-        peerCtx.blockReceived(bd.address)
     except CatchableError as exc:
       warn "Error handling block delivery", error = exc.msg
       continue
@@ -681,7 +717,7 @@ proc paymentHandler*(
   else:
     context.paymentChannel = self.wallet.acceptChannel(payment).option
 
-proc setupPeer*(
+proc peerAddedHandler*(
     self: BlockExcEngine, peer: PeerId
 ) {.async: (raises: [CancelledError]).} =
   ## Perform initial setup, such as want
@@ -701,15 +737,6 @@ proc setupPeer*(
     trace "Sending account to peer", peer
     await self.network.request.sendAccount(peer, Account(address: address))
 
-proc dropPeer*(self: BlockExcEngine, peer: PeerId) {.raises: [].} =
-  ## Cleanup disconnected peer
-  ##
-
-  trace "Dropping peer", peer
-
-  # drop the peer from the peers table
-  self.peers.remove(peer)
-
 proc localLookup(
     self: BlockExcEngine, address: BlockAddress
 ): Future[?!BlockDelivery] {.async: (raises: [CancelledError]).} =
@@ -775,7 +802,7 @@ proc taskHandler*(
       peerCtx.wantedBlocks.keepItIf(it notin sent)
   finally:
     # Better safe than sorry: if an exception does happen, we don't want to keep
-    # those in flight as it'll effectively prevent the blocks from ever being sent.
+    # those as sent, as it'll effectively prevent the blocks from ever being sent again.
     peerCtx.blocksSent.keepItIf(it notin wantedBlocks)
 
 proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
@@ -792,6 +819,9 @@ proc blockexcTaskRunner(self: BlockExcEngine) {.async: (raises: []).} =
 
   info "Exiting blockexc task runner"
 
+proc selectRandom*(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
+  Rng.instance.sample(peers)
+
 proc new*(
     T: type BlockExcEngine,
     localStore: BlockStore,
@@ -803,6 +833,7 @@ proc new*(
     pendingBlocks: PendingBlocksManager,
     maxBlocksPerMessage = DefaultMaxBlocksPerMessage,
     concurrentTasks = DefaultConcurrentTasks,
+    selectPeer: PeerSelector = selectRandom,
 ): BlockExcEngine =
   ## Create new block exchange engine instance
   ##
@@ -821,18 +852,6 @@ proc new*(
       advertiser: advertiser,
     )
 
-  proc peerEventHandler(
-      peerId: PeerId, event: PeerEvent
-  ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
-    if event.kind == PeerEventKind.Joined:
-      await self.setupPeer(peerId)
-    else:
-      self.dropPeer(peerId)
-
-  if not isNil(network.switch):
-    network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
-    network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
-
   proc blockWantListHandler(
       peer: PeerId, wantList: WantList
   ): Future[void] {.async: (raises: []).} =
@@ -858,12 +877,24 @@ proc new*(
   ): Future[void] {.async: (raises: []).} =
     self.paymentHandler(peer, payment)
 
+  proc peerAddedHandler(
+      peer: PeerId
+  ): Future[void] {.async: (raises: [CancelledError]).} =
+    await self.peerAddedHandler(peer)
+
+  proc peerDepartedHandler(
+      peer: PeerId
+  ): Future[void] {.async: (raises: [CancelledError]).} =
+    self.evictPeer(peer)
+
   network.handlers = BlockExcHandlers(
     onWantList: blockWantListHandler,
     onBlocksDelivery: blocksDeliveryHandler,
     onPresence: blockPresenceHandler,
     onAccount: accountHandler,
     onPayment: paymentHandler,
+    onPeerJoined: peerAddedHandler,
+    onPeerDeparted: peerDepartedHandler,
   )
 
   return self
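The new PeerSelector hook makes the scheduling choice in downloadInternal pluggable: selectRandom preserves the old sampling behaviour, while callers can inject a different policy through the selectPeer parameter of BlockExcEngine.new. Below is a minimal sketch of an alternative selector; it assumes only the lastExchange field visible in peercontext.nim, and the constructor arguments other than selectPeer are placeholders for whatever the existing call site already passes.

proc selectMostRecentlyActive*(
    peers: seq[BlockExcPeerCtx]
): BlockExcPeerCtx {.gcsafe, raises: [].} =
  # Prefer the peer we exchanged blocks with most recently; downloadInternal
  # only invokes the selector with a non-empty candidate list.
  result = peers[0]
  for peer in peers:
    if peer.lastExchange > result.lastExchange:
      result = peer

let engine = BlockExcEngine.new(
  localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks,
  selectPeer = selectMostRecentlyActive,
)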
diff --git a/codex/blockexchange/engine/pendingblocks.nim b/codex/blockexchange/engine/pendingblocks.nim
index d78dda5a..5579a1fa 100644
--- a/codex/blockexchange/engine/pendingblocks.nim
+++ b/codex/blockexchange/engine/pendingblocks.nim
@@ -42,7 +42,7 @@ type
   BlockReq* = object
     handle*: BlockHandle
-    inFlight*: bool
+    requested*: ?PeerId
     blockRetries*: int
     startTime*: int64
 
@@ -56,7 +56,7 @@ proc updatePendingBlockGauge(p: PendingBlocksManager) =
   codex_block_exchange_pending_block_requests.set(p.blocks.len.int64)
 
 proc getWantHandle*(
-    self: PendingBlocksManager, address: BlockAddress, inFlight = false
+    self: PendingBlocksManager, address: BlockAddress, requested: ?PeerId = PeerId.none
 ): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} =
   ## Add an event for a block
   ##
@@ -66,7 +66,7 @@ proc getWantHandle*(
   do:
     let blk = BlockReq(
       handle: newFuture[Block]("pendingBlocks.getWantHandle"),
-      inFlight: inFlight,
+      requested: requested,
       blockRetries: self.blockRetries,
       startTime: getMonoTime().ticks,
     )
@@ -89,9 +89,9 @@ proc getWantHandle*(
   return handle
 
 proc getWantHandle*(
-    self: PendingBlocksManager, cid: Cid, inFlight = false
+    self: PendingBlocksManager, cid: Cid, requested: ?PeerId = PeerId.none
 ): Future[Block] {.async: (raw: true, raises: [CancelledError, RetriesExhaustedError]).} =
-  self.getWantHandle(BlockAddress.init(cid), inFlight)
+  self.getWantHandle(BlockAddress.init(cid), requested)
 
 proc resolve*(
     self: PendingBlocksManager, blocksDelivery: seq[BlockDelivery]
@@ -128,19 +128,37 @@ func retriesExhausted*(self: PendingBlocksManager, address: BlockAddress): bool
   self.blocks.withValue(address, pending):
     result = pending[].blockRetries <= 0
 
-func setInFlight*(self: PendingBlocksManager, address: BlockAddress, inFlight = true) =
-  ## Set inflight status for a block
+func isRequested*(self: PendingBlocksManager, address: BlockAddress): bool =
+  ## Check if a block has been requested from a peer
+  ##
+  result = false
+  self.blocks.withValue(address, pending):
+    result = pending[].requested.isSome
+
+func getRequestPeer*(self: PendingBlocksManager, address: BlockAddress): ?PeerId =
+  ## Returns the peer from which this block was requested
+  ##
+  result = PeerId.none
+  self.blocks.withValue(address, pending):
+    result = pending[].requested
+
+proc markRequested*(self: PendingBlocksManager, address: BlockAddress, peer: PeerId) =
+  ## Marks this block as having been requested from a peer
   ##
-  self.blocks.withValue(address, pending):
-    pending[].inFlight = inFlight
-
-func isInFlight*(self: PendingBlocksManager, address: BlockAddress): bool =
-  ## Check if a block is in flight
-  ##
+  if self.isRequested(address):
+    error "Attempt to request block twice", address = address, peer = peer
   self.blocks.withValue(address, pending):
-    result = pending[].inFlight
+    pending[].requested = peer.some
+
+proc clearRequest*(
+    self: PendingBlocksManager, address: BlockAddress, peer: ?PeerId = PeerId.none
+) =
+  self.blocks.withValue(address, pending):
+    if peer.isSome:
+      assert peer == pending[].requested
+    pending[].requested = PeerId.none
 
 func contains*(self: PendingBlocksManager, cid: Cid): bool =
   BlockAddress.init(cid) in self.blocks
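Replacing the boolean inFlight flag with the identity of the peer a block was requested from is what lets the engine hand a request over to a different peer after an eviction. The intended lifecycle of the new calls, sketched against a hypothetical pendingBlocks manager, address, and peers peerA/peerB (the real call sites are downloadInternal, blockPresenceHandler and evictPeer above):

doAssert not pendingBlocks.isRequested(address)

# downloadInternal / blockPresenceHandler: record who we sent the want-block to.
pendingBlocks.markRequested(address, peerA)
doAssert pendingBlocks.getRequestPeer(address) == peerA.some

# evictPeer / failed send: release the request, checking it belonged to peerA.
pendingBlocks.clearRequest(address, peerA.some)
doAssert not pendingBlocks.isRequested(address)

# The retry loop is now free to pick another peer.
pendingBlocks.markRequested(address, peerB)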
diff --git a/codex/blockexchange/network/network.nim b/codex/blockexchange/network/network.nim
index d4754110..fb6ff2d5 100644
--- a/codex/blockexchange/network/network.nim
+++ b/codex/blockexchange/network/network.nim
@@ -44,6 +44,7 @@ type
   AccountHandler* = proc(peer: PeerId, account: Account) {.gcsafe, async: (raises: []).}
   PaymentHandler* =
     proc(peer: PeerId, payment: SignedState) {.gcsafe, async: (raises: []).}
+  PeerEventHandler* = proc(peer: PeerId) {.gcsafe, async: (raises: [CancelledError]).}
 
   BlockExcHandlers* = object
     onWantList*: WantListHandler
@@ -51,6 +52,9 @@ type
     onPresence*: BlockPresenceHandler
     onAccount*: AccountHandler
     onPayment*: PaymentHandler
+    onPeerJoined*: PeerEventHandler
+    onPeerDeparted*: PeerEventHandler
+    onPeerDropped*: PeerEventHandler
 
   WantListSender* = proc(
     id: PeerId,
@@ -240,84 +244,102 @@ proc handlePayment(
     await network.handlers.onPayment(peer.id, payment)
 
 proc rpcHandler(
-    b: BlockExcNetwork, peer: NetworkPeer, msg: Message
+    self: BlockExcNetwork, peer: NetworkPeer, msg: Message
 ) {.async: (raises: []).} =
   ## handle rpc messages
   ##
   if msg.wantList.entries.len > 0:
-    b.trackedFutures.track(b.handleWantList(peer, msg.wantList))
+    self.trackedFutures.track(self.handleWantList(peer, msg.wantList))
 
   if msg.payload.len > 0:
-    b.trackedFutures.track(b.handleBlocksDelivery(peer, msg.payload))
+    self.trackedFutures.track(self.handleBlocksDelivery(peer, msg.payload))
 
   if msg.blockPresences.len > 0:
-    b.trackedFutures.track(b.handleBlockPresence(peer, msg.blockPresences))
+    self.trackedFutures.track(self.handleBlockPresence(peer, msg.blockPresences))
 
   if account =? Account.init(msg.account):
-    b.trackedFutures.track(b.handleAccount(peer, account))
+    self.trackedFutures.track(self.handleAccount(peer, account))
 
   if payment =? SignedState.init(msg.payment):
-    b.trackedFutures.track(b.handlePayment(peer, payment))
+    self.trackedFutures.track(self.handlePayment(peer, payment))
 
-proc getOrCreatePeer(b: BlockExcNetwork, peer: PeerId): NetworkPeer =
+proc getOrCreatePeer(self: BlockExcNetwork, peer: PeerId): NetworkPeer =
   ## Creates or retrieves a BlockExcNetwork Peer
   ##
 
-  if peer in b.peers:
-    return b.peers.getOrDefault(peer, nil)
+  if peer in self.peers:
+    return self.peers.getOrDefault(peer, nil)
 
   var getConn: ConnProvider = proc(): Future[Connection] {.
       async: (raises: [CancelledError])
   .} =
     try:
       trace "Getting new connection stream", peer
-      return await b.switch.dial(peer, Codec)
+      return await self.switch.dial(peer, Codec)
     except CancelledError as error:
       raise error
    except CatchableError as exc:
      trace "Unable to connect to blockexc peer", exc = exc.msg
 
-  if not isNil(b.getConn):
-    getConn = b.getConn
+  if not isNil(self.getConn):
+    getConn = self.getConn
 
   let rpcHandler = proc(p: NetworkPeer, msg: Message) {.async: (raises: []).} =
-    await b.rpcHandler(p, msg)
+    await self.rpcHandler(p, msg)
 
   # create new pubsub peer
   let blockExcPeer = NetworkPeer.new(peer, getConn, rpcHandler)
   debug "Created new blockexc peer", peer
 
-  b.peers[peer] = blockExcPeer
+  self.peers[peer] = blockExcPeer
 
   return blockExcPeer
 
-proc setupPeer*(b: BlockExcNetwork, peer: PeerId) =
-  ## Perform initial setup, such as want
-  ## list exchange
-  ##
-
-  discard b.getOrCreatePeer(peer)
-
-proc dialPeer*(b: BlockExcNetwork, peer: PeerRecord) {.async.} =
+proc dialPeer*(self: BlockExcNetwork, peer: PeerRecord) {.async.} =
   ## Dial a peer
   ##
 
-  if b.isSelf(peer.peerId):
+  if self.isSelf(peer.peerId):
     trace "Skipping dialing self", peer = peer.peerId
     return
 
-  if peer.peerId in b.peers:
+  if peer.peerId in self.peers:
     trace "Already connected to peer", peer = peer.peerId
     return
 
-  await b.switch.connect(peer.peerId, peer.addresses.mapIt(it.address))
+  await self.switch.connect(peer.peerId, peer.addresses.mapIt(it.address))
 
-proc dropPeer*(b: BlockExcNetwork, peer: PeerId) =
+proc dropPeer*(
+    self: BlockExcNetwork, peer: PeerId
+) {.async: (raises: [CancelledError]).} =
+  trace "Dropping peer", peer
+
+  try:
+    if not self.switch.isNil:
+      await self.switch.disconnect(peer)
+  except CatchableError as error:
+    warn "Error attempting to disconnect from peer", peer = peer, error = error.msg
+
+  if not self.handlers.onPeerDropped.isNil:
+    await self.handlers.onPeerDropped(peer)
+
+proc handlePeerJoined*(
+    self: BlockExcNetwork, peer: PeerId
+) {.async: (raises: [CancelledError]).} =
+  discard self.getOrCreatePeer(peer)
+  if not self.handlers.onPeerJoined.isNil:
+    await self.handlers.onPeerJoined(peer)
+
+proc handlePeerDeparted*(
+    self: BlockExcNetwork, peer: PeerId
+) {.async: (raises: [CancelledError]).} =
   ## Cleanup disconnected peer
   ##
 
-  trace "Dropping peer", peer
-  b.peers.del(peer)
+  trace "Cleaning up departed peer", peer
+  self.peers.del(peer)
+  if not self.handlers.onPeerDeparted.isNil:
+    await self.handlers.onPeerDeparted(peer)
 
 method init*(self: BlockExcNetwork) =
   ## Perform protocol initialization
@@ -327,9 +349,11 @@ method init*(self: BlockExcNetwork) =
       peerId: PeerId, event: PeerEvent
   ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
     if event.kind == PeerEventKind.Joined:
-      self.setupPeer(peerId)
+      await self.handlePeerJoined(peerId)
+    elif event.kind == PeerEventKind.Left:
+      await self.handlePeerDeparted(peerId)
     else:
-      self.dropPeer(peerId)
+      warn "Unknown peer event", event
 
   self.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
   self.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
diff --git a/codex/blockexchange/peers/peercontext.nim b/codex/blockexchange/peers/peercontext.nim
index cca6d387..5d805023 100644
--- a/codex/blockexchange/peers/peercontext.nim
+++ b/codex/blockexchange/peers/peercontext.nim
@@ -89,9 +89,11 @@ proc blockRequested*(self: BlockExcPeerCtx, address: BlockAddress) =
 proc blockRequestCancelled*(self: BlockExcPeerCtx, address: BlockAddress) =
   self.blocksRequested.excl(address)
 
-proc blockReceived*(self: BlockExcPeerCtx, address: BlockAddress) =
+proc blockReceived*(self: BlockExcPeerCtx, address: BlockAddress): bool =
+  let wasRequested = address in self.blocksRequested
   self.blocksRequested.excl(address)
   self.lastExchange = Moment.now()
+  wasRequested
 
 proc activityTimer*(
     self: BlockExcPeerCtx
diff --git a/tests/codex/blockexchange/engine/testengine.nim b/tests/codex/blockexchange/engine/testengine.nim
index 6688b417..7c3933c5 100644
--- a/tests/codex/blockexchange/engine/testengine.nim
+++ b/tests/codex/blockexchange/engine/testengine.nim
@@ -27,8 +27,6 @@ const NopSendWantCancellationsProc = proc(
 
 asyncchecksuite "NetworkStore engine basic":
   var
-    rng: Rng
-    seckey: PrivateKey
     peerId: PeerId
     chunker: Chunker
     wallet: WalletRef
@@ -39,9 +37,7 @@ asyncchecksuite "NetworkStore engine basic":
     done: Future[void]
 
   setup:
-    rng = Rng.instance()
-    seckey = PrivateKey.random(rng[]).tryGet()
-    peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet()
+    peerId = PeerId.example
     chunker = RandomChunker.new(Rng.instance(), size = 1024'nb, chunkSize = 256'nb)
     wallet = WalletRef.example
     blockDiscovery = Discovery.new()
@@ -83,7 +79,7 @@ asyncchecksuite "NetworkStore engine basic":
     for b in blocks:
       discard engine.pendingBlocks.getWantHandle(b.cid)
 
-    await engine.setupPeer(peerId)
+    await engine.peerAddedHandler(peerId)
 
     await done.wait(100.millis)
 
@@ -111,14 +107,12 @@ asyncchecksuite "NetworkStore engine basic":
     )
 
     engine.pricing = pricing.some
-    await engine.setupPeer(peerId)
+    await engine.peerAddedHandler(peerId)
 
     await done.wait(100.millis)
 
 asyncchecksuite "NetworkStore engine handlers":
   var
-    rng: Rng
-    seckey: PrivateKey
     peerId: PeerId
     chunker: Chunker
     wallet: WalletRef
@@ -134,8 +128,7 @@ asyncchecksuite "NetworkStore engine handlers":
     blocks: seq[Block]
 
   setup:
-    rng = Rng.instance()
-    chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb)
+    chunker = RandomChunker.new(Rng.instance(), size = 1024'nb, chunkSize = 256'nb)
 
     while true:
       let chunk = await chunker.getBytes()
@@ -144,8 +137,7 @@ asyncchecksuite "NetworkStore engine handlers":
 
       blocks.add(Block.new(chunk).tryGet())
 
-    seckey = PrivateKey.random(rng[]).tryGet()
-    peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet()
+    peerId = PeerId.example
     wallet = WalletRef.example
     blockDiscovery = Discovery.new()
     peerStore = PeerCtxStore.new()
@@ -249,6 +241,9 @@ asyncchecksuite "NetworkStore engine handlers":
   test "Should store blocks in local store":
     let pending = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid))
 
+    for blk in blocks:
+      peerCtx.blockRequested(blk.address)
+
     let blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address))
 
     # Install NOP for want list cancellations so they don't cause a crash
@@ -274,6 +269,9 @@ asyncchecksuite "NetworkStore engine handlers":
       (it.address, Presence(address: it.address, price: rand(uint16).u256, have: true))
     ).toTable
 
+    for blk in blocks:
+      peerContext.blockRequested(blk.address)
+
     engine.network = BlockExcNetwork(
       request: BlockExcRequest(
         sendPayment: proc(
@@ -337,33 +335,44 @@ asyncchecksuite "NetworkStore engine handlers":
       check a in peerCtx.peerHave
       check peerCtx.blocks[a].price == price
 
-  test "Should send cancellations for received blocks":
+  test "Should send cancellations for requested blocks only":
     let
-      pending = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid))
-      blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address))
-      cancellations = newTable(blocks.mapIt((it.address, newFuture[void]())).toSeq)
+      pendingPeer = peerId # peer to which we have pending block requests
+      pendingPeerCtx = peerCtx
+      senderPeer = PeerId.example # peer that will actually send the blocks
+      senderPeerCtx = BlockExcPeerCtx(id: senderPeer)
+      reqBlocks = @[blocks[0], blocks[4]] # blocks that we requested from pendingPeer
+      reqBlockAddrs = reqBlocks.mapIt(it.address)
+      blockHandles = blocks.mapIt(engine.pendingBlocks.getWantHandle(it.cid))
 
-    peerCtx.blocks = blocks.mapIt(
-      (it.address, Presence(address: it.address, have: true, price: UInt256.example))
-    ).toTable
+    var cancelled: HashSet[BlockAddress]
+
+    engine.peers.add(senderPeerCtx)
+    for address in reqBlockAddrs:
+      pendingPeerCtx.blockRequested(address)
+
+    for address in blocks.mapIt(it.address):
+      senderPeerCtx.blockRequested(address)
 
     proc sendWantCancellations(
         id: PeerId, addresses: seq[BlockAddress]
     ) {.async: (raises: [CancelledError]).} =
+      assert id == pendingPeer
       for address in addresses:
-        cancellations[address].catch.expect("address should exist").complete()
+        cancelled.incl(address)
 
     engine.network = BlockExcNetwork(
       request: BlockExcRequest(sendWantCancellations: sendWantCancellations)
     )
 
-    await engine.blocksDeliveryHandler(peerId, blocksDelivery)
-    discard await allFinished(pending).wait(100.millis)
-    await allFuturesThrowing(cancellations.values().toSeq)
+    let blocksDelivery = blocks.mapIt(BlockDelivery(blk: it, address: it.address))
+    await engine.blocksDeliveryHandler(senderPeer, blocksDelivery)
+    discard await allFinished(blockHandles).wait(100.millis)
+
+    check cancelled == reqBlockAddrs.toHashSet()
 
 asyncchecksuite "Block Download":
   var
-    rng: Rng
     seckey: PrivateKey
     peerId: PeerId
     chunker: Chunker
@@ -380,8 +389,7 @@ asyncchecksuite "Block Download":
     blocks: seq[Block]
 
   setup:
-    rng = Rng.instance()
-    chunker = RandomChunker.new(rng, size = 1024'nb, chunkSize = 256'nb)
+    chunker = RandomChunker.new(Rng.instance(), size = 1024'nb, chunkSize = 256'nb)
 
     while true:
       let chunk = await chunker.getBytes()
@@ -390,8 +398,7 @@ asyncchecksuite "Block Download":
 
       blocks.add(Block.new(chunk).tryGet())
 
-    seckey = PrivateKey.random(rng[]).tryGet()
-    peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet()
+    peerId = PeerId.example
     wallet = WalletRef.example
     blockDiscovery = Discovery.new()
     peerStore = PeerCtxStore.new()
@@ -409,13 +416,27 @@ asyncchecksuite "Block Download":
       localStore, wallet, network, discovery, advertiser, peerStore, pendingBlocks
     )
 
-    peerCtx = BlockExcPeerCtx(id: peerId)
+    peerCtx = BlockExcPeerCtx(id: peerId, activityTimeout: 100.milliseconds)
     engine.peers.add(peerCtx)
 
-  test "Should exhaust retries":
+  test "Should reschedule blocks on peer timeout":
+    let
+      slowPeer = peerId
+      fastPeer = PeerId.example
+      slowPeerCtx = peerCtx
+      # The "fast" peer actually has a generous timeout; this should avoid
+      # timing issues in the test.
+      fastPeerCtx = BlockExcPeerCtx(id: fastPeer, activityTimeout: 60.seconds)
+      requestedBlock = blocks[0]
+
     var
-      retries = 2
-      address = BlockAddress.init(blocks[0].cid)
+      slowPeerWantList = newFuture[void]("slowPeerWantList")
+      fastPeerWantList = newFuture[void]("fastPeerWantList")
+      slowPeerDropped = newFuture[void]("slowPeerDropped")
+      slowPeerBlockRequest = newFuture[void]("slowPeerBlockRequest")
+      fastPeerBlockRequest = newFuture[void]("fastPeerBlockRequest")
+
+    engine.peers.add(fastPeerCtx)
 
     proc sendWantList(
         id: PeerId,
@@ -426,68 +447,63 @@ asyncchecksuite "Block Download":
         full: bool = false,
         sendDontHave: bool = false,
     ) {.async: (raises: [CancelledError]).} =
-      check wantType == WantHave
-      check not engine.pendingBlocks.isInFlight(address)
-      check engine.pendingBlocks.retries(address) == retries
-      retries -= 1
+      check addresses == @[requestedBlock.address]
 
-    engine.pendingBlocks.blockRetries = 2
-    engine.pendingBlocks.retryInterval = 10.millis
+      if wantType == WantBlock:
+        if id == slowPeer:
+          slowPeerBlockRequest.complete()
+        else:
+          fastPeerBlockRequest.complete()
+
+      if wantType == WantHave:
+        if id == slowPeer:
+          slowPeerWantList.complete()
+        else:
+          fastPeerWantList.complete()
+
+    proc onPeerDropped(
+        peer: PeerId
+    ): Future[void] {.async: (raises: [CancelledError]).} =
+      assert peer == slowPeer
+      slowPeerDropped.complete()
+
+    proc selectPeer(peers: seq[BlockExcPeerCtx]): BlockExcPeerCtx =
+      # Looks for the slow peer.
+      for peer in peers:
+        if peer.id == slowPeer:
+          return peer
+
+      return peers[0]
+
+    engine.selectPeer = selectPeer
+    engine.pendingBlocks.retryInterval = 200.milliseconds
     engine.network = BlockExcNetwork(request: BlockExcRequest(sendWantList: sendWantList))
+    engine.network.handlers.onPeerDropped = onPeerDropped
 
-    let pending = engine.requestBlock(address)
+    let blockHandle = engine.requestBlock(requestedBlock.address)
 
-    expect RetriesExhaustedError:
-      discard (await pending).tryGet()
+    # Waits for the engine to send its want list to both peers.
+    await slowPeerWantList.wait(5.seconds)
+    await fastPeerWantList.wait(5.seconds)
 
-  test "Should retry block request":
-    var
-      address = BlockAddress.init(blocks[0].cid)
-      steps = newAsyncEvent()
+    let blockPresence =
+      @[BlockPresence(address: requestedBlock.address, type: BlockPresenceType.Have)]
 
-    proc sendWantList(
-        id: PeerId,
-        addresses: seq[BlockAddress],
-        priority: int32 = 0,
-        cancel: bool = false,
-        wantType: WantType = WantType.WantHave,
-        full: bool = false,
-        sendDontHave: bool = false,
-    ) {.async: (raises: [CancelledError]).} =
-      case wantType
-      of WantHave:
-        check engine.pendingBlocks.isInFlight(address) == false
-        check engine.pendingBlocks.retriesExhausted(address) == false
-        steps.fire()
-      of WantBlock:
-        check engine.pendingBlocks.isInFlight(address) == true
-        check engine.pendingBlocks.retriesExhausted(address) == false
-        steps.fire()
-
-    engine.pendingBlocks.blockRetries = 10
-    engine.pendingBlocks.retryInterval = 10.millis
-    engine.network = BlockExcNetwork(
-      request: BlockExcRequest(
-        sendWantList: sendWantList, sendWantCancellations: NopSendWantCancellationsProc
-      )
-    )
-
-    let pending = engine.requestBlock(address)
-    await steps.wait()
-
-    # add blocks precense
-    peerCtx.blocks = blocks.mapIt(
-      (it.address, Presence(address: it.address, have: true, price: UInt256.example))
-    ).toTable
-
-    steps.clear()
-    await steps.wait()
+    await engine.blockPresenceHandler(slowPeer, blockPresence)
+    await engine.blockPresenceHandler(fastPeer, blockPresence)
 
+    # Waits for the engine to ask the slow peer for the block.
+    await slowPeerBlockRequest.wait(5.seconds)
+    # Don't reply and wait for the peer to be dropped by timeout.
+    await slowPeerDropped.wait(5.seconds)
+    # The engine should retry and ask the fast peer for the block.
+    await fastPeerBlockRequest.wait(5.seconds)
     await engine.blocksDeliveryHandler(
-      peerId, @[BlockDelivery(blk: blocks[0], address: address)]
+      fastPeer, @[BlockDelivery(blk: requestedBlock, address: requestedBlock.address)]
    )
 
-    check (await pending).tryGet() == blocks[0]
+    discard await blockHandle.wait(5.seconds)
 
   test "Should cancel block request":
     var
@@ -520,15 +536,8 @@ asyncchecksuite "Block Download":
     expect CancelledError:
       discard (await pending).tryGet()
 
-  # test "Should not keep looking up providers for the same dataset repeatedly":
-  #   let
-  #     blocks = await makeRandomBlocks(datasetSize = 4096, blockSize = 128'nb)
-  #     manifest = await storeDataGetManifest(store, blocks)
-
 asyncchecksuite "Task Handler":
   var
-    rng: Rng
-    seckey: PrivateKey
     peerId: PeerId
     chunker: Chunker
     wallet: WalletRef
@@ -546,8 +555,7 @@ asyncchecksuite "Task Handler":
     blocks: seq[Block]
 
   setup:
-    rng = Rng.instance()
-    chunker = RandomChunker.new(rng, size = 1024, chunkSize = 256'nb)
+    chunker = RandomChunker.new(Rng.instance(), size = 1024, chunkSize = 256'nb)
     while true:
       let chunk = await chunker.getBytes()
       if chunk.len <= 0:
@@ -555,8 +563,7 @@ asyncchecksuite "Task Handler":
         break
 
      blocks.add(Block.new(chunk).tryGet())
 
-    seckey = PrivateKey.random(rng[]).tryGet()
-    peerId = PeerId.init(seckey.getPublicKey().tryGet()).tryGet()
+    peerId = PeerId.example
     wallet = WalletRef.example
     blockDiscovery = Discovery.new()
     peerStore = PeerCtxStore.new()
@@ -576,9 +583,7 @@ asyncchecksuite "Task Handler":
     peersCtx = @[]
 
     for i in 0 .. 3:
-      let seckey = PrivateKey.random(rng[]).tryGet()
-      peers.add(PeerId.init(seckey.getPublicKey().tryGet()).tryGet())
-
+      peers.add(PeerId.example)
       peersCtx.add(BlockExcPeerCtx(id: peers[i]))
       peerStore.add(peersCtx[i])
 
@@ -625,12 +630,12 @@ asyncchecksuite "Task Handler":
 
   #   await engine.taskHandler(peersCtx[0])
 
-  test "Should set in-flight for outgoing blocks":
+  test "Should mark outgoing blocks as sent":
     proc sendBlocksDelivery(
         id: PeerId, blocksDelivery: seq[BlockDelivery]
     ) {.async: (raises: [CancelledError]).} =
       let blockAddress = peersCtx[0].wantedBlocks.toSeq[0]
-      check peersCtx[0].isInFlight(blockAddress)
+      check peersCtx[0].isBlockSent(blockAddress)
 
     for blk in blocks:
       (await engine.localStore.putBlock(blk)).tryGet()
@@ -640,10 +645,10 @@ asyncchecksuite "Task Handler":
 
     await engine.taskHandler(peersCtx[0])
 
-  test "Should clear in-flight when local lookup fails":
+  test "Should not mark blocks for which local lookup fails as sent":
     peersCtx[0].wantedBlocks.incl(blocks[0].address)
     await engine.taskHandler(peersCtx[0])
 
     let blockAddress = peersCtx[0].wantedBlocks.toSeq[0]
-    check not peersCtx[0].isInFlight(blockAddress)
+    check not peersCtx[0].isBlockSent(blockAddress)