[marketplace] reservations module

- add de/serialization for Availability
- add markUsed/markUnused in persisted availability
- add query for unused
- add reserve/release
- reservation module tests
- split ContractInteractions into client contracts and host contracts
- remove reservations start/stop as the repo start/stop is being managed by the node
- remove dedicated reservations metadata store and use the metadata store from the repo instead
- Split ContractInteractions into:
  - ClientInteractions (with purchasing)
  - HostInteractions (with sales and proving)
- compilation fix for nim 1.2

[repostore] fix started flag, add tests

[marketplace] persist slot index
For loading the sales state from chain, the slot index was not previously persisted in the contract. Will retrieve the slot index from the contract when the sales state is loaded.
This commit is contained in:
Eric Mastro 2023-03-15 17:43:31 +11:00
parent de3d97db1e
commit 3eacdd8a30
No known key found for this signature in database
28 changed files with 906 additions and 248 deletions

View File

@ -96,7 +96,10 @@ proc stop*(s: CodexServer) {.async.} =
s.runHandle.complete()
proc new(_: type ContractInteractions, config: CodexConf): ?ContractInteractions =
proc new(_: type ContractInteractions,
config: CodexConf,
repo: RepoStore): Contracts =
if not config.persistence:
if config.ethAccount.isSome:
warn "Ethereum account was set, but persistence is not enabled"
@ -106,10 +109,16 @@ proc new(_: type ContractInteractions, config: CodexConf): ?ContractInteractions
error "Persistence enabled, but no Ethereum account was set"
quit QuitFailure
var client: ?ClientInteractions
var host: ?HostInteractions
if deployment =? config.ethDeployment:
ContractInteractions.new(config.ethProvider, account, deployment)
client = ClientInteractions.new(config.ethProvider, account, deployment)
host = HostInteractions.new(config.ethProvider, account, repo, deployment)
else:
ContractInteractions.new(config.ethProvider, account)
client = ClientInteractions.new(config.ethProvider, account)
host = HostInteractions.new(config.ethProvider, account, repo)
(client, host)
proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey): T =
@ -177,7 +186,7 @@ proc new*(T: type CodexServer, config: CodexConf, privateKey: CodexPrivateKey):
engine = BlockExcEngine.new(repoStore, wallet, network, blockDiscovery, peerStore, pendingBlocks)
store = NetworkStore.new(engine, repoStore)
erasure = Erasure.new(store, leoEncoderProvider, leoDecoderProvider)
contracts = ContractInteractions.new(config)
contracts = ContractInteractions.new(config, repoStore)
codexNode = CodexNodeRef.new(switch, store, engine, erasure, discovery, contracts)
restServer = RestServerRef.new(
codexNode.initRestApi(config),

View File

@ -1,78 +1,5 @@
import pkg/ethers
import pkg/chronicles
import ../purchasing
import ../sales
import ../proving
import ./deployment
import ./marketplace
import ./market
import ./proofs
import ./clock
import ./interactions/interactions
import ./interactions/hostinteractions
import ./interactions/clientinteractions
export purchasing
export sales
export proving
export chronicles
type
ContractInteractions* = ref object
purchasing*: Purchasing
sales*: Sales
proving*: Proving
clock: OnChainClock
proc new*(_: type ContractInteractions,
signer: Signer,
deployment: Deployment): ?ContractInteractions =
without address =? deployment.address(Marketplace):
error "Unable to determine address of the Marketplace smart contract"
return none ContractInteractions
let contract = Marketplace.new(address, signer)
let market = OnChainMarket.new(contract)
let proofs = OnChainProofs.new(contract)
let clock = OnChainClock.new(signer.provider)
let proving = Proving.new(proofs, clock)
some ContractInteractions(
purchasing: Purchasing.new(market, clock),
sales: Sales.new(market, clock, proving),
proving: proving,
clock: clock
)
proc new*(_: type ContractInteractions,
providerUrl: string,
account: Address,
deploymentFile: string = string.default): ?ContractInteractions =
let provider = JsonRpcProvider.new(providerUrl)
let signer = provider.getSigner(account)
var deploy: Deployment
try:
if deploymentFile == string.default:
deploy = deployment()
else:
deploy = deployment(deploymentFile)
except IOError as e:
error "Unable to read deployment json", msg = e.msg
return none ContractInteractions
ContractInteractions.new(signer, deploy)
proc new*(_: type ContractInteractions,
account: Address): ?ContractInteractions =
ContractInteractions.new("ws://localhost:8545", account)
proc start*(interactions: ContractInteractions) {.async.} =
await interactions.clock.start()
await interactions.sales.start()
await interactions.proving.start()
await interactions.purchasing.start()
proc stop*(interactions: ContractInteractions) {.async.} =
await interactions.purchasing.stop()
await interactions.sales.stop()
await interactions.proving.stop()
await interactions.clock.stop()
export interactions, hostinteractions, clientinteractions

View File

@ -0,0 +1,55 @@
import pkg/ethers
import pkg/chronicles
import ../../purchasing
import ../deployment
import ../marketplace
import ../market
import ../proofs
import ../clock
import ./interactions
export purchasing
export chronicles
type
ClientInteractions* = ref object of ContractInteractions
purchasing*: Purchasing
proc new*(_: type ClientInteractions,
signer: Signer,
deployment: Deployment): ?ClientInteractions =
without address =? deployment.address(Marketplace):
error "Unable to determine address of the Marketplace smart contract"
return none ClientInteractions
let contract = Marketplace.new(address, signer)
let market = OnChainMarket.new(contract)
let clock = OnChainClock.new(signer.provider)
let c = ClientInteractions.new(clock)
c.purchasing = Purchasing.new(market, clock)
some c
proc new*(_: type ClientInteractions,
providerUrl: string,
account: Address,
deploymentFile: string = string.default): ?ClientInteractions =
without prepared =? prepare(providerUrl, account, deploymentFile):
return none ClientInteractions
ClientInteractions.new(prepared.signer, prepared.deploy)
proc new*(_: type ClientInteractions,
account: Address): ?ClientInteractions =
ClientInteractions.new("ws://localhost:8545", account)
proc start*(self: ClientInteractions) {.async.} =
await procCall ContractInteractions(self).start()
await self.purchasing.start()
proc stop*(self: ClientInteractions) {.async.} =
await self.purchasing.stop()
await procCall ContractInteractions(self).stop()

View File

@ -0,0 +1,67 @@
import pkg/ethers
import pkg/chronicles
import ../../sales
import ../../proving
import ../../stores
import ../deployment
import ../marketplace
import ../market
import ../proofs
import ../clock
import ./interactions
export sales
export proving
export chronicles
type
HostInteractions* = ref object of ContractInteractions
sales*: Sales
proving*: Proving
proc new*(_: type HostInteractions,
signer: Signer,
deployment: Deployment,
repoStore: RepoStore): ?HostInteractions =
without address =? deployment.address(Marketplace):
error "Unable to determine address of the Marketplace smart contract"
return none HostInteractions
let contract = Marketplace.new(address, signer)
let market = OnChainMarket.new(contract)
let proofs = OnChainProofs.new(contract)
let clock = OnChainClock.new(signer.provider)
let proving = Proving.new(proofs, clock)
let h = HostInteractions.new(clock)
h.sales = Sales.new(market, clock, proving, repoStore)
h.proving = proving
some h
proc new*(_: type HostInteractions,
providerUrl: string,
account: Address,
repo: RepoStore,
deploymentFile: string = string.default): ?HostInteractions =
without prepared =? prepare(providerUrl, account, deploymentFile):
return none HostInteractions
HostInteractions.new(prepared.signer, prepared.deploy, repo)
proc new*(_: type HostInteractions,
account: Address,
repo: RepoStore): ?HostInteractions =
HostInteractions.new("ws://localhost:8545", account, repo)
method start*(self: HostInteractions) {.async.} =
await procCall ContractInteractions(self).start()
await self.sales.start()
await self.proving.start()
method stop*(self: HostInteractions) {.async.} =
await self.sales.stop()
await self.proving.stop()
await procCall ContractInteractions(self).start()

View File

@ -0,0 +1,45 @@
import pkg/ethers
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import ../../errors
import ../deployment
import ../clock
type
ContractInteractions* = ref object of RootObj
clock: OnChainClock
ContractInteractionsError* = object of CodexError
ReadDeploymentFileFailureError* = object of ContractInteractionsError
method new*[T: ContractInteractions](_: type T, clock: OnChainClock): T {.base.} =
T(clock: clock)
proc prepare*(
providerUrl: string = "ws://localhost:8545",
account: Address,
deploymentFile: string = string.default):
?!tuple[signer: JsonRpcSigner, deploy: Deployment] =
let provider = JsonRpcProvider.new(providerUrl)
let signer = provider.getSigner(account)
var deploy: Deployment
try:
if deploymentFile == string.default:
deploy = deployment()
else:
deploy = deployment(deploymentFile)
except IOError as e:
let err = newException(ReadDeploymentFileFailureError,
"Unable to read deployment json")
err.parent = e
return failure(err)
return success((signer, deploy))
method start*(self: ContractInteractions) {.async, base.} =
await self.clock.start()
method stop*(self: ContractInteractions) {.async, base.} =
await self.clock.stop()

View File

@ -73,13 +73,15 @@ method getHost(market: OnChainMarket,
else:
return none Address
method getRequestFromSlotId*(market: OnChainMarket,
slotId: SlotId): Future[?StorageRequest] {.async.} =
method getActiveSlot*(
market: OnChainMarket,
slotId: SlotId): Future[?(StorageRequest, UInt256)] {.async.} =
try:
return some await market.contract.getRequestFromSlotId(slotId)
return some await market.contract.getActiveSlot(slotId)
except ProviderError as e:
if e.revertReason.contains("Slot is free"):
return none StorageRequest
return none (StorageRequest, UInt256)
raise e
method fillSlot(market: OnChainMarket,

View File

@ -45,7 +45,7 @@ proc withdrawFunds*(marketplace: Marketplace, requestId: RequestId) {.contract.}
proc freeSlot*(marketplace: Marketplace, id: SlotId) {.contract.}
proc getRequest*(marketplace: Marketplace, id: RequestId): StorageRequest {.contract, view.}
proc getHost*(marketplace: Marketplace, id: SlotId): Address {.contract, view.}
proc getRequestFromSlotId*(marketplace: Marketplace, id: SlotId): StorageRequest {.contract, view.}
proc getActiveSlot*(marketplace: Marketplace, id: SlotId): (StorageRequest, UInt256) {.contract, view.}
proc myRequests*(marketplace: Marketplace): seq[RequestId] {.contract, view.}
proc mySlots*(marketplace: Marketplace): seq[SlotId] {.contract, view.}

View File

@ -53,8 +53,10 @@ method getHost*(market: Market,
slotIndex: UInt256): Future[?Address] {.base, async.} =
raiseAssert("not implemented")
method getRequestFromSlotId*(market: Market,
slotId: SlotId): Future[?StorageRequest] {.base, async.} =
method getActiveSlot*(
market: Market,
slotId: SlotId): Future[?(StorageRequest, UInt256)] {.base, async.} =
raiseAssert("not implemented")
method fillSlot*(market: Market,

View File

@ -43,6 +43,10 @@ type
CodexError = object of CatchableError
Contracts* = tuple
client: ?ClientInteractions
host: ?HostInteractions
CodexNodeRef* = ref object
switch*: Switch
networkId*: PeerId
@ -50,7 +54,7 @@ type
engine*: BlockExcEngine
erasure*: Erasure
discovery*: Discovery
contracts*: ?ContractInteractions
contracts*: Contracts
proc findPeer*(
node: CodexNodeRef,
@ -250,7 +254,7 @@ proc requestStorage*(self: CodexNodeRef,
##
trace "Received a request for storage!", cid, duration, nodes, tolerance, reward
without contracts =? self.contracts:
without contracts =? self.contracts.client:
trace "Purchasing not available"
return failure "Purchasing not available"
@ -307,7 +311,7 @@ proc new*(
engine: BlockExcEngine,
erasure: Erasure,
discovery: Discovery,
contracts = ContractInteractions.none): T =
contracts: Contracts = (ClientInteractions.none, HostInteractions.none)): T =
T(
switch: switch,
blockStore: store,
@ -329,9 +333,9 @@ proc start*(node: CodexNodeRef) {.async.} =
if not node.discovery.isNil:
await node.discovery.start()
if contracts =? node.contracts:
if hostContracts =? node.contracts.host:
# TODO: remove Sales callbacks, pass BlockStore and StorageProofs instead
contracts.sales.onStore = proc(request: StorageRequest,
hostContracts.sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
## store data in local storage
@ -354,22 +358,29 @@ proc start*(node: CodexNodeRef) {.async.} =
if fetchRes.isErr:
raise newException(CodexError, "Unable to retrieve blocks")
contracts.sales.onClear = proc(availability: ?Availability,
hostContracts.sales.onClear = proc(availability: ?Availability,
request: StorageRequest,
slotIndex: UInt256) =
# TODO: remove data from local storage
discard
contracts.sales.onProve = proc(request: StorageRequest,
hostContracts.sales.onProve = proc(request: StorageRequest,
slot: UInt256): Future[seq[byte]] {.async.} =
# TODO: generate proof
return @[42'u8]
try:
await contracts.start()
await hostContracts.start()
except CatchableError as error:
error "Unable to start contract interactions: ", error=error.msg
node.contracts = ContractInteractions.none
error "Unable to start host contract interactions: ", error=error.msg
node.contracts.host = HostInteractions.none
if clientContracts =? node.contracts.client:
try:
await clientContracts.start()
except CatchableError as error:
error "Unable to start client contract interactions: ", error=error.msg
node.contracts.client = ClientInteractions.none
node.networkId = node.switch.peerInfo.peerId
notice "Started codex node", id = $node.networkId, addrs = node.switch.peerInfo.addrs
@ -389,8 +400,11 @@ proc stop*(node: CodexNodeRef) {.async.} =
if not node.discovery.isNil:
await node.discovery.stop()
if contracts =? node.contracts:
await contracts.stop()
if clientContracts =? node.contracts.client:
await clientContracts.stop()
if hostContracts =? node.contracts.host:
await hostContracts.stop()
if not node.blockStore.isNil:
await node.blockStore.close

View File

@ -248,10 +248,13 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
"/api/codex/v1/sales/availability") do () -> RestApiResponse:
## Returns storage that is for sale
without contracts =? node.contracts:
without contracts =? node.contracts.host:
return RestApiResponse.error(Http503, "Sales unavailable")
let json = %contracts.sales.available
without unused =? (await contracts.sales.reservations.unused), err:
return RestApiResponse.error(Http500, err.msg)
let json = %unused
return RestApiResponse.response($json)
router.rawApi(
@ -263,7 +266,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
## duration - maximum time the storage should be sold for (in seconds)
## minPrice - minimum price to be paid (in amount of tokens)
without contracts =? node.contracts:
without contracts =? node.contracts.host:
return RestApiResponse.error(Http503, "Sales unavailable")
let body = await request.getBody()
@ -271,7 +274,11 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
without availability =? Availability.fromJson(body), error:
return RestApiResponse.error(Http400, error.msg)
contracts.sales.add(availability)
if not contracts.sales.reservations.available(availability.size.truncate(uint)):
return RestApiResponse.error(Http422, "Not enough storage quota")
if err =? (await contracts.sales.reservations.reserve(availability)).errorOption:
return RestApiResponse.error(Http500, err.msg)
let json = %availability
return RestApiResponse.response($json)
@ -281,7 +288,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
"/api/codex/v1/storage/purchases/{id}") do (
id: PurchaseId) -> RestApiResponse:
without contracts =? node.contracts:
without contracts =? node.contracts.client:
return RestApiResponse.error(Http503, "Purchasing unavailable")
without id =? id.tryGet.catch, error:

View File

@ -39,13 +39,13 @@ proc fromJson*(_: type StorageRequestParams,
func `%`*(address: Address): JsonNode =
% $address
func `%`*(stint: StInt|StUint): JsonNode =
func `%`*(stint: StInt|StUInt): JsonNode =
%("0x" & stint.toHex)
func `%`*(arr: openArray[byte]): JsonNode =
%("0x" & arr.toHex)
func `%`*(id: RequestId | SlotId | Nonce): JsonNode =
func `%`*(id: RequestId | SlotId | Nonce | AvailabilityId): JsonNode =
% id.toArray
func `%`*(purchase: Purchase): JsonNode =

View File

@ -4,14 +4,15 @@ import pkg/upraises
import pkg/stint
import pkg/nimcrypto
import pkg/chronicles
import pkg/datastore
import ./rng
import ./market
import ./clock
import ./proving
import ./stores
import ./contracts/requests
import ./sales/salescontext
import ./sales/salesagent
import ./sales/availability
import ./sales/statemachine
import ./sales/states/downloading
import ./sales/states/unknown
@ -35,13 +36,12 @@ import ./sales/states/unknown
## | | ---- storage proof ---> |
export stint
export availability
export reservations
type
Sales* = ref object
context*: SalesContext
subscription*: ?market.Subscription
available: seq[Availability]
agents*: seq[SalesAgent]
proc `onStore=`*(sales: Sales, onStore: OnStore) =
@ -64,77 +64,36 @@ proc onClear*(sales: Sales): ?OnClear = sales.context.onClear
proc onSale*(sales: Sales): ?OnSale = sales.context.onSale
proc available*(sales: Sales): seq[Availability] = sales.available
proc init*(_: type Availability,
size: UInt256,
duration: UInt256,
minPrice: UInt256): Availability =
var id: array[32, byte]
doAssert randomBytes(id) == 32
Availability(id: id, size: size, duration: duration, minPrice: minPrice)
func add*(sales: Sales, availability: Availability) =
if not sales.available.contains(availability):
sales.available.add(availability)
# TODO: add to disk (persist), serialise to json.
func remove*(sales: Sales, availability: Availability) =
sales.available.keepItIf(it != availability)
# TODO: remove from disk availability, mark as in use by assigning
# a slotId, so that it can be used for restoration (node restart)
func new*(_: type Sales,
market: Market,
clock: Clock,
proving: Proving): Sales =
proving: Proving,
repo: RepoStore): Sales =
let sales = Sales(context: SalesContext(
Sales(context: SalesContext(
market: market,
clock: clock,
proving: proving
proving: proving,
reservations: Reservations.new(repo)
))
proc onSaleErrored(availability: Availability) =
sales.add(availability)
sales.context.onSaleErrored = some onSaleErrored
sales
func findAvailability*(sales: Sales, ask: StorageAsk): ?Availability =
for availability in sales.available:
if ask.slotSize <= availability.size and
ask.duration <= availability.duration and
ask.pricePerSlot >= availability.minPrice:
return some availability
proc randomSlotIndex(numSlots: uint64): UInt256 =
let rng = Rng.instance
let slotIndex = rng.rand(numSlots - 1)
return slotIndex.u256
proc findSlotIndex(numSlots: uint64,
requestId: RequestId,
slotId: SlotId): ?UInt256 =
for i in 0..<numSlots:
if slotId(requestId, i.u256) == slotId:
return some i.u256
return none UInt256
proc handleRequest(sales: Sales,
requestId: RequestId,
ask: StorageAsk) =
without availability =? sales.findAvailability(ask):
return
sales.remove(availability)
let reservations = sales.context.reservations
# TODO: check if random slot is actually available (not already filled)
let slotIndex = randomSlotIndex(ask.slots)
let agent = newSalesAgent(
sales.context,
requestId,
slotIndex,
some availability,
none Availability,
none StorageRequest
)
agent.start(SaleDownloading())
@ -143,17 +102,15 @@ proc handleRequest(sales: Sales,
proc load*(sales: Sales) {.async.} =
let market = sales.context.market
# TODO: restore availability from disk
let slotIds = await market.mySlots()
for slotId in slotIds:
# TODO: this needs to be optimised
if request =? await market.getRequestFromSlotId(slotId):
let availability = sales.findAvailability(request.ask)
without slotIndex =? findSlotIndex(request.ask.slots,
request.id,
slotId):
raiseAssert "could not find slot index"
if (request, slotIndex) =? (await market.getActiveSlot(slotId)):
let availability = await sales.context.reservations.find(
request.ask.slotSize,
request.ask.duration,
request.ask.pricePerSlot,
used = true)
let agent = newSalesAgent(
sales.context,

View File

@ -1,8 +0,0 @@
import pkg/stint
type
Availability* = object
id*: array[32, byte]
size*: UInt256
duration*: UInt256
minPrice*: UInt256

View File

@ -0,0 +1,314 @@
## Nim-Codex
## Copyright (c) 2022 Status Research & Development GmbH
## Licensed under either of
## * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
## * MIT license ([LICENSE-MIT](LICENSE-MIT))
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
import std/typetraits
import pkg/chronos
import pkg/chronicles
import pkg/upraises
import pkg/json_serialization
import pkg/json_serialization/std/options
import pkg/stint
import pkg/nimcrypto
push: {.upraises: [].}
import pkg/datastore
import pkg/stew/byteutils
import ../stores
import ../namespaces
import ../contracts/requests
export requests
type
AvailabilityId* = distinct array[32, byte]
Availability* = object
id*: AvailabilityId
size*: UInt256
duration*: UInt256
minPrice*: UInt256
slotId*: ?SlotId
Reservations* = ref object
repo: RepoStore
GetNext* = proc(): Future[?Availability] {.upraises: [], gcsafe, closure.}
AvailabilityIter* = ref object
finished*: bool
next*: GetNext
AvailabilityError* = object of CodexError
AvailabilityNotExistsError* = object of AvailabilityError
AvailabilityAlreadyExistsError* = object of AvailabilityError
AvailabilityReserveFailedError* = object of AvailabilityError
AvailabilityReleaseFailedError* = object of AvailabilityError
AvailabilityDeleteFailedError* = object of AvailabilityError
AvailabilityPutFailedError* = object of AvailabilityError
AvailabilityGetFailedError* = object of AvailabilityError
const
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
ReservationsKey = (SalesKey / "reservations").tryGet
proc new*(
T: type Reservations,
repo: RepoStore): Reservations =
T(repo: repo)
proc init*(
_: type Availability,
size: UInt256,
duration: UInt256,
minPrice: UInt256): Availability =
var id: array[32, byte]
doAssert randomBytes(id) == 32
Availability(id: AvailabilityId(id), size: size, duration: duration, minPrice: minPrice)
func toArray*(id: AvailabilityId): array[32, byte] =
array[32, byte](id)
proc `==`*(x, y: AvailabilityId): bool {.borrow.}
proc `==`*(x, y: Availability): bool =
x.id == y.id and
x.size == y.size and
x.duration == y.duration and
x.minPrice == y.minPrice
proc toErr[E1: ref CatchableError, E2: AvailabilityError](
e1: E1,
_: type E2,
msg: string = "see inner exception"): ref E2 =
return newException(E2, msg, e1)
proc writeValue*(
writer: var JsonWriter,
value: SlotId | AvailabilityId) {.upraises:[IOError].} =
mixin writeValue
writer.writeValue value.toArray
proc readValue*[T: SlotId | AvailabilityId](
reader: var JsonReader,
value: var T) {.upraises: [SerializationError, IOError].} =
mixin readValue
value = T reader.readValue(T.distinctBase)
func used*(availability: Availability): bool =
availability.slotId.isSome
func key(id: AvailabilityId): ?!Key =
(ReservationsKey / id.toArray.toHex)
func key*(availability: Availability): ?!Key =
return availability.id.key
func available*(self: Reservations): uint =
return self.repo.quotaMaxBytes - self.repo.totalUsed
func available*(self: Reservations, bytes: uint): bool =
return bytes < self.available()
proc exists*(
self: Reservations,
id: AvailabilityId): Future[?!bool] {.async.} =
without key =? id.key, err:
return failure(err)
let exists = await self.repo.metaDs.contains(key)
return success(exists)
proc get*(
self: Reservations,
id: AvailabilityId): Future[?!Availability] {.async.} =
if exists =? (await self.exists(id)) and not exists:
let err = newException(AvailabilityNotExistsError,
"Availability does not exist")
return failure(err)
without key =? id.key, err:
return failure(err)
without serialized =? await self.repo.metaDs.get(key), err:
return failure(err)
without availability =? Json.decode(serialized, Availability).catch, err:
return failure(err)
return success availability
proc update(
self: Reservations,
availability: Availability,
slotId: ?SlotId): Future[?!void] {.async.} =
without var updated =? await self.get(availability.id), err:
return failure(err)
updated.slotId = slotId
without key =? availability.key, err:
return failure(err)
if err =? (await self.repo.metaDs.put(
key,
@(updated.toJson.toBytes))).errorOption:
return failure(err)
return success()
proc reserve*(
self: Reservations,
availability: Availability): Future[?!void] {.async.} =
if exists =? (await self.exists(availability.id)) and exists:
let err = newException(AvailabilityAlreadyExistsError,
"Availability already exists")
return failure(err)
without key =? availability.key, err:
return failure(err)
if err =? (await self.repo.metaDs.put(
key,
@(availability.toJson.toBytes))).errorOption:
return failure(err)
# TODO: reconcile data sizes -- availability uses UInt256 and RepoStore
# uses uint, thus the need to truncate
if reserveInnerErr =? (await self.repo.reserve(
availability.size.truncate(uint))).errorOption:
let reserveErr = reserveInnerErr.toErr(AvailabilityReserveFailedError,
"Availability reservation failed")
# rollback persisted availability
if rollbackInnerErr =? (await self.repo.metaDs.delete(key)).errorOption:
let rollbackErr = rollbackInnerErr.toErr(AvailabilityDeleteFailedError,
"Failed to delete persisted availability during rollback")
rollbackInnerErr.parent = reserveErr
return failure(rollbackErr)
return failure(reserveErr)
return success()
# TODO: call site not yet determined. Perhaps reuse of Availabilty should be set
# on creation (from the REST endpoint). Reusable availability wouldn't get
# released after contract completion. Non-reusable availability would.
proc release*(
self: Reservations,
id: AvailabilityId): Future[?!void] {.async.} =
without availability =? (await self.get(id)), err:
return failure(err.toErr(AvailabilityGetFailedError))
without key =? id.key, err:
return failure(err)
if err =? (await self.repo.metaDs.delete(key)).errorOption:
return failure(err.toErr(AvailabilityDeleteFailedError))
# TODO: reconcile data sizes -- availability uses UInt256 and RepoStore
# uses uint, thus the need to truncate
if releaseInnerErr =? (await self.repo.release(
availability.size.truncate(uint))).errorOption:
let releaseErr = releaseInnerErr.toErr(AvailabilityReleaseFailedError)
# rollback delete
if rollbackInnerErr =? (await self.repo.metaDs.put(
key,
@(availability.toJson.toBytes))).errorOption:
let rollbackErr = rollbackInnerErr.toErr(
AvailabilityPutFailedError,
"Failed to restore persisted availability during rollback")
rollbackInnerErr.parent = releaseErr
return failure(rollbackErr)
return failure(releaseErr)
return success()
proc markUsed*(
self: Reservations,
availability: Availability,
slotId: SlotId): Future[?!void] {.async.} =
return await self.update(availability, some slotId)
proc markUnused*(
self: Reservations,
availability: Availability): Future[?!void] {.async.} =
return await self.update(availability, none SlotId)
iterator items*(self: AvailabilityIter): Future[?Availability] =
while not self.finished:
yield self.next()
proc availabilities*(
self: Reservations): Future[?!AvailabilityIter] {.async.} =
var iter = AvailabilityIter()
let query = Query.init(ReservationsKey)
without results =? await self.repo.metaDs.query(query), err:
return failure(err)
proc next(): Future[?Availability] {.async.} =
await idleAsync()
iter.finished = results.finished
if not results.finished and
r =? (await results.next()) and
serialized =? r.data and
serialized.len > 0:
return some Json.decode(string.fromBytes(serialized), Availability)
return none Availability
iter.next = next
return success iter
proc unused*(r: Reservations): Future[?!seq[Availability]] {.async.} =
var ret: seq[Availability] = @[]
without availabilities =? (await r.availabilities), err:
return failure(err)
for a in availabilities:
if availability =? (await a) and not availability.used:
ret.add availability
return success(ret)
proc find*(
self: Reservations,
size, duration, minPrice: UInt256,
used: bool): Future[?Availability] {.async.} =
without availabilities =? (await self.availabilities), err:
error "failed to get all availabilities", error = err.msg
return none Availability
for a in availabilities:
if availability =? (await a):
let satisfiesUsed = (used and availability.used) or
(not used and not availability.used)
if satisfiesUsed and
size <= availability.size and
duration <= availability.duration and
minPrice >= availability.minPrice:
return some availability

View File

@ -5,7 +5,9 @@ import ../utils/asyncspawn
import ./statemachine
import ./salescontext
import ./salesdata
import ./availability
import ./reservations
export reservations
type SalesAgent* = ref object of Machine
context*: SalesContext

View File

@ -2,7 +2,7 @@ import pkg/upraises
import ../market
import ../clock
import ../proving
import ./availability
import ./reservations
type
SalesContext* = ref object
@ -12,17 +12,17 @@ type
onProve*: ?OnProve
onClear*: ?OnClear
onSale*: ?OnSale
onSaleErrored*: ?OnSaleErrored
proving*: Proving
reservations*: Reservations
OnStore* = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability): Future[void] {.gcsafe, upraises: [].}
OnProve* = proc(request: StorageRequest,
slot: UInt256): Future[seq[byte]] {.gcsafe, upraises: [].}
OnClear* = proc(availability: ?Availability,# TODO: when availability changes introduced, make availability non-optional (if we need to keep it at all)
OnClear* = proc(availability: ?Availability,
request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
OnSale* = proc(availability: ?Availability, # TODO: when availability changes introduced, make availability non-optional (if we need to keep it at all)
OnSale* = proc(availability: ?Availability,
request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
OnSaleErrored* = proc(availability: Availability) {.gcsafe, upraises: [].}

View File

@ -1,13 +1,13 @@
import pkg/chronos
import ../contracts/requests
import ../market
import ./availability
import ./reservations
type
SalesData* = ref object
requestId*: RequestId
ask*: StorageAsk
availability*: ?Availability # TODO: when availability persistence is added, change this to not optional
availability*: ?Availability
request*: ?StorageRequest
slotIndex*: UInt256
failed*: market.Subscription

View File

@ -1,3 +1,6 @@
import pkg/chronicles
import pkg/questionable
import pkg/questionable/results
import ../../market
import ../salesagent
import ../statemachine
@ -38,5 +41,24 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
without request =? data.request:
raiseAssert "no sale request"
without availability =? await context.reservations.find(
request.ask.slotSize,
request.ask.duration,
request.ask.pricePerSlot,
used = false):
info "no availability found for request, ignoring",
slotSize = request.ask.slotSize,
duration = request.ask.duration,
pricePerSlot = request.ask.pricePerSlot,
used = false
return
data.availability = some availability
if err =? (await agent.context.reservations.markUsed(
availability,
request.slotId(data.slotIndex))).errorOption:
raiseAssert "failed to mark availability as used, error: " & err.msg
await onStore(request, data.slotIndex, data.availability)
return some State(SaleProving())

View File

@ -1,3 +1,5 @@
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import pkg/chronicles
import ../statemachine
@ -21,13 +23,13 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
slotIndex =? data.slotIndex:
onClear(data.availability, request, slotIndex)
# TODO: when availability persistence is added, change this to not optional
# NOTE: with this in place, restoring state for a restarted node will
# never free up availability once finished. Persisting availability
# on disk is required for this.
if onSaleErrored =? context.onSaleErrored and
availability =? data.availability:
onSaleErrored(availability)
if availability =? data.availability:
let reservations = context.reservations
if (exists =? await reservations.exists(availability.id)) and
exists == true:
if err =? (await reservations.markUnused(availability)).errorOption:
raiseAssert "Failed to mark availability unused, error: " & err.msg
await agent.unsubscribe()

View File

@ -372,7 +372,7 @@ proc stop*(self: RepoStore): Future[void] {.async.} =
## Stop repo
##
if self.started:
if not self.started:
trace "Repo is not started"
return
@ -380,6 +380,8 @@ proc stop*(self: RepoStore): Future[void] {.async.} =
(await self.repoDs.close()).expect("Should close repo store!")
(await self.metaDs.close()).expect("Should close meta store!")
self.started = false
func new*(
T: type RepoStore,
repoDs: Datastore,

View File

@ -102,12 +102,15 @@ method getRequest(market: MockMarket,
return some request
return none StorageRequest
method getRequestFromSlotId*(market: MockMarket,
slotId: SlotId): Future[?StorageRequest] {.async.} =
method getActiveSlot*(
market: MockMarket,
slotId: SlotId): Future[?(StorageRequest, UInt256)] {.async.} =
for slot in market.filled:
if slotId(slot.requestId, slot.slotIndex) == slotId:
return await market.getRequest(slot.requestId)
return none StorageRequest
if slotId(slot.requestId, slot.slotIndex) == slotId and
request =? await market.getRequest(slot.requestId):
return some (request, slot.slotIndex)
return none (StorageRequest, UInt256)
method requestState*(market: MockMarket,
requestId: RequestId): Future[?RequestState] {.async.} =
@ -115,11 +118,9 @@ method requestState*(market: MockMarket,
method slotState*(market: MockMarket,
slotId: SlotId): Future[SlotState] {.async.} =
if market.slotState.hasKey(slotId):
return market.slotState[slotId]
else:
if not market.slotState.hasKey(slotId):
return SlotState.Free
return market.slotState[slotId]
method getRequestEnd*(market: MockMarket,
id: RequestId): Future[SecondsSince1970] {.async.} =

View File

@ -0,0 +1,16 @@
import std/algorithm
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/codex/sales/reservations
proc allAvailabilities*(r: Reservations): Future[seq[Availability]] {.async.} =
var ret: seq[Availability] = @[]
without availabilities =? (await r.availabilities), err:
raiseAssert "failed to get availabilities, error: " & err.msg
for a in availabilities:
if availability =? (await a):
ret.add availability
return ret.reversed()

View File

@ -0,0 +1,201 @@
import std/sequtils
import std/sugar
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import pkg/asynctest
import pkg/datastore
import pkg/json_serialization
import pkg/json_serialization/std/options
import pkg/stew/byteutils
import pkg/codex/stores
import pkg/codex/sales
import ../examples
import ./helpers
suite "Reservations module":
var
repo: RepoStore
repoDs: Datastore
metaDs: Datastore
availability: Availability
reservations: Reservations
setup:
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repo = RepoStore.new(repoDs, metaDs)
reservations = Reservations.new(repo)
availability = Availability.example
test "availability can be serialised and deserialised":
let availability = Availability.example
let serialised = availability.toJson
check Json.decode(serialised, Availability) == availability
test "has no availability initially":
check (await reservations.allAvailabilities()).len == 0
test "generates unique ids for storage availability":
let availability1 = Availability.init(1.u256, 2.u256, 3.u256)
let availability2 = Availability.init(1.u256, 2.u256, 3.u256)
check availability1.id != availability2.id
test "can reserve available storage":
let availability1 = Availability.example
let availability2 = Availability.example
check isOk await reservations.reserve(availability1)
check isOk await reservations.reserve(availability2)
let availabilities = await reservations.allAvailabilities()
check:
# perform unordered checks
availabilities.len == 2
availabilities.contains(availability1)
availabilities.contains(availability2)
test "reserved availability exists":
check isOk await reservations.reserve(availability)
without exists =? await reservations.exists(availability.id):
fail()
check exists
test "reserved availability can be released":
check isOk await reservations.reserve(availability)
check isOk await reservations.release(availability.id)
without exists =? await reservations.exists(availability.id):
fail()
check not exists
test "non-existant availability cannot be released":
let r = await reservations.release(availability.id)
check r.error of AvailabilityGetFailedError
check r.error.parent of AvailabilityNotExistsError
test "added availability is not used initially":
check isOk await reservations.reserve(availability)
without available =? await reservations.get(availability.id):
fail()
check not available.used
test "availability can be marked used":
check isOk await reservations.reserve(availability)
check isOk await reservations.markUsed(availability, SlotId.example)
without available =? await reservations.get(availability.id):
fail()
check available.used
test "availability can be marked unused":
check isOk await reservations.reserve(availability)
check isOk await reservations.markUsed(availability, SlotId.example)
check isOk await reservations.markUnused(availability)
without available =? await reservations.get(availability.id):
fail()
check not available.used
test "used availability can be found":
check isOk await reservations.reserve(availability)
check isOk await reservations.markUsed(availability, SlotId.example)
without available =? await reservations.find(availability.size,
availability.duration, availability.minPrice, used = true):
fail()
test "unused availability can be found":
check isOk await reservations.reserve(availability)
without available =? await reservations.find(availability.size,
availability.duration, availability.minPrice, used = false):
fail()
test "non-existant availability cannot be found":
check isNone (await reservations.find(availability.size,
availability.duration, availability.minPrice, used = false))
test "non-existant availability cannot be retrieved":
let r = await reservations.get(availability.id)
check r.error of AvailabilityNotExistsError
test "same availability cannot be reserved twice":
check isOk await reservations.reserve(availability)
let r = await reservations.reserve(availability)
check r.error of AvailabilityAlreadyExistsError
test "can get available bytes in repo":
check reservations.available == DefaultQuotaBytes
test "reserving availability reduces available bytes":
check isOk await reservations.reserve(availability)
check reservations.available ==
DefaultQuotaBytes - availability.size.truncate(uint)
test "reports quota available to be reserved":
check reservations.available(availability.size.truncate(uint))
test "reports quota not available to be reserved":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo)
check not reservations.available(availability.size.truncate(uint))
test "fails to reserve availability size that is larger than available quota":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo)
let r = await reservations.reserve(availability)
check r.error of AvailabilityReserveFailedError
check r.error.parent of QuotaNotEnoughError
test "rolls back persisted availability if repo reservation fails":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo)
discard await reservations.reserve(availability)
check exists =? (await reservations.exists(availability.id)) and not exists
test "fails to release availability size that is larger than available quota":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint))
reservations = Reservations.new(repo)
discard await reservations.reserve(availability)
# increase size of availability past repo quota, so that the next release
# will fail
availability.size += 1.u256
let key = !(availability.key)
check isOk await metaDs.put(key, @(availability.toJson.toBytes))
let r = await reservations.release(availability.id)
check r.error of AvailabilityReleaseFailedError
check r.error.parent.msg == "Cannot release this many bytes"
test "rolls back persisted availability if repo release fails":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint))
reservations = Reservations.new(repo)
discard await reservations.reserve(availability)
# increase size of availability past repo quota, so that the next release
# will fail
availability.size += 1.u256
let key = !(availability.key)
check isOk await metaDs.put(key, @(availability.toJson.toBytes))
discard await reservations.release(availability.id)
check exists =? (await reservations.exists(availability.id)) and exists

View File

@ -4,13 +4,19 @@ import std/sugar
import std/times
import pkg/asynctest
import pkg/chronos
import pkg/datastore
import pkg/questionable
import pkg/questionable/results
import pkg/codex/sales
import pkg/codex/sales/salesdata
import pkg/codex/sales/reservations
import pkg/codex/stores/repostore
import pkg/codex/proving
import ../helpers/mockmarket
import ../helpers/mockclock
import ../helpers/eventually
import ../examples
import ./helpers
suite "Sales":
@ -37,12 +43,17 @@ suite "Sales":
var market: MockMarket
var clock: MockClock
var proving: Proving
var reservations: Reservations
setup:
market = MockMarket.new()
clock = MockClock.new()
proving = Proving.new()
sales = Sales.new(market, clock, proving)
let repoDs = SQLiteDatastore.new(Memory).tryGet()
let metaDs = SQLiteDatastore.new(Memory).tryGet()
let repo = RepoStore.new(repoDs, metaDs)
sales = Sales.new(market, clock, proving, repo)
reservations = sales.context.reservations
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
@ -56,48 +67,33 @@ suite "Sales":
teardown:
await sales.stop()
test "has no availability initially":
check sales.available.len == 0
test "can add available storage":
let availability1 = Availability.example
let availability2 = Availability.example
sales.add(availability1)
check sales.available.contains(availability1)
sales.add(availability2)
check sales.available.contains(availability1)
check sales.available.contains(availability2)
test "can remove available storage":
sales.add(availability)
sales.remove(availability)
check sales.available.len == 0
test "generates unique ids for storage availability":
let availability1 = Availability.init(1.u256, 2.u256, 3.u256)
let availability2 = Availability.init(1.u256, 2.u256, 3.u256)
check availability1.id != availability2.id
test "makes storage unavailable when matching request comes in":
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually sales.available.len == 0
await sleepAsync(1.millis)
without availability =? await reservations.get(availability.id):
fail()
check availability.used
test "ignores request when no matching storage is available":
sales.add(availability)
check isOk await reservations.reserve(availability)
var tooBig = request
tooBig.ask.slotSize = request.ask.slotSize + 1
await market.requestStorage(tooBig)
await sleepAsync(1.millis)
check eventually sales.available == @[availability]
without availability =? await reservations.get(availability.id):
fail()
check not availability.used
test "ignores request when reward is too low":
sales.add(availability)
check isOk await reservations.reserve(availability)
var tooCheap = request
tooCheap.ask.reward = request.ask.reward - 1
await market.requestStorage(tooCheap)
await sleepAsync(1.millis)
check eventually sales.available == @[availability]
without availability =? await reservations.get(availability.id):
fail()
check not availability.used
test "retrieves and stores data locally":
var storingRequest: StorageRequest
@ -110,7 +106,7 @@ suite "Sales":
storingSlot = slot
check availability.isSome
storingAvailability = !availability
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually storingRequest == request
check storingSlot < request.ask.slots.u256
@ -124,11 +120,12 @@ suite "Sales":
# raise an exception so machine.onError is called
raise newException(ValueError, "some error")
# onSaleErrored is called in SaleErrored.run
proc onSaleErrored(availability: Availability) =
# onClear is called in SaleErrored.run
sales.onClear = proc(availability: ?Availability,
request: StorageRequest,
idx: UInt256) =
saleFailed = true
sales.context.onSaleErrored = some onSaleErrored
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually saleFailed
@ -138,10 +135,12 @@ suite "Sales":
slot: UInt256,
availability: ?Availability) {.async.} =
raise error
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
await sleepAsync(1.millis)
check eventually sales.available == @[availability]
without availability =? await reservations.get(availability.id):
fail()
check not availability.used
test "generates proof of storage":
var provingRequest: StorageRequest
@ -150,13 +149,13 @@ suite "Sales":
slot: UInt256): Future[seq[byte]] {.async.} =
provingRequest = request
provingSlot = slot
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually provingRequest == request
check provingSlot < request.ask.slots.u256
test "fills a slot":
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually market.filled.len == 1
check market.filled[0].requestId == request.id
@ -175,7 +174,7 @@ suite "Sales":
soldAvailability = a
soldRequest = request
soldSlotIndex = slotIndex
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually soldAvailability == availability
check soldRequest == request
@ -197,7 +196,7 @@ suite "Sales":
clearedAvailability = a
clearedRequest = request
clearedSlotIndex = slotIndex
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually clearedAvailability == availability
check clearedRequest == request
@ -209,24 +208,26 @@ suite "Sales":
slot: UInt256,
availability: ?Availability) {.async.} =
await sleepAsync(chronos.hours(1))
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
await sleepAsync(1.millis)
for slotIndex in 0..<request.ask.slots:
market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
await sleepAsync(chronos.seconds(2))
check sales.available == @[availability]
without availabilities =? (await reservations.allAvailabilities):
fail()
check availabilities == @[availability]
test "makes storage available again when request expires":
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
availability: ?Availability) {.async.} =
await sleepAsync(chronos.hours(1))
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
await sleepAsync(1.millis)
clock.set(request.expiry.truncate(int64))
check eventually (sales.available == @[availability])
check eventually (await reservations.allAvailabilities) == @[availability]
test "adds proving for slot when slot is filled":
var soldSlotIndex: UInt256
@ -235,7 +236,7 @@ suite "Sales":
slotIndex: UInt256) =
soldSlotIndex = slotIndex
check proving.slots.len == 0
sales.add(availability)
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check eventually proving.slots.len == 1
check proving.slots.contains(request.slotId(soldSlotIndex))

View File

@ -48,6 +48,25 @@ suite "RepoStore":
proc createTestBlock(size: int): bt.Block =
bt.Block.new('a'.repeat(size).toBytes).tryGet()
test "Should set started flag once started":
await repo.start()
check repo.started
test "Should set started flag to false once stopped":
await repo.start()
await repo.stop()
check not repo.started
test "Should allow start to be called multiple times":
await repo.start()
await repo.start()
check repo.started
test "Should allow stop to be called multiple times":
await repo.stop()
await repo.stop()
check not repo.started
test "Should update current used bytes on block put":
let blk = createTestBlock(200)

View File

@ -1,4 +1,5 @@
import ./sales/testsales
import ./sales/teststates
import ./sales/testreservations
{.warning[UnusedImport]: off.}

@ -1 +1 @@
Subproject commit cde543626236bd48188354d842cbe1513052c560
Subproject commit 974e5470b50957411a0c9efa1b4948aa01b90d62

2
vendor/questionable vendored

@ -1 +1 @@
Subproject commit 30e4184a99c8c1ba329925912d2c5d4b09acf8cc
Subproject commit 6cbbda7e4d009e02d0583b325b31dc68dff27854