From e1369a7c25a7f30789ea7bc02fa693a494129897 Mon Sep 17 00:00:00 2001
From: Jordan Hrycaj
Date: Wed, 26 Apr 2023 16:46:42 +0100
Subject: [PATCH] Improve full sync part behaviour 4 snap sync suite (#1564)

* Set maximum time for nodes to be banned.

  why:
    Useless nodes are marked zombies and banned. They are kept in a table
    until flushed out by new connections. This works well if there are many
    connections. For the case where there are only a few connections, a
    maximum ban time is set: once it expires, zombies are flushed
    automatically.

* Suspend full sync when the block number has reached the beacon block

  details:
    Also allows the limit to be set externally via a file (2nd line)

* Resume state at full sync after restart (if any)
---
 nimbus/sync/misc/sync_ctrl.nim                |  19 ++-
 nimbus/sync/snap/README.txt                   |  11 +-
 nimbus/sync/snap/worker/pass/pass_full.nim    | 110 +++++++++++++-----
 nimbus/sync/snap/worker/pass/pass_init.nim    |  23 ++++
 nimbus/sync/snap/worker/pass/pass_snap.nim    |  12 --
 .../pass/pass_snap/helper/beacon_header.nim   |  14 +--
 .../sync/snap/worker/pass/pass_snap/pivot.nim |  16 +--
 .../worker/pass/pass_snap/snap_pass_desc.nim  |   1 -
 nimbus/sync/snap/worker_desc.nim              |   2 +
 nimbus/sync/sync_sched.nim                    |  55 ++++++---
 10 files changed, 179 insertions(+), 84 deletions(-)

diff --git a/nimbus/sync/misc/sync_ctrl.nim b/nimbus/sync/misc/sync_ctrl.nim
index 7a0446c65..f477d6bdb 100644
--- a/nimbus/sync/misc/sync_ctrl.nim
+++ b/nimbus/sync/misc/sync_ctrl.nim
@@ -22,25 +22,31 @@ logScope:
 # Private functions
 # ------------------------------------------------------------------------------
 
-proc getDataLine(name: string): string {.gcsafe, raises: [IOError].} =
+proc getDataLine(
+    name: string;
+    lineNum: int;
+      ): string {.gcsafe, raises: [IOError].} =
   if name.fileExists:
     let file = name.open
     defer: file.close
-    return (file.readAll.splitLines)[0].strip
+    let linesRead = file.readAll.splitLines
+    if lineNum < linesRead.len:
+      return linesRead[lineNum].strip
 
 # ------------------------------------------------------------------------------
 # Public functions
 # ------------------------------------------------------------------------------
 
 proc syncCtrlBlockNumberFromFile*(
-    fileName: Option[string];
+    fileName: Option[string];   # Optional file name
+    lineNum = 0;                # Read line from file
       ): Result[BlockNumber,void] =
   ## Returns a block number from the file name argument `fileName`. The first
   ## line of the file is parsed as a decimal encoded block number.
   if fileName.isSome:
     let file = fileName.get
     try:
-      let data = file.getDataLine
+      let data = file.getDataLine(lineNum)
       if 0 < data.len:
         let num = parse(data,UInt256)
         return ok(num.toBlockNumber)
@@ -52,7 +58,8 @@ proc syncCtrlBlockNumberFromFile*(
   err()
 
 proc syncCtrlHashOrBlockNumFromFile*(
-    fileName: Option[string];
+    fileName: Option[string];   # Optional file name
+    lineNum = 0;                # Read line from file
       ): Result[HashOrNum,void] =
   ## Returns a block number or a hash from the file name argument `fileName`.
   ## A block number is decimal encoded and a hash is expected to be a 66 hex
@@ -62,7 +69,7 @@ proc syncCtrlHashOrBlockNumFromFile*(
 
   # Parse value dump and fetch a header from the peer (if any)
   try:
-    let data = file.getDataLine
+    let data = file.getDataLine(lineNum)
     if 0 < data.len:
       if 66 == data.len:
         let hash = HashOrNum(
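The reworked `getDataLine()` above establishes the convention that a single
control file may carry several decimal numbers, one per line, selected by a
zero-based index. A minimal standalone sketch of that convention, with plain
`uint64` standing in for `UInt256`/`BlockNumber` (the file name and the
`readCtrlNumber` helper are illustrative only, not part of the patch):

  import std/[options, strutils]

  proc readCtrlNumber(name: string; lineNum = 0): Option[uint64] =
    ## Read line `lineNum` of file `name` and parse it as a decimal number,
    ## mirroring the `getDataLine()`/`syncCtrlBlockNumberFromFile()` pair.
    try:
      let linesRead = readFile(name).splitLines
      if lineNum < linesRead.len:
        let data = linesRead[lineNum].strip
        if 0 < data.len:
          return some(parseBiggestUInt(data).uint64)
    except IOError, ValueError:
      discard                # unreadable file or garbled line yields `none`

  when isMainModule:
    writeFile("snap-update.txt", "600000\n1000000\n")
    assert readCtrlNumber("snap-update.txt", 0).get == 600000'u64   # pivot
    assert readCtrlNumber("snap-update.txt", 1).get == 1000000'u64  # limit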
diff --git a/nimbus/sync/snap/README.txt b/nimbus/sync/snap/README.txt
index ba84648b9..e4581f20b 100644
--- a/nimbus/sync/snap/README.txt
+++ b/nimbus/sync/snap/README.txt
@@ -1,5 +1,5 @@
-Snap sync test & debugging scenario
-===================================
+Test & debugging scenario with nimbus-eth1 client/server
+========================================================
 
 Start snap/1 server
 
@@ -40,7 +40,12 @@ Start snap/1 client
 
   # Tell nimbus to use this pivot block number. This number must be smaller
   # than the 2000000 written into the file full-limit.txt above.
-  echo 600000 > snap/snap-update.txt.
+  echo 600000 > snap/snap-update.txt
+
+  # Tell nimbus to stop somewhere after 1000000 blocks have been downloaded
+  # by the full sync that follows up after snap sync has completed (2nd
+  # line of the external setup file.)
+  echo 1000000 >> snap/snap-update.txt
 
   # Tell nimbus to use this hard coded peer enode.
   echo enode://192d7e7a302bd4ff27f48d7852621e0d3cb863a6dd67dd44e0314a25a3aa866837f0d2460b4444dc66e7b7a2cd56a2de1c31b2a2ba4e23549bf3ba3b0c4f2eb5@127.0.0.1:30319 > snap/full-servers.txt
diff --git a/nimbus/sync/snap/worker/pass/pass_full.nim b/nimbus/sync/snap/worker/pass/pass_full.nim
index 18f577033..0494c54da 100644
--- a/nimbus/sync/snap/worker/pass/pass_full.nim
+++ b/nimbus/sync/snap/worker/pass/pass_full.nim
@@ -15,7 +15,7 @@ import
   chronos,
   eth/p2p,
   stew/keyed_queue,
-  ../../../misc/[best_pivot, block_queue, ticker],
+  ../../../misc/[best_pivot, block_queue, sync_ctrl, ticker],
   ../../../protocol,
   "../.."/[range_desc, worker_desc],
   ../db/[snapdb_desc, snapdb_persistent],
@@ -25,23 +25,24 @@ import
 type
   FullPassCtxRef = ref object of RootRef
     ## Pass local descriptor extension for full sync process
-    startNumber*: Option[BlockNumber] ## History starts here (used for logging)
-    pivot*: BestPivotCtxRef           ## Global pivot descriptor
-    bCtx*: BlockQueueCtxRef           ## Global block queue descriptor
+    startNumber: Option[BlockNumber]  ## History starts here (used for logging)
+    pivot: BestPivotCtxRef            ## Global pivot descriptor
+    bCtx: BlockQueueCtxRef            ## Global block queue descriptor
+    suspendAt: BlockNumber            ## Suspend if persistent head is larger
 
   FullPassBuddyRef = ref object of RootRef
     ## Pass local descriptor extension for full sync process
-    pivot*: BestPivotWorkerRef        ## Local pivot worker descriptor
-    queue*: BlockQueueWorkerRef       ## Block queue worker
+    pivot: BestPivotWorkerRef         ## Local pivot worker descriptor
+    queue: BlockQueueWorkerRef        ## Block queue worker
 
 const
   extraTraceMessages = false # or true
     ## Enables additional logging noise
 
-  dumpDatabaseOnRollOver = true # or false # <--- will go away (debugging only)
+  dumpDatabaseOnRollOver = false # or true # <--- will go away (debugging only)
     ## Dump database before switching to full sync (debugging, testing)
 
-when dumpDatabaseOnRollOver:
+when dumpDatabaseOnRollOver: # <--- will go away (debugging only)
   import ../../../../../tests/replay/undump_kvp
 
@@ -81,10 +82,23 @@ proc `pass=`(only: var SnapBuddyData; val: FullPassBuddyRef) =
 # Private functions
 # ------------------------------------------------------------------------------
 
+proc resumeAtNumber(ctx: SnapCtxRef): BlockNumber =
+  ## Resume full sync (if any)
+  ignoreException("resumeAtNumber"):
+    const nBackBlocks = maxHeadersFetch div 2
+    let bestNumber = ctx.chain.db.getCanonicalHead().blockNumber
+    if nBackBlocks < bestNumber:
+      return bestNumber - nBackBlocks
+
+
 proc tickerUpdater(ctx: SnapCtxRef): TickerFullStatsUpdater =
   result = proc: TickerFullStats =
+    let full = ctx.pool.pass
+
     var stats: BlockQueueStats
-    ctx.pool.pass.bCtx.blockQueueStats(stats)
+    full.bCtx.blockQueueStats(stats)
+
+    let suspended = 0 < full.suspendAt and full.suspendAt <= stats.topAccepted
 
     TickerFullStats(
       pivotBlock: ctx.pool.pass.startNumber,
@@ -92,6 +106,7 @@ proc tickerUpdater(ctx: SnapCtxRef): TickerFullStatsUpdater =
       nextStaged: stats.nextStaged,
       nextUnprocessed: stats.nextUnprocessed,
       nStagedQueue: stats.nStagedQueue,
+      suspended: suspended,
       reOrg: stats.reOrg)
@@ -158,15 +173,46 @@ proc processStaged(buddy: SnapBuddyRef): bool =
 
   return false
 
+proc suspendDownload(buddy: SnapBuddyRef): bool =
+  ## Check whether downloading should be suspended
+  let
+    ctx = buddy.ctx
+    full = ctx.pool.pass
+
+  # Update from RPC magic
+  if full.suspendAt < ctx.pool.beaconHeader.blockNumber:
+    full.suspendAt = ctx.pool.beaconHeader.blockNumber
+
+  # Optionally, some external update request
+  if ctx.exCtrlFile.isSome:
+    # Needs to be read as second line (index 1)
+    let rc = ctx.exCtrlFile.syncCtrlBlockNumberFromFile(1)
+    if rc.isOk and full.suspendAt < rc.value:
+      full.suspendAt = rc.value
+
+  # Return `true` if download should be suspended
+  if 0 < full.suspendAt:
+    return full.suspendAt <= buddy.only.pass.queue.topAccepted
+
 # ------------------------------------------------------------------------------
 # Private functions, full sync admin handlers
 # ------------------------------------------------------------------------------
 
 proc fullSyncSetup(ctx: SnapCtxRef) =
   # Set up descriptor
-  ctx.pool.pass = FullPassCtxRef()
-  ctx.pool.pass.bCtx = BlockQueueCtxRef.init()
-  ctx.pool.pass.pivot = BestPivotCtxRef.init(rng=ctx.pool.rng, minPeers=0)
+  let full = FullPassCtxRef()
+  ctx.pool.pass = full
+
+  # Initialise full sync, resume from previous download (if any)
+  let blockNumber = ctx.resumeAtNumber()
+  if 0 < blockNumber:
+    full.startNumber = some(blockNumber)
+    full.bCtx = BlockQueueCtxRef.init(blockNumber + 1)
+  else:
+    full.bCtx = BlockQueueCtxRef.init()
+
+  # Initialise peer pivots in relaxed mode (not waiting for agreeing peers)
+  full.pivot = BestPivotCtxRef.init(rng=ctx.pool.rng, minPeers=0)
 
   # Update ticker
   ctx.pool.ticker.init(cb = ctx.tickerUpdater())
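The `suspendAt` field used by `suspendDownload()` above is a threshold that
only ever ratchets upwards: first the beacon header delivered via RPC is
folded in, then the optional second line of the external control file. A
sketch of that merge rule and of the suspend predicate, with plain `uint64`
standing in for `BlockNumber` (`mergeSuspendAt` and `suspended` are
illustrative names, not part of the patch):

  proc mergeSuspendAt(suspendAt: var uint64; beacon, fileHint: uint64) =
    ## Fold both suspend sources into the threshold, never lowering it.
    if suspendAt < beacon:   suspendAt = beacon
    if suspendAt < fileHint: suspendAt = fileHint

  proc suspended(suspendAt, topAccepted: uint64): bool =
    ## An unset (zero) threshold never suspends; otherwise suspend as soon
    ## as the highest imported block has reached the threshold.
    0'u64 < suspendAt and suspendAt <= topAccepted

  when isMainModule:
    var at = 0'u64
    mergeSuspendAt(at, beacon = 1_000_000, fileHint = 0)
    assert not suspended(at, topAccepted = 999_999)
    assert suspended(at, topAccepted = 1_000_000)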
@@ -211,27 +257,32 @@ proc fullSyncPool(buddy: SnapBuddyRef, last: bool; laps: int): bool =
 
   # There is a soft re-setup after switch over to full sync mode if a pivot
   # block header is available initialised from outside, i.e. snap sync switch.
   if ctx.pool.fullHeader.isSome:
-    let stateHeader = ctx.pool.fullHeader.unsafeGet
+    let
+      stateHeader = ctx.pool.fullHeader.unsafeGet
+      initFullSync = ctx.pool.pass.startNumber.isNone
 
-    # Reinialise block queue descriptor relative to current pivot
+    # Re-assign start number for logging (instead of genesis)
     ctx.pool.pass.startNumber = some(stateHeader.blockNumber)
-    ctx.pool.pass.bCtx = BlockQueueCtxRef.init(stateHeader.blockNumber + 1)
+
+    if initFullSync:
+      # Reinitialise block queue descriptor relative to current pivot
+      ctx.pool.pass.bCtx = BlockQueueCtxRef.init(stateHeader.blockNumber + 1)
+
+      # Store pivot as parent hash in database
+      ctx.pool.snapDb.kvDb.persistentBlockHeaderPut stateHeader
+
+      # Instead of genesis.
+      ctx.chain.com.startOfHistory = stateHeader.blockHash
+
+      when dumpDatabaseOnRollOver:         # <--- will go away (debugging only)
+        # Dump database ...                # <--- will go away (debugging only)
+        let nRecords =                     # <--- will go away (debugging only)
+          ctx.pool.snapDb.rockDb.dumpAllDb # <--- will go away (debugging only)
+        trace logTxt "dumped block chain database", nRecords
 
     # Kick off ticker (was stopped by snap `release()` method)
     ctx.pool.ticker.start()
 
-    # Store pivot as parent hash in database
-    ctx.pool.snapDb.kvDb.persistentBlockHeaderPut stateHeader
-
-    # Instead of genesis.
-    ctx.chain.com.startOfHistory = stateHeader.blockHash
-
-    when dumpDatabaseOnRollOver:           # <--- will go away (debugging only)
-      # Dump database ...                  # <--- will go away (debugging only)
-      let nRecords =                       # <--- will go away (debugging only)
-        ctx.pool.snapDb.rockDb.dumpAllDb   # <--- will go away (debugging only)
-      trace logTxt "dumped block chain database", nRecords
-
     # Reset so that this action would not be triggered, again
     ctx.pool.fullHeader = none(BlockHeader)
@@ -283,6 +334,11 @@ proc fullSyncMulti(buddy: SnapBuddyRef): Future[void] {.async.} =
     ctx = buddy.ctx
     bq = buddy.only.pass.queue
 
+  if buddy.suspendDownload:
+    # Sleep for a while, then leave
+    await sleepAsync(10.seconds)
+    return
+
   # Fetch work item
   let rc = await bq.blockQueueWorker()
   if rc.isErr:
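When `suspendDownload()` fires, the worker above does not stop; it naps for
ten seconds and returns, and the scheduler invokes it again. Over successive
invocations this amounts to the polling loop sketched below (assumes
`chronos`; the `isSuspended` callback is an illustrative stand-in for the
threshold check):

  import chronos

  proc pollWhileSuspended(isSuspended: proc(): bool {.gcsafe.}) {.async.} =
    # Net effect of repeated `fullSyncMulti()` calls while suspended:
    # sleep, re-check the threshold, eventually fall through and resume.
    while isSuspended():
      await sleepAsync(10.seconds)
    # ... resume fetching and staging block queue work items here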
diff --git a/nimbus/sync/snap/worker/pass/pass_init.nim b/nimbus/sync/snap/worker/pass/pass_init.nim
index 2ac4e95a6..633021e34 100644
--- a/nimbus/sync/snap/worker/pass/pass_init.nim
+++ b/nimbus/sync/snap/worker/pass/pass_init.nim
@@ -21,6 +21,17 @@ import
 logScope:
   topics = "snap-init"
 
+# ------------------------------------------------------------------------------
+# Private helpers
+# ------------------------------------------------------------------------------
+
+proc updateBeaconHeaderCB(ctx: SnapCtxRef): SyncReqNewHeadCB =
+  ## Update beacon header. This function is intended as a callback function
+  ## for the RPC module.
+  result = proc(h: BlockHeader) {.gcsafe, raises: [].} =
+    if ctx.pool.beaconHeader.blockNumber < h.blockNumber:
+      ctx.pool.beaconHeader = h
+
 # ------------------------------------------------------------------------------
 # Private functions
 # ------------------------------------------------------------------------------
@@ -36,6 +47,16 @@ proc releasePass(ctx: SnapCtxRef) =
 
 # --------------
 
+proc enableRpcMagic(ctx: SnapCtxRef) =
+  ## Helper for `setup()`: Enable external pivot update via RPC
+  ctx.chain.com.syncReqNewHead = ctx.updateBeaconHeaderCB
+
+proc disableRpcMagic(ctx: SnapCtxRef) =
+  ## Helper for `release()`
+  ctx.chain.com.syncReqNewHead = nil
+
+# --------------
+
 proc setupTicker(ctx: SnapCtxRef) =
   let blindTicker: TickerSnapStatsUpdater = proc: TickerSnapStats =
     discard
@@ -64,6 +85,7 @@ proc passInitSetup*(ctx: SnapCtxRef) =
   ctx.setupPass()        # Set up sync sub-mode specs.
   ctx.setupSnapDb()      # Set database backend, subject to change
   ctx.setupTicker()      # Start log/status ticker (if any)
+  ctx.enableRpcMagic()   # Allow external pivot update via RPC
 
   # Experimental, also used for debugging
   if ctx.exCtrlFile.isSome:
@@ -72,6 +94,7 @@ proc passInitSetup*(ctx: SnapCtxRef) =
 
 proc passInitRelease*(ctx: SnapCtxRef) =
   ## Global clean up
+  ctx.disableRpcMagic()  # Disable external pivot update via RPC
   ctx.releaseTicker()    # Stop log/status ticker (if any)
   ctx.releasePass()      # Shut down sync methods
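The callback manufactured by `updateBeaconHeaderCB()` above captures the
long-lived sync context and only ever moves the stored header forward, so
stale or out-of-order head notifications from the RPC side are simply
ignored. A self-contained sketch of the pattern, with local `Header`/`Pool`
types as illustrative stand-ins for `BlockHeader` and the shared pool state:

  type
    Header = object
      blockNumber: uint64            # stand-in for eth `BlockHeader`
    Pool = ref object
      beaconHeader: Header

  proc newHeadCB(pool: Pool): proc(h: Header) {.gcsafe.} =
    result = proc(h: Header) =
      if pool.beaconHeader.blockNumber < h.blockNumber:
        pool.beaconHeader = h        # monotonic update only

  when isMainModule:
    let pool = Pool()
    let cb = pool.newHeadCB
    cb(Header(blockNumber: 7))
    cb(Header(blockNumber: 5))       # stale notification, ignored
    assert pool.beaconHeader.blockNumber == 7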
diff --git a/nimbus/sync/snap/worker/pass/pass_snap.nim b/nimbus/sync/snap/worker/pass/pass_snap.nim
index 3b11a712a..3b267deae 100644
--- a/nimbus/sync/snap/worker/pass/pass_snap.nim
+++ b/nimbus/sync/snap/worker/pass/pass_snap.nim
@@ -63,16 +63,6 @@ proc enableWireServices(ctx: SnapCtxRef) =
 
 # --------------
 
-proc enableRpcMagic(ctx: SnapCtxRef) =
-  ## Helper for `setup()`: Enable external pivot update via RPC
-  ctx.chain.com.syncReqNewHead = ctx.pivotUpdateBeaconHeaderCB
-
-proc disableRpcMagic(ctx: SnapCtxRef) =
-  ## Helper for `release()`
-  ctx.chain.com.syncReqNewHead = nil
-
-# --------------
-
 proc detectSnapSyncRecovery(ctx: SnapCtxRef) =
   ## Helper for `setup()`: Initiate snap sync recovery (if any)
   let rc = ctx.pool.snapDb.pivotRecoverDB()
@@ -171,12 +161,10 @@ proc snapSyncSetup(ctx: SnapCtxRef) =
   ctx.pool.pass.coveredAccounts = NodeTagRangeSet.init()
   ctx.pool.ticker.init(cb = ctx.pool.pass.pivotTable.tickerStats(ctx))
 
-  ctx.enableRpcMagic()          # Allow external pivot update via RPC
   ctx.disableWireServices()     # Stop unwanted public services
   ctx.detectSnapSyncRecovery()  # Check for recovery mode
 
 proc snapSyncRelease(ctx: SnapCtxRef) =
-  ctx.disableRpcMagic()         # Disable external pivot update via RPC
   ctx.enableWireServices()      # re-enable public services
   ctx.pool.ticker.stop()

diff --git a/nimbus/sync/snap/worker/pass/pass_snap/helper/beacon_header.nim b/nimbus/sync/snap/worker/pass/pass_snap/helper/beacon_header.nim
index eec4f93ec..516c570e4 100644
--- a/nimbus/sync/snap/worker/pass/pass_snap/helper/beacon_header.nim
+++ b/nimbus/sync/snap/worker/pass/pass_snap/helper/beacon_header.nim
@@ -35,14 +35,13 @@ proc beaconHeaderUpdatebuBlockNumber*(
   ## This function is typically used for testing and debugging.
   let
     ctx = buddy.ctx
-    snap = ctx.pool.pass
     peer = buddy.peer
 
   trace "fetch beacon header", peer, num
-  if snap.beaconHeader.blockNumber < num:
+  if ctx.pool.beaconHeader.blockNumber < num:
     let rc = await buddy.getBlockHeader(num)
     if rc.isOk:
-      snap.beaconHeader = rc.value
+      ctx.pool.beaconHeader = rc.value
 
 
 proc beaconHeaderUpdateFromFile*(
@@ -62,7 +61,6 @@ proc beaconHeaderUpdateFromFile*(
       return
     rc.value
 
-    snap = ctx.pool.pass
     peer = buddy.peer
 
   var
@@ -74,20 +72,20 @@ proc beaconHeaderUpdateFromFile*(
     if isHash:
       let hash = hashOrNum.hash
       trace "External beacon info", peer, hash
-      if hash != snap.beaconHeader.hash:
+      if hash != ctx.pool.beaconHeader.hash:
         rc = await buddy.getBlockHeader(hash)
     else:
       let num = hashOrNum.number
       trace "External beacon info", peer, num
-      if snap.beaconHeader.blockNumber < num:
+      if ctx.pool.beaconHeader.blockNumber < num:
         rc = await buddy.getBlockHeader(num)
   except CatchableError as e:
     trace "Exception while parsing beacon info", peer, isHash,
       name=($e.name), msg=(e.msg)
 
   if rc.isOk:
-    if snap.beaconHeader.blockNumber < rc.value.blockNumber:
-      snap.beaconHeader = rc.value
+    if ctx.pool.beaconHeader.blockNumber < rc.value.blockNumber:
+      ctx.pool.beaconHeader = rc.value
 
 # ------------------------------------------------------------------------------
 # End
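Both update paths above hinge on one dispatch rule, shared with
`syncCtrlHashOrBlockNumFromFile()`: an entry of exactly 66 characters
(`0x` plus 64 hex digits) denotes a block hash, anything else a decimal
block number. A sketch with `string`/`uint64` as stand-ins for
`Hash256`/`BlockNumber` (`BeaconHint` and `parseBeaconHint` are illustrative
names):

  import std/strutils

  type BeaconHint = object
    isHash: bool
    hash: string                     # stand-in for `Hash256`
    number: uint64                   # stand-in for `BlockNumber`

  proc parseBeaconHint(data: string): BeaconHint =
    # The sync control code dispatches on the 66 character length alone.
    if data.len == 66:
      BeaconHint(isHash: true, hash: data)
    else:
      BeaconHint(isHash: false, number: parseBiggestUInt(data).uint64)

  when isMainModule:
    assert parseBeaconHint("600000").number == 600000'u64
    assert parseBeaconHint("0x" & '7'.repeat(64)).isHash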
diff --git a/nimbus/sync/snap/worker/pass/pass_snap/pivot.nim b/nimbus/sync/snap/worker/pass/pass_snap/pivot.nim
index dcda0427b..45acfe5b8 100644
--- a/nimbus/sync/snap/worker/pass/pass_snap/pivot.nim
+++ b/nimbus/sync/snap/worker/pass/pass_snap/pivot.nim
@@ -188,8 +188,8 @@ proc tickerStats*(
       procChunks = env.fetchAccounts.processed.chunks
       stoQuLen = some(env.storageQueueTotal())
       ctraQuLen = some(env.fetchContracts.len)
-    if 0 < ctx.pool.pass.beaconHeader.blockNumber:
-      beaconBlock = some(ctx.pool.pass.beaconHeader.blockNumber)
+    if 0 < ctx.pool.beaconHeader.blockNumber:
+      beaconBlock = some(ctx.pool.beaconHeader.blockNumber)
 
     TickerSnapStats(
       beaconBlock: beaconBlock,
@@ -421,7 +421,7 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} =
   ## it will not proceed to the next scheduler task.
   let
     ctx = buddy.ctx
-    beaconHeader = ctx.pool.pass.beaconHeader
+    beaconHeader = ctx.pool.beaconHeader
   var
     pivotHeader: BlockHeader
 
@@ -454,16 +454,6 @@ proc pivotApprovePeer*(buddy: SnapBuddyRef) {.async.} =
   if pivotHeader.blockNumber == 0:
     buddy.ctrl.stopped = true
 
-
-proc pivotUpdateBeaconHeaderCB*(ctx: SnapCtxRef): SyncReqNewHeadCB =
-  ## Update beacon header. This function is intended as a call back function
-  ## for the RPC module.
-  result = proc(h: BlockHeader) {.gcsafe.} =
-    if ctx.pool.pass.beaconHeader.blockNumber < h.blockNumber:
-      # when extraTraceMessages:
-      #   trace logTxt "external beacon info update", header=h.blockNumber.toStr
-      ctx.pool.pass.beaconHeader = h
-
 # ------------------------------------------------------------------------------
 # Public function, debugging
 # ------------------------------------------------------------------------------

diff --git a/nimbus/sync/snap/worker/pass/pass_snap/snap_pass_desc.nim b/nimbus/sync/snap/worker/pass/pass_snap/snap_pass_desc.nim
index 33b9c77a1..6fd945de1 100644
--- a/nimbus/sync/snap/worker/pass/pass_snap/snap_pass_desc.nim
+++ b/nimbus/sync/snap/worker/pass/pass_snap/snap_pass_desc.nim
@@ -92,7 +92,6 @@ type
     ## Global context extension, snap sync parameters, pivot table
     pivotTable*: PivotTable           ## Per state root environment
     completedPivot*: SnapPivotRef     ## Start full sync from here
-    beaconHeader*: BlockHeader        ## Running on beacon chain
     coveredAccounts*: NodeTagRangeSet ## Derived from all available accounts
     covAccTimesFull*: uint            ## # of 100% coverages
     recovery*: RecoveryRef            ## Current recovery checkpoint/context

diff --git a/nimbus/sync/snap/worker_desc.nim b/nimbus/sync/snap/worker_desc.nim
index bdfd61150..752361a1f 100644
--- a/nimbus/sync/snap/worker_desc.nim
+++ b/nimbus/sync/snap/worker_desc.nim
@@ -26,6 +26,7 @@ type
     ## Peer-worker local descriptor data extension
     errors*: GetErrorStatsRef    ## For error handling
     full*: RootRef               ## Peer local full sync descriptor
+    # snap*: RootRef             ## Peer local snap sync descriptor
 
   SnapSyncPassType* = enum
     ## Current sync mode, after a snapshot has been downloaded, the system
@@ -47,6 +48,7 @@ type
     snapDb*: SnapDbRef           ## Accounts snapshot DB
 
     # Info
+    beaconHeader*: BlockHeader   ## Running on beacon chain
     enableTicker*: bool          ## Advisory, extra level of gossip
     ticker*: TickerRef           ## Ticker, logger descriptor
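Relocating `beaconHeader` from the snap-pass descriptor into the shared
per-sync context (`ctx.pool`) is what lets the RPC callback survive the
snap-to-full switch-over: the pass descriptor is replaced when passes
switch, while the pool outlives both passes. A sketch of the lifetime
hazard being avoided (all types here are illustrative):

  type
    PassState = ref object           # replaced on every pass switch
      name: string
    Pool = ref object                # lives as long as the sync context
      pass: PassState
      beaconNumber: uint64           # pass independent, like `beaconHeader`

  proc switchToFull(pool: Pool) =
    pool.pass = PassState(name: "full")  # old pass-local state is dropped,
    # but `beaconNumber`, and any callback reading it, remain valid

  when isMainModule:
    let pool = Pool(pass: PassState(name: "snap"), beaconNumber: 42)
    pool.switchToFull()
    assert pool.beaconNumber == 42'u64   # retained across the switch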
diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim
index 52daee6fc..39909a51c 100644
--- a/nimbus/sync/sync_sched.nim
+++ b/nimbus/sync/sync_sched.nim
@@ -88,13 +88,12 @@
 ## * stew/[interval_set, sorted_set],
 ## * "."/[sync_desc, sync_sched, protocol]
 ##
-
 {.push raises: [].}
 
 import
   std/hashes,
   chronos,
-  eth/[p2p, p2p/peer_pool, p2p/private/p2p_types],
+  eth/[keys, p2p, p2p/peer_pool],
   stew/keyed_queue,
   "."/[handlers, sync_desc]
 
@@ -105,7 +104,7 @@ static:
 type
   ActiveBuddies[S,W] = ##\
     ## List of active workers, using `Hash(Peer)` rather than `Peer`
-    KeyedQueue[Hash,RunnerBuddyRef[S,W]]
+    KeyedQueue[ENode,RunnerBuddyRef[S,W]]
 
   RunnerSyncRef*[S,W] = ref object
     ## Module descriptor
@@ -122,8 +121,12 @@ type
     ## Per worker peer descriptor
     dsc: RunnerSyncRef[S,W]      ## Scheduler descriptor
     worker: BuddyRef[S,W]        ## Worker peer data
+    zombified: Moment            ## When it became undead (if any)
 
 const
+  zombieTimeToLinger = 20.seconds
+    ## Maximum time a zombie is kept on the database.
+
   execLoopTimeElapsedMin = 50.milliseconds
     ## Minimum elapsed time an exec loop needs for a single lap. If it is
     ## faster, asynchronous sleep seconds are added in order to avoid
@@ -143,9 +146,18 @@ const
 # Private helpers
 # ------------------------------------------------------------------------------
 
-proc hash(peer: Peer): Hash =
-  ## Needed for `buddies` table key comparison
-  peer.remote.id.hash
+proc hash*(key: ENode): Hash =
+  ## Mixin, needed for `buddies` table key comparison. Needs to be a public
+  ## function technically although it should be seen logically as a private
+  ## one.
+  var h: Hash = 0
+  h = h !& hashes.hash(key.pubkey.toRaw)
+  h = h !& hashes.hash(key.address)
+  !$h
+
+proc key(peer: Peer): ENode =
+  ## Map to key for below table methods.
+  peer.remote.node
 
# ------------------------------------------------------------------------------
# Private functions
# ------------------------------------------------------------------------------
@@ -230,7 +242,7 @@ proc workerLoop[S,W](buddy: RunnerBuddyRef[S,W]) {.async.} =
       else:
         # Rotate connection table so the most used entry is at the top/right
         # end. So zombies will end up leftish.
-        discard dsc.buddies.lruFetch(peer.hash)
+        discard dsc.buddies.lruFetch peer.key
 
       # Multi mode
       if worker.ctrl.multiOk:
@@ -280,9 +292,19 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
     maxWorkers {.used.} = dsc.ctx.buddiesMax
     nPeers     {.used.} = dsc.pool.len
     nWorkers            = dsc.buddies.len
-  if dsc.buddies.hasKey(peer.hash):
-    trace "Reconnecting zombie peer ignored", peer, nPeers, nWorkers, maxWorkers
-    return
+    zombie              = dsc.buddies.eq peer.key
+  if zombie.isOk:
+    let
+      now = Moment.now()
+      ttz = zombie.value.zombified + zombieTimeToLinger
+    if now < ttz:
+      trace "Reconnecting zombie peer ignored", peer,
+        nPeers, nWorkers, maxWorkers, canRequeue=(ttz-now)
+      return
+    # Zombie can be removed from the database
+    dsc.buddies.del peer.key
+    trace "Zombie peer timeout, ready for requeuing", peer,
+      nPeers, nWorkers, maxWorkers
 
   # Initialise worker for this peer
   let buddy = RunnerBuddyRef[S,W](
@@ -313,7 +335,7 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
       leastPeer.worker.runStop()
 
   # Add peer entry
-  discard dsc.buddies.lruAppend(peer.hash, buddy, dsc.ctx.buddiesMax)
+  discard dsc.buddies.lruAppend(peer.key, buddy, dsc.ctx.buddiesMax)
 
   trace "Running peer worker", peer, nPeers,
     nWorkers=dsc.buddies.len, maxWorkers
 
@@ -326,17 +348,22 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) =
     nPeers     = dsc.pool.len
     maxWorkers = dsc.ctx.buddiesMax
     nWorkers   = dsc.buddies.len
-    rc = dsc.buddies.eq(peer.hash)
+    rc = dsc.buddies.eq peer.key
   if rc.isErr:
     debug "Disconnected, unregistered peer", peer, nPeers, nWorkers, maxWorkers
     return
   if rc.value.worker.ctrl.zombie:
     # Don't disconnect, let them fall out of the LRU cache. The effect is
-    # that reconnecting might be blocked, for a while.
+    # that reconnecting might be blocked, for a while. For cases with only
+    # few peers, the start of zombification is registered so that a zombie
+    # can eventually be let die and buried.
+    rc.value.worker = nil
+    rc.value.dsc = nil
+    rc.value.zombified = Moment.now()
     trace "Disconnected, zombie", peer, nPeers, nWorkers, maxWorkers
   else:
     rc.value.worker.ctrl.stopped = true # in case it is hanging somewhere
-    dsc.buddies.del(peer.hash)
+    dsc.buddies.del peer.key
     trace "Disconnected buddy", peer, nPeers,
       nWorkers=dsc.buddies.len, maxWorkers
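Taken together, the scheduler changes implement a small zombie lifecycle: a
misbehaving peer is parked under its `ENode` key with a timestamp, reconnect
attempts are refused while the entry lingers, and once `zombieTimeToLinger`
has elapsed the entry is flushed so the peer may connect again. A condensed
sketch of that policy (assumes `chronos/timer`; the table layout and the
`mayReconnect` helper are illustrative):

  import std/tables, chronos/timer

  const zombieTimeToLinger = 20.seconds

  type Zombies = Table[string, Moment]   # peer key -> time of zombification

  proc mayReconnect(z: var Zombies; key: string; now: Moment): bool =
    if key in z:
      if now < z[key] + zombieTimeToLinger:
        return false                     # still banned, ignore reconnect
      z.del(key)                         # linger time expired, flush entry
    true

  when isMainModule:
    var z: Zombies
    let t0 = Moment.now()
    z["peer-1"] = t0
    assert not z.mayReconnect("peer-1", t0 + 5.seconds)
    assert z.mayReconnect("peer-1", t0 + 25.seconds)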