From 19ea82bf504d5b3461d2888ed901be0b4c8fbd53 Mon Sep 17 00:00:00 2001 From: bhartnett <51288821+bhartnett@users.noreply.github.com> Date: Thu, 16 Jan 2025 08:48:10 +0800 Subject: [PATCH] Fluffy: State bridge clean shutdown (#2996) * Implement clean shutdown for state bridge. * Handle shutdown in history and beacon bridges. --- fluffy/tools/portal_bridge/portal_bridge.nim | 44 ++++++++- .../portal_bridge/portal_bridge_beacon.nim | 5 +- .../portal_bridge/portal_bridge_history.nim | 5 +- .../portal_bridge/portal_bridge_state.nim | 95 ++++++++++++------- 4 files changed, 107 insertions(+), 42 deletions(-) diff --git a/fluffy/tools/portal_bridge/portal_bridge.nim b/fluffy/tools/portal_bridge/portal_bridge.nim index 692fce1df..4f7f69755 100644 --- a/fluffy/tools/portal_bridge/portal_bridge.nim +++ b/fluffy/tools/portal_bridge/portal_bridge.nim @@ -1,5 +1,5 @@ # Fluffy -# Copyright (c) 2023-2024 Status Research & Development GmbH +# Copyright (c) 2023-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -47,6 +47,7 @@ import chronos, + chronicles, confutils, confutils/std/net, ../../logging, @@ -54,6 +55,17 @@ import portal_bridge_conf, portal_bridge_beacon, portal_bridge_history, portal_bridge_state ] +type PortalBridgeStatus = enum + Running + Stopping + +template pollWhileRunning(status: PortalBridgeStatus) = + while status == PortalBridgeStatus.Running: + try: + poll() + except CatchableError as e: + warn "Exception in poll()", exc = e.name, err = e.msg + when isMainModule: {.pop.} let config = PortalBridgeConf.load() @@ -61,10 +73,38 @@ when isMainModule: setupLogging(config.logLevel, config.logStdout, none(OutFile)) + var bridgeStatus = PortalBridgeStatus.Running + + # Ctrl+C handling + proc controlCHandler() {.noconv.} = + when defined(windows): + # workaround for https://github.com/nim-lang/Nim/issues/4057 + try: + setupForeignThreadGc() + except Exception as e: + raiseAssert e.msg # shouldn't happen + + notice "Shutting down after having received SIGINT" + bridgeStatus = PortalBridgeStatus.Stopping + + try: + setControlCHook(controlCHandler) + except Exception as e: # TODO Exception + warn "Cannot set ctrl-c handler", msg = e.msg + case config.cmd of PortalBridgeCmd.beacon: runBeacon(config) + + pollWhileRunning(bridgeStatus) + # TODO: Implement stop/cleanup for beacon bridge of PortalBridgeCmd.history: runHistory(config) + + pollWhileRunning(bridgeStatus) + # TODO: Implement stop/cleanup for history bridge of PortalBridgeCmd.state: - runState(config) + let bridge = waitFor runState(config) + + pollWhileRunning(bridgeStatus) + waitFor bridge.stop() diff --git a/fluffy/tools/portal_bridge/portal_bridge_beacon.nim b/fluffy/tools/portal_bridge/portal_bridge_beacon.nim index 486ed0bbe..6863f17fd 100644 --- a/fluffy/tools/portal_bridge/portal_bridge_beacon.nim +++ b/fluffy/tools/portal_bridge/portal_bridge_beacon.nim @@ -1,5 +1,5 @@ # Fluffy -# Copyright (c) 2023-2024 Status Research & Development GmbH +# Copyright (c) 2023-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -465,6 +465,3 @@ proc runBeacon*(config: PortalBridgeConf) {.raises: [CatchableError].} = ) asyncSpawn runOnSlotLoop() - - while true: - poll() diff --git a/fluffy/tools/portal_bridge/portal_bridge_history.nim b/fluffy/tools/portal_bridge/portal_bridge_history.nim index 116ba1ab4..8fe3cdec8 100644 --- a/fluffy/tools/portal_bridge/portal_bridge_history.nim +++ b/fluffy/tools/portal_bridge/portal_bridge_history.nim @@ -1,5 +1,5 @@ # Fluffy -# Copyright (c) 2024 Status Research & Development GmbH +# Copyright (c) 2024-2025 Status Research & Development GmbH # Licensed and distributed under either of # * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT). # * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0). @@ -543,6 +543,3 @@ proc runHistory*(config: PortalBridgeConf) = asyncSpawn bridge.runBackfillLoop( config.era1Dir.string, config.startEra, config.endEra ) - - while true: - poll() diff --git a/fluffy/tools/portal_bridge/portal_bridge_state.nim b/fluffy/tools/portal_bridge/portal_bridge_state.nim index 90011bea8..4064bdf31 100644 --- a/fluffy/tools/portal_bridge/portal_bridge_state.nim +++ b/fluffy/tools/portal_bridge/portal_bridge_state.nim @@ -51,7 +51,7 @@ type blockOffersQueue: AsyncQueue[BlockOffersRef] gossipBlockOffersLoop: Future[void] - PortalStateBridge = ref object + PortalStateBridge* = ref object web3Client: RpcClient web3Url: JsonRpcUrl db: DatabaseRef @@ -113,10 +113,10 @@ proc recursiveCollectOffer( # continue the recursive collect offersMap.recursiveCollectOffer(offerWithKey.getParent()) -proc runBackfillCollectBlockDataLoop( +proc runCollectBlockDataLoop( bridge: PortalStateBridge, startBlockNumber: uint64 ) {.async: (raises: []).} = - info "Starting state backfill collect block data loop" + info "Starting collect block data loop" try: bridge.web3Client = newRpcClientConnect(bridge.web3Url) @@ -197,13 +197,13 @@ proc runBackfillCollectBlockDataLoop( except CancelledError: trace "collectBlockDataLoop canceled" -proc runBackfillBuildBlockOffersLoop( +proc runBuildBlockOffersLoop( bridge: PortalStateBridge, verifyStateProofs: bool, enableGossip: bool, gossipGenesis: bool, ) {.async: (raises: []).} = - info "Starting state backfill build block offers loop" + info "Starting build block offers loop" try: # wait for the first block data to be put on the queue @@ -225,8 +225,8 @@ proc runBackfillBuildBlockOffersLoop( genesisAccounts = try: genesisBlockForNetwork(MainNet).alloc - except CatchableError as e: - raiseAssert(e.msg) # Should never happen + except ValueError, RlpError: + raiseAssert("Unable to get genesis accounts") # Should never happen worldState.applyGenesisAccounts(genesisAccounts) if enableGossip and gossipGenesis: @@ -299,10 +299,10 @@ proc runBackfillBuildBlockOffersLoop( except CancelledError: trace "buildBlockOffersLoop canceled" -proc runBackfillGossipBlockOffersLoop( +proc runGossipBlockOffersLoop( worker: PortalStateGossipWorker, verifyGossip: bool, skipGossipForExisting: bool ) {.async: (raises: []).} = - info "Starting state backfill gossip block offers loop", workerId = worker.id + info "Starting gossip block offers loop", workerId = worker.id try: # Create one client per worker in order to improve performance. @@ -354,6 +354,8 @@ proc runBackfillGossipBlockOffersLoop( await worker.portalClient.portal_stateGetContent(k.to0xHex()) if contentInfo.content.len() > 0: gossipContent = false + except CancelledError as e: + raise e except CatchableError as e: debug "Unable to find existing content. Will attempt to gossip content: ", contentKey = k.to0xHex(), error = e.msg, workerId = worker.id @@ -373,6 +375,8 @@ proc runBackfillGossipBlockOffersLoop( warn "Offer gossipped to no peers", workerId = worker.id retryGossip = true break + except CancelledError as e: + raise e except CatchableError as e: error "Failed to gossip offer to peers", error = e.msg, workerId = worker.id retryGossip = true @@ -395,6 +399,8 @@ proc runBackfillGossipBlockOffersLoop( retryGossip = true break foundContentKeys.add(k) + except CancelledError as e: + raise e except CatchableError as e: warn "Unable to find content with key. Will retry gossipping content:", contentKey = k.to0xHex(), error = e.msg, workerId = worker.id @@ -432,8 +438,8 @@ proc runBackfillGossipBlockOffersLoop( except CancelledError: trace "gossipBlockOffersLoop canceled" -proc runBackfillMetricsLoop(bridge: PortalStateBridge) {.async: (raises: []).} = - info "Starting state backfill metrics loop" +proc runMetricsLoop(bridge: PortalStateBridge) {.async: (raises: []).} = + info "Starting metrics loop" try: while true: @@ -459,7 +465,7 @@ proc runBackfillMetricsLoop(bridge: PortalStateBridge) {.async: (raises: []).} = proc validatePortalRpcEndpoints( portalRpcUrl: JsonRpcUrl, numOfEndpoints: int -): seq[(JsonRpcUrl, NodeId)] = +): Future[seq[(JsonRpcUrl, NodeId)]] {.async: (raises: []).} = var uri = parseUri(portalRpcUrl.value) endpoints = newSeq[(JsonRpcUrl, NodeId)]() @@ -474,7 +480,7 @@ proc validatePortalRpcEndpoints( client = newRpcClientConnect(rpcUrl) nodeId = try: - (waitFor client.portal_stateNodeInfo()).nodeId + (await client.portal_stateNodeInfo()).nodeId except CatchableError as e: fatal "Failed to connect to portal client", error = $e.msg quit QuitFailure @@ -505,31 +511,57 @@ proc validateStartBlockNumber(db: DatabaseRef, startBlockNumber: uint64) = warn "Start block must be set to 1" quit QuitFailure -proc start(bridge: PortalStateBridge, config: PortalBridgeConf) = - info "Starting state backfill from block: ", +proc start*(bridge: PortalStateBridge, config: PortalBridgeConf) = + info "Starting Portal state bridge from block: ", startBlockNumber = config.startBlockNumber - bridge.collectBlockDataLoop = - bridge.runBackfillCollectBlockDataLoop(config.startBlockNumber) - bridge.buildBlockOffersLoop = bridge.runBackfillBuildBlockOffersLoop( + bridge.collectBlockDataLoop = bridge.runCollectBlockDataLoop(config.startBlockNumber) + bridge.buildBlockOffersLoop = bridge.runBuildBlockOffersLoop( config.verifyStateProofs, config.enableGossip, config.gossipGenesis ) - bridge.metricsLoop = bridge.runBackfillMetricsLoop() + bridge.metricsLoop = bridge.runMetricsLoop() for worker in bridge.gossipWorkers: - worker.gossipBlockOffersLoop = worker.runBackfillGossipBlockOffersLoop( - config.verifyGossip, config.skipGossipForExisting - ) + worker.gossipBlockOffersLoop = + worker.runGossipBlockOffersLoop(config.verifyGossip, config.skipGossipForExisting) -# TODO: Implement stop and clean shutdown +proc stop*(bridge: PortalStateBridge) {.async: (raises: []).} = + info "Stopping Portal state bridge" -proc runState*(config: PortalBridgeConf) = - let - portalEndpoints = - validatePortalRpcEndpoints(config.portalRpcUrl, config.portalRpcEndpoints.int) - db = DatabaseRef.init(config.stateDir.string).get() - defer: - db.close() + var futures = newSeq[Future[void]]() + + # Cancel loops + for worker in bridge.gossipWorkers: + if not worker.gossipBlockOffersLoop.isNil(): + futures.add(worker.gossipBlockOffersLoop.cancelAndWait()) + + if not bridge.metricsLoop.isNil(): + futures.add(bridge.metricsLoop.cancelAndWait()) + if not bridge.buildBlockOffersLoop.isNil(): + futures.add(bridge.buildBlockOffersLoop.cancelAndWait()) + if not bridge.collectBlockDataLoop.isNil(): + futures.add(bridge.collectBlockDataLoop.cancelAndWait()) + + await noCancel(allFutures(futures)) + + # Close the database + bridge.db.close() + + for worker in bridge.gossipWorkers: + worker.gossipBlockOffersLoop = nil + + bridge.metricsLoop = nil + bridge.buildBlockOffersLoop = nil + bridge.collectBlockDataLoop = nil + +proc runState*( + config: PortalBridgeConf +): Future[PortalStateBridge] {.async: (raises: []).} = + let portalEndpoints = + await validatePortalRpcEndpoints(config.portalRpcUrl, config.portalRpcEndpoints.int) + + info "Using state directory: ", stateDir = config.stateDir.string + let db = DatabaseRef.init(config.stateDir.string).get() validateStartBlockNumber(db, config.startBlockNumber) @@ -555,5 +587,4 @@ proc runState*(config: PortalBridgeConf) = bridge.start(config) - while true: - poll() + return bridge