fix: createReservation lock (#825)

* fix: createReservation lock

* fix: additional locking places

* fix: acquire lock

* chore: feedback

Co-authored-by: markspanbroek <mark@spanbroek.net>
Signed-off-by: Adam Uhlíř <adam@uhlir.dev>

* feat: withLock template and fixed tests

* fix: use proc for MockReservations constructor

* chore: feedback

Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
Signed-off-by: Adam Uhlíř <adam@uhlir.dev>

* chore: feedback implementation

---------

Signed-off-by: Adam Uhlíř <adam@uhlir.dev>
Co-authored-by: markspanbroek <mark@spanbroek.net>
Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
This commit is contained in:
Adam Uhlíř 2024-06-20 12:28:01 +02:00 committed by GitHub
parent 6e9bdf1d7e
commit 1a57341b7d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 286 additions and 100 deletions

View File

@ -53,6 +53,7 @@ export logutils
logScope: logScope:
topics = "sales reservations" topics = "sales reservations"
type type
AvailabilityId* = distinct array[32, byte] AvailabilityId* = distinct array[32, byte]
ReservationId* = distinct array[32, byte] ReservationId* = distinct array[32, byte]
@ -71,7 +72,8 @@ type
size* {.serialize.}: UInt256 size* {.serialize.}: UInt256
requestId* {.serialize.}: RequestId requestId* {.serialize.}: RequestId
slotIndex* {.serialize.}: UInt256 slotIndex* {.serialize.}: UInt256
Reservations* = ref object Reservations* = ref object of RootObj
availabilityLock: AsyncLock # Lock for protecting assertions of availability's sizes when searching for matching availability
repo: RepoStore repo: RepoStore
onAvailabilityAdded: ?OnAvailabilityAdded onAvailabilityAdded: ?OnAvailabilityAdded
GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.} GetNext* = proc(): Future[?seq[byte]] {.upraises: [], gcsafe, closure.}
@ -95,12 +97,22 @@ const
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
ReservationsKey = (SalesKey / "reservations").tryGet ReservationsKey = (SalesKey / "reservations").tryGet
proc hash*(x: AvailabilityId): Hash {.borrow.}
proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.} proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.}
template withLock(lock, body) =
try:
await lock.acquire()
body
finally:
if lock.locked:
lock.release()
proc new*(T: type Reservations, proc new*(T: type Reservations,
repo: RepoStore): Reservations = repo: RepoStore): Reservations =
T(repo: repo) T(availabilityLock: newAsyncLock(),repo: repo)
proc init*( proc init*(
_: type Availability, _: type Availability,
@ -221,20 +233,19 @@ proc updateImpl(
return success() return success()
proc update*( proc updateAvailability(
self: Reservations,
obj: Reservation): Future[?!void] {.async.} =
return await self.updateImpl(obj)
proc update*(
self: Reservations, self: Reservations,
obj: Availability): Future[?!void] {.async.} = obj: Availability): Future[?!void] {.async.} =
logScope:
availabilityId = obj.id
without key =? obj.key, error: without key =? obj.key, error:
return failure(error) return failure(error)
without oldAvailability =? await self.get(key, Availability), err: without oldAvailability =? await self.get(key, Availability), err:
if err of NotExistsError: if err of NotExistsError:
trace "Creating new Availability"
let res = await self.updateImpl(obj) let res = await self.updateImpl(obj)
# inform subscribers that Availability has been added # inform subscribers that Availability has been added
if onAvailabilityAdded =? self.onAvailabilityAdded: if onAvailabilityAdded =? self.onAvailabilityAdded:
@ -248,14 +259,14 @@ proc update*(
except CatchableError as e: except CatchableError as e:
# we don't have any insight into types of exceptions that # we don't have any insight into types of exceptions that
# `onAvailabilityAdded` can raise because it is caller-defined # `onAvailabilityAdded` can raise because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback", warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg
availabilityId = obj.id, error = e.msg
return res return res
else: else:
return failure(err) return failure(err)
# Sizing of the availability changed, we need to adjust the repo reservation accordingly # Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize: if oldAvailability.totalSize != obj.totalSize:
trace "totalSize changed, updating repo reservation"
if oldAvailability.totalSize < obj.totalSize: # storage added if oldAvailability.totalSize < obj.totalSize: # storage added
if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption: if reserveErr =? (await self.repo.reserve((obj.totalSize - oldAvailability.totalSize).truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReserveFailedError)) return failure(reserveErr.toErr(ReserveFailedError))
@ -280,11 +291,21 @@ proc update*(
except CatchableError as e: except CatchableError as e:
# we don't have any insight into types of exceptions that # we don't have any insight into types of exceptions that
# `onAvailabilityAdded` can raise because it is caller-defined # `onAvailabilityAdded` can raise because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback", warn "Unknown error during 'onAvailabilityAdded' callback", error = e.msg
availabilityId = obj.id, error = e.msg
return res return res
proc update*(
self: Reservations,
obj: Reservation): Future[?!void] {.async.} =
return await self.updateImpl(obj)
proc update*(
self: Reservations,
obj: Availability): Future[?!void] {.async.} =
withLock(self.availabilityLock):
return await self.updateAvailability(obj)
proc delete( proc delete(
self: Reservations, self: Reservations,
key: Key): Future[?!void] {.async.} = key: Key): Future[?!void] {.async.} =
@ -312,31 +333,32 @@ proc deleteReservation*(
without key =? key(reservationId, availabilityId), error: without key =? key(reservationId, availabilityId), error:
return failure(error) return failure(error)
without reservation =? (await self.get(key, Reservation)), error: withLock(self.availabilityLock):
if error of NotExistsError: without reservation =? (await self.get(key, Reservation)), error:
return success() if error of NotExistsError:
else: return success()
return failure(error) else:
return failure(error)
if reservation.size > 0.u256: if reservation.size > 0.u256:
trace "returning remaining reservation bytes to availability", trace "returning remaining reservation bytes to availability",
size = reservation.size size = reservation.size
without availabilityKey =? availabilityId.key, error: without availabilityKey =? availabilityId.key, error:
return failure(error) return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error: without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error) return failure(error)
availability.freeSize += reservation.size availability.freeSize += reservation.size
if updateErr =? (await self.update(availability)).errorOption: if updateErr =? (await self.updateAvailability(availability)).errorOption:
return failure(updateErr) return failure(updateErr)
if err =? (await self.repo.metaDs.delete(key)).errorOption: if err =? (await self.repo.metaDs.delete(key)).errorOption:
return failure(err.toErr(DeleteFailedError)) return failure(err.toErr(DeleteFailedError))
return success() return success()
# TODO: add support for deleting availabilities # TODO: add support for deleting availabilities
# To delete, must not have any active sales. # To delete, must not have any active sales.
@ -370,54 +392,57 @@ proc createAvailability*(
return success(availability) return success(availability)
proc createReservation*( method createReservation*(
self: Reservations, self: Reservations,
availabilityId: AvailabilityId, availabilityId: AvailabilityId,
slotSize: UInt256, slotSize: UInt256,
requestId: RequestId, requestId: RequestId,
slotIndex: UInt256 slotIndex: UInt256
): Future[?!Reservation] {.async.} = ): Future[?!Reservation] {.async, base.} =
trace "creating reservation", availabilityId, slotSize, requestId, slotIndex withLock(self.availabilityLock):
without availabilityKey =? availabilityId.key, error:
return failure(error)
let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex) without availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
without availabilityKey =? availabilityId.key, error: # Check that the found availability has enough free space after the lock has been acquired, to prevent asynchronous Availiability modifications
return failure(error) if availability.freeSize < slotSize:
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error: trace "Creating reservation", availabilityId, slotSize, requestId, slotIndex
return failure(error)
if availability.freeSize < slotSize: let reservation = Reservation.init(availabilityId, slotSize, requestId, slotIndex)
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
return failure(error)
if createResErr =? (await self.update(reservation)).errorOption: if createResErr =? (await self.update(reservation)).errorOption:
return failure(createResErr) return failure(createResErr)
# reduce availability freeSize by the slot size, which is now accounted for in # reduce availability freeSize by the slot size, which is now accounted for in
# the newly created Reservation # the newly created Reservation
availability.freeSize -= slotSize availability.freeSize -= slotSize
# update availability with reduced size # update availability with reduced size
if updateErr =? (await self.update(availability)).errorOption: trace "Updating availability with reduced size"
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Updating availability failed, rolling back reservation creation"
trace "rolling back reservation creation" without key =? reservation.key, keyError:
keyError.parent = updateErr
return failure(keyError)
without key =? reservation.key, keyError: # rollback the reservation creation
keyError.parent = updateErr if rollbackErr =? (await self.delete(key)).errorOption:
return failure(keyError) rollbackErr.parent = updateErr
return failure(rollbackErr)
# rollback the reservation creation return failure(updateErr)
if rollbackErr =? (await self.delete(key)).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr) trace "Reservation succesfully created"
return success(reservation)
return success(reservation)
proc returnBytesToAvailability*( proc returnBytesToAvailability*(
self: Reservations, self: Reservations,
@ -429,48 +454,48 @@ proc returnBytesToAvailability*(
reservationId reservationId
availabilityId availabilityId
withLock(self.availabilityLock):
without key =? key(reservationId, availabilityId), error:
return failure(error)
without key =? key(reservationId, availabilityId), error: without var reservation =? (await self.get(key, Reservation)), error:
return failure(error) return failure(error)
without var reservation =? (await self.get(key, Reservation)), error: # We are ignoring bytes that are still present in the Reservation because
return failure(error) # they will be returned to Availability through `deleteReservation`.
let bytesToBeReturned = bytes - reservation.size
# We are ignoring bytes that are still present in the Reservation because if bytesToBeReturned == 0:
# they will be returned to Availability through `deleteReservation`. trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
let bytesToBeReturned = bytes - reservation.size return success()
trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
# First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += bytesToBeReturned
# Update availability with returned size
if updateErr =? (await self.updateAvailability(availability)).errorOption:
trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
if bytesToBeReturned == 0:
trace "No bytes are returned", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
return success() return success()
trace "Returning bytes", requestSizeBytes = bytes, returningBytes = bytesToBeReturned
# First lets see if we can re-reserve the bytes, if the Repo's quota
# is depleted then we will fail-fast as there is nothing to be done atm.
if reserveErr =? (await self.repo.reserve(bytesToBeReturned.truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReserveFailedError))
without availabilityKey =? availabilityId.key, error:
return failure(error)
without var availability =? await self.get(availabilityKey, Availability), error:
return failure(error)
availability.freeSize += bytesToBeReturned
# Update availability with returned size
if updateErr =? (await self.update(availability)).errorOption:
trace "Rolling back returning bytes"
if rollbackErr =? (await self.repo.release(bytesToBeReturned.truncate(uint))).errorOption:
rollbackErr.parent = updateErr
return failure(rollbackErr)
return failure(updateErr)
return success()
proc release*( proc release*(
self: Reservations, self: Reservations,
reservationId: ReservationId, reservationId: ReservationId,
@ -621,6 +646,7 @@ proc findAvailability*(
minPrice >= availability.minPrice: minPrice >= availability.minPrice:
trace "availability matched", trace "availability matched",
id = availability.id,
size, availFreeSize = availability.freeSize, size, availFreeSize = availability.freeSize,
duration, availDuration = availability.duration, duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice, minPrice, availMinPrice = availability.minPrice,
@ -635,6 +661,7 @@ proc findAvailability*(
return some availability return some availability
trace "availability did not match", trace "availability did not match",
id = availability.id,
size, availFreeSize = availability.freeSize, size, availFreeSize = availability.freeSize,
duration, availDuration = availability.duration, duration, availDuration = availability.duration,
minPrice, availMinPrice = availability.minPrice, minPrice, availMinPrice = availability.minPrice,

View File

@ -1,5 +1,6 @@
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/metrics
import ../../logutils import ../../logutils
import ../../market import ../../market
@ -13,6 +14,8 @@ import ./ignored
import ./downloading import ./downloading
import ./errored import ./errored
declareCounter(codex_reservations_availability_mismatch, "codex reservations availability_mismatch")
type type
SalePreparing* = ref object of ErrorHandlingState SalePreparing* = ref object of ErrorHandlingState
@ -78,7 +81,18 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
request.id, request.id,
data.slotIndex data.slotIndex
), error: ), error:
trace "Creation of reservation failed"
# Race condition:
# reservations.findAvailability (line 64) is no guarantee. You can never know for certain that the reservation can be created until after you have it.
# Should createReservation fail because there's no space, we proceed to SaleIgnored.
if error of BytesOutOfBoundsError:
# Lets monitor how often this happen and if it is often we can make it more inteligent to handle it
codex_reservations_availability_mismatch.inc()
return some State(SaleIgnored())
return some State(SaleErrored(error: error)) return some State(SaleErrored(error: error))
trace "Reservation created succesfully"
data.reservation = some reservation data.reservation = some reservation
return some State(SaleDownloading()) return some State(SaleDownloading())

View File

@ -0,0 +1,33 @@
import pkg/chronos
import pkg/codex/sales
import pkg/codex/stores
import pkg/questionable/results
type
MockReservations* = ref object of Reservations
createReservationThrowBytesOutOfBoundsError: bool
proc new*(
T: type MockReservations,
repo: RepoStore
): MockReservations =
## Create a mock clock instance
MockReservations(availabilityLock: newAsyncLock(), repo: repo)
proc setCreateReservationThrowBytesOutOfBoundsError*(self: MockReservations, flag: bool) =
self.createReservationThrowBytesOutOfBoundsError = flag
method createReservation*(
self: MockReservations,
availabilityId: AvailabilityId,
slotSize: UInt256,
requestId: RequestId,
slotIndex: UInt256): Future[?!Reservation] {.async.} =
if self.createReservationThrowBytesOutOfBoundsError:
let error = newException(
BytesOutOfBoundsError,
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
return failure(error)
return await procCall createReservation(Reservations(self), availabilityId, slotSize, requestId, slotIndex)

View File

@ -1,20 +1,66 @@
import std/unittest import pkg/chronos
import pkg/questionable import pkg/questionable
import pkg/datastore
import pkg/stew/byteutils
import pkg/codex/contracts/requests import pkg/codex/contracts/requests
import pkg/codex/sales/states/preparing
import pkg/codex/sales/states/downloading import pkg/codex/sales/states/downloading
import pkg/codex/sales/states/cancelled import pkg/codex/sales/states/cancelled
import pkg/codex/sales/states/failed import pkg/codex/sales/states/failed
import pkg/codex/sales/states/filled import pkg/codex/sales/states/filled
import pkg/codex/sales/states/ignored
import pkg/codex/sales/states/errored
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/sales/reservations
import pkg/codex/stores/repostore
import ../../../asynctest
import ../../helpers
import ../../examples import ../../examples
import ../../helpers/mockmarket
import ../../helpers/mockreservations
import ../../helpers/mockclock
suite "sales state 'preparing'": asyncchecksuite "sales state 'preparing'":
let request = StorageRequest.example let request = StorageRequest.example
let slotIndex = (request.ask.slots div 2).u256 let slotIndex = (request.ask.slots div 2).u256
let market = MockMarket.new()
let clock = MockClock.new()
var agent: SalesAgent
var state: SalePreparing var state: SalePreparing
var repo: RepoStore
var availability: Availability
var context: SalesContext
var reservations: MockReservations
setup: setup:
availability = Availability(
totalSize: request.ask.slotSize + 100.u256,
freeSize: request.ask.slotSize + 100.u256,
duration: request.ask.duration + 60.u256,
minPrice: request.ask.pricePerSlot - 10.u256,
maxCollateral: request.ask.collateral + 400.u256
)
let repoDs = SQLiteDatastore.new(Memory).tryGet()
let metaDs = SQLiteDatastore.new(Memory).tryGet()
repo = RepoStore.new(repoDs, metaDs)
await repo.start()
state = SalePreparing.new() state = SalePreparing.new()
context = SalesContext(
market: market,
clock: clock
)
reservations = MockReservations.new(repo)
context.reservations = reservations
agent = newSalesAgent(context,
request.id,
slotIndex,
request.some)
teardown:
await repo.stop()
test "switches to cancelled state when request expires": test "switches to cancelled state when request expires":
let next = state.onCancelled(request) let next = state.onCancelled(request)
@ -27,3 +73,28 @@ suite "sales state 'preparing'":
test "switches to filled state when slot is filled": test "switches to filled state when slot is filled":
let next = state.onSlotFilled(request.id, slotIndex) let next = state.onSlotFilled(request.id, slotIndex)
check !next of SaleFilled check !next of SaleFilled
proc createAvailability() {.async.} =
let a = await reservations.createAvailability(
availability.totalSize,
availability.duration,
availability.minPrice,
availability.maxCollateral
)
availability = a.get
test "run switches to ignored when no availability":
let next = await state.run(agent)
check !next of SaleIgnored
test "run switches to downloading when reserved":
await createAvailability()
let next = await state.run(agent)
check !next of SaleDownloading
test "run switches to ignored when reserve fails with BytesOutOfBounds":
await createAvailability()
reservations.setCreateReservationThrowBytesOutOfBoundsError(true)
let next = await state.run(agent)
check !next of SaleIgnored

View File

@ -1,4 +1,5 @@
import std/random import std/random
import std/sequtils
import pkg/questionable import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
@ -6,6 +7,7 @@ import pkg/chronos
import pkg/datastore import pkg/datastore
import pkg/codex/stores import pkg/codex/stores
import pkg/codex/errors
import pkg/codex/sales import pkg/codex/sales
import pkg/codex/utils/json import pkg/codex/utils/json
@ -13,6 +15,8 @@ import ../../asynctest
import ../examples import ../examples
import ../helpers import ../helpers
const CONCURRENCY_TESTS_COUNT = 1000
asyncchecksuite "Reservations module": asyncchecksuite "Reservations module":
var var
repo: RepoStore repo: RepoStore
@ -148,6 +152,39 @@ asyncchecksuite "Reservations module":
check created.isErr check created.isErr
check created.error of BytesOutOfBoundsError check created.error of BytesOutOfBoundsError
test "cannot create reservation larger than availability size - concurrency test":
proc concurrencyTest(): Future[void] {.async.} =
let availability = createAvailability()
let one = reservations.createReservation(
availability.id,
availability.totalSize - 1,
RequestId.example,
UInt256.example
)
let two = reservations.createReservation(
availability.id,
availability.totalSize,
RequestId.example,
UInt256.example
)
let oneResult = await one
let twoResult = await two
check oneResult.isErr or twoResult.isErr
if oneResult.isErr:
check oneResult.error of BytesOutOfBoundsError
if twoResult.isErr:
check twoResult.error of BytesOutOfBoundsError
var futures: seq[Future[void]]
for _ in 1..CONCURRENCY_TESTS_COUNT:
futures.add(concurrencyTest())
await allFuturesThrowing(futures)
test "creating reservation reduces availability size": test "creating reservation reduces availability size":
let availability = createAvailability() let availability = createAvailability()
let orig = availability.freeSize let orig = availability.freeSize

View File

@ -6,5 +6,9 @@ import ./states/testinitialproving
import ./states/testfilled import ./states/testfilled
import ./states/testproving import ./states/testproving
import ./states/testsimulatedproving import ./states/testsimulatedproving
import ./states/testcancelled
import ./states/testerrored
import ./states/testignored
import ./states/testpreparing
{.warning[UnusedImport]: off.} {.warning[UnusedImport]: off.}