From 570a1f7b676b4d5faddd05376e287646566279e5 Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Fri, 29 Sep 2023 14:33:08 +1000 Subject: [PATCH] [marketplace] Availability improvements (#535) ## Problem When Availabilities are created, the amount of bytes in the Availability are reserved in the repo, so those bytes on disk cannot be written to otherwise. When a request for storage is received by a node, if a previously created Availability is matched, an attempt will be made to fill a slot in the request (more accurately, the request's slots are added to the SlotQueue, and eventually those slots will be processed). During download, bytes that were reserved for the Availability were released (as they were written to disk). To prevent more bytes from being released than were reserved in the Availability, the Availability was marked as used during the download, so that no other requests would match the Availability, and therefore no new downloads (and byte releases) would begin. The unfortunate downside to this, is that the number of Availabilities a node has determines the download concurrency capacity. If, for example, a node creates a single Availability that covers all available disk space the operator is willing to use, that single Availability would mean that only one download could occur at a time, meaning the node could potentially miss out on storage opportunities. ## Solution To alleviate the concurrency issue, each time a slot is processed, a Reservation is created, which takes size (aka reserved bytes) away from the Availability and stores them in the Reservation object. This can be done as many times as needed as long as there are enough bytes remaining in the Availability. Therefore, concurrent downloads are no longer limited by the number of Availabilities. Instead, they would more likely be limited to the SlotQueue's `maxWorkers`. From a database design perspective, an Availability has zero or more Reservations. Reservations are persisted in the RepoStore's metadata, along with Availabilities. The metadata store key path for Reservations is ` meta / sales / reservations / / `, while Availabilities are stored one level up, eg `meta / sales / reservations / `, allowing all Reservations for an Availability to be queried (this is not currently needed, but may be useful when work to restore Availability size is implemented, more on this later). ### Lifecycle When a reservation is created, its size is deducted from the Availability, and when a reservation is deleted, any remaining size (bytes not written to disk) is returned to the Availability. If the request finishes, is cancelled (expired), or an error occurs, the Reservation is deleted (and any undownloaded bytes returned to the Availability). In addition, when the Sales module starts, any Reservations that are not actively being used in a filled slot, are deleted. Having a Reservation persisted until after a storage request is completed, will allow for the originally set Availability size to be reclaimed once a request contract has been completed. This is a feature that is yet to be implemented, however the work in this PR is a step in the direction towards enabling this. ### Unknowns Reservation size is determined by the `StorageAsk.slotSize`. If during download, more bytes than `slotSize` are attempted to be downloaded than this, then the Reservation update will fail, and the state machine will move to a `SaleErrored` state, deleting the Reservation. This will likely prevent the slot from being filled. ### Notes Based on #514 --- codex/node.nim | 8 +- codex/rest/api.nim | 25 +- codex/sales.nim | 102 +++++- codex/sales/reservations.nim | 466 ++++++++++++++++--------- codex/sales/salesagent.nim | 8 + codex/sales/salescontext.nim | 12 +- codex/sales/salesdata.nim | 1 + codex/sales/states/downloading.nim | 27 +- codex/sales/states/errored.nim | 5 +- codex/sales/states/filled.nim | 9 +- codex/sales/states/finished.nim | 3 +- codex/sales/states/ignored.nim | 3 +- codex/sales/states/preparing.nim | 32 +- codex/utils/asyncstatemachine.nim | 2 + codex/utils/exceptions.nim | 7 + codex/utils/json.nim | 10 + codex/utils/trackedfutures.nim | 4 +- tests/codex/examples.nim | 8 + tests/codex/sales/helpers.nim | 17 - tests/codex/sales/testreservations.nim | 300 ++++++++++------ tests/codex/sales/testsales.nim | 225 +++++++----- tests/codex/utils/testjson.nim | 2 +- tests/examples.nim | 6 +- 23 files changed, 818 insertions(+), 464 deletions(-) create mode 100644 codex/utils/exceptions.nim delete mode 100644 tests/codex/sales/helpers.nim diff --git a/codex/node.nim b/codex/node.nim index d7f799c2..f78e2c91 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -284,7 +284,13 @@ proc requestStorage*( let request = StorageRequest( ask: StorageAsk( slots: nodes + tolerance, - slotSize: (encoded.blockSize.int * encoded.steps).u256, + # TODO: Specify slot-specific size (as below) once dispersal is + # implemented. The current implementation downloads the entire dataset, so + # it is currently set to be the size of the entire dataset. This is + # because the slotSize is used to determine the amount of bytes to reserve + # in a Reservations + # TODO: slotSize: (encoded.blockSize.int * encoded.steps).u256, + slotSize: (encoded.blockSize.int * encoded.blocks.len).u256, duration: duration, proofProbability: proofProbability, reward: reward, diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 57633851..6c3fce33 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -340,10 +340,10 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = without contracts =? node.contracts.host: return RestApiResponse.error(Http503, "Sales unavailable") - without unused =? (await contracts.sales.context.reservations.unused), err: + without avails =? (await contracts.sales.context.reservations.all(Availability)), err: return RestApiResponse.error(Http500, err.msg) - let json = %unused + let json = %avails return RestApiResponse.response($json, contentType="application/json") router.rawApi( @@ -365,20 +365,21 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = return RestApiResponse.error(Http400, error.msg) let reservations = contracts.sales.context.reservations - # assign id to availability via init - let availability = Availability.init(restAv.size, - restAv.duration, - restAv.minPrice, - restAv.maxCollateral) - if not reservations.hasAvailable(availability.size.truncate(uint)): + if not reservations.hasAvailable(restAv.size.truncate(uint)): return RestApiResponse.error(Http422, "Not enough storage quota") - if err =? (await reservations.reserve(availability)).errorOption: - return RestApiResponse.error(Http500, err.msg) + without availability =? ( + await reservations.createAvailability( + restAv.size, + restAv.duration, + restAv.minPrice, + restAv.maxCollateral) + ), error: + return RestApiResponse.error(Http500, error.msg) - let json = %availability - return RestApiResponse.response($json, contentType="application/json") + return RestApiResponse.response(availability.toJson, + contentType="application/json") router.api( MethodGet, diff --git a/codex/sales.nim b/codex/sales.nim index c81432ec..4db0ed52 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -1,7 +1,7 @@ import std/sequtils import std/sugar -import std/tables import pkg/questionable +import pkg/questionable/results import pkg/stint import pkg/chronicles import pkg/datastore @@ -101,8 +101,49 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} = if sales.running: sales.agents.keepItIf(it != agent) -proc filled(sales: Sales, - processing: Future[void]) = +proc cleanUp(sales: Sales, + agent: SalesAgent, + processing: Future[void]) {.async.} = + + let data = agent.data + + trace "cleaning up sales agent", + requestId = data.requestId, + slotIndex = data.slotIndex, + reservationId = data.reservation.?id |? ReservationId.default, + availabilityId = data.reservation.?availabilityId |? AvailabilityId.default + + # TODO: return bytes that were used in the request back to the availability + # as well, which will require removing the bytes from disk (perhaps via + # setting blockTTL to -1 and then running block maintainer?) + + # delete reservation and return reservation bytes back to the availability + if reservation =? data.reservation and + deleteErr =? (await sales.context.reservations.deleteReservation( + reservation.id, + reservation.availabilityId + )).errorOption: + error "failure deleting reservation", + error = deleteErr.msg, + reservationId = reservation.id, + availabilityId = reservation.availabilityId + + await sales.remove(agent) + + # signal back to the slot queue to cycle a worker + if not processing.isNil and not processing.finished(): + processing.complete() + +proc filled( + sales: Sales, + request: StorageRequest, + slotIndex: UInt256, + processing: Future[void]) = + + if onSale =? sales.context.onSale: + onSale(request, slotIndex) + + # signal back to the slot queue to cycle a worker if not processing.isNil and not processing.finished(): processing.complete() @@ -117,15 +158,39 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) = none StorageRequest ) - agent.context.onCleanUp = proc {.async.} = - await sales.remove(agent) + agent.onCleanUp = proc {.async.} = + await sales.cleanUp(agent, done) - agent.context.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) = - sales.filled(done) + agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) = + sales.filled(request, slotIndex, done) agent.start(SalePreparing()) sales.agents.add agent +proc deleteInactiveReservations(sales: Sales, activeSlots: seq[Slot]) {.async.} = + let reservations = sales.context.reservations + without reservs =? await reservations.all(Reservation): + info "no unused reservations found for deletion" + + let unused = reservs.filter(r => ( + let slotId = slotId(r.requestId, r.slotIndex) + not activeSlots.any(slot => slot.id == slotId) + )) + info "found unused reservations for deletion", unused = unused.len + + for reservation in unused: + + logScope: + reservationId = reservation.id + availabilityId = reservation.availabilityId + + if err =? (await reservations.deleteReservation( + reservation.id, reservation.availabilityId + )).errorOption: + error "failed to delete unused reservation", error = err.msg + else: + trace "deleted unused reservation" + proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = let market = sales.context.market let slotIds = await market.mySlots() @@ -139,21 +204,26 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = return slots proc load*(sales: Sales) {.async.} = - let slots = await sales.mySlots() + let activeSlots = await sales.mySlots() - for slot in slots: + await sales.deleteInactiveReservations(activeSlots) + + for slot in activeSlots: let agent = newSalesAgent( sales.context, slot.request.id, slot.slotIndex, some slot.request) - agent.context.onCleanUp = proc {.async.} = await sales.remove(agent) + agent.onCleanUp = proc {.async.} = + let done = newFuture[void]("onCleanUp_Dummy") + await sales.cleanUp(agent, done) + await done # completed in sales.cleanUp agent.start(SaleUnknown()) sales.agents.add agent -proc onReservationAdded(sales: Sales, availability: Availability) {.async.} = +proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = ## Query last 256 blocks for new requests, adding them to the queue. `push` ## checks for availability before adding to the queue. If processed, the ## sales agent will check if the slot is free. @@ -162,9 +232,9 @@ proc onReservationAdded(sales: Sales, availability: Availability) {.async.} = let queue = context.slotQueue logScope: - topics = "marketplace sales onReservationAdded callback" + topics = "marketplace sales onAvailabilityAdded callback" - trace "reservation added, querying past storage requests to add to queue" + trace "availability added, querying past storage requests to add to queue" try: let events = await market.queryPastStorageRequests(256) @@ -384,10 +454,10 @@ proc startSlotQueue(sales: Sales) {.async.} = asyncSpawn slotQueue.start() - reservations.onReservationAdded = - proc(availability: Availability) {.async.} = - await sales.onReservationAdded(availability) + proc onAvailabilityAdded(availability: Availability) {.async.} = + await sales.onAvailabilityAdded(availability) + reservations.onAvailabilityAdded = onAvailabilityAdded proc subscribe(sales: Sales) {.async.} = await sales.subscribeRequested() diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index e03f5dd8..ec5d2777 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -6,56 +6,79 @@ ## at your option. ## This file may not be copied, modified, or distributed except according to ## those terms. +## +## +--------------------------------------+ +## | RESERVATION | +## +--------------------------------------+ |--------------------------------------| +## | AVAILABILITY | | ReservationId | id | PK | +## |--------------------------------------| |--------------------------------------| +## | AvailabilityId | id | PK |<-||-------o<-| AvailabilityId | availabilityId | FK | +## |--------------------------------------| |--------------------------------------| +## | UInt256 | size | | | UInt256 | size | | +## |--------------------------------------| |--------------------------------------| +## | UInt256 | duration | | | SlotId | slotId | | +## |--------------------------------------| +--------------------------------------+ +## | UInt256 | minPrice | | +## |--------------------------------------| +## | UInt256 | maxCollateral | | +## +--------------------------------------+ + +import pkg/upraises +push: {.upraises: [].} import std/typetraits - import pkg/chronos import pkg/chronicles -import pkg/upraises -import pkg/json_serialization -import pkg/json_serialization/std/options -import pkg/stint -import pkg/stew/byteutils +import pkg/datastore import pkg/nimcrypto import pkg/questionable import pkg/questionable/results -import ../utils/json - -push: {.upraises: [].} - -import pkg/datastore +import pkg/stint +import pkg/stew/byteutils import ../stores import ../contracts/requests +import ../utils/json export requests +export chronicles logScope: - topics = "reservations" + topics = "sales reservations" type AvailabilityId* = distinct array[32, byte] - Availability* = object + ReservationId* = distinct array[32, byte] + SomeStorableObject = Availability | Reservation + SomeStorableId = AvailabilityId | ReservationId + Availability* = ref object id* {.serialize.}: AvailabilityId size* {.serialize.}: UInt256 duration* {.serialize.}: UInt256 minPrice* {.serialize.}: UInt256 maxCollateral* {.serialize.}: UInt256 - used*: bool + Reservation* = ref object + id* {.serialize.}: ReservationId + availabilityId* {.serialize.}: AvailabilityId + size* {.serialize.}: UInt256 + requestId* {.serialize.}: RequestId + slotIndex* {.serialize.}: UInt256 Reservations* = ref object repo: RepoStore - onReservationAdded: ?OnReservationAdded - GetNext* = proc(): Future[?Availability] {.upraises: [], gcsafe, closure.} - OnReservationAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.} - AvailabilityIter* = ref object + onAvailabilityAdded: ?OnAvailabilityAdded + GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.} + OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.} + StorableIter* = ref object finished*: bool next*: GetNext - AvailabilityError* = object of CodexError - AvailabilityAlreadyExistsError* = object of AvailabilityError - AvailabilityReserveFailedError* = object of AvailabilityError - AvailabilityReleaseFailedError* = object of AvailabilityError - AvailabilityDeleteFailedError* = object of AvailabilityError - AvailabilityGetFailedError* = object of AvailabilityError - AvailabilityUpdateFailedError* = object of AvailabilityError + ReservationsError* = object of CodexError + ReserveFailedError* = object of ReservationsError + ReleaseFailedError* = object of ReservationsError + DeleteFailedError* = object of ReservationsError + GetFailedError* = object of ReservationsError + NotExistsError* = object of ReservationsError + SerializationError* = object of ReservationsError + UpdateFailedError* = object of ReservationsError + BytesOutOfBoundsError* = object of ReservationsError const SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module @@ -77,10 +100,29 @@ proc init*( doAssert randomBytes(id) == 32 Availability(id: AvailabilityId(id), size: size, duration: duration, minPrice: minPrice, maxCollateral: maxCollateral) -func toArray*(id: AvailabilityId): array[32, byte] = +proc init*( + _: type Reservation, + availabilityId: AvailabilityId, + size: UInt256, + requestId: RequestId, + slotIndex: UInt256 +): Reservation = + + var id: array[32, byte] + doAssert randomBytes(id) == 32 + Reservation(id: ReservationId(id), availabilityId: availabilityId, size: size, requestId: requestId, slotIndex: slotIndex) + +func toArray(id: SomeStorableId): array[32, byte] = array[32, byte](id) proc `==`*(x, y: AvailabilityId): bool {.borrow.} +proc `==`*(x, y: ReservationId): bool {.borrow.} +proc `==`*(x, y: Reservation): bool = + x.id == y.id and + x.availabilityId == y.availabilityId and + x.size == y.size and + x.requestId == y.requestId and + x.slotIndex == y.slotIndex proc `==`*(x, y: Availability): bool = x.id == y.id and x.size == y.size and @@ -88,9 +130,9 @@ proc `==`*(x, y: Availability): bool = x.maxCollateral == y.maxCollateral and x.minPrice == y.minPrice -proc `$`*(id: AvailabilityId): string = id.toArray.toHex +proc `$`*(id: SomeStorableId): string = id.toArray.toHex -proc toErr[E1: ref CatchableError, E2: AvailabilityError]( +proc toErr[E1: ref CatchableError, E2: ReservationsError]( e1: E1, _: type E2, msg: string = e1.msg): ref E2 = @@ -99,28 +141,30 @@ proc toErr[E1: ref CatchableError, E2: AvailabilityError]( proc writeValue*( writer: var JsonWriter, - value: AvailabilityId) {.upraises:[IOError].} = + value: SomeStorableId) {.upraises:[IOError].} = + ## used for chronicles' logs mixin writeValue - writer.writeValue value.toArray + writer.writeValue %value -proc readValue*[T: AvailabilityId]( - reader: var JsonReader, - value: var T) {.upraises: [SerializationError, IOError].} = +proc `onAvailabilityAdded=`*(self: Reservations, + onAvailabilityAdded: OnAvailabilityAdded) = + self.onAvailabilityAdded = some onAvailabilityAdded - mixin readValue - value = T reader.readValue(T.distinctBase) +func key*(id: AvailabilityId): ?!Key = + ## sales / reservations / + (ReservationsKey / $id) -proc `onReservationAdded=`*(self: Reservations, - onReservationAdded: OnReservationAdded) = - self.onReservationAdded = some onReservationAdded - -func key(id: AvailabilityId): ?!Key = - (ReservationsKey / id.toArray.toHex) +func key*(reservationId: ReservationId, availabilityId: AvailabilityId): ?!Key = + ## sales / reservations / / + (availabilityId.key / $reservationId) func key*(availability: Availability): ?!Key = return availability.id.key +func key*(reservation: Reservation): ?!Key = + return key(reservation.id, reservation.availabilityId) + func available*(self: Reservations): uint = self.repo.available func hasAvailable*(self: Reservations, bytes: uint): bool = @@ -128,84 +172,123 @@ func hasAvailable*(self: Reservations, bytes: uint): bool = proc exists*( self: Reservations, - id: AvailabilityId): Future[?!bool] {.async.} = - - without key =? id.key, err: - return failure(err) + key: Key): Future[bool] {.async.} = let exists = await self.repo.metaDs.contains(key) - return success(exists) + return exists + +proc getImpl( + self: Reservations, + key: Key): Future[?!seq[byte]] {.async.} = + + if exists =? (await self.exists(key)) and not exists: + let err = newException(NotExistsError, "object with key " & $key & " does not exist") + return failure(err) + + without serialized =? await self.repo.metaDs.get(key), error: + return failure(error.toErr(GetFailedError)) + + return success serialized proc get*( self: Reservations, - id: AvailabilityId): Future[?!Availability] {.async.} = + key: Key, + T: type SomeStorableObject): Future[?!T] {.async.} = - if exists =? (await self.exists(id)) and not exists: - let err = newException(AvailabilityGetFailedError, - "Availability does not exist") - return failure(err) + without serialized =? await self.getImpl(key), error: + return failure(error) - without key =? id.key, err: - return failure(err.toErr(AvailabilityGetFailedError)) + without obj =? T.fromJson(serialized), error: + return failure(error.toErr(SerializationError)) - without serialized =? await self.repo.metaDs.get(key), err: - return failure(err.toErr(AvailabilityGetFailedError)) - - without availability =? Json.decode(serialized, Availability).catch, err: - return failure(err.toErr(AvailabilityGetFailedError)) - - return success availability + return success obj proc update( self: Reservations, - availability: Availability): Future[?!void] {.async.} = + obj: SomeStorableObject): Future[?!void] {.async.} = - trace "updating availability", id = availability.id, size = availability.size, - used = availability.used + trace "updating " & $(obj.type), id = obj.id, size = obj.size - without key =? availability.key, err: - return failure(err) + without key =? obj.key, error: + return failure(error) if err =? (await self.repo.metaDs.put( key, - @(availability.toJson.toBytes))).errorOption: - return failure(err.toErr(AvailabilityUpdateFailedError)) + @(obj.toJson.toBytes) + )).errorOption: + return failure(err.toErr(UpdateFailedError)) return success() proc delete( self: Reservations, - id: AvailabilityId): Future[?!void] {.async.} = + key: Key): Future[?!void] {.async.} = - trace "deleting availability", id + trace "deleting object", key - without availability =? (await self.get(id)), err: - return failure(err) - - without key =? availability.key, err: - return failure(err) + if exists =? (await self.exists(key)) and not exists: + return success() if err =? (await self.repo.metaDs.delete(key)).errorOption: - return failure(err.toErr(AvailabilityDeleteFailedError)) + return failure(err.toErr(DeleteFailedError)) return success() -proc reserve*( +proc deleteReservation*( self: Reservations, - availability: Availability): Future[?!void] {.async.} = + reservationId: ReservationId, + availabilityId: AvailabilityId): Future[?!void] {.async.} = - if exists =? (await self.exists(availability.id)) and exists: - let err = newException(AvailabilityAlreadyExistsError, - "Availability already exists") - return failure(err) + logScope: + reservationId + availabilityId - without key =? availability.key, err: - return failure(err) + trace "deleting reservation" + without key =? key(reservationId, availabilityId), error: + return failure(error) + without reservation =? (await self.get(key, Reservation)), error: + if error of NotExistsError: + return success() + else: + return failure(error) + + if reservation.size > 0.u256: + trace "returning remaining reservation bytes to availability", + size = reservation.size + + without availabilityKey =? availabilityId.key, error: + return failure(error) + + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) + + availability.size += reservation.size + + if updateErr =? (await self.update(availability)).errorOption: + return failure(updateErr) + + if err =? (await self.repo.metaDs.delete(key)).errorOption: + return failure(err.toErr(DeleteFailedError)) + + return success() + +proc createAvailability*( + self: Reservations, + size: UInt256, + duration: UInt256, + minPrice: UInt256, + maxCollateral: UInt256): Future[?!Availability] {.async.} = + + trace "creating availability", size, duration, minPrice, maxCollateral + + let availability = Availability.init( + size, duration, minPrice, maxCollateral + ) let bytes = availability.size.truncate(uint) if reserveErr =? (await self.repo.reserve(bytes)).errorOption: - return failure(reserveErr.toErr(AvailabilityReserveFailedError)) + return failure(reserveErr.toErr(ReserveFailedError)) if updateErr =? (await self.update(availability)).errorOption: @@ -217,144 +300,192 @@ proc reserve*( return failure(updateErr) - if onReservationAdded =? self.onReservationAdded: + if onAvailabilityAdded =? self.onAvailabilityAdded: try: - await onReservationAdded(availability) + await onAvailabilityAdded(availability) except CatchableError as e: # we don't have any insight into types of errors that `onProcessSlot` can # throw because it is caller-defined - warn "Unknown error during 'onReservationAdded' callback", + warn "Unknown error during 'onAvailabilityAdded' callback", availabilityId = availability.id, error = e.msg - return success() + return success(availability) + +proc createReservation*( + self: Reservations, + availabilityId: AvailabilityId, + slotSize: UInt256, + requestId: RequestId, + slotIndex: UInt256 +): Future[?!Reservation] {.async.} = + + trace "creating reservation", availabilityId, slotSize, requestId, slotIndex + + let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex) + + without availabilityKey =? availabilityId.key, error: + return failure(error) + + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) + + if availability.size < slotSize: + let error = newException(BytesOutOfBoundsError, "trying to reserve an " & + "amount of bytes that is greater than the total size of the Availability") + return failure(error) + + if createResErr =? (await self.update(reservation)).errorOption: + return failure(createResErr) + + # reduce availability size by the slot size, which is now accounted for in + # the newly created Reservation + availability.size -= slotSize + + # update availability with reduced size + if updateErr =? (await self.update(availability)).errorOption: + + trace "rolling back reservation creation" + + without key =? reservation.key, keyError: + keyError.parent = updateErr + return failure(keyError) + + # rollback the reservation creation + if rollbackErr =? (await self.delete(key)).errorOption: + rollbackErr.parent = updateErr + return failure(rollbackErr) + + return failure(updateErr) + + return success(reservation) proc release*( self: Reservations, - id: AvailabilityId, + reservationId: ReservationId, + availabilityId: AvailabilityId, bytes: uint): Future[?!void] {.async.} = - trace "releasing bytes and updating availability", bytes, id + logScope: + topics = "release" + bytes + reservationId + availabilityId - without var availability =? (await self.get(id)), err: - return failure(err) + trace "releasing bytes and updating reservation" - without key =? id.key, err: - return failure(err) + without key =? key(reservationId, availabilityId), error: + return failure(error) + + without var reservation =? (await self.get(key, Reservation)), error: + return failure(error) + + if reservation.size < bytes.u256: + let error = newException(BytesOutOfBoundsError, + "trying to release an amount of bytes that is greater than the total " & + "size of the Reservation") + return failure(error) if releaseErr =? (await self.repo.release(bytes)).errorOption: - return failure(releaseErr.toErr(AvailabilityReleaseFailedError)) + return failure(releaseErr.toErr(ReleaseFailedError)) - availability.size = (availability.size.truncate(uint) - bytes).u256 + reservation.size -= bytes.u256 - template rollbackRelease(e: ref CatchableError) = + # persist partially used Reservation with updated size + if err =? (await self.update(reservation)).errorOption: + + # rollback release if an update error encountered trace "rolling back release" if rollbackErr =? (await self.repo.reserve(bytes)).errorOption: - rollbackErr.parent = e + rollbackErr.parent = err return failure(rollbackErr) - - # remove completely used availabilities - if availability.size == 0.u256: - if err =? (await self.delete(availability.id)).errorOption: - rollbackRelease(err) - return failure(err) - - return success() - - # persist partially used availability with updated size - if err =? (await self.update(availability)).errorOption: - rollbackRelease(err) return failure(err) return success() - -proc markUsed*( - self: Reservations, - id: AvailabilityId): Future[?!void] {.async.} = - - without var availability =? (await self.get(id)), err: - return failure(err) - - availability.used = true - let r = await self.update(availability) - if r.isOk: - trace "availability marked used", id = id.toArray.toHex - return r - -proc markUnused*( - self: Reservations, - id: AvailabilityId): Future[?!void] {.async.} = - - without var availability =? (await self.get(id)), err: - return failure(err) - - availability.used = false - let r = await self.update(availability) - if r.isOk: - trace "availability marked unused", id = id.toArray.toHex - return r - -iterator items*(self: AvailabilityIter): Future[?Availability] = +iterator items(self: StorableIter): Future[?seq[byte]] = while not self.finished: yield self.next() -proc availabilities*( - self: Reservations): Future[?!AvailabilityIter] {.async.} = +proc storables( + self: Reservations, + T: type SomeStorableObject +): Future[?!StorableIter] {.async.} = - var iter = AvailabilityIter() + var iter = StorableIter() let query = Query.init(ReservationsKey) + when T is Availability: + # should indicate key length of 4, but let the .key logic determine it + without defaultKey =? AvailabilityId.default.key, error: + return failure(error) + elif T is Reservation: + # should indicate key length of 5, but let the .key logic determine it + without defaultKey =? key(ReservationId.default, AvailabilityId.default), error: + return failure(error) + else: + raiseAssert "unknown type" - without results =? await self.repo.metaDs.query(query), err: - return failure(err) + without results =? await self.repo.metaDs.query(query), error: + return failure(error) - proc next(): Future[?Availability] {.async.} = + proc next(): Future[?seq[byte]] {.async.} = await idleAsync() iter.finished = results.finished if not results.finished and - r =? (await results.next()) and - serialized =? r.data and - serialized.len > 0: + res =? (await results.next()) and + res.data.len > 0 and + key =? res.key and + key.namespaces.len == defaultKey.namespaces.len: - return some Json.decode(string.fromBytes(serialized), Availability) + return some res.data - return none Availability + return none seq[byte] iter.next = next return success iter -proc unused*(r: Reservations): Future[?!seq[Availability]] {.async.} = - var ret: seq[Availability] = @[] +proc all*( + self: Reservations, + T: type SomeStorableObject +): Future[?!seq[T]] {.async.} = - without availabilities =? (await r.availabilities), err: - return failure(err) + var ret: seq[T] = @[] - for a in availabilities: - if availability =? (await a) and not availability.used: - ret.add availability + without storables =? (await self.storables(T)), error: + return failure(error) + + for storable in storables.items: + without bytes =? (await storable): + continue + + without obj =? T.fromJson(bytes), error: + error "json deserialization error", + json = string.fromBytes(bytes), + error = error.msg + continue + + ret.add obj return success(ret) -proc find*( +proc findAvailability*( self: Reservations, - size, duration, minPrice, collateral: UInt256, - used: bool): Future[?Availability] {.async.} = + size, duration, minPrice, collateral: UInt256 +): Future[?Availability] {.async.} = - - without availabilities =? (await self.availabilities), err: - error "failed to get all availabilities", error = err.msg + without storables =? (await self.storables(Availability)), e: + error "failed to get all storables", error = e.msg return none Availability - for a in availabilities: - if availability =? (await a): + for item in storables.items: + if bytes =? (await item) and + availability =? Availability.fromJson(bytes): - if used == availability.used and - size <= availability.size and + if size <= availability.size and duration <= availability.duration and collateral <= availability.maxCollateral and minPrice >= availability.minPrice: trace "availability matched", - used, availUsed = availability.used, size, availsize = availability.size, duration, availDuration = availability.duration, minPrice, availMinPrice = availability.minPrice, @@ -363,7 +494,6 @@ proc find*( return some availability trace "availiability did not match", - used, availUsed = availability.used, size, availsize = availability.size, duration, availDuration = availability.duration, minPrice, availMinPrice = availability.minPrice, diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 3f84ff9b..97cf1387 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -21,6 +21,14 @@ type context*: SalesContext data*: SalesData subscribed: bool + # Slot-level callbacks. + onCleanUp*: OnCleanUp + onFilled*: ?OnFilled + + OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].} + OnFilled* = proc(request: StorageRequest, + slotIndex: UInt256) {.gcsafe, upraises: [].} + SalesAgentError = object of CodexError AllSlotsFilledError* = object of SalesAgentError diff --git a/codex/sales/salescontext.nim b/codex/sales/salescontext.nim index fd3e5b6f..a823e78a 100644 --- a/codex/sales/salescontext.nim +++ b/codex/sales/salescontext.nim @@ -11,11 +11,11 @@ type SalesContext* = ref object market*: Market clock*: Clock + # Sales-level callbacks. Closure will be overwritten each time a slot is + # processed. onStore*: ?OnStore onClear*: ?OnClear onSale*: ?OnSale - onFilled*: ?OnFilled - onCleanUp*: OnCleanUp onProve*: ?OnProve reservations*: Reservations slotQueue*: SlotQueue @@ -29,11 +29,3 @@ type slotIndex: UInt256) {.gcsafe, upraises: [].} OnSale* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} - - # OnFilled has same function as OnSale, but is kept for internal purposes and should not be set by any external - # purposes as it is used for freeing Queue Workers after slot is filled. And the callbacks allows only - # one callback to be set, so if some other component would use it, it would override the Slot Queue freeing - # mechanism which would lead to blocking of the queue. - OnFilled* = proc(request: StorageRequest, - slotIndex: UInt256) {.gcsafe, upraises: [].} - OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].} diff --git a/codex/sales/salesdata.nim b/codex/sales/salesdata.nim index 0e975ac1..7fd56149 100644 --- a/codex/sales/salesdata.nim +++ b/codex/sales/salesdata.nim @@ -10,3 +10,4 @@ type request*: ?StorageRequest slotIndex*: UInt256 cancelled*: Future[void] + reservation*: ?Reservation diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index 59e59a6f..50f4f8e6 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -14,10 +14,9 @@ import ./errored type SaleDownloading* = ref object of ErrorHandlingState - availability*: Availability logScope: - topics = "marketplace sales downloading" + topics = "marketplace sales downloading" method `$`*(state: SaleDownloading): string = "SaleDownloading" @@ -36,7 +35,6 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} let data = agent.data let context = agent.context let reservations = context.reservations - let availability = state.availability without onStore =? context.onStore: raiseAssert "onStore callback not set" @@ -47,9 +45,14 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} without slotIndex =? data.slotIndex: raiseAssert("no slot index assigned") - # mark availability as used so that it is not matched to other requests - if markUsedErr =? (await reservations.markUsed(availability.id)).errorOption: - return some State(SaleErrored(error: markUsedErr)) + without reservation =? data.reservation: + raiseAssert("no reservation") + + logScope: + requestId = request.id + slotIndex + reservationId = reservation.id + availabilityId = reservation.availabilityId proc onBatch(blocks: seq[bt.Block]) {.async.} = # release batches of blocks as they are written to disk and @@ -59,25 +62,19 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} bytes += blk.data.len.uint trace "Releasing batch of bytes written to disk", bytes - let r = await reservations.release(availability.id, bytes) + let r = await reservations.release(reservation.id, + reservation.availabilityId, + bytes) # `tryGet` will raise the exception that occurred during release, if there # was one. The exception will be caught in the closure and sent to the # SaleErrored state. r.tryGet() - template markUnused(id: AvailabilityId) = - if markUnusedErr =? (await reservations.markUnused(id)).errorOption: - return some State(SaleErrored(error: markUnusedErr)) - trace "Starting download" if err =? (await onStore(request, slotIndex, onBatch)).errorOption: - - markUnused(availability.id) return some State(SaleErrored(error: err)) trace "Download complete" - - markUnused(availability.id) return some State(SaleInitialProving()) diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index 72f22e98..4533583e 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -4,6 +4,7 @@ import pkg/upraises import pkg/chronicles import ../statemachine import ../salesagent +import ../../utils/exceptions logScope: topics = "marketplace sales errored" @@ -21,13 +22,13 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = let data = agent.data let context = agent.context - error "Sale error", error=state.error.msg, requestId = $data.requestId + error "Sale error", error=state.error.msgDetail, requestId = data.requestId, slotIndex = data.slotIndex if onClear =? context.onClear and request =? data.request and slotIndex =? data.slotIndex: onClear(request, slotIndex) - if onCleanUp =? context.onCleanUp: + if onCleanUp =? agent.onCleanUp: await onCleanUp() diff --git a/codex/sales/states/filled.nim b/codex/sales/states/filled.nim index 25c14dde..2ba14672 100644 --- a/codex/sales/states/filled.nim +++ b/codex/sales/states/filled.nim @@ -26,8 +26,9 @@ method onFailed*(state: SaleFilled, request: StorageRequest): ?State = method `$`*(state: SaleFilled): string = "SaleFilled" method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} = - let data = SalesAgent(machine).data - let context = SalesAgent(machine).context + let agent = SalesAgent(machine) + let data = agent.data + let context = agent.context let market = context.market without slotIndex =? data.slotIndex: @@ -39,9 +40,7 @@ method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} = info "Slot succesfully filled", requestId = $data.requestId, slotIndex if request =? data.request and slotIndex =? data.slotIndex: - if onSale =? context.onSale: - onSale(request, slotIndex) - if onFilled =? context.onFilled: + if onFilled =? agent.onFilled: onFilled(request, slotIndex) when codex_enable_proof_failures: diff --git a/codex/sales/states/finished.nim b/codex/sales/states/finished.nim index 7cc5ba7e..1a8c5151 100644 --- a/codex/sales/states/finished.nim +++ b/codex/sales/states/finished.nim @@ -23,7 +23,6 @@ method onFailed*(state: SaleFinished, request: StorageRequest): ?State = method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = let agent = SalesAgent(machine) let data = agent.data - let context = agent.context without request =? data.request: raiseAssert "no sale request" @@ -33,5 +32,5 @@ method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = info "Slot finished and paid out", requestId = $data.requestId, slotIndex - if onCleanUp =? context.onCleanUp: + if onCleanUp =? agent.onCleanUp: await onCleanUp() diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim index a7ce7e6c..5955937e 100644 --- a/codex/sales/states/ignored.nim +++ b/codex/sales/states/ignored.nim @@ -14,7 +14,6 @@ method `$`*(state: SaleIgnored): string = "SaleIgnored" method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} = let agent = SalesAgent(machine) - let context = agent.context - if onCleanUp =? context.onCleanUp: + if onCleanUp =? agent.onCleanUp: await onCleanUp() diff --git a/codex/sales/states/preparing.nim b/codex/sales/states/preparing.nim index b0c5120c..d6a2270e 100644 --- a/codex/sales/states/preparing.nim +++ b/codex/sales/states/preparing.nim @@ -10,6 +10,7 @@ import ./failed import ./filled import ./ignored import ./downloading +import ./errored type SalePreparing* = ref object of ErrorHandlingState @@ -50,20 +51,33 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} = # TODO: Once implemented, check to ensure the host is allowed to fill the slot, # due to the [sliding window mechanism](https://github.com/codex-storage/codex-research/blob/master/design/marketplace.md#dispersal) + logScope: + slotIndex = data.slotIndex + slotSize = request.ask.slotSize + duration = request.ask.duration + pricePerSlot = request.ask.pricePerSlot + # availability was checked for this slot when it entered the queue, however # check to the ensure that there is still availability as they may have # changed since being added (other slots may have been processed in that time) - without availability =? await reservations.find( + without availability =? await reservations.findAvailability( request.ask.slotSize, request.ask.duration, request.ask.pricePerSlot, - request.ask.collateral, - used = false): - info "no availability found for request, ignoring", - slotSize = request.ask.slotSize, - duration = request.ask.duration, - pricePerSlot = request.ask.pricePerSlot, - used = false + request.ask.collateral): + debug "no availability found for request, ignoring" + return some State(SaleIgnored()) - return some State(SaleDownloading(availability: availability)) + info "availability found for request, creating reservation" + + without reservation =? await reservations.createReservation( + availability.id, + request.ask.slotSize, + request.id, + data.slotIndex + ), error: + return some State(SaleErrored(error: error)) + + data.reservation = some reservation + return some State(SaleDownloading()) diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim index 3d49f741..b01cba79 100644 --- a/codex/utils/asyncstatemachine.nim +++ b/codex/utils/asyncstatemachine.nim @@ -103,6 +103,8 @@ proc stop*(machine: Machine) {.async.} = if not machine.started: return + trace "stopping state machine" + machine.started = false await machine.trackedFutures.cancelTracked() diff --git a/codex/utils/exceptions.nim b/codex/utils/exceptions.nim new file mode 100644 index 00000000..4aa2af95 --- /dev/null +++ b/codex/utils/exceptions.nim @@ -0,0 +1,7 @@ +import std/strformat + +proc msgDetail*(e: ref CatchableError): string = + var msg = e.msg + if e.parent != nil: + msg = fmt"{msg} Inner exception: {e.parent.msg}" + return msg \ No newline at end of file diff --git a/codex/utils/json.nim b/codex/utils/json.nim index 1960f7af..a893ce5f 100644 --- a/codex/utils/json.nim +++ b/codex/utils/json.nim @@ -234,6 +234,13 @@ proc fromJson*[T: object]( let json = ?catch parseJson(string.fromBytes(bytes)) T.fromJson(json) +proc fromJson*[T: ref object]( + _: type T, + bytes: seq[byte] +): ?!T = + let json = ?catch parseJson(string.fromBytes(bytes)) + T.fromJson(json) + func `%`*(s: string): JsonNode = newJString(s) func `%`*(n: uint): JsonNode = @@ -307,6 +314,9 @@ func `%`*[T: distinct](id: T): JsonNode = type baseType = T.distinctBase % baseType(id) +func toJson*(obj: object): string = $(%obj) +func toJson*(obj: ref object): string = $(%obj) + proc toJsnImpl(x: NimNode): NimNode = case x.kind of nnkBracket: # array diff --git a/codex/utils/trackedfutures.nim b/codex/utils/trackedfutures.nim index ea26c4ae..064bf9e3 100644 --- a/codex/utils/trackedfutures.nim +++ b/codex/utils/trackedfutures.nim @@ -16,14 +16,12 @@ proc len*(self: TrackedFutures): int = self.futures.len proc removeFuture(self: TrackedFutures, future: FutureBase) = if not self.cancelling and not future.isNil: - trace "removing tracked future" self.futures.del(future.id) proc track*[T](self: TrackedFutures, fut: Future[T]): Future[T] = if self.cancelling: return fut - trace "tracking future", id = fut.id self.futures[fut.id] = FutureBase(fut) fut @@ -42,6 +40,8 @@ proc track*[T, U](future: Future[T], self: U): Future[T] = proc cancelTracked*(self: TrackedFutures) {.async.} = self.cancelling = true + trace "cancelling tracked futures" + for future in self.futures.values: if not future.isNil and not future.finished: trace "cancelling tracked future", id = future.id diff --git a/tests/codex/examples.nim b/tests/codex/examples.nim index fce8c46b..f85ae67d 100644 --- a/tests/codex/examples.nim +++ b/tests/codex/examples.nim @@ -60,3 +60,11 @@ proc example*(_: type Availability): Availability = minPrice = uint64.example.u256, maxCollateral = uint16.example.u256 ) + +proc example*(_: type Reservation): Reservation = + Reservation.init( + availabilityId = AvailabilityId(array[32, byte].example), + size = uint16.example.u256, + slotId = SlotId.example + ) + diff --git a/tests/codex/sales/helpers.nim b/tests/codex/sales/helpers.nim deleted file mode 100644 index 9ba250ca..00000000 --- a/tests/codex/sales/helpers.nim +++ /dev/null @@ -1,17 +0,0 @@ -import pkg/chronos -import pkg/questionable -import pkg/questionable/results - -import pkg/codex/sales/reservations -import ../helpers - -export checktest - -proc allAvailabilities*(r: Reservations): Future[seq[Availability]] {.async.} = - var ret: seq[Availability] = @[] - without availabilities =? (await r.availabilities), err: - raiseAssert "failed to get availabilities, error: " & err.msg - for a in availabilities: - if availability =? (await a): - ret.add availability - return ret diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index db02d46d..4a7d94d7 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -1,40 +1,60 @@ +import std/random + import pkg/questionable import pkg/questionable/results - import pkg/chronos import pkg/asynctest import pkg/datastore -import pkg/json_serialization -import pkg/json_serialization/std/options import pkg/codex/stores import pkg/codex/sales +import pkg/codex/utils/json import ../examples -import ./helpers +import ../helpers asyncchecksuite "Reservations module": var repo: RepoStore repoDs: Datastore metaDs: SQLiteDatastore - availability: Availability reservations: Reservations setup: + randomize(1.int64) # create reproducible results repoDs = SQLiteDatastore.new(Memory).tryGet() metaDs = SQLiteDatastore.new(Memory).tryGet() repo = RepoStore.new(repoDs, metaDs) reservations = Reservations.new(repo) - availability = Availability.example + + proc createAvailability(): Availability = + let example = Availability.example + let size = rand(100000..200000) + let availability = waitFor reservations.createAvailability( + size.u256, + example.duration, + example.minPrice, + example.maxCollateral + ) + return availability.get + + proc createReservation(availability: Availability): Reservation = + let size = rand(1.. 0 check market.filled[0].requestId == request.id @@ -378,19 +401,15 @@ asyncchecksuite "Sales": check market.filled[0].host == await market.getSigner() test "calls onFilled when slot is filled": - var soldAvailability: Availability - var soldRequest: StorageRequest - var soldSlotIndex: UInt256 + var soldRequest = StorageRequest.default + var soldSlotIndex = UInt256.high sales.onSale = proc(request: StorageRequest, slotIndex: UInt256) = - if a =? availability: - soldAvailability = a soldRequest = request soldSlotIndex = slotIndex - check isOk await reservations.reserve(availability) + createAvailability() await market.requestStorage(request) - check eventually soldAvailability == availability - check soldRequest == request + check eventually soldRequest == request check soldSlotIndex < request.ask.slots.u256 test "calls onClear when storage becomes available again": @@ -404,7 +423,7 @@ asyncchecksuite "Sales": slotIndex: UInt256) = clearedRequest = request clearedSlotIndex = slotIndex - check isOk await reservations.reserve(availability) + createAvailability() await market.requestStorage(request) check eventually clearedRequest == request check clearedSlotIndex < request.ask.slots.u256 @@ -416,22 +435,24 @@ asyncchecksuite "Sales": onBatch: BatchProc): Future[?!void] {.async.} = await sleepAsync(chronos.hours(1)) return success() - check isOk await reservations.reserve(availability) + createAvailability() await market.requestStorage(request) for slotIndex in 0.. agent.data.requestId == request.id and agent.data.slotIndex == 0.u256) check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256) + + test "deletes inactive reservations on load": + createAvailability() + discard await reservations.createReservation( + availability.id, + 100.u256, + RequestId.example, + UInt256.example) + check (await reservations.all(Reservation)).get.len == 1 + await sales.load() + check (await reservations.all(Reservation)).get.len == 0 + check getAvailability().size == availability.size # was restored diff --git a/tests/codex/utils/testjson.nim b/tests/codex/utils/testjson.nim index 26c2fe49..059f80ef 100644 --- a/tests/codex/utils/testjson.nim +++ b/tests/codex/utils/testjson.nim @@ -211,7 +211,7 @@ checksuite "json serialization": }, "expiry": "1691545330" }""".flatten - check $(%request) == expected + check request.toJson == expected test "deserializes UInt256 from non-hex string representation": let json = newJString("100000") diff --git a/tests/examples.nim b/tests/examples.nim index e14f5149..a3171f27 100644 --- a/tests/examples.nim +++ b/tests/examples.nim @@ -1,6 +1,7 @@ import std/random import std/sequtils import std/times +import std/typetraits import pkg/codex/contracts/requests import pkg/codex/sales/slotqueue import pkg/stint @@ -19,8 +20,9 @@ proc example*[T](_: type seq[T]): seq[T] = proc example*(_: type UInt256): UInt256 = UInt256.fromBytes(array[32, byte].example) -proc example*[T: RequestId | SlotId | Nonce](_: type T): T = - T(array[32, byte].example) +proc example*[T: distinct](_: type T): T = + type baseType = T.distinctBase + T(baseType.example) proc example*(_: type StorageRequest): StorageRequest = StorageRequest(