fix: integration updates

This commit is contained in:
Pravdyvy 2026-01-15 15:44:48 +02:00
parent b6a0e62871
commit c2e09031e1
16 changed files with 587 additions and 355 deletions

489
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,8 +1,6 @@
use nomos_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexerConfig {
pub resubscribe_interval: u64,
pub channel_id: ChannelId,
}

View File

@ -1,25 +1,29 @@
use std::sync::Arc;
use anyhow::Result;
use bedrock_client::{BasicAuthCredentials, BedrockClient};
use common::block::{BlockHash, HashableBlockData};
use common::block::HashableBlockData;
use futures::StreamExt;
use log::info;
use nomos_core::mantle::{
Op, SignedMantleTx,
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
use tokio::sync::mpsc::Sender;
use tokio::sync::{RwLock, mpsc::Sender};
use url::Url;
use crate::{config::IndexerConfig, state::IndexerState};
use crate::{config::IndexerConfig, message::IndexerToSequencerMessage, state::IndexerState};
pub mod config;
pub mod message;
pub mod state;
pub struct IndexerCore {
pub bedrock_client: BedrockClient,
pub bedrock_url: Url,
pub channel_sender: Sender<BlockHash>,
pub channel_sender: Sender<IndexerToSequencerMessage>,
pub config: IndexerConfig,
pub bedrock_url: Url,
pub channel_id: ChannelId,
pub state: IndexerState,
}
@ -27,17 +31,19 @@ impl IndexerCore {
pub fn new(
addr: &str,
auth: Option<BasicAuthCredentials>,
sender: Sender<BlockHash>,
sender: Sender<IndexerToSequencerMessage>,
config: IndexerConfig,
channel_id: ChannelId,
) -> Result<Self> {
Ok(Self {
bedrock_client: BedrockClient::new(auth)?,
bedrock_url: Url::parse(addr)?,
channel_sender: sender,
config,
channel_id,
// No state setup for now, future task.
state: IndexerState {
latest_seen_block: 0,
latest_seen_block: Arc::new(RwLock::new(0)),
},
})
}
@ -63,16 +69,29 @@ impl IndexerCore {
.get_block_by_id(self.bedrock_url.clone(), header_id)
.await?
{
info!("Extracted L1 block at height {} with data {l1_block:#?}", block_info.height);
info!("Extracted L1 block at height {}", block_info.height);
let l2_blocks_parsed = parse_blocks(
l1_block.into_transactions().into_iter(),
&self.config.channel_id,
);
let l2_blocks_parsed =
parse_blocks(l1_block.into_transactions().into_iter(), &self.channel_id);
for l2_block in l2_blocks_parsed {
// State modification, will be updated in future
{
let mut guard = self.state.latest_seen_block.write().await;
if l2_block.block_id > *guard {
*guard = l2_block.block_id;
}
}
// Sending data into sequencer, may need to be expanded.
self.channel_sender.send(l2_block.block_hash()).await?;
let message = IndexerToSequencerMessage::BlockObserved {
l1_block_id: block_info.height,
l2_block_height: l2_block.block_id,
};
self.channel_sender.send(message.clone()).await?;
info!("Sent message {:#?} to sequencer", message);
}
}
}
@ -96,7 +115,6 @@ pub fn parse_blocks(
inscription,
..
}) if channel_id == decoded_channel_id => {
// Assuming, that it is how block will be inscribed on l1
borsh::from_slice::<HashableBlockData>(inscription).ok()
}
_ => None,

7
indexer/src/message.rs Normal file
View File

@ -0,0 +1,7 @@
#[derive(Debug, Clone)]
pub enum IndexerToSequencerMessage {
BlockObserved {
l1_block_id: u64,
l2_block_height: u64,
},
}

View File

@ -1,5 +1,9 @@
use std::sync::Arc;
use tokio::sync::RwLock;
#[derive(Debug)]
pub struct IndexerState {
// Only one field for now, for testing.
pub latest_seen_block: u64,
pub latest_seen_block: Arc<RwLock<u64>>,
}

View File

@ -155,13 +155,13 @@
37,
37
],
"bedrock_addr": "http://127.0.0.1:8080",
"bedrock_auth": [
"user",
"password"
],
"indexer_config": {
"resubscribe_interval": 1000,
"channel_id": "2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a"
"bedrock_config": {
"channel_id": "2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a2a",
"node_url": "http://127.0.0.1:8080",
"user": "user",
"password": null,
"indexer_config": {
"resubscribe_interval": 1000
}
}
}

View File

@ -0,0 +1,158 @@
{
"home": "",
"override_rust_log": null,
"genesis_id": 1,
"is_genesis_random": true,
"max_num_tx_in_block": 20,
"mempool_max_size": 10000,
"block_create_timeout_millis": 10000,
"port": 0,
"initial_accounts": [
{
"account_id": "BLgCRDXYdQPMMWVHYRFGQZbgeHx9frkipa8GtpG2Syqy",
"balance": 10000
},
{
"account_id": "Gj1mJy5W7J5pfmLRujmQaLfLMWidNxQ6uwnhb666ZwHw",
"balance": 20000
}
],
"initial_commitments": [
{
"npk": [
63,
202,
178,
231,
183,
82,
237,
212,
216,
221,
215,
255,
153,
101,
177,
161,
254,
210,
128,
122,
54,
190,
230,
151,
183,
64,
225,
229,
113,
1,
228,
97
],
"account": {
"program_owner": [
0,
0,
0,
0,
0,
0,
0,
0
],
"balance": 10000,
"data": [],
"nonce": 0
}
},
{
"npk": [
192,
251,
166,
243,
167,
236,
84,
249,
35,
136,
130,
172,
219,
225,
161,
139,
229,
89,
243,
125,
194,
213,
209,
30,
23,
174,
100,
244,
124,
74,
140,
47
],
"account": {
"program_owner": [
0,
0,
0,
0,
0,
0,
0,
0
],
"balance": 20000,
"data": [],
"nonce": 0
}
}
],
"signing_key": [
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37,
37
]
}

View File

