[marketplace] Availability improvements (#535)

## Problem
When Availabilities are created, the amount of bytes in the Availability are reserved in the repo, so those bytes on disk cannot be written to otherwise. When a request for storage is received by a node, if a previously created Availability is matched, an attempt will be made to fill a slot in the request (more accurately, the request's slots are added to the SlotQueue, and eventually those slots will be processed). During download, bytes that were reserved for the Availability were released (as they were written to disk). To prevent more bytes from being released than were reserved in the Availability, the Availability was marked as used during the download, so that no other requests would match the Availability, and therefore no new downloads (and byte releases) would begin. The unfortunate downside to this, is that the number of Availabilities a node has determines the download concurrency capacity. If, for example, a node creates a single Availability that covers all available disk space the operator is willing to use, that single Availability would mean that only one download could occur at a time, meaning the node could potentially miss out on storage opportunities.

## Solution
To alleviate the concurrency issue, each time a slot is processed, a Reservation is created, which takes size (aka reserved bytes) away from the Availability and stores them in the Reservation object. This can be done as many times as needed as long as there are enough bytes remaining in the Availability. Therefore, concurrent downloads are no longer limited by the number of Availabilities. Instead, they would more likely be limited to the SlotQueue's `maxWorkers`.

From a database design perspective, an Availability has zero or more Reservations.

Reservations are persisted in the RepoStore's metadata, along with Availabilities. The metadata store key path for Reservations is ` meta / sales / reservations / <availabilityId> / <reservationId>`, while Availabilities are stored one level up, eg `meta / sales / reservations / <availabilityId> `, allowing all Reservations for an Availability to be queried (this is not currently needed, but may be useful when work to restore Availability size is implemented, more on this later).

### Lifecycle
When a reservation is created, its size is deducted from the Availability, and when a reservation is deleted, any remaining size (bytes not written to disk) is returned to the Availability. If the request finishes, is cancelled (expired), or an error occurs, the Reservation is deleted (and any undownloaded bytes returned to the Availability). In addition, when the Sales module starts, any Reservations that are not actively being used in a filled slot, are deleted.

Having a Reservation persisted until after a storage request is completed, will allow for the originally set Availability size to be reclaimed once a request contract has been completed. This is a feature that is yet to be implemented, however the work in this PR is a step in the direction towards enabling this.

### Unknowns
Reservation size is determined by the `StorageAsk.slotSize`. If during download, more bytes than `slotSize` are attempted to be downloaded than this, then the Reservation update will fail, and the state machine will move to a `SaleErrored` state, deleting the Reservation. This will likely prevent the slot from being filled.

### Notes
Based on #514
This commit is contained in:
Eric 2023-09-29 14:33:08 +10:00 committed by GitHub
parent 2f1c778d02
commit 570a1f7b67
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 818 additions and 464 deletions

View File

@ -284,7 +284,13 @@ proc requestStorage*(
let request = StorageRequest(
ask: StorageAsk(
slots: nodes + tolerance,
slotSize: (encoded.blockSize.int * encoded.steps).u256,
# TODO: Specify slot-specific size (as below) once dispersal is
# implemented. The current implementation downloads the entire dataset, so
# it is currently set to be the size of the entire dataset. This is
# because the slotSize is used to determine the amount of bytes to reserve
# in a Reservations
# TODO: slotSize: (encoded.blockSize.int * encoded.steps).u256,
slotSize: (encoded.blockSize.int * encoded.blocks.len).u256,
duration: duration,
proofProbability: proofProbability,
reward: reward,

View File

@ -340,10 +340,10 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
without contracts =? node.contracts.host:
return RestApiResponse.error(Http503, "Sales unavailable")
without unused =? (await contracts.sales.context.reservations.unused), err:
without avails =? (await contracts.sales.context.reservations.all(Availability)), err:
return RestApiResponse.error(Http500, err.msg)
let json = %unused
let json = %avails
return RestApiResponse.response($json, contentType="application/json")
router.rawApi(
@ -365,20 +365,21 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
return RestApiResponse.error(Http400, error.msg)
let reservations = contracts.sales.context.reservations
# assign id to availability via init
let availability = Availability.init(restAv.size,
restAv.duration,
restAv.minPrice,
restAv.maxCollateral)
if not reservations.hasAvailable(availability.size.truncate(uint)):
if not reservations.hasAvailable(restAv.size.truncate(uint)):
return RestApiResponse.error(Http422, "Not enough storage quota")
if err =? (await reservations.reserve(availability)).errorOption:
return RestApiResponse.error(Http500, err.msg)
without availability =? (
await reservations.createAvailability(
restAv.size,
restAv.duration,
restAv.minPrice,
restAv.maxCollateral)
), error:
return RestApiResponse.error(Http500, error.msg)
let json = %availability
return RestApiResponse.response($json, contentType="application/json")
return RestApiResponse.response(availability.toJson,
contentType="application/json")
router.api(
MethodGet,

View File

@ -1,7 +1,7 @@
import std/sequtils
import std/sugar
import std/tables
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/chronicles
import pkg/datastore
@ -101,8 +101,49 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
if sales.running:
sales.agents.keepItIf(it != agent)
proc filled(sales: Sales,
processing: Future[void]) =
proc cleanUp(sales: Sales,
agent: SalesAgent,
processing: Future[void]) {.async.} =
let data = agent.data
trace "cleaning up sales agent",
requestId = data.requestId,
slotIndex = data.slotIndex,
reservationId = data.reservation.?id |? ReservationId.default,
availabilityId = data.reservation.?availabilityId |? AvailabilityId.default
# TODO: return bytes that were used in the request back to the availability
# as well, which will require removing the bytes from disk (perhaps via
# setting blockTTL to -1 and then running block maintainer?)
# delete reservation and return reservation bytes back to the availability
if reservation =? data.reservation and
deleteErr =? (await sales.context.reservations.deleteReservation(
reservation.id,
reservation.availabilityId
)).errorOption:
error "failure deleting reservation",
error = deleteErr.msg,
reservationId = reservation.id,
availabilityId = reservation.availabilityId
await sales.remove(agent)
# signal back to the slot queue to cycle a worker
if not processing.isNil and not processing.finished():
processing.complete()
proc filled(
sales: Sales,
request: StorageRequest,
slotIndex: UInt256,
processing: Future[void]) =
if onSale =? sales.context.onSale:
onSale(request, slotIndex)
# signal back to the slot queue to cycle a worker
if not processing.isNil and not processing.finished():
processing.complete()
@ -117,15 +158,39 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
none StorageRequest
)
agent.context.onCleanUp = proc {.async.} =
await sales.remove(agent)
agent.onCleanUp = proc {.async.} =
await sales.cleanUp(agent, done)
agent.context.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
sales.filled(done)
agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
sales.filled(request, slotIndex, done)
agent.start(SalePreparing())
sales.agents.add agent
proc deleteInactiveReservations(sales: Sales, activeSlots: seq[Slot]) {.async.} =
let reservations = sales.context.reservations
without reservs =? await reservations.all(Reservation):
info "no unused reservations found for deletion"
let unused = reservs.filter(r => (
let slotId = slotId(r.requestId, r.slotIndex)
not activeSlots.any(slot => slot.id == slotId)
))
info "found unused reservations for deletion", unused = unused.len
for reservation in unused:
logScope:
reservationId = reservation.id
availabilityId = reservation.availabilityId
if err =? (await reservations.deleteReservation(
reservation.id, reservation.availabilityId
)).errorOption:
error "failed to delete unused reservation", error = err.msg
else:
trace "deleted unused reservation"
proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
let market = sales.context.market
let slotIds = await market.mySlots()
@ -139,21 +204,26 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
return slots
proc load*(sales: Sales) {.async.} =
let slots = await sales.mySlots()
let activeSlots = await sales.mySlots()
for slot in slots:
await sales.deleteInactiveReservations(activeSlots)
for slot in activeSlots:
let agent = newSalesAgent(
sales.context,
slot.request.id,
slot.slotIndex,
some slot.request)
agent.context.onCleanUp = proc {.async.} = await sales.remove(agent)
agent.onCleanUp = proc {.async.} =
let done = newFuture[void]("onCleanUp_Dummy")
await sales.cleanUp(agent, done)
await done # completed in sales.cleanUp
agent.start(SaleUnknown())
sales.agents.add agent
proc onReservationAdded(sales: Sales, availability: Availability) {.async.} =
proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} =
## Query last 256 blocks for new requests, adding them to the queue. `push`
## checks for availability before adding to the queue. If processed, the
## sales agent will check if the slot is free.
@ -162,9 +232,9 @@ proc onReservationAdded(sales: Sales, availability: Availability) {.async.} =
let queue = context.slotQueue
logScope:
topics = "marketplace sales onReservationAdded callback"
topics = "marketplace sales onAvailabilityAdded callback"
trace "reservation added, querying past storage requests to add to queue"
trace "availability added, querying past storage requests to add to queue"
try:
let events = await market.queryPastStorageRequests(256)
@ -384,10 +454,10 @@ proc startSlotQueue(sales: Sales) {.async.} =
asyncSpawn slotQueue.start()
reservations.onReservationAdded =
proc(availability: Availability) {.async.} =
await sales.onReservationAdded(availability)
proc onAvailabilityAdded(availability: Availability) {.async.} =
await sales.onAvailabilityAdded(availability)
reservations.onAvailabilityAdded = onAvailabilityAdded
proc subscribe(sales: Sales) {.async.} =
await sales.subscribeRequested()

View File

@ -6,56 +6,79 @@
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
##
## +--------------------------------------+
## | RESERVATION |
## +--------------------------------------+ |--------------------------------------|
## | AVAILABILITY | | ReservationId | id | PK |
## |--------------------------------------| |--------------------------------------|
## | AvailabilityId | id | PK |<-||-------o<-| AvailabilityId | availabilityId | FK |
## |--------------------------------------| |--------------------------------------|
## | UInt256 | size | | | UInt256 | size | |
## |--------------------------------------| |--------------------------------------|
## | UInt256 | duration | | | SlotId | slotId | |
## |--------------------------------------| +--------------------------------------+
## | UInt256 | minPrice | |
## |--------------------------------------|
## | UInt256 | maxCollateral | |
## +--------------------------------------+
import pkg/upraises
push: {.upraises: [].}
import std/typetraits
import pkg/chronos
import pkg/chronicles
import pkg/upraises
import pkg/json_serialization
import pkg/json_serialization/std/options
import pkg/stint
import pkg/stew/byteutils
import pkg/datastore
import pkg/nimcrypto
import pkg/questionable
import pkg/questionable/results
import ../utils/json
push: {.upraises: [].}
import pkg/datastore
import pkg/stint
import pkg/stew/byteutils
import ../stores
import ../contracts/requests
import ../utils/json
export requests
export chronicles
logScope:
topics = "reservations"
topics = "sales reservations"
type
AvailabilityId* = distinct array[32, byte]
Availability* = object
ReservationId* = distinct array[32, byte]
SomeStorableObject = Availability | Reservation
SomeStorableId = AvailabilityId | ReservationId
Availability* = ref object
id* {.serialize.}: AvailabilityId
size* {.serialize.}: UInt256
duration* {.serialize.}: UInt256
minPrice* {.serialize.}: UInt256
maxCollateral* {.serialize.}: UInt256
used*: bool
Reservation* = ref object
id* {.serialize.}: ReservationId
availabilityId* {.serialize.}: AvailabilityId
size* {.serialize.}: UInt256
requestId* {.serialize.}: RequestId
slotIndex* {.serialize.}: UInt256
Reservations* = ref object
repo: RepoStore
onReservationAdded: ?OnReservationAdded
GetNext* = proc(): Future[?Availability] {.upraises: [], gcsafe, closure.}
OnReservationAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.}
AvailabilityIter* = ref object
onAvailabilityAdded: ?OnAvailabilityAdded
GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.}
StorableIter* = ref object
finished*: bool
next*: GetNext
AvailabilityError* = object of CodexError
AvailabilityAlreadyExistsError* = object of AvailabilityError
AvailabilityReserveFailedError* = object of AvailabilityError
AvailabilityReleaseFailedError* = object of AvailabilityError
AvailabilityDeleteFailedError* = object of AvailabilityError
AvailabilityGetFailedError* = object of AvailabilityError
AvailabilityUpdateFailedError* = object of AvailabilityError
ReservationsError* = object of CodexError
ReserveFailedError* = object of ReservationsError
ReleaseFailedError* = object of ReservationsError
DeleteFailedError* = object of ReservationsError
GetFailedError* = object of ReservationsError
NotExistsError* = object of ReservationsError
SerializationError* = object of ReservationsError
UpdateFailedError* = object of ReservationsError
BytesOutOfBoundsError* = object of ReservationsError
const
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
@ -77,10 +100,29 @@ proc init*(
doAssert randomBytes(id) == 32
Availability(id: AvailabilityId(id), size: size, duration: duration, minPrice: minPrice, maxCollateral: maxCollateral)
func toArray*(id: AvailabilityId): array[32, byte] =
proc init*(
_: type Reservation,
availabilityId: AvailabilityId,
size: UInt256,
requestId: RequestId,
slotIndex: UInt256
): Reservation =
var id: array[32, byte]
doAssert randomBytes(id) == 32
Reservation(id: ReservationId(id), availabilityId: availabilityId, size: size, requestId: requestId, slotIndex: slotIndex)
func toArray(id: SomeStorableId): array[32, byte] =
array[32, byte](id)
proc `==`*(x, y: AvailabilityId): bool {.borrow.}
proc `==`*(x, y: ReservationId): bool {.borrow.}
proc `==`*(x, y: Reservation): bool =
x.id == y.id and
x.availabilityId == y.availabilityId and
x.size == y.size and
x.requestId == y.requestId and
x.slotIndex == y.slotIndex
proc `==`*(x, y: Availability): bool =
x.id == y.id and
x.size == y.size and
@ -88,9 +130,9 @@ proc `==`*(x, y: Availability): bool =
x.maxCollateral == y.maxCollateral and
x.minPrice == y.minPrice
proc `$`*(id: AvailabilityId): string = id.toArray.toHex
proc `$`*(id: SomeStorableId): string = id.toArray.toHex
proc toErr[E1: ref CatchableError, E2: AvailabilityError](
proc toErr[E1: ref CatchableError, E2: ReservationsError](
e1: E1,
_: type E2,
msg: string = e1.msg): ref E2 =
@ -99,28 +141,30 @@ proc toErr[E1: ref CatchableError, E2: AvailabilityError](
proc writeValue*(
writer: var JsonWriter,
value: AvailabilityId) {.upraises:[IOError].} =
value: SomeStorableId) {.upraises:[IOError].} =
## used for chronicles' logs
mixin writeValue
writer.writeValue value.toArray
writer.writeValue %value
proc readValue*[T: AvailabilityId](
reader: var JsonReader,
value: var T) {.upraises: [SerializationError, IOError].} =
proc `onAvailabilityAdded=`*(self: Reservations,
onAvailabilityAdded: OnAvailabilityAdded) =
self.onAvailabilityAdded = some onAvailabilityAdded
mixin readValue
value = T reader.readValue(T.distinctBase)
func key*(id: AvailabilityId): ?!Key =
## sales / reservations / <availabilityId>
(ReservationsKey / $id)
proc `onReservationAdded=`*(self: Reservations,
onReservationAdded: OnReservationAdded) =
self.onReservationAdded = some onReservationAdded
func key(id: AvailabilityId): ?!Key =
(ReservationsKey / id.toArray.toHex)
func key*(reservationId: ReservationId, availabilityId: AvailabilityId): ?!Key =
## sales / reservations / <availabilityId> / <reservationId>
(availabilityId.key / $reservationId)
func key*(availability: Availability): ?!Key =
return availability.id.key
func key*(reservation: Reservation): ?!Key =
return key(reservation.id, reservation.availabilityId)
func available*(self: Reservations): uint = self.repo.available
func hasAvailable*(self: Reservations, bytes: uint): bool =
@ -128,84 +172,123 @@ func hasAvailable*(self: Reservations, bytes: uint): bool =
proc exists*(
self: Reservations,
id: AvailabilityId): Future[?!bool] {.async.} =
without key =? id.key, err:
return failure(err)
key: Key): Future[bool] {.async.} =
let exists = await self.repo.metaDs.contains(key)
return success(exists)
return exists
proc getImpl(
self: Reservations,
key: Key): Future[?!seq[byte]] {.async.} =
if exists =? (await self.exists(key)) and not exists:
let err = newException(NotExistsError, "object with key " & $key & " does not exist")
return failure(err)
without serialized =? await self.repo.metaDs.get(key), error:
return failure(error.toErr(GetFailedError))
return success serialized
proc get*(
self: Reservations,
id: AvailabilityId): Future[?!Availability] {.async.} =
key: Key,
T: type SomeStorableObject): Future[?!T] {.async.} =
if exists =? (await self.exists(id)) and not exists:
let err = newException(AvailabilityGetFailedError,
"Availability does not exist")
return failure(err)
without serialized =? await self.getImpl(key), error:
return failure(error)
without key =? id.key, err:
return failure(err.toErr(AvailabilityGetFailedError))
without obj =? T.fromJson(serialized), error:
return failure(error.toErr(SerializationError))
without serialized =? await self.repo.metaDs.get(key), err:
return failure(err.toErr(AvailabilityGetFailedError))
without availability =? Json.decode(serialized, Availability).catch, err:
return failure(err.toErr(AvailabilityGetFailedError))
return success availability
return success obj
proc update(
self: Reservations,
availability: Availability): Future[?!void] {.async.} =
obj: SomeStorableObject): Future[?!void] {.async.} =
trace "updating availability", id = availability.id, size = availability.size,
used = availability.used
trace "updating " & $(obj.type), id = obj.id, size = obj.size
without key =? availability.key, err:
return failure(err)
without key =? obj.key, error:
return failure(error)
if err =? (await self.repo.metaDs.put(
key,
@(availability.toJson.toBytes))).errorOption:
return failure(err.toErr(AvailabilityUpdateFailedError))
@(obj.toJson.toBytes)
)).errorOption:
return failure(err.toErr(UpdateFailedError))
return success()
proc delete(
self: Reservations,
id: AvailabilityId): Future[?!void] {.async.} =
key: Key): Future[?!void] {.async.} =
trace "deleting availability", id
trace "deleting object", key
without availability =? (await self.get(id)), err:
return failure(err)
without key =? availability.key, err:
return failure(err)
if exists =? (await self.exists(key)) and not exists:
return success()
if err =? (await self.repo.metaDs.delete(key)).errorOption:
return failure(err.toErr(AvailabilityDeleteFailedError))
return failure(err.toErr(DeleteFailedError))
return success()
proc reserve*(
proc deleteReservation*(
self: Reservations,
availability: Availability): Future[?!void] {.async.} =
reservationId: ReservationId,
availabilityId: AvailabilityId): Future[?!void] {.async.} =
if exists =? (await self.exists(availability.id)) and exists:
let err = newException(AvailabilityAlreadyExistsError,
"Availability already exists")
return failure(err)
logScope:
reservationId
availabilityId
without key =? availability.key, err:
return failure(err)
trace "deleting reservation"
without key =? key(reservationId, availabilityId), error:
return failure(error)
without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError:
return success()
else:
return failure(error)
if reservation.size > 0.u256:
trace "returning remaining reservation bytes to availability",
size = reservation.size
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.size += reservation.size
if updateErr =? (await self.update(availability)).errorOption:
return failure(updateErr)
if err =? (await self.repo.metaDs.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))
return success()
proc createAvailability*(
self: Reservations,
size: UInt256,
duration: UInt256,
minPrice: UInt256,
maxCollateral: UInt256): Future[?!Availability] {.async.} =
trace "creating availability", size, duration, minPrice, maxCollateral
let availability = Availability.init(
size, duration, minPrice, maxCollateral
)
let bytes = availability.size.truncate(uint)
if reserveErr =? (await self.repo.reserve(bytes)).errorOption:
return failure(reserveErr.toErr(AvailabilityReserveFailedError))
return failure(reserveErr.toErr(ReserveFailedError))
if updateErr =? (await self.update(availability)).errorOption:
@ -217,144 +300,192 @@ proc reserve*(
return failure(updateErr)
if onReservationAdded =? self.onReservationAdded:
if onAvailabilityAdded =? self.onAvailabilityAdded:
try:
await onReservationAdded(availability)
await onAvailabilityAdded(availability)
except CatchableError as e:
# we don't have any insight into types of errors that `onProcessSlot` can
# throw because it is caller-defined
warn "Unknown error during 'onReservationAdded' callback",
warn "Unknown error during 'onAvailabilityAdded' callback",
availabilityId = availability.id, error = e.msg
return success()
return success(availability)
proc createReservation*(
self: Reservations,
availabilityId: AvailabilityId,
slotSize: UInt256,
requestId: RequestId,
slotIndex: UInt256
): Future[?!Reservation] {.async.} =
trace "creating reservation", availabilityId, slotSize, requestId, slotIndex
let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
if availability.size < slotSize:
let error = newException(BytesOutOfBoundsError, "trying to reserve an " &
"amount of bytes that is greater than the total size of the Availability")
return failure(error)
if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr)
# reduce availability size by the slot size, which is now accounted for in
# the newly created Reservation
availability.size -= slotSize
# update availability with reduced size
if updateErr =? (await self.update(availability)).errorOption:
trace "rolling back reservation creation"
without key =? reservation.key, keyError:
keyError.parent = updateErr
return failure(keyError)
# rollback the reservation creation
if rollbackErr =? (await self.delete(key)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return success(reservation)
proc release*(
self: Reservations,
id: AvailabilityId,
reservationId: ReservationId,
availabilityId: AvailabilityId,
bytes: uint): Future[?!void] {.async.} =
trace "releasing bytes and updating availability", bytes, id
logScope:
topics = "release"
bytes
reservationId
availabilityId
without var availability =? (await self.get(id)), err:
return failure(err)
trace "releasing bytes and updating reservation"
without key =? id.key, err:
return failure(err)
without key =? key(reservationId, availabilityId), error:
return failure(error)
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
if reservation.size < bytes.u256:
let error = newException(BytesOutOfBoundsError,
"trying to release an amount of bytes that is greater than the total " &
"size of the Reservation")
return failure(error)
if releaseErr =? (await self.repo.release(bytes)).errorOption:
return failure(releaseErr.toErr(AvailabilityReleaseFailedError))
return failure(releaseErr.toErr(ReleaseFailedError))
availability.size = (availability.size.truncate(uint) - bytes).u256
reservation.size -= bytes.u256
template rollbackRelease(e: ref CatchableError) =
# persist partially used Reservation with updated size
if err =? (await self.update(reservation)).errorOption:
# rollback release if an update error encountered
trace "rolling back release"
if rollbackErr =? (await self.repo.reserve(bytes)).errorOption:
rollbackErr.parent = e
rollbackErr.parent = err
return failure(rollbackErr)
# remove completely used availabilities
if availability.size == 0.u256:
if err =? (await self.delete(availability.id)).errorOption:
rollbackRelease(err)
return failure(err)
return success()
# persist partially used availability with updated size
if err =? (await self.update(availability)).errorOption:
rollbackRelease(err)
return failure(err)
return success()
proc markUsed*(
self: Reservations,
id: AvailabilityId): Future[?!void] {.async.} =
without var availability =? (await self.get(id)), err:
return failure(err)
availability.used = true
let r = await self.update(availability)
if r.isOk:
trace "availability marked used", id = id.toArray.toHex
return r
proc markUnused*(
self: Reservations,
id: AvailabilityId): Future[?!void] {.async.} =
without var availability =? (await self.get(id)), err:
return failure(err)
availability.used = false
let r = await self.update(availability)
if r.isOk:
trace "availability marked unused", id = id.toArray.toHex
return r
iterator items*(self: AvailabilityIter): Future[?Availability] =
iterator items(self: StorableIter): Future[?seq[byte]] =
while not self.finished:
yield self.next()
proc availabilities*(
self: Reservations): Future[?!AvailabilityIter] {.async.} =
proc storables(
self: Reservations,
T: type SomeStorableObject
): Future[?!StorableIter] {.async.} =
var iter = AvailabilityIter()
var iter = StorableIter()
let query = Query.init(ReservationsKey)
when T is Availability:
# should indicate key length of 4, but let the .key logic determine it
without defaultKey =? AvailabilityId.default.key, error:
return failure(error)
elif T is Reservation:
# should indicate key length of 5, but let the .key logic determine it
without defaultKey =? key(ReservationId.default, AvailabilityId.default), error:
return failure(error)
else:
raiseAssert "unknown type"
without results =? await self.repo.metaDs.query(query), err:
return failure(err)
without results =? await self.repo.metaDs.query(query), error:
return failure(error)
proc next(): Future[?Availability] {.async.} =
proc next(): Future[?seq[byte]] {.async.} =
await idleAsync()
iter.finished = results.finished
if not results.finished and
r =? (await results.next()) and
serialized =? r.data and
serialized.len > 0:
res =? (await results.next()) and
res.data.len > 0 and
key =? res.key and
key.namespaces.len == defaultKey.namespaces.len:
return some Json.decode(string.fromBytes(serialized), Availability)
return some res.data
return none Availability
return none seq[byte]
iter.next = next
return success iter
proc unused*(r: Reservations): Future[?!seq[Availability]] {.async.} =
var ret: seq[Availability] = @[]
proc all*(
self: Reservations,
T: type SomeStorableObject
): Future[?!seq[T]] {.async.} =
without availabilities =? (await r.availabilities), err:
return failure(err)
var ret: seq[T] = @[]
for a in availabilities:
if availability =? (await a) and not availability.used:
ret.add availability
without storables =? (await self.storables(T)), error:
return failure(error)
for storable in storables.items:
without bytes =? (await storable):
continue
without obj =? T.fromJson(bytes), error:
error "json deserialization error",
json = string.fromBytes(bytes),
error = error.msg
continue
ret.add obj
return success(ret)
proc find*(
proc findAvailability*(
self: Reservations,
size, duration, minPrice, collateral: UInt256,
used: bool): Future[?Availability] {.async.} =
size, duration, minPrice, collateral: UInt256
): Future[?Availability] {.async.} =
without availabilities =? (await self.availabilities), err:
error "failed to get all availabilities", error = err.msg
without storables =? (await self.storables(Availability)), e:
error "failed to get all storables", error = e.msg
return none Availability
for a in availabilities:
if availability =? (await a):
for item in storables.items:
if bytes =? (await item) and
availability =? Availability.fromJson(bytes):
if used == availability.used and
size <= availability.size and
if size <= availability.size and
duration <= availability.duration and
collateral <= availability.maxCollateral and
minPrice >= availability.minPrice:
trace "availability matched",
used, availUsed = availability.used,
size, availsize = availability.size,
duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice,
@ -363,7 +494,6 @@ proc find*(
return some availability
trace "availiability did not match",
used, availUsed = availability.used,
size, availsize = availability.size,
duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice,

View File

@ -21,6 +21,14 @@ type
context*: SalesContext
data*: SalesData
subscribed: bool
# Slot-level callbacks.
onCleanUp*: OnCleanUp
onFilled*: ?OnFilled
OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].}
OnFilled* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
SalesAgentError = object of CodexError
AllSlotsFilledError* = object of SalesAgentError

