From f6ce4e8ac63931b21f23848edcf90edb918e20c4 Mon Sep 17 00:00:00 2001 From: Jazz Turner-Baggs <473256+jazzz@users.noreply.github.com> Date: Fri, 15 Aug 2025 07:31:19 -0700 Subject: [PATCH] Add basic messages over relay --- nim.cfg | 3 + protos/inbox.proto | 5 + protos/{conversations => }/private_v1.proto | 0 src/client.nim | 265 ++++++++++++++------ src/conversations/private_v1.nim | 40 ++- src/crypto.nim | 1 - src/identity.nim | 3 + src/inbox.nim | 56 ++--- src/nim_chat_poc.nim | 93 +++---- src/proto_types.nim | 63 ++++- src/types.nim | 1 + src/utils.nim | 14 +- src/waku_client.nim | 154 ++++++------ 13 files changed, 426 insertions(+), 272 deletions(-) rename protos/{conversations => }/private_v1.proto (100%) create mode 100644 src/types.nim diff --git a/nim.cfg b/nim.cfg index 5d56899..1dd4ceb 100644 --- a/nim.cfg +++ b/nim.cfg @@ -11,3 +11,6 @@ --experimental --passL: "./vendor/waku/librln_v0.7.0.a" + +--define: chronicles_runtime_filtering + diff --git a/protos/inbox.proto b/protos/inbox.proto index 6f85db5..438b977 100644 --- a/protos/inbox.proto +++ b/protos/inbox.proto @@ -4,9 +4,14 @@ package wap.inbox; import "invite.proto"; +message Note{ + string text = 1; +} + message InboxV1Frame { string recipient = 1; oneof frame_type { invite.InvitePrivateV1 invite_private_v1 = 10; + Note note = 11; } } diff --git a/protos/conversations/private_v1.proto b/protos/private_v1.proto similarity index 100% rename from protos/conversations/private_v1.proto rename to protos/private_v1.proto diff --git a/src/client.nim b/src/client.nim index ab5e93c..36bea8a 100644 --- a/src/client.nim +++ b/src/client.nim @@ -1,16 +1,26 @@ -import tables -import identity -import crypto -import proto_types -import std/times -import utils -import dev -import inbox -import conversations/private_v1 +import # Foreign + chronicles, + chronos, + json, + std/sequtils, + strformat, + strutils, + tables -import secp256k1 -import chronicles +import #local + conversations/private_v1, + crypto, + identity, + inbox, + proto_types, + types, + utils, + waku_client + + +logScope: + topics = "chat client" type KeyEntry* = object keytype: string @@ -18,9 +28,9 @@ type KeyEntry* = object timestamp: int64 -type SupportedConvoTypes* = Inbox | PrivateV1 - type + SupportedConvoTypes* = Inbox | PrivateV1 + ConvoType* = enum InboxV1Type, PrivateV1Type @@ -31,57 +41,39 @@ type of PrivateV1Type: privateV1*: PrivateV1 - -type - Client* = ref object - ident: Identity - key_store: Table[string, KeyEntry] # Keyed by HexEncoded Public Key - conversations: Table[string, ConvoWrapper] # Keyed by conversation ID +type Client* = ref object + ident: Identity + ds*: WakuClient + key_store: Table[string, KeyEntry] # Keyed by HexEncoded Public Key + conversations: Table[string, ConvoWrapper] # Keyed by conversation ID + inboundQueue: QueueRef + isRunning: bool -proc process_invite*(self: var Client, invite: InvitePrivateV1) - - -################################################# -# Constructors -################################################# - -proc initClient*(name: string): Client = - +proc newClient*(name: string, cfg: WakuConfig): Client = + let waku = initWakuClient(cfg) + var q = QueueRef(queue: newAsyncQueue[ChatPayload](10)) var c = Client(ident: createIdentity(name), + ds: waku, key_store: initTable[string, KeyEntry](), - conversations: initTable[string, ConvoWrapper]()) - - let default_inbox = initInbox(c.ident.getAddr(), proc( - x: InvitePrivateV1) = c.process_invite(x)) + conversations: initTable[string, ConvoWrapper](), + inboundQueue: q, + isRunning: false) + let default_inbox = initInbox(c.ident.getAddr()) c.conversations[conversation_id_for(c.ident.getPubkey( ))] = ConvoWrapper(convo_type: InboxV1Type, inboxV1: default_inbox) + + notice "Client started", client = c.ident.getId(), + default_inbox = default_inbox result = c - -################################################# -# Parameter Access -################################################# - -proc getClientAddr*(self: Client): string = - result = self.ident.getAddr() - proc default_inbox_conversation_id*(self: Client): string = ## Returns the default inbox address for the client. result = conversation_id_for(self.ident.getPubkey()) - -proc getConversations*(self: Client): Table[string, ConvoWrapper] = - ## Returns the conversations table for the client. - result = self.conversations - -################################################# -# Methods -################################################# - proc createIntroBundle*(self: var Client): IntroBundle = ## Generates an IntroBundle for the client, which includes ## the required information to send a message. @@ -90,8 +82,8 @@ proc createIntroBundle*(self: var Client): IntroBundle = let ephemeral_key = generate_key() self.key_store[ephemeral_key.getPublickey().bytes().bytesToHex()] = KeyEntry( keytype: "ephemeral", - privateKEy: ephemeral_key, - timestamp: getTime().toUnix(), + privateKey: ephemeral_key, + timestamp: getTimestamp() ) result = IntroBundle( @@ -99,21 +91,29 @@ proc createIntroBundle*(self: var Client): IntroBundle = ephemeral: @(ephemeral_key.getPublicKey().bytes()), ) -proc createPrivateConversation*(self: var Client, participant: PublicKey, - discriminator: string = "default") = - ## Creates a private conversation with the given participant and discriminator. - let convo = initPrivateV1(self.ident, participant, discriminator) + notice "IntroBundleCreated", client = self.ident.getId(), + pubBytes = result.ident - info "Creating PrivateV1 conversation", topic = convo.get_topic - self.conversations[convo.get_topic()] = ConvoWrapper( +proc createPrivateConversation(client: Client, participant: PublicKey, + discriminator: string = "default"): Option[ChatError] = + # Creates a private conversation with the given participant and discriminator. + let convo = initPrivateV1(client.ident, participant, discriminator) + + notice "Creating PrivateV1 conversation", topic = convo.getConvoId() + client.conversations[convo.getConvoId()] = ConvoWrapper( convo_type: PrivateV1Type, privateV1: convo ) + return some(convo.getConvoId()) -proc handleIntro*(self: var Client, intro_bundle: IntroBundle): TransportMessage = +proc newPrivateConversation*(client: Client, + intro_bundle: IntroBundle): Future[Option[ChatError]] {.async.} = ## Creates a private conversation with the given Invitebundle. + notice "New PRIVATE Convo ", clientId = client.ident.getId(), + fromm = intro_bundle.ident.mapIt(it.toHex(2)).join("") + let res_pubkey = loadPublicKeyFromBytes(intro_bundle.ident) if res_pubkey.isErr: raise newException(ValueError, "Invalid public key in intro bundle.") @@ -123,7 +123,7 @@ proc handleIntro*(self: var Client, intro_bundle: IntroBundle): TransportMessage let dst_convo_topic = topic_inbox(dest_pubkey.get_addr()) let invite = InvitePrivateV1( - initiator: @(self.ident.getPubkey().bytes()), + initiator: @(client.ident.getPubkey().bytes()), initiator_ephemeral: @[0, 0], # TODO: Add ephemeral participant: @(dest_pubkey.bytes()), participant_ephemeral_id: intro_bundle.ephemeral_id, @@ -132,11 +132,32 @@ proc handleIntro*(self: var Client, intro_bundle: IntroBundle): TransportMessage let env = wrap_env(encrypt(InboxV1Frame(invite_private_v1: invite, recipient: "")), convo_id) - createPrivateConversation(self, dest_pubkey) + let convo = createPrivateConversation(client, dest_pubkey) + # TODO: Subscribe to new content topic - return sendTo(dst_convo_topic, encode(env)) + await client.ds.sendPayload(dst_convo_topic, env) + return none(ChatError) -proc get_conversation(self: Client, +proc acceptPrivateInvite(client: Client, + invite: InvitePrivateV1): Option[ChatError] = + + + notice "ACCEPT PRIVATE Convo ", clientId = client.ident.getId(), + fromm = invite.initiator.mapIt(it.toHex(2)).join("") + + let res_pubkey = loadPublicKeyFromBytes(invite.initiator) + if res_pubkey.isErr: + raise newException(ValueError, "Invalid public key in intro bundle.") + let dest_pubkey = res_pubkey.get() + + let convo = createPrivateConversation(client, dest_pubkey) + # TODO: Subscribe to new content topic + + result = none(ChatError) + + + +proc getConversationFromHint(self: Client, conversation_hint: string): Result[Option[ConvoWrapper], string] = # TODO: Implementing Hinting @@ -146,39 +167,121 @@ proc get_conversation(self: Client, ok(some(self.conversations[conversation_hint])) -proc recv*(self: var Client, transport_message: TransportMessage): seq[ - TransportMessage] = +proc handleInboxFrame(client: Client, frame: InboxV1Frame) = + case getKind(frame): + of type_InvitePrivateV1: + notice "Receive PrivateInvite", client = client.ident.getId(), + frame = frame.invite_private_v1 + discard client.acceptPrivateInvite(frame.invite_private_v1) + + of type_Note: + notice "Receive Note", text = frame.note.text + +proc handlePrivateFrame(client: Client, convo: PrivateV1, bytes: seq[byte]) = + let enc = decode(bytes, EncryptedPayload).get() # TODO: handle result + let frame = convo.decrypt(enc) # TODO: handle result + + case frame.getKind(): + of type_ContentFrame: + notice "Got Mail", client = client.ident.getId(), + text = frame.content.bytes.toUtfString() + of type_Placeholder: + notice "Got Placeholder", client = client.ident.getId(), + text = frame.placeholder.counter + +proc parseMessage(client: Client, msg: ChatPayload) = ## Reveives a incomming payload, decodes it, and processes it. - let res_env = decode(transport_message.payload, WapEnvelopeV1) + info "Parse", clientId = client.ident.getId(), msg = msg, + contentTopic = msg.contentTopic + + let res_env = decode(msg.bytes, WapEnvelopeV1) if res_env.isErr: - raise newException(ValueError, "Failed to decode WapEnvelopeV1: " & res_env.error) + raise newException(ValueError, "Failed to decode WsapEnvelopeV1: " & res_env.error) let env = res_env.get() - let res_convo = self.get_conversation(env.conversation_hint) + let res_convo = client.getConversationFromHint(env.conversation_hint) if res_convo.isErr: raise newException(ValueError, "Failed to get conversation: " & res_convo.error) - let convo = res_convo.get() - if not convo.isSome: - debug "No conversation found", hint = env.conversation_hint + let resWrappedConvo = res_convo.get() + if not resWrappedConvo.isSome: + let k = toSeq(client.conversations.keys()).join(", ") + warn "No conversation found", client = client.ident.getId(), + hint = env.conversation_hint, knownIds = k return - let inbox = convo.get().inboxV1 + let wrappedConvo = resWrappedConvo.get() - let res = inbox.handle_incomming_frame(transport_message.topic, env.payload) - if res.isErr: - warn "Failed to handle incoming frame: ", error = res.error - return @[] + case wrappedConvo.convo_type: + of InboxV1Type: + let enc = decode(env.payload, EncryptedPayload).get() # TODO: handle result + let resFrame = inbox.decrypt(enc) # TODO: handle result + if resFrame.isErr: + error "Decrypt failed", error = resFrame.error() + raise newException(ValueError, "Failed to Decrypt MEssage: " & + resFrame.error) + + client.handleInboxFrame(resFrame.get()) + + of PrivateV1Type: + client.handlePrivateFrame(wrapped_convo.private_v1, env.payload) + +proc messageQueueConsumer(client: Client) {.async.} = + ## Main message processing loop + info "Message listener started" + + while client.isRunning: + # Wait for next message (this will suspend the coroutine) + let message = await client.inboundQueue.queue.get() + + notice "Inbound Message Received", client = client.ident.getId(), + contentTopic = message.contentTopic, len = message.bytes.len() + try: + # Parse and handle the message + client.parseMessage(message) + + except CatchableError as e: + error "Error in message listener", err = e.msg, + pubsub = message.pubsubTopic, contentTopic = message.contentTopic + # Continue running even if there's an error + +proc addMessage*(client: Client, convo: PrivateV1, + text: string = "") {.async.} = + + let message = PrivateV1Frame(content: ContentFrame(domain: 0, tag: 1, + bytes: text.toBytes())) + + await convo.sendMessage(client.ds, message) + +proc simulateMessages(client: Client){.async.} = + + while client.conversations.len() <= 1: + await sleepAsync(4000) + + notice "Starting Message Simulation", client = client.ident.getId() + for a in 1..5: + await sleepAsync(4000) + + notice "Send to" + for convoWrapper in client.conversations.values(): + if convoWrapper.convo_type == PrivateV1Type: + await client.addMessage(convoWrapper.privateV1, fmt"message: {a}") +proc start*(client: Client) {.async.} = + # Start the message listener in the backgrounds + client.ds.addDispatchQueue(client.inboundQueue) + asyncSpawn client.ds.start() -proc processInvite*(self: var Client, invite: InvitePrivateV1) = - debug "Callback Invoked", invite = invite - - createPrivateConversation(self, loadPublicKeyFromBytes( - invite.initiator).get(), - invite.discriminator) + client.isRunning = true + asyncSpawn client.messageQueueConsumer() + asyncSpawn client.simulateMessages() + notice "Client start complete" +proc stop*(client: Client) = + ## Stop the client + client.isRunning = false + notice "Client stopped" diff --git a/src/conversations/private_v1.nim b/src/conversations/private_v1.nim index de831a7..6fe56dd 100644 --- a/src/conversations/private_v1.nim +++ b/src/conversations/private_v1.nim @@ -1,11 +1,19 @@ -import ../identity -import ../crypto -import ../utils + +import chronicles +import chronos import std/[sequtils, strutils] import std/algorithm import sugar +import ../[ + crypto, + identity, + proto_types, + utils, + waku_client +] + type PrivateV1* = object # Placeholder for PrivateV1 conversation type @@ -14,22 +22,32 @@ type participants: seq[PublicKey] discriminator: string -proc get_topic*(self: PrivateV1): string = +proc getTopic*(self: PrivateV1): string = ## Returns the topic for the PrivateV1 conversation. return self.topic -proc derive_topic(participants: seq[PublicKey], discriminator: string): string = - ## Derives a topic from the participants' public keys. + +proc getConvoIdRaw(participants: seq[PublicKey], + discriminator: string): string = # This is a placeholder implementation. var addrs = participants.map(x => x.get_addr()); addrs.sort() addrs.add(discriminator) let raw = addrs.join("|") + return utils.hash_func(raw) - return "/convo/private/" & utils.hash_func(raw) +proc getConvoId*(self: PrivateV1): string = + return getConvoIdRaw(self.participants, self.discriminator) +proc derive_topic(participants: seq[PublicKey], discriminator: string): string = + ## Derives a topic from the participants' public keys. + return "/convo/private/" & getConvoIdRaw(participants, discriminator) +proc encrypt*(convo: PrivateV1, frame: PrivateV1Frame): EncryptedPayload = + result = EncryptedPayload(plaintext: Plaintext(payload: encode(frame))) +proc decrypt*(convo: PrivateV1, enc: EncryptedPayload): PrivateV1Frame = + result = decode(enc.plaintext.payload, PrivateV1Frame).get() proc initPrivateV1*(owner: Identity, participant: PublicKey, discriminator: string = "default"): PrivateV1 = @@ -43,4 +61,12 @@ proc initPrivateV1*(owner: Identity, participant: PublicKey, discriminator: discriminator ) +proc sendMessage*(self: PrivateV1, ds: WakuClient, + msg: PrivateV1Frame): Future[void]{.async.} = + notice "SENDING MSG", fromm = self.owner.getId(), + participants = self.participants, msg = msg + let encryptedBytes = EncryptedPayload(plaintext: Plaintext(payload: encode(msg))) + + discard ds.sendPayload(self.getTopic(), encryptedBytes.toEnvelope( + self.getConvoId())) diff --git a/src/crypto.nim b/src/crypto.nim index 6853e53..7bef428 100644 --- a/src/crypto.nim +++ b/src/crypto.nim @@ -19,7 +19,6 @@ proc decrypt_plain*[T: EncryptableTypes](ciphertext: Plaintext, t: typedesc[ let obj = decode(ciphertext.payload, T) if obj.isErr: return err("Protobuf decode failed: " & obj.error) - result = ok(obj.get()) proc generate_key*(): PrivateKey = diff --git a/src/identity.nim b/src/identity.nim index 29d2df5..1e8755b 100644 --- a/src/identity.nim +++ b/src/identity.nim @@ -29,3 +29,6 @@ proc getPubkey*(self: Identity): PublicKey = proc getAddr*(self: Identity): string = result = get_addr(self.getPubKey()) + +proc getId*(self: Identity): string = + result = self.name diff --git a/src/inbox.nim b/src/inbox.nim index 48b1511..483697a 100644 --- a/src/inbox.nim +++ b/src/inbox.nim @@ -1,27 +1,31 @@ -import crypto -import proto_types -import utils -import dev -import chronicles -import results - - -type InviteCallback* = proc(invite: InvitePrivateV1): void +import + chronicles, + chronos, + results +import + crypto, + proto_types, + utils type Inbox* = object inbox_addr: string - invite_callback: InviteCallback -proc initInbox*(inbox_addr: string, invite_callback: InviteCallback): Inbox = +proc initInbox*(inbox_addr: string): Inbox = ## Initializes an Inbox object with the given address and invite callback. - return Inbox(inbox_addr: inbox_addr, invite_callback: invite_callback) - + return Inbox(inbox_addr: inbox_addr) proc encrypt*(frame: InboxV1Frame): EncryptedPayload = return encrypt_plain(frame) +proc decrypt*(encbytes: EncryptedPayload): Result[InboxV1Frame, string] = + let res_frame = decrypt_plain(encbytes.plaintext, InboxV1Frame) + if res_frame.isErr: + error "Failed to decrypt frame: ", err = res_frame.error + return err("Failed to decrypt frame: " & res_frame.error) + result = res_frame + proc wrap_env*(payload: EncryptedPayload, convo_id: string): WapEnvelopeV1 = let bytes = encode(payload) let salt = generateSalt() @@ -34,32 +38,8 @@ proc wrap_env*(payload: EncryptedPayload, convo_id: string): WapEnvelopeV1 = proc conversation_id_for*(pubkey: PublicKey): string = ## Generates a conversation ID based on the public key. - return "/convo/inbox/v1" & pubkey.get_addr() + return "/convo/inbox/v1/" & pubkey.get_addr() # TODO derive this from instance of Inbox proc topic_inbox*(client_addr: string): string = return "/inbox/" & client_addr - - -proc handle_incomming_frame*(self: Inbox, topic: string, bytes: seq[ - byte]): Result[int, string] = - # TODO: Can this fail? - let res = decode(bytes, EncryptedPayload) - if res.isErr: - return err("Failed to decode payload: " & res.error) - - let encbytes = res.get() - - # NOTE! in nim_protobuf_serializaiton OneOf fields are not exclusive, and all fields are default initialized. - if encbytes.plaintext == Plaintext(): - return err("Incorrect Encryption Type") - - let res_frame = decrypt_plain(encbytes.plaintext, InboxV1Frame) - if res_frame.isErr: - return err("Failed to decrypt frame: " & res_frame.error) - let frame = res_frame.get() - - self.invite_callback(frame.invite_private_v1) - ok(0) - - diff --git a/src/nim_chat_poc.nim b/src/nim_chat_poc.nim index 5b315d3..a31cfd6 100644 --- a/src/nim_chat_poc.nim +++ b/src/nim_chat_poc.nim @@ -1,67 +1,48 @@ +import chronos +import chronicles +import client +import waku_client -import - chronicles, - chronos, - strformat +proc initLogging() = + when defined(chronicles_runtime_filtering): + setLogLevel(LogLevel.Debug) + discard setTopicState("waku filter", chronicles.Normal, LogLevel.Error) + discard setTopicState("waku relay", chronicles.Normal, LogLevel.Error) + discard setTopicState("chat client", chronicles.Enabled, LogLevel.Debug) -import - waku_client +proc main() {.async.} = + # Create Configurations + var cfg_saro = DefaultConfig() + var cfg_raya = DefaultConfig() -proc handleMessages(pubsubTopic: string, message: seq[byte]): Future[ - void] {.gcsafe, raises: [Defect].} = - info "ClientRecv", pubTopic = pubsubTopic, msg = message + # Cross pollinate Peers + cfg_saro.staticPeers.add(cfg_raya.getMultiAddr()) + cfg_raya.staticPeers.add(cfg_saro.getMultiAddr()) -proc demoSendLoop(client: WakuClient): Future[void] {.async.} = - for i in 1..10: - await sleepAsync(20.seconds) - discard client.sendMessage(&"Message:{i}") + info "CFG", cfg = cfg_raya + info "CFG", cfg = cfg_saro -proc main(): Future[void] {.async.} = - echo "Starting POC" - let cfg = DefaultConfig() - let client = initWakuClient(cfg, @[PayloadHandler(handleMessages)]) - asyncSpawn client.start() + # Start Clients + var saro = newClient("Saro", cfg_saro) + await saro.start() - await demoSendLoop(client) + var raya = newClient("Raya", cfg_raya) + await raya.start() - echo "End of POC" + await sleepAsync(5000) + + # Perform OOB Introduction: Raya -> Saro + let raya_bundle = raya.createIntroBundle() + discard await saro.newPrivateConversation(raya_bundle) + + # Let messages process + await sleepAsync(400000) + + saro.stop() + raya.stop() when isMainModule: + initLogging() waitFor main() - -# import client -# import chronicles -# import proto_types - -# proc log(transport_message: TransportMessage) = -# ## Log the transport message -# info "Transport Message:", topic = transport_message.topic, -# payload = transport_message.payload - -# proc demo() = - -# # Initalize Clients -# var saro = initClient("Saro") -# var raya = initClient("Raya") - -# # # Exchange Contact Info -# let raya_bundle = raya.createIntroBundle() - -# # Create Conversation -# let invite = saro.createPrivateConvo(raya_bundle) -# invite.log() -# let msgs = raya.recv(invite) - -# # raya.convos()[0].sendText("Hello Saro, this is Raya!") - - -# when isMainModule: -# echo("Starting ChatPOC...") - -# try: -# demo() -# except Exception as e: -# error "Crashed ", error = e.msg - -# echo("Finished...") + notice "Shutdown" diff --git a/src/proto_types.nim b/src/proto_types.nim index ff18d71..2dbfb7e 100644 --- a/src/proto_types.nim +++ b/src/proto_types.nim @@ -4,6 +4,7 @@ import protobuf_serialization # This import is needed or th macro will not work import protobuf_serialization/proto_parser import results +import std/random export protobuf_serialization @@ -11,11 +12,16 @@ import_proto3 "../protos/inbox.proto" # import_proto3 "../protos/invite.proto" // Import3 follows protobuf includes so this will result in a redefinition error import_proto3 "../protos/encryption.proto" import_proto3 "../protos/envelope.proto" +# import_proto3 "../protos/common_frames.proto" + +import_proto3 "../protos/private_v1.proto" type EncryptableTypes = InboxV1Frame | EncryptedPayload +export ContentFrame export EncryptedPayload export InboxV1Frame +export PrivateV1Frame export EncryptableTypes @@ -45,14 +51,57 @@ type export IntroBundle +proc generateSalt*(): uint64 = + randomize() + result = 0 + for i in 0 ..< 8: + result = result or (uint64(rand(255)) shl (i * 8)) + + +proc toEnvelope*(payload: EncryptedPayload, convo_id: string): WapEnvelopeV1 = + let bytes = encode(payload) + let salt = generateSalt() + + # TODO: Implement hinting + return WapEnvelopeV1( + payload: bytes, + salt: salt, + conversation_hint: convo_id, + ) + +########################################################### +# nim-serialize-protobuf does not support oneof fields. +# As a stop gap each object using oneof fields, needs +# a implementation to look up the type. +# +# The valid field is determined by the fields which +# is not set to the default value +########################################################### type - TransportMessage {.proto3.} = object - topic* {.fieldNumber: 1.}: string - payload* {.fieldNumber: 2.}: seq[byte] + InboxV1FrameType* = enum + type_InvitePrivateV1, type_Note -# Place holder for a transport channel -proc sendTo*(topic: string, payload: seq[byte]): TransportMessage = - result = TransportMessage(topic: topic, payload: payload) +proc getKind*(obj: InboxV1Frame): InboxV1FrameType = -export TransportMessage + if obj.invite_private_v1 != InvitePrivateV1(): + return type_InvitePrivateV1 + + if obj.note != Note(): + return type_Note + + raise newException(ValueError, "Un handled one of type") + +type + PrivateV1FrameType* = enum + type_ContentFrame, type_Placeholder + +proc getKind*(obj: PrivateV1Frame): PrivateV1FrameType = + + if obj.content != ContentFrame(): + return type_ContentFrame + + if obj.placeholder != Placeholder(): + return type_Placeholder + + raise newException(ValueError, "Un handled one of type") diff --git a/src/types.nim b/src/types.nim new file mode 100644 index 0000000..bb42ea2 --- /dev/null +++ b/src/types.nim @@ -0,0 +1 @@ +type ChatError* = string diff --git a/src/utils.nim b/src/utils.nim index a77ece2..b212989 100644 --- a/src/utils.nim +++ b/src/utils.nim @@ -7,20 +7,11 @@ import strutils proc getTimestamp*(): Timestamp = result = waku_core.getNanosecondTime(getTime().toUnix()) -proc generateSalt*(): uint64 = - randomize() - result = 0 - for i in 0 ..< 8: - result = result or (uint64(rand(255)) shl (i * 8)) proc hash_func*(s: string): string = # This should be Blake2s but it does not exist so substituting with Blake2b result = getBlake2b(s, 4, "") -proc get_addr*(pubkey: SkPublicKey): string = - # TODO: Needs Spec - result = hash_func(pubkey.toHexCompressed()) - proc bytesToHex*[T](bytes: openarray[T], lowercase: bool = false): string = ## Convert bytes to hex string with case option result = "" @@ -32,3 +23,8 @@ proc get_addr*(pubkey: PublicKey): string = # TODO: Needs Spec result = hash_func(pubkey.bytes().bytesToHex()) +proc toBytes*(s: string): seq[byte] = + result = cast[seq[byte]](s) + +proc toUtfString*(b: seq[byte]): string = + result = cast[string](b) diff --git a/src/waku_client.nim b/src/waku_client.nim index 81c27b7..2b55367 100644 --- a/src/waku_client.nim +++ b/src/waku_client.nim @@ -2,11 +2,12 @@ import chronicles, chronos, confutils, - eth/keys, eth/p2p/discoveryv5/enr, libp2p/crypto/crypto, + libp2p/peerid, std/random, stew/byteutils, + std/sequtils, strformat, waku/[ common/logging, @@ -19,64 +20,78 @@ import waku_filter_v2/client, ] -import utils +import utils, proto_types + +logScope: + topics = "chat waku" + +type ChatPayload* = object + pubsubTopic*: PubsubTopic + contentTopic*: string + timestamp*: Timestamp + bytes*: seq[byte] + +proc toChatPayload*(msg: WakuMessage, pubsubTopic: PubsubTopic): ChatPayload = + result = ChatPayload(pubsubTopic: pubsubTopic, contentTopic: msg.contentTopic, + timestamp: msg.timestamp, bytes: msg.payload) + const - StaticPeer = "/ip4/64.225.80.192/tcp/30303/p2p/16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb" + # Placeholder FilterContentTopic = ContentTopic("/chatsdk/test/proto") -type PayloadHandler* = proc(pubsubTopic: string, message: seq[byte]): Future[void] {. - gcsafe, raises: [Defect] - .} +type QueueRef* = ref object + queue*: AsyncQueue[ChatPayload] + type WakuConfig* = object + nodekey: crypto.PrivateKey port*: uint16 clusterId*: uint16 shardId*: seq[uint16] ## @[0'u16] pubsubTopic*: string + staticPeers*: seq[string] + +proc getMultiAddr*(cfg: WakuConfig): string = + # TODO: Handle bad PubKey + var peerId = PeerId.init(cfg.nodekey.getPublicKey().get())[] #16Uiu2HAmNaeL4p3WEYzC9mgXBmBWSgWjPHRvatZTXnp8Jgv3iKsb + + # TODO: format IP address + result = fmt"/ip4/127.0.0.1/tcp/{cfg.port}/p2p/{peerId}" + type WakuClient* = ref object cfg: WakuConfig node*: WakuNode - handlers: seq[PayloadHandler] + dispatchQueues: seq[QueueRef] + staticPeerList: seq[string] proc DefaultConfig*(): WakuConfig = - + let nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[] let clusterId = 1'u16 let shardId = 3'u16 var port: uint16 = 50000'u16 + uint16(rand(200)) - result = WakuConfig(port: port, clusterId: clusterId, shardId: @[shardId], - pubsubTopic: &"/waku/2/rs/{clusterId}/{shardId}") + result = WakuConfig(nodeKey: nodeKey, port: port, clusterId: clusterId, + shardId: @[shardId], pubsubTopic: &"/waku/2/rs/{clusterId}/{shardId}", + staticPeers: @[]) +proc sendPayload*(client: WakuClient, contentTopic: string, + env: WapEnvelopeV1) {.async.} = + let bytes = encode(env) -proc sendMessage*(client: WakuClient, payload: string): Future[void] {.async.} = - let bytes = payload.toBytes - - var msg = WakuMessage( - payload: bytes, - contentTopic: FilterContentTopic, - ephemeral: true, - version: 0, - timestamp: getTimestamp() - ) - - let pubMsg = WakuMessage(payload: bytes) + let msg = WakuMessage(contentTopic: contentTopic, payload: bytes) let res = await client.node.publish(some(PubsubTopic(client.cfg.pubsubTopic)), msg) if res.isErr: error "Failed to Publish", err = res.error, pubsubTopic = client.cfg.pubsubTopic - info "SendMessage", payload = payload, pubsubTopic = client.cfg.pubsubTopic, msg = msg - proc buildWakuNode(cfg: WakuConfig): WakuNode = - let - nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[] ip = parseIpAddress("0.0.0.0") flags = CapabilitiesBitfield.init(relay = true) @@ -84,7 +99,7 @@ proc buildWakuNode(cfg: WakuConfig): WakuNode = error "Relay shards initialization failed", error = error quit(QuitFailure) - var enrBuilder = EnrBuilder.init(nodeKey) + var enrBuilder = EnrBuilder.init(cfg.nodeKey) enrBuilder.withWakuRelaySharding(relayShards).expect( "Building ENR with relay sharding failed" ) @@ -98,7 +113,7 @@ proc buildWakuNode(cfg: WakuConfig): WakuNode = recordRes.get() var builder = WakuNodeBuilder.init() - builder.withNodeKey(nodeKey) + builder.withNodeKey(cfg.nodeKey) builder.withRecord(record) builder.withNetworkConfigurationDetails(ip, Port(cfg.port)).tryGet() let node = builder.build().tryGet() @@ -107,54 +122,37 @@ proc buildWakuNode(cfg: WakuConfig): WakuNode = result = node -proc messageHandler(client: WakuClient, pubsubTopic: PubsubTopic, - message: WakuMessage -) {.async, gcsafe.} = - let payloadStr = string.fromBytes(message.payload) - notice "message received", - payload = payloadStr, - pubsubTopic = pubsubTopic, - contentTopic = message.contentTopic, - timestamp = message.timestamp - - - for handler in client.handlers: - discard handler(pubsubTopic, message.payload) - proc taskKeepAlive(client: WakuClient) {.async.} = - let peer = parsePeerInfo(StaticPeer).get() while true: - notice "maintaining subscription" - # First use filter-ping to check if we have an active subscription - let pingRes = await client.node.wakuFilterClient.ping(peer) - if pingRes.isErr(): - # No subscription found. Let's subscribe. - notice "no subscription found. Sending subscribe request" + for peerStr in client.staticPeerList: + let peer = parsePeerInfo(peerStr).get() - let subscribeRes = await client.node.wakuFilterClient.subscribe( - peer, client.cfg.pubsubTopic, @[FilterContentTopic] - ) + debug "maintaining subscription" + # First use filter-ping to check if we have an active subscription + let pingRes = await client.node.wakuFilterClient.ping(peer) + if pingRes.isErr(): + # No subscription found. Let's subscribe. + warn "no subscription found. Sending subscribe request" - if subscribeRes.isErr(): - notice "subscribe request failed. Quitting.", err = subscribeRes.error - break + # TODO: Use filter. Removing this stops relay from working so keeping for now + let subscribeRes = await client.node.wakuFilterClient.subscribe( + peer, client.cfg.pubsubTopic, @[FilterContentTopic] + ) + + if subscribeRes.isErr(): + error "subscribe request failed. Quitting.", err = subscribeRes.error + break + else: + debug "subscribe request successful." else: - notice "subscribe request successful." - else: - notice "subscription found." + debug "subscription found." await sleepAsync(60.seconds) # Subscription maintenance interval - -proc taskPublishDemo(client: WakuClient){.async.} = - for i in 0 ..< 15: - await client.sendMessage("Hello") - await sleepAsync(30.seconds) # Subscription maintenance interval - proc start*(client: WakuClient) {.async.} = setupLog(logging.LogLevel.NOTICE, logging.LogFormat.TEXT) - + await client.node.mountFilter() await client.node.mountFilterClient() await client.node.start() @@ -167,16 +165,26 @@ proc start*(client: WakuClient) {.async.} = let subscription: SubscriptionEvent = (kind: PubsubSub, topic: client.cfg.pubsubTopic) - let msg_handler = proc(pubsubTopic: PubsubTopic, - message: WakuMessage) {.async, gcsafe.} = discard client.messageHandler( - pubsubTopic, message) + proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + let payloadStr = string.fromBytes(msg.payload) + debug "message received", + pubsubTopic = topic, + contentTopic = msg.contentTopic - let res = subscribe(client.node, subscription, msg_handler) + let payload = msg.toChatPayload(topic) + + for queueRef in client.dispatchQueues: + await queueRef.queue.put(payload) + + let res = subscribe(client.node, subscription, handler) if res.isErr: error "Subscribe failed", err = res.error - await allFutures(taskKeepAlive(client), taskPublishDemo(client)) + await allFutures(taskKeepAlive(client)) -proc initWakuClient*(cfg: WakuConfig, handlers: seq[ - PayloadHandler]): WakuClient = - result = WakuClient(cfg: cfg, node: buildWakuNode(cfg), handlers: handlers) +proc initWakuClient*(cfg: WakuConfig): WakuClient = + result = WakuClient(cfg: cfg, node: buildWakuNode(cfg), dispatchQueues: @[], + staticPeerList: cfg.staticPeers) + +proc addDispatchQueue*(client: var WakuClient, queue: QueueRef) = + client.dispatchQueues.add(queue)