From 4175689745937c0b6e076c9b83ab0c05167c55a6 Mon Sep 17 00:00:00 2001 From: markspanbroek Date: Tue, 8 Nov 2022 02:10:17 -0500 Subject: [PATCH] Load purchase state from chain (#283) * [purchasing] Simplify test * [utils] Move StorageRequest.example up one level * [purchasing] Load purchases from market * [purchasing] load purchase states * Implement myRequest() and getState() methods for OnChainMarket * [proofs] Fix intermittently failing tests Ensures that examples of proofs in tests are never of length 0; these are considered invalid proofs by the smart contract logic. * [contracts] Fix failing test With the new solidity contracts update, a contract can only be paid out after it started. * [market] Add method to get request end time * [purchasing] wait until purchase is finished Purchase.wait() would previously wait until purchase was started, now we wait until it is finished. * [purchasing] Handle 'finished' and 'failed' states * [marketplace] move to failed state once request fails - Add support for subscribing to request failure events. - Add supporting contract tests for subscribing to request failure events. - Allow the PurchaseStarted state to move to PurchaseFailure once a request failure event is emitted - Add supporting tests for moving from PurchaseStarted to PurchaseFailure - Add state transition tests for PurchaseUnknown. * [marketplace] Fix test with longer sleepAsync * [integration] Add function to restart a codex node * [purchasing] Set client address before requesting storage To prevent the purchase id (which equals the request id) from changing once it's been submitted. * [contracts] Fix: OnChainMarket.getState() Had the wrong method signature before * [purchasing] Load purchases on node start * [purchasing] Rename state 'PurchaseError' to 'PurchaseErrored' Allows for an exception type called 'PurchaseError' * [purchasing] Load purchases in background No longer calls market.getRequest() for every purchase on node start. * [contracts] Add `$` for RequestId, SlotId and Nonce To aid with debugging * [purchasing] Add Purchasing.stop() To ensure that all contract interactions have both a start() and a stop() for * [tests] Remove sleepAsync where possible Use `eventually` loop instead, to make sure that we're not waiting unnecessarily. * [integration] Fix: handle non-json response in test * [purchasing] Add purchase state to json * [integration] Ensure that purchase is submitted before restart Fixes test failure on slower CI * [purchasing] re-implement `description` as method Allows description to be set in the same module where the state type is defined. Co-authored-by: Eric Mastro * [contracts] fix typo Co-authored-by: Eric Mastro * [market] Use more generic error type Should we decide to change the provider type later Co-authored-by: Eric Mastro Co-authored-by: Eric Mastro --- codex/contracts/interactions.nim | 2 + codex/contracts/market.nim | 33 ++++- codex/contracts/requests.nim | 9 ++ codex/contracts/storage.nim | 7 + codex/market.nim | 29 +++- codex/node.nim | 2 +- codex/purchasing.nim | 28 +++- codex/purchasing/purchase.nim | 44 ++++-- codex/purchasing/statemachine.nim | 8 +- codex/purchasing/states/cancelled.nim | 9 +- codex/purchasing/states/error.nim | 7 +- codex/purchasing/states/failed.nim | 12 ++ codex/purchasing/states/finished.nim | 12 ++ codex/purchasing/states/pending.nim | 10 +- codex/purchasing/states/started.nim | 27 +++- codex/purchasing/states/submitted.nim | 11 +- codex/purchasing/states/unknown.nim | 37 ++++++ codex/rest/json.nim | 2 +- tests/codex/helpers/mockmarket.nim | 56 +++++++- tests/codex/testproving.nim | 16 +-- tests/codex/testpurchasing.nim | 185 ++++++++++++++++++++++---- tests/codex/testsales.nim | 34 ++--- tests/contracts/examples.nim | 29 ---- tests/contracts/testContracts.nim | 12 +- tests/contracts/testMarket.nim | 99 ++++++++++---- tests/contracts/testProofs.nim | 2 +- tests/examples.nim | 33 +++++ tests/integration/nodes.nim | 44 ++++-- tests/testIntegration.nim | 31 ++++- vendor/dagger-contracts | 2 +- 30 files changed, 655 insertions(+), 177 deletions(-) create mode 100644 codex/purchasing/states/failed.nim create mode 100644 codex/purchasing/states/finished.nim create mode 100644 codex/purchasing/states/unknown.nim diff --git a/codex/contracts/interactions.nim b/codex/contracts/interactions.nim index d6d75dda..5f4dcd9c 100644 --- a/codex/contracts/interactions.nim +++ b/codex/contracts/interactions.nim @@ -69,8 +69,10 @@ proc start*(interactions: ContractInteractions) {.async.} = await interactions.clock.start() await interactions.sales.start() await interactions.proving.start() + await interactions.purchasing.start() proc stop*(interactions: ContractInteractions) {.async.} = + await interactions.purchasing.stop() await interactions.sales.stop() await interactions.proving.stop() await interactions.clock.stop() diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index b84f1c50..2ed4148f 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -28,13 +28,11 @@ func new*(_: type OnChainMarket, contract: Storage): OnChainMarket = method getSigner*(market: OnChainMarket): Future[Address] {.async.} = return await market.signer.getAddress() -method requestStorage(market: OnChainMarket, - request: StorageRequest): - Future[StorageRequest] {.async.} = - var request = request - request.client = await market.signer.getAddress() +method myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async.} = + return await market.contract.myRequests + +method requestStorage(market: OnChainMarket, request: StorageRequest){.async.} = await market.contract.requestStorage(request) - return request method getRequest(market: OnChainMarket, id: RequestId): Future[?StorageRequest] {.async.} = @@ -45,6 +43,19 @@ method getRequest(market: OnChainMarket, return none StorageRequest raise e +method getState*(market: OnChainMarket, + requestId: RequestId): Future[?RequestState] {.async.} = + try: + return some await market.contract.state(requestId) + except ProviderError as e: + if e.revertReason.contains("Unknown request"): + return none RequestState + raise e + +method getRequestEnd*(market: OnChainMarket, + id: RequestId): Future[SecondsSince1970] {.async.} = + return await market.contract.requestEnd(id) + method getHost(market: OnChainMarket, requestId: RequestId, slotIndex: UInt256): Future[?Address] {.async.} = @@ -104,5 +115,15 @@ method subscribeRequestCancelled*(market: OnChainMarket, let subscription = await market.contract.subscribe(RequestCancelled, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) +method subscribeRequestFailed*(market: OnChainMarket, + requestId: RequestId, + callback: OnRequestFailed): + Future[MarketSubscription] {.async.} = + proc onEvent(event: RequestFailed) {.upraises:[].} = + if event.requestId == requestId: + callback(event.requestId) + let subscription = await market.contract.subscribe(RequestFailed, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} = await subscription.eventSubscription.unsubscribe() diff --git a/codex/contracts/requests.nim b/codex/contracts/requests.nim index 10612765..15d7b5c4 100644 --- a/codex/contracts/requests.nim +++ b/codex/contracts/requests.nim @@ -33,6 +33,12 @@ type SlotId* = distinct array[32, byte] RequestId* = distinct array[32, byte] Nonce* = distinct array[32, byte] + RequestState* {.pure.} = enum + New + Started + Cancelled + Finished + Failed proc `==`*(x, y: Nonce): bool {.borrow.} proc `==`*(x, y: RequestId): bool {.borrow.} @@ -42,6 +48,9 @@ proc hash*(x: SlotId): Hash {.borrow.} func toArray*(id: RequestId | SlotId | Nonce): array[32, byte] = array[32, byte](id) +proc `$`*(id: RequestId | SlotId | Nonce): string = + id.toArray.toHex + func fromTuple(_: type StorageRequest, tupl: tuple): StorageRequest = StorageRequest( client: tupl[0], diff --git a/codex/contracts/storage.nim b/codex/contracts/storage.nim index 2c1c0028..b83a4988 100644 --- a/codex/contracts/storage.nim +++ b/codex/contracts/storage.nim @@ -2,6 +2,7 @@ import pkg/ethers import pkg/json_rpc/rpcclient import pkg/stint import pkg/chronos +import ../clock import ./requests export stint @@ -20,6 +21,8 @@ type requestId* {.indexed.}: RequestId RequestCancelled* = object of Event requestId* {.indexed.}: RequestId + RequestFailed* = object of Event + requestId* {.indexed.}: RequestId ProofSubmitted* = object of Event id*: SlotId proof*: seq[byte] @@ -41,6 +44,10 @@ proc payoutSlot*(storage: Storage, requestId: RequestId, slotIndex: UInt256) {.c proc getRequest*(storage: Storage, id: RequestId): StorageRequest {.contract, view.} proc getHost*(storage: Storage, id: SlotId): Address {.contract, view.} +proc myRequests*(storage: Storage): seq[RequestId] {.contract, view.} +proc state*(storage: Storage, requestId: RequestId): RequestState {.contract, view.} +proc requestEnd*(storage: Storage, requestId: RequestId): SecondsSince1970 {.contract, view.} + proc proofPeriod*(storage: Storage): UInt256 {.contract, view.} proc proofTimeout*(storage: Storage): UInt256 {.contract, view.} diff --git a/codex/market.nim b/codex/market.nim index ff185f62..c3cd039a 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -2,10 +2,12 @@ import pkg/chronos import pkg/upraises import pkg/questionable import ./contracts/requests +import ./clock export chronos export questionable export requests +export SecondsSince1970 type Market* = ref object of RootObj @@ -14,13 +16,16 @@ type OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].} OnSlotFilled* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises:[].} OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises:[].} + OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises:[].} method getSigner*(market: Market): Future[Address] {.base, async.} = raiseAssert("not implemented") method requestStorage*(market: Market, - request: StorageRequest): - Future[StorageRequest] {.base, async.} = + request: StorageRequest) {.base, async.} = + raiseAssert("not implemented") + +method myRequests*(market: Market): Future[seq[RequestId]] {.base, async.} = raiseAssert("not implemented") method getRequest*(market: Market, @@ -28,6 +33,14 @@ method getRequest*(market: Market, Future[?StorageRequest] {.base, async.} = raiseAssert("not implemented") +method getState*(market: Market, + requestId: RequestId): Future[?RequestState] {.base, async.} = + raiseAssert("not implemented") + +method getRequestEnd*(market: Market, + id: RequestId): Future[SecondsSince1970] {.base, async.} = + raiseAssert("not implemented") + method getHost*(market: Market, requestId: RequestId, slotIndex: UInt256): Future[?Address] {.base, async.} = @@ -62,9 +75,15 @@ method subscribeSlotFilled*(market: Market, raiseAssert("not implemented") method subscribeRequestCancelled*(market: Market, - requestId: RequestId, - callback: OnRequestCancelled): - Future[Subscription] {.base, async.} = + requestId: RequestId, + callback: OnRequestCancelled): + Future[Subscription] {.base, async.} = + raiseAssert("not implemented") + +method subscribeRequestFailed*(market: Market, + requestId: RequestId, + callback: OnRequestFailed): + Future[Subscription] {.base, async.} = raiseAssert("not implemented") method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = diff --git a/codex/node.nim b/codex/node.nim index 837bdaf7..b850d029 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -288,7 +288,7 @@ proc requestStorage*(self: CodexNodeRef, expiry: expiry |? 0.u256 ) - let purchase = contracts.purchasing.purchase(request) + let purchase = await contracts.purchasing.purchase(request) return success purchase.id proc new*( diff --git a/codex/purchasing.nim b/codex/purchasing.nim index 7500b711..656f1d23 100644 --- a/codex/purchasing.nim +++ b/codex/purchasing.nim @@ -8,6 +8,7 @@ import ./clock import ./purchasing/purchase export questionable +export chronos export market export purchase @@ -31,7 +32,22 @@ proc new*(_: type Purchasing, market: Market, clock: Clock): Purchasing = requestExpiryInterval: DefaultRequestExpiryInterval, ) -proc populate*(purchasing: Purchasing, request: StorageRequest): StorageRequest = +proc load*(purchasing: Purchasing) {.async.} = + let market = purchasing.market + let requestIds = await market.myRequests() + for requestId in requestIds: + let purchase = Purchase.new(requestId, purchasing.market, purchasing.clock) + purchase.load() + purchasing.purchases[purchase.id] = purchase + +proc start*(purchasing: Purchasing) {.async.} = + await purchasing.load() + +proc stop*(purchasing: Purchasing) {.async.} = + discard + +proc populate*(purchasing: Purchasing, + request: StorageRequest): Future[StorageRequest] {.async.} = result = request if result.ask.proofProbability == 0.u256: result.ask.proofProbability = purchasing.proofProbability @@ -41,13 +57,15 @@ proc populate*(purchasing: Purchasing, request: StorageRequest): StorageRequest var id = result.nonce.toArray doAssert randomBytes(id) == 32 result.nonce = Nonce(id) + result.client = await purchasing.market.getSigner() -proc purchase*(purchasing: Purchasing, request: StorageRequest): Purchase = - let request = purchasing.populate(request) - let purchase = newPurchase(request, purchasing.market, purchasing.clock) +proc purchase*(purchasing: Purchasing, + request: StorageRequest): Future[Purchase] {.async.} = + let request = await purchasing.populate(request) + let purchase = Purchase.new(request, purchasing.market, purchasing.clock) purchase.start() purchasing.purchases[purchase.id] = purchase - purchase + return purchase func getPurchase*(purchasing: Purchasing, id: PurchaseId): ?Purchase = if purchasing.purchases.hasKey(id): diff --git a/codex/purchasing/purchase.nim b/codex/purchasing/purchase.nim index 1aa8709e..2301beff 100644 --- a/codex/purchasing/purchase.nim +++ b/codex/purchasing/purchase.nim @@ -1,37 +1,59 @@ import ./statemachine import ./states/pending +import ./states/unknown import ./purchaseid -# Purchase is implemented as a state machine: +# Purchase is implemented as a state machine. # -# pending ----> submitted ----------> started -# \ \ \ -# \ \ -----------> cancelled -# \ \ \ -# --------------------------------------> error +# It can either be a new (pending) purchase that still needs to be submitted +# on-chain, or it is a purchase that was previously submitted on-chain, and +# we're just restoring its (unknown) state after a node restart. # +# | +# v +# ------------------------- unknown +# | / / +# v v / +# pending ----> submitted ----> started ---------> finished <----/ +# \ \ / +# \ ------------> failed <----/ +# \ / +# --> cancelled <----------------------- export Purchase export purchaseid +export statemachine -func newPurchase*(request: StorageRequest, - market: Market, - clock: Clock): Purchase = +func new*(_: type Purchase, + requestId: RequestId, + market: Market, + clock: Clock): Purchase = Purchase( future: Future[void].new(), - request: request, + requestId: requestId, market: market, clock: clock ) +func new*(_: type Purchase, + request: StorageRequest, + market: Market, + clock: Clock): Purchase = + let purchase = Purchase.new(request.id, market, clock) + purchase.request = some request + return purchase + proc start*(purchase: Purchase) = purchase.switch(PurchasePending()) +proc load*(purchase: Purchase) = + purchase.switch(PurchaseUnknown()) + proc wait*(purchase: Purchase) {.async.} = await purchase.future func id*(purchase: Purchase): PurchaseId = - PurchaseId(purchase.request.id) + PurchaseId(purchase.requestId) func finished*(purchase: Purchase): bool = purchase.future.finished diff --git a/codex/purchasing/statemachine.nim b/codex/purchasing/statemachine.nim index 5af282fb..aab01026 100644 --- a/codex/purchasing/statemachine.nim +++ b/codex/purchasing/statemachine.nim @@ -1,6 +1,7 @@ import ../utils/statemachine import ../market import ../clock +import ../errors export market export clock @@ -11,5 +12,10 @@ type future*: Future[void] market*: Market clock*: Clock - request*: StorageRequest + requestId*: RequestId + request*: ?StorageRequest PurchaseState* = ref object of AsyncState + PurchaseError* = object of CodexError + +method description*(state: PurchaseState): string {.base.} = + raiseAssert "description not implemented for state" diff --git a/codex/purchasing/states/cancelled.nim b/codex/purchasing/states/cancelled.nim index 6d9a1581..93798adb 100644 --- a/codex/purchasing/states/cancelled.nim +++ b/codex/purchasing/states/cancelled.nim @@ -8,10 +8,13 @@ method enterAsync*(state: PurchaseCancelled) {.async.} = raiseAssert "invalid state" try: - await purchase.market.withdrawFunds(purchase.request.id) + await purchase.market.withdrawFunds(purchase.requestId) except CatchableError as error: - state.switch(PurchaseError(error: error)) + state.switch(PurchaseErrored(error: error)) return let error = newException(Timeout, "Purchase cancelled due to timeout") - state.switch(PurchaseError(error: error)) + state.switch(PurchaseErrored(error: error)) + +method description*(state: PurchaseCancelled): string = + "cancelled" diff --git a/codex/purchasing/states/error.nim b/codex/purchasing/states/error.nim index 7f67c6b1..edddc192 100644 --- a/codex/purchasing/states/error.nim +++ b/codex/purchasing/states/error.nim @@ -1,10 +1,13 @@ import ../statemachine -type PurchaseError* = ref object of PurchaseState +type PurchaseErrored* = ref object of PurchaseState error*: ref CatchableError -method enter*(state: PurchaseError) = +method enter*(state: PurchaseErrored) = without purchase =? (state.context as Purchase): raiseAssert "invalid state" purchase.future.fail(state.error) + +method description*(state: PurchaseErrored): string = + "errored" diff --git a/codex/purchasing/states/failed.nim b/codex/purchasing/states/failed.nim new file mode 100644 index 00000000..7f73104f --- /dev/null +++ b/codex/purchasing/states/failed.nim @@ -0,0 +1,12 @@ +import ../statemachine +import ./error + +type + PurchaseFailed* = ref object of PurchaseState + +method enter*(state: PurchaseFailed) = + let error = newException(PurchaseError, "Purchase failed") + state.switch(PurchaseErrored(error: error)) + +method description*(state: PurchaseFailed): string = + "failed" diff --git a/codex/purchasing/states/finished.nim b/codex/purchasing/states/finished.nim new file mode 100644 index 00000000..ce933207 --- /dev/null +++ b/codex/purchasing/states/finished.nim @@ -0,0 +1,12 @@ +import ../statemachine + +type PurchaseFinished* = ref object of PurchaseState + +method enter*(state: PurchaseFinished) = + without purchase =? (state.context as Purchase): + raiseAssert "invalid state" + + purchase.future.complete() + +method description*(state: PurchaseFinished): string = + "finished" diff --git a/codex/purchasing/states/pending.nim b/codex/purchasing/states/pending.nim index a66e4fc4..8ade593c 100644 --- a/codex/purchasing/states/pending.nim +++ b/codex/purchasing/states/pending.nim @@ -5,13 +5,17 @@ import ./error type PurchasePending* = ref object of PurchaseState method enterAsync(state: PurchasePending) {.async.} = - without purchase =? (state.context as Purchase): + without purchase =? (state.context as Purchase) and + request =? purchase.request: raiseAssert "invalid state" try: - purchase.request = await purchase.market.requestStorage(purchase.request) + await purchase.market.requestStorage(request) except CatchableError as error: - state.switch(PurchaseError(error: error)) + state.switch(PurchaseErrored(error: error)) return state.switch(PurchaseSubmitted()) + +method description*(state: PurchasePending): string = + "pending" diff --git a/codex/purchasing/states/started.nim b/codex/purchasing/states/started.nim index 7e1d0b72..6d134c5e 100644 --- a/codex/purchasing/states/started.nim +++ b/codex/purchasing/states/started.nim @@ -1,9 +1,32 @@ import ../statemachine +import ./error +import ./finished +import ./failed type PurchaseStarted* = ref object of PurchaseState -method enter*(state: PurchaseStarted) = +method enterAsync*(state: PurchaseStarted) {.async.} = without purchase =? (state.context as Purchase): raiseAssert "invalid state" - purchase.future.complete() + let clock = purchase.clock + let market = purchase.market + + let failed = newFuture[void]() + proc callback(_: RequestId) = + failed.complete() + let subscription = await market.subscribeRequestFailed(purchase.requestId, callback) + + let ended = clock.waitUntil(await market.getRequestEnd(purchase.requestId)) + try: + let fut = await one(ended, failed) + if fut.id == failed.id: + state.switch(PurchaseFailed()) + else: + state.switch(PurchaseFinished()) + await subscription.unsubscribe() + except CatchableError as error: + state.switch(PurchaseErrored(error: error)) + +method description*(state: PurchaseStarted): string = + "started" diff --git a/codex/purchasing/states/submitted.nim b/codex/purchasing/states/submitted.nim index 76fc69b0..9d5c8589 100644 --- a/codex/purchasing/states/submitted.nim +++ b/codex/purchasing/states/submitted.nim @@ -6,7 +6,8 @@ import ./cancelled type PurchaseSubmitted* = ref object of PurchaseState method enterAsync(state: PurchaseSubmitted) {.async.} = - without purchase =? (state.context as Purchase): + without purchase =? (state.context as Purchase) and + request =? purchase.request: raiseAssert "invalid state" let market = purchase.market @@ -16,13 +17,12 @@ method enterAsync(state: PurchaseSubmitted) {.async.} = let done = newFuture[void]() proc callback(_: RequestId) = done.complete() - let request = purchase.request let subscription = await market.subscribeFulfillment(request.id, callback) await done await subscription.unsubscribe() proc withTimeout(future: Future[void]) {.async.} = - let expiry = purchase.request.expiry.truncate(int64) + let expiry = request.expiry.truncate(int64) await future.withTimeout(clock, expiry) try: @@ -31,7 +31,10 @@ method enterAsync(state: PurchaseSubmitted) {.async.} = state.switch(PurchaseCancelled()) return except CatchableError as error: - state.switch(PurchaseError(error: error)) + state.switch(PurchaseErrored(error: error)) return state.switch(PurchaseStarted()) + +method description*(state: PurchaseSubmitted): string = + "submitted" diff --git a/codex/purchasing/states/unknown.nim b/codex/purchasing/states/unknown.nim new file mode 100644 index 00000000..0102fa43 --- /dev/null +++ b/codex/purchasing/states/unknown.nim @@ -0,0 +1,37 @@ +import ../statemachine +import ./submitted +import ./started +import ./cancelled +import ./finished +import ./failed +import ./error + +type PurchaseUnknown* = ref object of PurchaseState + +method enterAsync(state: PurchaseUnknown) {.async.} = + without purchase =? (state.context as Purchase): + raiseAssert "invalid state" + + try: + if (request =? await purchase.market.getRequest(purchase.requestId)) and + (requestState =? await purchase.market.getState(purchase.requestId)): + + purchase.request = some request + + case requestState + of RequestState.New: + state.switch(PurchaseSubmitted()) + of RequestState.Started: + state.switch(PurchaseStarted()) + of RequestState.Cancelled: + state.switch(PurchaseCancelled()) + of RequestState.Finished: + state.switch(PurchaseFinished()) + of RequestState.Failed: + state.switch(PurchaseFailed()) + + except CatchableError as error: + state.switch(PurchaseErrored(error: error)) + +method description*(state: PurchaseUnknown): string = + "unknown" diff --git a/codex/rest/json.nim b/codex/rest/json.nim index 16f88e3b..2add0e94 100644 --- a/codex/rest/json.nim +++ b/codex/rest/json.nim @@ -50,7 +50,7 @@ func `%`*(id: RequestId | SlotId | Nonce): JsonNode = func `%`*(purchase: Purchase): JsonNode = %*{ - "finished": purchase.finished, + "state": (purchase.state as PurchaseState).?description |? "none", "error": purchase.error.?msg, "request": purchase.request, } diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index cad26691..b8a11f6f 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -1,13 +1,20 @@ import std/sequtils +import std/tables +import std/hashes import pkg/codex/market export market +export tables type MockMarket* = ref object of Market + activeRequests*: Table[Address, seq[RequestId]] requested*: seq[StorageRequest] + requestEnds*: Table[RequestId, SecondsSince1970] + state*: Table[RequestId, RequestState] fulfilled*: seq[Fulfillment] filled*: seq[Slot] + withdrawn*: seq[RequestId] signer: Address subscriptions: Subscriptions Fulfillment* = object @@ -24,6 +31,7 @@ type onFulfillment: seq[FulfillmentSubscription] onSlotFilled: seq[SlotFilledSubscription] onRequestCancelled: seq[RequestCancelledSubscription] + onRequestFailed: seq[RequestFailedSubscription] RequestSubscription* = ref object of Subscription market: MockMarket callback: OnRequest @@ -40,6 +48,16 @@ type market: MockMarket requestId: RequestId callback: OnRequestCancelled + RequestFailedSubscription* = ref object of Subscription + market: MockMarket + requestId: RequestId + callback: OnRequestCancelled + +proc hash*(address: Address): Hash = + hash(address.toArray) + +proc hash*(requestId: RequestId): Hash = + hash(requestId.toArray) proc new*(_: type MockMarket): MockMarket = MockMarket(signer: Address.example) @@ -47,14 +65,14 @@ proc new*(_: type MockMarket): MockMarket = method getSigner*(market: MockMarket): Future[Address] {.async.} = return market.signer -method requestStorage*(market: MockMarket, - request: StorageRequest): - Future[StorageRequest] {.async.} = +method requestStorage*(market: MockMarket, request: StorageRequest) {.async.} = market.requested.add(request) var subscriptions = market.subscriptions.onRequest for subscription in subscriptions: subscription.callback(request.id, request.ask) - return request + +method myRequests*(market: MockMarket): Future[seq[RequestId]] {.async.} = + return market.activeRequests[market.signer] method getRequest(market: MockMarket, id: RequestId): Future[?StorageRequest] {.async.} = @@ -63,6 +81,14 @@ method getRequest(market: MockMarket, return some request return none StorageRequest +method getState*(market: MockMarket, + requestId: RequestId): Future[?RequestState] {.async.} = + return market.state.?[requestId] + +method getRequestEnd*(market: MockMarket, + id: RequestId): Future[SecondsSince1970] {.async.} = + return market.requestEnds[id] + method getHost(market: MockMarket, requestId: RequestId, slotIndex: UInt256): Future[?Address] {.async.} = @@ -93,6 +119,12 @@ proc emitRequestFulfilled*(market: MockMarket, requestId: RequestId) = if subscription.requestId == requestId: subscription.callback(requestId) +proc emitRequestFailed*(market: MockMarket, requestId: RequestId) = + var subscriptions = market.subscriptions.onRequestFailed + for subscription in subscriptions: + if subscription.requestId == requestId: + subscription.callback(requestId) + proc fillSlot*(market: MockMarket, requestId: RequestId, slotIndex: UInt256, @@ -115,6 +147,7 @@ method fillSlot*(market: MockMarket, method withdrawFunds*(market: MockMarket, requestId: RequestId) {.async.} = + market.withdrawn.add(requestId) market.emitRequestCancelled(requestId) method subscribeRequests*(market: MockMarket, @@ -165,6 +198,18 @@ method subscribeRequestCancelled*(market: MockMarket, market.subscriptions.onRequestCancelled.add(subscription) return subscription +method subscribeRequestFailed*(market: MockMarket, + requestId: RequestId, + callback: OnRequestFailed): + Future[Subscription] {.async.} = + let subscription = RequestFailedSubscription( + market: market, + requestId: requestId, + callback: callback + ) + market.subscriptions.onRequestFailed.add(subscription) + return subscription + method unsubscribe*(subscription: RequestSubscription) {.async.} = subscription.market.subscriptions.onRequest.keepItIf(it != subscription) @@ -176,3 +221,6 @@ method unsubscribe*(subscription: SlotFilledSubscription) {.async.} = method unsubscribe*(subscription: RequestCancelledSubscription) {.async.} = subscription.market.subscriptions.onRequestCancelled.keepItIf(it != subscription) + +method unsubscribe*(subscription: RequestFailedSubscription) {.async.} = + subscription.market.subscriptions.onRequestFailed.keepItIf(it != subscription) diff --git a/tests/codex/testproving.nim b/tests/codex/testproving.nim index cea76107..bf810391 100644 --- a/tests/codex/testproving.nim +++ b/tests/codex/testproving.nim @@ -3,6 +3,7 @@ import pkg/chronos import pkg/codex/proving import ./helpers/mockproofs import ./helpers/mockclock +import ./helpers/eventually import ./examples suite "Proving": @@ -23,7 +24,6 @@ suite "Proving": proc advanceToNextPeriod(proofs: MockProofs) {.async.} = let periodicity = await proofs.periodicity() clock.advance(periodicity.seconds.truncate(int64)) - await sleepAsync(2.seconds) test "maintains a list of contract ids to watch": let id1, id2 = SlotId.example @@ -49,7 +49,7 @@ suite "Proving": proving.onProofRequired = onProofRequired proofs.setProofRequired(id, true) await proofs.advanceToNextPeriod() - check called + check eventually called test "callback receives id of contract for which proof is required": let id1, id2 = SlotId.example @@ -61,11 +61,11 @@ suite "Proving": proving.onProofRequired = onProofRequired proofs.setProofRequired(id1, true) await proofs.advanceToNextPeriod() - check callbackIds == @[id1] + check eventually callbackIds == @[id1] proofs.setProofRequired(id1, false) proofs.setProofRequired(id2, true) await proofs.advanceToNextPeriod() - check callbackIds == @[id1, id2] + check eventually callbackIds == @[id1, id2] test "invokes callback when proof is about to be required": let id = SlotId.example @@ -77,7 +77,7 @@ suite "Proving": proofs.setProofRequired(id, false) proofs.setProofToBeRequired(id, true) await proofs.advanceToNextPeriod() - check called + check eventually called test "stops watching when contract has ended": let id = SlotId.example @@ -90,17 +90,17 @@ suite "Proving": proving.onProofRequired = onProofRequired proofs.setProofRequired(id, true) await proofs.advanceToNextPeriod() - check not proving.slots.contains(id) + check eventually (not proving.slots.contains(id)) check not called test "submits proofs": let id = SlotId.example - let proof = seq[byte].example + let proof = exampleProof() await proving.submitProof(id, proof) test "supports proof submission subscriptions": let id = SlotId.example - let proof = seq[byte].example + let proof = exampleProof() var receivedIds: seq[SlotId] var receivedProofs: seq[seq[byte]] diff --git a/tests/codex/testpurchasing.nim b/tests/codex/testpurchasing.nim index b2efd537..0c525ccf 100644 --- a/tests/codex/testpurchasing.nim +++ b/tests/codex/testpurchasing.nim @@ -4,8 +4,10 @@ import pkg/chronos import pkg/upraises import pkg/stint import pkg/codex/purchasing +import pkg/codex/purchasing/states/[finished, failed, error, started, submitted, unknown] import ./helpers/mockmarket import ./helpers/mockclock +import ./helpers/eventually import ./examples suite "Purchasing": @@ -29,7 +31,7 @@ suite "Purchasing": ) test "submits a storage request when asked": - discard purchasing.purchase(request) + discard await purchasing.purchase(request) let submitted = market.requested[0] check submitted.ask.slots == request.ask.slots check submitted.ask.slotSize == request.ask.slotSize @@ -37,8 +39,8 @@ suite "Purchasing": check submitted.ask.reward == request.ask.reward test "remembers purchases": - let purchase1 = purchasing.purchase(request) - let purchase2 = purchasing.purchase(request) + let purchase1 = await purchasing.purchase(request) + let purchase2 = await purchasing.purchase(request) check purchasing.getPurchase(purchase1.id) == some purchase1 check purchasing.getPurchase(purchase2.id) == some purchase2 @@ -47,12 +49,12 @@ suite "Purchasing": test "can change default value for proof probability": purchasing.proofProbability = 42.u256 - discard purchasing.purchase(request) + discard await purchasing.purchase(request) check market.requested[0].ask.proofProbability == 42.u256 test "can override proof probability per request": request.ask.proofProbability = 42.u256 - discard purchasing.purchase(request) + discard await purchasing.purchase(request) check market.requested[0].ask.proofProbability == 42.u256 test "has a default value for request expiration interval": @@ -61,53 +63,178 @@ suite "Purchasing": test "can change default value for request expiration interval": purchasing.requestExpiryInterval = 42.u256 let start = getTime().toUnix() - discard purchasing.purchase(request) + discard await purchasing.purchase(request) check market.requested[0].expiry == (start + 42).u256 test "can override expiry time per request": let expiry = (getTime().toUnix() + 42).u256 request.expiry = expiry - discard purchasing.purchase(request) + discard await purchasing.purchase(request) check market.requested[0].expiry == expiry test "includes a random nonce in every storage request": - discard purchasing.purchase(request) - discard purchasing.purchase(request) + discard await purchasing.purchase(request) + discard await purchasing.purchase(request) check market.requested[0].nonce != market.requested[1].nonce - test "succeeds when request is fulfilled": - let purchase = purchasing.purchase(request) + test "sets client address in request": + discard await purchasing.purchase(request) + check market.requested[0].client == await market.getSigner() + + test "succeeds when request is finished": + let purchase = await purchasing.purchase(request) let request = market.requested[0] + let requestEnd = getTime().toUnix() + 42 + market.requestEnds[request.id] = requestEnd market.emitRequestFulfilled(request.id) + clock.set(requestEnd) await purchase.wait() check purchase.error.isNone test "fails when request times out": - let purchase = purchasing.purchase(request) + let purchase = await purchasing.purchase(request) let request = market.requested[0] clock.set(request.expiry.truncate(int64)) expect PurchaseTimeout: await purchase.wait() test "checks that funds were withdrawn when purchase times out": - let purchase = purchasing.purchase(request) + let purchase = await purchasing.purchase(request) let request = market.requested[0] - var receivedIds: seq[RequestId] clock.set(request.expiry.truncate(int64)) - - proc onRequestCancelled(id: RequestId) {.gcsafe, upraises:[].} = - receivedIds.add(id) - - # will only be fired when `withdrawFunds` is called on purchase timeout - let subscription = await market.subscribeRequestCancelled( - request.id, - onRequestCancelled) - var purchaseTimedOut = false - try: + expect PurchaseTimeout: await purchase.wait() - except PurchaseTimeout: - purchaseTimedOut = true + check market.withdrawn == @[request.id] - await subscription.unsubscribe() - check purchaseTimedOut - check receivedIds == @[request.id] +suite "Purchasing state machine": + + var purchasing: Purchasing + var market: MockMarket + var clock: MockClock + var request: StorageRequest + + setup: + market = MockMarket.new() + clock = MockClock.new() + purchasing = Purchasing.new(market, clock) + request = StorageRequest( + ask: StorageAsk( + slots: uint8.example.uint64, + slotSize: uint32.example.u256, + duration: uint16.example.u256, + reward: uint8.example.u256 + ) + ) + + test "loads active purchases from market": + let me = await market.getSigner() + let request1, request2, request3 = StorageRequest.example + market.requested = @[request1, request2, request3] + market.activeRequests[me] = @[request1.id, request2.id] + await purchasing.load() + check isSome purchasing.getPurchase(PurchaseId(request1.id)) + check isSome purchasing.getPurchase(PurchaseId(request2.id)) + check isNone purchasing.getPurchase(PurchaseId(request3.id)) + + test "loads correct purchase.future state for purchases from market": + let me = await market.getSigner() + let request1, request2, request3, request4, request5 = StorageRequest.example + market.requested = @[request1, request2, request3, request4, request5] + market.activeRequests[me] = @[request1.id, request2.id, request3.id, request4.id, request5.id] + market.state[request1.id] = RequestState.New + market.state[request2.id] = RequestState.Started + market.state[request3.id] = RequestState.Cancelled + market.state[request4.id] = RequestState.Finished + market.state[request5.id] = RequestState.Failed + + # ensure the started state doesn't error, giving a false positive test result + market.requestEnds[request2.id] = clock.now() - 1 + + await purchasing.load() + check purchasing.getPurchase(PurchaseId(request1.id)).?finished == false.some + check purchasing.getPurchase(PurchaseId(request2.id)).?finished == true.some + check purchasing.getPurchase(PurchaseId(request3.id)).?finished == true.some + check purchasing.getPurchase(PurchaseId(request4.id)).?finished == true.some + check purchasing.getPurchase(PurchaseId(request5.id)).?finished == true.some + check purchasing.getPurchase(PurchaseId(request5.id)).?error.isSome + + test "moves to PurchaseSubmitted when request state is New": + let request = StorageRequest.example + let purchase = Purchase.new(request, market, clock) + market.requested = @[request] + market.state[request.id] = RequestState.New + purchase.switch(PurchaseUnknown()) + check (purchase.state as PurchaseSubmitted).isSome + + test "moves to PurchaseStarted when request state is Started": + let request = StorageRequest.example + let purchase = Purchase.new(request, market, clock) + market.requestEnds[request.id] = clock.now() + request.ask.duration.truncate(int64) + market.requested = @[request] + market.state[request.id] = RequestState.Started + purchase.switch(PurchaseUnknown()) + check (purchase.state as PurchaseStarted).isSome + + test "moves to PurchaseErrored when request state is Cancelled": + let request = StorageRequest.example + let purchase = Purchase.new(request, market, clock) + market.requested = @[request] + market.state[request.id] = RequestState.Cancelled + purchase.switch(PurchaseUnknown()) + check (purchase.state as PurchaseErrored).isSome + check purchase.error.?msg == "Purchase cancelled due to timeout".some + + test "moves to PurchaseFinished when request state is Finished": + let request = StorageRequest.example + let purchase = Purchase.new(request, market, clock) + market.requested = @[request] + market.state[request.id] = RequestState.Finished + purchase.switch(PurchaseUnknown()) + check (purchase.state as PurchaseFinished).isSome + + test "moves to PurchaseErrored when request state is Failed": + let request = StorageRequest.example + let purchase = Purchase.new(request, market, clock) + market.requested = @[request] + market.state[request.id] = RequestState.Failed + purchase.switch(PurchaseUnknown()) + check (purchase.state as PurchaseErrored).isSome + check purchase.error.?msg == "Purchase failed".some + + test "moves to PurchaseErrored state once RequestFailed emitted": + let me = await market.getSigner() + let request = StorageRequest.example + market.requested = @[request] + market.activeRequests[me] = @[request.id] + market.state[request.id] = RequestState.Started + market.requestEnds[request.id] = clock.now() + request.ask.duration.truncate(int64) + await purchasing.load() + + # emit mock contract failure event + market.emitRequestFailed(request.id) + # must allow time for the callback to trigger the completion of the future + await sleepAsync(chronos.milliseconds(10)) + + # now check the result + let purchase = purchasing.getPurchase(PurchaseId(request.id)) + let state = purchase.?state + check (state as PurchaseErrored).isSome + check (!purchase).error.?msg == "Purchase failed".some + + test "moves to PurchaseFinished state once request finishes": + let me = await market.getSigner() + let request = StorageRequest.example + market.requested = @[request] + market.activeRequests[me] = @[request.id] + market.state[request.id] = RequestState.Started + market.requestEnds[request.id] = clock.now() + request.ask.duration.truncate(int64) + await purchasing.load() + + # advance the clock to the end of the request + clock.advance(request.ask.duration.truncate(int64)) + + # now check the result + proc getState: ?PurchaseState = + purchasing.getPurchase(PurchaseId(request.id)).?state as PurchaseState + + check eventually (getState() as PurchaseFinished).isSome diff --git a/tests/codex/testsales.nim b/tests/codex/testsales.nim index 81fbd21b..073e07c2 100644 --- a/tests/codex/testsales.nim +++ b/tests/codex/testsales.nim @@ -6,6 +6,7 @@ import pkg/codex/proving import pkg/codex/sales import ./helpers/mockmarket import ./helpers/mockclock +import ./helpers/eventually import ./examples suite "Sales": @@ -26,7 +27,7 @@ suite "Sales": cid: "some cid" ) ) - let proof = seq[byte].example + let proof = exampleProof() var sales: Sales var market: MockMarket @@ -75,21 +76,21 @@ suite "Sales": test "makes storage unavailable when matching request comes in": sales.add(availability) - discard await market.requestStorage(request) + await market.requestStorage(request) check sales.available.len == 0 test "ignores request when no matching storage is available": sales.add(availability) var tooBig = request tooBig.ask.slotSize = request.ask.slotSize + 1 - discard await market.requestStorage(tooBig) + await market.requestStorage(tooBig) check sales.available == @[availability] test "ignores request when reward is too low": sales.add(availability) var tooCheap = request tooCheap.ask.reward = request.ask.reward - 1 - discard await market.requestStorage(tooCheap) + await market.requestStorage(tooCheap) check sales.available == @[availability] test "retrieves and stores data locally": @@ -103,8 +104,8 @@ suite "Sales": storingSlot = slot storingAvailability = availability sales.add(availability) - let requested = await market.requestStorage(request) - check storingRequest == requested + await market.requestStorage(request) + check storingRequest == request check storingSlot < request.ask.slots.u256 check storingAvailability == availability @@ -115,7 +116,7 @@ suite "Sales": availability: Availability) {.async.} = raise error sales.add(availability) - discard await market.requestStorage(request) + await market.requestStorage(request) check sales.available == @[availability] test "generates proof of storage": @@ -126,13 +127,13 @@ suite "Sales": provingRequest = request provingSlot = slot sales.add(availability) - let requested = await market.requestStorage(request) - check provingRequest == requested + await market.requestStorage(request) + check provingRequest == request check provingSlot < request.ask.slots.u256 test "fills a slot": sales.add(availability) - discard await market.requestStorage(request) + await market.requestStorage(request) check market.filled.len == 1 check market.filled[0].requestId == request.id check market.filled[0].slotIndex < request.ask.slots.u256 @@ -150,7 +151,7 @@ suite "Sales": soldRequest = request soldSlotIndex = slotIndex sales.add(availability) - discard await market.requestStorage(request) + await market.requestStorage(request) check soldAvailability == availability check soldRequest == request check soldSlotIndex < request.ask.slots.u256 @@ -171,7 +172,7 @@ suite "Sales": clearedRequest = request clearedSlotIndex = slotIndex sales.add(availability) - discard await market.requestStorage(request) + await market.requestStorage(request) check clearedAvailability == availability check clearedRequest == request check clearedSlotIndex < request.ask.slots.u256 @@ -183,7 +184,7 @@ suite "Sales": availability: Availability) {.async.} = await sleepAsync(1.hours) sales.add(availability) - discard await market.requestStorage(request) + await market.requestStorage(request) for slotIndex in 0..