mirror of
https://github.com/codex-storage/nim-codex.git
synced 2025-01-20 17:58:53 +00:00
e6a387e8e8
* add seen flag * Add MockSlotQueueItem and better prioritisation tests * Update seen priority, and include in SlotQueueItem.init * Re-add processed slots to queue Re-add processed slots to queue if the sale was ignored or errored * add pausing of queue - when processing slots in queue, pause queue if item was marked seen - if availability size is increased, trigger onAvailabilityAdded callback - in sales, on availability added, clear 'seen' flags, then unpause the queue - when items pushed to the queue, unpause the queue * remove unused NoMatchingAvailabilityError from slotqueue The slot queue should also have nothing to do with availabilities * when all availabilities are empty, pause the queue An empty availability is defined as size < DefaultBlockSize as this means even the smallest possible request could not be served. However, this is up for discussion. * remove availability from onAvailabilitiesEmptied callback * refactor onAvailabilityAdded and onAvailabilitiesEmptied onAvailabilityAdded and onAvailabilitiesEmptied are now only called from reservations.update (and eventually reservations.delete once implemented). - Add empty routine for Availability and Reservation - Add allEmpty routine for Availability and Reservation, which returns true when all all Availability or Reservation objects in the datastore are empty. 
* SlotQueue test support updates * Sales module test support updates * Reservations module tests for queue pausing * Sales module tests for queue pausing Includes tests for sales states cancelled, errored, ignored to ensure onCleanUp is called with correct parameters * SlotQueue module tests for queue pausing * fix existing sales test * PR feedback - indent `self.unpause` - update comment for `clearSeenFlags` * reprocessSlot in SaleErrored only when coming from downloading * remove pausing of queue when availabilities are "emptied" Queue pausing when all availiabilies are "emptied" is not necessary, given that the node would not be able to service slots once all its availabilities' freeSize are too small for the slots in the queue, and would then be paused anyway. Add test that asserts the queue is paused once the freeSpace of availabilities drops too low to fill slots in the queue. * Update clearing of seen flags The asyncheapqueue update overload would need to check index bounds and ultimately a different solution was found using the mitems iterator. * fix test request.id was different before updating request.ask.slots, and that id was used to set the state in mockmarket. * Change filled/cleanup future to nil, so no await is needed * add wait to allow items to be added to queue * do not unpause queue when seen items are pushed * re-add seen item back to queue once paused Previously, when a seen item was processed, it was first popped off the queue, then the queue was paused waiting to process that item once the queue was unpaused. Now, when a seen item is processed, it is popped off the queue, the queue is paused, then the item is re-added to the queue and the queue will wait until unpaused before it will continue popping items off the queue. If the item was not re-added to the queue, it would have been processed immediately once unpaused, however there may have been other items with higher priority pushed to the queue in the meantime. 
The queue would not be unpaused if those added items were already seen. In particular, this may happen when ignored items due to lack of availability are re-added to a paused queue. Those ignored items will likely have a higher priority than the item that was just seen (due to it having been processed first), causing the queue to be paused. * address PR comments
514 lines
16 KiB
Nim
514 lines
16 KiB
Nim
import std/sequtils
|
|
import std/sugar
|
|
import pkg/questionable
|
|
import pkg/questionable/results
|
|
import pkg/stint
|
|
import pkg/datastore
|
|
import ./market
|
|
import ./clock
|
|
import ./stores
|
|
import ./contracts/requests
|
|
import ./contracts/marketplace
|
|
import ./logutils
|
|
import ./sales/salescontext
|
|
import ./sales/salesagent
|
|
import ./sales/statemachine
|
|
import ./sales/slotqueue
|
|
import ./sales/states/preparing
|
|
import ./sales/states/unknown
|
|
import ./utils/then
|
|
import ./utils/trackedfutures
|
|
|
|
## Sales holds a list of available storage that it may sell.
|
|
##
|
|
## When storage is requested on the market that matches availability, the Sales
|
|
## object will instruct the Codex node to persist the requested data. Once the
|
|
## data has been persisted, it uploads a proof of storage to the market in an
|
|
## attempt to win a storage contract.
|
|
##
|
|
##    Node                      Sales                    Market
##     |                          |                         |
##     | -- add availability  --> |                         |
##     |                          | <-- storage request --- |
##     | <----- store data ------ |                         |
##     | -----------------------> |                         |
##     |                          |                         |
##     | <----- prove data ----   |                         |
##     | -----------------------> |                         |
##     |                          | ---- storage proof ---> |
|
|
|
|
export stint
|
|
export reservations
|
|
export salesagent
|
|
export salescontext
|
|
|
|
logScope:
|
|
topics = "sales marketplace"
|
|
|
|
type
  Sales* = ref object
    ## Bundles the shared sales context with the agents, market subscriptions
    ## and tracked futures that this module manages.
    context*: SalesContext
      ## shared state handed to every sales agent created here
    agents*: seq[SalesAgent]
      ## agents added by `processSlot` and `load`
    running: bool
      ## set to true at the end of `start`, cleared at the start of `stop`
    subscriptions: seq[market.Subscription]
      ## market subscriptions collected by the `subscribe*` procs
    trackedFutures: TrackedFutures
      ## futures that `stop` cancels via `cancelTracked`
