add guards to fast start/stop cycles

This commit is contained in:
gmega 2025-04-18 11:27:24 -03:00
parent ceff7757c6
commit 724d41fcfa
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
3 changed files with 48 additions and 21 deletions

View File

@ -64,8 +64,8 @@ type
network*: BlockExcNetwork network*: BlockExcNetwork
discovery*: Discovery discovery*: Discovery
peerEventHandler: PeerEventHandler peerEventHandler: PeerEventHandler
completionHandle: Future[?!void] completionHandle: Future[?!void]
lifecycleLock: AsyncLock
proc `$`(self: SwarmPeerCtx): string = proc `$`(self: SwarmPeerCtx): string =
return "SwarmPeerCtx(id = " & $self.id & ")" return "SwarmPeerCtx(id = " & $self.id & ")"
@ -534,23 +534,42 @@ proc uninstallEventHandlers(self: Swarm) =
proc start*(self: Swarm): Future[void] {.async: (raises: []).} = proc start*(self: Swarm): Future[void] {.async: (raises: []).} =
trace "Initialize swarm. Load block knowledge.", cid = self.cid trace "Initialize swarm. Load block knowledge.", cid = self.cid
await self.loadBlockKnowledge() try:
await self.lifecycleLock.acquire()
await self.loadBlockKnowledge()
trace "Joining swarm." trace "Joining swarm."
self.installEventHandlers() self.installEventHandlers()
# Bootstraps # Bootstraps
self.trackedFutures.track(self.neighborMaintenanceLoop()) self.trackedFutures.track(self.neighborMaintenanceLoop())
self.trackedFutures.track(self.advertiseLoop()) self.trackedFutures.track(self.advertiseLoop())
except CancelledError:
return
finally:
try:
self.lifecycleLock.release()
except AsyncLockError as err:
# This is probably serious enough that I should raise defect in production code.
error "Failed to release lock, stopping the swarm might fail", err = err.msg
proc stop*(self: Swarm): Future[void] {.async: (raises: []).} = proc stop*(self: Swarm): Future[void] {.async: (raises: []).} =
trace "Stopping event loops and uninstalling handlers" trace "Stopping event loops and uninstalling handlers"
# We should probably have a way to actively inform the DHT tracker try:
# that we're leaving. await self.lifecycleLock.acquire()
await self.trackedFutures.cancelTracked() # We should probably have a way to actively inform the DHT tracker
# Messages that arrive after this will be ignored (or # that we're leaving.
# might be delivered to another swarm if we restart a download). await self.trackedFutures.cancelTracked()
self.uninstallEventHandlers() # Messages that arrive after this will be ignored (or
trace "Left swarm" # might be delivered to another swarm if we restart a download).
self.uninstallEventHandlers()
trace "Left swarm"
except CancelledError:
return
finally:
try:
self.lifecycleLock.release()
except AsyncLockError as err:
error "Failed to release lock. Restarting this swarm might fail", err = err.msg
proc new*( proc new*(
T: type Swarm, T: type Swarm,
@ -571,4 +590,5 @@ proc new*(
downloadedBlocks: 0, downloadedBlocks: 0,
trackedFutures: TrackedFutures.new(), trackedFutures: TrackedFutures.new(),
completionHandle: newFuture[?!void]("codex.blockexchange.swarm.start"), completionHandle: newFuture[?!void]("codex.blockexchange.swarm.start"),
lifecycleLock: newAsyncLock(),
) )

View File

@ -103,12 +103,11 @@ proc startDownload*(
## Start a download for a manifest ## Start a download for a manifest
## ##
# This won't play nice with multiple downloads as the Swarm will install # Don't start before placing the swarm in the table or
# its own callbacks and listeners, but for experimentation is fine. # a fast call to stop will miss it.
let swarm = Swarm.new( var swarm = Swarm.new(
manifest, self.networkStore.localStore, self.engine.network, self.discovery manifest, self.networkStore.localStore, self.engine.network, self.discovery
) )
self.downloads[manifest.treeCid] = swarm self.downloads[manifest.treeCid] = swarm
await swarm.start() await swarm.start()

View File

@ -200,8 +200,9 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
if contentBody.isNone: if contentBody.isNone:
return RestApiResponse.error(Http400, "Missing content body") return RestApiResponse.error(Http400, "Missing content body")
without manifest =? RestContent.fromJson(string.fromBytes(contentBody.get().data)), let rawData = string.fromBytes(contentBody.get().data)
err:
without manifest =? RestContent.fromJson(rawData), err:
return RestApiResponse.error(Http400, err.msg) return RestApiResponse.error(Http400, err.msg)
info "Starting download for manifest", info "Starting download for manifest",
@ -222,10 +223,17 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
else: else:
return RestApiResponse.error(Http404) return RestApiResponse.error(Http404)
router.rawApi(MethodDelete, "/api/codex/v1/download/{cid}") do( router.rawApi(MethodGet, "/api/codex/v1/download") do() -> RestApiResponse:
let swarms = node.downloads.keys().toSeq
return RestApiResponse.response(
$ %*{"activeSwarms": swarms}, contentType = "application/json"
)
router.api(MethodDelete, "/api/codex/v1/download/{cid}") do(
cid: Cid, resp: HttpResponseRef cid: Cid, resp: HttpResponseRef
) -> RestApiResponse: ) -> RestApiResponse:
let downloadId = cid.get() let downloadId = cid.get()
info "Stopping download for manifest", cid = downloadId
if downloadId notin node.downloads: if downloadId notin node.downloads:
return RestApiResponse.error(Http404) return RestApiResponse.error(Http404)