refactor(c-bindings): do not subscribe automatically to default waku topic

This commit is contained in:
Richard Ramos 2023-01-25 16:12:43 -04:00 committed by RichΛrd
parent b8ac7f4c0c
commit 6c989fb178
5 changed files with 53 additions and 20 deletions

View File

@ -271,6 +271,7 @@ interface JsonConfig {
nodeKey?: string;
keepAliveInterval?: number;
relay?: boolean;
relayTopics?: Array<string>;
minPeersToPublish?: number;
filter?: boolean;
discV5?: boolean;
@ -299,6 +300,8 @@ If a key is `undefined`, or `null`, a default value will be set.
Default `20`.
- `relay`: Enable relay protocol.
Default `true`.
- `relayTopics`: Array of pubsub topics that WakuRelay will automatically subscribe to when the node starts
Default `[]`
- `minPeersToPublish`: The minimum number of peers required on a topic to allow broadcasting a message.
Default `0`.
- `filter`: Enable filter protocol.

View File

@ -27,6 +27,7 @@ func main() {}
// - nodeKey: secp256k1 private key. Default random
// - keepAliveInterval: interval in seconds to ping all peers
// - relay: Enable WakuRelay. Default `true`
// - relayTopics: Array of pubsub topics that WakuRelay will automatically subscribe to when the node starts
// - minPeersToPublish: The minimum number of peers required on a topic to allow broadcasting a message. Default `0`
// - filter: Enable Filter. Default `false`
// - discV5: Enable DiscoveryV5. Default `false`

View File

@ -30,6 +30,7 @@ import (
)
var wakuNode *node.WakuNode
var wakuRelayTopics []string
var wakuStarted = false
var errWakuNodeNotReady = errors.New("go-waku not initialized")
@ -50,6 +51,7 @@ type wakuConfig struct {
LogLevel *string `json:"logLevel,omitempty"`
KeepAliveInterval *int `json:"keepAliveInterval,omitempty"`
EnableRelay *bool `json:"relay"`
RelayTopics []string `json:"relayTopics,omitempty"`
EnableFilter *bool `json:"filter"`
MinPeersToPublish *int `json:"minPeersToPublish"`
EnableDiscV5 *bool `json:"discV5"`
@ -155,6 +157,7 @@ func NewNode(configJSON string) string {
node.WithPrivateKey(prvKey),
node.WithHostAddress(hostAddr),
node.WithKeepAlive(time.Duration(*config.KeepAliveInterval) * time.Second),
node.NoDefaultWakuTopic(),
}
if *config.EnableRelay {
@ -177,6 +180,8 @@ func NewNode(configJSON string) string {
opts = append(opts, node.WithDiscoveryV5(*config.DiscV5UDPPort, bootnodes, true))
}
wakuRelayTopics = config.RelayTopics
// for go-libp2p loggers
lvl, err := logging.LevelFromString(*config.LogLevel)
if err != nil {
@ -211,6 +216,16 @@ func Start() string {
}
}
for _, topic := range wakuRelayTopics {
topic := topic
sub, err := wakuNode.Relay().SubscribeToTopic(ctx, topic)
if err != nil {
wakuNode.Stop()
return MakeJSONResponse(fmt.Errorf("could not subscribe to topic: %s, %w", topic, err))
}
wakuNode.Broadcaster().Unregister(&topic, sub.C)
}
wakuStarted = true
return MakeJSONResponse(nil)

View File

@ -308,12 +308,14 @@ func (w *WakuNode) Start(ctx context.Context) error {
return err
}
sub, err := w.Relay().Subscribe(ctx)
if err != nil {
return err
}
if !w.opts.noDefaultWakuTopic {
sub, err := w.Relay().Subscribe(ctx)
if err != nil {
return err
}
w.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C)
w.Broadcaster().Unregister(&relay.DefaultWakuTopic, sub.C)
}
}
w.store = w.storeFactory(w)
@ -573,17 +575,19 @@ func (w *WakuNode) startStore(ctx context.Context) error {
peerIDs = append(peerIDs, pID)
}
w.wg.Add(1)
go func() {
defer w.wg.Done()
if !w.opts.noDefaultWakuTopic {
w.wg.Add(1)
go func() {
defer w.wg.Done()
ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second)
defer ctxCancel()
if _, err := w.store.(store.Store).Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), peerIDs); err != nil {
w.log.Error("Could not resume history", zap.Error(err))
time.Sleep(10 * time.Second)
}
}()
ctxWithTimeout, ctxCancel := context.WithTimeout(ctx, 20*time.Second)
defer ctxCancel()
if _, err := w.store.(store.Store).Resume(ctxWithTimeout, string(relay.DefaultWakuTopic), peerIDs); err != nil {
w.log.Error("Could not resume history", zap.Error(err))
time.Sleep(10 * time.Second)
}
}()
}
}
return nil
}

View File

@ -57,11 +57,12 @@ type WakuNodeParameters struct {
logger *zap.Logger
enableRelay bool
enableFilter bool
isFilterFullNode bool
filterOpts []filter.Option
wOpts []pubsub.Option
noDefaultWakuTopic bool
enableRelay bool
enableFilter bool
isFilterFullNode bool
filterOpts []filter.Option
wOpts []pubsub.Option
minRelayPeersToPublish int
@ -266,6 +267,15 @@ func WithLibP2POptions(opts ...libp2p.Option) WakuNodeOption {
}
}
// NoDefaultWakuTopic will stop the node from subscribing to the default
// pubsub topic automatically
func NoDefaultWakuTopic() WakuNodeOption {
return func(params *WakuNodeParameters) error {
params.noDefaultWakuTopic = true
return nil
}
}
// WithWakuRelay enables the Waku V2 Relay protocol. This WakuNodeOption
// accepts a list of WakuRelay gossipsub option to setup the protocol
func WithWakuRelay(opts ...pubsub.Option) WakuNodeOption {