PR comments

- add slotIndex to `SalesAgent` constructor
- remove `SalesAgent.init`
- rename `SalesAgent.init` to `start` and `SalesAgent.deinit` to `stop`.
- rename `SalesAgent. populateRequest` to `SalesAgent.retreiveRequest`.
- move availability removal to the downloading state. once availability is persisted to disk, it should survive node restarts.
-
This commit is contained in:
Eric Mastro 2022-11-14 21:01:00 +11:00 committed by Mark Spanbroek
parent e8f87e3f52
commit 0957c9adfa
No known key found for this signature in database
GPG Key ID: FBE3E9548D427C00
6 changed files with 35 additions and 31 deletions

View File

@ -52,18 +52,25 @@ proc init*(_: type Availability,
doAssert randomBytes(id) == 32
Availability(id: id, size: size, duration: duration, minPrice: minPrice)
proc randomSlotIndex(numSlots: uint64): UInt256 =
let rng = Rng.instance
let slotIndex = rng.rand(numSlots - 1)
return slotIndex.u256
proc handleRequest(sales: Sales,
requestId: RequestId,
ask: StorageAsk) {.async.} =
let availability = sales.findAvailability(ask)
let slotIndex = randomSlotIndex(ask.slots)
let agent = newSalesAgent(
sales,
requestId,
availability,
some slotIndex,
none StorageRequest
)
await agent.init(ask.slots)
await agent.start(ask.slots)
await agent.switchAsync(SaleDownloading())
sales.agents.add agent
@ -78,13 +85,15 @@ proc load*(sales: Sales) {.async.} =
if slot =? await market.getSlot(slotId):
if request =? await market.getRequest(slot.requestId):
let availability = sales.findAvailability(request.ask)
let slotIndex = randomSlotIndex(request.ask.slots)
let agent = newSalesAgent(
sales,
slot.requestId,
availability,
some slotIndex,
some request)
await agent.init(request.ask.slots)
await agent.start(request.ask.slots)
await agent.switchAsync(SaleUnknown())
sales.agents.add agent
@ -108,5 +117,5 @@ proc stop*(sales: Sales) {.async.} =
warn "Unsubscribe failed", msg = e.msg
for agent in sales.agents:
await agent.deinit()
await agent.stop()

View File

@ -9,32 +9,30 @@ import ../rng
proc newSalesAgent*(sales: Sales,
requestId: RequestId,
availability: ?Availability,
slotIndex: ?UInt256,
request: ?StorageRequest): SalesAgent =
SalesAgent(
sales: sales,
requestId: requestId,
availability: availability,
slotIndex: slotIndex,
request: request)
# fwd declarations
proc subscribeCancellation*(agent: SalesAgent): Future[void] {.gcsafe.}
proc subscribeFailure*(agent: SalesAgent): Future[void] {.gcsafe.}
proc populateRequest(agent: SalesAgent) {.async.} =
proc retrieveRequest(agent: SalesAgent) {.async.} =
if agent.request.isNone:
agent.request = await agent.sales.market.getRequest(agent.requestId)
proc init*(agent: SalesAgent, numSlots: uint64) {.async.} =
let rng = Rng.instance
let slotIndex = rng.rand(numSlots - 1)
agent.slotIndex = some slotIndex.u256
proc start*(agent: SalesAgent, numSlots: uint64) {.async.} =
# TODO: try not to block the thread waiting for the network
await agent.populateRequest()
await agent.retrieveRequest()
await agent.subscribeCancellation()
await agent.subscribeFailure()
proc deinit*(agent: SalesAgent) {.async.} =
proc stop*(agent: SalesAgent) {.async.} =
try:
await agent.fulfilled.unsubscribe()
except CatchableError:
@ -73,10 +71,8 @@ proc subscribeFailure*(agent: SalesAgent) {.async.} =
let market = agent.sales.market
proc onFailed(_: RequestId) {.async.} =
without request =? agent.request:
return
without state =? (agent.state as SaleState):
without request =? agent.request and
state =? (agent.state as SaleState):
return
await state.onFailed(request)

View File

@ -35,6 +35,9 @@ method enterAsync(state: SaleDownloading) {.async.} =
without request =? agent.request:
raiseAssert "no sale request"
if availability =? agent.availability:
agent.sales.remove(availability)
await onStore(request, slotIndex, agent.availability)
await state.switchAsync(SaleProving())

View File

@ -27,9 +27,6 @@ method enterAsync(state: SaleFilled) {.async.} =
without slotIndex =? agent.slotIndex:
raiseAssert "no slot selected"
if availability =? agent.availability:
agent.sales.remove(availability)
let host = await market.getHost(agent.requestId, slotIndex)
let me = await market.getSigner()
if host == me.some:

View File

@ -119,7 +119,7 @@ proc switchAsync*(machine: StateMachineAsync, newState: AsyncState) {.async.} =
await state.exitAsync()
state.context = none StateMachine
else:
trace "Switching sales state", `from` = "no state", to = $newState
trace "Switching state", `from` = "no state", to = $newState
machine.state = some State(newState)
newState.context = some StateMachine(machine)

View File

@ -4,7 +4,6 @@ import std/sugar
import std/times
import pkg/asynctest
import pkg/chronos
# import pkg/codex/contracts/requests
import pkg/codex/sales
import pkg/codex/sales/states/[downloading, cancelled, errored, filled, filling,
failed, proving, finished, unknown]
@ -265,8 +264,8 @@ suite "Sales state machine":
proc newSalesAgent(slotIdx: UInt256 = 0.u256): SalesAgent =
let agent = sales.newSalesAgent(request.id,
some availability,
some slotIdx,
some request)
agent.slotIndex = some slotIdx
return agent
proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} =
@ -328,7 +327,7 @@ suite "Sales state machine":
await sleepAsync(chronos.minutes(1)) # "far" in the future
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
let agent = newSalesAgent()
await agent.init(request.ask.slots)
await agent.start(request.ask.slots)
market.requested.add request
market.state[request.id] = RequestState.New
await agent.switchAsync(SaleDownloading())
@ -345,7 +344,7 @@ suite "Sales state machine":
availability: ?Availability) {.async.} =
await sleepAsync(chronos.minutes(1)) # "far" in the future
let agent = newSalesAgent()
await agent.init(request.ask.slots)
await agent.start(request.ask.slots)
market.requested.add request
market.state[request.id] = RequestState.New
await agent.switchAsync(SaleDownloading())
@ -359,7 +358,7 @@ suite "Sales state machine":
test "moves to SaleErrored when Filling and request expires":
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
let agent = newSalesAgent()
await agent.init(request.ask.slots)
await agent.start(request.ask.slots)
market.requested.add request
market.state[request.id] = RequestState.New
await agent.switchAsync(SaleFilling())
@ -372,7 +371,7 @@ suite "Sales state machine":
test "moves to SaleErrored when Filling and request fails":
let agent = newSalesAgent()
await agent.init(request.ask.slots)
await agent.start(request.ask.slots)
market.requested.add request
market.state[request.id] = RequestState.New
await agent.switchAsync(SaleFilling())
@ -386,7 +385,7 @@ suite "Sales state machine":
test "moves to SaleErrored when Finished and request expires":
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
let agent = newSalesAgent()
await agent.init(request.ask.slots)
await agent.start(request.ask.slots)
market.requested.add request
market.state[request.id] = RequestState.Finished
await agent.switchAsync(SaleFinished())
@ -399,7 +398,7 @@ suite "Sales state machine":
test "moves to SaleErrored when Finished and request fails":
let agent = newSalesAgent()
await agent.init(request.ask.slots)
await agent.start(request.ask.slots)
market.requested.add request
market.state[request.id] = RequestState.Finished
await agent.switchAsync(SaleFinished())
@ -417,7 +416,7 @@ suite "Sales state machine":
return @[]
request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256
let agent = newSalesAgent()
await agent.init(request.ask.slots)
await agent.start(request.ask.slots)
market.requested.add request
market.state[request.id] = RequestState.New
await agent.switchAsync(SaleProving())
@ -434,7 +433,7 @@ suite "Sales state machine":
await sleepAsync(chronos.minutes(1)) # "far" in the future
return @[]
let agent = newSalesAgent()
await agent.init(request.ask.slots)
await agent.start(request.ask.slots)
market.requested.add request
market.state[request.id] = RequestState.New
await agent.switchAsync(SaleProving())
@ -465,7 +464,7 @@ suite "Sales state machine":
onSaleCalled = true
let agent = newSalesAgent()
await agent.init(request.ask.slots)
await agent.start(request.ask.slots)
market.requested.add request
market.state[request.id] = RequestState.New
await fillSlot(!agent.slotIndex)
@ -505,7 +504,7 @@ suite "Sales state machine":
requestId: request.id,
availability: none Availability,
request: some request)
# because sales.load() calls agent.init, we won't know the slotIndex
# because sales.load() calls agent.start, we won't know the slotIndex
# randomly selected for the agent, and we also won't know the value of
# `failed`/`fulfilled`/`cancelled` futures, so we need to compare
# the properties we know