mirror of
https://github.com/codex-storage/nim-codex.git
synced 2025-01-10 04:55:40 +00:00
570a1f7b67
## Problem When Availabilities are created, the amount of bytes in the Availability are reserved in the repo, so those bytes on disk cannot be written to otherwise. When a request for storage is received by a node, if a previously created Availability is matched, an attempt will be made to fill a slot in the request (more accurately, the request's slots are added to the SlotQueue, and eventually those slots will be processed). During download, bytes that were reserved for the Availability were released (as they were written to disk). To prevent more bytes from being released than were reserved in the Availability, the Availability was marked as used during the download, so that no other requests would match the Availability, and therefore no new downloads (and byte releases) would begin. The unfortunate downside to this, is that the number of Availabilities a node has determines the download concurrency capacity. If, for example, a node creates a single Availability that covers all available disk space the operator is willing to use, that single Availability would mean that only one download could occur at a time, meaning the node could potentially miss out on storage opportunities. ## Solution To alleviate the concurrency issue, each time a slot is processed, a Reservation is created, which takes size (aka reserved bytes) away from the Availability and stores them in the Reservation object. This can be done as many times as needed as long as there are enough bytes remaining in the Availability. Therefore, concurrent downloads are no longer limited by the number of Availabilities. Instead, they would more likely be limited to the SlotQueue's `maxWorkers`. From a database design perspective, an Availability has zero or more Reservations. Reservations are persisted in the RepoStore's metadata, along with Availabilities. 
The metadata store key path for Reservations is `meta / sales / reservations / <availabilityId> / <reservationId>`, while Availabilities are stored one level up, e.g. `meta / sales / reservations / <availabilityId>`, allowing all Reservations for an Availability to be queried (this is not currently needed, but may be useful when work to restore Availability size is implemented; more on this later). ### Lifecycle When a reservation is created, its size is deducted from the Availability, and when a reservation is deleted, any remaining size (bytes not written to disk) is returned to the Availability. If the request finishes, is cancelled (expired), or an error occurs, the Reservation is deleted (and any undownloaded bytes are returned to the Availability). In addition, when the Sales module starts, any Reservations that are not actively being used in a filled slot are deleted. Having a Reservation persisted until after a storage request is completed will allow the originally set Availability size to be reclaimed once a request contract has been completed. This feature is yet to be implemented; however, the work in this PR is a step towards enabling it. ### Unknowns Reservation size is determined by the `StorageAsk.slotSize`. If, during download, an attempt is made to download more bytes than `slotSize`, then the Reservation update will fail, and the state machine will move to a `SaleErrored` state, deleting the Reservation. This will likely prevent the slot from being filled. ### Notes Based on #514
493 lines
15 KiB
Nim
493 lines
15 KiB
Nim
import std/sequtils
|
|
import std/sugar
|
|
import pkg/questionable
|
|
import pkg/questionable/results
|
|
import pkg/stint
|
|
import pkg/chronicles
|
|
import pkg/datastore
|
|
import ./market
|
|
import ./clock
|
|
import ./stores
|
|
import ./contracts/requests
|
|
import ./contracts/marketplace
|
|
import ./sales/salescontext
|
|
import ./sales/salesagent
|
|
import ./sales/statemachine
|
|
import ./sales/slotqueue
|
|
import ./sales/states/preparing
|
|
import ./sales/states/unknown
|
|
import ./utils/then
|
|
import ./utils/trackedfutures
|
|
|
|
## Sales holds a list of available storage that it may sell.
##
## When storage is requested on the market that matches availability, the Sales
## object will instruct the Codex node to persist the requested data. Once the
## data has been persisted, it uploads a proof of storage to the market in an
## attempt to win a storage contract.
##
##    Node                        Sales                   Market
##     |                          |                         |
##     | -- add availability  --> |                         |
##     |                          | <-- storage request --- |
##     | <----- store data ------ |                         |
##     | -----------------------> |                         |
##     |                          |                         |
##     | <----- prove data ----   |                         |
##     | -----------------------> |                         |
##     |                          | ---- storage proof ---> |
|
|
|
|
export stint
|
|
export reservations
|
|
|
|
logScope:
|
|
topics = "sales marketplace"
|
|
|
|
type
  Sales* = ref object
    ## Owns the shared `SalesContext` and the sales agents started by this
    ## module, together with the bookkeeping needed to shut them down.
    context*: SalesContext ## handed to every sales agent that is created
    agents*: seq[SalesAgent] ## agents added by `processSlot` and `load`
    running: bool ## `remove` only prunes `agents` while this is set
    subscriptions: seq[market.Subscription] ## released by `unsubscribe`
    trackedFutures: TrackedFutures ## cancelled when `stop` is called
|
|
|
|
proc `onStore=`*(sales: Sales, onStore: OnStore) =
  ## Stores `onStore` as the store callback of the sales context.
  sales.context.onStore = some(onStore)
|
|
|
|
proc `onClear=`*(sales: Sales, onClear: OnClear) =
  ## Stores `onClear` as the clear callback of the sales context.
  sales.context.onClear = some(onClear)
|
|
|
|
proc `onSale=`*(sales: Sales, callback: OnSale) =
  ## Stores `callback` as the sale callback of the sales context.
  sales.context.onSale = some(callback)
|
|
|
|
proc `onProve=`*(sales: Sales, callback: OnProve) =
  ## Stores `callback` as the prove callback of the sales context.
  sales.context.onProve = some(callback)
|
|
|
|
proc onStore*(sales: Sales): ?OnStore =
  ## Returns the store callback held by the sales context, if any.
  sales.context.onStore
|
|
|
|
proc onClear*(sales: Sales): ?OnClear =
  ## Returns the clear callback held by the sales context, if any.
  sales.context.onClear
|
|
|
|
proc onSale*(sales: Sales): ?OnSale =
  ## Returns the sale callback held by the sales context, if any.
  sales.context.onSale
|
|
|
|
proc onProve*(sales: Sales): ?OnProve =
  ## Returns the prove callback held by the sales context, if any.
  sales.context.onProve
|
|
|
|
func new*(_: type Sales,
          market: Market,
          clock: Clock,
          repo: RepoStore): Sales =
  ## Convenience constructor: forwards to the four-argument `new` with
  ## `simulateProofFailures` set to 0.
  const simulatedFailures = 0
  Sales.new(market, clock, repo, simulatedFailures)
|
|
|
|
func new*(_: type Sales,
          market: Market,
          clock: Clock,
          repo: RepoStore,
          simulateProofFailures: int): Sales =
  ## Creates a `Sales` instance whose context holds `market`, `clock`,
  ## `Reservations` built on top of `repo`, a new `SlotQueue` and the given
  ## `simulateProofFailures` value. No subscriptions are registered yet.
  let context = SalesContext(
    market: market,
    clock: clock,
    reservations: Reservations.new(repo),
    slotQueue: SlotQueue.new(),
    simulateProofFailures: simulateProofFailures
  )

  Sales(
    context: context,
    trackedFutures: TrackedFutures.new(),
    subscriptions: @[]
  )
|
|
|
|
proc remove(sales: Sales, agent: SalesAgent) {.async.} =
  ## Stops `agent`. While `sales.running` is set, the agent is also dropped
  ## from `sales.agents`; otherwise the list is left untouched.
  await agent.stop()

  if not sales.running:
    return

  sales.agents.keepItIf(it != agent)
|
|
|
|
proc cleanUp(sales: Sales,
             agent: SalesAgent,
             processing: Future[void]) {.async.} =
  ## Releases everything held for `agent`: deletes its reservation (if it has
  ## one), removes the agent from `sales`, and completes `processing` when it
  ## is non-nil and not finished yet.
  let data = agent.data

  trace "cleaning up sales agent",
    requestId = data.requestId,
    slotIndex = data.slotIndex,
    reservationId = data.reservation.?id |? ReservationId.default,
    availabilityId = data.reservation.?availabilityId |? AvailabilityId.default

  # TODO: return bytes that were used in the request back to the availability
  # as well, which will require removing the bytes from disk (perhaps via
  # setting blockTTL to -1 and then running block maintainer?)

  # delete reservation and return reservation bytes back to the availability
  if reservation =? data.reservation:
    let deletion = await sales.context.reservations.deleteReservation(
      reservation.id,
      reservation.availabilityId
    )
    # a failed deletion is only logged; clean up continues regardless
    if deleteErr =? deletion.errorOption:
      error "failure deleting reservation",
        error = deleteErr.msg,
        reservationId = reservation.id,
        availabilityId = reservation.availabilityId

  await sales.remove(agent)

  # signal back to the slot queue to cycle a worker
  let stillPending = not processing.isNil and not processing.finished()
  if stillPending:
    processing.complete()
|
|
|
|
proc filled(
    sales: Sales,
    request: StorageRequest,
    slotIndex: UInt256,
    processing: Future[void]) =
  ## Invokes the `onSale` callback (when one is set) for the filled slot and
  ## then completes `processing` unless it is nil or already finished.
  if onSale =? sales.context.onSale:
    onSale(request, slotIndex)

  # signal back to the slot queue to cycle a worker
  if processing.isNil or processing.finished():
    return

  processing.complete()
|
|
|
|
proc processSlot(sales: Sales, item: SlotQueueItem, done: Future[void]) =
  ## Creates a sales agent for the queued slot `item`, wires its clean-up and
  ## filled callbacks to `done`, starts it in the `SalePreparing` state and
  ## records it in `sales.agents`.
  debug "processing slot from queue", requestId = $item.requestId,
    slot = item.slotIndex

  # the request itself is not known at this point, only its id
  let
    unknownRequest = none(StorageRequest)
    agent = newSalesAgent(
      sales.context,
      item.requestId,
      item.slotIndex.u256,
      unknownRequest
    )

  agent.onCleanUp = proc {.async.} =
    await sales.cleanUp(agent, done)

  agent.onFilled = some proc(request: StorageRequest, slotIndex: UInt256) =
    sales.filled(request, slotIndex, done)

  agent.start(SalePreparing())
  sales.agents.add agent
|
|
|
|
proc deleteInactiveReservations(sales: Sales, activeSlots: seq[Slot]) {.async.} =
  ## Deletes every stored reservation whose slot id (derived from its
  ## `requestId` and `slotIndex`) does not match any slot in `activeSlots`.
  ## Failures to delete a single reservation are logged and do not stop the
  ## remaining deletions.
  let reservations = sales.context.reservations
  without reservs =? await reservations.all(Reservation):
    info "no unused reservations found for deletion"
    # nothing was loaded, so there is nothing to filter or delete; without
    # this early exit the code below would run on an empty default value
    return

  let unused = reservs.filter(r => (
    let slotId = slotId(r.requestId, r.slotIndex)
    not activeSlots.any(slot => slot.id == slotId)
  ))
  info "found unused reservations for deletion", unused = unused.len

  for reservation in unused:

    logScope:
      reservationId = reservation.id
      availabilityId = reservation.availabilityId

    if err =? (await reservations.deleteReservation(
      reservation.id, reservation.availabilityId
    )).errorOption:
      error "failed to delete unused reservation", error = err.msg
    else:
      trace "deleted unused reservation"
|
|
|
|
proc mySlots*(sales: Sales): Future[seq[Slot]] {.async.} =
  ## Returns the slots for the ids reported by `market.mySlots()`. Ids for
  ## which `market.getActiveSlot` yields no slot are skipped.
  let market = sales.context.market
  let slotIds = await market.mySlots()
  var slots: seq[Slot] = @[]

  # report the number of ids to load; `slots` is still empty at this point
  info "Loading active slots", slotsCount = len(slotIds)
  for slotId in slotIds:
    if slot =? (await market.getActiveSlot(slotId)):
      slots.add slot

  return slots
|
|
|
|
proc loadActiveSlot(sales: Sales, slot: Slot) =
  ## Starts a sales agent in the `SaleUnknown` state for an already active
  ## `slot` and records it in `sales.agents`.
  ##
  ## This is a separate proc on purpose: closures capture locals by reference,
  ## so creating `onCleanUp` directly inside a loop body would make every
  ## callback share one `agent` variable. Here each call has its own `agent`.
  let agent = newSalesAgent(
    sales.context,
    slot.request.id,
    slot.slotIndex,
    some slot.request)

  agent.onCleanUp = proc {.async.} =
    # no slot queue worker is waiting on this agent, so a placeholder future
    # is handed to `cleanUp` in place of the worker's one
    let done = newFuture[void]("onCleanUp_Dummy")
    await sales.cleanUp(agent, done)
    await done # completed in sales.cleanUp

  agent.start(SaleUnknown())
  sales.agents.add agent

proc load*(sales: Sales) {.async.} =
  ## Restores state on start up: fetches the active slots, deletes the
  ## reservations that belong to none of them, and starts one sales agent per
  ## active slot.
  let activeSlots = await sales.mySlots()

  await sales.deleteInactiveReservations(activeSlots)

  for slot in activeSlots:
    sales.loadActiveSlot(slot)
|
|
|
|
proc onAvailabilityAdded(sales: Sales, availability: Availability) {.async.} =
  ## Query last 256 blocks for new requests, adding them to the queue. `push`
  ## checks for availability before adding to the queue. If processed, the
  ## sales agent will check if the slot is free.
  ##
  ## Known push errors are logged and skipped; any other error, including one
  ## raised while querying, is caught and logged as a warning.
  let
    context = sales.context
    market = context.market
    queue = context.slotQueue

  logScope:
    topics = "marketplace sales onAvailabilityAdded callback"

  trace "availability added, querying past storage requests to add to queue"

  try:
    let events = await market.queryPastStorageRequests(256)

    if events.len == 0:
      trace "no storage request events found in recent past"
      return

    # one group of slot queue items per past request event
    let requests = events.mapIt(
      SlotQueueItem.init(it.requestId, it.ask, it.expiry)
    )

    trace "found past storage requested events to add to queue",
      events = events.len

    for slots in requests:
      for slot in slots:
        if err =? queue.push(slot).errorOption:
          # continue on error
          if err of QueueNotRunningError:
            warn "cannot push items to queue, queue is not running"
          elif err of NoMatchingAvailabilityError:
            info "slot in queue had no matching availabilities, ignoring"
          elif err of SlotsOutOfRangeError:
            warn "Too many slots, cannot add to queue"
          elif err of SlotQueueItemExistsError:
            trace "item already exists, ignoring"
          else:
            raise err

  except CatchableError as e:
    warn "Error adding request to SlotQueue", error = e.msg
|
|
|
|
proc onStorageRequested(sales: Sales,
                        requestId: RequestId,
                        ask: StorageAsk,
                        expiry: UInt256) =
  ## Handles a storage request event: builds the slot queue items for the
  ## request and pushes each of them onto the slot queue. Push failures are
  ## logged per item and do not stop the remaining items from being pushed.

  logScope:
    topics = " marketplace sales onStorageRequested"
    requestId
    slots = ask.slots
    expiry

  let queue = sales.context.slotQueue

  trace "storage requested, adding slots to queue"

  without items =? SlotQueueItem.init(requestId, ask, expiry).catch, err:
    if err of SlotsOutOfRangeError:
      warn "Too many slots, cannot add to queue"
    else:
      warn "Failed to create slot queue items from request", error = err.msg
    return

  for item in items:
    # continue on failure
    if pushErr =? queue.push(item).errorOption:
      if pushErr of NoMatchingAvailabilityError:
        info "slot in queue had no matching availabilities, ignoring"
      elif pushErr of SlotQueueItemExistsError:
        error "Failed to push item to queue becaue it already exists"
      elif pushErr of QueueNotRunningError:
        warn "Failed to push item to queue becaue queue is not running"
      else:
        warn "Error adding request to SlotQueue", error = pushErr.msg
