nim-dagger/codex/sales/salesagent.nim
markspanbroek e654e93c71
Fix sales agent (#733)
* clock: add 1 second leeway before acting on timeouts

* sales: do not raise in proving loop when slot is cancelled

Allow the onCancelled callback to handle cancellation, and
the onFailed callback to handle failed requests.

* sales: cleanup proving tests

* sales: fix sales agent tests

* sales: stop cancellation loop when request started, finished or failed

* sales: fix flaky test

* sales: fix another flaky test

* clock: add comment explaining the + 1 second

Co-Authored-By: benbierens <thatbenbierens@gmail.com>

---------

Co-authored-by: benbierens <thatbenbierens@gmail.com>
2024-03-12 06:41:03 +00:00

137 lines
3.9 KiB
Nim

import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/stint
import pkg/upraises
import ../contracts/requests
import ../errors
import ../logutils
import ./statemachine
import ./salescontext
import ./salesdata
import ./reservations
export reservations
# Logging scope for this module: sets the topics to "marketplace sales".
# NOTE(review): `logScope` is presumably the chronicles macro re-exported by
# ../logutils - confirm.
logScope:
  topics = "marketplace sales"
type
  # Callback taking an optional `returnBytes` flag (defaults to false) and
  # completing asynchronously.
  # NOTE(review): the meaning of `returnBytes` is not visible here - confirm
  # against the callers.
  OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].}

  # Callback that receives a storage request and a slot index.
  OnFilled* = proc(request: StorageRequest,
                   slotIndex: UInt256) {.gcsafe, upraises: [].}

  # State machine (inherits from `Machine`) for one slot of a storage
  # request; the request id and slot index live in `data`.
  SalesAgent* = ref object of Machine
    context*: SalesContext
    data*: SalesData
    # True between a successful `subscribe` and the next `unsubscribe`.
    subscribed: bool
    # Slot-level callbacks; `onFilled` is optional.
    onCleanUp*: OnCleanUp
    onFilled*: ?OnFilled

  # Module-private base error for sales agents.
  SalesAgentError = object of CodexError

  # Exported error type derived from `SalesAgentError`.
  AllSlotsFilledError* = object of SalesAgentError
func `==`*(a, b: SalesAgent): bool =
  ## Two agents compare equal when their data carries the same request id
  ## and the same slot index. Other fields are not considered.
  let sameRequest = a.data.requestId == b.data.requestId
  result = sameRequest and a.data.slotIndex == b.data.slotIndex
proc newSalesAgent*(context: SalesContext,
                    requestId: RequestId,
                    slotIndex: UInt256,
                    request: ?StorageRequest): SalesAgent =
  ## Creates a sales agent bound to `context` for slot `slotIndex` of the
  ## request identified by `requestId`.
  ##
  ## `request` is optional; when it is `none`, `retrieveRequest` can be used
  ## to fetch it from the market later.
  result = SalesAgent.new()
  result.context = context
  result.data = SalesData(
    requestId: requestId,
    slotIndex: slotIndex,
    request: request)
proc retrieveRequest*(agent: SalesAgent) {.async.} =
  ## Ensures the agent's data holds the storage request: when none is
  ## present yet, it is fetched from the market by request id and stored.
  let salesData = agent.data
  let market = agent.context.market

  # Nothing to do when the request is already known.
  if salesData.request.isSome:
    return

  salesData.request = await market.getRequest(salesData.requestId)
proc retrieveRequestState*(agent: SalesAgent): Future[?RequestState] {.async.} =
  ## Queries the market for the state of the agent's request and returns
  ## whatever the market reports (an optional `RequestState`).
  let salesData = agent.data
  let market = agent.context.market
  let current = await market.requestState(salesData.requestId)
  return current
func state*(agent: SalesAgent): ?string =
  ## Returns the result of running `query` on the agent with a helper that
  ## stringifies a `State` via `$`.
  proc describe(current: State): string =
    result = $current

  return agent.query(describe)
proc subscribeCancellation(agent: SalesAgent) {.async.} =
  ## Starts watching for cancellation of the agent's request.
  ##
  ## A polling future is created and stored in `agent.data.cancelled`; it is
  ## not awaited here, so this proc returns as soon as the future is started.
  ## The future schedules a cancelled event on the agent once the market
  ## reports the request as cancelled, and stops without scheduling anything
  ## when the request turns out to be started, finished or failed.
  let data = agent.data
  let clock = agent.context.clock

  proc onCancelled() {.async.} =
    # Without a known request there is no expiry to wait for.
    without request =? data.request:
      return

    while true:
      # Wait until one second past the later of "now" and the request's
      # expiry, so that retries after expiry poll again after a second.
      let deadline = max(clock.now, request.expiry.truncate(int64)) + 1
      trace "Waiting for request to be cancelled", now=clock.now, expiry=deadline
      await clock.waitUntil(deadline)

      without state =? await agent.retrieveRequestState():
        error "Unknown request", requestId = data.requestId
        return

      case state
      of New:
        # Still reported as new: fall through to the log below and retry.
        discard
      of RequestState.Cancelled:
        agent.schedule(cancelledEvent(request))
        break
      of RequestState.Started, RequestState.Finished, RequestState.Failed:
        # The request can no longer be cancelled; stop polling.
        break

      debug "The request is not yet canceled, even though it should be. Waiting for some more time.", currentState = state, now=clock.now

  data.cancelled = onCancelled()
method onFulfilled*(agent: SalesAgent,
                    requestId: RequestId) {.base, gcsafe, upraises: [].} =
  ## Cancels the agent's pending cancellation future (if there is one) when
  ## `requestId` matches the agent's request. Other request ids are ignored.
  if agent.data.requestId == requestId:
    if not agent.data.cancelled.isNil:
      agent.data.cancelled.cancel()
method onFailed*(agent: SalesAgent,
                 requestId: RequestId) {.base, gcsafe, upraises: [].} =
  ## Schedules a failed event on the agent when `requestId` matches the
  ## agent's request. Does nothing while the agent has no request loaded.
  let salesData = agent.data
  if request =? salesData.request:
    if salesData.requestId == requestId:
      agent.schedule(failedEvent(request))
method onSlotFilled*(agent: SalesAgent,
                     requestId: RequestId,
                     slotIndex: UInt256) {.base, gcsafe, upraises: [].} =
  ## Schedules a slot-filled event on the agent when both `requestId` and
  ## `slotIndex` match the agent's own; any other slot is ignored.
  let matchesRequest = agent.data.requestId == requestId
  if matchesRequest and agent.data.slotIndex == slotIndex:
    let event = slotFilledEvent(requestId, slotIndex)
    agent.schedule(event)
proc subscribe*(agent: SalesAgent) {.async.} =
  ## Sets up the cancellation subscription once; calling this on an agent
  ## that is already subscribed has no effect.
  if not agent.subscribed:
    await agent.subscribeCancellation()
    # Only mark as subscribed after the subscription was set up.
    agent.subscribed = true
proc unsubscribe*(agent: SalesAgent) {.async.} =
  ## Tears down the cancellation subscription. Does nothing when the agent
  ## is not subscribed.
  if not agent.subscribed:
    return
  let data = agent.data
  # Cancel the pending cancellation future, if any, and wait for the
  # cancellation to complete before dropping the reference to it.
  if not data.cancelled.isNil and not data.cancelled.finished:
    await data.cancelled.cancelAndWait()
    data.cancelled = nil
  agent.subscribed = false
proc stop*(agent: SalesAgent) {.async.} =
  ## Stops the underlying state machine first and then removes the agent's
  ## subscription.
  let machine = Machine(agent)  # call the `Machine` overload of `stop`
  await machine.stop()
  await unsubscribe(agent)