[marketplace] make on chain event callbacks async

# Conflicts:
#	tests/codex/helpers/mockmarket.nim
This commit is contained in:
Eric Mastro 2022-11-10 15:50:19 +11:00 committed by Mark Spanbroek
parent 10815ac0b5
commit fd398fc304
No known key found for this signature in database
GPG Key ID: FBE3E9548D427C00
4 changed files with 14 additions and 12 deletions

View File

@ -91,7 +91,7 @@ method subscribeRequests(market: OnChainMarket,
callback: OnRequest): callback: OnRequest):
Future[MarketSubscription] {.async.} = Future[MarketSubscription] {.async.} =
proc onEvent(event: StorageRequested) {.upraises:[].} = proc onEvent(event: StorageRequested) {.upraises:[].} =
callback(event.requestId, event.ask) asyncSpawn callback(event.requestId, event.ask)
let subscription = await market.contract.subscribe(StorageRequested, onEvent) let subscription = await market.contract.subscribe(StorageRequested, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription) return OnChainMarketSubscription(eventSubscription: subscription)
@ -102,7 +102,7 @@ method subscribeSlotFilled*(market: OnChainMarket,
Future[MarketSubscription] {.async.} = Future[MarketSubscription] {.async.} =
proc onEvent(event: SlotFilled) {.upraises:[].} = proc onEvent(event: SlotFilled) {.upraises:[].} =
if event.requestId == requestId and event.slotIndex == slotIndex: if event.requestId == requestId and event.slotIndex == slotIndex:
callback(event.requestId, event.slotIndex) asyncSpawn callback(event.requestId, event.slotIndex)
let subscription = await market.contract.subscribe(SlotFilled, onEvent) let subscription = await market.contract.subscribe(SlotFilled, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription) return OnChainMarketSubscription(eventSubscription: subscription)
@ -112,7 +112,7 @@ method subscribeFulfillment(market: OnChainMarket,
Future[MarketSubscription] {.async.} = Future[MarketSubscription] {.async.} =
proc onEvent(event: RequestFulfilled) {.upraises:[].} = proc onEvent(event: RequestFulfilled) {.upraises:[].} =
if event.requestId == requestId: if event.requestId == requestId:
callback(event.requestId) asyncSpawn callback(event.requestId)
let subscription = await market.contract.subscribe(RequestFulfilled, onEvent) let subscription = await market.contract.subscribe(RequestFulfilled, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription) return OnChainMarketSubscription(eventSubscription: subscription)
@ -122,7 +122,7 @@ method subscribeRequestCancelled*(market: OnChainMarket,
Future[MarketSubscription] {.async.} = Future[MarketSubscription] {.async.} =
proc onEvent(event: RequestCancelled) {.upraises:[].} = proc onEvent(event: RequestCancelled) {.upraises:[].} =
if event.requestId == requestId: if event.requestId == requestId:
callback(event.requestId) asyncSpawn callback(event.requestId)
let subscription = await market.contract.subscribe(RequestCancelled, onEvent) let subscription = await market.contract.subscribe(RequestCancelled, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription) return OnChainMarketSubscription(eventSubscription: subscription)
@ -132,7 +132,7 @@ method subscribeRequestFailed*(market: OnChainMarket,
Future[MarketSubscription] {.async.} = Future[MarketSubscription] {.async.} =
proc onEvent(event: RequestFailed) {.upraises:[].} = proc onEvent(event: RequestFailed) {.upraises:[].} =
if event.requestId == requestId: if event.requestId == requestId:
callback(event.requestId) asyncSpawn callback(event.requestId)
let subscription = await market.contract.subscribe(RequestFailed, onEvent) let subscription = await market.contract.subscribe(RequestFailed, onEvent)
return OnChainMarketSubscription(eventSubscription: subscription) return OnChainMarketSubscription(eventSubscription: subscription)

View File

@ -13,7 +13,7 @@ method enterAsync*(state: PurchaseStarted) {.async.} =
let market = purchase.market let market = purchase.market
let failed = newFuture[void]() let failed = newFuture[void]()
proc callback(_: RequestId) = proc callback(_: RequestId) {.async.} =
failed.complete() failed.complete()
let subscription = await market.subscribeRequestFailed(purchase.requestId, callback) let subscription = await market.subscribeRequestFailed(purchase.requestId, callback)
@ -21,8 +21,10 @@ method enterAsync*(state: PurchaseStarted) {.async.} =
try: try:
let fut = await one(ended, failed) let fut = await one(ended, failed)
if fut.id == failed.id: if fut.id == failed.id:
ended.cancel()
state.switch(PurchaseFailed()) state.switch(PurchaseFailed())
else: else:
failed.cancel()
state.switch(PurchaseFinished()) state.switch(PurchaseFinished())
await subscription.unsubscribe() await subscription.unsubscribe()
except CatchableError as error: except CatchableError as error:

View File

@ -15,7 +15,7 @@ method enterAsync(state: PurchaseSubmitted) {.async.} =
proc wait {.async.} = proc wait {.async.} =
let done = newFuture[void]() let done = newFuture[void]()
proc callback(_: RequestId) = proc callback(_: RequestId) {.async.} =
done.complete() done.complete()
let subscription = await market.subscribeFulfillment(request.id, callback) let subscription = await market.subscribeFulfillment(request.id, callback)
await done await done

View File

@ -65,7 +65,7 @@ method requestStorage*(market: MockMarket, request: StorageRequest) {.async.} =
market.requested.add(request) market.requested.add(request)
var subscriptions = market.subscriptions.onRequest var subscriptions = market.subscriptions.onRequest
for subscription in subscriptions: for subscription in subscriptions:
subscription.callback(request.id, request.ask) await subscription.callback(request.id, request.ask)
method myRequests*(market: MockMarket): Future[seq[RequestId]] {.async.} = method myRequests*(market: MockMarket): Future[seq[RequestId]] {.async.} =
return market.activeRequests[market.signer] return market.activeRequests[market.signer]
@ -103,26 +103,26 @@ proc emitSlotFilled*(market: MockMarket,
for subscription in subscriptions: for subscription in subscriptions:
if subscription.requestId == requestId and if subscription.requestId == requestId and
subscription.slotIndex == slotIndex: subscription.slotIndex == slotIndex:
subscription.callback(requestId, slotIndex) asyncSpawn subscription.callback(requestId, slotIndex)
proc emitRequestCancelled*(market: MockMarket, proc emitRequestCancelled*(market: MockMarket,
requestId: RequestId) = requestId: RequestId) =
var subscriptions = market.subscriptions.onRequestCancelled var subscriptions = market.subscriptions.onRequestCancelled
for subscription in subscriptions: for subscription in subscriptions:
if subscription.requestId == requestId: if subscription.requestId == requestId:
subscription.callback(requestId) asyncSpawn subscription.callback(requestId)
proc emitRequestFulfilled*(market: MockMarket, requestId: RequestId) = proc emitRequestFulfilled*(market: MockMarket, requestId: RequestId) =
var subscriptions = market.subscriptions.onFulfillment var subscriptions = market.subscriptions.onFulfillment
for subscription in subscriptions: for subscription in subscriptions:
if subscription.requestId == requestId: if subscription.requestId == requestId:
subscription.callback(requestId) asyncSpawn subscription.callback(requestId)
proc emitRequestFailed*(market: MockMarket, requestId: RequestId) = proc emitRequestFailed*(market: MockMarket, requestId: RequestId) =
var subscriptions = market.subscriptions.onRequestFailed var subscriptions = market.subscriptions.onRequestFailed
for subscription in subscriptions: for subscription in subscriptions:
if subscription.requestId == requestId: if subscription.requestId == requestId:
subscription.callback(requestId) asyncSpawn subscription.callback(requestId)
proc fillSlot*(market: MockMarket, proc fillSlot*(market: MockMarket,
requestId: RequestId, requestId: RequestId,