logos-chat/src/chat/client.nim

254 lines
7.5 KiB
Nim
Raw Normal View History

2025-08-15 08:37:53 -07:00
## Main Entry point to the ChatSDK.
## Clients are the primary manager of sending and receiving
## messages, and managing conversations.
2025-07-05 14:54:19 -07:00
2025-08-15 07:31:19 -07:00
import # Foreign
chronicles,
chronos,
libchat,
std/options,
2025-08-15 07:31:19 -07:00
strformat,
2025-09-05 16:19:26 -07:00
types
2025-08-15 07:31:19 -07:00
import #local
2025-08-21 12:35:55 -07:00
delivery/waku_client,
errors,
2025-08-15 07:31:19 -07:00
types,
2025-08-21 12:35:55 -07:00
utils
2025-08-15 07:31:19 -07:00
# Every log statement emitted from this module is tagged with this topic.
logScope:
  topics = "chat client"
2025-09-15 00:42:07 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Definitions
#################################################
# Type used to return message data via callback
type
  ReceivedMessage* = ref object of RootObj
    ## Message data handed to `MessageCallback` subscribers.
    sender*: PublicKey
      ## Key of the message author.
      ## NOTE(review): `parseMessage` in this file does not populate this
      ## field yet (see its TODO), so it holds the default value there.
    timestamp*: int64
      ## Set from `getCurrentTimestamp()` when the payload is parsed.
      ## NOTE(review): unit is not visible from this file - confirm.
    content*: seq[byte]
      ## Raw content bytes of the message.
# Kinds of conversation this client can represent. Only one kind exists so far.
type
  ConvoType = enum
    PrivateV1
type
  Conversation* = object
    ## Handle to a single conversation. It bundles the conversation id with
    ## the LibChat context and delivery client that `sendMessage` operates on.
    ctx: LibChat            ## LibChat context used to encode outgoing content
    convoId: string         ## Conversation identifier (exposed through `id`)
    ds: WakuClient          ## Delivery client used to push encoded payloads
    convo_type: ConvoType   ## Kind of conversation
proc id*(self: Conversation): string =
  ## Read-only accessor for the conversation identifier.
  self.convoId
2025-08-22 18:45:55 -07:00
type
  # Invoked by `notifyNewMessage` for every message delivered to subscribers.
  MessageCallback* = proc(conversation: Conversation,
      msg: ReceivedMessage): Future[void] {.async.}

  # Invoked by `notifyNewConversation` when a conversation is announced.
  NewConvoCallback* = proc(conversation: Conversation): Future[void] {.async.}

  # Invoked by `notifyDeliveryAck` with the id of the acknowledged message.
  DeliveryAckCallback* = proc(conversation: Conversation,
      msgId: MessageId): Future[void] {.async.}
2025-08-22 18:45:55 -07:00
2026-01-12 18:16:01 +02:00
type
  ChatClient* = ref object
    ## Top-level client state: the LibChat context, the delivery client,
    ## the inbound payload queue and the registered callbacks.
    libchatCtx: LibChat       ## LibChat context; destroyed by `stop`
    ds*: WakuClient           ## Delivery client used to send and receive payloads
    inboundQueue: QueueRef    ## Queue drained by `messageQueueConsumer`
    isRunning: bool           ## Set by `start`, cleared by `stop`

    newMessageCallbacks: seq[MessageCallback]       ## Added via `onNewMessage`
    newConvoCallbacks: seq[NewConvoCallback]        ## Added via `onNewConversation`
    deliveryAckCallbacks: seq[DeliveryAckCallback]  ## Added via `onDeliveryAck`
2025-08-22 18:45:55 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Constructors
#################################################
2025-07-05 14:54:19 -07:00
2026-02-17 16:19:53 -08:00
proc newClient*(ds: WakuClient, ephemeral: bool = true,
    installation_name: string = "default"): Result[ChatClient, ErrorType] =
  ## Creates a new `ChatClient` that uses the given `WakuClient` (`ds`)
  ## for delivery.
  ##
  ## Only ephemeral clients are supported: passing `ephemeral = false`
  ## yields an error result. `installation_name` is accepted but not used
  ## by this body.
  ##
  ## Any exception raised while building the client is logged and returned
  ## as an error result carrying the exception message.
  if not ephemeral:
    return err("persistence is not currently supported")

  try:
    # Inbound payload queue, created with a capacity argument of 10.
    let inbound = QueueRef(queue: newAsyncQueue[ChatPayload](10))
    let client = ChatClient(
      libchatCtx: newConversationsContext(),
      ds: ds,
      inboundQueue: inbound,
      isRunning: false,
      newMessageCallbacks: @[],
      newConvoCallbacks: @[])

    notice "Client started"
    return ok(client)
  except Exception as e:
    error "newCLient", err = e.msg
    return err(e.msg)
2025-08-22 18:45:55 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Parameter Access
#################################################
2026-01-12 18:16:01 +02:00
proc getId*(client: ChatClient): string =
  ## Identifier of this client, used as the `client` field in log lines.
  ## Currently a fixed placeholder; `client` itself is not consulted.
  # TODO: (!) get from Libchat.
  return "PLACEHOLDER_CLIENT_ID"
2025-08-15 08:05:59 -07:00
2026-01-12 18:16:01 +02:00
proc listConversations*(client: ChatClient): seq[Conversation] =
  ## Lists the client's conversations. Not implemented yet: always returns
  ## an empty sequence.
  # TODO: (P1) Implement list conversations
  return @[]
2025-08-22 18:45:55 -07:00
#################################################
# Callback Handling
#################################################
2026-01-12 18:16:01 +02:00
proc onNewMessage*(client: ChatClient, callback: MessageCallback) =
  ## Registers `callback` to be invoked by `notifyNewMessage`.
  ## Callbacks are appended, so earlier registrations are kept.
  add(client.newMessageCallbacks, callback)
2026-01-12 18:16:01 +02:00
proc notifyNewMessage*(client: ChatClient, convo: Conversation,
    msg: ReceivedMessage) =
  ## Invokes every registered message callback with `convo` and `msg`,
  ## in registration order.
  for handler in client.newMessageCallbacks:
    # The returned future is dropped: the callback is not awaited here.
    discard handler(convo, msg)
2025-08-22 18:45:55 -07:00
2026-01-12 18:16:01 +02:00
proc onNewConversation*(client: ChatClient, callback: NewConvoCallback) =
  ## Registers `callback` to be invoked by `notifyNewConversation`.
  add(client.newConvoCallbacks, callback)
2026-01-12 18:16:01 +02:00
proc notifyNewConversation(client: ChatClient, convo: Conversation) =
  ## Invokes every registered new-conversation callback with `convo`,
  ## emitting one debug line per callback invocation.
  for handler in client.newConvoCallbacks:
    debug "calling OnConvo CB", client = client.getId(),
      len = client.newConvoCallbacks.len()
    # The returned future is dropped: the callback is not awaited here.
    discard handler(convo)
2025-08-22 18:45:55 -07:00
2026-01-12 18:16:01 +02:00
proc onDeliveryAck*(client: ChatClient, callback: DeliveryAckCallback) =
  ## Registers `callback` to be invoked by `notifyDeliveryAck`.
  add(client.deliveryAckCallbacks, callback)
2025-09-05 15:35:47 -07:00
2026-01-12 18:16:01 +02:00
proc notifyDeliveryAck(client: ChatClient, convo: Conversation,
    messageId: MessageId) =
  ## Invokes every registered delivery-ack callback with `convo` and
  ## `messageId`, in registration order.
  for handler in client.deliveryAckCallbacks:
    # The returned future is dropped: the callback is not awaited here.
    discard handler(convo, messageId)
2025-08-15 08:05:59 -07:00
#################################################
# Functional
#################################################
proc createIntroBundle*(self: ChatClient): seq[byte] =
  ## Generates an IntroBundle for the client, which includes
  ## the required information to send a message.
  ##
  ## If LibChat fails to produce a bundle, the failure is logged and an
  ## empty sequence is returned.
  let bundle = self.libchatCtx.createIntroductionBundle().valueOr:
    error "could not create bundle", error = error, client = self.getId()
    return

  notice "IntroBundleCreated", client = self.getId(), bundle = bundle
  return bundle
proc sendPayloads(ds: WakuClient, payloads: seq[PayloadResult]) =
  ## Hands each payload's `data` to `ds.sendBytes`, addressed to the
  ## payload's `address`. The result of each send is discarded.
  for item in payloads:
    # TODO: (P2) surface errors
    discard ds.sendBytes(item.address, item.data)
2025-08-15 07:31:19 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Conversation Initiation
#################################################
2025-08-15 08:37:53 -07:00
2026-01-12 18:16:01 +02:00
proc getConversation*(client: ChatClient, convoId: string): Conversation =
  ## Builds a `Conversation` handle for `convoId`, bound to this client's
  ## LibChat context and delivery client. The handle is always typed
  ## `PrivateV1`; no lookup or validation of `convoId` is performed here.
  Conversation(
    ctx: client.libchatCtx,
    convoId: convoId,
    ds: client.ds,
    convo_type: PrivateV1)
2025-07-07 14:20:46 -07:00
2026-01-12 18:16:01 +02:00
proc newPrivateConversation*(client: ChatClient,
    introBundle: seq[byte], content: Content): Future[Option[ChatError]] {.async.} =
  ## Starts a private conversation with the party described by
  ## `introBundle`, with `content` as the initial content.
  ##
  ## On success the payloads produced by LibChat are handed to the delivery
  ## client, new-conversation callbacks are notified, and `none(ChatError)`
  ## is returned. If LibChat rejects the request, the failure is logged and
  ## returned as `some(ChatError)` with code `errLibChat`.
  let (convoId, payloads) = client.libchatCtx.createNewPrivateConvo(
      introBundle, content).valueOr:
    # Name the operation that actually failed (this is not bundle creation).
    error "could not create private conversation", error = error,
      client = client.getId()
    return some(ChatError(code: errLibChat, context: fmt"got: {error}"))

  client.ds.sendPayloads(payloads)
  # Build the handle through the shared constructor so every Conversation
  # for this client is created the same way.
  client.notifyNewConversation(client.getConversation(convoId))

  notice "CREATED", client = client.getId(), convoId = convoId
  return none(ChatError)
2025-07-05 14:54:19 -07:00
2025-08-15 07:31:19 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Payload Handling
2025-12-04 11:51:34 +08:00
# Receives an incoming payload, decodes it, and processes it.
2025-08-15 08:05:59 -07:00
#################################################
2025-07-05 14:54:19 -07:00
proc parseMessage(client: ChatClient, msg: ChatPayload) {.raises: [ValueError].} =
  ## Passes the payload bytes to LibChat and dispatches the outcome:
  ## - a decode failure is logged and the payload dropped;
  ## - a payload that yields no content only produces a debug line;
  ## - otherwise new-conversation callbacks are notified when the content
  ##   is flagged `isNewConvo`, and then message callbacks are notified.
  ##
  ## Any exception raised along the way is logged and swallowed.
  try:
    let decoded = client.libchatCtx.handlePayload(msg.bytes).valueOr:
      error "handlePayload", error = error, client = client.getId()
      return

    if decoded.isNone():
      debug "Parsed message generated no content", client = client.getId()
      return

    let
      content = decoded.get()
      convo = client.getConversation(content.conversationId)

    if content.isNewConvo:
      client.notifyNewConversation(convo)

    # TODO: (P1) Add sender information from LibChat.
    let received = ReceivedMessage(
      timestamp: getCurrentTimestamp(),
      content: content.data)
    client.notifyNewMessage(convo, received)
  except Exception as e:
    error "HandleFrame Failed", error = e.msg
2025-08-15 07:31:19 -07:00
proc sendMessage*(convo: Conversation, content: Content): Future[MessageId] {.async, gcsafe.} =
  ## Encodes `content` for this conversation through LibChat and hands the
  ## resulting payloads to the delivery client.
  ##
  ## If encoding fails, the failure is logged and the literal `"error"` is
  ## returned as the message id.
  ##
  ## NOTE(review): nothing is assigned to the result on the success path,
  ## so callers receive the default `MessageId` value - confirm whether
  ## LibChat can supply a real id.
  let outbound = convo.ctx.sendContent(convo.convoId, content).valueOr:
    error "SendMessage", e = error
    return "error"

  convo.ds.sendPayloads(outbound)
2025-08-15 08:05:59 -07:00
#################################################
# Async Tasks
#################################################
2026-01-12 18:16:01 +02:00
proc messageQueueConsumer(client: ChatClient) {.async.} =
  ## Main message processing loop.
  ##
  ## Takes payloads off the client's inbound queue one at a time and passes
  ## them to `parseMessage` for as long as `isRunning` is set.
  info "Message listener started"
  while client.isRunning:
    let message = await client.inboundQueue.queue.get()

    # The client may have been stopped while this task was suspended in
    # `get()`. `stop` destroys the LibChat context before clearing
    # `isRunning`, so a payload dequeued afterwards must not be parsed.
    if not client.isRunning:
      break

    debug "Got WakuMessage", client = client.getId(),
      topic = message.content_topic, len = message.bytes.len()
    client.parseMessage(message)
2025-08-15 07:31:19 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Control Functions
#################################################
2025-07-05 14:54:19 -07:00
2026-01-12 18:16:01 +02:00
proc start*(client: ChatClient) {.async.} =
  ## Start `ChatClient` and listens for incoming messages.
  ##
  ## Registers the inbound queue with the delivery client, launches the
  ## delivery client and the queue consumer as background tasks, and marks
  ## the client as running. Neither background task is awaited here.
  client.ds.addDispatchQueue(client.inboundQueue)
  asyncSpawn(client.ds.start())

  # The flag must be set before the consumer is spawned: its loop
  # condition reads `isRunning`.
  client.isRunning = true
  asyncSpawn(client.messageQueueConsumer())

  notice "Client start complete", client = client.getId()
2026-01-12 18:16:01 +02:00
proc stop*(client: ChatClient) {.async.} =
  ## Stop the client.
  ##
  ## Waits for the delivery client to stop, then destroys the LibChat
  ## context and clears the running flag.
  ##
  ## NOTE(review): there is no guard against calling this more than once;
  ## confirm whether destroying the LibChat context twice is safe.
  await client.ds.stop()

  # No suspension point between these two statements.
  client.libchatCtx.destroy()
  client.isRunning = false

  notice "Client stopped", client = client.getId()