chat2 improvements (#575)

* chat2 improvements

* Further improvements, content topic config option

* Add content-topic config for chat2bridge
This commit is contained in:
Hanno Cornelius 2021-05-26 15:48:09 +02:00 committed by GitHub
parent e6d7e3a2b4
commit 131ce6c43b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 354 additions and 52 deletions

View File

@ -2,7 +2,12 @@
## Background
The `chat2` application is a basic command-line chat app using the [Waku v2 suite of protocols](https://specs.vac.dev/specs/waku/v2/waku-v2). It connects to a [fleet of test nodes](fleets.status.im) to provide end-to-end p2p chat capabilities. The Waku team is currently using this application for internal testing. If you want try our protocols, or join the dogfooding fun, follow the instructions below.
The `chat2` application is a basic command-line chat app using the [Waku v2 suite of protocols](https://specs.vac.dev/specs/waku/v2/waku-v2).
It optionally connects to a [fleet of nodes](fleets.status.im) to provide end-to-end p2p chat capabilities.
Each fleet is a publicly accessible network of Waku v2 peers, providing a bootstrap connection point for new peers, historical message storage, etc.
The Waku team is currently using this application on the _production_ fleet for internal testing.
For more information on the available fleets, see [`Connecting to a Waku v2 fleet`](#connecting-to-a-waku-v2-fleet).
If you want try our protocols, or join the dogfooding fun, follow the instructions below.
## Preparation
@ -28,29 +33,47 @@ You should be prompted to provide a nickname for the chat session.
Choose a nickname >>
```
After entering a nickname, the app will randomly select and connect to a peer from the test fleet.
After entering a nickname, the app will randomly select and connect to a peer from the `prod` fleet.
```
No static peers configured. Choosing one at random from test fleet...
No static peers configured. Choosing one at random from prod fleet...
```
It will then attempt to download historical messages from a random peer in the `prod` fleet.
```
Store enabled, but no store nodes configured. Choosing one at random from prod fleet...
```
Wait for the chat prompt (`>>`) and chat away!
To gracefully exit the `chat2` application, use the `/exit` [in-chat option](#in-chat-options)
```
>> /exit
quitting...
```
## Retrieving historical messages
The `chat2` application can retrieve historical chat messages from a node supporting and running the [Waku v2 store protocol](https://specs.vac.dev/specs/waku/v2/waku-store). Just specify the selected node's `multiaddr` as `storenode` when starting the app:
The `chat2` application can retrieve historical chat messages from a node supporting and running the [Waku v2 store protocol](https://specs.vac.dev/specs/waku/v2/waku-store), and will attempt to do so by default.
It's possible to query a *specific* store node by configuring its `multiaddr` as `storenode` when starting the app:
```
./build/chat2 --storenode:/ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ
```
Alternatively, the `chat2` application will select a random `storenode` for you from the test fleet if `storenode` left unspecified.
Alternatively, the `chat2` application will select a random `storenode` for you from the configured fleet (`prod` by default) if `storenode` is left unspecified.
```
./build/chat2
```
> *NOTE: Currently (Mar 3, 2021) the only node in the test fleet that provides reliable store functionality is `/ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ`. We're working on fixing this.*
To disable historical message retrieval, use the `--store:false` option:
```
./build/chat2 --store:false
```
## Specifying a static peer
@ -62,6 +85,31 @@ In order to connect to a *specific* node as [`relay`](https://specs.vac.dev/spec
This will bypass the random peer selection process and connect to the specified node.
## Connecting to a Waku v2 fleet
It is possible to specify a specific Waku v2 fleet to connect to when starting the app by using the `--fleet` option:
```
./build/chat2 --fleet:test
```
There are currently two fleets to select from, namely _production_ (`wakuv2.prod`) and _test_ (`wakuv2.test`).
The `test` fleet is updated with each incremental change to the `nim-waku` codebase.
As a result it may have more advanced and experimental features, but will be less stable than `prod`.
The `prod` fleet is a deployed network of the latest released Waku v2 nodes.
If no `fleet` is specified, `chat2` will connect to the `prod` fleet by default.
To start `chat2` without connecting to a fleet, use the `--fleet:none` option _or_ [specify a static peer](#specifying-a-static-peer).
## Specifying a content topic
To publish chat messages on a specific [content topic](https://rfc.vac.dev/spec/14/#wakumessage), use the `--content-topic` option:
```
./build/chat2 --content-topic:/waku/2/my-content-topic/proto
```
> **NOTE:** Currently (2021/05/26) the content topic defaults to `/waku/2/huilong/proto` if left unspecified, where `huilong` is the name of our latest testnet.
## In-chat options
| Command | Effect |
@ -83,7 +131,8 @@ message Chat2Message {
}
```
where `timestamp` is the Unix timestamp of the message, `nick` is the relevant `chat2` user's selected nickname and `payload` is the actual chat message being sent. The `payload` is the byte array representation of a UTF8 encoded string.
where `timestamp` is the Unix timestamp of the message, `nick` is the relevant `chat2` user's selected nickname and `payload` is the actual chat message being sent.
The `payload` is the byte array representation of a UTF8 encoded string.
# Bridge messages between `chat2` and matterbridge

View File

@ -4,7 +4,7 @@
when not(compileOption("threads")):
{.fatal: "Please, compile this program with the --threads:on option!".}
import std/[tables, strformat, strutils, times, httpclient, json, sequtils, random]
import std/[tables, strformat, strutils, times, httpclient, json, sequtils, random, options]
import confutils, chronicles, chronos, stew/shims/net as stewNet,
eth/keys, bearssl, stew/[byteutils, endians2],
nimcrypto/pbkdf2
@ -18,15 +18,16 @@ import libp2p/[switch, # manage transports, a single entry poi
protocols/protocol, # define the protocol base type
protocols/secure/secio, # define the protocol of secure input / output, allows encrypted communication that uses public keys to validate signed messages instead of a certificate authority like in TLS
muxers/muxer] # define an interface for stream multiplexing, allowing peers to offer many protocols over a single connection
import ../../waku/v2/node/[config, wakunode2, waku_payload],
import ../../waku/v2/node/[wakunode2, waku_payload],
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/utils/peers,
../../waku/common/utils/nat
../../waku/common/utils/nat,
./config_chat2
const Help = """
Commands: /[?|help|connect|nick]
Commands: /[?|help|connect|nick|exit]
help: Prints this help
connect: dials a remote peer
nick: change nickname for current chat session
@ -36,7 +37,6 @@ const Help = """
const
PayloadV1* {.booldefine.} = false
DefaultTopic* = "/waku/2/default-waku/proto"
DefaultContentTopic* = ContentTopic("dingpu")
# XXX Connected is a bit annoying, because incoming connections don't trigger state change
# Could poll connection pool or something here, I suppose
@ -49,6 +49,8 @@ type Chat = ref object
started: bool # if the node has started
nick: string # nickname for this chat session
prompt: bool # chat prompt is showing
contentTopic: string # default content topic for chat messages
symkey: SymKey # SymKey used for v1 payload encryption (if enabled)
type
PrivateKey* = crypto.PrivateKey
@ -102,8 +104,6 @@ proc generateSymKey(contentTopic: ContentTopic): SymKey =
symKey
let DefaultSymKey = generateSymKey(DefaultContentTopic)
proc connectToNodes(c: Chat, nodes: seq[string]) {.async.} =
echo "Connecting to nodes"
await c.node.connectToNodes(nodes)
@ -115,15 +115,15 @@ proc showChatPrompt(c: Chat) =
stdout.flushFile()
c.prompt = true
proc selectRandomNode(): string =
proc selectRandomNode(fleetStr: string): string =
randomize()
let
# Get latest fleet
# Get latest fleet addresses
fleet = newHttpClient().getContent("https://fleets.status.im")
# Select the JSONObject corresponding to the wakuv2 test fleet and convert to seq of key-val pairs
nodes = toSeq(fleet.parseJson(){"fleets", "wakuv2.test", "waku"}.pairs())
# Select the JSONObject corresponding to the selected wakuv2 fleet and convert to seq of key-val pairs
nodes = toSeq(fleet.parseJson(){"fleets", "wakuv2." & fleetStr, "waku"}.pairs())
# Select a random node from the test fleet, convert to string and return
# Select a random node from the selected fleet, convert to string and return
return nodes[rand(nodes.len - 1)].val.getStr()
proc readNick(transp: StreamTransport): Future[string] {.async.} =
@ -141,19 +141,19 @@ proc publish(c: Chat, line: string) =
when PayloadV1:
# Use Waku v1 payload encoding/encryption
let
payload = Payload(payload: chat2pb.buffer, symKey: some(DefaultSymKey))
payload = Payload(payload: chat2pb.buffer, symKey: some(c.symKey))
version = 1'u32
encodedPayload = payload.encode(version, c.node.rng[])
if encodedPayload.isOk():
let message = WakuMessage(payload: encodedPayload.get(),
contentTopic: DefaultContentTopic, version: version)
contentTopic: c.contentTopic, version: version)
asyncSpawn c.node.publish(DefaultTopic, message)
else:
warn "Payload encoding failed", error = encodedPayload.error
else:
# No payload encoding/encryption from Waku
let message = WakuMessage(payload: chat2pb.buffer,
contentTopic: DefaultContentTopic, version: 0)
contentTopic: c.contentTopic, version: 0)
asyncSpawn c.node.publish(DefaultTopic, message)
# TODO This should read or be subscribe handler subscribe
@ -238,7 +238,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
let transp = fromPipe(rfd)
let
conf = WakuNodeConf.load()
conf = Chat2Conf.load()
(extIp, extTcpPort, extUdpPort) = setupNat(conf.nat, clientId,
Port(uint16(conf.tcpPort) + conf.portsShift),
Port(uint16(conf.udpPort) + conf.portsShift))
@ -255,15 +255,23 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
let nick = await readNick(transp)
echo "Welcome, " & nick & "!"
var chat = Chat(node: node, transp: transp, subscribed: true, connected: false, started: true, nick: nick, prompt: false)
var chat = Chat(node: node,
transp: transp,
subscribed: true,
connected: false,
started: true,
nick: nick,
prompt: false,
contentTopic: conf.contentTopic,
symKey: generateSymKey(conf.contentTopic))
if conf.staticnodes.len > 0:
await connectToNodes(chat, conf.staticnodes)
else:
elif conf.fleet != Fleet.none:
# Connect to at least one random fleet node
echo "No static peers configured. Choosing one at random from test fleet..."
echo "No static peers configured. Choosing one at random from " & $conf.fleet & " fleet..."
let randNode = selectRandomNode()
let randNode = selectRandomNode($conf.fleet)
echo "Connecting to " & randNode
@ -279,18 +287,21 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
if (conf.storenode != "") or (conf.store == true):
node.mountStore(persistMessages = conf.persistMessages)
var storenode: string
var storenode: Option[string]
if conf.storenode != "":
storenode = conf.storenode
else:
echo "Store enabled, but no store nodes configured. Choosing one at random from test fleet..."
storenode = some(conf.storenode)
elif conf.fleet != Fleet.none:
echo "Store enabled, but no store nodes configured. Choosing one at random from " & $conf.fleet & " fleet..."
storenode = selectRandomNode()
storenode = some(selectRandomNode($conf.fleet))
echo "Connecting to storenode: " & storenode
echo "Connecting to storenode: " & storenode.get()
node.wakuStore.setPeer(parsePeerInfo(storenode))
if storenode.isSome():
# We have a viable storenode. Let's query it for historical messages.
node.wakuStore.setPeer(parsePeerInfo(storenode.get()))
proc storeHandler(response: HistoryResponse) {.gcsafe.} =
for msg in response.messages:
@ -301,7 +312,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
echo &"{chatLine}"
info "Hit store handler"
await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: DefaultContentTopic)]), storeHandler)
await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: chat.contentTopic)]), storeHandler)
if conf.filternode != "":
node.mountFilter()
@ -317,7 +328,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
info "Hit filter handler"
await node.subscribe(
FilterRequest(contentFilters: @[ContentFilter(contentTopic: DefaultContentTopic)], pubSubTopic: DefaultTopic, subscribe: true),
FilterRequest(contentFilters: @[ContentFilter(contentTopic: chat.contentTopic)], pubSubTopic: DefaultTopic, subscribe: true),
filterHandler
)
@ -331,7 +342,7 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
when PayloadV1:
# Use Waku v1 payload encoding/encryption
let
keyInfo = KeyInfo(kind: Symmetric, symKey: DefaultSymKey)
keyInfo = KeyInfo(kind: Symmetric, symKey: chat.symKey)
decodedPayload = decodePayload(decoded.get(), keyInfo)
if decodedPayload.isOK():

