From fd398fc3044a0d58dae4a2c803229391d6d5b879 Mon Sep 17 00:00:00 2001 From: Eric Mastro Date: Thu, 10 Nov 2022 15:50:19 +1100 Subject: [PATCH] [marketplace] make on chain event callbacks async # Conflicts: # tests/codex/helpers/mockmarket.nim --- codex/contracts/market.nim | 10 +++++----- codex/purchasing/states/started.nim | 4 +++- codex/purchasing/states/submitted.nim | 2 +- tests/codex/helpers/mockmarket.nim | 10 +++++----- 4 files changed, 14 insertions(+), 12 deletions(-) diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index 9e39d6a2..bce50836 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -91,7 +91,7 @@ method subscribeRequests(market: OnChainMarket, callback: OnRequest): Future[MarketSubscription] {.async.} = proc onEvent(event: StorageRequested) {.upraises:[].} = - callback(event.requestId, event.ask) + asyncSpawn callback(event.requestId, event.ask) let subscription = await market.contract.subscribe(StorageRequested, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) @@ -102,7 +102,7 @@ method subscribeSlotFilled*(market: OnChainMarket, Future[MarketSubscription] {.async.} = proc onEvent(event: SlotFilled) {.upraises:[].} = if event.requestId == requestId and event.slotIndex == slotIndex: - callback(event.requestId, event.slotIndex) + asyncSpawn callback(event.requestId, event.slotIndex) let subscription = await market.contract.subscribe(SlotFilled, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) @@ -112,7 +112,7 @@ method subscribeFulfillment(market: OnChainMarket, Future[MarketSubscription] {.async.} = proc onEvent(event: RequestFulfilled) {.upraises:[].} = if event.requestId == requestId: - callback(event.requestId) + asyncSpawn callback(event.requestId) let subscription = await market.contract.subscribe(RequestFulfilled, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) @@ -122,7 +122,7 @@ method subscribeRequestCancelled*(market: OnChainMarket, Future[MarketSubscription] {.async.} = proc onEvent(event: RequestCancelled) {.upraises:[].} = if event.requestId == requestId: - callback(event.requestId) + asyncSpawn callback(event.requestId) let subscription = await market.contract.subscribe(RequestCancelled, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) @@ -132,7 +132,7 @@ method subscribeRequestFailed*(market: OnChainMarket, Future[MarketSubscription] {.async.} = proc onEvent(event: RequestFailed) {.upraises:[].} = if event.requestId == requestId: - callback(event.requestId) + asyncSpawn callback(event.requestId) let subscription = await market.contract.subscribe(RequestFailed, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) diff --git a/codex/purchasing/states/started.nim b/codex/purchasing/states/started.nim index 6d134c5e..8834789b 100644 --- a/codex/purchasing/states/started.nim +++ b/codex/purchasing/states/started.nim @@ -13,7 +13,7 @@ method enterAsync*(state: PurchaseStarted) {.async.} = let market = purchase.market let failed = newFuture[void]() - proc callback(_: RequestId) = + proc callback(_: RequestId) {.async.} = failed.complete() let subscription = await market.subscribeRequestFailed(purchase.requestId, callback) @@ -21,8 +21,10 @@ method enterAsync*(state: PurchaseStarted) {.async.} = try: let fut = await one(ended, failed) if fut.id == failed.id: + ended.cancel() state.switch(PurchaseFailed()) else: + failed.cancel() state.switch(PurchaseFinished()) await subscription.unsubscribe() except CatchableError as error: diff --git a/codex/purchasing/states/submitted.nim b/codex/purchasing/states/submitted.nim index 9d5c8589..1e76dae5 100644 --- a/codex/purchasing/states/submitted.nim +++ b/codex/purchasing/states/submitted.nim @@ -15,7 +15,7 @@ method enterAsync(state: PurchaseSubmitted) {.async.} = proc wait {.async.} = let done = newFuture[void]() - proc callback(_: RequestId) = + proc callback(_: RequestId) {.async.} = done.complete() let subscription = await market.subscribeFulfillment(request.id, callback) await done diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index a8ac19dc..94151bc0 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -65,7 +65,7 @@ method requestStorage*(market: MockMarket, request: StorageRequest) {.async.} = market.requested.add(request) var subscriptions = market.subscriptions.onRequest for subscription in subscriptions: - subscription.callback(request.id, request.ask) + await subscription.callback(request.id, request.ask) method myRequests*(market: MockMarket): Future[seq[RequestId]] {.async.} = return market.activeRequests[market.signer] @@ -103,26 +103,26 @@ proc emitSlotFilled*(market: MockMarket, for subscription in subscriptions: if subscription.requestId == requestId and subscription.slotIndex == slotIndex: - subscription.callback(requestId, slotIndex) + asyncSpawn subscription.callback(requestId, slotIndex) proc emitRequestCancelled*(market: MockMarket, requestId: RequestId) = var subscriptions = market.subscriptions.onRequestCancelled for subscription in subscriptions: if subscription.requestId == requestId: - subscription.callback(requestId) + asyncSpawn subscription.callback(requestId) proc emitRequestFulfilled*(market: MockMarket, requestId: RequestId) = var subscriptions = market.subscriptions.onFulfillment for subscription in subscriptions: if subscription.requestId == requestId: - subscription.callback(requestId) + asyncSpawn subscription.callback(requestId) proc emitRequestFailed*(market: MockMarket, requestId: RequestId) = var subscriptions = market.subscriptions.onRequestFailed for subscription in subscriptions: if subscription.requestId == requestId: - subscription.callback(requestId) + asyncSpawn subscription.callback(requestId) proc fillSlot*(market: MockMarket, requestId: RequestId,