|
|
|
|
proc `onStore=`*(sales: Sales, onStore: OnStore) =
  ## Stores `onStore` as the store callback on the sales context.
  sales.context.onStore = some(onStore)
|
|
|
|
proc `onClear=`*(sales: Sales, onClear: OnClear) =
  ## Stores `onClear` as the clear callback on the sales context.
  sales.context.onClear = some(onClear)
|
|
|
|
proc `onSale=`*(sales: Sales, callback: OnSale) =
  ## Stores `callback` as the sale callback on the sales context.
  sales.context.onSale = some(callback)
|
|
|
|
proc `onProve=`*(sales: Sales, callback: OnProve) =
  ## Stores `callback` as the prove callback on the sales context.
  sales.context.onProve = some(callback)
|
|
|
|
proc `onExpiryUpdate=`*(sales: Sales, callback: OnExpiryUpdate) =
  ## Stores `callback` as the expiry-update callback on the sales context.
  sales.context.onExpiryUpdate = some(callback)
|
|
|
|
proc onStore*(sales: Sales): ?OnStore =
  ## Returns the store callback held by the sales context, if one is set.
  return sales.context.onStore
|
|
|
|
proc onClear*(sales: Sales): ?OnClear =
  ## Returns the clear callback held by the sales context, if one is set.
  return sales.context.onClear
|
|
|
|
proc onSale*(sales: Sales): ?OnSale =
  ## Returns the sale callback held by the sales context, if one is set.
  return sales.context.onSale
|
|
|
|
proc onProve*(sales: Sales): ?OnProve =
  ## Returns the prove callback held by the sales context, if one is set.
  return sales.context.onProve
|
|
|
|
proc onExpiryUpdate*(sales: Sales): ?OnExpiryUpdate =
  ## Returns the expiry-update callback held by the sales context, if one is
  ## set.
  return sales.context.onExpiryUpdate
|
|
|
|
proc new*(_: type Sales,
          market: Market,
          clock: Clock,
          repo: RepoStore): Sales =
  ## Convenience constructor: delegates to the four-argument `new`, passing 0
  ## for `simulateProofFailures`.
  const noSimulatedProofFailures = 0
  Sales.new(market, clock, repo, noSimulatedProofFailures)
|
|
|
|
proc new*(_: type Sales,
          market: Market,
          clock: Clock,
          repo: RepoStore,
          simulateProofFailures: int): Sales =
  ## Creates a `Sales` object with a fresh sales context.
  ##
  ## The context gets the given `market` and `clock`, a `Reservations`
  ## instance created from `repo`, a new `SlotQueue`, and the
  ## `simulateProofFailures` value. The returned object starts with a new
  ## `TrackedFutures` and no subscriptions.
  let context = SalesContext(
    market: market,
    clock: clock,
    reservations: Reservations.new(repo),
    slotQueue: SlotQueue.new(),
    simulateProofFailures: simulateProofFailures
  )

  Sales(
    context: context,
    trackedFutures: TrackedFutures.new(),
    subscriptions: @[]
  )
|
|
|
|
proc remove(sales: Sales, agent: SalesAgent) {.async.} =
  ## Stops `agent` and, while sales is running, drops it from `sales.agents`.
  await agent.stop()

  # when sales is not running the agents list is left untouched
  if not sales.running:
    return

  sales.agents.keepItIf(it != agent)
