Add TestHarness

This commit is contained in:
Jazz Turner-Baggs 2026-06-13 08:54:08 -07:00
parent 3245534790
commit ef7a27e9bb
No known key found for this signature in database
7 changed files with 742 additions and 195 deletions

View File

@ -6,13 +6,22 @@ edition = "2024"
# [[test]]
# name = "integration_tests_core"
[dev-dependencies]
[dependencies]
# Workspace dependencies (sorted)
chat-sqlite = { workspace = true }
components = { workspace = true }
libchat = { workspace = true }
logos-account = { workspace = true , features = ["dev"]}
logos-account = { workspace = true, features = ["dev"]}
shared-traits = { workspace = true }
# External dependencies (sorted)
tracing = "0.1"
[dev-dependencies]
chat-sqlite = { workspace = true }
storage = { workspace = true }
# External dependencies (sorted)
tempfile = "3"
tracing = "0.1.44"
tracing-subscriber = "0.3"

View File

@ -1 +1,4 @@
mod test_client;
mod wakeup;
pub use test_client::TestHarness;

View File

@ -0,0 +1,333 @@
use libchat::{ConversationId, Core, IdentityProvider, PayloadOutcome};
use logos_account::TestLogosAccount;
use shared_traits::IdentId;
use std::collections::HashMap;
use std::fmt::Debug;
use std::ops::{Deref, DerefMut};
use std::time::Duration;
use tracing::{info, warn};
use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
use crate::wakeup::{TestWakeupProvider, TestWakeupService, WakeupRecord};
type WS = TestWakeupService;
type WP = TestWakeupProvider;
const SARO: usize = 0;
const RAYA: usize = 1;
const PAX: usize = 2;
const MIRA: usize = 3;
// type ClientType = CoreClient<TestLogosAccount, LocalBroadcaster, EphemeralRegistry, WP, MemStore>;
type ClientType = Core<(
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
WP,
MemStore,
)>;
#[derive(Debug)]
pub struct ReceivedMessage<T> {
pub convo_id: ConversationId,
pub contents: T,
}
pub struct TestClient {
inner: ClientType,
received_messages: Vec<ReceivedMessage<Vec<u8>>>,
}
impl TestClient {
fn init(client: ClientType) -> Self {
Self {
inner: client,
received_messages: vec![],
}
}
pub fn addr(&self) -> IdentId {
self.inner.ident_id().clone()
}
fn drain_outcomes(&mut self) -> Vec<PayloadOutcome> {
let mut messages = vec![];
while let Some(data) = self.inner.ds().poll() {
messages.push(data);
}
let mut outcomes = vec![];
for data in messages {
let outcome = self.inner.handle_payload(&data).unwrap();
warn!(id= ?self.ident_id(),?outcome, "DRAIN CLIENT");
// Copy Convo Messages to received buffer
match &outcome {
PayloadOutcome::Empty => continue,
PayloadOutcome::Convo(convo_outcome) => {
if let Some(data) = &convo_outcome.content {
info!(
content = String::from_utf8_lossy(&data.bytes).to_string(),
"COT"
);
self.received_messages.push(ReceivedMessage {
convo_id: convo_outcome.convo_id.clone(),
contents: data.bytes.clone(),
});
}
}
PayloadOutcome::Inbox(_) => {}
}
if !matches!(outcome, PayloadOutcome::Empty) {
outcomes.push(outcome);
}
}
outcomes
}
pub fn received_messages(&self) -> &[ReceivedMessage<Vec<u8>>] {
&self.received_messages
}
pub fn check(&self, convo_id: &str, content: &[u8]) -> bool {
for msg in &self.received_messages {
if msg.convo_id == convo_id && msg.contents == content {
return true;
}
}
false
}
pub fn convo_count(&self) -> usize {
self.list_conversations().map_or(0, |v| v.len())
}
}
impl Deref for TestClient {
type Target = ClientType;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for TestClient {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
#[allow(unused)]
pub struct Observation {
ident: IdentId,
outcome: PayloadOutcome,
}
#[allow(unused)]
pub struct TestHarness<const N: usize> {
addresses: HashMap<usize, IdentId>,
clients: Vec<TestClient>,
wakeup_service: WS,
cb: Box<dyn Fn(&TestClient, PayloadOutcome)>,
// List of outcomes that were detected across all clients.
pub observed_outcomes: Vec<Observation>,
}
impl<const N: usize> TestHarness<N> {
pub fn new(cb: impl Fn(&TestClient, PayloadOutcome) + 'static) -> Self {
const { assert!(N > 0, "TestHarness requires at least one client") };
const { assert!(N <= 4, "Only 4 clients are supported(Soft Limit") };
let mut clients = vec![];
let mut addresses = HashMap::new();
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let ws = TestWakeupService::new();
for i in 0..N {
let wp = ws.new_provider(i);
let ident = TestLogosAccount::new(Self::names(i));
addresses.insert(i, ident.id().clone());
let core_client =
ClientType::new_with_name(ident, ds.clone(), rs.clone(), wp, MemStore::new())
.unwrap();
let client = TestClient::init(core_client);
clients.push(client);
}
dbg!(&rs);
Self {
addresses,
clients,
wakeup_service: ws,
cb: Box::new(cb),
observed_outcomes: vec![],
}
}
pub fn client(&mut self, i: usize) -> &TestClient {
&self.clients[i]
}
pub fn client_mut(&mut self, i: usize) -> &mut TestClient {
&mut self.clients[i]
}
fn names(i: usize) -> &'static str {
match i {
SARO => "saro",
RAYA => "raya",
PAX => "pax",
MIRA => "mira",
_ => "unnamed",
}
}
pub fn process(&mut self, duration: Duration) {
self.process_payloads();
let records = self.wakeup_service.advance_time(duration);
self.process_records(records);
}
pub fn process_until(&mut self, predicate: impl Fn(&mut TestHarness<N>) -> bool) {
let timeout = Duration::from_mins(1);
let step = Duration::from_millis(50);
let mut elapsed = Duration::ZERO;
while !predicate(self) {
if elapsed >= timeout {
panic!("process_until timed out after {:?}", timeout);
}
self.process(step);
elapsed += step;
}
}
pub fn process_until_label(
&mut self,
label: &str,
predicate: impl Fn(&mut TestHarness<N>) -> bool,
) {
info!(label, "Process Until");
self.process_until(predicate);
}
fn process_payloads(&mut self) {
// Process existing payloads for all clients.
for client in self.clients.iter_mut() {
for outcome in client.drain_outcomes() {
info!(id = ?client.ident_id(), ?outcome, "Process drain");
self.observed_outcomes.push(Observation {
ident: client.ident_id().clone(),
outcome: outcome.clone(),
});
info!(id = ?client.ident_id(), ?outcome, "Process drain");
(self.cb)(client, outcome)
}
}
}
fn process_records(&mut self, records: Vec<WakeupRecord>) {
for record in records {
self.clients[record.client_index]
.wakeup(&record.convo_id)
.expect("Error During wakeup");
}
}
}
// Avoid Developer confusion by gating access functions
// based on the number of clients in the harness
impl TestHarness<1> {
pub fn saro(&mut self) -> &mut TestClient {
&mut self.clients[SARO]
}
}
impl TestHarness<2> {
pub fn saro(&mut self) -> &mut TestClient {
&mut self.clients[SARO]
}
pub fn raya(&mut self) -> &mut TestClient {
&mut self.clients[RAYA]
}
}
impl TestHarness<3> {
pub fn saro(&mut self) -> &mut TestClient {
&mut self.clients[SARO]
}
pub fn raya(&mut self) -> &mut TestClient {
&mut self.clients[RAYA]
}
pub fn pax(&mut self) -> &mut TestClient {
&mut self.clients[PAX]
}
}
impl TestHarness<4> {
pub fn saro(&mut self) -> &mut TestClient {
&mut self.clients[SARO]
}
pub fn raya(&mut self) -> &mut TestClient {
&mut self.clients[RAYA]
}
pub fn pax(&mut self) -> &mut TestClient {
&mut self.clients[PAX]
}
pub fn mira(&mut self) -> &mut TestClient {
&mut self.clients[MIRA]
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
let mut harness = TestHarness::<2>::new(|client, outcome| {
info!( id=?&client.ident_id(), outcome = ?outcome, "Result");
});
//Create Convo
let particpants = &[&harness.raya().addr()];
let convo_id = harness
.saro()
.create_group_convo(particpants)
.expect("saro create group");
harness.process_until_label("Raya Join", |h| h.raya().convo_count() == 1);
assert_eq!(harness.raya().convo_count(), 1, "raya did not join");
harness
.saro()
.send_content(convo_id.as_str(), b"Hello")
.expect("raya send");
harness.process(Duration::from_millis(200));
assert!(harness.raya().check(&convo_id, b"Hello"))
}
}

View File

@ -0,0 +1,176 @@
use libchat::{ConversationId, WakeupService};
use std::cell::RefCell;
use std::cmp::Reverse;
use std::collections::BinaryHeap;
use std::fmt::Debug;
use std::rc::Rc;
use std::time::Duration;
use tracing::{info, trace};
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd)]
pub(crate) struct WakeupRecord {
pub expiry: Duration,
pub client_index: usize,
pub convo_id: String,
}
pub struct TestWakeupProvider {
service: Rc<RefCell<InnerWakeupService>>,
client_index: usize,
}
impl Debug for TestWakeupProvider {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TestWakeupProvider")
.field("client_index", &self.client_index)
.finish()
}
}
impl TestWakeupProvider {
pub fn new(service: Rc<RefCell<InnerWakeupService>>, id: usize) -> Self {
Self {
service,
client_index: id,
}
}
}
impl WakeupService for TestWakeupProvider {
fn wakeup_in(&mut self, duration: Duration, convo_id: ConversationId) {
info!(?duration, convo_id, "Wakeup In");
self.service
.borrow_mut()
.register_wakeup(duration, self.client_index, convo_id);
}
}
pub struct InnerWakeupService {
now: Duration,
pending: BinaryHeap<Reverse<WakeupRecord>>,
}
impl InnerWakeupService {
pub fn new() -> Self {
Self {
now: Duration::new(0, 0),
pending: BinaryHeap::new(),
}
}
pub fn register_wakeup(&mut self, wake_in: Duration, client_index: usize, convo_id: String) {
info!(%client_index, ?wake_in, "ask for wake up");
self.pending.push(Reverse(WakeupRecord {
expiry: self.now + wake_in,
client_index,
convo_id,
}));
}
fn get_expired(&mut self) -> Vec<WakeupRecord> {
trace!("Get Expired");
let mut fired = vec![];
while self
.pending
.peek()
.is_some_and(|Reverse(w)| w.expiry <= self.now)
{
let Reverse(w) = self.pending.pop().unwrap();
info!(now = self.now.as_secs(), w.convo_id, "Popping");
fired.push(w);
}
fired
}
}
pub struct TestWakeupService {
inner: Rc<RefCell<InnerWakeupService>>,
}
impl Debug for TestWakeupService {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let srv = self.inner.borrow_mut();
f.debug_struct("TestWakeupService")
.field("heap", &srv.pending)
.finish()
}
}
impl TestWakeupService {
pub fn new() -> Self {
Self {
inner: Rc::new(RefCell::new(InnerWakeupService::new())),
}
}
pub fn new_provider(&self, id: usize) -> TestWakeupProvider {
TestWakeupProvider {
service: self.inner.clone(),
client_index: id,
}
}
// Returns the ConvoIDs that triggered in order
pub fn advance_time(&mut self, duration: Duration) -> Vec<WakeupRecord> {
let mut srv = self.inner.borrow_mut();
trace!(?duration, "Advanced");
// de-mls deadlines are real wall-clock; sleep so the millisecond-scale
// commit/consensus timers actually elapse between poll cycles
// Note: This is error prone as WakeupService tracks its own `now` variable. Does not account for processing time.
std::thread::sleep(duration);
srv.now = srv.now.checked_add(duration).unwrap();
srv.get_expired()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_wakeup_service() {
let _ = tracing_subscriber::fmt().with_test_writer().try_init();
let mut ws = TestWakeupService::new();
let mut p1 = ws.new_provider(1);
let mut p2 = ws.new_provider(2);
p1.wakeup_in(Duration::from_secs(2), "convo1".into());
p1.wakeup_in(Duration::from_secs(4), "convo1".into());
p2.wakeup_in(Duration::from_secs(5), "convo1".into());
p2.wakeup_in(Duration::from_secs(4), "convo1".into());
{
let batch = ws.advance_time(Duration::from_secs(2));
assert_eq!(batch.len(), 1, "too many records");
assert_eq!(batch[0].client_index, 1, "client mismatch");
}
{
let batch = ws.advance_time(Duration::from_secs(2));
assert_eq!(batch.len(), 2, "too many records");
assert_eq!(
batch[0].client_index, 1,
"client 1 shoudld be first, as it was entered first"
);
assert_eq!(batch[1].client_index, 2, "client 2 should be second");
}
{
let batch = ws.advance_time(Duration::from_secs(1));
assert_eq!(batch.len(), 1, "too many records");
assert_eq!(batch[0].client_index, 2, "client mismatch");
}
{
let batch = ws.advance_time(Duration::from_secs(1));
assert_eq!(batch.len(), 0, "records should be completely drained");
}
}
}

View File

@ -107,7 +107,7 @@ fn missing_group_message_is_detected() {
// Saro creates a group with Raya.
let raya_id = raya.ident_id().clone();
let convo_id = saro.create_group_convo(&[&raya_id]).unwrap().to_string();
let convo_id = saro.create_group_convo_v1(&[&raya_id]).unwrap().to_string();
// Raya joins (processes the Welcome + commit).
raya.process_messages();

View File

@ -1,212 +1,60 @@
use std::ops::{Deref, DerefMut};
use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
use libchat::{
Content, ConversationClass, ConvoOutcome, Core, NewConversation, PayloadOutcome, hex_trunc,
};
use logos_account::TestLogosAccount;
type ResultCallback = Box<dyn Fn(&PayloadOutcome)>;
// Simple client Functionality for testing
struct Client {
inner: Core<(
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
MemStore,
)>,
on_result: Option<ResultCallback>,
new_conversations: Vec<NewConversation>,
received_messages: Vec<(libchat::ConversationId, Content)>,
}
impl Client {
fn init(
core: Core<(
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
MemStore,
)>,
cb: Option<impl Fn(&PayloadOutcome) + 'static>,
) -> Self {
Client {
inner: core,
on_result: cb.map(|f| Box::new(f) as ResultCallback),
new_conversations: Vec::new(),
received_messages: Vec::new(),
}
}
fn process_messages(&mut self) {
let payloads: Vec<_> = {
let ds = self.ds();
std::iter::from_fn(|| ds.poll()).collect()
};
for data in payloads {
let result = self.handle_payload(&data).unwrap();
if let Some(cb) = &self.on_result {
cb(&result);
}
match result {
PayloadOutcome::Empty => {}
PayloadOutcome::Convo(co) => self.absorb_convo_outcome(co),
PayloadOutcome::Inbox(io) => {
self.new_conversations.push(io.new_conversation);
if let Some(initial) = io.initial {
self.absorb_convo_outcome(initial);
}
}
}
}
}
fn absorb_convo_outcome(&mut self, outcome: ConvoOutcome) {
if let Some(content) = outcome.content {
self.received_messages.push((outcome.convo_id, content));
}
}
}
impl Deref for Client {
type Target = Core<(
TestLogosAccount,
LocalBroadcaster,
EphemeralRegistry,
MemStore,
)>;
fn deref(&self) -> &Self::Target {
&self.inner
}
}
impl DerefMut for Client {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.inner
}
}
// Higher order function to handle printing
fn pretty_print(prefix: impl Into<String>) -> ResultCallback {
let prefix = prefix.into();
Box::new(move |result: &PayloadOutcome| match result {
PayloadOutcome::Empty => {}
PayloadOutcome::Inbox(io) => {
let cid = hex_trunc(io.new_conversation.convo_id.as_bytes());
println!(
"{prefix} ({cid:?}) [conversation started: {:?}]",
io.new_conversation.class
);
if let Some(initial) = &io.initial {
print_contents(&prefix, initial);
}
}
PayloadOutcome::Convo(co) => print_contents(&prefix, co),
})
}
fn print_contents(prefix: &str, outcome: &ConvoOutcome) {
let cid = hex_trunc(outcome.convo_id.as_bytes());
if let Some(content) = &outcome.content {
let text = String::from_utf8_lossy(&content.bytes);
println!("{prefix} ({cid:?}) {text}");
}
}
fn process(clients: &mut Vec<Client>) {
for client in clients {
client.process_messages();
}
}
use integration_tests_core::TestHarness;
use std::time::Duration;
#[test]
fn create_group() {
let ds = LocalBroadcaster::new();
let rs = EphemeralRegistry::new();
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
let saro_ident = TestLogosAccount::new("saro");
let saro =
Core::new_with_name(saro_ident, ds.new_consumer(), rs.clone(), MemStore::new()).unwrap();
let mut harness = TestHarness::<3>::new(|_, _| {});
let raya_ident = TestLogosAccount::new("raya");
let raya = Core::new_with_name(raya_ident, ds.clone(), rs.clone(), MemStore::new()).unwrap();
let raya_id = harness.raya().ident_id().clone();
let pax_id = harness.pax().ident_id().clone();
let mut clients = vec![
Client::init(saro, Some(pretty_print(" Saro "))),
Client::init(raya, Some(pretty_print(" Raya "))),
];
const M_R1: &[u8; 12] = b"Hi From Raya";
const M_P1: &[u8; 13] = b"Hey it's Pax!";
const SARO: usize = 0;
const RAYA: usize = 1;
// Step: Saro Create Convo with Raya
let raya_id = clients[RAYA].ident_id().clone();
let convo_id = clients[SARO]
.create_group_convo(&[&raya_id])
.unwrap()
.to_string();
let convo_id = harness
.saro()
.create_group_convo_v1(&[&raya_id])
.expect("Saro invite Raya ");
harness.process_until(|h| h.raya().list_conversations().unwrap().len() == 1);
// Raya can read this message because
// 1) It was sent after add_members was committed, and
// 2) LocalBroadcaster provides historical messages.
// Step: Raya Send Content
clients[SARO]
.send_content(&convo_id, b"ok who broke the group chat again")
.unwrap();
harness
.raya()
.send_content(&convo_id, M_R1)
.expect("Raya send Msg");
process(&mut clients);
harness.process_until(|h| h.saro().received_messages().len() == 1);
// Raya should observe exactly one new Group conversation from the
// welcome, even though no initial content arrives with it.
let raya_started = clients[RAYA]
.new_conversations
.iter()
.filter(|nc| matches!(nc.class, ConversationClass::Group))
.count();
assert_eq!(
raya_started, 1,
"Raya should have observed exactly one new Group conversation for the welcome"
);
// Step: Saro add Pax
clients[RAYA]
.send_content(&convo_id, b"it was literally working five minutes ago")
.unwrap();
process(&mut clients);
let pax_ident = TestLogosAccount::new("pax");
let pax = Core::new_with_name(pax_ident, ds, rs, MemStore::new()).unwrap();
clients.push(Client::init(pax, Some(pretty_print(" Pax"))));
const PAX: usize = 2;
let pax_id = clients[PAX].ident_id().clone();
clients[SARO]
harness
.saro()
.group_add_member(&convo_id, &[&pax_id])
.unwrap();
.expect("Saro invite pax");
harness.process_until(|h| h.pax().list_conversations().unwrap().len() == 1);
process(&mut clients);
// Step: Pax send Content
let pax_started = clients[PAX]
.new_conversations
.iter()
.filter(|nc| matches!(nc.class, ConversationClass::Group))
.count();
assert_eq!(
pax_started, 1,
"Pax should have observed exactly one new Group conversation for the welcome"
);
harness
.pax()
.send_content(&convo_id, M_P1)
.expect("Pax send");
harness.process(Duration::from_millis(500));
clients[PAX]
.send_content(&convo_id, b"ngl the key rotation is cooked")
.unwrap();
assert!(harness.saro().check(&convo_id, M_R1));
assert!(harness.saro().check(&convo_id, M_P1));
process(&mut clients);
assert!(!harness.raya().check(&convo_id, M_R1));
assert!(harness.raya().check(&convo_id, M_P1));
clients[SARO]
.send_content(&convo_id, b"bro we literally just added you to the group ")
.unwrap();
process(&mut clients);
assert!(!harness.pax().check(&convo_id, M_R1));
assert!(!harness.pax().check(&convo_id, M_P1));
}

View File

@ -0,0 +1,178 @@
use integration_tests_core::TestHarness;
use tracing::info;
#[test]
fn groupv2_2way_roundtrip() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
const S_M1: &[u8] = b"aaaaa";
const R_M1: &[u8] = b"Hello";
// Initialize TestHarness with 2 clients
let mut harness = TestHarness::<2>::new(|_, _| {});
//Saro Create Convo
let particpants = &[&harness.raya().addr()];
let convo_id = harness
.saro()
.create_group_convo_v2(particpants)
.expect("saro create group");
// Carry the invite through (commit, WelcomeReady, routing to Raya's inbox,
// accept_welcome); settle until Raya has joined.
harness.process_until_label("Saro Send", |h| h.raya().convo_count() == 1);
// Saro sends a message; settle until Raya receives it.
info!(target: "chat", "Saro -> sending: {S_M1:?}");
harness
.saro()
.send_content(&convo_id, S_M1)
.expect("saro send");
harness.process_until(|h| h.raya().check(&convo_id, S_M1));
// Raya replies; settle until Saro receives it.
info!(target: "chat", "Raya -> sending:{R_M1:?}");
harness.raya().send_content(&convo_id, R_M1).unwrap();
harness.process_until(|h| h.saro().check(&convo_id, R_M1));
}
#[test]
fn core_client() {
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
const S_M1: &[u8] = b"HI";
const R_M1: &[u8] = b"hi back";
const S_M2: &[u8] = b"EPOCHCHK";
let mut harness = TestHarness::<3>::new(|_, _| {});
let particpants = &[&harness.raya().addr()];
let convo_id = harness
.saro()
.create_group_convo_v2(particpants)
.expect("Saro create");
// Carry the invite through (commit, WelcomeReady, routing to Raya's inbox,
// accept_welcome); settle until Raya has joined.
harness.process_until_label("saro create", |h| h.raya().convo_count() == 1);
// Saro sends a message; settle until Raya receives it.
info!(target: "chat", "Saro -> sending: {S_M1:?}");
harness
.saro()
.send_content(&convo_id, S_M1)
.expect("saro send");
harness.process_until_label("Recv S_M1", |h| h.raya().check(&convo_id, S_M1));
// Raya replies; settle until Saro receives it.
info!(target: "chat", "Raya -> sending: {R_M1:?}");
harness
.raya()
.send_content(&convo_id, R_M1)
.expect("raya send");
harness.process_until_label("Recv R_M1", |h| h.saro().check(&convo_id, R_M1));
// Raya (a non-creator) invites Pax; settle until Pax has joined.
let particpants = &[&harness.pax().addr()];
harness
.raya()
.group_add_member(&convo_id, particpants)
.expect("Raya add Pax");
harness.process_until_label("Raya add Pax", |h| h.pax().convo_count() == 1);
// Everyone must be at the SAME epoch after Pax joined: a marker Saro sends
// now decrypts only for members that applied the Add commit.
info!(target: "chat", "Saro -> sending: EPOCHCHK");
harness.saro().send_content(&convo_id, S_M2).unwrap();
harness.process_until_label("epoch check", |h| {
h.raya().check(&convo_id, S_M2) && h.pax().check(&convo_id, S_M2)
});
}
#[test]
fn core_client_batch_add() {
// Saro creates the group and adds BOTH Raya and Pax at the same time: one
// Add commit producing a single welcome that names both joiners.
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer();
let mut harness = TestHarness::<3>::new(|_, _| {});
let particpants = &[&harness.raya().addr(), &harness.pax().addr()];
harness
.saro()
.create_group_convo_v2(particpants)
.expect("Saro create");
// Carry the invite through (commit, WelcomeReady, routing to Raya's inbox,
// accept_welcome); settle until Raya has joined.
harness.process_until_label("saro create", |h| {
h.raya().convo_count() == 1 && h.pax().convo_count() == 1
});
}
#[test]
fn core_client_four_members_two_epochs() {
// Epoch 1: Saro creates and batch-adds Raya + Pax (3 members). Epoch 2: Raya
// (a non-creator) adds a 4th member, Mira. Afterwards every member must be
// at the same epoch (each can decrypt a freshly-sent message) and settled
// back in Working (the >sn_max election that the 4th member triggers must
// have completed — no one stuck in Freezing/Selection/Reelection).
let _ = tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_test_writer()
.try_init();
const MSG: &[u8] = b"CONVERGED";
let mut harness = TestHarness::<4>::new(|_, _| {});
let particpants = &[&harness.raya().addr(), &harness.pax().addr()];
let convo_id = harness
.saro()
.create_group_convo_v2(particpants)
.expect("Saro create");
// Carry the invite through (commit, WelcomeReady, routing to Raya's inbox,
// accept_welcome); settle until Raya has joined.
harness.process_until_label("Raya + Pax join", |h| {
h.raya().convo_count() == 1 && h.pax().convo_count() == 1
});
// Epoch 2: Raya adds the 4th member; settle until Mira has joined and the
// >sn_max election has returned everyone to Working.
let members = &[&harness.mira().addr()];
harness
.raya()
.group_add_member(&convo_id, members)
.expect("Add Mira");
// TODO: Add State == Working for all clients
harness.process_until_label("Mira join", |h| h.mira().convo_count() == 1);
// Same epoch: a message Saro sends now must reach all three peers.
harness
.saro()
.send_content(&convo_id, MSG)
.expect("Saro send");
harness.process_until_label("all chats converge", |h| {
h.raya().check(&convo_id, MSG)
&& h.pax().check(&convo_id, MSG)
&& h.mira().check(&convo_id, MSG)
});
}