logos-chat/src/client.nim

324 lines
9.7 KiB
Nim
Raw Normal View History

2025-07-05 14:54:19 -07:00
2025-08-15 07:31:19 -07:00
import # Foreign
chronicles,
chronos,
json,
std/sequtils,
strformat,
strutils,
tables
import #local
conversations/private_v1,
crypto,
identity,
inbox,
proto_types,
types,
utils,
waku_client
# chronicles log scope: attaches the "chat client" topic to the log
# statements in this module.
logScope:
  topics = "chat client"
2025-07-05 14:54:19 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Definitions
#################################################
2025-07-05 14:54:19 -07:00
type
  KeyEntry* = object
    ## One private key held in the client's key store, plus bookkeeping
    ## describing what the key is for and when it was recorded.
    keytype: string        ## Free-form label; this file stores "ephemeral".
    privateKey: PrivateKey ## The secret half of the key pair.
    timestamp: int64       ## `getTimestamp()` value taken at creation.
type
  # Type class matching any of the concrete conversation implementations.
  SupportedConvoTypes* = Inbox | PrivateV1

  # Discriminator used by `ConvoWrapper` to record which conversation
  # implementation it currently holds.
  ConvoType* = enum
    InboxV1Type,
    PrivateV1Type

  ConvoWrapper* = object
    ## Variant object holding exactly one conversation; `convo_type`
    ## selects which of the fields below is valid.
    case convo_type*: ConvoType
    of InboxV1Type:
      inboxV1*: Inbox
    of PrivateV1Type:
      privateV1*: PrivateV1
2025-08-15 07:31:19 -07:00
type
  Client* = ref object
    ## State of a single chat client: its identity, the delivery service
    ## handle, known keys and conversations, and the inbound message queue.
    ident: Identity
    ds*: WakuClient
    key_store: Table[string, KeyEntry]
      ## Keyed by hex-encoded public key.
    conversations: Table[string, ConvoWrapper]
      ## Keyed by conversation ID.
    inboundQueue: QueueRef
      ## Registered with `ds` in `start` and drained by the consumer task.
    isRunning: bool
      ## Loop condition of the consumer task; set by `start` / `stop`.
2025-07-05 14:54:19 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Constructors
#################################################
2025-07-05 14:54:19 -07:00
2025-08-15 07:31:19 -07:00
proc newClient*(name: string, cfg: WakuConfig): Client =
  ## Creates a client with a fresh identity named `name` and a Waku client
  ## built from `cfg`.
  ##
  ## The returned client is not running yet (`isRunning` is false). Its
  ## conversation table already contains the default inbox, stored under
  ## the conversation ID derived from the client's own public key.
  let waku = initWakuClient(cfg)

  # Inbound queue is created with a size argument of 10.
  let q = QueueRef(queue: newAsyncQueue[ChatPayload](10))

  # `Client` is a ref object, so `let` still allows mutating its fields.
  let c = Client(
    ident: createIdentity(name),
    ds: waku,
    key_store: initTable[string, KeyEntry](),
    conversations: initTable[string, ConvoWrapper](),
    inboundQueue: q,
    isRunning: false)

  let default_inbox = initInbox(c.ident.getAddr())
  c.conversations[conversation_id_for(c.ident.getPubkey())] =
    ConvoWrapper(convo_type: InboxV1Type, inboxV1: default_inbox)

  # The Client-level `getId` helper is declared further down in this file,
  # so it is not visible here; read the id from the identity directly.
  # NOTE(review): assumes `identity` exports `getId` for Identity - confirm.
  notice "Client started", client = c.ident.getId(),
      default_inbox = default_inbox
  result = c
2025-07-05 14:54:19 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Parameter Access
#################################################
2025-08-15 07:38:31 -07:00
proc getId(client: Client): string =
  ## Returns the identifier used for this client in log output.
  ##
  ## Delegates to the client's identity. Calling `client.getId()` here
  ## would resolve to this very proc and recurse without end.
  # NOTE(review): assumes `identity` exports `getId` for Identity - confirm.
  result = client.ident.getId()
2025-07-05 14:54:19 -07:00
proc default_inbox_conversation_id*(self: Client): string =
  ## Conversation ID of the client's default inbox, i.e. the ID derived
  ## from the client's own public key.
  let ownKey = self.ident.getPubkey()
  conversation_id_for(ownKey)
2025-08-15 08:05:59 -07:00
proc getConversationFromHint(self: Client,
    conversation_hint: string): Result[Option[ConvoWrapper], string] =
  ## Looks up a conversation using the hint carried by an envelope.
  ##
  ## At present the hint is used verbatim as a key into the conversation
  ## table. Yields `ok(some(convo))` on a hit and `ok(none)` on a miss; no
  ## code path here produces an error result.
  # TODO: Implementing Hinting
  if self.conversations.hasKey(conversation_hint):
    return ok(some(self.conversations[conversation_hint]))
  ok(none(ConvoWrapper))
#################################################
# Functional
#################################################
2025-07-05 14:54:19 -07:00
proc createIntroBundle*(self: Client): IntroBundle =
  ## Generates an IntroBundle for the client: the client's identity public
  ## key together with the public half of a newly generated ephemeral key.
  ##
  ## Side effect: the ephemeral private key is saved in the key store,
  ## keyed by the hex encoding of its public key, tagged "ephemeral" and
  ## stamped with `getTimestamp()`.
  ##
  ## `Client` is a ref object, so the key store can be updated without a
  ## `var` parameter; this also lets `let`-bound clients call the proc.
  let ephemeral_key = generate_key()
  let keyId = ephemeral_key.getPublicKey().bytes().bytesToHex()
  self.key_store[keyId] = KeyEntry(
    keytype: "ephemeral",
    privateKey: ephemeral_key,
    timestamp: getTimestamp()
  )

  result = IntroBundle(
    ident: @(self.ident.getPubkey().bytes()),
    ephemeral: @(ephemeral_key.getPublicKey().bytes()),
  )

  notice "IntroBundleCreated", client = self.getId(),
      pubBytes = result.ident
2025-08-15 08:05:59 -07:00
#################################################
# Conversation Initiation
#################################################
2025-08-15 07:31:19 -07:00
proc createPrivateConversation(client: Client, participant: PublicKey,
    discriminator: string = "default"): Option[ChatError] =
  ## Builds a PrivateV1 conversation between this client's identity and
  ## `participant`, and registers it in the conversation table under the
  ## conversation's own ID (replacing any entry already stored there).
  ##
  ## NOTE(review): the declared return type is `Option[ChatError]`, yet
  ## the value returned is `some(<conversation id>)`. Both callers in this
  ## file ignore the result - confirm the intended contract.
  let convo = initPrivateV1(client.ident, participant, discriminator)
  let convoId = convo.getConvoId()

  notice "Creating PrivateV1 conversation", topic = convoId
  client.conversations[convoId] =
    ConvoWrapper(convo_type: PrivateV1Type, privateV1: convo)

  some(convoId)
2025-07-07 14:20:46 -07:00
2025-08-15 07:31:19 -07:00
proc newPrivateConversation*(client: Client,
    intro_bundle: IntroBundle): Future[Option[ChatError]] {.async.} =
  ## Starts a private conversation with the peer described by
  ## `intro_bundle`.
  ##
  ## Steps: parse the peer's public key from the bundle, build an
  ## InvitePrivateV1, wrap it in an envelope, register the conversation
  ## locally, then send the envelope to the peer's inbox topic.
  ##
  ## Completes with `none(ChatError)` once the payload has been handed to
  ## the delivery service. Raises ValueError when the bundle's identity
  ## bytes are not a valid public key.
  notice "New PRIVATE Convo ", clientId = client.getId(),
      fromm = intro_bundle.ident.mapIt(it.toHex(2)).join("")

  let keyRes = loadPublicKeyFromBytes(intro_bundle.ident)
  if keyRes.isErr:
    raise newException(ValueError, "Invalid public key in intro bundle.")
  let peerKey = keyRes.get()

  let convoId = conversation_id_for(peerKey)
  let inboxTopic = topic_inbox(peerKey.get_addr())

  let invite = InvitePrivateV1(
    initiator: @(client.ident.getPubkey().bytes()),
    initiator_ephemeral: @[0, 0], # TODO: Add ephemeral
    participant: @(peerKey.bytes()),
    participant_ephemeral_id: intro_bundle.ephemeral_id,
    discriminator: "test"
  )
  let inviteFrame = InboxV1Frame(invite_private_v1: invite, recipient: "")
  let envelope = wrap_env(encrypt(inviteFrame), convoId)

  # The conversation is registered for its side effect only; the returned
  # value is not needed here.
  discard createPrivateConversation(client, peerKey)
  # TODO: Subscribe to new content topic

  await client.ds.sendPayload(inboxTopic, envelope)
  return none(ChatError)
