mirror of
https://github.com/logos-messaging/libchat.git
synced 2026-06-27 19:49:31 +00:00
refactor: process response with enum
This commit is contained in:
parent
addcde03eb
commit
572c5c9cd2
@ -6,11 +6,11 @@ use crate::account::LogosAccount;
|
||||
use crate::causal_history::MissingMessage;
|
||||
use crate::conversation::{Convo, GroupConvo};
|
||||
|
||||
use crate::response::{ConvoResponse, ProcessResponse};
|
||||
use crate::{DeliveryService, RegistrationService};
|
||||
use crate::{
|
||||
conversation::{Id, PrivateV1Convo},
|
||||
errors::ChatError,
|
||||
inbound::InboundResult,
|
||||
inbox::Inbox,
|
||||
inbox_v2::InboxV2,
|
||||
proto::{EncryptedPayload, EnvelopeV1, Message},
|
||||
@ -226,7 +226,7 @@ where
|
||||
}
|
||||
|
||||
// Decode bytes and send to protocol for processing.
|
||||
pub fn handle_payload(&mut self, payload: &[u8]) -> Result<InboundResult, ChatError> {
|
||||
pub fn handle_payload(&mut self, payload: &[u8]) -> Result<ProcessResponse, ChatError> {
|
||||
let env = EnvelopeV1::decode(payload)?;
|
||||
|
||||
// TODO: Impl Conversation hinting
|
||||
@ -238,24 +238,30 @@ where
|
||||
c if self.store.borrow().has_conversation(&c)? => {
|
||||
self.dispatch_to_convo(&c, &env.payload)
|
||||
}
|
||||
_ => Ok(InboundResult::default()),
|
||||
_ => Ok(ProcessResponse::Unknown),
|
||||
}
|
||||
}
|
||||
|
||||
// Dispatch encrypted payload to Inbox. The Inbox persists the newly
|
||||
// created conversation and consumes the ephemeral key internally.
|
||||
fn dispatch_to_inbox(&mut self, enc_payload_bytes: &[u8]) -> Result<InboundResult, ChatError> {
|
||||
fn dispatch_to_inbox(
|
||||
&mut self,
|
||||
enc_payload_bytes: &[u8],
|
||||
) -> Result<ProcessResponse, ChatError> {
|
||||
// EncryptedPayloads are not used by GroupConvos at this time, else this can be performed in `handle_payload`
|
||||
// TODO: (P1) reconcile envelope parsing between Covno and GroupConvo
|
||||
let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?;
|
||||
let public_key_hex = Inbox::<CS>::extract_ephemeral_key_hex(&enc_payload)?;
|
||||
self.inbox
|
||||
.handle_frame(enc_payload, &public_key_hex, Rc::clone(&self.store))
|
||||
.map(ProcessResponse::InboxResponse)
|
||||
}
|
||||
|
||||
// Dispatch encrypted payload to the post-quantum inbox.
|
||||
fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result<InboundResult, ChatError> {
|
||||
self.pq_inbox.handle_frame(payload)
|
||||
fn dispatch_to_inbox2(&mut self, payload: &[u8]) -> Result<ProcessResponse, ChatError> {
|
||||
self.pq_inbox
|
||||
.handle_frame(payload)
|
||||
.map(ProcessResponse::InboxResponse)
|
||||
}
|
||||
|
||||
// Dispatch encrypted payload to its corresponding conversation
|
||||
@ -263,14 +269,11 @@ where
|
||||
&mut self,
|
||||
convo_id: ConversationId,
|
||||
enc_payload_bytes: &[u8],
|
||||
) -> Result<InboundResult, ChatError> {
|
||||
) -> Result<ProcessResponse, ChatError> {
|
||||
let enc_payload = EncryptedPayload::decode(enc_payload_bytes)?;
|
||||
let mut convo = self.load_convo(convo_id)?;
|
||||
let frame = convo.handle_frame(enc_payload)?;
|
||||
Ok(InboundResult {
|
||||
new_conversation: None,
|
||||
frame,
|
||||
})
|
||||
Ok(ProcessResponse::ConvoResponse(ConvoResponse { frame }))
|
||||
}
|
||||
|
||||
pub fn create_intro_bundle(&mut self) -> Result<Vec<u8>, ChatError> {
|
||||
|
||||
@ -3,7 +3,7 @@ mod privatev1;
|
||||
|
||||
use crate::{
|
||||
DeliveryService,
|
||||
inbound::FrameOutcome,
|
||||
response::FrameOutcome,
|
||||
service_traits::KeyPackageProvider,
|
||||
types::{AccountId, AddressedEncryptedPayload},
|
||||
};
|
||||
|
||||
@ -22,7 +22,7 @@ use crate::types::AccountId;
|
||||
use crate::{
|
||||
DeliveryService,
|
||||
conversation::{ChatError, ConversationId, Convo, GroupConvo, Id},
|
||||
inbound::{FrameOutcome, Message},
|
||||
response::{FrameOutcome, Message},
|
||||
service_traits::KeyPackageProvider,
|
||||
types::AddressedEncryptedPayload,
|
||||
};
|
||||
@ -339,27 +339,30 @@ where
|
||||
.process_message(provider, protocol_message)
|
||||
.map_err(ChatError::generic)?;
|
||||
|
||||
let messages = match processed.into_content() {
|
||||
match processed.into_content() {
|
||||
ProcessedMessageContent::ApplicationMessage(msg) => {
|
||||
let reliable = ReliablePayload::decode(msg.into_bytes().as_slice())?;
|
||||
self.causal.on_receive(&self.convo_id, &reliable);
|
||||
vec![Message {
|
||||
let missing_messages = self.causal.on_receive(&self.convo_id, &reliable);
|
||||
let message = Some(Message {
|
||||
convo_id: Arc::from(self.id()),
|
||||
content: reliable.content.to_vec(),
|
||||
}]
|
||||
});
|
||||
Ok(FrameOutcome {
|
||||
message,
|
||||
missing_messages,
|
||||
})
|
||||
}
|
||||
ProcessedMessageContent::StagedCommitMessage(commit) => {
|
||||
self.mls_group
|
||||
.merge_staged_commit(provider, *commit)
|
||||
.map_err(ChatError::generic)?;
|
||||
vec![]
|
||||
Ok(FrameOutcome::default())
|
||||
}
|
||||
_ => {
|
||||
// TODO: (P2) Log unknown message type
|
||||
vec![]
|
||||
Ok(FrameOutcome::default())
|
||||
}
|
||||
};
|
||||
Ok(FrameOutcome { messages })
|
||||
}
|
||||
}
|
||||
|
||||
fn remote_id(&self) -> String {
|
||||
|
||||
@ -16,8 +16,8 @@ use crate::{
|
||||
context::ConversationIdOwned,
|
||||
conversation::{ChatError, ConversationId, Convo, Id},
|
||||
errors::EncryptionError,
|
||||
inbound::{FrameOutcome, Message},
|
||||
proto,
|
||||
response::{FrameOutcome, Message},
|
||||
types::AddressedEncryptedPayload,
|
||||
utils::timestamp_millis,
|
||||
};
|
||||
@ -201,11 +201,11 @@ impl<S: ConversationStore + RatchetStore> PrivateV1Convo<S> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_content(&self, bytes: Bytes) -> Vec<Message> {
|
||||
vec![Message {
|
||||
fn handle_content(&self, bytes: Bytes) -> Message {
|
||||
Message {
|
||||
convo_id: Arc::from(self.id()),
|
||||
content: bytes.into(),
|
||||
}]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -252,11 +252,14 @@ impl<S: ConversationStore + RatchetStore> Convo for PrivateV1Convo<S> {
|
||||
|
||||
self.save_ratchet_state(&mut *self.store.borrow_mut())?;
|
||||
|
||||
let messages = match frame_type {
|
||||
FrameType::Content(bytes) => self.handle_content(bytes),
|
||||
FrameType::Placeholder(_) => vec![],
|
||||
let message = match frame_type {
|
||||
FrameType::Content(bytes) => Some(self.handle_content(bytes)),
|
||||
FrameType::Placeholder(_) => None,
|
||||
};
|
||||
Ok(FrameOutcome { messages })
|
||||
Ok(FrameOutcome {
|
||||
message,
|
||||
missing_messages: Vec::new(),
|
||||
})
|
||||
}
|
||||
|
||||
fn remote_id(&self) -> String {
|
||||
|
||||
@ -1,57 +0,0 @@
|
||||
//! Outcome of processing a single inbound payload.
|
||||
//!
|
||||
//! [`InboundResult`] composes two layers:
|
||||
//! - [`FrameOutcome`] captures what processing one frame within a conversation
|
||||
//! produces: today, decrypted messages. As protocol features land, new
|
||||
//! per-conversation observations (e.g. group membership changes) become
|
||||
//! additive fields on `FrameOutcome`.
|
||||
//! - [`InboundResult`] wraps a `FrameOutcome` and adds the payload-level
|
||||
//! observations a single frame cannot produce — today, the appearance of
|
||||
//! a new conversation from the peer side.
|
||||
|
||||
use storage::ConversationKind;
|
||||
|
||||
use crate::context::ConversationIdOwned;
|
||||
|
||||
/// Observations a conversation produces from processing one frame.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct FrameOutcome {
|
||||
/// User content decrypted from this frame, in protocol order.
|
||||
pub messages: Vec<Message>,
|
||||
}
|
||||
|
||||
impl FrameOutcome {
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.messages.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// Everything one inbound payload produced.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct InboundResult {
|
||||
/// A new conversation appeared from this payload, if any.
|
||||
pub new_conversation: Option<NewConversation>,
|
||||
/// Observations from the frame inside this payload.
|
||||
pub frame: FrameOutcome,
|
||||
}
|
||||
|
||||
impl InboundResult {
|
||||
/// True when the payload produced no observable outcome.
|
||||
pub fn is_empty(&self) -> bool {
|
||||
self.new_conversation.is_none() && self.frame.is_empty()
|
||||
}
|
||||
}
|
||||
|
||||
/// A conversation newly observed from the peer side.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NewConversation {
|
||||
pub convo_id: ConversationIdOwned,
|
||||
pub kind: ConversationKind,
|
||||
}
|
||||
|
||||
/// User content decrypted from an inbound payload.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Message {
|
||||
pub convo_id: ConversationIdOwned,
|
||||
pub content: Vec<u8>,
|
||||
}
|
||||
@ -13,9 +13,9 @@ use crypto::{PrekeyBundle, SymmetricKey32};
|
||||
use crate::context::Introduction;
|
||||
use crate::conversation::{ChatError, ConversationId, Convo, Id, PrivateV1Convo};
|
||||
use crate::crypto::{CopyBytes, PrivateKey, PublicKey};
|
||||
use crate::inbound::{InboundResult, NewConversation};
|
||||
use crate::inbox::handshake::InboxHandshake;
|
||||
use crate::proto;
|
||||
use crate::response::{InboxResponse, NewConversation};
|
||||
use crate::types::AddressedEncryptedPayload;
|
||||
use crypto::Identity;
|
||||
|
||||
@ -124,14 +124,14 @@ impl<S: EphemeralKeyStore> Inbox<S> {
|
||||
/// Handles an incoming inbox frame. The caller must provide the ephemeral
|
||||
/// private key hex looked up from storage. Persists the created
|
||||
/// conversation and consumes the ephemeral key. Returns the
|
||||
/// [`InboundResult`] describing what was observed — for a successful
|
||||
/// [`InboxResponse`] describing what was observed — for a successful
|
||||
/// invite, a `new_conversation` and one initial `message`.
|
||||
pub fn handle_frame<PS: ConversationStore + RatchetStore>(
|
||||
&self,
|
||||
enc_payload: EncryptedPayload,
|
||||
public_key_hex: &str,
|
||||
private_store: Rc<RefCell<PS>>,
|
||||
) -> Result<InboundResult, ChatError> {
|
||||
) -> Result<InboxResponse, ChatError> {
|
||||
let ephemeral_key = self
|
||||
.store
|
||||
.borrow()
|
||||
@ -158,7 +158,7 @@ impl<S: EphemeralKeyStore> Inbox<S> {
|
||||
};
|
||||
|
||||
let frame = convo.handle_frame(enc_payload)?;
|
||||
if frame.messages.is_empty() {
|
||||
if frame.message.is_none() {
|
||||
return Err(ChatError::Protocol(
|
||||
"expected initial message in invite".into(),
|
||||
));
|
||||
@ -170,9 +170,9 @@ impl<S: EphemeralKeyStore> Inbox<S> {
|
||||
};
|
||||
convo.persist()?;
|
||||
|
||||
InboundResult {
|
||||
new_conversation: Some(new_conversation),
|
||||
frame,
|
||||
InboxResponse {
|
||||
new_conversation,
|
||||
frame: Some(frame),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
@ -21,7 +21,8 @@ use crate::causal_history::MissingMessage;
|
||||
use crate::conversation::GroupConvo;
|
||||
use crate::conversation::group_v1::MlsContext;
|
||||
use crate::conversation::{GroupV1Convo, Id, IdentityProvider};
|
||||
use crate::inbound::{FrameOutcome, InboundResult, NewConversation};
|
||||
use crate::response::InboxResponse;
|
||||
use crate::response::NewConversation;
|
||||
use crate::types::AccountId;
|
||||
use crate::utils::{blake2b_hex, hash_size};
|
||||
pub struct PqMlsContext {
|
||||
@ -155,7 +156,7 @@ where
|
||||
)
|
||||
}
|
||||
|
||||
pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<InboundResult, ChatError> {
|
||||
pub fn handle_frame(&self, payload_bytes: &[u8]) -> Result<InboxResponse, ChatError> {
|
||||
let inbox_frame = InboxV2Frame::decode(payload_bytes)?;
|
||||
|
||||
let Some(payload) = inbox_frame.payload else {
|
||||
@ -182,7 +183,10 @@ where
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn handle_heavy_invite(&self, invite: GroupV1HeavyInvite) -> Result<InboundResult, ChatError> {
|
||||
fn handle_heavy_invite(
|
||||
&self,
|
||||
invite: GroupV1HeavyInvite,
|
||||
) -> Result<InboxResponse, ChatError> {
|
||||
let (msg_in, _rest) = MlsMessageIn::tls_deserialize_bytes(invite.welcome_bytes.as_slice())?;
|
||||
|
||||
let MlsMessageBodyIn::Welcome(welcome) = msg_in.extract() else {
|
||||
@ -202,12 +206,12 @@ where
|
||||
)?;
|
||||
let convo_id = Arc::from(convo.id());
|
||||
self.persist_convo(convo)?;
|
||||
Ok(InboundResult {
|
||||
new_conversation: Some(NewConversation {
|
||||
Ok(InboxResponse {
|
||||
new_conversation: NewConversation {
|
||||
convo_id,
|
||||
kind: ConversationKind::GroupV1,
|
||||
}),
|
||||
frame: FrameOutcome::default(),
|
||||
},
|
||||
frame: None,
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -4,10 +4,10 @@ mod context;
|
||||
mod conversation;
|
||||
mod crypto;
|
||||
mod errors;
|
||||
mod inbound;
|
||||
mod inbox;
|
||||
mod inbox_v2;
|
||||
mod proto;
|
||||
mod response;
|
||||
mod service_traits;
|
||||
mod types;
|
||||
mod utils;
|
||||
@ -19,7 +19,7 @@ pub use chat_sqlite::StorageConfig;
|
||||
pub use context::{Context, ConversationId, ConversationIdOwned, Introduction};
|
||||
pub use conversation::GroupConvo;
|
||||
pub use errors::ChatError;
|
||||
pub use inbound::{FrameOutcome, InboundResult, Message, NewConversation};
|
||||
pub use response::{FrameOutcome, InboxResponse, Message, NewConversation, ProcessResponse};
|
||||
pub use service_traits::{DeliveryService, IdentityProvider, RegistrationService};
|
||||
pub use storage::ConversationKind;
|
||||
pub use types::{AccountId, AddressedEnvelope};
|
||||
|
||||
62
core/conversations/src/response.rs
Normal file
62
core/conversations/src/response.rs
Normal file
@ -0,0 +1,62 @@
|
||||
//! Outcome of processing a single inbound payload.
|
||||
//!
|
||||
//! [`ProcessResponse`] is the tagged sum of what a payload produced, one
|
||||
//! variant per dispatch destination (inbox, existing conversation, unknown).
|
||||
//! [`FrameOutcome`] captures what processing one frame within a conversation
|
||||
//! produces: today, a decrypted message. As protocol features land, new
|
||||
//! per-conversation observations become additive fields on `FrameOutcome`.
|
||||
//!
|
||||
//! [`InboxResponse::frame`] is `None` when the inbox produced a new conversation
|
||||
//! without an initial message (V2 invite); `Some` when an initial message was
|
||||
//! delivered alongside the invite (V1).
|
||||
|
||||
use storage::ConversationKind;
|
||||
|
||||
use crate::causal_history::MissingMessage;
|
||||
use crate::context::ConversationIdOwned;
|
||||
|
||||
/// Observations a conversation produces from processing one frame.
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct FrameOutcome {
|
||||
/// User content decrypted from this frame, in protocol order.
|
||||
pub message: Option<Message>,
|
||||
/// Causal-history gaps detected from this frame's piggybacked history.
|
||||
/// Empty for protocols without causal history (e.g. PrivateV1) and for
|
||||
/// frames that close no gaps.
|
||||
pub missing_messages: Vec<MissingMessage>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum ProcessResponse {
|
||||
InboxResponse(InboxResponse),
|
||||
ConvoResponse(ConvoResponse),
|
||||
Unknown,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct InboxResponse {
|
||||
/// A new conversation appeared from this payload, if any.
|
||||
pub new_conversation: NewConversation,
|
||||
/// Observations from the frame inside this payload.
|
||||
pub frame: Option<FrameOutcome>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Default)]
|
||||
pub struct ConvoResponse {
|
||||
/// Observations from the frame inside this payload.
|
||||
pub frame: FrameOutcome,
|
||||
}
|
||||
|
||||
/// A conversation newly observed from the peer side.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NewConversation {
|
||||
pub convo_id: ConversationIdOwned,
|
||||
pub kind: ConversationKind,
|
||||
}
|
||||
|
||||
/// User content decrypted from an inbound payload.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct Message {
|
||||
pub convo_id: ConversationIdOwned,
|
||||
pub content: Vec<u8>,
|
||||
}
|
||||
@ -2,10 +2,10 @@ use std::ops::{Deref, DerefMut};
|
||||
|
||||
use components::{EphemeralRegistry, LocalBroadcaster, MemStore};
|
||||
use libchat::{
|
||||
Context, ConversationKind, GroupConvo, InboundResult, Message, NewConversation, hex_trunc,
|
||||
Context, ConversationKind, GroupConvo, Message, NewConversation, ProcessResponse, hex_trunc,
|
||||
};
|
||||
|
||||
type ResultCallback = Box<dyn Fn(&InboundResult)>;
|
||||
type ResultCallback = Box<dyn Fn(&ProcessResponse)>;
|
||||
|
||||
// Simple client Functionality for testing
|
||||
struct Client {
|
||||
@ -18,7 +18,7 @@ struct Client {
|
||||
impl Client {
|
||||
fn init(
|
||||
ctx: Context<LocalBroadcaster, EphemeralRegistry, MemStore>,
|
||||
cb: Option<impl Fn(&InboundResult) + 'static>,
|
||||
cb: Option<impl Fn(&ProcessResponse) + 'static>,
|
||||
) -> Self {
|
||||
Client {
|
||||
inner: ctx,
|
||||
@ -39,10 +39,18 @@ impl Client {
|
||||
if let Some(cb) = &self.on_result {
|
||||
cb(&result);
|
||||
}
|
||||
if let Some(nc) = result.new_conversation {
|
||||
self.new_conversations.push(nc);
|
||||
match result {
|
||||
ProcessResponse::InboxResponse(r) => {
|
||||
self.new_conversations.push(r.new_conversation);
|
||||
if let Some(frame) = r.frame {
|
||||
self.received_messages.extend(frame.message);
|
||||
}
|
||||
}
|
||||
ProcessResponse::ConvoResponse(r) => {
|
||||
self.received_messages.extend(r.frame.message);
|
||||
}
|
||||
ProcessResponse::Unknown => {}
|
||||
}
|
||||
self.received_messages.extend(result.frame.messages);
|
||||
}
|
||||
}
|
||||
|
||||
@ -69,22 +77,36 @@ impl DerefMut for Client {
|
||||
}
|
||||
}
|
||||
|
||||
fn print_new_conversation(prefix: &str, nc: &NewConversation) {
|
||||
let cid = hex_trunc(nc.convo_id.as_bytes());
|
||||
println!(
|
||||
"{prefix} ({cid:?}) [conversation started: {:?}]",
|
||||
nc.kind
|
||||
);
|
||||
}
|
||||
|
||||
fn print_message(prefix: &str, message: Option<&Message>) {
|
||||
if let Some(msg) = message {
|
||||
let cid = hex_trunc(msg.convo_id.as_bytes());
|
||||
let text = String::from_utf8_lossy(&msg.content);
|
||||
println!("{prefix} ({cid:?}) {text}");
|
||||
}
|
||||
}
|
||||
|
||||
// Higher order function to handle printing
|
||||
fn pretty_print(prefix: impl Into<String>) -> ResultCallback {
|
||||
let prefix = prefix.into();
|
||||
Box::new(move |result: &InboundResult| {
|
||||
if let Some(nc) = &result.new_conversation {
|
||||
let cid = hex_trunc(nc.convo_id.as_bytes());
|
||||
println!(
|
||||
"{prefix} ({cid:?}) [conversation started: {:?}]",
|
||||
nc.kind
|
||||
);
|
||||
Box::new(move |result: &ProcessResponse| match result {
|
||||
ProcessResponse::InboxResponse(r) => {
|
||||
print_new_conversation(&prefix, &r.new_conversation);
|
||||
if let Some(frame) = &r.frame {
|
||||
print_message(&prefix, frame.message.as_ref());
|
||||
}
|
||||
}
|
||||
for msg in &result.frame.messages {
|
||||
let cid = hex_trunc(msg.convo_id.as_bytes());
|
||||
let text = String::from_utf8_lossy(&msg.content);
|
||||
println!("{prefix} ({cid:?}) {text}");
|
||||
ProcessResponse::ConvoResponse(r) => {
|
||||
print_message(&prefix, r.frame.message.as_ref());
|
||||
}
|
||||
ProcessResponse::Unknown => {}
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
@ -1,10 +1,24 @@
|
||||
use chat_sqlite::{ChatStorage, StorageConfig};
|
||||
use libchat::{Context, ConversationKind, Introduction};
|
||||
use libchat::{Context, ConversationKind, Introduction, ProcessResponse};
|
||||
use storage::{ConversationStore, IdentityStore};
|
||||
use tempfile::tempdir;
|
||||
|
||||
use components::{EphemeralRegistry, LocalBroadcaster};
|
||||
|
||||
fn expect_convo_response(result: ProcessResponse) -> libchat::FrameOutcome {
|
||||
match result {
|
||||
ProcessResponse::ConvoResponse(r) => r.frame,
|
||||
other => panic!("expected ConvoResponse, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn expect_inbox_response(result: ProcessResponse) -> libchat::InboxResponse {
|
||||
match result {
|
||||
ProcessResponse::InboxResponse(r) => r,
|
||||
other => panic!("expected InboxResponse, got {other:?}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn send_and_verify(
|
||||
sender: &mut Context<LocalBroadcaster, EphemeralRegistry, ChatStorage>,
|
||||
receiver: &mut Context<LocalBroadcaster, EphemeralRegistry, ChatStorage>,
|
||||
@ -14,13 +28,12 @@ fn send_and_verify(
|
||||
let payloads = sender.send_content(convo_id, content).unwrap();
|
||||
let payload = payloads.first().unwrap();
|
||||
let result = receiver.handle_payload(&payload.data).unwrap();
|
||||
assert!(result.new_conversation.is_none());
|
||||
assert_eq!(
|
||||
result.frame.messages.len(),
|
||||
1,
|
||||
"steady-state send should yield one message"
|
||||
);
|
||||
assert_eq!(content, result.frame.messages[0].content.as_slice());
|
||||
let frame = expect_convo_response(result);
|
||||
let msg = frame
|
||||
.message
|
||||
.as_ref()
|
||||
.expect("steady-state send should yield one message");
|
||||
assert_eq!(content, msg.content.as_slice());
|
||||
}
|
||||
|
||||
#[test]
|
||||
@ -42,19 +55,16 @@ fn ctx_integration() {
|
||||
|
||||
// Raya receives the invite + initial message
|
||||
let payload = payloads.first().unwrap();
|
||||
let initial = raya.handle_payload(&payload.data).unwrap();
|
||||
let new_convo = initial
|
||||
.new_conversation
|
||||
.as_ref()
|
||||
.expect("invite must create a conversation");
|
||||
let initial = expect_inbox_response(raya.handle_payload(&payload.data).unwrap());
|
||||
let new_convo = &initial.new_conversation;
|
||||
assert!(matches!(new_convo.kind, ConversationKind::PrivateV1));
|
||||
assert_eq!(
|
||||
initial.frame.messages.len(),
|
||||
1,
|
||||
"invite must include initial message"
|
||||
);
|
||||
assert_eq!(content, initial.frame.messages[0].content);
|
||||
assert_eq!(new_convo.convo_id, initial.frame.messages[0].convo_id);
|
||||
let initial_msg = initial
|
||||
.frame
|
||||
.as_ref()
|
||||
.and_then(|f| f.message.as_ref())
|
||||
.expect("invite must include initial message");
|
||||
assert_eq!(content, initial_msg.content);
|
||||
assert_eq!(new_convo.convo_id, initial_msg.convo_id);
|
||||
let raya_convo_id = new_convo.convo_id.clone();
|
||||
|
||||
// Exchange messages back and forth
|
||||
@ -115,11 +125,8 @@ fn conversation_metadata_persistence() {
|
||||
let (_, payloads) = bob.create_private_convo(&intro, b"hi").unwrap();
|
||||
|
||||
let payload = payloads.first().unwrap();
|
||||
let result = alice.handle_payload(&payload.data).unwrap();
|
||||
let new_convo = result
|
||||
.new_conversation
|
||||
.as_ref()
|
||||
.expect("invite must create a conversation");
|
||||
let result = expect_inbox_response(alice.handle_payload(&payload.data).unwrap());
|
||||
let new_convo = &result.new_conversation;
|
||||
assert!(matches!(new_convo.kind, ConversationKind::PrivateV1));
|
||||
|
||||
let convos = alice.store().load_conversations().unwrap();
|
||||
@ -140,23 +147,18 @@ fn conversation_full_flow() {
|
||||
let (bob_convo_id, payloads) = bob.create_private_convo(&intro, b"hello").unwrap();
|
||||
|
||||
let payload = payloads.first().unwrap();
|
||||
let result = alice.handle_payload(&payload.data).unwrap();
|
||||
let alice_convo_id = result
|
||||
.new_conversation
|
||||
.as_ref()
|
||||
.expect("invite must create a conversation")
|
||||
.convo_id
|
||||
.clone();
|
||||
let result = expect_inbox_response(alice.handle_payload(&payload.data).unwrap());
|
||||
let alice_convo_id = result.new_conversation.convo_id.clone();
|
||||
|
||||
let payloads = alice.send_content(&alice_convo_id, b"reply 1").unwrap();
|
||||
let payload = payloads.first().unwrap();
|
||||
let result = bob.handle_payload(&payload.data).unwrap();
|
||||
assert_eq!(result.frame.messages[0].content, b"reply 1");
|
||||
let frame = expect_convo_response(bob.handle_payload(&payload.data).unwrap());
|
||||
assert_eq!(frame.message.unwrap().content, b"reply 1");
|
||||
|
||||
let payloads = bob.send_content(&bob_convo_id, b"reply 2").unwrap();
|
||||
let payload = payloads.first().unwrap();
|
||||
let result = alice.handle_payload(&payload.data).unwrap();
|
||||
assert_eq!(result.frame.messages[0].content, b"reply 2");
|
||||
let frame = expect_convo_response(alice.handle_payload(&payload.data).unwrap());
|
||||
assert_eq!(frame.message.unwrap().content, b"reply 2");
|
||||
|
||||
// Verify conversation list
|
||||
let convo_ids = alice.list_conversations().unwrap();
|
||||
@ -165,12 +167,12 @@ fn conversation_full_flow() {
|
||||
// Continue exchanging messages
|
||||
let payloads = bob.send_content(&bob_convo_id, b"more messages").unwrap();
|
||||
let payload = payloads.first().unwrap();
|
||||
let result = alice.handle_payload(&payload.data).expect("should decrypt");
|
||||
assert_eq!(result.frame.messages[0].content, b"more messages");
|
||||
let frame = expect_convo_response(alice.handle_payload(&payload.data).expect("should decrypt"));
|
||||
assert_eq!(frame.message.unwrap().content, b"more messages");
|
||||
|
||||
// Alice can also send back
|
||||
let payloads = alice.send_content(&alice_convo_id, b"alice reply").unwrap();
|
||||
let payload = payloads.first().unwrap();
|
||||
let result = bob.handle_payload(&payload.data).unwrap();
|
||||
assert_eq!(result.frame.messages[0].content, b"alice reply");
|
||||
let frame = expect_convo_response(bob.handle_payload(&payload.data).unwrap());
|
||||
assert_eq!(frame.message.unwrap().content, b"alice reply");
|
||||
}
|
||||
|
||||
@ -1,6 +1,6 @@
|
||||
use libchat::{
|
||||
AddressedEnvelope, ChatError, ChatStorage, Context, ConversationIdOwned, ConversationKind,
|
||||
DeliveryService, InboundResult, Introduction, StorageConfig,
|
||||
DeliveryService, Introduction, ProcessResponse, StorageConfig,
|
||||
};
|
||||
|
||||
use components::EphemeralRegistry;
|
||||
@ -94,28 +94,48 @@ impl<D: DeliveryService + 'static> ChatClient<D> {
|
||||
}
|
||||
}
|
||||
|
||||
/// Walk an [`InboundResult`] in causal order and emit one `Event` per
|
||||
/// observation. The structural ordering of `InboundResult` (new conversation
|
||||
/// before frame contents) determines the order of events here.
|
||||
fn events_from_inbound(result: InboundResult) -> Vec<Event> {
|
||||
let mut events = Vec::with_capacity(
|
||||
usize::from(result.new_conversation.is_some()) + result.frame.messages.len(),
|
||||
);
|
||||
if let Some(nc) = result.new_conversation
|
||||
&& let Some(class) = class_from_kind(&nc.kind)
|
||||
{
|
||||
/// Walk a [`ProcessResponse`] in causal order and emit one `Event` per
|
||||
/// observation. New-conversation events are emitted before any frame
|
||||
/// contents from the same payload.
|
||||
fn events_from_inbound(result: ProcessResponse) -> Vec<Event> {
|
||||
let mut events = Vec::new();
|
||||
match result {
|
||||
ProcessResponse::InboxResponse(r) => {
|
||||
push_new_conversation(&mut events, r.new_conversation);
|
||||
if let Some(frame) = r.frame {
|
||||
push_frame(&mut events, frame);
|
||||
}
|
||||
}
|
||||
ProcessResponse::ConvoResponse(r) => {
|
||||
push_frame(&mut events, r.frame);
|
||||
}
|
||||
ProcessResponse::Unknown => {}
|
||||
}
|
||||
events
|
||||
}
|
||||
|
||||
fn push_new_conversation(events: &mut Vec<Event>, nc: libchat::NewConversation) {
|
||||
if let Some(class) = class_from_kind(&nc.kind) {
|
||||
events.push(Event::ConversationStarted {
|
||||
convo_id: nc.convo_id,
|
||||
class,
|
||||
});
|
||||
}
|
||||
for msg in result.frame.messages {
|
||||
}
|
||||
|
||||
fn push_frame(events: &mut Vec<Event>, frame: libchat::FrameOutcome) {
|
||||
if let Some(msg) = frame.message {
|
||||
events.push(Event::MessageReceived {
|
||||
convo_id: msg.convo_id,
|
||||
content: msg.content,
|
||||
});
|
||||
}
|
||||
events
|
||||
for missing in frame.missing_messages {
|
||||
events.push(Event::MessageMissing {
|
||||
convo_id: std::sync::Arc::from(missing.conversation_id.as_str()),
|
||||
message_id: missing.message_id,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/// Map a core [`ConversationKind`] to the coarse app-facing
|
||||
|
||||
@ -22,6 +22,12 @@ pub enum Event {
|
||||
convo_id: ConversationIdOwned,
|
||||
content: Vec<u8>,
|
||||
},
|
||||
/// A causal-history gap surfaced: a message referenced by another
|
||||
/// delivered message but never seen locally.
|
||||
MessageMissing {
|
||||
convo_id: ConversationIdOwned,
|
||||
message_id: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// Coarse classification of a conversation, intended as a UI/UX hint.
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user