diff --git a/codex/node.nim b/codex/node.nim index d7f799c2..f78e2c91 100644 --- a/codex/node.nim +++ b/codex/node.nim @@ -284,7 +284,13 @@ proc requestStorage*( let request = StorageRequest( ask: StorageAsk( slots: nodes + tolerance, - slotSize: (encoded.blockSize.int * encoded.steps).u256, + # TODO: Specify slot-specific size (as below) once dispersal is + # implemented. The current implementation downloads the entire dataset, so + # it is currently set to be the size of the entire dataset. This is + # because the slotSize is used to determine the amount of bytes to reserve + # in a Reservations + # TODO: slotSize: (encoded.blockSize.int * encoded.steps).u256, + slotSize: (encoded.blockSize.int * encoded.blocks.len).u256, duration: duration, proofProbability: proofProbability, reward: reward, diff --git a/codex/rest/api.nim b/codex/rest/api.nim index 57633851..6c3fce33 100644 --- a/codex/rest/api.nim +++ b/codex/rest/api.nim @@ -340,10 +340,10 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = without contracts =? node.contracts.host: return RestApiResponse.error(Http503, "Sales unavailable") - without unused =? (await contracts.sales.context.reservations.unused), err: + without avails =? (await contracts.sales.context.reservations.all(Availability)), err: return RestApiResponse.error(Http500, err.msg) - let json = %unused + let json = %avails return RestApiResponse.response($json, contentType="application/json") router.rawApi( @@ -365,20 +365,21 @@ proc initRestApi*(node: CodexNodeRef, conf: CodexConf): RestRouter = return RestApiResponse.error(Http400, error.msg) let reservations = contracts.sales.context.reservations - # assign id to availability via init - let availability = Availability.init(restAv.size, - restAv.duration, - restAv.minPrice, - restAv.maxCollateral) - if not reservations.hasAvailable(availability.size.truncate(uint)): + if not reservations.hasAvailable(restAv.size.truncate(uint)): return RestApiResponse.error(Http422, "Not enough storage quota") - if err =? (await reservations.reserve(availability)).errorOption: - return RestApiResponse.error(Http500, err.msg) + without availability =? ( + await reservations.createAvailability( + restAv.size, + restAv.duration, + restAv.minPrice, + restAv.maxCollateral) + ), error: + return RestApiResponse.error(Http500, error.msg) - let json = %availability - return RestApiResponse.response($json, contentType="application/json") + return RestApiResponse.response(availability.toJson, + contentType="application/json") router.api( MethodGet, diff --git a/codex/sales.nim b/codex/sales.nim index c81432ec..4db0ed52 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -1,7 +1,7 @@ import std/sequtils import std/sugar -import std/tables import pkg/questionable +import pkg/questionable/results import pkg/stint import pkg/chronicles import pkg/datastore @@ -101,8 +101,49 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} = if sales.running: sales.agents.keepItIf(it != agent) -proc filled(sales: Sales, - processing: Future[void]) = +proc cleanUp(sales: Sales, + agent: SalesAgent, + processing: Future[void]) {.async.} = + + let data = agent.data + + trace "cleaning up sales agent", + requestId = data.requestId, + slotIndex = data.slotIndex, + reservationId = data.reservation.?id |? ReservationId.default, + availabilityId = data.reservation.?availabilityId |? AvailabilityId.default + + # TODO: return bytes that were used in the request back to the availability + # as well, which will require removing the bytes from disk (perhaps via + # setting blockTTL to -1 and then running block maintainer?) + + # delete reservation and return reservation bytes back to the availability + if reservation =? data.reservation and + deleteErr =? (await sales.context.reservations.deleteReservation( + reservation.id, + reservation.availabilityId + )).errorOption: + error "failure deleting reservation", + error = deleteErr.msg, + reservationId = reservation.id, + availabilityId = reservation.availabilityId + + await sales.remove(agent) + + # signal back to the slot queue to cycle a worker + if not processing.isNil and not processing.finished(): + processing.complete() + +proc filled( + sales: Sales, + request: StorageRequest, + slotIndex: UInt256, + processing: Future[void]) = + + if onSale =? sales.context.onSale: + onSale(request, slotIndex) + + # signal back to the slot queue to cycle a worker if not processing.isNil and not processing.finished(): processing.complete() @@ -117,15 +158,39 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) = none StorageRequest ) - agent.context.onCleanUp = proc {.async.} = - await sales.remove(agent) + agent.onCleanUp = proc {.async.} = + await sales.cleanUp(agent, done) - agent.context.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) = - sales.filled(done) + agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) = + sales.filled(request, slotIndex, done) agent.start(SalePreparing()) sales.agents.add agent +proc deleteInactiveReservations(sales: Sales, activeSlots: seq[Slot]) {.async.} = + let reservations = sales.context.reservations + without reservs =? await reservations.all(Reservation): + info "no unused reservations found for deletion" + + let unused = reservs.filter(r => ( + let slotId = slotId(r.requestId, r.slotIndex) + not activeSlots.any(slot => slot.id == slotId) + )) + info "found unused reservations for deletion", unused = unused.len + + for reservation in unused: + + logScope: + reservationId = reservation.id + availabilityId = reservation.availabilityId + + if err =? (await reservations.deleteReservation( + reservation.id, reservation.availabilityId + )).errorOption: + error "failed to delete unused reservation", error = err.msg + else: + trace "deleted unused reservation" + proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = let market = sales.context.market let slotIds = await market.mySlots() @@ -139,21 +204,26 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} = return slots proc load*(sales: Sales) {.async.} = - let slots = await sales.mySlots() + let activeSlots = await sales.mySlots() - for slot in slots: + await sales.deleteInactiveReservations(activeSlots) + + for slot in activeSlots: let agent = newSalesAgent( sales.context, slot.request.id, slot.slotIndex, some slot.request) - agent.context.onCleanUp = proc {.async.} = await sales.remove(agent) + agent.onCleanUp = proc {.async.} = + let done = newFuture[void]("onCleanUp_Dummy") + await sales.cleanUp(agent, done) + await done # completed in sales.cleanUp agent.start(SaleUnknown()) sales.agents.add agent -proc onReservationAdded(sales: Sales, availability: Availability) {.async.} = +proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = ## Query last 256 blocks for new requests, adding them to the queue. `push` ## checks for availability before adding to the queue. If processed, the ## sales agent will check if the slot is free. @@ -162,9 +232,9 @@ proc onReservationAdded(sales: Sales, availability: Availability) {.async.} = let queue = context.slotQueue logScope: - topics = "marketplace sales onReservationAdded callback" + topics = "marketplace sales onAvailabilityAdded callback" - trace "reservation added, querying past storage requests to add to queue" + trace "availability added, querying past storage requests to add to queue" try: let events = await market.queryPastStorageRequests(256) @@ -384,10 +454,10 @@ proc startSlotQueue(sales: Sales) {.async.} = asyncSpawn slotQueue.start() - reservations.onReservationAdded = - proc(availability: Availability) {.async.} = - await sales.onReservationAdded(availability) + proc onAvailabilityAdded(availability: Availability) {.async.} = + await sales.onAvailabilityAdded(availability) + reservations.onAvailabilityAdded = onAvailabilityAdded proc subscribe(sales: Sales) {.async.} = await sales.subscribeRequested() diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index e03f5dd8..ec5d2777 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -6,56 +6,79 @@ ## at your option. ## This file may not be copied, modified, or distributed except according to ## those terms. +## +## +--------------------------------------+ +## | RESERVATION | +## +--------------------------------------+ |--------------------------------------| +## | AVAILABILITY | | ReservationId | id | PK | +## |--------------------------------------| |--------------------------------------| +## | AvailabilityId | id | PK |<-||-------o<-| AvailabilityId | availabilityId | FK | +## |--------------------------------------| |--------------------------------------| +## | UInt256 | size | | | UInt256 | size | | +## |--------------------------------------| |--------------------------------------| +## | UInt256 | duration | | | SlotId | slotId | | +## |--------------------------------------| +--------------------------------------+ +## | UInt256 | minPrice | | +## |--------------------------------------| +## | UInt256 | maxCollateral | | +## +--------------------------------------+ + +import pkg/upraises +push: {.upraises: [].} import std/typetraits - import pkg/chronos import pkg/chronicles -import pkg/upraises -import pkg/json_serialization -import pkg/json_serialization/std/options -import pkg/stint -import pkg/stew/byteutils +import pkg/datastore import pkg/nimcrypto import pkg/questionable import pkg/questionable/results -import ../utils/json - -push: {.upraises: [].} - -import pkg/datastore +import pkg/stint +import pkg/stew/byteutils import ../stores import ../contracts/requests +import ../utils/json export requests +export chronicles logScope: - topics = "reservations" + topics = "sales reservations" type AvailabilityId* = distinct array[32, byte] - Availability* = object + ReservationId* = distinct array[32, byte] + SomeStorableObject = Availability | Reservation + SomeStorableId = AvailabilityId | ReservationId + Availability* = ref object id* {.serialize.}: AvailabilityId size* {.serialize.}: UInt256 duration* {.serialize.}: UInt256 minPrice* {.serialize.}: UInt256 maxCollateral* {.serialize.}: UInt256 - used*: bool + Reservation* = ref object + id* {.serialize.}: ReservationId + availabilityId* {.serialize.}: AvailabilityId + size* {.serialize.}: UInt256 + requestId* {.serialize.}: RequestId + slotIndex* {.serialize.}: UInt256 Reservations* = ref object repo: RepoStore - onReservationAdded: ?OnReservationAdded - GetNext* = proc(): Future[?Availability] {.upraises: [], gcsafe, closure.} - OnReservationAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.} - AvailabilityIter* = ref object + onAvailabilityAdded: ?OnAvailabilityAdded + GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.} + OnAvailabilityAdded* = proc(availability: Availability): Future[void] {.upraises: [], gcsafe.} + StorableIter* = ref object finished*: bool next*: GetNext - AvailabilityError* = object of CodexError - AvailabilityAlreadyExistsError* = object of AvailabilityError - AvailabilityReserveFailedError* = object of AvailabilityError - AvailabilityReleaseFailedError* = object of AvailabilityError - AvailabilityDeleteFailedError* = object of AvailabilityError - AvailabilityGetFailedError* = object of AvailabilityError - AvailabilityUpdateFailedError* = object of AvailabilityError + ReservationsError* = object of CodexError + ReserveFailedError* = object of ReservationsError + ReleaseFailedError* = object of ReservationsError + DeleteFailedError* = object of ReservationsError + GetFailedError* = object of ReservationsError + NotExistsError* = object of ReservationsError + SerializationError* = object of ReservationsError + UpdateFailedError* = object of ReservationsError + BytesOutOfBoundsError* = object of ReservationsError const SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module @@ -77,10 +100,29 @@ proc init*( doAssert randomBytes(id) == 32 Availability(id: AvailabilityId(id), size: size, duration: duration, minPrice: minPrice, maxCollateral: maxCollateral) -func toArray*(id: AvailabilityId): array[32, byte] = +proc init*( + _: type Reservation, + availabilityId: AvailabilityId, + size: UInt256, + requestId: RequestId, + slotIndex: UInt256 +): Reservation = + + var id: array[32, byte] + doAssert randomBytes(id) == 32 + Reservation(id: ReservationId(id), availabilityId: availabilityId, size: size, requestId: requestId, slotIndex: slotIndex) + +func toArray(id: SomeStorableId): array[32, byte] = array[32, byte](id) proc `==`*(x, y: AvailabilityId): bool {.borrow.} +proc `==`*(x, y: ReservationId): bool {.borrow.} +proc `==`*(x, y: Reservation): bool = + x.id == y.id and + x.availabilityId == y.availabilityId and + x.size == y.size and + x.requestId == y.requestId and + x.slotIndex == y.slotIndex proc `==`*(x, y: Availability): bool = x.id == y.id and x.size == y.size and @@ -88,9 +130,9 @@ proc `==`*(x, y: Availability): bool = x.maxCollateral == y.maxCollateral and x.minPrice == y.minPrice -proc `$`*(id: AvailabilityId): string = id.toArray.toHex +proc `$`*(id: SomeStorableId): string = id.toArray.toHex -proc toErr[E1: ref CatchableError, E2: AvailabilityError]( +proc toErr[E1: ref CatchableError, E2: ReservationsError]( e1: E1, _: type E2, msg: string = e1.msg): ref E2 = @@ -99,28 +141,30 @@ proc toErr[E1: ref CatchableError, E2: AvailabilityError]( proc writeValue*( writer: var JsonWriter, - value: AvailabilityId) {.upraises:[IOError].} = + value: SomeStorableId) {.upraises:[IOError].} = + ## used for chronicles' logs mixin writeValue - writer.writeValue value.toArray + writer.writeValue %value -proc readValue*[T: AvailabilityId]( - reader: var JsonReader, - value: var T) {.upraises: [SerializationError, IOError].} = +proc `onAvailabilityAdded=`*(self: Reservations, + onAvailabilityAdded: OnAvailabilityAdded) = + self.onAvailabilityAdded = some onAvailabilityAdded - mixin readValue - value = T reader.readValue(T.distinctBase) +func key*(id: AvailabilityId): ?!Key = + ## sales / reservations / + (ReservationsKey / $id) -proc `onReservationAdded=`*(self: Reservations, - onReservationAdded: OnReservationAdded) = - self.onReservationAdded = some onReservationAdded - -func key(id: AvailabilityId): ?!Key = - (ReservationsKey / id.toArray.toHex) +func key*(reservationId: ReservationId, availabilityId: AvailabilityId): ?!Key = + ## sales / reservations / / + (availabilityId.key / $reservationId) func key*(availability: Availability): ?!Key = return availability.id.key +func key*(reservation: Reservation): ?!Key = + return key(reservation.id, reservation.availabilityId) + func available*(self: Reservations): uint = self.repo.available func hasAvailable*(self: Reservations, bytes: uint): bool = @@ -128,84 +172,123 @@ func hasAvailable*(self: Reservations, bytes: uint): bool = proc exists*( self: Reservations, - id: AvailabilityId): Future[?!bool] {.async.} = - - without key =? id.key, err: - return failure(err) + key: Key): Future[bool] {.async.} = let exists = await self.repo.metaDs.contains(key) - return success(exists) + return exists + +proc getImpl( + self: Reservations, + key: Key): Future[?!seq[byte]] {.async.} = + + if exists =? (await self.exists(key)) and not exists: + let err = newException(NotExistsError, "object with key " & $key & " does not exist") + return failure(err) + + without serialized =? await self.repo.metaDs.get(key), error: + return failure(error.toErr(GetFailedError)) + + return success serialized proc get*( self: Reservations, - id: AvailabilityId): Future[?!Availability] {.async.} = + key: Key, + T: type SomeStorableObject): Future[?!T] {.async.} = - if exists =? (await self.exists(id)) and not exists: - let err = newException(AvailabilityGetFailedError, - "Availability does not exist") - return failure(err) + without serialized =? await self.getImpl(key), error: + return failure(error) - without key =? id.key, err: - return failure(err.toErr(AvailabilityGetFailedError)) + without obj =? T.fromJson(serialized), error: + return failure(error.toErr(SerializationError)) - without serialized =? await self.repo.metaDs.get(key), err: - return failure(err.toErr(AvailabilityGetFailedError)) - - without availability =? Json.decode(serialized, Availability).catch, err: - return failure(err.toErr(AvailabilityGetFailedError)) - - return success availability + return success obj proc update( self: Reservations, - availability: Availability): Future[?!void] {.async.} = + obj: SomeStorableObject): Future[?!void] {.async.} = - trace "updating availability", id = availability.id, size = availability.size, - used = availability.used + trace "updating " & $(obj.type), id = obj.id, size = obj.size - without key =? availability.key, err: - return failure(err) + without key =? obj.key, error: + return failure(error) if err =? (await self.repo.metaDs.put( key, - @(availability.toJson.toBytes))).errorOption: - return failure(err.toErr(AvailabilityUpdateFailedError)) + @(obj.toJson.toBytes) + )).errorOption: + return failure(err.toErr(UpdateFailedError)) return success() proc delete( self: Reservations, - id: AvailabilityId): Future[?!void] {.async.} = + key: Key): Future[?!void] {.async.} = - trace "deleting availability", id + trace "deleting object", key - without availability =? (await self.get(id)), err: - return failure(err) - - without key =? availability.key, err: - return failure(err) + if exists =? (await self.exists(key)) and not exists: + return success() if err =? (await self.repo.metaDs.delete(key)).errorOption: - return failure(err.toErr(AvailabilityDeleteFailedError)) + return failure(err.toErr(DeleteFailedError)) return success() -proc reserve*( +proc deleteReservation*( self: Reservations, - availability: Availability): Future[?!void] {.async.} = + reservationId: ReservationId, + availabilityId: AvailabilityId): Future[?!void] {.async.} = - if exists =? (await self.exists(availability.id)) and exists: - let err = newException(AvailabilityAlreadyExistsError, - "Availability already exists") - return failure(err) + logScope: + reservationId + availabilityId - without key =? availability.key, err: - return failure(err) + trace "deleting reservation" + without key =? key(reservationId, availabilityId), error: + return failure(error) + without reservation =? (await self.get(key, Reservation)), error: + if error of NotExistsError: + return success() + else: + return failure(error) + + if reservation.size > 0.u256: + trace "returning remaining reservation bytes to availability", + size = reservation.size + + without availabilityKey =? availabilityId.key, error: + return failure(error) + + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) + + availability.size += reservation.size + + if updateErr =? (await self.update(availability)).errorOption: + return failure(updateErr) + + if err =? (await self.repo.metaDs.delete(key)).errorOption: + return failure(err.toErr(DeleteFailedError)) + + return success() + +proc createAvailability*( + self: Reservations, + size: UInt256, + duration: UInt256, + minPrice: UInt256, + maxCollateral: UInt256): Future[?!Availability] {.async.} = + + trace "creating availability", size, duration, minPrice, maxCollateral + + let availability = Availability.init( + size, duration, minPrice, maxCollateral + ) let bytes = availability.size.truncate(uint) if reserveErr =? (await self.repo.reserve(bytes)).errorOption: - return failure(reserveErr.toErr(AvailabilityReserveFailedError)) + return failure(reserveErr.toErr(ReserveFailedError)) if updateErr =? (await self.update(availability)).errorOption: @@ -217,144 +300,192 @@ proc reserve*( return failure(updateErr) - if onReservationAdded =? self.onReservationAdded: + if onAvailabilityAdded =? self.onAvailabilityAdded: try: - await onReservationAdded(availability) + await onAvailabilityAdded(availability) except CatchableError as e: # we don't have any insight into types of errors that `onProcessSlot` can # throw because it is caller-defined - warn "Unknown error during 'onReservationAdded' callback", + warn "Unknown error during 'onAvailabilityAdded' callback", availabilityId = availability.id, error = e.msg - return success() + return success(availability) + +proc createReservation*( + self: Reservations, + availabilityId: AvailabilityId, + slotSize: UInt256, + requestId: RequestId, + slotIndex: UInt256 +): Future[?!Reservation] {.async.} = + + trace "creating reservation", availabilityId, slotSize, requestId, slotIndex + + let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex) + + without availabilityKey =? availabilityId.key, error: + return failure(error) + + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) + + if availability.size < slotSize: + let error = newException(BytesOutOfBoundsError, "trying to reserve an " & + "amount of bytes that is greater than the total size of the Availability") + return failure(error) + + if createResErr =? (await self.update(reservation)).errorOption: + return failure(createResErr) + + # reduce availability size by the slot size, which is now accounted for in + # the newly created Reservation + availability.size -= slotSize + + # update availability with reduced size + if updateErr =? (await self.update(availability)).errorOption: + + trace "rolling back reservation creation" + + without key =? reservation.key, keyError: + keyError.parent = updateErr + return failure(keyError) + + # rollback the reservation creation + if rollbackErr =? (await self.delete(key)).errorOption: + rollbackErr.parent = updateErr + return failure(rollbackErr) + + return failure(updateErr) + + return success(reservation) proc release*( self: Reservations, - id: AvailabilityId, + reservationId: ReservationId, + availabilityId: AvailabilityId, bytes: uint): Future[?!void] {.async.} = - trace "releasing bytes and updating availability", bytes, id + logScope: + topics = "release" + bytes + reservationId + availabilityId - without var availability =? (await self.get(id)), err: - return failure(err) + trace "releasing bytes and updating reservation" - without key =? id.key, err: - return failure(err) + without key =? key(reservationId, availabilityId), error: + return failure(error) + + without var reservation =? (await self.get(key, Reservation)), error: + return failure(error) + + if reservation.size < bytes.u256: + let error = newException(BytesOutOfBoundsError, + "trying to release an amount of bytes that is greater than the total " & + "size of the Reservation") + return failure(error) if releaseErr =? (await self.repo.release(bytes)).errorOption: - return failure(releaseErr.toErr(AvailabilityReleaseFailedError)) + return failure(releaseErr.toErr(ReleaseFailedError)) - availability.size = (availability.size.truncate(uint) - bytes).u256 + reservation.size -= bytes.u256 - template rollbackRelease(e: ref CatchableError) = + # persist partially used Reservation with updated size + if err =? (await self.update(reservation)).errorOption: + + # rollback release if an update error encountered trace "rolling back release" if rollbackErr =? (await self.repo.reserve(bytes)).errorOption: - rollbackErr.parent = e + rollbackErr.parent = err return failure(rollbackErr) - - # remove completely used availabilities - if availability.size == 0.u256: - if err =? (await self.delete(availability.id)).errorOption: - rollbackRelease(err) - return failure(err) - - return success() - - # persist partially used availability with updated size - if err =? (await self.update(availability)).errorOption: - rollbackRelease(err) return failure(err) return success() - -proc markUsed*( - self: Reservations, - id: AvailabilityId): Future[?!void] {.async.} = - - without var availability =? (await self.get(id)), err: - return failure(err) - - availability.used = true - let r = await self.update(availability) - if r.isOk: - trace "availability marked used", id = id.toArray.toHex - return r - -proc markUnused*( - self: Reservations, - id: AvailabilityId): Future[?!void] {.async.} = - - without var availability =? (await self.get(id)), err: - return failure(err) - - availability.used = false - let r = await self.update(availability) - if r.isOk: - trace "availability marked unused", id = id.toArray.toHex - return r - -iterator items*(self: AvailabilityIter): Future[?Availability] = +iterator items(self: StorableIter): Future[?seq[byte]] = while not self.finished: yield self.next() -proc availabilities*( - self: Reservations): Future[?!AvailabilityIter] {.async.} = +proc storables( + self: Reservations, + T: type SomeStorableObject +): Future[?!StorableIter] {.async.} = - var iter = AvailabilityIter() + var iter = StorableIter() let query = Query.init(ReservationsKey) + when T is Availability: + # should indicate key length of 4, but let the .key logic determine it + without defaultKey =? AvailabilityId.default.key, error: + return failure(error) + elif T is Reservation: + # should indicate key length of 5, but let the .key logic determine it + without defaultKey =? key(ReservationId.default, AvailabilityId.default), error: + return failure(error) + else: + raiseAssert "unknown type" - without results =? await self.repo.metaDs.query(query), err: - return failure(err) + without results =? await self.repo.metaDs.query(query), error: + return failure(error) - proc next(): Future[?Availability] {.async.} = + proc next(): Future[?seq[byte]] {.async.} = await idleAsync() iter.finished = results.finished if not results.finished and - r =? (await results.next()) and - serialized =? r.data and - serialized.len > 0: + res =? (await results.next()) and + res.data.len > 0 and + key =? res.key and + key.namespaces.len == defaultKey.namespaces.len: - return some Json.decode(string.fromBytes(serialized), Availability) + return some res.data - return none Availability + return none seq[byte] iter.next = next return success iter -proc unused*(r: Reservations): Future[?!seq[Availability]] {.async.} = - var ret: seq[Availability] = @[] +proc all*( + self: Reservations, + T: type SomeStorableObject +): Future[?!seq[T]] {.async.} = - without availabilities =? (await r.availabilities), err: - return failure(err) + var ret: seq[T] = @[] - for a in availabilities: - if availability =? (await a) and not availability.used: - ret.add availability + without storables =? (await self.storables(T)), error: + return failure(error) + + for storable in storables.items: + without bytes =? (await storable): + continue + + without obj =? T.fromJson(bytes), error: + error "json deserialization error", + json = string.fromBytes(bytes), + error = error.msg + continue + + ret.add obj return success(ret) -proc find*( +proc findAvailability*( self: Reservations, - size, duration, minPrice, collateral: UInt256, - used: bool): Future[?Availability] {.async.} = + size, duration, minPrice, collateral: UInt256 +): Future[?Availability] {.async.} = - - without availabilities =? (await self.availabilities), err: - error "failed to get all availabilities", error = err.msg + without storables =? (await self.storables(Availability)), e: + error "failed to get all storables", error = e.msg return none Availability - for a in availabilities: - if availability =? (await a): + for item in storables.items: + if bytes =? (await item) and + availability =? Availability.fromJson(bytes): - if used == availability.used and - size <= availability.size and + if size <= availability.size and duration <= availability.duration and collateral <= availability.maxCollateral and minPrice >= availability.minPrice: trace "availability matched", - used, availUsed = availability.used, size, availsize = availability.size, duration, availDuration = availability.duration, minPrice, availMinPrice = availability.minPrice, @@ -363,7 +494,6 @@ proc find*( return some availability trace "availiability did not match", - used, availUsed = availability.used, size, availsize = availability.size, duration, availDuration = availability.duration, minPrice, availMinPrice = availability.minPrice, diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index 3f84ff9b..97cf1387 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -21,6 +21,14 @@ type context*: SalesContext data*: SalesData subscribed: bool + # Slot-level callbacks. + onCleanUp*: OnCleanUp + onFilled*: ?OnFilled + + OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].} + OnFilled* = proc(request: StorageRequest, + slotIndex: UInt256) {.gcsafe, upraises: [].} + SalesAgentError = object of CodexError AllSlotsFilledError* = object of SalesAgentError diff --git a/codex/sales/salescontext.nim b/codex/sales/salescontext.nim index fd3e5b6f..a823e78a 100644 --- a/codex/sales/salescontext.nim +++ b/codex/sales/salescontext.nim @@ -11,11 +11,11 @@ type SalesContext* = ref object market*: Market clock*: Clock + # Sales-level callbacks. Closure will be overwritten each time a slot is + # processed. onStore*: ?OnStore onClear*: ?OnClear onSale*: ?OnSale - onFilled*: ?OnFilled - onCleanUp*: OnCleanUp onProve*: ?OnProve reservations*: Reservations slotQueue*: SlotQueue @@ -29,11 +29,3 @@ type slotIndex: UInt256) {.gcsafe, upraises: [].} OnSale* = proc(request: StorageRequest, slotIndex: UInt256) {.gcsafe, upraises: [].} - - # OnFilled has same function as OnSale, but is kept for internal purposes and should not be set by any external - # purposes as it is used for freeing Queue Workers after slot is filled. And the callbacks allows only - # one callback to be set, so if some other component would use it, it would override the Slot Queue freeing - # mechanism which would lead to blocking of the queue. - OnFilled* = proc(request: StorageRequest, - slotIndex: UInt256) {.gcsafe, upraises: [].} - OnCleanUp* = proc: Future[void] {.gcsafe, upraises: [].} diff --git a/codex/sales/salesdata.nim b/codex/sales/salesdata.nim index 0e975ac1..7fd56149 100644 --- a/codex/sales/salesdata.nim +++ b/codex/sales/salesdata.nim @@ -10,3 +10,4 @@ type request*: ?StorageRequest slotIndex*: UInt256 cancelled*: Future[void] + reservation*: ?Reservation diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index 59e59a6f..50f4f8e6 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -14,10 +14,9 @@ import ./errored type SaleDownloading* = ref object of ErrorHandlingState - availability*: Availability logScope: - topics = "marketplace sales downloading" + topics = "marketplace sales downloading" method `$`*(state: SaleDownloading): string = "SaleDownloading" @@ -36,7 +35,6 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} let data = agent.data let context = agent.context let reservations = context.reservations - let availability = state.availability without onStore =? context.onStore: raiseAssert "onStore callback not set" @@ -47,9 +45,14 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} without slotIndex =? data.slotIndex: raiseAssert("no slot index assigned") - # mark availability as used so that it is not matched to other requests - if markUsedErr =? (await reservations.markUsed(availability.id)).errorOption: - return some State(SaleErrored(error: markUsedErr)) + without reservation =? data.reservation: + raiseAssert("no reservation") + + logScope: + requestId = request.id + slotIndex + reservationId = reservation.id + availabilityId = reservation.availabilityId proc onBatch(blocks: seq[bt.Block]) {.async.} = # release batches of blocks as they are written to disk and @@ -59,25 +62,19 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.} bytes += blk.data.len.uint trace "Releasing batch of bytes written to disk", bytes - let r = await reservations.release(availability.id, bytes) + let r = await reservations.release(reservation.id, + reservation.availabilityId, + bytes) # `tryGet` will raise the exception that occurred during release, if there # was one. The exception will be caught in the closure and sent to the # SaleErrored state. r.tryGet() - template markUnused(id: AvailabilityId) = - if markUnusedErr =? (await reservations.markUnused(id)).errorOption: - return some State(SaleErrored(error: markUnusedErr)) - trace "Starting download" if err =? (await onStore(request, slotIndex, onBatch)).errorOption: - - markUnused(availability.id) return some State(SaleErrored(error: err)) trace "Download complete" - - markUnused(availability.id) return some State(SaleInitialProving()) diff --git a/codex/sales/states/errored.nim b/codex/sales/states/errored.nim index 72f22e98..4533583e 100644 --- a/codex/sales/states/errored.nim +++ b/codex/sales/states/errored.nim @@ -4,6 +4,7 @@ import pkg/upraises import pkg/chronicles import ../statemachine import ../salesagent +import ../../utils/exceptions logScope: topics = "marketplace sales errored" @@ -21,13 +22,13 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} = let data = agent.data let context = agent.context - error "Sale error", error=state.error.msg, requestId = $data.requestId + error "Sale error", error=state.error.msgDetail, requestId = data.requestId, slotIndex = data.slotIndex if onClear =? context.onClear and request =? data.request and slotIndex =? data.slotIndex: onClear(request, slotIndex) - if onCleanUp =? context.onCleanUp: + if onCleanUp =? agent.onCleanUp: await onCleanUp() diff --git a/codex/sales/states/filled.nim b/codex/sales/states/filled.nim index 25c14dde..2ba14672 100644 --- a/codex/sales/states/filled.nim +++ b/codex/sales/states/filled.nim @@ -26,8 +26,9 @@ method onFailed*(state: SaleFilled, request: StorageRequest): ?State = method `$`*(state: SaleFilled): string = "SaleFilled" method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} = - let data = SalesAgent(machine).data - let context = SalesAgent(machine).context + let agent = SalesAgent(machine) + let data = agent.data + let context = agent.context let market = context.market without slotIndex =? data.slotIndex: @@ -39,9 +40,7 @@ method run*(state: SaleFilled, machine: Machine): Future[?State] {.async.} = info "Slot succesfully filled", requestId = $data.requestId, slotIndex if request =? data.request and slotIndex =? data.slotIndex: - if onSale =? context.onSale: - onSale(request, slotIndex) - if onFilled =? context.onFilled: + if onFilled =? agent.onFilled: onFilled(request, slotIndex) when codex_enable_proof_failures: diff --git a/codex/sales/states/finished.nim b/codex/sales/states/finished.nim index 7cc5ba7e..1a8c5151 100644 --- a/codex/sales/states/finished.nim +++ b/codex/sales/states/finished.nim @@ -23,7 +23,6 @@ method onFailed*(state: SaleFinished, request: StorageRequest): ?State = method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = let agent = SalesAgent(machine) let data = agent.data - let context = agent.context without request =? data.request: raiseAssert "no sale request" @@ -33,5 +32,5 @@ method run*(state: SaleFinished, machine: Machine): Future[?State] {.async.} = info "Slot finished and paid out", requestId = $data.requestId, slotIndex - if onCleanUp =? context.onCleanUp: + if onCleanUp =? agent.onCleanUp: await onCleanUp() diff --git a/codex/sales/states/ignored.nim b/codex/sales/states/ignored.nim index a7ce7e6c..5955937e 100644 --- a/codex/sales/states/ignored.nim +++ b/codex/sales/states/ignored.nim @@ -14,7 +14,6 @@ method `$`*(state: SaleIgnored): string = "SaleIgnored" method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} = let agent = SalesAgent(machine) - let context = agent.context - if onCleanUp =? context.onCleanUp: + if onCleanUp =? agent.onCleanUp: await onCleanUp() diff --git a/codex/sales/states/preparing.nim b/codex/sales/states/preparing.nim index b0c5120c..d6a2270e 100644 --- a/codex/sales/states/preparing.nim +++ b/codex/sales/states/preparing.nim @@ -10,6 +10,7 @@ import ./failed import ./filled import ./ignored import ./downloading +import ./errored type SalePreparing* = ref object of ErrorHandlingState @@ -50,20 +51,33 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} = # TODO: Once implemented, check to ensure the host is allowed to fill the slot, # due to the [sliding window mechanism](https://github.com/codex-storage/codex-research/blob/master/design/marketplace.md#dispersal) + logScope: + slotIndex = data.slotIndex + slotSize = request.ask.slotSize + duration = request.ask.duration + pricePerSlot = request.ask.pricePerSlot + # availability was checked for this slot when it entered the queue, however # check to the ensure that there is still availability as they may have # changed since being added (other slots may have been processed in that time) - without availability =? await reservations.find( + without availability =? await reservations.findAvailability( request.ask.slotSize, request.ask.duration, request.ask.pricePerSlot, - request.ask.collateral, - used = false): - info "no availability found for request, ignoring", - slotSize = request.ask.slotSize, - duration = request.ask.duration, - pricePerSlot = request.ask.pricePerSlot, - used = false + request.ask.collateral): + debug "no availability found for request, ignoring" + return some State(SaleIgnored()) - return some State(SaleDownloading(availability: availability)) + info "availability found for request, creating reservation" + + without reservation =? await reservations.createReservation( + availability.id, + request.ask.slotSize, + request.id, + data.slotIndex + ), error: + return some State(SaleErrored(error: error)) + + data.reservation = some reservation + return some State(SaleDownloading()) diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim index 3d49f741..b01cba79 100644 --- a/codex/utils/asyncstatemachine.nim +++ b/codex/utils/asyncstatemachine.nim @@ -103,6 +103,8 @@ proc stop*(machine: Machine) {.async.} = if not machine.started: return + trace "stopping state machine" + machine.started = false await machine.trackedFutures.cancelTracked() diff --git a/codex/utils/exceptions.nim b/codex/utils/exceptions.nim new file mode 100644 index 00000000..4aa2af95 --- /dev/null +++ b/codex/utils/exceptions.nim @@ -0,0 +1,7 @@ +import std/strformat + +proc msgDetail*(e: ref CatchableError): string = + var msg = e.msg + if e.parent != nil: + msg = fmt"{msg} Inner exception: {e.parent.msg}" + return msg \ No newline at end of file diff --git a/codex/utils/json.nim b/codex/utils/json.nim index 1960f7af..a893ce5f 100644 --- a/codex/utils/json.nim +++ b/codex/utils/json.nim @@ -234,6 +234,13 @@ proc fromJson*[T: object]( let json = ?catch parseJson(string.fromBytes(bytes)) T.fromJson(json) +proc fromJson*[T: ref object]( + _: type T, + bytes: seq[byte] +): ?!T = + let json = ?catch parseJson(string.fromBytes(bytes)) + T.fromJson(json) + func `%`*(s: string): JsonNode = newJString(s) func `%`*(n: uint): JsonNode = @@ -307,6 +314,9 @@ func `%`*[T: distinct](id: T): JsonNode = type baseType = T.distinctBase % baseType(id) +func toJson*(obj: object): string = $(%obj) +func toJson*(obj: ref object): string = $(%obj) + proc toJsnImpl(x: NimNode): NimNode = case x.kind of nnkBracket: # array diff --git a/codex/utils/trackedfutures.nim b/codex/utils/trackedfutures.nim index ea26c4ae..064bf9e3 100644 --- a/codex/utils/trackedfutures.nim +++ b/codex/utils/trackedfutures.nim @@ -16,14 +16,12 @@ proc len*(self: TrackedFutures): int = self.futures.len proc removeFuture(self: TrackedFutures, future: FutureBase) = if not self.cancelling and not future.isNil: - trace "removing tracked future" self.futures.del(future.id) proc track*[T](self: TrackedFutures, fut: Future[T]): Future[T] = if self.cancelling: return fut - trace "tracking future", id = fut.id self.futures[fut.id] = FutureBase(fut) fut @@ -42,6 +40,8 @@ proc track*[T, U](future: Future[T], self: U): Future[T] = proc cancelTracked*(self: TrackedFutures) {.async.} = self.cancelling = true + trace "cancelling tracked futures" + for future in self.futures.values: if not future.isNil and not future.finished: trace "cancelling tracked future", id = future.id diff --git a/tests/codex/examples.nim b/tests/codex/examples.nim index fce8c46b..f85ae67d 100644 --- a/tests/codex/examples.nim +++ b/tests/codex/examples.nim @@ -60,3 +60,11 @@ proc example*(_: type Availability): Availability = minPrice = uint64.example.u256, maxCollateral = uint16.example.u256 ) + +proc example*(_: type Reservation): Reservation = + Reservation.init( + availabilityId = AvailabilityId(array[32, byte].example), + size = uint16.example.u256, + slotId = SlotId.example + ) + diff --git a/tests/codex/sales/helpers.nim b/tests/codex/sales/helpers.nim deleted file mode 100644 index 9ba250ca..00000000 --- a/tests/codex/sales/helpers.nim +++ /dev/null @@ -1,17 +0,0 @@ -import pkg/chronos -import pkg/questionable -import pkg/questionable/results - -import pkg/codex/sales/reservations -import ../helpers - -export checktest - -proc allAvailabilities*(r: Reservations): Future[seq[Availability]] {.async.} = - var ret: seq[Availability] = @[] - without availabilities =? (await r.availabilities), err: - raiseAssert "failed to get availabilities, error: " & err.msg - for a in availabilities: - if availability =? (await a): - ret.add availability - return ret diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index db02d46d..4a7d94d7 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -1,40 +1,60 @@ +import std/random + import pkg/questionable import pkg/questionable/results - import pkg/chronos import pkg/asynctest import pkg/datastore -import pkg/json_serialization -import pkg/json_serialization/std/options import pkg/codex/stores import pkg/codex/sales +import pkg/codex/utils/json import ../examples -import ./helpers +import ../helpers asyncchecksuite "Reservations module": var repo: RepoStore repoDs: Datastore metaDs: SQLiteDatastore - availability: Availability reservations: Reservations setup: + randomize(1.int64) # create reproducible results repoDs = SQLiteDatastore.new(Memory).tryGet() metaDs = SQLiteDatastore.new(Memory).tryGet() repo = RepoStore.new(repoDs, metaDs) reservations = Reservations.new(repo) - availability = Availability.example + + proc createAvailability(): Availability = + let example = Availability.example + let size = rand(100000..200000) + let availability = waitFor reservations.createAvailability( + size.u256, + example.duration, + example.minPrice, + example.maxCollateral + ) + return availability.get + + proc createReservation(availability: Availability): Reservation = + let size = rand(1.. 0 check market.filled[0].requestId == request.id @@ -378,19 +401,15 @@ asyncchecksuite "Sales": check market.filled[0].host == await market.getSigner() test "calls onFilled when slot is filled": - var soldAvailability: Availability - var soldRequest: StorageRequest - var soldSlotIndex: UInt256 + var soldRequest = StorageRequest.default + var soldSlotIndex = UInt256.high sales.onSale = proc(request: StorageRequest, slotIndex: UInt256) = - if a =? availability: - soldAvailability = a soldRequest = request soldSlotIndex = slotIndex - check isOk await reservations.reserve(availability) + createAvailability() await market.requestStorage(request) - check eventually soldAvailability == availability - check soldRequest == request + check eventually soldRequest == request check soldSlotIndex < request.ask.slots.u256 test "calls onClear when storage becomes available again": @@ -404,7 +423,7 @@ asyncchecksuite "Sales": slotIndex: UInt256) = clearedRequest = request clearedSlotIndex = slotIndex - check isOk await reservations.reserve(availability) + createAvailability() await market.requestStorage(request) check eventually clearedRequest == request check clearedSlotIndex < request.ask.slots.u256 @@ -416,22 +435,24 @@ asyncchecksuite "Sales": onBatch: BatchProc): Future[?!void] {.async.} = await sleepAsync(chronos.hours(1)) return success() - check isOk await reservations.reserve(availability) + createAvailability() await market.requestStorage(request) for slotIndex in 0.. agent.data.requestId == request.id and agent.data.slotIndex == 0.u256) check sales.agents.any(agent => agent.data.requestId == request.id and agent.data.slotIndex == 1.u256) + + test "deletes inactive reservations on load": + createAvailability() + discard await reservations.createReservation( + availability.id, + 100.u256, + RequestId.example, + UInt256.example) + check (await reservations.all(Reservation)).get.len == 1 + await sales.load() + check (await reservations.all(Reservation)).get.len == 0 + check getAvailability().size == availability.size # was restored diff --git a/tests/codex/utils/testjson.nim b/tests/codex/utils/testjson.nim index 26c2fe49..059f80ef 100644 --- a/tests/codex/utils/testjson.nim +++ b/tests/codex/utils/testjson.nim @@ -211,7 +211,7 @@ checksuite "json serialization": }, "expiry": "1691545330" }""".flatten - check $(%request) == expected + check request.toJson == expected test "deserializes UInt256 from non-hex string representation": let json = newJString("100000") diff --git a/tests/examples.nim b/tests/examples.nim index e14f5149..a3171f27 100644 --- a/tests/examples.nim +++ b/tests/examples.nim @@ -1,6 +1,7 @@ import std/random import std/sequtils import std/times +import std/typetraits import pkg/codex/contracts/requests import pkg/codex/sales/slotqueue import pkg/stint @@ -19,8 +20,9 @@ proc example*[T](_: type seq[T]): seq[T] = proc example*(_: type UInt256): UInt256 = UInt256.fromBytes(array[32, byte].example) -proc example*[T: RequestId | SlotId | Nonce](_: type T): T = - T(array[32, byte].example) +proc example*[T: distinct](_: type T): T = + type baseType = T.distinctBase + T(baseType.example) proc example*(_: type StorageRequest): StorageRequest = StorageRequest(