mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-05-12 05:19:33 +00:00
439 lines
12 KiB
Nim
439 lines
12 KiB
Nim
## chat2disco is an example of usage of Waku v2 with Kademlia service discovery.
|
|
## Users create named chat rooms; the app derives a service ID from the room name,
|
|
## advertises via Kademlia, and discovers/connects to other peers with the same service.
|
|
|
|
when not (compileOption("threads")):
|
|
{.fatal: "Please, compile this program with the --threads:on option!".}
|
|
|
|
{.push raises: [].}
|
|
|
|
import std/[strformat, strutils, times, options, sequtils, tables]
|
|
import
|
|
confutils,
|
|
chronicles,
|
|
chronos,
|
|
eth/keys,
|
|
bearssl,
|
|
stew/byteutils,
|
|
results,
|
|
metrics,
|
|
metrics/chronos_httpserver
|
|
import
|
|
libp2p/[
|
|
switch,
|
|
crypto/crypto,
|
|
stream/connection,
|
|
multiaddress,
|
|
peerinfo,
|
|
peerid,
|
|
protobuf/minprotobuf,
|
|
extended_peer_record,
|
|
]
|
|
import
|
|
waku/[
|
|
waku_core,
|
|
waku_enr,
|
|
discovery/waku_kademlia,
|
|
waku_node,
|
|
node/waku_metrics,
|
|
node/peer_manager,
|
|
factory/builder,
|
|
common/utils/nat,
|
|
waku_relay,
|
|
],
|
|
./config_chat2disco
|
|
|
|
import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/pubsub/pubsub
|
|
import libp2p/protocols/service_discovery/types
|
|
|
|
logScope:
|
|
topics = "chat2disco"
|
|
|
|
const Help = """
|
|
Commands: /[?|help|create|rooms|switch|nick|exit]
|
|
help: Prints this help
|
|
create <room>: Create/join a chat room via service discovery
|
|
rooms: List joined rooms
|
|
switch <room>: Switch active room for sending messages
|
|
nick: change nickname
|
|
exit: exits chat session
|
|
"""
|
|
|
|
type
|
|
ChatRoom = object
|
|
serviceId*: string
|
|
contentTopic*: string
|
|
discovered*: seq[RemotePeerInfo]
|
|
|
|
Chat = ref object
|
|
node: WakuNode
|
|
transp: StreamTransport
|
|
subscribed: bool
|
|
started: bool
|
|
nick: string
|
|
prompt: bool
|
|
rooms: Table[string, ChatRoom]
|
|
currentRoom: string
|
|
|
|
PrivateKey* = crypto.PrivateKey
|
|
Topic* = waku_core.PubsubTopic
|
|
|
|
#####################
|
|
## chat2 protobufs ##
|
|
#####################
|
|
|
|
type
|
|
SelectResult*[T] = Result[T, string]
|
|
|
|
Chat2Message* = object
|
|
timestamp*: int64
|
|
nick*: string
|
|
payload*: seq[byte]
|
|
|
|
proc init*(T: type Chat2Message, buffer: seq[byte]): ProtoResult[T] =
|
|
var msg = Chat2Message()
|
|
let pb = initProtoBuffer(buffer)
|
|
|
|
var timestamp: uint64
|
|
discard ?pb.getField(1, timestamp)
|
|
msg.timestamp = int64(timestamp)
|
|
|
|
discard ?pb.getField(2, msg.nick)
|
|
discard ?pb.getField(3, msg.payload)
|
|
|
|
ok(msg)
|
|
|
|
proc encode*(message: Chat2Message): ProtoBuffer =
|
|
var serialised = initProtoBuffer()
|
|
|
|
serialised.write(1, uint64(message.timestamp))
|
|
serialised.write(2, message.nick)
|
|
serialised.write(3, message.payload)
|
|
|
|
return serialised
|
|
|
|
proc toString*(message: Chat2Message): string =
|
|
let time = message.timestamp.fromUnix().local().format("'<'MMM' 'dd,' 'HH:mm'>'")
|
|
return time & " " & message.nick & ": " & string.fromBytes(message.payload)
|
|
|
|
#####################
|
|
|
|
proc showChatPrompt(c: Chat) =
|
|
if not c.prompt:
|
|
try:
|
|
stdout.write(">> ")
|
|
stdout.flushFile()
|
|
c.prompt = true
|
|
except IOError:
|
|
discard
|
|
|
|
proc getChatLine(payload: seq[byte]): string =
|
|
let pb = Chat2Message.init(payload).valueOr:
|
|
return string.fromBytes(payload)
|
|
return $pb
|
|
|
|
proc readNick(transp: StreamTransport): Future[string] {.async.} =
|
|
stdout.write("Choose a nickname >> ")
|
|
stdout.flushFile()
|
|
return await transp.readLine()
|
|
|
|
proc startMetricsServer(
|
|
serverIp: IpAddress, serverPort: Port
|
|
): Result[MetricsHttpServerRef, string] =
|
|
info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $serverPort
|
|
|
|
let server = MetricsHttpServerRef.new($serverIp, serverPort).valueOr:
|
|
return err("metrics HTTP server start failed: " & $error)
|
|
|
|
try:
|
|
waitFor server.start()
|
|
except CatchableError:
|
|
return err("metrics HTTP server start failed: " & getCurrentExceptionMsg())
|
|
|
|
info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $serverPort
|
|
ok(server)
|
|
|
|
proc publish(c: Chat, line: string) =
|
|
let time = getTime().toUnix()
|
|
let chat2pb =
|
|
Chat2Message(timestamp: time, nick: c.nick, payload: line.toBytes()).encode()
|
|
|
|
let room =
|
|
try:
|
|
c.rooms[c.currentRoom]
|
|
except KeyError:
|
|
error "current room not found in rooms table", room = c.currentRoom
|
|
return
|
|
|
|
var message = WakuMessage(
|
|
payload: chat2pb.buffer,
|
|
contentTopic: room.contentTopic,
|
|
version: 0,
|
|
timestamp: getNanosecondTime(time),
|
|
)
|
|
|
|
try:
|
|
(waitFor c.node.publish(some(DefaultPubsubTopic), message)).isOkOr:
|
|
error "failed to publish message", error = error
|
|
except CatchableError:
|
|
error "caught error publishing message: ", error = getCurrentExceptionMsg()
|
|
|
|
# TODO This should read or be subscribe handler subscribe
|
|
proc readAndPrint(c: Chat) {.async.} =
|
|
while true:
|
|
await sleepAsync(100.millis)
|
|
|
|
# TODO Implement
|
|
proc writeAndPrint(c: Chat) {.async.} =
|
|
while true:
|
|
showChatPrompt(c)
|
|
|
|
let line = await c.transp.readLine()
|
|
if line.startsWith("/help") or line.startsWith("/?") or not c.started:
|
|
echo Help
|
|
continue
|
|
elif line.startsWith("/create"):
|
|
let roomName = line[7 ..^ 1].strip()
|
|
if roomName.len == 0:
|
|
echo "Usage: /create <room-name>"
|
|
continue
|
|
|
|
if roomName in c.rooms:
|
|
echo &"Already in room '{roomName}'. Use /switch {roomName} to make it active."
|
|
continue
|
|
|
|
let serviceIdStr = "/waku/chat-room/" & roomName & "/1.0.0"
|
|
let contentTopic = "/chat2disco/1/" & roomName & "/proto"
|
|
|
|
let serviceInfo = ServiceInfo(id: serviceIdStr, data: @[])
|
|
|
|
if not c.node.wakuKademlia.isNil():
|
|
c.node.wakuKademlia.advertiseService(serviceInfo)
|
|
echo &"Advertising service: {serviceIdStr}"
|
|
|
|
let peers = await c.node.wakuKademlia.lookup(serviceIdStr)
|
|
echo &"Discovered {peers.len} peer(s) for room '{roomName}'"
|
|
|
|
if peers.len > 0:
|
|
await c.node.connectToNodes(peers)
|
|
echo "Connected to discovered peers"
|
|
|
|
c.rooms[roomName] = ChatRoom(
|
|
serviceId: serviceIdStr, contentTopic: contentTopic, discovered: peers
|
|
)
|
|
else:
|
|
echo "Warning: Kademlia not available. Room created locally only."
|
|
c.rooms[roomName] =
|
|
ChatRoom(serviceId: serviceIdStr, contentTopic: contentTopic, discovered: @[])
|
|
|
|
c.currentRoom = roomName
|
|
echo &"Created/joined room '{roomName}'. Content topic: {contentTopic}"
|
|
elif line.startsWith("/rooms"):
|
|
if c.rooms.len == 0:
|
|
echo "No rooms joined yet. Use /create <room-name> to create one."
|
|
else:
|
|
echo "Joined rooms:"
|
|
for name, room in c.rooms:
|
|
let marker = if name == c.currentRoom: " *" else: ""
|
|
echo &" {name} ({room.discovered.len} peers){marker}"
|
|
elif line.startsWith("/switch"):
|
|
let roomName = line[7 ..^ 1].strip()
|
|
if roomName.len == 0:
|
|
echo "Usage: /switch <room-name>"
|
|
continue
|
|
|
|
if roomName notin c.rooms:
|
|
echo &"Room '{roomName}' not found. Use /create {roomName} to create it."
|
|
continue
|
|
|
|
c.currentRoom = roomName
|
|
echo &"Switched to room '{roomName}'"
|
|
elif line.startsWith("/nick"):
|
|
c.nick = await readNick(c.transp)
|
|
echo "You are now known as " & c.nick
|
|
elif line.startsWith("/exit"):
|
|
echo "quitting..."
|
|
|
|
try:
|
|
await c.node.stop()
|
|
except:
|
|
echo "exception happened when stopping: " & getCurrentExceptionMsg()
|
|
|
|
quit(QuitSuccess)
|
|
else:
|
|
if c.started:
|
|
if c.rooms.len == 0:
|
|
echo "No room active. Use /create <room-name> first."
|
|
else:
|
|
c.publish(line)
|
|
else:
|
|
try:
|
|
if line.startsWith("/") and "p2p" in line:
|
|
await c.node.connectToNodes(@[line])
|
|
except:
|
|
echo &"unable to dial remote peer {line}"
|
|
echo getCurrentExceptionMsg()
|
|
|
|
proc readWriteLoop(c: Chat) {.async.} =
|
|
asyncSpawn c.writeAndPrint()
|
|
asyncSpawn c.readAndPrint()
|
|
|
|
proc readInput(wfd: AsyncFD) {.thread, raises: [Defect, CatchableError].} =
|
|
let transp = fromPipe(wfd)
|
|
|
|
while true:
|
|
let line = stdin.readLine()
|
|
discard waitFor transp.write(line & "\r\n")
|
|
|
|
{.pop.}
|
|
proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} =
|
|
let
|
|
transp = fromPipe(rfd)
|
|
conf = Chat2DiscoConf.load()
|
|
nodekey =
|
|
if conf.nodekey.isSome():
|
|
conf.nodekey.get()
|
|
else:
|
|
PrivateKey.random(Secp256k1, rng[]).tryGet()
|
|
|
|
# set log level
|
|
if conf.logLevel != LogLevel.NONE:
|
|
setLogLevel(conf.logLevel)
|
|
|
|
let (extIp, extTcpPort, extUdpPort) = setupNat(
|
|
conf.nat,
|
|
clientId,
|
|
Port(uint16(conf.tcpPort) + conf.portsShift),
|
|
Port(uint16(conf.udpPort) + conf.portsShift),
|
|
).valueOr:
|
|
raise newException(ValueError, "setupNat error " & error)
|
|
|
|
var enrBuilder = EnrBuilder.init(nodeKey)
|
|
|
|
let record = enrBuilder.build().valueOr:
|
|
error "failed to create enr record", error
|
|
quit(QuitFailure)
|
|
|
|
let node = block:
|
|
var builder = WakuNodeBuilder.init()
|
|
builder.withNodeKey(nodeKey)
|
|
builder.withRecord(record)
|
|
|
|
builder
|
|
.withNetworkConfigurationDetails(
|
|
conf.listenAddress,
|
|
Port(uint16(conf.tcpPort) + conf.portsShift),
|
|
extIp,
|
|
extTcpPort,
|
|
wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift),
|
|
wsEnabled = conf.websocketSupport,
|
|
wssEnabled = conf.websocketSecureSupport,
|
|
)
|
|
.tryGet()
|
|
builder.build().tryGet()
|
|
|
|
if conf.relay:
|
|
(await node.mountRelay()).isOkOr:
|
|
error "failed to mount relay", error
|
|
quit(QuitFailure)
|
|
|
|
await node.mountLibp2pPing()
|
|
|
|
var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])] = @[]
|
|
if conf.kadBootstrapNodes.len > 0:
|
|
for nodeStr in conf.kadBootstrapNodes:
|
|
let (peerId, ma) = parseFullAddress(nodeStr).valueOr:
|
|
error "Failed to parse kademlia bootstrap node", node = nodeStr, error = error
|
|
continue
|
|
kadBootstrapPeers.add((peerId, @[ma]))
|
|
|
|
node.wakuKademlia = WakuKademlia.new(
|
|
switch = node.switch,
|
|
peerManager = node.peerManager,
|
|
bootstrapNodes = kadBootstrapPeers,
|
|
xprPublishing = false,
|
|
disableBootstrapping = true,
|
|
)
|
|
|
|
let catchRes = catch:
|
|
node.switch.mount(node.wakuKademlia.protocol)
|
|
|
|
if catchRes.isErr():
|
|
error "failed to mount kademlia discovery", error = catchRes.error.msg
|
|
quit(QuitFailure)
|
|
|
|
# node start include kademlia
|
|
await node.start()
|
|
|
|
let nick = await readNick(transp)
|
|
echo "Welcome, " & nick & "!"
|
|
|
|
var chat = Chat(
|
|
node: node,
|
|
transp: transp,
|
|
subscribed: true,
|
|
started: true,
|
|
nick: nick,
|
|
prompt: false,
|
|
)
|
|
|
|
let peerInfo = node.switch.peerInfo
|
|
let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
|
|
echo &"Listening on\n {listenStr}"
|
|
|
|
# Subscribe to relay topic
|
|
if conf.relay:
|
|
proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} =
|
|
for roomName, room in chat.rooms:
|
|
if msg.contentTopic == room.contentTopic:
|
|
let chatLine = getChatLine(msg.payload)
|
|
let prefix =
|
|
if chat.rooms.len > 1:
|
|
"[" & roomName & "] "
|
|
else:
|
|
""
|
|
try:
|
|
echo &"{prefix}{chatLine}"
|
|
except ValueError:
|
|
echo prefix & chatLine
|
|
chat.prompt = false
|
|
showChatPrompt(chat)
|
|
break
|
|
|
|
node.subscribe(
|
|
(kind: PubsubSub, topic: DefaultPubsubTopic), WakuRelayHandler(handler)
|
|
).isOkOr:
|
|
error "failed to subscribe to pubsub topic",
|
|
topic = DefaultPubsubTopic, error = error
|
|
|
|
if conf.metricsLogging:
|
|
startMetricsLog()
|
|
|
|
if conf.metricsServer:
|
|
let metricsServer = startMetricsServer(
|
|
conf.metricsServerAddress, Port(conf.metricsServerPort + conf.portsShift)
|
|
)
|
|
|
|
await chat.readWriteLoop()
|
|
|
|
runForever()
|
|
|
|
proc main(rng: ref HmacDrbgContext) {.async.} =
|
|
let (rfd, wfd) = createAsyncPipe()
|
|
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
|
|
raise newException(ValueError, "Could not initialize pipe!")
|
|
|
|
var thread: Thread[AsyncFD]
|
|
thread.createThread(readInput, wfd)
|
|
try:
|
|
await processInput(rfd, rng)
|
|
except ConfigurationError as e:
|
|
raise e
|
|
|
|
when isMainModule:
|
|
let rng = crypto.newRng()
|
|
try:
|
|
waitFor(main(rng))
|
|
except CatchableError as e:
|
|
raise e
|