feat: indexer crate reding and parsing

This commit is contained in:
Pravdyvy 2026-01-12 15:51:24 +02:00
parent 324eec24df
commit aa6da018eb
5 changed files with 96 additions and 7 deletions

1
Cargo.lock generated
View File

@ -2884,6 +2884,7 @@ dependencies = [
"itertools 0.14.0",
"key_protocol",
"log",
"nomos-core",
"nssa",
"nssa_core",
"rand 0.8.5",

View File

@ -83,6 +83,7 @@ itertools = "0.14.0"
url = "2.5.4"
common-http-client = { git = "https://github.com/logos-blockchain/logos-blockchain.git", branch = "marbella-offsite-2025-12" }
nomos-core = { git = "https://github.com/logos-blockchain/logos-blockchain.git", branch = "marbella-offsite-2025-12" }
rocksdb = { version = "0.24.0", default-features = false, features = [
"snappy",

View File

@ -29,4 +29,5 @@ futures.workspace = true
async-stream = "0.3.6"
indicatif = { version = "0.18.3", features = ["improved_unicode"] }
risc0-zkvm.workspace = true
url.workspace = true
url.workspace = true
nomos-core.workspace = true

7
indexer/src/config.rs Normal file
View File

@ -0,0 +1,7 @@
use nomos_core::mantle::ops::channel::ChannelId;
#[derive(Debug)]
pub struct IndexerConfig {
pub resubscribe_interval: u64,
pub channel_id: ChannelId,
}

View File

@ -1,14 +1,93 @@
use bedrock_client::BedrockClient;
use futures::Stream;
use anyhow::Result;
use bedrock_client::{BasicAuthCredentials, BedrockClient};
use common::block::HashableBlockData;
use futures::StreamExt;
use nomos_core::mantle::{
Op, SignedMantleTx,
ops::channel::{ChannelId, inscribe::InscriptionOp},
};
use tokio::sync::mpsc::Sender;
use url::Url;
use crate::config::IndexerConfig;
pub mod config;
pub struct IndexerCore {
pub bedrock_client: BedrockClient,
pub bedrock_client: BedrockClient,
pub bedrock_url: Url,
pub channel_sender: Sender<HashableBlockData>,
pub config: IndexerConfig,
}
impl IndexerCore {
pub async fn subscribe_block_stream(&self) -> Result<impl Stream<Item = BlockInfo>, bedrock_client::Error> {
self.bedrock_client.0.get_lib_stream(self.bedrock_url).await
pub fn new(
addr: &str,
auth: Option<BasicAuthCredentials>,
sender: Sender<HashableBlockData>,
config: IndexerConfig,
) -> Result<Self> {
Ok(Self {
bedrock_client: BedrockClient::new(auth)?,
bedrock_url: Url::parse(addr)?,
channel_sender: sender,
config,
})
}
}
pub async fn subscribe_parse_block_stream(&self) -> Result<()> {
let mut stream_pinned = Box::pin(
self.bedrock_client
.0
.get_lib_stream(self.bedrock_url.clone())
.await?,
);
while let Some(block_info) = stream_pinned.next().await {
let header_id = block_info.header_id;
if let Some(l1_block) = self
.bedrock_client
.0
.get_block_by_id(self.bedrock_url.clone(), header_id)
.await?
{
let l2_blocks_parsed = parse_blocks(
l1_block.into_transactions().into_iter(),
&self.config.channel_id,
);
for l2_block in l2_blocks_parsed {
self.channel_sender.send(l2_block).await?;
}
}
}
Ok(())
}
}
pub fn parse_blocks(
block_txs: impl Iterator<Item = SignedMantleTx>,
decoded_channel_id: &ChannelId,
) -> Vec<HashableBlockData> {
block_txs
.flat_map(|tx| {
tx.mantle_tx
.ops
.iter()
.filter_map(|op| match op {
Op::ChannelInscribe(InscriptionOp {
channel_id,
inscription,
..
}) if channel_id == decoded_channel_id => {
// Assuming, that it is how block will be inscribed on l1
borsh::from_slice::<HashableBlockData>(inscription).ok()
}
_ => None,
})
.collect::<Vec<_>>()
})
.collect()
}