diff --git a/indexer/src/indexer.rs b/indexer/src/indexer.rs index 1150af5..d4f71d4 100644 --- a/indexer/src/indexer.rs +++ b/indexer/src/indexer.rs @@ -12,109 +12,4 @@ use crate::{db::DatabaseReadOnly, error::Error}; pub type Result = std::result::Result; -pub struct Indexer { - zone_indexer: ZoneIndexer, - db_path: String, -} - -fn parse_channel_id(channel_id_str: &str) -> Result { - let decoded = hex::decode(channel_id_str).map_err(|_| { - Error::InvalidChannelId(format!( - "INDEXER_CHANNEL_ID must be a valid hex string, got: '{channel_id_str}'" - )) - })?; - let channel_bytes: [u8; 32] = decoded.try_into().map_err(|v: Vec| { - Error::InvalidChannelId(format!( - "INDEXER_CHANNEL_ID must be exactly 64 hex characters (32 bytes), got {} characters ({} bytes)", - v.len() * 2, - v.len() - )) - })?; - Ok(ChannelId::from(channel_bytes)) -} - -impl Indexer { - pub fn new( - db_path: &str, - node_endpoint: &str, - channel_path: &str, - node_auth_username: Option, - node_auth_password: Option, - ) -> Result { - let node_url = Url::parse(node_endpoint).map_err(|e| Error::Url(e.to_string()))?; - - let basic_auth = node_auth_username - .map(|username| BasicAuthCredentials::new(username, node_auth_password)); - - let channel_id_str = fs::read_to_string(channel_path).map_err(|e| { - Error::InvalidChannelId(format!("Failed to read channel path '{channel_path}': {e}")) - })?; - let channel_id = parse_channel_id(channel_id_str.trim())?; - - info!("Channel ID: {}", hex::encode(channel_id.as_ref())); - - let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), node_url); - let zone_indexer = ZoneIndexer::new(channel_id, node); - - Ok(Self { zone_indexer, db_path: db_path.to_owned() }) - } - - pub async fn run(self) { - let db = match DatabaseReadOnly::open(&self.db_path) { - Ok(db) => db, - Err(e) => { - error!("Failed to open database: {e}"); - return; - } - }; - - loop { - info!("Connecting to zone block stream..."); - let stream = match self.zone_indexer.follow().await { - Ok(s) => s, - Err(e) => { - error!("Failed to connect to block stream: {e}"); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - continue; - } - }; - info!("Connected to zone block stream"); - - futures::pin_mut!(stream); - while let Some(zone_msg) = stream.next().await { - let logos_blockchain_zone_sdk::ZoneMessage::Block(zone_block) = zone_msg else { - continue; - }; - let sql_text = match String::from_utf8(zone_block.data) { - Ok(s) => s, - Err(e) => { - error!("Zone block data is not valid UTF-8: {e}"); - continue; - } - }; - - let statements: Vec<&str> = sql_text - .lines() - .map(|l: &str| l.trim().trim_end_matches(';').trim()) - .filter(|s: &&str| !s.is_empty()) - .collect(); - - if statements.is_empty() { - continue; - } - - info!("Applying {} SQL statement(s)", statements.len()); - - for stmt in &statements { - if let Err(e) = db.execute_batch(stmt) { - error!("Failed to execute SQL '{}': {e}", stmt); - } - } - info!("Applied {} statement(s)", statements.len()); - } - - error!("Zone block stream ended, reconnecting..."); - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - } - } -} +// Your Code Here diff --git a/sequencer/src/sequencer.rs b/sequencer/src/sequencer.rs index 5341488..abcd3ee 100644 --- a/sequencer/src/sequencer.rs +++ b/sequencer/src/sequencer.rs @@ -30,185 +30,4 @@ pub enum SequencerError { pub type Result = std::result::Result; -pub struct Sequencer { - sequencer: ZoneSequencer, - handle: SequencerHandle, - state: InMemoryZoneState, - queue_file: String, - checkpoint_path: String, -} - -fn save_checkpoint(path: &Path, checkpoint: &SequencerCheckpoint) { - let data = serde_json::to_vec(checkpoint).expect("failed to serialize checkpoint"); - fs::write(path, data).expect("failed to write checkpoint file"); -} - -fn load_checkpoint(path: &Path) -> Option { - if !path.exists() { - return None; - } - let data = fs::read(path).expect("failed to read checkpoint file"); - Some(serde_json::from_slice(&data).expect("failed to deserialize checkpoint")) -} - -fn load_or_create_signing_key(path: &Path) -> Ed25519Key { - if path.exists() { - let key_bytes = fs::read(path).expect("failed to read key file"); - assert!( - key_bytes.len() == ED25519_SECRET_KEY_SIZE, - "invalid key file: expected {} bytes, got {}", - ED25519_SECRET_KEY_SIZE, - key_bytes.len() - ); - let key_array: [u8; ED25519_SECRET_KEY_SIZE] = - key_bytes.try_into().expect("length already checked"); - Ed25519Key::from_bytes(&key_array) - } else { - let mut key_bytes = [0u8; ED25519_SECRET_KEY_SIZE]; - rand::RngCore::fill_bytes(&mut rand::rng(), &mut key_bytes); - fs::write(path, key_bytes).expect("failed to write key file"); - Ed25519Key::from_bytes(&key_bytes) - } -} - -impl Sequencer { - pub async fn new( - node_endpoint: &str, - signing_key_path: &str, - node_auth_username: Option, - node_auth_password: Option, - queue_file: &str, - checkpoint_path: &str, - channel_path: &str, - ) -> Result { - let node_url = Url::parse(node_endpoint).map_err(|e| SequencerError::Url(e.to_string()))?; - - let basic_auth = node_auth_username - .map(|username| BasicAuthCredentials::new(username, node_auth_password)); - - for path in [signing_key_path, checkpoint_path, channel_path] { - if let Some(parent) = Path::new(path).parent() { - fs::create_dir_all(parent)?; - } - } - - let checkpoint = load_checkpoint(Path::new(checkpoint_path)); - if checkpoint.is_some() { - println!(" Restored checkpoint from {checkpoint_path}"); - } - - let signing_key = load_or_create_signing_key(Path::new(signing_key_path)); - let channel_id = ChannelId::from(signing_key.public_key().to_bytes()); - fs::write(channel_path, hex::encode(channel_id.as_ref())) - .expect("failed to write channel id"); - - let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), node_url); - let (sequencer, handle) = ZoneSequencer::init(channel_id, signing_key, node, checkpoint); - - Ok(Self { - sequencer, - handle, - state: InMemoryZoneState::default(), - queue_file: queue_file.to_owned(), - checkpoint_path: checkpoint_path.to_owned(), - }) - } - - pub async fn run(self) { - let Self { mut sequencer, handle, mut state, queue_file, checkpoint_path } = self; - - let mut batch_handle = handle.clone(); - tokio::spawn(async move { - let mut interval = tokio::time::interval(Duration::from_millis(100)); - loop { - interval.tick().await; - batch_handle.wait_ready().await; - if let Err(e) = process_pending_batch(&queue_file, &batch_handle).await { - error!("Batch processing failed: {e}"); - } - } - }); - - loop { - let Some(event) = sequencer.next_event().await else { continue; }; - handle_event(event, &handle, &mut state, &checkpoint_path).await; - } - } -} - -async fn handle_event( - event: Event, - handle: &SequencerHandle, - state: &mut InMemoryZoneState, - checkpoint_path: &str, -) { - match event { - Event::Ready => { - info!("Sequencer ready"); - } - Event::ChannelUpdate { orphaned, adopted } => { - state.on_adopted(&adopted); - for info in &orphaned { - state.on_orphaned(&info.this_msg); - debug!(msg_id = %hex::encode(info.this_msg.as_ref()), "Auto-republishing orphan"); - if let Err(e) = handle.publish_message(info.payload.clone()).await { - error!("failed to auto-republish: {e}"); - } - } - } - Event::TxsFinalized { inscriptions, .. } => { - state.on_finalized(&inscriptions); - } - Event::Published { info, checkpoint } => { - debug!(msg_id = %hex::encode(info.this_msg.as_ref()), "Published"); - state.on_published(&info); - save_checkpoint(Path::new(checkpoint_path), &checkpoint); - state.save_checkpoint(checkpoint); - } - Event::FinalizedInscriptions { inscriptions } => { - state.on_finalized(&inscriptions); - } - } -} - -async fn process_pending_batch( - queue_file: &str, - handle: &SequencerHandle, -) -> Result<()> { - let pending = queue_drain(queue_file)?; - if pending.is_empty() { - return Ok(()); - } - - let count = pending.len(); - debug!("Processing batch of {} queries", count); - - let sql_text = pending.join("\n").as_bytes().to_vec(); - if let Err(e) = handle.publish_message(sql_text).await { - error!("failed to publish batch: {e}"); - } else { - info!("Submitted batch of {} statement(s)", count); - } - - Ok(()) -} - -fn queue_drain(queue_file: &str) -> Result> { - let file = match OpenOptions::new().read(true).write(true).open(queue_file) { - Ok(f) => f, - Err(e) if e.kind() == io::ErrorKind::NotFound => return Ok(Vec::new()), - Err(e) => return Err(SequencerError::Io(e)), - }; - - file.lock_exclusive()?; - - let reader = BufReader::new(&file); - let mut queue_vec = Vec::new(); - for query in reader.lines() { - queue_vec.push(query?); - } - - file.set_len(0)?; - - Ok(queue_vec) -} +// Your Code Here \ No newline at end of file