chore: wip

This commit is contained in:
Adam Uhlíř 2024-04-05 16:19:33 +03:00
parent de1714ed06
commit 8a0f6f119b
No known key found for this signature in database
GPG Key ID: 1D17A9E81F76155B
8 changed files with 95 additions and 49 deletions

View File

@ -46,6 +46,27 @@ logScope:
declareCounter(codex_api_uploads, "codex API uploads")
declareCounter(codex_api_downloads, "codex API downloads")
proc getLongestRequestEnd(node: CodexNodeRef, availabilityId: AvailabilityId): ?!SecondsSince1970 =
without contracts =? node.contracts.host:
return failure("Sales unavailable")
let
reservations = contracts.sales.context.reservations
market = contracts.sales.context.market
requestEndFutures = reservations.all(Reservation, availabilityId).mapIt(market.getRequestEnd(it.requestId))
if len(requestEndFutures) == 0:
return success(0)
try:
let requestEnds = await allFutures(requestEndFutures)
return success(requestEnds.reduce(max))
except CancelledError as exc:
raise exc
except CatchableError as exc:
return failure(exc.msg)
proc validate(
pattern: string,
value: string): int
@ -276,6 +297,9 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
if restAv.totalSize == 0:
return RestApiResponse.error(Http400, "Total size must be larger then zero")
if restAv.until < 0:
return RestApiResponse.error(Http400, "Until parameter has to be positive integer")
if not reservations.hasAvailable(restAv.totalSize.truncate(uint)):
return RestApiResponse.error(Http422, "Not enough storage quota")
@ -284,7 +308,9 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
restAv.totalSize,
restAv.duration,
restAv.minPrice,
restAv.maxCollateral)
restAv.maxCollateral,
restAv.until,
restAv.enabled |? true)
), error:
return RestApiResponse.error(Http500, error.msg)
@ -350,6 +376,19 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) =
if maxCollateral =? restAv.maxCollateral:
availability.maxCollateral = maxCollateral
if enabled =? restAv.enabled:
availability.enabled = enabled
if until =? restAv.until:
if until < 0:
return RestApiResponse.error(Http400, "Until parameter must be greater or equal 0. Got: " & $until)
let longestRequestEnd = node.getLongestRequestEnd(id)
if until != 0 && until < longestRequestEnd:
return RestApiResponse.error(Http400, "Until parameter must be greater or equal the current longest request. Longest request ends at: " & $longestRequestEnd)
availability.until = until
if err =? (await reservations.update(availability)).errorOption:
return RestApiResponse.error(Http500, err.msg)

View File

@ -28,10 +28,12 @@ type
RestAvailability* = object
totalSize* {.serialize.}: UInt256
freeSize* {.serialize.}: ?UInt256
duration* {.serialize.}: UInt256
minPrice* {.serialize.}: UInt256
maxCollateral* {.serialize.}: UInt256
freeSize* {.serialize.}: ?UInt256
until* {.serialize.}: int64
enabled* {.serialize.}: ?bool
RestSalesAgent* = object
state* {.serialize.}: string

View File

