diff --git a/Makefile b/Makefile index 9bd654355..4dca8b7c9 100644 --- a/Makefile +++ b/Makefile @@ -104,6 +104,10 @@ wakuexample2: echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim wakuexample2 $(NIM_PARAMS) waku.nims +chat2: + echo -e $(BUILD_MSG) "build/$@" && \ + $(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) waku.nims + # symlink waku.nims: ln -s waku.nimble $@ diff --git a/docs/tutorial/dingpu.md b/docs/tutorial/dingpu.md new file mode 100644 index 000000000..32df2b923 --- /dev/null +++ b/docs/tutorial/dingpu.md @@ -0,0 +1,14 @@ +# Dingpu testnet + +## Basic chat usage + +Start two chat apps: + +``` +./build/chat2 --ports-shift=0 +./build/chat2 --ports-shift=1 +``` + +Type `/connect` then paste address of other node. + +Then type messages to publish. diff --git a/docs/tutorial/nangang.md b/docs/tutorial/nangang.md index 697c9f327..99d63a5cc 100644 --- a/docs/tutorial/nangang.md +++ b/docs/tutorial/nangang.md @@ -34,3 +34,9 @@ Do basic RPC calls: ``` You should see other node receive something. + +## Nangang cluster node + +``` +/ip4/134.209.139.210/tcp/60000/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS +``` diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim new file mode 100644 index 000000000..58be2eed5 --- /dev/null +++ b/examples/v2/chat2.nim @@ -0,0 +1,209 @@ +when not(compileOption("threads")): + {.fatal: "Please, compile this program with the --threads:on option!".} + +import std/[tables, strformat, strutils] +import confutils, chronicles, chronos, stew/shims/net as stewNet, + eth/keys, bearssl +import libp2p/[switch, # manage transports, a single entry point for dialing and listening + multistream, # tag stream with short header to identify it + crypto/crypto, # cryptographic functions + errors, # error handling utilities + protocols/identify, # identify the peer info of a peer + stream/connection, # create and close stream read / write connections + transports/transport, # listen and dial to other peers using p2p protocol + transports/tcptransport, # listen and dial to other peers using client-server protocol + multiaddress, # encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP + peerinfo, # manage the information of a peer, such as peer ID and public / private key + peerid, # Implement how peers interact + protocols/protocol, # define the protocol base type + protocols/secure/secure, # define the protocol of secure connection + protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS + muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection + muxers/mplex/mplex] # define some contants and message types for stream multiplexing +import ../../waku/node/v2/[config, wakunode2, waku_types], + ../../waku/protocol/v2/waku_relay, + ../../waku/node/common + +const Help = """ + Commands: /[?|help|connect|disconnect|exit] + help: Prints this help + connect: dials a remote peer + disconnect: ends current session + exit: closes the chat +""" + +const DefaultTopic = "waku" +const DefaultContentTopic = "dingpu" + +# XXX Connected is a bit annoying, because incoming connections don't trigger state change +# Could poll connection pool or something here, I suppose +# TODO Ensure connected turns true on incoming connections, or get rid of it +type Chat = ref object + node: WakuNode # waku node for publishing, subscribing, etc + transp: StreamTransport # transport streams between read & write file descriptor + subscribed: bool # indicates if a node is subscribed or not to a topic + connected: bool # if the node is connected to another peer + started: bool # if the node has started + +type + PrivateKey* = crypto.PrivateKey + Topic* = waku_types.Topic + +proc initAddress(T: type MultiAddress, str: string): T = + let address = MultiAddress.init(str).tryGet() + if IPFS.match(address) and matchPartial(multiaddress.TCP, address): + result = address + else: + raise newException(ValueError, + "Invalid bootstrap node multi-address") + +# NOTE Dialing on WakuRelay specifically +proc dialPeer(c: Chat, address: string) {.async.} = + let multiAddr = MultiAddress.initAddress(address) + let parts = address.split("/") + let remotePeer = PeerInfo.init(parts[^1], [multiAddr]) + + echo &"dialing peer: {multiAddr}" + # XXX Discarding conn, do we want to keep this here? + discard await c.node.switch.dial(remotePeer, WakuRelayCodec) + c.connected = true + +proc publish(c: Chat, line: string) = + let payload = cast[seq[byte]](line) + let message = WakuMessage(payload: payload, contentTopic: DefaultContentTopic) + c.node.publish(DefaultTopic, message) + +# TODO This should read or be subscribe handler subscribe +proc readAndPrint(c: Chat) {.async.} = + while true: +# while p.connected: +# # TODO: echo &"{p.id} -> " +# +# echo cast[string](await p.conn.readLp(1024)) + #echo "readAndPrint subscribe NYI" + await sleepAsync(100.millis) + +# TODO Implement +proc writeAndPrint(c: Chat) {.async.} = + while true: +# Connect state not updated on incoming WakuRelay connections +# if not c.connected: +# echo "type an address or wait for a connection:" +# echo "type /[help|?] for help" + + let line = await c.transp.readLine() + if line.startsWith("/help") or line.startsWith("/?") or not c.started: + echo Help + continue + +# if line.startsWith("/disconnect"): +# echo "Ending current session" +# if p.connected and p.conn.closed.not: +# await p.conn.close() +# p.connected = false + elif line.startsWith("/connect"): + # TODO Should be able to connect to multiple peers for Waku chat + if c.connected: + echo "already connected to at least one peer" + continue + + echo "enter address of remote peer" + let address = await c.transp.readLine() + if address.len > 0: + await c.dialPeer(address) + +# elif line.startsWith("/exit"): +# if p.connected and p.conn.closed.not: +# await p.conn.close() +# p.connected = false +# +# await p.switch.stop() +# echo "quitting..." +# quit(0) + else: + # XXX connected state problematic + if c.started: + c.publish(line) + # TODO Connect to peer logic? + else: + try: + if line.startsWith("/") and "p2p" in line: + await c.dialPeer(line) + except: + echo &"unable to dial remote peer {line}" + echo getCurrentExceptionMsg() + +proc readWriteLoop(c: Chat) {.async.} = + asyncCheck c.writeAndPrint() # execute the async function but does not block + asyncCheck c.readAndPrint() + +proc readInput(wfd: AsyncFD) {.thread.} = + ## This procedure performs reading from `stdin` and sends data over + ## pipe to main thread. + let transp = fromPipe(wfd) + + while true: + let line = stdin.readLine() + discard waitFor transp.write(line & "\r\n") + +proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = + let transp = fromPipe(rfd) + + let + conf = WakuNodeConf.load() + (extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId, + Port(uint16(conf.tcpPort) + conf.portsShift), + Port(uint16(conf.udpPort) + conf.portsShift)) + node = WakuNode.init(conf.nodeKey, conf.libp2pAddress, + Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort, conf.topics.split(" ")) + + # waitFor vs await + await node.start() + + let peerInfo = node.peerInfo + let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId + echo &"Listening on\n {listenStr}" + + # Subscribe to a topic + # TODO To get end to end sender would require more information in payload + # We could possibly indicate the relayer point with connection somehow probably (?) + let topic = cast[Topic](DefaultTopic) + proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} = + let message = WakuMessage.init(data).value + let payload = cast[string](message.payload) + echo &"{payload}" + info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic + await node.subscribe(topic, handler) + + var chat = Chat(node: node, transp: transp, subscribed: true, connected: false, started: true) + + await chat.readWriteLoop() + runForever() + #await allFuturesThrowing(libp2pFuts) + +proc main() {.async.} = + let rng = crypto.newRng() # Singe random number source for the whole application + let (rfd, wfd) = createAsyncPipe() + if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe: + raise newException(ValueError, "Could not initialize pipe!") + + var thread: Thread[AsyncFD] + thread.createThread(readInput, wfd) + + await processInput(rfd, rng) + +when isMainModule: # isMainModule = true when the module is compiled as the main file + waitFor(main()) + +## Dump of things that can be improved: +## +## - Incoming dialed peer does not change connected state (not relying on it for now) +## - Unclear if staticnode argument works (can enter manually) +## - Don't trigger self / double publish own messages +## - Integrate store protocol (fetch messages in beginning) +## - Integrate filter protocol (default/option to be light node, connect to filter node) +## - Test/default to cluster node connection (diff protocol version) +## - Redirect logs to separate file +## - Expose basic publish/subscribe etc commands with /syntax +## - Show part of peerid to know who sent message +## - Deal with protobuf messages (e.g. other chat protocol, or encrypted) diff --git a/waku.nimble b/waku.nimble index 171bd1cef..8c2e29122 100644 --- a/waku.nimble +++ b/waku.nimble @@ -76,3 +76,9 @@ task scripts2, "Build Waku v2 scripts": task wakuexample2, "Build example Waku usage": let name = "basic2" buildBinary name, "examples/v2/", "-d:chronicles_log_level=DEBUG" + +task chat2, "Build example Waku chat usage": + let name = "chat2" + # NOTE For debugging, set debug level. For chat usage we want minimal log + # output to STDOUT. Can be fixed by redirecting logs to file (e.g.) + buildBinary name, "examples/v2/", "-d:chronicles_log_level=WARN"