fix: indexer core added to all objects

This commit is contained in:
Pravdyvy 2026-01-13 15:11:51 +02:00
parent 1fd6651121
commit bbbb1c1a23
18 changed files with 134 additions and 18 deletions

6
Cargo.lock generated
View File

@ -2954,6 +2954,7 @@ dependencies = [
"env_logger", "env_logger",
"futures", "futures",
"hex", "hex",
"indexer",
"key_protocol", "key_protocol",
"log", "log",
"nssa", "nssa",
@ -5176,6 +5177,7 @@ dependencies = [
"chrono", "chrono",
"common", "common",
"futures", "futures",
"indexer",
"log", "log",
"mempool", "mempool",
"nssa", "nssa",
@ -5196,10 +5198,12 @@ dependencies = [
"anyhow", "anyhow",
"base58", "base58",
"base64", "base64",
"bedrock_client",
"borsh", "borsh",
"common", "common",
"futures", "futures",
"hex", "hex",
"indexer",
"itertools 0.14.0", "itertools 0.14.0",
"log", "log",
"mempool", "mempool",
@ -5218,9 +5222,11 @@ dependencies = [
"actix", "actix",
"actix-web", "actix-web",
"anyhow", "anyhow",
"bedrock_client",
"clap", "clap",
"common", "common",
"env_logger", "env_logger",
"indexer",
"log", "log",
"sequencer_core", "sequencer_core",
"sequencer_rpc", "sequencer_rpc",

View File

@ -19,7 +19,7 @@ members = [
"examples/program_deployment", "examples/program_deployment",
"examples/program_deployment/methods", "examples/program_deployment/methods",
"examples/program_deployment/methods/guest", "examples/program_deployment/methods/guest",
"bedrock_client", "bedrock_client",
"indexer", "indexer",
] ]

View File

@ -69,6 +69,10 @@ impl HashableBlockData {
}, },
} }
} }
pub fn block_hash(&self) -> BlockHash {
OwnHasher::hash(&borsh::to_vec(&self).unwrap())
}
} }
impl From<Block> for HashableBlockData { impl From<Block> for HashableBlockData {

View File

@ -30,4 +30,4 @@ async-stream = "0.3.6"
indicatif = { version = "0.18.3", features = ["improved_unicode"] } indicatif = { version = "0.18.3", features = ["improved_unicode"] }
risc0-zkvm.workspace = true risc0-zkvm.workspace = true
url.workspace = true url.workspace = true
nomos-core.workspace = true nomos-core.workspace = true

View File

@ -1,6 +1,7 @@
use nomos_core::mantle::ops::channel::ChannelId; use nomos_core::mantle::ops::channel::ChannelId;
use serde::{Deserialize, Serialize};
#[derive(Debug)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexerConfig { pub struct IndexerConfig {
pub resubscribe_interval: u64, pub resubscribe_interval: u64,
pub channel_id: ChannelId, pub channel_id: ChannelId,

View File

@ -1,6 +1,6 @@
use anyhow::Result; use anyhow::Result;
use bedrock_client::{BasicAuthCredentials, BedrockClient}; use bedrock_client::{BasicAuthCredentials, BedrockClient};
use common::block::HashableBlockData; use common::block::{BlockHash, HashableBlockData};
use futures::StreamExt; use futures::StreamExt;
use nomos_core::mantle::{ use nomos_core::mantle::{
Op, SignedMantleTx, Op, SignedMantleTx,
@ -9,22 +9,24 @@ use nomos_core::mantle::{
use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::Sender;
use url::Url; use url::Url;
use crate::config::IndexerConfig; use crate::{config::IndexerConfig, state::IndexerState};
pub mod config; pub mod config;
pub mod state;
pub struct IndexerCore { pub struct IndexerCore {
pub bedrock_client: BedrockClient, pub bedrock_client: BedrockClient,
pub bedrock_url: Url, pub bedrock_url: Url,
pub channel_sender: Sender<HashableBlockData>, pub channel_sender: Sender<BlockHash>,
pub config: IndexerConfig, pub config: IndexerConfig,
pub state: IndexerState,
} }
impl IndexerCore { impl IndexerCore {
pub fn new( pub fn new(
addr: &str, addr: &str,
auth: Option<BasicAuthCredentials>, auth: Option<BasicAuthCredentials>,
sender: Sender<HashableBlockData>, sender: Sender<BlockHash>,
config: IndexerConfig, config: IndexerConfig,
) -> Result<Self> { ) -> Result<Self> {
Ok(Self { Ok(Self {
@ -32,6 +34,10 @@ impl IndexerCore {
bedrock_url: Url::parse(addr)?, bedrock_url: Url::parse(addr)?,
channel_sender: sender, channel_sender: sender,
config, config,
// No state setup for now, future task.
state: IndexerState {
latest_seen_block: 0,
},
}) })
} }
@ -58,7 +64,8 @@ impl IndexerCore {
); );
for l2_block in l2_blocks_parsed { for l2_block in l2_blocks_parsed {
self.channel_sender.send(l2_block).await?; // Sending data into sequencer, may need to be expanded.
self.channel_sender.send(l2_block.block_hash()).await?;
} }
} }
} }

5
indexer/src/state.rs Normal file
View File

@ -0,0 +1,5 @@
#[derive(Debug)]
pub struct IndexerState {
// Only one field for now, for testing.
pub latest_seen_block: u64,
}

View File

@ -11,6 +11,7 @@ sequencer_runner.workspace = true
wallet.workspace = true wallet.workspace = true
common.workspace = true common.workspace = true
key_protocol.workspace = true key_protocol.workspace = true
indexer.workspace = true
anyhow.workspace = true anyhow.workspace = true
env_logger.workspace = true env_logger.workspace = true

View File

@ -1,6 +1,7 @@
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use anyhow::Result; use anyhow::Result;
use indexer::config::IndexerConfig;
use integration_tests::TestContext; use integration_tests::TestContext;
use key_protocol::key_management::ephemeral_key_holder::EphemeralKeyHolder; use key_protocol::key_management::ephemeral_key_holder::EphemeralKeyHolder;
use log::info; use log::info;
@ -185,6 +186,12 @@ impl TpsTestManager {
initial_accounts: initial_public_accounts, initial_accounts: initial_public_accounts,
initial_commitments: vec![initial_commitment], initial_commitments: vec![initial_commitment],
signing_key: [37; 32], signing_key: [37; 32],
bedrock_addr: "0.0.0.0".to_string(),
bedrock_auth: ("".to_string(), "".to_string()),
indexer_config: IndexerConfig {
resubscribe_interval: 100,
channel_id: [42; 32].into(),
},
} }
} }
} }

