Add Reservation object, rename reserve > create

This commit is contained in:
Eric 2023-08-22 12:44:10 +10:00
parent a24efbdf56
commit e925927278
No known key found for this signature in database
4 changed files with 104 additions and 111 deletions

View File

@ -374,7 +374,7 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter =
if not reservations.hasAvailable(availability.size.truncate(uint)):
return RestApiResponse.error(Http422, "Not enough storage quota")
if err =? (await reservations.reserve(availability)).errorOption:
if err =? (await reservations.create(availability)).errorOption:
return RestApiResponse.error(Http500, err.msg)
let json = %availability

View File

@ -6,26 +6,38 @@
## at your option.
## This file may not be copied, modified, or distributed except according to
## those terms.
##
## +--------------------------------------+
## | RESERVATION |
## +--------------------------------------+ |--------------------------------------|
## | AVAILABILITY | | ReservationId | id | PK |
## |--------------------------------------| |--------------------------------------|
## | AvailabilityId | id | PK |<-||-------o<-| AvailabilityId | availabilityId | FK |
## |--------------------------------------| |--------------------------------------|
## | UInt256 | size | | | UInt256 | size | |
## |--------------------------------------| |--------------------------------------|
## | UInt256 | duration | | | SlotId | slotId | |
## |--------------------------------------| +--------------------------------------+
## | UInt256 | minPrice | |
## |--------------------------------------|
## | UInt256 | maxCollateral | |
## +--------------------------------------+
import pkg/upraises
push: {.upraises: [].}
import std/typetraits
import pkg/chronos
import pkg/chronicles
import pkg/upraises
import pkg/json_serialization
import pkg/json_serialization/std/options
import pkg/stint
import pkg/stew/byteutils
import pkg/datastore
import pkg/nimcrypto
import pkg/questionable
import pkg/questionable/results
import ../utils/json
push: {.upraises: [].}
import pkg/datastore
import pkg/stint
import pkg/stew/byteutils
import ../stores
import ../contracts/requests
import ../utils/json
export requests
@ -34,13 +46,19 @@ logScope:
type
AvailabilityId* = distinct array[32, byte]
ReservationId* = distinct array[32, byte]
Availability* = object
id* {.serialize.}: AvailabilityId
size* {.serialize.}: UInt256
duration* {.serialize.}: UInt256
minPrice* {.serialize.}: UInt256
maxCollateral* {.serialize.}: UInt256
used*: bool
# used*: bool
Reservation* = object
id* {.serialize.}: ReservationId
availabilityId* {.serialize.}: AvailabilityId
size* {.serialize.}: UInt256
slotId* {.serialize.}: SlotId
Reservations* = ref object
repo: RepoStore
onAdded: ?OnAvailabilityAdded
@ -78,10 +96,16 @@ proc init*(
doAssert randomBytes(id) == 32
Availability(id: AvailabilityId(id), size: size, duration: duration, minPrice: minPrice, maxCollateral: maxCollateral)
func toArray*(id: AvailabilityId): array[32, byte] =
func toArray(id: AvailabilityId | ReservationId): array[32, byte] =
array[32, byte](id)
proc `==`*(x, y: AvailabilityId): bool {.borrow.}
proc `==`*(x, y: ReservationId): bool {.borrow.}
proc `==`*(x, y: Reservation): bool =
x.id == y.id and
x.availabilityId == y.availabilityId and
x.size == y.size and
x.slotId == y.slotId
proc `==`*(x, y: Availability): bool =
x.id == y.id and
x.size == y.size and
@ -89,7 +113,7 @@ proc `==`*(x, y: Availability): bool =
x.maxCollateral == y.maxCollateral and
x.minPrice == y.minPrice
proc `$`*(id: AvailabilityId): string = id.toArray.toHex
proc `$`*(id: AvailabilityId | ReservationId): string = id.toArray.toHex
proc toErr[E1: ref CatchableError, E2: AvailabilityError](
e1: E1,
@ -100,32 +124,25 @@ proc toErr[E1: ref CatchableError, E2: AvailabilityError](
proc writeValue*(
writer: var JsonWriter,
value: AvailabilityId) {.upraises:[IOError].} =
value: AvailabilityId | ReservationId) {.upraises:[IOError].} =
## used for chronicles' logs
mixin writeValue
writer.writeValue value.toArray
proc readValue*[T: AvailabilityId](
reader: var JsonReader,
value: var T) {.upraises: [SerializationError, IOError].} =
mixin readValue
value = T reader.readValue(T.distinctBase)
writer.writeValue %value
proc `onAdded=`*(self: Reservations,
onAdded: OnAvailabilityAdded) =
self.onAdded = some onAdded
proc `onMarkUnused=`*(
self: Reservations,
onMarkUnused: OnAvailabilityAdded
) =
self.onMarkUnused = some onMarkUnused
func key(id: AvailabilityId): ?!Key =
(ReservationsKey / id.toArray.toHex)
## sales / reservations / <availabilityId>
(ReservationsKey / $id)
func key*(availability: Availability): ?!Key =
func key(reservationId: ReservationId, availabilityId: AvailabilityId): ?!Key =
## sales / reservations / <availabilityId> / <reservationId>
(availabilityId.key / $reservationId)
func key*(availability: Availability | Reservation): ?!Key =
return availability.id.key
func available*(self: Reservations): uint = self.repo.available
@ -158,7 +175,7 @@ proc get*(
without serialized =? await self.repo.metaDs.get(key), err:
return failure(err.toErr(AvailabilityGetFailedError))
without availability =? Json.decode(serialized, Availability).catch, err:
without availability =? Availability.fromJson(serialized), err:
return failure(err.toErr(AvailabilityGetFailedError))
return success availability
@ -175,7 +192,7 @@ proc update(
if err =? (await self.repo.metaDs.put(
key,
@(availability.toJson.toBytes))).errorOption:
@(($ %availability).toBytes))).errorOption:
return failure(err.toErr(AvailabilityUpdateFailedError))
return success()
@ -197,7 +214,7 @@ proc delete(
return success()
proc reserve*(
proc create*(
self: Reservations,
availability: Availability): Future[?!void] {.async.} =
@ -274,40 +291,6 @@ proc release*(
return success()
proc markUsed*(
self: Reservations,
id: AvailabilityId): Future[?!void] {.async.} =
without var availability =? (await self.get(id)), err:
return failure(err)
availability.used = true
let r = await self.update(availability)
if r.isOk:
trace "availability marked used", id = id.toArray.toHex
return r
proc markUnused*(
self: Reservations,
id: AvailabilityId): Future[?!void] {.async.} =
without var availability =? (await self.get(id)), err:
return failure(err)
availability.used = false
let r = await self.update(availability)
if r.isOk:
trace "availability marked unused", id = id.toArray.toHex
if onMarkedUnused =? self.onMarkUnused:
try:
await onMarkedUnused(availability)
except CatchableError as e:
warn "Unknown error during 'onMarkedUnused' callback",
availabilityId = availability.id, error = e.msg
return r
iterator items*(self: AvailabilityIter): Future[?Availability] =
while not self.finished:
yield self.next()
@ -329,7 +312,7 @@ proc availabilities*(
serialized =? r.data and
serialized.len > 0:
return some Json.decode(string.fromBytes(serialized), Availability)
return Availability.fromJson(serialized).option
return none Availability

View File

@ -4,11 +4,10 @@ import pkg/questionable/results
import pkg/chronos
import pkg/asynctest
import pkg/datastore
import pkg/json_serialization
import pkg/json_serialization/std/options
import pkg/codex/stores
import pkg/codex/sales
import pkg/codex/utils/json
import ../examples
import ./helpers
@ -30,8 +29,8 @@ asyncchecksuite "Reservations module":
test "availability can be serialised and deserialised":
let availability = Availability.example
let serialised = availability.toJson
check Json.decode(serialised, Availability) == availability
let serialised = %availability
check Availability.fromJson(serialised).get == availability
test "has no availability initially":
check (await reservations.allAvailabilities()).len == 0
@ -44,8 +43,8 @@ asyncchecksuite "Reservations module":
test "can reserve available storage":
let availability1 = Availability.example
let availability2 = Availability.example
check isOk await reservations.reserve(availability1)
check isOk await reservations.reserve(availability2)
check isOk await reservations.create(availability1)
check isOk await reservations.create(availability2)
let availabilities = await reservations.allAvailabilities()
check:
@ -55,7 +54,7 @@ asyncchecksuite "Reservations module":
availabilities.contains(availability2)
test "reserved availability exists":
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
without exists =? await reservations.exists(availability.id):
fail()
@ -64,7 +63,7 @@ asyncchecksuite "Reservations module":
test "reserved availability can be partially released":
let size = availability.size.truncate(uint)
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
check isOk await reservations.release(availability.id, size - 1)
without a =? await reservations.get(availability.id):
@ -74,7 +73,7 @@ asyncchecksuite "Reservations module":
test "availability is deleted after being fully released":
let size = availability.size.truncate(uint)
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
check isOk await reservations.release(availability.id, size)
without exists =? await reservations.exists(availability.id):
@ -89,7 +88,7 @@ asyncchecksuite "Reservations module":
check r.error.msg == "Availability does not exist"
test "added availability is not used initially":
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
without available =? await reservations.get(availability.id):
fail()
@ -97,7 +96,7 @@ asyncchecksuite "Reservations module":
check not available.used
test "availability can be marked used":
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
check isOk await reservations.markUsed(availability.id)
@ -107,7 +106,7 @@ asyncchecksuite "Reservations module":
check available.used
test "availability can be marked unused":
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
check isOk await reservations.markUsed(availability.id)
check isOk await reservations.markUnused(availability.id)
@ -122,7 +121,7 @@ asyncchecksuite "Reservations module":
reservations.onMarkUnused = proc(a: Availability) {.async.} =
markedUnused = a
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
check isOk await reservations.markUnused(availability.id)
check markedUnused == availability
@ -132,12 +131,12 @@ asyncchecksuite "Reservations module":
reservations.onAdded = proc(a: Availability) {.async.} =
added = a
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
check added == availability
test "used availability can be found":
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
check isOk await reservations.markUsed(availability.id)
@ -147,7 +146,7 @@ asyncchecksuite "Reservations module":
fail()
test "unused availability can be found":
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
without available =? await reservations.find(availability.size,
availability.duration, availability.minPrice, availability.maxCollateral, used = false):
@ -164,15 +163,15 @@ asyncchecksuite "Reservations module":
check r.error.msg == "Availability does not exist"
test "same availability cannot be reserved twice":
check isOk await reservations.reserve(availability)
let r = await reservations.reserve(availability)
check isOk await reservations.create(availability)
let r = await reservations.create(availability)
check r.error of AvailabilityAlreadyExistsError
test "can get available bytes in repo":
check reservations.available == DefaultQuotaBytes
test "reserving availability reduces available bytes":
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
check reservations.available ==
DefaultQuotaBytes - availability.size.truncate(uint)
@ -189,7 +188,7 @@ asyncchecksuite "Reservations module":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = availability.size.truncate(uint) - 1)
reservations = Reservations.new(repo)
let r = await reservations.reserve(availability)
let r = await reservations.create(availability)
check r.error of AvailabilityReserveFailedError
check r.error.parent of QuotaNotEnoughError
check exists =? (await reservations.exists(availability.id)) and not exists
@ -199,7 +198,7 @@ asyncchecksuite "Reservations module":
repo = RepoStore.new(repoDs, metaDs,
quotaMaxBytes = size)
reservations = Reservations.new(repo)
discard await reservations.reserve(availability)
discard await reservations.create(availability)
let r = await reservations.release(availability.id, size + 1)
check r.error of AvailabilityReleaseFailedError
check r.error.parent.msg == "Cannot release this many bytes"

View File

@ -201,7 +201,7 @@ asyncchecksuite "Sales":
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
itemsProcessed.add item
done.complete()
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
let items = SlotQueueItem.init(request)
check eventually items.allIt(itemsProcessed.contains(it))
@ -232,7 +232,7 @@ asyncchecksuite "Sales":
itemsProcessed.add item
done.complete()
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
market.requested.add request # "contract" must be able to return request
market.emitSlotFreed(request.id, 2.u256)
@ -261,7 +261,7 @@ asyncchecksuite "Sales":
used = avail.used
return success()
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check eventually used
@ -274,43 +274,43 @@ asyncchecksuite "Sales":
onBatch: BatchProc): Future[?!void] {.async.} =
await onBatch(@[blk])
return success()
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check eventually getAvailability().?size == success 1.u256
test "ignores download when duration not long enough":
availability.duration = request.ask.duration - 1
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check getAvailability().?size == success availability.size
test "ignores request when slot size is too small":
availability.size = request.ask.slotSize - 1
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check getAvailability().?size == success availability.size
test "ignores request when reward is too low":
availability.minPrice = request.ask.pricePerSlot + 1
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check getAvailability().?size == success availability.size
test "availability remains unused when request is ignored":
availability.minPrice = request.ask.pricePerSlot + 1
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check getAvailability().?used == success false
test "ignores request when asked collateral is too high":
var tooBigCollateral = request
tooBigCollateral.ask.collateral = availability.maxCollateral + 1
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(tooBigCollateral)
check getAvailability().?size == success availability.size
test "ignores request when slot state is not free":
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
market.slotState[request.slotId(0.u256)] = SlotState.Filled
market.slotState[request.slotId(1.u256)] = SlotState.Filled
@ -327,7 +327,7 @@ asyncchecksuite "Sales":
storingRequest = request
storingSlot = slot
return success()
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check eventually storingRequest == request
check storingSlot < request.ask.slots.u256
@ -342,7 +342,7 @@ asyncchecksuite "Sales":
sales.onClear = proc(request: StorageRequest,
idx: UInt256) =
saleFailed = true
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check eventually saleFailed
@ -352,7 +352,7 @@ asyncchecksuite "Sales":
slot: UInt256,
onBatch: BatchProc): Future[?!void] {.async.} =
return failure(error)
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check eventually getAvailability().?used == success false
check getAvailability().?size == success availability.size
@ -363,13 +363,13 @@ asyncchecksuite "Sales":
sales.onProve = proc(slot: Slot): Future[seq[byte]] {.async.} =
provingRequest = slot.request
provingSlot = slot.slotIndex
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check eventually provingRequest == request
check provingSlot < request.ask.slots.u256
test "fills a slot":
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check eventually market.filled.len > 0
check market.filled[0].requestId == request.id
@ -387,7 +387,7 @@ asyncchecksuite "Sales":
soldAvailability = a
soldRequest = request
soldSlotIndex = slotIndex
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check eventually soldAvailability == availability
check soldRequest == request
@ -404,7 +404,7 @@ asyncchecksuite "Sales":
slotIndex: UInt256) =
clearedRequest = request
clearedSlotIndex = slotIndex
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
check eventually clearedRequest == request
check clearedSlotIndex < request.ask.slots.u256
@ -416,7 +416,7 @@ asyncchecksuite "Sales":
onBatch: BatchProc): Future[?!void] {.async.} =
await sleepAsync(chronos.hours(1))
return success()
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
for slotIndex in 0..<request.ask.slots:
market.fillSlot(request.id, slotIndex.u256, proof, otherHost)
@ -428,10 +428,21 @@ asyncchecksuite "Sales":
onBatch: BatchProc): Future[?!void] {.async.} =
await sleepAsync(chronos.hours(1))
return success()
check isOk await reservations.reserve(availability)
check isOk await reservations.create(availability)
await market.requestStorage(request)
clock.set(request.expiry.truncate(int64))
check eventually (await reservations.allAvailabilities) == @[availability]
check eventuallyCheck (await reservations.allAvailabilities) == @[availability]
test "adds proving for slot when slot is filled":
var soldSlotIndex: UInt256
sales.onSale = proc(request: StorageRequest,
slotIndex: UInt256) =
soldSlotIndex = slotIndex
check proving.slots.len == 0
check isOk await reservations.create(availability)
await market.requestStorage(request)
check eventuallyCheck proving.slots.len == 1
check proving.slots.contains(Slot(request: request, slotIndex: soldSlotIndex))
test "loads active slots from market":
let me = await market.getSigner()