227 lines
7.0 KiB
Nim
227 lines
7.0 KiB
Nim
import std/sequtils
|
|
import pkg/questionable
|
|
import pkg/upraises
|
|
import pkg/stint
|
|
import pkg/nimcrypto
|
|
import pkg/chronicles
|
|
import ./rng
|
|
import ./market
|
|
import ./clock
|
|
|
|
## Sales holds a list of available storage that it may sell.
|
|
##
|
|
## When storage is requested on the market that matches availability, the Sales
|
|
## object will instruct the Codex node to persist the requested data. Once the
|
|
## data has been persisted, it uploads a proof of storage to the market in an
|
|
## attempt to win a storage contract.
|
|
##
|
|
##    Node                        Sales                   Market
##     |                          |                         |
##     | -- add availability -->  |                         |
##     |                          | <-- storage request --- |
##     | <----- store data ------ |                         |
##     | -----------------------> |                         |
##     |                          |                         |
##     | <----- prove data ----   |                         |
##     | -----------------------> |                         |
##     |                          | ---- storage proof ---> |
|
|
|
|
export stint
|
|
|
|
type
  Availability* = object
    ## A piece of storage that is offered for sale. A storage ask matches an
    ## availability when it fits within `size` and `duration` and pays at
    ## least `minPrice` (see `findAvailability`).
    id*: array[32, byte] ## identifier; filled with random bytes by `init`
    size*: UInt256 ## upper bound for the size of a matching ask
    duration*: UInt256 ## upper bound for the duration of a matching ask
    minPrice*: UInt256 ## lower bound for the price per slot of a matching ask

  # Callback that is awaited to persist the content identified by `cid` for
  # the given availability.
  OnStore = proc(cid: string,
                 availability: Availability): Future[void] {.gcsafe, upraises: [].}

  # Callback that is awaited to obtain a proof for the content identified by
  # `cid`; the returned bytes are handed to the market when filling a slot.
  OnProve = proc(cid: string): Future[seq[byte]] {.gcsafe, upraises: [].}

  # Callback that is invoked when a sale did not succeed and the request is
  # known.
  OnClear = proc(availability: Availability,
                 request: StorageRequest) {.gcsafe, upraises: [].}

  # Callback that is invoked when a sale succeeded, with the request and the
  # index of the slot that was involved.
  OnSale = proc(availability: Availability,
                request: StorageRequest,
                slotIndex: UInt256) {.gcsafe, upraises: [].}

  Sales* = ref object
    ## Keeps track of the storage that is for sale and reacts to storage
    ## requests that are announced on the market.
    market: Market
    clock: Clock
    available*: seq[Availability] ## storage that is currently on offer
    subscription: ?Subscription ## set by `start`, cleared by `stop`
    onStore: ?OnStore
    onProve: ?OnProve
    onClear: ?OnClear
    onSale: ?OnSale

  SalesAgent = ref object
    ## State of a single attempt to sell one availability for one storage
    ## request. Created by `handleRequest`.
    sales: Sales
    requestId: array[32, byte]
    ask: StorageAsk
    availability: Availability
    request: ?StorageRequest ## fetched from the market by `start`
    slotIndex: ?UInt256 ## chosen by `selectSlot`
    subscription: ?Subscription ## slot-filled subscription
    running: ?Future[void] ## future of the agent's `start`
    waiting: ?Future[void] ## future of the agent's `waitForExpiry`
    finished: bool ## set once `finish` has run
|
|
|
|
func new*(_: type Sales, market: Market, clock: Clock): Sales =
  ## Creates a `Sales` object that uses the given `market` and `clock`.
  ## All other fields keep their default values: there is no availability,
  ## no callback is set and no market subscription is active.
  result = Sales(clock: clock, market: market)
|
|
|
|
proc init*(_: type Availability,
           size, duration, minPrice: UInt256): Availability =
  ## Creates an `Availability` with the given `size`, `duration` and
  ## `minPrice`, and an `id` made of 32 freshly generated random bytes.
  ##
  ## Fails with an assertion when `randomBytes` does not report 32 bytes.
  var id: array[32, byte]
  doAssert randomBytes(id) == 32
  result = Availability(
    id: id,
    size: size,
    duration: duration,
    minPrice: minPrice
  )
