nim-chat-poc/src/chat/client.nim

276 lines
8.6 KiB
Nim
Raw Normal View History

2025-08-15 08:37:53 -07:00
## Main entry point to the ChatSDK.
## A `Client` is the primary manager for sending and receiving
## messages, and for managing conversations.
2025-07-05 14:54:19 -07:00
2025-08-15 07:31:19 -07:00
import # Foreign
chronicles,
chronos,
2025-09-05 15:35:47 -07:00
sds,
2025-08-22 18:45:55 -07:00
sequtils,
std/tables,
2025-08-15 07:31:19 -07:00
std/sequtils,
strformat,
strutils,
2025-09-05 16:19:26 -07:00
tables,
types
2025-08-15 07:31:19 -07:00
import #local
2025-08-21 12:35:55 -07:00
conversations,
conversations/convo_impl,
2025-08-15 07:31:19 -07:00
crypto,
2025-08-21 12:35:55 -07:00
delivery/waku_client,
errors,
2025-08-15 07:31:19 -07:00
identity,
inbox,
proto_types,
types,
2025-08-21 12:35:55 -07:00
utils
2025-08-15 07:31:19 -07:00
# Attach a common chronicles topic to the log statements of this module.
logScope:
  topics = "chat client"
2025-09-15 00:42:07 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Definitions
#################################################
2025-08-22 18:45:55 -07:00
type
  # Handler registered through `onNewMessage`; it receives the conversation
  # and the message that was received on it.
  MessageCallback* = proc(conversation: Conversation,
      msg: ReceivedMessage): Future[void] {.async.}

  # Handler registered through `onNewConversation`; it receives the
  # conversation that was just added to the client.
  NewConvoCallback* = proc(conversation: Conversation): Future[void] {.async.}

  # Handler registered through `onDeliveryAck`; it receives the conversation
  # and the id of the acknowledged message.
  DeliveryAckCallback* = proc(conversation: Conversation, msgId: MessageId): Future[void] {.async.}
2025-08-22 18:45:55 -07:00
2025-07-05 14:54:19 -07:00
type
  KeyEntry* = object
    ## A private key held in the client's key store together with
    ## bookkeeping metadata.
    keyType: string        ## Kind of key; `createIntroBundle` stores "ephemeral".
    privateKey: PrivateKey ## The private half of the stored key pair.
    timestamp: int64       ## Value of `getCurrentTimestamp()` at creation
                           ## (unit not visible here - TODO confirm).
2025-08-15 07:31:19 -07:00
type
  Client* = ref object
    ## State of a single chat client: its identity, the delivery service it
    ## talks through, the known conversations and the registered callbacks.
    ident: Identity                              ## Identity this client acts as.
    ds*: WakuClient                              ## Delivery service used to send and receive payloads.
    keyStore: Table[string, KeyEntry]            ## Keyed by hex-encoded public key.
    conversations: Table[string, Conversation]   ## Keyed by conversation ID.
    inboundQueue: QueueRef                       ## Queue of inbound payloads, drained by the consumer loop.
    isRunning: bool                              ## Loop condition of the message consumer.
    inbox: Inbox                                 ## Default inbox created for `ident`.

    newMessageCallbacks: seq[MessageCallback]    ## Handlers added via `onNewMessage`.
    newConvoCallbacks: seq[NewConvoCallback]     ## Handlers added via `onNewConversation`.
    deliveryAckCallbacks: seq[DeliveryAckCallback] ## Handlers added via `onDeliveryAck`.
