From d56eb6aee103635d1c0c17843cd735d738df103c Mon Sep 17 00:00:00 2001 From: markspanbroek Date: Wed, 19 Apr 2023 15:06:00 +0200 Subject: [PATCH] Validator (#387) * [contracts] Add SlotFreed event * [integration] allow test node to be stopped twice * [cli] add --validator option * [contracts] remove dead code * [contracts] instantiate OnChainMarket and OnChainClock only once * [contracts] add Validation * [sales] remove duplicate import * [market] add missing import * [market] subscribe to all SlotFilled events * [market] add freeSlot() * [sales] fix warnings * [market] subscribe to SlotFreed events * [contracts] fix warning * [validator] keep track of filled slots * [validation] remove slots that have ended * [proving] absorb Proofs into Market Both Proofs and Market are abstractions around the Marketplace contract, having them separately is more trouble than it's worth at the moment. * [market] add markProofAsMissing() * [clock] speed up waiting for clock in tests * [validator] mark proofs as missing * [timer] fix error on node shutdown * [cli] handle --persistence and --validator separately * [market] allow retrieval of proof timeout value * [validator] do not subscribe to SlotFreed events Freed slots are already handled in removeSlotsThatHaveEnded(), and onSlotsFreed() interfered with its iterator. * [validator] Start validation at the start of a new period To decrease the likelihood that we hit the validation timeout. * [validator] do not mark proofs as missing after timeout * [market] check whether proof can be marked as missing * [validator] simplify validation Simulate a transaction to mark proof as missing, instead of trying to keep track of all the conditions that may lead to a proof being marked as missing. * [build] use nim-ethers PR #40 Uses "pending" blocktag instead of "latest" blocktag for better simulation of transactions before sending them. https://github.com/status-im/nim-ethers/pull/40 * [integration] integration test for validator * [validator] monitor a maximum number of slots Adds cli parameter --validator-max-slots. * [market] fix missing collateral argument After rebasing, add the new argument to fillSlot calls. * [build] update to nim-ethers 0.2.5 * [validator] use Set instead of Table to keep track of slots * [validator] add logging * [validator] add test for slot failure * [market] use "pending" blocktag to use more up to date block time * [contracts] remove unused import * [validator] fix: wait until after period ends The smart contract checks that 'end < block.timestamp', so we need to wait until the block timestamp is greater than the period end. --- codex/clock.nim | 2 +- codex/codex.nim | 39 ++++-- codex/conf.nim | 12 ++ codex/contracts.nim | 2 - codex/contracts/interactions.nim | 6 +- .../interactions/clientinteractions.nim | 17 +-- .../interactions/hostinteractions.nim | 23 +--- codex/contracts/interactions/interactions.nim | 26 +--- .../interactions/validatorinteractions.nim | 21 +++ codex/contracts/market.nim | 84 +++++++++++- codex/contracts/marketplace.nim | 4 + codex/contracts/proofs.nim | 64 --------- codex/market.nim | 51 +++++++ codex/node.nim | 10 +- codex/{storageproofs/timing => }/periods.nim | 0 codex/proving.nim | 23 ++-- codex/sales.nim | 1 - codex/storageproofs.nim | 3 +- codex/storageproofs/storageproofs.nim | 3 +- codex/storageproofs/timing.nim | 4 - codex/storageproofs/timing/proofs.nim | 44 ------ codex/utils/timer.nim | 2 + codex/validation.nim | 102 ++++++++++++++ tests/codex/helpers/mockclock.nim | 24 +++- tests/codex/helpers/mockmarket.nim | 125 +++++++++++++++++- tests/codex/helpers/mockproofs.nim | 74 ----------- tests/codex/sales/testreservations.nim | 1 - tests/codex/sales/testsales.nim | 1 - tests/codex/testproving.nim | 56 ++++---- tests/codex/testvalidation.nim | 65 +++++++++ tests/contracts/testContracts.nim | 1 - tests/contracts/testDeployment.nim | 2 +- tests/contracts/testInteractions.nim | 46 ------- tests/contracts/testMarket.nim | 89 ++++++++++++- tests/contracts/testProofs.nim | 57 -------- tests/examples.nim | 2 +- tests/integration/nodes.nim | 9 +- tests/integration/testproofs.nim | 74 +++++++++-- tests/testCodex.nim | 1 + tests/testContracts.nim | 2 - vendor/nim-ethers | 2 +- 41 files changed, 722 insertions(+), 452 deletions(-) create mode 100644 codex/contracts/interactions/validatorinteractions.nim delete mode 100644 codex/contracts/proofs.nim rename codex/{storageproofs/timing => }/periods.nim (100%) delete mode 100644 codex/storageproofs/timing.nim delete mode 100644 codex/storageproofs/timing/proofs.nim create mode 100644 codex/validation.nim delete mode 100644 tests/codex/helpers/mockproofs.nim create mode 100644 tests/codex/testvalidation.nim delete mode 100644 tests/contracts/testInteractions.nim delete mode 100644 tests/contracts/testProofs.nim diff --git a/codex/clock.nim b/codex/clock.nim index 7cfebc25..069c4d28 100644 --- a/codex/clock.nim +++ b/codex/clock.nim @@ -10,7 +10,7 @@ type method now*(clock: Clock): SecondsSince1970 {.base, upraises: [].} = raiseAssert "not implemented" -proc waitUntil*(clock: Clock, time: SecondsSince1970) {.async.} = +method waitUntil*(clock: Clock, time: SecondsSince1970) {.base,async.} = while clock.now() < time: await sleepAsync(1.seconds) diff --git a/codex/codex.nim b/codex/codex.nim index 4ec7b867..f90ab69e 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -32,6 +32,7 @@ import ./utils/fileutils import ./erasure import ./discovery import ./contracts +import ./contracts/clock import ./utils/addrutils import ./namespaces @@ -100,13 +101,16 @@ proc new(_: type Contracts, config: CodexConf, repo: RepoStore): Contracts = - if not config.persistence: + if not config.persistence and not config.validator: if config.ethAccount.isSome: - warn "Ethereum account was set, but persistence is not enabled" + warn "Ethereum account was set, but neither persistence nor validator is enabled" return without account =? config.ethAccount: - error "Persistence enabled, but no Ethereum account was set" + if config.persistence: + error "Persistence enabled, but no Ethereum account was set" + if config.validator: + error "Validator enabled, but no Ethereum account was set" quit QuitFailure var deploy: Deployment @@ -123,17 +127,26 @@ proc new(_: type Contracts, error "Marketplace contract address not found in deployment file" quit QuitFailure - # TODO: at some point there may be cli options that enable client-only or host-only - # operation, and both client AND host will not necessarily need to be instantiated - let client = ClientInteractions.new(config.ethProvider, - account, - marketplaceAddress) - let host = HostInteractions.new(config.ethProvider, - account, - repo, - marketplaceAddress) + let provider = JsonRpcProvider.new(config.ethProvider) + let signer = provider.getSigner(account) + let marketplace = Marketplace.new(marketplaceAddress, signer) + let market = OnChainMarket.new(marketplace) + let clock = OnChainClock.new(provider) - (client.option, host.option) + var client: ?ClientInteractions + var host: ?HostInteractions + var validator: ?ValidatorInteractions + if config.persistence: + let purchasing = Purchasing.new(market, clock) + let proving = Proving.new(market, clock) + let sales = Sales.new(market, clock, proving, repo) + client = some ClientInteractions.new(clock, purchasing) + host = some HostInteractions.new(clock, sales, proving) + if config.validator: + let validation = Validation.new(clock, market, config.validatorMaxSlots) + validator = some ValidatorInteractions.new(clock, validation) + + (client, host, validator) proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey): T = diff --git a/codex/conf.nim b/codex/conf.nim index 11cd8446..5fb4c51e 100644 --- a/codex/conf.nim +++ b/codex/conf.nim @@ -218,6 +218,18 @@ type name: "eth-deployment" .}: Option[string] + validator* {. + desc: "Enables validator, requires an Ethereum node" + defaultValue: false + name: "validator" + .}: bool + + validatorMaxSlots* {. + desc: "Maximum number of slots that the validator monitors" + defaultValue: 1000 + name: "validator-max-slots" + .}: int + of initNode: discard diff --git a/codex/contracts.nim b/codex/contracts.nim index d2cd0c9e..a9b99ac3 100644 --- a/codex/contracts.nim +++ b/codex/contracts.nim @@ -2,12 +2,10 @@ import contracts/requests import contracts/marketplace import contracts/deployment import contracts/market -import contracts/proofs import contracts/interactions export requests export marketplace export deployment export market -export proofs export interactions diff --git a/codex/contracts/interactions.nim b/codex/contracts/interactions.nim index 01dcd14e..13eae8a0 100644 --- a/codex/contracts/interactions.nim +++ b/codex/contracts/interactions.nim @@ -1,5 +1,9 @@ import ./interactions/interactions import ./interactions/hostinteractions import ./interactions/clientinteractions +import ./interactions/validatorinteractions -export interactions, hostinteractions, clientinteractions +export interactions +export hostinteractions +export clientinteractions +export validatorinteractions diff --git a/codex/contracts/interactions/clientinteractions.nim b/codex/contracts/interactions/clientinteractions.nim index b0162419..c30a22e4 100644 --- a/codex/contracts/interactions/clientinteractions.nim +++ b/codex/contracts/interactions/clientinteractions.nim @@ -1,12 +1,8 @@ import pkg/ethers import pkg/chronicles -import pkg/questionable -import pkg/questionable/results import ../../purchasing -import ../marketplace import ../market -import ../proofs import ../clock import ./interactions @@ -18,16 +14,9 @@ type purchasing*: Purchasing proc new*(_: type ClientInteractions, - providerUrl: string, - account: Address, - contractAddress: Address): ?!ClientInteractions = - - without prepared =? prepare(providerUrl, account, contractAddress), error: - return failure(error) - - let c = ClientInteractions.new(prepared.clock) - c.purchasing = Purchasing.new(prepared.market, prepared.clock) - return success(c) + clock: OnChainClock, + purchasing: Purchasing): ClientInteractions = + ClientInteractions(clock: clock, purchasing: purchasing) proc start*(self: ClientInteractions) {.async.} = await procCall ContractInteractions(self).start() diff --git a/codex/contracts/interactions/hostinteractions.nim b/codex/contracts/interactions/hostinteractions.nim index 04762700..2f9ba92d 100644 --- a/codex/contracts/interactions/hostinteractions.nim +++ b/codex/contracts/interactions/hostinteractions.nim @@ -1,12 +1,8 @@ import pkg/ethers import pkg/chronicles -import pkg/questionable -import pkg/questionable/results import ../../sales import ../../proving -import ../../stores -import ../proofs import ./interactions export sales @@ -19,21 +15,10 @@ type proving*: Proving proc new*(_: type HostInteractions, - providerUrl: string, - account: Address, - repo: RepoStore, - contractAddress: Address): ?!HostInteractions = - - without prepared =? prepare(providerUrl, account, contractAddress), error: - return failure(error) - - let proofs = OnChainProofs.new(prepared.contract) - let proving = Proving.new(proofs, prepared.clock) - - let h = HostInteractions.new(prepared.clock) - h.sales = Sales.new(prepared.market, prepared.clock, proving, repo) - h.proving = proving - return success(h) + clock: OnChainClock, + sales: Sales, + proving: Proving): HostInteractions = + HostInteractions(clock: clock, sales: sales, proving: proving) method start*(self: HostInteractions) {.async.} = await procCall ContractInteractions(self).start() diff --git a/codex/contracts/interactions/interactions.nim b/codex/contracts/interactions/interactions.nim index ed647e3b..3ad67991 100644 --- a/codex/contracts/interactions/interactions.nim +++ b/codex/contracts/interactions/interactions.nim @@ -1,33 +1,13 @@ import pkg/ethers -import ../../errors import ../clock import ../marketplace import ../market +export clock + type ContractInteractions* = ref object of RootObj - clock: OnChainClock - ContractInteractionsError* = object of CodexError - ReadDeploymentFileFailureError* = object of ContractInteractionsError - ContractAddressError* = object of ContractInteractionsError - -proc new*(T: type ContractInteractions, - clock: OnChainClock): T = - T(clock: clock) - -proc prepare*( - providerUrl: string = "ws://localhost:8545", - account, contractAddress: Address): - ?!tuple[contract: Marketplace, market: OnChainMarket, clock: OnChainClock] = - - let provider = JsonRpcProvider.new(providerUrl) - let signer = provider.getSigner(account) - - let contract = Marketplace.new(contractAddress, signer) - let market = OnChainMarket.new(contract) - let clock = OnChainClock.new(signer.provider) - - return success((contract, market, clock)) + clock*: OnChainClock method start*(self: ContractInteractions) {.async, base.} = await self.clock.start() diff --git a/codex/contracts/interactions/validatorinteractions.nim b/codex/contracts/interactions/validatorinteractions.nim new file mode 100644 index 00000000..1aa4026c --- /dev/null +++ b/codex/contracts/interactions/validatorinteractions.nim @@ -0,0 +1,21 @@ +import ./interactions +import ../../validation + +export validation + +type + ValidatorInteractions* = ref object of ContractInteractions + validation: Validation + +proc new*(_: type ValidatorInteractions, + clock: OnChainClock, + validation: Validation): ValidatorInteractions = + ValidatorInteractions(clock: clock, validation: validation) + +proc start*(self: ValidatorInteractions) {.async.} = + await procCall ContractInteractions(self).start() + await self.validation.start() + +proc stop*(self: ValidatorInteractions) {.async.} = + await self.validation.stop() + await procCall ContractInteractions(self).stop() diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index 901cb55a..068f02c5 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -39,6 +39,15 @@ proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} = method getSigner*(market: OnChainMarket): Future[Address] {.async.} = return await market.signer.getAddress() +method periodicity*(market: OnChainMarket): Future[Periodicity] {.async.} = + let config = await market.contract.config() + let period = config.proofs.period + return Periodicity(seconds: period) + +method proofTimeout*(market: OnChainMarket): Future[UInt256] {.async.} = + let config = await market.contract.config() + return config.proofs.timeout + method myRequests*(market: OnChainMarket): Future[seq[RequestId]] {.async.} = return await market.contract.myRequests @@ -104,10 +113,54 @@ method fillSlot(market: OnChainMarket, await market.approveFunds(collateral) await market.contract.fillSlot(requestId, slotIndex, proof) +method freeSlot*(market: OnChainMarket, slotId: SlotId) {.async.} = + await market.contract.freeSlot(slotId) + method withdrawFunds(market: OnChainMarket, requestId: RequestId) {.async.} = await market.contract.withdrawFunds(requestId) +method isProofRequired*(market: OnChainMarket, + id: SlotId): Future[bool] {.async.} = + try: + return await market.contract.isProofRequired(id) + except ProviderError as e: + if e.revertReason.contains("Slot is free"): + return false + raise e + +method willProofBeRequired*(market: OnChainMarket, + id: SlotId): Future[bool] {.async.} = + try: + return await market.contract.willProofBeRequired(id) + except ProviderError as e: + if e.revertReason.contains("Slot is free"): + return false + raise e + +method submitProof*(market: OnChainMarket, + id: SlotId, + proof: seq[byte]) {.async.} = + await market.contract.submitProof(id, proof) + +method markProofAsMissing*(market: OnChainMarket, + id: SlotId, + period: Period) {.async.} = + await market.contract.markProofAsMissing(id, period) + +method canProofBeMarkedAsMissing*(market: OnChainMarket, + id: SlotId, + period: Period): Future[bool] {.async.} = + let provider = market.contract.provider + let contractWithoutSigner = market.contract.connect(provider) + let overrides = CallOverrides(blockTag: some BlockTag.pending) + try: + await contractWithoutSigner.markProofAsMissing(id, period, overrides) + return true + except EthersError as e: + trace "Proof can not be marked as missing", msg = e.msg + return false + method subscribeRequests(market: OnChainMarket, callback: OnRequest): Future[MarketSubscription] {.async.} = @@ -116,15 +169,30 @@ method subscribeRequests(market: OnChainMarket, let subscription = await market.contract.subscribe(StorageRequested, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) +method subscribeSlotFilled*(market: OnChainMarket, + callback: OnSlotFilled): + Future[MarketSubscription] {.async.} = + proc onEvent(event: SlotFilled) {.upraises:[].} = + callback(event.requestId, event.slotIndex) + let subscription = await market.contract.subscribe(SlotFilled, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + method subscribeSlotFilled*(market: OnChainMarket, requestId: RequestId, slotIndex: UInt256, callback: OnSlotFilled): Future[MarketSubscription] {.async.} = - proc onEvent(event: SlotFilled) {.upraises:[].} = - if event.requestId == requestId and event.slotIndex == slotIndex: - callback(event.requestId, event.slotIndex) - let subscription = await market.contract.subscribe(SlotFilled, onEvent) + proc onSlotFilled(eventRequestId: RequestId, eventSlotIndex: UInt256) = + if eventRequestId == requestId and eventSlotIndex == slotIndex: + callback(requestId, slotIndex) + return await market.subscribeSlotFilled(onSlotFilled) + +method subscribeSlotFreed*(market: OnChainMarket, + callback: OnSlotFreed): + Future[MarketSubscription] {.async.} = + proc onEvent(event: SlotFreed) {.upraises:[].} = + callback(event.slotId) + let subscription = await market.contract.subscribe(SlotFreed, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) method subscribeFulfillment(market: OnChainMarket, @@ -157,5 +225,13 @@ method subscribeRequestFailed*(market: OnChainMarket, let subscription = await market.contract.subscribe(RequestFailed, onEvent) return OnChainMarketSubscription(eventSubscription: subscription) +method subscribeProofSubmission*(market: OnChainMarket, + callback: OnProofSubmitted): + Future[MarketSubscription] {.async.} = + proc onEvent(event: ProofSubmitted) {.upraises: [].} = + callback(event.id, event.proof) + let subscription = await market.contract.subscribe(ProofSubmitted, onEvent) + return OnChainMarketSubscription(eventSubscription: subscription) + method unsubscribe*(subscription: OnChainMarketSubscription) {.async.} = await subscription.eventSubscription.unsubscribe() diff --git a/codex/contracts/marketplace.nim b/codex/contracts/marketplace.nim index 2452ad5f..45d92335 100644 --- a/codex/contracts/marketplace.nim +++ b/codex/contracts/marketplace.nim @@ -11,6 +11,7 @@ export stint export ethers export erc20 export config +export requests type Marketplace* = ref object of Contract @@ -21,6 +22,9 @@ type requestId* {.indexed.}: RequestId slotIndex* {.indexed.}: UInt256 slotId*: SlotId + SlotFreed* = object of Event + requestId* {.indexed.}: RequestId + slotId*: SlotId RequestFulfilled* = object of Event requestId* {.indexed.}: RequestId RequestCancelled* = object of Event diff --git a/codex/contracts/proofs.nim b/codex/contracts/proofs.nim deleted file mode 100644 index 8e46b18d..00000000 --- a/codex/contracts/proofs.nim +++ /dev/null @@ -1,64 +0,0 @@ -import std/strutils -import pkg/ethers -import pkg/ethers/testing -import ../storageproofs/timing/proofs -import ./marketplace - -export proofs - -type - OnChainProofs* = ref object of Proofs - marketplace: Marketplace - pollInterval*: Duration - ProofsSubscription = proofs.Subscription - EventSubscription = ethers.Subscription - OnChainProofsSubscription = ref object of ProofsSubscription - eventSubscription: EventSubscription - -const DefaultPollInterval = 3.seconds - -proc new*(_: type OnChainProofs, marketplace: Marketplace): OnChainProofs = - OnChainProofs(marketplace: marketplace, pollInterval: DefaultPollInterval) - -method periodicity*(proofs: OnChainProofs): Future[Periodicity] {.async.} = - let config = await proofs.marketplace.config() - let period = config.proofs.period - return Periodicity(seconds: period) - -method isProofRequired*(proofs: OnChainProofs, - id: SlotId): Future[bool] {.async.} = - try: - return await proofs.marketplace.isProofRequired(id) - except ProviderError as e: - if e.revertReason.contains("Slot is free"): - return false - raise e - -method willProofBeRequired*(proofs: OnChainProofs, - id: SlotId): Future[bool] {.async.} = - try: - return await proofs.marketplace.willProofBeRequired(id) - except ProviderError as e: - if e.revertReason.contains("Slot is free"): - return false - raise e - -method slotState*(proofs: OnChainProofs, - id: SlotId): Future[SlotState] {.async.} = - return await proofs.marketplace.slotState(id) - -method submitProof*(proofs: OnChainProofs, - id: SlotId, - proof: seq[byte]) {.async.} = - await proofs.marketplace.submitProof(id, proof) - -method subscribeProofSubmission*(proofs: OnChainProofs, - callback: OnProofSubmitted): - Future[ProofsSubscription] {.async.} = - proc onEvent(event: ProofSubmitted) {.upraises: [].} = - callback(event.id, event.proof) - let subscription = await proofs.marketplace.subscribe(ProofSubmitted, onEvent) - return OnChainProofsSubscription(eventSubscription: subscription) - -method unsubscribe*(subscription: OnChainProofsSubscription) {.async, upraises:[].} = - await subscription.eventSubscription.unsubscribe() diff --git a/codex/market.nim b/codex/market.nim index a0afde21..e2a233a6 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -4,11 +4,13 @@ import pkg/questionable import pkg/ethers/erc20 import ./contracts/requests import ./clock +import ./periods export chronos export questionable export requests export SecondsSince1970 +export periods type Market* = ref object of RootObj @@ -16,12 +18,20 @@ type OnRequest* = proc(id: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} OnFulfillment* = proc(requestId: RequestId) {.gcsafe, upraises: [].} OnSlotFilled* = proc(requestId: RequestId, slotIndex: UInt256) {.gcsafe, upraises:[].} + OnSlotFreed* = proc(slotId: SlotId) {.gcsafe, upraises: [].} OnRequestCancelled* = proc(requestId: RequestId) {.gcsafe, upraises:[].} OnRequestFailed* = proc(requestId: RequestId) {.gcsafe, upraises:[].} + OnProofSubmitted* = proc(id: SlotId, proof: seq[byte]) {.gcsafe, upraises:[].} method getSigner*(market: Market): Future[Address] {.base, async.} = raiseAssert("not implemented") +method periodicity*(market: Market): Future[Periodicity] {.base, async.} = + raiseAssert("not implemented") + +method proofTimeout*(market: Market): Future[UInt256] {.base, async.} = + raiseAssert("not implemented") + method requestStorage*(market: Market, request: StorageRequest) {.base, async.} = raiseAssert("not implemented") @@ -67,6 +77,9 @@ method fillSlot*(market: Market, collateral: UInt256) {.base, async.} = raiseAssert("not implemented") +method freeSlot*(market: Market, slotId: SlotId) {.base, async.} = + raiseAssert("not implemented") + method withdrawFunds*(market: Market, requestId: RequestId) {.base, async.} = raiseAssert("not implemented") @@ -76,12 +89,40 @@ method subscribeRequests*(market: Market, Future[Subscription] {.base, async.} = raiseAssert("not implemented") +method isProofRequired*(market: Market, + id: SlotId): Future[bool] {.base, async.} = + raiseAssert("not implemented") + +method willProofBeRequired*(market: Market, + id: SlotId): Future[bool] {.base, async.} = + raiseAssert("not implemented") + +method submitProof*(market: Market, + id: SlotId, + proof: seq[byte]) {.base, async.} = + raiseAssert("not implemented") + +method markProofAsMissing*(market: Market, + id: SlotId, + period: Period) {.base, async.} = + raiseAssert("not implemented") + +method canProofBeMarkedAsMissing*(market: Market, + id: SlotId, + period: Period): Future[bool] {.base, async.} = + raiseAssert("not implemented") + method subscribeFulfillment*(market: Market, requestId: RequestId, callback: OnFulfillment): Future[Subscription] {.base, async.} = raiseAssert("not implemented") +method subscribeSlotFilled*(market: Market, + callback: OnSlotFilled): + Future[Subscription] {.base, async.} = + raiseAssert("not implemented") + method subscribeSlotFilled*(market: Market, requestId: RequestId, slotIndex: UInt256, @@ -89,6 +130,11 @@ method subscribeSlotFilled*(market: Market, Future[Subscription] {.base, async.} = raiseAssert("not implemented") +method subscribeSlotFreed*(market: Market, + callback: OnSlotFreed): + Future[Subscription] {.base, async.} = + raiseAssert("not implemented") + method subscribeRequestCancelled*(market: Market, requestId: RequestId, callback: OnRequestCancelled): @@ -101,5 +147,10 @@ method subscribeRequestFailed*(market: Market, Future[Subscription] {.base, async.} = raiseAssert("not implemented") +method subscribeProofSubmission*(market: Market, + callback: OnProofSubmitted): + Future[Subscription] {.base, async.} = + raiseAssert("not implemented") + method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = raiseAssert("not implemented") diff --git a/codex/node.nim b/codex/node.nim index 2e789243..f20aad1b 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -47,6 +47,7 @@ type Contracts* = tuple client: ?ClientInteractions host: ?HostInteractions + validator: ?ValidatorInteractions CodexNodeRef* = ref object switch*: Switch @@ -316,7 +317,7 @@ proc new*( engine: BlockExcEngine, erasure: Erasure, discovery: Discovery, - contracts: Contracts = (ClientInteractions.none, HostInteractions.none)): T = + contracts = Contracts.default): T = T( switch: switch, blockStore: store, @@ -389,6 +390,13 @@ proc start*(node: CodexNodeRef) {.async.} = error "Unable to start client contract interactions: ", error=error.msg node.contracts.client = ClientInteractions.none + if validatorContracts =? node.contracts.validator: + try: + await validatorContracts.start() + except CatchableError as error: + error "Unable to start validator contract interactions: ", error=error.msg + node.contracts.validator = ValidatorInteractions.none + node.networkId = node.switch.peerInfo.peerId notice "Started codex node", id = $node.networkId, addrs = node.switch.peerInfo.addrs diff --git a/codex/storageproofs/timing/periods.nim b/codex/periods.nim similarity index 100% rename from codex/storageproofs/timing/periods.nim rename to codex/periods.nim diff --git a/codex/proving.nim b/codex/proving.nim index 92fb4477..30b4ed60 100644 --- a/codex/proving.nim +++ b/codex/proving.nim @@ -2,23 +2,22 @@ import std/sets import pkg/upraises import pkg/questionable import pkg/chronicles -import ./storageproofs +import ./market import ./clock export sets -export storageproofs type Proving* = ref object - proofs: Proofs + market: Market clock: Clock loop: ?Future[void] slots*: HashSet[Slot] onProve: ?OnProve OnProve* = proc(slot: Slot): Future[seq[byte]] {.gcsafe, upraises: [].} -func new*(_: type Proving, proofs: Proofs, clock: Clock): Proving = - Proving(proofs: proofs, clock: clock) +func new*(_: type Proving, market: Market, clock: Clock): Proving = + Proving(market: market, clock: clock) proc onProve*(proving: Proving): ?OnProve = proving.onProve @@ -30,17 +29,17 @@ func add*(proving: Proving, slot: Slot) = proving.slots.incl(slot) proc getCurrentPeriod(proving: Proving): Future[Period] {.async.} = - let periodicity = await proving.proofs.periodicity() + let periodicity = await proving.market.periodicity() return periodicity.periodOf(proving.clock.now().u256) proc waitUntilPeriod(proving: Proving, period: Period) {.async.} = - let periodicity = await proving.proofs.periodicity() + let periodicity = await proving.market.periodicity() await proving.clock.waitUntil(periodicity.periodStart(period).truncate(int64)) proc removeEndedContracts(proving: Proving) {.async.} = var ended: HashSet[Slot] for slot in proving.slots: - let state = await proving.proofs.slotState(slot.id) + let state = await proving.market.slotState(slot.id) if state != SlotState.Filled: ended.incl(slot) proving.slots.excl(ended) @@ -50,7 +49,7 @@ proc prove(proving: Proving, slot: Slot) {.async.} = raiseAssert "onProve callback not set" try: let proof = await onProve(slot) - await proving.proofs.submitProof(slot.id, proof) + await proving.market.submitProof(slot.id, proof) except CatchableError as e: error "Submitting proof failed", msg = e.msg @@ -61,8 +60,8 @@ proc run(proving: Proving) {.async.} = await proving.removeEndedContracts() for slot in proving.slots: let id = slot.id - if (await proving.proofs.isProofRequired(id)) or - (await proving.proofs.willProofBeRequired(id)): + if (await proving.market.isProofRequired(id)) or + (await proving.market.willProofBeRequired(id)): asyncSpawn proving.prove(slot) await proving.waitUntilPeriod(currentPeriod + 1) except CancelledError: @@ -85,4 +84,4 @@ proc stop*(proving: Proving) {.async.} = proc subscribeProofSubmission*(proving: Proving, callback: OnProofSubmitted): Future[Subscription] = - proving.proofs.subscribeProofSubmission(callback) + proving.market.subscribeProofSubmission(callback) diff --git a/codex/sales.nim b/codex/sales.nim index 71ff5495..01f3da77 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -4,7 +4,6 @@ import pkg/upraises import pkg/stint import pkg/chronicles import pkg/datastore -import pkg/upraises import ./rng import ./market import ./clock diff --git a/codex/storageproofs.nim b/codex/storageproofs.nim index ef0a8f00..aabca183 100644 --- a/codex/storageproofs.nim +++ b/codex/storageproofs.nim @@ -1,7 +1,6 @@ import ./storageproofs/por -import ./storageproofs/timing import ./storageproofs/stpstore import ./storageproofs/stpnetwork import ./storageproofs/stpproto -export por, timing, stpstore, stpnetwork, stpproto +export por, stpstore, stpnetwork, stpproto diff --git a/codex/storageproofs/storageproofs.nim b/codex/storageproofs/storageproofs.nim index eb9ae6cf..179e7654 100644 --- a/codex/storageproofs/storageproofs.nim +++ b/codex/storageproofs/storageproofs.nim @@ -22,9 +22,8 @@ import ./por import ./stpnetwork import ./stpproto import ./stpstore -import ./timing -export stpnetwork, stpstore, por, timing, stpproto +export stpnetwork, stpstore, por, stpproto type StorageProofs* = object diff --git a/codex/storageproofs/timing.nim b/codex/storageproofs/timing.nim deleted file mode 100644 index 163295e0..00000000 --- a/codex/storageproofs/timing.nim +++ /dev/null @@ -1,4 +0,0 @@ -import ./timing/periods -import ./timing/proofs - -export periods, proofs diff --git a/codex/storageproofs/timing/proofs.nim b/codex/storageproofs/timing/proofs.nim deleted file mode 100644 index 2386d07b..00000000 --- a/codex/storageproofs/timing/proofs.nim +++ /dev/null @@ -1,44 +0,0 @@ -import pkg/chronos -import pkg/stint -import pkg/upraises -import ./periods -import ../../contracts/requests - -export chronos -export stint -export periods -export requests - -type - Proofs* = ref object of RootObj - Subscription* = ref object of RootObj - OnProofSubmitted* = proc(id: SlotId, proof: seq[byte]) {.gcsafe, upraises:[].} - -method periodicity*(proofs: Proofs): - Future[Periodicity] {.base, async.} = - raiseAssert("not implemented") - -method isProofRequired*(proofs: Proofs, - id: SlotId): Future[bool] {.base, async.} = - raiseAssert("not implemented") - -method willProofBeRequired*(proofs: Proofs, - id: SlotId): Future[bool] {.base, async.} = - raiseAssert("not implemented") - -method slotState*(proofs: Proofs, - id: SlotId): Future[SlotState] {.base, async.} = - raiseAssert("not implemented") - -method submitProof*(proofs: Proofs, - id: SlotId, - proof: seq[byte]) {.base, async.} = - raiseAssert("not implemented") - -method subscribeProofSubmission*(proofs: Proofs, - callback: OnProofSubmitted): - Future[Subscription] {.base, async.} = - raiseAssert("not implemented") - -method unsubscribe*(subscription: Subscription) {.base, async, upraises:[].} = - raiseAssert("not implemented") diff --git a/codex/utils/timer.nim b/codex/utils/timer.nim index 39dd122d..446570b6 100644 --- a/codex/utils/timer.nim +++ b/codex/utils/timer.nim @@ -32,6 +32,8 @@ proc timerLoop(timer: Timer) {.async.} = while true: await timer.callback() await sleepAsync(timer.interval) + except CancelledError: + raise except CatchableError as exc: error "Timer caught unhandled exception: ", name=timer.name, msg=exc.msg diff --git a/codex/validation.nim b/codex/validation.nim new file mode 100644 index 00000000..017e212d --- /dev/null +++ b/codex/validation.nim @@ -0,0 +1,102 @@ +import std/sets +import std/sequtils +import pkg/chronos +import pkg/chronicles +import ./market +import ./clock + +export market +export sets + +type + Validation* = ref object + slots: HashSet[SlotId] + maxSlots: int + clock: Clock + market: Market + subscriptions: seq[Subscription] + running: Future[void] + periodicity: Periodicity + proofTimeout: UInt256 + +logScope: + topics = "codex validator" + +proc new*(_: type Validation, + clock: Clock, + market: Market, + maxSlots: int): Validation = + Validation(clock: clock, market: market, maxSlots: maxSlots) + +proc slots*(validation: Validation): seq[SlotId] = + validation.slots.toSeq + +proc getCurrentPeriod(validation: Validation): UInt256 = + return validation.periodicity.periodOf(validation.clock.now().u256) + +proc waitUntilNextPeriod(validation: Validation) {.async.} = + let period = validation.getCurrentPeriod() + let periodEnd = validation.periodicity.periodEnd(period) + trace "Waiting until next period", currentPeriod = period + await validation.clock.waitUntil(periodEnd.truncate(int64) + 1) + +proc subscribeSlotFilled(validation: Validation) {.async.} = + proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) = + let slotId = slotId(requestId, slotIndex) + if slotId notin validation.slots: + if validation.slots.len < validation.maxSlots: + trace "Adding slot", slotId = $slotId + validation.slots.incl(slotId) + let subscription = await validation.market.subscribeSlotFilled(onSlotFilled) + validation.subscriptions.add(subscription) + +proc removeSlotsThatHaveEnded(validation: Validation) {.async.} = + var ended: HashSet[SlotId] + for slotId in validation.slots: + let state = await validation.market.slotState(slotId) + if state != SlotState.Filled: + trace "Removing slot", slot = $slotId + ended.incl(slotId) + validation.slots.excl(ended) + +proc markProofAsMissing(validation: Validation, + slotId: SlotId, + period: Period) {.async.} = + try: + if await validation.market.canProofBeMarkedAsMissing(slotId, period): + trace "Marking proof as missing", slotId = $slotId, period = period + await validation.market.markProofAsMissing(slotId, period) + except CancelledError: + raise + except CatchableError as e: + debug "Marking proof as missing failed", msg = e.msg + +proc markProofsAsMissing(validation: Validation) {.async.} = + for slotId in validation.slots: + let previousPeriod = validation.getCurrentPeriod() - 1 + await validation.markProofAsMissing(slotId, previousPeriod) + +proc run(validation: Validation) {.async.} = + trace "Validation started" + try: + while true: + await validation.waitUntilNextPeriod() + await validation.removeSlotsThatHaveEnded() + await validation.markProofsAsMissing() + except CancelledError: + trace "Validation stopped" + discard + except CatchableError as e: + error "Validation failed", msg = e.msg + +proc start*(validation: Validation) {.async.} = + validation.periodicity = await validation.market.periodicity() + validation.proofTimeout = await validation.market.proofTimeout() + await validation.subscribeSlotFilled() + validation.running = validation.run() + +proc stop*(validation: Validation) {.async.} = + await validation.running.cancelAndWait() + while validation.subscriptions.len > 0: + let subscription = validation.subscriptions.pop() + await subscription.unsubscribe() diff --git a/tests/codex/helpers/mockclock.nim b/tests/codex/helpers/mockclock.nim index 55283f8a..38a3156d 100644 --- a/tests/codex/helpers/mockclock.nim +++ b/tests/codex/helpers/mockclock.nim @@ -1,4 +1,5 @@ import std/times +import pkg/chronos import codex/clock export clock @@ -6,16 +7,33 @@ export clock type MockClock* = ref object of Clock time: SecondsSince1970 + waiting: seq[Waiting] + Waiting = ref object + until: SecondsSince1970 + future: Future[void] func new*(_: type MockClock, time: SecondsSince1970 = getTime().toUnix): MockClock = MockClock(time: time) -func set*(clock: MockClock, time: SecondsSince1970) = +proc set*(clock: MockClock, time: SecondsSince1970) = clock.time = time + var index = 0 + while index < clock.waiting.len: + if clock.waiting[index].until <= clock.time: + clock.waiting[index].future.complete() + clock.waiting.del(index) + else: + inc index -func advance*(clock: MockClock, seconds: int64) = - clock.time += seconds +proc advance*(clock: MockClock, seconds: int64) = + clock.set(clock.time + seconds) method now*(clock: MockClock): SecondsSince1970 = clock.time + +method waitUntil*(clock: MockClock, time: SecondsSince1970) {.async.} = + if time > clock.now(): + let future = newFuture[void]() + clock.waiting.add(Waiting(until: time, future: future)) + await future diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index bd9495de..5f89e9f6 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -1,15 +1,18 @@ import std/sequtils import std/tables import std/hashes +import std/sets import pkg/codex/market import pkg/codex/contracts/requests import pkg/codex/contracts/config +import ../examples export market export tables type MockMarket* = ref object of Market + periodicity: Periodicity activeRequests*: Table[Address, seq[RequestId]] activeSlots*: Table[Address, seq[SlotId]] requested*: seq[StorageRequest] @@ -18,10 +21,16 @@ type slotState*: Table[SlotId, SlotState] fulfilled*: seq[Fulfillment] filled*: seq[MockSlot] + freed*: seq[SlotId] + markedAsMissingProofs*: seq[SlotId] + canBeMarkedAsMissing: HashSet[SlotId] withdrawn*: seq[RequestId] + proofsRequired: HashSet[SlotId] + proofsToBeRequired: HashSet[SlotId] + proofEnds: Table[SlotId, UInt256] signer: Address subscriptions: Subscriptions - config: MarketplaceConfig + config*: MarketplaceConfig Fulfillment* = object requestId*: RequestId proof*: seq[byte] @@ -35,8 +44,10 @@ type onRequest: seq[RequestSubscription] onFulfillment: seq[FulfillmentSubscription] onSlotFilled: seq[SlotFilledSubscription] + onSlotFreed: seq[SlotFreedSubscription] onRequestCancelled: seq[RequestCancelledSubscription] onRequestFailed: seq[RequestFailedSubscription] + onProofSubmitted: seq[ProofSubmittedSubscription] RequestSubscription* = ref object of Subscription market: MockMarket callback: OnRequest @@ -46,9 +57,12 @@ type callback: OnFulfillment SlotFilledSubscription* = ref object of Subscription market: MockMarket - requestId: RequestId - slotIndex: UInt256 + requestId: ?RequestId + slotIndex: ?UInt256 callback: OnSlotFilled + SlotFreedSubscription* = ref object of Subscription + market: MockMarket + callback: OnSlotFreed RequestCancelledSubscription* = ref object of Subscription market: MockMarket requestId: RequestId @@ -57,6 +71,9 @@ type market: MockMarket requestId: RequestId callback: OnRequestCancelled + ProofSubmittedSubscription = ref object of Subscription + market: MockMarket + callback: OnProofSubmitted proc hash*(address: Address): Hash = hash(address.toArray) @@ -83,6 +100,12 @@ proc new*(_: type MockMarket): MockMarket = method getSigner*(market: MockMarket): Future[Address] {.async.} = return market.signer +method periodicity*(mock: MockMarket): Future[Periodicity] {.async.} = + return Periodicity(seconds: mock.config.proofs.period) + +method proofTimeout*(market: MockMarket): Future[UInt256] {.async.} = + return market.config.proofs.timeout + method requestStorage*(market: MockMarket, request: StorageRequest) {.async.} = market.requested.add(request) var subscriptions = market.subscriptions.onRequest @@ -139,10 +162,20 @@ proc emitSlotFilled*(market: MockMarket, slotIndex: UInt256) = var subscriptions = market.subscriptions.onSlotFilled for subscription in subscriptions: - if subscription.requestId == requestId and - subscription.slotIndex == slotIndex: + let requestMatches = + subscription.requestId.isNone or + subscription.requestId == some requestId + let slotMatches = + subscription.slotIndex.isNone or + subscription.slotIndex == some slotIndex + if requestMatches and slotMatches: subscription.callback(requestId, slotIndex) +proc emitSlotFreed*(market: MockMarket, slotId: SlotId) = + var subscriptions = market.subscriptions.onSlotFreed + for subscription in subscriptions: + subscription.callback(slotId) + proc emitRequestCancelled*(market: MockMarket, requestId: RequestId) = var subscriptions = market.subscriptions.onRequestCancelled @@ -174,6 +207,7 @@ proc fillSlot*(market: MockMarket, host: host ) market.filled.add(slot) + market.slotState[slotId(slot.requestId, slot.slotIndex)] = SlotState.Filled market.emitSlotFilled(requestId, slotIndex) method fillSlot*(market: MockMarket, @@ -183,11 +217,58 @@ method fillSlot*(market: MockMarket, collateral: UInt256) {.async.} = market.fillSlot(requestId, slotIndex, proof, market.signer) +method freeSlot*(market: MockMarket, slotId: SlotId) {.async.} = + market.freed.add(slotId) + market.emitSlotFreed(slotId) + method withdrawFunds*(market: MockMarket, requestId: RequestId) {.async.} = market.withdrawn.add(requestId) market.emitRequestCancelled(requestId) +proc setProofRequired*(mock: MockMarket, id: SlotId, required: bool) = + if required: + mock.proofsRequired.incl(id) + else: + mock.proofsRequired.excl(id) + +method isProofRequired*(mock: MockMarket, + id: SlotId): Future[bool] {.async.} = + return mock.proofsRequired.contains(id) + +proc setProofToBeRequired*(mock: MockMarket, id: SlotId, required: bool) = + if required: + mock.proofsToBeRequired.incl(id) + else: + mock.proofsToBeRequired.excl(id) + +method willProofBeRequired*(mock: MockMarket, + id: SlotId): Future[bool] {.async.} = + return mock.proofsToBeRequired.contains(id) + +proc setProofEnd*(mock: MockMarket, id: SlotId, proofEnd: UInt256) = + mock.proofEnds[id] = proofEnd + +method submitProof*(mock: MockMarket, id: SlotId, proof: seq[byte]) {.async.} = + for subscription in mock.subscriptions.onProofSubmitted: + subscription.callback(id, proof) + +method markProofAsMissing*(market: MockMarket, + id: SlotId, + period: Period) {.async.} = + market.markedAsMissingProofs.add(id) + +proc setCanProofBeMarkedAsMissing*(mock: MockMarket, id: SlotId, required: bool) = + if required: + mock.canBeMarkedAsMissing.incl(id) + else: + mock.canBeMarkedAsMissing.excl(id) + +method canProofBeMarkedAsMissing*(market: MockMarket, + id: SlotId, + period: Period): Future[bool] {.async.} = + return market.canBeMarkedAsMissing.contains(id) + method subscribeRequests*(market: MockMarket, callback: OnRequest): Future[Subscription] {.async.} = @@ -210,6 +291,13 @@ method subscribeFulfillment*(market: MockMarket, market.subscriptions.onFulfillment.add(subscription) return subscription +method subscribeSlotFilled*(market: MockMarket, + callback: OnSlotFilled): + Future[Subscription] {.async.} = + let subscription = SlotFilledSubscription(market: market, callback: callback) + market.subscriptions.onSlotFilled.add(subscription) + return subscription + method subscribeSlotFilled*(market: MockMarket, requestId: RequestId, slotIndex: UInt256, @@ -217,13 +305,20 @@ method subscribeSlotFilled*(market: MockMarket, Future[Subscription] {.async.} = let subscription = SlotFilledSubscription( market: market, - requestId: requestId, - slotIndex: slotIndex, + requestId: some requestId, + slotIndex: some slotIndex, callback: callback ) market.subscriptions.onSlotFilled.add(subscription) return subscription +method subscribeSlotFreed*(market: MockMarket, + callback: OnSlotFreed): + Future[Subscription] {.async.} = + let subscription = SlotFreedSubscription(market: market, callback: callback) + market.subscriptions.onSlotFreed.add(subscription) + return subscription + method subscribeRequestCancelled*(market: MockMarket, requestId: RequestId, callback: OnRequestCancelled): @@ -248,6 +343,16 @@ method subscribeRequestFailed*(market: MockMarket, market.subscriptions.onRequestFailed.add(subscription) return subscription +method subscribeProofSubmission*(mock: MockMarket, + callback: OnProofSubmitted): + Future[Subscription] {.async.} = + let subscription = ProofSubmittedSubscription( + market: mock, + callback: callback + ) + mock.subscriptions.onProofSubmitted.add(subscription) + return subscription + method unsubscribe*(subscription: RequestSubscription) {.async.} = subscription.market.subscriptions.onRequest.keepItIf(it != subscription) @@ -257,8 +362,14 @@ method unsubscribe*(subscription: FulfillmentSubscription) {.async.} = method unsubscribe*(subscription: SlotFilledSubscription) {.async.} = subscription.market.subscriptions.onSlotFilled.keepItIf(it != subscription) +method unsubscribe*(subscription: SlotFreedSubscription) {.async.} = + subscription.market.subscriptions.onSlotFreed.keepItIf(it != subscription) + method unsubscribe*(subscription: RequestCancelledSubscription) {.async.} = subscription.market.subscriptions.onRequestCancelled.keepItIf(it != subscription) method unsubscribe*(subscription: RequestFailedSubscription) {.async.} = subscription.market.subscriptions.onRequestFailed.keepItIf(it != subscription) + +method unsubscribe*(subscription: ProofSubmittedSubscription) {.async.} = + subscription.market.subscriptions.onProofSubmitted.keepItIf(it != subscription) diff --git a/tests/codex/helpers/mockproofs.nim b/tests/codex/helpers/mockproofs.nim deleted file mode 100644 index 4c86f7ee..00000000 --- a/tests/codex/helpers/mockproofs.nim +++ /dev/null @@ -1,74 +0,0 @@ -import std/sets -import std/tables -import std/sequtils -import pkg/upraises -import pkg/codex/storageproofs - -type - MockProofs* = ref object of Proofs - periodicity: Periodicity - proofsRequired: HashSet[SlotId] - proofsToBeRequired: HashSet[SlotId] - proofEnds: Table[SlotId, UInt256] - subscriptions: seq[MockSubscription] - slotStates: Table[SlotId, SlotState] - MockSubscription* = ref object of Subscription - proofs: MockProofs - callback: OnProofSubmitted - -const DefaultPeriodLength = 10.u256 - -func new*(_: type MockProofs): MockProofs = - MockProofs(periodicity: Periodicity(seconds: DefaultPeriodLength)) - -func setPeriodicity*(mock: MockProofs, periodicity: Periodicity) = - mock.periodicity = periodicity - -method periodicity*(mock: MockProofs): Future[Periodicity] {.async.} = - return mock.periodicity - -proc setProofRequired*(mock: MockProofs, id: SlotId, required: bool) = - if required: - mock.proofsRequired.incl(id) - else: - mock.proofsRequired.excl(id) - -method isProofRequired*(mock: MockProofs, - id: SlotId): Future[bool] {.async.} = - return mock.proofsRequired.contains(id) - -proc setProofToBeRequired*(mock: MockProofs, id: SlotId, required: bool) = - if required: - mock.proofsToBeRequired.incl(id) - else: - mock.proofsToBeRequired.excl(id) - -method willProofBeRequired*(mock: MockProofs, - id: SlotId): Future[bool] {.async.} = - return mock.proofsToBeRequired.contains(id) - -proc setProofEnd*(mock: MockProofs, id: SlotId, proofEnd: UInt256) = - mock.proofEnds[id] = proofEnd - -method submitProof*(mock: MockProofs, - id: SlotId, - proof: seq[byte]) {.async.} = - for subscription in mock.subscriptions: - subscription.callback(id, proof) - -method subscribeProofSubmission*(mock: MockProofs, - callback: OnProofSubmitted): - Future[Subscription] {.async.} = - let subscription = MockSubscription(proofs: mock, callback: callback) - mock.subscriptions.add(subscription) - return subscription - -method unsubscribe*(subscription: MockSubscription) {.async, upraises:[].} = - subscription.proofs.subscriptions.keepItIf(it != subscription) - -method slotState*(mock: MockProofs, - slotId: SlotId): Future[SlotState] {.async.} = - return mock.slotStates[slotId] - -proc setSlotState*(mock: MockProofs, slotId: SlotId, state: SlotState) = - mock.slotStates[slotId] = state diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index 3486e478..70306a92 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -6,7 +6,6 @@ import pkg/asynctest import pkg/datastore import pkg/json_serialization import pkg/json_serialization/std/options -import pkg/stew/byteutils import pkg/codex/stores import pkg/codex/sales diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index eaa81122..241caa6e 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -146,7 +146,6 @@ suite "Sales": test "retrieves and stores data locally": var storingRequest: StorageRequest var storingSlot: UInt256 - var storingAvailability: Availability sales.onStore = proc(request: StorageRequest, slot: UInt256, onBatch: BatchProc): Future[?!void] {.async.} = diff --git a/tests/codex/testproving.nim b/tests/codex/testproving.nim index 8a2641a6..68bb9a50 100644 --- a/tests/codex/testproving.nim +++ b/tests/codex/testproving.nim @@ -1,7 +1,7 @@ import pkg/asynctest import pkg/chronos import pkg/codex/proving -import ./helpers/mockproofs +import ./helpers/mockmarket import ./helpers/mockclock import ./helpers/eventually import ./examples @@ -9,20 +9,20 @@ import ./examples suite "Proving": var proving: Proving - var proofs: MockProofs + var market: MockMarket var clock: MockClock setup: - proofs = MockProofs.new() + market = MockMarket.new() clock = MockClock.new() - proving = Proving.new(proofs, clock) + proving = Proving.new(market, clock) await proving.start() teardown: await proving.stop() - proc advanceToNextPeriod(proofs: MockProofs) {.async.} = - let periodicity = await proofs.periodicity() + proc advanceToNextPeriod(market: MockMarket) {.async.} = + let periodicity = await market.periodicity() clock.advance(periodicity.seconds.truncate(int64)) test "maintains a list of slots to watch": @@ -47,9 +47,9 @@ suite "Proving": proc onProve(slot: Slot): Future[seq[byte]] {.async.} = called = true proving.onProve = onProve - proofs.setSlotState(slot.id, SlotState.Filled) - proofs.setProofRequired(slot.id, true) - await proofs.advanceToNextPeriod() + market.slotState[slot.id] = SlotState.Filled + market.setProofRequired(slot.id, true) + await market.advanceToNextPeriod() check eventually called test "callback receives slot for which proof is required": @@ -60,14 +60,14 @@ suite "Proving": proc onProve(slot: Slot): Future[seq[byte]] {.async.} = callbackSlots.add(slot) proving.onProve = onProve - proofs.setSlotState(slot1.id, SlotState.Filled) - proofs.setSlotState(slot2.id, SlotState.Filled) - proofs.setProofRequired(slot1.id, true) - await proofs.advanceToNextPeriod() + market.slotState[slot1.id] = SlotState.Filled + market.slotState[slot2.id] = SlotState.Filled + market.setProofRequired(slot1.id, true) + await market.advanceToNextPeriod() check eventually callbackSlots == @[slot1] - proofs.setProofRequired(slot1.id, false) - proofs.setProofRequired(slot2.id, true) - await proofs.advanceToNextPeriod() + market.setProofRequired(slot1.id, false) + market.setProofRequired(slot2.id, true) + await market.advanceToNextPeriod() check eventually callbackSlots == @[slot1, slot2] test "invokes callback when proof is about to be required": @@ -77,24 +77,24 @@ suite "Proving": proc onProve(slot: Slot): Future[seq[byte]] {.async.} = called = true proving.onProve = onProve - proofs.setProofRequired(slot.id, false) - proofs.setProofToBeRequired(slot.id, true) - proofs.setSlotState(slot.id, SlotState.Filled) - await proofs.advanceToNextPeriod() + market.setProofRequired(slot.id, false) + market.setProofToBeRequired(slot.id, true) + market.slotState[slot.id] = SlotState.Filled + await market.advanceToNextPeriod() check eventually called test "stops watching when slot is finished": let slot = Slot.example proving.add(slot) - proofs.setProofEnd(slot.id, clock.now().u256) - await proofs.advanceToNextPeriod() + market.setProofEnd(slot.id, clock.now().u256) + await market.advanceToNextPeriod() var called: bool proc onProve(slot: Slot): Future[seq[byte]] {.async.} = called = true proving.onProve = onProve - proofs.setProofRequired(slot.id, true) - await proofs.advanceToNextPeriod() - proofs.setSlotState(slot.id, SlotState.Finished) + market.setProofRequired(slot.id, true) + await market.advanceToNextPeriod() + market.slotState[slot.id] = SlotState.Finished check eventually (not proving.slots.contains(slot)) check not called @@ -115,9 +115,9 @@ suite "Proving": let subscription = await proving.subscribeProofSubmission(onProofSubmission) proving.add(slot) - proofs.setSlotState(slot.id, SlotState.Filled) - proofs.setProofRequired(slot.id, true) - await proofs.advanceToNextPeriod() + market.slotState[slot.id] = SlotState.Filled + market.setProofRequired(slot.id, true) + await market.advanceToNextPeriod() check eventually receivedIds == @[slot.id] and receivedProofs == @[proof] diff --git a/tests/codex/testvalidation.nim b/tests/codex/testvalidation.nim new file mode 100644 index 00000000..2134ae95 --- /dev/null +++ b/tests/codex/testvalidation.nim @@ -0,0 +1,65 @@ +import pkg/asynctest +import pkg/chronos + +import codex/validation +import ./helpers/mockmarket +import ./helpers/mockclock +import ./helpers/eventually +import ./examples + +suite "validation": + + let period = 10 + let timeout = 5 + let maxSlots = 100 + let slot = Slot.example + let collateral = slot.request.ask.collateral + + var validation: Validation + var market: MockMarket + var clock: MockClock + + setup: + market = MockMarket.new() + clock = MockClock.new() + validation = Validation.new(clock, market, maxSlots) + market.config.proofs.period = period.u256 + market.config.proofs.timeout = timeout.u256 + await validation.start() + + teardown: + await validation.stop() + + test "the list of slots that it's monitoring is empty initially": + check validation.slots.len == 0 + + test "when a slot is filled on chain, it is added to the list": + await market.fillSlot(slot.request.id, slot.slotIndex, @[], collateral) + check validation.slots == @[slot.id] + + for state in [SlotState.Finished, SlotState.Failed]: + test "when slot state changes, it is removed from the list": + await market.fillSlot(slot.request.id, slot.slotIndex, @[], collateral) + market.slotState[slot.id] = state + clock.advance(period) + check eventually validation.slots.len == 0 + + test "when a proof is missed, it is marked as missing": + await market.fillSlot(slot.request.id, slot.slotIndex, @[], collateral) + market.setCanProofBeMarkedAsMissing(slot.id, true) + clock.advance(period) + await sleepAsync(1.millis) + check market.markedAsMissingProofs.contains(slot.id) + + test "when a proof can not be marked as missing, it will not be marked": + await market.fillSlot(slot.request.id, slot.slotIndex, @[], collateral) + market.setCanProofBeMarkedAsMissing(slot.id, false) + clock.advance(period) + await sleepAsync(1.millis) + check market.markedAsMissingProofs.len == 0 + + test "it does not monitor more than the maximum number of slots": + for _ in 0..