All tests that use the Reservation module updated

- add requestId and slotIndex to Reservation (hopefully these will prove to be useful when we persist Reservations until request are completed, to add back bytes to Availability)
- add querying of all reservations, with accompanying tests
- change from find to findAvailabilities
- move onCleanUp from SalesContext to SalesAgent as it was getting overwritten for each slot processed
- remove sales agent AFTER deleting reservation, as this was causing some SIGSEGVs
- retrofit testsales and testslotqueue to match updated Reservations module API
This commit is contained in:
Eric 2023-08-25 14:58:27 +10:00
parent 0e751fe27d
commit 0d6b3f862b
No known key found for this signature in database
15 changed files with 399 additions and 281 deletions

View File

@ -340,7 +340,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
without contracts =? node.contracts.host:
return RestApiResponse.error(Http503, "Sales unavailable")
without unused =? (await contracts.sales.context.reservations.allAvailabilities), err:
without unused =? (await contracts.sales.context.reservations.all(Availability)), err:
return RestApiResponse.error(Http500, err.msg)
let json = %unused
@ -378,8 +378,8 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
), error:
return RestApiResponse.error(Http500, error.msg)
let json = %availability
return RestApiResponse.response($json, contentType="application/json")
return RestApiResponse.response(availability.toJson,
contentType="application/json")
router.api(
MethodGet,

View File

@ -105,19 +105,31 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
proc cleanUp(sales: Sales,
agent: SalesAgent,
processing: Future[void]) {.async.} =
await sales.remove(agent)
let data = agent.data
trace "cleaning up sales agent",
requestId = data.requestId,
slotIndex = data.slotIndex,
reservationId = data.reservation.?id |? ReservationId.default,
availabilityId = data.reservation.?availabilityId |? AvailabilityId.default
# delete reservation and return reservation bytes back to the availability
if reservation =? agent.data.reservation and
deleteErr =? (await sales.context.reservations.deleteReservation(
reservation.id,
reservation.availabilityId
)).errorOption:
error "failure deleting reservation",
error = deleteErr.msg,
reservationId = reservation.id,
availabilityId = reservation.availabilityId
await sales.remove(agent)
proc filled(sales: Sales,
processing: Future[void]) =
# signal back to the slot queue to cycle a worker
if not processing.isNil and not processing.finished():
processing.complete()
@ -133,8 +145,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
none StorageRequest
)
agent.context.onCleanUp = proc {.async.} =
await sales.remove(agent)
agent.onCleanUp = proc {.async.} =
await sales.cleanUp(agent, done)
agent.context.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
sales.filled(done)
@ -157,6 +169,8 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
proc load*(sales: Sales) {.async.} =
let slots = await sales.mySlots()
# TODO: add slots to slotqueue, as workers need to be dispatched
for slot in slots:
let agent = newSalesAgent(
sales.context,
@ -164,7 +178,7 @@ proc load*(sales: Sales) {.async.} =
slot.slotIndex,
some slot.request)
agent.context.onCleanUp = proc {.async.} = await sales.remove(agent)
agent.onCleanUp = proc {.async.} = await sales.remove(agent)
agent.start(SaleUnknown())
sales.agents.add agent

View File

@ -49,7 +49,7 @@ type
ReservationId* = distinct array[32, byte]
SomeStorableObject = Availability | Reservation
SomeStorableId = AvailabilityId | ReservationId
Availability* = object
Availability* = ref object
id* {.serialize.}: AvailabilityId
size* {.serialize.}: UInt256
duration* {.serialize.}: UInt256
@ -60,22 +60,24 @@ type
id* {.serialize.}: ReservationId
availabilityId* {.serialize.}: AvailabilityId
size* {.serialize.}: UInt256
slotId* {.serialize.}: SlotId
requestId* {.serialize.}: RequestId
slotIndex* {.serialize.}: UInt256
Reservations* = ref object
repo: RepoStore
onAvailabilityAdded: ?OnAvailabilityAdded
onMarkUnused: ?OnAvailabilityAdded
GetNext* = proc(): Future[?Availability] {.upraises: [], gcsafe, closure.}
GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.}
AvailabilityIter* = ref object
StorableIter* = ref object
finished*: bool
next*: GetNext
ReservationsError* = object of CodexError
AlreadyExistsError* = object of ReservationsError
ReserveFailedError* = object of ReservationsError
ReleaseFailedError* = object of ReservationsError
DeleteFailedError* = object of ReservationsError
GetFailedError* = object of ReservationsError
NotExistsError* = object of ReservationsError
SerializationError* = object of ReservationsError
UpdateFailedError* = object of ReservationsError
BytesOutOfBoundsError* = object of ReservationsError
@ -103,12 +105,13 @@ proc init*(
_: type Reservation,
availabilityId: AvailabilityId,
size: UInt256,
slotId: SlotId
requestId: RequestId,
slotIndex: UInt256
): Reservation =
var id: array[32, byte]
doAssert randomBytes(id) == 32
Reservation(id: ReservationId(id), availabilityId: availabilityId, size: size, slotId: slotId)
Reservation(id: ReservationId(id), availabilityId: availabilityId, size: size, requestId: requestId, slotIndex: slotIndex)
func toArray(id: SomeStorableId): array[32, byte] =
array[32, byte](id)
@ -119,7 +122,8 @@ proc `==`*(x, y: Reservation): bool =
x.id == y.id and
x.availabilityId == y.availabilityId and
x.size == y.size and
x.slotId == y.slotId
x.requestId == y.requestId and
x.slotIndex == y.slotIndex
proc `==`*(x, y: Availability): bool =
x.id == y.id and
x.size == y.size and
@ -179,11 +183,11 @@ proc getImpl(
key: Key): Future[?!seq[byte]] {.async.} =
if exists =? (await self.exists(key)) and not exists:
let err = newException(GetFailedError, "object with key " & $key & " does not exist")
let err = newException(NotExistsError, "object with key " & $key & " does not exist")
return failure(err)
without serialized =? await self.repo.metaDs.get(key), err:
return failure(err.toErr(GetFailedError))
without serialized =? await self.repo.metaDs.get(key), error:
return failure(error.toErr(GetFailedError))
return success serialized
@ -192,11 +196,11 @@ proc get*(
key: Key,
T: type SomeStorableObject): Future[?!T] {.async.} =
without serialized =? await self.getImpl(key), err:
return failure(err)
without serialized =? await self.getImpl(key), error:
return failure(error)
without obj =? T.fromJson(serialized), err:
return failure(err.toErr(GetFailedError))
without obj =? T.fromJson(serialized), error:
return failure(error.toErr(SerializationError))
return success obj
@ -206,8 +210,8 @@ proc update(
trace "updating " & $(obj.type), id = obj.id, size = obj.size
without key =? obj.key, err:
return failure(err)
without key =? obj.key, error:
return failure(error)
if err =? (await self.repo.metaDs.put(
key,
@ -236,16 +240,24 @@ proc deleteReservation*(
reservationId: ReservationId,
availabilityId: AvailabilityId): Future[?!void] {.async.} =
trace "deleting reservation", reservationId, availabilityId
logScope:
reservationId
availabilityId
without key =? key(reservationId, availabilityId), err:
return failure(err)
without reservation =? (await self.get(key, Reservation)), error:
trace "deleting reservation"
without key =? key(reservationId, availabilityId), error:
return failure(error)
without reservation =? (await self.get(key, Reservation)), error:
if error of NotExistsError:
return success()
else:
return failure(error)
if reservation.size > 0.u256:
# return remaining bytes to availability
trace "returning remaining reservation bytes to availability",
size = reservation.size
without availabilityKey =? availabilityId.key, error:
return failure(error)
@ -269,18 +281,11 @@ proc createAvailability*(
minPrice: UInt256,
maxCollateral: UInt256): Future[?!Availability] {.async.} =
trace "creating availability", size, duration, minPrice, maxCollateral
let availability = Availability.init(
size, duration, minPrice, maxCollateral
)
without key =? availability.key, err:
return failure(err)
if exists =? (await self.exists(key)) and exists:
let err = newException(AlreadyExistsError,
"Availability already exists")
return failure(err)
let bytes = availability.size.truncate(uint)
if reserveErr =? (await self.repo.reserve(bytes)).errorOption:
@ -311,18 +316,13 @@ proc createReservation*(
self: Reservations,
availabilityId: AvailabilityId,
slotSize: UInt256,
slotId: SlotId
requestId: RequestId,
slotIndex: UInt256
): Future[?!Reservation] {.async.} =
let reservation = Reservation.init(availabilityId, slotSize, slotId)
trace "creating reservation", availabilityId, slotSize, requestId, slotIndex
without key =? reservation.key, error:
return failure(error)
if exists =? (await self.exists(key)) and exists:
let err = newException(AlreadyExistsError,
"Reservation already exists")
return failure(err)
let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
without availabilityKey =? availabilityId.key, error:
return failure(error)
@ -342,17 +342,8 @@ proc createReservation*(
# the newly created Reservation
availability.size -= slotSize
# remove availabilities with no reserved bytes remaining
if availability.size == 0.u256:
without key =? availability.key, error:
return failure(error)
if err =? (await self.delete(key)).errorOption:
# rollbackRelease(err)
return failure(err)
# otherwise, update availability with reduced size
elif updateErr =? (await self.update(availability)).errorOption:
# update availability with reduced size
if updateErr =? (await self.update(availability)).errorOption:
trace "rolling back reservation creation"
@ -383,11 +374,11 @@ proc release*(
trace "releasing bytes and updating reservation"
without key =? key(reservationId, availabilityId), err:
return failure(err)
without key =? key(reservationId, availabilityId), error:
return failure(error)
without var reservation =? (await self.get(key, Reservation)), err:
return failure(err)
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
if reservation.size < bytes.u256:
let error = newException(BytesOutOfBoundsError,
@ -400,8 +391,6 @@ proc release*(
reservation.size -= bytes.u256
# TODO: remove used up reservation after sales process is complete
# persist partially used Reservation with updated size
if err =? (await self.update(reservation)).errorOption:
@ -414,58 +403,75 @@ proc release*(
return success()
iterator items*(self: AvailabilityIter): Future[?Availability] =
iterator items(self: StorableIter): Future[?seq[byte]] =
while not self.finished:
yield self.next()
proc availabilities*(
self: Reservations): Future[?!AvailabilityIter] {.async.} =
proc storables(
self: Reservations,
T: type SomeStorableObject
): Future[?!StorableIter] {.async.} =
var iter = AvailabilityIter()
var iter = StorableIter()
let query = Query.init(ReservationsKey)
when T is Availability:
# should indicate key length of 4, but let the .key logic determine it
without defaultKey =? AvailabilityId.default.key, error:
return failure(error)
else:
# should indicate key length of 5, but let the .key logic determine it
without defaultKey =? key(ReservationId.default, AvailabilityId.default), error:
return failure(error)
without results =? await self.repo.metaDs.query(query), err:
return failure(err)
without results =? await self.repo.metaDs.query(query), error:
return failure(error)
proc next(): Future[?Availability] {.async.} =
proc next(): Future[?seq[byte]] {.async.} =
await idleAsync()
iter.finished = results.finished
if not results.finished and
r =? (await results.next()) and
serialized =? r.data and
serialized.len > 0:
res =? (await results.next()) and
res.data.len > 0 and
key =? res.key and
key.namespaces.len == defaultKey.namespaces.len:
return Availability.fromJson(serialized).option
return some res.data
return none Availability
return none seq[byte]
iter.next = next
return success iter
proc allAvailabilities*(r: Reservations): Future[?!seq[Availability]] {.async.} =
var ret: seq[Availability] = @[]
proc all*(
self: Reservations,
T: type SomeStorableObject
): Future[?!seq[T]] {.async.} =
without availabilities =? (await r.availabilities), err:
return failure(err)
var ret: seq[T] = @[]
for a in availabilities:
if availability =? (await a):
ret.add availability
without storables =? (await self.storables(T)), error:
return failure(error)
# NOTICE: there is a swallowed deserialization error
for storable in storables.items:
if bytes =? (await storable) and
obj =? T.fromJson(bytes):
ret.add obj
return success(ret)
proc find*(
proc findAvailability*(
self: Reservations,
size, duration, minPrice, collateral: UInt256
): Future[?Availability] {.async.} =
without availabilities =? (await self.availabilities), err:
error "failed to get all availabilities", error = err.msg
without storables =? (await self.storables(Availability)), error:
error "failed to get all storables", error = error.msg
return none Availability
for a in availabilities:
if availability =? (await a):
for item in storables.items:
if bytes =? (await item) and
availability =? Availability.fromJson(bytes):
if size <= availability.size and
duration <= availability.duration and

View File

@ -21,6 +21,10 @@ type
context*: SalesContext
data*: SalesData
subscribed: bool
onCleanUp*: OnCleanUp
OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].}
SalesAgentError = object of CodexError
AllSlotsFilledError* = object of SalesAgentError

View File

@ -16,7 +16,7 @@ type
SaleDownloading* = ref object of ErrorHandlingState
logScope:
topics = "marketplace sales downloading"
topics = "marketplace sales downloading"
method `$`*(state: SaleDownloading): string = "SaleDownloading"
@ -48,6 +48,12 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
without reservation =? data.reservation:
raiseAssert("no reservation")
logScope:
requestId = request.id
slotIndex
reservationId = reservation.id
availabilityId = reservation.availabilityId
proc onBatch(blocks: seq[bt.Block]) {.async.} =
# release batches of blocks as they are written to disk and
# update availability size
@ -68,9 +74,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
if err =? (await onStore(request,
slotIndex,
onBatch)).errorOption:
return some State(SaleErrored(error: err))
trace "Download complete"
return some State(SaleInitialProving())

View File

@ -28,6 +28,6 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
slotIndex =? data.slotIndex:
onClear(request, slotIndex)
if onCleanUp =? context.onCleanUp:
if onCleanUp =? agent.onCleanUp:
await onCleanUp()

View File

@ -33,5 +33,5 @@ method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} =
info "Slot finished and paid out", requestId = $data.requestId, slotIndex
if onCleanUp =? context.onCleanUp:
if onCleanUp =? agent.onCleanUp:
await onCleanUp()

View File

@ -16,5 +16,5 @@ method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} =
let agent = SalesAgent(machine)
let context = agent.context
if onCleanUp =? context.onCleanUp:
if onCleanUp =? agent.onCleanUp:
await onCleanUp()

View File

@ -51,25 +51,31 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
# TODO: Once implemented, check to ensure the host is allowed to fill the slot,
# due to the [sliding window mechanism](https://github.com/codex-storage/codex-research/blob/master/design/marketplace.md#dispersal)
logScope:
slotIndex = data.slotIndex
slotSize = request.ask.slotSize
duration = request.ask.duration
pricePerSlot = request.ask.pricePerSlot
# availability was checked for this slot when it entered the queue, however
# check to the ensure that there is still availability as they may have
# changed since being added (other slots may have been processed in that time)
without availability =? await reservations.find(
without availability =? await reservations.findAvailability(
request.ask.slotSize,
request.ask.duration,
request.ask.pricePerSlot,
request.ask.collateral):
info "no availability found for request, ignoring",
slotSize = request.ask.slotSize,
duration = request.ask.duration,
pricePerSlot = request.ask.pricePerSlot
info "no availability found for request, ignoring"
return some State(SaleIgnored())
info "availability found for request, creating reservation"
without reservation =? await reservations.createReservation(
availability.id,
request.ask.slotSize,
slotId
request.id,
data.slotIndex
), error:
return some State(SaleErrored(error: error))

View File

@ -103,6 +103,8 @@ proc stop*(machine: Machine) {.async.} =
if not machine.started:
return
trace "stopping state machine"
machine.started = false
await machine.trackedFutures.cancelTracked()

View File

@ -16,14 +16,12 @@ proc len*(self: TrackedFutures): int = self.futures.len
proc removeFuture(self: TrackedFutures, future: FutureBase) =
if not self.cancelling and not future.isNil:
trace "removing tracked future"
self.futures.del(future.id)
proc track*[T](self: TrackedFutures, fut: Future[T]): Future[T] =
if self.cancelling:
return fut
trace "tracking future", id = fut.id
self.futures[fut.id] = FutureBase(fut)
fut
@ -42,6 +40,8 @@ proc track*[T, U](future: Future[T], self: U): Future[T] =
proc cancelTracked*(self: TrackedFutures) {.async.} =
self.cancelling = true
trace "cancelling tracked futures"
for future in self.futures.values:
if not future.isNil and not future.finished:
trace "cancelling tracked future", id = future.id

View File

@ -60,3 +60,11 @@ proc example*(_: type Availability): Availability =
minPrice = uint64.example.u256,
maxCollateral = uint16.example.u256
)
proc example*(_: type Reservation): Reservation =
Reservation.init(
availabilityId = AvailabilityId(array[32, byte].example),
size = uint16.example.u256,
slotId = SlotId.example
)

View File

@ -1,3 +1,5 @@
import std/random
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
@ -16,33 +18,43 @@ asyncchecksuite "Reservations module":
repo: RepoStore
repoDs: Datastore
metaDs: SQLiteDatastore
availability: Availability
reservations: Reservations
setup:
randomize()
repoDs = SQLiteDatastore.new(Memory).tryGet()
metaDs = SQLiteDatastore.new(Memory).tryGet()
repo = RepoStore.new(repoDs, metaDs)
reservations = Reservations.new(repo)
availability = Availability.example
proc createAvailability(): Availability =
let example = Availability.example
let size = rand(100000..200000)
let availability = waitFor reservations.createAvailability(
example.size,
size.u256,
example.duration,
example.minPrice,
example.maxCollateral
)
return availability.get
proc createReservation(availability: Availability): Reservation =
let size = rand(1..<availability.size.truncate(int))
let reservation = waitFor reservations.createReservation(
availability.id,
size.u256,
RequestId.example,
UInt256.example
)
return reservation.get
test "availability can be serialised and deserialised":
let availability = Availability.example
let serialised = %availability
check Availability.fromJson(serialised).get == availability
test "has no availability initially":
check (await reservations.allAvailabilities()).get.len == 0
check (await reservations.all(Availability)).get.len == 0
test "generates unique ids for storage availability":
let availability1 = Availability.init(1.u256, 2.u256, 3.u256, 4.u256)
@ -50,12 +62,18 @@ asyncchecksuite "Reservations module":
check availability1.id != availability2.id
test "can reserve available storage":
let availability = createAvailability()
check availability.id != AvailabilityId.default
test "creating availability reserves bytes in repo":
let orig = repo.available
let availability = createAvailability()
check repo.available == (orig.u256 - availability.size).truncate(uint)
test "can get all availabilities":
let availability1 = createAvailability()
let availability2 = createAvailability()
check availability1.id != AvailabilityId.default
check availability2.id != AvailabilityId.default
let availabilities = (await reservations.allAvailabilities()).get
let availabilities = !(await reservations.all(Availability))
check:
# perform unordered checks
availabilities.len == 2
@ -70,144 +88,189 @@ asyncchecksuite "Reservations module":
check exists
test "reserved availability can be partially released":
let size = availability.size.truncate(uint)
check isOk await reservations.create(availability)
check isOk await reservations.release(availability.id, size - 1)
test "reservation can be created":
let availability = createAvailability()
let reservation = createReservation(availability)
check reservation.id != ReservationId.default
without a =? await reservations.get(availability.id):
fail()
test "can get all reservations":
let availability1 = createAvailability()
let availability2 = createAvailability()
let reservation1 = createReservation(availability1)
let reservation2 = createReservation(availability2)
let reservations = !(await reservations.all(Reservation))
check:
# perform unordered checks
reservations.len == 2
reservations.contains(reservation1)
reservations.contains(reservation2)
check a.size == 1
test "cannot create reservation with non-existant availability":
let availability = Availability.example
let created = await reservations.createReservation(
availability.id,
UInt256.example,
RequestId.example,
UInt256.example
)
check created.isErr
check created.error of NotExistsError
test "availability is deleted after being fully released":
let size = availability.size.truncate(uint)
check isOk await reservations.create(availability)
check isOk await reservations.release(availability.id, size)
test "cannot create reservation larger than availability size":
let availability = createAvailability()
let created = await reservations.createReservation(
availability.id,
availability.size + 1,
RequestId.example,
UInt256.example
)
check created.isErr
check created.error of BytesOutOfBoundsError
without exists =? await reservations.exists(availability.id):
fail()
test "creating reservation reduces availability size":
let availability = createAvailability()
let orig = availability.size
let reservation = createReservation(availability)
let key = availability.id.key.get
let updated = (await reservations.get(key, Availability)).get
check updated.size == orig - reservation.size
check not exists
test "can check if reservation exists":
let availability = createAvailability()
let reservation = createReservation(availability)
let key = reservation.key.get
check await reservations.exists(key)
test "non-existant availability cannot be released":
let size = availability.size.truncate(uint)
let r = await reservations.release(availability.id, size - 1)
check r.error of AvailabilityGetFailedError
check r.error.msg == "Availability does not exist"
test "non-existant availability does not exist":
let key = AvailabilityId.example.key.get
check not (await reservations.exists(key))
test "added availability is not used initially":
check isOk await reservations.create(availability)
test "non-existant reservation does not exist":
let key = key(ReservationId.example, AvailabilityId.example).get
check not (await reservations.exists(key))
without available =? await reservations.get(availability.id):
fail()
test "can check if availability exists":
let availability = createAvailability()
let key = availability.key.get
check await reservations.exists(key)
check not available.used
test "can delete reservation":
let availability = createAvailability()
let reservation = createReservation(availability)
check isOk (await reservations.deleteReservation(
reservation.id, reservation.availabilityId)
)
let key = reservation.key.get
check not (await reservations.exists(key))
test "availability can be marked used":
check isOk await reservations.create(availability)
test "deleting reservation returns bytes back to availability":
let availability = createAvailability()
let orig = availability.size
let reservation = createReservation(availability)
discard await reservations.deleteReservation(
reservation.id, reservation.availabilityId
)
let key = availability.key.get
let updated = !(await reservations.get(key, Availability))
check updated.size == orig
check isOk await reservations.markUsed(availability.id)
test "reservation can be partially released":
let availability = createAvailability()
let reservation = createReservation(availability)
check isOk await reservations.release(
reservation.id,
reservation.availabilityId,
1
)
let key = reservation.key.get
let updated = !(await reservations.get(key, Reservation))
check updated.size == reservation.size - 1
without available =? await reservations.get(availability.id):
fail()
test "cannot release more bytes than size of reservation":
let availability = createAvailability()
let reservation = createReservation(availability)
let updated = await reservations.release(
reservation.id,
reservation.availabilityId,
(reservation.size + 1).truncate(uint)
)
check updated.isErr
check updated.error of BytesOutOfBoundsError
check available.used
test "cannot release bytes from non-existant reservation":
let availability = createAvailability()
let reservation = createReservation(availability)
let updated = await reservations.release(
ReservationId.example,
availability.id,
1
)
check updated.isErr
check updated.error of NotExistsError
test "availability can be marked unused":
check isOk await reservations.create(availability)
check isOk await reservations.markUsed(availability.id)
check isOk await reservations.markUnused(availability.id)
without available =? await reservations.get(availability.id):
fail()
check not available.used
test "onMarkedUnused called when availability marked unused":
var markedUnused: Availability
reservations.onMarkUnused = proc(a: Availability) {.async.} =
markedUnused = a
check isOk await reservations.create(availability)
check isOk await reservations.markUnused(availability.id)
check markedUnused == availability
test "onAdded called when availability is reserved":
test "onAvailabilityAdded called when availability is reserved":
var added: Availability
reservations.onAdded = proc(a: Availability) {.async.} =
reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
added = a
check isOk await reservations.create(availability)
let availability = createAvailability()
check added == availability
test "used availability can be found":
check isOk await reservations.create(availability)
test "availabilities can be found":
let availability = createAvailability()
check isOk await reservations.markUsed(availability.id)
let found = await reservations.findAvailability(
availability.size,
availability.duration,
availability.minPrice,
availability.maxCollateral)
without available =? await reservations.find(availability.size,
availability.duration, availability.minPrice, availability.maxCollateral, used = true):
check found.isSome
check found.get == availability
fail()
test "non-matching availabilities are not found":
let availability = createAvailability()
test "unused availability can be found":
check isOk await reservations.create(availability)
let found = await reservations.findAvailability(
availability.size + 1,
availability.duration,
availability.minPrice,
availability.maxCollateral)
without available =? await reservations.find(availability.size,
availability.duration, availability.minPrice, availability.maxCollateral, used = false):
fail()
check found.isNone
test "non-existant availability cannot be found":
check isNone (await reservations.find(availability.size,
availability.duration, availability.minPrice, availability.maxCollateral, used = false))
let availability = Availability.example
let found = (await reservations.findAvailability(
availability.size,
availability.duration,
availability.minPrice,
availability.maxCollateral
))
check found.isNone
test "non-existant availability cannot be retrieved":
let r = await reservations.get(availability.id)
check r.error of AvailabilityGetFailedError
check r.error.msg == "Availability does not exist"
test "same availability cannot be reserved twice":
check isOk await reservations.create(availability)
let r = await reservations.create(availability)
check r.error of AvailabilityAlreadyExistsError
let key = AvailabilityId.example.key.get
let got = await reservations.get(key, Availability)
check got.error of NotExistsError
test "can get available bytes in repo":
check reservations.available == DefaultQuotaBytes
test "reserving availability reduces available bytes":
check isOk await reservations.create(availability)
check reservations.available ==
DefaultQuotaBytes - availability.size.truncate(uint)
test "reports quota available to be reserved":
check reservations.hasAvailable(availability.size.truncate(uint))
check reservations.hasAvailable(DefaultQuotaBytes - 1)
test "reports quota not available to be reserved":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo)
check not reservations.hasAvailable(availability.size.truncate(uint))
check not reservations.hasAvailable(DefaultQuotaBytes + 1)
test "fails to reserve availability with size that is larger than available quota":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo)
let r = await reservations.create(availability)
check r.error of AvailabilityReserveFailedError
check r.error.parent of QuotaNotEnoughError
check exists =? (await reservations.exists(availability.id)) and not exists
test "fails to release availability size that is larger than available quota":
let size = availability.size.truncate(uint)
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = size)
reservations = Reservations.new(repo)
discard await reservations.create(availability)
let r = await reservations.release(availability.id, size + 1)
check r.error of AvailabilityReleaseFailedError
check r.error.parent.msg == "Cannot release this many bytes"
test "fails to create availability with size that is larger than available quota":
let created = await reservations.createAvailability(
(DefaultQuotaBytes + 1).u256,
UInt256.example,
UInt256.example,
UInt256.example
)
check created.isErr
check created.error of ReserveFailedError
check created.error.parent of QuotaNotEnoughError

View File

@ -15,11 +15,11 @@ import pkg/codex/sales/slotqueue
import pkg/codex/stores/repostore
import pkg/codex/blocktype as bt
import pkg/codex/node
import ../helpers
import ../helpers/mockmarket
import ../helpers/mockclock
import ../helpers/always
import ../examples
import ./helpers
asyncchecksuite "Sales - start":
let proof = exampleProof()
@ -122,11 +122,11 @@ asyncchecksuite "Sales":
var itemsProcessed: seq[SlotQueueItem]
setup:
availability = Availability.init(
size=100.u256,
duration=60.u256,
minPrice=600.u256,
maxCollateral=400.u256
availability = Availability(
size: 100.u256,
duration: 60.u256,
minPrice: 600.u256,
maxCollateral: 400.u256
)
request = StorageRequest(
ask: StorageAsk(
@ -169,8 +169,18 @@ asyncchecksuite "Sales":
await sales.stop()
await repo.stop()
proc getAvailability: ?!Availability =
waitFor reservations.get(availability.id)
proc getAvailability: Availability =
let key = availability.id.key.get
(waitFor reservations.get(key, Availability)).get
proc createAvailability() =
let a = waitFor reservations.createAvailability(
availability.size,
availability.duration,
availability.minPrice,
availability.maxCollateral
)
availability = a.get # update id
proc notProcessed(itemsProcessed: seq[SlotQueueItem],
request: StorageRequest): bool =
@ -188,7 +198,7 @@ asyncchecksuite "Sales":
var request1 = StorageRequest.example
request1.ask.collateral = request.ask.collateral + 1
discard await reservations.reserve(availability)
createAvailability()
# saturate queue
while queue.len < queue.size - 1:
await market.requestStorage(StorageRequest.example)
@ -197,11 +207,19 @@ asyncchecksuite "Sales":
await sleepAsync(5.millis) # wait for request slots to be added to queue
return request1
proc wasIgnored(): bool =
let run = proc(): Future[bool] {.async.} =
always (
getAvailability().size == availability.size and
(waitFor reservations.all(Reservation)).get.len == 0
)
waitFor run()
test "processes all request's slots once StorageRequested emitted":
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
let items = SlotQueueItem.init(request)
check eventually items.allIt(itemsProcessed.contains(it))
@ -232,7 +250,7 @@ asyncchecksuite "Sales":
itemsProcessed.add item
done.complete()
check isOk await reservations.create(availability)
createAvailability()
market.requested.add request # "contract" must be able to return request
market.emitSlotFreed(request.id, 2.u256)
@ -248,75 +266,69 @@ asyncchecksuite "Sales":
await market.requestStorage(request)
# now add matching availability
check isOk await reservations.reserve(availability)
createAvailability()
check eventually itemsProcessed.len == request.ask.slots.int
test "makes storage unavailable when downloading a matched request":
var used = false
test "availability size is reduced by request slot size when fully downloaded":
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
without avail =? await reservations.get(availability.id):
fail()
used = avail.used
let blk = bt.Block.new( @[1.byte] ).get
onBatch( blk.repeat(request.ask.slotSize.truncate(int)) )
return success()
check isOk await reservations.create(availability)
createAvailability()
let origSize = availability.size
await market.requestStorage(request)
check eventually used
check eventuallyCheck getAvailability().size == availability.size - request.ask.slotSize
test "reduces remaining availability size after download":
let blk = bt.Block.example
request.ask.slotSize = blk.data.len.u256
availability.size = request.ask.slotSize + 1
test "non-downloaded bytes are returned to availability once finished":
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
await onBatch(@[blk])
let blk = bt.Block.new( @[1.byte] ).get
onBatch(@[ blk ])
return success()
check isOk await reservations.create(availability)
createAvailability()
let origSize = availability.size
await market.requestStorage(request)
check eventually getAvailability().?size == success 1.u256
await sleepAsync(1.millis)
check eventuallyCheck getAvailability().size == origSize - 1
test "ignores download when duration not long enough":
availability.duration = request.ask.duration - 1
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
check getAvailability().?size == success availability.size
check wasIgnored()
test "ignores request when slot size is too small":
availability.size = request.ask.slotSize - 1
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
check getAvailability().?size == success availability.size
check wasIgnored()
test "ignores request when reward is too low":
availability.minPrice = request.ask.pricePerSlot + 1
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
check getAvailability().?size == success availability.size
test "availability remains unused when request is ignored":
availability.minPrice = request.ask.pricePerSlot + 1
check isOk await reservations.create(availability)
await market.requestStorage(request)
check getAvailability().?used == success false
check wasIgnored()
test "ignores request when asked collateral is too high":
var tooBigCollateral = request
tooBigCollateral.ask.collateral = availability.maxCollateral + 1
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(tooBigCollateral)
check getAvailability().?size == success availability.size
check wasIgnored()
test "ignores request when slot state is not free":
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
market.slotState[request.slotId(0.u256)] = SlotState.Filled
market.slotState[request.slotId(1.u256)] = SlotState.Filled
market.slotState[request.slotId(2.u256)] = SlotState.Filled
market.slotState[request.slotId(3.u256)] = SlotState.Filled
check getAvailability().?size == success availability.size
check wasIgnored()
test "retrieves and stores data locally":
var storingRequest: StorageRequest
@ -327,7 +339,7 @@ asyncchecksuite "Sales":
storingRequest = request
storingSlot = slot
return success()
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
check eventually storingRequest == request
check storingSlot < request.ask.slots.u256
@ -342,7 +354,7 @@ asyncchecksuite "Sales":
sales.onClear = proc(request: StorageRequest,
idx: UInt256) =
saleFailed = true
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
check eventually saleFailed
@ -352,10 +364,9 @@ asyncchecksuite "Sales":
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
return failure(error)
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
check eventually getAvailability().?used == success false
check getAvailability().?size == success availability.size
check getAvailability().size == availability.size
test "generates proof of storage":
var provingRequest: StorageRequest
@ -363,13 +374,13 @@ asyncchecksuite "Sales":
sales.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} =
provingRequest = slot.request
provingSlot = slot.slotIndex
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
check eventually provingRequest == request
check provingSlot < request.ask.slots.u256
test "fills a slot":
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
check eventually market.filled.len > 0
check market.filled[0].requestId == request.id
@ -378,19 +389,15 @@ asyncchecksuite "Sales":
check market.filled[0].host == await market.getSigner()
test "calls onFilled when slot is filled":
var soldAvailability: Availability
var soldRequest: StorageRequest
var soldSlotIndex: UInt256
var soldRequest = StorageRequest.default
var soldSlotIndex = UInt256.high
sales.onSale = proc(request: StorageRequest,
slotIndex: UInt256) =
if a =? availability:
soldAvailability = a
soldRequest = request
soldSlotIndex = slotIndex
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
check eventually soldAvailability == availability
check soldRequest == request
check eventuallyCheck soldRequest == request
check soldSlotIndex < request.ask.slots.u256
test "calls onClear when storage becomes available again":
@ -404,7 +411,7 @@ asyncchecksuite "Sales":
slotIndex: UInt256) =
clearedRequest = request
clearedSlotIndex = slotIndex
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
check eventually clearedRequest == request
check clearedSlotIndex < request.ask.slots.u256
@ -416,22 +423,24 @@ asyncchecksuite "Sales":
onBatch: BatchProc): Future[?!void] {.async.} =
await sleepAsync(chronos.hours(1))
return success()
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
for slotIndex in 0..<request.ask.slots:
market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
check eventually (await reservations.allAvailabilities) == @[availability]
check eventuallyCheck (await reservations.all(Availability)).get == @[availability]
test "makes storage available again when request expires":
let origSize = availability.size
sales.onStore = proc(request: StorageRequest,
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
await sleepAsync(chronos.hours(1))
return success()
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
clock.set(request.expiry.truncate(int64))
check eventuallyCheck (await reservations.allAvailabilities) == @[availability]
check eventuallyCheck (await reservations.all(Availability)).get == @[availability]
check getAvailability().size == origSize
test "adds proving for slot when slot is filled":
var soldSlotIndex: UInt256
@ -439,7 +448,7 @@ asyncchecksuite "Sales":
slotIndex: UInt256) =
soldSlotIndex = slotIndex
check proving.slots.len == 0
check isOk await reservations.create(availability)
createAvailability()
await market.requestStorage(request)
check eventuallyCheck proving.slots.len == 1
check proving.slots.contains(Slot(request: request, slotIndex: soldSlotIndex))

View File

@ -1,6 +1,7 @@
import std/random
import std/sequtils
import std/times
import std/typetraits
import pkg/codex/contracts/requests
import pkg/codex/sales/slotqueue
import pkg/stint
@ -19,8 +20,9 @@ proc example*[T](_: type seq[T]): seq[T] =
proc example*(_: type UInt256): UInt256 =
UInt256.fromBytes(array[32, byte].example)
proc example*[T: RequestId | SlotId | Nonce](_: type T): T =
T(array[32, byte].example)
proc example*[T: distinct](_: type T): T =
type baseType = T.distinctBase
T(baseType.example)
proc example*(_: type StorageRequest): StorageRequest =
StorageRequest(