diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 864711201..41f7aede8 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -1,6 +1,8 @@ package wakuv2 import ( + "errors" + "go.uber.org/zap" "github.com/waku-org/go-waku/waku/v2/api/publish" @@ -77,19 +79,20 @@ func (w *Waku) publishEnvelope(envelope *protocol.Envelope) { logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp())) + var err error // only used in testing to simulate going offline if w.cfg.SkipPublishToTopic { logger.Info("skipping publish to topic") - return + err = errors.New("test send failure") + } else { + err = w.messageSender.Send(publish.NewRequest(w.ctx, envelope)) } - err := w.messageSender.Send(publish.NewRequest(w.ctx, envelope)) - if w.statusTelemetryClient != nil { if err == nil { - w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}) + w.statusTelemetryClient.PushSentEnvelope(w.ctx, SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}) } else { - w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}}) + w.statusTelemetryClient.PushErrorSendingEnvelope(w.ctx, ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}}) } }