nim-chat-poc/src/chat/client.nim

258 lines
7.5 KiB
Nim

## Main Entry point to the ChatSDK.
## Clients are the primary manager of sending and receiving
## messages, and managing conversations.
import # Foreign
chronicles,
chronos,
libchat,
std/options,
strformat,
types
import #local
delivery/waku_client,
errors,
identity,
types,
utils
# Attach a common `topics` property to the log statements of this module.
logScope:
  topics = "chat client"
#################################################
# Definitions
#################################################
type
  ReceivedMessage* = ref object of RootObj
    ## Message data handed to the registered `MessageCallback`s.
    sender*: PublicKey
      ## Sender key. NOTE(review): `parseMessage` in this module never
      ## populates it (see its P1 TODO) — confirm before relying on it.
    timestamp*: int64
      ## Set from `getCurrentTimestamp()` when the payload is parsed locally.
    content*: seq[byte]
      ## Raw content bytes of the message.
# Kinds of conversation a `Conversation` handle can represent.
type
  ConvoType = enum
    PrivateV1
type
  Conversation* = object
    ## Lightweight handle to one conversation. It bundles the libchat
    ## context and the delivery service needed to send content on it.
    ctx: LibChat            # libchat context used to encode outgoing content
    convoId: string         # identifier returned by `id`
    ds: WakuClient          # delivery service used to send payloads
    convo_type: ConvoType   # kind of conversation
proc id*(self: Conversation): string =
  ## Returns the identifier of this conversation.
  result = self.convoId
# Callback signatures accepted by `onNewMessage`, `onNewConversation` and
# `onDeliveryAck` respectively. All of them are async procs.
type
  MessageCallback* =
    proc(conversation: Conversation,
         msg: ReceivedMessage): Future[void] {.async.}

  NewConvoCallback* =
    proc(conversation: Conversation): Future[void] {.async.}

  DeliveryAckCallback* =
    proc(conversation: Conversation,
         msgId: MessageId): Future[void] {.async.}
type
  KeyEntry* = object
    ## A private key together with a type label and a timestamp.
    ## NOTE(review): not referenced elsewhere in the visible part of this
    ## module — confirm it is still needed.
    keyType: string
    privateKey: PrivateKey
    timestamp: int64
type
  ChatClient* = ref object
    ## State of one chat client: the libchat context, the delivery service,
    ## the inbound payload queue and the registered callbacks.
    libchatCtx: LibChat
      # libchat context used to create bundles/conversations and to decode
      # incoming payloads
    ds*: WakuClient
      # delivery service used to send and receive payloads
    id: string
      # client identifier, returned by `getId`
    inboundQueue: QueueRef
      # queue the delivery service dispatches incoming payloads into
    isRunning: bool
      # loop condition of the message consumer task
    newMessageCallbacks: seq[MessageCallback]
    newConvoCallbacks: seq[NewConvoCallback]
    deliveryAckCallbacks: seq[DeliveryAckCallback]
#################################################
# Constructors
#################################################
proc newClient*(ds: WakuClient, ident: Identity): ChatClient {.raises: [IOError, ValueError].} =
  ## Creates a new, not yet started `ChatClient` that uses `ds` as its
  ## delivery service and takes its id from `ident.getName()`.
  ##
  ## Raises `ValueError` when construction fails, so callers never receive
  ## a `nil` client.
  ##
  ## TODO: (P1) Currently the passed in Identity is not used. Libchat Generates one for every invocation.
  const inboundQueueCapacity = 10
  try:
    let inbound = QueueRef(queue: newAsyncQueue[ChatPayload](inboundQueueCapacity))
    result = ChatClient(
      libchatCtx: newConversationsContext(),
      ds: ds,
      id: ident.getName(),
      inboundQueue: inbound,
      isRunning: false,
      newMessageCallbacks: @[],
      newConvoCallbacks: @[],
      deliveryAckCallbacks: @[])
    notice "Client started", client = result.id
  except CatchableError as e:
    # Only recoverable errors are handled here; Defects (programmer bugs)
    # are left to propagate. The failure is logged and surfaced to the
    # caller instead of silently returning a nil client.
    error "newCLient", err = e.msg
    raise newException(ValueError, "could not create ChatClient: " & e.msg, e)
