try to fix blocking test

This commit is contained in:
Al Liu 2024-02-05 02:46:00 +08:00
parent ad47dcf228
commit bd750f0ae6
No known key found for this signature in database
GPG Key ID: C8AE9A6E0166923E
57 changed files with 1630 additions and 359 deletions

View File

@ -1,4 +1,7 @@
[target.'cfg(target_os = "macos")']
# when using osx, we need to link against some golang libraries, it did just work with this missing flags
# from: https://github.com/golang/go/issues/42459
rustflags = ["-C", "link-args=-framework CoreFoundation -framework Security -framework CoreServices -lresolv"]
rustflags = [
"-C",
"link-args=-framework CoreFoundation -framework Security -framework CoreServices -lresolv",
]

View File

@ -1,3 +1,6 @@
#!/usr/bin/env groovy
library 'status-jenkins-lib@v1.8.6'
pipeline {
agent {
dockerfile {
@ -43,16 +46,16 @@ pipeline {
}
post {
failure {
always {
script {
def discord = load "${WORKSPACE}/ci/discord.groovy"
discord.sendMessage(header: 'Nightly Fuzztest Failed. Regression files archived as job artifacts')
}
}
success {
script {
def discord = load "${WORKSPACE}/ci/discord.groovy"
discord.sendMessage(header: 'Nightly Fuzztest Passed')
discord.send(
header: (
currentBuild.currentResult == 'SUCCESS' ?
'Nightly Fuzztest Passed' :
'Nightly Fuzztest Failed. Regression files archived as job artifacts'
),
cred: 'nomos-node-discord-commits-webhook',
)
}
}
cleanup { cleanWs() }

View File

@ -1,3 +1,6 @@
#!/usr/bin/env groovy
library 'status-jenkins-lib@v1.8.6'
pipeline {
agent {
dockerfile {
@ -67,20 +70,13 @@ pipeline {
}
post {
failure {
script {
def report = readFile("${WORKSPACE}/report.txt").trim()
def discord = load "${WORKSPACE}/ci/discord.groovy"
discord.sendMessage(header: "Nightly Integration Tests Failed: ${report}")
}
}
success {
script {
def report = readFile('report.txt').trim()
def discord = load "${WORKSPACE}/ci/discord.groovy"
discord.sendMessage(header: "Nightly Integration Tests Passed: ${report}")
}
}
always { script {
def report = readFile("${WORKSPACE}/report.txt").trim()
discord.send(
header: "Nightly Integration Tests ${currentBuild.currentResult}: ${report}",
cred: 'nomos-node-discord-commits-webhook',
)
} }
cleanup { cleanWs() }
}
}

View File

@ -1,32 +0,0 @@
def sendMessage(Map args=[:]) {
def opts = [
header: args.header ?: 'Build succeeded',
title: args.title ?: "${env.JOB_NAME}#${env.BUILD_NUMBER}",
cred: args.cred ?: 'nomos-node-discord-commits-webhook',
]
def repo = [
url: GIT_URL.minus('.git'),
branch: GIT_BRANCH.minus('origin/'),
commit: GIT_COMMIT.take(8),
]
withCredentials([
string(
credentialsId: opts.cred,
variable: 'DISCORD_WEBHOOK',
),
]) {
discordSend(
link: env.BUILD_URL,
result: currentBuild.currentResult,
webhookURL: env.DISCORD_WEBHOOK,
title: opts.title,
description: """
${opts.header}
Branch: [`${repo.branch}`](${repo.url}/commits/${repo.branch})
Commit: [`${repo.commit}`](${repo.url}/commit/${repo.commit})
""",
)
}
}
return this

View File

@ -139,6 +139,15 @@ services:
entrypoint: /usr/bin/mixnode
command: /etc/nomos/mixnode_config.yaml
chatbot:
container_name: chatbot
build:
context: .
dockerfile: testnet/Dockerfile
volumes:
- ./testnet:/etc/nomos
entrypoint: /etc/nomos/scripts/run_nomos_bot.sh
etcd:
container_name: etcd
image: quay.io/coreos/etcd:v3.4.15
@ -148,3 +157,29 @@ services:
- /usr/local/bin/etcd
- --advertise-client-urls=http://etcd:2379
- --listen-client-urls=http://0.0.0.0:2379
prometheus:
container_name: prometheus
image: prom/prometheus:latest
volumes:
- ./testnet/monitoring/prometheus-config.yml:/etc/prometheus/prometheus.yml:z
command:
- --config.file=/etc/prometheus/prometheus.yml
- --storage.tsdb.retention.time=7d
ports:
- 127.0.0.1:9090:9090
restart: on-failure
grafana:
container_name: grafana
image: grafana/grafana:latest
env_file:
- ./testnet/monitoring/grafana/plugins.env
volumes:
- ./testnet/monitoring/grafana/grafana.ini:/etc/grafana/grafana.ini:z
- ./testnet/monitoring/grafana/datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml:z
ports:
- 9091:3000
restart: on-failure
depends_on:
- prometheus

View File

@ -146,10 +146,15 @@ where
if self.is_member_of_leaf_committee(id) {
return 0;
}
self.carnot_tree
println!("{:?}", self.carnot_tree);
match self
.carnot_tree
.committee_by_member_id(&id)
.map(|c| apply_threshold(c.len(), self.threshold))
.expect("node is not part of any committee")
{
Some(threshold) => threshold,
None => panic!("node {id} is not part of any committee"),
}
}
// TODO: Carnot node in sim does not send votes to the next leader from the child committee of

View File

@ -1,5 +1,5 @@
[package]
name = "explorer"
name = "nomos-explorer"
version = "0.1.0"
edition = "2021"

View File

@ -0,0 +1,16 @@
// crates
use axum::extract::{Json, State};
use axum::response::Response;
use nomos_node::make_request_and_return_response;
// internal
use full_replication::Blob;
use nomos_api::http::da::da_blobs;
use nomos_core::da::blob;
use overwatch_rs::overwatch::handle::OverwatchHandle;
pub(crate) async fn blobs(
State(handle): State<OverwatchHandle>,
Json(items): Json<Vec<<Blob as blob::Blob>::Hash>>,
) -> Response {
make_request_and_return_response!(da_blobs(&handle, items))
}

View File

@ -1,3 +1,4 @@
mod da;
mod store;
use std::{fmt::Debug, hash::Hash};
@ -11,11 +12,10 @@ use tower_http::{
trace::TraceLayer,
};
use nomos_api::Backend;
use nomos_core::tx::Transaction;
use nomos_storage::backends::StorageSerde;
use nomos_api::Backend;
#[derive(Debug, Clone, serde::Deserialize, serde::Serialize)]
pub struct AxumBackendSettings {
/// Socket where the server will be listening on for incoming requests.
@ -84,12 +84,12 @@ where
)
.layer(TraceLayer::new_for_http())
// .merge(SwaggerUi::new("/swagger-ui").url("/api-docs/openapi.json", ApiDoc::openapi()))
.route("/store/blocks", routing::get(store::store_blocks::<T, S>))
.route("/explorer/blocks", routing::get(store::blocks::<T, S>))
.route(
"/explorer/blocks/depth",
routing::get(store::block_depth::<T, S>),
)
.route("/da/blobs", routing::post(da::blobs))
.with_state(handle);
Server::bind(&self.settings.address)

View File

@ -13,40 +13,9 @@ use full_replication::Certificate;
use nomos_api::http::storage;
use nomos_core::block::{Block, BlockId};
use nomos_core::tx::Transaction;
use nomos_node::make_request_and_return_response;
use nomos_storage::backends::StorageSerde;
use overwatch_rs::overwatch::handle::OverwatchHandle;
#[derive(Deserialize)]
pub(crate) struct QueryParams {
blocks: Vec<BlockId>,
}
pub(crate) async fn store_blocks<Tx, S>(
State(store): State<OverwatchHandle>,
Query(query): Query<QueryParams>,
) -> Response
where
Tx: Transaction
+ Clone
+ Debug
+ Eq
+ Hash
+ Serialize
+ DeserializeOwned
+ Send
+ Sync
+ 'static,
<Tx as Transaction>::Hash: std::cmp::Ord + Debug + Send + Sync + 'static,
S: StorageSerde + Send + Sync + 'static,
{
let QueryParams { blocks } = query;
let results: Vec<_> = blocks
.into_iter()
.map(|id| storage::block_req::<S, Tx>(&store, id))
.collect();
make_request_and_return_response!(futures::future::try_join_all(results))
}
#[derive(Deserialize)]
pub(crate) struct BlocksByIdQueryParams {
from: BlockId,

View File

@ -5,6 +5,8 @@ use nomos_api::ApiService;
use nomos_log::{Logger, LoggerBackend, LoggerFormat};
use nomos_node::config::LoggerBackendType;
use nomos_node::{HttpArgs, LogArgs, Tx, Wire};
use nomos_storage::backends::rocksdb::RocksBackend;
use nomos_storage::StorageService;
use overwatch_rs::services::ServiceData;
use serde::{Deserialize, Serialize};
use tracing::Level;
@ -15,6 +17,7 @@ pub type ApiArgs = HttpArgs;
pub struct Config {
pub log: <Logger as ServiceData>::Settings,
pub api: <ApiService<AxumBackend<Tx, Wire>> as ServiceData>::Settings,
pub storage: <StorageService<RocksBackend<Wire>> as ServiceData>::Settings,
}
impl Config {

39
nodes/explorer/src/lib.rs Normal file
View File

@ -0,0 +1,39 @@
pub use crate::api_backend::{AxumBackend, AxumBackendSettings};
pub use crate::config::{ApiArgs, Config};
use eyre::{eyre, Result};
use overwatch_rs::overwatch::OverwatchRunner;
use overwatch_rs::services::handle::ServiceHandle;
use overwatch_rs::Services;
use nomos_api::ApiService;
use nomos_log::Logger;
use nomos_node::{Tx, Wire};
use nomos_storage::{backends::rocksdb::RocksBackend, StorageService};
mod api_backend;
mod config;
#[derive(Services)]
pub struct Explorer {
pub log: ServiceHandle<Logger>,
pub storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
pub api: ServiceHandle<ApiService<AxumBackend<Tx, Wire>>>,
}
impl Explorer {
pub fn run(config: Config) -> Result<()> {
let app = OverwatchRunner::<Explorer>::run(
ExplorerServiceSettings {
log: config.log,
storage: config.storage,
api: config.api,
},
None,
)
.map_err(|e| eyre!("Error encountered: {}", e))?;
app.wait_finished();
Ok(())
}
}

View File

@ -1,28 +1,8 @@
mod api_backend;
mod config;
use clap::Parser;
use eyre::{eyre, Result};
use nomos_api::ApiService;
use overwatch_rs::overwatch::OverwatchRunner;
use overwatch_rs::services::handle::ServiceHandle;
use overwatch_rs::Services;
use eyre::Result;
use crate::api_backend::AxumBackend;
use crate::config::{ApiArgs, Config};
use nomos_log::Logger;
use nomos_node::{LogArgs, Tx, Wire};
use nomos_storage::backends::sled::SledBackendSettings;
use nomos_storage::{backends::sled::SledBackend, StorageService};
#[derive(Services)]
struct Explorer {
log: ServiceHandle<Logger>,
storage: ServiceHandle<StorageService<SledBackend<Wire>>>,
api: ServiceHandle<ApiService<AxumBackend<Tx, Wire>>>,
}
const DEFAULT_DB_PATH: &str = "./db";
use nomos_explorer::{ApiArgs, Config, Explorer};
use nomos_node::LogArgs;
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
@ -48,17 +28,5 @@ fn main() -> Result<()> {
.update_api(api_args)?
.update_log(log_args)?;
let app = OverwatchRunner::<Explorer>::run(
ExplorerServiceSettings {
log: config.log,
storage: SledBackendSettings {
db_path: DEFAULT_DB_PATH.into(),
},
api: config.api,
},
None,
)
.map_err(|e| eyre!("Error encountered: {}", e))?;
app.wait_finished();
Ok(())
Explorer::run(config)
}

View File

@ -27,7 +27,7 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "li
nomos-metrics = { path = "../../nomos-metrics" }
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
carnot-consensus = { path = "../../nomos-services/carnot-consensus", features = ["libp2p"] }
nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] }
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
nomos-libp2p = { path = "../../nomos-libp2p" }
nomos-da = { path = "../../nomos-services/data-availability", features = ["libp2p"] }
nomos-system-sig = { path = "../../nomos-services/system-sig" }

View File

@ -6,9 +6,9 @@ consensus:
private_key: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
fountain_settings: null
overlay_settings:
nodes: [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]]
nodes: ["0000000000000000000000000000000000000000000000000000000000000000", "0000000000000000000000000000000000000000000000000000000000000001"]
number_of_committees: 1
current_leader: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
current_leader: 0000000000000000000000000000000000000000000000000000000000000000
leader:
cur: 0
committee_membership: !Sad

View File

@ -15,6 +15,10 @@ use nomos_libp2p::{secp256k1::SecretKey, Multiaddr};
use nomos_log::{Logger, LoggerBackend, LoggerFormat};
use nomos_network::backends::libp2p::Libp2p;
use nomos_network::NetworkService;
use nomos_storage::{
backends::rocksdb::{RocksBackend, RocksBackendSettings},
StorageService,
};
use overwatch_rs::services::ServiceData;
use serde::{Deserialize, Serialize};
use tracing::Level;
@ -126,6 +130,19 @@ pub struct Config {
pub http: <ApiService<AxumBackend<Tx, Wire, MB16>> as ServiceData>::Settings,
pub consensus: <Carnot as ServiceData>::Settings,
pub da: <DataAvailability as ServiceData>::Settings,
#[serde(default = "default_storage_settings")]
pub storage: <StorageService<RocksBackend<Wire>> as ServiceData>::Settings,
}
const DEFAULT_DB_PATH: &str = "./db";
fn default_storage_settings() -> <StorageService<RocksBackend<Wire>> as ServiceData>::Settings {
RocksBackendSettings {
db_path: DEFAULT_DB_PATH.into(),
read_only: false,
column_family: Some("blocks".into()),
}
}
impl Config {

View File

@ -7,8 +7,6 @@ use carnot_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay};
use color_eyre::eyre::Result;
use full_replication::Certificate;
use full_replication::{AbsoluteNumber, Attestation, Blob, FullReplication};
#[cfg(feature = "metrics")]
use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService};
use api::AxumBackend;
use bytes::Bytes;
@ -33,7 +31,7 @@ use nomos_mempool::{
use nomos_metrics::Metrics;
use nomos_network::backends::libp2p::Libp2p;
use nomos_storage::{
backends::{sled::SledBackend, StorageSerde},
backends::{rocksdb::RocksBackend, StorageSerde},
StorageService,
};
@ -68,7 +66,7 @@ pub type Carnot = CarnotConsensus<
TreeOverlay<RoundRobin, RandomBeaconState>,
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobsCertificate<MB16, Certificate>,
SledBackend<Wire>,
RocksBackend<Wire>,
>;
pub type DataAvailability = DataAvailabilityService<
@ -94,7 +92,7 @@ pub struct Nomos {
consensus: ServiceHandle<Carnot>,
http: ServiceHandle<ApiService<AxumBackend<Tx, Wire, MB16>>>,
da: ServiceHandle<DataAvailability>,
storage: ServiceHandle<StorageService<SledBackend<Wire>>>,
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
#[cfg(feature = "metrics")]
metrics: ServiceHandle<Metrics>,
system_sig: ServiceHandle<SystemSig>,
@ -113,3 +111,66 @@ impl StorageSerde for Wire {
wire::deserialize(&buff)
}
}
pub fn run(
config: Config,
#[cfg(feature = "metrics")] metrics_args: MetricsArgs,
) -> color_eyre::Result<()> {
use color_eyre::eyre::eyre;
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
#[cfg(feature = "metrics")]
use nomos_metrics::MetricsSettings;
use overwatch_rs::overwatch::*;
#[cfg(feature = "metrics")]
let registry = cfg!(feature = "metrics")
.then(|| {
metrics_args
.with_metrics
.then(nomos_metrics::NomosRegistry::default)
})
.flatten();
#[cfg(not(feature = "metrics"))]
let registry = None;
let app = OverwatchRunner::<Nomos>::run(
NomosServiceSettings {
network: config.network,
logging: config.log,
http: config.http,
cl_mempool: nomos_mempool::Settings {
backend: (),
network: AdapterSettings {
topic: String::from(crate::CL_TOPIC),
id: <Tx as Transaction>::hash,
},
registry: registry.clone(),
},
da_mempool: nomos_mempool::Settings {
backend: (),
network: AdapterSettings {
topic: String::from(crate::DA_TOPIC),
id: cert_id,
},
registry: registry.clone(),
},
consensus: config.consensus,
#[cfg(feature = "metrics")]
metrics: MetricsSettings { registry },
da: config.da,
storage: config.storage,
system_sig: (),
},
None,
)
.map_err(|e| eyre!("Error encountered: {}", e))?;
app.wait_finished();
Ok(())
}
fn cert_id(cert: &Certificate) -> <Blob as blob::Blob>::Hash {
use certificate::Certificate;
cert.hash()
}

View File

@ -1,24 +1,9 @@
use full_replication::{Blob, Certificate};
#[cfg(feature = "metrics")]
use nomos_metrics::MetricsSettings;
use nomos_node::{
Config, ConsensusArgs, DaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs, Nomos,
NomosServiceSettings, OverlayArgs, Tx,
};
use clap::Parser;
use color_eyre::eyre::{eyre, Result};
use nomos_core::{
da::{blob, certificate},
tx::Transaction,
use color_eyre::eyre::Result;
use nomos_node::{
Config, ConsensusArgs, DaArgs, HttpArgs, LogArgs, MetricsArgs, NetworkArgs, OverlayArgs,
};
use nomos_mempool::network::adapters::libp2p::Settings as AdapterSettings;
use overwatch_rs::overwatch::*;
const DEFAULT_DB_PATH: &str = "./db";
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
@ -58,6 +43,7 @@ fn main() -> Result<()> {
overlay_args,
metrics_args,
} = Args::parse();
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?
.update_da(da_args)?
.update_log(log_args)?
@ -66,52 +52,9 @@ fn main() -> Result<()> {
.update_overlay(overlay_args)?
.update_network(network_args)?;
let registry = cfg!(feature = "metrics")
.then(|| {
metrics_args
.with_metrics
.then(nomos_metrics::NomosRegistry::default)
})
.flatten();
let app = OverwatchRunner::<Nomos>::run(
NomosServiceSettings {
network: config.network,
logging: config.log,
http: config.http,
cl_mempool: nomos_mempool::Settings {
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::CL_TOPIC),
id: <Tx as Transaction>::hash,
},
registry: registry.clone(),
},
da_mempool: nomos_mempool::Settings {
backend: (),
network: AdapterSettings {
topic: String::from(nomos_node::DA_TOPIC),
id: cert_id,
},
registry: registry.clone(),
},
consensus: config.consensus,
#[cfg(feature = "metrics")]
metrics: MetricsSettings { registry },
da: config.da,
storage: nomos_storage::backends::sled::SledBackendSettings {
db_path: std::path::PathBuf::from(DEFAULT_DB_PATH),
},
system_sig: (),
},
None,
nomos_node::run(
config,
#[cfg(feature = "metrics")]
metrics_args,
)
.map_err(|e| eyre!("Error encountered: {}", e))?;
app.wait_finished();
Ok(())
}
fn cert_id(cert: &Certificate) -> <Blob as blob::Blob>::Hash {
use certificate::Certificate;
cert.hash()
}

View File

@ -26,7 +26,7 @@ nomos-libp2p = { path = "../nomos-libp2p"}
nomos-core = { path = "../nomos-core" }
nomos-node = { path = "../nodes/nomos-node" }
full-replication = { path = "../nomos-da/full-replication" }
reqwest = "0.11"
reqwest = { version = "0.11", features = ["json"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
thiserror = "1.0"
@ -36,4 +36,22 @@ crossterm = "0.27"
ratatui = "0.24"
tui-input = "0.8"
ansi-to-tui = "3"
rand = "0.8"
rand = "0.8"
[dev-dependencies]
tempfile = "3"
tokio = { version = "1", features = ["full"] }
nomos-api = { path = "../nomos-services/api" }
nomos-explorer = { path = "../nodes/explorer" }
nomos-log = { path = "../nomos-services/log" }
nomos-libp2p = { path = "../nomos-libp2p" }
nomos-storage = { path = "../nomos-services/storage" }
mixnode = { path = "../nodes/mixnode" }
mixnet-node = { path = "../mixnet/node", default-features = false }
mixnet-client = { path = "../mixnet/client" }
mixnet-topology = { path = "../mixnet/topology" }
scopeguard = "1"
[[test]]
name = "integration"
path = "tests/main.rs"

View File

@ -1,6 +1,5 @@
use super::CLIENT;
use carnot_consensus::CarnotInfo;
use carnot_engine::{Block, BlockId};
use reqwest::Url;
pub async fn carnot_info(node: &Url) -> Result<CarnotInfo, reqwest::Error> {
@ -12,20 +11,3 @@ pub async fn carnot_info(node: &Url) -> Result<CarnotInfo, reqwest::Error> {
.json::<CarnotInfo>()
.await
}
pub async fn get_blocks_info(
node: &Url,
from: Option<BlockId>,
to: Option<BlockId>,
) -> Result<Vec<Block>, reqwest::Error> {
const NODE_CARNOT_INFO_PATH: &str = "carnot/blocks";
let mut req = CLIENT.get(node.join(NODE_CARNOT_INFO_PATH).unwrap());
if let Some(from) = from {
req = req.query(&[("from", from)]);
}
if let Some(to) = to {
req = req.query(&[("to", to)]);
}
req.send().await?.json().await
}

View File

@ -6,13 +6,13 @@ use nomos_node::Tx;
use reqwest::Url;
pub async fn get_block_contents(
node: &Url,
explorer: &Url,
block: &BlockId,
) -> Result<Option<Block<Tx, Certificate>>, reqwest::Error> {
const BLOCK_PATH: &str = "storage/block";
const BLOCK_PATH: &str = "explorer/blocks/depth";
CLIENT
.post(node.join(BLOCK_PATH).unwrap())
.json(block)
.get(explorer.join(BLOCK_PATH).unwrap())
.query(&[("from", block)])
.send()
.await?
.json()

View File

@ -5,7 +5,7 @@
mod ui;
use crate::{
api::consensus::get_blocks_info,
api::{consensus::carnot_info, storage::get_block_contents},
da::{
disseminate::{
DaProtocolChoice, DisseminateApp, DisseminateAppServiceSettings, Settings, Status,
@ -18,7 +18,7 @@ use full_replication::{
AbsoluteNumber, Attestation, Certificate, FullReplication, Settings as DaSettings,
};
use futures::{stream, StreamExt};
use nomos_core::{block::BlockId, da::DaProtocol, wire};
use nomos_core::{da::DaProtocol, wire};
use nomos_log::{LoggerBackend, LoggerSettings, SharedWriter};
use nomos_network::{backends::libp2p::Libp2p, NetworkService};
use overwatch_rs::{overwatch::OverwatchRunner, services::ServiceData};
@ -48,6 +48,8 @@ use ratatui::{
use tui_input::{backend::crossterm::EventHandler, Input};
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120);
// Limit the number of maximum in-flight requests
const MAX_BUFFERED_REQUESTS: usize = 20;
#[derive(Clone, Debug, Args)]
/// The almost-instant messaging protocol.
@ -58,9 +60,18 @@ pub struct NomosChat {
/// The data availability protocol to use. Defaults to full replication.
#[clap(flatten)]
pub da_protocol: DaProtocolChoice,
/// The node to connect to to fetch blocks and blobs
/// The node to connect to to fetch carnot info
#[clap(long)]
pub node: Url,
/// The explorer to connect to to fetch blocks and blobs
#[clap(long)]
pub explorer: Url,
/// Author for non interactive message formation
#[clap(long, requires("message"))]
pub author: Option<String>,
/// Message for non interactive message formation
#[clap(long, requires("author"))]
pub message: Option<String>,
}
pub struct App {
@ -73,10 +84,27 @@ pub struct App {
payload_sender: UnboundedSender<Box<[u8]>>,
status_updates: Receiver<Status>,
node: Url,
explorer: Url,
logs: Arc<sync::Mutex<Vec<u8>>>,
scroll_logs: u16,
}
impl App {
pub fn send_message(&self, msg: String) {
self.payload_sender
.send(
wire::serialize(&ChatMessage {
author: self.username.clone().unwrap(),
message: msg,
_nonce: rand::random(),
})
.unwrap()
.into(),
)
.unwrap();
}
}
impl NomosChat {
pub fn run(&self) -> Result<(), Box<dyn std::error::Error>> {
let network = serde_yaml::from_reader::<
@ -84,6 +112,47 @@ impl NomosChat {
<NetworkService<Libp2p> as ServiceData>::Settings,
>(std::fs::File::open(&self.network_config)?)?;
let da_protocol = self.da_protocol.clone();
let node_addr = Some(self.node.clone());
let (payload_sender, payload_receiver) = tokio::sync::mpsc::unbounded_channel();
let (status_sender, status_updates) = std::sync::mpsc::channel();
let shared_writer = Arc::new(sync::Mutex::new(Vec::new()));
let backend = SharedWriter::from_inner(shared_writer.clone());
std::thread::spawn(move || {
OverwatchRunner::<DisseminateApp>::run(
DisseminateAppServiceSettings {
network,
send_blob: Settings {
payload: Arc::new(Mutex::new(payload_receiver)),
timeout: DEFAULT_TIMEOUT,
da_protocol,
status_updates: status_sender,
node_addr,
output: None,
},
logger: LoggerSettings {
backend: LoggerBackend::Writer(backend),
level: tracing::Level::INFO,
..Default::default()
},
},
None,
)
.unwrap()
.wait_finished()
});
if let Some(author) = self.author.as_ref() {
let message = self
.message
.as_ref()
.expect("Should be available if author is set");
return run_once(author, message, payload_sender);
}
// setup terminal
enable_raw_mode()?;
let mut stdout = io::stdout();
@ -91,6 +160,46 @@ impl NomosChat {
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
let app = App {
input: Input::default(),
username: None,
messages: Vec::new(),
message_status: None,
message_in_flight: false,
last_updated: Instant::now(),
payload_sender,
status_updates,
node: self.node.clone(),
explorer: self.explorer.clone(),
logs: shared_writer,
scroll_logs: 0,
};
run_app(&mut terminal, app);
// restore terminal
disable_raw_mode()?;
execute!(
terminal.backend_mut(),
LeaveAlternateScreen,
DisableMouseCapture
)?;
terminal.show_cursor()?;
Ok(())
}
pub fn run_app_without_terminal(
&self,
username: String,
) -> Result<(std::sync::mpsc::Receiver<Vec<ChatMessage>>, App), Box<dyn std::error::Error>>
{
let network = serde_yaml::from_reader::<
_,
<NetworkService<Libp2p> as ServiceData>::Settings,
>(std::fs::File::open(&self.network_config)?)?;
let da_protocol = self.da_protocol.clone();
let node_addr = Some(self.node.clone());
let (payload_sender, payload_receiver) = tokio::sync::mpsc::unbounded_channel();
@ -125,7 +234,7 @@ impl NomosChat {
let app = App {
input: Input::default(),
username: None,
username: Some(username),
messages: Vec::new(),
message_status: None,
message_in_flight: false,
@ -133,29 +242,43 @@ impl NomosChat {
payload_sender,
status_updates,
node: self.node.clone(),
explorer: self.explorer.clone(),
logs: shared_writer,
scroll_logs: 0,
};
run_app(&mut terminal, app);
let (message_tx, message_rx) = std::sync::mpsc::channel();
let node = app.node.clone();
let explorer = app.explorer.clone();
std::thread::spawn(move || check_for_messages(message_tx, node, explorer));
// restore terminal
disable_raw_mode()?;
execute!(
terminal.backend_mut(),
LeaveAlternateScreen,
DisableMouseCapture
)?;
terminal.show_cursor()?;
Ok(())
Ok((message_rx, app))
}
}
fn run_once(
author: &str,
message: &str,
payload_sender: UnboundedSender<Box<[u8]>>,
) -> Result<(), Box<dyn std::error::Error>> {
payload_sender.send(
wire::serialize(&ChatMessage {
author: author.to_string(),
message: message.to_string(),
_nonce: rand::random(),
})
.unwrap()
.into(),
)?;
Ok(())
}
fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) {
let (message_tx, message_rx) = std::sync::mpsc::channel();
let node = app.node.clone();
std::thread::spawn(move || check_for_messages(message_tx, node));
let explorer = app.explorer.clone();
std::thread::spawn(move || check_for_messages(message_tx, node, explorer));
loop {
terminal.draw(|f| ui::ui(f, &app)).unwrap();
@ -220,7 +343,7 @@ fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) {
}
#[derive(Serialize, Deserialize, Debug)]
struct ChatMessage {
pub struct ChatMessage {
author: String,
message: String,
// Since DA will rightfully ignore duplicated messages, we need to add a nonce to make sure
@ -228,15 +351,21 @@ struct ChatMessage {
_nonce: u64,
}
#[tokio::main]
async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url) {
// Should ask for the genesis block to be more robust
let mut last_tip = BlockId::zeros();
impl ChatMessage {
pub fn author(&self) -> &str {
&self.author
}
pub fn message(&self) -> &str {
&self.message
}
}
#[tokio::main]
async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url, explorer: Url) {
loop {
if let Ok((new_tip, messages)) = fetch_new_messages(&last_tip, &node).await {
if let Ok(messages) = fetch_new_messages(&explorer, &node).await {
sender.send(messages).expect("channel closed");
last_tip = new_tip;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
@ -245,10 +374,10 @@ async fn check_for_messages(sender: Sender<Vec<ChatMessage>>, node: Url) {
// Process a single block's blobs and return chat messages
async fn process_block_blobs(
node: Url,
block_id: &BlockId,
block: &nomos_core::block::Block<nomos_node::Tx, full_replication::Certificate>,
da_settings: DaSettings,
) -> Result<Vec<ChatMessage>, Box<dyn std::error::Error>> {
let blobs = get_block_blobs(&node, block_id).await?;
let blobs = get_block_blobs(&node, block).await?;
// Note that number of attestations is ignored here since we only use the da protocol to
// decode the blob data, not to validate the certificate
@ -269,29 +398,25 @@ async fn process_block_blobs(
// Fetch new messages since the last tip
async fn fetch_new_messages(
last_tip: &BlockId,
explorer: &Url,
node: &Url,
) -> Result<(BlockId, Vec<ChatMessage>), Box<dyn std::error::Error>> {
) -> Result<Vec<ChatMessage>, Box<dyn std::error::Error>> {
let info = carnot_info(node).await?;
// By only specifying the 'to' parameter we get all the blocks since the last tip
let mut new_blocks = get_blocks_info(node, None, Some(*last_tip))
let mut blocks = get_block_contents(explorer, &info.tip.id)
.await?
.into_iter()
.map(|block| block.id)
.collect::<Vec<_>>();
// The first block is the most recent one.
// Note that the 'to' is inclusive so the above request will always return at least one block
// as long as the block exists (which is the case since it was returned by a previous call)
let new_tip = new_blocks[0];
// We already processed the last block so let's remove it
new_blocks.pop();
blocks.pop();
let da_settings = DaSettings {
num_attestations: 1,
voter: [0; 32],
};
let block_stream = stream::iter(new_blocks.iter().rev());
let block_stream = stream::iter(blocks.iter());
let results: Vec<_> = block_stream
.map(|block| {
let node = node.clone();
@ -299,7 +424,7 @@ async fn fetch_new_messages(
process_block_blobs(node, block, da_settings)
})
.buffer_unordered(new_blocks.len())
.buffered(MAX_BUFFERED_REQUESTS)
.collect::<Vec<_>>()
.await;
@ -308,5 +433,5 @@ async fn fetch_new_messages(
new_messages.extend(result?);
}
Ok((new_tip, new_messages))
Ok(new_messages)
}

View File

@ -9,11 +9,11 @@ use reqwest::Url;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::sync::Mutex;
#[derive(Args, Debug)]
#[derive(Args, Debug, Default)]
pub struct Disseminate {
// TODO: accept bytes
#[clap(short, long)]
pub data: String,
#[clap(short, long, required_unless_present("file"))]
pub data: Option<String>,
/// Path to the network config file
#[clap(short, long)]
pub network_config: PathBuf,
@ -30,6 +30,9 @@ pub struct Disseminate {
/// File to write the certificate to, if present.
#[clap(long)]
pub output: Option<PathBuf>,
/// File to disseminate
#[clap(short, long)]
pub file: Option<PathBuf>,
}
impl Disseminate {
@ -41,7 +44,15 @@ impl Disseminate {
<NetworkService<Libp2p> as ServiceData>::Settings,
>(std::fs::File::open(&self.network_config)?)?;
let (status_updates, rx) = std::sync::mpsc::channel();
let bytes: Box<[u8]> = self.data.clone().as_bytes().into();
let bytes: Box<[u8]> = if let Some(data) = &self.data {
data.clone().as_bytes().into()
} else {
let file_path = self.file.as_ref().unwrap();
let file_bytes = std::fs::read(file_path)?;
file_bytes.into_boxed_slice()
};
let timeout = Duration::from_secs(self.timeout);
let da_protocol = self.da_protocol.clone();
let node_addr = self.node_addr.clone();

View File

@ -205,7 +205,7 @@ impl ServiceCore for DisseminateService {
// protocols, but only the one chosen will be used.
// We can enforce only sensible combinations of protocol/settings
// are specified by using special clap directives
#[derive(Clone, Debug, Args)]
#[derive(Clone, Debug, Args, Default)]
pub struct DaProtocolChoice {
#[clap(long, default_value = "full-replication")]
pub da_protocol: Protocol,
@ -227,14 +227,15 @@ impl TryFrom<DaProtocolChoice> for FullReplication<AbsoluteNumber<Attestation, C
}
}
#[derive(Clone, Debug, Args)]
#[derive(Clone, Debug, Args, Default)]
pub struct ProtocolSettings {
#[clap(flatten)]
pub full_replication: FullReplicationSettings,
}
#[derive(Clone, Debug, ValueEnum)]
#[derive(Clone, Debug, ValueEnum, Default)]
pub enum Protocol {
#[default]
FullReplication,
}

View File

@ -1,10 +1,10 @@
use carnot_engine::BlockId;
use full_replication::Blob;
use nomos_core::da::certificate::Certificate;
use nomos_core::{block::Block, da::certificate::Certificate};
use nomos_node::Tx;
use reqwest::Url;
use thiserror::Error;
use crate::api::{da::get_blobs, storage::get_block_contents};
use crate::api::da::get_blobs;
#[derive(Error, Debug)]
pub enum Error {
@ -15,10 +15,9 @@ pub enum Error {
}
/// Return the blobs whose certificate has been included in the provided block.
pub async fn get_block_blobs(node: &Url, block: &BlockId) -> Result<Vec<Blob>, Error> {
let block = get_block_contents(node, block)
.await?
.ok_or(Error::NotFound)?;
pub async fn get_block_blobs(
node: &Url,
block: &Block<Tx, full_replication::Certificate>,
) -> Result<Vec<Blob>, Error> {
Ok(get_blobs(node, block.blobs().map(|cert| cert.blob()).collect()).await?)
}

View File

@ -0,0 +1,45 @@
backend:
host: 127.0.0.1
port: 3019
log_level: "fatal"
# Node key needs to be unique for every client.
node_key: "0000000000000000000000000000000000000000000000000000000000001444"
discV5BootstrapNodes: []
initial_peers: ["/dns/127.0.0.1/tcp/3000"]
relayTopics: []
# Mixclient configuration to communicate with mixnodes.
# The libp2p network backend always requires this mixclient configuration
# (cannot be disabled for now).
mixnet_client:
# A mixclient mode. For details, see the documentation of the "mixnet" crate.
# - Sender
# - !SenderReceiver [mixnode_client_listen_address]
mode: Sender
# A mixnet topology, which contains the information of all mixnodes in the mixnet.
# (The topology is static for now.)
topology:
# Each mixnet layer consists of a list of mixnodes.
layers:
- nodes:
- address: 127.0.0.1:7707 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
- nodes:
- address: 127.0.0.1:7717 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
- nodes:
- address: 127.0.0.1:7727 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
# A max number of connections that will stay connected to mixnodes in the first mixnet layer.
connection_pool_size: 255
max_retries: 5
retry_delay:
secs: 1
nanos: 0
# A range of total delay that will be set to each Sphinx packets
# sent to the mixnet for timing obfuscation.
# Panics if start > end.
mixnet_delay:
start: "0ms"
end: "0ms"

View File

@ -0,0 +1,45 @@
backend:
host: 127.0.0.1
port: 3020
log_level: "fatal"
# Node key needs to be unique for every client.
node_key: "0000000000000000000000000000000000000000000000000000000000001444"
discV5BootstrapNodes: []
initial_peers: ["/dns/127.0.0.1/tcp/3000"]
relayTopics: []
# Mixclient configuration to communicate with mixnodes.
# The libp2p network backend always requires this mixclient configuration
# (cannot be disabled for now).
mixnet_client:
# A mixclient mode. For details, see the documentation of the "mixnet" crate.
# - Sender
# - !SenderReceiver [mixnode_client_listen_address]
mode: Sender
# A mixnet topology, which contains the information of all mixnodes in the mixnet.
# (The topology is static for now.)
topology:
# Each mixnet layer consists of a list of mixnodes.
layers:
- nodes:
- address: 127.0.0.1:7707 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
- nodes:
- address: 127.0.0.1:7717 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
- nodes:
- address: 127.0.0.1:7727 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
# A max number of connections that will stay connected to mixnodes in the first mixnet layer.
connection_pool_size: 255
max_retries: 5
retry_delay:
secs: 1
nanos: 0
# A range of total delay that will be set to each Sphinx packets
# sent to the mixnet for timing obfuscation.
# Panics if start > end.
mixnet_delay:
start: "0ms"
end: "0ms"

329
nomos-cli/tests/main.rs Normal file
View File

@ -0,0 +1,329 @@
// use std::{env::current_dir, net::SocketAddr, path::PathBuf, thread::Builder, time::Duration};
// use carnot_consensus::CarnotSettings;
// use carnot_engine::{
// overlay::{RandomBeaconState, RoundRobin, TreeOverlay, TreeOverlaySettings},
// NodeId, Overlay,
// };
// use fraction::{Fraction, One};
// use mixnet_client::MixnetClientConfig;
// use mixnet_node::{MixnetNodeConfig, PRIVATE_KEY_SIZE};
// use mixnet_topology::{Layer, MixnetTopology, Node};
// use mixnode::{MixNode, MixNodeServiceSettings};
// use nomos_cli::{
// cmds::chat::{App, ChatMessage},
// da::disseminate::{DaProtocolChoice, FullReplicationSettings, ProtocolSettings},
// };
// use nomos_network::{
// backends::libp2p::{Libp2p, Libp2pConfig},
// NetworkConfig,
// };
// use nomos_node::{api::AxumBackendSettings, run, Config};
// use nomos_storage::backends::rocksdb::RocksBackendSettings;
// use overwatch_rs::{overwatch::OverwatchRunner, DynError};
// use rand::{thread_rng, RngCore};
// use reqwest::Url;
// use tracing::Level;
// fn run_mixnode(
// name: &'static str,
// listen_addr: SocketAddr,
// client_listen_addr: SocketAddr,
// ) -> MixnetNodeConfig {
// let mut private_key = [0u8; PRIVATE_KEY_SIZE];
// thread_rng().fill_bytes(&mut private_key);
// let config = MixnetNodeConfig {
// listen_address: listen_addr,
// client_listen_address: client_listen_addr,
// private_key,
// connection_pool_size: 255,
// ..Default::default()
// };
// let config1 = config.clone();
// Builder::new()
// .name(name.to_string())
// .spawn(move || {
// let mut cfg = MixNodeServiceSettings {
// node: config1,
// logging: Default::default(),
// };
// cfg.logging.level = Level::WARN;
// let app = OverwatchRunner::<MixNode>::run(cfg, None)?;
// app.wait_finished();
// std::result::Result::<_, DynError>::Ok(())
// })
// .unwrap();
// config
// }
// fn build_topology(configs: Vec<MixnetNodeConfig>) -> MixnetTopology {
// // Build three empty layers first
// let mut layers = vec![Layer { nodes: Vec::new() }; 3];
// let mut layer_id = 0;
// // Assign nodes to each layer in round-robin
// for config in &configs {
// let public_key = config.public_key();
// layers.get_mut(layer_id).unwrap().nodes.push(Node {
// address: config.listen_address,
// public_key,
// });
// layer_id = (layer_id + 1) % layers.len();
// }
// // Exclude empty layers
// MixnetTopology {
// layers: layers
// .iter()
// .filter(|layer| !layer.nodes.is_empty())
// .cloned()
// .collect(),
// }
// }
// fn run_nomos_node(name: &'static str, mut config: Config) {
// Builder::new()
// .name(name.into())
// .spawn(move || {
// config.log.level = Level::WARN;
// run(config)
// })
// .unwrap();
// }
// fn run_explorer(explorer_db_dir: PathBuf) {
// Builder::new()
// .name("explorer".into())
// .spawn(|| {
// let mut cfg = nomos_explorer::Config {
// log: Default::default(),
// api: nomos_api::ApiServiceSettings {
// backend_settings: nomos_explorer::AxumBackendSettings {
// address: "127.0.0.1:5000".parse().unwrap(),
// cors_origins: Vec::new(),
// },
// },
// storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
// db_path: explorer_db_dir,
// read_only: true,
// column_family: Some("blocks".into()),
// },
// };
// cfg.log.level = Level::WARN;
// nomos_explorer::Explorer::run(cfg).unwrap()
// })
// .unwrap();
// }
// fn run_chat(
// username: String,
// config: &'static str,
// node_url: Url,
// explorer_url: Url,
// network: NetworkConfig<Libp2p>,
// ) -> (std::sync::mpsc::Receiver<Vec<ChatMessage>>, App) {
// let c = nomos_cli::cmds::chat::NomosChat {
// network_config: current_dir().unwrap().join(config),
// da_protocol: DaProtocolChoice {
// da_protocol: nomos_cli::da::disseminate::Protocol::FullReplication,
// settings: ProtocolSettings {
// full_replication: FullReplicationSettings {
// voter: [0; 32],
// num_attestations: 1,
// },
// },
// },
// node: node_url,
// explorer: explorer_url,
// message: None,
// author: None,
// };
// c.run_app(username).unwrap()
// }
// fn create_node_config(
// db_dir: PathBuf,
// api_backend_addr: SocketAddr,
// network_backend_port: u16,
// nodes: Vec<NodeId>,
// id: [u8; 32],
// threshold: Fraction,
// timeout: Duration,
// mixnet_node_config: Option<MixnetNodeConfig>,
// mixnet_topology: MixnetTopology,
// ) -> Config {
// let mixnet_client_mode = match mixnet_node_config {
// Some(node_config) => mixnet_client::MixnetClientMode::SenderReceiver(
// node_config.client_listen_address.to_string(),
// ),
// None => mixnet_client::MixnetClientMode::Sender,
// };
// let mut config = Config {
// network: NetworkConfig {
// backend: Libp2pConfig {
// inner: Default::default(),
// initial_peers: vec![],
// mixnet_client: MixnetClientConfig {
// mode: mixnet_client_mode,
// topology: mixnet_topology,
// connection_pool_size: 255,
// max_retries: 3,
// retry_delay: Duration::from_secs(5),
// },
// mixnet_delay: Duration::ZERO..Duration::from_millis(10),
// },
// },
// consensus: CarnotSettings {
// private_key: id,
// overlay_settings: TreeOverlaySettings {
// nodes,
// leader: RoundRobin::new(),
// current_leader: [0; 32].into(),
// number_of_committees: 1,
// committee_membership: RandomBeaconState::initial_sad_from_entropy([0; 32]),
// // By setting the threshold to 1 we ensure that all nodes come
// // online before progressing. This is only necessary until we add a way
// // to recover poast blocks from other nodes.
// super_majority_threshold: Some(threshold),
// },
// timeout,
// transaction_selector_settings: (),
// blob_selector_settings: (),
// },
// log: Default::default(),
// http: nomos_api::ApiServiceSettings {
// backend_settings: AxumBackendSettings {
// address: api_backend_addr,
// cors_origins: vec![],
// },
// },
// da: nomos_da::Settings {
// da_protocol: full_replication::Settings {
// voter: id,
// num_attestations: 1,
// },
// backend: nomos_da::backend::memory_cache::BlobCacheSettings {
// max_capacity: usize::MAX,
// evicting_period: Duration::from_secs(60 * 60 * 24), // 1 day
// },
// },
// storage: RocksBackendSettings {
// db_path: db_dir,
// read_only: false,
// column_family: Some("blocks".into()),
// },
// };
// config.network.backend.inner.port = network_backend_port;
// config
// }
// #[test]
// fn integration_test() {
// let temp_dir = tempfile::tempdir().unwrap();
// let dir = temp_dir.path().to_path_buf().join("integration_test");
// let peer_dir = dir.join("peer");
// // create a directory for the db
// std::fs::create_dir_all(&peer_dir).unwrap();
// let mut mixnet_configs = Vec::with_capacity(3);
// // Run 3 mixnodes
// mixnet_configs.push(run_mixnode(
// "mixnode1",
// "127.0.0.1:7707".parse().unwrap(),
// "127.0.0.1:7708".parse().unwrap(),
// ));
// mixnet_configs.push(run_mixnode(
// "mixnode2",
// "127.0.0.1:7717".parse().unwrap(),
// "127.0.0.1:7718".parse().unwrap(),
// ));
// mixnet_configs.push(run_mixnode(
// "mixnode3",
// "127.0.0.1:7727".parse().unwrap(),
// "127.0.0.1:7728".parse().unwrap(),
// ));
// let mixnet_topology = build_topology(mixnet_configs.clone());
// // Run bootstrap nomos node
// let ids = [[0; 32], [1; 32]];
// let config1 = create_node_config(
// dir.clone(),
// "127.0.0.1:11000".parse().unwrap(),
// 3000,
// ids.iter().copied().map(NodeId::new).collect(),
// ids[0],
// Fraction::one(),
// Duration::from_secs(5),
// mixnet_configs.pop(),
// mixnet_topology.clone(),
// );
// let config2 = create_node_config(
// peer_dir.clone(),
// "127.0.0.1:11001".parse().unwrap(),
// 3001,
// ids.iter().copied().map(NodeId::new).collect(),
// ids[1],
// Fraction::one(),
// Duration::from_secs(5),
// mixnet_configs.pop(),
// mixnet_topology.clone(),
// );
// let mut configs = vec![config1, config2];
// let overlay = TreeOverlay::new(configs[0].consensus.overlay_settings.clone());
// let next_leader = overlay.next_leader();
// let next_leader_idx = ids
// .iter()
// .position(|&id| NodeId::from(id) == next_leader)
// .unwrap();
// let next_leader_config = configs.swap_remove(next_leader_idx);
// let libp2p_config = next_leader_config.network.clone();
// let explorer_db_dir = next_leader_config.storage.db_path.clone();
// let node_api_addr = next_leader_config.http.backend_settings.address;
// let prev_node_addr = nomos_libp2p::multiaddr!(
// Ip4([127, 0, 0, 1]),
// Tcp(next_leader_config.network.backend.inner.port)
// );
// run_nomos_node("bootstrap", next_leader_config);
// configs[0]
// .network
// .backend
// .initial_peers
// .push(prev_node_addr);
// run_nomos_node("libp2p", configs.pop().unwrap());
// // wait for the bootstrap node to start
// std::thread::sleep(std::time::Duration::from_secs(1));
// run_explorer(explorer_db_dir);
// let (rx1, app1) = run_chat(
// "user1".into(),
// "test.config.chat.user1.yaml",
// format!("http://{}", node_api_addr).parse().unwrap(),
// "http://127.0.0.1:5000".parse().unwrap(),
// libp2p_config.clone(),
// );
// let (rx2, app2) = run_chat(
// "user2".into(),
// "test.config.chat.user2.yaml",
// format!("http://{}", node_api_addr).parse().unwrap(),
// "http://127.0.0.1:5000".parse().unwrap(),
// libp2p_config,
// );
// app1.send_message("Hello from user1".into());
// tracing::info!("user1: sent message: Hello from user1");
// app2.send_message("Hello from user2".into());
// tracing::info!("user2: sent message: Hello from user2");
// let msgs1 = rx1.recv().unwrap();
// let msgs2 = rx2.recv().unwrap();
// assert_eq!(msgs1.len(), 1);
// assert_eq!(msgs2.len(), 1);
// }

View File

@ -20,7 +20,7 @@ nomos-network = { path = "../../nomos-services/network" }
nomos-da = { path = "../../nomos-services/data-availability" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "openapi"] }
nomos-metrics = { path = "../../nomos-metrics" }
nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] }
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
nomos-libp2p = { path = "../../nomos-libp2p" }
full-replication = { path = "../../nomos-da/full-replication" }
serde = { version = "1", features = ["derive"] }

View File

@ -23,7 +23,7 @@ use nomos_core::{
use nomos_mempool::{
backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter,
};
use nomos_storage::backends::{sled::SledBackend, StorageSerde};
use nomos_storage::backends::{rocksdb::RocksBackend, StorageSerde};
pub type Carnot<Tx, SS, const SIZE: usize> = CarnotConsensus<
ConsensusLibp2pAdapter,
@ -37,7 +37,7 @@ pub type Carnot<Tx, SS, const SIZE: usize> = CarnotConsensus<
TreeOverlay<RoundRobin, RandomBeaconState>,
FillSizeWithTx<SIZE, Tx>,
FillSizeWithBlobsCertificate<SIZE, Certificate>,
SledBackend<SS>,
RocksBackend<SS>,
>;
pub async fn carnot_info<Tx, SS, const SIZE: usize>(

View File

@ -1,7 +1,7 @@
use carnot_engine::BlockId;
use nomos_core::block::Block;
use nomos_storage::{
backends::{sled::SledBackend, StorageSerde},
backends::{rocksdb::RocksBackend, StorageSerde},
StorageMsg, StorageService,
};
@ -14,7 +14,7 @@ where
S: StorageSerde + Send + Sync + 'static,
{
let relay = handle
.relay::<StorageService<SledBackend<S>>>()
.relay::<StorageService<RocksBackend<S>>>()
.connect()
.await?;
let (msg, receiver) = StorageMsg::new_load_message(id);

View File

@ -13,7 +13,7 @@ use crate::network::{
messages::{ProposalMsg, VoteMsg},
BoxedStream, NetworkAdapter,
};
use consensus_engine::{BlockId, Committee, View};
use carnot_engine::{BlockId, Committee, View};
const MOCK_PUB_SUB_TOPIC: &str = "MockPubSubTopic";
const MOCK_BLOCK_CONTENT_TOPIC: MockContentTopic = MockContentTopic::new("MockSim", 1, "MockBlock");

View File

@ -144,6 +144,10 @@ macro_rules! registry_init {
#[async_trait::async_trait]
impl ServiceCore for Logger {
fn init(service_state: ServiceStateHandle<Self>) -> Result<Self, overwatch_rs::DynError> {
use std::sync::Once;
static ONCE_INIT: Once = Once::new();
let config = service_state.settings_reader.get_updated_settings();
let (non_blocking, _guard) = match config.backend {
LoggerBackend::Gelf { addr } => {
@ -152,7 +156,9 @@ impl ServiceCore for Logger {
.overwatch_handle
.runtime()
.spawn(async move { task.connect().await });
registry_init!(layer, config.format, config.level);
ONCE_INIT.call_once(move || {
registry_init!(layer, config.format, config.level);
});
return Ok(Self {
service_state,
worker_guard: None,
@ -179,7 +185,9 @@ impl ServiceCore for Logger {
let layer = tracing_subscriber::fmt::Layer::new()
.with_level(true)
.with_writer(non_blocking);
registry_init!(layer, config.format, config.level);
ONCE_INIT.call_once(move || {
registry_init!(layer, config.format, config.level);
});
Ok(Self {
service_state,
worker_guard: Some(_guard),

View File

@ -13,6 +13,7 @@ bytes = "1.2"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
serde = "1.0"
sled = { version = "0.34", optional = true }
rocksdb = { version = "0.22", optional = true }
thiserror = "1.0"
tracing = "0.1"
@ -24,3 +25,9 @@ tempfile = "3"
default = []
mock = []
sled-backend = ["sled"]
rocksdb-backend = ["rocksdb"]
[[bin]]
name = "rocks"
path = "src/bin/rocks.rs"
required-features = ["rocksdb-backend"]

View File

@ -3,6 +3,9 @@ pub mod mock;
#[cfg(feature = "sled")]
pub mod sled;
#[cfg(feature = "rocksdb")]
pub mod rocksdb;
// std
use std::error::Error;
// crates

View File

@ -0,0 +1,250 @@
// std
use std::path::PathBuf;
use std::{marker::PhantomData, sync::Arc};
// crates
use async_trait::async_trait;
use bytes::Bytes;
pub use rocksdb::Error;
use rocksdb::{Options, DB};
// internal
use super::{StorageBackend, StorageSerde, StorageTransaction};
/// Rocks backend setting
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct RocksBackendSettings {
/// File path to the db file
pub db_path: PathBuf,
pub read_only: bool,
pub column_family: Option<String>,
}
/// Rocks transaction type
// Do not use `TransactionDB` here, because rocksdb's `TransactionDB` does not support open by read-only mode.
// Thus, we cannot open the same db in two or more processes.
pub struct Transaction {
rocks: Arc<DB>,
#[allow(clippy::type_complexity)]
executor: Box<dyn FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync>,
}
impl Transaction {
/// Execute a function over the transaction
pub fn execute(self) -> Result<Option<Bytes>, Error> {
(self.executor)(&self.rocks)
}
}
impl StorageTransaction for Transaction {
type Result = Result<Option<Bytes>, Error>;
type Transaction = Self;
}
/// Rocks storage backend
pub struct RocksBackend<SerdeOp> {
rocks: Arc<DB>,
_serde_op: PhantomData<SerdeOp>,
}
impl<SerdeOp> RocksBackend<SerdeOp> {
pub fn txn(
&self,
executor: impl FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync + 'static,
) -> Transaction {
Transaction {
rocks: self.rocks.clone(),
executor: Box::new(executor),
}
}
}
impl<SerdeOp> core::fmt::Debug for RocksBackend<SerdeOp> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
format!("RocksBackend {{ rocks: {:?} }}", self.rocks).fmt(f)
}
}
#[async_trait]
impl<SerdeOp: StorageSerde + Send + Sync + 'static> StorageBackend for RocksBackend<SerdeOp> {
type Settings = RocksBackendSettings;
type Error = rocksdb::Error;
type Transaction = Transaction;
type SerdeOperator = SerdeOp;
fn new(config: Self::Settings) -> Result<Self, Self::Error> {
let RocksBackendSettings {
db_path,
read_only,
column_family: cf,
} = config;
let db = match (read_only, cf) {
(true, None) => {
let mut opts = Options::default();
opts.create_if_missing(false);
DB::open_for_read_only(&opts, db_path, false)?
}
(true, Some(cf)) => {
let mut opts = Options::default();
opts.create_if_missing(false);
DB::open_cf_for_read_only(&opts, db_path, [cf], false)?
}
(false, None) => {
let mut opts = Options::default();
opts.create_if_missing(true);
DB::open(&opts, db_path)?
}
(false, Some(cf)) => {
let mut opts = Options::default();
opts.create_if_missing(true);
DB::open_cf(&opts, db_path, [cf])?
}
};
Ok(Self {
rocks: Arc::new(db),
_serde_op: Default::default(),
})
}
async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> {
self.rocks.put(key, value)
}
async fn load(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
self.rocks.get(key).map(|opt| opt.map(|ivec| ivec.into()))
}
async fn remove(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
self.load(key).await.and_then(|val| {
if val.is_some() {
self.rocks.delete(key).map(|_| val)
} else {
Ok(None)
}
})
}
async fn execute(
&mut self,
transaction: Self::Transaction,
) -> Result<<Self::Transaction as StorageTransaction>::Result, Self::Error> {
Ok(transaction.execute())
}
}
#[cfg(test)]
mod test {
use super::super::testing::NoStorageSerde;
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_store_load_remove(
) -> Result<(), <RocksBackend<NoStorageSerde> as StorageBackend>::Error> {
let temp_path = TempDir::new().unwrap();
let sled_settings = RocksBackendSettings {
db_path: temp_path.path().to_path_buf(),
read_only: false,
column_family: None,
};
let key = "foo";
let value = "bar";
let mut db: RocksBackend<NoStorageSerde> = RocksBackend::new(sled_settings)?;
db.store(key.as_bytes().into(), value.as_bytes().into())
.await?;
let load_value = db.load(key.as_bytes()).await?;
assert_eq!(load_value, Some(value.as_bytes().into()));
let removed_value = db.remove(key.as_bytes()).await?;
assert_eq!(removed_value, Some(value.as_bytes().into()));
Ok(())
}
#[tokio::test]
async fn test_transaction(
) -> Result<(), <RocksBackend<NoStorageSerde> as StorageBackend>::Error> {
let temp_path = TempDir::new().unwrap();
let sled_settings = RocksBackendSettings {
db_path: temp_path.path().to_path_buf(),
read_only: false,
column_family: None,
};
let mut db: RocksBackend<NoStorageSerde> = RocksBackend::new(sled_settings)?;
let txn = db.txn(|db| {
let key = "foo";
let value = "bar";
db.put(key, value)?;
let result = db.get(key)?;
db.delete(key)?;
Ok(result.map(|ivec| ivec.to_vec().into()))
});
let result = db.execute(txn).await??;
assert_eq!(result, Some("bar".as_bytes().into()));
Ok(())
}
#[tokio::test]
async fn test_multi_readers_single_writer(
) -> Result<(), <RocksBackend<NoStorageSerde> as StorageBackend>::Error> {
use tokio::sync::mpsc::channel;
let temp_path = TempDir::new().unwrap();
let path = temp_path.path().to_path_buf();
let sled_settings = RocksBackendSettings {
db_path: temp_path.path().to_path_buf(),
read_only: false,
column_family: None,
};
let key = "foo";
let value = "bar";
let mut db: RocksBackend<NoStorageSerde> = RocksBackend::new(sled_settings)?;
let (tx, mut rx) = channel(5);
// now let us spawn a few readers
for _ in 0..5 {
let p = path.clone();
let tx = tx.clone();
std::thread::spawn(move || {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
let sled_settings = RocksBackendSettings {
db_path: p,
read_only: true,
column_family: None,
};
let key = "foo";
let mut db: RocksBackend<NoStorageSerde> =
RocksBackend::new(sled_settings).unwrap();
while db.load(key.as_bytes()).await.unwrap().is_none() {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
tx.send(()).await.unwrap();
});
});
}
db.store(key.as_bytes().into(), value.as_bytes().into())
.await?;
let mut recvs = 0;
loop {
if rx.recv().await.is_some() {
recvs += 1;
if recvs == 5 {
break;
}
}
}
Ok(())
}
}

View File

@ -19,7 +19,7 @@ pub enum Error {
}
/// Sled backend setting
#[derive(Clone, Debug)]
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct SledBackendSettings {
/// File path to the db file
pub db_path: PathBuf,

View File

@ -0,0 +1,58 @@
use rocksdb::{Options, DB};
const TEMP_ROCKS_PATH: &str = "rocks";
pub fn rocksdb_ro() {
let mut opts = Options::default();
opts.create_if_missing(true);
// open in read only mode
let db = DB::open_cf_for_read_only(&opts, TEMP_ROCKS_PATH, ["blocks", "da"], false).unwrap();
let blocks_cf = db.cf_handle("blocks").unwrap();
let r = db.get_cf(blocks_cf, b"block1").unwrap().unwrap();
assert_eq!(r, b"block1data");
let da_cf = db.cf_handle("da").unwrap();
let r = db.get_cf(da_cf, b"da1").unwrap().unwrap();
assert_eq!(r, b"da1data");
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
pub fn rocksdb_rw() {
let mut opts = Options::default();
opts.create_if_missing(true);
opts.create_missing_column_families(true);
let db = DB::open_cf(&opts, TEMP_ROCKS_PATH, ["blocks", "da"]).unwrap();
// open blocks column family and insert a block
let blocks_cf = db.cf_handle("blocks").unwrap();
db.put_cf(blocks_cf, b"block1", b"block1data").unwrap();
// open da column family and insert a blob
let da_cf = db.cf_handle("da").unwrap();
db.put_cf(da_cf, b"da1", b"da1data").unwrap();
// A loop to mock a long running program
loop {
std::thread::sleep(std::time::Duration::from_secs(1));
}
}
fn main() {
let mut args = std::env::args();
args.next();
let o = args.next();
if o.is_none() {
println!("open in read-write mode");
rocksdb_rw()
} else {
println!("open in read-only mode");
rocksdb_ro()
}
}

View File

@ -20,6 +20,7 @@ pkgs.mkShell {
rust-bin.stable."1.75.0".default
clang_14
llvmPackages_14.libclang
openssl
];
shellHook = ''
export LIBCLANG_PATH="${pkgs.llvmPackages_14.libclang.lib}/lib";

View File

@ -2,10 +2,6 @@
FROM rust:1.75.0-slim-bullseye AS builder
# Using backports for go 1.19
RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \
>> /etc/apt/sources.list
# Dependecies for publishing documentation.
RUN apt-get update && apt-get install -yq \
git clang etcd-client libssl-dev pkg-config
@ -13,11 +9,11 @@ RUN apt-get update && apt-get install -yq \
WORKDIR /nomos
COPY . .
RUN cargo build --release --all
RUN cargo build --release --all --features metrics
# NODE IMAGE ----------------------------------------------------------
FROM bitnami/minideb:latest
FROM bitnami/minideb:bullseye
LABEL maintainer="augustinas@status.im" \
source="https://github.com/logos-co/nomos-node" \
@ -27,9 +23,9 @@ LABEL maintainer="augustinas@status.im" \
EXPOSE 3000 8080 9000 60000
COPY --from=builder /nomos/target/release/nomos-node /usr/bin/nomos-node
COPY --from=builder /nomos/target/release/nomos-cli /usr/bin/nomos-cli
COPY --from=builder /nomos/target/release/mixnode /usr/bin/mixnode
COPY --from=builder /usr/bin/etcdctl /usr/bin/etcdctl
COPY nodes/nomos-node/config.yaml /etc/nomos/config.yaml
RUN install_packages python3 python3-etcd3
ENTRYPOINT ["/usr/bin/nomos-node"]

44
testnet/cli_config.yaml Normal file
View File

@ -0,0 +1,44 @@
backend:
host: 0.0.0.0
port: 4007
log_level: "fatal"
node_key: "0000000000000000000000000000000000000000000000000000000000000667"
discV5BootstrapNodes: []
initial_peers: ["/dns/bootstrap/tcp/3000"]
relayTopics: []
# Mixclient configuration to communicate with mixnodes.
# The libp2p network backend always requires this mixclient configuration
# (cannot be disabled for now).
mixnet_client:
# A mixclient mode. For details, see the documentation of the "mixnet" crate.
# - Sender
# - !SenderReceiver [mixnode_client_listen_address]
mode: Sender
# A mixnet topology, which contains the information of all mixnodes in the mixnet.
# (The topology is static for now.)
topology:
# Each mixnet layer consists of a list of mixnodes.
layers:
- nodes:
- address: mix-node-0:7707 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
- nodes:
- address: mix-node-1:7717 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
- nodes:
- address: mix-node-2:7727 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
# A max number of connections that will stay connected to mixnodes in the first mixnet layer.
connection_pool_size: 255
max_retries: 5
retry_delay:
secs: 1
nanos: 0
# A range of total delay that will be set to each Sphinx packets
# sent to the mixnet for timing obfuscation.
# Panics if start > end.
mixnet_delay:
start: "0ms"
end: "0ms"

View File

@ -0,0 +1,10 @@
log:
backend: "Stdout"
format: "Json"
level: "info"
api:
backend_settings:
address: 0.0.0.0:9090
cors_origins: []

View File

@ -0,0 +1,11 @@
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
org_id: 1
url: http://prometheus:9090
is_default: true
version: 1
editable: true

View File

@ -0,0 +1,51 @@
instance_name = nomos dashboard
;[dashboards.json]
;enabled = true
;path = /home/git/grafana/grafana-dashboards/dashboards
#################################### Auth ##########################
[auth]
disable_login_form = false
#################################### Anonymous Auth ##########################
[auth.anonymous]
# enable anonymous access
enabled = true
# specify organization name that should be used for unauthenticated users
;org_name = Public
# specify role for unauthenticated users
; org_role = Admin
org_role = Viewer
;[security]
;admin_user = ocr
;admin_password = ocr
;[users]
# disable user signup / registration
;allow_sign_up = false
# Set to true to automatically assign new users to the default organization (id 1)
;auto_assign_org = true
# Default role new users will be automatically assigned (if disabled above is set to true)
;auto_assign_org_role = Viewer
#################################### SMTP / Emailing ##########################
;[smtp]
;enabled = false
;host = localhost:25
;user =
;password =
;cert_file =
;key_file =
;skip_verify = false
;from_address = admin@grafana.localhost
;[emails]
;welcome_email_on_sign_up = false

View File

@ -0,0 +1 @@
GF_INSTALL_PLUGINS=grafana-worldmap-panel,grafana-piechart-panel,yesoreyeram-boomtheme-panel,briangann-gauge-panel,pierosavi-imageit-panel,bessler-pictureit-panel,vonage-status-panel

View File

@ -0,0 +1,14 @@
global:
scrape_interval: 15s
evaluation_interval: 15s
external_labels:
monitor: "Monitoring"
scrape_configs:
- job_name: "libp2p"
static_configs:
- targets:
- bootstrap:18080
- libp2p_node_1:18080
- libp2p_node_2:18080
- libp2p_node_3:18080

View File

@ -17,4 +17,4 @@ echo "CONSENSUS_PRIV_KEY: ${CONSENSUS_PRIV_KEY}"
echo "DA_VOTER: ${DA_VOTER}"
echo "OVERLAY_NODES: ${OVERLAY_NODES}"
exec /usr/bin/nomos-node /etc/nomos/bootstrap_config.yaml
exec /usr/bin/nomos-node /etc/nomos/bootstrap_config.yaml --with-metrics

View File

@ -0,0 +1,9 @@
#!/bin/sh
echo "I am a container ${HOSTNAME} bot"
while true
do
/usr/bin/nomos-cli chat --author nomos-ghost --message "$(date +%H:%M:%S) ~ ping" --network-config /etc/nomos/cli_config.yaml --node http://bootstrap:18080
sleep 10
done

View File

@ -33,4 +33,4 @@ echo "DA_VOTER: ${DA_VOTER}"
echo "OVERLAY_NODES: ${OVERLAY_NODES}"
echo "NET_INITIAL_PEERS: ${NET_INITIAL_PEERS}"
exec /usr/bin/nomos-node /etc/nomos/libp2p_config.yaml
exec /usr/bin/nomos-node /etc/nomos/libp2p_config.yaml --with-metrics

View File

@ -6,10 +6,12 @@ publish = false
[dependencies]
nomos-node = { path = "../nodes/nomos-node", default-features = false }
nomos-explorer = { path = "../nodes/explorer", default-features = false }
carnot-consensus = { path = "../nomos-services/carnot-consensus" }
nomos-network = { path = "../nomos-services/network", features = ["libp2p"]}
nomos-log = { path = "../nomos-services/log" }
nomos-api = { path = "../nomos-services/api" }
nomos-storage = { path = "../nomos-services/storage" }
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
nomos-core = { path = "../nomos-core" }
carnot-engine = { path = "../consensus/carnot-engine", features = ["serde"] }

View File

@ -9,6 +9,7 @@ use std::env;
// std
use std::net::TcpListener;
use std::ops::Mul;
use std::path::PathBuf;
use std::time::Duration;
use std::{fmt::Debug, sync::Mutex};
@ -41,7 +42,7 @@ pub fn adjust_timeout(d: Duration) -> Duration {
#[async_trait::async_trait]
pub trait Node: Sized {
type ConsensusInfo: Debug + Clone + PartialEq;
async fn spawn_nodes(config: SpawnConfig) -> Vec<Self>;
async fn spawn_nodes(config: SpawnConfig, storage_dir: Option<PathBuf>) -> Vec<Self>;
async fn consensus_info(&self) -> Self::ConsensusInfo;
fn stop(&mut self);
}

View File

@ -4,8 +4,10 @@ use std::{
time::Duration,
};
use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
use mixnet_node::{MixnetNodeConfig, PRIVATE_KEY_SIZE};
use mixnet_topology::{Layer, MixnetTopology, Node};
use nomos_log::{LoggerBackend, LoggerFormat};
use rand::{thread_rng, RngCore};
use tempfile::NamedTempFile;
@ -14,21 +16,37 @@ use crate::{get_available_port, MixnetConfig};
const MIXNODE_BIN: &str = "../target/debug/mixnode";
pub struct MixNode {
_tempdir: tempfile::TempDir,
child: Child,
}
impl Drop for MixNode {
fn drop(&mut self) {
self.child.kill().unwrap();
if std::thread::panicking() {
if let Err(e) = persist_tempdir(&mut self._tempdir, "mixnode") {
println!("failed to persist tempdir: {e}");
}
}
if let Err(e) = self.child.kill() {
println!("failed to kill the child process: {e}");
}
}
}
impl MixNode {
pub async fn spawn(config: MixnetNodeConfig) -> Self {
let config = mixnode::Config {
let dir = create_tempdir().unwrap();
let mut config = mixnode::Config {
mixnode: config,
log: Default::default(),
};
config.log.backend = LoggerBackend::File {
directory: dir.path().to_owned(),
prefix: Some(LOGS_PREFIX.into()),
};
config.log.format = LoggerFormat::Json;
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
@ -43,7 +61,10 @@ impl MixNode {
//TODO: use a sophisticated way to wait until the node is ready
tokio::time::sleep(Duration::from_secs(1)).await;
Self { child }
Self {
_tempdir: dir,
child,
}
}
pub async fn spawn_nodes(num_nodes: usize) -> (Vec<Self>, MixnetConfig) {

View File

@ -3,3 +3,26 @@ pub mod nomos;
pub use self::mixnode::MixNode;
pub use nomos::NomosNode;
use tempfile::TempDir;
const LOGS_PREFIX: &str = "__logs";
fn create_tempdir() -> std::io::Result<TempDir> {
// It's easier to use the current location instead of OS-default tempfile location
// because Github Actions can easily access files in the current location using wildcard
// to upload them as artifacts.
tempfile::TempDir::new_in(std::env::current_dir()?)
}
fn persist_tempdir(tempdir: &mut TempDir, label: &str) -> std::io::Result<()> {
println!(
"{}: persisting directory at {}",
label,
tempdir.path().display()
);
// we need ownership of the dir to persist it
let dir = std::mem::replace(tempdir, tempfile::tempdir()?);
// a bit confusing but `into_path` persists the directory
let _ = dir.into_path();
Ok(())
}

View File

@ -1,8 +1,10 @@
// std
use std::net::SocketAddr;
use std::path::PathBuf;
use std::process::{Child, Command, Stdio};
use std::time::Duration;
// internal
use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
use crate::{adjust_timeout, get_available_port, ConsensusConfig, MixnetConfig, Node, SpawnConfig};
use carnot_consensus::{CarnotInfo, CarnotSettings};
use carnot_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay, TreeOverlaySettings};
@ -22,14 +24,13 @@ use nomos_node::{api::AxumBackendSettings, Config, Tx};
use fraction::Fraction;
use once_cell::sync::Lazy;
use rand::{thread_rng, Rng};
use reqwest::Client;
use reqwest::{Client, Url};
use tempfile::NamedTempFile;
static CLIENT: Lazy<Client> = Lazy::new(Client::new);
const NOMOS_BIN: &str = "../target/debug/nomos-node";
const CARNOT_INFO_API: &str = "carnot/info";
const STORAGE_BLOCKS_API: &str = "storage/block";
const LOGS_PREFIX: &str = "__logs";
const GET_BLOCKS_INFO: &str = "carnot/blocks";
pub struct NomosNode {
@ -42,14 +43,14 @@ pub struct NomosNode {
impl Drop for NomosNode {
fn drop(&mut self) {
if std::thread::panicking() {
println!("persisting directory at {}", self._tempdir.path().display());
// we need ownership of the dir to persist it
let dir = std::mem::replace(&mut self._tempdir, tempfile::tempdir().unwrap());
// a bit confusing but `into_path` persists the directory
let _ = dir.into_path();
if let Err(e) = persist_tempdir(&mut self._tempdir, "nomos-node") {
println!("failed to persist tempdir: {e}");
}
}
self.child.kill().unwrap();
if let Err(e) = self.child.kill() {
println!("failed to kill the child process: {e}");
}
}
}
@ -61,11 +62,7 @@ impl NomosNode {
pub async fn spawn(mut config: Config) -> Self {
// Waku stores the messages in a db file in the current dir, we need a different
// directory for each node to avoid conflicts
//
// NOTE: It's easier to use the current location instead of OS-default tempfile location
// because Github Actions can easily access files in the current location using wildcard
// to upload them as artifacts.
let dir = tempfile::TempDir::new_in(std::env::current_dir().unwrap()).unwrap();
let dir = create_tempdir().unwrap();
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
@ -115,6 +112,10 @@ impl NomosNode {
}
}
pub fn url(&self) -> Url {
format!("http://{}", self.addr).parse().unwrap()
}
pub async fn get_block(&self, id: BlockId) -> Option<Block<Tx, Certificate>> {
CLIENT
.post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API))
@ -201,10 +202,11 @@ impl NomosNode {
impl Node for NomosNode {
type ConsensusInfo = CarnotInfo;
async fn spawn_nodes(config: SpawnConfig) -> Vec<Self> {
async fn spawn_nodes(config: SpawnConfig, storage_dir: Option<PathBuf>) -> Vec<Self> {
match config {
SpawnConfig::Star { consensus, mixnet } => {
let (next_leader_config, configs) = create_node_configs(consensus, mixnet);
let (next_leader_config, configs) =
create_node_configs(consensus, mixnet, storage_dir);
let first_node_addr = node_address(&next_leader_config);
let mut nodes = vec![Self::spawn(next_leader_config).await];
@ -219,7 +221,8 @@ impl Node for NomosNode {
nodes
}
SpawnConfig::Chain { consensus, mixnet } => {
let (next_leader_config, configs) = create_node_configs(consensus, mixnet);
let (next_leader_config, configs) =
create_node_configs(consensus, mixnet, storage_dir);
let mut prev_node_addr = node_address(&next_leader_config);
let mut nodes = vec![Self::spawn(next_leader_config).await];
@ -253,6 +256,7 @@ impl Node for NomosNode {
fn create_node_configs(
consensus: ConsensusConfig,
mut mixnet: MixnetConfig,
storage_dir: Option<PathBuf>,
) -> (Config, Vec<Config>) {
let mut ids = vec![[0; 32]; consensus.n_participants];
for id in &mut ids {
@ -269,6 +273,7 @@ fn create_node_configs(
consensus.timeout,
mixnet.node_configs.pop(),
mixnet.topology.clone(),
storage_dir.clone(),
)
})
.collect::<Vec<_>>();
@ -292,6 +297,7 @@ fn create_node_config(
timeout: Duration,
mixnet_node_config: Option<MixnetNodeConfig>,
mixnet_topology: MixnetTopology,
storage_dir: Option<PathBuf>,
) -> Config {
let mixnet_client_mode = match mixnet_node_config {
Some(node_config) => {
@ -341,8 +347,6 @@ fn create_node_config(
cors_origins: vec![],
},
},
#[cfg(feature = "metrics")]
metrics: Default::default(),
da: nomos_da::Settings {
da_protocol: full_replication::Settings {
voter: id,
@ -353,6 +357,17 @@ fn create_node_config(
evicting_period: Duration::from_secs(60 * 60 * 24), // 1 day
},
},
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
db_path: {
let p = storage_dir
.unwrap_or("db".into())
.join(NodeId::new(id).to_string());
let _ = std::fs::create_dir_all(&p);
p
},
read_only: false,
column_family: Some("blocks".into()),
},
};
config.network.backend.inner.port = get_available_port();

View File

@ -1,19 +1,50 @@
use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication};
use nomos_cli::{
cmds::{disseminate::Disseminate, Command},
api::da::get_blobs,
cmds::disseminate::Disseminate,
da::disseminate::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings},
};
use std::time::Duration;
use nomos_core::da::{blob::Blob as _, DaProtocol};
use std::{io::Write, net::SocketAddr, path::PathBuf, time::Duration};
use tempfile::NamedTempFile;
use tests::{
adjust_timeout, get_available_port, nodes::nomos::Pool, MixNode, Node, NomosNode, SpawnConfig,
};
const CLI_BIN: &str = "../target/debug/nomos-cli";
use std::process::Command;
const TIMEOUT_SECS: u64 = 20;
#[tokio::test]
async fn disseminate_blob() {
fn run_disseminate(disseminate: &Disseminate) {
let mut binding = Command::new(CLI_BIN);
let c = binding
.args(["disseminate", "--network-config"])
.arg(disseminate.network_config.as_os_str())
.arg("--node-addr")
.arg(disseminate.node_addr.as_ref().unwrap().as_str());
match (&disseminate.data, &disseminate.file) {
(Some(data), None) => c.args(["--data", &data]),
(None, Some(file)) => c.args(["--file", file.as_os_str().to_str().unwrap()]),
(_, _) => panic!("Either data or file needs to be provided, but not both"),
};
c.status().expect("failed to execute nomos cli");
}
async fn spawn_and_setup_config(
file: &mut NamedTempFile,
config: &mut Disseminate,
storage_dir: Option<PathBuf>,
) -> (
Vec<NomosNode>,
FullReplication<AbsoluteNumber<Attestation, Certificate>>,
) {
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(2).await;
let mut nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await;
let mut nodes =
NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config), storage_dir).await;
// kill the node so that we can reuse its network config
nodes[1].stop();
@ -21,35 +52,42 @@ async fn disseminate_blob() {
let mut network_config = nodes[1].config().network.clone();
// use a new port because the old port is sometimes not closed immediately
network_config.backend.inner.port = get_available_port();
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
serde_yaml::to_writer(&mut file, &network_config).unwrap();
let cmd = Command::Disseminate(Disseminate {
data: "Hello World".into(),
timeout: 20,
network_config: config_path,
da_protocol: DaProtocolChoice {
da_protocol: Protocol::FullReplication,
settings: ProtocolSettings {
full_replication: FullReplicationSettings {
voter: [0; 32],
num_attestations: 1,
},
serde_yaml::to_writer(file, &network_config).unwrap();
let da_protocol = DaProtocolChoice {
da_protocol: Protocol::FullReplication,
settings: ProtocolSettings {
full_replication: FullReplicationSettings {
voter: [0; 32],
num_attestations: 1,
},
},
node_addr: Some(
format!(
"http://{}",
nodes[0].config().http.backend_settings.address.clone()
)
.parse()
.unwrap(),
),
output: None,
});
};
let thread = std::thread::spawn(move || cmd.run().unwrap());
let da =
<FullReplication<AbsoluteNumber<Attestation, Certificate>>>::try_from(da_protocol.clone())
.unwrap();
config.timeout = 20;
config.network_config = config_path;
config.da_protocol = da_protocol;
config.node_addr = Some(
format!(
"http://{}",
nodes[0].config().http.backend_settings.address.clone()
)
.parse()
.unwrap(),
);
(nodes, da)
}
async fn disseminate(config: &mut Disseminate) {
let mut file = NamedTempFile::new().unwrap();
let (nodes, da) = spawn_and_setup_config(&mut file, config, None).await;
run_disseminate(config);
// let thread = std::thread::spawn(move || cmd.run().unwrap());
tokio::time::timeout(
adjust_timeout(Duration::from_secs(TIMEOUT_SECS)),
@ -58,7 +96,131 @@ async fn disseminate_blob() {
.await
.unwrap();
thread.join().unwrap();
let (blob, bytes) = if let Some(data) = &config.data {
let bytes = data.as_bytes().to_vec();
(da.encode(bytes.clone())[0].hash(), bytes)
} else {
let bytes = std::fs::read(&config.file.as_ref().unwrap()).unwrap();
(da.encode(bytes.clone())[0].hash(), bytes)
};
assert_eq!(
get_blobs(&nodes[0].url(), vec![blob]).await.unwrap()[0].as_bytes(),
bytes.clone()
);
}
#[tokio::test]
async fn disseminate_blob() {
let mut config = Disseminate {
data: Some("hello world".to_string()),
..Default::default()
};
disseminate(&mut config).await;
}
#[tokio::test]
async fn disseminate_big_blob() {
const MSG_SIZE: usize = 1024;
let mut config = Disseminate {
data: std::iter::repeat(String::from("X"))
.take(MSG_SIZE)
.collect::<Vec<_>>()
.join("")
.into(),
..Default::default()
};
disseminate(&mut config).await;
}
#[tokio::test]
async fn disseminate_blob_from_file() {
let mut file = NamedTempFile::new().unwrap();
file.write_all("hello world".as_bytes()).unwrap();
let mut config = Disseminate {
file: Some(file.path().to_path_buf()),
..Default::default()
};
disseminate(&mut config).await;
}
fn run_explorer(explorer_api_addr: SocketAddr, db_path: PathBuf) {
let cfg = nomos_explorer::Config {
log: Default::default(),
api: nomos_api::ApiServiceSettings {
backend_settings: nomos_explorer::AxumBackendSettings {
address: explorer_api_addr,
cors_origins: Vec::new(),
},
},
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
db_path,
read_only: true,
column_family: Some("blocks".into()),
},
};
std::thread::spawn(move || nomos_explorer::Explorer::run(cfg).unwrap());
}
#[test]
fn explorer() {
let (tx, rx) = std::sync::mpsc::channel();
let tx1 = tx.clone();
let temp = tempfile::tempdir().unwrap();
let db_path = temp.path().to_path_buf();
std::thread::spawn(move || {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
let mut file = NamedTempFile::new().unwrap();
let mut config = Disseminate::default();
let (nodes, _) =
spawn_and_setup_config(&mut file, &mut config, Some(db_path)).await;
let explorer_db = nodes[0].config().storage.db_path.clone();
let explorer_api_addr = format!("127.0.0.1:{}", get_available_port())
.parse()
.unwrap();
run_explorer(explorer_api_addr, explorer_db);
let c = nomos_cli::cmds::chat::NomosChat {
network_config: config.network_config.clone(),
da_protocol: config.da_protocol.clone(),
node: config.node_addr.clone().unwrap(),
explorer: format!("http://{}", explorer_api_addr).parse().unwrap(),
message: None,
author: None,
};
let c1 = c.clone();
std::thread::Builder::new()
.name("user1".into())
.spawn(move || {
let (rx1, app1) = c.run_app_without_terminal("user1".into()).unwrap();
app1.send_message("Hello from user1".into());
let msgs1 = rx1.recv().unwrap();
assert!(!msgs1.is_empty());
tx1.send(()).unwrap();
})
.unwrap();
std::thread::Builder::new()
.name("user2".into())
.spawn(move || {
let (rx2, app2) = c1.run_app_without_terminal("user2".into()).unwrap();
app2.send_message("Hello from user2".into());
let msgs2 = rx2.recv().unwrap();
assert!(!msgs2.is_empty());
tx.send(()).unwrap();
})
.unwrap();
});
});
assert!(rx.recv().is_ok());
assert!(rx.recv().is_ok());
}
async fn wait_for_cert_in_mempool(node: &NomosNode) {

View File

@ -72,21 +72,21 @@ async fn happy_test(nodes: &[NomosNode]) {
#[tokio::test]
async fn two_nodes_happy() {
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(2).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config), None).await;
happy_test(&nodes).await;
}
#[tokio::test]
async fn ten_nodes_happy() {
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(3).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(10, mixnet_config)).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(10, mixnet_config), None).await;
happy_test(&nodes).await;
}
#[tokio::test]
async fn test_get_block() {
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(3).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await;
let nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config), None).await;
happy_test(&nodes).await;
let id = nodes[0].consensus_info().await.last_committed_block.id;
tokio::time::timeout(Duration::from_secs(10), async {

View File

@ -11,14 +11,17 @@ const DUMMY_NODE_ID: NodeId = NodeId::new([0u8; 32]);
#[tokio::test]
async fn ten_nodes_one_down() {
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(3).await;
let mut nodes = NomosNode::spawn_nodes(SpawnConfig::Chain {
consensus: ConsensusConfig {
n_participants: 10,
threshold: Fraction::new(9u32, 10u32),
timeout: std::time::Duration::from_secs(5),
let mut nodes = NomosNode::spawn_nodes(
SpawnConfig::Chain {
consensus: ConsensusConfig {
n_participants: 10,
threshold: Fraction::new(9u32, 10u32),
timeout: std::time::Duration::from_secs(5),
},
mixnet: mixnet_config,
},
mixnet: mixnet_config,
})
None,
)
.await;
let mut failed_node = nodes.pop().unwrap();
failed_node.stop();