From ada1a6f8653def2356483e5d651fef166ec70424 Mon Sep 17 00:00:00 2001 From: Arnaud Date: Fri, 21 Feb 2025 17:03:09 +0100 Subject: [PATCH] Move until validation to reservations module --- codex/rest/api.nim | 50 +----- codex/sales/reservations.nim | 218 +++++++++++++++-------- codex/sales/states/preparing.nim | 2 +- tests/codex/helpers/mockreservations.nim | 2 + tests/codex/sales/testreservations.nim | 36 +++- tests/codex/sales/testsales.nim | 15 +- tests/integration/codexclient.nim | 3 +- tests/integration/testmarketplace.nim | 11 +- tests/integration/testsales.nim | 24 +-- 9 files changed, 215 insertions(+), 146 deletions(-) diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 2def25f8..2e1c3fb5 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -49,32 +49,6 @@ logScope: declareCounter(codex_api_uploads, "codex API uploads") declareCounter(codex_api_downloads, "codex API downloads") -proc getLongestRequestEnd( - node: CodexNodeRef, availabilityId: AvailabilityId -): Future[?!SecondsSince1970] {.async: (raises: []).} = - without contracts =? node.contracts.host: - return failure("Sales unavailable") - - let - reservations = contracts.sales.context.reservations - market = contracts.sales.context.market - - try: - without allReservations =? await reservations.all(Reservation, availabilityId): - return failure("Cannot retrieve the reservations") - - let requestEnds = allReservations.mapIt(await market.getRequestEnd(it.requestId)) - - if len(requestEnds) == 0: - return success(0.SecondsSince1970) - - return success(requestEnds.max) - except CancelledError as err: - raise err - except CatchableError as err: - error "Error when trying to get longest request end", error = err.msg - return failure("Cannot retrieve the request dates: " & err.msg) - proc validate(pattern: string, value: string): int {.gcsafe, raises: [Defect].} = 0 @@ -547,6 +521,7 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) = ## tokens) to be matched against the request's pricePerBytePerSecond ## totalCollateral - total collateral (in amount of ## tokens) that can be distributed among matching requests + try: without contracts =? node.contracts.host: return RestApiResponse.error(Http503, "Persistence is not enabled") @@ -595,29 +570,20 @@ proc initSalesApi(node: CodexNodeRef, router: var RestRouter) = availability.totalCollateral = totalCollateral if until =? restAv.until: - if until < 0: - return RestApiResponse.error( - Http400, "Until parameter must be greater or equal 0. Got: " & $until - ) - - without longestRequestEnd =? (await node.getLongestRequestEnd(id)).catch, err: - return RestApiResponse.error(Http500, err.msg) - - if until > 0 and until < longestRequestEnd.get: - return RestApiResponse.error( - Http400, - "Until parameter must be greater or equal the current longest request.", - ) - availability.until = until if enabled =? restAv.enabled: availability.enabled = enabled if err =? (await reservations.update(availability)).errorOption: - return RestApiResponse.error(Http500, err.msg) + if err of CancelledError: + raise err + if err of UntilOutOfBoundsError: + return RestApiResponse.error(Http422, err.msg) + else: + return RestApiResponse.error(Http500, err.msg) - return RestApiResponse.response(Http200) + return RestApiResponse.response(Http204) except CatchableError as exc: trace "Excepting processing request", exc = exc.msg return RestApiResponse.error(Http500) diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index cd272924..de4b2788 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -84,6 +84,7 @@ type size* {.serialize.}: UInt256 requestId* {.serialize.}: RequestId slotIndex* {.serialize.}: UInt256 + validUntil* {.serialize.}: SecondsSince1970 Reservations* = ref object of RootObj availabilityLock: AsyncLock @@ -110,13 +111,20 @@ type SerializationError* = object of ReservationsError UpdateFailedError* = object of ReservationsError BytesOutOfBoundsError* = object of ReservationsError + UntilOutOfBoundsError* = object of ReservationsError const SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module ReservationsKey = (SalesKey / "reservations").tryGet proc hash*(x: AvailabilityId): Hash {.borrow.} -proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} +proc all*( + self: Reservations, T: type SomeStorableObject +): Future[?!seq[T]] {.async: (raises: [CancelledError]).} + +proc all*( + self: Reservations, T: type SomeStorableObject, availabilityId: AvailabilityId +): Future[?!seq[T]] {.async: (raises: [CancelledError]).} template withLock(lock, body) = try: @@ -166,6 +174,7 @@ proc init*( size: UInt256, requestId: RequestId, slotIndex: UInt256, + validUntil: SecondsSince1970, ): Reservation = var id: array[32, byte] doAssert randomBytes(id) == 32 @@ -175,6 +184,7 @@ proc init*( size: size, requestId: requestId, slotIndex: slotIndex, + validUntil: validUntil, ) func toArray(id: SomeStorableId): array[32, byte] = @@ -233,6 +243,10 @@ proc exists*(self: Reservations, key: Key): Future[bool] {.async.} = let exists = await self.repo.metaDs.ds.contains(key) return exists +iterator items(self: StorableIter): Future[?seq[byte]] = + while not self.finished: + yield self.next() + proc getImpl(self: Reservations, key: Key): Future[?!seq[byte]] {.async.} = if not await self.exists(key): let err = @@ -268,57 +282,91 @@ proc updateImpl(self: Reservations, obj: SomeStorableObject): Future[?!void] {.a proc updateAvailability( self: Reservations, obj: Availability -): Future[?!void] {.async.} = +): Future[?!void] {.async: (raises: [CancelledError]).} = logScope: availabilityId = obj.id + if obj.until < 0: + let error = + newException(UntilOutOfBoundsError, "Cannot set until to a negative value") + return failure(error) + without key =? obj.key, error: return failure(error) - without oldAvailability =? await self.get(key, Availability), err: - if err of NotExistsError: - trace "Creating new Availability" - let res = await self.updateImpl(obj) - # inform subscribers that Availability has been added - if obj.enabled and onAvailabilityAdded =? self.onAvailabilityAdded: + try: + without oldAvailability =? await self.get(key, Availability), err: + if err of NotExistsError: + trace "Creating new Availability" + let res = await self.updateImpl(obj) + # inform subscribers that Availability has been added + if onAvailabilityAdded =? self.onAvailabilityAdded: + await onAvailabilityAdded(obj) + return res + else: + return failure(err) + + if obj.until > 0: + without allReservations =? await self.all(Reservation, obj.id): + let error = newException( + GetFailedError, + "Until parameter must be greater or equal the current longest request", + ) + return failure(error) + + let requestEnds = allReservations.mapIt(it.validUntil) + + if requestEnds.len > 0 and requestEnds.max > obj.until: + let error = newException( + UntilOutOfBoundsError, + "Until parameter must be greater or equal the current longest request", + ) + return failure(error) + + # Sizing of the availability changed, we need to adjust the repo reservation accordingly + if oldAvailability.totalSize != obj.totalSize: + trace "totalSize changed, updating repo reservation" + if oldAvailability.totalSize < obj.totalSize: # storage added + if reserveErr =? ( + await self.repo.reserve( + (obj.totalSize - oldAvailability.totalSize).truncate(uint).NBytes + ) + ).errorOption: + return failure(reserveErr.toErr(ReserveFailedError)) + elif oldAvailability.totalSize > obj.totalSize: # storage removed + if reserveErr =? ( + await self.repo.release( + (oldAvailability.totalSize - obj.totalSize).truncate(uint).NBytes + ) + ).errorOption: + return failure(reserveErr.toErr(ReleaseFailedError)) + + let res = await self.updateImpl(obj) + + if oldAvailability.freeSize < obj.freeSize: # availability added + # inform subscribers that Availability has been modified (with increased + # size) + if onAvailabilityAdded =? self.onAvailabilityAdded: await onAvailabilityAdded(obj) - return res - else: - return failure(err) - - # Sizing of the availability changed, we need to adjust the repo reservation accordingly - if oldAvailability.totalSize != obj.totalSize: - trace "totalSize changed, updating repo reservation" - if oldAvailability.totalSize < obj.totalSize: # storage added - if reserveErr =? ( - await self.repo.reserve( - (obj.totalSize - oldAvailability.totalSize).truncate(uint).NBytes - ) - ).errorOption: - return failure(reserveErr.toErr(ReserveFailedError)) - elif oldAvailability.totalSize > obj.totalSize: # storage removed - if reserveErr =? ( - await self.repo.release( - (oldAvailability.totalSize - obj.totalSize).truncate(uint).NBytes - ) - ).errorOption: - return failure(reserveErr.toErr(ReleaseFailedError)) - - let res = await self.updateImpl(obj) - - if obj.enabled and oldAvailability.freeSize < obj.freeSize: # availability added - # inform subscribers that Availability has been modified (with increased - # size) - if onAvailabilityAdded =? self.onAvailabilityAdded: - await onAvailabilityAdded(obj) - return res + return res + except CancelledError as e: + raise e + except CatchableError as e: + error "Error when trying to update availability", error = e.msg + return failure(e) proc update*(self: Reservations, obj: Reservation): Future[?!void] {.async.} = return await self.updateImpl(obj) -proc update*(self: Reservations, obj: Availability): Future[?!void] {.async.} = - withLock(self.availabilityLock): - return await self.updateAvailability(obj) +proc update*( + self: Reservations, obj: Availability +): Future[?!void] {.async: (raises: [CancelledError]).} = + try: + withLock(self.availabilityLock): + return await self.updateAvailability(obj) + except AsyncLockError as e: + error "Lock error when trying to update the availability", err = e.msg + return failure(e) proc delete(self: Reservations, key: Key): Future[?!void] {.async.} = trace "deleting object", key @@ -390,6 +438,11 @@ proc createAvailability*( trace "creating availability", size, duration, minPricePerBytePerSecond, totalCollateral, enabled, until + if until < 0: + let error = + newException(UntilOutOfBoundsError, "Cannot set until to a negative value") + return failure(error) + let availability = Availability.init( size, size, duration, minPricePerBytePerSecond, totalCollateral, enabled, until ) @@ -416,6 +469,7 @@ method createReservation*( requestId: RequestId, slotIndex: UInt256, collateralPerByte: UInt256, + duration: UInt256, ): Future[?!Reservation] {.async, base.} = withLock(self.availabilityLock): without availabilityKey =? availabilityId.key, error: @@ -432,9 +486,13 @@ method createReservation*( ) return failure(error) - trace "Creating reservation", availabilityId, slotSize, requestId, slotIndex + let validUntil = + times.now().utc().toTime().toUnix() + duration.truncate(SecondsSince1970) + trace "Creating reservation", + availabilityId, slotSize, requestId, slotIndex, validUntil = validUntil - let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex) + let reservation = + Reservation.init(availabilityId, slotSize, requestId, slotIndex, validUntil) if createResErr =? (await self.update(reservation)).errorOption: return failure(createResErr) @@ -563,13 +621,9 @@ proc release*( return success() -iterator items(self: StorableIter): Future[?seq[byte]] = - while not self.finished: - yield self.next() - proc storables( self: Reservations, T: type SomeStorableObject, queryKey: Key = ReservationsKey -): Future[?!StorableIter] {.async.} = +): Future[?!StorableIter] {.async: (raises: [CancelledError]).} = var iter = StorableIter() let query = Query.init(queryKey) when T is Availability: @@ -583,54 +637,68 @@ proc storables( else: raiseAssert "unknown type" - without results =? await self.repo.metaDs.ds.query(query), error: - return failure(error) + try: + without results =? await self.repo.metaDs.ds.query(query), error: + return failure(error) - # /sales/reservations - proc next(): Future[?seq[byte]] {.async.} = - await idleAsync() - iter.finished = results.finished - if not results.finished and res =? (await results.next()) and res.data.len > 0 and - key =? res.key and key.namespaces.len == defaultKey.namespaces.len: - return some res.data + # /sales/reservations + proc next(): Future[?seq[byte]] {.async.} = + await idleAsync() + iter.finished = results.finished + if not results.finished and res =? (await results.next()) and res.data.len > 0 and + key =? res.key and key.namespaces.len == defaultKey.namespaces.len: + return some res.data - return none seq[byte] + return none seq[byte] - proc dispose(): Future[?!void] {.async.} = - return await results.dispose() + proc dispose(): Future[?!void] {.async.} = + return await results.dispose() - iter.next = next - iter.dispose = dispose - return success iter + iter.next = next + iter.dispose = dispose + return success iter + except CancelledError as e: + raise e + except CatchableError as e: + error "Cannot retrieve the storables from the datastore", error = e.msg + return failure(e) proc allImpl( self: Reservations, T: type SomeStorableObject, queryKey: Key = ReservationsKey -): Future[?!seq[T]] {.async.} = +): Future[?!seq[T]] {.async: (raises: [CancelledError]).} = var ret: seq[T] = @[] without storables =? (await self.storables(T, queryKey)), error: return failure(error) - for storable in storables.items: - without bytes =? (await storable): - continue + try: + for storable in storables.items: + without bytes =? (await storable): + continue - without obj =? T.fromJson(bytes), error: - error "json deserialization error", - json = string.fromBytes(bytes), error = error.msg - continue + without obj =? T.fromJson(bytes), error: + error "json deserialization error", + json = string.fromBytes(bytes), error = error.msg + continue - ret.add obj + ret.add obj + except CancelledError as e: + raise e + except CatchableError as e: + error "error when retrieving storable", error = e.msg + return failure(e) return success(ret) -proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} = +proc all*( + self: Reservations, T: type SomeStorableObject +): Future[?!seq[T]] {.async: (raises: [CancelledError]).} = return await self.allImpl(T) proc all*( self: Reservations, T: type SomeStorableObject, availabilityId: AvailabilityId -): Future[?!seq[T]] {.async.} = - without key =? (ReservationsKey / $availabilityId): +): Future[?!seq[T]] {.async: (raises: [CancelledError]).} = + without key =? key(availabilityId): return failure("no key") return await self.allImpl(T, key) diff --git a/codex/sales/states/preparing.nim b/codex/sales/states/preparing.nim index bdde1249..b21ff163 100644 --- a/codex/sales/states/preparing.nim +++ b/codex/sales/states/preparing.nim @@ -79,7 +79,7 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} = without reservation =? await reservations.createReservation( availability.id, request.ask.slotSize, request.id, data.slotIndex, - request.ask.collateralPerByte, + request.ask.collateralPerByte, request.ask.duration, ), error: trace "Creation of reservation failed" # Race condition: diff --git a/tests/codex/helpers/mockreservations.nim b/tests/codex/helpers/mockreservations.nim index 060790a8..d3ec91b6 100644 --- a/tests/codex/helpers/mockreservations.nim +++ b/tests/codex/helpers/mockreservations.nim @@ -28,6 +28,7 @@ method createReservation*( requestId: RequestId, slotIndex: UInt256, collateralPerByte: UInt256, + duration: UInt256, ): Future[?!Reservation] {.async.} = if self.createReservationThrowBytesOutOfBoundsError: let error = newException( @@ -45,4 +46,5 @@ method createReservation*( requestId, slotIndex, collateralPerByte, + duration, ) diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index 0dee52ae..4f136779 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -53,7 +53,7 @@ asyncchecksuite "Reservations module": proc createReservation(availability: Availability): Reservation = let size = rand(1 ..< availability.freeSize.truncate(int)) let reservation = waitFor reservations.createReservation( - availability.id, size.u256, RequestId.example, UInt256.example, 1.u256 + availability.id, size.u256, RequestId.example, UInt256.example, 1.u256, 30.u256 ) return reservation.get @@ -135,7 +135,8 @@ asyncchecksuite "Reservations module": test "cannot create reservation with non-existant availability": let availability = Availability.example let created = await reservations.createReservation( - availability.id, UInt256.example, RequestId.example, UInt256.example, 1.u256 + availability.id, UInt256.example, RequestId.example, UInt256.example, 1.u256, + 30.u256, ) check created.isErr check created.error of NotExistsError @@ -148,6 +149,7 @@ asyncchecksuite "Reservations module": RequestId.example, UInt256.example, UInt256.example, + UInt256.example, ) check created.isErr check created.error of BytesOutOfBoundsError @@ -161,11 +163,12 @@ asyncchecksuite "Reservations module": RequestId.example, UInt256.example, UInt256.example, + UInt256.example, ) let two = reservations.createReservation( availability.id, availability.totalSize, RequestId.example, UInt256.example, - UInt256.example, + UInt256.example, UInt256.example, ) let oneResult = await one @@ -280,6 +283,33 @@ asyncchecksuite "Reservations module": check availability.enabled == false check availability.until == until + test "create an availability fails when trying set until with a negative value": + let totalSize = rand(100000 .. 200000).u256 + let example = Availability.example(collateralPerByte) + let totalCollateral = totalSize * collateralPerByte + + let result = await reservations.createAvailability( + totalSize, + example.duration, + example.minPricePerBytePerSecond, + totalCollateral, + enabled = true, + until = -1.SecondsSince1970, + ) + + check result.isErr + check result.error of UntilOutOfBoundsError + + test "update an availability fails when trying set until with a negative value": + let until = getTime().toUnix() + let availability = createAvailability(until = until) + + availability.until = -1 + + let result = await reservations.update(availability) + check result.isErr + check result.error of UntilOutOfBoundsError + test "reservation can be partially released": let availability = createAvailability() let reservation = createReservation(availability) diff --git a/tests/codex/sales/testsales.nim b/tests/codex/sales/testsales.nim index 54415d02..62c6834b 100644 --- a/tests/codex/sales/testsales.nim +++ b/tests/codex/sales/testsales.nim @@ -657,9 +657,22 @@ asyncchecksuite "Sales": test "deletes inactive reservations on load": createAvailability() discard await reservations.createReservation( - availability.id, 100.u256, RequestId.example, UInt256.example, UInt256.example + availability.id, 100.u256, RequestId.example, UInt256.example, UInt256.example, + UInt256.example, ) check (await reservations.all(Reservation)).get.len == 1 await sales.load() check (await reservations.all(Reservation)).get.len == 0 check getAvailability().freeSize == availability.freeSize # was restored + + test "update an availability fails when trying change the until date before an existing reservation": + let until = getTime().toUnix() + 300.SecondsSince1970 + createAvailability(until = until) + await market.requestStorage(request) + await allowRequestToStart() + + availability.until = getTime().toUnix() + + let result = await reservations.update(availability) + check result.isErr + check result.error of UntilOutOfBoundsError diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim index 316660de..195f0ee9 100644 --- a/tests/integration/codexclient.nim +++ b/tests/integration/codexclient.nim @@ -278,7 +278,8 @@ proc patchAvailability*( enabled = enabled, until = until, ) - doAssert response.status == "200 OK", "expected 200 OK, got " & response.status + doAssert response.status == "204 No Content", + "expected 200 OK, got " & response.status proc getAvailabilities*(client: CodexClient): ?!seq[Availability] = ## Call sales availability REST endpoint diff --git a/tests/integration/testmarketplace.nim b/tests/integration/testmarketplace.nim index f40aba7e..9cf3803f 100644 --- a/tests/integration/testmarketplace.nim +++ b/tests/integration/testmarketplace.nim @@ -161,16 +161,15 @@ marketplacesuite "Marketplace": check purchase.error == none string let unixNow = getTime().toUnix() - let until = unixNow + 1 + let until = unixNow + 1.SecondsSince1970 - let response = host.patchAvailabilityRaw( - availabilityId = availability.id, until = cast[SecondsSince1970](until).some - ) + let response = + host.patchAvailabilityRaw(availabilityId = availability.id, until = until.some) check: - response.status == "400 Bad Request" + response.status == "422 Unprocessable Entity" response.body == - "Until parameter must be greater or equal the current longest request." + "Until parameter must be greater or equal the current longest request" marketplacesuite "Marketplace payouts": const minPricePerBytePerSecond = 1.u256 diff --git a/tests/integration/testsales.nim b/tests/integration/testsales.nim index 937c6365..31f483d6 100644 --- a/tests/integration/testsales.nim +++ b/tests/integration/testsales.nim @@ -18,7 +18,11 @@ proc findItem[T](items: seq[T], item: T): ?!T = multinodesuite "Sales": let salesConfig = NodeConfigs( clients: CodexConfigs.init(nodes = 1).some, - providers: CodexConfigs.init(nodes = 1).some, + providers: CodexConfigs.init(nodes = 1) + # .debug() # uncomment to enable console log output + # .withLogFile() # uncomment to output log file to tests/integration/logs/ //_.log + # .withLogTopics("node", "marketplace", "sales", "reservations", "node", "proving", "clock") + .some, ) var host: CodexClient @@ -96,6 +100,8 @@ multinodesuite "Sales": until = until.some, ) + host.restart() + let updatedAvailability = (host.getAvailabilities().get).findItem(availability).get check updatedAvailability.duration == 100 check updatedAvailability.minPricePerBytePerSecond == 2 @@ -172,19 +178,3 @@ multinodesuite "Sales": (host.getAvailabilities().get).findItem(availability).get check newUpdatedAvailability.totalSize == originalSize + 20000 check newUpdatedAvailability.freeSize - updatedAvailability.freeSize == 20000 - - test "updating availability fails with until negative", salesConfig: - let availability = host.postAvailability( - totalSize = 140000.u256, - duration = 200.u256, - minPricePerBytePerSecond = 3.u256, - totalCollateral = 300.u256, - ).get - - let response = host.patchAvailabilityRaw( - availability.id, until = cast[SecondsSince1970](-1).some - ) - - check: - response.status == "400 Bad Request" - response.body == "Until parameter must be greater or equal 0. Got: -1"