better timesharing in eth2 processor
* use `idleAsync` to divide CPU attention more evenly, in particular when syncing - this gives networking better latency
* stricter exception handling in eth2_processor
This commit is contained in:
parent
6fabefa76f
commit
76f15302a7
|
@ -1,7 +1,16 @@
|
||||||
|
# beacon_chain
|
||||||
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
||||||
|
# Licensed and distributed under either of
|
||||||
|
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
|
||||||
|
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
|
||||||
|
# at your option. This file may not be copied, modified, or distributed except according to those terms.
|
||||||
|
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
import
|
import
|
||||||
std/[math, tables],
|
std/[math, tables],
|
||||||
stew/results,
|
stew/results,
|
||||||
chronicles, chronicles/chronos_tools, chronos, metrics,
|
chronicles, chronos, metrics,
|
||||||
./spec/[crypto, datatypes, digest],
|
./spec/[crypto, datatypes, digest],
|
||||||
./block_pools/[clearance, chain_dag],
|
./block_pools/[clearance, chain_dag],
|
||||||
./attestation_aggregation, ./exit_pool,
|
./attestation_aggregation, ./exit_pool,
|
||||||
|
@ -234,6 +243,8 @@ proc processBlock(self: var Eth2Processor, entry: BlockEntry) =
|
||||||
if entry.v.resFut != nil:
|
if entry.v.resFut != nil:
|
||||||
entry.v.resFut.complete(Result[void, BlockError].err(res.error()))
|
entry.v.resFut.complete(Result[void, BlockError].err(res.error()))
|
||||||
|
|
||||||
|
{.pop.} # TODO AsyncQueue.addLast raises Exception in theory but not in practice
|
||||||
|
|
||||||
proc blockValidator*(
|
proc blockValidator*(
|
||||||
self: var Eth2Processor,
|
self: var Eth2Processor,
|
||||||
signedBlock: SignedBeaconBlock): ValidationResult =
|
signedBlock: SignedBeaconBlock): ValidationResult =
|
||||||
|
@ -280,11 +291,13 @@ proc blockValidator*(
|
||||||
# sync, we don't lose the gossip blocks, but also don't block the gossip
|
# sync, we don't lose the gossip blocks, but also don't block the gossip
|
||||||
# propagation of seemingly good blocks
|
# propagation of seemingly good blocks
|
||||||
trace "Block validated"
|
trace "Block validated"
|
||||||
traceAsyncErrors self.blocksQueue.addLast(
|
asyncSpawn self.blocksQueue.addLast(
|
||||||
BlockEntry(v: SyncBlock(blk: signedBlock)))
|
BlockEntry(v: SyncBlock(blk: signedBlock)))
|
||||||
|
|
||||||
ValidationResult.Accept
|
ValidationResult.Accept
|
||||||
|
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
|
||||||
proc attestationValidator*(
|
proc attestationValidator*(
|
||||||
self: var Eth2Processor,
|
self: var Eth2Processor,
|
||||||
attestation: Attestation,
|
attestation: Attestation,
|
||||||
|
@ -321,14 +334,14 @@ proc attestationValidator*(
|
||||||
notice "Queue full, dropping attestation",
|
notice "Queue full, dropping attestation",
|
||||||
dropped = shortLog(self.attestationsQueue[0].v)
|
dropped = shortLog(self.attestationsQueue[0].v)
|
||||||
discard self.attestationsQueue.popFirstNoWait()
|
discard self.attestationsQueue.popFirstNoWait()
|
||||||
except CatchableError as exc:
|
except AsyncQueueEmptyError as exc:
|
||||||
raiseAssert "If queue is full, we have at least one item! " & exc.msg
|
raiseAssert "If queue is full, we have at least one item! " & exc.msg
|
||||||
|
|
||||||
trace "Attestation validated"
|
trace "Attestation validated"
|
||||||
try:
|
try:
|
||||||
self.attestationsQueue.addLastNoWait(
|
self.attestationsQueue.addLastNoWait(
|
||||||
AttestationEntry(v: attestation, attesting_indices: v.get()))
|
AttestationEntry(v: attestation, attesting_indices: v.get()))
|
||||||
except CatchableError as exc:
|
except AsyncQueueFullError as exc:
|
||||||
raiseAssert "We just checked that queue is not full! " & exc.msg
|
raiseAssert "We just checked that queue is not full! " & exc.msg
|
||||||
|
|
||||||
ValidationResult.Accept
|
ValidationResult.Accept
|
||||||
|
@ -373,7 +386,7 @@ proc aggregateValidator*(
|
||||||
notice "Queue full, dropping aggregate",
|
notice "Queue full, dropping aggregate",
|
||||||
dropped = shortLog(self.aggregatesQueue[0].v)
|
dropped = shortLog(self.aggregatesQueue[0].v)
|
||||||
discard self.aggregatesQueue.popFirstNoWait()
|
discard self.aggregatesQueue.popFirstNoWait()
|
||||||
except CatchableError as exc:
|
except AsyncQueueEmptyError as exc:
|
||||||
raiseAssert "We just checked that queue is not full! " & exc.msg
|
raiseAssert "We just checked that queue is not full! " & exc.msg
|
||||||
|
|
||||||
trace "Aggregate validated",
|
trace "Aggregate validated",
|
||||||
|
@ -385,7 +398,7 @@ proc aggregateValidator*(
|
||||||
self.aggregatesQueue.addLastNoWait(AggregateEntry(
|
self.aggregatesQueue.addLastNoWait(AggregateEntry(
|
||||||
v: signedAggregateAndProof.message.aggregate,
|
v: signedAggregateAndProof.message.aggregate,
|
||||||
attesting_indices: v.get()))
|
attesting_indices: v.get()))
|
||||||
except CatchableError as exc:
|
except AsyncQueueFullError as exc:
|
||||||
raiseAssert "We just checked that queue is not full! " & exc.msg
|
raiseAssert "We just checked that queue is not full! " & exc.msg
|
||||||
|
|
||||||
ValidationResult.Accept
|
ValidationResult.Accept
|
||||||
|
@ -435,6 +448,8 @@ proc voluntaryExitValidator*(
|
||||||
|
|
||||||
ValidationResult.Accept
|
ValidationResult.Accept
|
||||||
|
|
||||||
|
{.pop.} # TODO raises in chronos
|
||||||
|
|
||||||
proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} =
|
proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} =
|
||||||
# Blocks in eth2 arrive on a schedule for every slot:
|
# Blocks in eth2 arrive on a schedule for every slot:
|
||||||
#
|
#
|
||||||
|
@ -448,24 +463,36 @@ proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} =
|
||||||
attestationFut = self[].attestationsQueue.popFirst()
|
attestationFut = self[].attestationsQueue.popFirst()
|
||||||
|
|
||||||
while true:
|
while true:
|
||||||
|
# Cooperative concurrency: one idle calculation step per loop - because
|
||||||
|
# we run both networking and CPU-heavy things like block processing
|
||||||
|
# on the same thread, we need to make sure that there is steady progress
|
||||||
|
# on the networking side or we get long lockups that lead to timeouts.
|
||||||
|
#
|
||||||
|
# We cap waiting for an idle slot to 1 second in case there's a lot of
|
||||||
|
# network traffic taking up all CPU - we don't want to _completely_ stop
|
||||||
|
# processing blocks in this case (attestations will get dropped)
|
||||||
|
discard await idleAsync().withTimeout(1.seconds)
|
||||||
|
|
||||||
|
# Avoid one more `await` when there's work to do
|
||||||
|
if not (blockFut.finished or aggregateFut.finished or attestationFut.finished):
|
||||||
trace "Waiting for processing work"
|
trace "Waiting for processing work"
|
||||||
await blockFut or aggregateFut or attestationFut
|
await blockFut or aggregateFut or attestationFut
|
||||||
|
|
||||||
while blockFut.finished:
|
# Only run one task per idle iteration, in priority order: blocks are needed
|
||||||
# TODO await here _hopefully_ yields to the event loop allowing another
|
# for all other processing - then come aggregates which are cheap to
|
||||||
# queue put to complete
|
# process but might have a big impact on fork choice - last come
|
||||||
self[].processBlock(await blockFut)
|
# attestations which individually have the smallest effect on chain progress
|
||||||
|
if blockFut.finished:
|
||||||
|
self[].processBlock(blockFut.read())
|
||||||
blockFut = self[].blocksQueue.popFirst()
|
blockFut = self[].blocksQueue.popFirst()
|
||||||
|
elif aggregateFut.finished:
|
||||||
if aggregateFut.finished:
|
# aggregates will be dropped under heavy load on producer side
|
||||||
self[].processAggregate(await aggregateFut)
|
self[].processAggregate(aggregateFut.read())
|
||||||
aggregateFut = self[].aggregatesQueue.popFirst()
|
aggregateFut = self[].aggregatesQueue.popFirst()
|
||||||
continue
|
elif attestationFut.finished:
|
||||||
|
# attestations will be dropped under heavy load on producer side
|
||||||
if attestationFut.finished:
|
self[].processAttestation(attestationFut.read())
|
||||||
self[].processAttestation(await attestationFut)
|
|
||||||
attestationFut = self[].attestationsQueue.popFirst()
|
attestationFut = self[].attestationsQueue.popFirst()
|
||||||
continue
|
|
||||||
|
|
||||||
proc new*(T: type Eth2Processor,
|
proc new*(T: type Eth2Processor,
|
||||||
config: BeaconNodeConf,
|
config: BeaconNodeConf,
|
||||||
|
|
|
@ -1 +1 @@
|
||||||
Subproject commit 46c0bf3c5aff131ac437a88c00aa112133c94d54
|
Subproject commit 396208044a6375d7d803f1dce6b2d1efdfec7fbb
|
Loading…
Reference in New Issue