From bd750f0ae688b6a29e1db539e55a0304777eb434 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 5 Feb 2024 02:46:00 +0800 Subject: [PATCH] try to fix blocking test --- .cargo/config.toml | 5 +- ci/Jenkinsfile.nightly.fuzzy | 21 +- ci/Jenkinsfile.nightly.integration | 24 +- ci/discord.groovy | 32 -- compose.static.yml | 35 ++ .../src/overlay/tree_overlay/overlay.rs | 9 +- nodes/explorer/Cargo.toml | 2 +- nodes/explorer/src/api_backend/da.rs | 16 + nodes/explorer/src/api_backend/mod.rs | 6 +- nodes/explorer/src/api_backend/store.rs | 31 -- nodes/explorer/src/config.rs | 3 + nodes/explorer/src/lib.rs | 39 +++ nodes/explorer/src/main.rs | 40 +-- nodes/nomos-node/Cargo.toml | 2 +- nodes/nomos-node/config.yaml | 4 +- nodes/nomos-node/src/config.rs | 17 + nodes/nomos-node/src/lib.rs | 71 +++- nodes/nomos-node/src/main.rs | 73 +--- nomos-cli/Cargo.toml | 22 +- nomos-cli/src/api/consensus.rs | 18 - nomos-cli/src/api/storage.rs | 8 +- nomos-cli/src/cmds/chat/mod.rs | 199 +++++++++-- nomos-cli/src/cmds/disseminate/mod.rs | 19 +- nomos-cli/src/da/disseminate.rs | 7 +- nomos-cli/src/da/retrieve.rs | 15 +- nomos-cli/test.config.chat.user1.yaml | 45 +++ nomos-cli/test.config.chat.user2.yaml | 45 +++ nomos-cli/tests/main.rs | 329 ++++++++++++++++++ nomos-services/api/Cargo.toml | 2 +- nomos-services/api/src/http/consensus.rs | 4 +- nomos-services/api/src/http/storage.rs | 4 +- .../src/network/adapters/mock.rs | 2 +- nomos-services/log/src/lib.rs | 12 +- nomos-services/storage/Cargo.toml | 7 + nomos-services/storage/src/backends/mod.rs | 3 + .../storage/src/backends/rocksdb.rs | 250 +++++++++++++ nomos-services/storage/src/backends/sled.rs | 2 +- nomos-services/storage/src/bin/rocks.rs | 58 +++ shell.nix | 1 + testnet/Dockerfile | 10 +- testnet/cli_config.yaml | 44 +++ testnet/explorer_config.yaml | 10 + testnet/monitoring/grafana/datasources.yaml | 11 + testnet/monitoring/grafana/grafana.ini | 51 +++ testnet/monitoring/grafana/plugins.env | 1 + testnet/monitoring/prometheus-config.yml | 14 + testnet/scripts/run_bootstrap_node.sh | 2 +- testnet/scripts/run_nomos_bot.sh | 9 + testnet/scripts/run_nomos_node.sh | 2 +- tests/Cargo.toml | 2 + tests/src/lib.rs | 3 +- tests/src/nodes/mixnode.rs | 27 +- tests/src/nodes/mod.rs | 23 ++ tests/src/nodes/nomos.rs | 51 ++- tests/src/tests/cli.rs | 224 ++++++++++-- tests/src/tests/happy.rs | 6 +- tests/src/tests/unhappy.rs | 17 +- 57 files changed, 1630 insertions(+), 359 deletions(-) delete mode 100644 ci/discord.groovy create mode 100644 nodes/explorer/src/api_backend/da.rs create mode 100644 nodes/explorer/src/lib.rs create mode 100644 nomos-cli/test.config.chat.user1.yaml create mode 100644 nomos-cli/test.config.chat.user2.yaml create mode 100644 nomos-cli/tests/main.rs create mode 100644 nomos-services/storage/src/backends/rocksdb.rs create mode 100644 nomos-services/storage/src/bin/rocks.rs create mode 100644 testnet/cli_config.yaml create mode 100644 testnet/explorer_config.yaml create mode 100644 testnet/monitoring/grafana/datasources.yaml create mode 100644 testnet/monitoring/grafana/grafana.ini create mode 100644 testnet/monitoring/grafana/plugins.env create mode 100644 testnet/monitoring/prometheus-config.yml create mode 100755 testnet/scripts/run_nomos_bot.sh diff --git a/.cargo/config.toml b/.cargo/config.toml index b915cdf8..8d771484 100644 --- a/.cargo/config.toml +++ b/.cargo/config.toml @@ -1,4 +1,7 @@ [target.'cfg(target_os = "macos")'] # when using osx, we need to link against some golang libraries, it did just work with this missing flags # from: https://github.com/golang/go/issues/42459 -rustflags = ["-C", "link-args=-framework CoreFoundation -framework Security -framework CoreServices -lresolv"] \ No newline at end of file +rustflags = [ + "-C", + "link-args=-framework CoreFoundation -framework Security -framework CoreServices -lresolv", +] diff --git a/ci/Jenkinsfile.nightly.fuzzy b/ci/Jenkinsfile.nightly.fuzzy index d1720d55..2331754e 100644 --- a/ci/Jenkinsfile.nightly.fuzzy +++ b/ci/Jenkinsfile.nightly.fuzzy @@ -1,3 +1,6 @@ +#!/usr/bin/env groovy +library 'status-jenkins-lib@v1.8.6' + pipeline { agent { dockerfile { @@ -43,16 +46,16 @@ pipeline { } post { - failure { + always { script { - def discord = load "${WORKSPACE}/ci/discord.groovy" - discord.sendMessage(header: 'Nightly Fuzztest Failed. Regression files archived as job artifacts') - } - } - success { - script { - def discord = load "${WORKSPACE}/ci/discord.groovy" - discord.sendMessage(header: 'Nightly Fuzztest Passed') + discord.send( + header: ( + currentBuild.currentResult == 'SUCCESS' ? + 'Nightly Fuzztest Passed' : + 'Nightly Fuzztest Failed. Regression files archived as job artifacts' + ), + cred: 'nomos-node-discord-commits-webhook', + ) } } cleanup { cleanWs() } diff --git a/ci/Jenkinsfile.nightly.integration b/ci/Jenkinsfile.nightly.integration index 214a4ebb..9dff6b1a 100644 --- a/ci/Jenkinsfile.nightly.integration +++ b/ci/Jenkinsfile.nightly.integration @@ -1,3 +1,6 @@ +#!/usr/bin/env groovy +library 'status-jenkins-lib@v1.8.6' + pipeline { agent { dockerfile { @@ -67,20 +70,13 @@ pipeline { } post { - failure { - script { - def report = readFile("${WORKSPACE}/report.txt").trim() - def discord = load "${WORKSPACE}/ci/discord.groovy" - discord.sendMessage(header: "Nightly Integration Tests Failed: ${report}") - } - } - success { - script { - def report = readFile('report.txt').trim() - def discord = load "${WORKSPACE}/ci/discord.groovy" - discord.sendMessage(header: "Nightly Integration Tests Passed: ${report}") - } - } + always { script { + def report = readFile("${WORKSPACE}/report.txt").trim() + discord.send( + header: "Nightly Integration Tests ${currentBuild.currentResult}: ${report}", + cred: 'nomos-node-discord-commits-webhook', + ) + } } cleanup { cleanWs() } } } diff --git a/ci/discord.groovy b/ci/discord.groovy deleted file mode 100644 index dd43d09d..00000000 --- a/ci/discord.groovy +++ /dev/null @@ -1,32 +0,0 @@ -def sendMessage(Map args=[:]) { - def opts = [ - header: args.header ?: 'Build succeeded', - title: args.title ?: "${env.JOB_NAME}#${env.BUILD_NUMBER}", - cred: args.cred ?: 'nomos-node-discord-commits-webhook', - ] - def repo = [ - url: GIT_URL.minus('.git'), - branch: GIT_BRANCH.minus('origin/'), - commit: GIT_COMMIT.take(8), - ] - withCredentials([ - string( - credentialsId: opts.cred, - variable: 'DISCORD_WEBHOOK', - ), - ]) { - discordSend( - link: env.BUILD_URL, - result: currentBuild.currentResult, - webhookURL: env.DISCORD_WEBHOOK, - title: opts.title, - description: """ - ${opts.header} - Branch: [`${repo.branch}`](${repo.url}/commits/${repo.branch}) - Commit: [`${repo.commit}`](${repo.url}/commit/${repo.commit}) - """, - ) - } -} - -return this diff --git a/compose.static.yml b/compose.static.yml index 1f88299f..fb17ec89 100644 --- a/compose.static.yml +++ b/compose.static.yml @@ -139,6 +139,15 @@ services: entrypoint: /usr/bin/mixnode command: /etc/nomos/mixnode_config.yaml + chatbot: + container_name: chatbot + build: + context: . + dockerfile: testnet/Dockerfile + volumes: + - ./testnet:/etc/nomos + entrypoint: /etc/nomos/scripts/run_nomos_bot.sh + etcd: container_name: etcd image: quay.io/coreos/etcd:v3.4.15 @@ -148,3 +157,29 @@ services: - /usr/local/bin/etcd - --advertise-client-urls=http://etcd:2379 - --listen-client-urls=http://0.0.0.0:2379 + + prometheus: + container_name: prometheus + image: prom/prometheus:latest + volumes: + - ./testnet/monitoring/prometheus-config.yml:/etc/prometheus/prometheus.yml:z + command: + - --config.file=/etc/prometheus/prometheus.yml + - --storage.tsdb.retention.time=7d + ports: + - 127.0.0.1:9090:9090 + restart: on-failure + + grafana: + container_name: grafana + image: grafana/grafana:latest + env_file: + - ./testnet/monitoring/grafana/plugins.env + volumes: + - ./testnet/monitoring/grafana/grafana.ini:/etc/grafana/grafana.ini:z + - ./testnet/monitoring/grafana/datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml:z + ports: + - 9091:3000 + restart: on-failure + depends_on: + - prometheus diff --git a/consensus/carnot-engine/src/overlay/tree_overlay/overlay.rs b/consensus/carnot-engine/src/overlay/tree_overlay/overlay.rs index 8155238a..0854ce37 100644 --- a/consensus/carnot-engine/src/overlay/tree_overlay/overlay.rs +++ b/consensus/carnot-engine/src/overlay/tree_overlay/overlay.rs @@ -146,10 +146,15 @@ where if self.is_member_of_leaf_committee(id) { return 0; } - self.carnot_tree + println!("{:?}", self.carnot_tree); + match self + .carnot_tree .committee_by_member_id(&id) .map(|c| apply_threshold(c.len(), self.threshold)) - .expect("node is not part of any committee") + { + Some(threshold) => threshold, + None => panic!("node {id} is not part of any committee"), + } } // TODO: Carnot node in sim does not send votes to the next leader from the child committee of diff --git a/nodes/explorer/Cargo.toml b/nodes/explorer/Cargo.toml index a1eefda2..806a2ccd 100644 --- a/nodes/explorer/Cargo.toml +++ b/nodes/explorer/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "explorer" +name = "nomos-explorer" version = "0.1.0" edition = "2021" diff --git a/nodes/explorer/src/api_backend/da.rs b/nodes/explorer/src/api_backend/da.rs new file mode 100644 index 00000000..b9f70ff0 --- /dev/null +++ b/nodes/explorer/src/api_backend/da.rs @@ -0,0 +1,16 @@ +// crates +use axum::extract::{Json, State}; +use axum::response::Response; +use nomos_node::make_request_and_return_response; +// internal +use full_replication::Blob; +use nomos_api::http::da::da_blobs; +use nomos_core::da::blob; +use overwatch_rs::overwatch::handle::OverwatchHandle; + +pub(crate) async fn blobs( + State(handle): State, + Json(items): Json::Hash>>, +) -> Response { + make_request_and_return_response!(da_blobs(&handle, items)) +} diff --git a/nodes/explorer/src/api_backend/mod.rs b/nodes/explorer/src/api_backend/mod.rs index 0a384e5d..d631d2d1 100644 --- a/nodes/explorer/src/api_backend/mod.rs +++ b/nodes/explorer/src/api_backend/mod.rs @@ -1,3 +1,4 @@ +mod da; mod store; use std::{fmt::Debug, hash::Hash}; @@ -11,11 +12,10 @@ use tower_http::{ trace::TraceLayer, }; +use nomos_api::Backend; use nomos_core::tx::Transaction; use nomos_storage::backends::StorageSerde; -use nomos_api::Backend; - #[derive(Debug, Clone, serde::Deserialize, serde::Serialize)] pub struct AxumBackendSettings { /// Socket where the server will be listening on for incoming requests. @@ -84,12 +84,12 @@ where ) .layer(TraceLayer::new_for_http()) // .merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi())) - .route("/store/blocks", routing::get(store::store_blocks::)) .route("/explorer/blocks", routing::get(store::blocks::)) .route( "/explorer/blocks/depth", routing::get(store::block_depth::), ) + .route("/da/blobs", routing::post(da::blobs)) .with_state(handle); Server::bind(&self.settings.address) diff --git a/nodes/explorer/src/api_backend/store.rs b/nodes/explorer/src/api_backend/store.rs index de2cbd1b..5f88d742 100644 --- a/nodes/explorer/src/api_backend/store.rs +++ b/nodes/explorer/src/api_backend/store.rs @@ -13,40 +13,9 @@ use full_replication::Certificate; use nomos_api::http::storage; use nomos_core::block::{Block, BlockId}; use nomos_core::tx::Transaction; -use nomos_node::make_request_and_return_response; use nomos_storage::backends::StorageSerde; use overwatch_rs::overwatch::handle::OverwatchHandle; -#[derive(Deserialize)] -pub(crate) struct QueryParams { - blocks: Vec, -} -pub(crate) async fn store_blocks( - State(store): State, - Query(query): Query, -) -> Response -where - Tx: Transaction - + Clone - + Debug - + Eq - + Hash - + Serialize - + DeserializeOwned - + Send - + Sync - + 'static, - ::Hash: std::cmp::Ord + Debug + Send + Sync + 'static, - S: StorageSerde + Send + Sync + 'static, -{ - let QueryParams { blocks } = query; - let results: Vec<_> = blocks - .into_iter() - .map(|id| storage::block_req::(&store, id)) - .collect(); - make_request_and_return_response!(futures::future::try_join_all(results)) -} - #[derive(Deserialize)] pub(crate) struct BlocksByIdQueryParams { from: BlockId, diff --git a/nodes/explorer/src/config.rs b/nodes/explorer/src/config.rs index b10cb7b1..00edd45b 100644 --- a/nodes/explorer/src/config.rs +++ b/nodes/explorer/src/config.rs @@ -5,6 +5,8 @@ use nomos_api::ApiService; use nomos_log::{Logger, LoggerBackend, LoggerFormat}; use nomos_node::config::LoggerBackendType; use nomos_node::{HttpArgs, LogArgs, Tx, Wire}; +use nomos_storage::backends::rocksdb::RocksBackend; +use nomos_storage::StorageService; use overwatch_rs::services::ServiceData; use serde::{Deserialize, Serialize}; use tracing::Level; @@ -15,6 +17,7 @@ pub type ApiArgs = HttpArgs; pub struct Config { pub log: ::Settings, pub api: > as ServiceData>::Settings, + pub storage: > as ServiceData>::Settings, } impl Config { diff --git a/nodes/explorer/src/lib.rs b/nodes/explorer/src/lib.rs new file mode 100644 index 00000000..b5bc162d --- /dev/null +++ b/nodes/explorer/src/lib.rs @@ -0,0 +1,39 @@ +pub use crate::api_backend::{AxumBackend, AxumBackendSettings}; +pub use crate::config::{ApiArgs, Config}; + +use eyre::{eyre, Result}; + +use overwatch_rs::overwatch::OverwatchRunner; +use overwatch_rs::services::handle::ServiceHandle; +use overwatch_rs::Services; + +use nomos_api::ApiService; +use nomos_log::Logger; +use nomos_node::{Tx, Wire}; +use nomos_storage::{backends::rocksdb::RocksBackend, StorageService}; + +mod api_backend; +mod config; + +#[derive(Services)] +pub struct Explorer { + pub log: ServiceHandle, + pub storage: ServiceHandle>>, + pub api: ServiceHandle>>, +} + +impl Explorer { + pub fn run(config: Config) -> Result<()> { + let app = OverwatchRunner::::run( + ExplorerServiceSettings { + log: config.log, + storage: config.storage, + api: config.api, + }, + None, + ) + .map_err(|e| eyre!("Error encountered: {}", e))?; + app.wait_finished(); + Ok(()) + } +} diff --git a/nodes/explorer/src/main.rs b/nodes/explorer/src/main.rs index 8dd3b66b..991f1491 100644 --- a/nodes/explorer/src/main.rs +++ b/nodes/explorer/src/main.rs @@ -1,28 +1,8 @@ -mod api_backend; -mod config; - use clap::Parser; -use eyre::{eyre, Result}; -use nomos_api::ApiService; -use overwatch_rs::overwatch::OverwatchRunner; -use overwatch_rs::services::handle::ServiceHandle; -use overwatch_rs::Services; +use eyre::Result; -use crate::api_backend::AxumBackend; -use crate::config::{ApiArgs, Config}; -use nomos_log::Logger; -use nomos_node::{LogArgs, Tx, Wire}; -use nomos_storage::backends::sled::SledBackendSettings; -use nomos_storage::{backends::sled::SledBackend, StorageService}; - -#[derive(Services)] -struct Explorer { - log: ServiceHandle, - storage: ServiceHandle>>, - api: ServiceHandle>>, -} - -const DEFAULT_DB_PATH: &str = "./db"; +use nomos_explorer::{ApiArgs, Config, Explorer}; +use nomos_node::LogArgs; #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] @@ -48,17 +28,5 @@ fn main() -> Result<()> { .update_api(api_args)? .update_log(log_args)?; - let app = OverwatchRunner::::run( - ExplorerServiceSettings { - log: config.log, - storage: SledBackendSettings { - db_path: DEFAULT_DB_PATH.into(), - }, - api: config.api, - }, - None, - ) - .map_err(|e| eyre!("Error encountered: {}", e))?; - app.wait_finished(); - Ok(()) + Explorer::run(config) } diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index 87ca213f..d2590f42 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -27,7 +27,7 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "li nomos-metrics = { path = "../../nomos-metrics" } nomos-http = { path = "../../nomos-services/http", features = ["http"] } carnot-consensus = { path = "../../nomos-services/carnot-consensus", features = ["libp2p"] } -nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] } +nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] } nomos-libp2p = { path = "../../nomos-libp2p" } nomos-da = { path = "../../nomos-services/data-availability", features = ["libp2p"] } nomos-system-sig = { path = "../../nomos-services/system-sig" } diff --git a/nodes/nomos-node/config.yaml b/nodes/nomos-node/config.yaml index 808dcf98..f63aca7c 100644 --- a/nodes/nomos-node/config.yaml +++ b/nodes/nomos-node/config.yaml @@ -6,9 +6,9 @@ consensus: private_key: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] fountain_settings: null overlay_settings: - nodes: [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]] + nodes: ["0000000000000000000000000000000000000000000000000000000000000000", "0000000000000000000000000000000000000000000000000000000000000001"] number_of_committees: 1 - current_leader: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + current_leader: 0000000000000000000000000000000000000000000000000000000000000000 leader: cur: 0 committee_membership: !Sad diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 2425403e..cd8c88f6 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -15,6 +15,10 @@ use nomos_libp2p::{secp256k1::SecretKey, Multiaddr}; use nomos_log::{Logger, LoggerBackend, LoggerFormat}; use nomos_network::backends::libp2p::Libp2p; use nomos_network::NetworkService; +use nomos_storage::{ + backends::rocksdb::{RocksBackend, RocksBackendSettings}, + StorageService, +}; use overwatch_rs::services::ServiceData; use serde::{Deserialize, Serialize}; use tracing::Level; @@ -126,6 +130,19 @@ pub struct Config { pub http: > as ServiceData>::Settings, pub consensus: ::Settings, pub da: ::Settings, + + #[serde(default = "default_storage_settings")] + pub storage: > as ServiceData>::Settings, +} + +const DEFAULT_DB_PATH: &str = "./db"; + +fn default_storage_settings() -> > as ServiceData>::Settings { + RocksBackendSettings { + db_path: DEFAULT_DB_PATH.into(), + read_only: false, + column_family: Some("blocks".into()), + } } impl Config { diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index d8727ddc..3b2dbaa6 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -7,8 +7,6 @@ use carnot_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay}; use color_eyre::eyre::Result; use full_replication::Certificate; use full_replication::{AbsoluteNumber, Attestation, Blob, FullReplication}; -#[cfg(feature = "metrics")] -use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService}; use api::AxumBackend; use bytes::Bytes; @@ -33,7 +31,7 @@ use nomos_mempool::{ use nomos_metrics::Metrics; use nomos_network::backends::libp2p::Libp2p; use nomos_storage::{ - backends::{sled::SledBackend, StorageSerde}, + backends::{rocksdb::RocksBackend, StorageSerde}, StorageService, }; @@ -68,7 +66,7 @@ pub type Carnot = CarnotConsensus< TreeOverlay, FillSizeWithTx, FillSizeWithBlobsCertificate, - SledBackend, + RocksBackend, >; pub type DataAvailability = DataAvailabilityService< @@ -94,7 +92,7 @@ pub struct Nomos { consensus: ServiceHandle, http: ServiceHandle>>, da: ServiceHandle, - storage: ServiceHandle>>, + storage: ServiceHandle>>, #[cfg(feature = "metrics")] metrics: ServiceHandle, system_sig: ServiceHandle, @@ -113,3 +111,66 @@ impl StorageSerde for Wire { wire::deserialize(&buff) } } + +pub fn run( + config: Config, + #[cfg(feature = "metrics")] metrics_args: MetricsArgs, +) -> color_eyre::Result<()> { + use color_eyre::eyre::eyre; + + use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings; + #[cfg(feature = "metrics")] + use nomos_metrics::MetricsSettings; + use overwatch_rs::overwatch::*; + + #[cfg(feature = "metrics")] + let registry = cfg!(feature = "metrics") + .then(|| { + metrics_args + .with_metrics + .then(nomos_metrics::NomosRegistry::default) + }) + .flatten(); + + #[cfg(not(feature = "metrics"))] + let registry = None; + + let app = OverwatchRunner::::run( + NomosServiceSettings { + network: config.network, + logging: config.log, + http: config.http, + cl_mempool: nomos_mempool::Settings { + backend: (), + network: AdapterSettings { + topic: String::from(crate::CL_TOPIC), + id: ::hash, + }, + registry: registry.clone(), + }, + da_mempool: nomos_mempool::Settings { + backend: (), + network: AdapterSettings { + topic: String::from(crate::DA_TOPIC), + id: cert_id, + }, + registry: registry.clone(), + }, + consensus: config.consensus, + #[cfg(feature = "metrics")] + metrics: MetricsSettings { registry }, + da: config.da, + storage: config.storage, + system_sig: (), + }, + None, + ) + .map_err(|e| eyre!("Error encountered: {}", e))?; + app.wait_finished(); + Ok(()) +} + +fn cert_id(cert: &Certificate) -> ::Hash { + use certificate::Certificate; + cert.hash() +} diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index 25bc5787..1d1a1a6c 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -1,24 +1,9 @@ -use full_replication::{Blob, Certificate}; -#[cfg(feature = "metrics")] -use nomos_metrics::MetricsSettings; -use nomos_node::{ - Config, ConsensusArgs, DaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs, Nomos, - NomosServiceSettings, OverlayArgs, Tx, -}; - use clap::Parser; -use color_eyre::eyre::{eyre, Result}; -use nomos_core::{ - da::{blob, certificate}, - tx::Transaction, +use color_eyre::eyre::Result; +use nomos_node::{ + Config, ConsensusArgs, DaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs, OverlayArgs, }; -use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings; - -use overwatch_rs::overwatch::*; - -const DEFAULT_DB_PATH: &str = "./db"; - #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] struct Args { @@ -58,6 +43,7 @@ fn main() -> Result<()> { overlay_args, metrics_args, } = Args::parse(); + let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)? .update_da(da_args)? .update_log(log_args)? @@ -66,52 +52,9 @@ fn main() -> Result<()> { .update_overlay(overlay_args)? .update_network(network_args)?; - let registry = cfg!(feature = "metrics") - .then(|| { - metrics_args - .with_metrics - .then(nomos_metrics::NomosRegistry::default) - }) - .flatten(); - - let app = OverwatchRunner::::run( - NomosServiceSettings { - network: config.network, - logging: config.log, - http: config.http, - cl_mempool: nomos_mempool::Settings { - backend: (), - network: AdapterSettings { - topic: String::from(nomos_node::CL_TOPIC), - id: ::hash, - }, - registry: registry.clone(), - }, - da_mempool: nomos_mempool::Settings { - backend: (), - network: AdapterSettings { - topic: String::from(nomos_node::DA_TOPIC), - id: cert_id, - }, - registry: registry.clone(), - }, - consensus: config.consensus, - #[cfg(feature = "metrics")] - metrics: MetricsSettings { registry }, - da: config.da, - storage: nomos_storage::backends::sled::SledBackendSettings { - db_path: std::path::PathBuf::from(DEFAULT_DB_PATH), - }, - system_sig: (), - }, - None, + nomos_node::run( + config, + #[cfg(feature = "metrics")] + metrics_args, ) - .map_err(|e| eyre!("Error encountered: {}", e))?; - app.wait_finished(); - Ok(()) -} - -fn cert_id(cert: &Certificate) -> ::Hash { - use certificate::Certificate; - cert.hash() } diff --git a/nomos-cli/Cargo.toml b/nomos-cli/Cargo.toml index f14f1c01..118768aa 100644 --- a/nomos-cli/Cargo.toml +++ b/nomos-cli/Cargo.toml @@ -26,7 +26,7 @@ nomos-libp2p = { path = "../nomos-libp2p"} nomos-core = { path = "../nomos-core" } nomos-node = { path = "../nodes/nomos-node" } full-replication = { path = "../nomos-da/full-replication" } -reqwest = "0.11" +reqwest = { version = "0.11", features = ["json"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" thiserror = "1.0" @@ -36,4 +36,22 @@ crossterm = "0.27" ratatui = "0.24" tui-input = "0.8" ansi-to-tui = "3" -rand = "0.8" \ No newline at end of file +rand = "0.8" + +[dev-dependencies] +tempfile = "3" +tokio = { version = "1", features = ["full"] } +nomos-api = { path = "../nomos-services/api" } +nomos-explorer = { path = "../nodes/explorer" } +nomos-log = { path = "../nomos-services/log" } +nomos-libp2p = { path = "../nomos-libp2p" } +nomos-storage = { path = "../nomos-services/storage" } +mixnode = { path = "../nodes/mixnode" } +mixnet-node = { path = "../mixnet/node", default-features = false } +mixnet-client = { path = "../mixnet/client" } +mixnet-topology = { path = "../mixnet/topology" } +scopeguard = "1" + +[[test]] +name = "integration" +path = "tests/main.rs" \ No newline at end of file diff --git a/nomos-cli/src/api/consensus.rs b/nomos-cli/src/api/consensus.rs index 86d31592..92b7b278 100644 --- a/nomos-cli/src/api/consensus.rs +++ b/nomos-cli/src/api/consensus.rs @@ -1,6 +1,5 @@ use super::CLIENT; use carnot_consensus::CarnotInfo; -use carnot_engine::{Block, BlockId}; use reqwest::Url; pub async fn carnot_info(node: &Url) -> Result { @@ -12,20 +11,3 @@ pub async fn carnot_info(node: &Url) -> Result { .json::() .await } - -pub async fn get_blocks_info( - node: &Url, - from: Option, - to: Option, -) -> Result, reqwest::Error> { - const NODE_CARNOT_INFO_PATH: &str = "carnot/blocks"; - let mut req = CLIENT.get(node.join(NODE_CARNOT_INFO_PATH).unwrap()); - if let Some(from) = from { - req = req.query(&[("from", from)]); - } - if let Some(to) = to { - req = req.query(&[("to", to)]); - } - - req.send().await?.json().await -} diff --git a/nomos-cli/src/api/storage.rs b/nomos-cli/src/api/storage.rs index cc439600..c8c1d256 100644 --- a/nomos-cli/src/api/storage.rs +++ b/nomos-cli/src/api/storage.rs @@ -6,13 +6,13 @@ use nomos_node::Tx; use reqwest::Url; pub async fn get_block_contents( - node: &Url, + explorer: &Url, block: &BlockId, ) -> Result>, reqwest::Error> { - const BLOCK_PATH: &str = "storage/block"; + const BLOCK_PATH: &str = "explorer/blocks/depth"; CLIENT - .post(node.join(BLOCK_PATH).unwrap()) - .json(block) + .get(explorer.join(BLOCK_PATH).unwrap()) + .query(&[("from", block)]) .send() .await? .json() diff --git a/nomos-cli/src/cmds/chat/mod.rs b/nomos-cli/src/cmds/chat/mod.rs index c89619e4..3e84e1e1 100644 --- a/nomos-cli/src/cmds/chat/mod.rs +++ b/nomos-cli/src/cmds/chat/mod.rs @@ -5,7 +5,7 @@ mod ui; use crate::{ - api::consensus::get_blocks_info, + api::{consensus::carnot_info, storage::get_block_contents}, da::{ disseminate::{ DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status, @@ -18,7 +18,7 @@ use full_replication::{ AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings, }; use futures::{stream, StreamExt}; -use nomos_core::{block::BlockId, da::DaProtocol, wire}; +use nomos_core::{da::DaProtocol, wire}; use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter}; use nomos_network::{backends::libp2p::Libp2p, NetworkService}; use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData}; @@ -48,6 +48,8 @@ use ratatui::{ use tui_input::{backend::crossterm::EventHandler, Input}; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120); +// Limit the number of maximum in-flight requests +const MAX_BUFFERED_REQUESTS: usize = 20; #[derive(Clone, Debug, Args)] /// The almost-instant messaging protocol. @@ -58,9 +60,18 @@ pub struct NomosChat { /// The data availability protocol to use. Defaults to full replication. #[clap(flatten)] pub da_protocol: DaProtocolChoice, - /// The node to connect to to fetch blocks and blobs + /// The node to connect to to fetch carnot info #[clap(long)] pub node: Url, + /// The explorer to connect to to fetch blocks and blobs + #[clap(long)] + pub explorer: Url, + /// Author for non interactive message formation + #[clap(long, requires("message"))] + pub author: Option, + /// Message for non interactive message formation + #[clap(long, requires("author"))] + pub message: Option, } pub struct App { @@ -73,10 +84,27 @@ pub struct App { payload_sender: UnboundedSender>, status_updates: Receiver, node: Url, + explorer: Url, logs: Arc>>, scroll_logs: u16, } +impl App { + pub fn send_message(&self, msg: String) { + self.payload_sender + .send( + wire::serialize(&ChatMessage { + author: self.username.clone().unwrap(), + message: msg, + _nonce: rand::random(), + }) + .unwrap() + .into(), + ) + .unwrap(); + } +} + impl NomosChat { pub fn run(&self) -> Result<(), Box> { let network = serde_yaml::from_reader::< @@ -84,6 +112,47 @@ impl NomosChat { as ServiceData>::Settings, >(std::fs::File::open(&self.network_config)?)?; let da_protocol = self.da_protocol.clone(); + + let node_addr = Some(self.node.clone()); + + let (payload_sender, payload_receiver) = tokio::sync::mpsc::unbounded_channel(); + let (status_sender, status_updates) = std::sync::mpsc::channel(); + + let shared_writer = Arc::new(sync::Mutex::new(Vec::new())); + let backend = SharedWriter::from_inner(shared_writer.clone()); + + std::thread::spawn(move || { + OverwatchRunner::::run( + DisseminateAppServiceSettings { + network, + send_blob: Settings { + payload: Arc::new(Mutex::new(payload_receiver)), + timeout: DEFAULT_TIMEOUT, + da_protocol, + status_updates: status_sender, + node_addr, + output: None, + }, + logger: LoggerSettings { + backend: LoggerBackend::Writer(backend), + level: tracing::Level::INFO, + ..Default::default() + }, + }, + None, + ) + .unwrap() + .wait_finished() + }); + + if let Some(author) = self.author.as_ref() { + let message = self + .message + .as_ref() + .expect("Should be available if author is set"); + return run_once(author, message, payload_sender); + } + // setup terminal enable_raw_mode()?; let mut stdout = io::stdout(); @@ -91,6 +160,46 @@ impl NomosChat { let backend = CrosstermBackend::new(stdout); let mut terminal = Terminal::new(backend)?; + let app = App { + input: Input::default(), + username: None, + messages: Vec::new(), + message_status: None, + message_in_flight: false, + last_updated: Instant::now(), + payload_sender, + status_updates, + node: self.node.clone(), + explorer: self.explorer.clone(), + logs: shared_writer, + scroll_logs: 0, + }; + + run_app(&mut terminal, app); + + // restore terminal + disable_raw_mode()?; + execute!( + terminal.backend_mut(), + LeaveAlternateScreen, + DisableMouseCapture + )?; + terminal.show_cursor()?; + + Ok(()) + } + + pub fn run_app_without_terminal( + &self, + username: String, + ) -> Result<(std::sync::mpsc::Receiver>, App), Box> + { + let network = serde_yaml::from_reader::< + _, + as ServiceData>::Settings, + >(std::fs::File::open(&self.network_config)?)?; + let da_protocol = self.da_protocol.clone(); + let node_addr = Some(self.node.clone()); let (payload_sender, payload_receiver) = tokio::sync::mpsc::unbounded_channel(); @@ -125,7 +234,7 @@ impl NomosChat { let app = App { input: Input::default(), - username: None, + username: Some(username), messages: Vec::new(), message_status: None, message_in_flight: false, @@ -133,29 +242,43 @@ impl NomosChat { payload_sender, status_updates, node: self.node.clone(), + explorer: self.explorer.clone(), logs: shared_writer, scroll_logs: 0, }; - run_app(&mut terminal, app); + let (message_tx, message_rx) = std::sync::mpsc::channel(); + let node = app.node.clone(); + let explorer = app.explorer.clone(); + std::thread::spawn(move || check_for_messages(message_tx, node, explorer)); - // restore terminal - disable_raw_mode()?; - execute!( - terminal.backend_mut(), - LeaveAlternateScreen, - DisableMouseCapture - )?; - terminal.show_cursor()?; - - Ok(()) + Ok((message_rx, app)) } } +fn run_once( + author: &str, + message: &str, + payload_sender: UnboundedSender>, +) -> Result<(), Box> { + payload_sender.send( + wire::serialize(&ChatMessage { + author: author.to_string(), + message: message.to_string(), + _nonce: rand::random(), + }) + .unwrap() + .into(), + )?; + + Ok(()) +} + fn run_app(terminal: &mut Terminal, mut app: App) { let (message_tx, message_rx) = std::sync::mpsc::channel(); let node = app.node.clone(); - std::thread::spawn(move || check_for_messages(message_tx, node)); + let explorer = app.explorer.clone(); + std::thread::spawn(move || check_for_messages(message_tx, node, explorer)); loop { terminal.draw(|f| ui::ui(f, &app)).unwrap(); @@ -220,7 +343,7 @@ fn run_app(terminal: &mut Terminal, mut app: App) { } #[derive(Serialize, Deserialize, Debug)] -struct ChatMessage { +pub struct ChatMessage { author: String, message: String, // Since DA will rightfully ignore duplicated messages, we need to add a nonce to make sure @@ -228,15 +351,21 @@ struct ChatMessage { _nonce: u64, } -#[tokio::main] -async fn check_for_messages(sender: Sender>, node: Url) { - // Should ask for the genesis block to be more robust - let mut last_tip = BlockId::zeros(); +impl ChatMessage { + pub fn author(&self) -> &str { + &self.author + } + pub fn message(&self) -> &str { + &self.message + } +} + +#[tokio::main] +async fn check_for_messages(sender: Sender>, node: Url, explorer: Url) { loop { - if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await { + if let Ok(messages) = fetch_new_messages(&explorer, &node).await { sender.send(messages).expect("channel closed"); - last_tip = new_tip; } tokio::time::sleep(Duration::from_millis(100)).await; } @@ -245,10 +374,10 @@ async fn check_for_messages(sender: Sender>, node: Url) { // Process a single block's blobs and return chat messages async fn process_block_blobs( node: Url, - block_id: &BlockId, + block: &nomos_core::block::Block, da_settings: DaSettings, ) -> Result, Box> { - let blobs = get_block_blobs(&node, block_id).await?; + let blobs = get_block_blobs(&node, block).await?; // Note that number of attestations is ignored here since we only use the da protocol to // decode the blob data, not to validate the certificate @@ -269,29 +398,25 @@ async fn process_block_blobs( // Fetch new messages since the last tip async fn fetch_new_messages( - last_tip: &BlockId, + explorer: &Url, node: &Url, -) -> Result<(BlockId, Vec), Box> { +) -> Result, Box> { + let info = carnot_info(node).await?; // By only specifying the 'to' parameter we get all the blocks since the last tip - let mut new_blocks = get_blocks_info(node, None, Some(*last_tip)) + let mut blocks = get_block_contents(explorer, &info.tip.id) .await? .into_iter() - .map(|block| block.id) .collect::>(); - // The first block is the most recent one. - // Note that the 'to' is inclusive so the above request will always return at least one block - // as long as the block exists (which is the case since it was returned by a previous call) - let new_tip = new_blocks[0]; // We already processed the last block so let's remove it - new_blocks.pop(); + blocks.pop(); let da_settings = DaSettings { num_attestations: 1, voter: [0; 32], }; - let block_stream = stream::iter(new_blocks.iter().rev()); + let block_stream = stream::iter(blocks.iter()); let results: Vec<_> = block_stream .map(|block| { let node = node.clone(); @@ -299,7 +424,7 @@ async fn fetch_new_messages( process_block_blobs(node, block, da_settings) }) - .buffer_unordered(new_blocks.len()) + .buffered(MAX_BUFFERED_REQUESTS) .collect::>() .await; @@ -308,5 +433,5 @@ async fn fetch_new_messages( new_messages.extend(result?); } - Ok((new_tip, new_messages)) + Ok(new_messages) } diff --git a/nomos-cli/src/cmds/disseminate/mod.rs b/nomos-cli/src/cmds/disseminate/mod.rs index 27f511fe..fe8ebc6d 100644 --- a/nomos-cli/src/cmds/disseminate/mod.rs +++ b/nomos-cli/src/cmds/disseminate/mod.rs @@ -9,11 +9,11 @@ use reqwest::Url; use std::{path::PathBuf, sync::Arc, time::Duration}; use tokio::sync::Mutex; -#[derive(Args, Debug)] +#[derive(Args, Debug, Default)] pub struct Disseminate { // TODO: accept bytes - #[clap(short, long)] - pub data: String, + #[clap(short, long, required_unless_present("file"))] + pub data: Option, /// Path to the network config file #[clap(short, long)] pub network_config: PathBuf, @@ -30,6 +30,9 @@ pub struct Disseminate { /// File to write the certificate to, if present. #[clap(long)] pub output: Option, + /// File to disseminate + #[clap(short, long)] + pub file: Option, } impl Disseminate { @@ -41,7 +44,15 @@ impl Disseminate { as ServiceData>::Settings, >(std::fs::File::open(&self.network_config)?)?; let (status_updates, rx) = std::sync::mpsc::channel(); - let bytes: Box<[u8]> = self.data.clone().as_bytes().into(); + + let bytes: Box<[u8]> = if let Some(data) = &self.data { + data.clone().as_bytes().into() + } else { + let file_path = self.file.as_ref().unwrap(); + let file_bytes = std::fs::read(file_path)?; + file_bytes.into_boxed_slice() + }; + let timeout = Duration::from_secs(self.timeout); let da_protocol = self.da_protocol.clone(); let node_addr = self.node_addr.clone(); diff --git a/nomos-cli/src/da/disseminate.rs b/nomos-cli/src/da/disseminate.rs index 5386496f..b28c704f 100644 --- a/nomos-cli/src/da/disseminate.rs +++ b/nomos-cli/src/da/disseminate.rs @@ -205,7 +205,7 @@ impl ServiceCore for DisseminateService { // protocols, but only the one chosen will be used. // We can enforce only sensible combinations of protocol/settings // are specified by using special clap directives -#[derive(Clone, Debug, Args)] +#[derive(Clone, Debug, Args, Default)] pub struct DaProtocolChoice { #[clap(long, default_value = "full-replication")] pub da_protocol: Protocol, @@ -227,14 +227,15 @@ impl TryFrom for FullReplication Result, Error> { - let block = get_block_contents(node, block) - .await? - .ok_or(Error::NotFound)?; - +pub async fn get_block_blobs( + node: &Url, + block: &Block, +) -> Result, Error> { Ok(get_blobs(node, block.blobs().map(|cert| cert.blob()).collect()).await?) } diff --git a/nomos-cli/test.config.chat.user1.yaml b/nomos-cli/test.config.chat.user1.yaml new file mode 100644 index 00000000..f62a84e6 --- /dev/null +++ b/nomos-cli/test.config.chat.user1.yaml @@ -0,0 +1,45 @@ +backend: + host: 127.0.0.1 + port: 3019 + log_level: "fatal" + # Node key needs to be unique for every client. + node_key: "0000000000000000000000000000000000000000000000000000000000001444" + discV5BootstrapNodes: [] + initial_peers: ["/dns/127.0.0.1/tcp/3000"] + relayTopics: [] + # Mixclient configuration to communicate with mixnodes. + # The libp2p network backend always requires this mixclient configuration + # (cannot be disabled for now). + mixnet_client: + # A mixclient mode. For details, see the documentation of the "mixnet" crate. + # - Sender + # - !SenderReceiver [mixnode_client_listen_address] + mode: Sender + # A mixnet topology, which contains the information of all mixnodes in the mixnet. + # (The topology is static for now.) + topology: + # Each mixnet layer consists of a list of mixnodes. + layers: + - nodes: + - address: 127.0.0.1:7707 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + - nodes: + - address: 127.0.0.1:7717 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + + - nodes: + - address: 127.0.0.1:7727 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + + # A max number of connections that will stay connected to mixnodes in the first mixnet layer. + connection_pool_size: 255 + max_retries: 5 + retry_delay: + secs: 1 + nanos: 0 + # A range of total delay that will be set to each Sphinx packets + # sent to the mixnet for timing obfuscation. + # Panics if start > end. + mixnet_delay: + start: "0ms" + end: "0ms" diff --git a/nomos-cli/test.config.chat.user2.yaml b/nomos-cli/test.config.chat.user2.yaml new file mode 100644 index 00000000..36404028 --- /dev/null +++ b/nomos-cli/test.config.chat.user2.yaml @@ -0,0 +1,45 @@ +backend: + host: 127.0.0.1 + port: 3020 + log_level: "fatal" + # Node key needs to be unique for every client. + node_key: "0000000000000000000000000000000000000000000000000000000000001444" + discV5BootstrapNodes: [] + initial_peers: ["/dns/127.0.0.1/tcp/3000"] + relayTopics: [] + # Mixclient configuration to communicate with mixnodes. + # The libp2p network backend always requires this mixclient configuration + # (cannot be disabled for now). + mixnet_client: + # A mixclient mode. For details, see the documentation of the "mixnet" crate. + # - Sender + # - !SenderReceiver [mixnode_client_listen_address] + mode: Sender + # A mixnet topology, which contains the information of all mixnodes in the mixnet. + # (The topology is static for now.) + topology: + # Each mixnet layer consists of a list of mixnodes. + layers: + - nodes: + - address: 127.0.0.1:7707 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + - nodes: + - address: 127.0.0.1:7717 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + + - nodes: + - address: 127.0.0.1:7727 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + + # A max number of connections that will stay connected to mixnodes in the first mixnet layer. + connection_pool_size: 255 + max_retries: 5 + retry_delay: + secs: 1 + nanos: 0 + # A range of total delay that will be set to each Sphinx packets + # sent to the mixnet for timing obfuscation. + # Panics if start > end. + mixnet_delay: + start: "0ms" + end: "0ms" diff --git a/nomos-cli/tests/main.rs b/nomos-cli/tests/main.rs new file mode 100644 index 00000000..7349fe20 --- /dev/null +++ b/nomos-cli/tests/main.rs @@ -0,0 +1,329 @@ +// use std::{env::current_dir, net::SocketAddr, path::PathBuf, thread::Builder, time::Duration}; + +// use carnot_consensus::CarnotSettings; +// use carnot_engine::{ +// overlay::{RandomBeaconState, RoundRobin, TreeOverlay, TreeOverlaySettings}, +// NodeId, Overlay, +// }; +// use fraction::{Fraction, One}; +// use mixnet_client::MixnetClientConfig; +// use mixnet_node::{MixnetNodeConfig, PRIVATE_KEY_SIZE}; +// use mixnet_topology::{Layer, MixnetTopology, Node}; +// use mixnode::{MixNode, MixNodeServiceSettings}; +// use nomos_cli::{ +// cmds::chat::{App, ChatMessage}, +// da::disseminate::{DaProtocolChoice, FullReplicationSettings, ProtocolSettings}, +// }; +// use nomos_network::{ +// backends::libp2p::{Libp2p, Libp2pConfig}, +// NetworkConfig, +// }; +// use nomos_node::{api::AxumBackendSettings, run, Config}; +// use nomos_storage::backends::rocksdb::RocksBackendSettings; +// use overwatch_rs::{overwatch::OverwatchRunner, DynError}; +// use rand::{thread_rng, RngCore}; +// use reqwest::Url; +// use tracing::Level; + +// fn run_mixnode( +// name: &'static str, +// listen_addr: SocketAddr, +// client_listen_addr: SocketAddr, +// ) -> MixnetNodeConfig { +// let mut private_key = [0u8; PRIVATE_KEY_SIZE]; +// thread_rng().fill_bytes(&mut private_key); +// let config = MixnetNodeConfig { +// listen_address: listen_addr, +// client_listen_address: client_listen_addr, +// private_key, +// connection_pool_size: 255, +// ..Default::default() +// }; +// let config1 = config.clone(); +// Builder::new() +// .name(name.to_string()) +// .spawn(move || { +// let mut cfg = MixNodeServiceSettings { +// node: config1, +// logging: Default::default(), +// }; +// cfg.logging.level = Level::WARN; + +// let app = OverwatchRunner::::run(cfg, None)?; +// app.wait_finished(); +// std::result::Result::<_, DynError>::Ok(()) +// }) +// .unwrap(); +// config +// } + +// fn build_topology(configs: Vec) -> MixnetTopology { +// // Build three empty layers first +// let mut layers = vec![Layer { nodes: Vec::new() }; 3]; +// let mut layer_id = 0; + +// // Assign nodes to each layer in round-robin +// for config in &configs { +// let public_key = config.public_key(); +// layers.get_mut(layer_id).unwrap().nodes.push(Node { +// address: config.listen_address, +// public_key, +// }); +// layer_id = (layer_id + 1) % layers.len(); +// } + +// // Exclude empty layers +// MixnetTopology { +// layers: layers +// .iter() +// .filter(|layer| !layer.nodes.is_empty()) +// .cloned() +// .collect(), +// } +// } + +// fn run_nomos_node(name: &'static str, mut config: Config) { +// Builder::new() +// .name(name.into()) +// .spawn(move || { +// config.log.level = Level::WARN; +// run(config) +// }) +// .unwrap(); +// } + +// fn run_explorer(explorer_db_dir: PathBuf) { +// Builder::new() +// .name("explorer".into()) +// .spawn(|| { +// let mut cfg = nomos_explorer::Config { +// log: Default::default(), +// api: nomos_api::ApiServiceSettings { +// backend_settings: nomos_explorer::AxumBackendSettings { +// address: "127.0.0.1:5000".parse().unwrap(), +// cors_origins: Vec::new(), +// }, +// }, +// storage: nomos_storage::backends::rocksdb::RocksBackendSettings { +// db_path: explorer_db_dir, +// read_only: true, +// column_family: Some("blocks".into()), +// }, +// }; +// cfg.log.level = Level::WARN; +// nomos_explorer::Explorer::run(cfg).unwrap() +// }) +// .unwrap(); +// } + +// fn run_chat( +// username: String, +// config: &'static str, +// node_url: Url, +// explorer_url: Url, +// network: NetworkConfig, +// ) -> (std::sync::mpsc::Receiver>, App) { +// let c = nomos_cli::cmds::chat::NomosChat { +// network_config: current_dir().unwrap().join(config), +// da_protocol: DaProtocolChoice { +// da_protocol: nomos_cli::da::disseminate::Protocol::FullReplication, +// settings: ProtocolSettings { +// full_replication: FullReplicationSettings { +// voter: [0; 32], +// num_attestations: 1, +// }, +// }, +// }, +// node: node_url, +// explorer: explorer_url, +// message: None, +// author: None, +// }; +// c.run_app(username).unwrap() +// } + +// fn create_node_config( +// db_dir: PathBuf, +// api_backend_addr: SocketAddr, +// network_backend_port: u16, +// nodes: Vec, +// id: [u8; 32], +// threshold: Fraction, +// timeout: Duration, +// mixnet_node_config: Option, +// mixnet_topology: MixnetTopology, +// ) -> Config { +// let mixnet_client_mode = match mixnet_node_config { +// Some(node_config) => mixnet_client::MixnetClientMode::SenderReceiver( +// node_config.client_listen_address.to_string(), +// ), +// None => mixnet_client::MixnetClientMode::Sender, +// }; + +// let mut config = Config { +// network: NetworkConfig { +// backend: Libp2pConfig { +// inner: Default::default(), +// initial_peers: vec![], +// mixnet_client: MixnetClientConfig { +// mode: mixnet_client_mode, +// topology: mixnet_topology, +// connection_pool_size: 255, +// max_retries: 3, +// retry_delay: Duration::from_secs(5), +// }, +// mixnet_delay: Duration::ZERO..Duration::from_millis(10), +// }, +// }, +// consensus: CarnotSettings { +// private_key: id, +// overlay_settings: TreeOverlaySettings { +// nodes, +// leader: RoundRobin::new(), +// current_leader: [0; 32].into(), +// number_of_committees: 1, +// committee_membership: RandomBeaconState::initial_sad_from_entropy([0; 32]), +// // By setting the threshold to 1 we ensure that all nodes come +// // online before progressing. This is only necessary until we add a way +// // to recover poast blocks from other nodes. +// super_majority_threshold: Some(threshold), +// }, +// timeout, +// transaction_selector_settings: (), +// blob_selector_settings: (), +// }, +// log: Default::default(), +// http: nomos_api::ApiServiceSettings { +// backend_settings: AxumBackendSettings { +// address: api_backend_addr, +// cors_origins: vec![], +// }, +// }, +// da: nomos_da::Settings { +// da_protocol: full_replication::Settings { +// voter: id, +// num_attestations: 1, +// }, +// backend: nomos_da::backend::memory_cache::BlobCacheSettings { +// max_capacity: usize::MAX, +// evicting_period: Duration::from_secs(60 * 60 * 24), // 1 day +// }, +// }, +// storage: RocksBackendSettings { +// db_path: db_dir, +// read_only: false, +// column_family: Some("blocks".into()), +// }, +// }; + +// config.network.backend.inner.port = network_backend_port; + +// config +// } + +// #[test] +// fn integration_test() { +// let temp_dir = tempfile::tempdir().unwrap(); +// let dir = temp_dir.path().to_path_buf().join("integration_test"); +// let peer_dir = dir.join("peer"); +// // create a directory for the db +// std::fs::create_dir_all(&peer_dir).unwrap(); + +// let mut mixnet_configs = Vec::with_capacity(3); +// // Run 3 mixnodes +// mixnet_configs.push(run_mixnode( +// "mixnode1", +// "127.0.0.1:7707".parse().unwrap(), +// "127.0.0.1:7708".parse().unwrap(), +// )); +// mixnet_configs.push(run_mixnode( +// "mixnode2", +// "127.0.0.1:7717".parse().unwrap(), +// "127.0.0.1:7718".parse().unwrap(), +// )); +// mixnet_configs.push(run_mixnode( +// "mixnode3", +// "127.0.0.1:7727".parse().unwrap(), +// "127.0.0.1:7728".parse().unwrap(), +// )); +// let mixnet_topology = build_topology(mixnet_configs.clone()); + +// // Run bootstrap nomos node +// let ids = [[0; 32], [1; 32]]; +// let config1 = create_node_config( +// dir.clone(), +// "127.0.0.1:11000".parse().unwrap(), +// 3000, +// ids.iter().copied().map(NodeId::new).collect(), +// ids[0], +// Fraction::one(), +// Duration::from_secs(5), +// mixnet_configs.pop(), +// mixnet_topology.clone(), +// ); +// let config2 = create_node_config( +// peer_dir.clone(), +// "127.0.0.1:11001".parse().unwrap(), +// 3001, +// ids.iter().copied().map(NodeId::new).collect(), +// ids[1], +// Fraction::one(), +// Duration::from_secs(5), +// mixnet_configs.pop(), +// mixnet_topology.clone(), +// ); +// let mut configs = vec![config1, config2]; +// let overlay = TreeOverlay::new(configs[0].consensus.overlay_settings.clone()); +// let next_leader = overlay.next_leader(); +// let next_leader_idx = ids +// .iter() +// .position(|&id| NodeId::from(id) == next_leader) +// .unwrap(); +// let next_leader_config = configs.swap_remove(next_leader_idx); +// let libp2p_config = next_leader_config.network.clone(); +// let explorer_db_dir = next_leader_config.storage.db_path.clone(); +// let node_api_addr = next_leader_config.http.backend_settings.address; +// let prev_node_addr = nomos_libp2p::multiaddr!( +// Ip4([127, 0, 0, 1]), +// Tcp(next_leader_config.network.backend.inner.port) +// ); +// run_nomos_node("bootstrap", next_leader_config); + +// configs[0] +// .network +// .backend +// .initial_peers +// .push(prev_node_addr); +// run_nomos_node("libp2p", configs.pop().unwrap()); + +// // wait for the bootstrap node to start +// std::thread::sleep(std::time::Duration::from_secs(1)); + +// run_explorer(explorer_db_dir); + +// let (rx1, app1) = run_chat( +// "user1".into(), +// "test.config.chat.user1.yaml", +// format!("http://{}", node_api_addr).parse().unwrap(), +// "http://127.0.0.1:5000".parse().unwrap(), +// libp2p_config.clone(), +// ); +// let (rx2, app2) = run_chat( +// "user2".into(), +// "test.config.chat.user2.yaml", +// format!("http://{}", node_api_addr).parse().unwrap(), +// "http://127.0.0.1:5000".parse().unwrap(), +// libp2p_config, +// ); + +// app1.send_message("Hello from user1".into()); +// tracing::info!("user1: sent message: Hello from user1"); + +// app2.send_message("Hello from user2".into()); +// tracing::info!("user2: sent message: Hello from user2"); +// let msgs1 = rx1.recv().unwrap(); +// let msgs2 = rx2.recv().unwrap(); + +// assert_eq!(msgs1.len(), 1); +// assert_eq!(msgs2.len(), 1); +// } diff --git a/nomos-services/api/Cargo.toml b/nomos-services/api/Cargo.toml index 0e7aa4e1..bd3407a9 100644 --- a/nomos-services/api/Cargo.toml +++ b/nomos-services/api/Cargo.toml @@ -20,7 +20,7 @@ nomos-network = { path = "../../nomos-services/network" } nomos-da = { path = "../../nomos-services/data-availability" } nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "openapi"] } nomos-metrics = { path = "../../nomos-metrics" } -nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] } +nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] } nomos-libp2p = { path = "../../nomos-libp2p" } full-replication = { path = "../../nomos-da/full-replication" } serde = { version = "1", features = ["derive"] } diff --git a/nomos-services/api/src/http/consensus.rs b/nomos-services/api/src/http/consensus.rs index a858ebff..218f59f2 100644 --- a/nomos-services/api/src/http/consensus.rs +++ b/nomos-services/api/src/http/consensus.rs @@ -23,7 +23,7 @@ use nomos_core::{ use nomos_mempool::{ backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter, }; -use nomos_storage::backends::{sled::SledBackend, StorageSerde}; +use nomos_storage::backends::{rocksdb::RocksBackend, StorageSerde}; pub type Carnot = CarnotConsensus< ConsensusLibp2pAdapter, @@ -37,7 +37,7 @@ pub type Carnot = CarnotConsensus< TreeOverlay, FillSizeWithTx, FillSizeWithBlobsCertificate, - SledBackend, + RocksBackend, >; pub async fn carnot_info( diff --git a/nomos-services/api/src/http/storage.rs b/nomos-services/api/src/http/storage.rs index 090c09cd..e7579d62 100644 --- a/nomos-services/api/src/http/storage.rs +++ b/nomos-services/api/src/http/storage.rs @@ -1,7 +1,7 @@ use carnot_engine::BlockId; use nomos_core::block::Block; use nomos_storage::{ - backends::{sled::SledBackend, StorageSerde}, + backends::{rocksdb::RocksBackend, StorageSerde}, StorageMsg, StorageService, }; @@ -14,7 +14,7 @@ where S: StorageSerde + Send + Sync + 'static, { let relay = handle - .relay::>>() + .relay::>>() .connect() .await?; let (msg, receiver) = StorageMsg::new_load_message(id); diff --git a/nomos-services/carnot-consensus/src/network/adapters/mock.rs b/nomos-services/carnot-consensus/src/network/adapters/mock.rs index db43164a..67295e06 100644 --- a/nomos-services/carnot-consensus/src/network/adapters/mock.rs +++ b/nomos-services/carnot-consensus/src/network/adapters/mock.rs @@ -13,7 +13,7 @@ use crate::network::{ messages::{ProposalMsg, VoteMsg}, BoxedStream, NetworkAdapter, }; -use consensus_engine::{BlockId, Committee, View}; +use carnot_engine::{BlockId, Committee, View}; const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic"; const MOCK_BLOCK_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("MockSim", 1, "MockBlock"); diff --git a/nomos-services/log/src/lib.rs b/nomos-services/log/src/lib.rs index afa726ce..09c3043d 100644 --- a/nomos-services/log/src/lib.rs +++ b/nomos-services/log/src/lib.rs @@ -144,6 +144,10 @@ macro_rules! registry_init { #[async_trait::async_trait] impl ServiceCore for Logger { fn init(service_state: ServiceStateHandle) -> Result { + use std::sync::Once; + + static ONCE_INIT: Once = Once::new(); + let config = service_state.settings_reader.get_updated_settings(); let (non_blocking, _guard) = match config.backend { LoggerBackend::Gelf { addr } => { @@ -152,7 +156,9 @@ impl ServiceCore for Logger { .overwatch_handle .runtime() .spawn(async move { task.connect().await }); - registry_init!(layer, config.format, config.level); + ONCE_INIT.call_once(move || { + registry_init!(layer, config.format, config.level); + }); return Ok(Self { service_state, worker_guard: None, @@ -179,7 +185,9 @@ impl ServiceCore for Logger { let layer = tracing_subscriber::fmt::Layer::new() .with_level(true) .with_writer(non_blocking); - registry_init!(layer, config.format, config.level); + ONCE_INIT.call_once(move || { + registry_init!(layer, config.format, config.level); + }); Ok(Self { service_state, worker_guard: Some(_guard), diff --git a/nomos-services/storage/Cargo.toml b/nomos-services/storage/Cargo.toml index b2a92743..a7d0468b 100644 --- a/nomos-services/storage/Cargo.toml +++ b/nomos-services/storage/Cargo.toml @@ -13,6 +13,7 @@ bytes = "1.2" overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } serde = "1.0" sled = { version = "0.34", optional = true } +rocksdb = { version = "0.22", optional = true } thiserror = "1.0" tracing = "0.1" @@ -24,3 +25,9 @@ tempfile = "3" default = [] mock = [] sled-backend = ["sled"] +rocksdb-backend = ["rocksdb"] + +[[bin]] +name = "rocks" +path = "src/bin/rocks.rs" +required-features = ["rocksdb-backend"] diff --git a/nomos-services/storage/src/backends/mod.rs b/nomos-services/storage/src/backends/mod.rs index ac640857..2594b9ca 100644 --- a/nomos-services/storage/src/backends/mod.rs +++ b/nomos-services/storage/src/backends/mod.rs @@ -3,6 +3,9 @@ pub mod mock; #[cfg(feature = "sled")] pub mod sled; +#[cfg(feature = "rocksdb")] +pub mod rocksdb; + // std use std::error::Error; // crates diff --git a/nomos-services/storage/src/backends/rocksdb.rs b/nomos-services/storage/src/backends/rocksdb.rs new file mode 100644 index 00000000..5a37cb78 --- /dev/null +++ b/nomos-services/storage/src/backends/rocksdb.rs @@ -0,0 +1,250 @@ +// std +use std::path::PathBuf; +use std::{marker::PhantomData, sync::Arc}; +// crates +use async_trait::async_trait; +use bytes::Bytes; +pub use rocksdb::Error; +use rocksdb::{Options, DB}; +// internal +use super::{StorageBackend, StorageSerde, StorageTransaction}; + +/// Rocks backend setting +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] +pub struct RocksBackendSettings { + /// File path to the db file + pub db_path: PathBuf, + pub read_only: bool, + pub column_family: Option, +} + +/// Rocks transaction type +// Do not use `TransactionDB` here, because rocksdb's `TransactionDB` does not support open by read-only mode. +// Thus, we cannot open the same db in two or more processes. +pub struct Transaction { + rocks: Arc, + #[allow(clippy::type_complexity)] + executor: Box Result, Error> + Send + Sync>, +} + +impl Transaction { + /// Execute a function over the transaction + pub fn execute(self) -> Result, Error> { + (self.executor)(&self.rocks) + } +} + +impl StorageTransaction for Transaction { + type Result = Result, Error>; + type Transaction = Self; +} + +/// Rocks storage backend + +pub struct RocksBackend { + rocks: Arc, + _serde_op: PhantomData, +} + +impl RocksBackend { + pub fn txn( + &self, + executor: impl FnOnce(&DB) -> Result, Error> + Send + Sync + 'static, + ) -> Transaction { + Transaction { + rocks: self.rocks.clone(), + executor: Box::new(executor), + } + } +} + +impl core::fmt::Debug for RocksBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + format!("RocksBackend {{ rocks: {:?} }}", self.rocks).fmt(f) + } +} + +#[async_trait] +impl StorageBackend for RocksBackend { + type Settings = RocksBackendSettings; + type Error = rocksdb::Error; + type Transaction = Transaction; + type SerdeOperator = SerdeOp; + + fn new(config: Self::Settings) -> Result { + let RocksBackendSettings { + db_path, + read_only, + column_family: cf, + } = config; + + let db = match (read_only, cf) { + (true, None) => { + let mut opts = Options::default(); + opts.create_if_missing(false); + DB::open_for_read_only(&opts, db_path, false)? + } + (true, Some(cf)) => { + let mut opts = Options::default(); + opts.create_if_missing(false); + DB::open_cf_for_read_only(&opts, db_path, [cf], false)? + } + (false, None) => { + let mut opts = Options::default(); + opts.create_if_missing(true); + DB::open(&opts, db_path)? + } + (false, Some(cf)) => { + let mut opts = Options::default(); + opts.create_if_missing(true); + DB::open_cf(&opts, db_path, [cf])? + } + }; + + Ok(Self { + rocks: Arc::new(db), + _serde_op: Default::default(), + }) + } + + async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> { + self.rocks.put(key, value) + } + + async fn load(&mut self, key: &[u8]) -> Result, Self::Error> { + self.rocks.get(key).map(|opt| opt.map(|ivec| ivec.into())) + } + + async fn remove(&mut self, key: &[u8]) -> Result, Self::Error> { + self.load(key).await.and_then(|val| { + if val.is_some() { + self.rocks.delete(key).map(|_| val) + } else { + Ok(None) + } + }) + } + + async fn execute( + &mut self, + transaction: Self::Transaction, + ) -> Result<::Result, Self::Error> { + Ok(transaction.execute()) + } +} + +#[cfg(test)] +mod test { + use super::super::testing::NoStorageSerde; + use super::*; + use tempfile::TempDir; + + #[tokio::test] + async fn test_store_load_remove( + ) -> Result<(), as StorageBackend>::Error> { + let temp_path = TempDir::new().unwrap(); + let sled_settings = RocksBackendSettings { + db_path: temp_path.path().to_path_buf(), + read_only: false, + column_family: None, + }; + let key = "foo"; + let value = "bar"; + + let mut db: RocksBackend = RocksBackend::new(sled_settings)?; + db.store(key.as_bytes().into(), value.as_bytes().into()) + .await?; + let load_value = db.load(key.as_bytes()).await?; + assert_eq!(load_value, Some(value.as_bytes().into())); + let removed_value = db.remove(key.as_bytes()).await?; + assert_eq!(removed_value, Some(value.as_bytes().into())); + + Ok(()) + } + + #[tokio::test] + async fn test_transaction( + ) -> Result<(), as StorageBackend>::Error> { + let temp_path = TempDir::new().unwrap(); + + let sled_settings = RocksBackendSettings { + db_path: temp_path.path().to_path_buf(), + read_only: false, + column_family: None, + }; + + let mut db: RocksBackend = RocksBackend::new(sled_settings)?; + let txn = db.txn(|db| { + let key = "foo"; + let value = "bar"; + db.put(key, value)?; + let result = db.get(key)?; + db.delete(key)?; + Ok(result.map(|ivec| ivec.to_vec().into())) + }); + let result = db.execute(txn).await??; + assert_eq!(result, Some("bar".as_bytes().into())); + + Ok(()) + } + + #[tokio::test] + async fn test_multi_readers_single_writer( + ) -> Result<(), as StorageBackend>::Error> { + use tokio::sync::mpsc::channel; + + let temp_path = TempDir::new().unwrap(); + let path = temp_path.path().to_path_buf(); + let sled_settings = RocksBackendSettings { + db_path: temp_path.path().to_path_buf(), + read_only: false, + column_family: None, + }; + let key = "foo"; + let value = "bar"; + + let mut db: RocksBackend = RocksBackend::new(sled_settings)?; + + let (tx, mut rx) = channel(5); + // now let us spawn a few readers + for _ in 0..5 { + let p = path.clone(); + let tx = tx.clone(); + std::thread::spawn(move || { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + let sled_settings = RocksBackendSettings { + db_path: p, + read_only: true, + column_family: None, + }; + let key = "foo"; + + let mut db: RocksBackend = + RocksBackend::new(sled_settings).unwrap(); + + while db.load(key.as_bytes()).await.unwrap().is_none() { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + + tx.send(()).await.unwrap(); + }); + }); + } + + db.store(key.as_bytes().into(), value.as_bytes().into()) + .await?; + + let mut recvs = 0; + loop { + if rx.recv().await.is_some() { + recvs += 1; + if recvs == 5 { + break; + } + } + } + Ok(()) + } +} diff --git a/nomos-services/storage/src/backends/sled.rs b/nomos-services/storage/src/backends/sled.rs index 5783bd21..225b823c 100644 --- a/nomos-services/storage/src/backends/sled.rs +++ b/nomos-services/storage/src/backends/sled.rs @@ -19,7 +19,7 @@ pub enum Error { } /// Sled backend setting -#[derive(Clone, Debug)] +#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] pub struct SledBackendSettings { /// File path to the db file pub db_path: PathBuf, diff --git a/nomos-services/storage/src/bin/rocks.rs b/nomos-services/storage/src/bin/rocks.rs new file mode 100644 index 00000000..bfd567e8 --- /dev/null +++ b/nomos-services/storage/src/bin/rocks.rs @@ -0,0 +1,58 @@ +use rocksdb::{Options, DB}; + +const TEMP_ROCKS_PATH: &str = "rocks"; + +pub fn rocksdb_ro() { + let mut opts = Options::default(); + opts.create_if_missing(true); + + // open in read only mode + let db = DB::open_cf_for_read_only(&opts, TEMP_ROCKS_PATH, ["blocks", "da"], false).unwrap(); + + let blocks_cf = db.cf_handle("blocks").unwrap(); + let r = db.get_cf(blocks_cf, b"block1").unwrap().unwrap(); + + assert_eq!(r, b"block1data"); + + let da_cf = db.cf_handle("da").unwrap(); + let r = db.get_cf(da_cf, b"da1").unwrap().unwrap(); + assert_eq!(r, b"da1data"); + + loop { + std::thread::sleep(std::time::Duration::from_secs(1)); + } +} + +pub fn rocksdb_rw() { + let mut opts = Options::default(); + opts.create_if_missing(true); + opts.create_missing_column_families(true); + + let db = DB::open_cf(&opts, TEMP_ROCKS_PATH, ["blocks", "da"]).unwrap(); + + // open blocks column family and insert a block + let blocks_cf = db.cf_handle("blocks").unwrap(); + db.put_cf(blocks_cf, b"block1", b"block1data").unwrap(); + + // open da column family and insert a blob + let da_cf = db.cf_handle("da").unwrap(); + db.put_cf(da_cf, b"da1", b"da1data").unwrap(); + + // A loop to mock a long running program + loop { + std::thread::sleep(std::time::Duration::from_secs(1)); + } +} + +fn main() { + let mut args = std::env::args(); + args.next(); + let o = args.next(); + if o.is_none() { + println!("open in read-write mode"); + rocksdb_rw() + } else { + println!("open in read-only mode"); + rocksdb_ro() + } +} diff --git a/shell.nix b/shell.nix index 3c7016f5..fc909337 100644 --- a/shell.nix +++ b/shell.nix @@ -20,6 +20,7 @@ pkgs.mkShell { rust-bin.stable."1.75.0".default clang_14 llvmPackages_14.libclang + openssl ]; shellHook = '' export LIBCLANG_PATH="${pkgs.llvmPackages_14.libclang.lib}/lib"; diff --git a/testnet/Dockerfile b/testnet/Dockerfile index b81e172d..e901b96d 100644 --- a/testnet/Dockerfile +++ b/testnet/Dockerfile @@ -2,10 +2,6 @@ FROM rust:1.75.0-slim-bullseye AS builder -# Using backports for go 1.19 -RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \ - >> /etc/apt/sources.list - # Dependecies for publishing documentation. RUN apt-get update && apt-get install -yq \ git clang etcd-client libssl-dev pkg-config @@ -13,11 +9,11 @@ RUN apt-get update && apt-get install -yq \ WORKDIR /nomos COPY . . -RUN cargo build --release --all +RUN cargo build --release --all --features metrics # NODE IMAGE ---------------------------------------------------------- -FROM bitnami/minideb:latest +FROM bitnami/minideb:bullseye LABEL maintainer="augustinas@status.im" \ source="https://github.com/logos-co/nomos-node" \ @@ -27,9 +23,9 @@ LABEL maintainer="augustinas@status.im" \ EXPOSE 3000 8080 9000 60000 COPY --from=builder /nomos/target/release/nomos-node /usr/bin/nomos-node +COPY --from=builder /nomos/target/release/nomos-cli /usr/bin/nomos-cli COPY --from=builder /nomos/target/release/mixnode /usr/bin/mixnode COPY --from=builder /usr/bin/etcdctl /usr/bin/etcdctl COPY nodes/nomos-node/config.yaml /etc/nomos/config.yaml -RUN install_packages python3 python3-etcd3 ENTRYPOINT ["/usr/bin/nomos-node"] diff --git a/testnet/cli_config.yaml b/testnet/cli_config.yaml new file mode 100644 index 00000000..7f4eef37 --- /dev/null +++ b/testnet/cli_config.yaml @@ -0,0 +1,44 @@ +backend: + host: 0.0.0.0 + port: 4007 + log_level: "fatal" + node_key: "0000000000000000000000000000000000000000000000000000000000000667" + discV5BootstrapNodes: [] + initial_peers: ["/dns/bootstrap/tcp/3000"] + relayTopics: [] + # Mixclient configuration to communicate with mixnodes. + # The libp2p network backend always requires this mixclient configuration + # (cannot be disabled for now). + mixnet_client: + # A mixclient mode. For details, see the documentation of the "mixnet" crate. + # - Sender + # - !SenderReceiver [mixnode_client_listen_address] + mode: Sender + # A mixnet topology, which contains the information of all mixnodes in the mixnet. + # (The topology is static for now.) + topology: + # Each mixnet layer consists of a list of mixnodes. + layers: + - nodes: + - address: mix-node-0:7707 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + - nodes: + - address: mix-node-1:7717 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + + - nodes: + - address: mix-node-2:7727 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + + # A max number of connections that will stay connected to mixnodes in the first mixnet layer. + connection_pool_size: 255 + max_retries: 5 + retry_delay: + secs: 1 + nanos: 0 + # A range of total delay that will be set to each Sphinx packets + # sent to the mixnet for timing obfuscation. + # Panics if start > end. + mixnet_delay: + start: "0ms" + end: "0ms" diff --git a/testnet/explorer_config.yaml b/testnet/explorer_config.yaml new file mode 100644 index 00000000..a5b47758 --- /dev/null +++ b/testnet/explorer_config.yaml @@ -0,0 +1,10 @@ +log: + backend: "Stdout" + format: "Json" + level: "info" + +api: + backend_settings: + address: 0.0.0.0:9090 + cors_origins: [] + diff --git a/testnet/monitoring/grafana/datasources.yaml b/testnet/monitoring/grafana/datasources.yaml new file mode 100644 index 00000000..2d99f845 --- /dev/null +++ b/testnet/monitoring/grafana/datasources.yaml @@ -0,0 +1,11 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + org_id: 1 + url: http://prometheus:9090 + is_default: true + version: 1 + editable: true diff --git a/testnet/monitoring/grafana/grafana.ini b/testnet/monitoring/grafana/grafana.ini new file mode 100644 index 00000000..3c60e138 --- /dev/null +++ b/testnet/monitoring/grafana/grafana.ini @@ -0,0 +1,51 @@ +instance_name = nomos dashboard + +;[dashboards.json] +;enabled = true +;path = /home/git/grafana/grafana-dashboards/dashboards + + +#################################### Auth ########################## +[auth] +disable_login_form = false + +#################################### Anonymous Auth ########################## +[auth.anonymous] +# enable anonymous access +enabled = true + +# specify organization name that should be used for unauthenticated users +;org_name = Public + +# specify role for unauthenticated users +; org_role = Admin +org_role = Viewer + +;[security] +;admin_user = ocr +;admin_password = ocr + +;[users] +# disable user signup / registration +;allow_sign_up = false + +# Set to true to automatically assign new users to the default organization (id 1) +;auto_assign_org = true + +# Default role new users will be automatically assigned (if disabled above is set to true) +;auto_assign_org_role = Viewer + +#################################### SMTP / Emailing ########################## +;[smtp] +;enabled = false +;host = localhost:25 +;user = +;password = +;cert_file = +;key_file = +;skip_verify = false +;from_address = admin@grafana.localhost + +;[emails] +;welcome_email_on_sign_up = false + diff --git a/testnet/monitoring/grafana/plugins.env b/testnet/monitoring/grafana/plugins.env new file mode 100644 index 00000000..2a4b4876 --- /dev/null +++ b/testnet/monitoring/grafana/plugins.env @@ -0,0 +1 @@ +GF_INSTALL_PLUGINS=grafana-worldmap-panel,grafana-piechart-panel,yesoreyeram-boomtheme-panel,briangann-gauge-panel,pierosavi-imageit-panel,bessler-pictureit-panel,vonage-status-panel diff --git a/testnet/monitoring/prometheus-config.yml b/testnet/monitoring/prometheus-config.yml new file mode 100644 index 00000000..7be135f9 --- /dev/null +++ b/testnet/monitoring/prometheus-config.yml @@ -0,0 +1,14 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + external_labels: + monitor: "Monitoring" + +scrape_configs: + - job_name: "libp2p" + static_configs: + - targets: + - bootstrap:18080 + - libp2p_node_1:18080 + - libp2p_node_2:18080 + - libp2p_node_3:18080 diff --git a/testnet/scripts/run_bootstrap_node.sh b/testnet/scripts/run_bootstrap_node.sh index 180940ab..525f7f10 100755 --- a/testnet/scripts/run_bootstrap_node.sh +++ b/testnet/scripts/run_bootstrap_node.sh @@ -17,4 +17,4 @@ echo "CONSENSUS_PRIV_KEY: ${CONSENSUS_PRIV_KEY}" echo "DA_VOTER: ${DA_VOTER}" echo "OVERLAY_NODES: ${OVERLAY_NODES}" -exec /usr/bin/nomos-node /etc/nomos/bootstrap_config.yaml +exec /usr/bin/nomos-node /etc/nomos/bootstrap_config.yaml --with-metrics diff --git a/testnet/scripts/run_nomos_bot.sh b/testnet/scripts/run_nomos_bot.sh new file mode 100755 index 00000000..c93aa01f --- /dev/null +++ b/testnet/scripts/run_nomos_bot.sh @@ -0,0 +1,9 @@ +#!/bin/sh + +echo "I am a container ${HOSTNAME} bot" + +while true +do + /usr/bin/nomos-cli chat --author nomos-ghost --message "$(date +%H:%M:%S) ~ ping" --network-config /etc/nomos/cli_config.yaml --node http://bootstrap:18080 + sleep 10 +done diff --git a/testnet/scripts/run_nomos_node.sh b/testnet/scripts/run_nomos_node.sh index 742a604f..ea462f95 100755 --- a/testnet/scripts/run_nomos_node.sh +++ b/testnet/scripts/run_nomos_node.sh @@ -33,4 +33,4 @@ echo "DA_VOTER: ${DA_VOTER}" echo "OVERLAY_NODES: ${OVERLAY_NODES}" echo "NET_INITIAL_PEERS: ${NET_INITIAL_PEERS}" -exec /usr/bin/nomos-node /etc/nomos/libp2p_config.yaml +exec /usr/bin/nomos-node /etc/nomos/libp2p_config.yaml --with-metrics diff --git a/tests/Cargo.toml b/tests/Cargo.toml index 4b4313db..30b823e8 100644 --- a/tests/Cargo.toml +++ b/tests/Cargo.toml @@ -6,10 +6,12 @@ publish = false [dependencies] nomos-node = { path = "../nodes/nomos-node", default-features = false } +nomos-explorer = { path = "../nodes/explorer", default-features = false } carnot-consensus = { path = "../nomos-services/carnot-consensus" } nomos-network = { path = "../nomos-services/network", features = ["libp2p"]} nomos-log = { path = "../nomos-services/log" } nomos-api = { path = "../nomos-services/api" } +nomos-storage = { path = "../nomos-services/storage" } overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } nomos-core = { path = "../nomos-core" } carnot-engine = { path = "../consensus/carnot-engine", features = ["serde"] } diff --git a/tests/src/lib.rs b/tests/src/lib.rs index cbc6da5b..960d30eb 100644 --- a/tests/src/lib.rs +++ b/tests/src/lib.rs @@ -9,6 +9,7 @@ use std::env; // std use std::net::TcpListener; use std::ops::Mul; +use std::path::PathBuf; use std::time::Duration; use std::{fmt::Debug, sync::Mutex}; @@ -41,7 +42,7 @@ pub fn adjust_timeout(d: Duration) -> Duration { #[async_trait::async_trait] pub trait Node: Sized { type ConsensusInfo: Debug + Clone + PartialEq; - async fn spawn_nodes(config: SpawnConfig) -> Vec; + async fn spawn_nodes(config: SpawnConfig, storage_dir: Option) -> Vec; async fn consensus_info(&self) -> Self::ConsensusInfo; fn stop(&mut self); } diff --git a/tests/src/nodes/mixnode.rs b/tests/src/nodes/mixnode.rs index 74678364..87213c1b 100644 --- a/tests/src/nodes/mixnode.rs +++ b/tests/src/nodes/mixnode.rs @@ -4,8 +4,10 @@ use std::{ time::Duration, }; +use super::{create_tempdir, persist_tempdir, LOGS_PREFIX}; use mixnet_node::{MixnetNodeConfig, PRIVATE_KEY_SIZE}; use mixnet_topology::{Layer, MixnetTopology, Node}; +use nomos_log::{LoggerBackend, LoggerFormat}; use rand::{thread_rng, RngCore}; use tempfile::NamedTempFile; @@ -14,21 +16,37 @@ use crate::{get_available_port, MixnetConfig}; const MIXNODE_BIN: &str = "../target/debug/mixnode"; pub struct MixNode { + _tempdir: tempfile::TempDir, child: Child, } impl Drop for MixNode { fn drop(&mut self) { - self.child.kill().unwrap(); + if std::thread::panicking() { + if let Err(e) = persist_tempdir(&mut self._tempdir, "mixnode") { + println!("failed to persist tempdir: {e}"); + } + } + + if let Err(e) = self.child.kill() { + println!("failed to kill the child process: {e}"); + } } } impl MixNode { pub async fn spawn(config: MixnetNodeConfig) -> Self { - let config = mixnode::Config { + let dir = create_tempdir().unwrap(); + + let mut config = mixnode::Config { mixnode: config, log: Default::default(), }; + config.log.backend = LoggerBackend::File { + directory: dir.path().to_owned(), + prefix: Some(LOGS_PREFIX.into()), + }; + config.log.format = LoggerFormat::Json; let mut file = NamedTempFile::new().unwrap(); let config_path = file.path().to_owned(); @@ -43,7 +61,10 @@ impl MixNode { //TODO: use a sophisticated way to wait until the node is ready tokio::time::sleep(Duration::from_secs(1)).await; - Self { child } + Self { + _tempdir: dir, + child, + } } pub async fn spawn_nodes(num_nodes: usize) -> (Vec, MixnetConfig) { diff --git a/tests/src/nodes/mod.rs b/tests/src/nodes/mod.rs index 05c24ae1..bc2a9e61 100644 --- a/tests/src/nodes/mod.rs +++ b/tests/src/nodes/mod.rs @@ -3,3 +3,26 @@ pub mod nomos; pub use self::mixnode::MixNode; pub use nomos::NomosNode; +use tempfile::TempDir; + +const LOGS_PREFIX: &str = "__logs"; + +fn create_tempdir() -> std::io::Result { + // It's easier to use the current location instead of OS-default tempfile location + // because Github Actions can easily access files in the current location using wildcard + // to upload them as artifacts. + tempfile::TempDir::new_in(std::env::current_dir()?) +} + +fn persist_tempdir(tempdir: &mut TempDir, label: &str) -> std::io::Result<()> { + println!( + "{}: persisting directory at {}", + label, + tempdir.path().display() + ); + // we need ownership of the dir to persist it + let dir = std::mem::replace(tempdir, tempfile::tempdir()?); + // a bit confusing but `into_path` persists the directory + let _ = dir.into_path(); + Ok(()) +} diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 984357a3..efcb7ddb 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -1,8 +1,10 @@ // std use std::net::SocketAddr; +use std::path::PathBuf; use std::process::{Child, Command, Stdio}; use std::time::Duration; // internal +use super::{create_tempdir, persist_tempdir, LOGS_PREFIX}; use crate::{adjust_timeout, get_available_port, ConsensusConfig, MixnetConfig, Node, SpawnConfig}; use carnot_consensus::{CarnotInfo, CarnotSettings}; use carnot_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay, TreeOverlaySettings}; @@ -22,14 +24,13 @@ use nomos_node::{api::AxumBackendSettings, Config, Tx}; use fraction::Fraction; use once_cell::sync::Lazy; use rand::{thread_rng, Rng}; -use reqwest::Client; +use reqwest::{Client, Url}; use tempfile::NamedTempFile; static CLIENT: Lazy = Lazy::new(Client::new); const NOMOS_BIN: &str = "../target/debug/nomos-node"; const CARNOT_INFO_API: &str = "carnot/info"; const STORAGE_BLOCKS_API: &str = "storage/block"; -const LOGS_PREFIX: &str = "__logs"; const GET_BLOCKS_INFO: &str = "carnot/blocks"; pub struct NomosNode { @@ -42,14 +43,14 @@ pub struct NomosNode { impl Drop for NomosNode { fn drop(&mut self) { if std::thread::panicking() { - println!("persisting directory at {}", self._tempdir.path().display()); - // we need ownership of the dir to persist it - let dir = std::mem::replace(&mut self._tempdir, tempfile::tempdir().unwrap()); - // a bit confusing but `into_path` persists the directory - let _ = dir.into_path(); + if let Err(e) = persist_tempdir(&mut self._tempdir, "nomos-node") { + println!("failed to persist tempdir: {e}"); + } } - self.child.kill().unwrap(); + if let Err(e) = self.child.kill() { + println!("failed to kill the child process: {e}"); + } } } @@ -61,11 +62,7 @@ impl NomosNode { pub async fn spawn(mut config: Config) -> Self { // Waku stores the messages in a db file in the current dir, we need a different // directory for each node to avoid conflicts - // - // NOTE: It's easier to use the current location instead of OS-default tempfile location - // because Github Actions can easily access files in the current location using wildcard - // to upload them as artifacts. - let dir = tempfile::TempDir::new_in(std::env::current_dir().unwrap()).unwrap(); + let dir = create_tempdir().unwrap(); let mut file = NamedTempFile::new().unwrap(); let config_path = file.path().to_owned(); @@ -115,6 +112,10 @@ impl NomosNode { } } + pub fn url(&self) -> Url { + format!("http://{}", self.addr).parse().unwrap() + } + pub async fn get_block(&self, id: BlockId) -> Option> { CLIENT .post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API)) @@ -201,10 +202,11 @@ impl NomosNode { impl Node for NomosNode { type ConsensusInfo = CarnotInfo; - async fn spawn_nodes(config: SpawnConfig) -> Vec { + async fn spawn_nodes(config: SpawnConfig, storage_dir: Option) -> Vec { match config { SpawnConfig::Star { consensus, mixnet } => { - let (next_leader_config, configs) = create_node_configs(consensus, mixnet); + let (next_leader_config, configs) = + create_node_configs(consensus, mixnet, storage_dir); let first_node_addr = node_address(&next_leader_config); let mut nodes = vec![Self::spawn(next_leader_config).await]; @@ -219,7 +221,8 @@ impl Node for NomosNode { nodes } SpawnConfig::Chain { consensus, mixnet } => { - let (next_leader_config, configs) = create_node_configs(consensus, mixnet); + let (next_leader_config, configs) = + create_node_configs(consensus, mixnet, storage_dir); let mut prev_node_addr = node_address(&next_leader_config); let mut nodes = vec![Self::spawn(next_leader_config).await]; @@ -253,6 +256,7 @@ impl Node for NomosNode { fn create_node_configs( consensus: ConsensusConfig, mut mixnet: MixnetConfig, + storage_dir: Option, ) -> (Config, Vec) { let mut ids = vec![[0; 32]; consensus.n_participants]; for id in &mut ids { @@ -269,6 +273,7 @@ fn create_node_configs( consensus.timeout, mixnet.node_configs.pop(), mixnet.topology.clone(), + storage_dir.clone(), ) }) .collect::>(); @@ -292,6 +297,7 @@ fn create_node_config( timeout: Duration, mixnet_node_config: Option, mixnet_topology: MixnetTopology, + storage_dir: Option, ) -> Config { let mixnet_client_mode = match mixnet_node_config { Some(node_config) => { @@ -341,8 +347,6 @@ fn create_node_config( cors_origins: vec![], }, }, - #[cfg(feature = "metrics")] - metrics: Default::default(), da: nomos_da::Settings { da_protocol: full_replication::Settings { voter: id, @@ -353,6 +357,17 @@ fn create_node_config( evicting_period: Duration::from_secs(60 * 60 * 24), // 1 day }, }, + storage: nomos_storage::backends::rocksdb::RocksBackendSettings { + db_path: { + let p = storage_dir + .unwrap_or("db".into()) + .join(NodeId::new(id).to_string()); + let _ = std::fs::create_dir_all(&p); + p + }, + read_only: false, + column_family: Some("blocks".into()), + }, }; config.network.backend.inner.port = get_available_port(); diff --git a/tests/src/tests/cli.rs b/tests/src/tests/cli.rs index 356c02a9..442146f4 100644 --- a/tests/src/tests/cli.rs +++ b/tests/src/tests/cli.rs @@ -1,19 +1,50 @@ +use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication}; use nomos_cli::{ - cmds::{disseminate::Disseminate, Command}, + api::da::get_blobs, + cmds::disseminate::Disseminate, da::disseminate::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings}, }; -use std::time::Duration; +use nomos_core::da::{blob::Blob as _, DaProtocol}; +use std::{io::Write, net::SocketAddr, path::PathBuf, time::Duration}; use tempfile::NamedTempFile; use tests::{ adjust_timeout, get_available_port, nodes::nomos::Pool, MixNode, Node, NomosNode, SpawnConfig, }; +const CLI_BIN: &str = "../target/debug/nomos-cli"; + +use std::process::Command; + const TIMEOUT_SECS: u64 = 20; -#[tokio::test] -async fn disseminate_blob() { +fn run_disseminate(disseminate: &Disseminate) { + let mut binding = Command::new(CLI_BIN); + let c = binding + .args(["disseminate", "--network-config"]) + .arg(disseminate.network_config.as_os_str()) + .arg("--node-addr") + .arg(disseminate.node_addr.as_ref().unwrap().as_str()); + + match (&disseminate.data, &disseminate.file) { + (Some(data), None) => c.args(["--data", &data]), + (None, Some(file)) => c.args(["--file", file.as_os_str().to_str().unwrap()]), + (_, _) => panic!("Either data or file needs to be provided, but not both"), + }; + + c.status().expect("failed to execute nomos cli"); +} + +async fn spawn_and_setup_config( + file: &mut NamedTempFile, + config: &mut Disseminate, + storage_dir: Option, +) -> ( + Vec, + FullReplication>, +) { let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(2).await; - let mut nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await; + let mut nodes = + NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config), storage_dir).await; // kill the node so that we can reuse its network config nodes[1].stop(); @@ -21,35 +52,42 @@ async fn disseminate_blob() { let mut network_config = nodes[1].config().network.clone(); // use a new port because the old port is sometimes not closed immediately network_config.backend.inner.port = get_available_port(); - - let mut file = NamedTempFile::new().unwrap(); let config_path = file.path().to_owned(); - serde_yaml::to_writer(&mut file, &network_config).unwrap(); - let cmd = Command::Disseminate(Disseminate { - data: "Hello World".into(), - timeout: 20, - network_config: config_path, - da_protocol: DaProtocolChoice { - da_protocol: Protocol::FullReplication, - settings: ProtocolSettings { - full_replication: FullReplicationSettings { - voter: [0; 32], - num_attestations: 1, - }, + serde_yaml::to_writer(file, &network_config).unwrap(); + let da_protocol = DaProtocolChoice { + da_protocol: Protocol::FullReplication, + settings: ProtocolSettings { + full_replication: FullReplicationSettings { + voter: [0; 32], + num_attestations: 1, }, }, - node_addr: Some( - format!( - "http://{}", - nodes[0].config().http.backend_settings.address.clone() - ) - .parse() - .unwrap(), - ), - output: None, - }); + }; - let thread = std::thread::spawn(move || cmd.run().unwrap()); + let da = + >>::try_from(da_protocol.clone()) + .unwrap(); + + config.timeout = 20; + config.network_config = config_path; + config.da_protocol = da_protocol; + config.node_addr = Some( + format!( + "http://{}", + nodes[0].config().http.backend_settings.address.clone() + ) + .parse() + .unwrap(), + ); + + (nodes, da) +} + +async fn disseminate(config: &mut Disseminate) { + let mut file = NamedTempFile::new().unwrap(); + let (nodes, da) = spawn_and_setup_config(&mut file, config, None).await; + run_disseminate(config); + // let thread = std::thread::spawn(move || cmd.run().unwrap()); tokio::time::timeout( adjust_timeout(Duration::from_secs(TIMEOUT_SECS)), @@ -58,7 +96,131 @@ async fn disseminate_blob() { .await .unwrap(); - thread.join().unwrap(); + let (blob, bytes) = if let Some(data) = &config.data { + let bytes = data.as_bytes().to_vec(); + (da.encode(bytes.clone())[0].hash(), bytes) + } else { + let bytes = std::fs::read(&config.file.as_ref().unwrap()).unwrap(); + (da.encode(bytes.clone())[0].hash(), bytes) + }; + + assert_eq!( + get_blobs(&nodes[0].url(), vec![blob]).await.unwrap()[0].as_bytes(), + bytes.clone() + ); +} + +#[tokio::test] +async fn disseminate_blob() { + let mut config = Disseminate { + data: Some("hello world".to_string()), + ..Default::default() + }; + disseminate(&mut config).await; +} + +#[tokio::test] +async fn disseminate_big_blob() { + const MSG_SIZE: usize = 1024; + let mut config = Disseminate { + data: std::iter::repeat(String::from("X")) + .take(MSG_SIZE) + .collect::>() + .join("") + .into(), + ..Default::default() + }; + disseminate(&mut config).await; +} + +#[tokio::test] +async fn disseminate_blob_from_file() { + let mut file = NamedTempFile::new().unwrap(); + file.write_all("hello world".as_bytes()).unwrap(); + + let mut config = Disseminate { + file: Some(file.path().to_path_buf()), + ..Default::default() + }; + disseminate(&mut config).await; +} + +fn run_explorer(explorer_api_addr: SocketAddr, db_path: PathBuf) { + let cfg = nomos_explorer::Config { + log: Default::default(), + api: nomos_api::ApiServiceSettings { + backend_settings: nomos_explorer::AxumBackendSettings { + address: explorer_api_addr, + cors_origins: Vec::new(), + }, + }, + storage: nomos_storage::backends::rocksdb::RocksBackendSettings { + db_path, + read_only: true, + column_family: Some("blocks".into()), + }, + }; + std::thread::spawn(move || nomos_explorer::Explorer::run(cfg).unwrap()); +} + +#[test] +fn explorer() { + let (tx, rx) = std::sync::mpsc::channel(); + let tx1 = tx.clone(); + let temp = tempfile::tempdir().unwrap(); + let db_path = temp.path().to_path_buf(); + + std::thread::spawn(move || { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + let mut file = NamedTempFile::new().unwrap(); + let mut config = Disseminate::default(); + let (nodes, _) = + spawn_and_setup_config(&mut file, &mut config, Some(db_path)).await; + let explorer_db = nodes[0].config().storage.db_path.clone(); + + let explorer_api_addr = format!("127.0.0.1:{}", get_available_port()) + .parse() + .unwrap(); + run_explorer(explorer_api_addr, explorer_db); + + let c = nomos_cli::cmds::chat::NomosChat { + network_config: config.network_config.clone(), + da_protocol: config.da_protocol.clone(), + node: config.node_addr.clone().unwrap(), + explorer: format!("http://{}", explorer_api_addr).parse().unwrap(), + message: None, + author: None, + }; + let c1 = c.clone(); + + std::thread::Builder::new() + .name("user1".into()) + .spawn(move || { + let (rx1, app1) = c.run_app_without_terminal("user1".into()).unwrap(); + app1.send_message("Hello from user1".into()); + let msgs1 = rx1.recv().unwrap(); + assert!(!msgs1.is_empty()); + tx1.send(()).unwrap(); + }) + .unwrap(); + + std::thread::Builder::new() + .name("user2".into()) + .spawn(move || { + let (rx2, app2) = c1.run_app_without_terminal("user2".into()).unwrap(); + app2.send_message("Hello from user2".into()); + let msgs2 = rx2.recv().unwrap(); + assert!(!msgs2.is_empty()); + tx.send(()).unwrap(); + }) + .unwrap(); + }); + }); + + assert!(rx.recv().is_ok()); + assert!(rx.recv().is_ok()); } async fn wait_for_cert_in_mempool(node: &NomosNode) { diff --git a/tests/src/tests/happy.rs b/tests/src/tests/happy.rs index 133d2b10..aa5a2d79 100644 --- a/tests/src/tests/happy.rs +++ b/tests/src/tests/happy.rs @@ -72,21 +72,21 @@ async fn happy_test(nodes: &[NomosNode]) { #[tokio::test] async fn two_nodes_happy() { let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(2).await; - let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await; + let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config), None).await; happy_test(&nodes).await; } #[tokio::test] async fn ten_nodes_happy() { let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(3).await; - let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(10, mixnet_config)).await; + let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(10, mixnet_config), None).await; happy_test(&nodes).await; } #[tokio::test] async fn test_get_block() { let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(3).await; - let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await; + let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config), None).await; happy_test(&nodes).await; let id = nodes[0].consensus_info().await.last_committed_block.id; tokio::time::timeout(Duration::from_secs(10), async { diff --git a/tests/src/tests/unhappy.rs b/tests/src/tests/unhappy.rs index 6e1a437a..b78da320 100644 --- a/tests/src/tests/unhappy.rs +++ b/tests/src/tests/unhappy.rs @@ -11,14 +11,17 @@ const DUMMY_NODE_ID: NodeId = NodeId::new([0u8; 32]); #[tokio::test] async fn ten_nodes_one_down() { let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(3).await; - let mut nodes = NomosNode::spawn_nodes(SpawnConfig::Chain { - consensus: ConsensusConfig { - n_participants: 10, - threshold: Fraction::new(9u32, 10u32), - timeout: std::time::Duration::from_secs(5), + let mut nodes = NomosNode::spawn_nodes( + SpawnConfig::Chain { + consensus: ConsensusConfig { + n_participants: 10, + threshold: Fraction::new(9u32, 10u32), + timeout: std::time::Duration::from_secs(5), + }, + mixnet: mixnet_config, }, - mixnet: mixnet_config, - }) + None, + ) .await; let mut failed_node = nodes.pop().unwrap(); failed_node.stop();