mirror of
https://github.com/waku-org/nwaku.git
synced 2025-02-25 21:35:36 +00:00
Basic end to end chat for Dingpu (#195)
* End to end chat example for Dingpu Partially based on directchat example and previous example code. Also adds existing cluster node to Nangang README. * Update waku.nimble * Update Makefile Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com> * Update Makefile Co-authored-by: Dean Eigenmann <7621705+decanus@users.noreply.github.com>
This commit is contained in:
parent
3eb015ee7b
commit
75be455272
4
Makefile
4
Makefile
@ -104,6 +104,10 @@ wakuexample2:
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim wakuexample2 $(NIM_PARAMS) waku.nims
|
||||
|
||||
chat2:
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim chat2 $(NIM_PARAMS) waku.nims
|
||||
|
||||
# symlink
|
||||
waku.nims:
|
||||
ln -s waku.nimble $@
|
||||
|
14
docs/tutorial/dingpu.md
Normal file
14
docs/tutorial/dingpu.md
Normal file
@ -0,0 +1,14 @@
|
||||
# Dingpu testnet
|
||||
|
||||
## Basic chat usage
|
||||
|
||||
Start two chat apps:
|
||||
|
||||
```
|
||||
./build/chat2 --ports-shift=0
|
||||
./build/chat2 --ports-shift=1
|
||||
```
|
||||
|
||||
Type `/connect` then paste address of other node.
|
||||
|
||||
Then type messages to publish.
|
@ -34,3 +34,9 @@ Do basic RPC calls:
|
||||
```
|
||||
|
||||
You should see other node receive something.
|
||||
|
||||
## Nangang cluster node
|
||||
|
||||
```
|
||||
/ip4/134.209.139.210/tcp/60000/p2p/16Uiu2HAmJb2e28qLXxT5kZxVUUoJt72EMzNGXB47Rxx5hw3q4YjS
|
||||
```
|
||||
|
209
examples/v2/chat2.nim
Normal file
209
examples/v2/chat2.nim
Normal file
@ -0,0 +1,209 @@
|
||||
when not(compileOption("threads")):
|
||||
{.fatal: "Please, compile this program with the --threads:on option!".}
|
||||
|
||||
import std/[tables, strformat, strutils]
|
||||
import confutils, chronicles, chronos, stew/shims/net as stewNet,
|
||||
eth/keys, bearssl
|
||||
import libp2p/[switch, # manage transports, a single entry point for dialing and listening
|
||||
multistream, # tag stream with short header to identify it
|
||||
crypto/crypto, # cryptographic functions
|
||||
errors, # error handling utilities
|
||||
protocols/identify, # identify the peer info of a peer
|
||||
stream/connection, # create and close stream read / write connections
|
||||
transports/transport, # listen and dial to other peers using p2p protocol
|
||||
transports/tcptransport, # listen and dial to other peers using client-server protocol
|
||||
multiaddress, # encode different addressing schemes. For example, /ip4/7.7.7.7/tcp/6543 means it is using IPv4 protocol and TCP
|
||||
peerinfo, # manage the information of a peer, such as peer ID and public / private key
|
||||
peerid, # Implement how peers interact
|
||||
protocols/protocol, # define the protocol base type
|
||||
protocols/secure/secure, # define the protocol of secure connection
|
||||
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
|
||||
muxers/muxer, # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
|
||||
muxers/mplex/mplex] # define some contants and message types for stream multiplexing
|
||||
import ../../waku/node/v2/[config, wakunode2, waku_types],
|
||||
../../waku/protocol/v2/waku_relay,
|
||||
../../waku/node/common
|
||||
|
||||
const Help = """
|
||||
Commands: /[?|help|connect|disconnect|exit]
|
||||
help: Prints this help
|
||||
connect: dials a remote peer
|
||||
disconnect: ends current session
|
||||
exit: closes the chat
|
||||
"""
|
||||
|
||||
const DefaultTopic = "waku"
|
||||
const DefaultContentTopic = "dingpu"
|
||||
|
||||
# XXX Connected is a bit annoying, because incoming connections don't trigger state change
|
||||
# Could poll connection pool or something here, I suppose
|
||||
# TODO Ensure connected turns true on incoming connections, or get rid of it
|
||||
type Chat = ref object
|
||||
node: WakuNode # waku node for publishing, subscribing, etc
|
||||
transp: StreamTransport # transport streams between read & write file descriptor
|
||||
subscribed: bool # indicates if a node is subscribed or not to a topic
|
||||
connected: bool # if the node is connected to another peer
|
||||
started: bool # if the node has started
|
||||
|
||||
type
|
||||
PrivateKey* = crypto.PrivateKey
|
||||
Topic* = waku_types.Topic
|
||||
|
||||
proc initAddress(T: type MultiAddress, str: string): T =
|
||||
let address = MultiAddress.init(str).tryGet()
|
||||
if IPFS.match(address) and matchPartial(multiaddress.TCP, address):
|
||||
result = address
|
||||
else:
|
||||
raise newException(ValueError,
|
||||
"Invalid bootstrap node multi-address")
|
||||
|
||||
# NOTE Dialing on WakuRelay specifically
|
||||
proc dialPeer(c: Chat, address: string) {.async.} =
|
||||
let multiAddr = MultiAddress.initAddress(address)
|
||||
let parts = address.split("/")
|
||||
let remotePeer = PeerInfo.init(parts[^1], [multiAddr])
|
||||
|
||||
echo &"dialing peer: {multiAddr}"
|
||||
# XXX Discarding conn, do we want to keep this here?
|
||||
discard await c.node.switch.dial(remotePeer, WakuRelayCodec)
|
||||
c.connected = true
|
||||
|
||||
proc publish(c: Chat, line: string) =
|
||||
let payload = cast[seq[byte]](line)
|
||||
let message = WakuMessage(payload: payload, contentTopic: DefaultContentTopic)
|
||||
c.node.publish(DefaultTopic, message)
|
||||
|
||||
# TODO This should read or be subscribe handler subscribe
|
||||
proc readAndPrint(c: Chat) {.async.} =
|
||||
while true:
|
||||
# while p.connected:
|
||||
# # TODO: echo &"{p.id} -> "
|
||||
#
|
||||
# echo cast[string](await p.conn.readLp(1024))
|
||||
#echo "readAndPrint subscribe NYI"
|
||||
await sleepAsync(100.millis)
|
||||
|
||||
# TODO Implement
|
||||
proc writeAndPrint(c: Chat) {.async.} =
|
||||
while true:
|
||||
# Connect state not updated on incoming WakuRelay connections
|
||||
# if not c.connected:
|
||||
# echo "type an address or wait for a connection:"
|
||||
# echo "type /[help|?] for help"
|
||||
|
||||
let line = await c.transp.readLine()
|
||||
if line.startsWith("/help") or line.startsWith("/?") or not c.started:
|
||||
echo Help
|
||||
continue
|
||||
|
||||
# if line.startsWith("/disconnect"):
|
||||
# echo "Ending current session"
|
||||
# if p.connected and p.conn.closed.not:
|
||||
# await p.conn.close()
|
||||
# p.connected = false
|
||||
elif line.startsWith("/connect"):
|
||||
# TODO Should be able to connect to multiple peers for Waku chat
|
||||
if c.connected:
|
||||
echo "already connected to at least one peer"
|
||||
continue
|
||||
|
||||
echo "enter address of remote peer"
|
||||
let address = await c.transp.readLine()
|
||||
if address.len > 0:
|
||||
await c.dialPeer(address)
|
||||
|
||||
# elif line.startsWith("/exit"):
|
||||
# if p.connected and p.conn.closed.not:
|
||||
# await p.conn.close()
|
||||
# p.connected = false
|
||||
#
|
||||
# await p.switch.stop()
|
||||
# echo "quitting..."
|
||||
# quit(0)
|
||||
else:
|
||||
# XXX connected state problematic
|
||||
if c.started:
|
||||
c.publish(line)
|
||||
# TODO Connect to peer logic?
|
||||
else:
|
||||
try:
|
||||
if line.startsWith("/") and "p2p" in line:
|
||||
await c.dialPeer(line)
|
||||
except:
|
||||
echo &"unable to dial remote peer {line}"
|
||||
echo getCurrentExceptionMsg()
|
||||
|
||||
proc readWriteLoop(c: Chat) {.async.} =
|
||||
asyncCheck c.writeAndPrint() # execute the async function but does not block
|
||||
asyncCheck c.readAndPrint()
|
||||
|
||||
proc readInput(wfd: AsyncFD) {.thread.} =
|
||||
## This procedure performs reading from `stdin` and sends data over
|
||||
## pipe to main thread.
|
||||
let transp = fromPipe(wfd)
|
||||
|
||||
while true:
|
||||
let line = stdin.readLine()
|
||||
discard waitFor transp.write(line & "\r\n")
|
||||
|
||||
proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
|
||||
let transp = fromPipe(rfd)
|
||||
|
||||
let
|
||||
conf = WakuNodeConf.load()
|
||||
(extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId,
|
||||
Port(uint16(conf.tcpPort) + conf.portsShift),
|
||||
Port(uint16(conf.udpPort) + conf.portsShift))
|
||||
node = WakuNode.init(conf.nodeKey, conf.libp2pAddress,
|
||||
Port(uint16(conf.tcpPort) + conf.portsShift), extIp, extTcpPort, conf.topics.split(" "))
|
||||
|
||||
# waitFor vs await
|
||||
await node.start()
|
||||
|
||||
let peerInfo = node.peerInfo
|
||||
let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
|
||||
echo &"Listening on\n {listenStr}"
|
||||
|
||||
# Subscribe to a topic
|
||||
# TODO To get end to end sender would require more information in payload
|
||||
# We could possibly indicate the relayer point with connection somehow probably (?)
|
||||
let topic = cast[Topic](DefaultTopic)
|
||||
proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
|
||||
let message = WakuMessage.init(data).value
|
||||
let payload = cast[string](message.payload)
|
||||
echo &"{payload}"
|
||||
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
|
||||
await node.subscribe(topic, handler)
|
||||
|
||||
var chat = Chat(node: node, transp: transp, subscribed: true, connected: false, started: true)
|
||||
|
||||
await chat.readWriteLoop()
|
||||
runForever()
|
||||
#await allFuturesThrowing(libp2pFuts)
|
||||
|
||||
proc main() {.async.} =
|
||||
let rng = crypto.newRng() # Singe random number source for the whole application
|
||||
let (rfd, wfd) = createAsyncPipe()
|
||||
if rfd == asyncInvalidPipe or wfd == asyncInvalidPipe:
|
||||
raise newException(ValueError, "Could not initialize pipe!")
|
||||
|
||||
var thread: Thread[AsyncFD]
|
||||
thread.createThread(readInput, wfd)
|
||||
|
||||
await processInput(rfd, rng)
|
||||
|
||||
when isMainModule: # isMainModule = true when the module is compiled as the main file
|
||||
waitFor(main())
|
||||
|
||||
## Dump of things that can be improved:
|
||||
##
|
||||
## - Incoming dialed peer does not change connected state (not relying on it for now)
|
||||
## - Unclear if staticnode argument works (can enter manually)
|
||||
## - Don't trigger self / double publish own messages
|
||||
## - Integrate store protocol (fetch messages in beginning)
|
||||
## - Integrate filter protocol (default/option to be light node, connect to filter node)
|
||||
## - Test/default to cluster node connection (diff protocol version)
|
||||
## - Redirect logs to separate file
|
||||
## - Expose basic publish/subscribe etc commands with /syntax
|
||||
## - Show part of peerid to know who sent message
|
||||
## - Deal with protobuf messages (e.g. other chat protocol, or encrypted)
|
@ -76,3 +76,9 @@ task scripts2, "Build Waku v2 scripts":
|
||||
task wakuexample2, "Build example Waku usage":
|
||||
let name = "basic2"
|
||||
buildBinary name, "examples/v2/", "-d:chronicles_log_level=DEBUG"
|
||||
|
||||
task chat2, "Build example Waku chat usage":
|
||||
let name = "chat2"
|
||||
# NOTE For debugging, set debug level. For chat usage we want minimal log
|
||||
# output to STDOUT. Can be fixed by redirecting logs to file (e.g.)
|
||||
buildBinary name, "examples/v2/", "-d:chronicles_log_level=WARN"
|
||||
|
Loading…
x
Reference in New Issue
Block a user