View File

@ -11,11 +11,11 @@ type
SalesContext* = ref object
market*: Market
clock*: Clock
# Sales-level callbacks. Closure will be overwritten each time a slot is
# processed.
onStore*: ?OnStore
onClear*: ?OnClear
onSale*: ?OnSale
onFilled*: ?OnFilled
onCleanUp*: OnCleanUp
onProve*: ?OnProve
reservations*: Reservations
slotQueue*: SlotQueue
@ -29,11 +29,3 @@ type
slotIndex: UInt256) {.gcsafe, upraises: [].}
OnSale* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
# OnFilled has same function as OnSale, but is kept for internal purposes and should not be set by any external
# purposes as it is used for freeing Queue Workers after slot is filled. And the callbacks allows only
# one callback to be set, so if some other component would use it, it would override the Slot Queue freeing
# mechanism which would lead to blocking of the queue.
OnFilled* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].}

View File

@ -10,3 +10,4 @@ type
request*: ?StorageRequest
slotIndex*: UInt256
cancelled*: Future[void]
reservation*: ?Reservation

View File

@ -14,10 +14,9 @@ import ./errored
type
SaleDownloading* = ref object of ErrorHandlingState
availability*: Availability
logScope:
topics = "marketplace sales downloading"
topics = "marketplace sales downloading"
method `$`*(state: SaleDownloading): string = "SaleDownloading"
@ -36,7 +35,6 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
let data = agent.data
let context = agent.context
let reservations = context.reservations
let availability = state.availability
without onStore =? context.onStore:
raiseAssert "onStore callback not set"
@ -47,9 +45,14 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
without slotIndex =? data.slotIndex:
raiseAssert("no slot index assigned")
# mark availability as used so that it is not matched to other requests
if markUsedErr =? (await reservations.markUsed(availability.id)).errorOption:
return some State(SaleErrored(error: markUsedErr))
without reservation =? data.reservation:
raiseAssert("no reservation")
logScope:
requestId = request.id
slotIndex
reservationId = reservation.id
availabilityId = reservation.availabilityId
proc onBatch(blocks: seq[bt.Block]) {.async.} =
# release batches of blocks as they are written to disk and
@ -59,25 +62,19 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
bytes += blk.data.len.uint
trace "Releasing batch of bytes written to disk", bytes
let r = await reservations.release(availability.id, bytes)
let r = await reservations.release(reservation.id,
reservation.availabilityId,
bytes)
# `tryGet` will raise the exception that occurred during release, if there
# was one. The exception will be caught in the closure and sent to the
# SaleErrored state.
r.tryGet()
template markUnused(id: AvailabilityId) =
if markUnusedErr =? (await reservations.markUnused(id)).errorOption:
return some State(SaleErrored(error: markUnusedErr))
trace "Starting download"
if err =? (await onStore(request,
slotIndex,
onBatch)).errorOption:
markUnused(availability.id)
return some State(SaleErrored(error: err))
trace "Download complete"
markUnused(availability.id)
return some State(SaleInitialProving())

