From aa6da018ebfe86c120ee3058a91d362c863dc8ae Mon Sep 17 00:00:00 2001 From: Pravdyvy Date: Mon, 12 Jan 2026 15:51:24 +0200 Subject: [PATCH] feat: indexer crate reding and parsing --- Cargo.lock | 1 + Cargo.toml | 1 + indexer/Cargo.toml | 3 +- indexer/src/config.rs | 7 ++++ indexer/src/lib.rs | 91 ++++++++++++++++++++++++++++++++++++++++--- 5 files changed, 96 insertions(+), 7 deletions(-) create mode 100644 indexer/src/config.rs diff --git a/Cargo.lock b/Cargo.lock index ce029fb0..326cc356 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2884,6 +2884,7 @@ dependencies = [ "itertools 0.14.0", "key_protocol", "log", + "nomos-core", "nssa", "nssa_core", "rand 0.8.5", diff --git a/Cargo.toml b/Cargo.toml index 83fdb493..35727ff5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -83,6 +83,7 @@ itertools = "0.14.0" url = "2.5.4" common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git", branch = "marbella-offsite-2025-12" } +nomos-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git", branch = "marbella-offsite-2025-12" } rocksdb = { version = "0.24.0", default-features = false, features = [ "snappy", diff --git a/indexer/Cargo.toml b/indexer/Cargo.toml index 7984c944..2025a3b1 100644 --- a/indexer/Cargo.toml +++ b/indexer/Cargo.toml @@ -29,4 +29,5 @@ futures.workspace = true async-stream = "0.3.6" indicatif = { version = "0.18.3", features = ["improved_unicode"] } risc0-zkvm.workspace = true -url.workspace = true \ No newline at end of file +url.workspace = true +nomos-core.workspace = true \ No newline at end of file diff --git a/indexer/src/config.rs b/indexer/src/config.rs new file mode 100644 index 00000000..8818d15e --- /dev/null +++ b/indexer/src/config.rs @@ -0,0 +1,7 @@ +use nomos_core::mantle::ops::channel::ChannelId; + +#[derive(Debug)] +pub struct IndexerConfig { + pub resubscribe_interval: u64, + pub channel_id: ChannelId, +} diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 904ab816..1474f85c 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -1,14 +1,93 @@ -use bedrock_client::BedrockClient; -use futures::Stream; +use anyhow::Result; +use bedrock_client::{BasicAuthCredentials, BedrockClient}; +use common::block::HashableBlockData; +use futures::StreamExt; +use nomos_core::mantle::{ + Op, SignedMantleTx, + ops::channel::{ChannelId, inscribe::InscriptionOp}, +}; +use tokio::sync::mpsc::Sender; use url::Url; +use crate::config::IndexerConfig; + +pub mod config; + pub struct IndexerCore { - pub bedrock_client: BedrockClient, + pub bedrock_client: BedrockClient, pub bedrock_url: Url, + pub channel_sender: Sender, + pub config: IndexerConfig, } impl IndexerCore { - pub async fn subscribe_block_stream(&self) -> Result, bedrock_client::Error> { - self.bedrock_client.0.get_lib_stream(self.bedrock_url).await + pub fn new( + addr: &str, + auth: Option, + sender: Sender, + config: IndexerConfig, + ) -> Result { + Ok(Self { + bedrock_client: BedrockClient::new(auth)?, + bedrock_url: Url::parse(addr)?, + channel_sender: sender, + config, + }) } -} \ No newline at end of file + + pub async fn subscribe_parse_block_stream(&self) -> Result<()> { + let mut stream_pinned = Box::pin( + self.bedrock_client + .0 + .get_lib_stream(self.bedrock_url.clone()) + .await?, + ); + + while let Some(block_info) = stream_pinned.next().await { + let header_id = block_info.header_id; + + if let Some(l1_block) = self + .bedrock_client + .0 + .get_block_by_id(self.bedrock_url.clone(), header_id) + .await? + { + let l2_blocks_parsed = parse_blocks( + l1_block.into_transactions().into_iter(), + &self.config.channel_id, + ); + + for l2_block in l2_blocks_parsed { + self.channel_sender.send(l2_block).await?; + } + } + } + + Ok(()) + } +} + +pub fn parse_blocks( + block_txs: impl Iterator, + decoded_channel_id: &ChannelId, +) -> Vec { + block_txs + .flat_map(|tx| { + tx.mantle_tx + .ops + .iter() + .filter_map(|op| match op { + Op::ChannelInscribe(InscriptionOp { + channel_id, + inscription, + .. + }) if channel_id == decoded_channel_id => { + // Assuming, that it is how block will be inscribed on l1 + borsh::from_slice::(inscription).ok() + } + _ => None, + }) + .collect::>() + }) + .collect() +}