diff --git a/src/chat/client.nim b/src/chat/client.nim index 79af020..6ffec00 100644 --- a/src/chat/client.nim +++ b/src/chat/client.nim @@ -26,6 +26,27 @@ logScope: # Definitions ################################################# +# Type used to return message data via callback +type ReceivedMessage* = ref object of RootObj + sender*: PublicKey + timestamp*: int64 + content*: seq[byte] + + +type ConvoType = enum + PrivateV1 + +type Conversation* = object + ctx: LibChat + convoId: string + ds: WakuClient + convo_type: ConvoType + + +proc id*(self: Conversation): string = + return "TODO" + + type MessageCallback* = proc(conversation: Conversation, msg: ReceivedMessage): Future[void] {.async.} NewConvoCallback* = proc(conversation: Conversation): Future[void] {.async.} @@ -40,13 +61,10 @@ type KeyEntry* = object type ChatClient* = ref object libchatCtx: LibChat - ident: Identity ds*: WakuClient - keyStore: Table[string, KeyEntry] # Keyed by HexEncoded Public Key - conversations: Table[string, Conversation] # Keyed by conversation ID + id: string inboundQueue: QueueRef isRunning: bool - inbox: Inbox newMessageCallbacks: seq[MessageCallback] newConvoCallbacks: seq[NewConvoCallback] @@ -56,36 +74,23 @@ type ChatClient* = ref object # Constructors ################################################# -proc newClient*(ds: WakuClient, ident: Identity): ChatClient {.raises: [IOError, - ValueError, SerializationError].} = +proc newClient*(ds: WakuClient, ident: Identity): ChatClient {.raises: [IOError, ValueError].} = ## Creates new instance of a `ChatClient` with a given `WakuConfig` try: - let rm = newReliabilityManager().valueOr: - raise newException(ValueError, fmt"SDS InitializationError") - - let defaultInbox = initInbox(ident) var q = QueueRef(queue: newAsyncQueue[ChatPayload](10)) - var c = ChatClient(ident: ident, var c = ChatClient( libchatCtx: newConversationsContext(), - ident: ident, ds: ds, - keyStore: initTable[string, KeyEntry](), - conversations: initTable[string, Conversation](), + id: ident.getName(), inboundQueue: q, isRunning: false, - inbox: defaultInbox, newMessageCallbacks: @[], newConvoCallbacks: @[]) - c.conversations[defaultInbox.id()] = defaultInbox - notice "Client started", client = c.ident.getName(), - defaultInbox = defaultInbox, inTopic= topic_inbox(c.ident.get_addr()) + notice "Client started", client = c.id - # Set LibChatBufferSize - c.libchatCtx.setBufferSize(256); result = c except Exception as e: error "newCLient", err = e.msg @@ -95,27 +100,12 @@ proc newClient*(ds: WakuClient, ident: Identity): ChatClient {.raises: [IOError, ################################################# proc getId*(client: ChatClient): string = - result = client.ident.getName() - -proc identity*(client: ChatClient): Identity = - result = client.ident - -proc defaultInboxConversationId*(self: ChatClient): string = - ## Returns the default inbox address for the client. - result = conversationIdFor(self.ident.getPubkey()) - -proc getConversationFromHint(self: ChatClient, - conversationHint: string): Result[Option[Conversation], string] = - - # TODO: Implementing Hinting - if not self.conversations.hasKey(conversationHint): - ok(none(Conversation)) - else: - ok(some(self.conversations[conversationHint])) + result = client.id proc listConversations*(client: ChatClient): seq[Conversation] = - result = toSeq(client.conversations.values()) + # TODO: (P1) Implement list conversations + result = @[] ################################################# # Callback Handling @@ -133,6 +123,7 @@ proc onNewConversation*(client: ChatClient, callback: NewConvoCallback) = proc notifyNewConversation(client: ChatClient, convo: Conversation) = for cb in client.newConvoCallbacks: + debug "calling OnConvo CB", client=client.getId(), len = client.newConvoCallbacks.len() discard cb(convo) proc onDeliveryAck*(client: ChatClient, callback: DeliveryAckCallback) = @@ -147,51 +138,47 @@ proc notifyDeliveryAck(client: ChatClient, convo: Conversation, # Functional ################################################# -proc createIntroBundle*(self: var ChatClient): IntroBundle = +proc createIntroBundle*(self: var ChatClient): seq[byte] = ## Generates an IntroBundle for the client, which includes ## the required information to send a message. - - # Create Ephemeral keypair, save it in the key store - let ephemeralKey = generateKey() - - self.keyStore[ephemeralKey.getPublicKey().bytes().bytesToHex()] = KeyEntry( - keyType: "ephemeral", - privateKey: ephemeralKey, - timestamp: getCurrentTimestamp() - ) - - result = IntroBundle( - ident: @(self.ident.getPubkey().bytes()), - ephemeral: @(ephemeralKey.getPublicKey().bytes()), - ) - + result = self.libchatCtx.createIntroductionBundle().valueOr: + error "could not create bundle",error=error, client = self.getId() + return + notice "IntroBundleCreated", client = self.getId(), - pubBytes = result.ident + bundle = result + +proc sendPayloads(ds: WakuClient, payloads: seq[PayloadResult]) = + for payload in payloads: + # TODO: (P2) surface errors + discard ds.sendBytes(payload.address, payload.data) ################################################# # Conversation Initiation ################################################# -proc addConversation*(client: ChatClient, convo: Conversation) = - notice "Creating conversation", client = client.getId(), convoId = convo.id() - client.conversations[convo.id()] = convo - client.notifyNewConversation(convo) proc getConversation*(client: ChatClient, convoId: string): Conversation = - notice "Get conversation", client = client.getId(), convoId = convoId - result = client.conversations[convoId] + result = Conversation(ctx:client.libchatCtx, convoId:convoId, ds: client.ds, convo_type: PrivateV1) proc newPrivateConversation*(client: ChatClient, - introBundle: IntroBundle, content: Content): Future[Option[ChatError]] {.async.} = - ## Creates a private conversation with the given `IntroBundle`. - ## `IntroBundles` are provided out-of-band. - let remote_pubkey = loadPublicKeyFromBytes(introBundle.ident).get() - let remote_ephemeralkey = loadPublicKeyFromBytes(introBundle.ephemeral).get() + introBundle: seq[byte], content: Content): Future[Option[ChatError]] {.async.} = - let convo = await client.inbox.inviteToPrivateConversation(client.ds,remote_pubkey, remote_ephemeralkey, content ) - client.addConversation(convo) # TODO: Fix re-entrantancy bug. Convo needs to be saved before payload is sent. + let res = client.libchatCtx.createNewPrivateConvo(introBundle, content) + let (convoId, payloads) = res.valueOr: + error "could not create bundle",error=error, client = client.getId() + return some(ChatError(code: errLibChat, context:fmt"got: {error}" )) + + client.ds.sendPayloads(payloads); + + + client.notifyNewConversation(Conversation(ctx: client.libchatCtx, + convoId : convoId, ds: client.ds, convo_type: ConvoType.PrivateV1 + )) + + notice "CREATED", client=client.getId(), convoId=convoId return none(ChatError) @@ -200,31 +187,33 @@ proc newPrivateConversation*(client: ChatClient, # Receives a incoming payload, decodes it, and processes it. ################################################# -proc parseMessage(client: ChatClient, msg: ChatPayload) {.raises: [ValueError, - SerializationError].} = - let envelopeRes = decode(msg.bytes, WapEnvelopeV1) - if envelopeRes.isErr: - debug "Failed to decode WapEnvelopeV1", client = client.getId(), err = envelopeRes.error - return - let envelope = envelopeRes.get() - - let convo = block: - let opt = client.getConversationFromHint(envelope.conversationHint).valueOr: - raise newException(ValueError, "Failed to get conversation: " & error) - - if opt.isSome(): - opt.get() - else: - let k = toSeq(client.conversations.keys()).join(", ") - warn "No conversation found", client = client.getId(), - hint = envelope.conversationHint, knownIds = k - return +proc parseMessage(client: ChatClient, msg: ChatPayload) {.raises: [ValueError].} = try: - convo.handleFrame(client, envelope.payload) + let opt_content = client.libchatCtx.handlePayload(msg.bytes).valueOr: + error "handlePayload" ,client=client.getId() + return + + if opt_content.isSome(): + let content = opt_content.get() + let convo = client.getConversation(content.conversationId) + + let msg = ReceivedMessage(timestamp:getCurrentTimestamp(),content: content.data ) + client.notifyNewMessage(convo, msg) + else: + debug "Parsed message generated no content", client=client.getId() + except Exception as e: error "HandleFrame Failed", error = e.msg +proc sendMessage*(convo: Conversation, content: Content) : Future[MessageId] {.async, gcsafe.} = + let payloads = convo.ctx.sendContent(convo.convoId, content).valueOr: + error "SendMessage", e=error + return "error" + + convo.ds.sendPayloads(payloads); + + ################################################# # Async Tasks ################################################# @@ -235,20 +224,9 @@ proc messageQueueConsumer(client: ChatClient) {.async.} = while client.isRunning: let message = await client.inboundQueue.queue.get() + debug "Got WakuMessage", client = client.getId() , topic= message.content_topic, len=message.bytes.len() - let topicRes = inbox.parseTopic(message.contentTopic).or(private_v1.parseTopic(message.contentTopic)) - if topicRes.isErr: - debug "Invalid content topic", client = client.getId(), err = topicRes.error, contentTopic = message.contentTopic - continue - - notice "Inbound Message Received", client = client.getId(), - contentTopic = message.contentTopic, len = message.bytes.len() - try: - client.parseMessage(message) - - except CatchableError as e: - error "Error in message listener", err = e.msg, - pubsub = message.pubsubTopic, contentTopic = message.contentTopic + client.parseMessage(message) ################################################# @@ -269,5 +247,6 @@ proc start*(client: ChatClient) {.async.} = proc stop*(client: ChatClient) {.async.} = ## Stop the client. await client.ds.stop() + client.libchatCtx.destroy() client.isRunning = false notice "Client stopped", client = client.getId()