Merge branch 'master' into fix/networkpeer-nullref
This commit is contained in:
commit
d1152117ca
|
@ -247,6 +247,22 @@ method canProofBeMarkedAsMissing*(
|
|||
trace "Proof cannot be marked as missing", msg = e.msg
|
||||
return false
|
||||
|
||||
method reserveSlot*(
|
||||
market: OnChainMarket,
|
||||
requestId: RequestId,
|
||||
slotIndex: UInt256) {.async.} =
|
||||
|
||||
convertEthersError:
|
||||
discard await market.contract.reserveSlot(requestId, slotIndex).confirm(0)
|
||||
|
||||
method canReserveSlot*(
|
||||
market: OnChainMarket,
|
||||
requestId: RequestId,
|
||||
slotIndex: UInt256): Future[bool] {.async.} =
|
||||
|
||||
convertEthersError:
|
||||
return await market.contract.canReserveSlot(requestId, slotIndex)
|
||||
|
||||
method subscribeRequests*(market: OnChainMarket,
|
||||
callback: OnRequest):
|
||||
Future[MarketSubscription] {.async.} =
|
||||
|
|
|
@ -51,3 +51,6 @@ proc getPointer*(marketplace: Marketplace, id: SlotId): uint8 {.contract, view.}
|
|||
|
||||
proc submitProof*(marketplace: Marketplace, id: SlotId, proof: Groth16Proof): ?TransactionResponse {.contract.}
|
||||
proc markProofAsMissing*(marketplace: Marketplace, id: SlotId, period: UInt256): ?TransactionResponse {.contract.}
|
||||
|
||||
proc reserveSlot*(marketplace: Marketplace, requestId: RequestId, slotIndex: UInt256): ?TransactionResponse {.contract.}
|
||||
proc canReserveSlot*(marketplace: Marketplace, requestId: RequestId, slotIndex: UInt256): bool {.contract, view.}
|
||||
|
|
|
@ -161,6 +161,20 @@ method canProofBeMarkedAsMissing*(market: Market,
|
|||
period: Period): Future[bool] {.base, async.} =
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method reserveSlot*(
|
||||
market: Market,
|
||||
requestId: RequestId,
|
||||
slotIndex: UInt256) {.base, async.} =
|
||||
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method canReserveSlot*(
|
||||
market: Market,
|
||||
requestId: RequestId,
|
||||
slotIndex: UInt256): Future[bool] {.base, async.} =
|
||||
|
||||
raiseAssert("not implemented")
|
||||
|
||||
method subscribeFulfillment*(market: Market,
|
||||
callback: OnFulfillment):
|
||||
Future[Subscription] {.base, async.} =
|
||||
|
|
|
@ -8,8 +8,13 @@ import ./errorhandling
|
|||
logScope:
|
||||
topics = "marketplace sales ignored"
|
||||
|
||||
# Ignored slots could mean there was no availability or that the slot could
|
||||
# not be reserved.
|
||||
|
||||
type
|
||||
SaleIgnored* = ref object of ErrorHandlingState
|
||||
reprocessSlot*: bool # readd slot to queue with `seen` flag
|
||||
returnBytes*: bool # return unreleased bytes from Reservation to Availability
|
||||
|
||||
method `$`*(state: SaleIgnored): string = "SaleIgnored"
|
||||
|
||||
|
@ -17,7 +22,5 @@ method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} =
|
|||
let agent = SalesAgent(machine)
|
||||
|
||||
if onCleanUp =? agent.onCleanUp:
|
||||
# Ignored slots mean there was no availability. In order to prevent small
|
||||
# availabilities from draining the queue, mark this slot as seen and re-add
|
||||
# back into the queue.
|
||||
await onCleanUp(reprocessSlot = true)
|
||||
await onCleanUp(reprocessSlot = state.reprocessSlot,
|
||||
returnBytes = state.returnBytes)
|
||||
|
|
|
@ -11,7 +11,7 @@ import ./cancelled
|
|||
import ./failed
|
||||
import ./filled
|
||||
import ./ignored
|
||||
import ./downloading
|
||||
import ./slotreserving
|
||||
import ./errored
|
||||
|
||||
declareCounter(codex_reservations_availability_mismatch, "codex reservations availability_mismatch")
|
||||
|
@ -50,7 +50,7 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
|
|||
let slotId = slotId(data.requestId, data.slotIndex)
|
||||
let state = await market.slotState(slotId)
|
||||
if state != SlotState.Free:
|
||||
return some State(SaleIgnored())
|
||||
return some State(SaleIgnored(reprocessSlot: false, returnBytes: false))
|
||||
|
||||
# TODO: Once implemented, check to ensure the host is allowed to fill the slot,
|
||||
# due to the [sliding window mechanism](https://github.com/codex-storage/codex-research/blob/master/design/marketplace.md#dispersal)
|
||||
|
@ -71,7 +71,7 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
|
|||
request.ask.collateral):
|
||||
debug "No availability found for request, ignoring"
|
||||
|
||||
return some State(SaleIgnored())
|
||||
return some State(SaleIgnored(reprocessSlot: true))
|
||||
|
||||
info "Availability found for request, creating reservation"
|
||||
|
||||
|
@ -88,11 +88,11 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
|
|||
if error of BytesOutOfBoundsError:
|
||||
# Lets monitor how often this happen and if it is often we can make it more inteligent to handle it
|
||||
codex_reservations_availability_mismatch.inc()
|
||||
return some State(SaleIgnored())
|
||||
return some State(SaleIgnored(reprocessSlot: true))
|
||||
|
||||
return some State(SaleErrored(error: error))
|
||||
|
||||
trace "Reservation created succesfully"
|
||||
|
||||
data.reservation = some reservation
|
||||
return some State(SaleDownloading())
|
||||
return some State(SaleSlotReserving())
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
import pkg/questionable
|
||||
import pkg/questionable/results
|
||||
import pkg/metrics
|
||||
|
||||
import ../../logutils
|
||||
import ../../market
|
||||
import ../salesagent
|
||||
import ../statemachine
|
||||
import ./errorhandling
|
||||
import ./cancelled
|
||||
import ./failed
|
||||
import ./filled
|
||||
import ./ignored
|
||||
import ./downloading
|
||||
import ./errored
|
||||
|
||||
type
|
||||
SaleSlotReserving* = ref object of ErrorHandlingState
|
||||
|
||||
logScope:
|
||||
topics = "marketplace sales reserving"
|
||||
|
||||
method `$`*(state: SaleSlotReserving): string = "SaleSlotReserving"
|
||||
|
||||
method onCancelled*(state: SaleSlotReserving, request: StorageRequest): ?State =
|
||||
return some State(SaleCancelled())
|
||||
|
||||
method onFailed*(state: SaleSlotReserving, request: StorageRequest): ?State =
|
||||
return some State(SaleFailed())
|
||||
|
||||
method onSlotFilled*(state: SaleSlotReserving, requestId: RequestId,
|
||||
slotIndex: UInt256): ?State =
|
||||
return some State(SaleFilled())
|
||||
|
||||
method run*(state: SaleSlotReserving, machine: Machine): Future[?State] {.async.} =
|
||||
let agent = SalesAgent(machine)
|
||||
let data = agent.data
|
||||
let context = agent.context
|
||||
let market = context.market
|
||||
|
||||
logScope:
|
||||
requestId = data.requestId
|
||||
slotIndex = data.slotIndex
|
||||
|
||||
let canReserve = await market.canReserveSlot(data.requestId, data.slotIndex)
|
||||
if canReserve:
|
||||
try:
|
||||
trace "Reserving slot"
|
||||
await market.reserveSlot(data.requestId, data.slotIndex)
|
||||
except MarketError as e:
|
||||
return some State( SaleErrored(error: e) )
|
||||
|
||||
trace "Slot successfully reserved"
|
||||
return some State( SaleDownloading() )
|
||||
|
||||
else:
|
||||
# do not re-add this slot to the queue, and return bytes from Reservation to
|
||||
# the Availability
|
||||
debug "Slot cannot be reserved, ignoring"
|
||||
return some State( SaleIgnored(reprocessSlot: false, returnBytes: true) )
|
||||
|
|
@ -38,6 +38,8 @@ type
|
|||
signer: Address
|
||||
subscriptions: Subscriptions
|
||||
config*: MarketplaceConfig
|
||||
canReserveSlot*: bool
|
||||
reserveSlotThrowError*: ?(ref MarketError)
|
||||
Fulfillment* = object
|
||||
requestId*: RequestId
|
||||
proof*: Groth16Proof
|
||||
|
@ -105,7 +107,7 @@ proc new*(_: type MockMarket): MockMarket =
|
|||
downtimeProduct: 67.uint8
|
||||
)
|
||||
)
|
||||
MockMarket(signer: Address.example, config: config)
|
||||
MockMarket(signer: Address.example, config: config, canReserveSlot: true)
|
||||
|
||||
method getSigner*(market: MockMarket): Future[Address] {.async.} =
|
||||
return market.signer
|
||||
|
@ -303,6 +305,29 @@ method canProofBeMarkedAsMissing*(market: MockMarket,
|
|||
period: Period): Future[bool] {.async.} =
|
||||
return market.canBeMarkedAsMissing.contains(id)
|
||||
|
||||
method reserveSlot*(
|
||||
market: MockMarket,
|
||||
requestId: RequestId,
|
||||
slotIndex: UInt256) {.async.} =
|
||||
|
||||
if error =? market.reserveSlotThrowError:
|
||||
raise error
|
||||
|
||||
method canReserveSlot*(
|
||||
market: MockMarket,
|
||||
requestId: RequestId,
|
||||
slotIndex: UInt256): Future[bool] {.async.} =
|
||||
|
||||
return market.canReserveSlot
|
||||
|
||||
func setCanReserveSlot*(market: MockMarket, canReserveSlot: bool) =
|
||||
market.canReserveSlot = canReserveSlot
|
||||
|
||||
func setReserveSlotThrowError*(
|
||||
market: MockMarket, error: ?(ref MarketError)) =
|
||||
|
||||
market.reserveSlotThrowError = error
|
||||
|
||||
method subscribeRequests*(market: MockMarket,
|
||||
callback: OnRequest):
|
||||
Future[Subscription] {.async.} =
|
||||
|
|
|
@ -6,6 +6,7 @@ import pkg/questionable/results
|
|||
type
|
||||
MockReservations* = ref object of Reservations
|
||||
createReservationThrowBytesOutOfBoundsError: bool
|
||||
createReservationThrowError: ?(ref CatchableError)
|
||||
|
||||
proc new*(
|
||||
T: type MockReservations,
|
||||
|
@ -14,9 +15,16 @@ proc new*(
|
|||
## Create a mock clock instance
|
||||
MockReservations(availabilityLock: newAsyncLock(), repo: repo)
|
||||
|
||||
proc setCreateReservationThrowBytesOutOfBoundsError*(self: MockReservations, flag: bool) =
|
||||
proc setCreateReservationThrowBytesOutOfBoundsError*(
|
||||
self: MockReservations, flag: bool) =
|
||||
|
||||
self.createReservationThrowBytesOutOfBoundsError = flag
|
||||
|
||||
proc setCreateReservationThrowError*(
|
||||
self: MockReservations, error: ?(ref CatchableError)) =
|
||||
|
||||
self.createReservationThrowError = error
|
||||
|
||||
method createReservation*(
|
||||
self: MockReservations,
|
||||
availabilityId: AvailabilityId,
|
||||
|
@ -29,5 +37,8 @@ method createReservation*(
|
|||
"trying to reserve an amount of bytes that is greater than the total size of the Availability")
|
||||
return failure(error)
|
||||
|
||||
elif error =? self.createReservationThrowError:
|
||||
return failure(error)
|
||||
|
||||
return await procCall createReservation(Reservations(self), availabilityId, slotSize, requestId, slotIndex)
|
||||
|
||||
|
|
|
@ -39,7 +39,8 @@ asyncchecksuite "sales state 'ignored'":
|
|||
agent.onCleanUp = onCleanUp
|
||||
state = SaleIgnored.new()
|
||||
|
||||
test "calls onCleanUp with returnBytes = false and reprocessSlot = true":
|
||||
test "calls onCleanUp with values assigned to SaleIgnored":
|
||||
state = SaleIgnored(reprocessSlot: true, returnBytes: true)
|
||||
discard await state.run(agent)
|
||||
check eventually returnBytesWas == false
|
||||
check eventually returnBytesWas == true
|
||||
check eventually reprocessSlotWas == true
|
||||
|
|
|
@ -4,7 +4,7 @@ import pkg/datastore
|
|||
import pkg/stew/byteutils
|
||||
import pkg/codex/contracts/requests
|
||||
import pkg/codex/sales/states/preparing
|
||||
import pkg/codex/sales/states/downloading
|
||||
import pkg/codex/sales/states/slotreserving
|
||||
import pkg/codex/sales/states/cancelled
|
||||
import pkg/codex/sales/states/failed
|
||||
import pkg/codex/sales/states/filled
|
||||
|
@ -84,17 +84,33 @@ asyncchecksuite "sales state 'preparing'":
|
|||
availability = a.get
|
||||
|
||||
test "run switches to ignored when no availability":
|
||||
let next = await state.run(agent)
|
||||
check !next of SaleIgnored
|
||||
let next = !(await state.run(agent))
|
||||
check next of SaleIgnored
|
||||
let ignored = SaleIgnored(next)
|
||||
check ignored.reprocessSlot
|
||||
check ignored.returnBytes == false
|
||||
|
||||
test "run switches to downloading when reserved":
|
||||
test "run switches to slot reserving state after reservation created":
|
||||
await createAvailability()
|
||||
let next = await state.run(agent)
|
||||
check !next of SaleDownloading
|
||||
check !next of SaleSlotReserving
|
||||
|
||||
test "run switches to ignored when reserve fails with BytesOutOfBounds":
|
||||
await createAvailability()
|
||||
reservations.setCreateReservationThrowBytesOutOfBoundsError(true)
|
||||
|
||||
let next = await state.run(agent)
|
||||
check !next of SaleIgnored
|
||||
let next = !(await state.run(agent))
|
||||
check next of SaleIgnored
|
||||
let ignored = SaleIgnored(next)
|
||||
check ignored.reprocessSlot
|
||||
check ignored.returnBytes == false
|
||||
|
||||
test "run switches to errored when reserve fails with other error":
|
||||
await createAvailability()
|
||||
let error = newException(CatchableError, "some error")
|
||||
reservations.setCreateReservationThrowError(some error)
|
||||
|
||||
let next = !(await state.run(agent))
|
||||
check next of SaleErrored
|
||||
let errored = SaleErrored(next)
|
||||
check errored.error == error
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
import pkg/chronos
|
||||
import pkg/questionable
|
||||
import pkg/codex/contracts/requests
|
||||
import pkg/codex/sales/states/slotreserving
|
||||
import pkg/codex/sales/states/downloading
|
||||
import pkg/codex/sales/states/cancelled
|
||||
import pkg/codex/sales/states/failed
|
||||
import pkg/codex/sales/states/filled
|
||||
import pkg/codex/sales/states/ignored
|
||||
import pkg/codex/sales/states/errored
|
||||
import pkg/codex/sales/salesagent
|
||||
import pkg/codex/sales/salescontext
|
||||
import pkg/codex/sales/reservations
|
||||
import pkg/codex/stores/repostore
|
||||
import ../../../asynctest
|
||||
import ../../helpers
|
||||
import ../../examples
|
||||
import ../../helpers/mockmarket
|
||||
import ../../helpers/mockreservations
|
||||
import ../../helpers/mockclock
|
||||
|
||||
asyncchecksuite "sales state 'SlotReserving'":
|
||||
let request = StorageRequest.example
|
||||
let slotIndex = (request.ask.slots div 2).u256
|
||||
var market: MockMarket
|
||||
var clock: MockClock
|
||||
var agent: SalesAgent
|
||||
var state: SaleSlotReserving
|
||||
var context: SalesContext
|
||||
|
||||
setup:
|
||||
market = MockMarket.new()
|
||||
clock = MockClock.new()
|
||||
|
||||
state = SaleSlotReserving.new()
|
||||
context = SalesContext(
|
||||
market: market,
|
||||
clock: clock
|
||||
)
|
||||
|
||||
agent = newSalesAgent(context,
|
||||
request.id,
|
||||
slotIndex,
|
||||
request.some)
|
||||
|
||||
test "switches to cancelled state when request expires":
|
||||
let next = state.onCancelled(request)
|
||||
check !next of SaleCancelled
|
||||
|
||||
test "switches to failed state when request fails":
|
||||
let next = state.onFailed(request)
|
||||
check !next of SaleFailed
|
||||
|
||||
test "switches to filled state when slot is filled":
|
||||
let next = state.onSlotFilled(request.id, slotIndex)
|
||||
check !next of SaleFilled
|
||||
|
||||
test "run switches to downloading when slot successfully reserved":
|
||||
let next = await state.run(agent)
|
||||
check !next of SaleDownloading
|
||||
|
||||
test "run switches to ignored when slot reservation not allowed":
|
||||
market.setCanReserveSlot(false)
|
||||
let next = await state.run(agent)
|
||||
check !next of SaleIgnored
|
||||
|
||||
test "run switches to errored when slot reservation errors":
|
||||
let error = newException(MarketError, "some error")
|
||||
market.setReserveSlotThrowError(some error)
|
||||
let next = !(await state.run(agent))
|
||||
check next of SaleErrored
|
||||
let errored = SaleErrored(next)
|
||||
check errored.error == error
|
|
@ -10,5 +10,6 @@ import ./states/testcancelled
|
|||
import ./states/testerrored
|
||||
import ./states/testignored
|
||||
import ./states/testpreparing
|
||||
import ./states/testslotreserving
|
||||
|
||||
{.warning[UnusedImport]: off.}
|
||||
|
|
|
@ -1 +1 @@
|
|||
Subproject commit 558bf645c3dc385437a3e695bba57e7dba1375fb
|
||||
Subproject commit 807fc973c875b5bde8f517c71c818ba8f2f720dd
|
Loading…
Reference in New Issue