introduce light client processor (#3509)

Adds `LightClientProcessor` as the pendant to `BlockProcessor` while
operating in light client mode. Note that a similar mechanism based on
async futures is used for interoperability with existing infrastructure,
despite light client object validation being done synchronously.
This commit is contained in:
Etan Kissling 2022-03-17 23:26:56 +01:00 committed by GitHub
parent 9f8894fb43
commit 12dc427535
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 581 additions and 3 deletions

View File

@ -254,6 +254,16 @@ OK: 9/9 Fail: 0/9 Skip: 0/9
+ Pre-Altair OK + Pre-Altair OK
``` ```
OK: 2/3 Fail: 0/3 Skip: 1/3 OK: 2/3 Fail: 0/3 Skip: 1/3
## Light client processor [Preset: mainnet]
```diff
+ Duplicate bootstrap [Preset: mainnet] OK
+ Forced update [Preset: mainnet] OK
+ Invalid bootstrap [Preset: mainnet] OK
+ Missing bootstrap (optimistic update) [Preset: mainnet] OK
+ Missing bootstrap (update) [Preset: mainnet] OK
+ Standard sync [Preset: mainnet] OK
```
OK: 6/6 Fail: 0/6 Skip: 0/6
## ListKeys requests [Preset: mainnet] ## ListKeys requests [Preset: mainnet]
```diff ```diff
+ Correct token provided [Preset: mainnet] OK + Correct token provided [Preset: mainnet] OK
@ -519,4 +529,4 @@ OK: 1/1 Fail: 0/1 Skip: 0/1
OK: 1/1 Fail: 0/1 Skip: 0/1 OK: 1/1 Fail: 0/1 Skip: 0/1
---TOTAL--- ---TOTAL---
OK: 287/293 Fail: 0/293 Skip: 6/293 OK: 293/299 Fail: 0/299 Skip: 6/299

View File

@ -0,0 +1,280 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [Defect].}
import
stew/objects,
chronos, metrics,
../spec/datatypes/altair,
../spec/light_client_sync,
../consensus_object_pools/block_pools_types,
".."/[beacon_clock],
../sszdump
export sszdump
# Light Client Processor
# ------------------------------------------------------------------------------
# The light client processor handles received light client objects
declareHistogram light_client_store_object_duration_seconds,
"storeObject() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
type
DidInitializeStoreCallback* =
proc() {.gcsafe, raises: [Defect].}
LightClientProcessor* = object
## This manages the processing of received light client objects
##
## from:
## - Gossip (`OptimisticLightClientUpdate`)
## - SyncManager (`BestLightClientUpdatesByRange`)
## - LightClientManager (`GetLatestLightClientUpdate`,
## `GetOptimisticLightClientUpdate`, `GetLightClientBootstrap`)
##
## are then verified and added to:
## - `LightClientStore`
##
## The processor will also attempt to force-update the light client state
## if no update seems to be available on the network, that is both signed by
## a supermajority of sync committee members and also has a finality proof.
## This logic is triggered if there is no progress for an extended period
## of time, and there are repeated messages indicating that this is the best
## available data on the network during that time period.
# Config
# ----------------------------------------------------------------
dumpEnabled: bool
dumpDirInvalid: string
dumpDirIncoming: string
# Consumer
# ----------------------------------------------------------------
store: ref Option[LightClientStore]
getBeaconTime*: GetBeaconTimeFn
didInitializeStoreCallback: DidInitializeStoreCallback
cfg: RuntimeConfig
genesisValidatorsRoot: Eth2Digest
trustedBlockRoot: Eth2Digest
lastProgressTick: BeaconTime # Moment when last update made progress
lastDuplicateTick: BeaconTime # Moment when last duplicate update received
numDuplicatesSinceProgress: int # Number of duplicates since last progress
const
# These constants have been chosen empirically and are not backed by spec
duplicateRateLimit = chronos.seconds(5) # Rate limit for counting duplicates
duplicateCountDelay = chronos.minutes(15) # Delay to start counting duplicates
minForceUpdateDelay = chronos.minutes(30) # Minimum delay until forced-update
minForceUpdateDuplicates = 100 # Minimum duplicates until forced-update
# Initialization
# ------------------------------------------------------------------------------
proc new*(
T: type LightClientProcessor,
dumpEnabled: bool,
dumpDirInvalid, dumpDirIncoming: string,
cfg: RuntimeConfig,
genesisValidatorsRoot, trustedBlockRoot: Eth2Digest,
store: ref Option[LightClientStore],
getBeaconTime: GetBeaconTimeFn,
didInitializeStoreCallback: DidInitializeStoreCallback = nil
): ref LightClientProcessor =
(ref LightClientProcessor)(
dumpEnabled: dumpEnabled,
dumpDirInvalid: dumpDirInvalid,
dumpDirIncoming: dumpDirIncoming,
store: store,
getBeaconTime: getBeaconTime,
didInitializeStoreCallback: didInitializeStoreCallback,
cfg: cfg,
genesisValidatorsRoot: genesisValidatorsRoot,
trustedBlockRoot: trustedBlockRoot
)
# Storage
# ------------------------------------------------------------------------------
proc dumpInvalidObject(
self: LightClientProcessor,
obj: SomeLightClientObject) =
if self.dumpEnabled:
dump(self.dumpDirInvalid, obj)
proc dumpObject[T](
self: LightClientProcessor,
obj: SomeLightClientObject,
res: Result[T, BlockError]) =
if self.dumpEnabled and res.isErr:
case res.error
of BlockError.Invalid:
self.dumpInvalidObject(obj)
of BlockError.MissingParent:
dump(self.dumpDirIncoming, obj)
else:
discard
proc tryForceUpdate(
self: var LightClientProcessor,
wallTime: BeaconTime) =
## Try to force-update to the next sync committee period.
let
wallSlot = wallTime.slotOrZero()
store = self.store
if store[].isSome:
case store[].get.process_slot_for_light_client_store(wallSlot)
of NoUpdate:
discard
of UpdatedWithoutSupermajority:
warn "Light client force-update without supermajority",
localHeadSlot = store[].get.optimistic_header.slot,
finalized = store[].get.finalized_header
of UpdatedWithoutFinalityProof:
warn "Light client force-update without finality proof",
localHeadSlot = store[].get.optimistic_header.slot,
finalized = store[].get.finalized_header
proc storeObject*(
self: var LightClientProcessor,
src: MsgSource, wallTime: BeaconTime,
obj: SomeLightClientObject): Result[void, BlockError] =
## storeObject is the main entry point for unvalidated light client objects -
## all untrusted objects pass through here. When storing an object, we will
## update the `LightClientStore` accordingly
let
startTick = Moment.now()
wallSlot = wallTime.slotOrZero()
store = self.store
res =
when obj is altair.LightClientBootstrap:
if store[].isSome:
err(BlockError.Duplicate)
else:
let initRes = initialize_light_client_store(
self.trustedBlockRoot, obj)
if initRes.isErr:
err(initRes.error)
else:
store[] = some(initRes.get)
ok()
elif obj is altair.LightClientUpdate:
if store[].isNone:
err(BlockError.MissingParent)
else:
store[].get.process_light_client_update(
obj, wallSlot, self.cfg, self.genesisValidatorsRoot,
allowForceUpdate = false)
elif obj is altair.OptimisticLightClientUpdate:
if store[].isNone:
err(BlockError.MissingParent)
else:
store[].get.process_optimistic_light_client_update(
obj, wallSlot, self.cfg, self.genesisValidatorsRoot)
self.dumpObject(obj, res)
if res.isErr:
when obj is altair.LightClientUpdate:
if store[].isSome and store[].get.best_valid_update.isSome:
# `best_valid_update` gets set when no supermajority / finality proof
# is available. In that case, we will wait for a better update.
# If none is made available within reasonable time, the light client
# is force-updated using the best known data to ensure sync progress.
case res.error
of BlockError.Duplicate:
if wallTime >= self.lastDuplicateTick + duplicateRateLimit:
if self.numDuplicatesSinceProgress < minForceUpdateDuplicates:
let
finalized_period =
store[].get.finalized_header.slot.sync_committee_period
update_period =
obj.get_active_header().slot.sync_committee_period
is_next_sync_committee_known =
not store[].get.next_sync_committee.isZeroMemory
update_can_advance_period =
if is_next_sync_committee_known:
update_period == finalized_period + 1
else:
update_period == finalized_period
if update_can_advance_period:
self.lastDuplicateTick = wallTime
inc self.numDuplicatesSinceProgress
if self.numDuplicatesSinceProgress >= minForceUpdateDuplicates and
wallTime >= self.lastProgressTick + minForceUpdateDelay:
self.tryForceUpdate(wallTime)
self.lastProgressTick = wallTime
self.lastDuplicateTick = wallTime + duplicateCountDelay
self.numDuplicatesSinceProgress = 0
else: discard
return res
when obj is altair.LightClientBootstrap | altair.LightClientUpdate:
self.lastProgressTick = wallTime
self.lastDuplicateTick = wallTime + duplicateCountDelay
self.numDuplicatesSinceProgress = 0
let
storeObjectTick = Moment.now()
storeObjectDur = storeObjectTick - startTick
light_client_store_object_duration_seconds.observe(
storeObjectDur.toFloatSeconds())
let objSlot =
when obj is altair.LightClientBootstrap:
obj.header.slot
else:
obj.attested_header.slot
debug "Light client object processed", kind = typeof(obj).name,
localHeadSlot = store[].get.optimistic_header.slot,
objectSlot = objSlot,
storeObjectDur
when obj is altair.LightClientBootstrap:
if self.didInitializeStoreCallback != nil:
self.didInitializeStoreCallback()
self.didInitializeStoreCallback = nil
res
# Enqueue
# ------------------------------------------------------------------------------
proc addObject*(
self: var LightClientProcessor,
src: MsgSource,
obj: SomeLightClientObject,
resfut: Future[Result[void, BlockError]] = nil) =
## Enqueue a Gossip-validated light client object for verification
# Backpressure:
# Only one object is validated at any time -
# Light client objects are always "fast" to process
# Producers:
# - Gossip (`OptimisticLightClientUpdate`)
# - SyncManager (`BestLightClientUpdatesByRange`)
# - LightClientManager (`GetLatestLightClientUpdate`,
# `GetOptimisticLightClientUpdate`, `GetLightClientBootstrap`)
let
wallTime = self.getBeaconTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
error "Processing light client object before genesis, clock turned back?"
quit 1
let res = self.storeObject(src, wallTime, obj)
if resFut != nil:
resFut.complete(res)

