diff --git a/tests/v2/standard_setup.nim b/tests/v2/standard_setup.nim index c99588ced..e69d1a732 100644 --- a/tests/v2/standard_setup.nim +++ b/tests/v2/standard_setup.nim @@ -24,7 +24,6 @@ export proc newStandardSwitch*(privKey = none(PrivateKey), address = MultiAddress.init("/ip4/127.0.0.1/tcp/0").tryGet(), triggerSelf = false, - gossip = false, verifySignature = libp2p_pubsub_verify, sign = libp2p_pubsub_sign, transportFlags: set[ServerFlags] = {}): Switch = @@ -43,14 +42,7 @@ proc newStandardSwitch*(privKey = none(PrivateKey), let secureManagers = {NoiseCodec: newNoise(seckey).Secure}.toTable else: let secureManagers = {SecioCodec: newSecio(seckey).Secure}.toTable - let pubSub = if gossip: - PubSub newPubSub(GossipSub, peerInfo, triggerSelf) - else: - # Creating switch from generate node - # XXX: Hacky test, hijacking WakuSub here - debug "Using WakuSub here" - #PubSub newPubSub(FloodSub, peerInfo, triggerSelf) - PubSub newPubSub(WakuSub, peerInfo, triggerSelf) + let pubSub = PubSub newPubSub(WakuSub, peerInfo, triggerSelf) result = newSwitch(peerInfo, transports, diff --git a/waku/node/v2/wakunode.nim b/waku/node/v2/wakunode.nim index 47c2b0333..ec952afc2 100644 --- a/waku/node/v2/wakunode.nim +++ b/waku/node/v2/wakunode.nim @@ -141,7 +141,7 @@ proc run(config: WakuNodeConf) = # switch.pubsub = wakusub, plus all the peer info etc # And it has wakuProto lets use wakuProto maybe, cause it has switch - var switch = newStandardSwitch(some keys.seckey, hostAddress, triggerSelf = true, gossip = false) + var switch = newStandardSwitch(some keys.seckey, hostAddress, triggerSelf = true) let wakuProto = newWakuProto(switch) switch.mount(wakuProto) diff --git a/waku/protocol/v2/waku_protocol.nim b/waku/protocol/v2/waku_protocol.nim index 5f0752a96..d8dbf134e 100644 --- a/waku/protocol/v2/waku_protocol.nim +++ b/waku/protocol/v2/waku_protocol.nim @@ -7,6 +7,7 @@ import strutils import chronos, chronicles import libp2p/protocols/pubsub/pubsub, libp2p/protocols/pubsub/pubsubpeer, + libp2p/protocols/pubsub/floodsub, libp2p/protocols/pubsub/gossipsub, libp2p/protocols/pubsub/rpc/[messages], libp2p/connection @@ -28,6 +29,7 @@ type WakuSub* = ref object of GossipSub # XXX: just playing text*: string + gossip_enabled*: bool method init(w: WakuSub) = debug "init" @@ -51,8 +53,14 @@ method initPubSub*(w: WakuSub) = debug "initWakuSub" w.text = "Foobar" debug "w.text", text = w.text - # XXX - procCall GossipSub(w).initPubSub() + + w.gossip_enabled = true + + if w.gossip_enabled: + procCall GossipSub(w).initPubSub() + else: + procCall FloodSub(w).initPubSub() + w.init() method subscribe*(w: WakuSub, @@ -60,7 +68,11 @@ method subscribe*(w: WakuSub, handler: TopicHandler) {.async.} = debug "subscribe", topic=topic # XXX: Pubsub really - await procCall GossipSub(w).subscribe(topic, handler) + + if w.gossip_enabled: + await procCall GossipSub(w).subscribe(topic, handler) + else: + await procCall FloodSub(w).subscribe(topic, handler) # Subscribing a peer to a specified topic method subscribeTopic*(w: WakuSub, @@ -68,7 +80,11 @@ method subscribeTopic*(w: WakuSub, subscribe: bool, peerId: string) {.async, gcsafe.} = debug "subscribeTopic", topic=topic, subscribe=subscribe, peerId=peerId - await procCall GossipSub(w).subscribeTopic(topic, subscribe, peerId) + + if w.gossip_enabled: + await procCall GossipSub(w).subscribeTopic(topic, subscribe, peerId) + else: + await procCall FloodSub(w).subscribeTopic(topic, subscribe, peerId) # TODO: Fix decrement connected peers here or somewhere else method handleDisconnect*(w: WakuSub, peer: PubSubPeer) {.async.} = @@ -82,24 +98,41 @@ method rpcHandler*(w: WakuSub, # XXX: Right place? total_messages.inc() - await procCall GossipSub(w).rpcHandler(peer, rpcMsgs) + + if w.gossip_enabled: + await procCall GossipSub(w).rpcHandler(peer, rpcMsgs) + else: + await procCall FloodSub(w).rpcHandler(peer, rpcMsgs) method publish*(w: WakuSub, topic: string, data: seq[byte]) {.async.} = debug "publish", topic=topic - await procCall GossipSub(w).publish(topic, data) + + if w.gossip_enabled: + await procCall GossipSub(w).publish(topic, data) + else: + await procCall FloodSub(w).publish(topic, data) method unsubscribe*(w: WakuSub, topics: seq[TopicPair]) {.async.} = debug "unsubscribe" - await procCall GossipSub(w).unsubscribe(topics) + if w.gossip_enabled: + await procCall GossipSub(w).unsubscribe(topics) + else: + await procCall FloodSub(w).unsubscribe(topics) # GossipSub specific methods method start*(w: WakuSub) {.async.} = debug "start" - await procCall GossipSub(w).start() + if w.gossip_enabled: + await procCall GossipSub(w).start() + else: + await procCall FloodSub(w).start() method stop*(w: WakuSub) {.async.} = debug "stop" - await procCall GossipSub(w).stop() + if w.gossip_enabled: + await procCall GossipSub(w).stop() + else: + await procCall FloodSub(w).stop()