From b32816a704e0a5f3b04cf775634f5a6f0a80adcf Mon Sep 17 00:00:00 2001 From: kaichaosun Date: Thu, 8 Aug 2024 15:15:48 +0800 Subject: [PATCH] chore_: refactor with sender api --- go.mod | 2 +- go.sum | 4 +- telemetry/client_test.go | 3 +- .../waku/v2/api/publish/message_check.go | 21 ++- .../waku/v2/api/publish/message_sender.go | 170 ++++++++++++++++++ .../waku/v2/api/publish/rate_limiting.go | 15 +- .../go-waku/waku/v2/node/keepalive.go | 72 ++++---- .../go-waku/waku/v2/node/wakunode2.go | 10 +- vendor/modules.txt | 2 +- wakuv2/message_publishing.go | 104 +++-------- wakuv2/waku.go | 103 ++++++----- 11 files changed, 330 insertions(+), 176 deletions(-) create mode 100644 vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go diff --git a/go.mod b/go.mod index 8805d7198..05003b835 100644 --- a/go.mod +++ b/go.mod @@ -96,7 +96,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1 + github.com/waku-org/go-waku v0.8.1-0.20240810120551-92d62a7c3816 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 668d50626..ad62f7018 100644 --- a/go.sum +++ b/go.sum @@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1 h1:UN5y6imIQBXnuq/bPAYJgT6XMZRgQgUO5Mn9VFi3c5A= -github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= +github.com/waku-org/go-waku v0.8.1-0.20240810120551-92d62a7c3816 h1:OfmTCkidLxTenZQ5cQvohO0i2sKozxQ7Sm6AlgFGVwA= +github.com/waku-org/go-waku v0.8.1-0.20240810120551-92d62a7c3816/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/telemetry/client_test.go b/telemetry/client_test.go index d6482bb8d..cfca103d0 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -25,6 +25,7 @@ import ( "github.com/status-im/status-go/protocol/tt" v1protocol "github.com/status-im/status-go/protocol/v1" "github.com/status-im/status-go/wakuv2" + "github.com/waku-org/go-waku/waku/v2/api/publish" ) var ( @@ -170,7 +171,7 @@ func TestClient_ProcessSentEnvelope(t *testing.T) { Version: proto.Uint32(0), Timestamp: proto.Int64(time.Now().Unix()), }, 0, ""), - PublishMethod: wakuv2.LightPush, + PublishMethod: publish.LightPush, } // Send the telemetry request diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go index a7b16a571..a60a8d912 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go @@ -23,6 +23,13 @@ const DefaultMessageExpiredPerid = 10 // in seconds type MessageSentCheckOption func(*MessageSentCheck) error +type ISentCheck interface { + Start() + Add(topic string, messageID common.Hash, sentTime uint32) + DeleteByMessageIDs(messageIDs []common.Hash) + SetStorePeerID(peerID peer.ID) +} + // MessageSentCheck tracks the outgoing messages and check against store node // if the message sent time has passed the `messageSentPeriod`, the message id will be includes for the next query // if the message keeps missing after `messageExpiredPerid`, the message id will be expired @@ -30,8 +37,8 @@ type MessageSentCheck struct { messageIDs map[string]map[common.Hash]uint32 messageIDsMu sync.RWMutex storePeerID peer.ID - MessageStoredChan chan common.Hash - MessageExpiredChan chan common.Hash + messageStoredChan chan common.Hash + messageExpiredChan chan common.Hash ctx context.Context store *store.WakuStore timesource timesource.Timesource @@ -43,12 +50,12 @@ type MessageSentCheck struct { } // NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters -func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, logger *zap.Logger) *MessageSentCheck { +func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck { return &MessageSentCheck{ messageIDs: make(map[string]map[common.Hash]uint32), messageIDsMu: sync.RWMutex{}, - MessageStoredChan: make(chan common.Hash, 1000), - MessageExpiredChan: make(chan common.Hash, 1000), + messageStoredChan: msgStoredChan, + messageExpiredChan: msgExpiredChan, ctx: ctx, store: store, timesource: timesource, @@ -232,12 +239,12 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c if found { ackHashes = append(ackHashes, hash) - m.MessageStoredChan <- hash + m.messageStoredChan <- hash } if !found && uint32(m.timesource.Now().Unix()) > relayTime[i]+m.messageExpiredPerid { missedHashes = append(missedHashes, hash) - m.MessageExpiredChan <- hash + m.messageExpiredChan <- hash } } diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go new file mode 100644 index 000000000..479d894ad --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_sender.go @@ -0,0 +1,170 @@ +package publish + +import ( + "context" + "errors" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "go.uber.org/zap" + "golang.org/x/time/rate" +) + +const DefaultPeersToPublishForLightpush = 2 +const DefaultPublishingLimiterRate = rate.Limit(2) +const DefaultPublishingLimitBurst = 4 + +type PublishMethod int + +const ( + LightPush PublishMethod = iota + Relay + UnknownMethod +) + +func (pm PublishMethod) String() string { + switch pm { + case LightPush: + return "LightPush" + case Relay: + return "Relay" + default: + return "Unknown" + } +} + +type MessageSender struct { + publishMethod PublishMethod + lightPush *lightpush.WakuLightPush + relay *relay.WakuRelay + messageSentCheck ISentCheck + rateLimiter *PublishRateLimiter + logger *zap.Logger +} + +type Request struct { + ctx context.Context + envelope *protocol.Envelope + publishMethod PublishMethod +} + +func NewRequest(ctx context.Context, envelope *protocol.Envelope) *Request { + return &Request{ + ctx: ctx, + envelope: envelope, + publishMethod: UnknownMethod, + } +} + +func (r *Request) WithPublishMethod(publishMethod PublishMethod) *Request { + r.publishMethod = publishMethod + return r +} + +func NewMessageSender(publishMethod PublishMethod, lightPush *lightpush.WakuLightPush, relay *relay.WakuRelay, logger *zap.Logger) (*MessageSender, error) { + if publishMethod == UnknownMethod { + return nil, errors.New("publish method is required") + } + return &MessageSender{ + publishMethod: publishMethod, + lightPush: lightPush, + relay: relay, + rateLimiter: NewPublishRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst), + logger: logger, + }, nil +} + +func (ms *MessageSender) WithMessageSentCheck(messageSentCheck ISentCheck) *MessageSender { + ms.messageSentCheck = messageSentCheck + return ms +} + +func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *MessageSender { + ms.rateLimiter = rateLimiter + return ms +} + +func (ms *MessageSender) Send(req *Request) error { + logger := ms.logger.With( + zap.Stringer("envelopeHash", req.envelope.Hash()), + zap.String("pubsubTopic", req.envelope.PubsubTopic()), + zap.String("contentTopic", req.envelope.Message().ContentTopic), + zap.Int64("timestamp", req.envelope.Message().GetTimestamp()), + ) + + if ms.rateLimiter != nil { + if err := ms.rateLimiter.Check(req.ctx, logger); err != nil { + return err + } + } + + publishMethod := req.publishMethod + if publishMethod == UnknownMethod { + publishMethod = ms.publishMethod + } + + switch publishMethod { + case LightPush: + if ms.lightPush == nil { + return errors.New("lightpush is not available") + } + logger.Info("publishing message via lightpush") + _, err := ms.lightPush.Publish( + req.ctx, + req.envelope.Message(), + lightpush.WithPubSubTopic(req.envelope.PubsubTopic()), + lightpush.WithMaxPeers(DefaultPeersToPublishForLightpush), + ) + if err != nil { + return err + } + case Relay: + if ms.relay == nil { + return errors.New("relay is not available") + } + peerCnt := len(ms.relay.PubSub().ListPeers(req.envelope.PubsubTopic())) + logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt)) + _, err := ms.relay.Publish(req.ctx, req.envelope.Message(), relay.WithPubSubTopic(req.envelope.PubsubTopic())) + if err != nil { + return err + } + default: + return errors.New("unknown publish method") + } + + if ms.messageSentCheck != nil && !req.envelope.Message().GetEphemeral() { + ms.messageSentCheck.Add( + req.envelope.PubsubTopic(), + common.BytesToHash(req.envelope.Hash().Bytes()), + uint32(req.envelope.Message().GetTimestamp()/int64(time.Second)), + ) + } + + return nil +} + +func (ms *MessageSender) Start() { + if ms.messageSentCheck != nil { + go ms.messageSentCheck.Start() + } +} + +func (ms *MessageSender) PublishMethod() PublishMethod { + return ms.publishMethod +} + +func (ms *MessageSender) MessagesDelivered(messageIDs []common.Hash) { + if ms.messageSentCheck != nil { + ms.messageSentCheck.DeleteByMessageIDs(messageIDs) + } +} + +func (ms *MessageSender) SetStorePeerID(peerID peer.ID) { + if ms.messageSentCheck != nil { + ms.messageSentCheck.SetStorePeerID(peerID) + } +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rate_limiting.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rate_limiting.go index 4322413b3..a0bddcbdb 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rate_limiting.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rate_limiting.go @@ -26,12 +26,19 @@ func NewPublishRateLimiter(r rate.Limit, b int) *PublishRateLimiter { // ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn { return func(envelope *protocol.Envelope, logger *zap.Logger) error { - if err := p.limiter.Wait(ctx); err != nil { - if !errors.Is(err, context.Canceled) { - logger.Error("could not send message (limiter)", zap.Error(err)) - } + if err := p.Check(ctx, logger); err != nil { return err } return publishFn(envelope, logger) } } + +func (p *PublishRateLimiter) Check(ctx context.Context, logger *zap.Logger) error { + if err := p.limiter.Wait(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("could not send message (limiter)", zap.Error(err)) + } + return err + } + return nil +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go index 92e0ab1b4..eb28d5171 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/keepalive.go @@ -23,7 +23,7 @@ const maxAllowedPingFailures = 2 // the peers if they don't reply back const sleepDetectionIntervalFactor = 3 -const maxPeersToPing = 10 +const maxPeersToPingPerProtocol = 10 const maxAllowedSubsequentPingFailures = 2 @@ -56,8 +56,8 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t } allPeersTickerC := make(<-chan time.Time) - if randomPeersPingDuration != 0 { - allPeersTicker := time.NewTicker(randomPeersPingDuration) + if allPeersPingDuration != 0 { + allPeersTicker := time.NewTicker(allPeersPingDuration) defer allPeersTicker.Stop() randomPeersTickerC = allPeersTicker.C } @@ -72,13 +72,15 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t select { case <-allPeersTickerC: - relayPeersSet := make(map[peer.ID]struct{}) - for _, t := range w.Relay().Topics() { - for _, p := range w.Relay().PubSub().ListPeers(t) { - relayPeersSet[p] = struct{}{} + if w.opts.enableRelay { + relayPeersSet := make(map[peer.ID]struct{}) + for _, t := range w.Relay().Topics() { + for _, p := range w.Relay().PubSub().ListPeers(t) { + relayPeersSet[p] = struct{}{} + } } + peersToPing = append(peersToPing, maps.Keys(relayPeersSet)...) } - peersToPing = maps.Keys(relayPeersSet) case <-randomPeersTickerC: difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano() @@ -94,36 +96,46 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t continue } - // Priorize mesh peers - meshPeersSet := make(map[peer.ID]struct{}) - for _, t := range w.Relay().Topics() { - for _, p := range w.Relay().PubSub().MeshPeers(t) { - meshPeersSet[p] = struct{}{} - } - } - peersToPing = append(peersToPing, maps.Keys(meshPeersSet)...) - - // Ping also some random relay peers - if maxPeersToPing-len(peersToPing) > 0 { - relayPeersSet := make(map[peer.ID]struct{}) + if w.opts.enableRelay { + // Priorize mesh peers + meshPeersSet := make(map[peer.ID]struct{}) for _, t := range w.Relay().Topics() { - for _, p := range w.Relay().PubSub().ListPeers(t) { - if _, ok := meshPeersSet[p]; !ok { - relayPeersSet[p] = struct{}{} - } + for _, p := range w.Relay().PubSub().MeshPeers(t) { + meshPeersSet[p] = struct{}{} } } + peersToPing = append(peersToPing, maps.Keys(meshPeersSet)...) - relayPeers := maps.Keys(relayPeersSet) - rand.Shuffle(len(relayPeers), func(i, j int) { relayPeers[i], relayPeers[j] = relayPeers[j], relayPeers[i] }) + // Ping also some random relay peers + if maxPeersToPingPerProtocol-len(peersToPing) > 0 { + relayPeersSet := make(map[peer.ID]struct{}) + for _, t := range w.Relay().Topics() { + for _, p := range w.Relay().PubSub().ListPeers(t) { + if _, ok := meshPeersSet[p]; !ok { + relayPeersSet[p] = struct{}{} + } + } + } - peerLen := maxPeersToPing - len(peersToPing) - if peerLen > len(relayPeers) { - peerLen = len(relayPeers) + relayPeers := maps.Keys(relayPeersSet) + rand.Shuffle(len(relayPeers), func(i, j int) { relayPeers[i], relayPeers[j] = relayPeers[j], relayPeers[i] }) + + peerLen := maxPeersToPingPerProtocol - len(peersToPing) + if peerLen > len(relayPeers) { + peerLen = len(relayPeers) + } + peersToPing = append(peersToPing, relayPeers[0:peerLen]...) } - peersToPing = append(peersToPing, relayPeers[0:peerLen]...) } + if w.opts.enableFilterLightNode { + // We also ping all filter nodes + filterPeersSet := make(map[peer.ID]struct{}) + for _, s := range w.FilterLightnode().Subscriptions() { + filterPeersSet[s.PeerID] = struct{}{} + } + peersToPing = append(peersToPing, maps.Keys(filterPeersSet)...) + } case <-ctx.Done(): w.log.Info("stopping ping protocol") return diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go index dd7fbae9b..747109ffb 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/node/wakunode2.go @@ -378,11 +378,6 @@ func (w *WakuNode) Start(ctx context.Context) error { return err } - if w.opts.keepAliveRandomPeersInterval > time.Duration(0) || w.opts.keepAliveAllPeersInterval > time.Duration(0) { - w.wg.Add(1) - go w.startKeepAlive(ctx, w.opts.keepAliveRandomPeersInterval, w.opts.keepAliveAllPeersInterval) - } - w.metadata.SetHost(host) err = w.metadata.Start(ctx) if err != nil { @@ -478,6 +473,11 @@ func (w *WakuNode) Start(ctx context.Context) error { } } + if w.opts.keepAliveRandomPeersInterval > time.Duration(0) || w.opts.keepAliveAllPeersInterval > time.Duration(0) { + w.wg.Add(1) + go w.startKeepAlive(ctx, w.opts.keepAliveRandomPeersInterval, w.opts.keepAliveAllPeersInterval) + } + w.peerExchange.SetHost(host) if w.opts.enablePeerExchange { err := w.peerExchange.Start(ctx) diff --git a/vendor/modules.txt b/vendor/modules.txt index 556c56a1e..ef3ee3b4f 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1 +# github.com/waku-org/go-waku v0.8.1-0.20240810120551-92d62a7c3816 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 9fbf44fa8..864711201 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -1,13 +1,10 @@ package wakuv2 import ( - "errors" - "go.uber.org/zap" "github.com/waku-org/go-waku/waku/v2/api/publish" "github.com/waku-org/go-waku/waku/v2/protocol" - "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/relay" @@ -15,24 +12,6 @@ import ( "github.com/status-im/status-go/wakuv2/common" ) -type PublishMethod int - -const ( - LightPush PublishMethod = iota - Relay -) - -func (pm PublishMethod) String() string { - switch pm { - case LightPush: - return "LightPush" - case Relay: - return "Relay" - default: - return "Unknown" - } -} - // Send injects a message into the waku send queue, to be distributed in the // network in the coming cycles. func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) { @@ -88,72 +67,45 @@ func (w *Waku) broadcast() { return } - logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp())) - - var fn publish.PublishFn - var publishMethod PublishMethod - - if w.cfg.SkipPublishToTopic { - // For now only used in testing to simulate going offline - publishMethod = LightPush - fn = func(env *protocol.Envelope, logger *zap.Logger) error { - return errors.New("test send failure") - } - } else if w.cfg.LightClient { - publishMethod = LightPush - fn = func(env *protocol.Envelope, logger *zap.Logger) error { - logger.Info("publishing message via lightpush") - _, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(peersToPublishForLightpush)) - return err - } - } else { - publishMethod = Relay - fn = func(env *protocol.Envelope, logger *zap.Logger) error { - peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic())) - logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt)) - _, err := w.node.Relay().Publish(w.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic())) - return err - } - } - - // Wraps the publish function with a call to the telemetry client - if w.statusTelemetryClient != nil { - sendFn := fn - fn = func(env *protocol.Envelope, logger *zap.Logger) error { - err := sendFn(env, logger) - if err == nil { - w.statusTelemetryClient.PushSentEnvelope(w.ctx, SentEnvelope{Envelope: env, PublishMethod: publishMethod}) - } else { - w.statusTelemetryClient.PushErrorSendingEnvelope(w.ctx, ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}}) - } - return err - } - } - - // Wraps the publish function with rate limiter - fn = w.limiter.ThrottlePublishFn(w.ctx, fn) - w.wg.Add(1) - go w.publishEnvelope(envelope, fn, logger) + go w.publishEnvelope(envelope) } } -func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publish.PublishFn, logger *zap.Logger) { +func (w *Waku) publishEnvelope(envelope *protocol.Envelope) { defer w.wg.Done() - if err := publishFn(envelope, logger); err != nil { + logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp())) + + // only used in testing to simulate going offline + if w.cfg.SkipPublishToTopic { + logger.Info("skipping publish to topic") + return + } + + err := w.messageSender.Send(publish.NewRequest(w.ctx, envelope)) + + if w.statusTelemetryClient != nil { + if err == nil { + w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}) + } else { + w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}}) + } + } + + if err != nil { logger.Error("could not send message", zap.Error(err)) w.SendEnvelopeEvent(common.EnvelopeEvent{ Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), Event: common.EventEnvelopeExpired, }) return - } else { - if !w.cfg.EnableStoreConfirmationForMessagesSent { - w.SendEnvelopeEvent(common.EnvelopeEvent{ - Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), - Event: common.EventEnvelopeSent, - }) - } + } + + if !w.cfg.EnableStoreConfirmationForMessagesSent { + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), + Event: common.EventEnvelopeSent, + }) } } diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 24cd65a35..6d5e66936 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -90,13 +90,10 @@ const cacheTTL = 20 * time.Minute const maxRelayPeers = 300 const randomPeersKeepAliveInterval = 5 * time.Second const allPeersKeepAliveInterval = 5 * time.Minute -const peersToPublishForLightpush = 2 -const publishingLimiterRate = rate.Limit(2) -const publishingLimitBurst = 4 type SentEnvelope struct { Envelope *protocol.Envelope - PublishMethod PublishMethod + PublishMethod publish.PublishMethod } type ErrorSendingEnvelope struct { @@ -137,7 +134,6 @@ type Waku struct { protectedTopicStore *persistence.ProtectedTopicsStore sendQueue *publish.MessageQueue - limiter *publish.PublishRateLimiter missingMsgVerifier *missing.MissingMessageVerifier @@ -155,7 +151,7 @@ type Waku struct { storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids storeMsgIDsMu sync.RWMutex - messageSentCheck *publish.MessageSentCheck + messageSender *publish.MessageSender topicHealthStatusChan chan peermanager.TopicHealthStatus connectionNotifChan chan node.PeerConnection @@ -246,15 +242,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), } - if !cfg.UseThrottledPublish || testing.Testing() { - // To avoid delaying the tests, or for when we dont want to rate limit, we set up an infinite rate limiter, - // basically disabling the rate limit functionality - waku.limiter = publish.NewPublishRateLimiter(rate.Inf, 1) - - } else { - waku.limiter = publish.NewPublishRateLimiter(publishingLimiterRate, publishingLimitBurst) - } - waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger) waku.bandwidthCounter = metrics.NewBandwidthCounter() @@ -992,16 +979,11 @@ func (w *Waku) SkipPublishToTopic(value bool) { } func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { - if !w.cfg.EnableStoreConfirmationForMessagesSent { - return - } - w.messageSentCheck.DeleteByMessageIDs(hashes) + w.messageSender.MessagesDelivered(hashes) } func (w *Waku) SetStorePeerID(peerID peer.ID) { - if w.messageSentCheck != nil { - w.messageSentCheck.SetStorePeerID(peerID) - } + w.messageSender.SetStorePeerID(peerID) } func (w *Waku) Query(ctx context.Context, peerID peer.ID, query store.FilterCriteria, cursor []byte, opts []store.RequestOption, processEnvelopes bool) ([]byte, int, error) { @@ -1162,8 +1144,9 @@ func (w *Waku) Start() error { go w.sendQueue.Start(w.ctx) - if w.cfg.EnableStoreConfirmationForMessagesSent { - w.confirmMessagesSent() + err = w.startMessageSender() + if err != nil { + return err } // we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()` @@ -1210,28 +1193,55 @@ func (w *Waku) checkForConnectionChanges() { }) } -func (w *Waku) confirmMessagesSent() { - w.messageSentCheck = publish.NewMessageSentCheck(w.ctx, w.node.Store(), w.node.Timesource(), w.logger) - go w.messageSentCheck.Start() +func (w *Waku) startMessageSender() error { + publishMethod := publish.Relay + if w.cfg.LightClient { + publishMethod = publish.LightPush + } - go func() { - for { - select { - case <-w.ctx.Done(): - return - case hash := <-w.messageSentCheck.MessageStoredChan: - w.SendEnvelopeEvent(common.EnvelopeEvent{ - Hash: hash, - Event: common.EventEnvelopeSent, - }) - case hash := <-w.messageSentCheck.MessageExpiredChan: - w.SendEnvelopeEvent(common.EnvelopeEvent{ - Hash: hash, - Event: common.EventEnvelopeExpired, - }) + sender, err := publish.NewMessageSender(publishMethod, w.node.Lightpush(), w.node.Relay(), w.logger) + if err != nil { + w.logger.Error("failed to create message sender", zap.Error(err)) + return err + } + + if w.cfg.EnableStoreConfirmationForMessagesSent { + msgStoredChan := make(chan gethcommon.Hash, 1000) + msgExpiredChan := make(chan gethcommon.Hash, 1000) + messageSentCheck := publish.NewMessageSentCheck(w.ctx, w.node.Store(), w.node.Timesource(), msgStoredChan, msgExpiredChan, w.logger) + sender.WithMessageSentCheck(messageSentCheck) + + go func() { + for { + select { + case <-w.ctx.Done(): + return + case hash := <-msgStoredChan: + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: hash, + Event: common.EventEnvelopeSent, + }) + case hash := <-msgExpiredChan: + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: hash, + Event: common.EventEnvelopeExpired, + }) + } } - } - }() + }() + } + + if !w.cfg.UseThrottledPublish || testing.Testing() { + // To avoid delaying the tests, or for when we dont want to rate limit, we set up an infinite rate limiter, + // basically disabling the rate limit functionality + limiter := publish.NewPublishRateLimiter(rate.Inf, 1) + sender.WithRateLimiting(limiter) + } + + w.messageSender = sender + w.messageSender.Start() + + return nil } func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { @@ -1421,11 +1431,6 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) { w.storeMsgIDsMu.Unlock() } - ephemeral := e.Envelope.Message().Ephemeral - if w.cfg.EnableStoreConfirmationForMessagesSent && e.MsgType == common.SendMessageType && (ephemeral == nil || !*ephemeral) { - w.messageSentCheck.Add(e.PubsubTopic, e.Hash(), e.Sent) - } - matched := w.filters.NotifyWatchers(e) // If not matched we remove it