2025-08-22 18:45:55 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Constructors
#################################################
2025-07-05 14:54:19 -07:00
proc newClient*(ds: WakuClient, ident: Identity): Client {.raises: [IOError,
    ValueError, SerializationError].} =
  ## Creates a new `Client` that acts as `ident` and communicates through
  ## the delivery service `ds`.
  ##
  ## A default inbox is created for `ident` with `initInbox` and registered
  ## as the first entry of the conversation table. The inbound queue is
  ## created with `newAsyncQueue[ChatPayload](10)`.
  ##
  ## Raises `ValueError` when any step of the initialisation fails. The
  ## failure is logged first, and the original exception is attached as the
  ## parent of the raised error. Previously the failure was only logged and
  ## a `nil` client was returned to the caller.
  try:
    # NOTE(review): the reliability manager is not stored on the client; in
    # this proc it only acts as an initialisation check - confirm intent.
    let rm = newReliabilityManager().valueOr:
      raise newException(ValueError, "SDS InitializationError")

    let defaultInbox = initInbox(ident)

    # Neither binding is reassigned, so `let` is sufficient.
    let q = QueueRef(queue: newAsyncQueue[ChatPayload](10))
    let c = Client(ident: ident,
        ds: ds,
        keyStore: initTable[string, KeyEntry](),
        conversations: initTable[string, Conversation](),
        inboundQueue: q,
        isRunning: false,
        inbox: defaultInbox,
        newMessageCallbacks: @[],
        newConvoCallbacks: @[],
        deliveryAckCallbacks: @[])

    # The default inbox is tracked like any other conversation.
    c.conversations[defaultInbox.id()] = defaultInbox

    notice "Client started", client = c.ident.getName(),
        defaultInbox = defaultInbox, inTopic = topic_inbox(c.ident.get_addr())

    result = c
  except Exception as e:
    error "newCLient", err = e.msg
    # Surface the failure instead of handing a nil `Client` to the caller.
    raise newException(ValueError, "Failed to create client: " & e.msg, e)
2025-08-22 18:45:55 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Parameter Access
#################################################
2025-08-20 11:34:02 -07:00
proc getId*(client: Client): string =
  ## Returns the name of the client's identity; used as the client id in
  ## log statements.
  client.ident.getName()
2025-08-15 07:38:31 -07:00
2025-08-20 10:16:52 -07:00
proc identity*(client: Client): Identity =
  ## Returns the `Identity` this client was created with.
  client.ident
2025-08-15 09:26:02 -07:00
proc defaultInboxConversationId*(self: Client): string =
  ## Returns the default inbox conversation id of the client, computed by
  ## `conversationIdFor` from the public key of the client's identity.
  let ownPubkey = self.ident.getPubkey()
  conversationIdFor(ownPubkey)
2025-07-05 14:54:19 -07:00
2025-08-15 08:05:59 -07:00
proc getConversationFromHint(self: Client,
    conversationHint: string): Result[Option[Conversation], string] =
  ## Looks up the conversation stored under `conversationHint`.
  ##
  ## Returns `ok(some(convo))` when the hint is a key of the conversation
  ## table and `ok(none(Conversation))` otherwise. No error result is
  ## produced by the current implementation.
  # TODO: Implement hinting; for now the hint is used directly as the key.
  if conversationHint in self.conversations:
    return ok(some(self.conversations[conversationHint]))
  ok(none(Conversation))
2025-08-15 08:05:59 -07:00
2025-08-22 18:45:55 -07:00
proc listConversations*(client: Client): seq[Conversation] =
  ## Returns every conversation currently known to the client, in the
  ## iteration order of the underlying table.
  result = newSeqOfCap[Conversation](client.conversations.len)
  for convo in client.conversations.values():
    result.add(convo)
#################################################
# Callback Handling
#################################################
2025-10-15 13:48:26 -07:00
proc onNewMessage*(client: Client, callback: MessageCallback) =
  ## Registers `callback` to be invoked by `notifyNewMessage`.
  add(client.newMessageCallbacks, callback)
proc notifyNewMessage*(client: Client, convo: Conversation, msg: ReceivedMessage) =
  ## Invokes every registered new-message handler with `convo` and `msg`,
  ## in registration order.
  for handler in client.newMessageCallbacks:
    # The returned future is intentionally not awaited.
    discard handler(convo, msg)
2025-08-22 18:45:55 -07:00
2025-08-22 22:40:27 -07:00
proc onNewConversation*(client: Client, callback: NewConvoCallback) =
  ## Registers `callback` to be invoked whenever a conversation is added
  ## through `addConversation`.
  add(client.newConvoCallbacks, callback)
proc notifyNewConversation(client: Client, convo: Conversation) =
  ## Invokes every registered new-conversation handler with `convo`,
  ## in registration order.
  for handler in client.newConvoCallbacks:
    # The returned future is intentionally not awaited.
    discard handler(convo)
