diff --git a/codex/sales.nim b/codex/sales.nim
index 01cc0fd7..6a00e53b 100644
--- a/codex/sales.nim
+++ b/codex/sales.nim
@@ -105,14 +105,15 @@ proc new*(
     subscriptions: @[],
   )
 
-proc remove(sales: Sales, agent: SalesAgent) {.async.} =
+proc remove(sales: Sales, agent: SalesAgent) {.async: (raises: []).} =
   await agent.stop()
-  sales.agents.keepItIf(it != agent)
+  if sales.running:
+    sales.agents.keepItIf(it != agent)
 
 proc cleanUp(
     sales: Sales, agent: SalesAgent, reprocessSlot: bool, returnedCollateral: ?UInt256
-) {.async.} =
+) {.async: (raises: []).} =
   let data = agent.data
 
   logScope:
@@ -129,36 +130,32 @@ proc cleanUp(
   # there are not really any bytes to be returned
   if request =? data.request and reservation =? data.reservation:
     if returnErr =? (
-      await sales.context.reservations.returnBytesToAvailability(
+      await noCancel sales.context.reservations.returnBytesToAvailability(
         reservation.availabilityId, reservation.id, request.ask.slotSize
       )
     ).errorOption:
       error "failure returning bytes", error = returnErr.msg, bytes = request.ask.slotSize
 
-  # delete reservation and return reservation bytes back to the availability
-  if reservation =? data.reservation and
-      deleteErr =? (
-        await sales.context.reservations.deleteReservation(
-          reservation.id, reservation.availabilityId, returnedCollateral
-        )
-      ).errorOption:
-    error "failure deleting reservation", error = deleteErr.msg
-
-  if data.slotIndex > uint16.high.uint64:
-    error "Cannot cast slot index to uint16", slotIndex = data.slotIndex
-    return
+  # delete reservation and return reservation bytes back to the availability
+  if reservation =? data.reservation and
+      deleteErr =? (
+        await noCancel sales.context.reservations.deleteReservation(
+          reservation.id, reservation.availabilityId, returnedCollateral
+        )
+      ).errorOption:
+    error "failure deleting reservation", error = deleteErr.msg
 
   # Re-add items back into the queue to prevent small availabilities from
   # draining the queue. Seen items will be ordered last.
-  if reprocessSlot and request =? data.request:
-    try:
-      without collateral =?
-        await sales.context.market.slotCollateral(data.requestId, data.slotIndex), err:
-        error "Failed to re-add item back to the slot queue: unable to calculate collateral",
-          error = err.msg
-        return
-
+  if data.slotIndex <= uint16.high.uint64 and reprocessSlot and request =? data.request:
+    let res =
+      await noCancel sales.context.market.slotCollateral(data.requestId, data.slotIndex)
+    if res.isErr:
+      error "Failed to re-add item back to the slot queue: unable to calculate collateral",
+        error = res.error.msg
+    else:
+      let collateral = res.get()
       let queue = sales.context.slotQueue
       var seenItem = SlotQueueItem.init(
        data.requestId,
@@ -171,11 +168,9 @@ proc cleanUp(
       trace "pushing ignored item to queue, marked as seen"
       if err =? queue.push(seenItem).errorOption:
         error "failed to readd slot to queue", errorType = $(type err), error = err.msg
-    except MarketError as e:
-      error "Failed to re-add item back to the slot queue.", error = e.msg
-      return
 
-  await sales.remove(agent)
+  let fut = sales.remove(agent)
+  sales.trackedFutures.track(fut)
 
 proc filled(sales: Sales, request: StorageRequest, slotIndex: uint64) =
   if onSale =? sales.context.onSale:
 
@@ -193,7 +188,7 @@ proc processSlot(
 
   agent.onCleanUp = proc(
       reprocessSlot = false, returnedCollateral = UInt256.none
-  ) {.async.} =
+  ) {.async: (raises: []).} =
     trace "slot cleanup"
     await sales.cleanUp(agent, reprocessSlot, returnedCollateral)
     completed.fire()
@@ -269,7 +264,7 @@ proc load*(sales: Sales) {.async.} =
 
     agent.onCleanUp = proc(
         reprocessSlot = false, returnedCollateral = UInt256.none
-    ) {.async.} =
+    ) {.async: (raises: []).} =
       await sales.cleanUp(agent, reprocessSlot, returnedCollateral)
 
     # There is no need to assign agent.onFilled as slots loaded from `mySlots`
diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim
index 07e3f406..fe4a82d3 100644
--- a/codex/sales/reservations.nim
+++ b/codex/sales/reservations.nim
@@ -248,7 +248,7 @@ proc exists*(
   let exists = await self.repo.metaDs.ds.contains(key)
   return exists
 
-iterator items(self: StorableIter): Future[?seq[byte]] =
+iterator items(self: StorableIter): auto =
   while not self.finished:
     yield self.next()
 
@@ -368,7 +368,9 @@ proc update*(
     error "Lock error when trying to update the availability", err = e.msg
     return failure(e)
 
-proc delete(self: Reservations, key: Key): Future[?!void] {.async.} =
+proc delete(
+    self: Reservations, key: Key
+): Future[?!void] {.async: (raises: [CancelledError]).} =
   trace "deleting object", key
 
   if not await self.exists(key):
@@ -384,7 +386,7 @@ proc deleteReservation*(
   reservationId: ReservationId,
   availabilityId: AvailabilityId,
   returnedCollateral: ?UInt256 = UInt256.none,
-): Future[?!void] {.async.} =
+): Future[?!void] {.async: (raises: [CancelledError]).} =
   logScope:
     reservationId
     availabilityId
@@ -393,35 +395,39 @@ proc deleteReservation*(
   without key =? key(reservationId, availabilityId), error:
     return failure(error)
 
-  withLock(self.availabilityLock):
-    without reservation =? (await self.get(key, Reservation)), error:
-      if error of NotExistsError:
-        return success()
-      else:
-        return failure(error)
+  try:
+    withLock(self.availabilityLock):
+      without reservation =? (await self.get(key, Reservation)), error:
+        if error of NotExistsError:
+          return success()
+        else:
+          return failure(error)
 
-    if reservation.size > 0.uint64:
-      trace "returning remaining reservation bytes to availability",
-        size = reservation.size
+      if reservation.size > 0.uint64:
+        trace "returning remaining reservation bytes to availability",
+          size = reservation.size
 
-      without availabilityKey =? availabilityId.key, error:
-        return failure(error)
+        without availabilityKey =? availabilityId.key, error:
+          return failure(error)
 
-      without var availability =? await self.get(availabilityKey, Availability), error:
-        return failure(error)
+        without var availability =? await self.get(availabilityKey, Availability), error:
+          return failure(error)
 
-      availability.freeSize += reservation.size
+        availability.freeSize += reservation.size
 
-      if collateral =? returnedCollateral:
-        availability.totalRemainingCollateral += collateral
+        if collateral =? returnedCollateral:
+          availability.totalRemainingCollateral += collateral
 
-      if updateErr =? (await self.updateAvailability(availability)).errorOption:
-        return failure(updateErr)
+        if updateErr =? (await self.updateAvailability(availability)).errorOption:
+          return failure(updateErr)
 
-  if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
-    return failure(err.toErr(DeleteFailedError))
+      if err =? (await self.repo.metaDs.ds.delete(key)).errorOption:
+        return failure(err.toErr(DeleteFailedError))
 
-  return success()
+      return success()
+  except AsyncLockError as e:
+    error "Lock error when trying to delete the reservation", err = e.msg
+    return failure(e)
 
 # TODO: add support for deleting availabilities
 # To delete, must not have any active sales.
 
@@ -434,7 +440,7 @@ proc createAvailability*(
   totalCollateral: UInt256,
   enabled: bool,
   until: SecondsSince1970,
-): Future[?!Availability] {.async.} =
+): Future[?!Availability] {.async: (raises: [CancelledError]).} =
   trace "creating availability",
     size, duration, minPricePerBytePerSecond, totalCollateral, enabled, until
 
@@ -470,109 +476,116 @@ method createReservation*(
   slotIndex: uint64,
   collateralPerByte: UInt256,
   validUntil: SecondsSince1970,
-): Future[?!Reservation] {.async, base.} =
-  withLock(self.availabilityLock):
-    without availabilityKey =? availabilityId.key, error:
-      return failure(error)
+): Future[?!Reservation] {.async: (raises: [CancelledError]), base.} =
+  try:
+    withLock(self.availabilityLock):
+      without availabilityKey =? availabilityId.key, error:
+        return failure(error)
 
-    without availability =? await self.get(availabilityKey, Availability), error:
-      return failure(error)
+      without availability =? await self.get(availabilityKey, Availability), error:
+        return failure(error)
 
-    # Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availiability modifications
-    if availability.freeSize < slotSize:
-      let error = newException(
-        BytesOutOfBoundsError,
-        "trying to reserve an amount of bytes that is greater than the free size of the Availability",
-      )
-      return failure(error)
+      # Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availability modifications
+      if availability.freeSize < slotSize:
+        let error = newException(
+          BytesOutOfBoundsError,
+          "trying to reserve an amount of bytes that is greater than the free size of the Availability",
+        )
+        return failure(error)
 
-    trace "Creating reservation",
-      availabilityId, slotSize, requestId, slotIndex, validUntil = validUntil
+      trace "Creating reservation",
+        availabilityId, slotSize, requestId, slotIndex, validUntil = validUntil
 
-    let reservation =
-      Reservation.init(availabilityId, slotSize, requestId, slotIndex, validUntil)
+      let reservation =
+        Reservation.init(availabilityId, slotSize, requestId, slotIndex, validUntil)
 
-    if createResErr =? (await self.update(reservation)).errorOption:
-      return failure(createResErr)
+      if createResErr =? (await self.update(reservation)).errorOption:
+        return failure(createResErr)
 
-    # reduce availability freeSize by the slot size, which is now accounted for in
-    # the newly created Reservation
-    availability.freeSize -= slotSize
+      # reduce availability freeSize by the slot size, which is now accounted for in
+      # the newly created Reservation
+      availability.freeSize -= slotSize
 
-    # adjust the remaining totalRemainingCollateral
-    availability.totalRemainingCollateral -= slotSize.u256 * collateralPerByte
+      # adjust the remaining totalRemainingCollateral
+      availability.totalRemainingCollateral -= slotSize.u256 * collateralPerByte
 
-    # update availability with reduced size
-    trace "Updating availability with reduced size"
-    if updateErr =? (await self.updateAvailability(availability)).errorOption:
-      trace "Updating availability failed, rolling back reservation creation"
+      # update availability with reduced size
+      trace "Updating availability with reduced size"
+      if updateErr =? (await self.updateAvailability(availability)).errorOption:
+        trace "Updating availability failed, rolling back reservation creation"
 
-      without key =? reservation.key, keyError:
-        keyError.parent = updateErr
-        return failure(keyError)
+        without key =? reservation.key, keyError:
+          keyError.parent = updateErr
+          return failure(keyError)
 
-      # rollback the reservation creation
-      if rollbackErr =? (await self.delete(key)).errorOption:
-        rollbackErr.parent = updateErr
-        return failure(rollbackErr)
+        # rollback the reservation creation
+        if rollbackErr =? (await self.delete(key)).errorOption:
+          rollbackErr.parent = updateErr
+          return failure(rollbackErr)
 
-      return failure(updateErr)
+        return failure(updateErr)
 
-    trace "Reservation succesfully created"
-    return success(reservation)
+      trace "Reservation successfully created"
+      return success(reservation)
+  except AsyncLockError as e:
+    error "Lock error when trying to create the reservation", err = e.msg
+    return failure(e)
 
 proc returnBytesToAvailability*(
     self: Reservations,
     availabilityId: AvailabilityId,
     reservationId: ReservationId,
     bytes: uint64,
-): Future[?!void] {.async.} =
+): Future[?!void] {.async: (raises: [CancelledError]).} =
   logScope:
     reservationId
     availabilityId
+  try:
+    withLock(self.availabilityLock):
+      without key =? key(reservationId, availabilityId), error:
+        return failure(error)
 
-  withLock(self.availabilityLock):
-    without key =? key(reservationId, availabilityId), error:
-      return failure(error)
+      without var reservation =? (await self.get(key, Reservation)), error:
+        return failure(error)
 
-    without var reservation =? (await self.get(key, Reservation)), error:
-      return failure(error)
+      # We are ignoring bytes that are still present in the Reservation because
+      # they will be returned to Availability through `deleteReservation`.
+      let bytesToBeReturned = bytes - reservation.size
 
-    # We are ignoring bytes that are still present in the Reservation because
-    # they will be returned to Availability through `deleteReservation`.
-    let bytesToBeReturned = bytes - reservation.size
+      if bytesToBeReturned == 0:
+        trace "No bytes are returned",
+          requestSizeBytes = bytes, returningBytes = bytesToBeReturned
+        return success()
 
-    if bytesToBeReturned == 0:
-      trace "No bytes are returned",
+      trace "Returning bytes",
         requestSizeBytes = bytes, returningBytes = bytesToBeReturned
-      return success()
+
+      # First let's see if we can re-reserve the bytes; if the Repo's quota
+      # is depleted then we will fail fast as there is nothing to be done atm.
+      if reserveErr =? (await self.repo.reserve(bytesToBeReturned.NBytes)).errorOption:
+        return failure(reserveErr.toErr(ReserveFailedError))
+
+      without availabilityKey =? availabilityId.key, error:
+        return failure(error)
+
+      without var availability =? await self.get(availabilityKey, Availability), error:
+        return failure(error)
+
+      availability.freeSize += bytesToBeReturned
+
+      # Update availability with returned size
+      if updateErr =? (await self.updateAvailability(availability)).errorOption:
+        trace "Rolling back returning bytes"
+        if rollbackErr =? (await self.repo.release(bytesToBeReturned.NBytes)).errorOption:
+          rollbackErr.parent = updateErr
+          return failure(rollbackErr)
+
+        return failure(updateErr)
+
+      return success()
 
-    trace "Returning bytes",
-      requestSizeBytes = bytes, returningBytes = bytesToBeReturned
-
-    # First lets see if we can re-reserve the bytes, if the Repo's quota
-    # is depleted then we will fail-fast as there is nothing to be done atm.
-    if reserveErr =? (await self.repo.reserve(bytesToBeReturned.NBytes)).errorOption:
-      return failure(reserveErr.toErr(ReserveFailedError))
-
-    without availabilityKey =? availabilityId.key, error:
-      return failure(error)
-
-    without var availability =? await self.get(availabilityKey, Availability), error:
-      return failure(error)
-
-    availability.freeSize += bytesToBeReturned
-
-    # Update availability with returned size
-    if updateErr =? (await self.updateAvailability(availability)).errorOption:
-      trace "Rolling back returning bytes"
-      if rollbackErr =? (await self.repo.release(bytesToBeReturned.NBytes)).errorOption:
-        rollbackErr.parent = updateErr
-        return failure(rollbackErr)
-
-      return failure(updateErr)
-
-    return success()
+  except AsyncLockError as e:
+    error "Lock error when returning bytes to the availability", err = e.msg
+    return failure(e)
 
 proc release*(
     self: Reservations,
@@ -698,7 +711,7 @@ proc findAvailability*(
   size, duration: uint64,
   pricePerBytePerSecond, collateralPerByte: UInt256,
   validUntil: SecondsSince1970,
-): Future[?Availability] {.async.} =
+): Future[?Availability] {.async: (raises: [CancelledError]).} =
   without storables =? (await self.storables(Availability)), e:
     error "failed to get all storables", error = e.msg
     return none Availability
diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim
index 61f3a9d3..6b62d5e4 100644
--- a/codex/sales/salesagent.nim
+++ b/codex/sales/salesagent.nim
@@ -26,10 +26,10 @@ type
     onCleanUp*: OnCleanUp
     onFilled*: ?OnFilled
 
-  OnCleanUp* = proc(
-    reprocessSlot = false, returnedCollateral = UInt256.none
-  ): Future[void] {.gcsafe, upraises: [].}
-  OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].}
+  OnCleanUp* = proc(reprocessSlot = false, returnedCollateral = UInt256.none) {.
+    async: (raises: [])
+  .}
+  OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, raises: [].}
 
   SalesAgentError = object of CodexError
   AllSlotsFilledError* = object of SalesAgentError
@@ -132,7 +132,7 @@ proc subscribe*(agent: SalesAgent) {.async.} =
   await agent.subscribeCancellation()
   agent.subscribed = true
 
-proc unsubscribe*(agent: SalesAgent) {.async.} =
+proc unsubscribe*(agent: SalesAgent) {.async: (raises: []).} =
   if not agent.subscribed:
     return
 
@@ -143,6 +143,6 @@ proc unsubscribe*(agent: SalesAgent) {.async.} =
 
   agent.subscribed = false
 
-proc stop*(agent: SalesAgent) {.async.} =
+proc stop*(agent: SalesAgent) {.async: (raises: []).} =
   await Machine(agent).stop()
   await agent.unsubscribe()
diff --git a/codex/sales/states/preparing.nim b/codex/sales/states/preparing.nim
index a3aee4c9..807bb196 100644
--- a/codex/sales/states/preparing.nim
+++ b/codex/sales/states/preparing.nim
@@ -82,7 +82,7 @@ method run*(
   info "Availability found for request, creating reservation"
 
   without reservation =?
-    await reservations.createReservation(
+    await noCancel reservations.createReservation(
       availability.id, request.ask.slotSize, request.id, data.slotIndex,
       request.ask.collateralPerByte, requestEnd,
     ), error:
diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim
index eb84378c..194aea20 100644
--- a/codex/utils/asyncstatemachine.nim
+++ b/codex/utils/asyncstatemachine.nim
@@ -2,7 +2,6 @@ import pkg/questionable
 import pkg/chronos
 import ../logutils
 import ./trackedfutures
-import ./exceptions
 
 {.push raises: [].}
 
@@ -89,7 +88,7 @@ proc start*(machine: Machine, initialState: State) =
   machine.trackedFutures.track(fut)
   machine.schedule(Event.transition(machine.state, initialState))
 
-proc stop*(machine: Machine) {.async.} =
+proc stop*(machine: Machine) {.async: (raises: []).} =
   if not machine.started:
     return
diff --git a/tests/codex/helpers/mockreservations.nim b/tests/codex/helpers/mockreservations.nim
index 91ed04ec..a8933e00 100644
--- a/tests/codex/helpers/mockreservations.nim
+++ b/tests/codex/helpers/mockreservations.nim
@@ -30,7 +30,7 @@ method createReservation*(
   slotIndex: uint64,
   collateralPerByte: UInt256,
   validUntil: SecondsSince1970,
-): Future[?!Reservation] {.async.} =
+): Future[?!Reservation] {.async: (raises: [CancelledError]).} =
   if self.createReservationThrowBytesOutOfBoundsError:
     let error = newException(
       BytesOutOfBoundsError,
diff --git a/tests/codex/sales/states/testcancelled.nim b/tests/codex/sales/states/testcancelled.nim
index 6eaf1f5a..ebd9fe2d 100644
--- a/tests/codex/sales/states/testcancelled.nim
+++ b/tests/codex/sales/states/testcancelled.nim
@@ -31,7 +31,7 @@ asyncchecksuite "sales state 'cancelled'":
     market = MockMarket.new()
     let onCleanUp = proc(
         reprocessSlot = false, returnedCollateral = UInt256.none
-    ) {.async.} =
+    ) {.async: (raises: []).} =
       reprocessSlotWas = some reprocessSlot
       returnedCollateralValue = returnedCollateral
diff --git a/tests/codex/sales/states/testerrored.nim b/tests/codex/sales/states/testerrored.nim
index 0cc26cf8..b0352edb 100644
--- a/tests/codex/sales/states/testerrored.nim
+++ b/tests/codex/sales/states/testerrored.nim
@@ -25,7 +25,7 @@ asyncchecksuite "sales state 'errored'":
   setup:
     let onCleanUp = proc(
         reprocessSlot = false, returnedCollateral = UInt256.none
-    ) {.async.} =
+    ) {.async: (raises: []).} =
       reprocessSlotWas = reprocessSlot
 
     let context = SalesContext(market: market, clock: clock)
diff --git a/tests/codex/sales/states/testfinished.nim b/tests/codex/sales/states/testfinished.nim
index 1648df3a..b5502351 100644
--- a/tests/codex/sales/states/testfinished.nim
+++ b/tests/codex/sales/states/testfinished.nim
@@ -31,7 +31,7 @@ asyncchecksuite "sales state 'finished'":
     market = MockMarket.new()
     let onCleanUp = proc(
         reprocessSlot = false, returnedCollateral = UInt256.none
-    ) {.async.} =
+    ) {.async: (raises: []).} =
       reprocessSlotWas = some reprocessSlot
       returnedCollateralValue = returnedCollateral
diff --git a/tests/codex/sales/states/testignored.nim b/tests/codex/sales/states/testignored.nim
index 5eea7d16..b509ce9b 100644
--- a/tests/codex/sales/states/testignored.nim
+++ b/tests/codex/sales/states/testignored.nim
@@ -25,7 +25,7 @@ asyncchecksuite "sales state 'ignored'":
   setup:
     let onCleanUp = proc(
         reprocessSlot = false, returnedCollateral = UInt256.none
-    ) {.async.} =
+    ) {.async: (raises: []).} =
       reprocessSlotWas = reprocessSlot
 
     let context = SalesContext(market: market, clock: clock)
diff --git a/tests/integration/codexclient.nim b/tests/integration/codexclient.nim
index 5ce5d42a..17ed6dd4 100644
--- a/tests/integration/codexclient.nim
+++ b/tests/integration/codexclient.nim
@@ -424,3 +424,10 @@ proc requestId*(
 
 proc buildUrl*(client: CodexClient, path: string): string =
   return client.baseurl & path
+
+proc getSlots*(
+    client: CodexClient
+): Future[?!seq[Slot]] {.async: (raises: [CancelledError, HttpError]).} =
+  let url = client.baseurl & "/sales/slots"
+  let body = await client.getContent(url)
+  seq[Slot].fromJson(body)
diff --git a/tests/integration/testmarketplace.nim b/tests/integration/testmarketplace.nim
index 40f394e0..b2d44b3f 100644
--- a/tests/integration/testmarketplace.nim
+++ b/tests/integration/testmarketplace.nim
@@ -133,6 +133,87 @@ marketplacesuite "Marketplace":
       timeout = 10 * 1000, # give client a bit of time to withdraw its funds
     )
 
+  test "SPs are able to process slots after workers were busy with other slots and ignored them",
+    NodeConfigs(
+      clients: CodexConfigs.init(nodes = 1)
+      # .debug()
+      .some,
+      providers: CodexConfigs.init(nodes = 2)
+      # .debug()
+      # .withLogFile()
+      # .withLogTopics("marketplace", "sales", "statemachine", "slotqueue", "reservations")
+      .some,
+    ):
+    let client0 = clients()[0]
+    let provider0 = providers()[0]
+    let provider1 = providers()[1]
+    let duration = 20 * 60.uint64
+
+    let data = await RandomChunker.example(blocks = blocks)
+    let slotSize = slotSize(blocks, ecNodes, ecTolerance)
+
+    # We create an availability allowing the first SP to host the 3 slots,
+    # so the second SP will have no availability and will simply process
+    # the slots and ignore them.
+    discard await provider0.client.postAvailability(
+      totalSize = 3 * slotSize.truncate(uint64),
+      duration = duration,
+      minPricePerBytePerSecond = minPricePerBytePerSecond,
+      totalCollateral = 3 * slotSize * minPricePerBytePerSecond,
+    )
+
+    let cid = (await client0.client.upload(data)).get
+
+    let purchaseId = await client0.client.requestStorage(
+      cid,
+      duration = duration,
+      pricePerBytePerSecond = minPricePerBytePerSecond,
+      proofProbability = 1.u256,
+      expiry = 10 * 60.uint64,
+      collateralPerByte = collateralPerByte,
+      nodes = ecNodes,
+      tolerance = ecTolerance,
+    )
+
+    let requestId = (await client0.client.requestId(purchaseId)).get
+
+    # Wait until the 3 slots are filled by the first SP
+    check eventually(
+      await client0.client.purchaseStateIs(purchaseId, "started"),
+      timeout = 10 * 60.int * 1000,
+    )
+
+    # Here we create the same availability as before, but for the second SP.
+    # After ignoring all the slots of the first request, the second SP will
+    # process and host the slots of the second request.
+    discard await provider1.client.postAvailability(
+      totalSize = 3 * slotSize.truncate(uint64),
+      duration = duration,
+      minPricePerBytePerSecond = minPricePerBytePerSecond,
+      totalCollateral = 3 * slotSize * collateralPerByte,
+    )
+
+    let purchaseId2 = await client0.client.requestStorage(
+      cid,
+      duration = duration,
+      pricePerBytePerSecond = minPricePerBytePerSecond,
+      proofProbability = 3.u256,
+      expiry = 10 * 60.uint64,
+      collateralPerByte = collateralPerByte,
+      nodes = ecNodes,
+      tolerance = ecTolerance,
+    )
+    let requestId2 = (await client0.client.requestId(purchaseId2)).get
+
+    # Wait until the slots of the second request are filled
+    check eventually(
+      await client0.client.purchaseStateIs(purchaseId2, "started"),
+      timeout = 10 * 60.int * 1000,
+    )
+
+    # Double-check: verify that the second SP hosts the 3 slots
+    check eventually ((await provider1.client.getSlots()).get).len == 3
+
 marketplacesuite "Marketplace payouts":
   const minPricePerBytePerSecond = 1.u256
   const collateralPerByte = 1.u256
@@ -145,14 +226,18 @@ marketplacesuite "Marketplace payouts":
     # Uncomment to start Hardhat automatically, typically so logs can be inspected locally
     hardhat: HardhatConfig.none,
     clients: CodexConfigs.init(nodes = 1)
-    # .debug() # uncomment to enable console log output.debug()
-    # .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+    # .debug() # uncomment to enable console log output
+    # .withLogFile()
+    # # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
     # .withLogTopics("node", "erasure")
     .some,
     providers: CodexConfigs.init(nodes = 1)
-    # .debug() # uncomment to enable console log output
-    # .withLogFile() # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
-    # .withLogTopics("node", "marketplace", "sales", "reservations", "node", "proving", "clock")
+    # .debug() # uncomment to enable console log output
+    # .withLogFile()
+    # # uncomment to output log file to tests/integration/logs/<start_datetime> <suite_name>/<test_name>/<node_role>_<node_idx>.log
+    # .withLogTopics(
+    #   "node", "marketplace", "sales", "reservations", "node", "statemachine"
+    # )
     .some,
   ):
     let duration = 20.periods
@@ -203,7 +288,12 @@ marketplacesuite "Marketplace payouts":
 
       # wait until sale is cancelled
      await ethProvider.advanceTime(expiry.u256)
-      check eventually await providerApi.saleStateIs(slotId, "SaleCancelled")
+
+      check eventually(
+        await providerApi.saleStateIs(slotId, "SaleCancelled"),
+        timeout = 5 * 1000,
+        pollInterval = 200,
+      )
 
       await advanceToNextPeriod()
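
The core pattern this patch converges on is: cleanup procs annotated `{.async: (raises: []).}` so no exception can escape, `noCancel` around awaits that must run to completion (returning bytes, deleting reservations), and fire-and-forget futures registered with a tracker so shutdown can still await them. The sketch below shows that combination in isolation. It is a minimal illustration assuming chronos v4; `TrackedFutures` here is a hand-rolled stand-in for `codex/utils/trackedfutures`, not the actual Codex API, and the sleep stands in for real cleanup work.

import pkg/chronos

type TrackedFutures = ref object
  futures: seq[Future[void]]

proc track(t: TrackedFutures, fut: Future[void]) =
  # Keep a reference so shutdown can await the future instead of leaking it.
  t.futures.add fut

proc cleanup() {.async: (raises: []).} =
  # raises: [] means nothing, not even CancelledError, escapes this proc.
  # noCancel shields the await so this step always runs to completion even
  # if the surrounding operation is cancelled.
  await noCancel sleepAsync(10.milliseconds)

when isMainModule:
  let tracked = TrackedFutures()
  tracked.track(cleanup()) # fire-and-forget, but still tracked
  waitFor allFutures(tracked.futures)

This mirrors why `cleanUp` in codex/sales.nim now hands `sales.remove(agent)` to `sales.trackedFutures.track(fut)` instead of awaiting it directly: the agent can finish tearing itself down without the caller blocking, while the tracked future keeps shutdown deterministic.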