diff --git a/go.mod b/go.mod index e428482ef..178c86e14 100644 --- a/go.mod +++ b/go.mod @@ -96,7 +96,7 @@ require ( github.com/schollz/peerdiscovery v1.7.0 github.com/siphiuel/lc-proxy-wrapper v0.0.0-20230516150924-246507cee8c7 github.com/urfave/cli/v2 v2.27.2 - github.com/waku-org/go-waku v0.8.1-0.20240806122111-5aa11311f833 + github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9 github.com/wk8/go-ordered-map/v2 v2.1.7 github.com/yeqown/go-qrcode/v2 v2.2.1 github.com/yeqown/go-qrcode/writer/standard v1.2.1 diff --git a/go.sum b/go.sum index 98f51508a..feea5a45b 100644 --- a/go.sum +++ b/go.sum @@ -2143,8 +2143,8 @@ github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5 h1:9u16E github.com/waku-org/go-libp2p-pubsub v0.0.0-20240703191659-2cbb09eac9b5/go.mod h1:QEb+hEV9WL9wCiUAnpY29FZR6W3zK8qYlaml8R4q6gQ= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0 h1:R4YYx2QamhBRl/moIxkDCNW+OP7AHbyWLBygDc/xIMo= github.com/waku-org/go-libp2p-rendezvous v0.0.0-20240110193335-a67d1cc760a0/go.mod h1:EhZP9fee0DYjKH/IOQvoNSy1tSHp2iZadsHGphcAJgY= -github.com/waku-org/go-waku v0.8.1-0.20240806122111-5aa11311f833 h1:ywaQQJ4WASimv8Y6ut7xhkBYMXyRZQCEw64CFPJJCbQ= -github.com/waku-org/go-waku v0.8.1-0.20240806122111-5aa11311f833/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= +github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9 h1:aTOUQm0kKtHiqraFpqj1Ja++C+qyZyeiSPKtXe3Ctac= +github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9/go.mod h1:VNbVmh5UYg3vIvhGV4hCw8QEykq3RScDACo2Y2dIFfg= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59 h1:jisj+OCI6QydLtFq3Pyhu49wl9ytPN7oAHjMfepHDrA= github.com/waku-org/go-zerokit-rln v0.1.14-0.20240102145250-fa738c0bdf59/go.mod h1:1PdBdPzyTaKt3VnpAHk3zj+r9dXPFOr3IHZP9nFle6E= github.com/waku-org/go-zerokit-rln-apple v0.0.0-20230916172309-ee0ee61dde2b h1:KgZVhsLkxsj5gb/FfndSCQu6VYwALrCOgYI3poR95yE= diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go new file mode 100644 index 000000000..ef5148bdb --- /dev/null +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_check.go @@ -0,0 +1,248 @@ +package publish + +import ( + "bytes" + "context" + "sync" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/libp2p/go-libp2p/core/peer" + "github.com/waku-org/go-waku/waku/v2/protocol" + "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/store" + "github.com/waku-org/go-waku/waku/v2/timesource" + "go.uber.org/zap" +) + +const DefaultMaxHashQueryLength = 100 +const DefaultHashQueryInterval = 3 * time.Second +const DefaultMessageSentPeriod = 3 // in seconds +const DefaultMessageExpiredPerid = 10 // in seconds + +type MessageSentCheckOption func(*MessageSentCheck) error + +// MessageSentCheck tracks the outgoing messages and check against store node +// if the message sent time has passed the `messageSentPeriod`, the message id will be includes for the next query +// if the message keeps missing after `messageExpiredPerid`, the message id will be expired +type MessageSentCheck struct { + messageIDs map[string]map[common.Hash]uint32 + messageIDsMu sync.RWMutex + storePeerID peer.ID + MessageStoredChan chan common.Hash + MessageExpiredChan chan common.Hash + ctx context.Context + store *store.WakuStore + timesource timesource.Timesource + logger *zap.Logger + maxHashQueryLength uint64 + hashQueryInterval time.Duration + messageSentPeriod uint32 + messageExpiredPerid uint32 +} + +// NewMessageSentCheck creates a new instance of MessageSentCheck with default parameters +func NewMessageSentCheck(ctx context.Context, store *store.WakuStore, timesource timesource.Timesource, logger *zap.Logger) *MessageSentCheck { + return &MessageSentCheck{ + messageIDs: make(map[string]map[common.Hash]uint32), + messageIDsMu: sync.RWMutex{}, + MessageStoredChan: make(chan common.Hash, 1000), + MessageExpiredChan: make(chan common.Hash, 1000), + ctx: ctx, + store: store, + timesource: timesource, + logger: logger, + maxHashQueryLength: DefaultMaxHashQueryLength, + hashQueryInterval: DefaultHashQueryInterval, + messageSentPeriod: DefaultMessageSentPeriod, + messageExpiredPerid: DefaultMessageExpiredPerid, + } +} + +// WithMaxHashQueryLength sets the maximum number of message hashes to query in one request +func WithMaxHashQueryLength(count uint64) MessageSentCheckOption { + return func(params *MessageSentCheck) error { + params.maxHashQueryLength = count + return nil + } +} + +// WithHashQueryInterval sets the interval to query the store node +func WithHashQueryInterval(interval time.Duration) MessageSentCheckOption { + return func(params *MessageSentCheck) error { + params.hashQueryInterval = interval + return nil + } +} + +// WithMessageSentPeriod sets the delay period to query the store node after message is published +func WithMessageSentPeriod(period uint32) MessageSentCheckOption { + return func(params *MessageSentCheck) error { + params.messageSentPeriod = period + return nil + } +} + +// WithMessageExpiredPerid sets the period that a message is considered expired +func WithMessageExpiredPerid(period uint32) MessageSentCheckOption { + return func(params *MessageSentCheck) error { + params.messageExpiredPerid = period + return nil + } +} + +// Add adds a message for message sent check +func (m *MessageSentCheck) Add(topic string, messageID common.Hash, sentTime uint32) { + m.messageIDsMu.Lock() + defer m.messageIDsMu.Unlock() + + if _, ok := m.messageIDs[topic]; !ok { + m.messageIDs[topic] = make(map[common.Hash]uint32) + } + m.messageIDs[topic][messageID] = sentTime +} + +// DeleteByMessageIDs deletes the message ids from the message sent check, used by scenarios like message acked with MVDS +func (m *MessageSentCheck) DeleteByMessageIDs(messageIDs []common.Hash) { + m.messageIDsMu.Lock() + defer m.messageIDsMu.Unlock() + + for pubsubTopic, subMsgs := range m.messageIDs { + for _, hash := range messageIDs { + delete(subMsgs, hash) + if len(subMsgs) == 0 { + delete(m.messageIDs, pubsubTopic) + } else { + m.messageIDs[pubsubTopic] = subMsgs + } + } + } +} + +// SetStorePeerID sets the peer id of store node +func (m *MessageSentCheck) SetStorePeerID(peerID peer.ID) { + m.storePeerID = peerID +} + +// CheckIfMessagesStored checks if the tracked outgoing messages are stored periodically +func (m *MessageSentCheck) CheckIfMessagesStored() { + ticker := time.NewTicker(m.hashQueryInterval) + defer ticker.Stop() + for { + select { + case <-m.ctx.Done(): + m.logger.Debug("stop the look for message stored check") + return + case <-ticker.C: + m.messageIDsMu.Lock() + m.logger.Debug("running loop for messages stored check", zap.Any("messageIds", m.messageIDs)) + pubsubTopics := make([]string, 0, len(m.messageIDs)) + pubsubMessageIds := make([][]common.Hash, 0, len(m.messageIDs)) + pubsubMessageTime := make([][]uint32, 0, len(m.messageIDs)) + for pubsubTopic, subMsgs := range m.messageIDs { + var queryMsgIds []common.Hash + var queryMsgTime []uint32 + for msgID, sendTime := range subMsgs { + if uint64(len(queryMsgIds)) >= m.maxHashQueryLength { + break + } + // message is sent 5 seconds ago, check if it's stored + if uint32(m.timesource.Now().Unix()) > sendTime+m.messageSentPeriod { + queryMsgIds = append(queryMsgIds, msgID) + queryMsgTime = append(queryMsgTime, sendTime) + } + } + m.logger.Debug("store query for message hashes", zap.Any("queryMsgIds", queryMsgIds), zap.String("pubsubTopic", pubsubTopic)) + if len(queryMsgIds) > 0 { + pubsubTopics = append(pubsubTopics, pubsubTopic) + pubsubMessageIds = append(pubsubMessageIds, queryMsgIds) + pubsubMessageTime = append(pubsubMessageTime, queryMsgTime) + } + } + m.messageIDsMu.Unlock() + + pubsubProcessedMessages := make([][]common.Hash, len(pubsubTopics)) + for i, pubsubTopic := range pubsubTopics { + processedMessages := m.messageHashBasedQuery(m.ctx, pubsubMessageIds[i], pubsubMessageTime[i], pubsubTopic) + pubsubProcessedMessages[i] = processedMessages + } + + m.messageIDsMu.Lock() + for i, pubsubTopic := range pubsubTopics { + subMsgs, ok := m.messageIDs[pubsubTopic] + if !ok { + continue + } + for _, hash := range pubsubProcessedMessages[i] { + delete(subMsgs, hash) + if len(subMsgs) == 0 { + delete(m.messageIDs, pubsubTopic) + } else { + m.messageIDs[pubsubTopic] = subMsgs + } + } + } + m.logger.Debug("messages for next store hash query", zap.Any("messageIds", m.messageIDs)) + m.messageIDsMu.Unlock() + + } + } +} + +func (m *MessageSentCheck) messageHashBasedQuery(ctx context.Context, hashes []common.Hash, relayTime []uint32, pubsubTopic string) []common.Hash { + selectedPeer := m.storePeerID + if selectedPeer == "" { + m.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic)) + return []common.Hash{} + } + + var opts []store.RequestOption + requestID := protocol.GenerateRequestID() + opts = append(opts, store.WithRequestID(requestID)) + opts = append(opts, store.WithPeer(selectedPeer)) + opts = append(opts, store.WithPaging(false, m.maxHashQueryLength)) + opts = append(opts, store.IncludeData(false)) + + messageHashes := make([]pb.MessageHash, len(hashes)) + for i, hash := range hashes { + messageHashes[i] = pb.ToMessageHash(hash.Bytes()) + } + + m.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Any("messageHashes", messageHashes)) + + result, err := m.store.QueryByHash(ctx, messageHashes, opts...) + if err != nil { + m.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err)) + return []common.Hash{} + } + + m.logger.Debug("store.queryByHash result", zap.String("requestID", hexutil.Encode(requestID)), zap.Int("messages", len(result.Messages()))) + + var ackHashes []common.Hash + var missedHashes []common.Hash + for i, hash := range hashes { + found := false + for _, msg := range result.Messages() { + if bytes.Equal(msg.GetMessageHash(), hash.Bytes()) { + found = true + break + } + } + + if found { + ackHashes = append(ackHashes, hash) + m.MessageStoredChan <- hash + } + + if !found && uint32(m.timesource.Now().Unix()) > relayTime[i]+m.messageExpiredPerid { + missedHashes = append(missedHashes, hash) + m.MessageExpiredChan <- hash + } + } + + m.logger.Debug("ack message hashes", zap.Any("ackHashes", ackHashes)) + m.logger.Debug("missed message hashes", zap.Any("missedHashes", missedHashes)) + + return append(ackHashes, missedHashes...) +} diff --git a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go index fbd79df88..ad153ff9c 100644 --- a/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go +++ b/vendor/github.com/waku-org/go-waku/waku/v2/api/publish/message_queue.go @@ -103,12 +103,6 @@ func (m *MessageQueue) Start(ctx context.Context) { m.envelopeAvailableOnPriorityQueueSignal <- struct{}{} case <-ctx.Done(): - if m.usePriorityQueue { - close(m.throttledPrioritySendQueue) - close(m.envelopeAvailableOnPriorityQueueSignal) - } else { - close(m.toSendChan) - } return } } @@ -116,27 +110,43 @@ func (m *MessageQueue) Start(ctx context.Context) { // Push an envelope into the message queue. The priority is optional, and will be ignored // if the message queue does not use a priority queue -func (m *MessageQueue) Push(envelope *protocol.Envelope, priority ...MessagePriority) { +func (m *MessageQueue) Push(ctx context.Context, envelope *protocol.Envelope, priority ...MessagePriority) error { if m.usePriorityQueue { msgPriority := NormalPriority if len(priority) != 0 { msgPriority = priority[0] } - m.throttledPrioritySendQueue <- &envelopePriority{ + pEnvelope := &envelopePriority{ envelope: envelope, priority: msgPriority, } + + select { + case m.throttledPrioritySendQueue <- pEnvelope: + // Do nothing + case <-ctx.Done(): + return ctx.Err() + } } else { - m.toSendChan <- envelope + select { + case m.toSendChan <- envelope: + // Do nothing + case <-ctx.Done(): + return ctx.Err() + } } + + return nil } // Pop will return a channel on which a message can be retrieved from the message queue -func (m *MessageQueue) Pop() <-chan *protocol.Envelope { +func (m *MessageQueue) Pop(ctx context.Context) <-chan *protocol.Envelope { ch := make(chan *protocol.Envelope) go func() { + defer close(ch) + select { case _, ok := <-m.envelopeAvailableOnPriorityQueueSignal: if ok { @@ -147,9 +157,11 @@ func (m *MessageQueue) Pop() <-chan *protocol.Envelope { if ok { ch <- envelope } + + case <-ctx.Done(): + return } - close(ch) }() return ch diff --git a/vendor/modules.txt b/vendor/modules.txt index fd31c9e36..bd6c0fafc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1018,7 +1018,7 @@ github.com/waku-org/go-discover/discover/v5wire github.com/waku-org/go-libp2p-rendezvous github.com/waku-org/go-libp2p-rendezvous/db github.com/waku-org/go-libp2p-rendezvous/pb -# github.com/waku-org/go-waku v0.8.1-0.20240806122111-5aa11311f833 +# github.com/waku-org/go-waku v0.8.1-0.20240806200653-4f1d692413e9 ## explicit; go 1.21 github.com/waku-org/go-waku/logging github.com/waku-org/go-waku/tests diff --git a/wakuv2/message_publishing.go b/wakuv2/message_publishing.go index 0e32d8ee0..2d16f6f23 100644 --- a/wakuv2/message_publishing.go +++ b/wakuv2/message_publishing.go @@ -53,9 +53,15 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage, priority *int) ([]b envelope := protocol.NewEnvelope(msg, msg.GetTimestamp(), pubsubTopic) if priority != nil { - w.sendQueue.Push(envelope, *priority) + err := w.sendQueue.Push(w.ctx, envelope, *priority) + if err != nil { + return nil, err + } } else { - w.sendQueue.Push(envelope) + err := w.sendQueue.Push(w.ctx, envelope) + if err != nil { + return nil, err + } } w.poolMu.Lock() @@ -75,7 +81,7 @@ func (w *Waku) broadcast() { var envelope *protocol.Envelope select { - case envelope = <-w.sendQueue.Pop(): + case envelope = <-w.sendQueue.Pop(w.ctx): case <-w.ctx.Done(): return