diff --git a/docs/api/v2/node.md b/docs/api/v2/node.md index a57ae5ce3..b36ce4861 100644 --- a/docs/api/v2/node.md +++ b/docs/api/v2/node.md @@ -46,19 +46,12 @@ method unsubscribe*(w: WakuNode, contentFilter: ContentFilter) ## Status: Not yet implemented. ## TODO Implement. -method publish*(w: WakuNode, topic: Topic, message: Message) - ## Publish a `Message` to a PubSub topic. +method publish*(w: WakuNode, topic: Topic, message: WakuMessage) + ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a + ## `contentTopic` field for light node functionality. This field may be also + ## be omitted. ## - ## Status: Partially implemented. - ## TODO WakuMessage OR seq[byte]. NOT PubSub Message. - -method publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message: Message) - ## Publish a `Message` to a PubSub topic with a specific content filter. - ## Currently this means a `contentTopic`. - ## - ## Status: Not yet implemented. - ## TODO Implement as wrapper around `waku_relay` and `publish`. - ## TODO WakuMessage. Ensure content filter is in it. + ## Status: Implemented. method query*(w: WakuNode, query: HistoryQuery): HistoryResponse ## Queries for historical messages. diff --git a/examples/v2/basic2.nim b/examples/v2/basic2.nim index 676eccb56..4cb1ce2a5 100644 --- a/examples/v2/basic2.nim +++ b/examples/v2/basic2.nim @@ -11,6 +11,9 @@ import ../../waku/node/v2/[config, wakunode2, waku_types], ../../waku/node/common +type + Topic* = waku_types.Topic + # Node operations happens asynchronously proc runBackground() {.async.} = let @@ -24,13 +27,16 @@ proc runBackground() {.async.} = await node.start() # Subscribe to a topic - let topic = "foobar" - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - info "Hit subscribe handler", topic=topic, data=data, decoded=cast[string](data) + let topic = cast[Topic]("foobar") + proc handler(topic: Topic, data: seq[byte]) {.async, gcsafe.} = + let message = WakuMessage.init(data).value + let payload = cast[string](message.payload) + info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic node.subscribe(topic, handler) # Publish to a topic - let message = cast[seq[byte]]("hello world") + let payload = cast[seq[byte]]("hello world") + let message = WakuMessage(payload: payload, contentTopic: "foo") node.publish(topic, message) # TODO Await with try/except here diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index 0044effa9..c25cd2a6a 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -1,6 +1,7 @@ import # Waku v2 tests - ./v2/test_waku, + # TODO: enable this when it is altered into a proper waku relay test + # ./v2/test_waku, ./v2/test_wakunode, ./v2/test_waku_store, ./v2/test_waku_filter diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index fdf45e307..ef6258cf7 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -20,11 +20,10 @@ procSuite "WakuNode": await node.start() let - topic = "foobar" - message = ("hello world").toBytes - node.publish(topic, ContentFilter(contentTopic: topic), message) + topic = "foo" + contentTopic = "foobar" + wakuMessage = WakuMessage(payload: ("hello world").toBytes, + contentTopic: contentTopic) - let response = node.query(HistoryQuery(topics: @[topic])) - check: - response.messages.len == 1 - response.messages[0] == message + node.publish(topic, wakuMessage) + # TODO: Add actual subscribe part with receival of message diff --git a/waku/node/v2/rpc/wakurpc.nim b/waku/node/v2/rpc/wakurpc.nim index d37e12c9c..55b15e6be 100644 --- a/waku/node/v2/rpc/wakurpc.nim +++ b/waku/node/v2/rpc/wakurpc.nim @@ -22,10 +22,11 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = result = WakuRelayCodec # TODO: Implement symkey etc logic - rpcsrv.rpc("waku_publish") do(topic: string, message: seq[byte]) -> bool: + rpcsrv.rpc("waku_publish") do(topic: string, payload: seq[byte]) -> bool: # XXX Why is casting necessary here but not in Nim node API? let wakuRelay = cast[WakuRelay](node.switch.pubSub.get()) # XXX also future return type + var message = WakuMessage(payload: payload, contentTopic: "foo") discard wakuRelay.publish(topic, message) return true #if not result: diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index 8c0a10db7..47013fe81 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -1,13 +1,13 @@ +## Core Waku data types are defined here to avoid recursive dependencies. +## +## TODO Move more common data types here + import chronos, - libp2p/multiaddress, - libp2p/crypto/crypto, - libp2p/peerinfo, - standard_setup + libp2p/[switch, peerinfo, multiaddress, crypto/crypto], + libp2p/protobuf/minprotobuf -# Core Waku data types are defined here to avoid recursive dependencies. -# -# TODO Move more common data types here +# Common data types ----------------------------------------------------------- type Topic* = string @@ -16,7 +16,28 @@ type # NOTE based on Eth2Node in NBC eth2_network.nim WakuNode* = ref object of RootObj switch*: Switch - # XXX Unclear if we need this peerInfo*: PeerInfo libp2pTransportLoops*: seq[Future[void]] - messages*: seq[(Topic, Message)] + # TODO Revist messages field indexing as well as if this should be Message or WakuMessage + messages*: seq[(Topic, WakuMessage)] + + WakuMessage* = object + payload*: seq[byte] + contentTopic*: string + +# Encoding and decoding ------------------------------------------------------- + +proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] = + var msg = WakuMessage() + let pb = initProtoBuffer(buffer) + + discard ? pb.getField(1, msg.payload) + discard ? pb.getField(2, msg.contentTopic) + + ok(msg) + +proc encode*(message: WakuMessage): ProtoBuffer = + result = initProtoBuffer() + + result.write(1, message.payload) + result.write(2, message.contentTopic) diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 9f70a081c..1139d23da 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -19,6 +19,7 @@ type PublicKey* = crypto.PublicKey PrivateKey* = crypto.PrivateKey + # TODO Get rid of this and use waku_types one Topic* = waku_types.Topic Message* = seq[byte] ContentFilter* = object @@ -176,30 +177,24 @@ proc unsubscribe*(w: WakuNode, contentFilter: ContentFilter) = ## Status: Not yet implemented. ## TODO Implement. -proc publish*(w: WakuNode, topic: Topic, message: Message) = - ## Publish a `Message` to a PubSub topic. + +proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) = + ## Publish a `WakuMessage` to a PubSub topic. `WakuMessage` should contain a + ## `contentTopic` field for light node functionality. This field may be also + ## be omitted. ## - ## Status: Partially implemented. + ## Status: Implemented. ## - ## TODO WakuMessage OR seq[byte]. NOT PubSub Message. - let wakuSub = w.switch.pubSub.get() + + # TODO Basic getter function for relay + let wakuRelay = cast[WakuRelay](node.switch.pubSub.get()) + + # XXX Unclear what the purpose of this is + # Commenting out as it is later expected to be Message type, not WakuMessage + #node.messages.insert((topic, message)) + # XXX Consider awaiting here - discard wakuSub.publish(topic, message) - -proc publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message: Message) = - ## Publish a `Message` to a PubSub topic with a specific content filter. - ## Currently this means a `contentTopic`. - ## - ## Status: Not yet implemented. - ## TODO Implement as wrapper around `waku_relay` and `publish`. - ## TODO WakuMessage. Ensure content filter is in it. - - w.messages.insert((contentFilter.contentTopic, message)) - - let wakuSub = w.switch.pubSub.get() - # XXX Consider awaiting here - - discard wakuSub.publish(topic, message) + discard wakuRelay.publish(topic, message) proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse = ## Queries for historical messages. @@ -212,7 +207,8 @@ proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse = if msg[0] notin query.topics: continue - result.messages.insert(msg[1]) + # XXX Unclear how this should be hooked up, Message or WakuMessage? + # result.messages.insert(msg[1]) when isMainModule: let diff --git a/waku/protocol/v2/waku_relay.nim b/waku/protocol/v2/waku_relay.nim index 5656fcb88..d86eaf603 100644 --- a/waku/protocol/v2/waku_relay.nim +++ b/waku/protocol/v2/waku_relay.nim @@ -6,12 +6,10 @@ import std/[strutils, tables], chronos, chronicles, metrics, - libp2p/protocols/pubsub/pubsub, - libp2p/protocols/pubsub/pubsubpeer, - libp2p/protocols/pubsub/floodsub, - libp2p/protocols/pubsub/gossipsub, - libp2p/protocols/pubsub/rpc/[messages], + libp2p/protocols/pubsub/[pubsub, pubsubpeer, floodsub, gossipsub], + libp2p/protocols/pubsub/rpc/messages, libp2p/stream/connection, + ../../node/v2/waku_types, ./filter declarePublicGauge total_messages, "number of messages received" @@ -23,13 +21,10 @@ const WakuRelayCodec* = "/vac/waku/relay/2.0.0-alpha2" type WakuRelay* = ref object of GossipSub - # XXX: just playing - text*: string gossipEnabled*: bool + filters*: Filters - filters: Filters - -method init(w: WakuRelay) = +method init*(w: WakuRelay) = debug "init" proc handler(conn: Connection, proto: string) {.async.} = ## main protocol handler that gets triggered on every @@ -47,8 +42,6 @@ method init(w: WakuRelay) = method initPubSub*(w: WakuRelay) = debug "initWakuRelay" - w.text = "Foobar" - debug "w.text", text = w.text # Not using GossipSub w.gossipEnabled = false @@ -64,16 +57,11 @@ method subscribe*(w: WakuRelay, topic: string, handler: TopicHandler) {.async.} = debug "subscribe", topic=topic - # XXX: Pubsub really - - # XXX: This is what is called, I think if w.gossipEnabled: await procCall GossipSub(w).subscribe(topic, handler) else: await procCall FloodSub(w).subscribe(topic, handler) - -# Subscribing a peer to a specified topic method subscribeTopic*(w: WakuRelay, topic: string, subscribe: bool, @@ -117,8 +105,11 @@ method rpcHandler*(w: WakuRelay, method publish*(w: WakuRelay, topic: string, - data: seq[byte]): Future[int] {.async.} = - debug "publish", topic=topic + message: WakuMessage + ): Future[int] {.async.} = + debug "publish", topic=topic, contentTopic=message.contentTopic + + let data = message.encode().buffer if w.gossipEnabled: return await procCall GossipSub(w).publish(topic, data) @@ -133,7 +124,7 @@ method unsubscribe*(w: WakuRelay, else: await procCall FloodSub(w).unsubscribe(topics) -# GossipSub specific methods +# GossipSub specific methods -------------------------------------------------- method start*(w: WakuRelay) {.async.} = debug "start" if w.gossipEnabled: