diff --git a/nimbus/nimbus.nim b/nimbus/nimbus.nim
index 23917c16f..7bac1d1e3 100644
--- a/nimbus/nimbus.nim
+++ b/nimbus/nimbus.nim
@@ -137,7 +137,7 @@ proc setupP2P(nimbus: NimbusNode, conf: NimbusConf,
   # Early-initialise "--snap-sync" before starting any network connections.
   if ProtocolFlag.Eth in protocols and conf.snapSync:
-    SnapSyncCtx.new(nimbus.ethNode).start
+    SnapSyncCtx.new(nimbus.ethNode, conf.maxPeers).start

   # Connect directly to the static nodes
   let staticPeers = conf.getStaticPeers()
diff --git a/nimbus/sync/fast.nim b/nimbus/sync/fast.nim
index 4c45fe72a..2f5676b71 100644
--- a/nimbus/sync/fast.nim
+++ b/nimbus/sync/fast.nim
@@ -15,10 +15,13 @@ import
   eth/[common/eth_types, p2p],
   eth/p2p/[private/p2p_types, peer_pool],
   stew/byteutils,
-  "."/[protocol, trace_helper]
+  "."/[protocol, types]

 {.push raises:[Defect].}

+logScope:
+  topics = "fast sync"
+
 const
   minPeersToStartSync* = 2 # Wait for consensus of at least this
                            # number of peers before syncing
@@ -210,16 +213,16 @@ proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} =
     skip: 0,
     reverse: true)

-  tracePacket ">> Sending eth.GetBlockHeaders (0x03)", peer=p,
+  trace trEthSendSending & "GetBlockHeaders (0x03)", peer=p,
     startBlock=request.startBlock.hash.toHex, max=request.maxResults

   let latestBlock = await p.getBlockHeaders(request)
   if latestBlock.isSome:
     if latestBlock.get.headers.len > 0:
       result = latestBlock.get.headers[0].blockNumber
-    tracePacket "<< Got reply eth.BlockHeaders (0x04)", peer=p,
-      count=latestBlock.get.headers.len,
-      blockNumber=(if latestBlock.get.headers.len > 0: $result else: "missing")
+    trace trEthRecvGot & "BlockHeaders (0x04)", peer=p,
+      count=latestBlock.get.headers.len,
+      blockNumber=(if latestBlock.get.headers.len > 0: $result else: "missing")

 proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} =
   # Update our best block number
@@ -249,25 +252,26 @@ proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} =
   var dataReceived = false
   try:
-    tracePacket ">> Sending eth.GetBlockHeaders (0x03)", peer,
+    trace trEthSendSending & "GetBlockHeaders (0x03)", peer,
       startBlock=request.startBlock.number, max=request.maxResults,
       step=traceStep(request)
     let results = await peer.getBlockHeaders(request)
     if results.isSome:
-      tracePacket "<< Got reply eth.BlockHeaders (0x04)", peer,
+      trace trEthRecvGot & "BlockHeaders (0x04)", peer,
         count=results.get.headers.len, requested=request.maxResults
       shallowCopy(workItem.headers, results.get.headers)
       var bodies = newSeqOfCap[BlockBody](workItem.headers.len)
       var hashes = newSeqOfCap[KeccakHash](maxBodiesFetch)
       template fetchBodies() =
-        tracePacket ">> Sending eth.GetBlockBodies (0x05)", peer,
+        trace trEthSendSending & "GetBlockBodies (0x05)", peer,
           hashes=hashes.len
         let b = await peer.getBlockBodies(hashes)
         if b.isNone:
-          raise newException(CatchableError, "Was not able to get the block bodies")
+          raise newException(
+            CatchableError, "Was not able to get the block bodies")
         let bodiesLen = b.get.blocks.len
-        tracePacket "<< Got reply eth.BlockBodies (0x06)", peer,
+        trace trEthRecvGot & "BlockBodies (0x06)", peer,
           count=bodiesLen, requested=hashes.len
         if bodiesLen == 0:
           raise newException(CatchableError, "Zero block bodies received for request")
@@ -342,15 +346,15 @@ proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
     skip: 0,
     reverse: true)

-  tracePacket ">> Sending eth.GetBlockHeaders (0x03)", peer=a,
+  trace trEthSendSending & "GetBlockHeaders (0x03)", peer=a,
     startBlock=request.startBlock.hash.toHex, max=request.maxResults

   let latestBlock = await a.getBlockHeaders(request)
   result = latestBlock.isSome and latestBlock.get.headers.len > 0
-  if tracePackets and latestBlock.isSome:
+  if latestBlock.isSome:
     let blockNumber = if result: $latestBlock.get.headers[0].blockNumber
                       else: "missing"
-    tracePacket "<< Got reply eth.BlockHeaders (0x04)", peer=a,
+    trace trEthRecvGot & "BlockHeaders (0x04)", peer=a,
       count=latestBlock.get.headers.len, blockNumber

 proc randomTrustedPeer(ctx: FastSyncCtx): Peer =
@@ -362,7 +366,7 @@ proc randomTrustedPeer(ctx: FastSyncCtx): Peer =
     inc i

 proc startSyncWithPeer(ctx: FastSyncCtx, peer: Peer) {.async.} =
-  trace "start sync", peer, trustedPeers = ctx.trustedPeers.len
+  trace "Start sync", peer, trustedPeers = ctx.trustedPeers.len
   if ctx.trustedPeers.len >= minPeersToStartSync:
     # We have enough trusted peers. Validate new peer against trusted
     if await peersAgreeOnChain(peer, ctx.randomTrustedPeer()):
diff --git a/nimbus/sync/protocol/eth66.nim b/nimbus/sync/protocol/eth66.nim
index d2a9674c2..0184c54ff 100644
--- a/nimbus/sync/protocol/eth66.nim
+++ b/nimbus/sync/protocol/eth66.nim
@@ -40,15 +40,11 @@ import
   chronos,
   eth/[common/eth_types, p2p, p2p/private/p2p_types, p2p/blockchain_utils],
   stew/byteutils,
-  ".."/trace_helper,
-  ./pickeled_eth_tracers
+  ../types,
+  ./trace_config

-export
-  tracePackets, tracePacket,
-  traceGossips, traceGossip,
-  traceTimeouts, traceTimeout,
-  traceNetworkErrors, traceNetworkError,
-  tracePacketErrors, tracePacketError
+logScope:
+  topics = "datax"

 type
   NewBlockHashesAnnounce* = object
@@ -80,6 +76,25 @@ const
   ethVersion* = 66
   prettyEthProtoName* = "[eth/" & $ethVersion & "]"

+  # Pickled tracer texts
+  trEthRecvReceived* =
+    "<< " & prettyEthProtoName & " Received "
+  trEthRecvGot* =
+    "<< " & prettyEthProtoName & " Got "
+  trEthRecvProtocolViolation* =
+    "<< " & prettyEthProtoName & " Protocol violation, "
+  trEthRecvError* =
+    "<< " & prettyEthProtoName & " Error "
+  trEthRecvTimeoutWaiting* =
+    "<< " & prettyEthProtoName & " Timeout waiting "
+  trEthSendSending* =
+    ">> " & prettyEthProtoName & " Sending "
+  trEthSendReplying* =
+    ">> " & prettyEthProtoName & " Replying "
+  trEthSendDelaying* =
+    ">> " & prettyEthProtoName & " Delaying "
+  trEthSendDiscarding* =
+    "<< " & prettyEthProtoName & " Discarding "

 p2pProtocol eth66(version = ethVersion,
                   rlpxName = "eth",
@@ -96,8 +111,8 @@ p2pProtocol eth66(version = ethVersion,
         forkHash: chainForkId.crc.toBytesBE,
         forkNext: chainForkId.nextFork.toBlockNumber)

-      traceSendSending "Status (0x00) " & prettyEthProtoName,
-        peer, td=bestBlock.difficulty,
+      trace trEthSendSending & "Status (0x00)", peer,
+        td=bestBlock.difficulty,
         bestHash=bestBlock.blockHash.toHex,
         networkId=network.networkId,
         genesis=chain.genesisHash.toHex,
@@ -111,7 +126,7 @@ p2pProtocol eth66(version = ethVersion,
                                  forkId,
                                  timeout = chronos.seconds(10))

-      if traceHandshakes:
+      when trEthTraceHandshakesOk:
         trace "Handshake: Local and remote networkId",
           local=network.networkId, remote=m.networkId
         trace "Handshake: Local and remote genesisHash",
@@ -123,12 +138,14 @@ p2pProtocol eth66(version = ethVersion,
       if m.networkId != network.networkId:
         trace "Peer for a different network (networkId)", peer,
           expectNetworkId=network.networkId, gotNetworkId=m.networkId
-        raise newException(UselessPeerError, "Eth handshake for different network")
+        raise newException(
+          UselessPeerError, "Eth handshake for different network")

       if m.genesisHash != chain.genesisHash:
         trace "Peer for a different network (genesisHash)", peer,
           expectGenesis=chain.genesisHash.toHex, gotGenesis=m.genesisHash.toHex
-        raise newException(UselessPeerError, "Eth handshake for different network")
+        raise newException(
+          UselessPeerError, "Eth handshake for different network")

       trace "Peer matches our network", peer
       peer.state.initialized = true
@@ -144,42 +161,37 @@ p2pProtocol eth66(version = ethVersion,
                 bestHash: Hash256,
                 genesisHash: Hash256,
                 forkId: ForkId) =
-      traceRecvReceived "Status (0x00)",
-        peer, networkId, totalDifficulty, bestHash, genesisHash,
+      trace trEthRecvReceived & "Status (0x00)", peer,
+        networkId, totalDifficulty, bestHash, genesisHash,
         forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext

   # User message 0x01: NewBlockHashes.
   proc newBlockHashes(peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) =
-    traceSendGossipDiscarding "NewBlockHashes (0x01)",
-      peer, hashes=hashes.len
+    when trEthTraceGossipOk:
+      trace trEthSendDiscarding & "NewBlockHashes (0x01)", peer,
+        hashes=hashes.len
     discard

   # User message 0x02: Transactions.
   proc transactions(peer: Peer, transactions: openArray[Transaction]) =
-    traceSendGossipDiscarding "Transactions (0x02)",
-      peer, transactions=transactions.len
+    when trEthTraceGossipOk:
+      trace trEthSendDiscarding & "Transactions (0x02)", peer,
+        transactions=transactions.len
     discard

   requestResponse:
     # User message 0x03: GetBlockHeaders.
     proc getBlockHeaders(peer: Peer, request: BlocksRequest) =
-      if tracePackets:
-        if request.maxResults == 1 and request.startBlock.isHash:
-          traceRecvReceived "GetBlockHeaders/Hash (0x03)",
-            peer, blockHash=($request.startBlock.hash), count=1
-        elif request.maxResults == 1:
-          traceRecvReceived "GetBlockHeaders (0x03)",
-            peer, `block`=request.startBlock.number, count=1
-        elif request.startBlock.isHash:
-          traceRecvReceived "GetBlockHeaders/Hash (0x03)",
-            peer, firstBlockHash=($request.startBlock.hash),
-            count=request.maxResults,
-            step=traceStep(request)
-        else:
-          traceRecvReceived "GetBlockHeaders (0x03)",
-            peer, firstBlock=request.startBlock.number,
-            count=request.maxResults,
-            step=traceStep(request)
+      when trEthTracePacketsOk:
+        let
+          startBlock =
+            if request.startBlock.isHash: request.startBlock.hash.toHex
+            else: '#' & $request.startBlock.number
+          step =
+            if request.maxResults == 1: "n/a"
+            else: $request.traceStep
+        trace trEthRecvReceived & "GetBlockHeaders (0x03)", peer,
+          startBlock, count=request.maxResults, step

       if request.maxResults > uint64(maxHeadersFetch):
         debug "GetBlockHeaders (0x03) requested too many headers",
@@ -189,11 +201,11 @@ p2pProtocol eth66(version = ethVersion,
       let headers = peer.network.chain.getBlockHeaders(request)
       if headers.len > 0:
-        traceSendReplying "with BlockHeaders (0x04)",
-          peer, sent=headers.len, requested=request.maxResults
+        trace trEthSendReplying & "with BlockHeaders (0x04)", peer,
+          sent=headers.len, requested=request.maxResults
       else:
-        traceSendReplying "EMPTY BlockHeaders (0x04)",
-          peer, sent=0, requested=request.maxResults
+        trace trEthSendReplying & "EMPTY BlockHeaders (0x04)", peer,
+          sent=0, requested=request.maxResults

       await response.send(headers)

@@ -203,8 +215,8 @@ p2pProtocol eth66(version = ethVersion,
   requestResponse:
     # User message 0x05: GetBlockBodies.
     proc getBlockBodies(peer: Peer, hashes: openArray[Hash256]) =
-      traceRecvReceived "GetBlockBodies (0x05)",
-        peer, hashes=hashes.len
+      trace trEthRecvReceived & "GetBlockBodies (0x05)", peer,
+        hashes=hashes.len
       if hashes.len > maxBodiesFetch:
         debug "GetBlockBodies (0x05) requested too many bodies",
           peer, requested=hashes.len, max=maxBodiesFetch
@@ -213,11 +225,11 @@ p2pProtocol eth66(version = ethVersion,
       let bodies = peer.network.chain.getBlockBodies(hashes)
       if bodies.len > 0:
-        traceSendReplying "with BlockBodies (0x06)",
-          peer, sent=bodies.len, requested=hashes.len
+        trace trEthSendReplying & "with BlockBodies (0x06)", peer,
+          sent=bodies.len, requested=hashes.len
       else:
-        traceSendReplying "EMPTY BlockBodies (0x06)",
-          peer, sent=0, requested=hashes.len
+        trace trEthSendReplying & "EMPTY BlockBodies (0x06)", peer,
+          sent=0, requested=hashes.len

       await response.send(bodies)

@@ -228,26 +240,28 @@ p2pProtocol eth66(version = ethVersion,
   proc newBlock(peer: Peer, bh: EthBlock, totalDifficulty: DifficultyInt) =
     # (Note, needs to use `EthBlock` instead of its alias `NewBlockAnnounce`
     # because either `p2pProtocol` or RLPx doesn't work with an alias.)
-    traceSendGossipDiscarding "NewBlock (0x07)",
-      peer, totalDifficulty,
-      blockNumber = bh.header.blockNumber,
-      blockDifficulty = bh.header.difficulty
+    when trEthTraceGossipOk:
+      trace trEthSendDiscarding & "NewBlock (0x07)", peer,
+        totalDifficulty,
+        blockNumber = bh.header.blockNumber,
+        blockDifficulty = bh.header.difficulty
     discard

   # User message 0x08: NewPooledTransactionHashes.
   proc newPooledTransactionHashes(peer: Peer, txHashes: openArray[Hash256]) =
-    traceSendGossipDiscarding "NewPooledTransactionHashes (0x08)",
-      peer, hashes=txHashes.len
+    when trEthTraceGossipOk:
+      trace trEthSendDiscarding & "NewPooledTransactionHashes (0x08)", peer,
+        hashes=txHashes.len
    discard

   requestResponse:
     # User message 0x09: GetPooledTransactions.
     proc getPooledTransactions(peer: Peer, txHashes: openArray[Hash256]) =
-      traceRecvReceived "GetPooledTransactions (0x09)",
-        peer, hashes=txHashes.len
+      trace trEthRecvReceived & "GetPooledTransactions (0x09)", peer,
+        hashes=txHashes.len

-      traceSendReplying "EMPTY PooledTransactions (0x10)",
-        peer, sent=0, requested=txHashes.len
+      trace trEthSendReplying & "EMPTY PooledTransactions (0x10)", peer,
+        sent=0, requested=txHashes.len
       await response.send([])

   # User message 0x0a: PooledTransactions.
@@ -257,7 +271,7 @@ p2pProtocol eth66(version = ethVersion,

   # User message 0x0d: GetNodeData.
   proc getNodeData(peer: Peer, nodeHashes: openArray[Hash256]) =
-    traceRecvReceived "GetNodeData (0x0d)", peer,
+    trace trEthRecvReceived & "GetNodeData (0x0d)", peer,
       hashes=nodeHashes.len

     var data: seq[Blob]
@@ -267,10 +281,10 @@ p2pProtocol eth66(version = ethVersion,
       data = peer.network.chain.getStorageNodes(nodeHashes)

     if data.len > 0:
-      traceSendReplying "with NodeData (0x0e)", peer,
+      trace trEthSendReplying & "with NodeData (0x0e)", peer,
         sent=data.len, requested=nodeHashes.len
     else:
-      traceSendReplying "EMPTY NodeData (0x0e)", peer,
+      trace trEthSendReplying & "EMPTY NodeData (0x0e)", peer,
         sent=0, requested=nodeHashes.len

     await peer.nodeData(data)
@@ -282,16 +296,16 @@ p2pProtocol eth66(version = ethVersion,
       # know if this is a valid reply ("Got reply") or something else.
       peer.state.onNodeData(peer, data)
     else:
-      traceSendDiscarding "NodeData (0x0e)", peer,
+      trace trEthSendDiscarding & "NodeData (0x0e)", peer,
         bytes=data.len

   requestResponse:
     # User message 0x0f: GetReceipts.
     proc getReceipts(peer: Peer, hashes: openArray[Hash256]) =
-      traceRecvReceived "GetReceipts (0x0f)", peer,
+      trace trEthRecvReceived & "GetReceipts (0x0f)", peer,
         hashes=hashes.len

-      traceSendReplying "EMPTY Receipts (0x10)", peer,
+      trace trEthSendReplying & "EMPTY Receipts (0x10)", peer,
         sent=0, requested=hashes.len
       await response.send([])
       # TODO: implement `getReceipts` and reactivate this code
diff --git a/nimbus/sync/protocol/pickeled_eth_tracers.nim b/nimbus/sync/protocol/pickeled_eth_tracers.nim
deleted file mode 100644
index 0e0a930b2..000000000
--- a/nimbus/sync/protocol/pickeled_eth_tracers.nim
+++ /dev/null
@@ -1,52 +0,0 @@
-# Nimbus - Rapidly converge on and track the canonical chain head of each peer
-#
-# Copyright (c) 2021 Status Research & Development GmbH
-# Licensed under either of
-#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
-#    http://www.apache.org/licenses/LICENSE-2.0)
-#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
-#    http://opensource.org/licenses/MIT)
-# at your option. This file may not be copied, modified, or distributed
-# except according to those terms.
-
-template traceRecvReceived*(msg: static[string], args: varargs[untyped]) =
-  tracePacket "<< " & prettyEthProtoName & " Received " & msg,
-    `args`
-
-template traceRecvGot*(msg: static[string], args: varargs[untyped]) =
-  tracePacket "<< " & prettyEthProtoName & " Got " & msg,
-    `args`
-
-template traceRecvProtocolViolation*(msg: static[string], args: varargs[untyped]) =
-  tracePacketError "<< " & prettyEthProtoName & " Protocol violation, " & msg,
-    `args`
-
-template traceRecvError*(msg: static[string], args: varargs[untyped]) =
-  traceNetworkError "<< " & prettyEthProtoName & " Error " & msg,
-    `args`
-
-template traceRecvTimeoutWaiting*(msg: static[string], args: varargs[untyped]) =
-  traceTimeout "<< " & prettyEthProtoName & " Timeout waiting " & msg,
-    `args`
-
-template traceSendSending*(msg: static[string], args: varargs[untyped]) =
-  tracePacket ">> " & prettyEthProtoName & " Sending " & msg,
-    `args`
-
-template traceSendReplying*(msg: static[string], args: varargs[untyped]) =
-  tracePacket ">> " & prettyEthProtoName & " Replying " & msg,
-    `args`
-
-template traceSendDelaying*(msg: static[string], args: varargs[untyped]) =
-  tracePacket ">>" & prettyEthProtoName & " Delaying " & msg,
-    `args`
-
-template traceSendGossipDiscarding*(msg: static[string], args: varargs[untyped]) =
-  traceGossip "<< " & prettyEthProtoName & " Discarding " & msg,
-    `args`
-
-template traceSendDiscarding*(msg: static[string], args: varargs[untyped]) =
-  tracePacket "<< " & prettyEthProtoName & " Discarding " & msg,
-    `args`
-
-# End
diff --git a/nimbus/sync/protocol/pickeled_snap_tracers.nim b/nimbus/sync/protocol/pickeled_snap_tracers.nim
deleted file mode 100644
index a1eeb1661..000000000
--- a/nimbus/sync/protocol/pickeled_snap_tracers.nim
+++ /dev/null
@@ -1,40 +0,0 @@
-# Nimbus - Rapidly converge on and track the canonical chain head of each peer
-#
-# Copyright (c) 2021 Status Research & Development GmbH
-# Licensed under either of
-#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
-#    http://www.apache.org/licenses/LICENSE-2.0)
-#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
-#    http://opensource.org/licenses/MIT)
-# at your option. This file may not be copied, modified, or distributed
-# except according to those terms.
-
-template traceRecvReceived*(msg: static[string], args: varargs[untyped]) =
-  tracePacket "<< " & prettySnapProtoName & " Received " & msg,
-    `args`
-
-template traceRecvGot*(msg: static[string], args: varargs[untyped]) =
-  tracePacket "<< " & prettySnapProtoName & " Got " & msg,
-    `args`
-
-template traceRecvProtocolViolation*(msg: static[string], args: varargs[untyped]) =
-  tracePacketError "<< " & prettySnapProtoName & " Protocol violation, " & msg,
-    `args`
-
-template traceRecvError*(msg: static[string], args: varargs[untyped]) =
-  traceNetworkError "<< " & prettySnapProtoName & " Error " & msg,
-    `args`
-
-template traceRecvTimeoutWaiting*(msg: static[string], args: varargs[untyped]) =
-  traceTimeout "<< " & prettySnapProtoName & " Timeout waiting " & msg,
-    `args`
-
-template traceSendSending*(msg: static[string], args: varargs[untyped]) =
-  tracePacket ">> " & prettySnapProtoName & " Sending " & msg,
-    `args`
-
-template traceSendReplying*(msg: static[string], args: varargs[untyped]) =
-  tracePacket ">> " & prettySnapProtoName & " Replying " & msg,
-    `args`
-
-# End
diff --git a/nimbus/sync/protocol/snap1.nim b/nimbus/sync/protocol/snap1.nim
index 0b3b72296..0e62f9d89 100644
--- a/nimbus/sync/protocol/snap1.nim
+++ b/nimbus/sync/protocol/snap1.nim
@@ -203,9 +203,12 @@ import
   eth/[common/eth_types, p2p, p2p/private/p2p_types],
   nimcrypto/hash,
   stew/byteutils,
-  ".."/[snap/path_desc, trace_helper],
   ../../constants,
-  ./pickeled_snap_tracers
+  ../snap/path_desc,
+  ./trace_config
+
+logScope:
+  topics = "datax"

 type
   SnapAccount* = object
@@ -224,6 +227,23 @@ const
   snapVersion* = 1
   prettySnapProtoName* = "[snap/" & $snapVersion & "]"

+  # Pickled tracer texts
+  trSnapRecvReceived* =
+    "<< " & prettySnapProtoName & " Received "
+  trSnapRecvGot* =
+    "<< " & prettySnapProtoName & " Got "
+  trSnapRecvProtocolViolation* =
+    "<< " & prettySnapProtoName & " Protocol violation, "
+  trSnapRecvError* =
+    "<< " & prettySnapProtoName & " Error "
+  trSnapRecvTimeoutWaiting* =
+    "<< " & prettySnapProtoName & " Timeout waiting "
+  trSnapSendSending* =
+    ">> " & prettySnapProtoName & " Sending "
+  trSnapSendReplying* =
+    ">> " & prettySnapProtoName & " Replying "
+
+
 # The `snap` protocol represents `Account` differently from the regular RLP
 # serialisation used in `eth` protocol as well as the canonical Merkle hash
 # over all accounts. In `snap`, empty storage hash and empty code hash are
@@ -297,11 +317,12 @@ p2pProtocol snap1(version = 1,
                          # Next line differs from spec to match Geth.
                          origin: LeafPath, limit: LeafPath,
                          responseBytes: uint64) =
-      traceRecvReceived "GetAccountRange (0x00)", peer,
+      trace trSnapRecvReceived & "GetAccountRange (0x00)", peer,
+      # traceRecvReceived "GetAccountRange (0x00)", peer,
         accountRange=pathRange(origin, limit),
         stateRoot=($rootHash), responseBytes

-      traceSendReplying "EMPTY AccountRange (0x01)", peer, sent=0
+      trace trSnapSendReplying & "EMPTY AccountRange (0x01)", peer, sent=0
       await response.send(@[], @[])

     # User message 0x01: AccountRange.
@@ -316,7 +337,7 @@ p2pProtocol snap1(version = 1,
                          # Next line differs from spec to match Geth.
                          origin: openArray[byte], limit: openArray[byte],
                          responseBytes: uint64) =
-      if tracePackets:
+      when trSnapTracePacketsOk:
         var definiteFullRange = ((origin.len == 32 or origin.len == 0) and
                                  (limit.len == 32 or limit.len == 0))
         if definiteFullRange:
@@ -337,12 +358,12 @@ p2pProtocol snap1(version = 1,

       if definiteFullRange:
         # Fetching storage for multiple accounts.
-        traceRecvReceived "GetStorageRanges/A (0x02)", peer,
+        trace trSnapRecvReceived & "GetStorageRanges/A (0x02)", peer,
           accountPaths=accounts.len,
           stateRoot=($rootHash), responseBytes
       elif accounts.len == 1:
         # Fetching partial storage for one account, aka. "large contract".
-        traceRecvReceived "GetStorageRanges/S (0x02)", peer,
+        trace trSnapRecvReceived & "GetStorageRanges/S (0x02)", peer,
           accountPaths=1,
           storageRange=(describe(origin) & '-' & describe(limit)),
           stateRoot=($rootHash), responseBytes
@@ -350,12 +371,12 @@ p2pProtocol snap1(version = 1,
         # This branch is separated because these shouldn't occur. It's not
         # really specified what happens when there are multiple accounts and
         # non-default path range.
-        traceRecvReceived "GetStorageRanges/AS?? (0x02)", peer,
+        trace trSnapRecvReceived & "GetStorageRanges/AS?? (0x02)", peer,
           accountPaths=accounts.len,
           storageRange=(describe(origin) & '-' & describe(limit)),
           stateRoot=($rootHash), responseBytes

-      traceSendReplying "EMPTY StorageRanges (0x03)", peer, sent=0
+      trace trSnapSendReplying & "EMPTY StorageRanges (0x03)", peer, sent=0
       await response.send(@[], @[])

     # User message 0x03: StorageRanges.
@@ -367,10 +388,10 @@ p2pProtocol snap1(version = 1,
   requestResponse:
     proc getByteCodes(peer: Peer, nodeHashes: openArray[Hash256],
                       responseBytes: uint64) =
-      traceRecvReceived "GetByteCodes (0x04)", peer,
+      trace trSnapRecvReceived & "GetByteCodes (0x04)", peer,
         hashes=nodeHashes.len, responseBytes

-      traceSendReplying "EMPTY ByteCodes (0x05)", peer, sent=0
+      trace trSnapSendReplying & "EMPTY ByteCodes (0x05)", peer, sent=0
       await response.send(@[])

     # User message 0x05: ByteCodes.
@@ -380,10 +401,10 @@ p2pProtocol snap1(version = 1,
   requestResponse:
     proc getTrieNodes(peer: Peer, rootHash: Hash256,
                       paths: openArray[InteriorPath], responseBytes: uint64) =
-      traceRecvReceived "GetTrieNodes (0x06)", peer,
+      trace trSnapRecvReceived & "GetTrieNodes (0x06)", peer,
         nodePaths=paths.len, stateRoot=($rootHash), responseBytes

-      traceSendReplying "EMPTY TrieNodes (0x07)", peer, sent=0
+      trace trSnapSendReplying & "EMPTY TrieNodes (0x07)", peer, sent=0
       await response.send(@[])

     # User message 0x07: TrieNodes.
diff --git a/nimbus/sync/protocol/trace_config.nim b/nimbus/sync/protocol/trace_config.nim
new file mode 100644
index 000000000..e1ebcd4ea
--- /dev/null
+++ b/nimbus/sync/protocol/trace_config.nim
@@ -0,0 +1,30 @@
+# Nimbus - Ethereum Wire Protocol, version eth/65
+#
+# Copyright (c) 2018-2021 Status Research & Development GmbH
+# Licensed under either of
+#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
+#    http://www.apache.org/licenses/LICENSE-2.0)
+#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
+#    http://opensource.org/licenses/MIT)
+# at your option. This file may not be copied, modified, or distributed
+# except according to those terms.
+
+const
+  # Some static noisy settings for `eth` debugging
+  trEthTracePacketsOk* = true
+    ## `trace` log each sync network message.
+  trEthTraceGossipOk* = true
+    ## `trace` log each sync network message.
+  trEthTraceHandshakesOk* = true
+    ## `trace` log each network handshake message.
+  trEthTraceIndividualNodesOk* = true
+    ## `trace` log each trie node, account, storage, receipt, etc.
+
+  # Some static noisy settings for `snap` debugging
+  trSnapTracePacketsOk* = true
+    ## `trace` log each sync network message.
+
+# The files and lines clutter differently when sync tracing is enabled.
+# publicLogScope: chroniclesLineNumbers=false
+
+# End
diff --git a/nimbus/sync/snap.nim b/nimbus/sync/snap.nim
index 8b74df601..a103ad9be 100644
--- a/nimbus/sync/snap.nim
+++ b/nimbus/sync/snap.nim
@@ -10,102 +10,149 @@
 # except according to those terms.

 import
+  std/[hashes, strutils],
   chronicles,
   chronos,
-  eth/[common/eth_types, p2p, rlp],
-  eth/p2p/[peer_pool, private/p2p_types, rlpx],
-  stint,
-  ./protocol,
-  ./snap/[base_desc, chain_head_tracker, get_nodedata, types],
-  ./snap/pie/[sync_desc, peer_desc]
+  eth/[common/eth_types, p2p, p2p/peer_pool, p2p/private/p2p_types],
+  stew/keyed_queue,
+  "."/[protocol, types],
+  ./snap/[base_desc, collect]

 {.push raises: [Defect].}

+logScope:
+  topics = "snap sync"
+
 type
-  SnapSyncCtx* = ref object of SnapSyncEx
-    peerPool: PeerPool
+  SnapSyncCtx* = ref object of SnapSync
+    peerTab: KeyedQueue[Peer,SnapPeer] ## LRU cache
+    tabSize: int                       ## maximal number of entries
+    pool: PeerPool                     ## for starting the system, debugging
+
+    # debugging
+    lastDump: seq[string]
+    lastLen: int

 # ------------------------------------------------------------------------------
 # Private helpers
 # ------------------------------------------------------------------------------

-proc fetchPeerDesc(ns: SnapSyncCtx, peer: Peer): SnapPeerEx =
-  ## Find matching peer and remove descriptor from list
-  for i in 0 ..< ns.syncPeers.len:
-    if ns.syncPeers[i].peer == peer:
-      result = ns.syncPeers[i].ex
-      ns.syncPeers.delete(i)
-      return
+proc nsCtx(sp: SnapPeer): SnapSyncCtx =
+  sp.ns.SnapSyncCtx

-proc new(T: type SnapPeerEx; ns: SnapSyncCtx; peer: Peer): T =
-  T(
-    ns: ns,
-    peer: peer,
-    stopped: false,
-    # Initial state: hunt forward, maximum uncertainty range.
-    syncMode: SyncHuntForward,
-    huntLow: 0.toBlockNumber,
-    huntHigh: high(BlockNumber),
-    huntStep: 0,
-    bestBlockNumber: 0.toBlockNumber)
+proc hash(peer: Peer): Hash =
+  ## Needed for `peerTab` table key comparison
+  hash(peer.remote.id)
+
+# ------------------------------------------------------------------------------
+# Private debugging helpers
+# ------------------------------------------------------------------------------
+
+proc dumpPeers(sn: SnapSyncCtx; force = false) =
+  if sn.lastLen != sn.peerTab.len or force:
+    sn.lastLen = sn.peerTab.len
+
+    let poolSize = sn.pool.len
+    if sn.peerTab.len == 0:
+      trace "*** Empty peer list", poolSize
+    else:
+      var n = sn.peerTab.len - 1
+      for sp in sn.peerTab.prevValues:
+        trace "*** Peer list entry",
+          n, poolSize, peer=sp, hunt=sp.hunt.pp
+        n.dec

 # ------------------------------------------------------------------------------
 # Private functions
 # ------------------------------------------------------------------------------

-proc syncPeerLoop(sp: SnapPeerEx) {.async.} =
+proc syncPeerLoop(sp: SnapPeer) {.async.} =
   # This basic loop just runs the head-hunter for each peer.
-  while not sp.stopped:
-    await sp.peerHuntCanonical()
-    if sp.stopped:
+  var cache = ""
+  while sp.ctrl.runState != SyncStopped:
+
+    # Do something, work a bit
+    await sp.collectBlockHeaders()
+    if sp.ctrl.runState == SyncStopped:
+      trace "Ignoring stopped peer", peer=sp
       return
-    let delayMs = if sp.syncMode == SyncLocked: 1000 else: 50
+
+    # Rotate LRU connection table so the most used entry is at the list end
+    # TODO: Update implementation of lruFetch() using re-link, only
+    discard sp.nsCtx.peerTab.lruFetch(sp.peer)
+
+    let delayMs = if sp.hunt.syncMode == SyncLocked: 1000 else: 50
     await sleepAsync(chronos.milliseconds(delayMs))

-proc syncPeerStart(sp: SnapPeerEx) =
+proc syncPeerStart(sp: SnapPeer) =
   asyncSpawn sp.syncPeerLoop()

-proc syncPeerStop(sp: SnapPeerEx) =
-  sp.stopped = true
-  # TODO: Cancel running `SnapPeerEx` instances. We need clean cancellation
+proc syncPeerStop(sp: SnapPeer) =
+  sp.ctrl.runState = SyncStopped
+  # TODO: Cancel running `SnapPeer` instances. We need clean cancellation
   #   for this. Doing so reliably will be addressed at a later time.

 proc onPeerConnected(ns: SnapSyncCtx, peer: Peer) =
-  trace "Snap: Peer connected", peer
+  trace "Peer connected", peer

-  let sp = SnapPeerEx.new(ns, peer)
-  sp.setupGetNodeData()
+  let sp = SnapPeer.new(ns, peer, SyncHuntForward, SyncRunningOk)
+  sp.collectDataSetup()

   if peer.state(eth).initialized:
     # We know the hash but not the block number.
-    sp.bestBlockHash = peer.state(eth).bestBlockHash.BlockHash
+    sp.hunt.bestHash = peer.state(eth).bestBlockHash.BlockHash
     # TODO: Temporarily disabled because it's useful to test the head hunter.
     # sp.syncMode = SyncOnlyHash
   else:
-    trace "Snap: state(eth) not initialized!"
+    trace "State(eth) not initialized!"

-  ns.syncPeers.add(sp)
+  # Manage connection table, check for existing entry
+  if ns.peerTab.hasKey(peer):
+    trace "Peer exists already!", peer
+    return
+
+  # Check for table overflow. An overflow should not happen if the table is
+  # as large as the peer connection table.
+  if ns.tabSize <= ns.peerTab.len:
+    let leastPeer = ns.peerTab.shift.value.data
+    leastPeer.syncPeerStop
+    trace "Peer table full, deleted least used",
+      leastPeer, poolSize=ns.pool.len, tabLen=ns.peerTab.len, tabMax=ns.tabSize
+
+  # Add peer entry
+  discard ns.peerTab.append(sp.peer,sp)
+  trace "Starting peer",
+    peer, poolSize=ns.pool.len, tabLen=ns.peerTab.len, tabMax=ns.tabSize
+
+  # Debugging, peer table dump after adding entry
+  #ns.dumpPeers(true)
   sp.syncPeerStart()

 proc onPeerDisconnected(ns: SnapSyncCtx, peer: Peer) =
-  trace "Snap: Peer disconnected", peer
+  trace "Peer disconnected", peer

-  let sp = ns.fetchPeerDesc(peer)
-  if sp.isNil:
-    debug "Snap: Disconnected from unregistered peer", peer
+  # Debugging, peer table dump before removing entry
+  #ns.dumpPeers(true)
+
+  let rc = ns.peerTab.delete(peer)
+  if rc.isOk:
+    rc.value.data.syncPeerStop()
   else:
-    sp.syncPeerStop()
+    debug "Disconnected from unregistered peer", peer

 # ------------------------------------------------------------------------------
 # Public functions
 # ------------------------------------------------------------------------------

-proc new*(T: type SnapSyncCtx; ethNode: EthereumNode): T =
+proc new*(T: type SnapSyncCtx; ethNode: EthereumNode; maxPeers: int): T =
   ## Constructor
   new result
-  result.peerPool = ethNode.peerPool
+  let size = max(1,2*maxPeers) # allow double argument size
+  result.peerTab.init(size)
+  result.tabSize = size
+  result.pool = ethNode.peerPool

 proc start*(ctx: SnapSyncCtx) =
   ## Set up syncing. This call should come early.
@@ -117,7 +164,7 @@ proc start*(ctx: SnapSyncCtx) =
     proc(p: Peer) {.gcsafe.} =
      ctx.onPeerDisconnected(p))
   po.setProtocol eth
-  ctx.peerPool.addObserver(ctx, po)
+  ctx.pool.addObserver(ctx, po)

 # ------------------------------------------------------------------------------
 # End
diff --git a/nimbus/sync/snap/base_desc.nim b/nimbus/sync/snap/base_desc.nim
index 8f2e470f6..1f72c8149 100644
--- a/nimbus/sync/snap/base_desc.nim
+++ b/nimbus/sync/snap/base_desc.nim
@@ -12,7 +12,8 @@
 import
   eth/[common/eth_types, p2p],
   stew/[byteutils, keyed_queue, results],
-  ./types
+  ../../constants,
+  ../types

 {.push raises: [Defect].}

@@ -21,28 +22,13 @@ const
   ## Internal size of LRU cache (for debugging)

 type
-  SnapStat* = distinct int
+  SnapPeerStat* = distinct uint

-  SnapPeerStatsOk = object
-    reorgDetected*: SnapStat
-    getBlockHeaders*: SnapStat
-    getNodeData*: SnapStat
+  SnapPeerFetchBase* = ref object of RootObj
+    ## Stub object, to be inherited

-  SnapPeerStatsMinor = object
-    timeoutBlockHeaders*: SnapStat
-    unexpectedBlockHash*: SnapStat
-
-  SnapPeerStatsMajor = object
-    networkErrors*: SnapStat
-    excessBlockHeaders*: SnapStat
-    wrongBlockHeader*: SnapStat
-
-  SnapPeerStats* = object
-    ## Statistics counters for events associated with this peer.
-    ## These may be used to recognise errors and select good peers.
-    ok*: SnapPeerStatsOk
-    minor*: SnapPeerStatsMinor
-    major*: SnapPeerStatsMajor
+  SnapPeerRequestsBase* = ref object of RootObj
+    ## Stub object, to be inherited

   SnapPeerMode* = enum
     ## The current state of tracking the peer's canonical chain head.
@@ -54,63 +40,120 @@ type
     SyncHuntRange
     SyncHuntRangeFinal

-  SnapPeerBase* = ref object of RootObj
-    ## Peer state tracking.
-    ns*: SnapSyncBase          ## Opaque object reference
-    peer*: Peer                ## eth p2pProtocol
-    stopped*: bool
-    pendingGetBlockHeaders*:bool
-    stats*: SnapPeerStats
+  SnapPeerRunState* = enum
+    SyncRunningOk
+    SyncStopRequest
+    SyncStopped

-    # Peer canonical chain head ("best block") search state.
-    syncMode*: SnapPeerMode
-    bestBlockNumber*: BlockNumber
-    bestBlockHash*: BlockHash
-    huntLow*: BlockNumber      ## Recent highest known present block.
-    huntHigh*: BlockNumber     ## Recent lowest known absent block.
-    huntStep*: typeof(BlocksRequest.skip) # aka uint
+  SnapPeerStats* = tuple
+    ## Statistics counters for events associated with this peer.
+    ## These may be used to recognise errors and select good peers.
+    ok: tuple[
+      reorgDetected: SnapPeerStat,
+      getBlockHeaders: SnapPeerStat,
+      getNodeData: SnapPeerStat]
+    minor: tuple[
+      timeoutBlockHeaders: SnapPeerStat,
+      unexpectedBlockHash: SnapPeerStat]
+    major: tuple[
+      networkErrors: SnapPeerStat,
+      excessBlockHeaders: SnapPeerStat,
+      wrongBlockHeader: SnapPeerStat]

-    # State root to fetch state for.
-    # This changes during sync and is slightly different for each peer.
-    syncStateRoot*: Option[TrieHash]
-    startedFetch*: bool
-    stopThisState*: bool
+  SnapPeerHunt* = tuple
+    ## Peer canonical chain head ("best block") search state.
+    syncMode: SnapPeerMode     ## Action mode
+    lowNumber: BlockNumber     ## Recent lowest known block number.
+    highNumber: BlockNumber    ## Recent highest known block number.
+    bestNumber: BlockNumber
+    bestHash: BlockHash
+    step: uint

-  SnapSyncBase* = ref object of RootObj
-    ## Shared state among all peers of a snap syncing node.
-    seenBlock: KeyedQueue[array[32,byte],BlockNumber]
-      ## Temporary for pretty debugging, BlockHash keyed lru cache
-    syncPeers*: seq[SnapPeerBase]
-      ## Peer state tracking
+  SnapPeerCtrl* = tuple
+    ## Control and state settings
+    stateRoot: Option[TrieHash]
+      ## State root to fetch state for. This changes during sync and is
+      ## slightly different for each peer.
+    runState: SnapPeerRunState
+
+  # -------
+
+  SnapSyncSeenBlocks = KeyedQueue[array[32,byte],BlockNumber]
+    ## Temporary for pretty debugging, `BlockHash` keyed lru cache
+
+  SnapSyncFetchBase* = ref object of RootObj
+    ## Stub object, to be inherited
+
+  # -------
+
+  SnapPeer* = ref object
+    ## Non-inheritable peer state tracking descriptor.
+    ns*: SnapSync                   ## Snap descriptor object back reference
+    peer*: Peer                     ## Reference to eth p2pProtocol entry
+    stats*: SnapPeerStats           ## Statistics counters
+    hunt*: SnapPeerHunt             ## Peer chain head search state
+    ctrl*: SnapPeerCtrl             ## Control and state settings
+    requests*: SnapPeerRequestsBase ## Opaque object reference
+    fetchState*: SnapPeerFetchBase  ## Opaque object reference
+
+  SnapSync* = ref object of RootObj
+    ## Shared state among all peers of a snap syncing node. Will be
+    ## amended/inherited into `SnapSyncCtx` by the `snap` module.
+    seenBlock: SnapSyncSeenBlocks   ## Temporary, debugging, prettified logs
+    sharedFetch*: SnapSyncFetchBase ## Opaque object reference
+
+# ------------------------------------------------------------------------------
+# Public Constructor
+# ------------------------------------------------------------------------------
+
+proc new*(
+    T: type SnapPeer;
+    ns: SnapSync;
+    peer: Peer;
+    syncMode: SnapPeerMode;
+    runState: SnapPeerRunState): T =
+  ## Initial state, maximum uncertainty range.
+  T(ns:   ns,
+    peer: peer,
+    ctrl: (
+      stateRoot: none(TrieHash),
+      runState:  runState),
+    hunt: (
+      syncMode:   syncMode,
+      lowNumber:  0.toBlockNumber.BlockNumber,
+      highNumber: high(BlockNumber).BlockNumber, # maximum uncertainty range.
+      bestNumber: 0.toBlockNumber.BlockNumber,
+      bestHash:   ZERO_HASH256.BlockHash,        # whatever
+      step:       0u))

 # ------------------------------------------------------------------------------
 # Public functions
 # ------------------------------------------------------------------------------

-proc `$`*(sp: SnapPeerBase): string =
+proc `$`*(sp: SnapPeer): string =
   $sp.peer

-proc inc(stat: var SnapStat) {.borrow.}
+proc inc(stat: var SnapPeerStat) {.borrow.}

 # ------------------------------------------------------------------------------
 # Public functions, debugging helpers (will go away eventually)
 # ------------------------------------------------------------------------------

-proc pp*(sn: SnapSyncBase; bh: BlockHash): string =
+proc pp*(sn: SnapSync; bh: BlockHash): string =
   ## Pretty printer for debugging
   let rc = sn.seenBlock.lruFetch(bh.untie.data)
   if rc.isOk:
     return "#" & $rc.value
   $bh.untie.data.toHex

-proc pp*(sn: SnapSyncBase; bh: BlockHash; bn: BlockNumber): string =
+proc pp*(sn: SnapSync; bh: BlockHash; bn: BlockNumber): string =
   ## Pretty printer for debugging
   let rc = sn.seenBlock.lruFetch(bh.untie.data)
   if rc.isOk:
     return "#" & $rc.value
   "#" & $sn.seenBlock.lruAppend(bh.untie.data, bn, seenBlocksMax)

-proc pp*(sn: SnapSyncBase; bhn: HashOrNum): string =
+proc pp*(sn: SnapSync; bhn: HashOrNum): string =
   if not bhn.isHash:
     return "num(#" & $bhn.number & ")"
   let rc = sn.seenBlock.lruFetch(bhn.hash.data)
@@ -118,11 +161,30 @@ proc pp*(sn: SnapSyncBase; bhn: HashOrNum): string =
   if rc.isOk:
     return "hash(#" & $rc.value & ")"
   return "hash(" & $bhn.hash.data.toHex & ")"

-proc seen*(sn: SnapSyncBase; bh: BlockHash; bn: BlockNumber) =
+proc seen*(sn: SnapSync; bh: BlockHash; bn: BlockNumber) =
   ## Register for pretty printing
   if not sn.seenBlock.lruFetch(bh.untie.data).isOk:
     discard sn.seenBlock.lruAppend(bh.untie.data, bn, seenBlocksMax)

+# -----------
+
+import
+  ../../../tests/replay/pp_light
+
+proc pp*(bh: BlockHash): string =
+  bh.Hash256.pp
+
+proc pp*(bn: BlockNumber): string =
+  if bn == high(BlockNumber): "#max"
+  else: "#" & $bn
+
+proc pp*(sp: SnapPeerHunt): string =
+  result &= "(mode=" & $sp.syncMode
+  result &= ",num=(" & sp.lowNumber.pp & "," & sp.highNumber.pp & ")"
+  result &= ",best=(" & sp.bestNumber.pp & "," & sp.bestHash.pp & ")"
+  result &= ",step=" & $sp.step
+  result &= ")"
+
 # ------------------------------------------------------------------------------
 # End
 # ------------------------------------------------------------------------------
diff --git a/nimbus/sync/snap/chain_head_tracker.nim b/nimbus/sync/snap/collect.nim
similarity index 66%
rename from nimbus/sync/snap/chain_head_tracker.nim
rename to nimbus/sync/snap/collect.nim
index 84f1ff890..37ee2fcb9 100644
--- a/nimbus/sync/snap/chain_head_tracker.nim
+++ b/nimbus/sync/snap/collect.nim
@@ -64,11 +64,14 @@ import
   chronos,
   eth/[common/eth_types, p2p, p2p/private/p2p_types],
   ../../p2p/chain/chain_desc,
-  ".."/[protocol, protocol/pickeled_eth_tracers, trace_helper],
-  "."/[base_desc, pie/peer_desc, pie/slicer, types]
+  ".."/[protocol, types],
+  "."/[base_desc, peer/fetch, peer/reply_data]

 {.push raises: [Defect].}

+logScope:
+  topics = "snap collect"
+
 const
   syncLockedMinimumReply = 8
     ## Minimum number of headers we assume any peers will send if they have
@@ -112,126 +115,134 @@ static:
   doAssert syncHuntForwardExpandShift >= 1 and syncHuntForwardExpandShift <= 8
   doAssert syncHuntBackwardExpandShift >= 1 and syncHuntBackwardExpandShift <= 8

+  # Make sure that request/response wire protocol messages are id-tracked and
would not overlap (no multi-protocol legacy support) + doAssert 66 <= protocol.ethVersion + # ------------------------------------------------------------------------------ # Private logging helpers # ------------------------------------------------------------------------------ -proc traceSyncLocked(sp: SnapPeerEx, bestNumber: BlockNumber, - bestHash: BlockHash) = +proc traceSyncLocked(sp: SnapPeer, number: BlockNumber, hash: BlockHash) = ## Trace messages when peer canonical head is confirmed or updated. - let bestBlock = sp.ns.pp(bestHash,bestNumber) - if sp.syncMode != SyncLocked: - debug "Snap: Now tracking chain head of peer", peer=sp, bestBlock - elif bestNumber > sp.bestBlockNumber: - if bestNumber == sp.bestBlockNumber + 1: - debug "Snap: Peer chain head advanced one block", peer=sp, + let + bestBlock = sp.ns.pp(hash, number) + peer = $sp + if sp.hunt.syncMode != SyncLocked: + debug "Now tracking chain head of peer", peer, bestBlock + elif number > sp.hunt.bestNumber: + if number == sp.hunt.bestNumber + 1: + debug "Peer chain head advanced one block", peer, advance=1, bestBlock else: - debug "Snap: Peer chain head advanced some blocks", peer=sp, - advance=(sp.bestBlockNumber - bestNumber), bestBlock - elif bestNumber < sp.bestBlockNumber or bestHash != sp.bestBlockHash: - debug "Snap: Peer chain head reorg detected", peer=sp, - advance=(sp.bestBlockNumber - bestNumber), bestBlock + debug "Peer chain head advanced some blocks", peer, + advance=(sp.hunt.bestNumber - number), bestBlock + elif number < sp.hunt.bestNumber or hash != sp.hunt.bestHash: + debug "Peer chain head reorg detected", peer, + advance=(sp.hunt.bestNumber - number), bestBlock -# proc peerSyncChainTrace(sp: SnapPeerEx) = +# proc peerSyncChainTrace(sp: SnapPeer) = # ## To be called after `peerSyncChainRequest` has updated state. 
-# case sp.syncMode: +# case sp.hunt.syncMode: # of SyncLocked: -# trace "Snap: SyncLocked", -# bestBlock=sp.bestBlockNumber, bestBlockHash=($sp.bestBlockHash) +# trace "SyncLocked", +# bestBlock = sp.ns.pp(sp.hunt.bestHash, sp.hunt.bestNumber) # of SyncOnlyHash: -# trace "Snap: OnlyHash", bestBlockHash=($sp.bestBlockHash) +# trace "OnlyHash", +# bestBlock = sp.ns.pp(sp.hunt.bestHash, sp.hunt.bestNumber) # of SyncHuntForward: # template highMax(n: BlockNumber): string = # if n == high(BlockNumber): "max" else: $n -# trace "Snap: HuntForward", -# low=sp.huntLow, high=highMax(sp.huntHigh), step=sp.huntStep +# trace "HuntForward", +# low=sp.hunt.lowNumber, high=highMax(sp.hunt.highNumber), +# step=sp.hunt.step # of SyncHuntBackward: -# trace "Snap: HuntBackward", -# low=sp.huntLow, high=sp.huntHigh, step=sp.huntStep +# trace "HuntBackward", +# low=sp.hunt.lowNumber, high=sp.hunt.highNumber, step=sp.hunt.step # of SyncHuntRange: -# trace "Snap: HuntRange", -# low=sp.huntLow, high=sp.huntHigh, step=sp.huntStep +# trace "HuntRange", +# low=sp.hunt.lowNumber, high=sp.hunt.highNumber, step=sp.hunt.step # of SyncHuntRangeFinal: -# trace "Snap: HuntRangeFinal", -# low=sp.huntLow, high=sp.huntHigh, step=1 +# trace "HuntRangeFinal", +# low=sp.hunt.lowNumber, high=sp.hunt.highNumber, step=1 # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc clearSyncStateRoot(sp: SnapPeerEx) = - if sp.syncStateRoot.isSome: - debug "Snap: Stopping state sync from this peer", peer=sp - sp.syncStateRoot = none(TrieHash) - -proc setSyncStateRoot(sp: SnapPeerEx, blockNumber: BlockNumber, - blockHash: BlockHash, stateRoot: TrieHash) = - let thisBlock = sp.ns.pp(blockHash,blockNumber) - if sp.syncStateRoot.isNone: - debug "Snap: Starting state sync from this peer", peer=sp, - thisBlock, stateRoot - elif sp.syncStateRoot.unsafeGet != stateRoot: - trace "Snap: Adjusting state sync root from this peer", peer=sp, - thisBlock, stateRoot - - sp.syncStateRoot = some(stateRoot) - - if not sp.startedFetch: - sp.startedFetch = true - trace "Snap: Starting to download block state", peer=sp, - thisBlock, stateRoot - asyncSpawn sp.stateFetch() - -proc setSyncLocked(sp: SnapPeerEx, bestNumber: BlockNumber, - bestHash: BlockHash) = +proc setSyncLocked(sp: SnapPeer, number: BlockNumber, hash: BlockHash) = ## Actions to take when peer canonical head is confirmed or updated. 
- sp.traceSyncLocked(bestNumber, bestHash) - sp.bestBlockNumber = bestNumber - sp.bestBlockHash = bestHash - sp.syncMode = SyncLocked + sp.traceSyncLocked(number, hash) + sp.hunt.bestNumber = number + sp.hunt.bestHash = hash + sp.hunt.syncMode = SyncLocked -proc setHuntBackward(sp: SnapPeerEx, lowestAbsent: BlockNumber) = +proc clearSyncStateRoot(sp: SnapPeer) = + if sp.ctrl.stateRoot.isSome: + debug "Stopping state sync from this peer", peer=sp + sp.ctrl.stateRoot = none(TrieHash) + +proc lockSyncStateRoot(sp: SnapPeer, number: BlockNumber, hash: BlockHash, + stateRoot: TrieHash) = + sp.setSyncLocked(number, hash) + + let thisBlock = sp.ns.pp(hash, number) + if sp.ctrl.stateRoot.isNone: + debug "Starting state sync from this peer", peer=sp, + thisBlock, stateRoot + elif sp.ctrl.stateRoot.unsafeGet != stateRoot: + trace "Adjusting state sync root from this peer", peer=sp, + thisBlock, stateRoot + + sp.ctrl.stateRoot = some(stateRoot) + + if sp.ctrl.runState != SyncRunningOK: + sp.ctrl.runState = SyncRunningOK + trace "Starting to download block state", peer=sp, + thisBlock, stateRoot + asyncSpawn sp.fetch() + +proc setHuntBackward(sp: SnapPeer, lowestAbsent: BlockNumber) = ## Start exponential search mode backward due to new uncertainty. - sp.syncMode = SyncHuntBackward - sp.huntStep = 0 + sp.hunt.syncMode = SyncHuntBackward + sp.hunt.step = 0 # Block zero is always present. - sp.huntLow = 0.toBlockNumber + sp.hunt.lowNumber = 0.toBlockNumber # Zero `lowestAbsent` is never correct, but an incorrect peer could send it. - sp.huntHigh = if lowestAbsent > 0: lowestAbsent else: 1.toBlockNumber + sp.hunt.highNumber = if lowestAbsent > 0: lowestAbsent else: 1.toBlockNumber sp.clearSyncStateRoot() -proc setHuntForward(sp: SnapPeerEx, highestPresent: BlockNumber) = +proc setHuntForward(sp: SnapPeer, highestPresent: BlockNumber) = ## Start exponential search mode forward due to new uncertainty. - sp.syncMode = SyncHuntForward - sp.huntStep = 0 - sp.huntLow = highestPresent - sp.huntHigh = high(BlockNumber) + sp.hunt.syncMode = SyncHuntForward + sp.hunt.step = 0 + sp.hunt.lowNumber = highestPresent + sp.hunt.highNumber = high(BlockNumber) sp.clearSyncStateRoot() -proc updateHuntAbsent(sp: SnapPeerEx, lowestAbsent: BlockNumber) = +proc updateHuntAbsent(sp: SnapPeer, lowestAbsent: BlockNumber) = ## Converge uncertainty range backward. - if lowestAbsent < sp.huntHigh: - sp.huntHigh = lowestAbsent + if lowestAbsent < sp.hunt.highNumber: + sp.hunt.highNumber = lowestAbsent # If uncertainty range has moved outside the search window, change to hunt # backward to block zero. Note that empty uncertainty range is allowed - # (empty range is `huntLow + 1 == huntHigh`). - if sp.huntHigh <= sp.huntLow: + # (empty range is `hunt.lowNumber + 1 == hunt.highNumber`). + if sp.hunt.highNumber <= sp.hunt.lowNumber: sp.setHuntBackward(lowestAbsent) sp.clearSyncStateRoot() -proc updateHuntPresent(sp: SnapPeerEx, highestPresent: BlockNumber) = +proc updateHuntPresent(sp: SnapPeer, highestPresent: BlockNumber) = ## Converge uncertainty range forward. - if highestPresent > sp.huntLow: - sp.huntLow = highestPresent + if highestPresent > sp.hunt.lowNumber: + sp.hunt.lowNumber = highestPresent # If uncertainty range has moved outside the search window, change to hunt # forward to no upper limit. Note that empty uncertainty range is allowed - # (empty range is `huntLow + 1 == huntHigh`). - if sp.huntLow >= sp.huntHigh: + # (empty range is `hunt.lowNumber + 1 == hunt.highNumber`). 
+ if sp.hunt.lowNumber >= sp.hunt.highNumber: sp.setHuntForward(highestPresent) sp.clearSyncStateRoot() -proc peerSyncChainEmptyReply(sp: SnapPeerEx, request: BlocksRequest) = +proc peerSyncChainEmptyReply(sp: SnapPeer, request: BlocksRequest) = ## Handle empty `GetBlockHeaders` reply. This means `request.startBlock` is ## absent on the peer. If it was `SyncLocked` there must have been a reorg ## and the previous canonical chain head has disappeared. If hunting, this @@ -240,26 +251,25 @@ proc peerSyncChainEmptyReply(sp: SnapPeerEx, request: BlocksRequest) = # Treat empty response to a request starting from block 1 as equivalent to # length 1 starting from block 0 in `peerSyncChainNonEmptyReply`. We treat # every peer as if it would send genesis for block 0, without asking for it. - if request.skip == 0 and not request.reverse and + if request.skip == 0 and + not request.reverse and not request.startBlock.isHash and request.startBlock.number == 1.toBlockNumber: - sp.setSyncLocked(0.toBlockNumber, - sp.peer.network.chain.genesisHash.BlockHash) - sp.setSyncStateRoot(0.toBlockNumber, - sp.peer.network.chain.genesisHash.BlockHash, - sp.peer.network.chain.Chain.genesisStateRoot.TrieHash) + sp.lockSyncStateRoot(0.toBlockNumber, + sp.peer.network.chain.genesisHash.BlockHash, + sp.peer.network.chain.Chain.genesisStateRoot.TrieHash) return - if sp.syncMode == SyncLocked or sp.syncMode == SyncOnlyHash: + if sp.hunt.syncMode == SyncLocked or sp.hunt.syncMode == SyncOnlyHash: inc sp.stats.ok.reorgDetected - trace "Snap: Peer reorg detected, best block disappeared", peer=sp, + trace "Peer reorg detected, best block disappeared", peer=sp, startBlock=request.startBlock let lowestAbsent = request.startBlock.number - case sp.syncMode: + case sp.hunt.syncMode: of SyncLocked: # If this message doesn't change our knowledge, ignore it. - if lowestAbsent > sp.bestBlockNumber: + if lowestAbsent > sp.hunt.bestNumber: return # Due to a reorg, peer's canonical head has lower block number, outside # our tracking window. Sync lock is no longer valid. Switch to hunt @@ -275,13 +285,13 @@ proc peerSyncChainEmptyReply(sp: SnapPeerEx, request: BlocksRequest) = # Update best block number. It is invalid except when `SyncLocked`, but # still useful as a hint of what we knew recently, for example in displays. - if lowestAbsent <= sp.bestBlockNumber: - sp.bestBlockNumber = if lowestAbsent == 0.toBlockNumber: lowestAbsent + if lowestAbsent <= sp.hunt.bestNumber: + sp.hunt.bestNumber = if lowestAbsent == 0.toBlockNumber: lowestAbsent else: lowestAbsent - 1.toBlockNumber - sp.bestBlockHash = default(typeof(sp.bestBlockHash)) - sp.ns.seen(sp.bestBlockHash,sp.bestBlockNumber) + sp.hunt.bestHash = default(typeof(sp.hunt.bestHash)) + sp.ns.seen(sp.hunt.bestHash,sp.hunt.bestNumber) -proc peerSyncChainNonEmptyReply(sp: SnapPeerEx, request: BlocksRequest, +proc peerSyncChainNonEmptyReply(sp: SnapPeer, request: BlocksRequest, headers: openArray[BlockHeader]) = ## Handle non-empty `GetBlockHeaders` reply. 
This means `request.startBlock` ## is present on the peer and in its canonical chain (unless the request was @@ -302,10 +312,9 @@ proc peerSyncChainNonEmptyReply(sp: SnapPeerEx, request: BlocksRequest, if len < syncLockedMinimumReply and request.skip == 0 and not request.reverse and len.uint < request.maxResults: - let blockHash = headers[highestIndex].blockHash.BlockHash - sp.setSyncLocked(headers[highestIndex].blockNumber, blockHash) - sp.setSyncStateRoot(headers[highestIndex].blockNumber, blockHash, - headers[highestIndex].stateRoot.TrieHash) + sp.lockSyncStateRoot(headers[highestIndex].blockNumber, + headers[highestIndex].blockHash.BlockHash, + headers[highestIndex].stateRoot.TrieHash) return # Be careful, this number is from externally supplied data and arithmetic @@ -316,10 +325,10 @@ proc peerSyncChainNonEmptyReply(sp: SnapPeerEx, request: BlocksRequest, # tells us headers up to some number, but it doesn't tell us if there are # more after it in the peer's canonical chain. We have to request more # headers to find out. - case sp.syncMode: + case sp.hunt.syncMode: of SyncLocked: # If this message doesn't change our knowledge, ignore it. - if highestPresent <= sp.bestBlockNumber: + if highestPresent <= sp.hunt.bestNumber: return # Sync lock is no longer valid as we don't have confirmed canonical head. # Switch to hunt forward to find the new canonical head. @@ -333,44 +342,38 @@ proc peerSyncChainNonEmptyReply(sp: SnapPeerEx, request: BlocksRequest, # Update best block number. It is invalid except when `SyncLocked`, but # still useful as a hint of what we knew recently, for example in displays. - if highestPresent > sp.bestBlockNumber: - sp.bestBlockNumber = highestPresent - sp.bestBlockHash = headers[highestIndex].blockHash.BlockHash - sp.ns.seen(sp.bestBlockHash,sp.bestBlockNumber) + if highestPresent > sp.hunt.bestNumber: + sp.hunt.bestNumber = highestPresent + sp.hunt.bestHash = headers[highestIndex].blockHash.BlockHash + sp.ns.seen(sp.hunt.bestHash,sp.hunt.bestNumber) -proc peerSyncChainRequest(sp: SnapPeerEx, request: var BlocksRequest) = +proc peerSyncChainRequest(sp: SnapPeer): BlocksRequest = ## Choose `GetBlockHeaders` parameters when hunting or following the canonical ## chain of a peer. - request = BlocksRequest( - startBlock: HashOrNum(isHash: false), - skip: 0, - reverse: false - ) - - if sp.syncMode == SyncLocked: + if sp.hunt.syncMode == SyncLocked: # Stable and locked. This is just checking for changes including reorgs. - # `sp.bestBlockNumber` was recently the head of the peer's canonical + # `sp.hunt.bestNumber` was recently the head of the peer's canonical # chain. We must include this block number to detect when the canonical # chain gets shorter versus no change. - request.startBlock.number = - if sp.bestBlockNumber <= syncLockedQueryOverlap: + result.startBlock.number = + if sp.hunt.bestNumber <= syncLockedQueryOverlap: # Every peer should send genesis for block 0, so don't ask for it. # `peerSyncChainEmptyReply` has logic to handle this reply as if it # was for block 0. Aside from saving bytes, this is more robust if # some client doesn't do genesis reply correctly. 
1.toBlockNumber else: - min(sp.bestBlockNumber - syncLockedQueryOverlap.toBlockNumber, + min(sp.hunt.bestNumber - syncLockedQueryOverlap.toBlockNumber, high(BlockNumber) - (syncLockedQuerySize - 1).toBlockNumber) - request.maxResults = syncLockedQuerySize + result.maxResults = syncLockedQuerySize return - if sp.syncMode == SyncOnlyHash: + if sp.hunt.syncMode == SyncOnlyHash: # We only have the hash of the recent head of the peer's canonical chain. # Like `SyncLocked`, query more than one item to detect when the # canonical chain gets shorter, no change or longer. - request.startBlock = HashOrNum(isHash: true, hash: sp.bestBlockHash.untie) - request.maxResults = syncLockedQuerySize + result.startBlock = sp.hunt.bestHash.toHashOrNum + result.maxResults = syncLockedQuerySize return # Searching for the peers's canonical head. An ascending query is always @@ -386,29 +389,30 @@ proc peerSyncChainRequest(sp: SnapPeerEx, request: var BlocksRequest) = # Guaranteeing O(log N) time convergence in all scenarios requires some # properties to be true in both exponential search (expanding) and # quasi-binary search (converging in a range). The most important is that - # the gap to `startBlock` after `huntLow` and also before `huntHigh` are - # proportional to the query step, where the query step is `huntStep` - # exponentially expanding each round, or `maxStep` approximately evenly - # distributed in the range. + # the gap to `startBlock` after `hunt.lowNumber` and also before + # `hunt.highNumber` are proportional to the query step, where the query step + # is `hunt.step` exponentially expanding each round, or `maxStep` + # approximately evenly distributed in the range. # - # `huntLow+1` must not be used consistently as the start, even with a large - # enough query step size, as that will sometimes take O(N) to converge in - # both the exponential and quasi-binary searches. (Ending at `huntHigh-1` - # is fine if `syncHuntQuerySize > 1`. This asymmetry is due to ascending - # queries (see earlier comment), and non-empty truncated query reply being - # proof of presence before the truncation point, but not proof of absence - # after it. A reply can be truncated just because the peer decides to.) + # `hunt.lowNumber+1` must not be used consistently as the start, even with a + # large enough query step size, as that will sometimes take O(N) to converge + # in both the exponential and quasi-binary searches. (Ending at + # `hunt.highNumber-1` is fine if `syncHuntQuerySize > 1`. This asymmetry is + # due to ascending queries (see earlier comment), and non-empty truncated + # query reply being proof of presence before the truncation point, but not + # proof of absence after it. A reply can be truncated just because the peer + # decides to.) # # The proportional gap requirement is why we divide by query size here, # instead of stretching to fit more strictly with `(range-1)/(size-1)`. const syncHuntFinalSize = max(2, syncHuntQuerySize) - var maxStep: typeof(request.skip) + var maxStep = 0u let fullRangeClamped = - if sp.huntHigh <= sp.huntLow: typeof(maxStep)(0) - else: min(high(typeof(maxStep)).toBlockNumber, - sp.huntHigh - sp.huntLow).truncate(typeof(maxStep)) - 1 + if sp.hunt.highNumber <= sp.hunt.lowNumber: 0u + else: min(high(uint).toBlockNumber, + sp.hunt.highNumber - sp.hunt.lowNumber).truncate(uint) - 1 if fullRangeClamped >= syncHuntFinalSize: # `SyncHuntRangeFinal` condition. 
maxStep = if syncHuntQuerySize == 1: @@ -420,28 +424,28 @@ proc peerSyncChainRequest(sp: SnapPeerEx, request: var BlocksRequest) = doAssert syncHuntFinalSize >= syncHuntQuerySize doAssert maxStep >= 1 # Ensured by the above assertion. - # Check for exponential search (expanding). Iterate `huntStep`. O(log N) - # requires `startBlock` to be offset from `huntLow`/`huntHigh`. - if sp.syncMode in {SyncHuntForward, SyncHuntBackward} and + # Check for exponential search (expanding). Iterate `hunt.step`. O(log N) + # requires `startBlock` to be offset from `hunt.lowNumber`/`hunt.highNumber`. + if sp.hunt.syncMode in {SyncHuntForward, SyncHuntBackward} and fullRangeClamped >= syncHuntFinalSize: - let forward = sp.syncMode == SyncHuntForward + let forward = sp.hunt.syncMode == SyncHuntForward let expandShift = if forward: syncHuntForwardExpandShift else: syncHuntBackwardExpandShift # Switches to range search when this condition is no longer true. - if sp.huntStep < maxStep shr expandShift: + if sp.hunt.step < maxStep shr expandShift: # The `if` above means the next line cannot overflow. - sp.huntStep = if sp.huntStep > 0: sp.huntStep shl expandShift else: 1 + sp.hunt.step = if sp.hunt.step > 0: sp.hunt.step shl expandShift else: 1 # Satisfy the O(log N) convergence conditions. - request.startBlock.number = - if forward: sp.huntLow + sp.huntStep.toBlockNumber - else: sp.huntHigh - (sp.huntStep * syncHuntQuerySize).toBlockNumber - request.maxResults = syncHuntQuerySize - request.skip = sp.huntStep - 1 + result.startBlock.number = + if forward: sp.hunt.lowNumber + sp.hunt.step.toBlockNumber + else: sp.hunt.highNumber - (sp.hunt.step * syncHuntQuerySize).toBlockNumber + result.maxResults = syncHuntQuerySize + result.skip = sp.hunt.step - 1 return # For tracing/display. - sp.huntStep = maxStep - sp.syncMode = SyncHuntRange + sp.hunt.step = maxStep + sp.hunt.syncMode = SyncHuntRange if maxStep > 0: # Quasi-binary search (converging in a range). O(log N) requires # `startBlock` to satisfy the constraints described above, with the @@ -451,9 +455,9 @@ proc peerSyncChainRequest(sp: SnapPeerEx, request: var BlocksRequest) = var offset = fullRangeClamped - maxStep * (syncHuntQuerySize-1) # Rounding must bias towards end to ensure `offset >= 1` after this. offset -= offset shr 1 - request.startBlock.number = sp.huntLow + offset.toBlockNumber - request.maxResults = syncHuntQuerySize - request.skip = maxStep - 1 + result.startBlock.number = sp.hunt.lowNumber + offset.toBlockNumber + result.maxResults = syncHuntQuerySize + result.skip = maxStep - 1 else: # Small range, final step. At `fullRange == 0` we must query at least one # block before and after the range to confirm the canonical head boundary, @@ -469,18 +473,18 @@ proc peerSyncChainRequest(sp: SnapPeerEx, request: var BlocksRequest) = before = max(before + afterSoftMax, extra) - afterSoftMax before = min(before, beforeHardMax) # See `SyncLocked` case. 
- request.startBlock.number = - if sp.bestBlockNumber <= before.toBlockNumber: 1.toBlockNumber - else: min(sp.bestBlockNumber - before.toBlockNumber, + result.startBlock.number = + if sp.hunt.bestNumber <= before.toBlockNumber: 1.toBlockNumber + else: min(sp.hunt.bestNumber - before.toBlockNumber, high(BlockNumber) - (syncHuntFinalSize - 1).toBlockNumber) - request.maxResults = syncHuntFinalSize - sp.syncMode = SyncHuntRangeFinal + result.maxResults = syncHuntFinalSize + sp.hunt.syncMode = SyncHuntRangeFinal # ------------------------------------------------------------------------------ # Public functions # ------------------------------------------------------------------------------ -proc peerHuntCanonical*(sp: SnapPeerEx) {.async.} = +proc collectBlockHeaders*(sp: SnapPeer) {.async.} = ## Query a peer to update our knowledge of its canonical chain and its best ## block, which is its canonical chain head. This can be called at any time ## after a peer has negotiated the connection. @@ -492,52 +496,41 @@ proc peerHuntCanonical*(sp: SnapPeerEx) {.async.} = ## All replies to this query are part of the peer's canonical chain at the ## time the peer sends them. - # If we send multiple `GetBlockHeaders` requests, the replies can be out of - # order, and prior to eth/66 there is no request-id. We'll avoid this - # problem by never sending overlapping `GetBlockHeaders` to the same peer. - if sp.pendingGetBlockHeaders: - #trace ">| Blocked overlapping eth.GetBlockHeaders (0x03)", peer=sp - await sleepAsync(chronos.milliseconds(500)) - return - sp.pendingGetBlockHeaders = true + let request = sp.peerSyncChainRequest - var request {.noinit.}: BlocksRequest - sp.peerSyncChainRequest(request) - - traceSendSending "GetBlockHeaders", peer=sp, count=request.maxResults, + trace trEthSendSending & "GetBlockHeaders", peer=sp, + count=request.maxResults, startBlock=sp.ns.pp(request.startBlock), step=request.traceStep inc sp.stats.ok.getBlockHeaders - var reply: typeof await sp.peer.getBlockHeaders(request) + var reply: Option[protocol.blockHeadersObj] try: reply = await sp.peer.getBlockHeaders(request) except CatchableError as e: - traceRecvError "waiting for reply to GetBlockHeaders", - peer=sp, error=e.msg + trace trEthRecvError & "waiting for reply to GetBlockHeaders", peer=sp, + error=e.msg inc sp.stats.major.networkErrors - sp.stopped = true + sp.ctrl.runState = SyncStopped return if reply.isNone: - traceRecvTimeoutWaiting "for reply to GetBlockHeaders", peer=sp + trace trEthRecvTimeoutWaiting & "for reply to GetBlockHeaders", peer=sp # TODO: Should disconnect? inc sp.stats.minor.timeoutBlockHeaders return let nHeaders = reply.get.headers.len if nHeaders == 0: - traceRecvGot "EMPTY reply BlockHeaders", peer=sp, got=0, - requested=request.maxResults + trace trEthRecvGot & "EMPTY reply BlockHeaders", peer=sp, + got=0, requested=request.maxResults else: - traceRecvGot "reply BlockHeaders", peer=sp, got=nHeaders, - requested=request.maxResults, + trace trEthRecvGot & "reply BlockHeaders", peer=sp, + got=nHeaders, requested=request.maxResults, firstBlock=reply.get.headers[0].blockNumber, lastBlock=reply.get.headers[^1].blockNumber - sp.pendingGetBlockHeaders = false - if request.maxResults.int < nHeaders: - traceRecvProtocolViolation "excess headers in BlockHeaders", + trace trEthRecvProtocolViolation & "excess headers in BlockHeaders", peer=sp, got=nHeaders, requested=request.maxResults # TODO: Should disconnect. 
     inc sp.stats.major.excessBlockHeaders
@@ -549,6 +542,9 @@ proc peerHuntCanonical*(sp: SnapPeerEx) {.async.} =
   else:
     sp.peerSyncChainEmptyReply(request)

+proc collectDataSetup*(sp: SnapPeer) =
+  sp.replyDataSetup
+
 # ------------------------------------------------------------------------------
 # End
 # ------------------------------------------------------------------------------
diff --git a/nimbus/sync/snap/pie/common.nim b/nimbus/sync/snap/peer/common.nim
similarity index 60%
rename from nimbus/sync/snap/pie/common.nim
rename to nimbus/sync/snap/peer/common.nim
index a1bc85fff..0a5223e18 100644
--- a/nimbus/sync/snap/pie/common.nim
+++ b/nimbus/sync/snap/peer/common.nim
@@ -15,31 +15,34 @@ import
   chronicles,
   eth/[common/eth_types, p2p],
   stint,
-  ../path_desc,
-  "."/[peer_desc, sync_desc]
+  ".."/[base_desc, path_desc],
+  ./sync_fetch_xdesc

 {.push raises: [Defect].}

-proc hasSlice*(sp: SnapPeerEx): bool =
-  ## Return `true` iff `getSlice` would return a free slice to work on.
-  if sp.nsx.sharedFetch.isNil:
-    sp.nsx.sharedFetch = SharedFetchState.new
-  result = 0 < sp.nsx.sharedFetch.leafRanges.len
-  trace "Snap: hasSlice", peer=sp, hasSlice=result
+logScope:
+  topics = "snap peer common"

-proc getSlice*(sp: SnapPeerEx, leafLow, leafHigh: var LeafPath): bool =
+proc hasSlice*(sp: SnapPeer): bool =
+  ## Return `true` iff `getSlice` would return a free slice to work on.
+  if sp.ns.sharedFetchEx.isNil:
+    sp.ns.sharedFetchEx = SnapSyncFetchEx.new
+  result = 0 < sp.ns.sharedFetchEx.leafRanges.len
+  trace "hasSlice", peer=sp, hasSlice=result
+
+proc getSlice*(sp: SnapPeer, leafLow, leafHigh: var LeafPath): bool =
   ## Claim a free slice to work on. If a slice was available, it's claimed,
   ## `leafLow` and `leafHigh` are set to the slice range and `true` is
   ## returned. Otherwise `false` is returned.
-  if sp.nsx.sharedFetch.isNil:
-    sp.nsx.sharedFetch = SharedFetchState.new
-  let sharedFetch = sp.nsx.sharedFetch
+  if sp.ns.sharedFetchEx.isNil:
+    sp.ns.sharedFetchEx = SnapSyncFetchEx.new
+  let sharedFetch = sp.ns.sharedFetchEx
   template ranges: auto = sharedFetch.leafRanges
   const leafMaxFetchRange = (high(LeafPath) - low(LeafPath)) div 1000

   if ranges.len == 0:
-    trace "Snap: getSlice", leafRange="none"
+    trace "GetSlice", leafRange="none"
     return false
   leafLow = ranges[0].leafLow
   if ranges[0].leafHigh - ranges[0].leafLow <= leafMaxFetchRange:
@@ -48,16 +51,16 @@ proc getSlice*(sp: SnapPeerEx, leafLow, leafHigh: var LeafPath): bool =
   else:
     leafHigh = leafLow + leafMaxFetchRange
     ranges[0].leafLow = leafHigh + 1
-    trace "Snap: getSlice", peer=sp, leafRange=pathRange(leafLow, leafHigh)
+    trace "GetSlice", peer=sp, leafRange=pathRange(leafLow, leafHigh)
   return true

-proc putSlice*(sp: SnapPeerEx, leafLow, leafHigh: LeafPath) =
+proc putSlice*(sp: SnapPeer, leafLow, leafHigh: LeafPath) =
   ## Return a slice to the free list, merging with the rest of the list.
-  let sharedFetch = sp.nsx.sharedFetch
+  let sharedFetch = sp.ns.sharedFetchEx
   template ranges: auto = sharedFetch.leafRanges

-  trace "Snap: putSlice", leafRange=pathRange(leafLow, leafHigh), peer=sp
+  trace "PutSlice", leafRange=pathRange(leafLow, leafHigh), peer=sp

   var i = 0
   while i < ranges.len and leafLow > ranges[i].leafHigh:
     inc i
@@ -79,25 +82,25 @@ proc putSlice*(sp: SnapPeerEx, leafLow, leafHigh: LeafPath) =
     if leafHigh > ranges[i].leafHigh:
       ranges[i].leafHigh = leafHigh

-template getSlice*(sp: SnapPeerEx, leafRange: var LeafRange): bool =
+template getSlice*(sp: SnapPeer, leafRange: var LeafRange): bool =
   sp.getSlice(leafRange.leafLow, leafRange.leafHigh)

-template putSlice*(sp: SnapPeerEx, leafRange: LeafRange) =
+template putSlice*(sp: SnapPeer, leafRange: LeafRange) =
   sp.putSlice(leafRange.leafLow, leafRange.leafHigh)

-proc countSlice*(sp: SnapPeerEx, leafLow, leafHigh: LeafPath, which: bool) =
+proc countSlice*(sp: SnapPeer, leafLow, leafHigh: LeafPath, which: bool) =
   doAssert leafLow <= leafHigh
-  sp.nsx.sharedFetch.countRange += leafHigh - leafLow + 1
-  sp.nsx.sharedFetch.countRangeStarted = true
+  sp.ns.sharedFetchEx.countRange += leafHigh - leafLow + 1
+  sp.ns.sharedFetchEx.countRangeStarted = true
   if which:
-    sp.nsx.sharedFetch.countRangeSnap += leafHigh - leafLow + 1
-    sp.nsx.sharedFetch.countRangeSnapStarted = true
+    sp.ns.sharedFetchEx.countRangeSnap += leafHigh - leafLow + 1
+    sp.ns.sharedFetchEx.countRangeSnapStarted = true
   else:
-    sp.nsx.sharedFetch.countRangeTrie += leafHigh - leafLow + 1
-    sp.nsx.sharedFetch.countRangeTrieStarted = true
+    sp.ns.sharedFetchEx.countRangeTrie += leafHigh - leafLow + 1
+    sp.ns.sharedFetchEx.countRangeTrieStarted = true

-template countSlice*(sp: SnapPeerEx, leafRange: LeafRange, which: bool) =
+template countSlice*(sp: SnapPeer, leafRange: LeafRange, which: bool) =
   sp.countSlice(leafRange.leafLow, leafRange.leafHigh, which)

-proc countAccounts*(sp: SnapPeerEx, len: int) =
-  sp.nsx.sharedFetch.countAccounts += len
+proc countAccounts*(sp: SnapPeer, len: int) =
+  sp.ns.sharedFetchEx.countAccounts += len
diff --git a/nimbus/sync/snap/pie/slicer.nim b/nimbus/sync/snap/peer/fetch.nim
similarity index 51%
rename from nimbus/sync/snap/pie/slicer.nim
rename to nimbus/sync/snap/peer/fetch.nim
index 1048aba4a..9ad7bedc5 100644
--- a/nimbus/sync/snap/pie/slicer.nim
+++ b/nimbus/sync/snap/peer/fetch.nim
@@ -9,51 +9,55 @@
 # at your option. This file may not be copied, modified, or distributed
 # except according to those terms.

-{.push raises: [Defect].}
-
 import
   std/[sets, random],
   chronos,
   nimcrypto/keccak,
   stint,
   eth/[common/eth_types, p2p],
-  ".."/[path_desc, base_desc, types],
-  "."/[common, fetch_trie, fetch_snap, peer_desc]
+  ../../types,
+  ".."/[path_desc, base_desc],
+  "."/[common, fetch_trie, fetch_snap]
+
+{.push raises: [Defect].}
+
+logScope:
+  topics = "snap peer fetch"

-# Note: To test disabling snap (or trie), modify `peerSupportsGetNodeData` or
-# `peerSupportsSnap` where those are defined.
+# Note: To test disabling snap (or trie), modify `fetchTrieOk` or
+# `fetchSnapOk` where those are defined.
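
# ------------------------------------------------------------------------------
# Aside: a standalone sketch (not part of the patch) of the slice free-list
# protocol implemented by `getSlice`/`putSlice` above, with `LeafPath`
# replaced by plain integers and a simplified merge. The real `putSlice`
# splices the returned range into the ordered list in place.
# ------------------------------------------------------------------------------

import std/algorithm

type SliceRange = tuple[lo, hi: int]

var freeList: seq[SliceRange] = @[(0, 999)]
const maxFetch = 100                   # cf. `leafMaxFetchRange`

proc getSlice(lo, hi: var int): bool =
  # Claim at most `maxFetch + 1` leaves from the front of the free list.
  if freeList.len == 0: return false
  lo = freeList[0].lo
  if freeList[0].hi - freeList[0].lo <= maxFetch:
    hi = freeList[0].hi
    freeList.delete(0)                 # whole first range claimed
  else:
    hi = lo + maxFetch
    freeList[0].lo = hi + 1            # only the front of the range claimed
  true

proc putSlice(lo, hi: int) =
  # Return a range, merging adjacent or overlapping free ranges.
  freeList.add (lo, hi)
  freeList.sort(proc (x, y: SliceRange): int = cmp(x.lo, y.lo))
  var merged: seq[SliceRange]
  for r in freeList:
    if merged.len > 0 and r.lo <= merged[^1].hi + 1:
      merged[^1].hi = max(merged[^1].hi, r.hi)
    else:
      merged.add r
  freeList = merged

when isMainModule:
  var lo, hi: int
  doAssert getSlice(lo, hi)            # claims (0, 100)
  putSlice(lo, hi)                     # merges back into one free range
  doAssert freeList.len == 1 and freeList[0] == (lo: 0, hi: 999)

# ------------------------------------------------------------------------------
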
-proc stateFetch*(sp: SnapPeerEx) {.async.} = - var stateRoot = sp.syncStateRoot.get - trace "Snap: Syncing from stateRoot", peer=sp, stateRoot +proc fetch*(sp: SnapPeer) {.async.} = + var stateRoot = sp.ctrl.stateRoot.get + trace "Syncing from stateRoot", peer=sp, stateRoot while true: - if not sp.peerSupportsGetNodeData() and not sp.peerSupportsSnap(): - trace "Snap: Cannot sync more from this peer", peer=sp + if not sp.fetchTrieOk and not sp.fetchSnapOk: + trace "No more sync available from this peer", peer=sp return if not sp.hasSlice(): - trace "Snap: Nothing more to sync from this peer", peer=sp + trace "Nothing more to sync from this peer", peer=sp while not sp.hasSlice(): await sleepAsync(5.seconds) # TODO: Use an event trigger instead. - if sp.syncStateRoot.isNone: - trace "Snap: No current state root for this peer", peer=sp - while sp.syncStateRoot.isNone and - (sp.peerSupportsGetNodeData() or sp.peerSupportsSnap()) and + if sp.ctrl.stateRoot.isNone: + trace "No current state root for this peer", peer=sp + while sp.ctrl.stateRoot.isNone and + (sp.fetchTrieOk or sp.fetchSnapOk) and sp.hasSlice(): await sleepAsync(5.seconds) # TODO: Use an event trigger instead. continue - if stateRoot != sp.syncStateRoot.get: - trace "Snap: Syncing from new stateRoot", peer=sp, stateRoot - stateRoot = sp.syncStateRoot.get - sp.stopThisState = false + if stateRoot != sp.ctrl.stateRoot.get: + trace "Syncing from new stateRoot", peer=sp, stateRoot + stateRoot = sp.ctrl.stateRoot.get + sp.ctrl.runState = SyncRunningOK - if sp.stopThisState: - trace "Snap: Pausing sync until we get a new state root", peer=sp - while sp.syncStateRoot.isSome and stateRoot == sp.syncStateRoot.get and - (sp.peerSupportsGetNodeData() or sp.peerSupportsSnap()) and + if sp.ctrl.runState == SyncStopRequest: + trace "Pausing sync until we get a new state root", peer=sp + while sp.ctrl.stateRoot.isSome and stateRoot == sp.ctrl.stateRoot.get and + (sp.fetchTrieOk or sp.fetchSnapOk) and sp.hasSlice(): await sleepAsync(5.seconds) # TODO: Use an event trigger instead. continue @@ -63,17 +67,18 @@ proc stateFetch*(sp: SnapPeerEx) {.async.} = # Mix up different slice modes, because when connecting to static nodes one # mode or the other tends to dominate, which makes the mix harder to test. 
var allowSnap = true - if sp.peerSupportsSnap() and sp.peerSupportsGetNodeData(): + if sp.fetchSnapOk and sp.fetchTrieOk: if rand(99) < 50: allowSnap = false - if sp.peerSupportsSnap() and allowSnap: + if sp.fetchSnapOk and allowSnap: discard sp.getSlice(leafRange) - trace "Snap: snap.GetAccountRange segment", peer=sp, + trace "GetAccountRange segment", peer=sp, leafRange=pathRange(leafRange.leafLow, leafRange.leafHigh), stateRoot - await sp.snapFetch(stateRoot, leafRange) - elif sp.peerSupportsGetNodeData(): + await sp.fetchSnap(stateRoot, leafRange) + + elif sp.fetchTrieOk: discard sp.getSlice(leafRange) - trace "Snap: eth.GetNodeData segment", peer=sp, + trace "GetNodeData segment", peer=sp, leafRange=pathRange(leafRange.leafLow, leafRange.leafHigh), stateRoot - await sp.trieFetch(stateRoot, leafRange) + await sp.fetchTrie(stateRoot, leafRange) diff --git a/nimbus/sync/snap/pie/fetch_snap.nim b/nimbus/sync/snap/peer/fetch_snap.nim similarity index 73% rename from nimbus/sync/snap/pie/fetch_snap.nim rename to nimbus/sync/snap/peer/fetch_snap.nim index 4ff3c6093..13c137b53 100644 --- a/nimbus/sync/snap/pie/fetch_snap.nim +++ b/nimbus/sync/snap/peer/fetch_snap.nim @@ -19,38 +19,42 @@ ## different related tries (blocks at different times) together in a way that ## eventually becomes a full trie for a single block. -{.push raises: [Defect].} - import std/sets, chronos, eth/[common/eth_types, p2p], nimcrypto/keccak, - stint, - "../.."/[protocol, protocol/pickeled_snap_tracers, trace_helper], - ".."/[base_desc, path_desc, types], - "."/[common, peer_desc] + #stint, + "../.."/[protocol, types], + ".."/[base_desc, path_desc], + ./common + +{.push raises: [Defect].} + +logScope: + topics = "snap peer fetch" const snapRequestBytesLimit = 2 * 1024 * 1024 ## Soft bytes limit to request in `snap` protocol calls. 
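
# ------------------------------------------------------------------------------
# Aside: a sketch (not part of the patch) of the ranged-fetch pattern that
# `fetchSnap` below implements: request accounts in [origin, limit] under the
# byte soft limit, then resume after the last account actually returned.
# Types and the network call are stand-ins; the real code uses `LeafPath`,
# `snap.getAccountRange` and the proof handling shown in the patch.
# ------------------------------------------------------------------------------

type
  AccountKey = uint64                  # stands in for a 256-bit leaf path
  RangeReply = object
    lastKey: AccountKey                # highest account path in the reply
    rangeDone: bool                    # proof says nothing follows `lastKey`

proc fetchRange(origin, limit: AccountKey): RangeReply =
  # Stand-in for the network call; a real reply is truncated once the
  # 2 MiB soft limit (`snapRequestBytesLimit`) is reached.
  let last = min(origin + 1000, limit)
  RangeReply(lastKey: last, rangeDone: last == limit)

proc syncAccounts(lo, hi: AccountKey) =
  var origin = lo
  while true:
    let reply = fetchRange(origin, hi)
    if reply.rangeDone or reply.lastKey == high(AccountKey):
      break
    origin = reply.lastKey + 1         # resume just past the last result

when isMainModule:
  syncAccounts(0, 5000)

# ------------------------------------------------------------------------------
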
-proc snapFetch*(sp: SnapPeerEx, stateRoot: TrieHash, leafRange: LeafRange)
+proc fetchSnap*(sp: SnapPeer, stateRoot: TrieHash, leafRange: LeafRange)
     {.async.} =
+  ## Fetch data using the `snap#` protocol.
   var origin = leafRange.leafLow
   var limit = leafRange.leafHigh
   const responseBytes = 2 * 1024 * 1024

-  if sp.stopped:
-    traceRecvError "peer already disconnected, not sending GetAccountRange",
+  if sp.ctrl.runState == SyncStopped:
+    trace trSnapRecvError &
+      "peer already disconnected, not sending GetAccountRange",
       peer=sp, accountRange=pathRange(origin, limit),
-      stateRoot=($stateRoot), bytesLimit=snapRequestBytesLimit
+      stateRoot, bytesLimit=snapRequestBytesLimit
     sp.putSlice(leafRange)

-  if tracePackets:
-    traceSendSending "GetAccountRange",
-      accountRange=pathRange(origin, limit),
-      stateRoot=($stateRoot), bytesLimit=snapRequestBytesLimit, peer=sp
+  trace trSnapSendSending & "GetAccountRange", peer=sp,
+    accountRange=pathRange(origin, limit),
+    stateRoot, bytesLimit=snapRequestBytesLimit

   var reply: Option[accountRangeObj]
@@ -58,16 +62,15 @@ proc snapFetch*(sp: SnapPeerEx, stateRoot: TrieHash, leafRange: LeafRange)
     reply = await sp.peer.getAccountRange(
       stateRoot.untie, origin, limit, snapRequestBytesLimit)
   except CatchableError as e:
-    traceRecvError "waiting for reply to GetAccountRange",
-      peer=sp, error=e.msg
+    trace trSnapRecvError & "waiting for reply to GetAccountRange", peer=sp,
+      error=e.msg
     inc sp.stats.major.networkErrors
-    sp.stopped = true
+    sp.ctrl.runState = SyncStopped
     sp.putSlice(leafRange)
     return

   if reply.isNone:
-    traceRecvTimeoutWaiting "for reply to GetAccountRange",
-      peer=sp
+    trace trSnapRecvTimeoutWaiting & "for reply to GetAccountRange", peer=sp
     sp.putSlice(leafRange)
     return

@@ -82,6 +85,7 @@ proc snapFetch*(sp: SnapPeerEx, stateRoot: TrieHash, leafRange: LeafRange)
   template proof: auto = accountsAndProof.proof

   let len = accounts.len
+  let requestedRange = pathRange(origin, limit)
   if len == 0:
     # If there's no proof, this reply means the peer has no accounts available
     # in the range for this query. But if there's a proof, this reply means
     # there are no more accounts starting at path `origin` up to the maximum
     # account path.
     # This makes all the difference to terminating the fetch. For now we'll
     # trust the mere existence of the proof rather than verifying it.
     if proof.len == 0:
-      traceRecvGot "EMPTY reply AccountRange", peer=sp,
-        got=len, proofLen=proof.len, gotRange="-",
-        requestedRange=pathRange(origin, limit), stateRoot=($stateRoot)
+      trace trSnapRecvGot & "EMPTY reply AccountRange", peer=sp,
+        got=len, proofLen=proof.len, gotRange="-", requestedRange, stateRoot
       sp.putSlice(leafRange)
       # Don't keep retrying snap for this state.
-      sp.stopThisState = true
+      sp.ctrl.runState = SyncStopRequest
     else:
-      traceRecvGot "END reply AccountRange", peer=sp,
+      trace trSnapRecvGot & "END reply AccountRange", peer=sp,
         got=len, proofLen=proof.len, gotRange=pathRange(origin, high(LeafPath)),
-        requestedRange=pathRange(origin, limit), stateRoot=($stateRoot)
+        requestedRange, stateRoot
       # Current slicer can't accept more result data than was requested, so
       # just leave the requested slice claimed and update statistics.
sp.countSlice(origin, limit, true) return var lastPath = accounts[len-1].accHash - traceRecvGot "reply AccountRange", peer=sp, + trace trSnapRecvGot & "reply AccountRange", peer=sp, got=len, proofLen=proof.len, gotRange=pathRange(origin, lastPath), - requestedRange=pathRange(origin, limit), stateRoot=($stateRoot) + requestedRange, stateRoot # Missing proof isn't allowed, unless `origin` is min path in which case # there might be no proof if the result spans the entire range. if proof.len == 0 and origin != low(LeafPath): - traceRecvProtocolViolation "missing proof in AccountRange", + trace trSnapRecvProtocolViolation & "missing proof in AccountRange", peer=sp, got=len, proofLen=proof.len, gotRange=pathRange(origin,lastPath), - requestedRange=pathRange(origin, limit), stateRoot=($stateRoot) + requestedRange, stateRoot sp.putSlice(leafRange) return @@ -134,5 +137,7 @@ proc snapFetch*(sp: SnapPeerEx, stateRoot: TrieHash, leafRange: LeafRange) sp.countAccounts(keepAccounts) -proc peerSupportsSnap*(sp: SnapPeerEx): bool = - not sp.stopped and sp.peer.supports(snap) +proc fetchSnapOk*(sp: SnapPeer): bool = + ## Sort of getter: if `true`, fetching data using the `snap#` protocol + ## is supported. + sp.ctrl.runState != SyncStopped and sp.peer.supports(snap) diff --git a/nimbus/sync/snap/pie/fetch_trie.nim b/nimbus/sync/snap/peer/fetch_trie.nim similarity index 73% rename from nimbus/sync/snap/pie/fetch_trie.nim rename to nimbus/sync/snap/peer/fetch_trie.nim index 06ef4bf90..f9f5cf489 100644 --- a/nimbus/sync/snap/pie/fetch_trie.nim +++ b/nimbus/sync/snap/peer/fetch_trie.nim @@ -26,12 +26,15 @@ import std/[sets, tables, algorithm], chronos, eth/[common/eth_types, p2p], - ../../trace_helper, - ".."/[base_desc, get_nodedata, path_desc, types, validate_trienode], - "."/[common, peer_desc, sync_desc] + "../.."/[protocol/trace_config, types], + ".."/[base_desc, path_desc], + "."/[common, reply_data, sync_fetch_xdesc, validate_trienode] {.push raises: [Defect].} +logScope: + topics = "snap peer fetch" + const maxBatchGetNodeData = 384 ## Maximum number of node hashes to batch per `GetNodeData` request. @@ -40,61 +43,69 @@ const ## Maximum number of `GetNodeData` requests in parallel to a single peer. type - SingleNodeRequestEx = ref object of SingleNodeRequestBase - hash: NodeHash - path: InteriorPath - future: Future[Blob] + SingleNodeRequest = ref object + hash: NodeHash + path: InteriorPath + future: Future[Blob] -proc hash(n: SingleNodeRequestBase): NodeHash = - n.SingleNodeRequestEx.hash + FetchStateEx = ref object of SnapPeerFetchBase + ## Account fetching state on a single peer. + sp: SnapPeer + nodeGetQueue: seq[SingleNodeRequest] + nodeGetsInFlight: int + scheduledBatch: bool + progressPrefix: string + progressCount: int + nodesInFlight: int + getNodeDataErrors: int + leafRange: LeafRange + unwindAccounts: int64 + unwindAccountBytes: int64 + finish: Future[void] -proc path(n: SingleNodeRequestBase): InteriorPath = - n.SingleNodeRequestEx.path +proc fetchStateEx(sp: SnapPeer): FetchStateEx = + sp.fetchState.FetchStateEx -proc future(n: SingleNodeRequestBase): Future[Blob] = - n.SingleNodeRequestEx.future +proc `fetchStateEx=`(sp: SnapPeer; value: FetchStateEx) = + sp.fetchState = value +proc new(T: type FetchStateEx; peer: SnapPeer): T = + FetchStateEx(sp: peer) # Forward declaration. 
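
# ------------------------------------------------------------------------------
# Aside: `FetchStateEx` above follows a pattern this patch applies in several
# places (`requestsEx` and `sharedFetchEx` too): a module-private extension
# type derived from a shared base, wrapped in a getter/setter pair that hides
# the type conversion. A minimal standalone illustration with made-up names:
# ------------------------------------------------------------------------------

type
  PeerFetchBase = ref object of RootObj  # stored in the shared descriptor
  DemoPeer = ref object
    fetchState: PeerFetchBase            # opaque to other modules

  TrieFetchState = ref object of PeerFetchBase
    errors: int                          # module-private detail

proc trieFetchState(p: DemoPeer): TrieFetchState =
  p.fetchState.TrieFetchState            # checked downcast on read

proc `trieFetchState=`(p: DemoPeer; v: TrieFetchState) =
  p.fetchState = v                       # implicit upcast on write

when isMainModule:
  let p = DemoPeer()
  if p.trieFetchState.isNil:
    p.trieFetchState = TrieFetchState()
  inc p.trieFetchState.errors

# ------------------------------------------------------------------------------
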
-proc scheduleBatchGetNodeData(fetch: FetchState) {.gcsafe.} +proc scheduleBatchGetNodeData(fetch: FetchStateEx) {.gcsafe.} -# --- +# ------------------------------------------------------------------------------ +# Private functions +# ------------------------------------------------------------------------------ -proc wrapCallGetNodeData(fetch: FetchState, hashes: seq[NodeHash], +proc wrapCallGetNodeData(fetch: FetchStateEx, hashes: seq[NodeHash], futures: seq[Future[Blob]], pathFrom, pathTo: InteriorPath) {.async.} = inc fetch.nodeGetsInFlight - let reply = await fetch.sp.getNodeData(hashes, pathFrom, pathTo) + let reply = await ReplyData.new(fetch.sp, hashes, pathFrom, pathTo) # Timeout, packet and packet error trace messages are done in `get_nodedata`, # where there is more context than here. Here we always received just valid # data with hashes already verified, or empty list of `nil`. - if reply.isNil: - # Timeout or error. - fetch.sp.stopThisState = true - for i in 0 ..< futures.len: - futures[i].complete(@[]) - elif reply.hashVerifiedData.len == 0: - # Empty reply, matched to request. - # It means there are none of the nodes available, but it's not an error. - fetch.sp.stopThisState = true + if reply.replyType == NoReplyData: + # Empty reply, timeout or error (i.e. `reply.isNil`). + # It means there are none of the nodes available. + fetch.sp.ctrl.runState = SyncStopRequest for i in 0 ..< futures.len: futures[i].complete(@[]) + else: # Non-empty reply. for i in 0 ..< futures.len: - let index = reply.reverseMap(i) - if index >= 0: - futures[i].complete(reply.hashVerifiedData[index]) - else: - futures[i].complete(@[]) + futures[i].complete(reply[i]) dec fetch.nodeGetsInFlight # Receiving a reply may allow more requests to be sent. if fetch.nodeGetQueue.len > 0 and not fetch.scheduledBatch: fetch.scheduleBatchGetNodeData() -proc batchGetNodeData(fetch: FetchState) = +proc batchGetNodeData(fetch: FetchStateEx) = var i = fetch.nodeGetQueue.len if i == 0 or fetch.nodeGetsInFlight >= maxParallelGetNodeData: return @@ -140,7 +151,7 @@ proc batchGetNodeData(fetch: FetchState) = # internally (like SQLite by default), the left-to-right write order will # improve read performance when other peers sync reading this local node. - proc cmpSingleNodeRequest(x, y: SingleNodeRequestBase): int = + proc cmpSingleNodeRequest(x, y: SingleNodeRequest): int = # `x` and `y` are deliberately swapped to get descending order. See above. cmp(y.path, x.path) sort(fetch.nodeGetQueue, cmpSingleNodeRequest) @@ -148,7 +159,7 @@ proc batchGetNodeData(fetch: FetchState) = trace "Trie: Sort length", sortLen=i # If stopped, abort all waiting nodes, so they clean up. - if fetch.sp.stopThisState or fetch.sp.stopped: + if fetch.sp.ctrl.runState != SyncRunningOk: while i > 0: fetch.nodeGetQueue[i].future.complete(@[]) dec i @@ -177,26 +188,27 @@ proc batchGetNodeData(fetch: FetchState) = futures.setLen(0) fetch.nodeGetQueue.setLen(i) -proc scheduleBatchGetNodeData(fetch: FetchState) = +proc scheduleBatchGetNodeData(fetch: FetchStateEx) = if not fetch.scheduledBatch: fetch.scheduledBatch = true proc batchGetNodeData(arg: pointer) = - let fetch = cast[FetchState](arg) + let fetch = cast[FetchStateEx](arg) fetch.scheduledBatch = false fetch.batchGetNodeData() # We rely on `callSoon` scheduling for the _end_ of the current run list, # after other async functions finish adding more single node requests. 
     callSoon(batchGetNodeData, cast[pointer](fetch))

-proc getNodeData(fetch: FetchState,
+proc getNodeData(fetch: FetchStateEx,
                  hash: TrieHash, path: InteriorPath): Future[Blob] {.async.} =
   ## Request _one_ item of trie node data asynchronously. This function
   ## batches requests into larger `eth.GetNodeData` requests efficiently.
-  traceIndividualNode "> Fetching individual NodeData", peer=fetch.sp,
-    depth=path.depth, path, hash=($hash)
+  when trEthTraceIndividualNodesOk:
+    trace "> Fetching individual NodeData", peer=fetch.sp,
+      depth=path.depth, path, hash=($hash)

   let future = newFuture[Blob]()
-  fetch.nodeGetQueue.add SingleNodeRequestEx(
+  fetch.nodeGetQueue.add SingleNodeRequest(
     hash: hash.NodeHash,
     path: path,
     future: future)
@@ -205,23 +217,24 @@ proc getNodeData(fetch: FetchState,
     fetch.scheduleBatchGetNodeData()
   let nodeBytes = await future

-  if fetch.sp.stopThisState or fetch.sp.stopped:
+  if fetch.sp.ctrl.runState != SyncRunningOk:
     return nodebytes

-  if tracePackets:
+  when trEthTracePacketsOk:
     doAssert nodeBytes.len == 0 or nodeBytes.toNodeHash == hash

-  if nodeBytes.len > 0:
-    traceIndividualNode "< Received individual NodeData", peer=fetch.sp,
-      depth=path.depth, path, hash=($hash),
-      nodeLen=nodeBytes.len, nodeBytes
-  else:
-    traceIndividualNode "< Received EMPTY individual NodeData", peer=fetch.sp,
-      depth=path.depth, path, hash,
-      nodeLen=nodeBytes.len
+  when trEthTraceIndividualNodesOk:
+    if nodeBytes.len > 0:
+      trace "< Received individual NodeData", peer=fetch.sp,
+        depth=path.depth, path, hash=($hash),
+        nodeLen=nodeBytes.len, nodeBytes
+    else:
+      trace "< Received EMPTY individual NodeData", peer=fetch.sp,
+        depth=path.depth, path, hash,
+        nodeLen=nodeBytes.len
   return nodeBytes

-proc pathInRange(fetch: FetchState, path: InteriorPath): bool =
+proc pathInRange(fetch: FetchStateEx, path: InteriorPath): bool =
   # TODO: This method is ugly and unnecessarily slow.
   var compare = fetch.leafRange.leafLow.toInteriorPath
   while compare.depth > path.depth:
@@ -235,23 +248,23 @@ proc pathInRange(fetch: FetchState, path: InteriorPath): bool =
       return false
   return true

-proc traverse(fetch: FetchState, hash: NodeHash, path: InteriorPath,
+proc traverse(fetch: FetchStateEx, hash: NodeHash, path: InteriorPath,
               fromExtension: bool) {.async.} =
   template errorReturn() =
-    fetch.sp.stopThisState = true
+    fetch.sp.ctrl.runState = SyncStopRequest
     dec fetch.nodesInFlight
     if fetch.nodesInFlight == 0:
       fetch.finish.complete()
     return

   # If something triggered stop earlier, don't request, and clean up now.
-  if fetch.sp.stopThisState or fetch.sp.stopped:
+  if fetch.sp.ctrl.runState != SyncRunningOk:
     errorReturn()

   let nodeBytes = await fetch.getNodeData(hash.TrieHash, path)

   # If something triggered stop, clean up now.
-  if fetch.sp.stopThisState or fetch.sp.stopped:
+  if fetch.sp.ctrl.runState != SyncRunningOk:
     errorReturn()

   # Don't keep emitting error messages after one error. We'll allow 10.
if fetch.getNodeDataErrors >= 10: @@ -296,14 +309,14 @@ proc traverse(fetch: FetchState, hash: NodeHash, path: InteriorPath, template leafBytes: auto = leafPtr[2] inc fetch.unwindAccounts fetch.unwindAccountBytes += leafBytes.len - inc fetch.sp.nsx.sharedFetch.countAccounts - fetch.sp.nsx.sharedFetch.countAccountBytes += leafBytes.len + inc fetch.sp.ns.sharedFetchEx.countAccounts + fetch.sp.ns.sharedFetchEx.countAccountBytes += leafBytes.len dec fetch.nodesInFlight if fetch.nodesInFlight == 0: fetch.finish.complete() -proc probeGetNodeData(sp: SnapPeerEx, stateRoot: TrieHash): Future[bool] +proc probeGetNodeData(sp: SnapPeer, stateRoot: TrieHash): Future[bool] {.async.} = # Before doing real trie traversal on this peer, send a probe request for # `stateRoot` to see if it's worth pursuing at all. We will avoid reserving @@ -323,15 +336,19 @@ proc probeGetNodeData(sp: SnapPeerEx, stateRoot: TrieHash): Future[bool] # send an empty reply. We don't want to cut off a peer for other purposes # such as a source of blocks and transactions, just because it doesn't # reply to `GetNodeData`. - let reply = await sp.getNodeData( - @[stateRoot.NodeHash], InteriorPath(), InteriorPath()) - return not reply.isNil and reply.hashVerifiedData.len == 1 + let reply = await ReplyData.new(sp, @[stateRoot.NodeHash]) + return reply.replyType == SingleEntryReply -proc trieFetch*(sp: SnapPeerEx, stateRoot: TrieHash, - leafRange: LeafRange) {.async.} = - if sp.fetchState.isNil: - sp.fetchState = FetchState(sp: sp) - template fetch: auto = sp.fetchState +# ------------------------------------------------------------------------------ +# Public functions +# ------------------------------------------------------------------------------ + +proc fetchTrie*(sp: SnapPeer, stateRoot: TrieHash, leafRange: LeafRange) + {.async.} = + if sp.fetchStateEx.isNil: + sp.fetchStateEx = FetchStateEx.new(sp) + + let fetch = sp.fetchStateEx fetch.leafRange = leafRange fetch.finish = newFuture[void]() @@ -344,10 +361,14 @@ proc trieFetch*(sp: SnapPeerEx, stateRoot: TrieHash, if fetch.getNodeDataErrors == 0: sp.countSlice(leafRange, false) else: - sp.nsx.sharedFetch.countAccounts -= fetch.unwindAccounts - sp.nsx.sharedFetch.countAccountBytes -= fetch.unwindAccountBytes + sp.ns.sharedFetchEx.countAccounts -= fetch.unwindAccounts + sp.ns.sharedFetchEx.countAccountBytes -= fetch.unwindAccountBytes sp.putSlice(leafRange) -proc peerSupportsGetNodeData*(sp: SnapPeerEx): bool = - template fetch(sp): FetchState = sp.fetchState - not sp.stopped and (sp.fetch.isNil or sp.fetch.getNodeDataErrors == 0) +proc fetchTrieOk*(sp: SnapPeer): bool = + sp.ctrl.runState != SyncStopped and + (sp.fetchStateEx.isNil or sp.fetchStateEx.getNodeDataErrors == 0) + +# ------------------------------------------------------------------------------ +# End +# ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/get_nodedata.nim b/nimbus/sync/snap/peer/reply_data.nim similarity index 68% rename from nimbus/sync/snap/get_nodedata.nim rename to nimbus/sync/snap/peer/reply_data.nim index a9f7477b6..2ce7d5e48 100644 --- a/nimbus/sync/snap/get_nodedata.nim +++ b/nimbus/sync/snap/peer/reply_data.nim @@ -60,40 +60,53 @@ ## matching. Before this module was written, we tended to accept whatever came ## and assume a lot about replies. It often worked but wasn't robust enough. 
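
# ------------------------------------------------------------------------------
# Aside: the core of the reply matching described above, stripped of the
# queue bookkeeping, in a standalone sketch (not this module's code). Every
# requested node is identified by its hash, so each reply blob can be hashed
# and matched back to a (request, index) pair even when replies arrive out of
# order. Types are simplified; the real code keys its table with
# `ptr NodeHash` to avoid copying hashes.
# ------------------------------------------------------------------------------

import std/tables
import nimcrypto/[hash, keccak]

type NodeHash = array[32, byte]

proc toNodeHash(data: seq[byte]): NodeHash =
  keccak256.digest(data).data

var itemHash: Table[NodeHash, tuple[requestId, index: int]]

proc registerRequest(requestId: int; hashes: seq[NodeHash]) =
  # Index every hash of an outstanding request.
  for i, h in hashes:
    itemHash[h] = (requestId, i)

proc matchReplyItem(data: seq[byte]): tuple[requestId, index: int] =
  # Returns (-1, -1) for unmatched items, a protocol violation upstream.
  itemHash.getOrDefault(data.toNodeHash, (-1, -1))

# ------------------------------------------------------------------------------
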
-{.push raises: [Defect].} - import std/[sequtils, sets, tables, hashes], chronos, eth/[common/eth_types, p2p], nimcrypto/keccak, stint, - ".."/[protocol, protocol/pickeled_eth_tracers], - "."/[base_desc, path_desc, pie/peer_desc, timer_helper, types] + "../.."/[protocol, protocol/trace_config, types], + ".."/[base_desc, path_desc, timer_helper] + +{.push raises: [Defect].} + +logScope: + topics = "snap reply" type - NodeDataRequest = ref object of NodeDataRequestBase - sp: SnapPeerEx - hashes: seq[NodeHash] - future: Future[NodeDataReply] - timer: TimerCallback - pathRange: (InteriorPath, InteriorPath) - fullHashed: bool + ReplyData* = ref object + ## Opaque object handle for reply message + reverseMap: seq[int] ## for reading out the `hashVerifiedData[]` + hashVerifiedData: seq[Blob] - NodeDataReply* = ref object - reverseMap: seq[int] # Access with `reversMap(i)` instead. - hashVerifiedData*: seq[Blob] + ReplyDataType* = enum + NoReplyData + SingleEntryReply + MultipleEntriesReply + RequestData = ref object + sp: SnapPeer + hashes: seq[NodeHash] + future: Future[ReplyData] + timer: TimerCallback + pathRange: (InteriorPath, InteriorPath) + fullHashed: bool -proc ex(base: NodeDataRequestBase): NodeDataRequest = - ## to extended object version - base.NodeDataRequest + RequestDataQueue = ref object of SnapPeerRequestsBase + liveRequests: HashSet[RequestData] + empties: int + # `OrderedSet` was considered instead of `seq` here, but it has a slow + # implementation of `excl`, defeating the motivation for using it. + waitingOnEmpties: seq[RequestData] + beforeFirstHash: seq[RequestData] + beforeFullHash: HashSet[RequestData] + # We need to be able to lookup requests by the hash of reply data. + # `ptr NodeHash` is used here so the table doesn't require an independent + # copy of the hash. The hash is part of the request object. 
+ itemHash: Table[ptr NodeHash, (RequestData,int)] -proc ex(pair: (NodeDataRequestBase,int)): (NodeDataRequest, int) = - ## to extended object version - (pair[0].ex, pair[1]) - -proc hash(request: NodeDataRequest|NodeDataRequestBase): Hash = +proc hash(request: RequestData): Hash = hash(cast[pointer](request)) proc hash(hash: ptr Hash256): Hash = @@ -102,67 +115,73 @@ proc hash(hash: ptr Hash256): Hash = proc `==`(hash1, hash2: ptr Hash256): bool = hash1[] == hash2[] +proc requestsEx(sp: SnapPeer): RequestDataQueue = + sp.requests.RequestDataQueue + +proc `requestsEx=`(sp: SnapPeer; value: RequestDataQueue) = + sp.requests = value + # ------------------------------------------------------------------------------ # Private logging helpers # ------------------------------------------------------------------------------ -template pathRange(request: NodeDataRequest): string = +template pathRange(request: RequestData): string = pathRange(request.pathRange[0], request.pathRange[1]) -proc traceGetNodeDataSending(request: NodeDataRequest) = - traceSendSending "GetNodeData", peer=request.sp, +proc traceGetNodeDataSending(request: RequestData) = + trace trEthSendSending & "GetNodeData", peer=request.sp, hashes=request.hashes.len, pathRange=request.pathRange -proc traceGetNodeDataDelaying(request: NodeDataRequest) = - traceSendDelaying "GetNodeData", peer=request.sp, +proc traceGetNodeDataDelaying(request: RequestData) = + trace trEthSendDelaying & "GetNodeData", peer=request.sp, hashes=request.hashes.len, pathRange=request.pathRange -proc traceGetNodeDataSendError(request: NodeDataRequest, +proc traceGetNodeDataSendError(request: RequestData, e: ref CatchableError) = - traceRecvError "sending GetNodeData", peer=request.sp, + trace trEthRecvError & "sending GetNodeData", peer=request.sp, error=e.msg, hashes=request.hashes.len, pathRange=request.pathRange -proc traceNodeDataReplyError(request: NodeDataRequest, +proc traceReplyDataError(request: RequestData, e: ref CatchableError) = - traceRecvError "waiting for reply to GetNodeData", + trace trEthRecvError & "waiting for reply to GetNodeData", peer=request.sp, error=e.msg, hashes=request.hashes.len, pathRange=request.pathRange -proc traceNodeDataReplyTimeout(request: NodeDataRequest) = - traceRecvTimeoutWaiting "for reply to GetNodeData", +proc traceReplyDataTimeout(request: RequestData) = + trace trEthRecvTimeoutWaiting & "for reply to GetNodeData", hashes=request.hashes.len, pathRange=request.pathRange, peer=request.sp -proc traceGetNodeDataDisconnected(request: NodeDataRequest) = - traceRecvError "peer disconnected, not sending GetNodeData", +proc traceGetNodeDataDisconnected(request: RequestData) = + trace trEthRecvError & "peer disconnected, not sending GetNodeData", peer=request.sp, hashes=request.hashes.len, pathRange=request.pathRange -proc traceNodeDataReplyEmpty(sp: SnapPeerEx, request: NodeDataRequest) = +proc traceReplyDataEmpty(sp: SnapPeer, request: RequestData) = # `request` can be `nil` because we don't always know which request # the empty reply goes with. Therefore `sp` must be included. if request.isNil: - traceRecvGot "EMPTY NodeData", peer=sp, got=0 + trace trEthRecvGot & "EMPTY NodeData", peer=sp, got=0 else: - traceRecvGot "NodeData", peer=sp, got=0, + trace trEthRecvGot & "NodeData", peer=sp, got=0, requested=request.hashes.len, pathRange=request.pathRange -proc traceNodeDataReplyUnmatched(sp: SnapPeerEx, got: int) = +proc traceReplyDataUnmatched(sp: SnapPeer, got: int) = # There is no request for this reply. 
Therefore `sp` must be included. - traceRecvProtocolViolation "non-reply NodeData", peer=sp, got - debug "Snap: Warning: Unexpected non-reply NodeData from peer" + trace trEthRecvProtocolViolation & "non-reply NodeData", peer=sp, got + debug "Warning: Unexpected non-reply NodeData from peer" -proc traceNodeDataReply(request: NodeDataRequest, +proc traceReplyData(request: RequestData, got, use, unmatched, other, duplicates: int) = - if tracePackets: + when trEthTracePacketsOk: logScope: got=got logScope: requested=request.hashes.len logScope: pathRange=request.pathRange logScope: peer=request.sp if got > request.hashes.len and (unmatched + other) == 0: - traceRecvGot "EXCESS reply NodeData" + trace trEthRecvGot & "EXCESS reply NodeData" elif got == request.hashes.len or use != got: - traceRecvGot "reply NodeData" + trace trEthRecvGot & "reply NodeData" elif got < request.hashes.len: - traceRecvGot "TRUNCATED reply NodeData" + trace trEthRecvGot & "TRUNCATED reply NodeData" if use != got: logScope: @@ -173,27 +192,29 @@ proc traceNodeDataReply(request: NodeDataRequest, pathRange=request.pathRange peer=request.sp if unmatched > 0: - traceRecvProtocolViolation "incorrect hashes in NodeData" - debug "Snap: Warning: NodeData has nodes with incorrect hashes" + trace trEthRecvProtocolViolation & "incorrect hashes in NodeData" + debug "Warning: NodeData has nodes with incorrect hashes" elif other > 0: - traceRecvProtocolViolation "mixed request nodes in NodeData" - debug "Snap: Warning: NodeData has nodes from mixed requests" + trace trEthRecvProtocolViolation & "mixed request nodes in NodeData" + debug "Warning: NodeData has nodes from mixed requests" elif got > request.hashes.len: # Excess without unmatched/other is only possible with duplicates > 0. - traceRecvProtocolViolation "excess nodes in NodeData" - debug "Snap: Warning: NodeData has more nodes than requested" + trace trEthRecvProtocolViolation & "excess nodes in NodeData" + debug "Warning: NodeData has more nodes than requested" else: - traceRecvProtocolViolation "duplicate nodes in NodeData" - debug "Snap: Warning: NodeData has duplicate nodes" + trace trEthRecvProtocolViolation & "duplicate nodes in NodeData" + debug "Warning: NodeData has duplicate nodes" # ------------------------------------------------------------------------------ # Private functions # ------------------------------------------------------------------------------ -proc nodeDataMatchRequest(rq: NodeDataRequestQueue, data: openArray[Blob], - reverseMap: var seq[int], - use, unmatched, other, duplicates: var int - ): NodeDataRequest = +proc nodeDataMatchRequest( + rq: RequestDataQueue, + data: openArray[Blob], + reverseMap: var seq[int], + use, unmatched, other, duplicates: var int + ): RequestData = ## Verify hashes in the received node data and use them to find the matching ## request, and match individual nodes to indices in the request in case they ## are out of order, which is allowed. Note, even if we know which request, @@ -206,11 +227,11 @@ proc nodeDataMatchRequest(rq: NodeDataRequestQueue, data: openArray[Blob], ## `use`, `unmatched`, `other` or `duplicates` are incremented for each node. ## If the last three happen, the reply has errors, but the caller can decide ## what to do. Non-nil `request` may still be returned with those errors. - var request: NodeDataRequest = nil + var request: RequestData = nil # Iterate through reply data, hashing and efficiently finding what matches. 
for i in 0 ..< data.len: - var itemRequest: NodeDataRequest + var itemRequest: RequestData var index = 0 let hash = data[i].toNodeHash if i == 0: @@ -220,7 +241,7 @@ proc nodeDataMatchRequest(rq: NodeDataRequestQueue, data: openArray[Blob], # make sure we always find the oldest queued request first. var j = 0 while j < rq.beforeFirstHash.len: - let hashRequest = rq.beforeFirstHash[j].NodeDataRequest + let hashRequest = rq.beforeFirstHash[j].RequestData if hashRequest.hashes[0] == hash: itemRequest = hashRequest break @@ -229,7 +250,7 @@ proc nodeDataMatchRequest(rq: NodeDataRequestQueue, data: openArray[Blob], # in the global request table when replies have items in requested # order, even though replies themselves are out of order. if j == 0: - (itemRequest, index) = rq.itemHash.getOrDefault(unsafeAddr hash).ex + (itemRequest, index) = rq.itemHash.getOrDefault(unsafeAddr hash) if not itemRequest.isNil: break rq.itemHash[addr hashRequest.hashes[0]] = (hashRequest, 0) @@ -247,7 +268,7 @@ proc nodeDataMatchRequest(rq: NodeDataRequestQueue, data: openArray[Blob], # If this succeeds, the reply must have items out of requested order. # If it fails, a peer sent a bad reply. if itemRequest.isNil: - (itemRequest, index) = rq.itemHash.getOrDefault(unsafeAddr hash).ex + (itemRequest, index) = rq.itemHash.getOrDefault(unsafeAddr hash) if itemRequest.isNil: # Hash and search items in the current request first, if there is one. if not request.isNil and not request.fullHashed: @@ -255,7 +276,7 @@ proc nodeDataMatchRequest(rq: NodeDataRequestQueue, data: openArray[Blob], for j in 0 ..< request.hashes.len: rq.itemHash[addr request.hashes[j]] = (request, j) (itemRequest, index) = - rq.itemHash.getOrDefault(unsafeAddr hash).ex + rq.itemHash.getOrDefault(unsafeAddr hash) if itemRequest.isNil: # Hash and search all items across all requests. if rq.beforeFirstHash.len + rq.beforeFullHash.len > 0: @@ -263,12 +284,12 @@ proc nodeDataMatchRequest(rq: NodeDataRequestQueue, data: openArray[Blob], rq.beforeFirstHash.add(rq.beforeFullHash.toSeq) rq.beforeFullHash.clear() for hashRequest in rq.beforeFirstHash: - if not hashRequest.ex.fullHashed: - hashRequest.ex.fullHashed = true - for j in 0 ..< hashRequest.ex.hashes.len: - rq.itemHash[addr hashRequest.ex.hashes[j]] = (hashRequest, j) + if not hashRequest.fullHashed: + hashRequest.fullHashed = true + for j in 0 ..< hashRequest.hashes.len: + rq.itemHash[addr hashRequest.hashes[j]] = (hashRequest, j) rq.beforeFirstHash.setLen(0) - (itemRequest, index) = rq.itemHash.getOrDefault(unsafeAddr hash).ex + (itemRequest, index) = rq.itemHash.getOrDefault(unsafeAddr hash) if itemRequest.isNil: # Not found anywhere. inc unmatched @@ -297,15 +318,15 @@ proc nodeDataMatchRequest(rq: NodeDataRequestQueue, data: openArray[Blob], return request -proc nodeDataRequestEnqueue(rq: NodeDataRequestQueue, - request: NodeDataRequest) = - ## Add `request` to the data structures in `rq: NodeDataRequest`. +proc nodeDataRequestEnqueue(rq: RequestDataQueue, + request: RequestData) = + ## Add `request` to the data structures in `rq: RequestData`. doAssert not rq.liveRequests.containsOrIncl(request) rq.beforeFirstHash.add(request) -proc nodeDataRequestDequeue(rq: NodeDataRequestQueue, - request: NodeDataRequest) = - ## Remove `request` from the data structures in `rq: NodeDataRequest`. +proc nodeDataRequestDequeue(rq: RequestDataQueue, + request: RequestData) = + ## Remove `request` from the data structures in `rq: RequestData`. 
doAssert not rq.liveRequests.missingOrExcl(request) let index = rq.beforeFirstHash.find(request) if index >= 0: @@ -315,33 +336,33 @@ proc nodeDataRequestDequeue(rq: NodeDataRequestQueue, rq.itemHash.del(addr request.hashes[j]) # Forward declarations. -proc nodeDataTryEmpties(rq: NodeDataRequestQueue) -proc nodeDataEnqueueAndSend(request: NodeDataRequest) {.async.} +proc nodeDataTryEmpties(rq: RequestDataQueue) +proc nodeDataEnqueueAndSend(request: RequestData) {.async.} -proc nodeDataComplete(request: NodeDataRequest, reply: NodeDataReply, +proc nodeDataComplete(request: RequestData, reply: ReplyData, insideTryEmpties = false) = ## Complete `request` with received data or other reply. if request.future.finished: # Subtle: Timer can trigger and its callback be added to Chronos run loop, # then data event trigger and call `clearTimer()`. The timer callback # will then run but it must be ignored. - debug "Snap: Warning: Resolved timer race over NodeData reply" + debug "Warning: Resolved timer race over NodeData reply" else: request.timer.clearTimer() request.future.complete(reply) - let rq = request.sp.nodeDataRequests + let rq = request.sp.requestsEx trace "nodeDataRequestDequeue", addr=cast[pointer](request).repr rq.nodeDataRequestDequeue(request) # It may now be possible to match empty replies to earlier requests. if not insideTryEmpties: rq.nodeDataTryEmpties() -proc nodeDataTimeout(request: NodeDataRequest) = +proc nodeDataTimeout(request: RequestData) = ## Complete `request` with timeout. - request.traceNodeDataReplyTimeout() + request.traceReplyDataTimeout() {.gcsafe.}: request.nodeDataComplete(nil) -proc nodeDataTryEmpties(rq: NodeDataRequestQueue) = +proc nodeDataTryEmpties(rq: RequestDataQueue) = ## See if we can match queued empty replies to earlier requests. # TODO: This approach doesn't handle timeouts and errors correctly. # The problem is it's ambiguous whether an empty reply after timed out @@ -351,43 +372,46 @@ proc nodeDataTryEmpties(rq: NodeDataRequestQueue) = if rq.liveRequests.len > 0: # Careful: Use `.toSeq` below because we must not use the `HashSet` # iterator while the set is being changed. - for request in rq.liveRequests.toSeq.mapIt(it.ex): + for request in rq.liveRequests.toSeq: # Constructed reply object, because empty is different from timeout. - request.nodeDataComplete(NodeDataReply(), true) + request.nodeDataComplete(ReplyData(), true) # Move all temporarily delayed requests to the live state, and send them. if rq.waitingOnEmpties.len > 0: - var tmpList: seq[NodeDataRequestBase] + var tmpList: seq[RequestData] swap(tmpList, rq.waitingOnEmpties) for i in 0 ..< tmpList.len: - asyncSpawn nodeDataEnqueueAndSend(tmpList[i].ex) + asyncSpawn nodeDataEnqueueAndSend(tmpList[i]) -proc nodeDataNewRequest(sp: SnapPeerEx, hashes: seq[NodeHash], - pathFrom, pathTo: InteriorPath - ): NodeDataRequest = - ## Make a new `NodeDataRequest` to receive a reply or timeout in future. The +proc new( + T: type RequestData, + sp: SnapPeer, + hashes: seq[NodeHash], + pathFrom, pathTo: InteriorPath + ): RequestData = + ## Make a new `RequestData` to receive a reply or timeout in future. The ## caller is responsible for sending the `GetNodeData` request, and must do ## that after this setup (not before) to avoid race conditions. 
- let request = NodeDataRequest(sp: sp, hashes: hashes, + let request = RequestData(sp: sp, hashes: hashes, pathRange: (pathFrom, pathTo)) # TODO: Cache the time when making batches of requests, instead of calling # `Moment.fromNow()` which always does a system call. `p2pProtocol` request # timeouts have the same issue (and is where we got 10 seconds default). -# request.timer = setTimer(Moment.fromNow(10.seconds), -# nodeDataTimeout, cast[pointer](request)) + # request.timer = setTimer(Moment.fromNow(10.seconds), + # nodeDataTimeout, cast[pointer](request)) request.timer = safeSetTimer(Moment.fromNow(10.seconds), nodeDataTimeout, request) - request.future = newFuture[NodeDataReply]() + request.future = newFuture[ReplyData]() return request -proc nodeDataEnqueueAndSend(request: NodeDataRequest) {.async.} = +proc nodeDataEnqueueAndSend(request: RequestData) {.async.} = ## Helper function to send an `eth.GetNodeData` request. ## But not when we're draining the in flight queue to match empty replies. let sp = request.sp - if sp.stopped: + if sp.ctrl.runState == SyncStopped: request.traceGetNodeDataDisconnected() request.future.complete(nil) return - let rq = sp.nodeDataRequests + let rq = sp.requestsEx if rq.empties > 0: request.traceGetNodeDataDelaying() rq.waitingOnEmpties.add(request) @@ -403,13 +427,13 @@ proc nodeDataEnqueueAndSend(request: NodeDataRequest) {.async.} = except CatchableError as e: request.traceGetNodeDataSendError(e) inc sp.stats.major.networkErrors - sp.stopped = true + sp.ctrl.runState = SyncStopped request.future.fail(e) -proc onNodeData(sp: SnapPeerEx, data: openArray[Blob]) = +proc onNodeData(sp: SnapPeer, data: openArray[Blob]) = ## Handle an incoming `eth.NodeData` reply. ## Practically, this is also where all the incoming packet trace messages go. - let rq = sp.nodeDataRequests + let rq = sp.requestsEx # Empty replies are meaningful, but we can only associate them with requests # when there are enough empty replies to cover all outstanding requests. If @@ -419,23 +443,23 @@ proc onNodeData(sp: SnapPeerEx, data: openArray[Blob]) = # If there are no requests, don't queue, just let processing fall # through until the "non-reply" protocol violation error. if rq.liveRequests.len > 0: - sp.traceNodeDataReplyEmpty(if rq.liveRequests.len != 1: nil - else: rq.liveRequests.toSeq[0].ex) + sp.traceReplyDataEmpty(if rq.liveRequests.len != 1: nil + else: rq.liveRequests.toSeq[0]) inc rq.empties # It may now be possible to match empty replies to earlier requests. rq.nodeDataTryEmpties() return - let reply = NodeDataReply() + let reply = ReplyData() var (use, unmatched, other, duplicates) = (0, 0, 0, 0) let request = nodeDataMatchRequest(rq, data, reply.reverseMap, use, unmatched, other, duplicates) if request.isNil: - sp.traceNodeDataReplyUnmatched(data.len) + sp.traceReplyDataUnmatched(data.len) return - request.traceNodeDataReply(data.len, use, unmatched, other, duplicates) + request.traceReplyData(data.len, use, unmatched, other, duplicates) # TODO: Speed improvement possible here. 
   if reply.reverseMap.len == 0:
@@ -458,11 +482,15 @@ proc onNodeData(sp: SnapPeerEx, data: openArray[Blob]) =
 # Public functions
 # ------------------------------------------------------------------------------

-proc getNodeData*(sp: SnapPeerEx, hashes: seq[NodeHash],
-                  pathFrom, pathTo: InteriorPath): Future[NodeDataReply]
-                  {.async.} =
+proc new*(
+    T: type ReplyData,
+    sp: SnapPeer,
+    hashes: seq[NodeHash],
+    pathFrom = InteriorPath(),
+    pathTo = InteriorPath()
+      ): Future[T] {.async.} =
   ## Async function to send a `GetNodeData` request to a peer, and when the
-  ## peer replies, or on timeout or error, return `NodeDataReply`.
+  ## peer replies, or on timeout or error, return `ReplyData`.
   ##
   ## The request is a list of hashes. The reply is a list of trie nodes or
   ## contract bytecodes matching those hashes, not necessarily in the same
@@ -476,28 +504,55 @@ proc getNodeData*(sp: SnapPeerEx, hashes: seq[NodeHash],
   ##
   ## `pathFrom` and `pathTo` are not used except for logging.

-  let request = sp.nodeDataNewRequest(hashes, pathFrom, pathTo)
+  let request = RequestData.new(sp, hashes, pathFrom, pathTo)
   # There is no "Sending..." trace message here, because it can be delayed
   # by the empty reply logic in `nodeDataEnqueueAndSend`.

-  var reply: NodeDataReply = nil
+  var reply: ReplyData = nil
   try:
     await request.nodeDataEnqueueAndSend()
     reply = await request.future
   except CatchableError as e:
-    request.traceNodeDataReplyError(e)
+    request.traceReplyDataError(e)
     inc sp.stats.major.networkErrors
-    sp.stopped = true
+    sp.ctrl.runState = SyncStopped
     return nil
+
   # Timeout, packet and packet error trace messages are done in `onNodeData`
   # and `nodeDataTimeout`, where there is more context than here. Here we
   # always received just valid data with hashes already verified, or `nil`.
   return reply

-proc setupGetNodeData*(sp: SnapPeerEx) =
-  ## Initialise `SnapPeerEx` to support `GetNodeData` calls.
+proc replyType*(reply: ReplyData): ReplyDataType =
+  ## Classify the reply by length: none, a single entry, or multiple entries.
+  ## If the `reply` argument is `nil`, the result `NoReplyData` is returned
+  ## which is the same as for a zero-length reply.
+  if reply.isNil or reply.hashVerifiedData.len == 0:
+    NoReplyData
+  elif reply.hashVerifiedData.len == 1:
+    SingleEntryReply
+  else:
+    MultipleEntriesReply

-  if sp.nodeDataRequests.isNil:
-    sp.nodeDataRequests = NodeDataRequestQueue()
+proc `[]`*(reply: ReplyData; inx: int): Blob =
+  ## Returns the reverse indexed item from the reply cache (if any). If
+  ## `reply` is `nil` or `inx` is out of bounds, an empty `Blob` (i.e. `@[]`)
+  ## is returned.
+  ##
+  ## Note that the length of the `reply` list is limited by the `new()`
+  ## constructor argument `hashes`. So there is no `len` directive used.
+  if not reply.isNil and 0 <= inx:
+    if inx < reply.reverseMap.len:
+      let xni = reply.reverseMap[inx] - 1
+      if 0 <= xni:
+        return reply.hashVerifiedData[xni]
+    if inx < reply.hashVerifiedData.len:
+      return reply.hashVerifiedData[inx]
+
+proc replyDataSetup*(sp: SnapPeer) =
+  ## Initialise `SnapPeer` to support `ReplyData.new()` calls.
+
+  if sp.requestsEx.isNil:
+    sp.requestsEx = RequestDataQueue()

   sp.peer.state(eth).onNodeData =
     proc (_: Peer, data: openArray[Blob]) =
@@ -505,17 +560,10 @@ proc setupGetNodeData*(sp: SnapPeerEx) =

   sp.peer.state(eth).onGetNodeData =
     proc (_: Peer, hashes: openArray[Hash256], data: var seq[Blob]) =
-      # Return empty nodes result. This callback is installed to
-      # ensure we don't reply with nodes from the chainDb.
+      ## Return an empty node-data result.
This callback is installed to + ## ensure we don't reply with nodes from the chainDb. discard -proc reverseMap*(reply: NodeDataReply, index: int): int = - ## Given an index into the request hash list, return index into the reply - ## `hashVerifiedData`, or -1 if there is no data for that request hash. - if index < reply.reverseMap.len: reply.reverseMap[index] - 1 - elif index < reply.hashVerifiedData.len: index - else: -1 - # ------------------------------------------------------------------------------ # End # ------------------------------------------------------------------------------ diff --git a/nimbus/sync/snap/pie/sync_desc.nim b/nimbus/sync/snap/peer/sync_fetch_xdesc.nim similarity index 83% rename from nimbus/sync/snap/pie/sync_desc.nim rename to nimbus/sync/snap/peer/sync_fetch_xdesc.nim index b9b6456de..9a2000610 100644 --- a/nimbus/sync/snap/pie/sync_desc.nim +++ b/nimbus/sync/snap/peer/sync_fetch_xdesc.nim @@ -20,7 +20,7 @@ import {.push raises: [Defect].} type - SharedFetchState* = ref object + SnapSyncFetchEx* = ref object of SnapSyncFetchBase ## Account fetching state that is shared among all peers. # Leaf path ranges not fetched or in progress on any peer. leafRanges*: seq[LeafRange] @@ -34,9 +34,6 @@ type countRangeTrieStarted*: bool logTicker: TimerCallback - SnapSyncEx* = ref object of SnapSyncBase - sharedFetch*: SharedFetchState - # ------------------------------------------------------------------------------ # Private timer helpers # ------------------------------------------------------------------------------ @@ -69,9 +66,9 @@ proc percent(value: UInt256, discriminator: bool): string = result.add('%') -proc setLogTicker(sf: SharedFetchState; at: Moment) {.gcsafe.} +proc setLogTicker(sf: SnapSyncFetchEx; at: Moment) {.gcsafe.} -proc runLogTicker(sf: SharedFetchState) {.gcsafe.} = +proc runLogTicker(sf: SnapSyncFetchEx) {.gcsafe.} = doAssert not sf.isNil info "State: Account sync progress", percent = percent(sf.countRange, sf.countRangeStarted), @@ -80,20 +77,20 @@ proc runLogTicker(sf: SharedFetchState) {.gcsafe.} = trie = percent(sf.countRangeTrie, sf.countRangeTrieStarted) sf.setLogTicker(Moment.fromNow(1.seconds)) -proc setLogTicker(sf: SharedFetchState; at: Moment) = +proc setLogTicker(sf: SnapSyncFetchEx; at: Moment) = sf.logTicker = safeSetTimer(at, runLogTicker, sf) # ------------------------------------------------------------------------------ # Public constructor # ------------------------------------------------------------------------------ -proc new*(T: type SharedFetchState; startLoggingAfter = 100.milliseconds): T = - result = SharedFetchState( +proc new*(T: type SnapSyncFetchEx; startAfter = 100.milliseconds): T = + result = SnapSyncFetchEx( leafRanges: @[LeafRange( leafLow: LeafPath.low, leafHigh: LeafPath.high)]) result.logTicker = safeSetTimer( - Moment.fromNow(startLoggingAfter), + Moment.fromNow(startAfter), runLogTicker, result) @@ -101,9 +98,17 @@ proc new*(T: type SharedFetchState; startLoggingAfter = 100.milliseconds): T = # Public getters # ------------------------------------------------------------------------------ -proc nsx*[T](sp: T): SnapSyncEx = - ## Handy helper, typically used with `T` instantiated as `SnapPeerEx` - sp.ns.SnapSyncEx +proc sharedFetchEx*(ns: SnapSync): SnapSyncFetchEx = + ## Handy helper + ns.sharedFetch.SnapSyncFetchEx + +# ------------------------------------------------------------------------------ +# Public setters +# ------------------------------------------------------------------------------ + +proc 
`sharedFetchEx=`*(ns: SnapSync; value: SnapSyncFetchEx) = + ## Handy helper + ns.sharedFetch = value # ------------------------------------------------------------------------------ # End diff --git a/nimbus/sync/snap/validate_trienode.nim b/nimbus/sync/snap/peer/validate_trienode.nim similarity index 97% rename from nimbus/sync/snap/validate_trienode.nim rename to nimbus/sync/snap/peer/validate_trienode.nim index 479409474..d44dd6421 100644 --- a/nimbus/sync/snap/validate_trienode.nim +++ b/nimbus/sync/snap/peer/validate_trienode.nim @@ -31,18 +31,24 @@ ## exception to `parseTrieNodeError` if it occurs. import - eth/[common/eth_types, p2p, rlp], - ../trace_helper, - "."/[base_desc, path_desc, types] + eth/[common/eth_types, p2p], + ../../types, + ".."/[base_desc, path_desc] {.push raises: [Defect].} +logScope: + topics = "snap validate trie node" + type TrieNodeParseContext* = object childQueue*: seq[(InteriorPath, NodeHash, bool)] leafQueue*: seq[(LeafPath, NodeHash, Blob)] errors*: int +const + # Local debugging + traceIndividualNodesOk = true template read(rlp: var Rlp, T: type NodeHash): T = rlp.read(Hash256).T @@ -82,7 +88,7 @@ template nodeError(msg: string{lit}, more: varargs[untyped]) = #echo inspect(rlpFromBytes(nodeBytes)) inc context.errors -proc parseLeafValue(sp: SnapPeerBase, +proc parseLeafValue(sp: SnapPeer, nodePath: InteriorPath, nodeHash: NodeHash, nodeBytes: Blob, nodeRlp: var Rlp, leafPath: InteriorPath, context: var TrieNodeParseContext @@ -114,7 +120,7 @@ proc parseLeafValue(sp: SnapPeerBase, context.leafQueue.add((leafPath.toLeafPath, nodeHash, nodeRlp.toBytes)) - if traceIndividualNodes: + when traceIndividualNodesOk: let leafBytes = context.leafQueue[^1][2] trace "Trie: Account leaf found", peer=sp, path=combinePaths(nodePath, leafPath), @@ -123,13 +129,13 @@ proc parseLeafValue(sp: SnapPeerBase, # echo inspect(rlpFromBytes(leafBytes)) # Forward declaration, used for bounded, rare recursion. 
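
# ------------------------------------------------------------------------------
# Aside: the `when traceIndividualNodesOk:` guards used in this file (and the
# `trEthTrace...Ok` guards elsewhere in the patch) gate tracing at compile
# time, unlike the runtime flags of the deleted `trace_helper` module. A
# minimal sketch of the difference; `chronicles` and the constant name are
# taken from this file, the proc is made up:
# ------------------------------------------------------------------------------

import chronicles

const traceIndividualNodesOk = true  # flip to false to compile tracing out

proc noteLeaf(depth: int) =
  when traceIndividualNodesOk:
    trace "Trie: Account leaf found", depth
  else:
    discard                          # no code emitted, no runtime flag test

# ------------------------------------------------------------------------------
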
@@ -123,13 +129,13 @@ proc parseLeafValue(sp: SnapPeerBase,
       # echo inspect(rlpFromBytes(leafBytes))
 
 # Forward declaration, used for bounded, rare recursion.
-proc parseTrieNode*(sp: SnapPeerBase,
+proc parseTrieNode*(sp: SnapPeer,
                     nodePath: InteriorPath, nodeHash: NodeHash, nodeBytes: Blob,
                     fromExtension: bool,
                     context: var TrieNodeParseContext
                    ) {.gcsafe, raises: [Defect, RlpError].}
 
-proc parseExtensionChild(sp: SnapPeerBase,
+proc parseExtensionChild(sp: SnapPeer,
                          nodePath: InteriorPath, nodeHash: NodeHash,
                          nodeBytes: Blob, nodeRlp: var Rlp,
                          childPath: InteriorPath,
@@ -177,7 +183,7 @@
   else:
     childError "Extension node child (RLP element 1) has length > 32 bytes"
 
-proc parseExtensionOrLeaf(sp: SnapPeerBase,
+proc parseExtensionOrLeaf(sp: SnapPeer,
                           nodePath: InteriorPath, nodeHash: NodeHash,
                           nodeBytes: Blob, nodeRlp: var Rlp,
                           fromExtension: bool,
@@ -265,7 +271,7 @@
     sp.parseExtensionChild(nodePath, nodeHash, nodeBytes, nodeRlp,
                            childPath, context)
 
-proc parseBranchNode(sp: SnapPeerBase,
+proc parseBranchNode(sp: SnapPeer,
                      nodePath: InteriorPath, nodeHash: NodeHash,
                      nodeBytes: Blob, nodeRlp: var Rlp,
                      context: var TrieNodeParseContext
@@ -339,7 +345,7 @@
       branches=branchCount, minBranches=2
     return
 
-proc parseTrieNode*(sp: SnapPeerBase,
+proc parseTrieNode*(sp: SnapPeer,
                     nodePath: InteriorPath, nodeHash: NodeHash, nodeBytes: Blob,
                     fromExtension: bool,
                     context: var TrieNodeParseContext
                    ) {.raises: [Defect, RlpError].} =
@@ -439,7 +445,7 @@
       listLen=nodeListLen
     return
 
-proc parseTrieNodeError*(sp: SnapPeerBase, nodePath: InteriorPath,
+proc parseTrieNodeError*(sp: SnapPeer, nodePath: InteriorPath,
                          nodeHash: NodeHash, nodeBytes: Blob,
                          context: var TrieNodeParseContext,
                          exception: ref RlpError) =
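The forward declaration kept above is what lets `parseExtensionChild` and friends recurse back into `parseTrieNode` before its body appears. A stripped-down sketch of the pattern (hypothetical `parseNode`/`parseChild`, not the procs above):

proc parseNode(depth: int): int   # forward declaration: signature only

proc parseChild(depth: int): int =
  if depth <= 0: 0
  else: parseNode(depth - 1)      # call is legal here, the body comes later

proc parseNode(depth: int): int =
  1 + parseChild(depth)

when isMainModule:
  doAssert parseNode(3) == 4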
diff --git a/nimbus/sync/snap/pie/peer_desc.nim b/nimbus/sync/snap/pie/peer_desc.nim
deleted file mode 100644
index b8d00224c..000000000
--- a/nimbus/sync/snap/pie/peer_desc.nim
+++ /dev/null
@@ -1,78 +0,0 @@
-# Nimbus - Types, data structures and shared utilities used in network sync
-#
-# Copyright (c) 2018-2021 Status Research & Development GmbH
-# Licensed under either of
-#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
-#    http://www.apache.org/licenses/LICENSE-2.0)
-#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
-#    http://opensource.org/licenses/MIT)
-# at your option. This file may not be copied, modified, or
-# distributed except according to those terms.
-
-import
-  std/[sets, tables],
-  chronos,
-  stint,
-  ".."/[base_desc, path_desc, types]
-
-type
-  NodeDataRequestBase* = ref object of RootObj
-    ## Stub object, to be inherited
-
-  SingleNodeRequestBase* = ref object of RootObj
-    ## Stub object, to be inherited
-
-  NodeDataRequestQueue* = ref object
-    liveRequests*: HashSet[NodeDataRequestBase]
-    empties*: int
-    # `OrderedSet` was considered instead of `seq` here, but it has a slow
-    # implementation of `excl`, defeating the motivation for using it.
-    waitingOnEmpties*: seq[NodeDataRequestBase]
-    beforeFirstHash*: seq[NodeDataRequestBase]
-    beforeFullHash*: HashSet[NodeDataRequestBase]
-    # We need to be able to lookup requests by the hash of reply data.
-    # `ptr NodeHash` is used here so the table doesn't require an independent
-    # copy of the hash. The hash is part of the request object.
-    itemHash*: Table[ptr NodeHash, (NodeDataRequestBase, int)]
-
-  FetchState* = ref object
-    ## Account fetching state on a single peer.
-    sp*: SnapPeerEx
-    nodeGetQueue*: seq[SingleNodeRequestBase]
-    nodeGetsInFlight*: int
-    scheduledBatch*: bool
-    progressPrefix*: string
-    progressCount*: int
-    nodesInFlight*: int
-    getNodeDataErrors*: int
-    leafRange*: LeafRange
-    unwindAccounts*: int64
-    unwindAccountBytes*: int64
-    finish*: Future[void]
-
-  SnapPeerEx* = ref object of SnapPeerBase
-    nodeDataRequests*: NodeDataRequestQueue
-    fetchState*: FetchState
-
-# ------------------------------------------------------------------------------
-# Public functions
-# ------------------------------------------------------------------------------
-
-proc `$`*(sp: SnapPeerEx): string =
-  $sp.SnapPeerBase
-
-# ------------------------------------------------------------------------------
-# Public getter
-# ------------------------------------------------------------------------------
-
-proc ex*(base: SnapPeerBase): SnapPeerEx =
-  ## to extended object instance version
-  base.SnapPeerEx
-
-# ------------------------------------------------------------------------------
-# Public setter
-# ------------------------------------------------------------------------------
-
-# ------------------------------------------------------------------------------
-# End
-# ------------------------------------------------------------------------------
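One design note in the deleted file deserves a concrete illustration: `itemHash` keys its `Table` with `ptr NodeHash` so the 32-byte hash that already lives inside the request object is not copied a second time. A runnable sketch of the pointer-keyed table (`NodeHash` and `Request` are stand-ins, and the pointed-to hash must outlive its table entry):

import std/[hashes, tables]

type
  NodeHash = array[32, byte]
  Request = ref object
    dataHash: NodeHash              # the key lives inside the request object

proc hash(p: ptr NodeHash): Hash = hash(p[])      # hash the pointed-to bytes
proc `==`(a, b: ptr NodeHash): bool = a[] == b[]  # compare contents, not addresses

when isMainModule:
  var byHash: Table[ptr NodeHash, Request]
  let rq = Request()
  rq.dataHash[0] = 0xAB
  byHash[addr rq.dataHash] = rq

  # Any pointer to equal bytes finds the entry, e.g. a separate copy:
  var probe: NodeHash
  probe[0] = 0xAB
  doAssert byHash[addr probe] == rq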
diff --git a/nimbus/sync/trace_helper.nim b/nimbus/sync/trace_helper.nim
deleted file mode 100644
index 2bbed0c38..000000000
--- a/nimbus/sync/trace_helper.nim
+++ /dev/null
@@ -1,69 +0,0 @@
-# Nimbus - Types, data structures and shared utilities used in network sync
-#
-# Copyright (c) 2018-2021 Status Research & Development GmbH
-# Licensed under either of
-#  * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE) or
-#    http://www.apache.org/licenses/LICENSE-2.0)
-#  * MIT license ([LICENSE-MIT](LICENSE-MIT) or
-#    http://opensource.org/licenses/MIT)
-# at your option. This file may not be copied, modified, or
-# distributed except according to those terms.
-
-import
-  chronicles,
-  eth/common/eth_types,
-  stew/byteutils
-
-{.push raises: [Defect].}
-
-const
-  tracePackets* = true
-    ## Whether to `trace` log each sync network message.
-  traceGossips* = false
-    ## Whether to `trace` log each gossip network message.
-  traceHandshakes* = true
-    ## Whether to `trace` log each network handshake message.
-  traceTimeouts* = true
-    ## Whether to `trace` log each network request timeout.
-  traceNetworkErrors* = true
-    ## Whether to `trace` log each network request error.
-  tracePacketErrors* = true
-    ## Whether to `trace` log each messages with invalid data.
-  traceIndividualNodes* = false
-    ## Whether to `trace` log each trie node, account, storage, receipt, etc.
-
-template tracePacket*(msg: static[string], args: varargs[untyped]) =
-  if tracePackets: trace `msg`, `args`
-template traceGossip*(msg: static[string], args: varargs[untyped]) =
-  if traceGossips: trace `msg`, `args`
-template traceTimeout*(msg: static[string], args: varargs[untyped]) =
-  if traceTimeouts: trace `msg`, `args`
-template traceNetworkError*(msg: static[string], args: varargs[untyped]) =
-  if traceNetworkErrors: trace `msg`, `args`
-template tracePacketError*(msg: static[string], args: varargs[untyped]) =
-  if tracePacketErrors: trace `msg`, `args`
-template traceIndividualNode*(msg: static[string], args: varargs[untyped]) =
-  if traceIndividualNodes: trace `msg`, `args`
-
-func toHex*(hash: Hash256): string =
-  ## Shortcut for buteutils.toHex(hash.data)
-  hash.data.toHex
-
-func traceStep*(request: BlocksRequest): string =
-  var str = if request.reverse: "-" else: "+"
-  if request.skip < high(typeof(request.skip)):
-    return str & $(request.skip + 1)
-  return static($(high(typeof(request.skip)).u256 + 1))
-
-proc `$`*(hash: Hash256): string =
-  hash.data.toHex
-
-proc `$`*(blob: Blob): string =
-  blob.toHex
-
-proc `$`*(hashOrNum: HashOrNum): string =
-  # It's always obvious which one from the visible length of the string.
-  if hashOrNum.isHash: $hashOrNum.hash
-  else: $hashOrNum.number
-
-# End
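All of the deleted guard templates share one shape: a `static[string]` message plus pass-through `varargs[untyped]`, handed to chronicles' `trace` when a module-level constant is set. The PR replaces these runtime `if` guards with compile-time `when` switches and pre-composed message prefixes, but the template shape itself is a useful reference. A trimmed sketch (assumes the chronicles package, which this codebase already uses):

import chronicles

const tracePackets = true
  ## Whether to `trace` log each sync network message.

template tracePacket(msg: static[string], args: varargs[untyped]) =
  if tracePackets: trace msg, args

when isMainModule:
  tracePacket "Got reply BlockHeaders (0x04)", count=8, requested=192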
diff --git a/nimbus/sync/snap/types.nim b/nimbus/sync/types.nim
similarity index 76%
rename from nimbus/sync/snap/types.nim
rename to nimbus/sync/types.nim
index 881583043..a9b460eda 100644
--- a/nimbus/sync/snap/types.nim
+++ b/nimbus/sync/types.nim
@@ -46,8 +46,11 @@ type
 # Public Constructor
 # ------------------------------------------------------------------------------
 
+proc new*(T: type TxHash): T = Hash256().T
 proc new*(T: type NodeHash): T = Hash256().T
-
+proc new*(T: type BlockHash): T = Hash256().T
+proc new*(T: type TrieHash): T = Hash256().T
+
 # ------------------------------------------------------------------------------
 # Public functions
 # ------------------------------------------------------------------------------
@@ -69,12 +72,36 @@ proc `==`*(a,b: BlockHash): bool {.borrow.}
 proc toNodeHash*(data: Blob): NodeHash =
   keccak256.digest(data).NodeHash
 
+proc toHashOrNum*(bh: BlockHash): HashOrNum =
+  HashOrNum(isHash: true, hash: bh.Hash256)
+
 # ------------------------------------------------------------------------------
 # Public debugging helpers
 # ------------------------------------------------------------------------------
 
-proc `$`*(th: TrieHash|NodeHash): string =
-  th.Hash256.data.toHex
+func toHex*(hash: Hash256): string =
+  ## Shortcut for byteutils.toHex(hash.data)
+  hash.data.toHex
+
+func `$`*(th: TrieHash|NodeHash): string =
+  th.Hash256.toHex
+
+func `$`*(hash: Hash256): string =
+  hash.toHex
+
+func `$`*(blob: Blob): string =
+  blob.toHex
+
+func `$`*(hashOrNum: HashOrNum): string =
+  # It's always obvious which one from the visible length of the string.
+  if hashOrNum.isHash: $hashOrNum.hash
+  else: $hashOrNum.number
+
+func traceStep*(request: BlocksRequest): string =
+  var str = if request.reverse: "-" else: "+"
+  if request.skip < high(typeof(request.skip)):
+    return str & $(request.skip + 1)
+  return static($(high(typeof(request.skip)).u256 + 1))
 
 # ------------------------------------------------------------------------------
 # End
diff --git a/vendor/nim-stew b/vendor/nim-stew
index bb705bf17..779ba052c 160000
--- a/vendor/nim-stew
+++ b/vendor/nim-stew
@@ -1 +1 @@
-Subproject commit bb705bf17b46d2c8f9bfb106d9cc7437009a2501
+Subproject commit 779ba052c827af46bea79ff8b12b159f68c0f14a
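A closing note on the consolidated `nimbus/sync/types.nim`: the hash helpers above lean on Nim's distinct types, so each wrapper gets its own constructor and `$` while sharing one underlying representation. A self-contained sketch with stand-in types (`Hash32` in place of eth_types' `Hash256`):

import std/strutils

type
  Hash32 = array[32, byte]          # stand-in for Hash256
  NodeHash = distinct Hash32
  TrieHash = distinct Hash32

proc new(T: type NodeHash): T = default(Hash32).T
proc new(T: type TrieHash): T = default(Hash32).T

func toHex(h: Hash32): string =
  ## Hex-encode the 32 bytes (stand-in for stew/byteutils' toHex).
  result = newStringOfCap(64)
  for b in h: result.add b.int.toHex(2)

func `$`(th: TrieHash|NodeHash): string =
  Hash32(th).toHex

when isMainModule:
  doAssert ($NodeHash.new).len == 64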