Fix sales agent (#733)
* clock: add 1 second leeway before acting on timeouts * sales: do not raise in proving loop when slot is cancelled Allow the onCancelled callback to handle cancellation, and the onFailed callback to handle failed requests. * sales: cleanup proving tests * sales: fix sales agent tests * sales: stop cancellation loop when request started, finished or failed * sales: fix flaky test * sales: fix another flaky test * clock: add comment explaining the + 1 second Co-Authored-By: benbierens <thatbenbierens@gmail.com> --------- Co-authored-by: benbierens <thatbenbierens@gmail.com>
This commit is contained in:
parent
d1658d7b77
commit
e654e93c71
|
@ -29,7 +29,8 @@ method run*(state: PurchaseStarted, machine: Machine): Future[?State] {.async.}
|
||||||
failed.complete()
|
failed.complete()
|
||||||
let subscription = await market.subscribeRequestFailed(purchase.requestId, callback)
|
let subscription = await market.subscribeRequestFailed(purchase.requestId, callback)
|
||||||
|
|
||||||
let ended = clock.waitUntil(await market.getRequestEnd(purchase.requestId))
|
# Ensure that we're past the request end by waiting an additional second
|
||||||
|
let ended = clock.waitUntil((await market.getRequestEnd(purchase.requestId)) + 1)
|
||||||
let fut = await one(ended, failed)
|
let fut = await one(ended, failed)
|
||||||
await subscription.unsubscribe()
|
await subscription.unsubscribe()
|
||||||
if fut.id == failed.id:
|
if fut.id == failed.id:
|
||||||
|
|
|
@ -81,9 +81,14 @@ proc subscribeCancellation(agent: SalesAgent) {.async.} =
|
||||||
error "Uknown request", requestId = data.requestId
|
error "Uknown request", requestId = data.requestId
|
||||||
return
|
return
|
||||||
|
|
||||||
if state == RequestState.Cancelled:
|
case state
|
||||||
|
of New:
|
||||||
|
discard
|
||||||
|
of RequestState.Cancelled:
|
||||||
agent.schedule(cancelledEvent(request))
|
agent.schedule(cancelledEvent(request))
|
||||||
break
|
break
|
||||||
|
of RequestState.Started, RequestState.Finished, RequestState.Failed:
|
||||||
|
break
|
||||||
|
|
||||||
debug "The request is not yet canceled, even though it should be. Waiting for some more time.", currentState = state, now=clock.now
|
debug "The request is not yet canceled, even though it should be. Waiting for some more time.", currentState = state, now=clock.now
|
||||||
|
|
||||||
|
|
|
@ -62,24 +62,32 @@ proc proveLoop(
|
||||||
|
|
||||||
proc waitUntilPeriod(period: Period) {.async.} =
|
proc waitUntilPeriod(period: Period) {.async.} =
|
||||||
let periodicity = await market.periodicity()
|
let periodicity = await market.periodicity()
|
||||||
await clock.waitUntil(periodicity.periodStart(period).truncate(int64))
|
# Ensure that we're past the period boundary by waiting an additional second
|
||||||
|
await clock.waitUntil(periodicity.periodStart(period).truncate(int64) + 1)
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
let currentPeriod = await getCurrentPeriod()
|
let currentPeriod = await getCurrentPeriod()
|
||||||
let slotState = await market.slotState(slot.id)
|
let slotState = await market.slotState(slot.id)
|
||||||
if slotState == SlotState.Finished:
|
|
||||||
debug "Slot reached finished state", period = currentPeriod
|
|
||||||
return
|
|
||||||
|
|
||||||
if slotState != SlotState.Filled:
|
|
||||||
raise newException(SlotNotFilledError, "Slot is not in Filled state!")
|
|
||||||
|
|
||||||
|
case slotState
|
||||||
|
of SlotState.Filled:
|
||||||
debug "Proving for new period", period = currentPeriod
|
debug "Proving for new period", period = currentPeriod
|
||||||
|
|
||||||
if (await market.isProofRequired(slotId)) or (await market.willProofBeRequired(slotId)):
|
if (await market.isProofRequired(slotId)) or (await market.willProofBeRequired(slotId)):
|
||||||
let challenge = await market.getChallenge(slotId)
|
let challenge = await market.getChallenge(slotId)
|
||||||
debug "Proof is required", period = currentPeriod, challenge = challenge
|
debug "Proof is required", period = currentPeriod, challenge = challenge
|
||||||
await state.prove(slot, challenge, onProve, market, currentPeriod)
|
await state.prove(slot, challenge, onProve, market, currentPeriod)
|
||||||
|
of SlotState.Cancelled:
|
||||||
|
debug "Slot reached cancelled state"
|
||||||
|
# do nothing, let onCancelled callback take care of it
|
||||||
|
of SlotState.Failed:
|
||||||
|
debug "Slot reached failed state"
|
||||||
|
# do nothing, let onFailed callback take care of it
|
||||||
|
of SlotState.Finished:
|
||||||
|
debug "Slot reached finished state", period = currentPeriod
|
||||||
|
return # exit the loop
|
||||||
|
else:
|
||||||
|
let message = "Slot is not in Filled state, but in state: " & $slotState
|
||||||
|
raise newException(SlotNotFilledError, message)
|
||||||
|
|
||||||
debug "waiting until next period"
|
debug "waiting until next period"
|
||||||
await waitUntilPeriod(currentPeriod + 1)
|
await waitUntilPeriod(currentPeriod + 1)
|
||||||
|
|
|
@ -5,6 +5,7 @@ import pkg/codex/sales/states/proving
|
||||||
import pkg/codex/sales/states/cancelled
|
import pkg/codex/sales/states/cancelled
|
||||||
import pkg/codex/sales/states/failed
|
import pkg/codex/sales/states/failed
|
||||||
import pkg/codex/sales/states/payout
|
import pkg/codex/sales/states/payout
|
||||||
|
import pkg/codex/sales/states/errored
|
||||||
import pkg/codex/sales/salesagent
|
import pkg/codex/sales/salesagent
|
||||||
import pkg/codex/sales/salescontext
|
import pkg/codex/sales/salescontext
|
||||||
|
|
||||||
|
@ -41,7 +42,9 @@ asyncchecksuite "sales state 'proving'":
|
||||||
|
|
||||||
proc advanceToNextPeriod(market: Market) {.async.} =
|
proc advanceToNextPeriod(market: Market) {.async.} =
|
||||||
let periodicity = await market.periodicity()
|
let periodicity = await market.periodicity()
|
||||||
clock.advance(periodicity.seconds.truncate(int64))
|
let current = periodicity.periodOf(clock.now().u256)
|
||||||
|
let periodEnd = periodicity.periodEnd(current)
|
||||||
|
clock.set(periodEnd.truncate(int64) + 1)
|
||||||
|
|
||||||
test "switches to cancelled state when request expires":
|
test "switches to cancelled state when request expires":
|
||||||
let next = state.onCancelled(request)
|
let next = state.onCancelled(request)
|
||||||
|
@ -65,7 +68,7 @@ asyncchecksuite "sales state 'proving'":
|
||||||
market.setProofRequired(slot.id, true)
|
market.setProofRequired(slot.id, true)
|
||||||
await market.advanceToNextPeriod()
|
await market.advanceToNextPeriod()
|
||||||
|
|
||||||
check eventually receivedIds == @[slot.id]
|
check eventually receivedIds.contains(slot.id)
|
||||||
|
|
||||||
await future.cancelAndWait()
|
await future.cancelAndWait()
|
||||||
await subscription.unsubscribe()
|
await subscription.unsubscribe()
|
||||||
|
@ -81,6 +84,17 @@ asyncchecksuite "sales state 'proving'":
|
||||||
check eventually future.finished
|
check eventually future.finished
|
||||||
check !(future.read()) of SalePayout
|
check !(future.read()) of SalePayout
|
||||||
|
|
||||||
|
test "switches to error state when slot is no longer filled":
|
||||||
|
market.slotState[slot.id] = SlotState.Filled
|
||||||
|
|
||||||
|
let future = state.run(agent)
|
||||||
|
|
||||||
|
market.slotState[slot.id] = SlotState.Free
|
||||||
|
await market.advanceToNextPeriod()
|
||||||
|
|
||||||
|
check eventually future.finished
|
||||||
|
check !(future.read()) of SaleErrored
|
||||||
|
|
||||||
test "onProve callback provides proof challenge":
|
test "onProve callback provides proof challenge":
|
||||||
market.proofChallenge = ProofChallenge.example
|
market.proofChallenge = ProofChallenge.example
|
||||||
market.slotState[slot.id] = SlotState.Filled
|
market.slotState[slot.id] = SlotState.Filled
|
||||||
|
@ -88,4 +102,6 @@ asyncchecksuite "sales state 'proving'":
|
||||||
|
|
||||||
let future = state.run(agent)
|
let future = state.run(agent)
|
||||||
|
|
||||||
check receivedChallenge == market.proofChallenge
|
check eventually receivedChallenge == market.proofChallenge
|
||||||
|
|
||||||
|
await future.cancelAndWait()
|
||||||
|
|
|
@ -58,7 +58,9 @@ asyncchecksuite "sales state 'simulated-proving'":
|
||||||
|
|
||||||
proc advanceToNextPeriod(market: Market) {.async.} =
|
proc advanceToNextPeriod(market: Market) {.async.} =
|
||||||
let periodicity = await market.periodicity()
|
let periodicity = await market.periodicity()
|
||||||
clock.advance(periodicity.seconds.truncate(int64))
|
let current = periodicity.periodOf(clock.now().u256)
|
||||||
|
let periodEnd = periodicity.periodEnd(current)
|
||||||
|
clock.set(periodEnd.truncate(int64) + 1)
|
||||||
|
|
||||||
proc waitForProvingRounds(market: Market, rounds: int) {.async.} =
|
proc waitForProvingRounds(market: Market, rounds: int) {.async.} =
|
||||||
var rnds = rounds - 1 # proof round runs prior to advancing
|
var rnds = rounds - 1 # proof round runs prior to advancing
|
||||||
|
|
|
@ -5,7 +5,6 @@ import pkg/codex/sales/salesagent
|
||||||
import pkg/codex/sales/salescontext
|
import pkg/codex/sales/salescontext
|
||||||
import pkg/codex/sales/statemachine
|
import pkg/codex/sales/statemachine
|
||||||
import pkg/codex/sales/states/errorhandling
|
import pkg/codex/sales/states/errorhandling
|
||||||
import pkg/codex/proving
|
|
||||||
|
|
||||||
import ../../asynctest
|
import ../../asynctest
|
||||||
import ../helpers/mockmarket
|
import ../helpers/mockmarket
|
||||||
|
@ -73,7 +72,6 @@ asyncchecksuite "Sales agent":
|
||||||
request.id,
|
request.id,
|
||||||
slotIndex,
|
slotIndex,
|
||||||
some request)
|
some request)
|
||||||
request.expiry = (getTime() + initDuration(hours=1)).toUnix.u256
|
|
||||||
|
|
||||||
teardown:
|
teardown:
|
||||||
await agent.stop()
|
await agent.stop()
|
||||||
|
@ -108,12 +106,29 @@ asyncchecksuite "Sales agent":
|
||||||
await agent.unsubscribe()
|
await agent.unsubscribe()
|
||||||
|
|
||||||
test "current state onCancelled called when cancel emitted":
|
test "current state onCancelled called when cancel emitted":
|
||||||
let state = MockState.new()
|
agent.start(MockState.new())
|
||||||
agent.start(state)
|
|
||||||
await agent.subscribe()
|
await agent.subscribe()
|
||||||
clock.set(request.expiry.truncate(int64))
|
market.requestState[request.id] = RequestState.Cancelled
|
||||||
|
clock.set(request.expiry.truncate(int64) + 1)
|
||||||
check eventually onCancelCalled
|
check eventually onCancelCalled
|
||||||
|
|
||||||
|
for requestState in {RequestState.New, Started, Finished, Failed}:
|
||||||
|
test "onCancelled is not called when request state is " & $requestState:
|
||||||
|
agent.start(MockState.new())
|
||||||
|
await agent.subscribe()
|
||||||
|
market.requestState[request.id] = requestState
|
||||||
|
clock.set(request.expiry.truncate(int64) + 1)
|
||||||
|
await sleepAsync(100.millis)
|
||||||
|
check not onCancelCalled
|
||||||
|
|
||||||
|
for requestState in {RequestState.Started, Finished, Failed}:
|
||||||
|
test "cancelled future is finished when request state is " & $requestState:
|
||||||
|
agent.start(MockState.new())
|
||||||
|
await agent.subscribe()
|
||||||
|
market.requestState[request.id] = requestState
|
||||||
|
clock.set(request.expiry.truncate(int64) + 1)
|
||||||
|
check eventually agent.data.cancelled.finished
|
||||||
|
|
||||||
test "cancelled future is finished (cancelled) when onFulfilled called":
|
test "cancelled future is finished (cancelled) when onFulfilled called":
|
||||||
agent.start(MockState.new())
|
agent.start(MockState.new())
|
||||||
await agent.subscribe()
|
await agent.subscribe()
|
||||||
|
|
|
@ -98,7 +98,7 @@ asyncchecksuite "Purchasing":
|
||||||
let requestEnd = getTime().toUnix() + 42
|
let requestEnd = getTime().toUnix() + 42
|
||||||
market.requestEnds[request.id] = requestEnd
|
market.requestEnds[request.id] = requestEnd
|
||||||
market.emitRequestFulfilled(request.id)
|
market.emitRequestFulfilled(request.id)
|
||||||
clock.set(requestEnd)
|
clock.set(requestEnd + 1)
|
||||||
await purchase.wait()
|
await purchase.wait()
|
||||||
check purchase.error.isNone
|
check purchase.error.isNone
|
||||||
|
|
||||||
|
@ -229,7 +229,7 @@ checksuite "Purchasing state machine":
|
||||||
market.requestEnds[request.id] = clock.now() + request.ask.duration.truncate(int64)
|
market.requestEnds[request.id] = clock.now() + request.ask.duration.truncate(int64)
|
||||||
let future = PurchaseStarted().run(purchase)
|
let future = PurchaseStarted().run(purchase)
|
||||||
|
|
||||||
clock.advance(request.ask.duration.truncate(int64))
|
clock.advance(request.ask.duration.truncate(int64) + 1)
|
||||||
|
|
||||||
let next = await future
|
let next = await future
|
||||||
check !next of PurchaseFinished
|
check !next of PurchaseFinished
|
||||||
|
|
|
@ -2,5 +2,6 @@ import ./sales/testsales
|
||||||
import ./sales/teststates
|
import ./sales/teststates
|
||||||
import ./sales/testreservations
|
import ./sales/testreservations
|
||||||
import ./sales/testslotqueue
|
import ./sales/testslotqueue
|
||||||
|
import ./sales/testsalesagent
|
||||||
|
|
||||||
{.warning[UnusedImport]: off.}
|
{.warning[UnusedImport]: off.}
|
||||||
|
|
Loading…
Reference in New Issue