View File

@ -4,6 +4,7 @@ import pkg/upraises
import pkg/chronicles
import ../statemachine
import ../salesagent
import ../../utils/exceptions
logScope:
topics = "marketplace sales errored"
@ -21,13 +22,13 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
let data = agent.data
let context = agent.context
error "Sale error", error=state.error.msg, requestId = $data.requestId
error "Sale error", error=state.error.msgDetail, requestId = data.requestId, slotIndex = data.slotIndex
if onClear =? context.onClear and
request =? data.request and
slotIndex =? data.slotIndex:
onClear(request, slotIndex)
if onCleanUp =? context.onCleanUp:
if onCleanUp =? agent.onCleanUp:
await onCleanUp()

View File

@ -26,8 +26,9 @@ method onFailed*(state: SaleFilled, request: StorageRequest): ?State =
method `$`*(state: SaleFilled): string = "SaleFilled"
method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
let data = SalesAgent(machine).data
let context = SalesAgent(machine).context
let agent = SalesAgent(machine)
let data = agent.data
let context = agent.context
let market = context.market
without slotIndex =? data.slotIndex:
@ -39,9 +40,7 @@ method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} =
info "Slot succesfully filled", requestId = $data.requestId, slotIndex
if request =? data.request and slotIndex =? data.slotIndex:
if onSale =? context.onSale:
onSale(request, slotIndex)
if onFilled =? context.onFilled:
if onFilled =? agent.onFilled:
onFilled(request, slotIndex)
when codex_enable_proof_failures:

View File

@ -23,7 +23,6 @@ method onFailed*(state: SaleFinished, request: StorageRequest): ?State =
method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} =
let agent = SalesAgent(machine)
let data = agent.data
let context = agent.context
without request =? data.request:
raiseAssert "no sale request"
@ -33,5 +32,5 @@ method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} =
info "Slot finished and paid out", requestId = $data.requestId, slotIndex
if onCleanUp =? context.onCleanUp:
if onCleanUp =? agent.onCleanUp:
await onCleanUp()

View File

@ -14,7 +14,6 @@ method `$`*(state: SaleIgnored): string = "SaleIgnored"
method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} =
let agent = SalesAgent(machine)
let context = agent.context
if onCleanUp =? context.onCleanUp:
if onCleanUp =? agent.onCleanUp:
await onCleanUp()

View File

@ -10,6 +10,7 @@ import ./failed
import ./filled
import ./ignored
import ./downloading
import ./errored
type
SalePreparing* = ref object of ErrorHandlingState
@ -50,20 +51,33 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
# TODO: Once implemented, check to ensure the host is allowed to fill the slot,
# due to the [sliding window mechanism](https://github.com/codex-storage/codex-research/blob/master/design/marketplace.md#dispersal)
logScope:
slotIndex = data.slotIndex
slotSize = request.ask.slotSize
duration = request.ask.duration
pricePerSlot = request.ask.pricePerSlot
# availability was checked for this slot when it entered the queue, however
# check to the ensure that there is still availability as they may have
# changed since being added (other slots may have been processed in that time)
without availability =? await reservations.find(
without availability =? await reservations.findAvailability(
request.ask.slotSize,
request.ask.duration,
request.ask.pricePerSlot,
request.ask.collateral,
used = false):
info "no availability found for request, ignoring",
slotSize = request.ask.slotSize,
duration = request.ask.duration,
pricePerSlot = request.ask.pricePerSlot,
used = false
request.ask.collateral):
debug "no availability found for request, ignoring"
return some State(SaleIgnored())
return some State(SaleDownloading(availability: availability))
info "availability found for request, creating reservation"
without reservation =? await reservations.createReservation(
availability.id,
request.ask.slotSize,
request.id,
data.slotIndex
), error:
return some State(SaleErrored(error: error))
data.reservation = some reservation
return some State(SaleDownloading())

View File

@ -103,6 +103,8 @@ proc stop*(machine: Machine) {.async.} =
if not machine.started:
return
trace "stopping state machine"
machine.started = false
await machine.trackedFutures.cancelTracked()

View File

@ -0,0 +1,7 @@
import std/strformat
proc msgDetail*(e: ref CatchableError): string =
var msg = e.msg
if e.parent != nil:
msg = fmt"{msg} Inner exception: {e.parent.msg}"
return msg

View File

@ -234,6 +234,13 @@ proc fromJson*[T: object](
let json = ?catch parseJson(string.fromBytes(bytes))
T.fromJson(json)
proc fromJson*[T: ref object](
_: type T,
bytes: seq[byte]
): ?!T =
let json = ?catch parseJson(string.fromBytes(bytes))
T.fromJson(json)
func `%`*(s: string): JsonNode = newJString(s)
func `%`*(n: uint): JsonNode =
@ -307,6 +314,9 @@ func `%`*[T: distinct](id: T): JsonNode =
type baseType = T.distinctBase
% baseType(id)
func toJson*(obj: object): string = $(%obj)
func toJson*(obj: ref object): string = $(%obj)
proc toJsnImpl(x: NimNode): NimNode =
case x.kind
of nnkBracket: # array

View File

@ -16,14 +16,12 @@ proc len*(self: TrackedFutures): int = self.futures.len
proc removeFuture(self: TrackedFutures, future: FutureBase) =
if not self.cancelling and not future.isNil:
trace "removing tracked future"
self.futures.del(future.id)
proc track*[T](self: TrackedFutures, fut: Future[T]): Future[T] =
if self.cancelling:
return fut
trace "tracking future", id = fut.id
self.futures[fut.id] = FutureBase(fut)
fut
@ -42,6 +40,8 @@ proc track*[T, U](future: Future[T], self: U): Future[T] =
proc cancelTracked*(self: TrackedFutures) {.async.} =
self.cancelling = true
trace "cancelling tracked futures"
for future in self.futures.values:
if not future.isNil and not future.finished:
trace "cancelling tracked future", id = future.id

View File

@ -60,3 +60,11 @@ proc example*(_: type Availability): Availability =
minPrice = uint64.example.u256,
maxCollateral = uint16.example.u256
)
proc example*(_: type Reservation): Reservation =
Reservation.init(
availabilityId = AvailabilityId(array[32, byte].example),
size = uint16.example.u256,
slotId = SlotId.example
)

View File

@ -1,17 +0,0 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/codex/sales/reservations
import ../helpers
export checktest
proc allAvailabilities*(r: Reservations): Future[seq[Availability]] {.async.} =
var ret: seq[Availability] = @[]
without availabilities =? (await r.availabilities), err:
raiseAssert "failed to get availabilities, error: " & err.msg
for a in availabilities:
if availability =? (await a):
ret.add availability
return ret

