parent
f4cacdfc6a
commit
56f169b23e
|
@ -27,7 +27,7 @@ import
|
|||
./core/[chain, sealer, clique/clique_desc,
|
||||
clique/clique_sealer, tx_pool, block_import],
|
||||
./rpc/merge/merger,
|
||||
./sync/[fast, full, protocol, snap,
|
||||
./sync/[legacy, full, protocol, snap,
|
||||
protocol/les_protocol, handlers, peers]
|
||||
|
||||
when defined(evmc_enabled):
|
||||
|
@ -410,7 +410,7 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
|
|||
if ProtocolFlag.Eth in protocols and conf.maxPeers > 0:
|
||||
case conf.syncMode:
|
||||
of SyncMode.Default:
|
||||
let syncer = FastSyncCtx.new(nimbus.ethNode, nimbus.chainRef)
|
||||
let syncer = LegacySyncRef.new(nimbus.ethNode, nimbus.chainRef)
|
||||
syncer.start
|
||||
|
||||
let wireHandler = EthWireRef(
|
||||
|
@ -418,12 +418,12 @@ proc start(nimbus: NimbusNode, conf: NimbusConf) =
|
|||
)
|
||||
|
||||
wireHandler.setNewBlockHandler(
|
||||
fast.newBlockHandler,
|
||||
legacy.newBlockHandler,
|
||||
cast[pointer](syncer)
|
||||
)
|
||||
|
||||
wireHandler.setNewBlockHashesHandler(
|
||||
fast.newBlockHashesHandler,
|
||||
legacy.newBlockHashesHandler,
|
||||
cast[pointer](syncer)
|
||||
)
|
||||
|
||||
|
|
|
@ -59,7 +59,7 @@ type
|
|||
headers: seq[BlockHeader]
|
||||
bodies: seq[BlockBody]
|
||||
|
||||
FastSyncCtx* = ref object
|
||||
LegacySyncRef* = ref object
|
||||
workQueue: seq[WantedBlocks]
|
||||
endBlockNumber: BlockNumber
|
||||
finalizedBlock: BlockNumber # Block which was downloaded and verified
|
||||
|
@ -77,7 +77,7 @@ type
|
|||
|
||||
proc hash*(p: Peer): Hash = hash(cast[pointer](p))
|
||||
|
||||
proc cleanupKnownByPeer(ctx: FastSyncCtx) =
|
||||
proc cleanupKnownByPeer(ctx: LegacySyncRef) =
|
||||
let now = getTime()
|
||||
var tmp = initHashSet[Hash256]()
|
||||
for _, map in ctx.knownByPeer:
|
||||
|
@ -98,7 +98,7 @@ proc cleanupKnownByPeer(ctx: FastSyncCtx) =
|
|||
|
||||
ctx.lastCleanup = now
|
||||
|
||||
proc addToKnownByPeer(ctx: FastSyncCtx,
|
||||
proc addToKnownByPeer(ctx: LegacySyncRef,
|
||||
blockHash: Hash256,
|
||||
peer: Peer): bool =
|
||||
var map: HashToTime
|
||||
|
@ -110,13 +110,13 @@ proc addToKnownByPeer(ctx: FastSyncCtx,
|
|||
|
||||
map[blockHash] = getTime()
|
||||
|
||||
proc getPeers(ctx: FastSyncCtx, thisPeer: Peer): seq[Peer] =
|
||||
proc getPeers(ctx: LegacySyncRef, thisPeer: Peer): seq[Peer] =
|
||||
# do not send back block/blockhash to thisPeer
|
||||
for peer in peers(ctx.peerPool):
|
||||
if peer != thisPeer:
|
||||
result.add peer
|
||||
|
||||
proc handleLostPeer(ctx: FastSyncCtx) =
|
||||
proc handleLostPeer(ctx: LegacySyncRef) =
|
||||
# TODO: ask the PeerPool for new connections and then call
|
||||
# `obtainBlocksFromPeer`
|
||||
discard
|
||||
|
@ -125,7 +125,7 @@ proc handleLostPeer(ctx: FastSyncCtx) =
|
|||
# Private functions: validators
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc validateDifficulty(ctx: FastSyncCtx,
|
||||
proc validateDifficulty(ctx: LegacySyncRef,
|
||||
header, parentHeader: BlockHeader,
|
||||
consensusType: ConsensusType): bool =
|
||||
try:
|
||||
|
@ -160,7 +160,7 @@ proc validateDifficulty(ctx: FastSyncCtx,
|
|||
exc = e.name, err = e.msg
|
||||
return false
|
||||
|
||||
proc validateHeader(ctx: FastSyncCtx, header: BlockHeader,
|
||||
proc validateHeader(ctx: LegacySyncRef, header: BlockHeader,
|
||||
height = none(BlockNumber)): bool
|
||||
{.raises: [Defect,CatchableError].} =
|
||||
if header.parentHash == GENESIS_PARENT_HASH:
|
||||
|
@ -229,7 +229,7 @@ proc validateHeader(ctx: FastSyncCtx, header: BlockHeader,
|
|||
# Private functions: sync worker
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc broadcastBlockHash(ctx: FastSyncCtx, hashes: seq[NewBlockHashesAnnounce], peers: seq[Peer]) {.async.} =
|
||||
proc broadcastBlockHash(ctx: LegacySyncRef, hashes: seq[NewBlockHashesAnnounce], peers: seq[Peer]) {.async.} =
|
||||
try:
|
||||
|
||||
var bha = newSeqOfCap[NewBlockHashesAnnounce](hashes.len)
|
||||
|
@ -248,7 +248,7 @@ proc broadcastBlockHash(ctx: FastSyncCtx, hashes: seq[NewBlockHashesAnnounce], p
|
|||
except CatchableError as e:
|
||||
debug "Exception in broadcastBlockHash", exc = e.name, err = e.msg
|
||||
|
||||
proc broadcastBlock(ctx: FastSyncCtx, blk: EthBlock, peers: seq[Peer]) {.async.} =
|
||||
proc broadcastBlock(ctx: LegacySyncRef, blk: EthBlock, peers: seq[Peer]) {.async.} =
|
||||
try:
|
||||
|
||||
let
|
||||
|
@ -270,7 +270,7 @@ proc broadcastBlock(ctx: FastSyncCtx, blk: EthBlock, peers: seq[Peer]) {.async.}
|
|||
except CatchableError as e:
|
||||
debug "Exception in broadcastBlock", exc = e.name, err = e.msg
|
||||
|
||||
proc broadcastBlockHash(ctx: FastSyncCtx, blk: EthBlock, peers: seq[Peer]) {.async.} =
|
||||
proc broadcastBlockHash(ctx: LegacySyncRef, blk: EthBlock, peers: seq[Peer]) {.async.} =
|
||||
try:
|
||||
|
||||
let bha = NewBlockHashesAnnounce(
|
||||
|
@ -290,7 +290,7 @@ proc broadcastBlockHash(ctx: FastSyncCtx, blk: EthBlock, peers: seq[Peer]) {.asy
|
|||
except CatchableError as e:
|
||||
debug "Exception in broadcastBlockHash", exc = e.name, err = e.msg
|
||||
|
||||
proc sendBlockOrHash(ctx: FastSyncCtx, peer: Peer) {.async.} =
|
||||
proc sendBlockOrHash(ctx: LegacySyncRef, peer: Peer) {.async.} =
|
||||
# because peer TD is lower than us,
|
||||
# it become our recipient of block and block hashes
|
||||
# instead of we download from it
|
||||
|
@ -380,7 +380,7 @@ proc endIndex(b: WantedBlocks): BlockNumber =
|
|||
result = b.startIndex
|
||||
result += (b.numBlocks - 1).toBlockNumber
|
||||
|
||||
proc availableWorkItem(ctx: FastSyncCtx): int =
|
||||
proc availableWorkItem(ctx: LegacySyncRef): int =
|
||||
var maxPendingBlock = ctx.finalizedBlock # the last downloaded & processed
|
||||
trace "queue len", length = ctx.workQueue.len
|
||||
result = -1
|
||||
|
@ -430,7 +430,7 @@ proc availableWorkItem(ctx: FastSyncCtx): int =
|
|||
state : Initial,
|
||||
isHash : false)
|
||||
|
||||
proc appendWorkItem(ctx: FastSyncCtx, hash: Hash256,
|
||||
proc appendWorkItem(ctx: LegacySyncRef, hash: Hash256,
|
||||
startIndex: BlockNumber, numBlocks: uint) =
|
||||
for i in 0 .. ctx.workQueue.high:
|
||||
if ctx.workQueue[i].state == Persisted:
|
||||
|
@ -451,7 +451,7 @@ proc appendWorkItem(ctx: FastSyncCtx, hash: Hash256,
|
|||
numBlocks : numBlocks,
|
||||
state : Initial)
|
||||
|
||||
proc persistWorkItem(ctx: FastSyncCtx, wi: var WantedBlocks): ValidationResult
|
||||
proc persistWorkItem(ctx: LegacySyncRef, wi: var WantedBlocks): ValidationResult
|
||||
{.gcsafe, raises:[Defect,CatchableError].} =
|
||||
try:
|
||||
result = ctx.chain.persistBlocks(wi.headers, wi.bodies)
|
||||
|
@ -479,7 +479,7 @@ proc persistWorkItem(ctx: FastSyncCtx, wi: var WantedBlocks): ValidationResult
|
|||
wi.headers = @[]
|
||||
wi.bodies = @[]
|
||||
|
||||
proc persistPendingWorkItems(ctx: FastSyncCtx): (int, ValidationResult)
|
||||
proc persistPendingWorkItems(ctx: LegacySyncRef): (int, ValidationResult)
|
||||
{.gcsafe, raises:[Defect,CatchableError].} =
|
||||
var nextStartIndex = ctx.finalizedBlock + 1
|
||||
var keepRunning = true
|
||||
|
@ -508,7 +508,7 @@ proc persistPendingWorkItems(ctx: FastSyncCtx): (int, ValidationResult)
|
|||
|
||||
ctx.hasOutOfOrderBlocks = hasOutOfOrderBlocks
|
||||
|
||||
proc returnWorkItem(ctx: FastSyncCtx, workItem: int): ValidationResult
|
||||
proc returnWorkItem(ctx: LegacySyncRef, workItem: int): ValidationResult
|
||||
{.gcsafe, raises:[Defect,CatchableError].} =
|
||||
let wi = addr ctx.workQueue[workItem]
|
||||
let askedBlocks = wi.numBlocks.int
|
||||
|
@ -591,7 +591,7 @@ proc hash*(x: BodyHash): Hash =
|
|||
h = h !& hash(x.uncleHash.data)
|
||||
result = !$h
|
||||
|
||||
proc fetchBodies(ctx: FastSyncCtx, peer: Peer,
|
||||
proc fetchBodies(ctx: LegacySyncRef, peer: Peer,
|
||||
workItemIdx: int, reqBodies: seq[bool]): Future[bool] {.async.} =
|
||||
template workItem: auto = ctx.workQueue[workItemIdx]
|
||||
var bodies = newSeqOfCap[BlockBody](workItem.headers.len)
|
||||
|
@ -655,7 +655,7 @@ proc fetchBodies(ctx: FastSyncCtx, peer: Peer,
|
|||
|
||||
return true
|
||||
|
||||
proc obtainBlocksFromPeer(ctx: FastSyncCtx, peer: Peer) {.async.} =
|
||||
proc obtainBlocksFromPeer(ctx: LegacySyncRef, peer: Peer) {.async.} =
|
||||
# Update our best block number
|
||||
try:
|
||||
if ctx.endBlockNumber <= ctx.finalizedBlock:
|
||||
|
@ -788,7 +788,7 @@ proc peersAgreeOnChain(a, b: Peer): Future[bool] {.async.} =
|
|||
trace trEthRecvReceivedBlockHeaders, peer=a,
|
||||
count=latestBlock.get.headers.len, blockNumber
|
||||
|
||||
proc randomTrustedPeer(ctx: FastSyncCtx): Peer =
|
||||
proc randomTrustedPeer(ctx: LegacySyncRef): Peer =
|
||||
var k = rand(ctx.trustedPeers.len - 1)
|
||||
var i = 0
|
||||
for p in ctx.trustedPeers:
|
||||
|
@ -796,7 +796,7 @@ proc randomTrustedPeer(ctx: FastSyncCtx): Peer =
|
|||
if i == k: return
|
||||
inc i
|
||||
|
||||
proc startSyncWithPeerImpl(ctx: FastSyncCtx, peer: Peer) {.async.} =
|
||||
proc startSyncWithPeerImpl(ctx: LegacySyncRef, peer: Peer) {.async.} =
|
||||
trace "Start sync", peer, trustedPeers = ctx.trustedPeers.len
|
||||
if ctx.trustedPeers.len >= minPeersToStartSync:
|
||||
# We have enough trusted peers. Validate new peer against trusted
|
||||
|
@ -840,7 +840,7 @@ proc startSyncWithPeerImpl(ctx: FastSyncCtx, peer: Peer) {.async.} =
|
|||
for p in ctx.trustedPeers:
|
||||
asyncSpawn ctx.obtainBlocksFromPeer(p)
|
||||
|
||||
proc startSyncWithPeer(ctx: FastSyncCtx, peer: Peer) =
|
||||
proc startSyncWithPeer(ctx: LegacySyncRef, peer: Peer) =
|
||||
try:
|
||||
let
|
||||
db = ctx.chain.db
|
||||
|
@ -872,7 +872,7 @@ proc startSyncWithPeer(ctx: FastSyncCtx, peer: Peer) =
|
|||
except CatchableError as e:
|
||||
debug "Exception in startSyncWithPeer()", exc = e.name, err = e.msg
|
||||
|
||||
proc startObtainBlocks(ctx: FastSyncCtx, peer: Peer) =
|
||||
proc startObtainBlocks(ctx: LegacySyncRef, peer: Peer) =
|
||||
# simpler version of startSyncWithPeer
|
||||
try:
|
||||
|
||||
|
@ -892,11 +892,11 @@ proc startObtainBlocks(ctx: FastSyncCtx, peer: Peer) =
|
|||
except CatchableError as e:
|
||||
debug "Exception in startObtainBlocks()", exc = e.name, err = e.msg
|
||||
|
||||
proc onPeerConnected(ctx: FastSyncCtx, peer: Peer) =
|
||||
proc onPeerConnected(ctx: LegacySyncRef, peer: Peer) =
|
||||
trace "New candidate for sync", peer
|
||||
ctx.startSyncWithPeer(peer)
|
||||
|
||||
proc onPeerDisconnected(ctx: FastSyncCtx, p: Peer) =
|
||||
proc onPeerDisconnected(ctx: LegacySyncRef, p: Peer) =
|
||||
trace "peer disconnected ", peer = p
|
||||
ctx.trustedPeers.excl(p)
|
||||
ctx.busyPeers.excl(p)
|
||||
|
@ -906,9 +906,9 @@ proc onPeerDisconnected(ctx: FastSyncCtx, p: Peer) =
|
|||
# Public constructor/destructor
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc new*(T: type FastSyncCtx; ethNode: EthereumNode; chain: ChainRef): T
|
||||
proc new*(T: type LegacySyncRef; ethNode: EthereumNode; chain: ChainRef): T
|
||||
{.gcsafe, raises:[Defect,CatchableError].} =
|
||||
FastSyncCtx(
|
||||
LegacySyncRef(
|
||||
# workQueue: n/a
|
||||
# endBlockNumber: n/a
|
||||
# hasOutOfOrderBlocks: n/a
|
||||
|
@ -917,7 +917,7 @@ proc new*(T: type FastSyncCtx; ethNode: EthereumNode; chain: ChainRef): T
|
|||
trustedPeers: initHashSet[Peer](),
|
||||
finalizedBlock: chain.db.getCanonicalHead().blockNumber)
|
||||
|
||||
proc start*(ctx: FastSyncCtx) =
|
||||
proc start*(ctx: LegacySyncRef) =
|
||||
## Code for the fast blockchain sync procedure:
|
||||
## <https://github.com/ethereum/wiki/wiki/Parallel-Block-Downloads>_
|
||||
## <https://github.com/ethereum/go-ethereum/pull/1889__
|
||||
|
@ -959,7 +959,7 @@ proc start*(ctx: FastSyncCtx) =
|
|||
# Public procs: eth wire protocol handlers
|
||||
# ------------------------------------------------------------------------------
|
||||
|
||||
proc handleNewBlockHashes(ctx: FastSyncCtx,
|
||||
proc handleNewBlockHashes(ctx: LegacySyncRef,
|
||||
peer: Peer,
|
||||
hashes: openArray[NewBlockHashesAnnounce]) {.
|
||||
gcsafe, raises: [Defect, CatchableError].} =
|
||||
|
@ -1012,7 +1012,7 @@ proc handleNewBlockHashes(ctx: FastSyncCtx,
|
|||
let peer = ctx.randomTrustedPeer()
|
||||
ctx.startSyncWithPeer(peer)
|
||||
|
||||
proc handleNewBlock(ctx: FastSyncCtx,
|
||||
proc handleNewBlock(ctx: LegacySyncRef,
|
||||
peer: Peer,
|
||||
blk: EthBlock,
|
||||
totalDifficulty: DifficultyInt) {.
|
||||
|
@ -1087,7 +1087,7 @@ proc newBlockHashesHandler*(arg: pointer,
|
|||
peer: Peer,
|
||||
hashes: openArray[NewBlockHashesAnnounce]) {.
|
||||
gcsafe, raises: [Defect, CatchableError].} =
|
||||
let ctx = cast[FastSyncCtx](arg)
|
||||
let ctx = cast[LegacySyncRef](arg)
|
||||
ctx.handleNewBlockHashes(peer, hashes)
|
||||
|
||||
proc newBlockHandler*(arg: pointer,
|
||||
|
@ -1096,7 +1096,7 @@ proc newBlockHandler*(arg: pointer,
|
|||
totalDifficulty: DifficultyInt) {.
|
||||
gcsafe, raises: [Defect, CatchableError].} =
|
||||
|
||||
let ctx = cast[FastSyncCtx](arg)
|
||||
let ctx = cast[LegacySyncRef](arg)
|
||||
ctx.handleNewBlock(peer, blk, totalDifficulty)
|
||||
|
||||
# End
|
Loading…
Reference in New Issue