|
|
|
|
proc onSlotFreed(sales: Sales,
                 requestId: RequestId,
                 slotIndex: UInt256) =
  ## Handles a slot freed event by pushing the freed slot back onto the slot
  ## queue. The work runs as a future tracked by `sales`; errors raised by it
  ## are logged in the `catch` handler.

  logScope:
    topics = "marketplace sales onSlotFreed"
    requestId
    slotIndex

  trace "slot freed, adding to queue"

  proc addSlotToQueue() {.async.} =
    let
      market = sales.context.market
      queue = sales.context.slotQueue
      index = slotIndex.truncate(uint16)

    # first attempt to populate request using existing slot metadata in queue
    without var item =? queue.populateItem(requestId, index):
      trace "no existing request metadata, getting request info from contract"
      # if there's no existing slot for that request, retrieve the request
      # from the contract.
      without request =? await market.getRequest(requestId):
        error "unknown request in contract"
        return

      item = SlotQueueItem.init(request, index)

    # push failures are raised so that the `catch` handler below reports them
    if err =? queue.push(item).errorOption:
      raise err

  addSlotToQueue()
    .track(sales)
    .catch(proc(err: ref CatchableError) =
      if err of NoMatchingAvailabilityError:
        info "slot in queue had no matching availabilities, ignoring"
      elif err of SlotQueueItemExistsError:
        error "Failed to push item to queue becaue it already exists"
      elif err of QueueNotRunningError:
        warn "Failed to push item to queue becaue queue is not running"
      else:
        warn "Error adding request to SlotQueue", error = err.msg
    )
|
|
|
|
proc subscribeRequested(sales: Sales) {.async.} =
  ## Subscribes to storage request events on the market and forwards each one
  ## to `sales.onStorageRequested`. A failure to subscribe is logged only.
  let market = sales.context.market

  proc onStorageRequested(requestId: RequestId,
                          ask: StorageAsk,
                          expiry: UInt256) =
    sales.onStorageRequested(requestId, ask, expiry)

  try:
    let subscription = await market.subscribeRequests(onStorageRequested)
    sales.subscriptions.add(subscription)
  except CatchableError as e:
    error "Unable to subscribe to storage request events", msg = e.msg
|
|
|
|
proc subscribeCancellation(sales: Sales) {.async.} =
  ## Subscribes to request cancelled events on the market; each event deletes
  ## the request's items from the slot queue. A failure to subscribe is logged
  ## only.
  let
    market = sales.context.market
    queue = sales.context.slotQueue

  proc onCancelled(requestId: RequestId) =
    trace "request cancelled, removing all request slots from queue"
    queue.delete(requestId)

  try:
    let subscription = await market.subscribeRequestCancelled(onCancelled)
    sales.subscriptions.add(subscription)
  except CatchableError as e:
    error "Unable to subscribe to cancellation events", msg = e.msg
|
|
|
|
proc subscribeFulfilled*(sales: Sales) {.async.} =
  ## Subscribes to fulfilment events on the market; each event deletes the
  ## request's items from the slot queue and is passed on to every agent in
  ## `sales.agents`. A failure to subscribe is logged only.
  let
    market = sales.context.market
    queue = sales.context.slotQueue

  proc onFulfilled(requestId: RequestId) =
    trace "request fulfilled, removing all request slots from queue"
    queue.delete(requestId)

    for salesAgent in sales.agents:
      salesAgent.onFulfilled(requestId)

  try:
    let subscription = await market.subscribeFulfillment(onFulfilled)
    sales.subscriptions.add(subscription)
  except CatchableError as e:
    error "Unable to subscribe to storage fulfilled events", msg = e.msg
|
|
|
|
proc subscribeFailure(sales: Sales) {.async.} =
  ## Subscribes to request failed events on the market; each event deletes the
  ## request's items from the slot queue and is passed on to every agent in
  ## `sales.agents`. A failure to subscribe is logged only.
  let
    market = sales.context.market
    queue = sales.context.slotQueue

  proc onFailed(requestId: RequestId) =
    trace "request failed, removing all request slots from queue"
    queue.delete(requestId)

    for salesAgent in sales.agents:
      salesAgent.onFailed(requestId)

  try:
    let subscription = await market.subscribeRequestFailed(onFailed)
    sales.subscriptions.add(subscription)
  except CatchableError as e:
    error "Unable to subscribe to storage failure events", msg = e.msg
