mirror of
synced 2025-03-03 15:50:57 +00:00
* Indicate that slot is being repaired when trying to download * Fix tests * Apply nph * Calculate the repair collateral when adding the item into the queue * Add slotCollateral calculation with getRequest cache and remove populationItem function * Update with pricePerByte * Simplify StorageAsk parameter * Minor fixes * Move cache request to another PR * Rename SlotQueueItem collateral and required in init * Use override func to optimise calls when the slot state is known * Remove unused code * Cosmetic change * Use raiseMarketError helper * Add exceptions to async pragma * Cosmetic change * Use raiseMarketError helper * Let slotCollateral determines the slot sate * Use configSync to avoid async pragma in onStorageRequested * Add loadConfig function * Add CatchableError to async pragma * Add missing pragma raises errors * Move loadConfig * Avoid swallow CancelledError * Avoid swallowing CancelledError * Avoid swallowing CancelledError * Update error messages * Except MarketError instead of CatchableError * Fix merge issue * Log fatal when configuration cannot be loaded * Propagate MarketError in slotCollateral * Remove useless configSync * Use result with explicit error * Fix syntax --------- Signed-off-by: Arnaud <arnaud@status.im>
461 lines
14 KiB
461 lines
14 KiB
import std/sequtils
import std/tables
import pkg/chronos
import pkg/questionable
import pkg/questionable/results
import pkg/upraises
import ../errors
import ../clock
import ../logutils
import ../rng
import ../utils
import ../contracts/requests
import ../utils/asyncheapqueue
import ../utils/trackedfutures
# Log topics string for this module.
# NOTE(review): presumably the body of a `logScope:` block whose opening line
# is not visible in this view — confirm.
topics = "marketplace slotqueue"
# NOTE(review): these declarations read as members of a `type` section whose
# opening keyword is not visible in this view — confirm.
#
# Callback invoked by `dispatch` for an item taken off the queue. It receives
# the item and the worker's `doneProcessing` future, which `dispatch` awaits
# once the callback itself has returned.
OnProcessSlot* =
proc(item: SlotQueueItem, done: Future[void]): Future[void] {.gcsafe, upraises: [].}
# Non-ref obj copies value when assigned, preventing accidental modification
# of values which could cause an incorrect order (eg
# ``slotQueue[1].collateral = 1`` would cause ``collateral`` to be updated,
# but the heap invariant would no longer be honoured. When non-ref, the
# compiler can ensure that statement will fail).
SlotQueueWorker = object
# future handed to the `OnProcessSlot` callback and awaited by `dispatch`
doneProcessing*: Future[void]
# A single slot of a storage request as held in the queue. Two items are equal
# (`==`) when both `requestId` and `slotIndex` match; the remaining fields only
# influence ordering (`<`).
SlotQueueItem* = object
requestId: RequestId
slotIndex: uint16
slotSize: uint64
duration: uint64
pricePerBytePerSecond: UInt256
collateral: UInt256 # Collateral computed
expiry: uint64
# items with `seen` set sort after items without it (see `<`); `run` inspects
# this flag on every item it pops
seen: bool
# don't need to -1 to prevent overflow when adding 1 (to always allow push)
# because AsyncHeapQueue size is of type `int`, which is larger than `uint16`
SlotQueueSize = range[1'u16 .. uint16.high]
SlotQueue* = ref object
# capacity of the `workers` queue and number of workers added by `start`
maxWorkers: int
# optional callback; `dispatch` only calls it when it has been set
onProcessSlot: ?OnProcessSlot
queue: AsyncHeapQueue[SlotQueueItem]
running: bool
# created in `start`, not in `new`
workers: AsyncQueue[SlotQueueWorker]
trackedFutures: TrackedFutures
# `run` waits on this event while the queue is paused
unpaused: AsyncEvent
SlotQueueError = object of CodexError
# returned by `push` when `contains` reports the item is already queued
SlotQueueItemExistsError* = object of SlotQueueError
SlotQueueItemNotExistsError* = object of SlotQueueError
# raised by `SlotQueueItem.init` when `ask.slots` is not `inRange`
SlotsOutOfRangeError* = object of SlotQueueError
# returned by `push` and `addWorker` when the queue is not running
QueueNotRunningError* = object of SlotQueueError
# Number of concurrent workers used for processing SlotQueueItems
const DefaultMaxWorkers = 3
# Cap slot queue size to prevent unbounded growth and make sifting more
# efficient. Max size is not equivalent to the number of slots a host can
# service, which is limited by host availabilities and new requests circulating
# the network. Additionally, each new request/slot in the network will be
# included in the queue if it is higher priority than any of the existing
# items. Older slots should be unfillable over time as other hosts fill the
# slots.
const DefaultMaxSize = 128'u16
# Profitability of a queue item as a UInt256; `<` ranks a higher value first.
# NOTE(review): only the field assignments are visible in this view (duration,
# pricePerBytePerSecond and slotSize taken from `item`). The constructor call
# that encloses them and the expression that yields the result are not shown —
# confirm against the full file.
proc profitability(item: SlotQueueItem): UInt256 =
duration: item.duration,
pricePerBytePerSecond: item.pricePerBytePerSecond,
slotSize: item.slotSize,
proc `<`*(a, b: SlotQueueItem): bool =
  ## Heap ordering: `a < b` means `a` has the higher priority (min queue).
  ##
  ## Every criterion awards a power-of-two weight to the side that wins it, so
  ## a single criterion always outweighs all lower-weighted ones together:
  ##   bit 4: own `seen` is false while the other's is true
  ##   bit 3: higher profitability
  ##   bit 2: lower collateral
  ##   bit 1: later expiry
  func weight(won: bool, bit: int): uint8 =
    # contributes 2^bit when the criterion is won, nothing otherwise
    if won: 1'u8 shl bit else: 0'u8

  let totalA =
    weight(a.seen < b.seen, 4) +
    weight(a.profitability > b.profitability, 3) +
    weight(a.collateral < b.collateral, 2) +
    weight(a.expiry > b.expiry, 1)

  let totalB =
    weight(a.seen > b.seen, 4) +
    weight(a.profitability < b.profitability, 3) +
    weight(a.collateral > b.collateral, 2) +
    weight(a.expiry < b.expiry, 1)

  totalA > totalB
proc `==`*(a, b: SlotQueueItem): bool =
  ## Items are the same queue entry when they refer to the same slot of the
  ## same request; all other fields are ignored.
  if a.requestId != b.requestId:
    return false
  a.slotIndex == b.slotIndex
# Creates a SlotQueue that is not running yet (`running: false`); the worker
# queue is created later, in `start`.
# Raises ValueError when `maxWorkers` is not positive or is greater than
# `maxSize`.
# NOTE(review): the second check uses `>`, so `maxWorkers == maxSize` is
# accepted although the message says "less than" — confirm which is intended.
# NOTE(review): `maxWorkers.uint16` narrows an `int`; confirm the behaviour for
# values above `uint16.high`.
# NOTE(review): the object-construction call that wraps the field list below
# is not visible in this view.
proc new*(
_: type SlotQueue,
maxWorkers = DefaultMaxWorkers,
maxSize: SlotQueueSize = DefaultMaxSize,
): SlotQueue =
if maxWorkers <= 0:
raise newException(ValueError, "maxWorkers must be positive")
if maxWorkers.uint16 > maxSize:
raise newException(ValueError, "maxWorkers must be less than maxSize")
maxWorkers: maxWorkers,
# Add 1 to always allow for an extra item to be pushed onto the queue
# temporarily. After push (and sort), the bottom-most item will be deleted
queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1),
running: false,
trackedFutures: TrackedFutures.new(),
unpaused: newAsyncEvent(),
# avoid instantiating `workers` in constructor to avoid side effects in
# `newAsyncQueue` procedure
proc init(_: type SlotQueueWorker): SlotQueueWorker =
  ## Builds a worker whose `doneProcessing` future is fresh and not yet
  ## completed.
  let processing = newFuture[void]("slotqueue.worker.processing")
  SlotQueueWorker(doneProcessing: processing)
