avoid double deserialization

When blocks and attestations arrive, they are SSZ-decoded twice: once
for validation and once for processing. This branch enqueues the decoded
block directly for processing, avoiding the second, slow
deserialization.

* move processing of blocks and attestations to queue
* ...and out from beacon_node
* split attestation processing into attestations and aggregates
  * also updates metrics
* clean up logging to better follow the lifetime of gossip: arrival,
validation and processing
* drop attestations and aggregates if there are too many
* try to prioritise blocks and aggregates before single-validator
attestations
This commit is contained in:
Jacek Sieka 2020-08-20 18:30:47 +02:00 committed by zah
parent bbc90afa27
commit 22998fdfd4
10 changed files with 482 additions and 299 deletions

View File

@ -29,9 +29,10 @@ import
beacon_node_common, beacon_node_types,
block_pools/[spec_cache, chain_dag, quarantine, clearance, block_pools_types],
nimbus_binary_common, network_metadata,
mainchain_monitor, version, ssz/[merkleization], sszdump, merkle_minimal,
mainchain_monitor, version, ssz/[merkleization], merkle_minimal,
sync_protocol, request_manager, keystore_management, interop, statusbar,
sync_manager, validator_duties, validator_api, attestation_aggregation
sync_manager, validator_duties, validator_api,
./eth2_processor
const
genesisFile* = "genesis.ssz"
@ -56,12 +57,6 @@ declareGauge beacon_slot,
declareGauge beacon_head_slot,
"Slot of the head block of the beacon chain"
# Metrics for tracking attestation and beacon block loss
declareCounter beacon_attestations_received,
"Number of beacon chain attestations received by this peer"
declareCounter beacon_blocks_received,
"Number of beacon chain blocks received by this peer"
# Finalization tracking
declareGauge finalization_delay,
"Epoch delay between scheduled epoch and finalized epoch"
@ -69,21 +64,8 @@ declareGauge finalization_delay,
declareGauge ticks_delay,
"How long does to take to run the onSecond loop"
const delayBuckets = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, Inf]
declareHistogram beacon_attestation_received_seconds_from_slot_start,
"Interval between slot start and attestation reception", buckets = delayBuckets
declareHistogram beacon_block_received_seconds_from_slot_start,
"Interval between slot start and beacon block reception", buckets = delayBuckets
declareHistogram beacon_store_block_duration_seconds,
"storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
logScope: topics = "beacnde"
proc onBeaconBlock(node: BeaconNode, signedBlock: SignedBeaconBlock) {.gcsafe.}
proc getStateFromSnapshot(conf: BeaconNodeConf, stateSnapshotContents: ref string): NilableBeaconStateRef =
var
genesisPath = conf.dataDir/genesisFile
@ -266,7 +248,7 @@ proc init*(T: type BeaconNode,
topicBeaconBlocks = getBeaconBlocksTopic(enrForkId.forkDigest)
topicAggregateAndProofs = getAggregateAndProofsTopic(enrForkId.forkDigest)
network = createEth2Node(rng, conf, enrForkId)
attestationPool = newClone(AttestationPool.init(chainDag, quarantine))
var res = BeaconNode(
nickname: nickname,
graffitiBytes: if conf.graffiti.isSome: conf.graffiti.get.GraffitiBytes
@ -278,16 +260,23 @@ proc init*(T: type BeaconNode,
attachedValidators: ValidatorPool.init(),
chainDag: chainDag,
quarantine: quarantine,
attestationPool: AttestationPool.init(chainDag, quarantine),
attestationPool: attestationPool,
mainchainMonitor: mainchainMonitor,
beaconClock: BeaconClock.init(chainDag.headState.data.data),
rpcServer: rpcServer,
forkDigest: enrForkId.forkDigest,
topicBeaconBlocks: topicBeaconBlocks,
topicAggregateAndProofs: topicAggregateAndProofs,
blocksQueue: newAsyncQueue[SyncBlock](1),
)
res.requestManager = RequestManager.init(network, res.blocksQueue)
proc getWallTime(): BeaconTime = res.beaconClock.now()
res.processor = Eth2Processor.new(
conf, chainDag, attestationPool, quarantine, getWallTime)
res.requestManager = RequestManager.init(
network, res.processor.blocksQueue)
res.addLocalValidators()
# This merely configures the BeaconSync
@ -295,84 +284,6 @@ proc init*(T: type BeaconNode,
network.initBeaconSync(chainDag, enrForkId.forkDigest)
return res
proc onAttestation(node: BeaconNode, attestation: Attestation) =
# We received an attestation from the network but don't know much about it
# yet - in particular, we haven't verified that it belongs to particular chain
# we're on, or that it follows the rules of the protocol
logScope:
attestation = shortLog(attestation)
head = shortLog(node.chainDag.head)
pcs = "on_attestation"
let
wallSlot = node.beaconClock.now().toSlot()
head = node.chainDag.head
debug "Attestation received",
wallSlot = shortLog(wallSlot.slot)
if not wallSlot.afterGenesis or wallSlot.slot < head.slot:
warn "Received attestation before genesis or head - clock is wrong?",
afterGenesis = wallSlot.afterGenesis,
wallSlot = shortLog(wallSlot.slot)
return
if attestation.data.slot > head.slot and
(attestation.data.slot - head.slot) > MaxEmptySlotCount:
warn "Ignoring attestation, head block too old (out of sync?)"
return
node.attestationPool.addAttestation(attestation, wallSlot.slot)
proc dumpBlock[T](
node: BeaconNode, signedBlock: SignedBeaconBlock,
res: Result[T, BlockError]) =
if node.config.dumpEnabled and res.isErr:
case res.error
of Invalid:
dump(
node.config.dumpDirInvalid, signedBlock)
of MissingParent:
dump(
node.config.dumpDirIncoming, signedBlock)
else:
discard
proc storeBlock(
node: BeaconNode, signedBlock: SignedBeaconBlock): Result[void, BlockError] =
let start = Moment.now()
debug "Block received",
signedBlock = shortLog(signedBlock.message),
blockRoot = shortLog(signedBlock.root),
pcs = "receive_block"
beacon_blocks_received.inc()
{.gcsafe.}: # TODO: fork choice and quarantine should sync via messages instead of callbacks
let blck = node.chainDag.addRawBlock(node.quarantine, signedBlock) do (
blckRef: BlockRef, signedBlock: SignedBeaconBlock,
epochRef: EpochRef, state: HashedBeaconState):
# Callback add to fork choice if valid
node.attestationPool.addForkChoice(
epochRef, blckRef, signedBlock.message,
node.beaconClock.now().slotOrZero())
node.dumpBlock(signedBlock, blck)
# There can be a scenario where we receive a block we already received.
# However this block was before the last finalized epoch and so its parent
# was pruned from the ForkChoice.
if blck.isErr:
return err(blck.error)
beacon_store_block_duration_seconds.observe((Moment.now() - start).milliseconds.float64 / 1000)
return ok()
proc onBeaconBlock(node: BeaconNode, signedBlock: SignedBeaconBlock) =
# We received a block but don't know much about it yet - in particular, we
# don't know if it's part of the chain we're currently building.
discard node.storeBlock(signedBlock)
func verifyFinalization(node: BeaconNode, slot: Slot) =
# Epoch must be >= 4 to check finalization
const SETTLING_TIME_OFFSET = 1'u64
@ -393,23 +304,12 @@ func verifyFinalization(node: BeaconNode, slot: Slot) =
doAssert finalizedEpoch + 4 >= epoch
proc installAttestationSubnetHandlers(node: BeaconNode, subnets: set[uint8]) =
proc attestationHandler(attestation: Attestation) =
# Avoid double-counting attestation-topic attestations on shared codepath
# when they're reflected through beacon blocks
beacon_attestations_received.inc()
beacon_attestation_received_seconds_from_slot_start.observe(
node.beaconClock.now.int64 - attestation.data.slot.toBeaconTime.int64)
node.onAttestation(attestation)
var attestationSubscriptions: seq[Future[void]] = @[]
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#attestations-and-aggregation
for subnet in subnets:
attestationSubscriptions.add(node.network.subscribe(
getAttestationTopic(node.forkDigest, subnet),
attestationHandler,
))
getAttestationTopic(node.forkDigest, subnet)))
waitFor allFutures(attestationSubscriptions)
@ -636,37 +536,6 @@ proc runOnSecondLoop(node: BeaconNode) {.async.} =
ticks_delay.set(sleepTime.nanoseconds.float / nanosecondsIn1s)
debug "onSecond task completed", sleepTime, processingTime
proc importBlock(node: BeaconNode,
sblock: SignedBeaconBlock): Result[void, BlockError] =
let sm1 = now(chronos.Moment)
let res = node.storeBlock(sblock)
let em1 = now(chronos.Moment)
if res.isOk() or (res.error() in {BlockError.Duplicate, BlockError.Old}):
let sm2 = now(chronos.Moment)
discard node.updateHead(node.beaconClock.now().slotOrZero)
let em2 = now(chronos.Moment)
let storeBlockDuration = if res.isOk(): em1 - sm1 else: ZeroDuration
let updateHeadDuration = if res.isOk(): em2 - sm2 else: ZeroDuration
let overallDuration = if res.isOk(): em2 - sm1 else: ZeroDuration
let storeSpeed =
block:
let secs = float(chronos.seconds(1).nanoseconds)
if not(overallDuration.isZero()):
let v = secs / float(overallDuration.nanoseconds)
round(v * 10_000) / 10_000
else:
0.0
debug "Block got imported successfully",
local_head_slot = node.chainDag.head.slot, store_speed = storeSpeed,
block_root = shortLog(sblock.root),
block_slot = sblock.message.slot,
store_block_duration = $storeBlockDuration,
update_head_duration = $updateHeadDuration,
overall_duration = $overallDuration
ok()
else:
err(res.error())
proc startSyncManager(node: BeaconNode) =
func getLocalHeadSlot(): Slot =
node.chainDag.head.slot
@ -696,16 +565,10 @@ proc startSyncManager(node: BeaconNode) =
node.syncManager = newSyncManager[Peer, PeerID](
node.network.peerPool, getLocalHeadSlot, getLocalWallSlot,
getFirstSlotAtFinalizedEpoch, node.blocksQueue, chunkSize = 32
getFirstSlotAtFinalizedEpoch, node.processor.blocksQueue, chunkSize = 32
)
node.syncManager.start()
proc runBlockProcessingLoop(node: BeaconNode) {.async.} =
## Incoming blocks processing loop.
while true:
let sblock = await node.blocksQueue.popFirst()
sblock.complete(node.importBlock(sblock.blk))
proc currentSlot(node: BeaconNode): Slot =
node.beaconClock.now.slotOrZero
@ -833,20 +696,6 @@ proc installRpcHandlers(rpcServer: RpcServer, node: BeaconNode) =
rpcServer.installDebugApiHandlers(node)
proc installMessageValidators(node: BeaconNode) =
proc attestationValidator(attestation: Attestation,
committeeIndex: uint64): bool =
let (afterGenesis, slot) = node.beaconClock.now().toSlot()
if not afterGenesis:
return false
node.attestationPool.isValidAttestation(attestation, slot, committeeIndex)
proc aggregatedAttestationValidator(
signedAggregateAndProof: SignedAggregateAndProof): bool =
let (afterGenesis, slot) = node.beaconClock.now().toSlot()
if not afterGenesis:
return false
node.attestationPool.isValidAggregatedAttestation(signedAggregateAndProof, slot)
# https://github.com/ethereum/eth2.0-specs/blob/v0.12.2/specs/phase0/p2p-interface.md#attestations-and-aggregation
# These validators stay around the whole time, regardless of which specific
# subnets are subscribed to during any given epoch.
@ -857,59 +706,19 @@ proc installMessageValidators(node: BeaconNode) =
getAttestationTopic(node.forkDigest, ci),
# This proc needs to be within closureScope; don't lift out of loop.
proc(attestation: Attestation): bool =
attestationValidator(attestation, ci)
)
node.processor[].attestationValidator(attestation, ci))
node.network.addValidator(
getAggregateAndProofsTopic(node.forkDigest),
proc(signedAggregateAndProof: SignedAggregateAndProof): bool =
aggregatedAttestationValidator(signedAggregateAndProof))
node.processor[].aggregateValidator(signedAggregateAndProof))
node.network.addValidator(node.topicBeaconBlocks) do (signedBlock: SignedBeaconBlock) -> bool:
let
now = node.beaconClock.now
(afterGenesis, slot) = now.toSlot()
if not afterGenesis:
return false
logScope:
blk = shortLog(signedBlock.message)
root = shortLog(signedBlock.root)
let isKnown = signedBlock.root in node.chainDag.blocks
if isKnown:
trace "Received known gossip block"
# TODO:
# Potentially use a fast exit here. We only need to check that
# the contents of the incoming message match our previously seen
# version of the block. We don't need to use HTR for this - for
# better efficiency we can use vanilla SHA256 or direct comparison
# if we still have the previous block in memory.
# TODO:
# We are seeing extreme delays sometimes (e.g. 300 seconds).
# Should we drop such blocks? The spec doesn't set a policy on this.
else:
let delay = (now.int64 - signedBlock.message.slot.toBeaconTime.int64)
debug "Incoming gossip block", delay
beacon_block_received_seconds_from_slot_start.observe delay
let blck = node.chainDag.isValidBeaconBlock(node.quarantine,
signedBlock, slot, {})
node.dumpBlock(signedBlock, blck)
blck.isOk
node.network.addValidator(
node.topicBeaconBlocks,
proc (signedBlock: SignedBeaconBlock): bool =
node.processor[].blockValidator(signedBlock))
proc getAttestationHandlers(node: BeaconNode): Future[void] =
proc attestationHandler(attestation: Attestation) =
# Avoid double-counting attestation-topic attestations on shared codepath
# when they're reflected through beacon blocks
beacon_attestations_received.inc()
beacon_attestation_received_seconds_from_slot_start.observe(
node.beaconClock.now.int64 - attestation.data.slot.toBeaconTime.int64)
node.onAttestation(attestation)
var initialSubnets: set[uint8]
for i in 0'u8 ..< ATTESTATION_SUBNET_COUNT:
initialSubnets.incl i
@ -925,16 +734,12 @@ proc getAttestationHandlers(node: BeaconNode): Future[void] =
node.attestationSubnets.subscribedSubnets[1 - (GENESIS_EPOCH mod 2)] =
initialSubnets
node.network.subscribe(
getAggregateAndProofsTopic(node.forkDigest),
proc(signedAggregateAndProof: SignedAggregateAndProof) =
attestationHandler(signedAggregateAndProof.message.aggregate))
node.network.subscribe(getAggregateAndProofsTopic(node.forkDigest))
proc addMessageHandlers(node: BeaconNode) =
waitFor allFutures(
# As a side-effect, this gets the attestation subnets too.
node.network.subscribe(node.topicBeaconBlocks) do (signedBlock: SignedBeaconBlock):
onBeaconBlock(node, signedBlock),
node.network.subscribe(node.topicBeaconBlocks),
node.getAttestationHandlers()
)
@ -987,7 +792,7 @@ proc run*(node: BeaconNode) =
asyncCheck node.onSlotStart(curSlot, nextSlot)
node.onSecondLoop = runOnSecondLoop(node)
node.blockProcessingLoop = runBlockProcessingLoop(node)
node.blockProcessingLoop = node.processor.runQueueProcessingLoop()
node.requestManager.start()
node.startSyncManager()

