561 lines
18 KiB
Nim
561 lines
18 KiB
Nim
# Nimbus
|
|
# Copyright (c) 2018-2021 Status Research & Development GmbH
|
|
# Licensed under either of
|
|
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
|
|
# http://www.apache.org/licenses/LICENSE-2.0)
|
|
# * MIT license ([LICENSE-MIT](LICENSE-MIT) or
|
|
# http://opensource.org/licenses/MIT)
|
|
# at your option. This file may not be copied, modified, or distributed
|
|
# except according to those terms.
|
|
|
|
{.push raises: [].}
|
|
|
|
import
|
|
std/sequtils,
|
|
chronicles,
|
|
chronos,
|
|
eth/[common, p2p, trie/db, trie/nibbles],
|
|
stew/[byteutils, interval_set],
|
|
../../core/chain,
|
|
../../db/core_db/legacy_db,
|
|
../snap/[constants, range_desc],
|
|
../snap/worker/db/[hexary_desc, hexary_error, hexary_paths,
|
|
snapdb_persistent, hexary_range],
|
|
../protocol,
|
|
../protocol/snap/snap_types
|
|
|
|
# Log topic attached to the log statements of this module
logScope:
  topics = "snap-wire"
|
|
|
|
type
  SnapWireRef* = ref object of SnapWireBase
    ## Descriptor for the `snap` wire protocol handlers of this module
    chain: ChainRef
      ## Gives access to the database via `chain.com.db`
    elaFetchMax: chronos.Duration
      ## Time budget added to `Moment.now()` when a handler starts fetching
    dataSizeMax: int
      ## Upper bound applied to the `replySizeMax` handler argument
    peerPool: PeerPool
      ## Stored by `init()`

  SlotsSpecs = object
    ## Storage slots access details, as produced by `getSlotsSpecs()`
    slotFn: HexaryGetFn
      ## For accessing storage slots
    stoRoot: NodeKey
      ## Storage root
|
|
|
|
const
  extraTraceMessages = false # or true
    ## Enables additional logging noise when set `true`

  estimatedNodeSize = hexaryRangeRlpNodesListSizeMax(1)
    ## Expected upper size limit for a single node

  estimatedProofSize = hexaryRangeRlpNodesListSizeMax(10)
    ## Expected upper size limit for a proof, which typically has not more
    ## than 10 proof nodes

  emptySnapStorageList = seq[SnapStorage].default
    ## Dummy list used as a place holder for empty slots

  defaultElaFetchMax = 990.milliseconds
    ## Fetching accounts or slots can take a long time, so the handlers stop
    ## in the middle when this time span is exceeded

  defaultDataSizeMax = fetchRequestBytesLimit
    ## Maximum data size, larger request limits are truncated to this value
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private functions: helpers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
template logTxt(info: static[string]): static[string] =
  ## Prepends the prefix `handlers.snap.` to the log message `info`
  "handlers.snap." & info
|
|
|
|
proc notImplemented(name: string) {.used.} =
  ## Emits a debug message saying that the wire handler method `name` is
  ## not implemented
  debug "Wire handler method not implemented", meth=name
|
|
|
|
# ----------------------------------
|
|
|
|
proc getAccountFn(
    ctx: SnapWireRef;
      ): HexaryGetFn
      {.gcsafe.} =
  ## Returns a getter closure for the database behind `ctx`. The closure
  ## looks up `key` in the legacy key-value backend. For a non-legacy
  ## database it returns the default (empty) `Blob`.
  #
  # The snap sync implementation provides a function `persistentAccountGetFn()`
  # similar to this one. But it is not safe to use it at the moment as the
  # storage table might (or might not) differ.
  let db = ctx.chain.com.db
  result = proc(key: openArray[byte]): Blob =
    if not db.isLegacy:
      return # implicit empty result
    db.kvt.backend.toLegacy.get(key)
|
|
|
|
proc getStoSlotFn(
    ctx: SnapWireRef;
    accKey: NodeKey;
      ): HexaryGetFn
      {.gcsafe.} =
  ## Returns a getter closure for the database behind `ctx`. The closure
  ## looks up `key` in the legacy key-value backend. For a non-legacy
  ## database it returns the default (empty) `Blob`. The `accKey` argument
  ## is not consulted by the current implementation.
  #
  # The snap sync implementation provides a function
  # `persistentStorageSlotsGetFn()` similar to this one. But it is not safe to
  # use it at the moment as the storage table might (or might not) differ.
  let db = ctx.chain.com.db
  result = proc(key: openArray[byte]): Blob =
    if not db.isLegacy:
      return # implicit empty result
    db.kvt.backend.toLegacy.get(key)
|
|
|
|
proc getCodeFn(
    ctx: SnapWireRef;
      ): HexaryGetFn
      {.gcsafe.} =
  ## Returns the getter function used by `getByteCodes()` for fetching
  ## contract codes.
  #
  # It is safe to borrow this function from the snap sync implementation.
  let db = ctx.chain.com.db
  db.persistentContractsGetFn()
|
|
|
|
# ----------------------------------
|
|
|
|
proc to(
    rl: RangeLeaf;
    T: type SnapAccount;
      ): T
      {.gcsafe, raises: [RlpError].} =
  ## Converts the generic `RangeLeaf` argument to the `SnapAccount` payload
  ## type. The leaf data is decoded as `Account`, which is where an
  ## `RlpError` may originate.
  let
    hash = rl.key.to(Hash256)
    body = rl.data.decode(Account)
  T(accHash: hash, accBody: body)
|
|
|
|
proc to(
    rl: RangeLeaf;
    T: type SnapStorage;
      ): T
      {.gcsafe.} =
  ## Converts the generic `RangeLeaf` argument to the `SnapStorage` payload
  ## type. The leaf data is passed on as-is.
  let hash = rl.key.to(Hash256)
  T(slotHash: hash, slotData: rl.data)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private functions: fetch leaf range
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc getSlotsSpecs(
    ctx: SnapWireRef;                  # Handler descriptor
    rootKey: NodeKey;                  # State root
    accGetFn: HexaryGetFn;             # Database abstraction
    accKey: NodeKey;                   # Current account
      ): Result[SlotsSpecs,void]
      {.gcsafe, raises: [CatchableError].} =
  ## Retrieves the storage slots specs from the account data found for
  ## `accKey` relative to the state root `rootKey`. An error is returned if
  ## there is no account leaf, or if the account has an empty storage root.
  let leaf = accKey.hexaryPath(rootKey, accGetFn).leafData

  # Missing account entry => nothing to do
  if leaf.len == 0:
    when extraTraceMessages:
      trace logTxt "getSlotsSpecs: no such account", accKey, rootKey
    return err()

  # Only accounts with a non-empty storage trie provide slot specs
  let storageRoot = rlp.decode(leaf, Account).storageRoot
  if storageRoot != EMPTY_ROOT_HASH:
    return ok(SlotsSpecs(
      slotFn:  ctx.getStoSlotFn(accKey),
      stoRoot: storageRoot.to(NodeKey)))

  when extraTraceMessages:
    trace logTxt "getSlotsSpecs: no slots", accKey
  return err()
|
|
|
|
|
|
iterator doTrieNodeSpecs(
    ctx: SnapWireRef;                  # Handler descriptor
    rootKey: NodeKey;                  # State root
    pGroups: openArray[SnapTriePaths]; # Group of partial paths
      ): (NodeKey, HexaryGetFn, Blob, int)
      {.gcsafe, raises: [CatchableError].} =
  ## Helper for `getTrieNodes()` to cycle over `pathGroups`. The yielded
  ## tuples are `(root-key, getter, partial-path, count)`:
  ##
  ## * A group without slot paths yields the account path relative to
  ##   `rootKey` with `count` set to zero.
  ## * A group with resolvable account yields one tuple for each slot path,
  ##   relative to the storage root of that account.
  ## * Otherwise a single tuple with a `nil` getter is yielded where `count`
  ##   holds the number of slot paths of the failed group.
  let accGetFn = ctx.getAccountFn

  for grp in pGroups:
    let nSlots = grp.slotPaths.len

    if nSlots == 0:
      # Special case: fetch account node
      yield (rootKey, accGetFn, grp.accPath, 0)

    else:
      var
        accKey: NodeKey
        resolved = false

      # Compile account key and derive slot specs from the account
      if accKey.init(grp.accPath):
        let rc = ctx.getSlotsSpecs(rootKey, accGetFn, accKey)
        if rc.isOk:
          resolved = true
          let specs = rc.value
          for path in grp.slotPaths:
            when extraTraceMessages:
              trace logTxt "doTrieNodeSpecs",
                rootKey=specs.stoRoot, slotPath=path.toHex
            yield (specs.stoRoot, specs.slotFn, path, nSlots)

      # Fail on this group
      if not resolved:
        when extraTraceMessages:
          trace logTxt "doTrieNodeSpecs (blind)", accPath=grp.accPath.toHex,
            nBlind=nSlots, nBlind0=grp.slotPaths[0].toHex
        yield (NodeKey.default, nil, EmptyBlob, nSlots)
|
|
|
|
|
|
proc mkNodeTagRange(
    origin: openArray[byte];
    limit: openArray[byte];
    nAccounts = 1;
      ): Result[NodeTagRange,void] =
  ## Verifies the range arguments and converts them to an interval.
  ##
  ## * With both `origin` and `limit` empty, the full range is returned
  ##   regardless of `nAccounts`.
  ## * Otherwise `nAccounts` must not be zero and the arguments must
  ##   convert to a non-empty interval, or an error is returned.
  ## * With more than one account, valid arguments are checked but the
  ##   full range is returned.
  let (fullMin, fullMax) = (low(NodeTag), high(NodeTag))

  # No range arguments at all => full range
  if origin.len == 0 and limit.len == 0:
    return ok(NodeTagRange.new(fullMin, fullMax))

  # Range applies only if there is exactly one account. A number of accounts
  # different from 1 may be used by `getStorageRanges()`
  if nAccounts == 0:
    return err() # oops: no account

  # Verify range arguments
  var (minPt, maxPt) = (fullMin, fullMax)
  if not (minPt.init(origin) and maxPt.init(limit) and not (maxPt < minPt)):
    when extraTraceMessages:
      trace logTxt "mkNodeTagRange: malformed range",
        origin=origin.toHex, limit=limit.toHex
    return err()

  if 1 < nAccounts:
    return ok(NodeTagRange.new(fullMin, fullMax))

  ok(NodeTagRange.new(minPt, maxPt))
|
|
|
|
|
|
proc fetchLeafRange(
    ctx: SnapWireRef;                   # Handler descriptor
    getFn: HexaryGetFn;                 # Database abstraction
    rootKey: NodeKey;                   # State root
    iv: NodeTagRange;                   # Proofed range of leaf paths
    replySizeMax: int;                  # Updated size counter for the raw list
    stopAt: Moment;                     # Implies timeout
      ): Result[RangeProof,HexaryError]
      {.gcsafe, raises: [CatchableError].} =
  ## Generic leaf fetcher. Collects the leafs for the interval `iv` relative
  ## to `rootKey` together with a proof.
  ##
  ## If the result (leafs plus proof) does not fit into `replySizeMax`,
  ## trailing leafs are stripped and the proof is re-calculated for the
  ## shortened list. The function returns
  ##
  ## * the error from the database layer if the range query fails
  ## * `DataSizeError` if the result cannot be made to fit `replySizeMax`
  ## * the (possibly truncated) range proof, otherwise
  let
    # Leave some space for the proof nodes when collecting leafs
    sizeMax = replySizeMax - estimatedProofSize
    now = Moment.now()
    # Use a token timeout if the dead line has passed already
    timeout = if now < stopAt: stopAt - now else: 1.milliseconds
    rc = getFn.hexaryRangeLeafsProof(rootKey, iv, sizeMax, timeout)
  if rc.isErr:
    error logTxt "fetchLeafRange: database problem",
      iv, replySizeMax, error=rc.error
    return rc # database error

  let sizeOnWire = rc.value.leafsSize + rc.value.proofSize
  if sizeOnWire <= replySizeMax:
    return rc

  # Estimate the overhead size on wire needed for a single leaf tail item
  const leafExtraSize = (sizeof RangeLeaf()) - (sizeof newSeq[Blob](0))

  let nLeafs = rc.value.leafs.len
  when extraTraceMessages:
    trace logTxt "fetchLeafRange: reducing reply sample",
      iv, sizeOnWire, replySizeMax, nLeafs

  # Strip parts of leafs result and amend remainder by adding proof nodes.
  # Here, `sizeOnWire` exceeds `replySizeMax` so `reduceBy` is the positive
  # number of bytes that must be removed from the tail of the leafs list.
  let reduceBy = sizeOnWire - replySizeMax
  var (tailSize, tailItems) = (0, 0)
  while tailSize <= reduceBy:
    tailItems.inc
    if nLeafs <= tailItems:
      when extraTraceMessages:
        trace logTxt "fetchLeafRange: stripping leaf list failed",
          iv, replySizeMax, nLeafs, tailItems
      return err(DataSizeError) # empty tail (package size too small)
    tailSize += rc.value.leafs[^tailItems].data.len + leafExtraSize

  # Provide truncated leafs list
  let
    leafProof = getFn.hexaryRangeLeafsProof(
      rootKey, RangeProof(leafs: rc.value.leafs[0 ..< nLeafs - tailItems]))
    strippedSizeOnWire = leafProof.leafsSize + leafProof.proofSize
  if strippedSizeOnWire <= replySizeMax:
    return ok(leafProof)

  # The proof for the shortened list may still be too large
  when extraTraceMessages:
    trace logTxt "fetchLeafRange: data size problem",
      iv, replySizeMax, nLeafs, tailItems, strippedSizeOnWire

  err(DataSizeError)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Private functions: peer observer
|
|
# ------------------------------------------------------------------------------
|
|
|
|
#proc onPeerConnected(ctx: SnapWireRef, peer: Peer) =
|
|
# debug "snapWire: add peer", peer
|
|
# discard
|
|
#
|
|
#proc onPeerDisconnected(ctx: SnapWireRef, peer: Peer) =
|
|
# debug "snapWire: remove peer", peer
|
|
# discard
|
|
#
|
|
#proc setupPeerObserver(ctx: SnapWireRef) =
|
|
# var po = PeerObserver(
|
|
# onPeerConnected:
|
|
# proc(p: Peer) {.gcsafe.} =
|
|
# ctx.onPeerConnected(p),
|
|
# onPeerDisconnected:
|
|
# proc(p: Peer) {.gcsafe.} =
|
|
# ctx.onPeerDisconnected(p))
|
|
# po.setProtocol protocol.snap
|
|
# ctx.peerPool.addObserver(ctx, po)
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public constructor/destructor
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc init*(
    T: type SnapWireRef;
    chain: ChainRef;
    peerPool: PeerPool;
      ): T =
  ## Constructor (uses `init()` as suggested in style guide.) The fetch time
  ## and data size limits are initialised with the module defaults.
  result = T(
    chain:       chain,
    peerPool:    peerPool,
    elaFetchMax: defaultElaFetchMax,
    dataSizeMax: defaultDataSizeMax)

  #result.setupPeerObserver()
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions: helpers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
proc proofEncode*(proof: seq[SnapProof]): Blob =
  ## Wraps the `proof` list into a `SnapProofNodes` object and serialises it
  ## with an RLP writer.
  var rlpWriter = initRlpWriter()
  rlpWriter.snapAppend(SnapProofNodes(nodes: proof))
  result = rlpWriter.finish()
|
|
|
|
proc proofDecode*(data: Blob): seq[SnapProof] {.gcsafe, raises: [RlpError].} =
  ## Reads a `SnapProofNodes` object from the RLP encoded `data` and returns
  ## its list of nodes.
  var rlpReader = rlpFromBytes(data)
  let decoded = rlpReader.snapRead(SnapProofNodes)
  decoded.nodes
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# Public functions: snap wire protocol handlers
|
|
# ------------------------------------------------------------------------------
|
|
|
|
method getAccountRange*(
    ctx: SnapWireRef;
    root: Hash256;
    origin: openArray[byte];
    limit: openArray[byte];
    replySizeMax: uint64;
      ): (seq[SnapAccount], SnapProofNodes)
      {.gcsafe, raises: [CatchableError].} =
  ## Fetches the accounts list for the range `origin`..`limit` from the
  ## database, relative to the state root `root`. The default (empty) result
  ## is returned if the effective size limit is too small, the range
  ## arguments are malformed, or the data extraction fails.
  let sizeMax = min(replySizeMax, ctx.dataSizeMax.uint64).int
  if sizeMax <= estimatedProofSize:
    when extraTraceMessages:
      trace logTxt "getAccountRange: max data size too small",
        origin=origin.toHex, limit=limit.toHex, sizeMax
    return # package size too small

  # Calculate effective accounts range (if any)
  let ivRc = mkNodeTagRange(origin, limit)
  if ivRc.isErr:
    return # malformed interval

  let
    deadline = Moment.now() + ctx.elaFetchMax
    fetched = ctx.fetchLeafRange(
      ctx.getAccountFn, root.to(NodeKey), ivRc.value, sizeMax, deadline)
  if fetched.isErr:
    return # extraction failed

  let accounts = fetched.value.leafs.mapIt(it.to(SnapAccount))

  #when extraTraceMessages:
  #  trace logTxt "getAccountRange: done", iv, replySizeMax,
  #    nAccounts=accounts.len, nProof=proof.len

  (accounts, SnapProofNodes(nodes: fetched.value.proof))
