From cc01e954efc8a6aeefe05923bdda480d59bb18c6 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Wed, 3 Dec 2025 15:49:38 +0800 Subject: [PATCH 1/6] feat: dns discovery bootstrap --- examples/pingpong.nim | 6 +++--- examples/tui/tui.nim | 12 ++++++------ src/chat/client.nim | 19 ++++++++++++++++++- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/examples/pingpong.nim b/examples/pingpong.nim index bfdb67c..f2e17f5 100644 --- a/examples/pingpong.nim +++ b/examples/pingpong.nim @@ -22,8 +22,8 @@ proc main() {.async.} = var cfg_raya = DefaultConfig() # Cross pollinate Peers - No Waku discovery is used in this example - cfg_saro.staticPeers.add(cfg_raya.getMultiAddr()) - cfg_raya.staticPeers.add(cfg_saro.getMultiAddr()) + # cfg_saro.staticPeers.add(cfg_raya.getMultiAddr()) + # cfg_raya.staticPeers.add(cfg_saro.getMultiAddr()) let sKey = loadPrivateKeyFromBytes(@[45u8, 216, 160, 24, 19, 207, 193, 214, 98, 92, 153, 145, 222, 247, 101, 99, 96, 131, 149, 185, 33, 187, 229, 251, 100, 158, 20, 131, 111, 97, 181, 210]).get() let rKey = loadPrivateKeyFromBytes(@[43u8, 12, 160, 51, 212, 90, 199, 160, 154, 164, 129, 229, 147, 69, 151, 17, 239, 51, 190, 33, 86, 164, 50, 105, 39, 250, 182, 116, 138, 132, 114, 234]).get() @@ -79,7 +79,7 @@ proc main() {.async.} = let raya_bundle = raya.createIntroBundle() discard await saro.newPrivateConversation(raya_bundle) - await sleepAsync(20.seconds) # Run for some time + await sleepAsync(200.seconds) # Run for some time saro.stop() raya.stop() diff --git a/examples/tui/tui.nim b/examples/tui/tui.nim index f49c608..914897f 100644 --- a/examples/tui/tui.nim +++ b/examples/tui/tui.nim @@ -106,9 +106,9 @@ proc getSelectedConvo(app: ChatApp): ptr ConvoInfo = proc createChatClient(name: string): Future[Client] {.async.} = var cfg = await getCfg(name) - for key, val in fetchRegistrations(): - if key != name: - cfg.waku.staticPeers.add(val) + # for key, val in fetchRegistrations(): + # if key != name: + # cfg.waku.staticPeers.add(val) result = newClient(cfg.waku, cfg.ident) @@ -124,7 +124,7 @@ proc sendMessage(app: ChatApp, convoInfo: ptr ConvoInfo, msg: string) {.async.} var msgId = "" if convoInfo.convo != nil: - msgId = await convoInfo.convo.sendMessage(app.client.ds, initTextFrame(msg).toContentFrame()) + msgId = await convoInfo.convo.sendMessage(initTextFrame(msg).toContentFrame()) convoInfo[].addMessage(msgId, "You", app.inputBuffer) @@ -490,7 +490,7 @@ proc appLoop(app: ChatApp, panes: seq[Pane]) : Future[void] {.async.} = illwillInit(fullscreen = false) # Clear buffer while true: - await sleepAsync(5.milliseconds) + await sleepAsync(chronos.milliseconds(5)) app.tb.clear() drawStatusBar(app, panes[0], fgBlack, getIdColor(app.client.getId())) @@ -527,7 +527,7 @@ proc appLoop(app: ChatApp, panes: seq[Pane]) : Future[void] {.async.} = proc peerWatch(app: ChatApp): Future[void] {.async.} = while true: - await sleepAsync(1.seconds) + await sleepAsync(chronos.seconds(1)) app.peerCount = app.client.ds.getConnectedPeerCount() diff --git a/src/chat/client.nim b/src/chat/client.nim index ee52046..5f1901b 100644 --- a/src/chat/client.nim +++ b/src/chat/client.nim @@ -15,6 +15,12 @@ import # Foreign tables, types +import + waku/[ + waku_node, + discovery/waku_dnsdisc + ] + import #local conversations, conversations/convo_impl, @@ -44,7 +50,7 @@ type type KeyEntry* = object keyType: string - privateKey: PrivateKey + privateKey: crypto.PrivateKey timestamp: int64 type Client* = ref object @@ -287,6 +293,17 @@ proc start*(client: Client) {.async.} = client.isRunning = true + let dnsDiscoveryUrl = "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im" + let nameServer = parseIpAddress("1.1.1.1") + let discoveredPeers = await retrieveDynamicBootstrapNodes(dnsDiscoveryUrl, @[nameServer]) + if discoveredPeers.isOk: + info "Connecting to discovered peers" + let remotePeers = discoveredPeers.get() + info "Discovered and connecting to peers", peerCount = remotePeers.len + asyncSpawn client.ds.node.connectToNodes(remotePeers) + else: + warn "Failed to find peers via DNS discovery", error = discoveredPeers.error + asyncSpawn client.messageQueueConsumer() notice "Client start complete", client = client.getId() From a76af7b2c785c0bc4de549bbe39d6673ecb9492b Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Wed, 3 Dec 2025 16:20:10 +0800 Subject: [PATCH 2/6] chore: move discovery to ds --- src/chat/client.nim | 19 +------------------ src/chat/delivery/waku_client.nim | 12 ++++++++++++ 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/src/chat/client.nim b/src/chat/client.nim index 5f1901b..ee52046 100644 --- a/src/chat/client.nim +++ b/src/chat/client.nim @@ -15,12 +15,6 @@ import # Foreign tables, types -import - waku/[ - waku_node, - discovery/waku_dnsdisc - ] - import #local conversations, conversations/convo_impl, @@ -50,7 +44,7 @@ type type KeyEntry* = object keyType: string - privateKey: crypto.PrivateKey + privateKey: PrivateKey timestamp: int64 type Client* = ref object @@ -293,17 +287,6 @@ proc start*(client: Client) {.async.} = client.isRunning = true - let dnsDiscoveryUrl = "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im" - let nameServer = parseIpAddress("1.1.1.1") - let discoveredPeers = await retrieveDynamicBootstrapNodes(dnsDiscoveryUrl, @[nameServer]) - if discoveredPeers.isOk: - info "Connecting to discovered peers" - let remotePeers = discoveredPeers.get() - info "Discovered and connecting to peers", peerCount = remotePeers.len - asyncSpawn client.ds.node.connectToNodes(remotePeers) - else: - warn "Failed to find peers via DNS discovery", error = discoveredPeers.error - asyncSpawn client.messageQueueConsumer() notice "Client start complete", client = client.getId() diff --git a/src/chat/delivery/waku_client.nim b/src/chat/delivery/waku_client.nim index 1a04d2c..efff71e 100644 --- a/src/chat/delivery/waku_client.nim +++ b/src/chat/delivery/waku_client.nim @@ -15,6 +15,7 @@ import waku_node, waku_enr, discovery/waku_discv5, + discovery/waku_dnsdisc, factory/builder, waku_filter_v2/client, ] @@ -161,6 +162,17 @@ proc start*(client: WakuClient) {.async.} = client.node.peerManager.start() + let dnsDiscoveryUrl = "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im" + let nameServer = parseIpAddress("1.1.1.1") + let discoveredPeers = await retrieveDynamicBootstrapNodes(dnsDiscoveryUrl, @[nameServer]) + if discoveredPeers.isOk: + info "Connecting to discovered peers" + let remotePeers = discoveredPeers.get() + info "Discovered and connecting to peers", peerCount = remotePeers.len + asyncSpawn client.node.connectToNodes(remotePeers) + else: + warn "Failed to find peers via DNS discovery", error = discoveredPeers.error + let subscription: SubscriptionEvent = (kind: PubsubSub, topic: client.cfg.pubsubTopic) From 09f2af49e97a34e7ce4b74791762eebf4a8b1671 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Wed, 3 Dec 2025 17:34:58 +0800 Subject: [PATCH 3/6] chore: stop with waku node --- examples/pingpong.nim | 6 +----- src/chat/client.nim | 1 + src/chat/delivery/waku_client.nim | 6 +++--- 3 files changed, 5 insertions(+), 8 deletions(-) diff --git a/examples/pingpong.nim b/examples/pingpong.nim index f2e17f5..ed32e16 100644 --- a/examples/pingpong.nim +++ b/examples/pingpong.nim @@ -21,10 +21,6 @@ proc main() {.async.} = var cfg_saro = DefaultConfig() var cfg_raya = DefaultConfig() - # Cross pollinate Peers - No Waku discovery is used in this example - # cfg_saro.staticPeers.add(cfg_raya.getMultiAddr()) - # cfg_raya.staticPeers.add(cfg_saro.getMultiAddr()) - let sKey = loadPrivateKeyFromBytes(@[45u8, 216, 160, 24, 19, 207, 193, 214, 98, 92, 153, 145, 222, 247, 101, 99, 96, 131, 149, 185, 33, 187, 229, 251, 100, 158, 20, 131, 111, 97, 181, 210]).get() let rKey = loadPrivateKeyFromBytes(@[43u8, 12, 160, 51, 212, 90, 199, 160, 154, 164, 129, 229, 147, 69, 151, 17, 239, 51, 190, 33, 86, 164, 50, 105, 39, 250, 182, 116, 138, 132, 114, 234]).get() @@ -79,7 +75,7 @@ proc main() {.async.} = let raya_bundle = raya.createIntroBundle() discard await saro.newPrivateConversation(raya_bundle) - await sleepAsync(200.seconds) # Run for some time + await sleepAsync(20.seconds) # Run for some time saro.stop() raya.stop() diff --git a/src/chat/client.nim b/src/chat/client.nim index ee52046..ffbdac0 100644 --- a/src/chat/client.nim +++ b/src/chat/client.nim @@ -293,5 +293,6 @@ proc start*(client: Client) {.async.} = proc stop*(client: Client) = ## Stop the client. + client.ds.stop() client.isRunning = false notice "Client stopped", client = client.getId() diff --git a/src/chat/delivery/waku_client.nim b/src/chat/delivery/waku_client.nim index efff71e..4504131 100644 --- a/src/chat/delivery/waku_client.nim +++ b/src/chat/delivery/waku_client.nim @@ -72,8 +72,8 @@ type proc DefaultConfig*(): WakuConfig = let nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[] - let clusterId = 19'u16 - let shardId = 0'u16 + let clusterId = 16'u16 + let shardId = 32'u16 var port: uint16 = 50000'u16 + uint16(rand(200)) result = WakuConfig(nodeKey: nodeKey, port: port, clusterId: clusterId, @@ -162,7 +162,7 @@ proc start*(client: WakuClient) {.async.} = client.node.peerManager.start() - let dnsDiscoveryUrl = "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im" + let dnsDiscoveryUrl = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@boot.staging.status.nodes.status.im" let nameServer = parseIpAddress("1.1.1.1") let discoveredPeers = await retrieveDynamicBootstrapNodes(dnsDiscoveryUrl, @[nameServer]) if discoveredPeers.isOk: From 76e65f49b66929782780d3b05bd02b589f80df7c Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Wed, 3 Dec 2025 17:44:41 +0800 Subject: [PATCH 4/6] chore: async stop process --- examples/pingpong.nim | 6 +++--- examples/tui/tui.nim | 4 ---- src/chat/client.nim | 4 ++-- src/chat/delivery/waku_client.nim | 3 +++ 4 files changed, 8 insertions(+), 9 deletions(-) diff --git a/examples/pingpong.nim b/examples/pingpong.nim index ed32e16..6caf000 100644 --- a/examples/pingpong.nim +++ b/examples/pingpong.nim @@ -69,7 +69,7 @@ proc main() {.async.} = await saro.start() await raya.start() - await sleepAsync(5.seconds) + await sleepAsync(10.seconds) # Perform OOB Introduction: Raya -> Saro let raya_bundle = raya.createIntroBundle() @@ -77,8 +77,8 @@ proc main() {.async.} = await sleepAsync(20.seconds) # Run for some time - saro.stop() - raya.stop() + await saro.stop() + await raya.stop() when isMainModule: diff --git a/examples/tui/tui.nim b/examples/tui/tui.nim index 914897f..a3b4716 100644 --- a/examples/tui/tui.nim +++ b/examples/tui/tui.nim @@ -106,10 +106,6 @@ proc getSelectedConvo(app: ChatApp): ptr ConvoInfo = proc createChatClient(name: string): Future[Client] {.async.} = var cfg = await getCfg(name) - # for key, val in fetchRegistrations(): - # if key != name: - # cfg.waku.staticPeers.add(val) - result = newClient(cfg.waku, cfg.ident) diff --git a/src/chat/client.nim b/src/chat/client.nim index ffbdac0..75ad3c0 100644 --- a/src/chat/client.nim +++ b/src/chat/client.nim @@ -291,8 +291,8 @@ proc start*(client: Client) {.async.} = notice "Client start complete", client = client.getId() -proc stop*(client: Client) = +proc stop*(client: Client) {.async.} = ## Stop the client. - client.ds.stop() + await client.ds.stop() client.isRunning = false notice "Client stopped", client = client.getId() diff --git a/src/chat/delivery/waku_client.nim b/src/chat/delivery/waku_client.nim index 4504131..2e871df 100644 --- a/src/chat/delivery/waku_client.nim +++ b/src/chat/delivery/waku_client.nim @@ -206,3 +206,6 @@ proc getConnectedPeerCount*(client: WakuClient): int = if peerInfo.connectedness == Connected: inc count return count + +proc stop*(client: WakuClient) {.async.} = + await client.node.stop() From 567db393ac9b1cd3acb86d09cb864e940bfc33d2 Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 4 Dec 2025 10:45:16 +0800 Subject: [PATCH 5/6] fix: no error when decode failure --- src/chat/client.nim | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/chat/client.nim b/src/chat/client.nim index 75ad3c0..8fcfed3 100644 --- a/src/chat/client.nim +++ b/src/chat/client.nim @@ -234,9 +234,11 @@ proc newPrivateConversation*(client: Client, proc parseMessage(client: Client, msg: ChatPayload) {.raises: [ValueError, SerializationError].} = ## Receives a incoming payload, decodes it, and processes it. - - let envelope = decode(msg.bytes, WapEnvelopeV1).valueOr: - raise newException(ValueError, "Failed to decode WapEnvelopeV1: " & error) + let envelopeRes = decode(msg.bytes, WapEnvelopeV1) + if envelopeRes.isErr: + debug "Failed to decode WapEnvelopeV1", err = envelopeRes.error + return + let envelope = envelopeRes.get() let convo = block: let opt = client.getConversationFromHint(envelope.conversationHint).valueOr: From 70bd36785c8b9cf7be417ecba3b9cda4813e237d Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 4 Dec 2025 11:51:34 +0800 Subject: [PATCH 6/6] refactor: remove useless log --- src/chat/client.nim | 9 +++++++-- src/chat/conversations/private_v1.nim | 16 +++++++++++++++- src/chat/errors.nim | 1 + src/chat/inbox.nim | 17 ++++++++++++++++- 4 files changed, 39 insertions(+), 4 deletions(-) diff --git a/src/chat/client.nim b/src/chat/client.nim index 8fcfed3..3d5b374 100644 --- a/src/chat/client.nim +++ b/src/chat/client.nim @@ -229,14 +229,14 @@ proc newPrivateConversation*(client: Client, ################################################# # Payload Handling +# Receives a incoming payload, decodes it, and processes it. ################################################# proc parseMessage(client: Client, msg: ChatPayload) {.raises: [ValueError, SerializationError].} = - ## Receives a incoming payload, decodes it, and processes it. let envelopeRes = decode(msg.bytes, WapEnvelopeV1) if envelopeRes.isErr: - debug "Failed to decode WapEnvelopeV1", err = envelopeRes.error + debug "Failed to decode WapEnvelopeV1", client = client.getId(), err = envelopeRes.error return let envelope = envelopeRes.get() @@ -268,6 +268,11 @@ proc messageQueueConsumer(client: Client) {.async.} = while client.isRunning: let message = await client.inboundQueue.queue.get() + let topicRes = inbox.parseTopic(message.contentTopic).or(private_v1.parseTopic(message.contentTopic)) + if topicRes.isErr: + debug "Invalid content topic", client = client.getId(), err = topicRes.error, contentTopic = message.contentTopic + continue + notice "Inbound Message Received", client = client.getId(), contentTopic = message.contentTopic, len = message.bytes.len() try: diff --git a/src/chat/conversations/private_v1.nim b/src/chat/conversations/private_v1.nim index 5b31f4a..99a0236 100644 --- a/src/chat/conversations/private_v1.nim +++ b/src/chat/conversations/private_v1.nim @@ -42,6 +42,9 @@ type discriminator: string doubleratchet: naxolotl.Doubleratchet +const + TopicPrefixPrivateV1 = "/convo/private/" + proc getTopic*(self: PrivateV1): string = ## Returns the topic for the PrivateV1 conversation. return self.topic @@ -63,7 +66,18 @@ proc getConvoId*(self: PrivateV1): string = proc derive_topic(participants: seq[PublicKey], discriminator: string): string = ## Derives a topic from the participants' public keys. - return "/convo/private/" & getConvoIdRaw(participants, discriminator) + return TopicPrefixPrivateV1 & getConvoIdRaw(participants, discriminator) + +## Parses the topic to extract the conversation ID. +proc parseTopic*(topic: string): Result[string, ChatError] = + if not topic.startsWith(TopicPrefixPrivateV1): + return err(ChatError(code: errTopic, context: "Invalid topic prefix")) + + let id = topic.split('/')[^1] + if id == "": + return err(ChatError(code: errTopic, context: "Empty conversation ID")) + + return ok(id) proc calcMsgId(self: PrivateV1, msgBytes: seq[byte]): string = let s = fmt"{self.getConvoId()}|{msgBytes}" diff --git a/src/chat/errors.nim b/src/chat/errors.nim index f4751b0..c74b5b2 100644 --- a/src/chat/errors.nim +++ b/src/chat/errors.nim @@ -8,6 +8,7 @@ type ErrorCode* = enum errTypeError errWrapped + errTopic proc `$`*(x: ChatError): string = fmt"ChatError(code={$x.code}, context: {x.context})" diff --git a/src/chat/inbox.nim b/src/chat/inbox.nim index 14beed2..6b8f606 100644 --- a/src/chat/inbox.nim +++ b/src/chat/inbox.nim @@ -1,3 +1,5 @@ +import std/[strutils] + import chronicles, chronos, @@ -10,6 +12,7 @@ import conversation_store, crypto, delivery/waku_client, + errors, proto_types, types @@ -21,6 +24,8 @@ type pubkey: PublicKey inbox_addr: string +const + TopicPrefixInbox = "/inbox/" proc `$`*(conv: Inbox): string = fmt"Inbox: addr->{conv.inbox_addr}" @@ -56,7 +61,17 @@ proc conversation_id_for*(pubkey: PublicKey): string = # TODO derive this from instance of Inbox proc topic_inbox*(client_addr: string): string = - return "/inbox/" & client_addr + return TopicPrefixInbox & client_addr + +proc parseTopic*(topic: string): Result[string, ChatError] = + if not topic.startsWith(TopicPrefixInbox): + return err(ChatError(code: errTopic, context: "Invalid inbox topic prefix")) + + let id = topic.split('/')[^1] + if id == "": + return err(ChatError(code: errTopic, context: "Empty inbox id")) + + return ok(id) method id*(convo: Inbox): string = return conversation_id_for(convo.pubkey)