feat(slot-reservations): Add SaleSlotReserving state (#917)
* convert EthersError to MarketError * change `canReserveSlot` and `reserveSlot` parameters Parameters for `canReserveSlot` and `reserveSlot` were changed from `SlotId` to `RequestId` and `UInt256 slotIndex`. * Add SaleSlotReserving Adds a new state, SaleSlotReserving, that attempts to reserve a slot before downloading. If the slot cannot be reserved, the state moves to SaleIgnored. On error, the state moves to SaleErrored. SaleIgnored is also updated to pass in `reprocessSlot` and `returnBytes`, controlling the behaviour in the Sales module after the slot is ignored. This is because previously it was assumed that SaleIgnored was only reached when there was no Availability. This is no longer the case, since SaleIgnored can now be reached when a slot cannot be reserved. * Update SalePreparing Specify `reprocessSlot` and `returnBytes` when moving to `SaleIgnored` from `SalePreparing`. Update tests to include test for a raised CatchableError. * Fix unit test * Modify `canReserveSlot` and `reverseSlot` params after rebase * Update MockMarket with new `canReserveSlot` and `reserveSlot` params * fix after rebase also bump codex-contracts-eth to master
This commit is contained in:
parent
b5ee57afd7
commit
4c51dca299
|
@ -260,7 +260,8 @@ method canReserveSlot*(
|
||||||
requestId: RequestId,
|
requestId: RequestId,
|
||||||
slotIndex: UInt256): Future[bool] {.async.} =
|
slotIndex: UInt256): Future[bool] {.async.} =
|
||||||
|
|
||||||
await market.contract.canReserveSlot(requestId, slotIndex)
|
convertEthersError:
|
||||||
|
return await market.contract.canReserveSlot(requestId, slotIndex)
|
||||||
|
|
||||||
method subscribeRequests*(market: OnChainMarket,
|
method subscribeRequests*(market: OnChainMarket,
|
||||||
callback: OnRequest):
|
callback: OnRequest):
|
||||||
|
|
|
@ -8,8 +8,13 @@ import ./errorhandling
|
||||||
logScope:
|
logScope:
|
||||||
topics = "marketplace sales ignored"
|
topics = "marketplace sales ignored"
|
||||||
|
|
||||||
|
# Ignored slots could mean there was no availability or that the slot could
|
||||||
|
# not be reserved.
|
||||||
|
|
||||||
type
|
type
|
||||||
SaleIgnored* = ref object of ErrorHandlingState
|
SaleIgnored* = ref object of ErrorHandlingState
|
||||||
|
reprocessSlot*: bool # readd slot to queue with `seen` flag
|
||||||
|
returnBytes*: bool # return unreleased bytes from Reservation to Availability
|
||||||
|
|
||||||
method `$`*(state: SaleIgnored): string = "SaleIgnored"
|
method `$`*(state: SaleIgnored): string = "SaleIgnored"
|
||||||
|
|
||||||
|
@ -17,7 +22,5 @@ method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} =
|
||||||
let agent = SalesAgent(machine)
|
let agent = SalesAgent(machine)
|
||||||
|
|
||||||
if onCleanUp =? agent.onCleanUp:
|
if onCleanUp =? agent.onCleanUp:
|
||||||
# Ignored slots mean there was no availability. In order to prevent small
|
await onCleanUp(reprocessSlot = state.reprocessSlot,
|
||||||
# availabilities from draining the queue, mark this slot as seen and re-add
|
returnBytes = state.returnBytes)
|
||||||
# back into the queue.
|
|
||||||
await onCleanUp(reprocessSlot = true)
|
|
||||||
|
|
|
@ -11,7 +11,7 @@ import ./cancelled
|
||||||
import ./failed
|
import ./failed
|
||||||
import ./filled
|
import ./filled
|
||||||
import ./ignored
|
import ./ignored
|
||||||
import ./downloading
|
import ./slotreserving
|
||||||
import ./errored
|
import ./errored
|
||||||
|
|
||||||
declareCounter(codex_reservations_availability_mismatch, "codex reservations availability_mismatch")
|
declareCounter(codex_reservations_availability_mismatch, "codex reservations availability_mismatch")
|
||||||
|
@ -50,7 +50,7 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
|
||||||
let slotId = slotId(data.requestId, data.slotIndex)
|
let slotId = slotId(data.requestId, data.slotIndex)
|
||||||
let state = await market.slotState(slotId)
|
let state = await market.slotState(slotId)
|
||||||
if state != SlotState.Free:
|
if state != SlotState.Free:
|
||||||
return some State(SaleIgnored())
|
return some State(SaleIgnored(reprocessSlot: false, returnBytes: false))
|
||||||
|
|
||||||
# TODO: Once implemented, check to ensure the host is allowed to fill the slot,
|
# TODO: Once implemented, check to ensure the host is allowed to fill the slot,
|
||||||
# due to the [sliding window mechanism](https://github.com/codex-storage/codex-research/blob/master/design/marketplace.md#dispersal)
|
# due to the [sliding window mechanism](https://github.com/codex-storage/codex-research/blob/master/design/marketplace.md#dispersal)
|
||||||
|
@ -71,7 +71,7 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
|
||||||
request.ask.collateral):
|
request.ask.collateral):
|
||||||
debug "No availability found for request, ignoring"
|
debug "No availability found for request, ignoring"
|
||||||
|
|
||||||
return some State(SaleIgnored())
|
return some State(SaleIgnored(reprocessSlot: true))
|
||||||
|
|
||||||
info "Availability found for request, creating reservation"
|
info "Availability found for request, creating reservation"
|
||||||
|
|
||||||
|
@ -88,11 +88,11 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
|
||||||
if error of BytesOutOfBoundsError:
|
if error of BytesOutOfBoundsError:
|
||||||
# Lets monitor how often this happen and if it is often we can make it more inteligent to handle it
|
# Lets monitor how often this happen and if it is often we can make it more inteligent to handle it
|
||||||
codex_reservations_availability_mismatch.inc()
|
codex_reservations_availability_mismatch.inc()
|
||||||
return some State(SaleIgnored())
|
return some State(SaleIgnored(reprocessSlot: true))
|
||||||
|
|
||||||
return some State(SaleErrored(error: error))
|
return some State(SaleErrored(error: error))
|
||||||
|
|
||||||
trace "Reservation created succesfully"
|
trace "Reservation created succesfully"
|
||||||
|
|
||||||
data.reservation = some reservation
|
data.reservation = some reservation
|
||||||
return some State(SaleDownloading())
|
return some State(SaleSlotReserving())
|
||||||
|
|
|
@ -0,0 +1,61 @@
|
||||||
|
import pkg/questionable
|
||||||
|
import pkg/questionable/results
|
||||||
|
import pkg/metrics
|
||||||
|
|
||||||
|
import ../../logutils
|
||||||
|
import ../../market
|
||||||
|
import ../salesagent
|
||||||
|
import ../statemachine
|
||||||
|
import ./errorhandling
|
||||||
|
import ./cancelled
|
||||||
|
import ./failed
|
||||||
|
import ./filled
|
||||||
|
import ./ignored
|
||||||
|
import ./downloading
|
||||||
|
import ./errored
|
||||||
|
|
||||||
|
type
|
||||||
|
SaleSlotReserving* = ref object of ErrorHandlingState
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
topics = "marketplace sales reserving"
|
||||||
|
|
||||||
|
method `$`*(state: SaleSlotReserving): string = "SaleSlotReserving"
|
||||||
|
|
||||||
|
method onCancelled*(state: SaleSlotReserving, request: StorageRequest): ?State =
|
||||||
|
return some State(SaleCancelled())
|
||||||
|
|
||||||
|
method onFailed*(state: SaleSlotReserving, request: StorageRequest): ?State =
|
||||||
|
return some State(SaleFailed())
|
||||||
|
|
||||||
|
method onSlotFilled*(state: SaleSlotReserving, requestId: RequestId,
|
||||||
|
slotIndex: UInt256): ?State =
|
||||||
|
return some State(SaleFilled())
|
||||||
|
|
||||||
|
method run*(state: SaleSlotReserving, machine: Machine): Future[?State] {.async.} =
|
||||||
|
let agent = SalesAgent(machine)
|
||||||
|
let data = agent.data
|
||||||
|
let context = agent.context
|
||||||
|
let market = context.market
|
||||||
|
|
||||||
|
logScope:
|
||||||
|
requestId = data.requestId
|
||||||
|
slotIndex = data.slotIndex
|
||||||
|
|
||||||
|
let canReserve = await market.canReserveSlot(data.requestId, data.slotIndex)
|
||||||
|
if canReserve:
|
||||||
|
try:
|
||||||
|
trace "Reserving slot"
|
||||||
|
await market.reserveSlot(data.requestId, data.slotIndex)
|
||||||
|
except MarketError as e:
|
||||||
|
return some State( SaleErrored(error: e) )
|
||||||
|
|
||||||
|
trace "Slot successfully reserved"
|
||||||
|
return some State( SaleDownloading() )
|
||||||
|
|
||||||
|
else:
|
||||||
|
# do not re-add this slot to the queue, and return bytes from Reservation to
|
||||||
|
# the Availability
|
||||||
|
debug "Slot cannot be reserved, ignoring"
|
||||||
|
return some State( SaleIgnored(reprocessSlot: false, returnBytes: true) )
|
||||||
|
|
|
@ -38,6 +38,8 @@ type
|
||||||
signer: Address
|
signer: Address
|
||||||
subscriptions: Subscriptions
|
subscriptions: Subscriptions
|
||||||
config*: MarketplaceConfig
|
config*: MarketplaceConfig
|
||||||
|
canReserveSlot*: bool
|
||||||
|
reserveSlotThrowError*: ?(ref MarketError)
|
||||||
Fulfillment* = object
|
Fulfillment* = object
|
||||||
requestId*: RequestId
|
requestId*: RequestId
|
||||||
proof*: Groth16Proof
|
proof*: Groth16Proof
|
||||||
|
@ -105,7 +107,7 @@ proc new*(_: type MockMarket): MockMarket =
|
||||||
downtimeProduct: 67.uint8
|
downtimeProduct: 67.uint8
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
MockMarket(signer: Address.example, config: config)
|
MockMarket(signer: Address.example, config: config, canReserveSlot: true)
|
||||||
|
|
||||||
method getSigner*(market: MockMarket): Future[Address] {.async.} =
|
method getSigner*(market: MockMarket): Future[Address] {.async.} =
|
||||||
return market.signer
|
return market.signer
|
||||||
|
@ -303,6 +305,29 @@ method canProofBeMarkedAsMissing*(market: MockMarket,
|
||||||
period: Period): Future[bool] {.async.} =
|
period: Period): Future[bool] {.async.} =
|
||||||
return market.canBeMarkedAsMissing.contains(id)
|
return market.canBeMarkedAsMissing.contains(id)
|
||||||
|
|
||||||
|
method reserveSlot*(
|
||||||
|
market: MockMarket,
|
||||||
|
requestId: RequestId,
|
||||||
|
slotIndex: UInt256) {.async.} =
|
||||||
|
|
||||||
|
if error =? market.reserveSlotThrowError:
|
||||||
|
raise error
|
||||||
|
|
||||||
|
method canReserveSlot*(
|
||||||
|
market: MockMarket,
|
||||||
|
requestId: RequestId,
|
||||||
|
slotIndex: UInt256): Future[bool] {.async.} =
|
||||||
|
|
||||||
|
return market.canReserveSlot
|
||||||
|
|
||||||
|
func setCanReserveSlot*(market: MockMarket, canReserveSlot: bool) =
|
||||||
|
market.canReserveSlot = canReserveSlot
|
||||||
|
|
||||||
|
func setReserveSlotThrowError*(
|
||||||
|
market: MockMarket, error: ?(ref MarketError)) =
|
||||||
|
|
||||||
|
market.reserveSlotThrowError = error
|
||||||
|
|
||||||
method subscribeRequests*(market: MockMarket,
|
method subscribeRequests*(market: MockMarket,
|
||||||
callback: OnRequest):
|
callback: OnRequest):
|
||||||
Future[Subscription] {.async.} =
|
Future[Subscription] {.async.} =
|
||||||
|
|
|
@ -6,6 +6,7 @@ import pkg/questionable/results
|
||||||
type
|
type
|
||||||
MockReservations* = ref object of Reservations
|
MockReservations* = ref object of Reservations
|
||||||
createReservationThrowBytesOutOfBoundsError: bool
|
createReservationThrowBytesOutOfBoundsError: bool
|
||||||
|
createReservationThrowError: ?(ref CatchableError)
|
||||||
|
|
||||||
proc new*(
|
proc new*(
|
||||||
T: type MockReservations,
|
T: type MockReservations,
|
||||||
|
@ -14,9 +15,16 @@ proc new*(
|
||||||
## Create a mock clock instance
|
## Create a mock clock instance
|
||||||
MockReservations(availabilityLock: newAsyncLock(), repo: repo)
|
MockReservations(availabilityLock: newAsyncLock(), repo: repo)
|
||||||
|
|
||||||
proc setCreateReservationThrowBytesOutOfBoundsError*(self: MockReservations, flag: bool) =
|
proc setCreateReservationThrowBytesOutOfBoundsError*(
|
||||||
|
self: MockReservations, flag: bool) =
|
||||||
|
|
||||||
self.createReservationThrowBytesOutOfBoundsError = flag
|
self.createReservationThrowBytesOutOfBoundsError = flag
|
||||||
|
|
||||||
|
proc setCreateReservationThrowError*(
|
||||||
|
self: MockReservations, error: ?(ref CatchableError)) =
|
||||||
|
|
||||||
|
self.createReservationThrowError = error
|
||||||
|
|
||||||
method createReservation*(
|
method createReservation*(
|
||||||
self: MockReservations,
|
self: MockReservations,
|
||||||
availabilityId: AvailabilityId,
|
availabilityId: AvailabilityId,
|
||||||
|
@ -29,5 +37,8 @@ method createReservation*(
|
||||||
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
|
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
|
||||||
return failure(error)
|
return failure(error)
|
||||||
|
|
||||||
|
elif error =? self.createReservationThrowError:
|
||||||
|
return failure(error)
|
||||||
|
|
||||||
return await procCall createReservation(Reservations(self), availabilityId, slotSize, requestId, slotIndex)
|
return await procCall createReservation(Reservations(self), availabilityId, slotSize, requestId, slotIndex)
|
||||||
|
|
||||||
|
|
|
@ -39,7 +39,8 @@ asyncchecksuite "sales state 'ignored'":
|
||||||
agent.onCleanUp = onCleanUp
|
agent.onCleanUp = onCleanUp
|
||||||
state = SaleIgnored.new()
|
state = SaleIgnored.new()
|
||||||
|
|
||||||
test "calls onCleanUp with returnBytes = false and reprocessSlot = true":
|
test "calls onCleanUp with values assigned to SaleIgnored":
|
||||||
|
state = SaleIgnored(reprocessSlot: true, returnBytes: true)
|
||||||
discard await state.run(agent)
|
discard await state.run(agent)
|
||||||
check eventually returnBytesWas == false
|
check eventually returnBytesWas == true
|
||||||
check eventually reprocessSlotWas == true
|
check eventually reprocessSlotWas == true
|
||||||
|
|
|
@ -4,7 +4,7 @@ import pkg/datastore
|
||||||
import pkg/stew/byteutils
|
import pkg/stew/byteutils
|
||||||
import pkg/codex/contracts/requests
|
import pkg/codex/contracts/requests
|
||||||
import pkg/codex/sales/states/preparing
|
import pkg/codex/sales/states/preparing
|
||||||
import pkg/codex/sales/states/downloading
|
import pkg/codex/sales/states/slotreserving
|
||||||
import pkg/codex/sales/states/cancelled
|
import pkg/codex/sales/states/cancelled
|
||||||
import pkg/codex/sales/states/failed
|
import pkg/codex/sales/states/failed
|
||||||
import pkg/codex/sales/states/filled
|
import pkg/codex/sales/states/filled
|
||||||
|
@ -84,17 +84,33 @@ asyncchecksuite "sales state 'preparing'":
|
||||||
availability = a.get
|
availability = a.get
|
||||||
|
|
||||||
test "run switches to ignored when no availability":
|
test "run switches to ignored when no availability":
|
||||||
let next = await state.run(agent)
|
let next = !(await state.run(agent))
|
||||||
check !next of SaleIgnored
|
check next of SaleIgnored
|
||||||
|
let ignored = SaleIgnored(next)
|
||||||
|
check ignored.reprocessSlot
|
||||||
|
check ignored.returnBytes == false
|
||||||
|
|
||||||
test "run switches to downloading when reserved":
|
test "run switches to slot reserving state after reservation created":
|
||||||
await createAvailability()
|
await createAvailability()
|
||||||
let next = await state.run(agent)
|
let next = await state.run(agent)
|
||||||
check !next of SaleDownloading
|
check !next of SaleSlotReserving
|
||||||
|
|
||||||
test "run switches to ignored when reserve fails with BytesOutOfBounds":
|
test "run switches to ignored when reserve fails with BytesOutOfBounds":
|
||||||
await createAvailability()
|
await createAvailability()
|
||||||
reservations.setCreateReservationThrowBytesOutOfBoundsError(true)
|
reservations.setCreateReservationThrowBytesOutOfBoundsError(true)
|
||||||
|
|
||||||
let next = await state.run(agent)
|
let next = !(await state.run(agent))
|
||||||
check !next of SaleIgnored
|
check next of SaleIgnored
|
||||||
|
let ignored = SaleIgnored(next)
|
||||||
|
check ignored.reprocessSlot
|
||||||
|
check ignored.returnBytes == false
|
||||||
|
|
||||||
|
test "run switches to errored when reserve fails with other error":
|
||||||
|
await createAvailability()
|
||||||
|
let error = newException(CatchableError, "some error")
|
||||||
|
reservations.setCreateReservationThrowError(some error)
|
||||||
|
|
||||||
|
let next = !(await state.run(agent))
|
||||||
|
check next of SaleErrored
|
||||||
|
let errored = SaleErrored(next)
|
||||||
|
check errored.error == error
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
import pkg/chronos
|
||||||
|
import pkg/questionable
|
||||||
|
import pkg/codex/contracts/requests
|
||||||
|
import pkg/codex/sales/states/slotreserving
|
||||||
|
import pkg/codex/sales/states/downloading
|
||||||
|
import pkg/codex/sales/states/cancelled
|
||||||
|
import pkg/codex/sales/states/failed
|
||||||
|
import pkg/codex/sales/states/filled
|
||||||
|
import pkg/codex/sales/states/ignored
|
||||||
|
import pkg/codex/sales/states/errored
|
||||||
|
import pkg/codex/sales/salesagent
|
||||||
|
import pkg/codex/sales/salescontext
|
||||||
|
import pkg/codex/sales/reservations
|
||||||
|
import pkg/codex/stores/repostore
|
||||||
|
import ../../../asynctest
|
||||||
|
import ../../helpers
|
||||||
|
import ../../examples
|
||||||
|
import ../../helpers/mockmarket
|
||||||
|
import ../../helpers/mockreservations
|
||||||
|
import ../../helpers/mockclock
|
||||||
|
|
||||||
|
asyncchecksuite "sales state 'SlotReserving'":
|
||||||
|
let request = StorageRequest.example
|
||||||
|
let slotIndex = (request.ask.slots div 2).u256
|
||||||
|
var market: MockMarket
|
||||||
|
var clock: MockClock
|
||||||
|
var agent: SalesAgent
|
||||||
|
var state: SaleSlotReserving
|
||||||
|
var context: SalesContext
|
||||||
|
|
||||||
|
setup:
|
||||||
|
market = MockMarket.new()
|
||||||
|
clock = MockClock.new()
|
||||||
|
|
||||||
|
state = SaleSlotReserving.new()
|
||||||
|
context = SalesContext(
|
||||||
|
market: market,
|
||||||
|
clock: clock
|
||||||
|
)
|
||||||
|
|
||||||
|
agent = newSalesAgent(context,
|
||||||
|
request.id,
|
||||||
|
slotIndex,
|
||||||
|
request.some)
|
||||||
|
|
||||||
|
test "switches to cancelled state when request expires":
|
||||||
|
let next = state.onCancelled(request)
|
||||||
|
check !next of SaleCancelled
|
||||||
|
|
||||||
|
test "switches to failed state when request fails":
|
||||||
|
let next = state.onFailed(request)
|
||||||
|
check !next of SaleFailed
|
||||||
|
|
||||||
|
test "switches to filled state when slot is filled":
|
||||||
|
let next = state.onSlotFilled(request.id, slotIndex)
|
||||||
|
check !next of SaleFilled
|
||||||
|
|
||||||
|
test "run switches to downloading when slot successfully reserved":
|
||||||
|
let next = await state.run(agent)
|
||||||
|
check !next of SaleDownloading
|
||||||
|
|
||||||
|
test "run switches to ignored when slot reservation not allowed":
|
||||||
|
market.setCanReserveSlot(false)
|
||||||
|
let next = await state.run(agent)
|
||||||
|
check !next of SaleIgnored
|
||||||
|
|
||||||
|
test "run switches to errored when slot reservation errors":
|
||||||
|
let error = newException(MarketError, "some error")
|
||||||
|
market.setReserveSlotThrowError(some error)
|
||||||
|
let next = !(await state.run(agent))
|
||||||
|
check next of SaleErrored
|
||||||
|
let errored = SaleErrored(next)
|
||||||
|
check errored.error == error
|
|
@ -10,5 +10,6 @@ import ./states/testcancelled
|
||||||
import ./states/testerrored
|
import ./states/testerrored
|
||||||
import ./states/testignored
|
import ./states/testignored
|
||||||
import ./states/testpreparing
|
import ./states/testpreparing
|
||||||
|
import ./states/testslotreserving
|
||||||
|
|
||||||
{.warning[UnusedImport]: off.}
|
{.warning[UnusedImport]: off.}
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 33010bd20cfdc3d589be25782052796af580ca83
|
Subproject commit 807fc973c875b5bde8f517c71c818ba8f2f720dd
|
Loading…
Reference in New Issue