From 8a0f6f119bc8e2a93fc71ffe4dd553c90fb3ca43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Fri, 5 Apr 2024 16:19:33 +0300 Subject: [PATCH] chore: wip --- codex/rest/api.nim | 41 +++++++++++++++++++- codex/rest/json.nim | 4 +- codex/sales.nim | 25 +++--------- codex/sales/reservations.nim | 61 ++++++++++++++++++------------ codex/sales/salesagent.nim | 2 +- codex/sales/states/cancelled.nim | 2 +- codex/sales/states/downloading.nim | 7 ++++ codex/sales/states/errored.nim | 2 +- 8 files changed, 95 insertions(+), 49 deletions(-) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 18f65006..7bc9d533 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -46,6 +46,27 @@ logScope: declareCounter(codex_api_uploads, "codex API uploads") declareCounter(codex_api_downloads, "codex API downloads") +proc getLongestRequestEnd(node: CodexNodeRef, availabilityId: AvailabilityId): ?!SecondsSince1970 = + without contracts =? node.contracts.host: + return failure("Sales unavailable") + + let + reservations = contracts.sales.context.reservations + market = contracts.sales.context.market + requestEndFutures = reservations.all(Reservation, availabilityId).mapIt(market.getRequestEnd(it.requestId)) + + if len(requestEndFutures) == 0: + return success(0) + + try: + let requestEnds = await allFutures(requestEndFutures) + + return success(requestEnds.reduce(max)) + except CancelledError as exc: + raise exc + except CatchableError as exc: + return failure(exc.msg) + proc validate( pattern: string, value: string): int @@ -276,6 +297,9 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) = if restAv.totalSize == 0: return RestApiResponse.error(Http400, "Total size must be larger then zero") + if restAv.until < 0: + return RestApiResponse.error(Http400, "Until parameter has to be positive integer") + if not reservations.hasAvailable(restAv.totalSize.truncate(uint)): return RestApiResponse.error(Http422, "Not enough storage quota") @@ -284,7 +308,9 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) = restAv.totalSize, restAv.duration, restAv.minPrice, - restAv.maxCollateral) + restAv.maxCollateral, + restAv.until, + restAv.enabled |? true) ), error: return RestApiResponse.error(Http500, error.msg) @@ -350,6 +376,19 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) = if maxCollateral =? restAv.maxCollateral: availability.maxCollateral = maxCollateral + if enabled =? restAv.enabled: + availability.enabled = enabled + + if until =? restAv.until: + if until < 0: + return RestApiResponse.error(Http400, "Until parameter must be greater or equal 0. Got: " & $until) + + let longestRequestEnd = node.getLongestRequestEnd(id) + if until != 0 && until < longestRequestEnd: + return RestApiResponse.error(Http400, "Until parameter must be greater or equal the current longest request. Longest request ends at: " & $longestRequestEnd) + + availability.until = until + if err =? (await reservations.update(availability)).errorOption: return RestApiResponse.error(Http500, err.msg) diff --git a/codex/rest/json.nim b/codex/rest/json.nim index 7fe13c32..dadaac21 100644 --- a/codex/rest/json.nim +++ b/codex/rest/json.nim @@ -28,10 +28,12 @@ type RestAvailability* = object totalSize* {.serialize.}: UInt256 + freeSize* {.serialize.}: ?UInt256 duration* {.serialize.}: UInt256 minPrice* {.serialize.}: UInt256 maxCollateral* {.serialize.}: UInt256 - freeSize* {.serialize.}: ?UInt256 + until* {.serialize.}: int64 + enabled* {.serialize.}: ?bool RestSalesAgent* = object state* {.serialize.}: string diff --git a/codex/sales.nim b/codex/sales.nim index 3e04228c..f9da9404 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -90,7 +90,7 @@ func new*(_: type Sales, repo: RepoStore, simulateProofFailures: int): Sales = - let reservations = Reservations.new(repo) + let reservations = Reservations.new(repo, clock) Sales( context: SalesContext( market: market, @@ -110,7 +110,6 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} = proc cleanUp(sales: Sales, agent: SalesAgent, - returnBytes: bool, processing: Future[void]) {.async.} = let data = agent.data @@ -121,20 +120,6 @@ proc cleanUp(sales: Sales, reservationId = data.reservation.?id |? ReservationId.default, availabilityId = data.reservation.?availabilityId |? AvailabilityId.default - # if reservation for the SalesAgent was not created, then it means - # that the cleanUp was called before the sales process really started, so - # there are not really any bytes to be returned - if returnBytes and request =? data.request and reservation =? data.reservation: - if returnErr =? (await sales.context.reservations.returnBytesToAvailability( - reservation.availabilityId, - reservation.id, - request.ask.slotSize - )).errorOption: - error "failure returning bytes", - error = returnErr.msg, - availabilityId = reservation.availabilityId, - bytes = request.ask.slotSize - # delete reservation and return reservation bytes back to the availability if reservation =? data.reservation and deleteErr =? (await sales.context.reservations.deleteReservation( @@ -176,8 +161,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) = none StorageRequest ) - agent.onCleanUp = proc (returnBytes = false) {.async.} = - await sales.cleanUp(agent, returnBytes, done) + agent.onCleanUp = proc () {.async.} = + await sales.cleanUp(agent, done) agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) = sales.filled(request, slotIndex, done) @@ -241,9 +226,9 @@ proc load*(sales: Sales) {.async.} = slot.slotIndex, some slot.request) - agent.onCleanUp = proc(returnBytes = false) {.async.} = + agent.onCleanUp = proc() {.async.} = let done = newFuture[void]("onCleanUp_Dummy") - await sales.cleanUp(agent, returnBytes, done) + await sales.cleanUp(agent, done) await done # completed in sales.cleanUp agent.start(SaleUnknown()) diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 40793e68..0582d78c 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -55,6 +55,7 @@ type ReservationId* = distinct array[32, byte] SomeStorableObject = Availability | Reservation SomeStorableId = AvailabilityId | ReservationId + Availability* = ref object id* {.serialize.}: AvailabilityId totalSize* {.serialize.}: UInt256 @@ -62,15 +63,24 @@ type duration* {.serialize.}: UInt256 minPrice* {.serialize.}: UInt256 maxCollateral* {.serialize.}: UInt256 + # 0 means non-restricted, otherwise contains timestamp until the Availability will be renewed + until* {.serialize.}: SecondsSince1970 + # false means that the availability won't be immidiatelly considered for sale + enabled* {.serialize.}: bool + Reservation* = ref object id* {.serialize.}: ReservationId availabilityId* {.serialize.}: AvailabilityId - size* {.serialize.}: UInt256 + reservedSize* {.serialize.}: UInt256 + totalSize* {.serialize.}: UInt256 requestId* {.serialize.}: RequestId slotIndex* {.serialize.}: UInt256 + Reservations* = ref object repo: RepoStore + clock: Clock onAvailabilityAdded: ?OnAvailabilityAdded + GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.} OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.} StorableIter* = ref object @@ -91,9 +101,10 @@ const ReservationsKey = (SalesKey / "reservations").tryGet proc new*(T: type Reservations, - repo: RepoStore): Reservations = + repo: RepoStore, + clock: Clock): Reservations = - T(repo: repo) + T(repo: repo, clock: clock) proc init*( _: type Availability, @@ -110,14 +121,15 @@ proc init*( proc init*( _: type Reservation, availabilityId: AvailabilityId, - size: UInt256, + totalSize: UInt256, + reservedSize: UInt256, requestId: RequestId, slotIndex: UInt256 ): Reservation = var id: array[32, byte] doAssert randomBytes(id) == 32 - Reservation(id: ReservationId(id), availabilityId: availabilityId, size: size, requestId: requestId, slotIndex: slotIndex) + Reservation(id: ReservationId(id), availabilityId: availabilityId, totalSize: totalSize, reservedSize: reservedSize, requestId: requestId, slotIndex: slotIndex) func toArray(id: SomeStorableId): array[32, byte] = array[32, byte](id) @@ -168,8 +180,7 @@ proc exists*( self: Reservations, key: Key): Future[bool] {.async.} = - let exists = await self.repo.metaDs.contains(key) - return exists + return await self.repo.metaDs.contains(key) proc getImpl( self: Reservations, @@ -280,17 +291,17 @@ proc deleteReservation*( else: return failure(error) - if reservation.size > 0.u256: + without availabilityKey =? availabilityId.key, error: + return failure(error) + + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) + + if reservation.reservedSize > 0.u256: trace "returning remaining reservation bytes to availability", - size = reservation.size + size = reservation.reservedSize - without availabilityKey =? availabilityId.key, error: - return failure(error) - - without var availability =? await self.get(availabilityKey, Availability), error: - return failure(error) - - availability.freeSize += reservation.size + availability.freeSize += reservation.reservedSize if updateErr =? (await self.update(availability)).errorOption: return failure(updateErr) @@ -305,12 +316,14 @@ proc createAvailability*( size: UInt256, duration: UInt256, minPrice: UInt256, - maxCollateral: UInt256): Future[?!Availability] {.async.} = + maxCollateral: UInt256, + until: SecondsSince1970 = 0, + enabled = true): Future[?!Availability] {.async.} = trace "creating availability", size, duration, minPrice, maxCollateral let availability = Availability.init( - size, size, duration, minPrice, maxCollateral + size, size, duration, minPrice, maxCollateral, until, enabled ) let bytes = availability.freeSize.truncate(uint) @@ -327,7 +340,8 @@ proc createAvailability*( return failure(updateErr) - if onAvailabilityAdded =? self.onAvailabilityAdded: + # we won't trigger the callback if the availability is not enabled + if enabled and onAvailabilityAdded =? self.onAvailabilityAdded: try: await onAvailabilityAdded(availability) except CatchableError as e: @@ -348,7 +362,7 @@ proc createReservation*( trace "creating reservation", availabilityId, slotSize, requestId, slotIndex - let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex) + let reservation = Reservation.init(availabilityId, slotSize, slotSize, requestId, slotIndex) without availabilityKey =? availabilityId.key, error: return failure(error) @@ -397,7 +411,6 @@ proc returnBytesToAvailability*( reservationId availabilityId - without key =? key(reservationId, availabilityId), error: return failure(error) @@ -406,7 +419,7 @@ proc returnBytesToAvailability*( # We are ignoring bytes that are still present in the Reservation because # they will be returned to Availability through `deleteReservation`. - let bytesToBeReturned = bytes - reservation.size + let bytesToBeReturned = bytes - reservation.reservedSize if bytesToBeReturned == 0: trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned @@ -459,7 +472,7 @@ proc release*( without var reservation =? (await self.get(key, Reservation)), error: return failure(error) - if reservation.size < bytes.u256: + if reservation.reservedSize < bytes.u256: let error = newException( BytesOutOfBoundsError, "trying to release an amount of bytes that is greater than the total size of the Reservation") @@ -468,7 +481,7 @@ proc release*( if releaseErr =? (await self.repo.release(bytes)).errorOption: return failure(releaseErr.toErr(ReleaseFailedError)) - reservation.size -= bytes.u256 + reservation.reservedSize -= bytes.u256 # persist partially used Reservation with updated size if err =? (await self.update(reservation)).errorOption: diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 337c643c..74f988e6 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -25,7 +25,7 @@ type onCleanUp*: OnCleanUp onFilled*: ?OnFilled - OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].} + OnCleanUp* = proc (): Future[void] {.gcsafe, upraises: [].} OnFilled* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim index 8464a61b..64acee91 100644 --- a/codex/sales/states/cancelled.nim +++ b/codex/sales/states/cancelled.nim @@ -28,6 +28,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} = onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnBytes = true) + await onCleanUp() warn "Sale cancelled due to timeout", requestId = data.requestId, slotIndex = data.slotIndex diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index c301ab2e..c71f0dec 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -72,4 +72,11 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} return some State(SaleErrored(error: err)) trace "Download complete" + + if updatedReservation =? await reservations.get(reservation.id, Reservation): + if updatedReservation.size != 0: + error "After downloading the data there is unused capacity in Reservation" + else: + error "Couldn't get updated reservation" + return some State(SaleInitialProving()) diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index 51f34bc9..356cf5b5 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -30,5 +30,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnBytes = true) + await onCleanUp()