Add libp2p backend to nomos-node (#285)
* Add support for libp2p backend in integration tests * Add support for libp2p in nomos-node * change default to waku * add mutually exclusive features warning * disable default features to avoid unification * disable default features * remove leftover cargo build * Make sure we are subscribed to libp2p topic at startup * unify imports * typo in ci config * Sequential build and test steps for features * Add RandomBeaconState to libp2p carnot variant --------- Co-authored-by: Gusto <bacvinka@gmail.com>
This commit is contained in:
parent
d675585a0f
commit
78c6566d8a
@ -89,7 +89,7 @@ def runTestCases(test_cases, iterations) {
|
||||
echo "Running iteration ${i + 1} of ${iterations}"
|
||||
|
||||
for (test_case in test_cases) {
|
||||
def test_cmd = "cargo test -p tests --features ${feature} ${test_case}"
|
||||
def test_cmd = "cargo test -p tests --no-default-features --features ${feature} ${test_case}"
|
||||
if (sh(script: test_cmd, returnStatus: true) != 0) {
|
||||
error("Test '${test_case}' failed on iteration ${i + 1}")
|
||||
return
|
||||
|
@ -32,22 +32,28 @@ pipeline {
|
||||
stages {
|
||||
stage('Check') {
|
||||
steps {
|
||||
sh "cargo check --all --features ${FEATURES}"
|
||||
sh "cargo check --all --no-default-features --features ${FEATURES}"
|
||||
sh "cargo fmt -- --check"
|
||||
sh "cargo clippy --all --features ${FEATURES} -- --deny warnings"
|
||||
sh "cargo clippy --all --no-default-features --features ${FEATURES} -- --deny warnings"
|
||||
}
|
||||
}
|
||||
|
||||
stage('Build') {
|
||||
steps {
|
||||
sh "cargo build"
|
||||
sh "cargo build --all --features ${FEATURES}"
|
||||
stage('BuildAndTest') {
|
||||
options {
|
||||
lock("sync-features")
|
||||
}
|
||||
}
|
||||
stages {
|
||||
stage('Build') {
|
||||
steps {
|
||||
sh "cargo build --all --no-default-features --features ${FEATURES}"
|
||||
}
|
||||
}
|
||||
|
||||
stage('Test') {
|
||||
steps {
|
||||
sh "cargo test --all --features ${FEATURES}"
|
||||
stage('Test') {
|
||||
steps {
|
||||
sh "cargo test --all --no-default-features --features ${FEATURES}"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -31,23 +31,29 @@ pipeline {
|
||||
stages {
|
||||
stage('Check') {
|
||||
steps { script {
|
||||
nix.shell("cargo check --all --features ${FEATURES}")
|
||||
nix.shell("cargo check --all --no-default-features --features ${FEATURES}")
|
||||
nix.shell("cargo fmt -- --check")
|
||||
nix.shell("cargo clippy --all --features ${FEATURES} -- --deny warnings")
|
||||
nix.shell("cargo clippy --all --no-default-features --features ${FEATURES} -- --deny warnings")
|
||||
} }
|
||||
}
|
||||
|
||||
stage('Build') {
|
||||
steps { script {
|
||||
nix.shell("cargo build")
|
||||
nix.shell("cargo build --all --features ${FEATURES}")
|
||||
} }
|
||||
}
|
||||
stage('BuildAndTest') {
|
||||
options {
|
||||
lock('sync-features')
|
||||
}
|
||||
stages {
|
||||
stage('Build') {
|
||||
steps { script {
|
||||
nix.shell("cargo build --all --no-default-features --features ${FEATURES}")
|
||||
} }
|
||||
}
|
||||
|
||||
stage('Test') {
|
||||
steps { script {
|
||||
nix.shell("cargo test --all --features ${FEATURES}")
|
||||
} }
|
||||
stage('Test') {
|
||||
steps { script {
|
||||
nix.shell("cargo test --all --no-default-features --features ${FEATURES}")
|
||||
} }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -16,21 +16,13 @@ http = "0.2.9"
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||
overwatch-derive = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||
tracing = "0.1"
|
||||
multiaddr = "0.17"
|
||||
multiaddr = "0.18"
|
||||
nomos-core = { path = "../../nomos-core" }
|
||||
nomos-network = { path = "../../nomos-services/network", features = [
|
||||
"waku",
|
||||
"libp2p",
|
||||
] }
|
||||
nomos-network = { path = "../../nomos-services/network" }
|
||||
nomos-log = { path = "../../nomos-services/log" }
|
||||
nomos-mempool = { path = "../../nomos-services/mempool", features = [
|
||||
"waku",
|
||||
"mock",
|
||||
] }
|
||||
nomos-mempool = { path = "../../nomos-services/mempool", features = ["mock"] }
|
||||
nomos-http = { path = "../../nomos-services/http", features = ["http"] }
|
||||
nomos-consensus = { path = "../../nomos-services/consensus", features = [
|
||||
"waku",
|
||||
] }
|
||||
nomos-consensus = { path = "../../nomos-services/consensus" }
|
||||
metrics = { path = "../../nomos-services/metrics", optional = true }
|
||||
tracing-subscriber = "0.3"
|
||||
consensus-engine = { path = "../../consensus-engine" }
|
||||
@ -39,4 +31,10 @@ serde_json = "1.0"
|
||||
serde_yaml = "0.9"
|
||||
color-eyre = "0.6.0"
|
||||
serde = "1"
|
||||
waku-bindings = "0.1.1"
|
||||
waku-bindings = { version = "0.1.1", optional = true }
|
||||
|
||||
|
||||
[features]
|
||||
default = ["waku"]
|
||||
waku = ["waku-bindings", "nomos-network/waku", "nomos-mempool/waku", "nomos-consensus/waku"]
|
||||
libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p", "nomos-consensus/libp2p"]
|
||||
|
@ -1,296 +0,0 @@
|
||||
// std
|
||||
// crates
|
||||
use bytes::Bytes;
|
||||
use http::StatusCode;
|
||||
use nomos_consensus::{CarnotInfo, ConsensusMsg};
|
||||
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::error;
|
||||
// internal
|
||||
use futures::future::join_all;
|
||||
use multiaddr::Multiaddr;
|
||||
use nomos_core::wire;
|
||||
use nomos_http::backends::axum::AxumBackend;
|
||||
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
|
||||
use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse};
|
||||
use nomos_mempool::backend::mockpool::MockPool;
|
||||
use nomos_mempool::network::adapters::waku::{
|
||||
WakuAdapter, WAKU_CARNOT_PUB_SUB_TOPIC, WAKU_CARNOT_TX_CONTENT_TOPIC,
|
||||
};
|
||||
use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
|
||||
use nomos_network::backends::waku::{Waku, WakuBackendMessage, WakuInfo};
|
||||
use nomos_network::{NetworkMsg, NetworkService};
|
||||
use nomos_node::{Carnot, Tx};
|
||||
use overwatch_rs::services::relay::OutboundRelay;
|
||||
use waku_bindings::WakuMessage;
|
||||
|
||||
pub fn carnot_info_bridge(
|
||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> HttpBridgeRunner {
|
||||
Box::new(Box::pin(async move {
|
||||
let (carnot_channel, mut http_request_channel) =
|
||||
build_http_bridge::<Carnot, AxumBackend, _>(handle, HttpMethod::GET, "info")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await {
|
||||
if let Err(e) = handle_carnot_info_req(&carnot_channel, &res_tx).await {
|
||||
error!(e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn mempool_metrics_bridge(
|
||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> HttpBridgeRunner {
|
||||
Box::new(Box::pin(async move {
|
||||
let (mempool_channel, mut http_request_channel) =
|
||||
build_http_bridge::<MempoolService<WakuAdapter<Tx>, MockPool<Tx>>, AxumBackend, _>(
|
||||
handle,
|
||||
HttpMethod::GET,
|
||||
"metrics",
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await {
|
||||
if let Err(e) = handle_mempool_metrics_req(&mempool_channel, res_tx).await {
|
||||
error!(e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn mempool_add_tx_bridge(
|
||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> HttpBridgeRunner {
|
||||
Box::new(Box::pin(async move {
|
||||
let (mempool_channel, mut http_request_channel) =
|
||||
build_http_bridge::<MempoolService<WakuAdapter<Tx>, MockPool<Tx>>, AxumBackend, _>(
|
||||
handle.clone(),
|
||||
HttpMethod::POST,
|
||||
"addtx",
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(HttpRequest {
|
||||
res_tx, payload, ..
|
||||
}) = http_request_channel.recv().await
|
||||
{
|
||||
if let Err(e) =
|
||||
handle_mempool_add_tx_req(&handle, &mempool_channel, res_tx, payload).await
|
||||
{
|
||||
error!(e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn waku_info_bridge(
|
||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> HttpBridgeRunner {
|
||||
Box::new(Box::pin(async move {
|
||||
let (waku_channel, mut http_request_channel) = build_http_bridge::<
|
||||
NetworkService<Waku>,
|
||||
AxumBackend,
|
||||
_,
|
||||
>(handle, HttpMethod::GET, "info")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await {
|
||||
if let Err(e) = handle_waku_info_req(&waku_channel, &res_tx).await {
|
||||
error!(e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn waku_add_conn_bridge(
|
||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> HttpBridgeRunner {
|
||||
Box::new(Box::pin(async move {
|
||||
let (waku_channel, mut http_request_channel) = build_http_bridge::<
|
||||
NetworkService<Waku>,
|
||||
AxumBackend,
|
||||
_,
|
||||
>(handle, HttpMethod::POST, "conn")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(HttpRequest {
|
||||
res_tx, payload, ..
|
||||
}) = http_request_channel.recv().await
|
||||
{
|
||||
if let Err(e) = handle_add_conn_req(&waku_channel, res_tx, payload).await {
|
||||
error!(e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
async fn handle_carnot_info_req(
|
||||
carnot_channel: &OutboundRelay<ConsensusMsg>,
|
||||
res_tx: &Sender<HttpResponse>,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
carnot_channel
|
||||
.send(ConsensusMsg::Info { tx: sender })
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
let carnot_info: CarnotInfo = receiver.await.unwrap();
|
||||
res_tx
|
||||
.send(Ok(serde_json::to_vec(&carnot_info)?.into()))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_mempool_metrics_req(
|
||||
mempool_channel: &OutboundRelay<MempoolMsg<Tx>>,
|
||||
res_tx: Sender<HttpResponse>,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
mempool_channel
|
||||
.send(MempoolMsg::Metrics {
|
||||
reply_channel: sender,
|
||||
})
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
|
||||
let metrics: MempoolMetrics = receiver.await.unwrap();
|
||||
res_tx
|
||||
// TODO: use serde to serialize metrics
|
||||
.send(Ok(format!(
|
||||
"{{\"pending_tx\": {}, \"last_tx\": {}}}",
|
||||
metrics.pending_txs, metrics.last_tx_timestamp
|
||||
)
|
||||
.into()))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_mempool_add_tx_req(
|
||||
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
mempool_channel: &OutboundRelay<MempoolMsg<Tx>>,
|
||||
res_tx: Sender<HttpResponse>,
|
||||
payload: Option<Bytes>,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
if let Some(data) = payload
|
||||
.as_ref()
|
||||
.and_then(|b| String::from_utf8(b.to_vec()).ok())
|
||||
{
|
||||
let tx = Tx(data);
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
mempool_channel
|
||||
.send(MempoolMsg::AddTx {
|
||||
tx: tx.clone(),
|
||||
reply_channel: sender,
|
||||
})
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
|
||||
match receiver.await {
|
||||
Ok(Ok(())) => {
|
||||
// broadcast transaction to peers
|
||||
let network_relay = handle.relay::<NetworkService<Waku>>().connect().await?;
|
||||
send_transaction(network_relay, tx).await?;
|
||||
Ok(res_tx.send(Ok(b"".to_vec().into())).await?)
|
||||
}
|
||||
Ok(Err(())) => Ok(res_tx
|
||||
.send(Err((
|
||||
StatusCode::CONFLICT,
|
||||
"error: unable to add tx".into(),
|
||||
)))
|
||||
.await?),
|
||||
Err(err) => Ok(res_tx
|
||||
.send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())))
|
||||
.await?),
|
||||
}
|
||||
} else {
|
||||
Err(
|
||||
format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String")
|
||||
.into(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_waku_info_req(
|
||||
waku_channel: &OutboundRelay<NetworkMsg<Waku>>,
|
||||
res_tx: &Sender<HttpResponse>,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
waku_channel
|
||||
.send(NetworkMsg::Process(WakuBackendMessage::Info {
|
||||
reply_channel: sender,
|
||||
}))
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
let waku_info: WakuInfo = receiver.await.unwrap();
|
||||
res_tx
|
||||
.send(Ok(serde_json::to_vec(&waku_info)?.into()))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_add_conn_req(
|
||||
waku_channel: &OutboundRelay<NetworkMsg<Waku>>,
|
||||
res_tx: Sender<HttpResponse>,
|
||||
payload: Option<Bytes>,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
if let Some(payload) = payload {
|
||||
if let Ok(addrs) = serde_json::from_slice::<Vec<Multiaddr>>(&payload) {
|
||||
let reqs: Vec<_> = addrs
|
||||
.into_iter()
|
||||
.map(|addr| {
|
||||
waku_channel.send(NetworkMsg::Process(WakuBackendMessage::ConnectPeer {
|
||||
addr,
|
||||
}))
|
||||
})
|
||||
.collect();
|
||||
|
||||
join_all(reqs).await;
|
||||
}
|
||||
Ok(res_tx.send(Ok(b"".to_vec().into())).await?)
|
||||
} else {
|
||||
Err(
|
||||
format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String")
|
||||
.into(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
async fn send_transaction(
|
||||
network_relay: OutboundRelay<NetworkMsg<Waku>>,
|
||||
tx: Tx,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
let payload = wire::serialize(&tx).expect("Tx serialization failed");
|
||||
network_relay
|
||||
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
|
||||
message: WakuMessage::new(
|
||||
payload,
|
||||
WAKU_CARNOT_TX_CONTENT_TOPIC.clone(),
|
||||
1,
|
||||
chrono::Utc::now().timestamp_nanos() as usize,
|
||||
[],
|
||||
false,
|
||||
),
|
||||
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
|
||||
}))
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
|
||||
Ok(())
|
||||
}
|
27
nodes/nomos-node/src/bridges/libp2p.rs
Normal file
27
nodes/nomos-node/src/bridges/libp2p.rs
Normal file
@ -0,0 +1,27 @@
|
||||
// std
|
||||
// crates
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::oneshot;
|
||||
// internal
|
||||
use nomos_http::http::HttpResponse;
|
||||
use nomos_network::backends::libp2p::{Command, Libp2p};
|
||||
use nomos_network::NetworkMsg;
|
||||
use overwatch_rs::services::relay::OutboundRelay;
|
||||
|
||||
#[cfg(feature = "libp2p")]
|
||||
pub(super) async fn handle_libp2p_info_req(
|
||||
channel: &OutboundRelay<NetworkMsg<Libp2p>>,
|
||||
res_tx: Sender<HttpResponse>,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
|
||||
channel
|
||||
.send(NetworkMsg::Process(Command::Info { reply: sender }))
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
|
||||
let info = receiver.await.unwrap();
|
||||
res_tx.send(Ok(serde_json::to_vec(&info)?.into())).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
179
nodes/nomos-node/src/bridges/mod.rs
Normal file
179
nodes/nomos-node/src/bridges/mod.rs
Normal file
@ -0,0 +1,179 @@
|
||||
// std
|
||||
// crates
|
||||
use nomos_consensus::{CarnotInfo, ConsensusMsg};
|
||||
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::oneshot;
|
||||
use tracing::error;
|
||||
// internal
|
||||
use nomos_http::backends::axum::AxumBackend;
|
||||
use nomos_http::bridge::{build_http_bridge, HttpBridgeRunner};
|
||||
use nomos_http::http::{HttpMethod, HttpRequest, HttpResponse};
|
||||
use nomos_mempool::backend::mockpool::MockPool;
|
||||
|
||||
use nomos_mempool::{MempoolMetrics, MempoolMsg, MempoolService};
|
||||
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter;
|
||||
#[cfg(feature = "waku")]
|
||||
use nomos_mempool::network::adapters::waku::WakuAdapter;
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_network::backends::libp2p::Libp2p;
|
||||
#[cfg(feature = "waku")]
|
||||
use nomos_network::backends::waku::Waku;
|
||||
use nomos_network::NetworkService;
|
||||
use nomos_node::{Carnot, Tx};
|
||||
use overwatch_rs::services::relay::OutboundRelay;
|
||||
|
||||
#[cfg(feature = "waku")]
|
||||
mod waku;
|
||||
#[cfg(feature = "waku")]
|
||||
use waku::*;
|
||||
#[cfg(feature = "libp2p")]
|
||||
mod libp2p;
|
||||
#[cfg(feature = "libp2p")]
|
||||
use libp2p::*;
|
||||
|
||||
macro_rules! get_handler {
|
||||
($handle:expr, $service:ty, $path:expr => $handler:tt) => {{
|
||||
let (channel, mut http_request_channel) =
|
||||
build_http_bridge::<$service, AxumBackend, _>($handle, HttpMethod::GET, $path)
|
||||
.await
|
||||
.unwrap();
|
||||
while let Some(HttpRequest { res_tx, .. }) = http_request_channel.recv().await {
|
||||
if let Err(e) = $handler(&channel, res_tx).await {
|
||||
error!(e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}};
|
||||
}
|
||||
|
||||
pub fn carnot_info_bridge(
|
||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> HttpBridgeRunner {
|
||||
Box::new(Box::pin(async move {
|
||||
get_handler!(handle, Carnot, "info" => handle_carnot_info_req)
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn mempool_metrics_bridge(
|
||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> HttpBridgeRunner {
|
||||
Box::new(Box::pin(async move {
|
||||
#[cfg(feature = "waku")]
|
||||
{
|
||||
get_handler!(handle, MempoolService<WakuAdapter<Tx>, MockPool<Tx>>, "metrics" => handle_mempool_metrics_req)
|
||||
}
|
||||
#[cfg(feature = "libp2p")]
|
||||
get_handler!(handle, MempoolService<Libp2pAdapter<Tx>, MockPool<Tx>>, "metrics" => handle_mempool_metrics_req)
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn network_info_bridge(
|
||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> HttpBridgeRunner {
|
||||
Box::new(Box::pin(async move {
|
||||
#[cfg(feature = "waku")]
|
||||
{
|
||||
get_handler!(handle, NetworkService<Waku>, "info" => handle_waku_info_req)
|
||||
}
|
||||
#[cfg(feature = "libp2p")]
|
||||
get_handler!(handle, NetworkService<Libp2p>, "info" => handle_libp2p_info_req)
|
||||
}))
|
||||
}
|
||||
|
||||
#[cfg(feature = "waku")]
|
||||
pub fn mempool_add_tx_bridge(
|
||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> HttpBridgeRunner {
|
||||
Box::new(Box::pin(async move {
|
||||
let (mempool_channel, mut http_request_channel) =
|
||||
build_http_bridge::<MempoolService<WakuAdapter<Tx>, MockPool<Tx>>, AxumBackend, _>(
|
||||
handle.clone(),
|
||||
HttpMethod::POST,
|
||||
"addtx",
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(HttpRequest {
|
||||
res_tx, payload, ..
|
||||
}) = http_request_channel.recv().await
|
||||
{
|
||||
if let Err(e) =
|
||||
handle_mempool_add_tx_req(&handle, &mempool_channel, res_tx, payload).await
|
||||
{
|
||||
error!(e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
#[cfg(feature = "waku")]
|
||||
pub fn waku_add_conn_bridge(
|
||||
handle: overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
) -> HttpBridgeRunner {
|
||||
Box::new(Box::pin(async move {
|
||||
let (waku_channel, mut http_request_channel) = build_http_bridge::<
|
||||
NetworkService<Waku>,
|
||||
AxumBackend,
|
||||
_,
|
||||
>(handle, HttpMethod::POST, "conn")
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
while let Some(HttpRequest {
|
||||
res_tx, payload, ..
|
||||
}) = http_request_channel.recv().await
|
||||
{
|
||||
if let Err(e) = handle_add_conn_req(&waku_channel, res_tx, payload).await {
|
||||
error!(e);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}))
|
||||
}
|
||||
|
||||
async fn handle_carnot_info_req(
|
||||
carnot_channel: &OutboundRelay<ConsensusMsg>,
|
||||
res_tx: Sender<HttpResponse>,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
carnot_channel
|
||||
.send(ConsensusMsg::Info { tx: sender })
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
let carnot_info: CarnotInfo = receiver.await.unwrap();
|
||||
res_tx
|
||||
.send(Ok(serde_json::to_vec(&carnot_info)?.into()))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_mempool_metrics_req(
|
||||
mempool_channel: &OutboundRelay<MempoolMsg<Tx>>,
|
||||
res_tx: Sender<HttpResponse>,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
mempool_channel
|
||||
.send(MempoolMsg::Metrics {
|
||||
reply_channel: sender,
|
||||
})
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
|
||||
let metrics: MempoolMetrics = receiver.await.unwrap();
|
||||
res_tx
|
||||
// TODO: use serde to serialize metrics
|
||||
.send(Ok(format!(
|
||||
"{{\"pending_tx\": {}, \"last_tx\": {}}}",
|
||||
metrics.pending_txs, metrics.last_tx_timestamp
|
||||
)
|
||||
.into()))
|
||||
.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
130
nodes/nomos-node/src/bridges/waku.rs
Normal file
130
nodes/nomos-node/src/bridges/waku.rs
Normal file
@ -0,0 +1,130 @@
|
||||
use bytes::Bytes;
|
||||
use http::StatusCode;
|
||||
use tokio::sync::mpsc::Sender;
|
||||
use tokio::sync::oneshot;
|
||||
// internal
|
||||
use futures::future::join_all;
|
||||
use nomos_core::wire;
|
||||
use nomos_http::http::HttpResponse;
|
||||
use nomos_mempool::network::adapters::waku::{
|
||||
WAKU_CARNOT_PUB_SUB_TOPIC, WAKU_CARNOT_TX_CONTENT_TOPIC,
|
||||
};
|
||||
use nomos_mempool::MempoolMsg;
|
||||
use nomos_network::backends::waku::{Waku, WakuBackendMessage};
|
||||
use nomos_network::{NetworkMsg, NetworkService};
|
||||
use nomos_node::Tx;
|
||||
use overwatch_rs::services::relay::OutboundRelay;
|
||||
use waku_bindings::{Multiaddr, WakuMessage};
|
||||
|
||||
pub(super) async fn handle_mempool_add_tx_req(
|
||||
handle: &overwatch_rs::overwatch::handle::OverwatchHandle,
|
||||
mempool_channel: &OutboundRelay<MempoolMsg<Tx>>,
|
||||
res_tx: Sender<HttpResponse>,
|
||||
payload: Option<Bytes>,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
if let Some(data) = payload
|
||||
.as_ref()
|
||||
.and_then(|b| String::from_utf8(b.to_vec()).ok())
|
||||
{
|
||||
let tx = Tx(data);
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
mempool_channel
|
||||
.send(MempoolMsg::AddTx {
|
||||
tx: tx.clone(),
|
||||
reply_channel: sender,
|
||||
})
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
|
||||
match receiver.await {
|
||||
Ok(Ok(())) => {
|
||||
// broadcast transaction to peers
|
||||
let network_relay = handle.relay::<NetworkService<Waku>>().connect().await?;
|
||||
send_transaction(network_relay, tx).await?;
|
||||
Ok(res_tx.send(Ok(b"".to_vec().into())).await?)
|
||||
}
|
||||
Ok(Err(())) => Ok(res_tx
|
||||
.send(Err((
|
||||
StatusCode::CONFLICT,
|
||||
"error: unable to add tx".into(),
|
||||
)))
|
||||
.await?),
|
||||
Err(err) => Ok(res_tx
|
||||
.send(Err((StatusCode::INTERNAL_SERVER_ERROR, err.to_string())))
|
||||
.await?),
|
||||
}
|
||||
} else {
|
||||
Err(
|
||||
format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String")
|
||||
.into(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn handle_waku_info_req(
|
||||
channel: &OutboundRelay<NetworkMsg<Waku>>,
|
||||
res_tx: Sender<HttpResponse>,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
let (sender, receiver) = oneshot::channel();
|
||||
|
||||
channel
|
||||
.send(NetworkMsg::Process(WakuBackendMessage::Info {
|
||||
reply_channel: sender,
|
||||
}))
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
let info = receiver.await.unwrap();
|
||||
res_tx.send(Ok(serde_json::to_vec(&info)?.into())).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(super) async fn handle_add_conn_req(
|
||||
waku_channel: &OutboundRelay<NetworkMsg<Waku>>,
|
||||
res_tx: Sender<HttpResponse>,
|
||||
payload: Option<Bytes>,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
if let Some(payload) = payload {
|
||||
if let Ok(addrs) = serde_json::from_slice::<Vec<Multiaddr>>(&payload) {
|
||||
let reqs: Vec<_> = addrs
|
||||
.into_iter()
|
||||
.map(|addr| {
|
||||
waku_channel.send(NetworkMsg::Process(WakuBackendMessage::ConnectPeer {
|
||||
addr,
|
||||
}))
|
||||
})
|
||||
.collect();
|
||||
|
||||
join_all(reqs).await;
|
||||
}
|
||||
Ok(res_tx.send(Ok(b"".to_vec().into())).await?)
|
||||
} else {
|
||||
Err(
|
||||
format!("Invalid payload, {payload:?}. Empty or couldn't transform into a utf8 String")
|
||||
.into(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
pub(super) async fn send_transaction(
|
||||
network_relay: OutboundRelay<NetworkMsg<Waku>>,
|
||||
tx: Tx,
|
||||
) -> Result<(), overwatch_rs::DynError> {
|
||||
let payload = wire::serialize(&tx).expect("Tx serialization failed");
|
||||
network_relay
|
||||
.send(NetworkMsg::Process(WakuBackendMessage::Broadcast {
|
||||
message: WakuMessage::new(
|
||||
payload,
|
||||
WAKU_CARNOT_TX_CONTENT_TOPIC.clone(),
|
||||
1,
|
||||
chrono::Utc::now().timestamp_nanos() as usize,
|
||||
[],
|
||||
false,
|
||||
),
|
||||
topic: Some(WAKU_CARNOT_PUB_SUB_TOPIC.clone()),
|
||||
}))
|
||||
.await
|
||||
.map_err(|(e, _)| e)?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -4,35 +4,49 @@ use color_eyre::eyre::Result;
|
||||
use consensus_engine::overlay::{FlatOverlay, RandomBeaconState, RoundRobin};
|
||||
#[cfg(feature = "metrics")]
|
||||
use metrics::{backend::map::MapMetricsBackend, types::MetricsData, MetricsService};
|
||||
use nomos_consensus::{
|
||||
network::adapters::waku::WakuAdapter as ConsensusWakuAdapter, CarnotConsensus,
|
||||
};
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_consensus::network::adapters::libp2p::Libp2pAdapter as ConsensusLibp2pAdapter;
|
||||
#[cfg(feature = "waku")]
|
||||
use nomos_consensus::network::adapters::waku::WakuAdapter as ConsensusWakuAdapter;
|
||||
use nomos_consensus::CarnotConsensus;
|
||||
use nomos_core::fountain::mock::MockFountain;
|
||||
use nomos_http::backends::axum::AxumBackend;
|
||||
use nomos_http::bridge::HttpBridgeService;
|
||||
use nomos_http::http::HttpService;
|
||||
use nomos_log::Logger;
|
||||
use nomos_mempool::{
|
||||
backend::mockpool::MockPool, network::adapters::waku::WakuAdapter as MempoolWakuAdapter,
|
||||
MempoolService,
|
||||
};
|
||||
use nomos_network::{backends::waku::Waku, NetworkService};
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_mempool::network::adapters::libp2p::Libp2pAdapter as MempoolLibp2pAdapter;
|
||||
#[cfg(feature = "waku")]
|
||||
use nomos_mempool::network::adapters::waku::WakuAdapter as MempoolWakuAdapter;
|
||||
use nomos_mempool::{backend::mockpool::MockPool, MempoolService};
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_network::backends::libp2p::Libp2p;
|
||||
#[cfg(feature = "waku")]
|
||||
use nomos_network::backends::waku::Waku;
|
||||
use nomos_network::NetworkService;
|
||||
use overwatch_derive::*;
|
||||
use overwatch_rs::services::{handle::ServiceHandle, ServiceData};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub use tx::Tx;
|
||||
|
||||
#[cfg(all(feature = "waku", feature = "libp2p"))]
|
||||
compile_error!("feature \"waku\" and feature \"libp2p\" cannot be enabled at the same time");
|
||||
|
||||
#[derive(Deserialize, Debug, Clone, Serialize)]
|
||||
pub struct Config {
|
||||
pub log: <Logger as ServiceData>::Settings,
|
||||
#[cfg(feature = "waku")]
|
||||
pub network: <NetworkService<Waku> as ServiceData>::Settings,
|
||||
#[cfg(feature = "libp2p")]
|
||||
pub network: <NetworkService<Libp2p> as ServiceData>::Settings,
|
||||
pub http: <HttpService<AxumBackend> as ServiceData>::Settings,
|
||||
pub consensus: <Carnot as ServiceData>::Settings,
|
||||
#[cfg(feature = "metrics")]
|
||||
pub metrics: <MetricsService<MapMetricsBackend<MetricsData>> as ServiceData>::Settings,
|
||||
}
|
||||
|
||||
#[cfg(feature = "waku")]
|
||||
pub type Carnot = CarnotConsensus<
|
||||
ConsensusWakuAdapter,
|
||||
MockPool<Tx>,
|
||||
@ -41,11 +55,26 @@ pub type Carnot = CarnotConsensus<
|
||||
FlatOverlay<RoundRobin, RandomBeaconState>,
|
||||
>;
|
||||
|
||||
#[cfg(feature = "libp2p")]
|
||||
pub type Carnot = CarnotConsensus<
|
||||
ConsensusLibp2pAdapter,
|
||||
MockPool<Tx>,
|
||||
MempoolLibp2pAdapter<Tx>,
|
||||
MockFountain,
|
||||
FlatOverlay<RoundRobin, RandomBeaconState>,
|
||||
>;
|
||||
|
||||
#[derive(Services)]
|
||||
pub struct Nomos {
|
||||
logging: ServiceHandle<Logger>,
|
||||
#[cfg(feature = "waku")]
|
||||
network: ServiceHandle<NetworkService<Waku>>,
|
||||
#[cfg(feature = "libp2p")]
|
||||
network: ServiceHandle<NetworkService<Libp2p>>,
|
||||
#[cfg(feature = "waku")]
|
||||
mockpool: ServiceHandle<MempoolService<MempoolWakuAdapter<Tx>, MockPool<Tx>>>,
|
||||
#[cfg(feature = "libp2p")]
|
||||
mockpool: ServiceHandle<MempoolService<MempoolLibp2pAdapter<Tx>, MockPool<Tx>>>,
|
||||
consensus: ServiceHandle<Carnot>,
|
||||
http: ServiceHandle<HttpService<AxumBackend>>,
|
||||
bridges: ServiceHandle<HttpBridgeService>,
|
||||
|
@ -21,10 +21,12 @@ fn main() -> Result<()> {
|
||||
let config = serde_yaml::from_reader::<_, Config>(std::fs::File::open(config)?)?;
|
||||
let bridges: Vec<HttpBridge> = vec![
|
||||
Arc::new(Box::new(bridges::carnot_info_bridge)),
|
||||
Arc::new(Box::new(bridges::mempool_add_tx_bridge)),
|
||||
Arc::new(Box::new(bridges::mempool_metrics_bridge)),
|
||||
Arc::new(Box::new(bridges::network_info_bridge)),
|
||||
#[cfg(feature = "waku")]
|
||||
Arc::new(Box::new(bridges::mempool_add_tx_bridge)),
|
||||
#[cfg(feature = "waku")]
|
||||
Arc::new(Box::new(bridges::waku_add_conn_bridge)),
|
||||
Arc::new(Box::new(bridges::waku_info_bridge)),
|
||||
];
|
||||
let app = OverwatchRunner::<Nomos>::run(
|
||||
NomosServiceSettings {
|
||||
|
@ -230,9 +230,9 @@ impl NetworkAdapter for Libp2pAdapter {
|
||||
let message_cache = MessageCache::new();
|
||||
let cache = message_cache.clone();
|
||||
let relay = network_relay.clone();
|
||||
Self::subscribe(&relay, TOPIC).await;
|
||||
// TODO: maybe we need the runtime handle here?
|
||||
tokio::spawn(async move {
|
||||
Self::subscribe(&relay, TOPIC).await;
|
||||
let (sender, receiver) = tokio::sync::oneshot::channel();
|
||||
if let Err((e, _)) = relay
|
||||
.send(NetworkMsg::Subscribe {
|
||||
|
@ -5,21 +5,22 @@ edition = "2021"
|
||||
publish = false
|
||||
|
||||
[dependencies]
|
||||
nomos-node = { path = "../nodes/nomos-node" }
|
||||
nomos-node = { path = "../nodes/nomos-node", default-features = false }
|
||||
nomos-consensus = { path = "../nomos-services/consensus" }
|
||||
nomos-network = { path = "../nomos-services/network", features = ["waku"] }
|
||||
nomos-network = { path = "../nomos-services/network" }
|
||||
nomos-log = { path = "../nomos-services/log" }
|
||||
nomos-http = { path = "../nomos-services/http", features = ["http"] }
|
||||
overwatch-rs = { git = "https://github.com/logos-co/Overwatch", branch = "main" }
|
||||
nomos-core = { path = "../nomos-core" }
|
||||
consensus-engine = { path = "../consensus-engine", features = ["serde"] }
|
||||
nomos-mempool = { path = "../nomos-services/mempool", features = ["waku", "mock"] }
|
||||
nomos-mempool = { path = "../nomos-services/mempool", features = ["mock"] }
|
||||
rand = "0.8"
|
||||
once_cell = "1"
|
||||
rand_xoshiro = "0.6"
|
||||
secp256k1 = { version = "0.26", features = ["rand"] }
|
||||
waku-bindings = "0.1.1"
|
||||
waku-bindings = { version = "0.1.1", optional = true }
|
||||
reqwest = { version = "0.11", features = ["json"] }
|
||||
nomos-libp2p = { path = "../nomos-libp2p", optional = true }
|
||||
tempfile = "3.6"
|
||||
serde_yaml = "0.9"
|
||||
tokio = "1"
|
||||
@ -38,3 +39,5 @@ path = "src/tests/unhappy.rs"
|
||||
|
||||
[features]
|
||||
metrics = ["nomos-node/metrics"]
|
||||
waku = ["nomos-network/waku", "nomos-mempool/waku", "waku-bindings"]
|
||||
libp2p = ["nomos-network/libp2p", "nomos-mempool/libp2p", "nomos-libp2p"]
|
@ -8,12 +8,16 @@ use consensus_engine::overlay::{FlatOverlaySettings, RoundRobin};
|
||||
use consensus_engine::NodeId;
|
||||
use nomos_consensus::{CarnotInfo, CarnotSettings};
|
||||
use nomos_http::backends::axum::AxumBackendSettings;
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_libp2p::{Multiaddr, SwarmConfig};
|
||||
use nomos_log::{LoggerBackend, LoggerFormat};
|
||||
use nomos_network::{
|
||||
backends::waku::{WakuConfig, WakuInfo},
|
||||
NetworkConfig,
|
||||
};
|
||||
#[cfg(feature = "libp2p")]
|
||||
use nomos_network::backends::libp2p::Libp2pInfo;
|
||||
#[cfg(feature = "waku")]
|
||||
use nomos_network::backends::waku::{WakuConfig, WakuInfo};
|
||||
use nomos_network::NetworkConfig;
|
||||
use nomos_node::Config;
|
||||
#[cfg(feature = "waku")]
|
||||
use waku_bindings::{Multiaddr, PeerId};
|
||||
// crates
|
||||
use fraction::Fraction;
|
||||
@ -75,7 +79,12 @@ impl NomosNode {
|
||||
child,
|
||||
_tempdir: dir,
|
||||
};
|
||||
node.wait_online().await;
|
||||
tokio::time::timeout(std::time::Duration::from_secs(10), async {
|
||||
node.wait_online().await
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
node
|
||||
}
|
||||
|
||||
@ -92,6 +101,7 @@ impl NomosNode {
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "waku")]
|
||||
pub async fn peer_id(&self) -> PeerId {
|
||||
self.get(NETWORK_INFO_API)
|
||||
.await
|
||||
@ -103,6 +113,7 @@ impl NomosNode {
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
#[cfg(feature = "waku")]
|
||||
pub async fn get_listening_address(&self) -> Multiaddr {
|
||||
self.get(NETWORK_INFO_API)
|
||||
.await
|
||||
@ -115,6 +126,18 @@ impl NomosNode {
|
||||
.swap_remove(0)
|
||||
}
|
||||
|
||||
#[cfg(feature = "libp2p")]
|
||||
pub async fn get_listening_address(&self) -> Multiaddr {
|
||||
self.get(NETWORK_INFO_API)
|
||||
.await
|
||||
.unwrap()
|
||||
.json::<Libp2pInfo>()
|
||||
.await
|
||||
.unwrap()
|
||||
.listen_addresses
|
||||
.swap_remove(0)
|
||||
}
|
||||
|
||||
// not async so that we can use this in `Drop`
|
||||
pub fn get_logs_from_file(&self) -> String {
|
||||
println!(
|
||||
@ -200,10 +223,16 @@ fn create_node_config(
|
||||
) -> Config {
|
||||
let mut config = Config {
|
||||
network: NetworkConfig {
|
||||
#[cfg(feature = "waku")]
|
||||
backend: WakuConfig {
|
||||
initial_peers: vec![],
|
||||
inner: Default::default(),
|
||||
},
|
||||
#[cfg(feature = "libp2p")]
|
||||
backend: SwarmConfig {
|
||||
initial_peers: vec![],
|
||||
..Default::default()
|
||||
},
|
||||
},
|
||||
consensus: CarnotSettings {
|
||||
private_key,
|
||||
@ -230,6 +259,14 @@ fn create_node_config(
|
||||
#[cfg(feature = "metrics")]
|
||||
metrics: Default::default(),
|
||||
};
|
||||
config.network.backend.inner.port = Some(get_available_port() as usize);
|
||||
#[cfg(feature = "waku")]
|
||||
{
|
||||
config.network.backend.inner.port = Some(get_available_port() as usize);
|
||||
}
|
||||
#[cfg(feature = "libp2p")]
|
||||
{
|
||||
config.network.backend.port = get_available_port();
|
||||
}
|
||||
|
||||
config
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user