Fluffy: State bridge clean shutdown (#2996)

* Implement clean shutdown for state bridge.

* Handle shutdown in history and beacon bridges.
This commit is contained in:
bhartnett 2025-01-16 08:48:10 +08:00 committed by GitHub
parent 232a9ad247
commit 19ea82bf50
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 107 additions and 42 deletions

View File

@ -1,5 +1,5 @@
# Fluffy
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Copyright (c) 2023-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -47,6 +47,7 @@
import
chronos,
chronicles,
confutils,
confutils/std/net,
../../logging,
@ -54,6 +55,17 @@ import
portal_bridge_conf, portal_bridge_beacon, portal_bridge_history, portal_bridge_state
]
type PortalBridgeStatus = enum
Running
Stopping
template pollWhileRunning(status: PortalBridgeStatus) =
while status == PortalBridgeStatus.Running:
try:
poll()
except CatchableError as e:
warn "Exception in poll()", exc = e.name, err = e.msg
when isMainModule:
{.pop.}
let config = PortalBridgeConf.load()
@ -61,10 +73,38 @@ when isMainModule:
setupLogging(config.logLevel, config.logStdout, none(OutFile))
var bridgeStatus = PortalBridgeStatus.Running
# Ctrl+C handling
proc controlCHandler() {.noconv.} =
when defined(windows):
# workaround for https://github.com/nim-lang/Nim/issues/4057
try:
setupForeignThreadGc()
except Exception as e:
raiseAssert e.msg # shouldn't happen
notice "Shutting down after having received SIGINT"
bridgeStatus = PortalBridgeStatus.Stopping
try:
setControlCHook(controlCHandler)
except Exception as e: # TODO Exception
warn "Cannot set ctrl-c handler", msg = e.msg
case config.cmd
of PortalBridgeCmd.beacon:
runBeacon(config)
pollWhileRunning(bridgeStatus)
# TODO: Implement stop/cleanup for beacon bridge
of PortalBridgeCmd.history:
runHistory(config)
pollWhileRunning(bridgeStatus)
# TODO: Implement stop/cleanup for history bridge
of PortalBridgeCmd.state:
runState(config)
let bridge = waitFor runState(config)
pollWhileRunning(bridgeStatus)
waitFor bridge.stop()

View File

@ -1,5 +1,5 @@
# Fluffy
# Copyright (c) 2023-2024 Status Research & Development GmbH
# Copyright (c) 2023-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -465,6 +465,3 @@ proc runBeacon*(config: PortalBridgeConf) {.raises: [CatchableError].} =
)
asyncSpawn runOnSlotLoop()
while true:
poll()

View File

@ -1,5 +1,5 @@
# Fluffy
# Copyright (c) 2024 Status Research & Development GmbH
# Copyright (c) 2024-2025 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
@ -543,6 +543,3 @@ proc runHistory*(config: PortalBridgeConf) =
asyncSpawn bridge.runBackfillLoop(
config.era1Dir.string, config.startEra, config.endEra
)
while true:
poll()

View File

@ -51,7 +51,7 @@ type
blockOffersQueue: AsyncQueue[BlockOffersRef]
gossipBlockOffersLoop: Future[void]
PortalStateBridge = ref object
PortalStateBridge* = ref object
web3Client: RpcClient
web3Url: JsonRpcUrl
db: DatabaseRef
@ -113,10 +113,10 @@ proc recursiveCollectOffer(
# continue the recursive collect
offersMap.recursiveCollectOffer(offerWithKey.getParent())
proc runBackfillCollectBlockDataLoop(
proc runCollectBlockDataLoop(
bridge: PortalStateBridge, startBlockNumber: uint64
) {.async: (raises: []).} =
info "Starting state backfill collect block data loop"
info "Starting collect block data loop"
try:
bridge.web3Client = newRpcClientConnect(bridge.web3Url)
@ -197,13 +197,13 @@ proc runBackfillCollectBlockDataLoop(
except CancelledError:
trace "collectBlockDataLoop canceled"
proc runBackfillBuildBlockOffersLoop(
proc runBuildBlockOffersLoop(
bridge: PortalStateBridge,
verifyStateProofs: bool,
enableGossip: bool,
gossipGenesis: bool,
) {.async: (raises: []).} =
info "Starting state backfill build block offers loop"
info "Starting build block offers loop"
try:
# wait for the first block data to be put on the queue
@ -225,8 +225,8 @@ proc runBackfillBuildBlockOffersLoop(
genesisAccounts =
try:
genesisBlockForNetwork(MainNet).alloc
except CatchableError as e:
raiseAssert(e.msg) # Should never happen
except ValueError, RlpError:
raiseAssert("Unable to get genesis accounts") # Should never happen
worldState.applyGenesisAccounts(genesisAccounts)
if enableGossip and gossipGenesis:
@ -299,10 +299,10 @@ proc runBackfillBuildBlockOffersLoop(
except CancelledError:
trace "buildBlockOffersLoop canceled"
proc runBackfillGossipBlockOffersLoop(
proc runGossipBlockOffersLoop(
worker: PortalStateGossipWorker, verifyGossip: bool, skipGossipForExisting: bool
) {.async: (raises: []).} =
info "Starting state backfill gossip block offers loop", workerId = worker.id
info "Starting gossip block offers loop", workerId = worker.id
try:
# Create one client per worker in order to improve performance.
@ -354,6 +354,8 @@ proc runBackfillGossipBlockOffersLoop(
await worker.portalClient.portal_stateGetContent(k.to0xHex())
if contentInfo.content.len() > 0:
gossipContent = false
except CancelledError as e:
raise e
except CatchableError as e:
debug "Unable to find existing content. Will attempt to gossip content: ",
contentKey = k.to0xHex(), error = e.msg, workerId = worker.id
@ -373,6 +375,8 @@ proc runBackfillGossipBlockOffersLoop(
warn "Offer gossipped to no peers", workerId = worker.id
retryGossip = true
break
except CancelledError as e:
raise e
except CatchableError as e:
error "Failed to gossip offer to peers", error = e.msg, workerId = worker.id
retryGossip = true
@ -395,6 +399,8 @@ proc runBackfillGossipBlockOffersLoop(
retryGossip = true
break
foundContentKeys.add(k)
except CancelledError as e:
raise e
except CatchableError as e:
warn "Unable to find content with key. Will retry gossipping content:",
contentKey = k.to0xHex(), error = e.msg, workerId = worker.id
@ -432,8 +438,8 @@ proc runBackfillGossipBlockOffersLoop(
except CancelledError:
trace "gossipBlockOffersLoop canceled"
proc runBackfillMetricsLoop(bridge: PortalStateBridge) {.async: (raises: []).} =
info "Starting state backfill metrics loop"
proc runMetricsLoop(bridge: PortalStateBridge) {.async: (raises: []).} =
info "Starting metrics loop"
try:
while true:
@ -459,7 +465,7 @@ proc runBackfillMetricsLoop(bridge: PortalStateBridge) {.async: (raises: []).} =
proc validatePortalRpcEndpoints(
portalRpcUrl: JsonRpcUrl, numOfEndpoints: int
): seq[(JsonRpcUrl, NodeId)] =
): Future[seq[(JsonRpcUrl, NodeId)]] {.async: (raises: []).} =
var
uri = parseUri(portalRpcUrl.value)
endpoints = newSeq[(JsonRpcUrl, NodeId)]()
@ -474,7 +480,7 @@ proc validatePortalRpcEndpoints(
client = newRpcClientConnect(rpcUrl)
nodeId =
try:
(waitFor client.portal_stateNodeInfo()).nodeId
(await client.portal_stateNodeInfo()).nodeId
except CatchableError as e:
fatal "Failed to connect to portal client", error = $e.msg
quit QuitFailure
@ -505,31 +511,57 @@ proc validateStartBlockNumber(db: DatabaseRef, startBlockNumber: uint64) =
warn "Start block must be set to 1"
quit QuitFailure
proc start(bridge: PortalStateBridge, config: PortalBridgeConf) =
info "Starting state backfill from block: ",
proc start*(bridge: PortalStateBridge, config: PortalBridgeConf) =
info "Starting Portal state bridge from block: ",
startBlockNumber = config.startBlockNumber
bridge.collectBlockDataLoop =
bridge.runBackfillCollectBlockDataLoop(config.startBlockNumber)
bridge.buildBlockOffersLoop = bridge.runBackfillBuildBlockOffersLoop(
bridge.collectBlockDataLoop = bridge.runCollectBlockDataLoop(config.startBlockNumber)
bridge.buildBlockOffersLoop = bridge.runBuildBlockOffersLoop(
config.verifyStateProofs, config.enableGossip, config.gossipGenesis
)
bridge.metricsLoop = bridge.runBackfillMetricsLoop()
bridge.metricsLoop = bridge.runMetricsLoop()
for worker in bridge.gossipWorkers:
worker.gossipBlockOffersLoop = worker.runBackfillGossipBlockOffersLoop(
config.verifyGossip, config.skipGossipForExisting
)
worker.gossipBlockOffersLoop =
worker.runGossipBlockOffersLoop(config.verifyGossip, config.skipGossipForExisting)
# TODO: Implement stop and clean shutdown
proc stop*(bridge: PortalStateBridge) {.async: (raises: []).} =
info "Stopping Portal state bridge"
proc runState*(config: PortalBridgeConf) =
let
portalEndpoints =
validatePortalRpcEndpoints(config.portalRpcUrl, config.portalRpcEndpoints.int)
db = DatabaseRef.init(config.stateDir.string).get()
defer:
db.close()
var futures = newSeq[Future[void]]()
# Cancel loops
for worker in bridge.gossipWorkers:
if not worker.gossipBlockOffersLoop.isNil():
futures.add(worker.gossipBlockOffersLoop.cancelAndWait())
if not bridge.metricsLoop.isNil():
futures.add(bridge.metricsLoop.cancelAndWait())
if not bridge.buildBlockOffersLoop.isNil():
futures.add(bridge.buildBlockOffersLoop.cancelAndWait())
if not bridge.collectBlockDataLoop.isNil():
futures.add(bridge.collectBlockDataLoop.cancelAndWait())
await noCancel(allFutures(futures))
# Close the database
bridge.db.close()
for worker in bridge.gossipWorkers:
worker.gossipBlockOffersLoop = nil
bridge.metricsLoop = nil
bridge.buildBlockOffersLoop = nil
bridge.collectBlockDataLoop = nil
proc runState*(
config: PortalBridgeConf
): Future[PortalStateBridge] {.async: (raises: []).} =
let portalEndpoints =
await validatePortalRpcEndpoints(config.portalRpcUrl, config.portalRpcEndpoints.int)
info "Using state directory: ", stateDir = config.stateDir.string
let db = DatabaseRef.init(config.stateDir.string).get()
validateStartBlockNumber(db, config.startBlockNumber)
@ -555,5 +587,4 @@ proc runState*(config: PortalBridgeConf) =
bridge.start(config)
while true:
poll()
return bridge