allow swarms to be started/stopped so we can support sequential experiments

This commit is contained in:
gmega 2025-04-16 15:50:51 -03:00
parent 256e33193e
commit ceff7757c6
No known key found for this signature in database
GPG Key ID: 6290D34EAD824B18
3 changed files with 122 additions and 48 deletions

View File

@ -63,6 +63,7 @@ type
# Network interface
network*: BlockExcNetwork
discovery*: Discovery
peerEventHandler: PeerEventHandler
completionHandle: Future[?!void]
@ -217,15 +218,6 @@ proc loadBlockKnowledge*(self: Swarm): Future[void] {.async: (raises: []).} =
self.updateDownloadedBlocks(totalBlocks)
proc start*(self: Swarm): Future[void] {.async: (raises: []).} =
trace "Initialize swarm. Load block knowledge.", cid = self.cid
await self.loadBlockKnowledge()
trace "Joining swarm."
# Bootstraps
self.trackedFutures.track(self.neighborMaintenanceLoop())
self.trackedFutures.track(self.advertiseLoop())
proc sendBlockRequests(
self: Swarm, peer: PeerId, requests: seq[int]
) {.async: (raw: true).} =
@ -464,6 +456,102 @@ proc dropPeer*(self: Swarm, peer: PeerId) {.raises: [].} =
# drop the peer from the peers table
self.peers.del(peer)
# Since this is all very fragile, we add checks to make sure the messages we're
# getting are for the right swarm. This is especially important as we run iterated
# experiments and replace the handlers for each new download.
proc checkBlockAddress(self: Swarm, address: BlockAddress): bool =
if address.leaf:
return address.treeCid == self.manifest.treeCid
else:
return address.cid == self.manifest.treeCid
proc checkAddresses[T](self: Swarm, messages: seq[T]): bool =
return messages.allIt(checkBlockAddress(self, it.address))
proc installEventHandlers(self: Swarm) =
proc peerEventHandler(
peerId: PeerId, event: PeerEvent
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
if event.kind == PeerEventKind.Joined:
await self.setupPeer(peerId)
else:
self.dropPeer(peerId)
self.peerEventHandler = peerEventHandler
proc wantListHandler(peer: PeerId, wantList: WantList) {.async: (raises: []).} =
if not self.checkAddresses(wantList.entries):
return
if wantList.full:
await self.handleBlockKnowledgeRequest(peer)
else:
await self.handleBlockRequest(peer, wantList.entries.mapIt(it.address).toSeq)
proc blocksPresenceHandler(
peer: PeerId, presence: seq[BlockPresence]
) {.async: (raises: []).} =
if not self.checkAddresses(presence):
return
self.handleBlockKnowledgeResponse(peer, presence)
proc blocksDeliveryHandler(
peer: PeerId, blocksDelivery: seq[BlockDelivery]
): Future[void] {.async: (raises: []).} =
if not self.checkAddresses(blocksDelivery):
return
self.handleBlockDelivery(peer, blocksDelivery)
if not isNil(self.network.switch):
self.network.switch.addPeerEventHandler(self.peerEventHandler, PeerEventKind.Joined)
self.network.switch.addPeerEventHandler(self.peerEventHandler, PeerEventKind.Left)
self.network.handlers = BlockExcHandlers(
onWantList: wantListHandler,
onBlocksDelivery: blocksDeliveryHandler,
onPresence: blocksPresenceHandler,
onAccount: nil,
onPayment: nil,
)
proc uninstallEventHandlers(self: Swarm) =
if not isNil(self.network.switch):
self.network.switch.removePeerEventHandler(
self.peerEventHandler, PeerEventKind.Joined
)
self.network.switch.removePeerEventHandler(
self.peerEventHandler, PeerEventKind.Left
)
self.network.handlers = BlockExcHandlers(
onWantList: nil,
onBlocksDelivery: nil,
onPresence: nil,
onAccount: nil,
onPayment: nil,
)
proc start*(self: Swarm): Future[void] {.async: (raises: []).} =
trace "Initialize swarm. Load block knowledge.", cid = self.cid
await self.loadBlockKnowledge()
trace "Joining swarm."
self.installEventHandlers()
# Bootstraps
self.trackedFutures.track(self.neighborMaintenanceLoop())
self.trackedFutures.track(self.advertiseLoop())
proc stop*(self: Swarm): Future[void] {.async: (raises: []).} =
trace "Stopping event loops and uninstalling handlers"
# We should probably have a way to actively inform the DHT tracker
# that we're leaving.
await self.trackedFutures.cancelTracked()
# Messages that arrive after this will be ignored (or
# might be delivered to another swarm if we restart a download).
self.uninstallEventHandlers()
trace "Left swarm"
proc new*(
T: type Swarm,
dataset: Manifest,
@ -471,7 +559,7 @@ proc new*(
network: BlockExcNetwork,
discovery: Discovery,
): Swarm =
let self = Swarm(
return Swarm(
manifest: dataset,
localStore: localStore,
network: network,
@ -484,41 +572,3 @@ proc new*(
trackedFutures: TrackedFutures.new(),
completionHandle: newFuture[?!void]("codex.blockexchange.swarm.start"),
)
proc peerEventHandler(
peerId: PeerId, event: PeerEvent
): Future[void] {.gcsafe, async: (raises: [CancelledError]).} =
if event.kind == PeerEventKind.Joined:
await self.setupPeer(peerId)
else:
self.dropPeer(peerId)
proc wantListHandler(peer: PeerId, wantList: WantList) {.async: (raises: []).} =
if wantList.full:
await self.handleBlockKnowledgeRequest(peer)
else:
await self.handleBlockRequest(peer, wantList.entries.mapIt(it.address).toSeq)
proc blocksPresenceHandler(
peer: PeerId, presence: seq[BlockPresence]
) {.async: (raises: []).} =
self.handleBlockKnowledgeResponse(peer, presence)
proc blocksDeliveryHandler(
peer: PeerId, blocksDelivery: seq[BlockDelivery]
): Future[void] {.async: (raises: []).} =
self.handleBlockDelivery(peer, blocksDelivery)
if not isNil(network.switch):
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Joined)
network.switch.addPeerEventHandler(peerEventHandler, PeerEventKind.Left)
network.handlers = BlockExcHandlers(
onWantList: wantListHandler,
onBlocksDelivery: blocksDeliveryHandler,
onPresence: blocksPresenceHandler,
onAccount: nil,
onPayment: nil,
)
return self

View File

@ -120,6 +120,17 @@ proc downloadStatus*(self: CodexNodeRef, dataset: Cid): ?(int, int) =
except KeyError:
return none((int, int))
proc stopDownload*(self: CodexNodeRef, dataset: Cid): Future[?!void] {.async.} =
## Stop a download for a manifest
##
if dataset notin self.downloads:
return success()
await self.downloads[dataset].stop()
self.downloads.del(dataset)
return success()
proc storeManifest*(
self: CodexNodeRef, manifest: Manifest
): Future[?!bt.Block] {.async.} =

View File

@ -222,6 +222,19 @@ proc initDataApi(node: CodexNodeRef, repoStore: RepoStore, router: var RestRoute
else:
return RestApiResponse.error(Http404)
router.rawApi(MethodDelete, "/api/codex/v1/download/{cid}") do(
cid: Cid, resp: HttpResponseRef
) -> RestApiResponse:
let downloadId = cid.get()
if downloadId notin node.downloads:
return RestApiResponse.error(Http404)
if err =? (await node.stopDownload(downloadId)).errorOption:
return RestApiResponse.error(Http500, err.msg)
resp.status = Http204
await resp.sendBody("")
router.rawApi(MethodPost, "/api/codex/v1/data") do() -> RestApiResponse:
## Upload a file in a streaming manner
##