mirror of
https://github.com/codex-storage/nim-codex.git
synced 2025-01-20 17:58:53 +00:00
92a0eda79a
* adds a new overload of queryPastEvents allowing to query past events based on timestamp in the past * adds state restoration to validator * refactors a bit to get the tests back to work * replaces deprecated generic methods from Market with methods for specific event types * Refactors binary search * adds market tests for querying past SlotFilled events and binary search * Takes into account that <<earliest>> block available is not necessarily the genesis block * Adds more logging and makes testing earliest block boundary more reliable * adds validation tests for historical state restoration * adds mockprovider to simplify and improve testing of the edge conditions * adds slot reservation to the new tests after rebasing * adds validation groups and group index in logs of validator * adds integration test with two validators * adds comment on how to enable logging in integration test executable itself * testIntegration: makes list is running nodes injected and available in the body of the test * validation: adds integration test for historical state * adds more logging to validator * integration test: validator only looks 30 days back for historical state * adds logging of the slotState when removing slots during validation * review and refactor validator integration tests * adds validation to the set of integration tests * Fixes mistyped name of the mock provider module in testMarket * Fixes a typo in the name of the validation suite in integration tests * Makes validation unit test a bit easier to follow * better use of logScopes to reduce duplication * improves timing and clarifies the test conditions * uses http as default RPC provider for nodes running in integration tests as a workaround for dropped subscriptions * simplifies the validation integration tests by waiting for failed request instead of tracking slots * adds config option allowing selectively to set different provider url * Brings back the default settings for RPC provider in integration tests * 
use http RPC provider for clients in validation integration tests * fine-tune the tests * Makes validator integration test more robust - adds extra tracking * brings tracking of marketplace event back to validator integration test * refactors integration tests * deletes tmp file * adds <<return>> after forcing integration test to fail preliminarily * re-enables all integration tests and matrix * stops debug output in CI * allows to choose a different RPC provider for a given integration test suite * fixes signature of <<getBlock>> method in mockProvider * adds missing import which seems to be breaking integration tests on windows * makes sure that clients, SPs, and validators use the same provider url * makes validator integration tests using http at 127.0.0.1:8545 * testvalidator: stop resubscribing as we are now using http polling as rpc provider * applying review comments * groups queryPastStorage overrides together (review comment) * groups the historical validation tests into a sub suite * removes the temporary extensions in marketplacesuite and multinodesuite allowing to specify provider url * simplifies validation integration tests * Removes debug logs when waiting for request to fail * Renaming waitForRequestFailed => waitForRequestToFail * renames blockNumberForBlocksAgo to pastBlockTag and makes it private * removes redundant debugging logs * refines logging in validation * removes dev logging from mockmarket * improves exception handling in provider helper procs and prepares for extraction to a separate module * Uses chronos instead of std/times for Duration * extracts provider and binary search helpers to a separate module * removes redundant log entry params from validator * unifies the notation to consistently use method call syntax * reuses ProviderError from nim-ethers in the provider extension * clarifies the comment in multinodesuite * uses == operator to check the predefined tags and raises exception when `BlockTag.pending` is requested. 
* when waiting for request to fail, we break on any request state that is not Started * removes tests that were moved to testProvider from testMarket * extracts tests that use MockProvider to a separate async suite * improves performance of the historical state restoration * removing redundant log messages in validator (groupIndex and groups) * adds testProvider to testContracts group * removes unused import in testMarket
159 lines
5.4 KiB
Nim
159 lines
5.4 KiB
Nim
import std/sets
|
|
import std/sequtils
|
|
import pkg/chronos
|
|
import pkg/questionable/results
|
|
|
|
import ./validationconfig
|
|
import ./market
|
|
import ./clock
|
|
import ./logutils
|
|
|
|
export market
|
|
export sets
|
|
export validationconfig
|
|
|
|
type
  Validation* = ref object
    ## State of a running validator: the slots it watches plus the
    ## collaborators it uses to check them.
    slots: HashSet[SlotId]
      ## Ids of the slots currently being monitored.
    clock: Clock
      ## Source of the current time; also used to sleep until a period ends.
    market: Market
      ## Queried for slot state, past events and proof status.
    subscriptions: seq[Subscription]
      ## Event subscriptions opened by the validator; released in `stop`.
    running: Future[void]
      ## Future of the validation loop; nil until `start` is called.
    periodicity: Periodicity
      ## Fetched from the market in `start`; maps timestamps to periods.
    proofTimeout: UInt256
      ## Fetched from the market in `start`.
    config: ValidationConfig
      ## Supplies `maxSlots`, `groups` and `groupIndex`.
