diff --git a/CHANGELOG.md b/CHANGELOG.md index 5fec4d6b1..45f21d93f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,11 @@ # Changelog -## v0.4 +## Next release This release contains the following: ### Features ### Changes +- The [toy-chat application](https://github.com/status-im/nim-waku/blob/master/docs/tutorial/chat2.md) can now perform `lightpush` and request content-filtered messages from remote peers. #### General refactoring #### Docs #### Schema @@ -12,6 +13,7 @@ This release contains the following: - [JSON-RPC Store API](https://rfc.vac.dev/spec/16): Added an optional time-based query to filter historical messages. - [Nim API](https://github.com/status-im/nim-waku/blob/master/docs/api/v2/node.md): Added `resume` method. ### Fixes +- Connections between nodes no longer become unstable due to keep-alive errors if mesh grows large ## 2021-05-11 v0.3 diff --git a/examples/v2/chat2.nim b/examples/v2/chat2.nim index ed83943ec..e8c2b3bb2 100644 --- a/examples/v2/chat2.nim +++ b/examples/v2/chat2.nim @@ -22,6 +22,7 @@ import ../../waku/v2/node/[wakunode2, waku_payload], ../../waku/v2/protocol/waku_message, ../../waku/v2/protocol/waku_store/waku_store, ../../waku/v2/protocol/waku_filter/waku_filter, + ../../waku/v2/protocol/waku_lightpush/waku_lightpush, ../../waku/v2/utils/peers, ../../waku/common/utils/nat, ./config_chat2 @@ -115,6 +116,38 @@ proc showChatPrompt(c: Chat) = stdout.flushFile() c.prompt = true +proc printReceivedMessage(c: Chat, msg: WakuMessage) = + when PayloadV1: + # Use Waku v1 payload encoding/encryption + let + keyInfo = KeyInfo(kind: Symmetric, symKey: c.symKey) + decodedPayload = decodePayload(decoded.get(), keyInfo) + + if decodedPayload.isOK(): + let + pb = Chat2Message.init(decodedPayload.get().payload) + chatLine = if pb.isOk: pb[].toString() + else: string.fromBytes(decodedPayload.get().payload) + echo &"{chatLine}" + c.prompt = false + showChatPrompt(c) + trace "Printing message", topic=DefaultTopic, chatLine, + contentTopic = msg.contentTopic + else: + debug "Invalid encoded WakuMessage payload", + error = decodedPayload.error + else: + # No payload encoding/encryption from Waku + let + pb = Chat2Message.init(msg.payload) + chatLine = if pb.isOk: pb[].toString() + else: string.fromBytes(msg.payload) + echo &"{chatLine}" + c.prompt = false + showChatPrompt(c) + trace "Printing message", topic=DefaultTopic, chatLine, + contentTopic = msg.contentTopic + proc selectRandomNode(fleetStr: string): string = randomize() let @@ -138,6 +171,10 @@ proc publish(c: Chat, line: string) = nick: c.nick, payload: line.toBytes()).encode() + ## @TODO: error handling on failure + proc handler(response: PushResponse) {.gcsafe, closure.} = + trace "lightpush response received", response=response + when PayloadV1: # Use Waku v1 payload encoding/encryption let @@ -147,14 +184,22 @@ proc publish(c: Chat, line: string) = if encodedPayload.isOk(): let message = WakuMessage(payload: encodedPayload.get(), contentTopic: c.contentTopic, version: version) - asyncSpawn c.node.publish(DefaultTopic, message) + if not c.node.wakuLightPush.isNil(): + # Attempt lightpush + asyncSpawn c.node.lightpush(DefaultTopic, message, handler) + else: + asyncSpawn c.node.publish(DefaultTopic, message, handler) else: warn "Payload encoding failed", error = encodedPayload.error else: # No payload encoding/encryption from Waku let message = WakuMessage(payload: chat2pb.buffer, contentTopic: c.contentTopic, version: 0) - asyncSpawn c.node.publish(DefaultTopic, message) + if not c.node.wakuLightPush.isNil(): + # Attempt lightpush + asyncSpawn c.node.lightpush(DefaultTopic, message, handler) + else: + asyncSpawn c.node.publish(DefaultTopic, message) # TODO This should read or be subscribe handler subscribe proc readAndPrint(c: Chat) {.async.} = @@ -204,10 +249,18 @@ proc writeAndPrint(c: Chat) {.async.} = echo "You are now known as " & c.nick elif line.startsWith("/exit"): - await c.node.stop() + if not c.node.wakuFilter.isNil(): + echo "unsubscribing from content filters..." + + await c.node.unsubscribe( + FilterRequest(contentFilters: @[ContentFilter(contentTopic: c.contentTopic)], pubSubTopic: DefaultTopic, subscribe: false) + ) + + echo "quitting..." - echo "quitting..." - quit(QuitSuccess) + await c.node.stop() + + quit(QuitSuccess) else: # XXX connected state problematic if c.started: @@ -222,8 +275,8 @@ proc writeAndPrint(c: Chat) {.async.} = echo getCurrentExceptionMsg() proc readWriteLoop(c: Chat) {.async.} = - asyncCheck c.writeAndPrint() # execute the async function but does not block - asyncCheck c.readAndPrint() + asyncSpawn c.writeAndPrint() # execute the async function but does not block + asyncSpawn c.readAndPrint() proc readInput(wfd: AsyncFD) {.thread.} = ## This procedure performs reading from `stdin` and sends data over @@ -247,10 +300,9 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = await node.start() - if conf.filternode != "": - node.mountRelay(conf.topics.split(" "), rlnRelayEnabled = conf.rlnRelay) - else: - node.mountRelay(@[], rlnRelayEnabled = conf.rlnRelay) + node.mountRelay(conf.topics.split(" "), + rlnRelayEnabled = conf.rlnRelay, + relayMessages = conf.relay) # Indicates if node is capable to relay messages node.mountKeepalive() @@ -315,6 +367,12 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = info "Hit store handler" await node.query(HistoryQuery(contentFilters: @[HistoryContentFilter(contentTopic: chat.contentTopic)]), storeHandler) + + # NOTE Must be mounted after relay + if conf.lightpushnode != "": + mountLightPush(node) + + node.wakuLightPush.setPeer(parsePeerInfo(conf.lightpushnode)) if conf.filternode != "": node.mountFilter() @@ -322,60 +380,29 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = node.wakuFilter.setPeer(parsePeerInfo(conf.filternode)) proc filterHandler(msg: WakuMessage) {.gcsafe.} = - let - pb = Chat2Message.init(msg.payload) - chatLine = if pb.isOk: pb[].toString() - else: string.fromBytes(msg.payload) - echo &"{chatLine}" - info "Hit filter handler" + trace "Hit filter handler", contentTopic=msg.contentTopic + + chat.printReceivedMessage(msg) await node.subscribe( FilterRequest(contentFilters: @[ContentFilter(contentTopic: chat.contentTopic)], pubSubTopic: DefaultTopic, subscribe: true), filterHandler ) - # Subscribe to a topic - # TODO To get end to end sender would require more information in payload - # We could possibly indicate the relayer point with connection somehow probably (?) - proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} = - let decoded = WakuMessage.init(data) - if decoded.isOk(): - let msg = decoded.get() - when PayloadV1: - # Use Waku v1 payload encoding/encryption - let - keyInfo = KeyInfo(kind: Symmetric, symKey: chat.symKey) - decodedPayload = decodePayload(decoded.get(), keyInfo) + # Subscribe to a topic, if relay is mounted + if conf.relay: + proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} = + trace "Hit subscribe handler", topic - if decodedPayload.isOK(): - let - pb = Chat2Message.init(decodedPayload.get().payload) - chatLine = if pb.isOk: pb[].toString() - else: string.fromBytes(decodedPayload.get().payload) - echo &"{chatLine}" - chat.prompt = false - showChatPrompt(chat) - info "Hit subscribe handler", topic, chatLine, - contentTopic = msg.contentTopic - else: - debug "Invalid encoded WakuMessage payload", - error = decodedPayload.error + let decoded = WakuMessage.init(data) + + if decoded.isOk(): + chat.printReceivedMessage(decoded.get()) else: - # No payload encoding/encryption from Waku - let - pb = Chat2Message.init(msg.payload) - chatLine = if pb.isOk: pb[].toString() - else: string.fromBytes(msg.payload) - echo &"{chatLine}" - chat.prompt = false - showChatPrompt(chat) - info "Hit subscribe handler", topic, chatLine, - contentTopic = msg.contentTopic - else: - trace "Invalid encoded WakuMessage", error = decoded.error + trace "Invalid encoded WakuMessage", error = decoded.error - let topic = cast[Topic](DefaultTopic) - node.subscribe(topic, handler) + let topic = cast[Topic](DefaultTopic) + node.subscribe(topic, handler) await chat.readWriteLoop() @@ -383,7 +410,6 @@ proc processInput(rfd: AsyncFD, rng: ref BrHmacDrbgContext) {.async.} = node.startKeepalive() runForever() - #await allFuturesThrowing(libp2pFuts) proc main() {.async.} = let rng = crypto.newRng() # Singe random number source for the whole application diff --git a/examples/v2/config_chat2.nim b/examples/v2/config_chat2.nim index a80be7041..a21ed58b9 100644 --- a/examples/v2/config_chat2.nim +++ b/examples/v2/config_chat2.nim @@ -132,6 +132,11 @@ type defaultValue: false name: "lightpush" }: bool + lightpushnode* {. + desc: "Peer multiaddr to request lightpush of published messages.", + defaultValue: "" + name: "lightpushnode" }: string + ## JSON-RPC config rpc* {. diff --git a/waku/v2/node/config.nim b/waku/v2/node/config.nim index db58bd70f..6cc7ff0b7 100644 --- a/waku/v2/node/config.nim +++ b/waku/v2/node/config.nim @@ -127,6 +127,11 @@ type defaultValue: false name: "lightpush" }: bool + lightpushnode* {. + desc: "Peer multiaddr to request lightpush of published messages.", + defaultValue: "" + name: "lightpushnode" }: string + ## JSON-RPC config rpc* {. diff --git a/waku/v2/node/wakunode2.nim b/waku/v2/node/wakunode2.nim index e863dea09..bcbf994a6 100644 --- a/waku/v2/node/wakunode2.nim +++ b/waku/v2/node/wakunode2.nim @@ -557,19 +557,26 @@ proc dialPeer*(n: WakuNode, address: string) {.async.} = info "Post peerManager dial" proc setStorePeer*(n: WakuNode, address: string) = - info "dialPeer", address = address + info "Set store peer", address = address let remotePeer = parsePeerInfo(address) n.wakuStore.setPeer(remotePeer) proc setFilterPeer*(n: WakuNode, address: string) = - info "dialPeer", address = address + info "Set filter peer", address = address let remotePeer = parsePeerInfo(address) n.wakuFilter.setPeer(remotePeer) +proc setLightPushPeer*(n: WakuNode, address: string) = + info "Set lightpush peer", address = address + + let remotePeer = parsePeerInfo(address) + + n.wakuLightPush.setPeer(remotePeer) + proc connectToNodes*(n: WakuNode, nodes: seq[string]) {.async.} = for nodeId in nodes: info "connectToNodes", node = nodeId @@ -747,8 +754,11 @@ when isMainModule: waitFor connectToNodes(node, conf.staticnodes) # NOTE Must be mounted after relay - if conf.lightpush: + if (conf.lightpushnode != "") or (conf.lightpush): mountLightPush(node) + + if conf.lightpushnode != "": + setLightPushPeer(node, conf.lightpushnode) # Filter setup. NOTE Must be mounted after relay if (conf.filternode != "") or (conf.filter):