View File

@ -21,7 +21,8 @@ import
attestation_pool, eth2_network,
block_pools/[chain_dag, quarantine],
beacon_node_types, mainchain_monitor, request_manager,
sync_manager
sync_manager,
./eth2_processor
# This removes an invalid Nim warning that the digest module is unused here
# It's currently used for `shortLog(head.blck.root)`
@ -41,12 +42,11 @@ type
attachedValidators*: ValidatorPool
chainDag*: ChainDAGRef
quarantine*: QuarantineRef
attestationPool*: AttestationPool
attestationPool*: ref AttestationPool
mainchainMonitor*: MainchainMonitor
beaconClock*: BeaconClock
rpcServer*: RpcServer
forkDigest*: ForkDigest
blocksQueue*: AsyncQueue[SyncBlock]
requestManager*: RequestManager
syncManager*: SyncManager[Peer, PeerID]
topicBeaconBlocks*: string
@ -55,33 +55,14 @@ type
onSecondLoop*: Future[void]
genesisSnapshotContent*: string
attestationSubnets*: AttestationSubnets
processor*: ref Eth2Processor
const
MaxEmptySlotCount* = uint64(10*60) div SECONDS_PER_SLOT
# Metrics
declareGauge beacon_head_root,
"Root of the head block of the beacon chain"
proc updateHead*(node: BeaconNode, wallSlot: Slot): BlockRef =
# Check pending attestations - maybe we found some blocks for them
node.attestationPool.resolve(wallSlot)
# Grab the new head according to our latest attestation data
let newHead = node.attestationPool.selectHead(wallSlot)
# Store the new head in the chain DAG - this may cause epochs to be
# justified and finalized
let oldFinalized = node.chainDag.finalizedHead.blck
node.chainDag.updateHead(newHead)
beacon_head_root.set newHead.root.toGaugeValue
# Cleanup the fork choice v2 if we have a finalized head
if oldFinalized != node.chainDag.finalizedHead.blck:
node.attestationPool.prune()
newHead
node.processor[].updateHead(wallSlot)
template findIt*(s: openarray, predicate: untyped): int =
var res = -1