View File

@ -0,0 +1,229 @@
import
std/strutils,
confutils, confutils/defs, confutils/std/net,
chronicles, chronos,
libp2p/crypto/crypto,
libp2p/crypto/secp,
nimcrypto/utils,
eth/keys
type
Fleet* = enum
none
prod
test
Chat2Conf* = object
## General node config
logLevel* {.
desc: "Sets the log level."
defaultValue: LogLevel.INFO
name: "log-level" }: LogLevel
nodekey* {.
desc: "P2P node private key as 64 char hex string.",
defaultValue: crypto.PrivateKey.random(Secp256k1, keys.newRng()[]).tryGet()
name: "nodekey" }: crypto.PrivateKey
listenAddress* {.
defaultValue: defaultListenAddress(config)
desc: "Listening address for the LibP2P traffic."
name: "listen-address"}: ValidIpAddress
tcpPort* {.
desc: "TCP listening port."
defaultValue: 60000
name: "tcp-port" }: Port
udpPort* {.
desc: "UDP listening port."
defaultValue: 60000
name: "udp-port" }: Port
portsShift* {.
desc: "Add a shift to all port numbers."
defaultValue: 0
name: "ports-shift" }: uint16
nat* {.
desc: "Specify method to use for determining public address. " &
"Must be one of: any, none, upnp, pmp, extip:<IP>."
defaultValue: "any" }: string
## Persistence config
dbPath* {.
desc: "The database path for peristent storage",
defaultValue: ""
name: "db-path" }: string
persistPeers* {.
desc: "Enable peer persistence: true|false",
defaultValue: false
name: "persist-peers" }: bool
persistMessages* {.
desc: "Enable message persistence: true|false",
defaultValue: false
name: "persist-messages" }: bool
## Relay config
relay* {.
desc: "Enable relay protocol: true|false",
defaultValue: true
name: "relay" }: bool
rlnRelay* {.
desc: "Enable spam protection through rln-relay: true|false",
defaultValue: false
name: "rln-relay" }: bool
staticnodes* {.
desc: "Peer multiaddr to directly connect with. Argument may be repeated."
name: "staticnode" }: seq[string]
keepAlive* {.
desc: "Enable keep-alive for idle connections: true|false",
defaultValue: false
name: "keep-alive" }: bool
topics* {.
desc: "Default topics to subscribe to (space separated list)."
defaultValue: "/waku/2/default-waku/proto"
name: "topics" .}: string
## Store config
store* {.
desc: "Enable store protocol: true|false",
defaultValue: true
name: "store" }: bool
storenode* {.
desc: "Peer multiaddr to query for storage.",
defaultValue: ""
name: "storenode" }: string
## Filter config
filter* {.
desc: "Enable filter protocol: true|false",
defaultValue: false
name: "filter" }: bool
filternode* {.
desc: "Peer multiaddr to request content filtering of messages.",
defaultValue: ""
name: "filternode" }: string
## Swap config
swap* {.
desc: "Enable swap protocol: true|false",
defaultValue: true
name: "swap" }: bool
## Lightpush config
lightpush* {.
desc: "Enable lightpush protocol: true|false",
defaultValue: false
name: "lightpush" }: bool
## JSON-RPC config
rpc* {.
desc: "Enable Waku JSON-RPC server: true|false",
defaultValue: true
name: "rpc" }: bool
rpcAddress* {.
desc: "Listening address of the JSON-RPC server.",
defaultValue: ValidIpAddress.init("127.0.0.1")
name: "rpc-address" }: ValidIpAddress
rpcPort* {.
desc: "Listening port of the JSON-RPC server.",
defaultValue: 8545
name: "rpc-port" }: uint16
rpcAdmin* {.
desc: "Enable access to JSON-RPC Admin API: true|false",
defaultValue: false
name: "rpc-admin" }: bool
rpcPrivate* {.
desc: "Enable access to JSON-RPC Private API: true|false",
defaultValue: false
name: "rpc-private" }: bool
## Metrics config
metricsServer* {.
desc: "Enable the metrics server: true|false"
defaultValue: false
name: "metrics-server" }: bool
metricsServerAddress* {.
desc: "Listening address of the metrics server."
defaultValue: ValidIpAddress.init("127.0.0.1")
name: "metrics-server-address" }: ValidIpAddress
metricsServerPort* {.
desc: "Listening HTTP port of the metrics server."
defaultValue: 8008
name: "metrics-server-port" }: uint16
metricsLogging* {.
desc: "Enable metrics logging: true|false"
defaultValue: false
name: "metrics-logging" }: bool
## Chat2 configuration
fleet* {.
desc: "Select the fleet to connect to."
defaultValue: Fleet.prod
name: "fleet" }: Fleet
contentTopic* {.
desc: "Content topic for chat messages."
defaultValue: "/waku/2/huilong/proto"
name: "content-topic" }: string
# NOTE: Keys are different in nim-libp2p
proc parseCmdArg*(T: type crypto.PrivateKey, p: TaintedString): T =
try:
let key = SkPrivateKey.init(utils.fromHex(p)).tryGet()
# XXX: Here at the moment
result = crypto.PrivateKey(scheme: Secp256k1, skkey: key)
except CatchableError as e:
raise newException(ConfigurationError, "Invalid private key")
proc completeCmdArg*(T: type crypto.PrivateKey, val: TaintedString): seq[string] =
return @[]
proc parseCmdArg*(T: type ValidIpAddress, p: TaintedString): T =
try:
result = ValidIpAddress.init(p)
except CatchableError as e:
raise newException(ConfigurationError, "Invalid IP address")
proc completeCmdArg*(T: type ValidIpAddress, val: TaintedString): seq[string] =
return @[]
proc parseCmdArg*(T: type Port, p: TaintedString): T =
try:
result = Port(parseInt(p))
except CatchableError as e:
raise newException(ConfigurationError, "Invalid Port number")
proc completeCmdArg*(T: type Port, val: TaintedString): seq[string] =
return @[]
func defaultListenAddress*(conf: Chat2Conf): ValidIpAddress =
# TODO: How should we select between IPv4 and IPv6
# Maybe there should be a config option for this.
(static ValidIpAddress.init("0.0.0.0"))

