deploy: b615c6aa8bb90062ac157a49b77cd838e8d8a925

This commit is contained in:
jm-clius 2021-06-04 10:23:43 +00:00
parent 431ef4b63d
commit 6c2842e50a
6 changed files with 110 additions and 62 deletions

View File

@ -1,10 +1,11 @@
# Changelog # Changelog
## v0.4 ## Next release
This release contains the following: This release contains the following:
### Features ### Features
### Changes ### Changes
- The [toy-chat application](https://github.com/status-im/nim-waku/blob/master/docs/tutorial/chat2.md) can now perform `lightpush` and request content-filtered messages from remote peers.
#### General refactoring #### General refactoring
#### Docs #### Docs
#### Schema #### Schema
@ -12,6 +13,7 @@ This release contains the following:
- [JSON-RPC Store API](https://rfc.vac.dev/spec/16): Added an optional time-based query to filter historical messages. - [JSON-RPC Store API](https://rfc.vac.dev/spec/16): Added an optional time-based query to filter historical messages.
- [Nim API](https://github.com/status-im/nim-waku/blob/master/docs/api/v2/node.md): Added `resume` method. - [Nim API](https://github.com/status-im/nim-waku/blob/master/docs/api/v2/node.md): Added `resume` method.
### Fixes ### Fixes
- Connections between nodes no longer become unstable due to keep-alive errors if mesh grows large
## 2021-05-11 v0.3 ## 2021-05-11 v0.3

View File

@ -22,6 +22,7 @@ import ../../waku/v2/node/[wakunode2, waku_payload],
../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter, ../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
../../waku/v2/utils/peers, ../../waku/v2/utils/peers,
../../waku/common/utils/nat, ../../waku/common/utils/nat,
./config_chat2 ./config_chat2
@ -115,6 +116,38 @@ proc showChatPrompt(c: Chat) =
stdout.flushFile() stdout.flushFile()
c.prompt = true c.prompt = true
proc printReceivedMessage(c: Chat, msg: WakuMessage) =
when PayloadV1:
# Use Waku v1 payload encoding/encryption
let
keyInfo = KeyInfo(kind: Symmetric, symKey: c.symKey)
decodedPayload = decodePayload(decoded.get(), keyInfo)
if decodedPayload.isOK():
let
pb = Chat2Message.init(decodedPayload.get().payload)
chatLine = if pb.isOk: pb[].toString()
else: string.fromBytes(decodedPayload.get().payload)
echo &"{chatLine}"
c.prompt = false
showChatPrompt(c)
trace "Printing message", topic=DefaultTopic, chatLine,
contentTopic = msg.contentTopic
else:
debug "Invalid encoded WakuMessage payload",
error = decodedPayload.error
else:
# No payload encoding/encryption from Waku
let
pb = Chat2Message.init(msg.payload)
chatLine = if pb.isOk: pb[].toString()
else: string.fromBytes(msg.payload)
echo &"{chatLine}"
c.prompt = false
showChatPrompt(c)
trace "Printing message", topic=DefaultTopic, chatLine,
contentTopic = msg.contentTopic
proc selectRandomNode(fleetStr: string): string = proc selectRandomNode(fleetStr: string): string =
randomize() randomize()
let let
@ -138,6 +171,10 @@ proc publish(c: Chat, line: string) =
nick: c.nick, nick: c.nick,
payload: line.toBytes()).encode() payload: line.toBytes()).encode()
## @TODO: error handling on failure
proc handler(response: PushResponse) {.gcsafe, closure.} =
trace "lightpush response received", response=response
when PayloadV1: when PayloadV1:
# Use Waku v1 payload encoding/encryption # Use Waku v1 payload encoding/encryption
let let
@ -147,14 +184,22 @@ proc publish(c: Chat, line: string) =
if encodedPayload.isOk(): if encodedPayload.isOk():
let message = WakuMessage(payload: encodedPayload.get(), let message = WakuMessage(payload: encodedPayload.get(),
contentTopic: c.contentTopic, version: version) contentTopic: c.contentTopic, version: version)
asyncSpawn c.node.publish(DefaultTopic, message) if not c.node.wakuLightPush.isNil():
# Attempt lightpush
asyncSpawn c.node.lightpush(DefaultTopic, message, handler)
else:
asyncSpawn c.node.publish(DefaultTopic, message, handler)
else: else:
warn "Payload encoding failed", error = encodedPayload.error warn "Payload encoding failed", error = encodedPayload.error
else: else:
# No payload encoding/encryption from Waku # No payload encoding/encryption from Waku
let message = WakuMessage(payload: chat2pb.buffer, let message = WakuMessage(payload: chat2pb.buffer,
contentTopic: c.contentTopic, version: 0) contentTopic: c.contentTopic, version: 0)
asyncSpawn c.node.publish(DefaultTopic, message) if not c.node.wakuLightPush.isNil():
# Attempt lightpush
asyncSpawn c.node.lightpush(DefaultTopic, message, handler)
else:
asyncSpawn c.node.publish(DefaultTopic, message)
# TODO This should read or be subscribe handler subscribe # TODO This should read or be subscribe handler subscribe
proc readAndPrint(c: Chat) {.async.} = proc readAndPrint(c: Chat) {.async.} =
@ -204,10 +249,18 @@ proc writeAndPrint(c: Chat) {.async.} =
echo "You are now known as " & c.nick echo "You are now known as " & c.nick
elif line.startsWith("/exit"): elif line.startsWith("/exit"):
await c.node.stop() if not c.node.wakuFilter.isNil():
echo "unsubscribing from content filters..."
echo "quitting..." await c.node.unsubscribe(
quit(QuitSuccess) FilterRequest(contentFilters: @[ContentFilter(contentTopic: c.contentTopic)], pubSubTopic: DefaultTopic, subscribe: false)
)
echo "quitting..."
await c.node.stop()
quit(QuitSuccess)
else: else:
# XXX connected state problematic # XXX connected state problematic
if c.started: if c.started:
@ -222,8 +275,8 @@ proc writeAndPrint(c: Chat) {.async.} =
echo getCurrentExceptionMsg() echo getCurrentExceptionMsg()
proc readWriteLoop(c: Chat) {.async.} = proc readWriteLoop(c: Chat) {.async.} =
asyncCheck c.writeAndPrint() # execute the async function but does not block asyncSpawn c.writeAndPrint() # execute the async function but does not block
asyncCheck c.readAndPrint() asyncSpawn c.readAndPrint()
proc readInput(wfd: AsyncFD) {.thread.} = proc readInput(wfd: AsyncFD) {.thread.} =
## This procedure performs reading from `stdin` and sends data over ## This procedure performs reading from `stdin` and sends data over
@ -247,10 +300,9 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
await node.start() await node.start()
if conf.filternode != "": node.mountRelay(conf.topics.split(" "),
node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnRelay) rlnRelayEnabled = conf.rlnRelay,
else: relayMessages = conf.relay) # Indicates if node is capable to relay messages
node.mountRelay(@[], rlnRelayEnabled = conf.rlnRelay)
node.mountKeepalive() node.mountKeepalive()
@ -316,66 +368,41 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: chat.contentTopic)]), storeHandler) await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: chat.contentTopic)]), storeHandler)
# NOTE Must be mounted after relay
if conf.lightpushnode != "":
mountLightPush(node)
node.wakuLightPush.setPeer(parsePeerInfo(conf.lightpushnode))
if conf.filternode != "": if conf.filternode != "":
node.mountFilter() node.mountFilter()
node.wakuFilter.setPeer(parsePeerInfo(conf.filternode)) node.wakuFilter.setPeer(parsePeerInfo(conf.filternode))
proc filterHandler(msg: WakuMessage) {.gcsafe.} = proc filterHandler(msg: WakuMessage) {.gcsafe.} =
let trace "Hit filter handler", contentTopic=msg.contentTopic
pb = Chat2Message.init(msg.payload)
chatLine = if pb.isOk: pb[].toString() chat.printReceivedMessage(msg)
else: string.fromBytes(msg.payload)
echo &"{chatLine}"
info "Hit filter handler"
await node.subscribe( await node.subscribe(
FilterRequest(contentFilters: @[ContentFilter(contentTopic: chat.contentTopic)], pubSubTopic: DefaultTopic, subscribe: true), FilterRequest(contentFilters: @[ContentFilter(contentTopic: chat.contentTopic)], pubSubTopic: DefaultTopic, subscribe: true),
filterHandler filterHandler
) )
# Subscribe to a topic # Subscribe to a topic, if relay is mounted
# TODO To get end to end sender would require more information in payload if conf.relay:
# We could possibly indicate the relayer point with connection somehow probably (?) proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} = trace "Hit subscribe handler", topic
let decoded = WakuMessage.init(data)
if decoded.isOk():
let msg = decoded.get()
when PayloadV1:
# Use Waku v1 payload encoding/encryption
let
keyInfo = KeyInfo(kind: Symmetric, symKey: chat.symKey)
decodedPayload = decodePayload(decoded.get(), keyInfo)
if decodedPayload.isOK(): let decoded = WakuMessage.init(data)
let
pb = Chat2Message.init(decodedPayload.get().payload) if decoded.isOk():
chatLine = if pb.isOk: pb[].toString() chat.printReceivedMessage(decoded.get())
else: string.fromBytes(decodedPayload.get().payload)
echo &"{chatLine}"
chat.prompt = false
showChatPrompt(chat)
info "Hit subscribe handler", topic, chatLine,
contentTopic = msg.contentTopic
else:
debug "Invalid encoded WakuMessage payload",
error = decodedPayload.error
else: else:
# No payload encoding/encryption from Waku trace "Invalid encoded WakuMessage", error = decoded.error
let
pb = Chat2Message.init(msg.payload)
chatLine = if pb.isOk: pb[].toString()
else: string.fromBytes(msg.payload)
echo &"{chatLine}"
chat.prompt = false
showChatPrompt(chat)
info "Hit subscribe handler", topic, chatLine,
contentTopic = msg.contentTopic
else:
trace "Invalid encoded WakuMessage", error = decoded.error
let topic = cast[Topic](DefaultTopic) let topic = cast[Topic](DefaultTopic)
node.subscribe(topic, handler) node.subscribe(topic, handler)
await chat.readWriteLoop() await chat.readWriteLoop()
@ -383,7 +410,6 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
node.startKeepalive() node.startKeepalive()
runForever() runForever()
#await allFuturesThrowing(libp2pFuts)
proc main() {.async.} = proc main() {.async.} =
let rng = crypto.newRng() # Singe random number source for the whole application let rng = crypto.newRng() # Singe random number source for the whole application

View File

@ -132,6 +132,11 @@ type
defaultValue: false defaultValue: false
name: "lightpush" }: bool name: "lightpush" }: bool
lightpushnode* {.
desc: "Peer multiaddr to request lightpush of published messages.",
defaultValue: ""
name: "lightpushnode" }: string
## JSON-RPC config ## JSON-RPC config
rpc* {. rpc* {.

View File

@ -2,7 +2,7 @@
# libtool - Provide generalized library-building support services. # libtool - Provide generalized library-building support services.
# Generated automatically by config.status (libbacktrace) version-unused # Generated automatically by config.status (libbacktrace) version-unused
# Libtool was configured on host fv-az278-786: # Libtool was configured on host fv-az173-757:
# NOTE: Changes made to this file will be lost: look at ltmain.sh. # NOTE: Changes made to this file will be lost: look at ltmain.sh.
# #
# Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005, # Copyright (C) 1996, 1997, 1998, 1999, 2000, 2001, 2003, 2004, 2005,

View File

@ -127,6 +127,11 @@ type
defaultValue: false defaultValue: false
name: "lightpush" }: bool name: "lightpush" }: bool
lightpushnode* {.
desc: "Peer multiaddr to request lightpush of published messages.",
defaultValue: ""
name: "lightpushnode" }: string
## JSON-RPC config ## JSON-RPC config
rpc* {. rpc* {.

View File

@ -557,19 +557,26 @@ proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "Post peerManager dial" info "Post peerManager dial"
proc setStorePeer*(n: WakuNode, address: string) = proc setStorePeer*(n: WakuNode, address: string) =
info "dialPeer", address = address info "Set store peer", address = address
let remotePeer = parsePeerInfo(address) let remotePeer = parsePeerInfo(address)
n.wakuStore.setPeer(remotePeer) n.wakuStore.setPeer(remotePeer)
proc setFilterPeer*(n: WakuNode, address: string) = proc setFilterPeer*(n: WakuNode, address: string) =
info "dialPeer", address = address info "Set filter peer", address = address
let remotePeer = parsePeerInfo(address) let remotePeer = parsePeerInfo(address)
n.wakuFilter.setPeer(remotePeer) n.wakuFilter.setPeer(remotePeer)
proc setLightPushPeer*(n: WakuNode, address: string) =
info "Set lightpush peer", address = address
let remotePeer = parsePeerInfo(address)
n.wakuLightPush.setPeer(remotePeer)
proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} = proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} =
for nodeId in nodes: for nodeId in nodes:
info "connectToNodes", node = nodeId info "connectToNodes", node = nodeId
@ -747,9 +754,12 @@ when isMainModule:
waitFor connectToNodes(node, conf.staticnodes) waitFor connectToNodes(node, conf.staticnodes)
# NOTE Must be mounted after relay # NOTE Must be mounted after relay
if conf.lightpush: if (conf.lightpushnode != "") or (conf.lightpush):
mountLightPush(node) mountLightPush(node)
if conf.lightpushnode != "":
setLightPushPeer(node, conf.lightpushnode)
# Filter setup. NOTE Must be mounted after relay # Filter setup. NOTE Must be mounted after relay
if (conf.filternode != "") or (conf.filter): if (conf.filternode != "") or (conf.filter):
mountFilter(node) mountFilter(node)