|
|
|
|
|
|
method getStorageRanges*(
    ctx: SnapWireRef;
    root: Hash256;
    accounts: openArray[Hash256];
    origin: openArray[byte];
    limit: openArray[byte];
    replySizeMax: uint64;
      ): (seq[seq[SnapStorage]], SnapProofNodes)
      {.gcsafe, raises: [CatchableError].} =
  ## Fetches the storage slots lists for the `accounts` from the database,
  ## relative to the state root `root`. Accounts without slot specs add an
  ## empty list. Collecting stops with the first slots list that comes with
  ## a proof, or when running out of space or time.
  ##
  ## The default (empty) result is returned if the effective size limit is
  ## too small, the range arguments are malformed, or fetching the slots of
  ## some account fails.
  let sizeMax = min(replySizeMax, ctx.dataSizeMax.uint64).int
  if sizeMax <= estimatedProofSize:
    when extraTraceMessages:
      trace logTxt "getStorageRanges: max data size too small",
        origin=origin.toHex, limit=limit.toHex, sizeMax
    return # package size too small

  # Calculate effective slots range (if any)
  let ivRc = origin.mkNodeTagRange(limit, accounts.len)
  if ivRc.isErr:
    return # malformed interval

  let
    iv = ivRc.value
    rootKey = root.to(NodeKey)
    accGetFn = ctx.getAccountFn
    stopAt = Moment.now() + ctx.elaFetchMax

  var
    dataAllocated = 0
    timeExceeded = false
    slotLists: seq[seq[SnapStorage]]
    proof: seq[SnapProof]

  # Loop over accounts
  for accHash in accounts:
    let specs = ctx.getSlotsSpecs(rootKey, accGetFn, accHash.to(NodeKey))
    if specs.isErr:
      slotLists.add emptySnapStorageList
      dataAllocated.inc # empty list
      continue

    # Collect data slots for this account => `rangeProof`
    let
      sp = specs.value
      sizeLeft = sizeMax - dataAllocated
      fetched = ctx.fetchLeafRange(sp.slotFn, sp.stoRoot, iv, sizeLeft, stopAt)
    if fetched.isErr:
      when extraTraceMessages:
        trace logTxt "getStorageRanges: failed", iv, sizeMax, sizeLeft,
          accKey=accHash.to(NodeKey), stoRoot=sp.stoRoot, error=fetched.error
      return # extraction failed

    # Process data slots for this account
    let rangeProof = fetched.value
    dataAllocated += rangeProof.leafsSize

    when extraTraceMessages:
      trace logTxt "getStorageRanges: data slots", iv, sizeMax, dataAllocated,
        nAccounts=accounts.len, accKey=accHash.to(NodeKey), stoRoot=sp.stoRoot,
        nSlots=rangeProof.leafs.len, nProof=rangeProof.proof.len

    slotLists.add rangeProof.leafs.mapIt(it.to(SnapStorage))

    if 0 < rangeProof.proof.len:
      proof = rangeProof.proof
      break # only last entry has a proof
    elif sizeMax - dataAllocated <= estimatedProofSize:
      break # stop unless there is enough space left
    elif stopAt <= Moment.now():
      timeExceeded = true
      break

  when extraTraceMessages:
    trace logTxt "getStorageRanges: done", iv, sizeMax, dataAllocated,
      nAccounts=accounts.len, nLeafLists=slotLists.len, nProof=proof.len,
      timeExceeded

  (slotLists, SnapProofNodes(nodes: proof))
