portal_bridge: Add concurrency to the history content gossip (#2855)

This commit is contained in:
Kim De Mey 2024-11-21 21:30:42 +07:00 committed by GitHub
parent 107db3ae16
commit 453cb2f33e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 91 additions and 112 deletions

View File

@ -141,6 +141,13 @@ type
defaultValueDesc: defaultEra1DataDir(), defaultValueDesc: defaultEra1DataDir(),
name: "era1-dir" name: "era1-dir"
.}: InputDir .}: InputDir
gossipConcurrency* {.
desc:
"The number of concurrent gossip workers for gossiping content into the portal network",
defaultValue: 50,
name: "gossip-concurrency"
.}: int
of PortalBridgeCmd.state: of PortalBridgeCmd.state:
web3UrlState* {.desc: "Execution layer JSON-RPC API URL", name: "web3-url".}: web3UrlState* {.desc: "Execution layer JSON-RPC API URL", name: "web3-url".}:
JsonRpcUrl JsonRpcUrl

View File

@ -31,6 +31,11 @@ from eth/common/eth_types_rlp import rlpHash
const newHeadPollInterval = 6.seconds # Slot with potential block is every 12s const newHeadPollInterval = 6.seconds # Slot with potential block is every 12s
type PortalHistoryBridge = ref object
portalClient: RpcClient
web3Client: RpcClient
gossipQueue: AsyncQueue[(seq[byte], seq[byte])]
## Conversion functions for Block and Receipts ## Conversion functions for Block and Receipts
func asEthBlock(blockObject: BlockObject): EthBlock = func asEthBlock(blockObject: BlockObject): EthBlock =
@ -139,63 +144,34 @@ proc getBlockReceipts(
## Portal JSON-RPC API helper calls for pushing block and receipts ## Portal JSON-RPC API helper calls for pushing block and receipts
proc gossipBlockHeader( proc gossipBlockHeader(
client: RpcClient, id: Hash32 | uint64, headerWithProof: BlockHeaderWithProof bridge: PortalHistoryBridge,
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = id: Hash32 | uint64,
let headerWithProof: BlockHeaderWithProof,
contentKey = blockHeaderContentKey(id) ): Future[void] {.async: (raises: [CancelledError]).} =
encodedContentKeyHex = contentKey.encode.asSeq().toHex() let contentKey = blockHeaderContentKey(id)
peers = await bridge.gossipQueue.addLast(
try: (contentKey.encode.asSeq(), SSZ.encode(headerWithProof))
await client.portal_historyGossip(
encodedContentKeyHex, SSZ.encode(headerWithProof).toHex()
) )
except CatchableError as e:
return err("JSON-RPC portal_historyGossip failed: " & $e.msg)
info "Block header gossiped", peers, contentKey = encodedContentKeyHex
ok()
proc gossipBlockBody( proc gossipBlockBody(
client: RpcClient, bridge: PortalHistoryBridge,
hash: Hash32, hash: Hash32,
body: PortalBlockBodyLegacy | PortalBlockBodyShanghai, body: PortalBlockBodyLegacy | PortalBlockBodyShanghai,
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = ): Future[void] {.async: (raises: [CancelledError]).} =
let let contentKey = blockBodyContentKey(hash)
contentKey = blockBodyContentKey(hash)
encodedContentKeyHex = contentKey.encode.asSeq().toHex()
peers = await bridge.gossipQueue.addLast((contentKey.encode.asSeq(), SSZ.encode(body)))
try:
await client.portal_historyGossip(
encodedContentKeyHex, SSZ.encode(body).toHex()
)
except CatchableError as e:
return err("JSON-RPC portal_historyGossip failed: " & $e.msg)
info "Block body gossiped", peers, contentKey = encodedContentKeyHex
ok()
proc gossipReceipts( proc gossipReceipts(
client: RpcClient, hash: Hash32, receipts: PortalReceipts bridge: PortalHistoryBridge, hash: Hash32, receipts: PortalReceipts
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = ): Future[void] {.async: (raises: [CancelledError]).} =
let let contentKey = receiptsContentKey(hash)
contentKey = receiptsContentKey(hash)
encodedContentKeyHex = contentKey.encode.asSeq().toHex()
peers = await bridge.gossipQueue.addLast((contentKey.encode.asSeq(), SSZ.encode(receipts)))
try:
await client.portal_historyGossip(
encodedContentKeyHex, SSZ.encode(receipts).toHex()
)
except CatchableError as e:
return err("JSON-RPC portal_historyGossip failed: " & $e.msg)
info "Receipts gossiped", peers, contentKey = encodedContentKeyHex
return ok()
proc runLatestLoop( proc runLatestLoop(
portalClient: RpcClient, web3Client: RpcClient, validate = false bridge: PortalHistoryBridge, validate = false
) {.async: (raises: [CancelledError]).} = ) {.async: (raises: [CancelledError]).} =
## Loop that requests the latest block + receipts and pushes them into the ## Loop that requests the latest block + receipts and pushes them into the
## Portal network. ## Portal network.
@ -211,14 +187,14 @@ proc runLatestLoop(
var lastBlockNumber = 0'u64 var lastBlockNumber = 0'u64
while true: while true:
let t0 = Moment.now() let t0 = Moment.now()
let blockObject = (await getBlockByNumber(web3Client, blockId)).valueOr: let blockObject = (await bridge.web3Client.getBlockByNumber(blockId)).valueOr:
error "Failed to get latest block", error error "Failed to get latest block", error
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
continue continue
let blockNumber = distinctBase(blockObject.number) let blockNumber = distinctBase(blockObject.number)
if blockNumber > lastBlockNumber: if blockNumber > lastBlockNumber:
let receiptObjects = (await web3Client.getBlockReceipts(blockNumber)).valueOr: let receiptObjects = (await bridge.web3Client.getBlockReceipts(blockNumber)).valueOr:
error "Failed to get latest receipts", error error "Failed to get latest receipts", error
await sleepAsync(1.seconds) await sleepAsync(1.seconds)
continue continue
@ -250,11 +226,9 @@ proc runLatestLoop(
continue continue
# gossip block header by hash # gossip block header by hash
(await portalClient.gossipBlockHeader(hash, headerWithProof)).isOkOr: await bridge.gossipBlockHeader(hash, headerWithProof)
error "Failed to gossip block header", error, hash
# gossip block header by number # gossip block header by number
(await portalClient.gossipBlockHeader(blockNumber, headerWithProof)).isOkOr: await bridge.gossipBlockHeader(blockNumber, headerWithProof)
error "Failed to gossip block header", error, hash
# For bodies & receipts to get verified, the header needs to be available # For bodies & receipts to get verified, the header needs to be available
# on the network. Wait a little to get the headers propagated through # on the network. Wait a little to get the headers propagated through
@ -262,12 +236,9 @@ proc runLatestLoop(
await sleepAsync(2.seconds) await sleepAsync(2.seconds)
# gossip block body # gossip block body
(await portalClient.gossipBlockBody(hash, body)).isOkOr: await bridge.gossipBlockBody(hash, body)
error "Failed to gossip block body", error, hash
# gossip receipts # gossip receipts
(await portalClient.gossipReceipts(hash, portalReceipts)).isOkOr: await bridge.gossipReceipts(hash, portalReceipts)
error "Failed to gossip receipts", error, hash
# Making sure here that we poll enough times not to miss a block. # Making sure here that we poll enough times not to miss a block.
# We could also do some work without awaiting it, e.g. the gossiping or # We could also do some work without awaiting it, e.g. the gossiping or
@ -281,7 +252,7 @@ proc runLatestLoop(
warn "Block gossip took longer than slot interval" warn "Block gossip took longer than slot interval"
proc gossipHeadersWithProof( proc gossipHeadersWithProof(
portalClient: RpcClient, bridge: PortalHistoryBridge,
era1File: string, era1File: string,
epochRecordFile: Opt[string] = Opt.none(string), epochRecordFile: Opt[string] = Opt.none(string),
verifyEra = false, verifyEra = false,
@ -312,15 +283,15 @@ proc gossipHeadersWithProof(
blockHash = blockHeader.rlpHash() blockHash = blockHeader.rlpHash()
# gossip block header by hash # gossip block header by hash
?(await portalClient.gossipBlockHeader(blockHash, headerWithProof)) await bridge.gossipBlockHeader(blockHash, headerWithProof)
# gossip block header by number # gossip block header by number
?(await portalClient.gossipBlockHeader(blockHeader.number, headerWithProof)) await bridge.gossipBlockHeader(blockHeader.number, headerWithProof)
info "Succesfully gossiped headers from era1 file", era1File info "Succesfully put headers from era1 file in gossip queue", era1File
ok() ok()
proc gossipBlockContent( proc gossipBlockContent(
portalClient: RpcClient, era1File: string, verifyEra = false bridge: PortalHistoryBridge, era1File: string, verifyEra = false
): Future[Result[void, string]] {.async: (raises: [CancelledError]).} = ): Future[Result[void, string]] {.async: (raises: [CancelledError]).} =
let f = ?Era1File.open(era1File) let f = ?Era1File.open(era1File)
@ -333,28 +304,15 @@ proc gossipBlockContent(
let blockHash = header.rlpHash() let blockHash = header.rlpHash()
# gossip block body # gossip block body
?( await bridge.gossipBlockBody(blockHash, PortalBlockBodyLegacy.fromBlockBody(body))
await portalClient.gossipBlockBody(
blockHash, PortalBlockBodyLegacy.fromBlockBody(body)
)
)
# gossip receipts # gossip receipts
?( await bridge.gossipReceipts(blockHash, PortalReceipts.fromReceipts(receipts))
await portalClient.gossipReceipts(
blockHash, PortalReceipts.fromReceipts(receipts)
)
)
info "Succesfully gossiped bodies and receipts from era1 file", era1File info "Succesfully put bodies and receipts from era1 file in gossip queue", era1File
ok() ok()
proc runBackfillLoop( proc runBackfillLoop(
portalClient: RpcClient, bridge: PortalHistoryBridge, era1Dir: string, startEra: uint64, endEra: uint64
web3Client: RpcClient,
era1Dir: string,
startEra: uint64,
endEra: uint64,
) {.async: (raises: [CancelledError]).} = ) {.async: (raises: [CancelledError]).} =
let accumulator = loadAccumulator() let accumulator = loadAccumulator()
@ -381,7 +339,7 @@ proc runBackfillLoop(
info "Gossip headers from era1 file", era1File info "Gossip headers from era1 file", era1File
let headerRes = let headerRes =
try: try:
await portalClient.portal_debug_historyGossipHeaders(era1File) await bridge.portalClient.portal_debug_historyGossipHeaders(era1File)
except CatchableError as e: except CatchableError as e:
error "JSON-RPC portal_debug_historyGossipHeaders failed", error = e.msg error "JSON-RPC portal_debug_historyGossipHeaders failed", error = e.msg
false false
@ -390,7 +348,7 @@ proc runBackfillLoop(
info "Gossip block content from era1 file", era1File info "Gossip block content from era1 file", era1File
let res = let res =
try: try:
await portalClient.portal_debug_historyGossipBlockContent(era1File) await bridge.portalClient.portal_debug_historyGossipBlockContent(era1File)
except CatchableError as e: except CatchableError as e:
error "JSON-RPC portal_debug_historyGossipBlockContent failed", error "JSON-RPC portal_debug_historyGossipBlockContent failed",
error = e.msg error = e.msg
@ -400,16 +358,16 @@ proc runBackfillLoop(
else: else:
error "Failed to gossip headers from era1 file", era1File error "Failed to gossip headers from era1 file", era1File
else: else:
(await portalClient.gossipHeadersWithProof(era1File)).isOkOr: (await bridge.gossipHeadersWithProof(era1File)).isOkOr:
error "Failed to gossip headers from era1 file", error, era1File error "Failed to gossip headers from era1 file", error, era1File
continue continue
(await portalClient.gossipBlockContent(era1File)).isOkOr: (await bridge.gossipBlockContent(era1File)).isOkOr:
error "Failed to gossip block content from era1 file", error, era1File error "Failed to gossip block content from era1 file", error, era1File
continue continue
proc runBackfillLoopAuditMode( proc runBackfillLoopAuditMode(
portalClient: RpcClient, web3Client: RpcClient, era1Dir: string bridge: PortalHistoryBridge, era1Dir: string
) {.async: (raises: [CancelledError]).} = ) {.async: (raises: [CancelledError]).} =
let let
rng = newRng() rng = newRng()
@ -436,7 +394,7 @@ proc runBackfillLoopAuditMode(
contentHex = contentHex =
try: try:
( (
await portalClient.portal_historyGetContent( await bridge.portalClient.portal_historyGetContent(
contentKey.encode.asSeq().toHex() contentKey.encode.asSeq().toHex()
) )
).content ).content
@ -468,7 +426,7 @@ proc runBackfillLoopAuditMode(
contentHex = contentHex =
try: try:
( (
await portalClient.portal_historyGetContent( await bridge.portalClient.portal_historyGetContent(
contentKey.encode.asSeq().toHex() contentKey.encode.asSeq().toHex()
) )
).content ).content
@ -496,7 +454,7 @@ proc runBackfillLoopAuditMode(
contentHex = contentHex =
try: try:
( (
await portalClient.portal_historyGetContent( await bridge.portalClient.portal_historyGetContent(
contentKey.encode.asSeq().toHex() contentKey.encode.asSeq().toHex()
) )
).content ).content
@ -526,44 +484,58 @@ proc runBackfillLoopAuditMode(
raiseAssert "Failed to build header with proof: " & error raiseAssert "Failed to build header with proof: " & error
# gossip block header by hash # gossip block header by hash
(await portalClient.gossipBlockHeader(blockHash, headerWithProof)).isOkOr: await bridge.gossipBlockHeader(blockHash, headerWithProof)
error "Failed to gossip block header", error, blockHash
# gossip block header by number # gossip block header by number
(await portalClient.gossipBlockHeader(blockNumber, headerWithProof)).isOkOr: await bridge.gossipBlockHeader(blockNumber, headerWithProof)
error "Failed to gossip block header", error, blockHash
if not bodySuccess: if not bodySuccess:
( await bridge.gossipBlockBody(blockHash, PortalBlockBodyLegacy.fromBlockBody(body))
await portalClient.gossipBlockBody(
blockHash, PortalBlockBodyLegacy.fromBlockBody(body)
)
).isOkOr:
error "Failed to gossip block body", error, blockHash
if not receiptsSuccess: if not receiptsSuccess:
( await bridge.gossipReceipts(blockHash, PortalReceipts.fromReceipts(receipts))
await portalClient.gossipReceipts(
blockHash, PortalReceipts.fromReceipts(receipts)
)
).isOkOr:
error "Failed to gossip receipts", error, blockHash
await sleepAsync(2.seconds) await sleepAsync(2.seconds)
proc runHistory*(config: PortalBridgeConf) = proc runHistory*(config: PortalBridgeConf) =
let bridge = PortalHistoryBridge(
portalClient: newRpcClientConnect(config.portalRpcUrl),
web3Client: newRpcClientConnect(config.web3Url),
gossipQueue: newAsyncQueue[(seq[byte], seq[byte])](config.gossipConcurrency),
)
proc gossipWorker(bridge: PortalHistoryBridge) {.async: (raises: []).} =
try:
while true:
let let
portalClient = newRpcClientConnect(config.portalRpcUrl) (contentKey, contentValue) = await bridge.gossipQueue.popFirst()
web3Client = newRpcClientConnect(config.web3Url) contentKeyHex = contentKey.toHex()
contentValueHex = contentValue.toHex()
try:
let peers = await bridge.portalClient.portal_historyGossip(
contentKeyHex, contentValueHex
)
debug "Content gossiped", peers, contentKey = contentKeyHex
except CancelledError as e:
trace "Cancelled gossipWorker"
raise e
except CatchableError as e:
error "JSON-RPC portal_historyGossip failed",
error = $e.msg, contentKey = contentKeyHex
except CancelledError:
trace "gossipWorker canceled"
var workers: seq[Future[void]] = @[]
for i in 0 ..< config.gossipConcurrency:
workers.add bridge.gossipWorker()
if config.latest: if config.latest:
asyncSpawn runLatestLoop(portalClient, web3Client, config.blockVerify) asyncSpawn bridge.runLatestLoop(config.blockVerify)
if config.backfill: if config.backfill:
if config.audit: if config.audit:
asyncSpawn runBackfillLoopAuditMode( asyncSpawn bridge.runBackfillLoopAuditMode(config.era1Dir.string)
portalClient, web3Client, config.era1Dir.string
)
else: else:
asyncSpawn runBackfillLoop( asyncSpawn bridge.runBackfillLoop(
portalClient, web3Client, config.era1Dir.string, config.startEra, config.endEra config.era1Dir.string, config.startEra, config.endEra
) )
while true: while true: