chore_: refactor with sender api

This commit is contained in:
kaichaosun 2024-08-08 15:15:48 +08:00 committed by richΛrd
parent ef85895f65
commit b32816a704
11 changed files with 330 additions and 176 deletions

2
go.mod
View File

@ -96,7 +96,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0 github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2 github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1 github.com/waku-org/go-waku v0.8.1-0.20240810120551-92d62a7c3816
github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1

4
go.sum
View File

@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1 h1:UN5y6imIQBXnuq/bPAYJgT6XMZRgQgUO5Mn9VFi3c5A= github.com/waku-org/go-waku v0.8.1-0.20240810120551-92d62a7c3816 h1:OfmTCkidLxTenZQ5cQvohO0i2sKozxQ7Sm6AlgFGVwA=
github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-waku v0.8.1-0.20240810120551-92d62a7c3816/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

View File

@ -25,6 +25,7 @@ import (
"github.com/status-im/status-go/protocol/tt" "github.com/status-im/status-go/protocol/tt"
v1protocol "github.com/status-im/status-go/protocol/v1" v1protocol "github.com/status-im/status-go/protocol/v1"
"github.com/status-im/status-go/wakuv2" "github.com/status-im/status-go/wakuv2"
"github.com/waku-org/go-waku/waku/v2/api/publish"
) )
var ( var (
@ -170,7 +171,7 @@ func TestClient_ProcessSentEnvelope(t *testing.T) {
Version: proto.Uint32(0), Version: proto.Uint32(0),
Timestamp: proto.Int64(time.Now().Unix()), Timestamp: proto.Int64(time.Now().Unix()),
}, 0, ""), }, 0, ""),
PublishMethod: wakuv2.LightPush, PublishMethod: publish.LightPush,
} }
// Send the telemetry request // Send the telemetry request

View File