2025-08-22 18:45:55 -07:00
2025-09-05 16:19:26 -07:00
proc onDeliveryAck*(client: Client, callback: DeliveryAckCallback) =
  ## Registers `callback` to be invoked by `notifyDeliveryAck`.
  add(client.deliveryAckCallbacks, callback)
2025-09-05 15:35:47 -07:00
2025-09-05 16:19:26 -07:00
proc notifyDeliveryAck(client: Client, convo: Conversation, messageId: MessageId) =
  ## Invokes every registered delivery-ack handler with `convo` and
  ## `messageId`, in registration order.
  for handler in client.deliveryAckCallbacks:
    # The returned future is intentionally not awaited.
    discard handler(convo, messageId)
2025-08-15 08:05:59 -07:00
#################################################
# Functional
#################################################
2025-07-05 14:54:19 -07:00
proc createIntroBundle*(self: Client): IntroBundle =
  ## Generates an `IntroBundle` for the client, which includes the
  ## information required to send it a message: the identity public key
  ## and a freshly generated ephemeral public key.
  ##
  ## The ephemeral private key is saved in the client's key store under
  ## the hex encoding of its public key.
  ##
  ## `Client` is a `ref object`, so the key store can be updated through a
  ## plain (non-`var`) parameter; callers no longer need a mutable binding.
  let ephemeralKey = generateKey()
  # Derive the public half once; it is both the store key and bundle content.
  let ephemeralPub = ephemeralKey.getPublicKey()

  self.keyStore[ephemeralPub.bytes().bytesToHex()] = KeyEntry(
    keyType: "ephemeral",
    privateKey: ephemeralKey,
    timestamp: getCurrentTimestamp()
  )

  result = IntroBundle(
    ident: @(self.ident.getPubkey().bytes()),
    ephemeral: @(ephemeralPub.bytes()),
  )

  notice "IntroBundleCreated", client = self.getId(),
      pubBytes = result.ident
2025-08-15 08:05:59 -07:00
#################################################
# Conversation Initiation
#################################################
2025-08-20 10:16:52 -07:00
proc addConversation*(client: Client, convo: Conversation) =
  ## Stores `convo` in the client's conversation table under its id
  ## (replacing any entry with the same id) and then notifies the
  ## registered new-conversation handlers.
  let convoId = convo.id()
  notice "Creating conversation", client = client.getId(), convoId = convoId
  client.conversations[convoId] = convo
  client.notifyNewConversation(convo)
2025-08-15 08:37:53 -07:00
2025-08-20 10:16:52 -07:00
proc getConversation*(client: Client, convoId: string): Conversation =
  ## Returns the conversation stored under `convoId`.
  ##
  ## The lookup uses the table's `[]` accessor, so an unknown id raises
  ## `KeyError`.
  notice "Get conversation", client = client.getId(), convoId = convoId
  client.conversations[convoId]
2025-07-07 14:20:46 -07:00
2025-08-15 07:31:19 -07:00
proc newPrivateConversation*(client: Client,
    introBundle: IntroBundle, content: Content): Future[Option[ChatError]] {.async.} =
  ## Creates a private conversation with the owner of `introBundle` and
  ## registers it on the client. `IntroBundle`s are provided out-of-band.
  ##
  ## The invitation is issued through the client's inbox, carrying `content`.
  ## The current implementation always returns `none(ChatError)`.
  # NOTE(review): both keys are unwrapped with `.get()`; how a malformed
  # bundle surfaces depends on `loadPublicKeyFromBytes` - confirm.
  let
    peerIdentityKey = loadPublicKeyFromBytes(introBundle.ident).get()
    peerEphemeralKey = loadPublicKeyFromBytes(introBundle.ephemeral).get()
    convo = await client.inbox.inviteToPrivateConversation(client.ds,
        peerIdentityKey, peerEphemeralKey, content)

  # TODO: Fix re-entrancy bug. Convo needs to be saved before payload is sent.
  client.addConversation(convo)

  return none(ChatError)
