From 1ce196e5ec7451622cb87849817252813cb04603 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Fri, 6 Feb 2026 23:41:12 +0700 Subject: [PATCH] Implement `handle_payload` (#44) * Implement handle_payload * Cleanup handler for easier understanding * Fixups --- conversations/src/api.rs | 4 +- conversations/src/context.rs | 54 ++++++++++++++++---- conversations/src/conversation.rs | 22 ++++++-- conversations/src/conversation/group_test.rs | 10 +++- conversations/src/conversation/privatev1.rs | 33 +++++++++++- conversations/src/inbox/inbox.rs | 26 ++++------ conversations/src/types.rs | 1 + 7 files changed, 115 insertions(+), 35 deletions(-) diff --git a/conversations/src/api.rs b/conversations/src/api.rs index 1731cd2..dc1d215 100644 --- a/conversations/src/api.rs +++ b/conversations/src/api.rs @@ -148,7 +148,7 @@ pub fn handle_payload( mut content_out: c_slice::Mut<'_, u8>, ) -> i32 { match ctx.0.handle_payload(&payload) { - Some(content) => { + Ok(Some(content)) => { let convo_id_bytes = content.conversation_id.as_bytes(); if conversation_id_out.len() < convo_id_bytes.len() { @@ -165,7 +165,7 @@ pub fn handle_payload( content.data.len() as i32 } - None => 0, + _ => 0, } } diff --git a/conversations/src/context.rs b/conversations/src/context.rs index e82e464..e19050f 100644 --- a/conversations/src/context.rs +++ b/conversations/src/context.rs @@ -1,10 +1,11 @@ use std::{collections::HashMap, rc::Rc, sync::Arc}; use crate::{ - conversation::{ConversationStore, Convo, Id}, + conversation::{ConversationId, ConversationStore, Convo, Id}, errors::ChatError, identity::Identity, inbox::Inbox, + proto::{EncryptedPayload, EnvelopeV1, Message}, types::{AddressedEnvelope, ContentData}, }; @@ -54,7 +55,7 @@ impl Context { .map(|p| p.to_envelope(convo.id().to_string())) .collect(); - let convo_handle = self.add_convo(convo); + let convo_handle = self.add_convo(Box::new(convo)); (convo_handle, payload_bytes) } @@ -76,12 +77,45 @@ impl Context { .collect()) } - pub fn handle_payload(&mut self, _payload: &[u8]) -> Option { - // !TODO Replace Mock - Some(ContentData { - conversation_id: "convo_id".into(), - data: vec![1, 2, 3, 4, 5, 6], - }) + // Decode bytes and send to protocol for processing. + pub fn handle_payload(&mut self, payload: &[u8]) -> Result, ChatError> { + let env = match EnvelopeV1::decode(payload) { + Ok(v) => v, + Err(e) => return Err(e.into()), + }; + + // TODO: Impl Conversation hinting + let convo_id = env.conversation_hint; + let enc = EncryptedPayload::decode(payload)?; + + match convo_id { + c if c == self.inbox.id() => self.dispatch_to_inbox(enc), + c if self.store.has(&c) => self.dispatch_to_convo(&c, enc), + _ => Err(ChatError::NoConvo(0)), // TODO: Remove ConvoHandle type + } + } + + // Dispatch encrypted payload to Inbox, and register the created Conversation + fn dispatch_to_inbox( + &mut self, + enc_payload: EncryptedPayload, + ) -> Result, ChatError> { + let (convo, content) = self.inbox.handle_frame(enc_payload)?; + self.add_convo(convo); + Ok(content) + } + + // Dispatch encrypted payload to its corresponding conversation + fn dispatch_to_convo( + &mut self, + convo_id: ConversationId, + enc_payload: EncryptedPayload, + ) -> Result, ChatError> { + let Some(convo) = self.store.get_mut(&convo_id) else { + return Err(ChatError::Protocol("convo id not found".into())); + }; + + convo.handle_frame(enc_payload) } pub fn create_intro_bundle(&mut self) -> Result, ChatError> { @@ -89,7 +123,7 @@ impl Context { Ok(Introduction::from(pkb).into()) } - fn add_convo(&mut self, convo: impl Convo + Id + 'static) -> ConvoHandle { + fn add_convo(&mut self, convo: Box) -> ConvoHandle { let handle = self.next_convo_handle; self.next_convo_handle += 1; let convo_id = self.store.insert_convo(convo); @@ -123,7 +157,7 @@ mod tests { let mut store: ConversationStore = ConversationStore::new(); let new_convo = GroupTestConvo::new(); - let convo_id = store.insert_convo(new_convo); + let convo_id = store.insert_convo(Box::new(new_convo)); let convo = store.get_mut(&convo_id).ok_or_else(|| 0); convo.unwrap(); diff --git a/conversations/src/conversation.rs b/conversations/src/conversation.rs index 6e261a5..8fd474f 100644 --- a/conversations/src/conversation.rs +++ b/conversations/src/conversation.rs @@ -3,7 +3,7 @@ use std::fmt::Debug; use std::sync::Arc; pub use crate::errors::ChatError; -use crate::types::AddressedEncryptedPayload; +use crate::types::{AddressedEncryptedPayload, ContentData}; pub type ConversationId<'a> = &'a str; pub type ConversationIdOwned = Arc; @@ -16,6 +16,16 @@ pub trait Convo: Id + Debug { fn send_message(&mut self, content: &[u8]) -> Result, ChatError>; + /// Decrypts and processes an incoming encrypted frame. + /// + /// Returns `Ok(Some(ContentData))` if the frame contains user content, + /// `Ok(None)` for protocol frames (e.g., placeholders), or an error if + /// decryption or frame parsing fails. + fn handle_frame( + &mut self, + enc_payload: EncryptedPayload, + ) -> Result, ChatError>; + fn remote_id(&self) -> String; } @@ -30,13 +40,16 @@ impl ConversationStore { } } - pub fn insert_convo(&mut self, conversation: impl Convo + Id + 'static) -> ConversationIdOwned { + pub fn insert_convo(&mut self, conversation: Box) -> ConversationIdOwned { let key: ConversationIdOwned = Arc::from(conversation.id()); - self.conversations - .insert(key.clone(), Box::new(conversation)); + self.conversations.insert(key.clone(), conversation); key } + pub fn has(&self, id: ConversationId) -> bool { + self.conversations.contains_key(id) + } + pub fn get(&self, id: ConversationId) -> Option<&(dyn Convo + '_)> { self.conversations.get(id).map(|c| c.as_ref()) } @@ -53,5 +66,6 @@ impl ConversationStore { mod group_test; mod privatev1; +use chat_proto::logoschat::encryption::EncryptedPayload; pub use group_test::GroupTestConvo; pub use privatev1::PrivateV1Convo; diff --git a/conversations/src/conversation/group_test.rs b/conversations/src/conversation/group_test.rs index 934ab1c..1b67128 100644 --- a/conversations/src/conversation/group_test.rs +++ b/conversations/src/conversation/group_test.rs @@ -1,6 +1,7 @@ use crate::{ conversation::{ChatError, ConversationId, Convo, Id}, - types::AddressedEncryptedPayload, + proto::EncryptedPayload, + types::{AddressedEncryptedPayload, ContentData}, }; #[derive(Debug)] @@ -27,6 +28,13 @@ impl Convo for GroupTestConvo { Ok(vec![]) } + fn handle_frame( + &mut self, + _encoded_payload: EncryptedPayload, + ) -> Result, ChatError> { + Ok(None) + } + fn remote_id(&self) -> String { self.id().to_string() } diff --git a/conversations/src/conversation/privatev1.rs b/conversations/src/conversation/privatev1.rs index 704918b..b58eb69 100644 --- a/conversations/src/conversation/privatev1.rs +++ b/conversations/src/conversation/privatev1.rs @@ -12,7 +12,7 @@ use crate::{ conversation::{ChatError, ConversationId, Convo, Id}, errors::EncryptionError, proto, - types::AddressedEncryptedPayload, + types::{AddressedEncryptedPayload, ContentData}, utils::timestamp_millis, }; @@ -89,6 +89,15 @@ impl PrivateV1Convo { .map_err(|e| EncryptionError::Decryption(e.to_string()))?; Ok(PrivateV1Frame::decode(content_bytes.as_slice()).unwrap()) } + + // Handler for application content + fn handle_content(&self, data: Vec) -> Option { + Some(ContentData { + conversation_id: self.id().into(), + data, + is_new_convo: false, + }) + } } impl Id for PrivateV1Convo { @@ -118,6 +127,28 @@ impl Convo for PrivateV1Convo { }]) } + fn handle_frame( + &mut self, + encoded_payload: EncryptedPayload, + ) -> Result, ChatError> { + // Extract expected frame + let frame = self + .decrypt(encoded_payload) + .map_err(|_| ChatError::Protocol("decryption".into()))?; + + let Some(frame_type) = frame.frame_type else { + return Err(ChatError::ProtocolExpectation("None", "Some".into())); + }; + + // Handle FrameTypes + let output = match frame_type { + FrameType::Content(bytes) => self.handle_content(bytes.into()), + FrameType::Placeholder(_) => None, + }; + + Ok(output) + } + fn remote_id(&self) -> String { //TODO: Implement as per spec self.id().into() diff --git a/conversations/src/inbox/inbox.rs b/conversations/src/inbox/inbox.rs index 401093c..66888b7 100644 --- a/conversations/src/inbox/inbox.rs +++ b/conversations/src/inbox/inbox.rs @@ -1,3 +1,4 @@ +use chat_proto::logoschat::encryption::EncryptedPayload; use hex; use prost::Message; use prost::bytes::Bytes; @@ -119,15 +120,11 @@ impl Inbox { Ok((convo, payloads)) } - fn handle_frame( + pub fn handle_frame( &mut self, - message: &[u8], - ) -> Result<(Box, Vec), ChatError> { - if message.len() == 0 { - return Err(ChatError::Protocol("Example error".into())); - } - - let handshake = Self::extract_payload(proto::EncryptedPayload::decode(message)?)?; + enc_payload: EncryptedPayload, + ) -> Result<(Box, Option), ChatError> { + let handshake = Self::extract_payload(enc_payload)?; let header = handshake .header @@ -145,7 +142,7 @@ impl Inbox { let convo = PrivateV1Convo::new_responder(seed_key, ephemeral_key.clone().into()); // TODO: Update PrivateV1 Constructor with DR, initial_message - Ok((Box::new(convo), vec![])) + Ok((Box::new(convo), None)) } } } @@ -240,19 +237,14 @@ mod tests { let mut raya_inbox = Inbox::new(raya_ident.into()); let bundle = raya_inbox.create_bundle(); - let (_, payloads) = saro_inbox + let (_, mut payloads) = saro_inbox .invite_to_private_convo(&bundle.into(), "hello".as_bytes()) .unwrap(); - let payload = payloads - .get(0) - .expect("RemoteInbox::invite_to_private_convo did not generate any payloads"); - - let mut buf = Vec::new(); - payload.data.encode(&mut buf).unwrap(); + let payload = payloads.remove(0); // Test handle_frame with valid payload - let result = raya_inbox.handle_frame(&buf); + let result = raya_inbox.handle_frame(payload.data); assert!( result.is_ok(), diff --git a/conversations/src/types.rs b/conversations/src/types.rs index 254ab9e..fd289a5 100644 --- a/conversations/src/types.rs +++ b/conversations/src/types.rs @@ -14,6 +14,7 @@ pub struct AddressedEnvelope { pub struct ContentData { pub conversation_id: String, pub data: Vec, + pub is_new_convo: bool, } // Internal type Definitions