diff --git a/codex/codex.nim b/codex/codex.nim index 48a13de5..4ec7b867 100644 --- a/codex/codex.nim +++ b/codex/codex.nim @@ -96,7 +96,10 @@ proc stop*(s: CodexServer) {.async.} = s.runHandle.complete() -proc new(_: type ContractInteractions, config: CodexConf): ?ContractInteractions = +proc new(_: type Contracts, + config: CodexConf, + repo: RepoStore): Contracts = + if not config.persistence: if config.ethAccount.isSome: warn "Ethereum account was set, but persistence is not enabled" @@ -106,10 +109,31 @@ proc new(_: type ContractInteractions, config: CodexConf): ?ContractInteractions error "Persistence enabled, but no Ethereum account was set" quit QuitFailure - if deployment =? config.ethDeployment: - ContractInteractions.new(config.ethProvider, account, deployment) - else: - ContractInteractions.new(config.ethProvider, account) + var deploy: Deployment + try: + if deployFile =? config.ethDeployment: + deploy = Deployment.init(deployFile) + else: + deploy = Deployment.init() + except IOError as e: + error "Unable to read deployment json" + quit QuitFailure + + without marketplaceAddress =? deploy.address(Marketplace): + error "Marketplace contract address not found in deployment file" + quit QuitFailure + + # TODO: at some point there may be cli options that enable client-only or host-only + # operation, and both client AND host will not necessarily need to be instantiated + let client = ClientInteractions.new(config.ethProvider, + account, + marketplaceAddress) + let host = HostInteractions.new(config.ethProvider, + account, + repo, + marketplaceAddress) + + (client.option, host.option) proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey): T = @@ -182,7 +206,7 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey): engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks) store = NetworkStore.new(engine, repoStore) erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider) - contracts = ContractInteractions.new(config) + contracts = Contracts.new(config, repoStore) codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery, contracts) restServer = RestServerRef.new( codexNode.initRestApi(config), diff --git a/codex/contracts/deployment.nim b/codex/contracts/deployment.nim index 3c1357b3..408a7503 100644 --- a/codex/contracts/deployment.nim +++ b/codex/contracts/deployment.nim @@ -12,7 +12,7 @@ const defaultFile = "vendor" / "codex-contracts-eth" / "deployment-localhost.jso ## been exported with Hardhat deploy. ## See also: ## https://github.com/wighawag/hardhat-deploy/tree/master#6-hardhat-export -proc deployment*(file = defaultFile): Deployment = +proc init*(_: type Deployment, file: string = defaultFile): Deployment = Deployment(json: parseFile(file)) proc address*(deployment: Deployment, Contract: typedesc): ?Address = diff --git a/codex/contracts/interactions.nim b/codex/contracts/interactions.nim index 4cf16492..01dcd14e 100644 --- a/codex/contracts/interactions.nim +++ b/codex/contracts/interactions.nim @@ -1,78 +1,5 @@ -import pkg/ethers -import pkg/chronicles -import ../purchasing -import ../sales -import ../proving -import ./deployment -import ./marketplace -import ./market -import ./proofs -import ./clock +import ./interactions/interactions +import ./interactions/hostinteractions +import ./interactions/clientinteractions -export purchasing -export sales -export proving -export chronicles - -type - ContractInteractions* = ref object - purchasing*: Purchasing - sales*: Sales - proving*: Proving - clock: OnChainClock - -proc new*(_: type ContractInteractions, - signer: Signer, - deployment: Deployment): ?ContractInteractions = - - without address =? deployment.address(Marketplace): - error "Unable to determine address of the Marketplace smart contract" - return none ContractInteractions - - let contract = Marketplace.new(address, signer) - let market = OnChainMarket.new(contract) - let proofs = OnChainProofs.new(contract) - let clock = OnChainClock.new(signer.provider) - let proving = Proving.new(proofs, clock) - some ContractInteractions( - purchasing: Purchasing.new(market, clock), - sales: Sales.new(market, clock, proving), - proving: proving, - clock: clock - ) - -proc new*(_: type ContractInteractions, - providerUrl: string, - account: Address, - deploymentFile: string = string.default): ?ContractInteractions = - - let provider = JsonRpcProvider.new(providerUrl) - let signer = provider.getSigner(account) - - var deploy: Deployment - try: - if deploymentFile == string.default: - deploy = deployment() - else: - deploy = deployment(deploymentFile) - except IOError as e: - error "Unable to read deployment json", msg = e.msg - return none ContractInteractions - - ContractInteractions.new(signer, deploy) - -proc new*(_: type ContractInteractions, - account: Address): ?ContractInteractions = - ContractInteractions.new("ws://localhost:8545", account) - -proc start*(interactions: ContractInteractions) {.async.} = - await interactions.clock.start() - await interactions.sales.start() - await interactions.proving.start() - await interactions.purchasing.start() - -proc stop*(interactions: ContractInteractions) {.async.} = - await interactions.purchasing.stop() - await interactions.sales.stop() - await interactions.proving.stop() - await interactions.clock.stop() +export interactions, hostinteractions, clientinteractions diff --git a/codex/contracts/interactions/clientinteractions.nim b/codex/contracts/interactions/clientinteractions.nim new file mode 100644 index 00000000..b0162419 --- /dev/null +++ b/codex/contracts/interactions/clientinteractions.nim @@ -0,0 +1,38 @@ +import pkg/ethers +import pkg/chronicles +import pkg/questionable +import pkg/questionable/results + +import ../../purchasing +import ../marketplace +import ../market +import ../proofs +import ../clock +import ./interactions + +export purchasing +export chronicles + +type + ClientInteractions* = ref object of ContractInteractions + purchasing*: Purchasing + +proc new*(_: type ClientInteractions, + providerUrl: string, + account: Address, + contractAddress: Address): ?!ClientInteractions = + + without prepared =? prepare(providerUrl, account, contractAddress), error: + return failure(error) + + let c = ClientInteractions.new(prepared.clock) + c.purchasing = Purchasing.new(prepared.market, prepared.clock) + return success(c) + +proc start*(self: ClientInteractions) {.async.} = + await procCall ContractInteractions(self).start() + await self.purchasing.start() + +proc stop*(self: ClientInteractions) {.async.} = + await self.purchasing.stop() + await procCall ContractInteractions(self).stop() diff --git a/codex/contracts/interactions/hostinteractions.nim b/codex/contracts/interactions/hostinteractions.nim new file mode 100644 index 00000000..04762700 --- /dev/null +++ b/codex/contracts/interactions/hostinteractions.nim @@ -0,0 +1,46 @@ +import pkg/ethers +import pkg/chronicles +import pkg/questionable +import pkg/questionable/results + +import ../../sales +import ../../proving +import ../../stores +import ../proofs +import ./interactions + +export sales +export proving +export chronicles + +type + HostInteractions* = ref object of ContractInteractions + sales*: Sales + proving*: Proving + +proc new*(_: type HostInteractions, + providerUrl: string, + account: Address, + repo: RepoStore, + contractAddress: Address): ?!HostInteractions = + + without prepared =? prepare(providerUrl, account, contractAddress), error: + return failure(error) + + let proofs = OnChainProofs.new(prepared.contract) + let proving = Proving.new(proofs, prepared.clock) + + let h = HostInteractions.new(prepared.clock) + h.sales = Sales.new(prepared.market, prepared.clock, proving, repo) + h.proving = proving + return success(h) + +method start*(self: HostInteractions) {.async.} = + await procCall ContractInteractions(self).start() + await self.sales.start() + await self.proving.start() + +method stop*(self: HostInteractions) {.async.} = + await self.sales.stop() + await self.proving.stop() + await procCall ContractInteractions(self).start() diff --git a/codex/contracts/interactions/interactions.nim b/codex/contracts/interactions/interactions.nim new file mode 100644 index 00000000..ed647e3b --- /dev/null +++ b/codex/contracts/interactions/interactions.nim @@ -0,0 +1,36 @@ +import pkg/ethers +import ../../errors +import ../clock +import ../marketplace +import ../market + +type + ContractInteractions* = ref object of RootObj + clock: OnChainClock + ContractInteractionsError* = object of CodexError + ReadDeploymentFileFailureError* = object of ContractInteractionsError + ContractAddressError* = object of ContractInteractionsError + +proc new*(T: type ContractInteractions, + clock: OnChainClock): T = + T(clock: clock) + +proc prepare*( + providerUrl: string = "ws://localhost:8545", + account, contractAddress: Address): + ?!tuple[contract: Marketplace, market: OnChainMarket, clock: OnChainClock] = + + let provider = JsonRpcProvider.new(providerUrl) + let signer = provider.getSigner(account) + + let contract = Marketplace.new(contractAddress, signer) + let market = OnChainMarket.new(contract) + let clock = OnChainClock.new(signer.provider) + + return success((contract, market, clock)) + +method start*(self: ContractInteractions) {.async, base.} = + await self.clock.start() + +method stop*(self: ContractInteractions) {.async, base.} = + await self.clock.stop() diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index ff2dbcbc..2b1207ab 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -79,13 +79,15 @@ method getHost(market: OnChainMarket, else: return none Address -method getRequestFromSlotId*(market: OnChainMarket, - slotId: SlotId): Future[?StorageRequest] {.async.} = +method getActiveSlot*( + market: OnChainMarket, + slotId: SlotId): Future[?Slot] {.async.} = + try: - return some await market.contract.getRequestFromSlotId(slotId) + return some await market.contract.getActiveSlot(slotId) except ProviderError as e: if e.revertReason.contains("Slot is free"): - return none StorageRequest + return none Slot raise e method fillSlot(market: OnChainMarket, diff --git a/codex/contracts/marketplace.nim b/codex/contracts/marketplace.nim index f23f9aee..d578548d 100644 --- a/codex/contracts/marketplace.nim +++ b/codex/contracts/marketplace.nim @@ -48,7 +48,7 @@ proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId) {.contract.} proc freeSlot*(marketplace: Marketplace, id: SlotId) {.contract.} proc getRequest*(marketplace: Marketplace, id: RequestId): StorageRequest {.contract, view.} proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.} -proc getRequestFromSlotId*(marketplace: Marketplace, id: SlotId): StorageRequest {.contract, view.} +proc getActiveSlot*(marketplace: Marketplace, id: SlotId): Slot {.contract, view.} proc myRequests*(marketplace: Marketplace): seq[RequestId] {.contract, view.} proc mySlots*(marketplace: Marketplace): seq[SlotId] {.contract, view.} diff --git a/codex/contracts/requests.nim b/codex/contracts/requests.nim index f9c3cd73..188a9032 100644 --- a/codex/contracts/requests.nim +++ b/codex/contracts/requests.nim @@ -71,6 +71,12 @@ func fromTuple(_: type StorageRequest, tupl: tuple): StorageRequest = nonce: tupl[4] ) +func fromTuple(_: type Slot, tupl: tuple): Slot = + Slot( + request: tupl[0], + slotIndex: tupl[1] + ) + func fromTuple(_: type StorageAsk, tupl: tuple): StorageAsk = StorageAsk( slots: tupl[0], @@ -133,6 +139,9 @@ func encode*(encoder: var AbiEncoder, id: RequestId | SlotId | Nonce) = func encode*(encoder: var AbiEncoder, request: StorageRequest) = encoder.write(request.fieldValues) +func encode*(encoder: var AbiEncoder, request: Slot) = + encoder.write(request.fieldValues) + func decode*(decoder: var AbiDecoder, T: type StoragePoR): ?!T = let tupl = ?decoder.read(StoragePoR.fieldTypes) success StoragePoR.fromTuple(tupl) @@ -153,6 +162,10 @@ func decode*(decoder: var AbiDecoder, T: type StorageRequest): ?!T = let tupl = ?decoder.read(StorageRequest.fieldTypes) success StorageRequest.fromTuple(tupl) +func decode*(decoder: var AbiDecoder, T: type Slot): ?!T = + let tupl = ?decoder.read(Slot.fieldTypes) + success Slot.fromTuple(tupl) + func id*(request: StorageRequest): RequestId = let encoding = AbiEncoder.encode((request, )) RequestId(keccak256.digest(encoding).data) diff --git a/codex/market.nim b/codex/market.nim index dd78c381..df8caf57 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -58,8 +58,10 @@ method getHost*(market: Market, slotIndex: UInt256): Future[?Address] {.base, async.} = raiseAssert("not implemented") -method getRequestFromSlotId*(market: Market, - slotId: SlotId): Future[?StorageRequest] {.base, async.} = +method getActiveSlot*( + market: Market, + slotId: SlotId): Future[?Slot] {.base, async.} = + raiseAssert("not implemented") method fillSlot*(market: Market, diff --git a/codex/node.nim b/codex/node.nim index 342d7632..a77d1091 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -31,6 +31,9 @@ import ./streams import ./erasure import ./discovery import ./contracts +import ./node/batch + +export batch logScope: topics = "codex node" @@ -39,10 +42,12 @@ const FetchBatch = 200 type - BatchProc* = proc(blocks: seq[bt.Block]): Future[void] {.gcsafe, raises: [Defect].} - CodexError = object of CatchableError + Contracts* = tuple + client: ?ClientInteractions + host: ?HostInteractions + CodexNodeRef* = ref object switch*: Switch networkId*: PeerId @@ -50,7 +55,7 @@ type engine*: BlockExcEngine erasure*: Erasure discovery*: Discovery - contracts*: ?ContractInteractions + contracts*: Contracts proc findPeer*( node: CodexNodeRef, @@ -251,7 +256,7 @@ proc requestStorage*(self: CodexNodeRef, ## trace "Received a request for storage!", cid, duration, nodes, tolerance, reward - without contracts =? self.contracts: + without contracts =? self.contracts.client: trace "Purchasing not available" return failure "Purchasing not available" @@ -309,7 +314,7 @@ proc new*( engine: BlockExcEngine, erasure: Erasure, discovery: Discovery, - contracts = ContractInteractions.none): T = + contracts: Contracts = (ClientInteractions.none, HostInteractions.none)): T = T( switch: switch, blockStore: store, @@ -331,46 +336,56 @@ proc start*(node: CodexNodeRef) {.async.} = if not node.discovery.isNil: await node.discovery.start() - if contracts =? node.contracts: + if hostContracts =? node.contracts.host: # TODO: remove Sales callbacks, pass BlockStore and StorageProofs instead - contracts.sales.onStore = proc(request: StorageRequest, - slot: UInt256, - availability: ?Availability) {.async.} = + hostContracts.sales.onStore = proc(request: StorageRequest, + slot: UInt256, + onBatch: BatchProc): Future[?!void] {.async.} = ## store data in local storage ## without cid =? Cid.init(request.content.cid): trace "Unable to parse Cid", cid - raise newException(CodexError, "Unable to parse Cid") + let error = newException(CodexError, "Unable to parse Cid") + return failure(error) without manifest =? await node.fetchManifest(cid), error: trace "Unable to fetch manifest for cid", cid - raise error + return failure(error) trace "Fetching block for manifest", cid # TODO: This will probably require a call to `getBlock` either way, # since fetching of blocks will have to be selective according # to a combination of parameters, such as node slot position # and dataset geometry - let fetchRes = await node.fetchBatched(manifest) - if fetchRes.isErr: - raise newException(CodexError, "Unable to retrieve blocks") + if fetchErr =? (await node.fetchBatched(manifest, onBatch = onBatch)).errorOption: + let error = newException(CodexError, "Unable to retrieve blocks") + error.parent = fetchErr + return failure(error) - contracts.sales.onClear = proc(availability: ?Availability, - request: StorageRequest, - slotIndex: UInt256) = + return success() + + hostContracts.sales.onClear = proc(request: StorageRequest, + slotIndex: UInt256) = # TODO: remove data from local storage discard - contracts.proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} = + hostContracts.proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} = # TODO: generate proof return @[42'u8] try: - await contracts.start() + await hostContracts.start() except CatchableError as error: - error "Unable to start contract interactions: ", error=error.msg - node.contracts = ContractInteractions.none + error "Unable to start host contract interactions: ", error=error.msg + node.contracts.host = HostInteractions.none + + if clientContracts =? node.contracts.client: + try: + await clientContracts.start() + except CatchableError as error: + error "Unable to start client contract interactions: ", error=error.msg + node.contracts.client = ClientInteractions.none node.networkId = node.switch.peerInfo.peerId notice "Started codex node", id = $node.networkId, addrs = node.switch.peerInfo.addrs @@ -390,8 +405,11 @@ proc stop*(node: CodexNodeRef) {.async.} = if not node.discovery.isNil: await node.discovery.stop() - if contracts =? node.contracts: - await contracts.stop() + if clientContracts =? node.contracts.client: + await clientContracts.stop() + + if hostContracts =? node.contracts.host: + await hostContracts.stop() if not node.blockStore.isNil: await node.blockStore.close diff --git a/codex/node/batch.nim b/codex/node/batch.nim new file mode 100644 index 00000000..b6482989 --- /dev/null +++ b/codex/node/batch.nim @@ -0,0 +1,6 @@ +import pkg/chronos +import pkg/upraises +import ../blocktype as bt + +type + BatchProc* = proc(blocks: seq[bt.Block]): Future[void] {.gcsafe, upraises:[].} diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 5db1f4e5..38e6fb6d 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -255,10 +255,13 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = "/api/codex/v1/sales/availability") do () -> RestApiResponse: ## Returns storage that is for sale - without contracts =? node.contracts: + without contracts =? node.contracts.host: return RestApiResponse.error(Http503, "Sales unavailable") - let json = %contracts.sales.available + without unused =? (await contracts.sales.context.reservations.unused), err: + return RestApiResponse.error(Http500, err.msg) + + let json = %unused return RestApiResponse.response($json) router.rawApi( @@ -270,7 +273,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = ## duration - maximum time the storage should be sold for (in seconds) ## minPrice - minimum price to be paid (in amount of tokens) - without contracts =? node.contracts: + without contracts =? node.contracts.host: return RestApiResponse.error(Http503, "Sales unavailable") let body = await request.getBody() @@ -278,7 +281,13 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = without availability =? Availability.fromJson(body), error: return RestApiResponse.error(Http400, error.msg) - contracts.sales.add(availability) + let reservations = contracts.sales.context.reservations + + if not reservations.hasAvailable(availability.size.truncate(uint)): + return RestApiResponse.error(Http422, "Not enough storage quota") + + if err =? (await reservations.reserve(availability)).errorOption: + return RestApiResponse.error(Http500, err.msg) let json = %availability return RestApiResponse.response($json) @@ -288,7 +297,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = "/api/codex/v1/storage/purchases/{id}") do ( id: PurchaseId) -> RestApiResponse: - without contracts =? node.contracts: + without contracts =? node.contracts.client: return RestApiResponse.error(Http503, "Purchasing unavailable") without id =? id.tryGet.catch, error: diff --git a/codex/rest/json.nim b/codex/rest/json.nim index 9d68d819..c578ef9f 100644 --- a/codex/rest/json.nim +++ b/codex/rest/json.nim @@ -48,7 +48,7 @@ func `%`*(stint: StInt|StUint): JsonNode = func `%`*(arr: openArray[byte]): JsonNode = %("0x" & arr.toHex) -func `%`*(id: RequestId | SlotId | Nonce): JsonNode = +func `%`*(id: RequestId | SlotId | Nonce | AvailabilityId): JsonNode = % id.toArray func `%`*(purchase: Purchase): JsonNode = diff --git a/codex/sales.nim b/codex/sales.nim index c859badd..71ff5495 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -2,16 +2,17 @@ import std/sequtils import pkg/questionable import pkg/upraises import pkg/stint -import pkg/nimcrypto import pkg/chronicles +import pkg/datastore +import pkg/upraises import ./rng import ./market import ./clock import ./proving +import ./stores import ./contracts/requests import ./sales/salescontext import ./sales/salesagent -import ./sales/availability import ./sales/statemachine import ./sales/states/downloading import ./sales/states/unknown @@ -35,13 +36,15 @@ import ./sales/states/unknown ## | | ---- storage proof ---> | export stint -export availability +export reservations + +logScope: + topics = "sales" type Sales* = ref object context*: SalesContext subscription*: ?market.Subscription - available: seq[Availability] agents*: seq[SalesAgent] proc `onStore=`*(sales: Sales, onStore: OnStore) = @@ -59,103 +62,57 @@ proc onClear*(sales: Sales): ?OnClear = sales.context.onClear proc onSale*(sales: Sales): ?OnSale = sales.context.onSale -proc available*(sales: Sales): seq[Availability] = sales.available - -proc init*(_: type Availability, - size: UInt256, - duration: UInt256, - minPrice: UInt256): Availability = - var id: array[32, byte] - doAssert randomBytes(id) == 32 - Availability(id: id, size: size, duration: duration, minPrice: minPrice) - -func add*(sales: Sales, availability: Availability) = - if not sales.available.contains(availability): - sales.available.add(availability) - # TODO: add to disk (persist), serialise to json. - -func remove*(sales: Sales, availability: Availability) = - sales.available.keepItIf(it != availability) - # TODO: remove from disk availability, mark as in use by assigning - # a slotId, so that it can be used for restoration (node restart) - func new*(_: type Sales, market: Market, clock: Clock, - proving: Proving): Sales = + proving: Proving, + repo: RepoStore): Sales = - let sales = Sales(context: SalesContext( + Sales(context: SalesContext( market: market, clock: clock, - proving: proving + proving: proving, + reservations: Reservations.new(repo) )) - proc onSaleErrored(availability: Availability) = - sales.add(availability) - - sales.context.onSaleErrored = some onSaleErrored - sales - -func findAvailability*(sales: Sales, ask: StorageAsk): ?Availability = - for availability in sales.available: - if ask.slotSize <= availability.size and - ask.duration <= availability.duration and - ask.pricePerSlot >= availability.minPrice: - return some availability - proc randomSlotIndex(numSlots: uint64): UInt256 = let rng = Rng.instance let slotIndex = rng.rand(numSlots - 1) return slotIndex.u256 -proc findSlotIndex(numSlots: uint64, - requestId: RequestId, - slotId: SlotId): ?UInt256 = - for i in 0.. 0: + + return some Json.decode(string.fromBytes(serialized), Availability) + + return none Availability + + iter.next = next + return success iter + +proc unused*(r: Reservations): Future[?!seq[Availability]] {.async.} = + var ret: seq[Availability] = @[] + + without availabilities =? (await r.availabilities), err: + return failure(err) + + for a in availabilities: + if availability =? (await a) and not availability.used: + ret.add availability + + return success(ret) + +proc find*( + self: Reservations, + size, duration, minPrice: UInt256, + used: bool): Future[?Availability] {.async.} = + + + without availabilities =? (await self.availabilities), err: + error "failed to get all availabilities", error = err.msg + return none Availability + + for a in availabilities: + if availability =? (await a): + + if used == availability.used and + size <= availability.size and + duration <= availability.duration and + minPrice >= availability.minPrice: + + trace "availability matched", + used, availUsed = availability.used, + size, availsize = availability.size, + duration, availDuration = availability.duration, + minPrice, availMinPrice = availability.minPrice + + return some availability + + trace "availiability did not match", + used, availUsed = availability.used, + size, availsize = availability.size, + duration, availDuration = availability.duration, + minPrice, availMinPrice = availability.minPrice diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 3bad1e93..372c3da8 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -1,27 +1,35 @@ import pkg/chronos +import pkg/chronicles import pkg/stint import ../contracts/requests import ../utils/asyncspawn import ./statemachine import ./salescontext import ./salesdata -import ./availability +import ./reservations + +export reservations + +logScope: + topics = "sales statemachine" type SalesAgent* = ref object of Machine context*: SalesContext data*: SalesData subscribed: bool +func `==`*(a, b: SalesAgent): bool = + a.data.requestId == b.data.requestId and + a.data.slotIndex == b.data.slotIndex + proc newSalesAgent*(context: SalesContext, requestId: RequestId, slotIndex: UInt256, - availability: ?Availability, request: ?StorageRequest): SalesAgent = SalesAgent( context: context, data: SalesData( requestId: requestId, - availability: availability, slotIndex: slotIndex, request: request)) diff --git a/codex/sales/salescontext.nim b/codex/sales/salescontext.nim index 092ae277..ede2b1a6 100644 --- a/codex/sales/salescontext.nim +++ b/codex/sales/salescontext.nim @@ -1,8 +1,11 @@ +import pkg/questionable +import pkg/questionable/results import pkg/upraises +import ../node/batch import ../market import ../clock import ../proving -import ./availability +import ./reservations type SalesContext* = ref object @@ -11,15 +14,17 @@ type onStore*: ?OnStore onClear*: ?OnClear onSale*: ?OnSale - onSaleErrored*: ?OnSaleErrored + onIgnored*: OnIgnored proving*: Proving + reservations*: Reservations + OnStore* = proc(request: StorageRequest, slot: UInt256, - availability: ?Availability): Future[void] {.gcsafe, upraises: [].} - OnClear* = proc(availability: ?Availability,# TODO: when availability changes introduced, make availability non-optional (if we need to keep it at all) - request: StorageRequest, + onBatch: BatchProc): Future[?!void] {.gcsafe, upraises: [].} + OnProve* = proc(request: StorageRequest, + slot: UInt256): Future[seq[byte]] {.gcsafe, upraises: [].} + OnClear* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} - OnSale* = proc(availability: ?Availability, # TODO: when availability changes introduced, make availability non-optional (if we need to keep it at all) - request: StorageRequest, + OnSale* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} - OnSaleErrored* = proc(availability: Availability) {.gcsafe, upraises: [].} + OnIgnored* = proc() {.gcsafe, upraises: [].} diff --git a/codex/sales/salesdata.nim b/codex/sales/salesdata.nim index 87a9ccfb..d8226877 100644 --- a/codex/sales/salesdata.nim +++ b/codex/sales/salesdata.nim @@ -1,13 +1,12 @@ import pkg/chronos import ../contracts/requests import ../market -import ./availability +import ./reservations type SalesData* = ref object requestId*: RequestId ask*: StorageAsk - availability*: ?Availability # TODO: when availability persistence is added, change this to not optional request*: ?StorageRequest slotIndex*: UInt256 failed*: market.Subscription diff --git a/codex/sales/statemachine.nim b/codex/sales/statemachine.nim index bd824072..440e11a3 100644 --- a/codex/sales/statemachine.nim +++ b/codex/sales/statemachine.nim @@ -16,9 +16,6 @@ type SaleState* = ref object of State SaleError* = ref object of CodexError -method `$`*(state: SaleState): string {.base.} = - raiseAssert "not implemented" - method onCancelled*(state: SaleState, request: StorageRequest): ?State {.base, upraises:[].} = discard diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index 0e162c6f..d4e14729 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -1,3 +1,7 @@ +import pkg/chronicles +import pkg/questionable +import pkg/questionable/results +import ../../blocktype as bt import ../../market import ../salesagent import ../statemachine @@ -5,12 +9,15 @@ import ./errorhandling import ./cancelled import ./failed import ./filled +import ./ignored import ./proving +import ./errored type SaleDownloading* = ref object of ErrorHandlingState - failedSubscription: ?market.Subscription - hasCancelled: ?Future[void] + +logScope: + topics = "sales downloading" method `$`*(state: SaleDownloading): string = "SaleDownloading" @@ -28,6 +35,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} let agent = SalesAgent(machine) let data = agent.data let context = agent.context + let reservations = context.reservations await agent.retrieveRequest() await agent.subscribe() @@ -38,5 +46,49 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} without request =? data.request: raiseAssert "no sale request" - await onStore(request, data.slotIndex, data.availability) + without availability =? await reservations.find( + request.ask.slotSize, + request.ask.duration, + request.ask.pricePerSlot, + used = false): + info "no availability found for request, ignoring", + slotSize = request.ask.slotSize, + duration = request.ask.duration, + pricePerSlot = request.ask.pricePerSlot, + used = false + return some State(SaleIgnored()) + + # mark availability as used so that it is not matched to other requests + if markUsedErr =? (await reservations.markUsed(availability.id)).errorOption: + return some State(SaleErrored(error: markUsedErr)) + + proc onBatch(blocks: seq[bt.Block]) {.async.} = + # release batches of blocks as they are written to disk and + # update availability size + var bytes: uint = 0 + for blk in blocks: + bytes += blk.data.len.uint + + trace "Releasing batch of bytes written to disk", bytes + let r = await reservations.release(availability.id, bytes) + # `tryGet` will raise the exception that occurred during release, if there + # was one. The exception will be caught in the closure and sent to the + # SaleErrored state. + r.tryGet() + + template markUnused(id: AvailabilityId) = + if markUnusedErr =? (await reservations.markUnused(id)).errorOption: + return some State(SaleErrored(error: markUnusedErr)) + + trace "Starting download" + if err =? (await onStore(request, + data.slotIndex, + onBatch)).errorOption: + + markUnused(availability.id) + return some State(SaleErrored(error: err)) + + trace "Download complete" + + markUnused(availability.id) return some State(SaleProving()) diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index 10cded06..d8fb21d5 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -1,3 +1,5 @@ +import pkg/questionable +import pkg/questionable/results import pkg/upraises import pkg/chronicles import ../statemachine @@ -19,16 +21,8 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = if onClear =? context.onClear and request =? data.request and slotIndex =? data.slotIndex: - onClear(data.availability, request, slotIndex) + onClear(request, slotIndex) - # TODO: when availability persistence is added, change this to not optional - # NOTE: with this in place, restoring state for a restarted node will - # never free up availability once finished. Persisting availability - # on disk is required for this. - if onSaleErrored =? context.onSaleErrored and - availability =? data.availability: - onSaleErrored(availability) - - await agent.unsubscribe() + await agent.unsubscribe() error "Sale error", error=state.error.msg diff --git a/codex/sales/states/finished.nim b/codex/sales/states/finished.nim index 68d8a40f..73054d87 100644 --- a/codex/sales/states/finished.nim +++ b/codex/sales/states/finished.nim @@ -26,12 +26,6 @@ method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = context.proving.add(Slot(request: request, slotIndex: slotIndex)) if onSale =? context.onSale: - onSale(data.availability, request, slotIndex) - - # TODO: Keep track of contract completion using local clock. When contract - # has finished, we need to add back availability to the sales module. - # This will change when the state machine is updated to include the entire - # sales process, as well as when availability is persisted, so leaving it - # as a TODO for now. + onSale(request, slotIndex) await agent.unsubscribe() diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim new file mode 100644 index 00000000..c6f35e42 --- /dev/null +++ b/codex/sales/states/ignored.nim @@ -0,0 +1,18 @@ +import pkg/chronos +import ../statemachine +import ../salesagent +import ./errorhandling + +type + SaleIgnored* = ref object of ErrorHandlingState + +method `$`*(state: SaleIgnored): string = "SaleIgnored" + +method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} = + let agent = SalesAgent(machine) + let context = agent.context + + if onIgnored =? context.onIgnored: + onIgnored() + + await agent.unsubscribe() diff --git a/codex/stores/repostore.nim b/codex/stores/repostore.nim index 6a804276..935d5c46 100644 --- a/codex/stores/repostore.nim +++ b/codex/stores/repostore.nim @@ -64,6 +64,12 @@ iterator items*(q: BlockExpirationIter): Future[?BlockExpiration] = func totalUsed*(self: RepoStore): uint = (self.quotaUsedBytes + self.quotaReservedBytes) +func available*(self: RepoStore): uint = + return self.quotaMaxBytes - self.totalUsed + +func available*(self: RepoStore, bytes: uint): bool = + return bytes < self.available() + method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} = ## Get a block from the blockstore ## diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim index e5f74742..315ffc12 100644 --- a/codex/utils/asyncstatemachine.nim +++ b/codex/utils/asyncstatemachine.nim @@ -15,6 +15,12 @@ type State* = ref object of RootObj Event* = proc(state: State): ?State {.gcsafe, upraises:[].} +logScope: + topics = "statemachine" + +method `$`*(state: State): string {.base.} = + raiseAssert "not implemented" + proc transition(_: type Event, previous, next: State): Event = return proc (state: State): ?State = if state == previous: @@ -59,6 +65,7 @@ proc scheduler(machine: Machine) {.async.} = if not machine.running.isNil: await machine.running.cancelAndWait() machine.state = next + debug "enter state", state = machine.state machine.running = machine.run(machine.state) machine.running.addCallback(onRunComplete) except CancelledError: diff --git a/openapi.yaml b/openapi.yaml index 0fa0c2e8..f8d31e1c 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -293,6 +293,8 @@ paths: type: array items: $ref: "#/components/schemas/SalesAvailability" + "500": + description: Error getting unused availabilities "503": description: Sales are unavailable @@ -312,6 +314,8 @@ paths: application/json: schema: $ref: "#/components/schemas/SalesAvailability" + "500": + description: Error reserving availablility "503": description: Sales are unavailable diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index df5b8dc4..7df7c0fd 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -105,12 +105,15 @@ method getRequest(market: MockMarket, return some request return none StorageRequest -method getRequestFromSlotId*(market: MockMarket, - slotId: SlotId): Future[?StorageRequest] {.async.} = +method getActiveSlot*( + market: MockMarket, + slotId: SlotId): Future[?Slot] {.async.} = + for slot in market.filled: - if slotId(slot.requestId, slot.slotIndex) == slotId: - return await market.getRequest(slot.requestId) - return none StorageRequest + if slotId(slot.requestId, slot.slotIndex) == slotId and + request =? await market.getRequest(slot.requestId): + return some Slot(request: request, slotIndex: slot.slotIndex) + return none Slot method requestState*(market: MockMarket, requestId: RequestId): Future[?RequestState] {.async.} = @@ -118,11 +121,9 @@ method requestState*(market: MockMarket, method slotState*(market: MockMarket, slotId: SlotId): Future[SlotState] {.async.} = - if market.slotState.hasKey(slotId): - return market.slotState[slotId] - else: + if not market.slotState.hasKey(slotId): return SlotState.Free - + return market.slotState[slotId] method getRequestEnd*(market: MockMarket, id: RequestId): Future[SecondsSince1970] {.async.} = diff --git a/tests/codex/sales/helpers.nim b/tests/codex/sales/helpers.nim new file mode 100644 index 00000000..c2dd94a2 --- /dev/null +++ b/tests/codex/sales/helpers.nim @@ -0,0 +1,14 @@ +import pkg/chronos +import pkg/questionable +import pkg/questionable/results + +import pkg/codex/sales/reservations + +proc allAvailabilities*(r: Reservations): Future[seq[Availability]] {.async.} = + var ret: seq[Availability] = @[] + without availabilities =? (await r.availabilities), err: + raiseAssert "failed to get availabilities, error: " & err.msg + for a in availabilities: + if availability =? (await a): + ret.add availability + return ret diff --git a/tests/codex/sales/states/testfilled.nim b/tests/codex/sales/states/testfilled.nim index 02040e84..3e943d1b 100644 --- a/tests/codex/sales/states/testfilled.nim +++ b/tests/codex/sales/states/testfilled.nim @@ -29,7 +29,6 @@ suite "sales state 'filled'": agent = newSalesAgent(context, request.id, slotIndex, - Availability.none, StorageRequest.none) state = SaleFilled.new() diff --git a/tests/codex/sales/states/testunknown.nim b/tests/codex/sales/states/testunknown.nim index f9a9c0d5..d5e714e8 100644 --- a/tests/codex/sales/states/testunknown.nim +++ b/tests/codex/sales/states/testunknown.nim @@ -27,7 +27,6 @@ suite "sales state 'unknown'": agent = newSalesAgent(context, request.id, slotIndex, - Availability.none, StorageRequest.none) state = SaleUnknown.new() diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim new file mode 100644 index 00000000..a85e4db9 --- /dev/null +++ b/tests/codex/sales/testreservations.nim @@ -0,0 +1,188 @@ +import pkg/questionable +import pkg/questionable/results + +import pkg/chronos +import pkg/asynctest +import pkg/datastore +import pkg/json_serialization +import pkg/json_serialization/std/options +import pkg/stew/byteutils + +import pkg/codex/stores +import pkg/codex/sales + +import ../examples +import ./helpers + +suite "Reservations module": + + var + repo: RepoStore + repoDs: Datastore + metaDs: SQLiteDatastore + availability: Availability + reservations: Reservations + + setup: + repoDs = SQLiteDatastore.new(Memory).tryGet() + metaDs = SQLiteDatastore.new(Memory).tryGet() + repo = RepoStore.new(repoDs, metaDs) + reservations = Reservations.new(repo) + availability = Availability.example + + test "availability can be serialised and deserialised": + let availability = Availability.example + let serialised = availability.toJson + check Json.decode(serialised, Availability) == availability + + test "has no availability initially": + check (await reservations.allAvailabilities()).len == 0 + + test "generates unique ids for storage availability": + let availability1 = Availability.init(1.u256, 2.u256, 3.u256) + let availability2 = Availability.init(1.u256, 2.u256, 3.u256) + check availability1.id != availability2.id + + test "can reserve available storage": + let availability1 = Availability.example + let availability2 = Availability.example + check isOk await reservations.reserve(availability1) + check isOk await reservations.reserve(availability2) + + let availabilities = await reservations.allAvailabilities() + check: + # perform unordered checks + availabilities.len == 2 + availabilities.contains(availability1) + availabilities.contains(availability2) + + test "reserved availability exists": + check isOk await reservations.reserve(availability) + + without exists =? await reservations.exists(availability.id): + fail() + + check exists + + test "reserved availability can be partially released": + let size = availability.size.truncate(uint) + check isOk await reservations.reserve(availability) + check isOk await reservations.release(availability.id, size - 1) + + without a =? await reservations.get(availability.id): + fail() + + check a.size == 1 + + test "availability is deleted after being fully released": + let size = availability.size.truncate(uint) + check isOk await reservations.reserve(availability) + check isOk await reservations.release(availability.id, size) + + without exists =? await reservations.exists(availability.id): + fail() + + check not exists + + test "non-existant availability cannot be released": + let size = availability.size.truncate(uint) + let r = await reservations.release(availability.id, size - 1) + check r.error of AvailabilityGetFailedError + check r.error.msg == "Availability does not exist" + + test "added availability is not used initially": + check isOk await reservations.reserve(availability) + + without available =? await reservations.get(availability.id): + fail() + + check not available.used + + test "availability can be marked used": + check isOk await reservations.reserve(availability) + + check isOk await reservations.markUsed(availability.id) + + without available =? await reservations.get(availability.id): + fail() + + check available.used + + test "availability can be marked unused": + check isOk await reservations.reserve(availability) + + check isOk await reservations.markUsed(availability.id) + check isOk await reservations.markUnused(availability.id) + + without available =? await reservations.get(availability.id): + fail() + + check not available.used + + test "used availability can be found": + check isOk await reservations.reserve(availability) + + check isOk await reservations.markUsed(availability.id) + + without available =? await reservations.find(availability.size, + availability.duration, availability.minPrice, used = true): + + fail() + + test "unused availability can be found": + check isOk await reservations.reserve(availability) + + without available =? await reservations.find(availability.size, + availability.duration, availability.minPrice, used = false): + + fail() + + test "non-existant availability cannot be found": + check isNone (await reservations.find(availability.size, + availability.duration, availability.minPrice, used = false)) + + test "non-existant availability cannot be retrieved": + let r = await reservations.get(availability.id) + check r.error of AvailabilityGetFailedError + check r.error.msg == "Availability does not exist" + + test "same availability cannot be reserved twice": + check isOk await reservations.reserve(availability) + let r = await reservations.reserve(availability) + check r.error of AvailabilityAlreadyExistsError + + test "can get available bytes in repo": + check reservations.available == DefaultQuotaBytes + + test "reserving availability reduces available bytes": + check isOk await reservations.reserve(availability) + check reservations.available == + DefaultQuotaBytes - availability.size.truncate(uint) + + test "reports quota available to be reserved": + check reservations.hasAvailable(availability.size.truncate(uint)) + + test "reports quota not available to be reserved": + repo = RepoStore.new(repoDs, metaDs, + quotaMaxBytes = availability.size.truncate(uint) - 1) + reservations = Reservations.new(repo) + check not reservations.hasAvailable(availability.size.truncate(uint)) + + test "fails to reserve availability with size that is larger than available quota": + repo = RepoStore.new(repoDs, metaDs, + quotaMaxBytes = availability.size.truncate(uint) - 1) + reservations = Reservations.new(repo) + let r = await reservations.reserve(availability) + check r.error of AvailabilityReserveFailedError + check r.error.parent of QuotaNotEnoughError + check exists =? (await reservations.exists(availability.id)) and not exists + + test "fails to release availability size that is larger than available quota": + let size = availability.size.truncate(uint) + repo = RepoStore.new(repoDs, metaDs, + quotaMaxBytes = size) + reservations = Reservations.new(repo) + discard await reservations.reserve(availability) + let r = await reservations.release(availability.id, size + 1) + check r.error of AvailabilityReleaseFailedError + check r.error.parent.msg == "Cannot release this many bytes" diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index e71ca014..78df8997 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -4,99 +4,135 @@ import std/sugar import std/times import pkg/asynctest import pkg/chronos +import pkg/datastore +import pkg/questionable +import pkg/questionable/results import pkg/codex/sales import pkg/codex/sales/salesdata +import pkg/codex/sales/salescontext +import pkg/codex/sales/reservations +import pkg/codex/stores/repostore import pkg/codex/proving +import pkg/codex/blocktype as bt +import pkg/codex/node import ../helpers/mockmarket import ../helpers/mockclock import ../helpers/eventually import ../examples +import ./helpers suite "Sales": - let availability = Availability.init( - size=100.u256, - duration=60.u256, - minPrice=600.u256 - ) - var request = StorageRequest( - ask: StorageAsk( - slots: 4, - slotSize: 100.u256, - duration: 60.u256, - reward: 10.u256, - ), - content: StorageContent( - cid: "some cid" - ), - expiry: (getTime() + initDuration(hours=1)).toUnix.u256 - ) let proof = exampleProof() + var availability: Availability + var request: StorageRequest var sales: Sales var market: MockMarket var clock: MockClock var proving: Proving + var reservations: Reservations + var repo: RepoStore setup: + availability = Availability.init( + size=100.u256, + duration=60.u256, + minPrice=600.u256 + ) + request = StorageRequest( + ask: StorageAsk( + slots: 4, + slotSize: 100.u256, + duration: 60.u256, + reward: 10.u256, + ), + content: StorageContent( + cid: "some cid" + ), + expiry: (getTime() + initDuration(hours=1)).toUnix.u256 + ) + market = MockMarket.new() clock = MockClock.new() proving = Proving.new() - sales = Sales.new(market, clock, proving) + let repoDs = SQLiteDatastore.new(Memory).tryGet() + let metaDs = SQLiteDatastore.new(Memory).tryGet() + repo = RepoStore.new(repoDs, metaDs) + await repo.start() + sales = Sales.new(market, clock, proving, repo) + reservations = sales.context.reservations sales.onStore = proc(request: StorageRequest, slot: UInt256, - availability: ?Availability) {.async.} = - discard + onBatch: BatchProc): Future[?!void] {.async.} = + return success() proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} = return proof await sales.start() request.expiry = (clock.now() + 42).u256 teardown: + await repo.stop() await sales.stop() - test "has no availability initially": - check sales.available.len == 0 + proc getAvailability: ?!Availability = + waitFor reservations.get(availability.id) - test "can add available storage": - let availability1 = Availability.example - let availability2 = Availability.example - sales.add(availability1) - check sales.available.contains(availability1) - sales.add(availability2) - check sales.available.contains(availability1) - check sales.available.contains(availability2) + proc wasIgnored: Future[bool] {.async.} = + return + eventually sales.agents.len == 1 and # agent created at first + eventually sales.agents.len == 0 # then removed once ignored - test "can remove available storage": - sales.add(availability) - sales.remove(availability) - check sales.available.len == 0 + test "makes storage unavailable when downloading a matched request": + var used = false + sales.onStore = proc(request: StorageRequest, + slot: UInt256, + onBatch: BatchProc): Future[?!void] {.async.} = + without avail =? await reservations.get(availability.id): + fail() + used = avail.used + return success() - test "generates unique ids for storage availability": - let availability1 = Availability.init(1.u256, 2.u256, 3.u256) - let availability2 = Availability.init(1.u256, 2.u256, 3.u256) - check availability1.id != availability2.id - - test "makes storage unavailable when matching request comes in": - sales.add(availability) + check isOk await reservations.reserve(availability) await market.requestStorage(request) - check eventually sales.available.len == 0 + check eventually used - test "ignores request when no matching storage is available": - sales.add(availability) - var tooBig = request - tooBig.ask.slotSize = request.ask.slotSize + 1 - await market.requestStorage(tooBig) - await sleepAsync(1.millis) - check eventually sales.available == @[availability] + test "reduces remaining availability size after download": + let blk = bt.Block.example + request.ask.slotSize = blk.data.len.u256 + availability.size = request.ask.slotSize + 1 + sales.onStore = proc(request: StorageRequest, + slot: UInt256, + onBatch: BatchProc): Future[?!void] {.async.} = + await onBatch(@[blk]) + return success() + check isOk await reservations.reserve(availability) + await market.requestStorage(request) + check eventually getAvailability().?size == success 1.u256 + + test "ignores download when duration not long enough": + availability.duration = request.ask.duration - 1 + check isOk await reservations.reserve(availability) + await market.requestStorage(request) + check await wasIgnored() + + test "ignores request when slot size is too small": + availability.size = request.ask.slotSize - 1 + check isOk await reservations.reserve(availability) + await market.requestStorage(request) + check await wasIgnored() test "ignores request when reward is too low": - sales.add(availability) - var tooCheap = request - tooCheap.ask.reward = request.ask.reward - 1 - await market.requestStorage(tooCheap) - await sleepAsync(1.millis) - check eventually sales.available == @[availability] + availability.minPrice = request.ask.pricePerSlot + 1 + check isOk await reservations.reserve(availability) + await market.requestStorage(request) + check await wasIgnored() + + test "availability remains unused when request is ignored": + availability.minPrice = request.ask.pricePerSlot + 1 + check isOk await reservations.reserve(availability) + await market.requestStorage(request) + check getAvailability().?used == success false test "retrieves and stores data locally": var storingRequest: StorageRequest @@ -104,30 +140,26 @@ suite "Sales": var storingAvailability: Availability sales.onStore = proc(request: StorageRequest, slot: UInt256, - availability: ?Availability) {.async.} = + onBatch: BatchProc): Future[?!void] {.async.} = storingRequest = request storingSlot = slot - check availability.isSome - storingAvailability = !availability - sales.add(availability) + return success() + check isOk await reservations.reserve(availability) await market.requestStorage(request) check eventually storingRequest == request check storingSlot < request.ask.slots.u256 - check storingAvailability == availability test "handles errors during state run": var saleFailed = false - sales.onStore = proc(request: StorageRequest, - slot: UInt256, - availability: ?Availability) {.async.} = - # raise an exception so machine.onError is called + proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} = + # raise exception so machine.onError is called raise newException(ValueError, "some error") - # onSaleErrored is called in SaleErrored.run - proc onSaleErrored(availability: Availability) = + # onClear is called in SaleErrored.run + sales.onClear = proc(request: StorageRequest, + idx: UInt256) = saleFailed = true - sales.context.onSaleErrored = some onSaleErrored - sales.add(availability) + check isOk await reservations.reserve(availability) await market.requestStorage(request) check eventually saleFailed @@ -135,12 +167,12 @@ suite "Sales": let error = newException(IOError, "data retrieval failed") sales.onStore = proc(request: StorageRequest, slot: UInt256, - availability: ?Availability) {.async.} = - raise error - sales.add(availability) + onBatch: BatchProc): Future[?!void] {.async.} = + return failure(error) + check isOk await reservations.reserve(availability) await market.requestStorage(request) - await sleepAsync(1.millis) - check eventually sales.available == @[availability] + check eventually getAvailability().?used == success false + check getAvailability().?size == success availability.size test "generates proof of storage": var provingRequest: StorageRequest @@ -148,13 +180,13 @@ suite "Sales": proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} = provingRequest = slot.request provingSlot = slot.slotIndex - sales.add(availability) + check isOk await reservations.reserve(availability) await market.requestStorage(request) check eventually provingRequest == request check provingSlot < request.ask.slots.u256 test "fills a slot": - sales.add(availability) + check isOk await reservations.reserve(availability) await market.requestStorage(request) check eventually market.filled.len == 1 check market.filled[0].requestId == request.id @@ -166,14 +198,13 @@ suite "Sales": var soldAvailability: Availability var soldRequest: StorageRequest var soldSlotIndex: UInt256 - sales.onSale = proc(availability: ?Availability, - request: StorageRequest, + sales.onSale = proc(request: StorageRequest, slotIndex: UInt256) = if a =? availability: soldAvailability = a soldRequest = request soldSlotIndex = slotIndex - sales.add(availability) + check isOk await reservations.reserve(availability) await market.requestStorage(request) check eventually soldAvailability == availability check soldRequest == request @@ -184,55 +215,48 @@ suite "Sales": # which then calls the onClear callback proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} = raise newException(IOError, "proof failed") - var clearedAvailability: Availability var clearedRequest: StorageRequest var clearedSlotIndex: UInt256 - sales.onClear = proc(availability: ?Availability, - request: StorageRequest, + sales.onClear = proc(request: StorageRequest, slotIndex: UInt256) = - if a =? availability: - clearedAvailability = a clearedRequest = request clearedSlotIndex = slotIndex - sales.add(availability) + check isOk await reservations.reserve(availability) await market.requestStorage(request) - check eventually clearedAvailability == availability - check clearedRequest == request + check eventually clearedRequest == request check clearedSlotIndex < request.ask.slots.u256 test "makes storage available again when other host fills the slot": let otherHost = Address.example sales.onStore = proc(request: StorageRequest, slot: UInt256, - availability: ?Availability) {.async.} = + onBatch: BatchProc): Future[?!void] {.async.} = await sleepAsync(chronos.hours(1)) - sales.add(availability) + return success() + check isOk await reservations.reserve(availability) await market.requestStorage(request) - await sleepAsync(1.millis) for slotIndex in 0.. 0 and newSize < size.u256 diff --git a/tests/integration/testproofs.nim b/tests/integration/testproofs.nim index 06962571..81042aaa 100644 --- a/tests/integration/testproofs.nim +++ b/tests/integration/testproofs.nim @@ -11,7 +11,8 @@ twonodessuite "Proving integration test", debug1=false, debug2=false: var config: MarketplaceConfig setup: - marketplace = Marketplace.new(!deployment().address(Marketplace), provider) + let deployment = Deployment.init() + marketplace = Marketplace.new(!deployment.address(Marketplace), provider) config = await marketplace.config() await provider.getSigner(accounts[0]).mint() await provider.getSigner(accounts[1]).mint() diff --git a/tests/integration/tokens.nim b/tests/integration/tokens.nim index dd114e96..eb171e01 100644 --- a/tests/integration/tokens.nim +++ b/tests/integration/tokens.nim @@ -5,13 +5,15 @@ import ../contracts/token proc mint*(signer: Signer, amount = 1_000_000.u256) {.async.} = ## Mints a considerable amount of tokens and approves them for transfer to ## the Marketplace contract. - let token = TestToken.new(!deployment().address(TestToken), signer) - let marketplace = Marketplace.new(!deployment().address(Marketplace), signer) + let deployment = Deployment.init() + let token = TestToken.new(!deployment.address(TestToken), signer) + let marketplace = Marketplace.new(!deployment.address(Marketplace), signer) await token.mint(await signer.getAddress(), amount) proc deposit*(signer: Signer) {.async.} = ## Deposits sufficient collateral into the Marketplace contract. - let marketplace = Marketplace.new(!deployment().address(Marketplace), signer) + let deployment = Deployment.init() + let marketplace = Marketplace.new(!deployment.address(Marketplace), signer) let config = await marketplace.config() let tokenAddress = await marketplace.token() let token = Erc20Token.new(tokenAddress, signer) diff --git a/tests/testContracts.nim b/tests/testContracts.nim index 0e8fde1d..28afaa23 100644 --- a/tests/testContracts.nim +++ b/tests/testContracts.nim @@ -2,6 +2,7 @@ import ./contracts/testCollateral import ./contracts/testContracts import ./contracts/testMarket import ./contracts/testProofs +import ./contracts/testDeployment import ./contracts/testInteractions import ./contracts/testClock diff --git a/vendor/codex-contracts-eth b/vendor/codex-contracts-eth index cde54362..fb76f7d0 160000 --- a/vendor/codex-contracts-eth +++ b/vendor/codex-contracts-eth @@ -1 +1 @@ -Subproject commit cde543626236bd48188354d842cbe1513052c560 +Subproject commit fb76f7d0b2f94914b00f2a0f4136ebfb27df6abc diff --git a/vendor/questionable b/vendor/questionable index 30e4184a..6cbbda7e 160000 --- a/vendor/questionable +++ b/vendor/questionable @@ -1 +1 @@ -Subproject commit 30e4184a99c8c1ba329925912d2c5d4b09acf8cc +Subproject commit 6cbbda7e4d009e02d0583b325b31dc68dff27854