# Builds one queue item for slot `slotIndex` of request `requestId`.
# `slotSize`, `duration` and `pricePerBytePerSecond` are copied from `ask`;
# `expiry`, `collateral` and `seen` are stored as given (`seen` defaults to
# false).
# NOTE(review): the object-construction call that wraps the field list below
# is not visible in this view.
proc init*(
_: type SlotQueueItem,
requestId: RequestId,
slotIndex: uint16,
ask: StorageAsk,
expiry: uint64,
collateral: UInt256,
seen = false,
): SlotQueueItem =
requestId: requestId,
slotIndex: slotIndex,
slotSize: ask.slotSize,
duration: ask.duration,
pricePerBytePerSecond: ask.pricePerBytePerSecond,
collateral: collateral,
expiry: expiry,
seen: seen,
proc init*(
    _: type SlotQueueItem,
    request: StorageRequest,
    slotIndex: uint16,
    collateral: UInt256,
): SlotQueueItem =
  ## Convenience overload: builds the item for slot `slotIndex` using the id,
  ## ask and expiry carried by `request`.
  result = SlotQueueItem.init(
    requestId = request.id,
    slotIndex = slotIndex,
    ask = request.ask,
    expiry = request.expiry,
    collateral = collateral,
  )
proc init*(
    _: type SlotQueueItem,
    requestId: RequestId,
    ask: StorageAsk,
    expiry: uint64,
    collateral: UInt256,
): seq[SlotQueueItem] {.raises: [SlotsOutOfRangeError].} =
  ## Builds one queue item per slot of the request, with slot indices counting
  ## up from 0. Every item shares the same ask, expiry and collateral.
  ##
  ## Raises `SlotsOutOfRangeError` when `ask.slots` is not `inRange`.
  if not ask.slots.inRange:
    raise newException(SlotsOutOfRangeError, "Too many slots")

  let count = ask.slots.int
  var
    items = newSeqOfCap[SlotQueueItem](count)
    slotIndex = 0'u16
  for _ in 0 ..< count:
    items.add SlotQueueItem.init(requestId, slotIndex, ask, expiry, collateral)
    inc slotIndex
  items
proc init*(
    _: type SlotQueueItem, request: StorageRequest, collateral: UInt256
): seq[SlotQueueItem] =
  ## Convenience overload: builds the items for every slot of `request` from
  ## the id, ask and expiry it carries.
  result = SlotQueueItem.init(
    requestId = request.id,
    ask = request.ask,
    expiry = request.expiry,
    collateral = collateral,
  )
proc inRange*(val: SomeUnsignedInt): bool =
  ## Reports whether `val` lies within `SlotQueueSize`
  ## (`SlotQueueSize.low .. SlotQueueSize.high`).
  ##
  ## The comparison is done in `uint64`, the widest type `SomeUnsignedInt`
  ## admits, so the input is never narrowed before it is checked. Narrowing to
  ## `uint16` first would let an oversized value (e.g. `uint16.high + 2`) be
  ## judged by its truncated remainder instead of its real magnitude.
  let wide = val.uint64
  wide >= SlotQueueSize.low.uint64 and wide <= SlotQueueSize.high.uint64
# Exported read-only accessors for SlotQueueItem and SlotQueue.
# NOTE(review): none of the bodies are visible in this view. Each presumably
# returns the state its name suggests — confirm against the full file. Note
# that `collateralPerByte` has no identically named field among the visible
# SlotQueueItem fields.
proc requestId*(self: SlotQueueItem): RequestId =
proc slotIndex*(self: SlotQueueItem): uint16 =
proc slotSize*(self: SlotQueueItem): uint64 =
proc duration*(self: SlotQueueItem): uint64 =
proc pricePerBytePerSecond*(self: SlotQueueItem): UInt256 =
proc collateralPerByte*(self: SlotQueueItem): UInt256 =
proc seen*(self: SlotQueueItem): bool =
proc running*(self: SlotQueue): bool =
proc len*(self: SlotQueue): int =
proc size*(self: SlotQueue): int =
  ## Size of the underlying heap queue minus one. `new` allocates that queue
  ## with one slot more than the requested maximum, so this is the maximum the
  ## queue was created with.
  let allocated = self.queue.size
  allocated - 1
