diff --git a/Cargo.lock b/Cargo.lock index 0450f3d..629ba8b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1715,7 +1715,9 @@ dependencies = [ "serde", "serde_json", "thiserror 2.0.18", + "tracing", "tracing-subscriber 0.3.23", + "uuid", "zeroize", "zxcvbn", ] @@ -7625,6 +7627,7 @@ checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9" dependencies = [ "getrandom 0.4.2", "js-sys", + "serde_core", "wasm-bindgen", ] diff --git a/Cargo.toml b/Cargo.toml index 6b9cafe..f21fd64 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -76,6 +76,14 @@ lb-wallet-http-client = { default-features = false, package = "logo lb-wallet-service = { default-features = false, package = "logos-blockchain-wallet-service", path = "./logos-blockchain/services/wallet" } lb-witness-generator = { default-features = false, package = "logos-blockchain-witness-generator", path = "./logos-blockchain/zk/circuits/witness-generator" } lb-zksign = { default-features = false, package = "logos-blockchain-zksign", path = "./logos-blockchain/zk/proofs/zksign" } +lb-log-targets = { default-features = false, package = "logos-blockchain-log-targets", path = "./logos-blockchain/tracing/targets" } +lb-log-targets-macros = { default-features = false, package = "logos-blockchain-log-targets-macros", path = "./logos-blockchain/tracing/targets/macros" } +const-str = { default-features = false, version = "0.4.3" } +nutype = { default-features = false, version = "0.6.2" } +proc-macro2 = { default-features = false, version = "1" } +quote = { default-features = false, version = "1" } +syn = { features = ["full"], version = "2" } +tonic = { default-features = false, version = "0.14.5" } lb-zone-sdk = { default-features = false, package = "logos-blockchain-zone-sdk", path = "./logos-blockchain/zone-sdk" } lb_network = { default-features = false, package = "logos-blockchain-network-service", path = "./logos-blockchain/services/network" } testing-framework-core = { default-features = false, git = "https://github.com/logos-blockchain/logos-blockchain-testing.git", rev = "f731791" } diff --git a/common/Cargo.toml b/common/Cargo.toml index 45c5f4c..f56a8fa 100644 --- a/common/Cargo.toml +++ b/common/Cargo.toml @@ -25,10 +25,12 @@ nanosql = { features = ["chrono"], version = "0.10.0" } rand = { version = "0.9" } ratatui = { features = ["serde"], version = "0.29" } serde = { features = ["derive"], workspace = true } +uuid = { features = ["serde", "v4"], version = "1" } serde_json = { workspace = true } thiserror = { workspace = true } tracing-subscriber = { features = ["env-filter"], version = "0.3" } zeroize = { features = ["alloc"], workspace = true } +tracing = { workspace = true } [dev-dependencies] rand = { version = "0.9" } diff --git a/indexer/src/indexer.rs b/indexer/src/indexer.rs index 54acf0d..9380339 100644 --- a/indexer/src/indexer.rs +++ b/indexer/src/indexer.rs @@ -59,7 +59,7 @@ impl Indexer { Ok(Self { zone_indexer, db_path: db_path.to_owned() }) } - pub async fn run(&self) { + pub async fn run(self) { let db = match DatabaseReadOnly::open(&self.db_path) { Ok(db) => db, Err(e) => { @@ -82,10 +82,11 @@ impl Indexer { futures::pin_mut!(stream); while let Some(zone_msg) = stream.next().await { - let logos_blockchain_zone_sdk::ZoneMessage::Block(zone_block) = zone_msg else { - continue; + let data = match zone_msg { + logos_blockchain_zone_sdk::ZoneMessage::Block(block) => block.data, + logos_blockchain_zone_sdk::ZoneMessage::Deposit(_) => continue, }; - let sql_text = match String::from_utf8(zone_block.data) { + let sql_text = match String::from_utf8(data) { Ok(s) => s, Err(e) => { error!("Zone block data is not valid UTF-8: {e}"); @@ -95,8 +96,8 @@ impl Indexer { let statements: Vec<&str> = sql_text .lines() - .map(|l| l.trim().trim_end_matches(';').trim()) - .filter(|s| !s.is_empty()) + .map(|l: &str| l.trim().trim_end_matches(';').trim()) + .filter(|s: &&str| !s.is_empty()) .collect(); if statements.is_empty() { diff --git a/indexer/src/lib.rs b/indexer/src/lib.rs index 8d155c3..df2cd64 100644 --- a/indexer/src/lib.rs +++ b/indexer/src/lib.rs @@ -95,12 +95,9 @@ pub async fn run(args: IndexerArgs) -> Result<()> { }; info!("Indexer ready"); + let handle = tokio::runtime::Handle::current(); std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .expect("failed to build tokio runtime for indexer"); - rt.block_on(indexer.run()); + handle.block_on(indexer.run()); }); info!("Background indexer started"); diff --git a/sequencer/src/lib.rs b/sequencer/src/lib.rs index 7460377..7f988ed 100644 --- a/sequencer/src/lib.rs +++ b/sequencer/src/lib.rs @@ -7,8 +7,6 @@ pub use demo_sqlite_common::{config, crypto, error, screen}; mod tui; -use std::sync::Arc; - use clap::Parser; use demo_sqlite_common::logging::RawModeWriter; use sequencer::Sequencer; @@ -90,7 +88,6 @@ impl App { } } -#[expect(clippy::unused_async)] pub async fn run(args: SequencerArgs) -> Result<()> { tracing_subscriber::registry() .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))) @@ -110,8 +107,8 @@ pub async fn run(args: SequencerArgs) -> Result<()> { &args.queue_file, &args.checkpoint_path, &args.channel_path, - ) { - Ok(s) => Arc::new(s), + ).await { + Ok(s) => s, Err(e) => { error!("Sequencer initialization failed: {e}"); std::process::exit(1); @@ -119,9 +116,8 @@ pub async fn run(args: SequencerArgs) -> Result<()> { }; info!("Sequencer ready"); - let sequencer_clone = Arc::clone(&sequencer); tokio::spawn(async move { - sequencer_clone.run_processing_loop().await; + sequencer.run_processing_loop().await; }); info!("Background processor started"); diff --git a/sequencer/src/sequencer.rs b/sequencer/src/sequencer.rs index 1da72a8..cd458f6 100644 --- a/sequencer/src/sequencer.rs +++ b/sequencer/src/sequencer.rs @@ -13,7 +13,7 @@ use lb_core::mantle::ops::channel::ChannelId; use lb_key_management_system_service::keys::{ED25519_SECRET_KEY_SIZE, Ed25519Key}; use logos_blockchain_zone_sdk::adapter::NodeHttpClient; use logos_blockchain_zone_sdk::sequencer::{ - Error as ZoneSequencerError, SequencerCheckpoint, SequencerHandle, ZoneSequencer, + Error as ZoneSequencerError, SequencerHandle, ZoneSequencer, SequencerCheckpoint, }; use nanosql::rusqlite::Error as SqliteError; use reqwest::Url; @@ -41,34 +41,11 @@ pub enum SequencerError { pub type Result = std::result::Result; -/// The sequencer that handles transactions using the Zone SDK +/// The sequencer that handles transactions using the Zone SDK. pub struct Sequencer { handle: SequencerHandle, queue_file: String, - checkpoint_path: String, -} - -/// Load signing key from file or generate a new one if it doesn't exist -fn load_or_create_signing_key(path: &Path) -> Result { - if path.exists() { - debug!("Loading existing signing key from {:?}", path); - let key_bytes = fs::read(path)?; - if key_bytes.len() != ED25519_SECRET_KEY_SIZE { - return Err(SequencerError::InvalidKeyFile { - expected: ED25519_SECRET_KEY_SIZE, - actual: key_bytes.len(), - }); - } - let key_array: [u8; ED25519_SECRET_KEY_SIZE] = - key_bytes.try_into().expect("length already checked"); - Ok(Ed25519Key::from_bytes(&key_array)) - } else { - debug!("Generating new signing key and saving to {:?}", path); - let mut key_bytes = [0u8; ED25519_SECRET_KEY_SIZE]; - rand::RngCore::fill_bytes(&mut rand::rng(), &mut key_bytes); - fs::write(path, key_bytes)?; - Ok(Ed25519Key::from_bytes(&key_bytes)) - } + pub checkpoint_path: String, } fn save_checkpoint(path: &Path, checkpoint: &SequencerCheckpoint) { @@ -84,8 +61,29 @@ fn load_checkpoint(path: &Path) -> Option { Some(serde_json::from_slice(&data).expect("failed to deserialize checkpoint")) } +/// Load signing key from file or generate a new one if it doesn't exist. +fn load_or_create_signing_key(path: &Path) -> Ed25519Key { + if path.exists() { + let key_bytes = fs::read(path).expect("failed to read key file"); + assert!( + key_bytes.len() == ED25519_SECRET_KEY_SIZE, + "invalid key file: expected {} bytes, got {}", + ED25519_SECRET_KEY_SIZE, + key_bytes.len() + ); + let key_array: [u8; ED25519_SECRET_KEY_SIZE] = + key_bytes.try_into().expect("length already checked"); + Ed25519Key::from_bytes(&key_array) + } else { + let mut key_bytes = [0u8; ED25519_SECRET_KEY_SIZE]; + rand::RngCore::fill_bytes(&mut rand::rng(), &mut key_bytes); + fs::write(path, key_bytes).expect("failed to write key file"); + Ed25519Key::from_bytes(&key_bytes) + } +} + impl Sequencer { - pub fn new( + pub async fn new( node_endpoint: &str, signing_key_path: &str, node_auth_username: Option, @@ -96,8 +94,6 @@ impl Sequencer { ) -> Result { let node_url = Url::parse(node_endpoint).map_err(|e| SequencerError::Url(e.to_string()))?; - info!("{}", node_url.clone().to_string()); - let basic_auth = node_auth_username .map(|username| BasicAuthCredentials::new(username, node_auth_password)); @@ -112,15 +108,21 @@ impl Sequencer { println!(" Restored checkpoint from {checkpoint_path}"); } - let signing_key = load_or_create_signing_key(Path::new(signing_key_path))?; + let signing_key = load_or_create_signing_key(Path::new(signing_key_path)); let channel_id = ChannelId::from(signing_key.public_key().to_bytes()); fs::write(channel_path, hex::encode(channel_id.as_ref())) .expect("failed to write channel id"); let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), node_url); - let (zone_sequencer, handle) = ZoneSequencer::init(channel_id, signing_key, node, checkpoint); + let (zone_sequencer, mut handle) = + ZoneSequencer::init(channel_id, signing_key, node, checkpoint); + zone_sequencer.spawn(); + println!("Connecting to node..."); + handle.wait_ready().await; + println!("Sequencer ready."); + Ok(Self { handle, queue_file: queue_file.to_owned(), @@ -128,7 +130,7 @@ impl Sequencer { }) } - /// Drain the queue file and return all pending queries + /// Drain the queue file and return all pending SQL statements. fn queue_drain(&self) -> Result> { let file = OpenOptions::new() .read(true) @@ -148,7 +150,7 @@ impl Sequencer { Ok(queue_vec) } - /// Process all pending queries as a single inscription + /// Bundle all pending SQL statements into one inscription and publish it. async fn process_pending_batch(&self) -> Result<()> { let pending = self.queue_drain()?; if pending.is_empty() { @@ -158,20 +160,22 @@ impl Sequencer { let count = pending.len(); debug!("Processing batch of {} queries", count); - let data = pending.join("\n").into_bytes(); - let result = self.handle.publish_message(data).await?; - - info!( - "Inscription published with tx_hash: {:?}", - result.inscription_id - ); - - save_checkpoint(Path::new(&self.checkpoint_path), &result.checkpoint); + let sql_text = pending.join("\n").as_bytes().to_vec(); + + match self.handle.publish_message(sql_text).await { + Ok(result) => { + info!("Submitted batch of {} statement(s)", count); + save_checkpoint(Path::new(&self.checkpoint_path), &result.checkpoint); + } + Err(e) => { + println!(" error: {e}"); + } + } Ok(()) } - /// Check if the queue file is empty + /// Check if the queue file is empty. pub fn queue_is_empty(&self) -> Result { match fs::metadata(self.queue_file.clone()) { Ok(meta) => Ok(meta.len() == 0), @@ -180,7 +184,7 @@ impl Sequencer { } } - /// Background processing loop - call this in a spawned task + /// Background processing loop — call this in a spawned task. pub async fn run_processing_loop(&self) { let poll_interval = Duration::from_millis(100);