mirror of
https://github.com/status-im/nimbus-eth2.git
synced 2025-02-22 19:28:20 +00:00
use LRU strategy for shuffling/epoch caches (#4196)
When EL `newPayload` is slow (e.g., Raspberry Pi with Besu), the epoch and shuffling caches tend to fill up with multiple copies per epoch when processing gossip and performing validator duties close to the wall slot. The old strategy of evicting the oldest epoch led to the same item being evicted over and over, leading to blocking for over 5 minutes in extreme cases where alternate epochs/shufflings got loaded repeatedly. Changing the cache eviction strategy to least-recently-used seems to improve the situation drastically. A simple implementation was selected, based on a singly linked list without a hashtable.
This commit is contained in:
parent
eea13ee5ed
commit
5968ed586b
@ -65,6 +65,10 @@ type
|
||||
# unnecessary overhead.
|
||||
data*: BlockRef
|
||||
|
||||
LRUCache*[I: static[int], T] = object
  ## Fixed-capacity cache of `I` values of type `T` that tracks, per slot,
  ## the tick at which the slot was last written or looked up.
  entries*: array[I, tuple[value: T, lastUsed: uint32]]
    ## Slot storage; `lastUsed == 0` marks a slot as empty, any other
    ## value is the tick of the most recent use of that slot.
  timestamp*: uint32
    ## Most recently issued tick; advanced each time a slot is used.
|
||||
|
||||
ChainDAGRef* = ref object
|
||||
## ChainDAG validates, stores and serves chain history of valid blocks
|
||||
## according to the beacon chain state transition. From genesis to the
|
||||
@ -189,9 +193,9 @@ type
|
||||
|
||||
cfg*: RuntimeConfig
|
||||
|
||||
shufflingRefs*: array[16, ShufflingRef]
|
||||
shufflingRefs*: LRUCache[16, ShufflingRef]
|
||||
|
||||
epochRefs*: array[32, EpochRef]
|
||||
epochRefs*: LRUCache[32, EpochRef]
|
||||
## Cached information about a particular epoch ending with the given
|
||||
## block - we limit the number of held EpochRefs to put a cap on
|
||||
## memory usage
|
||||
|
@ -265,6 +265,52 @@ func atSlot*(dag: ChainDAGRef, bid: BlockId, slot: Slot): Opt[BlockSlotId] =
|
||||
else:
|
||||
dag.getBlockIdAtSlot(slot)
|
||||
|
||||
func nextTimestamp[I, T](cache: var LRUCache[I, T]): uint32 =
  ## Advance the cache clock by one and return the new tick.
  ##
  ## A `lastUsed` of 0 denotes an empty slot, so the clock must never hand
  ## out 0. Once the clock has reached `uint32.high`, every occupied slot is
  ## flattened to tick 1 and the clock restarts from 1 before being
  ## advanced - the relative age of existing entries is lost at that point.
  if cache.timestamp == high(uint32):
    for slot in cache.entries.mitems:
      if slot.lastUsed != 0:
        slot.lastUsed = 1
    cache.timestamp = 1

  inc(cache.timestamp)
  result = cache.timestamp
|
||||
|
||||
template findIt[I, T](cache: var LRUCache[I, T], predicate: untyped): Opt[T] =
  ## Look up the first occupied slot whose value satisfies `predicate`.
  ##
  ## `predicate` is an expression in which `it` refers to the value of the
  ## slot under inspection. On a hit, the slot is stamped with a fresh tick
  ## (marking it as most recently used) and its value is returned; when no
  ## occupied slot matches, the result is an empty `Opt`.
  block:
    var found: Opt[T]
    for idx in 0 ..< I:
      template slot: untyped = cache.entries[idx]
      template it: untyped {.inject, used.} = slot.value
      # Empty slots (tick 0) are skipped without evaluating the predicate
      if slot.lastUsed == 0 or not predicate:
        continue
      slot.lastUsed = cache.nextTimestamp
      found.ok(it)
      break
    found
|
||||
|
||||
template delIt[I, T](cache: var LRUCache[I, T], predicate: untyped) =
  ## Clear every occupied slot whose value satisfies `predicate`.
  ##
  ## `predicate` is an expression in which `it` refers to the value of the
  ## slot under inspection. Matching slots are reset to their default
  ## state, which leaves `lastUsed` at 0, i.e. marks them as empty.
  block:
    for idx in 0 ..< I:
      template slot: untyped = cache.entries[idx]
      template it: untyped {.inject, used.} = slot.value
      # Only occupied slots are tested - empty ones hold no value to inspect
      if slot.lastUsed != 0:
        if predicate:
          reset(slot)
|
||||
|
||||
func put[I, T](cache: var LRUCache[I, T], value: T) =
  ## Store `value` in the cache, overwriting the slot with the lowest
  ## `lastUsed` tick.
  ##
  ## Empty slots carry tick 0 and are therefore always preferred; the scan
  ## stops at the first one found. Otherwise the least recently used entry
  ## is evicted, with ties resolved in favour of the lowest index.
  var
    victim = 0
    oldest = high(uint32)

  for idx in 0 ..< I:
    let stamp = cache.entries[idx].lastUsed
    if stamp >= oldest:
      continue
    oldest = stamp
    victim = idx
    if oldest == 0:
      # Can't do better than an empty slot
      break

  let tick = cache.nextTimestamp
  cache.entries[victim].value = value
  cache.entries[victim].lastUsed = tick
|
||||
|
||||
func epochAncestor(dag: ChainDAGRef, bid: BlockId, epoch: Epoch):
|
||||
Opt[BlockSlotId] =
|
||||
## The epoch ancestor is the last block that has an effect on the epoch-
|
||||
@ -314,11 +360,8 @@ func findShufflingRef*(
|
||||
dependent_bsi = dag.atSlot(bid, dependent_slot).valueOr:
|
||||
return Opt.none(ShufflingRef)
|
||||
|
||||
for s in dag.shufflingRefs:
|
||||
if s == nil: continue
|
||||
if s.epoch == epoch and dependent_bsi.bid.root == s.attester_dependent_root:
|
||||
return Opt.some s
|
||||
Opt.none(ShufflingRef)
|
||||
dag.shufflingRefs.findIt(
|
||||
it.epoch == epoch and dependent_bsi.bid.root == it.attester_dependent_root)
|
||||
|
||||
func putShufflingRef*(dag: ChainDAGRef, shufflingRef: ShufflingRef) =
|
||||
## Store shuffling in the cache
|
||||
@ -327,20 +370,7 @@ func putShufflingRef*(dag: ChainDAGRef, shufflingRef: ShufflingRef) =
|
||||
# are seldom used (i.e. RPC), so no need to cache
|
||||
return
|
||||
|
||||
# Because we put a cap on the number of shufflingRef we store, we want to
|
||||
# prune the least useful state - for now, we'll assume that to be the
|
||||
# oldest shufflingRef we know about.
|
||||
var
|
||||
oldest = 0
|
||||
for x in 0..<dag.shufflingRefs.len:
|
||||
let candidate = dag.shufflingRefs[x]
|
||||
if candidate == nil:
|
||||
oldest = x
|
||||
break
|
||||
if candidate.epoch < dag.shufflingRefs[oldest].epoch:
|
||||
oldest = x
|
||||
|
||||
dag.shufflingRefs[oldest] = shufflingRef
|
||||
dag.shufflingRefs.put shufflingRef
|
||||
|
||||
func findEpochRef*(
|
||||
dag: ChainDAGRef, bid: BlockId, epoch: Epoch): Opt[EpochRef] =
|
||||
@ -348,12 +378,7 @@ func findEpochRef*(
|
||||
## `getEpochRef` for a version that creates a new instance if it's missing
|
||||
let key = ? dag.epochKey(bid, epoch)
|
||||
|
||||
for e in dag.epochRefs:
|
||||
if e == nil: continue
|
||||
if e.key == key:
|
||||
return Opt.some e
|
||||
|
||||
Opt.none(EpochRef)
|
||||
dag.epochRefs.findIt(it.key == key)
|
||||
|
||||
func putEpochRef(dag: ChainDAGRef, epochRef: EpochRef) =
|
||||
if epochRef.epoch < dag.finalizedHead.slot.epoch():
|
||||
@ -361,21 +386,7 @@ func putEpochRef(dag: ChainDAGRef, epochRef: EpochRef) =
|
||||
# are seldom used (i.e. RPC), so no need to cache
|
||||
return
|
||||
|
||||
# Because we put a cap on the number of epochRefs we store, we want to
|
||||
# prune the least useful state - for now, we'll assume that to be the
|
||||
# oldest epochRef we know about.
|
||||
|
||||
var
|
||||
oldest = 0
|
||||
for x in 0..<dag.epochRefs.len:
|
||||
let candidate = dag.epochRefs[x]
|
||||
if candidate == nil:
|
||||
oldest = x
|
||||
break
|
||||
if candidate.key.epoch < dag.epochRefs[oldest].epoch:
|
||||
oldest = x
|
||||
|
||||
dag.epochRefs[oldest] = epochRef
|
||||
dag.epochRefs.put epochRef
|
||||
|
||||
func init*(
|
||||
T: type ShufflingRef, state: ForkedHashedBeaconState,
|
||||
@ -1666,14 +1677,8 @@ proc pruneStateCachesDAG*(dag: ChainDAGRef) =
|
||||
block: # Clean up old EpochRef instances
|
||||
# After finalization, we can clear up the epoch cache and save memory -
|
||||
# it will be recomputed if needed
|
||||
for i in 0..<dag.epochRefs.len:
|
||||
if dag.epochRefs[i] != nil and
|
||||
dag.epochRefs[i].epoch < dag.finalizedHead.slot.epoch:
|
||||
dag.epochRefs[i] = nil
|
||||
for i in 0..<dag.shufflingRefs.len:
|
||||
if dag.shufflingRefs[i] != nil and
|
||||
dag.shufflingRefs[i].epoch < dag.finalizedHead.slot.epoch:
|
||||
dag.shufflingRefs[i] = nil
|
||||
dag.epochRefs.delIt(it.epoch < dag.finalizedHead.slot.epoch)
|
||||
dag.shufflingRefs.delIt(it.epoch < dag.finalizedHead.slot.epoch)
|
||||
|
||||
let epochRefPruneTick = Moment.now()
|
||||
|
||||
|
@ -525,8 +525,8 @@ suite "chain DAG finalization tests" & preset():
|
||||
not finalER.isErr()
|
||||
|
||||
block:
|
||||
for er in dag.epochRefs:
|
||||
check: er == nil or er.epoch >= dag.finalizedHead.slot.epoch
|
||||
for er in dag.epochRefs.entries:
|
||||
check: er.value == nil or er.value.epoch >= dag.finalizedHead.slot.epoch
|
||||
|
||||
block:
|
||||
let tmpStateData = assignClone(dag.headState)
|
||||
|
Loading…
x
Reference in New Issue
Block a user