diff --git a/core/conversations/src/conversation.rs b/core/conversations/src/conversation.rs index c9377c4..6f17892 100644 --- a/core/conversations/src/conversation.rs +++ b/core/conversations/src/conversation.rs @@ -1,3 +1,4 @@ +mod direct_v1; pub mod group_v1; mod group_v2; mod privatev1; @@ -6,6 +7,7 @@ pub use crate::errors::ChatError; use crate::outcomes::ConvoOutcome; use crate::proto::EncryptedPayload; use crate::service_context::{ExternalServices, ServiceContext}; +pub use direct_v1::DirectV1Convo; pub use group_v1::GroupV1Convo; pub use group_v2::GroupV2Convo; pub use privatev1::PrivateV1Convo; @@ -15,7 +17,7 @@ pub type ConversationId = String; pub type ConversationIdRef<'a> = &'a str; /// Behaviour shared by every conversation kind. -pub(crate) trait Convo: Identified { +pub(crate) trait Convo: Identified + Send { fn send_content(&mut self, cx: &mut ServiceContext, content: &[u8]) -> Result<(), ChatError>; diff --git a/core/conversations/src/conversation/direct_v1.rs b/core/conversations/src/conversation/direct_v1.rs new file mode 100644 index 0000000..f75bdb0 --- /dev/null +++ b/core/conversations/src/conversation/direct_v1.rs @@ -0,0 +1,61 @@ +use chat_proto::logoschat::encryption::EncryptedPayload; +use shared_traits::IdentIdRef; + +use crate::{ + ChatError, ExternalServices, + conversation::{ConversationIdRef, Convo, GroupConvo, GroupV1Convo, Identified}, + service_context::ServiceContext, +}; + +type DelegateGroup = GroupV1Convo; + +/// A Conversation between two participants. +#[derive(Debug)] +pub struct DirectV1Convo { + inner_group: DelegateGroup, +} + +impl DirectV1Convo { + // Constructor must accept multiple IdentId's + // While the conversation is limited to 2 participants, each participants may + // have multiple Installations. + pub fn new( + cx: &mut ServiceContext, + members: &[IdentIdRef], + ) -> Result { + let mut inner_group = DelegateGroup::new(cx)?; + inner_group.add_member(cx, members)?; + Ok(Self { inner_group }) + } +} + +impl Identified for DirectV1Convo { + fn id(&self) -> ConversationIdRef<'_> { + self.inner_group.id() + } +} + +impl Convo for DirectV1Convo +where + S: ExternalServices, +{ + fn send_content( + &mut self, + cx: &mut ServiceContext, + content: &[u8], + ) -> Result<(), super::ChatError> { + self.inner_group.send_content(cx, content) + } + + fn handle_frame( + &mut self, + cx: &mut ServiceContext, + enc: EncryptedPayload, + ) -> Result { + self.inner_group.handle_frame(cx, enc) + } + + fn wakeup(&mut self, service_ctx: &mut ServiceContext) -> Result<(), ChatError> { + self.inner_group.wakeup(service_ctx) + } +} diff --git a/core/conversations/src/core.rs b/core/conversations/src/core.rs index 1bc5a88..d88a4a8 100644 --- a/core/conversations/src/core.rs +++ b/core/conversations/src/core.rs @@ -1,6 +1,6 @@ use crate::causal_history::{CausalHistoryStore, MissingMessage}; use crate::conversation::{ - ConversationIdRef, GroupV1Convo, GroupV2Convo, Identified, PrivateV1Convo, + ConversationIdRef, DirectV1Convo, GroupV1Convo, GroupV2Convo, Identified, PrivateV1Convo, }; use crate::service_context::{ExternalServices, ServiceContext}; use crate::{DeliveryService, IdentityProvider, RegistrationService, WakeupService}; @@ -16,6 +16,7 @@ use crypto::{Identity, PublicKey}; use openmls::group::GroupId; use shared_traits::IdentIdRef; use std::collections::HashMap; +use std::fmt::Debug; use storage::{ChatStore, ConversationKind, ConversationStore}; use tracing::{info, instrument}; @@ -188,6 +189,14 @@ impl<'a, S: ExternalServices + 'static> Core { &mut self, remote_bundle: &Introduction, content: &[u8], + ) -> Result { + self.create_private_convo_v1(remote_bundle, content) + } + + pub fn create_private_convo_v1( + &mut self, + remote_bundle: &Introduction, + content: &[u8], ) -> Result { let (mut convo, payloads) = self.inbox @@ -204,6 +213,17 @@ impl<'a, S: ExternalServices + 'static> Core { Ok(convo_id) } + pub fn create_direct_convo_v1( + &mut self, + members: &[IdentIdRef], + ) -> Result { + let convo = DirectV1Convo::new(&mut self.services, members)?; + let convo_id = convo.id().to_string(); + self.register_convo(ConvoTypeOwned::Direct(Box::new(convo)))?; + + Ok(convo_id) + } + pub fn create_group_convo( &mut self, participants: &[IdentIdRef], @@ -266,6 +286,10 @@ impl<'a, S: ExternalServices + 'static> Core { ConvoTypeOwned::Group(group_convo) => { group_convo.add_member(&mut self.services, members) } + ConvoTypeOwned::Direct(convo) => Err(ChatError::UnsupportedFunction( + convo.id().into(), + "Add Member".into(), + )), } } else { let mut convo = self.load_group_convo(convo_id)?; @@ -397,6 +421,7 @@ impl<'a, S: ExternalServices + 'static> Core { }; let convo = match convo { ConvoTypeOwned::Group(c) => c.as_mut(), + ConvoTypeOwned::Direct(c) => c.as_mut(), }; convo.wakeup(&mut self.services) @@ -466,15 +491,24 @@ impl<'a, S: ExternalServices + 'static> Core { } } -#[derive(Debug)] enum ConvoTypeOwned { - // Pairwise(Box>), + Direct(Box>), Group(Box>), } +impl Debug for ConvoTypeOwned { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + Self::Direct(arg0) => f.debug_tuple("Pairwise").field(&arg0.id()).finish(), + Self::Group(arg0) => f.debug_tuple("Group").field(&arg0.id()).finish(), + } + } +} + impl Identified for ConvoTypeOwned { fn id(&self) -> ConversationIdRef<'_> { match self { + ConvoTypeOwned::Direct(convo) => convo.id(), ConvoTypeOwned::Group(group_convo) => group_convo.id(), } } @@ -488,6 +522,7 @@ impl Convo for ConvoTypeOwned { ) -> Result<(), ChatError> { match self { ConvoTypeOwned::Group(group_convo) => group_convo.send_content(cx, content), + ConvoTypeOwned::Direct(convo) => convo.send_content(cx, content), } } @@ -498,12 +533,14 @@ impl Convo for ConvoTypeOwned { ) -> Result { match self { ConvoTypeOwned::Group(group_convo) => group_convo.handle_frame(cx, enc), + ConvoTypeOwned::Direct(convo) => convo.handle_frame(cx, enc), } } fn wakeup(&mut self, service_ctx: &mut ServiceContext) -> Result<(), ChatError> { match self { ConvoTypeOwned::Group(group_convo) => group_convo.wakeup(service_ctx), + ConvoTypeOwned::Direct(convo) => convo.wakeup(service_ctx), } } } diff --git a/core/conversations/src/errors.rs b/core/conversations/src/errors.rs index 879e923..4126582 100644 --- a/core/conversations/src/errors.rs +++ b/core/conversations/src/errors.rs @@ -4,6 +4,8 @@ pub use thiserror::Error; use storage::StorageError; +use crate::ConversationId; + #[derive(Error, Debug)] pub enum ChatError { #[error("protocol error: {0:?}")] @@ -42,6 +44,9 @@ pub enum ChatError { MlsError(#[from] MlsError), #[error("demls error: {0}")] DeMlsError(#[from] ConversationError), + // Used when a core function is called with a convo_id which is unsupported + #[error("convo:{0} does not support {1}")] + UnsupportedFunction(ConversationId, String), } impl ChatError { diff --git a/core/integration_tests_core/tests/private_integration.rs b/core/integration_tests_core/tests/private_integration.rs index 61e9584..55819e0 100644 --- a/core/integration_tests_core/tests/private_integration.rs +++ b/core/integration_tests_core/tests/private_integration.rs @@ -87,7 +87,7 @@ fn ctx_integration() { // Saro initiates conversation with Raya let mut content = vec![10]; - let saro_convo_id = saro.create_private_convo(&intro, &content).unwrap(); + let saro_convo_id = saro.create_private_convo_v1(&intro, &content).unwrap(); // Raya receives the invite + initial message let initial = recv_one(&mut raya); @@ -178,7 +178,7 @@ fn conversation_metadata_persistence() { let bundle = alice.create_intro_bundle().unwrap(); let intro = Introduction::try_from(bundle.as_slice()).unwrap(); - bob.create_private_convo(&intro, b"hi").unwrap(); + bob.create_private_convo_v1(&intro, b"hi").unwrap(); let result = recv_one(&mut alice); let PayloadOutcome::Inbox(io) = result else { @@ -219,7 +219,7 @@ fn conversation_full_flow() { let bundle = alice.create_intro_bundle().unwrap(); let intro = Introduction::try_from(bundle.as_slice()).unwrap(); - let bob_convo_id = bob.create_private_convo(&intro, b"hello").unwrap(); + let bob_convo_id = bob.create_private_convo_v1(&intro, b"hello").unwrap(); let result = recv_one(&mut alice); let PayloadOutcome::Inbox(io) = result else { diff --git a/core/integration_tests_core/tests/test_direct_v1.rs b/core/integration_tests_core/tests/test_direct_v1.rs new file mode 100644 index 0000000..da050d1 --- /dev/null +++ b/core/integration_tests_core/tests/test_direct_v1.rs @@ -0,0 +1,43 @@ +use integration_tests_core::TestHarness; +use tracing::info; + +#[test] +fn happypath_roundtrip() { + let _ = tracing_subscriber::fmt() + .with_max_level(tracing::Level::INFO) + .with_test_writer() + .try_init(); + + const S_M1: &[u8] = b"Marco"; + const R_M1: &[u8] = b"Polo"; + + // Initialize TestHarness with 2 clients + let mut harness = TestHarness::<2>::new(|_, _| {}); + + //Saro Create Convo + let particpant = harness.raya().addr(); + let convo_id = harness + .saro() + .create_direct_convo_v1(&[&particpant]) + .expect("saro create group"); + + // Carry the invite through (commit, WelcomeReady, routing to Raya's inbox, + // accept_welcome); settle until Raya has joined. + harness.process_until_label("Saro Send", |h| h.raya().convo_count() == 1); + + // Saro sends a message; settle until Raya receives it. + info!(target: "chat", "Saro -> sending: {S_M1:?}"); + harness + .saro() + .send_content(&convo_id, S_M1) + .expect("saro send"); + + harness.process_until(|h| h.raya().check(&convo_id, S_M1)); + + // Raya replies; settle until Saro receives it. + info!(target: "chat", "Raya -> sending:{R_M1:?}"); + harness.raya().send_content(&convo_id, R_M1).unwrap(); + harness.process_until(|h| h.saro().check(&convo_id, R_M1)); + + assert!(harness.saro().check(&convo_id, R_M1)); +} diff --git a/crates/client/src/client.rs b/crates/client/src/client.rs index 17b5439..170030a 100644 --- a/crates/client/src/client.rs +++ b/crates/client/src/client.rs @@ -168,7 +168,7 @@ where let intro = Introduction::try_from(intro_bundle)?; self.core .lock() - .create_private_convo(&intro, initial_content) + .create_private_convo_v1(&intro, initial_content) .map_err(Into::into) }