mirror of
synced 2025-03-01 14:40:54 +00:00
Each individual blob currently uses as much quota from the network limit as an entire block does, 128 items per second shared across all peers. Blobs are 128 KB each instead of up to several MB and are simpler to encode. There can be multiple per block (6 currently), so allow 2000 blobs per second across all peers. That decreases the cost per block from `3125 + 3125 * blobs.len` quota (= `[3125, 21875]`) to a lower `3125 + 200 * blobs.len` quota (= `[3125, 4325]`), accounting for the slight increase in data transfer and encoding time.
368 lines
14 KiB
368 lines
14 KiB
# beacon_chain
# Copyright (c) 2018-2024 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
chronicles, chronos, snappy, snappy/codec,
../spec/datatypes/[phase0, altair, bellatrix, capella, deneb],
../spec/[helpers, forks, network],
topics = "sync_proto"
blockResponseCost = allowedOpsPerSecondCost(64)
## Allow syncing ~64 blocks/sec (minus request costs)
blobResponseCost = allowedOpsPerSecondCost(1000)
## Multiple can exist per block, they are much smaller than blocks
BeaconSyncNetworkState* {.final.} = ref object of RootObj
dag: ChainDAGRef
cfg: RuntimeConfig
genesisBlockRoot: Eth2Digest
BlockRootSlot* = object
blockRoot: Eth2Digest
slot: Slot
BlockRootsList* = List[Eth2Digest, Limit MAX_REQUEST_BLOCKS]
BlobIdentifierList* = List[BlobIdentifier, Limit (MAX_REQUEST_BLOB_SIDECARS)]
proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type (ref ForkedSignedBeaconBlock)):
Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} =
var contextBytes: ForkDigest
await conn.readExactly(addr contextBytes, sizeof contextBytes)
except CancelledError as exc:
raise exc
except CatchableError:
return neterr UnexpectedEOF
if contextBytes == peer.network.forkDigests.phase0:
let res = await readChunkPayload(conn, peer, phase0.SignedBeaconBlock)
if res.isOk:
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
return err(res.error)
elif contextBytes == peer.network.forkDigests.altair:
let res = await readChunkPayload(conn, peer, altair.SignedBeaconBlock)
if res.isOk:
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
return err(res.error)
elif contextBytes == peer.network.forkDigests.bellatrix:
let res = await readChunkPayload(conn, peer, bellatrix.SignedBeaconBlock)
if res.isOk:
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
return err(res.error)
elif contextBytes == peer.network.forkDigests.capella:
let res = await readChunkPayload(conn, peer, capella.SignedBeaconBlock)
if res.isOk:
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
return err(res.error)
elif contextBytes == peer.network.forkDigests.deneb:
let res = await readChunkPayload(conn, peer, deneb.SignedBeaconBlock)
if res.isOk:
return ok newClone(ForkedSignedBeaconBlock.init(res.get))
return err(res.error)
return neterr InvalidContextBytes
proc readChunkPayload*(
conn: Connection, peer: Peer, MsgType: type (ref BlobSidecar)):
Future[NetRes[MsgType]] {.async: (raises: [CancelledError]).} =
var contextBytes: ForkDigest
await conn.readExactly(addr contextBytes, sizeof contextBytes)
except CancelledError as exc:
raise exc
except CatchableError:
return neterr UnexpectedEOF
if contextBytes == peer.network.forkDigests.deneb:
let res = await readChunkPayload(conn, peer, BlobSidecar)
if res.isOk:
return ok newClone(res.get)
return err(res.error)
return neterr InvalidContextBytes
{.pop.} # TODO fix p2p macro for raises
p2pProtocol BeaconSync(version = 1,
networkState = BeaconSyncNetworkState):
proc beaconBlocksByRange_v2(
peer: Peer,
startSlot: Slot,
reqCount: uint64,
reqStep: uint64,
response: MultipleChunksResponse[
ref ForkedSignedBeaconBlock, Limit MAX_REQUEST_BLOCKS])
{.async, libp2pProtocol("beacon_blocks_by_range", 2).} =
# TODO Semantically, this request should return a non-ref, but doing so
# runs into extreme inefficiency due to the compiler introducing
# hidden copies - in future nim versions with move support, this should
# be revisited
# TODO This code is more complicated than it needs to be, since the type
# of the multiple chunks response is not actually used in this server
# implementation (it's used to derive the signature of the client
# function, not in the code below!)
# TODO although you can't tell from this function definition, a magic
# client call that returns `seq[ref ForkedSignedBeaconBlock]` will
# will be generated by the libp2p macro - we guarantee that seq items
# are `not-nil` in the implementation
# TODO reqStep is deprecated - future versions can remove support for
# values != 1: https://github.com/ethereum/consensus-specs/pull/2856
trace "got range request", peer, startSlot,
count = reqCount, step = reqStep
if reqCount == 0 or reqStep == 0:
raise newException(InvalidInputsError, "Empty range requested")
var blocks: array[MAX_REQUEST_BLOCKS.int, BlockId]
dag = peer.networkState.dag
# Limit number of blocks in response
count = int min(reqCount, blocks.lenu64)
endIndex = count - 1
startIndex =
dag.getBlockRange(startSlot, reqStep,
blocks.toOpenArray(0, endIndex))
found = 0
bytes: seq[byte]
for i in startIndex..endIndex:
if dag.getBlockSZ(blocks[i], bytes):
# In general, there is not much intermediate time between post-merge
# blocks all being optimistic and none of them being optimistic. The
# EL catches up, tells the CL the head is verified, and that's it.
if blocks[i].slot.epoch >= dag.cfg.BELLATRIX_FORK_EPOCH and
not dag.head.executionValid:
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read block size, database corrupt?",
bytes = bytes.len(), blck = shortLog(blocks[i])
# TODO extract from libp2pProtocol
peer.awaitQuota(blockResponseCost, "beacon_blocks_by_range/2")
peer.network.awaitQuota(blockResponseCost, "beacon_blocks_by_range/2")
await response.writeBytesSZ(
uncompressedLen, bytes,
inc found
debug "Block range request done",
peer, startSlot, count, reqStep
proc beaconBlocksByRoot_v2(
peer: Peer,
# Please note that the SSZ list here ensures that the
# spec constant MAX_REQUEST_BLOCKS is enforced:
blockRoots: BlockRootsList,
response: MultipleChunksResponse[
ref ForkedSignedBeaconBlock, Limit MAX_REQUEST_BLOCKS])
{.async, libp2pProtocol("beacon_blocks_by_root", 2).} =
# TODO Semantically, this request should return a non-ref, but doing so
# runs into extreme inefficiency due to the compiler introducing
# hidden copies - in future nim versions with move support, this should
# be revisited
# TODO This code is more complicated than it needs to be, since the type
# of the multiple chunks response is not actually used in this server
# implementation (it's used to derive the signature of the client
# function, not in the code below!)
# TODO although you can't tell from this function definition, a magic
# client call that returns `seq[ref ForkedSignedBeaconBlock]` will
# will be generated by the libp2p macro - we guarantee that seq items
# are `not-nil` in the implementation
if blockRoots.len == 0:
raise newException(InvalidInputsError, "No blocks requested")
dag = peer.networkState.dag
count = blockRoots.len
found = 0
bytes: seq[byte]
for i in 0..<count:
blockRef = dag.getBlockRef(blockRoots[i]).valueOr:
if dag.getBlockSZ(blockRef.bid, bytes):
# In general, there is not much intermediate time between post-merge
# blocks all being optimistic and none of them being optimistic. The
# EL catches up, tells the CL the head is verified, and that's it.
if blockRef.slot.epoch >= dag.cfg.BELLATRIX_FORK_EPOCH and
not dag.head.executionValid:
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read block size, database corrupt?",
bytes = bytes.len(), blck = shortLog(blockRef)
# TODO extract from libp2pProtocol
peer.awaitQuota(blockResponseCost, "beacon_blocks_by_root/2")
peer.network.awaitQuota(blockResponseCost, "beacon_blocks_by_root/2")
await response.writeBytesSZ(
uncompressedLen, bytes,
inc found
debug "Block root request done",
peer, roots = blockRoots.len, count, found
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/p2p-interface.md#blobsidecarsbyroot-v1
proc blobSidecarsByRoot(
peer: Peer,
blobIds: BlobIdentifierList,
response: MultipleChunksResponse[
ref BlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)])
{.async, libp2pProtocol("blob_sidecars_by_root", 1).} =
# TODO Semantically, this request should return a non-ref, but doing so
# runs into extreme inefficiency due to the compiler introducing
# hidden copies - in future nim versions with move support, this should
# be revisited
# TODO This code is more complicated than it needs to be, since the type
# of the multiple chunks response is not actually used in this server
# implementation (it's used to derive the signature of the client
# function, not in the code below!)
# TODO although you can't tell from this function definition, a magic
# client call that returns `seq[ref BlobSidecar]` will
# will be generated by the libp2p macro - we guarantee that seq items
# are `not-nil` in the implementation
trace "got blobs range request", peer, len = blobIds.len
if blobIds.len == 0:
raise newException(InvalidInputsError, "No blobs requested")
dag = peer.networkState.dag
count = blobIds.len
found = 0
bytes: seq[byte]
for i in 0..<count:
let blockRef = dag.getBlockRef(blobIds[i].block_root).valueOr:
let index = blobIds[i].index
if dag.db.getBlobSidecarSZ(blockRef.bid.root, index, bytes):
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read blob size, database corrupt?",
bytes = bytes.len(), blck = shortLog(blockRef), blobindex = index
peer.awaitQuota(blobResponseCost, "blob_sidecars_by_root/1")
peer.network.awaitQuota(blobResponseCost, "blob_sidecars_by_root/1")
await response.writeBytesSZ(
uncompressedLen, bytes,
inc found
debug "Blob root request done",
peer, roots = blobIds.len, count, found
# https://github.com/ethereum/consensus-specs/blob/v1.3.0/specs/deneb/p2p-interface.md#blobsidecarsbyrange-v1
proc blobSidecarsByRange(
peer: Peer,
startSlot: Slot,
reqCount: uint64,
response: MultipleChunksResponse[
ref BlobSidecar, Limit(MAX_REQUEST_BLOB_SIDECARS)])
{.async, libp2pProtocol("blob_sidecars_by_range", 1).} =
# TODO This code is more complicated than it needs to be, since the type
# of the multiple chunks response is not actually used in this server
# implementation (it's used to derive the signature of the client
# function, not in the code below!)
# TODO although you can't tell from this function definition, a magic
# client call that returns `seq[ref BlobSidecar]` will
# will be generated by the libp2p macro - we guarantee that seq items
# are `not-nil` in the implementation
trace "got blobs range request", peer, startSlot, count = reqCount
if reqCount == 0:
raise newException(InvalidInputsError, "Empty range requested")
dag = peer.networkState.dag
epochBoundary =
if dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS >= dag.head.slot.epoch:
dag.head.slot.epoch - dag.cfg.MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS
if startSlot.epoch < epochBoundary:
raise newException(ResourceUnavailableError, BlobsOutOfRange)
var blockIds: array[int(MAX_REQUEST_BLOB_SIDECARS), BlockId]
count = int min(reqCount, blockIds.lenu64)
endIndex = count - 1
startIndex =
dag.getBlockRange(startSlot, 1, blockIds.toOpenArray(0, endIndex))
found = 0
bytes: seq[byte]
for i in startIndex..endIndex:
for j in 0..<MAX_BLOBS_PER_BLOCK:
if dag.db.getBlobSidecarSZ(blockIds[i].root, BlobIndex(j), bytes):
# In general, there is not much intermediate time between post-merge
# blocks all being optimistic and none of them being optimistic. The
# EL catches up, tells the CL the head is verified, and that's it.
if blockIds[i].slot.epoch >= dag.cfg.BELLATRIX_FORK_EPOCH and
not dag.head.executionValid:
let uncompressedLen = uncompressedLenFramed(bytes).valueOr:
warn "Cannot read blobs sidecar size, database corrupt?",
bytes = bytes.len(), blck = shortLog(blockIds[i])
# TODO extract from libp2pProtocol
peer.awaitQuota(blobResponseCost, "blobs_sidecars_by_range/1")
peer.network.awaitQuota(blobResponseCost, "blobs_sidecars_by_range/1")
await response.writeBytesSZ(
uncompressedLen, bytes,
inc found
debug "BlobSidecar range request done",
peer, startSlot, count = reqCount, found
proc init*(T: type BeaconSync.NetworkState, dag: ChainDAGRef): T =
dag: dag,