diff --git a/compose.static.yml b/compose.static.yml index b311df65..41300081 100644 --- a/compose.static.yml +++ b/compose.static.yml @@ -26,7 +26,6 @@ services: - ./tests/kzgrs/kzgrs_test_params:/kzgrs_test_params:z depends_on: - cfgsync - - graylog entrypoint: /etc/nomos/scripts/run_nomos_node.sh nomos-node-1: @@ -40,7 +39,6 @@ services: - ./tests/kzgrs/kzgrs_test_params:/kzgrs_test_params:z depends_on: - cfgsync - - graylog ports: - "3001:3000/udp" - "18081:18080/tcp" @@ -57,7 +55,6 @@ services: - ./tests/kzgrs/kzgrs_test_params:/kzgrs_test_params:z depends_on: - cfgsync - - graylog ports: - "3002:3000/udp" - "18082:18080/tcp" @@ -74,7 +71,6 @@ services: - ./tests/kzgrs/kzgrs_test_params:/kzgrs_test_params:z depends_on: - cfgsync - - graylog ports: - "3003:3000/udp" - "18083:18080/tcp" @@ -105,64 +101,3 @@ services: restart: on-failure depends_on: - prometheus - - # Graylog related configuration - # More information at https://github.com/Graylog2/docker-compose/blob/main/open-core/docker-compose.yml - mongodb: - image: "mongo:5.0" - volumes: - - "mongodb_data:/data/db" - restart: "on-failure" - - datanode: - image: "${DATANODE_IMAGE:-graylog/graylog-datanode:5.2}" - hostname: "datanode" - environment: - GRAYLOG_DATANODE_NODE_ID_FILE: "/var/lib/graylog-datanode/node-id" - GRAYLOG_DATANODE_PASSWORD_SECRET: "${GRAYLOG_PASSWORD_SECRET:?Please configure GRAYLOG_PASSWORD_SECRET in the .env file}" - GRAYLOG_DATANODE_ROOT_PASSWORD_SHA2: "${GRAYLOG_ROOT_PASSWORD_SHA2:?Please configure GRAYLOG_ROOT_PASSWORD_SHA2 in the .env file}" - GRAYLOG_DATANODE_MONGODB_URI: "mongodb://mongodb:27017/graylog" - ulimits: - memlock: - hard: -1 - soft: -1 - nofile: - soft: 65536 - hard: 65536 - ports: - - "8999:8999/tcp" # DataNode API - - "9200:9200/tcp" - - "9300:9300/tcp" - volumes: - - "graylog-datanode:/var/lib/graylog-datanode" - restart: "on-failure" - - graylog: - image: "${GRAYLOG_IMAGE:-graylog/graylog:5.2}" - depends_on: - mongodb: - condition: "service_started" - entrypoint: "/usr/bin/tini -- /docker-entrypoint.sh" - environment: - GRAYLOG_NODE_ID_FILE: "/usr/share/graylog/data/data/node-id" - GRAYLOG_PASSWORD_SECRET: "${GRAYLOG_PASSWORD_SECRET:?Please configure GRAYLOG_PASSWORD_SECRET in the .env file}" - GRAYLOG_ROOT_PASSWORD_SHA2: "${GRAYLOG_ROOT_PASSWORD_SHA2:?Please configure GRAYLOG_ROOT_PASSWORD_SHA2 in the .env file}" - GRAYLOG_HTTP_BIND_ADDRESS: "0.0.0.0:9000" - GRAYLOG_HTTP_EXTERNAL_URI: "http://localhost:9000/" - GRAYLOG_MONGODB_URI: "mongodb://mongodb:27017/graylog" - ports: - - "9000:9000/tcp" # Server API - - "12201:12201/tcp" # GELF TCP - - "12201:12201/udp" # GELF UDP - volumes: - - "graylog_data:/usr/share/graylog/data/data" - - "graylog_journal:/usr/share/graylog/data/journal" - - ./testnet/monitoring/graylog.conf:/usr/share/graylog/data/config/graylog.conf - restart: "on-failure" - - -volumes: - mongodb_data: - graylog-datanode: - graylog_data: - graylog_journal: diff --git a/nodes/nomos-node/src/config.rs b/nodes/nomos-node/src/config.rs index 0255a422..9e91487a 100644 --- a/nodes/nomos-node/src/config.rs +++ b/nodes/nomos-node/src/config.rs @@ -180,6 +180,9 @@ pub fn update_log(log: &mut ::Settings, log_args: LogArgs if let Some(level_str) = level { log.level = match level_str.as_str() { "DEBUG" => Level::DEBUG, + "INFO" => Level::INFO, + "ERROR" => Level::ERROR, + "WARN" => Level::WARN, _ => return Err(eyre!("Invalid log level provided.")), }; } diff --git a/nomos-da/network/core/src/protocols/replication/behaviour.rs b/nomos-da/network/core/src/protocols/replication/behaviour.rs index b00e96d8..4ae0bea2 100644 --- a/nomos-da/network/core/src/protocols/replication/behaviour.rs +++ b/nomos-da/network/core/src/protocols/replication/behaviour.rs @@ -232,3 +232,247 @@ where } } } + +#[cfg(test)] +mod tests { + use super::*; + use futures::task::{waker_ref, ArcWake}; + use libp2p::{identity, PeerId}; + use nomos_da_messages::common::Blob; + use std::collections::HashSet; + use std::sync::Arc; + use std::task::{Context, Poll}; + + #[derive(Clone, Debug)] + struct MockMembershipHandler { + membership: HashMap>, + } + + impl MembershipHandler for MockMembershipHandler { + type NetworkId = SubnetworkId; + type Id = PeerId; + + fn membership(&self, peer_id: &PeerId) -> HashSet { + self.membership.get(peer_id).cloned().unwrap_or_default() + } + + fn members_of(&self, subnetwork: &Self::NetworkId) -> HashSet { + self.membership + .iter() + .filter_map(|(id, nets)| { + if nets.contains(subnetwork) { + Some(*id) + } else { + None + } + }) + .collect() + } + + fn is_allowed(&self, _id: &Self::Id) -> bool { + unimplemented!() + } + + fn members(&self) -> HashSet { + unimplemented!() + } + } + + struct TestWaker; + + impl ArcWake for TestWaker { + fn wake_by_ref(_arc_self: &Arc) {} + } + + fn create_replication_behaviours( + num_instances: usize, + subnet_id: u32, + membership: &mut HashMap>, + ) -> Vec> { + let mut behaviours = Vec::new(); + + let mut peer_ids = Vec::new(); + for _ in 0..num_instances { + let keypair = identity::Keypair::generate_ed25519(); + let peer_id = PeerId::from(keypair.public()); + peer_ids.push(peer_id); + } + + for peer_id in &peer_ids { + membership.insert(*peer_id, HashSet::from([subnet_id])); + } + + let membership_handler = MockMembershipHandler { + membership: HashMap::default(), // This will be updated after all behaviours are added. + }; + + for peer_id in peer_ids { + let behaviour = ReplicationBehaviour::new(peer_id, membership_handler.clone()); + behaviours.push(behaviour); + } + + behaviours + } + + fn establish_connection( + behaviours: &mut [ReplicationBehaviour], + i: usize, + j: usize, + connection_id: ConnectionId, + ) { + let peer_id_i = behaviours[i].local_peer_id; + let peer_id_j = behaviours[j].local_peer_id; + + behaviours[i] + .handle_established_outbound_connection( + connection_id, + peer_id_j, + &Multiaddr::empty(), + Endpoint::Dialer, + ) + .unwrap(); + + behaviours[j] + .handle_established_inbound_connection( + connection_id, + peer_id_i, + &Multiaddr::empty(), + &Multiaddr::empty(), + ) + .unwrap(); + } + + fn deliver_message_to_peer( + all_behaviours: &mut [ReplicationBehaviour], + peer_id: PeerId, + connection_id: ConnectionId, + message: DaMessage, + ) { + if let Some(behaviour) = all_behaviours + .iter_mut() + .find(|b| b.local_peer_id == peer_id) + { + // Simulate the handler receiving the message. + behaviour.on_connection_handler_event( + peer_id, + connection_id, + Either::Left(HandlerEventToBehaviour::IncomingMessage { message }), + ); + } + } + + #[tokio::test] + async fn test_replication_behaviour() { + let num_instances = 20; + let mut membership = HashMap::default(); + + let subnet_0_behaviours = + create_replication_behaviours(num_instances / 2, 0, &mut membership); + let subnet_1_behaviours = + create_replication_behaviours(num_instances / 2, 1, &mut membership); + + let mut all_behaviours = subnet_0_behaviours; + all_behaviours.extend(subnet_1_behaviours); + + for behaviour in all_behaviours.iter_mut() { + let membership_handler = MockMembershipHandler { + membership: membership.clone(), + }; + behaviour.update_membership(membership_handler); + } + + // Simulate peer connections. + for (i, j) in (0..num_instances).flat_map(|i| (i + 1..num_instances).map(move |j| (i, j))) { + let connection_id = ConnectionId::new_unchecked(i); + establish_connection(&mut all_behaviours, i, j, connection_id); + } + + // Simulate sending a message from the first behavior. + let message = DaMessage { + blob: Some(Blob { + blob_id: vec![1, 2, 3], + data: vec![4, 5, 6], + }), + subnetwork_id: 0, + }; + all_behaviours[0].replicate_message(message.clone()); + + let waker = Arc::new(TestWaker); + let waker_ref = waker_ref(&waker); + let mut cx = Context::from_waker(&waker_ref); + + // Poll all behaviors until no more events are generated. + let mut pending_behaviours: Vec<_> = (0..num_instances).collect(); + let mut completed = false; + + while !completed { + completed = true; + for i in &pending_behaviours { + let behaviour = &mut all_behaviours[*i]; + let mut events = vec![]; + + while let Poll::Ready(event) = behaviour.poll(&mut cx) { + events.push(event); + } + + // If there are events, set completed to false to continue polling. + if !events.is_empty() { + completed = false; + + for event in events { + // Intercept the events that should be processed by the handler. + if let ToSwarm::NotifyHandler { + peer_id, + handler: NotifyHandler::One(connection_id), + event: + Either::Left(BehaviourEventToHandler::OutgoingMessage { message }), + } = event + { + // Deliver the message to the appropriate peer's handler. + deliver_message_to_peer( + &mut all_behaviours, + peer_id, + connection_id, + message.clone(), + ); + } + } + } + } + + // Filter out behaviors that no longer generate events. + pending_behaviours.retain(|i| { + let mut events = vec![]; + let behaviour = &mut all_behaviours[*i]; + while let Poll::Ready(event) = behaviour.poll(&mut cx) { + events.push(event); + } + !events.is_empty() + }); + } + + // Verify that all peers in subnet 0 have received the message, and others have not. + let (subnet_0_behaviours, other_behaviours): (Vec<_>, Vec<_>) = + all_behaviours.iter().partition(|behaviour| { + behaviour + .membership + .membership(&behaviour.local_peer_id) + .contains(&0) + }); + + // Assert that all members of subnet 0 have received the message. + for behaviour in &subnet_0_behaviours { + assert!(behaviour + .seen_message_cache + .contains(&(vec![1, 2, 3], message.subnetwork_id))); + } + + // Assert that no members of other subnets have received the message. + for behaviour in &other_behaviours { + assert!(behaviour.seen_message_cache.is_empty()); + } + + // Ensure the number of peers with the message matches the expected count + assert_eq!(subnet_0_behaviours.len(), num_instances / 2); + } +} diff --git a/testnet/scripts/run_nomos_node.sh b/testnet/scripts/run_nomos_node.sh index b7390146..0bb26b01 100755 --- a/testnet/scripts/run_nomos_node.sh +++ b/testnet/scripts/run_nomos_node.sh @@ -5,7 +5,8 @@ set -e export CFG_FILE_PATH="/config.yaml" \ CFG_SERVER_ADDR="http://cfgsync:4400" \ CFG_HOST_IP=$(hostname -i) \ + LOG_LEVEL="INFO" \ RISC0_DEV_MODE=true /usr/bin/cfgsync-client && \ - exec /usr/bin/nomos-node /config.yaml --with-metrics --log-backend gelf --log-addr graylog:12201 + exec /usr/bin/nomos-node /config.yaml --with-metrics