From 47899fd0456826f8da79e5cdbe5bebe490742c88 Mon Sep 17 00:00:00 2001 From: kaichao Date: Tue, 11 Jun 2024 15:45:01 +0800 Subject: [PATCH] feat_: hash based query for outgoing messages. (#5217) * feat_: hash based query for outgoing messages. * chore_: more logs * chore_: fix comments * chore_: do not lock when send queries * chore_: use constant for magic number * chore_: remove message ids from query queue after ack * chore_: fix ack clean process * chore_: fix message resend test * chore_: add test for waku confirm message sent. * chore_: fix tests. * chore_: fix more * chore_: set store peer id when mailserver updates * fix_: tests * chore_: increase max hash query length * chore_: remove debug log of ack message * chore_: remove automatic peer selection * chore_: mark raw message to sent after ack * chore_: fix test * chore_: fix test --- api/messenger_raw_message_resend_test.go | 7 +- eth-node/bridge/geth/waku.go | 6 + eth-node/bridge/geth/wakuv2.go | 8 + eth-node/types/waku.go | 6 + protocol/common/raw_messages_persistence.go | 9 +- .../common/raw_messages_persistence_test.go | 61 ++++-- protocol/messenger_mailserver_cycle.go | 5 + protocol/messenger_messages_tracking_test.go | 2 +- protocol/messenger_peers.go | 2 +- protocol/messenger_peersyncing.go | 9 + protocol/messenger_raw_message_resend.go | 8 +- protocol/transport/envelopes_monitor.go | 6 +- protocol/transport/transport.go | 19 ++ wakuv2/common/message.go | 3 +- wakuv2/waku.go | 189 ++++++++++++++++-- wakuv2/waku_test.go | 60 ++++++ 16 files changed, 358 insertions(+), 42 deletions(-) diff --git a/api/messenger_raw_message_resend_test.go b/api/messenger_raw_message_resend_test.go index 2480f30e0..7f03fd79b 100644 --- a/api/messenger_raw_message_resend_test.go +++ b/api/messenger_raw_message_resend_test.go @@ -211,7 +211,7 @@ func (s *MessengerRawMessageResendTest) TestMessageSent() { rawMessage, err := s.bobMessenger.RawMessageByID(ids[0]) s.Require().NoError(err) s.Require().NotNil(rawMessage) - if rawMessage.Sent { + if rawMessage.SendCount > 0 { return nil } return errors.New("raw message should be sent finally") @@ -227,12 +227,13 @@ func (s *MessengerRawMessageResendTest) TestMessageResend() { rawMessage, err := s.bobMessenger.RawMessageByID(ids[0]) s.Require().NoError(err) s.Require().NotNil(rawMessage) - s.Require().NoError(s.bobMessenger.UpdateRawMessageSent(rawMessage.ID, false, 0)) + s.Require().NoError(s.bobMessenger.UpdateRawMessageSent(rawMessage.ID, false)) + s.Require().NoError(s.bobMessenger.UpdateRawMessageLastSent(rawMessage.ID, 0)) err = tt.RetryWithBackOff(func() error { rawMessage, err := s.bobMessenger.RawMessageByID(ids[0]) s.Require().NoError(err) s.Require().NotNil(rawMessage) - if !rawMessage.Sent { + if rawMessage.SendCount < 2 { return errors.New("message ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN was not resent yet") } return nil diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 42a222d88..aa411ffda 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -314,3 +314,9 @@ func GetWakuFilterFrom(f types.Filter) *wakucommon.Filter { func (w *wakuFilterWrapper) ID() string { return w.id } + +func (w *GethWakuWrapper) ConfirmMessageDelivered(hashes []common.Hash) { +} + +func (w *GethWakuWrapper) SetStorePeerID(peerID peer.ID) { +} diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 283c7a6a9..2d3d4bf04 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -333,3 +333,11 @@ func GetWakuV2FilterFrom(f types.Filter) *wakucommon.Filter { func (w *wakuV2FilterWrapper) ID() string { return w.id } + +func (w *gethWakuV2Wrapper) ConfirmMessageDelivered(hashes []common.Hash) { + w.waku.ConfirmMessageDelivered(hashes) +} + +func (w *gethWakuV2Wrapper) SetStorePeerID(peerID peer.ID) { + w.waku.SetStorePeerID(peerID) +} diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index eba3cc0bf..31f90a629 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -178,4 +178,10 @@ type Waku interface { // ClearEnvelopesCache clears waku envelopes cache ClearEnvelopesCache() + + // ConfirmMessageDelivered updates a message has been delivered in waku + ConfirmMessageDelivered(hash []common.Hash) + + // SetStorePeerID updates the peer id of store node + SetStorePeerID(peerID peer.ID) } diff --git a/protocol/common/raw_messages_persistence.go b/protocol/common/raw_messages_persistence.go index 69ff7689b..ac4b534e3 100644 --- a/protocol/common/raw_messages_persistence.go +++ b/protocol/common/raw_messages_persistence.go @@ -488,7 +488,12 @@ func (db *RawMessagesPersistence) RemoveMessageSegmentsCompletedOlderThan(timest return err } -func (db RawMessagesPersistence) UpdateRawMessageSent(id string, sent bool, lastSent uint64) error { - _, err := db.db.Exec("UPDATE raw_messages SET sent = ?, last_sent = ? WHERE id = ?", sent, lastSent, id) +func (db RawMessagesPersistence) UpdateRawMessageSent(id string, sent bool) error { + _, err := db.db.Exec("UPDATE raw_messages SET sent = ? WHERE id = ?", sent, id) + return err +} + +func (db RawMessagesPersistence) UpdateRawMessageLastSent(id string, lastSent uint64) error { + _, err := db.db.Exec("UPDATE raw_messages SET last_sent = ? WHERE id = ?", lastSent, id) return err } diff --git a/protocol/common/raw_messages_persistence_test.go b/protocol/common/raw_messages_persistence_test.go index eb93b54b9..563758855 100644 --- a/protocol/common/raw_messages_persistence_test.go +++ b/protocol/common/raw_messages_persistence_test.go @@ -54,7 +54,50 @@ func TestUpdateRawMessageSent(t *testing.T) { require.NoError(t, err) rawMessageID := "1" - err = p.SaveRawMessage(&RawMessage{ + err = p.SaveRawMessage(buildRawMessage(rawMessageID, pk)) + require.NoError(t, err) + + rawMessage, err := p.RawMessageByID(rawMessageID) + require.NoError(t, err) + require.True(t, rawMessage.Sent) + require.Greater(t, rawMessage.LastSent, uint64(0)) + + err = p.UpdateRawMessageSent(rawMessageID, false) + require.NoError(t, err) + + m, err := p.RawMessageByID(rawMessageID) + require.NoError(t, err) + require.False(t, m.Sent) +} + +func TestUpdateRawMessageLastSent(t *testing.T) { + db, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + require.NoError(t, err) + require.NoError(t, sqlite.Migrate(db)) + p := NewRawMessagesPersistence(db) + + pk, err := crypto.GenerateKey() + require.NoError(t, err) + + rawMessageID := "1" + err = p.SaveRawMessage(buildRawMessage(rawMessageID, pk)) + require.NoError(t, err) + + rawMessage, err := p.RawMessageByID(rawMessageID) + require.NoError(t, err) + require.True(t, rawMessage.Sent) + require.Greater(t, rawMessage.LastSent, uint64(0)) + + err = p.UpdateRawMessageLastSent(rawMessageID, 0) + require.NoError(t, err) + + m, err := p.RawMessageByID(rawMessageID) + require.NoError(t, err) + require.Equal(t, m.LastSent, uint64(0)) +} + +func buildRawMessage(rawMessageID string, pk *ecdsa.PrivateKey) *RawMessage { + return &RawMessage{ ID: rawMessageID, ResendType: ResendTypeRawMessage, LocalChatID: "", @@ -65,19 +108,5 @@ func TestUpdateRawMessageSent(t *testing.T) { Recipients: []*ecdsa.PublicKey{pk.Public().(*ecdsa.PublicKey)}, Sent: true, LastSent: uint64(time.Now().UnixNano() / int64(time.Millisecond)), - }) - require.NoError(t, err) - - rawMessage, err := p.RawMessageByID(rawMessageID) - require.NoError(t, err) - require.True(t, rawMessage.Sent) - require.Greater(t, rawMessage.LastSent, uint64(0)) - - err = p.UpdateRawMessageSent(rawMessageID, false, 0) - require.NoError(t, err) - - m, err := p.RawMessageByID(rawMessageID) - require.NoError(t, err) - require.False(t, m.Sent) - require.Equal(t, m.LastSent, uint64(0)) + } } diff --git a/protocol/messenger_mailserver_cycle.go b/protocol/messenger_mailserver_cycle.go index f3423e438..bbbd6a605 100644 --- a/protocol/messenger_mailserver_cycle.go +++ b/protocol/messenger_mailserver_cycle.go @@ -426,6 +426,11 @@ func (m *Messenger) connectToMailserver(ms mailservers.Mailserver) error { m.logger.Info("mailserver available", zap.String("address", m.mailserverCycle.activeMailserver.UniqueID())) m.EmitMailserverAvailable() signal.SendMailserverAvailable(m.mailserverCycle.activeMailserver.Address, m.mailserverCycle.activeMailserver.ID) + peerID, err := m.mailserverCycle.activeMailserver.PeerID() + if err != nil { + m.logger.Error("could not decode the peer id of mailserver", zap.Error(err)) + } + m.transport.SetStorePeerID(peerID) // Query mailserver if m.config.codeControlFlags.AutoRequestHistoricMessages { diff --git a/protocol/messenger_messages_tracking_test.go b/protocol/messenger_messages_tracking_test.go index c0a51b7dc..8825e1b9a 100644 --- a/protocol/messenger_messages_tracking_test.go +++ b/protocol/messenger_messages_tracking_test.go @@ -172,7 +172,7 @@ func (s *MessengerMessagesTrackingSuite) testMessageMarkedAsSent(textSize int) { // Message should be marked as sent eventually err = tt.RetryWithBackOff(func() error { rawMessage, err = s.bob.persistence.RawMessageByID(inputMessage.ID) - if err != nil || !rawMessage.Sent { + if err != nil || rawMessage.SendCount < 1 { return errors.New("message not marked as sent") } return nil diff --git a/protocol/messenger_peers.go b/protocol/messenger_peers.go index 59933ff4a..1f5b28eba 100644 --- a/protocol/messenger_peers.go +++ b/protocol/messenger_peers.go @@ -13,7 +13,7 @@ func (m *Messenger) AddStorePeer(address string) (peer.ID, error) { } func (m *Messenger) AddRelayPeer(address string) (peer.ID, error) { - return m.transport.AddStorePeer(address) + return m.transport.AddRelayPeer(address) } func (m *Messenger) DialPeer(address string) error { diff --git a/protocol/messenger_peersyncing.go b/protocol/messenger_peersyncing.go index 2495f14b7..88a08d5ec 100644 --- a/protocol/messenger_peersyncing.go +++ b/protocol/messenger_peersyncing.go @@ -38,11 +38,20 @@ func (m *Messenger) markDeliveredMessages(acks [][]byte) { messageID := messageIDBytes.String() //mark messages as delivered + m.logger.Debug("got datasync acknowledge for message", zap.String("ack", hex.EncodeToString(ack)), zap.String("messageID", messageID)) + err = m.UpdateMessageOutgoingStatus(messageID, common.OutgoingStatusDelivered) if err != nil { m.logger.Debug("Can't set message status as delivered", zap.Error(err)) } + err = m.UpdateRawMessageSent(messageID, true) + if err != nil { + m.logger.Debug("can't set raw message as sent", zap.Error(err)) + } + + m.transport.ConfirmMessageDelivered(messageID) + //send signal to client that message status updated if m.config.messengerSignalsHandler != nil { message, err := m.persistence.MessageByID(messageID) diff --git a/protocol/messenger_raw_message_resend.go b/protocol/messenger_raw_message_resend.go index 3e0649000..80f08445f 100644 --- a/protocol/messenger_raw_message_resend.go +++ b/protocol/messenger_raw_message_resend.go @@ -200,6 +200,10 @@ func (m *Messenger) RawMessageByID(id string) (*common.RawMessage, error) { return m.persistence.RawMessageByID(id) } -func (m *Messenger) UpdateRawMessageSent(id string, sent bool, lastSent uint64) error { - return m.persistence.UpdateRawMessageSent(id, sent, lastSent) +func (m *Messenger) UpdateRawMessageSent(id string, sent bool) error { + return m.persistence.UpdateRawMessageSent(id, sent) +} + +func (m *Messenger) UpdateRawMessageLastSent(id string, lastSent uint64) error { + return m.persistence.UpdateRawMessageLastSent(id, lastSent) } diff --git a/protocol/transport/envelopes_monitor.go b/protocol/transport/envelopes_monitor.go index 9b34f095b..9d5f0b127 100644 --- a/protocol/transport/envelopes_monitor.go +++ b/protocol/transport/envelopes_monitor.go @@ -136,7 +136,7 @@ func (m *EnvelopesMonitor) Add(messageIDs [][]byte, envelopeHashes []types.Hash, defer m.mu.Unlock() for _, messageID := range messageIDs { - m.messageEnvelopeHashes[string(messageID)] = envelopeHashes + m.messageEnvelopeHashes[types.HexBytes(messageID).String()] = envelopeHashes } for i, envelopeHash := range envelopeHashes { @@ -399,7 +399,7 @@ func (m *EnvelopesMonitor) processMessageIDs(messageIDs [][]byte) { sentMessageIDs := make([][]byte, 0, len(messageIDs)) for _, messageID := range messageIDs { - hashes, ok := m.messageEnvelopeHashes[string(messageID)] + hashes, ok := m.messageEnvelopeHashes[types.HexBytes(messageID).String()] if !ok { continue } @@ -432,6 +432,6 @@ func (m *EnvelopesMonitor) clearMessageState(envelopeID types.Hash) { } delete(m.envelopes, envelopeID) for _, messageID := range envelope.messageIDs { - delete(m.messageEnvelopeHashes, string(messageID)) + delete(m.messageEnvelopeHashes, types.HexBytes(messageID).String()) } } diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index b3031101a..607b2ed14 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -717,3 +717,22 @@ func (t *Transport) RemovePubsubTopicKey(topic string) error { } return nil } + +func (t *Transport) ConfirmMessageDelivered(messageID string) { + if t.envelopesMonitor == nil { + return + } + hashes, ok := t.envelopesMonitor.messageEnvelopeHashes[messageID] + if !ok { + return + } + commHashes := make([]common.Hash, len(hashes)) + for i, h := range hashes { + commHashes[i] = common.BytesToHash(h[:]) + } + t.waku.ConfirmMessageDelivered(commHashes) +} + +func (t *Transport) SetStorePeerID(peerID peer.ID) { + t.waku.SetStorePeerID(peerID) +} diff --git a/wakuv2/common/message.go b/wakuv2/common/message.go index d89cfc5a0..4c4703da0 100644 --- a/wakuv2/common/message.go +++ b/wakuv2/common/message.go @@ -22,6 +22,7 @@ type MessageType int const ( RelayedMessageType MessageType = iota StoreMessageType + SendMessageType ) // MessageParams specifies the exact way a message should be wrapped @@ -46,7 +47,7 @@ type ReceivedMessage struct { Padding []byte Signature []byte - Sent uint32 // Time when the message was posted into the network + Sent uint32 // Time when the message was posted into the network in seconds Src *ecdsa.PublicKey // Message recipient (identity used to decode the message) Dst *ecdsa.PublicKey // Message recipient (identity used to decode the message) diff --git a/wakuv2/waku.go b/wakuv2/waku.go index 54eab868a..7ff2422d5 100644 --- a/wakuv2/waku.go +++ b/wakuv2/waku.go @@ -19,6 +19,7 @@ package wakuv2 import ( + "bytes" "context" "crypto/ecdsa" "crypto/sha256" @@ -64,6 +65,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/protocol/lightpush" "github.com/waku-org/go-waku/waku/v2/protocol/peer_exchange" "github.com/waku-org/go-waku/waku/v2/protocol/relay" + "github.com/waku-org/go-waku/waku/v2/protocol/store" "github.com/status-im/status-go/connection" "github.com/status-im/status-go/eth-node/types" @@ -81,6 +83,9 @@ const requestTimeout = 30 * time.Second const bootnodesQueryBackoffMs = 200 const bootnodesMaxRetries = 7 const cacheTTL = 20 * time.Minute +const maxHashQueryLength = 100 +const hashQueryInterval = 5 * time.Second +const messageSentPeriod = 5 // in seconds type ITelemetryClient interface { PushReceivedEnvelope(*protocol.Envelope) @@ -125,6 +130,11 @@ type Waku struct { storeMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids storeMsgIDsMu sync.RWMutex + sendMsgIDs map[string]map[gethcommon.Hash]uint32 + sendMsgIDsMu sync.RWMutex + + storePeerID peer.ID + topicHealthStatusChan chan peermanager.TopicHealthStatus connStatusSubscriptions map[string]*types.ConnStatusSubscription connStatusMu sync.Mutex @@ -205,6 +215,8 @@ func New(nodeKey *ecdsa.PrivateKey, fleet string, cfg *Config, logger *zap.Logge storeMsgIDs: make(map[gethcommon.Hash]bool), timesource: ts, storeMsgIDsMu: sync.RWMutex{}, + sendMsgIDs: make(map[string]map[gethcommon.Hash]uint32), + sendMsgIDsMu: sync.RWMutex{}, logger: logger, discV5BootstrapNodes: cfg.DiscV5BootstrapNodes, onHistoricMessagesRequestFailed: onHistoricMessagesRequestFailed, @@ -969,23 +981,98 @@ func (w *Waku) broadcast() { } } +func (w *Waku) checkIfMessagesStored() { + ticker := time.NewTicker(hashQueryInterval) + defer ticker.Stop() + + for { + select { + case <-w.ctx.Done(): + w.logger.Debug("stop the look for message stored check") + return + case <-ticker.C: + w.sendMsgIDsMu.Lock() + w.logger.Debug("running loop for messages stored check", zap.Any("messageIds", w.sendMsgIDs)) + pubsubTopics := make([]string, 0, len(w.sendMsgIDs)) + pubsubMessageIds := make([][]gethcommon.Hash, 0, len(w.sendMsgIDs)) + for pubsubTopic, subMsgs := range w.sendMsgIDs { + var queryMsgIds []gethcommon.Hash + for msgID, sendTime := range subMsgs { + if len(queryMsgIds) >= maxHashQueryLength { + break + } + // message is sent 5 seconds ago, check if it's stored + if uint32(w.timesource.Now().Unix()) > sendTime+messageSentPeriod { + queryMsgIds = append(queryMsgIds, msgID) + } + } + w.logger.Debug("store query for message hashes", zap.Any("queryMsgIds", queryMsgIds), zap.String("pubsubTopic", pubsubTopic)) + if len(queryMsgIds) > 0 { + pubsubTopics = append(pubsubTopics, pubsubTopic) + pubsubMessageIds = append(pubsubMessageIds, queryMsgIds) + } + } + w.sendMsgIDsMu.Unlock() + + pubsubProcessedMessages := make([][]gethcommon.Hash, len(pubsubTopics)) + for i, pubsubTopic := range pubsubTopics { + processedMessages := w.messageHashBasedQuery(w.ctx, pubsubMessageIds[i], pubsubTopic) + pubsubProcessedMessages[i] = processedMessages + } + + w.sendMsgIDsMu.Lock() + for i, pubsubTopic := range pubsubTopics { + subMsgs, ok := w.sendMsgIDs[pubsubTopic] + if !ok { + continue + } + for _, hash := range pubsubProcessedMessages[i] { + delete(subMsgs, hash) + if len(subMsgs) == 0 { + delete(w.sendMsgIDs, pubsubTopic) + } else { + w.sendMsgIDs[pubsubTopic] = subMsgs + } + } + } + w.logger.Debug("messages for next store hash query", zap.Any("messageIds", w.sendMsgIDs)) + w.sendMsgIDsMu.Unlock() + + } + } +} + +func (w *Waku) ConfirmMessageDelivered(hashes []gethcommon.Hash) { + w.sendMsgIDsMu.Lock() + defer w.sendMsgIDsMu.Unlock() + for pubsubTopic, subMsgs := range w.sendMsgIDs { + for _, hash := range hashes { + delete(subMsgs, hash) + if len(subMsgs) == 0 { + delete(w.sendMsgIDs, pubsubTopic) + } else { + w.sendMsgIDs[pubsubTopic] = subMsgs + } + } + } +} + +func (w *Waku) SetStorePeerID(peerID peer.ID) { + w.storePeerID = peerID +} + type publishFn = func(envelope *protocol.Envelope, logger *zap.Logger) error func (w *Waku) publishEnvelope(envelope *protocol.Envelope, publishFn publishFn, logger *zap.Logger) { defer w.wg.Done() - var event common.EventType if err := publishFn(envelope, logger); err != nil { logger.Error("could not send message", zap.Error(err)) - event = common.EventEnvelopeExpired - } else { - event = common.EventEnvelopeSent + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), + Event: common.EventEnvelopeExpired, + }) } - - w.SendEnvelopeEvent(common.EnvelopeEvent{ - Hash: gethcommon.BytesToHash(envelope.Hash().Bytes()), - Event: event, - }) } // Send injects a message into the waku send queue, to be distributed in the @@ -1014,7 +1101,7 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) { alreadyCached := w.envelopeCache.Has(gethcommon.BytesToHash(envelope.Hash().Bytes())) w.poolMu.Unlock() if !alreadyCached { - recvMessage := common.NewReceivedMessage(envelope, common.RelayedMessageType) + recvMessage := common.NewReceivedMessage(envelope, common.SendMessageType) w.postEvent(recvMessage) // notify the local node about the new message w.addEnvelope(recvMessage) } @@ -1022,6 +1109,68 @@ func (w *Waku) Send(pubsubTopic string, msg *pb.WakuMessage) ([]byte, error) { return envelope.Hash().Bytes(), nil } +// ctx, peer, r.PubsubTopic, contentTopics, uint64(r.From), uint64(r.To), options, processEnvelopes +func (w *Waku) messageHashBasedQuery(ctx context.Context, hashes []gethcommon.Hash, pubsubTopic string) []gethcommon.Hash { + selectedPeer := w.storePeerID + if selectedPeer == "" { + w.logger.Error("no store peer id available", zap.String("pubsubTopic", pubsubTopic)) + return []gethcommon.Hash{} + } + + var opts []store.RequestOption + requestID := protocol.GenerateRequestID() + opts = append(opts, store.WithRequestID(requestID)) + opts = append(opts, store.WithPeer(selectedPeer)) + opts = append(opts, store.WithPaging(false, maxHashQueryLength)) + opts = append(opts, store.IncludeData(false)) + + messageHashes := make([]pb.MessageHash, len(hashes)) + for i, hash := range hashes { + messageHashes[i] = pb.ToMessageHash(hash.Bytes()) + } + + w.logger.Debug("store.queryByHash request", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Any("messageHashes", messageHashes)) + + result, err := w.node.Store().QueryByHash(ctx, messageHashes, opts...) + if err != nil { + w.logger.Error("store.queryByHash failed", zap.String("requestID", hexutil.Encode(requestID)), zap.Stringer("peerID", selectedPeer), zap.Error(err)) + return []gethcommon.Hash{} + } + + w.logger.Debug("store.queryByHash result", zap.String("requestID", hexutil.Encode(requestID)), zap.Int("messages", len(result.Messages()))) + + var ackHashes []gethcommon.Hash + var missedHashes []gethcommon.Hash + for _, hash := range hashes { + found := false + for _, msg := range result.Messages() { + if bytes.Equal(msg.GetMessageHash(), hash.Bytes()) { + found = true + break + } + } + + if found { + ackHashes = append(ackHashes, hash) + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: hash, + Event: common.EventEnvelopeSent, + }) + } else { + missedHashes = append(missedHashes, hash) + w.SendEnvelopeEvent(common.EnvelopeEvent{ + Hash: hash, + Event: common.EventEnvelopeExpired, + }) + } + } + + w.logger.Debug("ack message hashes", zap.Any("ackHashes", ackHashes)) + w.logger.Debug("missed message hashes", zap.Any("missedHashes", missedHashes)) + + return append(ackHashes, missedHashes...) +} + func (w *Waku) Query(ctx context.Context, peerID peer.ID, query legacy_store.Query, cursor *storepb.Index, opts []legacy_store.HistoryRequestOption, processEnvelopes bool) (*storepb.Index, int, error) { requestID := protocol.GenerateRequestID() @@ -1179,6 +1328,8 @@ func (w *Waku) Start() error { go w.broadcast() + go w.checkIfMessagesStored() + // we should wait `seedBootnodesForDiscV5` shutdown smoothly before set w.ctx to nil within `w.Stop()` w.wg.Add(1) go w.seedBootnodesForDiscV5() @@ -1312,7 +1463,7 @@ func (w *Waku) add(recvMessage *common.ReceivedMessage, processImmediately bool) if !alreadyCached || !envelope.Value().Processed.Load() { if processImmediately { logger.Debug("immediately processing envelope") - w.processReceivedMessage(recvMessage) + w.processMessage(recvMessage) } else { logger.Debug("posting event") w.postEvent(recvMessage) // notify the local node about the new message @@ -1337,12 +1488,12 @@ func (w *Waku) processQueueLoop() { case <-w.ctx.Done(): return case e := <-w.msgQueue: - w.processReceivedMessage(e) + w.processMessage(e) } } } -func (w *Waku) processReceivedMessage(e *common.ReceivedMessage) { +func (w *Waku) processMessage(e *common.ReceivedMessage) { logger := w.logger.With( zap.Stringer("envelopeHash", e.Envelope.Hash()), zap.String("pubsubTopic", e.PubsubTopic), @@ -1358,6 +1509,18 @@ func (w *Waku) processReceivedMessage(e *common.ReceivedMessage) { w.storeMsgIDsMu.Unlock() } + ephemeral := e.Envelope.Message().Ephemeral + if e.MsgType == common.SendMessageType && (ephemeral == nil || !*ephemeral) { + w.sendMsgIDsMu.Lock() + subMsgs, ok := w.sendMsgIDs[e.PubsubTopic] + if !ok { + subMsgs = make(map[gethcommon.Hash]uint32) + } + subMsgs[e.Hash()] = e.Sent + w.sendMsgIDs[e.PubsubTopic] = subMsgs + w.sendMsgIDsMu.Unlock() + } + matched := w.filters.NotifyWatchers(e) // If not matched we remove it diff --git a/wakuv2/waku_test.go b/wakuv2/waku_test.go index 7fe9accb3..2779abe07 100644 --- a/wakuv2/waku_test.go +++ b/wakuv2/waku_test.go @@ -13,6 +13,7 @@ import ( "github.com/cenkalti/backoff/v3" + ethcommon "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" ethdnsdisc "github.com/ethereum/go-ethereum/p2p/dnsdisc" @@ -25,6 +26,7 @@ import ( "github.com/waku-org/go-waku/waku/v2/dnsdisc" "github.com/waku-org/go-waku/waku/v2/protocol/legacy_store" "github.com/waku-org/go-waku/waku/v2/protocol/pb" + "github.com/waku-org/go-waku/waku/v2/protocol/relay" "github.com/waku-org/go-waku/waku/v2/protocol/subscription" "github.com/status-im/status-go/appdatabase" @@ -537,3 +539,61 @@ func waitForEnvelope(t *testing.T, contentTopic string, envCh chan common.Envelo } } } + +func TestConfirmMessageDelivered(t *testing.T) { + aliceConfig := &Config{} + aliceNode, err := New(nil, "", aliceConfig, nil, nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, aliceNode.Start()) + + bobConfig := &Config{} + bobNode, err := New(nil, "", bobConfig, nil, nil, nil, nil, nil) + require.NoError(t, err) + require.NoError(t, bobNode.Start()) + + addrs := aliceNode.ListenAddresses() + require.Greater(t, len(addrs), 0) + _, err = bobNode.AddRelayPeer(addrs[0]) + require.NoError(t, err) + err = bobNode.DialPeer(addrs[0]) + require.NoError(t, err) + + filter := &common.Filter{ + Messages: common.NewMemoryMessageStore(), + ContentTopics: common.NewTopicSetFromBytes([][]byte{[]byte{1, 2, 3, 4}}), + } + + _, err = aliceNode.Subscribe(filter) + require.NoError(t, err) + + msgTimestamp := aliceNode.timestamp() + contentTopic := maps.Keys(filter.ContentTopics)[0] + + _, err = aliceNode.Send(relay.DefaultWakuTopic, &pb.WakuMessage{ + Payload: []byte{1, 2, 3, 4, 5}, + ContentTopic: contentTopic.ContentTopic(), + Version: proto.Uint32(0), + Timestamp: &msgTimestamp, + Ephemeral: proto.Bool(false), + }) + require.NoError(t, err) + + time.Sleep(1 * time.Second) + + messages := filter.Retrieve() + require.Len(t, messages, 1) + + require.Len(t, aliceNode.sendMsgIDs, 1) + for _, msgs := range aliceNode.sendMsgIDs { + require.Len(t, msgs, 1) + for hash := range msgs { + require.Equal(t, hash, messages[0].Hash()) + } + } + + aliceNode.ConfirmMessageDelivered([]ethcommon.Hash{messages[0].Hash()}) + require.Len(t, aliceNode.sendMsgIDs, 0) + + require.NoError(t, aliceNode.Stop()) + require.NoError(t, bobNode.Stop()) +}