From 0957c9adfaa294d6498e4c326ceeefeb7ca5bbfc Mon Sep 17 00:00:00 2001 From: Eric Mastro Date: Mon, 14 Nov 2022 21:01:00 +1100 Subject: [PATCH] PR comments - add slotIndex to `SalesAgent` constructor - remove `SalesAgent.init` - rename `SalesAgent.init` to `start` and `SalesAgent.deinit` to `stop`. - rename `SalesAgent. populateRequest` to `SalesAgent.retreiveRequest`. - move availability removal to the downloading state. once availability is persisted to disk, it should survive node restarts. - --- codex/sales.nim | 15 ++++++++++++--- codex/sales/salesagent.nim | 20 ++++++++------------ codex/sales/states/downloading.nim | 3 +++ codex/sales/states/filled.nim | 3 --- codex/utils/statemachine.nim | 2 +- tests/codex/testsales.nim | 23 +++++++++++------------ 6 files changed, 35 insertions(+), 31 deletions(-) diff --git a/codex/sales.nim b/codex/sales.nim index 3e399357..4c00c1d4 100644 --- a/codex/sales.nim +++ b/codex/sales.nim @@ -52,18 +52,25 @@ proc init*(_: type Availability, doAssert randomBytes(id) == 32 Availability(id: id, size: size, duration: duration, minPrice: minPrice) +proc randomSlotIndex(numSlots: uint64): UInt256 = + let rng = Rng.instance + let slotIndex = rng.rand(numSlots - 1) + return slotIndex.u256 + proc handleRequest(sales: Sales, requestId: RequestId, ask: StorageAsk) {.async.} = let availability = sales.findAvailability(ask) + let slotIndex = randomSlotIndex(ask.slots) let agent = newSalesAgent( sales, requestId, availability, + some slotIndex, none StorageRequest ) - await agent.init(ask.slots) + await agent.start(ask.slots) await agent.switchAsync(SaleDownloading()) sales.agents.add agent @@ -78,13 +85,15 @@ proc load*(sales: Sales) {.async.} = if slot =? await market.getSlot(slotId): if request =? await market.getRequest(slot.requestId): let availability = sales.findAvailability(request.ask) + let slotIndex = randomSlotIndex(request.ask.slots) let agent = newSalesAgent( sales, slot.requestId, availability, + some slotIndex, some request) - await agent.init(request.ask.slots) + await agent.start(request.ask.slots) await agent.switchAsync(SaleUnknown()) sales.agents.add agent @@ -108,5 +117,5 @@ proc stop*(sales: Sales) {.async.} = warn "Unsubscribe failed", msg = e.msg for agent in sales.agents: - await agent.deinit() + await agent.stop() diff --git a/codex/sales/salesagent.nim b/codex/sales/salesagent.nim index face5493..058e8259 100644 --- a/codex/sales/salesagent.nim +++ b/codex/sales/salesagent.nim @@ -9,32 +9,30 @@ import ../rng proc newSalesAgent*(sales: Sales, requestId: RequestId, availability: ?Availability, + slotIndex: ?UInt256, request: ?StorageRequest): SalesAgent = SalesAgent( sales: sales, requestId: requestId, availability: availability, + slotIndex: slotIndex, request: request) # fwd declarations proc subscribeCancellation*(agent: SalesAgent): Future[void] {.gcsafe.} proc subscribeFailure*(agent: SalesAgent): Future[void] {.gcsafe.} -proc populateRequest(agent: SalesAgent) {.async.} = +proc retrieveRequest(agent: SalesAgent) {.async.} = if agent.request.isNone: agent.request = await agent.sales.market.getRequest(agent.requestId) -proc init*(agent: SalesAgent, numSlots: uint64) {.async.} = - let rng = Rng.instance - let slotIndex = rng.rand(numSlots - 1) - agent.slotIndex = some slotIndex.u256 - +proc start*(agent: SalesAgent, numSlots: uint64) {.async.} = # TODO: try not to block the thread waiting for the network - await agent.populateRequest() + await agent.retrieveRequest() await agent.subscribeCancellation() await agent.subscribeFailure() -proc deinit*(agent: SalesAgent) {.async.} = +proc stop*(agent: SalesAgent) {.async.} = try: await agent.fulfilled.unsubscribe() except CatchableError: @@ -73,10 +71,8 @@ proc subscribeFailure*(agent: SalesAgent) {.async.} = let market = agent.sales.market proc onFailed(_: RequestId) {.async.} = - without request =? agent.request: - return - - without state =? (agent.state as SaleState): + without request =? agent.request and + state =? (agent.state as SaleState): return await state.onFailed(request) diff --git a/codex/sales/states/downloading.nim b/codex/sales/states/downloading.nim index e1da6be7..b0278e76 100644 --- a/codex/sales/states/downloading.nim +++ b/codex/sales/states/downloading.nim @@ -35,6 +35,9 @@ method enterAsync(state: SaleDownloading) {.async.} = without request =? agent.request: raiseAssert "no sale request" + if availability =? agent.availability: + agent.sales.remove(availability) + await onStore(request, slotIndex, agent.availability) await state.switchAsync(SaleProving()) diff --git a/codex/sales/states/filled.nim b/codex/sales/states/filled.nim index f1e95339..8e2eb07c 100644 --- a/codex/sales/states/filled.nim +++ b/codex/sales/states/filled.nim @@ -27,9 +27,6 @@ method enterAsync(state: SaleFilled) {.async.} = without slotIndex =? agent.slotIndex: raiseAssert "no slot selected" - if availability =? agent.availability: - agent.sales.remove(availability) - let host = await market.getHost(agent.requestId, slotIndex) let me = await market.getSigner() if host == me.some: diff --git a/codex/utils/statemachine.nim b/codex/utils/statemachine.nim index bd7b7638..272dcb00 100644 --- a/codex/utils/statemachine.nim +++ b/codex/utils/statemachine.nim @@ -119,7 +119,7 @@ proc switchAsync*(machine: StateMachineAsync, newState: AsyncState) {.async.} = await state.exitAsync() state.context = none StateMachine else: - trace "Switching sales state", `from` = "no state", to = $newState + trace "Switching state", `from` = "no state", to = $newState machine.state = some State(newState) newState.context = some StateMachine(machine) diff --git a/tests/codex/testsales.nim b/tests/codex/testsales.nim index 5186c5a2..98a5f768 100644 --- a/tests/codex/testsales.nim +++ b/tests/codex/testsales.nim @@ -4,7 +4,6 @@ import std/sugar import std/times import pkg/asynctest import pkg/chronos -# import pkg/codex/contracts/requests import pkg/codex/sales import pkg/codex/sales/states/[downloading, cancelled, errored, filled, filling, failed, proving, finished, unknown] @@ -265,8 +264,8 @@ suite "Sales state machine": proc newSalesAgent(slotIdx: UInt256 = 0.u256): SalesAgent = let agent = sales.newSalesAgent(request.id, some availability, + some slotIdx, some request) - agent.slotIndex = some slotIdx return agent proc fillSlot(slotIdx: UInt256 = 0.u256) {.async.} = @@ -328,7 +327,7 @@ suite "Sales state machine": await sleepAsync(chronos.minutes(1)) # "far" in the future request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 let agent = newSalesAgent() - await agent.init(request.ask.slots) + await agent.start(request.ask.slots) market.requested.add request market.state[request.id] = RequestState.New await agent.switchAsync(SaleDownloading()) @@ -345,7 +344,7 @@ suite "Sales state machine": availability: ?Availability) {.async.} = await sleepAsync(chronos.minutes(1)) # "far" in the future let agent = newSalesAgent() - await agent.init(request.ask.slots) + await agent.start(request.ask.slots) market.requested.add request market.state[request.id] = RequestState.New await agent.switchAsync(SaleDownloading()) @@ -359,7 +358,7 @@ suite "Sales state machine": test "moves to SaleErrored when Filling and request expires": request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 let agent = newSalesAgent() - await agent.init(request.ask.slots) + await agent.start(request.ask.slots) market.requested.add request market.state[request.id] = RequestState.New await agent.switchAsync(SaleFilling()) @@ -372,7 +371,7 @@ suite "Sales state machine": test "moves to SaleErrored when Filling and request fails": let agent = newSalesAgent() - await agent.init(request.ask.slots) + await agent.start(request.ask.slots) market.requested.add request market.state[request.id] = RequestState.New await agent.switchAsync(SaleFilling()) @@ -386,7 +385,7 @@ suite "Sales state machine": test "moves to SaleErrored when Finished and request expires": request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 let agent = newSalesAgent() - await agent.init(request.ask.slots) + await agent.start(request.ask.slots) market.requested.add request market.state[request.id] = RequestState.Finished await agent.switchAsync(SaleFinished()) @@ -399,7 +398,7 @@ suite "Sales state machine": test "moves to SaleErrored when Finished and request fails": let agent = newSalesAgent() - await agent.init(request.ask.slots) + await agent.start(request.ask.slots) market.requested.add request market.state[request.id] = RequestState.Finished await agent.switchAsync(SaleFinished()) @@ -417,7 +416,7 @@ suite "Sales state machine": return @[] request.expiry = (getTime() + initDuration(seconds=2)).toUnix.u256 let agent = newSalesAgent() - await agent.init(request.ask.slots) + await agent.start(request.ask.slots) market.requested.add request market.state[request.id] = RequestState.New await agent.switchAsync(SaleProving()) @@ -434,7 +433,7 @@ suite "Sales state machine": await sleepAsync(chronos.minutes(1)) # "far" in the future return @[] let agent = newSalesAgent() - await agent.init(request.ask.slots) + await agent.start(request.ask.slots) market.requested.add request market.state[request.id] = RequestState.New await agent.switchAsync(SaleProving()) @@ -465,7 +464,7 @@ suite "Sales state machine": onSaleCalled = true let agent = newSalesAgent() - await agent.init(request.ask.slots) + await agent.start(request.ask.slots) market.requested.add request market.state[request.id] = RequestState.New await fillSlot(!agent.slotIndex) @@ -505,7 +504,7 @@ suite "Sales state machine": requestId: request.id, availability: none Availability, request: some request) - # because sales.load() calls agent.init, we won't know the slotIndex + # because sales.load() calls agent.start, we won't know the slotIndex # randomly selected for the agent, and we also won't know the value of # `failed`/`fulfilled`/`cancelled` futures, so we need to compare # the properties we know