diff --git a/beacon_chain/eth2_processor.nim b/beacon_chain/eth2_processor.nim index 1579a9860..43df92dd2 100644 --- a/beacon_chain/eth2_processor.nim +++ b/beacon_chain/eth2_processor.nim @@ -1,7 +1,16 @@ +# beacon_chain +# Copyright (c) 2018-2021 Status Research & Development GmbH +# Licensed and distributed under either of +# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). +# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). +# at your option. This file may not be copied, modified, or distributed except according to those terms. + +{.push raises: [Defect].} + import std/[math, tables], stew/results, - chronicles, chronicles/chronos_tools, chronos, metrics, + chronicles, chronos, metrics, ./spec/[crypto, datatypes, digest], ./block_pools/[clearance, chain_dag], ./attestation_aggregation, ./exit_pool, @@ -234,6 +243,8 @@ proc processBlock(self: var Eth2Processor, entry: BlockEntry) = if entry.v.resFut != nil: entry.v.resFut.complete(Result[void, BlockError].err(res.error())) +{.pop.} # TODO AsyncQueue.addLast raises Exception in theory but not in practise + proc blockValidator*( self: var Eth2Processor, signedBlock: SignedBeaconBlock): ValidationResult = @@ -280,11 +291,13 @@ proc blockValidator*( # sync, we don't lose the gossip blocks, but also don't block the gossip # propagation of seemingly good blocks trace "Block validated" - traceAsyncErrors self.blocksQueue.addLast( + asyncSpawn self.blocksQueue.addLast( BlockEntry(v: SyncBlock(blk: signedBlock))) ValidationResult.Accept +{.push raises: [Defect].} + proc attestationValidator*( self: var Eth2Processor, attestation: Attestation, @@ -321,14 +334,14 @@ proc attestationValidator*( notice "Queue full, dropping attestation", dropped = shortLog(self.attestationsQueue[0].v) discard self.attestationsQueue.popFirstNoWait() - except CatchableError as exc: + except AsyncQueueEmptyError as exc: raiseAssert "If queue is full, we have at least one item! " & exc.msg trace "Attestation validated" try: self.attestationsQueue.addLastNoWait( AttestationEntry(v: attestation, attesting_indices: v.get())) - except CatchableError as exc: + except AsyncQueueFullError as exc: raiseAssert "We just checked that queue is not full! " & exc.msg ValidationResult.Accept @@ -373,7 +386,7 @@ proc aggregateValidator*( notice "Queue full, dropping aggregate", dropped = shortLog(self.aggregatesQueue[0].v) discard self.aggregatesQueue.popFirstNoWait() - except CatchableError as exc: + except AsyncQueueEmptyError as exc: raiseAssert "We just checked that queue is not full! " & exc.msg trace "Aggregate validated", @@ -385,7 +398,7 @@ proc aggregateValidator*( self.aggregatesQueue.addLastNoWait(AggregateEntry( v: signedAggregateAndProof.message.aggregate, attesting_indices: v.get())) - except CatchableError as exc: + except AsyncQueueFullError as exc: raiseAssert "We just checked that queue is not full! " & exc.msg ValidationResult.Accept @@ -435,6 +448,8 @@ proc voluntaryExitValidator*( ValidationResult.Accept +{.pop.} # TODO raises in chronos + proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} = # Blocks in eth2 arrive on a schedule for every slot: # @@ -448,24 +463,36 @@ proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} = attestationFut = self[].attestationsQueue.popFirst() while true: - trace "Waiting for processing work" - await blockFut or aggregateFut or attestationFut + # Cooperative concurrency: one idle calculation step per loop - because + # we run both networking and CPU-heavy things like block processing + # on the same thread, we need to make sure that there is steady progress + # on the networking side or we get long lockups that lead to timeouts. + # + # We cap waiting for an idle slot to 1 second in case there's a lot of + # network traffic taking up all CPU - we don't want to _completely_ stop + # processing blocks in this case (attestations will get dropped) + discard await idleAsync().withTimeout(1.seconds) - while blockFut.finished: - # TODO await here _hopefully_ yields to the event loop allowing another - # queue put to complete - self[].processBlock(await blockFut) + # Avoid one more `await` when there's work to do + if not (blockFut.finished or aggregateFut.finished or attestationFut.finished): + trace "Waiting for processing work" + await blockFut or aggregateFut or attestationFut + + # Only run one task per idle iteration, in priority order: blocks are needed + # for all other processing - then come aggregates which are cheap to + # process but might have a big impact on fork choice - last come + # attestations which individually have the smallest effect on chain progress + if blockFut.finished: + self[].processBlock(blockFut.read()) blockFut = self[].blocksQueue.popFirst() - - if aggregateFut.finished: - self[].processAggregate(await aggregateFut) + elif aggregateFut.finished: + # aggregates will be dropped under heavy load on producer side + self[].processAggregate(aggregateFut.read()) aggregateFut = self[].aggregatesQueue.popFirst() - continue - - if attestationFut.finished: - self[].processAttestation(await attestationFut) + elif attestationFut.finished: + # attestations will be dropped under heavy load on producer side + self[].processAttestation(attestationFut.read()) attestationFut = self[].attestationsQueue.popFirst() - continue proc new*(T: type Eth2Processor, config: BeaconNodeConf, diff --git a/vendor/nim-chronos b/vendor/nim-chronos index 46c0bf3c5..396208044 160000 --- a/vendor/nim-chronos +++ b/vendor/nim-chronos @@ -1 +1 @@ -Subproject commit 46c0bf3c5aff131ac437a88c00aa112133c94d54 +Subproject commit 396208044a6375d7d803f1dce6b2d1efdfec7fbb