increase attestation/aggregate queue sizes

when there are many validators, many aggregates and attestations arrive
every slot - increase the queue size a bit - also do batches on each
idle loop iteration since it's fairly quick
This commit is contained in:
Jacek Sieka 2021-01-26 10:10:57 +01:00 committed by zah
parent 1bdbf099cc
commit 43c64d32f8

View File

@ -536,10 +536,21 @@ proc runQueueProcessingLoop*(self: ref Eth2Processor) {.async.} =
elif aggregateFut.finished:
# aggregates will be dropped under heavy load on producer side
self[].processAggregate(aggregateFut.read())
for i in 0..<7: # process a few at a time - this is fairly fast
if self[].aggregatesQueue.empty():
break
self[].processAggregate(self[].aggregatesQueue.popFirstNoWait())
aggregateFut = self[].aggregatesQueue.popFirst()
elif attestationFut.finished:
# attestations will be dropped under heavy load on producer side
self[].processAttestation(attestationFut.read())
for i in 0..<7: # process a few at a time - this is fairly fast
if self[].attestationsQueue.empty():
break
self[].processAttestation(self[].attestationsQueue.popFirstNoWait())
attestationFut = self[].attestationsQueue.popFirst()
proc new*(T: type Eth2Processor,
@ -560,6 +571,14 @@ proc new*(T: type Eth2Processor,
quarantine: quarantine,
blockReceivedDuringSlot: newFuture[void](),
blocksQueue: newAsyncQueue[BlockEntry](1),
aggregatesQueue: newAsyncQueue[AggregateEntry](MAX_ATTESTATIONS.int),
attestationsQueue: newAsyncQueue[AttestationEntry](TARGET_COMMITTEE_SIZE.int * 4),
# limit to the max number of aggregates we expect to see in one slot
aggregatesQueue: newAsyncQueue[AggregateEntry](
(TARGET_AGGREGATORS_PER_COMMITTEE * MAX_COMMITTEES_PER_SLOT).int),
# This queue is a bit harder to bound reasonably - we want to get a good
# spread of votes across committees - ideally at least TARGET_COMMITTEE_SIZE
# per committee - assuming randomness in vote arrival, this limit should
# cover that but of course, when votes arrive depends on a number of
# factors that are not entire random
attestationsQueue: newAsyncQueue[AttestationEntry](
(TARGET_COMMITTEE_SIZE * MAX_COMMITTEES_PER_SLOT).int),
)