From 25f68c1e4c3d5727d2c3f7db316cc9eaec2db247 Mon Sep 17 00:00:00 2001 From: Eric Mastro Date: Thu, 9 Mar 2023 00:34:26 +1100 Subject: [PATCH] [marketplace] Load sales state from chain (#306) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [marketplace] get active slots from chain # Conflicts: # codex/contracts/market.nim * [marketplace] make on chain event callbacks async # Conflicts: # tests/codex/helpers/mockmarket.nim * [marketplace] make availability optional for node restart # Conflicts: # tests/codex/testsales.nim * [marketplace] add async state machine Allows for `enterAsync` to be cancelled. * [marketplace] move sale process to async state machine * [marketplace] sales state machine tests * bump dagger-contracts * [marketplace] fix ci issue with chronicles output * PR comments - add slotIndex to `SalesAgent` constructor - remove `SalesAgent.init` - rename `SalesAgent.init` to `start` and `SalesAgent.deinit` to `stop`. - rename `SalesAgent. populateRequest` to `SalesAgent.retreiveRequest`. - move availability removal to the downloading state. once availability is persisted to disk, it should survive node restarts. - * [marketplace] handle slot filled by other host Handle the case when in the downloading, proving, or filling states, that another host fills the slot. * [marketplace] use requestId for mySlots * [marketplace] infer slot index from slotid prevents reassigning a random slot index when restoring state from chain * [marketplace] update to work with latest contracts * [marketplace] clean up * [marketplace] align with contract changes - getState / state > requestState - getSlot > getRequestFromSlotId - support MarketplaceConfig - support slotState, remove unneeded Slot type - collateral > config.collateral.initialAmount - remove proofPeriod contract call - Revert reason “Slot empty” > “Slot is free” - getProofEnd > read SlotState Tests for changes * [marketplace] add missing file * [marketplace] bump codex-contracts-eth * [config] remove unused imports * [sales] cleanup * [sales] fix: do not crash when fetching state fails * [sales] make slotIndex non-optional * Rebase and update NBS commit Rebase on top of main and update NBS commit to the CI fix. * [marketplace] use async subscription event handlers * [marketplace] support slotIndex no longer optional Previously, SalesAgent.slotIndex had been moved to not optional. However, there were still many places where optionality was assumed. This commit removes those assumuptions. * [marketplace] sales state machine: use slotState Use `slotState` instead of `requestState` for sales state machine. * [marketplace] clean up * [statemachine] adds a statemachine for async workflows Allows events to be scheduled synchronously. See https://github.com/status-im/nim-codex/pull/344 Co-Authored-By: Ben Bierens Co-Authored-By: Eric Mastro * [market] make market callbacks synchronous * [statemachine] export Event * [statemachine] ensure that no errors are raised * [statemachine] add machine parameter to run method * [statemachine] initialize queue on start * [statemachine] check futures before cancelling them * [sales] use new async state machine - states use new run() method and event mechanism - StartState starts subscriptions and loads request * [statemachine] fix unsusbscribe before subscribe * [sales] replace old state transition tests * [sales] separate state machine from sales data * [sales] remove reference from SalesData to Sales * [sales] separate sales context from sales * [sales] move decoupled types into their own modules * [sales] move retrieveRequest to SalesData * [sales] move subscription logic into SalesAgent * [sales] unsubscribe when finished or errored * [build] revert back to released version of nim-ethers * [sales] remove SaleStart state * [sales] add missing base method * [sales] move asyncSpawn helper to utils * [sales] fix imports * [sales] remove unused variables * [sales statemachine] add async state machine error handling (#349) * [statemachine] add error handling to asyncstatemachine - add error handling to catch errors during state.run - Sales: add ErrorState to identify which state to transition to during an error. This had to be added to SalesAgent constructor due to circular dependency issues, otherwise it would have been added directly to SalesAgent. - Sales: when an error during run is encountered, the SaleErrorState is constructed with the error, and by default (base impl) will return the error state, so the machine can transition to it. This can be overridden by individual states if needed. * [sales] rename onSaleFailed to onSaleErrored Because there is already a state named SaleFailed which is meant to react to an onchain RequestFailed event and also because this callback is called from SaleErrored, renaming to onSaleErrored prevents ambiguity and confusion as to what has happened at the callback callsite. * [statemachine] forward error to state directly without going through a machine method first * [statemachine] remove unnecessary error handling AsyncQueueFullError is already handled in schedule() * [statemachine] test that cancellation ignores onError * [sales] simplify error handling in states Rely on the state machine error handling instead of catching errors in the state run method --------- Co-authored-by: Mark Spanbroek * [statemachine] prevent memory leaks prevent memory leaks and nil access defects by: - allowing multiple subscribe/unsubscribes of salesagent - disallowing individual salesagent subscription calls to be made externally (requires the .subscribed check) - allowing mutiple start/stops of asyncstatemachine - disregard asyncstatemachine schedules if machine not yet started * [salesagent] add salesagent-specific tests 1. test multiple subscribe/unsubscribes 2. test scheduling machine without being started 3. test subscriptions are working correctly with external events 4. test errors can be overridden at the state level for ErrorHandlingStates. --------- Co-authored-by: Eric Mastro Co-authored-by: Mark Spanbroek Co-authored-by: Ben Bierens --- codex/contracts/Readme.md | 3 +- codex/contracts/config.nim | 71 +++++ codex/contracts/market.nim | 22 +- codex/contracts/marketplace.nim | 10 +- codex/contracts/proofs.nim | 18 +- codex/contracts/requests.nim | 6 + codex/market.nim | 13 +- codex/node.nim | 4 +- codex/proving.nim | 3 +- codex/purchasing/states/started.nim | 2 + codex/purchasing/states/unknown.nim | 2 +- codex/sales.nim | 267 +++++++---------- codex/sales/availability.nim | 8 + codex/sales/salesagent.nim | 123 ++++++++ codex/sales/salescontext.nim | 28 ++ codex/sales/salesdata.nim | 16 + codex/sales/statemachine.nim | 42 +++ codex/sales/states/cancelled.nim | 14 + codex/sales/states/downloading.nim | 42 +++ codex/sales/states/errored.nim | 34 +++ codex/sales/states/errorhandling.nim | 9 + codex/sales/states/failed.nim | 13 + codex/sales/states/filled.nim | 32 ++ codex/sales/states/filling.nim | 30 ++ codex/sales/states/finished.nim | 37 +++ codex/sales/states/proving.nim | 35 +++ codex/sales/states/unknown.nim | 46 +++ codex/storageproofs/timing/proofs.nim | 5 +- codex/utils/asyncspawn.nim | 10 + codex/utils/asyncstatemachine.nim | 86 ++++++ codex/utils/statemachine.nim | 28 ++ tests/codex/helpers/mockmarket.nim | 54 +++- tests/codex/helpers/mockproofs.nim | 15 +- tests/codex/sales/states/testdownloading.nim | 29 ++ tests/codex/sales/states/testfilled.nim | 46 +++ tests/codex/sales/states/testfilling.nim | 29 ++ tests/codex/sales/states/testfinished.nim | 23 ++ tests/codex/sales/states/testproving.nim | 30 ++ tests/codex/sales/states/testunknown.nim | 61 ++++ tests/codex/sales/testsales.nim | 290 +++++++++++++++++++ tests/codex/sales/testsalesagent.nim | 165 +++++++++++ tests/codex/sales/teststates.nim | 8 + tests/codex/testproving.nim | 5 + tests/codex/testpurchasing.nim | 28 +- tests/codex/testsales.nim | 214 +------------- tests/codex/testutils.nim | 2 + tests/codex/utils/testasyncstatemachine.nim | 110 +++++++ tests/codex/utils/teststatemachineasync.nim | 30 ++ tests/contracts/testContracts.nim | 5 +- tests/contracts/testMarket.nim | 59 +++- tests/contracts/testProofs.nim | 11 +- tests/integration/tokens.nim | 3 +- vendor/codex-contracts-eth | 2 +- 53 files changed, 1830 insertions(+), 448 deletions(-) create mode 100644 codex/contracts/config.nim create mode 100644 codex/sales/availability.nim create mode 100644 codex/sales/salesagent.nim create mode 100644 codex/sales/salescontext.nim create mode 100644 codex/sales/salesdata.nim create mode 100644 codex/sales/statemachine.nim create mode 100644 codex/sales/states/cancelled.nim create mode 100644 codex/sales/states/downloading.nim create mode 100644 codex/sales/states/errored.nim create mode 100644 codex/sales/states/errorhandling.nim create mode 100644 codex/sales/states/failed.nim create mode 100644 codex/sales/states/filled.nim create mode 100644 codex/sales/states/filling.nim create mode 100644 codex/sales/states/finished.nim create mode 100644 codex/sales/states/proving.nim create mode 100644 codex/sales/states/unknown.nim create mode 100644 codex/utils/asyncspawn.nim create mode 100644 codex/utils/asyncstatemachine.nim create mode 100644 tests/codex/sales/states/testdownloading.nim create mode 100644 tests/codex/sales/states/testfilled.nim create mode 100644 tests/codex/sales/states/testfilling.nim create mode 100644 tests/codex/sales/states/testfinished.nim create mode 100644 tests/codex/sales/states/testproving.nim create mode 100644 tests/codex/sales/states/testunknown.nim create mode 100644 tests/codex/sales/testsales.nim create mode 100644 tests/codex/sales/testsalesagent.nim create mode 100644 tests/codex/sales/teststates.nim create mode 100644 tests/codex/utils/testasyncstatemachine.nim create mode 100644 tests/codex/utils/teststatemachineasync.nim diff --git a/codex/contracts/Readme.md b/codex/contracts/Readme.md index b2105075..a05edbd0 100644 --- a/codex/contracts/Readme.md +++ b/codex/contracts/Readme.md @@ -39,7 +39,8 @@ Hosts need to put up collateral before participating in storage contracts. A host can learn about the amount of collateral that is required: ```nim -let collateral = await marketplace.collateral() +let config = await marketplace.config() +let collateral = config.collateral.initialAmount ``` The host then needs to prepare a payment to the smart contract by calling the diff --git a/codex/contracts/config.nim b/codex/contracts/config.nim new file mode 100644 index 00000000..ffec1d95 --- /dev/null +++ b/codex/contracts/config.nim @@ -0,0 +1,71 @@ +import pkg/contractabi +import pkg/ethers/fields +import pkg/questionable/results + +export contractabi + +type + MarketplaceConfig* = object + collateral*: CollateralConfig + proofs*: ProofConfig + CollateralConfig* = object + initialAmount*: UInt256 # amount of collateral necessary to fill a slot + minimumAmount*: UInt256 # frees slot when collateral drops below this minimum + slashCriterion*: UInt256 # amount of proofs missed that lead to slashing + slashPercentage*: UInt256 # percentage of the collateral that is slashed + ProofConfig* = object + period*: UInt256 # proofs requirements are calculated per period (in seconds) + timeout*: UInt256 # mark proofs as missing before the timeout (in seconds) + downtime*: uint8 # ignore this much recent blocks for proof requirements + + +func fromTuple(_: type ProofConfig, tupl: tuple): ProofConfig = + ProofConfig( + period: tupl[0], + timeout: tupl[1], + downtime: tupl[2] + ) + +func fromTuple(_: type CollateralConfig, tupl: tuple): CollateralConfig = + CollateralConfig( + initialAmount: tupl[0], + minimumAmount: tupl[1], + slashCriterion: tupl[2], + slashPercentage: tupl[3] + ) + +func fromTuple(_: type MarketplaceConfig, tupl: tuple): MarketplaceConfig = + MarketplaceConfig( + collateral: tupl[0], + proofs: tupl[1] + ) + +func solidityType*(_: type ProofConfig): string = + solidityType(ProofConfig.fieldTypes) + +func solidityType*(_: type CollateralConfig): string = + solidityType(CollateralConfig.fieldTypes) + +func solidityType*(_: type MarketplaceConfig): string = + solidityType(CollateralConfig.fieldTypes) + +func encode*(encoder: var AbiEncoder, slot: ProofConfig) = + encoder.write(slot.fieldValues) + +func encode*(encoder: var AbiEncoder, slot: CollateralConfig) = + encoder.write(slot.fieldValues) + +func encode*(encoder: var AbiEncoder, slot: MarketplaceConfig) = + encoder.write(slot.fieldValues) + +func decode*(decoder: var AbiDecoder, T: type ProofConfig): ?!T = + let tupl = ?decoder.read(ProofConfig.fieldTypes) + success ProofConfig.fromTuple(tupl) + +func decode*(decoder: var AbiDecoder, T: type CollateralConfig): ?!T = + let tupl = ?decoder.read(CollateralConfig.fieldTypes) + success CollateralConfig.fromTuple(tupl) + +func decode*(decoder: var AbiDecoder, T: type MarketplaceConfig): ?!T = + let tupl = ?decoder.read(MarketplaceConfig.fieldTypes) + success MarketplaceConfig.fromTuple(tupl) diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index 69dab2cf..acefe06c 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -31,6 +31,9 @@ method getSigner*(market: OnChainMarket): Future[Address] {.async.} = method myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async.} = return await market.contract.myRequests +method mySlots*(market: OnChainMarket): Future[seq[SlotId]] {.async.} = + return await market.contract.mySlots() + method requestStorage(market: OnChainMarket, request: StorageRequest){.async.} = await market.contract.requestStorage(request) @@ -43,15 +46,19 @@ method getRequest(market: OnChainMarket, return none StorageRequest raise e -method getState*(market: OnChainMarket, +method requestState*(market: OnChainMarket, requestId: RequestId): Future[?RequestState] {.async.} = try: - return some await market.contract.state(requestId) + return some await market.contract.requestState(requestId) except ProviderError as e: if e.revertReason.contains("Unknown request"): return none RequestState raise e +method slotState*(market: OnChainMarket, + slotId: SlotId): Future[SlotState] {.async.} = + return await market.contract.slotState(slotId) + method getRequestEnd*(market: OnChainMarket, id: RequestId): Future[SecondsSince1970] {.async.} = return await market.contract.requestEnd(id) @@ -66,6 +73,15 @@ method getHost(market: OnChainMarket, else: return none Address +method getRequestFromSlotId*(market: OnChainMarket, + slotId: SlotId): Future[?StorageRequest] {.async.} = + try: + return some await market.contract.getRequestFromSlotId(slotId) + except ProviderError as e: + if e.revertReason.contains("Slot is free"): + return none StorageRequest + raise e + method fillSlot(market: OnChainMarket, requestId: RequestId, slotIndex: UInt256, @@ -119,7 +135,7 @@ method subscribeRequestFailed*(market: OnChainMarket, requestId: RequestId, callback: OnRequestFailed): Future[MarketSubscription] {.async.} = - proc onEvent(event: RequestFailed) {.upraises:[].} = + proc onEvent(event: RequestFailed) {.upraises:[]} = if event.requestId == requestId: callback(event.requestId) let subscription = await market.contract.subscribe(RequestFailed, onEvent) diff --git a/codex/contracts/marketplace.nim b/codex/contracts/marketplace.nim index db00aeaa..b16e4206 100644 --- a/codex/contracts/marketplace.nim +++ b/codex/contracts/marketplace.nim @@ -4,9 +4,11 @@ import pkg/stint import pkg/chronos import ../clock import ./requests +import ./config export stint export ethers +export config type Marketplace* = ref object of Contract @@ -28,7 +30,7 @@ type proof*: seq[byte] -proc collateral*(marketplace: Marketplace): UInt256 {.contract, view.} +proc config*(marketplace: Marketplace): MarketplaceConfig {.contract, view.} proc slashMisses*(marketplace: Marketplace): UInt256 {.contract, view.} proc slashPercentage*(marketplace: Marketplace): UInt256 {.contract, view.} proc minCollateralThreshold*(marketplace: Marketplace): UInt256 {.contract, view.} @@ -43,12 +45,14 @@ proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId) {.contract.} proc freeSlot*(marketplace: Marketplace, id: SlotId) {.contract.} proc getRequest*(marketplace: Marketplace, id: RequestId): StorageRequest {.contract, view.} proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.} +proc getRequestFromSlotId*(marketplace: Marketplace, id: SlotId): StorageRequest {.contract, view.} proc myRequests*(marketplace: Marketplace): seq[RequestId] {.contract, view.} -proc state*(marketplace: Marketplace, requestId: RequestId): RequestState {.contract, view.} +proc mySlots*(marketplace: Marketplace): seq[SlotId] {.contract, view.} +proc requestState*(marketplace: Marketplace, requestId: RequestId): RequestState {.contract, view.} +proc slotState*(marketplace: Marketplace, slotId: SlotId): SlotState {.contract, view.} proc requestEnd*(marketplace: Marketplace, requestId: RequestId): SecondsSince1970 {.contract, view.} -proc proofPeriod*(marketplace: Marketplace): UInt256 {.contract, view.} proc proofTimeout*(marketplace: Marketplace): UInt256 {.contract, view.} proc proofEnd*(marketplace: Marketplace, id: SlotId): UInt256 {.contract, view.} diff --git a/codex/contracts/proofs.nim b/codex/contracts/proofs.nim index f5ce5c69..8e46b18d 100644 --- a/codex/contracts/proofs.nim +++ b/codex/contracts/proofs.nim @@ -21,7 +21,8 @@ proc new*(_: type OnChainProofs, marketplace: Marketplace): OnChainProofs = OnChainProofs(marketplace: marketplace, pollInterval: DefaultPollInterval) method periodicity*(proofs: OnChainProofs): Future[Periodicity] {.async.} = - let period = await proofs.marketplace.proofPeriod() + let config = await proofs.marketplace.config() + let period = config.proofs.period return Periodicity(seconds: period) method isProofRequired*(proofs: OnChainProofs, @@ -29,7 +30,7 @@ method isProofRequired*(proofs: OnChainProofs, try: return await proofs.marketplace.isProofRequired(id) except ProviderError as e: - if e.revertReason.contains("Slot empty"): + if e.revertReason.contains("Slot is free"): return false raise e @@ -38,18 +39,13 @@ method willProofBeRequired*(proofs: OnChainProofs, try: return await proofs.marketplace.willProofBeRequired(id) except ProviderError as e: - if e.revertReason.contains("Slot empty"): + if e.revertReason.contains("Slot is free"): return false raise e -method getProofEnd*(proofs: OnChainProofs, - id: SlotId): Future[UInt256] {.async.} = - try: - return await proofs.marketplace.proofEnd(id) - except ProviderError as e: - if e.revertReason.contains("Slot empty"): - return 0.u256 - raise e +method slotState*(proofs: OnChainProofs, + id: SlotId): Future[SlotState] {.async.} = + return await proofs.marketplace.slotState(id) method submitProof*(proofs: OnChainProofs, id: SlotId, diff --git a/codex/contracts/requests.nim b/codex/contracts/requests.nim index 2f4551b3..2a866952 100644 --- a/codex/contracts/requests.nim +++ b/codex/contracts/requests.nim @@ -39,6 +39,12 @@ type Cancelled Finished Failed + SlotState* {.pure.} = enum + Free + Filled + Finished + Failed + Paid proc `==`*(x, y: Nonce): bool {.borrow.} proc `==`*(x, y: RequestId): bool {.borrow.} diff --git a/codex/market.nim b/codex/market.nim index c3cd039a..6aecac59 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -28,15 +28,22 @@ method requestStorage*(market: Market, method myRequests*(market: Market): Future[seq[RequestId]] {.base, async.} = raiseAssert("not implemented") +method mySlots*(market: Market): Future[seq[SlotId]] {.base, async.} = + raiseAssert("not implemented") + method getRequest*(market: Market, id: RequestId): Future[?StorageRequest] {.base, async.} = raiseAssert("not implemented") -method getState*(market: Market, +method requestState*(market: Market, requestId: RequestId): Future[?RequestState] {.base, async.} = raiseAssert("not implemented") +method slotState*(market: Market, + slotId: SlotId): Future[SlotState] {.base, async.} = + raiseAssert("not implemented") + method getRequestEnd*(market: Market, id: RequestId): Future[SecondsSince1970] {.base, async.} = raiseAssert("not implemented") @@ -46,6 +53,10 @@ method getHost*(market: Market, slotIndex: UInt256): Future[?Address] {.base, async.} = raiseAssert("not implemented") +method getRequestFromSlotId*(market: Market, + slotId: SlotId): Future[?StorageRequest] {.base, async.} = + raiseAssert("not implemented") + method fillSlot*(market: Market, requestId: RequestId, slotIndex: UInt256, diff --git a/codex/node.nim b/codex/node.nim index b9c49090..b7d4af43 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -333,7 +333,7 @@ proc start*(node: CodexNodeRef) {.async.} = # TODO: remove Sales callbacks, pass BlockStore and StorageProofs instead contracts.sales.onStore = proc(request: StorageRequest, slot: UInt256, - availability: Availability) {.async.} = + availability: ?Availability) {.async.} = ## store data in local storage ## @@ -354,7 +354,7 @@ proc start*(node: CodexNodeRef) {.async.} = if fetchRes.isErr: raise newException(CodexError, "Unable to retrieve blocks") - contracts.sales.onClear = proc(availability: Availability, + contracts.sales.onClear = proc(availability: ?Availability, request: StorageRequest, slotIndex: UInt256) = # TODO: remove data from local storage diff --git a/codex/proving.nim b/codex/proving.nim index 05b88de2..bf14b234 100644 --- a/codex/proving.nim +++ b/codex/proving.nim @@ -38,7 +38,8 @@ proc removeEndedContracts(proving: Proving) {.async.} = let now = proving.clock.now().u256 var ended: HashSet[SlotId] for id in proving.slots: - if now >= (await proving.proofs.getProofEnd(id)): + let state = await proving.proofs.slotState(id) + if state != SlotState.Filled: ended.incl(id) proving.slots.excl(ended) diff --git a/codex/purchasing/states/started.nim b/codex/purchasing/states/started.nim index 6d134c5e..439bc566 100644 --- a/codex/purchasing/states/started.nim +++ b/codex/purchasing/states/started.nim @@ -21,8 +21,10 @@ method enterAsync*(state: PurchaseStarted) {.async.} = try: let fut = await one(ended, failed) if fut.id == failed.id: + ended.cancel() state.switch(PurchaseFailed()) else: + failed.cancel() state.switch(PurchaseFinished()) await subscription.unsubscribe() except CatchableError as error: diff --git a/codex/purchasing/states/unknown.nim b/codex/purchasing/states/unknown.nim index 0102fa43..6b66e964 100644 --- a/codex/purchasing/states/unknown.nim +++ b/codex/purchasing/states/unknown.nim @@ -14,7 +14,7 @@ method enterAsync(state: PurchaseUnknown) {.async.} = try: if (request =? await purchase.market.getRequest(purchase.requestId)) and - (requestState =? await purchase.market.getState(purchase.requestId)): + (requestState =? await purchase.market.requestState(purchase.requestId)): purchase.request = some request diff --git a/codex/sales.nim b/codex/sales.nim index 4758e1a1..d88621c1 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -9,6 +9,12 @@ import ./market import ./clock import ./proving import ./contracts/requests +import ./sales/salescontext +import ./sales/salesagent +import ./sales/availability +import ./sales/statemachine +import ./sales/states/downloading +import ./sales/states/unknown ## Sales holds a list of available storage that it may sell. ## @@ -29,55 +35,36 @@ import ./contracts/requests ## | | ---- storage proof ---> | export stint +export availability type Sales* = ref object - market: Market - clock: Clock - subscription: ?market.Subscription - available*: seq[Availability] - onStore: ?OnStore - onProve: ?OnProve - onClear: ?OnClear - onSale: ?OnSale - proving: Proving - Availability* = object - id*: array[32, byte] - size*: UInt256 - duration*: UInt256 - minPrice*: UInt256 - SalesAgent = ref object - sales: Sales - requestId: RequestId - ask: StorageAsk - availability: Availability - request: ?StorageRequest - slotIndex: ?UInt256 - subscription: ?market.Subscription - running: ?Future[void] - waiting: ?Future[void] - finished: bool - OnStore = proc(request: StorageRequest, - slot: UInt256, - availability: Availability): Future[void] {.gcsafe, upraises: [].} - OnProve = proc(request: StorageRequest, - slot: UInt256): Future[seq[byte]] {.gcsafe, upraises: [].} - OnClear = proc(availability: Availability, - request: StorageRequest, - slotIndex: UInt256) {.gcsafe, upraises: [].} - OnSale = proc(availability: Availability, - request: StorageRequest, - slotIndex: UInt256) {.gcsafe, upraises: [].} + context*: SalesContext + subscription*: ?market.Subscription + available: seq[Availability] + agents*: seq[SalesAgent] -func new*(_: type Sales, - market: Market, - clock: Clock, - proving: Proving): Sales = - Sales( - market: market, - clock: clock, - proving: proving - ) +proc `onStore=`*(sales: Sales, onStore: OnStore) = + sales.context.onStore = some onStore + +proc `onProve=`*(sales: Sales, onProve: OnProve) = + sales.context.onProve = some onProve + +proc `onClear=`*(sales: Sales, onClear: OnClear) = + sales.context.onClear = some onClear + +proc `onSale=`*(sales: Sales, callback: OnSale) = + sales.context.onSale = some callback + +proc onStore*(sales: Sales): ?OnStore = sales.context.onStore + +proc onProve*(sales: Sales): ?OnProve = sales.context.onProve + +proc onClear*(sales: Sales): ?OnClear = sales.context.onClear + +proc onSale*(sales: Sales): ?OnSale = sales.context.onSale + +proc available*(sales: Sales): seq[Availability] = sales.available proc init*(_: type Availability, size: UInt256, @@ -87,140 +74,95 @@ proc init*(_: type Availability, doAssert randomBytes(id) == 32 Availability(id: id, size: size, duration: duration, minPrice: minPrice) -proc `onStore=`*(sales: Sales, onStore: OnStore) = - sales.onStore = some onStore - -proc `onProve=`*(sales: Sales, onProve: OnProve) = - sales.onProve = some onProve - -proc `onClear=`*(sales: Sales, onClear: OnClear) = - sales.onClear = some onClear - -proc `onSale=`*(sales: Sales, callback: OnSale) = - sales.onSale = some callback - func add*(sales: Sales, availability: Availability) = - sales.available.add(availability) + if not sales.available.contains(availability): + sales.available.add(availability) + # TODO: add to disk (persist), serialise to json. func remove*(sales: Sales, availability: Availability) = sales.available.keepItIf(it != availability) + # TODO: remove from disk availability, mark as in use by assigning + # a slotId, so that it can be used for restoration (node restart) -func findAvailability(sales: Sales, ask: StorageAsk): ?Availability = +func new*(_: type Sales, + market: Market, + clock: Clock, + proving: Proving): Sales = + + let sales = Sales(context: SalesContext( + market: market, + clock: clock, + proving: proving + )) + + proc onSaleErrored(availability: Availability) = + sales.add(availability) + + sales.context.onSaleErrored = some onSaleErrored + sales + +func findAvailability*(sales: Sales, ask: StorageAsk): ?Availability = for availability in sales.available: if ask.slotSize <= availability.size and ask.duration <= availability.duration and ask.pricePerSlot >= availability.minPrice: return some availability -proc finish(agent: SalesAgent, success: bool) = - if agent.finished: - return - - agent.finished = true - - if subscription =? agent.subscription: - asyncSpawn subscription.unsubscribe() - - if running =? agent.running: - running.cancel() - - if waiting =? agent.waiting: - waiting.cancel() - - if success: - if request =? agent.request and - slotIndex =? agent.slotIndex: - agent.sales.proving.add(request.slotId(slotIndex)) - - if onSale =? agent.sales.onSale: - onSale(agent.availability, request, slotIndex) - else: - if onClear =? agent.sales.onClear and - request =? agent.request and - slotIndex =? agent.slotIndex: - onClear(agent.availability, request, slotIndex) - agent.sales.add(agent.availability) - -proc selectSlot(agent: SalesAgent) = +proc randomSlotIndex(numSlots: uint64): UInt256 = let rng = Rng.instance - let slotIndex = rng.rand(agent.ask.slots - 1) - agent.slotIndex = some slotIndex.u256 + let slotIndex = rng.rand(numSlots - 1) + return slotIndex.u256 -proc onSlotFilled(agent: SalesAgent, - requestId: RequestId, - slotIndex: UInt256) {.async.} = - try: - let market = agent.sales.market - let host = await market.getHost(requestId, slotIndex) - let me = await market.getSigner() - agent.finish(success = (host == me.some)) - except CatchableError: - agent.finish(success = false) +proc findSlotIndex(numSlots: uint64, + requestId: RequestId, + slotId: SlotId): ?UInt256 = + for i in 0.. agent.data == expected) diff --git a/tests/codex/sales/testsalesagent.nim b/tests/codex/sales/testsalesagent.nim new file mode 100644 index 00000000..2785b7bd --- /dev/null +++ b/tests/codex/sales/testsalesagent.nim @@ -0,0 +1,165 @@ +import std/sets +import std/sequtils +import std/sugar +import std/times +import pkg/asynctest +import pkg/chronos +import pkg/codex/sales +import pkg/codex/sales/salesagent +import pkg/codex/sales/salescontext +import pkg/codex/sales/statemachine +import pkg/codex/sales/states/errorhandling +import pkg/codex/proving +import ../helpers/mockmarket +import ../helpers/mockclock +import ../helpers/eventually +import ../examples + +var onCancelCalled = false +var onFailedCalled = false +var onSlotFilledCalled = false +var onErrorCalled = false + +type + MockState = ref object of SaleState + MockErrorState = ref object of ErrorHandlingState + +method `$`*(state: MockState): string = "MockState" + +method onCancelled*(state: MockState, request: StorageRequest): ?State = + onCancelCalled = true + +method onFailed*(state: MockState, request: StorageRequest): ?State = + onFailedCalled = true + +method onSlotFilled*(state: MockState, requestId: RequestId, + slotIndex: UInt256): ?State = + onSlotFilledCalled = true + +method onError*(state: MockErrorState, err: ref CatchableError): ?State = + onErrorCalled = true + +method run*(state: MockErrorState, machine: Machine): Future[?State] {.async.} = + raise newException(ValueError, "failure") + +suite "Sales agent": + + let availability = Availability.init( + size=100.u256, + duration=60.u256, + minPrice=600.u256 + ) + var request = StorageRequest( + ask: StorageAsk( + slots: 4, + slotSize: 100.u256, + duration: 60.u256, + reward: 10.u256, + ), + content: StorageContent( + cid: "some cid" + ), + expiry: (getTime() + initDuration(hours=1)).toUnix.u256 + ) + + var agent: SalesAgent + var context: SalesContext + var slotIndex: UInt256 + var market: MockMarket + var clock: MockClock + + setup: + market = MockMarket.new() + clock = MockClock.new() + context = SalesContext(market: market, clock: clock) + slotIndex = 0.u256 + onCancelCalled = false + onFailedCalled = false + onSlotFilledCalled = false + agent = newSalesAgent(context, + request.id, + slotIndex, + some availability, + some request) + request.expiry = (getTime() + initDuration(hours=1)).toUnix.u256 + + teardown: + await agent.stop() + + test "can retrieve request": + agent = newSalesAgent(context, + request.id, + slotIndex, + some availability, + none StorageRequest) + market.requested = @[request] + await agent.retrieveRequest() + check agent.data.request == some request + + test "subscribe assigns subscriptions/futures": + await agent.subscribe() + check not agent.data.cancelled.isNil + check not agent.data.failed.isNil + check not agent.data.fulfilled.isNil + check not agent.data.slotFilled.isNil + + test "unsubscribe deassigns subscriptions/futures": + await agent.subscribe() + await agent.unsubscribe() + check agent.data.cancelled.isNil + check agent.data.failed.isNil + check agent.data.fulfilled.isNil + check agent.data.slotFilled.isNil + + test "subscribe can be called multiple times, without overwriting subscriptions/futures": + await agent.subscribe() + let cancelled = agent.data.cancelled + let failed = agent.data.failed + let fulfilled = agent.data.fulfilled + let slotFilled = agent.data.slotFilled + await agent.subscribe() + check cancelled == agent.data.cancelled + check failed == agent.data.failed + check fulfilled == agent.data.fulfilled + check slotFilled == agent.data.slotFilled + + test "unsubscribe can be called multiple times": + await agent.subscribe() + await agent.unsubscribe() + await agent.unsubscribe() + + test "subscribe can be called when request expiry has lapsed": + # succeeds when agent.data.fulfilled.isNil + request.expiry = (getTime() - initDuration(seconds=1)).toUnix.u256 + agent.data.request = some request + check agent.data.fulfilled.isNil + await agent.subscribe() + + test "current state onCancelled called when cancel emitted": + let state = MockState.new() + agent.start(state) + await agent.subscribe() + clock.set(request.expiry.truncate(int64)) + check eventually onCancelCalled + + test "cancelled future is finished (cancelled) when fulfillment emitted": + agent.start(MockState.new()) + await agent.subscribe() + market.emitRequestFulfilled(request.id) + check eventually agent.data.cancelled.cancelled() + + test "current state onFailed called when failed emitted": + agent.start(MockState.new()) + await agent.subscribe() + market.emitRequestFailed(request.id) + check eventually onFailedCalled + + test "current state onSlotFilled called when slot filled emitted": + agent.start(MockState.new()) + await agent.subscribe() + market.emitSlotFilled(request.id, slotIndex) + check eventually onSlotFilledCalled + + test "ErrorHandlingState.onError can be overridden at the state level": + agent.start(MockErrorState.new()) + check eventually onErrorCalled diff --git a/tests/codex/sales/teststates.nim b/tests/codex/sales/teststates.nim new file mode 100644 index 00000000..145558a8 --- /dev/null +++ b/tests/codex/sales/teststates.nim @@ -0,0 +1,8 @@ +import ./states/testunknown +import ./states/testdownloading +import ./states/testfilling +import ./states/testfinished +import ./states/testproving +import ./states/testfilled + +{.warning[UnusedImport]: off.} diff --git a/tests/codex/testproving.nim b/tests/codex/testproving.nim index bf810391..0386a0b2 100644 --- a/tests/codex/testproving.nim +++ b/tests/codex/testproving.nim @@ -47,6 +47,7 @@ suite "Proving": proc onProofRequired(id: SlotId) = called = true proving.onProofRequired = onProofRequired + proofs.setSlotState(id, SlotState.Filled) proofs.setProofRequired(id, true) await proofs.advanceToNextPeriod() check eventually called @@ -59,6 +60,8 @@ suite "Proving": proc onProofRequired(id: SlotId) = callbackIds.add(id) proving.onProofRequired = onProofRequired + proofs.setSlotState(id1, SlotState.Filled) + proofs.setSlotState(id2, SlotState.Filled) proofs.setProofRequired(id1, true) await proofs.advanceToNextPeriod() check eventually callbackIds == @[id1] @@ -76,6 +79,7 @@ suite "Proving": proving.onProofRequired = onProofRequired proofs.setProofRequired(id, false) proofs.setProofToBeRequired(id, true) + proofs.setSlotState(id, SlotState.Filled) await proofs.advanceToNextPeriod() check eventually called @@ -90,6 +94,7 @@ suite "Proving": proving.onProofRequired = onProofRequired proofs.setProofRequired(id, true) await proofs.advanceToNextPeriod() + proofs.setSlotState(id, SlotState.Finished) check eventually (not proving.slots.contains(id)) check not called diff --git a/tests/codex/testpurchasing.nim b/tests/codex/testpurchasing.nim index 0c525ccf..f2c00a67 100644 --- a/tests/codex/testpurchasing.nim +++ b/tests/codex/testpurchasing.nim @@ -141,11 +141,11 @@ suite "Purchasing state machine": let request1, request2, request3, request4, request5 = StorageRequest.example market.requested = @[request1, request2, request3, request4, request5] market.activeRequests[me] = @[request1.id, request2.id, request3.id, request4.id, request5.id] - market.state[request1.id] = RequestState.New - market.state[request2.id] = RequestState.Started - market.state[request3.id] = RequestState.Cancelled - market.state[request4.id] = RequestState.Finished - market.state[request5.id] = RequestState.Failed + market.requestState[request1.id] = RequestState.New + market.requestState[request2.id] = RequestState.Started + market.requestState[request3.id] = RequestState.Cancelled + market.requestState[request4.id] = RequestState.Finished + market.requestState[request5.id] = RequestState.Failed # ensure the started state doesn't error, giving a false positive test result market.requestEnds[request2.id] = clock.now() - 1 @@ -162,7 +162,7 @@ suite "Purchasing state machine": let request = StorageRequest.example let purchase = Purchase.new(request, market, clock) market.requested = @[request] - market.state[request.id] = RequestState.New + market.requestState[request.id] = RequestState.New purchase.switch(PurchaseUnknown()) check (purchase.state as PurchaseSubmitted).isSome @@ -171,7 +171,7 @@ suite "Purchasing state machine": let purchase = Purchase.new(request, market, clock) market.requestEnds[request.id] = clock.now() + request.ask.duration.truncate(int64) market.requested = @[request] - market.state[request.id] = RequestState.Started + market.requestState[request.id] = RequestState.Started purchase.switch(PurchaseUnknown()) check (purchase.state as PurchaseStarted).isSome @@ -179,7 +179,7 @@ suite "Purchasing state machine": let request = StorageRequest.example let purchase = Purchase.new(request, market, clock) market.requested = @[request] - market.state[request.id] = RequestState.Cancelled + market.requestState[request.id] = RequestState.Cancelled purchase.switch(PurchaseUnknown()) check (purchase.state as PurchaseErrored).isSome check purchase.error.?msg == "Purchase cancelled due to timeout".some @@ -188,7 +188,7 @@ suite "Purchasing state machine": let request = StorageRequest.example let purchase = Purchase.new(request, market, clock) market.requested = @[request] - market.state[request.id] = RequestState.Finished + market.requestState[request.id] = RequestState.Finished purchase.switch(PurchaseUnknown()) check (purchase.state as PurchaseFinished).isSome @@ -196,7 +196,7 @@ suite "Purchasing state machine": let request = StorageRequest.example let purchase = Purchase.new(request, market, clock) market.requested = @[request] - market.state[request.id] = RequestState.Failed + market.requestState[request.id] = RequestState.Failed purchase.switch(PurchaseUnknown()) check (purchase.state as PurchaseErrored).isSome check purchase.error.?msg == "Purchase failed".some @@ -206,7 +206,7 @@ suite "Purchasing state machine": let request = StorageRequest.example market.requested = @[request] market.activeRequests[me] = @[request.id] - market.state[request.id] = RequestState.Started + market.requestState[request.id] = RequestState.Started market.requestEnds[request.id] = clock.now() + request.ask.duration.truncate(int64) await purchasing.load() @@ -226,7 +226,7 @@ suite "Purchasing state machine": let request = StorageRequest.example market.requested = @[request] market.activeRequests[me] = @[request.id] - market.state[request.id] = RequestState.Started + market.requestState[request.id] = RequestState.Started market.requestEnds[request.id] = clock.now() + request.ask.duration.truncate(int64) await purchasing.load() @@ -234,7 +234,7 @@ suite "Purchasing state machine": clock.advance(request.ask.duration.truncate(int64)) # now check the result - proc getState: ?PurchaseState = + proc requestState: ?PurchaseState = purchasing.getPurchase(PurchaseId(request.id)).?state as PurchaseState - check eventually (getState() as PurchaseFinished).isSome + check eventually (requestState() as PurchaseFinished).isSome diff --git a/tests/codex/testsales.nim b/tests/codex/testsales.nim index 073e07c2..80d20a9e 100644 --- a/tests/codex/testsales.nim +++ b/tests/codex/testsales.nim @@ -1,212 +1,4 @@ -import std/sets -import pkg/asynctest -import pkg/chronos -import pkg/codex/contracts/requests -import pkg/codex/proving -import pkg/codex/sales -import ./helpers/mockmarket -import ./helpers/mockclock -import ./helpers/eventually -import ./examples +import ./sales/testsales +import ./sales/teststates -suite "Sales": - - let availability = Availability.init( - size=100.u256, - duration=60.u256, - minPrice=600.u256 - ) - var request = StorageRequest( - ask: StorageAsk( - slots: 4, - slotSize: 100.u256, - duration: 60.u256, - reward: 10.u256, - ), - content: StorageContent( - cid: "some cid" - ) - ) - let proof = exampleProof() - - var sales: Sales - var market: MockMarket - var clock: MockClock - var proving: Proving - - setup: - market = MockMarket.new() - clock = MockClock.new() - proving = Proving.new() - sales = Sales.new(market, clock, proving) - sales.onStore = proc(request: StorageRequest, - slot: UInt256, - availability: Availability) {.async.} = - discard - sales.onProve = proc(request: StorageRequest, - slot: UInt256): Future[seq[byte]] {.async.} = - return proof - await sales.start() - request.expiry = (clock.now() + 42).u256 - - teardown: - await sales.stop() - - test "has no availability initially": - check sales.available.len == 0 - - test "can add available storage": - let availability1 = Availability.example - let availability2 = Availability.example - sales.add(availability1) - check sales.available.contains(availability1) - sales.add(availability2) - check sales.available.contains(availability1) - check sales.available.contains(availability2) - - test "can remove available storage": - sales.add(availability) - sales.remove(availability) - check sales.available.len == 0 - - test "generates unique ids for storage availability": - let availability1 = Availability.init(1.u256, 2.u256, 3.u256) - let availability2 = Availability.init(1.u256, 2.u256, 3.u256) - check availability1.id != availability2.id - - test "makes storage unavailable when matching request comes in": - sales.add(availability) - await market.requestStorage(request) - check sales.available.len == 0 - - test "ignores request when no matching storage is available": - sales.add(availability) - var tooBig = request - tooBig.ask.slotSize = request.ask.slotSize + 1 - await market.requestStorage(tooBig) - check sales.available == @[availability] - - test "ignores request when reward is too low": - sales.add(availability) - var tooCheap = request - tooCheap.ask.reward = request.ask.reward - 1 - await market.requestStorage(tooCheap) - check sales.available == @[availability] - - test "retrieves and stores data locally": - var storingRequest: StorageRequest - var storingSlot: UInt256 - var storingAvailability: Availability - sales.onStore = proc(request: StorageRequest, - slot: UInt256, - availability: Availability) {.async.} = - storingRequest = request - storingSlot = slot - storingAvailability = availability - sales.add(availability) - await market.requestStorage(request) - check storingRequest == request - check storingSlot < request.ask.slots.u256 - check storingAvailability == availability - - test "makes storage available again when data retrieval fails": - let error = newException(IOError, "data retrieval failed") - sales.onStore = proc(request: StorageRequest, - slot: UInt256, - availability: Availability) {.async.} = - raise error - sales.add(availability) - await market.requestStorage(request) - check sales.available == @[availability] - - test "generates proof of storage": - var provingRequest: StorageRequest - var provingSlot: UInt256 - sales.onProve = proc(request: StorageRequest, - slot: UInt256): Future[seq[byte]] {.async.} = - provingRequest = request - provingSlot = slot - sales.add(availability) - await market.requestStorage(request) - check provingRequest == request - check provingSlot < request.ask.slots.u256 - - test "fills a slot": - sales.add(availability) - await market.requestStorage(request) - check market.filled.len == 1 - check market.filled[0].requestId == request.id - check market.filled[0].slotIndex < request.ask.slots.u256 - check market.filled[0].proof == proof - check market.filled[0].host == await market.getSigner() - - test "calls onSale when slot is filled": - var soldAvailability: Availability - var soldRequest: StorageRequest - var soldSlotIndex: UInt256 - sales.onSale = proc(availability: Availability, - request: StorageRequest, - slotIndex: UInt256) = - soldAvailability = availability - soldRequest = request - soldSlotIndex = slotIndex - sales.add(availability) - await market.requestStorage(request) - check soldAvailability == availability - check soldRequest == request - check soldSlotIndex < request.ask.slots.u256 - - test "calls onClear when storage becomes available again": - # fail the proof intentionally to trigger `agent.finish(success=false)`, - # which then calls the onClear callback - sales.onProve = proc(request: StorageRequest, - slot: UInt256): Future[seq[byte]] {.async.} = - raise newException(IOError, "proof failed") - var clearedAvailability: Availability - var clearedRequest: StorageRequest - var clearedSlotIndex: UInt256 - sales.onClear = proc(availability: Availability, - request: StorageRequest, - slotIndex: UInt256) = - clearedAvailability = availability - clearedRequest = request - clearedSlotIndex = slotIndex - sales.add(availability) - await market.requestStorage(request) - check clearedAvailability == availability - check clearedRequest == request - check clearedSlotIndex < request.ask.slots.u256 - - test "makes storage available again when other host fills the slot": - let otherHost = Address.example - sales.onStore = proc(request: StorageRequest, - slot: UInt256, - availability: Availability) {.async.} = - await sleepAsync(1.hours) - sales.add(availability) - await market.requestStorage(request) - for slotIndex in 0..