View File

@ -953,7 +953,8 @@ proc stop*(node: Eth2Node) {.async.} =
timeout = 5.seconds
completed = await withTimeout(allFutures(waitedFutures), timeout)
if not completed:
trace "Eth2Node.stop(): timeout reached", timeout, futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg)
trace "Eth2Node.stop(): timeout reached", timeout,
futureErrors = waitedFutures.filterIt(it.error != nil).mapIt(it.error.msg)
proc init*(T: type Peer, network: Eth2Node, info: PeerInfo): Peer =
new result
@ -1259,6 +1260,12 @@ proc subscribe*[MsgType](node: Eth2Node,
await node.pubsub.subscribe(topic & "_snappy", execMsgHandler)
proc subscribe*(node: Eth2Node, topic: string) {.async, gcsafe.} =
proc dummyMsgHandler(topic: string, data: seq[byte]) {.async, gcsafe.} =
discard
await node.pubsub.subscribe(topic & "_snappy", dummyMsgHandler)
proc addValidator*[MsgType](node: Eth2Node,
topic: string,
msgValidator: proc(msg: MsgType): bool {.gcsafe.} ) =

View File

@ -0,0 +1,404 @@
import
std/[math, tables],
stew/results,
chronicles, chronicles/chronos_tools, chronos, metrics,
./spec/[crypto, datatypes, digest],
./block_pools/[clearance, chain_dag],
./attestation_aggregation,
./beacon_node_types, ./attestation_pool,
./time, ./conf, ./sszdump
# Metrics for tracking attestation and beacon block loss
declareCounter beacon_attestations_received,
"Number of beacon chain attestations received by this peer"
declareCounter beacon_aggregates_received,
"Number of beacon chain aggregate attestations received by this peer"
declareCounter beacon_blocks_received,
"Number of beacon chain blocks received by this peer"
const delayBuckets = [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, Inf]
declareHistogram beacon_attestation_delay,
"Time(s) between slot start and attestation reception", buckets = delayBuckets
declareHistogram beacon_aggregate_delay,
"Time(s) between slot start and aggregate reception", buckets = delayBuckets
declareHistogram beacon_block_delay,
"Time(s) between slot start and beacon block reception", buckets = delayBuckets
declareHistogram beacon_store_block_duration_seconds,
"storeBlock() duration", buckets = [0.25, 0.5, 1, 2, 4, 8, Inf]
declareGauge beacon_head_root,
"Root of the head block of the beacon chain"
type
GetWallTimeFn* = proc(): BeaconTime {.gcsafe, raises: [Defect].}
Entry[T] = object
v*: T
SyncBlock* = object
blk*: SignedBeaconBlock
resfut*: Future[Result[void, BlockError]]
BlockEntry* = Entry[SyncBlock]
AttestationEntry* = Entry[Attestation]
AggregateEntry* = Entry[Attestation]
Eth2Processor* = object
config*: BeaconNodeConf
getWallTime*: GetWallTimeFn
chainDag*: ChainDAGRef
attestationPool*: ref AttestationPool
quarantine*: QuarantineRef
blocksQueue*: AsyncQueue[BlockEntry]
attestationsQueue*: AsyncQueue[AttestationEntry]
aggregatesQueue*: AsyncQueue[AggregateEntry]
proc updateHead*(self: var Eth2Processor, wallSlot: Slot): BlockRef =
# Check pending attestations - maybe we found some blocks for them
self.attestationPool[].resolve(wallSlot)
# Grab the new head according to our latest attestation data
let newHead = self.attestationPool[].selectHead(wallSlot)
# Store the new head in the chain DAG - this may cause epochs to be
# justified and finalized
let oldFinalized = self.chainDag.finalizedHead.blck
self.chainDag.updateHead(newHead)
beacon_head_root.set newHead.root.toGaugeValue
# Cleanup the fork choice v2 if we have a finalized head
if oldFinalized != self.chainDag.finalizedHead.blck:
self.attestationPool[].prune()
newHead
proc dumpBlock[T](
self: Eth2Processor, signedBlock: SignedBeaconBlock,
res: Result[T, BlockError]) =
if self.config.dumpEnabled and res.isErr:
case res.error
of Invalid:
dump(
self.config.dumpDirInvalid, signedBlock)
of MissingParent:
dump(
self.config.dumpDirIncoming, signedBlock)
else:
discard
proc done*(blk: SyncBlock) =
## Send signal to [Sync/Request]Manager that the block ``blk`` has passed
## verification successfully.
if blk.resfut != nil:
blk.resfut.complete(Result[void, BlockError].ok())
proc fail*(blk: SyncBlock, error: BlockError) =
## Send signal to [Sync/Request]Manager that the block ``blk`` has NOT passed
## verification with specific ``error``.
if blk.resfut != nil:
blk.resfut.complete(Result[void, BlockError].err(error))
proc complete*(blk: SyncBlock, res: Result[void, BlockError]) {.inline.} =
## Send signal to [Sync/Request]Manager about result ``res`` of block ``blk``
## verification.
if blk.resfut != nil:
blk.resfut.complete(res)
proc storeBlock(
self: var Eth2Processor, signedBlock: SignedBeaconBlock,
wallSlot: Slot): Result[void, BlockError] =
let
start = Moment.now()
attestationPool = self.attestationPool
{.gcsafe.}: # TODO: fork choice and quarantine should sync via messages instead of callbacks
let blck = self.chainDag.addRawBlock(self.quarantine, signedBlock) do (
blckRef: BlockRef, signedBlock: SignedBeaconBlock,
epochRef: EpochRef, state: HashedBeaconState):
# Callback add to fork choice if valid
attestationPool[].addForkChoice(
epochRef, blckRef, signedBlock.message, wallSlot)
self.dumpBlock(signedBlock, blck)
# There can be a scenario where we receive a block we already received.
# However this block was before the last finalized epoch and so its parent
# was pruned from the ForkChoice.
if blck.isErr:
return err(blck.error)
beacon_store_block_duration_seconds.observe((Moment.now() - start).milliseconds.float64 / 1000)
return ok()
proc processAttestation(
self: var Eth2Processor, entry: AttestationEntry) =
logScope:
signature = shortLog(entry.v.signature)
let
wallTime = self.getWallTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
error "Processing attestation before genesis, clock turned back?"
quit 1
debug "Processing attestation"
self.attestationPool[].addAttestation(entry.v, wallSlot)
proc processAggregate(
self: var Eth2Processor, entry: AggregateEntry) =
logScope:
signature = shortLog(entry.v.signature)
let
wallTime = self.getWallTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
error "Processing aggregate before genesis, clock turned back?"
quit 1
debug "Processing aggregate"
self.attestationPool[].addAttestation(entry.v, wallSlot)
proc processBlock(self: var Eth2Processor, entry: BlockEntry) =
logScope:
blockRoot = shortLog(entry.v.blk.root)
let
wallTime = self.getWallTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
error "Processing block before genesis, clock turned back?"
quit 1
let
start = now(chronos.Moment)
res = self.storeBlock(entry.v.blk, wallSlot)
storeDone = now(chronos.Moment)
if res.isOk():
# Eagerly update head in case the new block gets selected
discard self.updateHead(wallSlot)
let updateDone = now(chronos.Moment)
let storeBlockDuration = storeDone - start
let updateHeadDuration = updateDone - storeDone
let overallDuration = updateDone - start
let storeSpeed =
block:
let secs = float(chronos.seconds(1).nanoseconds)
if not(overallDuration.isZero()):
let v = secs / float(overallDuration.nanoseconds)
round(v * 10_000) / 10_000
else:
0.0
debug "Block processed",
local_head_slot = self.chainDag.head.slot,
store_speed = storeSpeed,
block_slot = entry.v.blk.message.slot,
store_block_duration = $storeBlockDuration,
update_head_duration = $updateHeadDuration,
overall_duration = $overallDuration
if entry.v.resFut != nil:
entry.v.resFut.complete(Result[void, BlockError].ok())
elif res.error() in {BlockError.Duplicate, BlockError.Old}:
# These are harmless / valid outcomes - for the purpose of scoring peers,
# they are ok
if entry.v.resFut != nil:
entry.v.resFut.complete(Result[void, BlockError].ok())
else:
if entry.v.resFut != nil:
entry.v.resFut.complete(Result[void, BlockError].err(res.error()))
proc blockValidator*(
self: var Eth2Processor,
signedBlock: SignedBeaconBlock): bool =
logScope:
signedBlock = shortLog(signedBlock.message)
blockRoot = shortLog(signedBlock.root)
let
wallTime = self.getWallTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
return false
logScope: wallSlot
let delay = wallTime - signedBlock.message.slot.toBeaconTime
if signedBlock.root in self.chainDag.blocks:
# The gossip algorithm itself already does one round of hashing to find
# already-seen data, but it is fairly aggressive about forgetting about
# what it has seen already
debug "Dropping already-seen gossip block", delay
# TODO:
# Potentially use a fast exit here. We only need to check that
# the contents of the incoming message match our previously seen
# version of the block. We don't need to use HTR for this - for
# better efficiency we can use vanilla SHA256 or direct comparison
# if we still have the previous block in memory.
return false
# Start of block processing - in reality, we have already gone through SSZ
# decoding at this stage, which may be significant
debug "Block received", delay
let blck = self.chainDag.isValidBeaconBlock(
self.quarantine, signedBlock, wallSlot, {})
self.dumpBlock(signedBlock, blck)
if not blck.isOk:
return false
# Block passed validation - enqueue it for processing. The block processing
# queue is effectively unbounded as we use a freestanding task to enqueue
# the block - this is done so that when blocks arrive concurrently with
# sync, we don't lose the gossip blocks, but also don't block the gossip
# propagation of seemingly good blocks
debug "Block validated"
traceAsyncErrors self.blocksQueue.addLast(
BlockEntry(v: SyncBlock(blk: signedBlock)))
beacon_block_delay.observe(float(milliseconds(delay)) / 1000.0)
true
proc attestationValidator*(
self: var Eth2Processor,
attestation: Attestation,
committeeIndex: uint64): bool =
logScope:
attestation = shortLog(attestation)
committeeIndex
let
wallTime = self.getWallTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
notice "Attestation before genesis"
return false
logScope: wallSlot
let delay = wallTime - attestation.data.slot.toBeaconTime
debug "Attestation received", delay
if not self.attestationPool[].isValidAttestation(
attestation, wallSlot, committeeIndex):
return false # logged in validation
beacon_attestations_received.inc()
beacon_attestation_delay.observe(float(milliseconds(delay)) / 1000.0)
while self.attestationsQueue.full():
let dropped = self.attestationsQueue.popFirst()
doAssert dropped.finished, "popFirst sanity"
notice "Queue full, dropping attestation",
dropped = shortLog(dropped.read().v)
debug "Attestation validated"
traceAsyncErrors self.attestationsQueue.addLast(
AttestationEntry(v: attestation))
true
proc aggregateValidator*(
self: var Eth2Processor,
signedAggregateAndProof: SignedAggregateAndProof): bool =
logScope:
aggregate = shortLog(signedAggregateAndProof.message.aggregate)
signature = shortLog(signedAggregateAndProof.signature)
let
wallTime = self.getWallTime()
(afterGenesis, wallSlot) = wallTime.toSlot()
if not afterGenesis:
notice "Aggregate before genesis"
return false
logScope: wallSlot
let delay =
wallTime - signedAggregateAndProof.message.aggregate.data.slot.toBeaconTime
debug "Aggregate received", delay
if not self.attestationPool[].isValidAggregatedAttestation(
signedAggregateAndProof, wallSlot):
return false
beacon_aggregates_received.inc()
beacon_aggregate_delay.observe(float(milliseconds(delay)) / 1000.0)
while self.aggregatesQueue.full():
let dropped = self.aggregatesQueue.popFirst()
doAssert dropped.finished, "popFirst sanity"
notice "Queue full, dropping aggregate",
dropped = shortLog(dropped.read().v)
debug "Aggregate validated"
traceAsyncErrors self.aggregatesQueue.addLast(AggregateEntry(
v: signedAggregateAndProof.message.aggregate))
true
proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} =
# Blocks in eth2 arrive on a schedule for every slot:
#
# * Block arrives at time 0
# * Attestations arrives at time 4
# * Aggregate arrives at time 8
var
blockFut = self[].blocksQueue.popFirst()
aggregateFut = self[].aggregatesQueue.popFirst()
attestationFut = self[].attestationsQueue.popFirst()
while true:
trace "Waiting for processing work"
await blockFut or aggregateFut or attestationFut
while blockFut.finished:
# TODO await here _hopefully_ yields to the event loop allowing another
# queue put to complete
self[].processBlock(await blockFut)
blockFut = self[].blocksQueue.popFirst()
if aggregateFut.finished:
self[].processAggregate(await aggregateFut)
aggregateFut = self[].aggregatesQueue.popFirst()
continue
if attestationFut.finished:
self[].processAttestation(await attestationFut)
attestationFut = self[].attestationsQueue.popFirst()
continue
proc new*(T: type Eth2Processor,
config: BeaconNodeConf,
chainDag: ChainDAGRef,
attestationPool: ref AttestationPool,
quarantine: QuarantineRef,
getWallTime: GetWallTimeFn): ref Eth2Processor =
(ref Eth2Processor)(
config: config,
getWallTime: getWallTime,
chainDag: chainDag,
attestationPool: attestationPool,
quarantine: quarantine,
blocksQueue: newAsyncQueue[BlockEntry](1),
aggregatesQueue: newAsyncQueue[AggregateEntry](MAX_ATTESTATIONS.int),
attestationsQueue: newAsyncQueue[AttestationEntry](TARGET_COMMITTEE_SIZE.int * 4),
)