2025-07-05 14:54:19 -07:00
2025-08-15 07:31:19 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Payload Handling
2025-12-04 11:51:34 +08:00
# Receives an incoming payload, decodes it, and processes it.
2025-08-15 08:05:59 -07:00
#################################################
2025-07-05 14:54:19 -07:00
2025-08-20 10:16:52 -07:00
proc parseMessage(client: Client, msg: ChatPayload) {.raises: [ValueError,
    SerializationError].} =
  ## Decodes `msg` as a `WapEnvelopeV1`, resolves the target conversation
  ## from the envelope's conversation hint and hands the envelope payload
  ## to that conversation's `handleFrame`.
  ##
  ## Payloads that fail to decode, or whose hint matches no known
  ## conversation, are logged and dropped. Raises `ValueError` when the
  ## conversation lookup itself reports an error. Exceptions raised by
  ## `handleFrame` are logged and not propagated.
  let decoded = decode(msg.bytes, WapEnvelopeV1)
  if decoded.isErr:
    debug "Failed to decode WapEnvelopeV1", client = client.getId(), err = decoded.error
    return
  let envelope = decoded.get()

  let lookup = client.getConversationFromHint(envelope.conversationHint).valueOr:
    raise newException(ValueError, "Failed to get conversation: " & error)

  if lookup.isNone():
    # Include the known ids in the log to ease debugging of hint mismatches.
    let k = toSeq(client.conversations.keys()).join(", ")
    warn "No conversation found", client = client.getId(),
        hint = envelope.conversationHint, knownIds = k
    return
  let convo = lookup.get()

  try:
    convo.handleFrame(client, envelope.payload)
  except Exception as e:
    error "HandleFrame Failed", error = e.msg
2025-08-15 07:31:19 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Async Tasks
#################################################
2025-08-15 07:31:19 -07:00
proc messageQueueConsumer(client: Client) {.async.} =
  ## Main message processing loop.
  ##
  ## While the client is running, takes payloads off the inbound queue one
  ## at a time. A payload whose content topic is accepted by neither the
  ## inbox nor the private-conversation topic parser is logged and skipped;
  ## every other payload is passed to `parseMessage`. Catchable errors from
  ## `parseMessage` are logged so that the loop keeps running.
  info "Message listener started"
  while client.isRunning:
    let message = await client.inboundQueue.queue.get()

    let topicRes = inbox.parseTopic(message.contentTopic).or(
        private_v1.parseTopic(message.contentTopic))

    if topicRes.isOk:
      notice "Inbound Message Received", client = client.getId(),
          contentTopic = message.contentTopic, len = message.bytes.len()
      try:
        client.parseMessage(message)
      except CatchableError as e:
        error "Error in message listener", err = e.msg,
            pubsub = message.pubsubTopic, contentTopic = message.contentTopic
    else:
      debug "Invalid content topic", client = client.getId(), err = topicRes.error, contentTopic = message.contentTopic
2025-08-15 08:05:59 -07:00
#################################################
# Control Functions
#################################################
2025-07-05 14:54:19 -07:00
2025-08-15 07:31:19 -07:00
proc start*(client: Client) {.async.} =
  ## Starts the `Client` and begins listening for incoming messages.
  ##
  ## Registers the client's inbound queue with the delivery service, spawns
  ## the delivery service's `start`, marks the client as running and spawns
  ## the message consumer loop. Neither spawned task is awaited.
  let transport = client.ds
  transport.addDispatchQueue(client.inboundQueue)
  asyncSpawn(transport.start())

  # The flag must be set before the consumer is spawned: it is the
  # consumer's loop condition.
  client.isRunning = true
  asyncSpawn(client.messageQueueConsumer())

  notice "Client start complete", client = client.getId()
2025-12-03 17:44:41 +08:00
proc stop*(client: Client) {.async.} =
  ## Stops the client: waits for the delivery service to stop, then clears
  ## the running flag checked by the message consumer loop.
  # NOTE(review): the consumer reads the flag only at the top of its loop,
  # so a consumer waiting on the queue sees it after the next payload.
  let transport = client.ds
  await transport.stop()
  client.isRunning = false
  notice "Client stopped", client = client.getId()