feat: configurable block size limit

This commit is contained in:
Daniil Polyakov 2026-02-24 19:41:01 +03:00
parent 84c23e8680
commit 8b5524901c
24 changed files with 339 additions and 21 deletions

13
Cargo.lock generated
View File

@ -1488,6 +1488,15 @@ dependencies = [
"serde",
]
[[package]]
name = "bytesize"
version = "2.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bd91ee7b2422bcb158d90ef4d14f75ef67f340943fc4149891dcce8f8b972a3"
dependencies = [
"serde_core",
]
[[package]]
name = "bytestring"
version = "1.5.0"
@ -1743,6 +1752,7 @@ dependencies = [
"anyhow",
"base64 0.22.1",
"borsh",
"bytesize",
"hex",
"log",
"logos-blockchain-common-http-client",
@ -3835,6 +3845,7 @@ dependencies = [
"anyhow",
"base64 0.22.1",
"borsh",
"bytesize",
"common",
"env_logger",
"futures",
@ -7430,6 +7441,7 @@ dependencies = [
"base58",
"bedrock_client",
"borsh",
"bytesize",
"chrono",
"common",
"futures",
@ -7460,6 +7472,7 @@ dependencies = [
"base64 0.22.1",
"bedrock_client",
"borsh",
"bytesize",
"common",
"futures",
"hex",

View File

@ -25,8 +25,6 @@ members = [
"indexer/service/protocol",
"indexer/service/rpc",
"explorer_service",
"programs/token/core",
"programs/token",
"program_methods",
"program_methods/guest",
"test_program_methods",
@ -89,6 +87,7 @@ thiserror = "2.0.12"
sha2 = "0.10.8"
hex = "0.4.3"
bytemuck = "1.24.0"
bytesize = { version = "2.3.1", features = ["serde"] }
aes-gcm = "0.10.3"
toml = "0.7.4"
bincode = "1.3.3"

View File

@ -18,6 +18,7 @@ sha2.workspace = true
log.workspace = true
hex.workspace = true
borsh.workspace = true
bytesize.workspace = true
base64.workspace = true
url.workspace = true
logos-blockchain-common-http-client.workspace = true

View File

@ -4,6 +4,7 @@
"genesis_id": 1,
"is_genesis_random": true,
"max_num_tx_in_block": 20,
"max_block_size": "1 MiB",
"mempool_max_size": 10000,
"block_create_timeout_millis": 10000,
"retry_pending_blocks_timeout_millis": 7000,

View File

@ -28,5 +28,6 @@ tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
hex.workspace = true
tempfile.workspace = true
borsh.workspace = true
bytesize.workspace = true
futures.workspace = true
testcontainers = { version = "0.27.0", features = ["docker-compose"] }

View File

@ -1,6 +1,7 @@
use std::{net::SocketAddr, path::PathBuf};
use anyhow::{Context, Result};
use bytesize::ByteSize;
use common::block::{AccountInitialData, CommitmentsInitialData};
use indexer_service::{BackoffConfig, ChannelId, ClientConfig, IndexerConfig};
use key_protocol::key_management::KeyChain;
@ -39,6 +40,7 @@ pub fn indexer_config(
/// Sequencer config options available for custom changes in integration tests.
pub struct SequencerPartialConfig {
pub max_num_tx_in_block: usize,
pub max_block_size: ByteSize,
pub mempool_max_size: usize,
pub block_create_timeout_millis: u64,
}
@ -47,6 +49,7 @@ impl Default for SequencerPartialConfig {
fn default() -> Self {
Self {
max_num_tx_in_block: 20,
max_block_size: ByteSize::mib(1),
mempool_max_size: 10_000,
block_create_timeout_millis: 10_000,
}
@ -62,6 +65,7 @@ pub fn sequencer_config(
) -> Result<SequencerConfig> {
let SequencerPartialConfig {
max_num_tx_in_block,
max_block_size,
mempool_max_size,
block_create_timeout_millis,
} = partial;
@ -72,6 +76,7 @@ pub fn sequencer_config(
genesis_id: 1,
is_genesis_random: true,
max_num_tx_in_block,
max_block_size,
mempool_max_size,
block_create_timeout_millis,
retry_pending_blocks_timeout_millis: 120_000,

View File

@ -21,6 +21,7 @@ pub mod config;
// TODO: Remove this and control time from tests
pub const TIME_TO_WAIT_FOR_BLOCK_SECONDS: u64 = 12;
pub const NSSA_PROGRAM_FOR_TEST_DATA_CHANGER: &str = "data_changer.bin";
pub const NSSA_PROGRAM_FOR_TEST_NOOP: &str = "noop.bin";
const BEDROCK_SERVICE_WITH_OPEN_PORT: &str = "logos-blockchain-node-0";
const BEDROCK_SERVICE_PORT: u16 = 18080;

View File

@ -0,0 +1,185 @@
use std::time::Duration;
use anyhow::Result;
use bytesize::ByteSize;
use common::{block::HashableBlockData, transaction::NSSATransaction};
use integration_tests::{
TIME_TO_WAIT_FOR_BLOCK_SECONDS, TestContext, config::SequencerPartialConfig,
};
use nssa::program::Program;
use tokio::test;
/// Submitting a transaction larger than the configured block size limit must
/// be rejected by the sequencer RPC with a `TransactionTooLarge` error.
#[test]
async fn reject_oversized_transaction() -> Result<()> {
    let partial_config = SequencerPartialConfig {
        max_num_tx_in_block: 100,
        max_block_size: ByteSize::mib(1),
        mempool_max_size: 1000,
        block_create_timeout_millis: 10_000,
    };
    let ctx = TestContext::builder()
        .with_sequencer_partial_config(partial_config)
        .build()
        .await?;

    // A 1.1 MiB binary is guaranteed to exceed the 1 MiB block limit even
    // after the ~200-byte header reserve is subtracted, so this transaction
    // can never fit into any block.
    let payload = vec![0u8; 1100 * 1024];
    let tx = nssa::ProgramDeploymentTransaction::new(
        nssa::program_deployment_transaction::Message::new(payload),
    );

    // Submission must fail up front rather than being accepted into the mempool.
    let submission = ctx.sequencer_client().send_tx_program(tx).await;
    assert!(
        submission.is_err(),
        "Expected error when submitting oversized transaction"
    );

    // The error surface is a formatted RPC error; match on its rendered text.
    let rendered = format!("{:?}", submission.unwrap_err());
    assert!(
        rendered.contains("TransactionTooLarge") || rendered.contains("too large"),
        "Expected TransactionTooLarge error, got: {}",
        rendered
    );

    Ok(())
}
/// A transaction comfortably below the block size limit must be accepted.
#[test]
async fn accept_transaction_within_limit() -> Result<()> {
    let ctx = TestContext::builder()
        .with_sequencer_partial_config(SequencerPartialConfig {
            max_num_tx_in_block: 100,
            max_block_size: ByteSize::mib(1),
            mempool_max_size: 1000,
            block_create_timeout_millis: 10_000,
        })
        .build()
        .await?;

    // A 1 KiB program binary is far under the 1 MiB block limit.
    let message = nssa::program_deployment_transaction::Message::new(vec![0u8; 1024]);
    let submission = ctx
        .sequencer_client()
        .send_tx_program(nssa::ProgramDeploymentTransaction::new(message))
        .await;

    assert!(
        submission.is_ok(),
        "Expected successful submission of small transaction, got error: {:?}",
        submission.as_ref().unwrap_err()
    );
    Ok(())
}
/// Two program deployments are submitted back-to-back against a block size
/// limit sized to fit only one of them; the sequencer must include the first
/// in block N+1 and defer the second to block N+2.
///
/// NOTE(review): relies on wall-clock waits (`TIME_TO_WAIT_FOR_BLOCK_SECONDS`)
/// and on the mempool preserving FIFO submission order (burner first) —
/// confirm the ordering assumption if the mempool implementation changes.
#[test]
async fn transaction_deferred_to_next_block_when_current_full() -> Result<()> {
    // Load the two test program binaries from the shared artifacts directory.
    let manifest_dir = env!("CARGO_MANIFEST_DIR");
    let artifacts_dir =
        std::path::PathBuf::from(manifest_dir).join("../artifacts/test_program_methods");
    let burner_bytecode = std::fs::read(artifacts_dir.join("burner.bin"))?;
    let chain_caller_bytecode = std::fs::read(artifacts_dir.join("chain_caller.bin"))?;

    // Calculate block size to fit only one of the two transactions, leaving some room for headers
    // (e.g., 10 KiB)
    let max_program_size = burner_bytecode.len().max(chain_caller_bytecode.len());
    let block_size = ByteSize::b((max_program_size + 10 * 1024) as u64);

    let ctx = TestContext::builder()
        .with_sequencer_partial_config(SequencerPartialConfig {
            max_num_tx_in_block: 100,
            max_block_size: block_size,
            mempool_max_size: 1000,
            block_create_timeout_millis: 10_000,
        })
        .build()
        .await?;

    // Program ids let us identify which deployment landed in which block.
    let burner_id = Program::new(burner_bytecode.clone())?.id();
    let chain_caller_id = Program::new(chain_caller_bytecode.clone())?.id();
    let initial_block_height = ctx.sequencer_client().get_last_block().await?.last_block;

    // Submit both program deployments
    ctx.sequencer_client()
        .send_tx_program(nssa::ProgramDeploymentTransaction::new(
            nssa::program_deployment_transaction::Message::new(burner_bytecode),
        ))
        .await?;
    ctx.sequencer_client()
        .send_tx_program(nssa::ProgramDeploymentTransaction::new(
            nssa::program_deployment_transaction::Message::new(chain_caller_bytecode),
        ))
        .await?;

    // Wait for first block
    tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await;
    let block1_response = ctx
        .sequencer_client()
        .get_block(initial_block_height + 1)
        .await?;
    let block1: HashableBlockData = borsh::from_slice(&block1_response.block)?;

    // Check which program is in block 1
    // Helper: collect the program ids of every deployment transaction in a block.
    let get_program_ids = |block: &HashableBlockData| -> Vec<nssa::ProgramId> {
        block
            .transactions
            .iter()
            .filter_map(|tx| {
                if let NSSATransaction::ProgramDeployment(deployment) = tx {
                    let bytecode = deployment.message.clone().into_bytecode();
                    Program::new(bytecode).ok().map(|p| p.id())
                } else {
                    None
                }
            })
            .collect()
    };
    let block1_program_ids = get_program_ids(&block1);

    // First program should be in block 1, but not both due to block size limit
    assert_eq!(
        block1_program_ids.len(),
        1,
        "Expected exactly one program deployment in block 1"
    );
    assert_eq!(
        block1_program_ids[0], burner_id,
        "Expected burner program to be deployed in block 1"
    );

    // Wait for second block
    tokio::time::sleep(Duration::from_secs(TIME_TO_WAIT_FOR_BLOCK_SECONDS)).await;
    let block2_response = ctx
        .sequencer_client()
        .get_block(initial_block_height + 2)
        .await?;
    let block2: HashableBlockData = borsh::from_slice(&block2_response.block)?;
    let block2_program_ids = get_program_ids(&block2);

    // The other program should be in block 2
    assert_eq!(
        block2_program_ids.len(),
        1,
        "Expected exactly one program deployment in block 2"
    );
    assert_eq!(
        block2_program_ids[0], chain_caller_id,
        "Expected chain_caller program to be deployed in block 2"
    );
    Ok(())
}

View File

@ -1,6 +1,7 @@
use std::time::{Duration, Instant};
use anyhow::Result;
use bytesize::ByteSize;
use integration_tests::{
TestContext,
config::{InitialData, SequencerPartialConfig},
@ -178,6 +179,7 @@ impl TpsTestManager {
fn generate_sequencer_partial_config() -> SequencerPartialConfig {
SequencerPartialConfig {
max_num_tx_in_block: 300,
max_block_size: ByteSize::mb(500),
mempool_max_size: 10_000,
block_create_timeout_millis: 12_000,
}

View File

@ -2,13 +2,17 @@ use tokio::sync::mpsc::{Receiver, Sender};
pub struct MemPool<T> {
receiver: Receiver<T>,
front_buffer: Vec<T>,
}
impl<T> MemPool<T> {
pub fn new(max_size: usize) -> (Self, MemPoolHandle<T>) {
let (sender, receiver) = tokio::sync::mpsc::channel(max_size);
let mem_pool = Self { receiver };
let mem_pool = Self {
receiver,
front_buffer: Vec::new(),
};
let sender = MemPoolHandle::new(sender);
(mem_pool, sender)
}
@ -16,6 +20,13 @@ impl<T> MemPool<T> {
pub fn pop(&mut self) -> Option<T> {
use tokio::sync::mpsc::error::TryRecvError;
// First check if there are any items in the front buffer (LIFO)
if let Some(item) = self.front_buffer.pop() {
return Some(item);
}
// Otherwise, try to receive from the channel (FIFO)
match self.receiver.try_recv() {
Ok(item) => Some(item),
Err(TryRecvError::Empty) => None,
@ -24,6 +35,11 @@ impl<T> MemPool<T> {
}
}
}
    /// Push an item to the front of the mempool (will be popped first).
    ///
    /// Items pushed this way are returned by `pop` in LIFO order, ahead of
    /// anything still queued in the underlying channel — used by the block
    /// producer to return a popped transaction that did not fit the block.
    pub fn push_front(&mut self, item: T) {
        self.front_buffer.push(item);
    }
}
pub struct MemPoolHandle<T> {
@ -96,4 +112,24 @@ mod tests {
assert_eq!(pool.pop(), Some(1));
assert_eq!(pool.pop(), Some(2));
}
#[test]
async fn test_push_front() {
let (mut pool, handle) = MemPool::new(10);
handle.push(1).await.unwrap();
handle.push(2).await.unwrap();
// Push items to the front - these should be popped first
pool.push_front(10);
pool.push_front(20);
// Items pushed to front are popped in LIFO order
assert_eq!(pool.pop(), Some(20));
assert_eq!(pool.pop(), Some(10));
// Original items are then popped in FIFO order
assert_eq!(pool.pop(), Some(1));
assert_eq!(pool.pop(), Some(2));
assert_eq!(pool.pop(), None);
}
}

View File

@ -21,7 +21,7 @@ fn hash_value(value: &Value) -> Node {
}
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
#[derive(BorshSerialize, BorshDeserialize)]
#[derive(Clone, BorshSerialize, BorshDeserialize)]
pub struct MerkleTree {
nodes: Vec<Node>,
capacity: usize,

View File

@ -16,7 +16,7 @@ use crate::{
pub const MAX_NUMBER_CHAINED_CALLS: usize = 10;
#[derive(BorshSerialize, BorshDeserialize)]
#[derive(Clone, BorshSerialize, BorshDeserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
pub(crate) struct CommitmentSet {
merkle_tree: MerkleTree,
@ -64,6 +64,7 @@ impl CommitmentSet {
}
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
#[derive(Clone)]
struct NullifierSet(BTreeSet<Nullifier>);
impl NullifierSet {
@ -104,7 +105,7 @@ impl BorshDeserialize for NullifierSet {
}
}
#[derive(BorshSerialize, BorshDeserialize)]
#[derive(Clone, BorshSerialize, BorshDeserialize)]
#[cfg_attr(test, derive(Debug, PartialEq, Eq))]
pub struct V02State {
public_state: HashMap<AccountId, Account>,

View File

@ -24,6 +24,7 @@ logos-blockchain-key-management-system-service.workspace = true
logos-blockchain-core.workspace = true
rand.workspace = true
borsh.workspace = true
bytesize.workspace = true
url.workspace = true
jsonrpsee = { workspace = true, features = ["ws-client"] }

View File

@ -6,6 +6,7 @@ use std::{
use anyhow::Result;
use bedrock_client::BackoffConfig;
use bytesize::ByteSize;
use common::{
block::{AccountInitialData, CommitmentsInitialData},
config::BasicAuth,
@ -27,6 +28,9 @@ pub struct SequencerConfig {
pub is_genesis_random: bool,
/// Maximum number of transactions in block
pub max_num_tx_in_block: usize,
/// Maximum block size (includes header and transactions)
#[serde(default = "default_max_block_size")]
pub max_block_size: ByteSize,
/// Mempool maximum size
pub mempool_max_size: usize,
/// Interval in which blocks produced
@ -68,3 +72,7 @@ impl SequencerConfig {
Ok(serde_json::from_reader(reader)?)
}
}
/// Serde default for `SequencerConfig::max_block_size`: 1 MiB, applied when
/// the field is absent from an existing config file (keeps old configs valid).
fn default_max_block_size() -> ByteSize {
    ByteSize::mib(1)
}

View File

@ -48,6 +48,7 @@ pub struct SequencerCore<
pub enum TransactionMalformationError {
InvalidSignature,
FailedToDecode { tx: HashType },
TransactionTooLarge { size: usize, max: usize },
}
impl Display for TransactionMalformationError {
@ -204,13 +205,49 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
let mut valid_transactions = vec![];
let max_block_size = self.sequencer_config.max_block_size.as_u64() as usize;
let latest_block_meta = self
.store
.latest_block_meta()
.context("Failed to get latest block meta from store")?;
let curr_time = chrono::Utc::now().timestamp_millis() as u64;
while let Some(tx) = self.mempool.pop() {
let tx_hash = tx.hash();
// Check if block size exceeds limit
let temp_valid_transactions =
[valid_transactions.as_slice(), std::slice::from_ref(&tx)].concat();
let temp_hashable_data = HashableBlockData {
block_id: new_block_height,
transactions: temp_valid_transactions,
prev_block_hash: latest_block_meta.hash,
timestamp: curr_time,
};
let block_size = borsh::to_vec(&temp_hashable_data)
.context("Failed to serialize block for size check")?
.len();
if block_size > max_block_size {
// Block would exceed the size limit: defer this transaction by pushing it back to the mempool front
warn!(
"Transaction with hash {tx_hash} deferred to next block: \
block size {block_size} bytes would exceed limit of {max_block_size} bytes",
);
self.mempool.push_front(tx);
break;
}
match self.execute_check_transaction_on_state(tx) {
Ok(valid_tx) => {
info!("Validated transaction with hash {tx_hash}, including it in block",);
valid_transactions.push(valid_tx);
info!("Validated transaction with hash {tx_hash}, including it in block");
if valid_transactions.len() >= self.sequencer_config.max_num_tx_in_block {
break;
}
@ -224,13 +261,6 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> SequencerCore<BC, I
}
}
let latest_block_meta = self
.store
.latest_block_meta()
.context("Failed to get latest block meta from store")?;
let curr_time = chrono::Utc::now().timestamp_millis() as u64;
let hashable_data = HashableBlockData {
block_id: new_block_height,
transactions: valid_transactions,
@ -375,6 +405,7 @@ mod tests {
genesis_id: 1,
is_genesis_random: false,
max_num_tx_in_block: 10,
max_block_size: bytesize::ByteSize::mib(1),
mempool_max_size: 10000,
block_create_timeout_millis: 1000,
port: 8080,

View File

@ -25,6 +25,7 @@ itertools.workspace = true
actix-web.workspace = true
tokio.workspace = true
borsh.workspace = true
bytesize.workspace = true
[dev-dependencies]
sequencer_core = { workspace = true, features = ["mock"] }

View File

@ -28,6 +28,7 @@ pub struct JsonHandler<
> {
sequencer_state: Arc<Mutex<SequencerCore<BC, IC>>>,
mempool_handle: MemPoolHandle<NSSATransaction>,
max_block_size: usize,
}
fn respond<T: Serialize>(val: T) -> Result<Value, RpcErr> {

View File

@ -52,7 +52,7 @@ fn get_cors(cors_allowed_origins: &[String]) -> Cors {
.max_age(3600)
}
pub fn new_http_server(
pub async fn new_http_server(
config: RpcConfig,
seuquencer_core: Arc<Mutex<SequencerCore>>,
mempool_handle: MemPoolHandle<NSSATransaction>,
@ -63,9 +63,16 @@ pub fn new_http_server(
limits_config,
} = config;
info!(target:NETWORK, "Starting HTTP server at {addr}");
let max_block_size = seuquencer_core
.lock()
.await
.sequencer_config()
.max_block_size
.as_u64() as usize;
let handler = web::Data::new(JsonHandler {
sequencer_state: seuquencer_core.clone(),
mempool_handle,
max_block_size,
});
// HTTP server

View File

@ -94,6 +94,27 @@ impl<BC: BlockSettlementClientTrait, IC: IndexerClientTrait> JsonHandler<BC, IC>
let tx = borsh::from_slice::<NSSATransaction>(&send_tx_req.transaction).unwrap();
let tx_hash = tx.hash();
// Check transaction size against block size limit
// Reserve ~200 bytes for block header overhead
const BLOCK_HEADER_OVERHEAD: usize = 200;
let tx_size = borsh::to_vec(&tx)
.map_err(
|_| sequencer_core::TransactionMalformationError::FailedToDecode { tx: tx_hash },
)?
.len();
let max_tx_size = self.max_block_size.saturating_sub(BLOCK_HEADER_OVERHEAD);
if tx_size > max_tx_size {
return Err(
sequencer_core::TransactionMalformationError::TransactionTooLarge {
size: tx_size,
max: max_tx_size,
}
.into(),
);
}
let authenticated_tx = tx
.transaction_stateless_check()
.inspect_err(|err| warn!("Error at pre_check {err:#?}"))?;
@ -377,6 +398,7 @@ mod tests {
genesis_id: 1,
is_genesis_random: false,
max_num_tx_in_block: 10,
max_block_size: bytesize::ByteSize::mib(1),
mempool_max_size: 1000,
block_create_timeout_millis: 1000,
port: 8080,
@ -437,12 +459,14 @@ mod tests {
.produce_new_block_with_mempool_transactions()
.unwrap();
let max_block_size = sequencer_core.sequencer_config().max_block_size.as_u64() as usize;
let sequencer_core = Arc::new(Mutex::new(sequencer_core));
(
JsonHandlerWithMockClients {
sequencer_state: sequencer_core,
mempool_handle,
max_block_size,
},
initial_accounts,
tx,

View File

@ -44,10 +44,7 @@ impl RpcErrKind for RpcErrInternal {
impl RpcErrKind for TransactionMalformationError {
fn into_rpc_err(self) -> RpcError {
RpcError::new_internal_error(
Some(serde_json::to_value(self).unwrap()),
"transaction not accepted",
)
RpcError::invalid_params(Some(serde_json::to_value(self).unwrap()))
}
}

View File

@ -4,6 +4,7 @@
"genesis_id": 1,
"is_genesis_random": true,
"max_num_tx_in_block": 20,
"max_block_size": "1 MiB",
"mempool_max_size": 1000,
"block_create_timeout_millis": 15000,
"retry_pending_blocks_timeout_millis": 5000,

View File

@ -4,6 +4,7 @@
"genesis_id": 1,
"is_genesis_random": true,
"max_num_tx_in_block": 20,
"max_block_size": "1 MiB",
"mempool_max_size": 10000,
"block_create_timeout_millis": 10000,
"port": 3040,

View File

@ -114,7 +114,8 @@ pub async fn startup_sequencer(app_config: SequencerConfig) -> Result<SequencerH
RpcConfig::with_port(port),
Arc::clone(&seq_core_wrapped),
mempool_handle,
)?;
)
.await?;
info!("HTTP server started");
let http_server_handle = http_server.handle();
tokio::spawn(http_server);

View File

@ -173,7 +173,7 @@ pub async fn execute_subcommand(
.sequencer_client
.send_tx_program(transaction)
.await
.context("Transaction submission error");
.context("Transaction submission error")?;
SubcommandReturnValue::Empty
}