mirror of
https://github.com/logos-blockchain/lssa.git
synced 2026-02-18 20:33:13 +00:00
feat: updated block streaming
This commit is contained in:
parent
c420575673
commit
b6d7f1ecbd
@ -1,6 +1,7 @@
|
||||
use std::{path::Path, sync::Arc};
|
||||
|
||||
use anyhow::Result;
|
||||
use bedrock_client::HeaderId;
|
||||
use common::{
|
||||
block::Block,
|
||||
transaction::{NSSATransaction, execute_check_transaction_on_state, transaction_pre_check},
|
||||
@ -34,6 +35,13 @@ impl IndexerStore {
|
||||
Self::open_db_with_genesis(location, None)
|
||||
}
|
||||
|
||||
pub fn last_observed_l1_header(&self) -> Result<Option<HeaderId>> {
|
||||
Ok(self
|
||||
.dbio
|
||||
.get_meta_last_observed_l1_block_in_db()?
|
||||
.map(|raw_id| HeaderId::from(raw_id)))
|
||||
}
|
||||
|
||||
/// Returns the id of the most recent L2 block stored in the database.
///
/// # Errors
/// Propagates any database error from the underlying meta read.
pub fn get_last_block_id(&self) -> Result<u64> {
    let last_block_id = self.dbio.get_meta_last_block_in_db()?;
    Ok(last_block_id)
}
|
||||
@ -95,7 +103,7 @@ impl IndexerStore {
|
||||
Ok(self.final_state()?.get_account_by_id(*account_id))
|
||||
}
|
||||
|
||||
pub fn put_block(&self, block: Block) -> Result<()> {
|
||||
pub fn put_block(&self, block: Block, l1_header: HeaderId) -> Result<()> {
|
||||
let mut final_state = self.dbio.final_state()?;
|
||||
|
||||
for transaction in &block.body.transactions {
|
||||
@ -105,6 +113,6 @@ impl IndexerStore {
|
||||
)?;
|
||||
}
|
||||
|
||||
Ok(self.dbio.put_block(block)?)
|
||||
Ok(self.dbio.put_block(block, l1_header.into())?)
|
||||
}
|
||||
}
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
use std::collections::VecDeque;
|
||||
|
||||
use anyhow::Result;
|
||||
use bedrock_client::{BedrockClient, HeaderId};
|
||||
use common::block::{Block, HashableBlockData};
|
||||
@ -78,22 +80,51 @@ impl IndexerCore {
|
||||
|
||||
pub async fn subscribe_parse_block_stream(&self) -> impl futures::Stream<Item = Result<Block>> {
|
||||
async_stream::stream! {
|
||||
loop {
|
||||
let mut stream_pinned = Box::pin(self.bedrock_client.get_lib_stream().await?);
|
||||
let last_l1_header = self.store.last_observed_l1_header()?;
|
||||
|
||||
info!("Block stream joined");
|
||||
let mut last_fin_header = match last_l1_header {
|
||||
Some(last_l1_header) => {
|
||||
last_l1_header
|
||||
},
|
||||
None => {
|
||||
info!("Searching for the start of a channel");
|
||||
|
||||
while let Some(block_info) = stream_pinned.next().await {
|
||||
let header_id = block_info.header_id;
|
||||
let start_buff = self.search_for_channel_start().await?;
|
||||
|
||||
info!("Observed L1 block at height {}", block_info.height);
|
||||
let last_l1_header = start_buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id();
|
||||
|
||||
if let Some(l1_block) = self
|
||||
.bedrock_client
|
||||
.get_block_by_id(header_id)
|
||||
.await?
|
||||
{
|
||||
info!("Extracted L1 block at height {}", block_info.height);
|
||||
for l1_block in start_buff {
|
||||
info!("Observed L1 block at height {}", l1_block.header().slot().into_inner());
|
||||
|
||||
let curr_l1_header = l1_block.header().id();
|
||||
|
||||
let l2_blocks_parsed = parse_blocks(
|
||||
l1_block.into_transactions().into_iter(),
|
||||
&self.config.channel_id,
|
||||
).collect::<Vec<_>>();
|
||||
|
||||
info!("Parsed {} L2 blocks", l2_blocks_parsed.len());
|
||||
|
||||
for l2_block in l2_blocks_parsed {
|
||||
self.store.put_block(l2_block.clone(), curr_l1_header)?;
|
||||
|
||||
yield Ok(l2_block);
|
||||
}
|
||||
}
|
||||
|
||||
last_l1_header
|
||||
},
|
||||
};
|
||||
|
||||
loop {
|
||||
let buff = self.rollback_to_last_known_finalized_l1_id(last_fin_header).await?;
|
||||
|
||||
last_fin_header = buff.back().ok_or(anyhow::anyhow!("Failure: Chain is empty"))?.header().id();
|
||||
|
||||
for l1_block in buff {
|
||||
info!("Observed L1 block at height {}", l1_block.header().slot().into_inner());
|
||||
|
||||
let curr_l1_header = l1_block.header().id();
|
||||
|
||||
let l2_blocks_parsed = parse_blocks(
|
||||
l1_block.into_transactions().into_iter(),
|
||||
@ -103,19 +134,12 @@ impl IndexerCore {
|
||||
info!("Parsed {} L2 blocks", l2_blocks_parsed.len());
|
||||
|
||||
for l2_block in l2_blocks_parsed {
|
||||
self.store.put_block(l2_block.clone())?;
|
||||
self.store.put_block(l2_block.clone(), curr_l1_header)?;
|
||||
|
||||
yield Ok(l2_block);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Refetch stream after delay
|
||||
tokio::time::sleep(std::time::Duration::from_millis(
|
||||
self.config.resubscribe_interval_millis,
|
||||
))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -130,11 +154,13 @@ impl IndexerCore {
|
||||
|
||||
pub async fn search_for_channel_start(
|
||||
&self,
|
||||
channel_id_to_search: &ChannelId,
|
||||
) -> Result<HeaderId> {
|
||||
) -> Result<VecDeque<bedrock_client::Block<SignedMantleTx>>> {
|
||||
let mut curr_last_header = self.wait_last_finalized_block_header().await?;
|
||||
// ToDo: Not scalable, initial buffer should be stored in DB to not run out of memory
|
||||
// Don't want to complicate DB even more right now.
|
||||
let mut initial_block_buffer = VecDeque::new();
|
||||
|
||||
let first_header = loop {
|
||||
loop {
|
||||
let Some(curr_last_block) = self
|
||||
.bedrock_client
|
||||
.get_block_by_id(curr_last_header)
|
||||
@ -143,12 +169,14 @@ impl IndexerCore {
|
||||
return Err(anyhow::anyhow!("Chain inconsistency"));
|
||||
};
|
||||
|
||||
if let Some(search_res) = curr_last_block.transactions().find_map(|tx| {
|
||||
initial_block_buffer.push_front(curr_last_block.clone());
|
||||
|
||||
if let Some(_) = curr_last_block.transactions().find_map(|tx| {
|
||||
tx.mantle_tx.ops.iter().find_map(|op| match op {
|
||||
Op::ChannelInscribe(InscriptionOp {
|
||||
channel_id, parent, ..
|
||||
}) => {
|
||||
if (channel_id == channel_id_to_search) && (parent == &MsgId::root()) {
|
||||
if (channel_id == &self.config.channel_id) && (parent == &MsgId::root()) {
|
||||
Some(curr_last_block.header().id())
|
||||
} else {
|
||||
None
|
||||
@ -157,13 +185,45 @@ impl IndexerCore {
|
||||
_ => None,
|
||||
})
|
||||
}) {
|
||||
break search_res;
|
||||
break;
|
||||
} else {
|
||||
// Step back to parent
|
||||
curr_last_header = curr_last_block.header().parent();
|
||||
};
|
||||
};
|
||||
}
|
||||
|
||||
Ok(first_header)
|
||||
Ok(initial_block_buffer)
|
||||
}
|
||||
|
||||
pub async fn rollback_to_last_known_finalized_l1_id(
|
||||
&self,
|
||||
last_fin_header: HeaderId,
|
||||
) -> Result<VecDeque<bedrock_client::Block<SignedMantleTx>>> {
|
||||
let mut curr_last_header = self.wait_last_finalized_block_header().await?;
|
||||
// ToDo: Not scalable, buffer should be stored in DB to not run out of memory
|
||||
// Don't want to complicate DB even more right now.
|
||||
let mut block_buffer = VecDeque::new();
|
||||
|
||||
loop {
|
||||
let Some(curr_last_block) = self
|
||||
.bedrock_client
|
||||
.get_block_by_id(curr_last_header)
|
||||
.await?
|
||||
else {
|
||||
return Err(anyhow::anyhow!("Chain inconsistency"));
|
||||
};
|
||||
|
||||
if curr_last_block.header().id() == last_fin_header {
|
||||
break;
|
||||
} else {
|
||||
// Step back to parent
|
||||
curr_last_header = curr_last_block.header().parent();
|
||||
}
|
||||
|
||||
block_buffer.push_front(curr_last_block.clone());
|
||||
}
|
||||
|
||||
Ok(block_buffer)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -27,6 +27,8 @@ pub const CACHE_SIZE: usize = 1000;
|
||||
pub const DB_META_FIRST_BLOCK_IN_DB_KEY: &str = "first_block_in_db";
|
||||
/// Key base for storing metainformation about id of last current block in db
|
||||
pub const DB_META_LAST_BLOCK_IN_DB_KEY: &str = "last_block_in_db";
|
||||
/// Key base for storing metainformation about id of last observed L1 block in db
|
||||
pub const DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY: &str = "last_observed_l1_block_in_db";
|
||||
/// Key base for storing metainformation which describe if first block has been set
|
||||
pub const DB_META_FIRST_BLOCK_SET_KEY: &str = "first_block_set";
|
||||
/// Key base for storing metainformation about the last breakpoint
|
||||
@ -217,6 +219,35 @@ impl RocksDBIO {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_meta_last_observed_l1_block_in_db(&self) -> DbResult<Option<[u8; 32]>> {
|
||||
let cf_meta = self.meta_column();
|
||||
let res = self
|
||||
.db
|
||||
.get_cf(
|
||||
&cf_meta,
|
||||
borsh::to_vec(&DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some(
|
||||
"Failed to serialize DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY"
|
||||
.to_string(),
|
||||
),
|
||||
)
|
||||
})?,
|
||||
)
|
||||
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
|
||||
|
||||
res.map(|data| {
|
||||
borsh::from_slice::<[u8; 32]>(&data).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to deserialize last l1 header".to_string()),
|
||||
)
|
||||
})
|
||||
})
|
||||
.transpose()
|
||||
}
|
||||
|
||||
pub fn get_meta_is_first_block_set(&self) -> DbResult<bool> {
|
||||
let cf_meta = self.meta_column();
|
||||
let res = self
|
||||
@ -284,7 +315,7 @@ impl RocksDBIO {
|
||||
)
|
||||
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
|
||||
|
||||
self.put_block(block)?;
|
||||
self.put_block(block, [0; 32])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@ -310,6 +341,31 @@ impl RocksDBIO {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_meta_last_observed_l1_block_in_db(&self, l1_block_header: [u8; 32]) -> DbResult<()> {
|
||||
let cf_meta = self.meta_column();
|
||||
self.db
|
||||
.put_cf(
|
||||
&cf_meta,
|
||||
borsh::to_vec(&DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some(
|
||||
"Failed to serialize DB_META_LAST_OBSERVED_L1_BLOCK_IN_DB_KEY"
|
||||
.to_string(),
|
||||
),
|
||||
)
|
||||
})?,
|
||||
borsh::to_vec(&l1_block_header).map_err(|err| {
|
||||
DbError::borsh_cast_message(
|
||||
err,
|
||||
Some("Failed to serialize last l1 block header".to_string()),
|
||||
)
|
||||
})?,
|
||||
)
|
||||
.map_err(|rerr| DbError::rocksdb_cast_message(rerr, None))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn put_meta_last_breakpoint_id(&self, br_id: u64) -> DbResult<()> {
|
||||
let cf_meta = self.meta_column();
|
||||
self.db
|
||||
@ -351,7 +407,7 @@ impl RocksDBIO {
|
||||
|
||||
// Block
|
||||
|
||||
pub fn put_block(&self, block: Block) -> DbResult<()> {
|
||||
pub fn put_block(&self, block: Block, l1_block_header: [u8; 32]) -> DbResult<()> {
|
||||
let cf_block = self.block_column();
|
||||
let cf_hti = self.hash_to_id_column();
|
||||
let cf_tti: Arc<BoundColumnFamily<'_>> = self.tx_hash_to_id_column();
|
||||
@ -380,6 +436,7 @@ impl RocksDBIO {
|
||||
|
||||
if block.header.block_id > last_curr_block {
|
||||
self.put_meta_last_block_in_db(block.header.block_id)?;
|
||||
self.put_meta_last_observed_l1_block_in_db(l1_block_header)?;
|
||||
}
|
||||
|
||||
self.db
|
||||
@ -957,7 +1014,7 @@ mod tests {
|
||||
let transfer_tx = transfer(1, 0, true);
|
||||
let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]);
|
||||
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [1; 32]).unwrap();
|
||||
|
||||
let last_id = dbio.get_meta_last_block_in_db().unwrap();
|
||||
let first_id = dbio.get_meta_first_block_in_db().unwrap();
|
||||
@ -1000,7 +1057,7 @@ mod tests {
|
||||
let transfer_tx = transfer(1, (i - 1) as u128, true);
|
||||
let block =
|
||||
common::test_utils::produce_dummy_block(i + 1, Some(prev_hash), vec![transfer_tx]);
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [i as u8; 32]).unwrap();
|
||||
}
|
||||
|
||||
let last_id = dbio.get_meta_last_block_in_db().unwrap();
|
||||
@ -1054,7 +1111,7 @@ mod tests {
|
||||
|
||||
let control_hash1 = block.header.hash;
|
||||
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [1; 32]).unwrap();
|
||||
|
||||
let last_id = dbio.get_meta_last_block_in_db().unwrap();
|
||||
let last_block = dbio.get_block(last_id).unwrap();
|
||||
@ -1065,7 +1122,7 @@ mod tests {
|
||||
|
||||
let control_hash2 = block.header.hash;
|
||||
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [2; 32]).unwrap();
|
||||
|
||||
let last_id = dbio.get_meta_last_block_in_db().unwrap();
|
||||
let last_block = dbio.get_block(last_id).unwrap();
|
||||
@ -1076,7 +1133,7 @@ mod tests {
|
||||
let control_tx_hash1 = transfer_tx.hash();
|
||||
|
||||
let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]);
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [3; 32]).unwrap();
|
||||
|
||||
let last_id = dbio.get_meta_last_block_in_db().unwrap();
|
||||
let last_block = dbio.get_block(last_id).unwrap();
|
||||
@ -1087,7 +1144,7 @@ mod tests {
|
||||
let control_tx_hash2 = transfer_tx.hash();
|
||||
|
||||
let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]);
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [4; 32]).unwrap();
|
||||
|
||||
let control_block_id1 = dbio.get_block_id_by_hash(control_hash1.0).unwrap();
|
||||
let control_block_id2 = dbio.get_block_id_by_hash(control_hash2.0).unwrap();
|
||||
@ -1118,7 +1175,7 @@ mod tests {
|
||||
let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]);
|
||||
|
||||
block_res.push(block.clone());
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [1; 32]).unwrap();
|
||||
|
||||
let last_id = dbio.get_meta_last_block_in_db().unwrap();
|
||||
let last_block = dbio.get_block(last_id).unwrap();
|
||||
@ -1128,7 +1185,7 @@ mod tests {
|
||||
let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]);
|
||||
|
||||
block_res.push(block.clone());
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [2; 32]).unwrap();
|
||||
|
||||
let last_id = dbio.get_meta_last_block_in_db().unwrap();
|
||||
let last_block = dbio.get_block(last_id).unwrap();
|
||||
@ -1138,7 +1195,7 @@ mod tests {
|
||||
|
||||
let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]);
|
||||
block_res.push(block.clone());
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [3; 32]).unwrap();
|
||||
|
||||
let last_id = dbio.get_meta_last_block_in_db().unwrap();
|
||||
let last_block = dbio.get_block(last_id).unwrap();
|
||||
@ -1148,7 +1205,7 @@ mod tests {
|
||||
|
||||
let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]);
|
||||
block_res.push(block.clone());
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [4; 32]).unwrap();
|
||||
|
||||
let block_hashes_mem: Vec<[u8; 32]> =
|
||||
block_res.into_iter().map(|bl| bl.header.hash.0).collect();
|
||||
@ -1192,7 +1249,7 @@ mod tests {
|
||||
|
||||
let block = common::test_utils::produce_dummy_block(2, Some(prev_hash), vec![transfer_tx]);
|
||||
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [1; 32]).unwrap();
|
||||
|
||||
let last_id = dbio.get_meta_last_block_in_db().unwrap();
|
||||
let last_block = dbio.get_block(last_id).unwrap();
|
||||
@ -1204,7 +1261,7 @@ mod tests {
|
||||
|
||||
let block = common::test_utils::produce_dummy_block(3, Some(prev_hash), vec![transfer_tx]);
|
||||
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [2; 32]).unwrap();
|
||||
|
||||
let last_id = dbio.get_meta_last_block_in_db().unwrap();
|
||||
let last_block = dbio.get_block(last_id).unwrap();
|
||||
@ -1216,7 +1273,7 @@ mod tests {
|
||||
|
||||
let block = common::test_utils::produce_dummy_block(4, Some(prev_hash), vec![transfer_tx]);
|
||||
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [3; 32]).unwrap();
|
||||
|
||||
let last_id = dbio.get_meta_last_block_in_db().unwrap();
|
||||
let last_block = dbio.get_block(last_id).unwrap();
|
||||
@ -1228,7 +1285,7 @@ mod tests {
|
||||
|
||||
let block = common::test_utils::produce_dummy_block(5, Some(prev_hash), vec![transfer_tx]);
|
||||
|
||||
dbio.put_block(block).unwrap();
|
||||
dbio.put_block(block, [4; 32]).unwrap();
|
||||
|
||||
let acc1_tx = dbio.get_acc_transactions(*acc1().value(), 0, 4).unwrap();
|
||||
let acc1_tx_hashes: Vec<[u8; 32]> = acc1_tx.into_iter().map(|tx| tx.hash().0).collect();
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user