fix(waku2): use a cancelable context for initial bootnode discovery
This commit is contained in:
parent
a51f8aa13c
commit
89385cfcaf
|
@ -205,6 +205,8 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
|
|||
|
||||
logger.Info("starting wakuv2 with config", zap.Any("config", cfg))
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
|
||||
waku := &Waku{
|
||||
appDB: appDB,
|
||||
cfg: cfg,
|
||||
|
@ -216,6 +218,8 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
|
|||
sendQueue: make(chan *protocol.Envelope, 1000),
|
||||
connStatusChan: make(chan node.ConnStatus, 100),
|
||||
connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
wg: sync.WaitGroup{},
|
||||
dnsAddressCache: make(map[string][]dnsdisc.DiscoveredNode),
|
||||
dnsAddressCacheLock: &sync.RWMutex{},
|
||||
|
@ -284,7 +288,7 @@ func New(nodeKey string, fleet string, cfg *Config, logger *zap.Logger, appDB *s
|
|||
}
|
||||
|
||||
if cfg.EnableDiscV5 {
|
||||
bootnodes, err := waku.getDiscV5BootstrapNodes(context.Background(), cfg.DiscV5BootstrapNodes)
|
||||
bootnodes, err := waku.getDiscV5BootstrapNodes(waku.ctx, cfg.DiscV5BootstrapNodes)
|
||||
if err != nil {
|
||||
logger.Error("failed to get bootstrap nodes", zap.Error(err))
|
||||
return nil, err
|
||||
|
@ -1018,10 +1022,10 @@ func (w *Waku) broadcast() {
|
|||
logger := w.logger.With(zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().Timestamp))
|
||||
if w.settings.LightClient {
|
||||
logger.Info("publishing message via lightpush")
|
||||
_, err = w.node.Lightpush().Publish(context.Background(), envelope.Message(), lightpush.WithPubSubTopic(pubsubTopic))
|
||||
_, err = w.node.Lightpush().Publish(w.ctx, envelope.Message(), lightpush.WithPubSubTopic(pubsubTopic))
|
||||
} else {
|
||||
logger.Info("publishing message via relay")
|
||||
_, err = w.node.Relay().Publish(context.Background(), envelope.Message(), relay.WithPubSubTopic(pubsubTopic))
|
||||
_, err = w.node.Relay().Publish(w.ctx, envelope.Message(), relay.WithPubSubTopic(pubsubTopic))
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
|
@ -1145,11 +1149,7 @@ func (w *Waku) Start() error {
|
|||
|
||||
w.connectionChanged = make(chan struct{})
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
w.ctx = ctx
|
||||
w.cancel = cancel
|
||||
|
||||
if err = w.node.Start(ctx); err != nil {
|
||||
if err = w.node.Start(w.ctx); err != nil {
|
||||
return fmt.Errorf("failed to start go-waku node: %v", err)
|
||||
}
|
||||
|
||||
|
@ -1162,12 +1162,12 @@ func (w *Waku) Start() error {
|
|||
|
||||
w.identifyService = idService
|
||||
|
||||
if err = w.addWakuV2Peers(ctx, w.cfg); err != nil {
|
||||
if err = w.addWakuV2Peers(w.ctx, w.cfg); err != nil {
|
||||
return fmt.Errorf("failed to add wakuv2 peers: %v", err)
|
||||
}
|
||||
|
||||
if w.cfg.EnableDiscV5 {
|
||||
err := w.node.DiscV5().Start(ctx)
|
||||
err := w.node.DiscV5().Start(w.ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -1209,7 +1209,7 @@ func (w *Waku) Start() error {
|
|||
w.logger.Info("Restarting DiscV5: online and is not connected")
|
||||
isConnected = true
|
||||
if w.node.DiscV5().ErrOnNotRunning() != nil {
|
||||
err := w.node.DiscV5().Start(ctx)
|
||||
err := w.node.DiscV5().Start(w.ctx)
|
||||
if err != nil {
|
||||
w.logger.Error("Could not start DiscV5", zap.Error(err))
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue