From 230b9b8fb29a11027c480db5614b6817c7141261 Mon Sep 17 00:00:00 2001 From: thatben Date: Mon, 2 Jun 2025 14:30:28 +0200 Subject: [PATCH] updating --- codexcrawler/application.nim | 4 +- codexcrawler/components/chaincrawler.nim | 2 +- codexcrawler/components/chainmetrics.nim | 2 +- codexcrawler/components/dhtcrawler.nim | 2 +- codexcrawler/components/dhtmetrics.nim | 14 +-- codexcrawler/components/nodestore.nim | 18 ++-- codexcrawler/components/timetracker.nim | 2 +- codexcrawler/components/todolist.nim | 6 +- codexcrawler/installer.nim | 2 +- codexcrawler/list.nim | 2 +- codexcrawler/services/dht.nim | 25 +++--- codexcrawler/services/marketplace.nim | 4 +- codexcrawler/services/marketplace/config.nim | 2 +- codexcrawler/services/marketplace/market.nim | 94 ++++++++++---------- codexcrawler/state.nim | 4 +- codexcrawler/utils/asyncdataevent.nim | 8 +- 16 files changed, 97 insertions(+), 94 deletions(-) diff --git a/codexcrawler/application.nim b/codexcrawler/application.nim index e5a0d07..d3b34dd 100644 --- a/codexcrawler/application.nim +++ b/codexcrawler/application.nim @@ -18,7 +18,7 @@ type Application* = ref object state: State components: seq[Component] -proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} = +proc initializeApp(app: Application, config: Config): Future[?!void] {.async: (raises: [CancelledError]).} = app.state = State( status: ApplicationStatus.Running, config: config, @@ -48,7 +48,7 @@ proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} = return success() -proc stopComponents(app: Application) {.async.} = +proc stopComponents(app: Application) {.async: (raises: [CancelledError]).} = for c in app.components: if err =? (await c.stop()).errorOption: error "Failed to stop component", err = err.msg diff --git a/codexcrawler/components/chaincrawler.nim b/codexcrawler/components/chaincrawler.nim index f957630..3f9cd8d 100644 --- a/codexcrawler/components/chaincrawler.nim +++ b/codexcrawler/components/chaincrawler.nim @@ -19,7 +19,7 @@ type ChainCrawler* = ref object of Component proc onNewRequest(c: ChainCrawler, rid: Rid): Future[?!void] {.async: (raises: []).} = return await c.store.add(rid) -method start*(c: ChainCrawler): Future[?!void] {.async.} = +method start*(c: ChainCrawler): Future[?!void] {.async: (raises: [CancelledError]).} = info "starting..." proc onRequest(rid: Rid): Future[?!void] {.async: (raises: []).} = diff --git a/codexcrawler/components/chainmetrics.nim b/codexcrawler/components/chainmetrics.nim index bf7d8f6..6d6340d 100644 --- a/codexcrawler/components/chainmetrics.nim +++ b/codexcrawler/components/chainmetrics.nim @@ -65,7 +65,7 @@ proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} = c.updateMetrics(update) return success() -method start*(c: ChainMetrics): Future[?!void] {.async.} = +method start*(c: ChainMetrics): Future[?!void] {.async: (raises: [CancelledError]).} = info "starting..." proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} = diff --git a/codexcrawler/components/dhtcrawler.nim b/codexcrawler/components/dhtcrawler.nim index 0c594b2..fa80170 100644 --- a/codexcrawler/components/dhtcrawler.nim +++ b/codexcrawler/components/dhtcrawler.nim @@ -46,7 +46,7 @@ proc step(c: DhtCrawler): Future[?!void] {.async: (raises: []).} = return success() -method start*(c: DhtCrawler): Future[?!void] {.async.} = +method start*(c: DhtCrawler): Future[?!void] {.async: (raises: [CancelledError]).} = info "starting..." proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} = diff --git a/codexcrawler/components/dhtmetrics.nim b/codexcrawler/components/dhtmetrics.nim index 374e169..54d4d33 100644 --- a/codexcrawler/components/dhtmetrics.nim +++ b/codexcrawler/components/dhtmetrics.nim @@ -27,7 +27,7 @@ proc updateMetrics(d: DhtMetrics) = proc handleCheckEvent( d: DhtMetrics, event: DhtNodeCheckEventData -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = if event.isOk: ?await d.ok.add(event.id) ?await d.nok.remove(event.id) @@ -38,31 +38,31 @@ proc handleCheckEvent( d.updateMetrics() return success() -proc handleDeleteEvent(d: DhtMetrics, nids: seq[Nid]): Future[?!void] {.async.} = +proc handleDeleteEvent(d: DhtMetrics, nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} = for nid in nids: ?await d.ok.remove(nid) ?await d.nok.remove(nid) d.updateMetrics() return success() -method awake*(d: DhtMetrics): Future[?!void] {.async.} = - proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = +method awake*(d: DhtMetrics): Future[?!void] {.async: (raises: [CancelledError]).} = + proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async: (raises: [CancelledError]).} = await d.handleCheckEvent(event) - proc onDelete(nids: seq[Nid]): Future[?!void] {.async.} = + proc onDelete(nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} = await d.handleDeleteEvent(nids) d.subCheck = d.state.events.dhtNodeCheck.subscribe(onCheck) d.subDel = d.state.events.nodesDeleted.subscribe(onDelete) return success() -method start*(d: DhtMetrics): Future[?!void] {.async.} = +method start*(d: DhtMetrics): Future[?!void] {.async: (raises: [CancelledError]).} = info "starting..." ?await d.ok.load() ?await d.nok.load() return success() -method stop*(d: DhtMetrics): Future[?!void] {.async.} = +method stop*(d: DhtMetrics): Future[?!void] {.async: (raises: [CancelledError]).} = await d.state.events.dhtNodeCheck.unsubscribe(d.subCheck) await d.state.events.nodesDeleted.unsubscribe(d.subDel) return success() diff --git a/codexcrawler/components/nodestore.nim b/codexcrawler/components/nodestore.nim index ef22ae0..f10ff85 100644 --- a/codexcrawler/components/nodestore.nim +++ b/codexcrawler/components/nodestore.nim @@ -76,7 +76,7 @@ proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T = ) return NodeEntry.fromBytes(bytes) -proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} = +proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async: (raises: [CancelledError]).} = without key =? Key.init(nodestoreName / $nid), err: error "failed to format key", err = err.msg return failure(err) @@ -91,7 +91,7 @@ proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} = return success(not exists) -proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = +proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} = await s.state.events.newNodesDiscovered.fire(nids) proc fireNodesDeleted( @@ -99,7 +99,7 @@ proc fireNodesDeleted( ): Future[?!void] {.async: (raises: []).} = await s.state.events.nodesDeleted.fire(nids) -proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = +proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} = var newNodes = newSeq[Nid]() for nid in nids: without isNew =? (await s.storeNodeIsNew(nid)), err: @@ -114,7 +114,7 @@ proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} = proc processNodeCheck( s: NodeStore, event: DhtNodeCheckEventData -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = without key =? Key.init(nodestoreName / $(event.id)), err: error "failed to format key", err = err.msg return failure(err) @@ -142,7 +142,7 @@ proc processNodeCheck( ?await s.store.put(key, entry) return success() -proc deleteEntry(s: NodeStore, nid: Nid): Future[?!bool] {.async.} = +proc deleteEntry(s: NodeStore, nid: Nid): Future[?!bool] {.async: (raises: [CancelledError]).} = without key =? Key.init(nodestoreName / $nid), err: error "failed to format key", err = err.msg return failure(err) @@ -202,20 +202,20 @@ method deleteEntries*( ?await s.fireNodesDeleted(deleted) return success() -method start*(s: NodeStore): Future[?!void] {.async.} = +method start*(s: NodeStore): Future[?!void] {.async: (raises: [CancelledError]).} = info "starting..." - proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} = + proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} = return await s.processFoundNodes(nids) - proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} = + proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async: (raises: [CancelledError]).} = return await s.processNodeCheck(event) s.subFound = s.state.events.nodesFound.subscribe(onNodesFound) s.subCheck = s.state.events.dhtNodeCheck.subscribe(onCheck) return success() -method stop*(s: NodeStore): Future[?!void] {.async.} = +method stop*(s: NodeStore): Future[?!void] {.async: (raises: [CancelledError]).} = await s.state.events.nodesFound.unsubscribe(s.subFound) await s.state.events.dhtNodeCheck.unsubscribe(s.subCheck) return success() diff --git a/codexcrawler/components/timetracker.nim b/codexcrawler/components/timetracker.nim index 2683b0a..b38d2a0 100644 --- a/codexcrawler/components/timetracker.nim +++ b/codexcrawler/components/timetracker.nim @@ -56,7 +56,7 @@ proc raiseRoutingTableNodes(t: TimeTracker): Future[?!void] {.async: (raises: [] return failure(err) return success() -method start*(t: TimeTracker): Future[?!void] {.async.} = +method start*(t: TimeTracker): Future[?!void] {.async: (raises: [CancelledError]).} = info "starting..." proc onCheckRevisitAndExpiry(): Future[?!void] {.async: (raises: []), gcsafe.} = diff --git a/codexcrawler/components/todolist.nim b/codexcrawler/components/todolist.nim index 33144ae..2d4093e 100644 --- a/codexcrawler/components/todolist.nim +++ b/codexcrawler/components/todolist.nim @@ -54,10 +54,10 @@ method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} = return success(item) -method awake*(t: TodoList): Future[?!void] {.async.} = +method awake*(t: TodoList): Future[?!void] {.async: (raises: [CancelledError]).} = info "initializing..." - proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} = + proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} = t.addNodes(nids) return success() @@ -65,7 +65,7 @@ method awake*(t: TodoList): Future[?!void] {.async.} = t.subRev = t.state.events.nodesToRevisit.subscribe(onNewNodes) return success() -method stop*(t: TodoList): Future[?!void] {.async.} = +method stop*(t: TodoList): Future[?!void] {.async: (raises: [CancelledError]).} = await t.state.events.newNodesDiscovered.unsubscribe(t.subNew) await t.state.events.nodesToRevisit.unsubscribe(t.subRev) return success() diff --git a/codexcrawler/installer.nim b/codexcrawler/installer.nim index 80fa924..e8a12f1 100644 --- a/codexcrawler/installer.nim +++ b/codexcrawler/installer.nim @@ -18,7 +18,7 @@ import ./components/chainmetrics import ./components/chaincrawler import ./components/requeststore -proc createComponents*(state: State): Future[?!seq[Component]] {.async.} = +proc createComponents*(state: State): Future[?!seq[Component]] {.async: (raises: [CancelledError]).} = var components: seq[Component] = newSeq[Component]() let clock = createClock() diff --git a/codexcrawler/list.nim b/codexcrawler/list.nim index d1985a4..7ead86c 100644 --- a/codexcrawler/list.nim +++ b/codexcrawler/list.nim @@ -32,7 +32,7 @@ proc decode(T: type Nid, bytes: seq[byte]): ?!T = return success(Nid.fromStr("0")) return Nid.fromBytes(bytes) -proc saveItem(this: List, item: Nid): Future[?!void] {.async.} = +proc saveItem(this: List, item: Nid): Future[?!void] {.async: (raises: [CancelledError]).} = without itemKey =? Key.init(this.name / $item), err: return failure(err) ?await this.store.put(itemKey, item) diff --git a/codexcrawler/services/dht.nim b/codexcrawler/services/dht.nim index e4726fc..e5068db 100644 --- a/codexcrawler/services/dht.nim +++ b/codexcrawler/services/dht.nim @@ -75,15 +75,18 @@ method getNeighbors*( except CatchableError as exc: return failure(exc.msg) -proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} = +proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async: (raises: [CancelledError]).} = trace "protocol.resolve..." - let node = await d.protocol.resolve(toNodeId(peerId)) - - return - if node.isSome(): - node.get().record.data.some - else: - PeerRecord.none + try: + let node = await d.protocol.resolve(toNodeId(peerId)) + return + if node.isSome(): + node.get().record.data.some + else: + PeerRecord.none + except CatchableError as exc: + error "CatchableError in protocol.resolve", err = exc.msg + return PeerRecord.none method removeProvider*(d: Dht, peerId: PeerId): Future[void] {.base, gcsafe.} = trace "Removing provider", peerId @@ -109,12 +112,12 @@ proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) = if not d.protocol.isNil: d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") -method start*(d: Dht): Future[?!void] {.async.} = +method start*(d: Dht): Future[?!void] {.async: (raises: [CancelledError]).} = d.protocol.open() await d.protocol.start() return success() -method stop*(d: Dht): Future[?!void] {.async.} = +method stop*(d: Dht): Future[?!void] {.async: (raises: [CancelledError]).} = await d.protocol.closeWait() return success() @@ -154,7 +157,7 @@ proc new( self -proc createDht*(state: State): Future[?!Dht] {.async.} = +proc createDht*(state: State): Future[?!Dht] {.async: (raises: [CancelledError]).} = without dhtStore =? createDatastore(state.config.dataDir / "dht"), err: return failure(err) let keyPath = state.config.dataDir / "privatekey" diff --git a/codexcrawler/services/marketplace.nim b/codexcrawler/services/marketplace.nim index 7513d02..4c69fda 100644 --- a/codexcrawler/services/marketplace.nim +++ b/codexcrawler/services/marketplace.nim @@ -50,7 +50,7 @@ proc fetchRequestInfo( method subscribeToNewRequests*( m: MarketplaceService, onNewRequest: OnNewRequest ): Future[?!void] {.async: (raises: []), base.} = - proc resultWrapper(rid: Rid): Future[void] {.async.} = + proc resultWrapper(rid: Rid): Future[void] {.async: (raises: [CancelledError]).} = let response = await onNewRequest(rid) if error =? response.errorOption: raiseAssert("Error result in handling of onNewRequest callback: " & error.msg) @@ -109,7 +109,7 @@ method getRequestInfo*( else: notStarted() -method awake*(m: MarketplaceService): Future[?!void] {.async.} = +method awake*(m: MarketplaceService): Future[?!void] {.async: (raises: [CancelledError]).} = let provider = JsonRpcProvider.new(m.state.config.ethProvider) without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress): return failure("Invalid MarketplaceAddress provided") diff --git a/codexcrawler/services/marketplace/config.nim b/codexcrawler/services/marketplace/config.nim index 3c31c8b..83b39c0 100644 --- a/codexcrawler/services/marketplace/config.nim +++ b/codexcrawler/services/marketplace/config.nim @@ -1,5 +1,5 @@ import pkg/contractabi -import pkg/ethers/fields +import pkg/ethers/contracts/fields import pkg/questionable/results export contractabi diff --git a/codexcrawler/services/marketplace/market.nim b/codexcrawler/services/marketplace/market.nim index 6b00376..64fa8c2 100644 --- a/codexcrawler/services/marketplace/market.nim +++ b/codexcrawler/services/marketplace/market.nim @@ -121,7 +121,7 @@ proc config( return resolvedConfig -proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} = +proc approveFunds(market: OnChainMarket, amount: UInt256) {.async: (raises: [CancelledError]).} = raiseAssert("Not available: approveFunds") proc getZkeyHash*( @@ -138,43 +138,43 @@ proc periodicity*( let period = config.proofs.period return Periodicity(seconds: period) -proc proofTimeout*(market: OnChainMarket): Future[uint64] {.async.} = +proc proofTimeout*(market: OnChainMarket): Future[uint64] {.async: (raises: [CancelledError]).} = convertEthersError: let config = await market.config() return config.proofs.timeout -proc repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async.} = +proc repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async: (raises: [CancelledError]).} = convertEthersError: let config = await market.config() return config.collateral.repairRewardPercentage -proc requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async.} = +proc requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async: (raises: [CancelledError]).} = convertEthersError: let config = await market.config() return config.requestDurationLimit -proc proofDowntime*(market: OnChainMarket): Future[uint8] {.async.} = +proc proofDowntime*(market: OnChainMarket): Future[uint8] {.async: (raises: [CancelledError]).} = convertEthersError: let config = await market.config() return config.proofs.downtime -proc getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async.} = +proc getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async: (raises: [CancelledError]).} = convertEthersError: let overrides = CallOverrides(blockTag: some BlockTag.pending) return await market.contract.getPointer(slotId, overrides) -proc myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async.} = +proc myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async: (raises: [CancelledError]).} = convertEthersError: return await market.contract.myRequests -proc mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async.} = +proc mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async: (raises: [CancelledError]).} = convertEthersError: let slots = await market.contract.mySlots() debug "Fetched my slots", numSlots = len(slots) return slots -proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async.} = +proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async: (raises: [CancelledError]).} = convertEthersError: debug "Requesting storage" await market.approveFunds(request.totalPrice()) @@ -182,7 +182,7 @@ proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async.} = proc getRequest*( market: OnChainMarket, id: RequestId -): Future[?StorageRequest] {.async.} = +): Future[?StorageRequest] {.async: (raises: [CancelledError]).} = let key = $id convertEthersError: @@ -194,7 +194,7 @@ proc getRequest*( proc requestState*( market: OnChainMarket, requestId: RequestId -): Future[?RequestState] {.async.} = +): Future[?RequestState] {.async: (raises: [CancelledError]).} = convertEthersError: try: let overrides = CallOverrides(blockTag: some BlockTag.pending) @@ -202,22 +202,22 @@ proc requestState*( except Marketplace_UnknownRequest: return none RequestState -proc slotState*(market: OnChainMarket, slotId: SlotId): Future[SlotState] {.async.} = +proc slotState*(market: OnChainMarket, slotId: SlotId): Future[SlotState] {.async: (raises: [CancelledError]).} = convertEthersError: let overrides = CallOverrides(blockTag: some BlockTag.pending) return await market.contract.slotState(slotId, overrides) -proc getRequestEnd*(market: OnChainMarket, id: RequestId): Future[uint64] {.async.} = +proc getRequestEnd*(market: OnChainMarket, id: RequestId): Future[uint64] {.async: (raises: [CancelledError]).} = convertEthersError: return (await market.contract.requestEnd(id)).uint64 -proc requestExpiresAt*(market: OnChainMarket, id: RequestId): Future[uint64] {.async.} = +proc requestExpiresAt*(market: OnChainMarket, id: RequestId): Future[uint64] {.async: (raises: [CancelledError]).} = convertEthersError: return (await market.contract.requestExpiry(id)).uint64 proc getHost( market: OnChainMarket, requestId: RequestId, slotIndex: uint64 -): Future[?Address] {.async.} = +): Future[?Address] {.async: (raises: [CancelledError]).} = convertEthersError: let slotId = slotId(requestId, slotIndex) let address = await market.contract.getHost(slotId) @@ -228,11 +228,11 @@ proc getHost( proc currentCollateral*( market: OnChainMarket, slotId: SlotId -): Future[UInt256] {.async.} = +): Future[UInt256] {.async: (raises: [CancelledError]).} = convertEthersError: return await market.contract.currentCollateral(slotId) -proc getActiveSlot*(market: OnChainMarket, slotId: SlotId): Future[?Slot] {.async.} = +proc getActiveSlot*(market: OnChainMarket, slotId: SlotId): Future[?Slot] {.async: (raises: [CancelledError]).} = convertEthersError: try: return some await market.contract.getActiveSlot(slotId) @@ -245,7 +245,7 @@ proc fillSlot( slotIndex: uint64, proof: Groth16Proof, collateral: UInt256, -) {.async.} = +) {.async: (raises: [CancelledError]).} = convertEthersError: logScope: requestId @@ -256,14 +256,14 @@ proc fillSlot( discard await market.contract.fillSlot(requestId, slotIndex, proof).confirm(1) trace "fillSlot transaction completed" -proc freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} = +proc freeSlot*(market: OnChainMarket, slotId: SlotId) {.async: (raises: [CancelledError]).} = raiseAssert("Not supported") -proc withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async.} = +proc withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async: (raises: [CancelledError]).} = convertEthersError: discard await market.contract.withdrawFunds(requestId).confirm(1) -proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} = +proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: (raises: [CancelledError]).} = convertEthersError: try: let overrides = CallOverrides(blockTag: some BlockTag.pending) @@ -271,7 +271,7 @@ proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} except Marketplace_SlotIsFree: return false -proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} = +proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: (raises: [CancelledError]).} = convertEthersError: try: let overrides = CallOverrides(blockTag: some BlockTag.pending) @@ -281,22 +281,22 @@ proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.asy proc getChallenge*( market: OnChainMarket, id: SlotId -): Future[ProofChallenge] {.async.} = +): Future[ProofChallenge] {.async: (raises: [CancelledError]).} = convertEthersError: let overrides = CallOverrides(blockTag: some BlockTag.pending) return await market.contract.getChallenge(id, overrides) -proc submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async.} = +proc submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async: (raises: [CancelledError]).} = convertEthersError: discard await market.contract.submitProof(id, proof).confirm(1) -proc markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async.} = +proc markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async: (raises: [CancelledError]).} = convertEthersError: discard await market.contract.markProofAsMissing(id, period).confirm(1) proc canProofBeMarkedAsMissing*( market: OnChainMarket, id: SlotId, period: Period -): Future[bool] {.async.} = +): Future[bool] {.async: (raises: [CancelledError]).} = let provider = market.contract.provider let contractWithoutSigner = market.contract.connect(provider) let overrides = CallOverrides(blockTag: some BlockTag.pending) @@ -309,7 +309,7 @@ proc canProofBeMarkedAsMissing*( proc reserveSlot*( market: OnChainMarket, requestId: RequestId, slotIndex: uint64 -) {.async.} = +) {.async: (raises: [CancelledError]).} = convertEthersError: discard await market.contract .reserveSlot( @@ -322,13 +322,13 @@ proc reserveSlot*( proc canReserveSlot*( market: OnChainMarket, requestId: RequestId, slotIndex: uint64 -): Future[bool] {.async.} = +): Future[bool] {.async: (raises: [CancelledError]).} = convertEthersError: return await market.contract.canReserveSlot(requestId, slotIndex) proc subscribeRequests*( market: OnChainMarket, callback: OnRequest -): Future[MarketSubscription] {.async.} = +): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = proc onEvent(eventResult: ?!StorageRequested) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in Request subscription", msg = eventErr.msg @@ -342,7 +342,7 @@ proc subscribeRequests*( proc subscribeSlotFilled*( market: OnChainMarket, callback: OnSlotFilled -): Future[MarketSubscription] {.async.} = +): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = proc onEvent(eventResult: ?!SlotFilled) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in SlotFilled subscription", msg = eventErr.msg @@ -359,7 +359,7 @@ proc subscribeSlotFilled*( requestId: RequestId, slotIndex: uint64, callback: OnSlotFilled, -): Future[MarketSubscription] {.async.} = +): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = proc onSlotFilled(eventRequestId: RequestId, eventSlotIndex: uint64) = if eventRequestId == requestId and eventSlotIndex == slotIndex: callback(requestId, slotIndex) @@ -369,7 +369,7 @@ proc subscribeSlotFilled*( proc subscribeSlotFreed*( market: OnChainMarket, callback: OnSlotFreed -): Future[MarketSubscription] {.async.} = +): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = proc onEvent(eventResult: ?!SlotFreed) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in SlotFreed subscription", msg = eventErr.msg @@ -383,7 +383,7 @@ proc subscribeSlotFreed*( proc subscribeSlotReservationsFull*( market: OnChainMarket, callback: OnSlotReservationsFull -): Future[MarketSubscription] {.async.} = +): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = proc onEvent(eventResult: ?!SlotReservationsFull) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in SlotReservationsFull subscription", @@ -398,7 +398,7 @@ proc subscribeSlotReservationsFull*( proc subscribeFulfillment( market: OnChainMarket, callback: OnFulfillment -): Future[MarketSubscription] {.async.} = +): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in RequestFulfillment subscription", msg = eventErr.msg @@ -412,7 +412,7 @@ proc subscribeFulfillment( proc subscribeFulfillment( market: OnChainMarket, requestId: RequestId, callback: OnFulfillment -): Future[MarketSubscription] {.async.} = +): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in RequestFulfillment subscription", msg = eventErr.msg @@ -427,7 +427,7 @@ proc subscribeFulfillment( proc subscribeRequestCancelled*( market: OnChainMarket, callback: OnRequestCancelled -): Future[MarketSubscription] {.async.} = +): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in RequestCancelled subscription", msg = eventErr.msg @@ -441,7 +441,7 @@ proc subscribeRequestCancelled*( proc subscribeRequestCancelled*( market: OnChainMarket, requestId: RequestId, callback: OnRequestCancelled -): Future[MarketSubscription] {.async.} = +): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in RequestCancelled subscription", msg = eventErr.msg @@ -456,7 +456,7 @@ proc subscribeRequestCancelled*( proc subscribeRequestFailed*( market: OnChainMarket, callback: OnRequestFailed -): Future[MarketSubscription] {.async.} = +): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in RequestFailed subscription", msg = eventErr.msg @@ -470,7 +470,7 @@ proc subscribeRequestFailed*( proc subscribeRequestFailed*( market: OnChainMarket, requestId: RequestId, callback: OnRequestFailed -): Future[MarketSubscription] {.async.} = +): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in RequestFailed subscription", msg = eventErr.msg @@ -485,7 +485,7 @@ proc subscribeRequestFailed*( proc subscribeProofSubmission*( market: OnChainMarket, callback: OnProofSubmitted -): Future[MarketSubscription] {.async.} = +): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = proc onEvent(eventResult: ?!ProofSubmitted) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in ProofSubmitted subscription", msg = eventErr.msg @@ -497,18 +497,18 @@ proc subscribeProofSubmission*( let subscription = await market.contract.subscribe(ProofSubmitted, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) -proc unsubscribe*(subscription: OnChainMarketSubscription) {.async.} = +proc unsubscribe*(subscription: OnChainMarketSubscription) {.async: (raises: [CancelledError]).} = await subscription.eventSubscription.unsubscribe() proc queryPastSlotFilledEvents*( market: OnChainMarket, fromBlock: BlockTag -): Future[seq[SlotFilled]] {.async.} = +): Future[seq[SlotFilled]] {.async: (raises: [CancelledError]).} = convertEthersError: return await market.contract.queryFilter(SlotFilled, fromBlock, BlockTag.latest) proc queryPastSlotFilledEvents*( market: OnChainMarket, blocksAgo: int -): Future[seq[SlotFilled]] {.async.} = +): Future[seq[SlotFilled]] {.async: (raises: [CancelledError]).} = convertEthersError: let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo) @@ -516,21 +516,21 @@ proc queryPastSlotFilledEvents*( proc queryPastSlotFilledEvents*( market: OnChainMarket, fromTime: int64 -): Future[seq[SlotFilled]] {.async.} = +): Future[seq[SlotFilled]] {.async: (raises: [CancelledError]).} = convertEthersError: let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime) return await market.queryPastSlotFilledEvents(BlockTag.init(fromBlock)) proc queryPastStorageRequestedEvents*( market: OnChainMarket, fromBlock: BlockTag -): Future[seq[StorageRequested]] {.async.} = +): Future[seq[StorageRequested]] {.async: (raises: [CancelledError]).} = convertEthersError: return await market.contract.queryFilter(StorageRequested, fromBlock, BlockTag.latest) proc queryPastStorageRequestedEvents*( market: OnChainMarket, blocksAgo: int -): Future[seq[StorageRequested]] {.async.} = +): Future[seq[StorageRequested]] {.async: (raises: [CancelledError]).} = convertEthersError: let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo) @@ -538,7 +538,7 @@ proc queryPastStorageRequestedEvents*( proc queryPastStorageRequestedEventsFromTime*( market: OnChainMarket, fromTime: int64 -): Future[seq[StorageRequested]] {.async.} = +): Future[seq[StorageRequested]] {.async: (raises: [CancelledError]).} = convertEthersError: let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime) diff --git a/codexcrawler/state.nim b/codexcrawler/state.nim index 76e293c..d98c337 100644 --- a/codexcrawler/state.nim +++ b/codexcrawler/state.nim @@ -33,10 +33,10 @@ type config*: Config events*: Events -proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async.} = +proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async: (raises: [CancelledError]).} = await sleepAsync(1.seconds) - proc worker(): Future[void] {.async.} = + proc worker(): Future[void] {.async: (raises: [CancelledError]).} = while s.status == ApplicationStatus.Running: if err =? (await step()).errorOption: error "Failure-result caught in main loop. Stopping...", err = err.msg diff --git a/codexcrawler/utils/asyncdataevent.nim b/codexcrawler/utils/asyncdataevent.nim index e847c7d..aba1020 100644 --- a/codexcrawler/utils/asyncdataevent.nim +++ b/codexcrawler/utils/asyncdataevent.nim @@ -24,7 +24,7 @@ proc newAsyncDataEvent*[T](): AsyncDataEvent[T] = proc performUnsubscribe[T]( event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription -) {.async.} = +) {.async: (raises: [CancelledError]).} = if subscription in event.subscriptions: await subscription.listenFuture.cancelAndWait() event.subscriptions.delete(event.subscriptions.find(subscription)) @@ -40,7 +40,7 @@ proc subscribe*[T]( delayedUnsubscribe: false, ) - proc listener() {.async.} = + proc listener() {.async: (raises: [CancelledError]).} = while true: let items = await event.queue.waitEvents(subscription.key) for item in items: @@ -81,13 +81,13 @@ proc fire*[T]( proc unsubscribe*[T]( event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription -) {.async.} = +) {.async: (raises: [CancelledError]).} = if subscription.inHandler: subscription.delayedUnsubscribe = true else: await event.performUnsubscribe(subscription) -proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async.} = +proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async: (raises: [CancelledError]).} = let all = event.subscriptions for subscription in all: await event.unsubscribe(subscription)