View File

@ -207,6 +207,11 @@ type
is_signed_by_next_sync_committee*: bool ##\ is_signed_by_next_sync_committee*: bool ##\
## Whether the signature was produced by `attested_header`'s next sync committee ## Whether the signature was produced by `attested_header`'s next sync committee
SomeLightClientObject* =
LightClientBootstrap |
LightClientUpdate |
OptimisticLightClientUpdate
# https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/altair/sync-protocol.md#lightclientstore # https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/altair/sync-protocol.md#lightclientstore
LightClientStore* = object LightClientStore* = object
finalized_header*: BeaconBlockHeader ##\ finalized_header*: BeaconBlockHeader ##\

View File

@ -45,11 +45,11 @@ func period_contains_fork_version(
false false
# https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/altair/sync-protocol.md#get_active_header # https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/altair/sync-protocol.md#get_active_header
func is_finality_update(update: altair.LightClientUpdate): bool = func is_finality_update*(update: altair.LightClientUpdate): bool =
not update.finalized_header.isZeroMemory not update.finalized_header.isZeroMemory
# https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/altair/sync-protocol.md#get_active_header # https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/altair/sync-protocol.md#get_active_header
func get_active_header(update: altair.LightClientUpdate): BeaconBlockHeader = func get_active_header*(update: altair.LightClientUpdate): BeaconBlockHeader =
# The "active header" is the header that the update is trying to convince # The "active header" is the header that the update is trying to convince
# us to accept. If a finalized header is present, it's the finalized # us to accept. If a finalized header is present, it's the finalized
# header, otherwise it's the attested header # header, otherwise it's the attested header

View File

@ -13,6 +13,8 @@ import
./spec/[beaconstate, eth2_ssz_serialization, eth2_merkleization, forks], ./spec/[beaconstate, eth2_ssz_serialization, eth2_merkleization, forks],
./spec/datatypes/[phase0, altair] ./spec/datatypes/[phase0, altair]
from spec/light_client_sync import is_finality_update
export export
beaconstate, eth2_ssz_serialization, eth2_merkleization, forks beaconstate, eth2_ssz_serialization, eth2_merkleization, forks
@ -46,3 +48,38 @@ proc dump*(dir: string, v: ForkyHashedBeaconState) =
proc dump*(dir: string, v: SyncCommitteeMessage, validator: ValidatorPubKey) = proc dump*(dir: string, v: SyncCommitteeMessage, validator: ValidatorPubKey) =
logErrors: logErrors:
SSZ.saveFile(dir / &"sync-committee-msg-{v.slot}-{shortLog(validator)}.ssz", v) SSZ.saveFile(dir / &"sync-committee-msg-{v.slot}-{shortLog(validator)}.ssz", v)
proc dump*(dir: string, v: altair.LightClientBootstrap) =
logErrors:
let
prefix = "bootstrap"
slot = v.header.slot
blck = shortLog(v.header.hash_tree_root())
root = shortLog(v.hash_tree_root())
SSZ.saveFile(
dir / &"{prefix}-{slot}-{blck}-{root}.ssz", v)
proc dump*(dir: string, v: altair.LightClientUpdate) =
logErrors:
let
prefix = "update"
attestedSlot = v.attested_header.slot
attestedBlck = shortLog(v.attested_header.hash_tree_root())
suffix =
if v.is_finality_update:
"f"
else:
"o"
root = shortLog(v.hash_tree_root())
SSZ.saveFile(
dir / &"{prefix}-{attestedSlot}-{attestedBlck}-{suffix}-{root}.ssz", v)
proc dump*(dir: string, v: OptimisticLightClientUpdate) =
logErrors:
let
prefix = "optimistic-update"
attestedSlot = v.attested_header.slot
attestedBlck = shortLog(v.attested_header.hash_tree_root())
root = shortLog(v.hash_tree_root())
SSZ.saveFile(
dir / &"{prefix}-{attestedSlot}-{attestedBlck}-{root}.ssz", v)

View File

@ -32,6 +32,7 @@ import # Unit test
./test_honest_validator, ./test_honest_validator,
./test_interop, ./test_interop,
./test_light_client, ./test_light_client,
./test_light_client_processor,
./test_message_signatures, ./test_message_signatures,
./test_peer_pool, ./test_peer_pool,
./test_spec, ./test_spec,

View File

@ -0,0 +1,245 @@
# beacon_chain
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.used.}
import
# Status libraries
chronos, eth/keys,
# Beacon chain internals
../beacon_chain/consensus_object_pools/
[block_clearance, block_quarantine, blockchain_dag],
../beacon_chain/gossip_processing/light_client_processor,
../beacon_chain/spec/[beacon_time, light_client_sync, state_transition],
# Test utilities
./testutil, ./testdbutil
suite "Light client processor" & preset():
let
cfg = block:
var res = defaultRuntimeConfig
res.ALTAIR_FORK_EPOCH = GENESIS_EPOCH + 1
res
const numValidators = SLOTS_PER_EPOCH
let
validatorMonitor = newClone(ValidatorMonitor.init())
dag = ChainDAGRef.init(
cfg, makeTestDB(numValidators), validatorMonitor, {},
serveLightClientData = true,
importLightClientData = ImportLightClientData.OnlyNew)
quarantine = newClone(Quarantine.init())
taskpool = TaskPool.new()
var verifier = BatchVerifier(rng: keys.newRng(), taskpool: taskpool)
var cache: StateCache
proc addBlocks(blocks: uint64, syncCommitteeRatio: float) =
for blck in makeTestBlocks(dag.headState, cache, blocks.int,
attested = true, syncCommitteeRatio, cfg):
let added =
case blck.kind
of BeaconBlockFork.Phase0:
const nilCallback = OnPhase0BlockAdded(nil)
dag.addHeadBlock(verifier, blck.phase0Data, nilCallback)
of BeaconBlockFork.Altair:
const nilCallback = OnAltairBlockAdded(nil)
dag.addHeadBlock(verifier, blck.altairData, nilCallback)
of BeaconBlockFork.Bellatrix:
const nilCallback = OnBellatrixBlockAdded(nil)
dag.addHeadBlock(verifier, blck.bellatrixData, nilCallback)
doAssert added.isOk()
dag.updateHead(added[], quarantine[])
addBlocks(SLOTS_PER_EPOCH, 0.75)
let
genesisValidatorsRoot = dag.genesisValidatorsRoot
trustedBlockRoot = dag.head.root
const
lowPeriod = 0.SyncCommitteePeriod
lastPeriodWithSupermajority = 3.SyncCommitteePeriod
highPeriod = 5.SyncCommitteePeriod
for period in lowPeriod .. highPeriod:
const numFilledEpochsPerPeriod = 3
let slot = ((period + 1).start_epoch - numFilledEpochsPerPeriod).start_slot
var info: ForkedEpochInfo
doAssert process_slots(cfg, dag.headState, slot,
cache, info, flags = {}).isOk()
let syncCommitteeRatio =
if period > lastPeriodWithSupermajority:
0.25
else:
0.75
addBlocks(numFilledEpochsPerPeriod * SLOTS_PER_EPOCH, syncCommitteeRatio)
setup:
var time = chronos.seconds(0)
proc getBeaconTime(): BeaconTime =
BeaconTime(ns_since_genesis: time.nanoseconds)
func setTimeToSlot(slot: Slot) =
time = chronos.seconds((slot * SECONDS_PER_SLOT).int64)
var numDidInitializeStoreCalls = 0
proc didInitializeStore() = inc numDidInitializeStoreCalls
let store = (ref Option[LightClientStore])()
var
processor = LightClientProcessor.new(
false, "", "", cfg, genesisValidatorsRoot, trustedBlockRoot,
store, getBeaconTime, didInitializeStore)
res: Result[void, BlockError]
test "Standard sync" & preset():
let bootstrap = dag.getLightClientBootstrap(trustedBlockRoot)
check bootstrap.isSome
setTimeToSlot(bootstrap.get.header.slot)
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), bootstrap.get)
check:
res.isOk
numDidInitializeStoreCalls == 1
for period in lowPeriod .. lastPeriodWithSupermajority:
let update = dag.getBestLightClientUpdateForPeriod(period)
check update.isSome
setTimeToSlot(update.get.attested_header.slot + 1)
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), update.get)
check:
res.isOk
store[].isSome
store[].get.finalized_header == update.get.finalized_header
store[].get.optimistic_header == update.get.attested_header
test "Forced update" & preset():
let bootstrap = dag.getLightClientBootstrap(trustedBlockRoot)
check bootstrap.isSome
setTimeToSlot(bootstrap.get.header.slot)
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), bootstrap.get)
check:
res.isOk
numDidInitializeStoreCalls == 1
for period in lowPeriod .. lastPeriodWithSupermajority:
let update = dag.getBestLightClientUpdateForPeriod(period)
check update.isSome
setTimeToSlot(update.get.attested_header.slot + 1)
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), update.get)
check:
res.isOk
store[].isSome
store[].get.finalized_header == update.get.finalized_header
store[].get.optimistic_header == update.get.attested_header
for period in lastPeriodWithSupermajority + 1 .. highPeriod:
let update = dag.getBestLightClientUpdateForPeriod(period)
check update.isSome
setTimeToSlot(update.get.attested_header.slot + 1)
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), update.get)
check:
res.isOk
store[].isSome
store[].get.best_valid_update.isSome
store[].get.best_valid_update.get == update.get
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), update.get)
check:
res.isErr
res.error == BlockError.Duplicate
store[].isSome
store[].get.best_valid_update.isSome
store[].get.best_valid_update.get == update.get
time += chronos.minutes(15)
for _ in 0 ..< 150:
time += chronos.seconds(5)
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), update.get)
check:
res.isErr
res.error == BlockError.Duplicate
store[].isSome
store[].get.best_valid_update.isSome
store[].get.best_valid_update.get == update.get
time += chronos.minutes(15)
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), update.get)
check:
res.isErr
res.error == BlockError.Duplicate
store[].isSome
store[].get.best_valid_update.isNone
store[].get.finalized_header == update.get.finalized_header
let optimisticUpdate = dag.getOptimisticLightClientUpdate()
check optimisticUpdate.isSome
setTimeToSlot(optimisticUpdate.get.attested_header.slot + 1)
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), optimisticUpdate.get)
if res.isOk:
check:
store[].isSome
store[].get.optimistic_header == optimisticUpdate.get.attested_header
else:
check res.error == BlockError.Duplicate
check numDidInitializeStoreCalls == 1
test "Invalid bootstrap" & preset():
var bootstrap = dag.getLightClientBootstrap(trustedBlockRoot)
check bootstrap.isSome
bootstrap.get.header.slot.inc()
setTimeToSlot(bootstrap.get.header.slot)
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), bootstrap.get)
check:
res.isErr
res.error == BlockError.Invalid
numDidInitializeStoreCalls == 0
test "Duplicate bootstrap" & preset():
let bootstrap = dag.getLightClientBootstrap(trustedBlockRoot)
check bootstrap.isSome
setTimeToSlot(bootstrap.get.header.slot)
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), bootstrap.get)
check:
res.isOk
numDidInitializeStoreCalls == 1
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), bootstrap.get)
check:
res.isErr
res.error == BlockError.Duplicate
numDidInitializeStoreCalls == 1
test "Missing bootstrap (update)" & preset():
let update = dag.getBestLightClientUpdateForPeriod(lowPeriod)
check update.isSome
setTimeToSlot(update.get.attested_header.slot + 1)
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), update.get)
check:
res.isErr
res.error == BlockError.MissingParent
numDidInitializeStoreCalls == 0
test "Missing bootstrap (optimistic update)" & preset():
let optimisticUpdate = dag.getOptimisticLightClientUpdate()
check optimisticUpdate.isSome
setTimeToSlot(optimisticUpdate.get.attested_header.slot + 1)
res = processor[].storeObject(
MsgSource.gossip, getBeaconTime(), optimisticUpdate.get)
check:
res.isErr
res.error == BlockError.MissingParent
numDidInitializeStoreCalls == 0