From 866342d5e3674d5dfa5301869e903cea030b5456 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Tue, 23 Apr 2019 20:16:48 +0300 Subject: [PATCH] Update whisper to v1.4.12 (#1447) --- Gopkg.lock | 6 +- Gopkg.toml | 2 +- .../status-im/whisper/whisperv6/api.go | 7 +- .../status-im/whisper/whisperv6/filter.go | 72 +++++++++++++------ .../status-im/whisper/whisperv6/message.go | 2 + .../status-im/whisper/whisperv6/whisper.go | 66 ++++++++++++----- 6 files changed, 111 insertions(+), 44 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index edd9ab99e..be0ac2877 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -830,12 +830,12 @@ version = "v1.1.0" [[projects]] - digest = "1:ff23c911716ddbe23acccecf0a88bb99e89132b221a3be8dbad6a8377fd6f3a0" + digest = "1:d499fd4787bb7a4a5f6fe9f75a517346d70e1e4ab3dbcc83ed85151833e3493d" name = "github.com/status-im/whisper" packages = ["whisperv6"] pruneopts = "NUT" - revision = "3a4601b568649ac152afa76551ea9c332464b867" - version = "v1.4.10" + revision = "4fae75da94b1ab6dc13a5fa7d5087bfbfa04406f" + version = "v1.4.12" [[projects]] digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e" diff --git a/Gopkg.toml b/Gopkg.toml index 48b9731d9..1e8cfd49e 100644 --- a/Gopkg.toml +++ b/Gopkg.toml @@ -46,7 +46,7 @@ [[constraint]] name = "github.com/status-im/whisper" - version = "=v1.4.10" + version = "=v1.4.12" [[constraint]] name = "golang.org/x/text" diff --git a/vendor/github.com/status-im/whisper/whisperv6/api.go b/vendor/github.com/status-im/whisper/whisperv6/api.go index 3f138b6f7..4fab045f5 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/api.go +++ b/vendor/github.com/status-im/whisper/whisperv6/api.go @@ -24,7 +24,6 @@ import ( "sync" "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" @@ -364,7 +363,7 @@ func (api *PublicWhisperAPI) Messages(ctx context.Context, crit Criteria) (*rpc. filter := Filter{ PoW: crit.MinPow, - Messages: make(map[common.Hash]*ReceivedMessage), + Messages: api.w.NewMessageStore(), AllowP2P: crit.AllowP2P, } @@ -453,6 +452,7 @@ type Message struct { PoW float64 `json:"pow"` Hash []byte `json:"hash"` Dst []byte `json:"recipientPublicKey,omitempty"` + P2P bool `json:"bool,omitempty"` } type messageOverride struct { @@ -473,6 +473,7 @@ func ToWhisperMessage(message *ReceivedMessage) *Message { PoW: message.PoW, Hash: message.EnvelopeHash.Bytes(), Topic: message.Topic, + P2P: message.P2P, } if message.Dst != nil { @@ -587,7 +588,7 @@ func (api *PublicWhisperAPI) NewMessageFilter(req Criteria) (string, error) { PoW: req.MinPow, AllowP2P: req.AllowP2P, Topics: topics, - Messages: make(map[common.Hash]*ReceivedMessage), + Messages: api.w.NewMessageStore(), } id, err := api.w.Subscribe(f) diff --git a/vendor/github.com/status-im/whisper/whisperv6/filter.go b/vendor/github.com/status-im/whisper/whisperv6/filter.go index 6a5b79674..b75f7b3c8 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/filter.go +++ b/vendor/github.com/status-im/whisper/whisperv6/filter.go @@ -26,6 +26,47 @@ import ( "github.com/ethereum/go-ethereum/log" ) +// MessageStore defines interface for temporary message store. +type MessageStore interface { + Add(*ReceivedMessage) error + Pop() ([]*ReceivedMessage, error) +} + +// NewMemoryMessageStore returns pointer to an instance of the MemoryMessageStore. +func NewMemoryMessageStore() *MemoryMessageStore { + return &MemoryMessageStore{ + messages: map[common.Hash]*ReceivedMessage{}, + } +} + +// MemoryMessageStore stores massages in memory hash table. +type MemoryMessageStore struct { + mu sync.Mutex + messages map[common.Hash]*ReceivedMessage +} + +// Add adds message to store. +func (store *MemoryMessageStore) Add(msg *ReceivedMessage) error { + store.mu.Lock() + defer store.mu.Unlock() + if _, exist := store.messages[msg.EnvelopeHash]; !exist { + store.messages[msg.EnvelopeHash] = msg + } + return nil +} + +// Pop returns all avaiable messages and cleans the store. +func (store *MemoryMessageStore) Pop() ([]*ReceivedMessage, error) { + store.mu.Lock() + defer store.mu.Unlock() + all := make([]*ReceivedMessage, 0, len(store.messages)) + for hash, msg := range store.messages { + delete(store.messages, hash) + all = append(all, msg) + } + return all, nil +} + // Filter represents a Whisper message filter type Filter struct { Src *ecdsa.PublicKey // Sender of the message @@ -37,7 +78,7 @@ type Filter struct { SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization id string // unique identifier - Messages map[common.Hash]*ReceivedMessage + Messages MessageStore mutex sync.RWMutex } @@ -68,10 +109,6 @@ func (fs *Filters) Install(watcher *Filter) (string, error) { return "", fmt.Errorf("filters must choose between symmetric and asymmetric keys") } - if watcher.Messages == nil { - watcher.Messages = make(map[common.Hash]*ReceivedMessage) - } - id, err := GenerateRandomID() if err != nil { return "", err @@ -183,6 +220,7 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) { } if match && msg != nil { + msg.P2P = p2pMessage log.Trace("processing message: decrypted", "hash", env.Hash().Hex()) if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) { watcher.Trigger(msg) @@ -202,27 +240,21 @@ func (f *Filter) expectsSymmetricEncryption() bool { // Trigger adds a yet-unknown message to the filter's list of // received messages. func (f *Filter) Trigger(msg *ReceivedMessage) { - f.mutex.Lock() - defer f.mutex.Unlock() - - if _, exist := f.Messages[msg.EnvelopeHash]; !exist { - f.Messages[msg.EnvelopeHash] = msg + err := f.Messages.Add(msg) + if err != nil { + log.Error("failed to add msg into the filters store", "hash", msg.EnvelopeHash, "error", err) } } // Retrieve will return the list of all received messages associated // to a filter. -func (f *Filter) Retrieve() (all []*ReceivedMessage) { - f.mutex.Lock() - defer f.mutex.Unlock() - - all = make([]*ReceivedMessage, 0, len(f.Messages)) - for _, msg := range f.Messages { - all = append(all, msg) +func (f *Filter) Retrieve() []*ReceivedMessage { + msgs, err := f.Messages.Pop() + if err != nil { + log.Error("failed to retrieve messages from filter store", "error", err) + return nil } - - f.Messages = make(map[common.Hash]*ReceivedMessage) // delete old messages - return all + return msgs } // MatchMessage checks if the filter matches an already decrypted diff --git a/vendor/github.com/status-im/whisper/whisperv6/message.go b/vendor/github.com/status-im/whisper/whisperv6/message.go index a12b445e2..44c492f8f 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/message.go +++ b/vendor/github.com/status-im/whisper/whisperv6/message.go @@ -75,6 +75,8 @@ type ReceivedMessage struct { SymKeyHash common.Hash // The Keccak256Hash of the key EnvelopeHash common.Hash // Message envelope hash to act as a unique id + + P2P bool // is set to true if this message was received from mail server. } func isMessageSigned(flags byte) bool { diff --git a/vendor/github.com/status-im/whisper/whisperv6/whisper.go b/vendor/github.com/status-im/whisper/whisperv6/whisper.go index 455f2465a..15cef7dcb 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/whisper.go +++ b/vendor/github.com/status-im/whisper/whisperv6/whisper.go @@ -90,9 +90,9 @@ type Whisper struct { peerMu sync.RWMutex // Mutex to sync the active peer set peers map[*Peer]struct{} // Set of currently active peers - messageQueue chan *Envelope // Message queue for normal whisper messages - p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further) - quit chan struct{} // Channel used for graceful exit + messageQueue chan *Envelope // Message queue for normal whisper messages + p2pMsgQueue chan interface{} // Message queue for peer-to-peer messages (not to be forwarded any further) and history delivery confirmations. + quit chan struct{} // Channel used for graceful exit settings syncmap.Map // holds configuration settings that can be dynamically changed @@ -105,6 +105,8 @@ type Whisper struct { mailServer MailServer // MailServer interface + messageStoreFabric func() MessageStore + envelopeFeed event.Feed timeSource func() time.Time // source of time for whisper @@ -123,7 +125,7 @@ func New(cfg *Config) *Whisper { expirations: make(map[uint32]mapset.Set), peers: make(map[*Peer]struct{}), messageQueue: make(chan *Envelope, messageQueueLimit), - p2pMsgQueue: make(chan *Envelope, messageQueueLimit), + p2pMsgQueue: make(chan interface{}, messageQueueLimit), quit: make(chan struct{}), syncAllowance: DefaultSyncAllowance, timeSource: time.Now, @@ -155,6 +157,19 @@ func New(cfg *Config) *Whisper { return whisper } +// NewMessageStore returns object that implements MessageStore. +func (whisper *Whisper) NewMessageStore() MessageStore { + if whisper.messageStoreFabric != nil { + return whisper.messageStoreFabric() + } + return NewMemoryMessageStore() +} + +// SetMessageStore allows to inject custom implementation of the message store. +func (whisper *Whisper) SetMessageStore(fabric func() MessageStore) { + whisper.messageStoreFabric = fabric +} + // SetTimeSource assigns a particular source of time to a whisper object. func (whisper *Whisper) SetTimeSource(timesource func() time.Time) { whisper.timeSource = timesource @@ -800,6 +815,7 @@ func (whisper *Whisper) Start(*p2p.Server) error { for i := 0; i < numCPU; i++ { go whisper.processQueue() } + go whisper.processP2P() return nil } @@ -990,7 +1006,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { if err = packet.Decode(&envelopes); err == nil { for _, envelope := range envelopes { - whisper.postEvent(envelope, true) + whisper.postP2P(envelope) } continue } @@ -1004,7 +1020,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { } if err = packet.Decode(&envelope); err == nil { - whisper.postEvent(envelope, true) + whisper.postP2P(envelope) continue } @@ -1088,7 +1104,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { } if event != nil { - whisper.envelopeFeed.Send(*event) + whisper.postP2P(*event) } } @@ -1193,14 +1209,19 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { return true, nil } +func (whisper *Whisper) postP2P(event interface{}) { + whisper.p2pMsgQueue <- event +} + // postEvent queues the message for further processing. func (whisper *Whisper) postEvent(envelope *Envelope, isP2P bool) { if isP2P { - whisper.p2pMsgQueue <- envelope + whisper.postP2P(envelope) } else { whisper.checkOverflow() whisper.messageQueue <- envelope } + } // checkOverflow checks if message queue overflow occurs and reports it if necessary. @@ -1222,25 +1243,36 @@ func (whisper *Whisper) checkOverflow() { // processQueue delivers the messages to the watchers during the lifetime of the whisper node. func (whisper *Whisper) processQueue() { - var e *Envelope for { select { case <-whisper.quit: return - - case e = <-whisper.messageQueue: + case e := <-whisper.messageQueue: whisper.filters.NotifyWatchers(e, false) whisper.envelopeFeed.Send(EnvelopeEvent{ Hash: e.Hash(), Event: EventEnvelopeAvailable, }) + } + } +} - case e = <-whisper.p2pMsgQueue: - whisper.filters.NotifyWatchers(e, true) - whisper.envelopeFeed.Send(EnvelopeEvent{ - Hash: e.Hash(), - Event: EventEnvelopeAvailable, - }) +func (whisper *Whisper) processP2P() { + for { + select { + case <-whisper.quit: + return + case e := <-whisper.p2pMsgQueue: + switch event := e.(type) { + case *Envelope: + whisper.filters.NotifyWatchers(event, true) + whisper.envelopeFeed.Send(EnvelopeEvent{ + Hash: event.Hash(), + Event: EventEnvelopeAvailable, + }) + case EnvelopeEvent: + whisper.envelopeFeed.Send(event) + } } } }