nim-dagger/codex/validation.nim
Marcin Czenko 5ace105a66
Validator - support partitioning of the slot id space (#890)
* Adds validatorPartitionSize and validatorPartitionIndex config options

* adds partitioning options to the validation type

* adds partitioning logic to the validator

* ignores partitionIndex when partitionSize is either 0 or 1

* clips the partition index to <<partitionIndex mod partitionSize>>

* handles negative values for the validation partition index

* updates long description of the new validator cli options

* makes default partitionSize to be 0 for better backward compatibility

* Improving formatting on validator CLI

* reactors validation params into a separate type and simplifies validation of validation params

* removes suspected duplication

* fixes typo in validator CLI help

* updates README

* Applies review comments - using optionals and range types to handle validation params

* Adds initializer to the configFactory for validatorMaxSlots

* [Review] update validator CLI description and README

* [Review]: renaming validationParams to validationConfig (config)

* [Review]: move validationconfig.nim to a higher level (next to validation.nim)

* changes backing type of MaxSlots to be int and makes sure slots are validated without limit when maxSlots is set to 0

* adds more end-to-end test for the validator and the groups

* fixes typo in README and conf.nim

* makes `maxSlotsConstraintRespected` and `shouldValidateSlot` private + updates the tests

* fixes public address of the signer account in the marketplace tutorial

* applies review comments - removes two tests
2024-10-02 22:00:40 +00:00

133 lines
4.3 KiB
Nim

import std/sets
import std/sequtils
import pkg/chronos
import pkg/questionable/results
import ./validationconfig
import ./market
import ./clock
import ./logutils
export market
export sets
export validationconfig
type
Validation* = ref object
slots: HashSet[SlotId]
clock: Clock
market: Market
subscriptions: seq[Subscription]
running: Future[void]
periodicity: Periodicity
proofTimeout: UInt256
config: ValidationConfig
logScope:
topics = "codex validator"
proc new*(
_: type Validation,
clock: Clock,
market: Market,
config: ValidationConfig
): Validation =
Validation(clock: clock, market: market, config: config)
proc slots*(validation: Validation): seq[SlotId] =
validation.slots.toSeq
proc getCurrentPeriod(validation: Validation): UInt256 =
return validation.periodicity.periodOf(validation.clock.now().u256)
proc waitUntilNextPeriod(validation: Validation) {.async.} =
let period = validation.getCurrentPeriod()
let periodEnd = validation.periodicity.periodEnd(period)
trace "Waiting until next period", currentPeriod = period
await validation.clock.waitUntil(periodEnd.truncate(int64) + 1)
func groupIndexForSlotId*(slotId: SlotId,
validationGroups: ValidationGroups): uint16 =
let slotIdUInt256 = UInt256.fromBytesBE(slotId.toArray)
(slotIdUInt256 mod validationGroups.u256).truncate(uint16)
func maxSlotsConstraintRespected(validation: Validation): bool =
validation.config.maxSlots == 0 or
validation.slots.len < validation.config.maxSlots
func shouldValidateSlot(validation: Validation, slotId: SlotId): bool =
if (validationGroups =? validation.config.groups):
(groupIndexForSlotId(slotId, validationGroups) ==
validation.config.groupIndex) and
validation.maxSlotsConstraintRespected
else:
validation.maxSlotsConstraintRespected
proc subscribeSlotFilled(validation: Validation) {.async.} =
proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) =
let slotId = slotId(requestId, slotIndex)
if validation.shouldValidateSlot(slotId):
trace "Adding slot", slotId
validation.slots.incl(slotId)
let subscription = await validation.market.subscribeSlotFilled(onSlotFilled)
validation.subscriptions.add(subscription)
proc removeSlotsThatHaveEnded(validation: Validation) {.async.} =
var ended: HashSet[SlotId]
let slots = validation.slots
for slotId in slots:
let state = await validation.market.slotState(slotId)
if state != SlotState.Filled:
trace "Removing slot", slotId
ended.incl(slotId)
validation.slots.excl(ended)
proc markProofAsMissing(validation: Validation,
slotId: SlotId,
period: Period) {.async.} =
logScope:
currentPeriod = validation.getCurrentPeriod()
try:
if await validation.market.canProofBeMarkedAsMissing(slotId, period):
trace "Marking proof as missing", slotId, periodProofMissed = period
await validation.market.markProofAsMissing(slotId, period)
else:
let inDowntime {.used.} = await validation.market.inDowntime(slotId)
trace "Proof not missing", checkedPeriod = period, inDowntime
except CancelledError:
raise
except CatchableError as e:
error "Marking proof as missing failed", msg = e.msg
proc markProofsAsMissing(validation: Validation) {.async.} =
let slots = validation.slots
for slotId in slots:
let previousPeriod = validation.getCurrentPeriod() - 1
await validation.markProofAsMissing(slotId, previousPeriod)
proc run(validation: Validation) {.async.} =
trace "Validation started"
try:
while true:
await validation.waitUntilNextPeriod()
await validation.removeSlotsThatHaveEnded()
await validation.markProofsAsMissing()
except CancelledError:
trace "Validation stopped"
discard
except CatchableError as e:
error "Validation failed", msg = e.msg
proc start*(validation: Validation) {.async.} =
validation.periodicity = await validation.market.periodicity()
validation.proofTimeout = await validation.market.proofTimeout()
await validation.subscribeSlotFilled()
validation.running = validation.run()
proc stop*(validation: Validation) {.async.} =
await validation.running.cancelAndWait()
while validation.subscriptions.len > 0:
let subscription = validation.subscriptions.pop()
await subscription.unsubscribe()