View File

@ -9,6 +9,7 @@ nssa_core.workspace = true
common.workspace = true common.workspace = true
storage.workspace = true storage.workspace = true
mempool.workspace = true mempool.workspace = true
indexer.workspace = true
base58.workspace = true base58.workspace = true
anyhow.workspace = true anyhow.workspace = true
@ -17,11 +18,11 @@ serde_json.workspace = true
tempfile.workspace = true tempfile.workspace = true
chrono.workspace = true chrono.workspace = true
log.workspace = true log.workspace = true
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
[features] [features]
default = [] default = []
testnet = [] testnet = []
[dev-dependencies] [dev-dependencies]
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
futures.workspace = true futures.workspace = true

View File

@ -5,6 +5,7 @@ use std::{
}; };
use anyhow::Result; use anyhow::Result;
use indexer::config::IndexerConfig;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
@ -47,6 +48,12 @@ pub struct SequencerConfig {
pub initial_commitments: Vec<CommitmentsInitialData>, pub initial_commitments: Vec<CommitmentsInitialData>,
/// Sequencer own signing key /// Sequencer own signing key
pub signing_key: [u8; 32], pub signing_key: [u8; 32],
/// Bedrock addr
pub bedrock_addr: String,
/// Bedrock auth
pub bedrock_auth: (String, String),
/// Indexer config
pub indexer_config: IndexerConfig,
} }
impl SequencerConfig { impl SequencerConfig {

View File

@ -5,13 +5,14 @@ use anyhow::Result;
use common::PINATA_BASE58; use common::PINATA_BASE58;
use common::{ use common::{
HashType, HashType,
block::HashableBlockData, block::{BlockHash, HashableBlockData},
transaction::{EncodedTransaction, NSSATransaction}, transaction::{EncodedTransaction, NSSATransaction},
}; };
use config::SequencerConfig; use config::SequencerConfig;
use log::warn; use log::warn;
use mempool::{MemPool, MemPoolHandle}; use mempool::{MemPool, MemPoolHandle};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::Receiver;
use crate::block_store::SequencerBlockStore; use crate::block_store::SequencerBlockStore;
@ -24,6 +25,9 @@ pub struct SequencerCore {
mempool: MemPool<EncodedTransaction>, mempool: MemPool<EncodedTransaction>,
sequencer_config: SequencerConfig, sequencer_config: SequencerConfig,
chain_height: u64, chain_height: u64,
// No logic here for now
#[allow(unused)]
receiver: Receiver<BlockHash>,
} }
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
@ -42,7 +46,10 @@ impl std::error::Error for TransactionMalformationError {}
impl SequencerCore { impl SequencerCore {
/// Start Sequencer from configuration and construct transaction sender /// Start Sequencer from configuration and construct transaction sender
pub fn start_from_config(config: SequencerConfig) -> (Self, MemPoolHandle<EncodedTransaction>) { pub fn start_from_config(
config: SequencerConfig,
receiver: Receiver<BlockHash>,
) -> (Self, MemPoolHandle<EncodedTransaction>) {
let hashable_data = HashableBlockData { let hashable_data = HashableBlockData {
block_id: config.genesis_id, block_id: config.genesis_id,
transactions: vec![], transactions: vec![],
@ -93,6 +100,7 @@ impl SequencerCore {
mempool, mempool,
chain_height: config.genesis_id, chain_height: config.genesis_id,
sequencer_config: config, sequencer_config: config,
receiver,
}; };
this.sync_state_with_stored_blocks(); this.sync_state_with_stored_blocks();
@ -248,6 +256,7 @@ mod tests {
use base58::{FromBase58, ToBase58}; use base58::{FromBase58, ToBase58};
use common::test_utils::sequencer_sign_key_for_testing; use common::test_utils::sequencer_sign_key_for_testing;
use indexer::config::IndexerConfig;
use nssa::PrivateKey; use nssa::PrivateKey;
use super::*; use super::*;
@ -277,6 +286,12 @@ mod tests {
initial_accounts, initial_accounts,
initial_commitments: vec![], initial_commitments: vec![],
signing_key: *sequencer_sign_key_for_testing().value(), signing_key: *sequencer_sign_key_for_testing().value(),
bedrock_addr: "0.0.0.0".to_string(),
bedrock_auth: ("".to_string(), "".to_string()),
indexer_config: IndexerConfig {
resubscribe_interval: 100,
channel_id: [42; 32].into(),
},
} }
} }
@ -322,7 +337,9 @@ mod tests {
async fn common_setup_with_config( async fn common_setup_with_config(
config: SequencerConfig, config: SequencerConfig,
) -> (SequencerCore, MemPoolHandle<EncodedTransaction>) { ) -> (SequencerCore, MemPoolHandle<EncodedTransaction>) {
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config); let (_, receiver) = tokio::sync::mpsc::channel(100);
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config, receiver);
let tx = common::test_utils::produce_dummy_empty_transaction(); let tx = common::test_utils::produce_dummy_empty_transaction();
mempool_handle.push(tx).await.unwrap(); mempool_handle.push(tx).await.unwrap();
@ -337,7 +354,9 @@ mod tests {
#[test] #[test]
fn test_start_from_config() { fn test_start_from_config() {
let config = setup_sequencer_config(); let config = setup_sequencer_config();
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); let (_, receiver) = tokio::sync::mpsc::channel(100);
let (sequencer, _mempool_handle) =
SequencerCore::start_from_config(config.clone(), receiver);
assert_eq!(sequencer.chain_height, config.genesis_id); assert_eq!(sequencer.chain_height, config.genesis_id);
assert_eq!(sequencer.sequencer_config.max_num_tx_in_block, 10); assert_eq!(sequencer.sequencer_config.max_num_tx_in_block, 10);
@ -396,7 +415,9 @@ mod tests {
let initial_accounts = vec![initial_acc1, initial_acc2]; let initial_accounts = vec![initial_acc1, initial_acc2];
let config = setup_sequencer_config_variable_initial_accounts(initial_accounts); let config = setup_sequencer_config_variable_initial_accounts(initial_accounts);
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); let (_, receiver) = tokio::sync::mpsc::channel(100);
let (sequencer, _mempool_handle) =
SequencerCore::start_from_config(config.clone(), receiver);
let acc1_account_id = config.initial_accounts[0] let acc1_account_id = config.initial_accounts[0]
.account_id .account_id
@ -729,7 +750,9 @@ mod tests {
// from `acc_1` to `acc_2`. The block created with that transaction will be kept stored in // from `acc_1` to `acc_2`. The block created with that transaction will be kept stored in
// the temporary directory for the block storage of this test. // the temporary directory for the block storage of this test.
{ {
let (mut sequencer, mempool_handle) = SequencerCore::start_from_config(config.clone()); let (_, receiver) = tokio::sync::mpsc::channel(100);
let (mut sequencer, mempool_handle) =
SequencerCore::start_from_config(config.clone(), receiver);
let signing_key = PrivateKey::try_new([1; 32]).unwrap(); let signing_key = PrivateKey::try_new([1; 32]).unwrap();
let tx = common::test_utils::create_transaction_native_token_transfer( let tx = common::test_utils::create_transaction_native_token_transfer(
@ -753,7 +776,9 @@ mod tests {
// Instantiating a new sequencer from the same config. This should load the existing block // Instantiating a new sequencer from the same config. This should load the existing block
// with the above transaction and update the state to reflect that. // with the above transaction and update the state to reflect that.
let (sequencer, _mempool_handle) = SequencerCore::start_from_config(config.clone()); let (_, receiver) = tokio::sync::mpsc::channel(100);
let (sequencer, _mempool_handle) =
SequencerCore::start_from_config(config.clone(), receiver);
let balance_acc_1 = sequencer.state.get_account_by_id(&acc1_account_id).balance; let balance_acc_1 = sequencer.state.get_account_by_id(&acc1_account_id).balance;
let balance_acc_2 = sequencer.state.get_account_by_id(&acc2_account_id).balance; let balance_acc_2 = sequencer.state.get_account_by_id(&acc2_account_id).balance;

View File

@ -8,6 +8,8 @@ nssa.workspace = true
common.workspace = true common.workspace = true
mempool.workspace = true mempool.workspace = true
sequencer_core.workspace = true sequencer_core.workspace = true
indexer.workspace = true
bedrock_client.workspace = true
anyhow.workspace = true anyhow.workspace = true
serde_json.workspace = true serde_json.workspace = true

View File

@ -8,6 +8,7 @@ use common::{
rpc_primitives::errors::{RpcError, RpcErrorKind}, rpc_primitives::errors::{RpcError, RpcErrorKind},
transaction::EncodedTransaction, transaction::EncodedTransaction,
}; };
use indexer::IndexerCore;
use mempool::MemPoolHandle; use mempool::MemPoolHandle;
pub use net_utils::*; pub use net_utils::*;
use sequencer_core::SequencerCore; use sequencer_core::SequencerCore;
@ -20,6 +21,9 @@ use self::types::err_rpc::RpcErr;
// ToDo: Add necessary fields // ToDo: Add necessary fields
pub struct JsonHandler { pub struct JsonHandler {
sequencer_state: Arc<Mutex<SequencerCore>>, sequencer_state: Arc<Mutex<SequencerCore>>,
// No functionality for now.
#[allow(unused)]
indexer_state: Arc<Mutex<IndexerCore>>,
mempool_handle: MemPoolHandle<EncodedTransaction>, mempool_handle: MemPoolHandle<EncodedTransaction>,
} }

View File

@ -7,6 +7,7 @@ use common::{
transaction::EncodedTransaction, transaction::EncodedTransaction,
}; };
use futures::{Future, FutureExt}; use futures::{Future, FutureExt};
use indexer::IndexerCore;
use log::info; use log::info;
use mempool::MemPoolHandle; use mempool::MemPoolHandle;
use sequencer_core::SequencerCore; use sequencer_core::SequencerCore;
@ -46,6 +47,7 @@ pub fn new_http_server(
config: RpcConfig, config: RpcConfig,
seuquencer_core: Arc<Mutex<SequencerCore>>, seuquencer_core: Arc<Mutex<SequencerCore>>,
mempool_handle: MemPoolHandle<EncodedTransaction>, mempool_handle: MemPoolHandle<EncodedTransaction>,
indexer_core: Arc<Mutex<IndexerCore>>,
) -> io::Result<(actix_web::dev::Server, SocketAddr)> { ) -> io::Result<(actix_web::dev::Server, SocketAddr)> {
let RpcConfig { let RpcConfig {
addr, addr,
@ -55,6 +57,7 @@ pub fn new_http_server(
info!(target:NETWORK, "Starting HTTP server at {addr}"); info!(target:NETWORK, "Starting HTTP server at {addr}");
let handler = web::Data::new(JsonHandler { let handler = web::Data::new(JsonHandler {
sequencer_state: seuquencer_core.clone(), sequencer_state: seuquencer_core.clone(),
indexer_state: indexer_core.clone(),
mempool_handle, mempool_handle,
}); });

View File

@ -340,7 +340,9 @@ mod tests {
use base58::ToBase58; use base58::ToBase58;
use base64::{Engine, engine::general_purpose}; use base64::{Engine, engine::general_purpose};
use bedrock_client::BasicAuthCredentials;
use common::{test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction}; use common::{test_utils::sequencer_sign_key_for_testing, transaction::EncodedTransaction};
use indexer::{IndexerCore, config::IndexerConfig};
use sequencer_core::{ use sequencer_core::{
SequencerCore, SequencerCore,
config::{AccountInitialData, SequencerConfig}, config::{AccountInitialData, SequencerConfig},
@ -388,12 +390,30 @@ mod tests {
initial_accounts, initial_accounts,
initial_commitments: vec![], initial_commitments: vec![],
signing_key: *sequencer_sign_key_for_testing().value(), signing_key: *sequencer_sign_key_for_testing().value(),
bedrock_addr: "0.0.0.0".to_string(),
bedrock_auth: ("".to_string(), "".to_string()),
indexer_config: IndexerConfig {
resubscribe_interval: 100,
channel_id: [42; 32].into(),
},
} }
} }
async fn components_for_tests() -> (JsonHandler, Vec<AccountInitialData>, EncodedTransaction) { async fn components_for_tests() -> (JsonHandler, Vec<AccountInitialData>, EncodedTransaction) {
let config = sequencer_config_for_tests(); let config = sequencer_config_for_tests();
let (mut sequencer_core, mempool_handle) = SequencerCore::start_from_config(config); let (sender, receiver) = tokio::sync::mpsc::channel(100);
let indexer_core = IndexerCore::new(
&config.bedrock_addr,
Some(BasicAuthCredentials::new(
config.bedrock_auth.0.clone(),
Some(config.bedrock_auth.1.clone()),
)),
sender,
config.indexer_config.clone(),
)
.unwrap();
let (mut sequencer_core, mempool_handle) =
SequencerCore::start_from_config(config, receiver);
let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone(); let initial_accounts = sequencer_core.sequencer_config().initial_accounts.clone();
let signing_key = nssa::PrivateKey::try_new([1; 32]).unwrap(); let signing_key = nssa::PrivateKey::try_new([1; 32]).unwrap();
@ -419,10 +439,12 @@ mod tests {
.unwrap(); .unwrap();
let sequencer_core = Arc::new(Mutex::new(sequencer_core)); let sequencer_core = Arc::new(Mutex::new(sequencer_core));
let indexer_core = Arc::new(Mutex::new(indexer_core));
( (
JsonHandler { JsonHandler {
sequencer_state: sequencer_core, sequencer_state: sequencer_core,
indexer_state: indexer_core,
mempool_handle, mempool_handle,
}, },
initial_accounts, initial_accounts,

View File

@ -7,6 +7,8 @@ edition = "2024"
common.workspace = true common.workspace = true
sequencer_core = { workspace = true, features = ["testnet"] } sequencer_core = { workspace = true, features = ["testnet"] }
sequencer_rpc.workspace = true sequencer_rpc.workspace = true
indexer.workspace = true
bedrock_client.workspace = true
clap = { workspace = true, features = ["derive", "env"] } clap = { workspace = true, features = ["derive", "env"] }
anyhow.workspace = true anyhow.workspace = true

View File

@ -2,8 +2,10 @@ use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use actix_web::dev::ServerHandle; use actix_web::dev::ServerHandle;
use anyhow::Result; use anyhow::Result;
use bedrock_client::BasicAuthCredentials;
use clap::Parser; use clap::Parser;
use common::rpc_primitives::RpcConfig; use common::rpc_primitives::RpcConfig;
use indexer::IndexerCore;
use log::info; use log::info;
use sequencer_core::{SequencerCore, config::SequencerConfig}; use sequencer_core::{SequencerCore, config::SequencerConfig};
use sequencer_rpc::new_http_server; use sequencer_rpc::new_http_server;
@ -24,16 +26,33 @@ pub async fn startup_sequencer(
let block_timeout = app_config.block_create_timeout_millis; let block_timeout = app_config.block_create_timeout_millis;
let port = app_config.port; let port = app_config.port;
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config); // ToDo: Maybe make buffer size configurable.
let (sender, receiver) = tokio::sync::mpsc::channel(100);
let indexer_core = IndexerCore::new(
&app_config.bedrock_addr,
Some(BasicAuthCredentials::new(
app_config.bedrock_auth.0.clone(),
Some(app_config.bedrock_auth.1.clone()),
)),
sender,
app_config.indexer_config.clone(),
)?;
info!("Indexer core set up");
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config, receiver);
info!("Sequencer core set up"); info!("Sequencer core set up");
let indexer_core_wrapped = Arc::new(Mutex::new(indexer_core));
let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core)); let seq_core_wrapped = Arc::new(Mutex::new(sequencer_core));
let (http_server, addr) = new_http_server( let (http_server, addr) = new_http_server(
RpcConfig::with_port(port), RpcConfig::with_port(port),
Arc::clone(&seq_core_wrapped), Arc::clone(&seq_core_wrapped),
mempool_handle, mempool_handle,
Arc::clone(&indexer_core_wrapped),
)?; )?;
info!("HTTP server started"); info!("HTTP server started");
let http_server_handle = http_server.handle(); let http_server_handle = http_server.handle();