Move until validation to reservations module

This commit is contained in:
Arnaud 2025-02-21 17:03:09 +01:00
parent c62a023191
commit ada1a6f865
No known key found for this signature in database
GPG Key ID: 69D6CE281FCAE663
9 changed files with 215 additions and 146 deletions

View File

@ -49,32 +49,6 @@ logScope:
declareCounter(codex_api_uploads, "codex API uploads")
declareCounter(codex_api_downloads, "codex API downloads")
proc getLongestRequestEnd(
node: CodexNodeRef, availabilityId: AvailabilityId
): Future[?!SecondsSince1970] {.async: (raises: []).} =
without contracts =? node.contracts.host:
return failure("Sales unavailable")
let
reservations = contracts.sales.context.reservations
market = contracts.sales.context.market
try:
without allReservations =? await reservations.all(Reservation, availabilityId):
return failure("Cannot retrieve the reservations")
let requestEnds = allReservations.mapIt(await market.getRequestEnd(it.requestId))
if len(requestEnds) == 0:
return success(0.SecondsSince1970)
return success(requestEnds.max)
except CancelledError as err:
raise err
except CatchableError as err:
error "Error when trying to get longest request end", error = err.msg
return failure("Cannot retrieve the request dates: " & err.msg)
proc validate(pattern: string, value: string): int {.gcsafe, raises: [Defect].} =
0
@ -547,6 +521,7 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
## tokens) to be matched against the request's pricePerBytePerSecond
## totalCollateral - total collateral (in amount of
## tokens) that can be distributed among matching requests
try:
without contracts =? node.contracts.host:
return RestApiResponse.error(Http503, "Persistence is not enabled")
@ -595,29 +570,20 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
availability.totalCollateral = totalCollateral
if until =? restAv.until:
if until < 0:
return RestApiResponse.error(
Http400, "Until parameter must be greater or equal 0. Got: " & $until
)
without longestRequestEnd =? (await node.getLongestRequestEnd(id)).catch, err:
return RestApiResponse.error(Http500, err.msg)
if until > 0 and until < longestRequestEnd.get:
return RestApiResponse.error(
Http400,
"Until parameter must be greater or equal the current longest request.",
)
availability.until = until
if enabled =? restAv.enabled:
availability.enabled = enabled
if err =? (await reservations.update(availability)).errorOption:
return RestApiResponse.error(Http500, err.msg)
if err of CancelledError:
raise err
if err of UntilOutOfBoundsError:
return RestApiResponse.error(Http422, err.msg)
else:
return RestApiResponse.error(Http500, err.msg)
return RestApiResponse.response(Http200)
return RestApiResponse.response(Http204)
except CatchableError as exc:
trace "Excepting processing request", exc = exc.msg
return RestApiResponse.error(Http500)

View File

@ -84,6 +84,7 @@ type
size* {.serialize.}: UInt256
requestId* {.serialize.}: RequestId
slotIndex* {.serialize.}: UInt256
validUntil* {.serialize.}: SecondsSince1970
Reservations* = ref object of RootObj
availabilityLock: AsyncLock
@ -110,13 +111,20 @@ type
SerializationError* = object of ReservationsError
UpdateFailedError* = object of ReservationsError
BytesOutOfBoundsError* = object of ReservationsError
UntilOutOfBoundsError* = object of ReservationsError
const
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
ReservationsKey = (SalesKey / "reservations").tryGet
proc hash*(x: AvailabilityId): Hash {.borrow.}
proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.}
proc all*(
self: Reservations, T: type SomeStorableObject
): Future[?!seq[T]] {.async: (raises: [CancelledError]).}
proc all*(
self: Reservations, T: type SomeStorableObject, availabilityId: AvailabilityId
): Future[?!seq[T]] {.async: (raises: [CancelledError]).}
template withLock(lock, body) =
try:
@ -166,6 +174,7 @@ proc init*(
size: UInt256,
requestId: RequestId,
slotIndex: UInt256,
validUntil: SecondsSince1970,
): Reservation =
var id: array[32, byte]
doAssert randomBytes(id) == 32
@ -175,6 +184,7 @@ proc init*(
size: size,
requestId: requestId,
slotIndex: slotIndex,
validUntil: validUntil,
)
func toArray(id: SomeStorableId): array[32, byte] =
@ -233,6 +243,10 @@ proc exists*(self: Reservations, key: Key): Future[bool] {.async.} =
let exists = await self.repo.metaDs.ds.contains(key)
return exists
iterator items(self: StorableIter): Future[?seq[byte]] =
while not self.finished:
yield self.next()
proc getImpl(self: Reservations, key: Key): Future[?!seq[byte]] {.async.} =
if not await self.exists(key):
let err =
@ -268,57 +282,91 @@ proc updateImpl(self: Reservations, obj: SomeStorableObject): Future[?!void] {.a
proc updateAvailability(
self: Reservations, obj: Availability
): Future[?!void] {.async.} =
): Future[?!void] {.async: (raises: [CancelledError]).} =
logScope:
availabilityId = obj.id
if obj.until < 0:
let error =
newException(UntilOutOfBoundsError, "Cannot set until to a negative value")
return failure(error)
without key =? obj.key, error:
return failure(error)
without oldAvailability =? await self.get(key, Availability), err:
if err of NotExistsError:
trace "Creating new Availability"
let res = await self.updateImpl(obj)
# inform subscribers that Availability has been added
if obj.enabled and onAvailabilityAdded =? self.onAvailabilityAdded:
try:
without oldAvailability =? await self.get(key, Availability), err:
if err of NotExistsError:
trace "Creating new Availability"
let res = await self.updateImpl(obj)
# inform subscribers that Availability has been added
if onAvailabilityAdded =? self.onAvailabilityAdded:
await onAvailabilityAdded(obj)
return res
else:
return failure(err)
if obj.until > 0:
without allReservations =? await self.all(Reservation, obj.id):
let error = newException(
GetFailedError,
"Until parameter must be greater or equal the current longest request",
)
return failure(error)
let requestEnds = allReservations.mapIt(it.validUntil)
if requestEnds.len > 0 and requestEnds.max > obj.until:
let error = newException(
UntilOutOfBoundsError,
"Until parameter must be greater or equal the current longest request",
)
return failure(error)
# Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize:
trace "totalSize changed, updating repo reservation"
if oldAvailability.totalSize < obj.totalSize: # storage added
if reserveErr =? (
await self.repo.reserve(
(obj.totalSize - oldAvailability.totalSize).truncate(uint).NBytes
)
).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
elif oldAvailability.totalSize > obj.totalSize: # storage removed
if reserveErr =? (
await self.repo.release(
(oldAvailability.totalSize - obj.totalSize).truncate(uint).NBytes
)
).errorOption:
return failure(reserveErr.toErr(ReleaseFailedError))
let res = await self.updateImpl(obj)
if oldAvailability.freeSize < obj.freeSize: # availability added
# inform subscribers that Availability has been modified (with increased
# size)
if onAvailabilityAdded =? self.onAvailabilityAdded:
await onAvailabilityAdded(obj)
return res
else:
return failure(err)
# Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize:
trace "totalSize changed, updating repo reservation"
if oldAvailability.totalSize < obj.totalSize: # storage added
if reserveErr =? (
await self.repo.reserve(
(obj.totalSize - oldAvailability.totalSize).truncate(uint).NBytes
)
).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
elif oldAvailability.totalSize > obj.totalSize: # storage removed
if reserveErr =? (
await self.repo.release(
(oldAvailability.totalSize - obj.totalSize).truncate(uint).NBytes
)
).errorOption:
return failure(reserveErr.toErr(ReleaseFailedError))
let res = await self.updateImpl(obj)
if obj.enabled and oldAvailability.freeSize < obj.freeSize: # availability added
# inform subscribers that Availability has been modified (with increased
# size)
if onAvailabilityAdded =? self.onAvailabilityAdded:
await onAvailabilityAdded(obj)
return res
return res
except CancelledError as e:
raise e
except CatchableError as e:
error "Error when trying to update availability", error = e.msg
return failure(e)
proc update*(self: Reservations, obj: Reservation): Future[?!void] {.async.} =
return await self.updateImpl(obj)
proc update*(self: Reservations, obj: Availability): Future[?!void] {.async.} =
withLock(self.availabilityLock):
return await self.updateAvailability(obj)
proc update*(
self: Reservations, obj: Availability
): Future[?!void] {.async: (raises: [CancelledError]).} =
try:
withLock(self.availabilityLock):
return await self.updateAvailability(obj)
except AsyncLockError as e:
error "Lock error when trying to update the availability", err = e.msg
return failure(e)
proc delete(self: Reservations, key: Key): Future[?!void] {.async.} =
trace "deleting object", key
@ -390,6 +438,11 @@ proc createAvailability*(
trace "creating availability",
size, duration, minPricePerBytePerSecond, totalCollateral, enabled, until
if until < 0:
let error =
newException(UntilOutOfBoundsError, "Cannot set until to a negative value")
return failure(error)
let availability = Availability.init(
size, size, duration, minPricePerBytePerSecond, totalCollateral, enabled, until
)
@ -416,6 +469,7 @@ method createReservation*(
requestId: RequestId,
slotIndex: UInt256,
collateralPerByte: UInt256,
duration: UInt256,
): Future[?!Reservation] {.async, base.} =
withLock(self.availabilityLock):
without availabilityKey =? availabilityId.key, error:
@ -432,9 +486,13 @@ method createReservation*(
)
return failure(error)
trace "Creating reservation", availabilityId, slotSize, requestId, slotIndex
let validUntil =
times.now().utc().toTime().toUnix() + duration.truncate(SecondsSince1970)
trace "Creating reservation",
availabilityId, slotSize, requestId, slotIndex, validUntil = validUntil
let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
let reservation =
Reservation.init(availabilityId, slotSize, requestId, slotIndex, validUntil)
if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr)
@ -563,13 +621,9 @@ proc release*(
return success()
iterator items(self: StorableIter): Future[?seq[byte]] =
while not self.finished:
yield self.next()
proc storables(
self: Reservations, T: type SomeStorableObject, queryKey: Key = ReservationsKey
): Future[?!StorableIter] {.async.} =
): Future[?!StorableIter] {.async: (raises: [CancelledError]).} =
var iter = StorableIter()
let query = Query.init(queryKey)
when T is Availability:
@ -583,54 +637,68 @@ proc storables(
else:
raiseAssert "unknown type"
without results =? await self.repo.metaDs.ds.query(query), error:
return failure(error)
try:
without results =? await self.repo.metaDs.ds.query(query), error:
return failure(error)
# /sales/reservations
proc next(): Future[?seq[byte]] {.async.} =
await idleAsync()
iter.finished = results.finished
if not results.finished and res =? (await results.next()) and res.data.len > 0 and
key =? res.key and key.namespaces.len == defaultKey.namespaces.len:
return some res.data
# /sales/reservations
proc next(): Future[?seq[byte]] {.async.} =
await idleAsync()
iter.finished = results.finished
if not results.finished and res =? (await results.next()) and res.data.len > 0 and
key =? res.key and key.namespaces.len == defaultKey.namespaces.len:
return some res.data
return none seq[byte]
return none seq[byte]
proc dispose(): Future[?!void] {.async.} =
return await results.dispose()
proc dispose(): Future[?!void] {.async.} =
return await results.dispose()
iter.next = next
iter.dispose = dispose
return success iter
iter.next = next
iter.dispose = dispose
return success iter
except CancelledError as e:
raise e
except CatchableError as e:
error "Cannot retrieve the storables from the datastore", error = e.msg
return failure(e)
proc allImpl(
self: Reservations, T: type SomeStorableObject, queryKey: Key = ReservationsKey
): Future[?!seq[T]] {.async.} =
): Future[?!seq[T]] {.async: (raises: [CancelledError]).} =
var ret: seq[T] = @[]
without storables =? (await self.storables(T, queryKey)), error:
return failure(error)
for storable in storables.items:
without bytes =? (await storable):
continue
try:
for storable in storables.items:
without bytes =? (await storable):
continue
without obj =? T.fromJson(bytes), error:
error "json deserialization error",
json = string.fromBytes(bytes), error = error.msg
continue
without obj =? T.fromJson(bytes), error:
error "json deserialization error",
json = string.fromBytes(bytes), error = error.msg
continue
ret.add obj
ret.add obj
except CancelledError as e:
raise e
except CatchableError as e:
error "error when retrieving storable", error = e.msg
return failure(e)
return success(ret)
proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} =
proc all*(
self: Reservations, T: type SomeStorableObject
): Future[?!seq[T]] {.async: (raises: [CancelledError]).} =
return await self.allImpl(T)
proc all*(
self: Reservations, T: type SomeStorableObject, availabilityId: AvailabilityId
): Future[?!seq[T]] {.async.} =
without key =? (ReservationsKey / $availabilityId):
): Future[?!seq[T]] {.async: (raises: [CancelledError]).} =
without key =? key(availabilityId):
return failure("no key")
return await self.allImpl(T, key)

View File

@ -79,7 +79,7 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
without reservation =?
await reservations.createReservation(
availability.id, request.ask.slotSize, request.id, data.slotIndex,
request.ask.collateralPerByte,
request.ask.collateralPerByte, request.ask.duration,
), error:
trace "Creation of reservation failed"
# Race condition:

View File

@ -28,6 +28,7 @@ method createReservation*(
requestId: RequestId,
slotIndex: UInt256,
collateralPerByte: UInt256,
duration: UInt256,
): Future[?!Reservation] {.async.} =
if self.createReservationThrowBytesOutOfBoundsError:
let error = newException(
@ -45,4 +46,5 @@ method createReservation*(
requestId,
slotIndex,
collateralPerByte,
duration,
)

View File

@ -53,7 +53,7 @@ asyncchecksuite "Reservations module":
proc createReservation(availability: Availability): Reservation =
let size = rand(1 ..< availability.freeSize.truncate(int))
let reservation = waitFor reservations.createReservation(
availability.id, size.u256, RequestId.example, UInt256.example, 1.u256
availability.id, size.u256, RequestId.example, UInt256.example, 1.u256, 30.u256
)
return reservation.get
@ -135,7 +135,8 @@ asyncchecksuite "Reservations module":
test "cannot create reservation with non-existant availability":
let availability = Availability.example
let created = await reservations.createReservation(
availability.id, UInt256.example, RequestId.example, UInt256.example, 1.u256
availability.id, UInt256.example, RequestId.example, UInt256.example, 1.u256,
30.u256,
)
check created.isErr
check created.error of NotExistsError
@ -148,6 +149,7 @@ asyncchecksuite "Reservations module":
RequestId.example,
UInt256.example,
UInt256.example,
UInt256.example,
)
check created.isErr
check created.error of BytesOutOfBoundsError
@ -161,11 +163,12 @@ asyncchecksuite "Reservations module":
RequestId.example,
UInt256.example,
UInt256.example,
UInt256.example,
)
let two = reservations.createReservation(
availability.id, availability.totalSize, RequestId.example, UInt256.example,
UInt256.example,
UInt256.example, UInt256.example,
)
let oneResult = await one
@ -280,6 +283,33 @@ asyncchecksuite "Reservations module":
check availability.enabled == false
check availability.until == until
test "create an availability fails when trying set until with a negative value":
let totalSize = rand(100000 .. 200000).u256
let example = Availability.example(collateralPerByte)
let totalCollateral = totalSize * collateralPerByte
let result = await reservations.createAvailability(
totalSize,
example.duration,
example.minPricePerBytePerSecond,
totalCollateral,
enabled = true,
until = -1.SecondsSince1970,
)
check result.isErr
check result.error of UntilOutOfBoundsError
test "update an availability fails when trying set until with a negative value":
let until = getTime().toUnix()
let availability = createAvailability(until = until)
availability.until = -1
let result = await reservations.update(availability)
check result.isErr
check result.error of UntilOutOfBoundsError
test "reservation can be partially released":
let availability = createAvailability()
let reservation = createReservation(availability)

View File

@ -657,9 +657,22 @@ asyncchecksuite "Sales":
test "deletes inactive reservations on load":
createAvailability()
discard await reservations.createReservation(
availability.id, 100.u256, RequestId.example, UInt256.example, UInt256.example
availability.id, 100.u256, RequestId.example, UInt256.example, UInt256.example,
UInt256.example,
)
check (await reservations.all(Reservation)).get.len == 1
await sales.load()
check (await reservations.all(Reservation)).get.len == 0
check getAvailability().freeSize == availability.freeSize # was restored
test "update an availability fails when trying change the until date before an existing reservation":
let until = getTime().toUnix() + 300.SecondsSince1970
createAvailability(until = until)
await market.requestStorage(request)
await allowRequestToStart()
availability.until = getTime().toUnix()
let result = await reservations.update(availability)
check result.isErr
check result.error of UntilOutOfBoundsError

View File

@ -278,7 +278,8 @@ proc patchAvailability*(
enabled = enabled,
until = until,
)
doAssert response.status == "200 OK", "expected 200 OK, got " & response.status
doAssert response.status == "204 No Content",
"expected 200 OK, got " & response.status
proc getAvailabilities*(client: CodexClient): ?!seq[Availability] =
## Call sales availability REST endpoint

View File

@ -161,16 +161,15 @@ marketplacesuite "Marketplace":
check purchase.error == none string
let unixNow = getTime().toUnix()
let until = unixNow + 1
let until = unixNow + 1.SecondsSince1970
let response = host.patchAvailabilityRaw(
availabilityId = availability.id, until = cast[SecondsSince1970](until).some
)
let response =
host.patchAvailabilityRaw(availabilityId = availability.id, until = until.some)
check:
response.status == "400 Bad Request"
response.status == "422 Unprocessable Entity"
response.body ==
"Until parameter must be greater or equal the current longest request."
"Until parameter must be greater or equal the current longest request"
marketplacesuite "Marketplace payouts":
const minPricePerBytePerSecond = 1.u256

