diff --git a/core/conversations/src/context.rs b/core/conversations/src/context.rs index d0c28c8..2e5f65d 100644 --- a/core/conversations/src/context.rs +++ b/core/conversations/src/context.rs @@ -6,11 +6,11 @@ use crate::account::LogosAccount; use crate::causal_history::MissingMessage; use crate::conversation::{Convo, GroupConvo}; +use crate::response::{ConvoResponse, ProcessResponse}; use crate::{DeliveryService, RegistrationService}; use crate::{ conversation::{Id, PrivateV1Convo}, errors::ChatError, - inbound::InboundResult, inbox::Inbox, inbox_v2::InboxV2, proto::{EncryptedPayload, EnvelopeV1, Message}, @@ -226,7 +226,7 @@ where } // Decode bytes and send to protocol for processing. - pub fn handle_payload(&mut self, payload: &[u8]) -> Result { + pub fn handle_payload(&mut self, payload: &[u8]) -> Result { let env = EnvelopeV1::decode(payload)?; // TODO: Impl Conversation hinting @@ -238,24 +238,30 @@ where c if self.store.borrow().has_conversation(&c)? => { self.dispatch_to_convo(&c, &env.payload) } - _ => Ok(InboundResult::default()), + _ => Ok(ProcessResponse::Unknown), } } // Dispatch encrypted payload to Inbox. The Inbox persists the newly // created conversation and consumes the ephemeral key internally. - fn dispatch_to_inbox(&mut self, enc_payload_bytes: &[u8]) -> Result { + fn dispatch_to_inbox( + &mut self, + enc_payload_bytes: &[u8], + ) -> Result { // EncryptedPayloads are not used by GroupConvos at this time, else this can be performed in `handle_payload` // TODO: (P1) reconcile envelope parsing between Covno and GroupConvo let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; let public_key_hex = Inbox::::extract_ephemeral_key_hex(&enc_payload)?; self.inbox .handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store)) + .map(ProcessResponse::InboxResponse) } // Dispatch encrypted payload to the post-quantum inbox. - fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result { - self.pq_inbox.handle_frame(payload) + fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result { + self.pq_inbox + .handle_frame(payload) + .map(ProcessResponse::InboxResponse) } // Dispatch encrypted payload to its corresponding conversation @@ -263,14 +269,11 @@ where &mut self, convo_id: ConversationId, enc_payload_bytes: &[u8], - ) -> Result { + ) -> Result { let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?; let mut convo = self.load_convo(convo_id)?; let frame = convo.handle_frame(enc_payload)?; - Ok(InboundResult { - new_conversation: None, - frame, - }) + Ok(ProcessResponse::ConvoResponse(ConvoResponse { frame })) } pub fn create_intro_bundle(&mut self) -> Result, ChatError> { diff --git a/core/conversations/src/conversation.rs b/core/conversations/src/conversation.rs index cde8042..ed9e828 100644 --- a/core/conversations/src/conversation.rs +++ b/core/conversations/src/conversation.rs @@ -3,7 +3,7 @@ mod privatev1; use crate::{ DeliveryService, - inbound::FrameOutcome, + response::FrameOutcome, service_traits::KeyPackageProvider, types::{AccountId, AddressedEncryptedPayload}, }; diff --git a/core/conversations/src/conversation/group_v1.rs b/core/conversations/src/conversation/group_v1.rs index 76bf62a..2ab6ed7 100644 --- a/core/conversations/src/conversation/group_v1.rs +++ b/core/conversations/src/conversation/group_v1.rs @@ -22,7 +22,7 @@ use crate::types::AccountId; use crate::{ DeliveryService, conversation::{ChatError, ConversationId, Convo, GroupConvo, Id}, - inbound::{FrameOutcome, Message}, + response::{FrameOutcome, Message}, service_traits::KeyPackageProvider, types::AddressedEncryptedPayload, }; @@ -339,27 +339,30 @@ where .process_message(provider, protocol_message) .map_err(ChatError::generic)?; - let messages = match processed.into_content() { + match processed.into_content() { ProcessedMessageContent::ApplicationMessage(msg) => { let reliable = ReliablePayload::decode(msg.into_bytes().as_slice())?; - self.causal.on_receive(&self.convo_id, &reliable); - vec![Message { + let missing_messages = self.causal.on_receive(&self.convo_id, &reliable); + let message = Some(Message { convo_id: Arc::from(self.id()), content: reliable.content.to_vec(), - }] + }); + Ok(FrameOutcome { + message, + missing_messages, + }) } ProcessedMessageContent::StagedCommitMessage(commit) => { self.mls_group .merge_staged_commit(provider, *commit) .map_err(ChatError::generic)?; - vec![] + Ok(FrameOutcome::default()) } _ => { // TODO: (P2) Log unknown message type - vec![] + Ok(FrameOutcome::default()) } - }; - Ok(FrameOutcome { messages }) + } } fn remote_id(&self) -> String { diff --git a/core/conversations/src/conversation/privatev1.rs b/core/conversations/src/conversation/privatev1.rs index c573dda..04dbc4c 100644 --- a/core/conversations/src/conversation/privatev1.rs +++ b/core/conversations/src/conversation/privatev1.rs @@ -16,8 +16,8 @@ use crate::{ context::ConversationIdOwned, conversation::{ChatError, ConversationId, Convo, Id}, errors::EncryptionError, - inbound::{FrameOutcome, Message}, proto, + response::{FrameOutcome, Message}, types::AddressedEncryptedPayload, utils::timestamp_millis, }; @@ -201,11 +201,11 @@ impl PrivateV1Convo { Ok(()) } - fn handle_content(&self, bytes: Bytes) -> Vec { - vec![Message { + fn handle_content(&self, bytes: Bytes) -> Message { + Message { convo_id: Arc::from(self.id()), content: bytes.into(), - }] + } } } @@ -252,11 +252,14 @@ impl Convo for PrivateV1Convo { self.save_ratchet_state(&mut *self.store.borrow_mut())?; - let messages = match frame_type { - FrameType::Content(bytes) => self.handle_content(bytes), - FrameType::Placeholder(_) => vec![], + let message = match frame_type { + FrameType::Content(bytes) => Some(self.handle_content(bytes)), + FrameType::Placeholder(_) => None, }; - Ok(FrameOutcome { messages }) + Ok(FrameOutcome { + message, + missing_messages: Vec::new(), + }) } fn remote_id(&self) -> String { diff --git a/core/conversations/src/inbound.rs b/core/conversations/src/inbound.rs deleted file mode 100644 index 671f066..0000000 --- a/core/conversations/src/inbound.rs +++ /dev/null @@ -1,57 +0,0 @@ -//! Outcome of processing a single inbound payload. -//! -//! [`InboundResult`] composes two layers: -//! - [`FrameOutcome`] captures what processing one frame within a conversation -//! produces: today, decrypted messages. As protocol features land, new -//! per-conversation observations (e.g. group membership changes) become -//! additive fields on `FrameOutcome`. -//! - [`InboundResult`] wraps a `FrameOutcome` and adds the payload-level -//! observations a single frame cannot produce — today, the appearance of -//! a new conversation from the peer side. - -use storage::ConversationKind; - -use crate::context::ConversationIdOwned; - -/// Observations a conversation produces from processing one frame. -#[derive(Debug, Clone, Default)] -pub struct FrameOutcome { - /// User content decrypted from this frame, in protocol order. - pub messages: Vec, -} - -impl FrameOutcome { - pub fn is_empty(&self) -> bool { - self.messages.is_empty() - } -} - -/// Everything one inbound payload produced. -#[derive(Debug, Clone, Default)] -pub struct InboundResult { - /// A new conversation appeared from this payload, if any. - pub new_conversation: Option, - /// Observations from the frame inside this payload. - pub frame: FrameOutcome, -} - -impl InboundResult { - /// True when the payload produced no observable outcome. - pub fn is_empty(&self) -> bool { - self.new_conversation.is_none() && self.frame.is_empty() - } -} - -/// A conversation newly observed from the peer side. -#[derive(Debug, Clone)] -pub struct NewConversation { - pub convo_id: ConversationIdOwned, - pub kind: ConversationKind, -} - -/// User content decrypted from an inbound payload. -#[derive(Debug, Clone)] -pub struct Message { - pub convo_id: ConversationIdOwned, - pub content: Vec, -} diff --git a/core/conversations/src/inbox/handler.rs b/core/conversations/src/inbox/handler.rs index 2c418c1..eb53cc6 100644 --- a/core/conversations/src/inbox/handler.rs +++ b/core/conversations/src/inbox/handler.rs @@ -13,9 +13,9 @@ use crypto::{PrekeyBundle, SymmetricKey32}; use crate::context::Introduction; use crate::conversation::{ChatError, ConversationId, Convo, Id, PrivateV1Convo}; use crate::crypto::{CopyBytes, PrivateKey, PublicKey}; -use crate::inbound::{InboundResult, NewConversation}; use crate::inbox::handshake::InboxHandshake; use crate::proto; +use crate::response::{InboxResponse, NewConversation}; use crate::types::AddressedEncryptedPayload; use crypto::Identity; @@ -124,14 +124,14 @@ impl Inbox { /// Handles an incoming inbox frame. The caller must provide the ephemeral /// private key hex looked up from storage. Persists the created /// conversation and consumes the ephemeral key. Returns the - /// [`InboundResult`] describing what was observed — for a successful + /// [`InboxResponse`] describing what was observed — for a successful /// invite, a `new_conversation` and one initial `message`. pub fn handle_frame( &self, enc_payload: EncryptedPayload, public_key_hex: &str, private_store: Rc>, - ) -> Result { + ) -> Result { let ephemeral_key = self .store .borrow() @@ -158,7 +158,7 @@ impl Inbox { }; let frame = convo.handle_frame(enc_payload)?; - if frame.messages.is_empty() { + if frame.message.is_none() { return Err(ChatError::Protocol( "expected initial message in invite".into(), )); @@ -170,9 +170,9 @@ impl Inbox { }; convo.persist()?; - InboundResult { - new_conversation: Some(new_conversation), - frame, + InboxResponse { + new_conversation, + frame: Some(frame), } } }; diff --git a/core/conversations/src/inbox_v2.rs b/core/conversations/src/inbox_v2.rs index 7783975..23e0ef7 100644 --- a/core/conversations/src/inbox_v2.rs +++ b/core/conversations/src/inbox_v2.rs @@ -21,7 +21,8 @@ use crate::causal_history::MissingMessage; use crate::conversation::GroupConvo; use crate::conversation::group_v1::MlsContext; use crate::conversation::{GroupV1Convo, Id, IdentityProvider}; -use crate::inbound::{FrameOutcome, InboundResult, NewConversation}; +use crate::response::InboxResponse; +use crate::response::NewConversation; use crate::types::AccountId; use crate::utils::{blake2b_hex, hash_size}; pub struct PqMlsContext { @@ -155,7 +156,7 @@ where ) } - pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result { + pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result { let inbox_frame = InboxV2Frame::decode(payload_bytes)?; let Some(payload) = inbox_frame.payload else { @@ -182,7 +183,10 @@ where Ok(()) } - fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result { + fn handle_heavy_invite( + &self, + invite: GroupV1HeavyInvite, + ) -> Result { let (msg_in, _rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?; let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else { @@ -202,12 +206,12 @@ where )?; let convo_id = Arc::from(convo.id()); self.persist_convo(convo)?; - Ok(InboundResult { - new_conversation: Some(NewConversation { + Ok(InboxResponse { + new_conversation: NewConversation { convo_id, kind: ConversationKind::GroupV1, - }), - frame: FrameOutcome::default(), + }, + frame: None, }) } diff --git a/core/conversations/src/lib.rs b/core/conversations/src/lib.rs index 04f0670..0559bf1 100644 --- a/core/conversations/src/lib.rs +++ b/core/conversations/src/lib.rs @@ -4,10 +4,10 @@ mod context; mod conversation; mod crypto; mod errors; -mod inbound; mod inbox; mod inbox_v2; mod proto; +mod response; mod service_traits; mod types; mod utils; @@ -19,7 +19,7 @@ pub use chat_sqlite::StorageConfig; pub use context::{Context, ConversationId, ConversationIdOwned, Introduction}; pub use conversation::GroupConvo; pub use errors::ChatError; -pub use inbound::{FrameOutcome, InboundResult, Message, NewConversation}; +pub use response::{FrameOutcome, InboxResponse, Message, NewConversation, ProcessResponse}; pub use service_traits::{DeliveryService, IdentityProvider, RegistrationService}; pub use storage::ConversationKind; pub use types::{AccountId, AddressedEnvelope}; diff --git a/core/conversations/src/response.rs b/core/conversations/src/response.rs new file mode 100644 index 0000000..a6d7de6 --- /dev/null +++ b/core/conversations/src/response.rs @@ -0,0 +1,62 @@ +//! Outcome of processing a single inbound payload. +//! +//! [`ProcessResponse`] is the tagged sum of what a payload produced, one +//! variant per dispatch destination (inbox, existing conversation, unknown). +//! [`FrameOutcome`] captures what processing one frame within a conversation +//! produces: today, a decrypted message. As protocol features land, new +//! per-conversation observations become additive fields on `FrameOutcome`. +//! +//! [`InboxResponse::frame`] is `None` when the inbox produced a new conversation +//! without an initial message (V2 invite); `Some` when an initial message was +//! delivered alongside the invite (V1). + +use storage::ConversationKind; + +use crate::causal_history::MissingMessage; +use crate::context::ConversationIdOwned; + +/// Observations a conversation produces from processing one frame. +#[derive(Debug, Clone, Default)] +pub struct FrameOutcome { + /// User content decrypted from this frame, in protocol order. + pub message: Option, + /// Causal-history gaps detected from this frame's piggybacked history. + /// Empty for protocols without causal history (e.g. PrivateV1) and for + /// frames that close no gaps. + pub missing_messages: Vec, +} + +#[derive(Debug, Clone)] +pub enum ProcessResponse { + InboxResponse(InboxResponse), + ConvoResponse(ConvoResponse), + Unknown, +} + +#[derive(Debug, Clone)] +pub struct InboxResponse { + /// A new conversation appeared from this payload, if any. + pub new_conversation: NewConversation, + /// Observations from the frame inside this payload. + pub frame: Option, +} + +#[derive(Debug, Clone, Default)] +pub struct ConvoResponse { + /// Observations from the frame inside this payload. + pub frame: FrameOutcome, +} + +/// A conversation newly observed from the peer side. +#[derive(Debug, Clone)] +pub struct NewConversation { + pub convo_id: ConversationIdOwned, + pub kind: ConversationKind, +} + +/// User content decrypted from an inbound payload. +#[derive(Debug, Clone)] +pub struct Message { + pub convo_id: ConversationIdOwned, + pub content: Vec, +} diff --git a/core/integration_tests_core/tests/mls_integration.rs b/core/integration_tests_core/tests/mls_integration.rs index ca69370..1b4126a 100644 --- a/core/integration_tests_core/tests/mls_integration.rs +++ b/core/integration_tests_core/tests/mls_integration.rs @@ -2,10 +2,10 @@ use std::ops::{Deref, DerefMut}; use components::{EphemeralRegistry, LocalBroadcaster, MemStore}; use libchat::{ - Context, ConversationKind, GroupConvo, InboundResult, Message, NewConversation, hex_trunc, + Context, ConversationKind, GroupConvo, Message, NewConversation, ProcessResponse, hex_trunc, }; -type ResultCallback = Box; +type ResultCallback = Box; // Simple client Functionality for testing struct Client { @@ -18,7 +18,7 @@ struct Client { impl Client { fn init( ctx: Context, - cb: Option, + cb: Option, ) -> Self { Client { inner: ctx, @@ -39,10 +39,18 @@ impl Client { if let Some(cb) = &self.on_result { cb(&result); } - if let Some(nc) = result.new_conversation { - self.new_conversations.push(nc); + match result { + ProcessResponse::InboxResponse(r) => { + self.new_conversations.push(r.new_conversation); + if let Some(frame) = r.frame { + self.received_messages.extend(frame.message); + } + } + ProcessResponse::ConvoResponse(r) => { + self.received_messages.extend(r.frame.message); + } + ProcessResponse::Unknown => {} } - self.received_messages.extend(result.frame.messages); } } @@ -69,22 +77,36 @@ impl DerefMut for Client { } } +fn print_new_conversation(prefix: &str, nc: &NewConversation) { + let cid = hex_trunc(nc.convo_id.as_bytes()); + println!( + "{prefix} ({cid:?}) [conversation started: {:?}]", + nc.kind + ); +} + +fn print_message(prefix: &str, message: Option<&Message>) { + if let Some(msg) = message { + let cid = hex_trunc(msg.convo_id.as_bytes()); + let text = String::from_utf8_lossy(&msg.content); + println!("{prefix} ({cid:?}) {text}"); + } +} + // Higher order function to handle printing fn pretty_print(prefix: impl Into) -> ResultCallback { let prefix = prefix.into(); - Box::new(move |result: &InboundResult| { - if let Some(nc) = &result.new_conversation { - let cid = hex_trunc(nc.convo_id.as_bytes()); - println!( - "{prefix} ({cid:?}) [conversation started: {:?}]", - nc.kind - ); + Box::new(move |result: &ProcessResponse| match result { + ProcessResponse::InboxResponse(r) => { + print_new_conversation(&prefix, &r.new_conversation); + if let Some(frame) = &r.frame { + print_message(&prefix, frame.message.as_ref()); + } } - for msg in &result.frame.messages { - let cid = hex_trunc(msg.convo_id.as_bytes()); - let text = String::from_utf8_lossy(&msg.content); - println!("{prefix} ({cid:?}) {text}"); + ProcessResponse::ConvoResponse(r) => { + print_message(&prefix, r.frame.message.as_ref()); } + ProcessResponse::Unknown => {} }) } diff --git a/core/integration_tests_core/tests/private_integration.rs b/core/integration_tests_core/tests/private_integration.rs index b29117d..634efd1 100644 --- a/core/integration_tests_core/tests/private_integration.rs +++ b/core/integration_tests_core/tests/private_integration.rs @@ -1,10 +1,24 @@ use chat_sqlite::{ChatStorage, StorageConfig}; -use libchat::{Context, ConversationKind, Introduction}; +use libchat::{Context, ConversationKind, Introduction, ProcessResponse}; use storage::{ConversationStore, IdentityStore}; use tempfile::tempdir; use components::{EphemeralRegistry, LocalBroadcaster}; +fn expect_convo_response(result: ProcessResponse) -> libchat::FrameOutcome { + match result { + ProcessResponse::ConvoResponse(r) => r.frame, + other => panic!("expected ConvoResponse, got {other:?}"), + } +} + +fn expect_inbox_response(result: ProcessResponse) -> libchat::InboxResponse { + match result { + ProcessResponse::InboxResponse(r) => r, + other => panic!("expected InboxResponse, got {other:?}"), + } +} + fn send_and_verify( sender: &mut Context, receiver: &mut Context, @@ -14,13 +28,12 @@ fn send_and_verify( let payloads = sender.send_content(convo_id, content).unwrap(); let payload = payloads.first().unwrap(); let result = receiver.handle_payload(&payload.data).unwrap(); - assert!(result.new_conversation.is_none()); - assert_eq!( - result.frame.messages.len(), - 1, - "steady-state send should yield one message" - ); - assert_eq!(content, result.frame.messages[0].content.as_slice()); + let frame = expect_convo_response(result); + let msg = frame + .message + .as_ref() + .expect("steady-state send should yield one message"); + assert_eq!(content, msg.content.as_slice()); } #[test] @@ -42,19 +55,16 @@ fn ctx_integration() { // Raya receives the invite + initial message let payload = payloads.first().unwrap(); - let initial = raya.handle_payload(&payload.data).unwrap(); - let new_convo = initial - .new_conversation - .as_ref() - .expect("invite must create a conversation"); + let initial = expect_inbox_response(raya.handle_payload(&payload.data).unwrap()); + let new_convo = &initial.new_conversation; assert!(matches!(new_convo.kind, ConversationKind::PrivateV1)); - assert_eq!( - initial.frame.messages.len(), - 1, - "invite must include initial message" - ); - assert_eq!(content, initial.frame.messages[0].content); - assert_eq!(new_convo.convo_id, initial.frame.messages[0].convo_id); + let initial_msg = initial + .frame + .as_ref() + .and_then(|f| f.message.as_ref()) + .expect("invite must include initial message"); + assert_eq!(content, initial_msg.content); + assert_eq!(new_convo.convo_id, initial_msg.convo_id); let raya_convo_id = new_convo.convo_id.clone(); // Exchange messages back and forth @@ -115,11 +125,8 @@ fn conversation_metadata_persistence() { let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap(); let payload = payloads.first().unwrap(); - let result = alice.handle_payload(&payload.data).unwrap(); - let new_convo = result - .new_conversation - .as_ref() - .expect("invite must create a conversation"); + let result = expect_inbox_response(alice.handle_payload(&payload.data).unwrap()); + let new_convo = &result.new_conversation; assert!(matches!(new_convo.kind, ConversationKind::PrivateV1)); let convos = alice.store().load_conversations().unwrap(); @@ -140,23 +147,18 @@ fn conversation_full_flow() { let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap(); let payload = payloads.first().unwrap(); - let result = alice.handle_payload(&payload.data).unwrap(); - let alice_convo_id = result - .new_conversation - .as_ref() - .expect("invite must create a conversation") - .convo_id - .clone(); + let result = expect_inbox_response(alice.handle_payload(&payload.data).unwrap()); + let alice_convo_id = result.new_conversation.convo_id.clone(); let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap(); let payload = payloads.first().unwrap(); - let result = bob.handle_payload(&payload.data).unwrap(); - assert_eq!(result.frame.messages[0].content, b"reply 1"); + let frame = expect_convo_response(bob.handle_payload(&payload.data).unwrap()); + assert_eq!(frame.message.unwrap().content, b"reply 1"); let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap(); let payload = payloads.first().unwrap(); - let result = alice.handle_payload(&payload.data).unwrap(); - assert_eq!(result.frame.messages[0].content, b"reply 2"); + let frame = expect_convo_response(alice.handle_payload(&payload.data).unwrap()); + assert_eq!(frame.message.unwrap().content, b"reply 2"); // Verify conversation list let convo_ids = alice.list_conversations().unwrap(); @@ -165,12 +167,12 @@ fn conversation_full_flow() { // Continue exchanging messages let payloads = bob.send_content(&bob_convo_id, b"more messages").unwrap(); let payload = payloads.first().unwrap(); - let result = alice.handle_payload(&payload.data).expect("should decrypt"); - assert_eq!(result.frame.messages[0].content, b"more messages"); + let frame = expect_convo_response(alice.handle_payload(&payload.data).expect("should decrypt")); + assert_eq!(frame.message.unwrap().content, b"more messages"); // Alice can also send back let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap(); let payload = payloads.first().unwrap(); - let result = bob.handle_payload(&payload.data).unwrap(); - assert_eq!(result.frame.messages[0].content, b"alice reply"); + let frame = expect_convo_response(bob.handle_payload(&payload.data).unwrap()); + assert_eq!(frame.message.unwrap().content, b"alice reply"); } diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 5c72876..7c0f923 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -1,6 +1,6 @@ use libchat::{ AddressedEnvelope, ChatError, ChatStorage, Context, ConversationIdOwned, ConversationKind, - DeliveryService, InboundResult, Introduction, StorageConfig, + DeliveryService, Introduction, ProcessResponse, StorageConfig, }; use components::EphemeralRegistry; @@ -94,28 +94,48 @@ impl ChatClient { } } -/// Walk an [`InboundResult`] in causal order and emit one `Event` per -/// observation. The structural ordering of `InboundResult` (new conversation -/// before frame contents) determines the order of events here. -fn events_from_inbound(result: InboundResult) -> Vec { - let mut events = Vec::with_capacity( - usize::from(result.new_conversation.is_some()) + result.frame.messages.len(), - ); - if let Some(nc) = result.new_conversation - && let Some(class) = class_from_kind(&nc.kind) - { +/// Walk a [`ProcessResponse`] in causal order and emit one `Event` per +/// observation. New-conversation events are emitted before any frame +/// contents from the same payload. +fn events_from_inbound(result: ProcessResponse) -> Vec { + let mut events = Vec::new(); + match result { + ProcessResponse::InboxResponse(r) => { + push_new_conversation(&mut events, r.new_conversation); + if let Some(frame) = r.frame { + push_frame(&mut events, frame); + } + } + ProcessResponse::ConvoResponse(r) => { + push_frame(&mut events, r.frame); + } + ProcessResponse::Unknown => {} + } + events +} + +fn push_new_conversation(events: &mut Vec, nc: libchat::NewConversation) { + if let Some(class) = class_from_kind(&nc.kind) { events.push(Event::ConversationStarted { convo_id: nc.convo_id, class, }); } - for msg in result.frame.messages { +} + +fn push_frame(events: &mut Vec, frame: libchat::FrameOutcome) { + if let Some(msg) = frame.message { events.push(Event::MessageReceived { convo_id: msg.convo_id, content: msg.content, }); } - events + for missing in frame.missing_messages { + events.push(Event::MessageMissing { + convo_id: std::sync::Arc::from(missing.conversation_id.as_str()), + message_id: missing.message_id, + }); + } } /// Map a core [`ConversationKind`] to the coarse app-facing diff --git a/crates/client/src/event.rs b/crates/client/src/event.rs index 2353b3e..4139528 100644 --- a/crates/client/src/event.rs +++ b/crates/client/src/event.rs @@ -22,6 +22,12 @@ pub enum Event { convo_id: ConversationIdOwned, content: Vec, }, + /// A causal-history gap surfaced: a message referenced by another + /// delivered message but never seen locally. + MessageMissing { + convo_id: ConversationIdOwned, + message_id: String, + }, } /// Coarse classification of a conversation, intended as a UI/UX hint.