From c374dfc84425c6007d2be6d5df68b328114824a9 Mon Sep 17 00:00:00 2001 From: thatben Date: Mon, 2 Jun 2025 15:00:32 +0200 Subject: [PATCH] it builds --- codexcrawler/component.nim | 6 +- codexcrawler/components/dhtcrawler.nim | 5 +- codexcrawler/components/nodestore.nim | 13 +-- codexcrawler/components/requeststore.nim | 9 +- codexcrawler/list.nim | 15 ++-- codexcrawler/services/dht.nim | 12 ++- codexcrawler/services/marketplace.nim | 15 ++-- codexcrawler/services/marketplace/market.nim | 95 ++++++++++---------- codexcrawler/state.nim | 2 +- codexcrawler/utils/asyncdataevent.nim | 21 +++-- 10 files changed, 111 insertions(+), 82 deletions(-) diff --git a/codexcrawler/component.nim b/codexcrawler/component.nim index ce1517b..623e757 100644 --- a/codexcrawler/component.nim +++ b/codexcrawler/component.nim @@ -3,17 +3,17 @@ import pkg/questionable/results type Component* = ref object of RootObj -method awake*(c: Component): Future[?!void] {.async, base.} = +method awake*(c: Component): Future[?!void] {.async: (raises: [CancelledError]), base.} = # Awake is called on all components in an unspecified order. # Use this method to subscribe/connect to other components. return success() -method start*(c: Component): Future[?!void] {.async, base.} = +method start*(c: Component): Future[?!void] {.async: (raises: [CancelledError]), base.} = # Start is called on all components in an unspecified order. # Is is guaranteed that all components have already successfulled handled 'awake'. # Use this method to begin the work of this component. return success() -method stop*(c: Component): Future[?!void] {.async, base.} = +method stop*(c: Component): Future[?!void] {.async: (raises: [CancelledError]), base.} = # Use this method to stop, unsubscribe, and clean up any resources. return success() diff --git a/codexcrawler/components/dhtcrawler.nim b/codexcrawler/components/dhtcrawler.nim index fa80170..7b8846e 100644 --- a/codexcrawler/components/dhtcrawler.nim +++ b/codexcrawler/components/dhtcrawler.nim @@ -53,7 +53,10 @@ method start*(c: DhtCrawler): Future[?!void] {.async: (raises: [CancelledError]) await c.step() if c.state.config.dhtEnable: - await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds) + try: + await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds) + except CatchableError as err: + return failure(err.msg) return success() diff --git a/codexcrawler/components/nodestore.nim b/codexcrawler/components/nodestore.nim index f10ff85..7d68fbe 100644 --- a/codexcrawler/components/nodestore.nim +++ b/codexcrawler/components/nodestore.nim @@ -70,11 +70,14 @@ proc encode*(e: NodeEntry): seq[byte] = e.toBytes() proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T = - if bytes.len < 1: - return success( - NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64, firstInactive: 0.uint64) - ) - return NodeEntry.fromBytes(bytes) + try: + if bytes.len < 1: + return success( + NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64, firstInactive: 0.uint64) + ) + return NodeEntry.fromBytes(bytes) + except ValueError as err: + return failure(err.msg) proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async: (raises: [CancelledError]).} = without key =? Key.init(nodestoreName / $nid), err: diff --git a/codexcrawler/components/requeststore.nim b/codexcrawler/components/requeststore.nim index 836e1fb..6658087 100644 --- a/codexcrawler/components/requeststore.nim +++ b/codexcrawler/components/requeststore.nim @@ -51,9 +51,12 @@ proc encode*(e: RequestEntry): seq[byte] = e.toBytes() proc decode*(T: type RequestEntry, bytes: seq[byte]): ?!T = - if bytes.len < 1: - return success(RequestEntry(isValid: false)) - return RequestEntry.fromBytes(bytes) + try: + if bytes.len < 1: + return success(RequestEntry(isValid: false)) + return RequestEntry.fromBytes(bytes) + except ValueError as err: + return failure(err.msg) method add*(s: RequestStore, rid: Rid): Future[?!void] {.async: (raises: []), base.} = without key =? Key.init(requeststoreName / $rid), err: diff --git a/codexcrawler/list.nim b/codexcrawler/list.nim index 7ead86c..d19ebdf 100644 --- a/codexcrawler/list.nim +++ b/codexcrawler/list.nim @@ -28,9 +28,12 @@ proc encode(s: Nid): seq[byte] = s.toBytes() proc decode(T: type Nid, bytes: seq[byte]): ?!T = - if bytes.len < 1: - return success(Nid.fromStr("0")) - return Nid.fromBytes(bytes) + try: + if bytes.len < 1: + return success(Nid.fromStr("0")) + return Nid.fromBytes(bytes) + except ValueError as err: + return failure(err.msg) proc saveItem(this: List, item: Nid): Future[?!void] {.async: (raises: [CancelledError]).} = without itemKey =? Key.init(this.name / $item), err: @@ -38,7 +41,7 @@ proc saveItem(this: List, item: Nid): Future[?!void] {.async: (raises: [Cancelle ?await this.store.put(itemKey, item) return success() -method load*(this: List): Future[?!void] {.async, base.} = +method load*(this: List): Future[?!void] {.async: (raises: [CancelledError]), base.} = without queryKey =? Key.init(this.name), err: return failure(err) without iter =? (await query[Nid](this.store, Query.init(queryKey))), err: @@ -58,7 +61,7 @@ method load*(this: List): Future[?!void] {.async, base.} = proc contains*(this: List, nid: Nid): bool = this.items.anyIt(it == nid) -method add*(this: List, nid: Nid): Future[?!void] {.async, base.} = +method add*(this: List, nid: Nid): Future[?!void] {.async: (raises: [CancelledError]), base.} = if this.contains(nid): return success() @@ -69,7 +72,7 @@ method add*(this: List, nid: Nid): Future[?!void] {.async, base.} = return success() -method remove*(this: List, nid: Nid): Future[?!void] {.async, base.} = +method remove*(this: List, nid: Nid): Future[?!void] {.async: (raises: [CancelledError]), base.} = if not this.contains(nid): return success() diff --git a/codexcrawler/services/dht.nim b/codexcrawler/services/dht.nim index e5068db..254f3a3 100644 --- a/codexcrawler/services/dht.nim +++ b/codexcrawler/services/dht.nim @@ -113,12 +113,18 @@ proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) = d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") method start*(d: Dht): Future[?!void] {.async: (raises: [CancelledError]).} = - d.protocol.open() - await d.protocol.start() + try: + d.protocol.open() + await d.protocol.start() + except CatchableError as exc: + return failure(exc.msg) return success() method stop*(d: Dht): Future[?!void] {.async: (raises: [CancelledError]).} = - await d.protocol.closeWait() + try: + await d.protocol.closeWait() + except CatchableError as exc: + return failure(exc.msg) return success() proc new( diff --git a/codexcrawler/services/marketplace.nim b/codexcrawler/services/marketplace.nim index 4c69fda..fd2c4f3 100644 --- a/codexcrawler/services/marketplace.nim +++ b/codexcrawler/services/marketplace.nim @@ -110,13 +110,16 @@ method getRequestInfo*( notStarted() method awake*(m: MarketplaceService): Future[?!void] {.async: (raises: [CancelledError]).} = - let provider = JsonRpcProvider.new(m.state.config.ethProvider) - without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress): - return failure("Invalid MarketplaceAddress provided") + try: + let provider = JsonRpcProvider.new(m.state.config.ethProvider) + without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress): + return failure("Invalid MarketplaceAddress provided") - let marketplace = Marketplace.new(marketplaceAddress, provider) - m.market = some(OnChainMarket.new(marketplace)) - return success() + let marketplace = Marketplace.new(marketplaceAddress, provider) + m.market = some(OnChainMarket.new(marketplace)) + return success() + except JsonRpcProviderError as err: + return failure(err.msg) proc new(T: type MarketplaceService, state: State, clock: Clock): MarketplaceService = return MarketplaceService(state: state, market: none(OnChainMarket), clock: clock) diff --git a/codexcrawler/services/marketplace/market.nim b/codexcrawler/services/marketplace/market.nim index 64fa8c2..4dfc501 100644 --- a/codexcrawler/services/marketplace/market.nim +++ b/codexcrawler/services/marketplace/market.nim @@ -89,6 +89,8 @@ template convertEthersError(body) = body except EthersError as error: raiseMarketError(error.msgDetail) + except CatchableError as error: + raiseMarketError(error.msg) proc loadConfig( market: OnChainMarket @@ -138,43 +140,43 @@ proc periodicity*( let period = config.proofs.period return Periodicity(seconds: period) -proc proofTimeout*(market: OnChainMarket): Future[uint64] {.async: (raises: [CancelledError]).} = +proc proofTimeout*(market: OnChainMarket): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let config = await market.config() return config.proofs.timeout -proc repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async: (raises: [CancelledError]).} = +proc repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let config = await market.config() return config.collateral.repairRewardPercentage -proc requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async: (raises: [CancelledError]).} = +proc requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let config = await market.config() return config.requestDurationLimit -proc proofDowntime*(market: OnChainMarket): Future[uint8] {.async: (raises: [CancelledError]).} = +proc proofDowntime*(market: OnChainMarket): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let config = await market.config() return config.proofs.downtime -proc getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async: (raises: [CancelledError]).} = +proc getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let overrides = CallOverrides(blockTag: some BlockTag.pending) return await market.contract.getPointer(slotId, overrides) -proc myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async: (raises: [CancelledError]).} = +proc myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: return await market.contract.myRequests -proc mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async: (raises: [CancelledError]).} = +proc mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let slots = await market.contract.mySlots() debug "Fetched my slots", numSlots = len(slots) return slots -proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async: (raises: [CancelledError]).} = +proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: debug "Requesting storage" await market.approveFunds(request.totalPrice()) @@ -182,7 +184,7 @@ proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async: (ra proc getRequest*( market: OnChainMarket, id: RequestId -): Future[?StorageRequest] {.async: (raises: [CancelledError]).} = +): Future[?StorageRequest] {.async: (raises: [CancelledError, MarketError]).} = let key = $id convertEthersError: @@ -194,7 +196,7 @@ proc getRequest*( proc requestState*( market: OnChainMarket, requestId: RequestId -): Future[?RequestState] {.async: (raises: [CancelledError]).} = +): Future[?RequestState] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: try: let overrides = CallOverrides(blockTag: some BlockTag.pending) @@ -202,22 +204,22 @@ proc requestState*( except Marketplace_UnknownRequest: return none RequestState -proc slotState*(market: OnChainMarket, slotId: SlotId): Future[SlotState] {.async: (raises: [CancelledError]).} = +proc slotState*(market: OnChainMarket, slotId: SlotId): Future[SlotState] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let overrides = CallOverrides(blockTag: some BlockTag.pending) return await market.contract.slotState(slotId, overrides) -proc getRequestEnd*(market: OnChainMarket, id: RequestId): Future[uint64] {.async: (raises: [CancelledError]).} = +proc getRequestEnd*(market: OnChainMarket, id: RequestId): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: return (await market.contract.requestEnd(id)).uint64 -proc requestExpiresAt*(market: OnChainMarket, id: RequestId): Future[uint64] {.async: (raises: [CancelledError]).} = +proc requestExpiresAt*(market: OnChainMarket, id: RequestId): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: return (await market.contract.requestExpiry(id)).uint64 proc getHost( market: OnChainMarket, requestId: RequestId, slotIndex: uint64 -): Future[?Address] {.async: (raises: [CancelledError]).} = +): Future[?Address] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let slotId = slotId(requestId, slotIndex) let address = await market.contract.getHost(slotId) @@ -228,11 +230,11 @@ proc getHost( proc currentCollateral*( market: OnChainMarket, slotId: SlotId -): Future[UInt256] {.async: (raises: [CancelledError]).} = +): Future[UInt256] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: return await market.contract.currentCollateral(slotId) -proc getActiveSlot*(market: OnChainMarket, slotId: SlotId): Future[?Slot] {.async: (raises: [CancelledError]).} = +proc getActiveSlot*(market: OnChainMarket, slotId: SlotId): Future[?Slot] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: try: return some await market.contract.getActiveSlot(slotId) @@ -245,7 +247,7 @@ proc fillSlot( slotIndex: uint64, proof: Groth16Proof, collateral: UInt256, -) {.async: (raises: [CancelledError]).} = +) {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: logScope: requestId @@ -259,11 +261,11 @@ proc fillSlot( proc freeSlot*(market: OnChainMarket, slotId: SlotId) {.async: (raises: [CancelledError]).} = raiseAssert("Not supported") -proc withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async: (raises: [CancelledError]).} = +proc withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: discard await market.contract.withdrawFunds(requestId).confirm(1) -proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: (raises: [CancelledError]).} = +proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: try: let overrides = CallOverrides(blockTag: some BlockTag.pending) @@ -271,7 +273,7 @@ proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: except Marketplace_SlotIsFree: return false -proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: (raises: [CancelledError]).} = +proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: try: let overrides = CallOverrides(blockTag: some BlockTag.pending) @@ -281,16 +283,16 @@ proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.asy proc getChallenge*( market: OnChainMarket, id: SlotId -): Future[ProofChallenge] {.async: (raises: [CancelledError]).} = +): Future[ProofChallenge] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let overrides = CallOverrides(blockTag: some BlockTag.pending) return await market.contract.getChallenge(id, overrides) -proc submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async: (raises: [CancelledError]).} = +proc submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: discard await market.contract.submitProof(id, proof).confirm(1) -proc markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async: (raises: [CancelledError]).} = +proc markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: discard await market.contract.markProofAsMissing(id, period).confirm(1) @@ -309,7 +311,7 @@ proc canProofBeMarkedAsMissing*( proc reserveSlot*( market: OnChainMarket, requestId: RequestId, slotIndex: uint64 -) {.async: (raises: [CancelledError]).} = +) {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: discard await market.contract .reserveSlot( @@ -322,13 +324,13 @@ proc reserveSlot*( proc canReserveSlot*( market: OnChainMarket, requestId: RequestId, slotIndex: uint64 -): Future[bool] {.async: (raises: [CancelledError]).} = +): Future[bool] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: return await market.contract.canReserveSlot(requestId, slotIndex) proc subscribeRequests*( market: OnChainMarket, callback: OnRequest -): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = +): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} = proc onEvent(eventResult: ?!StorageRequested) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in Request subscription", msg = eventErr.msg @@ -342,7 +344,7 @@ proc subscribeRequests*( proc subscribeSlotFilled*( market: OnChainMarket, callback: OnSlotFilled -): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = +): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} = proc onEvent(eventResult: ?!SlotFilled) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in SlotFilled subscription", msg = eventErr.msg @@ -359,7 +361,7 @@ proc subscribeSlotFilled*( requestId: RequestId, slotIndex: uint64, callback: OnSlotFilled, -): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = +): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} = proc onSlotFilled(eventRequestId: RequestId, eventSlotIndex: uint64) = if eventRequestId == requestId and eventSlotIndex == slotIndex: callback(requestId, slotIndex) @@ -369,7 +371,7 @@ proc subscribeSlotFilled*( proc subscribeSlotFreed*( market: OnChainMarket, callback: OnSlotFreed -): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = +): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} = proc onEvent(eventResult: ?!SlotFreed) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in SlotFreed subscription", msg = eventErr.msg @@ -383,7 +385,7 @@ proc subscribeSlotFreed*( proc subscribeSlotReservationsFull*( market: OnChainMarket, callback: OnSlotReservationsFull -): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = +): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} = proc onEvent(eventResult: ?!SlotReservationsFull) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in SlotReservationsFull subscription", @@ -398,7 +400,7 @@ proc subscribeSlotReservationsFull*( proc subscribeFulfillment( market: OnChainMarket, callback: OnFulfillment -): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = +): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} = proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in RequestFulfillment subscription", msg = eventErr.msg @@ -412,7 +414,7 @@ proc subscribeFulfillment( proc subscribeFulfillment( market: OnChainMarket, requestId: RequestId, callback: OnFulfillment -): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = +): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} = proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in RequestFulfillment subscription", msg = eventErr.msg @@ -427,7 +429,7 @@ proc subscribeFulfillment( proc subscribeRequestCancelled*( market: OnChainMarket, callback: OnRequestCancelled -): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = +): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} = proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in RequestCancelled subscription", msg = eventErr.msg @@ -441,7 +443,7 @@ proc subscribeRequestCancelled*( proc subscribeRequestCancelled*( market: OnChainMarket, requestId: RequestId, callback: OnRequestCancelled -): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = +): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} = proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in RequestCancelled subscription", msg = eventErr.msg @@ -456,7 +458,7 @@ proc subscribeRequestCancelled*( proc subscribeRequestFailed*( market: OnChainMarket, callback: OnRequestFailed -): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = +): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} = proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in RequestFailed subscription", msg = eventErr.msg @@ -470,7 +472,7 @@ proc subscribeRequestFailed*( proc subscribeRequestFailed*( market: OnChainMarket, requestId: RequestId, callback: OnRequestFailed -): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = +): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} = proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in RequestFailed subscription", msg = eventErr.msg @@ -485,7 +487,7 @@ proc subscribeRequestFailed*( proc subscribeProofSubmission*( market: OnChainMarket, callback: OnProofSubmitted -): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = +): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} = proc onEvent(eventResult: ?!ProofSubmitted) {.upraises: [].} = without event =? eventResult, eventErr: error "There was an error in ProofSubmitted subscription", msg = eventErr.msg @@ -497,18 +499,21 @@ proc subscribeProofSubmission*( let subscription = await market.contract.subscribe(ProofSubmitted, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) -proc unsubscribe*(subscription: OnChainMarketSubscription) {.async: (raises: [CancelledError]).} = - await subscription.eventSubscription.unsubscribe() +proc unsubscribe*(subscription: OnChainMarketSubscription) {.async: (raises: [CancelledError, MarketError]).} = + try: + await subscription.eventSubscription.unsubscribe() + except ProviderError as err: + raiseMarketError(err.msg) proc queryPastSlotFilledEvents*( market: OnChainMarket, fromBlock: BlockTag -): Future[seq[SlotFilled]] {.async: (raises: [CancelledError]).} = +): Future[seq[SlotFilled]] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: return await market.contract.queryFilter(SlotFilled, fromBlock, BlockTag.latest) proc queryPastSlotFilledEvents*( market: OnChainMarket, blocksAgo: int -): Future[seq[SlotFilled]] {.async: (raises: [CancelledError]).} = +): Future[seq[SlotFilled]] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo) @@ -516,21 +521,21 @@ proc queryPastSlotFilledEvents*( proc queryPastSlotFilledEvents*( market: OnChainMarket, fromTime: int64 -): Future[seq[SlotFilled]] {.async: (raises: [CancelledError]).} = +): Future[seq[SlotFilled]] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime) return await market.queryPastSlotFilledEvents(BlockTag.init(fromBlock)) proc queryPastStorageRequestedEvents*( market: OnChainMarket, fromBlock: BlockTag -): Future[seq[StorageRequested]] {.async: (raises: [CancelledError]).} = +): Future[seq[StorageRequested]] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: return await market.contract.queryFilter(StorageRequested, fromBlock, BlockTag.latest) proc queryPastStorageRequestedEvents*( market: OnChainMarket, blocksAgo: int -): Future[seq[StorageRequested]] {.async: (raises: [CancelledError]).} = +): Future[seq[StorageRequested]] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo) @@ -538,7 +543,7 @@ proc queryPastStorageRequestedEvents*( proc queryPastStorageRequestedEventsFromTime*( market: OnChainMarket, fromTime: int64 -): Future[seq[StorageRequested]] {.async: (raises: [CancelledError]).} = +): Future[seq[StorageRequested]] {.async: (raises: [CancelledError, MarketError]).} = convertEthersError: let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime) diff --git a/codexcrawler/state.nim b/codexcrawler/state.nim index d98c337..f29d6e1 100644 --- a/codexcrawler/state.nim +++ b/codexcrawler/state.nim @@ -45,7 +45,7 @@ proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async: (raise asyncSpawn worker() -method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} = +method whileRunning*(s: State, step: OnStep, delay: Duration) {.async: (raises: []), base.} = # We use a small delay before starting the workers because 'whileRunning' is likely called from # component 'start' methods, which are executed sequentially in arbitrary order (to prevent temporal coupling). # Worker steps might start raising events that other components haven't had time to subscribe to yet. diff --git a/codexcrawler/utils/asyncdataevent.nim b/codexcrawler/utils/asyncdataevent.nim index aba1020..823871f 100644 --- a/codexcrawler/utils/asyncdataevent.nim +++ b/codexcrawler/utils/asyncdataevent.nim @@ -15,7 +15,7 @@ type queue: AsyncEventQueue[?T] subscriptions: seq[AsyncDataEventSubscription] - AsyncDataEventHandler*[T] = proc(data: T): Future[?!void] + AsyncDataEventHandler*[T] = proc(data: T): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).} proc newAsyncDataEvent*[T](): AsyncDataEvent[T] = AsyncDataEvent[T]( @@ -41,14 +41,17 @@ proc subscribe*[T]( ) proc listener() {.async: (raises: [CancelledError]).} = - while true: - let items = await event.queue.waitEvents(subscription.key) - for item in items: - if data =? item: - subscription.inHandler = true - subscription.lastResult = (await handler(data)) - subscription.inHandler = false - subscription.fireEvent.fire() + try: + while true: + let items = await event.queue.waitEvents(subscription.key) + for item in items: + if data =? item: + subscription.inHandler = true + subscription.lastResult = (await handler(data)) + subscription.inHandler = false + subscription.fireEvent.fire() + except AsyncEventQueueFullError as err: + raiseAssert("AsyncEventQueueFullError in asyncdataevent.listener()") subscription.listenFuture = listener()