From 3a3139e4cfbb5ae9500fd30b2e79c676ccc4a53b Mon Sep 17 00:00:00 2001 From: Oskar Thoren Date: Tue, 15 Sep 2020 16:44:07 +0800 Subject: [PATCH] XXX: Try to use .PubSub on WakuRelay --- waku/node/v2/rpc/wakurpc.nim | 3 +- waku/node/v2/waku_types.nim | 4 ++- waku/node/v2/wakunode2.nim | 23 ++++++------- waku/protocol/v2/waku_relay.nim | 58 +++++++++++++++++++-------------- 4 files changed, 51 insertions(+), 37 deletions(-) diff --git a/waku/node/v2/rpc/wakurpc.nim b/waku/node/v2/rpc/wakurpc.nim index 93031eda1..f322bdf65 100644 --- a/waku/node/v2/rpc/wakurpc.nim +++ b/waku/node/v2/rpc/wakurpc.nim @@ -27,7 +27,8 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) = # XXX also future return type # TODO: Shouldn't we really be doing WakuNode publish here? debug "waku_publish", topic=topic, payload=payload - discard wakuRelay.publish(topic, payload) + # XXX Doesn't work, remove this - using waku_publish2 instead now, except for simulation + #discard wakuRelay.publish(topic, payload) return true #if not result: # raise newException(ValueError, "Message could not be posted") diff --git a/waku/node/v2/waku_types.nim b/waku/node/v2/waku_types.nim index fba109045..ba3a0e203 100644 --- a/waku/node/v2/waku_types.nim +++ b/waku/node/v2/waku_types.nim @@ -32,7 +32,9 @@ type # NOTE based on Eth2Node in NBC eth2_network.nim WakuNode* = ref object of RootObj switch*: Switch - wakuRelay*: WakuRelay + #XXX + #wakuRelay*: WakuRelay + wakuRelay*: PubSub peerInfo*: PeerInfo libp2pTransportLoops*: seq[Future[void]] # TODO Revist messages field indexing as well as if this should be Message or WakuMessage diff --git a/waku/node/v2/wakunode2.nim b/waku/node/v2/wakunode2.nim index 5eb220ed8..15a100e24 100644 --- a/waku/node/v2/wakunode2.nim +++ b/waku/node/v2/wakunode2.nim @@ -7,7 +7,7 @@ import libp2p/crypto/crypto, libp2p/protocols/protocol, # NOTE For TopicHandler, solve with exports? - libp2p/protocols/pubsub/pubsub, + libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub], libp2p/peerinfo, libp2p/standard_setup, ../../protocol/v2/[waku_relay, waku_store, waku_filter], @@ -73,22 +73,23 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey, var switch = newStandardSwitch(some(nodekey), hostAddress) # TODO Untested - verify behavior after switch interface change - # More like this: - # let pubsub = GossipSub.init( - # switch = switch, - # msgIdProvider = msgIdProvider, - # triggerSelf = true, sign = false, - # verifySignature = false).PubSub let wakuRelay = WakuRelay.init( switch = switch, - # Use default - #msgIdProvider = msgIdProvider, triggerSelf = true, sign = false, - verifySignature = false) - # This gets messy with: .PubSub + verifySignature = false).PubSub switch.mount(wakuRelay) + #NBC + #let pubsub = GossipSub.init( + # switch = switch, + # msgIdProvider = msgIdProvider, + # triggerSelf = true, sign = false, + # verifySignature = false).PubSub + #switch.mount(pubsub) + + + # XXX: pubSub and wakuRelay a bit confusing here result = WakuNode(switch: switch, peerInfo: peerInfo, wakuRelay: wakuRelay) for topic in topics: diff --git a/waku/protocol/v2/waku_relay.nim b/waku/protocol/v2/waku_relay.nim index 6ff809da6..c6ac398be 100644 --- a/waku/protocol/v2/waku_relay.nim +++ b/waku/protocol/v2/waku_relay.nim @@ -39,10 +39,12 @@ method initPubSub*(w: WakuRelay) = # Not using GossipSub w.gossipEnabled = false - if w.gossipEnabled: - procCall GossipSub(w).initPubSub() - else: - procCall FloodSub(w).initPubSub() + # XXX: Unclear how we toggle with gossip/flood here? + #if w.gossipEnabled: + # procCall GossipSub(w).initPubSub() + #else: + # procCall FloodSub(w).initPubSub() + procCall PubSub(w).initPubSub() w.init() @@ -50,10 +52,12 @@ method subscribe*(w: WakuRelay, pubSubTopic: string, handler: TopicHandler) {.async.} = debug "subscribe", pubSubTopic=pubSubTopic - if w.gossipEnabled: - await procCall GossipSub(w).subscribe(pubSubTopic, handler) - else: - await procCall FloodSub(w).subscribe(pubSubTopic, handler) + + await procCall PubSub(w).subscribe(pubSubTopic, handler) + #if w.gossipEnabled: + # await procCall GossipSub(w).subscribe(pubSubTopic, handler) + #else: + # await procCall FloodSub(w).subscribe(pubSubTopic, handler) method publish*(w: WakuRelay, pubSubTopic: string, @@ -61,30 +65,36 @@ method publish*(w: WakuRelay, ): Future[int] {.async.} = debug "publish", pubSubTopic=pubSubTopic, message=message - if w.gossipEnabled: - return await procCall GossipSub(w).publish(pubSubTopic, message) - else: - return await procCall FloodSub(w).publish(pubSubTopic, message) + return await procCall PubSub(w).publish(pubSubTopic, message) + #if w.gossipEnabled: + # return await procCall GossipSub(w).publish(pubSubTopic, message) + #else: + # return await procCall FloodSub(w).publish(pubSubTopic, message) method unsubscribe*(w: WakuRelay, topics: seq[TopicPair]) {.async.} = debug "unsubscribe" - if w.gossipEnabled: - await procCall GossipSub(w).unsubscribe(topics) - else: - await procCall FloodSub(w).unsubscribe(topics) + await procCall PubSub(w).unsubscribe(topics) + #if w.gossipEnabled: + # await procCall GossipSub(w).unsubscribe(topics) + #else: + # await procCall FloodSub(w).unsubscribe(topics) # GossipSub specific methods -------------------------------------------------- method start*(w: WakuRelay) {.async.} = debug "start" - if w.gossipEnabled: - await procCall GossipSub(w).start() - else: - await procCall FloodSub(w).start() + # XXX: This is in GossipSub + await procCall PubSub(w).start() +# if w.gossipEnabled: +# await procCall GossipSub(w).start() +# else: +# await procCall FloodSub(w).start() method stop*(w: WakuRelay) {.async.} = debug "stop" - if w.gossipEnabled: - await procCall GossipSub(w).stop() - else: - await procCall FloodSub(w).stop() + # XXX: This is in GossipSub + await procCall PubSub(w).start() +# if w.gossipEnabled: +# await procCall GossipSub(w).stop() +# else: +# await procCall FloodSub(w).stop()