|
|
|
|
proc `onStore=`*(sales: Sales, onStore: OnStore) =
  ## Sets the callback that a sales agent awaits to persist requested data.
  sales.onStore = onStore.some
|
|
|
|
proc `onProve=`*(sales: Sales, onProve: OnProve) =
  ## Sets the callback that a sales agent awaits to obtain a storage proof.
  sales.onProve = onProve.some
|
|
|
|
proc `onClear=`*(sales: Sales, onClear: OnClear) =
  ## Sets the callback that is invoked when a sale ends without success.
  sales.onClear = onClear.some
|
|
|
|
proc `onSale=`*(sales: Sales, callback: OnSale) =
  ## Sets the callback that is invoked when a sale ends successfully.
  sales.onSale = callback.some
|
|
|
|
func add*(sales: Sales, availability: Availability) =
  ## Appends `availability` to the list of storage that is for sale.
  add(sales.available, availability)
|
|
|
|
func remove*(sales: Sales, availability: Availability) =
  ## Drops every entry that is equal to `availability` from the list of
  ## storage that is for sale. Does nothing when there is no such entry.
  keepItIf(sales.available, it != availability)
|
|
|
|
func findAvailability(sales: Sales, ask: StorageAsk): ?Availability =
  ## Returns the first availability, in list order, that can serve `ask`:
  ## the ask has to fit within its size and duration, and the price per slot
  ## has to reach its minimum price.
  ##
  ## When nothing matches, the default result (none) is returned.
  for candidate in sales.available:
    if ask.size > candidate.size:
      continue
    if ask.duration > candidate.duration:
      continue
    if ask.pricePerSlot < candidate.minPrice:
      continue
    return some candidate
|
|
|
|
proc finish(agent: SalesAgent, success: bool) =
  ## Ends the work of `agent` and reports the outcome.
  ##
  ## Only the first call has an effect; later calls return immediately.
  ## The slot-filled subscription is unsubscribed (spawned, not awaited) and
  ## the running and waiting futures are cancelled. On success `onSale` is
  ## invoked, provided that the callback, the request and the slot index are
  ## all known. On failure `onClear` is invoked, provided that the callback
  ## and the request are known, and the availability is put up for sale
  ## again.
  if agent.finished:
    return
  agent.finished = true

  # Tear down everything that could still trigger work for this agent.
  if slotSubscription =? agent.subscription:
    asyncSpawn slotSubscription.unsubscribe()
  if runningFuture =? agent.running:
    runningFuture.cancel()
  if waitingFuture =? agent.waiting:
    waitingFuture.cancel()

  if not success:
    if onClear =? agent.sales.onClear and request =? agent.request:
      onClear(agent.availability, request)
    # The availability was not sold, so offer it again.
    agent.sales.add(agent.availability)
    return

  if onSale =? agent.sales.onSale and
     request =? agent.request and
     slotIndex =? agent.slotIndex:
    onSale(agent.availability, request, slotIndex)
|
|
|
|
proc selectSlot(agent: SalesAgent) =
  ## Picks a random slot of the ask and stores its index in the agent.
  # NOTE(review): presumably `rand` treats its argument as an inclusive upper
  # bound, so that the index lies in 0 ..< ask.slots -- confirm in ./rng.
  let chosen = Rng.instance.rand(agent.ask.slots - 1)
  agent.slotIndex = chosen.u256.some
|
|
|
|
proc onSlotFilled(agent: SalesAgent,
                  requestId: array[32, byte],
                  slotIndex: UInt256) {.async.} =
  ## Handles the notification that slot `slotIndex` of request `requestId`
  ## was filled. The agent finishes successfully when the host that the
  ## market reports for the slot equals the market's signer, and
  ## unsuccessfully otherwise or when a catchable error occurs.
  try:
    let
      market = agent.sales.market
      host = await market.getHost(requestId, slotIndex)
      signer = await market.getSigner()
      filledByUs = host == signer.some
    agent.finish(success = filledByUs)
  except CatchableError:
    agent.finish(success = false)
|
|
|
|
proc subscribeSlotFilled(agent: SalesAgent, slotIndex: UInt256) {.async.} =
  ## Subscribes to slot-filled notifications of the market for the agent's
  ## request and the given `slotIndex`, and remembers the subscription in
  ## the agent so that `finish` can unsubscribe.

  proc notifyAgent(filledRequestId: array[32, byte],
                   filledSlotIndex: UInt256) {.gcsafe, upraises:[].} =
    # The market callback is synchronous, so the async handler is spawned.
    asyncSpawn agent.onSlotFilled(filledRequestId, filledSlotIndex)

  agent.subscription = some await agent.sales.market.subscribeSlotFilled(
    agent.requestId,
    slotIndex,
    notifyAgent)
|
|
|
|
proc waitForExpiry(agent: SalesAgent) {.async.} =
  ## Waits until the clock reaches the expiry of the agent's request and
  ## then finishes the agent unsuccessfully. Returns right away when the
  ## request is not known.
  if request =? agent.request:
    let deadline = request.expiry.truncate(int64)
    await agent.sales.clock.waitUntil(deadline)
    agent.finish(success = false)
|
|
|
|
proc start(agent: SalesAgent) {.async.} =
  ## Runs the sale for one storage request:
  ##
  ## 1. takes the availability off the list of storage that is for sale
  ## 2. selects a slot and subscribes to it being filled
  ## 3. fetches the request from the market
  ## 4. starts waiting for the request to expire
  ## 5. stores the data, creates a proof and fills the slot on the market
  ##
  ## Asserts that the `onStore` and `onProve` callbacks are set. The agent
  ## finishes unsuccessfully when the request cannot be found or when a
  ## catchable error occurs; cancellation is propagated to the caller.
  try:
    let
      sales = agent.sales
      market = sales.market
      availability = agent.availability

    without store =? sales.onStore:
      raiseAssert "onStore callback not set"
    without prove =? sales.onProve:
      raiseAssert "onProve callback not set"

    # From here on the availability is no longer offered to other requests.
    sales.remove(availability)

    agent.selectSlot()
    without slot =? agent.slotIndex:
      raiseAssert "no slot selected"

    await agent.subscribeSlotFilled(slot)

    agent.request = await market.getRequest(agent.requestId)
    without request =? agent.request:
      agent.finish(success = false)
      return

    # Started but not awaited; `finish` cancels it.
    agent.waiting = agent.waitForExpiry().some

    let cid = request.content.cid
    await store(cid, availability)
    let storageProof = await prove(cid)
    await market.fillSlot(request.id, slot, storageProof)
  except CancelledError:
    raise
  except CatchableError as failure:
    error "SalesAgent failed", msg = failure.msg
    agent.finish(success = false)
|
|
|
|
proc handleRequest(sales: Sales, requestId: array[32, byte], ask: StorageAsk) =
  ## Reacts to a storage request: when an availability matches `ask`, a new
  ## sales agent is created for it and started. Requests without a matching
  ## availability are ignored.
  if availability =? sales.findAvailability(ask):
    let agent = SalesAgent(
      sales: sales,
      requestId: requestId,
      ask: ask,
      availability: availability
    )
    # Keep the future so that `finish` is able to cancel the agent.
    agent.running = agent.start().some
|
|
|
|
proc start*(sales: Sales) {.async.} =
  ## Starts selling: subscribes to storage requests on the market and hands
  ## every request to `handleRequest`.
  ##
  ## Asserts that sales has not been started already. A catchable error
  ## while subscribing is logged and not propagated.
  doAssert sales.subscription.isNone, "Sales already started"

  proc forwardRequest(requestId: array[32, byte],
                      ask: StorageAsk) {.gcsafe, upraises:[].} =
    sales.handleRequest(requestId, ask)

  try:
    let subscription = await sales.market.subscribeRequests(forwardRequest)
    sales.subscription = some subscription
  except CatchableError as failure:
    error "Unable to start sales", msg = failure.msg
|
|
|
|
proc stop*(sales: Sales) {.async.} =
  ## Stops selling: forgets the market subscription and unsubscribes from
  ## it. Does nothing when there is no subscription. A catchable error
  ## while unsubscribing is logged as a warning and not propagated.
  without active =? sales.subscription:
    return

  # Clear the field before awaiting, so that sales counts as stopped even
  # when unsubscribing fails.
  sales.subscription = none Subscription
  try:
    await active.unsubscribe()
  except CatchableError as failure:
    warn "Unsubscribe failed", msg = failure.msg
|