|
|
|
|
proc cleanUp(sales: Sales,
             agent: SalesAgent,
             returnBytes: bool,
             reprocessSlot: bool,
             processing: Future[void]) {.async.} =
  ## Tears down a sales agent once its sale has ended.
  ##
  ## In order:
  ## 1. if `returnBytes` is set and both request and reservation are known,
  ##    returns the slot size to the reservation's availability;
  ## 2. deletes the agent's reservation, if it has one;
  ## 3. if `reprocessSlot` is set and the request is known, pushes the slot
  ##    back onto the slot queue marked as seen;
  ## 4. stops and removes the agent;
  ## 5. completes `processing` unless it is nil or already finished.
  ##
  ## Failures in steps 1-3 are logged and do not stop the remaining steps.

  let data = agent.data

  logScope:
    topics = "sales cleanUp"
    requestId = data.requestId
    slotIndex = data.slotIndex
    reservationId = data.reservation.?id |? ReservationId.default
    availabilityId = data.reservation.?availabilityId |? AvailabilityId.default

  trace "cleaning up sales agent"

  let reservations = sales.context.reservations

  # Without a reservation the sales process never really started, so there
  # are no bytes to hand back.
  if returnBytes:
    if request =? data.request and reservation =? data.reservation:
      let returned = await reservations.returnBytesToAvailability(
        reservation.availabilityId,
        reservation.id,
        request.ask.slotSize
      )
      if returnErr =? returned.errorOption:
        error "failure returning bytes",
          error = returnErr.msg,
          bytes = request.ask.slotSize

  # delete reservation and return reservation bytes back to the availability
  if reservation =? data.reservation:
    let deleted = await reservations.deleteReservation(
      reservation.id,
      reservation.availabilityId
    )
    if deleteErr =? deleted.errorOption:
      error "failure deleting reservation", error = deleteErr.msg

  # Re-add items back into the queue to prevent small availabilities from
  # draining the queue. Seen items will be ordered last.
  if reprocessSlot and request =? data.request:
    let slotQueue = sales.context.slotQueue
    let seenItem = SlotQueueItem.init(
      data.requestId,
      data.slotIndex.truncate(uint16),
      data.ask,
      request.expiry,
      seen = true
    )
    trace "pushing ignored item to queue, marked as seen"
    if err =? slotQueue.push(seenItem).errorOption:
      error "failed to readd slot to queue",
        errorType = $(type err), error = err.msg

  await sales.remove(agent)

  # signal back to the slot queue to cycle a worker
  if not (processing.isNil or processing.finished()):
    processing.complete()
|
|
|
|
proc filled(
    sales: Sales,
    request: StorageRequest,
    slotIndex: UInt256,
    processing: Future[void]) =
  ## Invokes the context's `onSale` callback (when set) with `request` and
  ## `slotIndex`, then completes `processing` unless it is nil or already
  ## finished.

  if callback =? sales.context.onSale:
    callback(request, slotIndex)

  # signal back to the slot queue to cycle a worker
  if not (processing.isNil or processing.finished()):
    processing.complete()
|
|
|
|
proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
  ## Creates a sales agent for the queued slot `item`, wires its clean-up and
  ## filled callbacks to `sales.cleanUp` / `sales.filled` (both receive
  ## `done`), starts it in the `SalePreparing` state and adds it to
  ## `sales.agents`.
  debug "processing slot from queue",
    requestId = item.requestId,
    slot = item.slotIndex

  # the request itself is not known yet, so the agent is created without one
  let agent = newSalesAgent(
    sales.context,
    item.requestId,
    item.slotIndex.u256,
    none(StorageRequest)
  )

  agent.onCleanUp = proc (returnBytes = false, reprocessSlot = false) {.async.} =
    await sales.cleanUp(agent, returnBytes, reprocessSlot, done)

  agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
    sales.filled(request, slotIndex, done)

  agent.start(SalePreparing())
  sales.agents.add(agent)
|
|
|
|
proc deleteInactiveReservations(sales: Sales, activeSlots: seq[Slot]) {.async.} =
  ## Deletes every stored reservation that does not belong to one of
  ## `activeSlots`.
  ##
  ## A reservation counts as unused when the slot id derived from its
  ## `requestId` and `slotIndex` matches none of the ids in `activeSlots`.
  ## Deletion failures are logged per reservation and do not stop the loop.
  let reservations = sales.context.reservations
  without reservs =? await reservations.all(Reservation):
    info "no unused reservations found for deletion"
    # No reservations could be obtained, so there is nothing to filter or
    # delete; stop here instead of continuing with an unbound `reservs`.
    return

  let unused = reservs.filter(r => (
    let slotId = slotId(r.requestId, r.slotIndex)
    not activeSlots.any(slot => slot.id == slotId)
  ))
  info "found unused reservations for deletion", unused = unused.len

  for reservation in unused:

    logScope:
      reservationId = reservation.id
      availabilityId = reservation.availabilityId

    if err =? (await reservations.deleteReservation(
      reservation.id, reservation.availabilityId
    )).errorOption:
      error "failed to delete unused reservation", error = err.msg
    else:
      trace "deleted unused reservation"
