Update whisper to v1.4.12 (#1447)

This commit is contained in:
Dmitry Shulyak 2019-04-23 20:16:48 +03:00 committed by GitHub
parent 64188f0702
commit 866342d5e3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 111 additions and 44 deletions

6
Gopkg.lock generated
View File

@ -830,12 +830,12 @@
version = "v1.1.0"
[[projects]]
digest = "1:ff23c911716ddbe23acccecf0a88bb99e89132b221a3be8dbad6a8377fd6f3a0"
digest = "1:d499fd4787bb7a4a5f6fe9f75a517346d70e1e4ab3dbcc83ed85151833e3493d"
name = "github.com/status-im/whisper"
packages = ["whisperv6"]
pruneopts = "NUT"
revision = "3a4601b568649ac152afa76551ea9c332464b867"
version = "v1.4.10"
revision = "4fae75da94b1ab6dc13a5fa7d5087bfbfa04406f"
version = "v1.4.12"
[[projects]]
digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e"

View File

@ -46,7 +46,7 @@
[[constraint]]
name = "github.com/status-im/whisper"
version = "=v1.4.10"
version = "=v1.4.12"
[[constraint]]
name = "golang.org/x/text"

View File

@ -24,7 +24,6 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
@ -364,7 +363,7 @@ func (api *PublicWhisperAPI) Messages(ctx context.Context, crit Criteria) (*rpc.
filter := Filter{
PoW: crit.MinPow,
Messages: make(map[common.Hash]*ReceivedMessage),
Messages: api.w.NewMessageStore(),
AllowP2P: crit.AllowP2P,
}
@ -453,6 +452,7 @@ type Message struct {
PoW float64 `json:"pow"`
Hash []byte `json:"hash"`
Dst []byte `json:"recipientPublicKey,omitempty"`
P2P bool `json:"bool,omitempty"`
}
type messageOverride struct {
@ -473,6 +473,7 @@ func ToWhisperMessage(message *ReceivedMessage) *Message {
PoW: message.PoW,
Hash: message.EnvelopeHash.Bytes(),
Topic: message.Topic,
P2P: message.P2P,
}
if message.Dst != nil {
@ -587,7 +588,7 @@ func (api *PublicWhisperAPI) NewMessageFilter(req Criteria) (string, error) {
PoW: req.MinPow,
AllowP2P: req.AllowP2P,
Topics: topics,
Messages: make(map[common.Hash]*ReceivedMessage),
Messages: api.w.NewMessageStore(),
}
id, err := api.w.Subscribe(f)

View File

@ -26,6 +26,47 @@ import (
"github.com/ethereum/go-ethereum/log"
)
// MessageStore defines interface for temporary message store.
type MessageStore interface {
Add(*ReceivedMessage) error
Pop() ([]*ReceivedMessage, error)
}
// NewMemoryMessageStore returns pointer to an instance of the MemoryMessageStore.
func NewMemoryMessageStore() *MemoryMessageStore {
return &MemoryMessageStore{
messages: map[common.Hash]*ReceivedMessage{},
}
}
// MemoryMessageStore stores massages in memory hash table.
type MemoryMessageStore struct {
mu sync.Mutex
messages map[common.Hash]*ReceivedMessage
}
// Add adds message to store.
func (store *MemoryMessageStore) Add(msg *ReceivedMessage) error {
store.mu.Lock()
defer store.mu.Unlock()
if _, exist := store.messages[msg.EnvelopeHash]; !exist {
store.messages[msg.EnvelopeHash] = msg
}
return nil
}
// Pop returns all avaiable messages and cleans the store.
func (store *MemoryMessageStore) Pop() ([]*ReceivedMessage, error) {
store.mu.Lock()
defer store.mu.Unlock()
all := make([]*ReceivedMessage, 0, len(store.messages))
for hash, msg := range store.messages {
delete(store.messages, hash)
all = append(all, msg)
}
return all, nil
}
// Filter represents a Whisper message filter
type Filter struct {
Src *ecdsa.PublicKey // Sender of the message
@ -37,7 +78,7 @@ type Filter struct {
SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization
id string // unique identifier
Messages map[common.Hash]*ReceivedMessage
Messages MessageStore
mutex sync.RWMutex
}
@ -68,10 +109,6 @@ func (fs *Filters) Install(watcher *Filter) (string, error) {
return "", fmt.Errorf("filters must choose between symmetric and asymmetric keys")
}
if watcher.Messages == nil {
watcher.Messages = make(map[common.Hash]*ReceivedMessage)
}
id, err := GenerateRandomID()
if err != nil {
return "", err
@ -183,6 +220,7 @@ func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
}
if match && msg != nil {
msg.P2P = p2pMessage
log.Trace("processing message: decrypted", "hash", env.Hash().Hex())
if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) {
watcher.Trigger(msg)
@ -202,27 +240,21 @@ func (f *Filter) expectsSymmetricEncryption() bool {
// Trigger adds a yet-unknown message to the filter's list of
// received messages.
func (f *Filter) Trigger(msg *ReceivedMessage) {
f.mutex.Lock()
defer f.mutex.Unlock()
if _, exist := f.Messages[msg.EnvelopeHash]; !exist {
f.Messages[msg.EnvelopeHash] = msg
err := f.Messages.Add(msg)
if err != nil {
log.Error("failed to add msg into the filters store", "hash", msg.EnvelopeHash, "error", err)
}
}
// Retrieve will return the list of all received messages associated
// to a filter.
func (f *Filter) Retrieve() (all []*ReceivedMessage) {
f.mutex.Lock()
defer f.mutex.Unlock()
all = make([]*ReceivedMessage, 0, len(f.Messages))
for _, msg := range f.Messages {
all = append(all, msg)
func (f *Filter) Retrieve() []*ReceivedMessage {
msgs, err := f.Messages.Pop()
if err != nil {
log.Error("failed to retrieve messages from filter store", "error", err)
return nil
}
f.Messages = make(map[common.Hash]*ReceivedMessage) // delete old messages
return all
return msgs
}
// MatchMessage checks if the filter matches an already decrypted

View File

@ -75,6 +75,8 @@ type ReceivedMessage struct {
SymKeyHash common.Hash // The Keccak256Hash of the key
EnvelopeHash common.Hash // Message envelope hash to act as a unique id
P2P bool // is set to true if this message was received from mail server.
}
func isMessageSigned(flags byte) bool {

View File

@ -90,9 +90,9 @@ type Whisper struct {
peerMu sync.RWMutex // Mutex to sync the active peer set
peers map[*Peer]struct{} // Set of currently active peers
messageQueue chan *Envelope // Message queue for normal whisper messages
p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further)
quit chan struct{} // Channel used for graceful exit
messageQueue chan *Envelope // Message queue for normal whisper messages
p2pMsgQueue chan interface{} // Message queue for peer-to-peer messages (not to be forwarded any further) and history delivery confirmations.
quit chan struct{} // Channel used for graceful exit
settings syncmap.Map // holds configuration settings that can be dynamically changed
@ -105,6 +105,8 @@ type Whisper struct {
mailServer MailServer // MailServer interface
messageStoreFabric func() MessageStore
envelopeFeed event.Feed
timeSource func() time.Time // source of time for whisper
@ -123,7 +125,7 @@ func New(cfg *Config) *Whisper {
expirations: make(map[uint32]mapset.Set),
peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan interface{}, messageQueueLimit),
quit: make(chan struct{}),
syncAllowance: DefaultSyncAllowance,
timeSource: time.Now,
@ -155,6 +157,19 @@ func New(cfg *Config) *Whisper {
return whisper
}
// NewMessageStore returns object that implements MessageStore.
func (whisper *Whisper) NewMessageStore() MessageStore {
if whisper.messageStoreFabric != nil {
return whisper.messageStoreFabric()
}
return NewMemoryMessageStore()
}
// SetMessageStore allows to inject custom implementation of the message store.
func (whisper *Whisper) SetMessageStore(fabric func() MessageStore) {
whisper.messageStoreFabric = fabric
}
// SetTimeSource assigns a particular source of time to a whisper object.
func (whisper *Whisper) SetTimeSource(timesource func() time.Time) {
whisper.timeSource = timesource
@ -800,6 +815,7 @@ func (whisper *Whisper) Start(*p2p.Server) error {
for i := 0; i < numCPU; i++ {
go whisper.processQueue()
}
go whisper.processP2P()
return nil
}
@ -990,7 +1006,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
if err = packet.Decode(&envelopes); err == nil {
for _, envelope := range envelopes {
whisper.postEvent(envelope, true)
whisper.postP2P(envelope)
}
continue
}
@ -1004,7 +1020,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
}
if err = packet.Decode(&envelope); err == nil {
whisper.postEvent(envelope, true)
whisper.postP2P(envelope)
continue
}
@ -1088,7 +1104,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
}
if event != nil {
whisper.envelopeFeed.Send(*event)
whisper.postP2P(*event)
}
}
@ -1193,14 +1209,19 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
return true, nil
}
func (whisper *Whisper) postP2P(event interface{}) {
whisper.p2pMsgQueue <- event
}
// postEvent queues the message for further processing.
func (whisper *Whisper) postEvent(envelope *Envelope, isP2P bool) {
if isP2P {
whisper.p2pMsgQueue <- envelope
whisper.postP2P(envelope)
} else {
whisper.checkOverflow()
whisper.messageQueue <- envelope
}
}
// checkOverflow checks if message queue overflow occurs and reports it if necessary.
@ -1222,25 +1243,36 @@ func (whisper *Whisper) checkOverflow() {
// processQueue delivers the messages to the watchers during the lifetime of the whisper node.
func (whisper *Whisper) processQueue() {
var e *Envelope
for {
select {
case <-whisper.quit:
return
case e = <-whisper.messageQueue:
case e := <-whisper.messageQueue:
whisper.filters.NotifyWatchers(e, false)
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: e.Hash(),
Event: EventEnvelopeAvailable,
})
}
}
}
case e = <-whisper.p2pMsgQueue:
whisper.filters.NotifyWatchers(e, true)
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: e.Hash(),
Event: EventEnvelopeAvailable,
})
func (whisper *Whisper) processP2P() {
for {
select {
case <-whisper.quit:
return
case e := <-whisper.p2pMsgQueue:
switch event := e.(type) {
case *Envelope:
whisper.filters.NotifyWatchers(event, true)
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: event.Hash(),
Event: EventEnvelopeAvailable,
})
case EnvelopeEvent:
whisper.envelopeFeed.Send(event)
}
}
}
}