mirror of
https://github.com/logos-messaging/libchat.git
synced 2026-06-28 12:09:30 +00:00
* Simplify client * Fixups * Update Cli-Client to use builder * undeprecate legacy convos * Allow Storage config in builder * bug fixes * Clippy fix * fixes
286 lines
9.5 KiB
Rust
286 lines
9.5 KiB
Rust
use std::time::Duration;
|
|
|
|
use components::EphemeralRegistry;
|
|
use crossbeam_channel::{Receiver, Sender};
|
|
use crypto::Ed25519VerifyingKey;
|
|
use libchat::{AccountDirectory, IdentityProvider, SignedDeviceBundle, encode_bundle_payload};
|
|
use logos_account::TestLogosAccount;
|
|
use logos_chat::{
|
|
AddressedEnvelope, ChatClient, ChatClientBuilder, DelegateSigner, DeliveryService, Event,
|
|
InProcessDelivery, MessageBus, Transport,
|
|
};
|
|
|
|
/// Publish a signed device bundle endorsing `device` as a device of `account`,
|
|
/// so a receiver can verify the sender's account → device mapping.
|
|
fn publish_device_bundle(
|
|
reg: &mut EphemeralRegistry,
|
|
account: &TestLogosAccount,
|
|
device: &Ed25519VerifyingKey,
|
|
) {
|
|
let payload = encode_bundle_payload(0, std::slice::from_ref(device));
|
|
let signature = account.sign(&payload);
|
|
let bundle = SignedDeviceBundle {
|
|
account_pub: account.public_key().clone(),
|
|
payload,
|
|
signature,
|
|
};
|
|
reg.publish(&bundle).unwrap();
|
|
}
|
|
|
|
#[allow(clippy::type_complexity)]
|
|
fn create_test_client(
|
|
message_bus: MessageBus,
|
|
reg: EphemeralRegistry,
|
|
) -> Result<
|
|
(
|
|
ChatClient<DelegateSigner, InProcessDelivery, EphemeralRegistry, libchat::ChatStorage>,
|
|
Receiver<Event>,
|
|
),
|
|
logos_chat::ClientError,
|
|
> {
|
|
let d = InProcessDelivery::new(message_bus);
|
|
ChatClientBuilder::new()
|
|
.transport(d)
|
|
.registration(reg)
|
|
.build()
|
|
}
|
|
|
|
/// Block until the next event arrives and matches; panic on timeout/mismatch.
|
|
fn expect_event<F, T>(events: &Receiver<Event>, label: &str, mut f: F) -> T
|
|
where
|
|
F: FnMut(Event) -> Result<T, Event>,
|
|
{
|
|
let event = events
|
|
.recv_timeout(Duration::from_secs(5))
|
|
.unwrap_or_else(|_| panic!("timed out waiting for {label}"));
|
|
f(event).unwrap_or_else(|other| panic!("expected {label}, got {other:?}"))
|
|
}
|
|
|
|
#[test]
|
|
fn direct_v1_integration() {
|
|
let bus = MessageBus::default();
|
|
let reg_service = EphemeralRegistry::new();
|
|
|
|
let (mut saro, _saro_events) =
|
|
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
|
|
let (raya, raya_events) =
|
|
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
|
|
|
|
let convo_id = saro.create_direct_conversation(raya.addr()).unwrap();
|
|
|
|
// The invite payload yields ConversationStarted then MessageReceived.
|
|
expect_event(&raya_events, "ConversationStarted", |e| match e {
|
|
Event::ConversationStarted { convo_id, .. } => Ok(convo_id),
|
|
other => Err(other),
|
|
});
|
|
|
|
saro.send_message(&convo_id, b"Hey from saro")
|
|
.expect("payload mismatch");
|
|
expect_event(&raya_events, "MessageReceived", |e| match e {
|
|
Event::MessageReceived { content, .. } => {
|
|
assert_eq!(content.as_slice(), b"Hey from saro");
|
|
Ok(())
|
|
}
|
|
other => Err(other),
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn direct_v1_standalone_integration() {
|
|
let bus = MessageBus::default();
|
|
|
|
let mut reg_service = EphemeralRegistry::new();
|
|
|
|
// Create accounts and delegates, associate each delegate with its account
|
|
// address, and publish a device bundle so the receiver can verify the
|
|
// account → device mapping carried in the sender's credential.
|
|
let saro_account = TestLogosAccount::new("Saro");
|
|
let mut saro_delegate = DelegateSigner::random();
|
|
saro_delegate.associate(hex::encode(saro_account.public_key().as_ref()));
|
|
publish_device_bundle(&mut reg_service, &saro_account, saro_delegate.public_key());
|
|
|
|
let raya_account = TestLogosAccount::new("Raya");
|
|
let mut raya_delegate = DelegateSigner::random();
|
|
raya_delegate.associate(hex::encode(raya_account.public_key().as_ref()));
|
|
publish_device_bundle(&mut reg_service, &raya_account, raya_delegate.public_key());
|
|
|
|
let (mut saro, _saro_events) =
|
|
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
|
|
let (raya, raya_events) =
|
|
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
|
|
|
|
let raya_addr = raya.addr();
|
|
let convo_id = saro.create_direct_conversation(raya_addr).unwrap();
|
|
|
|
// The invite payload yields ConversationStarted then MessageReceived.
|
|
expect_event(&raya_events, "ConversationStarted", |e| match e {
|
|
Event::ConversationStarted { convo_id, .. } => Ok(convo_id),
|
|
other => Err(other),
|
|
});
|
|
|
|
saro.send_message(&convo_id, b"Hey from saro")
|
|
.expect("payload mismatch");
|
|
expect_event(&raya_events, "MessageReceived", |e| match e {
|
|
Event::MessageReceived { content, .. } => {
|
|
assert_eq!(content.as_slice(), b"Hey from saro");
|
|
Ok(())
|
|
}
|
|
other => Err(other),
|
|
});
|
|
}
|
|
|
|
#[test]
|
|
fn saro_raya_message_exchange() {
|
|
let bus = MessageBus::default();
|
|
let reg_service = EphemeralRegistry::new();
|
|
|
|
let (mut saro, saro_events) =
|
|
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
|
|
let (mut raya, raya_events) =
|
|
create_test_client(bus.clone(), reg_service.clone()).expect("client create");
|
|
|
|
let saro_convo_id = saro
|
|
.create_direct_conversation(raya.addr())
|
|
.expect("convo create");
|
|
|
|
// Wait for raya to process the Welcome and subscribe to the convo delivery
|
|
// address before saro sends — MessageBus only fans out to current subscribers,
|
|
// so a message sent before raya subscribes would be silently dropped.
|
|
let raya_convo_id = expect_event(&raya_events, "ConversationStarted", |e| match e {
|
|
Event::ConversationStarted { convo_id, .. } => Ok(convo_id),
|
|
other => Err(other),
|
|
});
|
|
|
|
saro.send_message(&saro_convo_id, b"hello raya").unwrap();
|
|
expect_event(&raya_events, "MessageReceived", |e| match e {
|
|
Event::MessageReceived { convo_id, content } => {
|
|
assert_eq!(convo_id, raya_convo_id);
|
|
assert_eq!(content.as_slice(), b"hello raya");
|
|
Ok(())
|
|
}
|
|
other => Err(other),
|
|
});
|
|
|
|
raya.send_message(&raya_convo_id, b"hi saro").unwrap();
|
|
expect_event(&saro_events, "MessageReceived", |e| match e {
|
|
Event::MessageReceived { content, .. } => {
|
|
assert_eq!(content.as_slice(), b"hi saro");
|
|
Ok(())
|
|
}
|
|
other => Err(other),
|
|
});
|
|
|
|
for i in 0u8..5 {
|
|
let msg = format!("msg {i}");
|
|
saro.send_message(&saro_convo_id, msg.as_bytes()).unwrap();
|
|
expect_event(
|
|
&raya_events,
|
|
&format!("MessageReceived(msg {i})"),
|
|
|e| match e {
|
|
Event::MessageReceived { content, .. } => {
|
|
assert_eq!(content.as_slice(), msg.as_bytes());
|
|
Ok(())
|
|
}
|
|
other => Err(other),
|
|
},
|
|
);
|
|
|
|
let reply = format!("reply {i}");
|
|
raya.send_message(&raya_convo_id, reply.as_bytes()).unwrap();
|
|
expect_event(
|
|
&saro_events,
|
|
&format!("MessageReceived(reply {i})"),
|
|
|e| match e {
|
|
Event::MessageReceived { content, .. } => {
|
|
assert_eq!(content.as_slice(), reply.as_bytes());
|
|
Ok(())
|
|
}
|
|
other => Err(other),
|
|
},
|
|
);
|
|
}
|
|
|
|
assert_eq!(saro.list_conversations().unwrap().len(), 1);
|
|
assert_eq!(raya.list_conversations().unwrap().len(), 1);
|
|
}
|
|
|
|
#[derive(Debug)]
|
|
struct FailingDelivery {
|
|
inbound_tx: Sender<Vec<u8>>,
|
|
inbound_rx: Option<Receiver<Vec<u8>>>,
|
|
}
|
|
|
|
impl FailingDelivery {
|
|
fn new() -> Self {
|
|
let (inbound_tx, inbound_rx) = crossbeam_channel::unbounded();
|
|
Self {
|
|
inbound_tx,
|
|
inbound_rx: Some(inbound_rx),
|
|
}
|
|
}
|
|
|
|
/// A sender into this transport's inbound stream — for tests to feed the
|
|
/// worker, or to hold open so it doesn't see a disconnect.
|
|
fn inbound_sender(&self) -> Sender<Vec<u8>> {
|
|
self.inbound_tx.clone()
|
|
}
|
|
}
|
|
|
|
impl DeliveryService for FailingDelivery {
|
|
type Error = &'static str;
|
|
|
|
fn publish(&mut self, _: AddressedEnvelope) -> Result<(), Self::Error> {
|
|
Err("simulated transport failure")
|
|
}
|
|
|
|
fn subscribe(&mut self, _: &str) -> Result<(), Self::Error> {
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl Transport for FailingDelivery {
|
|
fn inbound(&mut self) -> Receiver<Vec<u8>> {
|
|
self.inbound_rx
|
|
.take()
|
|
.expect("FailingDelivery::inbound called more than once")
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn dropping_client_shuts_down_worker() {
|
|
let (client, events) =
|
|
create_test_client(MessageBus::default(), EphemeralRegistry::new()).expect("client create");
|
|
|
|
drop(client);
|
|
// Drop joins the worker; once joined its Sender<Event> is gone, so recv
|
|
// reports the channel as disconnected.
|
|
let res = events.recv_timeout(Duration::from_secs(5));
|
|
assert!(matches!(
|
|
res,
|
|
Err(crossbeam_channel::RecvTimeoutError::Disconnected)
|
|
));
|
|
}
|
|
|
|
#[test]
|
|
fn malformed_inbound_surfaces_as_error_event() {
|
|
// Feed the worker's inbound channel bytes that can't be decoded and assert
|
|
// it emits an InboundError instead of silently dropping the failure.
|
|
let delivery = FailingDelivery::new();
|
|
let inbound_tx = delivery.inbound_sender();
|
|
|
|
let (_client, events) = ChatClientBuilder::new()
|
|
.transport(delivery)
|
|
.build()
|
|
.expect("client create");
|
|
|
|
inbound_tx.send(b"not a valid payload".to_vec()).unwrap();
|
|
|
|
expect_event(&events, "InboundError", |e| match e {
|
|
Event::InboundError { message } => {
|
|
assert!(!message.is_empty(), "error event should carry a message");
|
|
Ok(())
|
|
}
|
|
other => Err(other),
|
|
});
|
|
}
|