mirror of
https://github.com/logos-blockchain/logos-sql-zone.git
synced 2026-06-07 10:19:32 +00:00
updated to work with latest version of zone-sdk
This commit is contained in:
parent
f5dee36207
commit
a4a9223169
3
Cargo.lock
generated
3
Cargo.lock
generated
@ -1715,7 +1715,9 @@ dependencies = [
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror 2.0.18",
|
||||
"tracing",
|
||||
"tracing-subscriber 0.3.23",
|
||||
"uuid",
|
||||
"zeroize",
|
||||
"zxcvbn",
|
||||
]
|
||||
@ -7625,6 +7627,7 @@ checksum = "5ac8b6f42ead25368cf5b098aeb3dc8a1a2c05a3eee8a9a1a68c640edbfc79d9"
|
||||
dependencies = [
|
||||
"getrandom 0.4.2",
|
||||
"js-sys",
|
||||
"serde_core",
|
||||
"wasm-bindgen",
|
||||
]
|
||||
|
||||
|
||||
@ -76,6 +76,14 @@ lb-wallet-http-client = { default-features = false, package = "logo
|
||||
lb-wallet-service = { default-features = false, package = "logos-blockchain-wallet-service", path = "./logos-blockchain/services/wallet" }
|
||||
lb-witness-generator = { default-features = false, package = "logos-blockchain-witness-generator", path = "./logos-blockchain/zk/circuits/witness-generator" }
|
||||
lb-zksign = { default-features = false, package = "logos-blockchain-zksign", path = "./logos-blockchain/zk/proofs/zksign" }
|
||||
lb-log-targets = { default-features = false, package = "logos-blockchain-log-targets", path = "./logos-blockchain/tracing/targets" }
|
||||
lb-log-targets-macros = { default-features = false, package = "logos-blockchain-log-targets-macros", path = "./logos-blockchain/tracing/targets/macros" }
|
||||
const-str = { default-features = false, version = "0.4.3" }
|
||||
nutype = { default-features = false, version = "0.6.2" }
|
||||
proc-macro2 = { default-features = false, version = "1" }
|
||||
quote = { default-features = false, version = "1" }
|
||||
syn = { features = ["full"], version = "2" }
|
||||
tonic = { default-features = false, version = "0.14.5" }
|
||||
lb-zone-sdk = { default-features = false, package = "logos-blockchain-zone-sdk", path = "./logos-blockchain/zone-sdk" }
|
||||
lb_network = { default-features = false, package = "logos-blockchain-network-service", path = "./logos-blockchain/services/network" }
|
||||
testing-framework-core = { default-features = false, git = "https://github.com/logos-blockchain/logos-blockchain-testing.git", rev = "f731791" }
|
||||
|
||||
@ -25,10 +25,12 @@ nanosql = { features = ["chrono"], version = "0.10.0" }
|
||||
rand = { version = "0.9" }
|
||||
ratatui = { features = ["serde"], version = "0.29" }
|
||||
serde = { features = ["derive"], workspace = true }
|
||||
uuid = { features = ["serde", "v4"], version = "1" }
|
||||
serde_json = { workspace = true }
|
||||
thiserror = { workspace = true }
|
||||
tracing-subscriber = { features = ["env-filter"], version = "0.3" }
|
||||
zeroize = { features = ["alloc"], workspace = true }
|
||||
tracing = { workspace = true }
|
||||
|
||||
[dev-dependencies]
|
||||
rand = { version = "0.9" }
|
||||
|
||||
@ -59,7 +59,7 @@ impl Indexer {
|
||||
Ok(Self { zone_indexer, db_path: db_path.to_owned() })
|
||||
}
|
||||
|
||||
pub async fn run(&self) {
|
||||
pub async fn run(self) {
|
||||
let db = match DatabaseReadOnly::open(&self.db_path) {
|
||||
Ok(db) => db,
|
||||
Err(e) => {
|
||||
@ -82,10 +82,11 @@ impl Indexer {
|
||||
|
||||
futures::pin_mut!(stream);
|
||||
while let Some(zone_msg) = stream.next().await {
|
||||
let logos_blockchain_zone_sdk::ZoneMessage::Block(zone_block) = zone_msg else {
|
||||
continue;
|
||||
let data = match zone_msg {
|
||||
logos_blockchain_zone_sdk::ZoneMessage::Block(block) => block.data,
|
||||
logos_blockchain_zone_sdk::ZoneMessage::Deposit(_) => continue,
|
||||
};
|
||||
let sql_text = match String::from_utf8(zone_block.data) {
|
||||
let sql_text = match String::from_utf8(data) {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
error!("Zone block data is not valid UTF-8: {e}");
|
||||
@ -95,8 +96,8 @@ impl Indexer {
|
||||
|
||||
let statements: Vec<&str> = sql_text
|
||||
.lines()
|
||||
.map(|l| l.trim().trim_end_matches(';').trim())
|
||||
.filter(|s| !s.is_empty())
|
||||
.map(|l: &str| l.trim().trim_end_matches(';').trim())
|
||||
.filter(|s: &&str| !s.is_empty())
|
||||
.collect();
|
||||
|
||||
if statements.is_empty() {
|
||||
|
||||
@ -95,12 +95,9 @@ pub async fn run(args: IndexerArgs) -> Result<()> {
|
||||
};
|
||||
info!("Indexer ready");
|
||||
|
||||
let handle = tokio::runtime::Handle::current();
|
||||
std::thread::spawn(move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.expect("failed to build tokio runtime for indexer");
|
||||
rt.block_on(indexer.run());
|
||||
handle.block_on(indexer.run());
|
||||
});
|
||||
info!("Background indexer started");
|
||||
|
||||
|
||||
@ -7,8 +7,6 @@ pub use demo_sqlite_common::{config, crypto, error, screen};
|
||||
|
||||
mod tui;
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use clap::Parser;
|
||||
use demo_sqlite_common::logging::RawModeWriter;
|
||||
use sequencer::Sequencer;
|
||||
@ -90,7 +88,6 @@ impl App {
|
||||
}
|
||||
}
|
||||
|
||||
#[expect(clippy::unused_async)]
|
||||
pub async fn run(args: SequencerArgs) -> Result<()> {
|
||||
tracing_subscriber::registry()
|
||||
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
|
||||
@ -110,8 +107,8 @@ pub async fn run(args: SequencerArgs) -> Result<()> {
|
||||
&args.queue_file,
|
||||
&args.checkpoint_path,
|
||||
&args.channel_path,
|
||||
) {
|
||||
Ok(s) => Arc::new(s),
|
||||
).await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
error!("Sequencer initialization failed: {e}");
|
||||
std::process::exit(1);
|
||||
@ -119,9 +116,8 @@ pub async fn run(args: SequencerArgs) -> Result<()> {
|
||||
};
|
||||
info!("Sequencer ready");
|
||||
|
||||
let sequencer_clone = Arc::clone(&sequencer);
|
||||
tokio::spawn(async move {
|
||||
sequencer_clone.run_processing_loop().await;
|
||||
sequencer.run_processing_loop().await;
|
||||
});
|
||||
info!("Background processor started");
|
||||
|
||||
|
||||
@ -13,7 +13,7 @@ use lb_core::mantle::ops::channel::ChannelId;
|
||||
use lb_key_management_system_service::keys::{ED25519_SECRET_KEY_SIZE, Ed25519Key};
|
||||
use logos_blockchain_zone_sdk::adapter::NodeHttpClient;
|
||||
use logos_blockchain_zone_sdk::sequencer::{
|
||||
Error as ZoneSequencerError, SequencerCheckpoint, SequencerHandle, ZoneSequencer,
|
||||
Error as ZoneSequencerError, SequencerHandle, ZoneSequencer, SequencerCheckpoint,
|
||||
};
|
||||
use nanosql::rusqlite::Error as SqliteError;
|
||||
use reqwest::Url;
|
||||
@ -41,34 +41,11 @@ pub enum SequencerError {
|
||||
|
||||
pub type Result<T> = std::result::Result<T, SequencerError>;
|
||||
|
||||
/// The sequencer that handles transactions using the Zone SDK
|
||||
/// The sequencer that handles transactions using the Zone SDK.
|
||||
pub struct Sequencer {
|
||||
handle: SequencerHandle<NodeHttpClient>,
|
||||
queue_file: String,
|
||||
checkpoint_path: String,
|
||||
}
|
||||
|
||||
/// Load signing key from file or generate a new one if it doesn't exist
|
||||
fn load_or_create_signing_key(path: &Path) -> Result<Ed25519Key> {
|
||||
if path.exists() {
|
||||
debug!("Loading existing signing key from {:?}", path);
|
||||
let key_bytes = fs::read(path)?;
|
||||
if key_bytes.len() != ED25519_SECRET_KEY_SIZE {
|
||||
return Err(SequencerError::InvalidKeyFile {
|
||||
expected: ED25519_SECRET_KEY_SIZE,
|
||||
actual: key_bytes.len(),
|
||||
});
|
||||
}
|
||||
let key_array: [u8; ED25519_SECRET_KEY_SIZE] =
|
||||
key_bytes.try_into().expect("length already checked");
|
||||
Ok(Ed25519Key::from_bytes(&key_array))
|
||||
} else {
|
||||
debug!("Generating new signing key and saving to {:?}", path);
|
||||
let mut key_bytes = [0u8; ED25519_SECRET_KEY_SIZE];
|
||||
rand::RngCore::fill_bytes(&mut rand::rng(), &mut key_bytes);
|
||||
fs::write(path, key_bytes)?;
|
||||
Ok(Ed25519Key::from_bytes(&key_bytes))
|
||||
}
|
||||
pub checkpoint_path: String,
|
||||
}
|
||||
|
||||
fn save_checkpoint(path: &Path, checkpoint: &SequencerCheckpoint) {
|
||||
@ -84,8 +61,29 @@ fn load_checkpoint(path: &Path) -> Option<SequencerCheckpoint> {
|
||||
Some(serde_json::from_slice(&data).expect("failed to deserialize checkpoint"))
|
||||
}
|
||||
|
||||
/// Load signing key from file or generate a new one if it doesn't exist.
|
||||
fn load_or_create_signing_key(path: &Path) -> Ed25519Key {
|
||||
if path.exists() {
|
||||
let key_bytes = fs::read(path).expect("failed to read key file");
|
||||
assert!(
|
||||
key_bytes.len() == ED25519_SECRET_KEY_SIZE,
|
||||
"invalid key file: expected {} bytes, got {}",
|
||||
ED25519_SECRET_KEY_SIZE,
|
||||
key_bytes.len()
|
||||
);
|
||||
let key_array: [u8; ED25519_SECRET_KEY_SIZE] =
|
||||
key_bytes.try_into().expect("length already checked");
|
||||
Ed25519Key::from_bytes(&key_array)
|
||||
} else {
|
||||
let mut key_bytes = [0u8; ED25519_SECRET_KEY_SIZE];
|
||||
rand::RngCore::fill_bytes(&mut rand::rng(), &mut key_bytes);
|
||||
fs::write(path, key_bytes).expect("failed to write key file");
|
||||
Ed25519Key::from_bytes(&key_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl Sequencer {
|
||||
pub fn new(
|
||||
pub async fn new(
|
||||
node_endpoint: &str,
|
||||
signing_key_path: &str,
|
||||
node_auth_username: Option<String>,
|
||||
@ -96,8 +94,6 @@ impl Sequencer {
|
||||
) -> Result<Self> {
|
||||
let node_url = Url::parse(node_endpoint).map_err(|e| SequencerError::Url(e.to_string()))?;
|
||||
|
||||
info!("{}", node_url.clone().to_string());
|
||||
|
||||
let basic_auth = node_auth_username
|
||||
.map(|username| BasicAuthCredentials::new(username, node_auth_password));
|
||||
|
||||
@ -112,15 +108,21 @@ impl Sequencer {
|
||||
println!(" Restored checkpoint from {checkpoint_path}");
|
||||
}
|
||||
|
||||
let signing_key = load_or_create_signing_key(Path::new(signing_key_path))?;
|
||||
let signing_key = load_or_create_signing_key(Path::new(signing_key_path));
|
||||
let channel_id = ChannelId::from(signing_key.public_key().to_bytes());
|
||||
fs::write(channel_path, hex::encode(channel_id.as_ref()))
|
||||
.expect("failed to write channel id");
|
||||
|
||||
let node = NodeHttpClient::new(CommonHttpClient::new(basic_auth), node_url);
|
||||
let (zone_sequencer, handle) = ZoneSequencer::init(channel_id, signing_key, node, checkpoint);
|
||||
let (zone_sequencer, mut handle) =
|
||||
ZoneSequencer::init(channel_id, signing_key, node, checkpoint);
|
||||
|
||||
zone_sequencer.spawn();
|
||||
|
||||
println!("Connecting to node...");
|
||||
handle.wait_ready().await;
|
||||
println!("Sequencer ready.");
|
||||
|
||||
Ok(Self {
|
||||
handle,
|
||||
queue_file: queue_file.to_owned(),
|
||||
@ -128,7 +130,7 @@ impl Sequencer {
|
||||
})
|
||||
}
|
||||
|
||||
/// Drain the queue file and return all pending queries
|
||||
/// Drain the queue file and return all pending SQL statements.
|
||||
fn queue_drain(&self) -> Result<Vec<String>> {
|
||||
let file = OpenOptions::new()
|
||||
.read(true)
|
||||
@ -148,7 +150,7 @@ impl Sequencer {
|
||||
Ok(queue_vec)
|
||||
}
|
||||
|
||||
/// Process all pending queries as a single inscription
|
||||
/// Bundle all pending SQL statements into one inscription and publish it.
|
||||
async fn process_pending_batch(&self) -> Result<()> {
|
||||
let pending = self.queue_drain()?;
|
||||
if pending.is_empty() {
|
||||
@ -158,20 +160,22 @@ impl Sequencer {
|
||||
let count = pending.len();
|
||||
debug!("Processing batch of {} queries", count);
|
||||
|
||||
let data = pending.join("\n").into_bytes();
|
||||
let result = self.handle.publish_message(data).await?;
|
||||
|
||||
info!(
|
||||
"Inscription published with tx_hash: {:?}",
|
||||
result.inscription_id
|
||||
);
|
||||
|
||||
save_checkpoint(Path::new(&self.checkpoint_path), &result.checkpoint);
|
||||
let sql_text = pending.join("\n").as_bytes().to_vec();
|
||||
|
||||
match self.handle.publish_message(sql_text).await {
|
||||
Ok(result) => {
|
||||
info!("Submitted batch of {} statement(s)", count);
|
||||
save_checkpoint(Path::new(&self.checkpoint_path), &result.checkpoint);
|
||||
}
|
||||
Err(e) => {
|
||||
println!(" error: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check if the queue file is empty
|
||||
/// Check if the queue file is empty.
|
||||
pub fn queue_is_empty(&self) -> Result<bool> {
|
||||
match fs::metadata(self.queue_file.clone()) {
|
||||
Ok(meta) => Ok(meta.len() == 0),
|
||||
@ -180,7 +184,7 @@ impl Sequencer {
|
||||
}
|
||||
}
|
||||
|
||||
/// Background processing loop - call this in a spawned task
|
||||
/// Background processing loop — call this in a spawned task.
|
||||
pub async fn run_processing_loop(&self) {
|
||||
let poll_interval = Duration::from_millis(100);
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user