Fluffy: Refactor and organize state bridge code (#2992)

This commit is contained in:
bhartnett 2025-01-13 16:49:35 +08:00 committed by GitHub
parent 67a45b0a7f
commit 6e83a48969
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -43,9 +43,24 @@ type
contractTrieOffers: seq[ContractTrieOfferWithKey]
contractCodeOffers: seq[ContractCodeOfferWithKey]
PortalEndpoint = object
rpcUrl: JsonRpcUrl
PortalStateGossipWorker = ref object
id: int
portalClient: RpcClient
portalUrl: JsonRpcUrl
nodeId: NodeId
blockOffersQueue: AsyncQueue[BlockOffersRef]
gossipBlockOffersLoop: Future[void]
PortalStateBridge = ref object
web3Client: RpcClient
web3Url: JsonRpcUrl
db: DatabaseRef
blockDataQueue: AsyncQueue[BlockData]
blockOffersQueue: AsyncQueue[BlockOffersRef]
gossipWorkers: seq[PortalStateGossipWorker]
collectBlockDataLoop: Future[void]
buildBlockOffersLoop: Future[void]
metricsLoop: Future[void]
proc getBlockData(db: DatabaseRef, blockNumber: uint64): Opt[BlockData] =
let blockDataBytes = db.get(rlp.encode(blockNumber))
@ -77,16 +92,35 @@ proc putLastPersistedBlockNumber(db: DatabaseRef, blockNumber: uint64) {.inline.
if blockNumber > db.getLastPersistedBlockNumber().valueOr(0):
db.put(rlp.encode("lastPersistedBlockNumber"), rlp.encode(blockNumber))
proc collectOffer(
offersMap: OrderedTableRef[seq[byte], seq[byte]],
offerWithKey:
AccountTrieOfferWithKey | ContractTrieOfferWithKey | ContractCodeOfferWithKey,
) {.inline.} =
let keyBytes = offerWithKey.key.toContentKey().encode().asSeq()
offersMap[keyBytes] = offerWithKey.offer.encode()
proc recursiveCollectOffer(
offersMap: OrderedTableRef[seq[byte], seq[byte]],
offerWithKey: AccountTrieOfferWithKey | ContractTrieOfferWithKey,
) =
offersMap.collectOffer(offerWithKey)
# root node, recursive collect is finished
if offerWithKey.key.path.unpackNibbles().len() == 0:
return
# continue the recursive collect
offersMap.recursiveCollectOffer(offerWithKey.getParent())
proc runBackfillCollectBlockDataLoop(
db: DatabaseRef,
blockDataQueue: AsyncQueue[BlockData],
web3Url: JsonRpcUrl,
startBlockNumber: uint64,
) {.async: (raises: [CancelledError]).} =
bridge: PortalStateBridge, startBlockNumber: uint64
) {.async: (raises: []).} =
info "Starting state backfill collect block data loop"
let web3Client = newRpcClientConnect(web3Url)
if web3Client of RpcHttpClient:
try:
bridge.web3Client = newRpcClientConnect(bridge.web3Url)
if bridge.web3Client of RpcHttpClient:
warn "Using a WebSocket connection to the JSON-RPC API is recommended to improve performance"
var
@ -95,9 +129,10 @@ proc runBackfillCollectBlockDataLoop(
while true:
if currentBlockNumber mod 10000 == 0:
info "Collecting block data for block number: ", blockNumber = currentBlockNumber
info "Collecting block data for block number: ",
blockNumber = currentBlockNumber
let blockData = db.getBlockData(currentBlockNumber).valueOr:
let blockData = bridge.db.getBlockData(currentBlockNumber).valueOr:
# block data doesn't exist in db so we fetch it via RPC
# This should only be run for the starting block but we put this code here
@ -108,27 +143,27 @@ proc runBackfillCollectBlockDataLoop(
# if we don't yet have the parent state root get it from the parent block
let parentBlock = (
await web3Client.getBlockByNumber(
await bridge.web3Client.getBlockByNumber(
blockId(currentBlockNumber - 1.uint64), false
)
).valueOr:
error "Failed to get parent block", error = error
await sleepAsync(3.seconds)
# We might need to reconnect if using a WebSocket client
await web3Client.tryReconnect(web3Url)
await bridge.web3Client.tryReconnect(bridge.web3Url)
continue
parentStateRoot = parentBlock.stateRoot
let
blockId = blockId(currentBlockNumber)
blockObject = (await web3Client.getBlockByNumber(blockId, false)).valueOr:
blockObject = (await bridge.web3Client.getBlockByNumber(blockId, false)).valueOr:
error "Failed to get block", error = error
await sleepAsync(3.seconds)
# We might need to reconnect if using a WebSocket client
await web3Client.tryReconnect(web3Url)
await bridge.web3Client.tryReconnect(bridge.web3Url)
continue
stateDiffs = (await web3Client.getStateDiffsByBlockNumber(blockId)).valueOr:
stateDiffs = (await bridge.web3Client.getStateDiffsByBlockNumber(blockId)).valueOr:
error "Failed to get state diffs", error = error
await sleepAsync(3.seconds)
continue
@ -136,7 +171,7 @@ proc runBackfillCollectBlockDataLoop(
var uncleBlocks: seq[BlockObject]
for i in 0 .. blockObject.uncles.high:
let uncleBlock = (
await web3Client.getUncleByBlockNumberAndIndex(blockId, i.Quantity)
await bridge.web3Client.getUncleByBlockNumberAndIndex(blockId, i.Quantity)
).valueOr:
error "Failed to get uncle block", error = error
await sleepAsync(3.seconds)
@ -152,55 +187,56 @@ proc runBackfillCollectBlockDataLoop(
stateRoot: blockObject.stateRoot,
stateDiffs: stateDiffs,
)
db.putBlockData(currentBlockNumber, blockData)
bridge.db.putBlockData(currentBlockNumber, blockData)
parentStateRoot = blockObject.stateRoot
blockData
await blockDataQueue.addLast(blockData)
await bridge.blockDataQueue.addLast(blockData)
parentStateRoot = blockData.stateRoot
inc currentBlockNumber
except CancelledError:
trace "collectBlockDataLoop canceled"
proc runBackfillBuildBlockOffersLoop(
db: DatabaseRef,
blockDataQueue: AsyncQueue[BlockData],
blockOffersQueue: AsyncQueue[BlockOffersRef],
bridge: PortalStateBridge,
verifyStateProofs: bool,
enableGossip: bool,
gossipGenesis: bool,
) {.async: (raises: [CancelledError]).} =
) {.async: (raises: []).} =
info "Starting state backfill build block offers loop"
try:
# wait for the first block data to be put on the queue
# so that we can access the first block once available
while blockDataQueue.empty():
while bridge.blockDataQueue.empty():
await sleepAsync(100.milliseconds)
# peek but don't remove it so that it can be processed later
let firstBlock = blockDataQueue[0]
let firstBlock = bridge.blockDataQueue[0]
# Only apply genesis accounts if starting from block 1
if firstBlock.blockNumber == 1:
info "Building state for genesis"
db.withTransaction:
bridge.db.withTransaction:
# Requires an active transaction because it writes an emptyRlp node
# to the accounts HexaryTrie on initialization
let
ws = WorldStateRef.init(db)
worldState = WorldStateRef.init(bridge.db)
genesisAccounts =
try:
genesisBlockForNetwork(MainNet).alloc
except CatchableError as e:
raiseAssert(e.msg) # Should never happen
ws.applyGenesisAccounts(genesisAccounts)
worldState.applyGenesisAccounts(genesisAccounts)
if enableGossip and gossipGenesis:
let genesisBlockHash =
hash32"d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3"
var builder = OffersBuilder.init(ws, genesisBlockHash)
var builder = OffersBuilder.init(worldState, genesisBlockHash)
builder.buildBlockOffers()
await blockOffersQueue.addLast(
await bridge.blockOffersQueue.addLast(
BlockOffersRef(
blockNumber: 0.uint64,
accountTrieOffers: builder.getAccountTrieOffers(),
@ -210,10 +246,10 @@ proc runBackfillBuildBlockOffersLoop(
)
# Load the world state using the parent state root
let worldState = WorldStateRef.init(db, firstBlock.parentStateRoot)
let worldState = WorldStateRef.init(bridge.db, firstBlock.parentStateRoot)
while true:
let blockData = await blockDataQueue.popFirst()
let blockData = await bridge.blockDataQueue.popFirst()
if blockData.blockNumber mod 10000 == 0:
info "Building state for block number: ", blockNumber = blockData.blockNumber
@ -221,7 +257,7 @@ proc runBackfillBuildBlockOffersLoop(
# For now all WorldStateRef functions need to be inside a transaction
# because the DatabaseRef backends currently only supports reading and
# writing to/from a single active transaction.
db.withTransaction:
bridge.db.withTransaction:
for stateDiff in blockData.stateDiffs:
worldState.applyStateDiff(stateDiff)
@ -247,7 +283,7 @@ proc runBackfillBuildBlockOffersLoop(
var builder = OffersBuilder.init(worldState, blockData.blockHash)
builder.buildBlockOffers()
await blockOffersQueue.addLast(
await bridge.blockOffersQueue.addLast(
BlockOffersRef(
blockNumber: blockData.blockNumber,
accountTrieOffers: builder.getAccountTrieOffers(),
@ -259,43 +295,22 @@ proc runBackfillBuildBlockOffersLoop(
# After commit of the above db transaction which stores the updated account state
# then we store the last persisted block number in the database so that we can use it
# to enable restarting from this block if needed
db.putLastPersistedBlockNumber(blockData.blockNumber)
proc collectOffer(
offersMap: OrderedTableRef[seq[byte], seq[byte]],
offerWithKey:
AccountTrieOfferWithKey | ContractTrieOfferWithKey | ContractCodeOfferWithKey,
) {.inline.} =
let keyBytes = offerWithKey.key.toContentKey().encode().asSeq()
offersMap[keyBytes] = offerWithKey.offer.encode()
proc recursiveCollectOffer(
offersMap: OrderedTableRef[seq[byte], seq[byte]],
offerWithKey: AccountTrieOfferWithKey | ContractTrieOfferWithKey,
) =
offersMap.collectOffer(offerWithKey)
# root node, recursive collect is finished
if offerWithKey.key.path.unpackNibbles().len() == 0:
return
# continue the recursive collect
offersMap.recursiveCollectOffer(offerWithKey.getParent())
bridge.db.putLastPersistedBlockNumber(blockData.blockNumber)
except CancelledError:
trace "buildBlockOffersLoop canceled"
proc runBackfillGossipBlockOffersLoop(
blockOffersQueue: AsyncQueue[BlockOffersRef],
portalEndpoint: PortalEndpoint,
verifyGossip: bool,
skipGossipForExisting: bool,
workerId: int,
) {.async: (raises: [CancelledError]).} =
info "Starting state backfill gossip block offers loop", workerId
worker: PortalStateGossipWorker, verifyGossip: bool, skipGossipForExisting: bool
) {.async: (raises: []).} =
info "Starting state backfill gossip block offers loop", workerId = worker.id
try:
# Create one client per worker in order to improve performance.
# WebSocket connections don't perform well when shared by many
# concurrent workers.
let portalClient = newRpcClientConnect(portalEndpoint.rpcUrl)
var blockOffers = await blockOffersQueue.popFirst()
worker.portalClient = newRpcClientConnect(worker.portalUrl)
var blockOffers = await worker.blockOffersQueue.popFirst()
while true:
# A table of offer key, value pairs is used to filter out duplicates so
@ -315,8 +330,8 @@ proc runBackfillGossipBlockOffersLoop(
let
xId = ContentKeyByteList.init(x[0]).toContentId()
yId = ContentKeyByteList.init(y[0]).toContentId()
xDistance = portalEndpoint.nodeId xor xId
yDistance = portalEndpoint.nodeId xor yId
xDistance = worker.nodeId xor xId
yDistance = worker.nodeId xor yId
if xDistance == yDistance:
0
@ -335,28 +350,31 @@ proc runBackfillGossipBlockOffersLoop(
var gossipContent = true
if skipGossipForExisting:
try:
let contentInfo = await portalClient.portal_stateGetContent(k.to0xHex())
let contentInfo =
await worker.portalClient.portal_stateGetContent(k.to0xHex())
if contentInfo.content.len() > 0:
gossipContent = false
except CatchableError as e:
debug "Unable to find existing content. Will attempt to gossip content: ",
contentKey = k.to0xHex(), error = e.msg, workerId
contentKey = k.to0xHex(), error = e.msg, workerId = worker.id
# Gossip the content into the network
if gossipContent:
try:
let
putContentResult =
await portalClient.portal_statePutContent(k.to0xHex(), v.to0xHex())
putContentResult = await worker.portalClient.portal_statePutContent(
k.to0xHex(), v.to0xHex()
)
numPeers = putContentResult.peerCount
if numPeers > 0:
debug "Offer successfully gossipped to peers: ", numPeers, workerId
debug "Offer successfully gossipped to peers: ",
numPeers, workerId = worker.id
elif numPeers == 0:
warn "Offer gossipped to no peers", workerId
warn "Offer gossipped to no peers", workerId = worker.id
retryGossip = true
break
except CatchableError as e:
error "Failed to gossip offer to peers", error = e.msg, workerId
error "Failed to gossip offer to peers", error = e.msg, workerId = worker.id
retryGossip = true
break
@ -370,15 +388,16 @@ proc runBackfillGossipBlockOffersLoop(
for k, _ in offersMap:
try:
let contentInfo = await portalClient.portal_stateGetContent(k.to0xHex())
let contentInfo =
await worker.portalClient.portal_stateGetContent(k.to0xHex())
if contentInfo.content.len() == 0:
error "Found empty contentValue", workerId
error "Found empty contentValue", workerId = worker.id
retryGossip = true
break
foundContentKeys.add(k)
except CatchableError as e:
warn "Unable to find content with key. Will retry gossipping content:",
contentKey = k.to0xHex(), error = e.msg, workerId
contentKey = k.to0xHex(), error = e.msg, workerId = worker.id
retryGossip = true
break
@ -392,49 +411,60 @@ proc runBackfillGossipBlockOffersLoop(
warn "Retrying state gossip for block: ",
blockNumber = blockOffers.blockNumber,
remainingOffers = offersMap.len(),
workerId
workerId = worker.id
# We might need to reconnect if using a WebSocket client
await portalClient.tryReconnect(portalEndpoint.rpcUrl)
await worker.portalClient.tryReconnect(worker.portalUrl)
continue
if blockOffers.blockNumber mod 1000 == 0:
info "Finished gossiping offers for block: ",
workerId, blockNumber = blockOffers.blockNumber, offerCount = offersMap.len()
blockNumber = blockOffers.blockNumber,
offerCount = offersMap.len(),
workerId = worker.id
else:
debug "Finished gossiping offers for block: ",
workerId, blockNumber = blockOffers.blockNumber, offerCount = offersMap.len()
blockNumber = blockOffers.blockNumber,
offerCount = offersMap.len(),
workerId = worker.id
blockOffers = await blockOffersQueue.popFirst()
blockOffers = await worker.blockOffersQueue.popFirst()
except CancelledError:
trace "gossipBlockOffersLoop canceled"
proc runBackfillMetricsLoop(
blockDataQueue: AsyncQueue[BlockData], blockOffersQueue: AsyncQueue[BlockOffersRef]
) {.async: (raises: [CancelledError]).} =
proc runBackfillMetricsLoop(bridge: PortalStateBridge) {.async: (raises: []).} =
info "Starting state backfill metrics loop"
try:
while true:
await sleepAsync(30.seconds)
if blockDataQueue.len() > 0:
if bridge.blockDataQueue.len() > 0:
info "Block data queue metrics: ",
nextBlockNumber = blockDataQueue[0].blockNumber,
blockDataQueueLen = blockDataQueue.len()
nextBlockNumber = bridge.blockDataQueue[0].blockNumber,
blockDataQueueLen = bridge.blockDataQueue.len()
else:
info "Block data queue metrics: ", blockDataQueueLen = blockDataQueue.len()
info "Block data queue metrics: ",
blockDataQueueLen = bridge.blockDataQueue.len()
if blockOffersQueue.len() > 0:
if bridge.blockOffersQueue.len() > 0:
info "Block offers queue metrics: ",
nextBlockNumber = blockOffersQueue[0].blockNumber,
blockOffersQueueLen = blockOffersQueue.len()
nextBlockNumber = bridge.blockOffersQueue[0].blockNumber,
blockOffersQueueLen = bridge.blockOffersQueue.len()
else:
info "Block offers queue metrics: ", blockOffersQueueLen = blockOffersQueue.len()
info "Block offers queue metrics: ",
blockOffersQueueLen = bridge.blockOffersQueue.len()
except CancelledError:
trace "metricsLoop canceled"
proc runState*(config: PortalBridgeConf) =
proc validatePortalRpcEndpoints(
portalRpcUrl: JsonRpcUrl, numOfEndpoints: int
): seq[(JsonRpcUrl, NodeId)] =
var
uri = parseUri(config.portalRpcUrl.value)
portalEndpoints = newSeq[PortalEndpoint]()
uri = parseUri(portalRpcUrl.value)
endpoints = newSeq[(JsonRpcUrl, NodeId)]()
for i in 0 ..< config.portalRpcEndpoints.int:
for i in 0 ..< numOfEndpoints:
let
rpcUrl =
try:
@ -449,58 +479,81 @@ proc runState*(config: PortalBridgeConf) =
fatal "Failed to connect to portal client", error = $e.msg
quit QuitFailure
info "Connected to portal client with nodeId", nodeId
portalEndpoints.add(PortalEndpoint(rpcUrl: rpcUrl, nodeId: nodeId))
asyncSpawn client.close() # this connection was only used to collect the nodeId
endpoints.add((rpcUrl, nodeId))
uri.port =
try:
$(parseInt(uri.port) + 1)
except ValueError as e:
raiseAssert("Failed to parse int")
let db = DatabaseRef.init(config.stateDir.string).get()
defer:
db.close()
asyncSpawn client.close() # this connection was only used to collect the nodeId
return endpoints
proc validateStartBlockNumber(db: DatabaseRef, startBlockNumber: uint64) =
let maybeLastPersistedBlock = db.getLastPersistedBlockNumber()
if maybeLastPersistedBlock.isSome():
info "Last persisted block found in the database: ",
lastPersistedBlock = maybeLastPersistedBlock.get()
if config.startBlockNumber < 1 or
config.startBlockNumber > maybeLastPersistedBlock.get():
if startBlockNumber < 1 or startBlockNumber > maybeLastPersistedBlock.get():
warn "Start block must be set to a value between 1 and the last persisted block"
quit QuitFailure
else:
info "No last persisted block found in the database"
if config.startBlockNumber != 1:
if startBlockNumber != 1:
warn "Start block must be set to 1"
quit QuitFailure
info "Starting state backfill from block number: ",
proc start(bridge: PortalStateBridge, config: PortalBridgeConf) =
info "Starting state backfill from block: ",
startBlockNumber = config.startBlockNumber
let
bufferSize = 1000
blockDataQueue = newAsyncQueue[BlockData](bufferSize)
blockOffersQueue = newAsyncQueue[BlockOffersRef](bufferSize)
asyncSpawn runBackfillCollectBlockDataLoop(
db, blockDataQueue, config.web3RpcUrl, config.startBlockNumber
bridge.collectBlockDataLoop =
bridge.runBackfillCollectBlockDataLoop(config.startBlockNumber)
bridge.buildBlockOffersLoop = bridge.runBackfillBuildBlockOffersLoop(
config.verifyStateProofs, config.enableGossip, config.gossipGenesis
)
asyncSpawn runBackfillBuildBlockOffersLoop(
db, blockDataQueue, blockOffersQueue, config.verifyStateProofs, config.enableGossip,
config.gossipGenesis,
bridge.metricsLoop = bridge.runBackfillMetricsLoop()
for worker in bridge.gossipWorkers:
worker.gossipBlockOffersLoop = worker.runBackfillGossipBlockOffersLoop(
config.verifyGossip, config.skipGossipForExisting
)
# TODO: Implement stop and clean shutdown
proc runState*(config: PortalBridgeConf) =
let
portalEndpoints =
validatePortalRpcEndpoints(config.portalRpcUrl, config.portalRpcEndpoints.int)
db = DatabaseRef.init(config.stateDir.string).get()
defer:
db.close()
validateStartBlockNumber(db, config.startBlockNumber)
const queueSize = 1000
let bridge = PortalStateBridge(
web3Url: config.web3RpcUrl,
db: db,
blockDataQueue: newAsyncQueue[BlockData](queueSize),
blockOffersQueue: newAsyncQueue[BlockOffersRef](queueSize),
gossipWorkers: newSeq[PortalStateGossipWorker](),
)
for i in 0 ..< config.gossipWorkers.int:
let
portalEndpoint = portalEndpoints[i mod config.portalRpcEndpoints.int]
workerId = i + 1
asyncSpawn runBackfillGossipBlockOffersLoop(
blockOffersQueue, portalEndpoint, config.verifyGossip,
config.skipGossipForExisting, workerId,
(rpcUrl, nodeId) = portalEndpoints[i mod config.portalRpcEndpoints.int]
worker = PortalStateGossipWorker(
id: i + 1,
portalUrl: rpcUrl,
nodeId: nodeId,
blockOffersQueue: bridge.blockOffersQueue,
)
bridge.gossipWorkers.add(worker)
asyncSpawn runBackfillMetricsLoop(blockDataQueue, blockOffersQueue)
bridge.start(config)
while true:
poll()