mirror of
https://github.com/logos-storage/logos-storage-nim.git
synced 2026-01-02 13:33:10 +00:00
* cleanup imports and logs * add BlockHandle type * revert deps * refactor: async error handling and future tracking improvements - Update async procedures to use explicit raises annotation - Modify TrackedFutures to handle futures with no raised exceptions - Replace `asyncSpawn` with explicit future tracking - Update test suites to use `unittest2` - Standardize error handling across network and async components - Remove deprecated error handling patterns This commit introduces a more robust approach to async error handling and future management, improving type safety and reducing potential runtime errors. * bump nim-serde * remove asyncSpawn * rework background downloads and prefetch * improve logging * refactor: enhance async procedures with error handling and raise annotations * misc cleanup * misc * refactor: implement allFinishedFailed to aggregate future results with success and failure tracking * refactor: update error handling in reader procedures to raise ChunkerError and CancelledError * refactor: improve error handling in wantListHandler and accountHandler procedures * refactor: simplify LPStreamReadError creation by consolidating parameters * refactor: enhance error handling in AsyncStreamWrapper to catch unexpected errors * refactor: enhance error handling in advertiser and discovery loops to improve resilience * misc * refactor: improve code structure and readability * remove cancellation from addSlotToQueue * refactor: add assertion for unexpected errors in local store checks * refactor: prevent tracking of finished futures and improve test assertions * refactor: improve error handling in local store checks * remove usage of msgDetail * feat: add initial implementation of discovery engine and related components * refactor: improve task scheduling logic by removing unnecessary break statement * break after scheduling a task * make taskHandler cancelable * refactor: update async handlers to raise CancelledError * refactor(advertiser): streamline error 
handling and improve task flow in advertise loops * fix: correct spelling of "divisible" in error messages and comments * refactor(discovery): simplify discovery task loop and improve error handling * refactor(engine): filter peers before processing in cancelBlocks procedure
152 lines
5.5 KiB
Nim
152 lines
5.5 KiB
Nim
import std/sets
|
|
import std/sequtils
|
|
import pkg/chronos
|
|
import pkg/questionable/results
|
|
import pkg/stew/endians2
|
|
|
|
import ./validationconfig
|
|
import ./market
|
|
import ./clock
|
|
import ./logutils
|
|
|
|
export market
|
|
export sets
|
|
export validationconfig
|
|
|
|
type
  Validation* = ref object
    ## State of a validator that watches filled slots and marks missed
    ## proofs through the market.
    slots: HashSet[SlotId] ## ids of the slots currently being watched
    clock: Clock ## time source used to derive periods and to sleep
    market: Market ## queried for slot state and used to mark proofs
    subscriptions: seq[Subscription]
      ## market subscriptions created in `start`, released in `stop`
    running: Future[void] ## future of the `run` loop; nil until `start`
    periodicity: Periodicity ## fetched from the market in `start`
    proofTimeout: uint64 ## fetched from the market in `start`
    config: ValidationConfig ## max slots and optional group partitioning
|
|
|
|
# Every log statement in this module is tagged with the validator topic.
logScope:
  topics = "codex validator"
|
|
|
|
proc new*(
    _: type Validation, clock: Clock, market: Market, config: ValidationConfig
): Validation =
  ## Creates a validator bound to the given clock, market and configuration.
  ##
  ## All other fields keep their zero values: no slots are tracked, nothing
  ## is subscribed and no loop is running until `start` is called.
  result = Validation(config: config, market: market, clock: clock)
|
|
|
|
proc slots*(validation: Validation): seq[SlotId] =
  ## Returns the ids of the currently tracked slots as a fresh sequence.
  toSeq(validation.slots)
|
|
|
|
proc getCurrentPeriod(validation: Validation): Period =
  ## Returns the period that contains the clock's current time, according to
  ## the periodicity stored on the validator.
  let currentTime = Timestamp(validation.clock.now())
  validation.periodicity.periodOf(currentTime)
|
|
|
|
proc waitUntilNextPeriod(validation: Validation) {.async.} =
  ## Suspends until one time unit past the end of the current period.
  let
    current = validation.getCurrentPeriod()
    wakeUpAt = validation.periodicity.periodEnd(current) + 1
  trace "Waiting until next period", currentPeriod = current
  await validation.clock.waitUntil(wakeUpAt.toSecondsSince1970)
|
|
|
|
func groupIndexForSlotId*(slotId: SlotId, validationGroups: ValidationGroups): uint16 =
  ## Maps a slot id onto a validation group index.
  ##
  ## The slot id bytes are decoded as a big-endian `uint64` and reduced
  ## modulo the number of validation groups.
  # NOTE(review): `fromBytesBE` presumably consumes only the leading
  # `sizeof(uint64)` bytes of the id - confirm against stew/endians2.
  let
    idBytes = slotId.toArray
    idAsUint = uint64.fromBytesBE(idBytes)
    groupCount = uint64(validationGroups)
  uint16(idAsUint mod groupCount)
|
|
|
|
func maxSlotsConstraintRespected(validation: Validation): bool =
  ## Tells whether one more slot may be tracked.
  ##
  ## A configured `maxSlots` of 0 disables the limit; otherwise the number of
  ## tracked slots has to be strictly below `maxSlots`.
  let limit = validation.config.maxSlots
  if limit == 0:
    return true
  return validation.slots.len < limit
|
|
|
|
func shouldValidateSlot(validation: Validation, slotId: SlotId): bool =
  ## Decides whether this validator is responsible for `slotId`.
  ##
  ## When no validation groups are configured every slot qualifies;
  ## otherwise the slot's group index has to equal the configured
  ## `groupIndex`.
  if groups =? validation.config.groups:
    result = groupIndexForSlotId(slotId, groups) == validation.config.groupIndex
  else:
    result = true
|
|
|
|
proc subscribeSlotFilled(validation: Validation) {.async.} =
  ## Subscribes to the market's slot-filled notifications and remembers the
  ## subscription so that `stop` can release it.
  proc onSlotFilled(requestId: RequestId, slotIndex: uint64) =
    # Track the slot only while there is capacity left and only when the
    # slot belongs to this validator.
    if validation.maxSlotsConstraintRespected:
      let filledSlotId = slotId(requestId, slotIndex)
      if validation.shouldValidateSlot(filledSlotId):
        trace "Adding slot", slotId = filledSlotId
        validation.slots.incl(filledSlotId)

  let slotFilledSubscription =
    await validation.market.subscribeSlotFilled(onSlotFilled)
  validation.subscriptions.add(slotFilledSubscription)
|
|
|
|
proc removeSlotsThatHaveEnded(validation: Validation) {.async.} =
  ## Stops tracking every slot whose market state is no longer `Filled`.
  # Iterate over a local copy: `validation.slots` can change while this proc
  # is suspended on the market call.
  let tracked = validation.slots
  var noLongerFilled: HashSet[SlotId]
  for slotId in tracked:
    let currentState = await validation.market.slotState(slotId)
    if currentState == SlotState.Filled:
      continue
    trace "Removing slot", slotId, slotState = currentState
    noLongerFilled.incl(slotId)
  validation.slots.excl(noLongerFilled)
|
|
|
|
proc markProofAsMissing(
    validation: Validation, slotId: SlotId, period: Period
) {.async.} =
  ## Marks the proof of `slotId` for `period` as missing when the market
  ## allows it.
  ##
  ## Cancellation is re-raised; any other catchable error is logged and
  ## swallowed so that a single failing slot does not abort the caller.
  logScope:
    currentPeriod = validation.getCurrentPeriod()

  try:
    let canBeMarked =
      await validation.market.canProofBeMarkedAsMissing(slotId, period)
    if not canBeMarked:
      # Only used for the trace below, hence `{.used.}`.
      let inDowntime {.used.} = await validation.market.inDowntime(slotId)
      trace "Proof not missing", checkedPeriod = period, inDowntime
    else:
      trace "Marking proof as missing", slotId, periodProofMissed = period
      await validation.market.markProofAsMissing(slotId, period)
  except CancelledError:
    raise
  except CatchableError as err:
    error "Marking proof as missing failed", msg = err.msg
|
|
|
|
proc markProofsAsMissing(validation: Validation) {.async.} =
  ## Checks every tracked slot for a missed proof in the previous period.
  # Work on a local copy, as the tracked set can change across awaits.
  let tracked = validation.slots
  for slotId in tracked:
    # The period is re-read for every slot, after the previous slot's awaits.
    let periodToCheck = validation.getCurrentPeriod() - 1
    await validation.markProofAsMissing(slotId, periodToCheck)
|
|
|
|
proc run(validation: Validation) {.async: (raises: []).} =
  ## Main validation loop.
  ##
  ## Once per period it waits for the next period to begin, drops slots that
  ## are no longer filled and then checks the remaining slots for missed
  ## proofs.
  ##
  ## The proc is declared with `raises: []`: cancellation ends the loop
  ## quietly, and any other catchable error is logged and also ends the loop.
  ## The resulting future is kept in `validation.running` by `start` and is
  ## cancelled by `stop`.
  trace "Validation started"
  try:
    while true:
      await validation.waitUntilNextPeriod()
      await validation.removeSlotsThatHaveEnded()
      await validation.markProofsAsMissing()
  except CancelledError:
    # Expected when `stop` cancels the loop; not propagated because the
    # future is declared as non-raising.
    trace "Validation stopped"
  except CatchableError as e:
    error "Validation failed", msg = e.msg
|
|
|
|
proc findEpoch(validation: Validation, secondsAgo: uint64): SecondsSince1970 =
  ## Returns the clock time that lies `secondsAgo` seconds before now.
  let currentTime = validation.clock.now()
  currentTime - int64(secondsAgo)
|
|
|
|
proc restoreHistoricalState(validation: Validation) {.async.} =
  ## Rebuilds the set of tracked slots from past slot-filled events.
  ##
  ## Events are queried starting `requestDurationLimit` seconds before now.
  ## A slot is added when it belongs to this validator, is still `Filled`
  ## on the market and the `maxSlots` limit leaves room for it.
  trace "Restoring historical state..."
  let requestDurationLimit = await validation.market.requestDurationLimit
  let startTimeEpoch = validation.findEpoch(secondsAgo = requestDurationLimit)
  let slotFilledEvents =
    await validation.market.queryPastSlotFilledEvents(fromTime = startTimeEpoch)
  for event in slotFilledEvents:
    if not validation.maxSlotsConstraintRespected:
      break
    let slotId = slotId(event.requestId, event.slotIndex)
    # Apply the local filters before the market round trip: slots that are
    # already tracked or that belong to another group need no state query.
    if slotId in validation.slots or not validation.shouldValidateSlot(slotId):
      continue
    let slotState = await validation.market.slotState(slotId)
    if slotState != SlotState.Filled:
      continue
    # The slot-filled subscription is already active, so it may have added
    # slots while this proc was suspended above; re-check the limit so the
    # set cannot grow past `maxSlots`.
    if not validation.maxSlotsConstraintRespected:
      break
    trace "Adding slot [historical]", slotId
    validation.slots.incl(slotId)
  trace "Historical state restored", numberOfSlots = validation.slots.len
|
|
|
|
proc start*(validation: Validation) {.async.} =
  ## Starts the validator.
  ##
  ## Fetches the periodicity and proof timeout from the market, subscribes to
  ## slot-filled notifications, restores slots from past events and finally
  ## launches the `run` loop, keeping its future in `running`.
  let cfg = validation.config
  trace "Starting validator", groups = cfg.groups, groupIndex = cfg.groupIndex

  # Market parameters used by the period calculations.
  validation.periodicity = await validation.market.periodicity()
  validation.proofTimeout = await validation.market.proofTimeout()

  # Subscribe before replaying history, then start the loop.
  await validation.subscribeSlotFilled()
  await validation.restoreHistoricalState()
  validation.running = validation.run()
|
|
|
|
proc stop*(validation: Validation) {.async.} =
  ## Stops the validator: cancels the `run` loop when it is still active and
  ## releases every market subscription.
  let runner = validation.running
  if not (runner.isNil or runner.finished):
    await runner.cancelAndWait()

  # Subscriptions are released last-in, first-out.
  while validation.subscriptions.len > 0:
    await validation.subscriptions.pop().unsubscribe()
|