From 855b9738110a1292a8f612a6e3fb70f7bd3e086c Mon Sep 17 00:00:00 2001 From: Vaclav Pavlin Date: Thu, 12 Dec 2024 11:45:47 +0100 Subject: [PATCH 1/5] chore: fix inconsistent metric naming (#1027) --- codex/blockexchange/engine/advertiser.nim | 6 +++--- codex/blockexchange/engine/discovery.nim | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/codex/blockexchange/engine/advertiser.nim b/codex/blockexchange/engine/advertiser.nim index 0b59d150..e4a97db1 100644 --- a/codex/blockexchange/engine/advertiser.nim +++ b/codex/blockexchange/engine/advertiser.nim @@ -26,7 +26,7 @@ import ../../manifest logScope: topics = "codex discoveryengine advertiser" -declareGauge(codexInflightAdvertise, "inflight advertise requests") +declareGauge(codex_inflight_advertise, "inflight advertise requests") const DefaultConcurrentAdvertRequests = 10 @@ -97,12 +97,12 @@ proc processQueueLoop(b: Advertiser) {.async.} = request = b.discovery.provide(cid) b.inFlightAdvReqs[cid] = request - codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64) + codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64) await request finally: b.inFlightAdvReqs.del(cid) - codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64) + codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64) except CancelledError: trace "Advertise task cancelled" return diff --git a/codex/blockexchange/engine/discovery.nim b/codex/blockexchange/engine/discovery.nim index 15312de9..f06105bf 100644 --- a/codex/blockexchange/engine/discovery.nim +++ b/codex/blockexchange/engine/discovery.nim @@ -31,7 +31,7 @@ import ../../manifest logScope: topics = "codex discoveryengine" -declareGauge(codexInflightDiscovery, "inflight discovery requests") +declareGauge(codex_inflight_discovery, "inflight discovery requests") const DefaultConcurrentDiscRequests = 10 @@ -96,7 +96,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = .wait(DefaultDiscoveryTimeout) b.inFlightDiscReqs[cid] = request - codexInflightDiscovery.set(b.inFlightDiscReqs.len.int64) + codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64) let peers = await request @@ -110,7 +110,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} = finally: b.inFlightDiscReqs.del(cid) - codexInflightDiscovery.set(b.inFlightDiscReqs.len.int64) + codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64) except CancelledError: trace "Discovery task cancelled" return From da234d503b1df4a9510b5505676551dfe810c28b Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Thu, 12 Dec 2024 19:31:51 +0700 Subject: [PATCH 2/5] fix(trackedfutures): removes usage of `then` from tracked futures (#1032) - removes usage of `then`, simplifying the logic, and allowing `then` to be removed completely - updates annotations to reflect that all procs (sync and async) raise no exceptions --- codex/utils/trackedfutures.nim | 20 +++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/codex/utils/trackedfutures.nim b/codex/utils/trackedfutures.nim index f3fcdb2d..a9753299 100644 --- a/codex/utils/trackedfutures.nim +++ b/codex/utils/trackedfutures.nim @@ -1,9 +1,9 @@ -import std/sugar import std/tables import pkg/chronos import ../logutils -import ../utils/then + +{.push raises: [].} type TrackedFutures* = ref object @@ -25,10 +25,10 @@ proc track*[T](self: TrackedFutures, fut: Future[T]): Future[T] = self.futures[fut.id] = FutureBase(fut) - fut - .then((val: T) => self.removeFuture(fut)) - .cancelled(() => self.removeFuture(fut)) - .catch((e: ref CatchableError) => self.removeFuture(fut)) + proc cb(udata: pointer) = + self.removeFuture(fut) + + fut.addCallback(cb) return fut @@ -38,15 +38,17 @@ proc track*[T, U](future: Future[T], self: U): Future[T] = ## `trackedFutures` property. self.trackedFutures.track(future) -proc cancelTracked*(self: TrackedFutures) {.async.} = +proc cancelTracked*(self: TrackedFutures) {.async: (raises: []).} = self.cancelling = true trace "cancelling tracked futures" + var cancellations: seq[FutureBase] for future in self.futures.values: if not future.isNil and not future.finished: - trace "cancelling tracked future", id = future.id - await future.cancelAndWait() + cancellations.add future.cancelAndWait() + + await noCancel allFutures cancellations self.futures.clear() self.cancelling = false From d10072bf67ddab51a9084c8a7f875239153d7922 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Thu, 12 Dec 2024 13:57:34 +0100 Subject: [PATCH 3/5] refactor: marketplace configuration is cached (#1029) --- codex/contracts/market.nim | 18 ++++++++++++++---- tests/contracts/testMarket.nim | 5 +++++ 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index e1e36d9b..cc60e27c 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -20,6 +20,8 @@ type contract: Marketplace signer: Signer rewardRecipient: ?Address + configuration: ?MarketplaceConfig + MarketSubscription = market.Subscription EventSubscription = ethers.Subscription OnChainMarketSubscription = ref object of MarketSubscription @@ -48,6 +50,14 @@ template convertEthersError(body) = except EthersError as error: raiseMarketError(error.msgDetail) +proc config(market: OnChainMarket): Future[MarketplaceConfig] {.async.} = + without resolvedConfig =? market.configuration: + let fetchedConfig = await market.contract.configuration() + market.configuration = some fetchedConfig + return fetchedConfig + + return resolvedConfig + proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} = debug "Approving tokens", amount convertEthersError: @@ -56,7 +66,7 @@ proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} = discard await token.increaseAllowance(market.contract.address(), amount).confirm(1) method getZkeyHash*(market: OnChainMarket): Future[?string] {.async.} = - let config = await market.contract.configuration() + let config = await market.config() return some config.proofs.zkeyHash method getSigner*(market: OnChainMarket): Future[Address] {.async.} = @@ -65,18 +75,18 @@ method getSigner*(market: OnChainMarket): Future[Address] {.async.} = method periodicity*(market: OnChainMarket): Future[Periodicity] {.async.} = convertEthersError: - let config = await market.contract.configuration() + let config = await market.config() let period = config.proofs.period return Periodicity(seconds: period) method proofTimeout*(market: OnChainMarket): Future[UInt256] {.async.} = convertEthersError: - let config = await market.contract.configuration() + let config = await market.config() return config.proofs.timeout method proofDowntime*(market: OnChainMarket): Future[uint8] {.async.} = convertEthersError: - let config = await market.contract.configuration() + let config = await market.config() return config.proofs.downtime method getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async.} = diff --git a/tests/contracts/testMarket.nim b/tests/contracts/testMarket.nim index b967bc8c..8e4d013e 100644 --- a/tests/contracts/testMarket.nim +++ b/tests/contracts/testMarket.nim @@ -66,6 +66,11 @@ ethersuite "On-Chain Market": ): await advanceToNextPeriod() + test "caches marketplace configuration": + check isNone market.configuration + discard await market.periodicity() + check isSome market.configuration + test "fails to instantiate when contract does not have a signer": let storageWithoutSigner = marketplace.connect(ethProvider) expect AssertionDefect: From 19af79786e478f338eadc47ac3a9024ff236ed4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Adam=20Uhl=C3=AD=C5=99?= Date: Thu, 12 Dec 2024 21:19:56 +0100 Subject: [PATCH 4/5] feat: repair is rewarded (#1022) * feat: repair is rewarded * chore: update contracts repo * feat: proving loop handles repair case * test: assert repair state * chore: update contracts repo * fix: upon unknown state of repair go to error --- codex/contracts/market.nim | 5 +++++ codex/contracts/requests.nim | 1 + codex/market.nim | 3 +++ codex/sales/states/filling.nim | 13 ++++++++++++- codex/sales/states/preparing.nim | 2 +- codex/sales/states/proving.nim | 5 +++++ codex/sales/states/unknown.nim | 7 ++++++- tests/codex/helpers/mockmarket.nim | 3 +++ tests/contracts/testMarket.nim | 2 +- vendor/codex-contracts-eth | 2 +- 10 files changed, 38 insertions(+), 5 deletions(-) diff --git a/codex/contracts/market.nim b/codex/contracts/market.nim index cc60e27c..049e38bb 100644 --- a/codex/contracts/market.nim +++ b/codex/contracts/market.nim @@ -84,6 +84,11 @@ method proofTimeout*(market: OnChainMarket): Future[UInt256] {.async.} = let config = await market.config() return config.proofs.timeout +method repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async.} = + convertEthersError: + let config = await market.contract.configuration() + return config.collateral.repairRewardPercentage + method proofDowntime*(market: OnChainMarket): Future[uint8] {.async.} = convertEthersError: let config = await market.config() diff --git a/codex/contracts/requests.nim b/codex/contracts/requests.nim index d94baa17..be85e6b5 100644 --- a/codex/contracts/requests.nim +++ b/codex/contracts/requests.nim @@ -49,6 +49,7 @@ type Failed Paid Cancelled + Repair proc `==`*(x, y: Nonce): bool {.borrow.} proc `==`*(x, y: RequestId): bool {.borrow.} diff --git a/codex/market.nim b/codex/market.nim index 6892237c..cb86e0d7 100644 --- a/codex/market.nim +++ b/codex/market.nim @@ -67,6 +67,9 @@ method periodicity*(market: Market): Future[Periodicity] {.base, async.} = method proofTimeout*(market: Market): Future[UInt256] {.base, async.} = raiseAssert("not implemented") +method repairRewardPercentage*(market: Market): Future[uint8] {.base, async.} = + raiseAssert("not implemented") + method proofDowntime*(market: Market): Future[uint8] {.base, async.} = raiseAssert("not implemented") diff --git a/codex/sales/states/filling.nim b/codex/sales/states/filling.nim index 678e2a21..a5531e79 100644 --- a/codex/sales/states/filling.nim +++ b/codex/sales/states/filling.nim @@ -1,3 +1,4 @@ +import pkg/stint import ../../logutils import ../../market import ../statemachine @@ -27,13 +28,23 @@ method onFailed*(state: SaleFilling, request: StorageRequest): ?State = method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} = let data = SalesAgent(machine).data let market = SalesAgent(machine).context.market - without (collateral =? data.request.?ask.?collateral): + without (fullCollateral =? data.request.?ask.?collateral): raiseAssert "Request not set" logScope: requestId = data.requestId slotIndex = data.slotIndex + let slotState = await market.slotState(slotId(data.requestId, data.slotIndex)) + var collateral: Uint256 + + if slotState == SlotState.Repair: + # When repairing the node gets "discount" on the collateral that it needs to + let repairRewardPercentage = (await market.repairRewardPercentage).u256 + collateral = fullCollateral - ((fullCollateral * repairRewardPercentage)).div(100.u256) + else: + collateral = fullCollateral + debug "Filling slot" try: await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral) diff --git a/codex/sales/states/preparing.nim b/codex/sales/states/preparing.nim index c92ec716..169eb964 100644 --- a/codex/sales/states/preparing.nim +++ b/codex/sales/states/preparing.nim @@ -49,7 +49,7 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} = let slotId = slotId(data.requestId, data.slotIndex) let state = await market.slotState(slotId) - if state != SlotState.Free: + if state != SlotState.Free and state != SlotState.Repair: return some State(SaleIgnored(reprocessSlot: false, returnBytes: false)) # TODO: Once implemented, check to ensure the host is allowed to fill the slot, diff --git a/codex/sales/states/proving.nim b/codex/sales/states/proving.nim index dd05ac7f..76180ab2 100644 --- a/codex/sales/states/proving.nim +++ b/codex/sales/states/proving.nim @@ -16,6 +16,7 @@ logScope: topics = "marketplace sales proving" type + SlotFreedError* = object of CatchableError SlotNotFilledError* = object of CatchableError SaleProving* = ref object of ErrorHandlingState loop: Future[void] @@ -82,6 +83,10 @@ proc proveLoop( of SlotState.Cancelled: debug "Slot reached cancelled state" # do nothing, let onCancelled callback take care of it + of SlotState.Repair: + warn "Slot was forcible freed" + let message = "Slot was forcible freed and host was removed from its hosting" + raise newException(SlotFreedError, message) of SlotState.Failed: debug "Slot reached failed state" # do nothing, let onFailed callback take care of it diff --git a/codex/sales/states/unknown.nim b/codex/sales/states/unknown.nim index db00f517..d497cba3 100644 --- a/codex/sales/states/unknown.nim +++ b/codex/sales/states/unknown.nim @@ -5,6 +5,7 @@ import ./filled import ./finished import ./failed import ./errored +import ./proving import ./cancelled import ./payout @@ -38,7 +39,7 @@ method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} = case slotState of SlotState.Free: let error = newException(UnexpectedSlotError, - "slot state on chain should not be 'free'") + "Slot state on chain should not be 'free'") return some State(SaleErrored(error: error)) of SlotState.Filled: return some State(SaleFilled()) @@ -50,3 +51,7 @@ method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} = return some State(SaleFailed()) of SlotState.Cancelled: return some State(SaleCancelled()) + of SlotState.Repair: + let error = newException(SlotFreedError, + "Slot was forcible freed and host was removed from its hosting") + return some State(SaleErrored(error: error)) diff --git a/tests/codex/helpers/mockmarket.nim b/tests/codex/helpers/mockmarket.nim index d19f0c21..07eeb856 100644 --- a/tests/codex/helpers/mockmarket.nim +++ b/tests/codex/helpers/mockmarket.nim @@ -125,6 +125,9 @@ method proofTimeout*(market: MockMarket): Future[UInt256] {.async.} = method proofDowntime*(market: MockMarket): Future[uint8] {.async.} = return market.config.proofs.downtime +method repairRewardPercentage*(market: MockMarket): Future[uint8] {.async.} = + return market.config.collateral.repairRewardPercentage + method getPointer*(market: MockMarket, slotId: SlotId): Future[uint8] {.async.} = return market.proofPointer diff --git a/tests/contracts/testMarket.nim b/tests/contracts/testMarket.nim index 8e4d013e..a32590d6 100644 --- a/tests/contracts/testMarket.nim +++ b/tests/contracts/testMarket.nim @@ -303,7 +303,7 @@ ethersuite "On-Chain Market": let slotId = request.slotId(slotIndex.u256) while true: let slotState = await marketplace.slotState(slotId) - if slotState == SlotState.Free: + if slotState == SlotState.Repair or slotState == SlotState.Failed: break await waitUntilProofRequired(slotId) let missingPeriod = periodicity.periodOf(await ethProvider.currentTime()) diff --git a/vendor/codex-contracts-eth b/vendor/codex-contracts-eth index 945f6008..dfab6102 160000 --- a/vendor/codex-contracts-eth +++ b/vendor/codex-contracts-eth @@ -1 +1 @@ -Subproject commit 945f6008c8817abd7ca43a40368d33bb1e014c14 +Subproject commit dfab6102e71d2acaff86af45b87be2536530c624 From 7c804b0ec901f75ef2b37234d0cd37c945b52b70 Mon Sep 17 00:00:00 2001 From: Eric <5089238+emizzle@users.noreply.github.com> Date: Fri, 13 Dec 2024 09:35:39 +0700 Subject: [PATCH 5/5] fix(asyncstatemachine): fixes not awaiting or asyncSpawning futures (#1033) - adds a break in scheduler when CancelledError is caught - tracks asyncSpawned state.run, so that it can be cancelled during stop - removes usages of `then` - ensures that no exceptions are leaked from async procs --- codex/utils/asyncstatemachine.nim | 58 ++++++++++++++----------------- 1 file changed, 27 insertions(+), 31 deletions(-) diff --git a/codex/utils/asyncstatemachine.nim b/codex/utils/asyncstatemachine.nim index 3f15af31..b32667be 100644 --- a/codex/utils/asyncstatemachine.nim +++ b/codex/utils/asyncstatemachine.nim @@ -1,12 +1,11 @@ import std/sugar import pkg/questionable import pkg/chronos -import pkg/upraises import ../logutils import ./then import ./trackedfutures -push: {.upraises:[].} +{.push raises:[].} type Machine* = ref object of RootObj @@ -17,7 +16,7 @@ type trackedFutures: TrackedFutures State* = ref object of RootObj Query*[T] = proc(state: State): T - Event* = proc(state: State): ?State {.gcsafe, upraises:[].} + Event* = proc(state: State): ?State {.gcsafe, raises:[].} logScope: topics = "statemachine" @@ -58,29 +57,31 @@ proc onError(machine: Machine, error: ref CatchableError): Event = return proc (state: State): ?State = state.onError(error) -proc run(machine: Machine, state: State) {.async.} = - if next =? await state.run(machine): - machine.schedule(Event.transition(state, next)) +proc run(machine: Machine, state: State) {.async: (raises:[]).} = + try: + if next =? await state.run(machine): + machine.schedule(Event.transition(state, next)) + except CancelledError: + discard # do not propagate + except CatchableError as e: + machine.schedule(machine.onError(e)) -proc scheduler(machine: Machine) {.async.} = - var running: Future[void] +proc scheduler(machine: Machine) {.async: (raises: []).} = + var running: Future[void].Raising([]) while machine.started: - let event = await machine.scheduled.get().track(machine) - if next =? event(machine.state): - if not running.isNil and not running.finished: - trace "cancelling current state", state = $machine.state - await running.cancelAndWait() - let fromState = if machine.state.isNil: "" else: $machine.state - machine.state = next - debug "enter state", state = fromState & " => " & $machine.state - running = machine.run(machine.state) - running - .track(machine) - .cancelled(proc() = trace "state.run cancelled, swallowing", state = $machine.state) - .catch(proc(err: ref CatchableError) = - trace "error caught in state.run, calling state.onError", state = $machine.state - machine.schedule(machine.onError(err)) - ) + try: + let event = await machine.scheduled.get() + if next =? event(machine.state): + if not running.isNil and not running.finished: + trace "cancelling current state", state = $machine.state + await running.cancelAndWait() + let fromState = if machine.state.isNil: "" else: $machine.state + machine.state = next + debug "enter state", state = fromState & " => " & $machine.state + running = machine.run(machine.state) + asyncSpawn running.track(machine) + except CancelledError: + break # do not propagate bc it is asyncSpawned proc start*(machine: Machine, initialState: State) = if machine.started: @@ -90,13 +91,8 @@ proc start*(machine: Machine, initialState: State) = machine.scheduled = newAsyncQueue[Event]() machine.started = true - try: - discard machine.scheduler().track(machine) - machine.schedule(Event.transition(machine.state, initialState)) - except CancelledError as e: - discard - except CatchableError as e: - error("Error in scheduler", error = e.msg) + asyncSpawn machine.scheduler().track(machine) + machine.schedule(Event.transition(machine.state, initialState)) proc stop*(machine: Machine) {.async.} = if not machine.started: