mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-03 22:13:12 +00:00
* Add availability enabled parameter * Return bytes to availability when finished * Add until parameter * Remove debug message * Clean up and fix tests * Update documentations and cleanup * Avoid swallowing CancelledError * Move until validation to reservations module * Call onAvailabilityAdded callabck when the availability is enabled in sales * Remove until validation in restapi when creating an availability * Add openapi documentation * Use results instead of stew/results (#1112) * feat: request duration limit (#1057) * feat: request duration limit * Fix tests and duration type * Add custom error * Remove merge issue * Update codex contracts eth * Update market config and fix test * Fix SlotReservationsConfig syntax * Update dependencies * test: remove doubled test * chore: update contracts repo --------- Co-authored-by: Arnaud <arnaud@status.im> * fix(statemachine): do not raise from state.run (#1115) * fix(statemachine): do not raise from state.run * fix rebase * fix exception handling in SaleProvingSimulated.prove - re-raise CancelledError - don't return State on CatchableError - expect the Proofs_InvalidProof custom error instead of checking a string * asyncSpawn salesagent.onCancelled This was swallowing a KeyError in one of the tests (fixed in the previous commit) * remove error handling states in asyncstatemachine * revert unneeded changes * formatting * PR feedback, logging updates * chore(integration): simplify block expiration integration test (#1100) * chore(integration): simplify block expiration integration test * clean up * fix after rebase * perf: contract storage optimizations (#1094) * perf: contract storage optimizations * Apply optimization changes * Apply optimizing parameters sizing * Update codex-contracts-eth * bump latest changes in contracts branch * Change requestDurationLimit to uint64 * fix tests * fix tests --------- Co-authored-by: Arnaud <arnaud@status.im> Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com> * bump contracts to master (#1122) * Add availability enabled parameter * Return bytes to availability when finished * Add until parameter * Clean up and fix tests * Move until validation to reservations module * Apply suggestion changes: return the reservation module error * Apply suggestion changes for until dates * Apply suggestion changes: reorganize tests * Fix indent * Remove test related to timing issue * Add raises errors to async pragram and remove useless try except * Update open api documentation * Fix wording * Remove the httpClient restart statements * Use market.getRequestEnd to set validUntil * Remove returnBytes * Use clock.now in testing * Move the api validation file to the right file --------- Co-authored-by: Adam Uhlíř <adam@uhlir.dev> Co-authored-by: Eric <5089238+emizzle@users.noreply.github.com>
149 lines
4.3 KiB
Nim
149 lines
4.3 KiB
Nim
import pkg/chronos
|
|
import pkg/questionable
|
|
import pkg/questionable/results
|
|
import pkg/stint
|
|
import pkg/upraises
|
|
import ../contracts/requests
|
|
import ../errors
|
|
import ../logutils
|
|
import ../utils/exceptions
|
|
import ./statemachine
|
|
import ./salescontext
|
|
import ./salesdata
|
|
import ./reservations
|
|
|
|
export reservations
|
|
|
|
logScope:
|
|
topics = "marketplace sales"
|
|
|
|
type
|
|
SalesAgent* = ref object of Machine
|
|
context*: SalesContext
|
|
data*: SalesData
|
|
subscribed: bool
|
|
# Slot-level callbacks.
|
|
onCleanUp*: OnCleanUp
|
|
onFilled*: ?OnFilled
|
|
|
|
OnCleanUp* = proc(
|
|
reprocessSlot = false, returnedCollateral = UInt256.none
|
|
): Future[void] {.gcsafe, upraises: [].}
|
|
OnFilled* = proc(request: StorageRequest, slotIndex: uint64) {.gcsafe, upraises: [].}
|
|
|
|
SalesAgentError = object of CodexError
|
|
AllSlotsFilledError* = object of SalesAgentError
|
|
|
|
func `==`*(a, b: SalesAgent): bool =
|
|
a.data.requestId == b.data.requestId and a.data.slotIndex == b.data.slotIndex
|
|
|
|
proc newSalesAgent*(
|
|
context: SalesContext,
|
|
requestId: RequestId,
|
|
slotIndex: uint64,
|
|
request: ?StorageRequest,
|
|
): SalesAgent =
|
|
var agent = SalesAgent.new()
|
|
agent.context = context
|
|
agent.data = SalesData(requestId: requestId, slotIndex: slotIndex, request: request)
|
|
return agent
|
|
|
|
proc retrieveRequest*(agent: SalesAgent) {.async.} =
|
|
let data = agent.data
|
|
let market = agent.context.market
|
|
if data.request.isNone:
|
|
data.request = await market.getRequest(data.requestId)
|
|
|
|
proc retrieveRequestState*(agent: SalesAgent): Future[?RequestState] {.async.} =
|
|
let data = agent.data
|
|
let market = agent.context.market
|
|
return await market.requestState(data.requestId)
|
|
|
|
func state*(agent: SalesAgent): ?string =
|
|
proc description(state: State): string =
|
|
$state
|
|
|
|
agent.query(description)
|
|
|
|
proc subscribeCancellation(agent: SalesAgent) {.async.} =
|
|
let data = agent.data
|
|
let clock = agent.context.clock
|
|
|
|
proc onCancelled() {.async: (raises: []).} =
|
|
without request =? data.request:
|
|
return
|
|
|
|
try:
|
|
let market = agent.context.market
|
|
let expiry = await market.requestExpiresAt(data.requestId)
|
|
|
|
while true:
|
|
let deadline = max(clock.now, expiry) + 1
|
|
trace "Waiting for request to be cancelled", now = clock.now, expiry = deadline
|
|
await clock.waitUntil(deadline)
|
|
|
|
without state =? await agent.retrieveRequestState():
|
|
error "Unknown request", requestId = data.requestId
|
|
return
|
|
|
|
case state
|
|
of New:
|
|
discard
|
|
of RequestState.Cancelled:
|
|
agent.schedule(cancelledEvent(request))
|
|
break
|
|
of RequestState.Started, RequestState.Finished, RequestState.Failed:
|
|
break
|
|
|
|
debug "The request is not yet canceled, even though it should be. Waiting for some more time.",
|
|
currentState = state, now = clock.now
|
|
except CancelledError:
|
|
trace "Waiting for expiry to lapse was cancelled", requestId = data.requestId
|
|
except CatchableError as e:
|
|
error "Error while waiting for expiry to lapse", error = e.msgDetail
|
|
|
|
data.cancelled = onCancelled()
|
|
|
|
method onFulfilled*(
|
|
agent: SalesAgent, requestId: RequestId
|
|
) {.base, gcsafe, upraises: [].} =
|
|
let cancelled = agent.data.cancelled
|
|
if agent.data.requestId == requestId and not cancelled.isNil and not cancelled.finished:
|
|
cancelled.cancelSoon()
|
|
|
|
method onFailed*(
|
|
agent: SalesAgent, requestId: RequestId
|
|
) {.base, gcsafe, upraises: [].} =
|
|
without request =? agent.data.request:
|
|
return
|
|
if agent.data.requestId == requestId:
|
|
agent.schedule(failedEvent(request))
|
|
|
|
method onSlotFilled*(
|
|
agent: SalesAgent, requestId: RequestId, slotIndex: uint64
|
|
) {.base, gcsafe, upraises: [].} =
|
|
if agent.data.requestId == requestId and agent.data.slotIndex == slotIndex:
|
|
agent.schedule(slotFilledEvent(requestId, slotIndex))
|
|
|
|
proc subscribe*(agent: SalesAgent) {.async.} =
|
|
if agent.subscribed:
|
|
return
|
|
|
|
await agent.subscribeCancellation()
|
|
agent.subscribed = true
|
|
|
|
proc unsubscribe*(agent: SalesAgent) {.async.} =
|
|
if not agent.subscribed:
|
|
return
|
|
|
|
let data = agent.data
|
|
if not data.cancelled.isNil and not data.cancelled.finished:
|
|
await data.cancelled.cancelAndWait()
|
|
data.cancelled = nil
|
|
|
|
agent.subscribed = false
|
|
|
|
proc stop*(agent: SalesAgent) {.async.} =
|
|
await Machine(agent).stop()
|
|
await agent.unsubscribe()
|