2025-07-05 14:54:19 -07:00
2025-08-15 07:31:19 -07:00
proc acceptPrivateInvite(client: Client,
    invite: InvitePrivateV1): Option[ChatError] =
  ## Accepts an incoming private-conversation invite by creating the
  ## matching PrivateV1 conversation with the invite's initiator.
  ##
  ## Returns `none(ChatError)` on success. Raises ValueError when the
  ## initiator bytes in the invite are not a valid public key.
  ##
  ## NOTE(review): `invite.discriminator` is not consulted; the
  ## conversation is created with the default discriminator, as on the
  ## initiating side - confirm this is intended.
  notice "ACCEPT PRIVATE Convo ", clientId = client.getId(),
      fromm = invite.initiator.mapIt(it.toHex(2)).join("")

  let res_pubkey = loadPublicKeyFromBytes(invite.initiator)
  if res_pubkey.isErr:
    # The key comes from an invite, not an intro bundle; say so.
    raise newException(ValueError, "Invalid initiator public key in invite.")
  let dest_pubkey = res_pubkey.get()

  # Registered for its side effect; the returned value is not used.
  discard createPrivateConversation(client, dest_pubkey)
  # TODO: Subscribe to new content topic
  result = none(ChatError)
2025-08-15 08:05:59 -07:00
#################################################
# Payload Handling
#################################################
2025-07-05 14:54:19 -07:00
2025-08-15 07:31:19 -07:00
proc handleInboxFrame(client: Client, frame: InboxV1Frame) =
  ## Dispatches a decrypted inbox frame according to its kind: notes are
  ## logged, private invites are logged and then accepted (the acceptance
  ## result is discarded).
  let kind = frame.getKind()
  case kind
  of type_Note:
    notice "Receive Note", text = frame.note.text
  of type_InvitePrivateV1:
    notice "Receive PrivateInvite", client = client.getId(),
        frame = frame.invite_private_v1
    discard acceptPrivateInvite(client, frame.invite_private_v1)
proc handlePrivateFrame(client: Client, convo: PrivateV1, bytes: seq[byte]) =
  ## Decodes `bytes` as an EncryptedPayload, decrypts it with `convo`, and
  ## logs the resulting frame according to its kind.
  ##
  ## Raises ValueError when the bytes cannot be decoded. The decode result
  ## is checked explicitly, mirroring the envelope decode in
  ## `parseMessage`, rather than calling `.get()` on an unchecked result.
  let resEnc = decode(bytes, EncryptedPayload)
  if resEnc.isErr:
    raise newException(ValueError,
        "Failed to decode EncryptedPayload: " & $resEnc.error)

  let frame = convo.decrypt(resEnc.get()) # TODO: handle result

  case frame.getKind():
  of type_ContentFrame:
    notice "Got Mail", client = client.getId(),
        text = frame.content.bytes.toUtfString()
  of type_Placeholder:
    notice "Got Placeholder", client = client.getId(),
        text = frame.placeholder.counter
proc parseMessage(client: Client, msg: ChatPayload) =
  ## Receives an incoming payload, decodes it, and processes it.
  ##
  ## The payload bytes are decoded as a WapEnvelopeV1; the envelope's
  ## conversation hint selects a known conversation, whose type decides
  ## how the inner payload is handled. When no conversation matches the
  ## hint, a warning is logged and the message is dropped.
  ##
  ## Raises ValueError when the envelope or the inbox payload cannot be
  ## decoded, when the conversation lookup reports an error, or when the
  ## inbox frame fails to decrypt.
  info "Parse", clientId = client.getId(), msg = msg,
      contentTopic = msg.contentTopic

  let res_env = decode(msg.bytes, WapEnvelopeV1)
  if res_env.isErr:
    raise newException(ValueError,
        "Failed to decode WapEnvelopeV1: " & res_env.error)
  let env = res_env.get()

  let res_convo = client.getConversationFromHint(env.conversation_hint)
  if res_convo.isErr:
    raise newException(ValueError, "Failed to get conversation: " &
        res_convo.error)

  let resWrappedConvo = res_convo.get()
  if not resWrappedConvo.isSome:
    let k = toSeq(client.conversations.keys()).join(", ")
    warn "No conversation found", client = client.getId(),
        hint = env.conversation_hint, knownIds = k
    return

  let wrappedConvo = resWrappedConvo.get()
  case wrappedConvo.convo_type:
  of InboxV1Type:
    # Check the decode result instead of calling `.get()` on it
    # unchecked, so a malformed payload is reported as ValueError like
    # the other failures in this proc.
    let resEnc = decode(env.payload, EncryptedPayload)
    if resEnc.isErr:
      raise newException(ValueError,
          "Failed to decode EncryptedPayload: " & $resEnc.error)

    let resFrame = inbox.decrypt(resEnc.get())
    if resFrame.isErr:
      error "Decrypt failed", error = resFrame.error()
      raise newException(ValueError, "Failed to Decrypt MEssage: " &
          resFrame.error)
    client.handleInboxFrame(resFrame.get())
  of PrivateV1Type:
    client.handlePrivateFrame(wrappedConvo.privateV1, env.payload)
2025-08-15 08:05:59 -07:00
proc addMessage*(client: Client, convo: PrivateV1,
    text: string = "") {.async.} =
  ## Sends `text` over `convo` as a content frame (domain 0, tag 1), using
  ## the client's delivery service.
  let content = ContentFrame(domain: 0, tag: 1, bytes: text.toBytes())
  let frame = PrivateV1Frame(content: content)
  await convo.sendMessage(client.ds, frame)
#################################################
# Async Tasks
#################################################
2025-08-15 07:31:19 -07:00
proc messageQueueConsumer(client: Client) {.async.} =
  ## Message processing loop: while `client.isRunning` holds, takes the
  ## next payload from the inbound queue and hands it to `parseMessage`.
  ##
  ## A CatchableError raised while parsing is logged and the loop carries
  ## on with the next payload.
  info "Message listener started"

  while client.isRunning:
    # Suspends this task until a payload is available.
    let inbound = await client.inboundQueue.queue.get()

    notice "Inbound Message Received", client = client.getId(),
        contentTopic = inbound.contentTopic, len = inbound.bytes.len()

    try:
      parseMessage(client, inbound)
    except CatchableError as exc:
      # Log and keep the listener alive.
      error "Error in message listener", err = exc.msg,
          pubsub = inbound.pubsubTopic, contentTopic = inbound.contentTopic
proc simulateMessages(client: Client) {.async.} =
  ## Test traffic generator. Waits (re-checking after each
  ## `sleepAsync(4000)`) until the client knows more than one
  ## conversation, then performs five rounds; each round sleeps again and
  ## sends "message: <round>" to every PrivateV1 conversation.
  while client.conversations.len() <= 1:
    await sleepAsync(4000)

  notice "Starting Message Simulation", client = client.getId()

  for a in 1..5:
    await sleepAsync(4000)

    notice "Send to"
    # Iterate over a snapshot: this task is suspended at each `await`
    # below, and another task may add a conversation in the meantime.
    # std/tables does not permit a table to change size while one of its
    # iterators is live.
    let snapshot = toSeq(client.conversations.values())
    for convoWrapper in snapshot:
      if convoWrapper.convo_type == PrivateV1Type:
        await client.addMessage(convoWrapper.privateV1, fmt"message: {a}")
2025-07-05 14:54:19 -07:00
2025-08-15 08:05:59 -07:00
#################################################
# Control Functions
#################################################
2025-07-05 14:54:19 -07:00
2025-08-15 07:31:19 -07:00
proc start*(client: Client) {.async.} =
  ## Brings the client up: registers the inbound queue with the delivery
  ## service, spawns the delivery service, marks the client as running,
  ## then spawns the queue consumer and the message simulation task.
  ##
  ## The spawned tasks are not awaited here.
  addDispatchQueue(client.ds, client.inboundQueue)
  asyncSpawn start(client.ds)

  # Must be set before the consumer is spawned: its loop condition reads
  # this flag.
  client.isRunning = true

  asyncSpawn messageQueueConsumer(client)
  asyncSpawn simulateMessages(client)

  notice "Client start complete"
2025-08-15 07:31:19 -07:00
proc stop*(client: Client) =
  ## Stop the client.
  ##
  ## Clears `isRunning`, which is the loop condition of the message queue
  ## consumer. Nothing else is torn down here.
  # NOTE(review): the consumer only re-reads the flag after its pending
  # queue `get()` completes - confirm whether an explicit wake-up or
  # cancellation is wanted.
  client.isRunning = false
  notice "Client stopped"