Replication behavior membership test (#805)

This commit is contained in:
gusto 2024-10-03 20:37:13 +03:00 committed by GitHub
parent d7fe88f30b
commit d17afcbe4d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 244 additions and 0 deletions

View File

@ -232,3 +232,247 @@ where
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::task::{waker_ref, ArcWake};
use libp2p::{identity, PeerId};
use nomos_da_messages::common::Blob;
use std::collections::HashSet;
use std::sync::Arc;
use std::task::{Context, Poll};
#[derive(Clone, Debug)]
struct MockMembershipHandler {
membership: HashMap<PeerId, HashSet<SubnetworkId>>,
}
impl MembershipHandler for MockMembershipHandler {
type NetworkId = SubnetworkId;
type Id = PeerId;
fn membership(&self, peer_id: &PeerId) -> HashSet<Self::NetworkId> {
self.membership.get(peer_id).cloned().unwrap_or_default()
}
fn members_of(&self, subnetwork: &Self::NetworkId) -> HashSet<Self::Id> {
self.membership
.iter()
.filter_map(|(id, nets)| {
if nets.contains(subnetwork) {
Some(*id)
} else {
None
}
})
.collect()
}
fn is_allowed(&self, _id: &Self::Id) -> bool {
unimplemented!()
}
fn members(&self) -> HashSet<Self::Id> {
unimplemented!()
}
}
struct TestWaker;
impl ArcWake for TestWaker {
fn wake_by_ref(_arc_self: &Arc<Self>) {}
}
fn create_replication_behaviours(
num_instances: usize,
subnet_id: u32,
membership: &mut HashMap<PeerId, HashSet<SubnetworkId>>,
) -> Vec<ReplicationBehaviour<MockMembershipHandler>> {
let mut behaviours = Vec::new();
let mut peer_ids = Vec::new();
for _ in 0..num_instances {
let keypair = identity::Keypair::generate_ed25519();
let peer_id = PeerId::from(keypair.public());
peer_ids.push(peer_id);
}
for peer_id in &peer_ids {
membership.insert(*peer_id, HashSet::from([subnet_id]));
}
let membership_handler = MockMembershipHandler {
membership: HashMap::default(), // This will be updated after all behaviours are added.
};
for peer_id in peer_ids {
let behaviour = ReplicationBehaviour::new(peer_id, membership_handler.clone());
behaviours.push(behaviour);
}
behaviours
}
fn establish_connection(
behaviours: &mut [ReplicationBehaviour<MockMembershipHandler>],
i: usize,
j: usize,
connection_id: ConnectionId,
) {
let peer_id_i = behaviours[i].local_peer_id;
let peer_id_j = behaviours[j].local_peer_id;
behaviours[i]
.handle_established_outbound_connection(
connection_id,
peer_id_j,
&Multiaddr::empty(),
Endpoint::Dialer,
)
.unwrap();
behaviours[j]
.handle_established_inbound_connection(
connection_id,
peer_id_i,
&Multiaddr::empty(),
&Multiaddr::empty(),
)
.unwrap();
}
fn deliver_message_to_peer(
all_behaviours: &mut [ReplicationBehaviour<MockMembershipHandler>],
peer_id: PeerId,
connection_id: ConnectionId,
message: DaMessage,
) {
if let Some(behaviour) = all_behaviours
.iter_mut()
.find(|b| b.local_peer_id == peer_id)
{
// Simulate the handler receiving the message.
behaviour.on_connection_handler_event(
peer_id,
connection_id,
Either::Left(HandlerEventToBehaviour::IncomingMessage { message }),
);
}
}
#[tokio::test]
async fn test_replication_behaviour() {
let num_instances = 20;
let mut membership = HashMap::default();
let subnet_0_behaviours =
create_replication_behaviours(num_instances / 2, 0, &mut membership);
let subnet_1_behaviours =
create_replication_behaviours(num_instances / 2, 1, &mut membership);
let mut all_behaviours = subnet_0_behaviours;
all_behaviours.extend(subnet_1_behaviours);
for behaviour in all_behaviours.iter_mut() {
let membership_handler = MockMembershipHandler {
membership: membership.clone(),
};
behaviour.update_membership(membership_handler);
}
// Simulate peer connections.
for (i, j) in (0..num_instances).flat_map(|i| (i + 1..num_instances).map(move |j| (i, j))) {
let connection_id = ConnectionId::new_unchecked(i);
establish_connection(&mut all_behaviours, i, j, connection_id);
}
// Simulate sending a message from the first behavior.
let message = DaMessage {
blob: Some(Blob {
blob_id: vec![1, 2, 3],
data: vec![4, 5, 6],
}),
subnetwork_id: 0,
};
all_behaviours[0].replicate_message(message.clone());
let waker = Arc::new(TestWaker);
let waker_ref = waker_ref(&waker);
let mut cx = Context::from_waker(&waker_ref);
// Poll all behaviors until no more events are generated.
let mut pending_behaviours: Vec<_> = (0..num_instances).collect();
let mut completed = false;
while !completed {
completed = true;
for i in &pending_behaviours {
let behaviour = &mut all_behaviours[*i];
let mut events = vec![];
while let Poll::Ready(event) = behaviour.poll(&mut cx) {
events.push(event);
}
// If there are events, set completed to false to continue polling.
if !events.is_empty() {
completed = false;
for event in events {
// Intercept the events that should be processed by the handler.
if let ToSwarm::NotifyHandler {
peer_id,
handler: NotifyHandler::One(connection_id),
event:
Either::Left(BehaviourEventToHandler::OutgoingMessage { message }),
} = event
{
// Deliver the message to the appropriate peer's handler.
deliver_message_to_peer(
&mut all_behaviours,
peer_id,
connection_id,
message.clone(),
);
}
}
}
}
// Filter out behaviors that no longer generate events.
pending_behaviours.retain(|i| {
let mut events = vec![];
let behaviour = &mut all_behaviours[*i];
while let Poll::Ready(event) = behaviour.poll(&mut cx) {
events.push(event);
}
!events.is_empty()
});
}
// Verify that all peers in subnet 0 have received the message, and others have not.
let (subnet_0_behaviours, other_behaviours): (Vec<_>, Vec<_>) =
all_behaviours.iter().partition(|behaviour| {
behaviour
.membership
.membership(&behaviour.local_peer_id)
.contains(&0)
});
// Assert that all members of subnet 0 have received the message.
for behaviour in &subnet_0_behaviours {
assert!(behaviour
.seen_message_cache
.contains(&(vec![1, 2, 3], message.subnetwork_id)));
}
// Assert that no members of other subnets have received the message.
for behaviour in &other_behaviours {
assert!(behaviour.seen_message_cache.is_empty());
}
// Ensure the number of peers with the message matches the expected count
assert_eq!(subnet_0_behaviours.len(), num_instances / 2);
}
}