@ -23,6 +23,13 @@ const DefaultMessageExpiredPerid = 10 // in seconds
type MessageSentCheckOption func(*MessageSentCheck) error type MessageSentCheckOption func(*MessageSentCheck) error
type ISentCheck interface {
Start()
Add(topic string, messageID common.Hash, sentTime uint32)
DeleteByMessageIDs(messageIDs []common.Hash)
SetStorePeerID(peerID peer.ID)
}
// MessageSentCheck tracks the outgoing messages and check against store node // MessageSentCheck tracks the outgoing messages and check against store node
// if the message sent time has passed the `messageSentPeriod`, the message id will be includes for the next query // if the message sent time has passed the `messageSentPeriod`, the message id will be includes for the next query
// if the message keeps missing after `messageExpiredPerid`, the message id will be expired // if the message keeps missing after `messageExpiredPerid`, the message id will be expired
@ -30,8 +37,8 @@ type MessageSentCheck struct {
messageIDs map[string]map[common.Hash]uint32 messageIDs map[string]map[common.Hash]uint32
messageIDsMu sync.RWMutex messageIDsMu sync.RWMutex
storePeerID peer.ID storePeerID peer.ID
MessageStoredChan chan common.Hash messageStoredChan chan common.Hash
MessageExpiredChan chan common.Hash messageExpiredChan chan common.Hash
ctx context.Context ctx context.Context
store *store.WakuStore store *store.WakuStore
timesource timesource.Timesource timesource timesource.Timesource
@ -43,12 +50,12 @@ type MessageSentCheck struct {
} }
// NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters // NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters
func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, logger *zap.Logger) *MessageSentCheck { func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, msgStoredChan chan common.Hash, msgExpiredChan chan common.Hash, logger *zap.Logger) *MessageSentCheck {
return &MessageSentCheck{ return &MessageSentCheck{
messageIDs: make(map[string]map[common.Hash]uint32), messageIDs: make(map[string]map[common.Hash]uint32),
messageIDsMu: sync.RWMutex{}, messageIDsMu: sync.RWMutex{},
MessageStoredChan: make(chan common.Hash, 1000), messageStoredChan: msgStoredChan,
MessageExpiredChan: make(chan common.Hash, 1000), messageExpiredChan: msgExpiredChan,
ctx: ctx, ctx: ctx,
store: store, store: store,
timesource: timesource, timesource: timesource,
@ -232,12 +239,12 @@ func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []c
if found { if found {
ackHashes = append(ackHashes, hash) ackHashes = append(ackHashes, hash)
m.MessageStoredChan <- hash m.messageStoredChan <- hash
} }
if !found && uint32(m.timesource.Now().Unix()) > relayTime[i]+m.messageExpiredPerid { if !found && uint32(m.timesource.Now().Unix()) > relayTime[i]+m.messageExpiredPerid {
missedHashes = append(missedHashes, hash) missedHashes = append(missedHashes, hash)
m.MessageExpiredChan <- hash m.messageExpiredChan <- hash
} }
} }

View File

@ -0,0 +1,170 @@
package publish
import (
"context"
"errors"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
const DefaultPeersToPublishForLightpush = 2
const DefaultPublishingLimiterRate = rate.Limit(2)
const DefaultPublishingLimitBurst = 4
type PublishMethod int
const (
LightPush PublishMethod = iota
Relay
UnknownMethod
)
func (pm PublishMethod) String() string {
switch pm {
case LightPush:
return "LightPush"
case Relay:
return "Relay"
default:
return "Unknown"
}
}
type MessageSender struct {
publishMethod PublishMethod
lightPush *lightpush.WakuLightPush
relay *relay.WakuRelay
messageSentCheck ISentCheck
rateLimiter *PublishRateLimiter
logger *zap.Logger
}
type Request struct {
ctx context.Context
envelope *protocol.Envelope
publishMethod PublishMethod
}
func NewRequest(ctx context.Context, envelope *protocol.Envelope) *Request {
return &Request{
ctx: ctx,
envelope: envelope,
publishMethod: UnknownMethod,
}
}
func (r *Request) WithPublishMethod(publishMethod PublishMethod) *Request {
r.publishMethod = publishMethod
return r
}
func NewMessageSender(publishMethod PublishMethod, lightPush *lightpush.WakuLightPush, relay *relay.WakuRelay, logger *zap.Logger) (*MessageSender, error) {
if publishMethod == UnknownMethod {
return nil, errors.New("publish method is required")
}
return &MessageSender{
publishMethod: publishMethod,
lightPush: lightPush,
relay: relay,
rateLimiter: NewPublishRateLimiter(DefaultPublishingLimiterRate, DefaultPublishingLimitBurst),
logger: logger,
}, nil
}
func (ms *MessageSender) WithMessageSentCheck(messageSentCheck ISentCheck) *MessageSender {
ms.messageSentCheck = messageSentCheck
return ms
}
func (ms *MessageSender) WithRateLimiting(rateLimiter *PublishRateLimiter) *MessageSender {
ms.rateLimiter = rateLimiter
return ms
}
func (ms *MessageSender) Send(req *Request) error {
logger := ms.logger.With(
zap.Stringer("envelopeHash", req.envelope.Hash()),
zap.String("pubsubTopic", req.envelope.PubsubTopic()),
zap.String("contentTopic", req.envelope.Message().ContentTopic),
zap.Int64("timestamp", req.envelope.Message().GetTimestamp()),
)
if ms.rateLimiter != nil {
if err := ms.rateLimiter.Check(req.ctx, logger); err != nil {
return err
}
}
publishMethod := req.publishMethod
if publishMethod == UnknownMethod {
publishMethod = ms.publishMethod
}
switch publishMethod {
case LightPush:
if ms.lightPush == nil {
return errors.New("lightpush is not available")
}
logger.Info("publishing message via lightpush")
_, err := ms.lightPush.Publish(
req.ctx,
req.envelope.Message(),
lightpush.WithPubSubTopic(req.envelope.PubsubTopic()),
lightpush.WithMaxPeers(DefaultPeersToPublishForLightpush),
)
if err != nil {
return err
}
case Relay:
if ms.relay == nil {
return errors.New("relay is not available")
}
peerCnt := len(ms.relay.PubSub().ListPeers(req.envelope.PubsubTopic()))
logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt))
_, err := ms.relay.Publish(req.ctx, req.envelope.Message(), relay.WithPubSubTopic(req.envelope.PubsubTopic()))
if err != nil {
return err
}
default:
return errors.New("unknown publish method")
}
if ms.messageSentCheck != nil && !req.envelope.Message().GetEphemeral() {
ms.messageSentCheck.Add(
req.envelope.PubsubTopic(),
common.BytesToHash(req.envelope.Hash().Bytes()),
uint32(req.envelope.Message().GetTimestamp()/int64(time.Second)),
)
}
return nil
}
func (ms *MessageSender) Start() {
if ms.messageSentCheck != nil {
go ms.messageSentCheck.Start()
}
}
func (ms *MessageSender) PublishMethod() PublishMethod {
return ms.publishMethod
}
func (ms *MessageSender) MessagesDelivered(messageIDs []common.Hash) {
if ms.messageSentCheck != nil {
ms.messageSentCheck.DeleteByMessageIDs(messageIDs)
}
}
func (ms *MessageSender) SetStorePeerID(peerID peer.ID) {
if ms.messageSentCheck != nil {
ms.messageSentCheck.SetStorePeerID(peerID)
}
}

View File

@ -26,12 +26,19 @@ func NewPublishRateLimiter(r rate.Limit, b int) *PublishRateLimiter {
// ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied // ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied
func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn { func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn {
return func(envelope *protocol.Envelope, logger *zap.Logger) error { return func(envelope *protocol.Envelope, logger *zap.Logger) error {
if err := p.limiter.Wait(ctx); err != nil { if err := p.Check(ctx, logger); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("could not send message (limiter)", zap.Error(err))
}
return err return err
} }
return publishFn(envelope, logger) return publishFn(envelope, logger)
} }
} }
func (p *PublishRateLimiter) Check(ctx context.Context, logger *zap.Logger) error {
if err := p.limiter.Wait(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("could not send message (limiter)", zap.Error(err))
}
return err
}
return nil
}

View File

@ -23,7 +23,7 @@ const maxAllowedPingFailures = 2
// the peers if they don't reply back // the peers if they don't reply back
const sleepDetectionIntervalFactor = 3 const sleepDetectionIntervalFactor = 3
const maxPeersToPing = 10 const maxPeersToPingPerProtocol = 10
const maxAllowedSubsequentPingFailures = 2 const maxAllowedSubsequentPingFailures = 2
@ -56,8 +56,8 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t
} }
allPeersTickerC := make(<-chan time.Time) allPeersTickerC := make(<-chan time.Time)
if randomPeersPingDuration != 0 { if allPeersPingDuration != 0 {
allPeersTicker := time.NewTicker(randomPeersPingDuration) allPeersTicker := time.NewTicker(allPeersPingDuration)
defer allPeersTicker.Stop() defer allPeersTicker.Stop()
randomPeersTickerC = allPeersTicker.C randomPeersTickerC = allPeersTicker.C
} }
@ -72,13 +72,15 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t
select { select {
case <-allPeersTickerC: case <-allPeersTickerC:
relayPeersSet := make(map[peer.ID]struct{}) if w.opts.enableRelay {
for _, t := range w.Relay().Topics() { relayPeersSet := make(map[peer.ID]struct{})
for _, p := range w.Relay().PubSub().ListPeers(t) { for _, t := range w.Relay().Topics() {
relayPeersSet[p] = struct{}{} for _, p := range w.Relay().PubSub().ListPeers(t) {
relayPeersSet[p] = struct{}{}
}
} }
peersToPing = append(peersToPing, maps.Keys(relayPeersSet)...)
} }
peersToPing = maps.Keys(relayPeersSet)
case <-randomPeersTickerC: case <-randomPeersTickerC:
difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano() difference := w.timesource.Now().UnixNano() - lastTimeExecuted.UnixNano()
@ -94,36 +96,46 @@ func (w *WakuNode) startKeepAlive(ctx context.Context, randomPeersPingDuration t
continue continue
} }
// Priorize mesh peers if w.opts.enableRelay {
meshPeersSet := make(map[peer.ID]struct{}) // Priorize mesh peers
for _, t := range w.Relay().Topics() { meshPeersSet := make(map[peer.ID]struct{})
for _, p := range w.Relay().PubSub().MeshPeers(t) {
meshPeersSet[p] = struct{}{}
}
}
peersToPing = append(peersToPing, maps.Keys(meshPeersSet)...)
// Ping also some random relay peers
if maxPeersToPing-len(peersToPing) > 0 {
relayPeersSet := make(map[peer.ID]struct{})
for _, t := range w.Relay().Topics() { for _, t := range w.Relay().Topics() {
for _, p := range w.Relay().PubSub().ListPeers(t) { for _, p := range w.Relay().PubSub().MeshPeers(t) {
if _, ok := meshPeersSet[p]; !ok { meshPeersSet[p] = struct{}{}
relayPeersSet[p] = struct{}{}
}
} }
} }
peersToPing = append(peersToPing, maps.Keys(meshPeersSet)...)
relayPeers := maps.Keys(relayPeersSet) // Ping also some random relay peers
rand.Shuffle(len(relayPeers), func(i, j int) { relayPeers[i], relayPeers[j] = relayPeers[j], relayPeers[i] }) if maxPeersToPingPerProtocol-len(peersToPing) > 0 {
relayPeersSet := make(map[peer.ID]struct{})
for _, t := range w.Relay().Topics() {
for _, p := range w.Relay().PubSub().ListPeers(t) {
if _, ok := meshPeersSet[p]; !ok {
relayPeersSet[p] = struct{}{}
}
}
}
peerLen := maxPeersToPing - len(peersToPing) relayPeers := maps.Keys(relayPeersSet)
if peerLen > len(relayPeers) { rand.Shuffle(len(relayPeers), func(i, j int) { relayPeers[i], relayPeers[j] = relayPeers[j], relayPeers[i] })
peerLen = len(relayPeers)
peerLen := maxPeersToPingPerProtocol - len(peersToPing)
if peerLen > len(relayPeers) {
peerLen = len(relayPeers)
}
peersToPing = append(peersToPing, relayPeers[0:peerLen]...)
} }
peersToPing = append(peersToPing, relayPeers[0:peerLen]...)
} }
if w.opts.enableFilterLightNode {
// We also ping all filter nodes
filterPeersSet := make(map[peer.ID]struct{})
for _, s := range w.FilterLightnode().Subscriptions() {
filterPeersSet[s.PeerID] = struct{}{}
}
peersToPing = append(peersToPing, maps.Keys(filterPeersSet)...)
}
case <-ctx.Done(): case <-ctx.Done():
w.log.Info("stopping ping protocol") w.log.Info("stopping ping protocol")
return return

