This commit is contained in:
thatben 2025-06-02 14:30:28 +02:00
parent e06d03e40c
commit 230b9b8fb2
No known key found for this signature in database
GPG Key ID: 62C543548433D43E
16 changed files with 97 additions and 94 deletions

View File

@ -18,7 +18,7 @@ type Application* = ref object
state: State
components: seq[Component]
proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
proc initializeApp(app: Application, config: Config): Future[?!void] {.async: (raises: [CancelledError]).} =
app.state = State(
status: ApplicationStatus.Running,
config: config,
@ -48,7 +48,7 @@ proc initializeApp(app: Application, config: Config): Future[?!void] {.async.} =
return success()
proc stopComponents(app: Application) {.async.} =
proc stopComponents(app: Application) {.async: (raises: [CancelledError]).} =
for c in app.components:
if err =? (await c.stop()).errorOption:
error "Failed to stop component", err = err.msg

View File

@ -19,7 +19,7 @@ type ChainCrawler* = ref object of Component
proc onNewRequest(c: ChainCrawler, rid: Rid): Future[?!void] {.async: (raises: []).} =
return await c.store.add(rid)
method start*(c: ChainCrawler): Future[?!void] {.async.} =
method start*(c: ChainCrawler): Future[?!void] {.async: (raises: [CancelledError]).} =
info "starting..."
proc onRequest(rid: Rid): Future[?!void] {.async: (raises: []).} =

View File

@ -65,7 +65,7 @@ proc step(c: ChainMetrics): Future[?!void] {.async: (raises: []).} =
c.updateMetrics(update)
return success()
method start*(c: ChainMetrics): Future[?!void] {.async.} =
method start*(c: ChainMetrics): Future[?!void] {.async: (raises: [CancelledError]).} =
info "starting..."
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =

View File

@ -46,7 +46,7 @@ proc step(c: DhtCrawler): Future[?!void] {.async: (raises: []).} =
return success()
method start*(c: DhtCrawler): Future[?!void] {.async.} =
method start*(c: DhtCrawler): Future[?!void] {.async: (raises: [CancelledError]).} =
info "starting..."
proc onStep(): Future[?!void] {.async: (raises: []), gcsafe.} =

View File

@ -27,7 +27,7 @@ proc updateMetrics(d: DhtMetrics) =
proc handleCheckEvent(
d: DhtMetrics, event: DhtNodeCheckEventData
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
if event.isOk:
?await d.ok.add(event.id)
?await d.nok.remove(event.id)
@ -38,31 +38,31 @@ proc handleCheckEvent(
d.updateMetrics()
return success()
proc handleDeleteEvent(d: DhtMetrics, nids: seq[Nid]): Future[?!void] {.async.} =
proc handleDeleteEvent(d: DhtMetrics, nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} =
for nid in nids:
?await d.ok.remove(nid)
?await d.nok.remove(nid)
d.updateMetrics()
return success()
method awake*(d: DhtMetrics): Future[?!void] {.async.} =
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
method awake*(d: DhtMetrics): Future[?!void] {.async: (raises: [CancelledError]).} =
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async: (raises: [CancelledError]).} =
await d.handleCheckEvent(event)
proc onDelete(nids: seq[Nid]): Future[?!void] {.async.} =
proc onDelete(nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} =
await d.handleDeleteEvent(nids)
d.subCheck = d.state.events.dhtNodeCheck.subscribe(onCheck)
d.subDel = d.state.events.nodesDeleted.subscribe(onDelete)
return success()
method start*(d: DhtMetrics): Future[?!void] {.async.} =
method start*(d: DhtMetrics): Future[?!void] {.async: (raises: [CancelledError]).} =
info "starting..."
?await d.ok.load()
?await d.nok.load()
return success()
method stop*(d: DhtMetrics): Future[?!void] {.async.} =
method stop*(d: DhtMetrics): Future[?!void] {.async: (raises: [CancelledError]).} =
await d.state.events.dhtNodeCheck.unsubscribe(d.subCheck)
await d.state.events.nodesDeleted.unsubscribe(d.subDel)
return success()

View File

@ -76,7 +76,7 @@ proc decode*(T: type NodeEntry, bytes: seq[byte]): ?!T =
)
return NodeEntry.fromBytes(bytes)
proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} =
proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async: (raises: [CancelledError]).} =
without key =? Key.init(nodestoreName / $nid), err:
error "failed to format key", err = err.msg
return failure(err)
@ -91,7 +91,7 @@ proc storeNodeIsNew(s: NodeStore, nid: Nid): Future[?!bool] {.async.} =
return success(not exists)
proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
proc fireNewNodesDiscovered(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} =
await s.state.events.newNodesDiscovered.fire(nids)
proc fireNodesDeleted(
@ -99,7 +99,7 @@ proc fireNodesDeleted(
): Future[?!void] {.async: (raises: []).} =
await s.state.events.nodesDeleted.fire(nids)
proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} =
var newNodes = newSeq[Nid]()
for nid in nids:
without isNew =? (await s.storeNodeIsNew(nid)), err:
@ -114,7 +114,7 @@ proc processFoundNodes(s: NodeStore, nids: seq[Nid]): Future[?!void] {.async.} =
proc processNodeCheck(
s: NodeStore, event: DhtNodeCheckEventData
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
without key =? Key.init(nodestoreName / $(event.id)), err:
error "failed to format key", err = err.msg
return failure(err)
@ -142,7 +142,7 @@ proc processNodeCheck(
?await s.store.put(key, entry)
return success()
proc deleteEntry(s: NodeStore, nid: Nid): Future[?!bool] {.async.} =
proc deleteEntry(s: NodeStore, nid: Nid): Future[?!bool] {.async: (raises: [CancelledError]).} =
without key =? Key.init(nodestoreName / $nid), err:
error "failed to format key", err = err.msg
return failure(err)
@ -202,20 +202,20 @@ method deleteEntries*(
?await s.fireNodesDeleted(deleted)
return success()
method start*(s: NodeStore): Future[?!void] {.async.} =
method start*(s: NodeStore): Future[?!void] {.async: (raises: [CancelledError]).} =
info "starting..."
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async.} =
proc onNodesFound(nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} =
return await s.processFoundNodes(nids)
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async.} =
proc onCheck(event: DhtNodeCheckEventData): Future[?!void] {.async: (raises: [CancelledError]).} =
return await s.processNodeCheck(event)
s.subFound = s.state.events.nodesFound.subscribe(onNodesFound)
s.subCheck = s.state.events.dhtNodeCheck.subscribe(onCheck)
return success()
method stop*(s: NodeStore): Future[?!void] {.async.} =
method stop*(s: NodeStore): Future[?!void] {.async: (raises: [CancelledError]).} =
await s.state.events.nodesFound.unsubscribe(s.subFound)
await s.state.events.dhtNodeCheck.unsubscribe(s.subCheck)
return success()

View File

@ -56,7 +56,7 @@ proc raiseRoutingTableNodes(t: TimeTracker): Future[?!void] {.async: (raises: []
return failure(err)
return success()
method start*(t: TimeTracker): Future[?!void] {.async.} =
method start*(t: TimeTracker): Future[?!void] {.async: (raises: [CancelledError]).} =
info "starting..."
proc onCheckRevisitAndExpiry(): Future[?!void] {.async: (raises: []), gcsafe.} =

View File

@ -54,10 +54,10 @@ method pop*(t: TodoList): Future[?!Nid] {.async: (raises: []), base.} =
return success(item)
method awake*(t: TodoList): Future[?!void] {.async.} =
method awake*(t: TodoList): Future[?!void] {.async: (raises: [CancelledError]).} =
info "initializing..."
proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async.} =
proc onNewNodes(nids: seq[Nid]): Future[?!void] {.async: (raises: [CancelledError]).} =
t.addNodes(nids)
return success()
@ -65,7 +65,7 @@ method awake*(t: TodoList): Future[?!void] {.async.} =
t.subRev = t.state.events.nodesToRevisit.subscribe(onNewNodes)
return success()
method stop*(t: TodoList): Future[?!void] {.async.} =
method stop*(t: TodoList): Future[?!void] {.async: (raises: [CancelledError]).} =
await t.state.events.newNodesDiscovered.unsubscribe(t.subNew)
await t.state.events.nodesToRevisit.unsubscribe(t.subRev)
return success()

View File

@ -18,7 +18,7 @@ import ./components/chainmetrics
import ./components/chaincrawler
import ./components/requeststore
proc createComponents*(state: State): Future[?!seq[Component]] {.async.} =
proc createComponents*(state: State): Future[?!seq[Component]] {.async: (raises: [CancelledError]).} =
var components: seq[Component] = newSeq[Component]()
let clock = createClock()

View File

@ -32,7 +32,7 @@ proc decode(T: type Nid, bytes: seq[byte]): ?!T =
return success(Nid.fromStr("0"))
return Nid.fromBytes(bytes)
proc saveItem(this: List, item: Nid): Future[?!void] {.async.} =
proc saveItem(this: List, item: Nid): Future[?!void] {.async: (raises: [CancelledError]).} =
without itemKey =? Key.init(this.name / $item), err:
return failure(err)
?await this.store.put(itemKey, item)

View File

@ -75,15 +75,18 @@ method getNeighbors*(
except CatchableError as exc:
return failure(exc.msg)
proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async.} =
proc findPeer*(d: Dht, peerId: PeerId): Future[?PeerRecord] {.async: (raises: [CancelledError]).} =
trace "protocol.resolve..."
let node = await d.protocol.resolve(toNodeId(peerId))
return
if node.isSome():
node.get().record.data.some
else:
PeerRecord.none
try:
let node = await d.protocol.resolve(toNodeId(peerId))
return
if node.isSome():
node.get().record.data.some
else:
PeerRecord.none
except CatchableError as exc:
error "CatchableError in protocol.resolve", err = exc.msg
return PeerRecord.none
method removeProvider*(d: Dht, peerId: PeerId): Future[void] {.base, gcsafe.} =
trace "Removing provider", peerId
@ -109,12 +112,12 @@ proc updateDhtRecord(d: Dht, addrs: openArray[MultiAddress]) =
if not d.protocol.isNil:
d.protocol.updateRecord(d.dhtRecord).expect("Should update SPR")
method start*(d: Dht): Future[?!void] {.async.} =
method start*(d: Dht): Future[?!void] {.async: (raises: [CancelledError]).} =
d.protocol.open()
await d.protocol.start()
return success()
method stop*(d: Dht): Future[?!void] {.async.} =
method stop*(d: Dht): Future[?!void] {.async: (raises: [CancelledError]).} =
await d.protocol.closeWait()
return success()
@ -154,7 +157,7 @@ proc new(
self
proc createDht*(state: State): Future[?!Dht] {.async.} =
proc createDht*(state: State): Future[?!Dht] {.async: (raises: [CancelledError]).} =
without dhtStore =? createDatastore(state.config.dataDir / "dht"), err:
return failure(err)
let keyPath = state.config.dataDir / "privatekey"

View File

@ -50,7 +50,7 @@ proc fetchRequestInfo(
method subscribeToNewRequests*(
m: MarketplaceService, onNewRequest: OnNewRequest
): Future[?!void] {.async: (raises: []), base.} =
proc resultWrapper(rid: Rid): Future[void] {.async.} =
proc resultWrapper(rid: Rid): Future[void] {.async: (raises: [CancelledError]).} =
let response = await onNewRequest(rid)
if error =? response.errorOption:
raiseAssert("Error result in handling of onNewRequest callback: " & error.msg)
@ -109,7 +109,7 @@ method getRequestInfo*(
else:
notStarted()
method awake*(m: MarketplaceService): Future[?!void] {.async.} =
method awake*(m: MarketplaceService): Future[?!void] {.async: (raises: [CancelledError]).} =
let provider = JsonRpcProvider.new(m.state.config.ethProvider)
without marketplaceAddress =? Address.init(m.state.config.marketplaceAddress):
return failure("Invalid MarketplaceAddress provided")

View File

@ -1,5 +1,5 @@
import pkg/contractabi
import pkg/ethers/fields
import pkg/ethers/contracts/fields
import pkg/questionable/results
export contractabi

View File

@ -121,7 +121,7 @@ proc config(
return resolvedConfig
proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} =
proc approveFunds(market: OnChainMarket, amount: UInt256) {.async: (raises: [CancelledError]).} =
raiseAssert("Not available: approveFunds")
proc getZkeyHash*(
@ -138,43 +138,43 @@ proc periodicity*(
let period = config.proofs.period
return Periodicity(seconds: period)
proc proofTimeout*(market: OnChainMarket): Future[uint64] {.async.} =
proc proofTimeout*(market: OnChainMarket): Future[uint64] {.async: (raises: [CancelledError]).} =
convertEthersError:
let config = await market.config()
return config.proofs.timeout
proc repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async.} =
proc repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async: (raises: [CancelledError]).} =
convertEthersError:
let config = await market.config()
return config.collateral.repairRewardPercentage
proc requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async.} =
proc requestDurationLimit*(market: OnChainMarket): Future[uint64] {.async: (raises: [CancelledError]).} =
convertEthersError:
let config = await market.config()
return config.requestDurationLimit
proc proofDowntime*(market: OnChainMarket): Future[uint8] {.async.} =
proc proofDowntime*(market: OnChainMarket): Future[uint8] {.async: (raises: [CancelledError]).} =
convertEthersError:
let config = await market.config()
return config.proofs.downtime
proc getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async.} =
proc getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async: (raises: [CancelledError]).} =
convertEthersError:
let overrides = CallOverrides(blockTag: some BlockTag.pending)
return await market.contract.getPointer(slotId, overrides)
proc myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async.} =
proc myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async: (raises: [CancelledError]).} =
convertEthersError:
return await market.contract.myRequests
proc mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async.} =
proc mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async: (raises: [CancelledError]).} =
convertEthersError:
let slots = await market.contract.mySlots()
debug "Fetched my slots", numSlots = len(slots)
return slots
proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async.} =
proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async: (raises: [CancelledError]).} =
convertEthersError:
debug "Requesting storage"
await market.approveFunds(request.totalPrice())
@ -182,7 +182,7 @@ proc requestStorage(market: OnChainMarket, request: StorageRequest) {.async.} =
proc getRequest*(
market: OnChainMarket, id: RequestId
): Future[?StorageRequest] {.async.} =
): Future[?StorageRequest] {.async: (raises: [CancelledError]).} =
let key = $id
convertEthersError:
@ -194,7 +194,7 @@ proc getRequest*(
proc requestState*(
market: OnChainMarket, requestId: RequestId
): Future[?RequestState] {.async.} =
): Future[?RequestState] {.async: (raises: [CancelledError]).} =
convertEthersError:
try:
let overrides = CallOverrides(blockTag: some BlockTag.pending)
@ -202,22 +202,22 @@ proc requestState*(
except Marketplace_UnknownRequest:
return none RequestState
proc slotState*(market: OnChainMarket, slotId: SlotId): Future[SlotState] {.async.} =
proc slotState*(market: OnChainMarket, slotId: SlotId): Future[SlotState] {.async: (raises: [CancelledError]).} =
convertEthersError:
let overrides = CallOverrides(blockTag: some BlockTag.pending)
return await market.contract.slotState(slotId, overrides)
proc getRequestEnd*(market: OnChainMarket, id: RequestId): Future[uint64] {.async.} =
proc getRequestEnd*(market: OnChainMarket, id: RequestId): Future[uint64] {.async: (raises: [CancelledError]).} =
convertEthersError:
return (await market.contract.requestEnd(id)).uint64
proc requestExpiresAt*(market: OnChainMarket, id: RequestId): Future[uint64] {.async.} =
proc requestExpiresAt*(market: OnChainMarket, id: RequestId): Future[uint64] {.async: (raises: [CancelledError]).} =
convertEthersError:
return (await market.contract.requestExpiry(id)).uint64
proc getHost(
market: OnChainMarket, requestId: RequestId, slotIndex: uint64
): Future[?Address] {.async.} =
): Future[?Address] {.async: (raises: [CancelledError]).} =
convertEthersError:
let slotId = slotId(requestId, slotIndex)
let address = await market.contract.getHost(slotId)
@ -228,11 +228,11 @@ proc getHost(
proc currentCollateral*(
market: OnChainMarket, slotId: SlotId
): Future[UInt256] {.async.} =
): Future[UInt256] {.async: (raises: [CancelledError]).} =
convertEthersError:
return await market.contract.currentCollateral(slotId)
proc getActiveSlot*(market: OnChainMarket, slotId: SlotId): Future[?Slot] {.async.} =
proc getActiveSlot*(market: OnChainMarket, slotId: SlotId): Future[?Slot] {.async: (raises: [CancelledError]).} =
convertEthersError:
try:
return some await market.contract.getActiveSlot(slotId)
@ -245,7 +245,7 @@ proc fillSlot(
slotIndex: uint64,
proof: Groth16Proof,
collateral: UInt256,
) {.async.} =
) {.async: (raises: [CancelledError]).} =
convertEthersError:
logScope:
requestId
@ -256,14 +256,14 @@ proc fillSlot(
discard await market.contract.fillSlot(requestId, slotIndex, proof).confirm(1)
trace "fillSlot transaction completed"
proc freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} =
proc freeSlot*(market: OnChainMarket, slotId: SlotId) {.async: (raises: [CancelledError]).} =
raiseAssert("Not supported")
proc withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async.} =
proc withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async: (raises: [CancelledError]).} =
convertEthersError:
discard await market.contract.withdrawFunds(requestId).confirm(1)
proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} =
proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: (raises: [CancelledError]).} =
convertEthersError:
try:
let overrides = CallOverrides(blockTag: some BlockTag.pending)
@ -271,7 +271,7 @@ proc isProofRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.}
except Marketplace_SlotIsFree:
return false
proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async.} =
proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.async: (raises: [CancelledError]).} =
convertEthersError:
try:
let overrides = CallOverrides(blockTag: some BlockTag.pending)
@ -281,22 +281,22 @@ proc willProofBeRequired*(market: OnChainMarket, id: SlotId): Future[bool] {.asy
proc getChallenge*(
market: OnChainMarket, id: SlotId
): Future[ProofChallenge] {.async.} =
): Future[ProofChallenge] {.async: (raises: [CancelledError]).} =
convertEthersError:
let overrides = CallOverrides(blockTag: some BlockTag.pending)
return await market.contract.getChallenge(id, overrides)
proc submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async.} =
proc submitProof*(market: OnChainMarket, id: SlotId, proof: Groth16Proof) {.async: (raises: [CancelledError]).} =
convertEthersError:
discard await market.contract.submitProof(id, proof).confirm(1)
proc markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async.} =
proc markProofAsMissing*(market: OnChainMarket, id: SlotId, period: Period) {.async: (raises: [CancelledError]).} =
convertEthersError:
discard await market.contract.markProofAsMissing(id, period).confirm(1)
proc canProofBeMarkedAsMissing*(
market: OnChainMarket, id: SlotId, period: Period
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CancelledError]).} =
let provider = market.contract.provider
let contractWithoutSigner = market.contract.connect(provider)
let overrides = CallOverrides(blockTag: some BlockTag.pending)
@ -309,7 +309,7 @@ proc canProofBeMarkedAsMissing*(
proc reserveSlot*(
market: OnChainMarket, requestId: RequestId, slotIndex: uint64
) {.async.} =
) {.async: (raises: [CancelledError]).} =
convertEthersError:
discard await market.contract
.reserveSlot(
@ -322,13 +322,13 @@ proc reserveSlot*(
proc canReserveSlot*(
market: OnChainMarket, requestId: RequestId, slotIndex: uint64
): Future[bool] {.async.} =
): Future[bool] {.async: (raises: [CancelledError]).} =
convertEthersError:
return await market.contract.canReserveSlot(requestId, slotIndex)
proc subscribeRequests*(
market: OnChainMarket, callback: OnRequest
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} =
proc onEvent(eventResult: ?!StorageRequested) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in Request subscription", msg = eventErr.msg
@ -342,7 +342,7 @@ proc subscribeRequests*(
proc subscribeSlotFilled*(
market: OnChainMarket, callback: OnSlotFilled
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} =
proc onEvent(eventResult: ?!SlotFilled) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in SlotFilled subscription", msg = eventErr.msg
@ -359,7 +359,7 @@ proc subscribeSlotFilled*(
requestId: RequestId,
slotIndex: uint64,
callback: OnSlotFilled,
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} =
proc onSlotFilled(eventRequestId: RequestId, eventSlotIndex: uint64) =
if eventRequestId == requestId and eventSlotIndex == slotIndex:
callback(requestId, slotIndex)
@ -369,7 +369,7 @@ proc subscribeSlotFilled*(
proc subscribeSlotFreed*(
market: OnChainMarket, callback: OnSlotFreed
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} =
proc onEvent(eventResult: ?!SlotFreed) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in SlotFreed subscription", msg = eventErr.msg
@ -383,7 +383,7 @@ proc subscribeSlotFreed*(
proc subscribeSlotReservationsFull*(
market: OnChainMarket, callback: OnSlotReservationsFull
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} =
proc onEvent(eventResult: ?!SlotReservationsFull) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in SlotReservationsFull subscription",
@ -398,7 +398,7 @@ proc subscribeSlotReservationsFull*(
proc subscribeFulfillment(
market: OnChainMarket, callback: OnFulfillment
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} =
proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in RequestFulfillment subscription", msg = eventErr.msg
@ -412,7 +412,7 @@ proc subscribeFulfillment(
proc subscribeFulfillment(
market: OnChainMarket, requestId: RequestId, callback: OnFulfillment
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} =
proc onEvent(eventResult: ?!RequestFulfilled) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in RequestFulfillment subscription", msg = eventErr.msg
@ -427,7 +427,7 @@ proc subscribeFulfillment(
proc subscribeRequestCancelled*(
market: OnChainMarket, callback: OnRequestCancelled
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} =
proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in RequestCancelled subscription", msg = eventErr.msg
@ -441,7 +441,7 @@ proc subscribeRequestCancelled*(
proc subscribeRequestCancelled*(
market: OnChainMarket, requestId: RequestId, callback: OnRequestCancelled
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} =
proc onEvent(eventResult: ?!RequestCancelled) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in RequestCancelled subscription", msg = eventErr.msg
@ -456,7 +456,7 @@ proc subscribeRequestCancelled*(
proc subscribeRequestFailed*(
market: OnChainMarket, callback: OnRequestFailed
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} =
proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in RequestFailed subscription", msg = eventErr.msg
@ -470,7 +470,7 @@ proc subscribeRequestFailed*(
proc subscribeRequestFailed*(
market: OnChainMarket, requestId: RequestId, callback: OnRequestFailed
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} =
proc onEvent(eventResult: ?!RequestFailed) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in RequestFailed subscription", msg = eventErr.msg
@ -485,7 +485,7 @@ proc subscribeRequestFailed*(
proc subscribeProofSubmission*(
market: OnChainMarket, callback: OnProofSubmitted
): Future[MarketSubscription] {.async.} =
): Future[MarketSubscription] {.async: (raises: [CancelledError]).} =
proc onEvent(eventResult: ?!ProofSubmitted) {.upraises: [].} =
without event =? eventResult, eventErr:
error "There was an error in ProofSubmitted subscription", msg = eventErr.msg
@ -497,18 +497,18 @@ proc subscribeProofSubmission*(
let subscription = await market.contract.subscribe(ProofSubmitted, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription)
proc unsubscribe*(subscription: OnChainMarketSubscription) {.async.} =
proc unsubscribe*(subscription: OnChainMarketSubscription) {.async: (raises: [CancelledError]).} =
await subscription.eventSubscription.unsubscribe()
proc queryPastSlotFilledEvents*(
market: OnChainMarket, fromBlock: BlockTag
): Future[seq[SlotFilled]] {.async.} =
): Future[seq[SlotFilled]] {.async: (raises: [CancelledError]).} =
convertEthersError:
return await market.contract.queryFilter(SlotFilled, fromBlock, BlockTag.latest)
proc queryPastSlotFilledEvents*(
market: OnChainMarket, blocksAgo: int
): Future[seq[SlotFilled]] {.async.} =
): Future[seq[SlotFilled]] {.async: (raises: [CancelledError]).} =
convertEthersError:
let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo)
@ -516,21 +516,21 @@ proc queryPastSlotFilledEvents*(
proc queryPastSlotFilledEvents*(
market: OnChainMarket, fromTime: int64
): Future[seq[SlotFilled]] {.async.} =
): Future[seq[SlotFilled]] {.async: (raises: [CancelledError]).} =
convertEthersError:
let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime)
return await market.queryPastSlotFilledEvents(BlockTag.init(fromBlock))
proc queryPastStorageRequestedEvents*(
market: OnChainMarket, fromBlock: BlockTag
): Future[seq[StorageRequested]] {.async.} =
): Future[seq[StorageRequested]] {.async: (raises: [CancelledError]).} =
convertEthersError:
return
await market.contract.queryFilter(StorageRequested, fromBlock, BlockTag.latest)
proc queryPastStorageRequestedEvents*(
market: OnChainMarket, blocksAgo: int
): Future[seq[StorageRequested]] {.async.} =
): Future[seq[StorageRequested]] {.async: (raises: [CancelledError]).} =
convertEthersError:
let fromBlock = await market.contract.provider.pastBlockTag(blocksAgo)
@ -538,7 +538,7 @@ proc queryPastStorageRequestedEvents*(
proc queryPastStorageRequestedEventsFromTime*(
market: OnChainMarket, fromTime: int64
): Future[seq[StorageRequested]] {.async.} =
): Future[seq[StorageRequested]] {.async: (raises: [CancelledError]).} =
convertEthersError:
let fromBlock = await market.contract.provider.blockNumberForEpoch(fromTime)

View File

@ -33,10 +33,10 @@ type
config*: Config
events*: Events
proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async.} =
proc delayedWorkerStart(s: State, step: OnStep, delay: Duration) {.async: (raises: [CancelledError]).} =
await sleepAsync(1.seconds)
proc worker(): Future[void] {.async.} =
proc worker(): Future[void] {.async: (raises: [CancelledError]).} =
while s.status == ApplicationStatus.Running:
if err =? (await step()).errorOption:
error "Failure-result caught in main loop. Stopping...", err = err.msg

View File

@ -24,7 +24,7 @@ proc newAsyncDataEvent*[T](): AsyncDataEvent[T] =
proc performUnsubscribe[T](
event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription
) {.async.} =
) {.async: (raises: [CancelledError]).} =
if subscription in event.subscriptions:
await subscription.listenFuture.cancelAndWait()
event.subscriptions.delete(event.subscriptions.find(subscription))
@ -40,7 +40,7 @@ proc subscribe*[T](
delayedUnsubscribe: false,
)
proc listener() {.async.} =
proc listener() {.async: (raises: [CancelledError]).} =
while true:
let items = await event.queue.waitEvents(subscription.key)
for item in items:
@ -81,13 +81,13 @@ proc fire*[T](
proc unsubscribe*[T](
event: AsyncDataEvent[T], subscription: AsyncDataEventSubscription
) {.async.} =
) {.async: (raises: [CancelledError]).} =
if subscription.inHandler:
subscription.delayedUnsubscribe = true
else:
await event.performUnsubscribe(subscription)
proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async.} =
proc unsubscribeAll*[T](event: AsyncDataEvent[T]) {.async: (raises: [CancelledError]).} =
let all = event.subscriptions
for subscription in all:
await event.unsubscribe(subscription)