diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index bbf89f67..ad0e0b2b 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -58,6 +58,10 @@ method fillSlot(market: OnChainMarket, proof: seq[byte]) {.async.} = await market.contract.fillSlot(requestId, slotIndex, proof) +method withdrawFunds(market: OnChainMarket, + requestId: array[32, byte]) {.async.} = + await market.contract.withdrawFunds(requestId) + method subscribeRequests(market: OnChainMarket, callback: OnRequest): Future[MarketSubscription] {.async.} = @@ -87,5 +91,15 @@ method subscribeFulfillment(market: OnChainMarket, let subscription = await market.contract.subscribe(RequestFulfilled, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) +method subscribeRequestCancelled*(market: OnChainMarket, + requestId: array[32, byte], + callback: OnRequestCancelled): + Future[MarketSubscription] {.async.} = + proc onEvent(event: RequestCancelled) {.upraises:[].} = + if event.requestId == requestId: + callback(event.requestId) + let subscription = await market.contract.subscribe(RequestCancelled, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} = await subscription.eventSubscription.unsubscribe() diff --git a/codex/contracts/storage.nim b/codex/contracts/storage.nim index 7d5228d4..d04f25e1 100644 --- a/codex/contracts/storage.nim +++ b/codex/contracts/storage.nim @@ -18,7 +18,8 @@ type slotId* {.indexed.}: SlotId RequestFulfilled* = object of Event requestId* {.indexed.}: RequestId - + RequestCancelled* = object of Event + requestId* {.indexed.}: Id ProofSubmitted* = object of Event id*: SlotId proof*: seq[byte] @@ -34,6 +35,7 @@ proc balanceOf*(storage: Storage, account: Address): UInt256 {.contract, view.} proc requestStorage*(storage: Storage, request: StorageRequest) {.contract.} proc fillSlot*(storage: Storage, requestId: RequestId, slotIndex: UInt256, proof: seq[byte]) {.contract.} +proc withdrawFunds*(storage: Storage, requestId: Id) {.contract.} proc payoutSlot*(storage: Storage, requestId: RequestId, slotIndex: UInt256) {.contract.} proc getRequest*(storage: Storage, id: RequestId): StorageRequest {.contract, view.} proc getHost*(storage: Storage, id: SlotId): Address {.contract, view.} diff --git a/codex/market.nim b/codex/market.nim index deeabfe8..906fb5c2 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -10,9 +10,10 @@ export requests type Market* = ref object of RootObj Subscription* = ref object of RootObj - OnRequest* = proc(id: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} - OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].} - OnSlotFilled* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises:[].} + OnRequest* = proc(id: array[32, byte], ask: StorageAsk) {.gcsafe, upraises:[].} + OnFulfillment* = proc(requestId: array[32, byte]) {.gcsafe, upraises: [].} + OnSlotFilled* = proc(requestId: array[32, byte], slotIndex: UInt256) {.gcsafe, upraises:[].} + OnRequestCancelled* = proc(requestId: array[32, byte]) {.gcsafe, upraises:[].} method getSigner*(market: Market): Future[Address] {.base, async.} = raiseAssert("not implemented") @@ -38,6 +39,10 @@ method fillSlot*(market: Market, proof: seq[byte]) {.base, async.} = raiseAssert("not implemented") +method withdrawFunds*(market: Market, + requestId: array[32, byte]) {.base, async.} = + raiseAssert("not implemented") + method subscribeRequests*(market: Market, callback: OnRequest): Future[Subscription] {.base, async.} = @@ -56,5 +61,11 @@ method subscribeSlotFilled*(market: Market, Future[Subscription] {.base, async.} = raiseAssert("not implemented") +method subscribeRequestCancelled*(market: Market, + requestId: array[32, byte], + callback: OnRequestCancelled): + Future[Subscription] {.base, async.} = + raiseAssert("not implemented") + method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = raiseAssert("not implemented") diff --git a/codex/purchasing.nim b/codex/purchasing.nim index 7991ee03..65e83038 100644 --- a/codex/purchasing.nim +++ b/codex/purchasing.nim @@ -23,6 +23,12 @@ type clock: Clock request*: StorageRequest PurchaseTimeout* = Timeout + RequestState* = enum + New = 1, # [default] waiting to fill slots + Started = 2, # all slots filled, accepting regular proofs + Cancelled = 3, # not enough slots filled before expiry + Finished = 4, # successfully completed + Failed = 5 # too many nodes have failed to provide proofs, data lost PurchaseId* = distinct array[32, byte] const DefaultProofProbability = 100.u256 @@ -75,6 +81,7 @@ func getPurchase*(purchasing: Purchasing, id: PurchaseId): ?Purchase = proc run(purchase: Purchase) {.async.} = let market = purchase.market let clock = purchase.clock + var state = RequestState.New proc requestStorage {.async.} = purchase.request = await market.requestStorage(purchase.request) @@ -87,13 +94,26 @@ proc run(purchase: Purchase) {.async.} = let subscription = await market.subscribeFulfillment(request.id, callback) await done await subscription.unsubscribe() + state = RequestState.Started proc withTimeout(future: Future[void]) {.async.} = let expiry = purchase.request.expiry.truncate(int64) await future.withTimeout(clock, expiry) await requestStorage() - await waitUntilFulfilled().withTimeout() + try: + await waitUntilFulfilled().withTimeout() + except PurchaseTimeout as e: + if state != RequestState.Started: + # If contract was fulfilled, the state would be RequestState.Started. + # Otherwise, the request would have timed out and should be considered + # cancelled. However, the request state hasn't been updated to + # RequestState.Cancelled yet so we can't check for that state or listen for + # an event emission. Instead, the state will be updated when the client + # requests to withdraw funds from the storage request. + await market.withdrawFunds(purchase.request.id) + state = RequestState.Cancelled + raise e proc start(purchase: Purchase) = purchase.future = purchase.run() @@ -104,6 +124,9 @@ proc wait*(purchase: Purchase) {.async.} = func id*(purchase: Purchase): PurchaseId = PurchaseId(purchase.request.id) +func cancelled*(purchase: Purchase): bool = + purchase.future.cancelled + func finished*(purchase: Purchase): bool = purchase.future.finished diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 74825046..9a2febaa 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -23,6 +23,7 @@ type onRequest: seq[RequestSubscription] onFulfillment: seq[FulfillmentSubscription] onSlotFilled: seq[SlotFilledSubscription] + onRequestCancelled: seq[RequestCancelledSubscription] RequestSubscription* = ref object of Subscription market: MockMarket callback: OnRequest @@ -35,6 +36,10 @@ type requestId: RequestId slotIndex: UInt256 callback: OnSlotFilled + RequestCancelledSubscription* = ref object of Subscription + market: MockMarket + requestId: array[32, byte] + callback: OnRequestCancelled proc new*(_: type MockMarket): MockMarket = MockMarket(signer: Address.example) @@ -75,6 +80,13 @@ proc emitSlotFilled*(market: MockMarket, subscription.slotIndex == slotIndex: subscription.callback(requestId, slotIndex) +proc emitRequestCancelled*(market: MockMarket, + requestId: array[32, byte]) = + var subscriptions = market.subscriptions.onRequestCancelled + for subscription in subscriptions: + if subscription.requestId == requestId: + subscription.callback(requestId) + proc emitRequestFulfilled*(market: MockMarket, requestId: RequestId) = var subscriptions = market.subscriptions.onFulfillment for subscription in subscriptions: @@ -101,6 +113,10 @@ method fillSlot*(market: MockMarket, proof: seq[byte]) {.async.} = market.fillSlot(requestId, slotIndex, proof, market.signer) +method withdrawFunds*(market: MockMarket, + requestId: array[32, byte]) {.async.} = + market.emitRequestCancelled(requestId) + method subscribeRequests*(market: MockMarket, callback: OnRequest): Future[Subscription] {.async.} = @@ -137,6 +153,18 @@ method subscribeSlotFilled*(market: MockMarket, market.subscriptions.onSlotFilled.add(subscription) return subscription +method subscribeRequestCancelled*(market: MockMarket, + requestId: array[32, byte], + callback: OnRequestCancelled): + Future[Subscription] {.async.} = + let subscription = RequestCancelledSubscription( + market: market, + requestId: requestId, + callback: callback + ) + market.subscriptions.onRequestCancelled.add(subscription) + return subscription + method unsubscribe*(subscription: RequestSubscription) {.async.} = subscription.market.subscriptions.onRequest.keepItIf(it != subscription) @@ -145,3 +173,6 @@ method unsubscribe*(subscription: FulfillmentSubscription) {.async.} = method unsubscribe*(subscription: SlotFilledSubscription) {.async.} = subscription.market.subscriptions.onSlotFilled.keepItIf(it != subscription) + +method unsubscribe*(subscription: RequestCancelledSubscription) {.async.} = + subscription.market.subscriptions.onRequestCancelled.keepItIf(it != subscription) diff --git a/tests/codex/testpurchasing.nim b/tests/codex/testpurchasing.nim index 45bc4a05..c1fd7289 100644 --- a/tests/codex/testpurchasing.nim +++ b/tests/codex/testpurchasing.nim @@ -1,6 +1,7 @@ import std/times import pkg/asynctest import pkg/chronos +import pkg/upraises import pkg/stint import pkg/codex/purchasing import ./helpers/mockmarket @@ -87,3 +88,19 @@ suite "Purchasing": clock.set(request.expiry.truncate(int64)) expect PurchaseTimeout: await purchase.wait() + + test "supports request cancelled subscription when request times out": + let purchase = purchasing.purchase(request) + let request = market.requested[0] + var receivedIds: seq[array[32, byte]] + clock.set(request.expiry.truncate(int64)) + proc onRequestCancelled(id: array[32, byte]) {.gcsafe, upraises:[].} = + receivedIds.add(id) + let subscription = await market.subscribeRequestCancelled( + request.id, + onRequestCancelled) + try: + await purchase.wait() + except PurchaseTimeout: + check receivedIds == @[request.id] + await subscription.unsubscribe() diff --git a/vendor/dagger-contracts b/vendor/dagger-contracts index 9ab65ae5..503c496f 160000 --- a/vendor/dagger-contracts +++ b/vendor/dagger-contracts @@ -1 +1 @@ -Subproject commit 9ab65ae5a61a09a6849cc4adbd8ef58fb89c037e +Subproject commit 503c496fe24b77f9ad180bf9b85dfa55baf4cb0e