Add support for WakuMessage and merge publish function (#122)

* Add WakuMessage type

* Add WakuMessage encoding and decoding

Also clean up waku_types module and imports

* Clean up waku_relay module

Imports, remove old text test, make fields and functions public, format.

* Publish WakuMessage

Also fix type mismatch in RPC

* Make publish work in examples, node and protocol

* Parse protobuf content in examples

* Update docs

* Update waku/node/v2/waku_types.nim

* Fix compilation error and disable out of date waku test

Co-authored-by: Kim De Mey <kim.demey@gmail.com>
This commit is contained in:
Oskar Thorén 2020-09-01 23:20:38 +08:00 committed by GitHub
parent fd2bb36b15
commit f828736e1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 84 additions and 76 deletions

View File

@ -46,19 +46,12 @@ method unsubscribe*(w: WakuNode, contentFilter: ContentFilter)
## Status: Not yet implemented.
## TODO Implement.
method publish*(w: WakuNode, topic: Topic, message: Message)
## Publish a `Message` to a PubSub topic.
method publish*(w: WakuNode, topic: Topic, message: WakuMessage)
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
## `contentTopic` field for light node functionality. This field may be also
## be omitted.
##
## Status: Partially implemented.
## TODO WakuMessage OR seq[byte]. NOT PubSub Message.
method publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message: Message)
## Publish a `Message` to a PubSub topic with a specific content filter.
## Currently this means a `contentTopic`.
##
## Status: Not yet implemented.
## TODO Implement as wrapper around `waku_relay` and `publish`.
## TODO WakuMessage. Ensure content filter is in it.
## Status: Implemented.
method query*(w: WakuNode, query: HistoryQuery): HistoryResponse
## Queries for historical messages.

View File

@ -11,6 +11,9 @@ import
../../waku/node/v2/[config, wakunode2, waku_types],
../../waku/node/common
type
Topic* = waku_types.Topic
# Node operations happens asynchronously
proc runBackground() {.async.} =
let
@ -24,13 +27,16 @@ proc runBackground() {.async.} =
await node.start()
# Subscribe to a topic
let topic = "foobar"
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
info "Hit subscribe handler", topic=topic, data=data, decoded=cast[string](data)
let topic = cast[Topic]("foobar")
proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} =
let message = WakuMessage.init(data).value
let payload = cast[string](message.payload)
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
node.subscribe(topic, handler)
# Publish to a topic
let message = cast[seq[byte]]("hello world")
let payload = cast[seq[byte]]("hello world")
let message = WakuMessage(payload: payload, contentTopic: "foo")
node.publish(topic, message)
# TODO Await with try/except here

View File

@ -1,6 +1,7 @@
import
# Waku v2 tests
./v2/test_waku,
# TODO: enable this when it is altered into a proper waku relay test
# ./v2/test_waku,
./v2/test_wakunode,
./v2/test_waku_store,
./v2/test_waku_filter

View File

@ -20,11 +20,10 @@ procSuite "WakuNode":
await node.start()
let
topic = "foobar"
message = ("hello world").toBytes
node.publish(topic, ContentFilter(contentTopic: topic), message)
topic = "foo"
contentTopic = "foobar"
wakuMessage = WakuMessage(payload: ("hello world").toBytes,
contentTopic: contentTopic)
let response = node.query(HistoryQuery(topics: @[topic]))
check:
response.messages.len == 1
response.messages[0] == message
node.publish(topic, wakuMessage)
# TODO: Add actual subscribe part with receival of message

View File

@ -22,10 +22,11 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
result = WakuRelayCodec
# TODO: Implement symkey etc logic
rpcsrv.rpc("waku_publish") do(topic: string, message: seq[byte]) -> bool:
rpcsrv.rpc("waku_publish") do(topic: string, payload: seq[byte]) -> bool:
# XXX Why is casting necessary here but not in Nim node API?
let wakuRelay = cast[WakuRelay](node.switch.pubSub.get())
# XXX also future return type
var message = WakuMessage(payload: payload, contentTopic: "foo")
discard wakuRelay.publish(topic, message)
return true
#if not result:

View File

@ -1,13 +1,13 @@
## Core Waku data types are defined here to avoid recursive dependencies.
##
## TODO Move more common data types here
import
chronos,
libp2p/multiaddress,
libp2p/crypto/crypto,
libp2p/peerinfo,
standard_setup
libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
libp2p/protobuf/minprotobuf
# Core Waku data types are defined here to avoid recursive dependencies.
#
# TODO Move more common data types here
# Common data types -----------------------------------------------------------
type
Topic* = string
@ -16,7 +16,28 @@ type
# NOTE based on Eth2Node in NBC eth2_network.nim
WakuNode* = ref object of RootObj
switch*: Switch
# XXX Unclear if we need this
peerInfo*: PeerInfo
libp2pTransportLoops*: seq[Future[void]]
messages*: seq[(Topic, Message)]
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage
messages*: seq[(Topic, WakuMessage)]
WakuMessage* = object
payload*: seq[byte]
contentTopic*: string
# Encoding and decoding -------------------------------------------------------
proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =
var msg = WakuMessage()
let pb = initProtoBuffer(buffer)
discard ? pb.getField(1, msg.payload)
discard ? pb.getField(2, msg.contentTopic)
ok(msg)
proc encode*(message: WakuMessage): ProtoBuffer =
result = initProtoBuffer()
result.write(1, message.payload)
result.write(2, message.contentTopic)

