diff --git a/VERSION b/VERSION index 0d07edfc3..c1ed59458 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.91.11 +0.91.12 diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index b2bb6a4c6..d6ffe4304 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -5,6 +5,7 @@ import ( "errors" "time" + "github.com/ethereum/go-ethereum/common" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/waku" wakucommon "github.com/status-im/status-go/waku/common" @@ -235,6 +236,14 @@ func (w *gethWakuWrapper) RequestHistoricMessagesWithTimeout(peerID []byte, enve return w.waku.RequestHistoricMessagesWithTimeout(peerID, envelope.Unwrap().(*wakucommon.Envelope), timeout) } +func (w *gethWakuWrapper) ProcessingP2PMessages() bool { + return w.waku.ProcessingP2PMessages() +} + +func (w *gethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) { + w.waku.MarkP2PMessageAsProcessed(hash) +} + func (w *gethWakuWrapper) RequestStoreMessages(peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) { return nil, errors.New("not implemented") } diff --git a/eth-node/bridge/geth/wakuv2.go b/eth-node/bridge/geth/wakuv2.go index 6d0eb14e2..52cf876a9 100644 --- a/eth-node/bridge/geth/wakuv2.go +++ b/eth-node/bridge/geth/wakuv2.go @@ -9,6 +9,7 @@ import ( "github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/store" + "github.com/ethereum/go-ethereum/common" "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/wakuv2" wakucommon "github.com/status-im/status-go/wakuv2/common" @@ -254,6 +255,15 @@ func (w *gethWakuV2Wrapper) DropPeer(peerID string) error { return w.waku.DropPeer(peerID) } +func (w *gethWakuV2Wrapper) ProcessingP2PMessages() bool { + // NOTE: not implemented for waku 2 + return false +} + +func (w *gethWakuV2Wrapper) MarkP2PMessageAsProcessed(hash common.Hash) { + // NOTE: not implemented for waku 2 +} + type wakuV2FilterWrapper struct { filter *wakucommon.Filter id string diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index a4dcd2f52..ec1f9977e 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -3,6 +3,8 @@ package types import ( "crypto/ecdsa" "time" + + "github.com/ethereum/go-ethereum/common" ) // Whisper represents a dark communication interface through the Ethereum @@ -79,4 +81,10 @@ type Waku interface { // RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages RequestStoreMessages(peerID []byte, request MessagesRequest) (*StoreRequestCursor, error) + + // ProcessingP2PMessages indicates whether there are in-flight p2p messages + ProcessingP2PMessages() bool + + // MarkP2PMessageAsProcessed tells the waku layer that a P2P message has been processed + MarkP2PMessageAsProcessed(common.Hash) } diff --git a/protocol/messenger.go b/protocol/messenger.go index a3d29a9a4..b533aabfc 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -21,6 +21,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/golang/protobuf/proto" + gethcommon "github.com/ethereum/go-ethereum/common" "github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/appmetrics" "github.com/status-im/status-go/connection" @@ -3287,6 +3288,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte messageState.Response.CommunityChanges = nil if allMessagesProcessed { + m.transport.MarkP2PMessageAsProcessed(gethcommon.BytesToHash(shhMessage.Hash)) processedMessages = append(processedMessages, types.EncodeHex(shhMessage.Hash)) } } diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index fa6bcaf2c..483443f8c 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -430,6 +430,8 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error { return err } } + logger.Info("waiting until message processed") + m.waitUntilP2PMessagesProcessed() logger.Info("synced topic") return nil } @@ -572,6 +574,21 @@ func (m *Messenger) FillGaps(chatID string, messageIDs []string) error { return m.persistence.DeleteMessages(messageIDs) } +func (m *Messenger) waitUntilP2PMessagesProcessed() { + + ticker := time.NewTicker(50 * time.Millisecond) + + for { //nolint: gosimple + select { + case <-ticker.C: + if !m.transport.ProcessingP2PMessages() { + ticker.Stop() + return + } + } + } +} + func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) { return m.transport.LoadFilters(filters) } diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 425c59711..699baa09a 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -14,6 +14,7 @@ import ( "github.com/pkg/errors" "go.uber.org/zap" + "github.com/ethereum/go-ethereum/common" "github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/types" ) @@ -217,17 +218,20 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { result := make(map[Filter][]*types.Message) logger := t.logger.With(zap.String("site", "retrieveRawAll")) - allFilters := t.filters.Filters() - for _, filter := range allFilters { - // Don't pull from filters we don't listen to - if !filter.Listen { - continue - } + for _, filter := range t.filters.Filters() { msgs, err := t.api.GetFilterMessages(filter.FilterID) if err != nil { logger.Warn("failed to fetch messages", zap.Error(err)) continue } + // Don't pull from filters we don't listen to + if !filter.Listen { + for _, msg := range msgs { + t.waku.MarkP2PMessageAsProcessed(common.BytesToHash(msg.Hash)) + } + continue + } + if len(msgs) == 0 { continue } @@ -251,7 +255,7 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) { logger.Debug("message not cached", zap.String("hash", types.EncodeHex(msgs[i].Hash))) } else { logger.Debug("message cached", zap.String("hash", types.EncodeHex(msgs[i].Hash))) - + t.waku.MarkP2PMessageAsProcessed(common.BytesToHash(msgs[i].Hash)) } } @@ -630,3 +634,11 @@ func (t *Transport) DialPeerByID(peerID string) error { func (t *Transport) DropPeer(peerID string) error { return t.waku.DropPeer(peerID) } + +func (t *Transport) ProcessingP2PMessages() bool { + return t.waku.ProcessingP2PMessages() +} + +func (t *Transport) MarkP2PMessageAsProcessed(hash common.Hash) { + t.waku.MarkP2PMessageAsProcessed(hash) +} diff --git a/waku/common/filter.go b/waku/common/filter.go index 019950480..4ecfb98f5 100644 --- a/waku/common/filter.go +++ b/waku/common/filter.go @@ -160,12 +160,14 @@ func (fs *Filters) Get(id string) *Filter { // NotifyWatchers notifies any filter that has declared interest // for the envelope's topic. -func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) { +func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) bool { var msg *ReceivedMessage fs.mutex.RLock() defer fs.mutex.RUnlock() + var matched bool + candidates := fs.GetWatchersByTopic(env.Topic) for _, watcher := range candidates { if p2pMessage && !watcher.AllowP2P { @@ -194,8 +196,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) { if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) { watcher.Trigger(msg) } + matched = true } } + return matched } func (f *Filter) expectsAsymmetricEncryption() bool { @@ -219,6 +223,7 @@ func (f *Filter) Trigger(msg *ReceivedMessage) { // to a filter. func (f *Filter) Retrieve() []*ReceivedMessage { msgs, err := f.Messages.Pop() + if err != nil { log.Error("failed to retrieve messages from filter store", "error", err) return nil diff --git a/waku/waku.go b/waku/waku.go index 94601b398..928c5de2a 100644 --- a/waku/waku.go +++ b/waku/waku.go @@ -91,9 +91,12 @@ type Waku struct { peers map[common.Peer]struct{} // Set of currently active peers peerMu sync.RWMutex // Mutex to sync the active peer set - msgQueue chan *common.Envelope // Message queue for normal waku messages - p2pMsgQueue chan interface{} // Message queue for peer-to-peer messages (not to be forwarded any further) and history delivery confirmations. - quit chan struct{} // Channel used for graceful exit + msgQueue chan *common.Envelope // Message queue for normal waku messages + p2pMsgQueue chan interface{} // Message queue for peer-to-peer messages (not to be forwarded any further) and history delivery confirmations. + p2pMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids + p2pMsgIDsMu sync.RWMutex + + quit chan struct{} // Channel used for graceful exit settings settings // Holds configuration settings that can be dynamically changed settingsMu sync.RWMutex // Mutex to sync the settings access @@ -133,6 +136,7 @@ func New(cfg *Config, logger *zap.Logger) *Waku { peers: make(map[common.Peer]struct{}), msgQueue: make(chan *common.Envelope, messageQueueLimit), p2pMsgQueue: make(chan interface{}, messageQueueLimit), + p2pMsgIDs: make(map[gethcommon.Hash]bool), quit: make(chan struct{}), timeSource: time.Now, logger: logger, @@ -1454,7 +1458,21 @@ func (w *Waku) processP2P() { case e := <-w.p2pMsgQueue: switch evn := e.(type) { case *common.Envelope: - w.filters.NotifyWatchers(evn, true) + // We need to insert it first, and then remove it if not matched, + // as messages are processed asynchronously + w.p2pMsgIDsMu.Lock() + w.p2pMsgIDs[evn.Hash()] = true + w.p2pMsgIDsMu.Unlock() + + matched := w.filters.NotifyWatchers(evn, true) + + // If not matched we remove it + if !matched { + w.p2pMsgIDsMu.Lock() + delete(w.p2pMsgIDs, evn.Hash()) + w.p2pMsgIDsMu.Unlock() + } + w.envelopeFeed.Send(common.EnvelopeEvent{ Topic: evn.Topic, Hash: evn.Hash(), @@ -1545,6 +1563,18 @@ func (w *Waku) IsEnvelopeCached(hash gethcommon.Hash) bool { return exist } +func (w *Waku) ProcessingP2PMessages() bool { + w.p2pMsgIDsMu.Lock() + defer w.p2pMsgIDsMu.Unlock() + return len(w.p2pMsgIDs) != 0 +} + +func (w *Waku) MarkP2PMessageAsProcessed(hash gethcommon.Hash) { + w.p2pMsgIDsMu.Lock() + defer w.p2pMsgIDsMu.Unlock() + delete(w.p2pMsgIDs, hash) +} + // validatePrivateKey checks the format of the given private key. func validatePrivateKey(k *ecdsa.PrivateKey) bool { if k == nil || k.D == nil || k.D.Sign() == 0 {