proc paused*(self: SlotQueue): bool =
  ## True while the `unpaused` event is not set, i.e. while `run` would block
  ## on `unpaused.wait()`.
  let unpausedIsSet = self.unpaused.isSet
  not unpausedIsSet
# String representation of the queue.
# NOTE(review): the body is not visible in this view — confirm what is rendered.
proc `$`*(self: SlotQueue): string =
proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) =
  ## Installs the callback that `dispatch` invokes for each dequeued item,
  ## replacing any callback set before.
  self.onProcessSlot = onProcessSlot.some
proc activeWorkers*(self: SlotQueue): int =
  ## Number of workers currently checked out of the worker queue, or 0 when
  ## the queue is not running.
  if self.running:
    # active = capacity - available
    self.maxWorkers - self.workers.len
  else:
    0
# Membership test and pause controls.
# NOTE(review): the bodies of these three procs are not visible in this view;
# only the original comments on the `unpaused` flag remain — confirm against
# the full file.
proc contains*(self: SlotQueue, item: SlotQueueItem): bool =
proc pause*(self: SlotQueue) =
# set unpaused flag to false -- coroutines will block on unpaused.wait()
proc unpause*(self: SlotQueue) =
# set unpaused flag to true -- unblocks coroutines waiting on unpaused.wait()
# Pushes a single item onto the queue.
# Returns a failure carrying QueueNotRunningError when the queue is not
# running, SlotQueueItemExistsError when `contains` reports the item is already
# queued, or the error produced by `pushNoWait`; otherwise returns success.
# NOTE(review): the three assignments below look like the body of a log scope
# whose opening line is not visible in this view — confirm.
proc push*(self: SlotQueue, item: SlotQueueItem): ?!void {.raises: [].} =
requestId = item.requestId
slotIndex = item.slotIndex
seen = item.seen
trace "pushing item to queue"
if not self.running:
let err = newException(QueueNotRunningError, "queue not running")
return failure(err)
if self.contains(item):
let err = newException(SlotQueueItemExistsError, "item already exists")
return failure(err)
if err =? self.queue.pushNoWait(item).mapFailure.errorOption:
return failure(err)
# the heap queue is allocated with one spare slot (see `new`); once that spare
# slot has been used, trim the queue back down
if self.queue.full():
# delete the last item
self.queue.del(self.queue.size - 1)
# the queue must never end up holding more than its allocated size minus one
doAssert self.queue.len <= self.queue.size - 1
# when slots are pushed to the queue, the queue should be unpaused if it was
# paused
# NOTE(review): only the trace statement is visible in this branch; the call
# that actually unpauses the queue is not shown in this view — confirm.
if self.paused and not item.seen:
trace "unpausing queue after new slot pushed"
return success()
proc push*(self: SlotQueue, items: seq[SlotQueueItem]): ?!void =
  ## Pushes the items one at a time, in order, stopping at the first failure
  ## and returning its error. Items pushed before the failure stay queued.
  for item in items:
    if pushError =? self.push(item).errorOption:
      return failure(pushError)
  success()
proc findByRequest(self: SlotQueue, requestId: RequestId): seq[SlotQueueItem] =
  ## Collects every queued item belonging to `requestId`, in the order the
  ## queue yields them. Returns an empty seq when nothing matches.
  result = @[]
  for queued in self.queue.items:
    if queued.requestId == requestId:
      result.add queued
