Merge branch 'master' into feature/blkexc-peer-selection

This commit is contained in:
Ben Bierens 2024-12-13 08:31:00 +01:00 committed by GitHub
commit 9c2f3faa4d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 101 additions and 55 deletions

View File

@ -26,7 +26,7 @@ import ../../manifest
logScope: logScope:
topics = "codex discoveryengine advertiser" topics = "codex discoveryengine advertiser"
declareGauge(codexInflightAdvertise, "inflight advertise requests") declareGauge(codex_inflight_advertise, "inflight advertise requests")
const const
DefaultConcurrentAdvertRequests = 10 DefaultConcurrentAdvertRequests = 10
@ -97,12 +97,12 @@ proc processQueueLoop(b: Advertiser) {.async.} =
request = b.discovery.provide(cid) request = b.discovery.provide(cid)
b.inFlightAdvReqs[cid] = request b.inFlightAdvReqs[cid] = request
codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64) codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64)
await request await request
finally: finally:
b.inFlightAdvReqs.del(cid) b.inFlightAdvReqs.del(cid)
codexInflightAdvertise.set(b.inFlightAdvReqs.len.int64) codex_inflight_advertise.set(b.inFlightAdvReqs.len.int64)
except CancelledError: except CancelledError:
trace "Advertise task cancelled" trace "Advertise task cancelled"
return return

View File

@ -31,7 +31,7 @@ import ../../manifest
logScope: logScope:
topics = "codex discoveryengine" topics = "codex discoveryengine"
declareGauge(codexInflightDiscovery, "inflight discovery requests") declareGauge(codex_inflight_discovery, "inflight discovery requests")
const const
DefaultConcurrentDiscRequests = 10 DefaultConcurrentDiscRequests = 10
@ -96,7 +96,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
.wait(DefaultDiscoveryTimeout) .wait(DefaultDiscoveryTimeout)
b.inFlightDiscReqs[cid] = request b.inFlightDiscReqs[cid] = request
codexInflightDiscovery.set(b.inFlightDiscReqs.len.int64) codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)
let let
peers = await request peers = await request
@ -110,7 +110,7 @@ proc discoveryTaskLoop(b: DiscoveryEngine) {.async.} =
finally: finally:
b.inFlightDiscReqs.del(cid) b.inFlightDiscReqs.del(cid)
codexInflightDiscovery.set(b.inFlightDiscReqs.len.int64) codex_inflight_discovery.set(b.inFlightDiscReqs.len.int64)
except CancelledError: except CancelledError:
trace "Discovery task cancelled" trace "Discovery task cancelled"
return return

View File

@ -20,6 +20,8 @@ type
contract: Marketplace contract: Marketplace
signer: Signer signer: Signer
rewardRecipient: ?Address rewardRecipient: ?Address
configuration: ?MarketplaceConfig
MarketSubscription = market.Subscription MarketSubscription = market.Subscription
EventSubscription = ethers.Subscription EventSubscription = ethers.Subscription
OnChainMarketSubscription = ref object of MarketSubscription OnChainMarketSubscription = ref object of MarketSubscription
@ -48,6 +50,14 @@ template convertEthersError(body) =
except EthersError as error: except EthersError as error:
raiseMarketError(error.msgDetail) raiseMarketError(error.msgDetail)
proc config(market: OnChainMarket): Future[MarketplaceConfig] {.async.} =
without resolvedConfig =? market.configuration:
let fetchedConfig = await market.contract.configuration()
market.configuration = some fetchedConfig
return fetchedConfig
return resolvedConfig
proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} = proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} =
debug "Approving tokens", amount debug "Approving tokens", amount
convertEthersError: convertEthersError:
@ -56,7 +66,7 @@ proc approveFunds(market: OnChainMarket, amount: UInt256) {.async.} =
discard await token.increaseAllowance(market.contract.address(), amount).confirm(1) discard await token.increaseAllowance(market.contract.address(), amount).confirm(1)
method getZkeyHash*(market: OnChainMarket): Future[?string] {.async.} = method getZkeyHash*(market: OnChainMarket): Future[?string] {.async.} =
let config = await market.contract.configuration() let config = await market.config()
return some config.proofs.zkeyHash return some config.proofs.zkeyHash
method getSigner*(market: OnChainMarket): Future[Address] {.async.} = method getSigner*(market: OnChainMarket): Future[Address] {.async.} =
@ -65,18 +75,23 @@ method getSigner*(market: OnChainMarket): Future[Address] {.async.} =
method periodicity*(market: OnChainMarket): Future[Periodicity] {.async.} = method periodicity*(market: OnChainMarket): Future[Periodicity] {.async.} =
convertEthersError: convertEthersError:
let config = await market.contract.configuration() let config = await market.config()
let period = config.proofs.period let period = config.proofs.period
return Periodicity(seconds: period) return Periodicity(seconds: period)
method proofTimeout*(market: OnChainMarket): Future[UInt256] {.async.} = method proofTimeout*(market: OnChainMarket): Future[UInt256] {.async.} =
convertEthersError: convertEthersError:
let config = await market.contract.configuration() let config = await market.config()
return config.proofs.timeout return config.proofs.timeout
method repairRewardPercentage*(market: OnChainMarket): Future[uint8] {.async.} =
convertEthersError:
let config = await market.contract.configuration()
return config.collateral.repairRewardPercentage
method proofDowntime*(market: OnChainMarket): Future[uint8] {.async.} = method proofDowntime*(market: OnChainMarket): Future[uint8] {.async.} =
convertEthersError: convertEthersError:
let config = await market.contract.configuration() let config = await market.config()
return config.proofs.downtime return config.proofs.downtime
method getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async.} = method getPointer*(market: OnChainMarket, slotId: SlotId): Future[uint8] {.async.} =

