mirror of https://github.com/waku-org/nwaku.git
Move waku test to test folder; reset with floodsub test
This commit is contained in:
parent
2dda8fe3c0
commit
2e9c2ed949
|
@ -8,8 +8,8 @@ import
|
||||||
libp2p/transports/[transport, tcptransport],
|
libp2p/transports/[transport, tcptransport],
|
||||||
libp2p/muxers/[muxer, mplex/mplex, mplex/types],
|
libp2p/muxers/[muxer, mplex/mplex, mplex/types],
|
||||||
libp2p/protocols/[identify, secure/secure],
|
libp2p/protocols/[identify, secure/secure],
|
||||||
libp2p/protocols/pubsub/[pubsub, gossipsub, floodsub],
|
libp2p/protocols/pubsub/[pubsub, gossipsub, floodsub]
|
||||||
waku_protocol
|
# waku_protocol
|
||||||
|
|
||||||
when libp2p_secure == "noise":
|
when libp2p_secure == "noise":
|
||||||
import libp2p/protocols/secure/noise
|
import libp2p/protocols/secure/noise
|
||||||
|
@ -42,8 +42,9 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
|
||||||
else:
|
else:
|
||||||
# Creating switch from generate node
|
# Creating switch from generate node
|
||||||
# XXX: Hacky test, hijacking WakuSub here
|
# XXX: Hacky test, hijacking WakuSub here
|
||||||
echo "Using WakuSub here"
|
#echo "Using WakuSub here"
|
||||||
PubSub newPubSub(WakuSub, peerInfo, triggerSelf)
|
PubSub newPubSub(FloodSub, peerInfo, triggerSelf)
|
||||||
|
#PubSub newPubSub(WakuSub, peerInfo, triggerSelf)
|
||||||
|
|
||||||
result = newSwitch(peerInfo,
|
result = newSwitch(peerInfo,
|
||||||
transports,
|
transports,
|
|
@ -0,0 +1,88 @@
|
||||||
|
#
|
||||||
|
# Waku
|
||||||
|
# (c) Copyright 2019
|
||||||
|
# Status Research & Development GmbH
|
||||||
|
#
|
||||||
|
# Licensed under either of
|
||||||
|
# Apache License, version 2.0, (LICENSE-APACHEv2)
|
||||||
|
# MIT license (LICENSE-MIT)
|
||||||
|
|
||||||
|
import unittest, sequtils, options, tables, sets
|
||||||
|
import chronos
|
||||||
|
import utils,
|
||||||
|
libp2p/[errors,
|
||||||
|
switch,
|
||||||
|
connection,
|
||||||
|
stream/bufferstream,
|
||||||
|
crypto/crypto,
|
||||||
|
#protocols/pubsub/pubsub,
|
||||||
|
protocols/pubsub/floodsub,
|
||||||
|
#protocols/pubsub/rpc/messages,
|
||||||
|
#protocols/pubsub/rpc/message
|
||||||
|
]
|
||||||
|
|
||||||
|
const
|
||||||
|
StreamTransportTrackerName = "stream.transport"
|
||||||
|
StreamServerTrackerName = "stream.server"
|
||||||
|
|
||||||
|
# TODO: Start with floodsub here, then move other logic here
|
||||||
|
|
||||||
|
proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
||||||
|
# turn things deterministic
|
||||||
|
# this is for testing purposes only
|
||||||
|
var ceil = 15
|
||||||
|
let fsub = cast[FloodSub](sender.pubSub.get())
|
||||||
|
while not fsub.floodsub.hasKey(key) or
|
||||||
|
not fsub.floodsub[key].contains(receiver.peerInfo.id):
|
||||||
|
await sleepAsync(100.millis)
|
||||||
|
dec ceil
|
||||||
|
doAssert(ceil > 0, "waitSub timeout!")
|
||||||
|
|
||||||
|
suite "FloodSub":
|
||||||
|
teardown:
|
||||||
|
let
|
||||||
|
trackers = [
|
||||||
|
# getTracker(ConnectionTrackerName),
|
||||||
|
getTracker(BufferStreamTrackerName),
|
||||||
|
getTracker(AsyncStreamWriterTrackerName),
|
||||||
|
getTracker(AsyncStreamReaderTrackerName),
|
||||||
|
getTracker(StreamTransportTrackerName),
|
||||||
|
getTracker(StreamServerTrackerName)
|
||||||
|
]
|
||||||
|
for tracker in trackers:
|
||||||
|
if not isNil(tracker):
|
||||||
|
check tracker.isLeaked() == false
|
||||||
|
|
||||||
|
test "FloodSub basic publish/subscribe A -> B":
|
||||||
|
proc runTests(): Future[bool] {.async.} =
|
||||||
|
var completionFut = newFuture[bool]()
|
||||||
|
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
|
check topic == "foobar"
|
||||||
|
completionFut.complete(true)
|
||||||
|
|
||||||
|
let
|
||||||
|
nodes = generateNodes(2)
|
||||||
|
nodesFut = await allFinished(
|
||||||
|
nodes[0].start(),
|
||||||
|
nodes[1].start()
|
||||||
|
)
|
||||||
|
|
||||||
|
await subscribeNodes(nodes)
|
||||||
|
|
||||||
|
await nodes[1].subscribe("foobar", handler)
|
||||||
|
await waitSub(nodes[0], nodes[1], "foobar")
|
||||||
|
|
||||||
|
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
|
||||||
|
|
||||||
|
result = await completionFut.wait(5.seconds)
|
||||||
|
|
||||||
|
await allFuturesThrowing(
|
||||||
|
nodes[0].stop(),
|
||||||
|
nodes[1].stop()
|
||||||
|
)
|
||||||
|
|
||||||
|
for fut in nodesFut:
|
||||||
|
let res = fut.read()
|
||||||
|
await allFuturesThrowing(res)
|
||||||
|
check:
|
||||||
|
waitFor(runTests()) == true
|
|
@ -51,10 +51,12 @@ task protocol2, "Build the experimental Waku protocol":
|
||||||
buildBinary "waku_protocol", "waku/protocol/v2/", "-d:chronicles_log_level=DEBUG"
|
buildBinary "waku_protocol", "waku/protocol/v2/", "-d:chronicles_log_level=DEBUG"
|
||||||
|
|
||||||
task wakutest2, "Build Experimental Waku tests":
|
task wakutest2, "Build Experimental Waku tests":
|
||||||
buildBinary "waku_test", "waku/protocol/v2/", "-d:chronicles_log_level=DEBUG"
|
test "v2/test_waku"
|
||||||
|
#buildBinary "waku_test", "waku/protocol/v2/", "-d:chronicles_log_level=DEBUG --lineTrace:on --threads:on"
|
||||||
|
|
||||||
task wakunode2, "Build Experimental Waku cli":
|
task wakunode2, "Build Experimental Waku cli":
|
||||||
buildBinary "wakunode", "waku/node/v2/", "-d:chronicles_log_level=TRACE"
|
buildBinary "wakunode", "waku/node/v2/", "-d:chronicles_log_level=TRACE"
|
||||||
|
|
||||||
task wakusim2, "Build Experimental Waku simulation tools":
|
task wakusim2, "Build Experimental Waku simulation tools":
|
||||||
buildBinary "quicksim", "waku/node/v2/", "-d:chronicles_log_level=INFO"
|
buildBinary "quicksim", "waku/node/v2/", "-d:chronicles_log_level=INFO"
|
||||||
|
|
||||||
|
|
|
@ -34,7 +34,12 @@ method init(w: WakuSub) =
|
||||||
##
|
##
|
||||||
|
|
||||||
echo "Incoming WakuSub connection"
|
echo "Incoming WakuSub connection"
|
||||||
|
assert(w of FloodSub)
|
||||||
|
echo "asserted w of floodsub"
|
||||||
|
assert(w of PubSub)
|
||||||
|
echo "asserted w of pubsub"
|
||||||
# Defer to parent object (I think)
|
# Defer to parent object (I think)
|
||||||
|
# This isn't hit, possibly cause double link here?
|
||||||
await w.handleConn(conn, proto)
|
await w.handleConn(conn, proto)
|
||||||
|
|
||||||
# XXX: Handler hijack FloodSub here?
|
# XXX: Handler hijack FloodSub here?
|
||||||
|
@ -44,6 +49,8 @@ method init(w: WakuSub) =
|
||||||
method initPubSub*(w: WakuSub) =
|
method initPubSub*(w: WakuSub) =
|
||||||
echo "initWakuSub"
|
echo "initWakuSub"
|
||||||
w.text = "Foobar"
|
w.text = "Foobar"
|
||||||
|
echo "w.text", w.text
|
||||||
|
echo "ok2"
|
||||||
w.init()
|
w.init()
|
||||||
|
|
||||||
# Here floodsub field is a topic to remote peer map
|
# Here floodsub field is a topic to remote peer map
|
||||||
|
|
|
@ -1,75 +1,101 @@
|
||||||
import unittest
|
import unittest
|
||||||
import sequtils, tables, options, sets, strutils
|
import sequtils, tables, options, sets, strutils
|
||||||
import chronos, chronicles
|
import chronos, chronicles
|
||||||
|
import libp2p/errors
|
||||||
|
|
||||||
import waku_protocol
|
import waku_protocol
|
||||||
|
import libp2p/protocols/pubsub/pubsub
|
||||||
|
import libp2p/protocols/pubsub/floodsub
|
||||||
import utils
|
import utils
|
||||||
import standard_setup
|
import standard_setup
|
||||||
|
|
||||||
# TODO: Move to test folder
|
# TODO: Move to test folder
|
||||||
|
|
||||||
# XXX: Testing in-line
|
|
||||||
proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
||||||
# turn things deterministic
|
# turn things deterministic
|
||||||
# this is for testing purposes only
|
# this is for testing purposes only
|
||||||
var ceil = 15
|
var ceil = 15
|
||||||
let wsub = cast[WakuSub](sender.pubSub.get())
|
let fsub = cast[FloodSub](sender.pubSub.get())
|
||||||
while not wsub.floodsub.hasKey(key) or
|
while not fsub.floodsub.hasKey(key) or
|
||||||
not wsub.floodsub[key].contains(receiver.peerInfo.id):
|
not fsub.floodsub[key].contains(receiver.peerInfo.id):
|
||||||
await sleepAsync(100.millis)
|
await sleepAsync(100.millis)
|
||||||
dec ceil
|
dec ceil
|
||||||
doAssert(ceil > 0, "waitSub timeout!")
|
doAssert(ceil > 0, "waitSub timeout!")
|
||||||
|
|
||||||
proc test(): Future[bool] {.async.} =
|
## XXX: Testing in-line
|
||||||
var completionFut = newFuture[bool]()
|
# proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
||||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
# # turn things deterministic
|
||||||
echo "HIT HANDLER", topic
|
# # this is for testing purposes only
|
||||||
check topic == "foobar"
|
# var ceil = 15
|
||||||
completionFut.complete(true)
|
# echo "xx1"
|
||||||
|
# let wsub = cast[FloodSub](sender.pubSub.get())
|
||||||
|
# echo "xx2"
|
||||||
|
# while not wsub.floodsub.hasKey(key) or
|
||||||
|
# not wsub.floodsub[key].contains(receiver.peerInfo.id):
|
||||||
|
# await sleepAsync(100.millis)
|
||||||
|
# dec ceil
|
||||||
|
# doAssert(ceil > 0, "waitSub timeout!")
|
||||||
|
# echo "xx3"
|
||||||
|
|
||||||
# XXX: Right, same issue as before! Switch interface is here
|
# proc test(): Future[bool] {.async.} =
|
||||||
# Though this is newStandardSwitch
|
# var completionFut = newFuture[bool]()
|
||||||
# HERE ATM: We need to take this and...do something
|
# proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||||
# PubSub newPubSub(FloodSub, peerInfo, triggerSelf)
|
# echo "HIT HANDLER", topic
|
||||||
# Goal: Init WakuSub that inherits from FloodSub and then print text field
|
# check topic == "foobar"
|
||||||
# Should be fine if we take standard switch code and create from here, I think
|
# completionFut.complete(true)
|
||||||
|
|
||||||
# How is this done elsewhere? Tests.
|
# # XXX: Right, same issue as before! Switch interface is here
|
||||||
|
# # Though this is newStandardSwitch
|
||||||
|
# # HERE ATM: We need to take this and...do something
|
||||||
|
# # PubSub newPubSub(FloodSub, peerInfo, triggerSelf)
|
||||||
|
# # Goal: Init WakuSub that inherits from FloodSub and then print text field
|
||||||
|
# # Should be fine if we take standard switch code and create from here, I think
|
||||||
|
|
||||||
let
|
# # How is this done elsewhere? Tests.
|
||||||
nodes = generateNodes(2)
|
|
||||||
nodesFut = await allFinished(
|
|
||||||
nodes[0].start(),
|
|
||||||
nodes[1].start()
|
|
||||||
)
|
|
||||||
|
|
||||||
# TODO: We never init this
|
# let
|
||||||
# Meaning?
|
# nodes = generateNodes(2)
|
||||||
# Where did I even get this from? waitSub
|
# nodesFut = await allFinished(
|
||||||
#let wsub = cast[WakuSub](nodes[0].pubSub.get())
|
# nodes[0].start(),
|
||||||
#echo "wsub test", $repr(wsub)
|
# nodes[1].start()
|
||||||
# illegal storage access
|
# )
|
||||||
#echo "wsub field test", wsub.text
|
|
||||||
|
|
||||||
await subscribeNodes(nodes)
|
# # TODO: We never init this
|
||||||
|
# # Meaning?
|
||||||
|
# # Where did I even get this from? waitSub
|
||||||
|
# #let wsub = cast[WakuSub](nodes[0].pubSub.get())
|
||||||
|
# #echo "wsub test", $repr(wsub)
|
||||||
|
# # illegal storage access
|
||||||
|
# #echo "wsub field test", wsub.text
|
||||||
|
|
||||||
await nodes[1].subscribe("foobar", handler)
|
# await subscribeNodes(nodes)
|
||||||
await waitSub(nodes[0], nodes[1], "foobar")
|
|
||||||
|
|
||||||
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
|
# await nodes[1].subscribe("foobar", handler)
|
||||||
|
# await waitSub(nodes[0], nodes[1], "foobar")
|
||||||
|
|
||||||
echo "ok"
|
# # Is this happening
|
||||||
result = await completionFut.wait(5.seconds)
|
# # shouldn't subs be >0 here btw
|
||||||
|
# await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
|
||||||
|
|
||||||
# await allFuturesThrowing(
|
# echo "ok"
|
||||||
# nodes[0].stop(),
|
# result = await completionFut.wait(5.seconds)
|
||||||
# nodes[1].stop()
|
|
||||||
# )
|
|
||||||
#
|
|
||||||
# for fut in nodesFut:
|
|
||||||
# let res = fut.read()
|
|
||||||
# await allFuturesThrowing(res)
|
|
||||||
|
|
||||||
echo "Starting"
|
# echo "ok3"
|
||||||
var res = waitFor test()
|
|
||||||
echo "Done with res: ", $res
|
# await allFuturesThrowing(
|
||||||
|
# nodes[0].stop(),
|
||||||
|
# nodes[1].stop()
|
||||||
|
# )
|
||||||
|
|
||||||
|
# for fut in nodesFut:
|
||||||
|
# let res = fut.read()
|
||||||
|
# await allFuturesThrowing(res)
|
||||||
|
|
||||||
|
# echo "Starting"
|
||||||
|
# var res = waitFor test()
|
||||||
|
# echo "wtf"
|
||||||
|
# echo "Done with res: ", $res
|
||||||
|
|
||||||
|
|
||||||
|
# # Bleh, need more energy to debug here, should be basic testfloodsub already works
|
||||||
|
# # SIGSERV error
|
||||||
|
|
Loading…
Reference in New Issue