Add newMessage Callback

This commit is contained in:
Jazz Turner-Baggs 2025-08-22 18:45:55 -07:00
parent 1d0e2e83ca
commit eacb98b50c
8 changed files with 105 additions and 43 deletions

View File

@ -12,7 +12,8 @@ message Placeholder {
} }
message PrivateV1Frame { message PrivateV1Frame {
string conversation_id = 1; string conversation_id = 1;
bytes sender = 2;
oneof frame_type { oneof frame_type {
common_frames.ContentFrame content = 10; common_frames.ContentFrame content = 10;

View File

@ -6,6 +6,8 @@
import # Foreign import # Foreign
chronicles, chronicles,
chronos, chronos,
sequtils,
std/tables,
std/sequtils, std/sequtils,
strformat, strformat,
strutils, strutils,
@ -31,6 +33,10 @@ logScope:
# Definitions # Definitions
################################################# #################################################
type
MessageCallback[T] = proc(conversation: Conversation, msg: T): Future[void] {.async.}
type KeyEntry* = object type KeyEntry* = object
keyType: string keyType: string
privateKey: PrivateKey privateKey: PrivateKey
@ -44,6 +50,8 @@ type Client* = ref object
inboundQueue: QueueRef inboundQueue: QueueRef
isRunning: bool isRunning: bool
newMessageCallbacks: seq[MessageCallback[string]]
################################################# #################################################
# Constructors # Constructors
################################################# #################################################
@ -61,7 +69,8 @@ proc newClient*(name: string, cfg: WakuConfig): Client {.raises: [IOError,
keyStore: initTable[string, KeyEntry](), keyStore: initTable[string, KeyEntry](),
conversations: initTable[string, Conversation](), conversations: initTable[string, Conversation](),
inboundQueue: q, inboundQueue: q,
isRunning: false) isRunning: false,
newMessageCallbacks: @[])
let defaultInbox = initInbox(c.ident.getPubkey()) let defaultInbox = initInbox(c.ident.getPubkey())
c.conversations[defaultInbox.id()] = defaultInbox c.conversations[defaultInbox.id()] = defaultInbox
@ -71,6 +80,7 @@ proc newClient*(name: string, cfg: WakuConfig): Client {.raises: [IOError,
result = c result = c
except Exception as e: except Exception as e:
error "newCLient", err = e.msg error "newCLient", err = e.msg
################################################# #################################################
# Parameter Access # Parameter Access
################################################# #################################################
@ -95,6 +105,21 @@ proc getConversationFromHint(self: Client,
ok(some(self.conversations[conversationHint])) ok(some(self.conversations[conversationHint]))
proc listConversations*(client: Client): seq[Conversation] =
result = toSeq(client.conversations.values())
#################################################
# Callback Handling
#################################################
proc onNewMessage*(client: Client, callback: MessageCallback[string]) =
client.newMessageCallbacks.add(callback)
proc notifyNewMessage(client: Client, convo: Conversation, msg: string) =
for cb in client.newMessageCallbacks:
discard cb(convo, msg)
################################################# #################################################
# Functional # Functional
################################################# #################################################
@ -196,16 +221,6 @@ proc parseMessage(client: Client, msg: ChatPayload) {.raises: [ValueError,
except Exception as e: except Exception as e:
error "HandleFrame Failed", error = e.msg error "HandleFrame Failed", error = e.msg
proc addMessage*(client: Client, convo: PrivateV1,
text: string = "") {.async.} =
## Test Function to send automatic messages. to be removed.
let message = PrivateV1Frame(content: ContentFrame(domain: 0, tag: 1,
bytes: text.toBytes()))
await convo.sendMessage(client.ds, message)
################################################# #################################################
# Async Tasks # Async Tasks
################################################# #################################################
@ -227,23 +242,6 @@ proc messageQueueConsumer(client: Client) {.async.} =
pubsub = message.pubsubTopic, contentTopic = message.contentTopic pubsub = message.pubsubTopic, contentTopic = message.contentTopic
proc simulateMessages(client: Client){.async.} =
## Test Task to generate messages after initialization. To be removed.
# TODO: FutureBug - This should wait for a privateV1 conversation.
while client.conversations.len() <= 1:
await sleepAsync(4.seconds)
notice "Starting Message Simulation", client = client.getId()
for a in 1..5:
await sleepAsync(4.seconds)
for conversation in client.conversations.values():
if conversation of PrivateV1:
await client.addMessage(PrivateV1(conversation),
fmt"message: {a} from:{client.getId()}")
################################################# #################################################
# Control Functions # Control Functions
################################################# #################################################
@ -256,7 +254,6 @@ proc start*(client: Client) {.async.} =
client.isRunning = true client.isRunning = true
asyncSpawn client.messageQueueConsumer() asyncSpawn client.messageQueueConsumer()
asyncSpawn client.simulateMessages()
notice "Client start complete", client = client.getId() notice "Client start complete", client = client.getId()

View File

@ -6,11 +6,11 @@ import identity
type ConvoId = string type ConvoId = string
type type
ConversationStore* = concept ConversationStore* = concept
proc addConversation(self: Self, convo: Conversation) proc addConversation(self: Self, convo: Conversation)
proc getConversation(self: Self, convoId: string): Conversation proc getConversation(self: Self, convoId: string): Conversation
proc identity(self: Self): Identity proc identity(self: Self): Identity
proc getId(self: Self): string proc getId(self: Self): string
proc notifyNewMessage(self: Self, convo: Conversation, msg: string)

View File

@ -1,4 +1,9 @@
import chronos
import strformat import strformat
import strutils
import ../delivery/waku_client
import ../utils
type type
ConvoTypes* = enum ConvoTypes* = enum
@ -11,6 +16,14 @@ type
proc `$`(conv: Conversation): string = proc `$`(conv: Conversation): string =
fmt"Convo: {conv.name}" fmt"Convo: {conv.name}"
method id*(self: Conversation): string {.raises: [Defect].} = # TODO: Removing the raises clause and the exception raise causes this
raise newException(Defect, "Abstract function") # error --> ...src/chat_sdk/client.nim(166, 9) Error: addConversation(client, convo) can raise an unlisted exception: Exception
# Need better understanding of NIMs Exception model
method id*(self: Conversation): string {.raises: [Defect, ValueError].} =
# TODO: make this a compile time check
panic("ProgramError: Missing concrete implementation")
method sendMessage*(convo: Conversation, ds: WakuClient,
text: string) {.async, base, gcsafe.} =
# TODO: make this a compile time check
panic("ProgramError: Missing concrete implementation")

View File

@ -2,7 +2,7 @@
import chronicles import chronicles
import chronos import chronos
import std/[sequtils, strutils] import std/[sequtils, strutils, strformat]
import std/algorithm import std/algorithm
import sugar import sugar
@ -66,11 +66,8 @@ proc initPrivateV1*(owner: Identity, participant: PublicKey,
discriminator: discriminator discriminator: discriminator
) )
proc sendMessage*(self: PrivateV1, ds: WakuClient, proc sendFrame(self: PrivateV1, ds: WakuClient,
msg: PrivateV1Frame): Future[void]{.async.} = msg: PrivateV1Frame): Future[void]{.async.} =
notice "SENDING MSG", fromm = self.owner.getId(),
participants = self.participants, msg = msg
let encryptedBytes = EncryptedPayload(plaintext: Plaintext(payload: encode(msg))) let encryptedBytes = EncryptedPayload(plaintext: Plaintext(payload: encode(msg)))
discard ds.sendPayload(self.getTopic(), encryptedBytes.toEnvelope( discard ds.sendPayload(self.getTopic(), encryptedBytes.toEnvelope(
@ -87,9 +84,26 @@ proc handleFrame*[T: ConversationStore](convo: PrivateV1, client: T,
let enc = decode(bytes, EncryptedPayload).get() # TODO: handle result let enc = decode(bytes, EncryptedPayload).get() # TODO: handle result
let frame = convo.decrypt(enc) # TODO: handle result let frame = convo.decrypt(enc) # TODO: handle result
if frame.sender == @(convo.owner.getPubkey().bytes()):
notice "Self Message", convo = convo.id()
return
case frame.getKind(): case frame.getKind():
of typeContentFrame: of typeContentFrame:
# TODO: Using client.getId() results in an error in this context # TODO: Using client.getId() results in an error in this context
notice "Got Mail", text = frame.content.bytes.toUtfString() client.notifyNewMessage(convo, toUtfString(frame.content.bytes))
of typePlaceholder: of typePlaceholder:
notice "Got Placeholder", text = frame.placeholder.counter notice "Got Placeholder", text = frame.placeholder.counter
method sendMessage*(convo: PrivateV1, ds: WakuClient, text: string) {.async.} =
try:
let frame = PrivateV1Frame(sender: @(convo.owner.getPubkey().bytes()),
content: ContentFrame(domain: 0, tag: 1, bytes: text.toBytes()))
await convo.sendFrame(ds, frame)
except Exception as e:
error "Unknown error in PrivateV1:SendMessage"

View File

@ -9,6 +9,7 @@ import
conversations, conversations,
conversation_store, conversation_store,
crypto, crypto,
delivery/waku_client,
proto_types, proto_types,
utils utils
@ -77,7 +78,8 @@ proc createPrivateV1FromInvite*[T: ConversationStore](client: T,
topic = convo.getConvoId() topic = convo.getConvoId()
client.addConversation(convo) client.addConversation(convo)
proc handleFrame*[T: ConversationStore](convo: Inbox, client: T, bytes: seq[byte]) = proc handleFrame*[T: ConversationStore](convo: Inbox, client: T, bytes: seq[
byte]) =
## Dispatcher for Incoming `InboxV1Frames`. ## Dispatcher for Incoming `InboxV1Frames`.
## Calls further processing depending on the kind of frame. ## Calls further processing depending on the kind of frame.
@ -95,3 +97,7 @@ proc handleFrame*[T: ConversationStore](convo: Inbox, client: T, bytes: seq[byte
of typeNote: of typeNote:
notice "Receive Note", client = client.getId(), text = frame.note.text notice "Receive Note", client = client.getId(), text = frame.note.text
method sendMessage*(convo: Inbox, ds: WakuClient, text: string) {.async.} =
warn "Cannot send message to Inbox"

View File

@ -1,5 +1,5 @@
import waku/waku_core import waku/waku_core
import std/[random, times] import std/[macros, random, times]
import crypto import crypto
import blake2 import blake2
import strutils import strutils
@ -28,3 +28,11 @@ proc toBytes*(s: string): seq[byte] =
proc toUtfString*(b: seq[byte]): string = proc toUtfString*(b: seq[byte]): string =
result = cast[string](b) result = cast[string](b)
macro panic*(reason: string): untyped =
result = quote do:
let pos = instantiationInfo()
echo `reason` & " ($1:$2)" % [
pos.filename, $pos.line]
echo "traceback:\n", getStackTrace()
quit(1)

View File

@ -1,7 +1,11 @@
import chronos import chronos
import chronicles import chronicles
import strformat
import chat_sdk/client import chat_sdk/client
import chat_sdk/conversations
import chat_sdk/delivery/waku_client import chat_sdk/delivery/waku_client
import chat_sdk/utils
proc initLogging() = proc initLogging() =
when defined(chronicles_runtime_filtering): when defined(chronicles_runtime_filtering):
@ -25,9 +29,17 @@ proc main() {.async.} =
# Start Clients # Start Clients
var saro = newClient("Saro", cfg_saro) var saro = newClient("Saro", cfg_saro)
saro.onNewMessage(proc(convo: Conversation, msg: string) {.async.} =
echo " Saro <------ :: " & msg
await sleepAsync(10000)
await convo.sendMessage(saro.ds, "Ping"))
await saro.start() await saro.start()
var raya = newClient("Raya", cfg_raya) var raya = newClient("Raya", cfg_raya)
raya.onNewMessage(proc(convo: Conversation, msg: string) {.async.} =
echo " ------> Raya :: " & msg
await sleepAsync(10000)
await convo.sendMessage(raya.ds, "Pong"))
await raya.start() await raya.start()
await sleepAsync(5000) await sleepAsync(5000)
@ -36,7 +48,18 @@ proc main() {.async.} =
let raya_bundle = raya.createIntroBundle() let raya_bundle = raya.createIntroBundle()
discard await saro.newPrivateConversation(raya_bundle) discard await saro.newPrivateConversation(raya_bundle)
# Let messages process await sleepAsync(5000)
try:
for convo in raya.listConversations():
notice " Convo", convo = convo.id()
await convo.sendMessage(raya.ds, "Hello")
# Let messages process
except Exception as e:
panic("UnCaught Exception"&e.msg)
await sleepAsync(400000) await sleepAsync(400000)
saro.stop() saro.stop()