eth wire protocol: implement NewBlock and NewBlockHashes handler

and it also do invasive changes to fast sync because
they are tightly related.

fix #673
This commit is contained in:
jangko 2022-11-16 13:45:28 +07:00
parent c3e8f951b2
commit fffe071f86
No known key found for this signature in database
GPG Key ID: 31702AE10541E6B9
5 changed files with 915 additions and 171 deletions

View File

@ -416,7 +416,23 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
if ProtocolFlag.Eth in protocols and conf.maxPeers > 0:
case conf.syncMode:
of SyncMode.Default:
FastSyncCtx.new(nimbus.ethNode, nimbus.chainRef).start
let syncer = FastSyncCtx.new(nimbus.ethNode, nimbus.chainRef)
syncer.start
let wireHandler = EthWireRef(
nimbus.ethNode.protocolState(eth)
)
wireHandler.setNewBlockHandler(
fast.newBlockHandler,
cast[pointer](syncer)
)
wireHandler.setNewBlockHashesHandler(
fast.newBlockHashesHandler,
cast[pointer](syncer)
)
of SyncMode.Full, SyncMode.Snap:
discard

View File

@ -9,14 +9,18 @@
# except according to those terms.
import
std/[sets, options, random, hashes, sequtils],
std/[sets, options, random,
hashes, sequtils, math, tables, times],
chronicles,
chronos,
eth/[common, p2p],
eth/p2p/[private/p2p_types, peer_pool],
stew/byteutils,
"."/[protocol, types],
../p2p/chain
../p2p/[chain, clique/clique_sealer, gaslimit],
../db/db_chain,
../utils/difficulty,
".."/[constants, utils]
{.push raises:[Defect].}
@ -26,6 +30,7 @@ logScope:
const
minPeersToStartSync* = 2 # Wait for consensus of at least this
# number of peers before syncing
CleanupInterval = initDuration(minutes = 20)
type
#SyncStatus = enum
@ -33,6 +38,8 @@ type
# syncNotEnoughPeers
# syncTimeOut
HashToTime = TableRef[Hash256, Time]
BlockchainSyncDefect* = object of Defect
## Catch and relay exception
@ -43,6 +50,8 @@ type
Persisted
WantedBlocks = object
isHash: bool
hash: Hash256
startIndex: BlockNumber
numBlocks: uint
state: WantedBlocksState
@ -57,9 +66,306 @@ type
peerPool: PeerPool
trustedPeers: HashSet[Peer]
hasOutOfOrderBlocks: bool
busyPeers: HashSet[Peer]
knownByPeer: Table[Peer, HashToTime]
lastCleanup: Time
# ------------------------------------------------------------------------------
# Private functions: peers related functions
# ------------------------------------------------------------------------------
proc hash*(p: Peer): Hash = hash(cast[pointer](p))
proc cleanupKnownByPeer(ctx: FastSyncCtx) =
let now = getTime()
var tmp = initHashSet[Hash256]()
for _, map in ctx.knownByPeer:
for hash, time in map:
if time - now >= CleanupInterval:
tmp.incl hash
for hash in tmp:
map.del(hash)
tmp.clear()
var tmpPeer = initHashSet[Peer]()
for peer, map in ctx.knownByPeer:
if map.len == 0:
tmpPeer.incl peer
for peer in tmpPeer:
ctx.knownByPeer.del peer
ctx.lastCleanup = now
proc addToKnownByPeer(ctx: FastSyncCtx,
blockHash: Hash256,
peer: Peer): bool =
var map: HashToTime
if not ctx.knownByPeer.take(peer, map):
map = newTable[Hash256, Time]()
result = false
else:
result = true
map[blockHash] = getTime()
proc getPeers(ctx: FastSyncCtx, thisPeer: Peer): seq[Peer] =
# do not send back block/blockhash to thisPeer
for peer in peers(ctx.peerPool):
if peer != thisPeer:
result.add peer
proc handleLostPeer(ctx: FastSyncCtx) =
# TODO: ask the PeerPool for new connections and then call
# `obtainBlocksFromPeer`
discard
# ------------------------------------------------------------------------------
# Private functions: validators
# ------------------------------------------------------------------------------
proc validateDifficulty(ctx: FastSyncCtx, header, parentHeader: BlockHeader): bool =
try:
if ctx.chain.isBlockAfterTtd(header):
if header.difficulty != 0.u256:
trace "invalid difficulty",
expect=0, get=header.difficulty
return false
return true
let
db = ctx.chain.db
config = db.config
if config.poaEngine:
let rc = ctx.chain.clique.calcDifficulty(parentHeader)
if rc.isErr:
return false
if header.difficulty < rc.get():
trace "provided header difficulty is too low",
expect=rc.get(), get=header.difficulty
return false
return true
let calcDiffc = config.calcDifficulty(header.timestamp, parentHeader)
if header.difficulty < calcDiffc:
trace "provided header difficulty is too low",
expect=calcDiffc, get=header.difficulty
return false
return true
except CatchableError as e:
error "Exception in FastSync.validateDifficulty()",
exc = e.name, err = e.msg
return false
proc validateHeader(ctx: FastSyncCtx, header: BlockHeader, height = none(BlockNumber)): bool =
if header.parentHash == GENESIS_PARENT_HASH:
return true
let
db = ctx.chain.db
config = db.config
var parentHeader: BlockHeader
if not db.getBlockHeader(header.parentHash, parentHeader):
error "can't get parentHeader",
hash=header.parentHash, number=header.blockNumber
return false
if header.blockNumber != parentHeader.blockNumber + 1.toBlockNumber:
trace "invalid block number",
expect=parentHeader.blockNumber + 1.toBlockNumber,
get=header.blockNumber
return false
if header.timestamp <= parentHeader.timestamp:
trace "invalid timestamp",
parent=parentHeader.timestamp,
header=header.timestamp
return false
if not ctx.validateDifficulty(header, parentHeader):
return false
if config.poaEngine:
let period = initDuration(seconds = config.cliquePeriod)
# Timestamp diff between blocks is lower than PERIOD (clique)
if parentHeader.timestamp + period > header.timestamp:
trace "invalid timestamp diff (lower than period)",
parent=parentHeader.timestamp,
header=header.timestamp,
period
return false
let res = db.validateGasLimitOrBaseFee(header, parentHeader)
if res.isErr:
trace "validate gaslimit error",
msg=res.error
return false
if height.isSome:
let dif = height.get() - parentHeader.blockNumber
if not (dif < 8.toBlockNumber and dif > 1.toBlockNumber):
trace "uncle block has a parent that is too old or too young",
dif=dif,
height=height.get(),
parentNumber=parentHeader.blockNumber
return false
return true
# ------------------------------------------------------------------------------
# Private functions: sync worker
# ------------------------------------------------------------------------------
proc broadcastBlockHash(ctx: FastSyncCtx, hashes: seq[NewBlockHashesAnnounce], peers: seq[Peer]) {.async.} =
try:
var bha = newSeqOfCap[NewBlockHashesAnnounce](hashes.len)
for peer in peers:
for val in hashes:
let alreadyKnownByPeer = ctx.addToKnownByPeer(val.hash, peer)
if not alreadyKnownByPeer:
bha.add val
if bha.len > 0:
trace trEthSendNewBlockHashes, numHashes=bha.len, peer
await peer.newBlockHashes(bha)
bha.setLen(0)
except TransportError:
debug "Transport got closed during broadcastBlockHash"
except CatchableError as e:
debug "Exception in broadcastBlockHash", exc = e.name, err = e.msg
proc broadcastBlock(ctx: FastSyncCtx, blk: EthBlock, peers: seq[Peer]) {.async.} =
try:
let
db = ctx.chain.db
blockHash = blk.header.blockHash
td = db.getScore(blk.header.parentHash) +
blk.header.difficulty
for peer in peers:
let alreadyKnownByPeer = ctx.addToKnownByPeer(blockHash, peer)
if not alreadyKnownByPeer:
trace trEthSendNewBlock,
number=blk.header.blockNumber, td=td,
hash=short(blockHash), peer
await peer.newBlock(blk, td)
except TransportError:
debug "Transport got closed during broadcastBlock"
except CatchableError as e:
debug "Exception in broadcastBlock", exc = e.name, err = e.msg
proc broadcastBlockHash(ctx: FastSyncCtx, blk: EthBlock, peers: seq[Peer]) {.async.} =
try:
let bha = NewBlockHashesAnnounce(
number: blk.header.blockNumber,
hash: blk.header.blockHash
)
for peer in peers:
let alreadyKnownByPeer = ctx.addToKnownByPeer(bha.hash, peer)
if not alreadyKnownByPeer:
trace trEthSendNewBlockHashes,
number=bha.number, hash=short(bha.hash), peer
await peer.newBlockHashes([bha])
except TransportError:
debug "Transport got closed during broadcastBlockHash"
except CatchableError as e:
debug "Exception in broadcastBlockHash", exc = e.name, err = e.msg
proc sendBlockOrHash(ctx: FastSyncCtx, peer: Peer) {.async.} =
# because peer TD is lower than us,
# it become our recipient of block and block hashes
# instead of we download from it
try:
let
db = ctx.chain.db
peerBlockHash = peer.state(eth).bestBlockHash
var peerBlockHeader: BlockHeader
if not db.getBlockHeader(peerBlockHash, peerBlockHeader):
error "can't get block header", hash=short(peerBlockHash)
return
let
dist = (ctx.finalizedBlock - peerBlockHeader.blockNumber).truncate(int)
start = peerBlockHeader.blockNumber + 1.toBlockNumber
if dist == 1:
# only one block apart, send NewBlock
let number = ctx.finalizedBlock
var header: BlockHeader
var body: BlockBody
if not db.getBlockHeader(number, header):
error "can't get block header", number=number
return
let blockHash = header.blockHash
if not db.getBlockBody(blockHash, body):
error "can't get block body", number=number
return
let
ourTD = db.getScore(blockHash)
newBlock = EthBlock(
header: header,
txs: body.transactions,
uncles: body.uncles)
trace "send newBlock",
number = header.blockNumber,
hash = short(header.blockHash),
td = ourTD, peer
await peer.newBlock(newBlock, ourTD)
return
if dist > maxHeadersFetch:
# distance is too far, ignore this peer
return
# send hashes in batch
var
hash: Hash256
number = 0
hashes = newSeqOfCap[NewBlockHashesAnnounce](maxHeadersFetch)
while number < dist:
let blockNumber = start + number.toBlockNumber
if not db.getBlockHash(blockNumber, hash):
error "failed to get block hash", number=blockNumber
return
hashes.add(NewBlockHashesAnnounce(
number: blockNumber,
hash: hash))
if hashes.len == maxHeadersFetch:
trace "send newBlockHashes(batch)", numHashes=hashes.len, peer
await peer.newBlockHashes(hashes)
hashes.setLen(0)
inc number
# send the rest of hashes if available
if hashes.len > 0:
trace "send newBlockHashes(remaining)", numHashes=hashes.len, peer
await peer.newBlockHashes(hashes)
except TransportError:
debug "Transport got closed during obtainBlocksFromPeer"
except CatchableError as e:
debug "Exception in getBestBlockNumber()", exc = e.name, err = e.msg
# no need to exit here, because the context might still have blocks to fetch
# from this peer
discard e
proc endIndex(b: WantedBlocks): BlockNumber =
result = b.startIndex
result += (b.numBlocks - 1).toBlockNumber
@ -90,9 +396,10 @@ proc availableWorkItem(ctx: FastSyncCtx): int =
maxPendingBlock = endBlock
let nextRequestedBlock = maxPendingBlock + 1
# If this next block doesn't exist yet according to any of our peers, don't
# return a work item (and sync will be stopped).
if nextRequestedBlock >= ctx.endBlockNumber:
if nextRequestedBlock > ctx.endBlockNumber:
return -1
# Increase queue when there are no free (Initial / Persisted) work items in
@ -103,10 +410,36 @@ proc availableWorkItem(ctx: FastSyncCtx): int =
# Create new work item when queue was increased, reset when selected work item
# is at Persisted state.
var numBlocks = (ctx.endBlockNumber - nextRequestedBlock).truncate(int)
var numBlocks = (ctx.endBlockNumber - nextRequestedBlock).truncate(int) + 1
if numBlocks > maxHeadersFetch:
numBlocks = maxHeadersFetch
ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial)
ctx.workQueue[result] = WantedBlocks(
startIndex: nextRequestedBlock,
numBlocks : numBlocks.uint,
state : Initial,
isHash : false)
proc appendWorkItem(ctx: FastSyncCtx, hash: Hash256,
startIndex: BlockNumber, numBlocks: uint) =
for i in 0 .. ctx.workQueue.high:
if ctx.workQueue[i].state == Persisted:
ctx.workQueue[i] = WantedBlocks(
isHash : true,
hash : hash,
startIndex: startIndex,
numBlocks : numBlocks,
state : Initial)
return
let i = ctx.workQueue.len
ctx.workQueue.setLen(i + 1)
ctx.workQueue[i] = WantedBlocks(
isHash : true,
hash : hash,
startIndex: startIndex,
numBlocks : numBlocks,
state : Initial)
proc persistWorkItem(ctx: FastSyncCtx, wi: var WantedBlocks): ValidationResult
{.gcsafe, raises:[Defect,CatchableError].} =
@ -204,11 +537,6 @@ proc returnWorkItem(ctx: FastSyncCtx, workItem: int): ValidationResult
receivedBlocks
return ValidationResult.Error
proc handleLostPeer(ctx: FastSyncCtx) =
# TODO: ask the PeerPool for new connections and then call
# `obtainBlocksFromPeer`
discard
proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} =
let request = BlocksRequest(
startBlock: HashOrNum(isHash: true,
@ -228,13 +556,104 @@ proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} =
count=latestBlock.get.headers.len,
blockNumber=(if latestBlock.get.headers.len > 0: $result else: "missing")
proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} =
proc toRequest(workItem: WantedBlocks): BlocksRequest =
if workItem.isHash:
BlocksRequest(
startBlock: HashOrNum(isHash: true, hash: workItem.hash),
maxResults: workItem.numBlocks,
skip: 0,
reverse: false)
else:
BlocksRequest(
startBlock: HashOrNum(isHash: false, number: workItem.startIndex),
maxResults: workItem.numBlocks,
skip: 0,
reverse: false)
type
BodyHash = object
txRoot: Hash256
uncleHash: Hash256
proc hash*(x: BodyHash): Hash =
var h: Hash = 0
h = h !& hash(x.txRoot.data)
h = h !& hash(x.uncleHash.data)
result = !$h
proc fetchBodies(ctx: FastSyncCtx, peer: Peer,
workItemIdx: int, reqBodies: seq[bool]): Future[bool] {.async.} =
template workItem: auto = ctx.workQueue[workItemIdx]
var bodies = newSeqOfCap[BlockBody](workItem.headers.len)
var hashes = newSeqOfCap[KeccakHash](maxBodiesFetch)
doAssert(reqBodies.len == workItem.headers.len)
template fetchBodies() =
trace trEthSendSendingGetBlockBodies, peer,
hashes=hashes.len
let b = await peer.getBlockBodies(hashes)
if b.isNone:
raise newException(CatchableError, "Was not able to get the block bodies")
let bodiesLen = b.get.blocks.len
trace trEthRecvReceivedBlockBodies, peer,
count=bodiesLen, requested=hashes.len
if bodiesLen == 0:
raise newException(CatchableError, "Zero block bodies received for request")
elif bodiesLen < hashes.len:
hashes.delete(0, bodiesLen - 1)
elif bodiesLen == hashes.len:
hashes.setLen(0)
else:
raise newException(CatchableError, "Too many block bodies received for request")
bodies.add(b.get.blocks)
var numRequest = 0
for i, h in workItem.headers:
if reqBodies[i]:
hashes.add(h.blockHash)
inc numRequest
if hashes.len == maxBodiesFetch:
fetchBodies()
while hashes.len != 0:
fetchBodies()
workItem.bodies = newSeqOfCap[BlockBody](workItem.headers.len)
var bodyHashes = initTable[BodyHash, int]()
for z, body in bodies:
let bodyHash = BodyHash(
txRoot: calcTxRoot(body.transactions),
uncleHash: rlpHash(body.uncles))
bodyHashes[bodyHash] = z
for i, req in reqBodies:
if req:
let bodyHash = BodyHash(
txRoot: workItem.headers[i].txRoot,
uncleHash: workItem.headers[i].ommersHash)
let z = bodyHashes.getOrDefault(bodyHash, -1)
if z == -1:
error "header missing it's body",
number=workItem.headers[i].blockNumber,
hash=workItem.headers[i].blockHash.short
return false
workItem.bodies.add bodies[z]
else:
workItem.bodies.add BlockBody()
return true
proc obtainBlocksFromPeer(ctx: FastSyncCtx, peer: Peer) {.async.} =
# Update our best block number
try:
let bestBlockNumber = await peer.getBestBlockNumber()
if bestBlockNumber > syncCtx.endBlockNumber:
trace "New sync end block number", number = bestBlockNumber
syncCtx.endBlockNumber = bestBlockNumber
if ctx.endBlockNumber <= ctx.finalizedBlock:
# endBlockNumber need update
let bestBlockNumber = await peer.getBestBlockNumber()
if bestBlockNumber > ctx.endBlockNumber:
trace "New sync end block number", number = bestBlockNumber
ctx.endBlockNumber = bestBlockNumber
except TransportError:
debug "Transport got closed during obtainBlocksFromPeer"
except CatchableError as e:
@ -243,19 +662,14 @@ proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} =
# from this peer
discard e
while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1 and
while (let workItemIdx = ctx.availableWorkItem(); workItemIdx != -1 and
peer.connectionState notin {Disconnecting, Disconnected}):
template workItem: auto = syncCtx.workQueue[workItemIdx]
template workItem: auto = ctx.workQueue[workItemIdx]
workItem.state = Requested
trace "Requesting block headers", start = workItem.startIndex,
count = workItem.numBlocks, peer = peer.remote.node
let request = BlocksRequest(
startBlock: HashOrNum(isHash: false, number: workItem.startIndex),
maxResults: workItem.numBlocks,
skip: 0,
reverse: false)
var dataReceived = false
let request = toRequest(workItem)
var dataReceived = true
try:
trace trEthSendSendingGetBlockHeaders, peer,
startBlock=request.startBlock.number, max=request.maxResults,
@ -266,59 +680,63 @@ proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} =
count=results.get.headers.len, requested=request.maxResults
shallowCopy(workItem.headers, results.get.headers)
var bodies = newSeqOfCap[BlockBody](workItem.headers.len)
var hashes = newSeqOfCap[KeccakHash](maxBodiesFetch)
template fetchBodies() =
trace trEthSendSendingGetBlockBodies, peer,
hashes=hashes.len
let b = await peer.getBlockBodies(hashes)
if b.isNone:
raise newException(
CatchableError, "Was not able to get the block bodies")
let bodiesLen = b.get.blocks.len
trace trEthRecvReceivedBlockBodies, peer,
count=bodiesLen, requested=hashes.len
if bodiesLen == 0:
raise newException(CatchableError, "Zero block bodies received for request")
elif bodiesLen < hashes.len:
hashes.delete(0, bodiesLen - 1)
elif bodiesLen == hashes.len:
hashes.setLen(0)
else:
raise newException(CatchableError, "Too many block bodies received for request")
bodies.add(b.get.blocks)
var
reqBodies = newSeqOfCap[bool](workItem.headers.len)
numRequest = 0
nextIndex = workItem.startIndex
var nextIndex = workItem.startIndex
for i in workItem.headers:
if i.blockNumber != nextIndex:
raise newException(CatchableError, "The block numbers are not in sequence. Not processing this workItem.")
else:
nextIndex = nextIndex + 1
hashes.add(blockHash(i))
if hashes.len == maxBodiesFetch:
fetchBodies()
# skip requesting empty bodies
for h in workItem.headers:
if h.blockNumber != nextIndex:
raise newException(CatchableError,
"The block numbers are not in sequence. Not processing this workItem.")
while hashes.len != 0:
fetchBodies()
nextIndex = nextIndex + 1
let req = h.txRoot != EMPTY_ROOT_HASH or
h.ommersHash != EMPTY_UNCLE_HASH
reqBodies.add(req)
if req: inc numRequest
if bodies.len == workItem.headers.len:
shallowCopy(workItem.bodies, bodies)
dataReceived = true
if numRequest > 0:
dataReceived = dataReceived and
await ctx.fetchBodies(peer, workItemIdx, reqBodies)
else:
warn "Bodies len != headers.len", bodies = bodies.len, headers = workItem.headers.len
# all bodies are empty
workItem.bodies.setLen(workItem.headers.len)
let peers = ctx.getPeers(peer)
if peers.len > 0:
var hashes = newSeqOfCap[NewBlockHashesAnnounce](workItem.headers.len)
for h in workItem.headers:
hashes.add NewBlockHashesAnnounce(
hash: h.blockHash,
number: h.blockNumber)
trace "broadcast block hashes", numPeers=peers.len, numHashes=hashes.len
await ctx.broadcastBlockHash(hashes, peers)
# fetchBodies can fail
dataReceived = dataReceived and true
else:
dataReceived = false
except TransportError:
debug "Transport got closed during obtainBlocksFromPeer"
debug "Transport got closed during obtainBlocksFromPeer",
peer
except CatchableError as e:
# the success case sets `dataReceived`, so we can just fall back to the
# failure path below. If we signal time-outs with exceptions such
# failures will be easier to handle.
debug "Exception in obtainBlocksFromPeer()", exc = e.name, err = e.msg
debug "Exception in obtainBlocksFromPeer()",
exc = e.name, err = e.msg, peer
var giveUpOnPeer = false
if dataReceived:
trace "Finished obtaining blocks", peer, numBlocks=workItem.headers.len
workItem.state = Received
if syncCtx.returnWorkItem(workItemIdx) != ValidationResult.OK:
let res = ctx.returnWorkItem(workItemIdx)
if res != ValidationResult.OK:
trace "validation error"
giveUpOnPeer = true
else:
giveUpOnPeer = true
@ -329,11 +747,9 @@ proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} =
await peer.disconnect(SubprotocolReason)
except CatchableError:
discard
syncCtx.handleLostPeer()
ctx.handleLostPeer()
break
trace "Finished obtaining blocks", peer
proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
# Returns true if one of the peers acknowledges existence of the best block
# of another peer.
@ -370,7 +786,7 @@ proc randomTrustedPeer(ctx: FastSyncCtx): Peer =
if i == k: return
inc i
proc startSyncWithPeer(ctx: FastSyncCtx, peer: Peer) {.async.} =
proc startSyncWithPeerImpl(ctx: FastSyncCtx, peer: Peer) {.async.} =
trace "Start sync", peer, trustedPeers = ctx.trustedPeers.len
if ctx.trustedPeers.len >= minPeersToStartSync:
# We have enough trusted peers. Validate new peer against trusted
@ -414,26 +830,71 @@ proc startSyncWithPeer(ctx: FastSyncCtx, peer: Peer) {.async.} =
for p in ctx.trustedPeers:
asyncSpawn ctx.obtainBlocksFromPeer(p)
proc onPeerConnected(ctx: FastSyncCtx, peer: Peer) =
trace "New candidate for sync", peer
proc startSyncWithPeer(ctx: FastSyncCtx, peer: Peer) =
try:
let f = ctx.startSyncWithPeer(peer)
let
db = ctx.chain.db
header = db.getBlockHeader(ctx.finalizedBlock)
ourTD = db.getScore(header.blockHash)
peerTD = peer.state(eth).bestDifficulty
if peerTD <= ourTD:
# do nothing if peer have same height
if peerTD < ourTD:
trace "Peer have lower TD, become recipient",
peer, ourTD, peerTD
asyncSpawn ctx.sendBlockOrHash(peer)
return
ctx.busyPeers.incl(peer)
let f = ctx.startSyncWithPeerImpl(peer)
f.callback = proc(data: pointer) {.gcsafe.} =
if f.failed:
if f.error of TransportError:
debug "Transport got closed during startSyncWithPeer"
else:
error "startSyncWithPeer failed", msg = f.readError.msg, peer
ctx.busyPeers.excl(peer)
asyncSpawn f
except TransportError:
debug "Transport got closed during startSyncWithPeer"
except CatchableError as e:
debug "Exception in startSyncWithPeer()", exc = e.name, err = e.msg
proc startObtainBlocks(ctx: FastSyncCtx, peer: Peer) =
# simpler version of startSyncWithPeer
try:
ctx.busyPeers.incl(peer)
let f = ctx.obtainBlocksFromPeer(peer)
f.callback = proc(data: pointer) {.gcsafe.} =
if f.failed:
if f.error of TransportError:
debug "Transport got closed during startObtainBlocks"
else:
error "startObtainBlocks failed", msg = f.readError.msg, peer
ctx.busyPeers.excl(peer)
asyncSpawn f
except TransportError:
debug "Transport got closed during startObtainBlocks"
except CatchableError as e:
debug "Exception in startObtainBlocks()", exc = e.name, err = e.msg
proc onPeerConnected(ctx: FastSyncCtx, peer: Peer) =
trace "New candidate for sync", peer
ctx.startSyncWithPeer(peer)
proc onPeerDisconnected(ctx: FastSyncCtx, p: Peer) =
trace "peer disconnected ", peer = p
ctx.trustedPeers.excl(p)
ctx.busyPeers.excl(p)
ctx.knownByPeer.del(p)
# ------------------------------------------------------------------------------
# Public constructor/destructor
# ------------------------------------------------------------------------------
proc new*(T: type FastSyncCtx; ethNode: EthereumNode; chain: Chain): T
{.gcsafe, raises:[Defect,CatchableError].} =
@ -450,14 +911,184 @@ proc start*(ctx: FastSyncCtx) =
## Code for the fast blockchain sync procedure:
## <https://github.com/ethereum/wiki/wiki/Parallel-Block-Downloads>_
## <https://github.com/ethereum/go-ethereum/pull/1889__
try:
var blockHash: Hash256
let
db = ctx.chain.db
ttd = db.ttd()
if not db.getBlockHash(ctx.finalizedBlock, blockHash):
debug "FastSync.start: Failed to get blockHash",
number=ctx.finalizedBlock
return
let td = db.getScore(blockHash)
if td > ttd:
debug "Fast sync is disabled after POS merge"
return
info "Fast Sync: start sync from",
number=ctx.finalizedBlock,
hash=blockHash
except CatchableError as e:
debug "Exception in FastSync.start()",
exc = e.name, err = e.msg
var po = PeerObserver(
onPeerConnected:
proc(p: Peer) {.gcsafe.} =
ctx.onPeerConnected(p),
ctx.onPeerConnected(p),
onPeerDisconnected:
proc(p: Peer) {.gcsafe.} =
ctx.onPeerDisconnected(p))
po.setProtocol eth
ctx.peerPool.addObserver(ctx, po)
# ------------------------------------------------------------------------------
# Public procs: eth wire protocol handlers
# ------------------------------------------------------------------------------
proc handleNewBlockHashes(ctx: FastSyncCtx,
peer: Peer,
hashes: openArray[NewBlockHashesAnnounce]) {.
gcsafe, raises: [Defect, CatchableError].} =
trace trEthRecvNewBlockHashes,
numHash=hashes.len
if hashes.len == 0:
return
var number = hashes[0].number
if hashes.len > 1:
for i in 1..<hashes.len:
let val = hashes[i]
if val.number != number + 1.toBlockNumber:
error "Found a gap in block hashes"
return
number = val.number
number = hashes[^1].number
if number <= ctx.endBlockNumber:
trace "Will not set new synctarget",
newSyncHeight=number,
endBlockNumber=ctx.endBlockNumber,
peer
return
# don't send back hashes to this peer
for val in hashes:
discard ctx.addToKnownByPeer(val.hash, peer)
# set new sync target, + 1'u means including last block
let numBlocks = (number - hashes[0].number).truncate(uint) + 1'u
ctx.appendWorkItem(hashes[0].hash, hashes[0].number, numBlocks)
ctx.endBlockNumber = number
trace "New sync target height", number
if ctx.busyPeers.len > 0:
# do nothing. busy peers will keep syncing
# until new sync target reached
trace "sync using busyPeers",
len=ctx.busyPeers.len
return
if ctx.trustedPeers.len == 0:
trace "sync with this peer"
ctx.startObtainBlocks(peer)
else:
trace "sync with random peer"
let peer = ctx.randomTrustedPeer()
ctx.startSyncWithPeer(peer)
proc handleNewBlock(ctx: FastSyncCtx,
peer: Peer,
blk: EthBlock,
totalDifficulty: DifficultyInt) {.
gcsafe, raises: [Defect, CatchableError].} =
trace trEthRecvNewBlock,
number=blk.header.blockNumber,
hash=short(blk.header.blockHash)
if ctx.lastCleanup - getTime() > CleanupInterval:
ctx.cleanupKnownByPeer()
# Don't send NEW_BLOCK announcement to peer that sent original new block message
discard ctx.addToKnownByPeer(blk.header.blockHash, peer)
if blk.header.blockNumber > ctx.finalizedBlock + 1.toBlockNumber:
# If the block number exceeds one past our height we cannot validate it
trace "NewBlock got block past our height",
number=blk.header.blockNumber
return
if not ctx.validateHeader(blk.header):
error "invalid header from peer",
peer, hash=short(blk.header.blockHash)
return
# Send NEW_BLOCK to square root of total number of peers in pool
# https://github.com/ethereum/devp2p/blob/master/caps/eth.md#block-propagation
let
numPeersToShareWith = sqrt(ctx.peerPool.len.float32).int
peers = ctx.getPeers(peer)
debug "num peers to share with",
number=numPeersToShareWith,
numPeers=peers.len
if peers.len > 0 and numPeersToShareWith > 0:
asyncSpawn ctx.broadcastBlock(blk, peers[0..<numPeersToShareWith])
var parentHash: Hash256
if not ctx.chain.db.getBlockHash(ctx.finalizedBlock, parentHash):
error "failed to get parent hash",
number=ctx.finalizedBlock
return
if parentHash == blk.header.parentHash:
# If new block is child of current chain tip, insert new block into chain
let body = BlockBody(
transactions: blk.txs,
uncles: blk.uncles
)
let res = ctx.chain.persistBlocks([blk.header], [body])
# Check if new sync target height can be set
if res == ValidationResult.OK:
ctx.endBlockNumber = blk.header.blockNumber
ctx.finalizedBlock = blk.header.blockNumber
else:
# Call handleNewBlockHashes to retrieve all blocks between chain tip and new block
let newSyncHeight = NewBlockHashesAnnounce(
number: blk.header.blockNumber,
hash: blk.header.blockHash
)
ctx.handleNewBlockHashes(peer, [newSyncHeight])
if peers.len > 0 and numPeersToShareWith > 0:
# Send `NEW_BLOCK_HASHES` message for received block to all other peers
asyncSpawn ctx.broadcastBlockHash(blk, peers[numPeersToShareWith..^1])
proc newBlockHashesHandler*(arg: pointer,
peer: Peer,
hashes: openArray[NewBlockHashesAnnounce]) {.
gcsafe, raises: [Defect, CatchableError].} =
let ctx = cast[FastSyncCtx](arg)
ctx.handleNewBlockHashes(peer, hashes)
proc newBlockHandler*(arg: pointer,
peer: Peer,
blk: EthBlock,
totalDifficulty: DifficultyInt) {.
gcsafe, raises: [Defect, CatchableError].} =
let ctx = cast[FastSyncCtx](arg)
ctx.handleNewBlock(peer, blk, totalDifficulty)
# End

View File

@ -15,6 +15,27 @@ import
type
HashToTime = TableRef[Hash256, Time]
NewBlockHandler* = proc(
arg: pointer,
peer: Peer,
blk: EthBlock,
totalDifficulty: DifficultyInt) {.
gcsafe, raises: [Defect, CatchableError].}
NewBlockHashesHandler* = proc(
arg: pointer,
peer: Peer,
hashes: openArray[NewBlockHashesAnnounce]) {.
gcsafe, raises: [Defect, CatchableError].}
NewBlockHandlerPair = object
arg: pointer
handler: NewBlockHandler
NewBlockHashesHandlerPair = object
arg: pointer
handler: NewBlockHashesHandler
EthWireRef* = ref object of EthWireBase
db: BaseChainDB
chain: Chain
@ -25,6 +46,8 @@ type
pending: HashSet[Hash256]
lastCleanup: Time
merger: MergerRef
newBlockHandler: NewBlockHandlerPair
newBlockHashesHandler: NewBlockHashesHandlerPair
ReconnectRef = ref object
pool: PeerPool
@ -35,9 +58,63 @@ const
POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20)
PEER_LONG_BANTIME = chronos.minutes(150)
# ------------------------------------------------------------------------------
# Private functions: helper functions
# ------------------------------------------------------------------------------
proc notEnabled(name: string) =
debug "Wire handler method is disabled", meth = name
proc notImplemented(name: string) =
debug "Wire handler method not implemented", meth = name
proc inPool(ctx: EthWireRef, txHash: Hash256): bool =
let res = ctx.txPool.getItem(txHash)
res.isOk
proc inPoolAndOk(ctx: EthWireRef, txHash: Hash256): bool =
let res = ctx.txPool.getItem(txHash)
if res.isErr: return false
res.get().reject == txInfoOk
proc successorHeader(db: BaseChainDB,
h: BlockHeader,
output: var BlockHeader,
skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} =
let offset = 1 + skip.toBlockNumber
if h.blockNumber <= (not 0.toBlockNumber) - offset:
result = db.getBlockHeader(h.blockNumber + offset, output)
proc ancestorHeader(db: BaseChainDB,
h: BlockHeader,
output: var BlockHeader,
skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} =
let offset = 1 + skip.toBlockNumber
if h.blockNumber >= offset:
result = db.getBlockHeader(h.blockNumber - offset, output)
proc blockHeader(db: BaseChainDB,
b: HashOrNum,
output: var BlockHeader): bool
{.gcsafe, raises: [Defect,RlpError].} =
if b.isHash:
db.getBlockHeader(b.hash, output)
else:
db.getBlockHeader(b.number, output)
# ------------------------------------------------------------------------------
# Private functions: peers related functions
# ------------------------------------------------------------------------------
proc hash(peer: Peer): hashes.Hash =
hash(peer.remote)
proc getPeers(ctx: EthWireRef, thisPeer: Peer): seq[Peer] =
# do not send back tx or txhash to thisPeer
for peer in peers(ctx.peerPool):
if peer != thisPeer:
result.add peer
proc banExpiredReconnect(arg: pointer) {.gcsafe, raises: [Defect].} =
# Reconnect to peer after ban period if pool is empty
try:
@ -86,6 +163,14 @@ proc cleanupKnownByPeer(ctx: EthWireRef) =
map.del(hash)
tmp.clear()
var tmpPeer = initHashSet[Peer]()
for peer, map in ctx.knownByPeer:
if map.len == 0:
tmpPeer.incl peer
for peer in tmpPeer:
ctx.knownByPeer.del peer
ctx.lastCleanup = now
proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer) =
@ -97,7 +182,10 @@ proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer)
if txHash notin map:
map[txHash] = getTime()
proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer, newHashes: var seq[Hash256]) =
proc addToKnownByPeer(ctx: EthWireRef,
txHashes: openArray[Hash256],
peer: Peer,
newHashes: var seq[Hash256]) =
var map: HashToTime
if not ctx.knownByPeer.take(peer, map):
map = newTable[Hash256, Time]()
@ -108,6 +196,10 @@ proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer,
map[txHash] = getTime()
newHashes.add txHash
# ------------------------------------------------------------------------------
# Private functions: async workers
# ------------------------------------------------------------------------------
proc sendNewTxHashes(ctx: EthWireRef,
txHashes: seq[Hash256],
peers: seq[Peer]): Future[void] {.async.} =
@ -144,20 +236,49 @@ proc sendTransactions(ctx: EthWireRef,
except CatchableError as e:
debug "Exception in sendTransactions", exc = e.name, err = e.msg
proc inPool(ctx: EthWireRef, txHash: Hash256): bool =
let res = ctx.txPool.getItem(txHash)
res.isOk
proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash256], peer: Peer): Future[void] {.async.} =
debug "fetchTx: requesting txs",
number = reqHashes.len
proc inPoolAndOk(ctx: EthWireRef, txHash: Hash256): bool =
let res = ctx.txPool.getItem(txHash)
if res.isErr: return false
res.get().reject == txInfoOk
try:
proc getPeers(ctx: EthWireRef, thisPeer: Peer): seq[Peer] =
# do not send back tx or txhash to thisPeer
for peer in peers(ctx.peerPool):
if peer != thisPeer:
result.add peer
let res = await peer.getPooledTransactions(reqHashes)
if res.isNone:
error "not able to get pooled transactions"
return
let txs = res.get()
debug "fetchTx: received requested txs",
number = txs.transactions.len
# Remove from pending list regardless if tx is in result
for tx in txs.transactions:
let txHash = rlpHash(tx)
ctx.pending.excl txHash
ctx.txPool.jobAddTxs(txs.transactions)
except TransportError:
debug "Transport got closed during fetchTransactions"
return
except CatchableError as e:
debug "Exception in fetchTransactions", exc = e.name, err = e.msg
return
var newTxHashes = newSeqOfCap[Hash256](reqHashes.len)
for txHash in reqHashes:
if ctx.inPoolAndOk(txHash):
newTxHashes.add txHash
let peers = ctx.getPeers(peer)
if peers.len == 0 or newTxHashes.len == 0:
return
await ctx.sendNewTxHashes(newTxHashes, peers)
# ------------------------------------------------------------------------------
# Private functions: peer observer
# ------------------------------------------------------------------------------
proc onPeerConnected(ctx: EthWireRef, peer: Peer) =
if ctx.disableTxPool:
@ -192,6 +313,10 @@ proc setupPeerObserver(ctx: EthWireRef) =
po.setProtocol eth
ctx.peerPool.addObserver(ctx, po)
# ------------------------------------------------------------------------------
# Public constructor/destructor
# ------------------------------------------------------------------------------
proc new*(_: type EthWireRef,
chain: Chain,
txPool: TxPoolRef,
@ -209,11 +334,25 @@ proc new*(_: type EthWireRef,
ctx.setupPeerObserver()
ctx
proc notEnabled(name: string) =
debug "Wire handler method is disabled", meth = name
# ------------------------------------------------------------------------------
# Public functions: callbacks setters
# ------------------------------------------------------------------------------
proc notImplemented(name: string) =
debug "Wire handler method not implemented", meth = name
proc setNewBlockHandler*(ctx: EthWireRef, handler: NewBlockHandler, arg: pointer) =
ctx.newBlockHandler = NewBlockHandlerPair(
arg: arg,
handler: handler
)
proc setNewBlockHashesHandler*(ctx: EthWireRef, handler: NewBlockHashesHandler, arg: pointer) =
ctx.newBlockHashesHandler = NewBlockHashesHandlerPair(
arg: arg,
handler: handler
)
# ------------------------------------------------------------------------------
# Public functions: eth wire protocol handlers
# ------------------------------------------------------------------------------
proc txPoolEnabled*(ctx: EthWireRef; ena: bool) =
ctx.disableTxPool = not ena
@ -265,31 +404,6 @@ method getBlockBodies*(ctx: EthWireRef, hashes: openArray[Hash256]): seq[BlockBo
result.add BlockBody()
trace "handlers.getBlockBodies: blockBody not found", blockHash
proc successorHeader(db: BaseChainDB,
h: BlockHeader,
output: var BlockHeader,
skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} =
let offset = 1 + skip.toBlockNumber
if h.blockNumber <= (not 0.toBlockNumber) - offset:
result = db.getBlockHeader(h.blockNumber + offset, output)
proc ancestorHeader*(db: BaseChainDB,
h: BlockHeader,
output: var BlockHeader,
skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} =
let offset = 1 + skip.toBlockNumber
if h.blockNumber >= offset:
result = db.getBlockHeader(h.blockNumber - offset, output)
proc blockHeader(db: BaseChainDB,
b: HashOrNum,
output: var BlockHeader): bool
{.gcsafe, raises: [Defect,RlpError].} =
if b.isHash:
db.getBlockHeader(b.hash, output)
else:
db.getBlockHeader(b.number, output)
method getBlockHeaders*(ctx: EthWireRef, req: BlocksRequest): seq[BlockHeader] {.gcsafe.} =
let db = ctx.db
var foundBlock: BlockHeader
@ -348,46 +462,6 @@ method handleAnnouncedTxs*(ctx: EthWireRef, peer: Peer, txs: openArray[Transacti
asyncSpawn ctx.sendNewTxHashes(newTxHashes, peers[sendFull..^1])
proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash256], peer: Peer): Future[void] {.async.} =
debug "fetchTx: requesting txs",
number = reqHashes.len
try:
let res = await peer.getPooledTransactions(reqHashes)
if res.isNone:
error "not able to get pooled transactions"
return
let txs = res.get()
debug "fetchTx: received requested txs",
number = txs.transactions.len
# Remove from pending list regardless if tx is in result
for tx in txs.transactions:
let txHash = rlpHash(tx)
ctx.pending.excl txHash
ctx.txPool.jobAddTxs(txs.transactions)
except TransportError:
debug "Transport got closed during fetchTransactions"
return
except CatchableError as e:
debug "Exception in fetchTransactions", exc = e.name, err = e.msg
return
var newTxHashes = newSeqOfCap[Hash256](reqHashes.len)
for txHash in reqHashes:
if ctx.inPoolAndOk(txHash):
newTxHashes.add txHash
let peers = ctx.getPeers(peer)
if peers.len == 0 or newTxHashes.len == 0:
return
await ctx.sendNewTxHashes(newTxHashes, peers)
method handleAnnouncedTxsHashes*(ctx: EthWireRef, peer: Peer, txHashes: openArray[Hash256]) {.gcsafe.} =
if ctx.disableTxPool:
when trMissingOrDisabledGossipOk:
@ -426,6 +500,12 @@ method handleNewBlock*(ctx: EthWireRef, peer: Peer, blk: EthBlock, totalDifficul
asyncSpawn banPeer(ctx.peerPool, peer, PEER_LONG_BANTIME)
return
if not ctx.newBlockHandler.handler.isNil:
ctx.newBlockHandler.handler(
ctx.newBlockHandler.arg,
peer, blk, totalDifficulty
)
method handleNewBlockHashes*(ctx: EthWireRef, peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) {.gcsafe.} =
if ctx.merger.posFinalized:
debug "Dropping peer for sending NewBlockHashes after merge (EIP-3675)",
@ -433,6 +513,13 @@ method handleNewBlockHashes*(ctx: EthWireRef, peer: Peer, hashes: openArray[NewB
asyncSpawn banPeer(ctx.peerPool, peer, PEER_LONG_BANTIME)
return
if not ctx.newBlockHashesHandler.handler.isNil:
ctx.newBlockHashesHandler.handler(
ctx.newBlockHashesHandler.arg,
peer,
hashes
)
when defined(legacy_eth66_enabled):
method getStorageNodes*(ctx: EthWireRef, hashes: openArray[Hash256]): seq[Blob] {.gcsafe.} =
let db = ctx.db.db

View File

@ -64,9 +64,14 @@ const
trEthSendDelaying* =
">> " & prettyEthProtoName & " Delaying "
func toHex(hash: Hash256): string =
## Shortcut for `byteutils.toHex(hash.data)`
hash.data.toHex
trEthRecvNewBlock* =
"<< " & prettyEthProtoName & " Received NewBlock"
trEthRecvNewBlockHashes* =
"<< " & prettyEthProtoName & " Received NewBlockHashes"
trEthSendNewBlock* =
">> " & prettyEthProtoName & " Sending NewBlock"
trEthSendNewBlockHashes* =
">> " & prettyEthProtoName & " Sending NewBlockHashes"
p2pProtocol eth66(version = ethVersion,
rlpxName = "eth",
@ -82,9 +87,9 @@ p2pProtocol eth66(version = ethVersion,
trace trEthSendSending & "Status (0x00)", peer,
td = status.totalDifficulty,
bestHash = status.bestBlockHash,
bestHash = short(status.bestBlockHash),
networkId = network.networkId,
genesis = status.genesisHash,
genesis = short(status.genesisHash),
forkHash = status.forkId.forkHash.toHex,
forkNext = status.forkId.forkNext
@ -113,7 +118,7 @@ p2pProtocol eth66(version = ethVersion,
if m.genesisHash != status.genesisHash:
trace "Peer for a different network (genesisHash)", peer,
expectGenesis=status.genesisHash, gotGenesis=m.genesisHash
expectGenesis=short(status.genesisHash), gotGenesis=short(m.genesisHash)
raise newException(
UselessPeerError, "Eth handshake for different network")
@ -132,7 +137,7 @@ p2pProtocol eth66(version = ethVersion,
genesisHash: Hash256,
forkId: ChainForkId) =
trace trEthRecvReceived & "Status (0x00)", peer,
networkId, totalDifficulty, bestHash, genesisHash,
networkId, totalDifficulty, bestHash=short(bestHash), genesisHash=short(genesisHash),
forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext
# User message 0x01: NewBlockHashes.

View File

@ -64,9 +64,14 @@ const
trEthSendDelaying* =
">> " & prettyEthProtoName & " Delaying "
func toHex(hash: Hash256): string =
## Shortcut for `byteutils.toHex(hash.data)`
hash.data.toHex
trEthRecvNewBlock* =
"<< " & prettyEthProtoName & " Received NewBlock"
trEthRecvNewBlockHashes* =
"<< " & prettyEthProtoName & " Received NewBlockHashes"
trEthSendNewBlock* =
">> " & prettyEthProtoName & " Sending NewBlock"
trEthSendNewBlockHashes* =
">> " & prettyEthProtoName & " Sending NewBlockHashes"
p2pProtocol eth67(version = ethVersion,
rlpxName = "eth",
@ -82,9 +87,9 @@ p2pProtocol eth67(version = ethVersion,
trace trEthSendSending & "Status (0x00)", peer,
td = status.totalDifficulty,
bestHash = status.bestBlockHash,
bestHash = short(status.bestBlockHash),
networkId = network.networkId,
genesis = status.genesisHash,
genesis = short(status.genesisHash),
forkHash = status.forkId.forkHash.toHex,
forkNext = status.forkId.forkNext
@ -100,7 +105,7 @@ p2pProtocol eth67(version = ethVersion,
trace "Handshake: Local and remote networkId",
local=network.networkId, remote=m.networkId
trace "Handshake: Local and remote genesisHash",
local=status.genesisHash, remote=m.genesisHash
local=short(status.genesisHash), remote=short(m.genesisHash)
trace "Handshake: Local and remote forkId",
local=(status.forkId.forkHash.toHex & "/" & $status.forkId.forkNext),
remote=(m.forkId.forkHash.toHex & "/" & $m.forkId.forkNext)
@ -113,7 +118,7 @@ p2pProtocol eth67(version = ethVersion,
if m.genesisHash != status.genesisHash:
trace "Peer for a different network (genesisHash)", peer,
expectGenesis=status.genesisHash, gotGenesis=m.genesisHash
expectGenesis=short(status.genesisHash), gotGenesis=short(m.genesisHash)
raise newException(
UselessPeerError, "Eth handshake for different network")
@ -132,7 +137,7 @@ p2pProtocol eth67(version = ethVersion,
genesisHash: Hash256,
forkId: ChainForkId) =
trace trEthRecvReceived & "Status (0x00)", peer,
networkId, totalDifficulty, bestHash, genesisHash,
networkId, totalDifficulty, bestHash=short(bestHash), genesisHash=short(genesisHash),
forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext
# User message 0x01: NewBlockHashes.