Snap sync rename objects (#1099)

* Disentangle `collect` module from `reply_data`

why:
  Now the only module visible from `collect` for fetching data is
  `peer/fetch`.

* Merge `SnapPeerHunt` into `collect`

why:
  This part needs to be known by `collect` only.

* rename collect => worker

* Dissolve `sync_fetch_xdesc` module into `common`

why:
  Descriptor is only used in `common` and `fetch_trie`

* rename `snap/peer` directory => `snap/worker`

* rename `SnapSync` -> `Worker`, `SnapPeer` -> `WorkerBuddy`

* moved `snap/base_desc.nim` -> `snap/worker/worker_desc.nim`

* Unified opaque object ref naming in `worker_desc.nim`

details:
  indicated by the inheriting module (exactly one, always)
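
For illustration, the convention pairs each opaque stub in `worker_desc.nim`
with exactly one inheriting module, which down-casts the stored base ref
behind typed accessors. A minimal sketch of the pattern (names as in the
diff below):

  # worker_desc.nim -- opaque stub, fields invisible to other modules
  type
    WorkerBase* = ref object of RootObj
      ## Stub object, to be inherited in file `worker.nim`

  # worker.nim -- the single inheriting module extends the stub ...
  type
    WorkerHuntEx = ref object of WorkerBase
      syncMode: WorkerMode             # private fields live here

  # ... and provides typed accessors over the opaque reference
  proc hunt(sp: WorkerBuddy): WorkerHuntEx =
    sp.workerBase.WorkerHuntEx         # down-cast the stored base ref

  proc `hunt=`(sp: WorkerBuddy; value: WorkerHuntEx) =
    sp.workerBase = value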
Jordan Hrycaj 2022-05-24 09:07:39 +01:00 committed by GitHub
parent 553be51217
commit 96bb09457e
13 changed files with 592 additions and 563 deletions


@ -16,7 +16,7 @@ import
eth/[common/eth_types, p2p, p2p/peer_pool, p2p/private/p2p_types],
stew/keyed_queue,
"."/[protocol, types],
./snap/[base_desc, collect]
./snap/worker
{.push raises: [Defect].}
@ -24,10 +24,10 @@ logScope:
topics = "snap sync"
type
SnapSyncCtx* = ref object of SnapSync
peerTab: KeyedQueue[Peer,SnapPeer] ## LRU cache
tabSize: int ## maximal number of entries
pool: PeerPool ## for starting the system, debugging
SnapSyncCtx* = ref object of Worker
peerTab: KeyedQueue[Peer,WorkerBuddy] ## LRU cache
tabSize: int ## maximal number of entries
pool: PeerPool ## for starting the system, debugging
# debugging
lastDump: seq[string]
@ -37,7 +37,7 @@ type
# Private helpers
# ------------------------------------------------------------------------------
proc nsCtx(sp: SnapPeer): SnapSyncCtx =
proc nsCtx(sp: WorkerBuddy): SnapSyncCtx =
sp.ns.SnapSyncCtx
proc hash(peer: Peer): Hash =
@ -59,21 +59,21 @@ proc dumpPeers(sn: SnapSyncCtx; force = false) =
var n = sn.peerTab.len - 1
for sp in sn.peerTab.prevValues:
trace "*** Peer list entry",
n, poolSize, peer=sp, hunt=sp.hunt.pp
n, poolSize, peer=sp, worker=sp.huntPp
n.dec
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc syncPeerLoop(sp: SnapPeer) {.async.} =
proc syncPeerLoop(sp: WorkerBuddy) {.async.} =
# This basic loop just runs the head-hunter for each peer.
var cache = ""
while sp.ctrl.runState != SyncStopped:
while sp.ctrl.runState != BuddyStopped:
# Do something, work a bit
await sp.collectBlockHeaders()
if sp.ctrl.runState == SyncStopped:
await sp.workerExec
if sp.ctrl.runState == BuddyStopped:
trace "Ignoring stopped peer", peer=sp
return
@ -81,38 +81,32 @@ proc syncPeerLoop(sp: SnapPeer) {.async.} =
# TODO: Update implementation of lruFetch() using re-link, only
discard sp.nsCtx.peerTab.lruFetch(sp.peer)
let delayMs = if sp.hunt.syncMode == SyncLocked: 1000 else: 50
let delayMs = if sp.workerLockedOk: 1000 else: 50
await sleepAsync(chronos.milliseconds(delayMs))
proc syncPeerStart(sp: SnapPeer) =
proc syncPeerStart(sp: WorkerBuddy) =
asyncSpawn sp.syncPeerLoop()
proc syncPeerStop(sp: SnapPeer) =
sp.ctrl.runState = SyncStopped
# TODO: Cancel running `SnapPeer` instances. We need clean cancellation
proc syncPeerStop(sp: WorkerBuddy) =
sp.ctrl.runState = BuddyStopped
# TODO: Cancel running `WorkerBuddy` instances. We need clean cancellation
# for this. Doing so reliably will be addressed at a later time.
proc onPeerConnected(ns: SnapSyncCtx, peer: Peer) =
trace "Peer connected", peer
let sp = SnapPeer.new(ns, peer, SyncHuntForward, SyncRunningOk)
sp.collectDataSetup()
if peer.state(eth).initialized:
# We know the hash but not the block number.
sp.hunt.bestHash = peer.state(eth).bestBlockHash.BlockHash
# TODO: Temporarily disabled because it's useful to test the head hunter.
# sp.syncMode = SyncOnlyHash
else:
trace "State(eth) not initialized!"
let sp = WorkerBuddy.new(ns, peer, BuddyRunningOk)
# Manage connection table, check for existing entry
if ns.peerTab.hasKey(peer):
trace "Peer exists already!", peer
trace "Peer exists already!", peer # can this happen, at all?
return
# Initialise snap sync for this peer
discard sp.workerStart
# Check for table overflow. An overflow should not happen if the table is
# as large as the peer connection table.
if ns.tabSize <= ns.peerTab.len:
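
(The hunk is truncated here.) For reference, a minimal sketch of the
`stew/keyed_queue` LRU pattern used for `peerTab` above: `lruFetch`
re-links an entry as most recently used, and `lruAppend` inserts while
evicting the oldest entry once the size limit is exceeded. Illustration
only, not the elided overflow code:

import stew/keyed_queue

var tab: KeyedQueue[string,int]
discard tab.lruAppend("alice", 1, 2)   # maxItems = 2
discard tab.lruAppend("bob", 2, 2)
discard tab.lruFetch("alice")          # "alice" becomes most recently used
discard tab.lruAppend("carol", 3, 2)   # exceeds maxItems, evicts oldest "bob"
doAssert tab.hasKey("alice") and not tab.hasKey("bob")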


@ -1,190 +0,0 @@
# Nimbus - New sync approach - A fusion of snap, trie, beam and other methods
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
eth/[common/eth_types, p2p],
stew/[byteutils, keyed_queue, results],
../../constants,
../types
{.push raises: [Defect].}
const
seenBlocksMax = 500
## Internal size of LRU cache (for debugging)
type
SnapPeerStat* = distinct uint
SnapPeerFetchBase* = ref object of RootObj
## Stub object, to be inherited
SnapPeerRequestsBase* = ref object of RootObj
## Stub object, to be inherited
SnapPeerMode* = enum
## The current state of tracking the peer's canonical chain head.
## `bestBlockNumber` is only valid when this is `SyncLocked`.
SyncLocked
SyncOnlyHash
SyncHuntForward
SyncHuntBackward
SyncHuntRange
SyncHuntRangeFinal
SnapPeerRunState* = enum
SyncRunningOk
SyncStopRequest
SyncStopped
SnapPeerStats* = tuple
## Statistics counters for events associated with this peer.
## These may be used to recognise errors and select good peers.
ok: tuple[
reorgDetected: SnapPeerStat,
getBlockHeaders: SnapPeerStat,
getNodeData: SnapPeerStat]
minor: tuple[
timeoutBlockHeaders: SnapPeerStat,
unexpectedBlockHash: SnapPeerStat]
major: tuple[
networkErrors: SnapPeerStat,
excessBlockHeaders: SnapPeerStat,
wrongBlockHeader: SnapPeerStat]
SnapPeerHunt* = tuple
## Peer canonical chain head ("best block") search state.
syncMode: SnapPeerMode ## Action mode
lowNumber: BlockNumber ## Recent lowest known block number.
highNumber: BlockNumber ## Recent highest known block number.
bestNumber: BlockNumber
bestHash: BlockHash
step: uint
SnapPeerCtrl* = tuple
## Control and state settings
stateRoot: Option[TrieHash]
## State root to fetch state for. This changes during sync and is
## slightly different for each peer.
runState: SnapPeerRunState
# -------
SnapSyncSeenBlocks = KeyedQueue[array[32,byte],BlockNumber]
## Temporary for pretty debugging, `BlockHash` keyed lru cache
SnapSyncFetchBase* = ref object of RootObj
## Stub object, to be inherited
# -------
SnapPeer* = ref object
## Non-inheritable peer state tracking descriptor.
ns*: SnapSync ## Snap descriptor object back reference
peer*: Peer ## Reference to eth p2pProtocol entry
stats*: SnapPeerStats ## Statistics counters
hunt*: SnapPeerHunt ## Peer chain head search state
ctrl*: SnapPeerCtrl ## Control and state settings
requests*: SnapPeerRequestsBase ## Opaque object reference
fetchState*: SnapPeerFetchBase ## Opaque object reference
SnapSync* = ref object of RootObj
## Shared state among all peers of a snap syncing node. Will be
## amended/inherited into `SnapSyncCtx` by the `snap` module.
seenBlock: SnapSyncSeenBlocks ## Temporary, debugging, prettyfied logs
sharedFetch*: SnapSyncFetchBase ## Opaque object reference
# ------------------------------------------------------------------------------
# Public Constructor
# ------------------------------------------------------------------------------
proc new*(
T: type SnapPeer;
ns: SnapSync;
peer: Peer;
syncMode: SnapPeerMode;
runState: SnapPeerRunState): T =
## Initial state, maximum uncertainty range.
T(ns: ns,
peer: peer,
ctrl: (
stateRoot: none(TrieHash),
runState: runState),
hunt: (
syncMode: syncMode,
lowNumber: 0.toBlockNumber.BlockNumber,
highNumber: high(BlockNumber).BlockNumber, # maximum uncertainty range.
bestNumber: 0.toBlockNumber.BlockNumber,
bestHash: ZERO_HASH256.BlockHash, # whatever
step: 0u))
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc `$`*(sp: SnapPeer): string =
$sp.peer
proc inc(stat: var SnapPeerStat) {.borrow.}
# ------------------------------------------------------------------------------
# Public functions, debugging helpers (will go away eventually)
# ------------------------------------------------------------------------------
proc pp*(sn: SnapSync; bh: BlockHash): string =
## Pretty printer for debugging
let rc = sn.seenBlock.lruFetch(bh.untie.data)
if rc.isOk:
return "#" & $rc.value
$bh.untie.data.toHex
proc pp*(sn: SnapSync; bh: BlockHash; bn: BlockNumber): string =
## Pretty printer for debugging
let rc = sn.seenBlock.lruFetch(bh.untie.data)
if rc.isOk:
return "#" & $rc.value
"#" & $sn.seenBlock.lruAppend(bh.untie.data, bn, seenBlocksMax)
proc pp*(sn: SnapSync; bhn: HashOrNum): string =
if not bhn.isHash:
return "num(#" & $bhn.number & ")"
let rc = sn.seenBlock.lruFetch(bhn.hash.data)
if rc.isOk:
return "hash(#" & $rc.value & ")"
return "hash(" & $bhn.hash.data.toHex & ")"
proc seen*(sn: SnapSync; bh: BlockHash; bn: BlockNumber) =
## Register for pretty printing
if not sn.seenBlock.lruFetch(bh.untie.data).isOk:
discard sn.seenBlock.lruAppend(bh.untie.data, bn, seenBlocksMax)
# -----------
import
../../../tests/replay/pp_light
proc pp*(bh: BlockHash): string =
bh.Hash256.pp
proc pp*(bn: BlockNumber): string =
if bn == high(BlockNumber): "#max"
else: "#" & $bn
proc pp*(sp: SnapPeerHunt): string =
result &= "(mode=" & $sp.syncMode
result &= ",num=(" & sp.lowNumber.pp & "," & sp.highNumber.pp & ")"
result &= ",best=(" & sp.bestNumber.pp & "," & sp.bestHash.pp & ")"
result &= ",step=" & $sp.step
result &= ")"
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
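
A side note on the `distinct uint` statistics counters above: the
`{.borrow.}` pragma re-uses the underlying `uint` implementation of
`inc` (and, in this sketch, `$`) without exposing general integer
arithmetic on the counter type. A minimal example:

type Stat = distinct uint

proc inc(s: var Stat) {.borrow.}     # borrow `inc` from uint
proc `$`(s: Stat): string {.borrow.} # borrow string conversion

var networkErrors: Stat
inc networkErrors                    # fine: borrowed
# networkErrors = networkErrors + 1  # rejected: no `+` defined for Stat
doAssert $networkErrors == "1"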


@ -1,106 +0,0 @@
# Nimbus - Fetch account and storage states from peers efficiently
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/[sets, sequtils, strutils],
chronos,
chronicles,
eth/[common/eth_types, p2p],
stint,
".."/[base_desc, path_desc],
./sync_fetch_xdesc
{.push raises: [Defect].}
logScope:
topics = "snap peer common"
proc hasSlice*(sp: SnapPeer): bool =
## Return `true` iff `getSlice` would return a free slice to work on.
if sp.ns.sharedFetchEx.isNil:
sp.ns.sharedFetchEx = SnapSyncFetchEx.new
result = 0 < sp.ns.sharedFetchEx.leafRanges.len
trace "hasSlice", peer=sp, hasSlice=result
proc getSlice*(sp: SnapPeer, leafLow, leafHigh: var LeafPath): bool =
## Claim a free slice to work on. If a slice was available, it's claimed,
## `leafLow` and `leafHigh` are set to the slice range and `true` is
## returned. Otherwise `false` is returned.
if sp.ns.sharedFetchEx.isNil:
sp.ns.sharedFetchEx = SnapSyncFetchEx.new
let sharedFetch = sp.ns.sharedFetchEx
template ranges: auto = sharedFetch.leafRanges
const leafMaxFetchRange = (high(LeafPath) - low(LeafPath)) div 1000
if ranges.len == 0:
trace "GetSlice", leafRange="none"
return false
leafLow = ranges[0].leafLow
if ranges[0].leafHigh - ranges[0].leafLow <= leafMaxFetchRange:
leafHigh = ranges[0].leafHigh
ranges.delete(0)
else:
leafHigh = leafLow + leafMaxFetchRange
ranges[0].leafLow = leafHigh + 1
trace "GetSlice", peer=sp, leafRange=pathRange(leafLow, leafHigh)
return true
proc putSlice*(sp: SnapPeer, leafLow, leafHigh: LeafPath) =
## Return a slice to the free list, merging with the rest of the list.
let sharedFetch = sp.ns.sharedFetchEx
template ranges: auto = sharedFetch.leafRanges
trace "PutSlice", leafRange=pathRange(leafLow, leafHigh), peer=sp
var i = 0
while i < ranges.len and leafLow > ranges[i].leafHigh:
inc i
if i > 0 and leafLow - 1 == ranges[i-1].leafHigh:
dec i
var j = i
while j < ranges.len and leafHigh >= ranges[j].leafLow:
inc j
if j < ranges.len and leafHigh + 1 == ranges[j].leafLow:
inc j
if j == i:
ranges.insert(LeafRange(leafLow: leafLow, leafHigh: leafHigh), i)
else:
if j-1 > i:
ranges[i].leafHigh = ranges[j-1].leafHigh
ranges.delete(i+1, j-1)
if leafLow < ranges[i].leafLow:
ranges[i].leafLow = leafLow
if leafHigh > ranges[i].leafHigh:
ranges[i].leafHigh = leafHigh
template getSlice*(sp: SnapPeer, leafRange: var LeafRange): bool =
sp.getSlice(leafRange.leafLow, leafRange.leafHigh)
template putSlice*(sp: SnapPeer, leafRange: LeafRange) =
sp.putSlice(leafRange.leafLow, leafRange.leafHigh)
proc countSlice*(sp: SnapPeer, leafLow, leafHigh: LeafPath, which: bool) =
doAssert leafLow <= leafHigh
sp.ns.sharedFetchEx.countRange += leafHigh - leafLow + 1
sp.ns.sharedFetchEx.countRangeStarted = true
if which:
sp.ns.sharedFetchEx.countRangeSnap += leafHigh - leafLow + 1
sp.ns.sharedFetchEx.countRangeSnapStarted = true
else:
sp.ns.sharedFetchEx.countRangeTrie += leafHigh - leafLow + 1
sp.ns.sharedFetchEx.countRangeTrieStarted = true
template countSlice*(sp: SnapPeer, leafRange: LeafRange, which: bool) =
sp.countSlice(leafRange.leafLow, leafRange.leafHigh, which)
proc countAccounts*(sp: SnapPeer, len: int) =
sp.ns.sharedFetchEx.countAccounts += len


@ -1,115 +0,0 @@
# Nimbus - Fetch account and storage states from peers efficiently
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/[sets, strutils],
chronos,
chronicles,
eth/[common/eth_types, p2p],
stint,
".."/[base_desc, path_desc, timer_helper]
{.push raises: [Defect].}
type
SnapSyncFetchEx* = ref object of SnapSyncFetchBase
## Account fetching state that is shared among all peers.
# Leaf path ranges not fetched or in progress on any peer.
leafRanges*: seq[LeafRange]
countAccounts*: int64
countAccountBytes*: int64
countRange*: UInt256
countRangeStarted*: bool
countRangeSnap*: UInt256
countRangeSnapStarted*: bool
countRangeTrie*: UInt256
countRangeTrieStarted*: bool
logTicker: TimerCallback
# ------------------------------------------------------------------------------
# Private timer helpers
# ------------------------------------------------------------------------------
proc rangeFraction(value: UInt256, discriminator: bool): int =
## Format a value in the range 0..2^256 as a percentage, 0-100%. As the
## top of the range 2^256 cannot be represented in `UInt256` it actually
## has the value `0: UInt256`, and with that value, `discriminator` is
## consulted to decide between 0% and 100%. For other values, the value is
## constrained to be between slightly above 0% and slightly below 100%,
## so that the endpoints are distinctive when displayed.
const multiplier = 10000
var fraction: int = 0 # Fixed point, fraction 0.0-1.0 multiplied up.
if value == 0:
return if discriminator: multiplier else: 0 # Either 100.00% or 0.00%.
const shift = 8 * (sizeof(value) - sizeof(uint64))
const wordHigh: uint64 = (high(typeof(value)) shr shift).truncate(uint64)
# Divide `wordHigh+1` by `multiplier`, rounding up, avoiding overflow.
const wordDiv: uint64 = 1 + ((wordHigh shr 1) div (multiplier.uint64 shr 1))
let wordValue: uint64 = (value shr shift).truncate(uint64)
let divided: uint64 = wordValue div wordDiv
return if divided >= multiplier: multiplier - 1
elif divided <= 0: 1
else: divided.int
proc percent(value: UInt256, discriminator: bool): string =
result = intToStr(rangeFraction(value, discriminator), 3)
result.insert(".", result.len - 2)
result.add('%')
proc setLogTicker(sf: SnapSyncFetchEx; at: Moment) {.gcsafe.}
proc runLogTicker(sf: SnapSyncFetchEx) {.gcsafe.} =
doAssert not sf.isNil
info "State: Account sync progress",
percent = percent(sf.countRange, sf.countRangeStarted),
accounts = sf.countAccounts,
snap = percent(sf.countRangeSnap, sf.countRangeSnapStarted),
trie = percent(sf.countRangeTrie, sf.countRangeTrieStarted)
sf.setLogTicker(Moment.fromNow(1.seconds))
proc setLogTicker(sf: SnapSyncFetchEx; at: Moment) =
sf.logTicker = safeSetTimer(at, runLogTicker, sf)
# ------------------------------------------------------------------------------
# Public constructor
# ------------------------------------------------------------------------------
proc new*(T: type SnapSyncFetchEx; startAfter = 100.milliseconds): T =
result = SnapSyncFetchEx(
leafRanges: @[LeafRange(
leafLow: LeafPath.low,
leafHigh: LeafPath.high)])
result.logTicker = safeSetTimer(
Moment.fromNow(startAfter),
runLogTicker,
result)
# ------------------------------------------------------------------------------
# Public getters
# ------------------------------------------------------------------------------
proc sharedFetchEx*(ns: SnapSync): SnapSyncFetchEx =
## Handy helper
ns.sharedFetch.SnapSyncFetchEx
# ------------------------------------------------------------------------------
# Public setters
# ------------------------------------------------------------------------------
proc `sharedFetchEx=`*(ns: SnapSync; value: SnapSyncFetchEx) =
## Handy helper
ns.sharedFetch = value
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
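
The fixed-point formatting above is easiest to check with small inputs.
A self-contained `uint64` analogue (hypothetical helper name, same idea
as `rangeFraction`/`percent`, clamping simplified):

import std/strutils

proc percentFixed(value, full: uint64; nonZero: bool): string =
  ## Map 0..full to "0.00%".."100.00%". In the real code a full count of
  ## 2^256 wraps to zero in `UInt256`, hence the `nonZero` discriminator.
  const multiplier = 10_000u64
  let frac =
    if value == 0: (if nonZero: multiplier else: 0u64)
    else: max(1u64, min(multiplier - 1, value div (full div multiplier)))
  result = intToStr(frac.int, 3)
  result.insert(".", result.len - 2)
  result.add('%')

doAssert percentFixed(0u64, 100_000u64, false) == "0.00%"
doAssert percentFixed(0u64, 100_000u64, true) == "100.00%"
doAssert percentFixed(50_000u64, 100_000u64, false) == "50.00%"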


@ -63,17 +63,20 @@ import
chronicles,
chronos,
eth/[common/eth_types, p2p, p2p/private/p2p_types],
../../p2p/chain/chain_desc,
"../.."/[constants, p2p/chain/chain_desc],
".."/[protocol, types],
"."/[base_desc, peer/fetch, peer/reply_data]
./worker/[worker_desc, fetch]
{.push raises: [Defect].}
export
worker_desc
logScope:
topics = "snap collect"
topics = "snap worker"
const
syncLockedMinimumReply = 8
syncLockedMinimumReply = 8
## Minimum number of headers we assume any peers will send if they have
## them in contiguous ascending queries. Fewer than this confirms we have
## found the peer's canonical chain head boundary. Must be at least 2, and
@ -82,7 +85,7 @@ const
## about peer implementations. 8 is chosen as it allows 3-deep extensions
## and 3-deep reorgs to be followed in a single round trip.
syncLockedQueryOverlap = 4
syncLockedQueryOverlap = 4
## Number of headers to re-query on each poll when `SyncLocked` so that we
## get small reorg updates in one round trip. Must be no more than
## `syncLockedMinimumReply-1`, no more than `syncLockedMinimumReply-2` to
@ -90,40 +93,78 @@ const
## excessive duplicate fetching. 4 is chosen as it allows 3-deep reorgs
## to be followed in single round trip.
syncLockedQuerySize = 192
syncLockedQuerySize = 192
## Query size when polling `SyncLocked`. Must be at least
## `syncLockedMinimumReply`. Large is fine, if we get a large reply the
## values are almost always useful.
syncHuntQuerySize = 16
huntQuerySize = 16
## Query size when hunting for canonical head boundary. Small is good
## because we don't want to keep most of the headers at hunt time.
syncHuntForwardExpandShift = 4
## Expansion factor during `SyncHuntForward` exponential search.
huntForwardExpandShift = 4
## Expansion factor during `HuntForward` exponential search.
## 16 is chosen for rapid convergence when bootstrapping or catching up.
syncHuntBackwardExpandShift = 1
## Expansion factor during `SyncHuntBackward` exponential search.
huntBackwardExpandShift = 1
## Expansion factor during `HuntBackward` exponential search.
## 2 is chosen for better convergence when tracking a chain reorg.
type
WorkerMode = enum
## The current state of tracking the peer's canonical chain head.
## `bestBlockNumber` is only valid when this is `SyncLocked`.
SyncLocked
SyncOnlyHash
HuntForward
HuntBackward
HuntRange
HuntRangeFinal
WorkerHuntEx = ref object of WorkerBase
## Peer canonical chain head ("best block") search state.
syncMode: WorkerMode ## Action mode
lowNumber: BlockNumber ## Recent lowest known block number.
highNumber: BlockNumber ## Recent highest known block number.
bestNumber: BlockNumber
bestHash: BlockHash
step: uint
static:
doAssert syncLockedMinimumReply >= 2
doAssert syncLockedMinimumReply >= syncLockedQueryOverlap + 2
doAssert syncLockedQuerySize <= maxHeadersFetch
doAssert syncHuntQuerySize >= 1 and syncHuntQuerySize <= maxHeadersFetch
doAssert syncHuntForwardExpandShift >= 1 and syncHuntForwardExpandShift <= 8
doAssert syncHuntBackwardExpandShift >= 1 and syncHuntBackwardExpandShift <= 8
doAssert huntQuerySize >= 1 and huntQuerySize <= maxHeadersFetch
doAssert huntForwardExpandShift >= 1 and huntForwardExpandShift <= 8
doAssert huntBackwardExpandShift >= 1 and huntBackwardExpandShift <= 8
# Make sure that request/response wire protocol messages are id-tracked and
# would not overlap (no multi-protocol legacy support)
doAssert 66 <= protocol.ethVersion
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc hunt(sp: WorkerBuddy): WorkerHuntEx =
sp.workerBase.WorkerHuntEx
proc `hunt=`(sp: WorkerBuddy; value: WorkerHuntEx) =
sp.workerBase = value
proc new(T: type WorkerHuntEx; syncMode: WorkerMode): T =
T(syncMode: syncMode,
lowNumber: 0.toBlockNumber.BlockNumber,
highNumber: high(BlockNumber).BlockNumber, # maximum uncertainty range.
bestNumber: 0.toBlockNumber.BlockNumber,
bestHash: ZERO_HASH256.BlockHash, # whatever
step: 0u)
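
The hunt modes above implement an exponential ("galloping") search for
the peer's canonical head, followed by a narrowing range search. The
idea in isolation, with a `present()` predicate standing in for a
`GetBlockHeaders` probe (simplified, synchronous sketch, not the
worker's batched queries):

proc findHead(present: proc(n: uint64): bool): uint64 =
  var low = 0u64                 # block zero is always present
  var step = 1u64
  while present(low + step):     # HuntForward: expand exponentially
    low += step
    step = step shl 4            # cf. huntForwardExpandShift
  var high = low + step          # first offset known to be absent
  while low + 1 < high:          # HuntRange: converge on the boundary
    let mid = low + (high - low) shr 1
    if present(mid): low = mid else: high = mid
  return low

let head = findHead(proc(n: uint64): bool = n <= 123_456)
doAssert head == 123_456u64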
# ------------------------------------------------------------------------------
# Private logging helpers
# ------------------------------------------------------------------------------
proc traceSyncLocked(sp: SnapPeer, number: BlockNumber, hash: BlockHash) =
proc traceSyncLocked(sp: WorkerBuddy, number: BlockNumber, hash: BlockHash) =
## Trace messages when peer canonical head is confirmed or updated.
let
bestBlock = sp.ns.pp(hash, number)
@ -141,7 +182,7 @@ proc traceSyncLocked(sp: SnapPeer, number: BlockNumber, hash: BlockHash) =
debug "Peer chain head reorg detected", peer,
advance=(sp.hunt.bestNumber - number), bestBlock
# proc peerSyncChainTrace(sp: SnapPeer) =
# proc peerSyncChainTrace(sp: WorkerBuddy) =
# ## To be called after `peerSyncChainRequest` has updated state.
# case sp.hunt.syncMode:
# of SyncLocked:
@ -150,19 +191,19 @@ proc traceSyncLocked(sp: SnapPeer, number: BlockNumber, hash: BlockHash) =
# of SyncOnlyHash:
# trace "OnlyHash",
# bestBlock = sp.ns.pp(sp.hunt.bestHash, sp.hunt.bestNumber)
# of SyncHuntForward:
# of HuntForward:
# template highMax(n: BlockNumber): string =
# if n == high(BlockNumber): "max" else: $n
# trace "HuntForward",
# low=sp.hunt.lowNumber, high=highMax(sp.hunt.highNumber),
# step=sp.hunt.step
# of SyncHuntBackward:
# of HuntBackward:
# trace "HuntBackward",
# low=sp.hunt.lowNumber, high=sp.hunt.highNumber, step=sp.hunt.step
# of SyncHuntRange:
# of HuntRange:
# trace "HuntRange",
# low=sp.hunt.lowNumber, high=sp.hunt.highNumber, step=sp.hunt.step
# of SyncHuntRangeFinal:
# of HuntRangeFinal:
# trace "HuntRangeFinal",
# low=sp.hunt.lowNumber, high=sp.hunt.highNumber, step=1
@ -170,19 +211,19 @@ proc traceSyncLocked(sp: SnapPeer, number: BlockNumber, hash: BlockHash) =
# Private functions
# ------------------------------------------------------------------------------
proc setSyncLocked(sp: SnapPeer, number: BlockNumber, hash: BlockHash) =
proc setSyncLocked(sp: WorkerBuddy, number: BlockNumber, hash: BlockHash) =
## Actions to take when peer canonical head is confirmed or updated.
sp.traceSyncLocked(number, hash)
sp.hunt.bestNumber = number
sp.hunt.bestHash = hash
sp.hunt.syncMode = SyncLocked
proc clearSyncStateRoot(sp: SnapPeer) =
proc clearSyncStateRoot(sp: WorkerBuddy) =
if sp.ctrl.stateRoot.isSome:
debug "Stopping state sync from this peer", peer=sp
sp.ctrl.stateRoot = none(TrieHash)
proc lockSyncStateRoot(sp: SnapPeer, number: BlockNumber, hash: BlockHash,
proc lockSyncStateRoot(sp: WorkerBuddy, number: BlockNumber, hash: BlockHash,
stateRoot: TrieHash) =
sp.setSyncLocked(number, hash)
@ -196,15 +237,15 @@ proc lockSyncStateRoot(sp: SnapPeer, number: BlockNumber, hash: BlockHash,
sp.ctrl.stateRoot = some(stateRoot)
if sp.ctrl.runState != SyncRunningOK:
sp.ctrl.runState = SyncRunningOK
if sp.ctrl.runState != BuddyRunningOK:
sp.ctrl.runState = BuddyRunningOK
trace "Starting to download block state", peer=sp,
thisBlock, stateRoot
asyncSpawn sp.fetch()
proc setHuntBackward(sp: SnapPeer, lowestAbsent: BlockNumber) =
proc setHuntBackward(sp: WorkerBuddy, lowestAbsent: BlockNumber) =
## Start exponential search mode backward due to new uncertainty.
sp.hunt.syncMode = SyncHuntBackward
sp.hunt.syncMode = HuntBackward
sp.hunt.step = 0
# Block zero is always present.
sp.hunt.lowNumber = 0.toBlockNumber
@ -212,15 +253,15 @@ proc setHuntBackward(sp: SnapPeer, lowestAbsent: BlockNumber) =
sp.hunt.highNumber = if lowestAbsent > 0: lowestAbsent else: 1.toBlockNumber
sp.clearSyncStateRoot()
proc setHuntForward(sp: SnapPeer, highestPresent: BlockNumber) =
proc setHuntForward(sp: WorkerBuddy, highestPresent: BlockNumber) =
## Start exponential search mode forward due to new uncertainty.
sp.hunt.syncMode = SyncHuntForward
sp.hunt.syncMode = HuntForward
sp.hunt.step = 0
sp.hunt.lowNumber = highestPresent
sp.hunt.highNumber = high(BlockNumber)
sp.clearSyncStateRoot()
proc updateHuntAbsent(sp: SnapPeer, lowestAbsent: BlockNumber) =
proc updateHuntAbsent(sp: WorkerBuddy, lowestAbsent: BlockNumber) =
## Converge uncertainty range backward.
if lowestAbsent < sp.hunt.highNumber:
sp.hunt.highNumber = lowestAbsent
@ -231,7 +272,7 @@ proc updateHuntAbsent(sp: SnapPeer, lowestAbsent: BlockNumber) =
sp.setHuntBackward(lowestAbsent)
sp.clearSyncStateRoot()
proc updateHuntPresent(sp: SnapPeer, highestPresent: BlockNumber) =
proc updateHuntPresent(sp: WorkerBuddy, highestPresent: BlockNumber) =
## Converge uncertainty range forward.
if highestPresent > sp.hunt.lowNumber:
sp.hunt.lowNumber = highestPresent
@ -242,7 +283,7 @@ proc updateHuntPresent(sp: SnapPeer, highestPresent: BlockNumber) =
sp.setHuntForward(highestPresent)
sp.clearSyncStateRoot()
proc peerSyncChainEmptyReply(sp: SnapPeer, request: BlocksRequest) =
proc peerSyncChainEmptyReply(sp: WorkerBuddy, request: BlocksRequest) =
## Handle empty `GetBlockHeaders` reply. This means `request.startBlock` is
## absent on the peer. If it was `SyncLocked` there must have been a reorg
## and the previous canonical chain head has disappeared. If hunting, this
@ -279,7 +320,7 @@ proc peerSyncChainEmptyReply(sp: SnapPeer, request: BlocksRequest) =
# Due to a reorg, peer doesn't have the block hash it originally gave us.
# Switch to hunt forward from block zero to find the canonical head.
sp.setHuntForward(0.toBlockNumber)
of SyncHuntForward, SyncHuntBackward, SyncHuntRange, SyncHuntRangeFinal:
of HuntForward, HuntBackward, HuntRange, HuntRangeFinal:
# Update the hunt range.
sp.updateHuntAbsent(lowestAbsent)
@ -291,7 +332,7 @@ proc peerSyncChainEmptyReply(sp: SnapPeer, request: BlocksRequest) =
sp.hunt.bestHash = default(typeof(sp.hunt.bestHash))
sp.ns.seen(sp.hunt.bestHash,sp.hunt.bestNumber)
proc peerSyncChainNonEmptyReply(sp: SnapPeer, request: BlocksRequest,
proc peerSyncChainNonEmptyReply(sp: WorkerBuddy, request: BlocksRequest,
headers: openArray[BlockHeader]) =
## Handle non-empty `GetBlockHeaders` reply. This means `request.startBlock`
## is present on the peer and in its canonical chain (unless the request was
@ -336,7 +377,7 @@ proc peerSyncChainNonEmptyReply(sp: SnapPeer, request: BlocksRequest,
of SyncOnlyHash:
# As `SyncLocked` but without the block number check.
sp.setHuntForward(highestPresent)
of SyncHuntForward, SyncHuntBackward, SyncHuntRange, SyncHuntRangeFinal:
of HuntForward, HuntBackward, HuntRange, HuntRangeFinal:
# Update the hunt range.
sp.updateHuntPresent(highestPresent)
@ -347,7 +388,7 @@ proc peerSyncChainNonEmptyReply(sp: SnapPeer, request: BlocksRequest,
sp.hunt.bestHash = headers[highestIndex].blockHash.BlockHash
sp.ns.seen(sp.hunt.bestHash,sp.hunt.bestNumber)
proc peerSyncChainRequest(sp: SnapPeer): BlocksRequest =
proc peerSyncChainRequest(sp: WorkerBuddy): BlocksRequest =
## Choose `GetBlockHeaders` parameters when hunting or following the canonical
## chain of a peer.
if sp.hunt.syncMode == SyncLocked:
@ -397,7 +438,7 @@ proc peerSyncChainRequest(sp: SnapPeer): BlocksRequest =
# `hunt.lowNumber+1` must not be used consistently as the start, even with a
# large enough query step size, as that will sometimes take O(N) to converge
# in both the exponential and quasi-binary searches. (Ending at
# `hunt.highNumber-1` is fine if `syncHuntQuerySize > 1`. This asymmetry is
# `hunt.highNumber-1` is fine if `huntQuerySize > 1`. This asymmetry is
# due to ascending queries (see earlier comment), and non-empty truncated
# query reply being proof of presence before the truncation point, but not
# proof of absence after it. A reply can be truncated just because the peer
@ -406,7 +447,7 @@ proc peerSyncChainRequest(sp: SnapPeer): BlocksRequest =
# The proportional gap requirement is why we divide by query size here,
# instead of stretching to fit more strictly with `(range-1)/(size-1)`.
const syncHuntFinalSize = max(2, syncHuntQuerySize)
const huntFinalSize = max(2, huntQuerySize)
var maxStep = 0u
let fullRangeClamped =
@ -414,23 +455,23 @@ proc peerSyncChainRequest(sp: SnapPeer): BlocksRequest =
else: min(high(uint).toBlockNumber,
sp.hunt.highNumber - sp.hunt.lowNumber).truncate(uint) - 1
if fullRangeClamped >= syncHuntFinalSize: # `SyncHuntRangeFinal` condition.
maxStep = if syncHuntQuerySize == 1:
if fullRangeClamped >= huntFinalSize: # `HuntRangeFinal` condition.
maxStep = if huntQuerySize == 1:
fullRangeClamped
elif (syncHuntQuerySize and (syncHuntQuerySize-1)) == 0:
fullRangeClamped shr fastLog2(syncHuntQuerySize)
elif (huntQuerySize and (huntQuerySize-1)) == 0:
fullRangeClamped shr fastLog2(huntQuerySize)
else:
fullRangeClamped div syncHuntQuerySize
doAssert syncHuntFinalSize >= syncHuntQuerySize
fullRangeClamped div huntQuerySize
doAssert huntFinalSize >= huntQuerySize
doAssert maxStep >= 1 # Ensured by the above assertion.
# Check for exponential search (expanding). Iterate `hunt.step`. O(log N)
# requires `startBlock` to be offset from `hunt.lowNumber`/`hunt.highNumber`.
if sp.hunt.syncMode in {SyncHuntForward, SyncHuntBackward} and
fullRangeClamped >= syncHuntFinalSize:
let forward = sp.hunt.syncMode == SyncHuntForward
let expandShift = if forward: syncHuntForwardExpandShift
else: syncHuntBackwardExpandShift
if sp.hunt.syncMode in {HuntForward, HuntBackward} and
fullRangeClamped >= huntFinalSize:
let forward = sp.hunt.syncMode == HuntForward
let expandShift = if forward: huntForwardExpandShift
else: huntBackwardExpandShift
# Switches to range search when this condition is no longer true.
if sp.hunt.step < maxStep shr expandShift:
# The `if` above means the next line cannot overflow.
@ -438,25 +479,25 @@ proc peerSyncChainRequest(sp: SnapPeer): BlocksRequest =
# Satisfy the O(log N) convergence conditions.
result.startBlock.number =
if forward: sp.hunt.lowNumber + sp.hunt.step.toBlockNumber
else: sp.hunt.highNumber - (sp.hunt.step * syncHuntQuerySize).toBlockNumber
result.maxResults = syncHuntQuerySize
else: sp.hunt.highNumber - (sp.hunt.step * huntQuerySize).toBlockNumber
result.maxResults = huntQuerySize
result.skip = sp.hunt.step - 1
return
# For tracing/display.
sp.hunt.step = maxStep
sp.hunt.syncMode = SyncHuntRange
sp.hunt.syncMode = HuntRange
if maxStep > 0:
# Quasi-binary search (converging in a range). O(log N) requires
# `startBlock` to satisfy the constraints described above, with the
# proportionality from both ends of the range. The optimal information
# gathering position is tricky and doesn't make much difference, so don't
# bother. We'll centre the query in the range.
var offset = fullRangeClamped - maxStep * (syncHuntQuerySize-1)
var offset = fullRangeClamped - maxStep * (huntQuerySize-1)
# Rounding must bias towards end to ensure `offset >= 1` after this.
offset -= offset shr 1
result.startBlock.number = sp.hunt.lowNumber + offset.toBlockNumber
result.maxResults = syncHuntQuerySize
result.maxResults = huntQuerySize
result.skip = maxStep - 1
else:
# Small range, final step. At `fullRange == 0` we must query at least one
@ -468,7 +509,7 @@ proc peerSyncChainRequest(sp: SnapPeer): BlocksRequest =
# in one round trip, and accommodate a small reorg or extension.
const afterSoftMax = syncLockedMinimumReply - syncLockedQueryOverlap
const beforeHardMax = syncLockedQueryOverlap
let extra = syncHuntFinalSize - fullRangeClamped
let extra = huntFinalSize - fullRangeClamped
var before = (extra + 1) shr 1
before = max(before + afterSoftMax, extra) - afterSoftMax
before = min(before, beforeHardMax)
@ -476,15 +517,15 @@ proc peerSyncChainRequest(sp: SnapPeer): BlocksRequest =
result.startBlock.number =
if sp.hunt.bestNumber <= before.toBlockNumber: 1.toBlockNumber
else: min(sp.hunt.bestNumber - before.toBlockNumber,
high(BlockNumber) - (syncHuntFinalSize - 1).toBlockNumber)
result.maxResults = syncHuntFinalSize
sp.hunt.syncMode = SyncHuntRangeFinal
high(BlockNumber) - (huntFinalSize - 1).toBlockNumber)
result.maxResults = huntFinalSize
sp.hunt.syncMode = HuntRangeFinal
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc collectBlockHeaders*(sp: SnapPeer) {.async.} =
proc workerExec*(sp: WorkerBuddy) {.async.} =
## Query a peer to update our knowledge of its canonical chain and its best
## block, which is its canonical chain head. This can be called at any time
## after a peer has negotiated the connection.
@ -510,7 +551,7 @@ proc collectBlockHeaders*(sp: SnapPeer) {.async.} =
trace trEthRecvError & "waiting for reply to GetBlockHeaders", peer=sp,
error=e.msg
inc sp.stats.major.networkErrors
sp.ctrl.runState = SyncStopped
sp.ctrl.runState = BuddyStopped
return
if reply.isNone:
@ -542,8 +583,39 @@ proc collectBlockHeaders*(sp: SnapPeer) {.async.} =
else:
sp.peerSyncChainEmptyReply(request)
proc collectDataSetup*(sp: SnapPeer) =
sp.replyDataSetup
proc workerStart*(sp: WorkerBuddy): bool =
## Initialise `WorkerBuddy` to support `workerExec()` calls
# Initialise `DataNode` reply handling
sp.fetchSetup
# Link in hunt descriptor
sp.hunt = WorkerHuntEx.new(HuntForward)
if sp.peer.state(eth).initialized:
# We know the hash but not the block number.
sp.hunt.bestHash = sp.peer.state(eth).bestBlockHash.BlockHash
# TODO: Temporarily disabled because it's useful to test the head hunter.
# sp.syncMode = SyncOnlyHash
return true
trace "State(eth) not initialized!"
proc workerLockedOk*(sp: WorkerBuddy): bool =
sp.hunt.syncMode == SyncLocked
# ------------------------------------------------------------------------------
# Debugging
# ------------------------------------------------------------------------------
proc huntPp*(sn: WorkerBuddy): string =
let hx = sn.hunt
result &= "(mode=" & $hx.syncMode
result &= ",num=(" & hx.lowNumber.pp & "," & hx.highNumber.pp & ")"
result &= ",best=(" & hx.bestNumber.pp & "," & hx.bestHash.pp & ")"
result &= ",step=" & $hx.step
result &= ")"
# ------------------------------------------------------------------------------
# End
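
Taken together with the first file above, the driver in `snap.nim` now
reduces to the `workerStart`/`workerExec`/`workerLockedOk` interface;
schematically (condensed from `syncPeerLoop`, not the verbatim code):

proc drive(sp: WorkerBuddy) {.async.} =
  if not sp.workerStart():              # link in hunt state, fetch setup
    return                              # eth state not initialised
  while sp.ctrl.runState != BuddyStopped:
    await sp.workerExec                 # one hunt or locked-mode round trip
    let delayMs = if sp.workerLockedOk: 1000 else: 50
    await sleepAsync(chronos.milliseconds(delayMs))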


@ -0,0 +1,204 @@
# Nimbus - Fetch account and storage states from peers efficiently
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
std/[sets, sequtils, strutils],
chronos,
chronicles,
eth/[common/eth_types, p2p],
stint,
../path_desc,
"."/[timer_helper, worker_desc]
{.push raises: [Defect].}
logScope:
topics = "snap peer common"
type
CommonFetchEx* = ref object of CommonBase
## Account fetching state that is shared among all peers.
# Leaf path ranges not fetched or in progress on any peer.
leafRanges*: seq[LeafRange]
countAccounts*: int64
countAccountBytes*: int64
countRange*: UInt256
countRangeStarted*: bool
countRangeSnap*: UInt256
countRangeSnapStarted*: bool
countRangeTrie*: UInt256
countRangeTrieStarted*: bool
logTicker: TimerCallback
# ------------------------------------------------------------------------------
# Private timer helpers
# ------------------------------------------------------------------------------
proc rangeFraction(value: UInt256, discriminator: bool): int =
## Format a value in the range 0..2^256 as a percentage, 0-100%. As the
## top of the range 2^256 cannot be represented in `UInt256` it actually
## has the value `0: UInt256`, and with that value, `discriminator` is
## consulted to decide between 0% and 100%. For other values, the value is
## constrained to be between slightly above 0% and slightly below 100%,
## so that the endpoints are distinctive when displayed.
const multiplier = 10000
var fraction: int = 0 # Fixed point, fraction 0.0-1.0 multiplied up.
if value == 0:
return if discriminator: multiplier else: 0 # Either 100.00% or 0.00%.
const shift = 8 * (sizeof(value) - sizeof(uint64))
const wordHigh: uint64 = (high(typeof(value)) shr shift).truncate(uint64)
# Divide `wordHigh+1` by `multiplier`, rounding up, avoiding overflow.
const wordDiv: uint64 = 1 + ((wordHigh shr 1) div (multiplier.uint64 shr 1))
let wordValue: uint64 = (value shr shift).truncate(uint64)
let divided: uint64 = wordValue div wordDiv
return if divided >= multiplier: multiplier - 1
elif divided <= 0: 1
else: divided.int
proc percent(value: UInt256, discriminator: bool): string =
result = intToStr(rangeFraction(value, discriminator), 3)
result.insert(".", result.len - 2)
result.add('%')
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
proc setLogTicker(sf: CommonFetchEx; at: Moment) {.gcsafe.}
proc runLogTicker(sf: CommonFetchEx) {.gcsafe.} =
doAssert not sf.isNil
info "State: Account sync progress",
percent = percent(sf.countRange, sf.countRangeStarted),
accounts = sf.countAccounts,
snap = percent(sf.countRangeSnap, sf.countRangeSnapStarted),
trie = percent(sf.countRangeTrie, sf.countRangeTrieStarted)
sf.setLogTicker(Moment.fromNow(1.seconds))
proc setLogTicker(sf: CommonFetchEx; at: Moment) =
sf.logTicker = safeSetTimer(at, runLogTicker, sf)
proc new*(T: type CommonFetchEx; startAfter = 100.milliseconds): T =
result = CommonFetchEx(
leafRanges: @[LeafRange(
leafLow: LeafPath.low,
leafHigh: LeafPath.high)])
result.logTicker = safeSetTimer(
Moment.fromNow(startAfter),
runLogTicker,
result)
# ------------------------------------------------------------------------------
# Private setters
# ------------------------------------------------------------------------------
proc `sharedFetchEx=`(ns: Worker; value: CommonFetchEx) =
## Handy helper
ns.commonBase = value
# ------------------------------------------------------------------------------
# Public getters
# ------------------------------------------------------------------------------
proc sharedFetchEx*(ns: Worker): CommonFetchEx =
## Handy helper
ns.commonBase.CommonFetchEx
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc hasSlice*(sp: WorkerBuddy): bool =
## Return `true` iff `getSlice` would return a free slice to work on.
if sp.ns.sharedFetchEx.isNil:
sp.ns.sharedFetchEx = CommonFetchEx.new
result = 0 < sp.ns.sharedFetchEx.leafRanges.len
trace "hasSlice", peer=sp, hasSlice=result
proc getSlice*(sp: WorkerBuddy, leafLow, leafHigh: var LeafPath): bool =
## Claim a free slice to work on. If a slice was available, it's claimed,
## `leafLow` and `leafHigh` are set to the slice range and `true` is
## returned. Otherwise `false` is returned.
if sp.ns.sharedFetchEx.isNil:
sp.ns.sharedFetchEx = CommonFetchEx.new
let sharedFetch = sp.ns.sharedFetchEx
template ranges: auto = sharedFetch.leafRanges
const leafMaxFetchRange = (high(LeafPath) - low(LeafPath)) div 1000
if ranges.len == 0:
trace "GetSlice", leafRange="none"
return false
leafLow = ranges[0].leafLow
if ranges[0].leafHigh - ranges[0].leafLow <= leafMaxFetchRange:
leafHigh = ranges[0].leafHigh
ranges.delete(0)
else:
leafHigh = leafLow + leafMaxFetchRange
ranges[0].leafLow = leafHigh + 1
trace "GetSlice", peer=sp, leafRange=pathRange(leafLow, leafHigh)
return true
proc putSlice*(sp: WorkerBuddy, leafLow, leafHigh: LeafPath) =
## Return a slice to the free list, merging with the rest of the list.
let sharedFetch = sp.ns.sharedFetchEx
template ranges: auto = sharedFetch.leafRanges
trace "PutSlice", leafRange=pathRange(leafLow, leafHigh), peer=sp
var i = 0
while i < ranges.len and leafLow > ranges[i].leafHigh:
inc i
if i > 0 and leafLow - 1 == ranges[i-1].leafHigh:
dec i
var j = i
while j < ranges.len and leafHigh >= ranges[j].leafLow:
inc j
if j < ranges.len and leafHigh + 1 == ranges[j].leafLow:
inc j
if j == i:
ranges.insert(LeafRange(leafLow: leafLow, leafHigh: leafHigh), i)
else:
if j-1 > i:
ranges[i].leafHigh = ranges[j-1].leafHigh
ranges.delete(i+1, j-1)
if leafLow < ranges[i].leafLow:
ranges[i].leafLow = leafLow
if leafHigh > ranges[i].leafHigh:
ranges[i].leafHigh = leafHigh
template getSlice*(sp: WorkerBuddy, leafRange: var LeafRange): bool =
sp.getSlice(leafRange.leafLow, leafRange.leafHigh)
template putSlice*(sp: WorkerBuddy, leafRange: LeafRange) =
sp.putSlice(leafRange.leafLow, leafRange.leafHigh)
proc countSlice*(sp: WorkerBuddy, leafLow, leafHigh: LeafPath, which: bool) =
doAssert leafLow <= leafHigh
sp.ns.sharedFetchEx.countRange += leafHigh - leafLow + 1
sp.ns.sharedFetchEx.countRangeStarted = true
if which:
sp.ns.sharedFetchEx.countRangeSnap += leafHigh - leafLow + 1
sp.ns.sharedFetchEx.countRangeSnapStarted = true
else:
sp.ns.sharedFetchEx.countRangeTrie += leafHigh - leafLow + 1
sp.ns.sharedFetchEx.countRangeTrieStarted = true
template countSlice*(sp: WorkerBuddy, leafRange: LeafRange, which: bool) =
sp.countSlice(leafRange.leafLow, leafRange.leafHigh, which)
proc countAccounts*(sp: WorkerBuddy, len: int) =
sp.ns.sharedFetchEx.countAccounts += len
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
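
Callers use the slice pool above with a claim/process/return
discipline; a sketch of the calling pattern (compare `fetch.nim` in the
next file; the proc name is hypothetical):

proc processSlice(sp: WorkerBuddy) {.async.} =
  var leafRange: LeafRange
  if sp.getSlice(leafRange):      # claim at most 1/1000 of the path space
    try:
      # ... await the fetch covering
      #     pathRange(leafRange.leafLow, leafRange.leafHigh) ...
      sp.countSlice(leafRange, true)   # account the fetched range (snap)
    except CatchableError:
      sp.putSlice(leafRange)           # merge slice back into the free list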


@ -16,18 +16,18 @@ import
stint,
eth/[common/eth_types, p2p],
../../types,
".."/[path_desc, base_desc],
"."/[common, fetch_trie, fetch_snap]
../path_desc,
"."/[common, fetch_trie, fetch_snap, worker_desc]
{.push raises: [Defect].}
logScope:
topics = "snap peer fetch"
# Note: To test disabling snap (or trie), modify `peerSupportsGetNodeData` or
# Note: To test disabling snap (or trie), modify `fetchTrieOk` or
# `fetchSnapOk` where those are defined.
proc fetch*(sp: SnapPeer) {.async.} =
proc fetch*(sp: WorkerBuddy) {.async.} =
var stateRoot = sp.ctrl.stateRoot.get
trace "Syncing from stateRoot", peer=sp, stateRoot
@ -52,9 +52,9 @@ proc fetch*(sp: SnapPeer) {.async.} =
if stateRoot != sp.ctrl.stateRoot.get:
trace "Syncing from new stateRoot", peer=sp, stateRoot
stateRoot = sp.ctrl.stateRoot.get
sp.ctrl.runState = SyncRunningOK
sp.ctrl.runState = BuddyRunningOK
if sp.ctrl.runState == SyncStopRequest:
if sp.ctrl.runState == BuddyStopRequest:
trace "Pausing sync until we get a new state root", peer=sp
while sp.ctrl.stateRoot.isSome and stateRoot == sp.ctrl.stateRoot.get and
(sp.fetchTrieOk or sp.fetchSnapOk) and
@ -82,3 +82,7 @@ proc fetch*(sp: SnapPeer) {.async.} =
trace "GetNodeData segment", peer=sp,
leafRange=pathRange(leafRange.leafLow, leafRange.leafHigh), stateRoot
await sp.fetchTrie(stateRoot, leafRange)
proc fetchSetup*(sp: WorkerBuddy) =
## Initialise `WorkerBuddy` to support `ReplyData.new()` calls.
sp.fetchTrieSetup
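
The choice between the two protocols keys off the `fetchSnapOk` and
`fetchTrieOk` predicates; schematically (not the module's exact control
flow, proc name hypothetical):

proc fetchOne(sp: WorkerBuddy; stateRoot: TrieHash) {.async.} =
  var leafRange: LeafRange
  if sp.getSlice(leafRange):
    if sp.fetchSnapOk:
      await sp.fetchSnap(stateRoot, leafRange)  # snap# account ranges
    elif sp.fetchTrieOk:
      await sp.fetchTrie(stateRoot, leafRange)  # eth# GetNodeData traversal
    else:
      sp.putSlice(leafRange)                    # neither protocol usable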


@ -24,10 +24,9 @@ import
chronos,
eth/[common/eth_types, p2p],
nimcrypto/keccak,
#stint,
"../.."/[protocol, types],
".."/[base_desc, path_desc],
./common
../path_desc,
"."/[common, worker_desc]
{.push raises: [Defect].}
@ -38,14 +37,14 @@ const
snapRequestBytesLimit = 2 * 1024 * 1024
## Soft bytes limit to request in `snap` protocol calls.
proc fetchSnap*(sp: SnapPeer, stateRoot: TrieHash, leafRange: LeafRange)
proc fetchSnap*(sp: WorkerBuddy, stateRoot: TrieHash, leafRange: LeafRange)
{.async.} =
## Fetch data using the `snap#` protocol
var origin = leafRange.leafLow
var limit = leafRange.leafHigh
const responseBytes = 2 * 1024 * 1024
if sp.ctrl.runState == SyncStopped:
if sp.ctrl.runState == BuddyStopped:
trace trSnapRecvError &
"peer already disconnected, not sending GetAccountRange",
peer=sp, accountRange=pathRange(origin, limit),
@ -65,7 +64,7 @@ proc fetchSnap*(sp: SnapPeer, stateRoot: TrieHash, leafRange: LeafRange)
trace trSnapRecvError & "waiting for reply to GetAccountRange", peer=sp,
error=e.msg
inc sp.stats.major.networkErrors
sp.ctrl.runState = SyncStopped
sp.ctrl.runState = BuddyStopped
sp.putSlice(leafRange)
return
@ -97,7 +96,7 @@ proc fetchSnap*(sp: SnapPeer, stateRoot: TrieHash, leafRange: LeafRange)
got=len, proofLen=proof.len, gotRange="-", requestedRange, stateRoot
sp.putSlice(leafRange)
# Don't keep retrying snap for this state.
sp.ctrl.runState = SyncStopRequest
sp.ctrl.runState = BuddyStopRequest
else:
trace trSnapRecvGot & "END reply AccountRange", peer=sp,
got=len, proofLen=proof.len, gotRange=pathRange(origin, high(LeafPath)),
@ -137,7 +136,7 @@ proc fetchSnap*(sp: SnapPeer, stateRoot: TrieHash, leafRange: LeafRange)
sp.countAccounts(keepAccounts)
proc fetchSnapOk*(sp: SnapPeer): bool =
proc fetchSnapOk*(sp: WorkerBuddy): bool =
## Sort of getter: if `true`, fetching data using the `snap#` protocol
## is supported.
sp.ctrl.runState != SyncStopped and sp.peer.supports(snap)
sp.ctrl.runState != BuddyStopped and sp.peer.supports(snap)


@ -27,8 +27,9 @@ import
chronos,
eth/[common/eth_types, p2p],
"../.."/[protocol/trace_config, types],
".."/[base_desc, path_desc],
"."/[common, reply_data, sync_fetch_xdesc, validate_trienode]
../path_desc,
"."/[common, reply_data, validate_trienode, worker_desc]
{.push raises: [Defect].}
@ -48,9 +49,9 @@ type
path: InteriorPath
future: Future[Blob]
FetchStateEx = ref object of SnapPeerFetchBase
FetchStateEx = ref object of FetchTrieBase
## Account fetching state on a single peer.
sp: SnapPeer
sp: WorkerBuddy
nodeGetQueue: seq[SingleNodeRequest]
nodeGetsInFlight: int
scheduledBatch: bool
@ -63,13 +64,13 @@ type
unwindAccountBytes: int64
finish: Future[void]
proc fetchStateEx(sp: SnapPeer): FetchStateEx =
sp.fetchState.FetchStateEx
proc fetchStateEx(sp: WorkerBuddy): FetchStateEx =
sp.fetchTrieBase.FetchStateEx
proc `fetchStateEx=`(sp: SnapPeer; value: FetchStateEx) =
sp.fetchState = value
proc `fetchStateEx=`(sp: WorkerBuddy; value: FetchStateEx) =
sp.fetchTrieBase = value
proc new(T: type FetchStateEx; peer: SnapPeer): T =
proc new(T: type FetchStateEx; peer: WorkerBuddy): T =
FetchStateEx(sp: peer)
# Forward declaration.
@ -91,7 +92,7 @@ proc wrapCallGetNodeData(fetch: FetchStateEx, hashes: seq[NodeHash],
if reply.replyType == NoReplyData:
# Empty reply, timeout or error (i.e. `reply.isNil`).
# It means there are none of the nodes available.
fetch.sp.ctrl.runState = SyncStopRequest
fetch.sp.ctrl.runState = BuddyStopRequest
for i in 0 ..< futures.len:
futures[i].complete(@[])
@ -159,7 +160,7 @@ proc batchGetNodeData(fetch: FetchStateEx) =
trace "Trie: Sort length", sortLen=i
# If stopped, abort all waiting nodes, so they clean up.
if fetch.sp.ctrl.runState != SyncRunningOk:
if fetch.sp.ctrl.runState != BuddyRunningOk:
while i > 0:
fetch.nodeGetQueue[i].future.complete(@[])
dec i
@ -217,7 +218,7 @@ proc getNodeData(fetch: FetchStateEx,
fetch.scheduleBatchGetNodeData()
let nodeBytes = await future
if fetch.sp.ctrl.runState != SyncRunningOk:
if fetch.sp.ctrl.runState != BuddyRunningOk:
return nodebytes
when trEthTracePacketsOk:
@ -251,20 +252,20 @@ proc pathInRange(fetch: FetchStateEx, path: InteriorPath): bool =
proc traverse(fetch: FetchStateEx, hash: NodeHash, path: InteriorPath,
fromExtension: bool) {.async.} =
template errorReturn() =
fetch.sp.ctrl.runState = SyncStopRequest
fetch.sp.ctrl.runState = BuddyStopRequest
dec fetch.nodesInFlight
if fetch.nodesInFlight == 0:
fetch.finish.complete()
return
# If something triggered stop earlier, don't request, and clean up now.
if fetch.sp.ctrl.runState != SyncRunningOk:
if fetch.sp.ctrl.runState != BuddyRunningOk:
errorReturn()
let nodeBytes = await fetch.getNodeData(hash.TrieHash, path)
# If something triggered stop, clean up now.
if fetch.sp.ctrl.runState != SyncRunningOk:
if fetch.sp.ctrl.runState != BuddyRunningOk:
errorReturn()
# Don't keep emitting error messages after one error. We'll allow 10.
if fetch.getNodeDataErrors >= 10:
@ -316,7 +317,7 @@ proc traverse(fetch: FetchStateEx, hash: NodeHash, path: InteriorPath,
if fetch.nodesInFlight == 0:
fetch.finish.complete()
proc probeGetNodeData(sp: SnapPeer, stateRoot: TrieHash): Future[bool]
proc probeGetNodeData(sp: WorkerBuddy, stateRoot: TrieHash): Future[bool]
{.async.} =
# Before doing real trie traversal on this peer, send a probe request for
# `stateRoot` to see if it's worth pursuing at all. We will avoid reserving
@ -343,7 +344,7 @@ proc probeGetNodeData(sp: SnapPeer, stateRoot: TrieHash): Future[bool]
# Public functions
# ------------------------------------------------------------------------------
proc fetchTrie*(sp: SnapPeer, stateRoot: TrieHash, leafRange: LeafRange)
proc fetchTrie*(sp: WorkerBuddy, stateRoot: TrieHash, leafRange: LeafRange)
{.async.} =
if sp.fetchStateEx.isNil:
sp.fetchStateEx = FetchStateEx.new(sp)
@ -365,10 +366,14 @@ proc fetchTrie*(sp: SnapPeer, stateRoot: TrieHash, leafRange: LeafRange)
sp.ns.sharedFetchEx.countAccountBytes -= fetch.unwindAccountBytes
sp.putSlice(leafRange)
proc fetchTrieOk*(sp: SnapPeer): bool =
sp.ctrl.runState != SyncStopped and
proc fetchTrieOk*(sp: WorkerBuddy): bool =
sp.ctrl.runState != BuddyStopped and
(sp.fetchStateEx.isNil or sp.fetchStateEx.getNodeDataErrors == 0)
proc fetchTrieSetup*(sp: WorkerBuddy) =
## Initialise `WorkerBuddy` to support `replyDataGet()` calls.
sp.replyDataSetup
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------


@ -64,10 +64,9 @@ import
std/[sequtils, sets, tables, hashes],
chronos,
eth/[common/eth_types, p2p],
nimcrypto/keccak,
stint,
"../.."/[protocol, protocol/trace_config, types],
".."/[base_desc, path_desc, timer_helper]
../path_desc,
"."/[timer_helper, worker_desc]
{.push raises: [Defect].}
@ -86,14 +85,14 @@ type
MultipleEntriesReply
RequestData = ref object
sp: SnapPeer
sp: WorkerBuddy
hashes: seq[NodeHash]
future: Future[ReplyData]
timer: TimerCallback
pathRange: (InteriorPath, InteriorPath)
fullHashed: bool
RequestDataQueue = ref object of SnapPeerRequestsBase
RequestDataQueue = ref object of ReplyDataBase
liveRequests: HashSet[RequestData]
empties: int
# `OrderedSet` was considered instead of `seq` here, but it has a slow
@ -115,11 +114,11 @@ proc hash(hash: ptr Hash256): Hash =
proc `==`(hash1, hash2: ptr Hash256): bool =
hash1[] == hash2[]
proc requestsEx(sp: SnapPeer): RequestDataQueue =
sp.requests.RequestDataQueue
proc requestsEx(sp: WorkerBuddy): RequestDataQueue =
sp.replyDataBase.RequestDataQueue
proc `requestsEx=`(sp: SnapPeer; value: RequestDataQueue) =
sp.requests = value
proc `requestsEx=`(sp: WorkerBuddy; value: RequestDataQueue) =
sp.replyDataBase = value
# ------------------------------------------------------------------------------
# Private logging helpers
@ -155,7 +154,7 @@ proc traceGetNodeDataDisconnected(request: RequestData) =
trace trEthRecvError & "peer disconnected, not sending GetNodeData",
peer=request.sp, hashes=request.hashes.len, pathRange=request.pathRange
proc traceReplyDataEmpty(sp: SnapPeer, request: RequestData) =
proc traceReplyDataEmpty(sp: WorkerBuddy, request: RequestData) =
# `request` can be `nil` because we don't always know which request
# the empty reply goes with. Therefore `sp` must be included.
if request.isNil:
@ -164,7 +163,7 @@ proc traceReplyDataEmpty(sp: SnapPeer, request: RequestData) =
trace trEthRecvGot & "NodeData", peer=sp, got=0,
requested=request.hashes.len, pathRange=request.pathRange
proc traceReplyDataUnmatched(sp: SnapPeer, got: int) =
proc traceReplyDataUnmatched(sp: WorkerBuddy, got: int) =
# There is no request for this reply. Therefore `sp` must be included.
trace trEthRecvProtocolViolation & "non-reply NodeData", peer=sp, got
debug "Warning: Unexpected non-reply NodeData from peer"
@ -384,7 +383,7 @@ proc nodeDataTryEmpties(rq: RequestDataQueue) =
proc new(
T: type RequestData,
sp: SnapPeer,
sp: WorkerBuddy,
hashes: seq[NodeHash],
pathFrom, pathTo: InteriorPath
): RequestData =
@ -407,7 +406,7 @@ proc nodeDataEnqueueAndSend(request: RequestData) {.async.} =
## Helper function to send an `eth.GetNodeData` request.
## But not when we're draining the in flight queue to match empty replies.
let sp = request.sp
if sp.ctrl.runState == SyncStopped:
if sp.ctrl.runState == BuddyStopped:
request.traceGetNodeDataDisconnected()
request.future.complete(nil)
return
@ -427,10 +426,10 @@ proc nodeDataEnqueueAndSend(request: RequestData) {.async.} =
except CatchableError as e:
request.traceGetNodeDataSendError(e)
inc sp.stats.major.networkErrors
sp.ctrl.runState = SyncStopped
sp.ctrl.runState = BuddyStopped
request.future.fail(e)
proc onNodeData(sp: SnapPeer, data: openArray[Blob]) =
proc onNodeData(sp: WorkerBuddy, data: openArray[Blob]) =
## Handle an incoming `eth.NodeData` reply.
## Practically, this is also where all the incoming packet trace messages go.
let rq = sp.requestsEx
@ -484,7 +483,7 @@ proc onNodeData(sp: SnapPeer, data: openArray[Blob]) =
proc new*(
T: type ReplyData,
sp: SnapPeer,
sp: WorkerBuddy,
hashes: seq[NodeHash],
pathFrom = InteriorPath(),
pathTo = InteriorPath()
@ -514,7 +513,7 @@ proc new*(
except CatchableError as e:
request.traceReplyDataError(e)
inc sp.stats.major.networkErrors
sp.ctrl.runState = SyncStopped
sp.ctrl.runState = BuddyStopped
return nil
# Timeout, packet and packet error trace messages are done in `onNodeData`
@ -548,8 +547,9 @@ proc `[]`*(reply: ReplyData; inx: int): Blob =
if inx < reply.hashVerifiedData.len:
return reply.hashVerifiedData[inx]
proc replyDataSetup*(sp: SnapPeer) =
## Initialise `SnapPeer` to support `replyDataGet()` calls.
proc replyDataSetup*(sp: WorkerBuddy) =
## Initialise `WorkerBuddy` to support `NodeData` replies to `GetNodeData`
## requests issued by `new()`.
if sp.requestsEx.isNil:
sp.requestsEx = RequestDataQueue()
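
On the calling side the request queue above is driven through
`ReplyData.new`; a usage sketch (compare `wrapCallGetNodeData` in
`fetch_trie.nim`; the proc name is hypothetical):

proc fetchNodes(sp: WorkerBuddy; hashes: seq[NodeHash]) {.async.} =
  let reply = await ReplyData.new(sp, hashes)
  if reply.replyType == NoReplyData:
    return                        # empty reply, timeout or error
  for i in 0 ..< hashes.len:
    let nodeBytes = reply[i]      # hash-verified; empty blob if missing
    if 0 < nodeBytes.len:
      discard                     # process the node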


@ -33,7 +33,8 @@
import
eth/[common/eth_types, p2p],
../../types,
".."/[base_desc, path_desc]
../path_desc,
./worker_desc
{.push raises: [Defect].}
@ -88,7 +89,7 @@ template nodeError(msg: string{lit}, more: varargs[untyped]) =
#echo inspect(rlpFromBytes(nodeBytes))
inc context.errors
proc parseLeafValue(sp: SnapPeer,
proc parseLeafValue(sp: WorkerBuddy,
nodePath: InteriorPath, nodeHash: NodeHash, nodeBytes: Blob,
nodeRlp: var Rlp, leafPath: InteriorPath,
context: var TrieNodeParseContext
@ -129,13 +130,13 @@ proc parseLeafValue(sp: SnapPeer,
# echo inspect(rlpFromBytes(leafBytes))
# Forward declaration, used for bounded, rare recursion.
proc parseTrieNode*(sp: SnapPeer,
proc parseTrieNode*(sp: WorkerBuddy,
nodePath: InteriorPath, nodeHash: NodeHash, nodeBytes: Blob,
fromExtension: bool,
context: var TrieNodeParseContext
) {.gcsafe, raises: [Defect, RlpError].}
proc parseExtensionChild(sp: SnapPeer,
proc parseExtensionChild(sp: WorkerBuddy,
nodePath: InteriorPath, nodeHash: NodeHash,
nodeBytes: Blob, nodeRlp: var Rlp,
childPath: InteriorPath,
@ -183,7 +184,7 @@ proc parseExtensionChild(sp: SnapPeer,
else:
childError "Extension node child (RLP element 1) has length > 32 bytes"
proc parseExtensionOrLeaf(sp: SnapPeer,
proc parseExtensionOrLeaf(sp: WorkerBuddy,
nodePath: InteriorPath, nodeHash: NodeHash,
nodeBytes: Blob, nodeRlp: var Rlp,
fromExtension: bool,
@ -271,7 +272,7 @@ proc parseExtensionOrLeaf(sp: SnapPeer,
sp.parseExtensionChild(nodePath, nodeHash, nodeBytes, nodeRlp,
childPath, context)
proc parseBranchNode(sp: SnapPeer,
proc parseBranchNode(sp: WorkerBuddy,
nodePath: InteriorPath, nodeHash: NodeHash,
nodeBytes: Blob, nodeRlp: var Rlp,
context: var TrieNodeParseContext
@ -345,7 +346,7 @@ proc parseBranchNode(sp: SnapPeer,
branches=branchCount, minBranches=2
return
proc parseTrieNode*(sp: SnapPeer,
proc parseTrieNode*(sp: WorkerBuddy,
nodePath: InteriorPath, nodeHash: NodeHash, nodeBytes: Blob,
fromExtension: bool, context: var TrieNodeParseContext
) {.raises: [Defect, RlpError].} =
@ -381,8 +382,8 @@ proc parseTrieNode*(sp: SnapPeer,
## root node of a trie, otherwise it is the value stored in `childQueue`
## from parsing the parent node.
##
## - The `sp: SnapPeerBase` is like the hash, only used for diagnostics. When
## there is invalid data, it's useful to show where we got it from.
## - The `sp: WorkerBuddyBase` is like the hash, only used for diagnostics.
## When there is invalid data, it's useful to show where we got it from.
##
## - Some limited recursion is possible while parsing, because of how < 32
## byte nodes are encoded inside others. When recursion occurs, the number
@ -445,7 +446,7 @@ proc parseTrieNode*(sp: SnapPeer,
listLen=nodeListLen
return
proc parseTrieNodeError*(sp: SnapPeer, nodePath: InteriorPath,
proc parseTrieNodeError*(sp: WorkerBuddy, nodePath: InteriorPath,
nodeHash: NodeHash, nodeBytes: Blob,
context: var TrieNodeParseContext,
exception: ref RlpError) =


@ -0,0 +1,161 @@
# Nimbus - New sync approach - A fusion of snap, trie, beam and other methods
#
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
# http://www.apache.org/licenses/LICENSE-2.0)
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
# http://opensource.org/licenses/MIT)
# at your option. This file may not be copied, modified, or distributed
# except according to those terms.
import
eth/[common/eth_types, p2p],
stew/[byteutils, keyed_queue, results],
../../types
{.push raises: [Defect].}
const
seenBlocksMax = 500
## Internal size of LRU cache (for debugging)
type
FetchTrieBase* = ref object of RootObj
## Stub object, to be inherited in file `fetch_trie.nim`
ReplyDataBase* = ref object of RootObj
## Stub object, to be inherited in file `reply_data.nim`
WorkerBase* = ref object of RootObj
## Stub object, to be inherited in file `worker.nim`
BuddyStat* = distinct uint
BuddyRunState* = enum
BuddyRunningOk
BuddyStopRequest
BuddyStopped
WorkerBuddyStats* = tuple
## Statistics counters for events associated with this peer.
## These may be used to recognise errors and select good peers.
ok: tuple[
reorgDetected: BuddyStat,
getBlockHeaders: BuddyStat,
getNodeData: BuddyStat]
minor: tuple[
timeoutBlockHeaders: BuddyStat,
unexpectedBlockHash: BuddyStat]
major: tuple[
networkErrors: BuddyStat,
excessBlockHeaders: BuddyStat,
wrongBlockHeader: BuddyStat]
WorkerBuddyCtrl* = tuple
## Control and state settings
stateRoot: Option[TrieHash]
## State root to fetch state for. This changes during sync and is
## slightly different for each peer.
runState: BuddyRunState
# -------
WorkerSeenBlocks = KeyedQueue[array[32,byte],BlockNumber]
## Temporary for pretty debugging, `BlockHash` keyed lru cache
CommonBase* = ref object of RootObj
## Stub object, to be inherited in file `common.nim`
# -------
WorkerBuddy* = ref object
## Non-inheritable peer state tracking descriptor.
ns*: Worker ## Worker descriptor object back reference
peer*: Peer ## Reference to eth p2pProtocol entry
stats*: WorkerBuddyStats ## Statistics counters
ctrl*: WorkerBuddyCtrl ## Control and state settings
workerBase*: WorkerBase ## Opaque object reference
replyDataBase*: ReplyDataBase ## Opaque object reference
fetchTrieBase*: FetchTrieBase ## Opaque object reference
Worker* = ref object of RootObj
## Shared state among all peers of a snap syncing node. Will be
## amended/inherited into `WorkerCtx` by the `snap` module.
seenBlock: WorkerSeenBlocks ## Temporary, debugging, pretty logs
commonBase*: CommonBase ## Opaque object reference
# ------------------------------------------------------------------------------
# Public Constructor
# ------------------------------------------------------------------------------
proc new*(
T: type WorkerBuddy;
ns: Worker;
peer: Peer;
runState: BuddyRunState
): T =
## Initial state, maximum uncertainty range.
T(ns: ns,
peer: peer,
ctrl: (
stateRoot: none(TrieHash),
runState: runState))
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc `$`*(sp: WorkerBuddy): string =
$sp.peer
proc inc(stat: var BuddyStat) {.borrow.}
# ------------------------------------------------------------------------------
# Public functions, debugging helpers (will go away eventually)
# ------------------------------------------------------------------------------
proc pp*(sn: Worker; bh: BlockHash): string =
## Pretty printer for debugging
let rc = sn.seenBlock.lruFetch(bh.untie.data)
if rc.isOk:
return "#" & $rc.value
$bh.untie.data.toHex
proc pp*(sn: Worker; bh: BlockHash; bn: BlockNumber): string =
## Pretty printer for debugging
let rc = sn.seenBlock.lruFetch(bh.untie.data)
if rc.isOk:
return "#" & $rc.value
"#" & $sn.seenBlock.lruAppend(bh.untie.data, bn, seenBlocksMax)
proc pp*(sn: Worker; bhn: HashOrNum): string =
if not bhn.isHash:
return "num(#" & $bhn.number & ")"
let rc = sn.seenBlock.lruFetch(bhn.hash.data)
if rc.isOk:
return "hash(#" & $rc.value & ")"
return "hash(" & $bhn.hash.data.toHex & ")"
proc seen*(sn: Worker; bh: BlockHash; bn: BlockNumber) =
## Register for pretty printing
if not sn.seenBlock.lruFetch(bh.untie.data).isOk:
discard sn.seenBlock.lruAppend(bh.untie.data, bn, seenBlocksMax)
# -----------
import
../../../tests/replay/pp_light
proc pp*(bh: BlockHash): string =
bh.Hash256.pp
proc pp*(bn: BlockNumber): string =
if bn == high(BlockNumber): "#max"
else: "#" & $bn
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------
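
A short construction and debug-logging sketch tying this module
together (assuming a connected `peer` and some `bh: BlockHash` at
hand):

let ns = Worker()                   # shared by all workers of the node
let sp = WorkerBuddy.new(ns, peer, BuddyRunningOk)
doAssert $sp == $peer               # prints via the peer reference

ns.seen(bh, 1234.toBlockNumber)     # register for pretty printing
echo ns.pp(bh)                      # now prints "#1234"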