[marketplace] Add Reservations Module (#340)

* [marketplace] reservations module

- add de/serialization for Availability
- add markUsed/markUnused in persisted availability
- add query for unused
- add reserve/release
- reservation module tests
- split ContractInteractions into client contracts and host contracts
- remove reservations start/stop as the repo start/stop is being managed by the node
- remove dedicated reservations metadata store and use the metadata store from the repo instead
- Split ContractInteractions into:
  - ClientInteractions (with purchasing)
  - HostInteractions (with sales and proving)
- compilation fix for nim 1.2

[repostore] fix started flag, add tests

[marketplace] persist slot index
For loading the sales state from chain, the slot index was not previously persisted in the contract. Will retrieve the slot index from the contract when the sales state is loaded.

* Revert repostore changes

In favour of separate PR https://github.com/status-im/nim-codex/pull/374.

* remove warnings

* clean up

* tests: stop repostore during teardown

* change constructor type identifier

Change Contracts constructor to accept Contracts type instead of ContractInteractions.

* change constructor return type to Result instead of Option

* fix and split interactions tests

* clean up, fix tests

* find availability by slot id

* remove duplication in host/client interactions

* add test for finding availability by slotId

* log instead of raiseAssert when failed to mark availability as unused

* move to SaleErrored state instead of raiseAssert

* remove unneeded reverse

It appears that order is not preserved in the repostore, so reversing does not have the intended effect here.

* update open api spec for potential rest endpoint errors

* move functions about available bytes to repostore

* WIP: reserve and release availabilities as needed

WIP: not tested yet

Availabilities are marked as used when matched (just before downloading starts) so that future matching logic does not match an availability currently in use.

As the download progresses, batches of blocks are written to disk, and the equivalent bytes are released from the reservation module. The size of the availability is reduced as well.

During a reserve or release operation, availability updates occur after the repo is manipulated. If the availability update operation fails, the reserve or release is rolled back to maintain correct accounting of bytes.

Finally, once download completes, or if an error occurs, the availability is marked as unused so future matching can occur.

* delete availability when all bytes released

* fix tests + cleanup

* remove availability from SalesContext callbacks

Availability is no longer used past the SaleDownloading state in the state machine. Cleanup of Availability (marking unused) is handled directly in the SaleDownloading state, and no longer in SaleErrored or SaleFinished. Likewise, Availabilities shouldn’t need to be handled on node restart.

Additionally, Availability was being passed in SalesContext callbacks, and now that Availability is only used temporarily in the SaleDownloading state, Availability is contextually irrelevant to the callbacks, except in OnStore possibly, though it was not being consumed.

* test clean up

* - remove availability from callbacks and constructors from previous commit that needed to be removed (oopsie)
- fix integration test that checks availabilities
  - there was a bug fixed that crashed the node due to a missing `return success` in onStore
  - the test was fixed by ensuring that availabilities are remaining on the node, and the size has been reduced
- change Availability back to non-ref object and constructor back to init
- add trace logging of all state transitions in state machine
- add generally useful trace logging

* fixes after rebase

1. Fix onProve callbacks
2. Use Slot type instead of tuple for retrieving active slot.
3. Bump codex-contracts-eth that exposes getActivceSlot call.

* swap contracts branch to not support slot collateral

Slot collateral changes in the contracts require further changes in the client code, so we’ll skip those changes for now and add in a separate commit.

* modify Interactions and Deployment constructors

- `HostInteractions` and `ClientInteractions` constructors were simplified to take a contract address and no overloads
- `Interactions` prepared simplified so there are no overloads
- `Deployment` constructor updated so that it takes an optional string parameter, instead `Option[string]`

* Move `batchProc` declaration

`batchProc` needs to be consumed by both `node` and `salescontext`, and they can’t reference each other as it creates a circular dependency.

* [reservations] rename `available` to `hasAvailable`

* [reservations] default error message to inner error msg

* add SaleIngored state

When a storage request is handled but the request does match availabilities, the sales agent machine is sent to the SaleIgnored state. In addition, the agent is constructed in a way that if the request is ignored, the sales agent is removed from the list of active agents being tracked in the sales module.
This commit is contained in:
Eric Mastro 2023-04-04 17:05:16 +10:00 committed by GitHub
parent 3198db414d
commit ccf349bd14
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
49 changed files with 1148 additions and 385 deletions

View File

@ -96,7 +96,10 @@ proc stop*(s: CodexServer) {.async.} =
s.runHandle.complete()
proc new(_: type ContractInteractions, config: CodexConf): ?ContractInteractions =
proc new(_: type Contracts,
config: CodexConf,
repo: RepoStore): Contracts =
if not config.persistence:
if config.ethAccount.isSome:
warn "Ethereum account was set, but persistence is not enabled"
@ -106,10 +109,31 @@ proc new(_: type ContractInteractions, config: CodexConf): ?ContractInteractions
error "Persistence enabled, but no Ethereum account was set"
quit QuitFailure
if deployment =? config.ethDeployment:
ContractInteractions.new(config.ethProvider, account, deployment)
else:
ContractInteractions.new(config.ethProvider, account)
var deploy: Deployment
try:
if deployFile =? config.ethDeployment:
deploy = Deployment.init(deployFile)
else:
deploy = Deployment.init()
except IOError as e:
error "Unable to read deployment json"
quit QuitFailure
without marketplaceAddress =? deploy.address(Marketplace):
error "Marketplace contract address not found in deployment file"
quit QuitFailure
# TODO: at some point there may be cli options that enable client-only or host-only
# operation, and both client AND host will not necessarily need to be instantiated
let client = ClientInteractions.new(config.ethProvider,
account,
marketplaceAddress)
let host = HostInteractions.new(config.ethProvider,
account,
repo,
marketplaceAddress)
(client.option, host.option)
proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey): T =
@ -182,7 +206,7 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey):
engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, repoStore)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
contracts = ContractInteractions.new(config)
contracts = Contracts.new(config, repoStore)
codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery, contracts)
restServer = RestServerRef.new(
codexNode.initRestApi(config),

View File

@ -12,7 +12,7 @@ const defaultFile = "vendor" / "codex-contracts-eth" / "deployment-localhost.jso
## been exported with Hardhat deploy.
## See also:
## https://github.com/wighawag/hardhat-deploy/tree/master#6-hardhat-export
proc deployment*(file = defaultFile): Deployment =
proc init*(_: type Deployment, file: string = defaultFile): Deployment =
Deployment(json: parseFile(file))
proc address*(deployment: Deployment, Contract: typedesc): ?Address =

View File

@ -1,78 +1,5 @@
import pkg/ethers
import pkg/chronicles
import ../purchasing
import ../sales
import ../proving
import ./deployment
import ./marketplace
import ./market
import ./proofs
import ./clock
import ./interactions/interactions
import ./interactions/hostinteractions
import ./interactions/clientinteractions
export purchasing
export sales
export proving
export chronicles
type
ContractInteractions* = ref object
purchasing*: Purchasing
sales*: Sales
proving*: Proving
clock: OnChainClock
proc new*(_: type ContractInteractions,
signer: Signer,
deployment: Deployment): ?ContractInteractions =
without address =? deployment.address(Marketplace):
error "Unable to determine address of the Marketplace smart contract"
return none ContractInteractions
let contract = Marketplace.new(address, signer)
let market = OnChainMarket.new(contract)
let proofs = OnChainProofs.new(contract)
let clock = OnChainClock.new(signer.provider)
let proving = Proving.new(proofs, clock)
some ContractInteractions(
purchasing: Purchasing.new(market, clock),
sales: Sales.new(market, clock, proving),
proving: proving,
clock: clock
)
proc new*(_: type ContractInteractions,
providerUrl: string,
account: Address,
deploymentFile: string = string.default): ?ContractInteractions =
let provider = JsonRpcProvider.new(providerUrl)
let signer = provider.getSigner(account)
var deploy: Deployment
try:
if deploymentFile == string.default:
deploy = deployment()
else:
deploy = deployment(deploymentFile)
except IOError as e:
error "Unable to read deployment json", msg = e.msg
return none ContractInteractions
ContractInteractions.new(signer, deploy)
proc new*(_: type ContractInteractions,
account: Address): ?ContractInteractions =
ContractInteractions.new("ws://localhost:8545", account)
proc start*(interactions: ContractInteractions) {.async.} =
await interactions.clock.start()
await interactions.sales.start()
await interactions.proving.start()
await interactions.purchasing.start()
proc stop*(interactions: ContractInteractions) {.async.} =
await interactions.purchasing.stop()
await interactions.sales.stop()
await interactions.proving.stop()
await interactions.clock.stop()
export interactions, hostinteractions, clientinteractions

View File

@ -0,0 +1,38 @@
import pkg/ethers
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import ../../purchasing
import ../marketplace
import ../market
import ../proofs
import ../clock
import ./interactions
export purchasing
export chronicles
type
ClientInteractions* = ref object of ContractInteractions
purchasing*: Purchasing
proc new*(_: type ClientInteractions,
providerUrl: string,
account: Address,
contractAddress: Address): ?!ClientInteractions =
without prepared =? prepare(providerUrl, account, contractAddress), error:
return failure(error)
let c = ClientInteractions.new(prepared.clock)
c.purchasing = Purchasing.new(prepared.market, prepared.clock)
return success(c)
proc start*(self: ClientInteractions) {.async.} =
await procCall ContractInteractions(self).start()
await self.purchasing.start()
proc stop*(self: ClientInteractions) {.async.} =
await self.purchasing.stop()
await procCall ContractInteractions(self).stop()

View File

@ -0,0 +1,46 @@
import pkg/ethers
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import ../../sales
import ../../proving
import ../../stores
import ../proofs
import ./interactions
export sales
export proving
export chronicles
type
HostInteractions* = ref object of ContractInteractions
sales*: Sales
proving*: Proving
proc new*(_: type HostInteractions,
providerUrl: string,
account: Address,
repo: RepoStore,
contractAddress: Address): ?!HostInteractions =
without prepared =? prepare(providerUrl, account, contractAddress), error:
return failure(error)
let proofs = OnChainProofs.new(prepared.contract)
let proving = Proving.new(proofs, prepared.clock)
let h = HostInteractions.new(prepared.clock)
h.sales = Sales.new(prepared.market, prepared.clock, proving, repo)
h.proving = proving
return success(h)
method start*(self: HostInteractions) {.async.} =
await procCall ContractInteractions(self).start()
await self.sales.start()
await self.proving.start()
method stop*(self: HostInteractions) {.async.} =
await self.sales.stop()
await self.proving.stop()
await procCall ContractInteractions(self).start()

View File

@ -0,0 +1,36 @@
import pkg/ethers
import ../../errors
import ../clock
import ../marketplace
import ../market
type
ContractInteractions* = ref object of RootObj
clock: OnChainClock
ContractInteractionsError* = object of CodexError
ReadDeploymentFileFailureError* = object of ContractInteractionsError
ContractAddressError* = object of ContractInteractionsError
proc new*(T: type ContractInteractions,
clock: OnChainClock): T =
T(clock: clock)
proc prepare*(
providerUrl: string = "ws://localhost:8545",
account, contractAddress: Address):
?!tuple[contract: Marketplace, market: OnChainMarket, clock: OnChainClock] =
let provider = JsonRpcProvider.new(providerUrl)
let signer = provider.getSigner(account)
let contract = Marketplace.new(contractAddress, signer)
let market = OnChainMarket.new(contract)
let clock = OnChainClock.new(signer.provider)
return success((contract, market, clock))
method start*(self: ContractInteractions) {.async, base.} =
await self.clock.start()
method stop*(self: ContractInteractions) {.async, base.} =
await self.clock.stop()

View File

@ -79,13 +79,15 @@ method getHost(market: OnChainMarket,
else:
return none Address
method getRequestFromSlotId*(market: OnChainMarket,
slotId: SlotId): Future[?StorageRequest] {.async.} =
method getActiveSlot*(
market: OnChainMarket,
slotId: SlotId): Future[?Slot] {.async.} =
try:
return some await market.contract.getRequestFromSlotId(slotId)
return some await market.contract.getActiveSlot(slotId)
except ProviderError as e:
if e.revertReason.contains("Slot is free"):
return none StorageRequest
return none Slot
raise e
method fillSlot(market: OnChainMarket,

View File

@ -48,7 +48,7 @@ proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId) {.contract.}
proc freeSlot*(marketplace: Marketplace, id: SlotId) {.contract.}
proc getRequest*(marketplace: Marketplace, id: RequestId): StorageRequest {.contract, view.}
proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.}
proc getRequestFromSlotId*(marketplace: Marketplace, id: SlotId): StorageRequest {.contract, view.}
proc getActiveSlot*(marketplace: Marketplace, id: SlotId): Slot {.contract, view.}
proc myRequests*(marketplace: Marketplace): seq[RequestId] {.contract, view.}
proc mySlots*(marketplace: Marketplace): seq[SlotId] {.contract, view.}

View File

@ -71,6 +71,12 @@ func fromTuple(_: type StorageRequest, tupl: tuple): StorageRequest =
nonce: tupl[4]
)
func fromTuple(_: type Slot, tupl: tuple): Slot =
Slot(
request: tupl[0],
slotIndex: tupl[1]
)
func fromTuple(_: type StorageAsk, tupl: tuple): StorageAsk =
StorageAsk(
slots: tupl[0],
@ -133,6 +139,9 @@ func encode*(encoder: var AbiEncoder, id: RequestId | SlotId | Nonce) =
func encode*(encoder: var AbiEncoder, request: StorageRequest) =
encoder.write(request.fieldValues)
func encode*(encoder: var AbiEncoder, request: Slot) =
encoder.write(request.fieldValues)
func decode*(decoder: var AbiDecoder, T: type StoragePoR): ?!T =
let tupl = ?decoder.read(StoragePoR.fieldTypes)
success StoragePoR.fromTuple(tupl)
@ -153,6 +162,10 @@ func decode*(decoder: var AbiDecoder, T: type StorageRequest): ?!T =
let tupl = ?decoder.read(StorageRequest.fieldTypes)
success StorageRequest.fromTuple(tupl)
func decode*(decoder: var AbiDecoder, T: type Slot): ?!T =
let tupl = ?decoder.read(Slot.fieldTypes)
success Slot.fromTuple(tupl)
func id*(request: StorageRequest): RequestId =
let encoding = AbiEncoder.encode((request, ))
RequestId(keccak256.digest(encoding).data)

View File

@ -58,8 +58,10 @@ method getHost*(market: Market,
slotIndex: UInt256): Future[?Address] {.base, async.} =
raiseAssert("not implemented")
method getRequestFromSlotId*(market: Market,
slotId: SlotId): Future[?StorageRequest] {.base, async.} =
method getActiveSlot*(
market: Market,
slotId: SlotId): Future[?Slot] {.base, async.} =
raiseAssert("not implemented")
method fillSlot*(market: Market,

View File

@ -31,6 +31,9 @@ import ./streams
import ./erasure
import ./discovery
import ./contracts
import ./node/batch
export batch
logScope:
topics = "codex node"
@ -39,10 +42,12 @@ const
FetchBatch = 200
type
BatchProc* = proc(blocks: seq[bt.Block]): Future[void] {.gcsafe, raises: [Defect].}
CodexError = object of CatchableError
Contracts* = tuple
client: ?ClientInteractions
host: ?HostInteractions
CodexNodeRef* = ref object
switch*: Switch
networkId*: PeerId
@ -50,7 +55,7 @@ type
engine*: BlockExcEngine
erasure*: Erasure
discovery*: Discovery
contracts*: ?ContractInteractions
contracts*: Contracts
proc findPeer*(
node: CodexNodeRef,
@ -251,7 +256,7 @@ proc requestStorage*(self: CodexNodeRef,
##
trace "Received a request for storage!", cid, duration, nodes, tolerance, reward
without contracts =? self.contracts:
without contracts =? self.contracts.client:
trace "Purchasing not available"
return failure "Purchasing not available"
@ -309,7 +314,7 @@ proc new*(
engine: BlockExcEngine,
erasure: Erasure,
discovery: Discovery,
contracts = ContractInteractions.none): T =
contracts: Contracts = (ClientInteractions.none, HostInteractions.none)): T =
T(
switch: switch,
blockStore: store,
@ -331,46 +336,56 @@ proc start*(node: CodexNodeRef) {.async.} =
if not node.discovery.isNil:
await node.discovery.start()
if contracts =? node.contracts:
if hostContracts =? node.contracts.host:
# TODO: remove Sales callbacks, pass BlockStore and StorageProofs instead
contracts.sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
hostContracts.sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
## store data in local storage
##
without cid =? Cid.init(request.content.cid):
trace "Unable to parse Cid", cid
raise newException(CodexError, "Unable to parse Cid")
let error = newException(CodexError, "Unable to parse Cid")
return failure(error)
without manifest =? await node.fetchManifest(cid), error:
trace "Unable to fetch manifest for cid", cid
raise error
return failure(error)
trace "Fetching block for manifest", cid
# TODO: This will probably require a call to `getBlock` either way,
# since fetching of blocks will have to be selective according
# to a combination of parameters, such as node slot position
# and dataset geometry
let fetchRes = await node.fetchBatched(manifest)
if fetchRes.isErr:
raise newException(CodexError, "Unable to retrieve blocks")
if fetchErr =? (await node.fetchBatched(manifest, onBatch = onBatch)).errorOption:
let error = newException(CodexError, "Unable to retrieve blocks")
error.parent = fetchErr
return failure(error)
contracts.sales.onClear = proc(availability: ?Availability,
request: StorageRequest,
slotIndex: UInt256) =
return success()
hostContracts.sales.onClear = proc(request: StorageRequest,
slotIndex: UInt256) =
# TODO: remove data from local storage
discard
contracts.proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} =
hostContracts.proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} =
# TODO: generate proof
return @[42'u8]
try:
await contracts.start()
await hostContracts.start()
except CatchableError as error:
error "Unable to start contract interactions: ", error=error.msg
node.contracts = ContractInteractions.none
error "Unable to start host contract interactions: ", error=error.msg
node.contracts.host = HostInteractions.none
if clientContracts =? node.contracts.client:
try:
await clientContracts.start()
except CatchableError as error:
error "Unable to start client contract interactions: ", error=error.msg
node.contracts.client = ClientInteractions.none
node.networkId = node.switch.peerInfo.peerId
notice "Started codex node", id = $node.networkId, addrs = node.switch.peerInfo.addrs
@ -390,8 +405,11 @@ proc stop*(node: CodexNodeRef) {.async.} =
if not node.discovery.isNil:
await node.discovery.stop()
if contracts =? node.contracts:
await contracts.stop()
if clientContracts =? node.contracts.client:
await clientContracts.stop()
if hostContracts =? node.contracts.host:
await hostContracts.stop()
if not node.blockStore.isNil:
await node.blockStore.close

6
codex/node/batch.nim Normal file
View File

@ -0,0 +1,6 @@
import pkg/chronos
import pkg/upraises
import ../blocktype as bt
type
BatchProc* = proc(blocks: seq[bt.Block]): Future[void] {.gcsafe, upraises:[].}

View File

@ -255,10 +255,13 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
"/api/codex/v1/sales/availability") do () -> RestApiResponse:
## Returns storage that is for sale
without contracts =? node.contracts:
without contracts =? node.contracts.host:
return RestApiResponse.error(Http503, "Sales unavailable")
let json = %contracts.sales.available
without unused =? (await contracts.sales.context.reservations.unused), err:
return RestApiResponse.error(Http500, err.msg)
let json = %unused
return RestApiResponse.response($json)
router.rawApi(
@ -270,7 +273,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
## duration - maximum time the storage should be sold for (in seconds)
## minPrice - minimum price to be paid (in amount of tokens)
without contracts =? node.contracts:
without contracts =? node.contracts.host:
return RestApiResponse.error(Http503, "Sales unavailable")
let body = await request.getBody()
@ -278,7 +281,13 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
without availability =? Availability.fromJson(body), error:
return RestApiResponse.error(Http400, error.msg)
contracts.sales.add(availability)
let reservations = contracts.sales.context.reservations
if not reservations.hasAvailable(availability.size.truncate(uint)):
return RestApiResponse.error(Http422, "Not enough storage quota")
if err =? (await reservations.reserve(availability)).errorOption:
return RestApiResponse.error(Http500, err.msg)
let json = %availability
return RestApiResponse.response($json)
@ -288,7 +297,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
"/api/codex/v1/storage/purchases/{id}") do (
id: PurchaseId) -> RestApiResponse:
without contracts =? node.contracts:
without contracts =? node.contracts.client:
return RestApiResponse.error(Http503, "Purchasing unavailable")
without id =? id.tryGet.catch, error:

View File

@ -48,7 +48,7 @@ func `%`*(stint: StInt|StUint): JsonNode =
func `%`*(arr: openArray[byte]): JsonNode =
%("0x" & arr.toHex)
func `%`*(id: RequestId | SlotId | Nonce): JsonNode =
func `%`*(id: RequestId | SlotId | Nonce | AvailabilityId): JsonNode =
% id.toArray
func `%`*(purchase: Purchase): JsonNode =

View File

@ -2,16 +2,17 @@ import std/sequtils
import pkg/questionable
import pkg/upraises
import pkg/stint
import pkg/nimcrypto
import pkg/chronicles
import pkg/datastore
import pkg/upraises
import ./rng
import ./market
import ./clock
import ./proving
import ./stores
import ./contracts/requests
import ./sales/salescontext
import ./sales/salesagent
import ./sales/availability
import ./sales/statemachine
import ./sales/states/downloading
import ./sales/states/unknown
@ -35,13 +36,15 @@ import ./sales/states/unknown
## | | ---- storage proof ---> |
export stint
export availability
export reservations
logScope:
topics = "sales"
type
Sales* = ref object
context*: SalesContext
subscription*: ?market.Subscription
available: seq[Availability]
agents*: seq[SalesAgent]
proc `onStore=`*(sales: Sales, onStore: OnStore) =
@ -59,103 +62,57 @@ proc onClear*(sales: Sales): ?OnClear = sales.context.onClear
proc onSale*(sales: Sales): ?OnSale = sales.context.onSale
proc available*(sales: Sales): seq[Availability] = sales.available
proc init*(_: type Availability,
size: UInt256,
duration: UInt256,
minPrice: UInt256): Availability =
var id: array[32, byte]
doAssert randomBytes(id) == 32
Availability(id: id, size: size, duration: duration, minPrice: minPrice)
func add*(sales: Sales, availability: Availability) =
if not sales.available.contains(availability):
sales.available.add(availability)
# TODO: add to disk (persist), serialise to json.
func remove*(sales: Sales, availability: Availability) =
sales.available.keepItIf(it != availability)
# TODO: remove from disk availability, mark as in use by assigning
# a slotId, so that it can be used for restoration (node restart)
func new*(_: type Sales,
market: Market,
clock: Clock,
proving: Proving): Sales =
proving: Proving,
repo: RepoStore): Sales =
let sales = Sales(context: SalesContext(
Sales(context: SalesContext(
market: market,
clock: clock,
proving: proving
proving: proving,
reservations: Reservations.new(repo)
))
proc onSaleErrored(availability: Availability) =
sales.add(availability)
sales.context.onSaleErrored = some onSaleErrored
sales
func findAvailability*(sales: Sales, ask: StorageAsk): ?Availability =
for availability in sales.available:
if ask.slotSize <= availability.size and
ask.duration <= availability.duration and
ask.pricePerSlot >= availability.minPrice:
return some availability
proc randomSlotIndex(numSlots: uint64): UInt256 =
let rng = Rng.instance
let slotIndex = rng.rand(numSlots - 1)
return slotIndex.u256
proc findSlotIndex(numSlots: uint64,
requestId: RequestId,
slotId: SlotId): ?UInt256 =
for i in 0..<numSlots:
if slotId(requestId, i.u256) == slotId:
return some i.u256
return none UInt256
proc handleRequest(sales: Sales,
requestId: RequestId,
ask: StorageAsk) =
without availability =? sales.findAvailability(ask):
return
sales.remove(availability)
debug "handling storage requested",
slots = ask.slots, slotSize = ask.slotSize, duration = ask.duration,
reward = ask.reward, maxSlotLoss = ask.maxSlotLoss
# TODO: check if random slot is actually available (not already filled)
let slotIndex = randomSlotIndex(ask.slots)
let agent = newSalesAgent(
sales.context,
requestId,
slotIndex,
some availability,
none StorageRequest
)
agent.context.onIgnored = proc {.gcsafe, upraises:[].} =
sales.agents.keepItIf(it != agent)
agent.start(SaleDownloading())
sales.agents.add agent
proc load*(sales: Sales) {.async.} =
let market = sales.context.market
# TODO: restore availability from disk
let slotIds = await market.mySlots()
for slotId in slotIds:
# TODO: this needs to be optimised
if request =? await market.getRequestFromSlotId(slotId):
let availability = sales.findAvailability(request.ask)
without slotIndex =? findSlotIndex(request.ask.slots,
request.id,
slotId):
raiseAssert "could not find slot index"
if slot =? (await market.getActiveSlot(slotId)):
let agent = newSalesAgent(
sales.context,
request.id,
slotIndex,
availability,
some request)
slot.request.id,
slot.slotIndex,
some slot.request)
agent.start(SaleUnknown())
sales.agents.add agent

View File

@ -1,8 +0,0 @@
import pkg/stint
type
Availability* = object
id*: array[32, byte]
size*: UInt256
duration*: UInt256
minPrice*: UInt256

View File

@ -0,0 +1,349 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/typetraits
import pkg/chronos
import pkg/chronicles
import pkg/upraises
import pkg/json_serialization
import pkg/json_serialization/std/options
import pkg/stint
import pkg/stew/byteutils
import pkg/nimcrypto
import pkg/questionable
import pkg/questionable/results
push: {.upraises: [].}
import pkg/datastore
import ../stores
import ../contracts/requests
export requests
logScope:
topics = "reservations"
type
AvailabilityId* = distinct array[32, byte]
Availability* = object
id*: AvailabilityId
size*: UInt256
duration*: UInt256
minPrice*: UInt256
used*: bool
Reservations* = ref object
repo: RepoStore
GetNext* = proc(): Future[?Availability] {.upraises: [], gcsafe, closure.}
AvailabilityIter* = ref object
finished*: bool
next*: GetNext
AvailabilityError* = object of CodexError
AvailabilityAlreadyExistsError* = object of AvailabilityError
AvailabilityReserveFailedError* = object of AvailabilityError
AvailabilityReleaseFailedError* = object of AvailabilityError
AvailabilityDeleteFailedError* = object of AvailabilityError
AvailabilityGetFailedError* = object of AvailabilityError
AvailabilityUpdateFailedError* = object of AvailabilityError
const
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
ReservationsKey = (SalesKey / "reservations").tryGet
proc new*(
T: type Reservations,
repo: RepoStore): Reservations =
T(repo: repo)
proc init*(
_: type Availability,
size: UInt256,
duration: UInt256,
minPrice: UInt256): Availability =
var id: array[32, byte]
doAssert randomBytes(id) == 32
Availability(id: AvailabilityId(id), size: size, duration: duration, minPrice: minPrice)
func toArray*(id: AvailabilityId): array[32, byte] =
array[32, byte](id)
proc `==`*(x, y: AvailabilityId): bool {.borrow.}
proc `==`*(x, y: Availability): bool =
x.id == y.id and
x.size == y.size and
x.duration == y.duration and
x.minPrice == y.minPrice
proc `$`*(id: AvailabilityId): string = id.toArray.toHex
proc toErr[E1: ref CatchableError, E2: AvailabilityError](
e1: E1,
_: type E2,
msg: string = e1.msg): ref E2 =
return newException(E2, msg, e1)
proc writeValue*(
writer: var JsonWriter,
value: SlotId | AvailabilityId) {.upraises:[IOError].} =
mixin writeValue
writer.writeValue value.toArray
proc readValue*[T: SlotId | AvailabilityId](
reader: var JsonReader,
value: var T) {.upraises: [SerializationError, IOError].} =
mixin readValue
value = T reader.readValue(T.distinctBase)
func key(id: AvailabilityId): ?!Key =
(ReservationsKey / id.toArray.toHex)
func key*(availability: Availability): ?!Key =
return availability.id.key
func available*(self: Reservations): uint = self.repo.available
func hasAvailable*(self: Reservations, bytes: uint): bool =
self.repo.available(bytes)
proc exists*(
self: Reservations,
id: AvailabilityId): Future[?!bool] {.async.} =
without key =? id.key, err:
return failure(err)
let exists = await self.repo.metaDs.contains(key)
return success(exists)
proc get*(
self: Reservations,
id: AvailabilityId): Future[?!Availability] {.async.} =
if exists =? (await self.exists(id)) and not exists:
let err = newException(AvailabilityGetFailedError,
"Availability does not exist")
return failure(err)
without key =? id.key, err:
return failure(err.toErr(AvailabilityGetFailedError))
without serialized =? await self.repo.metaDs.get(key), err:
return failure(err.toErr(AvailabilityGetFailedError))
without availability =? Json.decode(serialized, Availability).catch, err:
return failure(err.toErr(AvailabilityGetFailedError))
return success availability
proc update(
self: Reservations,
availability: Availability): Future[?!void] {.async.} =
trace "updating availability", id = availability.id, size = availability.size,
used = availability.used
without key =? availability.key, err:
return failure(err)
if err =? (await self.repo.metaDs.put(
key,
@(availability.toJson.toBytes))).errorOption:
return failure(err.toErr(AvailabilityUpdateFailedError))
return success()
proc delete(
self: Reservations,
id: AvailabilityId): Future[?!void] {.async.} =
trace "deleting availability", id
without availability =? (await self.get(id)), err:
return failure(err)
without key =? availability.key, err:
return failure(err)
if err =? (await self.repo.metaDs.delete(key)).errorOption:
return failure(err.toErr(AvailabilityDeleteFailedError))
return success()
proc reserve*(
self: Reservations,
availability: Availability): Future[?!void] {.async.} =
if exists =? (await self.exists(availability.id)) and exists:
let err = newException(AvailabilityAlreadyExistsError,
"Availability already exists")
return failure(err)
without key =? availability.key, err:
return failure(err)
let bytes = availability.size.truncate(uint)
if reserveErr =? (await self.repo.reserve(bytes)).errorOption:
return failure(reserveErr.toErr(AvailabilityReserveFailedError))
if updateErr =? (await self.update(availability)).errorOption:
# rollback the reserve
trace "rolling back reserve"
if rollbackErr =? (await self.repo.release(bytes)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return success()
proc release*(
self: Reservations,
id: AvailabilityId,
bytes: uint): Future[?!void] {.async.} =
trace "releasing bytes and updating availability", bytes, id
without var availability =? (await self.get(id)), err:
return failure(err)
without key =? id.key, err:
return failure(err)
if releaseErr =? (await self.repo.release(bytes)).errorOption:
return failure(releaseErr.toErr(AvailabilityReleaseFailedError))
availability.size = (availability.size.truncate(uint) - bytes).u256
template rollbackRelease(e: ref CatchableError) =
trace "rolling back release"
if rollbackErr =? (await self.repo.reserve(bytes)).errorOption:
rollbackErr.parent = e
return failure(rollbackErr)
# remove completely used availabilities
if availability.size == 0.u256:
if err =? (await self.delete(availability.id)).errorOption:
rollbackRelease(err)
return failure(err)
return success()
# persist partially used availability with updated size
if err =? (await self.update(availability)).errorOption:
rollbackRelease(err)
return failure(err)
return success()
proc markUsed*(
self: Reservations,
id: AvailabilityId): Future[?!void] {.async.} =
without var availability =? (await self.get(id)), err:
return failure(err)
availability.used = true
let r = await self.update(availability)
if r.isOk:
trace "availability marked used", id = id.toArray.toHex
return r
proc markUnused*(
self: Reservations,
id: AvailabilityId): Future[?!void] {.async.} =
without var availability =? (await self.get(id)), err:
return failure(err)
availability.used = false
let r = await self.update(availability)
if r.isOk:
trace "availability marked unused", id = id.toArray.toHex
return r
iterator items*(self: AvailabilityIter): Future[?Availability] =
while not self.finished:
yield self.next()
proc availabilities*(
self: Reservations): Future[?!AvailabilityIter] {.async.} =
var iter = AvailabilityIter()
let query = Query.init(ReservationsKey)
without results =? await self.repo.metaDs.query(query), err:
return failure(err)
proc next(): Future[?Availability] {.async.} =
await idleAsync()
iter.finished = results.finished
if not results.finished and
r =? (await results.next()) and
serialized =? r.data and
serialized.len > 0:
return some Json.decode(string.fromBytes(serialized), Availability)
return none Availability
iter.next = next
return success iter
proc unused*(r: Reservations): Future[?!seq[Availability]] {.async.} =
var ret: seq[Availability] = @[]
without availabilities =? (await r.availabilities), err:
return failure(err)
for a in availabilities:
if availability =? (await a) and not availability.used:
ret.add availability
return success(ret)
proc find*(
self: Reservations,
size, duration, minPrice: UInt256,
used: bool): Future[?Availability] {.async.} =
without availabilities =? (await self.availabilities), err:
error "failed to get all availabilities", error = err.msg
return none Availability
for a in availabilities:
if availability =? (await a):
if used == availability.used and
size <= availability.size and
duration <= availability.duration and
minPrice >= availability.minPrice:
trace "availability matched",
used, availUsed = availability.used,
size, availsize = availability.size,
duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice
return some availability
trace "availiability did not match",
used, availUsed = availability.used,
size, availsize = availability.size,
duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice

View File

@ -1,27 +1,35 @@
import pkg/chronos
import pkg/chronicles
import pkg/stint
import ../contracts/requests
import ../utils/asyncspawn
import ./statemachine
import ./salescontext
import ./salesdata
import ./availability
import ./reservations
export reservations
logScope:
topics = "sales statemachine"
type SalesAgent* = ref object of Machine
context*: SalesContext
data*: SalesData
subscribed: bool
func `==`*(a, b: SalesAgent): bool =
a.data.requestId == b.data.requestId and
a.data.slotIndex == b.data.slotIndex
proc newSalesAgent*(context: SalesContext,
requestId: RequestId,
slotIndex: UInt256,
availability: ?Availability,
request: ?StorageRequest): SalesAgent =
SalesAgent(
context: context,
data: SalesData(
requestId: requestId,
availability: availability,
slotIndex: slotIndex,
request: request))

View File

@ -1,8 +1,11 @@
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import ../node/batch
import ../market
import ../clock
import ../proving
import ./availability
import ./reservations
type
SalesContext* = ref object
@ -11,15 +14,17 @@ type
onStore*: ?OnStore
onClear*: ?OnClear
onSale*: ?OnSale
onSaleErrored*: ?OnSaleErrored
onIgnored*: OnIgnored
proving*: Proving
reservations*: Reservations
OnStore* = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability): Future[void] {.gcsafe, upraises: [].}
OnClear* = proc(availability: ?Availability,# TODO: when availability changes introduced, make availability non-optional (if we need to keep it at all)
request: StorageRequest,
onBatch: BatchProc): Future[?!void] {.gcsafe, upraises: [].}
OnProve* = proc(request: StorageRequest,
slot: UInt256): Future[seq[byte]] {.gcsafe, upraises: [].}
OnClear* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
OnSale* = proc(availability: ?Availability, # TODO: when availability changes introduced, make availability non-optional (if we need to keep it at all)
request: StorageRequest,
OnSale* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
OnSaleErrored* = proc(availability: Availability) {.gcsafe, upraises: [].}
OnIgnored* = proc() {.gcsafe, upraises: [].}

View File

@ -1,13 +1,12 @@
import pkg/chronos
import ../contracts/requests
import ../market
import ./availability
import ./reservations
type
SalesData* = ref object
requestId*: RequestId
ask*: StorageAsk
availability*: ?Availability # TODO: when availability persistence is added, change this to not optional
request*: ?StorageRequest
slotIndex*: UInt256
failed*: market.Subscription

View File

@ -16,9 +16,6 @@ type
SaleState* = ref object of State
SaleError* = ref object of CodexError
method `$`*(state: SaleState): string {.base.} =
raiseAssert "not implemented"
method onCancelled*(state: SaleState, request: StorageRequest): ?State {.base, upraises:[].} =
discard

View File

@ -1,3 +1,7 @@
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import ../../blocktype as bt
import ../../market
import ../salesagent
import ../statemachine
@ -5,12 +9,15 @@ import ./errorhandling
import ./cancelled
import ./failed
import ./filled
import ./ignored
import ./proving
import ./errored
type
SaleDownloading* = ref object of ErrorHandlingState
failedSubscription: ?market.Subscription
hasCancelled: ?Future[void]
logScope:
topics = "sales downloading"
method `$`*(state: SaleDownloading): string = "SaleDownloading"
@ -28,6 +35,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
let agent = SalesAgent(machine)
let data = agent.data
let context = agent.context
let reservations = context.reservations
await agent.retrieveRequest()
await agent.subscribe()
@ -38,5 +46,49 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
without request =? data.request:
raiseAssert "no sale request"
await onStore(request, data.slotIndex, data.availability)
without availability =? await reservations.find(
request.ask.slotSize,
request.ask.duration,
request.ask.pricePerSlot,
used = false):
info "no availability found for request, ignoring",
slotSize = request.ask.slotSize,
duration = request.ask.duration,
pricePerSlot = request.ask.pricePerSlot,
used = false
return some State(SaleIgnored())
# mark availability as used so that it is not matched to other requests
if markUsedErr =? (await reservations.markUsed(availability.id)).errorOption:
return some State(SaleErrored(error: markUsedErr))
proc onBatch(blocks: seq[bt.Block]) {.async.} =
# release batches of blocks as they are written to disk and
# update availability size
var bytes: uint = 0
for blk in blocks:
bytes += blk.data.len.uint
trace "Releasing batch of bytes written to disk", bytes
let r = await reservations.release(availability.id, bytes)
# `tryGet` will raise the exception that occurred during release, if there
# was one. The exception will be caught in the closure and sent to the
# SaleErrored state.
r.tryGet()
template markUnused(id: AvailabilityId) =
if markUnusedErr =? (await reservations.markUnused(id)).errorOption:
return some State(SaleErrored(error: markUnusedErr))
trace "Starting download"
if err =? (await onStore(request,
data.slotIndex,
onBatch)).errorOption:
markUnused(availability.id)
return some State(SaleErrored(error: err))
trace "Download complete"
markUnused(availability.id)
return some State(SaleProving())

View File

@ -1,3 +1,5 @@
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import pkg/chronicles
import ../statemachine
@ -19,16 +21,8 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
if onClear =? context.onClear and
request =? data.request and
slotIndex =? data.slotIndex:
onClear(data.availability, request, slotIndex)
onClear(request, slotIndex)
# TODO: when availability persistence is added, change this to not optional
# NOTE: with this in place, restoring state for a restarted node will
# never free up availability once finished. Persisting availability
# on disk is required for this.
if onSaleErrored =? context.onSaleErrored and
availability =? data.availability:
onSaleErrored(availability)
await agent.unsubscribe()
await agent.unsubscribe()
error "Sale error", error=state.error.msg

View File

@ -26,12 +26,6 @@ method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} =
context.proving.add(Slot(request: request, slotIndex: slotIndex))
if onSale =? context.onSale:
onSale(data.availability, request, slotIndex)
# TODO: Keep track of contract completion using local clock. When contract
# has finished, we need to add back availability to the sales module.
# This will change when the state machine is updated to include the entire
# sales process, as well as when availability is persisted, so leaving it
# as a TODO for now.
onSale(request, slotIndex)
await agent.unsubscribe()

View File

@ -0,0 +1,18 @@
import pkg/chronos
import ../statemachine
import ../salesagent
import ./errorhandling
type
SaleIgnored* = ref object of ErrorHandlingState
method `$`*(state: SaleIgnored): string = "SaleIgnored"
method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} =
let agent = SalesAgent(machine)
let context = agent.context
if onIgnored =? context.onIgnored:
onIgnored()
await agent.unsubscribe()

View File

@ -64,6 +64,12 @@ iterator items*(q: BlockExpirationIter): Future[?BlockExpiration] =
func totalUsed*(self: RepoStore): uint =
(self.quotaUsedBytes + self.quotaReservedBytes)
func available*(self: RepoStore): uint =
return self.quotaMaxBytes - self.totalUsed
func available*(self: RepoStore, bytes: uint): bool =
return bytes < self.available()
method getBlock*(self: RepoStore, cid: Cid): Future[?!Block] {.async.} =
## Get a block from the blockstore
##

View File

@ -15,6 +15,12 @@ type
State* = ref object of RootObj
Event* = proc(state: State): ?State {.gcsafe, upraises:[].}
logScope:
topics = "statemachine"
method `$`*(state: State): string {.base.} =
raiseAssert "not implemented"
proc transition(_: type Event, previous, next: State): Event =
return proc (state: State): ?State =
if state == previous:
@ -59,6 +65,7 @@ proc scheduler(machine: Machine) {.async.} =
if not machine.running.isNil:
await machine.running.cancelAndWait()
machine.state = next
debug "enter state", state = machine.state
machine.running = machine.run(machine.state)
machine.running.addCallback(onRunComplete)
except CancelledError:

View File

@ -293,6 +293,8 @@ paths:
type: array
items:
$ref: "#/components/schemas/SalesAvailability"
"500":
description: Error getting unused availabilities
"503":
description: Sales are unavailable
@ -312,6 +314,8 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/SalesAvailability"
"500":
description: Error reserving availablility
"503":
description: Sales are unavailable

View File

@ -105,12 +105,15 @@ method getRequest(market: MockMarket,
return some request
return none StorageRequest
method getRequestFromSlotId*(market: MockMarket,
slotId: SlotId): Future[?StorageRequest] {.async.} =
method getActiveSlot*(
market: MockMarket,
slotId: SlotId): Future[?Slot] {.async.} =
for slot in market.filled:
if slotId(slot.requestId, slot.slotIndex) == slotId:
return await market.getRequest(slot.requestId)
return none StorageRequest
if slotId(slot.requestId, slot.slotIndex) == slotId and
request =? await market.getRequest(slot.requestId):
return some Slot(request: request, slotIndex: slot.slotIndex)
return none Slot
method requestState*(market: MockMarket,
requestId: RequestId): Future[?RequestState] {.async.} =
@ -118,11 +121,9 @@ method requestState*(market: MockMarket,
method slotState*(market: MockMarket,
slotId: SlotId): Future[SlotState] {.async.} =
if market.slotState.hasKey(slotId):
return market.slotState[slotId]
else:
if not market.slotState.hasKey(slotId):
return SlotState.Free
return market.slotState[slotId]
method getRequestEnd*(market: MockMarket,
id: RequestId): Future[SecondsSince1970] {.async.} =

View File

@ -0,0 +1,14 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/codex/sales/reservations
proc allAvailabilities*(r: Reservations): Future[seq[Availability]] {.async.} =
var ret: seq[Availability] = @[]
without availabilities =? (await r.availabilities), err:
raiseAssert "failed to get availabilities, error: " & err.msg
for a in availabilities:
if availability =? (await a):
ret.add availability
return ret

View File

@ -29,7 +29,6 @@ suite "sales state 'filled'":
agent = newSalesAgent(context,
request.id,
slotIndex,
Availability.none,
StorageRequest.none)
state = SaleFilled.new()

View File

@ -27,7 +27,6 @@ suite "sales state 'unknown'":
agent = newSalesAgent(context,
request.id,
slotIndex,
Availability.none,
StorageRequest.none)
state = SaleUnknown.new()

View File

@ -0,0 +1,188 @@
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import pkg/asynctest
import pkg/datastore
import pkg/json_serialization
import pkg/json_serialization/std/options
import pkg/stew/byteutils
import pkg/codex/stores
import pkg/codex/sales
import ../examples
import ./helpers
suite "Reservations module":
var
repo: RepoStore
repoDs: Datastore
metaDs: SQLiteDatastore
availability: Availability
reservations: Reservations
setup:
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repo = RepoStore.new(repoDs, metaDs)
reservations = Reservations.new(repo)
availability = Availability.example
test "availability can be serialised and deserialised":
let availability = Availability.example
let serialised = availability.toJson
check Json.decode(serialised, Availability) == availability
test "has no availability initially":
check (await reservations.allAvailabilities()).len == 0
test "generates unique ids for storage availability":
let availability1 = Availability.init(1.u256, 2.u256, 3.u256)
let availability2 = Availability.init(1.u256, 2.u256, 3.u256)
check availability1.id != availability2.id
test "can reserve available storage":
let availability1 = Availability.example
let availability2 = Availability.example
check isOk await reservations.reserve(availability1)
check isOk await reservations.reserve(availability2)
let availabilities = await reservations.allAvailabilities()
check:
# perform unordered checks
availabilities.len == 2
availabilities.contains(availability1)
availabilities.contains(availability2)
test "reserved availability exists":
check isOk await reservations.reserve(availability)
without exists =? await reservations.exists(availability.id):
fail()
check exists
test "reserved availability can be partially released":
let size = availability.size.truncate(uint)
check isOk await reservations.reserve(availability)
check isOk await reservations.release(availability.id, size - 1)
without a =? await reservations.get(availability.id):
fail()
check a.size == 1
test "availability is deleted after being fully released":
let size = availability.size.truncate(uint)
check isOk await reservations.reserve(availability)
check isOk await reservations.release(availability.id, size)
without exists =? await reservations.exists(availability.id):
fail()
check not exists
test "non-existant availability cannot be released":
let size = availability.size.truncate(uint)
let r = await reservations.release(availability.id, size - 1)
check r.error of AvailabilityGetFailedError
check r.error.msg == "Availability does not exist"
test "added availability is not used initially":
check isOk await reservations.reserve(availability)
without available =? await reservations.get(availability.id):
fail()
check not available.used
test "availability can be marked used":
check isOk await reservations.reserve(availability)
check isOk await reservations.markUsed(availability.id)
without available =? await reservations.get(availability.id):
fail()
check available.used
test "availability can be marked unused":
check isOk await reservations.reserve(availability)
check isOk await reservations.markUsed(availability.id)
check isOk await reservations.markUnused(availability.id)
without available =? await reservations.get(availability.id):
fail()
check not available.used
test "used availability can be found":
check isOk await reservations.reserve(availability)
check isOk await reservations.markUsed(availability.id)
without available =? await reservations.find(availability.size,
availability.duration, availability.minPrice, used = true):
fail()
test "unused availability can be found":
check isOk await reservations.reserve(availability)
without available =? await reservations.find(availability.size,
availability.duration, availability.minPrice, used = false):
fail()
test "non-existant availability cannot be found":
check isNone (await reservations.find(availability.size,
availability.duration, availability.minPrice, used = false))
test "non-existant availability cannot be retrieved":
let r = await reservations.get(availability.id)
check r.error of AvailabilityGetFailedError
check r.error.msg == "Availability does not exist"
test "same availability cannot be reserved twice":
check isOk await reservations.reserve(availability)
let r = await reservations.reserve(availability)
check r.error of AvailabilityAlreadyExistsError
test "can get available bytes in repo":
check reservations.available == DefaultQuotaBytes
test "reserving availability reduces available bytes":
check isOk await reservations.reserve(availability)
check reservations.available ==
DefaultQuotaBytes - availability.size.truncate(uint)
test "reports quota available to be reserved":
check reservations.hasAvailable(availability.size.truncate(uint))
test "reports quota not available to be reserved":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo)
check not reservations.hasAvailable(availability.size.truncate(uint))
test "fails to reserve availability with size that is larger than available quota":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo)
let r = await reservations.reserve(availability)
check r.error of AvailabilityReserveFailedError
check r.error.parent of QuotaNotEnoughError
check exists =? (await reservations.exists(availability.id)) and not exists
test "fails to release availability size that is larger than available quota":
let size = availability.size.truncate(uint)
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = size)
reservations = Reservations.new(repo)
discard await reservations.reserve(availability)
let r = await reservations.release(availability.id, size + 1)
check r.error of AvailabilityReleaseFailedError
check r.error.parent.msg == "Cannot release this many bytes"

View File

@ -4,99 +4,135 @@ import std/sugar
import std/times
import pkg/asynctest
import pkg/chronos
import pkg/datastore
import pkg/questionable
import pkg/questionable/results
import pkg/codex/sales
import pkg/codex/sales/salesdata
import pkg/codex/sales/salescontext
import pkg/codex/sales/reservations
import pkg/codex/stores/repostore
import pkg/codex/proving
import pkg/codex/blocktype as bt
import pkg/codex/node
import ../helpers/mockmarket
import ../helpers/mockclock
import ../helpers/eventually
import ../examples
import ./helpers
suite "Sales":
let availability = Availability.init(
size=100.u256,
duration=60.u256,
minPrice=600.u256
)
var request = StorageRequest(
ask: StorageAsk(
slots: 4,
slotSize: 100.u256,
duration: 60.u256,
reward: 10.u256,
),
content: StorageContent(
cid: "some cid"
),
expiry: (getTime() + initDuration(hours=1)).toUnix.u256
)
let proof = exampleProof()
var availability: Availability
var request: StorageRequest
var sales: Sales
var market: MockMarket
var clock: MockClock
var proving: Proving
var reservations: Reservations
var repo: RepoStore
setup:
availability = Availability.init(
size=100.u256,
duration=60.u256,
minPrice=600.u256
)
request = StorageRequest(
ask: StorageAsk(
slots: 4,
slotSize: 100.u256,
duration: 60.u256,
reward: 10.u256,
),
content: StorageContent(
cid: "some cid"
),
expiry: (getTime() + initDuration(hours=1)).toUnix.u256
)
market = MockMarket.new()
clock = MockClock.new()
proving = Proving.new()
sales = Sales.new(market, clock, proving)
let repoDs = SQLiteDatastore.new(Memory).tryGet()
let metaDs = SQLiteDatastore.new(Memory).tryGet()
repo = RepoStore.new(repoDs, metaDs)
await repo.start()
sales = Sales.new(market, clock, proving, repo)
reservations = sales.context.reservations
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
discard
onBatch: BatchProc): Future[?!void] {.async.} =
return success()
proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} =
return proof
await sales.start()
request.expiry = (clock.now() + 42).u256
teardown:
await repo.stop()
await sales.stop()
test "has no availability initially":
check sales.available.len == 0
proc getAvailability: ?!Availability =
waitFor reservations.get(availability.id)
test "can add available storage":
let availability1 = Availability.example
let availability2 = Availability.example
sales.add(availability1)
check sales.available.contains(availability1)
sales.add(availability2)
check sales.available.contains(availability1)
check sales.available.contains(availability2)
proc wasIgnored: Future[bool] {.async.} =
return
eventually sales.agents.len == 1 and # agent created at first
eventually sales.agents.len == 0 # then removed once ignored
test "can remove available storage":
sales.add(availability)
sales.remove(availability)
check sales.available.len == 0
test "makes storage unavailable when downloading a matched request":
var used = false
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
without avail =? await reservations.get(availability.id):
fail()
used = avail.used
return success()
test "generates unique ids for storage availability":
let availability1 = Availability.init(1.u256, 2.u256, 3.u256)
let availability2 = Availability.init(1.u256, 2.u256, 3.u256)
check availability1.id != availability2.id
test "makes storage unavailable when matching request comes in":
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually sales.available.len == 0
check eventually used
test "ignores request when no matching storage is available":
sales.add(availability)
var tooBig = request
tooBig.ask.slotSize = request.ask.slotSize + 1
await market.requestStorage(tooBig)
await sleepAsync(1.millis)
check eventually sales.available == @[availability]
test "reduces remaining availability size after download":
let blk = bt.Block.example
request.ask.slotSize = blk.data.len.u256
availability.size = request.ask.slotSize + 1
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
await onBatch(@[blk])
return success()
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually getAvailability().?size == success 1.u256
test "ignores download when duration not long enough":
availability.duration = request.ask.duration - 1
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check await wasIgnored()
test "ignores request when slot size is too small":
availability.size = request.ask.slotSize - 1
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check await wasIgnored()
test "ignores request when reward is too low":
sales.add(availability)
var tooCheap = request
tooCheap.ask.reward = request.ask.reward - 1
await market.requestStorage(tooCheap)
await sleepAsync(1.millis)
check eventually sales.available == @[availability]
availability.minPrice = request.ask.pricePerSlot + 1
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check await wasIgnored()
test "availability remains unused when request is ignored":
availability.minPrice = request.ask.pricePerSlot + 1
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check getAvailability().?used == success false
test "retrieves and stores data locally":
var storingRequest: StorageRequest
@ -104,30 +140,26 @@ suite "Sales":
var storingAvailability: Availability
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
onBatch: BatchProc): Future[?!void] {.async.} =
storingRequest = request
storingSlot = slot
check availability.isSome
storingAvailability = !availability
sales.add(availability)
return success()
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually storingRequest == request
check storingSlot < request.ask.slots.u256
check storingAvailability == availability
test "handles errors during state run":
var saleFailed = false
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
# raise an exception so machine.onError is called
proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} =
# raise exception so machine.onError is called
raise newException(ValueError, "some error")
# onSaleErrored is called in SaleErrored.run
proc onSaleErrored(availability: Availability) =
# onClear is called in SaleErrored.run
sales.onClear = proc(request: StorageRequest,
idx: UInt256) =
saleFailed = true
sales.context.onSaleErrored = some onSaleErrored
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually saleFailed
@ -135,12 +167,12 @@ suite "Sales":
let error = newException(IOError, "data retrieval failed")
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
raise error
sales.add(availability)
onBatch: BatchProc): Future[?!void] {.async.} =
return failure(error)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
await sleepAsync(1.millis)
check eventually sales.available == @[availability]
check eventually getAvailability().?used == success false
check getAvailability().?size == success availability.size
test "generates proof of storage":
var provingRequest: StorageRequest
@ -148,13 +180,13 @@ suite "Sales":
proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} =
provingRequest = slot.request
provingSlot = slot.slotIndex
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually provingRequest == request
check provingSlot < request.ask.slots.u256
test "fills a slot":
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually market.filled.len == 1
check market.filled[0].requestId == request.id
@ -166,14 +198,13 @@ suite "Sales":
var soldAvailability: Availability
var soldRequest: StorageRequest
var soldSlotIndex: UInt256
sales.onSale = proc(availability: ?Availability,
request: StorageRequest,
sales.onSale = proc(request: StorageRequest,
slotIndex: UInt256) =
if a =? availability:
soldAvailability = a
soldRequest = request
soldSlotIndex = slotIndex
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually soldAvailability == availability
check soldRequest == request
@ -184,55 +215,48 @@ suite "Sales":
# which then calls the onClear callback
proving.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} =
raise newException(IOError, "proof failed")
var clearedAvailability: Availability
var clearedRequest: StorageRequest
var clearedSlotIndex: UInt256
sales.onClear = proc(availability: ?Availability,
request: StorageRequest,
sales.onClear = proc(request: StorageRequest,
slotIndex: UInt256) =
if a =? availability:
clearedAvailability = a
clearedRequest = request
clearedSlotIndex = slotIndex
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually clearedAvailability == availability
check clearedRequest == request
check eventually clearedRequest == request
check clearedSlotIndex < request.ask.slots.u256
test "makes storage available again when other host fills the slot":
let otherHost = Address.example
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
onBatch: BatchProc): Future[?!void] {.async.} =
await sleepAsync(chronos.hours(1))
sales.add(availability)
return success()
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
await sleepAsync(1.millis)
for slotIndex in 0..<request.ask.slots:
market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
await sleepAsync(chronos.seconds(2))
check sales.available == @[availability]
check eventually (await reservations.allAvailabilities) == @[availability]
test "makes storage available again when request expires":
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
onBatch: BatchProc): Future[?!void] {.async.} =
await sleepAsync(chronos.hours(1))
sales.add(availability)
return success()
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
await sleepAsync(1.millis)
clock.set(request.expiry.truncate(int64))
check eventually (sales.available == @[availability])
check eventually (await reservations.allAvailabilities) == @[availability]
test "adds proving for slot when slot is filled":
var soldSlotIndex: UInt256
sales.onSale = proc(availability: ?Availability,
request: StorageRequest,
sales.onSale = proc(request: StorageRequest,
slotIndex: UInt256) =
soldSlotIndex = slotIndex
check proving.slots.len == 0
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually proving.slots.len == 1
check proving.slots.contains(Slot(request: request, slotIndex: soldSlotIndex))
@ -269,9 +293,7 @@ suite "Sales":
market.activeRequests[me] = @[request.id]
await sales.load()
let expected = SalesData(requestId: request.id,
availability: none Availability,
request: some request)
let expected = SalesData(requestId: request.id, request: some request)
# because sales.load() calls agent.start, we won't know the slotIndex
# randomly selected for the agent, and we also won't know the value of
# `failed`/`fulfilled`/`cancelled` futures, so we need to compare
@ -280,7 +302,6 @@ suite "Sales":
# randomly re-assigned, so this may no longer be needed
proc `==` (data0, data1: SalesData): bool =
return data0.requestId == data1.requestId and
data0.availability == data1.availability and
data0.request == data1.request
check eventually sales.agents.len == 2

View File

@ -44,11 +44,6 @@ method run*(state: MockErrorState, machine: Machine): Future[?State] {.async.} =
suite "Sales agent":
let availability = Availability.init(
size=100.u256,
duration=60.u256,
minPrice=600.u256
)
var request = StorageRequest(
ask: StorageAsk(
slots: 4,
@ -79,7 +74,6 @@ suite "Sales agent":
agent = newSalesAgent(context,
request.id,
slotIndex,
some availability,
some request)
request.expiry = (getTime() + initDuration(hours=1)).toUnix.u256
@ -90,7 +84,6 @@ suite "Sales agent":
agent = newSalesAgent(context,
request.id,
slotIndex,
some availability,
none StorageRequest)
market.requested = @[request]
await agent.retrieveRequest()

View File

@ -1,4 +1,5 @@
import ./sales/testsales
import ./sales/teststates
import ./sales/testreservations
{.warning[UnusedImport]: off.}

View File

@ -13,6 +13,11 @@ type
var runs, cancellations, errors = [0, 0, 0, 0]
method `$`(state: State1): string = "State1"
method `$`(state: State2): string = "State2"
method `$`(state: State3): string = "State3"
method `$`(state: State4): string = "State4"
method run(state: State1, machine: Machine): Future[?State] {.async.} =
inc runs[0]
return some State(State2.new())

View File

@ -12,7 +12,7 @@ ethersuite "Collateral":
var token: TestToken
setup:
let deployment = deployment()
let deployment = Deployment.init()
marketplace = Marketplace.new(!deployment.address(Marketplace), provider.getSigner())
token = TestToken.new(!deployment.address(TestToken), provider.getSigner())
await token.mint(accounts[0], 1000.u256)

View File

@ -27,7 +27,7 @@ ethersuite "Marketplace contracts":
client = provider.getSigner(accounts[0])
host = provider.getSigner(accounts[1])
let deployment = deployment()
let deployment = Deployment.init()
marketplace = Marketplace.new(!deployment.address(Marketplace), provider.getSigner())
token = TestToken.new(!deployment.address(TestToken), provider.getSigner())

View File

@ -0,0 +1,17 @@
import std/os
import pkg/codex/contracts
import pkg/codex/stores
import ../ethertest
suite "Deployment":
let deploymentFile = "vendor" / "codex-contracts-eth" / "deployment-localhost.json"
test "can be instantiated with a deployment file":
let deployment = Deployment.init(deploymentFile)
test "contract address can be retreived from deployment json":
let deployment = Deployment.init(deploymentFile)
check deployment.address(Marketplace).isSome
test "can be instantiated without a deployment file":
discard Deployment.init()

View File

@ -1,34 +1,46 @@
import std/os
import codex/contracts
import pkg/datastore
import pkg/codex/contracts
import pkg/codex/stores
import ../ethertest
import ./examples
ethersuite "Marketplace Contract Interactions":
ethersuite "Marketplace Contract Interactions - Client":
let url = "http://localhost:8545"
let account = Address.example
let contractAddress = Address.example
var contracts: ContractInteractions
setup:
contracts = !ContractInteractions.new(account)
test "can be instantiated with a signer and deployment info":
let signer = provider.getSigner()
let deployment = deployment()
check ContractInteractions.new(signer, deployment).isSome
test "can be instantiated with a provider url":
let url = "http://localhost:8545"
let account = Address.example
let deployment = "vendor" / "codex-contracts-eth" / "deployment-localhost.json"
check ContractInteractions.new(url, account).isSome
check ContractInteractions.new(url, account, deployment).isSome
test "can be instantiated with a provider url, account, and contract address":
check ClientInteractions.new(url, account, contractAddress).isSuccess
test "provides purchasing":
check contracts.purchasing != nil
let client = !ClientInteractions.new(url, account, contractAddress)
check client.purchasing != nil
ethersuite "Marketplace Contract Interactions - Host":
let url = "http://localhost:8545"
let account = Address.example
let contractAddress = Address.example
var
repo: RepoStore
repoDs: Datastore
metaDs: Datastore
setup:
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repo = RepoStore.new(repoDs, metaDs)
test "can be instantiated with a provider url, account, repo, and contract address":
check HostInteractions.new(url, account, repo, contractAddress).isSuccess
test "provides sales":
check contracts.sales != nil
let host = !HostInteractions.new(url, account, repo, contractAddress)
check host.sales != nil
test "provides proving":
check contracts.proving != nil
let host = !HostInteractions.new(url, account, repo, contractAddress)
check host.proving != nil

View File

@ -19,7 +19,7 @@ ethersuite "On-Chain Market":
var periodicity: Periodicity
setup:
let deployment = deployment()
let deployment = Deployment.init()
marketplace = Marketplace.new(!deployment.address(Marketplace), provider.getSigner())
token = TestToken.new(!deployment.address(TestToken), provider.getSigner())
await token.mint(accounts[0], 1_000_000_000.u256)
@ -253,14 +253,15 @@ ethersuite "On-Chain Market":
await token.approve(marketplace.address, request.price)
await market.requestStorage(request)
let slotId = request.slotId(slotIndex)
check (await market.getRequestFromSlotId(slotId)) == none StorageRequest
check (await market.getActiveSlot(slotId)) == none Slot
test "can retrieve request details from slot id":
await token.approve(marketplace.address, request.price)
await market.requestStorage(request)
await market.fillSlot(request.id, slotIndex, proof)
let slotId = request.slotId(slotIndex)
check (await market.getRequestFromSlotId(slotId)) == some request
let expected = Slot(request: request, slotIndex: slotIndex)
check (await market.getActiveSlot(slotId)) == some expected
test "retrieves correct slot state when request is unknown":
let slotId = request.slotId(slotIndex)

View File

@ -11,7 +11,7 @@ ethersuite "On-Chain Proofs":
var marketplace: Marketplace
setup:
let deployment = deployment()
let deployment = Deployment.init()
marketplace = Marketplace.new(!deployment.address(Marketplace), provider.getSigner())
proofs = OnChainProofs.new(marketplace)

View File

@ -1,5 +1,6 @@
import std/json
import pkg/chronos
import pkg/stint
import ../contracts/time
import ../codex/helpers/eventually
import ./twonodes
@ -62,8 +63,9 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
check client1.getPurchase(id){"request"}{"ask"}{"reward"} == %"0x2"
test "nodes negotiate contracts on the marketplace":
let size: uint64 = 0xFFFFF
# client 2 makes storage available
discard client2.postAvailability(size=0xFFFFF, duration=200, minPrice=300)
discard client2.postAvailability(size=size, duration=200, minPrice=300)
# client 1 requests storage
let expiry = (await provider.currentTime()) + 30
@ -72,4 +74,7 @@ twonodessuite "Integration tests", debug1 = false, debug2 = false:
check eventually client1.getPurchase(purchase){"state"} == %"started"
check client1.getPurchase(purchase){"error"} == newJNull()
check client2.getAvailabilities().len == 0
let availabilities = client2.getAvailabilities()
check availabilities.len == 1
let newSize = UInt256.fromHex(availabilities[0]{"size"}.getStr)
check newSize > 0 and newSize < size.u256

View File

@ -11,7 +11,8 @@ twonodessuite "Proving integration test", debug1=false, debug2=false:
var config: MarketplaceConfig
setup:
marketplace = Marketplace.new(!deployment().address(Marketplace), provider)
let deployment = Deployment.init()
marketplace = Marketplace.new(!deployment.address(Marketplace), provider)
config = await marketplace.config()
await provider.getSigner(accounts[0]).mint()
await provider.getSigner(accounts[1]).mint()

View File

@ -5,13 +5,15 @@ import ../contracts/token
proc mint*(signer: Signer, amount = 1_000_000.u256) {.async.} =
## Mints a considerable amount of tokens and approves them for transfer to
## the Marketplace contract.
let token = TestToken.new(!deployment().address(TestToken), signer)
let marketplace = Marketplace.new(!deployment().address(Marketplace), signer)
let deployment = Deployment.init()
let token = TestToken.new(!deployment.address(TestToken), signer)
let marketplace = Marketplace.new(!deployment.address(Marketplace), signer)
await token.mint(await signer.getAddress(), amount)
proc deposit*(signer: Signer) {.async.} =
## Deposits sufficient collateral into the Marketplace contract.
let marketplace = Marketplace.new(!deployment().address(Marketplace), signer)
let deployment = Deployment.init()
let marketplace = Marketplace.new(!deployment.address(Marketplace), signer)
let config = await marketplace.config()
let tokenAddress = await marketplace.token()
let token = Erc20Token.new(tokenAddress, signer)

View File

@ -2,6 +2,7 @@ import ./contracts/testCollateral
import ./contracts/testContracts
import ./contracts/testMarket
import ./contracts/testProofs
import ./contracts/testDeployment
import ./contracts/testInteractions
import ./contracts/testClock

@ -1 +1 @@
Subproject commit cde543626236bd48188354d842cbe1513052c560
Subproject commit fb76f7d0b2f94914b00f2a0f4136ebfb27df6abc

2
vendor/questionable vendored

@ -1 +1 @@
Subproject commit 30e4184a99c8c1ba329925912d2c5d4b09acf8cc
Subproject commit 6cbbda7e4d009e02d0583b325b31dc68dff27854