mirror of
https://github.com/codex-storage/nim-codex.git
synced 2025-01-18 17:02:26 +00:00
de88fd2c53
* implement a logging proxy The logging proxy: - prevents the need to import chronicles (as well as export except toJson), - prevents the need to override `writeValue` or use or import nim-json-seralization elsewhere in the codebase, allowing for sole use of utils/json for de/serialization, - and handles json formatting correctly in chronicles json sinks * Rename logging -> logutils to avoid ambiguity with common names * clean up * add setProperty for JsonRecord, remove nim-json-serialization conflict * Allow specifying textlines and json format separately Not specifying a LogFormat will apply the formatting to both textlines and json sinks. Specifying a LogFormat will apply the formatting to only that sink. * remove unneeded usages of std/json We only need to import utils/json instead of std/json * move serialization from rest/json to utils/json so it can be shared * fix NoColors ambiguity Was causing unit tests to fail on Windows. * Remove nre usage to fix Windows error Windows was erroring with `could not load: pcre64.dll`. Instead of fixing that error, remove the pcre usage :) * Add logutils module doc * Shorten logutils.formatIt for `NBytes` Both json and textlines formatIt were not needed, and could be combined into one formatIt * remove debug integration test config debug output and logformat of json for integration test logs * Use ## module doc to support docgen * bump nim-poseidon2 to export fromBytes Before the changes in this branch, fromBytes was likely being resolved by nim-stew, or other dependency. With the changes in this branch, that dependency was removed and fromBytes could no longer be resolved. By exporting fromBytes from nim-poseidon, the correct resolution is now happening. * fixes to get compiling after rebasing master * Add support for Result types being logged using formatIt
132 lines
3.8 KiB
Nim
132 lines
3.8 KiB
Nim
import pkg/chronos
|
|
import pkg/questionable
|
|
import pkg/questionable/results
|
|
import pkg/stint
|
|
import pkg/upraises
|
|
import ../contracts/requests
|
|
import ../errors
|
|
import ../logutils
|
|
import ./statemachine
|
|
import ./salescontext
|
|
import ./salesdata
|
|
import ./reservations
|
|
|
|
export reservations
|
|
|
|
logScope:
|
|
topics = "marketplace sales"
|
|
|
|
type
|
|
SalesAgent* = ref object of Machine
|
|
context*: SalesContext
|
|
data*: SalesData
|
|
subscribed: bool
|
|
# Slot-level callbacks.
|
|
onCleanUp*: OnCleanUp
|
|
onFilled*: ?OnFilled
|
|
|
|
OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].}
|
|
OnFilled* = proc(request: StorageRequest,
|
|
slotIndex: UInt256) {.gcsafe, upraises: [].}
|
|
|
|
SalesAgentError = object of CodexError
|
|
AllSlotsFilledError* = object of SalesAgentError
|
|
|
|
func `==`*(a, b: SalesAgent): bool =
|
|
a.data.requestId == b.data.requestId and
|
|
a.data.slotIndex == b.data.slotIndex
|
|
|
|
proc newSalesAgent*(context: SalesContext,
|
|
requestId: RequestId,
|
|
slotIndex: UInt256,
|
|
request: ?StorageRequest): SalesAgent =
|
|
var agent = SalesAgent.new()
|
|
agent.context = context
|
|
agent.data = SalesData(
|
|
requestId: requestId,
|
|
slotIndex: slotIndex,
|
|
request: request)
|
|
return agent
|
|
|
|
proc retrieveRequest*(agent: SalesAgent) {.async.} =
|
|
let data = agent.data
|
|
let market = agent.context.market
|
|
if data.request.isNone:
|
|
data.request = await market.getRequest(data.requestId)
|
|
|
|
proc retrieveRequestState*(agent: SalesAgent): Future[?RequestState] {.async.} =
|
|
let data = agent.data
|
|
let market = agent.context.market
|
|
return await market.requestState(data.requestId)
|
|
|
|
func state*(agent: SalesAgent): ?string =
|
|
proc description(state: State): string =
|
|
$state
|
|
agent.query(description)
|
|
|
|
proc subscribeCancellation(agent: SalesAgent) {.async.} =
|
|
let data = agent.data
|
|
let clock = agent.context.clock
|
|
|
|
proc onCancelled() {.async.} =
|
|
without request =? data.request:
|
|
return
|
|
|
|
while true:
|
|
let deadline = max(clock.now, request.expiry.truncate(int64)) + 1
|
|
trace "Waiting for request to be cancelled", now=clock.now, expiry=deadline
|
|
await clock.waitUntil(deadline)
|
|
|
|
without state =? await agent.retrieveRequestState():
|
|
error "Uknown request", requestId = data.requestId
|
|
return
|
|
|
|
if state == RequestState.Cancelled:
|
|
agent.schedule(cancelledEvent(request))
|
|
break
|
|
|
|
debug "The request is not yet canceled, even though it should be. Waiting for some more time.", currentState = state, now=clock.now
|
|
|
|
data.cancelled = onCancelled()
|
|
|
|
method onFulfilled*(agent: SalesAgent, requestId: RequestId) {.base, gcsafe, upraises: [].} =
|
|
if agent.data.requestId == requestId and
|
|
not agent.data.cancelled.isNil:
|
|
agent.data.cancelled.cancel()
|
|
|
|
method onFailed*(agent: SalesAgent, requestId: RequestId) {.base, gcsafe, upraises: [].} =
|
|
without request =? agent.data.request:
|
|
return
|
|
if agent.data.requestId == requestId:
|
|
agent.schedule(failedEvent(request))
|
|
|
|
method onSlotFilled*(agent: SalesAgent,
|
|
requestId: RequestId,
|
|
slotIndex: UInt256) {.base, gcsafe, upraises: [].} =
|
|
|
|
if agent.data.requestId == requestId and
|
|
agent.data.slotIndex == slotIndex:
|
|
agent.schedule(slotFilledEvent(requestId, slotIndex))
|
|
|
|
proc subscribe*(agent: SalesAgent) {.async.} =
|
|
if agent.subscribed:
|
|
return
|
|
|
|
await agent.subscribeCancellation()
|
|
agent.subscribed = true
|
|
|
|
proc unsubscribe*(agent: SalesAgent) {.async.} =
|
|
if not agent.subscribed:
|
|
return
|
|
|
|
let data = agent.data
|
|
if not data.cancelled.isNil and not data.cancelled.finished:
|
|
await data.cancelled.cancelAndWait()
|
|
data.cancelled = nil
|
|
|
|
agent.subscribed = false
|
|
|
|
proc stop*(agent: SalesAgent) {.async.} =
|
|
await Machine(agent).stop()
|
|
await agent.unsubscribe()
|