|
|
|
|
proc subscribeSlotFilled(sales: Sales) {.async.} =
  ## Subscribes to slot filled events on the market; each event deletes the
  ## filled slot from the slot queue and is passed on to every agent in
  ## `sales.agents`. A failure to subscribe is logged only.
  let
    market = sales.context.market
    queue = sales.context.slotQueue

  proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) =
    trace "slot filled, removing from slot queue", requestId, slotIndex
    queue.delete(requestId, slotIndex.truncate(uint16))

    for salesAgent in sales.agents:
      salesAgent.onSlotFilled(requestId, slotIndex)

  try:
    let subscription = await market.subscribeSlotFilled(onSlotFilled)
    sales.subscriptions.add(subscription)
  except CatchableError as e:
    error "Unable to subscribe to slot filled events", msg = e.msg
|
|
|
|
proc subscribeSlotFreed(sales: Sales) {.async.} =
  ## Subscribes to slot freed events on the market and forwards each one to
  ## `sales.onSlotFreed`. A failure to subscribe is logged only.
  let market = sales.context.market

  proc onSlotFreed(requestId: RequestId, slotIndex: UInt256) =
    sales.onSlotFreed(requestId, slotIndex)

  try:
    let subscription = await market.subscribeSlotFreed(onSlotFreed)
    sales.subscriptions.add(subscription)
  except CatchableError as e:
    error "Unable to subscribe to slot freed events", msg = e.msg
|
|
|
|
proc startSlotQueue(sales: Sales) {.async.} =
  ## Hooks the slot queue up to `processSlot`, spawns the queue, and then
  ## registers the availability-added callback on the reservations.
  let
    slotQueue = sales.context.slotQueue
    reservations = sales.context.reservations

  slotQueue.onProcessSlot =
    proc(item: SlotQueueItem, done: Future[void]) {.async.} =
      sales.processSlot(item, done)

  asyncSpawn slotQueue.start()

  proc availabilityAdded(availability: Availability) {.async.} =
    await sales.onAvailabilityAdded(availability)

  reservations.onAvailabilityAdded = availabilityAdded
|
|
|
|
proc subscribe(sales: Sales) {.async.} =
  ## Registers all market event subscriptions, one after the other.

  # request life cycle events
  await sales.subscribeRequested()
  await sales.subscribeFulfilled()
  await sales.subscribeFailure()

  # slot level events
  await sales.subscribeSlotFilled()
  await sales.subscribeSlotFreed()

  await sales.subscribeCancellation()
|
|
|
|
proc unsubscribe(sales: Sales) {.async.} =
  ## Unsubscribes every entry in `sales.subscriptions`. A failing entry is
  ## logged and the remaining ones are still processed.
  for subscription in sales.subscriptions:
    try:
      await subscription.unsubscribe()
    except CatchableError as err:
      error "Unable to unsubscribe from subscription", error = err.msg
|
|
|
|
proc start*(sales: Sales) {.async.} =
  ## Starts the sales module: restores agents for active slots, starts the
  ## slot queue and subscribes to market events.

  # `remove` only drops finished agents from `sales.agents` while this flag is
  # set, and `stop` is the only other place that writes it (to false). It is
  # raised before `load` so that agents started there can already be removed.
  sales.running = true

  await sales.load()
  await sales.startSlotQueue()
  await sales.subscribe()
|
|
|
|
proc stop*(sales: Sales) {.async.} =
  ## Shuts the sales module down: clears the running flag, stops the slot
  ## queue, drops the market subscriptions, cancels tracked futures, stops
  ## every agent and finally empties `sales.agents`.
  trace "stopping sales"

  # with the flag cleared, `remove` no longer edits `sales.agents`
  sales.running = false

  await sales.context.slotQueue.stop()
  await sales.unsubscribe()
  await sales.trackedFutures.cancelTracked()

  for salesAgent in sales.agents:
    await salesAgent.stop()

  sales.agents = @[]
|