From 93aeefcb8958816761ceaaedde6a438247bc8517 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?rich=CE=9Brd?= Date: Fri, 15 Dec 2023 08:02:04 -0400 Subject: [PATCH] refactor(waku2): publish messages on a goroutine (#4466) --- wakuv2/waku.go | 58 ++++++++++++++++++++++++++++---------------------- 1 file changed, 32 insertions(+), 26 deletions(-) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 1c07ae7bd..00fb0f09d 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -1026,43 +1026,49 @@ func (w *Waku) broadcast() { for { select { case envelope := <-w.sendQueue: - pubsubTopic := envelope.PubsubTopic() - var err error - logger := w.logger.With(zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", pubsubTopic), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp())) - // For now only used in testing to simulate going offline + logger := w.logger.With(zap.String("envelopeHash", hexutil.Encode(envelope.Hash())), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp())) + var fn publishFn if w.settings.SkipPublishToTopic { - err = errors.New("Test send failure") + // For now only used in testing to simulate going offline + fn = func(env *protocol.Envelope, logger *zap.Logger) error { return errors.New("test send failure") } } else if w.settings.LightClient { - w.logger.Info("publishing message via lightpush") - _, err = w.node.Lightpush().Publish(w.ctx, envelope.Message(), lightpush.WithPubSubTopic(pubsubTopic)) + fn = func(env *protocol.Envelope, logger *zap.Logger) error { + logger.Info("publishing message via lightpush") + _, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic())) + return err + } } else { - logger.Info("publishing message via relay") - _, err = w.node.Relay().Publish(w.ctx, envelope.Message(), relay.WithPubSubTopic(pubsubTopic)) + fn = func(env *protocol.Envelope, logger *zap.Logger) error { + logger.Info("publishing message via relay") + _, err := w.node.Relay().Publish(w.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic())) + return err + } } - if err != nil { - logger.Error("could not send message", zap.Error(err)) - w.envelopeFeed.Send(common.EnvelopeEvent{ - Hash: gethcommon.BytesToHash(envelope.Hash()), - Event: common.EventEnvelopeExpired, - }) - - continue - } - - event := common.EnvelopeEvent{ - Event: common.EventEnvelopeSent, - Hash: gethcommon.BytesToHash(envelope.Hash()), - } - - w.SendEnvelopeEvent(event) - + go w.publishEnvelope(envelope, fn, logger) case <-w.ctx.Done(): return } } } +type publishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error + +func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn, logger *zap.Logger) { + var event common.EventType + if err := publishFn(envelope, logger); err != nil { + logger.Error("could not send message", zap.Error(err)) + event = common.EventEnvelopeExpired + } else { + event = common.EventEnvelopeSent + } + + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: gethcommon.BytesToHash(envelope.Hash()), + Event: event, + }) +} + // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {