Fluffy state bridge - Add CLI parameters and implement workers for offer gossip. (#2541)

* Use RPC batching to send offer requests and filter out duplicate offers.

* Lookup offers after gossip to check if gossip successful.

* Use multiple workers for gossiping offers.

* Update Fluffy state network logging.

* Use single RPC calls instead of batching.

* Update cli parameters.

* Fix bug in contract trie offer building.
This commit is contained in:
web3-developer 2024-08-06 15:38:38 +08:00 committed by GitHub
parent 9dacfed943
commit 63d13182c1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 229 additions and 110 deletions

View File

@ -97,7 +97,7 @@ proc gossipOffer*(
let req1Peers = await p.neighborhoodGossip(
srcNodeId, ContentKeysList.init(@[keyBytes]), @[offerBytes]
)
info "Offered content gossipped successfully with peers", keyBytes, peers = req1Peers
debug "Offered content gossipped successfully with peers", keyBytes, peers = req1Peers
proc gossipOffer*(
p: PortalProtocol,
@ -110,7 +110,7 @@ proc gossipOffer*(
let req1Peers = await p.neighborhoodGossip(
srcNodeId, ContentKeysList.init(@[keyBytes]), @[offerBytes]
)
info "Offered content gossipped successfully with peers", keyBytes, peers = req1Peers
debug "Offered content gossipped successfully with peers", keyBytes, peers = req1Peers
proc gossipOffer*(
p: PortalProtocol,
@ -123,7 +123,7 @@ proc gossipOffer*(
let peers = await p.neighborhoodGossip(
srcNodeId, ContentKeysList.init(@[keyBytes]), @[offerBytes]
)
info "Offered content gossipped successfully with peers", keyBytes, peers
debug "Offered content gossipped successfully with peers", keyBytes, peers
# Currently only used for testing to gossip an entire account trie proof
# This may also be useful for the state network bridge

View File

@ -173,7 +173,7 @@ proc processOffer*(
n.portalProtocol.storeContent(
contentKeyBytes, contentId, contentValue.toRetrievalValue().encode()
)
info "Offered content validated successfully", contentKeyBytes
debug "Offered content validated successfully", contentKeyBytes
await gossipOffer(
n.portalProtocol, maybeSrcNodeId, contentKeyBytes, contentValueBytes, contentKey,

View File

@ -145,18 +145,32 @@ type
desc: "The block number to start from", defaultValue: 1, name: "start-block"
.}: uint64
verifyState* {.
desc: "Verify the fetched state before gossiping it into the network",
defaultValue: true,
name: "verify-state"
verifyStateProofs* {.
desc: "Verify state proofs before gossiping them into the portal network",
defaultValue: false,
name: "verify-state-proofs"
.}: bool
backfillState* {.
desc: "Backfill pre-merge state data into the network",
gossipGenesis* {.
desc: "Enable gossip of the genesis state into the portal network",
defaultValue: true,
name: "backfill"
name: "gossip-genesis"
.}: bool
verifyGossip* {.
desc:
"Enable verifying that the state was successfully gossipped by fetching it from the network",
defaultValue: false,
name: "verify-gossip"
.}: bool
gossipWorkersCount* {.
desc:
"The number of workers to use for gossiping the state into the portal network",
defaultValue: 2,
name: "gossip-workers"
.}: uint
func parseCmdArg*(T: type TrustedDigest, input: string): T {.raises: [ValueError].} =
TrustedDigest.fromHex(input)

View File

@ -12,6 +12,7 @@ import
chronicles,
chronos,
stint,
json_serialization,
stew/byteutils,
web3/[eth_api, eth_api_types],
results,
@ -65,7 +66,9 @@ proc getLastPersistedBlockNumber(db: DatabaseRef): Opt[uint64] =
raiseAssert(e.msg) # Should never happen
proc putLastPersistedBlockNumber(db: DatabaseRef, blockNumber: uint64) {.inline.} =
db.put(rlp.encode("lastPersistedBlockNumber"), rlp.encode(blockNumber))
# Only update the last persisted block number if it's greater than the current one
if blockNumber > db.getLastPersistedBlockNumber().valueOr(0):
db.put(rlp.encode("lastPersistedBlockNumber"), rlp.encode(blockNumber))
proc runBackfillCollectBlockDataLoop(
db: DatabaseRef,
@ -132,6 +135,8 @@ proc runBackfillBuildBlockOffersLoop(
db: DatabaseRef,
blockDataQueue: AsyncQueue[BlockData],
blockOffersQueue: AsyncQueue[BlockOffersRef],
verifyStateProofs: bool,
gossipGenesis: bool,
) {.async: (raises: [CancelledError]).} =
info "Starting state backfill build block offers loop"
@ -158,20 +163,21 @@ proc runBackfillBuildBlockOffersLoop(
raiseAssert(e.msg) # Should never happen
ws.applyGenesisAccounts(genesisAccounts)
let genesisBlockHash = KeccakHash.fromHex(
"d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3"
)
var builder = OffersBuilderRef.init(ws, genesisBlockHash)
builder.buildBlockOffers()
await blockOffersQueue.addLast(
BlockOffersRef(
blockNumber: 0.uint64,
accountTrieOffers: builder.getAccountTrieOffers(),
contractTrieOffers: builder.getContractTrieOffers(),
contractCodeOffers: builder.getContractCodeOffers(),
if gossipGenesis:
let genesisBlockHash = KeccakHash.fromHex(
"d4e56740f876aef8c010b86a40d5f56745a118d0906a34e69aec8c0db1cb8fa3"
)
var builder = OffersBuilder.init(ws, genesisBlockHash)
builder.buildBlockOffers()
await blockOffersQueue.addLast(
BlockOffersRef(
blockNumber: 0.uint64,
accountTrieOffers: builder.getAccountTrieOffers(),
contractTrieOffers: builder.getContractTrieOffers(),
contractCodeOffers: builder.getContractCodeOffers(),
)
)
)
# Load the world state using the parent state root
let worldState = WorldStateRef.init(db, firstBlock.parentStateRoot)
@ -204,9 +210,10 @@ proc runBackfillBuildBlockOffersLoop(
trace "State diffs successfully applied to block number:",
blockNumber = blockData.blockNumber
# worldState.verifyProofs(blockData.parentStateRoot, blockData.stateRoot)
if verifyStateProofs:
worldState.verifyProofs(blockData.parentStateRoot, blockData.stateRoot)
var builder = OffersBuilderRef.init(worldState, blockData.blockHash)
var builder = OffersBuilder.init(worldState, blockData.blockHash)
builder.buildBlockOffers()
await blockOffersQueue.addLast(
@ -223,50 +230,95 @@ proc runBackfillBuildBlockOffersLoop(
# to enable restarting from this block if needed
db.putLastPersistedBlockNumber(blockData.blockNumber)
proc gossipOffer(
portalClient: RpcClient,
proc collectOffer(
offersMap: TableRef[seq[byte], seq[byte]],
offerWithKey:
AccountTrieOfferWithKey | ContractTrieOfferWithKey | ContractCodeOfferWithKey,
) {.async: (raises: [CancelledError]).} =
let
keyBytes = offerWithKey.key.toContentKey().encode().asSeq()
offerBytes = offerWithKey.offer.encode()
try:
let numPeers =
await portalClient.portal_stateGossip(keyBytes.to0xHex(), offerBytes.to0xHex())
debug "Gossiping offer to peers: ", offerKey = keyBytes.to0xHex(), numPeers
except CatchableError as e:
raiseAssert(e.msg) # Should never happen
) {.inline.} =
let keyBytes = offerWithKey.key.toContentKey().encode().asSeq()
offersMap[keyBytes] = offerWithKey.offer.encode()
proc recursiveGossipOffer(
portalClient: RpcClient,
proc recursiveCollectOffer(
offersMap: TableRef[seq[byte], seq[byte]],
offerWithKey: AccountTrieOfferWithKey | ContractTrieOfferWithKey,
) {.async: (raises: [CancelledError]).} =
await portalClient.gossipOffer(offerWithKey)
) =
offersMap.collectOffer(offerWithKey)
# root node, recursive gossip is finished
# root node, recursive collect is finished
if offerWithKey.key.path.unpackNibbles().len() == 0:
return
# continue the recursive gossip by sharing the parent offer with peers
await portalClient.recursiveGossipOffer(offerWithKey.getParent())
# continue the recursive collect
offersMap.recursiveCollectOffer(offerWithKey.getParent())
proc runBackfillGossipBlockOffersLoop(
blockOffersQueue: AsyncQueue[BlockOffersRef], portalClient: RpcClient
blockOffersQueue: AsyncQueue[BlockOffersRef],
portalClient: RpcClient,
verifyGossip: bool,
workerId: int,
) {.async: (raises: [CancelledError]).} =
info "Starting state backfill gossip block offers loop"
info "Starting state backfill gossip block offers loop", workerId
var blockOffers = await blockOffersQueue.popFirst()
while true:
let blockOffers = await blockOffersQueue.popFirst()
# A table of offer key, value pairs is used to filter out duplicates so
# that we don't gossip the same offer multiple times.
let offersMap = newTable[seq[byte], seq[byte]]()
for offerWithKey in blockOffers.accountTrieOffers:
await portalClient.recursiveGossipOffer(offerWithKey)
offersMap.recursiveCollectOffer(offerWithKey)
for offerWithKey in blockOffers.contractTrieOffers:
await portalClient.recursiveGossipOffer(offerWithKey)
offersMap.recursiveCollectOffer(offerWithKey)
for offerWithKey in blockOffers.contractCodeOffers:
await portalClient.gossipOffer(offerWithKey)
offersMap.collectOffer(offerWithKey)
var retryGossip = false
for k, v in offersMap:
try:
let numPeers = await portalClient.portal_stateGossip(k.to0xHex(), v.to0xHex())
if numPeers == 0:
warn "Offer gossipped to no peers", workerId
retryGossip = true
break
except CatchableError as e:
error "Failed to gossip offer to peers", error = e.msg, workerId
retryGossip = true
break
if retryGossip:
await sleepAsync(1.seconds)
warn "Retrying state gossip for block number: ",
blockNumber = blockOffers.blockNumber, workerId
continue
if verifyGossip:
#await sleepAsync(100.milliseconds) # wait for the peers to be updated
for k, _ in offersMap:
try:
let contentInfo =
await portalClient.portal_stateRecursiveFindContent(k.to0xHex())
if contentInfo.content.len() == 0:
error "Found empty contentValue", workerId
retryGossip = true
break
except CatchableError as e:
error "Failed to find content with key: ",
contentKey = k, error = e.msg, workerId
retryGossip = true
break
if retryGossip:
await sleepAsync(1.seconds)
warn "Retrying state gossip for block number: ",
blockNumber = blockOffers.blockNumber
continue
if blockOffers.blockNumber mod 1000 == 0:
info "Finished gossiping offers for block number: ",
workerId, blockNumber = blockOffers.blockNumber, offerCount = offersMap.len()
blockOffers = await blockOffersQueue.popFirst()
proc runBackfillMetricsLoop(
blockDataQueue: AsyncQueue[BlockData], blockOffersQueue: AsyncQueue[BlockOffersRef]
@ -275,8 +327,12 @@ proc runBackfillMetricsLoop(
while true:
await sleepAsync(10.seconds)
info "Block data queue length: ", blockDataQueueLen = blockDataQueue.len()
info "Block offers queue length: ", blockOffersQueueLen = blockOffersQueue.len()
info "Block data queue metrics: ",
nextBlockNumber = blockDataQueue[0].blockNumber,
blockDataQueueLen = blockDataQueue.len()
info "Block offers queue metrics: ",
nextBlockNumber = blockOffersQueue[0].blockNumber,
blockOffersQueueLen = blockOffersQueue.len()
proc runState*(config: PortalBridgeConf) =
let
@ -289,47 +345,41 @@ proc runState*(config: PortalBridgeConf) =
if web3Client of RpcHttpClient:
warn "Using a WebSocket connection to the JSON-RPC API is recommended to improve performance"
# TODO:
# Here we'd want to implement initially a loop that backfills the state
# content. Secondly, a loop that follows the head and injects the latest
# state changes too.
#
# The first step would probably be the easier one to start with, as one
# can start from genesis state.
# It could be implemented by using the `exp_getProofsByBlockNumber` JSON-RPC
# method from nimbus-eth1.
# It could also be implemented by having the whole state execution happening
# inside the bridge, and getting the blocks from era1 files.
let maybeLastPersistedBlock = db.getLastPersistedBlockNumber()
if maybeLastPersistedBlock.isSome():
info "Last persisted block found in the database: ",
lastPersistedBlock = maybeLastPersistedBlock.get()
if config.startBlockNumber < 1 or
config.startBlockNumber > maybeLastPersistedBlock.get():
warn "Start block must be set to a value between 1 and the last persisted block"
quit QuitFailure
else:
info "No last persisted block found in the database"
if config.startBlockNumber != 1:
warn "Start block must be set to 1"
quit QuitFailure
if config.backfillState:
let maybeLastPersistedBlock = db.getLastPersistedBlockNumber()
if maybeLastPersistedBlock.isSome():
info "Last persisted block found in the database: ",
lastPersistedBlock = maybeLastPersistedBlock.get()
if config.startBlockNumber < 1 or
config.startBlockNumber > maybeLastPersistedBlock.get():
warn "Start block must be set to a value between 1 and the last persisted block"
quit QuitFailure
else:
info "No last persisted block found in the database"
if config.startBlockNumber != 1:
warn "Start block must be set to 1"
quit QuitFailure
info "Starting state backfill from block number: ",
startBlockNumber = config.startBlockNumber
info "Starting state backfill from block number: ",
startBlockNumber = config.startBlockNumber
let
bufferSize = 1000
blockDataQueue = newAsyncQueue[BlockData](bufferSize)
blockOffersQueue = newAsyncQueue[BlockOffersRef](bufferSize)
const bufferSize = 1000 # Should we make this configurable?
let
blockDataQueue = newAsyncQueue[BlockData](bufferSize)
blockOffersQueue = newAsyncQueue[BlockOffersRef](bufferSize)
asyncSpawn runBackfillCollectBlockDataLoop(
db, blockDataQueue, web3Client, config.startBlockNumber
)
asyncSpawn runBackfillBuildBlockOffersLoop(
db, blockDataQueue, blockOffersQueue, config.verifyStateProofs, config.gossipGenesis
)
asyncSpawn runBackfillCollectBlockDataLoop(
db, blockDataQueue, web3Client, config.startBlockNumber
for workerId in 1 .. config.gossipWorkersCount.int:
asyncSpawn runBackfillGossipBlockOffersLoop(
blockOffersQueue, portalClient, config.verifyGossip, workerId
)
asyncSpawn runBackfillBuildBlockOffersLoop(db, blockDataQueue, blockOffersQueue)
asyncSpawn runBackfillGossipBlockOffersLoop(blockOffersQueue, portalClient)
asyncSpawn runBackfillMetricsLoop(blockDataQueue, blockOffersQueue)
asyncSpawn runBackfillMetricsLoop(blockDataQueue, blockOffersQueue)
while true:
poll()

View File

@ -13,23 +13,21 @@ import
../../../network/state/[state_content, state_utils, state_gossip],
./world_state
type OffersBuilderRef* = ref object
type OffersBuilder* = object
worldState: WorldStateRef
blockHash: BlockHash
accountTrieOffers: seq[AccountTrieOfferWithKey]
contractTrieOffers: seq[ContractTrieOfferWithKey]
contractCodeOffers: seq[ContractCodeOfferWithKey]
proc init*(
T: type OffersBuilderRef, worldState: WorldStateRef, blockHash: BlockHash
): T =
proc init*(T: type OffersBuilder, worldState: WorldStateRef, blockHash: BlockHash): T =
T(worldState: worldState, blockHash: blockHash)
proc toTrieProof(proof: seq[seq[byte]]): TrieProof =
TrieProof.init(proof.map((node) => TrieNode.init(node)))
proc buildAccountTrieNodeOffer(
builder: var OffersBuilderRef, address: EthAddress, proof: TrieProof
builder: var OffersBuilder, address: EthAddress, proof: TrieProof
) =
try:
let
@ -44,23 +42,28 @@ proc buildAccountTrieNodeOffer(
raiseAssert(e.msg) # Should never happen
proc buildContractTrieNodeOffer(
builder: var OffersBuilderRef,
builder: var OffersBuilder,
address: EthAddress,
slotHash: SlotKeyHash,
storageProof: TrieProof,
accountProof: TrieProof,
) =
let
path = Nibbles.init(slotHash.data, isEven = true)
offerKey =
ContractTrieNodeKey.init(address, path, keccakHash(storageProof[^1].asSeq()))
offerValue =
ContractTrieNodeOffer.init(storageProof, accountProof, builder.blockHash)
try:
let
path = removeLeafKeyEndNibbles(
Nibbles.init(slotHash.data, isEven = true), storageProof[^1]
)
offerKey =
ContractTrieNodeKey.init(address, path, keccakHash(storageProof[^1].asSeq()))
offerValue =
ContractTrieNodeOffer.init(storageProof, accountProof, builder.blockHash)
builder.contractTrieOffers.add(offerValue.withKey(offerKey))
builder.contractTrieOffers.add(offerValue.withKey(offerKey))
except RlpError as e:
raiseAssert(e.msg) # Should never happen
proc buildContractCodeOffer(
builder: var OffersBuilderRef,
builder: var OffersBuilder,
address: EthAddress,
code: seq[byte],
accountProof: TrieProof,
@ -73,7 +76,7 @@ proc buildContractCodeOffer(
builder.contractCodeOffers.add(offerValue.withKey(offerKey))
proc buildBlockOffers*(builder: var OffersBuilderRef) =
proc buildBlockOffers*(builder: var OffersBuilder) =
for address, proof in builder.worldState.updatedAccountProofs():
let accountProof = toTrieProof(proof)
builder.buildAccountTrieNodeOffer(address, accountProof)
@ -86,11 +89,15 @@ proc buildBlockOffers*(builder: var OffersBuilderRef) =
if code.len() > 0:
builder.buildContractCodeOffer(address, code, accountProof)
proc getAccountTrieOffers*(builder: OffersBuilderRef): seq[AccountTrieOfferWithKey] =
proc getAccountTrieOffers*(builder: OffersBuilder): lent seq[AccountTrieOfferWithKey] =
builder.accountTrieOffers
proc getContractTrieOffers*(builder: OffersBuilderRef): seq[ContractTrieOfferWithKey] =
proc getContractTrieOffers*(
builder: OffersBuilder
): lent seq[ContractTrieOfferWithKey] =
builder.contractTrieOffers
proc getContractCodeOffers*(builder: OffersBuilderRef): seq[ContractCodeOfferWithKey] =
proc getContractCodeOffers*(
builder: OffersBuilder
): lent seq[ContractCodeOfferWithKey] =
builder.contractCodeOffers

View File

@ -202,3 +202,51 @@ proc getUpdatedBytecode*(
state: WorldStateRef, address: EthAddress
): seq[byte] {.inline.} =
state.db.getBytecodeUpdatedCache().get(toAccountKey(address).data)
# Slow: Used for testing only
proc verifyProofs*(
state: WorldStateRef, preStateRoot: KeccakHash, expectedStateRoot: KeccakHash
) =
try:
let trie =
initHexaryTrie(state.db.getAccountsBackend(), preStateRoot, isPruning = false)
var memDb = newMemoryDB()
for k, v in trie.replicate():
memDb.put(k, v)
for address, proof in state.updatedAccountProofs():
doAssert isValidBranch(
proof,
expectedStateRoot,
@(toAccountKey(address).data),
rlpFromBytes(proof[^1]).listElem(1).toBytes(), # pull the value out of the proof
)
for p in proof:
memDb.put(keccakHash(p).data, p)
let memTrie = initHexaryTrie(memDb, expectedStateRoot, isPruning = false)
doAssert(memTrie.rootHash() == expectedStateRoot)
for address, proof in state.updatedAccountProofs():
let
accountBytes = memTrie.get(toAccountKey(address).data)
account = rlp.decode(accountBytes, Account)
doAssert(accountBytes.len() > 0)
doAssert(accountBytes == rlpFromBytes(proof[^1]).listElem(1).toBytes())
# pull the value out of the proof
for slotHash, sProof in state.updatedStorageProofs(address):
doAssert isValidBranch(
sProof,
account.storageRoot,
@(slotHash.data),
rlpFromBytes(sProof[^1]).listElem(1).toBytes(),
# pull the value out of the proof
)
let updatedCode = state.getUpdatedBytecode(address)
if updatedCode.len() > 0:
doAssert(account.codeHash == keccakHash(updatedCode))
except RlpError as e:
raiseAssert(e.msg) # Should never happen