diff --git a/nimbus/nimbus.nim b/nimbus/nimbus.nim index fb1361b56..7ef350509 100644 --- a/nimbus/nimbus.nim +++ b/nimbus/nimbus.nim @@ -416,7 +416,23 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) = if ProtocolFlag.Eth in protocols and conf.maxPeers > 0: case conf.syncMode: of SyncMode.Default: - FastSyncCtx.new(nimbus.ethNode, nimbus.chainRef).start + let syncer = FastSyncCtx.new(nimbus.ethNode, nimbus.chainRef) + syncer.start + + let wireHandler = EthWireRef( + nimbus.ethNode.protocolState(eth) + ) + + wireHandler.setNewBlockHandler( + fast.newBlockHandler, + cast[pointer](syncer) + ) + + wireHandler.setNewBlockHashesHandler( + fast.newBlockHashesHandler, + cast[pointer](syncer) + ) + of SyncMode.Full, SyncMode.Snap: discard diff --git a/nimbus/sync/fast.nim b/nimbus/sync/fast.nim index b6ac4883c..c10e6ed52 100644 --- a/nimbus/sync/fast.nim +++ b/nimbus/sync/fast.nim @@ -9,14 +9,18 @@ # except according to those terms. import - std/[sets, options, random, hashes, sequtils], + std/[sets, options, random, + hashes, sequtils, math, tables, times], chronicles, chronos, eth/[common, p2p], eth/p2p/[private/p2p_types, peer_pool], stew/byteutils, "."/[protocol, types], - ../p2p/chain + ../p2p/[chain, clique/clique_sealer, gaslimit], + ../db/db_chain, + ../utils/difficulty, + ".."/[constants, utils] {.push raises:[Defect].} @@ -26,6 +30,7 @@ logScope: const minPeersToStartSync* = 2 # Wait for consensus of at least this # number of peers before syncing + CleanupInterval = initDuration(minutes = 20) type #SyncStatus = enum @@ -33,6 +38,8 @@ type # syncNotEnoughPeers # syncTimeOut + HashToTime = TableRef[Hash256, Time] + BlockchainSyncDefect* = object of Defect ## Catch and relay exception @@ -43,6 +50,8 @@ type Persisted WantedBlocks = object + isHash: bool + hash: Hash256 startIndex: BlockNumber numBlocks: uint state: WantedBlocksState @@ -57,9 +66,306 @@ type peerPool: PeerPool trustedPeers: HashSet[Peer] hasOutOfOrderBlocks: bool + busyPeers: HashSet[Peer] + knownByPeer: Table[Peer, HashToTime] + lastCleanup: Time + +# ------------------------------------------------------------------------------ +# Private functions: peers related functions +# ------------------------------------------------------------------------------ proc hash*(p: Peer): Hash = hash(cast[pointer](p)) +proc cleanupKnownByPeer(ctx: FastSyncCtx) = + let now = getTime() + var tmp = initHashSet[Hash256]() + for _, map in ctx.knownByPeer: + for hash, time in map: + if time - now >= CleanupInterval: + tmp.incl hash + for hash in tmp: + map.del(hash) + tmp.clear() + + var tmpPeer = initHashSet[Peer]() + for peer, map in ctx.knownByPeer: + if map.len == 0: + tmpPeer.incl peer + + for peer in tmpPeer: + ctx.knownByPeer.del peer + + ctx.lastCleanup = now + +proc addToKnownByPeer(ctx: FastSyncCtx, + blockHash: Hash256, + peer: Peer): bool = + var map: HashToTime + if not ctx.knownByPeer.take(peer, map): + map = newTable[Hash256, Time]() + result = false + else: + result = true + + map[blockHash] = getTime() + +proc getPeers(ctx: FastSyncCtx, thisPeer: Peer): seq[Peer] = + # do not send back block/blockhash to thisPeer + for peer in peers(ctx.peerPool): + if peer != thisPeer: + result.add peer + +proc handleLostPeer(ctx: FastSyncCtx) = + # TODO: ask the PeerPool for new connections and then call + # `obtainBlocksFromPeer` + discard + +# ------------------------------------------------------------------------------ +# Private functions: validators +# ------------------------------------------------------------------------------ + +proc validateDifficulty(ctx: FastSyncCtx, header, parentHeader: BlockHeader): bool = + try: + if ctx.chain.isBlockAfterTtd(header): + if header.difficulty != 0.u256: + trace "invalid difficulty", + expect=0, get=header.difficulty + return false + return true + + let + db = ctx.chain.db + config = db.config + + if config.poaEngine: + let rc = ctx.chain.clique.calcDifficulty(parentHeader) + if rc.isErr: + return false + if header.difficulty < rc.get(): + trace "provided header difficulty is too low", + expect=rc.get(), get=header.difficulty + return false + return true + + let calcDiffc = config.calcDifficulty(header.timestamp, parentHeader) + if header.difficulty < calcDiffc: + trace "provided header difficulty is too low", + expect=calcDiffc, get=header.difficulty + return false + return true + + except CatchableError as e: + error "Exception in FastSync.validateDifficulty()", + exc = e.name, err = e.msg + return false + +proc validateHeader(ctx: FastSyncCtx, header: BlockHeader, height = none(BlockNumber)): bool = + if header.parentHash == GENESIS_PARENT_HASH: + return true + + let + db = ctx.chain.db + config = db.config + + var parentHeader: BlockHeader + if not db.getBlockHeader(header.parentHash, parentHeader): + error "can't get parentHeader", + hash=header.parentHash, number=header.blockNumber + return false + + if header.blockNumber != parentHeader.blockNumber + 1.toBlockNumber: + trace "invalid block number", + expect=parentHeader.blockNumber + 1.toBlockNumber, + get=header.blockNumber + return false + + if header.timestamp <= parentHeader.timestamp: + trace "invalid timestamp", + parent=parentHeader.timestamp, + header=header.timestamp + return false + + if not ctx.validateDifficulty(header, parentHeader): + return false + + if config.poaEngine: + let period = initDuration(seconds = config.cliquePeriod) + # Timestamp diff between blocks is lower than PERIOD (clique) + if parentHeader.timestamp + period > header.timestamp: + trace "invalid timestamp diff (lower than period)", + parent=parentHeader.timestamp, + header=header.timestamp, + period + return false + + let res = db.validateGasLimitOrBaseFee(header, parentHeader) + if res.isErr: + trace "validate gaslimit error", + msg=res.error + return false + + if height.isSome: + let dif = height.get() - parentHeader.blockNumber + if not (dif < 8.toBlockNumber and dif > 1.toBlockNumber): + trace "uncle block has a parent that is too old or too young", + dif=dif, + height=height.get(), + parentNumber=parentHeader.blockNumber + return false + + return true + +# ------------------------------------------------------------------------------ +# Private functions: sync worker +# ------------------------------------------------------------------------------ + +proc broadcastBlockHash(ctx: FastSyncCtx, hashes: seq[NewBlockHashesAnnounce], peers: seq[Peer]) {.async.} = + try: + + var bha = newSeqOfCap[NewBlockHashesAnnounce](hashes.len) + for peer in peers: + for val in hashes: + let alreadyKnownByPeer = ctx.addToKnownByPeer(val.hash, peer) + if not alreadyKnownByPeer: + bha.add val + if bha.len > 0: + trace trEthSendNewBlockHashes, numHashes=bha.len, peer + await peer.newBlockHashes(bha) + bha.setLen(0) + + except TransportError: + debug "Transport got closed during broadcastBlockHash" + except CatchableError as e: + debug "Exception in broadcastBlockHash", exc = e.name, err = e.msg + +proc broadcastBlock(ctx: FastSyncCtx, blk: EthBlock, peers: seq[Peer]) {.async.} = + try: + + let + db = ctx.chain.db + blockHash = blk.header.blockHash + td = db.getScore(blk.header.parentHash) + + blk.header.difficulty + + for peer in peers: + let alreadyKnownByPeer = ctx.addToKnownByPeer(blockHash, peer) + if not alreadyKnownByPeer: + trace trEthSendNewBlock, + number=blk.header.blockNumber, td=td, + hash=short(blockHash), peer + await peer.newBlock(blk, td) + + except TransportError: + debug "Transport got closed during broadcastBlock" + except CatchableError as e: + debug "Exception in broadcastBlock", exc = e.name, err = e.msg + +proc broadcastBlockHash(ctx: FastSyncCtx, blk: EthBlock, peers: seq[Peer]) {.async.} = + try: + + let bha = NewBlockHashesAnnounce( + number: blk.header.blockNumber, + hash: blk.header.blockHash + ) + + for peer in peers: + let alreadyKnownByPeer = ctx.addToKnownByPeer(bha.hash, peer) + if not alreadyKnownByPeer: + trace trEthSendNewBlockHashes, + number=bha.number, hash=short(bha.hash), peer + await peer.newBlockHashes([bha]) + + except TransportError: + debug "Transport got closed during broadcastBlockHash" + except CatchableError as e: + debug "Exception in broadcastBlockHash", exc = e.name, err = e.msg + +proc sendBlockOrHash(ctx: FastSyncCtx, peer: Peer) {.async.} = + # because peer TD is lower than us, + # it become our recipient of block and block hashes + # instead of we download from it + + try: + let + db = ctx.chain.db + peerBlockHash = peer.state(eth).bestBlockHash + + var peerBlockHeader: BlockHeader + if not db.getBlockHeader(peerBlockHash, peerBlockHeader): + error "can't get block header", hash=short(peerBlockHash) + return + + let + dist = (ctx.finalizedBlock - peerBlockHeader.blockNumber).truncate(int) + start = peerBlockHeader.blockNumber + 1.toBlockNumber + + if dist == 1: + # only one block apart, send NewBlock + let number = ctx.finalizedBlock + var header: BlockHeader + var body: BlockBody + if not db.getBlockHeader(number, header): + error "can't get block header", number=number + return + + let blockHash = header.blockHash + if not db.getBlockBody(blockHash, body): + error "can't get block body", number=number + return + + let + ourTD = db.getScore(blockHash) + newBlock = EthBlock( + header: header, + txs: body.transactions, + uncles: body.uncles) + + trace "send newBlock", + number = header.blockNumber, + hash = short(header.blockHash), + td = ourTD, peer + await peer.newBlock(newBlock, ourTD) + return + + if dist > maxHeadersFetch: + # distance is too far, ignore this peer + return + + # send hashes in batch + var + hash: Hash256 + number = 0 + hashes = newSeqOfCap[NewBlockHashesAnnounce](maxHeadersFetch) + + while number < dist: + let blockNumber = start + number.toBlockNumber + if not db.getBlockHash(blockNumber, hash): + error "failed to get block hash", number=blockNumber + return + + hashes.add(NewBlockHashesAnnounce( + number: blockNumber, + hash: hash)) + + if hashes.len == maxHeadersFetch: + trace "send newBlockHashes(batch)", numHashes=hashes.len, peer + await peer.newBlockHashes(hashes) + hashes.setLen(0) + inc number + + # send the rest of hashes if available + if hashes.len > 0: + trace "send newBlockHashes(remaining)", numHashes=hashes.len, peer + await peer.newBlockHashes(hashes) + + except TransportError: + debug "Transport got closed during obtainBlocksFromPeer" + except CatchableError as e: + debug "Exception in getBestBlockNumber()", exc = e.name, err = e.msg + # no need to exit here, because the context might still have blocks to fetch + # from this peer + discard e + proc endIndex(b: WantedBlocks): BlockNumber = result = b.startIndex result += (b.numBlocks - 1).toBlockNumber @@ -90,9 +396,10 @@ proc availableWorkItem(ctx: FastSyncCtx): int = maxPendingBlock = endBlock let nextRequestedBlock = maxPendingBlock + 1 + # If this next block doesn't exist yet according to any of our peers, don't # return a work item (and sync will be stopped). - if nextRequestedBlock >= ctx.endBlockNumber: + if nextRequestedBlock > ctx.endBlockNumber: return -1 # Increase queue when there are no free (Initial / Persisted) work items in @@ -103,10 +410,36 @@ proc availableWorkItem(ctx: FastSyncCtx): int = # Create new work item when queue was increased, reset when selected work item # is at Persisted state. - var numBlocks = (ctx.endBlockNumber - nextRequestedBlock).truncate(int) + var numBlocks = (ctx.endBlockNumber - nextRequestedBlock).truncate(int) + 1 + if numBlocks > maxHeadersFetch: numBlocks = maxHeadersFetch - ctx.workQueue[result] = WantedBlocks(startIndex: nextRequestedBlock, numBlocks: numBlocks.uint, state: Initial) + ctx.workQueue[result] = WantedBlocks( + startIndex: nextRequestedBlock, + numBlocks : numBlocks.uint, + state : Initial, + isHash : false) + +proc appendWorkItem(ctx: FastSyncCtx, hash: Hash256, + startIndex: BlockNumber, numBlocks: uint) = + for i in 0 .. ctx.workQueue.high: + if ctx.workQueue[i].state == Persisted: + ctx.workQueue[i] = WantedBlocks( + isHash : true, + hash : hash, + startIndex: startIndex, + numBlocks : numBlocks, + state : Initial) + return + + let i = ctx.workQueue.len + ctx.workQueue.setLen(i + 1) + ctx.workQueue[i] = WantedBlocks( + isHash : true, + hash : hash, + startIndex: startIndex, + numBlocks : numBlocks, + state : Initial) proc persistWorkItem(ctx: FastSyncCtx, wi: var WantedBlocks): ValidationResult {.gcsafe, raises:[Defect,CatchableError].} = @@ -204,11 +537,6 @@ proc returnWorkItem(ctx: FastSyncCtx, workItem: int): ValidationResult receivedBlocks return ValidationResult.Error -proc handleLostPeer(ctx: FastSyncCtx) = - # TODO: ask the PeerPool for new connections and then call - # `obtainBlocksFromPeer` - discard - proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} = let request = BlocksRequest( startBlock: HashOrNum(isHash: true, @@ -228,13 +556,104 @@ proc getBestBlockNumber(p: Peer): Future[BlockNumber] {.async.} = count=latestBlock.get.headers.len, blockNumber=(if latestBlock.get.headers.len > 0: $result else: "missing") -proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} = +proc toRequest(workItem: WantedBlocks): BlocksRequest = + if workItem.isHash: + BlocksRequest( + startBlock: HashOrNum(isHash: true, hash: workItem.hash), + maxResults: workItem.numBlocks, + skip: 0, + reverse: false) + else: + BlocksRequest( + startBlock: HashOrNum(isHash: false, number: workItem.startIndex), + maxResults: workItem.numBlocks, + skip: 0, + reverse: false) + +type + BodyHash = object + txRoot: Hash256 + uncleHash: Hash256 + +proc hash*(x: BodyHash): Hash = + var h: Hash = 0 + h = h !& hash(x.txRoot.data) + h = h !& hash(x.uncleHash.data) + result = !$h + +proc fetchBodies(ctx: FastSyncCtx, peer: Peer, + workItemIdx: int, reqBodies: seq[bool]): Future[bool] {.async.} = + template workItem: auto = ctx.workQueue[workItemIdx] + var bodies = newSeqOfCap[BlockBody](workItem.headers.len) + var hashes = newSeqOfCap[KeccakHash](maxBodiesFetch) + + doAssert(reqBodies.len == workItem.headers.len) + + template fetchBodies() = + trace trEthSendSendingGetBlockBodies, peer, + hashes=hashes.len + let b = await peer.getBlockBodies(hashes) + if b.isNone: + raise newException(CatchableError, "Was not able to get the block bodies") + let bodiesLen = b.get.blocks.len + trace trEthRecvReceivedBlockBodies, peer, + count=bodiesLen, requested=hashes.len + if bodiesLen == 0: + raise newException(CatchableError, "Zero block bodies received for request") + elif bodiesLen < hashes.len: + hashes.delete(0, bodiesLen - 1) + elif bodiesLen == hashes.len: + hashes.setLen(0) + else: + raise newException(CatchableError, "Too many block bodies received for request") + bodies.add(b.get.blocks) + + var numRequest = 0 + for i, h in workItem.headers: + if reqBodies[i]: + hashes.add(h.blockHash) + inc numRequest + + if hashes.len == maxBodiesFetch: + fetchBodies() + + while hashes.len != 0: + fetchBodies() + + workItem.bodies = newSeqOfCap[BlockBody](workItem.headers.len) + var bodyHashes = initTable[BodyHash, int]() + for z, body in bodies: + let bodyHash = BodyHash( + txRoot: calcTxRoot(body.transactions), + uncleHash: rlpHash(body.uncles)) + bodyHashes[bodyHash] = z + + for i, req in reqBodies: + if req: + let bodyHash = BodyHash( + txRoot: workItem.headers[i].txRoot, + uncleHash: workItem.headers[i].ommersHash) + let z = bodyHashes.getOrDefault(bodyHash, -1) + if z == -1: + error "header missing it's body", + number=workItem.headers[i].blockNumber, + hash=workItem.headers[i].blockHash.short + return false + workItem.bodies.add bodies[z] + else: + workItem.bodies.add BlockBody() + + return true + +proc obtainBlocksFromPeer(ctx: FastSyncCtx, peer: Peer) {.async.} = # Update our best block number try: - let bestBlockNumber = await peer.getBestBlockNumber() - if bestBlockNumber > syncCtx.endBlockNumber: - trace "New sync end block number", number = bestBlockNumber - syncCtx.endBlockNumber = bestBlockNumber + if ctx.endBlockNumber <= ctx.finalizedBlock: + # endBlockNumber need update + let bestBlockNumber = await peer.getBestBlockNumber() + if bestBlockNumber > ctx.endBlockNumber: + trace "New sync end block number", number = bestBlockNumber + ctx.endBlockNumber = bestBlockNumber except TransportError: debug "Transport got closed during obtainBlocksFromPeer" except CatchableError as e: @@ -243,19 +662,14 @@ proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} = # from this peer discard e - while (let workItemIdx = syncCtx.availableWorkItem(); workItemIdx != -1 and + while (let workItemIdx = ctx.availableWorkItem(); workItemIdx != -1 and peer.connectionState notin {Disconnecting, Disconnected}): - template workItem: auto = syncCtx.workQueue[workItemIdx] + template workItem: auto = ctx.workQueue[workItemIdx] workItem.state = Requested trace "Requesting block headers", start = workItem.startIndex, count = workItem.numBlocks, peer = peer.remote.node - let request = BlocksRequest( - startBlock: HashOrNum(isHash: false, number: workItem.startIndex), - maxResults: workItem.numBlocks, - skip: 0, - reverse: false) - - var dataReceived = false + let request = toRequest(workItem) + var dataReceived = true try: trace trEthSendSendingGetBlockHeaders, peer, startBlock=request.startBlock.number, max=request.maxResults, @@ -266,59 +680,63 @@ proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} = count=results.get.headers.len, requested=request.maxResults shallowCopy(workItem.headers, results.get.headers) - var bodies = newSeqOfCap[BlockBody](workItem.headers.len) - var hashes = newSeqOfCap[KeccakHash](maxBodiesFetch) - template fetchBodies() = - trace trEthSendSendingGetBlockBodies, peer, - hashes=hashes.len - let b = await peer.getBlockBodies(hashes) - if b.isNone: - raise newException( - CatchableError, "Was not able to get the block bodies") - let bodiesLen = b.get.blocks.len - trace trEthRecvReceivedBlockBodies, peer, - count=bodiesLen, requested=hashes.len - if bodiesLen == 0: - raise newException(CatchableError, "Zero block bodies received for request") - elif bodiesLen < hashes.len: - hashes.delete(0, bodiesLen - 1) - elif bodiesLen == hashes.len: - hashes.setLen(0) - else: - raise newException(CatchableError, "Too many block bodies received for request") - bodies.add(b.get.blocks) + var + reqBodies = newSeqOfCap[bool](workItem.headers.len) + numRequest = 0 + nextIndex = workItem.startIndex - var nextIndex = workItem.startIndex - for i in workItem.headers: - if i.blockNumber != nextIndex: - raise newException(CatchableError, "The block numbers are not in sequence. Not processing this workItem.") - else: - nextIndex = nextIndex + 1 - hashes.add(blockHash(i)) - if hashes.len == maxBodiesFetch: - fetchBodies() + # skip requesting empty bodies + for h in workItem.headers: + if h.blockNumber != nextIndex: + raise newException(CatchableError, + "The block numbers are not in sequence. Not processing this workItem.") - while hashes.len != 0: - fetchBodies() + nextIndex = nextIndex + 1 + let req = h.txRoot != EMPTY_ROOT_HASH or + h.ommersHash != EMPTY_UNCLE_HASH + reqBodies.add(req) + if req: inc numRequest - if bodies.len == workItem.headers.len: - shallowCopy(workItem.bodies, bodies) - dataReceived = true + if numRequest > 0: + dataReceived = dataReceived and + await ctx.fetchBodies(peer, workItemIdx, reqBodies) else: - warn "Bodies len != headers.len", bodies = bodies.len, headers = workItem.headers.len + # all bodies are empty + workItem.bodies.setLen(workItem.headers.len) + + let peers = ctx.getPeers(peer) + if peers.len > 0: + var hashes = newSeqOfCap[NewBlockHashesAnnounce](workItem.headers.len) + for h in workItem.headers: + hashes.add NewBlockHashesAnnounce( + hash: h.blockHash, + number: h.blockNumber) + trace "broadcast block hashes", numPeers=peers.len, numHashes=hashes.len + await ctx.broadcastBlockHash(hashes, peers) + + # fetchBodies can fail + dataReceived = dataReceived and true + + else: + dataReceived = false except TransportError: - debug "Transport got closed during obtainBlocksFromPeer" + debug "Transport got closed during obtainBlocksFromPeer", + peer except CatchableError as e: # the success case sets `dataReceived`, so we can just fall back to the # failure path below. If we signal time-outs with exceptions such # failures will be easier to handle. - debug "Exception in obtainBlocksFromPeer()", exc = e.name, err = e.msg + debug "Exception in obtainBlocksFromPeer()", + exc = e.name, err = e.msg, peer var giveUpOnPeer = false - if dataReceived: + trace "Finished obtaining blocks", peer, numBlocks=workItem.headers.len + workItem.state = Received - if syncCtx.returnWorkItem(workItemIdx) != ValidationResult.OK: + let res = ctx.returnWorkItem(workItemIdx) + if res != ValidationResult.OK: + trace "validation error" giveUpOnPeer = true else: giveUpOnPeer = true @@ -329,11 +747,9 @@ proc obtainBlocksFromPeer(syncCtx: FastSyncCtx, peer: Peer) {.async.} = await peer.disconnect(SubprotocolReason) except CatchableError: discard - syncCtx.handleLostPeer() + ctx.handleLostPeer() break - trace "Finished obtaining blocks", peer - proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} = # Returns true if one of the peers acknowledges existence of the best block # of another peer. @@ -370,7 +786,7 @@ proc randomTrustedPeer(ctx: FastSyncCtx): Peer = if i == k: return inc i -proc startSyncWithPeer(ctx: FastSyncCtx, peer: Peer) {.async.} = +proc startSyncWithPeerImpl(ctx: FastSyncCtx, peer: Peer) {.async.} = trace "Start sync", peer, trustedPeers = ctx.trustedPeers.len if ctx.trustedPeers.len >= minPeersToStartSync: # We have enough trusted peers. Validate new peer against trusted @@ -414,26 +830,71 @@ proc startSyncWithPeer(ctx: FastSyncCtx, peer: Peer) {.async.} = for p in ctx.trustedPeers: asyncSpawn ctx.obtainBlocksFromPeer(p) - -proc onPeerConnected(ctx: FastSyncCtx, peer: Peer) = - trace "New candidate for sync", peer +proc startSyncWithPeer(ctx: FastSyncCtx, peer: Peer) = try: - let f = ctx.startSyncWithPeer(peer) + let + db = ctx.chain.db + header = db.getBlockHeader(ctx.finalizedBlock) + ourTD = db.getScore(header.blockHash) + peerTD = peer.state(eth).bestDifficulty + + if peerTD <= ourTD: + # do nothing if peer have same height + if peerTD < ourTD: + trace "Peer have lower TD, become recipient", + peer, ourTD, peerTD + asyncSpawn ctx.sendBlockOrHash(peer) + return + + ctx.busyPeers.incl(peer) + let f = ctx.startSyncWithPeerImpl(peer) f.callback = proc(data: pointer) {.gcsafe.} = if f.failed: if f.error of TransportError: debug "Transport got closed during startSyncWithPeer" else: error "startSyncWithPeer failed", msg = f.readError.msg, peer + ctx.busyPeers.excl(peer) + asyncSpawn f + except TransportError: debug "Transport got closed during startSyncWithPeer" except CatchableError as e: debug "Exception in startSyncWithPeer()", exc = e.name, err = e.msg +proc startObtainBlocks(ctx: FastSyncCtx, peer: Peer) = + # simpler version of startSyncWithPeer + try: + + ctx.busyPeers.incl(peer) + let f = ctx.obtainBlocksFromPeer(peer) + f.callback = proc(data: pointer) {.gcsafe.} = + if f.failed: + if f.error of TransportError: + debug "Transport got closed during startObtainBlocks" + else: + error "startObtainBlocks failed", msg = f.readError.msg, peer + ctx.busyPeers.excl(peer) + asyncSpawn f + + except TransportError: + debug "Transport got closed during startObtainBlocks" + except CatchableError as e: + debug "Exception in startObtainBlocks()", exc = e.name, err = e.msg + +proc onPeerConnected(ctx: FastSyncCtx, peer: Peer) = + trace "New candidate for sync", peer + ctx.startSyncWithPeer(peer) proc onPeerDisconnected(ctx: FastSyncCtx, p: Peer) = trace "peer disconnected ", peer = p ctx.trustedPeers.excl(p) + ctx.busyPeers.excl(p) + ctx.knownByPeer.del(p) + +# ------------------------------------------------------------------------------ +# Public constructor/destructor +# ------------------------------------------------------------------------------ proc new*(T: type FastSyncCtx; ethNode: EthereumNode; chain: Chain): T {.gcsafe, raises:[Defect,CatchableError].} = @@ -450,14 +911,184 @@ proc start*(ctx: FastSyncCtx) = ## Code for the fast blockchain sync procedure: ## _ ## ttd: + debug "Fast sync is disabled after POS merge" + return + + info "Fast Sync: start sync from", + number=ctx.finalizedBlock, + hash=blockHash + + except CatchableError as e: + debug "Exception in FastSync.start()", + exc = e.name, err = e.msg + var po = PeerObserver( onPeerConnected: proc(p: Peer) {.gcsafe.} = - ctx.onPeerConnected(p), + ctx.onPeerConnected(p), onPeerDisconnected: proc(p: Peer) {.gcsafe.} = ctx.onPeerDisconnected(p)) po.setProtocol eth ctx.peerPool.addObserver(ctx, po) +# ------------------------------------------------------------------------------ +# Public procs: eth wire protocol handlers +# ------------------------------------------------------------------------------ + +proc handleNewBlockHashes(ctx: FastSyncCtx, + peer: Peer, + hashes: openArray[NewBlockHashesAnnounce]) {. + gcsafe, raises: [Defect, CatchableError].} = + + trace trEthRecvNewBlockHashes, + numHash=hashes.len + + if hashes.len == 0: + return + + var number = hashes[0].number + if hashes.len > 1: + for i in 1.. 0: + # do nothing. busy peers will keep syncing + # until new sync target reached + trace "sync using busyPeers", + len=ctx.busyPeers.len + return + + if ctx.trustedPeers.len == 0: + trace "sync with this peer" + ctx.startObtainBlocks(peer) + else: + trace "sync with random peer" + let peer = ctx.randomTrustedPeer() + ctx.startSyncWithPeer(peer) + +proc handleNewBlock(ctx: FastSyncCtx, + peer: Peer, + blk: EthBlock, + totalDifficulty: DifficultyInt) {. + gcsafe, raises: [Defect, CatchableError].} = + + trace trEthRecvNewBlock, + number=blk.header.blockNumber, + hash=short(blk.header.blockHash) + + if ctx.lastCleanup - getTime() > CleanupInterval: + ctx.cleanupKnownByPeer() + + # Don't send NEW_BLOCK announcement to peer that sent original new block message + discard ctx.addToKnownByPeer(blk.header.blockHash, peer) + + if blk.header.blockNumber > ctx.finalizedBlock + 1.toBlockNumber: + # If the block number exceeds one past our height we cannot validate it + trace "NewBlock got block past our height", + number=blk.header.blockNumber + return + + if not ctx.validateHeader(blk.header): + error "invalid header from peer", + peer, hash=short(blk.header.blockHash) + return + + # Send NEW_BLOCK to square root of total number of peers in pool + # https://github.com/ethereum/devp2p/blob/master/caps/eth.md#block-propagation + let + numPeersToShareWith = sqrt(ctx.peerPool.len.float32).int + peers = ctx.getPeers(peer) + + debug "num peers to share with", + number=numPeersToShareWith, + numPeers=peers.len + + if peers.len > 0 and numPeersToShareWith > 0: + asyncSpawn ctx.broadcastBlock(blk, peers[0.. 0 and numPeersToShareWith > 0: + # Send `NEW_BLOCK_HASHES` message for received block to all other peers + asyncSpawn ctx.broadcastBlockHash(blk, peers[numPeersToShareWith..^1]) + +proc newBlockHashesHandler*(arg: pointer, + peer: Peer, + hashes: openArray[NewBlockHashesAnnounce]) {. + gcsafe, raises: [Defect, CatchableError].} = + let ctx = cast[FastSyncCtx](arg) + ctx.handleNewBlockHashes(peer, hashes) + +proc newBlockHandler*(arg: pointer, + peer: Peer, + blk: EthBlock, + totalDifficulty: DifficultyInt) {. + gcsafe, raises: [Defect, CatchableError].} = + + let ctx = cast[FastSyncCtx](arg) + ctx.handleNewBlock(peer, blk, totalDifficulty) + # End diff --git a/nimbus/sync/handlers.nim b/nimbus/sync/handlers.nim index 546e781f3..d4a970493 100644 --- a/nimbus/sync/handlers.nim +++ b/nimbus/sync/handlers.nim @@ -15,6 +15,27 @@ import type HashToTime = TableRef[Hash256, Time] + NewBlockHandler* = proc( + arg: pointer, + peer: Peer, + blk: EthBlock, + totalDifficulty: DifficultyInt) {. + gcsafe, raises: [Defect, CatchableError].} + + NewBlockHashesHandler* = proc( + arg: pointer, + peer: Peer, + hashes: openArray[NewBlockHashesAnnounce]) {. + gcsafe, raises: [Defect, CatchableError].} + + NewBlockHandlerPair = object + arg: pointer + handler: NewBlockHandler + + NewBlockHashesHandlerPair = object + arg: pointer + handler: NewBlockHashesHandler + EthWireRef* = ref object of EthWireBase db: BaseChainDB chain: Chain @@ -25,6 +46,8 @@ type pending: HashSet[Hash256] lastCleanup: Time merger: MergerRef + newBlockHandler: NewBlockHandlerPair + newBlockHashesHandler: NewBlockHashesHandlerPair ReconnectRef = ref object pool: PeerPool @@ -35,9 +58,63 @@ const POOLED_STORAGE_TIME_LIMIT = initDuration(minutes = 20) PEER_LONG_BANTIME = chronos.minutes(150) +# ------------------------------------------------------------------------------ +# Private functions: helper functions +# ------------------------------------------------------------------------------ + +proc notEnabled(name: string) = + debug "Wire handler method is disabled", meth = name + +proc notImplemented(name: string) = + debug "Wire handler method not implemented", meth = name + +proc inPool(ctx: EthWireRef, txHash: Hash256): bool = + let res = ctx.txPool.getItem(txHash) + res.isOk + +proc inPoolAndOk(ctx: EthWireRef, txHash: Hash256): bool = + let res = ctx.txPool.getItem(txHash) + if res.isErr: return false + res.get().reject == txInfoOk + +proc successorHeader(db: BaseChainDB, + h: BlockHeader, + output: var BlockHeader, + skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} = + let offset = 1 + skip.toBlockNumber + if h.blockNumber <= (not 0.toBlockNumber) - offset: + result = db.getBlockHeader(h.blockNumber + offset, output) + +proc ancestorHeader(db: BaseChainDB, + h: BlockHeader, + output: var BlockHeader, + skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} = + let offset = 1 + skip.toBlockNumber + if h.blockNumber >= offset: + result = db.getBlockHeader(h.blockNumber - offset, output) + +proc blockHeader(db: BaseChainDB, + b: HashOrNum, + output: var BlockHeader): bool + {.gcsafe, raises: [Defect,RlpError].} = + if b.isHash: + db.getBlockHeader(b.hash, output) + else: + db.getBlockHeader(b.number, output) + +# ------------------------------------------------------------------------------ +# Private functions: peers related functions +# ------------------------------------------------------------------------------ + proc hash(peer: Peer): hashes.Hash = hash(peer.remote) +proc getPeers(ctx: EthWireRef, thisPeer: Peer): seq[Peer] = + # do not send back tx or txhash to thisPeer + for peer in peers(ctx.peerPool): + if peer != thisPeer: + result.add peer + proc banExpiredReconnect(arg: pointer) {.gcsafe, raises: [Defect].} = # Reconnect to peer after ban period if pool is empty try: @@ -86,6 +163,14 @@ proc cleanupKnownByPeer(ctx: EthWireRef) = map.del(hash) tmp.clear() + var tmpPeer = initHashSet[Peer]() + for peer, map in ctx.knownByPeer: + if map.len == 0: + tmpPeer.incl peer + + for peer in tmpPeer: + ctx.knownByPeer.del peer + ctx.lastCleanup = now proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer) = @@ -97,7 +182,10 @@ proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer) if txHash notin map: map[txHash] = getTime() -proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer, newHashes: var seq[Hash256]) = +proc addToKnownByPeer(ctx: EthWireRef, + txHashes: openArray[Hash256], + peer: Peer, + newHashes: var seq[Hash256]) = var map: HashToTime if not ctx.knownByPeer.take(peer, map): map = newTable[Hash256, Time]() @@ -108,6 +196,10 @@ proc addToKnownByPeer(ctx: EthWireRef, txHashes: openArray[Hash256], peer: Peer, map[txHash] = getTime() newHashes.add txHash +# ------------------------------------------------------------------------------ +# Private functions: async workers +# ------------------------------------------------------------------------------ + proc sendNewTxHashes(ctx: EthWireRef, txHashes: seq[Hash256], peers: seq[Peer]): Future[void] {.async.} = @@ -144,20 +236,49 @@ proc sendTransactions(ctx: EthWireRef, except CatchableError as e: debug "Exception in sendTransactions", exc = e.name, err = e.msg -proc inPool(ctx: EthWireRef, txHash: Hash256): bool = - let res = ctx.txPool.getItem(txHash) - res.isOk +proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash256], peer: Peer): Future[void] {.async.} = + debug "fetchTx: requesting txs", + number = reqHashes.len -proc inPoolAndOk(ctx: EthWireRef, txHash: Hash256): bool = - let res = ctx.txPool.getItem(txHash) - if res.isErr: return false - res.get().reject == txInfoOk + try: -proc getPeers(ctx: EthWireRef, thisPeer: Peer): seq[Peer] = - # do not send back tx or txhash to thisPeer - for peer in peers(ctx.peerPool): - if peer != thisPeer: - result.add peer + let res = await peer.getPooledTransactions(reqHashes) + if res.isNone: + error "not able to get pooled transactions" + return + + let txs = res.get() + debug "fetchTx: received requested txs", + number = txs.transactions.len + + # Remove from pending list regardless if tx is in result + for tx in txs.transactions: + let txHash = rlpHash(tx) + ctx.pending.excl txHash + + ctx.txPool.jobAddTxs(txs.transactions) + + except TransportError: + debug "Transport got closed during fetchTransactions" + return + except CatchableError as e: + debug "Exception in fetchTransactions", exc = e.name, err = e.msg + return + + var newTxHashes = newSeqOfCap[Hash256](reqHashes.len) + for txHash in reqHashes: + if ctx.inPoolAndOk(txHash): + newTxHashes.add txHash + + let peers = ctx.getPeers(peer) + if peers.len == 0 or newTxHashes.len == 0: + return + + await ctx.sendNewTxHashes(newTxHashes, peers) + +# ------------------------------------------------------------------------------ +# Private functions: peer observer +# ------------------------------------------------------------------------------ proc onPeerConnected(ctx: EthWireRef, peer: Peer) = if ctx.disableTxPool: @@ -192,6 +313,10 @@ proc setupPeerObserver(ctx: EthWireRef) = po.setProtocol eth ctx.peerPool.addObserver(ctx, po) +# ------------------------------------------------------------------------------ +# Public constructor/destructor +# ------------------------------------------------------------------------------ + proc new*(_: type EthWireRef, chain: Chain, txPool: TxPoolRef, @@ -209,11 +334,25 @@ proc new*(_: type EthWireRef, ctx.setupPeerObserver() ctx -proc notEnabled(name: string) = - debug "Wire handler method is disabled", meth = name +# ------------------------------------------------------------------------------ +# Public functions: callbacks setters +# ------------------------------------------------------------------------------ -proc notImplemented(name: string) = - debug "Wire handler method not implemented", meth = name +proc setNewBlockHandler*(ctx: EthWireRef, handler: NewBlockHandler, arg: pointer) = + ctx.newBlockHandler = NewBlockHandlerPair( + arg: arg, + handler: handler + ) + +proc setNewBlockHashesHandler*(ctx: EthWireRef, handler: NewBlockHashesHandler, arg: pointer) = + ctx.newBlockHashesHandler = NewBlockHashesHandlerPair( + arg: arg, + handler: handler + ) + +# ------------------------------------------------------------------------------ +# Public functions: eth wire protocol handlers +# ------------------------------------------------------------------------------ proc txPoolEnabled*(ctx: EthWireRef; ena: bool) = ctx.disableTxPool = not ena @@ -265,31 +404,6 @@ method getBlockBodies*(ctx: EthWireRef, hashes: openArray[Hash256]): seq[BlockBo result.add BlockBody() trace "handlers.getBlockBodies: blockBody not found", blockHash -proc successorHeader(db: BaseChainDB, - h: BlockHeader, - output: var BlockHeader, - skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} = - let offset = 1 + skip.toBlockNumber - if h.blockNumber <= (not 0.toBlockNumber) - offset: - result = db.getBlockHeader(h.blockNumber + offset, output) - -proc ancestorHeader*(db: BaseChainDB, - h: BlockHeader, - output: var BlockHeader, - skip = 0'u): bool {.gcsafe, raises: [Defect,RlpError].} = - let offset = 1 + skip.toBlockNumber - if h.blockNumber >= offset: - result = db.getBlockHeader(h.blockNumber - offset, output) - -proc blockHeader(db: BaseChainDB, - b: HashOrNum, - output: var BlockHeader): bool - {.gcsafe, raises: [Defect,RlpError].} = - if b.isHash: - db.getBlockHeader(b.hash, output) - else: - db.getBlockHeader(b.number, output) - method getBlockHeaders*(ctx: EthWireRef, req: BlocksRequest): seq[BlockHeader] {.gcsafe.} = let db = ctx.db var foundBlock: BlockHeader @@ -348,46 +462,6 @@ method handleAnnouncedTxs*(ctx: EthWireRef, peer: Peer, txs: openArray[Transacti asyncSpawn ctx.sendNewTxHashes(newTxHashes, peers[sendFull..^1]) -proc fetchTransactions(ctx: EthWireRef, reqHashes: seq[Hash256], peer: Peer): Future[void] {.async.} = - debug "fetchTx: requesting txs", - number = reqHashes.len - - try: - - let res = await peer.getPooledTransactions(reqHashes) - if res.isNone: - error "not able to get pooled transactions" - return - - let txs = res.get() - debug "fetchTx: received requested txs", - number = txs.transactions.len - - # Remove from pending list regardless if tx is in result - for tx in txs.transactions: - let txHash = rlpHash(tx) - ctx.pending.excl txHash - - ctx.txPool.jobAddTxs(txs.transactions) - - except TransportError: - debug "Transport got closed during fetchTransactions" - return - except CatchableError as e: - debug "Exception in fetchTransactions", exc = e.name, err = e.msg - return - - var newTxHashes = newSeqOfCap[Hash256](reqHashes.len) - for txHash in reqHashes: - if ctx.inPoolAndOk(txHash): - newTxHashes.add txHash - - let peers = ctx.getPeers(peer) - if peers.len == 0 or newTxHashes.len == 0: - return - - await ctx.sendNewTxHashes(newTxHashes, peers) - method handleAnnouncedTxsHashes*(ctx: EthWireRef, peer: Peer, txHashes: openArray[Hash256]) {.gcsafe.} = if ctx.disableTxPool: when trMissingOrDisabledGossipOk: @@ -426,6 +500,12 @@ method handleNewBlock*(ctx: EthWireRef, peer: Peer, blk: EthBlock, totalDifficul asyncSpawn banPeer(ctx.peerPool, peer, PEER_LONG_BANTIME) return + if not ctx.newBlockHandler.handler.isNil: + ctx.newBlockHandler.handler( + ctx.newBlockHandler.arg, + peer, blk, totalDifficulty + ) + method handleNewBlockHashes*(ctx: EthWireRef, peer: Peer, hashes: openArray[NewBlockHashesAnnounce]) {.gcsafe.} = if ctx.merger.posFinalized: debug "Dropping peer for sending NewBlockHashes after merge (EIP-3675)", @@ -433,6 +513,13 @@ method handleNewBlockHashes*(ctx: EthWireRef, peer: Peer, hashes: openArray[NewB asyncSpawn banPeer(ctx.peerPool, peer, PEER_LONG_BANTIME) return + if not ctx.newBlockHashesHandler.handler.isNil: + ctx.newBlockHashesHandler.handler( + ctx.newBlockHashesHandler.arg, + peer, + hashes + ) + when defined(legacy_eth66_enabled): method getStorageNodes*(ctx: EthWireRef, hashes: openArray[Hash256]): seq[Blob] {.gcsafe.} = let db = ctx.db.db diff --git a/nimbus/sync/protocol/eth66.nim b/nimbus/sync/protocol/eth66.nim index a555df388..e97c775f2 100644 --- a/nimbus/sync/protocol/eth66.nim +++ b/nimbus/sync/protocol/eth66.nim @@ -64,9 +64,14 @@ const trEthSendDelaying* = ">> " & prettyEthProtoName & " Delaying " -func toHex(hash: Hash256): string = - ## Shortcut for `byteutils.toHex(hash.data)` - hash.data.toHex + trEthRecvNewBlock* = + "<< " & prettyEthProtoName & " Received NewBlock" + trEthRecvNewBlockHashes* = + "<< " & prettyEthProtoName & " Received NewBlockHashes" + trEthSendNewBlock* = + ">> " & prettyEthProtoName & " Sending NewBlock" + trEthSendNewBlockHashes* = + ">> " & prettyEthProtoName & " Sending NewBlockHashes" p2pProtocol eth66(version = ethVersion, rlpxName = "eth", @@ -82,9 +87,9 @@ p2pProtocol eth66(version = ethVersion, trace trEthSendSending & "Status (0x00)", peer, td = status.totalDifficulty, - bestHash = status.bestBlockHash, + bestHash = short(status.bestBlockHash), networkId = network.networkId, - genesis = status.genesisHash, + genesis = short(status.genesisHash), forkHash = status.forkId.forkHash.toHex, forkNext = status.forkId.forkNext @@ -113,7 +118,7 @@ p2pProtocol eth66(version = ethVersion, if m.genesisHash != status.genesisHash: trace "Peer for a different network (genesisHash)", peer, - expectGenesis=status.genesisHash, gotGenesis=m.genesisHash + expectGenesis=short(status.genesisHash), gotGenesis=short(m.genesisHash) raise newException( UselessPeerError, "Eth handshake for different network") @@ -132,7 +137,7 @@ p2pProtocol eth66(version = ethVersion, genesisHash: Hash256, forkId: ChainForkId) = trace trEthRecvReceived & "Status (0x00)", peer, - networkId, totalDifficulty, bestHash, genesisHash, + networkId, totalDifficulty, bestHash=short(bestHash), genesisHash=short(genesisHash), forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext # User message 0x01: NewBlockHashes. diff --git a/nimbus/sync/protocol/eth67.nim b/nimbus/sync/protocol/eth67.nim index 12abb3ae4..e647fc39e 100644 --- a/nimbus/sync/protocol/eth67.nim +++ b/nimbus/sync/protocol/eth67.nim @@ -64,9 +64,14 @@ const trEthSendDelaying* = ">> " & prettyEthProtoName & " Delaying " -func toHex(hash: Hash256): string = - ## Shortcut for `byteutils.toHex(hash.data)` - hash.data.toHex + trEthRecvNewBlock* = + "<< " & prettyEthProtoName & " Received NewBlock" + trEthRecvNewBlockHashes* = + "<< " & prettyEthProtoName & " Received NewBlockHashes" + trEthSendNewBlock* = + ">> " & prettyEthProtoName & " Sending NewBlock" + trEthSendNewBlockHashes* = + ">> " & prettyEthProtoName & " Sending NewBlockHashes" p2pProtocol eth67(version = ethVersion, rlpxName = "eth", @@ -82,9 +87,9 @@ p2pProtocol eth67(version = ethVersion, trace trEthSendSending & "Status (0x00)", peer, td = status.totalDifficulty, - bestHash = status.bestBlockHash, + bestHash = short(status.bestBlockHash), networkId = network.networkId, - genesis = status.genesisHash, + genesis = short(status.genesisHash), forkHash = status.forkId.forkHash.toHex, forkNext = status.forkId.forkNext @@ -100,7 +105,7 @@ p2pProtocol eth67(version = ethVersion, trace "Handshake: Local and remote networkId", local=network.networkId, remote=m.networkId trace "Handshake: Local and remote genesisHash", - local=status.genesisHash, remote=m.genesisHash + local=short(status.genesisHash), remote=short(m.genesisHash) trace "Handshake: Local and remote forkId", local=(status.forkId.forkHash.toHex & "/" & $status.forkId.forkNext), remote=(m.forkId.forkHash.toHex & "/" & $m.forkId.forkNext) @@ -113,7 +118,7 @@ p2pProtocol eth67(version = ethVersion, if m.genesisHash != status.genesisHash: trace "Peer for a different network (genesisHash)", peer, - expectGenesis=status.genesisHash, gotGenesis=m.genesisHash + expectGenesis=short(status.genesisHash), gotGenesis=short(m.genesisHash) raise newException( UselessPeerError, "Eth handshake for different network") @@ -132,7 +137,7 @@ p2pProtocol eth67(version = ethVersion, genesisHash: Hash256, forkId: ChainForkId) = trace trEthRecvReceived & "Status (0x00)", peer, - networkId, totalDifficulty, bestHash, genesisHash, + networkId, totalDifficulty, bestHash=short(bestHash), genesisHash=short(genesisHash), forkHash=forkId.forkHash.toHex, forkNext=forkId.forkNext # User message 0x01: NewBlockHashes.