Improve full sync part behaviour 4 snap sync suite (#1564)

* Set maximum time for nodes to be banned.

why:
  Useless nodes are marked zombies and banned. They a kept in a table
  until flushed out by new connections. This works well if there are many
  connections. For the case that there are a few only, a maximum time is
  set. When expired, zombies are flushed automatically.

* Suspend full sync while block number at beacon block

details:
  Also allows to use external setting from file (2nd line)

* Resume state at full sync after restart (if any)
This commit is contained in:
Jordan Hrycaj 2023-04-26 16:46:42 +01:00 committed by GitHub
parent 68b2448ce1
commit e1369a7c25
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 179 additions and 84 deletions

View File

@ -22,25 +22,31 @@ logScope:
# Private functions # Private functions
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc getDataLine(name: string): string {.gcsafe, raises: [IOError].} = proc getDataLine(
name: string;
lineNum: int;
): string {.gcsafe, raises: [IOError].} =
if name.fileExists: if name.fileExists:
let file = name.open let file = name.open
defer: file.close defer: file.close
return (file.readAll.splitLines)[0].strip let linesRead = file.readAll.splitLines
if lineNum < linesRead.len:
return linesRead[lineNum].strip
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public functions # Public functions
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc syncCtrlBlockNumberFromFile*( proc syncCtrlBlockNumberFromFile*(
fileName: Option[string]; fileName: Option[string]; # Optional file name
lineNum = 0; # Read line from file
): Result[BlockNumber,void] = ): Result[BlockNumber,void] =
## Returns a block number from the file name argument `fileName`. The first ## Returns a block number from the file name argument `fileName`. The first
## line of the file is parsed as a decimal encoded block number. ## line of the file is parsed as a decimal encoded block number.
if fileName.isSome: if fileName.isSome:
let file = fileName.get let file = fileName.get
try: try:
let data = file.getDataLine let data = file.getDataLine(lineNum)
if 0 < data.len: if 0 < data.len:
let num = parse(data,UInt256) let num = parse(data,UInt256)
return ok(num.toBlockNumber) return ok(num.toBlockNumber)
@ -52,7 +58,8 @@ proc syncCtrlBlockNumberFromFile*(
err() err()
proc syncCtrlHashOrBlockNumFromFile*( proc syncCtrlHashOrBlockNumFromFile*(
fileName: Option[string]; fileName: Option[string]; # Optional file name
lineNum = 0; # Read line from file
): Result[HashOrNum,void] = ): Result[HashOrNum,void] =
## Returns a block number or a hash from the file name argument `fileName`. ## Returns a block number or a hash from the file name argument `fileName`.
## A block number is decimal encoded and a hash is expexted to be a 66 hex ## A block number is decimal encoded and a hash is expexted to be a 66 hex
@ -62,7 +69,7 @@ proc syncCtrlHashOrBlockNumFromFile*(
# Parse value dump and fetch a header from the peer (if any) # Parse value dump and fetch a header from the peer (if any)
try: try:
let data = file.getDataLine let data = file.getDataLine(lineNum)
if 0 < data.len: if 0 < data.len:
if 66 == data.len: if 66 == data.len:
let hash = HashOrNum( let hash = HashOrNum(

View File

@ -1,5 +1,5 @@
Snap sync test & debugging scenario Test & debugging scenario with nimbus-eth1 client/server
=================================== ========================================================
Start snap/1 server Start snap/1 server
@ -40,7 +40,12 @@ Start snap/1 client
# Tell nimbus to use this pivot block number. This number must be smaller # Tell nimbus to use this pivot block number. This number must be smaller
# than the 2000000 written into the file full-limit.txt above. # than the 2000000 written into the file full-limit.txt above.
echo 600000 > snap/snap-update.txt. echo 600000 > snap/snap-update.txt
# Tell nimbus to stop somewhere after 1000000 blocks have been downloaded
# with full sync follow up after snap sync has completed (2nd line of
# external setuip file.)
echo 1000000 >> snap/snap-update.txt
# Tell nimbus to use this hard coded peer enode. # Tell nimbus to use this hard coded peer enode.
echo enode://192d7e7a302bd4ff27f48d7852621e0d3cb863a6dd67dd44e0314a25a3aa866837f0d2460b4444dc66e7b7a2cd56a2de1c31b2a2ba4e23549bf3ba3b0c4f2eb5@127.0.0.1:30319 > snap/full-servers.txt echo enode://192d7e7a302bd4ff27f48d7852621e0d3cb863a6dd67dd44e0314a25a3aa866837f0d2460b4444dc66e7b7a2cd56a2de1c31b2a2ba4e23549bf3ba3b0c4f2eb5@127.0.0.1:30319 > snap/full-servers.txt

View File

@ -15,7 +15,7 @@ import
chronos, chronos,
eth/p2p, eth/p2p,
stew/keyed_queue, stew/keyed_queue,
../../../misc/[best_pivot, block_queue, ticker], ../../../misc/[best_pivot, block_queue, sync_ctrl, ticker],
../../../protocol, ../../../protocol,
"../.."/[range_desc, worker_desc], "../.."/[range_desc, worker_desc],
../db/[snapdb_desc, snapdb_persistent], ../db/[snapdb_desc, snapdb_persistent],
@ -25,23 +25,24 @@ import
type type
FullPassCtxRef = ref object of RootRef FullPassCtxRef = ref object of RootRef
## Pass local descriptor extension for full sync process ## Pass local descriptor extension for full sync process
startNumber*: Option[BlockNumber] ## History starts here (used for logging) startNumber: Option[BlockNumber] ## History starts here (used for logging)
pivot*: BestPivotCtxRef ## Global pivot descriptor pivot: BestPivotCtxRef ## Global pivot descriptor
bCtx*: BlockQueueCtxRef ## Global block queue descriptor bCtx: BlockQueueCtxRef ## Global block queue descriptor
suspendAt: BlockNumber ## Suspend if persistent head is larger
FullPassBuddyRef = ref object of RootRef FullPassBuddyRef = ref object of RootRef
## Pass local descriptor extension for full sync process ## Pass local descriptor extension for full sync process
pivot*: BestPivotWorkerRef ## Local pivot worker descriptor pivot: BestPivotWorkerRef ## Local pivot worker descriptor
queue*: BlockQueueWorkerRef ## Block queue worker queue: BlockQueueWorkerRef ## Block queue worker
const const
extraTraceMessages = false # or true extraTraceMessages = false # or true
## Enabled additional logging noise ## Enabled additional logging noise
dumpDatabaseOnRollOver = true # or false # <--- will go away (debugging only) dumpDatabaseOnRollOver = false # or true # <--- will go away (debugging only)
## Dump database before switching to full sync (debugging, testing) ## Dump database before switching to full sync (debugging, testing)
when dumpDatabaseOnRollOver: when dumpDatabaseOnRollOver: # <--- will go away (debugging only)
import ../../../../../tests/replay/undump_kvp import ../../../../../tests/replay/undump_kvp
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -81,10 +82,23 @@ proc `pass=`(only: var SnapBuddyData; val: FullPassBuddyRef) =
# Private functions # Private functions
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc resumeAtNumber(ctx: SnapCtxRef): BlockNumber =
## Resume full sync (if any)
ignoreException("resumeAtNumber"):
const nBackBlocks = maxHeadersFetch div 2
let bestNumber = ctx.chain.db.getCanonicalHead().blockNumber
if nBackBlocks < bestNumber:
return bestNumber - nBackBlocks
proc tickerUpdater(ctx: SnapCtxRef): TickerFullStatsUpdater = proc tickerUpdater(ctx: SnapCtxRef): TickerFullStatsUpdater =
result = proc: TickerFullStats = result = proc: TickerFullStats =
let full = ctx.pool.pass
var stats: BlockQueueStats var stats: BlockQueueStats
ctx.pool.pass.bCtx.blockQueueStats(stats) full.bCtx.blockQueueStats(stats)
let suspended = 0 < full.suspendAt and full.suspendAt <= stats.topAccepted
TickerFullStats( TickerFullStats(
pivotBlock: ctx.pool.pass.startNumber, pivotBlock: ctx.pool.pass.startNumber,
@ -92,6 +106,7 @@ proc tickerUpdater(ctx: SnapCtxRef): TickerFullStatsUpdater =
nextStaged: stats.nextStaged, nextStaged: stats.nextStaged,
nextUnprocessed: stats.nextUnprocessed, nextUnprocessed: stats.nextUnprocessed,
nStagedQueue: stats.nStagedQueue, nStagedQueue: stats.nStagedQueue,
suspended: suspended,
reOrg: stats.reOrg) reOrg: stats.reOrg)
@ -158,15 +173,46 @@ proc processStaged(buddy: SnapBuddyRef): bool =
return false return false
proc suspendDownload(buddy: SnapBuddyRef): bool =
## Check whether downloading should be suspended
let
ctx = buddy.ctx
full = ctx.pool.pass
# Update from RPC magic
if full.suspendAt < ctx.pool.beaconHeader.blockNumber:
full.suspendAt = ctx.pool.beaconHeader.blockNumber
# Optionaly, some external update request
if ctx.exCtrlFile.isSome:
# Needs to be read as second line (index 1)
let rc = ctx.exCtrlFile.syncCtrlBlockNumberFromFile(1)
if rc.isOk and full.suspendAt < rc.value:
full.suspendAt = rc.value
# Return `true` if download should be suspended
if 0 < full.suspendAt:
return full.suspendAt <= buddy.only.pass.queue.topAccepted
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Private functions, full sync admin handlers # Private functions, full sync admin handlers
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc fullSyncSetup(ctx: SnapCtxRef) = proc fullSyncSetup(ctx: SnapCtxRef) =
# Set up descriptor # Set up descriptor
ctx.pool.pass = FullPassCtxRef() let full = FullPassCtxRef()
ctx.pool.pass.bCtx = BlockQueueCtxRef.init() ctx.pool.pass = full
ctx.pool.pass.pivot = BestPivotCtxRef.init(rng=ctx.pool.rng, minPeers=0)
# Initialise full sync, resume from previous download (if any)
let blockNumber = ctx.resumeAtNumber()
if 0 < blockNumber:
full.startNumber = some(blockNumber)
full.bCtx = BlockQueueCtxRef.init(blockNumber + 1)
else:
full.bCtx = BlockQueueCtxRef.init()
# Initialise peer pivots in relaxed mode (not waiting for agreeing peers)
full.pivot = BestPivotCtxRef.init(rng=ctx.pool.rng, minPeers=0)
# Update ticker # Update ticker
ctx.pool.ticker.init(cb = ctx.tickerUpdater()) ctx.pool.ticker.init(cb = ctx.tickerUpdater())
@ -211,27 +257,32 @@ proc fullSyncPool(buddy: SnapBuddyRef, last: bool; laps: int): bool =
# There is a soft re-setup after switch over to full sync mode if a pivot # There is a soft re-setup after switch over to full sync mode if a pivot
# block header is available initialised from outside, i.e. snap sync swich. # block header is available initialised from outside, i.e. snap sync swich.
if ctx.pool.fullHeader.isSome: if ctx.pool.fullHeader.isSome:
let stateHeader = ctx.pool.fullHeader.unsafeGet let
stateHeader = ctx.pool.fullHeader.unsafeGet
initFullSync = ctx.pool.pass.startNumber.isNone
# Reinialise block queue descriptor relative to current pivot # Re-assign start number for logging (instead of genesis)
ctx.pool.pass.startNumber = some(stateHeader.blockNumber) ctx.pool.pass.startNumber = some(stateHeader.blockNumber)
ctx.pool.pass.bCtx = BlockQueueCtxRef.init(stateHeader.blockNumber + 1)
if initFullSync:
# Reinitialise block queue descriptor relative to current pivot
ctx.pool.pass.bCtx = BlockQueueCtxRef.init(stateHeader.blockNumber + 1)
# Store pivot as parent hash in database
ctx.pool.snapDb.kvDb.persistentBlockHeaderPut stateHeader
# Instead of genesis.
ctx.chain.com.startOfHistory = stateHeader.blockHash
when dumpDatabaseOnRollOver: # <--- will go away (debugging only)
# Dump database ... <--- will go away (debugging only)
let nRecords = # <--- will go away (debugging only)
ctx.pool.snapDb.rockDb.dumpAllDb # <--- will go away (debugging only)
trace logTxt "dumped block chain database", nRecords
# Kick off ticker (was stopped by snap `release()` method) # Kick off ticker (was stopped by snap `release()` method)
ctx.pool.ticker.start() ctx.pool.ticker.start()
# Store pivot as parent hash in database
ctx.pool.snapDb.kvDb.persistentBlockHeaderPut stateHeader
# Instead of genesis.
ctx.chain.com.startOfHistory = stateHeader.blockHash
when dumpDatabaseOnRollOver: # <--- will go away (debugging only)
# Dump database ... <--- will go away (debugging only)
let nRecords = # <--- will go away (debugging only)
ctx.pool.snapDb.rockDb.dumpAllDb # <--- will go away (debugging only)
trace logTxt "dumped block chain database", nRecords
# Reset so that this action would not be triggered, again # Reset so that this action would not be triggered, again
ctx.pool.fullHeader = none(BlockHeader) ctx.pool.fullHeader = none(BlockHeader)
@ -283,6 +334,11 @@ proc fullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
ctx = buddy.ctx ctx = buddy.ctx
bq = buddy.only.pass.queue bq = buddy.only.pass.queue
if buddy.suspendDownload:
# Sleep for a while, then leave
await sleepAsync(10.seconds)
return
# Fetch work item # Fetch work item
let rc = await bq.blockQueueWorker() let rc = await bq.blockQueueWorker()
if rc.isErr: if rc.isErr:

View File

@ -21,6 +21,17 @@ import
logScope: logScope:
topics = "snap-init" topics = "snap-init"
# ------------------------------------------------------------------------------
# Private helpers
# ------------------------------------------------------------------------------
proc updateBeaconHeaderCB(ctx: SnapCtxRef): SyncReqNewHeadCB =
## Update beacon header. This function is intended as a call back function
## for the RPC module.
result = proc(h: BlockHeader) {.gcsafe, raises: [].} =
if ctx.pool.beaconHeader.blockNumber < h.blockNumber:
ctx.pool.beaconHeader = h
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Private functions # Private functions
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
@ -36,6 +47,16 @@ proc releasePass(ctx: SnapCtxRef) =
# -------------- # --------------
proc enableRpcMagic(ctx: SnapCtxRef) =
## Helper for `setup()`: Enable external pivot update via RPC
ctx.chain.com.syncReqNewHead = ctx.updateBeaconHeaderCB
proc disableRpcMagic(ctx: SnapCtxRef) =
## Helper for `release()`
ctx.chain.com.syncReqNewHead = nil
# --------------
proc setupTicker(ctx: SnapCtxRef) = proc setupTicker(ctx: SnapCtxRef) =
let blindTicker: TickerSnapStatsUpdater = proc: TickerSnapStats = let blindTicker: TickerSnapStatsUpdater = proc: TickerSnapStats =
discard discard
@ -64,6 +85,7 @@ proc passInitSetup*(ctx: SnapCtxRef) =
ctx.setupPass() # Set up sync sub-mode specs. ctx.setupPass() # Set up sync sub-mode specs.
ctx.setupSnapDb() # Set database backend, subject to change ctx.setupSnapDb() # Set database backend, subject to change
ctx.setupTicker() # Start log/status ticker (if any) ctx.setupTicker() # Start log/status ticker (if any)
ctx.enableRpcMagic() # Allow external pivot update via RPC
# Experimental, also used for debugging # Experimental, also used for debugging
if ctx.exCtrlFile.isSome: if ctx.exCtrlFile.isSome:
@ -72,6 +94,7 @@ proc passInitSetup*(ctx: SnapCtxRef) =
proc passInitRelease*(ctx: SnapCtxRef) = proc passInitRelease*(ctx: SnapCtxRef) =
## Global clean up ## Global clean up
ctx.disableRpcMagic() # Disable external pivot update via RPC
ctx.releaseTicker() # Stop log/status ticker (if any) ctx.releaseTicker() # Stop log/status ticker (if any)
ctx.releasePass() # Shut down sync methods ctx.releasePass() # Shut down sync methods

View File

@ -63,16 +63,6 @@ proc enableWireServices(ctx: SnapCtxRef) =
# -------------- # --------------
proc enableRpcMagic(ctx: SnapCtxRef) =
## Helper for `setup()`: Enable external pivot update via RPC
ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB
proc disableRpcMagic(ctx: SnapCtxRef) =
## Helper for `release()`
ctx.chain.com.syncReqNewHead = nil
# --------------
proc detectSnapSyncRecovery(ctx: SnapCtxRef) = proc detectSnapSyncRecovery(ctx: SnapCtxRef) =
## Helper for `setup()`: Initiate snap sync recovery (if any) ## Helper for `setup()`: Initiate snap sync recovery (if any)
let rc = ctx.pool.snapDb.pivotRecoverDB() let rc = ctx.pool.snapDb.pivotRecoverDB()
@ -171,12 +161,10 @@ proc snapSyncSetup(ctx: SnapCtxRef) =
ctx.pool.pass.coveredAccounts = NodeTagRangeSet.init() ctx.pool.pass.coveredAccounts = NodeTagRangeSet.init()
ctx.pool.ticker.init(cb = ctx.pool.pass.pivotTable.tickerStats(ctx)) ctx.pool.ticker.init(cb = ctx.pool.pass.pivotTable.tickerStats(ctx))
ctx.enableRpcMagic() # Allow external pivot update via RPC
ctx.disableWireServices() # Stop unwanted public services ctx.disableWireServices() # Stop unwanted public services
ctx.detectSnapSyncRecovery() # Check for recovery mode ctx.detectSnapSyncRecovery() # Check for recovery mode
proc snapSyncRelease(ctx: SnapCtxRef) = proc snapSyncRelease(ctx: SnapCtxRef) =
ctx.disableRpcMagic() # Disable external pivot update via RPC
ctx.enableWireServices() # re-enable public services ctx.enableWireServices() # re-enable public services
ctx.pool.ticker.stop() ctx.pool.ticker.stop()

View File

@ -35,14 +35,13 @@ proc beaconHeaderUpdatebuBlockNumber*(
## This function is typically used for testing and debugging. ## This function is typically used for testing and debugging.
let let
ctx = buddy.ctx ctx = buddy.ctx
snap = ctx.pool.pass
peer = buddy.peer peer = buddy.peer
trace "fetch beacon header", peer, num trace "fetch beacon header", peer, num
if snap.beaconHeader.blockNumber < num: if ctx.pool.beaconHeader.blockNumber < num:
let rc = await buddy.getBlockHeader(num) let rc = await buddy.getBlockHeader(num)
if rc.isOk: if rc.isOk:
snap.beaconHeader = rc.value ctx.pool.beaconHeader = rc.value
proc beaconHeaderUpdateFromFile*( proc beaconHeaderUpdateFromFile*(
@ -62,7 +61,6 @@ proc beaconHeaderUpdateFromFile*(
return return
rc.value rc.value
snap = ctx.pool.pass
peer = buddy.peer peer = buddy.peer
var var
@ -74,20 +72,20 @@ proc beaconHeaderUpdateFromFile*(
if isHash: if isHash:
let hash = hashOrNum.hash let hash = hashOrNum.hash
trace "External beacon info", peer, hash trace "External beacon info", peer, hash
if hash != snap.beaconHeader.hash: if hash != ctx.pool.beaconHeader.hash:
rc = await buddy.getBlockHeader(hash) rc = await buddy.getBlockHeader(hash)
else: else:
let num = hashOrNum.number let num = hashOrNum.number
trace "External beacon info", peer, num trace "External beacon info", peer, num
if snap.beaconHeader.blockNumber < num: if ctx.pool.beaconHeader.blockNumber < num:
rc = await buddy.getBlockHeader(num) rc = await buddy.getBlockHeader(num)
except CatchableError as e: except CatchableError as e:
trace "Exception while parsing beacon info", peer, isHash, trace "Exception while parsing beacon info", peer, isHash,
name=($e.name), msg=(e.msg) name=($e.name), msg=(e.msg)
if rc.isOk: if rc.isOk:
if snap.beaconHeader.blockNumber < rc.value.blockNumber: if ctx.pool.beaconHeader.blockNumber < rc.value.blockNumber:
snap.beaconHeader = rc.value ctx.pool.beaconHeader = rc.value
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# End # End

View File

@ -188,8 +188,8 @@ proc tickerStats*(
procChunks = env.fetchAccounts.processed.chunks procChunks = env.fetchAccounts.processed.chunks
stoQuLen = some(env.storageQueueTotal()) stoQuLen = some(env.storageQueueTotal())
ctraQuLen = some(env.fetchContracts.len) ctraQuLen = some(env.fetchContracts.len)
if 0 < ctx.pool.pass.beaconHeader.blockNumber: if 0 < ctx.pool.beaconHeader.blockNumber:
beaconBlock = some(ctx.pool.pass.beaconHeader.blockNumber) beaconBlock = some(ctx.pool.beaconHeader.blockNumber)
TickerSnapStats( TickerSnapStats(
beaconBlock: beaconBlock, beaconBlock: beaconBlock,
@ -421,7 +421,7 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} =
## it will not proceed to the next scheduler task. ## it will not proceed to the next scheduler task.
let let
ctx = buddy.ctx ctx = buddy.ctx
beaconHeader = ctx.pool.pass.beaconHeader beaconHeader = ctx.pool.beaconHeader
var var
pivotHeader: BlockHeader pivotHeader: BlockHeader
@ -454,16 +454,6 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} =
if pivotHeader.blockNumber == 0: if pivotHeader.blockNumber == 0:
buddy.ctrl.stopped = true buddy.ctrl.stopped = true
proc pivotUpdateBeaconHeaderCB*(ctx: SnapCtxRef): SyncReqNewHeadCB =
## Update beacon header. This function is intended as a call back function
## for the RPC module.
result = proc(h: BlockHeader) {.gcsafe.} =
if ctx.pool.pass.beaconHeader.blockNumber < h.blockNumber:
# when extraTraceMessages:
# trace logTxt "external beacon info update", header=h.blockNumber.toStr
ctx.pool.pass.beaconHeader = h
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Public function, debugging # Public function, debugging
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------

View File

@ -92,7 +92,6 @@ type
## Global context extension, snap sync parameters, pivot table ## Global context extension, snap sync parameters, pivot table
pivotTable*: PivotTable ## Per state root environment pivotTable*: PivotTable ## Per state root environment
completedPivot*: SnapPivotRef ## Start full sync from here completedPivot*: SnapPivotRef ## Start full sync from here
beaconHeader*: BlockHeader ## Running on beacon chain
coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts
covAccTimesFull*: uint ## # of 100% coverages covAccTimesFull*: uint ## # of 100% coverages
recovery*: RecoveryRef ## Current recovery checkpoint/context recovery*: RecoveryRef ## Current recovery checkpoint/context

View File

@ -26,6 +26,7 @@ type
## Peer-worker local descriptor data extension ## Peer-worker local descriptor data extension
errors*: GetErrorStatsRef ## For error handling errors*: GetErrorStatsRef ## For error handling
full*: RootRef ## Peer local full sync descriptor full*: RootRef ## Peer local full sync descriptor
# snap*: RootRef ## Peer local snap sync descriptor
SnapSyncPassType* = enum SnapSyncPassType* = enum
## Current sync mode, after a snapshot has been downloaded, the system ## Current sync mode, after a snapshot has been downloaded, the system
@ -47,6 +48,7 @@ type
snapDb*: SnapDbRef ## Accounts snapshot DB snapDb*: SnapDbRef ## Accounts snapshot DB
# Info # Info
beaconHeader*: BlockHeader ## Running on beacon chain
enableTicker*: bool ## Advisary, extra level of gossip enableTicker*: bool ## Advisary, extra level of gossip
ticker*: TickerRef ## Ticker, logger descriptor ticker*: TickerRef ## Ticker, logger descriptor

View File

@ -88,13 +88,12 @@
## * stew/[interval_set, sorted_set], ## * stew/[interval_set, sorted_set],
## * "."/[sync_desc, sync_sched, protocol] ## * "."/[sync_desc, sync_sched, protocol]
## ##
{.push raises: [].} {.push raises: [].}
import import
std/hashes, std/hashes,
chronos, chronos,
eth/[p2p, p2p/peer_pool, p2p/private/p2p_types], eth/[keys, p2p, p2p/peer_pool],
stew/keyed_queue, stew/keyed_queue,
"."/[handlers, sync_desc] "."/[handlers, sync_desc]
@ -105,7 +104,7 @@ static:
type type
ActiveBuddies[S,W] = ##\ ActiveBuddies[S,W] = ##\
## List of active workers, using `Hash(Peer)` rather than `Peer` ## List of active workers, using `Hash(Peer)` rather than `Peer`
KeyedQueue[Hash,RunnerBuddyRef[S,W]] KeyedQueue[ENode,RunnerBuddyRef[S,W]]
RunnerSyncRef*[S,W] = ref object RunnerSyncRef*[S,W] = ref object
## Module descriptor ## Module descriptor
@ -122,8 +121,12 @@ type
## Per worker peer descriptor ## Per worker peer descriptor
dsc: RunnerSyncRef[S,W] ## Scheduler descriptor dsc: RunnerSyncRef[S,W] ## Scheduler descriptor
worker: BuddyRef[S,W] ## Worker peer data worker: BuddyRef[S,W] ## Worker peer data
zombified: Moment ## When it became undead (if any)
const const
zombieTimeToLinger = 20.seconds
## Maximum time a zombie is kept on the database.
execLoopTimeElapsedMin = 50.milliseconds execLoopTimeElapsedMin = 50.milliseconds
## Minimum elapsed time an exec loop needs for a single lap. If it is ## Minimum elapsed time an exec loop needs for a single lap. If it is
## faster, asynchroneous sleep seconds are added. in order to avoid ## faster, asynchroneous sleep seconds are added. in order to avoid
@ -143,9 +146,18 @@ const
# Private helpers # Private helpers
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
proc hash(peer: Peer): Hash = proc hash*(key: ENode): Hash =
## Needed for `buddies` table key comparison ## Mixin, needed for `buddies` table key comparison. Needs to be a public
peer.remote.id.hash ## function technically although it should be seen logically as a private
## one.
var h: Hash = 0
h = h !& hashes.hash(key.pubkey.toRaw)
h = h !& hashes.hash(key.address)
!$h
proc key(peer: Peer): ENode =
## Map to key for below table methods.
peer.remote.node
# ------------------------------------------------------------------------------ # ------------------------------------------------------------------------------
# Private functions # Private functions
@ -230,7 +242,7 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
else: else:
# Rotate connection table so the most used entry is at the top/right # Rotate connection table so the most used entry is at the top/right
# end. So zombies will end up leftish. # end. So zombies will end up leftish.
discard dsc.buddies.lruFetch(peer.hash) discard dsc.buddies.lruFetch peer.key
# Multi mode # Multi mode
if worker.ctrl.multiOk: if worker.ctrl.multiOk:
@ -280,9 +292,19 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
maxWorkers {.used.} = dsc.ctx.buddiesMax maxWorkers {.used.} = dsc.ctx.buddiesMax
nPeers {.used.} = dsc.pool.len nPeers {.used.} = dsc.pool.len
nWorkers = dsc.buddies.len nWorkers = dsc.buddies.len
if dsc.buddies.hasKey(peer.hash): zombie = dsc.buddies.eq peer.key
trace "Reconnecting zombie peer ignored", peer, nPeers, nWorkers, maxWorkers if zombie.isOk:
return let
now = Moment.now()
ttz = zombie.value.zombified + zombieTimeToLinger
if ttz < Moment.now():
trace "Reconnecting zombie peer ignored", peer,
nPeers, nWorkers, maxWorkers, canRequeue=(now-ttz)
return
# Zombie can be removed from the database
dsc.buddies.del peer.key
trace "Zombie peer timeout, ready for requeing", peer,
nPeers, nWorkers, maxWorkers
# Initialise worker for this peer # Initialise worker for this peer
let buddy = RunnerBuddyRef[S,W]( let buddy = RunnerBuddyRef[S,W](
@ -313,7 +335,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
leastPeer.worker.runStop() leastPeer.worker.runStop()
# Add peer entry # Add peer entry
discard dsc.buddies.lruAppend(peer.hash, buddy, dsc.ctx.buddiesMax) discard dsc.buddies.lruAppend(peer.key, buddy, dsc.ctx.buddiesMax)
trace "Running peer worker", peer, nPeers, trace "Running peer worker", peer, nPeers,
nWorkers=dsc.buddies.len, maxWorkers nWorkers=dsc.buddies.len, maxWorkers
@ -326,17 +348,22 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) =
nPeers = dsc.pool.len nPeers = dsc.pool.len
maxWorkers = dsc.ctx.buddiesMax maxWorkers = dsc.ctx.buddiesMax
nWorkers = dsc.buddies.len nWorkers = dsc.buddies.len
rc = dsc.buddies.eq(peer.hash) rc = dsc.buddies.eq peer.key
if rc.isErr: if rc.isErr:
debug "Disconnected, unregistered peer", peer, nPeers, nWorkers, maxWorkers debug "Disconnected, unregistered peer", peer, nPeers, nWorkers, maxWorkers
return return
if rc.value.worker.ctrl.zombie: if rc.value.worker.ctrl.zombie:
# Don't disconnect, leave them fall out of the LRU cache. The effect is, # Don't disconnect, leave them fall out of the LRU cache. The effect is,
# that reconnecting might be blocked, for a while. # that reconnecting might be blocked, for a while. For few peers cases,
# the start of zombification is registered so that a zombie can eventually
# be let die and buried.
rc.value.worker = nil
rc.value.dsc = nil
rc.value.zombified = Moment.now()
trace "Disconnected, zombie", peer, nPeers, nWorkers, maxWorkers trace "Disconnected, zombie", peer, nPeers, nWorkers, maxWorkers
else: else:
rc.value.worker.ctrl.stopped = true # in case it is hanging somewhere rc.value.worker.ctrl.stopped = true # in case it is hanging somewhere
dsc.buddies.del(peer.hash) dsc.buddies.del peer.key
trace "Disconnected buddy", peer, nPeers, trace "Disconnected buddy", peer, nPeers,
nWorkers=dsc.buddies.len, maxWorkers nWorkers=dsc.buddies.len, maxWorkers