avoid some futures and copies in processing pipeline (#2157)
`addLastNoWait` does the same thing with less task scheduling and copying
This commit is contained in:
parent
3b06e0f657
commit
f10f29d829
|
@ -79,13 +79,11 @@ proc updateHead*(self: var Eth2Processor, wallSlot: Slot) =
|
||||||
# justified and finalized
|
# justified and finalized
|
||||||
let
|
let
|
||||||
oldFinalized = self.chainDag.finalizedHead.blck
|
oldFinalized = self.chainDag.finalizedHead.blck
|
||||||
oldHead = self.chainDag.head
|
|
||||||
|
|
||||||
self.chainDag.updateHead(newHead, self.quarantine)
|
self.chainDag.updateHead(newHead, self.quarantine)
|
||||||
|
|
||||||
# Cleanup the fork choice v2 if we have a finalized head
|
# Cleanup the fork choice v2 if we have a finalized head
|
||||||
if oldFinalized != self.chainDag.finalizedHead.blck:
|
if oldFinalized != self.chainDag.finalizedHead.blck:
|
||||||
|
|
||||||
self.attestationPool[].prune()
|
self.attestationPool[].prune()
|
||||||
|
|
||||||
proc dumpBlock[T](
|
proc dumpBlock[T](
|
||||||
|
@ -318,14 +316,19 @@ proc attestationValidator*(
|
||||||
beacon_attestation_delay.observe(delay.toFloatSeconds())
|
beacon_attestation_delay.observe(delay.toFloatSeconds())
|
||||||
|
|
||||||
while self.attestationsQueue.full():
|
while self.attestationsQueue.full():
|
||||||
let dropped = self.attestationsQueue.popFirst()
|
try:
|
||||||
doAssert dropped.finished, "popFirst sanity"
|
|
||||||
notice "Queue full, dropping attestation",
|
notice "Queue full, dropping attestation",
|
||||||
dropped = shortLog(dropped.read().v)
|
dropped = shortLog(self.attestationsQueue[0].v)
|
||||||
|
discard self.attestationsQueue.popFirstNoWait()
|
||||||
|
except CatchableError as exc:
|
||||||
|
raiseAssert "If queue is full, we have at least one item! " & exc.msg
|
||||||
|
|
||||||
trace "Attestation validated"
|
trace "Attestation validated"
|
||||||
traceAsyncErrors self.attestationsQueue.addLast(
|
try:
|
||||||
|
self.attestationsQueue.addLastNoWait(
|
||||||
AttestationEntry(v: attestation, attesting_indices: v.get()))
|
AttestationEntry(v: attestation, attesting_indices: v.get()))
|
||||||
|
except CatchableError as exc:
|
||||||
|
raiseAssert "We just checked that queue is not full! " & exc.msg
|
||||||
|
|
||||||
ValidationResult.Accept
|
ValidationResult.Accept
|
||||||
|
|
||||||
|
@ -361,15 +364,20 @@ proc aggregateValidator*(
|
||||||
beacon_aggregate_delay.observe(delay.toFloatSeconds())
|
beacon_aggregate_delay.observe(delay.toFloatSeconds())
|
||||||
|
|
||||||
while self.aggregatesQueue.full():
|
while self.aggregatesQueue.full():
|
||||||
let dropped = self.aggregatesQueue.popFirst()
|
try:
|
||||||
doAssert dropped.finished, "popFirst sanity"
|
|
||||||
notice "Queue full, dropping aggregate",
|
notice "Queue full, dropping aggregate",
|
||||||
dropped = shortLog(dropped.read().v)
|
dropped = shortLog(self.aggregatesQueue[0].v)
|
||||||
|
discard self.aggregatesQueue.popFirstNoWait()
|
||||||
|
except CatchableError as exc:
|
||||||
|
raiseAssert "We just checked that queue is not full! " & exc.msg
|
||||||
|
|
||||||
trace "Aggregate validated"
|
trace "Aggregate validated"
|
||||||
traceAsyncErrors self.aggregatesQueue.addLast(AggregateEntry(
|
try:
|
||||||
|
self.aggregatesQueue.addLastNoWait(AggregateEntry(
|
||||||
v: signedAggregateAndProof.message.aggregate,
|
v: signedAggregateAndProof.message.aggregate,
|
||||||
attesting_indices: v.get()))
|
attesting_indices: v.get()))
|
||||||
|
except CatchableError as exc:
|
||||||
|
raiseAssert "We just checked that queue is not full! " & exc.msg
|
||||||
|
|
||||||
ValidationResult.Accept
|
ValidationResult.Accept
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue