Wait for p2p messages to be processed

This commit is contained in:
Andrea Maria Piana 2021-11-26 12:30:35 +00:00
parent f271277b76
commit f60cf5cd47
9 changed files with 106 additions and 13 deletions

View File

@ -1 +1 @@
0.91.11 0.91.12

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/waku" "github.com/status-im/status-go/waku"
wakucommon "github.com/status-im/status-go/waku/common" wakucommon "github.com/status-im/status-go/waku/common"
@ -235,6 +236,14 @@ func (w *gethWakuWrapper) RequestHistoricMessagesWithTimeout(peerID []byte, enve
return w.waku.RequestHistoricMessagesWithTimeout(peerID, envelope.Unwrap().(*wakucommon.Envelope), timeout) return w.waku.RequestHistoricMessagesWithTimeout(peerID, envelope.Unwrap().(*wakucommon.Envelope), timeout)
} }
func (w *gethWakuWrapper) ProcessingP2PMessages() bool {
return w.waku.ProcessingP2PMessages()
}
func (w *gethWakuWrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
w.waku.MarkP2PMessageAsProcessed(hash)
}
func (w *gethWakuWrapper) RequestStoreMessages(peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) { func (w *gethWakuWrapper) RequestStoreMessages(peerID []byte, r types.MessagesRequest) (*types.StoreRequestCursor, error) {
return nil, errors.New("not implemented") return nil, errors.New("not implemented")
} }

View File

@ -9,6 +9,7 @@ import (
"github.com/status-im/go-waku/waku/v2/protocol/pb" "github.com/status-im/go-waku/waku/v2/protocol/pb"
"github.com/status-im/go-waku/waku/v2/protocol/store" "github.com/status-im/go-waku/waku/v2/protocol/store"
"github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/wakuv2" "github.com/status-im/status-go/wakuv2"
wakucommon "github.com/status-im/status-go/wakuv2/common" wakucommon "github.com/status-im/status-go/wakuv2/common"
@ -254,6 +255,15 @@ func (w *gethWakuV2Wrapper) DropPeer(peerID string) error {
return w.waku.DropPeer(peerID) return w.waku.DropPeer(peerID)
} }
func (w *gethWakuV2Wrapper) ProcessingP2PMessages() bool {
// NOTE: not implemented for waku 2
return false
}
func (w *gethWakuV2Wrapper) MarkP2PMessageAsProcessed(hash common.Hash) {
// NOTE: not implemented for waku 2
}
type wakuV2FilterWrapper struct { type wakuV2FilterWrapper struct {
filter *wakucommon.Filter filter *wakucommon.Filter
id string id string

View File

@ -3,6 +3,8 @@ package types
import ( import (
"crypto/ecdsa" "crypto/ecdsa"
"time" "time"
"github.com/ethereum/go-ethereum/common"
) )
// Whisper represents a dark communication interface through the Ethereum // Whisper represents a dark communication interface through the Ethereum
@ -79,4 +81,10 @@ type Waku interface {
// RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages // RequestStoreMessages uses the WAKU2-STORE protocol to request historic messages
RequestStoreMessages(peerID []byte, request MessagesRequest) (*StoreRequestCursor, error) RequestStoreMessages(peerID []byte, request MessagesRequest) (*StoreRequestCursor, error)
// ProcessingP2PMessages indicates whether there are in-flight p2p messages
ProcessingP2PMessages() bool
// MarkP2PMessageAsProcessed tells the waku layer that a P2P message has been processed
MarkP2PMessageAsProcessed(common.Hash)
} }

View File

@ -21,6 +21,7 @@ import (
"github.com/davecgh/go-spew/spew" "github.com/davecgh/go-spew/spew"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/appdatabase" "github.com/status-im/status-go/appdatabase"
"github.com/status-im/status-go/appmetrics" "github.com/status-im/status-go/appmetrics"
"github.com/status-im/status-go/connection" "github.com/status-im/status-go/connection"
@ -3287,6 +3288,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
messageState.Response.CommunityChanges = nil messageState.Response.CommunityChanges = nil
if allMessagesProcessed { if allMessagesProcessed {
m.transport.MarkP2PMessageAsProcessed(gethcommon.BytesToHash(shhMessage.Hash))
processedMessages = append(processedMessages, types.EncodeHex(shhMessage.Hash)) processedMessages = append(processedMessages, types.EncodeHex(shhMessage.Hash))
} }
} }

View File

@ -430,6 +430,8 @@ func (m *Messenger) processMailserverBatch(batch MailserverBatch) error {
return err return err
} }
} }
logger.Info("waiting until message processed")
m.waitUntilP2PMessagesProcessed()
logger.Info("synced topic") logger.Info("synced topic")
return nil return nil
} }
@ -572,6 +574,21 @@ func (m *Messenger) FillGaps(chatID string, messageIDs []string) error {
return m.persistence.DeleteMessages(messageIDs) return m.persistence.DeleteMessages(messageIDs)
} }
func (m *Messenger) waitUntilP2PMessagesProcessed() {
ticker := time.NewTicker(50 * time.Millisecond)
for { //nolint: gosimple
select {
case <-ticker.C:
if !m.transport.ProcessingP2PMessages() {
ticker.Stop()
return
}
}
}
}
func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) { func (m *Messenger) LoadFilters(filters []*transport.Filter) ([]*transport.Filter, error) {
return m.transport.LoadFilters(filters) return m.transport.LoadFilters(filters)
} }

View File

@ -14,6 +14,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"go.uber.org/zap" "go.uber.org/zap"
"github.com/ethereum/go-ethereum/common"
"github.com/status-im/status-go/eth-node/crypto" "github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/eth-node/types"
) )
@ -217,17 +218,20 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
result := make(map[Filter][]*types.Message) result := make(map[Filter][]*types.Message)
logger := t.logger.With(zap.String("site", "retrieveRawAll")) logger := t.logger.With(zap.String("site", "retrieveRawAll"))
allFilters := t.filters.Filters() for _, filter := range t.filters.Filters() {
for _, filter := range allFilters {
// Don't pull from filters we don't listen to
if !filter.Listen {
continue
}
msgs, err := t.api.GetFilterMessages(filter.FilterID) msgs, err := t.api.GetFilterMessages(filter.FilterID)
if err != nil { if err != nil {
logger.Warn("failed to fetch messages", zap.Error(err)) logger.Warn("failed to fetch messages", zap.Error(err))
continue continue
} }
// Don't pull from filters we don't listen to
if !filter.Listen {
for _, msg := range msgs {
t.waku.MarkP2PMessageAsProcessed(common.BytesToHash(msg.Hash))
}
continue
}
if len(msgs) == 0 { if len(msgs) == 0 {
continue continue
} }
@ -251,7 +255,7 @@ func (t *Transport) RetrieveRawAll() (map[Filter][]*types.Message, error) {
logger.Debug("message not cached", zap.String("hash", types.EncodeHex(msgs[i].Hash))) logger.Debug("message not cached", zap.String("hash", types.EncodeHex(msgs[i].Hash)))
} else { } else {
logger.Debug("message cached", zap.String("hash", types.EncodeHex(msgs[i].Hash))) logger.Debug("message cached", zap.String("hash", types.EncodeHex(msgs[i].Hash)))
t.waku.MarkP2PMessageAsProcessed(common.BytesToHash(msgs[i].Hash))
} }
} }
@ -630,3 +634,11 @@ func (t *Transport) DialPeerByID(peerID string) error {
func (t *Transport) DropPeer(peerID string) error { func (t *Transport) DropPeer(peerID string) error {
return t.waku.DropPeer(peerID) return t.waku.DropPeer(peerID)
} }
func (t *Transport) ProcessingP2PMessages() bool {
return t.waku.ProcessingP2PMessages()
}
func (t *Transport) MarkP2PMessageAsProcessed(hash common.Hash) {
t.waku.MarkP2PMessageAsProcessed(hash)
}

View File

@ -160,12 +160,14 @@ func (fs *Filters) Get(id string) *Filter {
// NotifyWatchers notifies any filter that has declared interest // NotifyWatchers notifies any filter that has declared interest
// for the envelope's topic. // for the envelope's topic.
func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) { func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) bool {
var msg *ReceivedMessage var msg *ReceivedMessage
fs.mutex.RLock() fs.mutex.RLock()
defer fs.mutex.RUnlock() defer fs.mutex.RUnlock()
var matched bool
candidates := fs.GetWatchersByTopic(env.Topic) candidates := fs.GetWatchersByTopic(env.Topic)
for _, watcher := range candidates { for _, watcher := range candidates {
if p2pMessage && !watcher.AllowP2P { if p2pMessage && !watcher.AllowP2P {
@ -194,8 +196,10 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) { if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) {
watcher.Trigger(msg) watcher.Trigger(msg)
} }
matched = true
} }
} }
return matched
} }
func (f *Filter) expectsAsymmetricEncryption() bool { func (f *Filter) expectsAsymmetricEncryption() bool {
@ -219,6 +223,7 @@ func (f *Filter) Trigger(msg *ReceivedMessage) {
// to a filter. // to a filter.
func (f *Filter) Retrieve() []*ReceivedMessage { func (f *Filter) Retrieve() []*ReceivedMessage {
msgs, err := f.Messages.Pop() msgs, err := f.Messages.Pop()
if err != nil { if err != nil {
log.Error("failed to retrieve messages from filter store", "error", err) log.Error("failed to retrieve messages from filter store", "error", err)
return nil return nil

View File

@ -91,9 +91,12 @@ type Waku struct {
peers map[common.Peer]struct{} // Set of currently active peers peers map[common.Peer]struct{} // Set of currently active peers
peerMu sync.RWMutex // Mutex to sync the active peer set peerMu sync.RWMutex // Mutex to sync the active peer set
msgQueue chan *common.Envelope // Message queue for normal waku messages msgQueue chan *common.Envelope // Message queue for normal waku messages
p2pMsgQueue chan interface{} // Message queue for peer-to-peer messages (not to be forwarded any further) and history delivery confirmations. p2pMsgQueue chan interface{} // Message queue for peer-to-peer messages (not to be forwarded any further) and history delivery confirmations.
quit chan struct{} // Channel used for graceful exit p2pMsgIDs map[gethcommon.Hash]bool // Map of the currently processing ids
p2pMsgIDsMu sync.RWMutex
quit chan struct{} // Channel used for graceful exit
settings settings // Holds configuration settings that can be dynamically changed settings settings // Holds configuration settings that can be dynamically changed
settingsMu sync.RWMutex // Mutex to sync the settings access settingsMu sync.RWMutex // Mutex to sync the settings access
@ -133,6 +136,7 @@ func New(cfg *Config, logger *zap.Logger) *Waku {
peers: make(map[common.Peer]struct{}), peers: make(map[common.Peer]struct{}),
msgQueue: make(chan *common.Envelope, messageQueueLimit), msgQueue: make(chan *common.Envelope, messageQueueLimit),
p2pMsgQueue: make(chan interface{}, messageQueueLimit), p2pMsgQueue: make(chan interface{}, messageQueueLimit),
p2pMsgIDs: make(map[gethcommon.Hash]bool),
quit: make(chan struct{}), quit: make(chan struct{}),
timeSource: time.Now, timeSource: time.Now,
logger: logger, logger: logger,
@ -1454,7 +1458,21 @@ func (w *Waku) processP2P() {
case e := <-w.p2pMsgQueue: case e := <-w.p2pMsgQueue:
switch evn := e.(type) { switch evn := e.(type) {
case *common.Envelope: case *common.Envelope:
w.filters.NotifyWatchers(evn, true) // We need to insert it first, and then remove it if not matched,
// as messages are processed asynchronously
w.p2pMsgIDsMu.Lock()
w.p2pMsgIDs[evn.Hash()] = true
w.p2pMsgIDsMu.Unlock()
matched := w.filters.NotifyWatchers(evn, true)
// If not matched we remove it
if !matched {
w.p2pMsgIDsMu.Lock()
delete(w.p2pMsgIDs, evn.Hash())
w.p2pMsgIDsMu.Unlock()
}
w.envelopeFeed.Send(common.EnvelopeEvent{ w.envelopeFeed.Send(common.EnvelopeEvent{
Topic: evn.Topic, Topic: evn.Topic,
Hash: evn.Hash(), Hash: evn.Hash(),
@ -1545,6 +1563,18 @@ func (w *Waku) IsEnvelopeCached(hash gethcommon.Hash) bool {
return exist return exist
} }
func (w *Waku) ProcessingP2PMessages() bool {
w.p2pMsgIDsMu.Lock()
defer w.p2pMsgIDsMu.Unlock()
return len(w.p2pMsgIDs) != 0
}
func (w *Waku) MarkP2PMessageAsProcessed(hash gethcommon.Hash) {
w.p2pMsgIDsMu.Lock()
defer w.p2pMsgIDsMu.Unlock()
delete(w.p2pMsgIDs, hash)
}
// validatePrivateKey checks the format of the given private key. // validatePrivateKey checks the format of the given private key.
func validatePrivateKey(k *ecdsa.PrivateKey) bool { func validatePrivateKey(k *ecdsa.PrivateKey) bool {
if k == nil || k.D == nil || k.D.Sign() == 0 { if k == nil || k.D == nil || k.D.Sign() == 0 {