From 7deeb7d2b34862889e5bc30e31e44709ca60ff9f Mon Sep 17 00:00:00 2001 From: Arnaud Date: Wed, 26 Mar 2025 12:45:22 +0100 Subject: [PATCH] feat(marketplace): persistent availabilities (#1099) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Add availability enabled parameter * Return bytes to availability when finished * Add until parameter * Remove debug message * Clean up and fix tests * Update documentations and cleanup * Avoid swallowing CancelledError * Move until validation to reservations module * Call onAvailabilityAdded callabck when the availability is enabled in sales * Remove until validation in restapi when creating an availability * Add openapi documentation * Use results instead of stew/results (#1112) * feat: request duration limit (#1057) * feat: request duration limit * Fix tests and duration type * Add custom error * Remove merge issue * Update codex contracts eth * Update market config and fix test * Fix SlotReservationsConfig syntax * Update dependencies * test: remove doubled test * chore: update contracts repo --------- Co-authored-by: Arnaud * fix(statemachine): do not raise from state.run (#1115) * fix(statemachine): do not raise from state.run * fix rebase * fix exception handling in SaleProvingSimulated.prove - re-raise CancelledError - don't return State on CatchableError - expect the Proofs_InvalidProof custom error instead of checking a string * asyncSpawn salesagent.onCancelled This was swallowing a KeyError in one of the tests (fixed in the previous commit) * remove error handling states in asyncstatemachine * revert unneeded changes * formatting * PR feedback, logging updates * chore(integration): simplify block expiration integration test (#1100) * chore(integration): simplify block expiration integration test * clean up * fix after rebase * perf: contract storage optimizations (#1094) * perf: contract storage optimizations * Apply optimization changes * Apply optimizing parameters sizing * Update codex-contracts-eth * bump latest changes in contracts branch * Change requestDurationLimit to uint64 * fix tests * fix tests --------- Co-authored-by: Arnaud Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com> * bump contracts to master (#1122) * Add availability enabled parameter * Return bytes to availability when finished * Add until parameter * Clean up and fix tests * Move until validation to reservations module * Apply suggestion changes: return the reservation module error * Apply suggestion changes for until dates * Apply suggestion changes: reorganize tests * Fix indent * Remove test related to timing issue * Add raises errors to async pragram and remove useless try except * Update open api documentation * Fix wording * Remove the httpClient restart statements * Use market.getRequestEnd to set validUntil * Remove returnBytes * Use clock.now in testing * Move the api validation file to the right file --------- Co-authored-by: Adam Uhlíř Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com> --- codex/rest/api.nim | 29 ++- codex/rest/json.nim | 2 + codex/sales.nim | 20 +- codex/sales/reservations.nim | 195 ++++++++++++------ codex/sales/salesagent.nim | 2 +- codex/sales/states/cancelled.nim | 4 +- codex/sales/states/errored.nim | 2 +- codex/sales/states/filling.nim | 2 +- codex/sales/states/finished.nim | 3 + codex/sales/states/ignored.nim | 5 +- codex/sales/states/preparing.nim | 8 +- codex/sales/states/slotreserving.nim | 4 +- codex/stores/repostore/operations.nim | 2 +- codex/stores/repostore/store.nim | 8 +- openapi.yaml | 13 +- tests/codex/examples.nim | 2 + tests/codex/helpers/mockreservations.nim | 3 + tests/codex/sales/states/testcancelled.nim | 7 +- tests/codex/sales/states/testerrored.nim | 7 +- tests/codex/sales/states/testfilling.nim | 1 - tests/codex/sales/states/testfinished.nim | 11 +- tests/codex/sales/states/testignored.nim | 7 +- tests/codex/sales/states/testpreparing.nim | 22 +- .../codex/sales/states/testslotreserving.nim | 1 - tests/codex/sales/testreservations.nim | 161 +++++++++++++-- tests/codex/sales/testsales.nim | 64 +++++- tests/integration/codexclient.nim | 29 ++- tests/integration/testmarketplace.nim | 2 + tests/integration/testproofs.nim | 4 +- tests/integration/testrestapi.nim | 1 + tests/integration/testrestapivalidation.nim | 16 ++ tests/integration/testsales.nim | 97 +++++++-- vendor/nim-datastore | 2 +- 33 files changed, 564 insertions(+), 172 deletions(-) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 243d4ed6..ee493e03 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -484,10 +484,19 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) = without availability =? ( await reservations.createAvailability( - restAv.totalSize, restAv.duration, restAv.minPricePerBytePerSecond, + restAv.totalSize, + restAv.duration, + restAv.minPricePerBytePerSecond, restAv.totalCollateral, + enabled = restAv.enabled |? true, + until = restAv.until |? 0, ) ), error: + if error of CancelledError: + raise error + if error of UntilOutOfBoundsError: + return RestApiResponse.error(Http422, error.msg) + return RestApiResponse.error(Http500, error.msg, headers = headers) return RestApiResponse.response( @@ -524,6 +533,7 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) = ## tokens) to be matched against the request's pricePerBytePerSecond ## totalCollateral - total collateral (in amount of ## tokens) that can be distributed among matching requests + try: without contracts =? node.contracts.host: return RestApiResponse.error(Http503, "Persistence is not enabled") @@ -577,10 +587,21 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) = if totalCollateral =? restAv.totalCollateral: availability.totalCollateral = totalCollateral - if err =? (await reservations.update(availability)).errorOption: - return RestApiResponse.error(Http500, err.msg) + if until =? restAv.until: + availability.until = until - return RestApiResponse.response(Http200) + if enabled =? restAv.enabled: + availability.enabled = enabled + + if err =? (await reservations.update(availability)).errorOption: + if err of CancelledError: + raise err + if err of UntilOutOfBoundsError: + return RestApiResponse.error(Http422, err.msg) + else: + return RestApiResponse.error(Http500, err.msg) + + return RestApiResponse.response(Http204) except CatchableError as exc: trace "Excepting processing request", exc = exc.msg return RestApiResponse.error(Http500) diff --git a/codex/rest/json.nim b/codex/rest/json.nim index 50c8b514..1b9459c1 100644 --- a/codex/rest/json.nim +++ b/codex/rest/json.nim @@ -33,6 +33,8 @@ type minPricePerBytePerSecond* {.serialize.}: UInt256 totalCollateral* {.serialize.}: UInt256 freeSize* {.serialize.}: ?uint64 + enabled* {.serialize.}: ?bool + until* {.serialize.}: ?SecondsSince1970 RestSalesAgent* = object state* {.serialize.}: string diff --git a/codex/sales.nim b/codex/sales.nim index a4a174c1..37e2c06a 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -113,7 +113,6 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} = proc cleanUp( sales: Sales, agent: SalesAgent, - returnBytes: bool, reprocessSlot: bool, returnedCollateral: ?UInt256, processing: Future[void], @@ -132,7 +131,7 @@ proc cleanUp( # if reservation for the SalesAgent was not created, then it means # that the cleanUp was called before the sales process really started, so # there are not really any bytes to be returned - if returnBytes and request =? data.request and reservation =? data.reservation: + if request =? data.request and reservation =? data.reservation: if returnErr =? ( await sales.context.reservations.returnBytesToAvailability( reservation.availabilityId, reservation.id, request.ask.slotSize @@ -203,9 +202,9 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) = newSalesAgent(sales.context, item.requestId, item.slotIndex, none StorageRequest) agent.onCleanUp = proc( - returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none + reprocessSlot = false, returnedCollateral = UInt256.none ) {.async.} = - await sales.cleanUp(agent, returnBytes, reprocessSlot, returnedCollateral, done) + await sales.cleanUp(agent, reprocessSlot, returnedCollateral, done) agent.onFilled = some proc(request: StorageRequest, slotIndex: uint64) = sales.filled(request, slotIndex, done) @@ -271,12 +270,12 @@ proc load*(sales: Sales) {.async.} = newSalesAgent(sales.context, slot.request.id, slot.slotIndex, some slot.request) agent.onCleanUp = proc( - returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none + reprocessSlot = false, returnedCollateral = UInt256.none ) {.async.} = # since workers are not being dispatched, this future has not been created # by a worker. Create a dummy one here so we can call sales.cleanUp let done: Future[void] = nil - await sales.cleanUp(agent, returnBytes, reprocessSlot, returnedCollateral, done) + await sales.cleanUp(agent, reprocessSlot, returnedCollateral, done) # There is no need to assign agent.onFilled as slots loaded from `mySlots` # are inherently already filled and so assigning agent.onFilled would be @@ -285,7 +284,9 @@ proc load*(sales: Sales) {.async.} = agent.start(SaleUnknown()) sales.agents.add agent -proc OnAvailabilitySaved(sales: Sales, availability: Availability) {.async.} = +proc OnAvailabilitySaved( + sales: Sales, availability: Availability +) {.async: (raises: []).} = ## When availabilities are modified or added, the queue should be unpaused if ## it was paused and any slots in the queue should have their `seen` flag ## cleared. @@ -533,8 +534,9 @@ proc startSlotQueue(sales: Sales) = slotQueue.start() - proc OnAvailabilitySaved(availability: Availability) {.async.} = - await sales.OnAvailabilitySaved(availability) + proc OnAvailabilitySaved(availability: Availability) {.async: (raises: []).} = + if availability.enabled: + await sales.OnAvailabilitySaved(availability) reservations.OnAvailabilitySaved = OnAvailabilitySaved diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 25ee2b99..b717cc1c 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -35,6 +35,7 @@ import std/sequtils import std/sugar import std/typetraits import std/sequtils +import std/times import pkg/chronos import pkg/datastore import pkg/nimcrypto @@ -70,6 +71,12 @@ type minPricePerBytePerSecond* {.serialize.}: UInt256 totalCollateral {.serialize.}: UInt256 totalRemainingCollateral* {.serialize.}: UInt256 + # If set to false, the availability will not accept new slots. + # If enabled, it will not impact any existing slots that are already being hosted. + enabled* {.serialize.}: bool + # Specifies the latest timestamp after which the availability will no longer host any slots. + # If set to 0, there will be no restrictions. + until* {.serialize.}: SecondsSince1970 Reservation* = ref object id* {.serialize.}: ReservationId @@ -77,6 +84,7 @@ type size* {.serialize.}: uint64 requestId* {.serialize.}: RequestId slotIndex* {.serialize.}: uint64 + validUntil* {.serialize.}: SecondsSince1970 Reservations* = ref object of RootObj availabilityLock: AsyncLock @@ -84,10 +92,14 @@ type repo: RepoStore OnAvailabilitySaved: ?OnAvailabilitySaved - GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.} - IterDispose* = proc(): Future[?!void] {.gcsafe, closure.} - OnAvailabilitySaved* = - proc(availability: Availability): Future[void] {.upraises: [], gcsafe.} + GetNext* = proc(): Future[?seq[byte]] {. + upraises: [], gcsafe, async: (raises: [CancelledError]), closure + .} + IterDispose* = + proc(): Future[?!void] {.gcsafe, async: (raises: [CancelledError]), closure.} + OnAvailabilitySaved* = proc(availability: Availability): Future[void] {. + upraises: [], gcsafe, async: (raises: []) + .} StorableIter* = ref object finished*: bool next*: GetNext @@ -102,13 +114,20 @@ type SerializationError* = object of ReservationsError UpdateFailedError* = object of ReservationsError BytesOutOfBoundsError* = object of ReservationsError + UntilOutOfBoundsError* = object of ReservationsError const SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module ReservationsKey = (SalesKey / "reservations").tryGet proc hash*(x: AvailabilityId): Hash {.borrow.} -proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} +proc all*( + self: Reservations, T: type SomeStorableObject +): Future[?!seq[T]] {.async: (raises: [CancelledError]).} + +proc all*( + self: Reservations, T: type SomeStorableObject, availabilityId: AvailabilityId +): Future[?!seq[T]] {.async: (raises: [CancelledError]).} template withLock(lock, body) = try: @@ -128,6 +147,8 @@ proc init*( duration: uint64, minPricePerBytePerSecond: UInt256, totalCollateral: UInt256, + enabled: bool, + until: SecondsSince1970, ): Availability = var id: array[32, byte] doAssert randomBytes(id) == 32 @@ -139,6 +160,8 @@ proc init*( minPricePerBytePerSecond: minPricePerBytePerSecond, totalCollateral: totalCollateral, totalRemainingCollateral: totalCollateral, + enabled: enabled, + until: until, ) func totalCollateral*(self: Availability): UInt256 {.inline.} = @@ -154,6 +177,7 @@ proc init*( size: uint64, requestId: RequestId, slotIndex: uint64, + validUntil: SecondsSince1970, ): Reservation = var id: array[32, byte] doAssert randomBytes(id) == 32 @@ -163,6 +187,7 @@ proc init*( size: size, requestId: requestId, slotIndex: slotIndex, + validUntil: validUntil, ) func toArray(id: SomeStorableId): array[32, byte] = @@ -217,11 +242,19 @@ func available*(self: Reservations): uint = func hasAvailable*(self: Reservations, bytes: uint): bool = self.repo.available(bytes.NBytes) -proc exists*(self: Reservations, key: Key): Future[bool] {.async.} = +proc exists*( + self: Reservations, key: Key +): Future[bool] {.async: (raises: [CancelledError]).} = let exists = await self.repo.metaDs.ds.contains(key) return exists -proc getImpl(self: Reservations, key: Key): Future[?!seq[byte]] {.async.} = +iterator items(self: StorableIter): Future[?seq[byte]] = + while not self.finished: + yield self.next() + +proc getImpl( + self: Reservations, key: Key +): Future[?!seq[byte]] {.async: (raises: [CancelledError]).} = if not await self.exists(key): let err = newException(NotExistsError, "object with key " & $key & " does not exist") @@ -234,7 +267,7 @@ proc getImpl(self: Reservations, key: Key): Future[?!seq[byte]] {.async.} = proc get*( self: Reservations, key: Key, T: type SomeStorableObject -): Future[?!T] {.async.} = +): Future[?!T] {.async: (raises: [CancelledError]).} = without serialized =? await self.getImpl(key), error: return failure(error) @@ -243,7 +276,9 @@ proc get*( return success obj -proc updateImpl(self: Reservations, obj: SomeStorableObject): Future[?!void] {.async.} = +proc updateImpl( + self: Reservations, obj: SomeStorableObject +): Future[?!void] {.async: (raises: [CancelledError]).} = trace "updating " & $(obj.type), id = obj.id without key =? obj.key, error: @@ -256,10 +291,15 @@ proc updateImpl(self: Reservations, obj: SomeStorableObject): Future[?!void] {.a proc updateAvailability( self: Reservations, obj: Availability -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = logScope: availabilityId = obj.id + if obj.until < 0: + let error = + newException(UntilOutOfBoundsError, "Cannot set until to a negative value") + return failure(error) + without key =? obj.key, error: return failure(error) @@ -269,21 +309,25 @@ proc updateAvailability( let res = await self.updateImpl(obj) # inform subscribers that Availability has been added if OnAvailabilitySaved =? self.OnAvailabilitySaved: - # when chronos v4 is implemented, and OnAvailabilitySaved is annotated - # with async:(raises:[]), we can remove this try/catch as we know, with - # certainty, that nothing will be raised - try: - await OnAvailabilitySaved(obj) - except CancelledError as e: - raise e - except CatchableError as e: - # we don't have any insight into types of exceptions that - # `OnAvailabilitySaved` can raise because it is caller-defined - warn "Unknown error during 'OnAvailabilitySaved' callback", error = e.msg + await OnAvailabilitySaved(obj) return res else: return failure(err) + if obj.until > 0: + without allReservations =? await self.all(Reservation, obj.id), error: + error.msg = "Error updating reservation: " & error.msg + return failure(error) + + let requestEnds = allReservations.mapIt(it.validUntil) + + if requestEnds.len > 0 and requestEnds.max > obj.until: + let error = newException( + UntilOutOfBoundsError, + "Until parameter must be greater or equal to the longest currently hosted slot", + ) + return failure(error) + # Sizing of the availability changed, we need to adjust the repo reservation accordingly if oldAvailability.totalSize != obj.totalSize: trace "totalSize changed, updating repo reservation" @@ -306,26 +350,23 @@ proc updateAvailability( # inform subscribers that Availability has been modified (with increased # size) if OnAvailabilitySaved =? self.OnAvailabilitySaved: - # when chronos v4 is implemented, and OnAvailabilitySaved is annotated - # with async:(raises:[]), we can remove this try/catch as we know, with - # certainty, that nothing will be raised - try: - await OnAvailabilitySaved(obj) - except CancelledError as e: - raise e - except CatchableError as e: - # we don't have any insight into types of exceptions that - # `OnAvailabilitySaved` can raise because it is caller-defined - warn "Unknown error during 'OnAvailabilitySaved' callback", error = e.msg - + await OnAvailabilitySaved(obj) return res -proc update*(self: Reservations, obj: Reservation): Future[?!void] {.async.} = +proc update*( + self: Reservations, obj: Reservation +): Future[?!void] {.async: (raises: [CancelledError]).} = return await self.updateImpl(obj) -proc update*(self: Reservations, obj: Availability): Future[?!void] {.async.} = - withLock(self.availabilityLock): - return await self.updateAvailability(obj) +proc update*( + self: Reservations, obj: Availability +): Future[?!void] {.async: (raises: [CancelledError]).} = + try: + withLock(self.availabilityLock): + return await self.updateAvailability(obj) + except AsyncLockError as e: + error "Lock error when trying to update the availability", err = e.msg + return failure(e) proc delete(self: Reservations, key: Key): Future[?!void] {.async.} = trace "deleting object", key @@ -391,12 +432,20 @@ proc createAvailability*( duration: uint64, minPricePerBytePerSecond: UInt256, totalCollateral: UInt256, + enabled: bool, + until: SecondsSince1970, ): Future[?!Availability] {.async.} = trace "creating availability", - size, duration, minPricePerBytePerSecond, totalCollateral + size, duration, minPricePerBytePerSecond, totalCollateral, enabled, until - let availability = - Availability.init(size, size, duration, minPricePerBytePerSecond, totalCollateral) + if until < 0: + let error = + newException(UntilOutOfBoundsError, "Cannot set until to a negative value") + return failure(error) + + let availability = Availability.init( + size, size, duration, minPricePerBytePerSecond, totalCollateral, enabled, until + ) let bytes = availability.freeSize if reserveErr =? (await self.repo.reserve(bytes.NBytes)).errorOption: @@ -420,6 +469,7 @@ method createReservation*( requestId: RequestId, slotIndex: uint64, collateralPerByte: UInt256, + validUntil: SecondsSince1970, ): Future[?!Reservation] {.async, base.} = withLock(self.availabilityLock): without availabilityKey =? availabilityId.key, error: @@ -436,9 +486,11 @@ method createReservation*( ) return failure(error) - trace "Creating reservation", availabilityId, slotSize, requestId, slotIndex + trace "Creating reservation", + availabilityId, slotSize, requestId, slotIndex, validUntil = validUntil - let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex) + let reservation = + Reservation.init(availabilityId, slotSize, requestId, slotIndex, validUntil) if createResErr =? (await self.update(reservation)).errorOption: return failure(createResErr) @@ -448,7 +500,7 @@ method createReservation*( availability.freeSize -= slotSize # adjust the remaining totalRemainingCollateral - availability.totalRemainingCollateral -= slotSize.stuint(256) * collateralPerByte + availability.totalRemainingCollateral -= slotSize.u256 * collateralPerByte # update availability with reduced size trace "Updating availability with reduced size" @@ -527,7 +579,7 @@ proc release*( reservationId: ReservationId, availabilityId: AvailabilityId, bytes: uint, -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = logScope: topics = "release" bytes @@ -565,13 +617,9 @@ proc release*( return success() -iterator items(self: StorableIter): Future[?seq[byte]] = - while not self.finished: - yield self.next() - proc storables( self: Reservations, T: type SomeStorableObject, queryKey: Key = ReservationsKey -): Future[?!StorableIter] {.async.} = +): Future[?!StorableIter] {.async: (raises: [CancelledError]).} = var iter = StorableIter() let query = Query.init(queryKey) when T is Availability: @@ -589,7 +637,7 @@ proc storables( return failure(error) # /sales/reservations - proc next(): Future[?seq[byte]] {.async.} = + proc next(): Future[?seq[byte]] {.async: (raises: [CancelledError]).} = await idleAsync() iter.finished = results.finished if not results.finished and res =? (await results.next()) and res.data.len > 0 and @@ -598,7 +646,7 @@ proc storables( return none seq[byte] - proc dispose(): Future[?!void] {.async.} = + proc dispose(): Future[?!void] {.async: (raises: [CancelledError]).} = return await results.dispose() iter.next = next @@ -607,32 +655,40 @@ proc storables( proc allImpl( self: Reservations, T: type SomeStorableObject, queryKey: Key = ReservationsKey -): Future[?!seq[T]] {.async.} = +): Future[?!seq[T]] {.async: (raises: [CancelledError]).} = var ret: seq[T] = @[] without storables =? (await self.storables(T, queryKey)), error: return failure(error) for storable in storables.items: - without bytes =? (await storable): - continue + try: + without bytes =? (await storable): + continue - without obj =? T.fromJson(bytes), error: - error "json deserialization error", - json = string.fromBytes(bytes), error = error.msg - continue + without obj =? T.fromJson(bytes), error: + error "json deserialization error", + json = string.fromBytes(bytes), error = error.msg + continue - ret.add obj + ret.add obj + except CancelledError as err: + raise err + except CatchableError as err: + error "Error when retrieving storable", error = err.msg + continue return success(ret) -proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} = +proc all*( + self: Reservations, T: type SomeStorableObject +): Future[?!seq[T]] {.async: (raises: [CancelledError]).} = return await self.allImpl(T) proc all*( self: Reservations, T: type SomeStorableObject, availabilityId: AvailabilityId -): Future[?!seq[T]] {.async.} = - without key =? (ReservationsKey / $availabilityId): +): Future[?!seq[T]] {.async: (raises: [CancelledError]).} = + without key =? key(availabilityId): return failure("no key") return await self.allImpl(T, key) @@ -641,6 +697,7 @@ proc findAvailability*( self: Reservations, size, duration: uint64, pricePerBytePerSecond, collateralPerByte: UInt256, + validUntil: SecondsSince1970, ): Future[?Availability] {.async.} = without storables =? (await self.storables(Availability)), e: error "failed to get all storables", error = e.msg @@ -648,11 +705,14 @@ proc findAvailability*( for item in storables.items: if bytes =? (await item) and availability =? Availability.fromJson(bytes): - if size <= availability.freeSize and duration <= availability.duration and + if availability.enabled and size <= availability.freeSize and + duration <= availability.duration and collateralPerByte <= availability.maxCollateralPerByte and - pricePerBytePerSecond >= availability.minPricePerBytePerSecond: + pricePerBytePerSecond >= availability.minPricePerBytePerSecond and + (availability.until == 0 or availability.until >= validUntil): trace "availability matched", id = availability.id, + enabled = availability.enabled, size, availFreeSize = availability.freeSize, duration, @@ -660,7 +720,8 @@ proc findAvailability*( pricePerBytePerSecond, availMinPricePerBytePerSecond = availability.minPricePerBytePerSecond, collateralPerByte, - availMaxCollateralPerByte = availability.maxCollateralPerByte + availMaxCollateralPerByte = availability.maxCollateralPerByte, + until = availability.until # TODO: As soon as we're on ARC-ORC, we can use destructors # to automatically dispose our iterators when they fall out of scope. @@ -672,6 +733,7 @@ proc findAvailability*( trace "availability did not match", id = availability.id, + enabled = availability.enabled, size, availFreeSize = availability.freeSize, duration, @@ -679,4 +741,5 @@ proc findAvailability*( pricePerBytePerSecond, availMinPricePerBytePerSecond = availability.minPricePerBytePerSecond, collateralPerByte, - availMaxCollateralPerByte = availability.maxCollateralPerByte + availMaxCollateralPerByte = availability.maxCollateralPerByte, + until = availability.until diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index f0abf3ee..61f3a9d3 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -27,7 +27,7 @@ type onFilled*: ?OnFilled OnCleanUp* = proc( - returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none + reprocessSlot = false, returnedCollateral = UInt256.none ): Future[void] {.gcsafe, upraises: [].} OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].} diff --git a/codex/sales/states/cancelled.nim b/codex/sales/states/cancelled.nim index 3bdf8c2f..2c240e15 100644 --- a/codex/sales/states/cancelled.nim +++ b/codex/sales/states/cancelled.nim @@ -34,9 +34,7 @@ method run*( if onCleanUp =? agent.onCleanUp: await onCleanUp( - returnBytes = true, - reprocessSlot = false, - returnedCollateral = some currentCollateral, + reprocessSlot = false, returnedCollateral = some currentCollateral ) warn "Sale cancelled due to timeout", diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index 77bf08d3..95848fd3 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -34,7 +34,7 @@ method run*( onClear(request, data.slotIndex) if onCleanUp =? agent.onCleanUp: - await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot) + await onCleanUp(reprocessSlot = state.reprocessSlot) except CancelledError as e: trace "SaleErrored.run was cancelled", error = e.msgDetail except CatchableError as e: diff --git a/codex/sales/states/filling.nim b/codex/sales/states/filling.nim index 13644223..1b76150a 100644 --- a/codex/sales/states/filling.nim +++ b/codex/sales/states/filling.nim @@ -50,7 +50,7 @@ method run*( await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral) except SlotStateMismatchError as e: debug "Slot is already filled, ignoring slot" - return some State(SaleIgnored(reprocessSlot: false, returnBytes: true)) + return some State(SaleIgnored(reprocessSlot: false)) except MarketError as e: return some State(SaleErrored(error: e)) # other CatchableErrors are handled "automatically" by the SaleState diff --git a/codex/sales/states/finished.nim b/codex/sales/states/finished.nim index 2aba69eb..16e66d27 100644 --- a/codex/sales/states/finished.nim +++ b/codex/sales/states/finished.nim @@ -36,6 +36,9 @@ method run*( requestId = data.requestId, slotIndex = data.slotIndex try: + if onClear =? agent.context.onClear: + onClear(request, data.slotIndex) + if onCleanUp =? agent.onCleanUp: await onCleanUp(returnedCollateral = state.returnedCollateral) except CancelledError as e: diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim index b07a201c..7f2ae5b1 100644 --- a/codex/sales/states/ignored.nim +++ b/codex/sales/states/ignored.nim @@ -14,7 +14,6 @@ logScope: type SaleIgnored* = ref object of SaleState reprocessSlot*: bool # readd slot to queue with `seen` flag - returnBytes*: bool # return unreleased bytes from Reservation to Availability method `$`*(state: SaleIgnored): string = "SaleIgnored" @@ -26,9 +25,7 @@ method run*( try: if onCleanUp =? agent.onCleanUp: - await onCleanUp( - reprocessSlot = state.reprocessSlot, returnBytes = state.returnBytes - ) + await onCleanUp(reprocessSlot = state.reprocessSlot) except CancelledError as e: trace "SaleIgnored.run was cancelled", error = e.msgDetail except CatchableError as e: diff --git a/codex/sales/states/preparing.nim b/codex/sales/states/preparing.nim index 443aee0b..a3aee4c9 100644 --- a/codex/sales/states/preparing.nim +++ b/codex/sales/states/preparing.nim @@ -56,7 +56,7 @@ method run*( let slotId = slotId(data.requestId, data.slotIndex) let state = await market.slotState(slotId) if state != SlotState.Free and state != SlotState.Repair: - return some State(SaleIgnored(reprocessSlot: false, returnBytes: false)) + return some State(SaleIgnored(reprocessSlot: false)) # TODO: Once implemented, check to ensure the host is allowed to fill the slot, # due to the [sliding window mechanism](https://github.com/codex-storage/codex-research/blob/master/design/marketplace.md#dispersal) @@ -68,10 +68,12 @@ method run*( pricePerBytePerSecond = request.ask.pricePerBytePerSecond collateralPerByte = request.ask.collateralPerByte + let requestEnd = await market.getRequestEnd(data.requestId) + without availability =? await reservations.findAvailability( request.ask.slotSize, request.ask.duration, request.ask.pricePerBytePerSecond, - request.ask.collateralPerByte, + request.ask.collateralPerByte, requestEnd, ): debug "No availability found for request, ignoring" @@ -82,7 +84,7 @@ method run*( without reservation =? await reservations.createReservation( availability.id, request.ask.slotSize, request.id, data.slotIndex, - request.ask.collateralPerByte, + request.ask.collateralPerByte, requestEnd, ), error: trace "Creation of reservation failed" # Race condition: diff --git a/codex/sales/states/slotreserving.nim b/codex/sales/states/slotreserving.nim index e9ac8dcd..780dadfc 100644 --- a/codex/sales/states/slotreserving.nim +++ b/codex/sales/states/slotreserving.nim @@ -46,7 +46,7 @@ method run*( await market.reserveSlot(data.requestId, data.slotIndex) except SlotReservationNotAllowedError as e: debug "Slot cannot be reserved, ignoring", error = e.msg - return some State(SaleIgnored(reprocessSlot: false, returnBytes: true)) + return some State(SaleIgnored(reprocessSlot: false)) except MarketError as e: return some State(SaleErrored(error: e)) # other CatchableErrors are handled "automatically" by the SaleState @@ -57,7 +57,7 @@ method run*( # do not re-add this slot to the queue, and return bytes from Reservation to # the Availability debug "Slot cannot be reserved, ignoring" - return some State(SaleIgnored(reprocessSlot: false, returnBytes: true)) + return some State(SaleIgnored(reprocessSlot: false)) except CancelledError as e: trace "SaleSlotReserving.run was cancelled", error = e.msgDetail except CatchableError as e: diff --git a/codex/stores/repostore/operations.nim b/codex/stores/repostore/operations.nim index 125741e1..cc488240 100644 --- a/codex/stores/repostore/operations.nim +++ b/codex/stores/repostore/operations.nim @@ -105,7 +105,7 @@ proc updateQuotaUsage*( minusUsed: NBytes = 0.NBytes, plusReserved: NBytes = 0.NBytes, minusReserved: NBytes = 0.NBytes, -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = await self.metaDs.modify( QuotaUsedKey, proc(maybeCurrUsage: ?QuotaUsage): Future[?QuotaUsage] {.async.} = diff --git a/codex/stores/repostore/store.nim b/codex/stores/repostore/store.nim index d7305107..130ab15e 100644 --- a/codex/stores/repostore/store.nim +++ b/codex/stores/repostore/store.nim @@ -380,7 +380,9 @@ method close*(self: RepoStore): Future[void] {.async.} = # RepoStore procs ########################################################### -proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = +proc reserve*( + self: RepoStore, bytes: NBytes +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Reserve bytes ## @@ -388,7 +390,9 @@ proc reserve*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = await self.updateQuotaUsage(plusReserved = bytes) -proc release*(self: RepoStore, bytes: NBytes): Future[?!void] {.async.} = +proc release*( + self: RepoStore, bytes: NBytes +): Future[?!void] {.async: (raises: [CancelledError]).} = ## Release bytes ## diff --git a/openapi.yaml b/openapi.yaml index c2088cc5..8bae1b10 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -163,6 +163,14 @@ components: totalCollateral: type: string description: Total collateral (in amount of tokens) that can be used for matching requests + enabled: + type: boolean + description: Enable the ability to receive sales on this availability. + default: true + until: + type: integer + description: Specifies the latest timestamp, after which the availability will no longer host any slots. If set to 0, there will be no restrictions. + default: 0 SalesAvailabilityREAD: allOf: @@ -239,6 +247,9 @@ components: slotIndex: type: string description: Slot Index as decimal string + validUntil: + type: integer + description: Timestamp after which the reservation will no longer be valid. StorageRequestCreation: type: object @@ -704,7 +715,7 @@ paths: "400": description: Invalid data input "422": - description: The provided parameters did not pass validation + description: Not enough node's storage quota available or the provided parameters did not pass validation "500": description: Error reserving availability "503": diff --git a/tests/codex/examples.nim b/tests/codex/examples.nim index ed1dd52a..52b8a0b8 100644 --- a/tests/codex/examples.nim +++ b/tests/codex/examples.nim @@ -75,6 +75,8 @@ proc example*( duration = uint16.example.uint64, minPricePerBytePerSecond = uint8.example.u256, totalCollateral = totalSize.u256 * collateralPerByte, + enabled = true, + until = 0.SecondsSince1970, ) proc example*(_: type Reservation): Reservation = diff --git a/tests/codex/helpers/mockreservations.nim b/tests/codex/helpers/mockreservations.nim index 1bc76a09..91ed04ec 100644 --- a/tests/codex/helpers/mockreservations.nim +++ b/tests/codex/helpers/mockreservations.nim @@ -2,6 +2,7 @@ import pkg/chronos import pkg/codex/sales import pkg/codex/stores import pkg/questionable/results +import pkg/codex/clock type MockReservations* = ref object of Reservations createReservationThrowBytesOutOfBoundsError: bool @@ -28,6 +29,7 @@ method createReservation*( requestId: RequestId, slotIndex: uint64, collateralPerByte: UInt256, + validUntil: SecondsSince1970, ): Future[?!Reservation] {.async.} = if self.createReservationThrowBytesOutOfBoundsError: let error = newException( @@ -45,4 +47,5 @@ method createReservation*( requestId, slotIndex, collateralPerByte, + validUntil, ) diff --git a/tests/codex/sales/states/testcancelled.nim b/tests/codex/sales/states/testcancelled.nim index 48f3e8a0..ab450200 100644 --- a/tests/codex/sales/states/testcancelled.nim +++ b/tests/codex/sales/states/testcancelled.nim @@ -22,16 +22,14 @@ asyncchecksuite "sales state 'cancelled'": var market: MockMarket var state: SaleCancelled var agent: SalesAgent - var returnBytesWas = bool.none var reprocessSlotWas = bool.none var returnedCollateralValue = UInt256.none setup: market = MockMarket.new() let onCleanUp = proc( - returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none + reprocessSlot = false, returnedCollateral = UInt256.none ) {.async.} = - returnBytesWas = some returnBytes reprocessSlotWas = some reprocessSlot returnedCollateralValue = returnedCollateral @@ -40,7 +38,7 @@ asyncchecksuite "sales state 'cancelled'": agent.onCleanUp = onCleanUp state = SaleCancelled.new() - test "calls onCleanUp with returnBytes = false, reprocessSlot = true, and returnedCollateral = currentCollateral": + test "calls onCleanUp with reprocessSlot = true, and returnedCollateral = currentCollateral": market.fillSlot( requestId = request.id, slotIndex = slotIndex, @@ -49,6 +47,5 @@ asyncchecksuite "sales state 'cancelled'": collateral = currentCollateral, ) discard await state.run(agent) - check eventually returnBytesWas == some true check eventually reprocessSlotWas == some false check eventually returnedCollateralValue == some currentCollateral diff --git a/tests/codex/sales/states/testerrored.nim b/tests/codex/sales/states/testerrored.nim index 07e325e3..0cc26cf8 100644 --- a/tests/codex/sales/states/testerrored.nim +++ b/tests/codex/sales/states/testerrored.nim @@ -20,14 +20,12 @@ asyncchecksuite "sales state 'errored'": var state: SaleErrored var agent: SalesAgent - var returnBytesWas = false var reprocessSlotWas = false setup: let onCleanUp = proc( - returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none + reprocessSlot = false, returnedCollateral = UInt256.none ) {.async.} = - returnBytesWas = returnBytes reprocessSlotWas = reprocessSlot let context = SalesContext(market: market, clock: clock) @@ -35,8 +33,7 @@ asyncchecksuite "sales state 'errored'": agent.onCleanUp = onCleanUp state = SaleErrored(error: newException(ValueError, "oh no!")) - test "calls onCleanUp with returnBytes = false and reprocessSlot = true": + test "calls onCleanUp with reprocessSlot = true": state = SaleErrored(error: newException(ValueError, "oh no!"), reprocessSlot: true) discard await state.run(agent) - check eventually returnBytesWas == true check eventually reprocessSlotWas == true diff --git a/tests/codex/sales/states/testfilling.nim b/tests/codex/sales/states/testfilling.nim index f746b5a8..54536a4c 100644 --- a/tests/codex/sales/states/testfilling.nim +++ b/tests/codex/sales/states/testfilling.nim @@ -47,7 +47,6 @@ suite "sales state 'filling'": let next = !(await state.run(agent)) check next of SaleIgnored check SaleIgnored(next).reprocessSlot == false - check SaleIgnored(next).returnBytes test "run switches to errored with other error ": let error = newException(MarketError, "some error") diff --git a/tests/codex/sales/states/testfinished.nim b/tests/codex/sales/states/testfinished.nim index 0c33a7b3..1648df3a 100644 --- a/tests/codex/sales/states/testfinished.nim +++ b/tests/codex/sales/states/testfinished.nim @@ -23,22 +23,23 @@ asyncchecksuite "sales state 'finished'": var market: MockMarket var state: SaleFinished var agent: SalesAgent - var returnBytesWas = bool.none var reprocessSlotWas = bool.none var returnedCollateralValue = UInt256.none + var saleCleared = bool.none setup: market = MockMarket.new() let onCleanUp = proc( - returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none + reprocessSlot = false, returnedCollateral = UInt256.none ) {.async.} = - returnBytesWas = some returnBytes reprocessSlotWas = some reprocessSlot returnedCollateralValue = returnedCollateral let context = SalesContext(market: market, clock: clock) agent = newSalesAgent(context, request.id, slotIndex, request.some) agent.onCleanUp = onCleanUp + agent.context.onClear = some proc(request: StorageRequest, idx: uint64) = + saleCleared = some true state = SaleFinished(returnedCollateral: some currentCollateral) test "switches to cancelled state when request expires": @@ -49,8 +50,8 @@ asyncchecksuite "sales state 'finished'": let next = state.onFailed(request) check !next of SaleFailed - test "calls onCleanUp with returnBytes = false, reprocessSlot = true, and returnedCollateral = currentCollateral": + test "calls onCleanUp with reprocessSlot = true, and returnedCollateral = currentCollateral": discard await state.run(agent) - check eventually returnBytesWas == some false check eventually reprocessSlotWas == some false check eventually returnedCollateralValue == some currentCollateral + check eventually saleCleared == some true diff --git a/tests/codex/sales/states/testignored.nim b/tests/codex/sales/states/testignored.nim index 2e1c6e91..5eea7d16 100644 --- a/tests/codex/sales/states/testignored.nim +++ b/tests/codex/sales/states/testignored.nim @@ -20,14 +20,12 @@ asyncchecksuite "sales state 'ignored'": var state: SaleIgnored var agent: SalesAgent - var returnBytesWas = false var reprocessSlotWas = false setup: let onCleanUp = proc( - returnBytes = false, reprocessSlot = false, returnedCollateral = UInt256.none + reprocessSlot = false, returnedCollateral = UInt256.none ) {.async.} = - returnBytesWas = returnBytes reprocessSlotWas = reprocessSlot let context = SalesContext(market: market, clock: clock) @@ -36,7 +34,6 @@ asyncchecksuite "sales state 'ignored'": state = SaleIgnored.new() test "calls onCleanUp with values assigned to SaleIgnored": - state = SaleIgnored(reprocessSlot: true, returnBytes: true) + state = SaleIgnored(reprocessSlot: true) discard await state.run(agent) - check eventually returnBytesWas == true check eventually reprocessSlotWas == true diff --git a/tests/codex/sales/states/testpreparing.nim b/tests/codex/sales/states/testpreparing.nim index 99d9c7fe..802489a1 100644 --- a/tests/codex/sales/states/testpreparing.nim +++ b/tests/codex/sales/states/testpreparing.nim @@ -13,6 +13,7 @@ import pkg/codex/sales/salesagent import pkg/codex/sales/salescontext import pkg/codex/sales/reservations import pkg/codex/stores/repostore +import times import ../../../asynctest import ../../helpers import ../../examples @@ -39,6 +40,8 @@ asyncchecksuite "sales state 'preparing'": duration = request.ask.duration + 60.uint64, minPricePerBytePerSecond = request.ask.pricePerBytePerSecond, totalCollateral = request.ask.collateralPerSlot * request.ask.slots.u256, + enabled = true, + until = 0.SecondsSince1970, ) let repoDs = SQLiteDatastore.new(Memory).tryGet() let metaDs = SQLiteDatastore.new(Memory).tryGet() @@ -52,6 +55,8 @@ asyncchecksuite "sales state 'preparing'": context.reservations = reservations agent = newSalesAgent(context, request.id, slotIndex, request.some) + market.requestEnds[request.id] = clock.now() + cast[int64](request.ask.duration) + teardown: await repo.stop() @@ -67,10 +72,14 @@ asyncchecksuite "sales state 'preparing'": let next = state.onSlotFilled(request.id, slotIndex) check !next of SaleFilled - proc createAvailability() {.async.} = + proc createAvailability(enabled = true) {.async.} = let a = await reservations.createAvailability( - availability.totalSize, availability.duration, - availability.minPricePerBytePerSecond, availability.totalCollateral, + availability.totalSize, + availability.duration, + availability.minPricePerBytePerSecond, + availability.totalCollateral, + enabled, + until = 0.SecondsSince1970, ) availability = a.get @@ -79,7 +88,11 @@ asyncchecksuite "sales state 'preparing'": check next of SaleIgnored let ignored = SaleIgnored(next) check ignored.reprocessSlot - check ignored.returnBytes == false + + test "run switches to ignored when availability is not enabled": + await createAvailability(enabled = false) + let next = !(await state.run(agent)) + check next of SaleIgnored test "run switches to slot reserving state after reservation created": await createAvailability() @@ -94,7 +107,6 @@ asyncchecksuite "sales state 'preparing'": check next of SaleIgnored let ignored = SaleIgnored(next) check ignored.reprocessSlot - check ignored.returnBytes == false test "run switches to errored when reserve fails with other error": await createAvailability() diff --git a/tests/codex/sales/states/testslotreserving.nim b/tests/codex/sales/states/testslotreserving.nim index 0e2e2cc7..b223338a 100644 --- a/tests/codex/sales/states/testslotreserving.nim +++ b/tests/codex/sales/states/testslotreserving.nim @@ -67,4 +67,3 @@ asyncchecksuite "sales state 'SlotReserving'": let next = !(await state.run(agent)) check next of SaleIgnored check SaleIgnored(next).reprocessSlot == false - check SaleIgnored(next).returnBytes diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index 49df059d..ff5e153c 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -1,5 +1,5 @@ import std/random - +import std/times import pkg/questionable import pkg/questionable/results import pkg/chronos @@ -8,6 +8,7 @@ import pkg/datastore import pkg/codex/stores import pkg/codex/errors import pkg/codex/sales +import pkg/codex/clock import pkg/codex/utils/json import ../../asynctest @@ -39,19 +40,22 @@ asyncchecksuite "Reservations module": await repoTmp.destroyDb() await metaTmp.destroyDb() - proc createAvailability(): Availability = + proc createAvailability(enabled = true, until = 0.SecondsSince1970): Availability = let example = Availability.example(collateralPerByte) let totalSize = rand(100000 .. 200000).uint64 let totalCollateral = totalSize.u256 * collateralPerByte let availability = waitFor reservations.createAvailability( - totalSize, example.duration, example.minPricePerBytePerSecond, totalCollateral + totalSize, example.duration, example.minPricePerBytePerSecond, totalCollateral, + enabled, until, ) return availability.get proc createReservation(availability: Availability): Reservation = let size = rand(1 ..< availability.freeSize.int) + let validUntil = getTime().toUnix() + 30.SecondsSince1970 let reservation = waitFor reservations.createReservation( - availability.id, size.uint64, RequestId.example, uint64.example, 1.u256 + availability.id, size.uint64, RequestId.example, uint64.example, 1.u256, + validUntil, ) return reservation.get @@ -64,8 +68,12 @@ asyncchecksuite "Reservations module": check (await reservations.all(Availability)).get.len == 0 test "generates unique ids for storage availability": - let availability1 = Availability.init(1.uint64, 2.uint64, 3.uint64, 4.u256, 5.u256) - let availability2 = Availability.init(1.uint64, 2.uint64, 3.uint64, 4.u256, 5.u256) + let availability1 = Availability.init( + 1.uint64, 2.uint64, 3.uint64, 4.u256, 5.u256, true, 0.SecondsSince1970 + ) + let availability2 = Availability.init( + 1.uint64, 2.uint64, 3.uint64, 4.u256, 5.u256, true, 0.SecondsSince1970 + ) check availability1.id != availability2.id test "can reserve available storage": @@ -128,20 +136,24 @@ asyncchecksuite "Reservations module": test "cannot create reservation with non-existant availability": let availability = Availability.example + let validUntil = getTime().toUnix() + 30.SecondsSince1970 let created = await reservations.createReservation( - availability.id, uint64.example, RequestId.example, uint64.example, 1.u256 + availability.id, uint64.example, RequestId.example, uint64.example, 1.u256, + validUntil, ) check created.isErr check created.error of NotExistsError test "cannot create reservation larger than availability size": let availability = createAvailability() + let validUntil = getTime().toUnix() + 30.SecondsSince1970 let created = await reservations.createReservation( availability.id, availability.totalSize + 1, RequestId.example, uint64.example, UInt256.example, + validUntil, ) check created.isErr check created.error of BytesOutOfBoundsError @@ -149,23 +161,26 @@ asyncchecksuite "Reservations module": test "cannot create reservation larger than availability size - concurrency test": proc concurrencyTest(): Future[void] {.async.} = let availability = createAvailability() + let validUntil = getTime().toUnix() + 30.SecondsSince1970 let one = reservations.createReservation( availability.id, availability.totalSize - 1, RequestId.example, uint64.example, UInt256.example, + validUntil, ) let two = reservations.createReservation( availability.id, availability.totalSize, RequestId.example, uint64.example, - UInt256.example, + UInt256.example, validUntil, ) let oneResult = await one let twoResult = await two check oneResult.isErr or twoResult.isErr + if oneResult.isErr: check oneResult.error of BytesOutOfBoundsError if twoResult.isErr: @@ -259,6 +274,48 @@ asyncchecksuite "Reservations module": check isOk await reservations.update(availability) check (repo.quotaReservedBytes - origQuota) == 100.NBytes + test "create availability set enabled to true by default": + let availability = createAvailability() + check availability.enabled == true + + test "create availability set until to 0 by default": + let availability = createAvailability() + check availability.until == 0.SecondsSince1970 + + test "create availability whith correct values": + var until = getTime().toUnix() + + let availability = createAvailability(enabled = false, until = until) + check availability.enabled == false + check availability.until == until + + test "create an availability fails when trying set until with a negative value": + let totalSize = rand(100000 .. 200000).uint64 + let example = Availability.example(collateralPerByte) + let totalCollateral = totalSize.u256 * collateralPerByte + + let result = await reservations.createAvailability( + totalSize, + example.duration, + example.minPricePerBytePerSecond, + totalCollateral, + enabled = true, + until = -1.SecondsSince1970, + ) + + check result.isErr + check result.error of UntilOutOfBoundsError + + test "update an availability fails when trying set until with a negative value": + let until = getTime().toUnix() + let availability = createAvailability(until = until) + + availability.until = -1 + + let result = await reservations.update(availability) + check result.isErr + check result.error of UntilOutOfBoundsError + test "reservation can be partially released": let availability = createAvailability() let reservation = createReservation(availability) @@ -285,7 +342,9 @@ asyncchecksuite "Reservations module": test "OnAvailabilitySaved called when availability is created": var added: Availability - reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} = + reservations.OnAvailabilitySaved = proc( + a: Availability + ) {.gcsafe, async: (raises: []).} = added = a let availability = createAvailability() @@ -295,7 +354,9 @@ asyncchecksuite "Reservations module": test "OnAvailabilitySaved called when availability size is increased": var availability = createAvailability() var added: Availability - reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} = + reservations.OnAvailabilitySaved = proc( + a: Availability + ) {.gcsafe, async: (raises: []).} = added = a availability.freeSize += 1 discard await reservations.update(availability) @@ -305,7 +366,21 @@ asyncchecksuite "Reservations module": test "OnAvailabilitySaved is not called when availability size is decreased": var availability = createAvailability() var called = false - reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} = + reservations.OnAvailabilitySaved = proc( + a: Availability + ) {.gcsafe, async: (raises: []).} = + called = true + availability.freeSize -= 1.uint64 + discard await reservations.update(availability) + + check not called + + test "OnAvailabilitySaved is not called when availability is disabled": + var availability = createAvailability(enabled = false) + var called = false + reservations.OnAvailabilitySaved = proc( + a: Availability + ) {.gcsafe, async: (raises: []).} = called = true availability.freeSize -= 1 discard await reservations.update(availability) @@ -315,7 +390,7 @@ asyncchecksuite "Reservations module": test "OnAvailabilitySaved called when availability duration is increased": var availability = createAvailability() var added: Availability - reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} = + reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} = added = a availability.duration += 1 discard await reservations.update(availability) @@ -325,7 +400,7 @@ asyncchecksuite "Reservations module": test "OnAvailabilitySaved is not called when availability duration is decreased": var availability = createAvailability() var called = false - reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} = + reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} = called = true availability.duration -= 1 discard await reservations.update(availability) @@ -335,7 +410,7 @@ asyncchecksuite "Reservations module": test "OnAvailabilitySaved called when availability minPricePerBytePerSecond is increased": var availability = createAvailability() var added: Availability - reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} = + reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} = added = a availability.minPricePerBytePerSecond += 1.u256 discard await reservations.update(availability) @@ -345,7 +420,7 @@ asyncchecksuite "Reservations module": test "OnAvailabilitySaved is not called when availability minPricePerBytePerSecond is decreased": var availability = createAvailability() var called = false - reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} = + reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} = called = true availability.minPricePerBytePerSecond -= 1.u256 discard await reservations.update(availability) @@ -355,7 +430,7 @@ asyncchecksuite "Reservations module": test "OnAvailabilitySaved called when availability totalCollateral is increased": var availability = createAvailability() var added: Availability - reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} = + reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} = added = a availability.totalCollateral = availability.totalCollateral + 1.u256 discard await reservations.update(availability) @@ -365,7 +440,7 @@ asyncchecksuite "Reservations module": test "OnAvailabilitySaved is not called when availability totalCollateral is decreased": var availability = createAvailability() var called = false - reservations.OnAvailabilitySaved = proc(a: Availability) {.async.} = + reservations.OnAvailabilitySaved = proc(a: Availability) {.async: (raises: []).} = called = true availability.totalCollateral = availability.totalCollateral - 1.u256 discard await reservations.update(availability) @@ -374,32 +449,69 @@ asyncchecksuite "Reservations module": test "availabilities can be found": let availability = createAvailability() - + let validUntil = getTime().toUnix() + 30.SecondsSince1970 let found = await reservations.findAvailability( availability.freeSize, availability.duration, - availability.minPricePerBytePerSecond, collateralPerByte, + availability.minPricePerBytePerSecond, collateralPerByte, validUntil, ) check found.isSome check found.get == availability + test "does not find an availability when is it disabled": + let availability = createAvailability(enabled = false) + let validUntil = getTime().toUnix() + 30.SecondsSince1970 + let found = await reservations.findAvailability( + availability.freeSize, availability.duration, + availability.minPricePerBytePerSecond, collateralPerByte, validUntil, + ) + + check found.isNone + + test "finds an availability when the until date is after the duration": + let example = Availability.example(collateralPerByte) + let until = getTime().toUnix() + example.duration.SecondsSince1970 + let availability = createAvailability(until = until) + let validUntil = getTime().toUnix() + 30.SecondsSince1970 + let found = await reservations.findAvailability( + availability.freeSize, availability.duration, + availability.minPricePerBytePerSecond, collateralPerByte, validUntil, + ) + + check found.isSome + check found.get == availability + + test "does not find an availability when the until date is before the duration": + let example = Availability.example(collateralPerByte) + let until = getTime().toUnix() + 1.SecondsSince1970 + let availability = createAvailability(until = until) + let validUntil = getTime().toUnix() + 30.SecondsSince1970 + let found = await reservations.findAvailability( + availability.freeSize, availability.duration, + availability.minPricePerBytePerSecond, collateralPerByte, validUntil, + ) + + check found.isNone + test "non-matching availabilities are not found": let availability = createAvailability() - + let validUntil = getTime().toUnix() + 30.SecondsSince1970 let found = await reservations.findAvailability( availability.freeSize + 1, availability.duration, availability.minPricePerBytePerSecond, collateralPerByte, + validUntil, ) check found.isNone test "non-existent availability cannot be found": let availability = Availability.example + let validUntil = getTime().toUnix() + 30.SecondsSince1970 let found = await reservations.findAvailability( availability.freeSize, availability.duration, - availability.minPricePerBytePerSecond, collateralPerByte, + availability.minPricePerBytePerSecond, collateralPerByte, validUntil, ) check found.isNone @@ -420,7 +532,12 @@ asyncchecksuite "Reservations module": test "fails to create availability with size that is larger than available quota": let created = await reservations.createAvailability( - DefaultQuotaBytes.uint64 + 1, uint64.example, UInt256.example, UInt256.example + DefaultQuotaBytes.uint64 + 1, + uint64.example, + UInt256.example, + UInt256.example, + enabled = true, + until = 0.SecondsSince1970, ) check created.isErr check created.error of ReserveFailedError diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 74ea8a2b..f4d9cbae 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -14,6 +14,7 @@ import pkg/codex/stores/repostore import pkg/codex/blocktype as bt import pkg/codex/node import pkg/codex/utils/asyncstatemachine +import times import ../../asynctest import ../helpers import ../helpers/mockmarket @@ -152,6 +153,8 @@ asyncchecksuite "Sales": duration = 60.uint64, minPricePerBytePerSecond = minPricePerBytePerSecond, totalCollateral = totalCollateral, + enabled = true, + until = 0.SecondsSince1970, ) request = StorageRequest( ask: StorageAsk( @@ -221,10 +224,11 @@ asyncchecksuite "Sales": let key = availability.id.key.get (waitFor reservations.get(key, Availability)).get - proc createAvailability() = + proc createAvailability(enabled = true, until = 0.SecondsSince1970) = let a = waitFor reservations.createAvailability( availability.totalSize, availability.duration, - availability.minPricePerBytePerSecond, availability.totalCollateral, + availability.minPricePerBytePerSecond, availability.totalCollateral, enabled, + until, ) availability = a.get # update id @@ -380,14 +384,14 @@ asyncchecksuite "Sales": check eventually getAvailability().freeSize == availability.freeSize - request.ask.slotSize - test "non-downloaded bytes are returned to availability once finished": + test "bytes are returned to availability once finished": var slotIndex = 0.uint64 sales.onStore = proc( request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false ): Future[?!void] {.async.} = slotIndex = slot let blk = bt.Block.new(@[1.byte]).get - await onBatch(@[blk]) + await onBatch(blk.repeat(request.ask.slotSize)) let sold = newFuture[void]() sales.onSale = proc(request: StorageRequest, slotIndex: uint64) = @@ -403,7 +407,7 @@ asyncchecksuite "Sales": market.slotState[request.slotId(slotIndex)] = SlotState.Finished clock.advance(request.ask.duration.int64) - check eventually getAvailability().freeSize == origSize - 1 + check eventually getAvailability().freeSize == origSize test "ignores download when duration not long enough": availability.duration = request.ask.duration - 1 @@ -439,6 +443,34 @@ asyncchecksuite "Sales": market.slotState[request.slotId(3.uint64)] = SlotState.Filled check wasIgnored() + test "ignores request when availability is not enabled": + createAvailability(enabled = false) + await market.requestStorage(request) + check wasIgnored() + + test "ignores request when availability until terminates before the duration": + let until = getTime().toUnix() + createAvailability(until = until) + await market.requestStorage(request) + + check wasIgnored() + + test "retrieves request when availability until terminates after the duration": + let requestEnd = getTime().toUnix() + cast[int64](request.ask.duration) + let until = requestEnd + 1 + createAvailability(until = until) + + var storingRequest: StorageRequest + sales.onStore = proc( + request: StorageRequest, slot: uint64, onBatch: BatchProc, isRepairing = false + ): Future[?!void] {.async.} = + storingRequest = request + return success() + + market.requestEnds[request.id] = requestEnd + await market.requestStorage(request) + check eventually storingRequest == request + test "retrieves and stores data locally": var storingRequest: StorageRequest var storingSlot: uint64 @@ -563,6 +595,8 @@ asyncchecksuite "Sales": # by other slots request.ask.slots = 1 market.requestExpiry[request.id] = expiry + market.requestEnds[request.id] = + getTime().toUnix() + cast[int64](request.ask.duration) let origSize = availability.freeSize sales.onStore = proc( @@ -621,10 +655,28 @@ asyncchecksuite "Sales": test "deletes inactive reservations on load": createAvailability() + let validUntil = getTime().toUnix() + 30.SecondsSince1970 discard await reservations.createReservation( - availability.id, 100.uint64, RequestId.example, 0.uint64, UInt256.example + availability.id, 100.uint64, RequestId.example, 0.uint64, UInt256.example, + validUntil, ) check (await reservations.all(Reservation)).get.len == 1 await sales.load() check (await reservations.all(Reservation)).get.len == 0 check getAvailability().freeSize == availability.freeSize # was restored + + test "update an availability fails when trying change the until date before an existing reservation": + let until = getTime().toUnix() + 300.SecondsSince1970 + createAvailability(until = until) + + market.requestEnds[request.id] = + getTime().toUnix() + cast[int64](request.ask.duration) + + await market.requestStorage(request) + await allowRequestToStart() + + availability.until = getTime().toUnix() + + let result = await reservations.update(availability) + check result.isErr + check result.error of UntilOutOfBoundsError diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim index 5d5f0cc2..d7ed3df2 100644 --- a/tests/integration/codexclient.nim +++ b/tests/integration/codexclient.nim @@ -294,6 +294,8 @@ proc postAvailabilityRaw*( client: CodexClient, totalSize, duration: uint64, minPricePerBytePerSecond, totalCollateral: UInt256, + enabled: ?bool = bool.none, + until: ?SecondsSince1970 = SecondsSince1970.none, ): Future[HttpClientResponseRef] {.async: (raises: [CancelledError, HttpError]).} = ## Post sales availability endpoint ## @@ -304,18 +306,27 @@ proc postAvailabilityRaw*( "duration": duration, "minPricePerBytePerSecond": minPricePerBytePerSecond, "totalCollateral": totalCollateral, + "enabled": enabled, + "until": until, } - return await client.post(url, $json) proc postAvailability*( client: CodexClient, totalSize, duration: uint64, minPricePerBytePerSecond, totalCollateral: UInt256, + enabled: ?bool = bool.none, + until: ?SecondsSince1970 = SecondsSince1970.none, ): Future[?!Availability] {.async: (raises: [CancelledError, HttpError]).} = let response = await client.postAvailabilityRaw( - totalSize, duration, minPricePerBytePerSecond, totalCollateral + totalSize = totalSize, + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = totalCollateral, + enabled = enabled, + until = until, ) + let body = await response.body doAssert response.status == 201, @@ -327,6 +338,8 @@ proc patchAvailabilityRaw*( availabilityId: AvailabilityId, totalSize, freeSize, duration: ?uint64 = uint64.none, minPricePerBytePerSecond, totalCollateral: ?UInt256 = UInt256.none, + enabled: ?bool = bool.none, + until: ?SecondsSince1970 = SecondsSince1970.none, ): Future[HttpClientResponseRef] {. async: (raw: true, raises: [CancelledError, HttpError]) .} = @@ -352,6 +365,12 @@ proc patchAvailabilityRaw*( if totalCollateral =? totalCollateral: json["totalCollateral"] = %totalCollateral + if enabled =? enabled: + json["enabled"] = %enabled + + if until =? until: + json["until"] = %until + client.patch(url, $json) proc patchAvailability*( @@ -359,6 +378,8 @@ proc patchAvailability*( availabilityId: AvailabilityId, totalSize, duration: ?uint64 = uint64.none, minPricePerBytePerSecond, totalCollateral: ?UInt256 = UInt256.none, + enabled: ?bool = bool.none, + until: ?SecondsSince1970 = SecondsSince1970.none, ): Future[void] {.async: (raises: [CancelledError, HttpError]).} = let response = await client.patchAvailabilityRaw( availabilityId, @@ -366,8 +387,10 @@ proc patchAvailability*( duration = duration, minPricePerBytePerSecond = minPricePerBytePerSecond, totalCollateral = totalCollateral, + enabled = enabled, + until = until, ) - doAssert response.status == 200, "expected 200 OK, got " & $response.status + doAssert response.status == 204, "expected No Content, got " & $response.status proc getAvailabilities*( client: CodexClient diff --git a/tests/integration/testmarketplace.nim b/tests/integration/testmarketplace.nim index dee3645e..40f394e0 100644 --- a/tests/integration/testmarketplace.nim +++ b/tests/integration/testmarketplace.nim @@ -1,3 +1,5 @@ +import std/times +import std/httpclient import ../examples import ../contracts/time import ../contracts/deployment diff --git a/tests/integration/testproofs.nim b/tests/integration/testproofs.nim index b0ede765..c49b7b6f 100644 --- a/tests/integration/testproofs.nim +++ b/tests/integration/testproofs.nim @@ -275,7 +275,9 @@ marketplacesuite "Simulate invalid proofs": # totalSize=slotSize, # should match 1 slot only # duration=totalPeriods.periods.u256, # minPricePerBytePerSecond=minPricePerBytePerSecond, - # totalCollateral=slotSize * minPricePerBytePerSecond + # totalCollateral=slotSize * minPricePerBytePerSecond, + # enabled = true.some, + # until = 0.SecondsSince1970.some, # ) # let cid = client0.upload(data).get diff --git a/tests/integration/testrestapi.nim b/tests/integration/testrestapi.nim index 415658c1..57e38b39 100644 --- a/tests/integration/testrestapi.nim +++ b/tests/integration/testrestapi.nim @@ -35,6 +35,7 @@ twonodessuite "REST API": duration = 2.uint64, minPricePerBytePerSecond = minPricePerBytePerSecond, totalCollateral = totalCollateral, + enabled = true.some, ) ).get let space = (await client1.space()).tryGet() diff --git a/tests/integration/testrestapivalidation.nim b/tests/integration/testrestapivalidation.nim index 00caefdd..adeffa77 100644 --- a/tests/integration/testrestapivalidation.nim +++ b/tests/integration/testrestapivalidation.nim @@ -364,5 +364,21 @@ asyncchecksuite "Rest API validation": check responseBefore.status == 422 check (await responseBefore.body) == "Collateral per byte must be greater than zero" + test "creating availability fails when until is negative": + let totalSize = 12.uint64 + let minPricePerBytePerSecond = 1.u256 + let totalCollateral = totalSize.u256 * minPricePerBytePerSecond + let response = await client.postAvailabilityRaw( + totalSize = totalSize, + duration = 2.uint64, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = totalCollateral, + until = -1.SecondsSince1970.some, + ) + + check: + response.status == 422 + (await response.body) == "Cannot set until to a negative value" + waitFor node.stop() node.removeDataDir() diff --git a/tests/integration/testsales.nim b/tests/integration/testsales.nim index 5e9b26df..ef999990 100644 --- a/tests/integration/testsales.nim +++ b/tests/integration/testsales.nim @@ -1,4 +1,5 @@ import std/httpclient +import std/times import pkg/codex/contracts from pkg/codex/stores/repostore/types import DefaultQuotaBytes import ./twonodes @@ -17,22 +18,14 @@ proc findItem[T](items: seq[T], item: T): ?!T = multinodesuite "Sales": let salesConfig = NodeConfigs( - clients: CodexConfigs - .init(nodes = 1) - .withLogFile() - .withLogTopics( - "node", "marketplace", "sales", "reservations", "node", "proving", "clock" - ).some, - providers: CodexConfigs - .init(nodes = 1) - .withLogFile() - .withLogTopics( - "node", "marketplace", "sales", "reservations", "node", "proving", "clock" - ).some, + clients: CodexConfigs.init(nodes = 1).some, + providers: CodexConfigs.init(nodes = 1) + # .debug() # uncomment to enable console log output + # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + # .withLogTopics("node", "marketplace", "sales", "reservations", "node", "proving", "clock") + .some, ) - let minPricePerBytePerSecond = 1.u256 - var host: CodexClient var client: CodexClient @@ -80,11 +73,15 @@ multinodesuite "Sales": ) ).get + var until = getTime().toUnix() + await host.patchAvailability( availability.id, duration = 100.uint64.some, minPricePerBytePerSecond = 2.u256.some, totalCollateral = 200.u256.some, + enabled = false.some, + until = until.some, ) let updatedAvailability = @@ -94,6 +91,8 @@ multinodesuite "Sales": check updatedAvailability.totalCollateral == 200 check updatedAvailability.totalSize == 140000.uint64 check updatedAvailability.freeSize == 140000.uint64 + check updatedAvailability.enabled == false + check updatedAvailability.until == until test "updating availability - updating totalSize", salesConfig: let availability = ( @@ -105,6 +104,7 @@ multinodesuite "Sales": ) ).get await host.patchAvailability(availability.id, totalSize = 100000.uint64.some) + let updatedAvailability = ((await host.getAvailabilities()).get).findItem(availability).get check updatedAvailability.totalSize == 100000 @@ -165,3 +165,72 @@ multinodesuite "Sales": ((await host.getAvailabilities()).get).findItem(availability).get check newUpdatedAvailability.totalSize == originalSize + 20000 check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000 + + test "updating availability fails with until negative", salesConfig: + let availability = ( + await host.postAvailability( + totalSize = 140000.uint64, + duration = 200.uint64, + minPricePerBytePerSecond = 3.u256, + totalCollateral = 300.u256, + ) + ).get + + let response = + await host.patchAvailabilityRaw(availability.id, until = -1.SecondsSince1970.some) + + check: + (await response.body) == "Cannot set until to a negative value" + + test "returns an error when trying to update the until date before an existing a request is finished", + salesConfig: + let size = 0xFFFFFF.uint64 + let data = await RandomChunker.example(blocks = 8) + let duration = 20 * 60.uint64 + let minPricePerBytePerSecond = 3.u256 + let collateralPerByte = 1.u256 + let ecNodes = 3.uint + let ecTolerance = 1.uint + + # host makes storage available + let availability = ( + await host.postAvailability( + totalSize = size, + duration = duration, + minPricePerBytePerSecond = minPricePerBytePerSecond, + totalCollateral = size.u256 * minPricePerBytePerSecond, + ) + ).get + + # client requests storage + let cid = (await client.upload(data)).get + let id = ( + await client.requestStorage( + cid, + duration = duration, + pricePerBytePerSecond = minPricePerBytePerSecond, + proofProbability = 3.u256, + expiry = 10 * 60.uint64, + collateralPerByte = collateralPerByte, + nodes = ecNodes, + tolerance = ecTolerance, + ) + ).get + + check eventually( + await client.purchaseStateIs(id, "started"), timeout = 10 * 60 * 1000 + ) + let purchase = (await client.getPurchase(id)).get + check purchase.error == none string + + let unixNow = getTime().toUnix() + let until = unixNow + 1.SecondsSince1970 + + let response = await host.patchAvailabilityRaw( + availabilityId = availability.id, until = until.some + ) + + check: + response.status == 422 + (await response.body) == + "Until parameter must be greater or equal to the longest currently hosted slot" diff --git a/vendor/nim-datastore b/vendor/nim-datastore index d67860ad..5778e373 160000 --- a/vendor/nim-datastore +++ b/vendor/nim-datastore @@ -1 +1 @@ -Subproject commit d67860add63fd23cdacde1d3da8f4739c2660c2d +Subproject commit 5778e373fa97286f389e0aef61f1e8f30a934dab