#################################################
# Parameter Access
#################################################
proc getId*(client: ChatClient): string =
  ## Returns the identifier of `client`.
  client.id
proc listConversations*(client: ChatClient): seq[Conversation] =
  ## Placeholder: always returns an empty sequence for now.
  # TODO: (P1) Implement list conversations
  return @[]
#################################################
# Callback Handling
#################################################
proc onNewMessage*(client: ChatClient, callback: MessageCallback) =
  ## Registers `callback` to be invoked by `notifyNewMessage`.
  add(client.newMessageCallbacks, callback)
proc notifyNewMessage*(client: ChatClient, convo: Conversation, msg: ReceivedMessage) =
  ## Invokes every registered message callback with `convo` and `msg`.
  ## The futures returned by the callbacks are discarded, not awaited.
  for handler in client.newMessageCallbacks:
    let pending = handler(convo, msg)
    discard pending
proc onNewConversation*(client: ChatClient, callback: NewConvoCallback) =
  ## Registers `callback` to be invoked by `notifyNewConversation`.
  add(client.newConvoCallbacks, callback)
proc notifyNewConversation(client: ChatClient, convo: Conversation) =
  ## Invokes every registered new-conversation callback with `convo`.
  ## A debug line is logged before each callback; the returned futures
  ## are discarded, not awaited.
  for handler in client.newConvoCallbacks:
    debug "calling OnConvo CB", client=client.getId(), len = client.newConvoCallbacks.len()
    let pending = handler(convo)
    discard pending
proc onDeliveryAck*(client: ChatClient, callback: DeliveryAckCallback) =
  ## Registers `callback` to be invoked by `notifyDeliveryAck`.
  add(client.deliveryAckCallbacks, callback)
proc notifyDeliveryAck(client: ChatClient, convo: Conversation,
    messageId: MessageId) =
  ## Invokes every registered delivery-ack callback with `convo` and
  ## `messageId`. The returned futures are discarded, not awaited.
  for handler in client.deliveryAckCallbacks:
    let pending = handler(convo, messageId)
    discard pending
#################################################
# Functional
#################################################
proc createIntroBundle*(self: ChatClient): seq[byte] =
  ## Asks libchat for an introduction bundle, i.e. the information a peer
  ## needs in order to send this client a message.
  ##
  ## Returns the bundle bytes. When libchat reports an error it is logged
  ## and an empty sequence is returned.
  let bundle = self.libchatCtx.createIntroductionBundle().valueOr:
    error "could not create bundle",error=error, client = self.getId()
    return
  notice "IntroBundleCreated", client = self.getId(),
    bundle = bundle
  return bundle
proc sendPayloads(ds: WakuClient, payloads: seq[PayloadResult]) =
  ## Hands each payload's bytes to the delivery service, addressed to the
  ## payload's own address. The result of every send is ignored.
  for outgoing in payloads:
    # TODO: (P2) surface errors
    discard ds.sendBytes(outgoing.address, outgoing.data)
#################################################
# Conversation Initiation
#################################################
proc getConversation*(client: ChatClient, convoId: string): Conversation =
  ## Builds a `PrivateV1` conversation handle for `convoId` that shares the
  ## client's libchat context and delivery service. No lookup is done.
  Conversation(
    ctx: client.libchatCtx,
    convoId: convoId,
    ds: client.ds,
    convo_type: PrivateV1)
proc newPrivateConversation*(client: ChatClient,
    introBundle: seq[byte], content: Content): Future[Option[ChatError]] {.async.} =
  ## Starts a private conversation with the peer described by `introBundle`
  ## and sends `content` as its first message.
  ##
  ## On success the payloads produced by libchat are sent through the
  ## delivery service, the new-conversation callbacks are notified and
  ## `none(ChatError)` is returned. When libchat fails, the error is logged
  ## and returned as `some(ChatError)` with code `errLibChat`.
  let created = client.libchatCtx.createNewPrivateConvo(introBundle, content)
  let (convoId, payloads) = created.valueOr:
    error "could not create private conversation", error=error, client = client.getId()
    return some(ChatError(code: errLibChat, context: fmt"got: {error}"))

  client.ds.sendPayloads(payloads)
  # Build the handle through the shared helper so it stays consistent with
  # the handles created for incoming payloads.
  client.notifyNewConversation(client.getConversation(convoId))

  notice "CREATED", client=client.getId(), convoId=convoId
  return none(ChatError)
#################################################
# Payload Handling
# Receives an incoming payload, decodes it, and processes it.
#################################################
proc parseMessage(client: ChatClient, msg: ChatPayload) {.raises: [ValueError].} =
  ## Feeds the bytes of `msg` to libchat and, when that yields content,
  ## notifies the new-conversation callbacks (for a new conversation) and
  ## then the message callbacks. Failures are logged, never propagated.
  try:
    let decoded = client.libchatCtx.handlePayload(msg.bytes).valueOr:
      error "handlePayload" , error=error, client=client.getId()
      return

    # Nothing to deliver to the application for this payload.
    if decoded.isNone():
      debug "Parsed message generated no content", client=client.getId()
      return

    let content = decoded.get()
    let convo = client.getConversation(content.conversationId)
    if content.isNewConvo:
      client.notifyNewConversation(convo)

    # TODO: (P1) Add sender information from LibChat.
    let received = ReceivedMessage(timestamp:getCurrentTimestamp(),content: content.data )
    client.notifyNewMessage(convo, received)
  except Exception as e:
    error "HandleFrame Failed", error = e.msg
proc sendMessage*(convo: Conversation, content: Content) : Future[MessageId] {.async, gcsafe.} =
  ## Encodes `content` for this conversation via libchat and sends the
  ## resulting payloads through the delivery service.
  ##
  ## Returns `"error"` when libchat fails to encode the content.
  ## NOTE(review): the success path never assigns a `MessageId`, so the
  ## default value is returned — confirm whether a real id should be used.
  let outbound = convo.ctx.sendContent(convo.convoId, content).valueOr:
    error "SendMessage", e=error
    return "error"
  convo.ds.sendPayloads(outbound)
#################################################
# Async Tasks
#################################################
proc messageQueueConsumer(client: ChatClient) {.async.} =
  ## Main message processing loop: takes payloads off the inbound queue and
  ## passes them to `parseMessage` for as long as the client is running.
  info "Message listener started"
  while client.isRunning:
    let message = await client.inboundQueue.queue.get()
    # The task may have been parked in `get()` while `stop()` ran. `stop()`
    # destroys the libchat context before clearing `isRunning`, so re-check
    # the flag here rather than parse against a destroyed context.
    if not client.isRunning:
      break
    debug "Got WakuMessage", client = client.getId() , topic= message.content_topic, len=message.bytes.len()
    client.parseMessage(message)
#################################################
# Control Functions
#################################################
proc start*(client: ChatClient) {.async.} =
  ## Start `ChatClient` and listens for incoming messages.
  ##
  ## Registers the client's inbound queue with the delivery service, then
  ## launches the delivery service and the message consumer as background
  ## tasks; neither of them is awaited here.
  client.ds.addDispatchQueue(client.inboundQueue)
  asyncSpawn client.ds.start()
  # The consumer loop is conditioned on `isRunning`, so the flag has to be
  # set before the consumer task is spawned.
  client.isRunning = true
  asyncSpawn client.messageQueueConsumer()
  notice "Client start complete", client = client.getId()
proc stop*(client: ChatClient) {.async.} =
  ## Stop the client.
  ##
  ## Waits for the delivery service to stop, destroys the libchat context
  ## and clears the running flag read by the message consumer loop.
  await client.ds.stop()
  client.libchatCtx.destroy()
  # NOTE(review): the consumer task only reads this flag at the top of its
  # loop, i.e. after its pending `queue.get()` completes — confirm that no
  # payload can still be dispatched once the delivery service has stopped.
  client.isRunning = false
  notice "Client stopped", client = client.getId()