From d527050633e11bb8790a676278e9669bd75beaa4 Mon Sep 17 00:00:00 2001 From: Corey Petty Date: Wed, 17 Jan 2024 09:51:43 -0500 Subject: [PATCH 01/12] fix: added openssl to shell.nix (#560) --- shell.nix | 1 + 1 file changed, 1 insertion(+) diff --git a/shell.nix b/shell.nix index 3c7016f5..fc909337 100644 --- a/shell.nix +++ b/shell.nix @@ -20,6 +20,7 @@ pkgs.mkShell { rust-bin.stable."1.75.0".default clang_14 llvmPackages_14.libclang + openssl ]; shellHook = '' export LIBCLANG_PATH="${pkgs.llvmPackages_14.libclang.lib}/lib"; From aeaf13fc88ce13524d0eb8c53f4aff037d45c0eb Mon Sep 17 00:00:00 2001 From: Giacomo Pasini <21265557+zeegomo@users.noreply.github.com> Date: Fri, 19 Jan 2024 14:17:10 +0100 Subject: [PATCH 02/12] Limit in-flight requests in chat app (#562) --- nomos-cli/src/cmds/chat/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nomos-cli/src/cmds/chat/mod.rs b/nomos-cli/src/cmds/chat/mod.rs index c89619e4..d9a114d3 100644 --- a/nomos-cli/src/cmds/chat/mod.rs +++ b/nomos-cli/src/cmds/chat/mod.rs @@ -48,6 +48,8 @@ use ratatui::{ use tui_input::{backend::crossterm::EventHandler, Input}; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120); +// Limit the number of maximum in-flight requests +const MAX_BUFFERED_REQUESTS: usize = 20; #[derive(Clone, Debug, Args)] /// The almost-instant messaging protocol. @@ -299,7 +301,7 @@ async fn fetch_new_messages( process_block_blobs(node, block, da_settings) }) - .buffer_unordered(new_blocks.len()) + .buffered(MAX_BUFFERED_REQUESTS) .collect::>() .await; From 16c97c07eccb326fa50b5be46c840306f6bfcd53 Mon Sep 17 00:00:00 2001 From: Giacomo Pasini <21265557+zeegomo@users.noreply.github.com> Date: Fri, 19 Jan 2024 19:08:13 +0100 Subject: [PATCH 03/12] Add test for big blob dissemination (#563) * Add tests for dissemination of big blob * Warn if a requested blob was not returned --- nomos-cli/src/da/retrieve.rs | 13 +++++- tests/src/nodes/nomos.rs | 6 ++- tests/src/tests/cli.rs | 84 ++++++++++++++++++++++++++++-------- 3 files changed, 83 insertions(+), 20 deletions(-) diff --git a/nomos-cli/src/da/retrieve.rs b/nomos-cli/src/da/retrieve.rs index 6723d410..af42fcec 100644 --- a/nomos-cli/src/da/retrieve.rs +++ b/nomos-cli/src/da/retrieve.rs @@ -20,5 +20,16 @@ pub async fn get_block_blobs(node: &Url, block: &BlockId) -> Result, E .await? .ok_or(Error::NotFound)?; - Ok(get_blobs(node, block.blobs().map(|cert| cert.blob()).collect()).await?) + let blobs = block.blobs().map(|cert| cert.blob()).collect::>(); + + if blobs.is_empty() { + return Ok(vec![]); + } + + let n_blobs = blobs.len(); + let resp = get_blobs(node, blobs).await?; + if resp.len() != n_blobs { + tracing::warn!("Only {}/{} blobs returned", resp.len(), n_blobs); + } + Ok(resp) } diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index 984357a3..d6c0cb33 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -22,7 +22,7 @@ use nomos_node::{api::AxumBackendSettings, Config, Tx}; use fraction::Fraction; use once_cell::sync::Lazy; use rand::{thread_rng, Rng}; -use reqwest::Client; +use reqwest::{Client, Url}; use tempfile::NamedTempFile; static CLIENT: Lazy = Lazy::new(Client::new); @@ -115,6 +115,10 @@ impl NomosNode { } } + pub fn url(&self) -> Url { + format!("http://{}", self.addr).parse().unwrap() + } + pub async fn get_block(&self, id: BlockId) -> Option> { CLIENT .post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API)) diff --git a/tests/src/tests/cli.rs b/tests/src/tests/cli.rs index 356c02a9..650bdcfd 100644 --- a/tests/src/tests/cli.rs +++ b/tests/src/tests/cli.rs @@ -1,17 +1,37 @@ +use full_replication::{AbsoluteNumber, Attestation, Blob, Certificate, FullReplication}; use nomos_cli::{ - cmds::{disseminate::Disseminate, Command}, + api::da::get_blobs, + cmds::disseminate::{self, Disseminate}, da::disseminate::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings}, }; -use std::time::Duration; +use nomos_core::da::{blob::Blob as _, DaProtocol}; +use std::{ + path::{self, PathBuf}, + time::Duration, +}; use tempfile::NamedTempFile; use tests::{ adjust_timeout, get_available_port, nodes::nomos::Pool, MixNode, Node, NomosNode, SpawnConfig, }; +const CLI_BIN: &str = "../target/debug/nomos-cli"; + +use std::process::Command; + const TIMEOUT_SECS: u64 = 20; -#[tokio::test] -async fn disseminate_blob() { +fn run_disseminate(disseminate: &Disseminate) { + Command::new(CLI_BIN) + .args(["disseminate", "--network-config"]) + .arg(disseminate.network_config.as_os_str()) + .args(["--data", &disseminate.data]) + .arg("--node-addr") + .arg(disseminate.node_addr.as_ref().unwrap().as_str()) + .status() + .expect("failed to execute nomos cli"); +} + +async fn disseminate(data: String) { let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(2).await; let mut nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await; @@ -25,19 +45,24 @@ async fn disseminate_blob() { let mut file = NamedTempFile::new().unwrap(); let config_path = file.path().to_owned(); serde_yaml::to_writer(&mut file, &network_config).unwrap(); - let cmd = Command::Disseminate(Disseminate { - data: "Hello World".into(), - timeout: 20, - network_config: config_path, - da_protocol: DaProtocolChoice { - da_protocol: Protocol::FullReplication, - settings: ProtocolSettings { - full_replication: FullReplicationSettings { - voter: [0; 32], - num_attestations: 1, - }, + let da_protocol = DaProtocolChoice { + da_protocol: Protocol::FullReplication, + settings: ProtocolSettings { + full_replication: FullReplicationSettings { + voter: [0; 32], + num_attestations: 1, }, }, + }; + + let da = + >>::try_from(da_protocol.clone()) + .unwrap(); + let config = Disseminate { + data: data.clone(), + timeout: 20, + network_config: config_path, + da_protocol, node_addr: Some( format!( "http://{}", @@ -47,9 +72,10 @@ async fn disseminate_blob() { .unwrap(), ), output: None, - }); + }; - let thread = std::thread::spawn(move || cmd.run().unwrap()); + run_disseminate(&config); + // let thread = std::thread::spawn(move || cmd.run().unwrap()); tokio::time::timeout( adjust_timeout(Duration::from_secs(TIMEOUT_SECS)), @@ -58,7 +84,29 @@ async fn disseminate_blob() { .await .unwrap(); - thread.join().unwrap(); + let blob = da.encode(data.as_bytes().to_vec())[0].hash(); + + assert_eq!( + get_blobs(&nodes[0].url(), vec![blob]).await.unwrap()[0].as_bytes(), + data.as_bytes() + ); +} + +#[tokio::test] +async fn disseminate_blob() { + disseminate("hello world".to_string()).await; +} + +#[tokio::test] +async fn disseminate_big_blob() { + const MSG_SIZE: usize = 1024; + disseminate( + std::iter::repeat(String::from("X")) + .take(MSG_SIZE) + .collect::>() + .join(""), + ) + .await; } async fn wait_for_cert_in_mempool(node: &NomosNode) { From aa06080baafb2e1e82effb3527213b9787e5840c Mon Sep 17 00:00:00 2001 From: gusto Date: Tue, 23 Jan 2024 18:16:33 +0200 Subject: [PATCH 04/12] Docker compose metrics containers (#564) * Prometheus container and configuration for testnet * Add graphana related configuration and container * Use metrics feature in testnet nodes * Grafana configuration files and datasources --- compose.static.yml | 26 +++++++++++ nodes/nomos-node/config.yaml | 4 +- testnet/Dockerfile | 2 +- testnet/explorer_config.yaml | 10 ++++ testnet/monitoring/grafana/datasources.yaml | 11 +++++ testnet/monitoring/grafana/grafana.ini | 51 +++++++++++++++++++++ testnet/monitoring/grafana/plugins.env | 1 + testnet/monitoring/prometheus-config.yml | 14 ++++++ testnet/scripts/run_bootstrap_node.sh | 2 +- testnet/scripts/run_nomos_node.sh | 2 +- tests/src/nodes/nomos.rs | 2 - 11 files changed, 118 insertions(+), 7 deletions(-) create mode 100644 testnet/explorer_config.yaml create mode 100644 testnet/monitoring/grafana/datasources.yaml create mode 100644 testnet/monitoring/grafana/grafana.ini create mode 100644 testnet/monitoring/grafana/plugins.env create mode 100644 testnet/monitoring/prometheus-config.yml diff --git a/compose.static.yml b/compose.static.yml index 1f88299f..d0df5f78 100644 --- a/compose.static.yml +++ b/compose.static.yml @@ -148,3 +148,29 @@ services: - /usr/local/bin/etcd - --advertise-client-urls=http://etcd:2379 - --listen-client-urls=http://0.0.0.0:2379 + + prometheus: + container_name: prometheus + image: prom/prometheus:latest + volumes: + - ./testnet/monitoring/prometheus-config.yml:/etc/prometheus/prometheus.yml:z + command: + - --config.file=/etc/prometheus/prometheus.yml + - --storage.tsdb.retention.time=7d + ports: + - 127.0.0.1:9090:9090 + restart: on-failure + + grafana: + container_name: grafana + image: grafana/grafana:latest + env_file: + - ./testnet/monitoring/grafana/plugins.env + volumes: + - ./testnet/monitoring/grafana/grafana.ini:/etc/grafana/grafana.ini + ./testnet/monitoring/grafana/datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml + ports: + - 9091:3000 + restart: on-failure + depends_on: + - prometheus diff --git a/nodes/nomos-node/config.yaml b/nodes/nomos-node/config.yaml index 808dcf98..f63aca7c 100644 --- a/nodes/nomos-node/config.yaml +++ b/nodes/nomos-node/config.yaml @@ -6,9 +6,9 @@ consensus: private_key: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] fountain_settings: null overlay_settings: - nodes: [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]] + nodes: ["0000000000000000000000000000000000000000000000000000000000000000", "0000000000000000000000000000000000000000000000000000000000000001"] number_of_committees: 1 - current_leader: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + current_leader: 0000000000000000000000000000000000000000000000000000000000000000 leader: cur: 0 committee_membership: !Sad diff --git a/testnet/Dockerfile b/testnet/Dockerfile index b81e172d..b18997ba 100644 --- a/testnet/Dockerfile +++ b/testnet/Dockerfile @@ -13,7 +13,7 @@ RUN apt-get update && apt-get install -yq \ WORKDIR /nomos COPY . . -RUN cargo build --release --all +RUN cargo build --release --all --features metrics # NODE IMAGE ---------------------------------------------------------- diff --git a/testnet/explorer_config.yaml b/testnet/explorer_config.yaml new file mode 100644 index 00000000..a5b47758 --- /dev/null +++ b/testnet/explorer_config.yaml @@ -0,0 +1,10 @@ +log: + backend: "Stdout" + format: "Json" + level: "info" + +api: + backend_settings: + address: 0.0.0.0:9090 + cors_origins: [] + diff --git a/testnet/monitoring/grafana/datasources.yaml b/testnet/monitoring/grafana/datasources.yaml new file mode 100644 index 00000000..2d99f845 --- /dev/null +++ b/testnet/monitoring/grafana/datasources.yaml @@ -0,0 +1,11 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + org_id: 1 + url: http://prometheus:9090 + is_default: true + version: 1 + editable: true diff --git a/testnet/monitoring/grafana/grafana.ini b/testnet/monitoring/grafana/grafana.ini new file mode 100644 index 00000000..3c60e138 --- /dev/null +++ b/testnet/monitoring/grafana/grafana.ini @@ -0,0 +1,51 @@ +instance_name = nomos dashboard + +;[dashboards.json] +;enabled = true +;path = /home/git/grafana/grafana-dashboards/dashboards + + +#################################### Auth ########################## +[auth] +disable_login_form = false + +#################################### Anonymous Auth ########################## +[auth.anonymous] +# enable anonymous access +enabled = true + +# specify organization name that should be used for unauthenticated users +;org_name = Public + +# specify role for unauthenticated users +; org_role = Admin +org_role = Viewer + +;[security] +;admin_user = ocr +;admin_password = ocr + +;[users] +# disable user signup / registration +;allow_sign_up = false + +# Set to true to automatically assign new users to the default organization (id 1) +;auto_assign_org = true + +# Default role new users will be automatically assigned (if disabled above is set to true) +;auto_assign_org_role = Viewer + +#################################### SMTP / Emailing ########################## +;[smtp] +;enabled = false +;host = localhost:25 +;user = +;password = +;cert_file = +;key_file = +;skip_verify = false +;from_address = admin@grafana.localhost + +;[emails] +;welcome_email_on_sign_up = false + diff --git a/testnet/monitoring/grafana/plugins.env b/testnet/monitoring/grafana/plugins.env new file mode 100644 index 00000000..2a4b4876 --- /dev/null +++ b/testnet/monitoring/grafana/plugins.env @@ -0,0 +1 @@ +GF_INSTALL_PLUGINS=grafana-worldmap-panel,grafana-piechart-panel,yesoreyeram-boomtheme-panel,briangann-gauge-panel,pierosavi-imageit-panel,bessler-pictureit-panel,vonage-status-panel diff --git a/testnet/monitoring/prometheus-config.yml b/testnet/monitoring/prometheus-config.yml new file mode 100644 index 00000000..7be135f9 --- /dev/null +++ b/testnet/monitoring/prometheus-config.yml @@ -0,0 +1,14 @@ +global: + scrape_interval: 15s + evaluation_interval: 15s + external_labels: + monitor: "Monitoring" + +scrape_configs: + - job_name: "libp2p" + static_configs: + - targets: + - bootstrap:18080 + - libp2p_node_1:18080 + - libp2p_node_2:18080 + - libp2p_node_3:18080 diff --git a/testnet/scripts/run_bootstrap_node.sh b/testnet/scripts/run_bootstrap_node.sh index 180940ab..525f7f10 100755 --- a/testnet/scripts/run_bootstrap_node.sh +++ b/testnet/scripts/run_bootstrap_node.sh @@ -17,4 +17,4 @@ echo "CONSENSUS_PRIV_KEY: ${CONSENSUS_PRIV_KEY}" echo "DA_VOTER: ${DA_VOTER}" echo "OVERLAY_NODES: ${OVERLAY_NODES}" -exec /usr/bin/nomos-node /etc/nomos/bootstrap_config.yaml +exec /usr/bin/nomos-node /etc/nomos/bootstrap_config.yaml --with-metrics diff --git a/testnet/scripts/run_nomos_node.sh b/testnet/scripts/run_nomos_node.sh index 742a604f..ea462f95 100755 --- a/testnet/scripts/run_nomos_node.sh +++ b/testnet/scripts/run_nomos_node.sh @@ -33,4 +33,4 @@ echo "DA_VOTER: ${DA_VOTER}" echo "OVERLAY_NODES: ${OVERLAY_NODES}" echo "NET_INITIAL_PEERS: ${NET_INITIAL_PEERS}" -exec /usr/bin/nomos-node /etc/nomos/libp2p_config.yaml +exec /usr/bin/nomos-node /etc/nomos/libp2p_config.yaml --with-metrics diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index d6c0cb33..af656493 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -345,8 +345,6 @@ fn create_node_config( cors_origins: vec![], }, }, - #[cfg(feature = "metrics")] - metrics: Default::default(), da: nomos_da::Settings { da_protocol: full_replication::Settings { voter: id, From f74159031576c76ed3dab00b3accdc7bbdba0d55 Mon Sep 17 00:00:00 2001 From: gusto Date: Tue, 23 Jan 2024 18:17:05 +0200 Subject: [PATCH 05/12] Disseminate app file input (#565) * Accept file option in dissemination app * File dissemination tests --- nomos-cli/src/cmds/disseminate/mod.rs | 17 ++++- nomos-cli/src/da/disseminate.rs | 7 +- tests/src/tests/cli.rs | 94 +++++++++++++++++---------- 3 files changed, 77 insertions(+), 41 deletions(-) diff --git a/nomos-cli/src/cmds/disseminate/mod.rs b/nomos-cli/src/cmds/disseminate/mod.rs index 27f511fe..8e560fee 100644 --- a/nomos-cli/src/cmds/disseminate/mod.rs +++ b/nomos-cli/src/cmds/disseminate/mod.rs @@ -9,11 +9,11 @@ use reqwest::Url; use std::{path::PathBuf, sync::Arc, time::Duration}; use tokio::sync::Mutex; -#[derive(Args, Debug)] +#[derive(Args, Debug, Default)] pub struct Disseminate { // TODO: accept bytes #[clap(short, long)] - pub data: String, + pub data: Option, /// Path to the network config file #[clap(short, long)] pub network_config: PathBuf, @@ -30,6 +30,9 @@ pub struct Disseminate { /// File to write the certificate to, if present. #[clap(long)] pub output: Option, + /// File to disseminate + #[clap(short, long)] + pub file: Option, } impl Disseminate { @@ -41,7 +44,15 @@ impl Disseminate { as ServiceData>::Settings, >(std::fs::File::open(&self.network_config)?)?; let (status_updates, rx) = std::sync::mpsc::channel(); - let bytes: Box<[u8]> = self.data.clone().as_bytes().into(); + let bytes: Box<[u8]> = match (&self.data, &self.file) { + (Some(data), None) => data.clone().as_bytes().into(), + (None, Some(file_path)) => { + let file_bytes = std::fs::read(file_path)?; + file_bytes.into_boxed_slice() + } + (Some(_), Some(_)) => return Err("Cannot specify both data and file".into()), + (None, None) => return Err("Either data or file must be specified".into()), + }; let timeout = Duration::from_secs(self.timeout); let da_protocol = self.da_protocol.clone(); let node_addr = self.node_addr.clone(); diff --git a/nomos-cli/src/da/disseminate.rs b/nomos-cli/src/da/disseminate.rs index 5386496f..b28c704f 100644 --- a/nomos-cli/src/da/disseminate.rs +++ b/nomos-cli/src/da/disseminate.rs @@ -205,7 +205,7 @@ impl ServiceCore for DisseminateService { // protocols, but only the one chosen will be used. // We can enforce only sensible combinations of protocol/settings // are specified by using special clap directives -#[derive(Clone, Debug, Args)] +#[derive(Clone, Debug, Args, Default)] pub struct DaProtocolChoice { #[clap(long, default_value = "full-replication")] pub da_protocol: Protocol, @@ -227,14 +227,15 @@ impl TryFrom for FullReplication c.args(["--data", &data]), + (None, Some(file)) => c.args(["--file", file.as_os_str().to_str().unwrap()]), + (_, _) => panic!("Either data or file needs to be provided, but not both"), + }; + + c.status().expect("failed to execute nomos cli"); } -async fn disseminate(data: String) { +async fn disseminate(config: &mut Disseminate) { let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(2).await; let mut nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await; @@ -58,21 +61,18 @@ async fn disseminate(data: String) { let da = >>::try_from(da_protocol.clone()) .unwrap(); - let config = Disseminate { - data: data.clone(), - timeout: 20, - network_config: config_path, - da_protocol, - node_addr: Some( - format!( - "http://{}", - nodes[0].config().http.backend_settings.address.clone() - ) - .parse() - .unwrap(), - ), - output: None, - }; + + config.timeout = 20; + config.network_config = config_path; + config.da_protocol = da_protocol; + config.node_addr = Some( + format!( + "http://{}", + nodes[0].config().http.backend_settings.address.clone() + ) + .parse() + .unwrap(), + ); run_disseminate(&config); // let thread = std::thread::spawn(move || cmd.run().unwrap()); @@ -84,29 +84,53 @@ async fn disseminate(data: String) { .await .unwrap(); - let blob = da.encode(data.as_bytes().to_vec())[0].hash(); + let (blob, bytes) = if let Some(data) = &config.data { + let bytes = data.as_bytes().to_vec(); + (da.encode(bytes.clone())[0].hash(), bytes) + } else { + let bytes = std::fs::read(&config.file.as_ref().unwrap()).unwrap(); + (da.encode(bytes.clone())[0].hash(), bytes) + }; assert_eq!( get_blobs(&nodes[0].url(), vec![blob]).await.unwrap()[0].as_bytes(), - data.as_bytes() + bytes.clone() ); } #[tokio::test] async fn disseminate_blob() { - disseminate("hello world".to_string()).await; + let mut config = Disseminate { + data: Some("hello world".to_string()), + ..Default::default() + }; + disseminate(&mut config).await; } #[tokio::test] async fn disseminate_big_blob() { const MSG_SIZE: usize = 1024; - disseminate( - std::iter::repeat(String::from("X")) + let mut config = Disseminate { + data: std::iter::repeat(String::from("X")) .take(MSG_SIZE) .collect::>() - .join(""), - ) - .await; + .join("") + .into(), + ..Default::default() + }; + disseminate(&mut config).await; +} + +#[tokio::test] +async fn disseminate_blob_from_file() { + let mut file = NamedTempFile::new().unwrap(); + file.write_all("hello world".as_bytes()).unwrap(); + + let mut config = Disseminate { + file: Some(file.path().to_path_buf()), + ..Default::default() + }; + disseminate(&mut config).await; } async fn wait_for_cert_in_mempool(node: &NomosNode) { From 42d6816b1b6604ee6fe915add32fe9875bb15f6b Mon Sep 17 00:00:00 2001 From: Youngjoon Lee <5462944+youngjoon-lee@users.noreply.github.com> Date: Wed, 24 Jan 2024 16:15:11 +0900 Subject: [PATCH 06/12] Write mixnode logs to files in integration tests (#510) --- tests/src/nodes/mixnode.rs | 27 ++++++++++++++++++++++++--- tests/src/nodes/mod.rs | 23 +++++++++++++++++++++++ tests/src/nodes/nomos.rs | 20 ++++++++------------ 3 files changed, 55 insertions(+), 15 deletions(-) diff --git a/tests/src/nodes/mixnode.rs b/tests/src/nodes/mixnode.rs index 74678364..87213c1b 100644 --- a/tests/src/nodes/mixnode.rs +++ b/tests/src/nodes/mixnode.rs @@ -4,8 +4,10 @@ use std::{ time::Duration, }; +use super::{create_tempdir, persist_tempdir, LOGS_PREFIX}; use mixnet_node::{MixnetNodeConfig, PRIVATE_KEY_SIZE}; use mixnet_topology::{Layer, MixnetTopology, Node}; +use nomos_log::{LoggerBackend, LoggerFormat}; use rand::{thread_rng, RngCore}; use tempfile::NamedTempFile; @@ -14,21 +16,37 @@ use crate::{get_available_port, MixnetConfig}; const MIXNODE_BIN: &str = "../target/debug/mixnode"; pub struct MixNode { + _tempdir: tempfile::TempDir, child: Child, } impl Drop for MixNode { fn drop(&mut self) { - self.child.kill().unwrap(); + if std::thread::panicking() { + if let Err(e) = persist_tempdir(&mut self._tempdir, "mixnode") { + println!("failed to persist tempdir: {e}"); + } + } + + if let Err(e) = self.child.kill() { + println!("failed to kill the child process: {e}"); + } } } impl MixNode { pub async fn spawn(config: MixnetNodeConfig) -> Self { - let config = mixnode::Config { + let dir = create_tempdir().unwrap(); + + let mut config = mixnode::Config { mixnode: config, log: Default::default(), }; + config.log.backend = LoggerBackend::File { + directory: dir.path().to_owned(), + prefix: Some(LOGS_PREFIX.into()), + }; + config.log.format = LoggerFormat::Json; let mut file = NamedTempFile::new().unwrap(); let config_path = file.path().to_owned(); @@ -43,7 +61,10 @@ impl MixNode { //TODO: use a sophisticated way to wait until the node is ready tokio::time::sleep(Duration::from_secs(1)).await; - Self { child } + Self { + _tempdir: dir, + child, + } } pub async fn spawn_nodes(num_nodes: usize) -> (Vec, MixnetConfig) { diff --git a/tests/src/nodes/mod.rs b/tests/src/nodes/mod.rs index 05c24ae1..bc2a9e61 100644 --- a/tests/src/nodes/mod.rs +++ b/tests/src/nodes/mod.rs @@ -3,3 +3,26 @@ pub mod nomos; pub use self::mixnode::MixNode; pub use nomos::NomosNode; +use tempfile::TempDir; + +const LOGS_PREFIX: &str = "__logs"; + +fn create_tempdir() -> std::io::Result { + // It's easier to use the current location instead of OS-default tempfile location + // because Github Actions can easily access files in the current location using wildcard + // to upload them as artifacts. + tempfile::TempDir::new_in(std::env::current_dir()?) +} + +fn persist_tempdir(tempdir: &mut TempDir, label: &str) -> std::io::Result<()> { + println!( + "{}: persisting directory at {}", + label, + tempdir.path().display() + ); + // we need ownership of the dir to persist it + let dir = std::mem::replace(tempdir, tempfile::tempdir()?); + // a bit confusing but `into_path` persists the directory + let _ = dir.into_path(); + Ok(()) +} diff --git a/tests/src/nodes/nomos.rs b/tests/src/nodes/nomos.rs index af656493..b99c0e61 100644 --- a/tests/src/nodes/nomos.rs +++ b/tests/src/nodes/nomos.rs @@ -3,6 +3,7 @@ use std::net::SocketAddr; use std::process::{Child, Command, Stdio}; use std::time::Duration; // internal +use super::{create_tempdir, persist_tempdir, LOGS_PREFIX}; use crate::{adjust_timeout, get_available_port, ConsensusConfig, MixnetConfig, Node, SpawnConfig}; use carnot_consensus::{CarnotInfo, CarnotSettings}; use carnot_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay, TreeOverlaySettings}; @@ -29,7 +30,6 @@ static CLIENT: Lazy = Lazy::new(Client::new); const NOMOS_BIN: &str = "../target/debug/nomos-node"; const CARNOT_INFO_API: &str = "carnot/info"; const STORAGE_BLOCKS_API: &str = "storage/block"; -const LOGS_PREFIX: &str = "__logs"; const GET_BLOCKS_INFO: &str = "carnot/blocks"; pub struct NomosNode { @@ -42,14 +42,14 @@ pub struct NomosNode { impl Drop for NomosNode { fn drop(&mut self) { if std::thread::panicking() { - println!("persisting directory at {}", self._tempdir.path().display()); - // we need ownership of the dir to persist it - let dir = std::mem::replace(&mut self._tempdir, tempfile::tempdir().unwrap()); - // a bit confusing but `into_path` persists the directory - let _ = dir.into_path(); + if let Err(e) = persist_tempdir(&mut self._tempdir, "nomos-node") { + println!("failed to persist tempdir: {e}"); + } } - self.child.kill().unwrap(); + if let Err(e) = self.child.kill() { + println!("failed to kill the child process: {e}"); + } } } @@ -61,11 +61,7 @@ impl NomosNode { pub async fn spawn(mut config: Config) -> Self { // Waku stores the messages in a db file in the current dir, we need a different // directory for each node to avoid conflicts - // - // NOTE: It's easier to use the current location instead of OS-default tempfile location - // because Github Actions can easily access files in the current location using wildcard - // to upload them as artifacts. - let dir = tempfile::TempDir::new_in(std::env::current_dir().unwrap()).unwrap(); + let dir = create_tempdir().unwrap(); let mut file = NamedTempFile::new().unwrap(); let config_path = file.path().to_owned(); From f33c2613fbc1e46892fbe20df3e968deb2ae3dda Mon Sep 17 00:00:00 2001 From: gusto Date: Mon, 29 Jan 2024 09:43:18 +0200 Subject: [PATCH 07/12] Nomos chat app non interactive (#567) * Add option to send chat message non iteractively via nomos cli * Use clap to check if data or file is set * Require author if message flag set --- nomos-cli/src/cmds/chat/mod.rs | 45 +++++++++++++++++++++++---- nomos-cli/src/cmds/disseminate/mod.rs | 18 +++++------ 2 files changed, 48 insertions(+), 15 deletions(-) diff --git a/nomos-cli/src/cmds/chat/mod.rs b/nomos-cli/src/cmds/chat/mod.rs index d9a114d3..5242c191 100644 --- a/nomos-cli/src/cmds/chat/mod.rs +++ b/nomos-cli/src/cmds/chat/mod.rs @@ -63,6 +63,12 @@ pub struct NomosChat { /// The node to connect to to fetch blocks and blobs #[clap(long)] pub node: Url, + /// Author for non interactive message formation + #[clap(long, requires("message"))] + pub author: Option, + /// Message for non interactive message formation + #[clap(long, requires("author"))] + pub message: Option, } pub struct App { @@ -86,12 +92,6 @@ impl NomosChat { as ServiceData>::Settings, >(std::fs::File::open(&self.network_config)?)?; let da_protocol = self.da_protocol.clone(); - // setup terminal - enable_raw_mode()?; - let mut stdout = io::stdout(); - execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?; - let backend = CrosstermBackend::new(stdout); - let mut terminal = Terminal::new(backend)?; let node_addr = Some(self.node.clone()); @@ -125,6 +125,21 @@ impl NomosChat { .wait_finished() }); + if let Some(author) = self.author.as_ref() { + let message = self + .message + .as_ref() + .expect("Should be available if author is set"); + return run_once(author, message, payload_sender); + } + + // setup terminal + enable_raw_mode()?; + let mut stdout = io::stdout(); + execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?; + let backend = CrosstermBackend::new(stdout); + let mut terminal = Terminal::new(backend)?; + let app = App { input: Input::default(), username: None, @@ -154,6 +169,24 @@ impl NomosChat { } } +fn run_once( + author: &str, + message: &str, + payload_sender: UnboundedSender>, +) -> Result<(), Box> { + payload_sender.send( + wire::serialize(&ChatMessage { + author: author.to_string(), + message: message.to_string(), + _nonce: rand::random(), + }) + .unwrap() + .into(), + )?; + + Ok(()) +} + fn run_app(terminal: &mut Terminal, mut app: App) { let (message_tx, message_rx) = std::sync::mpsc::channel(); let node = app.node.clone(); diff --git a/nomos-cli/src/cmds/disseminate/mod.rs b/nomos-cli/src/cmds/disseminate/mod.rs index 8e560fee..fe8ebc6d 100644 --- a/nomos-cli/src/cmds/disseminate/mod.rs +++ b/nomos-cli/src/cmds/disseminate/mod.rs @@ -12,7 +12,7 @@ use tokio::sync::Mutex; #[derive(Args, Debug, Default)] pub struct Disseminate { // TODO: accept bytes - #[clap(short, long)] + #[clap(short, long, required_unless_present("file"))] pub data: Option, /// Path to the network config file #[clap(short, long)] @@ -44,15 +44,15 @@ impl Disseminate { as ServiceData>::Settings, >(std::fs::File::open(&self.network_config)?)?; let (status_updates, rx) = std::sync::mpsc::channel(); - let bytes: Box<[u8]> = match (&self.data, &self.file) { - (Some(data), None) => data.clone().as_bytes().into(), - (None, Some(file_path)) => { - let file_bytes = std::fs::read(file_path)?; - file_bytes.into_boxed_slice() - } - (Some(_), Some(_)) => return Err("Cannot specify both data and file".into()), - (None, None) => return Err("Either data or file must be specified".into()), + + let bytes: Box<[u8]> = if let Some(data) = &self.data { + data.clone().as_bytes().into() + } else { + let file_path = self.file.as_ref().unwrap(); + let file_bytes = std::fs::read(file_path)?; + file_bytes.into_boxed_slice() }; + let timeout = Duration::from_secs(self.timeout); let da_protocol = self.da_protocol.clone(); let node_addr = self.node_addr.clone(); From 4722b5449317229ac50d2ac3d1dcb07bc988cc86 Mon Sep 17 00:00:00 2001 From: gusto Date: Mon, 29 Jan 2024 11:26:54 +0200 Subject: [PATCH 08/12] Use chat app as testnet bot (#568) --- compose.static.yml | 7 +++++++ testnet/Dockerfile | 1 + testnet/scripts/run_nomos_bot.sh | 9 +++++++++ 3 files changed, 17 insertions(+) create mode 100755 testnet/scripts/run_nomos_bot.sh diff --git a/compose.static.yml b/compose.static.yml index d0df5f78..0304b9b9 100644 --- a/compose.static.yml +++ b/compose.static.yml @@ -139,6 +139,13 @@ services: entrypoint: /usr/bin/mixnode command: /etc/nomos/mixnode_config.yaml + chatbot: + container_name: chatbot + build: + context: . + dockerfile: testnet/Dockerfile + entrypoint: /etc/nomos/scripts/run_nomos_bot.sh + etcd: container_name: etcd image: quay.io/coreos/etcd:v3.4.15 diff --git a/testnet/Dockerfile b/testnet/Dockerfile index b18997ba..6dea08d4 100644 --- a/testnet/Dockerfile +++ b/testnet/Dockerfile @@ -27,6 +27,7 @@ LABEL maintainer="augustinas@status.im" \ EXPOSE 3000 8080 9000 60000 COPY --from=builder /nomos/target/release/nomos-node /usr/bin/nomos-node +COPY --from=builder /nomos/target/release/nomos-cli /usr/bin/nomos-cli COPY --from=builder /nomos/target/release/mixnode /usr/bin/mixnode COPY --from=builder /usr/bin/etcdctl /usr/bin/etcdctl COPY nodes/nomos-node/config.yaml /etc/nomos/config.yaml diff --git a/testnet/scripts/run_nomos_bot.sh b/testnet/scripts/run_nomos_bot.sh new file mode 100755 index 00000000..2be7a276 --- /dev/null +++ b/testnet/scripts/run_nomos_bot.sh @@ -0,0 +1,9 @@ +#!/bin/sh + +echo "I am a container ${HOSTNAME} bot" + +while true +do + /usr/bin/nomos-cli chat --author nomos-ghost --message "$(date +%H:%M:%S) ~ ping" --network-config /etc/nomos/cli_config.yml --node-addr http://bootstrap:18080 + sleep 10 +done From 3f2905817a92f0e8714b273183b2c0515004ea7f Mon Sep 17 00:00:00 2001 From: gusto Date: Tue, 30 Jan 2024 10:04:11 +0200 Subject: [PATCH 09/12] Add testnet cli_config and volume (#574) * Add cli_config and volume * Grafana config file mode required on linux hosts * Pin minideb version for openssl compatibility with the build image --- compose.static.yml | 6 +++-- testnet/Dockerfile | 7 +---- testnet/cli_config.yaml | 44 ++++++++++++++++++++++++++++++++ testnet/scripts/run_nomos_bot.sh | 2 +- 4 files changed, 50 insertions(+), 9 deletions(-) create mode 100644 testnet/cli_config.yaml diff --git a/compose.static.yml b/compose.static.yml index 0304b9b9..fb17ec89 100644 --- a/compose.static.yml +++ b/compose.static.yml @@ -144,6 +144,8 @@ services: build: context: . dockerfile: testnet/Dockerfile + volumes: + - ./testnet:/etc/nomos entrypoint: /etc/nomos/scripts/run_nomos_bot.sh etcd: @@ -174,8 +176,8 @@ services: env_file: - ./testnet/monitoring/grafana/plugins.env volumes: - - ./testnet/monitoring/grafana/grafana.ini:/etc/grafana/grafana.ini - ./testnet/monitoring/grafana/datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml + - ./testnet/monitoring/grafana/grafana.ini:/etc/grafana/grafana.ini:z + - ./testnet/monitoring/grafana/datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml:z ports: - 9091:3000 restart: on-failure diff --git a/testnet/Dockerfile b/testnet/Dockerfile index 6dea08d4..e901b96d 100644 --- a/testnet/Dockerfile +++ b/testnet/Dockerfile @@ -2,10 +2,6 @@ FROM rust:1.75.0-slim-bullseye AS builder -# Using backports for go 1.19 -RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \ - >> /etc/apt/sources.list - # Dependecies for publishing documentation. RUN apt-get update && apt-get install -yq \ git clang etcd-client libssl-dev pkg-config @@ -17,7 +13,7 @@ RUN cargo build --release --all --features metrics # NODE IMAGE ---------------------------------------------------------- -FROM bitnami/minideb:latest +FROM bitnami/minideb:bullseye LABEL maintainer="augustinas@status.im" \ source="https://github.com/logos-co/nomos-node" \ @@ -31,6 +27,5 @@ COPY --from=builder /nomos/target/release/nomos-cli /usr/bin/nomos-cli COPY --from=builder /nomos/target/release/mixnode /usr/bin/mixnode COPY --from=builder /usr/bin/etcdctl /usr/bin/etcdctl COPY nodes/nomos-node/config.yaml /etc/nomos/config.yaml -RUN install_packages python3 python3-etcd3 ENTRYPOINT ["/usr/bin/nomos-node"] diff --git a/testnet/cli_config.yaml b/testnet/cli_config.yaml new file mode 100644 index 00000000..7f4eef37 --- /dev/null +++ b/testnet/cli_config.yaml @@ -0,0 +1,44 @@ +backend: + host: 0.0.0.0 + port: 4007 + log_level: "fatal" + node_key: "0000000000000000000000000000000000000000000000000000000000000667" + discV5BootstrapNodes: [] + initial_peers: ["/dns/bootstrap/tcp/3000"] + relayTopics: [] + # Mixclient configuration to communicate with mixnodes. + # The libp2p network backend always requires this mixclient configuration + # (cannot be disabled for now). + mixnet_client: + # A mixclient mode. For details, see the documentation of the "mixnet" crate. + # - Sender + # - !SenderReceiver [mixnode_client_listen_address] + mode: Sender + # A mixnet topology, which contains the information of all mixnodes in the mixnet. + # (The topology is static for now.) + topology: + # Each mixnet layer consists of a list of mixnodes. + layers: + - nodes: + - address: mix-node-0:7707 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + - nodes: + - address: mix-node-1:7717 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + + - nodes: + - address: mix-node-2:7727 # A listen address of the mixnode + public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715" + + # A max number of connections that will stay connected to mixnodes in the first mixnet layer. + connection_pool_size: 255 + max_retries: 5 + retry_delay: + secs: 1 + nanos: 0 + # A range of total delay that will be set to each Sphinx packets + # sent to the mixnet for timing obfuscation. + # Panics if start > end. + mixnet_delay: + start: "0ms" + end: "0ms" diff --git a/testnet/scripts/run_nomos_bot.sh b/testnet/scripts/run_nomos_bot.sh index 2be7a276..c93aa01f 100755 --- a/testnet/scripts/run_nomos_bot.sh +++ b/testnet/scripts/run_nomos_bot.sh @@ -4,6 +4,6 @@ echo "I am a container ${HOSTNAME} bot" while true do - /usr/bin/nomos-cli chat --author nomos-ghost --message "$(date +%H:%M:%S) ~ ping" --network-config /etc/nomos/cli_config.yml --node-addr http://bootstrap:18080 + /usr/bin/nomos-cli chat --author nomos-ghost --message "$(date +%H:%M:%S) ~ ping" --network-config /etc/nomos/cli_config.yaml --node http://bootstrap:18080 sleep 10 done From bafc6dfb1fb0f218ad3b8a06c3b38ea2750aa541 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Soko=C5=82owski?= Date: Wed, 14 Feb 2024 16:46:15 +0100 Subject: [PATCH 10/12] chore(ci): reuse discord send function from library MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Provides more info and requires less boilerplate. Signed-off-by: Jakub SokoĊ‚owski --- ci/Jenkinsfile.nightly.fuzzy | 21 +++++++++++--------- ci/Jenkinsfile.nightly.integration | 24 ++++++++++------------ ci/discord.groovy | 32 ------------------------------ 3 files changed, 22 insertions(+), 55 deletions(-) delete mode 100644 ci/discord.groovy diff --git a/ci/Jenkinsfile.nightly.fuzzy b/ci/Jenkinsfile.nightly.fuzzy index d1720d55..2331754e 100644 --- a/ci/Jenkinsfile.nightly.fuzzy +++ b/ci/Jenkinsfile.nightly.fuzzy @@ -1,3 +1,6 @@ +#!/usr/bin/env groovy +library 'status-jenkins-lib@v1.8.6' + pipeline { agent { dockerfile { @@ -43,16 +46,16 @@ pipeline { } post { - failure { + always { script { - def discord = load "${WORKSPACE}/ci/discord.groovy" - discord.sendMessage(header: 'Nightly Fuzztest Failed. Regression files archived as job artifacts') - } - } - success { - script { - def discord = load "${WORKSPACE}/ci/discord.groovy" - discord.sendMessage(header: 'Nightly Fuzztest Passed') + discord.send( + header: ( + currentBuild.currentResult == 'SUCCESS' ? + 'Nightly Fuzztest Passed' : + 'Nightly Fuzztest Failed. Regression files archived as job artifacts' + ), + cred: 'nomos-node-discord-commits-webhook', + ) } } cleanup { cleanWs() } diff --git a/ci/Jenkinsfile.nightly.integration b/ci/Jenkinsfile.nightly.integration index 214a4ebb..9dff6b1a 100644 --- a/ci/Jenkinsfile.nightly.integration +++ b/ci/Jenkinsfile.nightly.integration @@ -1,3 +1,6 @@ +#!/usr/bin/env groovy +library 'status-jenkins-lib@v1.8.6' + pipeline { agent { dockerfile { @@ -67,20 +70,13 @@ pipeline { } post { - failure { - script { - def report = readFile("${WORKSPACE}/report.txt").trim() - def discord = load "${WORKSPACE}/ci/discord.groovy" - discord.sendMessage(header: "Nightly Integration Tests Failed: ${report}") - } - } - success { - script { - def report = readFile('report.txt').trim() - def discord = load "${WORKSPACE}/ci/discord.groovy" - discord.sendMessage(header: "Nightly Integration Tests Passed: ${report}") - } - } + always { script { + def report = readFile("${WORKSPACE}/report.txt").trim() + discord.send( + header: "Nightly Integration Tests ${currentBuild.currentResult}: ${report}", + cred: 'nomos-node-discord-commits-webhook', + ) + } } cleanup { cleanWs() } } } diff --git a/ci/discord.groovy b/ci/discord.groovy deleted file mode 100644 index dd43d09d..00000000 --- a/ci/discord.groovy +++ /dev/null @@ -1,32 +0,0 @@ -def sendMessage(Map args=[:]) { - def opts = [ - header: args.header ?: 'Build succeeded', - title: args.title ?: "${env.JOB_NAME}#${env.BUILD_NUMBER}", - cred: args.cred ?: 'nomos-node-discord-commits-webhook', - ] - def repo = [ - url: GIT_URL.minus('.git'), - branch: GIT_BRANCH.minus('origin/'), - commit: GIT_COMMIT.take(8), - ] - withCredentials([ - string( - credentialsId: opts.cred, - variable: 'DISCORD_WEBHOOK', - ), - ]) { - discordSend( - link: env.BUILD_URL, - result: currentBuild.currentResult, - webhookURL: env.DISCORD_WEBHOOK, - title: opts.title, - description: """ - ${opts.header} - Branch: [`${repo.branch}`](${repo.url}/commits/${repo.branch}) - Commit: [`${repo.commit}`](${repo.url}/commit/${repo.commit}) - """, - ) - } -} - -return this From 818ea2f57f946ff15abd0565084f5eba0b9a137e Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 26 Feb 2024 02:05:04 +0800 Subject: [PATCH 11/12] finish rocksdb storage service --- nomos-services/storage/Cargo.toml | 1 + nomos-services/storage/src/backends/mod.rs | 3 + .../storage/src/backends/rocksdb.rs | 253 ++++++++++++++++++ 3 files changed, 257 insertions(+) create mode 100644 nomos-services/storage/src/backends/rocksdb.rs diff --git a/nomos-services/storage/Cargo.toml b/nomos-services/storage/Cargo.toml index b2a92743..7a992a53 100644 --- a/nomos-services/storage/Cargo.toml +++ b/nomos-services/storage/Cargo.toml @@ -13,6 +13,7 @@ bytes = "1.2" overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" } serde = "1.0" sled = { version = "0.34", optional = true } +rocksdb = { version = "0.22", optional = true } thiserror = "1.0" tracing = "0.1" diff --git a/nomos-services/storage/src/backends/mod.rs b/nomos-services/storage/src/backends/mod.rs index ac640857..2594b9ca 100644 --- a/nomos-services/storage/src/backends/mod.rs +++ b/nomos-services/storage/src/backends/mod.rs @@ -3,6 +3,9 @@ pub mod mock; #[cfg(feature = "sled")] pub mod sled; +#[cfg(feature = "rocksdb")] +pub mod rocksdb; + // std use std::error::Error; // crates diff --git a/nomos-services/storage/src/backends/rocksdb.rs b/nomos-services/storage/src/backends/rocksdb.rs new file mode 100644 index 00000000..19493f10 --- /dev/null +++ b/nomos-services/storage/src/backends/rocksdb.rs @@ -0,0 +1,253 @@ +// std +use std::path::PathBuf; +use std::{marker::PhantomData, sync::Arc}; +// crates +use async_trait::async_trait; +use bytes::Bytes; +pub use rocksdb::Error; +use rocksdb::{Options, DB}; +// internal +use super::{StorageBackend, StorageSerde, StorageTransaction}; + +/// Rocks backend setting +#[derive(Clone, Debug)] +pub struct RocksBackendSettings { + /// File path to the db file + pub db_path: PathBuf, + pub read_only: bool, + pub column_family: Option, +} + +/// Rocks transaction type +// Do not use `TransactionDB` here, because rocksdb's `TransactionDB` does not support open by read-only mode. +// Thus, we cannot open the same db in two or more processes. +pub struct Transaction { + rocks: Arc, + #[allow(clippy::type_complexity)] + executor: Box Result, Error> + Send + Sync>, +} + +impl Transaction { + /// Execute a function over the transaction + pub fn execute(self) -> Result, Error> { + (self.executor)(&self.rocks) + } +} + +impl StorageTransaction for Transaction { + type Result = Result, Error>; + type Transaction = Self; +} + +/// Rocks storage backend + +pub struct RocksBackend { + rocks: Arc, + _serde_op: PhantomData, +} + +impl RocksBackend { + pub fn txn( + &self, + executor: impl FnOnce(&DB) -> Result, Error> + Send + Sync + 'static, + ) -> Transaction { + Transaction { + rocks: self.rocks.clone(), + executor: Box::new(executor), + } + } +} + +impl core::fmt::Debug for RocksBackend { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + format!("RocksBackend {{ rocks: {:?} }}", self.rocks).fmt(f) + } +} + +#[async_trait] +impl StorageBackend for RocksBackend { + type Settings = RocksBackendSettings; + type Error = rocksdb::Error; + type Transaction = Transaction; + type SerdeOperator = SerdeOp; + + fn new(config: Self::Settings) -> Result { + let RocksBackendSettings { + db_path, + read_only, + column_family: cf, + } = config; + + let db = match (read_only, cf) { + (true, None) => { + let mut opts = Options::default(); + opts.create_if_missing(false); + DB::open_for_read_only(&opts, db_path, false)? + } + (true, Some(cf)) => { + let mut opts = Options::default(); + opts.create_if_missing(false); + DB::open_cf_for_read_only(&opts, db_path, [cf], false)? + } + (false, None) => { + let mut opts = Options::default(); + opts.create_if_missing(true); + DB::open(&opts, db_path)? + } + (false, Some(cf)) => { + let mut opts = Options::default(); + opts.create_if_missing(true); + DB::open_cf(&opts, db_path, [cf])? + } + }; + + Ok(Self { + rocks: Arc::new(db), + _serde_op: Default::default(), + }) + } + + async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> { + self.rocks.put(key, value) + } + + async fn load(&mut self, key: &[u8]) -> Result, Self::Error> { + self.rocks.get(key).map(|opt| opt.map(|ivec| ivec.into())) + } + + async fn remove(&mut self, key: &[u8]) -> Result, Self::Error> { + self.load(key).await.and_then(|val| { + if val.is_some() { + self.rocks.delete(key).map(|_| val) + } else { + Ok(None) + } + }) + } + + async fn execute( + &mut self, + transaction: Self::Transaction, + ) -> Result<::Result, Self::Error> { + match transaction.execute() { + Ok(result) => Ok(Ok(result)), + Err(e) => Err(e), + } + } +} + +#[cfg(test)] +mod test { + use super::super::testing::NoStorageSerde; + use super::*; + use tempfile::TempDir; + + #[tokio::test] + async fn test_store_load_remove( + ) -> Result<(), as StorageBackend>::Error> { + let temp_path = TempDir::new().unwrap(); + let sled_settings = RocksBackendSettings { + db_path: temp_path.path().to_path_buf(), + read_only: false, + column_family: None, + }; + let key = "foo"; + let value = "bar"; + + let mut db: RocksBackend = RocksBackend::new(sled_settings)?; + db.store(key.as_bytes().into(), value.as_bytes().into()) + .await?; + let load_value = db.load(key.as_bytes()).await?; + assert_eq!(load_value, Some(value.as_bytes().into())); + let removed_value = db.remove(key.as_bytes()).await?; + assert_eq!(removed_value, Some(value.as_bytes().into())); + + Ok(()) + } + + #[tokio::test] + async fn test_transaction( + ) -> Result<(), as StorageBackend>::Error> { + let temp_path = TempDir::new().unwrap(); + + let sled_settings = RocksBackendSettings { + db_path: temp_path.path().to_path_buf(), + read_only: false, + column_family: None, + }; + + let mut db: RocksBackend = RocksBackend::new(sled_settings)?; + let txn = db.txn(|db| { + let key = "foo"; + let value = "bar"; + db.put(key, value)?; + let result = db.get(key)?; + db.delete(key)?; + Ok(result.map(|ivec| ivec.to_vec().into())) + }); + let result = db.execute(txn).await??; + assert_eq!(result, Some("bar".as_bytes().into())); + + Ok(()) + } + + #[tokio::test] + async fn test_multi_readers_single_writer( + ) -> Result<(), as StorageBackend>::Error> { + use tokio::sync::mpsc::channel; + + let temp_path = TempDir::new().unwrap(); + let path = temp_path.path().to_path_buf(); + let sled_settings = RocksBackendSettings { + db_path: temp_path.path().to_path_buf(), + read_only: false, + column_family: None, + }; + let key = "foo"; + let value = "bar"; + + let mut db: RocksBackend = RocksBackend::new(sled_settings)?; + + let (tx, mut rx) = channel(5); + // now let us spawn a few readers + for _ in 0..5 { + let p = path.clone(); + let tx = tx.clone(); + std::thread::spawn(move || { + tokio::runtime::Runtime::new() + .unwrap() + .block_on(async move { + let sled_settings = RocksBackendSettings { + db_path: p, + read_only: true, + column_family: None, + }; + let key = "foo"; + + let mut db: RocksBackend = + RocksBackend::new(sled_settings).unwrap(); + + while db.load(key.as_bytes()).await.unwrap().is_none() { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + + tx.send(()).await.unwrap(); + }); + }); + } + + db.store(key.as_bytes().into(), value.as_bytes().into()) + .await?; + + let mut recvs = 0; + loop { + if rx.recv().await.is_some() { + recvs += 1; + if recvs == 5 { + break; + } + } + } + Ok(()) + } +} From 292f73cf8427207e5ae630438160e4b82fe7aa50 Mon Sep 17 00:00:00 2001 From: Al Liu Date: Mon, 26 Feb 2024 12:31:08 +0800 Subject: [PATCH 12/12] use rocksdb --- nodes/nomos-node/Cargo.toml | 2 +- nodes/nomos-node/src/lib.rs | 6 +++--- nodes/nomos-node/src/main.rs | 4 +++- nomos-services/api/Cargo.toml | 2 +- nomos-services/api/src/http/consensus.rs | 4 ++-- nomos-services/api/src/http/storage.rs | 4 ++-- 6 files changed, 12 insertions(+), 10 deletions(-) diff --git a/nodes/nomos-node/Cargo.toml b/nodes/nomos-node/Cargo.toml index 71168c8c..8aa6cf22 100644 --- a/nodes/nomos-node/Cargo.toml +++ b/nodes/nomos-node/Cargo.toml @@ -27,7 +27,7 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "li nomos-metrics = { path = "../../nomos-metrics" } nomos-http = { path = "../../nomos-services/http", features = ["http"] } carnot-consensus = { path = "../../nomos-services/carnot-consensus", features = ["libp2p"] } -nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] } +nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] } nomos-libp2p = { path = "../../nomos-libp2p" } nomos-da = { path = "../../nomos-services/data-availability", features = ["libp2p"] } nomos-system-sig = { path = "../../nomos-services/system-sig" } diff --git a/nodes/nomos-node/src/lib.rs b/nodes/nomos-node/src/lib.rs index cebd8886..1b9a94d2 100644 --- a/nodes/nomos-node/src/lib.rs +++ b/nodes/nomos-node/src/lib.rs @@ -33,7 +33,7 @@ use nomos_mempool::{ use nomos_metrics::Metrics; use nomos_network::backends::libp2p::Libp2p; use nomos_storage::{ - backends::{sled::SledBackend, StorageSerde}, + backends::{rocksdb::RocksBackend, StorageSerde}, StorageService, }; @@ -68,7 +68,7 @@ pub type Carnot = CarnotConsensus< TreeOverlay, FillSizeWithTx, FillSizeWithBlobsCertificate, - SledBackend, + RocksBackend, >; pub type DataAvailability = DataAvailabilityService< @@ -94,7 +94,7 @@ pub struct Nomos { consensus: ServiceHandle, http: ServiceHandle>>, da: ServiceHandle, - storage: ServiceHandle>>, + storage: ServiceHandle>>, #[cfg(feature = "metrics")] metrics: ServiceHandle, system_sig: ServiceHandle, diff --git a/nodes/nomos-node/src/main.rs b/nodes/nomos-node/src/main.rs index 25bc5787..6ffaa517 100644 --- a/nodes/nomos-node/src/main.rs +++ b/nodes/nomos-node/src/main.rs @@ -99,8 +99,10 @@ fn main() -> Result<()> { #[cfg(feature = "metrics")] metrics: MetricsSettings { registry }, da: config.da, - storage: nomos_storage::backends::sled::SledBackendSettings { + storage: nomos_storage::backends::rocksdb::RocksBackendSettings { db_path: std::path::PathBuf::from(DEFAULT_DB_PATH), + read_only: false, + column_family: Some("blocks".into()), }, system_sig: (), }, diff --git a/nomos-services/api/Cargo.toml b/nomos-services/api/Cargo.toml index 0e7aa4e1..bd3407a9 100644 --- a/nomos-services/api/Cargo.toml +++ b/nomos-services/api/Cargo.toml @@ -20,7 +20,7 @@ nomos-network = { path = "../../nomos-services/network" } nomos-da = { path = "../../nomos-services/data-availability" } nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "openapi"] } nomos-metrics = { path = "../../nomos-metrics" } -nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] } +nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] } nomos-libp2p = { path = "../../nomos-libp2p" } full-replication = { path = "../../nomos-da/full-replication" } serde = { version = "1", features = ["derive"] } diff --git a/nomos-services/api/src/http/consensus.rs b/nomos-services/api/src/http/consensus.rs index a858ebff..218f59f2 100644 --- a/nomos-services/api/src/http/consensus.rs +++ b/nomos-services/api/src/http/consensus.rs @@ -23,7 +23,7 @@ use nomos_core::{ use nomos_mempool::{ backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter, }; -use nomos_storage::backends::{sled::SledBackend, StorageSerde}; +use nomos_storage::backends::{rocksdb::RocksBackend, StorageSerde}; pub type Carnot = CarnotConsensus< ConsensusLibp2pAdapter, @@ -37,7 +37,7 @@ pub type Carnot = CarnotConsensus< TreeOverlay, FillSizeWithTx, FillSizeWithBlobsCertificate, - SledBackend, + RocksBackend, >; pub async fn carnot_info( diff --git a/nomos-services/api/src/http/storage.rs b/nomos-services/api/src/http/storage.rs index 090c09cd..e7579d62 100644 --- a/nomos-services/api/src/http/storage.rs +++ b/nomos-services/api/src/http/storage.rs @@ -1,7 +1,7 @@ use carnot_engine::BlockId; use nomos_core::block::Block; use nomos_storage::{ - backends::{sled::SledBackend, StorageSerde}, + backends::{rocksdb::RocksBackend, StorageSerde}, StorageMsg, StorageService, }; @@ -14,7 +14,7 @@ where S: StorageSerde + Send + Sync + 'static, { let relay = handle - .relay::>>() + .relay::>>() .connect() .await?; let (msg, receiver) = StorageMsg::new_load_message(id);