@ -38,7 +38,7 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init);
pub struct TestContext {
sequencer_server_handle: ServerHandle,
sequencer_loop_handle: JoinHandle<Result<()>>,
indexer_loop_handle: JoinHandle<Result<()>>,
indexer_loop_handle: Option<JoinHandle<Result<()>>>,
sequencer_client: SequencerClient,
wallet: WalletCore,
_temp_sequencer_dir: TempDir,
@ -46,12 +46,25 @@ pub struct TestContext {
}
impl TestContext {
/// Create new test context.
/// Create new test context in detached mode. Default.
pub async fn new() -> Result<Self> {
let manifest_dir = env!("CARGO_MANIFEST_DIR");
let sequencer_config_path =
PathBuf::from(manifest_dir).join("configs/sequencer/sequencer_config.json");
PathBuf::from(manifest_dir).join("configs/sequencer/detached/sequencer_config.json");
let sequencer_config = SequencerConfig::from_path(&sequencer_config_path)
.context("Failed to create sequencer config from file")?;
Self::new_with_sequencer_config(sequencer_config).await
}
/// Create new test context in local bedrock node attached mode.
pub async fn new_bedrock_local_attached() -> Result<Self> {
let manifest_dir = env!("CARGO_MANIFEST_DIR");
let sequencer_config_path = PathBuf::from(manifest_dir)
.join("configs/sequencer/bedrock_local_attached/sequencer_config.json");
let sequencer_config = SequencerConfig::from_path(&sequencer_config_path)
.context("Failed to create sequencer config from file")?;
@ -69,10 +82,15 @@ impl TestContext {
debug!("Test context setup");
let (sequencer_server_handle, sequencer_addr, sequencer_loop_handle, temp_sequencer_dir, indexer_loop_handle) =
Self::setup_sequencer(sequencer_config)
.await
.context("Failed to setup sequencer")?;
let (
sequencer_server_handle,
sequencer_addr,
sequencer_loop_handle,
temp_sequencer_dir,
indexer_loop_handle,
) = Self::setup_sequencer(sequencer_config)
.await
.context("Failed to setup sequencer")?;
// Convert 0.0.0.0 to 127.0.0.1 for client connections
// When binding to port 0, the server binds to 0.0.0.0:<random_port>
@ -103,7 +121,13 @@ impl TestContext {
async fn setup_sequencer(
mut config: SequencerConfig,
) -> Result<(ServerHandle, SocketAddr, JoinHandle<Result<()>>, TempDir, JoinHandle<Result<()>>)> {
) -> Result<(
ServerHandle,
SocketAddr,
JoinHandle<Result<()>>,
TempDir,
Option<JoinHandle<Result<()>>>,
)> {
let temp_sequencer_dir =
tempfile::tempdir().context("Failed to create temp dir for sequencer home")?;
@ -123,7 +147,7 @@ impl TestContext {
sequencer_addr,
sequencer_loop_handle,
temp_sequencer_dir,
indexer_loop_handle
indexer_loop_handle,
))
}
@ -191,7 +215,12 @@ impl Drop for TestContext {
} = self;
sequencer_loop_handle.abort();
indexer_loop_handle.abort();
match indexer_loop_handle {
Some(handle) => {
handle.abort();
}
None => {}
};
// Can't wait here as Drop can't be async, but anyway stop signal should be sent
sequencer_server_handle.stop(true).now_or_never();

View File

@ -6,17 +6,17 @@ use tokio::test;
#[test]
async fn indexer_run_local_node() -> Result<()> {
println!("Waiting 20 seconds for L1 node to start producing");
tokio::time::sleep(std::time::Duration::from_secs(20)).await;
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
let ctx = TestContext::new().await?;
let ctx = TestContext::new_bedrock_local_attached().await?;
info!("Let's observe behaviour");
tokio::time::sleep(std::time::Duration::from_secs(30)).await;
tokio::time::sleep(std::time::Duration::from_secs(300)).await;
let gen_id = ctx.sequencer_client().get_genesis_id().await.unwrap();
info!("btw, gen id is {gen_id:?}");
Ok(())
}
}

View File

@ -1,7 +1,6 @@
use std::time::{Duration, Instant};
use anyhow::Result;
use indexer::config::IndexerConfig;
use integration_tests::TestContext;
use key_protocol::key_management::ephemeral_key_holder::EphemeralKeyHolder;
use log::info;
@ -186,12 +185,6 @@ impl TpsTestManager {
initial_accounts: initial_public_accounts,
initial_commitments: vec![initial_commitment],
signing_key: [37; 32],
bedrock_addr: "0.0.0.0".to_string(),
bedrock_auth: ("".to_string(), "".to_string()),
indexer_config: IndexerConfig {
resubscribe_interval: 100,
channel_id: [42; 32].into(),
},
bedrock_config: None,
}
}

View File

@ -6,6 +6,7 @@ use std::{
use anyhow::Result;
use indexer::config::IndexerConfig;
use nomos_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)]
@ -48,12 +49,6 @@ pub struct SequencerConfig {
pub initial_commitments: Vec<CommitmentsInitialData>,
/// Sequencer own signing key
pub signing_key: [u8; 32],
/// Bedrock addr
pub bedrock_addr: String,
/// Bedrock auth
pub bedrock_auth: (String, String),
/// Indexer config
pub indexer_config: IndexerConfig,
/// Bedrock configuration options
pub bedrock_config: Option<BedrockConfig>,
}
@ -61,9 +56,15 @@ pub struct SequencerConfig {
#[derive(Clone, Serialize, Deserialize)]
pub struct BedrockConfig {
/// Bedrock channel ID
pub channel_id: [u8; 32],
pub channel_id: ChannelId,
/// Bedrock Url
pub node_url: String,
/// Bedrock user
pub user: String,
/// Bedrock password(optional)
pub password: Option<String>,
/// Indexer config
pub indexer_config: IndexerConfig,
}
impl SequencerConfig {

View File

@ -5,10 +5,11 @@ use anyhow::Result;
use common::PINATA_BASE58;
use common::{
HashType,
block::{BlockHash, HashableBlockData},
block::HashableBlockData,
transaction::{EncodedTransaction, NSSATransaction},
};
use config::SequencerConfig;
use indexer::message::IndexerToSequencerMessage;
use log::warn;
use mempool::{MemPool, MemPoolHandle};
use serde::{Deserialize, Serialize};
@ -28,7 +29,7 @@ pub struct SequencerCore {
chain_height: u64,
// No logic here for now
#[allow(unused)]
receiver: Receiver<BlockHash>,
receiver: Option<Receiver<IndexerToSequencerMessage>>,
block_settlement_client: Option<BlockSettlementClient>,
}
@ -50,7 +51,7 @@ impl SequencerCore {
/// Start Sequencer from configuration and construct transaction sender
pub fn start_from_config(
config: SequencerConfig,
receiver: Receiver<BlockHash>,
receiver: Option<Receiver<IndexerToSequencerMessage>>,
) -> (Self, MemPoolHandle<EncodedTransaction>) {
let hashable_data = HashableBlockData {
block_id: config.genesis_id,
@ -275,7 +276,6 @@ mod tests {
use base58::{FromBase58, ToBase58};
use common::test_utils::sequencer_sign_key_for_testing;
use indexer::config::IndexerConfig;
use nssa::PrivateKey;
use super::*;
@ -305,12 +305,6 @@ mod tests {
initial_accounts,
initial_commitments: vec![],
signing_key: *sequencer_sign_key_for_testing().value(),
bedrock_addr: "0.0.0.0".to_string(),
bedrock_auth: ("".to_string(), "".to_string()),
indexer_config: IndexerConfig {
resubscribe_interval: 100,
channel_id: [42; 32].into(),
},
bedrock_config: None,
}
}
@ -357,9 +351,7 @@ mod tests {
async fn common_setup_with_config(
config: SequencerConfig,
) -> (SequencerCore, MemPoolHandle<EncodedTransaction>) {
let (_, receiver) = tokio::sync::mpsc::channel(100);
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config, receiver);
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config, None);
let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap();
@ -374,9 +366,7 @@ mod tests {
#[test]
fn test_start_from_config() {
let config = setup_sequencer_config();
let (_, receiver) = tokio::sync::mpsc::channel(100);
let (sequencer, _mempool_handle) =
SequencerCore::start_from_config(config.clone(), receiver);
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone(), None);
assert_eq!(sequencer.chain_height, config.genesis_id);
assert_eq!(sequencer.sequencer_config.max_num_tx_in_block, 10);
@ -435,9 +425,7 @@ mod tests {
let initial_accounts = vec![initial_acc1, initial_acc2];
let config = setup_sequencer_config_variable_initial_accounts(initial_accounts);
let (_, receiver) = tokio::sync::mpsc::channel(100);
let (sequencer, _mempool_handle) =
SequencerCore::start_from_config(config.clone(), receiver);
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone(), None);
let acc1_account_id = config.initial_accounts[0]
.account_id
@ -773,9 +761,8 @@ mod tests {
// from `acc_1` to `acc_2`. The block created with that transaction will be kept stored in
// the temporary directory for the block storage of this test.
{
let (_, receiver) = tokio::sync::mpsc::channel(100);
let (mut sequencer, mempool_handle) =
SequencerCore::start_from_config(config.clone(), receiver);
SequencerCore::start_from_config(config.clone(), None);
let signing_key = PrivateKey::try_new([1; 32]).unwrap();
let tx = common::test_utils::create_transaction_native_token_transfer(
@ -800,9 +787,7 @@ mod tests {
// Instantiating a new sequencer from the same config. This should load the existing block
// with the above transaction and update the state to reflect that.
let (_, receiver) = tokio::sync::mpsc::channel(100);
let (sequencer, _mempool_handle) =
SequencerCore::start_from_config(config.clone(), receiver);
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone(), None);
let balance_acc_1 = sequencer.state.get_account_by_id(&acc1_account_id).balance;
let balance_acc_2 = sequencer.state.get_account_by_id(&acc2_account_id).balance;

View File

@ -23,7 +23,7 @@ pub struct JsonHandler {
sequencer_state: Arc<Mutex<SequencerCore>>,
// No functionality for now.
#[allow(unused)]
indexer_state: Arc<Mutex<IndexerCore>>,
indexer_state: Option<Arc<Mutex<IndexerCore>>>,
mempool_handle: MemPoolHandle<EncodedTransaction>,
}

View File

@ -47,7 +47,7 @@ pub fn new_http_server(
config: RpcConfig,
seuquencer_core: Arc<Mutex<SequencerCore>>,
mempool_handle: MemPoolHandle<EncodedTransaction>,
indexer_core: Arc<Mutex<IndexerCore>>,
indexer_core: Option<Arc<Mutex<IndexerCore>>>,
) -> io::Result<(actix_web::dev::Server, SocketAddr)> {
let RpcConfig {
addr,

View File

@ -345,7 +345,7 @@ mod tests {
use indexer::{IndexerCore, config::IndexerConfig};
use sequencer_core::{
SequencerCore,
config::{AccountInitialData, SequencerConfig},
config::{AccountInitialData, BedrockConfig, SequencerConfig},
};
use serde_json::Value;
use tempfile::tempdir;
@ -390,31 +390,37 @@ mod tests {
initial_accounts,
initial_commitments: vec![],
signing_key: *sequencer_sign_key_for_testing().value(),
bedrock_addr: "http://127.0.0.1:8080".to_string(),
bedrock_auth: ("".to_string(), "".to_string()),
indexer_config: IndexerConfig {
resubscribe_interval: 100,
bedrock_config: Some(BedrockConfig {
channel_id: [42; 32].into(),
},
bedrock_config: None,
node_url: "http://localhost:8080".to_string(),
user: "user".to_string(),
password: None,
indexer_config: IndexerConfig {
resubscribe_interval: 100,
},
}),
}
}
async fn components_for_tests() -> (JsonHandler, Vec<AccountInitialData>, EncodedTransaction) {
let config = sequencer_config_for_tests();
let bedrock_config = config.bedrock_config.clone().unwrap();
let (sender, receiver) = tokio::sync::mpsc::channel(100);
let indexer_core = IndexerCore::new(
&config.bedrock_addr,
&bedrock_config.node_url,
Some(BasicAuthCredentials::new(
config.bedrock_auth.0.clone(),
Some(config.bedrock_auth.1.clone()),
bedrock_config.user.clone(),
bedrock_config.password.clone(),
)),
sender,
config.indexer_config.clone(),
bedrock_config.indexer_config.clone(),
bedrock_config.channel_id.into(),
)
.unwrap();
let (mut sequencer_core, mempool_handle) =
SequencerCore::start_from_config(config, receiver);
SequencerCore::start_from_config(config, Some(receiver));
let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone();
let signing_key = nssa::PrivateKey::try_new([1; 32]).unwrap();
@ -445,7 +451,7 @@ mod tests {
(
JsonHandler {
sequencer_state: sequencer_core,
indexer_state: indexer_core,
indexer_state: Some(indexer_core),
mempool_handle,
},
initial_accounts,

View File

@ -22,37 +22,51 @@ struct Args {
pub async fn startup_sequencer(
app_config: SequencerConfig,
) -> Result<(ServerHandle, SocketAddr, JoinHandle<Result<()>>, JoinHandle<Result<()>>)> {
) -> Result<(
ServerHandle,
SocketAddr,
JoinHandle<Result<()>>,
Option<JoinHandle<Result<()>>>,
)> {
let block_timeout = app_config.block_create_timeout_millis;
let port = app_config.port;
// ToDo: Maybe make buffer size configurable.
let (sender, receiver) = tokio::sync::mpsc::channel(100);
let (indexer_core, receiver) = if let Some(bedrock_config) = app_config.bedrock_config.clone() {
let (sender, receiver) = tokio::sync::mpsc::channel(100);
let indexer_core = IndexerCore::new(
&app_config.bedrock_addr,
Some(BasicAuthCredentials::new(
app_config.bedrock_auth.0.clone(),
Some(app_config.bedrock_auth.1.clone()),
)),
sender,
app_config.indexer_config.clone(),
)?;
let indexer_core = IndexerCore::new(
&bedrock_config.node_url,
Some(BasicAuthCredentials::new(
bedrock_config.user.clone(),
bedrock_config.password.clone(),
)),
sender,
bedrock_config.indexer_config.clone(),
bedrock_config.channel_id.into(),
)?;
info!("Indexer core set up");
info!("Indexer core set up");
(Some(indexer_core), Some(receiver))
} else {
info!("Bedrock config not provided, skipping indexer setup");
(None, None)
};
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config, receiver);
info!("Sequencer core set up");
let indexer_core_wrapped = Arc::new(Mutex::new(indexer_core));
let indexer_core_wrapped = indexer_core.map(|core| Arc::new(Mutex::new(core)));
let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core));
let (http_server, addr) = new_http_server(
RpcConfig::with_port(port),
Arc::clone(&seq_core_wrapped),
mempool_handle,
Arc::clone(&indexer_core_wrapped),
indexer_core_wrapped.clone(),
)?;
info!("HTTP server started");
let http_server_handle = http_server.handle();
@ -80,18 +94,25 @@ pub async fn startup_sequencer(
}
});
let indexer_loop_handle = tokio::spawn(async move {
{
let indexer_guard = indexer_core_wrapped.lock().await;
let res = indexer_guard.subscribe_parse_block_stream().await;
let indexer_loop_handle = indexer_core_wrapped.map(|indexer_core_wrapped| {
tokio::spawn(async move {
{
let indexer_guard = indexer_core_wrapped.lock().await;
let res = indexer_guard.subscribe_parse_block_stream().await;
info!("Indexer loop res is {res:#?}");
}
info!("Indexer loop res is {res:#?}");
}
Ok(())
Ok(())
})
});
Ok((http_server_handle, addr, main_loop_handle, indexer_loop_handle))
Ok((
http_server_handle,
addr,
main_loop_handle,
indexer_loop_handle,
))
}
pub async fn main_runner() -> Result<()> {
@ -114,7 +135,10 @@ pub async fn main_runner() -> Result<()> {
let (_, _, main_loop_handle, indexer_loop_handle) = startup_sequencer(app_config).await?;
main_loop_handle.await??;
indexer_loop_handle.await??;
if indexer_loop_handle.is_some() {
indexer_loop_handle.unwrap().await??;
}
Ok(())
}