diff --git a/codex/sales/reservations.nim b/codex/sales/reservations.nim index 8ba762ea..3303b05c 100644 --- a/codex/sales/reservations.nim +++ b/codex/sales/reservations.nim @@ -53,6 +53,7 @@ export logutils logScope: topics = "sales reservations" + type AvailabilityId* = distinct array[32, byte] ReservationId* = distinct array[32, byte] @@ -71,7 +72,8 @@ type size* {.serialize.}: UInt256 requestId* {.serialize.}: RequestId slotIndex* {.serialize.}: UInt256 - Reservations* = ref object + Reservations* = ref object of RootObj + availabilityLock: AsyncLock # Lock for protecting assertions of availability's sizes when searching for matching availability repo: RepoStore onAvailabilityAdded: ?OnAvailabilityAdded GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.} @@ -95,12 +97,22 @@ const SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module ReservationsKey = (SalesKey / "reservations").tryGet +proc hash*(x: AvailabilityId): Hash {.borrow.} proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} +template withLock(lock, body) = + try: + await lock.acquire() + body + finally: + if lock.locked: + lock.release() + + proc new*(T: type Reservations, repo: RepoStore): Reservations = - T(repo: repo) + T(availabilityLock: newAsyncLock(),repo: repo) proc init*( _: type Availability, @@ -221,20 +233,19 @@ proc updateImpl( return success() -proc update*( - self: Reservations, - obj: Reservation): Future[?!void] {.async.} = - return await self.updateImpl(obj) - -proc update*( +proc updateAvailability( self: Reservations, obj: Availability): Future[?!void] {.async.} = + logScope: + availabilityId = obj.id + without key =? obj.key, error: return failure(error) without oldAvailability =? await self.get(key, Availability), err: if err of NotExistsError: + trace "Creating new Availability" let res = await self.updateImpl(obj) # inform subscribers that Availability has been added if onAvailabilityAdded =? self.onAvailabilityAdded: @@ -248,14 +259,14 @@ proc update*( except CatchableError as e: # we don't have any insight into types of exceptions that # `onAvailabilityAdded` can raise because it is caller-defined - warn "Unknown error during 'onAvailabilityAdded' callback", - availabilityId = obj.id, error = e.msg + warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg return res else: return failure(err) # Sizing of the availability changed, we need to adjust the repo reservation accordingly if oldAvailability.totalSize != obj.totalSize: + trace "totalSize changed, updating repo reservation" if oldAvailability.totalSize < obj.totalSize: # storage added if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: return failure(reserveErr.toErr(ReserveFailedError)) @@ -280,11 +291,21 @@ proc update*( except CatchableError as e: # we don't have any insight into types of exceptions that # `onAvailabilityAdded` can raise because it is caller-defined - warn "Unknown error during 'onAvailabilityAdded' callback", - availabilityId = obj.id, error = e.msg + warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg return res +proc update*( + self: Reservations, + obj: Reservation): Future[?!void] {.async.} = + return await self.updateImpl(obj) + +proc update*( + self: Reservations, + obj: Availability): Future[?!void] {.async.} = + withLock(self.availabilityLock): + return await self.updateAvailability(obj) + proc delete( self: Reservations, key: Key): Future[?!void] {.async.} = @@ -312,31 +333,32 @@ proc deleteReservation*( without key =? key(reservationId, availabilityId), error: return failure(error) - without reservation =? (await self.get(key, Reservation)), error: - if error of NotExistsError: - return success() - else: - return failure(error) + withLock(self.availabilityLock): + without reservation =? (await self.get(key, Reservation)), error: + if error of NotExistsError: + return success() + else: + return failure(error) - if reservation.size > 0.u256: - trace "returning remaining reservation bytes to availability", - size = reservation.size + if reservation.size > 0.u256: + trace "returning remaining reservation bytes to availability", + size = reservation.size - without availabilityKey =? availabilityId.key, error: - return failure(error) + without availabilityKey =? availabilityId.key, error: + return failure(error) - without var availability =? await self.get(availabilityKey, Availability), error: - return failure(error) + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) - availability.freeSize += reservation.size + availability.freeSize += reservation.size - if updateErr =? (await self.update(availability)).errorOption: - return failure(updateErr) + if updateErr =? (await self.updateAvailability(availability)).errorOption: + return failure(updateErr) - if err =? (await self.repo.metaDs.delete(key)).errorOption: - return failure(err.toErr(DeleteFailedError)) + if err =? (await self.repo.metaDs.delete(key)).errorOption: + return failure(err.toErr(DeleteFailedError)) - return success() + return success() # TODO: add support for deleting availabilities # To delete, must not have any active sales. @@ -370,54 +392,57 @@ proc createAvailability*( return success(availability) -proc createReservation*( +method createReservation*( self: Reservations, availabilityId: AvailabilityId, slotSize: UInt256, requestId: RequestId, slotIndex: UInt256 -): Future[?!Reservation] {.async.} = +): Future[?!Reservation] {.async, base.} = - trace "creating reservation", availabilityId, slotSize, requestId, slotIndex + withLock(self.availabilityLock): + without availabilityKey =? availabilityId.key, error: + return failure(error) - let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex) + without availability =? await self.get(availabilityKey, Availability), error: + return failure(error) - without availabilityKey =? availabilityId.key, error: - return failure(error) + # Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availiability modifications + if availability.freeSize < slotSize: + let error = newException( + BytesOutOfBoundsError, + "trying to reserve an amount of bytes that is greater than the total size of the Availability") + return failure(error) - without var availability =? await self.get(availabilityKey, Availability), error: - return failure(error) + trace "Creating reservation", availabilityId, slotSize, requestId, slotIndex - if availability.freeSize < slotSize: - let error = newException( - BytesOutOfBoundsError, - "trying to reserve an amount of bytes that is greater than the total size of the Availability") - return failure(error) + let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex) - if createResErr =? (await self.update(reservation)).errorOption: - return failure(createResErr) + if createResErr =? (await self.update(reservation)).errorOption: + return failure(createResErr) - # reduce availability freeSize by the slot size, which is now accounted for in - # the newly created Reservation - availability.freeSize -= slotSize + # reduce availability freeSize by the slot size, which is now accounted for in + # the newly created Reservation + availability.freeSize -= slotSize - # update availability with reduced size - if updateErr =? (await self.update(availability)).errorOption: + # update availability with reduced size + trace "Updating availability with reduced size" + if updateErr =? (await self.updateAvailability(availability)).errorOption: + trace "Updating availability failed, rolling back reservation creation" - trace "rolling back reservation creation" + without key =? reservation.key, keyError: + keyError.parent = updateErr + return failure(keyError) - without key =? reservation.key, keyError: - keyError.parent = updateErr - return failure(keyError) + # rollback the reservation creation + if rollbackErr =? (await self.delete(key)).errorOption: + rollbackErr.parent = updateErr + return failure(rollbackErr) - # rollback the reservation creation - if rollbackErr =? (await self.delete(key)).errorOption: - rollbackErr.parent = updateErr - return failure(rollbackErr) + return failure(updateErr) - return failure(updateErr) - - return success(reservation) + trace "Reservation succesfully created" + return success(reservation) proc returnBytesToAvailability*( self: Reservations, @@ -429,48 +454,48 @@ proc returnBytesToAvailability*( reservationId availabilityId + withLock(self.availabilityLock): + without key =? key(reservationId, availabilityId), error: + return failure(error) - without key =? key(reservationId, availabilityId), error: - return failure(error) + without var reservation =? (await self.get(key, Reservation)), error: + return failure(error) - without var reservation =? (await self.get(key, Reservation)), error: - return failure(error) + # We are ignoring bytes that are still present in the Reservation because + # they will be returned to Availability through `deleteReservation`. + let bytesToBeReturned = bytes - reservation.size - # We are ignoring bytes that are still present in the Reservation because - # they will be returned to Availability through `deleteReservation`. - let bytesToBeReturned = bytes - reservation.size + if bytesToBeReturned == 0: + trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned + return success() + + trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned + + # First lets see if we can re-reserve the bytes, if the Repo's quota + # is depleted then we will fail-fast as there is nothing to be done atm. + if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption: + return failure(reserveErr.toErr(ReserveFailedError)) + + without availabilityKey =? availabilityId.key, error: + return failure(error) + + without var availability =? await self.get(availabilityKey, Availability), error: + return failure(error) + + availability.freeSize += bytesToBeReturned + + # Update availability with returned size + if updateErr =? (await self.updateAvailability(availability)).errorOption: + + trace "Rolling back returning bytes" + if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption: + rollbackErr.parent = updateErr + return failure(rollbackErr) + + return failure(updateErr) - if bytesToBeReturned == 0: - trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned return success() - trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned - - # First lets see if we can re-reserve the bytes, if the Repo's quota - # is depleted then we will fail-fast as there is nothing to be done atm. - if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption: - return failure(reserveErr.toErr(ReserveFailedError)) - - without availabilityKey =? availabilityId.key, error: - return failure(error) - - without var availability =? await self.get(availabilityKey, Availability), error: - return failure(error) - - availability.freeSize += bytesToBeReturned - - # Update availability with returned size - if updateErr =? (await self.update(availability)).errorOption: - - trace "Rolling back returning bytes" - if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption: - rollbackErr.parent = updateErr - return failure(rollbackErr) - - return failure(updateErr) - - return success() - proc release*( self: Reservations, reservationId: ReservationId, @@ -621,6 +646,7 @@ proc findAvailability*( minPrice >= availability.minPrice: trace "availability matched", + id = availability.id, size, availFreeSize = availability.freeSize, duration, availDuration = availability.duration, minPrice, availMinPrice = availability.minPrice, @@ -635,6 +661,7 @@ proc findAvailability*( return some availability trace "availability did not match", + id = availability.id, size, availFreeSize = availability.freeSize, duration, availDuration = availability.duration, minPrice, availMinPrice = availability.minPrice, diff --git a/codex/sales/states/preparing.nim b/codex/sales/states/preparing.nim index 973446e2..e5a441d3 100644 --- a/codex/sales/states/preparing.nim +++ b/codex/sales/states/preparing.nim @@ -1,5 +1,6 @@ import pkg/questionable import pkg/questionable/results +import pkg/metrics import ../../logutils import ../../market @@ -13,6 +14,8 @@ import ./ignored import ./downloading import ./errored +declareCounter(codex_reservations_availability_mismatch, "codex reservations availability_mismatch") + type SalePreparing* = ref object of ErrorHandlingState @@ -78,7 +81,18 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} = request.id, data.slotIndex ), error: + trace "Creation of reservation failed" + # Race condition: + # reservations.findAvailability (line 64) is no guarantee. You can never know for certain that the reservation can be created until after you have it. + # Should createReservation fail because there's no space, we proceed to SaleIgnored. + if error of BytesOutOfBoundsError: + # Lets monitor how often this happen and if it is often we can make it more inteligent to handle it + codex_reservations_availability_mismatch.inc() + return some State(SaleIgnored()) + return some State(SaleErrored(error: error)) + trace "Reservation created succesfully" + data.reservation = some reservation return some State(SaleDownloading()) diff --git a/tests/codex/helpers/mockreservations.nim b/tests/codex/helpers/mockreservations.nim new file mode 100644 index 00000000..cbc95bc1 --- /dev/null +++ b/tests/codex/helpers/mockreservations.nim @@ -0,0 +1,33 @@ +import pkg/chronos +import pkg/codex/sales +import pkg/codex/stores +import pkg/questionable/results + +type + MockReservations* = ref object of Reservations + createReservationThrowBytesOutOfBoundsError: bool + +proc new*( + T: type MockReservations, + repo: RepoStore +): MockReservations = + ## Create a mock clock instance + MockReservations(availabilityLock: newAsyncLock(), repo: repo) + +proc setCreateReservationThrowBytesOutOfBoundsError*(self: MockReservations, flag: bool) = + self.createReservationThrowBytesOutOfBoundsError = flag + +method createReservation*( + self: MockReservations, + availabilityId: AvailabilityId, + slotSize: UInt256, + requestId: RequestId, + slotIndex: UInt256): Future[?!Reservation] {.async.} = + if self.createReservationThrowBytesOutOfBoundsError: + let error = newException( + BytesOutOfBoundsError, + "trying to reserve an amount of bytes that is greater than the total size of the Availability") + return failure(error) + + return await procCall createReservation(Reservations(self), availabilityId, slotSize, requestId, slotIndex) + diff --git a/tests/codex/sales/states/testpreparing.nim b/tests/codex/sales/states/testpreparing.nim index 6f5d8c7f..c095d99e 100644 --- a/tests/codex/sales/states/testpreparing.nim +++ b/tests/codex/sales/states/testpreparing.nim @@ -1,20 +1,66 @@ -import std/unittest +import pkg/chronos import pkg/questionable +import pkg/datastore +import pkg/stew/byteutils import pkg/codex/contracts/requests +import pkg/codex/sales/states/preparing import pkg/codex/sales/states/downloading import pkg/codex/sales/states/cancelled import pkg/codex/sales/states/failed import pkg/codex/sales/states/filled +import pkg/codex/sales/states/ignored +import pkg/codex/sales/states/errored +import pkg/codex/sales/salesagent +import pkg/codex/sales/salescontext +import pkg/codex/sales/reservations +import pkg/codex/stores/repostore +import ../../../asynctest +import ../../helpers import ../../examples +import ../../helpers/mockmarket +import ../../helpers/mockreservations +import ../../helpers/mockclock -suite "sales state 'preparing'": - +asyncchecksuite "sales state 'preparing'": let request = StorageRequest.example let slotIndex = (request.ask.slots div 2).u256 + let market = MockMarket.new() + let clock = MockClock.new() + var agent: SalesAgent var state: SalePreparing + var repo: RepoStore + var availability: Availability + var context: SalesContext + var reservations: MockReservations setup: + availability = Availability( + totalSize: request.ask.slotSize + 100.u256, + freeSize: request.ask.slotSize + 100.u256, + duration: request.ask.duration + 60.u256, + minPrice: request.ask.pricePerSlot - 10.u256, + maxCollateral: request.ask.collateral + 400.u256 + ) + let repoDs = SQLiteDatastore.new(Memory).tryGet() + let metaDs = SQLiteDatastore.new(Memory).tryGet() + repo = RepoStore.new(repoDs, metaDs) + await repo.start() + state = SalePreparing.new() + context = SalesContext( + market: market, + clock: clock + ) + + reservations = MockReservations.new(repo) + context.reservations = reservations + agent = newSalesAgent(context, + request.id, + slotIndex, + request.some) + + teardown: + await repo.stop() test "switches to cancelled state when request expires": let next = state.onCancelled(request) @@ -27,3 +73,28 @@ suite "sales state 'preparing'": test "switches to filled state when slot is filled": let next = state.onSlotFilled(request.id, slotIndex) check !next of SaleFilled + + proc createAvailability() {.async.} = + let a = await reservations.createAvailability( + availability.totalSize, + availability.duration, + availability.minPrice, + availability.maxCollateral + ) + availability = a.get + + test "run switches to ignored when no availability": + let next = await state.run(agent) + check !next of SaleIgnored + + test "run switches to downloading when reserved": + await createAvailability() + let next = await state.run(agent) + check !next of SaleDownloading + + test "run switches to ignored when reserve fails with BytesOutOfBounds": + await createAvailability() + reservations.setCreateReservationThrowBytesOutOfBoundsError(true) + + let next = await state.run(agent) + check !next of SaleIgnored diff --git a/tests/codex/sales/testreservations.nim b/tests/codex/sales/testreservations.nim index 36256ec3..6e71bb5a 100644 --- a/tests/codex/sales/testreservations.nim +++ b/tests/codex/sales/testreservations.nim @@ -1,4 +1,5 @@ import std/random +import std/sequtils import pkg/questionable import pkg/questionable/results @@ -6,6 +7,7 @@ import pkg/chronos import pkg/datastore import pkg/codex/stores +import pkg/codex/errors import pkg/codex/sales import pkg/codex/utils/json @@ -13,6 +15,8 @@ import ../../asynctest import ../examples import ../helpers +const CONCURRENCY_TESTS_COUNT = 1000 + asyncchecksuite "Reservations module": var repo: RepoStore @@ -148,6 +152,39 @@ asyncchecksuite "Reservations module": check created.isErr check created.error of BytesOutOfBoundsError + test "cannot create reservation larger than availability size - concurrency test": + proc concurrencyTest(): Future[void] {.async.} = + let availability = createAvailability() + let one = reservations.createReservation( + availability.id, + availability.totalSize - 1, + RequestId.example, + UInt256.example + ) + + let two = reservations.createReservation( + availability.id, + availability.totalSize, + RequestId.example, + UInt256.example + ) + + let oneResult = await one + let twoResult = await two + + check oneResult.isErr or twoResult.isErr + if oneResult.isErr: + check oneResult.error of BytesOutOfBoundsError + if twoResult.isErr: + check twoResult.error of BytesOutOfBoundsError + + var futures: seq[Future[void]] + for _ in 1..CONCURRENCY_TESTS_COUNT: + futures.add(concurrencyTest()) + + await allFuturesThrowing(futures) + + test "creating reservation reduces availability size": let availability = createAvailability() let orig = availability.freeSize diff --git a/tests/codex/sales/teststates.nim b/tests/codex/sales/teststates.nim index ef562136..73810134 100644 --- a/tests/codex/sales/teststates.nim +++ b/tests/codex/sales/teststates.nim @@ -6,5 +6,9 @@ import ./states/testinitialproving import ./states/testfilled import ./states/testproving import ./states/testsimulatedproving +import ./states/testcancelled +import ./states/testerrored +import ./states/testignored +import ./states/testpreparing {.warning[UnusedImport]: off.}