From d17afcbe4d6845d3025c9ad516e4fb0aed07f50c Mon Sep 17 00:00:00 2001 From: gusto Date: Thu, 3 Oct 2024 20:37:13 +0300 Subject: [PATCH] Replication behavior membership test (#805) --- .../src/protocols/replication/behaviour.rs | 244 ++++++++++++++++++ 1 file changed, 244 insertions(+) diff --git a/nomos-da/network/core/src/protocols/replication/behaviour.rs b/nomos-da/network/core/src/protocols/replication/behaviour.rs index b00e96d8..4ae0bea2 100644 --- a/nomos-da/network/core/src/protocols/replication/behaviour.rs +++ b/nomos-da/network/core/src/protocols/replication/behaviour.rs @@ -232,3 +232,247 @@ where } } } + +#[cfg(test)] +mod tests { + use super::*; + use futures::task::{waker_ref, ArcWake}; + use libp2p::{identity, PeerId}; + use nomos_da_messages::common::Blob; + use std::collections::HashSet; + use std::sync::Arc; + use std::task::{Context, Poll}; + + #[derive(Clone, Debug)] + struct MockMembershipHandler { + membership: HashMap>, + } + + impl MembershipHandler for MockMembershipHandler { + type NetworkId = SubnetworkId; + type Id = PeerId; + + fn membership(&self, peer_id: &PeerId) -> HashSet { + self.membership.get(peer_id).cloned().unwrap_or_default() + } + + fn members_of(&self, subnetwork: &Self::NetworkId) -> HashSet { + self.membership + .iter() + .filter_map(|(id, nets)| { + if nets.contains(subnetwork) { + Some(*id) + } else { + None + } + }) + .collect() + } + + fn is_allowed(&self, _id: &Self::Id) -> bool { + unimplemented!() + } + + fn members(&self) -> HashSet { + unimplemented!() + } + } + + struct TestWaker; + + impl ArcWake for TestWaker { + fn wake_by_ref(_arc_self: &Arc) {} + } + + fn create_replication_behaviours( + num_instances: usize, + subnet_id: u32, + membership: &mut HashMap>, + ) -> Vec> { + let mut behaviours = Vec::new(); + + let mut peer_ids = Vec::new(); + for _ in 0..num_instances { + let keypair = identity::Keypair::generate_ed25519(); + let peer_id = PeerId::from(keypair.public()); + peer_ids.push(peer_id); + } + + for peer_id in &peer_ids { + membership.insert(*peer_id, HashSet::from([subnet_id])); + } + + let membership_handler = MockMembershipHandler { + membership: HashMap::default(), // This will be updated after all behaviours are added. + }; + + for peer_id in peer_ids { + let behaviour = ReplicationBehaviour::new(peer_id, membership_handler.clone()); + behaviours.push(behaviour); + } + + behaviours + } + + fn establish_connection( + behaviours: &mut [ReplicationBehaviour], + i: usize, + j: usize, + connection_id: ConnectionId, + ) { + let peer_id_i = behaviours[i].local_peer_id; + let peer_id_j = behaviours[j].local_peer_id; + + behaviours[i] + .handle_established_outbound_connection( + connection_id, + peer_id_j, + &Multiaddr::empty(), + Endpoint::Dialer, + ) + .unwrap(); + + behaviours[j] + .handle_established_inbound_connection( + connection_id, + peer_id_i, + &Multiaddr::empty(), + &Multiaddr::empty(), + ) + .unwrap(); + } + + fn deliver_message_to_peer( + all_behaviours: &mut [ReplicationBehaviour], + peer_id: PeerId, + connection_id: ConnectionId, + message: DaMessage, + ) { + if let Some(behaviour) = all_behaviours + .iter_mut() + .find(|b| b.local_peer_id == peer_id) + { + // Simulate the handler receiving the message. + behaviour.on_connection_handler_event( + peer_id, + connection_id, + Either::Left(HandlerEventToBehaviour::IncomingMessage { message }), + ); + } + } + + #[tokio::test] + async fn test_replication_behaviour() { + let num_instances = 20; + let mut membership = HashMap::default(); + + let subnet_0_behaviours = + create_replication_behaviours(num_instances / 2, 0, &mut membership); + let subnet_1_behaviours = + create_replication_behaviours(num_instances / 2, 1, &mut membership); + + let mut all_behaviours = subnet_0_behaviours; + all_behaviours.extend(subnet_1_behaviours); + + for behaviour in all_behaviours.iter_mut() { + let membership_handler = MockMembershipHandler { + membership: membership.clone(), + }; + behaviour.update_membership(membership_handler); + } + + // Simulate peer connections. + for (i, j) in (0..num_instances).flat_map(|i| (i + 1..num_instances).map(move |j| (i, j))) { + let connection_id = ConnectionId::new_unchecked(i); + establish_connection(&mut all_behaviours, i, j, connection_id); + } + + // Simulate sending a message from the first behavior. + let message = DaMessage { + blob: Some(Blob { + blob_id: vec![1, 2, 3], + data: vec![4, 5, 6], + }), + subnetwork_id: 0, + }; + all_behaviours[0].replicate_message(message.clone()); + + let waker = Arc::new(TestWaker); + let waker_ref = waker_ref(&waker); + let mut cx = Context::from_waker(&waker_ref); + + // Poll all behaviors until no more events are generated. + let mut pending_behaviours: Vec<_> = (0..num_instances).collect(); + let mut completed = false; + + while !completed { + completed = true; + for i in &pending_behaviours { + let behaviour = &mut all_behaviours[*i]; + let mut events = vec![]; + + while let Poll::Ready(event) = behaviour.poll(&mut cx) { + events.push(event); + } + + // If there are events, set completed to false to continue polling. + if !events.is_empty() { + completed = false; + + for event in events { + // Intercept the events that should be processed by the handler. + if let ToSwarm::NotifyHandler { + peer_id, + handler: NotifyHandler::One(connection_id), + event: + Either::Left(BehaviourEventToHandler::OutgoingMessage { message }), + } = event + { + // Deliver the message to the appropriate peer's handler. + deliver_message_to_peer( + &mut all_behaviours, + peer_id, + connection_id, + message.clone(), + ); + } + } + } + } + + // Filter out behaviors that no longer generate events. + pending_behaviours.retain(|i| { + let mut events = vec![]; + let behaviour = &mut all_behaviours[*i]; + while let Poll::Ready(event) = behaviour.poll(&mut cx) { + events.push(event); + } + !events.is_empty() + }); + } + + // Verify that all peers in subnet 0 have received the message, and others have not. + let (subnet_0_behaviours, other_behaviours): (Vec<_>, Vec<_>) = + all_behaviours.iter().partition(|behaviour| { + behaviour + .membership + .membership(&behaviour.local_peer_id) + .contains(&0) + }); + + // Assert that all members of subnet 0 have received the message. + for behaviour in &subnet_0_behaviours { + assert!(behaviour + .seen_message_cache + .contains(&(vec![1, 2, 3], message.subnetwork_id))); + } + + // Assert that no members of other subnets have received the message. + for behaviour in &other_behaviours { + assert!(behaviour.seen_message_cache.is_empty()); + } + + // Ensure the number of peers with the message matches the expected count + assert_eq!(subnet_0_behaviours.len(), num_instances / 2); + } +}