|
|
|
|
|
|
method getByteCodes*(
    ctx: SnapWireRef;
    nodes: openArray[Hash256];
    replySizeMax: uint64;
      ): seq[Blob]
      {.gcsafe, raises: [CatchableError].} =
  ## Fetches the contract codes for the hash keys `nodes` from the database.
  ## Keys without a database record are skipped. Collecting stops when the
  ## next record would exceed the effective size limit, or when running out
  ## of time.
  let
    sizeMax = min(replySizeMax, ctx.dataSizeMax.uint64).int
    pfxMax = hexaryRangeRlpSize(sizeMax) - sizeMax # RLP list/blob pfx max
    effSizeMax = sizeMax - pfxMax
    stopAt = Moment.now() + ctx.elaFetchMax
    getFn = ctx.getCodeFn

  var
    dataAllocated = 0
    timeExceeded = false

  when extraTraceMessages:
    trace logTxt "getByteCodes", sizeMax, nNodes=nodes.len

  for codeHash in nodes:
    let data = getFn(codeHash.data.toSeq)

    if data.len == 0:
      when extraTraceMessages:
        trace logTxt "getByteCodes: empty record", sizeMax, nNodes=nodes.len,
          key=codeHash
    else:
      # Account for the size of the encoded record
      let effDataLen = hexaryRangeRlpSize(data.len)
      if effSizeMax < dataAllocated + effDataLen:
        break
      dataAllocated += effDataLen
      result.add data

    if stopAt <= Moment.now():
      timeExceeded = true
      break

  when extraTraceMessages:
    trace logTxt "getByteCodes: done", sizeMax, dataAllocated,
      nNodes=nodes.len, nResult=result.len, timeExceeded
|
|
|
|
|
|
method getTrieNodes*(
    ctx: SnapWireRef;
    root: Hash256;
    pathGroups: openArray[SnapTriePaths];
    replySizeMax: uint64;
      ): seq[Blob]
      {.gcsafe, raises: [CatchableError].} =
  ## Fetches the trie nodes addressed by `pathGroups` from the database,
  ## relative to the state root `root`.
  ##
  ## * Path groups that cannot be resolved by `doTrieNodeSpecs()` add one
  ##   empty blob for each of their slot paths.
  ## * A path that does not fully resolve to a node adds an empty blob.
  ## * Collecting stops when the accumulated data would exceed the effective
  ##   size limit, or when running out of time.
  ##
  ## An empty list is returned if the effective size limit is too small.
  let
    sizeMax = min(replySizeMax, ctx.dataSizeMax.uint64).int
    someSlack = sizeMax.hexaryRangeRlpSize() - sizeMax
  if sizeMax <= someSlack:
    when extraTraceMessages:
      trace logTxt "getTrieNodes: max data size too small",
        root=root.to(NodeKey), nPathGroups=pathGroups.len, sizeMax, someSlack
    return # package size too small
  let
    rootKey = root.to(NodeKey)
    effSizeMax = sizeMax - someSlack
    stopAt = Moment.now() + ctx.elaFetchMax
  var
    dataAllocated = 0
    timeExceeded = false

  for (stateKey,getFn,partPath,n) in ctx.doTrieNodeSpecs(rootKey, pathGroups):
    # Special case: no data available
    if getFn.isNil:
      if effSizeMax < dataAllocated + n:
        break # no need to add trailing empty nodes
      result &= EmptyBlob.repeat(n)
      dataAllocated += n
      continue

    # Fetch node blob. The node is used only if the path was fully consumed
    # (empty tail) and the last path step has a negative nibble.
    let node = block:
      let steps = partPath.hexPrefixDecode[1].hexaryPath(stateKey, getFn)
      if 0 < steps.path.len and
         steps.tail.len == 0 and steps.path[^1].nibble < 0:
        steps.path[^1].node.convertTo(Blob)
      else:
        EmptyBlob

    if effSizeMax < dataAllocated + node.len:
      break
    if stopAt <= Moment.now():
      timeExceeded = true
      break
    result &= node

    # Keep track of the accumulated size, so that the check above limits the
    # total reply size rather than the size of a single node only.
    dataAllocated += node.len

  when extraTraceMessages:
    trace logTxt "getTrieNodes: done", sizeMax, dataAllocated,
      nGroups=pathGroups.len,
      nPaths=pathGroups.mapIt(max(1,it.slotPaths.len)).foldl(a+b,0),
      nResult=result.len, timeExceeded
|
|
|
|
# ------------------------------------------------------------------------------
|
|
# End
|
|
# ------------------------------------------------------------------------------
|