feat_: rate limit message publishing (#5523)

This commit is contained in:
richΛrd 2024-08-01 14:36:25 -04:00 committed by GitHub
parent 3e6f407330
commit 5212f337d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 428 additions and 147 deletions

View File

@ -99,6 +99,7 @@ func (w *gethPublicWakuV2APIWrapper) Post(ctx context.Context, req types.NewMess
Padding: req.Padding,
TargetPeer: req.TargetPeer,
Ephemeral: req.Ephemeral,
Priority: req.Priority,
}
return w.api.Post(ctx, msg)
}

View File

@ -18,6 +18,7 @@ type NewMessage struct {
PowTarget float64 `json:"powTarget"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
Priority *int `json:"priority"`
}
// Message is the RPC representation of a whisper message.

2
go.mod
View File

@ -96,7 +96,7 @@ require (
github.com/schollz/peerdiscovery v1.7.0
github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7
github.com/urfave/cli/v2 v2.27.2
github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26
github.com/waku-org/go-waku v0.8.1-0.20240801160005-d047df3859e2
github.com/wk8/go-ordered-map/v2 v2.1.7
github.com/yeqown/go-qrcode/v2 v2.2.1
github.com/yeqown/go-qrcode/writer/standard v1.2.1

4
go.sum
View File

@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E
github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo=
github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY=
github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26 h1:F657EAwHvwTcVjMddQYGZjVC3mfYsdPD9AAHPvV9/hI=
github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw=
github.com/waku-org/go-waku v0.8.1-0.20240801160005-d047df3859e2 h1:S8KqFD9b1T+fJvKqDbb95e4X9TS0gf8XOJUR551+tRQ=
github.com/waku-org/go-waku v0.8.1-0.20240801160005-d047df3859e2/go.mod h1:OH0Z4ZMVXbgs6cNRap+ENDSNrfp1v2LA6K1qWWMT30M=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA=
github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E=
github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE=

View File

@ -338,6 +338,7 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku,
ClusterID: nodeConfig.ClusterConfig.ClusterID,
EnableMissingMessageVerification: nodeConfig.WakuV2Config.EnableMissingMessageVerification,
EnableStoreConfirmationForMessagesSent: nodeConfig.WakuV2Config.EnableStoreConfirmationForMessagesSent,
UseThrottledPublish: true,
}
// Configure peer exchange and discv5 settings based on node type

View File

@ -764,6 +764,7 @@ func (s *MessageSender) SendPublic(
newMessage.Ephemeral = rawMessage.Ephemeral
newMessage.PubsubTopic = rawMessage.PubsubTopic
newMessage.Priority = rawMessage.Priority
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)

View File

@ -45,6 +45,15 @@ const (
ResendMethodSendCommunityMessage ResendMethod = 2
)
// MessagePriority determines the ordering for publishing message
type MessagePriority = int
var (
LowPriority MessagePriority = 0
NormalPriority MessagePriority = 1
HighPriority MessagePriority = 2
)
// RawMessage represent a sent or received message, kept for being able
// to re-send/propagate
type RawMessage struct {
@ -73,4 +82,5 @@ type RawMessage struct {
PubsubTopic string
ResendType ResendType
ResendMethod ResendMethod
Priority *MessagePriority
}

View File

@ -1079,6 +1079,7 @@ func (m *Messenger) publishContactCode() error {
LocalChatID: contactCodeTopic,
MessageType: protobuf.ApplicationMetadataMessage_CONTACT_CODE_ADVERTISEMENT,
Payload: payload,
Priority: &common.LowPriority,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@ -1185,6 +1186,7 @@ func (m *Messenger) handleStandaloneChatIdentity(chat *Chat) error {
LocalChatID: chat.ID,
MessageType: protobuf.ApplicationMetadataMessage_CHAT_IDENTITY,
Payload: payload,
Priority: &common.LowPriority,
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

View File

@ -142,6 +142,7 @@ func (m *Messenger) publishOrg(org *communities.Community, shouldRekey bool) err
CommunityID: org.ID(),
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
PubsubTopic: org.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
Priority: &common.HighPriority,
}
if org.Encrypted() {
members := org.GetMemberPubkeys()
@ -180,6 +181,7 @@ func (m *Messenger) publishCommunityEvents(community *communities.Community, msg
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_EVENTS_MESSAGE,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic
Priority: &common.LowPriority,
}
// TODO: resend in case of failure?
@ -775,6 +777,7 @@ func (m *Messenger) publishGroupGrantMessage(community *communities.Community, t
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_UPDATE_GRANT,
PubsubTopic: community.PubsubTopic(),
Priority: &common.LowPriority,
}
_, err = m.sender.SendPublic(context.Background(), community.IDString(), rawMessage)
@ -1507,6 +1510,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN,
PubsubTopic: shard.DefaultNonProtectedPubsubTopic(),
Priority: &common.HighPriority,
}
_, err = m.SendMessageToControlNode(community, rawMessage)
@ -1884,6 +1888,7 @@ func (m *Messenger) CancelRequestToJoinCommunity(ctx context.Context, request *r
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_CANCEL_REQUEST_TO_JOIN,
PubsubTopic: shard.DefaultNonProtectedPubsubTopic(),
ResendType: common.ResendTypeRawMessage,
Priority: &common.HighPriority,
}
_, err = m.SendMessageToControlNode(community, &rawMessage)
@ -2031,6 +2036,7 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ
ResendType: common.ResendTypeRawMessage,
ResendMethod: common.ResendMethodSendPrivate,
Recipients: []*ecdsa.PublicKey{pk},
Priority: &common.HighPriority,
}
if community.Encrypted() {
@ -2224,6 +2230,7 @@ func (m *Messenger) LeaveCommunity(communityID types.HexBytes) (*MessengerRespon
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_LEAVE,
PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in the community pubsub topic
ResendType: common.ResendTypeRawMessage,
Priority: &common.HighPriority,
}
_, err = m.SendMessageToControlNode(community, &rawMessage)
@ -4225,6 +4232,7 @@ func (m *Messenger) dispatchMagnetlinkMessage(communityID string) error {
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_MESSAGE_ARCHIVE_MAGNETLINK,
SkipGroupMessageWrap: true,
PubsubTopic: community.PubsubTopic(),
Priority: &common.LowPriority,
}
_, err = m.sender.SendPublic(context.Background(), chatID, rawMessage)

View File

@ -58,6 +58,7 @@ func (m *Messenger) sendPublicCommunityShardInfo(community *communities.Communit
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_PUBLIC_SHARD_INFO,
PubsubTopic: shard.DefaultNonProtectedPubsubTopic(), // it must be sent always to default shard pubsub topic
Priority: &common.HighPriority,
}
chatName := transport.CommunityShardInfoTopic(community.IDString())

View File

@ -51,6 +51,7 @@ func (m *Messenger) sendCommunityPublicStorenodesInfo(community *communities.Com
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_PUBLIC_STORENODES_INFO,
PubsubTopic: community.PubsubTopic(),
Priority: &common.HighPriority,
}
_, err = m.sender.SendPublic(context.Background(), community.IDString(), rawMessage)

View File

@ -178,6 +178,7 @@ func (m *Messenger) sendDatasyncOffersForCommunities() error {
Ephemeral: true,
SkipApplicationWrap: true,
PubsubTopic: community.PubsubTopic(),
Priority: &common.LowPriority,
}
_, err = m.sender.SendPublic(context.Background(), community.IDString(), rawMessage)
if err != nil {

View File

@ -74,6 +74,7 @@ func (m *Messenger) sendUserStatus(ctx context.Context, status UserStatus) error
MessageType: protobuf.ApplicationMetadataMessage_STATUS_UPDATE,
ResendType: common.ResendTypeNone, // does this need to be resent?
Ephemeral: statusUpdate.StatusType == protobuf.StatusUpdate_AUTOMATIC,
Priority: &common.LowPriority,
}
_, err = m.sender.SendPublic(ctx, contactCodeTopic, rawMessage)
@ -177,6 +178,7 @@ func (m *Messenger) sendCurrentUserStatusToCommunity(ctx context.Context, commun
ResendType: common.ResendTypeNone, // does this need to be resent?
Ephemeral: statusUpdate.StatusType == protobuf.StatusUpdate_AUTOMATIC,
PubsubTopic: community.PubsubTopic(),
Priority: &common.LowPriority,
}
_, err = m.sender.SendPublic(ctx, rawMessage.LocalChatID, rawMessage)

View File

@ -1738,6 +1738,7 @@ func (c *Client) queryPushNotificationInfo(publicKey *ecdsa.PublicKey) error {
// we don't want to wrap in an encryption layer message
SkipEncryptionLayer: true,
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_QUERY,
Priority: &common.LowPriority,
}
// this is the topic of message

View File

@ -370,8 +370,6 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types.
newMessage.Topic = filter.ContentTopic
newMessage.PublicKey = crypto.FromECDSAPub(publicKey)
t.logger.Debug("SENDING message", zap.Binary("topic", filter.ContentTopic[:]))
return t.api.Post(ctx, *newMessage)
}

View File

@ -222,7 +222,7 @@ func TestTelemetryUponPublishError(t *testing.T) {
}
// This should result in a single request sent by the telemetry client
_, err = w.Send(wakuConfig.DefaultShardPubsubTopic, msg)
_, err = w.Send(wakuConfig.DefaultShardPubsubTopic, msg, nil)
require.NoError(t, err)
})
}

View File

@ -74,6 +74,10 @@ func (t timestamp) String() string {
return time.Unix(0, int64(t)).Format(time.RFC3339)
}
func Epoch(key string, time time.Time) zap.Field {
return zap.String(key, fmt.Sprintf("%d", time.UnixNano()))
}
// History Query Filters
type historyFilters []*pb.ContentFilter

View File

@ -1,4 +1,4 @@
package api
package filter
import (
"context"

View File

@ -0,0 +1,9 @@
package publish
import (
"github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
)
// PublishFn represents a function that will publish a message.
type PublishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error

View File

@ -0,0 +1,156 @@
package publish
import (
"container/heap"
"context"
"github.com/waku-org/go-waku/waku/v2/protocol"
)
// MessagePriority determines the ordering for the message priority queue
type MessagePriority = int
const (
LowPriority MessagePriority = 1
NormalPriority MessagePriority = 2
HighPriority MessagePriority = 3
)
type envelopePriority struct {
envelope *protocol.Envelope
priority int
index int
}
type envelopePriorityQueue []*envelopePriority
func (pq envelopePriorityQueue) Len() int { return len(pq) }
func (pq envelopePriorityQueue) Less(i, j int) bool {
if pq[i].priority > pq[j].priority {
return true
} else if pq[i].priority == pq[j].priority {
return pq[i].envelope.Message().GetTimestamp() < pq[j].envelope.Message().GetTimestamp()
}
return false
}
func (pq envelopePriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *envelopePriorityQueue) Push(x any) {
n := len(*pq)
item := x.(*envelopePriority)
item.index = n
*pq = append(*pq, item)
}
func (pq *envelopePriorityQueue) Pop() any {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
// MessageQueue is a structure used to handle the ordering of the messages to publish
type MessageQueue struct {
usePriorityQueue bool
toSendChan chan *protocol.Envelope
throttledPrioritySendQueue chan *envelopePriority
envelopeAvailableOnPriorityQueueSignal chan struct{}
envelopePriorityQueue envelopePriorityQueue
}
// NewMessageQueue returns a new instance of MessageQueue. The MessageQueue can internally use a
// priority queue to handle the ordering of the messages, or use a simple FIFO queue.
func NewMessageQueue(bufferSize int, usePriorityQueue bool) *MessageQueue {
m := &MessageQueue{
usePriorityQueue: usePriorityQueue,
}
if m.usePriorityQueue {
m.envelopePriorityQueue = make(envelopePriorityQueue, 0)
m.throttledPrioritySendQueue = make(chan *envelopePriority, bufferSize)
m.envelopeAvailableOnPriorityQueueSignal = make(chan struct{}, bufferSize)
heap.Init(&m.envelopePriorityQueue)
} else {
m.toSendChan = make(chan *protocol.Envelope, bufferSize)
}
return m
}
// Start must be called to handle the lifetime of the internals of the message queue
func (m *MessageQueue) Start(ctx context.Context) {
for {
select {
case envelopePriority, ok := <-m.throttledPrioritySendQueue:
if !ok {
continue
}
heap.Push(&m.envelopePriorityQueue, envelopePriority)
m.envelopeAvailableOnPriorityQueueSignal <- struct{}{}
case <-ctx.Done():
if m.usePriorityQueue {
close(m.throttledPrioritySendQueue)
close(m.envelopeAvailableOnPriorityQueueSignal)
} else {
close(m.toSendChan)
}
return
}
}
}
// Push an envelope into the message queue. The priority is optional, and will be ignored
// if the message queue does not use a priority queue
func (m *MessageQueue) Push(envelope *protocol.Envelope, priority ...MessagePriority) {
if m.usePriorityQueue {
msgPriority := NormalPriority
if len(priority) != 0 {
msgPriority = priority[0]
}
m.throttledPrioritySendQueue <- &envelopePriority{
envelope: envelope,
priority: msgPriority,
}
} else {
m.toSendChan <- envelope
}
}
// Pop will return a channel on which a message can be retrieved from the message queue
func (m *MessageQueue) Pop() <-chan *protocol.Envelope {
ch := make(chan *protocol.Envelope)
go func() {
select {
case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal:
if ok {
ch <- heap.Pop(&m.envelopePriorityQueue).(*envelopePriority).envelope
}
case envelope, ok := <-m.toSendChan:
if ok {
ch <- envelope
}
}
close(ch)
}()
return ch
}

View File

@ -0,0 +1,37 @@
package publish
import (
"context"
"errors"
"github.com/waku-org/go-waku/waku/v2/protocol"
"go.uber.org/zap"
"golang.org/x/time/rate"
)
// PublishRateLimiter is used to decorate publish functions to limit the
// number of messages per second that can be published
type PublishRateLimiter struct {
limiter *rate.Limiter
}
// NewPublishRateLimiter will create a new instance of PublishRateLimiter.
// You can specify an rate.Inf value to in practice ignore the rate limiting
func NewPublishRateLimiter(r rate.Limit, b int) *PublishRateLimiter {
return &PublishRateLimiter{
limiter: rate.NewLimiter(r, b),
}
}
// ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied
func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn {
return func(envelope *protocol.Envelope, logger *zap.Logger) error {
if err := p.limiter.Wait(ctx); err != nil {
if !errors.Is(err, context.Canceled) {
logger.Error("could not send message (limiter)", zap.Error(err))
}
return err
}
return publishFn(envelope, logger)
}
}

5
vendor/modules.txt vendored
View File

@ -1018,12 +1018,13 @@ github.com/waku-org/go-discover/discover/v5wire
github.com/waku-org/go-libp2p-rendezvous
github.com/waku-org/go-libp2p-rendezvous/db
github.com/waku-org/go-libp2p-rendezvous/pb
# github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26
# github.com/waku-org/go-waku v0.8.1-0.20240801160005-d047df3859e2
## explicit; go 1.21
github.com/waku-org/go-waku/logging
github.com/waku-org/go-waku/tests
github.com/waku-org/go-waku/waku/persistence
github.com/waku-org/go-waku/waku/v2/api
github.com/waku-org/go-waku/waku/v2/api/filter
github.com/waku-org/go-waku/waku/v2/api/publish
github.com/waku-org/go-waku/waku/v2/discv5
github.com/waku-org/go-waku/waku/v2/dnsdisc
github.com/waku-org/go-waku/waku/v2/hash

View File

@ -182,6 +182,7 @@ type NewMessage struct {
Padding []byte `json:"padding"`
TargetPeer string `json:"targetPeer"`
Ephemeral bool `json:"ephemeral"`
Priority *int `json:"priority"`
}
// Post posts a message on the Waku network.
@ -255,7 +256,7 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt
Ephemeral: &req.Ephemeral,
}
hash, err := api.w.Send(req.PubsubTopic, wakuMsg)
hash, err := api.w.Send(req.PubsubTopic, wakuMsg, req.Priority)
if err != nil {
return nil, err

View File

@ -66,6 +66,7 @@ type Config struct {
SkipPublishToTopic bool `toml:",omitempty"` // Used in testing
EnableMissingMessageVerification bool `toml:",omitempty"`
EnableStoreConfirmationForMessagesSent bool `toml:",omitempty"` //Flag that enables checking with store node for sent message confimration
UseThrottledPublish bool `toml:",omitempty"` // Flag that indicates whether a rate limited priority queue will be used to send messages or not
}
func (c *Config) Validate(logger *zap.Logger) error {

View File

@ -12,7 +12,7 @@ import (
"go.uber.org/zap"
"golang.org/x/exp/maps"
"github.com/waku-org/go-waku/waku/v2/api"
api "github.com/waku-org/go-waku/waku/v2/api/filter"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/filter"

View File

@ -0,0 +1,151 @@
package wakuv2
import (
"errors"
"go.uber.org/zap"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/wakuv2/common"
"github.com/waku-org/go-waku/waku/v2/api/publish"
"github.com/waku-org/go-waku/waku/v2/protocol"
"github.com/waku-org/go-waku/waku/v2/protocol/lightpush"
"github.com/waku-org/go-waku/waku/v2/protocol/pb"
"github.com/waku-org/go-waku/waku/v2/protocol/relay"
)
type PublishMethod int
const (
LightPush PublishMethod = iota
Relay
)
func (pm PublishMethod) String() string {
switch pm {
case LightPush:
return "LightPush"
case Relay:
return "Relay"
default:
return "Unknown"
}
}
// Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles.
func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) {
pubsubTopic = w.GetPubsubTopic(pubsubTopic)
if w.protectedTopicStore != nil {
privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic)
if err != nil {
return nil, err
}
if privKey != nil {
err = relay.SignMessage(privKey, msg, pubsubTopic)
if err != nil {
return nil, err
}
}
}
envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), pubsubTopic)
if priority != nil {
w.sendQueue.Push(envelope, *priority)
} else {
w.sendQueue.Push(envelope)
}
w.poolMu.Lock()
alreadyCached := w.envelopeCache.Has(gethcommon.BytesToHash(envelope.Hash().Bytes()))
w.poolMu.Unlock()
if !alreadyCached {
recvMessage := common.NewReceivedMessage(envelope, common.SendMessageType)
w.postEvent(recvMessage) // notify the local node about the new message
w.addEnvelope(recvMessage)
}
return envelope.Hash().Bytes(), nil
}
func (w *Waku) broadcast() {
for {
var envelope *protocol.Envelope
select {
case envelope = <-w.sendQueue.Pop():
case <-w.ctx.Done():
return
}
logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))
var fn publish.PublishFn
var publishMethod PublishMethod
if w.cfg.SkipPublishToTopic {
// For now only used in testing to simulate going offline
publishMethod = LightPush
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
return errors.New("test send failure")
}
} else if w.cfg.LightClient {
publishMethod = LightPush
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
logger.Info("publishing message via lightpush")
_, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(peersToPublishForLightpush))
return err
}
} else {
publishMethod = Relay
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic()))
logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt))
_, err := w.node.Relay().Publish(w.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic()))
return err
}
}
fn = w.limiter.ThrottlePublishFn(w.ctx, fn)
// Wraps the publish function with a call to the telemetry client
if w.statusTelemetryClient != nil {
sendFn := fn
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
err := sendFn(env, logger)
if err == nil {
w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod})
} else {
w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}})
}
return err
}
}
w.wg.Add(1)
go w.publishEnvelope(envelope, fn, logger)
}
}
func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publish.PublishFn, logger *zap.Logger) {
defer w.wg.Done()
if err := publishFn(envelope, logger); err != nil {
logger.Error("could not send message", zap.Error(err))
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()),
Event: common.EventEnvelopeExpired,
})
return
} else {
if !w.cfg.EnableStoreConfirmationForMessagesSent {
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()),
Event: common.EventEnvelopeSent,
})
}
}
}

View File

@ -31,6 +31,7 @@ import (
"runtime"
"strings"
"sync"
"testing"
"time"
"github.com/jellydator/ttlcache/v3"
@ -42,6 +43,7 @@ import (
mapset "github.com/deckarep/golang-set"
"golang.org/x/crypto/pbkdf2"
"golang.org/x/time/rate"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
@ -55,6 +57,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/metrics"
"github.com/waku-org/go-waku/waku/v2/api/publish"
"github.com/waku-org/go-waku/waku/v2/dnsdisc"
"github.com/waku-org/go-waku/waku/v2/onlinechecker"
"github.com/waku-org/go-waku/waku/v2/peermanager"
@ -90,7 +93,9 @@ const messageExpiredPerid = 10 // in seconds
const maxRelayPeers = 300
const randomPeersKeepAliveInterval = 5 * time.Second
const allPeersKeepAliveInterval = 5 * time.Minute
const PeersToPublishForLightpush = 2
const peersToPublishForLightpush = 2
const publishingLimiterRate = rate.Limit(2)
const publishingLimitBurst = 4
type SentEnvelope struct {
Envelope *protocol.Envelope
@ -133,8 +138,11 @@ type Waku struct {
bandwidthCounter *metrics.BandwidthCounter
protectedTopicStore *persistence.ProtectedTopicsStore
sendQueue chan *protocol.Envelope
msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded
sendQueue *publish.MessageQueue
limiter *publish.PublishRateLimiter
msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded
topicInterest map[string]TopicInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages
topicInterestMu sync.Mutex
@ -227,7 +235,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
envelopeCache: newTTLCache(),
expirations: make(map[uint32]mapset.Set),
msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit),
sendQueue: make(chan *protocol.Envelope, 1000),
topicHealthStatusChan: make(chan peermanager.TopicHealthStatus, 100),
connectionNotifChan: make(chan node.PeerConnection, 20),
connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription),
@ -247,6 +254,16 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge
onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed,
onPeerStats: onPeerStats,
onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker),
sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish),
}
if !cfg.UseThrottledPublish || testing.Testing() {
// To avoid delaying the tests, or for when we dont want to rate limit, we set up an infinite rate limiter,
// basically disabling the rate limit functionality
waku.limiter = publish.NewPublishRateLimiter(rate.Inf, 1)
} else {
waku.limiter = publish.NewPublishRateLimiter(publishingLimiterRate, publishingLimitBurst)
}
waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger)
@ -979,76 +996,6 @@ func (w *Waku) SkipPublishToTopic(value bool) {
w.cfg.SkipPublishToTopic = value
}
type PublishMethod int
const (
LightPush PublishMethod = iota
Relay
)
func (pm PublishMethod) String() string {
switch pm {
case LightPush:
return "LightPush"
case Relay:
return "Relay"
default:
return "Unknown"
}
}
func (w *Waku) broadcast() {
for {
select {
case envelope := <-w.sendQueue:
logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp()))
var fn publishFn
var publishMethod PublishMethod
if w.cfg.SkipPublishToTopic {
// For now only used in testing to simulate going offline
publishMethod = LightPush
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
return errors.New("test send failure")
}
} else if w.cfg.LightClient {
publishMethod = LightPush
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
logger.Info("publishing message via lightpush")
_, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(PeersToPublishForLightpush))
return err
}
} else {
publishMethod = Relay
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic()))
logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt))
_, err := w.node.Relay().Publish(w.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic()))
return err
}
}
// Wraps the publish function with a call to the telemetry client
if w.statusTelemetryClient != nil {
sendFn := fn
fn = func(env *protocol.Envelope, logger *zap.Logger) error {
err := sendFn(env, logger)
if err == nil {
w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod})
} else {
w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}})
}
return err
}
}
w.wg.Add(1)
go w.publishEnvelope(envelope, fn, logger)
case <-w.ctx.Done():
return
}
}
}
func (w *Waku) checkIfMessagesStored() {
if !w.cfg.EnableStoreConfirmationForMessagesSent {
return
@ -1139,62 +1086,6 @@ func (w *Waku) SetStorePeerID(peerID peer.ID) {
w.storePeerID = peerID
}
type publishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error
func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn, logger *zap.Logger) {
defer w.wg.Done()
if err := publishFn(envelope, logger); err != nil {
logger.Error("could not send message", zap.Error(err))
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()),
Event: common.EventEnvelopeExpired,
})
return
} else {
if !w.cfg.EnableStoreConfirmationForMessagesSent {
w.SendEnvelopeEvent(common.EnvelopeEvent{
Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()),
Event: common.EventEnvelopeSent,
})
}
}
}
// Send injects a message into the waku send queue, to be distributed in the
// network in the coming cycles.
func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) {
pubsubTopic = w.GetPubsubTopic(pubsubTopic)
if w.protectedTopicStore != nil {
privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic)
if err != nil {
return nil, err
}
if privKey != nil {
err = relay.SignMessage(privKey, msg, pubsubTopic)
if err != nil {
return nil, err
}
}
}
envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), pubsubTopic)
w.sendQueue <- envelope
w.poolMu.Lock()
alreadyCached := w.envelopeCache.Has(gethcommon.BytesToHash(envelope.Hash().Bytes()))
w.poolMu.Unlock()
if !alreadyCached {
recvMessage := common.NewReceivedMessage(envelope, common.SendMessageType)
w.postEvent(recvMessage) // notify the local node about the new message
w.addEnvelope(recvMessage)
}
return envelope.Hash().Bytes(), nil
}
// ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes
func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Hash, relayTime []uint32, pubsubTopic string) []gethcommon.Hash {
selectedPeer := w.storePeerID
@ -1425,6 +1316,8 @@ func (w *Waku) Start() error {
go w.broadcast()
go w.sendQueue.Start(w.ctx)
go w.checkIfMessagesStored()
// we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()`

View File

@ -219,7 +219,7 @@ func TestBasicWakuV2(t *testing.T) {
ContentTopic: contentTopic.ContentTopic(),
Version: proto.Uint32(0),
Timestamp: &msgTimestamp,
})
}, nil)
require.NoError(t, err)
@ -416,7 +416,7 @@ func TestWakuV2Filter(t *testing.T) {
ContentTopic: contentTopic.ContentTopic(),
Version: proto.Uint32(0),
Timestamp: &msgTimestamp,
})
}, nil)
require.NoError(t, err)
time.Sleep(5 * time.Second)
@ -443,7 +443,7 @@ func TestWakuV2Filter(t *testing.T) {
ContentTopic: contentTopic.ContentTopic(),
Version: proto.Uint32(0),
Timestamp: &msgTimestamp,
})
}, nil)
require.NoError(t, err)
time.Sleep(10 * time.Second)
@ -531,7 +531,7 @@ func TestWakuV2Store(t *testing.T) {
ContentTopic: contentTopic.ContentTopic(),
Version: proto.Uint32(0),
Timestamp: &msgTimestamp,
})
}, nil)
require.NoError(t, err)
waitForEnvelope(t, contentTopic.ContentTopic(), w2EnvelopeCh)
@ -632,7 +632,7 @@ func TestConfirmMessageDelivered(t *testing.T) {
Version: proto.Uint32(0),
Timestamp: &msgTimestamp,
Ephemeral: proto.Bool(false),
})
}, nil)
require.NoError(t, err)
time.Sleep(1 * time.Second)
@ -782,7 +782,7 @@ func TestLightpushRateLimit(t *testing.T) {
ContentTopic: maps.Keys(contentTopics)[0].ContentTopic(),
Version: proto.Uint32(0),
Timestamp: &msgTimestamp,
})
}, nil)
require.NoError(t, err)