explorer tests
This commit is contained in:
parent
e5e3d3aecb
commit
a4f403b545
|
@ -91,7 +91,7 @@ where
|
|||
)
|
||||
.route("/da/blobs", routing::post(da::blobs))
|
||||
.with_state(handle);
|
||||
|
||||
tracing::info!("Explorer listen on: {}...", self.settings.address);
|
||||
Server::bind(&self.settings.address)
|
||||
.serve(app.into_make_service())
|
||||
.await
|
||||
|
|
|
@ -141,7 +141,7 @@ fn default_storage_settings() -> <StorageService<RocksBackend<Wire>> as ServiceD
|
|||
RocksBackendSettings {
|
||||
db_path: DEFAULT_DB_PATH.into(),
|
||||
read_only: false,
|
||||
column_family: Some("blocks".into()),
|
||||
column_family: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,3 @@ mixnet-node = { path = "../mixnet/node", default-features = false }
|
|||
mixnet-client = { path = "../mixnet/client" }
|
||||
mixnet-topology = { path = "../mixnet/topology" }
|
||||
scopeguard = "1"
|
||||
|
||||
[[test]]
|
||||
name = "integration"
|
||||
path = "tests/main.rs"
|
|
@ -113,6 +113,53 @@ impl NomosChat {
|
|||
>(std::fs::File::open(&self.network_config)?)?;
|
||||
let da_protocol = self.da_protocol.clone();
|
||||
|
||||
let node_addr = Some(self.node.clone());
|
||||
|
||||
let (payload_sender, payload_receiver) = tokio::sync::mpsc::unbounded_channel();
|
||||
let (status_sender, status_updates) = std::sync::mpsc::channel();
|
||||
|
||||
let shared_writer = Arc::new(sync::Mutex::new(Vec::new()));
|
||||
let backend = SharedWriter::from_inner(shared_writer.clone());
|
||||
|
||||
std::thread::spawn(move || {
|
||||
OverwatchRunner::<DisseminateApp>::run(
|
||||
DisseminateAppServiceSettings {
|
||||
network,
|
||||
send_blob: Settings {
|
||||
payload: Arc::new(Mutex::new(payload_receiver)),
|
||||
timeout: DEFAULT_TIMEOUT,
|
||||
da_protocol,
|
||||
status_updates: status_sender,
|
||||
node_addr,
|
||||
output: None,
|
||||
},
|
||||
logger: LoggerSettings {
|
||||
backend: LoggerBackend::Writer(backend),
|
||||
level: tracing::Level::INFO,
|
||||
..Default::default()
|
||||
},
|
||||
},
|
||||
None,
|
||||
)
|
||||
.unwrap()
|
||||
.wait_finished()
|
||||
});
|
||||
|
||||
if let Some(author) = self.author.as_ref() {
|
||||
let message = self
|
||||
.message
|
||||
.as_ref()
|
||||
.expect("Should be available if author is set");
|
||||
return run_once(author, message, payload_sender);
|
||||
}
|
||||
|
||||
// setup terminal
|
||||
enable_raw_mode()?;
|
||||
let mut stdout = io::stdout();
|
||||
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
|
||||
let backend = CrosstermBackend::new(stdout);
|
||||
let mut terminal = Terminal::new(backend)?;
|
||||
|
||||
let app = App {
|
||||
input: Input::default(),
|
||||
username: None,
|
||||
|
@ -122,8 +169,8 @@ impl NomosChat {
|
|||
last_updated: Instant::now(),
|
||||
payload_sender,
|
||||
status_updates,
|
||||
node: self.node.clone(),
|
||||
explorer: self.explorer.clone(),
|
||||
node: self.node.clone(),
|
||||
logs: shared_writer,
|
||||
scroll_logs: 0,
|
||||
};
|
||||
|
@ -185,21 +232,6 @@ impl NomosChat {
|
|||
.wait_finished()
|
||||
});
|
||||
|
||||
if let Some(author) = self.author.as_ref() {
|
||||
let message = self
|
||||
.message
|
||||
.as_ref()
|
||||
.expect("Should be available if author is set");
|
||||
return run_once(author, message, payload_sender);
|
||||
}
|
||||
|
||||
// setup terminal
|
||||
enable_raw_mode()?;
|
||||
let mut stdout = io::stdout();
|
||||
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
|
||||
let backend = CrosstermBackend::new(stdout);
|
||||
let mut terminal = Terminal::new(backend)?;
|
||||
|
||||
let app = App {
|
||||
input: Input::default(),
|
||||
username: Some(username),
|
||||
|
|
|
@ -82,24 +82,36 @@ impl<SerdeOp: StorageSerde + Send + Sync + 'static> StorageBackend for RocksBack
|
|||
(true, None) => {
|
||||
let mut opts = Options::default();
|
||||
opts.create_if_missing(false);
|
||||
DB::open_for_read_only(&opts, db_path, false)?
|
||||
DB::open_for_read_only(&opts, &db_path, false).map_err(|e| {
|
||||
tracing::error!(err=%e, "rocks storage: fail to open {}", db_path.display());
|
||||
e
|
||||
})?
|
||||
}
|
||||
(true, Some(cf)) => {
|
||||
let mut opts = Options::default();
|
||||
opts.create_if_missing(false);
|
||||
DB::open_cf_for_read_only(&opts, db_path, [cf], false)?
|
||||
DB::open_cf_for_read_only(&opts, &db_path, [cf], false).map_err(|e| {
|
||||
tracing::error!(err=%e, "rocks storage: fail to open {}", db_path.display());
|
||||
e
|
||||
})?
|
||||
}
|
||||
(false, None) => {
|
||||
let mut opts = Options::default();
|
||||
opts.create_if_missing(true);
|
||||
opts.create_missing_column_families(true);
|
||||
DB::open(&opts, db_path)?
|
||||
DB::open(&opts, &db_path).map_err(|e| {
|
||||
tracing::error!(err=%e, "rocks storage: fail to open {}", db_path.display());
|
||||
e
|
||||
})?
|
||||
}
|
||||
(false, Some(cf)) => {
|
||||
let mut opts = Options::default();
|
||||
opts.create_if_missing(true);
|
||||
opts.create_missing_column_families(true);
|
||||
DB::open_cf(&opts, db_path, [cf])?
|
||||
DB::open_cf(&opts, &db_path, [cf]).map_err(|e| {
|
||||
tracing::error!(err=%e, "rocks storage: fail to open {}", db_path.display());
|
||||
e
|
||||
})?
|
||||
}
|
||||
};
|
||||
|
||||
|
|
|
@ -33,6 +33,7 @@ serde = { version = "1", features = ["derive"] }
|
|||
serde_yaml = "0.9"
|
||||
serde_json = "1.0"
|
||||
tokio = "1"
|
||||
tracing = "0.1"
|
||||
futures = "0.3"
|
||||
async-trait = "0.1"
|
||||
fraction = "0.13"
|
||||
|
@ -56,6 +57,10 @@ path = "src/tests/mixnet.rs"
|
|||
name = "test_cli"
|
||||
path = "src/tests/cli.rs"
|
||||
|
||||
[[test]]
|
||||
name = "test_explorer"
|
||||
path = "src/tests/explorer.rs"
|
||||
|
||||
[[bench]]
|
||||
name = "mixnet"
|
||||
path = "src/benches/mixnet.rs"
|
||||
|
|
|
@ -91,6 +91,10 @@ impl MixNode {
|
|||
|
||||
let mut nodes = Vec::<MixNode>::new();
|
||||
for config in &configs {
|
||||
println!(
|
||||
"mixnode client {} mixnode listener {}",
|
||||
config.client_listen_address, config.listen_address
|
||||
);
|
||||
nodes.push(Self::spawn(*config).await);
|
||||
}
|
||||
|
||||
|
|
|
@ -72,7 +72,7 @@ impl NomosNode {
|
|||
prefix: Some(LOGS_PREFIX.into()),
|
||||
};
|
||||
config.log.format = LoggerFormat::Json;
|
||||
|
||||
config.storage.db_path = dir.path().join("db");
|
||||
serde_yaml::to_writer(&mut file, &config).unwrap();
|
||||
let child = Command::new(std::env::current_dir().unwrap().join(NOMOS_BIN))
|
||||
.arg(&config_path)
|
||||
|
@ -366,11 +366,15 @@ fn create_node_config(
|
|||
p
|
||||
},
|
||||
read_only: false,
|
||||
column_family: Some("blocks".into()),
|
||||
column_family: None,
|
||||
},
|
||||
};
|
||||
|
||||
config.network.backend.inner.port = get_available_port();
|
||||
println!(
|
||||
"config.network.backend.inner.port {}",
|
||||
config.network.backend.inner.port
|
||||
);
|
||||
|
||||
config
|
||||
}
|
||||
|
|
|
@ -1,11 +1,15 @@
|
|||
use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication};
|
||||
use futures::{stream, StreamExt};
|
||||
use nomos_cli::{
|
||||
api::da::get_blobs,
|
||||
cmds::disseminate::Disseminate,
|
||||
da::disseminate::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings},
|
||||
};
|
||||
use nomos_core::block::Block;
|
||||
use nomos_core::da::{blob::Blob as _, DaProtocol};
|
||||
use std::{io::Write, net::SocketAddr, path::PathBuf, time::Duration};
|
||||
use nomos_network::{backends::libp2p::Libp2p, NetworkConfig};
|
||||
use nomos_node::Tx;
|
||||
use std::{collections::HashSet, io::Write, net::SocketAddr, path::PathBuf, time::Duration};
|
||||
use tempfile::NamedTempFile;
|
||||
use tests::{
|
||||
adjust_timeout, get_available_port, nodes::nomos::Pool, MixNode, Node, NomosNode, SpawnConfig,
|
||||
|
@ -35,24 +39,38 @@ fn run_disseminate(disseminate: &Disseminate) {
|
|||
}
|
||||
|
||||
async fn spawn_and_setup_config(
|
||||
num_mixnodes: usize,
|
||||
num_nodes: usize,
|
||||
file: &mut NamedTempFile,
|
||||
config: &mut Disseminate,
|
||||
network_config: Option<NetworkConfig<Libp2p>>,
|
||||
storage_dir: Option<PathBuf>,
|
||||
) -> (
|
||||
Vec<MixNode>,
|
||||
Vec<NomosNode>,
|
||||
FullReplication<AbsoluteNumber<Attestation, Certificate>>,
|
||||
) {
|
||||
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(2).await;
|
||||
let mut nodes =
|
||||
NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config), storage_dir).await;
|
||||
let (mixnodes, mixnet_config) = MixNode::spawn_nodes(num_mixnodes).await;
|
||||
let mut nodes = NomosNode::spawn_nodes(
|
||||
SpawnConfig::chain_happy(num_nodes, mixnet_config),
|
||||
storage_dir,
|
||||
)
|
||||
.await;
|
||||
|
||||
// kill the node so that we can reuse its network config
|
||||
nodes[1].stop();
|
||||
let network_config = if let Some(nc) = network_config {
|
||||
nc
|
||||
} else {
|
||||
// kill the node so that we can reuse its network config
|
||||
nodes[1].stop();
|
||||
|
||||
let mut network_config = nodes[1].config().network.clone();
|
||||
// use a new port because the old port is sometimes not closed immediately
|
||||
network_config.backend.inner.port = get_available_port();
|
||||
network_config
|
||||
};
|
||||
|
||||
let mut network_config = nodes[1].config().network.clone();
|
||||
// use a new port because the old port is sometimes not closed immediately
|
||||
network_config.backend.inner.port = get_available_port();
|
||||
let config_path = file.path().to_owned();
|
||||
println!("network config {:?}", network_config);
|
||||
serde_yaml::to_writer(file, &network_config).unwrap();
|
||||
let da_protocol = DaProtocolChoice {
|
||||
da_protocol: Protocol::FullReplication,
|
||||
|
@ -80,14 +98,13 @@ async fn spawn_and_setup_config(
|
|||
.unwrap(),
|
||||
);
|
||||
|
||||
(nodes, da)
|
||||
(mixnodes, nodes, da)
|
||||
}
|
||||
|
||||
async fn disseminate(config: &mut Disseminate) {
|
||||
let mut file = NamedTempFile::new().unwrap();
|
||||
let (nodes, da) = spawn_and_setup_config(&mut file, config, None).await;
|
||||
let (_mixnodes, nodes, da) = spawn_and_setup_config(2, 2, &mut file, config, None, None).await;
|
||||
run_disseminate(config);
|
||||
// let thread = std::thread::spawn(move || cmd.run().unwrap());
|
||||
|
||||
tokio::time::timeout(
|
||||
adjust_timeout(Duration::from_secs(TIMEOUT_SECS)),
|
||||
|
@ -145,84 +162,6 @@ async fn disseminate_blob_from_file() {
|
|||
disseminate(&mut config).await;
|
||||
}
|
||||
|
||||
fn run_explorer(explorer_api_addr: SocketAddr, db_path: PathBuf) {
|
||||
let cfg = nomos_explorer::Config {
|
||||
log: Default::default(),
|
||||
api: nomos_api::ApiServiceSettings {
|
||||
backend_settings: nomos_explorer::AxumBackendSettings {
|
||||
address: explorer_api_addr,
|
||||
cors_origins: Vec::new(),
|
||||
},
|
||||
},
|
||||
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
|
||||
db_path,
|
||||
read_only: true,
|
||||
column_family: Some("blocks".into()),
|
||||
},
|
||||
};
|
||||
std::thread::spawn(move || nomos_explorer::Explorer::run(cfg).unwrap());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn explorer() {
|
||||
let (tx, rx) = std::sync::mpsc::channel();
|
||||
let tx1 = tx.clone();
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
let db_path = temp.path().to_path_buf();
|
||||
|
||||
std::thread::spawn(move || {
|
||||
tokio::runtime::Runtime::new()
|
||||
.unwrap()
|
||||
.block_on(async move {
|
||||
let mut file = NamedTempFile::new().unwrap();
|
||||
let mut config = Disseminate::default();
|
||||
let (nodes, _) =
|
||||
spawn_and_setup_config(&mut file, &mut config, Some(db_path)).await;
|
||||
let explorer_db = nodes[0].config().storage.db_path.clone();
|
||||
|
||||
let explorer_api_addr = format!("127.0.0.1:{}", get_available_port())
|
||||
.parse()
|
||||
.unwrap();
|
||||
run_explorer(explorer_api_addr, explorer_db);
|
||||
|
||||
let c = nomos_cli::cmds::chat::NomosChat {
|
||||
network_config: config.network_config.clone(),
|
||||
da_protocol: config.da_protocol.clone(),
|
||||
node: config.node_addr.clone().unwrap(),
|
||||
explorer: format!("http://{}", explorer_api_addr).parse().unwrap(),
|
||||
message: None,
|
||||
author: None,
|
||||
};
|
||||
let c1 = c.clone();
|
||||
|
||||
std::thread::Builder::new()
|
||||
.name("user1".into())
|
||||
.spawn(move || {
|
||||
let (rx1, app1) = c.run_app_without_terminal("user1".into()).unwrap();
|
||||
app1.send_message("Hello from user1".into());
|
||||
let msgs1 = rx1.recv().unwrap();
|
||||
assert!(!msgs1.is_empty());
|
||||
tx1.send(()).unwrap();
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
std::thread::Builder::new()
|
||||
.name("user2".into())
|
||||
.spawn(move || {
|
||||
let (rx2, app2) = c1.run_app_without_terminal("user2".into()).unwrap();
|
||||
app2.send_message("Hello from user2".into());
|
||||
let msgs2 = rx2.recv().unwrap();
|
||||
assert!(!msgs2.is_empty());
|
||||
tx.send(()).unwrap();
|
||||
})
|
||||
.unwrap();
|
||||
});
|
||||
});
|
||||
|
||||
assert!(rx.recv().is_ok());
|
||||
assert!(rx.recv().is_ok());
|
||||
}
|
||||
|
||||
async fn wait_for_cert_in_mempool(node: &NomosNode) {
|
||||
loop {
|
||||
if node
|
||||
|
|
|
@ -0,0 +1,120 @@
|
|||
use futures::{stream, StreamExt};
|
||||
use nomos_core::block::Block;
|
||||
use nomos_node::Tx;
|
||||
use std::{collections::HashSet, net::SocketAddr, path::PathBuf, time::Duration};
|
||||
use tests::{get_available_port, MixNode, Node, NomosNode, SpawnConfig};
|
||||
|
||||
fn run_explorer(explorer_api_addr: SocketAddr, db_path: PathBuf) {
|
||||
let cfg = nomos_explorer::Config {
|
||||
log: Default::default(),
|
||||
api: nomos_api::ApiServiceSettings {
|
||||
backend_settings: nomos_explorer::AxumBackendSettings {
|
||||
address: explorer_api_addr,
|
||||
cors_origins: Vec::new(),
|
||||
},
|
||||
},
|
||||
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
|
||||
db_path,
|
||||
read_only: true,
|
||||
column_family: None,
|
||||
},
|
||||
};
|
||||
std::thread::spawn(move || nomos_explorer::Explorer::run(cfg).unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn explorer_depth() {
|
||||
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(3).await;
|
||||
let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config), None).await;
|
||||
let explorer_db = nodes[0].config().storage.db_path.clone();
|
||||
|
||||
// wait for nodes to create blocks
|
||||
let explorer_api_addr: SocketAddr = format!("127.0.0.1:{}", get_available_port())
|
||||
.parse()
|
||||
.unwrap();
|
||||
let explorer_api_addr1: reqwest::Url =
|
||||
format!("http://{}/explorer/blocks/depth", explorer_api_addr)
|
||||
.parse()
|
||||
.unwrap();
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
std::thread::spawn(move || {
|
||||
tokio::runtime::Runtime::new()
|
||||
.unwrap()
|
||||
.block_on(async move {
|
||||
run_explorer(explorer_api_addr, explorer_db);
|
||||
});
|
||||
});
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
|
||||
let infos = stream::iter(nodes.iter())
|
||||
.then(|n| async move { n.get_blocks_info(None, None).await })
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
let blocks = infos
|
||||
.iter()
|
||||
.map(|i| i.iter().find(|b| b.view == 20.into()).unwrap())
|
||||
.collect::<HashSet<_>>();
|
||||
|
||||
for block in blocks.iter() {
|
||||
let resp = reqwest::Client::new()
|
||||
.get(explorer_api_addr1.clone())
|
||||
.query(&[("from", block.id.to_string()), ("depth", 10.to_string())])
|
||||
.header("Content-Type", "application/json")
|
||||
.send()
|
||||
.await
|
||||
.unwrap()
|
||||
.json::<Vec<Block<Tx, full_replication::Certificate>>>()
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(resp.len(), 10);
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn explorer() {
|
||||
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(3).await;
|
||||
let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config), None).await;
|
||||
let explorer_db = nodes[0].config().storage.db_path.clone();
|
||||
|
||||
// wait for nodes to create blocks
|
||||
let explorer_api_addr: SocketAddr = format!("127.0.0.1:{}", get_available_port())
|
||||
.parse()
|
||||
.unwrap();
|
||||
let explorer_api_addr1: reqwest::Url = format!("http://{}/explorer/blocks", explorer_api_addr)
|
||||
.parse()
|
||||
.unwrap();
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
std::thread::spawn(move || {
|
||||
tokio::runtime::Runtime::new()
|
||||
.unwrap()
|
||||
.block_on(async move {
|
||||
run_explorer(explorer_api_addr, explorer_db);
|
||||
});
|
||||
});
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
|
||||
let infos = stream::iter(nodes.iter())
|
||||
.then(|n| async move { n.get_blocks_info(None, None).await })
|
||||
.collect::<Vec<_>>()
|
||||
.await;
|
||||
let blocks = infos
|
||||
.iter()
|
||||
.map(|i| i.iter().find(|b| b.view == 20.into()).unwrap())
|
||||
.collect::<HashSet<_>>();
|
||||
for block in blocks.iter() {
|
||||
let from = block.parent().to_string();
|
||||
let to = block.id.to_string();
|
||||
|
||||
let resp = reqwest::Client::new()
|
||||
.get(explorer_api_addr1.clone())
|
||||
.query(&[("from", from), ("to", to)])
|
||||
.header("Content-Type", "application/json")
|
||||
.send()
|
||||
.await
|
||||
.unwrap()
|
||||
.json::<Vec<Block<Tx, full_replication::Certificate>>>()
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(!resp.is_empty());
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue