nim-codex/codex/sales.nim

243 lines
7.4 KiB
Nim
Raw Normal View History

2022-03-30 12:51:28 +02:00
import std/sequtils
import pkg/questionable
import pkg/upraises
import pkg/stint
import pkg/nimcrypto
import pkg/chronicles
2022-08-01 14:12:05 +02:00
import ./rng
2022-03-30 12:51:28 +02:00
import ./market
2022-05-17 17:02:03 +02:00
import ./clock
import ./proving
import ./contracts/requests
2022-03-30 12:51:28 +02:00
2022-07-07 16:36:48 +02:00
## Sales holds a list of available storage that it may sell.
##
## When storage is requested on the market that matches availability, the Sales
## object will instruct the Codex node to persist the requested data. Once the
## data has been persisted, it uploads a proof of storage to the market in an
## attempt to win a storage contract.
##
## Node Sales Market
## | | |
## | -- add availability --> | |
## | | <-- storage request --- |
## | <----- store data ------ | |
## | -----------------------> | |
## | | |
## | <----- prove data ---- | |
## | -----------------------> | |
## | | ---- storage proof ---> |
2022-03-30 12:51:28 +02:00
export stint
type
Sales* = ref object
market: Market
2022-05-17 17:02:03 +02:00
clock: Clock
subscription: ?market.Subscription
available*: seq[Availability]
2022-07-07 16:18:45 +02:00
onStore: ?OnStore
onProve: ?OnProve
onClear: ?OnClear
2022-03-31 11:41:45 +02:00
onSale: ?OnSale
proving: Proving
2022-03-30 12:51:28 +02:00
Availability* = object
id*: array[32, byte]
size*: UInt256
duration*: UInt256
2022-03-30 12:51:28 +02:00
minPrice*: UInt256
SalesAgent = ref object
2022-03-31 14:35:53 +02:00
sales: Sales
requestId: RequestId
ask: StorageAsk
2022-03-31 14:35:53 +02:00
availability: Availability
2022-07-05 10:51:01 +02:00
request: ?StorageRequest
2022-08-01 14:12:05 +02:00
slotIndex: ?UInt256
subscription: ?market.Subscription
running: ?Future[void]
2022-03-31 14:35:53 +02:00
waiting: ?Future[void]
finished: bool
OnStore = proc(request: StorageRequest,
slot: UInt256,
availability: Availability): Future[void] {.gcsafe, upraises: [].}
OnProve = proc(request: StorageRequest,
slot: UInt256): Future[seq[byte]] {.gcsafe, upraises: [].}
OnClear = proc(availability: Availability,
request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
OnSale = proc(availability: Availability,
request: StorageRequest,
slotIndex: UInt256) {.gcsafe, upraises: [].}
2022-03-30 12:51:28 +02:00
func new*(_: type Sales,
market: Market,
clock: Clock,
proving: Proving): Sales =
2022-05-17 17:02:03 +02:00
Sales(
market: market,
clock: clock,
proving: proving
2022-05-17 17:02:03 +02:00
)
2022-03-30 12:51:28 +02:00
proc init*(_: type Availability,
size: UInt256,
duration: UInt256,
2022-03-30 12:51:28 +02:00
minPrice: UInt256): Availability =
var id: array[32, byte]
doAssert randomBytes(id) == 32
Availability(id: id, size: size, duration: duration, minPrice: minPrice)
2022-07-07 16:18:45 +02:00
proc `onStore=`*(sales: Sales, onStore: OnStore) =
sales.onStore = some onStore
2022-07-05 09:39:59 +02:00
2022-07-07 16:18:45 +02:00
proc `onProve=`*(sales: Sales, onProve: OnProve) =
sales.onProve = some onProve
2022-07-05 10:24:33 +02:00
proc `onClear=`*(sales: Sales, onClear: OnClear) =
sales.onClear = some onClear
2022-03-31 11:41:45 +02:00
proc `onSale=`*(sales: Sales, callback: OnSale) =
sales.onSale = some callback
2022-03-30 12:51:28 +02:00
func add*(sales: Sales, availability: Availability) =
sales.available.add(availability)
func remove*(sales: Sales, availability: Availability) =
sales.available.keepItIf(it != availability)
func findAvailability(sales: Sales, ask: StorageAsk): ?Availability =
2022-03-30 12:51:28 +02:00
for availability in sales.available:
2022-08-02 14:21:12 +02:00
if ask.slotSize <= availability.size and
ask.duration <= availability.duration and
2022-08-01 14:25:32 +02:00
ask.pricePerSlot >= availability.minPrice:
2022-03-30 12:51:28 +02:00
return some availability
proc finish(agent: SalesAgent, success: bool) =
if agent.finished:
return
agent.finished = true
if subscription =? agent.subscription:
2022-03-31 14:35:53 +02:00
asyncSpawn subscription.unsubscribe()
if running =? agent.running:
running.cancel()
if waiting =? agent.waiting:
waiting.cancel()
if success:
if request =? agent.request and
2022-08-01 14:12:05 +02:00
slotIndex =? agent.slotIndex:
agent.sales.proving.add(request.slotId(slotIndex))
if onSale =? agent.sales.onSale:
onSale(agent.availability, request, slotIndex)
2022-03-31 14:35:53 +02:00
else:
if onClear =? agent.sales.onClear and
request =? agent.request and
slotIndex =? agent.slotIndex:
onClear(agent.availability, request, slotIndex)
agent.sales.add(agent.availability)
2022-03-31 14:35:53 +02:00
2022-08-01 14:12:05 +02:00
proc selectSlot(agent: SalesAgent) =
let rng = Rng.instance
let slotIndex = rng.rand(agent.ask.slots - 1)
agent.slotIndex = some slotIndex.u256
proc onSlotFilled(agent: SalesAgent,
requestId: RequestId,
slotIndex: UInt256) {.async.} =
try:
let market = agent.sales.market
let host = await market.getHost(requestId, slotIndex)
let me = await market.getSigner()
agent.finish(success = (host == me.some))
except CatchableError:
agent.finish(success = false)
2022-08-01 14:12:05 +02:00
proc subscribeSlotFilled(agent: SalesAgent, slotIndex: UInt256) {.async.} =
proc onSlotFilled(requestId: RequestId,
slotIndex: UInt256) {.gcsafe, upraises:[].} =
asyncSpawn agent.onSlotFilled(requestId, slotIndex)
let market = agent.sales.market
let subscription = await market.subscribeSlotFilled(agent.requestId,
2022-08-01 14:12:05 +02:00
slotIndex,
onSlotFilled)
agent.subscription = some subscription
proc waitForExpiry(agent: SalesAgent) {.async.} =
without request =? agent.request:
2022-03-31 14:35:53 +02:00
return
await agent.sales.clock.waitUntil(request.expiry.truncate(int64))
agent.finish(success = false)
2022-03-31 14:35:53 +02:00
proc start(agent: SalesAgent) {.async.} =
try:
let sales = agent.sales
let market = sales.market
let availability = agent.availability
2022-07-05 09:39:59 +02:00
2022-07-07 16:18:45 +02:00
without onStore =? sales.onStore:
raiseAssert "onStore callback not set"
2022-07-05 09:39:59 +02:00
2022-07-07 16:18:45 +02:00
without onProve =? sales.onProve:
raiseAssert "onProve callback not set"
2022-07-05 10:24:33 +02:00
sales.remove(availability)
2022-07-05 09:39:59 +02:00
2022-08-01 14:12:05 +02:00
agent.selectSlot()
without slotIndex =? agent.slotIndex:
raiseAssert "no slot selected"
await agent.subscribeSlotFilled(slotIndex)
agent.request = await market.getRequest(agent.requestId)
without request =? agent.request:
agent.finish(success = false)
2022-07-05 09:39:59 +02:00
return
agent.waiting = some agent.waitForExpiry()
2022-07-05 15:04:25 +02:00
await onStore(request, slotIndex, availability)
let proof = await onProve(request, slotIndex)
2022-08-01 14:12:05 +02:00
await market.fillSlot(request.id, slotIndex, proof)
except CancelledError:
raise
except CatchableError as e:
error "SalesAgent failed", msg = e.msg
agent.finish(success = false)
proc handleRequest(sales: Sales, requestId: RequestId, ask: StorageAsk) =
without availability =? sales.findAvailability(ask):
2022-03-31 14:35:53 +02:00
return
let agent = SalesAgent(
2022-03-31 14:35:53 +02:00
sales: sales,
requestId: requestId,
ask: ask,
2022-03-31 14:35:53 +02:00
availability: availability
)
agent.running = some agent.start()
2022-03-30 12:51:28 +02:00
proc start*(sales: Sales) {.async.} =
2022-03-30 12:51:28 +02:00
doAssert sales.subscription.isNone, "Sales already started"
proc onRequest(requestId: RequestId, ask: StorageAsk) {.gcsafe, upraises:[].} =
sales.handleRequest(requestId, ask)
2022-03-30 12:51:28 +02:00
try:
sales.subscription = some await sales.market.subscribeRequests(onRequest)
except CatchableError as e:
error "Unable to start sales", msg = e.msg
2022-03-30 12:51:28 +02:00
proc stop*(sales: Sales) {.async.} =
2022-03-30 12:51:28 +02:00
if subscription =? sales.subscription:
sales.subscription = market.Subscription.none
try:
await subscription.unsubscribe()
except CatchableError as e:
warn "Unsubscribe failed", msg = e.msg