View File

@ -1,40 +1,60 @@
import std/random
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import pkg/asynctest
import pkg/datastore
import pkg/json_serialization
import pkg/json_serialization/std/options
import pkg/codex/stores
import pkg/codex/sales
import pkg/codex/utils/json
import ../examples
import ./helpers
import ../helpers
asyncchecksuite "Reservations module":
var
repo: RepoStore
repoDs: Datastore
metaDs: SQLiteDatastore
availability: Availability
reservations: Reservations
setup:
randomize(1.int64) # create reproducible results
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repo = RepoStore.new(repoDs, metaDs)
reservations = Reservations.new(repo)
availability = Availability.example
proc createAvailability(): Availability =
let example = Availability.example
let size = rand(100000..200000)
let availability = waitFor reservations.createAvailability(
size.u256,
example.duration,
example.minPrice,
example.maxCollateral
)
return availability.get
proc createReservation(availability: Availability): Reservation =
let size = rand(1..<availability.size.truncate(int))
let reservation = waitFor reservations.createReservation(
availability.id,
size.u256,
RequestId.example,
UInt256.example
)
return reservation.get
test "availability can be serialised and deserialised":
let availability = Availability.example
let serialised = availability.toJson
check Json.decode(serialised, Availability) == availability
let serialised = %availability
check Availability.fromJson(serialised).get == availability
test "has no availability initially":
check (await reservations.allAvailabilities()).len == 0
check (await reservations.all(Availability)).get.len == 0
test "generates unique ids for storage availability":
let availability1 = Availability.init(1.u256, 2.u256, 3.u256, 4.u256)
@ -42,12 +62,18 @@ asyncchecksuite "Reservations module":
check availability1.id != availability2.id
test "can reserve available storage":
let availability1 = Availability.example
let availability2 = Availability.example
check isOk await reservations.reserve(availability1)
check isOk await reservations.reserve(availability2)
let availability = createAvailability()
check availability.id != AvailabilityId.default
let availabilities = await reservations.allAvailabilities()
test "creating availability reserves bytes in repo":
let orig = repo.available
let availability = createAvailability()
check repo.available == (orig.u256 - availability.size).truncate(uint)
test "can get all availabilities":
let availability1 = createAvailability()
let availability2 = createAvailability()
let availabilities = !(await reservations.all(Availability))
check:
# perform unordered checks
availabilities.len == 2
@ -55,132 +81,198 @@ asyncchecksuite "Reservations module":
availabilities.contains(availability2)
test "reserved availability exists":
check isOk await reservations.reserve(availability)
let availability = createAvailability()
without exists =? await reservations.exists(availability.id):
without exists =? await reservations.exists(availability.key.get):
fail()
check exists
test "reserved availability can be partially released":
let size = availability.size.truncate(uint)
check isOk await reservations.reserve(availability)
check isOk await reservations.release(availability.id, size - 1)
test "reservation can be created":
let availability = createAvailability()
let reservation = createReservation(availability)
check reservation.id != ReservationId.default
without a =? await reservations.get(availability.id):
fail()
test "can get all reservations":
let availability1 = createAvailability()
let availability2 = createAvailability()
let reservation1 = createReservation(availability1)
let reservation2 = createReservation(availability2)
let availabilities = !(await reservations.all(Availability))
let reservations = !(await reservations.all(Reservation))
check:
# perform unordered checks
availabilities.len == 2
reservations.len == 2
reservations.contains(reservation1)
reservations.contains(reservation2)
check a.size == 1
test "cannot create reservation with non-existant availability":
let availability = Availability.example
let created = await reservations.createReservation(
availability.id,
UInt256.example,
RequestId.example,
UInt256.example
)
check created.isErr
check created.error of NotExistsError
test "availability is deleted after being fully released":
let size = availability.size.truncate(uint)
check isOk await reservations.reserve(availability)
check isOk await reservations.release(availability.id, size)
test "cannot create reservation larger than availability size":
let availability = createAvailability()
let created = await reservations.createReservation(
availability.id,
availability.size + 1,
RequestId.example,
UInt256.example
)
check created.isErr
check created.error of BytesOutOfBoundsError
without exists =? await reservations.exists(availability.id):
fail()
test "creating reservation reduces availability size":
let availability = createAvailability()
let orig = availability.size
let reservation = createReservation(availability)
let key = availability.id.key.get
let updated = (await reservations.get(key, Availability)).get
check updated.size == orig - reservation.size
check not exists
test "can check if reservation exists":
let availability = createAvailability()
let reservation = createReservation(availability)
let key = reservation.key.get
check await reservations.exists(key)
test "non-existant availability cannot be released":
let size = availability.size.truncate(uint)
let r = await reservations.release(availability.id, size - 1)
check r.error of AvailabilityGetFailedError
check r.error.msg == "Availability does not exist"
test "non-existant availability does not exist":
let key = AvailabilityId.example.key.get
check not (await reservations.exists(key))
test "added availability is not used initially":
check isOk await reservations.reserve(availability)
test "non-existant reservation does not exist":
let key = key(ReservationId.example, AvailabilityId.example).get
check not (await reservations.exists(key))
without available =? await reservations.get(availability.id):
fail()
test "can check if availability exists":
let availability = createAvailability()
let key = availability.key.get
check await reservations.exists(key)
check not available.used
test "can delete reservation":
let availability = createAvailability()
let reservation = createReservation(availability)
check isOk (await reservations.deleteReservation(
reservation.id, reservation.availabilityId)
)
let key = reservation.key.get
check not (await reservations.exists(key))
test "availability can be marked used":
check isOk await reservations.reserve(availability)
test "deleting reservation returns bytes back to availability":
let availability = createAvailability()
let orig = availability.size
let reservation = createReservation(availability)
discard await reservations.deleteReservation(
reservation.id, reservation.availabilityId
)
let key = availability.key.get
let updated = !(await reservations.get(key, Availability))
check updated.size == orig
check isOk await reservations.markUsed(availability.id)
test "reservation can be partially released":
let availability = createAvailability()
let reservation = createReservation(availability)
check isOk await reservations.release(
reservation.id,
reservation.availabilityId,
1
)
let key = reservation.key.get
let updated = !(await reservations.get(key, Reservation))
check updated.size == reservation.size - 1
without available =? await reservations.get(availability.id):
fail()
test "cannot release more bytes than size of reservation":
let availability = createAvailability()
let reservation = createReservation(availability)
let updated = await reservations.release(
reservation.id,
reservation.availabilityId,
(reservation.size + 1).truncate(uint)
)
check updated.isErr
check updated.error of BytesOutOfBoundsError
check available.used
test "cannot release bytes from non-existant reservation":
let availability = createAvailability()
let reservation = createReservation(availability)
let updated = await reservations.release(
ReservationId.example,
availability.id,
1
)
check updated.isErr
check updated.error of NotExistsError
test "availability can be marked unused":
check isOk await reservations.reserve(availability)
test "onAvailabilityAdded called when availability is reserved":
var added: Availability
reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
added = a
check isOk await reservations.markUsed(availability.id)
check isOk await reservations.markUnused(availability.id)
let availability = createAvailability()
without available =? await reservations.get(availability.id):
fail()
check added == availability
check not available.used
test "availabilities can be found":
let availability = createAvailability()
test "used availability can be found":
check isOk await reservations.reserve(availability)
let found = await reservations.findAvailability(
availability.size,
availability.duration,
availability.minPrice,
availability.maxCollateral)
check isOk await reservations.markUsed(availability.id)
check found.isSome
check found.get == availability
without available =? await reservations.find(availability.size,
availability.duration, availability.minPrice, availability.maxCollateral, used = true):
test "non-matching availabilities are not found":
let availability = createAvailability()
fail()
let found = await reservations.findAvailability(
availability.size + 1,
availability.duration,
availability.minPrice,
availability.maxCollateral)
test "unused availability can be found":
check isOk await reservations.reserve(availability)
without available =? await reservations.find(availability.size,
availability.duration, availability.minPrice, availability.maxCollateral, used = false):
fail()
check found.isNone
test "non-existant availability cannot be found":
check isNone (await reservations.find(availability.size,
availability.duration, availability.minPrice, availability.maxCollateral, used = false))
let availability = Availability.example
let found = (await reservations.findAvailability(
availability.size,
availability.duration,
availability.minPrice,
availability.maxCollateral
))
check found.isNone
test "non-existant availability cannot be retrieved":
let r = await reservations.get(availability.id)
check r.error of AvailabilityGetFailedError
check r.error.msg == "Availability does not exist"
test "same availability cannot be reserved twice":
check isOk await reservations.reserve(availability)
let r = await reservations.reserve(availability)
check r.error of AvailabilityAlreadyExistsError
let key = AvailabilityId.example.key.get
let got = await reservations.get(key, Availability)
check got.error of NotExistsError
test "can get available bytes in repo":
check reservations.available == DefaultQuotaBytes
test "reserving availability reduces available bytes":
check isOk await reservations.reserve(availability)
check reservations.available ==
DefaultQuotaBytes - availability.size.truncate(uint)
test "reports quota available to be reserved":
check reservations.hasAvailable(availability.size.truncate(uint))
check reservations.hasAvailable(DefaultQuotaBytes - 1)
test "reports quota not available to be reserved":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo)
check not reservations.hasAvailable(availability.size.truncate(uint))
check not reservations.hasAvailable(DefaultQuotaBytes + 1)
test "fails to reserve availability with size that is larger than available quota":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo)
let r = await reservations.reserve(availability)
check r.error of AvailabilityReserveFailedError
check r.error.parent of QuotaNotEnoughError
check exists =? (await reservations.exists(availability.id)) and not exists
test "fails to release availability size that is larger than available quota":
let size = availability.size.truncate(uint)
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = size)
reservations = Reservations.new(repo)
discard await reservations.reserve(availability)
let r = await reservations.release(availability.id, size + 1)
check r.error of AvailabilityReleaseFailedError
check r.error.parent.msg == "Cannot release this many bytes"
test "fails to create availability with size that is larger than available quota":
let created = await reservations.createAvailability(
(DefaultQuotaBytes + 1).u256,
UInt256.example,
UInt256.example,
UInt256.example
)
check created.isErr
check created.error of ReserveFailedError
check created.error.parent of QuotaNotEnoughError

View File

@ -1,4 +1,3 @@
import std/sets
import std/sequtils
import std/sugar
import std/times
@ -15,11 +14,11 @@ import pkg/codex/sales/slotqueue
import pkg/codex/stores/repostore
import pkg/codex/blocktype as bt
import pkg/codex/node
import ../helpers
import ../helpers/mockmarket
import ../helpers/mockclock
import ../helpers/always
import ../examples
import ./helpers
asyncchecksuite "Sales - start":
let proof = exampleProof()
@ -122,11 +121,11 @@ asyncchecksuite "Sales":
var itemsProcessed: seq[SlotQueueItem]
setup:
availability = Availability.init(
size=100.u256,
duration=60.u256,
minPrice=600.u256,
maxCollateral=400.u256
availability = Availability(
size: 100.u256,
duration: 60.u256,
minPrice: 600.u256,
maxCollateral: 400.u256
)
request = StorageRequest(
ask: StorageAsk(
@ -169,8 +168,18 @@ asyncchecksuite "Sales":
await sales.stop()
await repo.stop()
proc getAvailability: ?!Availability =
waitFor reservations.get(availability.id)
proc getAvailability: Availability =
let key = availability.id.key.get
(waitFor reservations.get(key, Availability)).get
proc createAvailability() =
let a = waitFor reservations.createAvailability(
availability.size,
availability.duration,
availability.minPrice,
availability.maxCollateral
)
availability = a.get # update id
proc notProcessed(itemsProcessed: seq[SlotQueueItem],
request: StorageRequest): bool =
@ -188,7 +197,7 @@ asyncchecksuite "Sales":
var request1 = StorageRequest.example
request1.ask.collateral = request.ask.collateral + 1
discard await reservations.reserve(availability)
createAvailability()
# saturate queue
while queue.len < queue.size - 1:
await market.requestStorage(StorageRequest.example)
@ -197,126 +206,141 @@ asyncchecksuite "Sales":
await sleepAsync(5.millis) # wait for request slots to be added to queue
return request1
proc wasIgnored(): bool =
let run = proc(): Future[bool] {.async.} =
always (
getAvailability().size == availability.size and
(waitFor reservations.all(Reservation)).get.len == 0
)
waitFor run()
test "processes all request's slots once StorageRequested emitted":
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
let items = SlotQueueItem.init(request)
check eventually items.allIt(itemsProcessed.contains(it))
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
createAvailability()
await market.requestStorage(request)
let items = SlotQueueItem.init(request)
check eventually items.allIt(itemsProcessed.contains(it))
test "removes slots from slot queue once RequestCancelled emitted":
let request1 = await addRequestToSaturatedQueue()
market.emitRequestCancelled(request1.id)
check always itemsProcessed.notProcessed(request1)
let request1 = await addRequestToSaturatedQueue()
market.emitRequestCancelled(request1.id)
check always itemsProcessed.notProcessed(request1)
test "removes request from slot queue once RequestFailed emitted":
let request1 = await addRequestToSaturatedQueue()
market.emitRequestFailed(request1.id)
check always itemsProcessed.notProcessed(request1)
let request1 = await addRequestToSaturatedQueue()
market.emitRequestFailed(request1.id)
check always itemsProcessed.notProcessed(request1)
test "removes request from slot queue once RequestFulfilled emitted":
let request1 = await addRequestToSaturatedQueue()
market.emitRequestFulfilled(request1.id)
check always itemsProcessed.notProcessed(request1)
let request1 = await addRequestToSaturatedQueue()
market.emitRequestFulfilled(request1.id)
check always itemsProcessed.notProcessed(request1)
test "removes slot index from slot queue once SlotFilled emitted":
let request1 = await addRequestToSaturatedQueue()
market.emitSlotFilled(request1.id, 1.u256)
let expected = SlotQueueItem.init(request1, 1'u16)
check always (not itemsProcessed.contains(expected))
let request1 = await addRequestToSaturatedQueue()
market.emitSlotFilled(request1.id, 1.u256)
let expected = SlotQueueItem.init(request1, 1'u16)
check always (not itemsProcessed.contains(expected))
test "adds slot index to slot queue once SlotFreed emitted":
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
check isOk await reservations.reserve(availability)
market.requested.add request # "contract" must be able to return request
market.emitSlotFreed(request.id, 2.u256)
createAvailability()
market.requested.add request # "contract" must be able to return request
market.emitSlotFreed(request.id, 2.u256)
let expected = SlotQueueItem.init(request, 2.uint16)
check eventually itemsProcessed.contains(expected)
let expected = SlotQueueItem.init(request, 2.uint16)
check eventually itemsProcessed.contains(expected)
test "adds past requests to queue once availability added":
var itemsProcessed: seq[SlotQueueItem] = @[]
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
var itemsProcessed: seq[SlotQueueItem] = @[]
await market.requestStorage(request)
# ignore all
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
done.complete()
# now add matching availability
check isOk await reservations.reserve(availability)
check eventually itemsProcessed.len == request.ask.slots.int
await market.requestStorage(request)
await sleepAsync(10.millis)
test "makes storage unavailable when downloading a matched request":
var used = false
# check how many slots were processed by the queue
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
# now add matching availability
createAvailability()
check eventually itemsProcessed.len == request.ask.slots.int
test "availability size is reduced by request slot size when fully downloaded":
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
without avail =? await reservations.get(availability.id):
fail()
used = avail.used
let blk = bt.Block.new( @[1.byte] ).get
onBatch( blk.repeat(request.ask.slotSize.truncate(int)) )
return success()
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
check eventually used
check eventually getAvailability().size == availability.size - request.ask.slotSize
test "reduces remaining availability size after download":
let blk = bt.Block.example
request.ask.slotSize = blk.data.len.u256
availability.size = request.ask.slotSize + 1
test "non-downloaded bytes are returned to availability once finished":
var slotIndex = 0.u256
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
await onBatch(@[blk])
slotIndex = slot
let blk = bt.Block.new( @[1.byte] ).get
onBatch(@[ blk ])
return success()
check isOk await reservations.reserve(availability)
createAvailability()
let origSize = availability.size
await market.requestStorage(request)
check eventually getAvailability().?size == success 1.u256
await sleepAsync(2.millis) # allow proving to start
# complete request
market.slotState[request.slotId(slotIndex)] = SlotState.Finished
clock.advance(request.ask.duration.truncate(int64))
check eventually getAvailability().size == origSize - 1
test "ignores download when duration not long enough":
availability.duration = request.ask.duration - 1
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
check getAvailability().?size == success availability.size
check wasIgnored()
test "ignores request when slot size is too small":
availability.size = request.ask.slotSize - 1
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
check getAvailability().?size == success availability.size
check wasIgnored()
test "ignores request when reward is too low":
availability.minPrice = request.ask.pricePerSlot + 1
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
check getAvailability().?size == success availability.size
test "availability remains unused when request is ignored":
availability.minPrice = request.ask.pricePerSlot + 1
check isOk await reservations.reserve(availability)
await market.requestStorage(request)
check getAvailability().?used == success false
check wasIgnored()
test "ignores request when asked collateral is too high":
var tooBigCollateral = request
tooBigCollateral.ask.collateral = availability.maxCollateral + 1
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(tooBigCollateral)
check getAvailability().?size == success availability.size
check wasIgnored()
test "ignores request when slot state is not free":
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
market.slotState[request.slotId(0.u256)] = SlotState.Filled
market.slotState[request.slotId(1.u256)] = SlotState.Filled
market.slotState[request.slotId(2.u256)] = SlotState.Filled
market.slotState[request.slotId(3.u256)] = SlotState.Filled
check getAvailability().?size == success availability.size
check wasIgnored()
test "retrieves and stores data locally":
var storingRequest: StorageRequest
@ -327,7 +351,7 @@ asyncchecksuite "Sales":
storingRequest = request
storingSlot = slot
return success()
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
check eventually storingRequest == request
check storingSlot < request.ask.slots.u256
@ -342,7 +366,7 @@ asyncchecksuite "Sales":
sales.onClear = proc(request: StorageRequest,
idx: UInt256) =
saleFailed = true
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
check eventually saleFailed
@ -352,10 +376,9 @@ asyncchecksuite "Sales":
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
return failure(error)
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
check eventually getAvailability().?used == success false
check getAvailability().?size == success availability.size
check getAvailability().size == availability.size
test "generates proof of storage":
var provingRequest: StorageRequest
@ -363,13 +386,13 @@ asyncchecksuite "Sales":
sales.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} =
provingRequest = slot.request
provingSlot = slot.slotIndex
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
check eventually provingRequest == request
check provingSlot < request.ask.slots.u256
test "fills a slot":
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
check eventually market.filled.len > 0
check market.filled[0].requestId == request.id
@ -378,19 +401,15 @@ asyncchecksuite "Sales":
check market.filled[0].host == await market.getSigner()
test "calls onFilled when slot is filled":
var soldAvailability: Availability
var soldRequest: StorageRequest
var soldSlotIndex: UInt256
var soldRequest = StorageRequest.default
var soldSlotIndex = UInt256.high
sales.onSale = proc(request: StorageRequest,
slotIndex: UInt256) =
if a =? availability:
soldAvailability = a
soldRequest = request
soldSlotIndex = slotIndex
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
check eventually soldAvailability == availability
check soldRequest == request
check eventually soldRequest == request
check soldSlotIndex < request.ask.slots.u256
test "calls onClear when storage becomes available again":
@ -404,7 +423,7 @@ asyncchecksuite "Sales":
slotIndex: UInt256) =
clearedRequest = request
clearedSlotIndex = slotIndex
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
check eventually clearedRequest == request
check clearedSlotIndex < request.ask.slots.u256
@ -416,22 +435,24 @@ asyncchecksuite "Sales":
onBatch: BatchProc): Future[?!void] {.async.} =
await sleepAsync(chronos.hours(1))
return success()
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
for slotIndex in 0..<request.ask.slots:
market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
check eventually (await reservations.allAvailabilities) == @[availability]
check eventually (await reservations.all(Availability)).get == @[availability]
test "makes storage available again when request expires":
let origSize = availability.size
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
await sleepAsync(chronos.hours(1))
return success()
check isOk await reservations.reserve(availability)
createAvailability()
await market.requestStorage(request)
clock.set(request.expiry.truncate(int64))
check eventually (await reservations.allAvailabilities) == @[availability]
check eventually (await reservations.all(Availability)).get == @[availability]
check getAvailability().size == origSize
test "loads active slots from market":
let me = await market.getSigner()
@ -469,3 +490,15 @@ asyncchecksuite "Sales":
check eventually sales.agents.len == 2
check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 0.u256)
check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256)
test "deletes inactive reservations on load":
createAvailability()
discard await reservations.createReservation(
availability.id,
100.u256,
RequestId.example,
UInt256.example)
check (await reservations.all(Reservation)).get.len == 1
await sales.load()
check (await reservations.all(Reservation)).get.len == 0
check getAvailability().size == availability.size # was restored

View File

@ -211,7 +211,7 @@ checksuite "json serialization":
},
"expiry": "1691545330"
}""".flatten
check $(%request) == expected
check request.toJson == expected
test "deserializes UInt256 from non-hex string representation":
let json = newJString("100000")

View File

@ -1,6 +1,7 @@
import std/random
import std/sequtils
import std/times
import std/typetraits
import pkg/codex/contracts/requests
import pkg/codex/sales/slotqueue
import pkg/stint
@ -19,8 +20,9 @@ proc example*[T](_: type seq[T]): seq[T] =
proc example*(_: type UInt256): UInt256 =
UInt256.fromBytes(array[32, byte].example)
proc example*[T: RequestId | SlotId | Nonce](_: type T): T =
T(array[32, byte].example)
proc example*[T: distinct](_: type T): T =
type baseType = T.distinctBase
T(baseType.example)
proc example*(_: type StorageRequest): StorageRequest =
StorageRequest(