Normalise snap objects (#1114)

* Fix/recover download flag

why:
  The fetch indicator used for controlling the data download got lost
  during an earlier re-org.
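
example:
  A minimal sketch of the recovered guard (illustration only, with a
  placeholder `HuntState` type; the actual change is in the `worker.nim`
  hunk further down, using `sp.hunt.startedFetch` and
  `asyncSpawn sp.fetch()`):

    type
      HuntState = object
        startedFetch: bool          # start the download once, only

    proc maybeStartFetch(hunt: var HuntState) =
      ## Guard so the block state download is kicked off only once per peer.
      if not hunt.startedFetch:
        hunt.startedFetch = true
        # here the real code runs `asyncSpawn sp.fetch()`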

* Updated chronicles/logger topics

* Reorganised run state flags

why:
  The original code used a pair of boolean flags `(stopped,stopThisState)`
  which had been translated to three states: `running`, `stoppedPending`,
  and `stopped`. It is currently not clear whether collapsing some states
  was correct, so the original logic has been restored, albeit wrapped into
  accessors like `isStopped()` etc. (see the mapping sketch below).

also:
  Moved some function bodies within `worker.nim`
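
example:
  For illustration, the restored boolean pair maps onto the new four-state
  enum roughly as sketched here (assuming the second flag corresponds to
  the new `stopRequest` accessor; see the `worker_desc.nim` hunk below):

    type
      BuddyRunState = enum
        FullyRunning    # (stopped=false, stopRequest=false)
        StopRequested   # (stopped=false, stopRequest=true)
        SingularStop    # (stopped=true,  stopRequest=false)
        FullyStopped    # (stopped=true,  stopRequest=true)

    func stopped(s: BuddyRunState): bool =
      s in {SingularStop, FullyStopped}
    func stopRequest(s: BuddyRunState): bool =
      s in {StopRequested, FullyStopped}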

* Moved `reply_data.nim` and `validate_trienode.nim` to sub-directory `fetch_trie`

why:
  Only used in `fetch_trie.nim`.

* Moved `fetch_*` files and directories to the `fetch` sub-directory

why:
  Only used in `fetch.nim`.

* Added start/stop and/or setup/release methods for all sub-modules

why:
  Good housekeeping.

also:
  Updated getters/setters for `ctrl` states
  Updated trace messages
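
example:
  Sketch of the naming convention applied to the sub-modules (placeholder
  module name `xyz` and stripped-down placeholder types; illustration only):

    type
      Worker = ref object        # global sync descriptor (placeholder)
      WorkerBuddy = ref object   # per-peer descriptor (placeholder)

    proc xyzSetup(ns: Worker) = discard       # global set up, from `start()`
    proc xyzRelease(ns: Worker) = discard     # global clean up, from `stop()`
    proc xyzStart(sp: WorkerBuddy) = discard  # per-peer initialisation
    proc xyzStop(sp: WorkerBuddy) = discard   # per-peer clean up
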
Jordan Hrycaj 2022-06-06 14:42:08 +01:00 committed by GitHub
parent 0776f35e0c
commit 76f6de8059
13 changed files with 501 additions and 309 deletions

View File

@ -213,14 +213,14 @@ proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} =
skip: 0,
reverse: true)
trace trEthSendSending & "GetBlockHeaders (0x03)", peer=p,
trace trEthSendSendingGetBlockHeaders, peer=p,
startBlock=request.startBlock.hash.toHex, max=request.maxResults
let latestBlock = await p.getBlockHeaders(request)
if latestBlock.isSome:
if latestBlock.get.headers.len > 0:
result = latestBlock.get.headers[0].blockNumber
trace trEthRecvGot & "BlockHeaders (0x04)", peer=p,
trace trEthRecvReceivedBlockHeaders, peer=p,
count=latestBlock.get.headers.len,
blockNumber=(if latestBlock.get.headers.len > 0: $result else: "missing")
@ -252,26 +252,26 @@ proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} =
var dataReceived = false
try:
trace trEthSendSending & "GetBlockHeaders (0x03)", peer,
trace trEthSendSendingGetBlockHeaders, peer,
startBlock=request.startBlock.number, max=request.maxResults,
step=traceStep(request)
let results = await peer.getBlockHeaders(request)
if results.isSome:
trace trEthRecvGot & "BlockHeaders (0x04)", peer,
trace trEthRecvReceivedBlockHeaders, peer,
count=results.get.headers.len, requested=request.maxResults
shallowCopy(workItem.headers, results.get.headers)
var bodies = newSeqOfCap[BlockBody](workItem.headers.len)
var hashes = newSeqOfCap[KeccakHash](maxBodiesFetch)
template fetchBodies() =
trace trEthSendSending & "GetBlockBodies (0x05)", peer,
trace trEthSendSendingGetBlockBodies, peer,
hashes=hashes.len
let b = await peer.getBlockBodies(hashes)
if b.isNone:
raise newException(
CatchableError, "Was not able to get the block bodies")
let bodiesLen = b.get.blocks.len
trace trEthRecvGot & "BlockBodies (0x06)", peer,
trace trEthRecvReceivedBlockBodies, peer,
count=bodiesLen, requested=hashes.len
if bodiesLen == 0:
raise newException(CatchableError, "Zero block bodies received for request")
@ -346,7 +346,7 @@ proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
skip: 0,
reverse: true)
trace trEthSendSending & "GetBlockHeaders (0x03)", peer=a,
trace trEthSendSendingGetBlockHeaders, peer=a,
startBlock=request.startBlock.hash.toHex, max=request.maxResults
let latestBlock = await a.getBlockHeaders(request)
@ -354,7 +354,7 @@ proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
if latestBlock.isSome:
let blockNumber = if result: $latestBlock.get.headers[0].blockNumber
else: "missing"
trace trEthRecvGot & "BlockHeaders (0x04)", peer=a,
trace trEthRecvReceivedBlockHeaders, peer=a,
count=latestBlock.get.headers.len, blockNumber
proc randomTrustedPeer(ctx: FastSyncCtx): Peer =

View File

@ -79,22 +79,38 @@ const
# Pickled tracer texts
trEthRecvReceived* =
"<< " & prettyEthProtoName & " Received "
trEthRecvGot* =
"<< " & prettyEthProtoName & " Got "
trEthRecvReceivedBlockHeaders* =
trEthRecvReceived & "BlockHeaders (0x04)"
trEthRecvReceivedBlockBodies* =
trEthRecvReceived & "BlockBodies (0x06)"
trEthRecvReceivedGetNodeData* =
trEthRecvReceived & "GetNodeData (0x0d)"
trEthRecvReceivedNodeData* =
trEthRecvReceived & "NodeData (0x0e)"
trEthRecvProtocolViolation* =
"<< " & prettyEthProtoName & " Protocol violation, "
trEthRecvError* =
"<< " & prettyEthProtoName & " Error "
trEthRecvTimeoutWaiting* =
"<< " & prettyEthProtoName & " Timeout waiting "
trEthRecvDiscarding* =
"<< " & prettyEthProtoName & " Discarding "
trEthSendSending* =
">> " & prettyEthProtoName & " Sending "
trEthSendSendingGetBlockHeaders* =
trEthSendSending & "GetBlockHeaders (0x03)"
trEthSendSendingGetBlockBodies* =
trEthSendSending & "GetBlockBodies (0x05)"
trEthSendReplying* =
">> " & prettyEthProtoName & " Replying "
trEthSendReplyingNodeData* =
trEthSendReplying & "NodeData (0x0e)"
trEthSendDelaying* =
">> " & prettyEthProtoName & " Delaying "
trEthSendDiscarding* =
"<< " & prettyEthProtoName & " Discarding "
p2pProtocol eth66(version = ethVersion,
rlpxName = "eth",
@ -168,14 +184,14 @@ p2pProtocol eth66(version = ethVersion,
# User message 0x01: NewBlockHashes.
proc newBlockHashes(peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) =
when trEthTraceGossipOk:
trace trEthSendDiscarding & "NewBlockHashes (0x01)", peer,
trace trEthRecvDiscarding & "NewBlockHashes (0x01)", peer,
hashes=hashes.len
discard
# User message 0x02: Transactions.
proc transactions(peer: Peer, transactions: openArray[Transaction]) =
when trEthTraceGossipOk:
trace trEthSendDiscarding & "Transactions (0x02)", peer,
trace trEthRecvDiscarding & "Transactions (0x02)", peer,
transactions=transactions.len
discard
@ -241,7 +257,7 @@ p2pProtocol eth66(version = ethVersion,
# (Note, needs to use `EthBlock` instead of its alias `NewBlockAnnounce`
# because either `p2pProtocol` or RLPx doesn't work with an alias.)
when trEthTraceGossipOk:
trace trEthSendDiscarding & "NewBlock (0x07)", peer,
trace trEthRecvDiscarding & "NewBlock (0x07)", peer,
totalDifficulty,
blockNumber = bh.header.blockNumber,
blockDifficulty = bh.header.difficulty
@ -250,7 +266,7 @@ p2pProtocol eth66(version = ethVersion,
# User message 0x08: NewPooledTransactionHashes.
proc newPooledTransactionHashes(peer: Peer, txHashes: openArray[Hash256]) =
when trEthTraceGossipOk:
trace trEthSendDiscarding & "NewPooledTransactionHashes (0x08)", peer,
trace trEthRecvDiscarding & "NewPooledTransactionHashes (0x08)", peer,
hashes=txHashes.len
discard
@ -271,7 +287,7 @@ p2pProtocol eth66(version = ethVersion,
# User message 0x0d: GetNodeData.
proc getNodeData(peer: Peer, nodeHashes: openArray[Hash256]) =
trace trEthRecvReceived & "GetNodeData (0x0d)", peer,
trace trEthRecvReceivedGetNodeData, peer,
hashes=nodeHashes.len
var data: seq[Blob]
@ -280,12 +296,8 @@ p2pProtocol eth66(version = ethVersion,
else:
data = peer.network.chain.getStorageNodes(nodeHashes)
if data.len > 0:
trace trEthSendReplying & "with NodeData (0x0e)", peer,
sent=data.len, requested=nodeHashes.len
else:
trace trEthSendReplying & "EMPTY NodeData (0x0e)", peer,
sent=0, requested=nodeHashes.len
trace trEthSendReplyingNodeData, peer,
sent=data.len, requested=nodeHashes.len
await peer.nodeData(data)
@ -296,7 +308,7 @@ p2pProtocol eth66(version = ethVersion,
# know if this is a valid reply ("Got reply") or something else.
peer.state.onNodeData(peer, data)
else:
trace trEthSendDiscarding & "NodeData (0x0e)", peer,
trace trEthRecvDiscarding & "NodeData (0x0e)", peer,
bytes=data.len
requestResponse:

View File

@ -230,20 +230,18 @@ const
# Pickled tracer texts
trSnapRecvReceived* =
"<< " & prettySnapProtoName & " Received "
trSnapRecvGot* =
"<< " & prettySnapProtoName & " Got "
trSnapRecvProtocolViolation* =
"<< " & prettySnapProtoName & " Protocol violation, "
trSnapRecvError* =
"<< " & prettySnapProtoName & " Error "
trSnapRecvTimeoutWaiting* =
"<< " & prettySnapProtoName & " Timeout waiting "
trSnapSendSending* =
">> " & prettySnapProtoName & " Sending "
trSnapSendReplying* =
">> " & prettySnapProtoName & " Replying "
# The `snap` protocol represents `Account` differently from the regular RLP
# serialisation used in `eth` protocol as well as the canonical Merkle hash
# over all accounts. In `snap`, empty storage hash and empty code hash are

View File

@ -29,10 +29,6 @@ type
tabSize: int ## maximal number of entries
pool: PeerPool ## for starting the system, debugging
# debugging
lastDump: seq[string]
lastlen: int
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
@ -48,19 +44,15 @@ proc hash(peer: Peer): Hash =
# Private debugging helpers
# ------------------------------------------------------------------------------
proc dumpPeers(sn: SnapSyncCtx; force = false) =
if sn.lastLen != sn.peerTab.len or force:
sn.lastLen = sn.peerTab.len
let poolSize = sn.pool.len
if sn.peerTab.len == 0:
trace "*** Empty peer list", poolSize
else:
var n = sn.peerTab.len - 1
for sp in sn.peerTab.prevValues:
trace "*** Peer list entry",
n, poolSize, peer=sp, worker=sp.huntPp
n.dec
proc dumpPeers(sn: SnapSyncCtx) =
let poolSize = sn.pool.len
if sn.peerTab.len == 0:
trace "*** Empty peer list", poolSize
else:
var n = sn.peerTab.len - 1
for sp in sn.peerTab.prevValues:
trace "*** Peer list entry", n, poolSize, peer=sp, worker=sp.huntPp
n.dec
# ------------------------------------------------------------------------------
# Private functions
@ -69,11 +61,11 @@ proc dumpPeers(sn: SnapSyncCtx; force = false) =
proc syncPeerLoop(sp: WorkerBuddy) {.async.} =
# This basic loop just runs the head-hunter for each peer.
var cache = ""
while sp.ctrl.runState != BuddyStopped:
while not sp.ctrl.stopped:
# Do something, work a bit
await sp.workerExec
if sp.ctrl.runState == BuddyStopped:
if sp.ctrl.stopped:
trace "Ignoring stopped peer", peer=sp
return
@ -84,20 +76,13 @@ proc syncPeerLoop(sp: WorkerBuddy) {.async.} =
let delayMs = if sp.workerLockedOk: 1000 else: 50
await sleepAsync(chronos.milliseconds(delayMs))
proc syncPeerStart(sp: WorkerBuddy) =
asyncSpawn sp.syncPeerLoop()
proc syncPeerStop(sp: WorkerBuddy) =
sp.ctrl.runState = BuddyStopped
# TODO: Cancel running `WorkerBuddy` instances. We need clean cancellation
# for this. Doing so reliably will be addressed at a later time.
proc onPeerConnected(ns: SnapSyncCtx, peer: Peer) =
trace "Peer connected", peer
let
ethOk = peer.supports(protocol.eth)
snapOk = peer.supports(protocol.snap)
trace "Peer connected", peer, ethOk, snapOk
let sp = WorkerBuddy.new(ns, peer, BuddyRunningOk)
let sp = WorkerBuddy.new(ns, peer)
# Manage connection table, check for existing entry
if ns.peerTab.hasKey(peer):
@ -105,13 +90,15 @@ proc onPeerConnected(ns: SnapSyncCtx, peer: Peer) =
return
# Initialise snap sync for this peer
discard sp.workerStart
if not sp.workerStart():
trace "State(eth) not initialized!"
return
# Check for table overflow. An overflow should not happen if the table is
# as large as the peer connection table.
if ns.tabSize <= ns.peerTab.len:
let leastPeer = ns.peerTab.shift.value.data
leastPeer.syncPeerStop
leastPeer.workerStop()
trace "Peer table full, deleted least used",
leastPeer, poolSize=ns.pool.len, tabLen=ns.peerTab.len, tabMax=ns.tabSize
@ -121,18 +108,19 @@ proc onPeerConnected(ns: SnapSyncCtx, peer: Peer) =
peer, poolSize=ns.pool.len, tabLen=ns.peerTab.len, tabMax=ns.tabSize
# Debugging, peer table dump after adding entry
#ns.dumpPeers(true)
sp.syncPeerStart()
#ns.dumpPeers
asyncSpawn sp.syncPeerLoop()
proc onPeerDisconnected(ns: SnapSyncCtx, peer: Peer) =
trace "Peer disconnected", peer
echo "onPeerDisconnected peer=", peer
# Debugging, peer table dump before removing entry
#ns.dumpPeers(true)
#ns.dumpPeers
let rc = ns.peerTab.delete(peer)
if rc.isOk:
rc.value.data.syncPeerStop()
rc.value.data.workerStop()
else:
debug "Disconnected from unregistered peer", peer
@ -157,9 +145,17 @@ proc start*(ctx: SnapSyncCtx) =
onPeerDisconnected:
proc(p: Peer) {.gcsafe.} =
ctx.onPeerDisconnected(p))
# Initialise sub-systems
ctx.workerSetup()
po.setProtocol eth
ctx.pool.addObserver(ctx, po)
proc stop*(ctx: SnapSyncCtx) =
## Stop syncing
ctx.pool.delObserver(ctx)
ctx.workerRelease()
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -124,6 +124,7 @@ type
WorkerHuntEx = ref object of WorkerBase
## Peer canonical chain head ("best block") search state.
syncMode: WorkerMode ## Action mode
startedFetch: bool ## Start download once, only
lowNumber: BlockNumber ## Recent lowest known block number.
highNumber: BlockNumber ## Recent highest known block number.
bestNumber: BlockNumber
@ -223,8 +224,11 @@ proc clearSyncStateRoot(sp: WorkerBuddy) =
debug "Stopping state sync from this peer", peer=sp
sp.ctrl.stateRoot = none(TrieHash)
proc lockSyncStateRoot(sp: WorkerBuddy, number: BlockNumber, hash: BlockHash,
stateRoot: TrieHash) =
proc lockSyncStateRoot(
sp: WorkerBuddy,
number: BlockNumber,
hash: BlockHash,
stateRoot: TrieHash) =
sp.setSyncLocked(number, hash)
let thisBlock = sp.ns.pp(hash, number)
@ -237,8 +241,8 @@ proc lockSyncStateRoot(sp: WorkerBuddy, number: BlockNumber, hash: BlockHash,
sp.ctrl.stateRoot = some(stateRoot)
if sp.ctrl.runState != BuddyRunningOK:
sp.ctrl.runState = BuddyRunningOK
if not sp.hunt.startedFetch:
sp.hunt.startedFetch = true
trace "Starting to download block state", peer=sp,
thisBlock, stateRoot
asyncSpawn sp.fetch()
@ -283,110 +287,9 @@ proc updateHuntPresent(sp: WorkerBuddy, highestPresent: BlockNumber) =
sp.setHuntForward(highestPresent)
sp.clearSyncStateRoot()
proc peerSyncChainEmptyReply(sp: WorkerBuddy, request: BlocksRequest) =
## Handle empty `GetBlockHeaders` reply. This means `request.startBlock` is
## absent on the peer. If it was `SyncLocked` there must have been a reorg
## and the previous canonical chain head has disappeared. If hunting, this
## updates the range of uncertainty.
# Treat empty response to a request starting from block 1 as equivalent to
# length 1 starting from block 0 in `peerSyncChainNonEmptyReply`. We treat
# every peer as if it would send genesis for block 0, without asking for it.
if request.skip == 0 and
not request.reverse and
not request.startBlock.isHash and
request.startBlock.number == 1.toBlockNumber:
sp.lockSyncStateRoot(0.toBlockNumber,
sp.peer.network.chain.genesisHash.BlockHash,
sp.peer.network.chain.Chain.genesisStateRoot.TrieHash)
return
if sp.hunt.syncMode == SyncLocked or sp.hunt.syncMode == SyncOnlyHash:
inc sp.stats.ok.reorgDetected
trace "Peer reorg detected, best block disappeared", peer=sp,
startBlock=request.startBlock
let lowestAbsent = request.startBlock.number
case sp.hunt.syncMode:
of SyncLocked:
# If this message doesn't change our knowledge, ignore it.
if lowestAbsent > sp.hunt.bestNumber:
return
# Due to a reorg, peer's canonical head has lower block number, outside
# our tracking window. Sync lock is no longer valid. Switch to hunt
# backward to find the new canonical head.
sp.setHuntBackward(lowestAbsent)
of SyncOnlyHash:
# Due to a reorg, peer doesn't have the block hash it originally gave us.
# Switch to hunt forward from block zero to find the canonical head.
sp.setHuntForward(0.toBlockNumber)
of HuntForward, HuntBackward, HuntRange, HuntRangeFinal:
# Update the hunt range.
sp.updateHuntAbsent(lowestAbsent)
# Update best block number. It is invalid except when `SyncLocked`, but
# still useful as a hint of what we knew recently, for example in displays.
if lowestAbsent <= sp.hunt.bestNumber:
sp.hunt.bestNumber = if lowestAbsent == 0.toBlockNumber: lowestAbsent
else: lowestAbsent - 1.toBlockNumber
sp.hunt.bestHash = default(typeof(sp.hunt.bestHash))
sp.ns.seen(sp.hunt.bestHash,sp.hunt.bestNumber)
proc peerSyncChainNonEmptyReply(sp: WorkerBuddy, request: BlocksRequest,
headers: openArray[BlockHeader]) =
## Handle non-empty `GetBlockHeaders` reply. This means `request.startBlock`
## is present on the peer and in its canonical chain (unless the request was
## made with a hash). If it's a short, contiguous, ascending order reply, it
## reveals the abrupt transition at the end of the chain and we have learned
## or reconfirmed the real-time head block. If hunting, this updates the
## range of uncertainty.
let len = headers.len
let highestIndex = if request.reverse: 0 else: len - 1
# We assume a short enough reply means we've learned the peer's canonical
# head, because it would have replied with another header if not at the head.
# This is not justified when the request used a general hash, because the
# peer doesn't have to reply with its canonical chain in that case, except it
# is still justified if the hash was the known canonical head, which is
# the case in a `SyncOnlyHash` request.
if len < syncLockedMinimumReply and
request.skip == 0 and not request.reverse and
len.uint < request.maxResults:
sp.lockSyncStateRoot(headers[highestIndex].blockNumber,
headers[highestIndex].blockHash.BlockHash,
headers[highestIndex].stateRoot.TrieHash)
return
# Be careful, this number is from externally supplied data and arithmetic
# in the upward direction could overflow.
let highestPresent = headers[highestIndex].blockNumber
# A reply that isn't short enough for the canonical head criterion above
# tells us headers up to some number, but it doesn't tell us if there are
# more after it in the peer's canonical chain. We have to request more
# headers to find out.
case sp.hunt.syncMode:
of SyncLocked:
# If this message doesn't change our knowledge, ignore it.
if highestPresent <= sp.hunt.bestNumber:
return
# Sync lock is no longer valid as we don't have confirmed canonical head.
# Switch to hunt forward to find the new canonical head.
sp.setHuntForward(highestPresent)
of SyncOnlyHash:
# As `SyncLocked` but without the block number check.
sp.setHuntForward(highestPresent)
of HuntForward, HuntBackward, HuntRange, HuntRangeFinal:
# Update the hunt range.
sp.updateHuntPresent(highestPresent)
# Update best block number. It is invalid except when `SyncLocked`, but
# still useful as a hint of what we knew recently, for example in displays.
if highestPresent > sp.hunt.bestNumber:
sp.hunt.bestNumber = highestPresent
sp.hunt.bestHash = headers[highestIndex].blockHash.BlockHash
sp.ns.seen(sp.hunt.bestHash,sp.hunt.bestNumber)
# ------------------------------------------------------------------------------
# Private functions, assemble request
# ------------------------------------------------------------------------------
proc peerSyncChainRequest(sp: WorkerBuddy): BlocksRequest =
## Choose `GetBlockHeaders` parameters when hunting or following the canonical
@ -521,6 +424,160 @@ proc peerSyncChainRequest(sp: WorkerBuddy): BlocksRequest =
result.maxResults = huntFinalSize
sp.hunt.syncMode = HuntRangeFinal
# ------------------------------------------------------------------------------
# Private functions, reply handling
# ------------------------------------------------------------------------------
proc peerSyncChainEmptyReply(
sp: WorkerBuddy,
request: BlocksRequest) =
## Handle empty `GetBlockHeaders` reply. This means `request.startBlock` is
## absent on the peer. If it was `SyncLocked` there must have been a reorg
## and the previous canonical chain head has disappeared. If hunting, this
## updates the range of uncertainty.
# Treat empty response to a request starting from block 1 as equivalent to
# length 1 starting from block 0 in `peerSyncChainNonEmptyReply`. We treat
# every peer as if it would send genesis for block 0, without asking for it.
if request.skip == 0 and
not request.reverse and
not request.startBlock.isHash and
request.startBlock.number == 1.toBlockNumber:
sp.lockSyncStateRoot(
0.toBlockNumber,
sp.peer.network.chain.genesisHash.BlockHash,
sp.peer.network.chain.Chain.genesisStateRoot.TrieHash)
return
if sp.hunt.syncMode in {SyncLocked, SyncOnlyHash}:
inc sp.stats.ok.reorgDetected
trace "Peer reorg detected, best block disappeared", peer=sp,
startBlock=request.startBlock
let lowestAbsent = request.startBlock.number
case sp.hunt.syncMode:
of SyncLocked:
# If this message doesn't change our knowledge, ignore it.
if lowestAbsent > sp.hunt.bestNumber:
return
# Due to a reorg, peer's canonical head has lower block number, outside
# our tracking window. Sync lock is no longer valid. Switch to hunt
# backward to find the new canonical head.
sp.setHuntBackward(lowestAbsent)
of SyncOnlyHash:
# Due to a reorg, peer doesn't have the block hash it originally gave us.
# Switch to hunt forward from block zero to find the canonical head.
sp.setHuntForward(0.toBlockNumber)
of HuntForward, HuntBackward, HuntRange, HuntRangeFinal:
# Update the hunt range.
sp.updateHuntAbsent(lowestAbsent)
# Update best block number. It is invalid except when `SyncLocked`, but
# still useful as a hint of what we knew recently, for example in displays.
if lowestAbsent <= sp.hunt.bestNumber:
sp.hunt.bestNumber =
if lowestAbsent == 0.toBlockNumber: lowestAbsent
else: lowestAbsent - 1.toBlockNumber
sp.hunt.bestHash = default(typeof(sp.hunt.bestHash))
sp.ns.seen(sp.hunt.bestHash,sp.hunt.bestNumber)
proc peerSyncChainNonEmptyReply(
sp: WorkerBuddy,
request: BlocksRequest,
headers: openArray[BlockHeader]) =
## Handle non-empty `GetBlockHeaders` reply. This means `request.startBlock`
## is present on the peer and in its canonical chain (unless the request was
## made with a hash). If it's a short, contiguous, ascending order reply, it
## reveals the abrupt transition at the end of the chain and we have learned
## or reconfirmed the real-time head block. If hunting, this updates the
## range of uncertainty.
let len = headers.len
let highestIndex = if request.reverse: 0 else: len - 1
# We assume a short enough reply means we've learned the peer's canonical
# head, because it would have replied with another header if not at the head.
# This is not justified when the request used a general hash, because the
# peer doesn't have to reply with its canonical chain in that case, except it
# is still justified if the hash was the known canonical head, which is
# the case in a `SyncOnlyHash` request.
if len < syncLockedMinimumReply and
request.skip == 0 and not request.reverse and
len.uint < request.maxResults:
sp.lockSyncStateRoot(
headers[highestIndex].blockNumber,
headers[highestIndex].blockHash.BlockHash,
headers[highestIndex].stateRoot.TrieHash)
return
# Be careful, this number is from externally supplied data and arithmetic
# in the upward direction could overflow.
let highestPresent = headers[highestIndex].blockNumber
# A reply that isn't short enough for the canonical head criterion above
# tells us headers up to some number, but it doesn't tell us if there are
# more after it in the peer's canonical chain. We have to request more
# headers to find out.
case sp.hunt.syncMode:
of SyncLocked:
# If this message doesn't change our knowledge, ignore it.
if highestPresent <= sp.hunt.bestNumber:
return
# Sync lock is no longer valid as we don't have confirmed canonical head.
# Switch to hunt forward to find the new canonical head.
sp.setHuntForward(highestPresent)
of SyncOnlyHash:
# As `SyncLocked` but without the block number check.
sp.setHuntForward(highestPresent)
of HuntForward, HuntBackward, HuntRange, HuntRangeFinal:
# Update the hunt range.
sp.updateHuntPresent(highestPresent)
# Update best block number. It is invalid except when `SyncLocked`, but
# still useful as a hint of what we knew recently, for example in displays.
if highestPresent > sp.hunt.bestNumber:
sp.hunt.bestNumber = highestPresent
sp.hunt.bestHash = headers[highestIndex].blockHash.BlockHash
sp.ns.seen(sp.hunt.bestHash,sp.hunt.bestNumber)
# ------------------------------------------------------------------------------
# Public start/stop and admin functions
# ------------------------------------------------------------------------------
proc workerSetup*(ns: Worker) =
## Global set up
ns.fetchSetup()
proc workerRelease*(ns: Worker) =
## Global clean up
ns.fetchRelease()
proc workerStart*(sp: WorkerBuddy): bool =
## Initialise `WorkerBuddy` to support `workerBlockHeaders()` calls
sp.ctrl.init(fullyRunning = true)
# Initialise `DataNode` reply handling
sp.fetchStart()
# Link in hunt descriptor
sp.hunt = WorkerHuntEx.new(HuntForward)
if sp.peer.state(protocol.eth).initialized:
# We know the hash but not the block number.
sp.hunt.bestHash = sp.peer.state(protocol.eth).bestBlockHash.BlockHash
# TODO: Temporarily disabled because it's useful to test the worker.
# sp.syncMode = SyncOnlyHash
return true
proc workerStop*(sp: WorkerBuddy) =
## Clean up this peer
sp.ctrl.stopped = true
sp.fetchStop()
proc workerLockedOk*(sp: WorkerBuddy): bool =
sp.hunt.syncMode == SyncLocked
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -539,7 +596,7 @@ proc workerExec*(sp: WorkerBuddy) {.async.} =
let request = sp.peerSyncChainRequest
trace trEthSendSending & "GetBlockHeaders", peer=sp,
trace trEthSendSendingGetBlockHeaders, peer=sp,
count=request.maxResults,
startBlock=sp.ns.pp(request.startBlock), step=request.traceStep
@ -548,30 +605,30 @@ proc workerExec*(sp: WorkerBuddy) {.async.} =
try:
reply = await sp.peer.getBlockHeaders(request)
except CatchableError as e:
trace trEthRecvError & "waiting for reply to GetBlockHeaders", peer=sp,
trace trEthRecvError & "waiting for GetBlockHeaders reply", peer=sp,
error=e.msg
inc sp.stats.major.networkErrors
sp.ctrl.runState = BuddyStopped
sp.ctrl.stopped = true
return
if reply.isNone:
trace trEthRecvTimeoutWaiting & "for reply to GetBlockHeaders", peer=sp
trace trEthRecvTimeoutWaiting & "for GetBlockHeaders reply", peer=sp
# TODO: Should disconnect?
inc sp.stats.minor.timeoutBlockHeaders
return
let nHeaders = reply.get.headers.len
if nHeaders == 0:
trace trEthRecvGot & "EMPTY reply BlockHeaders", peer=sp,
trace trEthRecvReceivedBlockHeaders, peer=sp,
got=0, requested=request.maxResults
else:
trace trEthRecvGot & "reply BlockHeaders", peer=sp,
trace trEthRecvReceivedBlockHeaders, peer=sp,
got=nHeaders, requested=request.maxResults,
firstBlock=reply.get.headers[0].blockNumber,
lastBlock=reply.get.headers[^1].blockNumber
if request.maxResults.int < nHeaders:
trace trEthRecvProtocolViolation & "excess headers in BlockHeaders",
trace trEthRecvProtocolViolation & "excess headers in BlockHeaders message",
peer=sp, got=nHeaders, requested=request.maxResults
# TODO: Should disconnect.
inc sp.stats.major.excessBlockHeaders
@ -583,28 +640,6 @@ proc workerExec*(sp: WorkerBuddy) {.async.} =
else:
sp.peerSyncChainEmptyReply(request)
proc workerStart*(sp: WorkerBuddy): bool =
## Initialise `WorkerBuddy` to support `workerBlockHeaders()` calls
# Initialise `DataNode` reply handling
sp.fetchSetup
# Link in hunt descriptor
sp.hunt = WorkerHuntEx.new(HuntForward)
if sp.peer.state(eth).initialized:
# We know the hash but not the block number.
sp.hunt.bestHash = sp.peer.state(eth).bestBlockHash.BlockHash
# TODO: Temporarily disabled because it's useful to test the head hunter.
# sp.syncMode = SyncOnlyHash
return true
trace "State(eth) not initialized!"
proc workerLockedOk*(sp: WorkerBuddy): bool =
sp.hunt.syncMode == SyncLocked
# ------------------------------------------------------------------------------
# Debugging
# ------------------------------------------------------------------------------

View File

@ -17,16 +17,44 @@ import
eth/[common/eth_types, p2p],
../../types,
../path_desc,
"."/[common, fetch_trie, fetch_snap, worker_desc]
./fetch/[common, fetch_snap, fetch_trie],
./worker_desc
{.push raises: [Defect].}
logScope:
topics = "snap peer fetch"
topics = "snap fetch"
# Note: To test disabling snap (or trie), modify `fetchTrieOk` or
# `fetchSnapOk` where those are defined.
# ------------------------------------------------------------------------------
# Public start/stop and admin functions
# ------------------------------------------------------------------------------
proc fetchSetup*(ns: Worker) =
## Global set up
ns.commonSetup()
proc fetchRelease*(ns: Worker) =
## Global clean up
ns.commonRelease()
proc fetchStart*(sp: WorkerBuddy) =
## Initialise `WorkerBuddy` to support `ReplyData.new()` calls.
sp.fetchTrieStart()
trace "Supported fetch modes", peer=sp,
ctrlState=sp.ctrl.state, trieMode=sp.fetchTrieOk, snapMode=sp.fetchSnapOk
proc fetchStop*(sp: WorkerBuddy) =
## Clean up for this peer
sp.fetchTrieStop()
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc fetch*(sp: WorkerBuddy) {.async.} =
var stateRoot = sp.ctrl.stateRoot.get
trace "Syncing from stateRoot", peer=sp, stateRoot
@ -52,9 +80,9 @@ proc fetch*(sp: WorkerBuddy) {.async.} =
if stateRoot != sp.ctrl.stateRoot.get:
trace "Syncing from new stateRoot", peer=sp, stateRoot
stateRoot = sp.ctrl.stateRoot.get
sp.ctrl.runState = BuddyRunningOK
sp.ctrl.stopped = false
if sp.ctrl.runState == BuddyStopRequest:
if sp.ctrl.stopRequest:
trace "Pausing sync until we get a new state root", peer=sp
while sp.ctrl.stateRoot.isSome and stateRoot == sp.ctrl.stateRoot.get and
(sp.fetchTrieOk or sp.fetchSnapOk) and
@ -83,6 +111,6 @@ proc fetch*(sp: WorkerBuddy) {.async.} =
leafRange=pathRange(leafRange.leafLow, leafRange.leafHigh), stateRoot
await sp.fetchTrie(stateRoot, leafRange)
proc fetchSetup*(sp: WorkerBuddy) =
## Initialise `WorkerBuddy` to support `ReplyData.new()` calls.
sp.fetchTrieSetup
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -15,13 +15,14 @@ import
chronicles,
eth/[common/eth_types, p2p],
stint,
../path_desc,
"."/[timer_helper, worker_desc]
../../path_desc,
../worker_desc,
./timer_helper
{.push raises: [Defect].}
logScope:
topics = "snap peer common"
topics = "snap common"
type
CommonFetchEx* = ref object of CommonBase
@ -38,6 +39,10 @@ type
countRangeTrieStarted*: bool
logTicker: TimerCallback
const
defaultTickerStartDelay = 100.milliseconds
tickerLogInterval = 1.seconds
# ------------------------------------------------------------------------------
# Private timer helpers
# ------------------------------------------------------------------------------
@ -77,17 +82,20 @@ proc setLogTicker(sf: CommonFetchEx; at: Moment) {.gcsafe.}
proc runLogTicker(sf: CommonFetchEx) {.gcsafe.} =
doAssert not sf.isNil
info "State: Account sync progress",
percent = percent(sf.countRange, sf.countRangeStarted),
info "Sync accounts progress",
accounts = sf.countAccounts,
percent = percent(sf.countRange, sf.countRangeStarted),
snap = percent(sf.countRangeSnap, sf.countRangeSnapStarted),
trie = percent(sf.countRangeTrie, sf.countRangeTrieStarted)
sf.setLogTicker(Moment.fromNow(1.seconds))
sf.setLogTicker(Moment.fromNow(tickerLogInterval))
proc setLogTicker(sf: CommonFetchEx; at: Moment) =
sf.logTicker = safeSetTimer(at, runLogTicker, sf)
if sf.logTicker.isNil:
debug "Sync accounts progress ticker has stopped"
else:
sf.logTicker = safeSetTimer(at, runLogTicker, sf)
proc new*(T: type CommonFetchEx; startAfter = 100.milliseconds): T =
proc new(T: type CommonFetchEx; startAfter = defaultTickerStartDelay): T =
result = CommonFetchEx(
leafRanges: @[LeafRange(
leafLow: LeafPath.low,
@ -113,6 +121,20 @@ proc sharedFetchEx*(ns: Worker): CommonFetchEx =
## Handy helper
ns.commonBase.CommonFetchEx
# ------------------------------------------------------------------------------
# Public start/stop functions
# ------------------------------------------------------------------------------
proc commonSetup*(ns: Worker) =
## Global set up
discard
proc commonRelease*(ns: Worker) =
## Global clean up
if not ns.sharedFetchEx.isNil:
ns.sharedFetchEx.logTicker = nil # stop timer
ns.sharedFetchEx = nil # unlink `CommonFetchEx` object
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -120,7 +142,7 @@ proc sharedFetchEx*(ns: Worker): CommonFetchEx =
proc hasSlice*(sp: WorkerBuddy): bool =
## Return `true` iff `getSlice` would return a free slice to work on.
if sp.ns.sharedFetchEx.isNil:
sp.ns.sharedFetchEx = CommonFetchEx.new
sp.ns.sharedFetchEx = CommonFetchEx.new()
result = 0 < sp.ns.sharedFetchEx.leafRanges.len
trace "hasSlice", peer=sp, hasSlice=result
@ -128,9 +150,8 @@ proc getSlice*(sp: WorkerBuddy, leafLow, leafHigh: var LeafPath): bool =
## Claim a free slice to work on. If a slice was available, it's claimed,
## `leadLow` and `leafHigh` are set to the slice range and `true` is
## returned. Otherwise `false` is returned.
if sp.ns.sharedFetchEx.isNil:
sp.ns.sharedFetchEx = CommonFetchEx.new
sp.ns.sharedFetchEx = CommonFetchEx.new()
let sharedFetch = sp.ns.sharedFetchEx
template ranges: auto = sharedFetch.leafRanges
const leafMaxFetchRange = (high(LeafPath) - low(LeafPath)) div 1000
@ -138,6 +159,7 @@ proc getSlice*(sp: WorkerBuddy, leafLow, leafHigh: var LeafPath): bool =
if ranges.len == 0:
trace "GetSlice", leafRange="none"
return false
leafLow = ranges[0].leafLow
if ranges[0].leafHigh - ranges[0].leafLow <= leafMaxFetchRange:
leafHigh = ranges[0].leafHigh

View File

@ -24,36 +24,41 @@ import
chronos,
eth/[common/eth_types, p2p],
nimcrypto/keccak,
"../.."/[protocol, types],
../path_desc,
"."/[common, worker_desc]
"../../.."/[protocol, protocol/trace_config, types],
../../path_desc,
../worker_desc,
./common
{.push raises: [Defect].}
logScope:
topics = "snap peer fetch"
topics = "snap fetch"
const
snapRequestBytesLimit = 2 * 1024 * 1024
## Soft bytes limit to request in `snap` protocol calls.
proc fetchSnap*(sp: WorkerBuddy, stateRoot: TrieHash, leafRange: LeafRange)
{.async.} =
proc fetchSnap*(
sp: WorkerBuddy,
stateRoot: TrieHash,
leafRange: LeafRange) {.async.} =
## Fetch data using the `snap#` protocol
var origin = leafRange.leafLow
var limit = leafRange.leafHigh
const responseBytes = 2 * 1024 * 1024
if sp.ctrl.runState == BuddyStopped:
if sp.ctrl.stopped:
trace trSnapRecvError &
"peer already disconnected, not sending GetAccountRange",
peer=sp, accountRange=pathRange(origin, limit),
stateRoot, bytesLimit=snapRequestBytesLimit
sp.putSlice(leafRange)
return # FIXME: was there a return missing?
trace trSnapSendSending & "GetAccountRange", peer=sp,
accountRange=pathRange(origin, limit),
stateRoot, bytesLimit=snapRequestBytesLimit
if trSnapTracePacketsOk:
trace trSnapSendSending & "GetAccountRange", peer=sp,
accountRange=pathRange(origin, limit),
stateRoot, bytesLimit=snapRequestBytesLimit
var
reply: Option[accountRangeObj]
@ -64,7 +69,7 @@ proc fetchSnap*(sp: WorkerBuddy, stateRoot: TrieHash, leafRange: LeafRange)
trace trSnapRecvError & "waiting for reply to GetAccountRange", peer=sp,
error=e.msg
inc sp.stats.major.networkErrors
sp.ctrl.runState = BuddyStopped
sp.ctrl.stopped = true
sp.putSlice(leafRange)
return
@ -92,13 +97,13 @@ proc fetchSnap*(sp: WorkerBuddy, stateRoot: TrieHash, leafRange: LeafRange)
# This makes all the difference to terminating the fetch. For now we'll
# trust the mere existence of the proof rather than verifying it.
if proof.len == 0:
trace trSnapRecvGot & "EMPTY reply AccountRange", peer=sp,
trace trSnapRecvReceived & "EMPTY AccountRange message", peer=sp,
got=len, proofLen=proof.len, gotRange="-", requestedRange, stateRoot
sp.putSlice(leafRange)
# Don't keep retrying snap for this state.
sp.ctrl.runState = BuddyStopRequest
sp.ctrl.stopRequest = true
else:
trace trSnapRecvGot & "END reply AccountRange", peer=sp,
trace trSnapRecvReceived & "END AccountRange message", peer=sp,
got=len, proofLen=proof.len, gotRange=pathRange(origin, high(LeafPath)),
requestedRange, stateRoot
# Current slicer can't accept more result data than was requested, so
@ -107,7 +112,7 @@ proc fetchSnap*(sp: WorkerBuddy, stateRoot: TrieHash, leafRange: LeafRange)
return
var lastPath = accounts[len-1].accHash
trace trSnapRecvGot & "reply AccountRange", peer=sp,
trace trSnapRecvReceived & "AccountRange message", peer=sp,
got=len, proofLen=proof.len, gotRange=pathRange(origin, lastPath),
requestedRange, stateRoot
@ -139,4 +144,6 @@ proc fetchSnap*(sp: WorkerBuddy, stateRoot: TrieHash, leafRange: LeafRange)
proc fetchSnapOk*(sp: WorkerBuddy): bool =
## Sort of getter: if `true`, fetching data using the `snap#` protocol
## is supported.
sp.ctrl.runState != BuddyStopped and sp.peer.supports(snap)
result = not sp.ctrl.stopped and sp.peer.supports(protocol.snap)
trace "fetchSnapOk()", peer=sp,
ctrlState=sp.ctrl.state, snapOk=sp.peer.supports(protocol.snap), result

View File

@ -26,15 +26,16 @@ import
std/[sets, tables, algorithm],
chronos,
eth/[common/eth_types, p2p],
"../.."/[protocol/trace_config, types],
../path_desc,
"."/[common, reply_data, validate_trienode, worker_desc]
"../../.."/[protocol/trace_config, types],
../../path_desc,
../worker_desc,
./fetch_trie/[reply_data, validate_trienode],
./common
{.push raises: [Defect].}
logScope:
topics = "snap peer fetch"
topics = "snap fetch"
const
maxBatchGetNodeData = 384
@ -92,7 +93,7 @@ proc wrapCallGetNodeData(fetch: FetchStateEx, hashes: seq[NodeHash],
if reply.replyType == NoReplyData:
# Empty reply, timeout or error (i.e. `reply.isNil`).
# It means there are none of the nodes available.
fetch.sp.ctrl.runState = BuddyStopRequest
fetch.sp.ctrl.stopRequest = true
for i in 0 ..< futures.len:
futures[i].complete(@[])
@ -160,7 +161,7 @@ proc batchGetNodeData(fetch: FetchStateEx) =
trace "Trie: Sort length", sortLen=i
# If stopped, abort all waiting nodes, so they clean up.
if fetch.sp.ctrl.runState != BuddyRunningOk:
if not fetch.sp.ctrl.fullyRunning:
while i > 0:
fetch.nodeGetQueue[i].future.complete(@[])
dec i
@ -218,7 +219,7 @@ proc getNodeData(fetch: FetchStateEx,
fetch.scheduleBatchGetNodeData()
let nodeBytes = await future
if fetch.sp.ctrl.runState != BuddyRunningOk:
if not fetch.sp.ctrl.fullyRunning:
return nodebytes
when trEthTracePacketsOk:
@ -252,20 +253,20 @@ proc pathInRange(fetch: FetchStateEx, path: InteriorPath): bool =
proc traverse(fetch: FetchStateEx, hash: NodeHash, path: InteriorPath,
fromExtension: bool) {.async.} =
template errorReturn() =
fetch.sp.ctrl.runState = BuddyStopRequest
fetch.sp.ctrl.stopRequest = true
dec fetch.nodesInFlight
if fetch.nodesInFlight == 0:
fetch.finish.complete()
return
# If something triggered stop earlier, don't request, and clean up now.
if fetch.sp.ctrl.runState != BuddyRunningOk:
if not fetch.sp.ctrl.fullyRunning:
errorReturn()
let nodeBytes = await fetch.getNodeData(hash.TrieHash, path)
# If something triggered stop, clean up now.
if fetch.sp.ctrl.runState != BuddyRunningOk:
if not fetch.sp.ctrl.fullyRunning:
errorReturn()
# Don't keep emitting error messages after one error. We'll allow 10.
if fetch.getNodeDataErrors >= 10:
@ -340,6 +341,23 @@ proc probeGetNodeData(sp: WorkerBuddy, stateRoot: TrieHash): Future[bool]
let reply = await ReplyData.new(sp, @[stateRoot.NodeHash])
return reply.replyType == SingleEntryReply
# ------------------------------------------------------------------------------
# Public start/stop and admin functions
# ------------------------------------------------------------------------------
proc fetchTrieStart*(sp: WorkerBuddy) =
## Initialise `WorkerBuddy` to support `replyDataGet()` calls.
sp.replyDataStart()
proc fetchTrieStop*(sp: WorkerBuddy) =
sp.replyDataStop()
proc fetchTrieOk*(sp: WorkerBuddy): bool =
result = not sp.ctrl.stopped and
(sp.fetchStateEx.isNil or sp.fetchStateEx.getNodeDataErrors == 0)
trace "fetchTrieOk()", peer=sp,
ctrlState=sp.ctrl.state, result
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
@ -366,14 +384,6 @@ proc fetchTrie*(sp: WorkerBuddy, stateRoot: TrieHash, leafRange: LeafRange)
sp.ns.sharedFetchEx.countAccountBytes -= fetch.unwindAccountBytes
sp.putSlice(leafRange)
proc fetchTrieOk*(sp: WorkerBuddy): bool =
sp.ctrl.runState != BuddyStopped and
(sp.fetchStateEx.isNil or sp.fetchStateEx.getNodeDataErrors == 0)
proc fetchTrieSetup*(sp: WorkerBuddy) =
## Initialise `WorkerBuddy` to support `replyDataGet()` calls.
sp.replyDataSetup
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -64,9 +64,10 @@ import
std/[sequtils, sets, tables, hashes],
chronos,
eth/[common/eth_types, p2p],
"../.."/[protocol, protocol/trace_config, types],
../path_desc,
"."/[timer_helper, worker_desc]
"../../../.."/[protocol, protocol/trace_config, types],
../../../path_desc,
../../worker_desc,
../timer_helper
{.push raises: [Defect].}
@ -158,9 +159,9 @@ proc traceReplyDataEmpty(sp: WorkerBuddy, request: RequestData) =
# `request` can be `nil` because we don't always know which request
# the empty reply goes with. Therefore `sp` must be included.
if request.isNil:
trace trEthRecvGot & "EMPTY NodeData", peer=sp, got=0
trace trEthRecvReceivedNodeData, peer=sp, got=0
else:
trace trEthRecvGot & "NodeData", peer=sp, got=0,
trace trEthRecvReceivedNodeData, peer=sp, got=0,
requested=request.hashes.len, pathRange=request.pathRange
proc traceReplyDataUnmatched(sp: WorkerBuddy, got: int) =
@ -176,11 +177,11 @@ proc traceReplyData(request: RequestData,
logScope: pathRange=request.pathRange
logScope: peer=request.sp
if got > request.hashes.len and (unmatched + other) == 0:
trace trEthRecvGot & "EXCESS reply NodeData"
trace trEthRecvReceivedNodeData & " (EXCESS data)"
elif got == request.hashes.len or use != got:
trace trEthRecvGot & "reply NodeData"
trace trEthRecvReceivedNodeData
elif got < request.hashes.len:
trace trEthRecvGot & "TRUNCATED reply NodeData"
trace trEthRecvReceivedNodeData & " TRUNCATED"
if use != got:
logScope:
@ -335,11 +336,13 @@ proc nodeDataRequestDequeue(rq: RequestDataQueue,
rq.itemHash.del(addr request.hashes[j])
# Forward declarations.
proc nodeDataTryEmpties(rq: RequestDataQueue)
proc nodeDataTryEmpties(rq: RequestDataQueue) {.gcsafe.}
proc nodeDataEnqueueAndSend(request: RequestData) {.async.}
proc nodeDataComplete(request: RequestData, reply: ReplyData,
insideTryEmpties = false) =
proc nodeDataComplete(
request: RequestData,
reply: ReplyData,
insideTryEmpties = false) {.gcsafe.} =
## Complete `request` with received data or other reply.
if request.future.finished:
# Subtle: Timer can trigger and its callback be added to Chronos run loop,
@ -406,7 +409,7 @@ proc nodeDataEnqueueAndSend(request: RequestData) {.async.} =
## Helper function to send an `eth.GetNodeData` request.
## But not when we're draining the in flight queue to match empty replies.
let sp = request.sp
if sp.ctrl.runState == BuddyStopped:
if sp.ctrl.stopped:
request.traceGetNodeDataDisconnected()
request.future.complete(nil)
return
@ -426,10 +429,10 @@ proc nodeDataEnqueueAndSend(request: RequestData) {.async.} =
except CatchableError as e:
request.traceGetNodeDataSendError(e)
inc sp.stats.major.networkErrors
sp.ctrl.runState = BuddyStopped
sp.ctrl.stopped = true
request.future.fail(e)
proc onNodeData(sp: WorkerBuddy, data: openArray[Blob]) =
proc onNodeData(sp: WorkerBuddy, data: openArray[Blob]) {.gcsafe.} =
## Handle an incoming `eth.NodeData` reply.
## Practically, this is also where all the incoming packet trace messages go.
let rq = sp.requestsEx
@ -478,7 +481,32 @@ proc onNodeData(sp: WorkerBuddy, data: openArray[Blob]) =
request.nodeDataComplete(reply)
# ------------------------------------------------------------------------------
# Public functions
# Public start/stop and admin functions
# ------------------------------------------------------------------------------
proc replyDataStart*(sp: WorkerBuddy) =
## Initialise `WorkerBuddy` to support `NodeData` replies to `GetNodeData`
## requests issued by `new()`.
if sp.requestsEx.isNil:
sp.requestsEx = RequestDataQueue()
sp.peer.state(protocol.eth).onNodeData =
proc (_: Peer, data: openArray[Blob]) =
trace "Custom handler onNodeData", peer=sp, blobs=data.len
onNodeData(sp, data)
sp.peer.state(protocol.eth).onGetNodeData =
proc (_: Peer, hashes: openArray[Hash256], data: var seq[Blob]) =
## Return empty nodes result. This callback is installed to
## ensure we don't reply with nodes from the chainDb.
discard
proc replyDataStop*(sp: WorkerBuddy) =
sp.peer.state(protocol.eth).onNodeData = nil
sp.peer.state(protocol.eth).onGetNodeData = nil
# ------------------------------------------------------------------------------
# Public constructor
# ------------------------------------------------------------------------------
proc new*(
@ -513,7 +541,7 @@ proc new*(
except CatchableError as e:
request.traceReplyDataError(e)
inc sp.stats.major.networkErrors
sp.ctrl.runState = BuddyStopped
sp.ctrl.stopped = true
return nil
# Timeout, packet and packet error trace messages are done in `onNodeData`
@ -521,6 +549,10 @@ proc new*(
# always received just valid data with hashes already verified, or `nil`.
return reply
# ------------------------------------------------------------------------------
# Public functions
# ------------------------------------------------------------------------------
proc replyType*(reply: ReplyData): ReplyDataType =
## Fancy interface for evaluating the reply lengths for none, 1, or many.
## If the `reply` argument is `nil`, the result `NoReplyData` is returned
@ -547,23 +579,6 @@ proc `[]`*(reply: ReplyData; inx: int): Blob =
if inx < reply.hashVerifiedData.len:
return reply.hashVerifiedData[inx]
proc replyDataSetup*(sp: WorkerBuddy) =
## Initialise `WorkerBuddy` to support `NodeData` replies to `GetNodeData`
## requests issued by `new()`.
if sp.requestsEx.isNil:
sp.requestsEx = RequestDataQueue()
sp.peer.state(eth).onNodeData =
proc (_: Peer, data: openArray[Blob]) =
{.gcsafe.}: onNodeData(sp, data)
sp.peer.state(eth).onGetNodeData =
proc (_: Peer, hashes: openArray[Hash256], data: var seq[Blob]) =
## Return empty nodes result. This callback is installed to
## ensure we don't reply with nodes from the chainDb.
discard
# ------------------------------------------------------------------------------
# End
# ------------------------------------------------------------------------------

View File

@ -32,14 +32,14 @@
import
eth/[common/eth_types, p2p],
../../types,
../path_desc,
./worker_desc
../../../../types,
../../../path_desc,
../../worker_desc
{.push raises: [Defect].}
logScope:
topics = "snap validate trie node"
topics = "snap validate"
type
TrieNodeParseContext* = object

View File

@ -32,10 +32,13 @@ type
BuddyStat* = distinct uint
BuddyRunState* = enum
BuddyRunningOk
BuddyStopRequest
BuddyStopped
BuddyRunState = enum
## Combined state of two boolean values (`stopped`,`stopThisState`) as used
## in the original source setup (should be double-checked and simplified).
FullyRunning ## running, not requested to stop
StopRequested ## running, stop request
SingularStop ## stopped, no stop request (for completeness)
FullyStopped ## stopped, stop request
WorkerBuddyStats* = tuple
## Statistics counters for events associated with this peer.
@ -52,9 +55,9 @@ type
excessBlockHeaders: BuddyStat,
wrongBlockHeader: BuddyStat]
WorkerBuddyCtrl* = tuple
WorkerBuddyCtrl* = object
## Control and state settings
stateRoot: Option[TrieHash]
stateRoot*: Option[TrieHash]
## State root to fetch state for. This changes during sync and is
## slightly different for each peer.
runState: BuddyRunState
@ -91,18 +94,14 @@ type
# Public Constructor
# ------------------------------------------------------------------------------
proc new*(
T: type WorkerBuddy;
ns: Worker;
peer: Peer;
runState: BuddyRunState
): T =
## Initial state, maximum uncertainty range.
T(ns: ns,
peer: peer,
ctrl: (
stateRoot: none(TrieHash),
runState: runState))
proc new*(T: type WorkerBuddy; ns: Worker; peer: Peer): T =
## Initial state, all default settings.
T(ns: ns, peer: peer)
proc init*(ctrl: var WorkerBuddyCtrl; fullyRunning: bool) =
## Set the initial running state to `FullyRunning` if the argument
## `fullyRunning` is `true`. Otherwise the running state is set to
## `FullyStopped`.
ctrl.runState = if fullyRunning: FullyRunning else: FullyStopped
# ------------------------------------------------------------------------------
# Public functions
@ -113,6 +112,76 @@ proc `$`*(sp: WorkerBuddy): string =
proc inc(stat: var BuddyStat) {.borrow.}
# ------------------------------------------------------------------------------
# Public getters, `BuddyRunState` execution control functions
# ------------------------------------------------------------------------------
proc state*(ctrl: WorkerBuddyCtrl): BuddyRunState =
## Getter (logging only, details of `BuddyRunState` are private)
ctrl.runState
proc fullyRunning*(ctrl: WorkerBuddyCtrl): bool =
## Getter, if `true`, `stopped` and `stopRequest` are `false`
ctrl.runState == FullyRunning
proc fullyStopped*(ctrl: WorkerBuddyCtrl): bool =
## Getter, if `true`, `stopped` and `stopRequest` are `true`
ctrl.runState == FullyStopped
proc stopped*(ctrl: WorkerBuddyCtrl): bool =
## Getter, not running (ignoring pending stop request)
ctrl.runState in {FullyStopped,SingularStop}
proc stopRequest*(ctrl: WorkerBuddyCtrl): bool =
## Getter, pending stop request (ignoring running state)
ctrl.runState in {StopRequested,FullyStopped}
# ------------------------------------------------------------------------------
# Public setters, `BuddyRunState` execution control functions
# ------------------------------------------------------------------------------
proc `stopped=`*(ctrl: var WorkerBuddyCtrl; value: bool) =
## Setter
if value:
case ctrl.runState:
of FullyRunning:
ctrl.runState = SingularStop
of StopRequested:
ctrl.runState = FullyStopped
of SingularStop, FullyStopped:
discard
else:
case ctrl.runState:
of FullyRunning, StopRequested:
discard
of SingularStop:
ctrl.runState = FullyRunning
of FullyStopped:
ctrl.runState = StopRequested
proc `stopRequest=`*(ctrl: var WorkerBuddyCtrl; value: bool) =
## Setter, stop request (ignoring running state)
if value:
case ctrl.runState:
of FullyRunning:
ctrl.runState = StopRequested
of StopRequested:
discard
of SingularStop:
ctrl.runState = FullyStopped
of FullyStopped:
discard
else:
case ctrl.runState:
of FullyRunning:
discard
of StopRequested:
ctrl.runState = FullyRunning
of SingularStop:
discard
of FullyStopped:
ctrl.runState = SingularStop
# ------------------------------------------------------------------------------
# Public functions, debugging helpers (will go away eventually)
# ------------------------------------------------------------------------------