From 56724536a4f0fb32959ce795c6eb0c38d426f96e Mon Sep 17 00:00:00 2001 From: bhartnett <51288821+bhartnett@users.noreply.github.com> Date: Fri, 4 Oct 2024 14:17:18 +0800 Subject: [PATCH] Fluffy State Bridge: Improvements to websocket rpc connections (#2694) Handle reconnect for websocket rpc client and create rpc client instance per worker in order to improve websocket throughput. (#2694) --- .../portal_bridge/portal_bridge_common.nim | 12 ++++++++ .../portal_bridge/portal_bridge_state.nim | 28 +++++++++++-------- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/fluffy/tools/portal_bridge/portal_bridge_common.nim b/fluffy/tools/portal_bridge/portal_bridge_common.nim index 1e7b852b1..abbdef149 100644 --- a/fluffy/tools/portal_bridge/portal_bridge_common.nim +++ b/fluffy/tools/portal_bridge/portal_bridge_common.nim @@ -36,6 +36,18 @@ proc newRpcClientConnect*(url: JsonRpcUrl): RpcClient = quit QuitFailure client +proc tryReconnect*(client: RpcClient, url: JsonRpcUrl) {.async: (raises: []).} = + if url.kind == WsUrl: + doAssert client of RpcWebSocketClient + + let wsClient = RpcWebSocketClient(client) + if wsClient.transport.isNil: + # disconnected + try: + await wsClient.connect(url.value) + except CatchableError as e: + warn "Failed to reconnect to JSON-RPC server", error = $e.msg, url = url.value + proc getBlockByNumber*( client: RpcClient, blockId: BlockIdentifier, fullTransactions: bool = true ): Future[Result[BlockObject, string]] {.async: (raises: []).} = diff --git a/fluffy/tools/portal_bridge/portal_bridge_state.nim b/fluffy/tools/portal_bridge/portal_bridge_state.nim index 8a942df8c..9a13895d7 100644 --- a/fluffy/tools/portal_bridge/portal_bridge_state.nim +++ b/fluffy/tools/portal_bridge/portal_bridge_state.nim @@ -73,11 +73,15 @@ proc putLastPersistedBlockNumber(db: DatabaseRef, blockNumber: uint64) {.inline. proc runBackfillCollectBlockDataLoop( db: DatabaseRef, blockDataQueue: AsyncQueue[BlockData], - web3Client: RpcClient, + web3Url: JsonRpcUrl, startBlockNumber: uint64, ) {.async: (raises: [CancelledError]).} = info "Starting state backfill collect block data loop" + let web3Client = newRpcClientConnect(web3Url) + if web3Client of RpcHttpClient: + warn "Using a WebSocket connection to the JSON-RPC API is recommended to improve performance" + let parentBlock = ( await web3Client.getBlockByNumber(blockId(startBlockNumber - 1.uint64), false) ).valueOr: @@ -97,11 +101,13 @@ proc runBackfillCollectBlockDataLoop( blockId = blockId(currentBlockNumber) blockObject = (await web3Client.getBlockByNumber(blockId, false)).valueOr: error "Failed to get block", error = error - await sleepAsync(1.seconds) + await sleepAsync(3.seconds) + # We might need to reconnect if using a WebSocket client + await web3Client.tryReconnect(web3Url) continue stateDiffs = (await web3Client.getStateDiffsByBlockNumber(blockId)).valueOr: error "Failed to get state diffs", error = error - await sleepAsync(1.seconds) + await sleepAsync(3.seconds) continue var uncleBlocks: seq[BlockObject] @@ -110,7 +116,7 @@ proc runBackfillCollectBlockDataLoop( await web3Client.getUncleByBlockNumberAndIndex(blockId, i.Quantity) ).valueOr: error "Failed to get uncle block", error = error - await sleepAsync(1.seconds) + await sleepAsync(3.seconds) continue uncleBlocks.add(uncleBlock) @@ -255,13 +261,14 @@ proc recursiveCollectOffer( proc runBackfillGossipBlockOffersLoop( blockOffersQueue: AsyncQueue[BlockOffersRef], - portalClient: RpcClient, + portalRpcUrl: JsonRpcUrl, portalNodeId: NodeId, verifyGossip: bool, workerId: int, ) {.async: (raises: [CancelledError]).} = info "Starting state backfill gossip block offers loop", workerId + let portalClient = newRpcClientConnect(portalRpcUrl) var blockOffers = await blockOffersQueue.popFirst() while true: @@ -315,6 +322,8 @@ proc runBackfillGossipBlockOffersLoop( await sleepAsync(3.seconds) warn "Retrying state gossip for block number: ", blockNumber = blockOffers.blockNumber, workerId + # We might need to reconnect if using a WebSocket client + await portalClient.tryReconnect(portalRpcUrl) continue if verifyGossip: @@ -377,10 +386,7 @@ proc runState*(config: PortalBridgeConf) = fatal "Failed to connect to portal client", error = $e.msg quit QuitFailure info "Connected to portal client with nodeId", nodeId = portalNodeId - - let web3Client = newRpcClientConnect(config.web3UrlState) - if web3Client of RpcHttpClient: - warn "Using a WebSocket connection to the JSON-RPC API is recommended to improve performance" + asyncSpawn portalClient.close() # this connection was only used to collect the nodeId let db = DatabaseRef.init(config.stateDir.string).get() defer: @@ -409,7 +415,7 @@ proc runState*(config: PortalBridgeConf) = blockOffersQueue = newAsyncQueue[BlockOffersRef](bufferSize) asyncSpawn runBackfillCollectBlockDataLoop( - db, blockDataQueue, web3Client, config.startBlockNumber + db, blockDataQueue, config.web3UrlState, config.startBlockNumber ) asyncSpawn runBackfillBuildBlockOffersLoop( db, blockDataQueue, blockOffersQueue, config.verifyStateProofs, config.enableGossip, @@ -418,7 +424,7 @@ proc runState*(config: PortalBridgeConf) = for workerId in 1 .. config.gossipWorkersCount.int: asyncSpawn runBackfillGossipBlockOffersLoop( - blockOffersQueue, portalClient, portalNodeId, config.verifyGossip, workerId + blockOffersQueue, config.portalRpcUrl, portalNodeId, config.verifyGossip, workerId ) asyncSpawn runBackfillMetricsLoop(blockDataQueue, blockOffersQueue)