feat[marketplace]: add slot queue pausing (#752)

* add seen flag

* Add MockSlotQueueItem and better prioritisation tests

* Update seen priority, and include in SlotQueueItem.init

* Re-add processed slots to queue

Re-add processed slots to queue if the sale was ignored or errored

* add pausing of queue

- when processing slots in queue, pause queue if item was marked seen
- if availability size is increased, trigger onAvailabilityAdded callback
- in sales, on availability added, clear 'seen' flags, then unpause the queue
- when items pushed to the queue, unpause the queue

* remove unused NoMatchingAvailabilityError from slotqueue

The slot queue should also have nothing to do with availabilities

* when all availabilities are empty, pause the queue

An empty availability is defined as size < DefaultBlockSize as this means even the smallest possible request could not be served. However, this is up for discussion.

* remove availability from onAvailabilitiesEmptied callback

* refactor onAvailabilityAdded and onAvailabilitiesEmptied

onAvailabilityAdded and onAvailabilitiesEmptied are now only called from reservations.update (and eventually reservations.delete once implemented).

- Add empty routine for Availability and Reservation
- Add allEmpty routine for Availability and Reservation, which returns true when all Availability or Reservation objects in the datastore are empty.

* SlotQueue test support updates

* Sales module test support updates

* Reservations module tests for queue pausing

* Sales module tests for queue pausing

Includes tests for sales states cancelled, errored, ignored to ensure onCleanUp is called with correct parameters

* SlotQueue module tests for queue pausing

* fix existing sales test

* PR feedback

- indent `self.unpause`
- update comment for `clearSeenFlags`

* reprocessSlot in SaleErrored only when coming from downloading

* remove pausing of queue when availabilities are "emptied"

Queue pausing when all availabilities are "emptied" is not necessary, given that the node would not be able to service slots once the freeSize of all its availabilities is too small for the slots in the queue, and the queue would then be paused anyway.

Add test that asserts the queue is paused once the freeSpace of availabilities drops too low to fill slots in the queue.

* Update clearing of seen flags

The asyncheapqueue update overload would need to check index bounds and ultimately a different solution was found using the mitems iterator.

* fix test

request.id was different before updating request.ask.slots, and that id was used to set the state in mockmarket.

* Change filled/cleanup future to nil, so no await is needed

* add wait to allow items to be added to queue

* do not unpause queue when seen items are pushed

* re-add seen item back to queue once paused

Previously, when a seen item was processed, it was first popped off the queue, then the queue was paused waiting to process that item once the queue was unpaused. Now, when a seen item is processed, it is popped off the queue, the queue is paused, then the item is re-added to the queue and the queue will wait until unpaused before it will continue popping items off the queue. If the item was not re-added to the queue, it would have been processed immediately once unpaused; however, there may have been other items with higher priority pushed to the queue in the meantime. The queue would not be unpaused if those added items were already seen. In particular, this may happen when ignored items due to lack of availability are re-added to a paused queue. Those ignored items will likely have a higher priority than the item that was just seen (due to it having been processed first), causing the queue to be paused.

* address PR comments
This commit is contained in:
Eric 2024-05-26 10:38:38 +10:00 committed by GitHub
parent efd46148b0
commit e6a387e8e8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
16 changed files with 614 additions and 129 deletions

View File

@ -78,13 +78,13 @@ proc onProve*(sales: Sales): ?OnProve = sales.context.onProve
proc onExpiryUpdate*(sales: Sales): ?OnExpiryUpdate = sales.context.onExpiryUpdate proc onExpiryUpdate*(sales: Sales): ?OnExpiryUpdate = sales.context.onExpiryUpdate
func new*(_: type Sales, proc new*(_: type Sales,
market: Market, market: Market,
clock: Clock, clock: Clock,
repo: RepoStore): Sales = repo: RepoStore): Sales =
Sales.new(market, clock, repo, 0) Sales.new(market, clock, repo, 0)
func new*(_: type Sales, proc new*(_: type Sales,
market: Market, market: Market,
clock: Clock, clock: Clock,
repo: RepoStore, repo: RepoStore,
@ -111,16 +111,20 @@ proc remove(sales: Sales, agent: SalesAgent) {.async.} =
proc cleanUp(sales: Sales, proc cleanUp(sales: Sales,
agent: SalesAgent, agent: SalesAgent,
returnBytes: bool, returnBytes: bool,
reprocessSlot: bool,
processing: Future[void]) {.async.} = processing: Future[void]) {.async.} =
let data = agent.data let data = agent.data
trace "cleaning up sales agent", logScope:
requestId = data.requestId, topics = "sales cleanUp"
slotIndex = data.slotIndex, requestId = data.requestId
reservationId = data.reservation.?id |? ReservationId.default, slotIndex = data.slotIndex
reservationId = data.reservation.?id |? ReservationId.default
availabilityId = data.reservation.?availabilityId |? AvailabilityId.default availabilityId = data.reservation.?availabilityId |? AvailabilityId.default
trace "cleaning up sales agent"
# if reservation for the SalesAgent was not created, then it means # if reservation for the SalesAgent was not created, then it means
# that the cleanUp was called before the sales process really started, so # that the cleanUp was called before the sales process really started, so
# there are not really any bytes to be returned # there are not really any bytes to be returned
@ -132,7 +136,6 @@ proc cleanUp(sales: Sales,
)).errorOption: )).errorOption:
error "failure returning bytes", error "failure returning bytes",
error = returnErr.msg, error = returnErr.msg,
availabilityId = reservation.availabilityId,
bytes = request.ask.slotSize bytes = request.ask.slotSize
# delete reservation and return reservation bytes back to the availability # delete reservation and return reservation bytes back to the availability
@ -141,10 +144,21 @@ proc cleanUp(sales: Sales,
reservation.id, reservation.id,
reservation.availabilityId reservation.availabilityId
)).errorOption: )).errorOption:
error "failure deleting reservation", error "failure deleting reservation", error = deleteErr.msg
error = deleteErr.msg,
reservationId = reservation.id, # Re-add items back into the queue to prevent small availabilities from
availabilityId = reservation.availabilityId # draining the queue. Seen items will be ordered last.
if reprocessSlot and request =? data.request:
let queue = sales.context.slotQueue
var seenItem = SlotQueueItem.init(data.requestId,
data.slotIndex.truncate(uint16),
data.ask,
request.expiry,
seen = true)
trace "pushing ignored item to queue, marked as seen"
if err =? queue.push(seenItem).errorOption:
error "failed to readd slot to queue",
errorType = $(type err), error = err.msg
await sales.remove(agent) await sales.remove(agent)
@ -176,8 +190,8 @@ proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
none StorageRequest none StorageRequest
) )
agent.onCleanUp = proc (returnBytes = false) {.async.} = agent.onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
await sales.cleanUp(agent, returnBytes, done) await sales.cleanUp(agent, returnBytes, reprocessSlot, done)
agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) = agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
sales.filled(request, slotIndex, done) sales.filled(request, slotIndex, done)
@ -222,7 +236,6 @@ proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
return slots return slots
proc activeSale*(sales: Sales, slotId: SlotId): Future[?SalesAgent] {.async.} = proc activeSale*(sales: Sales, slotId: SlotId): Future[?SalesAgent] {.async.} =
let market = sales.context.market
for agent in sales.agents: for agent in sales.agents:
if slotId(agent.data.requestId, agent.data.slotIndex) == slotId: if slotId(agent.data.requestId, agent.data.slotIndex) == slotId:
return some agent return some agent
@ -241,60 +254,29 @@ proc load*(sales: Sales) {.async.} =
slot.slotIndex, slot.slotIndex,
some slot.request) some slot.request)
agent.onCleanUp = proc(returnBytes = false) {.async.} = agent.onCleanUp = proc(returnBytes = false, reprocessSlot = false) {.async.} =
let done = newFuture[void]("onCleanUp_Dummy") # since workers are not being dispatched, this future has not been created
await sales.cleanUp(agent, returnBytes, done) # by a worker. Create a dummy one here so we can call sales.cleanUp
await done # completed in sales.cleanUp let done: Future[void] = nil
await sales.cleanUp(agent, returnBytes, reprocessSlot, done)
# There is no need to assign agent.onFilled as slots loaded from `mySlots`
# are inherently already filled and so assigning agent.onFilled would be
# superfluous.
agent.start(SaleUnknown()) agent.start(SaleUnknown())
sales.agents.add agent sales.agents.add agent
proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} = proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} =
## Query last 256 blocks for new requests, adding them to the queue. `push` ## When availabilities are modified or added, the queue should be unpaused if
## checks for availability before adding to the queue. If processed, the ## it was paused and any slots in the queue should have their `seen` flag
## sales agent will check if the slot is free. ## cleared.
let context = sales.context let queue = sales.context.slotQueue
let market = context.market
let queue = context.slotQueue
logScope: queue.clearSeenFlags()
topics = "marketplace sales onAvailabilityAdded callback" if queue.paused:
trace "unpausing queue after new availability added"
trace "availability added, querying past storage requests to add to queue" queue.unpause()
try:
let events = await market.queryPastStorageRequests(256)
if events.len == 0:
trace "no storage request events found in recent past"
return
let requests = events.map(event =>
SlotQueueItem.init(event.requestId, event.ask, event.expiry)
)
trace "found past storage requested events to add to queue",
events = events.len
for slots in requests:
for slot in slots:
if err =? queue.push(slot).errorOption:
# continue on error
if err of QueueNotRunningError:
warn "cannot push items to queue, queue is not running"
elif err of NoMatchingAvailabilityError:
info "slot in queue had no matching availabilities, ignoring"
elif err of SlotsOutOfRangeError:
warn "Too many slots, cannot add to queue"
elif err of SlotQueueItemExistsError:
trace "item already exists, ignoring"
discard
else: raise err
except CancelledError as error:
raise error
except CatchableError as e:
warn "Error adding request to SlotQueue", error = e.msg
discard
proc onStorageRequested(sales: Sales, proc onStorageRequested(sales: Sales,
requestId: RequestId, requestId: RequestId,
@ -321,9 +303,7 @@ proc onStorageRequested(sales: Sales,
for item in items: for item in items:
# continue on failure # continue on failure
if err =? slotQueue.push(item).errorOption: if err =? slotQueue.push(item).errorOption:
if err of NoMatchingAvailabilityError: if err of SlotQueueItemExistsError:
info "slot in queue had no matching availabilities, ignoring"
elif err of SlotQueueItemExistsError:
error "Failed to push item to queue becaue it already exists" error "Failed to push item to queue becaue it already exists"
elif err of QueueNotRunningError: elif err of QueueNotRunningError:
warn "Failed to push item to queue becaue queue is not running" warn "Failed to push item to queue becaue queue is not running"
@ -364,9 +344,7 @@ proc onSlotFreed(sales: Sales,
addSlotToQueue() addSlotToQueue()
.track(sales) .track(sales)
.catch(proc(err: ref CatchableError) = .catch(proc(err: ref CatchableError) =
if err of NoMatchingAvailabilityError: if err of SlotQueueItemExistsError:
info "slot in queue had no matching availabilities, ignoring"
elif err of SlotQueueItemExistsError:
error "Failed to push item to queue becaue it already exists" error "Failed to push item to queue becaue it already exists"
elif err of QueueNotRunningError: elif err of QueueNotRunningError:
warn "Failed to push item to queue becaue queue is not running" warn "Failed to push item to queue becaue queue is not running"
@ -489,6 +467,7 @@ proc startSlotQueue(sales: Sales) {.async.} =
slotQueue.onProcessSlot = slotQueue.onProcessSlot =
proc(item: SlotQueueItem, done: Future[void]) {.async.} = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
trace "processing slot queue item", reqId = item.requestId, slotIdx = item.slotIndex
sales.processSlot(item, done) sales.processSlot(item, done)
asyncSpawn slotQueue.start() asyncSpawn slotQueue.start()

View File

@ -28,6 +28,8 @@
import pkg/upraises import pkg/upraises
push: {.upraises: [].} push: {.upraises: [].}
import std/sequtils
import std/sugar
import std/typetraits import std/typetraits
import std/sequtils import std/sequtils
import pkg/chronos import pkg/chronos
@ -37,6 +39,7 @@ import pkg/questionable
import pkg/questionable/results import pkg/questionable/results
import pkg/stint import pkg/stint
import pkg/stew/byteutils import pkg/stew/byteutils
import ../codextypes
import ../logutils import ../logutils
import ../clock import ../clock
import ../stores import ../stores
@ -90,6 +93,8 @@ const
SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module SalesKey = (CodexMetaKey / "sales").tryGet # TODO: move to sales module
ReservationsKey = (SalesKey / "reservations").tryGet ReservationsKey = (SalesKey / "reservations").tryGet
proc all*(self: Reservations, T: type SomeStorableObject): Future[?!seq[T]] {.async.}
proc new*(T: type Reservations, proc new*(T: type Reservations,
repo: RepoStore): Reservations = repo: RepoStore): Reservations =
@ -226,10 +231,26 @@ proc update*(
without key =? obj.key, error: without key =? obj.key, error:
return failure(error) return failure(error)
let getResult = await self.get(key, Availability) without oldAvailability =? await self.get(key, Availability), err:
if err of NotExistsError:
if getResult.isOk: let res = await self.updateImpl(obj)
let oldAvailability = !getResult # inform subscribers that Availability has been added
if onAvailabilityAdded =? self.onAvailabilityAdded:
# when chronos v4 is implemented, and OnAvailabilityAdded is annotated
# with async:(raises:[]), we can remove this try/catch as we know, with
# certainty, that nothing will be raised
try:
await onAvailabilityAdded(obj)
except CancelledError as e:
raise e
except CatchableError as e:
# we don't have any insight into types of exceptions that
# `onAvailabilityAdded` can raise because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback",
availabilityId = obj.id, error = e.msg
return res
else:
return failure(err)
# Sizing of the availability changed, we need to adjust the repo reservation accordingly # Sizing of the availability changed, we need to adjust the repo reservation accordingly
if oldAvailability.totalSize != obj.totalSize: if oldAvailability.totalSize != obj.totalSize:
@ -240,12 +261,27 @@ proc update*(
elif oldAvailability.totalSize > obj.totalSize: # storage removed elif oldAvailability.totalSize > obj.totalSize: # storage removed
if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption: if reserveErr =? (await self.repo.release((oldAvailability.totalSize - obj.totalSize).truncate(uint))).errorOption:
return failure(reserveErr.toErr(ReleaseFailedError)) return failure(reserveErr.toErr(ReleaseFailedError))
else:
let err = getResult.error()
if not (err of NotExistsError):
return failure(err)
return await self.updateImpl(obj) let res = await self.updateImpl(obj)
if oldAvailability.freeSize < obj.freeSize: # availability added
# inform subscribers that Availability has been modified (with increased
# size)
if onAvailabilityAdded =? self.onAvailabilityAdded:
# when chronos v4 is implemented, and OnAvailabilityAdded is annotated
# with async:(raises:[]), we can remove this try/catch as we know, with
# certainty, that nothing will be raised
try:
await onAvailabilityAdded(obj)
except CancelledError as e:
raise e
except CatchableError as e:
# we don't have any insight into types of exceptions that
# `onAvailabilityAdded` can raise because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback",
availabilityId = obj.id, error = e.msg
return res
proc delete( proc delete(
self: Reservations, self: Reservations,
@ -300,6 +336,9 @@ proc deleteReservation*(
return success() return success()
# TODO: add support for deleting availabilities
# To delete, must not have any active sales.
proc createAvailability*( proc createAvailability*(
self: Reservations, self: Reservations,
size: UInt256, size: UInt256,
@ -327,17 +366,6 @@ proc createAvailability*(
return failure(updateErr) return failure(updateErr)
if onAvailabilityAdded =? self.onAvailabilityAdded:
try:
await onAvailabilityAdded(availability)
except CancelledError as error:
raise error
except CatchableError as e:
# we don't have any insight into types of errors that `onProcessSlot` can
# throw because it is caller-defined
warn "Unknown error during 'onAvailabilityAdded' callback",
availabilityId = availability.id, error = e.msg
return success(availability) return success(availability)
proc createReservation*( proc createReservation*(

View File

@ -25,7 +25,7 @@ type
onCleanUp*: OnCleanUp onCleanUp*: OnCleanUp
onFilled*: ?OnFilled onFilled*: ?OnFilled
OnCleanUp* = proc (returnBytes = false): Future[void] {.gcsafe, upraises: [].} OnCleanUp* = proc (returnBytes = false, reprocessSlot = false): Future[void] {.gcsafe, upraises: [].}
OnFilled* = proc(request: StorageRequest, OnFilled* = proc(request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].} slotIndex: UInt256) {.gcsafe, upraises: [].}

View File

@ -36,6 +36,7 @@ type
reward: UInt256 reward: UInt256
collateral: UInt256 collateral: UInt256
expiry: UInt256 expiry: UInt256
seen: bool
# don't need to -1 to prevent overflow when adding 1 (to always allow push) # don't need to -1 to prevent overflow when adding 1 (to always allow push)
# because AsyncHeapQueue size is of type `int`, which is larger than `uint16` # because AsyncHeapQueue size is of type `int`, which is larger than `uint16`
@ -48,12 +49,12 @@ type
running: bool running: bool
workers: AsyncQueue[SlotQueueWorker] workers: AsyncQueue[SlotQueueWorker]
trackedFutures: TrackedFutures trackedFutures: TrackedFutures
unpaused: AsyncEvent
SlotQueueError = object of CodexError SlotQueueError = object of CodexError
SlotQueueItemExistsError* = object of SlotQueueError SlotQueueItemExistsError* = object of SlotQueueError
SlotQueueItemNotExistsError* = object of SlotQueueError SlotQueueItemNotExistsError* = object of SlotQueueError
SlotsOutOfRangeError* = object of SlotQueueError SlotsOutOfRangeError* = object of SlotQueueError
NoMatchingAvailabilityError* = object of SlotQueueError
QueueNotRunningError* = object of SlotQueueError QueueNotRunningError* = object of SlotQueueError
# Number of concurrent workers used for processing SlotQueueItems # Number of concurrent workers used for processing SlotQueueItems
@ -84,6 +85,9 @@ proc `<`*(a, b: SlotQueueItem): bool =
if condition: if condition:
score += 1'u8 shl addition score += 1'u8 shl addition
scoreA.addIf(a.seen < b.seen, 4)
scoreB.addIf(a.seen > b.seen, 4)
scoreA.addIf(a.profitability > b.profitability, 3) scoreA.addIf(a.profitability > b.profitability, 3)
scoreB.addIf(a.profitability < b.profitability, 3) scoreB.addIf(a.profitability < b.profitability, 3)
@ -117,12 +121,13 @@ proc new*(_: type SlotQueue,
# temporarily. After push (and sort), the bottom-most item will be deleted # temporarily. After push (and sort), the bottom-most item will be deleted
queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1), queue: newAsyncHeapQueue[SlotQueueItem](maxSize.int + 1),
running: false, running: false,
trackedFutures: TrackedFutures.new() trackedFutures: TrackedFutures.new(),
unpaused: newAsyncEvent()
) )
# avoid instantiating `workers` in constructor to avoid side effects in # avoid instantiating `workers` in constructor to avoid side effects in
# `newAsyncQueue` procedure # `newAsyncQueue` procedure
proc init*(_: type SlotQueueWorker): SlotQueueWorker = proc init(_: type SlotQueueWorker): SlotQueueWorker =
SlotQueueWorker( SlotQueueWorker(
doneProcessing: newFuture[void]("slotqueue.worker.processing") doneProcessing: newFuture[void]("slotqueue.worker.processing")
) )
@ -131,7 +136,8 @@ proc init*(_: type SlotQueueItem,
requestId: RequestId, requestId: RequestId,
slotIndex: uint16, slotIndex: uint16,
ask: StorageAsk, ask: StorageAsk,
expiry: UInt256): SlotQueueItem = expiry: UInt256,
seen = false): SlotQueueItem =
SlotQueueItem( SlotQueueItem(
requestId: requestId, requestId: requestId,
@ -140,7 +146,8 @@ proc init*(_: type SlotQueueItem,
duration: ask.duration, duration: ask.duration,
reward: ask.reward, reward: ask.reward,
collateral: ask.collateral, collateral: ask.collateral,
expiry: expiry expiry: expiry,
seen: seen
) )
proc init*(_: type SlotQueueItem, proc init*(_: type SlotQueueItem,
@ -184,6 +191,7 @@ proc slotSize*(self: SlotQueueItem): UInt256 = self.slotSize
proc duration*(self: SlotQueueItem): UInt256 = self.duration proc duration*(self: SlotQueueItem): UInt256 = self.duration
proc reward*(self: SlotQueueItem): UInt256 = self.reward proc reward*(self: SlotQueueItem): UInt256 = self.reward
proc collateral*(self: SlotQueueItem): UInt256 = self.collateral proc collateral*(self: SlotQueueItem): UInt256 = self.collateral
proc seen*(self: SlotQueueItem): bool = self.seen
proc running*(self: SlotQueue): bool = self.running proc running*(self: SlotQueue): bool = self.running
@ -191,6 +199,8 @@ proc len*(self: SlotQueue): int = self.queue.len
proc size*(self: SlotQueue): int = self.queue.size - 1 proc size*(self: SlotQueue): int = self.queue.size - 1
proc paused*(self: SlotQueue): bool = not self.unpaused.isSet
proc `$`*(self: SlotQueue): string = $self.queue proc `$`*(self: SlotQueue): string = $self.queue
proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) = proc `onProcessSlot=`*(self: SlotQueue, onProcessSlot: OnProcessSlot) =
@ -205,6 +215,14 @@ proc activeWorkers*(self: SlotQueue): int =
proc contains*(self: SlotQueue, item: SlotQueueItem): bool = proc contains*(self: SlotQueue, item: SlotQueueItem): bool =
self.queue.contains(item) self.queue.contains(item)
proc pause*(self: SlotQueue) =
  ## Pauses the queue by clearing the `unpaused` event.
  ##
  ## Once cleared, `paused` reports `true` and any coroutine awaiting
  ## `unpaused.wait()` blocks until `unpause` is called.
  # set unpaused flag to false -- coroutines will block on unpaused.wait()
  self.unpaused.clear()
proc unpause*(self: SlotQueue) =
  ## Resumes the queue by firing the `unpaused` event.
  ##
  ## Once fired, `paused` reports `false` and coroutines blocked on
  ## `unpaused.wait()` are released.
  # set unpaused flag to true -- unblocks coroutines waiting on unpaused.wait()
  self.unpaused.fire()
proc populateItem*(self: SlotQueue, proc populateItem*(self: SlotQueue,
requestId: RequestId, requestId: RequestId,
slotIndex: uint16): ?SlotQueueItem = slotIndex: uint16): ?SlotQueueItem =
@ -226,8 +244,12 @@ proc populateItem*(self: SlotQueue,
proc push*(self: SlotQueue, item: SlotQueueItem): ?!void = proc push*(self: SlotQueue, item: SlotQueueItem): ?!void =
trace "pushing item to queue", logScope:
requestId = item.requestId, slotIndex = item.slotIndex requestId = item.requestId
slotIndex = item.slotIndex
seen = item.seen
trace "pushing item to queue"
if not self.running: if not self.running:
let err = newException(QueueNotRunningError, "queue not running") let err = newException(QueueNotRunningError, "queue not running")
@ -245,6 +267,13 @@ proc push*(self: SlotQueue, item: SlotQueueItem): ?!void =
self.queue.del(self.queue.size - 1) self.queue.del(self.queue.size - 1)
doAssert self.queue.len <= self.queue.size - 1 doAssert self.queue.len <= self.queue.size - 1
# when slots are pushed to the queue, the queue should be unpaused if it was
# paused
if self.paused and not item.seen:
trace "unpausing queue after new slot pushed"
self.unpause()
return success() return success()
proc push*(self: SlotQueue, items: seq[SlotQueueItem]): ?!void = proc push*(self: SlotQueue, items: seq[SlotQueueItem]): ?!void =
@ -295,6 +324,7 @@ proc addWorker(self: SlotQueue): ?!void =
let worker = SlotQueueWorker.init() let worker = SlotQueueWorker.init()
try: try:
discard worker.doneProcessing.track(self)
self.workers.addLastNoWait(worker) self.workers.addLastNoWait(worker)
except AsyncQueueFullError: except AsyncQueueFullError:
return failure("failed to add worker, worker queue full") return failure("failed to add worker, worker queue full")
@ -314,6 +344,7 @@ proc dispatch(self: SlotQueue,
if onProcessSlot =? self.onProcessSlot: if onProcessSlot =? self.onProcessSlot:
try: try:
discard worker.doneProcessing.track(self)
await onProcessSlot(item, worker.doneProcessing) await onProcessSlot(item, worker.doneProcessing)
await worker.doneProcessing await worker.doneProcessing
@ -332,6 +363,23 @@ proc dispatch(self: SlotQueue,
# throw because it is caller-defined # throw because it is caller-defined
warn "Unknown error processing slot in worker", error = e.msg warn "Unknown error processing slot in worker", error = e.msg
proc clearSeenFlags*(self: SlotQueue) =
  ## Resets the `seen` flag of every item currently in the queue to `false`.
  ##
  ## This routine must remain synchronous: with no suspension point between
  ## the first and the last item, nothing else (eg a new storage request
  ## pushing slots) can modify the queue while it is being enumerated.
  ##
  ## Does nothing when the queue is empty.
  if not self.queue.empty:
    # `seen` takes part in item ordering, and flipping it in place does not
    # maintain the heap invariant
    for queued in self.queue.mitems:
      queued.seen = false

    # force heap reshuffling to maintain the heap invariant
    doAssert self.queue.update(self.queue[0]), "slot queue failed to reshuffle"

    trace "all 'seen' flags cleared"
proc start*(self: SlotQueue) {.async.} = proc start*(self: SlotQueue) {.async.} =
if self.running: if self.running:
return return
@ -351,21 +399,47 @@ proc start*(self: SlotQueue) {.async.} =
while self.running: while self.running:
try: try:
if self.paused:
trace "Queue is paused, waiting for new slots or availabilities to be modified/added"
# block until unpaused is true/fired, ie wait for queue to be unpaused
await self.unpaused.wait()
let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers let worker = await self.workers.popFirst().track(self) # if workers saturated, wait here for new workers
let item = await self.queue.pop().track(self) # if queue empty, wait here for new items let item = await self.queue.pop().track(self) # if queue empty, wait here for new items
logScope:
reqId = item.requestId
slotIdx = item.slotIndex
seen = item.seen
if not self.running: # may have changed after waiting for pop if not self.running: # may have changed after waiting for pop
trace "not running, exiting" trace "not running, exiting"
break break
# If, upon processing a slot, the slot item already has a `seen` flag set,
# the queue should be paused.
if item.seen:
trace "processing already seen item, pausing queue",
reqId = item.requestId, slotIdx = item.slotIndex
self.pause()
# put item back in queue so that if other items are pushed while paused,
# it will be sorted accordingly. Otherwise, this item would be processed
# immediately (with priority over other items) once unpaused
trace "readding seen item back into the queue"
discard self.push(item) # on error, drop the item and continue
worker.doneProcessing.complete()
await sleepAsync(1.millis) # poll
continue
trace "processing item"
self.dispatch(worker, item) self.dispatch(worker, item)
.track(self) .track(self)
.catch(proc (e: ref CatchableError) = .catch(proc (e: ref CatchableError) =
error "Unknown error dispatching worker", error = e.msg error "Unknown error dispatching worker", error = e.msg
) )
discard worker.doneProcessing.track(self)
await sleepAsync(1.millis) # poll await sleepAsync(1.millis) # poll
except CancelledError: except CancelledError:
trace "slot queue cancelled" trace "slot queue cancelled"

View File

@ -28,6 +28,6 @@ method run*(state: SaleCancelled, machine: Machine): Future[?State] {.async.} =
onClear(request, data.slotIndex) onClear(request, data.slotIndex)
if onCleanUp =? agent.onCleanUp: if onCleanUp =? agent.onCleanUp:
await onCleanUp(returnBytes = true) await onCleanUp(returnBytes = true, reprocessSlot = false)
warn "Sale cancelled due to timeout", requestId = data.requestId, slotIndex = data.slotIndex warn "Sale cancelled due to timeout", requestId = data.requestId, slotIndex = data.slotIndex

View File

@ -69,7 +69,7 @@ method run*(state: SaleDownloading, machine: Machine): Future[?State] {.async.}
if err =? (await onStore(request, if err =? (await onStore(request,
data.slotIndex, data.slotIndex,
onBlocks)).errorOption: onBlocks)).errorOption:
return some State(SaleErrored(error: err)) return some State(SaleErrored(error: err, reprocessSlot: false))
trace "Download complete" trace "Download complete"
return some State(SaleInitialProving()) return some State(SaleInitialProving())

View File

@ -12,6 +12,7 @@ logScope:
type SaleErrored* = ref object of SaleState type SaleErrored* = ref object of SaleState
error*: ref CatchableError error*: ref CatchableError
reprocessSlot*: bool
method `$`*(state: SaleErrored): string = "SaleErrored" method `$`*(state: SaleErrored): string = "SaleErrored"
@ -30,5 +31,5 @@ method run*(state: SaleErrored, machine: Machine): Future[?State] {.async.} =
onClear(request, data.slotIndex) onClear(request, data.slotIndex)
if onCleanUp =? agent.onCleanUp: if onCleanUp =? agent.onCleanUp:
await onCleanUp(returnBytes = true) await onCleanUp(returnBytes = true, reprocessSlot = state.reprocessSlot)

View File

@ -17,4 +17,7 @@ method run*(state: SaleIgnored, machine: Machine): Future[?State] {.async.} =
let agent = SalesAgent(machine) let agent = SalesAgent(machine)
if onCleanUp =? agent.onCleanUp: if onCleanUp =? agent.onCleanUp:
await onCleanUp() # Ignored slots mean there was no availability. In order to prevent small
# availabilities from draining the queue, mark this slot as seen and re-add
# back into the queue.
await onCleanUp(reprocessSlot = true)

View File

@ -0,0 +1,26 @@
import pkg/codex/contracts/requests
import pkg/codex/sales/slotqueue
type MockSlotQueueItem* = object
  ## Test-only stand-in for `SlotQueueItem` with every field exported, so
  ## tests can spell out each value (including `seen`) directly and convert
  ## to a real queue item with `toSlotQueueItem`.
  requestId*: RequestId
  slotIndex*: uint16
  slotSize*, duration*, reward*, collateral*, expiry*: UInt256
  seen*: bool
proc toSlotQueueItem*(item: MockSlotQueueItem): SlotQueueItem =
  ## Converts a mock item into a `SlotQueueItem` via `SlotQueueItem.init`.
  ##
  ## The size, duration, reward and collateral fields are packed into a
  ## `StorageAsk`; the request id, slot index, expiry and `seen` flag are
  ## passed through unchanged.
  let ask = StorageAsk(
    slotSize: item.slotSize,
    duration: item.duration,
    reward: item.reward,
    collateral: item.collateral
  )

  SlotQueueItem.init(
    item.requestId,
    item.slotIndex,
    ask,
    expiry = item.expiry,
    seen = item.seen
  )

View File

@ -0,0 +1,45 @@
import pkg/questionable
import pkg/chronos
import pkg/codex/contracts/requests
import pkg/codex/sales/states/cancelled
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/market
import ../../../asynctest
import ../../examples
import ../../helpers
import ../../helpers/mockmarket
import ../../helpers/mockclock
asyncchecksuite "sales state 'cancelled'":
  ## Verifies the arguments `SaleCancelled.run` passes to the agent's
  ## `onCleanUp` callback.
  let request = StorageRequest.example
  let slotIndex = (request.ask.slots div 2).u256
  let market = MockMarket.new()
  let clock = MockClock.new()

  var state: SaleCancelled
  var agent: SalesAgent
  # record the arguments most recently passed to `onCleanUp`
  var returnBytesWas = false
  var reprocessSlotWas = false

  setup:
    # capture the callback arguments so the test can assert on them
    let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
      returnBytesWas = returnBytes
      reprocessSlotWas = reprocessSlot

    let context = SalesContext(
      market: market,
      clock: clock
    )
    agent = newSalesAgent(context,
                          request.id,
                          slotIndex,
                          request.some)
    agent.onCleanUp = onCleanUp
    state = SaleCancelled.new()

  # the title now matches the assertions below: a cancelled sale returns its
  # bytes and is not re-added to the slot queue
  test "calls onCleanUp with returnBytes = true and reprocessSlot = false":
    discard await state.run(agent)
    check eventually returnBytesWas == true
    check eventually reprocessSlotWas == false

View File

@ -1,8 +1,9 @@
import std/unittest import std/unittest
import pkg/questionable import pkg/questionable
import pkg/codex/contracts/requests import pkg/codex/contracts/requests
import pkg/codex/sales/states/downloading
import pkg/codex/sales/states/cancelled import pkg/codex/sales/states/cancelled
import pkg/codex/sales/states/downloading
import pkg/codex/sales/states/errored
import pkg/codex/sales/states/failed import pkg/codex/sales/states/failed
import pkg/codex/sales/states/filled import pkg/codex/sales/states/filled
import ../../examples import ../../examples

View File

@ -0,0 +1,49 @@
import pkg/questionable
import pkg/chronos
import pkg/codex/contracts/requests
import pkg/codex/sales/states/errored
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/market
import ../../../asynctest
import ../../examples
import ../../helpers
import ../../helpers/mockmarket
import ../../helpers/mockclock
asyncchecksuite "sales state 'errored'":
  # Runs SaleErrored against a SalesAgent and records the arguments that the
  # agent's onCleanUp callback receives.
  let request = StorageRequest.example
  let slotIndex = (request.ask.slots div 2).u256
  let market = MockMarket.new()
  let clock = MockClock.new()

  var state: SaleErrored
  var agent: SalesAgent
  # Last values handed to onCleanUp; both start out as false.
  var returnBytesWas = false
  var reprocessSlotWas = false

  setup:
    # Capture the callback arguments so the test can inspect them afterwards.
    let onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
      returnBytesWas = returnBytes
      reprocessSlotWas = reprocessSlot

    let context = SalesContext(
      market: market,
      clock: clock
    )
    agent = newSalesAgent(context,
                          request.id,
                          slotIndex,
                          request.some)
    agent.onCleanUp = onCleanUp
    state = SaleErrored(error: newException(ValueError, "oh no!"))

  # The test name mirrors the values asserted below.
  test "calls onCleanUp with returnBytes = true and reprocessSlot = true":
    # Replace the state created in setup with one that has reprocessSlot set.
    state = SaleErrored(
      error: newException(ValueError, "oh no!"),
      reprocessSlot: true
    )
    discard await state.run(agent)
    check eventually returnBytesWas == true
    check eventually reprocessSlotWas == true

View File

@ -0,0 +1,45 @@
import pkg/questionable
import pkg/chronos
import pkg/codex/contracts/requests
import pkg/codex/sales/states/ignored
import pkg/codex/sales/salesagent
import pkg/codex/sales/salescontext
import pkg/codex/market
import ../../../asynctest
import ../../examples
import ../../helpers
import ../../helpers/mockmarket
import ../../helpers/mockclock
asyncchecksuite "sales state 'ignored'":
  # Runs SaleIgnored against a SalesAgent and records the arguments that the
  # agent's onCleanUp callback receives.
  let
    request = StorageRequest.example
    slotIndex = (request.ask.slots div 2).u256
    market = MockMarket.new()
    clock = MockClock.new()

  var
    state: SaleIgnored
    agent: SalesAgent
    # Last values handed to onCleanUp; both start out as false.
    returnBytesWas = false
    reprocessSlotWas = false

  setup:
    # Spy callback: remembers what the state passed in.
    let recordCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
      returnBytesWas = returnBytes
      reprocessSlotWas = reprocessSlot

    let ctx = SalesContext(market: market, clock: clock)
    agent = newSalesAgent(ctx, request.id, slotIndex, request.some)
    agent.onCleanUp = recordCleanUp
    state = SaleIgnored.new()

  test "calls onCleanUp with returnBytes = false and reprocessSlot = true":
    discard await state.run(agent)
    check eventually returnBytesWas == false
    check eventually reprocessSlotWas == true

View File

@ -258,7 +258,7 @@ asyncchecksuite "Reservations module":
check updated.isErr check updated.isErr
check updated.error of NotExistsError check updated.error of NotExistsError
test "onAvailabilityAdded called when availability is reserved": test "onAvailabilityAdded called when availability is created":
var added: Availability var added: Availability
reservations.onAvailabilityAdded = proc(a: Availability) {.async.} = reservations.onAvailabilityAdded = proc(a: Availability) {.async.} =
added = a added = a
@ -267,6 +267,26 @@ asyncchecksuite "Reservations module":
check added == availability check added == availability
test "onAvailabilityAdded called when availability size is increased":
  # Grow freeSize by one and expect the callback to receive the updated
  # availability.
  var availability = createAvailability()
  var reported: Availability
  reservations.onAvailabilityAdded = proc(avail: Availability) {.async.} =
    reported = avail

  availability.freeSize = availability.freeSize + 1.u256
  discard await reservations.update(availability)

  check reported == availability
test "onAvailabilityAdded is not called when availability size is decreased":
  # Shrink freeSize by one; the callback must stay silent.
  var availability = createAvailability()
  var wasInvoked = false
  reservations.onAvailabilityAdded = proc(avail: Availability) {.async.} =
    wasInvoked = true

  availability.freeSize = availability.freeSize - 1.u256
  discard await reservations.update(availability)

  check not wasInvoked
test "availabilities can be found": test "availabilities can be found":
let availability = createAvailability() let availability = createAvailability()

View File

@ -272,24 +272,41 @@ asyncchecksuite "Sales":
let expected = SlotQueueItem.init(request, 2.uint16) let expected = SlotQueueItem.init(request, 2.uint16)
check eventually itemsProcessed.contains(expected) check eventually itemsProcessed.contains(expected)
test "adds past requests to queue once availability added": test "items in queue are readded (and marked seen) once ignored":
var itemsProcessed: seq[SlotQueueItem] = @[]
# ignore all
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
done.complete()
await market.requestStorage(request) await market.requestStorage(request)
await sleepAsync(10.millis) let items = SlotQueueItem.init(request)
await sleepAsync(10.millis) # queue starts paused, allow items to be added to the queue
check eventually queue.paused
# The first processed item will have been re-pushed with `seen =
# true`. Then, once this item is processed by the queue, its 'seen' flag
# will be checked, at which point the queue will be paused. This test could
# check item existence in the queue, but that would require inspecting
# onProcessSlot to see which item was first, and overriding onProcessSlot
# will prevent the queue working as expected in the Sales module.
check eventually queue.len == 4
# check how many slots were processed by the queue for item in items:
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = check queue.contains(item)
itemsProcessed.add item
done.complete()
# now add matching availability for i in 0..<queue.len:
createAvailability() check queue[i].seen
check eventually itemsProcessed.len == request.ask.slots.int
test "queue is paused once availability is insufficient to service slots in queue":
  createAvailability() # enough to fill a single slot
  await market.requestStorage(request)
  await sleepAsync(10.millis) # queue starts paused, allow items to be added to the queue
  check eventually queue.paused
  # The first processed item/slot will be filled (eventually). Subsequent
  # items will be processed and eventually re-pushed with `seen = true`. Once
  # a "seen" item is processed by the queue, the queue is paused. In the
  # meantime, the other items that are processed, marked as seen, and re-added
  # to the queue may be processed at the same time as the queue is pausing.
  # Therefore, there should eventually be 3 items remaining in the queue, all
  # seen.
  check eventually queue.len == 3
  for i in 0..<queue.len:
    check queue[i].seen
test "availability size is reduced by request slot size when fully downloaded": test "availability size is reduced by request slot size when fully downloaded":
sales.onStore = proc(request: StorageRequest, sales.onStore = proc(request: StorageRequest,
@ -495,6 +512,10 @@ asyncchecksuite "Sales":
test "verifies that request is indeed expired from onchain before firing onCancelled": test "verifies that request is indeed expired from onchain before firing onCancelled":
let expiry = getTime().toUnix() + 10 let expiry = getTime().toUnix() + 10
# ensure only one slot, otherwise once bytes are returned to the
# availability, the queue will be unpaused and availability will be consumed
# by other slots
request.ask.slots = 1.uint64
market.requestExpiry[request.id] = expiry market.requestExpiry[request.id] = expiry
let origSize = availability.freeSize let origSize = availability.freeSize

View File

@ -10,6 +10,7 @@ import pkg/codex/sales/slotqueue
import ../../asynctest import ../../asynctest
import ../helpers import ../helpers
import ../helpers/mockmarket import ../helpers/mockmarket
import ../helpers/mockslotqueueitem
import ../examples import ../examples
suite "Slot queue start/stop": suite "Slot queue start/stop":
@ -118,7 +119,6 @@ suite "Slot queue":
queue = SlotQueue.new(maxWorkers, maxSize.uint16) queue = SlotQueue.new(maxWorkers, maxSize.uint16)
queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} = queue.onProcessSlot = proc(item: SlotQueueItem, done: Future[void]) {.async.} =
await sleepAsync(processSlotDelay) await sleepAsync(processSlotDelay)
trace "processing item", requestId = item.requestId, slotIndex = item.slotIndex
onProcessSlotCalled = true onProcessSlotCalled = true
onProcessSlotCalledWith.add (item.requestId, item.slotIndex) onProcessSlotCalledWith.add (item.requestId, item.slotIndex)
done.complete() done.complete()
@ -162,6 +162,131 @@ suite "Slot queue":
check itemB < itemA # B higher priority than A check itemB < itemA # B higher priority than A
check itemA > itemB check itemA > itemB
test "correct prioritizes SlotQueueItems based on 'seen'":
  let request = StorageRequest.example
  # Not seen (good) but less profitable (bad).
  let freshItem = MockSlotQueueItem(
    requestId: request.id,
    slotIndex: 0,
    slotSize: 1.u256,
    duration: 1.u256,
    reward: 1.u256,
    collateral: 1.u256,
    expiry: 1.u256,
    seen: false
  )
  # Same item, except more profitable (good) and already seen (bad); the test
  # expects 'seen' to outweigh profitability.
  var seenItem = freshItem
  seenItem.reward = 2.u256
  seenItem.seen = true

  let
    freshQueued = freshItem.toSlotQueueItem
    seenQueued = seenItem.toSlotQueueItem
  check freshQueued < seenQueued # < indicates higher priority
  check seenQueued > freshQueued
test "correct prioritizes SlotQueueItems based on profitability":
  let request = StorageRequest.example
  # Lower reward (bad), lower collateral (good).
  let lowReward = MockSlotQueueItem(
    requestId: request.id,
    slotIndex: 0,
    slotSize: 1.u256,
    duration: 1.u256,
    reward: 1.u256,
    collateral: 1.u256,
    expiry: 1.u256,
    seen: false
  )
  # Higher reward (good), higher collateral (bad); the test expects reward to
  # outweigh collateral.
  var highReward = lowReward
  highReward.reward = 2.u256
  highReward.collateral = 2.u256

  let
    preferred = highReward.toSlotQueueItem
    other = lowReward.toSlotQueueItem
  check preferred < other # < indicates higher priority
test "correct prioritizes SlotQueueItems based on collateral":
  let request = StorageRequest.example
  # Lower collateral (good), shorter expiry (bad).
  let lowCollateral = MockSlotQueueItem(
    requestId: request.id,
    slotIndex: 0,
    slotSize: 1.u256,
    duration: 1.u256,
    reward: 1.u256,
    collateral: 1.u256,
    expiry: 1.u256,
    seen: false
  )
  # Higher collateral (bad), longer expiry (good); the test expects collateral
  # to outweigh expiry.
  var highCollateral = lowCollateral
  highCollateral.collateral = 2.u256
  highCollateral.expiry = 2.u256

  let
    preferred = lowCollateral.toSlotQueueItem
    other = highCollateral.toSlotQueueItem
  check preferred < other # < indicates higher priority
test "correct prioritizes SlotQueueItems based on expiry":
  let request = StorageRequest.example
  # Smaller slotSize (good), shorter expiry (bad).
  let shortExpiry = MockSlotQueueItem(
    requestId: request.id,
    slotIndex: 0,
    slotSize: 1.u256,
    duration: 1.u256,
    reward: 1.u256,
    collateral: 1.u256,
    expiry: 1.u256,
    seen: false
  )
  # Larger slotSize (bad), longer expiry (good); the test expects expiry to
  # outweigh slotSize.
  var longExpiry = shortExpiry
  longExpiry.slotSize = 2.u256
  longExpiry.expiry = 2.u256

  let
    preferred = longExpiry.toSlotQueueItem
    other = shortExpiry.toSlotQueueItem
  check preferred < other # < indicates higher priority
test "correct prioritizes SlotQueueItems based on slotSize":
  # Every field except slotSize is identical, so slotSize alone decides the
  # ordering.
  let request = StorageRequest.example
  let itemA = MockSlotQueueItem(
    requestId: request.id,
    slotIndex: 0,
    slotSize: 2.u256, # slotSize is larger (bad)
    duration: 1.u256,
    reward: 1.u256,
    collateral: 1.u256,
    expiry: 1.u256, # expiry is the same for both items
    seen: false
  )
  let itemB = MockSlotQueueItem(
    requestId: request.id,
    slotIndex: 0,
    slotSize: 1.u256, # slotSize is smaller (good)
    duration: 1.u256,
    reward: 1.u256,
    collateral: 1.u256,
    expiry: 1.u256, # expiry is the same for both items
    seen: false
  )
  check itemB.toSlotQueueItem < itemA.toSlotQueueItem # < indicates higher priority
test "expands available all possible slot indices on init": test "expands available all possible slot indices on init":
let request = StorageRequest.example let request = StorageRequest.example
let items = SlotQueueItem.init(request) let items = SlotQueueItem.init(request)
@ -391,3 +516,71 @@ suite "Slot queue":
(item3.requestId, item3.slotIndex), (item3.requestId, item3.slotIndex),
] ]
) )
test "processing a 'seen' item pauses the queue":
  newSlotQueue(maxSize = 4, maxWorkers = 4)
  let request = StorageRequest.example
  # Slot 0 of the request, already flagged as seen.
  let seenItem = SlotQueueItem.init(
    request.id, 0'u16, request.ask, request.expiry, seen = true)

  queue.push(seenItem)

  check eventually queue.paused
  # The recorded onProcessSlot calls must still be empty.
  check onProcessSlotCalledWith.len == 0
test "pushing items to queue unpauses queue":
  newSlotQueue(maxSize = 4, maxWorkers = 4)
  queue.pause

  let request = StorageRequest.example
  var requestItems = SlotQueueItem.init(request)
  queue.push(requestItems)

  # the queue draining to empty shows that every item was processed
  check eventually queue.len == 0
test "pushing seen item does not unpause queue":
  newSlotQueue(maxSize = 4, maxWorkers = 4)
  let request = StorageRequest.example
  let seenItem = SlotQueueItem.init(
    request.id, 0'u16, request.ask, request.expiry, seen = true)

  # The queue is expected to be paused both before and after the push.
  check queue.paused
  queue.push(seenItem)
  check queue.paused
test "paused queue waits for unpause before continuing processing":
  newSlotQueue(maxSize = 4, maxWorkers = 4)
  let request = StorageRequest.example
  let unseenItem = SlotQueueItem.init(
    request.id, 1'u16, request.ask, request.expiry, seen = false)
  let expectedCalls = @[(unseenItem.requestId, unseenItem.slotIndex)]

  check queue.paused
  # push causes unpause
  queue.push(unseenItem)

  # the single item must have been processed and removed from the queue
  check eventually onProcessSlotCalledWith == expectedCalls
  check eventually queue.len == 0
test "item 'seen' flags can be cleared":
  newSlotQueue(maxSize = 4, maxWorkers = 1)
  let request = StorageRequest.example
  # Two slots of the same request, both flagged as seen.
  let firstSeen = SlotQueueItem.init(
    request.id, 0'u16, request.ask, request.expiry, seen = true)
  let secondSeen = SlotQueueItem.init(
    request.id, 1'u16, request.ask, request.expiry, seen = true)

  queue.push(firstSeen)
  queue.push(secondSeen)
  check queue[0].seen
  check queue[1].seen

  queue.clearSeenFlags()

  check not queue[0].seen
  check not queue[1].seen