diff --git a/core/integration_tests_core/Cargo.toml b/core/integration_tests_core/Cargo.toml index 0f4bde8..8a5cabf 100644 --- a/core/integration_tests_core/Cargo.toml +++ b/core/integration_tests_core/Cargo.toml @@ -6,13 +6,22 @@ edition = "2024" # [[test]] # name = "integration_tests_core" -[dev-dependencies] +[dependencies] # Workspace dependencies (sorted) chat-sqlite = { workspace = true } components = { workspace = true } libchat = { workspace = true } -logos-account = { workspace = true , features = ["dev"]} +logos-account = { workspace = true, features = ["dev"]} +shared-traits = { workspace = true } + +# External dependencies (sorted) +tracing = "0.1" + +[dev-dependencies] +chat-sqlite = { workspace = true } storage = { workspace = true } # External dependencies (sorted) tempfile = "3" +tracing = "0.1.44" +tracing-subscriber = "0.3" diff --git a/core/integration_tests_core/src/lib.rs b/core/integration_tests_core/src/lib.rs index 8b13789..0c2e590 100644 --- a/core/integration_tests_core/src/lib.rs +++ b/core/integration_tests_core/src/lib.rs @@ -1 +1,4 @@ +mod test_client; +mod wakeup; +pub use test_client::TestHarness; diff --git a/core/integration_tests_core/src/test_client.rs b/core/integration_tests_core/src/test_client.rs new file mode 100644 index 0000000..62b7d8f --- /dev/null +++ b/core/integration_tests_core/src/test_client.rs @@ -0,0 +1,333 @@ +use libchat::{ConversationId, Core, IdentityProvider, PayloadOutcome}; +use logos_account::TestLogosAccount; +use shared_traits::IdentId; +use std::collections::HashMap; +use std::fmt::Debug; +use std::ops::{Deref, DerefMut}; +use std::time::Duration; +use tracing::{info, warn}; + +use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; + +use crate::wakeup::{TestWakeupProvider, TestWakeupService, WakeupRecord}; + +type WS = TestWakeupService; +type WP = TestWakeupProvider; + +const SARO: usize = 0; +const RAYA: usize = 1; +const PAX: usize = 2; +const MIRA: usize = 3; + +// type ClientType = CoreClient; +type ClientType = Core<( + TestLogosAccount, + LocalBroadcaster, + EphemeralRegistry, + WP, + MemStore, +)>; + +#[derive(Debug)] +pub struct ReceivedMessage { + pub convo_id: ConversationId, + pub contents: T, +} + +pub struct TestClient { + inner: ClientType, + received_messages: Vec>>, +} + +impl TestClient { + fn init(client: ClientType) -> Self { + Self { + inner: client, + received_messages: vec![], + } + } + + pub fn addr(&self) -> IdentId { + self.inner.ident_id().clone() + } + + fn drain_outcomes(&mut self) -> Vec { + let mut messages = vec![]; + while let Some(data) = self.inner.ds().poll() { + messages.push(data); + } + + let mut outcomes = vec![]; + for data in messages { + let outcome = self.inner.handle_payload(&data).unwrap(); + warn!(id= ?self.ident_id(),?outcome, "DRAIN CLIENT"); + // Copy Convo Messages to received buffer + + match &outcome { + PayloadOutcome::Empty => continue, + PayloadOutcome::Convo(convo_outcome) => { + if let Some(data) = &convo_outcome.content { + info!( + content = String::from_utf8_lossy(&data.bytes).to_string(), + "COT" + ); + self.received_messages.push(ReceivedMessage { + convo_id: convo_outcome.convo_id.clone(), + contents: data.bytes.clone(), + }); + } + } + PayloadOutcome::Inbox(_) => {} + } + + if !matches!(outcome, PayloadOutcome::Empty) { + outcomes.push(outcome); + } + } + outcomes + } + + pub fn received_messages(&self) -> &[ReceivedMessage>] { + &self.received_messages + } + + pub fn check(&self, convo_id: &str, content: &[u8]) -> bool { + for msg in &self.received_messages { + if msg.convo_id == convo_id && msg.contents == content { + return true; + } + } + false + } + + pub fn convo_count(&self) -> usize { + self.list_conversations().map_or(0, |v| v.len()) + } +} + +impl Deref for TestClient { + type Target = ClientType; + + fn deref(&self) -> &Self::Target { + &self.inner + } +} + +impl DerefMut for TestClient { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.inner + } +} + +#[allow(unused)] +pub struct Observation { + ident: IdentId, + outcome: PayloadOutcome, +} + +#[allow(unused)] +pub struct TestHarness { + addresses: HashMap, + clients: Vec, + wakeup_service: WS, + cb: Box, + // List of outcomes that were detected across all clients. + pub observed_outcomes: Vec, +} + +impl TestHarness { + pub fn new(cb: impl Fn(&TestClient, PayloadOutcome) + 'static) -> Self { + const { assert!(N > 0, "TestHarness requires at least one client") }; + const { assert!(N <= 4, "Only 4 clients are supported(Soft Limit") }; + + let mut clients = vec![]; + let mut addresses = HashMap::new(); + + let ds = LocalBroadcaster::new(); + let rs = EphemeralRegistry::new(); + let ws = TestWakeupService::new(); + + for i in 0..N { + let wp = ws.new_provider(i); + let ident = TestLogosAccount::new(Self::names(i)); + + addresses.insert(i, ident.id().clone()); + let core_client = + ClientType::new_with_name(ident, ds.clone(), rs.clone(), wp, MemStore::new()) + .unwrap(); + + let client = TestClient::init(core_client); + + clients.push(client); + } + + dbg!(&rs); + + Self { + addresses, + clients, + wakeup_service: ws, + cb: Box::new(cb), + observed_outcomes: vec![], + } + } + + pub fn client(&mut self, i: usize) -> &TestClient { + &self.clients[i] + } + + pub fn client_mut(&mut self, i: usize) -> &mut TestClient { + &mut self.clients[i] + } + + fn names(i: usize) -> &'static str { + match i { + SARO => "saro", + RAYA => "raya", + PAX => "pax", + MIRA => "mira", + _ => "unnamed", + } + } + + pub fn process(&mut self, duration: Duration) { + self.process_payloads(); + + let records = self.wakeup_service.advance_time(duration); + self.process_records(records); + } + + pub fn process_until(&mut self, predicate: impl Fn(&mut TestHarness) -> bool) { + let timeout = Duration::from_mins(1); + let step = Duration::from_millis(50); + let mut elapsed = Duration::ZERO; + + while !predicate(self) { + if elapsed >= timeout { + panic!("process_until timed out after {:?}", timeout); + } + self.process(step); + elapsed += step; + } + } + + pub fn process_until_label( + &mut self, + label: &str, + predicate: impl Fn(&mut TestHarness) -> bool, + ) { + info!(label, "Process Until"); + self.process_until(predicate); + } + + fn process_payloads(&mut self) { + // Process existing payloads for all clients. + for client in self.clients.iter_mut() { + for outcome in client.drain_outcomes() { + info!(id = ?client.ident_id(), ?outcome, "Process drain"); + self.observed_outcomes.push(Observation { + ident: client.ident_id().clone(), + outcome: outcome.clone(), + }); + info!(id = ?client.ident_id(), ?outcome, "Process drain"); + (self.cb)(client, outcome) + } + } + } + + fn process_records(&mut self, records: Vec) { + for record in records { + self.clients[record.client_index] + .wakeup(&record.convo_id) + .expect("Error During wakeup"); + } + } +} + +// Avoid Developer confusion by gating access functions +// based on the number of clients in the harness + +impl TestHarness<1> { + pub fn saro(&mut self) -> &mut TestClient { + &mut self.clients[SARO] + } +} + +impl TestHarness<2> { + pub fn saro(&mut self) -> &mut TestClient { + &mut self.clients[SARO] + } + + pub fn raya(&mut self) -> &mut TestClient { + &mut self.clients[RAYA] + } +} + +impl TestHarness<3> { + pub fn saro(&mut self) -> &mut TestClient { + &mut self.clients[SARO] + } + + pub fn raya(&mut self) -> &mut TestClient { + &mut self.clients[RAYA] + } + + pub fn pax(&mut self) -> &mut TestClient { + &mut self.clients[PAX] + } +} + +impl TestHarness<4> { + pub fn saro(&mut self) -> &mut TestClient { + &mut self.clients[SARO] + } + + pub fn raya(&mut self) -> &mut TestClient { + &mut self.clients[RAYA] + } + + pub fn pax(&mut self) -> &mut TestClient { + &mut self.clients[PAX] + } + + pub fn mira(&mut self) -> &mut TestClient { + &mut self.clients[MIRA] + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_() { + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_test_writer() + .try_init(); + + let mut harness = TestHarness::<2>::new(|client, outcome| { + info!( id=?&client.ident_id(), outcome = ?outcome, "Result"); + }); + + //Create Convo + let particpants = &[&harness.raya().addr()]; + let convo_id = harness + .saro() + .create_group_convo(particpants) + .expect("saro create group"); + + harness.process_until_label("Raya Join", |h| h.raya().convo_count() == 1); + + assert_eq!(harness.raya().convo_count(), 1, "raya did not join"); + + harness + .saro() + .send_content(convo_id.as_str(), b"Hello") + .expect("raya send"); + + harness.process(Duration::from_millis(200)); + + assert!(harness.raya().check(&convo_id, b"Hello")) + } +} diff --git a/core/integration_tests_core/src/wakeup.rs b/core/integration_tests_core/src/wakeup.rs new file mode 100644 index 0000000..abcb125 --- /dev/null +++ b/core/integration_tests_core/src/wakeup.rs @@ -0,0 +1,176 @@ +use libchat::{ConversationId, WakeupService}; +use std::cell::RefCell; +use std::cmp::Reverse; +use std::collections::BinaryHeap; +use std::fmt::Debug; +use std::rc::Rc; +use std::time::Duration; +use tracing::{info, trace}; + +#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)] +pub(crate) struct WakeupRecord { + pub expiry: Duration, + pub client_index: usize, + pub convo_id: String, +} + +pub struct TestWakeupProvider { + service: Rc>, + client_index: usize, +} + +impl Debug for TestWakeupProvider { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("TestWakeupProvider") + .field("client_index", &self.client_index) + .finish() + } +} + +impl TestWakeupProvider { + pub fn new(service: Rc>, id: usize) -> Self { + Self { + service, + client_index: id, + } + } +} + +impl WakeupService for TestWakeupProvider { + fn wakeup_in(&mut self, duration: Duration, convo_id: ConversationId) { + info!(?duration, convo_id, "Wakeup In"); + self.service + .borrow_mut() + .register_wakeup(duration, self.client_index, convo_id); + } +} + +pub struct InnerWakeupService { + now: Duration, + pending: BinaryHeap>, +} + +impl InnerWakeupService { + pub fn new() -> Self { + Self { + now: Duration::new(0, 0), + pending: BinaryHeap::new(), + } + } + + pub fn register_wakeup(&mut self, wake_in: Duration, client_index: usize, convo_id: String) { + info!(%client_index, ?wake_in, "ask for wake up"); + self.pending.push(Reverse(WakeupRecord { + expiry: self.now + wake_in, + client_index, + convo_id, + })); + } + + fn get_expired(&mut self) -> Vec { + trace!("Get Expired"); + let mut fired = vec![]; + + while self + .pending + .peek() + .is_some_and(|Reverse(w)| w.expiry <= self.now) + { + let Reverse(w) = self.pending.pop().unwrap(); + info!(now = self.now.as_secs(), w.convo_id, "Popping"); + fired.push(w); + } + + fired + } +} + +pub struct TestWakeupService { + inner: Rc>, +} + +impl Debug for TestWakeupService { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let srv = self.inner.borrow_mut(); + + f.debug_struct("TestWakeupService") + .field("heap", &srv.pending) + .finish() + } +} + +impl TestWakeupService { + pub fn new() -> Self { + Self { + inner: Rc::new(RefCell::new(InnerWakeupService::new())), + } + } + + pub fn new_provider(&self, id: usize) -> TestWakeupProvider { + TestWakeupProvider { + service: self.inner.clone(), + client_index: id, + } + } + + // Returns the ConvoIDs that triggered in order + pub fn advance_time(&mut self, duration: Duration) -> Vec { + let mut srv = self.inner.borrow_mut(); + trace!(?duration, "Advanced"); + // de-mls deadlines are real wall-clock; sleep so the millisecond-scale + // commit/consensus timers actually elapse between poll cycles + // Note: This is error prone as WakeupService tracks its own `now` variable. Does not account for processing time. + std::thread::sleep(duration); + + srv.now = srv.now.checked_add(duration).unwrap(); + srv.get_expired() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_wakeup_service() { + let _ = tracing_subscriber::fmt().with_test_writer().try_init(); + + let mut ws = TestWakeupService::new(); + + let mut p1 = ws.new_provider(1); + let mut p2 = ws.new_provider(2); + + p1.wakeup_in(Duration::from_secs(2), "convo1".into()); + p1.wakeup_in(Duration::from_secs(4), "convo1".into()); + + p2.wakeup_in(Duration::from_secs(5), "convo1".into()); + p2.wakeup_in(Duration::from_secs(4), "convo1".into()); + + { + let batch = ws.advance_time(Duration::from_secs(2)); + assert_eq!(batch.len(), 1, "too many records"); + assert_eq!(batch[0].client_index, 1, "client mismatch"); + } + + { + let batch = ws.advance_time(Duration::from_secs(2)); + assert_eq!(batch.len(), 2, "too many records"); + assert_eq!( + batch[0].client_index, 1, + "client 1 shoudld be first, as it was entered first" + ); + assert_eq!(batch[1].client_index, 2, "client 2 should be second"); + } + + { + let batch = ws.advance_time(Duration::from_secs(1)); + assert_eq!(batch.len(), 1, "too many records"); + assert_eq!(batch[0].client_index, 2, "client mismatch"); + } + + { + let batch = ws.advance_time(Duration::from_secs(1)); + assert_eq!(batch.len(), 0, "records should be completely drained"); + } + } +} diff --git a/core/integration_tests_core/tests/causal_history.rs b/core/integration_tests_core/tests/causal_history.rs index 3b244ae..4757d55 100644 --- a/core/integration_tests_core/tests/causal_history.rs +++ b/core/integration_tests_core/tests/causal_history.rs @@ -107,7 +107,7 @@ fn missing_group_message_is_detected() { // Saro creates a group with Raya. let raya_id = raya.ident_id().clone(); - let convo_id = saro.create_group_convo(&[&raya_id]).unwrap().to_string(); + let convo_id = saro.create_group_convo_v1(&[&raya_id]).unwrap().to_string(); // Raya joins (processes the Welcome + commit). raya.process_messages(); diff --git a/core/integration_tests_core/tests/mls_integration.rs b/core/integration_tests_core/tests/mls_integration.rs index 9f578aa..d80e690 100644 --- a/core/integration_tests_core/tests/mls_integration.rs +++ b/core/integration_tests_core/tests/mls_integration.rs @@ -1,212 +1,60 @@ -use std::ops::{Deref, DerefMut}; - -use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; -use libchat::{ - Content, ConversationClass, ConvoOutcome, Core, NewConversation, PayloadOutcome, hex_trunc, -}; -use logos_account::TestLogosAccount; - -type ResultCallback = Box; - -// Simple client Functionality for testing -struct Client { - inner: Core<( - TestLogosAccount, - LocalBroadcaster, - EphemeralRegistry, - MemStore, - )>, - on_result: Option, - new_conversations: Vec, - received_messages: Vec<(libchat::ConversationId, Content)>, -} - -impl Client { - fn init( - core: Core<( - TestLogosAccount, - LocalBroadcaster, - EphemeralRegistry, - MemStore, - )>, - cb: Option, - ) -> Self { - Client { - inner: core, - on_result: cb.map(|f| Box::new(f) as ResultCallback), - new_conversations: Vec::new(), - received_messages: Vec::new(), - } - } - - fn process_messages(&mut self) { - let payloads: Vec<_> = { - let ds = self.ds(); - std::iter::from_fn(|| ds.poll()).collect() - }; - - for data in payloads { - let result = self.handle_payload(&data).unwrap(); - if let Some(cb) = &self.on_result { - cb(&result); - } - match result { - PayloadOutcome::Empty => {} - PayloadOutcome::Convo(co) => self.absorb_convo_outcome(co), - PayloadOutcome::Inbox(io) => { - self.new_conversations.push(io.new_conversation); - if let Some(initial) = io.initial { - self.absorb_convo_outcome(initial); - } - } - } - } - } - - fn absorb_convo_outcome(&mut self, outcome: ConvoOutcome) { - if let Some(content) = outcome.content { - self.received_messages.push((outcome.convo_id, content)); - } - } -} - -impl Deref for Client { - type Target = Core<( - TestLogosAccount, - LocalBroadcaster, - EphemeralRegistry, - MemStore, - )>; - - fn deref(&self) -> &Self::Target { - &self.inner - } -} - -impl DerefMut for Client { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.inner - } -} - -// Higher order function to handle printing -fn pretty_print(prefix: impl Into) -> ResultCallback { - let prefix = prefix.into(); - Box::new(move |result: &PayloadOutcome| match result { - PayloadOutcome::Empty => {} - PayloadOutcome::Inbox(io) => { - let cid = hex_trunc(io.new_conversation.convo_id.as_bytes()); - println!( - "{prefix} ({cid:?}) [conversation started: {:?}]", - io.new_conversation.class - ); - if let Some(initial) = &io.initial { - print_contents(&prefix, initial); - } - } - PayloadOutcome::Convo(co) => print_contents(&prefix, co), - }) -} - -fn print_contents(prefix: &str, outcome: &ConvoOutcome) { - let cid = hex_trunc(outcome.convo_id.as_bytes()); - if let Some(content) = &outcome.content { - let text = String::from_utf8_lossy(&content.bytes); - println!("{prefix} ({cid:?}) {text}"); - } -} - -fn process(clients: &mut Vec) { - for client in clients { - client.process_messages(); - } -} +use integration_tests_core::TestHarness; +use std::time::Duration; #[test] fn create_group() { - let ds = LocalBroadcaster::new(); - let rs = EphemeralRegistry::new(); + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_test_writer() + .try_init(); - let saro_ident = TestLogosAccount::new("saro"); - let saro = - Core::new_with_name(saro_ident, ds.new_consumer(), rs.clone(), MemStore::new()).unwrap(); + let mut harness = TestHarness::<3>::new(|_, _| {}); - let raya_ident = TestLogosAccount::new("raya"); - let raya = Core::new_with_name(raya_ident, ds.clone(), rs.clone(), MemStore::new()).unwrap(); + let raya_id = harness.raya().ident_id().clone(); + let pax_id = harness.pax().ident_id().clone(); - let mut clients = vec![ - Client::init(saro, Some(pretty_print(" Saro "))), - Client::init(raya, Some(pretty_print(" Raya "))), - ]; + const M_R1: &[u8; 12] = b"Hi From Raya"; + const M_P1: &[u8; 13] = b"Hey it's Pax!"; - const SARO: usize = 0; - const RAYA: usize = 1; + // Step: Saro Create Convo with Raya - let raya_id = clients[RAYA].ident_id().clone(); - let convo_id = clients[SARO] - .create_group_convo(&[&raya_id]) - .unwrap() - .to_string(); + let convo_id = harness + .saro() + .create_group_convo_v1(&[&raya_id]) + .expect("Saro invite Raya "); + harness.process_until(|h| h.raya().list_conversations().unwrap().len() == 1); - // Raya can read this message because - // 1) It was sent after add_members was committed, and - // 2) LocalBroadcaster provides historical messages. + // Step: Raya Send Content - clients[SARO] - .send_content(&convo_id, b"ok who broke the group chat again") - .unwrap(); + harness + .raya() + .send_content(&convo_id, M_R1) + .expect("Raya send Msg"); - process(&mut clients); + harness.process_until(|h| h.saro().received_messages().len() == 1); - // Raya should observe exactly one new Group conversation from the - // welcome, even though no initial content arrives with it. - let raya_started = clients[RAYA] - .new_conversations - .iter() - .filter(|nc| matches!(nc.class, ConversationClass::Group)) - .count(); - assert_eq!( - raya_started, 1, - "Raya should have observed exactly one new Group conversation for the welcome" - ); + // Step: Saro add Pax - clients[RAYA] - .send_content(&convo_id, b"it was literally working five minutes ago") - .unwrap(); - - process(&mut clients); - - let pax_ident = TestLogosAccount::new("pax"); - let pax = Core::new_with_name(pax_ident, ds, rs, MemStore::new()).unwrap(); - clients.push(Client::init(pax, Some(pretty_print(" Pax")))); - const PAX: usize = 2; - - let pax_id = clients[PAX].ident_id().clone(); - clients[SARO] + harness + .saro() .group_add_member(&convo_id, &[&pax_id]) - .unwrap(); + .expect("Saro invite pax"); + harness.process_until(|h| h.pax().list_conversations().unwrap().len() == 1); - process(&mut clients); + // Step: Pax send Content - let pax_started = clients[PAX] - .new_conversations - .iter() - .filter(|nc| matches!(nc.class, ConversationClass::Group)) - .count(); - assert_eq!( - pax_started, 1, - "Pax should have observed exactly one new Group conversation for the welcome" - ); + harness + .pax() + .send_content(&convo_id, M_P1) + .expect("Pax send"); + harness.process(Duration::from_millis(500)); - clients[PAX] - .send_content(&convo_id, b"ngl the key rotation is cooked") - .unwrap(); + assert!(harness.saro().check(&convo_id, M_R1)); + assert!(harness.saro().check(&convo_id, M_P1)); - process(&mut clients); + assert!(!harness.raya().check(&convo_id, M_R1)); + assert!(harness.raya().check(&convo_id, M_P1)); - clients[SARO] - .send_content(&convo_id, b"bro we literally just added you to the group ") - .unwrap(); - - process(&mut clients); + assert!(!harness.pax().check(&convo_id, M_R1)); + assert!(!harness.pax().check(&convo_id, M_P1)); } diff --git a/core/integration_tests_core/tests/test_group_v2.rs b/core/integration_tests_core/tests/test_group_v2.rs new file mode 100644 index 0000000..f000423 --- /dev/null +++ b/core/integration_tests_core/tests/test_group_v2.rs @@ -0,0 +1,178 @@ +use integration_tests_core::TestHarness; +use tracing::info; + +#[test] +fn groupv2_2way_roundtrip() { + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_test_writer() + .try_init(); + + const S_M1: &[u8] = b"aaaaa"; + const R_M1: &[u8] = b"Hello"; + + // Initialize TestHarness with 2 clients + let mut harness = TestHarness::<2>::new(|_, _| {}); + + //Saro Create Convo + let particpants = &[&harness.raya().addr()]; + let convo_id = harness + .saro() + .create_group_convo_v2(particpants) + .expect("saro create group"); + + // Carry the invite through (commit, WelcomeReady, routing to Raya's inbox, + // accept_welcome); settle until Raya has joined. + harness.process_until_label("Saro Send", |h| h.raya().convo_count() == 1); + + // Saro sends a message; settle until Raya receives it. + info!(target: "chat", "Saro -> sending: {S_M1:?}"); + harness + .saro() + .send_content(&convo_id, S_M1) + .expect("saro send"); + + harness.process_until(|h| h.raya().check(&convo_id, S_M1)); + + // Raya replies; settle until Saro receives it. + info!(target: "chat", "Raya -> sending:{R_M1:?}"); + harness.raya().send_content(&convo_id, R_M1).unwrap(); + harness.process_until(|h| h.saro().check(&convo_id, R_M1)); +} + +#[test] +fn core_client() { + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_test_writer() + .try_init(); + + const S_M1: &[u8] = b"HI"; + const R_M1: &[u8] = b"hi back"; + const S_M2: &[u8] = b"EPOCHCHK"; + + let mut harness = TestHarness::<3>::new(|_, _| {}); + + let particpants = &[&harness.raya().addr()]; + let convo_id = harness + .saro() + .create_group_convo_v2(particpants) + .expect("Saro create"); + + // Carry the invite through (commit, WelcomeReady, routing to Raya's inbox, + // accept_welcome); settle until Raya has joined. + harness.process_until_label("saro create", |h| h.raya().convo_count() == 1); + + // Saro sends a message; settle until Raya receives it. + info!(target: "chat", "Saro -> sending: {S_M1:?}"); + harness + .saro() + .send_content(&convo_id, S_M1) + .expect("saro send"); + + harness.process_until_label("Recv S_M1", |h| h.raya().check(&convo_id, S_M1)); + + // Raya replies; settle until Saro receives it. + info!(target: "chat", "Raya -> sending: {R_M1:?}"); + harness + .raya() + .send_content(&convo_id, R_M1) + .expect("raya send"); + + harness.process_until_label("Recv R_M1", |h| h.saro().check(&convo_id, R_M1)); + + // Raya (a non-creator) invites Pax; settle until Pax has joined. + let particpants = &[&harness.pax().addr()]; + harness + .raya() + .group_add_member(&convo_id, particpants) + .expect("Raya add Pax"); + + harness.process_until_label("Raya add Pax", |h| h.pax().convo_count() == 1); + + // Everyone must be at the SAME epoch after Pax joined: a marker Saro sends + // now decrypts only for members that applied the Add commit. + info!(target: "chat", "Saro -> sending: EPOCHCHK"); + harness.saro().send_content(&convo_id, S_M2).unwrap(); + + harness.process_until_label("epoch check", |h| { + h.raya().check(&convo_id, S_M2) && h.pax().check(&convo_id, S_M2) + }); +} + +#[test] +fn core_client_batch_add() { + // Saro creates the group and adds BOTH Raya and Pax at the same time: one + // Add commit producing a single welcome that names both joiners. + + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_test_writer(); + + let mut harness = TestHarness::<3>::new(|_, _| {}); + + let particpants = &[&harness.raya().addr(), &harness.pax().addr()]; + harness + .saro() + .create_group_convo_v2(particpants) + .expect("Saro create"); + + // Carry the invite through (commit, WelcomeReady, routing to Raya's inbox, + // accept_welcome); settle until Raya has joined. + harness.process_until_label("saro create", |h| { + h.raya().convo_count() == 1 && h.pax().convo_count() == 1 + }); +} + +#[test] +fn core_client_four_members_two_epochs() { + // Epoch 1: Saro creates and batch-adds Raya + Pax (3 members). Epoch 2: Raya + // (a non-creator) adds a 4th member, Mira. Afterwards every member must be + // at the same epoch (each can decrypt a freshly-sent message) and settled + // back in Working (the >sn_max election that the 4th member triggers must + // have completed — no one stuck in Freezing/Selection/Reelection). + + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_test_writer() + .try_init(); + + const MSG: &[u8] = b"CONVERGED"; + + let mut harness = TestHarness::<4>::new(|_, _| {}); + + let particpants = &[&harness.raya().addr(), &harness.pax().addr()]; + let convo_id = harness + .saro() + .create_group_convo_v2(particpants) + .expect("Saro create"); + + // Carry the invite through (commit, WelcomeReady, routing to Raya's inbox, + // accept_welcome); settle until Raya has joined. + harness.process_until_label("Raya + Pax join", |h| { + h.raya().convo_count() == 1 && h.pax().convo_count() == 1 + }); + + // Epoch 2: Raya adds the 4th member; settle until Mira has joined and the + // >sn_max election has returned everyone to Working. + let members = &[&harness.mira().addr()]; + harness + .raya() + .group_add_member(&convo_id, members) + .expect("Add Mira"); + + // TODO: Add State == Working for all clients + harness.process_until_label("Mira join", |h| h.mira().convo_count() == 1); + + // Same epoch: a message Saro sends now must reach all three peers. + harness + .saro() + .send_content(&convo_id, MSG) + .expect("Saro send"); + + harness.process_until_label("all chats converge", |h| { + h.raya().check(&convo_id, MSG) + && h.pax().check(&convo_id, MSG) + && h.mira().check(&convo_id, MSG) + }); +}