Jordan Hrycaj bd42ebb193
Snap sync refactor accounts healing (#1392)
* Relocated mothballing (i.e. swap-in preparation) logic

details:
  Mothballing was previously tested & started after downloading
  account ranges in `range_fetch_accounts`.

  Whenever current download or healing stops because of a pivot change,
  swap-in preparation is needed (otherwise some storage slots may get
  lost when swap-in takes place.)

  Also, `execSnapSyncAction()` has been moved back to `pivot_helper`.

* Reorganised source file directories

details:
  Grouped pivot focused modules into `pivot` directory

* Renamed `checkNodes`, `sickSubTries` as `nodes.check`, `nodes.missing`

why:
  Both lists are typically used together as a pair. Renaming `sickSubTries`
  reflects moving away from a healing centric view towards a swap-in
  attitude.

* Multi times coverage recording

details:
  Per pivot account ranges are accumulated into a coverage range set. This
  set will eventually contain a single range of account hashes [0..2^256]
  which amounts to 100% capacity.

  A counter has been added that is incremented whenever max capacity is
  reached. The accumulated range is then reset to empty.

  The effect of this setting is that the coverage can be evenly duplicated.
  So 200% would not accumulate on a particular region.

* Update range length comparisons (mod 2^256)

why:
  A range interval can have sizes 1..2^256 as it cannot be empty by
  definition. A range interval set can contain 0..2^256 points. As the
  scalar range is a residue class modulo 2^256, the residue class 0 means
  length 2^256 for a range interval, but can mean 0 or 2^256 for the number
  of points in a range interval set.

* Generalised `hexaryEnvelopeDecompose()`

details:
  Compile the complement of the union of some (processed) intervals and
  express this complement as a list of envelopes of sub-tries.

  This facility is directly applicable to swap-in book-keeping.

* Re-factor `swapIn()`

why:
  Good idea but baloney implementation. The main algorithm is based on
  the generalised version of `hexaryEnvelopeDecompose()` which has been
  derived from this implementation.

* Refactor `healAccounts()` using `hexaryEnvelopeDecompose()` as main driver

why:
  Previously, the hexary trie was searched recursively for dangling nodes
  which has a poor worst case performance already when the trie is
  reasonably populated.

  The function `hexaryEnvelopeDecompose()` is an order of magnitude faster
  because it does not peruse existing sub-tries in order to find missing
  nodes, although the result is not fully compatible with the previous
  function.

  So recursive search is used in a limited mode only when the decomposer
  will not deliver a useful result.

* Logging & maintenance fixes

details:
  Preparation for abandoning buddy-global healing variables `node`,
  `resumeCtx`, and `lockTriePerusal`. These variables are trie-perusal
  centric which will be run on the back burner in favour of
  `hexaryEnvelopeDecompose()` which is used for accounts healing already.
2022-12-19 21:22:09 +00:00

227 lines
6.8 KiB
Nim

# Nimbus - Fetch account and storage states from peers efficiently
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/[strformat, strutils],
chronos,
chronicles,
eth/[common, p2p],
stint,
../../../utils/prettify,
../../misc/timer_helper
# Default exception tracking for the routines declared below: unless a
# routine is annotated otherwise, it may raise nothing but `Defect`s.
{.push raises: [Defect].}

# Log records emitted from this module carry the "snap-tick" topic.
logScope:
  topics = "snap-tick"
type
  TickerStats* = object
    ## Snapshot of the figures printed in a ticker log line, as returned by
    ## the `TickerStatsUpdater` callback.
    pivotBlock*: Option[BlockNumber] # logged as "n/a" while unset
    nAccounts*: (float,float) ## mean and standard deviation
    accountsFill*: (float,float,float) ## mean, standard deviation, merged total
    nAccountStats*: int ## #chunks
    nSlotLists*: (float,float) ## mean and standard deviation
    nStorageQueue*: Option[int] # logged as "n/a" while unset
    nQueues*: int # logged next to the pivot block as `#<pivot>/<nQueues>`

  # Callback polled by the ticker on every tick to fetch fresh statistics.
  TickerStatsUpdater* =
    proc: TickerStats {.gcsafe, raises: [Defect].}

  TickerRef* = ref object
    ## Account fetching state that is shared among all peers.
    nBuddies: int # number of registered buddies (`startBuddy`/`stopBuddy`)
    recovery: bool # set while recovery mode is active
    lastRecov: bool # value of `recovery` when the last line was logged
    lastStats: TickerStats # statistics when the last line was logged
    statsCb: TickerStatsUpdater # supplies the statistics to be logged
    logTicker: TimerCallback # timer handle, `nil` while the ticker is stopped
    started: Moment # set once, by the first `start()`
    visited: Moment # time when the last line was logged
const
  # Delay between `start()` and the first tick.
  tickerStartDelay = chronos.milliseconds(100)

  # Period at which `runLogTicker()` re-schedules itself.
  tickerLogInterval = chronos.seconds(1)

  # Longest time an unchanged statistics line is withheld; once exceeded,
  # the line is logged again even though nothing has changed.
  tickerLogSuppressMax = chronos.seconds(100)
# ------------------------------------------------------------------------------
# Private functions: pretty printing
# ------------------------------------------------------------------------------
# proc ppMs*(elapsed: times.Duration): string
# {.gcsafe, raises: [Defect, ValueError]} =
# result = $elapsed.inMilliseconds
# let ns = elapsed.inNanoseconds mod 1_000_000 # fraction of a milli second
# if ns != 0:
# # to rounded deca milli seconds
# let dm = (ns + 5_000i64) div 10_000i64
# result &= &".{dm:02}"
# result &= "ms"
#
# proc ppSecs*(elapsed: times.Duration): string
# {.gcsafe, raises: [Defect, ValueError]} =
# result = $elapsed.inSeconds
# let ns = elapsed.inNanoseconds mod 1_000_000_000 # fraction of a second
# if ns != 0:
# # round up
# let ds = (ns + 5_000_000i64) div 10_000_000i64
# result &= &".{ds:02}"
# result &= "s"
#
# proc ppMins*(elapsed: times.Duration): string
# {.gcsafe, raises: [Defect, ValueError]} =
# result = $elapsed.inMinutes
# let ns = elapsed.inNanoseconds mod 60_000_000_000 # fraction of a minute
# if ns != 0:
# # round up
# let dm = (ns + 500_000_000i64) div 1_000_000_000i64
# result &= &":{dm:02}"
# result &= "m"
#
# proc pp(d: times.Duration): string
# {.gcsafe, raises: [Defect, ValueError]} =
# if 40 < d.inSeconds:
# d.ppMins
# elif 200 < d.inMilliseconds:
# d.ppSecs
# else:
# d.ppMs
proc pc99(val: float): string =
  ## Render the fraction `val` as a percentage string via `toPC(0)`, except
  ## that values in `[0.99, 1.0)` are pinned to "99%" and values in
  ## `(0.0, 0.01]` are pinned to "1%".
  # Presumably this keeps almost-full/almost-empty figures from being shown
  # as 100%/0% due to rounding -- confirm against `toPC()`.
  if val >= 0.99 and val < 1.0:
    result = "99%"
  elif val > 0.0 and val <= 0.01:
    result = "1%"
  else:
    result = val.toPC(0)
# ------------------------------------------------------------------------------
# Private functions: ticking log messages
# ------------------------------------------------------------------------------
template noFmtError(info: static[string]; code: untyped) =
  ## Run `code` and turn a `ValueError` escaping from it into an assertion
  ## failure. This lets string formatting be used inside routines that are
  ## only allowed to raise `Defect`s.
  ##
  ## `info` is a compile time label that is embedded into the failure
  ## message in order to identify the call site.
  try:
    code
  except ValueError as e:
    raiseAssert "Inconceivable (" & info & "): " & e.msg
proc setLogTicker(t: TickerRef; at: Moment) {.gcsafe.}

proc runLogTicker(t: TickerRef) {.gcsafe.} =
  ## Timer callback: poll the statistics callback, log a statistics line if
  ## there is something to report, and schedule the next tick.
  ##
  ## A line is logged when the statistics differ from the ones logged last,
  ## when the `recovery` flag has changed since the last logged line, or when
  ## the last line was logged longer than `tickerLogSuppressMax` ago.
  let
    data = t.statsCb()
    now = Moment.now()

  if data != t.lastStats or
     t.recovery != t.lastRecov or
     tickerLogSuppressMax < (now - t.visited):
    # Note: the names of the locals passed to `info` below end up as the
    # property names of the log record, so they must not be renamed lightly.
    var
      nAcc, nSto: string
      pivot = "n/a"
      nStoQue = "n/a"
    let
      # Recovery state at the last logged line; it must be sampled before
      # `lastRecov` is overwritten below.
      recoveryDone = t.lastRecov
      accCov = data.accountsFill[0].pc99 &
        "(" & data.accountsFill[1].pc99 & ")" &
        "/" & data.accountsFill[2].pc99 &
        "~" & data.nAccountStats.uint.toSI
      buddies = t.nBuddies

      # With `int64`, there are more than 29*10^10 years range for seconds
      up = (now - t.started).seconds.uint64.toSI
      mem = getTotalMem().uint.toSI

    # Remember what is about to be logged, for the change detection above
    t.lastStats = data
    t.visited = now
    t.lastRecov = t.recovery

    # `strformat` may raise `ValueError` which is not allowed to escape here
    noFmtError("runLogTicker"):
      if data.pivotBlock.isSome:
        pivot = &"#{data.pivotBlock.get}/{data.nQueues}"
      # Adding 0.5 before truncating rounds non-negative values to nearest
      nAcc = (&"{(data.nAccounts[0]+0.5).int64}" &
              &"({(data.nAccounts[1]+0.5).int64})")
      nSto = (&"{(data.nSlotLists[0]+0.5).int64}" &
              &"({(data.nSlotLists[1]+0.5).int64})")

    if data.nStorageQueue.isSome:
      nStoQue = $data.nStorageQueue.unsafeGet

    if t.recovery:
      info "Snap sync statistics (recovery)",
        up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem
    elif recoveryDone:
      # First line logged after the recovery flag was cleared
      info "Snap sync statistics (recovery done)",
        up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem
    else:
      info "Snap sync statistics",
        up, buddies, pivot, nAcc, accCov, nSto, nStoQue, mem

  # Ask for the next tick whether or not a line was logged
  t.setLogTicker(Moment.fromNow(tickerLogInterval))
proc setLogTicker(t: TickerRef; at: Moment) =
  ## Schedule `runLogTicker()` to be invoked at time `at`. Nothing is
  ## scheduled while the timer handle `t.logTicker` is `nil`, i.e. after
  ## `stop()` or before the first `start()`.
  if t.logTicker.isNil:
    return
  t.logTicker = safeSetTimer(at, runLogTicker, t)
# ------------------------------------------------------------------------------
# Public constructor and start/stop functions
# ------------------------------------------------------------------------------
proc init*(T: type TickerRef; cb: TickerStatsUpdater): T =
  ## Constructor. The callback `cb` is stored as the source of the statistics
  ## to be logged; all other fields keep their zero defaults, so the ticker
  ## is not running until `start()` is called.
  result = T(statsCb: cb)
proc start*(t: TickerRef) =
  ## Re/start ticker unconditionally. The first tick is scheduled
  ## `tickerStartDelay` from now.
  #debug "Started ticker"
  let firstTick = Moment.fromNow(tickerStartDelay)
  t.logTicker = safeSetTimer(firstTick, runLogTicker, t)

  # The start time is recorded only once, so a restart does not reset it.
  if t.started == Moment.default:
    t.started = Moment.now()
proc stop*(t: TickerRef) =
  ## Stop ticker unconditionally
  ##
  ## Only the timer handle is cleared here. As `setLogTicker()` does not
  ## schedule anything while the handle is `nil`, the ticker will not be
  ## re-armed after this call.
  # NOTE(review): a tick that has already been scheduled is not cancelled
  # here and presumably still fires once -- confirm against `safeSetTimer()`.
  t.logTicker = nil
  #debug "Stopped ticker"
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc startBuddy*(t: TickerRef) =
  ## Register one more buddy. If no buddy was registered before (counter not
  ## positive), the counter is set to one and the ticker is started unless
  ## recovery mode is active. Otherwise only the counter is incremented.
  if 0 < t.nBuddies:
    t.nBuddies.inc
    return

  t.nBuddies = 1
  if not t.recovery:
    t.start()
proc startRecovery*(t: TickerRef) =
  ## Enter recovery mode. This is a no-op if recovery mode is active already.
  ## Otherwise the ticker is started unless there are registered buddies.
  if t.recovery:
    return

  t.recovery = true
  if t.nBuddies <= 0:
    t.start()
proc stopBuddy*(t: TickerRef) =
  ## Unregister a buddy. The ticker is stopped when the buddies counter is
  ## no longer positive, provided recovery mode is not active.
  dec t.nBuddies
  if t.recovery or 0 < t.nBuddies:
    return
  t.stop()
proc stopRecovery*(t: TickerRef) =
  ## Leave recovery mode. This is a no-op if recovery mode is not active.
  ## The ticker is stopped only if there are no registered buddies.
  if t.recovery:
    t.recovery = false
    # Buddies registered during recovery did not start the ticker themselves
    # (see `startBuddy()`), so stopping it here unconditionally would leave
    # them without a ticker. This mirrors the check in `stopBuddy()`.
    if t.nBuddies <= 0:
      t.stop()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------