From 4c865ec88456551281f15da66f3be91400650409 Mon Sep 17 00:00:00 2001 From: Jordan Hrycaj Date: Tue, 16 May 2023 14:52:44 +0100 Subject: [PATCH] Snap sync update pivot updating via rpc (#1583) * Unit tests update, code cosmetics * Fix segfault with zombie handling why: In order to save memory, the data records of zombie entries are removed and only the key (aka peer node) is kept. Consequently, logging these zombies can only be done by the key. * Allow to accept V2 payload without `shanghaiTime` set while syncing why: Currently, `shanghaiTime` is missing (alt least) while snap syncing. So beacon node headers can be processed regardless. Normal (aka strict) processing will be automatically restored when leaving snap sync mode. --- nimbus/common/common.nim | 23 ++++++++++++++ nimbus/rpc/engine_api.nim | 12 +++++-- nimbus/sync/handlers/snap.nim | 2 +- nimbus/sync/misc/ticker.nim | 1 + nimbus/sync/snap/worker/pass/pass_init.nim | 1 + nimbus/sync/sync_sched.nim | 37 ++++++++++++++-------- tests/test_sync_snap.nim | 4 +-- tests/test_sync_snap/snap_syncdb_xx.nim | 16 +++++++--- 8 files changed, 72 insertions(+), 24 deletions(-) diff --git a/nimbus/common/common.nim b/nimbus/common/common.nim index 9fbb8c91b..595af1348 100644 --- a/nimbus/common/common.nim +++ b/nimbus/common/common.nim @@ -71,6 +71,13 @@ type consensusType: ConsensusType syncReqNewHead: SyncReqNewHeadCB + ## Call back function for the sync processor. This function stages + ## the arguent header to a private aerea for subsequent processing. + + syncReqRelaxV2: bool + ## Allow processing of certain RPC/V2 messages type while syncing (i.e. + ## `syncReqNewHead` is set.) although `shanghaiTime` is unavailable + ## or has not reached, yet. startOfHistory: Hash256 ## This setting is needed for resuming blockwise syncying after @@ -450,6 +457,9 @@ func syncCurrent*(com: CommonRef): BlockNumber = func syncHighest*(com: CommonRef): BlockNumber = com.syncProgress.highest +func syncReqRelaxV2*(com: CommonRef): bool = + com.syncReqRelaxV2 + # ------------------------------------------------------------------------------ # Setters # ------------------------------------------------------------------------------ @@ -480,7 +490,20 @@ proc setFork*(com: CommonRef, fork: HardFork): Hardfork = com.consensusTransition(fork) proc `syncReqNewHead=`*(com: CommonRef; cb: SyncReqNewHeadCB) = + ## Activate or reset a call back handler for syncing. When resetting (by + ## passing `cb` as `nil`), the `syncReqRelaxV2` value is also reset. com.syncReqNewHead = cb + if cb.isNil: + com.syncReqRelaxV2 = false + +proc `syncReqRelaxV2=`*(com: CommonRef; val: bool) = + ## Allow processing of certain RPC/V2 messages type while syncing (i.e. + ## `syncReqNewHead` is set.) although `shanghaiTime` is unavailable + ## or has not reached, yet. + ## + ## This setter is effective only while `syncReqNewHead` is activated. + if not com.syncReqNewHead.isNil: + com.syncReqRelaxV2 = val # ------------------------------------------------------------------------------ # End diff --git a/nimbus/rpc/engine_api.nim b/nimbus/rpc/engine_api.nim index 0df0b40e1..7782b17df 100644 --- a/nimbus/rpc/engine_api.nim +++ b/nimbus/rpc/engine_api.nim @@ -98,10 +98,16 @@ proc handle_newPayload(sealingEngine: SealingEngineRef, api: EngineApiRef, com: if com.isShanghaiOrLater(fromUnix(payload.timestamp.unsafeQuantityToInt64)): when not(payload is ExecutionPayloadV2): - raise invalidParams("if timestamp is Shanghai or later, payload must be ExecutionPayloadV2") + raise invalidParams("if timestamp is Shanghai or later, " & + "payload must be ExecutionPayloadV2") else: when not(payload is ExecutionPayloadV1): - raise invalidParams("if timestamp is earlier than Shanghai, payload must be ExecutionPayloadV1") + if com.syncReqRelaxV2: + trace "Relaxed mode, treating payload as V1" + discard + else: + raise invalidParams("if timestamp is earlier than Shanghai, " & + "payload must be ExecutionPayloadV1") var header = toBlockHeader(payload) let blockHash = payload.blockHash.asEthHash @@ -489,7 +495,7 @@ proc setupEngineAPI*( # cannot use `params` as param name. see https:#github.com/status-im/nim-json-rpc/issues/128 server.rpc("engine_newPayloadV1") do(payload: ExecutionPayloadV1) -> PayloadStatusV1: return handle_newPayload(sealingEngine, api, com, maybeAsyncDataSource, payload) - + server.rpc("engine_newPayloadV2") do(payload: ExecutionPayloadV1OrV2) -> PayloadStatusV1: let p = payload.toExecutionPayloadV1OrExecutionPayloadV2 if p.isOk: diff --git a/nimbus/sync/handlers/snap.nim b/nimbus/sync/handlers/snap.nim index e4a19bb90..498c6ff09 100644 --- a/nimbus/sync/handlers/snap.nim +++ b/nimbus/sync/handlers/snap.nim @@ -11,7 +11,7 @@ {.push raises: [].} import - std/[sequtils, strutils], + std/sequtils, chronicles, chronos, eth/[common, p2p, trie/nibbles], diff --git a/nimbus/sync/misc/ticker.nim b/nimbus/sync/misc/ticker.nim index bf4b94827..f378c30e1 100644 --- a/nimbus/sync/misc/ticker.nim +++ b/nimbus/sync/misc/ticker.nim @@ -305,6 +305,7 @@ proc stopBuddy*(t: TickerRef) = if not t.isNil: t.nBuddies.dec if t.nBuddies <= 0 and not t.fullMode and not t.snap.recovery: + t.nBuddies = 0 t.stopImpl() when extraTraceMessages: debug logTxt "stop (buddy)", fullMode=t.fullMode, nBuddies=t.nBuddies diff --git a/nimbus/sync/snap/worker/pass/pass_init.nim b/nimbus/sync/snap/worker/pass/pass_init.nim index 633021e34..05d7c68b2 100644 --- a/nimbus/sync/snap/worker/pass/pass_init.nim +++ b/nimbus/sync/snap/worker/pass/pass_init.nim @@ -50,6 +50,7 @@ proc releasePass(ctx: SnapCtxRef) = proc enableRpcMagic(ctx: SnapCtxRef) = ## Helper for `setup()`: Enable external pivot update via RPC ctx.chain.com.syncReqNewHead = ctx.updateBeaconHeaderCB + ctx.chain.com.syncReqRelaxV2 = true proc disableRpcMagic(ctx: SnapCtxRef) = ## Helper for `release()` diff --git a/nimbus/sync/sync_sched.nim b/nimbus/sync/sync_sched.nim index 39909a51c..361610523 100644 --- a/nimbus/sync/sync_sched.nim +++ b/nimbus/sync/sync_sched.nim @@ -291,7 +291,6 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = let maxWorkers {.used.} = dsc.ctx.buddiesMax nPeers {.used.} = dsc.pool.len - nWorkers = dsc.buddies.len zombie = dsc.buddies.eq peer.key if zombie.isOk: let @@ -299,12 +298,12 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = ttz = zombie.value.zombified + zombieTimeToLinger if ttz < Moment.now(): trace "Reconnecting zombie peer ignored", peer, - nPeers, nWorkers, maxWorkers, canRequeue=(now-ttz) + nPeers, nWorkers=dsc.buddies.len, maxWorkers, canRequeue=(now-ttz) return # Zombie can be removed from the database dsc.buddies.del peer.key trace "Zombie peer timeout, ready for requeing", peer, - nPeers, nWorkers, maxWorkers + nPeers, nWorkers=dsc.buddies.len, maxWorkers # Initialise worker for this peer let buddy = RunnerBuddyRef[S,W]( @@ -314,25 +313,31 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) = ctrl: BuddyCtrlRef(), peer: peer)) if not buddy.worker.runStart(): - trace "Ignoring useless peer", peer, nPeers, nWorkers, maxWorkers + trace "Ignoring useless peer", peer, nPeers, + nWorkers=dsc.buddies.len, maxWorkers buddy.worker.ctrl.zombie = true return # Check for table overflow. An overflow might happen if there are zombies # in the table (though preventing them from re-connecting for a while.) - if dsc.ctx.buddiesMax <= nWorkers: - let leastPeer = dsc.buddies.shift.value.data - if leastPeer.worker.ctrl.zombie: + if dsc.ctx.buddiesMax <= dsc.buddies.len: + let + leastVal = dsc.buddies.shift.value # unqueue first/least item + oldest = leastVal.data.worker + if oldest.isNil: trace "Dequeuing zombie peer", - oldest=leastPeer.worker, nPeers, nWorkers=dsc.buddies.len, maxWorkers + # Fake `Peer` pretty print for `oldest` + oldest=("Node[" & $leastVal.key.address & "]"), + since=leastVal.data.zombified, nPeers, nWorkers=dsc.buddies.len, + maxWorkers discard else: # This could happen if there are idle entries in the table, i.e. # somehow hanging runners. - trace "Peer table full! Dequeuing least used entry", - oldest=leastPeer.worker, nPeers, nWorkers=dsc.buddies.len, maxWorkers - leastPeer.worker.ctrl.zombie = true - leastPeer.worker.runStop() + trace "Peer table full! Dequeuing least used entry", oldest, + nPeers, nWorkers=dsc.buddies.len, maxWorkers + oldest.ctrl.zombie = true + oldest.runStop() # Add peer entry discard dsc.buddies.lruAppend(peer.key, buddy, dsc.ctx.buddiesMax) @@ -351,8 +356,12 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) = rc = dsc.buddies.eq peer.key if rc.isErr: debug "Disconnected, unregistered peer", peer, nPeers, nWorkers, maxWorkers - return - if rc.value.worker.ctrl.zombie: + discard + elif rc.value.worker.isNil: + # Re-visiting zombie + trace "Ignore zombie", peer, nPeers, nWorkers, maxWorkers + discard + elif rc.value.worker.ctrl.zombie: # Don't disconnect, leave them fall out of the LRU cache. The effect is, # that reconnecting might be blocked, for a while. For few peers cases, # the start of zombification is registered so that a zombie can eventually diff --git a/tests/test_sync_snap.nim b/tests/test_sync_snap.nim index 4f11f4d71..9c8bccbd8 100644 --- a/tests/test_sync_snap.nim +++ b/tests/test_sync_snap.nim @@ -42,7 +42,7 @@ const accSample = snapTest0 storSample = snapTest4 - # # Number of database slots available + # Number of database slots available nTestDbInstances = 9 type @@ -473,7 +473,7 @@ when isMainModule: when true: # and false: noisy.miscRunner() - # Test database dnapdhot handling. The test samples ate too big for + # Test database snapshot handling. The test samples ate too big for # `nimbus-eth1` so they are available on `nimbus-eth1-blobs.` when true: # or false import ./test_sync_snap/snap_syncdb_xx diff --git a/tests/test_sync_snap/snap_syncdb_xx.nim b/tests/test_sync_snap/snap_syncdb_xx.nim index 35a58be96..b85f95d20 100644 --- a/tests/test_sync_snap/snap_syncdb_xx.nim +++ b/tests/test_sync_snap/snap_syncdb_xx.nim @@ -15,6 +15,14 @@ import const snapSyncdb0* = SnapSyncSpecs( + name: "main-snap", + network: MainNet, + snapDump: "mainnet=64.txt.gz", + tailBlocks: "mainnet332160.txt.gz", + pivotBlock: 64u64, + nItems: 100) + + snapSyncdb1* = SnapSyncSpecs( name: "main-snap", network: MainNet, snapDump: "mainnet=128.txt.gz", @@ -22,7 +30,7 @@ const pivotBlock: 128u64, nItems: 500) - snapSyncdb1* = SnapSyncSpecs( + snapSyncdb2* = SnapSyncSpecs( name: "main-snap", network: MainNet, snapDump: "mainnet=500.txt.gz", @@ -30,7 +38,7 @@ const pivotBlock: 500u64, nItems: 500) - snapSyncdb2* = SnapSyncSpecs( + snapSyncdb3* = SnapSyncSpecs( name: "main-snap", network: MainNet, snapDump: "mainnet=1000.txt.gz", @@ -38,7 +46,7 @@ const pivotBlock: 1000u64, nItems: 500) - snapSyncdb3* = SnapSyncSpecs( + snapSyncdb4* = SnapSyncSpecs( name: "main-snap", network: MainNet, snapDump: "mainnet=300000.txt.gz", @@ -47,6 +55,6 @@ const nItems: 500) snapSyncdbList* = [ - snapSyncdb0, snapSyncdb1, snapSyncdb2, snapSyncdb3] + snapSyncdb0, snapSyncdb1, snapSyncdb2, snapSyncdb3, snapSyncdb4] # End