View File

@ -28,7 +28,6 @@ logScope:
const
DefaultTopic* = chat2.DefaultTopic
DefaultContentTopic* = chat2.DefaultContentTopic
DeduplQSize = 20 # Maximum number of seen messages to keep in deduplication queue
#########
@ -42,6 +41,7 @@ type
running: bool
pollPeriod: chronos.Duration
seen: seq[Hash] #FIFO queue
contentTopic: string
MbMessageHandler* = proc (jsonNode: JsonNode) {.gcsafe.}
@ -61,7 +61,7 @@ proc containsOrAdd(sequence: var seq[Hash], hash: Hash): bool =
return false
proc toWakuMessage(jsonNode: JsonNode): WakuMessage =
proc toWakuMessage(cmb: Chat2MatterBridge, jsonNode: JsonNode): WakuMessage =
# Translates a Matterbridge API JSON response to a Waku v2 message
let msgFields = jsonNode.getFields()
@ -72,11 +72,11 @@ proc toWakuMessage(jsonNode: JsonNode): WakuMessage =
payload: msgFields["text"].getStr().toBytes()).encode()
WakuMessage(payload: chat2pb.buffer,
contentTopic: DefaultContentTopic,
contentTopic: cmb.contentTopic,
version: 0)
proc toChat2(cmb: Chat2MatterBridge, jsonNode: JsonNode) {.async.} =
let msg = jsonNode.toWakuMessage()
let msg = cmb.toWakuMessage(jsonNode)
if cmb.seen.containsOrAdd(msg.payload.hash()):
# This is a duplicate message. Return.
@ -131,7 +131,8 @@ proc new*(T: type Chat2MatterBridge,
# NodeV2 initialisation
nodev2Key: crypto.PrivateKey,
nodev2BindIp: ValidIpAddress, nodev2BindPort: Port,
nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port]()): T =
nodev2ExtIp = none[ValidIpAddress](), nodev2ExtPort = none[Port](),
contentTopic: string): T =
# Setup Matterbridge
let
@ -152,7 +153,11 @@ proc new*(T: type Chat2MatterBridge,
nodev2BindIp, nodev2BindPort,
nodev2ExtIp, nodev2ExtPort)
return Chat2MatterBridge(mbClient: mbClient, nodev2: nodev2, running: false, pollPeriod: chronos.seconds(1))
return Chat2MatterBridge(mbClient: mbClient,
nodev2: nodev2,
running: false,
pollPeriod: chronos.seconds(1),
contentTopic: contentTopic)
proc start*(cmb: Chat2MatterBridge) {.async.} =
info "Starting Chat2MatterBridge"
@ -237,7 +242,8 @@ when isMainModule:
mbGateway = conf.mbGateway,
nodev2Key = conf.nodekey,
nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
nodev2ExtIp = nodev2ExtIp, nodev2ExtPort = nodev2ExtPort)
nodev2ExtIp = nodev2ExtIp, nodev2ExtPort = nodev2ExtPort,
contentTopic = conf.contentTopic)
waitFor bridge.start()

View File

@ -122,6 +122,13 @@ type
defaultValue: "gateway1"
name: "mb-gateway" }: string
## Chat2 options
contentTopic* {.
desc: "Content topic to bridge chat messages to."
defaultValue: "/waku/2/huilong/proto"
name: "content-topic" }: string
proc parseCmdArg*(T: type keys.KeyPair, p: TaintedString): T =
try:
let privkey = keys.PrivateKey.fromHex(string(p)).tryGet()