Snap sync update pivot updating via rpc (#1583)

* Unit test updates, code cosmetics

* Fix segfault with zombie handling

why:
  In order to save memory, the data records of zombie entries are removed
  and only the key (aka peer node) is kept. Consequently, logging these
  zombies can only be done by the key.

* Allow accepting V2 payloads without `shanghaiTime` set while syncing

why:
  Currently, `shanghaiTime` is missing (at least) while snap syncing, so this
  relaxation lets beacon node headers be processed regardless. Normal (aka
  strict) processing is automatically restored when leaving snap sync mode.
Jordan Hrycaj 2023-05-16 14:52:44 +01:00 committed by GitHub
parent 4e332868b7
commit 4c865ec884
8 changed files with 72 additions and 24 deletions


@@ -71,6 +71,13 @@ type
consensusType: ConsensusType
syncReqNewHead: SyncReqNewHeadCB
## Callback function for the sync processor. This function stages
## the argument header to a private area for subsequent processing.
syncReqRelaxV2: bool
## Allow processing of certain RPC/V2 message types while syncing (i.e.
## `syncReqNewHead` is set) although `shanghaiTime` is unavailable
## or has not been reached yet.
startOfHistory: Hash256
## This setting is needed for resuming blockwise syncing after
@@ -450,6 +457,9 @@ func syncCurrent*(com: CommonRef): BlockNumber =
func syncHighest*(com: CommonRef): BlockNumber =
com.syncProgress.highest
func syncReqRelaxV2*(com: CommonRef): bool =
com.syncReqRelaxV2
# ------------------------------------------------------------------------------
# Setters
# ------------------------------------------------------------------------------
@@ -480,7 +490,20 @@ proc setFork*(com: CommonRef, fork: HardFork): Hardfork =
com.consensusTransition(fork)
proc `syncReqNewHead=`*(com: CommonRef; cb: SyncReqNewHeadCB) =
## Activate or reset a callback handler for syncing. When resetting (by
## passing `cb` as `nil`), the `syncReqRelaxV2` value is also reset.
com.syncReqNewHead = cb
if cb.isNil:
com.syncReqRelaxV2 = false
proc `syncReqRelaxV2=`*(com: CommonRef; val: bool) =
## Allow processing of certain RPC/V2 message types while syncing (i.e.
## `syncReqNewHead` is set) although `shanghaiTime` is unavailable
## or has not been reached yet.
##
## This setter is effective only while `syncReqNewHead` is activated.
if not com.syncReqNewHead.isNil:
com.syncReqRelaxV2 = val
# ------------------------------------------------------------------------------
# End
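
A minimal, runnable Nim sketch of the setter contract documented above: the relaxation flag can only be switched on while a sync callback is installed, and resetting the callback clears it again. The backing field names (`reqNewHeadCB`, `relaxV2`) and the callback signature are illustrative stand-ins, not nimbus-eth1's actual definitions.

type
  SyncReqNewHeadCB = proc(blockNumber: uint64)   # stand-in signature
  CommonRef = ref object
    reqNewHeadCB: SyncReqNewHeadCB               # illustrative backing fields,
    relaxV2: bool                                # not the real `CommonRef`

func syncReqRelaxV2(com: CommonRef): bool =
  com.relaxV2

proc `syncReqNewHead=`(com: CommonRef; cb: SyncReqNewHeadCB) =
  # Installing a callback enters "sync mode"; resetting it (nil) also
  # clears the V2 relaxation flag.
  com.reqNewHeadCB = cb
  if cb.isNil:
    com.relaxV2 = false

proc `syncReqRelaxV2=`(com: CommonRef; val: bool) =
  # Only effective while a sync callback is installed.
  if not com.reqNewHeadCB.isNil:
    com.relaxV2 = val

when isMainModule:
  let com = CommonRef()
  com.syncReqRelaxV2 = true              # ignored, no callback installed yet
  doAssert not com.syncReqRelaxV2
  com.syncReqNewHead = proc(n: uint64) = discard
  com.syncReqRelaxV2 = true              # effective while syncing
  doAssert com.syncReqRelaxV2
  com.syncReqNewHead = nil               # leaving sync mode clears the flag
  doAssert not com.syncReqRelaxV2

Resetting the callback is thus the single switch that ends sync mode, so the relaxed V2 handling cannot outlive it.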


@@ -98,10 +98,16 @@ proc handle_newPayload(sealingEngine: SealingEngineRef, api: EngineApiRef, com:
if com.isShanghaiOrLater(fromUnix(payload.timestamp.unsafeQuantityToInt64)):
when not(payload is ExecutionPayloadV2):
raise invalidParams("if timestamp is Shanghai or later, payload must be ExecutionPayloadV2")
raise invalidParams("if timestamp is Shanghai or later, " &
"payload must be ExecutionPayloadV2")
else:
when not(payload is ExecutionPayloadV1):
raise invalidParams("if timestamp is earlier than Shanghai, payload must be ExecutionPayloadV1")
if com.syncReqRelaxV2:
trace "Relaxed mode, treating payload as V1"
discard
else:
raise invalidParams("if timestamp is earlier than Shanghai, " &
"payload must be ExecutionPayloadV1")
var header = toBlockHeader(payload)
let blockHash = payload.blockHash.asEthHash
@@ -489,7 +495,7 @@ proc setupEngineAPI*(
# cannot use `params` as param name. see https://github.com/status-im/nim-json-rpc/issues/128
server.rpc("engine_newPayloadV1") do(payload: ExecutionPayloadV1) -> PayloadStatusV1:
return handle_newPayload(sealingEngine, api, com, maybeAsyncDataSource, payload)
server.rpc("engine_newPayloadV2") do(payload: ExecutionPayloadV1OrV2) -> PayloadStatusV1:
let p = payload.toExecutionPayloadV1OrExecutionPayloadV2
if p.isOk:
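
The relaxed check above boils down to: a payload that is V2 but carries a pre-Shanghai timestamp is normally rejected, yet accepted (and treated like a V1 payload) while `syncReqRelaxV2` is active. A simplified, self-contained Nim sketch of that decision, using a runtime `version` field instead of the handler's static payload types; all names here are illustrative, not the Engine API's.

type
  PayloadVersion = enum
    V1, V2
  SimplePayload = object        # stand-in for the execution payload variants
    version: PayloadVersion
    timestamp: int64

proc versionAcceptable(p: SimplePayload; shanghaiTime: int64;
                       relaxV2: bool): bool =
  # True if the payload's version matches its timestamp, or the relaxed
  # syncing mode tolerates the mismatch.
  if p.timestamp >= shanghaiTime:
    p.version == V2             # Shanghai or later: payload must be V2
  elif p.version == V1:
    true                        # pre-Shanghai V1: as expected
  else:
    relaxV2                     # pre-Shanghai V2: only accepted while relaxed

when isMainModule:
  let earlyV2 = SimplePayload(version: V2, timestamp: 100)
  doAssert not versionAcceptable(earlyV2, shanghaiTime = 1_000, relaxV2 = false)
  doAssert versionAcceptable(earlyV2, shanghaiTime = 1_000, relaxV2 = true)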


@@ -11,7 +11,7 @@
{.push raises: [].}
import
std/[sequtils, strutils],
std/sequtils,
chronicles,
chronos,
eth/[common, p2p, trie/nibbles],


@@ -305,6 +305,7 @@ proc stopBuddy*(t: TickerRef) =
if not t.isNil:
t.nBuddies.dec
if t.nBuddies <= 0 and not t.fullMode and not t.snap.recovery:
t.nBuddies = 0
t.stopImpl()
when extraTraceMessages:
debug logTxt "stop (buddy)", fullMode=t.fullMode, nBuddies=t.nBuddies


@@ -50,6 +50,7 @@ proc releasePass(ctx: SnapCtxRef) =
proc enableRpcMagic(ctx: SnapCtxRef) =
## Helper for `setup()`: Enable external pivot update via RPC
ctx.chain.com.syncReqNewHead = ctx.updateBeaconHeaderCB
ctx.chain.com.syncReqRelaxV2 = true
proc disableRpcMagic(ctx: SnapCtxRef) =
## Helper for `release()`


@@ -291,7 +291,6 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
let
maxWorkers {.used.} = dsc.ctx.buddiesMax
nPeers {.used.} = dsc.pool.len
nWorkers = dsc.buddies.len
zombie = dsc.buddies.eq peer.key
if zombie.isOk:
let
@@ -299,12 +298,12 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
ttz = zombie.value.zombified + zombieTimeToLinger
if ttz < Moment.now():
trace "Reconnecting zombie peer ignored", peer,
nPeers, nWorkers, maxWorkers, canRequeue=(now-ttz)
nPeers, nWorkers=dsc.buddies.len, maxWorkers, canRequeue=(now-ttz)
return
# Zombie can be removed from the database
dsc.buddies.del peer.key
trace "Zombie peer timeout, ready for requeuing", peer,
nPeers, nWorkers, maxWorkers
nPeers, nWorkers=dsc.buddies.len, maxWorkers
# Initialise worker for this peer
let buddy = RunnerBuddyRef[S,W](
@@ -314,25 +313,31 @@ proc onPeerConnected[S,W](dsc: RunnerSyncRef[S,W]; peer: Peer) =
ctrl: BuddyCtrlRef(),
peer: peer))
if not buddy.worker.runStart():
trace "Ignoring useless peer", peer, nPeers, nWorkers, maxWorkers
trace "Ignoring useless peer", peer, nPeers,
nWorkers=dsc.buddies.len, maxWorkers
buddy.worker.ctrl.zombie = true
return
# Check for table overflow. An overflow might happen if there are zombies
# in the table (even though they are prevented from re-connecting for a while.)
if dsc.ctx.buddiesMax <= nWorkers:
let leastPeer = dsc.buddies.shift.value.data
if leastPeer.worker.ctrl.zombie:
if dsc.ctx.buddiesMax <= dsc.buddies.len:
let
leastVal = dsc.buddies.shift.value # unqueue first/least item
oldest = leastVal.data.worker
if oldest.isNil:
trace "Dequeuing zombie peer",
oldest=leastPeer.worker, nPeers, nWorkers=dsc.buddies.len, maxWorkers
# Fake `Peer` pretty print for `oldest`
oldest=("Node[" & $leastVal.key.address & "]"),
since=leastVal.data.zombified, nPeers, nWorkers=dsc.buddies.len,
maxWorkers
discard
else:
# This could happen if there are idle entries in the table, i.e.
# somehow hanging runners.
trace "Peer table full! Dequeuing least used entry",
oldest=leastPeer.worker, nPeers, nWorkers=dsc.buddies.len, maxWorkers
leastPeer.worker.ctrl.zombie = true
leastPeer.worker.runStop()
trace "Peer table full! Dequeuing least used entry", oldest,
nPeers, nWorkers=dsc.buddies.len, maxWorkers
oldest.ctrl.zombie = true
oldest.runStop()
# Add peer entry
discard dsc.buddies.lruAppend(peer.key, buddy, dsc.ctx.buddiesMax)
@@ -351,8 +356,12 @@ proc onPeerDisconnected[S,W](dsc: RunnerSyncRef[S,W], peer: Peer) =
rc = dsc.buddies.eq peer.key
if rc.isErr:
debug "Disconnected, unregistered peer", peer, nPeers, nWorkers, maxWorkers
return
if rc.value.worker.ctrl.zombie:
discard
elif rc.value.worker.isNil:
# Re-visiting zombie
trace "Ignore zombie", peer, nPeers, nWorkers, maxWorkers
discard
elif rc.value.worker.ctrl.zombie:
# Don't disconnect, let them fall out of the LRU cache. The effect is
# that reconnecting might be blocked for a while. For cases with few peers,
# the start of zombification is registered so that a zombie can eventually
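
The zombie bookkeeping above follows the memory-saving scheme from the commit message: once a peer is zombified its data record is dropped, and only the key plus a timestamp remain, so any logging has to go by key. A stdlib-only Nim sketch of that idea; the names (`PeerKey`, `Buddy`, `zombify`) are illustrative stand-ins, not the sync scheduler's actual keyed queue.

import std/[tables, times]

type
  PeerKey = string              # stand-in for the peer node key
  Worker = ref object           # stand-in for the per-peer worker state
    name: string
  Buddy = object
    worker: Worker              # nil once the entry is zombified
    zombified: Time

var buddies = initOrderedTable[PeerKey, Buddy]()

proc zombify(key: PeerKey) =
  # Drop the data record, keep only the key and the time of zombification.
  if key in buddies:
    buddies[key] = Buddy(worker: nil, zombified: getTime())

proc describe(key: PeerKey): string =
  # A zombie has no worker record left, so it can only be reported by key.
  let b = buddies[key]
  if b.worker.isNil:
    "Node[" & key & "] zombified " & $b.zombified
  else:
    b.worker.name

when isMainModule:
  buddies["10.0.0.1:30303"] = Buddy(worker: Worker(name: "snap worker"))
  zombify("10.0.0.1:30303")
  echo describe("10.0.0.1:30303")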


@@ -42,7 +42,7 @@ const
accSample = snapTest0
storSample = snapTest4
# # Number of database slots available
# Number of database slots available
nTestDbInstances = 9
type
@@ -473,7 +473,7 @@ when isMainModule:
when true: # and false:
noisy.miscRunner()
# Test database dnapdhot handling. The test samples ate too big for
# Test database snapshot handling. The test samples are too big for
# `nimbus-eth1`, so they are available on `nimbus-eth1-blobs`.
when true: # or false
import ./test_sync_snap/snap_syncdb_xx


@@ -15,6 +15,14 @@ import
const
snapSyncdb0* = SnapSyncSpecs(
name: "main-snap",
network: MainNet,
snapDump: "mainnet=64.txt.gz",
tailBlocks: "mainnet332160.txt.gz",
pivotBlock: 64u64,
nItems: 100)
snapSyncdb1* = SnapSyncSpecs(
name: "main-snap",
network: MainNet,
snapDump: "mainnet=128.txt.gz",
@@ -22,7 +30,7 @@ const
pivotBlock: 128u64,
nItems: 500)
snapSyncdb1* = SnapSyncSpecs(
snapSyncdb2* = SnapSyncSpecs(
name: "main-snap",
network: MainNet,
snapDump: "mainnet=500.txt.gz",
@@ -30,7 +38,7 @@ const
pivotBlock: 500u64,
nItems: 500)
snapSyncdb2* = SnapSyncSpecs(
snapSyncdb3* = SnapSyncSpecs(
name: "main-snap",
network: MainNet,
snapDump: "mainnet=1000.txt.gz",
@@ -38,7 +46,7 @@ const
pivotBlock: 1000u64,
nItems: 500)
snapSyncdb3* = SnapSyncSpecs(
snapSyncdb4* = SnapSyncSpecs(
name: "main-snap",
network: MainNet,
snapDump: "mainnet=300000.txt.gz",
@@ -47,6 +55,6 @@ const
nItems: 500)
snapSyncdbList* = [
snapSyncdb0, snapSyncdb1, snapSyncdb2, snapSyncdb3]
snapSyncdb0, snapSyncdb1, snapSyncdb2, snapSyncdb3, snapSyncdb4]
# End