diff --git a/codex/blockexchange/swarm.nim b/codex/blockexchange/swarm.nim index 0cf638b7..0a838504 100644 --- a/codex/blockexchange/swarm.nim +++ b/codex/blockexchange/swarm.nim @@ -63,6 +63,7 @@ type # Network interface network*: BlockExcNetwork discovery*: Discovery + peerEventHandler: PeerEventHandler completionHandle: Future[?!void] @@ -217,15 +218,6 @@ proc loadBlockKnowledge*(self: Swarm): Future[void] {.async: (raises: []).} = self.updateDownloadedBlocks(totalBlocks) -proc start*(self: Swarm): Future[void] {.async: (raises: []).} = - trace "Initialize swarm. Load block knowledge.", cid = self.cid - await self.loadBlockKnowledge() - - trace "Joining swarm." - # Bootstraps - self.trackedFutures.track(self.neighborMaintenanceLoop()) - self.trackedFutures.track(self.advertiseLoop()) - proc sendBlockRequests( self: Swarm, peer: PeerId, requests: seq[int] ) {.async: (raw: true).} = @@ -464,6 +456,102 @@ proc dropPeer*(self: Swarm, peer: PeerId) {.raises: [].} = # drop the peer from the peers table self.peers.del(peer) +# Since this is all very fragile, we add checks to make sure the messages we're +# getting are for the right swarm. This is especially important as we run iterated +# experiments and replace the handlers for each new download. +proc checkBlockAddress(self: Swarm, address: BlockAddress): bool = + if address.leaf: + return address.treeCid == self.manifest.treeCid + else: + return address.cid == self.manifest.treeCid + +proc checkAddresses[T](self: Swarm, messages: seq[T]): bool = + return messages.allIt(checkBlockAddress(self, it.address)) + +proc installEventHandlers(self: Swarm) = + proc peerEventHandler( + peerId: PeerId, event: PeerEvent + ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = + if event.kind == PeerEventKind.Joined: + await self.setupPeer(peerId) + else: + self.dropPeer(peerId) + + self.peerEventHandler = peerEventHandler + + proc wantListHandler(peer: PeerId, wantList: WantList) {.async: (raises: []).} = + if not self.checkAddresses(wantList.entries): + return + + if wantList.full: + await self.handleBlockKnowledgeRequest(peer) + else: + await self.handleBlockRequest(peer, wantList.entries.mapIt(it.address).toSeq) + + proc blocksPresenceHandler( + peer: PeerId, presence: seq[BlockPresence] + ) {.async: (raises: []).} = + if not self.checkAddresses(presence): + return + + self.handleBlockKnowledgeResponse(peer, presence) + + proc blocksDeliveryHandler( + peer: PeerId, blocksDelivery: seq[BlockDelivery] + ): Future[void] {.async: (raises: []).} = + if not self.checkAddresses(blocksDelivery): + return + self.handleBlockDelivery(peer, blocksDelivery) + + if not isNil(self.network.switch): + self.network.switch.addPeerEventHandler(self.peerEventHandler, PeerEventKind.Joined) + self.network.switch.addPeerEventHandler(self.peerEventHandler, PeerEventKind.Left) + + self.network.handlers = BlockExcHandlers( + onWantList: wantListHandler, + onBlocksDelivery: blocksDeliveryHandler, + onPresence: blocksPresenceHandler, + onAccount: nil, + onPayment: nil, + ) + +proc uninstallEventHandlers(self: Swarm) = + if not isNil(self.network.switch): + self.network.switch.removePeerEventHandler( + self.peerEventHandler, PeerEventKind.Joined + ) + self.network.switch.removePeerEventHandler( + self.peerEventHandler, PeerEventKind.Left + ) + + self.network.handlers = BlockExcHandlers( + onWantList: nil, + onBlocksDelivery: nil, + onPresence: nil, + onAccount: nil, + onPayment: nil, + ) + +proc start*(self: Swarm): Future[void] {.async: (raises: []).} = + trace "Initialize swarm. Load block knowledge.", cid = self.cid + await self.loadBlockKnowledge() + + trace "Joining swarm." + self.installEventHandlers() + # Bootstraps + self.trackedFutures.track(self.neighborMaintenanceLoop()) + self.trackedFutures.track(self.advertiseLoop()) + +proc stop*(self: Swarm): Future[void] {.async: (raises: []).} = + trace "Stopping event loops and uninstalling handlers" + # We should probably have a way to actively inform the DHT tracker + # that we're leaving. + await self.trackedFutures.cancelTracked() + # Messages that arrive after this will be ignored (or + # might be delivered to another swarm if we restart a download). + self.uninstallEventHandlers() + trace "Left swarm" + proc new*( T: type Swarm, dataset: Manifest, @@ -471,7 +559,7 @@ proc new*( network: BlockExcNetwork, discovery: Discovery, ): Swarm = - let self = Swarm( + return Swarm( manifest: dataset, localStore: localStore, network: network, @@ -484,41 +572,3 @@ proc new*( trackedFutures: TrackedFutures.new(), completionHandle: newFuture[?!void]("codex.blockexchange.swarm.start"), ) - - proc peerEventHandler( - peerId: PeerId, event: PeerEvent - ): Future[void] {.gcsafe, async: (raises: [CancelledError]).} = - if event.kind == PeerEventKind.Joined: - await self.setupPeer(peerId) - else: - self.dropPeer(peerId) - - proc wantListHandler(peer: PeerId, wantList: WantList) {.async: (raises: []).} = - if wantList.full: - await self.handleBlockKnowledgeRequest(peer) - else: - await self.handleBlockRequest(peer, wantList.entries.mapIt(it.address).toSeq) - - proc blocksPresenceHandler( - peer: PeerId, presence: seq[BlockPresence] - ) {.async: (raises: []).} = - self.handleBlockKnowledgeResponse(peer, presence) - - proc blocksDeliveryHandler( - peer: PeerId, blocksDelivery: seq[BlockDelivery] - ): Future[void] {.async: (raises: []).} = - self.handleBlockDelivery(peer, blocksDelivery) - - if not isNil(network.switch): - network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined) - network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left) - - network.handlers = BlockExcHandlers( - onWantList: wantListHandler, - onBlocksDelivery: blocksDeliveryHandler, - onPresence: blocksPresenceHandler, - onAccount: nil, - onPayment: nil, - ) - - return self diff --git a/codex/node.nim b/codex/node.nim index 482ed6f9..70e624c5 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -120,6 +120,17 @@ proc downloadStatus*(self: CodexNodeRef, dataset: Cid): ?(int, int) = except KeyError: return none((int, int)) +proc stopDownload*(self: CodexNodeRef, dataset: Cid): Future[?!void] {.async.} = + ## Stop a download for a manifest + ## + if dataset notin self.downloads: + return success() + + await self.downloads[dataset].stop() + + self.downloads.del(dataset) + return success() + proc storeManifest*( self: CodexNodeRef, manifest: Manifest ): Future[?!bt.Block] {.async.} = diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 3b34aecc..8ff941a0 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -222,6 +222,19 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute else: return RestApiResponse.error(Http404) + router.rawApi(MethodDelete, "/api/codex/v1/download/{cid}") do( + cid: Cid, resp: HttpResponseRef + ) -> RestApiResponse: + let downloadId = cid.get() + if downloadId notin node.downloads: + return RestApiResponse.error(Http404) + + if err =? (await node.stopDownload(downloadId)).errorOption: + return RestApiResponse.error(Http500, err.msg) + + resp.status = Http204 + await resp.sendBody("") + router.rawApi(MethodPost, "/api/codex/v1/data") do() -> RestApiResponse: ## Upload a file in a streaming manner ##