diff --git a/beacon_chain/eth2_processor.nim b/beacon_chain/eth2_processor.nim index f628847f7..faaa04c78 100644 --- a/beacon_chain/eth2_processor.nim +++ b/beacon_chain/eth2_processor.nim @@ -536,10 +536,21 @@ proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} = elif aggregateFut.finished: # aggregates will be dropped under heavy load on producer side self[].processAggregate(aggregateFut.read()) + for i in 0..<7: # process a few at a time - this is fairly fast + if self[].aggregatesQueue.empty(): + break + self[].processAggregate(self[].aggregatesQueue.popFirstNoWait()) + aggregateFut = self[].aggregatesQueue.popFirst() elif attestationFut.finished: # attestations will be dropped under heavy load on producer side self[].processAttestation(attestationFut.read()) + + for i in 0..<7: # process a few at a time - this is fairly fast + if self[].attestationsQueue.empty(): + break + self[].processAttestation(self[].attestationsQueue.popFirstNoWait()) + attestationFut = self[].attestationsQueue.popFirst() proc new*(T: type Eth2Processor, @@ -560,6 +571,14 @@ proc new*(T: type Eth2Processor, quarantine: quarantine, blockReceivedDuringSlot: newFuture[void](), blocksQueue: newAsyncQueue[BlockEntry](1), - aggregatesQueue: newAsyncQueue[AggregateEntry](MAX_ATTESTATIONS.int), - attestationsQueue: newAsyncQueue[AttestationEntry](TARGET_COMMITTEE_SIZE.int * 4), + # limit to the max number of aggregates we expect to see in one slot + aggregatesQueue: newAsyncQueue[AggregateEntry]( + (TARGET_AGGREGATORS_PER_COMMITTEE * MAX_COMMITTEES_PER_SLOT).int), + # This queue is a bit harder to bound reasonably - we want to get a good + # spread of votes across committees - ideally at least TARGET_COMMITTEE_SIZE + # per committee - assuming randomness in vote arrival, this limit should + # cover that but of course, when votes arrive depends on a number of + # factors that are not entire random + attestationsQueue: newAsyncQueue[AttestationEntry]( + (TARGET_COMMITTEE_SIZE * MAX_COMMITTEES_PER_SLOT).int), )