From 0c3fbad470f1eb118f694c9d1771fd8358d855fb Mon Sep 17 00:00:00 2001 From: Eric Mastro Date: Thu, 18 Aug 2022 16:01:47 +1000 Subject: [PATCH] [purchasing] Withdraw funds when request times out When a request for storage times out (not enough slots filled), the client will initiate a withdraw request to retrieve its funds out of the contract, setting the state of the request to RequestState.Cancelled. The client will also emit a RequestCancelled event for others to listen to (ie hosts will need to listen for this event to withdraw its collateral). Add unit test that checks for emission of RequestCancelled after request is purchased request expires. Update dagger-contracts dependency to commit that holds the changes supporting withdrawing of funds. --- codex/contracts/market.nim | 14 ++++++++++++++ codex/contracts/storage.nim | 4 +++- codex/market.nim | 17 +++++++++++++--- codex/purchasing.nim | 25 +++++++++++++++++++++++- tests/codex/helpers/mockmarket.nim | 31 ++++++++++++++++++++++++++++++ tests/codex/testpurchasing.nim | 17 ++++++++++++++++ vendor/dagger-contracts | 2 +- 7 files changed, 104 insertions(+), 6 deletions(-) diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index bbf89f67..ad0e0b2b 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -58,6 +58,10 @@ method fillSlot(market: OnChainMarket, proof: seq[byte]) {.async.} = await market.contract.fillSlot(requestId, slotIndex, proof) +method withdrawFunds(market: OnChainMarket, + requestId: array[32, byte]) {.async.} = + await market.contract.withdrawFunds(requestId) + method subscribeRequests(market: OnChainMarket, callback: OnRequest): Future[MarketSubscription] {.async.} = @@ -87,5 +91,15 @@ method subscribeFulfillment(market: OnChainMarket, let subscription = await market.contract.subscribe(RequestFulfilled, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) +method subscribeRequestCancelled*(market: OnChainMarket, + requestId: array[32, byte], + callback: OnRequestCancelled): + Future[MarketSubscription] {.async.} = + proc onEvent(event: RequestCancelled) {.upraises:[].} = + if event.requestId == requestId: + callback(event.requestId) + let subscription = await market.contract.subscribe(RequestCancelled, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} = await subscription.eventSubscription.unsubscribe() diff --git a/codex/contracts/storage.nim b/codex/contracts/storage.nim index 7d5228d4..d04f25e1 100644 --- a/codex/contracts/storage.nim +++ b/codex/contracts/storage.nim @@ -18,7 +18,8 @@ type slotId* {.indexed.}: SlotId RequestFulfilled* = object of Event requestId* {.indexed.}: RequestId - + RequestCancelled* = object of Event + requestId* {.indexed.}: Id ProofSubmitted* = object of Event id*: SlotId proof*: seq[byte] @@ -34,6 +35,7 @@ proc balanceOf*(storage: Storage, account: Address): UInt256 {.contract, view.} proc requestStorage*(storage: Storage, request: StorageRequest) {.contract.} proc fillSlot*(storage: Storage, requestId: RequestId, slotIndex: UInt256, proof: seq[byte]) {.contract.} +proc withdrawFunds*(storage: Storage, requestId: Id) {.contract.} proc payoutSlot*(storage: Storage, requestId: RequestId, slotIndex: UInt256) {.contract.} proc getRequest*(storage: Storage, id: RequestId): StorageRequest {.contract, view.} proc getHost*(storage: Storage, id: SlotId): Address {.contract, view.} diff --git a/codex/market.nim b/codex/market.nim index deeabfe8..906fb5c2 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -10,9 +10,10 @@ export requests type Market* = ref object of RootObj Subscription* = ref object of RootObj - OnRequest* = proc(id: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} - OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].} - OnSlotFilled* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises:[].} + OnRequest* = proc(id: array[32, byte], ask: StorageAsk) {.gcsafe, upraises:[].} + OnFulfillment* = proc(requestId: array[32, byte]) {.gcsafe, upraises: [].} + OnSlotFilled* = proc(requestId: array[32, byte], slotIndex: UInt256) {.gcsafe, upraises:[].} + OnRequestCancelled* = proc(requestId: array[32, byte]) {.gcsafe, upraises:[].} method getSigner*(market: Market): Future[Address] {.base, async.} = raiseAssert("not implemented") @@ -38,6 +39,10 @@ method fillSlot*(market: Market, proof: seq[byte]) {.base, async.} = raiseAssert("not implemented") +method withdrawFunds*(market: Market, + requestId: array[32, byte]) {.base, async.} = + raiseAssert("not implemented") + method subscribeRequests*(market: Market, callback: OnRequest): Future[Subscription] {.base, async.} = @@ -56,5 +61,11 @@ method subscribeSlotFilled*(market: Market, Future[Subscription] {.base, async.} = raiseAssert("not implemented") +method subscribeRequestCancelled*(market: Market, + requestId: array[32, byte], + callback: OnRequestCancelled): + Future[Subscription] {.base, async.} = + raiseAssert("not implemented") + method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = raiseAssert("not implemented") diff --git a/codex/purchasing.nim b/codex/purchasing.nim index 7991ee03..65e83038 100644 --- a/codex/purchasing.nim +++ b/codex/purchasing.nim @@ -23,6 +23,12 @@ type clock: Clock request*: StorageRequest PurchaseTimeout* = Timeout + RequestState* = enum + New = 1, # [default] waiting to fill slots + Started = 2, # all slots filled, accepting regular proofs + Cancelled = 3, # not enough slots filled before expiry + Finished = 4, # successfully completed + Failed = 5 # too many nodes have failed to provide proofs, data lost PurchaseId* = distinct array[32, byte] const DefaultProofProbability = 100.u256 @@ -75,6 +81,7 @@ func getPurchase*(purchasing: Purchasing, id: PurchaseId): ?Purchase = proc run(purchase: Purchase) {.async.} = let market = purchase.market let clock = purchase.clock + var state = RequestState.New proc requestStorage {.async.} = purchase.request = await market.requestStorage(purchase.request) @@ -87,13 +94,26 @@ proc run(purchase: Purchase) {.async.} = let subscription = await market.subscribeFulfillment(request.id, callback) await done await subscription.unsubscribe() + state = RequestState.Started proc withTimeout(future: Future[void]) {.async.} = let expiry = purchase.request.expiry.truncate(int64) await future.withTimeout(clock, expiry) await requestStorage() - await waitUntilFulfilled().withTimeout() + try: + await waitUntilFulfilled().withTimeout() + except PurchaseTimeout as e: + if state != RequestState.Started: + # If contract was fulfilled, the state would be RequestState.Started. + # Otherwise, the request would have timed out and should be considered + # cancelled. However, the request state hasn't been updated to + # RequestState.Cancelled yet so we can't check for that state or listen for + # an event emission. Instead, the state will be updated when the client + # requests to withdraw funds from the storage request. + await market.withdrawFunds(purchase.request.id) + state = RequestState.Cancelled + raise e proc start(purchase: Purchase) = purchase.future = purchase.run() @@ -104,6 +124,9 @@ proc wait*(purchase: Purchase) {.async.} = func id*(purchase: Purchase): PurchaseId = PurchaseId(purchase.request.id) +func cancelled*(purchase: Purchase): bool = + purchase.future.cancelled + func finished*(purchase: Purchase): bool = purchase.future.finished diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index 74825046..9a2febaa 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -23,6 +23,7 @@ type onRequest: seq[RequestSubscription] onFulfillment: seq[FulfillmentSubscription] onSlotFilled: seq[SlotFilledSubscription] + onRequestCancelled: seq[RequestCancelledSubscription] RequestSubscription* = ref object of Subscription market: MockMarket callback: OnRequest @@ -35,6 +36,10 @@ type requestId: RequestId slotIndex: UInt256 callback: OnSlotFilled + RequestCancelledSubscription* = ref object of Subscription + market: MockMarket + requestId: array[32, byte] + callback: OnRequestCancelled proc new*(_: type MockMarket): MockMarket = MockMarket(signer: Address.example) @@ -75,6 +80,13 @@ proc emitSlotFilled*(market: MockMarket, subscription.slotIndex == slotIndex: subscription.callback(requestId, slotIndex) +proc emitRequestCancelled*(market: MockMarket, + requestId: array[32, byte]) = + var subscriptions = market.subscriptions.onRequestCancelled + for subscription in subscriptions: + if subscription.requestId == requestId: + subscription.callback(requestId) + proc emitRequestFulfilled*(market: MockMarket, requestId: RequestId) = var subscriptions = market.subscriptions.onFulfillment for subscription in subscriptions: @@ -101,6 +113,10 @@ method fillSlot*(market: MockMarket, proof: seq[byte]) {.async.} = market.fillSlot(requestId, slotIndex, proof, market.signer) +method withdrawFunds*(market: MockMarket, + requestId: array[32, byte]) {.async.} = + market.emitRequestCancelled(requestId) + method subscribeRequests*(market: MockMarket, callback: OnRequest): Future[Subscription] {.async.} = @@ -137,6 +153,18 @@ method subscribeSlotFilled*(market: MockMarket, market.subscriptions.onSlotFilled.add(subscription) return subscription +method subscribeRequestCancelled*(market: MockMarket, + requestId: array[32, byte], + callback: OnRequestCancelled): + Future[Subscription] {.async.} = + let subscription = RequestCancelledSubscription( + market: market, + requestId: requestId, + callback: callback + ) + market.subscriptions.onRequestCancelled.add(subscription) + return subscription + method unsubscribe*(subscription: RequestSubscription) {.async.} = subscription.market.subscriptions.onRequest.keepItIf(it != subscription) @@ -145,3 +173,6 @@ method unsubscribe*(subscription: FulfillmentSubscription) {.async.} = method unsubscribe*(subscription: SlotFilledSubscription) {.async.} = subscription.market.subscriptions.onSlotFilled.keepItIf(it != subscription) + +method unsubscribe*(subscription: RequestCancelledSubscription) {.async.} = + subscription.market.subscriptions.onRequestCancelled.keepItIf(it != subscription) diff --git a/tests/codex/testpurchasing.nim b/tests/codex/testpurchasing.nim index 45bc4a05..c1fd7289 100644 --- a/tests/codex/testpurchasing.nim +++ b/tests/codex/testpurchasing.nim @@ -1,6 +1,7 @@ import std/times import pkg/asynctest import pkg/chronos +import pkg/upraises import pkg/stint import pkg/codex/purchasing import ./helpers/mockmarket @@ -87,3 +88,19 @@ suite "Purchasing": clock.set(request.expiry.truncate(int64)) expect PurchaseTimeout: await purchase.wait() + + test "supports request cancelled subscription when request times out": + let purchase = purchasing.purchase(request) + let request = market.requested[0] + var receivedIds: seq[array[32, byte]] + clock.set(request.expiry.truncate(int64)) + proc onRequestCancelled(id: array[32, byte]) {.gcsafe, upraises:[].} = + receivedIds.add(id) + let subscription = await market.subscribeRequestCancelled( + request.id, + onRequestCancelled) + try: + await purchase.wait() + except PurchaseTimeout: + check receivedIds == @[request.id] + await subscription.unsubscribe() diff --git a/vendor/dagger-contracts b/vendor/dagger-contracts index 9ab65ae5..503c496f 160000 --- a/vendor/dagger-contracts +++ b/vendor/dagger-contracts @@ -1 +1 @@ -Subproject commit 9ab65ae5a61a09a6849cc4adbd8ef58fb89c037e +Subproject commit 503c496fe24b77f9ad180bf9b85dfa55baf4cb0e