From 453cb2f33e4c7abc1a186d2ea90ed9e2a75ced41 Mon Sep 17 00:00:00 2001 From: Kim De Mey <7857583+kdeme@users.noreply.github.com> Date: Thu, 21 Nov 2024 21:30:42 +0700 Subject: [PATCH] portal_bridge: Add concurrency to the history content gossip (#2855) --- .../portal_bridge/portal_bridge_conf.nim | 7 + .../portal_bridge/portal_bridge_history.nim | 196 ++++++++---------- 2 files changed, 91 insertions(+), 112 deletions(-) diff --git a/fluffy/tools/portal_bridge/portal_bridge_conf.nim b/fluffy/tools/portal_bridge/portal_bridge_conf.nim index 9c5c37337..351db637e 100644 --- a/fluffy/tools/portal_bridge/portal_bridge_conf.nim +++ b/fluffy/tools/portal_bridge/portal_bridge_conf.nim @@ -141,6 +141,13 @@ type defaultValueDesc: defaultEra1DataDir(), name: "era1-dir" .}: InputDir + + gossipConcurrency* {. + desc: + "The number of concurrent gossip workers for gossiping content into the portal network", + defaultValue: 50, + name: "gossip-concurrency" + .}: int of PortalBridgeCmd.state: web3UrlState* {.desc: "Execution layer JSON-RPC API URL", name: "web3-url".}: JsonRpcUrl diff --git a/fluffy/tools/portal_bridge/portal_bridge_history.nim b/fluffy/tools/portal_bridge/portal_bridge_history.nim index ddfe5c989..3d1f16fa9 100644 --- a/fluffy/tools/portal_bridge/portal_bridge_history.nim +++ b/fluffy/tools/portal_bridge/portal_bridge_history.nim @@ -31,6 +31,11 @@ from eth/common/eth_types_rlp import rlpHash const newHeadPollInterval = 6.seconds # Slot with potential block is every 12s +type PortalHistoryBridge = ref object + portalClient: RpcClient + web3Client: RpcClient + gossipQueue: AsyncQueue[(seq[byte], seq[byte])] + ## Conversion functions for Block and Receipts func asEthBlock(blockObject: BlockObject): EthBlock = @@ -139,63 +144,34 @@ proc getBlockReceipts( ## Portal JSON-RPC API helper calls for pushing block and receipts proc gossipBlockHeader( - client: RpcClient, id: Hash32 | uint64, headerWithProof: BlockHeaderWithProof -): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = - let - contentKey = blockHeaderContentKey(id) - encodedContentKeyHex = contentKey.encode.asSeq().toHex() + bridge: PortalHistoryBridge, + id: Hash32 | uint64, + headerWithProof: BlockHeaderWithProof, +): Future[void] {.async: (raises: [CancelledError]).} = + let contentKey = blockHeaderContentKey(id) - peers = - try: - await client.portal_historyGossip( - encodedContentKeyHex, SSZ.encode(headerWithProof).toHex() - ) - except CatchableError as e: - return err("JSON-RPC portal_historyGossip failed: " & $e.msg) - - info "Block header gossiped", peers, contentKey = encodedContentKeyHex - ok() + await bridge.gossipQueue.addLast( + (contentKey.encode.asSeq(), SSZ.encode(headerWithProof)) + ) proc gossipBlockBody( - client: RpcClient, + bridge: PortalHistoryBridge, hash: Hash32, body: PortalBlockBodyLegacy | PortalBlockBodyShanghai, -): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = - let - contentKey = blockBodyContentKey(hash) - encodedContentKeyHex = contentKey.encode.asSeq().toHex() +): Future[void] {.async: (raises: [CancelledError]).} = + let contentKey = blockBodyContentKey(hash) - peers = - try: - await client.portal_historyGossip( - encodedContentKeyHex, SSZ.encode(body).toHex() - ) - except CatchableError as e: - return err("JSON-RPC portal_historyGossip failed: " & $e.msg) - - info "Block body gossiped", peers, contentKey = encodedContentKeyHex - ok() + await bridge.gossipQueue.addLast((contentKey.encode.asSeq(), SSZ.encode(body))) proc gossipReceipts( - client: RpcClient, hash: Hash32, receipts: PortalReceipts -): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = - let - contentKey = receiptsContentKey(hash) - encodedContentKeyHex = contentKey.encode.asSeq().toHex() + bridge: PortalHistoryBridge, hash: Hash32, receipts: PortalReceipts +): Future[void] {.async: (raises: [CancelledError]).} = + let contentKey = receiptsContentKey(hash) - peers = - try: - await client.portal_historyGossip( - encodedContentKeyHex, SSZ.encode(receipts).toHex() - ) - except CatchableError as e: - return err("JSON-RPC portal_historyGossip failed: " & $e.msg) - - info "Receipts gossiped", peers, contentKey = encodedContentKeyHex - return ok() + await bridge.gossipQueue.addLast((contentKey.encode.asSeq(), SSZ.encode(receipts))) proc runLatestLoop( - portalClient: RpcClient, web3Client: RpcClient, validate = false + bridge: PortalHistoryBridge, validate = false ) {.async: (raises: [CancelledError]).} = ## Loop that requests the latest block + receipts and pushes them into the ## Portal network. @@ -211,14 +187,14 @@ proc runLatestLoop( var lastBlockNumber = 0'u64 while true: let t0 = Moment.now() - let blockObject = (await getBlockByNumber(web3Client, blockId)).valueOr: + let blockObject = (await bridge.web3Client.getBlockByNumber(blockId)).valueOr: error "Failed to get latest block", error await sleepAsync(1.seconds) continue let blockNumber = distinctBase(blockObject.number) if blockNumber > lastBlockNumber: - let receiptObjects = (await web3Client.getBlockReceipts(blockNumber)).valueOr: + let receiptObjects = (await bridge.web3Client.getBlockReceipts(blockNumber)).valueOr: error "Failed to get latest receipts", error await sleepAsync(1.seconds) continue @@ -250,11 +226,9 @@ proc runLatestLoop( continue # gossip block header by hash - (await portalClient.gossipBlockHeader(hash, headerWithProof)).isOkOr: - error "Failed to gossip block header", error, hash + await bridge.gossipBlockHeader(hash, headerWithProof) # gossip block header by number - (await portalClient.gossipBlockHeader(blockNumber, headerWithProof)).isOkOr: - error "Failed to gossip block header", error, hash + await bridge.gossipBlockHeader(blockNumber, headerWithProof) # For bodies & receipts to get verified, the header needs to be available # on the network. Wait a little to get the headers propagated through @@ -262,12 +236,9 @@ proc runLatestLoop( await sleepAsync(2.seconds) # gossip block body - (await portalClient.gossipBlockBody(hash, body)).isOkOr: - error "Failed to gossip block body", error, hash - + await bridge.gossipBlockBody(hash, body) # gossip receipts - (await portalClient.gossipReceipts(hash, portalReceipts)).isOkOr: - error "Failed to gossip receipts", error, hash + await bridge.gossipReceipts(hash, portalReceipts) # Making sure here that we poll enough times not to miss a block. # We could also do some work without awaiting it, e.g. the gossiping or @@ -281,7 +252,7 @@ proc runLatestLoop( warn "Block gossip took longer than slot interval" proc gossipHeadersWithProof( - portalClient: RpcClient, + bridge: PortalHistoryBridge, era1File: string, epochRecordFile: Opt[string] = Opt.none(string), verifyEra = false, @@ -312,15 +283,15 @@ proc gossipHeadersWithProof( blockHash = blockHeader.rlpHash() # gossip block header by hash - ?(await portalClient.gossipBlockHeader(blockHash, headerWithProof)) + await bridge.gossipBlockHeader(blockHash, headerWithProof) # gossip block header by number - ?(await portalClient.gossipBlockHeader(blockHeader.number, headerWithProof)) + await bridge.gossipBlockHeader(blockHeader.number, headerWithProof) - info "Succesfully gossiped headers from era1 file", era1File + info "Succesfully put headers from era1 file in gossip queue", era1File ok() proc gossipBlockContent( - portalClient: RpcClient, era1File: string, verifyEra = false + bridge: PortalHistoryBridge, era1File: string, verifyEra = false ): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = let f = ?Era1File.open(era1File) @@ -333,28 +304,15 @@ proc gossipBlockContent( let blockHash = header.rlpHash() # gossip block body - ?( - await portalClient.gossipBlockBody( - blockHash, PortalBlockBodyLegacy.fromBlockBody(body) - ) - ) - + await bridge.gossipBlockBody(blockHash, PortalBlockBodyLegacy.fromBlockBody(body)) # gossip receipts - ?( - await portalClient.gossipReceipts( - blockHash, PortalReceipts.fromReceipts(receipts) - ) - ) + await bridge.gossipReceipts(blockHash, PortalReceipts.fromReceipts(receipts)) - info "Succesfully gossiped bodies and receipts from era1 file", era1File + info "Succesfully put bodies and receipts from era1 file in gossip queue", era1File ok() proc runBackfillLoop( - portalClient: RpcClient, - web3Client: RpcClient, - era1Dir: string, - startEra: uint64, - endEra: uint64, + bridge: PortalHistoryBridge, era1Dir: string, startEra: uint64, endEra: uint64 ) {.async: (raises: [CancelledError]).} = let accumulator = loadAccumulator() @@ -381,7 +339,7 @@ proc runBackfillLoop( info "Gossip headers from era1 file", era1File let headerRes = try: - await portalClient.portal_debug_historyGossipHeaders(era1File) + await bridge.portalClient.portal_debug_historyGossipHeaders(era1File) except CatchableError as e: error "JSON-RPC portal_debug_historyGossipHeaders failed", error = e.msg false @@ -390,7 +348,7 @@ proc runBackfillLoop( info "Gossip block content from era1 file", era1File let res = try: - await portalClient.portal_debug_historyGossipBlockContent(era1File) + await bridge.portalClient.portal_debug_historyGossipBlockContent(era1File) except CatchableError as e: error "JSON-RPC portal_debug_historyGossipBlockContent failed", error = e.msg @@ -400,16 +358,16 @@ proc runBackfillLoop( else: error "Failed to gossip headers from era1 file", era1File else: - (await portalClient.gossipHeadersWithProof(era1File)).isOkOr: + (await bridge.gossipHeadersWithProof(era1File)).isOkOr: error "Failed to gossip headers from era1 file", error, era1File continue - (await portalClient.gossipBlockContent(era1File)).isOkOr: + (await bridge.gossipBlockContent(era1File)).isOkOr: error "Failed to gossip block content from era1 file", error, era1File continue proc runBackfillLoopAuditMode( - portalClient: RpcClient, web3Client: RpcClient, era1Dir: string + bridge: PortalHistoryBridge, era1Dir: string ) {.async: (raises: [CancelledError]).} = let rng = newRng() @@ -436,7 +394,7 @@ proc runBackfillLoopAuditMode( contentHex = try: ( - await portalClient.portal_historyGetContent( + await bridge.portalClient.portal_historyGetContent( contentKey.encode.asSeq().toHex() ) ).content @@ -468,7 +426,7 @@ proc runBackfillLoopAuditMode( contentHex = try: ( - await portalClient.portal_historyGetContent( + await bridge.portalClient.portal_historyGetContent( contentKey.encode.asSeq().toHex() ) ).content @@ -496,7 +454,7 @@ proc runBackfillLoopAuditMode( contentHex = try: ( - await portalClient.portal_historyGetContent( + await bridge.portalClient.portal_historyGetContent( contentKey.encode.asSeq().toHex() ) ).content @@ -526,44 +484,58 @@ proc runBackfillLoopAuditMode( raiseAssert "Failed to build header with proof: " & error # gossip block header by hash - (await portalClient.gossipBlockHeader(blockHash, headerWithProof)).isOkOr: - error "Failed to gossip block header", error, blockHash + await bridge.gossipBlockHeader(blockHash, headerWithProof) # gossip block header by number - (await portalClient.gossipBlockHeader(blockNumber, headerWithProof)).isOkOr: - error "Failed to gossip block header", error, blockHash + await bridge.gossipBlockHeader(blockNumber, headerWithProof) if not bodySuccess: - ( - await portalClient.gossipBlockBody( - blockHash, PortalBlockBodyLegacy.fromBlockBody(body) - ) - ).isOkOr: - error "Failed to gossip block body", error, blockHash + await bridge.gossipBlockBody(blockHash, PortalBlockBodyLegacy.fromBlockBody(body)) if not receiptsSuccess: - ( - await portalClient.gossipReceipts( - blockHash, PortalReceipts.fromReceipts(receipts) - ) - ).isOkOr: - error "Failed to gossip receipts", error, blockHash + await bridge.gossipReceipts(blockHash, PortalReceipts.fromReceipts(receipts)) await sleepAsync(2.seconds) proc runHistory*(config: PortalBridgeConf) = - let - portalClient = newRpcClientConnect(config.portalRpcUrl) - web3Client = newRpcClientConnect(config.web3Url) + let bridge = PortalHistoryBridge( + portalClient: newRpcClientConnect(config.portalRpcUrl), + web3Client: newRpcClientConnect(config.web3Url), + gossipQueue: newAsyncQueue[(seq[byte], seq[byte])](config.gossipConcurrency), + ) + + proc gossipWorker(bridge: PortalHistoryBridge) {.async: (raises: []).} = + try: + while true: + let + (contentKey, contentValue) = await bridge.gossipQueue.popFirst() + contentKeyHex = contentKey.toHex() + contentValueHex = contentValue.toHex() + + try: + let peers = await bridge.portalClient.portal_historyGossip( + contentKeyHex, contentValueHex + ) + debug "Content gossiped", peers, contentKey = contentKeyHex + except CancelledError as e: + trace "Cancelled gossipWorker" + raise e + except CatchableError as e: + error "JSON-RPC portal_historyGossip failed", + error = $e.msg, contentKey = contentKeyHex + except CancelledError: + trace "gossipWorker canceled" + + var workers: seq[Future[void]] = @[] + for i in 0 ..< config.gossipConcurrency: + workers.add bridge.gossipWorker() if config.latest: - asyncSpawn runLatestLoop(portalClient, web3Client, config.blockVerify) + asyncSpawn bridge.runLatestLoop(config.blockVerify) if config.backfill: if config.audit: - asyncSpawn runBackfillLoopAuditMode( - portalClient, web3Client, config.era1Dir.string - ) + asyncSpawn bridge.runBackfillLoopAuditMode(config.era1Dir.string) else: - asyncSpawn runBackfillLoop( - portalClient, web3Client, config.era1Dir.string, config.startEra, config.endEra + asyncSpawn bridge.runBackfillLoop( + config.era1Dir.string, config.startEra, config.endEra ) while true: