From f10f29d8291c79eafdd54fe3b91570faf08b09d7 Mon Sep 17 00:00:00 2001 From: Jacek Sieka Date: Tue, 8 Dec 2020 09:59:40 +0100 Subject: [PATCH] avoid some futures and copies in processing pipeline (#2157) `addLastNoWait` does the same thing with less task scheduling and copying --- beacon_chain/eth2_processor.nim | 38 ++++++++++++++++++++------------- 1 file changed, 23 insertions(+), 15 deletions(-) diff --git a/beacon_chain/eth2_processor.nim b/beacon_chain/eth2_processor.nim index 1ae9bc0c2..a7e2a2ebf 100644 --- a/beacon_chain/eth2_processor.nim +++ b/beacon_chain/eth2_processor.nim @@ -79,13 +79,11 @@ proc updateHead*(self: var Eth2Processor, wallSlot: Slot) = # justified and finalized let oldFinalized = self.chainDag.finalizedHead.blck - oldHead = self.chainDag.head self.chainDag.updateHead(newHead, self.quarantine) # Cleanup the fork choice v2 if we have a finalized head if oldFinalized != self.chainDag.finalizedHead.blck: - self.attestationPool[].prune() proc dumpBlock[T]( @@ -318,14 +316,19 @@ proc attestationValidator*( beacon_attestation_delay.observe(delay.toFloatSeconds()) while self.attestationsQueue.full(): - let dropped = self.attestationsQueue.popFirst() - doAssert dropped.finished, "popFirst sanity" - notice "Queue full, dropping attestation", - dropped = shortLog(dropped.read().v) + try: + notice "Queue full, dropping attestation", + dropped = shortLog(self.attestationsQueue[0].v) + discard self.attestationsQueue.popFirstNoWait() + except CatchableError as exc: + raiseAssert "If queue is full, we have at least one item! " & exc.msg trace "Attestation validated" - traceAsyncErrors self.attestationsQueue.addLast( - AttestationEntry(v: attestation, attesting_indices: v.get())) + try: + self.attestationsQueue.addLastNoWait( + AttestationEntry(v: attestation, attesting_indices: v.get())) + except CatchableError as exc: + raiseAssert "We just checked that queue is not full! " & exc.msg ValidationResult.Accept @@ -361,15 +364,20 @@ proc aggregateValidator*( beacon_aggregate_delay.observe(delay.toFloatSeconds()) while self.aggregatesQueue.full(): - let dropped = self.aggregatesQueue.popFirst() - doAssert dropped.finished, "popFirst sanity" - notice "Queue full, dropping aggregate", - dropped = shortLog(dropped.read().v) + try: + notice "Queue full, dropping aggregate", + dropped = shortLog(self.aggregatesQueue[0].v) + discard self.aggregatesQueue.popFirstNoWait() + except CatchableError as exc: + raiseAssert "We just checked that queue is not full! " & exc.msg trace "Aggregate validated" - traceAsyncErrors self.aggregatesQueue.addLast(AggregateEntry( - v: signedAggregateAndProof.message.aggregate, - attesting_indices: v.get())) + try: + self.aggregatesQueue.addLastNoWait(AggregateEntry( + v: signedAggregateAndProof.message.aggregate, + attesting_indices: v.get())) + except CatchableError as exc: + raiseAssert "We just checked that queue is not full! " & exc.msg ValidationResult.Accept