diff --git a/eth-node/bridge/geth/public_wakuv2_api.go b/eth-node/bridge/geth/public_wakuv2_api.go index b70cfb608..69f15a013 100644 --- a/eth-node/bridge/geth/public_wakuv2_api.go +++ b/eth-node/bridge/geth/public_wakuv2_api.go @@ -99,6 +99,7 @@ func (w *gethPublicWakuV2APIWrapper) Post(ctx context.Context, req types.NewMess Padding: req.Padding, TargetPeer: req.TargetPeer, Ephemeral: req.Ephemeral, + Priority: req.Priority, } return w.api.Post(ctx, msg) } diff --git a/eth-node/types/rpc.go b/eth-node/types/rpc.go index e7feb0950..6105f3b01 100644 --- a/eth-node/types/rpc.go +++ b/eth-node/types/rpc.go @@ -18,6 +18,7 @@ type NewMessage struct { PowTarget float64 `json:"powTarget"` TargetPeer string `json:"targetPeer"` Ephemeral bool `json:"ephemeral"` + Priority *int `json:"priority"` } // Message is the RPC representation of a whisper message. diff --git a/go.mod b/go.mod index 6d7e8a8d5..3ce4090cf 100644 --- a/go.mod +++ b/go.mod @@ -96,7 +96,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26 + github.com/waku-org/go-waku v0.8.1-0.20240801160005-d047df3859e2 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index c5ab705a5..d34ed8111 100644 --- a/go.sum +++ b/go.sum @@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26 h1:F657EAwHvwTcVjMddQYGZjVC3mfYsdPD9AAHPvV9/hI= -github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26/go.mod h1:ugDTCvcP6oJ9mTtINeo4EIsnC3oQCU3RsctNKu4MsRw= +github.com/waku-org/go-waku v0.8.1-0.20240801160005-d047df3859e2 h1:S8KqFD9b1T+fJvKqDbb95e4X9TS0gf8XOJUR551+tRQ= +github.com/waku-org/go-waku v0.8.1-0.20240801160005-d047df3859e2/go.mod h1:OH0Z4ZMVXbgs6cNRap+ENDSNrfp1v2LA6K1qWWMT30M= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/node/status_node_services.go b/node/status_node_services.go index f3cf57a24..f2716f29d 100644 --- a/node/status_node_services.go +++ b/node/status_node_services.go @@ -338,6 +338,7 @@ func (b *StatusNode) wakuV2Service(nodeConfig *params.NodeConfig) (*wakuv2.Waku, ClusterID: nodeConfig.ClusterConfig.ClusterID, EnableMissingMessageVerification: nodeConfig.WakuV2Config.EnableMissingMessageVerification, EnableStoreConfirmationForMessagesSent: nodeConfig.WakuV2Config.EnableStoreConfirmationForMessagesSent, + UseThrottledPublish: true, } // Configure peer exchange and discv5 settings based on node type diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 479cd203e..20e9ab544 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -764,6 +764,7 @@ func (s *MessageSender) SendPublic( newMessage.Ephemeral = rawMessage.Ephemeral newMessage.PubsubTopic = rawMessage.PubsubTopic + newMessage.Priority = rawMessage.Priority messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage) diff --git a/protocol/common/raw_message.go b/protocol/common/raw_message.go index a784e586a..608d959b8 100644 --- a/protocol/common/raw_message.go +++ b/protocol/common/raw_message.go @@ -45,6 +45,15 @@ const ( ResendMethodSendCommunityMessage ResendMethod = 2 ) +// MessagePriority determines the ordering for publishing message +type MessagePriority = int + +var ( + LowPriority MessagePriority = 0 + NormalPriority MessagePriority = 1 + HighPriority MessagePriority = 2 +) + // RawMessage represent a sent or received message, kept for being able // to re-send/propagate type RawMessage struct { @@ -73,4 +82,5 @@ type RawMessage struct { PubsubTopic string ResendType ResendType ResendMethod ResendMethod + Priority *MessagePriority } diff --git a/protocol/messenger.go b/protocol/messenger.go index a50e1953d..b65fa4bfa 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -1079,6 +1079,7 @@ func (m *Messenger) publishContactCode() error { LocalChatID: contactCodeTopic, MessageType: protobuf.ApplicationMetadataMessage_CONTACT_CODE_ADVERTISEMENT, Payload: payload, + Priority: &common.LowPriority, } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -1185,6 +1186,7 @@ func (m *Messenger) handleStandaloneChatIdentity(chat *Chat) error { LocalChatID: chat.ID, MessageType: protobuf.ApplicationMetadataMessage_CHAT_IDENTITY, Payload: payload, + Priority: &common.LowPriority, } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index bd2ea46cf..3012e1ea8 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -142,6 +142,7 @@ func (m *Messenger) publishOrg(org *communities.Community, shouldRekey bool) err CommunityID: org.ID(), MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION, PubsubTopic: org.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic + Priority: &common.HighPriority, } if org.Encrypted() { members := org.GetMemberPubkeys() @@ -180,6 +181,7 @@ func (m *Messenger) publishCommunityEvents(community *communities.Community, msg SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_EVENTS_MESSAGE, PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in community pubsub topic + Priority: &common.LowPriority, } // TODO: resend in case of failure? @@ -775,6 +777,7 @@ func (m *Messenger) publishGroupGrantMessage(community *communities.Community, t SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_UPDATE_GRANT, PubsubTopic: community.PubsubTopic(), + Priority: &common.LowPriority, } _, err = m.sender.SendPublic(context.Background(), community.IDString(), rawMessage) @@ -1507,6 +1510,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN, PubsubTopic: shard.DefaultNonProtectedPubsubTopic(), + Priority: &common.HighPriority, } _, err = m.SendMessageToControlNode(community, rawMessage) @@ -1884,6 +1888,7 @@ func (m *Messenger) CancelRequestToJoinCommunity(ctx context.Context, request *r MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_CANCEL_REQUEST_TO_JOIN, PubsubTopic: shard.DefaultNonProtectedPubsubTopic(), ResendType: common.ResendTypeRawMessage, + Priority: &common.HighPriority, } _, err = m.SendMessageToControlNode(community, &rawMessage) @@ -2031,6 +2036,7 @@ func (m *Messenger) acceptRequestToJoinCommunity(requestToJoin *communities.Requ ResendType: common.ResendTypeRawMessage, ResendMethod: common.ResendMethodSendPrivate, Recipients: []*ecdsa.PublicKey{pk}, + Priority: &common.HighPriority, } if community.Encrypted() { @@ -2224,6 +2230,7 @@ func (m *Messenger) LeaveCommunity(communityID types.HexBytes) (*MessengerRespon MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_LEAVE, PubsubTopic: community.PubsubTopic(), // TODO: confirm if it should be sent in the community pubsub topic ResendType: common.ResendTypeRawMessage, + Priority: &common.HighPriority, } _, err = m.SendMessageToControlNode(community, &rawMessage) @@ -4225,6 +4232,7 @@ func (m *Messenger) dispatchMagnetlinkMessage(communityID string) error { MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_MESSAGE_ARCHIVE_MAGNETLINK, SkipGroupMessageWrap: true, PubsubTopic: community.PubsubTopic(), + Priority: &common.LowPriority, } _, err = m.sender.SendPublic(context.Background(), chatID, rawMessage) diff --git a/protocol/messenger_community_shard.go b/protocol/messenger_community_shard.go index c32b5b1a5..8fdda061d 100644 --- a/protocol/messenger_community_shard.go +++ b/protocol/messenger_community_shard.go @@ -58,6 +58,7 @@ func (m *Messenger) sendPublicCommunityShardInfo(community *communities.Communit SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_PUBLIC_SHARD_INFO, PubsubTopic: shard.DefaultNonProtectedPubsubTopic(), // it must be sent always to default shard pubsub topic + Priority: &common.HighPriority, } chatName := transport.CommunityShardInfoTopic(community.IDString()) diff --git a/protocol/messenger_community_storenodes.go b/protocol/messenger_community_storenodes.go index e2374085d..571ccc2d6 100644 --- a/protocol/messenger_community_storenodes.go +++ b/protocol/messenger_community_storenodes.go @@ -51,6 +51,7 @@ func (m *Messenger) sendCommunityPublicStorenodesInfo(community *communities.Com SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_PUBLIC_STORENODES_INFO, PubsubTopic: community.PubsubTopic(), + Priority: &common.HighPriority, } _, err = m.sender.SendPublic(context.Background(), community.IDString(), rawMessage) diff --git a/protocol/messenger_peersyncing.go b/protocol/messenger_peersyncing.go index 88a08d5ec..f3cbb8f16 100644 --- a/protocol/messenger_peersyncing.go +++ b/protocol/messenger_peersyncing.go @@ -178,6 +178,7 @@ func (m *Messenger) sendDatasyncOffersForCommunities() error { Ephemeral: true, SkipApplicationWrap: true, PubsubTopic: community.PubsubTopic(), + Priority: &common.LowPriority, } _, err = m.sender.SendPublic(context.Background(), community.IDString(), rawMessage) if err != nil { diff --git a/protocol/messenger_status_updates.go b/protocol/messenger_status_updates.go index 35ebfe6dc..e954626f2 100644 --- a/protocol/messenger_status_updates.go +++ b/protocol/messenger_status_updates.go @@ -74,6 +74,7 @@ func (m *Messenger) sendUserStatus(ctx context.Context, status UserStatus) error MessageType: protobuf.ApplicationMetadataMessage_STATUS_UPDATE, ResendType: common.ResendTypeNone, // does this need to be resent? Ephemeral: statusUpdate.StatusType == protobuf.StatusUpdate_AUTOMATIC, + Priority: &common.LowPriority, } _, err = m.sender.SendPublic(ctx, contactCodeTopic, rawMessage) @@ -177,6 +178,7 @@ func (m *Messenger) sendCurrentUserStatusToCommunity(ctx context.Context, commun ResendType: common.ResendTypeNone, // does this need to be resent? Ephemeral: statusUpdate.StatusType == protobuf.StatusUpdate_AUTOMATIC, PubsubTopic: community.PubsubTopic(), + Priority: &common.LowPriority, } _, err = m.sender.SendPublic(ctx, rawMessage.LocalChatID, rawMessage) diff --git a/protocol/pushnotificationclient/client.go b/protocol/pushnotificationclient/client.go index d06fd635d..c1bac7cdf 100644 --- a/protocol/pushnotificationclient/client.go +++ b/protocol/pushnotificationclient/client.go @@ -1738,6 +1738,7 @@ func (c *Client) queryPushNotificationInfo(publicKey *ecdsa.PublicKey) error { // we don't want to wrap in an encryption layer message SkipEncryptionLayer: true, MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_QUERY, + Priority: &common.LowPriority, } // this is the topic of message diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index c099a94d3..6785289e9 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -370,8 +370,6 @@ func (t *Transport) SendCommunityMessage(ctx context.Context, newMessage *types. newMessage.Topic = filter.ContentTopic newMessage.PublicKey = crypto.FromECDSAPub(publicKey) - t.logger.Debug("SENDING message", zap.Binary("topic", filter.ContentTopic[:])) - return t.api.Post(ctx, *newMessage) } diff --git a/telemetry/client_test.go b/telemetry/client_test.go index adf72d45f..dbff6db27 100644 --- a/telemetry/client_test.go +++ b/telemetry/client_test.go @@ -222,7 +222,7 @@ func TestTelemetryUponPublishError(t *testing.T) { } // This should result in a single request sent by the telemetry client - _, err = w.Send(wakuConfig.DefaultShardPubsubTopic, msg) + _, err = w.Send(wakuConfig.DefaultShardPubsubTopic, msg, nil) require.NoError(t, err) }) } diff --git a/vendor/github.com/waku-org/go-waku/logging/logging.go b/vendor/github.com/waku-org/go-waku/logging/logging.go index 19732d55f..d577a1c5e 100644 --- a/vendor/github.com/waku-org/go-waku/logging/logging.go +++ b/vendor/github.com/waku-org/go-waku/logging/logging.go @@ -74,6 +74,10 @@ func (t timestamp) String() string { return time.Unix(0, int64(t)).Format(time.RFC3339) } +func Epoch(key string, time time.Time) zap.Field { + return zap.String(key, fmt.Sprintf("%d", time.UnixNano())) +} + // History Query Filters type historyFilters []*pb.ContentFilter diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go similarity index 99% rename from vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go rename to vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go index 1f9ea6be4..6bd041e6a 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/filter.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/filter/filter.go @@ -1,4 +1,4 @@ -package api +package filter import ( "context" diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/common.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/common.go new file mode 100644 index 000000000..be72f4c12 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/common.go @@ -0,0 +1,9 @@ +package publish + +import ( + "github.com/waku-org/go-waku/waku/v2/protocol" + "go.uber.org/zap" +) + +// PublishFn represents a function that will publish a message. +type PublishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go new file mode 100644 index 000000000..fbd79df88 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go @@ -0,0 +1,156 @@ +package publish + +import ( + "container/heap" + "context" + + "github.com/waku-org/go-waku/waku/v2/protocol" +) + +// MessagePriority determines the ordering for the message priority queue +type MessagePriority = int + +const ( + LowPriority MessagePriority = 1 + NormalPriority MessagePriority = 2 + HighPriority MessagePriority = 3 +) + +type envelopePriority struct { + envelope *protocol.Envelope + priority int + index int +} + +type envelopePriorityQueue []*envelopePriority + +func (pq envelopePriorityQueue) Len() int { return len(pq) } + +func (pq envelopePriorityQueue) Less(i, j int) bool { + if pq[i].priority > pq[j].priority { + return true + } else if pq[i].priority == pq[j].priority { + return pq[i].envelope.Message().GetTimestamp() < pq[j].envelope.Message().GetTimestamp() + } + + return false +} + +func (pq envelopePriorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *envelopePriorityQueue) Push(x any) { + n := len(*pq) + item := x.(*envelopePriority) + item.index = n + *pq = append(*pq, item) +} + +func (pq *envelopePriorityQueue) Pop() any { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil // avoid memory leak + item.index = -1 // for safety + *pq = old[0 : n-1] + return item +} + +// MessageQueue is a structure used to handle the ordering of the messages to publish +type MessageQueue struct { + usePriorityQueue bool + + toSendChan chan *protocol.Envelope + throttledPrioritySendQueue chan *envelopePriority + envelopeAvailableOnPriorityQueueSignal chan struct{} + envelopePriorityQueue envelopePriorityQueue +} + +// NewMessageQueue returns a new instance of MessageQueue. The MessageQueue can internally use a +// priority queue to handle the ordering of the messages, or use a simple FIFO queue. +func NewMessageQueue(bufferSize int, usePriorityQueue bool) *MessageQueue { + m := &MessageQueue{ + usePriorityQueue: usePriorityQueue, + } + + if m.usePriorityQueue { + m.envelopePriorityQueue = make(envelopePriorityQueue, 0) + m.throttledPrioritySendQueue = make(chan *envelopePriority, bufferSize) + m.envelopeAvailableOnPriorityQueueSignal = make(chan struct{}, bufferSize) + heap.Init(&m.envelopePriorityQueue) + } else { + m.toSendChan = make(chan *protocol.Envelope, bufferSize) + } + + return m +} + +// Start must be called to handle the lifetime of the internals of the message queue +func (m *MessageQueue) Start(ctx context.Context) { + + for { + select { + case envelopePriority, ok := <-m.throttledPrioritySendQueue: + if !ok { + continue + } + + heap.Push(&m.envelopePriorityQueue, envelopePriority) + + m.envelopeAvailableOnPriorityQueueSignal <- struct{}{} + + case <-ctx.Done(): + if m.usePriorityQueue { + close(m.throttledPrioritySendQueue) + close(m.envelopeAvailableOnPriorityQueueSignal) + } else { + close(m.toSendChan) + } + return + } + } +} + +// Push an envelope into the message queue. The priority is optional, and will be ignored +// if the message queue does not use a priority queue +func (m *MessageQueue) Push(envelope *protocol.Envelope, priority ...MessagePriority) { + if m.usePriorityQueue { + msgPriority := NormalPriority + if len(priority) != 0 { + msgPriority = priority[0] + } + + m.throttledPrioritySendQueue <- &envelopePriority{ + envelope: envelope, + priority: msgPriority, + } + } else { + m.toSendChan <- envelope + } +} + +// Pop will return a channel on which a message can be retrieved from the message queue +func (m *MessageQueue) Pop() <-chan *protocol.Envelope { + ch := make(chan *protocol.Envelope) + + go func() { + select { + case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal: + if ok { + ch <- heap.Pop(&m.envelopePriorityQueue).(*envelopePriority).envelope + } + + case envelope, ok := <-m.toSendChan: + if ok { + ch <- envelope + } + } + + close(ch) + }() + + return ch +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rate_limiting.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rate_limiting.go new file mode 100644 index 000000000..4322413b3 --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/rate_limiting.go @@ -0,0 +1,37 @@ +package publish + +import ( + "context" + "errors" + + "github.com/waku-org/go-waku/waku/v2/protocol" + "go.uber.org/zap" + "golang.org/x/time/rate" +) + +// PublishRateLimiter is used to decorate publish functions to limit the +// number of messages per second that can be published +type PublishRateLimiter struct { + limiter *rate.Limiter +} + +// NewPublishRateLimiter will create a new instance of PublishRateLimiter. +// You can specify an rate.Inf value to in practice ignore the rate limiting +func NewPublishRateLimiter(r rate.Limit, b int) *PublishRateLimiter { + return &PublishRateLimiter{ + limiter: rate.NewLimiter(r, b), + } +} + +// ThrottlePublishFn is used to decorate a PublishFn so rate limiting is applied +func (p *PublishRateLimiter) ThrottlePublishFn(ctx context.Context, publishFn PublishFn) PublishFn { + return func(envelope *protocol.Envelope, logger *zap.Logger) error { + if err := p.limiter.Wait(ctx); err != nil { + if !errors.Is(err, context.Canceled) { + logger.Error("could not send message (limiter)", zap.Error(err)) + } + return err + } + return publishFn(envelope, logger) + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 231ba1e21..b13ee811a 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1018,12 +1018,13 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240731185821-04a9af931f26 +# github.com/waku-org/go-waku v0.8.1-0.20240801160005-d047df3859e2 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests github.com/waku-org/go-waku/waku/persistence -github.com/waku-org/go-waku/waku/v2/api +github.com/waku-org/go-waku/waku/v2/api/filter +github.com/waku-org/go-waku/waku/v2/api/publish github.com/waku-org/go-waku/waku/v2/discv5 github.com/waku-org/go-waku/waku/v2/dnsdisc github.com/waku-org/go-waku/waku/v2/hash diff --git a/wakuv2/api.go b/wakuv2/api.go index bdff5fe16..f106b32f5 100644 --- a/wakuv2/api.go +++ b/wakuv2/api.go @@ -182,6 +182,7 @@ type NewMessage struct { Padding []byte `json:"padding"` TargetPeer string `json:"targetPeer"` Ephemeral bool `json:"ephemeral"` + Priority *int `json:"priority"` } // Post posts a message on the Waku network. @@ -255,7 +256,7 @@ func (api *PublicWakuAPI) Post(ctx context.Context, req NewMessage) (hexutil.Byt Ephemeral: &req.Ephemeral, } - hash, err := api.w.Send(req.PubsubTopic, wakuMsg) + hash, err := api.w.Send(req.PubsubTopic, wakuMsg, req.Priority) if err != nil { return nil, err diff --git a/wakuv2/config.go b/wakuv2/config.go index 78399ac53..cf27a5e6d 100644 --- a/wakuv2/config.go +++ b/wakuv2/config.go @@ -66,6 +66,7 @@ type Config struct { SkipPublishToTopic bool `toml:",omitempty"` // Used in testing EnableMissingMessageVerification bool `toml:",omitempty"` EnableStoreConfirmationForMessagesSent bool `toml:",omitempty"` //Flag that enables checking with store node for sent message confimration + UseThrottledPublish bool `toml:",omitempty"` // Flag that indicates whether a rate limited priority queue will be used to send messages or not } func (c *Config) Validate(logger *zap.Logger) error { diff --git a/wakuv2/filter_manager.go b/wakuv2/filter_manager.go index 892f20391..ad0d1df06 100644 --- a/wakuv2/filter_manager.go +++ b/wakuv2/filter_manager.go @@ -12,7 +12,7 @@ import ( "go.uber.org/zap" "golang.org/x/exp/maps" - "github.com/waku-org/go-waku/waku/v2/api" + api "github.com/waku-org/go-waku/waku/v2/api/filter" "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/protocol" "github.com/waku-org/go-waku/waku/v2/protocol/filter" diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go new file mode 100644 index 000000000..05fcce883 --- /dev/null +++ b/wakuv2/message_publishing.go @@ -0,0 +1,151 @@ +package wakuv2 + +import ( + "errors" + + "go.uber.org/zap" + + gethcommon "github.com/ethereum/go-ethereum/common" + "github.com/status-im/status-go/wakuv2/common" + "github.com/waku-org/go-waku/waku/v2/api/publish" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" +) + +type PublishMethod int + +const ( + LightPush PublishMethod = iota + Relay +) + +func (pm PublishMethod) String() string { + switch pm { + case LightPush: + return "LightPush" + case Relay: + return "Relay" + default: + return "Unknown" + } +} + +// Send injects a message into the waku send queue, to be distributed in the +// network in the coming cycles. +func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]byte, error) { + pubsubTopic = w.GetPubsubTopic(pubsubTopic) + if w.protectedTopicStore != nil { + privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic) + if err != nil { + return nil, err + } + + if privKey != nil { + err = relay.SignMessage(privKey, msg, pubsubTopic) + if err != nil { + return nil, err + } + } + } + + envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), pubsubTopic) + + if priority != nil { + w.sendQueue.Push(envelope, *priority) + } else { + w.sendQueue.Push(envelope) + } + + w.poolMu.Lock() + alreadyCached := w.envelopeCache.Has(gethcommon.BytesToHash(envelope.Hash().Bytes())) + w.poolMu.Unlock() + if !alreadyCached { + recvMessage := common.NewReceivedMessage(envelope, common.SendMessageType) + w.postEvent(recvMessage) // notify the local node about the new message + w.addEnvelope(recvMessage) + } + + return envelope.Hash().Bytes(), nil +} + +func (w *Waku) broadcast() { + for { + var envelope *protocol.Envelope + + select { + case envelope = <-w.sendQueue.Pop(): + + case <-w.ctx.Done(): + return + } + + logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp())) + + var fn publish.PublishFn + var publishMethod PublishMethod + + if w.cfg.SkipPublishToTopic { + // For now only used in testing to simulate going offline + publishMethod = LightPush + fn = func(env *protocol.Envelope, logger *zap.Logger) error { + return errors.New("test send failure") + } + } else if w.cfg.LightClient { + publishMethod = LightPush + fn = func(env *protocol.Envelope, logger *zap.Logger) error { + logger.Info("publishing message via lightpush") + _, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(peersToPublishForLightpush)) + return err + } + } else { + publishMethod = Relay + fn = func(env *protocol.Envelope, logger *zap.Logger) error { + peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic())) + logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt)) + _, err := w.node.Relay().Publish(w.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic())) + return err + } + } + + fn = w.limiter.ThrottlePublishFn(w.ctx, fn) + + // Wraps the publish function with a call to the telemetry client + if w.statusTelemetryClient != nil { + sendFn := fn + fn = func(env *protocol.Envelope, logger *zap.Logger) error { + err := sendFn(env, logger) + if err == nil { + w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod}) + } else { + w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}}) + } + return err + } + } + + w.wg.Add(1) + go w.publishEnvelope(envelope, fn, logger) + } +} + +func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publish.PublishFn, logger *zap.Logger) { + defer w.wg.Done() + + if err := publishFn(envelope, logger); err != nil { + logger.Error("could not send message", zap.Error(err)) + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), + Event: common.EventEnvelopeExpired, + }) + return + } else { + if !w.cfg.EnableStoreConfirmationForMessagesSent { + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), + Event: common.EventEnvelopeSent, + }) + } + } +} diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 9f6a83d19..97ce3506e 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -31,6 +31,7 @@ import ( "runtime" "strings" "sync" + "testing" "time" "github.com/jellydator/ttlcache/v3" @@ -42,6 +43,7 @@ import ( mapset "github.com/deckarep/golang-set" "golang.org/x/crypto/pbkdf2" + "golang.org/x/time/rate" gethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" @@ -55,6 +57,7 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/metrics" + "github.com/waku-org/go-waku/waku/v2/api/publish" "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/onlinechecker" "github.com/waku-org/go-waku/waku/v2/peermanager" @@ -90,7 +93,9 @@ const messageExpiredPerid = 10 // in seconds const maxRelayPeers = 300 const randomPeersKeepAliveInterval = 5 * time.Second const allPeersKeepAliveInterval = 5 * time.Minute -const PeersToPublishForLightpush = 2 +const peersToPublishForLightpush = 2 +const publishingLimiterRate = rate.Limit(2) +const publishingLimitBurst = 4 type SentEnvelope struct { Envelope *protocol.Envelope @@ -133,8 +138,11 @@ type Waku struct { bandwidthCounter *metrics.BandwidthCounter protectedTopicStore *persistence.ProtectedTopicsStore - sendQueue chan *protocol.Envelope - msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded + + sendQueue *publish.MessageQueue + limiter *publish.PublishRateLimiter + + msgQueue chan *common.ReceivedMessage // Message queue for waku messages that havent been decoded topicInterest map[string]TopicInterest // Track message verification requests and when was the last time a pubsub topic was verified for missing messages topicInterestMu sync.Mutex @@ -227,7 +235,6 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge envelopeCache: newTTLCache(), expirations: make(map[uint32]mapset.Set), msgQueue: make(chan *common.ReceivedMessage, messageQueueLimit), - sendQueue: make(chan *protocol.Envelope, 1000), topicHealthStatusChan: make(chan peermanager.TopicHealthStatus, 100), connectionNotifChan: make(chan node.PeerConnection, 20), connStatusSubscriptions: make(map[string]*types.ConnStatusSubscription), @@ -247,6 +254,16 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, onPeerStats: onPeerStats, onlineChecker: onlinechecker.NewDefaultOnlineChecker(false).(*onlinechecker.DefaultOnlineChecker), + sendQueue: publish.NewMessageQueue(1000, cfg.UseThrottledPublish), + } + + if !cfg.UseThrottledPublish || testing.Testing() { + // To avoid delaying the tests, or for when we dont want to rate limit, we set up an infinite rate limiter, + // basically disabling the rate limit functionality + waku.limiter = publish.NewPublishRateLimiter(rate.Inf, 1) + + } else { + waku.limiter = publish.NewPublishRateLimiter(publishingLimiterRate, publishingLimitBurst) } waku.filters = common.NewFilters(waku.cfg.DefaultShardPubsubTopic, waku.logger) @@ -979,76 +996,6 @@ func (w *Waku) SkipPublishToTopic(value bool) { w.cfg.SkipPublishToTopic = value } -type PublishMethod int - -const ( - LightPush PublishMethod = iota - Relay -) - -func (pm PublishMethod) String() string { - switch pm { - case LightPush: - return "LightPush" - case Relay: - return "Relay" - default: - return "Unknown" - } -} - -func (w *Waku) broadcast() { - for { - select { - case envelope := <-w.sendQueue: - logger := w.logger.With(zap.Stringer("envelopeHash", envelope.Hash()), zap.String("pubsubTopic", envelope.PubsubTopic()), zap.String("contentTopic", envelope.Message().ContentTopic), zap.Int64("timestamp", envelope.Message().GetTimestamp())) - var fn publishFn - var publishMethod PublishMethod - if w.cfg.SkipPublishToTopic { - // For now only used in testing to simulate going offline - publishMethod = LightPush - fn = func(env *protocol.Envelope, logger *zap.Logger) error { - return errors.New("test send failure") - } - } else if w.cfg.LightClient { - publishMethod = LightPush - fn = func(env *protocol.Envelope, logger *zap.Logger) error { - logger.Info("publishing message via lightpush") - _, err := w.node.Lightpush().Publish(w.ctx, env.Message(), lightpush.WithPubSubTopic(env.PubsubTopic()), lightpush.WithMaxPeers(PeersToPublishForLightpush)) - return err - } - } else { - publishMethod = Relay - fn = func(env *protocol.Envelope, logger *zap.Logger) error { - peerCnt := len(w.node.Relay().PubSub().ListPeers(env.PubsubTopic())) - logger.Info("publishing message via relay", zap.Int("peerCnt", peerCnt)) - _, err := w.node.Relay().Publish(w.ctx, env.Message(), relay.WithPubSubTopic(env.PubsubTopic())) - return err - } - } - - // Wraps the publish function with a call to the telemetry client - if w.statusTelemetryClient != nil { - sendFn := fn - fn = func(env *protocol.Envelope, logger *zap.Logger) error { - err := sendFn(env, logger) - if err == nil { - w.statusTelemetryClient.PushSentEnvelope(SentEnvelope{Envelope: env, PublishMethod: publishMethod}) - } else { - w.statusTelemetryClient.PushErrorSendingEnvelope(ErrorSendingEnvelope{Error: err, SentEnvelope: SentEnvelope{Envelope: env, PublishMethod: publishMethod}}) - } - return err - } - } - - w.wg.Add(1) - go w.publishEnvelope(envelope, fn, logger) - case <-w.ctx.Done(): - return - } - } -} - func (w *Waku) checkIfMessagesStored() { if !w.cfg.EnableStoreConfirmationForMessagesSent { return @@ -1139,62 +1086,6 @@ func (w *Waku) SetStorePeerID(peerID peer.ID) { w.storePeerID = peerID } -type publishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error - -func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn, logger *zap.Logger) { - defer w.wg.Done() - - if err := publishFn(envelope, logger); err != nil { - logger.Error("could not send message", zap.Error(err)) - w.SendEnvelopeEvent(common.EnvelopeEvent{ - Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), - Event: common.EventEnvelopeExpired, - }) - return - } else { - if !w.cfg.EnableStoreConfirmationForMessagesSent { - w.SendEnvelopeEvent(common.EnvelopeEvent{ - Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), - Event: common.EventEnvelopeSent, - }) - } - } -} - -// Send injects a message into the waku send queue, to be distributed in the -// network in the coming cycles. -func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) { - pubsubTopic = w.GetPubsubTopic(pubsubTopic) - if w.protectedTopicStore != nil { - privKey, err := w.protectedTopicStore.FetchPrivateKey(pubsubTopic) - if err != nil { - return nil, err - } - - if privKey != nil { - err = relay.SignMessage(privKey, msg, pubsubTopic) - if err != nil { - return nil, err - } - } - } - - envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), pubsubTopic) - - w.sendQueue <- envelope - - w.poolMu.Lock() - alreadyCached := w.envelopeCache.Has(gethcommon.BytesToHash(envelope.Hash().Bytes())) - w.poolMu.Unlock() - if !alreadyCached { - recvMessage := common.NewReceivedMessage(envelope, common.SendMessageType) - w.postEvent(recvMessage) // notify the local node about the new message - w.addEnvelope(recvMessage) - } - - return envelope.Hash().Bytes(), nil -} - // ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Hash, relayTime []uint32, pubsubTopic string) []gethcommon.Hash { selectedPeer := w.storePeerID @@ -1425,6 +1316,8 @@ func (w *Waku) Start() error { go w.broadcast() + go w.sendQueue.Start(w.ctx) + go w.checkIfMessagesStored() // we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()` diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 8c30487ac..0e9648868 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -219,7 +219,7 @@ func TestBasicWakuV2(t *testing.T) { ContentTopic: contentTopic.ContentTopic(), Version: proto.Uint32(0), Timestamp: &msgTimestamp, - }) + }, nil) require.NoError(t, err) @@ -416,7 +416,7 @@ func TestWakuV2Filter(t *testing.T) { ContentTopic: contentTopic.ContentTopic(), Version: proto.Uint32(0), Timestamp: &msgTimestamp, - }) + }, nil) require.NoError(t, err) time.Sleep(5 * time.Second) @@ -443,7 +443,7 @@ func TestWakuV2Filter(t *testing.T) { ContentTopic: contentTopic.ContentTopic(), Version: proto.Uint32(0), Timestamp: &msgTimestamp, - }) + }, nil) require.NoError(t, err) time.Sleep(10 * time.Second) @@ -531,7 +531,7 @@ func TestWakuV2Store(t *testing.T) { ContentTopic: contentTopic.ContentTopic(), Version: proto.Uint32(0), Timestamp: &msgTimestamp, - }) + }, nil) require.NoError(t, err) waitForEnvelope(t, contentTopic.ContentTopic(), w2EnvelopeCh) @@ -632,7 +632,7 @@ func TestConfirmMessageDelivered(t *testing.T) { Version: proto.Uint32(0), Timestamp: &msgTimestamp, Ephemeral: proto.Bool(false), - }) + }, nil) require.NoError(t, err) time.Sleep(1 * time.Second) @@ -782,7 +782,7 @@ func TestLightpushRateLimit(t *testing.T) { ContentTopic: maps.Keys(contentTopics)[0].ContentTopic(), Version: proto.Uint32(0), Timestamp: &msgTimestamp, - }) + }, nil) require.NoError(t, err)