Bump submodules (#157)

* Upgrade all submodules

* Use stock standard_setup and remove our version

Switch no longer relies on Pubsub argument

* Fix peerId

* Add reference to WakuRelay in WakuNode

* Use WakuRelay ref directly instead of via switch

* Tweak standard switch sig

* Fix start_network peerid

* Import nim-libp2p utils test

* Use WakuRelay in test utils

* Fix utils imports

* Tweak

* Fix trigger self test

* Disable broken filter test

* Fix and amend logscope topics

* Make subscribe calls async to use await

* Add debug stuff to nimble file

* Await for subscribe content

* Sleeping in tests

* Local checkout fixes

* XXX: Try to use .PubSub on WakuRelay

* Revert "XXX: Try to use .PubSub on WakuRelay"

This reverts commit 3a3139e4cfbb5ae9500fd30b2e79c676ccc4a53b.

* Only using gossip seems to work

Subscribe for floodsub broken

* Fix await in examples

* Get rid of double publish, still need sleep
This commit is contained in:
Oskar Thorén 2020-09-16 12:23:10 +08:00 committed by GitHub
parent 4906149b27
commit 72b6e7debc
30 changed files with 165 additions and 140 deletions

View File

@ -32,7 +32,7 @@ proc runBackground() {.async.} =
let message = WakuMessage.init(data).value
let payload = cast[string](message.payload)
info "Hit subscribe handler", topic=topic, payload=payload, contentTopic=message.contentTopic
node.subscribe(topic, handler)
await node.subscribe(topic, handler)
# Publish to a topic
let payload = cast[seq[byte]]("hello world")

View File

@ -3,5 +3,6 @@ import
# TODO: enable this when it is altered into a proper waku relay test
# ./v2/test_waku,
./v2/test_wakunode,
./v2/test_waku_store,
./v2/test_waku_filter
./v2/test_waku_store
# NOTE: Disabling broken filter protocol, we don't rely on it for Nangang
# ./v2/test_waku_filter

View File

@ -45,10 +45,10 @@ procSuite "WakuNode":
await node.start()
# Subscribe our node to the pubSubTopic where all chat data go onto.
node.subscribe(pubSubTopic, relayHandler)
await node.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
node.subscribe(contentFilter, contentHandler)
await node.subscribe(contentFilter, contentHandler)
node.publish(pubSubTopic, message)
@ -92,12 +92,18 @@ procSuite "WakuNode":
await allFutures([node1.start(), node2.start()])
# Subscribe our node to the pubSubTopic where all chat data go onto.
node1.subscribe(pubSubTopic, relayHandler)
await node1.subscribe(pubSubTopic, relayHandler)
# Subscribe a contentFilter to trigger a specific application handler when
# WakuMessages with that content are received
node1.subscribe(contentFilter, contentHandler1)
await node1.subscribe(contentFilter, contentHandler1)
# Connect peers by dialing from node2 to node1
let conn = await node2.switch.dial(node1.peerInfo, WakuRelayCodec)
#
# We need to sleep to allow the subscription to go through
info "Going to sleep to allow subscribe to go through"
await sleepAsync(2000.millis)
info "Waking up and publishing"
node2.publish(pubSubTopic, message)
check:

View File

@ -1,16 +1,76 @@
# compile time options here
const
libp2p_pubsub_sign {.booldefine.} = true
libp2p_pubsub_verify {.booldefine.} = true
import random
import chronos
import ../../waku/node/v2/standard_setup
import libp2p/[standard_setup,
protocols/pubsub/pubsub,
protocols/pubsub/floodsub,
protocols/pubsub/gossipsub,
protocols/secure/secure]
import ../../waku/protocol/v2/waku_relay,
../../waku/node/v2/waku_types
export standard_setup
proc generateNodes*(num: Natural, gossip: bool = false): seq[Switch] =
for i in 0..<num:
result.add(newStandardSwitch(gossip = gossip))
randomize()
proc subscribeNodes*(nodes: seq[Switch]) {.async.} =
var dials: seq[Future[void]]
proc generateNodes*(
num: Natural,
secureManagers: openarray[SecureProtocol] = [
# array cos order matters
SecureProtocol.Secio,
SecureProtocol.Noise,
],
msgIdProvider: MsgIdProvider = nil,
gossip: bool = false,
triggerSelf: bool = false,
verifySignature: bool = libp2p_pubsub_verify,
sign: bool = libp2p_pubsub_sign): seq[PubSub] =
for i in 0..<num:
let switch = newStandardSwitch(secureManagers = secureManagers)
let wakuRelay = WakuRelay.init(
switch = switch,
triggerSelf = triggerSelf,
verifySignature = verifySignature,
sign = sign,
# XXX unclear why including this causes a compiler error, it is part of WakuRelay type
#gossipEnabled = gossip,
msgIdProvider = msgIdProvider).PubSub
switch.mount(wakuRelay)
result.add(wakuRelay)
proc subscribeNodes*(nodes: seq[PubSub]) {.async.} =
for dialer in nodes:
for node in nodes:
if dialer.peerInfo.peerId != node.peerInfo.peerId:
dials.add(dialer.connect(node.peerInfo))
# TODO: Hmm, does this make sense?
await allFutures(dials)
if dialer.switch.peerInfo.peerId != node.switch.peerInfo.peerId:
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
dialer.subscribePeer(node.peerInfo.peerId)
proc subscribeSparseNodes*(nodes: seq[PubSub], degree: int = 2) {.async.} =
if nodes.len < degree:
raise (ref CatchableError)(msg: "nodes count needs to be greater or equal to degree!")
for i, dialer in nodes:
if (i mod degree) != 0:
continue
for node in nodes:
if dialer.switch.peerInfo.peerId != node.peerInfo.peerId:
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
dialer.subscribePeer(node.peerInfo.peerId)
proc subscribeRandom*(nodes: seq[PubSub]) {.async.} =
for dialer in nodes:
var dialed: seq[PeerID]
while dialed.len < nodes.len - 1:
let node = sample(nodes)
if node.peerInfo.peerId notin dialed:
if dialer.peerInfo.peerId != node.peerInfo.peerId:
await dialer.switch.connect(node.peerInfo.peerId, node.peerInfo.addrs)
dialer.subscribePeer(node.peerInfo.peerId)
dialed.add(node.peerInfo.peerId)

2
vendor/news vendored

@ -1 +1 @@
Subproject commit a3a6e3ae5ff16126942f4febe746ca4da978072b
Subproject commit e1d63564a2a411f264e75694b8f7c66e50c3a4cb

2
vendor/nim-bearssl vendored

@ -1 +1 @@
Subproject commit 33b2303fc3b64359970b77bb09274c3e012ff37f
Subproject commit ba5f4687987817902c2727e30b35cb5ad1e61203

@ -1 +1 @@
Subproject commit 3e42e9a33e3e490d9f18d8e33219f67f64f4d8a7
Subproject commit b60f70718f8039c5c86dfc2a4680d8c1e37cbce2

2
vendor/nim-chronos vendored

@ -1 +1 @@
Subproject commit 3968f09ae1cd3e8c4e73729cbccf2587ff669203
Subproject commit 21349807444b86f9ce7e2b533a3ecc690f3edc3d

2
vendor/nim-eth vendored

@ -1 +1 @@
Subproject commit ac5155394f25c2049c847e328dcfc67a01547a52
Subproject commit ea0dbb256e7f911a1ea39d8f53597c947923acae

@ -1 +1 @@
Subproject commit 5df69fc6961e58205189cd92ae2477769fa8c4c0
Subproject commit 87309f3120d4e627082171a188324d3ee14d8986

2
vendor/nim-json-rpc vendored

@ -1 +1 @@
Subproject commit 244254632b15c745b6e15537aafd401f360d9928
Subproject commit 62349c39c62986396a25dad300894899bef5f79f

@ -1 +1 @@
Subproject commit 1cf51931f1037a2c44fa0912386273c01a0e0e42
Subproject commit 1dccd4b2ef14c5e3ce30ad3f3a0962e0b98da6a3

@ -1 +1 @@
Subproject commit dc2c199d41dc90de75043d1ee4efe5e0323932bf
Subproject commit c21107e34c9f5d2c9fd5d313a6a5b554ddafa141

2
vendor/nim-libp2p vendored

@ -1 +1 @@
Subproject commit 38eb36efaee09551e0cd6c1d4530c9abfe9cb322
Subproject commit 96d4c44fec3b16a65d5ed943b85e474bfc3e1de8

2
vendor/nim-metrics vendored

@ -1 +1 @@
Subproject commit f91deb74228ecb14fb82575e4d0f387ad9732b8a
Subproject commit 77305f3b3ddfef5aff65b685effc7e7a1d1cf59c

@ -1 +1 @@
Subproject commit 2403c33929c74f2d150f50dc8bc3a598af70661a
Subproject commit eb24b2cf56958f2f5bd219abff20ebf45de9a588

@ -1 +1 @@
Subproject commit bf6cc94a3cbab16cf7ffadad11b50c52f161d0a8
Subproject commit fb9699702b44f194b5926c8ab4a004cff676b435

@ -1 +1 @@
Subproject commit 5b11c4173159a15d52f0bb6a26a1aa2b40f0f2be
Subproject commit 474bdbf49cf1634ba504888ad1a1927a2703bd3f

2
vendor/nim-stew vendored

@ -1 +1 @@
Subproject commit ec2f52b0cea1f1daa33f38ca4ba289d8f40f4104
Subproject commit 1db43c7234acb9554e3e80bf2e7b61c4cf0435cf

@ -1 +1 @@
Subproject commit e0d9939f9fce2bcbcf3451689b33bb3423fcf58e
Subproject commit a8cafce7c0d16c3c5a0d928b4a0e31207d399de0

2
vendor/nimcrypto vendored

@ -1 +1 @@
Subproject commit f767595f4ddec2b5570b5194feb96954c00a6499
Subproject commit a065c1741836462762d18d2fced1fedd46095b02

View File

@ -33,7 +33,11 @@ proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") =
exec "nim " & lang & " --out:build/" & name & " " & extra_params & " " & srcDir & name & ".nim"
proc test(name: string, lang = "c") =
buildBinary name, "tests/", "-d:chronicles_log_level=ERROR"
# XXX: When running `> NIM_PARAMS="-d:chronicles_log_level=INFO" make test2`
# I expect compiler flag to be overridden, however it stays with whatever is
# specified here.
buildBinary name, "tests/", "-d:chronicles_log_level=DEBUG"
#buildBinary name, "tests/", "-d:chronicles_log_level=ERROR"
exec "build/" & name
### Tasks

View File

@ -23,8 +23,7 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
# TODO: Implement symkey etc logic
rpcsrv.rpc("waku_publish") do(topic: string, payload: seq[byte]) -> bool:
# XXX Why is casting necessary here but not in Nim node API?
let wakuRelay = cast[WakuRelay](node.switch.pubSub.get())
let wakuRelay = node.wakuRelay
# XXX also future return type
# TODO: Shouldn't we really be doing WakuNode publish here?
debug "waku_publish", topic=topic, payload=payload
@ -61,7 +60,8 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
var readable_str = cast[string](msg[].payload)
info "Hit subscribe handler", topic=topic, msg=msg[], payload=readable_str
node.subscribe(topic, handler)
# XXX: Can we make this context async to use await?
discard node.subscribe(topic, handler)
return true
#if not result:
# raise newException(ValueError, "Message could not be posted")

View File

@ -1,76 +0,0 @@
# compile time options here
const
libp2p_secure {.strdefine.} = ""
libp2p_pubsub_sign {.booldefine.} = true
libp2p_pubsub_verify {.booldefine.} = true
import
options, tables, chronicles, chronos,
libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
libp2p/stream/connection,
libp2p/transports/[transport, tcptransport],
libp2p/muxers/[muxer, mplex/mplex, mplex/types],
libp2p/protocols/[identify, secure/secure],
libp2p/protocols/pubsub/pubsub,
../../protocol/v2/waku_relay
import
libp2p/protocols/secure/noise,
libp2p/protocols/secure/secio
export
switch, peerinfo, connection, multiaddress, crypto
type
SecureProtocol* {.pure.} = enum
Noise,
Secio
proc newStandardSwitch*(privKey = none(PrivateKey),
address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(),
triggerSelf = false,
gossip = false,
secureManagers: openarray[SecureProtocol] = [
# NOTE below relates to Eth2
# TODO investigate why we're getting fewer peers on public testnets with noise
SecureProtocol.Secio,
SecureProtocol.Noise, # array cos order matters
],
verifySignature = libp2p_pubsub_verify,
sign = libp2p_pubsub_sign,
transportFlags: set[ServerFlags] = {},
rng = newRng(),
inTimeout: Duration = 1.minutes,
outTimeout: Duration = 1.minutes): Switch =
info "newStandardSwitch"
proc createMplex(conn: Connection): Muxer =
Mplex.init(
conn,
inTimeout = inTimeout,
outTimeout = outTimeout)
let
seckey = privKey.get(otherwise = PrivateKey.random(ECDSA, rng[]).tryGet())
peerInfo = PeerInfo.init(seckey, [address])
mplexProvider = newMuxerProvider(createMplex, MplexCodec)
transports = @[Transport(TcpTransport.init(transportFlags))]
muxers = {MplexCodec: mplexProvider}.toTable
identify = newIdentify(peerInfo)
var
secureManagerInstances: seq[Secure]
for sec in secureManagers:
case sec
of SecureProtocol.Noise:
secureManagerInstances &= newNoise(rng, seckey).Secure
of SecureProtocol.Secio:
secureManagerInstances &= newSecio(rng, seckey).Secure
let pubSub = PubSub newPubSub(WakuRelay, peerInfo, triggerSelf)
result = newSwitch(peerInfo,
transports,
identify,
muxers,
secureManagers = secureManagerInstances,
pubSub = some(pubSub))

View File

@ -46,7 +46,7 @@ proc initNodeCmd(shift: int, staticNodes: seq[string] = @[], master = false, lab
info "Address", address
# TODO: Need to port shift
peerInfo.addrs.add(hostAddress)
let id = peerInfo.id
let id = $peerInfo.peerId
info "PeerInfo", id = id, addrs = peerInfo.addrs
let listenStr = $peerInfo.addrs[0] & "/p2p/" & id

View File

@ -6,7 +6,8 @@ import
std/tables,
chronos,
libp2p/[switch, peerinfo, multiaddress, crypto/crypto],
libp2p/protobuf/minprotobuf
libp2p/protobuf/minprotobuf,
libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub]
# Common data types -----------------------------------------------------------
@ -31,6 +32,7 @@ type
# NOTE based on Eth2Node in NBC eth2_network.nim
WakuNode* = ref object of RootObj
switch*: Switch
wakuRelay*: WakuRelay
peerInfo*: PeerInfo
libp2pTransportLoops*: seq[Future[void]]
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage
@ -41,6 +43,9 @@ type
payload*: seq[byte]
contentTopic*: string
WakuRelay* = ref object of GossipSub
gossipEnabled*: bool
# Encoding and decoding -------------------------------------------------------
proc init*(T: type WakuMessage, buffer: seq[byte]): ProtoResult[T] =

View File

@ -9,8 +9,12 @@ import
# NOTE For TopicHandler, solve with exports?
libp2p/protocols/pubsub/pubsub,
libp2p/peerinfo,
libp2p/standard_setup,
../../protocol/v2/[waku_relay, waku_store, waku_filter],
./waku_types, ./standard_setup
./waku_types
logScope:
topics = "wakunode"
# Default clientId
const clientId* = "Nimbus Waku v2 node"
@ -67,20 +71,38 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
# XXX: Add this when we create node or start it?
peerInfo.addrs.add(hostAddress)
var switch = newStandardSwitch(some(nodekey), hostAddress, triggerSelf = true)
var switch = newStandardSwitch(some(nodekey), hostAddress)
# TODO Untested - verify behavior after switch interface change
# More like this:
# let pubsub = GossipSub.init(
# switch = switch,
# msgIdProvider = msgIdProvider,
# triggerSelf = true, sign = false,
# verifySignature = false).PubSub
let wakuRelay = WakuRelay.init(
switch = switch,
# Use default
#msgIdProvider = msgIdProvider,
triggerSelf = true,
sign = false,
verifySignature = false)
# This gets messy with: .PubSub
switch.mount(wakuRelay)
result = WakuNode(switch: switch, peerInfo: peerInfo)
result = WakuNode(switch: switch, peerInfo: peerInfo, wakuRelay: wakuRelay)
for topic in topics:
proc handler(topic: string, data: seq[byte]) {.async, gcsafe.} =
debug "Hit handler", topic=topic, data=data
result.subscribe(topic, handler)
# XXX: Is using discard here fine? Not sure if we want init to be async?
# Can also move this to the start proc, possibly wiser?
discard result.subscribe(topic, handler)
proc start*(node: WakuNode) {.async.} =
node.libp2pTransportLoops = await node.switch.start()
# NOTE WakuRelay is being instantiated as part of creating switch with PubSub field set
# NOTE WakuRelay is being instantiated as part of initing node
let storeProto = WakuStore.init()
node.switch.mount(storeProto)
@ -89,37 +111,37 @@ proc start*(node: WakuNode) {.async.} =
# TODO Get this from WakuNode obj
let peerInfo = node.peerInfo
let id = peerInfo.peerId.pretty
info "PeerInfo", id = id, addrs = peerInfo.addrs
let listenStr = $peerInfo.addrs[0] & "/p2p/" & id
info "PeerInfo", peerId = peerInfo.peerId, addrs = peerInfo.addrs
let listenStr = $peerInfo.addrs[0] & "/p2p/" & $peerInfo.peerId
## XXX: this should be /ip4..., / stripped?
info "Listening on", full = listenStr
proc stop*(node: WakuNode) {.async.} =
let wakuRelay = node.switch.pubSub.get()
let wakuRelay = node.wakuRelay
await wakuRelay.stop()
await node.switch.stop()
proc subscribe*(w: WakuNode, topic: Topic, handler: TopicHandler) =
proc subscribe*(node: WakuNode, topic: Topic, handler: TopicHandler) {.async.} =
## Subscribes to a PubSub topic. Triggers handler when receiving messages on
## this topic. TopicHandler is a method that takes a topic and some data.
##
## NOTE The data field SHOULD be decoded as a WakuMessage.
## Status: Implemented.
info "subscribe", topic=topic
let wakuRelay = w.switch.pubSub.get()
# XXX Consider awaiting here
discard wakuRelay.subscribe(topic, handler)
let wakuRelay = node.wakuRelay
await wakuRelay.subscribe(topic, handler)
proc subscribe*(w: WakuNode, contentFilter: waku_types.ContentFilter, handler: ContentFilterHandler) =
proc subscribe*(node: WakuNode, contentFilter: waku_types.ContentFilter, handler: ContentFilterHandler) {.async.} =
## Subscribes to a ContentFilter. Triggers handler when receiving messages on
## this content filter. ContentFilter is a method that takes some content
## filter, specifically with `ContentTopic`, and a `Message`. The `Message`
## has to match the `ContentTopic`.
info "subscribe content", contentFilter=contentFilter
# TODO: get some random id, or use the Filter directly as key
w.filters.add("some random id", Filter(contentFilter: contentFilter, handler: handler))
node.filters.add("some random id", Filter(contentFilter: contentFilter, handler: handler))
proc unsubscribe*(w: WakuNode, topic: Topic) =
echo "NYI"
@ -144,8 +166,7 @@ proc publish*(node: WakuNode, topic: Topic, message: WakuMessage) =
## Status: Implemented.
##
# TODO Basic getter function for relay
let wakuRelay = cast[WakuRelay](node.switch.pubSub.get())
let wakuRelay = node.wakuRelay
# XXX Unclear what the purpose of this is
# Commenting out as it is later expected to be Message type, not WakuMessage

View File

@ -14,6 +14,9 @@ import
# should be direct payload exchange (a la req-resp), not be coupled with the
# relay protocol.
logScope:
topics = "wakufilter"
const
WakuFilterCodec* = "/vac/waku/filter/2.0.0-alpha5"

View File

@ -8,19 +8,16 @@ import
chronos, chronicles, metrics,
libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub],
libp2p/protocols/pubsub/rpc/messages,
libp2p/stream/connection
libp2p/stream/connection,
../../node/v2/waku_types
declarePublicGauge total_messages, "number of messages received"
logScope:
topic = "WakuRelay"
topics = "wakurelay"
const WakuRelayCodec* = "/vac/waku/relay/2.0.0-alpha2"
type
WakuRelay* = ref object of GossipSub
gossipEnabled*: bool
method init*(w: WakuRelay) =
debug "init"
proc handler(conn: Connection, proto: string) {.async.} =
@ -40,7 +37,8 @@ method initPubSub*(w: WakuRelay) =
debug "initWakuRelay"
# Not using GossipSub
w.gossipEnabled = false
# XXX: FloodSub subscribe doesn't work
w.gossipEnabled = true
if w.gossipEnabled:
procCall GossipSub(w).initPubSub()

View File

@ -7,6 +7,9 @@ import
./message_notifier,
./../../node/v2/waku_types
logScope:
topics = "wakustore"
const
WakuStoreCodec* = "/vac/waku/store/2.0.0-alpha5"