Update client

This commit is contained in:
Jazz Turner-Baggs 2026-02-11 09:56:57 -08:00
parent 6fc0dbb076
commit 3767942fc3
No known key found for this signature in database

View File

@ -26,6 +26,27 @@ logScope:
# Definitions
#################################################
# Type used to return message data via callback
type ReceivedMessage* = ref object of RootObj
sender*: PublicKey
timestamp*: int64
content*: seq[byte]
type ConvoType = enum
PrivateV1
type Conversation* = object
ctx: LibChat
convoId: string
ds: WakuClient
convo_type: ConvoType
proc id*(self: Conversation): string =
return "TODO"
type
MessageCallback* = proc(conversation: Conversation, msg: ReceivedMessage): Future[void] {.async.}
NewConvoCallback* = proc(conversation: Conversation): Future[void] {.async.}
@ -40,13 +61,10 @@ type KeyEntry* = object
type ChatClient* = ref object
libchatCtx: LibChat
ident: Identity
ds*: WakuClient
keyStore: Table[string, KeyEntry] # Keyed by HexEncoded Public Key
conversations: Table[string, Conversation] # Keyed by conversation ID
id: string
inboundQueue: QueueRef
isRunning: bool
inbox: Inbox
newMessageCallbacks: seq[MessageCallback]
newConvoCallbacks: seq[NewConvoCallback]
@ -56,36 +74,23 @@ type ChatClient* = ref object
# Constructors
#################################################
proc newClient*(ds: WakuClient, ident: Identity): ChatClient {.raises: [IOError,
ValueError, SerializationError].} =
proc newClient*(ds: WakuClient, ident: Identity): ChatClient {.raises: [IOError, ValueError].} =
## Creates new instance of a `ChatClient` with a given `WakuConfig`
try:
let rm = newReliabilityManager().valueOr:
raise newException(ValueError, fmt"SDS InitializationError")
let defaultInbox = initInbox(ident)
var q = QueueRef(queue: newAsyncQueue[ChatPayload](10))
var c = ChatClient(ident: ident,
var c = ChatClient(
libchatCtx: newConversationsContext(),
ident: ident,
ds: ds,
keyStore: initTable[string, KeyEntry](),
conversations: initTable[string, Conversation](),
id: ident.getName(),
inboundQueue: q,
isRunning: false,
inbox: defaultInbox,
newMessageCallbacks: @[],
newConvoCallbacks: @[])
c.conversations[defaultInbox.id()] = defaultInbox
notice "Client started", client = c.ident.getName(),
defaultInbox = defaultInbox, inTopic= topic_inbox(c.ident.get_addr())
notice "Client started", client = c.id
# Set LibChatBufferSize
c.libchatCtx.setBufferSize(256);
result = c
except Exception as e:
error "newCLient", err = e.msg
@ -95,27 +100,12 @@ proc newClient*(ds: WakuClient, ident: Identity): ChatClient {.raises: [IOError,
#################################################
proc getId*(client: ChatClient): string =
result = client.ident.getName()
proc identity*(client: ChatClient): Identity =
result = client.ident
proc defaultInboxConversationId*(self: ChatClient): string =
## Returns the default inbox address for the client.
result = conversationIdFor(self.ident.getPubkey())
proc getConversationFromHint(self: ChatClient,
conversationHint: string): Result[Option[Conversation], string] =
# TODO: Implementing Hinting
if not self.conversations.hasKey(conversationHint):
ok(none(Conversation))
else:
ok(some(self.conversations[conversationHint]))
result = client.id
proc listConversations*(client: ChatClient): seq[Conversation] =
result = toSeq(client.conversations.values())
# TODO: (P1) Implement list conversations
result = @[]
#################################################
# Callback Handling
@ -133,6 +123,7 @@ proc onNewConversation*(client: ChatClient, callback: NewConvoCallback) =
proc notifyNewConversation(client: ChatClient, convo: Conversation) =
for cb in client.newConvoCallbacks:
debug "calling OnConvo CB", client=client.getId(), len = client.newConvoCallbacks.len()
discard cb(convo)
proc onDeliveryAck*(client: ChatClient, callback: DeliveryAckCallback) =
@ -147,51 +138,47 @@ proc notifyDeliveryAck(client: ChatClient, convo: Conversation,
# Functional
#################################################
proc createIntroBundle*(self: var ChatClient): IntroBundle =
proc createIntroBundle*(self: var ChatClient): seq[byte] =
## Generates an IntroBundle for the client, which includes
## the required information to send a message.
# Create Ephemeral keypair, save it in the key store
let ephemeralKey = generateKey()
self.keyStore[ephemeralKey.getPublicKey().bytes().bytesToHex()] = KeyEntry(
keyType: "ephemeral",
privateKey: ephemeralKey,
timestamp: getCurrentTimestamp()
)
result = IntroBundle(
ident: @(self.ident.getPubkey().bytes()),
ephemeral: @(ephemeralKey.getPublicKey().bytes()),
)
result = self.libchatCtx.createIntroductionBundle().valueOr:
error "could not create bundle",error=error, client = self.getId()
return
notice "IntroBundleCreated", client = self.getId(),
pubBytes = result.ident
bundle = result
proc sendPayloads(ds: WakuClient, payloads: seq[PayloadResult]) =
for payload in payloads:
# TODO: (P2) surface errors
discard ds.sendBytes(payload.address, payload.data)
#################################################
# Conversation Initiation
#################################################
proc addConversation*(client: ChatClient, convo: Conversation) =
notice "Creating conversation", client = client.getId(), convoId = convo.id()
client.conversations[convo.id()] = convo
client.notifyNewConversation(convo)
proc getConversation*(client: ChatClient, convoId: string): Conversation =
notice "Get conversation", client = client.getId(), convoId = convoId
result = client.conversations[convoId]
result = Conversation(ctx:client.libchatCtx, convoId:convoId, ds: client.ds, convo_type: PrivateV1)
proc newPrivateConversation*(client: ChatClient,
introBundle: IntroBundle, content: Content): Future[Option[ChatError]] {.async.} =
## Creates a private conversation with the given `IntroBundle`.
## `IntroBundles` are provided out-of-band.
let remote_pubkey = loadPublicKeyFromBytes(introBundle.ident).get()
let remote_ephemeralkey = loadPublicKeyFromBytes(introBundle.ephemeral).get()
introBundle: seq[byte], content: Content): Future[Option[ChatError]] {.async.} =
let convo = await client.inbox.inviteToPrivateConversation(client.ds,remote_pubkey, remote_ephemeralkey, content )
client.addConversation(convo) # TODO: Fix re-entrantancy bug. Convo needs to be saved before payload is sent.
let res = client.libchatCtx.createNewPrivateConvo(introBundle, content)
let (convoId, payloads) = res.valueOr:
error "could not create bundle",error=error, client = client.getId()
return some(ChatError(code: errLibChat, context:fmt"got: {error}" ))
client.ds.sendPayloads(payloads);
client.notifyNewConversation(Conversation(ctx: client.libchatCtx,
convoId : convoId, ds: client.ds, convo_type: ConvoType.PrivateV1
))
notice "CREATED", client=client.getId(), convoId=convoId
return none(ChatError)
@ -200,31 +187,33 @@ proc newPrivateConversation*(client: ChatClient,
# Receives a incoming payload, decodes it, and processes it.
#################################################
proc parseMessage(client: ChatClient, msg: ChatPayload) {.raises: [ValueError,
SerializationError].} =
let envelopeRes = decode(msg.bytes, WapEnvelopeV1)
if envelopeRes.isErr:
debug "Failed to decode WapEnvelopeV1", client = client.getId(), err = envelopeRes.error
return
let envelope = envelopeRes.get()
let convo = block:
let opt = client.getConversationFromHint(envelope.conversationHint).valueOr:
raise newException(ValueError, "Failed to get conversation: " & error)
if opt.isSome():
opt.get()
else:
let k = toSeq(client.conversations.keys()).join(", ")
warn "No conversation found", client = client.getId(),
hint = envelope.conversationHint, knownIds = k
return
proc parseMessage(client: ChatClient, msg: ChatPayload) {.raises: [ValueError].} =
try:
convo.handleFrame(client, envelope.payload)
let opt_content = client.libchatCtx.handlePayload(msg.bytes).valueOr:
error "handlePayload" ,client=client.getId()
return
if opt_content.isSome():
let content = opt_content.get()
let convo = client.getConversation(content.conversationId)
let msg = ReceivedMessage(timestamp:getCurrentTimestamp(),content: content.data )
client.notifyNewMessage(convo, msg)
else:
debug "Parsed message generated no content", client=client.getId()
except Exception as e:
error "HandleFrame Failed", error = e.msg
proc sendMessage*(convo: Conversation, content: Content) : Future[MessageId] {.async, gcsafe.} =
let payloads = convo.ctx.sendContent(convo.convoId, content).valueOr:
error "SendMessage", e=error
return "error"
convo.ds.sendPayloads(payloads);
#################################################
# Async Tasks
#################################################
@ -235,20 +224,9 @@ proc messageQueueConsumer(client: ChatClient) {.async.} =
while client.isRunning:
let message = await client.inboundQueue.queue.get()
debug "Got WakuMessage", client = client.getId() , topic= message.content_topic, len=message.bytes.len()
let topicRes = inbox.parseTopic(message.contentTopic).or(private_v1.parseTopic(message.contentTopic))
if topicRes.isErr:
debug "Invalid content topic", client = client.getId(), err = topicRes.error, contentTopic = message.contentTopic
continue
notice "Inbound Message Received", client = client.getId(),
contentTopic = message.contentTopic, len = message.bytes.len()
try:
client.parseMessage(message)
except CatchableError as e:
error "Error in message listener", err = e.msg,
pubsub = message.pubsubTopic, contentTopic = message.contentTopic
client.parseMessage(message)
#################################################
@ -269,5 +247,6 @@ proc start*(client: ChatClient) {.async.} =
proc stop*(client: ChatClient) {.async.} =
## Stop the client.
await client.ds.stop()
client.libchatCtx.destroy()
client.isRunning = false
notice "Client stopped", client = client.getId()