View File

@ -1,7 +1,7 @@
import options, sequtils, strutils
import chronos, chronicles
import spec/[datatypes, digest], eth2_network, beacon_node_types, sync_protocol,
sync_manager, ssz/merkleization
sync_manager, ssz/merkleization, ./eth2_processor
export sync_manager
logScope:
@ -18,7 +18,7 @@ type
RequestManager* = object
network*: Eth2Node
inpQueue*: AsyncQueue[FetchRecord]
outQueue*: AsyncQueue[SyncBlock]
outQueue*: AsyncQueue[BlockEntry]
loopFuture: Future[void]
func shortLog*(x: seq[Eth2Digest]): string =
@ -28,7 +28,7 @@ func shortLog*(x: seq[FetchRecord]): string =
"[" & x.mapIt(shortLog(it.root)).join(", ") & "]"
proc init*(T: type RequestManager, network: Eth2Node,
outputQueue: AsyncQueue[SyncBlock]): RequestManager =
outputQueue: AsyncQueue[BlockEntry]): RequestManager =
RequestManager(
network: network,
inpQueue: newAsyncQueue[FetchRecord](),
@ -55,7 +55,7 @@ proc validate(rman: RequestManager,
blk: b,
resfut: newFuture[Result[void, BlockError]]("request.manager.validate")
)
await rman.outQueue.addLast(sblock)
await rman.outQueue.addLast(BlockEntry(v: sblock))
return await sblock.resfut
proc fetchAncestorBlocksFromNetwork(rman: RequestManager,

View File

@ -303,7 +303,7 @@ func get_beacon_proposer_index*(state: BeaconState, cache: var StateCache, slot:
# active validator indices are kept in cache but sorting them takes
# quite a while
indices = get_active_validator_indices(state, epoch)
start = slot.epoch().compute_start_slot_at_epoch()
start = epoch.compute_start_slot_at_epoch()
var res: Option[ValidatorIndex]
for i in 0..<SLOTS_PER_EPOCH:

View File

@ -4,6 +4,7 @@ import stew/results, chronos, chronicles
import spec/[datatypes, digest], peer_pool, eth2_network
import eth/async_utils
import ./eth2_processor
import block_pools/block_pools_types
export datatypes, digest, chronos, chronicles, results, block_pools_types
@ -39,10 +40,6 @@ type
GetSlotCallback* = proc(): Slot {.gcsafe, raises: [Defect].}
SyncBlock* = object
blk*: SignedBeaconBlock
resfut*: Future[Result[void, BlockError]]
SyncRequest*[T] = object
index*: uint64
slot*: Slot
@ -73,7 +70,7 @@ type
debtsCount: uint64
readyQueue: HeapQueue[SyncResult[T]]
suspects: seq[SyncResult[T]]
outQueue: AsyncQueue[SyncBlock]
outQueue: AsyncQueue[BlockEntry]
SyncManager*[A, B] = ref object
pool: PeerPool[A, B]
@ -91,7 +88,7 @@ type
queue: SyncQueue[A]
failures: seq[SyncFailure[A]]
syncFut: Future[void]
outQueue: AsyncQueue[SyncBlock]
outQueue: AsyncQueue[BlockEntry]
inProgress*: bool
SyncMoment* = object
@ -112,7 +109,7 @@ proc validate*[T](sq: SyncQueue[T],
blk: blk,
resfut: newFuture[Result[void, BlockError]]("sync.manager.validate")
)
await sq.outQueue.addLast(sblock)
await sq.outQueue.addLast(BlockEntry(v: sblock))
return await sblock.resfut
proc getShortMap*[T](req: SyncRequest[T],
@ -213,7 +210,7 @@ proc isEmpty*[T](sr: SyncRequest[T]): bool {.inline.} =
proc init*[T](t1: typedesc[SyncQueue], t2: typedesc[T],
start, last: Slot, chunkSize: uint64,
getFinalizedSlotCb: GetSlotCallback,
outputQueue: AsyncQueue[SyncBlock],
outputQueue: AsyncQueue[BlockEntry],
queueSize: int = -1): SyncQueue[T] =
## Create new synchronization queue with parameters
##
@ -587,7 +584,7 @@ proc newSyncManager*[A, B](pool: PeerPool[A, B],
getLocalHeadSlotCb: GetSlotCallback,
getLocalWallSlotCb: GetSlotCallback,
getFinalizedSlotCb: GetSlotCallback,
outputQueue: AsyncQueue[SyncBlock],
outputQueue: AsyncQueue[BlockEntry],
maxWorkers = 10,
maxStatusAge = uint64(SLOTS_PER_EPOCH * 4),
maxHeadAge = uint64(SLOTS_PER_EPOCH * 1),
@ -979,18 +976,3 @@ proc syncLoop[A, B](man: SyncManager[A, B]) {.async.} =
proc start*[A, B](man: SyncManager[A, B]) =
## Starts SyncManager's main loop.
man.syncFut = man.syncLoop()
proc done*(blk: SyncBlock) =
## Send signal to [Sync/Request]Manager that the block ``blk`` has passed
## verification successfully.
blk.resfut.complete(Result[void, BlockError].ok())
proc fail*(blk: SyncBlock, error: BlockError) =
## Send signal to [Sync/Request]Manager that the block ``blk`` has NOT passed
## verification with specific ``error``.
blk.resfut.complete(Result[void, BlockError].err(error))
proc complete*(blk: SyncBlock, res: Result[void, BlockError]) {.inline.} =
## Send signal to [Sync/Request]Manager about result ``res`` of block ``blk``
## verification.
blk.resfut.complete(res)

View File

@ -115,4 +115,8 @@ func shortLog*(d: Duration): string =
tmp &= $frac & "m"
tmp
func `$`*(v: BeaconTime): string = $(int64(v))
func shortLog*(v: BeaconTime): int64 = v.int64
func `-`*(a, b: BeaconTime): Duration =
seconds(int64(a)) - seconds(int64(b))

View File

@ -213,7 +213,7 @@ proc makeBeaconBlockForHeadAndSlot*(node: BeaconNode,
getRandaoReveal(val_info),
eth1data,
graffiti,
node.attestationPool.getAttestationsForBlock(state),
node.attestationPool[].getAttestationsForBlock(state),
deposits,
restore,
cache)
@ -238,7 +238,7 @@ proc proposeSignedBlock*(node: BeaconNode,
blckRef: BlockRef, signedBlock: SignedBeaconBlock,
epochRef: EpochRef, state: HashedBeaconState):
# Callback add to fork choice if valid
node.attestationPool.addForkChoice(
node.attestationPool[].addForkChoice(
epochRef, blckRef, signedBlock.message,
node.beaconClock.now().slotOrZero())
@ -414,7 +414,7 @@ proc broadcastAggregatedAttestations(
# the validator index and private key pair. TODO verify it only has
# one isSome() with test.
let aggregateAndProof =
aggregate_attestations(node.attestationPool, state,
aggregate_attestations(node.attestationPool[], state,
committee_index.CommitteeIndex,
# TODO https://github.com/status-im/nim-beacon-chain/issues/545
# this assumes in-process private keys

View File

@ -2,7 +2,7 @@
import unittest
import chronos
import ../beacon_chain/sync_manager
import ../beacon_chain/[eth2_processor, sync_manager]
type
SomeTPeer = ref object
@ -28,7 +28,7 @@ suite "SyncManager test suite":
test "[SyncQueue] Start and finish slots equal":
let p1 = SomeTPeer()
let aq = newAsyncQueue[SyncBlock](1)
let aq = newAsyncQueue[BlockEntry](1)
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(0), 1'u64,
getFirstSlotAtFinalizedEpoch, aq)
check len(queue) == 1
@ -45,7 +45,7 @@ suite "SyncManager test suite":
r11.slot == Slot(0) and r11.count == 1'u64 and r11.step == 1'u64
test "[SyncQueue] Two full requests success/fail":
let aq = newAsyncQueue[SyncBlock](1)
let aq = newAsyncQueue[BlockEntry](1)
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(1), 1'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer()
@ -74,7 +74,7 @@ suite "SyncManager test suite":
r22.slot == Slot(1) and r22.count == 1'u64 and r22.step == 1'u64
test "[SyncQueue] Full and incomplete success/fail start from zero":
let aq = newAsyncQueue[SyncBlock](1)
let aq = newAsyncQueue[BlockEntry](1)
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 2'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer()
@ -114,7 +114,7 @@ suite "SyncManager test suite":
r33.slot == Slot(4) and r33.count == 1'u64 and r33.step == 1'u64
test "[SyncQueue] Full and incomplete success/fail start from non-zero":
let aq = newAsyncQueue[SyncBlock](1)
let aq = newAsyncQueue[BlockEntry](1)
var queue = SyncQueue.init(SomeTPeer, Slot(1), Slot(5), 3'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer()
@ -143,7 +143,7 @@ suite "SyncManager test suite":
r42.slot == Slot(4) and r42.count == 2'u64 and r42.step == 1'u64
test "[SyncQueue] Smart and stupid success/fail":
let aq = newAsyncQueue[SyncBlock](1)
let aq = newAsyncQueue[BlockEntry](1)
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer()
@ -172,7 +172,7 @@ suite "SyncManager test suite":
r52.slot == Slot(4) and r52.count == 1'u64 and r52.step == 1'u64
test "[SyncQueue] One smart and one stupid + debt split + empty":
let aq = newAsyncQueue[SyncBlock](1)
let aq = newAsyncQueue[BlockEntry](1)
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(4), 5'u64,
getFirstSlotAtFinalizedEpoch, aq)
let p1 = SomeTPeer()
@ -208,16 +208,16 @@ suite "SyncManager test suite":
proc test(): Future[bool] {.async.} =
var counter = 0
proc simpleValidator(aq: AsyncQueue[SyncBlock]) {.async.} =
proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
if sblock.blk.message.slot == Slot(counter):
if sblock.v.blk.message.slot == Slot(counter):
inc(counter)
else:
sblock.fail(BlockError.Invalid)
sblock.done()
sblock.v.fail(BlockError.Invalid)
sblock.v.done()
var aq = newAsyncQueue[SyncBlock](1)
var aq = newAsyncQueue[BlockEntry](1)
var chain = createChain(Slot(0), Slot(2))
var queue = SyncQueue.init(SomeTPeer, Slot(0), Slot(2), 1'u64,
getFirstSlotAtFinalizedEpoch, aq, 1)
@ -257,16 +257,16 @@ suite "SyncManager test suite":
proc test(): Future[bool] {.async.} =
var counter = 5
proc simpleValidator(aq: AsyncQueue[SyncBlock]) {.async.} =
proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
if sblock.blk.message.slot == Slot(counter):
if sblock.v.blk.message.slot == Slot(counter):
inc(counter)
else:
sblock.fail(BlockError.Invalid)
sblock.done()
sblock.v.fail(BlockError.Invalid)
sblock.v.done()
var aq = newAsyncQueue[SyncBlock](1)
var aq = newAsyncQueue[BlockEntry](1)
var chain = createChain(Slot(5), Slot(11))
var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(11), 2'u64,
getFirstSlotAtFinalizedEpoch, aq, 2)
@ -312,16 +312,16 @@ suite "SyncManager test suite":
proc test(): Future[bool] {.async.} =
var counter = 5
proc simpleValidator(aq: AsyncQueue[SyncBlock]) {.async.} =
proc simpleValidator(aq: AsyncQueue[BlockEntry]) {.async.} =
while true:
let sblock = await aq.popFirst()
if sblock.blk.message.slot == Slot(counter):
if sblock.v.blk.message.slot == Slot(counter):
inc(counter)
else:
sblock.fail(BlockError.Invalid)
sblock.done()
sblock.v.fail(BlockError.Invalid)
sblock.v.done()
var aq = newAsyncQueue[SyncBlock](1)
var aq = newAsyncQueue[BlockEntry](1)
var chain = createChain(Slot(5), Slot(18))
var queue = SyncQueue.init(SomeTPeer, Slot(5), Slot(18), 2'u64,
getFirstSlotAtFinalizedEpoch, aq, 2)