View File

@ -19,6 +19,7 @@ type
PublicKey* = crypto.PublicKey
PrivateKey* = crypto.PrivateKey
# TODO Get rid of this and use waku_types one
Topic* = waku_types.Topic
Message* = seq[byte]
ContentFilter* = object
@ -176,30 +177,24 @@ proc unsubscribe*(w: WakuNode, contentFilter: ContentFilter) =
## Status: Not yet implemented.
## TODO Implement.
proc publish*(w: WakuNode, topic: Topic, message: Message) =
## Publish a `Message` to a PubSub topic.
proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) =
## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a
## `contentTopic` field for light node functionality. This field may be also
## be omitted.
##
## Status: Partially implemented.
## Status: Implemented.
##
## TODO WakuMessage OR seq[byte]. NOT PubSub Message.
let wakuSub = w.switch.pubSub.get()
# TODO Basic getter function for relay
let wakuRelay = cast[WakuRelay](node.switch.pubSub.get())
# XXX Unclear what the purpose of this is
# Commenting out as it is later expected to be Message type, not WakuMessage
#node.messages.insert((topic, message))
# XXX Consider awaiting here
discard wakuSub.publish(topic, message)
proc publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message: Message) =
## Publish a `Message` to a PubSub topic with a specific content filter.
## Currently this means a `contentTopic`.
##
## Status: Not yet implemented.
## TODO Implement as wrapper around `waku_relay` and `publish`.
## TODO WakuMessage. Ensure content filter is in it.
w.messages.insert((contentFilter.contentTopic, message))
let wakuSub = w.switch.pubSub.get()
# XXX Consider awaiting here
discard wakuSub.publish(topic, message)
discard wakuRelay.publish(topic, message)
proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse =
## Queries for historical messages.
@ -212,7 +207,8 @@ proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse =
if msg[0] notin query.topics:
continue
result.messages.insert(msg[1])
# XXX Unclear how this should be hooked up, Message or WakuMessage?
# result.messages.insert(msg[1])
when isMainModule:
let

View File

@ -6,12 +6,10 @@
import
std/[strutils, tables],
chronos, chronicles, metrics,
libp2p/protocols/pubsub/pubsub,
libp2p/protocols/pubsub/pubsubpeer,
libp2p/protocols/pubsub/floodsub,
libp2p/protocols/pubsub/gossipsub,
libp2p/protocols/pubsub/rpc/[messages],
libp2p/protocols/pubsub/[pubsub, pubsubpeer, floodsub, gossipsub],
libp2p/protocols/pubsub/rpc/messages,
libp2p/stream/connection,
../../node/v2/waku_types,
./filter
declarePublicGauge total_messages, "number of messages received"
@ -23,13 +21,10 @@ const WakuRelayCodec* = "/vac/waku/relay/2.0.0-alpha2"
type
WakuRelay* = ref object of GossipSub
# XXX: just playing
text*: string
gossipEnabled*: bool
filters*: Filters
filters: Filters
method init(w: WakuRelay) =
method init*(w: WakuRelay) =
debug "init"
proc handler(conn: Connection, proto: string) {.async.} =
## main protocol handler that gets triggered on every
@ -47,8 +42,6 @@ method init(w: WakuRelay) =
method initPubSub*(w: WakuRelay) =
debug "initWakuRelay"
w.text = "Foobar"
debug "w.text", text = w.text
# Not using GossipSub
w.gossipEnabled = false
@ -64,16 +57,11 @@ method subscribe*(w: WakuRelay,
topic: string,
handler: TopicHandler) {.async.} =
debug "subscribe", topic=topic
# XXX: Pubsub really
# XXX: This is what is called, I think
if w.gossipEnabled:
await procCall GossipSub(w).subscribe(topic, handler)
else:
await procCall FloodSub(w).subscribe(topic, handler)
# Subscribing a peer to a specified topic
method subscribeTopic*(w: WakuRelay,
topic: string,
subscribe: bool,
@ -117,8 +105,11 @@ method rpcHandler*(w: WakuRelay,
method publish*(w: WakuRelay,
topic: string,
data: seq[byte]): Future[int] {.async.} =
debug "publish", topic=topic
message: WakuMessage
): Future[int] {.async.} =
debug "publish", topic=topic, contentTopic=message.contentTopic
let data = message.encode().buffer
if w.gossipEnabled:
return await procCall GossipSub(w).publish(topic, data)
@ -133,7 +124,7 @@ method unsubscribe*(w: WakuRelay,
else:
await procCall FloodSub(w).unsubscribe(topics)
# GossipSub specific methods
# GossipSub specific methods --------------------------------------------------
method start*(w: WakuRelay) {.async.} =
debug "start"
if w.gossipEnabled: