Filter and lightpush showcase (#593)

* Filter and lightpush showcase

* Fix log error
This commit is contained in:
Hanno Cornelius 2021-06-04 12:02:47 +02:00 committed by GitHub
parent 2568af3b98
commit b615c6aa8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 109 additions and 61 deletions

View File

@ -1,10 +1,11 @@
# Changelog
## v0.4
## Next release
This release contains the following:
### Features
### Changes
- The [toy-chat application](https://github.com/status-im/nim-waku/blob/master/docs/tutorial/chat2.md) can now perform `lightpush` and request content-filtered messages from remote peers.
#### General refactoring
#### Docs
#### Schema
@ -12,6 +13,7 @@ This release contains the following:
- [JSON-RPC Store API](https://rfc.vac.dev/spec/16): Added an optional time-based query to filter historical messages.
- [Nim API](https://github.com/status-im/nim-waku/blob/master/docs/api/v2/node.md): Added `resume` method.
### Fixes
- Connections between nodes no longer become unstable due to keep-alive errors if mesh grows large
## 2021-05-11 v0.3

View File

@ -22,6 +22,7 @@ import ../../waku/v2/node/[wakunode2, waku_payload],
../../waku/v2/protocol/waku_message,
../../waku/v2/protocol/waku_store/waku_store,
../../waku/v2/protocol/waku_filter/waku_filter,
../../waku/v2/protocol/waku_lightpush/waku_lightpush,
../../waku/v2/utils/peers,
../../waku/common/utils/nat,
./config_chat2
@ -115,6 +116,38 @@ proc showChatPrompt(c: Chat) =
stdout.flushFile()
c.prompt = true
proc printReceivedMessage(c: Chat, msg: WakuMessage) =
when PayloadV1:
# Use Waku v1 payload encoding/encryption
let
keyInfo = KeyInfo(kind: Symmetric, symKey: c.symKey)
decodedPayload = decodePayload(decoded.get(), keyInfo)
if decodedPayload.isOK():
let
pb = Chat2Message.init(decodedPayload.get().payload)
chatLine = if pb.isOk: pb[].toString()
else: string.fromBytes(decodedPayload.get().payload)
echo &"{chatLine}"
c.prompt = false
showChatPrompt(c)
trace "Printing message", topic=DefaultTopic, chatLine,
contentTopic = msg.contentTopic
else:
debug "Invalid encoded WakuMessage payload",
error = decodedPayload.error
else:
# No payload encoding/encryption from Waku
let
pb = Chat2Message.init(msg.payload)
chatLine = if pb.isOk: pb[].toString()
else: string.fromBytes(msg.payload)
echo &"{chatLine}"
c.prompt = false
showChatPrompt(c)
trace "Printing message", topic=DefaultTopic, chatLine,
contentTopic = msg.contentTopic
proc selectRandomNode(fleetStr: string): string =
randomize()
let
@ -138,6 +171,10 @@ proc publish(c: Chat, line: string) =
nick: c.nick,
payload: line.toBytes()).encode()
## @TODO: error handling on failure
proc handler(response: PushResponse) {.gcsafe, closure.} =
trace "lightpush response received", response=response
when PayloadV1:
# Use Waku v1 payload encoding/encryption
let
@ -147,14 +184,22 @@ proc publish(c: Chat, line: string) =
if encodedPayload.isOk():
let message = WakuMessage(payload: encodedPayload.get(),
contentTopic: c.contentTopic, version: version)
asyncSpawn c.node.publish(DefaultTopic, message)
if not c.node.wakuLightPush.isNil():
# Attempt lightpush
asyncSpawn c.node.lightpush(DefaultTopic, message, handler)
else:
asyncSpawn c.node.publish(DefaultTopic, message, handler)
else:
warn "Payload encoding failed", error = encodedPayload.error
else:
# No payload encoding/encryption from Waku
let message = WakuMessage(payload: chat2pb.buffer,
contentTopic: c.contentTopic, version: 0)
asyncSpawn c.node.publish(DefaultTopic, message)
if not c.node.wakuLightPush.isNil():
# Attempt lightpush
asyncSpawn c.node.lightpush(DefaultTopic, message, handler)
else:
asyncSpawn c.node.publish(DefaultTopic, message)
# TODO This should read or be subscribe handler subscribe
proc readAndPrint(c: Chat) {.async.} =
@ -204,10 +249,18 @@ proc writeAndPrint(c: Chat) {.async.} =
echo "You are now known as " & c.nick
elif line.startsWith("/exit"):
await c.node.stop()
if not c.node.wakuFilter.isNil():
echo "unsubscribing from content filters..."
await c.node.unsubscribe(
FilterRequest(contentFilters: @[ContentFilter(contentTopic: c.contentTopic)], pubSubTopic: DefaultTopic, subscribe: false)
)
echo "quitting..."
echo "quitting..."
quit(QuitSuccess)
await c.node.stop()
quit(QuitSuccess)
else:
# XXX connected state problematic
if c.started:
@ -222,8 +275,8 @@ proc writeAndPrint(c: Chat) {.async.} =
echo getCurrentExceptionMsg()
proc readWriteLoop(c: Chat) {.async.} =
asyncCheck c.writeAndPrint() # execute the async function but does not block
asyncCheck c.readAndPrint()
asyncSpawn c.writeAndPrint() # execute the async function but does not block
asyncSpawn c.readAndPrint()
proc readInput(wfd: AsyncFD) {.thread.} =
## This procedure performs reading from `stdin` and sends data over
@ -247,10 +300,9 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
await node.start()
if conf.filternode != "":
node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnRelay)
else:
node.mountRelay(@[], rlnRelayEnabled = conf.rlnRelay)
node.mountRelay(conf.topics.split(" "),
rlnRelayEnabled = conf.rlnRelay,
relayMessages = conf.relay) # Indicates if node is capable to relay messages
node.mountKeepalive()
@ -315,6 +367,12 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
info "Hit store handler"
await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: chat.contentTopic)]), storeHandler)
# NOTE Must be mounted after relay
if conf.lightpushnode != "":
mountLightPush(node)
node.wakuLightPush.setPeer(parsePeerInfo(conf.lightpushnode))
if conf.filternode != "":
node.mountFilter()
@ -322,60 +380,29 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
node.wakuFilter.setPeer(parsePeerInfo(conf.filternode))
proc filterHandler(msg: WakuMessage) {.gcsafe.} =
let
pb = Chat2Message.init(msg.payload)
chatLine = if pb.isOk: pb[].toString()
else: string.fromBytes(msg.payload)
echo &"{chatLine}"
info "Hit filter handler"
trace "Hit filter handler", contentTopic=msg.contentTopic
chat.printReceivedMessage(msg)
await node.subscribe(
FilterRequest(contentFilters: @[ContentFilter(contentTopic: chat.contentTopic)], pubSubTopic: DefaultTopic, subscribe: true),
filterHandler
)
# Subscribe to a topic
# TODO To get end to end sender would require more information in payload
# We could possibly indicate the relayer point with connection somehow probably (?)
proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
let decoded = WakuMessage.init(data)
if decoded.isOk():
let msg = decoded.get()
when PayloadV1:
# Use Waku v1 payload encoding/encryption
let
keyInfo = KeyInfo(kind: Symmetric, symKey: chat.symKey)
decodedPayload = decodePayload(decoded.get(), keyInfo)
# Subscribe to a topic, if relay is mounted
if conf.relay:
proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
trace "Hit subscribe handler", topic
if decodedPayload.isOK():
let
pb = Chat2Message.init(decodedPayload.get().payload)
chatLine = if pb.isOk: pb[].toString()
else: string.fromBytes(decodedPayload.get().payload)
echo &"{chatLine}"
chat.prompt = false
showChatPrompt(chat)
info "Hit subscribe handler", topic, chatLine,
contentTopic = msg.contentTopic
else:
debug "Invalid encoded WakuMessage payload",
error = decodedPayload.error
let decoded = WakuMessage.init(data)
if decoded.isOk():
chat.printReceivedMessage(decoded.get())
else:
# No payload encoding/encryption from Waku
let
pb = Chat2Message.init(msg.payload)
chatLine = if pb.isOk: pb[].toString()
else: string.fromBytes(msg.payload)
echo &"{chatLine}"
chat.prompt = false
showChatPrompt(chat)
info "Hit subscribe handler", topic, chatLine,
contentTopic = msg.contentTopic
else:
trace "Invalid encoded WakuMessage", error = decoded.error
trace "Invalid encoded WakuMessage", error = decoded.error
let topic = cast[Topic](DefaultTopic)
node.subscribe(topic, handler)
let topic = cast[Topic](DefaultTopic)
node.subscribe(topic, handler)
await chat.readWriteLoop()
@ -383,7 +410,6 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} =
node.startKeepalive()
runForever()
#await allFuturesThrowing(libp2pFuts)
proc main() {.async.} =
let rng = crypto.newRng() # Singe random number source for the whole application

View File

@ -132,6 +132,11 @@ type
defaultValue: false
name: "lightpush" }: bool
lightpushnode* {.
desc: "Peer multiaddr to request lightpush of published messages.",
defaultValue: ""
name: "lightpushnode" }: string
## JSON-RPC config
rpc* {.

View File

@ -127,6 +127,11 @@ type
defaultValue: false
name: "lightpush" }: bool
lightpushnode* {.
desc: "Peer multiaddr to request lightpush of published messages.",
defaultValue: ""
name: "lightpushnode" }: string
## JSON-RPC config
rpc* {.

View File

@ -557,19 +557,26 @@ proc dialPeer*(n: WakuNode, address: string) {.async.} =
info "Post peerManager dial"
proc setStorePeer*(n: WakuNode, address: string) =
info "dialPeer", address = address
info "Set store peer", address = address
let remotePeer = parsePeerInfo(address)
n.wakuStore.setPeer(remotePeer)
proc setFilterPeer*(n: WakuNode, address: string) =
info "dialPeer", address = address
info "Set filter peer", address = address
let remotePeer = parsePeerInfo(address)
n.wakuFilter.setPeer(remotePeer)
proc setLightPushPeer*(n: WakuNode, address: string) =
info "Set lightpush peer", address = address
let remotePeer = parsePeerInfo(address)
n.wakuLightPush.setPeer(remotePeer)
proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} =
for nodeId in nodes:
info "connectToNodes", node = nodeId
@ -747,8 +754,11 @@ when isMainModule:
waitFor connectToNodes(node, conf.staticnodes)
# NOTE Must be mounted after relay
if conf.lightpush:
if (conf.lightpushnode != "") or (conf.lightpush):
mountLightPush(node)
if conf.lightpushnode != "":
setLightPushPeer(node, conf.lightpushnode)
# Filter setup. NOTE Must be mounted after relay
if (conf.filternode != "") or (conf.filter):