feat(marketplace): persistent availabilities (#1099)

* Add availability enabled parameter

* Return bytes to availability when finished

* Add until parameter

* Remove debug message

* Clean up and fix tests

* Update documentations and cleanup

* Avoid swallowing CancelledError

* Move until validation to reservations module

* Call onAvailabilityAdded callabck when the availability is enabled in sales

* Remove until validation in restapi when creating an availability

* Add openapi documentation

* Use results instead of stew/results (#1112)

* feat: request duration limit (#1057)

* feat: request duration limit

* Fix tests and duration type

* Add custom error

* Remove merge issue

* Update codex contracts eth

* Update market config and fix test

* Fix SlotReservationsConfig syntax

* Update dependencies

* test: remove doubled test

* chore: update contracts repo

---------

Co-authored-by: Arnaud <arnaud@status.im>

* fix(statemachine): do not raise from state.run (#1115)

* fix(statemachine): do not raise from state.run

* fix rebase

* fix exception handling in SaleProvingSimulated.prove

- re-raise CancelledError
- don't return State on CatchableError
- expect the Proofs_InvalidProof custom error instead of checking a string

* asyncSpawn salesagent.onCancelled

This was swallowing a KeyError in one of the tests (fixed in the previous commit)

* remove error handling states in asyncstatemachine

* revert unneeded changes

* formatting

* PR feedback, logging updates

* chore(integration): simplify block expiration integration test (#1100)

* chore(integration): simplify block expiration integration test

* clean up

* fix after rebase

* perf: contract storage optimizations (#1094)

* perf: contract storage optimizations

* Apply optimization changes

* Apply optimizing parameters sizing

* Update codex-contracts-eth

* bump latest changes in contracts branch

* Change requestDurationLimit to uint64

* fix tests

* fix tests

---------

Co-authored-by: Arnaud <arnaud@status.im>
Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>

* bump contracts to master (#1122)

* Add availability enabled parameter

* Return bytes to availability when finished

* Add until parameter

* Clean up and fix tests

* Move until validation to reservations module

* Apply suggestion changes: return the reservation module error

* Apply suggestion changes for until dates

* Apply suggestion changes: reorganize tests

* Fix indent

* Remove test related to timing issue

* Add raises errors to async pragram and remove useless try except

* Update open api documentation

* Fix wording

* Remove the httpClient restart statements

* Use market.getRequestEnd to set validUntil

* Remove returnBytes

* Use clock.now in testing

* Move the api validation file to the right file

---------

Co-authored-by: Adam Uhlíř <adam@uhlir.dev>
Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
This commit is contained in:
Arnaud 2025-03-26 12:45:22 +01:00 committed by GitHub
parent 60b6996eb0
commit 7deeb7d2b3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
33 changed files with 564 additions and 172 deletions

View File

@ -484,10 +484,19 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
without availability =? (
await reservations.createAvailability(
restAv.totalSize, restAv.duration, restAv.minPricePerBytePerSecond,
restAv.totalSize,
restAv.duration,
restAv.minPricePerBytePerSecond,
restAv.totalCollateral,
enabled = restAv.enabled |? true,
until = restAv.until |? 0,
)
), error:
if error of CancelledError:
raise error
if error of UntilOutOfBoundsError:
return RestApiResponse.error(Http422, error.msg)
return RestApiResponse.error(Http500, error.msg, headers = headers)
return RestApiResponse.response(
@ -524,6 +533,7 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
## tokens) to be matched against the request's pricePerBytePerSecond
## totalCollateral - total collateral (in amount of
## tokens) that can be distributed among matching requests
try:
without contracts =? node.contracts.host:
return RestApiResponse.error(Http503, "Persistence is not enabled")
@ -577,10 +587,21 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
if totalCollateral =? restAv.totalCollateral:
availability.totalCollateral = totalCollateral
if err =? (await reservations.update(availability)).errorOption:
return RestApiResponse.error(Http500, err.msg)
if until =? restAv.until:
availability.until = until
return RestApiResponse.response(Http200)
if enabled =? restAv.enabled:
availability.enabled = enabled
if err =? (await reservations.update(availability)).errorOption:
if err of CancelledError:
raise err
if err of UntilOutOfBoundsError:
return RestApiResponse.error(Http422, err.msg)
else:
return RestApiResponse.error(Http500, err.msg)
return RestApiResponse.response(Http204)
except CatchableError as exc:
trace "Excepting processing request", exc = exc.msg
return RestApiResponse.error(Http500)

View File

@ -33,6 +33,8 @@ type
minPricePerBytePerSecond* {.serialize.}: UInt256
totalCollateral* {.serialize.}: UInt256
freeSize* {.serialize.}: ?uint64
enabled* {.serialize.}: ?bool
until* {.serialize.}: ?SecondsSince1970
RestSalesAgent* = object
state* {.serialize.}: string

View File

@ -113,7 +113,6 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
proc cleanUp(
sales: Sales,
agent: SalesAgent,
returnBytes: bool,
reprocessSlot: bool,
returnedCollateral: ?UInt256,
processing: Future[void],
@ -132,7 +131,7 @@ proc cleanUp(
# if reservation for the SalesAgent was not created, then it means
# that the cleanUp was called before the sales process really started, so
# there are not really any bytes to be returned
if returnBytes and request =? data.request and reservation =? data.reservation:
if request =? data.request and reservation =? data.reservation:
if returnErr =? (
await sales.context.reservations.returnBytesToAvailability(
reservation.availabilityId, reservation.id, request.ask.slotSize
@ -203,9 +202,9 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
newSalesAgent(sales.context, item.requestId, item.slotIndex, none StorageRequest)
agent.onCleanUp = proc(
returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
await sales.cleanUp(agent, returnBytes, reprocessSlot, returnedCollateral, done)
await sales.cleanUp(agent, reprocessSlot, returnedCollateral, done)
agent.onFilled = some proc(request: StorageRequest, slotIndex: uint64) =
sales.filled(request, slotIndex, done)
@ -271,12 +270,12 @@ proc load*(sales: Sales) {.async.} =
newSalesAgent(sales.context, slot.request.id, slot.slotIndex, some slot.request)
agent.onCleanUp = proc(
returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
# since workers are not being dispatched, this future has not been created
# by a worker. Create a dummy one here so we can call sales.cleanUp
let done: Future[void] = nil
await sales.cleanUp(agent, returnBytes, reprocessSlot, returnedCollateral, done)
await sales.cleanUp(agent, reprocessSlot, returnedCollateral, done)
# There is no need to assign agent.onFilled as slots loaded from `mySlots`
# are inherently already filled and so assigning agent.onFilled would be
@ -285,7 +284,9 @@ proc load*(sales: Sales) {.async.} =
agent.start(SaleUnknown())
sales.agents.add agent
proc OnAvailabilitySaved(sales: Sales, availability: Availability) {.async.} =
proc OnAvailabilitySaved(
sales: Sales, availability: Availability
) {.async: (raises: []).} =
## When availabilities are modified or added, the queue should be unpaused if
## it was paused and any slots in the queue should have their `seen` flag
## cleared.
@ -533,8 +534,9 @@ proc startSlotQueue(sales: Sales) =
slotQueue.start()
proc OnAvailabilitySaved(availability: Availability) {.async.} =
await sales.OnAvailabilitySaved(availability)
proc OnAvailabilitySaved(availability: Availability) {.async: (raises: []).} =
if availability.enabled:
await sales.OnAvailabilitySaved(availability)
reservations.OnAvailabilitySaved = OnAvailabilitySaved

View File

@ -35,6 +35,7 @@ import std/sequtils
import std/sugar
import std/typetraits
import std/sequtils
import std/times
import pkg/chronos
import pkg/datastore
import pkg/nimcrypto
@ -70,6 +71,12 @@ type
minPricePerBytePerSecond* {.serialize.}: UInt256
totalCollateral {.serialize.}: UInt256
totalRemainingCollateral* {.serialize.}: UInt256
# If set to false, the availability will not accept new slots.
# If enabled, it will not impact any existing slots that are already being hosted.
enabled* {.serialize.}: bool
# Specifies the latest timestamp after which the availability will no longer host any slots.
# If set to 0, there will be no restrictions.
until* {.serialize.}: SecondsSince1970
Reservation* = ref object
id* {.serialize.}: ReservationId
@ -77,6 +84,7 @@ type
size* {.serialize.}: uint64
requestId* {.serialize.}: RequestId
slotIndex* {.serialize.}: uint64
validUntil* {.serialize.}: SecondsSince1970
Reservations* = ref object of RootObj
availabilityLock: AsyncLock
@ -84,10 +92,14 @@ type
repo: RepoStore
OnAvailabilitySaved: ?OnAvailabilitySaved
GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
IterDispose* = proc(): Future[?!void] {.gcsafe, closure.}
OnAvailabilitySaved* =
proc(availability: Availability): Future[void] {.upraises: [], gcsafe.}
GetNext* = proc(): Future[?seq[byte]] {.
upraises: [], gcsafe, async: (raises: [CancelledError]), closure
.}
IterDispose* =
proc(): Future[?!void] {.gcsafe, async: (raises: [CancelledError]), closure.}
OnAvailabilitySaved* = proc(availability: Availability): Future[void] {.
upraises: [], gcsafe, async: (raises: [])
.}
StorableIter* = ref object
finished*: bool
next*: GetNext
@ -102,13 +114,20 @@ type
SerializationError* = object of ReservationsError
UpdateFailedError* = object of ReservationsError
BytesOutOfBoundsError* = object of ReservationsError
UntilOutOfBoundsError* = object of ReservationsError
const
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
ReservationsKey = (SalesKey / "reservations").tryGet
proc hash*(x: AvailabilityId): Hash {.borrow.}
proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.}
proc all*(
self: Reservations, T: type SomeStorableObject
): Future[?!seq[T]] {.async: (raises: [CancelledError]).}
proc all*(
self: Reservations, T: type SomeStorableObject, availabilityId: AvailabilityId
): Future[?!seq[T]] {.async: (raises: [CancelledError]).}
template withLock(lock, body) =
try:
@ -128,6 +147,8 @@ proc init*(
duration: uint64,
minPricePerBytePerSecond: UInt256,
totalCollateral: UInt256,
enabled: bool,
until: SecondsSince1970,
): Availability =
var id: array[32, byte]
doAssert randomBytes(id) == 32
@ -139,6 +160,8 @@ proc init*(
minPricePerBytePerSecond: minPricePerBytePerSecond,
totalCollateral: totalCollateral,
totalRemainingCollateral: totalCollateral,
enabled: enabled,
until: until,
)
func totalCollateral*(self: Availability): UInt256 {.inline.} =
@ -154,6 +177,7 @@ proc init*(
size: uint64,
requestId: RequestId,
slotIndex: uint64,
validUntil: SecondsSince1970,
): Reservation =
var id: array[32, byte]
doAssert randomBytes(id) == 32
@ -163,6 +187,7 @@ proc init*(
size: size,
requestId: requestId,
slotIndex: slotIndex,
validUntil: validUntil,
)
func toArray(id: SomeStorableId): array[32, byte] =
@ -217,11 +242,19 @@ func available*(self: Reservations): uint =
func hasAvailable*(self: Reservations, bytes: uint): bool =
self.repo.available(bytes.NBytes)
proc exists*(self: Reservations, key: Key): Future[bool] {.async.} =
proc exists*(
self: Reservations, key: Key
): Future[bool] {.async: (raises: [CancelledError]).} =
let exists = await self.repo.metaDs.ds.contains(key)
return exists
proc getImpl(self: Reservations, key: Key): Future[?!seq[byte]] {.async.} =
iterator items(self: StorableIter): Future[?seq[byte]] =
while not self.finished:
yield self.next()
proc getImpl(
self: Reservations, key: Key
): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} =
if not await self.exists(key):
let err =
newException(NotExistsError, "object with key " & $key & " does not exist")
@ -234,7 +267,7 @@ proc getImpl(self: Reservations, key: Key): Future[?!seq[byte]] {.async.} =
proc get*(
self: Reservations, key: Key, T: type SomeStorableObject
): Future[?!T] {.async.} =
): Future[?!T] {.async: (raises: [CancelledError]).} =
without serialized =? await self.getImpl(key), error:
return failure(error)
@ -243,7 +276,9 @@ proc get*(
return success obj
proc updateImpl(self: Reservations, obj: SomeStorableObject): Future[?!void] {.async.} =
proc updateImpl(
self: Reservations, obj: SomeStorableObject
): Future[?!void] {.async: (raises: [CancelledError]).} =
trace "updating " & $(obj.type), id = obj.id
without key =? obj.key, error:
@ -256,10 +291,15 @@ proc updateImpl(self: Reservations, obj: SomeStorableObject): Future[?!void] {.a
proc updateAvailability(
self: Reservations, obj: Availability
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
logScope:
availabilityId = obj.id
if obj.until < 0:
let error =
newException(UntilOutOfBoundsError, "Cannot set until to a negative value")
return failure(error)
without key =? obj.key, error:
return failure(error)
@ -269,21 +309,25 @@ proc updateAvailability(
let res = await self.updateImpl(obj)
# inform subscribers that Availability has been added
if OnAvailabilitySaved =? self.OnAvailabilitySaved:
# when chronos v4 is implemented, and OnAvailabilitySaved is annotated
# with async:(raises:[]), we can remove this try/catch as we know, with
# certainty, that nothing will be raised
try:
await OnAvailabilitySaved(obj)
except CancelledError as e:
raise e
except CatchableError as e:
# we don't have any insight into types of exceptions that
# `OnAvailabilitySaved` can raise because it is caller-defined
warn "Unknown error during 'OnAvailabilitySaved' callback", error = e.msg
await OnAvailabilitySaved(obj)
return res
else:
return failure(err)
if obj.until > 0:
without allReservations =? await self.all(Reservation, obj.id), error:
error.msg = "Error updating reservation: " & error.msg
return failure(error)
let requestEnds = allReservations.mapIt(it.validUntil)
if requestEnds.len > 0 and requestEnds.max > obj.until:
let error = newException(
UntilOutOfBoundsError,
"Until parameter must be greater or equal to the longest currently hosted slot",
)
return failure(error)
# Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize:
trace "totalSize changed, updating repo reservation"
@ -306,26 +350,23 @@ proc updateAvailability(
# inform subscribers that Availability has been modified (with increased
# size)
if OnAvailabilitySaved =? self.OnAvailabilitySaved:
# when chronos v4 is implemented, and OnAvailabilitySaved is annotated
# with async:(raises:[]), we can remove this try/catch as we know, with
# certainty, that nothing will be raised
try:
await OnAvailabilitySaved(obj)
except CancelledError as e:
raise e
except CatchableError as e:
# we don't have any insight into types of exceptions that
# `OnAvailabilitySaved` can raise because it is caller-defined
warn "Unknown error during 'OnAvailabilitySaved' callback", error = e.msg
await OnAvailabilitySaved(obj)
return res
proc update*(self: Reservations, obj: Reservation): Future[?!void] {.async.} =
proc update*(
self: Reservations, obj: Reservation
): Future[?!void] {.async: (raises: [CancelledError]).} =
return await self.updateImpl(obj)
proc update*(self: Reservations, obj: Availability): Future[?!void] {.async.} =
withLock(self.availabilityLock):
return await self.updateAvailability(obj)
proc update*(
self: Reservations, obj: Availability
): Future[?!void] {.async: (raises: [CancelledError]).} =
try:
withLock(self.availabilityLock):
return await self.updateAvailability(obj)
except AsyncLockError as e:
error "Lock error when trying to update the availability", err = e.msg
return failure(e)
proc delete(self: Reservations, key: Key): Future[?!void] {.async.} =
trace "deleting object", key
@ -391,12 +432,20 @@ proc createAvailability*(
duration: uint64,
minPricePerBytePerSecond: UInt256,
totalCollateral: UInt256,
enabled: bool,
until: SecondsSince1970,
): Future[?!Availability] {.async.} =
trace "creating availability",
size, duration, minPricePerBytePerSecond, totalCollateral
size, duration, minPricePerBytePerSecond, totalCollateral, enabled, until
let availability =
Availability.init(size, size, duration, minPricePerBytePerSecond, totalCollateral)
if until < 0:
let error =
newException(UntilOutOfBoundsError, "Cannot set until to a negative value")
return failure(error)
let availability = Availability.init(
size, size, duration, minPricePerBytePerSecond, totalCollateral, enabled, until
)
let bytes = availability.freeSize
if reserveErr =? (await self.repo.reserve(bytes.NBytes)).errorOption:
@ -420,6 +469,7 @@ method createReservation*(
requestId: RequestId,
slotIndex: uint64,
collateralPerByte: UInt256,
validUntil: SecondsSince1970,
): Future[?!Reservation] {.async, base.} =
withLock(self.availabilityLock):
without availabilityKey =? availabilityId.key, error:
@ -436,9 +486,11 @@ method createReservation*(
)
return failure(error)
trace "Creating reservation", availabilityId, slotSize, requestId, slotIndex
trace "Creating reservation",
availabilityId, slotSize, requestId, slotIndex, validUntil = validUntil
let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
let reservation =
Reservation.init(availabilityId, slotSize, requestId, slotIndex, validUntil)
if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr)
@ -448,7 +500,7 @@ method createReservation*(
availability.freeSize -= slotSize
# adjust the remaining totalRemainingCollateral
availability.totalRemainingCollateral -= slotSize.stuint(256) * collateralPerByte
availability.totalRemainingCollateral -= slotSize.u256 * collateralPerByte
# update availability with reduced size
trace "Updating availability with reduced size"
@ -527,7 +579,7 @@ proc release*(
reservationId: ReservationId,
availabilityId: AvailabilityId,
bytes: uint,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
logScope:
topics = "release"
bytes
@ -565,13 +617,9 @@ proc release*(
return success()
iterator items(self: StorableIter): Future[?seq[byte]] =
while not self.finished:
yield self.next()
proc storables(
self: Reservations, T: type SomeStorableObject, queryKey: Key = ReservationsKey
): Future[?!StorableIter] {.async.} =
): Future[?!StorableIter] {.async: (raises: [CancelledError]).} =
var iter = StorableIter()
let query = Query.init(queryKey)
when T is Availability:
@ -589,7 +637,7 @@ proc storables(
return failure(error)
# /sales/reservations
proc next(): Future[?seq[byte]] {.async.} =
proc next(): Future[?seq[byte]] {.async: (raises: [CancelledError]).} =
await idleAsync()
iter.finished = results.finished
if not results.finished and res =? (await results.next()) and res.data.len > 0 and
@ -598,7 +646,7 @@ proc storables(
return none seq[byte]
proc dispose(): Future[?!void] {.async.} =
proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} =
return await results.dispose()
iter.next = next
@ -607,32 +655,40 @@ proc storables(
proc allImpl(
self: Reservations, T: type SomeStorableObject, queryKey: Key = ReservationsKey
): Future[?!seq[T]] {.async.} =
): Future[?!seq[T]] {.async: (raises: [CancelledError]).} =
var ret: seq[T] = @[]
without storables =? (await self.storables(T, queryKey)), error:
return failure(error)
for storable in storables.items:
without bytes =? (await storable):
continue
try:
without bytes =? (await storable):
continue
without obj =? T.fromJson(bytes), error:
error "json deserialization error",
json = string.fromBytes(bytes), error = error.msg
continue
without obj =? T.fromJson(bytes), error:
error "json deserialization error",
json = string.fromBytes(bytes), error = error.msg
continue
ret.add obj
ret.add obj
except CancelledError as err:
raise err
except CatchableError as err:
error "Error when retrieving storable", error = err.msg
continue
return success(ret)
proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} =
proc all*(
self: Reservations, T: type SomeStorableObject
): Future[?!seq[T]] {.async: (raises: [CancelledError]).} =
return await self.allImpl(T)
proc all*(
self: Reservations, T: type SomeStorableObject, availabilityId: AvailabilityId
): Future[?!seq[T]] {.async.} =
without key =? (ReservationsKey / $availabilityId):
): Future[?!seq[T]] {.async: (raises: [CancelledError]).} =
without key =? key(availabilityId):
return failure("no key")
return await self.allImpl(T, key)
@ -641,6 +697,7 @@ proc findAvailability*(
self: Reservations,
size, duration: uint64,
pricePerBytePerSecond, collateralPerByte: UInt256,
validUntil: SecondsSince1970,
): Future[?Availability] {.async.} =
without storables =? (await self.storables(Availability)), e:
error "failed to get all storables", error = e.msg
@ -648,11 +705,14 @@ proc findAvailability*(
for item in storables.items:
if bytes =? (await item) and availability =? Availability.fromJson(bytes):
if size <= availability.freeSize and duration <= availability.duration and
if availability.enabled and size <= availability.freeSize and
duration <= availability.duration and
collateralPerByte <= availability.maxCollateralPerByte and
pricePerBytePerSecond >= availability.minPricePerBytePerSecond:
pricePerBytePerSecond >= availability.minPricePerBytePerSecond and
(availability.until == 0 or availability.until >= validUntil):
trace "availability matched",
id = availability.id,
enabled = availability.enabled,
size,
availFreeSize = availability.freeSize,
duration,
@ -660,7 +720,8 @@ proc findAvailability*(
pricePerBytePerSecond,
availMinPricePerBytePerSecond = availability.minPricePerBytePerSecond,
collateralPerByte,
availMaxCollateralPerByte = availability.maxCollateralPerByte
availMaxCollateralPerByte = availability.maxCollateralPerByte,
until = availability.until
# TODO: As soon as we're on ARC-ORC, we can use destructors
# to automatically dispose our iterators when they fall out of scope.
@ -672,6 +733,7 @@ proc findAvailability*(
trace "availability did not match",
id = availability.id,
enabled = availability.enabled,
size,
availFreeSize = availability.freeSize,
duration,
@ -679,4 +741,5 @@ proc findAvailability*(
pricePerBytePerSecond,
availMinPricePerBytePerSecond = availability.minPricePerBytePerSecond,
collateralPerByte,
availMaxCollateralPerByte = availability.maxCollateralPerByte
availMaxCollateralPerByte = availability.maxCollateralPerByte,
until = availability.until

View File

@ -27,7 +27,7 @@ type
onFilled*: ?OnFilled
OnCleanUp* = proc(
returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none
reprocessSlot = false, returnedCollateral = UInt256.none
): Future[void] {.gcsafe, upraises: [].}
OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].}

View File

@ -34,9 +34,7 @@ method run*(
if onCleanUp =? agent.onCleanUp:
await onCleanUp(
returnBytes = true,
reprocessSlot = false,
returnedCollateral = some currentCollateral,
reprocessSlot = false, returnedCollateral = some currentCollateral
)
warn "Sale cancelled due to timeout",

View File

@ -34,7 +34,7 @@ method run*(
onClear(request, data.slotIndex)
if onCleanUp =? agent.onCleanUp:
await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot)
await onCleanUp(reprocessSlot = state.reprocessSlot)
except CancelledError as e:
trace "SaleErrored.run was cancelled", error = e.msgDetail
except CatchableError as e:

View File

@ -50,7 +50,7 @@ method run*(
await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral)
except SlotStateMismatchError as e:
debug "Slot is already filled, ignoring slot"
return some State(SaleIgnored(reprocessSlot: false, returnBytes: true))
return some State(SaleIgnored(reprocessSlot: false))
except MarketError as e:
return some State(SaleErrored(error: e))
# other CatchableErrors are handled "automatically" by the SaleState

View File

@ -36,6 +36,9 @@ method run*(
requestId = data.requestId, slotIndex = data.slotIndex
try:
if onClear =? agent.context.onClear:
onClear(request, data.slotIndex)
if onCleanUp =? agent.onCleanUp:
await onCleanUp(returnedCollateral = state.returnedCollateral)
except CancelledError as e:

View File

@ -14,7 +14,6 @@ logScope:
type SaleIgnored* = ref object of SaleState
reprocessSlot*: bool # readd slot to queue with `seen` flag
returnBytes*: bool # return unreleased bytes from Reservation to Availability
method `$`*(state: SaleIgnored): string =
"SaleIgnored"
@ -26,9 +25,7 @@ method run*(
try:
if onCleanUp =? agent.onCleanUp:
await onCleanUp(
reprocessSlot = state.reprocessSlot, returnBytes = state.returnBytes
)
await onCleanUp(reprocessSlot = state.reprocessSlot)
except CancelledError as e:
trace "SaleIgnored.run was cancelled", error = e.msgDetail
except CatchableError as e:

View File

@ -56,7 +56,7 @@ method run*(
let slotId = slotId(data.requestId, data.slotIndex)
let state = await market.slotState(slotId)
if state != SlotState.Free and state != SlotState.Repair:
return some State(SaleIgnored(reprocessSlot: false, returnBytes: false))
return some State(SaleIgnored(reprocessSlot: false))
# TODO: Once implemented, check to ensure the host is allowed to fill the slot,
# due to the [sliding window mechanism](https://github.com/codex-storage/codex-research/blob/master/design/marketplace.md#dispersal)
@ -68,10 +68,12 @@ method run*(
pricePerBytePerSecond = request.ask.pricePerBytePerSecond
collateralPerByte = request.ask.collateralPerByte
let requestEnd = await market.getRequestEnd(data.requestId)
without availability =?
await reservations.findAvailability(
request.ask.slotSize, request.ask.duration, request.ask.pricePerBytePerSecond,
request.ask.collateralPerByte,
request.ask.collateralPerByte, requestEnd,
):
debug "No availability found for request, ignoring"
@ -82,7 +84,7 @@ method run*(
without reservation =?
await reservations.createReservation(
availability.id, request.ask.slotSize, request.id, data.slotIndex,
request.ask.collateralPerByte,
request.ask.collateralPerByte, requestEnd,
), error:
trace "Creation of reservation failed"
# Race condition:

View File

@ -46,7 +46,7 @@ method run*(
await market.reserveSlot(data.requestId, data.slotIndex)
except SlotReservationNotAllowedError as e:
debug "Slot cannot be reserved, ignoring", error = e.msg
return some State(SaleIgnored(reprocessSlot: false, returnBytes: true))
return some State(SaleIgnored(reprocessSlot: false))
except MarketError as e:
return some State(SaleErrored(error: e))
# other CatchableErrors are handled "automatically" by the SaleState
@ -57,7 +57,7 @@ method run*(
# do not re-add this slot to the queue, and return bytes from Reservation to
# the Availability
debug "Slot cannot be reserved, ignoring"
return some State(SaleIgnored(reprocessSlot: false, returnBytes: true))
return some State(SaleIgnored(reprocessSlot: false))
except CancelledError as e:
trace "SaleSlotReserving.run was cancelled", error = e.msgDetail
except CatchableError as e:

View File

@ -105,7 +105,7 @@ proc updateQuotaUsage*(
minusUsed: NBytes = 0.NBytes,
plusReserved: NBytes = 0.NBytes,
minusReserved: NBytes = 0.NBytes,
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
await self.metaDs.modify(
QuotaUsedKey,
proc(maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} =

View File

@ -380,7 +380,9 @@ method close*(self: RepoStore): Future[void] {.async.} =
# RepoStore procs
###########################################################
proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
proc reserve*(
self: RepoStore, bytes: NBytes
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Reserve bytes
##
@ -388,7 +390,9 @@ proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
await self.updateQuotaUsage(plusReserved = bytes)
proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} =
proc release*(
self: RepoStore, bytes: NBytes
): Future[?!void] {.async: (raises: [CancelledError]).} =
## Release bytes
##

View File

@ -163,6 +163,14 @@ components:
totalCollateral:
type: string
description: Total collateral (in amount of tokens) that can be used for matching requests
enabled:
type: boolean
description: Enable the ability to receive sales on this availability.
default: true
until:
type: integer
description: Specifies the latest timestamp, after which the availability will no longer host any slots. If set to 0, there will be no restrictions.
default: 0
SalesAvailabilityREAD:
allOf:
@ -239,6 +247,9 @@ components:
slotIndex:
type: string
description: Slot Index as decimal string
validUntil:
type: integer
description: Timestamp after which the reservation will no longer be valid.
StorageRequestCreation:
type: object
@ -704,7 +715,7 @@ paths:
"400":
description: Invalid data input
"422":
description: The provided parameters did not pass validation
description: Not enough node's storage quota available or the provided parameters did not pass validation
"500":
description: Error reserving availability
"503":

View File

@ -75,6 +75,8 @@ proc example*(
duration = uint16.example.uint64,
minPricePerBytePerSecond = uint8.example.u256,
totalCollateral = totalSize.u256 * collateralPerByte,
enabled = true,
until = 0.SecondsSince1970,
)
proc example*(_: type Reservation): Reservation =

View File

@ -2,6 +2,7 @@ import pkg/chronos
import pkg/codex/sales
import pkg/codex/stores
import pkg/questionable/results
import pkg/codex/clock
type MockReservations* = ref object of Reservations
createReservationThrowBytesOutOfBoundsError: bool
@ -28,6 +29,7 @@ method createReservation*(
requestId: RequestId,
slotIndex: uint64,
collateralPerByte: UInt256,
validUntil: SecondsSince1970,
): Future[?!Reservation] {.async.} =
if self.createReservationThrowBytesOutOfBoundsError:
let error = newException(
@ -45,4 +47,5 @@ method createReservation*(
requestId,
slotIndex,
collateralPerByte,
validUntil,
)

View File

@ -22,16 +22,14 @@ asyncchecksuite "sales state 'cancelled'":
var market: MockMarket
var state: SaleCancelled
var agent: SalesAgent
var returnBytesWas = bool.none
var reprocessSlotWas = bool.none
var returnedCollateralValue = UInt256.none
setup:
market = MockMarket.new()
let onCleanUp = proc(
returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
returnBytesWas = some returnBytes
reprocessSlotWas = some reprocessSlot
returnedCollateralValue = returnedCollateral
@ -40,7 +38,7 @@ asyncchecksuite "sales state 'cancelled'":
agent.onCleanUp = onCleanUp
state = SaleCancelled.new()
test "calls onCleanUp with returnBytes = false, reprocessSlot = true, and returnedCollateral = currentCollateral":
test "calls onCleanUp with reprocessSlot = true, and returnedCollateral = currentCollateral":
market.fillSlot(
requestId = request.id,
slotIndex = slotIndex,
@ -49,6 +47,5 @@ asyncchecksuite "sales state 'cancelled'":
collateral = currentCollateral,
)
discard await state.run(agent)
check eventually returnBytesWas == some true
check eventually reprocessSlotWas == some false
check eventually returnedCollateralValue == some currentCollateral

View File

@ -20,14 +20,12 @@ asyncchecksuite "sales state 'errored'":
var state: SaleErrored
var agent: SalesAgent
var returnBytesWas = false
var reprocessSlotWas = false
setup:
let onCleanUp = proc(
returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
returnBytesWas = returnBytes
reprocessSlotWas = reprocessSlot
let context = SalesContext(market: market, clock: clock)
@ -35,8 +33,7 @@ asyncchecksuite "sales state 'errored'":
agent.onCleanUp = onCleanUp
state = SaleErrored(error: newException(ValueError, "oh no!"))
test "calls onCleanUp with returnBytes = false and reprocessSlot = true":
test "calls onCleanUp with reprocessSlot = true":
state = SaleErrored(error: newException(ValueError, "oh no!"), reprocessSlot: true)
discard await state.run(agent)
check eventually returnBytesWas == true
check eventually reprocessSlotWas == true

View File

@ -47,7 +47,6 @@ suite "sales state 'filling'":
let next = !(await state.run(agent))
check next of SaleIgnored
check SaleIgnored(next).reprocessSlot == false
check SaleIgnored(next).returnBytes
test "run switches to errored with other error ":
let error = newException(MarketError, "some error")

View File

@ -23,22 +23,23 @@ asyncchecksuite "sales state 'finished'":
var market: MockMarket
var state: SaleFinished
var agent: SalesAgent
var returnBytesWas = bool.none
var reprocessSlotWas = bool.none
var returnedCollateralValue = UInt256.none
var saleCleared = bool.none
setup:
market = MockMarket.new()
let onCleanUp = proc(
returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
returnBytesWas = some returnBytes
reprocessSlotWas = some reprocessSlot
returnedCollateralValue = returnedCollateral
let context = SalesContext(market: market, clock: clock)
agent = newSalesAgent(context, request.id, slotIndex, request.some)
agent.onCleanUp = onCleanUp
agent.context.onClear = some proc(request: StorageRequest, idx: uint64) =
saleCleared = some true
state = SaleFinished(returnedCollateral: some currentCollateral)
test "switches to cancelled state when request expires":
@ -49,8 +50,8 @@ asyncchecksuite "sales state 'finished'":
let next = state.onFailed(request)
check !next of SaleFailed
test "calls onCleanUp with returnBytes = false, reprocessSlot = true, and returnedCollateral = currentCollateral":
test "calls onCleanUp with reprocessSlot = true, and returnedCollateral = currentCollateral":
discard await state.run(agent)
check eventually returnBytesWas == some false
check eventually reprocessSlotWas == some false
check eventually returnedCollateralValue == some currentCollateral
check eventually saleCleared == some true

View File

@ -20,14 +20,12 @@ asyncchecksuite "sales state 'ignored'":
var state: SaleIgnored
var agent: SalesAgent
var returnBytesWas = false
var reprocessSlotWas = false
setup:
let onCleanUp = proc(
returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none
reprocessSlot = false, returnedCollateral = UInt256.none
) {.async.} =
returnBytesWas = returnBytes
reprocessSlotWas = reprocessSlot
let context = SalesContext(market: market, clock: clock)
@ -36,7 +34,6 @@ asyncchecksuite "sales state 'ignored'":
state = SaleIgnored.new()
test "calls onCleanUp with values assigned to SaleIgnored":
state = SaleIgnored(reprocessSlot: true, returnBytes: true)
state = SaleIgnored(reprocessSlot: true)
discard await state.run(agent)
check eventually returnBytesWas == true
check eventually reprocessSlotWas == true

View File

@ -13,6 +13,7 @@ import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/sales/reservations
import pkg/codex/stores/repostore
import times
import ../../../asynctest
import ../../helpers
import ../../examples
@ -39,6 +40,8 @@ asyncchecksuite "sales state 'preparing'":
duration = request.ask.duration + 60.uint64,
minPricePerBytePerSecond = request.ask.pricePerBytePerSecond,
totalCollateral = request.ask.collateralPerSlot * request.ask.slots.u256,
enabled = true,
until = 0.SecondsSince1970,
)
let repoDs = SQLiteDatastore.new(Memory).tryGet()
let metaDs = SQLiteDatastore.new(Memory).tryGet()
@ -52,6 +55,8 @@ asyncchecksuite "sales state 'preparing'":
context.reservations = reservations
agent = newSalesAgent(context, request.id, slotIndex, request.some)
market.requestEnds[request.id] = clock.now() + cast[int64](request.ask.duration)
teardown:
await repo.stop()
@ -67,10 +72,14 @@ asyncchecksuite "sales state 'preparing'":
let next = state.onSlotFilled(request.id, slotIndex)
check !next of SaleFilled
proc createAvailability() {.async.} =
proc createAvailability(enabled = true) {.async.} =
let a = await reservations.createAvailability(
availability.totalSize, availability.duration,
availability.minPricePerBytePerSecond, availability.totalCollateral,
availability.totalSize,
availability.duration,
availability.minPricePerBytePerSecond,
availability.totalCollateral,
enabled,
until = 0.SecondsSince1970,
)
availability = a.get
@ -79,7 +88,11 @@ asyncchecksuite "sales state 'preparing'":
check next of SaleIgnored
let ignored = SaleIgnored(next)
check ignored.reprocessSlot
check ignored.returnBytes == false
test "run switches to ignored when availability is not enabled":
await createAvailability(enabled = false)
let next = !(await state.run(agent))
check next of SaleIgnored
test "run switches to slot reserving state after reservation created":
await createAvailability()
@ -94,7 +107,6 @@ asyncchecksuite "sales state 'preparing'":
check next of SaleIgnored
let ignored = SaleIgnored(next)
check ignored.reprocessSlot
check ignored.returnBytes == false
test "run switches to errored when reserve fails with other error":
await createAvailability()

View File

@ -67,4 +67,3 @@ asyncchecksuite "sales state 'SlotReserving'":
let next = !(await state.run(agent))
check next of SaleIgnored
check SaleIgnored(next).reprocessSlot == false
check SaleIgnored(next).returnBytes

View File

@ -1,5 +1,5 @@
import std/random
import std/times
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
@ -8,6 +8,7 @@ import pkg/datastore
import pkg/codex/stores
import pkg/codex/errors
import pkg/codex/sales
import pkg/codex/clock
import pkg/codex/utils/json
import ../../asynctest
@ -39,19 +40,22 @@ asyncchecksuite "Reservations module":
await repoTmp.destroyDb()
await metaTmp.destroyDb()
proc createAvailability(): Availability =
proc createAvailability(enabled = true, until = 0.SecondsSince1970): Availability =
let example = Availability.example(collateralPerByte)
let totalSize = rand(100000 .. 200000).uint64
let totalCollateral = totalSize.u256 * collateralPerByte
let availability = waitFor reservations.createAvailability(
totalSize, example.duration, example.minPricePerBytePerSecond, totalCollateral
totalSize, example.duration, example.minPricePerBytePerSecond, totalCollateral,
enabled, until,
)
return availability.get
proc createReservation(availability: Availability): Reservation =
let size = rand(1 ..< availability.freeSize.int)
let validUntil = getTime().toUnix() + 30.SecondsSince1970
let reservation = waitFor reservations.createReservation(
availability.id, size.uint64, RequestId.example, uint64.example, 1.u256
availability.id, size.uint64, RequestId.example, uint64.example, 1.u256,
validUntil,
)
return reservation.get
@ -64,8 +68,12 @@ asyncchecksuite "Reservations module":
check (await reservations.all(Availability)).get.len == 0
test "generates unique ids for storage availability":
let availability1 = Availability.init(1.uint64, 2.uint64, 3.uint64, 4.u256, 5.u256)
let availability2 = Availability.init(1.uint64, 2.uint64, 3.uint64, 4.u256, 5.u256)
let availability1 = Availability.init(
1.uint64, 2.uint64, 3.uint64, 4.u256, 5.u256, true, 0.SecondsSince1970
)
let availability2 = Availability.init(
1.uint64, 2.uint64, 3.uint64, 4.u256, 5.u256, true, 0.SecondsSince1970
)
check availability1.id != availability2.id
test "can reserve available storage":
@ -128,20 +136,24 @@ asyncchecksuite "Reservations module":
test "cannot create reservation with non-existant availability":
let availability = Availability.example
let validUntil = getTime().toUnix() + 30.SecondsSince1970
let created = await reservations.createReservation(
availability.id, uint64.example, RequestId.example, uint64.example, 1.u256
availability.id, uint64.example, RequestId.example, uint64.example, 1.u256,
validUntil,
)
check created.isErr
check created.error of NotExistsError
test "cannot create reservation larger than availability size":
let availability = createAvailability()
let validUntil = getTime().toUnix() + 30.SecondsSince1970
let created = await reservations.createReservation(
availability.id,
availability.totalSize + 1,
RequestId.example,
uint64.example,
UInt256.example,
validUntil,
)
check created.isErr
check created.error of BytesOutOfBoundsError
@ -149,23 +161,26 @@ asyncchecksuite "Reservations module":
test "cannot create reservation larger than availability size - concurrency test":
proc concurrencyTest(): Future[void] {.async.} =
let availability = createAvailability()
let validUntil = getTime().toUnix() + 30.SecondsSince1970
let one = reservations.createReservation(
availability.id,
availability.totalSize - 1,
RequestId.example,
uint64.example,
UInt256.example,
validUntil,
)
let two = reservations.createReservation(
availability.id, availability.totalSize, RequestId.example, uint64.example,
UInt256.example,
UInt256.example, validUntil,
)
let oneResult = await one
let twoResult = await two
check oneResult.isErr or twoResult.isErr
if oneResult.isErr:
check oneResult.error of BytesOutOfBoundsError
if twoResult.isErr:
@ -259,6 +274,48 @@ asyncchecksuite "Reservations module":
check isOk await reservations.update(availability)
check (repo.quotaReservedBytes - origQuota) == 100.NBytes
test "create availability set enabled to true by default":
let availability = createAvailability()
check availability.enabled == true
test "create availability set until to 0 by default":
let availability = createAvailability()
check availability.until == 0.SecondsSince1970
test "create availability whith correct values":
var until = getTime().toUnix()
let availability = createAvailability(enabled = false, until = until)
check availability.enabled == false
check availability.until == until
test "create an availability fails when trying set until with a negative value":
let totalSize = rand(100000 .. 200000).uint64
let example = Availability.example(collateralPerByte)
let totalCollateral = totalSize.u256 * collateralPerByte
let result = await reservations.createAvailability(
totalSize,
example.duration,
example.minPricePerBytePerSecond,
totalCollateral,
enabled = true,
until = -1.SecondsSince1970,
)
check result.isErr
check result.error of UntilOutOfBoundsError
test "update an availability fails when trying set until with a negative value":
let until = getTime().toUnix()
let availability = createAvailability(until = until)
availability.until = -1
let result = await reservations.update(availability)
check result.isErr
check result.error of UntilOutOfBoundsError
test "reservation can be partially released":
let availability = createAvailability()
let reservation = createReservation(availability)
@ -285,7 +342,9 @@ asyncchecksuite "Reservations module":
test "OnAvailabilitySaved called when availability is created":
var added: Availability
reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} =
reservations.OnAvailabilitySaved = proc(
a: Availability
) {.gcsafe, async: (raises: []).} =
added = a
let availability = createAvailability()
@ -295,7 +354,9 @@ asyncchecksuite "Reservations module":
test "OnAvailabilitySaved called when availability size is increased":
var availability = createAvailability()
var added: Availability
reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} =
reservations.OnAvailabilitySaved = proc(
a: Availability
) {.gcsafe, async: (raises: []).} =
added = a
availability.freeSize += 1
discard await reservations.update(availability)
@ -305,7 +366,21 @@ asyncchecksuite "Reservations module":
test "OnAvailabilitySaved is not called when availability size is decreased":
var availability = createAvailability()
var called = false
reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} =
reservations.OnAvailabilitySaved = proc(
a: Availability
) {.gcsafe, async: (raises: []).} =
called = true
availability.freeSize -= 1.uint64
discard await reservations.update(availability)
check not called
test "OnAvailabilitySaved is not called when availability is disabled":
var availability = createAvailability(enabled = false)
var called = false
reservations.OnAvailabilitySaved = proc(
a: Availability
) {.gcsafe, async: (raises: []).} =
called = true
availability.freeSize -= 1
discard await reservations.update(availability)
@ -315,7 +390,7 @@ asyncchecksuite "Reservations module":
test "OnAvailabilitySaved called when availability duration is increased":
var availability = createAvailability()
var added: Availability
reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} =
reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} =
added = a
availability.duration += 1
discard await reservations.update(availability)
@ -325,7 +400,7 @@ asyncchecksuite "Reservations module":
test "OnAvailabilitySaved is not called when availability duration is decreased":
var availability = createAvailability()
var called = false
reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} =
reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} =
called = true
availability.duration -= 1
discard await reservations.update(availability)
@ -335,7 +410,7 @@ asyncchecksuite "Reservations module":
test "OnAvailabilitySaved called when availability minPricePerBytePerSecond is increased":
var availability = createAvailability()
var added: Availability
reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} =
reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} =
added = a
availability.minPricePerBytePerSecond += 1.u256
discard await reservations.update(availability)
@ -345,7 +420,7 @@ asyncchecksuite "Reservations module":
test "OnAvailabilitySaved is not called when availability minPricePerBytePerSecond is decreased":
var availability = createAvailability()
var called = false
reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} =
reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} =
called = true
availability.minPricePerBytePerSecond -= 1.u256
discard await reservations.update(availability)
@ -355,7 +430,7 @@ asyncchecksuite "Reservations module":
test "OnAvailabilitySaved called when availability totalCollateral is increased":
var availability = createAvailability()
var added: Availability
reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} =
reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} =
added = a
availability.totalCollateral = availability.totalCollateral + 1.u256
discard await reservations.update(availability)
@ -365,7 +440,7 @@ asyncchecksuite "Reservations module":
test "OnAvailabilitySaved is not called when availability totalCollateral is decreased":
var availability = createAvailability()
var called = false
reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} =
reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} =
called = true
availability.totalCollateral = availability.totalCollateral - 1.u256
discard await reservations.update(availability)
@ -374,32 +449,69 @@ asyncchecksuite "Reservations module":
test "availabilities can be found":
let availability = createAvailability()
let validUntil = getTime().toUnix() + 30.SecondsSince1970
let found = await reservations.findAvailability(
availability.freeSize, availability.duration,
availability.minPricePerBytePerSecond, collateralPerByte,
availability.minPricePerBytePerSecond, collateralPerByte, validUntil,
)
check found.isSome
check found.get == availability
test "does not find an availability when is it disabled":
let availability = createAvailability(enabled = false)
let validUntil = getTime().toUnix() + 30.SecondsSince1970
let found = await reservations.findAvailability(
availability.freeSize, availability.duration,
availability.minPricePerBytePerSecond, collateralPerByte, validUntil,
)
check found.isNone
test "finds an availability when the until date is after the duration":
let example = Availability.example(collateralPerByte)
let until = getTime().toUnix() + example.duration.SecondsSince1970
let availability = createAvailability(until = until)
let validUntil = getTime().toUnix() + 30.SecondsSince1970
let found = await reservations.findAvailability(
availability.freeSize, availability.duration,
availability.minPricePerBytePerSecond, collateralPerByte, validUntil,
)
check found.isSome
check found.get == availability
test "does not find an availability when the until date is before the duration":
let example = Availability.example(collateralPerByte)
let until = getTime().toUnix() + 1.SecondsSince1970
let availability = createAvailability(until = until)
let validUntil = getTime().toUnix() + 30.SecondsSince1970
let found = await reservations.findAvailability(
availability.freeSize, availability.duration,
availability.minPricePerBytePerSecond, collateralPerByte, validUntil,
)
check found.isNone
test "non-matching availabilities are not found":
let availability = createAvailability()
let validUntil = getTime().toUnix() + 30.SecondsSince1970
let found = await reservations.findAvailability(
availability.freeSize + 1,
availability.duration,
availability.minPricePerBytePerSecond,
collateralPerByte,
validUntil,
)
check found.isNone
test "non-existent availability cannot be found":
let availability = Availability.example
let validUntil = getTime().toUnix() + 30.SecondsSince1970
let found = await reservations.findAvailability(
availability.freeSize, availability.duration,
availability.minPricePerBytePerSecond, collateralPerByte,
availability.minPricePerBytePerSecond, collateralPerByte, validUntil,
)
check found.isNone
@ -420,7 +532,12 @@ asyncchecksuite "Reservations module":
test "fails to create availability with size that is larger than available quota":
let created = await reservations.createAvailability(
DefaultQuotaBytes.uint64 + 1, uint64.example, UInt256.example, UInt256.example
DefaultQuotaBytes.uint64 + 1,
uint64.example,
UInt256.example,
UInt256.example,
enabled = true,
until = 0.SecondsSince1970,
)
check created.isErr
check created.error of ReserveFailedError

View File

@ -14,6 +14,7 @@ import pkg/codex/stores/repostore
import pkg/codex/blocktype as bt
import pkg/codex/node
import pkg/codex/utils/asyncstatemachine
import times
import ../../asynctest
import ../helpers
import ../helpers/mockmarket
@ -152,6 +153,8 @@ asyncchecksuite "Sales":
duration = 60.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = totalCollateral,
enabled = true,
until = 0.SecondsSince1970,
)
request = StorageRequest(
ask: StorageAsk(
@ -221,10 +224,11 @@ asyncchecksuite "Sales":
let key = availability.id.key.get
(waitFor reservations.get(key, Availability)).get
proc createAvailability() =
proc createAvailability(enabled = true, until = 0.SecondsSince1970) =
let a = waitFor reservations.createAvailability(
availability.totalSize, availability.duration,
availability.minPricePerBytePerSecond, availability.totalCollateral,
availability.minPricePerBytePerSecond, availability.totalCollateral, enabled,
until,
)
availability = a.get # update id
@ -380,14 +384,14 @@ asyncchecksuite "Sales":
check eventually getAvailability().freeSize ==
availability.freeSize - request.ask.slotSize
test "non-downloaded bytes are returned to availability once finished":
test "bytes are returned to availability once finished":
var slotIndex = 0.uint64
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
): Future[?!void] {.async.} =
slotIndex = slot
let blk = bt.Block.new(@[1.byte]).get
await onBatch(@[blk])
await onBatch(blk.repeat(request.ask.slotSize))
let sold = newFuture[void]()
sales.onSale = proc(request: StorageRequest, slotIndex: uint64) =
@ -403,7 +407,7 @@ asyncchecksuite "Sales":
market.slotState[request.slotId(slotIndex)] = SlotState.Finished
clock.advance(request.ask.duration.int64)
check eventually getAvailability().freeSize == origSize - 1
check eventually getAvailability().freeSize == origSize
test "ignores download when duration not long enough":
availability.duration = request.ask.duration - 1
@ -439,6 +443,34 @@ asyncchecksuite "Sales":
market.slotState[request.slotId(3.uint64)] = SlotState.Filled
check wasIgnored()
test "ignores request when availability is not enabled":
createAvailability(enabled = false)
await market.requestStorage(request)
check wasIgnored()
test "ignores request when availability until terminates before the duration":
let until = getTime().toUnix()
createAvailability(until = until)
await market.requestStorage(request)
check wasIgnored()
test "retrieves request when availability until terminates after the duration":
let requestEnd = getTime().toUnix() + cast[int64](request.ask.duration)
let until = requestEnd + 1
createAvailability(until = until)
var storingRequest: StorageRequest
sales.onStore = proc(
request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false
): Future[?!void] {.async.} =
storingRequest = request
return success()
market.requestEnds[request.id] = requestEnd
await market.requestStorage(request)
check eventually storingRequest == request
test "retrieves and stores data locally":
var storingRequest: StorageRequest
var storingSlot: uint64
@ -563,6 +595,8 @@ asyncchecksuite "Sales":
# by other slots
request.ask.slots = 1
market.requestExpiry[request.id] = expiry
market.requestEnds[request.id] =
getTime().toUnix() + cast[int64](request.ask.duration)
let origSize = availability.freeSize
sales.onStore = proc(
@ -621,10 +655,28 @@ asyncchecksuite "Sales":
test "deletes inactive reservations on load":
createAvailability()
let validUntil = getTime().toUnix() + 30.SecondsSince1970
discard await reservations.createReservation(
availability.id, 100.uint64, RequestId.example, 0.uint64, UInt256.example
availability.id, 100.uint64, RequestId.example, 0.uint64, UInt256.example,
validUntil,
)
check (await reservations.all(Reservation)).get.len == 1
await sales.load()
check (await reservations.all(Reservation)).get.len == 0
check getAvailability().freeSize == availability.freeSize # was restored
test "update an availability fails when trying change the until date before an existing reservation":
let until = getTime().toUnix() + 300.SecondsSince1970
createAvailability(until = until)
market.requestEnds[request.id] =
getTime().toUnix() + cast[int64](request.ask.duration)
await market.requestStorage(request)
await allowRequestToStart()
availability.until = getTime().toUnix()
let result = await reservations.update(availability)
check result.isErr
check result.error of UntilOutOfBoundsError

View File

@ -294,6 +294,8 @@ proc postAvailabilityRaw*(
client: CodexClient,
totalSize, duration: uint64,
minPricePerBytePerSecond, totalCollateral: UInt256,
enabled: ?bool = bool.none,
until: ?SecondsSince1970 = SecondsSince1970.none,
): Future[HttpClientResponseRef] {.async: (raises: [CancelledError, HttpError]).} =
## Post sales availability endpoint
##
@ -304,18 +306,27 @@ proc postAvailabilityRaw*(
"duration": duration,
"minPricePerBytePerSecond": minPricePerBytePerSecond,
"totalCollateral": totalCollateral,
"enabled": enabled,
"until": until,
}
return await client.post(url, $json)
proc postAvailability*(
client: CodexClient,
totalSize, duration: uint64,
minPricePerBytePerSecond, totalCollateral: UInt256,
enabled: ?bool = bool.none,
until: ?SecondsSince1970 = SecondsSince1970.none,
): Future[?!Availability] {.async: (raises: [CancelledError, HttpError]).} =
let response = await client.postAvailabilityRaw(
totalSize, duration, minPricePerBytePerSecond, totalCollateral
totalSize = totalSize,
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = totalCollateral,
enabled = enabled,
until = until,
)
let body = await response.body
doAssert response.status == 201,
@ -327,6 +338,8 @@ proc patchAvailabilityRaw*(
availabilityId: AvailabilityId,
totalSize, freeSize, duration: ?uint64 = uint64.none,
minPricePerBytePerSecond, totalCollateral: ?UInt256 = UInt256.none,
enabled: ?bool = bool.none,
until: ?SecondsSince1970 = SecondsSince1970.none,
): Future[HttpClientResponseRef] {.
async: (raw: true, raises: [CancelledError, HttpError])
.} =
@ -352,6 +365,12 @@ proc patchAvailabilityRaw*(
if totalCollateral =? totalCollateral:
json["totalCollateral"] = %totalCollateral
if enabled =? enabled:
json["enabled"] = %enabled
if until =? until:
json["until"] = %until
client.patch(url, $json)
proc patchAvailability*(
@ -359,6 +378,8 @@ proc patchAvailability*(
availabilityId: AvailabilityId,
totalSize, duration: ?uint64 = uint64.none,
minPricePerBytePerSecond, totalCollateral: ?UInt256 = UInt256.none,
enabled: ?bool = bool.none,
until: ?SecondsSince1970 = SecondsSince1970.none,
): Future[void] {.async: (raises: [CancelledError, HttpError]).} =
let response = await client.patchAvailabilityRaw(
availabilityId,
@ -366,8 +387,10 @@ proc patchAvailability*(
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = totalCollateral,
enabled = enabled,
until = until,
)
doAssert response.status == 200, "expected 200 OK, got " & $response.status
doAssert response.status == 204, "expected No Content, got " & $response.status
proc getAvailabilities*(
client: CodexClient

View File

@ -1,3 +1,5 @@
import std/times
import std/httpclient
import ../examples
import ../contracts/time
import ../contracts/deployment

View File

@ -275,7 +275,9 @@ marketplacesuite "Simulate invalid proofs":
# totalSize=slotSize, # should match 1 slot only
# duration=totalPeriods.periods.u256,
# minPricePerBytePerSecond=minPricePerBytePerSecond,
# totalCollateral=slotSize * minPricePerBytePerSecond
# totalCollateral=slotSize * minPricePerBytePerSecond,
# enabled = true.some,
# until = 0.SecondsSince1970.some,
# )
# let cid = client0.upload(data).get

View File

@ -35,6 +35,7 @@ twonodessuite "REST API":
duration = 2.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = totalCollateral,
enabled = true.some,
)
).get
let space = (await client1.space()).tryGet()

View File

@ -364,5 +364,21 @@ asyncchecksuite "Rest API validation":
check responseBefore.status == 422
check (await responseBefore.body) == "Collateral per byte must be greater than zero"
test "creating availability fails when until is negative":
let totalSize = 12.uint64
let minPricePerBytePerSecond = 1.u256
let totalCollateral = totalSize.u256 * minPricePerBytePerSecond
let response = await client.postAvailabilityRaw(
totalSize = totalSize,
duration = 2.uint64,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = totalCollateral,
until = -1.SecondsSince1970.some,
)
check:
response.status == 422
(await response.body) == "Cannot set until to a negative value"
waitFor node.stop()
node.removeDataDir()

View File

@ -1,4 +1,5 @@
import std/httpclient
import std/times
import pkg/codex/contracts
from pkg/codex/stores/repostore/types import DefaultQuotaBytes
import ./twonodes
@ -17,22 +18,14 @@ proc findItem[T](items: seq[T], item: T): ?!T =
multinodesuite "Sales":
let salesConfig = NodeConfigs(
clients: CodexConfigs
.init(nodes = 1)
.withLogFile()
.withLogTopics(
"node", "marketplace", "sales", "reservations", "node", "proving", "clock"
).some,
providers: CodexConfigs
.init(nodes = 1)
.withLogFile()
.withLogTopics(
"node", "marketplace", "sales", "reservations", "node", "proving", "clock"
).some,
clients: CodexConfigs.init(nodes = 1).some,
providers: CodexConfigs.init(nodes = 1)
# .debug() # uncomment to enable console log output
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("node", "marketplace", "sales", "reservations", "node", "proving", "clock")
.some,
)
let minPricePerBytePerSecond = 1.u256
var host: CodexClient
var client: CodexClient
@ -80,11 +73,15 @@ multinodesuite "Sales":
)
).get
var until = getTime().toUnix()
await host.patchAvailability(
availability.id,
duration = 100.uint64.some,
minPricePerBytePerSecond = 2.u256.some,
totalCollateral = 200.u256.some,
enabled = false.some,
until = until.some,
)
let updatedAvailability =
@ -94,6 +91,8 @@ multinodesuite "Sales":
check updatedAvailability.totalCollateral == 200
check updatedAvailability.totalSize == 140000.uint64
check updatedAvailability.freeSize == 140000.uint64
check updatedAvailability.enabled == false
check updatedAvailability.until == until
test "updating availability - updating totalSize", salesConfig:
let availability = (
@ -105,6 +104,7 @@ multinodesuite "Sales":
)
).get
await host.patchAvailability(availability.id, totalSize = 100000.uint64.some)
let updatedAvailability =
((await host.getAvailabilities()).get).findItem(availability).get
check updatedAvailability.totalSize == 100000
@ -165,3 +165,72 @@ multinodesuite "Sales":
((await host.getAvailabilities()).get).findItem(availability).get
check newUpdatedAvailability.totalSize == originalSize + 20000
check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000
test "updating availability fails with until negative", salesConfig:
let availability = (
await host.postAvailability(
totalSize = 140000.uint64,
duration = 200.uint64,
minPricePerBytePerSecond = 3.u256,
totalCollateral = 300.u256,
)
).get
let response =
await host.patchAvailabilityRaw(availability.id, until = -1.SecondsSince1970.some)
check:
(await response.body) == "Cannot set until to a negative value"
test "returns an error when trying to update the until date before an existing a request is finished",
salesConfig:
let size = 0xFFFFFF.uint64
let data = await RandomChunker.example(blocks = 8)
let duration = 20 * 60.uint64
let minPricePerBytePerSecond = 3.u256
let collateralPerByte = 1.u256
let ecNodes = 3.uint
let ecTolerance = 1.uint
# host makes storage available
let availability = (
await host.postAvailability(
totalSize = size,
duration = duration,
minPricePerBytePerSecond = minPricePerBytePerSecond,
totalCollateral = size.u256 * minPricePerBytePerSecond,
)
).get
# client requests storage
let cid = (await client.upload(data)).get
let id = (
await client.requestStorage(
cid,
duration = duration,
pricePerBytePerSecond = minPricePerBytePerSecond,
proofProbability = 3.u256,
expiry = 10 * 60.uint64,
collateralPerByte = collateralPerByte,
nodes = ecNodes,
tolerance = ecTolerance,
)
).get
check eventually(
await client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000
)
let purchase = (await client.getPurchase(id)).get
check purchase.error == none string
let unixNow = getTime().toUnix()
let until = unixNow + 1.SecondsSince1970
let response = await host.patchAvailabilityRaw(
availabilityId = availability.id, until = until.some
)
check:
response.status == 422
(await response.body) ==
"Until parameter must be greater or equal to the longest currently hosted slot"

@ -1 +1 @@
Subproject commit d67860add63fd23cdacde1d3da8f4739c2660c2d
Subproject commit 5778e373fa97286f389e0aef61f1e8f30a934dab