Mirror of https://github.com/status-im/nimbus-eth2.git (synced 2025-01-10 22:36:01 +00:00)
REST /eth/v1/events API call implementation. (#2878)

* Placing callbacks into strategic places.
* Initial events call implementation.
* Post rebase fixes.
* Change addSyncContribution() implementation.
* Add `attestation-sent` event. Remove gcsafe, raises from callback
  implementations. Move the `attestation-received` fire to the end of
  attestation processing.
* Address review comments.

Commit: b566d4657f
Parent: 0b21ebfe74
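The shape of the change, end to end: each message pool and the ChainDAG now accept an optional callback at init time, the beacon node wires those callbacks to a chronos AsyncEventBus, and the new /eth/v1/events handler streams bus events to REST clients. A minimal self-contained sketch of that pattern, with a toy EventBus standing in for chronos' AsyncEventBus (all names below are illustrative, not the real nimbus-eth2 API):

import std/tables

type
  Attestation = object              # stand-in for the real spec type
    slot: uint64
  OnAttestationCallback = proc(data: Attestation) {.gcsafe.}
  AttestationPool = object
    onAttestationAdded: OnAttestationCallback  # optional; may be nil
  EventBus = ref object
    subscribers: Table[string, seq[OnAttestationCallback]]

proc emit(bus: EventBus, topic: string, data: Attestation) =
  # Deliver the event to every subscriber of `topic`.
  for sub in bus.subscribers.getOrDefault(topic):
    sub(data)

proc init(T: type AttestationPool,
          onAttestation: OnAttestationCallback = nil): T =
  T(onAttestationAdded: onAttestation)

proc addAttestation(pool: AttestationPool, a: Attestation) =
  # ... real pool bookkeeping would happen here ...
  if not isNil(pool.onAttestationAdded):  # notify subscribers last
    pool.onAttestationAdded(a)

when isMainModule:
  let bus = EventBus()
  var subs: seq[OnAttestationCallback]
  subs.add(proc(a: Attestation) = echo "attestation at slot ", a.slot)
  bus.subscribers["attestation-received"] = subs
  let pool = AttestationPool.init(
    proc(a: Attestation) = bus.emit("attestation-received", a))
  pool.addAttestation(Attestation(slot: 42))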
@@ -56,6 +57,7 @@ type
     eth1Monitor*: Eth1Monitor
     rpcServer*: RpcServer
     restServer*: RestServerRef
+    eventBus*: AsyncEventBus
     vcProcess*: Process
     requestManager*: RequestManager
     syncManager*: SyncManager[Peer, PeerID]
@@ -68,6 +69,7 @@ type
     gossipState*: GossipState
     beaconClock*: BeaconClock
     taskpool*: TaskPoolPtr
+    onAttestationSent*: OnAttestationCallback

 const
   MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT
@@ -29,6 +29,8 @@ type
   # Attestation Pool
   #
   # #############################################
+  OnAttestationCallback* = proc(data: Attestation) {.gcsafe, raises: [Defect].}
+
   Validation* = object
     ## Validations collect a set of signatures for a distinct attestation - in
     ## eth2, a single bit is used to keep track of which signatures have been
@@ -75,6 +77,8 @@ type
     nextAttestationEpoch*: seq[tuple[subnet: Epoch, aggregate: Epoch]] ## \
       ## sequence based on validator indices

+    onAttestationAdded*: OnAttestationCallback
+
   SyncCommitteeMsgKey* = object
     originator*: ValidatorIndex
     slot*: Slot
@@ -96,14 +100,21 @@ type
     subnets*: array[SYNC_COMMITTEE_SUBNET_COUNT,
                     BestSyncSubcommitteeContribution]

+  OnSyncContributionCallback* =
+    proc(data: SignedContributionAndProof) {.gcsafe, raises: [Defect].}
+
   SyncCommitteeMsgPool* = object
     seenSyncMsgByAuthor*: HashSet[SyncCommitteeMsgKey]
     seenContributionByAuthor*: HashSet[SyncCommitteeMsgKey]
     syncMessages*: Table[Eth2Digest, seq[TrustedSyncCommitteeMsg]]
     bestContributions*: Table[Eth2Digest, BestSyncSubcommitteeContributions]
+    onContributionReceived*: OnSyncContributionCallback

   SyncCommitteeMsgPoolRef* = ref SyncCommitteeMsgPool

+  OnVoluntaryExitCallback* =
+    proc(data: SignedVoluntaryExit) {.gcsafe, raises: [Defect].}
+
   ExitPool* = object
     ## The exit pool tracks attester slashings, proposer slashings, and
     ## voluntary exits that could be added to a proposed block.
@@ -128,6 +139,8 @@ type

     dag*: ChainDAGRef

+    onVoluntaryExitReceived*: OnVoluntaryExitCallback
+
   # #############################################
   #
   # Validator Pool
@@ -27,7 +27,9 @@ logScope: topics = "attpool"
 declareGauge attestation_pool_block_attestation_packing_time,
   "Time it took to create list of attestations for block"

-proc init*(T: type AttestationPool, dag: ChainDAGRef, quarantine: QuarantineRef): T =
+proc init*(T: type AttestationPool, dag: ChainDAGRef,
+           quarantine: QuarantineRef,
+           onAttestation: OnAttestationCallback = nil): T =
   ## Initialize an AttestationPool from the dag `headState`
   ## The `finalized_root` works around the finalized_checkpoint of the genesis block
   ## holding a zero_root.
@@ -92,7 +94,8 @@ proc init*(T: type AttestationPool, dag: ChainDAGRef, quarantine: QuarantineRef)
   T(
     dag: dag,
     quarantine: quarantine,
-    forkChoice: forkChoice
+    forkChoice: forkChoice,
+    onAttestationAdded: onAttestation
   )

 proc addForkChoiceVotes(
@@ -307,6 +310,10 @@ proc addAttestation*(pool: var AttestationPool,
     attestation.data.slot, attesting_indices,
     attestation.data.beacon_block_root, wallSlot)

+  # Send notification about new attestation via callback.
+  if not(isNil(pool.onAttestationAdded)):
+    pool.onAttestationAdded(attestation)
+
 proc addForkChoice*(pool: var AttestationPool,
                     epochRef: EpochRef,
                     blckRef: BlockRef,
@@ -180,6 +180,8 @@ proc addResolvedBlock(
   # notifications for parents happen before those of the children
   if onBlockAdded != nil:
     onBlockAdded(blockRef, trustedBlock, epochRef)
+  if not(isNil(dag.onBlockAdded)):
+    dag.onBlockAdded(ForkedTrustedSignedBeaconBlock.init(trustedBlock))

   resolveQuarantinedBlocks(dag, quarantine, onBlockAdded)

@@ -48,6 +48,15 @@ type
     Old
     Duplicate

+  OnBlockCallback* =
+    proc(data: ForkedTrustedSignedBeaconBlock) {.gcsafe, raises: [Defect].}
+  OnHeadCallback* =
+    proc(data: HeadChangeInfoObject) {.gcsafe, raises: [Defect].}
+  OnReorgCallback* =
+    proc(data: ReorgInfoObject) {.gcsafe, raises: [Defect].}
+  OnFinalizedCallback* =
+    proc(data: FinalizationInfoObject) {.gcsafe, raises: [Defect].}
+
   QuarantineRef* = ref object
     ## Keeps track of unsafe blocks coming from the network
     ## and that cannot be added to the chain
@@ -175,6 +184,15 @@ type
       ## value with other components which don't have access to the
       ## full ChainDAG.

+    onBlockAdded*: OnBlockCallback
+      ## On block added callback
+    onHeadChanged*: OnHeadCallback
+      ## On head changed callback
+    onReorgHappened*: OnReorgCallback
+      ## On beacon chain reorganization
+    onFinHappened*: OnFinalizedCallback
+      ## On finalization callback
+
   EpochKey* = object
     ## The epoch key fully determines the shuffling for proposers and
     ## committees in a beacon state - the epoch level information in the state
@@ -242,6 +260,27 @@ type
       blck: altair.TrustedSignedBeaconBlock,
       epochRef: EpochRef) {.gcsafe, raises: [Defect].}

+  HeadChangeInfoObject* = object
+    slot*: Slot
+    block_root*: Eth2Digest
+    state_root*: Eth2Digest
+    epoch_transition*: bool
+    previous_duty_dependent_root*: Eth2Digest
+    current_duty_dependent_root*: Eth2Digest
+
+  ReorgInfoObject* = object
+    slot*: Slot
+    depth*: uint64
+    old_head_block_root*: Eth2Digest
+    new_head_block_root*: Eth2Digest
+    old_head_state_root*: Eth2Digest
+    new_head_state_root*: Eth2Digest
+
+  FinalizationInfoObject* = object
+    block_root*: Eth2Digest
+    state_root*: Eth2Digest
+    epoch*: Epoch
+
 template head*(dag: ChainDAGRef): BlockRef = dag.headState.blck

 template epoch*(e: EpochRef): Epoch = e.key.epoch
@@ -293,3 +332,37 @@ func init*(T: type KeyedBlockRef, blck: BlockRef): KeyedBlockRef =

 func blockRef*(key: KeyedBlockRef): BlockRef =
   key.data
+
+func init*(t: typedesc[HeadChangeInfoObject], slot: Slot, blockRoot: Eth2Digest,
+           stateRoot: Eth2Digest, epochTransition: bool,
+           previousDutyDepRoot: Eth2Digest,
+           currentDutyDepRoot: Eth2Digest): HeadChangeInfoObject =
+  HeadChangeInfoObject(
+    slot: slot,
+    block_root: blockRoot,
+    state_root: stateRoot,
+    epoch_transition: epochTransition,
+    previous_duty_dependent_root: previousDutyDepRoot,
+    current_duty_dependent_root: currentDutyDepRoot
+  )
+
+func init*(t: typedesc[ReorgInfoObject], slot: Slot, depth: uint64,
+           oldHeadBlockRoot: Eth2Digest, newHeadBlockRoot: Eth2Digest,
+           oldHeadStateRoot: Eth2Digest,
+           newHeadStateRoot: Eth2Digest): ReorgInfoObject =
+  ReorgInfoObject(
+    slot: slot,
+    depth: depth,
+    old_head_block_root: oldHeadBlockRoot,
+    new_head_block_root: newHeadBlockRoot,
+    old_head_state_root: oldHeadStateRoot,
+    new_head_state_root: newHeadStateRoot
+  )
+
+func init*(t: typedesc[FinalizationInfoObject], blockRoot: Eth2Digest,
+           stateRoot: Eth2Digest, epoch: Epoch): FinalizationInfoObject =
+  FinalizationInfoObject(
+    block_root: blockRoot,
+    state_root: stateRoot,
+    epoch: epoch
+  )
@@ -187,12 +187,13 @@ func link*(parent, child: BlockRef) =

   child.parent = parent

-func isAncestorOf*(a, b: BlockRef): bool =
+func getDepth*(a, b: BlockRef): tuple[ancestor: bool, depth: int] =
   var b = b
   var depth = 0
   const maxDepth = (100'i64 * 365 * 24 * 60 * 60 div SECONDS_PER_SLOT.int)
   while true:
-    if a == b: return true
+    if a == b:
+      return (true, depth)

     # for now, use an assert for block chain length since a chain this long
     # indicates a circular reference here..
@@ -200,11 +201,15 @@ func isAncestorOf*(a, b: BlockRef): bool =
     depth += 1

     if a.slot >= b.slot or b.parent.isNil:
-      return false
+      return (false, depth)

     doAssert b.slot > b.parent.slot
     b = b.parent

+func isAncestorOf*(a, b: BlockRef): bool =
+  let (isAncestor, _) = getDepth(a, b)
+  isAncestor
+
 func get_ancestor*(blck: BlockRef, slot: Slot,
     maxDepth = 100'i64 * 365 * 24 * 60 * 60 div SECONDS_PER_SLOT.int):
     BlockRef =
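The getDepth refactor generalizes the old isAncestorOf: it walks b's parent links toward a, counting the hops, and updateHead later reuses that count as the reorg depth. A self-contained sketch of the same walk over a toy block type (ToyBlock is illustrative, not the real BlockRef, and the hop accounting is slightly simplified):

type ToyBlock = ref object
  slot: int
  parent: ToyBlock

func getDepth(a, b: ToyBlock): tuple[ancestor: bool, depth: int] =
  # Walk from b toward genesis until we meet a or pass a's slot.
  var b = b
  var depth = 0
  while true:
    if a == b: return (true, depth)
    if b.parent == nil or a.slot >= b.slot: return (false, depth)
    b = b.parent
    depth += 1

let genesis = ToyBlock(slot: 0)
let c1 = ToyBlock(slot: 1, parent: genesis)
let c2 = ToyBlock(slot: 2, parent: c1)
assert getDepth(c1, c2) == (true, 1)   # c1 is one hop above c2
assert getDepth(c2, c1) == (false, 0)  # c2 is not an ancestor of c1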
@@ -325,10 +330,10 @@ func isStateCheckpoint(bs: BlockSlot): bool =
     (bs.slot == bs.blck.slot and bs.blck.parent == nil) or
     (bs.slot.isEpoch and bs.slot.epoch == (bs.blck.slot.epoch + 1))

-proc init*(T: type ChainDAGRef,
-           cfg: RuntimeConfig,
-           db: BeaconChainDB,
-           updateFlags: UpdateFlags): ChainDAGRef =
+proc init*(T: type ChainDAGRef, cfg: RuntimeConfig, db: BeaconChainDB,
+           updateFlags: UpdateFlags, onBlockCb: OnBlockCallback = nil,
+           onHeadCb: OnHeadCallback = nil, onReorgCb: OnReorgCallback = nil,
+           onFinCb: OnFinalizedCallback = nil): ChainDAGRef =
   # TODO we require that the db contains both a head and a tail block -
   # asserting here doesn't seem like the right way to go about it however..

@@ -452,6 +457,11 @@ proc init*(T: type ChainDAGRef,
     # allow skipping some validation.
     updateFlags: {verifyFinalization} * updateFlags,
     cfg: cfg,
+
+    onBlockAdded: onBlockCb,
+    onHeadChanged: onHeadCb,
+    onReorgHappened: onReorgCb,
+    onFinHappened: onFinCb
   )

   doAssert cfg.GENESIS_FORK_VERSION != cfg.ALTAIR_FORK_VERSION
@@ -1156,11 +1166,11 @@ proc updateHead*(

   if dag.head == newHead:
     trace "No head block update"
-
     return

   let
     lastHead = dag.head
+    lastHeadStateRoot = getStateRoot(dag.headState.data)

   # Start off by making sure we have the right state - updateStateData will try
   # to use existing in-memory states to make this smooth
@@ -1177,7 +1187,8 @@ proc updateHead*(
   doAssert (not finalizedHead.blck.isNil),
     "Block graph should always lead to a finalized block"

-  if not lastHead.isAncestorOf(newHead):
+  let (isAncestor, ancestorDepth) = lastHead.getDepth(newHead)
+  if not(isAncestor):
     notice "Updated head block with chain reorg",
       lastHead = shortLog(lastHead),
       headParent = shortLog(newHead.parent),
@@ -1189,6 +1200,13 @@ proc updateHead*(
       finalized = shortLog(getStateField(
         dag.headState.data, finalized_checkpoint))

+    if not(isNil(dag.onReorgHappened)):
+      let data = ReorgInfoObject.init(dag.head.slot, uint64(ancestorDepth),
+                                      lastHead.root, newHead.root,
+                                      lastHeadStateRoot,
+                                      getStateRoot(dag.headState.data))
+      dag.onReorgHappened(data)
+
     # A reasonable criterion for "reorganizations of the chain"
     quarantine.clearQuarantine()
     beacon_reorgs_total.inc()
@@ -1202,6 +1220,28 @@ proc updateHead*(
       finalized = shortLog(getStateField(
         dag.headState.data, finalized_checkpoint))

+    if not(isNil(dag.onHeadChanged)):
+      let currentEpoch = epoch(newHead.slot)
+      let
+        currentDutyDepRoot =
+          if currentEpoch > Epoch(0):
+            dag.head.atSlot(
+              compute_start_slot_at_epoch(currentEpoch) - 1).blck.root
+          else:
+            dag.genesis.root
+        previousDutyDepRoot =
+          if currentEpoch > Epoch(1):
+            dag.head.atSlot(
+              compute_start_slot_at_epoch(currentEpoch - 1) - 1).blck.root
+          else:
+            dag.genesis.root
+        epochTransition = (finalizedHead != dag.finalizedHead)
+      let data = HeadChangeInfoObject.init(dag.head.slot, dag.head.root,
+                                           getStateRoot(dag.headState.data),
+                                           epochTransition, previousDutyDepRoot,
+                                           currentDutyDepRoot)
+      dag.onHeadChanged(data)
+
   # https://github.com/ethereum/eth2.0-metrics/blob/master/metrics.md#additional-metrics
   # both non-negative, so difference can't overflow or underflow int64
   beacon_pending_deposits.set(
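A quick worked example of the dependent-root slot arithmetic above, assuming the mainnet preset SLOTS_PER_EPOCH = 32 (the helper below mirrors compute_start_slot_at_epoch; plain ints stand in for the real Slot/Epoch types):

const SLOTS_PER_EPOCH = 32

func computeStartSlotAtEpoch(epoch: int): int =
  epoch * SLOTS_PER_EPOCH

# For a head in epoch 10, duties depend on the blocks at (or before):
assert computeStartSlotAtEpoch(10) - 1 == 319      # current_duty_dependent_root slot
assert computeStartSlotAtEpoch(10 - 1) - 1 == 287  # previous_duty_dependent_root slot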
@@ -1251,6 +1291,17 @@ proc updateHead*(
     # therefore no longer be considered as part of the chain we're following
     dag.pruneBlocksDAG()

+  # Send notification about new finalization point via callback.
+  if not(isNil(dag.onFinHappened)):
+    let epoch = getStateField(
+      dag.headState.data, finalized_checkpoint).epoch
+    let blckRoot = getStateField(
+      dag.headState.data, finalized_checkpoint).root
+    let data = FinalizationInfoObject.init(blckRoot,
+                                           getStateRoot(dag.headState.data),
+                                           epoch)
+    dag.onFinHappened(data)
+
 proc isInitialized*(T: type ChainDAGRef, db: BeaconChainDB): bool =
   let
     headBlockRoot = db.getHeadBlock()
@@ -27,8 +27,8 @@ const
   PROPOSER_SLASHINGS_BOUND* = MAX_PROPOSER_SLASHINGS * 2
   VOLUNTARY_EXITS_BOUND* = MAX_VOLUNTARY_EXITS * 2

-proc init*(
-    T: type ExitPool, dag: ChainDAGRef): T =
+proc init*(T: type ExitPool, dag: ChainDAGRef,
+           onVoluntaryExit: OnVoluntaryExitCallback = nil): T =
   ## Initialize an ExitPool from the dag `headState`
   T(
     # Allow for filtering out some exit messages during block production
@@ -39,6 +39,7 @@ proc init*(
     voluntary_exits:
       initDeque[SignedVoluntaryExit](initialSize = VOLUNTARY_EXITS_BOUND.int),
     dag: dag,
+    onVoluntaryExitReceived: onVoluntaryExit
   )

 func addExitMessage*(subpool: var auto, exitMessage, bound: auto) =
@@ -29,8 +29,10 @@ const
     ## How many slots to retain sync committee
     ## messages before discarding them.

-func init*(T: type SyncCommitteeMsgPool): SyncCommitteeMsgPool =
-  discard
+func init*(T: type SyncCommitteeMsgPool,
+           onSyncContribution: OnSyncContributionCallback = nil
+          ): SyncCommitteeMsgPool =
+  T(onContributionReceived: onSyncContribution)

 func init(T: type SyncAggregate): SyncAggregate =
   SyncAggregate(sync_committee_signature: ValidatorSig.infinity)
@@ -131,10 +133,9 @@ func addAggregateAux(bestVotes: var BestSyncSubcommitteeContributions,
       participationBits: contribution.aggregation_bits,
       signature: contribution.signature.load.get)

-func addSyncContribution*(
-    pool: var SyncCommitteeMsgPool,
-    contribution: SyncCommitteeContribution,
-    signature: CookedSig) =
+proc addSyncContribution*(pool: var SyncCommitteeMsgPool,
+                          contribution: SyncCommitteeContribution,
+                          signature: CookedSig) =

   template blockRoot: auto = contribution.beacon_block_root

@@ -156,6 +157,13 @@ func addSyncContribution*(
   except KeyError:
     raiseAssert "We have checked for the key upfront"

+proc addSyncContribution*(pool: var SyncCommitteeMsgPool,
+                          scproof: SignedContributionAndProof,
+                          signature: CookedSig) =
+  pool.addSyncContribution(scproof.message.contribution, signature)
+  if not(isNil(pool.onContributionReceived)):
+    pool.onContributionReceived(scproof)
+
 proc produceSyncAggregateAux(
     bestContributions: BestSyncSubcommitteeContributions): SyncAggregate =
   var
@@ -686,7 +686,6 @@ proc validateAttesterSlashing*(
   pool.prior_seen_attester_slashed_indices.incl attester_slashed_indices
   pool.attester_slashings.addExitMessage(
     attester_slashing, ATTESTER_SLASHINGS_BOUND)
-
   ok(true)

 # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#proposer_slashing
@@ -715,7 +714,6 @@ proc validateProposerSlashing*(
     proposer_slashing.signed_header_1.message.proposer_index.int)
   pool.proposer_slashings.addExitMessage(
     proposer_slashing, PROPOSER_SLASHINGS_BOUND)
-
   ok(true)

 # https://github.com/ethereum/eth2.0-specs/blob/v1.0.1/specs/phase0/p2p-interface.md#voluntary_exit
@@ -748,6 +746,10 @@ proc validateVoluntaryExit*(
   pool.voluntary_exits.addExitMessage(
     signed_voluntary_exit, VOLUNTARY_EXITS_BOUND)

+  # Send notification about new voluntary exit via callback
+  if not(isNil(pool.onVoluntaryExitReceived)):
+    pool.onVoluntaryExitReceived(signed_voluntary_exit)
+
   ok()

 # https://github.com/ethereum/eth2.0-specs/blob/v1.1.0-alpha.8/specs/altair/p2p-interface.md#sync_committee_subnet_id
@@ -946,8 +948,6 @@ proc validateSignedContributionAndProof*(
     return errReject(
       "SignedContributionAndProof: aggregate signature fails to verify")

-  syncCommitteeMsgPool[].addSyncContribution(
-    msg.message.contribution,
-    cookedSignature.get)
+  syncCommitteeMsgPool[].addSyncContribution(msg, cookedSignature.get)

   ok()
@@ -127,12 +127,30 @@ proc init*(T: type BeaconNode,
     raise newException(Defect, "Failure in taskpool initialization.")

   let
+    eventBus = newAsyncEventBus()
     db = BeaconChainDB.new(config.databaseDir, inMemory = false)

   var
     genesisState, checkpointState: ref phase0.BeaconState
     checkpointBlock: phase0.TrustedSignedBeaconBlock

+  proc onAttestationReceived(data: Attestation) =
+    eventBus.emit("attestation-received", data)
+  proc onAttestationSent(data: Attestation) =
+    eventBus.emit("attestation-sent", data)
+  proc onVoluntaryExitAdded(data: SignedVoluntaryExit) =
+    eventBus.emit("voluntary-exit", data)
+  proc onBlockAdded(data: ForkedTrustedSignedBeaconBlock) =
+    eventBus.emit("signed-beacon-block", data)
+  proc onHeadChanged(data: HeadChangeInfoObject) =
+    eventBus.emit("head-change", data)
+  proc onChainReorg(data: ReorgInfoObject) =
+    eventBus.emit("chain-reorg", data)
+  proc onFinalization(data: FinalizationInfoObject) =
+    eventBus.emit("finalization", data)
+  proc onSyncContribution(data: SignedContributionAndProof) =
+    eventBus.emit("sync-contribution-and-proof", data)
+
   if config.finalizedCheckpointState.isSome:
     let checkpointStatePath = config.finalizedCheckpointState.get.string
     checkpointState = try:
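For reference, the internal event-bus topics emitted by the closures above pair up with the SSE event names used later in installEventApiHandlers (both columns appear verbatim in this diff; note that "attestation-sent" is emitted but not exposed through the REST handlers, which subscribe only to "attestation-received"). The mapping, written out as an illustrative Nim constant:

const eventTopicToSseName = [
  ("head-change",                 "head"),
  ("signed-beacon-block",         "block"),
  ("attestation-received",        "attestation"),
  ("voluntary-exit",              "voluntary_exit"),
  ("finalization",                "finalized_checkpoint"),
  ("chain-reorg",                 "chain_reorg"),
  ("sync-contribution-and-proof", "contribution_and_proof")
]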
@@ -255,7 +273,8 @@ proc init*(T: type BeaconNode,
   let
     chainDagFlags = if config.verifyFinalization: {verifyFinalization}
                     else: {}
-    dag = ChainDAGRef.init(cfg, db, chainDagFlags)
+    dag = ChainDAGRef.init(cfg, db, chainDagFlags, onBlockAdded, onHeadChanged,
+                           onChainReorg, onFinalization)
     quarantine = QuarantineRef.init(rng, taskpool)
     databaseGenesisValidatorsRoot =
       getStateField(dag.headState.data, genesis_validators_root)
@@ -330,9 +349,13 @@ proc init*(T: type BeaconNode,
     network = createEth2Node(
       rng, config, netKeys, cfg, dag.forkDigests, getBeaconTime,
       getStateField(dag.headState.data, genesis_validators_root))
-    attestationPool = newClone(AttestationPool.init(dag, quarantine))
-    syncCommitteeMsgPool = newClone(SyncCommitteeMsgPool.init())
-    exitPool = newClone(ExitPool.init(dag))
+    attestationPool = newClone(
+      AttestationPool.init(dag, quarantine, onAttestationReceived)
+    )
+    syncCommitteeMsgPool = newClone(
+      SyncCommitteeMsgPool.init(onSyncContribution)
+    )
+    exitPool = newClone(ExitPool.init(dag, onVoluntaryExitAdded))

   case config.slashingDbKind
   of SlashingDbKind.v2:
@@ -382,12 +405,14 @@ proc init*(T: type BeaconNode,
     eth1Monitor: eth1Monitor,
     rpcServer: rpcServer,
     restServer: restServer,
+    eventBus: eventBus,
     processor: processor,
     blockProcessor: blockProcessor,
     consensusManager: consensusManager,
     requestManager: RequestManager.init(network, blockProcessor),
     beaconClock: beaconClock,
-    taskpool: taskpool
+    taskpool: taskpool,
+    onAttestationSent: onAttestationSent
   )

   # set topic validation routine
@@ -42,17 +42,59 @@ proc validateEventTopics(events: seq[EventTopic]): Result[EventTopics,
       if EventTopic.ChainReorg in res:
         return err(NonUniqueError)
       res.incl(EventTopic.ChainReorg)
+    of EventTopic.ContributionAndProof:
+      if EventTopic.ContributionAndProof in res:
+        return err(NonUniqueError)
+      res.incl(EventTopic.ContributionAndProof)
   if res == {}:
     err("Empty topics list")
   else:
     ok(res)

+proc eventHandler*(response: HttpResponseRef, node: BeaconNode,
+                   T: typedesc, event: string,
+                   serverEvent: string) {.async.} =
+  var fut = node.eventBus.waitEvent(T, event)
+  while true:
+    let jsonRes =
+      try:
+        let res = await fut
+        when T is ForkedTrustedSignedBeaconBlock:
+          let blockInfo = RestBlockInfo.init(res)
+          some(RestApiResponse.prepareJsonStringResponse(blockInfo))
+        else:
+          some(RestApiResponse.prepareJsonStringResponse(res))
+      except CancelledError:
+        none[string]()
+    if jsonRes.isNone() or (response.state != HttpResponseState.Sending):
+      # Cancellation happened or the connection with the remote peer has been
+      # lost.
+      break
+    # Initiate a new event wait to avoid race conditions and missed events.
+    fut = node.eventBus.waitEvent(T, event)
+    # Send the event name and payload over the wire.
+    let exitLoop =
+      try:
+        await response.sendEvent(serverEvent, jsonRes.get())
+        false
+      except CancelledError:
+        true
+      except HttpError as exc:
+        debug "Unable to deliver event to remote peer", error_name = $exc.name,
+              error_msg = $exc.msg
+        true
+      except CatchableError as exc:
+        debug "Unexpected error encountered", error_name = $exc.name,
+              error_msg = $exc.msg
+        true
+    if exitLoop:
+      if not(fut.finished()):
+        await fut.cancelAndWait()
+      break

 proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) =
   # https://ethereum.github.io/beacon-APIs/#/Events/eventstream
   router.api(MethodGet, "/api/eth/v1/events") do (
     topics: seq[EventTopic]) -> RestApiResponse:
-    # TODO (cheatfate): This call is not fully implemented yet, because there
-    # missing infrastructure to raise/catch global events (eventbus).
     let eventTopics =
       block:
         if topics.isErr():
@@ -64,7 +106,72 @@ proc installEventApiHandlers*(router: var RestRouter, node: BeaconNode) =
                                           $res.error())
         res.get()

-    return RestApiResponse.jsonError(Http500, "Not implemented yet")
+    let res = preferredContentType("text/event-stream")
+    if res.isErr():
+      return RestApiResponse.jsonError(Http406, ContentNotAcceptableError)
+    if res.get() != "text/event-stream":
+      return RestApiResponse.jsonError(Http500, InvalidAcceptError)
+
+    var response = request.getResponse()
+    response.keepAlive = false
+    try:
+      await response.prepareSSE()
+    except HttpError:
+      # The server failed to send the HTTP response to the remote client, so
+      # there is no need to respond with an HTTP error response.
+      return
+
+    let handlers =
+      block:
+        var res: seq[Future[void]]
+        if EventTopic.Head in eventTopics:
+          let handler = response.eventHandler(node, HeadChangeInfoObject,
+                                              "head-change", "head")
+          res.add(handler)
+        if EventTopic.Block in eventTopics:
+          let handler = response.eventHandler(node,
+                                              ForkedTrustedSignedBeaconBlock,
+                                              "signed-beacon-block", "block")
+          res.add(handler)
+        if EventTopic.Attestation in eventTopics:
+          let handler = response.eventHandler(node, Attestation,
+                                              "attestation-received",
+                                              "attestation")
+          res.add(handler)
+        if EventTopic.VoluntaryExit in eventTopics:
+          let handler = response.eventHandler(node, SignedVoluntaryExit,
+                                              "voluntary-exit",
+                                              "voluntary_exit")
+          res.add(handler)
+        if EventTopic.FinalizedCheckpoint in eventTopics:
+          let handler = response.eventHandler(node, FinalizationInfoObject,
+                                              "finalization",
+                                              "finalized_checkpoint")
+          res.add(handler)
+        if EventTopic.ChainReorg in eventTopics:
+          let handler = response.eventHandler(node, ReorgInfoObject,
+                                              "chain-reorg", "chain_reorg")
+          res.add(handler)
+        if EventTopic.ContributionAndProof in eventTopics:
+          let handler = response.eventHandler(node, SignedContributionAndProof,
+                                              "sync-contribution-and-proof",
+                                              "contribution_and_proof")
+          res.add(handler)
+        res
+
+    discard await one(handlers)
+    # One of the handlers finished, which means the connection has been
+    # dropped, so we cancel all the other handlers.
+    let pending =
+      block:
+        var res: seq[Future[void]]
+        for fut in handlers:
+          if not(fut.finished()):
+            fut.cancel()
+            res.add(fut)
+        res
+    await allFutures(pending)
+    return

   router.redirect(
     MethodGet,
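The heart of eventHandler above is the ordering of waits and sends: a fresh waitEvent future is armed before the previous payload goes out, so an event fired while sendEvent is still in flight is not missed. Stripped of the error handling, the loop reduces to this sketch (same calls as in the diff; toJsonString is an assumed stand-in for the prepareJsonStringResponse branch):

var fut = node.eventBus.waitEvent(T, event)
while true:
  let data = await fut                     # wait for the next event
  fut = node.eventBus.waitEvent(T, event)  # re-arm *before* sending
  await response.sendEvent(serverEvent, toJsonString(data))

On the wire, each sendEvent call becomes one server-sent-events frame: an "event: <name>" line, a "data: <json>" line, and a blank line.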
@@ -85,6 +85,21 @@ proc prepareJsonResponse*(t: typedesc[RestApiResponse], d: auto): seq[byte] =
         default
   res

+proc prepareJsonStringResponse*(t: typedesc[RestApiResponse], d: auto): string =
+  let res =
+    block:
+      var default: string
+      try:
+        var stream = memoryOutput()
+        var writer = JsonWriter[RestJson].init(stream)
+        writer.writeValue(d)
+        stream.getOutput(string)
+      except SerializationError:
+        default
+      except IOError:
+        default
+  res
+
 proc jsonResponseWRoot*(t: typedesc[RestApiResponse], data: auto,
                         dependent_root: Eth2Digest): RestApiResponse =
   let res =
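prepareJsonStringResponse differs from the existing prepareJsonResponse just above it in its return type: it yields the serialized JSON as a string rather than as ready-to-send response bytes (seq[byte]), which is the form sendEvent needs for the SSE data field. Usage as reconstructed from eventHandler earlier in this diff (forkedBlock is an assumed ForkedTrustedSignedBeaconBlock value):

let blockInfo = RestBlockInfo.init(forkedBlock)
let json = RestApiResponse.prepareJsonStringResponse(blockInfo)
await response.sendEvent("block", json)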
@@ -983,6 +998,8 @@ proc decodeString*(t: typedesc[EventTopic],
     ok(EventTopic.FinalizedCheckpoint)
   of "chain_reorg":
     ok(EventTopic.ChainReorg)
+  of "contribution_and_proof":
+    ok(EventTopic.ContributionAndProof)
   else:
     err("Incorrect event's topic value")

@@ -27,7 +27,8 @@ const

 type
   EventTopic* {.pure.} = enum
-    Head, Block, Attestation, VoluntaryExit, FinalizedCheckpoint, ChainReorg
+    Head, Block, Attestation, VoluntaryExit, FinalizedCheckpoint, ChainReorg,
+    ContributionAndProof

   EventTopics* = set[EventTopic]

@@ -256,6 +257,10 @@ type
     chain_id*: string
     address*: string

+  RestBlockInfo* = object
+    slot*: Slot
+    blck* {.serializedFieldName: "block".}: Eth2Digest
+
   DataEnclosedObject*[T] = object
     data*: T

@@ -327,3 +332,7 @@ func init*(t: typedesc[ValidatorIdent], v: ValidatorIndex): ValidatorIdent =

 func init*(t: typedesc[ValidatorIdent], v: ValidatorPubKey): ValidatorIdent =
   ValidatorIdent(kind: ValidatorQueryKind.Key, key: v)
+
+func init*(t: typedesc[RestBlockInfo],
+           v: ForkedTrustedSignedBeaconBlock): RestBlockInfo =
+  RestBlockInfo(slot: v.slot(), blck: v.root())
@@ -189,6 +189,8 @@ proc sendAttestation*(
   of ValidationResult.Accept:
     node.network.broadcastAttestation(subnet_id, attestation)
     beacon_attestations_sent.inc()
+    if not(isNil(node.onAttestationSent)):
+      node.onAttestationSent(attestation)
     true
   else:
     notice "Produced attestation failed validation",
vendor/nim-chronos (vendored)
@@ -1 +1 @@
-Subproject commit 5034f0a5a6772576c78d6ef45dacfda6a6bc60c4
+Subproject commit 80102a3b6a8bc9c205302ec5d0f895d01a4e43df