This commit is contained in:
Al Liu 2024-02-26 12:34:42 +08:00
commit ff560f263f
No known key found for this signature in database
GPG Key ID: C8AE9A6E0166923E
32 changed files with 694 additions and 135 deletions

View File

@ -1,3 +1,6 @@
#!/usr/bin/env groovy
library 'status-jenkins-lib@v1.8.6'
pipeline {
agent {
dockerfile {
@ -43,16 +46,16 @@ pipeline {
}
post {
failure {
always {
script {
def discord = load "${WORKSPACE}/ci/discord.groovy"
discord.sendMessage(header: 'Nightly Fuzztest Failed. Regression files archived as job artifacts')
}
}
success {
script {
def discord = load "${WORKSPACE}/ci/discord.groovy"
discord.sendMessage(header: 'Nightly Fuzztest Passed')
discord.send(
header: (
currentBuild.currentResult == 'SUCCESS' ?
'Nightly Fuzztest Passed' :
'Nightly Fuzztest Failed. Regression files archived as job artifacts'
),
cred: 'nomos-node-discord-commits-webhook',
)
}
}
cleanup { cleanWs() }

View File

@ -1,3 +1,6 @@
#!/usr/bin/env groovy
library 'status-jenkins-lib@v1.8.6'
pipeline {
agent {
dockerfile {
@ -67,20 +70,13 @@ pipeline {
}
post {
failure {
script {
def report = readFile("${WORKSPACE}/report.txt").trim()
def discord = load "${WORKSPACE}/ci/discord.groovy"
discord.sendMessage(header: "Nightly Integration Tests Failed: ${report}")
}
}
success {
script {
def report = readFile('report.txt').trim()
def discord = load "${WORKSPACE}/ci/discord.groovy"
discord.sendMessage(header: "Nightly Integration Tests Passed: ${report}")
}
}
always { script {
def report = readFile("${WORKSPACE}/report.txt").trim()
discord.send(
header: "Nightly Integration Tests ${currentBuild.currentResult}: ${report}",
cred: 'nomos-node-discord-commits-webhook',
)
} }
cleanup { cleanWs() }
}
}

View File

@ -1,32 +0,0 @@
def sendMessage(Map args=[:]) {
def opts = [
header: args.header ?: 'Build succeeded',
title: args.title ?: "${env.JOB_NAME}#${env.BUILD_NUMBER}",
cred: args.cred ?: 'nomos-node-discord-commits-webhook',
]
def repo = [
url: GIT_URL.minus('.git'),
branch: GIT_BRANCH.minus('origin/'),
commit: GIT_COMMIT.take(8),
]
withCredentials([
string(
credentialsId: opts.cred,
variable: 'DISCORD_WEBHOOK',
),
]) {
discordSend(
link: env.BUILD_URL,
result: currentBuild.currentResult,
webhookURL: env.DISCORD_WEBHOOK,
title: opts.title,
description: """
${opts.header}
Branch: [`${repo.branch}`](${repo.url}/commits/${repo.branch})
Commit: [`${repo.commit}`](${repo.url}/commit/${repo.commit})
""",
)
}
}
return this

View File

@ -139,6 +139,15 @@ services:
entrypoint: /usr/bin/mixnode
command: /etc/nomos/mixnode_config.yaml
chatbot:
container_name: chatbot
build:
context: .
dockerfile: testnet/Dockerfile
volumes:
- ./testnet:/etc/nomos
entrypoint: /etc/nomos/scripts/run_nomos_bot.sh
etcd:
container_name: etcd
image: quay.io/coreos/etcd:v3.4.15
@ -148,3 +157,29 @@ services:
- /usr/local/bin/etcd
- --advertise-client-urls=http://etcd:2379
- --listen-client-urls=http://0.0.0.0:2379
prometheus:
container_name: prometheus
image: prom/prometheus:latest
volumes:
- ./testnet/monitoring/prometheus-config.yml:/etc/prometheus/prometheus.yml:z
command:
- --config.file=/etc/prometheus/prometheus.yml
- --storage.tsdb.retention.time=7d
ports:
- 127.0.0.1:9090:9090
restart: on-failure
grafana:
container_name: grafana
image: grafana/grafana:latest
env_file:
- ./testnet/monitoring/grafana/plugins.env
volumes:
- ./testnet/monitoring/grafana/grafana.ini:/etc/grafana/grafana.ini:z
- ./testnet/monitoring/grafana/datasources.yaml:/etc/grafana/provisioning/datasources/datasources.yaml:z
ports:
- 9091:3000
restart: on-failure
depends_on:
- prometheus

View File

@ -27,7 +27,7 @@ nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "li
nomos-metrics = { path = "../../nomos-metrics" }
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
carnot-consensus = { path = "../../nomos-services/carnot-consensus", features = ["libp2p"] }
nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] }
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
nomos-libp2p = { path = "../../nomos-libp2p" }
nomos-da = { path = "../../nomos-services/data-availability", features = ["libp2p"] }
nomos-system-sig = { path = "../../nomos-services/system-sig" }

View File

@ -6,9 +6,9 @@ consensus:
private_key: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
fountain_settings: null
overlay_settings:
nodes: [[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1]]
nodes: ["0000000000000000000000000000000000000000000000000000000000000000", "0000000000000000000000000000000000000000000000000000000000000001"]
number_of_committees: 1
current_leader: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
current_leader: 0000000000000000000000000000000000000000000000000000000000000000
leader:
cur: 0
committee_membership: !Sad

View File

@ -33,7 +33,7 @@ use nomos_mempool::{
use nomos_metrics::Metrics;
use nomos_network::backends::libp2p::Libp2p;
use nomos_storage::{
backends::{sled::SledBackend, StorageSerde},
backends::{rocksdb::RocksBackend, StorageSerde},
StorageService,
};
@ -68,7 +68,7 @@ pub type Carnot = CarnotConsensus<
TreeOverlay<RoundRobin, RandomBeaconState>,
FillSizeWithTx<MB16, Tx>,
FillSizeWithBlobsCertificate<MB16, Certificate>,
SledBackend<Wire>,
RocksBackend<Wire>,
>;
pub type DataAvailability = DataAvailabilityService<
@ -94,7 +94,7 @@ pub struct Nomos {
consensus: ServiceHandle<Carnot>,
http: ServiceHandle<ApiService<AxumBackend<Tx, Wire, MB16>>>,
da: ServiceHandle<DataAvailability>,
storage: ServiceHandle<StorageService<SledBackend<Wire>>>,
storage: ServiceHandle<StorageService<RocksBackend<Wire>>>,
#[cfg(feature = "metrics")]
metrics: ServiceHandle<Metrics>,
system_sig: ServiceHandle<SystemSig>,

View File

@ -99,8 +99,10 @@ fn main() -> Result<()> {
#[cfg(feature = "metrics")]
metrics: MetricsSettings { registry },
da: config.da,
storage: nomos_storage::backends::sled::SledBackendSettings {
storage: nomos_storage::backends::rocksdb::RocksBackendSettings {
db_path: std::path::PathBuf::from(DEFAULT_DB_PATH),
read_only: false,
column_family: Some("blocks".into()),
},
system_sig: (),
},

View File

@ -48,6 +48,8 @@ use ratatui::{
use tui_input::{backend::crossterm::EventHandler, Input};
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120);
// Limit the number of maximum in-flight requests
const MAX_BUFFERED_REQUESTS: usize = 20;
#[derive(Clone, Debug, Args)]
/// The almost-instant messaging protocol.
@ -64,6 +66,12 @@ pub struct NomosChat {
/// The explorer to connect to to fetch blocks and blobs
#[clap(long)]
pub explorer: Url,
/// Author for non interactive message formation
#[clap(long, requires("message"))]
pub author: Option<String>,
/// Message for non interactive message formation
#[clap(long, requires("author"))]
pub message: Option<String>,
}
pub struct App {
@ -88,12 +96,6 @@ impl NomosChat {
<NetworkService<Libp2p> as ServiceData>::Settings,
>(std::fs::File::open(&self.network_config)?)?;
let da_protocol = self.da_protocol.clone();
// setup terminal
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
let node_addr = Some(self.node.clone());
@ -127,6 +129,21 @@ impl NomosChat {
.wait_finished()
});
if let Some(author) = self.author.as_ref() {
let message = self
.message
.as_ref()
.expect("Should be available if author is set");
return run_once(author, message, payload_sender);
}
// setup terminal
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
let app = App {
input: Input::default(),
username: None,
@ -157,6 +174,24 @@ impl NomosChat {
}
}
fn run_once(
author: &str,
message: &str,
payload_sender: UnboundedSender<Box<[u8]>>,
) -> Result<(), Box<dyn std::error::Error>> {
payload_sender.send(
wire::serialize(&ChatMessage {
author: author.to_string(),
message: message.to_string(),
_nonce: rand::random(),
})
.unwrap()
.into(),
)?;
Ok(())
}
fn run_app<B: Backend>(terminal: &mut Terminal<B>, mut app: App) {
let (message_tx, message_rx) = std::sync::mpsc::channel();
let node = app.node.clone();
@ -297,7 +332,7 @@ async fn fetch_new_messages(
process_block_blobs(node, block, da_settings)
})
.buffer_unordered(blocks.len())
.buffered(MAX_BUFFERED_REQUESTS)
.collect::<Vec<_>>()
.await;

View File

@ -9,11 +9,11 @@ use reqwest::Url;
use std::{path::PathBuf, sync::Arc, time::Duration};
use tokio::sync::Mutex;
#[derive(Args, Debug)]
#[derive(Args, Debug, Default)]
pub struct Disseminate {
// TODO: accept bytes
#[clap(short, long)]
pub data: String,
#[clap(short, long, required_unless_present("file"))]
pub data: Option<String>,
/// Path to the network config file
#[clap(short, long)]
pub network_config: PathBuf,
@ -30,6 +30,9 @@ pub struct Disseminate {
/// File to write the certificate to, if present.
#[clap(long)]
pub output: Option<PathBuf>,
/// File to disseminate
#[clap(short, long)]
pub file: Option<PathBuf>,
}
impl Disseminate {
@ -41,7 +44,15 @@ impl Disseminate {
<NetworkService<Libp2p> as ServiceData>::Settings,
>(std::fs::File::open(&self.network_config)?)?;
let (status_updates, rx) = std::sync::mpsc::channel();
let bytes: Box<[u8]> = self.data.clone().as_bytes().into();
let bytes: Box<[u8]> = if let Some(data) = &self.data {
data.clone().as_bytes().into()
} else {
let file_path = self.file.as_ref().unwrap();
let file_bytes = std::fs::read(file_path)?;
file_bytes.into_boxed_slice()
};
let timeout = Duration::from_secs(self.timeout);
let da_protocol = self.da_protocol.clone();
let node_addr = self.node_addr.clone();

View File

@ -205,7 +205,7 @@ impl ServiceCore for DisseminateService {
// protocols, but only the one chosen will be used.
// We can enforce only sensible combinations of protocol/settings
// are specified by using special clap directives
#[derive(Clone, Debug, Args)]
#[derive(Clone, Debug, Args, Default)]
pub struct DaProtocolChoice {
#[clap(long, default_value = "full-replication")]
pub da_protocol: Protocol,
@ -227,14 +227,15 @@ impl TryFrom<DaProtocolChoice> for FullReplication<AbsoluteNumber<Attestation, C
}
}
#[derive(Clone, Debug, Args)]
#[derive(Clone, Debug, Args, Default)]
pub struct ProtocolSettings {
#[clap(flatten)]
pub full_replication: FullReplicationSettings,
}
#[derive(Clone, Debug, ValueEnum)]
#[derive(Clone, Debug, ValueEnum, Default)]
pub enum Protocol {
#[default]
FullReplication,
}

View File

@ -20,7 +20,7 @@ nomos-network = { path = "../../nomos-services/network" }
nomos-da = { path = "../../nomos-services/data-availability" }
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock", "libp2p", "openapi"] }
nomos-metrics = { path = "../../nomos-metrics" }
nomos-storage = { path = "../../nomos-services/storage", features = ["sled"] }
nomos-storage = { path = "../../nomos-services/storage", features = ["rocksdb"] }
nomos-libp2p = { path = "../../nomos-libp2p" }
full-replication = { path = "../../nomos-da/full-replication" }
serde = { version = "1", features = ["derive"] }

View File

@ -23,7 +23,7 @@ use nomos_core::{
use nomos_mempool::{
backend::mockpool::MockPool, network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter,
};
use nomos_storage::backends::{sled::SledBackend, StorageSerde};
use nomos_storage::backends::{rocksdb::RocksBackend, StorageSerde};
pub type Carnot<Tx, SS, const SIZE: usize> = CarnotConsensus<
ConsensusLibp2pAdapter,
@ -37,7 +37,7 @@ pub type Carnot<Tx, SS, const SIZE: usize> = CarnotConsensus<
TreeOverlay<RoundRobin, RandomBeaconState>,
FillSizeWithTx<SIZE, Tx>,
FillSizeWithBlobsCertificate<SIZE, Certificate>,
SledBackend<SS>,
RocksBackend<SS>,
>;
pub async fn carnot_info<Tx, SS, const SIZE: usize>(

View File

@ -1,7 +1,7 @@
use carnot_engine::BlockId;
use nomos_core::block::Block;
use nomos_storage::{
backends::{sled::SledBackend, StorageSerde},
backends::{rocksdb::RocksBackend, StorageSerde},
StorageMsg, StorageService,
};
@ -14,7 +14,7 @@ where
S: StorageSerde + Send + Sync + 'static,
{
let relay = handle
.relay::<StorageService<SledBackend<S>>>()
.relay::<StorageService<RocksBackend<S>>>()
.connect()
.await?;
let (msg, receiver) = StorageMsg::new_load_message(id);

View File

@ -13,6 +13,7 @@ bytes = "1.2"
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", rev = "2f70806" }
serde = "1.0"
sled = { version = "0.34", optional = true }
rocksdb = { version = "0.22", optional = true }
thiserror = "1.0"
tracing = "0.1"

View File

@ -3,6 +3,9 @@ pub mod mock;
#[cfg(feature = "sled")]
pub mod sled;
#[cfg(feature = "rocksdb")]
pub mod rocksdb;
// std
use std::error::Error;
// crates

View File

@ -0,0 +1,253 @@
// std
use std::path::PathBuf;
use std::{marker::PhantomData, sync::Arc};
// crates
use async_trait::async_trait;
use bytes::Bytes;
pub use rocksdb::Error;
use rocksdb::{Options, DB};
// internal
use super::{StorageBackend, StorageSerde, StorageTransaction};
/// Rocks backend setting
#[derive(Clone, Debug)]
pub struct RocksBackendSettings {
/// File path to the db file
pub db_path: PathBuf,
pub read_only: bool,
pub column_family: Option<String>,
}
/// Rocks transaction type
// Do not use `TransactionDB` here, because rocksdb's `TransactionDB` does not support open by read-only mode.
// Thus, we cannot open the same db in two or more processes.
pub struct Transaction {
rocks: Arc<DB>,
#[allow(clippy::type_complexity)]
executor: Box<dyn FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync>,
}
impl Transaction {
/// Execute a function over the transaction
pub fn execute(self) -> Result<Option<Bytes>, Error> {
(self.executor)(&self.rocks)
}
}
impl StorageTransaction for Transaction {
type Result = Result<Option<Bytes>, Error>;
type Transaction = Self;
}
/// Rocks storage backend
pub struct RocksBackend<SerdeOp> {
rocks: Arc<DB>,
_serde_op: PhantomData<SerdeOp>,
}
impl<SerdeOp> RocksBackend<SerdeOp> {
pub fn txn(
&self,
executor: impl FnOnce(&DB) -> Result<Option<Bytes>, Error> + Send + Sync + 'static,
) -> Transaction {
Transaction {
rocks: self.rocks.clone(),
executor: Box::new(executor),
}
}
}
impl<SerdeOp> core::fmt::Debug for RocksBackend<SerdeOp> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
format!("RocksBackend {{ rocks: {:?} }}", self.rocks).fmt(f)
}
}
#[async_trait]
impl<SerdeOp: StorageSerde + Send + Sync + 'static> StorageBackend for RocksBackend<SerdeOp> {
type Settings = RocksBackendSettings;
type Error = rocksdb::Error;
type Transaction = Transaction;
type SerdeOperator = SerdeOp;
fn new(config: Self::Settings) -> Result<Self, Self::Error> {
let RocksBackendSettings {
db_path,
read_only,
column_family: cf,
} = config;
let db = match (read_only, cf) {
(true, None) => {
let mut opts = Options::default();
opts.create_if_missing(false);
DB::open_for_read_only(&opts, db_path, false)?
}
(true, Some(cf)) => {
let mut opts = Options::default();
opts.create_if_missing(false);
DB::open_cf_for_read_only(&opts, db_path, [cf], false)?
}
(false, None) => {
let mut opts = Options::default();
opts.create_if_missing(true);
DB::open(&opts, db_path)?
}
(false, Some(cf)) => {
let mut opts = Options::default();
opts.create_if_missing(true);
DB::open_cf(&opts, db_path, [cf])?
}
};
Ok(Self {
rocks: Arc::new(db),
_serde_op: Default::default(),
})
}
async fn store(&mut self, key: Bytes, value: Bytes) -> Result<(), Self::Error> {
self.rocks.put(key, value)
}
async fn load(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
self.rocks.get(key).map(|opt| opt.map(|ivec| ivec.into()))
}
async fn remove(&mut self, key: &[u8]) -> Result<Option<Bytes>, Self::Error> {
self.load(key).await.and_then(|val| {
if val.is_some() {
self.rocks.delete(key).map(|_| val)
} else {
Ok(None)
}
})
}
async fn execute(
&mut self,
transaction: Self::Transaction,
) -> Result<<Self::Transaction as StorageTransaction>::Result, Self::Error> {
match transaction.execute() {
Ok(result) => Ok(Ok(result)),
Err(e) => Err(e),
}
}
}
#[cfg(test)]
mod test {
use super::super::testing::NoStorageSerde;
use super::*;
use tempfile::TempDir;
#[tokio::test]
async fn test_store_load_remove(
) -> Result<(), <RocksBackend<NoStorageSerde> as StorageBackend>::Error> {
let temp_path = TempDir::new().unwrap();
let sled_settings = RocksBackendSettings {
db_path: temp_path.path().to_path_buf(),
read_only: false,
column_family: None,
};
let key = "foo";
let value = "bar";
let mut db: RocksBackend<NoStorageSerde> = RocksBackend::new(sled_settings)?;
db.store(key.as_bytes().into(), value.as_bytes().into())
.await?;
let load_value = db.load(key.as_bytes()).await?;
assert_eq!(load_value, Some(value.as_bytes().into()));
let removed_value = db.remove(key.as_bytes()).await?;
assert_eq!(removed_value, Some(value.as_bytes().into()));
Ok(())
}
#[tokio::test]
async fn test_transaction(
) -> Result<(), <RocksBackend<NoStorageSerde> as StorageBackend>::Error> {
let temp_path = TempDir::new().unwrap();
let sled_settings = RocksBackendSettings {
db_path: temp_path.path().to_path_buf(),
read_only: false,
column_family: None,
};
let mut db: RocksBackend<NoStorageSerde> = RocksBackend::new(sled_settings)?;
let txn = db.txn(|db| {
let key = "foo";
let value = "bar";
db.put(key, value)?;
let result = db.get(key)?;
db.delete(key)?;
Ok(result.map(|ivec| ivec.to_vec().into()))
});
let result = db.execute(txn).await??;
assert_eq!(result, Some("bar".as_bytes().into()));
Ok(())
}
#[tokio::test]
async fn test_multi_readers_single_writer(
) -> Result<(), <RocksBackend<NoStorageSerde> as StorageBackend>::Error> {
use tokio::sync::mpsc::channel;
let temp_path = TempDir::new().unwrap();
let path = temp_path.path().to_path_buf();
let sled_settings = RocksBackendSettings {
db_path: temp_path.path().to_path_buf(),
read_only: false,
column_family: None,
};
let key = "foo";
let value = "bar";
let mut db: RocksBackend<NoStorageSerde> = RocksBackend::new(sled_settings)?;
let (tx, mut rx) = channel(5);
// now let us spawn a few readers
for _ in 0..5 {
let p = path.clone();
let tx = tx.clone();
std::thread::spawn(move || {
tokio::runtime::Runtime::new()
.unwrap()
.block_on(async move {
let sled_settings = RocksBackendSettings {
db_path: p,
read_only: true,
column_family: None,
};
let key = "foo";
let mut db: RocksBackend<NoStorageSerde> =
RocksBackend::new(sled_settings).unwrap();
while db.load(key.as_bytes()).await.unwrap().is_none() {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
}
tx.send(()).await.unwrap();
});
});
}
db.store(key.as_bytes().into(), value.as_bytes().into())
.await?;
let mut recvs = 0;
loop {
if rx.recv().await.is_some() {
recvs += 1;
if recvs == 5 {
break;
}
}
}
Ok(())
}
}

View File

@ -20,6 +20,7 @@ pkgs.mkShell {
rust-bin.stable."1.75.0".default
clang_14
llvmPackages_14.libclang
openssl
];
shellHook = ''
export LIBCLANG_PATH="${pkgs.llvmPackages_14.libclang.lib}/lib";

View File

@ -2,10 +2,6 @@
FROM rust:1.75.0-slim-bullseye AS builder
# Using backports for go 1.19
RUN echo 'deb http://deb.debian.org/debian bullseye-backports main' \
>> /etc/apt/sources.list
# Dependecies for publishing documentation.
RUN apt-get update && apt-get install -yq \
git clang etcd-client libssl-dev pkg-config
@ -13,11 +9,11 @@ RUN apt-get update && apt-get install -yq \
WORKDIR /nomos
COPY . .
RUN cargo build --release --all
RUN cargo build --release --all --features metrics
# NODE IMAGE ----------------------------------------------------------
FROM bitnami/minideb:latest
FROM bitnami/minideb:bullseye
LABEL maintainer="augustinas@status.im" \
source="https://github.com/logos-co/nomos-node" \
@ -27,9 +23,9 @@ LABEL maintainer="augustinas@status.im" \
EXPOSE 3000 8080 9000 60000
COPY --from=builder /nomos/target/release/nomos-node /usr/bin/nomos-node
COPY --from=builder /nomos/target/release/nomos-cli /usr/bin/nomos-cli
COPY --from=builder /nomos/target/release/mixnode /usr/bin/mixnode
COPY --from=builder /usr/bin/etcdctl /usr/bin/etcdctl
COPY nodes/nomos-node/config.yaml /etc/nomos/config.yaml
RUN install_packages python3 python3-etcd3
ENTRYPOINT ["/usr/bin/nomos-node"]

44
testnet/cli_config.yaml Normal file
View File

@ -0,0 +1,44 @@
backend:
host: 0.0.0.0
port: 4007
log_level: "fatal"
node_key: "0000000000000000000000000000000000000000000000000000000000000667"
discV5BootstrapNodes: []
initial_peers: ["/dns/bootstrap/tcp/3000"]
relayTopics: []
# Mixclient configuration to communicate with mixnodes.
# The libp2p network backend always requires this mixclient configuration
# (cannot be disabled for now).
mixnet_client:
# A mixclient mode. For details, see the documentation of the "mixnet" crate.
# - Sender
# - !SenderReceiver [mixnode_client_listen_address]
mode: Sender
# A mixnet topology, which contains the information of all mixnodes in the mixnet.
# (The topology is static for now.)
topology:
# Each mixnet layer consists of a list of mixnodes.
layers:
- nodes:
- address: mix-node-0:7707 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
- nodes:
- address: mix-node-1:7717 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
- nodes:
- address: mix-node-2:7727 # A listen address of the mixnode
public_key: "fd3384e132ad02a56c78f45547ee40038dc79002b90d29ed90e08eee762ae715"
# A max number of connections that will stay connected to mixnodes in the first mixnet layer.
connection_pool_size: 255
max_retries: 5
retry_delay:
secs: 1
nanos: 0
# A range of total delay that will be set to each Sphinx packets
# sent to the mixnet for timing obfuscation.
# Panics if start > end.
mixnet_delay:
start: "0ms"
end: "0ms"

View File

@ -0,0 +1,10 @@
log:
backend: "Stdout"
format: "Json"
level: "info"
api:
backend_settings:
address: 0.0.0.0:9090
cors_origins: []

View File

@ -0,0 +1,11 @@
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
access: proxy
org_id: 1
url: http://prometheus:9090
is_default: true
version: 1
editable: true

View File

@ -0,0 +1,51 @@
instance_name = nomos dashboard
;[dashboards.json]
;enabled = true
;path = /home/git/grafana/grafana-dashboards/dashboards
#################################### Auth ##########################
[auth]
disable_login_form = false
#################################### Anonymous Auth ##########################
[auth.anonymous]
# enable anonymous access
enabled = true
# specify organization name that should be used for unauthenticated users
;org_name = Public
# specify role for unauthenticated users
; org_role = Admin
org_role = Viewer
;[security]
;admin_user = ocr
;admin_password = ocr
;[users]
# disable user signup / registration
;allow_sign_up = false
# Set to true to automatically assign new users to the default organization (id 1)
;auto_assign_org = true
# Default role new users will be automatically assigned (if disabled above is set to true)
;auto_assign_org_role = Viewer
#################################### SMTP / Emailing ##########################
;[smtp]
;enabled = false
;host = localhost:25
;user =
;password =
;cert_file =
;key_file =
;skip_verify = false
;from_address = admin@grafana.localhost
;[emails]
;welcome_email_on_sign_up = false

View File

@ -0,0 +1 @@
GF_INSTALL_PLUGINS=grafana-worldmap-panel,grafana-piechart-panel,yesoreyeram-boomtheme-panel,briangann-gauge-panel,pierosavi-imageit-panel,bessler-pictureit-panel,vonage-status-panel

View File

@ -0,0 +1,14 @@
global:
scrape_interval: 15s
evaluation_interval: 15s
external_labels:
monitor: "Monitoring"
scrape_configs:
- job_name: "libp2p"
static_configs:
- targets:
- bootstrap:18080
- libp2p_node_1:18080
- libp2p_node_2:18080
- libp2p_node_3:18080

View File

@ -17,4 +17,4 @@ echo "CONSENSUS_PRIV_KEY: ${CONSENSUS_PRIV_KEY}"
echo "DA_VOTER: ${DA_VOTER}"
echo "OVERLAY_NODES: ${OVERLAY_NODES}"
exec /usr/bin/nomos-node /etc/nomos/bootstrap_config.yaml
exec /usr/bin/nomos-node /etc/nomos/bootstrap_config.yaml --with-metrics

View File

@ -0,0 +1,9 @@
#!/bin/sh
echo "I am a container ${HOSTNAME} bot"
while true
do
/usr/bin/nomos-cli chat --author nomos-ghost --message "$(date +%H:%M:%S) ~ ping" --network-config /etc/nomos/cli_config.yaml --node http://bootstrap:18080
sleep 10
done

View File

@ -33,4 +33,4 @@ echo "DA_VOTER: ${DA_VOTER}"
echo "OVERLAY_NODES: ${OVERLAY_NODES}"
echo "NET_INITIAL_PEERS: ${NET_INITIAL_PEERS}"
exec /usr/bin/nomos-node /etc/nomos/libp2p_config.yaml
exec /usr/bin/nomos-node /etc/nomos/libp2p_config.yaml --with-metrics

View File

@ -4,8 +4,10 @@ use std::{
time::Duration,
};
use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
use mixnet_node::{MixnetNodeConfig, PRIVATE_KEY_SIZE};
use mixnet_topology::{Layer, MixnetTopology, Node};
use nomos_log::{LoggerBackend, LoggerFormat};
use rand::{thread_rng, RngCore};
use tempfile::NamedTempFile;
@ -14,21 +16,37 @@ use crate::{get_available_port, MixnetConfig};
const MIXNODE_BIN: &str = "../target/debug/mixnode";
pub struct MixNode {
_tempdir: tempfile::TempDir,
child: Child,
}
impl Drop for MixNode {
fn drop(&mut self) {
self.child.kill().unwrap();
if std::thread::panicking() {
if let Err(e) = persist_tempdir(&mut self._tempdir, "mixnode") {
println!("failed to persist tempdir: {e}");
}
}
if let Err(e) = self.child.kill() {
println!("failed to kill the child process: {e}");
}
}
}
impl MixNode {
pub async fn spawn(config: MixnetNodeConfig) -> Self {
let config = mixnode::Config {
let dir = create_tempdir().unwrap();
let mut config = mixnode::Config {
mixnode: config,
log: Default::default(),
};
config.log.backend = LoggerBackend::File {
directory: dir.path().to_owned(),
prefix: Some(LOGS_PREFIX.into()),
};
config.log.format = LoggerFormat::Json;
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
@ -43,7 +61,10 @@ impl MixNode {
//TODO: use a sophisticated way to wait until the node is ready
tokio::time::sleep(Duration::from_secs(1)).await;
Self { child }
Self {
_tempdir: dir,
child,
}
}
pub async fn spawn_nodes(num_nodes: usize) -> (Vec<Self>, MixnetConfig) {

View File

@ -3,3 +3,26 @@ pub mod nomos;
pub use self::mixnode::MixNode;
pub use nomos::NomosNode;
use tempfile::TempDir;
const LOGS_PREFIX: &str = "__logs";
fn create_tempdir() -> std::io::Result<TempDir> {
// It's easier to use the current location instead of OS-default tempfile location
// because Github Actions can easily access files in the current location using wildcard
// to upload them as artifacts.
tempfile::TempDir::new_in(std::env::current_dir()?)
}
fn persist_tempdir(tempdir: &mut TempDir, label: &str) -> std::io::Result<()> {
println!(
"{}: persisting directory at {}",
label,
tempdir.path().display()
);
// we need ownership of the dir to persist it
let dir = std::mem::replace(tempdir, tempfile::tempdir()?);
// a bit confusing but `into_path` persists the directory
let _ = dir.into_path();
Ok(())
}

View File

@ -3,6 +3,7 @@ use std::net::SocketAddr;
use std::process::{Child, Command, Stdio};
use std::time::Duration;
// internal
use super::{create_tempdir, persist_tempdir, LOGS_PREFIX};
use crate::{adjust_timeout, get_available_port, ConsensusConfig, MixnetConfig, Node, SpawnConfig};
use carnot_consensus::{CarnotInfo, CarnotSettings};
use carnot_engine::overlay::{RandomBeaconState, RoundRobin, TreeOverlay, TreeOverlaySettings};
@ -22,14 +23,13 @@ use nomos_node::{api::AxumBackendSettings, Config, Tx};
use fraction::Fraction;
use once_cell::sync::Lazy;
use rand::{thread_rng, Rng};
use reqwest::Client;
use reqwest::{Client, Url};
use tempfile::NamedTempFile;
static CLIENT: Lazy<Client> = Lazy::new(Client::new);
const NOMOS_BIN: &str = "../target/debug/nomos-node";
const CARNOT_INFO_API: &str = "carnot/info";
const STORAGE_BLOCKS_API: &str = "storage/block";
const LOGS_PREFIX: &str = "__logs";
const GET_BLOCKS_INFO: &str = "carnot/blocks";
pub struct NomosNode {
@ -42,14 +42,14 @@ pub struct NomosNode {
impl Drop for NomosNode {
fn drop(&mut self) {
if std::thread::panicking() {
println!("persisting directory at {}", self._tempdir.path().display());
// we need ownership of the dir to persist it
let dir = std::mem::replace(&mut self._tempdir, tempfile::tempdir().unwrap());
// a bit confusing but `into_path` persists the directory
let _ = dir.into_path();
if let Err(e) = persist_tempdir(&mut self._tempdir, "nomos-node") {
println!("failed to persist tempdir: {e}");
}
}
self.child.kill().unwrap();
if let Err(e) = self.child.kill() {
println!("failed to kill the child process: {e}");
}
}
}
@ -61,11 +61,7 @@ impl NomosNode {
pub async fn spawn(mut config: Config) -> Self {
// Waku stores the messages in a db file in the current dir, we need a different
// directory for each node to avoid conflicts
//
// NOTE: It's easier to use the current location instead of OS-default tempfile location
// because Github Actions can easily access files in the current location using wildcard
// to upload them as artifacts.
let dir = tempfile::TempDir::new_in(std::env::current_dir().unwrap()).unwrap();
let dir = create_tempdir().unwrap();
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
@ -115,6 +111,10 @@ impl NomosNode {
}
}
pub fn url(&self) -> Url {
format!("http://{}", self.addr).parse().unwrap()
}
pub async fn get_block(&self, id: BlockId) -> Option<Block<Tx, Certificate>> {
CLIENT
.post(&format!("http://{}/{}", self.addr, STORAGE_BLOCKS_API))
@ -341,8 +341,6 @@ fn create_node_config(
cors_origins: vec![],
},
},
#[cfg(feature = "metrics")]
metrics: Default::default(),
da: nomos_da::Settings {
da_protocol: full_replication::Settings {
voter: id,

View File

@ -1,17 +1,40 @@
use full_replication::{AbsoluteNumber, Attestation, Certificate, FullReplication};
use nomos_cli::{
cmds::{disseminate::Disseminate, Command},
api::da::get_blobs,
cmds::disseminate::Disseminate,
da::disseminate::{DaProtocolChoice, FullReplicationSettings, Protocol, ProtocolSettings},
};
use std::time::Duration;
use nomos_core::da::{blob::Blob as _, DaProtocol};
use std::{io::Write, time::Duration};
use tempfile::NamedTempFile;
use tests::{
adjust_timeout, get_available_port, nodes::nomos::Pool, MixNode, Node, NomosNode, SpawnConfig,
};
const CLI_BIN: &str = "../target/debug/nomos-cli";
use std::process::Command;
const TIMEOUT_SECS: u64 = 20;
#[tokio::test]
async fn disseminate_blob() {
fn run_disseminate(disseminate: &Disseminate) {
let mut binding = Command::new(CLI_BIN);
let c = binding
.args(["disseminate", "--network-config"])
.arg(disseminate.network_config.as_os_str())
.arg("--node-addr")
.arg(disseminate.node_addr.as_ref().unwrap().as_str());
match (&disseminate.data, &disseminate.file) {
(Some(data), None) => c.args(["--data", &data]),
(None, Some(file)) => c.args(["--file", file.as_os_str().to_str().unwrap()]),
(_, _) => panic!("Either data or file needs to be provided, but not both"),
};
c.status().expect("failed to execute nomos cli");
}
async fn disseminate(config: &mut Disseminate) {
let (_mixnodes, mixnet_config) = MixNode::spawn_nodes(2).await;
let mut nodes = NomosNode::spawn_nodes(SpawnConfig::chain_happy(2, mixnet_config)).await;
@ -25,31 +48,34 @@ async fn disseminate_blob() {
let mut file = NamedTempFile::new().unwrap();
let config_path = file.path().to_owned();
serde_yaml::to_writer(&mut file, &network_config).unwrap();
let cmd = Command::Disseminate(Disseminate {
data: "Hello World".into(),
timeout: 20,
network_config: config_path,
da_protocol: DaProtocolChoice {
da_protocol: Protocol::FullReplication,
settings: ProtocolSettings {
full_replication: FullReplicationSettings {
voter: [0; 32],
num_attestations: 1,
},
let da_protocol = DaProtocolChoice {
da_protocol: Protocol::FullReplication,
settings: ProtocolSettings {
full_replication: FullReplicationSettings {
voter: [0; 32],
num_attestations: 1,
},
},
node_addr: Some(
format!(
"http://{}",
nodes[0].config().http.backend_settings.address.clone()
)
.parse()
.unwrap(),
),
output: None,
});
};
let thread = std::thread::spawn(move || cmd.run().unwrap());
let da =
<FullReplication<AbsoluteNumber<Attestation, Certificate>>>::try_from(da_protocol.clone())
.unwrap();
config.timeout = 20;
config.network_config = config_path;
config.da_protocol = da_protocol;
config.node_addr = Some(
format!(
"http://{}",
nodes[0].config().http.backend_settings.address.clone()
)
.parse()
.unwrap(),
);
run_disseminate(&config);
// let thread = std::thread::spawn(move || cmd.run().unwrap());
tokio::time::timeout(
adjust_timeout(Duration::from_secs(TIMEOUT_SECS)),
@ -58,7 +84,53 @@ async fn disseminate_blob() {
.await
.unwrap();
thread.join().unwrap();
let (blob, bytes) = if let Some(data) = &config.data {
let bytes = data.as_bytes().to_vec();
(da.encode(bytes.clone())[0].hash(), bytes)
} else {
let bytes = std::fs::read(&config.file.as_ref().unwrap()).unwrap();
(da.encode(bytes.clone())[0].hash(), bytes)
};
assert_eq!(
get_blobs(&nodes[0].url(), vec![blob]).await.unwrap()[0].as_bytes(),
bytes.clone()
);
}
#[tokio::test]
async fn disseminate_blob() {
let mut config = Disseminate {
data: Some("hello world".to_string()),
..Default::default()
};
disseminate(&mut config).await;
}
#[tokio::test]
async fn disseminate_big_blob() {
const MSG_SIZE: usize = 1024;
let mut config = Disseminate {
data: std::iter::repeat(String::from("X"))
.take(MSG_SIZE)
.collect::<Vec<_>>()
.join("")
.into(),
..Default::default()
};
disseminate(&mut config).await;
}
#[tokio::test]
async fn disseminate_blob_from_file() {
let mut file = NamedTempFile::new().unwrap();
file.write_all("hello world".as_bytes()).unwrap();
let mut config = Disseminate {
file: Some(file.path().to_path_buf()),
..Default::default()
};
disseminate(&mut config).await;
}
async fn wait_for_cert_in_mempool(node: &NomosNode) {