@ -90,7 +90,7 @@ func new*(_: type Sales,
repo: RepoStore,
simulateProofFailures: int): Sales =
let reservations = Reservations.new(repo)
let reservations = Reservations.new(repo, clock)
Sales(
context: SalesContext(
market: market,
@ -110,7 +110,6 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
proc cleanUp(sales: Sales,
agent: SalesAgent,
returnBytes: bool,
processing: Future[void]) {.async.} =
let data = agent.data
@ -121,20 +120,6 @@ proc cleanUp(sales: Sales,
reservationId = data.reservation.?id |? ReservationId.default,
availabilityId = data.reservation.?availabilityId |? AvailabilityId.default
# if reservation for the SalesAgent was not created, then it means
# that the cleanUp was called before the sales process really started, so
# there are not really any bytes to be returned
if returnBytes and request =? data.request and reservation =? data.reservation:
if returnErr =? (await sales.context.reservations.returnBytesToAvailability(
reservation.availabilityId,
reservation.id,
request.ask.slotSize
)).errorOption:
error "failure returning bytes",
error = returnErr.msg,
availabilityId = reservation.availabilityId,
bytes = request.ask.slotSize
# delete reservation and return reservation bytes back to the availability
if reservation =? data.reservation and
deleteErr =? (await sales.context.reservations.deleteReservation(
@ -176,8 +161,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
none StorageRequest
)
agent.onCleanUp = proc (returnBytes = false) {.async.} =
await sales.cleanUp(agent, returnBytes, done)
agent.onCleanUp = proc () {.async.} =
await sales.cleanUp(agent, done)
agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
sales.filled(request, slotIndex, done)
@ -241,9 +226,9 @@ proc load*(sales: Sales) {.async.} =
slot.slotIndex,
some slot.request)
agent.onCleanUp = proc(returnBytes = false) {.async.} =
agent.onCleanUp = proc() {.async.} =
let done = newFuture[void]("onCleanUp_Dummy")
await sales.cleanUp(agent, returnBytes, done)
await sales.cleanUp(agent, done)
await done # completed in sales.cleanUp
agent.start(SaleUnknown())

View File

@ -55,6 +55,7 @@ type
ReservationId* = distinct array[32, byte]
SomeStorableObject = Availability | Reservation
SomeStorableId = AvailabilityId | ReservationId
Availability* = ref object
id* {.serialize.}: AvailabilityId
totalSize* {.serialize.}: UInt256
@ -62,15 +63,24 @@ type
duration* {.serialize.}: UInt256
minPrice* {.serialize.}: UInt256
maxCollateral* {.serialize.}: UInt256
# 0 means non-restricted, otherwise contains timestamp until the Availability will be renewed
until* {.serialize.}: SecondsSince1970
# false means that the availability won't be immidiatelly considered for sale
enabled* {.serialize.}: bool
Reservation* = ref object
id* {.serialize.}: ReservationId
availabilityId* {.serialize.}: AvailabilityId
size* {.serialize.}: UInt256
reservedSize* {.serialize.}: UInt256
totalSize* {.serialize.}: UInt256
requestId* {.serialize.}: RequestId
slotIndex* {.serialize.}: UInt256
Reservations* = ref object
repo: RepoStore
clock: Clock
onAvailabilityAdded: ?OnAvailabilityAdded
GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.}
StorableIter* = ref object
@ -91,9 +101,10 @@ const
ReservationsKey = (SalesKey / "reservations").tryGet
proc new*(T: type Reservations,
repo: RepoStore): Reservations =
repo: RepoStore,
clock: Clock): Reservations =
T(repo: repo)
T(repo: repo, clock: clock)
proc init*(
_: type Availability,
@ -110,14 +121,15 @@ proc init*(
proc init*(
_: type Reservation,
availabilityId: AvailabilityId,
size: UInt256,
totalSize: UInt256,
reservedSize: UInt256,
requestId: RequestId,
slotIndex: UInt256
): Reservation =
var id: array[32, byte]
doAssert randomBytes(id) == 32
Reservation(id: ReservationId(id), availabilityId: availabilityId, size: size, requestId: requestId, slotIndex: slotIndex)
Reservation(id: ReservationId(id), availabilityId: availabilityId, totalSize: totalSize, reservedSize: reservedSize, requestId: requestId, slotIndex: slotIndex)
func toArray(id: SomeStorableId): array[32, byte] =
array[32, byte](id)
@ -168,8 +180,7 @@ proc exists*(
self: Reservations,
key: Key): Future[bool] {.async.} =
let exists = await self.repo.metaDs.contains(key)
return exists
return await self.repo.metaDs.contains(key)
proc getImpl(
self: Reservations,
@ -280,17 +291,17 @@ proc deleteReservation*(
else:
return failure(error)
if reservation.size > 0.u256:
trace "returning remaining reservation bytes to availability",
size = reservation.size
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += reservation.size
if reservation.reservedSize > 0.u256:
trace "returning remaining reservation bytes to availability",
size = reservation.reservedSize
availability.freeSize += reservation.reservedSize
if updateErr =? (await self.update(availability)).errorOption:
return failure(updateErr)
@ -305,12 +316,14 @@ proc createAvailability*(
size: UInt256,
duration: UInt256,
minPrice: UInt256,
maxCollateral: UInt256): Future[?!Availability] {.async.} =
maxCollateral: UInt256,
until: SecondsSince1970 = 0,
enabled = true): Future[?!Availability] {.async.} =
trace "creating availability", size, duration, minPrice, maxCollateral
let availability = Availability.init(
size, size, duration, minPrice, maxCollateral
size, size, duration, minPrice, maxCollateral, until, enabled
)
let bytes = availability.freeSize.truncate(uint)
@ -327,7 +340,8 @@ proc createAvailability*(
return failure(updateErr)
if onAvailabilityAdded =? self.onAvailabilityAdded:
# we won't trigger the callback if the availability is not enabled
if enabled and onAvailabilityAdded =? self.onAvailabilityAdded:
try:
await onAvailabilityAdded(availability)
except CatchableError as e:
@ -348,7 +362,7 @@ proc createReservation*(
trace "creating reservation", availabilityId, slotSize, requestId, slotIndex
let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
let reservation = Reservation.init(availabilityId, slotSize, slotSize, requestId, slotIndex)
without availabilityKey =? availabilityId.key, error:
return failure(error)
@ -397,7 +411,6 @@ proc returnBytesToAvailability*(
reservationId
availabilityId
without key =? key(reservationId, availabilityId), error:
return failure(error)
@ -406,7 +419,7 @@ proc returnBytesToAvailability*(
# We are ignoring bytes that are still present in the Reservation because
# they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
let bytesToBeReturned = bytes - reservation.reservedSize
if bytesToBeReturned == 0:
trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
@ -459,7 +472,7 @@ proc release*(
without var reservation =? (await self.get(key, Reservation)), error:
return failure(error)
if reservation.size < bytes.u256:
if reservation.reservedSize < bytes.u256:
let error = newException(
BytesOutOfBoundsError,
"trying to release an amount of bytes that is greater than the total size of the Reservation")
@ -468,7 +481,7 @@ proc release*(
if releaseErr =? (await self.repo.release(bytes)).errorOption:
return failure(releaseErr.toErr(ReleaseFailedError))
reservation.size -= bytes.u256
reservation.reservedSize -= bytes.u256
# persist partially used Reservation with updated size
if err =? (await self.update(reservation)).errorOption:

View File

@ -25,7 +25,7 @@ type
onCleanUp*: OnCleanUp
onFilled*: ?OnFilled
OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].}
OnCleanUp* = proc (): Future[void] {.gcsafe, upraises: [].}
OnFilled* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}

View File

@ -28,6 +28,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
onClear(request, data.slotIndex)
if onCleanUp =? agent.onCleanUp:
await onCleanUp(returnBytes = true)
await onCleanUp()
warn "Sale cancelled due to timeout", requestId = data.requestId, slotIndex = data.slotIndex

View File

@ -72,4 +72,11 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
return some State(SaleErrored(error: err))
trace "Download complete"
if updatedReservation =? await reservations.get(reservation.id, Reservation):
if updatedReservation.size != 0:
error "After downloading the data there is unused capacity in Reservation"
else:
error "Couldn't get updated reservation"
return some State(SaleInitialProving())

View File

@ -30,5 +30,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
onClear(request, data.slotIndex)
if onCleanUp =? agent.onCleanUp:
await onCleanUp(returnBytes = true)
await onCleanUp()