From 2717209f05f2c03a26cda07995ae41ef9c3ad08e Mon Sep 17 00:00:00 2001 From: Kim De Mey Date: Wed, 2 Sep 2020 05:15:25 +0200 Subject: [PATCH] Move filter.notify to WakuRelay handler and add test (#128) --- tests/v2/test_wakunode.nim | 86 +++++++++++++++++++++++++++++++-- waku/node/v2/rpc/wakurpc.nim | 5 +- waku/node/v2/waku_types.nim | 24 +++++++++ waku/node/v2/wakunode2.nim | 21 ++++---- waku/protocol/v2/waku_relay.nim | 53 ++------------------ 5 files changed, 123 insertions(+), 66 deletions(-) diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim index ef6258cf7..4b3dbc9d4 100644 --- a/tests/v2/test_wakunode.nim +++ b/tests/v2/test_wakunode.nim @@ -5,25 +5,101 @@ import chronicles, chronos, stew/shims/net as stewNet, stew/byteutils, libp2p/crypto/crypto, libp2p/crypto/secp, + libp2p/switch, eth/keys, + ../../waku/protocol/v2/waku_relay, ../../waku/node/v2/[wakunode2, waku_types], ../test_helpers procSuite "WakuNode": + let rng = keys.newRng() asyncTest "Message published with content filter is retrievable": let - rng = keys.newRng() nodeKey = crypto.PrivateKey.random(Secp256k1, rng[])[] node = WakuNode.init(nodeKey, ValidIpAddress.init("0.0.0.0"), Port(60000)) + pubSubTopic = "chat" + contentTopic = "foobar" + contentFilter = ContentFilter(topics: @[contentTopic]) + message = WakuMessage(payload: "hello world".toBytes(), + contentTopic: contentTopic) + + # This could/should become a more fixed handler (at least default) that + # would be enforced on WakuNode level. + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + check: + topic == "chat" + node.filters.notify(msg[]) + + var completionFut = newFuture[bool]() + + # This would be the actual application handler + proc contentHandler(message: seq[byte]) {.gcsafe, closure.} = + let msg = string.fromBytes(message) + check: + msg == "hello world" + completionFut.complete(true) await node.start() + # Subscribe our node to the pubSubTopic where all chat data go onto. + node.subscribe(pubSubTopic, relayHandler) + # Subscribe a contentFilter to trigger a specific application handler when + # WakuMessages with that content are received + node.subscribe(contentFilter, contentHandler) + + node.publish(pubSubTopic, message) + + check: + (await completionFut.withTimeout(5.seconds)) == true + + await node.stop() + + asyncTest "Content filtered publishing over network": let - topic = "foo" + nodeKey1 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node1 = WakuNode.init(nodeKey1, ValidIpAddress.init("0.0.0.0"), + Port(60000)) + nodeKey2 = crypto.PrivateKey.random(Secp256k1, rng[])[] + node2 = WakuNode.init(nodeKey2, ValidIpAddress.init("0.0.0.0"), + Port(60002)) + pubSubTopic = "chat" contentTopic = "foobar" - wakuMessage = WakuMessage(payload: ("hello world").toBytes, + contentFilter = ContentFilter(topics: @[contentTopic]) + message = WakuMessage(payload: "hello world".toBytes(), contentTopic: contentTopic) - node.publish(topic, wakuMessage) - # TODO: Add actual subscribe part with receival of message + var completionFut = newFuture[bool]() + + # This could/should become a more fixed handler (at least default) that + # would be enforced on WakuNode level. + proc relayHandler(topic: string, data: seq[byte]) {.async, gcsafe.} = + let msg = WakuMessage.init(data) + if msg.isOk(): + check: + topic == "chat" + node1.filters.notify(msg[]) + + # This would be the actual application handler + proc contentHandler1(message: seq[byte]) {.gcsafe, closure.} = + let msg = string.fromBytes(message) + check: + msg == "hello world" + completionFut.complete(true) + + await allFutures([node1.start(), node2.start()]) + + # Subscribe our node to the pubSubTopic where all chat data go onto. + node1.subscribe(pubSubTopic, relayHandler) + # Subscribe a contentFilter to trigger a specific application handler when + # WakuMessages with that content are received + node1.subscribe(contentFilter, contentHandler1) + # Connect peers by dialing from node2 to node1 + let conn = await node2.switch.dial(node1.peerInfo, WakuRelayCodec) + node2.publish(pubSubTopic, message) + + check: + (await completionFut.withTimeout(5.seconds)) == true + await allFutures([node1.stop(), node2.stop()]) diff --git a/waku/node/v2/rpc/wakurpc.nim b/waku/node/v2/rpc/wakurpc.nim index 55b15e6be..dda912c7f 100644 --- a/waku/node/v2/rpc/wakurpc.nim +++ b/waku/node/v2/rpc/wakurpc.nim @@ -26,8 +26,8 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = # XXX Why is casting necessary here but not in Nim node API? let wakuRelay = cast[WakuRelay](node.switch.pubSub.get()) # XXX also future return type - var message = WakuMessage(payload: payload, contentTopic: "foo") - discard wakuRelay.publish(topic, message) + # TODO: Shouldn't we really be doing WakuNode publish here? + discard wakuRelay.publish(topic, payload) return true #if not result: # raise newException(ValueError, "Message could not be posted") @@ -40,6 +40,7 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = info "Hit subscribe handler", topic=topic, data=data + # TODO: Shouldn't we really be doing WakuNode subscribe here? discard wakuRelay.subscribe(topic, handler) return true #if not result: diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index 47013fe81..7b2ce54d6 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -3,6 +3,7 @@ ## TODO Move more common data types here import + std/tables, chronos, libp2p/[switch, peerinfo, multiaddress, crypto/crypto], libp2p/protobuf/minprotobuf @@ -13,6 +14,20 @@ type Topic* = string Message* = seq[byte] + # TODO: these filter structures can be simplified but like this for now to + # match Node API + # Also, should reuse in filter/wakufilter code, but cyclic imports right now. + ContentFilter* = object + topics*: seq[string] + + ContentFilterHandler* = proc(message: seq[byte]) {.gcsafe, closure.} + + Filter* = object + contentFilter*: ContentFilter + handler*: ContentFilterHandler + + Filters* = Table[string, Filter] + # NOTE based on Eth2Node in NBC eth2_network.nim WakuNode* = ref object of RootObj switch*: Switch @@ -20,6 +35,7 @@ type libp2pTransportLoops*: seq[Future[void]] # TODO Revist messages field indexing as well as if this should be Message or WakuMessage messages*: seq[(Topic, WakuMessage)] + filters*: Filters WakuMessage* = object payload*: seq[byte] @@ -41,3 +57,11 @@ proc encode*(message: WakuMessage): ProtoBuffer = result.write(1, message.payload) result.write(2, message.contentTopic) + +proc notify*(filters: Filters, msg: WakuMessage) = + for filter in filters.values: + # TODO: In case of no topics we should either trigger here for all messages, + # or we should not allow such filter to exist in the first place. + if filter.contentFilter.topics.len > 0: + if msg.contentTopic in filter.contentFilter.topics: + filter.handler(msg.payload) diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index e8ce7bf62..54920435e 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -1,5 +1,5 @@ import - std/[strutils, options], + std/[strutils, options, tables], chronos, confutils, json_rpc/rpcserver, metrics, stew/shims/net as stewNet, # TODO: Why do we need eth keys? eth/keys, @@ -24,8 +24,6 @@ type # TODO Get rid of this and use waku_types one Topic* = waku_types.Topic Message* = seq[byte] - ContentFilter* = object - contentTopic*: string HistoryQuery* = object topics*: seq[string] @@ -142,10 +140,11 @@ proc start*(node: WakuNode) {.async.} = ## XXX: this should be /ip4..., / stripped? info "Listening on", full = listenStr -# NOTE TopicHandler is defined in pubsub.nim, roughly: -#type TopicHandler* = proc(topic: string, data: seq[byte]) +proc stop*(node: WakuNode) {.async.} = + let wakuRelay = node.switch.pubSub.get() + await wakuRelay.stop() -type ContentFilterHandler* = proc(contentFilter: ContentFilter, message: Message) + await node.switch.stop() proc subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) = ## Subscribes to a PubSub topic. Triggers handler when receiving messages on @@ -159,14 +158,13 @@ proc subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) = discard wakuRelay.subscribe(topic, handler) proc subscribe*(w: WakuNode, contentFilter: ContentFilter, handler: ContentFilterHandler) = - echo "NYI" ## Subscribes to a ContentFilter. Triggers handler when receiving messages on ## this content filter. ContentFilter is a method that takes some content ## filter, specifically with `ContentTopic`, and a `Message`. The `Message` ## has to match the `ContentTopic`. - ## Status: Not yet implemented. - ## TODO Implement as wrapper around `waku_filter` and `subscribe` above. + # TODO: get some random id, or use the Filter directly as key + w.filters.add("some random id", Filter(contentFilter: contentFilter, handler: handler)) proc unsubscribe*(w: WakuNode, topic: Topic) = echo "NYI" @@ -198,8 +196,11 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) = # Commenting out as it is later expected to be Message type, not WakuMessage #node.messages.insert((topic, message)) + debug "publish", topic=topic, contentTopic=message.contentTopic + let data = message.encode().buffer + # XXX Consider awaiting here - discard wakuRelay.publish(topic, message) + discard wakuRelay.publish(topic, data) proc query*(w: WakuNode, query: HistoryQuery): HistoryResponse = ## Queries for historical messages. diff --git a/waku/protocol/v2/waku_relay.nim b/waku/protocol/v2/waku_relay.nim index f99b26c5d..1e858a7e7 100644 --- a/waku/protocol/v2/waku_relay.nim +++ b/waku/protocol/v2/waku_relay.nim @@ -22,7 +22,6 @@ const WakuRelayCodec* = "/vac/waku/relay/2.0.0-alpha2" type WakuRelay* = ref object of GossipSub gossipEnabled*: bool - filters*: Filters method init*(w: WakuRelay) = debug "init" @@ -37,7 +36,6 @@ method init*(w: WakuRelay) = # XXX: Handler hijack GossipSub here? w.handler = handler - w.filters = initTable[string, Filter]() w.codec = WakuRelayCodec method initPubSub*(w: WakuRelay) = @@ -62,59 +60,16 @@ method subscribe*(w: WakuRelay, else: await procCall FloodSub(w).subscribe(topic, handler) -method subscribeTopic*(w: WakuRelay, - topic: string, - subscribe: bool, - peerId: string) {.async, gcsafe.} = - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - info "Hit NOOP handler", topic - - # Currently we are using the libp2p topic here. - # This will need to be change to a Waku topic. - - debug "subscribeTopic", topic=topic, subscribe=subscribe, peerId=peerId - - if w.gossipEnabled: - await procCall GossipSub(w).subscribeTopic(topic, subscribe, peerId) - else: - await procCall FloodSub(w).subscribeTopic(topic, subscribe, peerId) - - # XXX: This should distingish light and etc node - # NOTE: Relay subscription - # TODO: If peer is light node - info "about to call subscribe" - await w.subscribe(topic, handler) - -method rpcHandler*(w: WakuRelay, - peer: PubSubPeer, - rpcMsgs: seq[RPCMsg]) {.async.} = - debug "rpcHandler" - - # XXX: Right place? - total_messages.inc() - - if w.gossipEnabled: - await procCall GossipSub(w).rpcHandler(peer, rpcMsgs) - else: - await procCall FloodSub(w).rpcHandler(peer, rpcMsgs) - # XXX: here - - for rpcs in rpcMsgs: - for msg in rpcs.messages: - w.filters.notify(msg) - method publish*(w: WakuRelay, topic: string, - message: WakuMessage + message: seq[byte] ): Future[int] {.async.} = - debug "publish", topic=topic, contentTopic=message.contentTopic - - let data = message.encode().buffer + debug "publish", topic=topic if w.gossipEnabled: - return await procCall GossipSub(w).publish(topic, data) + return await procCall GossipSub(w).publish(topic, message) else: - return await procCall FloodSub(w).publish(topic, data) + return await procCall FloodSub(w).publish(topic, message) method unsubscribe*(w: WakuRelay, topics: seq[TopicPair]) {.async.} =