diff --git a/tests/all_tests_v2.nim b/tests/all_tests_v2.nim index efaa1f09b..e24ff1ad5 100644 --- a/tests/all_tests_v2.nim +++ b/tests/all_tests_v2.nim @@ -1,3 +1,4 @@ import # Waku v2 tests - ./v2/test_waku \ No newline at end of file + ./v2/test_waku, + ./v2/test_wakunode diff --git a/tests/v1/test_helpers.nim b/tests/test_helpers.nim similarity index 89% rename from tests/v1/test_helpers.nim rename to tests/test_helpers.nim index 4f28733fe..6c910463b 100644 --- a/tests/v1/test_helpers.nim +++ b/tests/test_helpers.nim @@ -1,5 +1,5 @@ import - unittest, chronos, strutils, + unittest, chronos, eth/[keys, p2p] var nextPort = 30303 @@ -30,5 +30,3 @@ template procSuite*(name, body: untyped) = body suitePayload() - -template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0] diff --git a/tests/v1/test_rpc_waku.nim b/tests/v1/test_rpc_waku.nim index 32c68e5a7..753508782 100644 --- a/tests/v1/test_rpc_waku.nim +++ b/tests/v1/test_rpc_waku.nim @@ -1,11 +1,11 @@ import - unittest, options, os, stew/byteutils, + unittest, options, os, stew/byteutils, strutils, json_rpc/[rpcserver, rpcclient], eth/common as eth_common, eth/[rlp, keys, p2p], ../../waku/protocol/v1/waku_protocol, - ../../waku/node/v1/rpc/[hexstrings, rpc_types, waku, key_storage], - ./test_helpers + ../../waku/node/v1/rpc/[hexstrings, rpc_types, waku, key_storage] +template sourceDir*: string = currentSourcePath.rsplit(DirSep, 1)[0] ## Generate client convenience marshalling wrappers from forward declarations ## For testing, ethcallsigs needs to be kept in sync with ../waku/node/v1/rpc/waku const sigPath = sourceDir / ParDir / ParDir / "waku" / "node" / "v1" / "rpc" / "wakucallsigs.nim" diff --git a/tests/v1/test_waku_bridge.nim b/tests/v1/test_waku_bridge.nim index d6b71bc8c..125dd823f 100644 --- a/tests/v1/test_waku_bridge.nim +++ b/tests/v1/test_waku_bridge.nim @@ -12,7 +12,7 @@ import eth/p2p/rlpx_protocols/whisper_protocol as whisper, ../../waku/protocol/v1/waku_protocol as waku, ../../waku/protocol/v1/waku_bridge, - ./test_helpers + ../test_helpers let safeTTL = 5'u32 let waitInterval = waku.messageInterval + 150.milliseconds diff --git a/tests/v1/test_waku_connect.nim b/tests/v1/test_waku_connect.nim index 813e89326..22f89b76b 100644 --- a/tests/v1/test_waku_connect.nim +++ b/tests/v1/test_waku_connect.nim @@ -10,7 +10,7 @@ import sequtils, tables, unittest, chronos, eth/[keys, p2p], eth/p2p/peer_pool, ../../waku/protocol/v1/waku_protocol, - ./test_helpers + ../test_helpers const safeTTL = 5'u32 diff --git a/tests/v1/test_waku_mail.nim b/tests/v1/test_waku_mail.nim index 5e133bcfd..62cbed0b8 100644 --- a/tests/v1/test_waku_mail.nim +++ b/tests/v1/test_waku_mail.nim @@ -2,7 +2,7 @@ import unittest, chronos, tables, sequtils, times, eth/[p2p, async_utils], eth/p2p/peer_pool, ../../waku/protocol/v1/[waku_protocol, waku_mail], - ./test_helpers + ../test_helpers const transmissionTimeout = chronos.milliseconds(100) diff --git a/tests/v2/test_waku.nim b/tests/v2/test_waku.nim index 2db6a96e8..93faea230 100644 --- a/tests/v2/test_waku.nim +++ b/tests/v2/test_waku.nim @@ -18,6 +18,8 @@ import utils, libp2p/protocols/pubsub/floodsub import ../../waku/protocol/v2/waku_protocol2 +import ../test_helpers + const StreamTransportTrackerName = "stream.transport" StreamServerTrackerName = "stream.server" @@ -49,7 +51,7 @@ proc decodeMessage(data: seq[byte]): string = result = "" let res = pb.getField(1, result) -suite "FloodSub": +procSuite "FloodSub": teardown: let trackers = [ @@ -64,41 +66,41 @@ suite "FloodSub": if not isNil(tracker): check tracker.isLeaked() == false - test "FloodSub basic publish/subscribe A -> B": - proc runTests(): Future[bool] {.async.} = - var completionFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - debug "Hit handler", topic - let msg = decodeMessage(data) - check topic == "foobar" - check msg == "hello" - completionFut.complete(true) + asyncTest "FloodSub basic publish/subscribe A -> B": + var completionFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + debug "Hit handler", topic + let msg = decodeMessage(data) + check topic == "foobar" + check msg == "hello" + completionFut.complete(true) - let - nodes = generateNodes(2) - nodesFut = await allFinished( - nodes[0].start(), - nodes[1].start() - ) - - await subscribeNodes(nodes) - - await nodes[1].subscribe("foobar", handler) - await waitSub(nodes[0], nodes[1], "foobar") - - # TODO: you might want to check the value here - let msg = message() - discard await nodes[0].publish("foobar", msg) - - result = await completionFut.wait(5.seconds) - - await allFuturesThrowing( - nodes[0].stop(), - nodes[1].stop() + let + nodes = generateNodes(2) + nodesFut = await allFinished( + nodes[0].start(), + nodes[1].start() ) - for fut in nodesFut: - let res = fut.read() - await allFuturesThrowing(res) + await subscribeNodes(nodes) + + await nodes[1].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") + + # TODO: you might want to check the value here + let msg = message() + discard await nodes[0].publish("foobar", msg) + + let result = await completionFut.wait(5.seconds) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + for fut in nodesFut: + let res = fut.read() + await allFuturesThrowing(res) + check: - waitFor(runTests()) == true + result == true diff --git a/tests/v2/test_wakunode.nim b/tests/v2/test_wakunode.nim new file mode 100644 index 000000000..82a8db64f --- /dev/null +++ b/tests/v2/test_wakunode.nim @@ -0,0 +1,28 @@ +import unittest + +import confutils, chronicles, chronos, os + +import stew/shims/net as stewNet +import libp2p/crypto/crypto +import libp2p/crypto/secp +import eth/keys +import json_rpc/[rpcclient, rpcserver] + +import ../../waku/node/v2/[config, wakunode2] + +import ../test_helpers + +procSuite "WakuNode": + asyncTest "Message published with content filter is retrievable": + let conf = WakuNodeConf.load() + let node = await WakuNode.init(conf) + + let topic = "foobar" + + let message = cast[seq[byte]]("hello world") + node.publish(topic, ContentFilter(contentTopic: topic), message) + + let response = node.query(HistoryQuery(topics: @[topic])) + check: + response.messages.len == 1 + response.messages[0] == message diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 6f21ed93d..ca17b40b7 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -1,5 +1,5 @@ import - confutils, config, strutils, chronos, json_rpc/rpcserver, metrics, + confutils, config, strutils, chronos, json_rpc/rpcserver, metrics, sequtils, chronicles/topics_registry, # TODO: What? Need this for setLoglevel, weird. eth/[keys, p2p], eth/net/nat, eth/p2p/[discovery, enode], @@ -22,12 +22,24 @@ type PublicKey* = crypto.PublicKey PrivateKey* = crypto.PrivateKey + Topic* = string + Message* = seq[byte] + ContentFilter* = object + contentTopic*: string + + HistoryQuery* = object + topics*: seq[string] + + HistoryResponse* = object + messages*: seq[Message] + # NOTE: based on Eth2Node in NBC eth2_network.nim WakuNode* = ref object of RootObj switch*: Switch # XXX: Unclear if we need this peerInfo*: PeerInfo libp2pTransportLoops*: seq[Future[void]] + messages: seq[(Topic, Message)] const clientId = "Nimbus waku node" @@ -230,11 +242,6 @@ method init*(T: type WakuNode, conf: WakuNodeConf): Future[T] {.async.} = await node.start(conf) return node -type Topic* = string -type Message* = seq[byte] -type ContentFilter* = object - contentTopic*: string - # TODO Update TopicHandler to take Message, not seq[byte] data #type TopicHandler* = proc(topic: Topic, message: Message) # Currently this is using the one in pubsub.nim, roughly: @@ -242,12 +249,6 @@ type ContentFilter* = object type ContentFilterHandler* = proc(contentFilter: ContentFilter, message: Message) -type HistoryQuery = object - xxx*: seq[byte] - -type HistoryResponse = object - xxx*: seq[byte] - method subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) = ## Subscribes to a PubSub topic. Triggers handler when receiving messages on ## this topic. TopicHandler is a method that takes a topic and a `Message`. @@ -295,7 +296,6 @@ method publish*(w: WakuNode, topic: Topic, message: Message) = discard wakuSub.publish(topic, message) method publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message: Message) = - echo "NYI" ## Publish a `Message` to a PubSub topic with a specific content filter. ## Currently this means a `contentTopic`. ## @@ -303,13 +303,28 @@ method publish*(w: WakuNode, topic: Topic, contentFilter: ContentFilter, message ## TODO Implement as wrapper around `waku_protocol` and `publish`, and ensure ## Message is passed, not `data` field. Also ensure content filter is in ## Message. + ## + + w.messages.insert((contentFilter.contentTopic, message)) + + let wakuSub = w.switch.pubSub.get() + # XXX Consider awaiting here + + # @TODO MAKE SURE WE PASS CONTENT FILTER + discard wakuSub.publish(topic, message) method query*(w: WakuNode, query: HistoryQuery): HistoryResponse = - echo "NYI" ## Queries for historical messages. ## ## Status: Not yet implemented. ## TODO Implement as wrapper around `waku_protocol` and send `RPCMsg`. + result.messages = newSeq[Message]() + + for msg in w.messages: + if msg[0] notin query.topics: + continue + + result.messages.insert(msg[1]) when isMainModule: let conf = WakuNodeConf.load()