libchat/crates/client/tests/saro_and_raya.rs
osmaczko 5d02196e6a
fix: deliver DirectV1 Welcome to account-associated invitees (routing_id)
DirectV1 over a shared key-package registry (HttpRegistry) could not deliver
the MLS Welcome to an account-associated invitee. Key-package resolution
requires the shared address to be the account key (the account directory
resolves it to the device key HttpRegistry stores the package under), but the
Welcome was routed to, and the invitee's InboxV2 subscribed and gated on, the
credential id (hex of the DelegateCredential TLV). The two strings never
matched, so the Welcome fell through the dispatch gate to PayloadOutcome::Empty
and the invitee never joined. EphemeralRegistry hid this by keying key-packages
on the credential id, collapsing both halves onto one string.

Decouple the 1:1 routing identity from the credential identity:

- Add a defaulted IdentityProvider::routing_id() (defaults to id()).
- DelegateSigner returns its account address from routing_id() once associated.
- Core::assemble feeds InboxV2 routing_id() instead of id(); the MLS credential,
  member id, sender id, and decode_sender keep reading id(), so MLS membership
  and sender attribution are unchanged.

Add a regression test (direct_v1_associated_invitee_receives_welcome) over a
DeviceKeyedRegistry that keys key-packages by the device verifying key, as the
deployed HttpRegistry does; it fails without routing_id and passes with it.
2026-06-27 13:58:05 +02:00

432 lines
15 KiB
Rust

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use components::EphemeralRegistry;
use crossbeam_channel::{Receiver, Sender};
use crypto::Ed25519VerifyingKey;
use libchat::{
AccountDirectory, DeviceSet, IdentityProvider, RegistrationService, SignedDeviceBundle,
encode_bundle_payload, verify_bundle,
};
use logos_account::TestLogosAccount;
use logos_chat::{
AddressedEnvelope, ChatClient, ChatClientBuilder, DelegateSigner, DeliveryService, Event,
InProcessDelivery, MessageBus, Transport,
};
/// Publish a signed device bundle endorsing `device` as a device of `account`,
/// so a receiver can verify the sender's account → device mapping.
fn publish_device_bundle(
reg: &mut impl AccountDirectory<Error = String>,
account: &TestLogosAccount,
device: &Ed25519VerifyingKey,
) {
let payload = encode_bundle_payload(0, std::slice::from_ref(device));
let signature = account.sign(&payload);
let bundle = SignedDeviceBundle {
account_pub: account.public_key().clone(),
payload,
signature,
};
reg.publish(&bundle).unwrap();
}
#[allow(clippy::type_complexity)]
fn create_test_client(
message_bus: MessageBus,
reg: EphemeralRegistry,
) -> Result<
(
ChatClient<DelegateSigner, InProcessDelivery, EphemeralRegistry, libchat::ChatStorage>,
Receiver<Event>,
),
logos_chat::ClientError,
> {
let d = InProcessDelivery::new(message_bus);
ChatClientBuilder::new()
.transport(d)
.registration(reg)
.build()
}
/// Block until the next event arrives and matches; panic on timeout/mismatch.
fn expect_event<F, T>(events: &Receiver<Event>, label: &str, mut f: F) -> T
where
F: FnMut(Event) -> Result<T, Event>,
{
let event = events
.recv_timeout(Duration::from_secs(5))
.unwrap_or_else(|_| panic!("timed out waiting for {label}"));
f(event).unwrap_or_else(|other| panic!("expected {label}, got {other:?}"))
}
#[test]
fn direct_v1_integration() {
let bus = MessageBus::default();
let reg_service = EphemeralRegistry::new();
let (mut saro, _saro_events) =
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
let (raya, raya_events) =
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
let convo_id = saro.create_direct_conversation(raya.addr()).unwrap();
// The invite payload yields ConversationStarted then MessageReceived.
expect_event(&raya_events, "ConversationStarted", |e| match e {
Event::ConversationStarted { convo_id, .. } => Ok(convo_id),
other => Err(other),
});
saro.send_message(&convo_id, b"Hey from saro")
.expect("payload mismatch");
expect_event(&raya_events, "MessageReceived", |e| match e {
Event::MessageReceived { content, .. } => {
assert_eq!(content.as_slice(), b"Hey from saro");
Ok(())
}
other => Err(other),
});
}
#[test]
fn direct_v1_standalone_integration() {
let bus = MessageBus::default();
let mut reg_service = EphemeralRegistry::new();
// Create accounts and delegates, associate each delegate with its account
// address, and publish a device bundle so the receiver can verify the
// account → device mapping carried in the sender's credential.
let saro_account = TestLogosAccount::new("Saro");
let saro_account_id = hex::encode(saro_account.public_key().as_ref());
let mut saro_delegate = DelegateSigner::random();
saro_delegate.associate(saro_account_id.clone());
let saro_device_id = hex::encode(saro_delegate.public_key().as_ref());
publish_device_bundle(&mut reg_service, &saro_account, saro_delegate.public_key());
let raya_account = TestLogosAccount::new("Raya");
let mut raya_delegate = DelegateSigner::random();
raya_delegate.associate(hex::encode(raya_account.public_key().as_ref()));
publish_device_bundle(&mut reg_service, &raya_account, raya_delegate.public_key());
// Build saro's client with its associated delegate so its outbound messages
// carry a credential the receiver can verify against the published bundle.
let (mut saro, _saro_events) = ChatClientBuilder::new()
.ident(saro_delegate)
.transport(InProcessDelivery::new(bus.clone()))
.registration(reg_service.clone())
.build()
.expect("client create");
let (raya, raya_events) =
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
let raya_addr = raya.addr();
let convo_id = saro.create_direct_conversation(raya_addr).unwrap();
// The invite payload yields ConversationStarted then MessageReceived.
expect_event(&raya_events, "ConversationStarted", |e| match e {
Event::ConversationStarted { convo_id, .. } => Ok(convo_id),
other => Err(other),
});
saro.send_message(&convo_id, b"Hey from saro")
.expect("payload mismatch");
expect_event(&raya_events, "MessageReceived", |e| match e {
Event::MessageReceived {
content, sender, ..
} => {
assert_eq!(content.as_slice(), b"Hey from saro");
// saro associated an account and published a matching bundle, so the
// sender surfaces with a verified account and its device.
assert_eq!(
sender.account.as_ref().map(|a| a.as_str()),
Some(saro_account_id.as_str())
);
assert_eq!(sender.local_identity.as_str(), saro_device_id.as_str());
Ok(())
}
other => Err(other),
});
}
/// Test registry that keys keypackages by `hex(public_key())`, exactly as the
/// deployed `HttpRegistry` does (the server keys by the device verifying key for
/// proof-of-possession). [`EphemeralRegistry`] instead keys by `id()`, which
/// collapses the account/credential split and so cannot exercise an associated
/// invitee: the account directory resolves a shared account address to a device
/// key, and the keypackage must be retrievable under that device key.
#[derive(Clone, Default)]
struct DeviceKeyedRegistry {
key_packages: Arc<Mutex<HashMap<String, Vec<u8>>>>,
installations: Arc<Mutex<HashMap<String, SignedDeviceBundle>>>,
}
impl std::fmt::Debug for DeviceKeyedRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("DeviceKeyedRegistry")
}
}
impl RegistrationService for DeviceKeyedRegistry {
type Error = String;
fn register(
&mut self,
identity: &dyn IdentityProvider,
key_bundle: Vec<u8>,
) -> Result<(), String> {
self.key_packages
.lock()
.unwrap()
.insert(hex::encode(identity.public_key().as_ref()), key_bundle);
Ok(())
}
fn retrieve(&self, device_id: &str) -> Result<Option<Vec<u8>>, String> {
Ok(self.key_packages.lock().unwrap().get(device_id).cloned())
}
}
impl AccountDirectory for DeviceKeyedRegistry {
type Error = String;
fn publish(&mut self, bundle: &SignedDeviceBundle) -> Result<(), String> {
self.installations
.lock()
.unwrap()
.insert(hex::encode(bundle.account_pub.as_ref()), bundle.clone());
Ok(())
}
fn fetch(&self, account: &Ed25519VerifyingKey) -> Result<Option<DeviceSet>, String> {
let Some(bundle) = self
.installations
.lock()
.unwrap()
.get(&hex::encode(account.as_ref()))
.cloned()
else {
return Ok(None);
};
verify_bundle(account, &bundle)
.map(Some)
.map_err(|e| e.to_string())
}
}
/// Regression test for the account-flow Welcome-routing fix (`routing_id`). The
/// invitee (raya) is an account-associated delegate, so the address the inviter
/// shares is raya's *account* key. Before the fix raya's InboxV2 subscribed and
/// gated on her *credential* id, so the Welcome — routed to the account address —
/// never reached her; with `routing_id` she routes on her account address, so it
/// lands and she joins. Needs a `hex(public_key())`-keyed registry because the
/// account directory resolves raya's account to her device key.
#[test]
fn direct_v1_associated_invitee_receives_welcome() {
let bus = MessageBus::default();
let mut reg = DeviceKeyedRegistry::default();
let raya_account = TestLogosAccount::new("Raya");
let raya_account_id = hex::encode(raya_account.public_key().as_ref());
let mut raya_delegate = DelegateSigner::random();
raya_delegate.associate(raya_account_id.clone());
publish_device_bundle(&mut reg, &raya_account, raya_delegate.public_key());
let (mut saro, _saro_events) = ChatClientBuilder::new()
.transport(InProcessDelivery::new(bus.clone()))
.registration(reg.clone())
.build()
.expect("client create");
let (raya, raya_events) = ChatClientBuilder::new()
.ident(raya_delegate)
.transport(InProcessDelivery::new(bus.clone()))
.registration(reg.clone())
.build()
.expect("client create");
// Raya is addressed by her account id (routing_id), not her credential TLV.
assert_eq!(raya.addr(), raya_account_id.as_str());
// Saro opens the conversation with raya's account address, shared out of band.
let convo_id = saro.create_direct_conversation(&raya_account_id).unwrap();
expect_event(&raya_events, "ConversationStarted", |e| match e {
Event::ConversationStarted { .. } => Ok(()),
other => Err(other),
});
saro.send_message(&convo_id, b"hey raya").unwrap();
expect_event(&raya_events, "MessageReceived", |e| match e {
Event::MessageReceived { content, .. } => {
assert_eq!(content.as_slice(), b"hey raya");
Ok(())
}
other => Err(other),
});
}
#[test]
fn saro_raya_message_exchange() {
let bus = MessageBus::default();
let reg_service = EphemeralRegistry::new();
let (mut saro, saro_events) =
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
let (mut raya, raya_events) =
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
let saro_convo_id = saro
.create_direct_conversation(raya.addr())
.expect("convo create");
// Wait for raya to process the Welcome and subscribe to the convo delivery
// address before saro sends — MessageBus only fans out to current subscribers,
// so a message sent before raya subscribes would be silently dropped.
let raya_convo_id = expect_event(&raya_events, "ConversationStarted", |e| match e {
Event::ConversationStarted { convo_id, .. } => Ok(convo_id),
other => Err(other),
});
saro.send_message(&saro_convo_id, b"hello raya").unwrap();
expect_event(&raya_events, "MessageReceived", |e| match e {
Event::MessageReceived {
convo_id,
content,
sender,
} => {
assert_eq!(convo_id, raya_convo_id);
assert_eq!(content.as_slice(), b"hello raya");
// saro's delegate is unassociated, so the sender surfaces its device
// but claims no account.
assert!(sender.account.is_none());
assert!(!sender.local_identity.as_str().is_empty());
Ok(())
}
other => Err(other),
});
raya.send_message(&raya_convo_id, b"hi saro").unwrap();
expect_event(&saro_events, "MessageReceived", |e| match e {
Event::MessageReceived { content, .. } => {
assert_eq!(content.as_slice(), b"hi saro");
Ok(())
}
other => Err(other),
});
for i in 0u8..5 {
let msg = format!("msg {i}");
saro.send_message(&saro_convo_id, msg.as_bytes()).unwrap();
expect_event(
&raya_events,
&format!("MessageReceived(msg {i})"),
|e| match e {
Event::MessageReceived { content, .. } => {
assert_eq!(content.as_slice(), msg.as_bytes());
Ok(())
}
other => Err(other),
},
);
let reply = format!("reply {i}");
raya.send_message(&raya_convo_id, reply.as_bytes()).unwrap();
expect_event(
&saro_events,
&format!("MessageReceived(reply {i})"),
|e| match e {
Event::MessageReceived { content, .. } => {
assert_eq!(content.as_slice(), reply.as_bytes());
Ok(())
}
other => Err(other),
},
);
}
assert_eq!(saro.list_conversations().unwrap().len(), 1);
assert_eq!(raya.list_conversations().unwrap().len(), 1);
}
#[derive(Debug)]
struct FailingDelivery {
inbound_tx: Sender<Vec<u8>>,
inbound_rx: Option<Receiver<Vec<u8>>>,
}
impl FailingDelivery {
fn new() -> Self {
let (inbound_tx, inbound_rx) = crossbeam_channel::unbounded();
Self {
inbound_tx,
inbound_rx: Some(inbound_rx),
}
}
/// A sender into this transport's inbound stream — for tests to feed the
/// worker, or to hold open so it doesn't see a disconnect.
fn inbound_sender(&self) -> Sender<Vec<u8>> {
self.inbound_tx.clone()
}
}
impl DeliveryService for FailingDelivery {
type Error = &'static str;
fn publish(&mut self, _: AddressedEnvelope) -> Result<(), Self::Error> {
Err("simulated transport failure")
}
fn subscribe(&mut self, _: &str) -> Result<(), Self::Error> {
Ok(())
}
}
impl Transport for FailingDelivery {
fn inbound(&mut self) -> Receiver<Vec<u8>> {
self.inbound_rx
.take()
.expect("FailingDelivery::inbound called more than once")
}
}
#[test]
fn dropping_client_shuts_down_worker() {
let (client, events) =
create_test_client(MessageBus::default(), EphemeralRegistry::new()).expect("client create");
drop(client);
// Drop joins the worker; once joined its Sender<Event> is gone, so recv
// reports the channel as disconnected.
let res = events.recv_timeout(Duration::from_secs(5));
assert!(matches!(
res,
Err(crossbeam_channel::RecvTimeoutError::Disconnected)
));
}
#[test]
fn malformed_inbound_surfaces_as_error_event() {
// Feed the worker's inbound channel bytes that can't be decoded and assert
// it emits an InboundError instead of silently dropping the failure.
let delivery = FailingDelivery::new();
let inbound_tx = delivery.inbound_sender();
let (_client, events) = ChatClientBuilder::new()
.transport(delivery)
.build()
.expect("client create");
inbound_tx.send(b"not a valid payload".to_vec()).unwrap();
expect_event(&events, "InboundError", |e| match e {
Event::InboundError { message } => {
assert!(!message.is_empty(), "error event should carry a message");
Ok(())
}
other => Err(other),
});
}