# Removes `item` from the queue.
# NOTE(review): the first two assignments look like the body of a log scope
# whose opening line is not visible. Also not visible are whatever follows the
# "queue not running" trace and the statement that performs the removal —
# confirm against the full file.
proc delete*(self: SlotQueue, item: SlotQueueItem) =
requestId = item.requestId
slotIndex = item.slotIndex
trace "removing item from queue"
if not self.running:
trace "cannot delete item from queue, queue not running"
# Deletes by (requestId, slotIndex): builds an item carrying only those two
# fields, which are the ones `==` compares.
# NOTE(review): the statement that uses `item` is not visible in this view.
proc delete*(self: SlotQueue, requestId: RequestId, slotIndex: uint16) =
let item = SlotQueueItem(requestId: requestId, slotIndex: slotIndex)
# Deletes by request: looks up all queued items of `requestId` and loops over
# them.
# NOTE(review): the loop body is not visible in this view.
proc delete*(self: SlotQueue, requestId: RequestId) =
let items = self.findByRequest(requestId)
for item in items:
# Indexed access to a queued item.
# NOTE(review): the body is not visible in this view.
proc `[]`*(self: SlotQueue, i: Natural): SlotQueueItem =
# Creates a fresh worker for the worker queue.
# Returns a failure carrying QueueNotRunningError when the queue is not
# running, and a failure with a "worker queue full" message when
# AsyncQueueFullError is caught; otherwise returns success.
# NOTE(review): the `try` opener and the statement that enqueues `worker` are
# not visible in this view — confirm against the full file.
proc addWorker(self: SlotQueue): ?!void =
if not self.running:
let err = newException(QueueNotRunningError, "queue must be running")
return failure(err)
trace "adding new worker to worker queue"
let worker = SlotQueueWorker.init()
except AsyncQueueFullError:
return failure("failed to add worker, worker queue full")
return success()
# Processes one item on behalf of `worker`. Declared with `raises: []`, so no
# exception leaves this proc.
# Visible flow: if an `onProcessSlot` callback is set it is awaited with the
# item and the worker's `doneProcessing` future, then `doneProcessing` itself
# is awaited; afterwards a new worker is added back via `addWorker`, and a
# failure there is raised so the handlers below can deal with it.
# NOTE(review): the log-scope opener, the `try` opener, whatever follows the
# "not running" warning, and the body of the CancelledError handler are not
# visible in this view — confirm against the full file.
proc dispatch(
self: SlotQueue, worker: SlotQueueWorker, item: SlotQueueItem
) {.async: (raises: []).} =
requestId = item.requestId
slotIndex = item.slotIndex
if not self.running:
warn "Could not dispatch worker because queue is not running"
if onProcessSlot =? self.onProcessSlot:
await onProcessSlot(item, worker.doneProcessing)
await worker.doneProcessing
if err =? self.addWorker().errorOption:
raise err # catch below
except QueueNotRunningError as e:
info "could not re-add worker to worker queue, queue not running", error = e.msg
except CancelledError:
# do not bubble exception up as it is called with `asyncSpawn` which would
# convert the exception into a `FutureDefect`
except CatchableError as e:
# we don't have any insight into types of errors that `onProcessSlot` can
# throw because it is caller-defined
warn "Unknown error processing slot in worker", error = e.msg
# Resets the `seen` flag of every queued item to false and then asks the heap
# queue to re-establish its ordering.
proc clearSeenFlags*(self: SlotQueue) =
# Enumerate all items in the queue, overwriting each item with `seen = false`.
# To avoid issues with new queue items being pushed to the queue while all
# items are being iterated (eg if a new storage request comes in and pushes
# new slots to the queue), this routine must remain synchronous.
# NOTE(review): the body of the empty-queue guard is not visible in this view;
# presumably it returns early, since `self.queue[0]` is read below — confirm.
if self.queue.empty:
for item in self.queue.mitems:
item.seen = false # does not maintain the heap invariant
# force heap reshuffling to maintain the heap invariant
doAssert self.queue.update(self.queue[0]), "slot queue failed to reshuffle"
trace "all 'seen' flags cleared"
# Main processing loop; keeps going while `self.running` is true. Declared with
# `raises: []`: cancellation and other catchable errors are logged by the
# handlers at the bottom.
# Each iteration: wait while paused, take a worker, take the next item, then
# either hand the item to `dispatch` (spawned, not awaited) or, for an item
# already marked `seen`, push it back and return the worker.
# NOTE(review): several lines are not visible in this view — the `try` opener,
# the log-scope opener before `reqId`, what follows the "not running, exiting"
# trace, the statement that pauses the queue for a seen item, and whatever
# separates the seen branch from "processing item". Confirm the exact control
# flow against the full file.
proc run(self: SlotQueue) {.async: (raises: []).} =
while self.running:
if self.paused:
trace "Queue is paused, waiting for new slots or availabilities to be modified/added"
# block until unpaused is true/fired, ie wait for queue to be unpaused
await self.unpaused.wait()
let worker =
await self.workers.popFirst() # if workers saturated, wait here for new workers
let item = await self.queue.pop() # if queue empty, wait here for new items
reqId = item.requestId
slotIdx = item.slotIndex
seen = item.seen
if not self.running: # may have changed after waiting for pop
trace "not running, exiting"
# If, upon processing a slot, the slot item already has a `seen` flag set,
# the queue should be paused.
if item.seen:
trace "processing already seen item, pausing queue",
reqId = item.requestId, slotIdx = item.slotIndex
# put item back in queue so that if other items are pushed while paused,
# it will be sorted accordingly. Otherwise, this item would be processed
# immediately (with priority over other items) once unpaused
trace "readding seen item back into the queue"
discard self.push(item) # on error, drop the item and continue
# the worker taken above was not used for this item, so add one back
if err =? self.addWorker().errorOption:
error "error adding new worker", error = err.msg
await sleepAsync(1.millis) # poll
trace "processing item"
# fire and forget: `dispatch` adds a worker back once it has finished
let fut = self.dispatch(worker, item)
asyncSpawn fut
await sleepAsync(1.millis) # poll
except CancelledError:
trace "slot queue cancelled"
except CatchableError as e: # raised from self.queue.pop() or self.workers.pop()
warn "slot queue error encountered during processing", error = e.msg
# Starts the queue: marks it running, creates the worker queue with capacity
# `maxWorkers`, fills it with `maxWorkers` workers and spawns the `run` loop.
# NOTE(review): the body of the `if self.running:` guard is not visible in this
# view; presumably it returns early so a running queue is not started twice —
# confirm.
proc start*(self: SlotQueue) =
if self.running:
trace "starting slot queue"
self.running = true
# must be called in `start` to avoid sideeffects in `new`
self.workers = newAsyncQueue[SlotQueueWorker](self.maxWorkers)
# Add initial workers to the `AsyncQueue`. Once a worker has completed its
# task, a new worker will be pushed to the queue
for i in 0 ..< self.maxWorkers:
if err =? self.addWorker().errorOption:
error "start: error adding new worker", error = err.msg
let fut = self.run()
asyncSpawn fut
# Stops the queue: clears the `running` flag and awaits cancellation of the
# futures held by `trackedFutures`.
# NOTE(review): the body of the `if not self.running:` guard is not visible in
# this view; presumably it returns early — confirm.
# NOTE(review): nothing visible here registers the `run` or `dispatch` futures
# with `trackedFutures` — confirm against the full file which futures are
# actually cancelled.
proc stop*(self: SlotQueue) {.async.} =
if not self.running:
trace "stopping slot queue"
self.running = false
await self.trackedFutures.cancelTracked()