mirror of
https://github.com/logos-blockchain/lssa.git
synced 2026-02-19 04:43:36 +00:00
Integrate indexer client into sequencer
This commit is contained in:
parent
616159f936
commit
e0729a1725
6
Cargo.lock
generated
6
Cargo.lock
generated
@ -3232,6 +3232,7 @@ dependencies = [
|
||||
"indexer_service_rpc",
|
||||
"jsonrpsee",
|
||||
"log",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
"tokio-util",
|
||||
]
|
||||
@ -6446,6 +6447,7 @@ dependencies = [
|
||||
"chrono",
|
||||
"common",
|
||||
"futures",
|
||||
"jsonrpsee",
|
||||
"log",
|
||||
"logos-blockchain-core",
|
||||
"logos-blockchain-key-management-system-service",
|
||||
@ -6459,6 +6461,7 @@ dependencies = [
|
||||
"storage",
|
||||
"tempfile",
|
||||
"tokio",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@ -6495,6 +6498,9 @@ dependencies = [
|
||||
"clap 4.5.53",
|
||||
"common",
|
||||
"env_logger",
|
||||
"futures",
|
||||
"indexer_service_protocol",
|
||||
"indexer_service_rpc",
|
||||
"log",
|
||||
"sequencer_core",
|
||||
"sequencer_rpc",
|
||||
|
||||
@ -69,14 +69,14 @@ impl IndexerCore {
|
||||
}
|
||||
}
|
||||
|
||||
// Sending data into sequencer, may need to be expanded.
|
||||
let message = Message::L2BlockFinalized {
|
||||
l2_block_height: l2_block.block_id,
|
||||
};
|
||||
// // Sending data into sequencer, may need to be expanded.
|
||||
// let message = Message::L2BlockFinalized {
|
||||
// l2_block_height: l2_block.block_id,
|
||||
// };
|
||||
|
||||
let status = self.send_message_to_sequencer(message.clone()).await?;
|
||||
// let status = self.send_message_to_sequencer(message.clone()).await?;
|
||||
|
||||
info!("Sent message {message:#?} to sequencer; status {status:#?}");
|
||||
// info!("Sent message {message:#?} to sequencer; status {status:#?}");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -15,6 +15,7 @@ tokio-util.workspace = true
|
||||
env_logger.workspace = true
|
||||
log.workspace = true
|
||||
jsonrpsee.workspace = true
|
||||
serde_json.workspace = true
|
||||
async-trait = "0.1.89"
|
||||
|
||||
[features]
|
||||
|
||||
@ -23,8 +23,8 @@ pub trait Rpc {
|
||||
Ok(serde_json::to_value(block_schema).expect("Schema serialization should not fail"))
|
||||
}
|
||||
|
||||
#[subscription(name = "subscribeToBlocks", item = Vec<Block>)]
|
||||
async fn subscribe_to_blocks(&self, from: BlockId) -> SubscriptionResult;
|
||||
#[subscription(name = "subscribeToFinalizedBlocks", item = Block)]
|
||||
async fn subscribe_to_finalized_blocks(&self, from: BlockId) -> SubscriptionResult;
|
||||
|
||||
#[method(name = "getBlockById")]
|
||||
async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned>;
|
||||
|
||||
@ -2,6 +2,7 @@ use std::{net::SocketAddr, path::PathBuf};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use clap::Parser;
|
||||
use indexer_core::config::IndexerConfig;
|
||||
use indexer_service_rpc::RpcServer as _;
|
||||
use jsonrpsee::server::Server;
|
||||
use log::{error, info};
|
||||
@ -10,7 +11,7 @@ use tokio_util::sync::CancellationToken;
|
||||
#[derive(Debug, Parser)]
|
||||
#[clap(version)]
|
||||
struct Args {
|
||||
#[clap(rename = "config")]
|
||||
#[clap(name = "config")]
|
||||
config_path: PathBuf,
|
||||
#[clap(short, long, default_value = "8779")]
|
||||
port: u16,
|
||||
@ -42,7 +43,9 @@ async fn main() -> Result<()> {
|
||||
}
|
||||
|
||||
async fn run_server(config_path: PathBuf, port: u16) -> Result<jsonrpsee::server::ServerHandle> {
|
||||
let config = IndexerServiceConfig::from_file(&config_path)?;
|
||||
let config = IndexerConfig::from_path(&config_path)?;
|
||||
#[cfg(feature = "mock-responses")]
|
||||
let _ = config;
|
||||
|
||||
let server = Server::builder()
|
||||
.build(SocketAddr::from(([0, 0, 0, 0], port)))
|
||||
|
||||
@ -163,13 +163,17 @@ impl MockIndexerService {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl indexer_service_rpc::RpcServer for MockIndexerService {
|
||||
async fn subscribe_to_blocks(
|
||||
async fn subscribe_to_finalized_blocks(
|
||||
&self,
|
||||
_subscription_sink: jsonrpsee::PendingSubscriptionSink,
|
||||
_from: BlockId,
|
||||
subscription_sink: jsonrpsee::PendingSubscriptionSink,
|
||||
from: BlockId,
|
||||
) -> SubscriptionResult {
|
||||
// Subscription not implemented for mock service
|
||||
Err("Subscriptions not supported in mock service".into())
|
||||
let sink = subscription_sink.accept().await?;
|
||||
for block in self.blocks.iter().filter(|b| b.header.block_id >= from) {
|
||||
let json = serde_json::value::to_raw_value(block).unwrap();
|
||||
sink.send(json).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_block_by_id(&self, block_id: BlockId) -> Result<Block, ErrorObjectOwned> {
|
||||
|
||||
@ -17,7 +17,7 @@ impl IndexerService {
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl indexer_service_rpc::RpcServer for IndexerService {
|
||||
async fn subscribe_to_blocks(
|
||||
async fn subscribe_to_finalized_blocks(
|
||||
&self,
|
||||
_subscription_sink: jsonrpsee::PendingSubscriptionSink,
|
||||
_from: BlockId,
|
||||
|
||||
@ -2,19 +2,18 @@
|
||||
|
||||
use std::{net::SocketAddr, path::PathBuf, sync::LazyLock};
|
||||
|
||||
use actix_web::dev::ServerHandle;
|
||||
use anyhow::{Context, Result};
|
||||
use base64::{Engine, engine::general_purpose::STANDARD as BASE64};
|
||||
use common::{
|
||||
sequencer_client::SequencerClient,
|
||||
transaction::{EncodedTransaction, NSSATransaction},
|
||||
};
|
||||
use futures::FutureExt as _;
|
||||
use indexer_core::{IndexerCore, config::IndexerConfig};
|
||||
use log::debug;
|
||||
use nssa::PrivacyPreservingTransaction;
|
||||
use nssa_core::Commitment;
|
||||
use sequencer_core::config::SequencerConfig;
|
||||
use sequencer_runner::SequencerHandle;
|
||||
use tempfile::TempDir;
|
||||
use tokio::task::JoinHandle;
|
||||
use url::Url;
|
||||
@ -38,9 +37,7 @@ static LOGGER: LazyLock<()> = LazyLock::new(env_logger::init);
|
||||
/// It's memory and logically safe to create multiple instances of this struct in parallel tests,
|
||||
/// as each instance uses its own temporary directories for sequencer and wallet data.
|
||||
pub struct TestContext {
|
||||
sequencer_server_handle: ServerHandle,
|
||||
sequencer_loop_handle: JoinHandle<Result<()>>,
|
||||
sequencer_retry_pending_blocks_handle: JoinHandle<Result<()>>,
|
||||
_sequencer_handle: SequencerHandle,
|
||||
indexer_loop_handle: Option<JoinHandle<Result<()>>>,
|
||||
sequencer_client: SequencerClient,
|
||||
wallet: WalletCore,
|
||||
@ -95,15 +92,10 @@ impl TestContext {
|
||||
|
||||
debug!("Test context setup");
|
||||
|
||||
let (
|
||||
sequencer_server_handle,
|
||||
sequencer_addr,
|
||||
sequencer_loop_handle,
|
||||
sequencer_retry_pending_blocks_handle,
|
||||
temp_sequencer_dir,
|
||||
) = Self::setup_sequencer(sequencer_config)
|
||||
.await
|
||||
.context("Failed to setup sequencer")?;
|
||||
let (_sequencer_handle, sequencer_addr, temp_sequencer_dir) =
|
||||
Self::setup_sequencer(sequencer_config)
|
||||
.await
|
||||
.context("Failed to setup sequencer")?;
|
||||
|
||||
// Convert 0.0.0.0 to 127.0.0.1 for client connections
|
||||
// When binding to port 0, the server binds to 0.0.0.0:<random_port>
|
||||
@ -123,10 +115,7 @@ impl TestContext {
|
||||
)
|
||||
.context("Failed to create sequencer client")?;
|
||||
|
||||
if let Some(mut indexer_config) = indexer_config {
|
||||
indexer_config.sequencer_client_config.addr =
|
||||
Url::parse(&sequencer_addr).context("Failed to parse sequencer addr")?;
|
||||
|
||||
if let Some(indexer_config) = indexer_config {
|
||||
let indexer_core = IndexerCore::new(indexer_config)?;
|
||||
|
||||
let indexer_loop_handle = Some(tokio::spawn(async move {
|
||||
@ -134,9 +123,7 @@ impl TestContext {
|
||||
}));
|
||||
|
||||
Ok(Self {
|
||||
sequencer_server_handle,
|
||||
sequencer_loop_handle,
|
||||
sequencer_retry_pending_blocks_handle,
|
||||
_sequencer_handle,
|
||||
indexer_loop_handle,
|
||||
sequencer_client,
|
||||
wallet,
|
||||
@ -145,9 +132,7 @@ impl TestContext {
|
||||
})
|
||||
} else {
|
||||
Ok(Self {
|
||||
sequencer_server_handle,
|
||||
sequencer_loop_handle,
|
||||
sequencer_retry_pending_blocks_handle,
|
||||
_sequencer_handle,
|
||||
indexer_loop_handle: None,
|
||||
sequencer_client,
|
||||
wallet,
|
||||
@ -159,13 +144,7 @@ impl TestContext {
|
||||
|
||||
async fn setup_sequencer(
|
||||
mut config: SequencerConfig,
|
||||
) -> Result<(
|
||||
ServerHandle,
|
||||
SocketAddr,
|
||||
JoinHandle<Result<()>>,
|
||||
JoinHandle<Result<()>>,
|
||||
TempDir,
|
||||
)> {
|
||||
) -> Result<(SequencerHandle, SocketAddr, TempDir)> {
|
||||
let temp_sequencer_dir =
|
||||
tempfile::tempdir().context("Failed to create temp dir for sequencer home")?;
|
||||
|
||||
@ -177,20 +156,10 @@ impl TestContext {
|
||||
// Setting port to 0 lets the OS choose a free port for us
|
||||
config.port = 0;
|
||||
|
||||
let (
|
||||
sequencer_server_handle,
|
||||
sequencer_addr,
|
||||
sequencer_loop_handle,
|
||||
sequencer_retry_pending_blocks_handle,
|
||||
) = sequencer_runner::startup_sequencer(config).await?;
|
||||
let (sequencer_handle, sequencer_addr) =
|
||||
sequencer_runner::startup_sequencer(config).await?;
|
||||
|
||||
Ok((
|
||||
sequencer_server_handle,
|
||||
sequencer_addr,
|
||||
sequencer_loop_handle,
|
||||
sequencer_retry_pending_blocks_handle,
|
||||
temp_sequencer_dir,
|
||||
))
|
||||
Ok((sequencer_handle, sequencer_addr, temp_sequencer_dir))
|
||||
}
|
||||
|
||||
async fn setup_wallet(sequencer_addr: String) -> Result<(WalletCore, TempDir)> {
|
||||
@ -247,9 +216,7 @@ impl Drop for TestContext {
|
||||
debug!("Test context cleanup");
|
||||
|
||||
let Self {
|
||||
sequencer_server_handle,
|
||||
sequencer_loop_handle,
|
||||
sequencer_retry_pending_blocks_handle,
|
||||
_sequencer_handle,
|
||||
indexer_loop_handle,
|
||||
sequencer_client: _,
|
||||
wallet: _,
|
||||
@ -257,14 +224,9 @@ impl Drop for TestContext {
|
||||
_temp_wallet_dir,
|
||||
} = self;
|
||||
|
||||
sequencer_loop_handle.abort();
|
||||
sequencer_retry_pending_blocks_handle.abort();
|
||||
if let Some(indexer_loop_handle) = indexer_loop_handle {
|
||||
indexer_loop_handle.abort();
|
||||
}
|
||||
|
||||
// Can't wait here as Drop can't be async, but anyway stop signal should be sent
|
||||
sequencer_server_handle.stop(true).now_or_never();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -191,6 +191,7 @@ impl TpsTestManager {
|
||||
signing_key: [37; 32],
|
||||
bedrock_config: None,
|
||||
retry_pending_blocks_timeout_millis: 1000 * 60 * 4,
|
||||
indexer_rpc_url: "http://localhost:8779".parse().unwrap(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -9,6 +9,7 @@ nssa_core.workspace = true
|
||||
common.workspace = true
|
||||
storage.workspace = true
|
||||
mempool.workspace = true
|
||||
bedrock_client.workspace = true
|
||||
|
||||
base58.workspace = true
|
||||
anyhow.workspace = true
|
||||
@ -18,12 +19,13 @@ tempfile.workspace = true
|
||||
chrono.workspace = true
|
||||
log.workspace = true
|
||||
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
|
||||
bedrock_client.workspace = true
|
||||
logos-blockchain-key-management-system-service.workspace = true
|
||||
logos-blockchain-core.workspace = true
|
||||
rand.workspace = true
|
||||
reqwest.workspace = true
|
||||
borsh.workspace = true
|
||||
url.workspace = true
|
||||
jsonrpsee = { workspace = true, features = ["http-client"] }
|
||||
|
||||
[features]
|
||||
default = []
|
||||
|
||||
@ -8,6 +8,7 @@ use anyhow::Result;
|
||||
use common::config::BasicAuth;
|
||||
use logos_blockchain_core::mantle::ops::channel::ChannelId;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||
/// Helperstruct for account serialization
|
||||
@ -53,6 +54,8 @@ pub struct SequencerConfig {
|
||||
pub signing_key: [u8; 32],
|
||||
/// Bedrock configuration options
|
||||
pub bedrock_config: Option<BedrockConfig>,
|
||||
/// Indexer RPC URL
|
||||
pub indexer_rpc_url: Url,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
@ -60,7 +63,7 @@ pub struct BedrockConfig {
|
||||
/// Bedrock channel ID
|
||||
pub channel_id: ChannelId,
|
||||
/// Bedrock Url
|
||||
pub node_url: String,
|
||||
pub node_url: Url,
|
||||
/// Bedrock auth
|
||||
pub auth: Option<BasicAuth>,
|
||||
}
|
||||
|
||||
@ -19,6 +19,8 @@ mod block_settlement_client;
|
||||
pub mod block_store;
|
||||
pub mod config;
|
||||
|
||||
type IndexerClient = jsonrpsee::http_client::HttpClient;
|
||||
|
||||
pub struct SequencerCore {
|
||||
state: nssa::V02State,
|
||||
store: SequencerStore,
|
||||
@ -26,6 +28,7 @@ pub struct SequencerCore {
|
||||
sequencer_config: SequencerConfig,
|
||||
chain_height: u64,
|
||||
block_settlement_client: Option<BlockSettlementClient>,
|
||||
indexer_client: IndexerClient,
|
||||
last_bedrock_msg_id: MantleMsgId,
|
||||
}
|
||||
|
||||
@ -113,6 +116,10 @@ impl SequencerCore {
|
||||
.expect("Block settlement client should be constructible")
|
||||
});
|
||||
|
||||
let indexer_client = jsonrpsee::http_client::HttpClientBuilder::default()
|
||||
.build(config.indexer_rpc_url.clone())
|
||||
.expect("Failed to create Indexer client");
|
||||
|
||||
let sequencer_core = Self {
|
||||
state,
|
||||
store,
|
||||
@ -120,6 +127,7 @@ impl SequencerCore {
|
||||
chain_height: config.genesis_id,
|
||||
sequencer_config: config,
|
||||
block_settlement_client,
|
||||
indexer_client,
|
||||
last_bedrock_msg_id: channel_genesis_msg_id,
|
||||
};
|
||||
|
||||
@ -252,6 +260,14 @@ impl SequencerCore {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn first_pending_block_id(&self) -> Result<Option<u64>> {
|
||||
Ok(self
|
||||
.get_pending_blocks()?
|
||||
.iter()
|
||||
.map(|block| block.header.block_id)
|
||||
.min())
|
||||
}
|
||||
|
||||
/// Returns the list of stored pending blocks.
|
||||
pub fn get_pending_blocks(&self) -> Result<Vec<Block>> {
|
||||
Ok(self
|
||||
@ -266,6 +282,10 @@ impl SequencerCore {
|
||||
pub fn block_settlement_client(&self) -> Option<BlockSettlementClient> {
|
||||
self.block_settlement_client.clone()
|
||||
}
|
||||
|
||||
pub fn indexer_client(&self) -> &IndexerClient {
|
||||
&self.indexer_client
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Introduce type-safe wrapper around checked transaction, e.g. AuthenticatedTransaction
|
||||
@ -329,6 +349,7 @@ mod tests {
|
||||
signing_key: *sequencer_sign_key_for_testing().value(),
|
||||
bedrock_config: None,
|
||||
retry_pending_blocks_timeout_millis: 1000 * 60 * 4,
|
||||
indexer_rpc_url: "http://localhost:8779".parse().unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -18,8 +18,8 @@ use common::{
|
||||
GetInitialTestnetAccountsRequest, GetLastBlockRequest, GetLastBlockResponse,
|
||||
GetProgramIdsRequest, GetProgramIdsResponse, GetProofForCommitmentRequest,
|
||||
GetProofForCommitmentResponse, GetTransactionByHashRequest,
|
||||
GetTransactionByHashResponse, HelloRequest, HelloResponse, PostIndexerMessageRequest,
|
||||
PostIndexerMessageResponse, SendTxRequest, SendTxResponse,
|
||||
GetTransactionByHashResponse, HelloRequest, HelloResponse, SendTxRequest,
|
||||
SendTxResponse,
|
||||
},
|
||||
},
|
||||
transaction::{EncodedTransaction, NSSATransaction},
|
||||
@ -341,7 +341,7 @@ mod tests {
|
||||
use base58::ToBase58;
|
||||
use base64::{Engine, engine::general_purpose};
|
||||
use common::{
|
||||
sequencer_client::BasicAuth, test_utils::sequencer_sign_key_for_testing,
|
||||
config::BasicAuth, test_utils::sequencer_sign_key_for_testing,
|
||||
transaction::EncodedTransaction,
|
||||
};
|
||||
use sequencer_core::{
|
||||
@ -394,12 +394,13 @@ mod tests {
|
||||
retry_pending_blocks_timeout_millis: 1000 * 60 * 4,
|
||||
bedrock_config: Some(BedrockConfig {
|
||||
channel_id: [42; 32].into(),
|
||||
node_url: "http://localhost:8080".to_string(),
|
||||
node_url: "http://localhost:8080".parse().unwrap(),
|
||||
auth: Some(BasicAuth {
|
||||
username: "user".to_string(),
|
||||
password: None,
|
||||
}),
|
||||
}),
|
||||
indexer_rpc_url: "http://localhost:8779".parse().unwrap(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@ -7,6 +7,8 @@ edition = "2024"
|
||||
common.workspace = true
|
||||
sequencer_core = { workspace = true, features = ["testnet"] }
|
||||
sequencer_rpc.workspace = true
|
||||
indexer_service_protocol.workspace = true
|
||||
indexer_service_rpc = { workspace = true, features = ["client"] }
|
||||
|
||||
clap = { workspace = true, features = ["derive", "env"] }
|
||||
anyhow.workspace = true
|
||||
@ -15,3 +17,4 @@ log.workspace = true
|
||||
actix.workspace = true
|
||||
actix-web.workspace = true
|
||||
tokio.workspace = true
|
||||
futures.workspace = true
|
||||
|
||||
@ -1,10 +1,11 @@
|
||||
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
|
||||
use std::{net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
|
||||
|
||||
use actix_web::dev::ServerHandle;
|
||||
use anyhow::Result;
|
||||
use anyhow::{Context as _, Result};
|
||||
use clap::Parser;
|
||||
use common::rpc_primitives::RpcConfig;
|
||||
use log::{info, warn};
|
||||
use futures::FutureExt as _;
|
||||
use log::{debug, error, info, warn};
|
||||
use sequencer_core::{SequencerCore, config::SequencerConfig};
|
||||
use sequencer_rpc::new_http_server;
|
||||
use tokio::{sync::Mutex, task::JoinHandle};
|
||||
@ -18,16 +19,76 @@ struct Args {
|
||||
home_dir: PathBuf,
|
||||
}
|
||||
|
||||
/// An enum that can never be instantiated, used to replace unstable `!` type.
|
||||
#[derive(Debug)]
|
||||
pub enum Never {}
|
||||
|
||||
/// Handle to manage the sequencer and its tasks.
|
||||
///
|
||||
/// Implements `Drop` to ensure all tasks are aborted and the HTTP server is stopped when dropped.
|
||||
pub struct SequencerHandle {
|
||||
http_server_handle: ServerHandle,
|
||||
main_loop_handle: JoinHandle<Result<Never>>,
|
||||
retry_pending_blocks_loop_handle: JoinHandle<Result<Never>>,
|
||||
listen_for_bedrock_blocks_loop_handle: JoinHandle<Result<Never>>,
|
||||
}
|
||||
|
||||
impl SequencerHandle {
|
||||
/// Runs the sequencer indefinitely, monitoring its tasks.
|
||||
///
|
||||
/// If no error occurs, this function will never return.
|
||||
async fn run_forever(&mut self) -> Result<Never> {
|
||||
let Self {
|
||||
http_server_handle: _,
|
||||
main_loop_handle,
|
||||
retry_pending_blocks_loop_handle,
|
||||
listen_for_bedrock_blocks_loop_handle,
|
||||
} = self;
|
||||
|
||||
tokio::select! {
|
||||
res = main_loop_handle => {
|
||||
res
|
||||
.context("Main loop task panicked")?
|
||||
.context("Main loop exited unexpectedly")
|
||||
}
|
||||
res = retry_pending_blocks_loop_handle => {
|
||||
res
|
||||
.context("Retry pending blocks loop task panicked")?
|
||||
.context("Retry pending blocks loop exited unexpectedly")
|
||||
}
|
||||
res = listen_for_bedrock_blocks_loop_handle => {
|
||||
res
|
||||
.context("Listen for bedrock blocks loop task panicked")?
|
||||
.context("Listen for bedrock blocks loop exited unexpectedly")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Drop for SequencerHandle {
|
||||
fn drop(&mut self) {
|
||||
let Self {
|
||||
http_server_handle,
|
||||
main_loop_handle,
|
||||
retry_pending_blocks_loop_handle,
|
||||
listen_for_bedrock_blocks_loop_handle,
|
||||
} = self;
|
||||
|
||||
main_loop_handle.abort();
|
||||
retry_pending_blocks_loop_handle.abort();
|
||||
listen_for_bedrock_blocks_loop_handle.abort();
|
||||
|
||||
// Can't wait here as Drop can't be async, but anyway stop signal should be sent
|
||||
http_server_handle.stop(true).now_or_never();
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn startup_sequencer(
|
||||
app_config: SequencerConfig,
|
||||
) -> Result<(
|
||||
ServerHandle,
|
||||
SocketAddr,
|
||||
JoinHandle<Result<()>>,
|
||||
JoinHandle<Result<()>>,
|
||||
)> {
|
||||
let block_timeout = app_config.block_create_timeout_millis;
|
||||
let retry_pending_blocks_timeout = app_config.retry_pending_blocks_timeout_millis;
|
||||
) -> Result<(SequencerHandle, SocketAddr)> {
|
||||
let block_timeout = Duration::from_millis(app_config.block_create_timeout_millis);
|
||||
let retry_pending_blocks_timeout =
|
||||
Duration::from_millis(app_config.retry_pending_blocks_timeout_millis);
|
||||
let port = app_config.port;
|
||||
|
||||
let (sequencer_core, mempool_handle) = SequencerCore::start_from_config(app_config);
|
||||
@ -45,69 +106,128 @@ pub async fn startup_sequencer(
|
||||
let http_server_handle = http_server.handle();
|
||||
tokio::spawn(http_server);
|
||||
|
||||
info!("Starting pending block retry loop");
|
||||
let seq_core_wrapped_for_block_retry = seq_core_wrapped.clone();
|
||||
let retry_pending_blocks_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(
|
||||
retry_pending_blocks_timeout,
|
||||
))
|
||||
.await;
|
||||
|
||||
let (pending_blocks, block_settlement_client) = {
|
||||
let sequencer_core = seq_core_wrapped_for_block_retry.lock().await;
|
||||
let client = sequencer_core.block_settlement_client();
|
||||
let pending_blocks = sequencer_core
|
||||
.get_pending_blocks()
|
||||
.expect("Sequencer should be able to retrieve pending blocks");
|
||||
(pending_blocks, client)
|
||||
};
|
||||
|
||||
let Some(client) = block_settlement_client else {
|
||||
continue;
|
||||
};
|
||||
|
||||
info!("Resubmitting {} pending blocks", pending_blocks.len());
|
||||
for block in &pending_blocks {
|
||||
if let Err(e) = client.submit_block_to_bedrock(block).await {
|
||||
warn!(
|
||||
"Failed to resubmit block with id {} with error {}",
|
||||
block.header.block_id, e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
info!("Starting main sequencer loop");
|
||||
let main_loop_handle = tokio::spawn(async move {
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_millis(block_timeout)).await;
|
||||
let main_loop_handle = tokio::spawn(main_loop(Arc::clone(&seq_core_wrapped), block_timeout));
|
||||
|
||||
info!("Collecting transactions from mempool, block creation");
|
||||
info!("Starting pending block retry loop");
|
||||
let retry_pending_blocks_loop_handle = tokio::spawn(retry_pending_blocks_loop(
|
||||
Arc::clone(&seq_core_wrapped),
|
||||
retry_pending_blocks_timeout,
|
||||
));
|
||||
|
||||
let id = {
|
||||
let mut state = seq_core_wrapped.lock().await;
|
||||
|
||||
state
|
||||
.produce_new_block_and_post_to_settlement_layer()
|
||||
.await?
|
||||
};
|
||||
|
||||
info!("Block with id {id} created");
|
||||
|
||||
info!("Waiting for new transactions");
|
||||
}
|
||||
});
|
||||
info!("Starting bedrock block listening loop");
|
||||
let listen_for_bedrock_blocks_loop_handle =
|
||||
tokio::spawn(listen_for_bedrock_blocks_loop(seq_core_wrapped));
|
||||
|
||||
Ok((
|
||||
http_server_handle,
|
||||
SequencerHandle {
|
||||
http_server_handle,
|
||||
main_loop_handle,
|
||||
retry_pending_blocks_loop_handle,
|
||||
listen_for_bedrock_blocks_loop_handle,
|
||||
},
|
||||
addr,
|
||||
main_loop_handle,
|
||||
retry_pending_blocks_handle,
|
||||
))
|
||||
}
|
||||
|
||||
async fn main_loop(seq_core: Arc<Mutex<SequencerCore>>, block_timeout: Duration) -> Result<Never> {
|
||||
loop {
|
||||
tokio::time::sleep(block_timeout).await;
|
||||
|
||||
info!("Collecting transactions from mempool, block creation");
|
||||
|
||||
let id = {
|
||||
let mut state = seq_core.lock().await;
|
||||
|
||||
state
|
||||
.produce_new_block_and_post_to_settlement_layer()
|
||||
.await?
|
||||
};
|
||||
|
||||
info!("Block with id {id} created");
|
||||
|
||||
info!("Waiting for new transactions");
|
||||
}
|
||||
}
|
||||
|
||||
async fn retry_pending_blocks_loop(
|
||||
seq_core: Arc<Mutex<SequencerCore>>,
|
||||
retry_pending_blocks_timeout: Duration,
|
||||
) -> Result<Never> {
|
||||
loop {
|
||||
tokio::time::sleep(retry_pending_blocks_timeout).await;
|
||||
|
||||
let (pending_blocks, block_settlement_client) = {
|
||||
let sequencer_core = seq_core.lock().await;
|
||||
let client = sequencer_core.block_settlement_client();
|
||||
let pending_blocks = sequencer_core
|
||||
.get_pending_blocks()
|
||||
.expect("Sequencer should be able to retrieve pending blocks");
|
||||
(pending_blocks, client)
|
||||
};
|
||||
|
||||
let Some(client) = block_settlement_client else {
|
||||
continue;
|
||||
};
|
||||
|
||||
info!("Resubmitting {} pending blocks", pending_blocks.len());
|
||||
for block in &pending_blocks {
|
||||
if let Err(e) = client.submit_block_to_bedrock(block).await {
|
||||
warn!(
|
||||
"Failed to resubmit block with id {} with error {}",
|
||||
block.header.block_id, e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn listen_for_bedrock_blocks_loop(seq_core: Arc<Mutex<SequencerCore>>) -> Result<Never> {
|
||||
use indexer_service_rpc::RpcClient as _;
|
||||
|
||||
let indexer_client = seq_core.lock().await.indexer_client().clone();
|
||||
|
||||
loop {
|
||||
let first_pending_block_id = {
|
||||
let sequencer_core = seq_core.lock().await;
|
||||
|
||||
sequencer_core
|
||||
.first_pending_block_id()
|
||||
.context("Failed to get first pending block ID")?
|
||||
.unwrap_or(sequencer_core.chain_height())
|
||||
};
|
||||
|
||||
info!("Subscribing to blocks from ID {first_pending_block_id}");
|
||||
let mut subscription = indexer_client
|
||||
.subscribe_to_finalized_blocks(first_pending_block_id)
|
||||
.await
|
||||
.with_context(|| {
|
||||
format!("Failed to subscribe to blocks from {first_pending_block_id}")
|
||||
})?;
|
||||
|
||||
while let Some(block) = subscription.next().await {
|
||||
let block = block.context("Failed to get next block from subscription")?;
|
||||
let block_id = block.header.block_id;
|
||||
|
||||
info!("Received new L2 block with ID {block_id}");
|
||||
debug!("Block data: {block:#?}");
|
||||
|
||||
seq_core
|
||||
.lock()
|
||||
.await
|
||||
.clean_finalized_blocks_from_db(block_id)
|
||||
.with_context(|| {
|
||||
format!("Failed to clean finalized blocks from DB for block ID {block_id}")
|
||||
})?;
|
||||
}
|
||||
|
||||
warn!(
|
||||
"Block subscription closed unexpectedly, reason: {:?}",
|
||||
subscription.close_reason()
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn main_runner() -> Result<()> {
|
||||
env_logger::init();
|
||||
|
||||
@ -125,24 +245,12 @@ pub async fn main_runner() -> Result<()> {
|
||||
}
|
||||
|
||||
// ToDo: Add restart on failures
|
||||
let (_, _, main_loop_handle, retry_loop_handle) = startup_sequencer(app_config).await?;
|
||||
let (mut sequencer_handle, _addr) = startup_sequencer(app_config).await?;
|
||||
|
||||
info!("Sequencer running. Monitoring concurrent tasks...");
|
||||
|
||||
tokio::select! {
|
||||
res = main_loop_handle => {
|
||||
match res {
|
||||
Ok(inner_res) => warn!("Main loop exited unexpectedly: {:?}", inner_res),
|
||||
Err(e) => warn!("Main loop task panicked: {:?}", e),
|
||||
}
|
||||
}
|
||||
res = retry_loop_handle => {
|
||||
match res {
|
||||
Ok(inner_res) => warn!("Retry loop exited unexpectedly: {:?}", inner_res),
|
||||
Err(e) => warn!("Retry loop task panicked: {:?}", e),
|
||||
}
|
||||
}
|
||||
}
|
||||
let Err(err) = sequencer_handle.run_forever().await;
|
||||
error!("Sequencer failed: {err:?}");
|
||||
|
||||
info!("Shutting down sequencer...");
|
||||
|
||||
|
||||
@ -4,7 +4,7 @@ use std::{
|
||||
};
|
||||
|
||||
use anyhow::{Context as _, Result};
|
||||
use common::common::BasicAuth;
|
||||
use common::config::BasicAuth;
|
||||
use key_protocol::key_management::{
|
||||
KeyChain,
|
||||
key_tree::{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user