View File

@ -18,7 +18,11 @@ proc findItem[T](items: seq[T], item: T): ?!T =
multinodesuite "Sales":
let salesConfig = NodeConfigs(
clients: CodexConfigs.init(nodes = 1).some,
providers: CodexConfigs.init(nodes = 1).some,
providers: CodexConfigs.init(nodes = 1)
# .debug() # uncomment to enable console log output
# .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
# .withLogTopics("node", "marketplace", "sales", "reservations", "node", "proving", "clock")
.some,
)
var host: CodexClient
@ -96,6 +100,8 @@ multinodesuite "Sales":
until = until.some,
)
host.restart()
let updatedAvailability = (host.getAvailabilities().get).findItem(availability).get
check updatedAvailability.duration == 100
check updatedAvailability.minPricePerBytePerSecond == 2
@ -172,19 +178,3 @@ multinodesuite "Sales":
(host.getAvailabilities().get).findItem(availability).get
check newUpdatedAvailability.totalSize == originalSize + 20000
check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000
test "updating availability fails with until negative", salesConfig:
let availability = host.postAvailability(
totalSize = 140000.u256,
duration = 200.u256,
minPricePerBytePerSecond = 3.u256,
totalCollateral = 300.u256,
).get
let response = host.patchAvailabilityRaw(
availability.id, until = cast[SecondsSince1970](-1).some
)
check:
response.status == "400 Bad Request"
response.body == "Until parameter must be greater or equal 0. Got: -1"