mirror of
https://github.com/waku-org/nwaku.git
synced 2025-01-13 16:25:00 +00:00
Move waku test out; basic protocol handler
This commit is contained in:
parent
824bef65ad
commit
edd0074ff7
5
Makefile
5
Makefile
@ -80,6 +80,11 @@ wakusim2: | build deps wakunode2
|
||||
protocol2:
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim protocol2 $(NIM_PARAMS) waku.nims
|
||||
|
||||
wakutest2:
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim wakutest2 $(NIM_PARAMS) waku.nims
|
||||
|
||||
# symlink
|
||||
waku.nims:
|
||||
ln -s waku.nimble $@
|
||||
|
@ -50,6 +50,9 @@ task wakusim, "Build Waku simulation tools":
|
||||
task protocol2, "Build the experimental Waku protocol":
|
||||
buildBinary "waku_protocol", "waku/protocol/v2/", "-d:chronicles_log_level=DEBUG"
|
||||
|
||||
task wakutest2, "Build Experimental Waku tests":
|
||||
buildBinary "waku_test", "waku/protocol/v2/", "-d:chronicles_log_level=DEBUG"
|
||||
|
||||
task wakunode2, "Build Experimental Waku cli":
|
||||
buildBinary "wakunode", "waku/node/v2/", "-d:chronicles_log_level=TRACE"
|
||||
|
||||
|
@ -8,7 +8,8 @@ import
|
||||
libp2p/transports/[transport, tcptransport],
|
||||
libp2p/muxers/[muxer, mplex/mplex, mplex/types],
|
||||
libp2p/protocols/[identify, secure/secure],
|
||||
libp2p/protocols/pubsub/[pubsub, gossipsub, floodsub]
|
||||
libp2p/protocols/pubsub/[pubsub, gossipsub, floodsub],
|
||||
waku_protocol
|
||||
|
||||
when libp2p_secure == "noise":
|
||||
import libp2p/protocols/secure/noise
|
||||
@ -40,7 +41,8 @@ proc newStandardSwitch*(privKey = none(PrivateKey),
|
||||
PubSub newPubSub(GossipSub, peerInfo, triggerSelf)
|
||||
else:
|
||||
# Creating switch from generate node
|
||||
PubSub newPubSub(FloodSub, peerInfo, triggerSelf)
|
||||
# XXX: Hacky test, hijacking WakuSub here
|
||||
PubSub newPubSub(WakuSub, peerInfo, triggerSelf)
|
||||
|
||||
result = newSwitch(peerInfo,
|
||||
transports,
|
||||
|
@ -6,18 +6,16 @@
|
||||
import unittest
|
||||
import sequtils, tables, options, sets, strutils
|
||||
import chronos, chronicles
|
||||
import ../../vendor/nim-libp2p/libp2p/protocols/pubsub/pubsub,
|
||||
../../vendor/nim-libp2p/libp2p/protocols/pubsub/pubsubpeer,
|
||||
../../vendor/nim-libp2p/libp2p/protocols/pubsub/floodsub
|
||||
import ../../vendor/nim-libp2p/tests/pubsub/utils
|
||||
|
||||
# XXX: Hacky in-line test for now
|
||||
import libp2p/protocols/pubsub/pubsub,
|
||||
libp2p/protocols/pubsub/pubsubpeer,
|
||||
libp2p/protocols/pubsub/floodsub,
|
||||
libp2p/connection
|
||||
|
||||
logScope:
|
||||
topic = "WakuSub"
|
||||
|
||||
# For spike
|
||||
const WakuSubCodec* = "/WakuSub/0.0.1"
|
||||
#For spike
|
||||
const WakuSubCodec* = "/wakusub/0.0.1"
|
||||
|
||||
#const wakuVersionStr = "2.0.0-alpha1"
|
||||
|
||||
@ -28,6 +26,30 @@ type
|
||||
# XXX: just playing
|
||||
text*: string
|
||||
|
||||
method init(w: WakuSub) =
|
||||
proc handler(conn: Connection, proto: string) {.async.} =
|
||||
## main protocol handler that gets triggered on every
|
||||
## connection for a protocol string
|
||||
## e.g. ``/wakusub/0.0.1``, etc...
|
||||
##
|
||||
|
||||
echo "Incoming WakuSub connection"
|
||||
# Defer to parent object (I think)
|
||||
await w.handleConn(conn, proto)
|
||||
|
||||
# XXX: Handler hijack FloodSub here?
|
||||
w.handler = handler
|
||||
w.codec = WakuSubCodec
|
||||
|
||||
# Never hit, it seems like BUT initPubSub is hit
|
||||
method initWakuSub*(w: WakuSub) =
|
||||
echo "initWakuSub"
|
||||
w.text = "Foobar"
|
||||
w.init()
|
||||
|
||||
# Here floodsub field is a topic to remote peer map
|
||||
# We also have a seen message forwarded to peers
|
||||
|
||||
# method subscribeTopic
|
||||
# method handleDisconnect
|
||||
# method rpcHandler
|
||||
@ -44,52 +66,3 @@ type
|
||||
# Can also do in-line here
|
||||
|
||||
|
||||
# XXX: Testing in-line
|
||||
proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
||||
# turn things deterministic
|
||||
# this is for testing purposes only
|
||||
var ceil = 15
|
||||
# TODO: Cast WakuSub
|
||||
let fsub = cast[FloodSub](sender.pubSub.get())
|
||||
while not fsub.floodsub.hasKey(key) or
|
||||
not fsub.floodsub[key].contains(receiver.peerInfo.id):
|
||||
await sleepAsync(100.millis)
|
||||
dec ceil
|
||||
doAssert(ceil > 0, "waitSub timeout!")
|
||||
|
||||
proc test(): Future[bool] {.async.} =
|
||||
var completionFut = newFuture[bool]()
|
||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
echo "HIT HANDLER", topic
|
||||
check topic == "foobar"
|
||||
completionFut.complete(true)
|
||||
|
||||
let
|
||||
nodes = generateNodes(2)
|
||||
nodesFut = await allFinished(
|
||||
nodes[0].start(),
|
||||
nodes[1].start()
|
||||
)
|
||||
|
||||
await subscribeNodes(nodes)
|
||||
|
||||
await nodes[1].subscribe("foobar", handler)
|
||||
await waitSub(nodes[0], nodes[1], "foobar")
|
||||
|
||||
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
|
||||
|
||||
echo "ok"
|
||||
result = await completionFut.wait(5.seconds)
|
||||
|
||||
# await allFuturesThrowing(
|
||||
# nodes[0].stop(),
|
||||
# nodes[1].stop()
|
||||
# )
|
||||
#
|
||||
# for fut in nodesFut:
|
||||
# let res = fut.read()
|
||||
# await allFuturesThrowing(res)
|
||||
|
||||
echo "Starting"
|
||||
var res = waitFor test()
|
||||
echo "Done with res: ", $res
|
||||
|
75
waku/protocol/v2/waku_test.nim
Normal file
75
waku/protocol/v2/waku_test.nim
Normal file
@ -0,0 +1,75 @@
|
||||
import unittest
|
||||
import sequtils, tables, options, sets, strutils
|
||||
import chronos, chronicles
|
||||
|
||||
import waku_protocol
|
||||
import utils
|
||||
import standard_setup
|
||||
|
||||
# TODO: Move to test folder
|
||||
|
||||
# XXX: Testing in-line
|
||||
proc waitSub(sender, receiver: auto; key: string) {.async, gcsafe.} =
|
||||
# turn things deterministic
|
||||
# this is for testing purposes only
|
||||
var ceil = 15
|
||||
let wsub = cast[WakuSub](sender.pubSub.get())
|
||||
while not wsub.floodsub.hasKey(key) or
|
||||
not wsub.floodsub[key].contains(receiver.peerInfo.id):
|
||||
await sleepAsync(100.millis)
|
||||
dec ceil
|
||||
doAssert(ceil > 0, "waitSub timeout!")
|
||||
|
||||
proc test(): Future[bool] {.async.} =
|
||||
var completionFut = newFuture[bool]()
|
||||
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
|
||||
echo "HIT HANDLER", topic
|
||||
check topic == "foobar"
|
||||
completionFut.complete(true)
|
||||
|
||||
# XXX: Right, same issue as before! Switch interface is here
|
||||
# Though this is newStandardSwitch
|
||||
# HERE ATM: We need to take this and...do something
|
||||
# PubSub newPubSub(FloodSub, peerInfo, triggerSelf)
|
||||
# Goal: Init WakuSub that inherits from FloodSub and then print text field
|
||||
# Should be fine if we take standard switch code and create from here, I think
|
||||
|
||||
# How is this done elsewhere? Tests.
|
||||
|
||||
let
|
||||
nodes = generateNodes(2)
|
||||
nodesFut = await allFinished(
|
||||
nodes[0].start(),
|
||||
nodes[1].start()
|
||||
)
|
||||
|
||||
# TODO: We never init this
|
||||
# Meaning?
|
||||
# Where did I even get this from? waitSub
|
||||
#let wsub = cast[WakuSub](nodes[0].pubSub.get())
|
||||
#echo "wsub test", $repr(wsub)
|
||||
# illegal storage access
|
||||
#echo "wsub field test", wsub.text
|
||||
|
||||
await subscribeNodes(nodes)
|
||||
|
||||
await nodes[1].subscribe("foobar", handler)
|
||||
await waitSub(nodes[0], nodes[1], "foobar")
|
||||
|
||||
await nodes[0].publish("foobar", cast[seq[byte]]("Hello!"))
|
||||
|
||||
echo "ok"
|
||||
result = await completionFut.wait(5.seconds)
|
||||
|
||||
# await allFuturesThrowing(
|
||||
# nodes[0].stop(),
|
||||
# nodes[1].stop()
|
||||
# )
|
||||
#
|
||||
# for fut in nodesFut:
|
||||
# let res = fut.read()
|
||||
# await allFuturesThrowing(res)
|
||||
|
||||
echo "Starting"
|
||||
var res = waitFor test()
|
||||
echo "Done with res: ", $res
|
Loading…
x
Reference in New Issue
Block a user