View File

@ -49,6 +49,7 @@ type
Failed Failed
Paid Paid
Cancelled Cancelled
Repair
proc `==`*(x, y: Nonce): bool {.borrow.} proc `==`*(x, y: Nonce): bool {.borrow.}
proc `==`*(x, y: RequestId): bool {.borrow.} proc `==`*(x, y: RequestId): bool {.borrow.}

View File

@ -67,6 +67,9 @@ method periodicity*(market: Market): Future[Periodicity] {.base, async.} =
method proofTimeout*(market: Market): Future[UInt256] {.base, async.} = method proofTimeout*(market: Market): Future[UInt256] {.base, async.} =
raiseAssert("not implemented") raiseAssert("not implemented")
method repairRewardPercentage*(market: Market): Future[uint8] {.base, async.} =
raiseAssert("not implemented")
method proofDowntime*(market: Market): Future[uint8] {.base, async.} = method proofDowntime*(market: Market): Future[uint8] {.base, async.} =
raiseAssert("not implemented") raiseAssert("not implemented")

View File

@ -1,3 +1,4 @@
import pkg/stint
import ../../logutils import ../../logutils
import ../../market import ../../market
import ../statemachine import ../statemachine
@ -27,13 +28,23 @@ method onFailed*(state: SaleFilling, request: StorageRequest): ?State =
method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} = method run(state: SaleFilling, machine: Machine): Future[?State] {.async.} =
let data = SalesAgent(machine).data let data = SalesAgent(machine).data
let market = SalesAgent(machine).context.market let market = SalesAgent(machine).context.market
without (collateral =? data.request.?ask.?collateral): without (fullCollateral =? data.request.?ask.?collateral):
raiseAssert "Request not set" raiseAssert "Request not set"
logScope: logScope:
requestId = data.requestId requestId = data.requestId
slotIndex = data.slotIndex slotIndex = data.slotIndex
let slotState = await market.slotState(slotId(data.requestId, data.slotIndex))
var collateral: Uint256
if slotState == SlotState.Repair:
# When repairing the node gets "discount" on the collateral that it needs to
let repairRewardPercentage = (await market.repairRewardPercentage).u256
collateral = fullCollateral - ((fullCollateral * repairRewardPercentage)).div(100.u256)
else:
collateral = fullCollateral
debug "Filling slot" debug "Filling slot"
try: try:
await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral) await market.fillSlot(data.requestId, data.slotIndex, state.proof, collateral)

View File

@ -49,7 +49,7 @@ method run*(state: SalePreparing, machine: Machine): Future[?State] {.async.} =
let slotId = slotId(data.requestId, data.slotIndex) let slotId = slotId(data.requestId, data.slotIndex)
let state = await market.slotState(slotId) let state = await market.slotState(slotId)
if state != SlotState.Free: if state != SlotState.Free and state != SlotState.Repair:
return some State(SaleIgnored(reprocessSlot: false, returnBytes: false)) return some State(SaleIgnored(reprocessSlot: false, returnBytes: false))
# TODO: Once implemented, check to ensure the host is allowed to fill the slot, # TODO: Once implemented, check to ensure the host is allowed to fill the slot,

View File

@ -16,6 +16,7 @@ logScope:
topics = "marketplace sales proving" topics = "marketplace sales proving"
type type
SlotFreedError* = object of CatchableError
SlotNotFilledError* = object of CatchableError SlotNotFilledError* = object of CatchableError
SaleProving* = ref object of ErrorHandlingState SaleProving* = ref object of ErrorHandlingState
loop: Future[void] loop: Future[void]
@ -82,6 +83,10 @@ proc proveLoop(
of SlotState.Cancelled: of SlotState.Cancelled:
debug "Slot reached cancelled state" debug "Slot reached cancelled state"
# do nothing, let onCancelled callback take care of it # do nothing, let onCancelled callback take care of it
of SlotState.Repair:
warn "Slot was forcible freed"
let message = "Slot was forcible freed and host was removed from its hosting"
raise newException(SlotFreedError, message)
of SlotState.Failed: of SlotState.Failed:
debug "Slot reached failed state" debug "Slot reached failed state"
# do nothing, let onFailed callback take care of it # do nothing, let onFailed callback take care of it

View File

@ -5,6 +5,7 @@ import ./filled
import ./finished import ./finished
import ./failed import ./failed
import ./errored import ./errored
import ./proving
import ./cancelled import ./cancelled
import ./payout import ./payout
@ -38,7 +39,7 @@ method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} =
case slotState case slotState
of SlotState.Free: of SlotState.Free:
let error = newException(UnexpectedSlotError, let error = newException(UnexpectedSlotError,
"slot state on chain should not be 'free'") "Slot state on chain should not be 'free'")
return some State(SaleErrored(error: error)) return some State(SaleErrored(error: error))
of SlotState.Filled: of SlotState.Filled:
return some State(SaleFilled()) return some State(SaleFilled())
@ -50,3 +51,7 @@ method run*(state: SaleUnknown, machine: Machine): Future[?State] {.async.} =
return some State(SaleFailed()) return some State(SaleFailed())
of SlotState.Cancelled: of SlotState.Cancelled:
return some State(SaleCancelled()) return some State(SaleCancelled())
of SlotState.Repair:
let error = newException(SlotFreedError,
"Slot was forcible freed and host was removed from its hosting")
return some State(SaleErrored(error: error))

View File

@ -1,12 +1,11 @@
import std/sugar import std/sugar
import pkg/questionable import pkg/questionable
import pkg/chronos import pkg/chronos
import pkg/upraises
import ../logutils import ../logutils
import ./then import ./then
import ./trackedfutures import ./trackedfutures
push: {.upraises:[].} {.push raises:[].}
type type
Machine* = ref object of RootObj Machine* = ref object of RootObj
@ -17,7 +16,7 @@ type
trackedFutures: TrackedFutures trackedFutures: TrackedFutures
State* = ref object of RootObj State* = ref object of RootObj
Query*[T] = proc(state: State): T Query*[T] = proc(state: State): T
Event* = proc(state: State): ?State {.gcsafe, upraises:[].} Event* = proc(state: State): ?State {.gcsafe, raises:[].}
logScope: logScope:
topics = "statemachine" topics = "statemachine"
@ -58,29 +57,31 @@ proc onError(machine: Machine, error: ref CatchableError): Event =
return proc (state: State): ?State = return proc (state: State): ?State =
state.onError(error) state.onError(error)
proc run(machine: Machine, state: State) {.async.} = proc run(machine: Machine, state: State) {.async: (raises:[]).} =
if next =? await state.run(machine): try:
machine.schedule(Event.transition(state, next)) if next =? await state.run(machine):
machine.schedule(Event.transition(state, next))
except CancelledError:
discard # do not propagate
except CatchableError as e:
machine.schedule(machine.onError(e))
proc scheduler(machine: Machine) {.async.} = proc scheduler(machine: Machine) {.async: (raises: []).} =
var running: Future[void] var running: Future[void].Raising([])
while machine.started: while machine.started:
let event = await machine.scheduled.get().track(machine) try:
if next =? event(machine.state): let event = await machine.scheduled.get()
if not running.isNil and not running.finished: if next =? event(machine.state):
trace "cancelling current state", state = $machine.state if not running.isNil and not running.finished:
await running.cancelAndWait() trace "cancelling current state", state = $machine.state
let fromState = if machine.state.isNil: "<none>" else: $machine.state await running.cancelAndWait()
machine.state = next let fromState = if machine.state.isNil: "<none>" else: $machine.state
debug "enter state", state = fromState & " => " & $machine.state machine.state = next
running = machine.run(machine.state) debug "enter state", state = fromState & " => " & $machine.state
running running = machine.run(machine.state)
.track(machine) asyncSpawn running.track(machine)
.cancelled(proc() = trace "state.run cancelled, swallowing", state = $machine.state) except CancelledError:
.catch(proc(err: ref CatchableError) = break # do not propagate bc it is asyncSpawned
trace "error caught in state.run, calling state.onError", state = $machine.state
machine.schedule(machine.onError(err))
)
proc start*(machine: Machine, initialState: State) = proc start*(machine: Machine, initialState: State) =
if machine.started: if machine.started:
@ -90,13 +91,8 @@ proc start*(machine: Machine, initialState: State) =
machine.scheduled = newAsyncQueue[Event]() machine.scheduled = newAsyncQueue[Event]()
machine.started = true machine.started = true
try: asyncSpawn machine.scheduler().track(machine)
discard machine.scheduler().track(machine) machine.schedule(Event.transition(machine.state, initialState))
machine.schedule(Event.transition(machine.state, initialState))
except CancelledError as e:
discard
except CatchableError as e:
error("Error in scheduler", error = e.msg)
proc stop*(machine: Machine) {.async.} = proc stop*(machine: Machine) {.async.} =
if not machine.started: if not machine.started:

View File

@ -1,9 +1,9 @@
import std/sugar
import std/tables import std/tables
import pkg/chronos import pkg/chronos
import ../logutils import ../logutils
import ../utils/then
{.push raises: [].}
type type
TrackedFutures* = ref object TrackedFutures* = ref object
@ -25,10 +25,10 @@ proc track*[T](self: TrackedFutures, fut: Future[T]): Future[T] =
self.futures[fut.id] = FutureBase(fut) self.futures[fut.id] = FutureBase(fut)
fut proc cb(udata: pointer) =
.then((val: T) => self.removeFuture(fut)) self.removeFuture(fut)
.cancelled(() => self.removeFuture(fut))
.catch((e: ref CatchableError) => self.removeFuture(fut)) fut.addCallback(cb)
return fut return fut
@ -38,15 +38,17 @@ proc track*[T, U](future: Future[T], self: U): Future[T] =
## `trackedFutures` property. ## `trackedFutures` property.
self.trackedFutures.track(future) self.trackedFutures.track(future)
proc cancelTracked*(self: TrackedFutures) {.async.} = proc cancelTracked*(self: TrackedFutures) {.async: (raises: []).} =
self.cancelling = true self.cancelling = true
trace "cancelling tracked futures" trace "cancelling tracked futures"
var cancellations: seq[FutureBase]
for future in self.futures.values: for future in self.futures.values:
if not future.isNil and not future.finished: if not future.isNil and not future.finished:
trace "cancelling tracked future", id = future.id cancellations.add future.cancelAndWait()
await future.cancelAndWait()
await noCancel allFutures cancellations
self.futures.clear() self.futures.clear()
self.cancelling = false self.cancelling = false

View File

@ -125,6 +125,9 @@ method proofTimeout*(market: MockMarket): Future[UInt256] {.async.} =
method proofDowntime*(market: MockMarket): Future[uint8] {.async.} = method proofDowntime*(market: MockMarket): Future[uint8] {.async.} =
return market.config.proofs.downtime return market.config.proofs.downtime
method repairRewardPercentage*(market: MockMarket): Future[uint8] {.async.} =
return market.config.collateral.repairRewardPercentage
method getPointer*(market: MockMarket, slotId: SlotId): Future[uint8] {.async.} = method getPointer*(market: MockMarket, slotId: SlotId): Future[uint8] {.async.} =
return market.proofPointer return market.proofPointer

View File

@ -66,6 +66,11 @@ ethersuite "On-Chain Market":
): ):
await advanceToNextPeriod() await advanceToNextPeriod()
test "caches marketplace configuration":
check isNone market.configuration
discard await market.periodicity()
check isSome market.configuration
test "fails to instantiate when contract does not have a signer": test "fails to instantiate when contract does not have a signer":
let storageWithoutSigner = marketplace.connect(ethProvider) let storageWithoutSigner = marketplace.connect(ethProvider)
expect AssertionDefect: expect AssertionDefect:
@ -298,7 +303,7 @@ ethersuite "On-Chain Market":
let slotId = request.slotId(slotIndex.u256) let slotId = request.slotId(slotIndex.u256)
while true: while true:
let slotState = await marketplace.slotState(slotId) let slotState = await marketplace.slotState(slotId)
if slotState == SlotState.Free: if slotState == SlotState.Repair or slotState == SlotState.Failed:
break break
await waitUntilProofRequired(slotId) await waitUntilProofRequired(slotId)
let missingPeriod = periodicity.periodOf(await ethProvider.currentTime()) let missingPeriod = periodicity.periodOf(await ethProvider.currentTime())

@ -1 +1 @@
Subproject commit 945f6008c8817abd7ca43a40368d33bb1e014c14 Subproject commit dfab6102e71d2acaff86af45b87be2536530c624