|
|
|
|
# How far back in time `restoreHistoricalState` looks for SlotFilled events.
const MaxStorageRequestDuration = days(30)
|
|
|
|
# Module-wide logging scope: sets the topics for log statements in this file.
logScope:
  topics = "codex validator"
|
|
|
|
proc new*(_: type Validation,
          clock: Clock,
          market: Market,
          config: ValidationConfig): Validation =
  ## Creates a validator that uses `clock`, `market` and `config`.
  ## All remaining fields keep their default (empty / nil) values until
  ## `start` is called.
  result = Validation(config: config, market: market, clock: clock)
|
|
|
|
proc slots*(validation: Validation): seq[SlotId] =
  ## Returns the ids of the currently monitored slots as a sequence,
  ## in the iteration order of the underlying hash set.
  result = newSeqOfCap[SlotId](validation.slots.len)
  for slotId in validation.slots:
    result.add(slotId)
|
|
|
|
proc getCurrentPeriod(validation: Validation): UInt256 =
  ## Returns the period that contains the clock's current time.
  let timestamp = validation.clock.now().u256
  validation.periodicity.periodOf(timestamp)
|
|
|
|
proc waitUntilNextPeriod(validation: Validation) {.async.} =
  ## Suspends until one time unit past the end of the current period.
  let
    currentPeriod = validation.getCurrentPeriod()
    periodEnd = validation.periodicity.periodEnd(currentPeriod)
    wakeUpAt = periodEnd.truncate(int64) + 1
  trace "Waiting until next period", currentPeriod
  await validation.clock.waitUntil(wakeUpAt)
|
|
|
|
func groupIndexForSlotId*(slotId: SlotId,
                          validationGroups: ValidationGroups): uint16 =
  ## Maps `slotId` to a validation group index.
  ##
  ## The bytes of the slot id are read as a big-endian 256-bit unsigned
  ## integer, which is then reduced modulo `validationGroups`.
  let
    slotNumber = UInt256.fromBytesBE(slotId.toArray)
    groupCount = validationGroups.u256
  truncate(slotNumber mod groupCount, uint16)
|
|
|
|
func maxSlotsConstraintRespected(validation: Validation): bool =
  ## True when another slot may still be added: either `maxSlots` is 0
  ## (treated here as "no limit") or fewer than `maxSlots` slots are
  ## currently monitored.
  let limit = validation.config.maxSlots
  if limit == 0: true
  else: validation.slots.len < limit
|
|
|
|
func shouldValidateSlot(validation: Validation, slotId: SlotId): bool =
  ## Decides whether this validator is responsible for `slotId`.
  ##
  ## When no validation groups are configured every slot qualifies;
  ## otherwise only slots whose group index equals the configured
  ## `groupIndex` do.
  if groupCount =? validation.config.groups:
    return groupIndexForSlotId(slotId, groupCount) ==
      validation.config.groupIndex
  true
|
|
|
|
proc subscribeSlotFilled(validation: Validation) {.async.} =
  ## Subscribes to the market's slot-filled notifications and remembers the
  ## subscription so that `stop` can release it later.
  proc onSlotFilled(requestId: RequestId, slotIndex: UInt256) =
    # Slots are only added while the maxSlots limit still leaves room and
    # only when they belong to this validator's group.
    if validation.maxSlotsConstraintRespected:
      let id = slotId(requestId, slotIndex)
      if validation.shouldValidateSlot(id):
        trace "Adding slot", slotId = id
        validation.slots.incl(id)

  let handle = await validation.market.subscribeSlotFilled(onSlotFilled)
  validation.subscriptions.add(handle)
|
|
|
|
proc removeSlotsThatHaveEnded(validation: Validation) {.async.} =
  ## Stops monitoring every slot whose market state is no longer `Filled`.
  # Iterate over a local binding instead of the field itself; presumably so
  # that slots added by the subscription callback while awaiting do not
  # disturb the iteration.
  let tracked = validation.slots
  var finished: HashSet[SlotId]
  for slotId in tracked:
    let slotState = await validation.market.slotState(slotId)
    if slotState == SlotState.Filled:
      continue
    trace "Removing slot", slotId, slotState
    finished.incl(slotId)
  # Removal happens in one go after all states have been queried.
  validation.slots.excl(finished)
