mirror of
https://github.com/logos-messaging/nim-chat-poc.git
synced 2026-01-08 00:53:08 +00:00
Merge pull request #35 from logos-messaging/nwaku-discovery
feat: dns discovery bootstrap
This commit is contained in:
commit
629b1b73b9
@ -21,10 +21,6 @@ proc main() {.async.} =
|
|||||||
var cfg_saro = DefaultConfig()
|
var cfg_saro = DefaultConfig()
|
||||||
var cfg_raya = DefaultConfig()
|
var cfg_raya = DefaultConfig()
|
||||||
|
|
||||||
# Cross pollinate Peers - No Waku discovery is used in this example
|
|
||||||
cfg_saro.staticPeers.add(cfg_raya.getMultiAddr())
|
|
||||||
cfg_raya.staticPeers.add(cfg_saro.getMultiAddr())
|
|
||||||
|
|
||||||
let sKey = loadPrivateKeyFromBytes(@[45u8, 216, 160, 24, 19, 207, 193, 214, 98, 92, 153, 145, 222, 247, 101, 99, 96, 131, 149, 185, 33, 187, 229, 251, 100, 158, 20, 131, 111, 97, 181, 210]).get()
|
let sKey = loadPrivateKeyFromBytes(@[45u8, 216, 160, 24, 19, 207, 193, 214, 98, 92, 153, 145, 222, 247, 101, 99, 96, 131, 149, 185, 33, 187, 229, 251, 100, 158, 20, 131, 111, 97, 181, 210]).get()
|
||||||
let rKey = loadPrivateKeyFromBytes(@[43u8, 12, 160, 51, 212, 90, 199, 160, 154, 164, 129, 229, 147, 69, 151, 17, 239, 51, 190, 33, 86, 164, 50, 105, 39, 250, 182, 116, 138, 132, 114, 234]).get()
|
let rKey = loadPrivateKeyFromBytes(@[43u8, 12, 160, 51, 212, 90, 199, 160, 154, 164, 129, 229, 147, 69, 151, 17, 239, 51, 190, 33, 86, 164, 50, 105, 39, 250, 182, 116, 138, 132, 114, 234]).get()
|
||||||
|
|
||||||
@ -73,7 +69,7 @@ proc main() {.async.} =
|
|||||||
await saro.start()
|
await saro.start()
|
||||||
await raya.start()
|
await raya.start()
|
||||||
|
|
||||||
await sleepAsync(5.seconds)
|
await sleepAsync(10.seconds)
|
||||||
|
|
||||||
# Perform OOB Introduction: Raya -> Saro
|
# Perform OOB Introduction: Raya -> Saro
|
||||||
let raya_bundle = raya.createIntroBundle()
|
let raya_bundle = raya.createIntroBundle()
|
||||||
@ -81,8 +77,8 @@ proc main() {.async.} =
|
|||||||
|
|
||||||
await sleepAsync(20.seconds) # Run for some time
|
await sleepAsync(20.seconds) # Run for some time
|
||||||
|
|
||||||
saro.stop()
|
await saro.stop()
|
||||||
raya.stop()
|
await raya.stop()
|
||||||
|
|
||||||
|
|
||||||
when isMainModule:
|
when isMainModule:
|
||||||
|
|||||||
@ -106,10 +106,6 @@ proc getSelectedConvo(app: ChatApp): ptr ConvoInfo =
|
|||||||
|
|
||||||
proc createChatClient(name: string): Future[Client] {.async.} =
|
proc createChatClient(name: string): Future[Client] {.async.} =
|
||||||
var cfg = await getCfg(name)
|
var cfg = await getCfg(name)
|
||||||
for key, val in fetchRegistrations():
|
|
||||||
if key != name:
|
|
||||||
cfg.waku.staticPeers.add(val)
|
|
||||||
|
|
||||||
result = newClient(cfg.waku, cfg.ident)
|
result = newClient(cfg.waku, cfg.ident)
|
||||||
|
|
||||||
|
|
||||||
@ -124,7 +120,7 @@ proc sendMessage(app: ChatApp, convoInfo: ptr ConvoInfo, msg: string) {.async.}
|
|||||||
|
|
||||||
var msgId = ""
|
var msgId = ""
|
||||||
if convoInfo.convo != nil:
|
if convoInfo.convo != nil:
|
||||||
msgId = await convoInfo.convo.sendMessage(app.client.ds, initTextFrame(msg).toContentFrame())
|
msgId = await convoInfo.convo.sendMessage(initTextFrame(msg).toContentFrame())
|
||||||
|
|
||||||
convoInfo[].addMessage(msgId, "You", app.inputBuffer)
|
convoInfo[].addMessage(msgId, "You", app.inputBuffer)
|
||||||
|
|
||||||
@ -490,7 +486,7 @@ proc appLoop(app: ChatApp, panes: seq[Pane]) : Future[void] {.async.} =
|
|||||||
illwillInit(fullscreen = false)
|
illwillInit(fullscreen = false)
|
||||||
# Clear buffer
|
# Clear buffer
|
||||||
while true:
|
while true:
|
||||||
await sleepAsync(5.milliseconds)
|
await sleepAsync(chronos.milliseconds(5))
|
||||||
app.tb.clear()
|
app.tb.clear()
|
||||||
|
|
||||||
drawStatusBar(app, panes[0], fgBlack, getIdColor(app.client.getId()))
|
drawStatusBar(app, panes[0], fgBlack, getIdColor(app.client.getId()))
|
||||||
@ -527,7 +523,7 @@ proc appLoop(app: ChatApp, panes: seq[Pane]) : Future[void] {.async.} =
|
|||||||
|
|
||||||
proc peerWatch(app: ChatApp): Future[void] {.async.} =
|
proc peerWatch(app: ChatApp): Future[void] {.async.} =
|
||||||
while true:
|
while true:
|
||||||
await sleepAsync(1.seconds)
|
await sleepAsync(chronos.seconds(1))
|
||||||
app.peerCount = app.client.ds.getConnectedPeerCount()
|
app.peerCount = app.client.ds.getConnectedPeerCount()
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -229,14 +229,16 @@ proc newPrivateConversation*(client: Client,
|
|||||||
|
|
||||||
#################################################
|
#################################################
|
||||||
# Payload Handling
|
# Payload Handling
|
||||||
|
# Receives a incoming payload, decodes it, and processes it.
|
||||||
#################################################
|
#################################################
|
||||||
|
|
||||||
proc parseMessage(client: Client, msg: ChatPayload) {.raises: [ValueError,
|
proc parseMessage(client: Client, msg: ChatPayload) {.raises: [ValueError,
|
||||||
SerializationError].} =
|
SerializationError].} =
|
||||||
## Receives a incoming payload, decodes it, and processes it.
|
let envelopeRes = decode(msg.bytes, WapEnvelopeV1)
|
||||||
|
if envelopeRes.isErr:
|
||||||
let envelope = decode(msg.bytes, WapEnvelopeV1).valueOr:
|
debug "Failed to decode WapEnvelopeV1", client = client.getId(), err = envelopeRes.error
|
||||||
raise newException(ValueError, "Failed to decode WapEnvelopeV1: " & error)
|
return
|
||||||
|
let envelope = envelopeRes.get()
|
||||||
|
|
||||||
let convo = block:
|
let convo = block:
|
||||||
let opt = client.getConversationFromHint(envelope.conversationHint).valueOr:
|
let opt = client.getConversationFromHint(envelope.conversationHint).valueOr:
|
||||||
@ -266,6 +268,11 @@ proc messageQueueConsumer(client: Client) {.async.} =
|
|||||||
while client.isRunning:
|
while client.isRunning:
|
||||||
let message = await client.inboundQueue.queue.get()
|
let message = await client.inboundQueue.queue.get()
|
||||||
|
|
||||||
|
let topicRes = inbox.parseTopic(message.contentTopic).or(private_v1.parseTopic(message.contentTopic))
|
||||||
|
if topicRes.isErr:
|
||||||
|
debug "Invalid content topic", client = client.getId(), err = topicRes.error, contentTopic = message.contentTopic
|
||||||
|
continue
|
||||||
|
|
||||||
notice "Inbound Message Received", client = client.getId(),
|
notice "Inbound Message Received", client = client.getId(),
|
||||||
contentTopic = message.contentTopic, len = message.bytes.len()
|
contentTopic = message.contentTopic, len = message.bytes.len()
|
||||||
try:
|
try:
|
||||||
@ -291,7 +298,8 @@ proc start*(client: Client) {.async.} =
|
|||||||
|
|
||||||
notice "Client start complete", client = client.getId()
|
notice "Client start complete", client = client.getId()
|
||||||
|
|
||||||
proc stop*(client: Client) =
|
proc stop*(client: Client) {.async.} =
|
||||||
## Stop the client.
|
## Stop the client.
|
||||||
|
await client.ds.stop()
|
||||||
client.isRunning = false
|
client.isRunning = false
|
||||||
notice "Client stopped", client = client.getId()
|
notice "Client stopped", client = client.getId()
|
||||||
|
|||||||
@ -42,6 +42,9 @@ type
|
|||||||
discriminator: string
|
discriminator: string
|
||||||
doubleratchet: naxolotl.Doubleratchet
|
doubleratchet: naxolotl.Doubleratchet
|
||||||
|
|
||||||
|
const
|
||||||
|
TopicPrefixPrivateV1 = "/convo/private/"
|
||||||
|
|
||||||
proc getTopic*(self: PrivateV1): string =
|
proc getTopic*(self: PrivateV1): string =
|
||||||
## Returns the topic for the PrivateV1 conversation.
|
## Returns the topic for the PrivateV1 conversation.
|
||||||
return self.topic
|
return self.topic
|
||||||
@ -63,7 +66,18 @@ proc getConvoId*(self: PrivateV1): string =
|
|||||||
|
|
||||||
proc derive_topic(participants: seq[PublicKey], discriminator: string): string =
|
proc derive_topic(participants: seq[PublicKey], discriminator: string): string =
|
||||||
## Derives a topic from the participants' public keys.
|
## Derives a topic from the participants' public keys.
|
||||||
return "/convo/private/" & getConvoIdRaw(participants, discriminator)
|
return TopicPrefixPrivateV1 & getConvoIdRaw(participants, discriminator)
|
||||||
|
|
||||||
|
## Parses the topic to extract the conversation ID.
|
||||||
|
proc parseTopic*(topic: string): Result[string, ChatError] =
|
||||||
|
if not topic.startsWith(TopicPrefixPrivateV1):
|
||||||
|
return err(ChatError(code: errTopic, context: "Invalid topic prefix"))
|
||||||
|
|
||||||
|
let id = topic.split('/')[^1]
|
||||||
|
if id == "":
|
||||||
|
return err(ChatError(code: errTopic, context: "Empty conversation ID"))
|
||||||
|
|
||||||
|
return ok(id)
|
||||||
|
|
||||||
proc calcMsgId(self: PrivateV1, msgBytes: seq[byte]): string =
|
proc calcMsgId(self: PrivateV1, msgBytes: seq[byte]): string =
|
||||||
let s = fmt"{self.getConvoId()}|{msgBytes}"
|
let s = fmt"{self.getConvoId()}|{msgBytes}"
|
||||||
|
|||||||
@ -15,6 +15,7 @@ import
|
|||||||
waku_node,
|
waku_node,
|
||||||
waku_enr,
|
waku_enr,
|
||||||
discovery/waku_discv5,
|
discovery/waku_discv5,
|
||||||
|
discovery/waku_dnsdisc,
|
||||||
factory/builder,
|
factory/builder,
|
||||||
waku_filter_v2/client,
|
waku_filter_v2/client,
|
||||||
]
|
]
|
||||||
@ -71,8 +72,8 @@ type
|
|||||||
|
|
||||||
proc DefaultConfig*(): WakuConfig =
|
proc DefaultConfig*(): WakuConfig =
|
||||||
let nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[]
|
let nodeKey = crypto.PrivateKey.random(Secp256k1, crypto.newRng()[])[]
|
||||||
let clusterId = 19'u16
|
let clusterId = 16'u16
|
||||||
let shardId = 0'u16
|
let shardId = 32'u16
|
||||||
var port: uint16 = 50000'u16 + uint16(rand(200))
|
var port: uint16 = 50000'u16 + uint16(rand(200))
|
||||||
|
|
||||||
result = WakuConfig(nodeKey: nodeKey, port: port, clusterId: clusterId,
|
result = WakuConfig(nodeKey: nodeKey, port: port, clusterId: clusterId,
|
||||||
@ -161,6 +162,17 @@ proc start*(client: WakuClient) {.async.} =
|
|||||||
|
|
||||||
client.node.peerManager.start()
|
client.node.peerManager.start()
|
||||||
|
|
||||||
|
let dnsDiscoveryUrl = "enrtree://AI4W5N5IFEUIHF5LESUAOSMV6TKWF2MB6GU2YK7PU4TYUGUNOCEPW@boot.staging.status.nodes.status.im"
|
||||||
|
let nameServer = parseIpAddress("1.1.1.1")
|
||||||
|
let discoveredPeers = await retrieveDynamicBootstrapNodes(dnsDiscoveryUrl, @[nameServer])
|
||||||
|
if discoveredPeers.isOk:
|
||||||
|
info "Connecting to discovered peers"
|
||||||
|
let remotePeers = discoveredPeers.get()
|
||||||
|
info "Discovered and connecting to peers", peerCount = remotePeers.len
|
||||||
|
asyncSpawn client.node.connectToNodes(remotePeers)
|
||||||
|
else:
|
||||||
|
warn "Failed to find peers via DNS discovery", error = discoveredPeers.error
|
||||||
|
|
||||||
let subscription: SubscriptionEvent = (kind: PubsubSub, topic:
|
let subscription: SubscriptionEvent = (kind: PubsubSub, topic:
|
||||||
client.cfg.pubsubTopic)
|
client.cfg.pubsubTopic)
|
||||||
|
|
||||||
@ -194,3 +206,6 @@ proc getConnectedPeerCount*(client: WakuClient): int =
|
|||||||
if peerInfo.connectedness == Connected:
|
if peerInfo.connectedness == Connected:
|
||||||
inc count
|
inc count
|
||||||
return count
|
return count
|
||||||
|
|
||||||
|
proc stop*(client: WakuClient) {.async.} =
|
||||||
|
await client.node.stop()
|
||||||
|
|||||||
@ -8,6 +8,7 @@ type
|
|||||||
ErrorCode* = enum
|
ErrorCode* = enum
|
||||||
errTypeError
|
errTypeError
|
||||||
errWrapped
|
errWrapped
|
||||||
|
errTopic
|
||||||
|
|
||||||
proc `$`*(x: ChatError): string =
|
proc `$`*(x: ChatError): string =
|
||||||
fmt"ChatError(code={$x.code}, context: {x.context})"
|
fmt"ChatError(code={$x.code}, context: {x.context})"
|
||||||
|
|||||||
@ -1,3 +1,5 @@
|
|||||||
|
import std/[strutils]
|
||||||
|
|
||||||
import
|
import
|
||||||
chronicles,
|
chronicles,
|
||||||
chronos,
|
chronos,
|
||||||
@ -10,6 +12,7 @@ import
|
|||||||
conversation_store,
|
conversation_store,
|
||||||
crypto,
|
crypto,
|
||||||
delivery/waku_client,
|
delivery/waku_client,
|
||||||
|
errors,
|
||||||
proto_types,
|
proto_types,
|
||||||
types
|
types
|
||||||
|
|
||||||
@ -21,6 +24,8 @@ type
|
|||||||
pubkey: PublicKey
|
pubkey: PublicKey
|
||||||
inbox_addr: string
|
inbox_addr: string
|
||||||
|
|
||||||
|
const
|
||||||
|
TopicPrefixInbox = "/inbox/"
|
||||||
|
|
||||||
proc `$`*(conv: Inbox): string =
|
proc `$`*(conv: Inbox): string =
|
||||||
fmt"Inbox: addr->{conv.inbox_addr}"
|
fmt"Inbox: addr->{conv.inbox_addr}"
|
||||||
@ -56,7 +61,17 @@ proc conversation_id_for*(pubkey: PublicKey): string =
|
|||||||
|
|
||||||
# TODO derive this from instance of Inbox
|
# TODO derive this from instance of Inbox
|
||||||
proc topic_inbox*(client_addr: string): string =
|
proc topic_inbox*(client_addr: string): string =
|
||||||
return "/inbox/" & client_addr
|
return TopicPrefixInbox & client_addr
|
||||||
|
|
||||||
|
proc parseTopic*(topic: string): Result[string, ChatError] =
|
||||||
|
if not topic.startsWith(TopicPrefixInbox):
|
||||||
|
return err(ChatError(code: errTopic, context: "Invalid inbox topic prefix"))
|
||||||
|
|
||||||
|
let id = topic.split('/')[^1]
|
||||||
|
if id == "":
|
||||||
|
return err(ChatError(code: errTopic, context: "Empty inbox id"))
|
||||||
|
|
||||||
|
return ok(id)
|
||||||
|
|
||||||
method id*(convo: Inbox): string =
|
method id*(convo: Inbox): string =
|
||||||
return conversation_id_for(convo.pubkey)
|
return conversation_id_for(convo.pubkey)
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user