|
|
|
|
proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
  ## Returns the active slots for the slot ids reported by `market.mySlots()`.
  ##
  ## Each id is resolved with `market.getActiveSlot`; ids that do not yield a
  ## slot are skipped, so the result may be shorter than the id list.
  let market = sales.context.market
  let slotIds = await market.mySlots()
  var slots: seq[Slot] = @[]

  # Log the number of ids about to be loaded; `slots` is still empty at this
  # point, so logging its length would always report 0.
  info "Loading active slots", slotsCount = len(slotIds)
  for slotId in slotIds:
    if slot =? (await market.getActiveSlot(slotId)):
      slots.add slot

  return slots
|
|
|
|
proc activeSale*(sales: Sales, slotId: SlotId): Future[?SalesAgent] {.async.} =
  ## Returns the first agent in `sales.agents` whose request id and slot index
  ## map to `slotId`, or `none` when no agent matches.
  for candidate in sales.agents:
    let data = candidate.data
    if slotId(data.requestId, data.slotIndex) == slotId:
      return some(candidate)

  return none(SalesAgent)
|
|
|
|
proc load*(sales: Sales) {.async.} =
  ## Restores state for slots returned by `sales.mySlots()`: first deletes
  ## reservations that belong to none of those slots, then starts one sales
  ## agent per slot in the `SaleUnknown` state and adds it to `sales.agents`.
  let activeSlots = await sales.mySlots()

  await sales.deleteInactiveReservations(activeSlots)

  for slot in activeSlots:
    let agent = newSalesAgent(
      sales.context,
      slot.request.id,
      slot.slotIndex,
      some(slot.request)
    )

    agent.onCleanUp = proc(returnBytes = false, reprocessSlot = false) {.async.} =
      # These agents are not dispatched by a queue worker, so there is no
      # worker future to complete; a nil future is passed to sales.cleanUp.
      let done: Future[void] = nil
      await sales.cleanUp(agent, returnBytes, reprocessSlot, done)

    # There is no need to assign agent.onFilled as slots loaded from `mySlots`
    # are inherently already filled and so assigning agent.onFilled would be
    # superfluous.

    agent.start(SaleUnknown())
    sales.agents.add(agent)
|
|
|
|
proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} =
  ## When availabilities are modified or added, clears the `seen` flag of the
  ## slots in the queue and unpauses the queue if it was paused.
  let slotQueue = sales.context.slotQueue

  slotQueue.clearSeenFlags()

  if not slotQueue.paused:
    return

  trace "unpausing queue after new availability added"
  slotQueue.unpause()
|
|
|
|
proc onStorageRequested(sales: Sales,
                        requestId: RequestId,
                        ask: StorageAsk,
                        expiry: UInt256) =
  ## Handles a new storage request: builds slot queue items from
  ## `requestId`, `ask` and `expiry` and pushes each of them onto the slot
  ## queue. Failures are logged and never raised.

  logScope:
    topics = "marketplace sales onStorageRequested"
    requestId
    slots = ask.slots
    expiry

  let queue = sales.context.slotQueue

  trace "storage requested, adding slots to queue"

  without queueItems =? SlotQueueItem.init(requestId, ask, expiry).catch, err:
    if err of SlotsOutOfRangeError:
      warn "Too many slots, cannot add to queue"
    else:
      warn "Failed to create slot queue items from request", error = err.msg
    return

  for queueItem in queueItems:
    let pushed = queue.push(queueItem)
    # a failed push is only logged, so the remaining items are still tried
    if err =? pushed.errorOption:
      if err of SlotQueueItemExistsError:
        error "Failed to push item to queue becaue it already exists"
      elif err of QueueNotRunningError:
        warn "Failed to push item to queue becaue queue is not running"
      else:
        warn "Error adding request to SlotQueue", error = err.msg
|
|
|
|
proc onSlotFreed(sales: Sales,
                 requestId: RequestId,
                 slotIndex: UInt256) =
  ## Handles a freed slot by pushing it back onto the slot queue.
  ##
  ## The work runs in a tracked async task. The queue item is populated from
  ## the queue's existing metadata when possible; otherwise the request is
  ## fetched from the market. Errors raised by the task are logged.

  logScope:
    topics = "marketplace sales onSlotFreed"
    requestId
    slotIndex

  trace "slot freed, adding to queue"

  proc enqueueFreedSlot() {.async.} =
    let market = sales.context.market
    let queue = sales.context.slotQueue
    let index = slotIndex.truncate(uint16)

    # first attempt to populate request using existing slot metadata in queue
    without var queueItem =? queue.populateItem(requestId, index):
      trace "no existing request metadata, getting request info from contract"
      # if there's no existing slot for that request, retrieve the request
      # from the contract.
      without request =? await market.getRequest(requestId):
        error "unknown request in contract"
        return

      queueItem = SlotQueueItem.init(request, index)

    if err =? queue.push(queueItem).errorOption:
      raise err

  let pending = enqueueFreedSlot().track(sales)
  pending.catch(proc(err: ref CatchableError) =
    if err of SlotQueueItemExistsError:
      error "Failed to push item to queue becaue it already exists"
    elif err of QueueNotRunningError:
      warn "Failed to push item to queue becaue queue is not running"
    else:
      warn "Error adding request to SlotQueue", error = err.msg
  )
|
|
|
|
proc subscribeRequested(sales: Sales) {.async.} =
  ## Subscribes to storage request events on the market and forwards each one
  ## to `sales.onStorageRequested`. A failed subscription is logged;
  ## cancellation is re-raised.
  let market = sales.context.market

  proc handleRequested(requestId: RequestId,
                       ask: StorageAsk,
                       expiry: UInt256) =
    sales.onStorageRequested(requestId, ask, expiry)

  try:
    let subscription = await market.subscribeRequests(handleRequested)
    sales.subscriptions.add(subscription)
  except CancelledError as cancelled:
    raise cancelled
  except CatchableError as e:
    error "Unable to subscribe to storage request events", msg = e.msg
|
|
|
|
proc subscribeCancellation(sales: Sales) {.async.} =
  ## Subscribes to request-cancelled events on the market; on each event all
  ## slots of that request are deleted from the slot queue. A failed
  ## subscription is logged; cancellation is re-raised.
  let market = sales.context.market
  let slotQueue = sales.context.slotQueue

  proc handleCancelled(requestId: RequestId) =
    trace "request cancelled (via contract RequestCancelled event), removing all request slots from queue"
    slotQueue.delete(requestId)

  try:
    let subscription = await market.subscribeRequestCancelled(handleCancelled)
    sales.subscriptions.add(subscription)
  except CancelledError as cancelled:
    raise cancelled
  except CatchableError as e:
    error "Unable to subscribe to cancellation events", msg = e.msg
|
|
|
|
proc subscribeFulfilled*(sales: Sales) {.async.} =
  ## Subscribes to request-fulfilled events on the market; on each event the
  ## request's slots are deleted from the slot queue and every agent is
  ## notified via `onFulfilled`. A failed subscription is logged;
  ## cancellation is re-raised.
  let market = sales.context.market
  let slotQueue = sales.context.slotQueue

  proc handleFulfilled(requestId: RequestId) =
    trace "request fulfilled, removing all request slots from queue"
    slotQueue.delete(requestId)

    for agent in sales.agents:
      agent.onFulfilled(requestId)

  try:
    let subscription = await market.subscribeFulfillment(handleFulfilled)
    sales.subscriptions.add(subscription)
  except CancelledError as cancelled:
    raise cancelled
  except CatchableError as e:
    error "Unable to subscribe to storage fulfilled events", msg = e.msg
|
|
|
|
proc subscribeFailure(sales: Sales) {.async.} =
  ## Subscribes to request-failed events on the market; on each event the
  ## request's slots are deleted from the slot queue and every agent is
  ## notified via `onFailed`. A failed subscription is logged; cancellation
  ## is re-raised.
  let market = sales.context.market
  let slotQueue = sales.context.slotQueue

  proc handleFailed(requestId: RequestId) =
    trace "request failed, removing all request slots from queue"
    slotQueue.delete(requestId)

    for agent in sales.agents:
      agent.onFailed(requestId)

  try:
    let subscription = await market.subscribeRequestFailed(handleFailed)
    sales.subscriptions.add(subscription)
  except CancelledError as cancelled:
    raise cancelled
  except CatchableError as e:
    error "Unable to subscribe to storage failure events", msg = e.msg
|
|
|
|
proc subscribeSlotFilled(sales: Sales) {.async.} =
  ## Subscribes to slot-filled events on the market; on each event that slot
  ## is deleted from the slot queue and every agent is notified via
  ## `onSlotFilled`. A failed subscription is logged; cancellation is
  ## re-raised.
  let market = sales.context.market
  let slotQueue = sales.context.slotQueue

  proc handleSlotFilled(requestId: RequestId, slotIndex: UInt256) =
    trace "slot filled, removing from slot queue", requestId, slotIndex
    slotQueue.delete(requestId, slotIndex.truncate(uint16))

    for agent in sales.agents:
      agent.onSlotFilled(requestId, slotIndex)

  try:
    let subscription = await market.subscribeSlotFilled(handleSlotFilled)
    sales.subscriptions.add(subscription)
  except CancelledError as cancelled:
    raise cancelled
  except CatchableError as e:
    error "Unable to subscribe to slot filled events", msg = e.msg
|
|
|
|
proc subscribeSlotFreed(sales: Sales) {.async.} =
  ## Subscribes to slot-freed events on the market and forwards each one to
  ## `sales.onSlotFreed`. A failed subscription is logged; cancellation is
  ## re-raised.
  let market = sales.context.market

  proc handleSlotFreed(requestId: RequestId, slotIndex: UInt256) =
    sales.onSlotFreed(requestId, slotIndex)

  try:
    let subscription = await market.subscribeSlotFreed(handleSlotFreed)
    sales.subscriptions.add(subscription)
  except CancelledError as cancelled:
    raise cancelled
  except CatchableError as e:
    error "Unable to subscribe to slot freed events", msg = e.msg
|
|
|
|
proc startSlotQueue(sales: Sales) {.async.} =
  ## Wires the slot queue to this sales object and starts it.
  ##
  ## Queue items are handed to `sales.processSlot`, the queue is started with
  ## `asyncSpawn`, and the reservations' `onAvailabilityAdded` callback is
  ## pointed at `sales.onAvailabilityAdded`.
  let queue = sales.context.slotQueue
  let reservations = sales.context.reservations

  queue.onProcessSlot =
    proc(item: SlotQueueItem, done: Future[void]) {.async.} =
      trace "processing slot queue item",
        reqId = item.requestId,
        slotIdx = item.slotIndex
      sales.processSlot(item, done)

  asyncSpawn queue.start()

  proc availabilityAdded(availability: Availability) {.async.} =
    await sales.onAvailabilityAdded(availability)

  reservations.onAvailabilityAdded = availabilityAdded
|
|
|
|
proc subscribe(sales: Sales) {.async.} =
  ## Sets up all market event subscriptions, one after another: requested,
  ## fulfilled, failed, slot filled, slot freed and cancelled.
  await subscribeRequested(sales)
  await subscribeFulfilled(sales)
  await subscribeFailure(sales)
  await subscribeSlotFilled(sales)
  await subscribeSlotFreed(sales)
  await subscribeCancellation(sales)
|
|
|
|
proc unsubscribe(sales: Sales) {.async.} =
  ## Unsubscribes every subscription in `sales.subscriptions`. A failure on
  ## one subscription is logged and the loop continues with the next;
  ## cancellation is re-raised.
  for subscription in sales.subscriptions:
    try:
      await subscription.unsubscribe()
    except CancelledError as cancelled:
      raise cancelled
    except CatchableError as e:
      error "Unable to unsubscribe from subscription", error = e.msg
|
|
|
|
proc start*(sales: Sales) {.async.} =
  ## Starts sales: loads agents for existing slots, starts the slot queue,
  ## subscribes to market events and finally marks sales as running.
  await load(sales)
  await startSlotQueue(sales)
  await subscribe(sales)
  sales.running = true
|
|
|
|
proc stop*(sales: Sales) {.async.} =
  ## Stops sales: marks it as not running, stops the slot queue, removes the
  ## market subscriptions, cancels tracked futures, stops every agent and
  ## then empties the agents list.
  trace "stopping sales"
  sales.running = false

  let slotQueue = sales.context.slotQueue
  await slotQueue.stop()
  await unsubscribe(sales)
  await cancelTracked(sales.trackedFutures)

  for agent in sales.agents:
    await agent.stop()

  sales.agents = @[]
|