it builds

This commit is contained in:
thatben 2025-06-02 15:00:32 +02:00
parent 230b9b8fb2
commit c374dfc844
No known key found for this signature in database
GPG Key ID: 62C543548433D43E
10 changed files with 111 additions and 82 deletions

View File

@ -3,17 +3,17 @@ import pkg/questionable/results
type Component* = ref object of RootObj type Component* = ref object of RootObj
method awake*(c: Component): Future[?!void] {.async, base.} = method awake*(c: Component): Future[?!void] {.async: (raises: [CancelledError]), base.} =
# Awake is called on all components in an unspecified order. # Awake is called on all components in an unspecified order.
# Use this method to subscribe/connect to other components. # Use this method to subscribe/connect to other components.
return success() return success()
method start*(c: Component): Future[?!void] {.async, base.} = method start*(c: Component): Future[?!void] {.async: (raises: [CancelledError]), base.} =
# Start is called on all components in an unspecified order. # Start is called on all components in an unspecified order.
# Is is guaranteed that all components have already successfulled handled 'awake'. # Is is guaranteed that all components have already successfulled handled 'awake'.
# Use this method to begin the work of this component. # Use this method to begin the work of this component.
return success() return success()
method stop*(c: Component): Future[?!void] {.async, base.} = method stop*(c: Component): Future[?!void] {.async: (raises: [CancelledError]), base.} =
# Use this method to stop, unsubscribe, and clean up any resources. # Use this method to stop, unsubscribe, and clean up any resources.
return success() return success()

View File

@ -53,7 +53,10 @@ method start*(c: DhtCrawler): Future[?!void] {.async: (raises: [CancelledError])
await c.step() await c.step()
if c.state.config.dhtEnable: if c.state.config.dhtEnable:
await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds) try:
await c.state.whileRunning(onStep, c.state.config.stepDelayMs.milliseconds)
except CatchableError as err:
return failure(err.msg)
return success() return success()

View File

@ -70,11 +70,14 @@ proc encode*(e: NodeEntry): seq[byte] =
e.toBytes() e.toBytes()
proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T = proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T =
if bytes.len < 1: try:
return success( if bytes.len < 1:
NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64, firstInactive: 0.uint64) return success(
) NodeEntry(id: Nid.fromStr("0"), lastVisit: 0.uint64, firstInactive: 0.uint64)
return NodeEntry.fromBytes(bytes) )
return NodeEntry.fromBytes(bytes)
except ValueError as err:
return failure(err.msg)
proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async: (raises: [CancelledError]).} = proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async: (raises: [CancelledError]).} =
without key =? Key.init(nodestoreName / $nid), err: without key =? Key.init(nodestoreName / $nid), err:

View File

@ -51,9 +51,12 @@ proc encode*(e: RequestEntry): seq[byte] =
e.toBytes() e.toBytes()
proc decode*(T: type RequestEntry, bytes: seq[byte]): ?!T = proc decode*(T: type RequestEntry, bytes: seq[byte]): ?!T =
if bytes.len < 1: try:
return success(RequestEntry(isValid: false)) if bytes.len < 1:
return RequestEntry.fromBytes(bytes) return success(RequestEntry(isValid: false))
return RequestEntry.fromBytes(bytes)
except ValueError as err:
return failure(err.msg)
method add*(s: RequestStore, rid: Rid): Future[?!void] {.async: (raises: []), base.} = method add*(s: RequestStore, rid: Rid): Future[?!void] {.async: (raises: []), base.} =
without key =? Key.init(requeststoreName / $rid), err: without key =? Key.init(requeststoreName / $rid), err:

View File

@ -28,9 +28,12 @@ proc encode(s: Nid): seq[byte] =
s.toBytes() s.toBytes()
proc decode(T: type Nid, bytes: seq[byte]): ?!T = proc decode(T: type Nid, bytes: seq[byte]): ?!T =
if bytes.len < 1: try:
return success(Nid.fromStr("0")) if bytes.len < 1:
return Nid.fromBytes(bytes) return success(Nid.fromStr("0"))
return Nid.fromBytes(bytes)
except ValueError as err:
return failure(err.msg)
proc saveItem(this: List, item: Nid): Future[?!void] {.async: (raises: [CancelledError]).} = proc saveItem(this: List, item: Nid): Future[?!void] {.async: (raises: [CancelledError]).} =
without itemKey =? Key.init(this.name / $item), err: without itemKey =? Key.init(this.name / $item), err:
@ -38,7 +41,7 @@ proc saveItem(this: List, item: Nid): Future[?!void] {.async: (raises: [Cancelle
?await this.store.put(itemKey, item) ?await this.store.put(itemKey, item)
return success() return success()
method load*(this: List): Future[?!void] {.async, base.} = method load*(this: List): Future[?!void] {.async: (raises: [CancelledError]), base.} =
without queryKey =? Key.init(this.name), err: without queryKey =? Key.init(this.name), err:
return failure(err) return failure(err)
without iter =? (await query[Nid](this.store, Query.init(queryKey))), err: without iter =? (await query[Nid](this.store, Query.init(queryKey))), err:
@ -58,7 +61,7 @@ method load*(this: List): Future[?!void] {.async, base.} =
proc contains*(this: List, nid: Nid): bool = proc contains*(this: List, nid: Nid): bool =
this.items.anyIt(it == nid) this.items.anyIt(it == nid)
method add*(this: List, nid: Nid): Future[?!void] {.async, base.} = method add*(this: List, nid: Nid): Future[?!void] {.async: (raises: [CancelledError]), base.} =
if this.contains(nid): if this.contains(nid):
return success() return success()
@ -69,7 +72,7 @@ method add*(this: List, nid: Nid): Future[?!void] {.async, base.} =
return success() return success()
method remove*(this: List, nid: Nid): Future[?!void] {.async, base.} = method remove*(this: List, nid: Nid): Future[?!void] {.async: (raises: [CancelledError]), base.} =
if not this.contains(nid): if not this.contains(nid):
return success() return success()

View File

@ -113,12 +113,18 @@ proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) =
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR") d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
method start*(d: Dht): Future[?!void] {.async: (raises: [CancelledError]).} = method start*(d: Dht): Future[?!void] {.async: (raises: [CancelledError]).} =
d.protocol.open() try:
await d.protocol.start() d.protocol.open()
await d.protocol.start()
except CatchableError as exc:
return failure(exc.msg)
return success() return success()
method stop*(d: Dht): Future[?!void] {.async: (raises: [CancelledError]).} = method stop*(d: Dht): Future[?!void] {.async: (raises: [CancelledError]).} =
await d.protocol.closeWait() try:
await d.protocol.closeWait()
except CatchableError as exc:
return failure(exc.msg)
return success() return success()
proc new( proc new(

View File

@ -110,13 +110,16 @@ method getRequestInfo*(
notStarted() notStarted()
method awake*(m: MarketplaceService): Future[?!void] {.async: (raises: [CancelledError]).} = method awake*(m: MarketplaceService): Future[?!void] {.async: (raises: [CancelledError]).} =
let provider = JsonRpcProvider.new(m.state.config.ethProvider) try:
without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress): let provider = JsonRpcProvider.new(m.state.config.ethProvider)
return failure("Invalid MarketplaceAddress provided") without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress):
return failure("Invalid MarketplaceAddress provided")
let marketplace = Marketplace.new(marketplaceAddress, provider) let marketplace = Marketplace.new(marketplaceAddress, provider)
m.market = some(OnChainMarket.new(marketplace)) m.market = some(OnChainMarket.new(marketplace))
return success() return success()
except JsonRpcProviderError as err:
return failure(err.msg)
proc new(T: type MarketplaceService, state: State, clock: Clock): MarketplaceService = proc new(T: type MarketplaceService, state: State, clock: Clock): MarketplaceService =
return MarketplaceService(state: state, market: none(OnChainMarket), clock: clock) return MarketplaceService(state: state, market: none(OnChainMarket), clock: clock)

View File

@ -89,6 +89,8 @@ template convertEthersError(body) =
body body
except EthersError as error: except EthersError as error:
raiseMarketError(error.msgDetail) raiseMarketError(error.msgDetail)
except CatchableError as error:
raiseMarketError(error.msg)
proc loadConfig( proc loadConfig(
market: OnChainMarket market: OnChainMarket
@ -138,43 +140,43 @@ proc periodicity*(
let period = config.proofs.period let period = config.proofs.period
return Periodicity(seconds: period) return Periodicity(seconds: period)
proc proofTimeout*(market: OnChainMarket): Future[uint64] {.async: (raises: [CancelledError]).} = proc proofTimeout*(market: OnChainMarket): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let config = await market.config() let config = await market.config()
return config.proofs.timeout return config.proofs.timeout
proc repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async: (raises: [CancelledError]).} = proc repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let config = await market.config() let config = await market.config()
return config.collateral.repairRewardPercentage return config.collateral.repairRewardPercentage
proc requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async: (raises: [CancelledError]).} = proc requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let config = await market.config() let config = await market.config()
return config.requestDurationLimit return config.requestDurationLimit
proc proofDowntime*(market: OnChainMarket): Future[uint8] {.async: (raises: [CancelledError]).} = proc proofDowntime*(market: OnChainMarket): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let config = await market.config() let config = await market.config()
return config.proofs.downtime return config.proofs.downtime
proc getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async: (raises: [CancelledError]).} = proc getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let overrides = CallOverrides(blockTag: some BlockTag.pending) let overrides = CallOverrides(blockTag: some BlockTag.pending)
return await market.contract.getPointer(slotId, overrides) return await market.contract.getPointer(slotId, overrides)
proc myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async: (raises: [CancelledError]).} = proc myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
return await market.contract.myRequests return await market.contract.myRequests
proc mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async: (raises: [CancelledError]).} = proc mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let slots = await market.contract.mySlots() let slots = await market.contract.mySlots()
debug "Fetched my slots", numSlots = len(slots) debug "Fetched my slots", numSlots = len(slots)
return slots return slots
proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async: (raises: [CancelledError]).} = proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
debug "Requesting storage" debug "Requesting storage"
await market.approveFunds(request.totalPrice()) await market.approveFunds(request.totalPrice())
@ -182,7 +184,7 @@ proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async: (ra
proc getRequest*( proc getRequest*(
market: OnChainMarket, id: RequestId market: OnChainMarket, id: RequestId
): Future[?StorageRequest] {.async: (raises: [CancelledError]).} = ): Future[?StorageRequest] {.async: (raises: [CancelledError, MarketError]).} =
let key = $id let key = $id
convertEthersError: convertEthersError:
@ -194,7 +196,7 @@ proc getRequest*(
proc requestState*( proc requestState*(
market: OnChainMarket, requestId: RequestId market: OnChainMarket, requestId: RequestId
): Future[?RequestState] {.async: (raises: [CancelledError]).} = ): Future[?RequestState] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
try: try:
let overrides = CallOverrides(blockTag: some BlockTag.pending) let overrides = CallOverrides(blockTag: some BlockTag.pending)
@ -202,22 +204,22 @@ proc requestState*(
except Marketplace_UnknownRequest: except Marketplace_UnknownRequest:
return none RequestState return none RequestState
proc slotState*(market: OnChainMarket, slotId: SlotId): Future[SlotState] {.async: (raises: [CancelledError]).} = proc slotState*(market: OnChainMarket, slotId: SlotId): Future[SlotState] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let overrides = CallOverrides(blockTag: some BlockTag.pending) let overrides = CallOverrides(blockTag: some BlockTag.pending)
return await market.contract.slotState(slotId, overrides) return await market.contract.slotState(slotId, overrides)
proc getRequestEnd*(market: OnChainMarket, id: RequestId): Future[uint64] {.async: (raises: [CancelledError]).} = proc getRequestEnd*(market: OnChainMarket, id: RequestId): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
return (await market.contract.requestEnd(id)).uint64 return (await market.contract.requestEnd(id)).uint64
proc requestExpiresAt*(market: OnChainMarket, id: RequestId): Future[uint64] {.async: (raises: [CancelledError]).} = proc requestExpiresAt*(market: OnChainMarket, id: RequestId): Future[uint64] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
return (await market.contract.requestExpiry(id)).uint64 return (await market.contract.requestExpiry(id)).uint64
proc getHost( proc getHost(
market: OnChainMarket, requestId: RequestId, slotIndex: uint64 market: OnChainMarket, requestId: RequestId, slotIndex: uint64
): Future[?Address] {.async: (raises: [CancelledError]).} = ): Future[?Address] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let slotId = slotId(requestId, slotIndex) let slotId = slotId(requestId, slotIndex)
let address = await market.contract.getHost(slotId) let address = await market.contract.getHost(slotId)
@ -228,11 +230,11 @@ proc getHost(
proc currentCollateral*( proc currentCollateral*(
market: OnChainMarket, slotId: SlotId market: OnChainMarket, slotId: SlotId
): Future[UInt256] {.async: (raises: [CancelledError]).} = ): Future[UInt256] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
return await market.contract.currentCollateral(slotId) return await market.contract.currentCollateral(slotId)
proc getActiveSlot*(market: OnChainMarket, slotId: SlotId): Future[?Slot] {.async: (raises: [CancelledError]).} = proc getActiveSlot*(market: OnChainMarket, slotId: SlotId): Future[?Slot] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
try: try:
return some await market.contract.getActiveSlot(slotId) return some await market.contract.getActiveSlot(slotId)
@ -245,7 +247,7 @@ proc fillSlot(
slotIndex: uint64, slotIndex: uint64,
proof: Groth16Proof, proof: Groth16Proof,
collateral: UInt256, collateral: UInt256,
) {.async: (raises: [CancelledError]).} = ) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
logScope: logScope:
requestId requestId
@ -259,11 +261,11 @@ proc fillSlot(
proc freeSlot*(market: OnChainMarket, slotId: SlotId) {.async: (raises: [CancelledError]).} = proc freeSlot*(market: OnChainMarket, slotId: SlotId) {.async: (raises: [CancelledError]).} =
raiseAssert("Not supported") raiseAssert("Not supported")
proc withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async: (raises: [CancelledError]).} = proc withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
discard await market.contract.withdrawFunds(requestId).confirm(1) discard await market.contract.withdrawFunds(requestId).confirm(1)
proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: (raises: [CancelledError]).} = proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
try: try:
let overrides = CallOverrides(blockTag: some BlockTag.pending) let overrides = CallOverrides(blockTag: some BlockTag.pending)
@ -271,7 +273,7 @@ proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async:
except Marketplace_SlotIsFree: except Marketplace_SlotIsFree:
return false return false
proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: (raises: [CancelledError]).} = proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
try: try:
let overrides = CallOverrides(blockTag: some BlockTag.pending) let overrides = CallOverrides(blockTag: some BlockTag.pending)
@ -281,16 +283,16 @@ proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.asy
proc getChallenge*( proc getChallenge*(
market: OnChainMarket, id: SlotId market: OnChainMarket, id: SlotId
): Future[ProofChallenge] {.async: (raises: [CancelledError]).} = ): Future[ProofChallenge] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let overrides = CallOverrides(blockTag: some BlockTag.pending) let overrides = CallOverrides(blockTag: some BlockTag.pending)
return await market.contract.getChallenge(id, overrides) return await market.contract.getChallenge(id, overrides)
proc submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async: (raises: [CancelledError]).} = proc submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
discard await market.contract.submitProof(id, proof).confirm(1) discard await market.contract.submitProof(id, proof).confirm(1)
proc markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async: (raises: [CancelledError]).} = proc markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
discard await market.contract.markProofAsMissing(id, period).confirm(1) discard await market.contract.markProofAsMissing(id, period).confirm(1)
@ -309,7 +311,7 @@ proc canProofBeMarkedAsMissing*(
proc reserveSlot*( proc reserveSlot*(
market: OnChainMarket, requestId: RequestId, slotIndex: uint64 market: OnChainMarket, requestId: RequestId, slotIndex: uint64
) {.async: (raises: [CancelledError]).} = ) {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
discard await market.contract discard await market.contract
.reserveSlot( .reserveSlot(
@ -322,13 +324,13 @@ proc reserveSlot*(
proc canReserveSlot*( proc canReserveSlot*(
market: OnChainMarket, requestId: RequestId, slotIndex: uint64 market: OnChainMarket, requestId: RequestId, slotIndex: uint64
): Future[bool] {.async: (raises: [CancelledError]).} = ): Future[bool] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
return await market.contract.canReserveSlot(requestId, slotIndex) return await market.contract.canReserveSlot(requestId, slotIndex)
proc subscribeRequests*( proc subscribeRequests*(
market: OnChainMarket, callback: OnRequest market: OnChainMarket, callback: OnRequest
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = ): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!StorageRequested) {.upraises: [].} = proc onEvent(eventResult: ?!StorageRequested) {.upraises: [].} =
without event =? eventResult, eventErr: without event =? eventResult, eventErr:
error "There was an error in Request subscription", msg = eventErr.msg error "There was an error in Request subscription", msg = eventErr.msg
@ -342,7 +344,7 @@ proc subscribeRequests*(
proc subscribeSlotFilled*( proc subscribeSlotFilled*(
market: OnChainMarket, callback: OnSlotFilled market: OnChainMarket, callback: OnSlotFilled
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = ): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!SlotFilled) {.upraises: [].} = proc onEvent(eventResult: ?!SlotFilled) {.upraises: [].} =
without event =? eventResult, eventErr: without event =? eventResult, eventErr:
error "There was an error in SlotFilled subscription", msg = eventErr.msg error "There was an error in SlotFilled subscription", msg = eventErr.msg
@ -359,7 +361,7 @@ proc subscribeSlotFilled*(
requestId: RequestId, requestId: RequestId,
slotIndex: uint64, slotIndex: uint64,
callback: OnSlotFilled, callback: OnSlotFilled,
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = ): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onSlotFilled(eventRequestId: RequestId, eventSlotIndex: uint64) = proc onSlotFilled(eventRequestId: RequestId, eventSlotIndex: uint64) =
if eventRequestId == requestId and eventSlotIndex == slotIndex: if eventRequestId == requestId and eventSlotIndex == slotIndex:
callback(requestId, slotIndex) callback(requestId, slotIndex)
@ -369,7 +371,7 @@ proc subscribeSlotFilled*(
proc subscribeSlotFreed*( proc subscribeSlotFreed*(
market: OnChainMarket, callback: OnSlotFreed market: OnChainMarket, callback: OnSlotFreed
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = ): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!SlotFreed) {.upraises: [].} = proc onEvent(eventResult: ?!SlotFreed) {.upraises: [].} =
without event =? eventResult, eventErr: without event =? eventResult, eventErr:
error "There was an error in SlotFreed subscription", msg = eventErr.msg error "There was an error in SlotFreed subscription", msg = eventErr.msg
@ -383,7 +385,7 @@ proc subscribeSlotFreed*(
proc subscribeSlotReservationsFull*( proc subscribeSlotReservationsFull*(
market: OnChainMarket, callback: OnSlotReservationsFull market: OnChainMarket, callback: OnSlotReservationsFull
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = ): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!SlotReservationsFull) {.upraises: [].} = proc onEvent(eventResult: ?!SlotReservationsFull) {.upraises: [].} =
without event =? eventResult, eventErr: without event =? eventResult, eventErr:
error "There was an error in SlotReservationsFull subscription", error "There was an error in SlotReservationsFull subscription",
@ -398,7 +400,7 @@ proc subscribeSlotReservationsFull*(
proc subscribeFulfillment( proc subscribeFulfillment(
market: OnChainMarket, callback: OnFulfillment market: OnChainMarket, callback: OnFulfillment
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = ): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} = proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} =
without event =? eventResult, eventErr: without event =? eventResult, eventErr:
error "There was an error in RequestFulfillment subscription", msg = eventErr.msg error "There was an error in RequestFulfillment subscription", msg = eventErr.msg
@ -412,7 +414,7 @@ proc subscribeFulfillment(
proc subscribeFulfillment( proc subscribeFulfillment(
market: OnChainMarket, requestId: RequestId, callback: OnFulfillment market: OnChainMarket, requestId: RequestId, callback: OnFulfillment
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = ): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} = proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} =
without event =? eventResult, eventErr: without event =? eventResult, eventErr:
error "There was an error in RequestFulfillment subscription", msg = eventErr.msg error "There was an error in RequestFulfillment subscription", msg = eventErr.msg
@ -427,7 +429,7 @@ proc subscribeFulfillment(
proc subscribeRequestCancelled*( proc subscribeRequestCancelled*(
market: OnChainMarket, callback: OnRequestCancelled market: OnChainMarket, callback: OnRequestCancelled
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = ): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} = proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} =
without event =? eventResult, eventErr: without event =? eventResult, eventErr:
error "There was an error in RequestCancelled subscription", msg = eventErr.msg error "There was an error in RequestCancelled subscription", msg = eventErr.msg
@ -441,7 +443,7 @@ proc subscribeRequestCancelled*(
proc subscribeRequestCancelled*( proc subscribeRequestCancelled*(
market: OnChainMarket, requestId: RequestId, callback: OnRequestCancelled market: OnChainMarket, requestId: RequestId, callback: OnRequestCancelled
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = ): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} = proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} =
without event =? eventResult, eventErr: without event =? eventResult, eventErr:
error "There was an error in RequestCancelled subscription", msg = eventErr.msg error "There was an error in RequestCancelled subscription", msg = eventErr.msg
@ -456,7 +458,7 @@ proc subscribeRequestCancelled*(
proc subscribeRequestFailed*( proc subscribeRequestFailed*(
market: OnChainMarket, callback: OnRequestFailed market: OnChainMarket, callback: OnRequestFailed
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = ): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} = proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} =
without event =? eventResult, eventErr: without event =? eventResult, eventErr:
error "There was an error in RequestFailed subscription", msg = eventErr.msg error "There was an error in RequestFailed subscription", msg = eventErr.msg
@ -470,7 +472,7 @@ proc subscribeRequestFailed*(
proc subscribeRequestFailed*( proc subscribeRequestFailed*(
market: OnChainMarket, requestId: RequestId, callback: OnRequestFailed market: OnChainMarket, requestId: RequestId, callback: OnRequestFailed
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = ): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} = proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} =
without event =? eventResult, eventErr: without event =? eventResult, eventErr:
error "There was an error in RequestFailed subscription", msg = eventErr.msg error "There was an error in RequestFailed subscription", msg = eventErr.msg
@ -485,7 +487,7 @@ proc subscribeRequestFailed*(
proc subscribeProofSubmission*( proc subscribeProofSubmission*(
market: OnChainMarket, callback: OnProofSubmitted market: OnChainMarket, callback: OnProofSubmitted
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} = ): Future[MarketSubscription] {.async: (raises: [CancelledError, MarketError]).} =
proc onEvent(eventResult: ?!ProofSubmitted) {.upraises: [].} = proc onEvent(eventResult: ?!ProofSubmitted) {.upraises: [].} =
without event =? eventResult, eventErr: without event =? eventResult, eventErr:
error "There was an error in ProofSubmitted subscription", msg = eventErr.msg error "There was an error in ProofSubmitted subscription", msg = eventErr.msg
@ -497,18 +499,21 @@ proc subscribeProofSubmission*(
let subscription = await market.contract.subscribe(ProofSubmitted, onEvent) let subscription = await market.contract.subscribe(ProofSubmitted, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription) return OnChainMarketSubscription(eventSubscription: subscription)
proc unsubscribe*(subscription: OnChainMarketSubscription) {.async: (raises: [CancelledError]).} = proc unsubscribe*(subscription: OnChainMarketSubscription) {.async: (raises: [CancelledError, MarketError]).} =
await subscription.eventSubscription.unsubscribe() try:
await subscription.eventSubscription.unsubscribe()
except ProviderError as err:
raiseMarketError(err.msg)
proc queryPastSlotFilledEvents*( proc queryPastSlotFilledEvents*(
market: OnChainMarket, fromBlock: BlockTag market: OnChainMarket, fromBlock: BlockTag
): Future[seq[SlotFilled]] {.async: (raises: [CancelledError]).} = ): Future[seq[SlotFilled]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
return await market.contract.queryFilter(SlotFilled, fromBlock, BlockTag.latest) return await market.contract.queryFilter(SlotFilled, fromBlock, BlockTag.latest)
proc queryPastSlotFilledEvents*( proc queryPastSlotFilledEvents*(
market: OnChainMarket, blocksAgo: int market: OnChainMarket, blocksAgo: int
): Future[seq[SlotFilled]] {.async: (raises: [CancelledError]).} = ): Future[seq[SlotFilled]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo) let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo)
@ -516,21 +521,21 @@ proc queryPastSlotFilledEvents*(
proc queryPastSlotFilledEvents*( proc queryPastSlotFilledEvents*(
market: OnChainMarket, fromTime: int64 market: OnChainMarket, fromTime: int64
): Future[seq[SlotFilled]] {.async: (raises: [CancelledError]).} = ): Future[seq[SlotFilled]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime) let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime)
return await market.queryPastSlotFilledEvents(BlockTag.init(fromBlock)) return await market.queryPastSlotFilledEvents(BlockTag.init(fromBlock))
proc queryPastStorageRequestedEvents*( proc queryPastStorageRequestedEvents*(
market: OnChainMarket, fromBlock: BlockTag market: OnChainMarket, fromBlock: BlockTag
): Future[seq[StorageRequested]] {.async: (raises: [CancelledError]).} = ): Future[seq[StorageRequested]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
return return
await market.contract.queryFilter(StorageRequested, fromBlock, BlockTag.latest) await market.contract.queryFilter(StorageRequested, fromBlock, BlockTag.latest)
proc queryPastStorageRequestedEvents*( proc queryPastStorageRequestedEvents*(
market: OnChainMarket, blocksAgo: int market: OnChainMarket, blocksAgo: int
): Future[seq[StorageRequested]] {.async: (raises: [CancelledError]).} = ): Future[seq[StorageRequested]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo) let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo)
@ -538,7 +543,7 @@ proc queryPastStorageRequestedEvents*(
proc queryPastStorageRequestedEventsFromTime*( proc queryPastStorageRequestedEventsFromTime*(
market: OnChainMarket, fromTime: int64 market: OnChainMarket, fromTime: int64
): Future[seq[StorageRequested]] {.async: (raises: [CancelledError]).} = ): Future[seq[StorageRequested]] {.async: (raises: [CancelledError, MarketError]).} =
convertEthersError: convertEthersError:
let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime) let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime)

View File

@ -45,7 +45,7 @@ proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async: (raise
asyncSpawn worker() asyncSpawn worker()
method whileRunning*(s: State, step: OnStep, delay: Duration) {.async, base.} = method whileRunning*(s: State, step: OnStep, delay: Duration) {.async: (raises: []), base.} =
# We use a small delay before starting the workers because 'whileRunning' is likely called from # We use a small delay before starting the workers because 'whileRunning' is likely called from
# component 'start' methods, which are executed sequentially in arbitrary order (to prevent temporal coupling). # component 'start' methods, which are executed sequentially in arbitrary order (to prevent temporal coupling).
# Worker steps might start raising events that other components haven't had time to subscribe to yet. # Worker steps might start raising events that other components haven't had time to subscribe to yet.

View File

@ -15,7 +15,7 @@ type
queue: AsyncEventQueue[?T] queue: AsyncEventQueue[?T]
subscriptions: seq[AsyncDataEventSubscription] subscriptions: seq[AsyncDataEventSubscription]
AsyncDataEventHandler*[T] = proc(data: T): Future[?!void] AsyncDataEventHandler*[T] = proc(data: T): Future[?!void] {.gcsafe, async: (raises: [CancelledError]).}
proc newAsyncDataEvent*[T](): AsyncDataEvent[T] = proc newAsyncDataEvent*[T](): AsyncDataEvent[T] =
AsyncDataEvent[T]( AsyncDataEvent[T](
@ -41,14 +41,17 @@ proc subscribe*[T](
) )
proc listener() {.async: (raises: [CancelledError]).} = proc listener() {.async: (raises: [CancelledError]).} =
while true: try:
let items = await event.queue.waitEvents(subscription.key) while true:
for item in items: let items = await event.queue.waitEvents(subscription.key)
if data =? item: for item in items:
subscription.inHandler = true if data =? item:
subscription.lastResult = (await handler(data)) subscription.inHandler = true
subscription.inHandler = false subscription.lastResult = (await handler(data))
subscription.fireEvent.fire() subscription.inHandler = false
subscription.fireEvent.fire()
except AsyncEventQueueFullError as err:
raiseAssert("AsyncEventQueueFullError in asyncdataevent.listener()")
subscription.listenFuture = listener() subscription.listenFuture = listener()