diff --git a/waku/protocol/v2/standard_setup.nim b/tests/v2/standard_setup.nim similarity index 87% rename from waku/protocol/v2/standard_setup.nim rename to tests/v2/standard_setup.nim index 85d003e5b..5274db802 100644 --- a/waku/protocol/v2/standard_setup.nim +++ b/tests/v2/standard_setup.nim @@ -8,8 +8,8 @@ import libp2p/transports/[transport, tcptransport], libp2p/muxers/[muxer, mplex/mplex, mplex/types], libp2p/protocols/[identify, secure/secure], - libp2p/protocols/pubsub/[pubsub, gossipsub, floodsub], - waku_protocol + libp2p/protocols/pubsub/[pubsub, gossipsub, floodsub] +# waku_protocol when libp2p_secure == "noise": import libp2p/protocols/secure/noise @@ -42,8 +42,9 @@ proc newStandardSwitch*(privKey = none(PrivateKey), else: # Creating switch from generate node # XXX: Hacky test, hijacking WakuSub here - echo "Using WakuSub here" - PubSub newPubSub(WakuSub, peerInfo, triggerSelf) + #echo "Using WakuSub here" + PubSub newPubSub(FloodSub, peerInfo, triggerSelf) + #PubSub newPubSub(WakuSub, peerInfo, triggerSelf) result = newSwitch(peerInfo, transports, diff --git a/tests/v2/test_waku.nim b/tests/v2/test_waku.nim new file mode 100644 index 000000000..7cc680fe6 --- /dev/null +++ b/tests/v2/test_waku.nim @@ -0,0 +1,88 @@ +# +# Waku +# (c) Copyright 2019 +# Status Research & Development GmbH +# +# Licensed under either of +# Apache License, version 2.0, (LICENSE-APACHEv2) +# MIT license (LICENSE-MIT) + +import unittest, sequtils, options, tables, sets +import chronos +import utils, + libp2p/[errors, + switch, + connection, + stream/bufferstream, + crypto/crypto, + #protocols/pubsub/pubsub, + protocols/pubsub/floodsub, + #protocols/pubsub/rpc/messages, + #protocols/pubsub/rpc/message + ] + +const + StreamTransportTrackerName = "stream.transport" + StreamServerTrackerName = "stream.server" + +# TODO: Start with floodsub here, then move other logic here + +proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = + # turn things deterministic + # this is for testing purposes only + var ceil = 15 + let fsub = cast[FloodSub](sender.pubSub.get()) + while not fsub.floodsub.hasKey(key) or + not fsub.floodsub[key].contains(receiver.peerInfo.id): + await sleepAsync(100.millis) + dec ceil + doAssert(ceil > 0, "waitSub timeout!") + +suite "FloodSub": + teardown: + let + trackers = [ + # getTracker(ConnectionTrackerName), + getTracker(BufferStreamTrackerName), + getTracker(AsyncStreamWriterTrackerName), + getTracker(AsyncStreamReaderTrackerName), + getTracker(StreamTransportTrackerName), + getTracker(StreamServerTrackerName) + ] + for tracker in trackers: + if not isNil(tracker): + check tracker.isLeaked() == false + + test "FloodSub basic publish/subscribe A -> B": + proc runTests(): Future[bool] {.async.} = + var completionFut = newFuture[bool]() + proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = + check topic == "foobar" + completionFut.complete(true) + + let + nodes = generateNodes(2) + nodesFut = await allFinished( + nodes[0].start(), + nodes[1].start() + ) + + await subscribeNodes(nodes) + + await nodes[1].subscribe("foobar", handler) + await waitSub(nodes[0], nodes[1], "foobar") + + await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) + + result = await completionFut.wait(5.seconds) + + await allFuturesThrowing( + nodes[0].stop(), + nodes[1].stop() + ) + + for fut in nodesFut: + let res = fut.read() + await allFuturesThrowing(res) + check: + waitFor(runTests()) == true diff --git a/waku/protocol/v2/utils.nim b/tests/v2/utils.nim similarity index 100% rename from waku/protocol/v2/utils.nim rename to tests/v2/utils.nim diff --git a/waku.nimble b/waku.nimble index da54027bb..fe3df57b3 100644 --- a/waku.nimble +++ b/waku.nimble @@ -51,10 +51,12 @@ task protocol2, "Build the experimental Waku protocol": buildBinary "waku_protocol", "waku/protocol/v2/", "-d:chronicles_log_level=DEBUG" task wakutest2, "Build Experimental Waku tests": - buildBinary "waku_test", "waku/protocol/v2/", "-d:chronicles_log_level=DEBUG" + test "v2/test_waku" + #buildBinary "waku_test", "waku/protocol/v2/", "-d:chronicles_log_level=DEBUG --lineTrace:on --threads:on" task wakunode2, "Build Experimental Waku cli": buildBinary "wakunode", "waku/node/v2/", "-d:chronicles_log_level=TRACE" task wakusim2, "Build Experimental Waku simulation tools": buildBinary "quicksim", "waku/node/v2/", "-d:chronicles_log_level=INFO" + diff --git a/waku/protocol/v2/waku_protocol.nim b/waku/protocol/v2/waku_protocol.nim index bf711fd16..37bdadaf4 100644 --- a/waku/protocol/v2/waku_protocol.nim +++ b/waku/protocol/v2/waku_protocol.nim @@ -34,7 +34,12 @@ method init(w: WakuSub) = ## echo "Incoming WakuSub connection" + assert(w of FloodSub) + echo "asserted w of floodsub" + assert(w of PubSub) + echo "asserted w of pubsub" # Defer to parent object (I think) + # This isn't hit, possibly cause double link here? await w.handleConn(conn, proto) # XXX: Handler hijack FloodSub here? @@ -44,6 +49,8 @@ method init(w: WakuSub) = method initPubSub*(w: WakuSub) = echo "initWakuSub" w.text = "Foobar" + echo "w.text", w.text + echo "ok2" w.init() # Here floodsub field is a topic to remote peer map diff --git a/waku/protocol/v2/waku_test.nim b/waku/protocol/v2/waku_test.nim index 8ce803a6a..2d0929983 100644 --- a/waku/protocol/v2/waku_test.nim +++ b/waku/protocol/v2/waku_test.nim @@ -1,75 +1,101 @@ import unittest import sequtils, tables, options, sets, strutils import chronos, chronicles +import libp2p/errors import waku_protocol +import libp2p/protocols/pubsub/pubsub +import libp2p/protocols/pubsub/floodsub import utils import standard_setup # TODO: Move to test folder -# XXX: Testing in-line proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = # turn things deterministic # this is for testing purposes only var ceil = 15 - let wsub = cast[WakuSub](sender.pubSub.get()) - while not wsub.floodsub.hasKey(key) or - not wsub.floodsub[key].contains(receiver.peerInfo.id): + let fsub = cast[FloodSub](sender.pubSub.get()) + while not fsub.floodsub.hasKey(key) or + not fsub.floodsub[key].contains(receiver.peerInfo.id): await sleepAsync(100.millis) dec ceil doAssert(ceil > 0, "waitSub timeout!") -proc test(): Future[bool] {.async.} = - var completionFut = newFuture[bool]() - proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = - echo "HIT HANDLER", topic - check topic == "foobar" - completionFut.complete(true) +## XXX: Testing in-line +# proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} = +# # turn things deterministic +# # this is for testing purposes only +# var ceil = 15 +# echo "xx1" +# let wsub = cast[FloodSub](sender.pubSub.get()) +# echo "xx2" +# while not wsub.floodsub.hasKey(key) or +# not wsub.floodsub[key].contains(receiver.peerInfo.id): +# await sleepAsync(100.millis) +# dec ceil +# doAssert(ceil > 0, "waitSub timeout!") +# echo "xx3" - # XXX: Right, same issue as before! Switch interface is here - # Though this is newStandardSwitch - # HERE ATM: We need to take this and...do something - # PubSub newPubSub(FloodSub, peerInfo, triggerSelf) - # Goal: Init WakuSub that inherits from FloodSub and then print text field - # Should be fine if we take standard switch code and create from here, I think +# proc test(): Future[bool] {.async.} = +# var completionFut = newFuture[bool]() +# proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} = +# echo "HIT HANDLER", topic +# check topic == "foobar" +# completionFut.complete(true) - # How is this done elsewhere? Tests. +# # XXX: Right, same issue as before! Switch interface is here +# # Though this is newStandardSwitch +# # HERE ATM: We need to take this and...do something +# # PubSub newPubSub(FloodSub, peerInfo, triggerSelf) +# # Goal: Init WakuSub that inherits from FloodSub and then print text field +# # Should be fine if we take standard switch code and create from here, I think - let - nodes = generateNodes(2) - nodesFut = await allFinished( - nodes[0].start(), - nodes[1].start() - ) +# # How is this done elsewhere? Tests. - # TODO: We never init this - # Meaning? - # Where did I even get this from? waitSub - #let wsub = cast[WakuSub](nodes[0].pubSub.get()) - #echo "wsub test", $repr(wsub) - # illegal storage access - #echo "wsub field test", wsub.text +# let +# nodes = generateNodes(2) +# nodesFut = await allFinished( +# nodes[0].start(), +# nodes[1].start() +# ) - await subscribeNodes(nodes) +# # TODO: We never init this +# # Meaning? +# # Where did I even get this from? waitSub +# #let wsub = cast[WakuSub](nodes[0].pubSub.get()) +# #echo "wsub test", $repr(wsub) +# # illegal storage access +# #echo "wsub field test", wsub.text - await nodes[1].subscribe("foobar", handler) - await waitSub(nodes[0], nodes[1], "foobar") +# await subscribeNodes(nodes) - await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) +# await nodes[1].subscribe("foobar", handler) +# await waitSub(nodes[0], nodes[1], "foobar") - echo "ok" - result = await completionFut.wait(5.seconds) +# # Is this happening +# # shouldn't subs be >0 here btw +# await nodes[0].publish("foobar", cast[seq[byte]]("Hello!")) -# await allFuturesThrowing( -# nodes[0].stop(), -# nodes[1].stop() -# ) -# -# for fut in nodesFut: -# let res = fut.read() -# await allFuturesThrowing(res) +# echo "ok" +# result = await completionFut.wait(5.seconds) -echo "Starting" -var res = waitFor test() -echo "Done with res: ", $res +# echo "ok3" + +# await allFuturesThrowing( +# nodes[0].stop(), +# nodes[1].stop() +# ) + +# for fut in nodesFut: +# let res = fut.read() +# await allFuturesThrowing(res) + +# echo "Starting" +# var res = waitFor test() +# echo "wtf" +# echo "Done with res: ", $res + + +# # Bleh, need more energy to debug here, should be basic testfloodsub already works +# # SIGSERV error