Fluffy State Bridge: Improvements to websocket rpc connections (#2694)

Handle reconnection for the WebSocket RPC client and create one RPC client instance per worker in order to improve WebSocket throughput. (#2694)
This commit is contained in:
bhartnett 2024-10-04 14:17:18 +08:00 committed by GitHub
parent d7a9d946a3
commit 56724536a4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 29 additions and 11 deletions

View File

@ -36,6 +36,18 @@ proc newRpcClientConnect*(url: JsonRpcUrl): RpcClient =
quit QuitFailure quit QuitFailure
client client
proc tryReconnect*(client: RpcClient, url: JsonRpcUrl) {.async: (raises: []).} =
  ## Attempts to re-establish the connection of a WebSocket JSON-RPC client.
  ##
  ## Returns without doing anything when `url` is not a WebSocket URL, or when
  ## the WebSocket client still holds a transport. A failed connect attempt is
  ## logged as a warning and is not propagated to the caller.
  if url.kind != WsUrl:
    return

  # A WebSocket URL must always be paired with a WebSocket client instance.
  doAssert client of RpcWebSocketClient
  let socketClient = RpcWebSocketClient(client)

  # A nil transport is treated as the "disconnected" state.
  if not socketClient.transport.isNil:
    return

  try:
    await socketClient.connect(url.value)
  except CatchableError as exc:
    warn "Failed to reconnect to JSON-RPC server",
      error = $exc.msg, url = url.value
proc getBlockByNumber*( proc getBlockByNumber*(
client: RpcClient, blockId: BlockIdentifier, fullTransactions: bool = true client: RpcClient, blockId: BlockIdentifier, fullTransactions: bool = true
): Future[Result[BlockObject, string]] {.async: (raises: []).} = ): Future[Result[BlockObject, string]] {.async: (raises: []).} =

View File

@ -73,11 +73,15 @@ proc putLastPersistedBlockNumber(db: DatabaseRef, blockNumber: uint64) {.inline.
proc runBackfillCollectBlockDataLoop( proc runBackfillCollectBlockDataLoop(
db: DatabaseRef, db: DatabaseRef,
blockDataQueue: AsyncQueue[BlockData], blockDataQueue: AsyncQueue[BlockData],
web3Client: RpcClient, web3Url: JsonRpcUrl,
startBlockNumber: uint64, startBlockNumber: uint64,
) {.async: (raises: [CancelledError]).} = ) {.async: (raises: [CancelledError]).} =
info "Starting state backfill collect block data loop" info "Starting state backfill collect block data loop"
let web3Client = newRpcClientConnect(web3Url)
if web3Client of RpcHttpClient:
warn "Using a WebSocket connection to the JSON-RPC API is recommended to improve performance"
let parentBlock = ( let parentBlock = (
await web3Client.getBlockByNumber(blockId(startBlockNumber - 1.uint64), false) await web3Client.getBlockByNumber(blockId(startBlockNumber - 1.uint64), false)
).valueOr: ).valueOr:
@ -97,11 +101,13 @@ proc runBackfillCollectBlockDataLoop(
blockId = blockId(currentBlockNumber) blockId = blockId(currentBlockNumber)
blockObject = (await web3Client.getBlockByNumber(blockId, false)).valueOr: blockObject = (await web3Client.getBlockByNumber(blockId, false)).valueOr:
error "Failed to get block", error = error error "Failed to get block", error = error
await sleepAsync(1.seconds) await sleepAsync(3.seconds)
# We might need to reconnect if using a WebSocket client
await web3Client.tryReconnect(web3Url)
continue continue
stateDiffs = (await web3Client.getStateDiffsByBlockNumber(blockId)).valueOr: stateDiffs = (await web3Client.getStateDiffsByBlockNumber(blockId)).valueOr:
error "Failed to get state diffs", error = error error "Failed to get state diffs", error = error
await sleepAsync(1.seconds) await sleepAsync(3.seconds)
continue continue
var uncleBlocks: seq[BlockObject] var uncleBlocks: seq[BlockObject]
@ -110,7 +116,7 @@ proc runBackfillCollectBlockDataLoop(
await web3Client.getUncleByBlockNumberAndIndex(blockId, i.Quantity) await web3Client.getUncleByBlockNumberAndIndex(blockId, i.Quantity)
).valueOr: ).valueOr:
error "Failed to get uncle block", error = error error "Failed to get uncle block", error = error
await sleepAsync(1.seconds) await sleepAsync(3.seconds)
continue continue
uncleBlocks.add(uncleBlock) uncleBlocks.add(uncleBlock)
@ -255,13 +261,14 @@ proc recursiveCollectOffer(
proc runBackfillGossipBlockOffersLoop( proc runBackfillGossipBlockOffersLoop(
blockOffersQueue: AsyncQueue[BlockOffersRef], blockOffersQueue: AsyncQueue[BlockOffersRef],
portalClient: RpcClient, portalRpcUrl: JsonRpcUrl,
portalNodeId: NodeId, portalNodeId: NodeId,
verifyGossip: bool, verifyGossip: bool,
workerId: int, workerId: int,
) {.async: (raises: [CancelledError]).} = ) {.async: (raises: [CancelledError]).} =
info "Starting state backfill gossip block offers loop", workerId info "Starting state backfill gossip block offers loop", workerId
let portalClient = newRpcClientConnect(portalRpcUrl)
var blockOffers = await blockOffersQueue.popFirst() var blockOffers = await blockOffersQueue.popFirst()
while true: while true:
@ -315,6 +322,8 @@ proc runBackfillGossipBlockOffersLoop(
await sleepAsync(3.seconds) await sleepAsync(3.seconds)
warn "Retrying state gossip for block number: ", warn "Retrying state gossip for block number: ",
blockNumber = blockOffers.blockNumber, workerId blockNumber = blockOffers.blockNumber, workerId
# We might need to reconnect if using a WebSocket client
await portalClient.tryReconnect(portalRpcUrl)
continue continue
if verifyGossip: if verifyGossip:
@ -377,10 +386,7 @@ proc runState*(config: PortalBridgeConf) =
fatal "Failed to connect to portal client", error = $e.msg fatal "Failed to connect to portal client", error = $e.msg
quit QuitFailure quit QuitFailure
info "Connected to portal client with nodeId", nodeId = portalNodeId info "Connected to portal client with nodeId", nodeId = portalNodeId
asyncSpawn portalClient.close() # this connection was only used to collect the nodeId
let web3Client = newRpcClientConnect(config.web3UrlState)
if web3Client of RpcHttpClient:
warn "Using a WebSocket connection to the JSON-RPC API is recommended to improve performance"
let db = DatabaseRef.init(config.stateDir.string).get() let db = DatabaseRef.init(config.stateDir.string).get()
defer: defer:
@ -409,7 +415,7 @@ proc runState*(config: PortalBridgeConf) =
blockOffersQueue = newAsyncQueue[BlockOffersRef](bufferSize) blockOffersQueue = newAsyncQueue[BlockOffersRef](bufferSize)
asyncSpawn runBackfillCollectBlockDataLoop( asyncSpawn runBackfillCollectBlockDataLoop(
db, blockDataQueue, web3Client, config.startBlockNumber db, blockDataQueue, config.web3UrlState, config.startBlockNumber
) )
asyncSpawn runBackfillBuildBlockOffersLoop( asyncSpawn runBackfillBuildBlockOffersLoop(
db, blockDataQueue, blockOffersQueue, config.verifyStateProofs, config.enableGossip, db, blockDataQueue, blockOffersQueue, config.verifyStateProofs, config.enableGossip,
@ -418,7 +424,7 @@ proc runState*(config: PortalBridgeConf) =
for workerId in 1 .. config.gossipWorkersCount.int: for workerId in 1 .. config.gossipWorkersCount.int:
asyncSpawn runBackfillGossipBlockOffersLoop( asyncSpawn runBackfillGossipBlockOffersLoop(
blockOffersQueue, portalClient, portalNodeId, config.verifyGossip, workerId blockOffersQueue, config.portalRpcUrl, portalNodeId, config.verifyGossip, workerId
) )
asyncSpawn runBackfillMetricsLoop(blockDataQueue, blockOffersQueue) asyncSpawn runBackfillMetricsLoop(blockDataQueue, blockOffersQueue)