Merge branch 'master' into chore-da-integration-tests

Roman Zajic 2024-10-04 09:45:04 +08:00 committed by GitHub
commit bc180efcd1
4 changed files with 249 additions and 66 deletions


@@ -26,7 +26,6 @@ services:
       - ./tests/kzgrs/kzgrs_test_params:/kzgrs_test_params:z
     depends_on:
       - cfgsync
-      - graylog
     entrypoint: /etc/nomos/scripts/run_nomos_node.sh
   nomos-node-1:
@@ -40,7 +39,6 @@ services:
       - ./tests/kzgrs/kzgrs_test_params:/kzgrs_test_params:z
     depends_on:
       - cfgsync
-      - graylog
     ports:
       - "3001:3000/udp"
       - "18081:18080/tcp"
@@ -57,7 +55,6 @@ services:
       - ./tests/kzgrs/kzgrs_test_params:/kzgrs_test_params:z
     depends_on:
       - cfgsync
-      - graylog
     ports:
       - "3002:3000/udp"
       - "18082:18080/tcp"
@@ -74,7 +71,6 @@ services:
       - ./tests/kzgrs/kzgrs_test_params:/kzgrs_test_params:z
     depends_on:
       - cfgsync
-      - graylog
     ports:
       - "3003:3000/udp"
       - "18083:18080/tcp"
@@ -105,64 +101,3 @@ services:
     restart: on-failure
     depends_on:
       - prometheus
-  # Graylog related configuration
-  # More information at https://github.com/Graylog2/docker-compose/blob/main/open-core/docker-compose.yml
-  mongodb:
-    image: "mongo:5.0"
-    volumes:
-      - "mongodb_data:/data/db"
-    restart: "on-failure"
-  datanode:
-    image: "${DATANODE_IMAGE:-graylog/graylog-datanode:5.2}"
-    hostname: "datanode"
-    environment:
-      GRAYLOG_DATANODE_NODE_ID_FILE: "/var/lib/graylog-datanode/node-id"
-      GRAYLOG_DATANODE_PASSWORD_SECRET: "${GRAYLOG_PASSWORD_SECRET:?Please configure GRAYLOG_PASSWORD_SECRET in the .env file}"
-      GRAYLOG_DATANODE_ROOT_PASSWORD_SHA2: "${GRAYLOG_ROOT_PASSWORD_SHA2:?Please configure GRAYLOG_ROOT_PASSWORD_SHA2 in the .env file}"
-      GRAYLOG_DATANODE_MONGODB_URI: "mongodb://mongodb:27017/graylog"
-    ulimits:
-      memlock:
-        hard: -1
-        soft: -1
-      nofile:
-        soft: 65536
-        hard: 65536
-    ports:
-      - "8999:8999/tcp" # DataNode API
-      - "9200:9200/tcp"
-      - "9300:9300/tcp"
-    volumes:
-      - "graylog-datanode:/var/lib/graylog-datanode"
-    restart: "on-failure"
-  graylog:
-    image: "${GRAYLOG_IMAGE:-graylog/graylog:5.2}"
-    depends_on:
-      mongodb:
-        condition: "service_started"
-    entrypoint: "/usr/bin/tini -- /docker-entrypoint.sh"
-    environment:
-      GRAYLOG_NODE_ID_FILE: "/usr/share/graylog/data/data/node-id"
-      GRAYLOG_PASSWORD_SECRET: "${GRAYLOG_PASSWORD_SECRET:?Please configure GRAYLOG_PASSWORD_SECRET in the .env file}"
-      GRAYLOG_ROOT_PASSWORD_SHA2: "${GRAYLOG_ROOT_PASSWORD_SHA2:?Please configure GRAYLOG_ROOT_PASSWORD_SHA2 in the .env file}"
-      GRAYLOG_HTTP_BIND_ADDRESS: "0.0.0.0:9000"
-      GRAYLOG_HTTP_EXTERNAL_URI: "http://localhost:9000/"
-      GRAYLOG_MONGODB_URI: "mongodb://mongodb:27017/graylog"
-    ports:
-      - "9000:9000/tcp" # Server API
-      - "12201:12201/tcp" # GELF TCP
-      - "12201:12201/udp" # GELF UDP
-    volumes:
-      - "graylog_data:/usr/share/graylog/data/data"
-      - "graylog_journal:/usr/share/graylog/data/journal"
-      - ./testnet/monitoring/graylog.conf:/usr/share/graylog/data/config/graylog.conf
-    restart: "on-failure"
-volumes:
-  mongodb_data:
-  graylog-datanode:
-  graylog_data:
-  graylog_journal:


@@ -180,6 +180,9 @@ pub fn update_log(log: &mut <Logger as ServiceData>::Settings, log_args: LogArgs
     if let Some(level_str) = level {
         log.level = match level_str.as_str() {
             "DEBUG" => Level::DEBUG,
+            "INFO" => Level::INFO,
+            "ERROR" => Level::ERROR,
+            "WARN" => Level::WARN,
             _ => return Err(eyre!("Invalid log level provided.")),
         };
     }


@@ -232,3 +232,247 @@ where
         }
     }
 }
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use futures::task::{waker_ref, ArcWake};
+    use libp2p::{identity, PeerId};
+    use nomos_da_messages::common::Blob;
+    use std::collections::HashSet;
+    use std::sync::Arc;
+    use std::task::{Context, Poll};
+    #[derive(Clone, Debug)]
+    struct MockMembershipHandler {
+        membership: HashMap<PeerId, HashSet<SubnetworkId>>,
+    }
+    impl MembershipHandler for MockMembershipHandler {
+        type NetworkId = SubnetworkId;
+        type Id = PeerId;
+        fn membership(&self, peer_id: &PeerId) -> HashSet<Self::NetworkId> {
+            self.membership.get(peer_id).cloned().unwrap_or_default()
+        }
+        fn members_of(&self, subnetwork: &Self::NetworkId) -> HashSet<Self::Id> {
+            self.membership
+                .iter()
+                .filter_map(|(id, nets)| {
+                    if nets.contains(subnetwork) {
+                        Some(*id)
+                    } else {
+                        None
+                    }
+                })
+                .collect()
+        }
+        fn is_allowed(&self, _id: &Self::Id) -> bool {
+            unimplemented!()
+        }
+        fn members(&self) -> HashSet<Self::Id> {
+            unimplemented!()
+        }
+    }
+    struct TestWaker;
+    impl ArcWake for TestWaker {
+        fn wake_by_ref(_arc_self: &Arc<Self>) {}
+    }
+    fn create_replication_behaviours(
+        num_instances: usize,
+        subnet_id: u32,
+        membership: &mut HashMap<PeerId, HashSet<SubnetworkId>>,
+    ) -> Vec<ReplicationBehaviour<MockMembershipHandler>> {
+        let mut behaviours = Vec::new();
+        let mut peer_ids = Vec::new();
+        for _ in 0..num_instances {
+            let keypair = identity::Keypair::generate_ed25519();
+            let peer_id = PeerId::from(keypair.public());
+            peer_ids.push(peer_id);
+        }
+        for peer_id in &peer_ids {
+            membership.insert(*peer_id, HashSet::from([subnet_id]));
+        }
+        let membership_handler = MockMembershipHandler {
+            membership: HashMap::default(), // This will be updated after all behaviours are added.
+        };
+        for peer_id in peer_ids {
+            let behaviour = ReplicationBehaviour::new(peer_id, membership_handler.clone());
+            behaviours.push(behaviour);
+        }
+        behaviours
+    }
+    fn establish_connection(
+        behaviours: &mut [ReplicationBehaviour<MockMembershipHandler>],
+        i: usize,
+        j: usize,
+        connection_id: ConnectionId,
+    ) {
+        let peer_id_i = behaviours[i].local_peer_id;
+        let peer_id_j = behaviours[j].local_peer_id;
+        behaviours[i]
+            .handle_established_outbound_connection(
+                connection_id,
+                peer_id_j,
+                &Multiaddr::empty(),
+                Endpoint::Dialer,
+            )
+            .unwrap();
+        behaviours[j]
+            .handle_established_inbound_connection(
+                connection_id,
+                peer_id_i,
+                &Multiaddr::empty(),
+                &Multiaddr::empty(),
+            )
+            .unwrap();
+    }
+    fn deliver_message_to_peer(
+        all_behaviours: &mut [ReplicationBehaviour<MockMembershipHandler>],
+        peer_id: PeerId,
+        connection_id: ConnectionId,
+        message: DaMessage,
+    ) {
+        if let Some(behaviour) = all_behaviours
+            .iter_mut()
+            .find(|b| b.local_peer_id == peer_id)
+        {
+            // Simulate the handler receiving the message.
+            behaviour.on_connection_handler_event(
+                peer_id,
+                connection_id,
+                Either::Left(HandlerEventToBehaviour::IncomingMessage { message }),
+            );
+        }
+    }
+    #[tokio::test]
+    async fn test_replication_behaviour() {
+        let num_instances = 20;
+        let mut membership = HashMap::default();
+        let subnet_0_behaviours =
+            create_replication_behaviours(num_instances / 2, 0, &mut membership);
+        let subnet_1_behaviours =
+            create_replication_behaviours(num_instances / 2, 1, &mut membership);
+        let mut all_behaviours = subnet_0_behaviours;
+        all_behaviours.extend(subnet_1_behaviours);
+        for behaviour in all_behaviours.iter_mut() {
+            let membership_handler = MockMembershipHandler {
+                membership: membership.clone(),
+            };
+            behaviour.update_membership(membership_handler);
+        }
+        // Simulate peer connections.
+        for (i, j) in (0..num_instances).flat_map(|i| (i + 1..num_instances).map(move |j| (i, j))) {
+            let connection_id = ConnectionId::new_unchecked(i);
+            establish_connection(&mut all_behaviours, i, j, connection_id);
+        }
+        // Simulate sending a message from the first behavior.
+        let message = DaMessage {
+            blob: Some(Blob {
+                blob_id: vec![1, 2, 3],
+                data: vec![4, 5, 6],
+            }),
+            subnetwork_id: 0,
+        };
+        all_behaviours[0].replicate_message(message.clone());
+        let waker = Arc::new(TestWaker);
+        let waker_ref = waker_ref(&waker);
+        let mut cx = Context::from_waker(&waker_ref);
+        // Poll all behaviors until no more events are generated.
+        let mut pending_behaviours: Vec<_> = (0..num_instances).collect();
+        let mut completed = false;
+        while !completed {
+            completed = true;
+            for i in &pending_behaviours {
+                let behaviour = &mut all_behaviours[*i];
+                let mut events = vec![];
+                while let Poll::Ready(event) = behaviour.poll(&mut cx) {
+                    events.push(event);
+                }
+                // If there are events, set completed to false to continue polling.
+                if !events.is_empty() {
+                    completed = false;
+                    for event in events {
+                        // Intercept the events that should be processed by the handler.
+                        if let ToSwarm::NotifyHandler {
+                            peer_id,
+                            handler: NotifyHandler::One(connection_id),
+                            event:
+                                Either::Left(BehaviourEventToHandler::OutgoingMessage { message }),
+                        } = event
+                        {
+                            // Deliver the message to the appropriate peer's handler.
+                            deliver_message_to_peer(
+                                &mut all_behaviours,
+                                peer_id,
+                                connection_id,
+                                message.clone(),
+                            );
+                        }
+                    }
+                }
+            }
+            // Filter out behaviors that no longer generate events.
+            pending_behaviours.retain(|i| {
+                let mut events = vec![];
+                let behaviour = &mut all_behaviours[*i];
+                while let Poll::Ready(event) = behaviour.poll(&mut cx) {
+                    events.push(event);
+                }
+                !events.is_empty()
+            });
+        }
+        // Verify that all peers in subnet 0 have received the message, and others have not.
+        let (subnet_0_behaviours, other_behaviours): (Vec<_>, Vec<_>) =
+            all_behaviours.iter().partition(|behaviour| {
+                behaviour
+                    .membership
+                    .membership(&behaviour.local_peer_id)
+                    .contains(&0)
+            });
+        // Assert that all members of subnet 0 have received the message.
+        for behaviour in &subnet_0_behaviours {
+            assert!(behaviour
+                .seen_message_cache
+                .contains(&(vec![1, 2, 3], message.subnetwork_id)));
+        }
+        // Assert that no members of other subnets have received the message.
+        for behaviour in &other_behaviours {
+            assert!(behaviour.seen_message_cache.is_empty());
+        }
+        // Ensure the number of peers with the message matches the expected count
+        assert_eq!(subnet_0_behaviours.len(), num_instances / 2);
+    }
+}


@@ -5,7 +5,8 @@ set -e
 export CFG_FILE_PATH="/config.yaml" \
        CFG_SERVER_ADDR="http://cfgsync:4400" \
        CFG_HOST_IP=$(hostname -i) \
+       LOG_LEVEL="INFO" \
        RISC0_DEV_MODE=true
 /usr/bin/cfgsync-client && \
-exec /usr/bin/nomos-node /config.yaml --with-metrics --log-backend gelf --log-addr graylog:12201
+exec /usr/bin/nomos-node /config.yaml --with-metrics