From 724d41fcfac74a8e42eeeb7c4d9f81fa97fca23e Mon Sep 17 00:00:00 2001 From: gmega Date: Fri, 18 Apr 2025 11:27:24 -0300 Subject: [PATCH] add guards to fast start/stop cycles --- codex/blockexchange/swarm.nim | 48 +++++++++++++++++++++++++---------- codex/node.nim | 7 +++-- codex/rest/api.nim | 14 +++++++--- 3 files changed, 48 insertions(+), 21 deletions(-) diff --git a/codex/blockexchange/swarm.nim b/codex/blockexchange/swarm.nim index 0a838504..529b4b5b 100644 --- a/codex/blockexchange/swarm.nim +++ b/codex/blockexchange/swarm.nim @@ -64,8 +64,8 @@ type network*: BlockExcNetwork discovery*: Discovery peerEventHandler: PeerEventHandler - completionHandle: Future[?!void] + lifecycleLock: AsyncLock proc `$`(self: SwarmPeerCtx): string = return "SwarmPeerCtx(id = " & $self.id & ")" @@ -534,23 +534,42 @@ proc uninstallEventHandlers(self: Swarm) = proc start*(self: Swarm): Future[void] {.async: (raises: []).} = trace "Initialize swarm. Load block knowledge.", cid = self.cid - await self.loadBlockKnowledge() + try: + await self.lifecycleLock.acquire() + await self.loadBlockKnowledge() - trace "Joining swarm." - self.installEventHandlers() - # Bootstraps - self.trackedFutures.track(self.neighborMaintenanceLoop()) - self.trackedFutures.track(self.advertiseLoop()) + trace "Joining swarm." + self.installEventHandlers() + # Bootstraps + self.trackedFutures.track(self.neighborMaintenanceLoop()) + self.trackedFutures.track(self.advertiseLoop()) + except CancelledError: + return + finally: + try: + self.lifecycleLock.release() + except AsyncLockError as err: + # This is probably serious enough that I should raise defect in production code. + error "Failed to release lock, stopping the swarm might fail", err = err.msg proc stop*(self: Swarm): Future[void] {.async: (raises: []).} = trace "Stopping event loops and uninstalling handlers" - # We should probably have a way to actively inform the DHT tracker - # that we're leaving. - await self.trackedFutures.cancelTracked() - # Messages that arrive after this will be ignored (or - # might be delivered to another swarm if we restart a download). - self.uninstallEventHandlers() - trace "Left swarm" + try: + await self.lifecycleLock.acquire() + # We should probably have a way to actively inform the DHT tracker + # that we're leaving. + await self.trackedFutures.cancelTracked() + # Messages that arrive after this will be ignored (or + # might be delivered to another swarm if we restart a download). + self.uninstallEventHandlers() + trace "Left swarm" + except CancelledError: + return + finally: + try: + self.lifecycleLock.release() + except AsyncLockError as err: + error "Failed to release lock. Restarting this swarm might fail", err = err.msg proc new*( T: type Swarm, @@ -571,4 +590,5 @@ proc new*( downloadedBlocks: 0, trackedFutures: TrackedFutures.new(), completionHandle: newFuture[?!void]("codex.blockexchange.swarm.start"), + lifecycleLock: newAsyncLock(), ) diff --git a/codex/node.nim b/codex/node.nim index 70e624c5..68a03642 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -103,12 +103,11 @@ proc startDownload*( ## Start a download for a manifest ## - # This won't play nice with multiple downloads as the Swarm will install - # its own callbacks and listeners, but for experimentation is fine. - let swarm = Swarm.new( + # Don't start before placing the swarm in the table or + # a fast call to stop will miss it. + var swarm = Swarm.new( manifest, self.networkStore.localStore, self.engine.network, self.discovery ) - self.downloads[manifest.treeCid] = swarm await swarm.start() diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 8ff941a0..2fca772a 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -200,8 +200,9 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute if contentBody.isNone: return RestApiResponse.error(Http400, "Missing content body") - without manifest =? RestContent.fromJson(string.fromBytes(contentBody.get().data)), - err: + let rawData = string.fromBytes(contentBody.get().data) + + without manifest =? RestContent.fromJson(rawData), err: return RestApiResponse.error(Http400, err.msg) info "Starting download for manifest", @@ -222,10 +223,17 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute else: return RestApiResponse.error(Http404) - router.rawApi(MethodDelete, "/api/codex/v1/download/{cid}") do( + router.rawApi(MethodGet, "/api/codex/v1/download") do() -> RestApiResponse: + let swarms = node.downloads.keys().toSeq + return RestApiResponse.response( + $ %*{"activeSwarms": swarms}, contentType = "application/json" + ) + + router.api(MethodDelete, "/api/codex/v1/download/{cid}") do( cid: Cid, resp: HttpResponseRef ) -> RestApiResponse: let downloadId = cid.get() + info "Stopping download for manifest", cid = downloadId if downloadId notin node.downloads: return RestApiResponse.error(Http404)