|
|
|
|
proc markProofAsMissing(validation: Validation,
                        slotId: SlotId,
                        period: Period) {.async.} =
  ## Marks the proof of `slotId` for `period` as missing when the market
  ## reports that this is possible; otherwise only logs the outcome.
  ##
  ## Cancellation is propagated to the caller. Any other catchable error is
  ## logged and swallowed.
  logScope:
    currentPeriod = validation.getCurrentPeriod()

  try:
    let canBeMarked =
      await validation.market.canProofBeMarkedAsMissing(slotId, period)
    if not canBeMarked:
      # Queried purely for the log entry below.
      let inDowntime {.used.} = await validation.market.inDowntime(slotId)
      trace "Proof not missing", checkedPeriod = period, inDowntime
    else:
      trace "Marking proof as missing", slotId, periodProofMissed = period
      await validation.market.markProofAsMissing(slotId, period)
  except CancelledError:
    raise
  except CatchableError as err:
    error "Marking proof as missing failed", msg = err.msg
|
|
|
|
proc markProofsAsMissing(validation: Validation) {.async.} =
  ## Checks every monitored slot for a missing proof in the previous period.
  let tracked = validation.slots
  for slotId in tracked:
    # The previous period is recomputed on every iteration, because the
    # awaited call below may take long enough for the period to advance.
    await validation.markProofAsMissing(
      slotId, validation.getCurrentPeriod() - 1)
|
|
|
|
proc run(validation: Validation) {.async.} =
  ## The validation loop. Once per period it waits for the period to end,
  ## drops slots that are no longer filled and then checks the remaining
  ## slots for missing proofs.
  ##
  ## The loop ends quietly on cancellation. Any other catchable error is
  ## logged and also ends the loop.
  trace "Validation started"
  try:
    while true:
      await validation.waitUntilNextPeriod()
      await validation.removeSlotsThatHaveEnded()
      await validation.markProofsAsMissing()
  except CancelledError:
    # Cancellation is the regular way to stop; it is not re-raised.
    trace "Validation stopped"
    discard
  except CatchableError as err:
    error "Validation failed", msg = err.msg
|
|
|
|
proc epochForDurationBackFromNow(validation: Validation,
                                 duration: Duration): SecondsSince1970 =
  ## Returns the clock's current time minus `duration`, in whole seconds.
  let lookBackSeconds = duration.secs
  validation.clock.now - lookBackSeconds
|
|
|
|
proc restoreHistoricalState(validation: Validation) {.async.} =
  ## Rebuilds the set of monitored slots from past SlotFilled events.
  ##
  ## Events are queried starting `MaxStorageRequestDuration` before now. A
  ## slot is added when it is still `Filled` and belongs to this validator's
  ## group. Processing stops as soon as the maxSlots limit is reached.
  trace "Restoring historical state..."
  let earliest =
    validation.epochForDurationBackFromNow(MaxStorageRequestDuration)
  let pastEvents = await validation.market.queryPastSlotFilledEvents(
    fromTime = earliest)
  for event in pastEvents:
    if not validation.maxSlotsConstraintRespected:
      break
    let id = slotId(event.requestId, event.slotIndex)
    let state = await validation.market.slotState(id)
    if state != SlotState.Filled or not validation.shouldValidateSlot(id):
      continue
    trace "Adding slot [historical]", slotId = id
    validation.slots.incl(id)
  trace "Historical state restored", numberOfSlots = validation.slots.len
|
|
|
|
proc start*(validation: Validation) {.async.} =
  ## Starts the validator: fetches periodicity and proof timeout from the
  ## market, subscribes to slot-filled notifications, restores historical
  ## state and finally launches the validation loop.
  let config = validation.config
  trace "Starting validator", groups = config.groups,
    groupIndex = config.groupIndex
  validation.periodicity = await validation.market.periodicity()
  validation.proofTimeout = await validation.market.proofTimeout()
  # Subscribing happens before the historical restore.
  await validation.subscribeSlotFilled()
  await validation.restoreHistoricalState()
  # The loop is not awaited; its future is kept so `stop` can cancel it.
  validation.running = validation.run()
|
|
|
|
proc stop*(validation: Validation) {.async.} =
  ## Stops the validator: cancels the validation loop if it was started and
  ## then releases all subscriptions, most recently added first.
  let loop = validation.running
  if not loop.isNil:
    await loop.cancelAndWait()
  while validation.subscriptions.len != 0:
    let handle = validation.subscriptions.pop()
    await handle.unsubscribe()
|