XXX: Try to use .PubSub on WakuRelay

This commit is contained in:
Oskar Thoren 2020-09-15 16:44:07 +08:00
parent 37c24a2634
commit 3a3139e4cf
No known key found for this signature in database
GPG Key ID: B2ECCFD3BC2EF77E
4 changed files with 51 additions and 37 deletions

View File

@ -27,7 +27,8 @@ proc setupWakuRPC*(node: WakuNode, rpcsrv: RpcServer) =
# XXX also future return type # XXX also future return type
# TODO: Shouldn't we really be doing WakuNode publish here? # TODO: Shouldn't we really be doing WakuNode publish here?
debug "waku_publish", topic=topic, payload=payload debug "waku_publish", topic=topic, payload=payload
discard wakuRelay.publish(topic, payload) # XXX Doesn't work, remove this - using waku_publish2 instead now, except for simulation
#discard wakuRelay.publish(topic, payload)
return true return true
#if not result: #if not result:
# raise newException(ValueError, "Message could not be posted") # raise newException(ValueError, "Message could not be posted")

View File

@ -32,7 +32,9 @@ type
# NOTE based on Eth2Node in NBC eth2_network.nim # NOTE based on Eth2Node in NBC eth2_network.nim
WakuNode* = ref object of RootObj WakuNode* = ref object of RootObj
switch*: Switch switch*: Switch
wakuRelay*: WakuRelay #XXX
#wakuRelay*: WakuRelay
wakuRelay*: PubSub
peerInfo*: PeerInfo peerInfo*: PeerInfo
libp2pTransportLoops*: seq[Future[void]] libp2pTransportLoops*: seq[Future[void]]
# TODO Revist messages field indexing as well as if this should be Message or WakuMessage # TODO Revist messages field indexing as well as if this should be Message or WakuMessage

View File

@ -7,7 +7,7 @@ import
libp2p/crypto/crypto, libp2p/crypto/crypto,
libp2p/protocols/protocol, libp2p/protocols/protocol,
# NOTE For TopicHandler, solve with exports? # NOTE For TopicHandler, solve with exports?
libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/[pubsub, floodsub, gossipsub],
libp2p/peerinfo, libp2p/peerinfo,
libp2p/standard_setup, libp2p/standard_setup,
../../protocol/v2/[waku_relay, waku_store, waku_filter], ../../protocol/v2/[waku_relay, waku_store, waku_filter],
@ -73,22 +73,23 @@ proc init*(T: type WakuNode, nodeKey: crypto.PrivateKey,
var switch = newStandardSwitch(some(nodekey), hostAddress) var switch = newStandardSwitch(some(nodekey), hostAddress)
# TODO Untested - verify behavior after switch interface change # TODO Untested - verify behavior after switch interface change
# More like this: let wakuRelay = WakuRelay.init(
switch = switch,
triggerSelf = true,
sign = false,
verifySignature = false).PubSub
switch.mount(wakuRelay)
#NBC
#let pubsub = GossipSub.init( #let pubsub = GossipSub.init(
# switch = switch, # switch = switch,
# msgIdProvider = msgIdProvider, # msgIdProvider = msgIdProvider,
# triggerSelf = true, sign = false, # triggerSelf = true, sign = false,
# verifySignature = false).PubSub # verifySignature = false).PubSub
let wakuRelay = WakuRelay.init( #switch.mount(pubsub)
switch = switch,
# Use default
#msgIdProvider = msgIdProvider,
triggerSelf = true,
sign = false,
verifySignature = false)
# This gets messy with: .PubSub
switch.mount(wakuRelay)
# XXX: pubSub and wakuRelay a bit confusing here
result = WakuNode(switch: switch, peerInfo: peerInfo, wakuRelay: wakuRelay) result = WakuNode(switch: switch, peerInfo: peerInfo, wakuRelay: wakuRelay)
for topic in topics: for topic in topics:

View File

@ -39,10 +39,12 @@ method initPubSub*(w: WakuRelay) =
# Not using GossipSub # Not using GossipSub
w.gossipEnabled = false w.gossipEnabled = false
if w.gossipEnabled: # XXX: Unclear how we toggle with gossip/flood here?
procCall GossipSub(w).initPubSub() #if w.gossipEnabled:
else: # procCall GossipSub(w).initPubSub()
procCall FloodSub(w).initPubSub() #else:
# procCall FloodSub(w).initPubSub()
procCall PubSub(w).initPubSub()
w.init() w.init()
@ -50,10 +52,12 @@ method subscribe*(w: WakuRelay,
pubSubTopic: string, pubSubTopic: string,
handler: TopicHandler) {.async.} = handler: TopicHandler) {.async.} =
debug "subscribe", pubSubTopic=pubSubTopic debug "subscribe", pubSubTopic=pubSubTopic
if w.gossipEnabled:
await procCall GossipSub(w).subscribe(pubSubTopic, handler) await procCall PubSub(w).subscribe(pubSubTopic, handler)
else: #if w.gossipEnabled:
await procCall FloodSub(w).subscribe(pubSubTopic, handler) # await procCall GossipSub(w).subscribe(pubSubTopic, handler)
#else:
# await procCall FloodSub(w).subscribe(pubSubTopic, handler)
method publish*(w: WakuRelay, method publish*(w: WakuRelay,
pubSubTopic: string, pubSubTopic: string,
@ -61,30 +65,36 @@ method publish*(w: WakuRelay,
): Future[int] {.async.} = ): Future[int] {.async.} =
debug "publish", pubSubTopic=pubSubTopic, message=message debug "publish", pubSubTopic=pubSubTopic, message=message
if w.gossipEnabled: return await procCall PubSub(w).publish(pubSubTopic, message)
return await procCall GossipSub(w).publish(pubSubTopic, message) #if w.gossipEnabled:
else: # return await procCall GossipSub(w).publish(pubSubTopic, message)
return await procCall FloodSub(w).publish(pubSubTopic, message) #else:
# return await procCall FloodSub(w).publish(pubSubTopic, message)
method unsubscribe*(w: WakuRelay, method unsubscribe*(w: WakuRelay,
topics: seq[TopicPair]) {.async.} = topics: seq[TopicPair]) {.async.} =
debug "unsubscribe" debug "unsubscribe"
if w.gossipEnabled: await procCall PubSub(w).unsubscribe(topics)
await procCall GossipSub(w).unsubscribe(topics) #if w.gossipEnabled:
else: # await procCall GossipSub(w).unsubscribe(topics)
await procCall FloodSub(w).unsubscribe(topics) #else:
# await procCall FloodSub(w).unsubscribe(topics)
# GossipSub specific methods -------------------------------------------------- # GossipSub specific methods --------------------------------------------------
method start*(w: WakuRelay) {.async.} = method start*(w: WakuRelay) {.async.} =
debug "start" debug "start"
if w.gossipEnabled: # XXX: This is in GossipSub
await procCall GossipSub(w).start() await procCall PubSub(w).start()
else: # if w.gossipEnabled:
await procCall FloodSub(w).start() # await procCall GossipSub(w).start()
# else:
# await procCall FloodSub(w).start()
method stop*(w: WakuRelay) {.async.} = method stop*(w: WakuRelay) {.async.} =
debug "stop" debug "stop"
if w.gossipEnabled: # XXX: This is in GossipSub
await procCall GossipSub(w).stop() await procCall PubSub(w).start()
else: # if w.gossipEnabled:
await procCall FloodSub(w).stop() # await procCall GossipSub(w).stop()
# else:
# await procCall FloodSub(w).stop()