From 6c989fb1780159aacd8ec210ac4d7a33f40005df Mon Sep 17 00:00:00 2001 From: Richard Ramos Date: Wed, 25 Jan 2023 16:12:43 -0400 Subject: [PATCH] refactor(c-bindings): do not subscribe automatically to default waku topic --- library/README.md | 3 +++ library/api.go | 1 + mobile/api.go | 15 +++++++++++++++ waku/v2/node/wakunode2.go | 34 +++++++++++++++++++--------------- waku/v2/node/wakuoptions.go | 20 +++++++++++++++----- 5 files changed, 53 insertions(+), 20 deletions(-) diff --git a/library/README.md b/library/README.md index 2df40ef7..7c03d5cc 100644 --- a/library/README.md +++ b/library/README.md @@ -271,6 +271,7 @@ interface JsonConfig { nodeKey?: string; keepAliveInterval?: number; relay?: boolean; + relayTopics?: Array; minPeersToPublish?: number; filter?: boolean; discV5?: boolean; @@ -299,6 +300,8 @@ If a key is `undefined`, or `null`, a default value will be set. Default `20`. - `relay`: Enable relay protocol. Default `true`. +- `relayTopics`: Array of pubsub topics that WakuRelay will automatically subscribe to when the node starts + Default `[]` - `minPeersToPublish`: The minimum number of peers required on a topic to allow broadcasting a message. Default `0`. - `filter`: Enable filter protocol. diff --git a/library/api.go b/library/api.go index 4af5a066..86e4807f 100644 --- a/library/api.go +++ b/library/api.go @@ -27,6 +27,7 @@ func main() {} // - nodeKey: secp256k1 private key. Default random // - keepAliveInterval: interval in seconds to ping all peers // - relay: Enable WakuRelay. Default `true` +// - relayTopics: Array of pubsub topics that WakuRelay will automatically subscribe to when the node starts // - minPeersToPublish: The minimum number of peers required on a topic to allow broadcasting a message. Default `0` // - filter: Enable Filter. Default `false` // - discV5: Enable DiscoveryV5. Default `false` diff --git a/mobile/api.go b/mobile/api.go index 98cc46ae..4cd8997b 100644 --- a/mobile/api.go +++ b/mobile/api.go @@ -30,6 +30,7 @@ import ( ) var wakuNode *node.WakuNode +var wakuRelayTopics []string var wakuStarted = false var errWakuNodeNotReady = errors.New("go-waku not initialized") @@ -50,6 +51,7 @@ type wakuConfig struct { LogLevel *string `json:"logLevel,omitempty"` KeepAliveInterval *int `json:"keepAliveInterval,omitempty"` EnableRelay *bool `json:"relay"` + RelayTopics []string `json:"relayTopics,omitempty"` EnableFilter *bool `json:"filter"` MinPeersToPublish *int `json:"minPeersToPublish"` EnableDiscV5 *bool `json:"discV5"` @@ -155,6 +157,7 @@ func NewNode(configJSON string) string { node.WithPrivateKey(prvKey), node.WithHostAddress(hostAddr), node.WithKeepAlive(time.Duration(*config.KeepAliveInterval) * time.Second), + node.NoDefaultWakuTopic(), } if *config.EnableRelay { @@ -177,6 +180,8 @@ func NewNode(configJSON string) string { opts = append(opts, node.WithDiscoveryV5(*config.DiscV5UDPPort, bootnodes, true)) } + wakuRelayTopics = config.RelayTopics + // for go-libp2p loggers lvl, err := logging.LevelFromString(*config.LogLevel) if err != nil { @@ -211,6 +216,16 @@ func Start() string { } } + for _, topic := range wakuRelayTopics { + topic := topic + sub, err := wakuNode.Relay().SubscribeToTopic(ctx, topic) + if err != nil { + wakuNode.Stop() + return MakeJSONResponse(fmt.Errorf("could not subscribe to topic: %s, %w", topic, err)) + } + wakuNode.Broadcaster().Unregister(&topic, sub.C) + } + wakuStarted = true return MakeJSONResponse(nil) diff --git a/waku/v2/node/wakunode2.go b/waku/v2/node/wakunode2.go index 4b3c263e..12412c0c 100644 --- a/waku/v2/node/wakunode2.go +++ b/waku/v2/node/wakunode2.go @@ -308,12 +308,14 @@ func (w *WakuNode) Start(ctx context.Context) error { return err } - sub, err := w.Relay().Subscribe(ctx) - if err != nil { - return err - } + if !w.opts.noDefaultWakuTopic { + sub, err := w.Relay().Subscribe(ctx) + if err != nil { + return err + } - w.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C) + w.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C) + } } w.store = w.storeFactory(w) @@ -573,17 +575,19 @@ func (w *WakuNode) startStore(ctx context.Context) error { peerIDs = append(peerIDs, pID) } - w.wg.Add(1) - go func() { - defer w.wg.Done() + if !w.opts.noDefaultWakuTopic { + w.wg.Add(1) + go func() { + defer w.wg.Done() - ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second) - defer ctxCancel() - if _, err := w.store.(store.Store).Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), peerIDs); err != nil { - w.log.Error("Could not resume history", zap.Error(err)) - time.Sleep(10 * time.Second) - } - }() + ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second) + defer ctxCancel() + if _, err := w.store.(store.Store).Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), peerIDs); err != nil { + w.log.Error("Could not resume history", zap.Error(err)) + time.Sleep(10 * time.Second) + } + }() + } } return nil } diff --git a/waku/v2/node/wakuoptions.go b/waku/v2/node/wakuoptions.go index 71cdd6d3..c1655023 100644 --- a/waku/v2/node/wakuoptions.go +++ b/waku/v2/node/wakuoptions.go @@ -57,11 +57,12 @@ type WakuNodeParameters struct { logger *zap.Logger - enableRelay bool - enableFilter bool - isFilterFullNode bool - filterOpts []filter.Option - wOpts []pubsub.Option + noDefaultWakuTopic bool + enableRelay bool + enableFilter bool + isFilterFullNode bool + filterOpts []filter.Option + wOpts []pubsub.Option minRelayPeersToPublish int @@ -266,6 +267,15 @@ func WithLibP2POptions(opts ...libp2p.Option) WakuNodeOption { } } +// NoDefaultWakuTopic will stop the node from subscribing to the default +// pubsub topic automatically +func NoDefaultWakuTopic() WakuNodeOption { + return func(params *WakuNodeParameters) error { + params.noDefaultWakuTopic = true + return nil + } +} + // WithWakuRelay enables the Waku V2 Relay protocol. This WakuNodeOption // accepts a list of WakuRelay gossipsub option to setup the protocol func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption {