refactor Reservations api to include Reservation CRUD

This commit is contained in:
Eric 2023-08-23 12:46:17 +10:00
parent e925927278
commit 0e751fe27d
No known key found for this signature in database
10 changed files with 290 additions and 151 deletions

View File

@ -340,7 +340,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
without contracts =? node.contracts.host:
return RestApiResponse.error(Http503, "Sales unavailable")
without unused =? (await contracts.sales.context.reservations.unused), err:
without unused =? (await contracts.sales.context.reservations.allAvailabilities), err:
return RestApiResponse.error(Http500, err.msg)
let json = %unused
@ -365,17 +365,18 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
return RestApiResponse.error(Http400, error.msg)
let reservations = contracts.sales.context.reservations
# assign id to availability via init
let availability = Availability.init(restAv.size,
restAv.duration,
restAv.minPrice,
restAv.maxCollateral)
if not reservations.hasAvailable(availability.size.truncate(uint)):
if not reservations.hasAvailable(restAv.size.truncate(uint)):
return RestApiResponse.error(Http422, "Not enough storage quota")
if err =? (await reservations.create(availability)).errorOption:
return RestApiResponse.error(Http500, err.msg)
without availability =? (
await reservations.createAvailability(
restAv.size,
restAv.duration,
restAv.minPrice,
restAv.maxCollateral)
), error:
return RestApiResponse.error(Http500, error.msg)
let json = %availability
return RestApiResponse.response($json, contentType="application/json")

View File

@ -2,6 +2,7 @@ import std/sequtils
import std/sugar
import std/tables
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/chronicles
import pkg/datastore
@ -101,8 +102,23 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
if sales.running:
sales.agents.keepItIf(it != agent)
proc cleanUp(sales: Sales,
agent: SalesAgent,
processing: Future[void]) {.async.} =
await sales.remove(agent)
if reservation =? agent.data.reservation and
deleteErr =? (await sales.context.reservations.deleteReservation(
reservation.id,
reservation.availabilityId
)).errorOption:
error "failure deleting reservation",
reservationId = reservation.id,
availabilityId = reservation.availabilityId
proc filled(sales: Sales,
processing: Future[void]) =
processing: Future[void]) =
# signal back to the slot queue to cycle a worker
if not processing.isNil and not processing.finished():
processing.complete()
@ -387,9 +403,7 @@ proc startSlotQueue(sales: Sales) {.async.} =
proc onAvailabilityAdded(availability: Availability) {.async.} =
await sales.onAvailabilityAdded(availability)
reservations.onAdded = onAvailabilityAdded
reservations.onMarkUnused = onAvailabilityAdded
reservations.onAvailabilityAdded = onAvailabilityAdded
proc subscribe(sales: Sales) {.async.} =
await sales.subscribeRequested()

View File

@ -42,11 +42,13 @@ import ../utils/json
export requests
logScope:
topics = "reservations"
topics = "sales reservations"
type
AvailabilityId* = distinct array[32, byte]
ReservationId* = distinct array[32, byte]
SomeStorableObject = Availability | Reservation
SomeStorableId = AvailabilityId | ReservationId
Availability* = object
id* {.serialize.}: AvailabilityId
size* {.serialize.}: UInt256
@ -54,27 +56,28 @@ type
minPrice* {.serialize.}: UInt256
maxCollateral* {.serialize.}: UInt256
# used*: bool
Reservation* = object
Reservation* = ref object
id* {.serialize.}: ReservationId
availabilityId* {.serialize.}: AvailabilityId
size* {.serialize.}: UInt256
slotId* {.serialize.}: SlotId
Reservations* = ref object
repo: RepoStore
onAdded: ?OnAvailabilityAdded
onAvailabilityAdded: ?OnAvailabilityAdded
onMarkUnused: ?OnAvailabilityAdded
GetNext* = proc(): Future[?Availability] {.upraises: [], gcsafe, closure.}
OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.}
AvailabilityIter* = ref object
finished*: bool
next*: GetNext
AvailabilityError* = object of CodexError
AvailabilityAlreadyExistsError* = object of AvailabilityError
AvailabilityReserveFailedError* = object of AvailabilityError
AvailabilityReleaseFailedError* = object of AvailabilityError
AvailabilityDeleteFailedError* = object of AvailabilityError
AvailabilityGetFailedError* = object of AvailabilityError
AvailabilityUpdateFailedError* = object of AvailabilityError
ReservationsError* = object of CodexError
AlreadyExistsError* = object of ReservationsError
ReserveFailedError* = object of ReservationsError
ReleaseFailedError* = object of ReservationsError
DeleteFailedError* = object of ReservationsError
GetFailedError* = object of ReservationsError
UpdateFailedError* = object of ReservationsError
BytesOutOfBoundsError* = object of ReservationsError
const
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
@ -96,7 +99,18 @@ proc init*(
doAssert randomBytes(id) == 32
Availability(id: AvailabilityId(id), size: size, duration: duration, minPrice: minPrice, maxCollateral: maxCollateral)
func toArray(id: AvailabilityId | ReservationId): array[32, byte] =
proc init*(
_: type Reservation,
availabilityId: AvailabilityId,
size: UInt256,
slotId: SlotId
): Reservation =
var id: array[32, byte]
doAssert randomBytes(id) == 32
Reservation(id: ReservationId(id), availabilityId: availabilityId, size: size, slotId: slotId)
func toArray(id: SomeStorableId): array[32, byte] =
array[32, byte](id)
proc `==`*(x, y: AvailabilityId): bool {.borrow.}
@ -113,9 +127,9 @@ proc `==`*(x, y: Availability): bool =
x.maxCollateral == y.maxCollateral and
x.minPrice == y.minPrice
proc `$`*(id: AvailabilityId | ReservationId): string = id.toArray.toHex
proc `$`*(id: SomeStorableId): string = id.toArray.toHex
proc toErr[E1: ref CatchableError, E2: AvailabilityError](
proc toErr[E1: ref CatchableError, E2: ReservationsError](
e1: E1,
_: type E2,
msg: string = e1.msg): ref E2 =
@ -124,27 +138,30 @@ proc toErr[E1: ref CatchableError, E2: AvailabilityError](
proc writeValue*(
writer: var JsonWriter,
value: AvailabilityId | ReservationId) {.upraises:[IOError].} =
value: SomeStorableId) {.upraises:[IOError].} =
## used for chronicles' logs
mixin writeValue
writer.writeValue %value
proc `onAdded=`*(self: Reservations,
onAdded: OnAvailabilityAdded) =
self.onAdded = some onAdded
proc `onAvailabilityAdded=`*(self: Reservations,
onAvailabilityAdded: OnAvailabilityAdded) =
self.onAvailabilityAdded = some onAvailabilityAdded
func key(id: AvailabilityId): ?!Key =
func key*(id: AvailabilityId): ?!Key =
## sales / reservations / <availabilityId>
(ReservationsKey / $id)
func key(reservationId: ReservationId, availabilityId: AvailabilityId): ?!Key =
func key*(reservationId: ReservationId, availabilityId: AvailabilityId): ?!Key =
## sales / reservations / <availabilityId> / <reservationId>
(availabilityId.key / $reservationId)
func key*(availability: Availability | Reservation): ?!Key =
func key*(availability: Availability): ?!Key =
return availability.id.key
func key*(reservation: Reservation): ?!Key =
return key(reservation.id, reservation.availabilityId)
func available*(self: Reservations): uint = self.repo.available
func hasAvailable*(self: Reservations, bytes: uint): bool =
@ -152,84 +169,122 @@ func hasAvailable*(self: Reservations, bytes: uint): bool =
proc exists*(
self: Reservations,
id: AvailabilityId): Future[?!bool] {.async.} =
without key =? id.key, err:
return failure(err)
key: Key): Future[bool] {.async.} =
let exists = await self.repo.metaDs.contains(key)
return success(exists)
return exists
proc getImpl(
self: Reservations,
key: Key): Future[?!seq[byte]] {.async.} =
if exists =? (await self.exists(key)) and not exists:
let err = newException(GetFailedError, "object with key " & $key & " does not exist")
return failure(err)
without serialized =? await self.repo.metaDs.get(key), err:
return failure(err.toErr(GetFailedError))
return success serialized
proc get*(
self: Reservations,
id: AvailabilityId): Future[?!Availability] {.async.} =
key: Key,
T: type SomeStorableObject): Future[?!T] {.async.} =
if exists =? (await self.exists(id)) and not exists:
let err = newException(AvailabilityGetFailedError,
"Availability does not exist")
without serialized =? await self.getImpl(key), err:
return failure(err)
without key =? id.key, err:
return failure(err.toErr(AvailabilityGetFailedError))
without obj =? T.fromJson(serialized), err:
return failure(err.toErr(GetFailedError))
without serialized =? await self.repo.metaDs.get(key), err:
return failure(err.toErr(AvailabilityGetFailedError))
without availability =? Availability.fromJson(serialized), err:
return failure(err.toErr(AvailabilityGetFailedError))
return success availability
return success obj
proc update(
self: Reservations,
availability: Availability): Future[?!void] {.async.} =
obj: SomeStorableObject): Future[?!void] {.async.} =
trace "updating availability", id = availability.id, size = availability.size,
used = availability.used
trace "updating " & $(obj.type), id = obj.id, size = obj.size
without key =? availability.key, err:
without key =? obj.key, err:
return failure(err)
if err =? (await self.repo.metaDs.put(
key,
@(($ %availability).toBytes))).errorOption:
return failure(err.toErr(AvailabilityUpdateFailedError))
@(obj.toJson.toBytes)
)).errorOption:
return failure(err.toErr(UpdateFailedError))
return success()
proc delete(
self: Reservations,
id: AvailabilityId): Future[?!void] {.async.} =
key: Key): Future[?!void] {.async.} =
trace "deleting availability", id
trace "deleting object", key
without availability =? (await self.get(id)), err:
return failure(err)
without key =? availability.key, err:
return failure(err)
if exists =? (await self.exists(key)) and not exists:
return success()
if err =? (await self.repo.metaDs.delete(key)).errorOption:
return failure(err.toErr(AvailabilityDeleteFailedError))
return failure(err.toErr(DeleteFailedError))
return success()
proc create*(
proc deleteReservation*(
self: Reservations,
availability: Availability): Future[?!void] {.async.} =
reservationId: ReservationId,
availabilityId: AvailabilityId): Future[?!void] {.async.} =
if exists =? (await self.exists(availability.id)) and exists:
let err = newException(AvailabilityAlreadyExistsError,
"Availability already exists")
trace "deleting reservation", reservationId, availabilityId
without key =? key(reservationId, availabilityId), err:
return failure(err)
without reservation =? (await self.get(key, Reservation)), error:
return failure(error)
if reservation.size > 0.u256:
# return remaining bytes to availability
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.size += reservation.size
if updateErr =? (await self.update(availability)).errorOption:
return failure(updateErr)
if err =? (await self.repo.metaDs.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError))
return success()
proc createAvailability*(
self: Reservations,
size: UInt256,
duration: UInt256,
minPrice: UInt256,
maxCollateral: UInt256): Future[?!Availability] {.async.} =
let availability = Availability.init(
size, duration, minPrice, maxCollateral
)
without key =? availability.key, err:
return failure(err)
if exists =? (await self.exists(key)) and exists:
let err = newException(AlreadyExistsError,
"Availability already exists")
return failure(err)
let bytes = availability.size.truncate(uint)
if reserveErr =? (await self.repo.reserve(bytes)).errorOption:
return failure(reserveErr.toErr(AvailabilityReserveFailedError))
return failure(reserveErr.toErr(ReserveFailedError))
if updateErr =? (await self.update(availability)).errorOption:
@ -241,52 +296,120 @@ proc create*(
return failure(updateErr)
if onAdded =? self.onAdded:
if onAvailabilityAdded =? self.onAvailabilityAdded:
try:
await onAdded(availability)
await onAvailabilityAdded(availability)
except CatchableError as e:
# we don't have any insight into types of errors that `onProcessSlot` can
# throw because it is caller-defined
warn "Unknown error during 'onAdded' callback",
warn "Unknown error during 'onAvailabilityAdded' callback",
availabilityId = availability.id, error = e.msg
return success()
return success(availability)
proc createReservation*(
self: Reservations,
availabilityId: AvailabilityId,
slotSize: UInt256,
slotId: SlotId
): Future[?!Reservation] {.async.} =
let reservation = Reservation.init(availabilityId, slotSize, slotId)
without key =? reservation.key, error:
return failure(error)
if exists =? (await self.exists(key)) and exists:
let err = newException(AlreadyExistsError,
"Reservation already exists")
return failure(err)
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
if availability.size < slotSize:
let error = newException(BytesOutOfBoundsError, "trying to reserve an " &
"amount of bytes that is greater than the total size of the Availability")
return failure(error)
if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr)
# reduce availability size by the slot size, which is now accounted for in
# the newly created Reservation
availability.size -= slotSize
# remove availabilities with no reserved bytes remaining
if availability.size == 0.u256:
without key =? availability.key, error:
return failure(error)
if err =? (await self.delete(key)).errorOption:
# rollbackRelease(err)
return failure(err)
# otherwise, update availability with reduced size
elif updateErr =? (await self.update(availability)).errorOption:
trace "rolling back reservation creation"
without key =? reservation.key, keyError:
keyError.parent = updateErr
return failure(keyError)
# rollback the reservation creation
if rollbackErr =? (await self.delete(key)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return success(reservation)
proc release*(
self: Reservations,
id: AvailabilityId,
reservationId: ReservationId,
availabilityId: AvailabilityId,
bytes: uint): Future[?!void] {.async.} =
trace "releasing bytes and updating availability", bytes, id
logScope:
topics = "release"
bytes
reservationId
availabilityId
without var availability =? (await self.get(id)), err:
trace "releasing bytes and updating reservation"
without key =? key(reservationId, availabilityId), err:
return failure(err)
without key =? id.key, err:
without var reservation =? (await self.get(key, Reservation)), err:
return failure(err)
if reservation.size < bytes.u256:
let error = newException(BytesOutOfBoundsError,
"trying to release an amount of bytes that is greater than the total " &
"size of the Reservation")
return failure(error)
if releaseErr =? (await self.repo.release(bytes)).errorOption:
return failure(releaseErr.toErr(AvailabilityReleaseFailedError))
return failure(releaseErr.toErr(ReleaseFailedError))
availability.size = (availability.size.truncate(uint) - bytes).u256
reservation.size -= bytes.u256
template rollbackRelease(e: ref CatchableError) =
# TODO: remove used up reservation after sales process is complete
# persist partially used Reservation with updated size
if err =? (await self.update(reservation)).errorOption:
# rollback release if an update error encountered
trace "rolling back release"
if rollbackErr =? (await self.repo.reserve(bytes)).errorOption:
rollbackErr.parent = e
rollbackErr.parent = err
return failure(rollbackErr)
# remove completely used availabilities
if availability.size == 0.u256:
if err =? (await self.delete(availability.id)).errorOption:
rollbackRelease(err)
return failure(err)
return success()
# persist partially used availability with updated size
if err =? (await self.update(availability)).errorOption:
rollbackRelease(err)
return failure(err)
return success()
@ -319,22 +442,22 @@ proc availabilities*(
iter.next = next
return success iter
proc unused*(r: Reservations): Future[?!seq[Availability]] {.async.} =
proc allAvailabilities*(r: Reservations): Future[?!seq[Availability]] {.async.} =
var ret: seq[Availability] = @[]
without availabilities =? (await r.availabilities), err:
return failure(err)
for a in availabilities:
if availability =? (await a) and not availability.used:
if availability =? (await a):
ret.add availability
return success(ret)
proc find*(
self: Reservations,
size, duration, minPrice, collateral: UInt256,
used: bool): Future[?Availability] {.async.} =
size, duration, minPrice, collateral: UInt256
): Future[?Availability] {.async.} =
without availabilities =? (await self.availabilities), err:
@ -344,14 +467,12 @@ proc find*(
for a in availabilities:
if availability =? (await a):
if used == availability.used and
size <= availability.size and
duration <= availability.duration and
collateral <= availability.maxCollateral and
minPrice >= availability.minPrice:
if size <= availability.size and
duration <= availability.duration and
collateral <= availability.maxCollateral and
minPrice >= availability.minPrice:
trace "availability matched",
used, availUsed = availability.used,
size, availsize = availability.size,
duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice,
@ -360,7 +481,6 @@ proc find*(
return some availability
trace "availiability did not match",
used, availUsed = availability.used,
size, availsize = availability.size,
duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice,

View File

@ -10,3 +10,4 @@ type
request*: ?StorageRequest
slotIndex*: UInt256
cancelled*: Future[void]
reservation*: ?Reservation

View File

@ -14,7 +14,6 @@ import ./errored
type
SaleDownloading* = ref object of ErrorHandlingState
availability*: Availability
logScope:
topics = "marketplace sales downloading"
@ -36,7 +35,6 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
let data = agent.data
let context = agent.context
let reservations = context.reservations
let availability = state.availability
without onStore =? context.onStore:
raiseAssert "onStore callback not set"
@ -47,9 +45,8 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
without slotIndex =? data.slotIndex:
raiseAssert("no slot index assigned")
# mark availability as used so that it is not matched to other requests
if markUsedErr =? (await reservations.markUsed(availability.id)).errorOption:
return some State(SaleErrored(error: markUsedErr))
without reservation =? data.reservation:
raiseAssert("no reservation")
proc onBatch(blocks: seq[bt.Block]) {.async.} =
# release batches of blocks as they are written to disk and
@ -59,25 +56,21 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
bytes += blk.data.len.uint
trace "Releasing batch of bytes written to disk", bytes
let r = await reservations.release(availability.id, bytes)
let r = await reservations.release(reservation.id,
reservation.availabilityId,
bytes)
# `tryGet` will raise the exception that occurred during release, if there
# was one. The exception will be caught in the closure and sent to the
# SaleErrored state.
r.tryGet()
template markUnused(id: AvailabilityId) =
if markUnusedErr =? (await reservations.markUnused(id)).errorOption:
return some State(SaleErrored(error: markUnusedErr))
trace "Starting download"
if err =? (await onStore(request,
slotIndex,
onBatch)).errorOption:
markUnused(availability.id)
return some State(SaleErrored(error: err))
trace "Download complete"
markUnused(availability.id)
return some State(SaleInitialProving())

View File

@ -10,6 +10,7 @@ import ./failed
import ./filled
import ./ignored
import ./downloading
import ./errored
type
SalePreparing* = ref object of ErrorHandlingState
@ -57,13 +58,20 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
request.ask.slotSize,
request.ask.duration,
request.ask.pricePerSlot,
request.ask.collateral,
used = false):
request.ask.collateral):
info "no availability found for request, ignoring",
slotSize = request.ask.slotSize,
duration = request.ask.duration,
pricePerSlot = request.ask.pricePerSlot,
used = false
pricePerSlot = request.ask.pricePerSlot
return some State(SaleIgnored())
return some State(SaleDownloading(availability: availability))
without reservation =? await reservations.createReservation(
availability.id,
request.ask.slotSize,
slotId
), error:
return some State(SaleErrored(error: error))
data.reservation = some reservation
return some State(SaleDownloading())

View File

@ -234,6 +234,13 @@ proc fromJson*[T: object](
let json = ?catch parseJson(string.fromBytes(bytes))
T.fromJson(json)
proc fromJson*[T: ref object](
_: type T,
bytes: seq[byte]
): ?!T =
let json = ?catch parseJson(string.fromBytes(bytes))
T.fromJson(json)
func `%`*(s: string): JsonNode = newJString(s)
func `%`*(n: uint): JsonNode =
@ -307,6 +314,9 @@ func `%`*[T: distinct](id: T): JsonNode =
type baseType = T.distinctBase
% baseType(id)
func toJson*(obj: object): string = $(%obj)
func toJson*(obj: ref object): string = $(%obj)
proc toJsnImpl(x: NimNode): NimNode =
case x.kind
of nnkBracket: # array

View File

@ -1,17 +0,0 @@
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/codex/sales/reservations
import ../helpers
export checktest
proc allAvailabilities*(r: Reservations): Future[seq[Availability]] {.async.} =
var ret: seq[Availability] = @[]
without availabilities =? (await r.availabilities), err:
raiseAssert "failed to get availabilities, error: " & err.msg
for a in availabilities:
if availability =? (await a):
ret.add availability
return ret

View File

@ -1,6 +1,5 @@
import pkg/questionable
import pkg/questionable/results
import pkg/chronos
import pkg/asynctest
import pkg/datastore
@ -10,7 +9,7 @@ import pkg/codex/sales
import pkg/codex/utils/json
import ../examples
import ./helpers
import ../helpers
asyncchecksuite "Reservations module":
var
@ -27,13 +26,23 @@ asyncchecksuite "Reservations module":
reservations = Reservations.new(repo)
availability = Availability.example
proc createAvailability(): Availability =
let example = Availability.example
let availability = waitFor reservations.createAvailability(
example.size,
example.duration,
example.minPrice,
example.maxCollateral
)
return availability.get
test "availability can be serialised and deserialised":
let availability = Availability.example
let serialised = %availability
check Availability.fromJson(serialised).get == availability
test "has no availability initially":
check (await reservations.allAvailabilities()).len == 0
check (await reservations.allAvailabilities()).get.len == 0
test "generates unique ids for storage availability":
let availability1 = Availability.init(1.u256, 2.u256, 3.u256, 4.u256)
@ -41,12 +50,12 @@ asyncchecksuite "Reservations module":
check availability1.id != availability2.id
test "can reserve available storage":
let availability1 = Availability.example
let availability2 = Availability.example
check isOk await reservations.create(availability1)
check isOk await reservations.create(availability2)
let availability1 = createAvailability()
let availability2 = createAvailability()
check availability1.id != AvailabilityId.default
check availability2.id != AvailabilityId.default
let availabilities = await reservations.allAvailabilities()
let availabilities = (await reservations.allAvailabilities()).get
check:
# perform unordered checks
availabilities.len == 2
@ -54,9 +63,9 @@ asyncchecksuite "Reservations module":
availabilities.contains(availability2)
test "reserved availability exists":
check isOk await reservations.create(availability)
let availability = createAvailability()
without exists =? await reservations.exists(availability.id):
without exists =? await reservations.exists(availability.key.get):
fail()
check exists

View File

@ -211,7 +211,7 @@ checksuite "json serialization":
},
"expiry": "1691545330"
}""".flatten
check $(%request) == expected
check request.toJson == expected
test "deserializes UInt256 from non-hex string representation":
let json = newJString("100000")