diff --git a/apps/chat2disco/chat2disco.nim b/apps/chat2disco/chat2disco.nim new file mode 100644 index 000000000..26e572590 --- /dev/null +++ b/apps/chat2disco/chat2disco.nim @@ -0,0 +1,509 @@ +## chat2disco is an example of usage of Waku v2 with Kademlia service discovery. +## Users create named chat rooms; the app derives a service ID from the room name, +## advertises via Kademlia, and discovers/connects to other peers with the same service. + +when not (compileOption("threads")): + {.fatal: "Please, compile this program with the --threads:on option!".} + +{.push raises: [].} + +import std/[strformat, strutils, times, options, random, sequtils, tables] +import + confutils, + chronicles, + chronos, + eth/keys, + bearssl, + stew/byteutils, + results, + metrics, + metrics/chronos_httpserver +import + libp2p/[ + switch, + crypto/crypto, + stream/connection, + multiaddress, + peerinfo, + peerid, + protobuf/minprotobuf, + nameresolving/dnsresolver, + extended_peer_record, + ] +import + waku/[ + waku_core, + waku_enr, + discovery/waku_dnsdisc, + discovery/waku_kademlia, + waku_node, + node/waku_metrics, + node/peer_manager, + factory/builder, + common/utils/nat, + waku_relay, + waku_store/common, + ], + ./config_chat2disco + +import libp2p/protocols/pubsub/rpc/messages, libp2p/protocols/pubsub/pubsub +import libp2p/protocols/service_discovery/types + +logScope: + topics = "chat2disco" + +const Help = """ + Commands: /[?|help|create|rooms|switch|nick|exit] + help: Prints this help + create : Create/join a chat room via service discovery + rooms: List joined rooms + switch : Switch active room for sending messages + nick: change nickname + exit: exits chat session +""" + +type + ChatRoom = object + serviceId*: string + contentTopic*: string + discovered*: seq[RemotePeerInfo] + + Chat = ref object + node: WakuNode + transp: StreamTransport + subscribed: bool + started: bool + nick: string + prompt: bool + rooms: Table[string, ChatRoom] + currentRoom: string + + PrivateKey* = crypto.PrivateKey + Topic* = waku_core.PubsubTopic + +##################### +## chat2 protobufs ## +##################### + +type + SelectResult*[T] = Result[T, string] + + Chat2Message* = object + timestamp*: int64 + nick*: string + payload*: seq[byte] + +proc init*(T: type Chat2Message, buffer: seq[byte]): ProtoResult[T] = + var msg = Chat2Message() + let pb = initProtoBuffer(buffer) + + var timestamp: uint64 + discard ?pb.getField(1, timestamp) + msg.timestamp = int64(timestamp) + + discard ?pb.getField(2, msg.nick) + discard ?pb.getField(3, msg.payload) + + ok(msg) + +proc encode*(message: Chat2Message): ProtoBuffer = + var serialised = initProtoBuffer() + + serialised.write(1, uint64(message.timestamp)) + serialised.write(2, message.nick) + serialised.write(3, message.payload) + + return serialised + +proc toString*(message: Chat2Message): string = + let time = message.timestamp.fromUnix().local().format("'<'MMM' 'dd,' 'HH:mm'>'") + return time & " " & message.nick & ": " & string.fromBytes(message.payload) + +##################### + +proc showChatPrompt(c: Chat) = + if not c.prompt: + try: + stdout.write(">> ") + stdout.flushFile() + c.prompt = true + except IOError: + discard + +proc getChatLine(payload: seq[byte]): string = + let pb = Chat2Message.init(payload).valueOr: + return string.fromBytes(payload) + return $pb + +proc readNick(transp: StreamTransport): Future[string] {.async.} = + stdout.write("Choose a nickname >> ") + stdout.flushFile() + return await transp.readLine() + +proc startMetricsServer( + serverIp: IpAddress, serverPort: Port +): Result[MetricsHttpServerRef, string] = + info "Starting metrics HTTP server", serverIp = $serverIp, serverPort = $serverPort + + let server = MetricsHttpServerRef.new($serverIp, serverPort).valueOr: + return err("metrics HTTP server start failed: " & $error) + + try: + waitFor server.start() + except CatchableError: + return err("metrics HTTP server start failed: " & getCurrentExceptionMsg()) + + info "Metrics HTTP server started", serverIp = $serverIp, serverPort = $serverPort + ok(server) + +proc publish(c: Chat, line: string) = + let time = getTime().toUnix() + let chat2pb = + Chat2Message(timestamp: time, nick: c.nick, payload: line.toBytes()).encode() + + let room = + try: + c.rooms[c.currentRoom] + except KeyError: + error "current room not found in rooms table", room = c.currentRoom + return + + var message = WakuMessage( + payload: chat2pb.buffer, + contentTopic: room.contentTopic, + version: 0, + timestamp: getNanosecondTime(time), + ) + + try: + (waitFor c.node.publish(some(DefaultPubsubTopic), message)).isOkOr: + error "failed to publish message", error = error + except CatchableError: + error "caught error publishing message: ", error = getCurrentExceptionMsg() + +# TODO This should read or be subscribe handler subscribe +proc readAndPrint(c: Chat) {.async.} = + while true: + await sleepAsync(100.millis) + +# TODO Implement +proc writeAndPrint(c: Chat) {.async.} = + while true: + showChatPrompt(c) + + let line = await c.transp.readLine() + if line.startsWith("/help") or line.startsWith("/?") or not c.started: + echo Help + continue + elif line.startsWith("/create"): + let roomName = line[7 ..^ 1].strip() + if roomName.len == 0: + echo "Usage: /create " + continue + + if roomName in c.rooms: + echo &"Already in room '{roomName}'. Use /switch {roomName} to make it active." + continue + + let serviceIdStr = "/waku/chat-room/" & roomName & "/1.0.0" + let contentTopic = "/chat2disco/1/" & roomName & "/proto" + + let serviceInfo = ServiceInfo(id: serviceIdStr, data: @[]) + + if not c.node.wakuKademlia.isNil(): + c.node.wakuKademlia.advertiseService(serviceInfo) + echo &"Advertising service: {serviceIdStr}" + + let peers = await c.node.wakuKademlia.lookup(serviceIdStr) + echo &"Discovered {peers.len} peer(s) for room '{roomName}'" + + if peers.len > 0: + await c.node.connectToNodes(peers) + echo "Connected to discovered peers" + + c.rooms[roomName] = ChatRoom( + serviceId: serviceIdStr, contentTopic: contentTopic, discovered: peers + ) + else: + echo "Warning: Kademlia not available. Room created locally only." + c.rooms[roomName] = + ChatRoom(serviceId: serviceIdStr, contentTopic: contentTopic, discovered: @[]) + + c.currentRoom = roomName + echo &"Created/joined room '{roomName}'. Content topic: {contentTopic}" + elif line.startsWith("/rooms"): + if c.rooms.len == 0: + echo "No rooms joined yet. Use /create to create one." + else: + echo "Joined rooms:" + for name, room in c.rooms: + let marker = if name == c.currentRoom: " *" else: "" + echo &" {name} ({room.discovered.len} peers){marker}" + elif line.startsWith("/switch"): + let roomName = line[7 ..^ 1].strip() + if roomName.len == 0: + echo "Usage: /switch " + continue + + if roomName notin c.rooms: + echo &"Room '{roomName}' not found. Use /create {roomName} to create it." + continue + + c.currentRoom = roomName + echo &"Switched to room '{roomName}'" + elif line.startsWith("/nick"): + c.nick = await readNick(c.transp) + echo "You are now known as " & c.nick + elif line.startsWith("/exit"): + echo "quitting..." + + try: + await c.node.stop() + except: + echo "exception happened when stopping: " & getCurrentExceptionMsg() + + quit(QuitSuccess) + else: + if c.started: + if c.rooms.len == 0: + echo "No room active. Use /create first." + else: + c.publish(line) + else: + try: + if line.startsWith("/") and "p2p" in line: + await c.node.connectToNodes(@[line]) + except: + echo &"unable to dial remote peer {line}" + echo getCurrentExceptionMsg() + +proc readWriteLoop(c: Chat) {.async.} = + asyncSpawn c.writeAndPrint() + asyncSpawn c.readAndPrint() + +proc readInput(wfd: AsyncFD) {.thread, raises: [Defect, CatchableError].} = + let transp = fromPipe(wfd) + + while true: + let line = stdin.readLine() + discard waitFor transp.write(line & "\r\n") + +{.pop.} +proc processInput(rfd: AsyncFD, rng: ref HmacDrbgContext) {.async.} = + let + transp = fromPipe(rfd) + conf = Chat2DiscoConf.load() + nodekey = + if conf.nodekey.isSome(): + conf.nodekey.get() + else: + PrivateKey.random(Secp256k1, rng[]).tryGet() + + # set log level + if conf.logLevel != LogLevel.NONE: + setLogLevel(conf.logLevel) + + let (extIp, extTcpPort, extUdpPort) = setupNat( + conf.nat, + clientId, + Port(uint16(conf.tcpPort) + conf.portsShift), + Port(uint16(conf.udpPort) + conf.portsShift), + ).valueOr: + raise newException(ValueError, "setupNat error " & error) + + var enrBuilder = EnrBuilder.init(nodeKey) + + let record = enrBuilder.build().valueOr: + error "failed to create enr record", error = error + quit(QuitFailure) + + let node = block: + var builder = WakuNodeBuilder.init() + builder.withNodeKey(nodeKey) + builder.withRecord(record) + + builder + .withNetworkConfigurationDetails( + conf.listenAddress, + Port(uint16(conf.tcpPort) + conf.portsShift), + extIp, + extTcpPort, + wsBindPort = Port(uint16(conf.websocketPort) + conf.portsShift), + wsEnabled = conf.websocketSupport, + wssEnabled = conf.websocketSecureSupport, + ) + .tryGet() + builder.build().tryGet() + + if conf.relay: + (await node.mountRelay()).isOkOr: + echo "failed to mount relay: " & error + return + + await node.mountLibp2pPing() + + # Setup kademlia discovery if bootstrap nodes are provided + var providedServices: seq[ServiceInfo] = @[] + + if conf.kadBootstrapNodes.len > 0: + var kadBootstrapPeers: seq[(PeerId, seq[MultiAddress])] + for nodeStr in conf.kadBootstrapNodes: + let (peerId, ma) = parseFullAddress(nodeStr).valueOr: + error "Failed to parse kademlia bootstrap node", node = nodeStr, error = error + continue + kadBootstrapPeers.add((peerId, @[ma])) + + if kadBootstrapPeers.len > 0: + node.wakuKademlia = WakuKademlia.new( + node.switch, node.peerManager, kadBootstrapPeers, providedServices + ) + else: + # Create as seed node (no bootstrap) so we can still advertise services + node.wakuKademlia = + WakuKademlia.new(node.switch, node.peerManager, @[], providedServices) + + await node.start() + + if not node.wakuKademlia.isNil(): + node.wakuKademlia.start() + + let nick = await readNick(transp) + echo "Welcome, " & nick & "!" + + var chat = Chat( + node: node, + transp: transp, + subscribed: true, + started: true, + nick: nick, + prompt: false, + ) + + if conf.staticnodes.len > 0: + echo "Connecting to static peers..." + await node.connectToNodes(conf.staticnodes) + + var dnsDiscoveryUrl = none(string) + + if conf.fleet != Fleet.none: + echo "Connecting to " & $conf.fleet & " fleet using DNS discovery..." + + if conf.fleet == Fleet.test: + dnsDiscoveryUrl = some( + "enrtree://AOGYWMBYOUIMOENHXCHILPKY3ZRFEULMFI4DOM442QSZ73TT2A7VI@test.waku.nodes.status.im" + ) + else: + dnsDiscoveryUrl = some( + "enrtree://AIRVQ5DDA4FFWLRBCHJWUWOO6X6S4ZTZ5B667LQ6AJU6PEYDLRD5O@sandbox.waku.nodes.status.im" + ) + elif conf.dnsDiscoveryUrl != "": + info "Discovering nodes using Waku DNS discovery", url = conf.dnsDiscoveryUrl + dnsDiscoveryUrl = some(conf.dnsDiscoveryUrl) + + var discoveredNodes: seq[RemotePeerInfo] + + if dnsDiscoveryUrl.isSome: + var nameServers: seq[TransportAddress] + for ip in conf.dnsAddrsNameServers: + nameServers.add(initTAddress(ip, Port(53))) + + let dnsResolver = DnsResolver.new(nameServers) + + proc resolver(domain: string): Future[string] {.async, gcsafe.} = + trace "resolving", domain = domain + let resolved = await dnsResolver.resolveTxt(domain) + return resolved[0] + + let wakuDnsDiscovery = WakuDnsDiscovery.init(dnsDiscoveryUrl.get(), resolver) + if wakuDnsDiscovery.isOk: + let discoveredPeers = await wakuDnsDiscovery.get().findPeers() + if discoveredPeers.isOk: + info "Connecting to discovered peers" + discoveredNodes = discoveredPeers.get() + echo "Discovered and connecting to " & $discoveredNodes + waitFor chat.node.connectToNodes(discoveredNodes) + else: + warn "Failed to find peers via DNS discovery", error = discoveredPeers.error + else: + warn "Failed to init Waku DNS discovery", error = wakuDnsDiscovery.error + + let peerInfo = node.switch.peerInfo + let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId + echo &"Listening on\n {listenStr}" + + if (conf.storenode != "") or (conf.store == true): + await node.mountStore() + + var storenode: Option[RemotePeerInfo] + + if conf.storenode != "": + let peerInfo = parsePeerInfo(conf.storenode) + if peerInfo.isOk(): + storenode = some(peerInfo.value) + else: + error "Incorrect conf.storenode", error = peerInfo.error + elif discoveredNodes.len > 0: + echo "Store enabled, but no store nodes configured. Choosing one at random from discovered peers" + storenode = some(discoveredNodes[rand(0 .. len(discoveredNodes) - 1)]) + + if storenode.isSome(): + echo "Connecting to storenode: " & $(storenode.get()) + + node.mountStoreClient() + node.peerManager.addServicePeer(storenode.get(), WakuStoreCodec) + + # Subscribe to relay topic + if conf.relay: + proc handler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + for roomName, room in chat.rooms: + if msg.contentTopic == room.contentTopic: + let chatLine = getChatLine(msg.payload) + let prefix = + if chat.rooms.len > 1: + "[" & roomName & "] " + else: + "" + try: + echo &"{prefix}{chatLine}" + except ValueError: + echo prefix & chatLine + chat.prompt = false + showChatPrompt(chat) + break + + node.subscribe( + (kind: PubsubSub, topic: DefaultPubsubTopic), WakuRelayHandler(handler) + ).isOkOr: + error "failed to subscribe to pubsub topic", + topic = DefaultPubsubTopic, error = error + + if conf.metricsLogging: + startMetricsLog() + + if conf.metricsServer: + let metricsServer = startMetricsServer( + conf.metricsServerAddress, Port(conf.metricsServerPort + conf.portsShift) + ) + + await chat.readWriteLoop() + + runForever() + +proc main(rng: ref HmacDrbgContext) {.async.} = + let (rfd, wfd) = createAsyncPipe() + if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe: + raise newException(ValueError, "Could not initialize pipe!") + + var thread: Thread[AsyncFD] + thread.createThread(readInput, wfd) + try: + await processInput(rfd, rng) + except ConfigurationError as e: + raise e + +when isMainModule: + let rng = crypto.newRng() + try: + waitFor(main(rng)) + except CatchableError as e: + raise e diff --git a/apps/chat2disco/config_chat2disco.nim b/apps/chat2disco/config_chat2disco.nim new file mode 100644 index 000000000..c703641b7 --- /dev/null +++ b/apps/chat2disco/config_chat2disco.nim @@ -0,0 +1,202 @@ +import + chronicles, + chronos, + confutils, + confutils/defs, + confutils/std/net, + eth/keys, + libp2p/crypto/crypto, + libp2p/crypto/secp, + nimcrypto/utils, + std/strutils +import waku/waku_core + +const + defaultMetricsAddress* = parseIpAddress("127.0.0.1") + defaultDnsResolver1* = parseIpAddress("1.1.1.1") + defaultDnsResolver2* = parseIpAddress("1.0.0.1") + +type + Fleet* = enum + none + prod + test + + Chat2DiscoConf* = object ## General node config + logLevel* {. + desc: "Sets the log level.", defaultValue: LogLevel.INFO, name: "log-level" + .}: LogLevel + + nodekey* {.desc: "P2P node private key as 64 char hex string.", name: "nodekey".}: + Option[crypto.PrivateKey] + + listenAddress* {. + defaultValue: defaultListenAddress(config), + desc: "Listening address for the LibP2P traffic.", + name: "listen-address" + .}: IpAddress + + tcpPort* {.desc: "TCP listening port.", defaultValue: 60000, name: "tcp-port".}: + Port + + udpPort* {.desc: "UDP listening port.", defaultValue: 60000, name: "udp-port".}: + Port + + portsShift* {. + desc: "Add a shift to all port numbers.", defaultValue: 0, name: "ports-shift" + .}: uint16 + + nat* {. + desc: + "Specify method to use for determining public address. " & + "Must be one of: any, none, upnp, pmp, extip:.", + defaultValue: "any" + .}: string + + ## Relay config + relay* {. + desc: "Enable relay protocol: true|false", defaultValue: true, name: "relay" + .}: bool + + staticnodes* {. + desc: "Peer multiaddr to directly connect with. Argument may be repeated.", + name: "staticnode" + .}: seq[string] + + keepAlive* {. + desc: "Enable keep-alive for idle connections: true|false", + defaultValue: false, + name: "keep-alive" + .}: bool + + clusterId* {. + desc: + "Cluster id that the node is running in. Node in a different cluster id is disconnected.", + defaultValue: 0, + name: "cluster-id" + .}: uint16 + + shards* {. + desc: + "Shards index to subscribe to [0..NUM_SHARDS_IN_NETWORK-1]. Argument may be repeated.", + defaultValue: @[uint16(0)], + name: "shard" + .}: seq[uint16] + + ## Store config + store* {. + desc: "Enable store protocol: true|false", defaultValue: true, name: "store" + .}: bool + + storenode* {. + desc: "Peer multiaddr to query for storage.", defaultValue: "", name: "storenode" + .}: string + + ## Metrics config + metricsServer* {. + desc: "Enable the metrics server: true|false", + defaultValue: false, + name: "metrics-server" + .}: bool + + metricsServerAddress* {. + desc: "Listening address of the metrics server.", + defaultValue: defaultMetricsAddress, + name: "metrics-server-address" + .}: IpAddress + + metricsServerPort* {. + desc: "Listening HTTP port of the metrics server.", + defaultValue: 8008, + name: "metrics-server-port" + .}: uint16 + + metricsLogging* {. + desc: "Enable metrics logging: true|false", + defaultValue: true, + name: "metrics-logging" + .}: bool + + ## DNS discovery config + dnsDiscoveryUrl* {. + desc: "URL for DNS node list in format 'enrtree://@'", + defaultValue: "", + name: "dns-discovery-url" + .}: string + + dnsAddrsNameServers* {. + desc: + "DNS name server IPs to query for DNS multiaddrs resolution. Argument may be repeated.", + defaultValue: @[defaultDnsResolver1, defaultDnsResolver2], + name: "dns-addrs-name-server" + .}: seq[IpAddress] + + ## Chat2 configuration + fleet* {. + desc: + "Select the fleet to connect to. This sets the DNS discovery URL to the selected fleet.", + defaultValue: Fleet.prod, + name: "fleet" + .}: Fleet + + contentTopic* {. + desc: "Content topic for chat messages.", + defaultValue: "/chat2disco/1/default/proto", + name: "content-topic" + .}: string + + ## Websocket Configuration + websocketSupport* {. + desc: "Enable websocket: true|false", + defaultValue: false, + name: "websocket-support" + .}: bool + + websocketPort* {. + desc: "WebSocket listening port.", defaultValue: 8000, name: "websocket-port" + .}: Port + + websocketSecureSupport* {. + desc: "WebSocket Secure Support.", + defaultValue: false, + name: "websocket-secure-support" + .}: bool + + ## Kademlia Discovery config + kadBootstrapNodes* {. + desc: + "Peer multiaddr for kademlia discovery bootstrap node (must include /p2p/). Argument may be repeated.", + name: "kad-bootstrap-node" + .}: seq[string] + +# NOTE: Keys are different in nim-libp2p +proc parseCmdArg*(T: type crypto.PrivateKey, p: string): T = + try: + let key = SkPrivateKey.init(utils.fromHex(p)).tryGet() + result = crypto.PrivateKey(scheme: Secp256k1, skkey: key) + except CatchableError as e: + raise newException(ValueError, "Invalid private key") + +proc completeCmdArg*(T: type crypto.PrivateKey, val: string): seq[string] = + return @[] + +proc parseCmdArg*(T: type IpAddress, p: string): T = + try: + result = parseIpAddress(p) + except CatchableError as e: + raise newException(ValueError, "Invalid IP address") + +proc completeCmdArg*(T: type IpAddress, val: string): seq[string] = + return @[] + +proc parseCmdArg*(T: type Port, p: string): T = + try: + result = Port(parseInt(p)) + except CatchableError as e: + raise newException(ValueError, "Invalid Port number") + +proc completeCmdArg*(T: type Port, val: string): seq[string] = + return @[] + +func defaultListenAddress*(conf: Chat2DiscoConf): IpAddress = + (static parseIpAddress("0.0.0.0")) diff --git a/apps/chat2disco/nim.cfg b/apps/chat2disco/nim.cfg new file mode 100644 index 000000000..2231f2ebe --- /dev/null +++ b/apps/chat2disco/nim.cfg @@ -0,0 +1,4 @@ +-d:chronicles_line_numbers +-d:chronicles_runtime_filtering:on +-d:discv5_protocol_id:d5waku +path = "../.." diff --git a/waku.nimble b/waku.nimble index 7642c50ee..00be69426 100644 --- a/waku.nimble +++ b/waku.nimble @@ -432,6 +432,12 @@ task chat2mix, "Build example Waku chat mix usage": "-d:chronicles_sinks=textlines[file] -d:chronicles_log_level=TRACE " # -d:ssl - cause unlisted exception error in libp2p/utility... +task chat2disco, "Build example Waku chat with service discovery": + let name = "chat2disco" + buildBinary name, + "apps/chat2disco/", + "-d:chronicles_sinks=textlines[file] -d:chronicles_log_level=DEBUG " + task chat2bridge, "Build chat2bridge": let name = "chat2bridge" buildBinary name, "apps/chat2bridge/" diff --git a/waku/discovery/waku_kademlia.nim b/waku/discovery/waku_kademlia.nim index ae56ba0fa..d07632e7b 100644 --- a/waku/discovery/waku_kademlia.nim +++ b/waku/discovery/waku_kademlia.nim @@ -26,6 +26,7 @@ type WakuKademlia* = ref object loopInterval: Duration #periodicWalkFut: Future[void] periodicLookupFut: Future[void] + discoveredServices*: seq[string] proc toRemotePeerInfo(record: ExtendedPeerRecord): Option[RemotePeerInfo] = debug "processing kademlia record", @@ -123,6 +124,15 @@ proc lookup*( return peerInfos +proc registerLookupService*(self: WakuKademlia, serviceId: string) = + if serviceId notin self.discoveredServices: + self.protocol.startDiscovering(serviceId) + self.discoveredServices.add(serviceId) + +proc advertiseService*(self: WakuKademlia, service: ServiceInfo) = + self.protocol.addProvidedService(service) + self.registerLookupService(service.id) + #[ proc periodicRandomWalk( self: WakuKademlia, interval: Duration ) {.async: (raises: [CancelledError]).} = @@ -141,11 +151,13 @@ proc periodicLookup( while true: await sleepAsync(interval) - # For testing lets use only one hard-coded service - # Same as the advertised one - let peers = await self.lookup("delivery") + let services = self.discoveredServices + if services.len == 0: + continue - debug "lookup complete", peer_found = peers.len + for serviceId in services: + let peers = await self.lookup(serviceId) + debug "periodic lookup complete", service = serviceId, peerCount = peers.len proc new*( T: type WakuKademlia, @@ -168,8 +180,15 @@ proc new*( xprPublishing = xprPublishing, ) + var initialServices: seq[string] + for svc in providedServices: + initialServices.add(svc.id) + return WakuKademlia( - protocol: kademlia, peerManager: peerManager, loopInterval: loopInterval + protocol: kademlia, + peerManager: peerManager, + loopInterval: loopInterval, + discoveredServices: initialServices, ) proc start*(self: WakuKademlia) =