View File

@ -378,11 +378,6 @@ func (w *WakuNode) Start(ctx context.Context) error {
return err return err
} }
if w.opts.keepAliveRandomPeersInterval > time.Duration(0) || w.opts.keepAliveAllPeersInterval > time.Duration(0) {
w.wg.Add(1)
go w.startKeepAlive(ctx, w.opts.keepAliveRandomPeersInterval, w.opts.keepAliveAllPeersInterval)
}
w.metadata.SetHost(host) w.metadata.SetHost(host)
err = w.metadata.Start(ctx) err = w.metadata.Start(ctx)
if err != nil { if err != nil {
@ -478,6 +473,11 @@ func (w *WakuNode) Start(ctx context.Context) error {
} }
} }
if w.opts.keepAliveRandomPeersInterval > time.Duration(0) || w.opts.keepAliveAllPeersInterval > time.Duration(0) {
w.wg.Add(1)
go w.startKeepAlive(ctx, w.opts.keepAliveRandomPeersInterval, w.opts.keepAliveAllPeersInterval)
}
w.peerExchange.SetHost(host) w.peerExchange.SetHost(host)
if w.opts.enablePeerExchange { if w.opts.enablePeerExchange {
err := w.peerExchange.Start(ctx) err := w.peerExchange.Start(ctx)

2
vendor/modules.txt vendored
View File

@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240807022408-c2e6320953b1 # github.com/waku-org/go-waku v0.8.1-0.20240810120551-92d62a7c3816
## explicit; go 1.21 ## explicit; go 1.21
github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests github.com/waku-org/go-waku/tests

View File

@ -1,13 +1,10 @@
package wakuv2 package wakuv2
import ( import (
"errors"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/waku-org/go-waku/waku/v2/api/publish" "github.com/waku-org/go-waku/waku/v2/api/publish"
"github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb" "github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/relay"
@ -15,24 +12,6 @@ import (
"github.com/status-im/status-go/wakuv2/common" "github.com/status-im/status-go/wakuv2/common"
) )
type PublishMethod int
const (
LightPush PublishMethod = iota
Relay
)
func (pm PublishMethod) String() string {
switch pm {
case LightPush:
return "LightPush"
case Relay:
return "Relay"
default:
return "Unknown"
}
}
// Send injects a message into the waku send queue, to be distributed in the // Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles. // network in the coming cycles.
func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) { func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) {
@ -88,72 +67,45 @@ func (w *Waku) broadcast() {
return return
} }
logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))
var fn publish.PublishFn
var publishMethod PublishMethod
if w.cfg.SkipPublishToTopic {
// For now only used in testing to simulate going offline
publishMethod = LightPush
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
return errors.New("test send failure")
}
} else if w.cfg.LightClient {
publishMethod = LightPush
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
logger.Info("publishing message via lightpush")
_, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(peersToPublishForLightpush))
return err
}
} else {
publishMethod = Relay
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic()))
logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt))
_, err := w.node.Relay().Publish(w.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic()))
return err
}
}
// Wraps the publish function with a call to the telemetry client
if w.statusTelemetryClient != nil {
sendFn := fn
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
err := sendFn(env, logger)
if err == nil {
w.statusTelemetryClient.PushSentEnvelope(w.ctx, SentEnvelope{Envelope: env, PublishMethod: publishMethod})
} else {
w.statusTelemetryClient.PushErrorSendingEnvelope(w.ctx, ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}})
}
return err
}
}
// Wraps the publish function with rate limiter
fn = w.limiter.ThrottlePublishFn(w.ctx, fn)
w.wg.Add(1) w.wg.Add(1)
go w.publishEnvelope(envelope, fn, logger) go w.publishEnvelope(envelope)
} }
} }
func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publish.PublishFn, logger *zap.Logger) { func (w *Waku) publishEnvelope(envelope *protocol.Envelope) {
defer w.wg.Done() defer w.wg.Done()
if err := publishFn(envelope, logger); err != nil { logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))
// only used in testing to simulate going offline
if w.cfg.SkipPublishToTopic {
logger.Info("skipping publish to topic")
return
}
err := w.messageSender.Send(publish.NewRequest(w.ctx, envelope))
if w.statusTelemetryClient != nil {
if err == nil {
w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()})
} else {
w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: envelope, PublishMethod: w.messageSender.PublishMethod()}})
}
}
if err != nil {
logger.Error("could not send message", zap.Error(err)) logger.Error("could not send message", zap.Error(err))
w.SendEnvelopeEvent(common.EnvelopeEvent{ w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()),
Event: common.EventEnvelopeExpired, Event: common.EventEnvelopeExpired,
}) })
return return
} else { }
if !w.cfg.EnableStoreConfirmationForMessagesSent {
w.SendEnvelopeEvent(common.EnvelopeEvent{ if !w.cfg.EnableStoreConfirmationForMessagesSent {
Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), w.SendEnvelopeEvent(common.EnvelopeEvent{
Event: common.EventEnvelopeSent, Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()),
}) Event: common.EventEnvelopeSent,
} })
} }
} }

View File

@ -90,13 +90,10 @@ const cacheTTL = 20 * time.Minute
const maxRelayPeers = 300 const maxRelayPeers = 300
const randomPeersKeepAliveInterval = 5 * time.Second const randomPeersKeepAliveInterval = 5 * time.Second
const allPeersKeepAliveInterval = 5 * time.Minute const allPeersKeepAliveInterval = 5 * time.Minute
const peersToPublishForLightpush = 2
const publishingLimiterRate = rate.Limit(2)
const publishingLimitBurst = 4
type SentEnvelope struct { type SentEnvelope struct {
Envelope *protocol.Envelope Envelope *protocol.Envelope
PublishMethod PublishMethod PublishMethod publish.PublishMethod
} }
type ErrorSendingEnvelope struct { type ErrorSendingEnvelope struct {
@ -137,7 +134,6 @@ type Waku struct {
protectedTopicStore *persistence.ProtectedTopicsStore protectedTopicStore *persistence.ProtectedTopicsStore
sendQueue *publish.MessageQueue sendQueue *publish.MessageQueue
limiter *publish.PublishRateLimiter
missingMsgVerifier *missing.MissingMessageVerifier missingMsgVerifier *missing.MissingMessageVerifier
@ -155,7 +151,7 @@ type Waku struct {
storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids
storeMsgIDsMu sync.RWMutex storeMsgIDsMu sync.RWMutex
messageSentCheck *publish.MessageSentCheck messageSender *publish.MessageSender
topicHealthStatusChan chan peermanager.TopicHealthStatus topicHealthStatusChan chan peermanager.TopicHealthStatus
connectionNotifChan chan node.PeerConnection connectionNotifChan chan node.PeerConnection
@ -246,15 +242,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish),
} }
if !cfg.UseThrottledPublish || testing.Testing() {
// To avoid delaying the tests, or for when we dont want to rate limit, we set up an infinite rate limiter,
// basically disabling the rate limit functionality
waku.limiter = publish.NewPublishRateLimiter(rate.Inf, 1)
} else {
waku.limiter = publish.NewPublishRateLimiter(publishingLimiterRate, publishingLimitBurst)
}
waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger) waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger)
waku.bandwidthCounter = metrics.NewBandwidthCounter() waku.bandwidthCounter = metrics.NewBandwidthCounter()
@ -992,16 +979,11 @@ func (w *Waku) SkipPublishToTopic(value bool) {
} }
func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) {
if !w.cfg.EnableStoreConfirmationForMessagesSent { w.messageSender.MessagesDelivered(hashes)
return
}
w.messageSentCheck.DeleteByMessageIDs(hashes)
} }
func (w *Waku) SetStorePeerID(peerID peer.ID) { func (w *Waku) SetStorePeerID(peerID peer.ID) {
if w.messageSentCheck != nil { w.messageSender.SetStorePeerID(peerID)
w.messageSentCheck.SetStorePeerID(peerID)
}
} }
func (w *Waku) Query(ctx context.Context, peerID peer.ID, query store.FilterCriteria, cursor []byte, opts []store.RequestOption, processEnvelopes bool) ([]byte, int, error) { func (w *Waku) Query(ctx context.Context, peerID peer.ID, query store.FilterCriteria, cursor []byte, opts []store.RequestOption, processEnvelopes bool) ([]byte, int, error) {
@ -1162,8 +1144,9 @@ func (w *Waku) Start() error {
go w.sendQueue.Start(w.ctx) go w.sendQueue.Start(w.ctx)
if w.cfg.EnableStoreConfirmationForMessagesSent { err = w.startMessageSender()
w.confirmMessagesSent() if err != nil {
return err
} }
// we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()` // we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()`
@ -1210,28 +1193,55 @@ func (w *Waku) checkForConnectionChanges() {
}) })
} }
func (w *Waku) confirmMessagesSent() { func (w *Waku) startMessageSender() error {
w.messageSentCheck = publish.NewMessageSentCheck(w.ctx, w.node.Store(), w.node.Timesource(), w.logger) publishMethod := publish.Relay
go w.messageSentCheck.Start() if w.cfg.LightClient {
publishMethod = publish.LightPush
}
go func() { sender, err := publish.NewMessageSender(publishMethod, w.node.Lightpush(), w.node.Relay(), w.logger)
for { if err != nil {
select { w.logger.Error("failed to create message sender", zap.Error(err))
case <-w.ctx.Done(): return err
return }
case hash := <-w.messageSentCheck.MessageStoredChan:
w.SendEnvelopeEvent(common.EnvelopeEvent{ if w.cfg.EnableStoreConfirmationForMessagesSent {
Hash: hash, msgStoredChan := make(chan gethcommon.Hash, 1000)
Event: common.EventEnvelopeSent, msgExpiredChan := make(chan gethcommon.Hash, 1000)
}) messageSentCheck := publish.NewMessageSentCheck(w.ctx, w.node.Store(), w.node.Timesource(), msgStoredChan, msgExpiredChan, w.logger)
case hash := <-w.messageSentCheck.MessageExpiredChan: sender.WithMessageSentCheck(messageSentCheck)
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: hash, go func() {
Event: common.EventEnvelopeExpired, for {
}) select {
case <-w.ctx.Done():
return
case hash := <-msgStoredChan:
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: hash,
Event: common.EventEnvelopeSent,
})
case hash := <-msgExpiredChan:
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: hash,
Event: common.EventEnvelopeExpired,
})
}
} }
} }()
}() }
if !w.cfg.UseThrottledPublish || testing.Testing() {
// To avoid delaying the tests, or for when we dont want to rate limit, we set up an infinite rate limiter,
// basically disabling the rate limit functionality
limiter := publish.NewPublishRateLimiter(rate.Inf, 1)
sender.WithRateLimiting(limiter)
}
w.messageSender = sender
w.messageSender.Start()
return nil
} }
func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) { func (w *Waku) MessageExists(mh pb.MessageHash) (bool, error) {
@ -1421,11 +1431,6 @@ func (w *Waku) processMessage(e *common.ReceivedMessage) {
w.storeMsgIDsMu.Unlock() w.storeMsgIDsMu.Unlock()
} }
ephemeral := e.Envelope.Message().Ephemeral
if w.cfg.EnableStoreConfirmationForMessagesSent && e.MsgType == common.SendMessageType && (ephemeral == nil || !*ephemeral) {
w.messageSentCheck.Add(e.PubsubTopic, e.Hash(), e.Sent)
}
matched := w.filters.NotifyWatchers(e) matched := w.filters.NotifyWatchers(e)
// If not matched we remove it // If not matched we remove it