From eacb98b50c78e2038f873124f17cd2a3d0ae7b4b Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Fri, 22 Aug 2025 18:45:55 -0700 Subject: [PATCH] Add newMessage Callback --- protos/private_v1.proto | 3 +- src/chat_sdk/client.nim | 55 +++++++++++------------ src/chat_sdk/conversation_store.nim | 4 +- src/chat_sdk/conversations/convo_type.nim | 17 ++++++- src/chat_sdk/conversations/private_v1.nim | 26 ++++++++--- src/chat_sdk/inbox.nim | 8 +++- src/chat_sdk/utils.nim | 10 ++++- src/nim_chat_poc.nim | 25 ++++++++++- 8 files changed, 105 insertions(+), 43 deletions(-) diff --git a/protos/private_v1.proto b/protos/private_v1.proto index d2022a2..f5eb691 100644 --- a/protos/private_v1.proto +++ b/protos/private_v1.proto @@ -12,7 +12,8 @@ message Placeholder { } message PrivateV1Frame { - string conversation_id = 1; + string conversation_id = 1; + bytes sender = 2; oneof frame_type { common_frames.ContentFrame content = 10; diff --git a/src/chat_sdk/client.nim b/src/chat_sdk/client.nim index 942d193..ee591c0 100644 --- a/src/chat_sdk/client.nim +++ b/src/chat_sdk/client.nim @@ -6,6 +6,8 @@ import # Foreign chronicles, chronos, + sequtils, + std/tables, std/sequtils, strformat, strutils, @@ -31,6 +33,10 @@ logScope: # Definitions ################################################# +type + MessageCallback[T] = proc(conversation: Conversation, msg: T): Future[void] {.async.} + + type KeyEntry* = object keyType: string privateKey: PrivateKey @@ -44,6 +50,8 @@ type Client* = ref object inboundQueue: QueueRef isRunning: bool + newMessageCallbacks: seq[MessageCallback[string]] + ################################################# # Constructors ################################################# @@ -61,7 +69,8 @@ proc newClient*(name: string, cfg: WakuConfig): Client {.raises: [IOError, keyStore: initTable[string, KeyEntry](), conversations: initTable[string, Conversation](), inboundQueue: q, - isRunning: false) + isRunning: false, + newMessageCallbacks: @[]) let defaultInbox = initInbox(c.ident.getPubkey()) c.conversations[defaultInbox.id()] = defaultInbox @@ -71,6 +80,7 @@ proc newClient*(name: string, cfg: WakuConfig): Client {.raises: [IOError, result = c except Exception as e: error "newCLient", err = e.msg + ################################################# # Parameter Access ################################################# @@ -95,6 +105,21 @@ proc getConversationFromHint(self: Client, ok(some(self.conversations[conversationHint])) +proc listConversations*(client: Client): seq[Conversation] = + result = toSeq(client.conversations.values()) + +################################################# +# Callback Handling +################################################# + +proc onNewMessage*(client: Client, callback: MessageCallback[string]) = + client.newMessageCallbacks.add(callback) + +proc notifyNewMessage(client: Client, convo: Conversation, msg: string) = + for cb in client.newMessageCallbacks: + discard cb(convo, msg) + + ################################################# # Functional ################################################# @@ -196,16 +221,6 @@ proc parseMessage(client: Client, msg: ChatPayload) {.raises: [ValueError, except Exception as e: error "HandleFrame Failed", error = e.msg - -proc addMessage*(client: Client, convo: PrivateV1, - text: string = "") {.async.} = - ## Test Function to send automatic messages. to be removed. - let message = PrivateV1Frame(content: ContentFrame(domain: 0, tag: 1, - bytes: text.toBytes())) - - await convo.sendMessage(client.ds, message) - - ################################################# # Async Tasks ################################################# @@ -227,23 +242,6 @@ proc messageQueueConsumer(client: Client) {.async.} = pubsub = message.pubsubTopic, contentTopic = message.contentTopic -proc simulateMessages(client: Client){.async.} = - ## Test Task to generate messages after initialization. To be removed. - - # TODO: FutureBug - This should wait for a privateV1 conversation. - while client.conversations.len() <= 1: - await sleepAsync(4.seconds) - - notice "Starting Message Simulation", client = client.getId() - for a in 1..5: - await sleepAsync(4.seconds) - - - for conversation in client.conversations.values(): - if conversation of PrivateV1: - await client.addMessage(PrivateV1(conversation), - fmt"message: {a} from:{client.getId()}") - ################################################# # Control Functions ################################################# @@ -256,7 +254,6 @@ proc start*(client: Client) {.async.} = client.isRunning = true asyncSpawn client.messageQueueConsumer() - asyncSpawn client.simulateMessages() notice "Client start complete", client = client.getId() diff --git a/src/chat_sdk/conversation_store.nim b/src/chat_sdk/conversation_store.nim index 360d49c..b887f6a 100644 --- a/src/chat_sdk/conversation_store.nim +++ b/src/chat_sdk/conversation_store.nim @@ -6,11 +6,11 @@ import identity type ConvoId = string - - type ConversationStore* = concept proc addConversation(self: Self, convo: Conversation) proc getConversation(self: Self, convoId: string): Conversation proc identity(self: Self): Identity proc getId(self: Self): string + + proc notifyNewMessage(self: Self, convo: Conversation, msg: string) diff --git a/src/chat_sdk/conversations/convo_type.nim b/src/chat_sdk/conversations/convo_type.nim index 3490050..869007c 100644 --- a/src/chat_sdk/conversations/convo_type.nim +++ b/src/chat_sdk/conversations/convo_type.nim @@ -1,4 +1,9 @@ +import chronos import strformat +import strutils + +import ../delivery/waku_client +import ../utils type ConvoTypes* = enum @@ -11,6 +16,14 @@ type proc `$`(conv: Conversation): string = fmt"Convo: {conv.name}" -method id*(self: Conversation): string {.raises: [Defect].} = - raise newException(Defect, "Abstract function") +# TODO: Removing the raises clause and the exception raise causes this +# error --> ...src/chat_sdk/client.nim(166, 9) Error: addConversation(client, convo) can raise an unlisted exception: Exception +# Need better understanding of NIMs Exception model +method id*(self: Conversation): string {.raises: [Defect, ValueError].} = + # TODO: make this a compile time check + panic("ProgramError: Missing concrete implementation") +method sendMessage*(convo: Conversation, ds: WakuClient, + text: string) {.async, base, gcsafe.} = + # TODO: make this a compile time check + panic("ProgramError: Missing concrete implementation") diff --git a/src/chat_sdk/conversations/private_v1.nim b/src/chat_sdk/conversations/private_v1.nim index bff2033..1c5e663 100644 --- a/src/chat_sdk/conversations/private_v1.nim +++ b/src/chat_sdk/conversations/private_v1.nim @@ -2,7 +2,7 @@ import chronicles import chronos -import std/[sequtils, strutils] +import std/[sequtils, strutils, strformat] import std/algorithm import sugar @@ -66,11 +66,8 @@ proc initPrivateV1*(owner: Identity, participant: PublicKey, discriminator: discriminator ) -proc sendMessage*(self: PrivateV1, ds: WakuClient, +proc sendFrame(self: PrivateV1, ds: WakuClient, msg: PrivateV1Frame): Future[void]{.async.} = - notice "SENDING MSG", fromm = self.owner.getId(), - participants = self.participants, msg = msg - let encryptedBytes = EncryptedPayload(plaintext: Plaintext(payload: encode(msg))) discard ds.sendPayload(self.getTopic(), encryptedBytes.toEnvelope( @@ -87,9 +84,26 @@ proc handleFrame*[T: ConversationStore](convo: PrivateV1, client: T, let enc = decode(bytes, EncryptedPayload).get() # TODO: handle result let frame = convo.decrypt(enc) # TODO: handle result + if frame.sender == @(convo.owner.getPubkey().bytes()): + notice "Self Message", convo = convo.id() + return + case frame.getKind(): of typeContentFrame: # TODO: Using client.getId() results in an error in this context - notice "Got Mail", text = frame.content.bytes.toUtfString() + client.notifyNewMessage(convo, toUtfString(frame.content.bytes)) + of typePlaceholder: notice "Got Placeholder", text = frame.placeholder.counter + + +method sendMessage*(convo: PrivateV1, ds: WakuClient, text: string) {.async.} = + + try: + let frame = PrivateV1Frame(sender: @(convo.owner.getPubkey().bytes()), + content: ContentFrame(domain: 0, tag: 1, bytes: text.toBytes())) + + await convo.sendFrame(ds, frame) + except Exception as e: + error "Unknown error in PrivateV1:SendMessage" + diff --git a/src/chat_sdk/inbox.nim b/src/chat_sdk/inbox.nim index fa752ff..b809368 100644 --- a/src/chat_sdk/inbox.nim +++ b/src/chat_sdk/inbox.nim @@ -9,6 +9,7 @@ import conversations, conversation_store, crypto, + delivery/waku_client, proto_types, utils @@ -77,7 +78,8 @@ proc createPrivateV1FromInvite*[T: ConversationStore](client: T, topic = convo.getConvoId() client.addConversation(convo) -proc handleFrame*[T: ConversationStore](convo: Inbox, client: T, bytes: seq[byte]) = +proc handleFrame*[T: ConversationStore](convo: Inbox, client: T, bytes: seq[ + byte]) = ## Dispatcher for Incoming `InboxV1Frames`. ## Calls further processing depending on the kind of frame. @@ -95,3 +97,7 @@ proc handleFrame*[T: ConversationStore](convo: Inbox, client: T, bytes: seq[byte of typeNote: notice "Receive Note", client = client.getId(), text = frame.note.text + + +method sendMessage*(convo: Inbox, ds: WakuClient, text: string) {.async.} = + warn "Cannot send message to Inbox" diff --git a/src/chat_sdk/utils.nim b/src/chat_sdk/utils.nim index b212989..043075a 100644 --- a/src/chat_sdk/utils.nim +++ b/src/chat_sdk/utils.nim @@ -1,5 +1,5 @@ import waku/waku_core -import std/[random, times] +import std/[macros, random, times] import crypto import blake2 import strutils @@ -28,3 +28,11 @@ proc toBytes*(s: string): seq[byte] = proc toUtfString*(b: seq[byte]): string = result = cast[string](b) + +macro panic*(reason: string): untyped = + result = quote do: + let pos = instantiationInfo() + echo `reason` & " ($1:$2)" % [ + pos.filename, $pos.line] + echo "traceback:\n", getStackTrace() + quit(1) diff --git a/src/nim_chat_poc.nim b/src/nim_chat_poc.nim index 03413aa..27745e6 100644 --- a/src/nim_chat_poc.nim +++ b/src/nim_chat_poc.nim @@ -1,7 +1,11 @@ import chronos import chronicles +import strformat + import chat_sdk/client +import chat_sdk/conversations import chat_sdk/delivery/waku_client +import chat_sdk/utils proc initLogging() = when defined(chronicles_runtime_filtering): @@ -25,9 +29,17 @@ proc main() {.async.} = # Start Clients var saro = newClient("Saro", cfg_saro) + saro.onNewMessage(proc(convo: Conversation, msg: string) {.async.} = + echo " Saro <------ :: " & msg + await sleepAsync(10000) + await convo.sendMessage(saro.ds, "Ping")) await saro.start() var raya = newClient("Raya", cfg_raya) + raya.onNewMessage(proc(convo: Conversation, msg: string) {.async.} = + echo " ------> Raya :: " & msg + await sleepAsync(10000) + await convo.sendMessage(raya.ds, "Pong")) await raya.start() await sleepAsync(5000) @@ -36,7 +48,18 @@ proc main() {.async.} = let raya_bundle = raya.createIntroBundle() discard await saro.newPrivateConversation(raya_bundle) - # Let messages process + await sleepAsync(5000) + + + try: + for convo in raya.listConversations(): + notice " Convo", convo = convo.id() + await convo.sendMessage(raya.ds, "Hello") + # Let messages process + + except Exception as e: + panic("UnCaught Exception"&e.msg) + await sleepAsync(400000) saro.stop()