diff --git a/go.mod b/go.mod index 723cbf5cb..0f4053427 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/status-im/doubleratchet v2.0.0+incompatible github.com/status-im/migrate/v4 v4.3.1-status github.com/status-im/rendezvous v1.3.0 - github.com/status-im/whisper v1.4.13 + github.com/status-im/whisper v1.4.14 github.com/stretchr/testify v1.3.0 github.com/syndtr/goleveldb v1.0.0 golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 diff --git a/go.sum b/go.sum index d944da787..4c42fe138 100644 --- a/go.sum +++ b/go.sum @@ -438,8 +438,8 @@ github.com/status-im/migrate/v4 v4.3.1-status h1:tJwsEYLgbFkvlTSMk89APwRDfpr4yG8 github.com/status-im/migrate/v4 v4.3.1-status/go.mod h1:r8HggRBZ/k7TRwByq/Hp3P/ubFppIna0nvyavVK0pjA= github.com/status-im/rendezvous v1.3.0 h1:7RK/MXXW+tlm0asKm1u7Qp7Yni6AO29a7j8+E4Lbjg4= github.com/status-im/rendezvous v1.3.0/go.mod h1:+hzjuP+j/XzLPeF6E50b88pWOTLdTcwjvNYt+Gh1W1s= -github.com/status-im/whisper v1.4.13 h1:V8aC66CkpTYFS/l9lSp5NW2xHOlpRnGZxKdJmDVLFxE= -github.com/status-im/whisper v1.4.13/go.mod h1:WS6z39YJQ8WJa9s+DmTuEM/s2nVF6Iz3B1SZYw5cYf0= +github.com/status-im/whisper v1.4.14 h1:9VHqx4+PUYfhDnYYtDxHkg/3cfVvkHjPNciY4LO83yc= +github.com/status-im/whisper v1.4.14/go.mod h1:WS6z39YJQ8WJa9s+DmTuEM/s2nVF6Iz3B1SZYw5cYf0= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/mailserver/README.md b/mailserver/README.md index be9531582..9b589163f 100644 --- a/mailserver/README.md +++ b/mailserver/README.md @@ -24,10 +24,12 @@ $ echo '{"jsonrpc":"2.0","method":"shh_markTrustedPeer", "params": ["enode://c42 4. Finally, trigger the sync command: ``` # sudo might be not needed in your setup -$ echo '{"jsonrpc":"2.0","method":"shhext_syncMessages","params":[{"mailServerPeer":"enode://c42f368a23fa98ee546fd247220759062323249ef657d26d357a777443aec04db1b29a3a22ef3e7c548e18493ddaf51a31b0aed6079bd6ebe5ae838fcfaf3a49@206.189.243.162:30504", "to": 1550479953, "from": 1550393583}],"id":1}' | \ +$ echo '{"jsonrpc":"2.0","method":"shhext_syncMessages","params":[{"mailServerPeer":"enode://c42f368a23fa98ee546fd247220759062323249ef657d26d357a777443aec04db1b29a3a22ef3e7c548e18493ddaf51a31b0aed6079bd6ebe5ae838fcfaf3a49@206.189.243.162:30504", "to": 1550479953, "from": 1550393583, "limit": 1000}],"id":1}' | \ sudo socat -d -d - UNIX-CONNECT:/docker/statusd-mail/data/geth.ipc ``` +You can add `"followCursor": true` if you want it to automatically download messages until the cursor is empty meaning all data was synced. + ### Debugging To verify that your mail server received any responses, watch logs and seek for logs like this: diff --git a/mailserver/mailserver.go b/mailserver/mailserver.go index 9c59de393..adda5acfb 100644 --- a/mailserver/mailserver.go +++ b/mailserver/mailserver.go @@ -523,10 +523,12 @@ func (s *WMailServer) processRequestInBundles( } + // TODO(adam): this is invalid code. If the limit is 1000, + // it will only send 999 items and send a cursor. lastEnvelopeHash = key.EnvelopeHash() processedEnvelopes++ envelopeSize := uint32(len(rawValue)) - limitReached := processedEnvelopes == limit + limitReached := processedEnvelopes >= limit newSize := bundleSize + envelopeSize // If we still have some room for messages, add and continue diff --git a/services/shhext/api.go b/services/shhext/api.go index 3ff51391d..53f439588 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -139,6 +139,9 @@ type SyncMessagesRequest struct { // Cursor is used as starting point for paginated requests Cursor string `json:"cursor"` + // FollowCursor if true loads messages until cursor is empty. + FollowCursor bool `json:"followCursor"` + // Topics is a list of Whisper topics. // If empty, a full bloom filter will be used. Topics []whisper.TopicType `json:"topics"` @@ -373,44 +376,36 @@ func createSyncMessagesResponse(r whisper.SyncEventResponse) SyncMessagesRespons // SyncMessages sends a request to a given MailServerPeer to sync historic messages. // MailServerPeers needs to be added as a trusted peer first. func (api *PublicAPI) SyncMessages(ctx context.Context, r SyncMessagesRequest) (SyncMessagesResponse, error) { + log.Info("SyncMessages start", "request", r) + var response SyncMessagesResponse mailServerEnode, err := enode.ParseV4(r.MailServerPeer) if err != nil { return response, fmt.Errorf("invalid MailServerPeer: %v", err) } + mailServerID := mailServerEnode.ID().Bytes() request, err := createSyncMailRequest(r) if err != nil { return response, fmt.Errorf("failed to create a sync mail request: %v", err) } - if err := api.service.w.SyncMessages(mailServerEnode.ID().Bytes(), request); err != nil { - return response, fmt.Errorf("failed to send a sync request: %v", err) - } - - // Wait for the response which is received asynchronously as a p2p packet. - // This packet handler will send an event which contains the response payload. - events := make(chan whisper.EnvelopeEvent) - sub := api.service.w.SubscribeEnvelopeEvents(events) - defer sub.Unsubscribe() - for { - select { - case event := <-events: - if event.Event != whisper.EventMailServerSyncFinished { - continue - } + log.Info("Sending a request to sync messages", "request", request) - log.Info("received EventMailServerSyncFinished event", "data", event.Data) - - if resp, ok := event.Data.(whisper.SyncEventResponse); ok { - return createSyncMessagesResponse(resp), nil - } - return response, fmt.Errorf("did not understand the response event data") - case <-ctx.Done(): - return response, ctx.Err() + resp, err := api.service.syncMessages(ctx, mailServerID, request) + if err != nil { + return response, err } + + log.Info("Syncing messages response", "error", resp.Error, "cursor", fmt.Sprintf("%#x", resp.Cursor)) + + if resp.Error != "" || len(resp.Cursor) == 0 || !r.FollowCursor { + return createSyncMessagesResponse(resp), nil + } + + request.Cursor = resp.Cursor } } diff --git a/services/shhext/service.go b/services/shhext/service.go index bd4dc4c73..9e6c5bffc 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -1,7 +1,9 @@ package shhext import ( + "context" "crypto/ecdsa" + "fmt" "time" "github.com/ethereum/go-ethereum/common" @@ -179,3 +181,46 @@ func (s *Service) Stop() error { return s.Service.Stop() } + +func (s *Service) syncMessages(ctx context.Context, mailServerID []byte, r whisper.SyncMailRequest) (resp whisper.SyncEventResponse, err error) { + err = s.w.SyncMessages(mailServerID, r) + if err != nil { + return + } + + // Wait for the response which is received asynchronously as a p2p packet. + // This packet handler will send an event which contains the response payload. + events := make(chan whisper.EnvelopeEvent, 1024) + sub := s.w.SubscribeEnvelopeEvents(events) + defer sub.Unsubscribe() + + // Add explicit timeout context, otherwise the request + // can hang indefinitely if not specified by the sender. + // Sender is usually through netcat or some bash tool + // so it's not really possible to specify the timeout. + timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30) + defer cancel() + + for { + select { + case event := <-events: + if event.Event != whisper.EventMailServerSyncFinished { + continue + } + + log.Info("received EventMailServerSyncFinished event", "data", event.Data) + + var ok bool + + resp, ok = event.Data.(whisper.SyncEventResponse) + if !ok { + err = fmt.Errorf("did not understand the response event data") + return + } + return + case <-timeoutCtx.Done(): + err = timeoutCtx.Err() + return + } + } +} diff --git a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/filter.go b/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/filter.go deleted file mode 100644 index 6a5b79674..000000000 --- a/vendor/github.com/ethereum/go-ethereum/whisper/whisperv6/filter.go +++ /dev/null @@ -1,262 +0,0 @@ -// Copyright 2016 The go-ethereum Authors -// This file is part of the go-ethereum library. -// -// The go-ethereum library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The go-ethereum library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the go-ethereum library. If not, see . - -package whisperv6 - -import ( - "crypto/ecdsa" - "fmt" - "sync" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/crypto" - "github.com/ethereum/go-ethereum/log" -) - -// Filter represents a Whisper message filter -type Filter struct { - Src *ecdsa.PublicKey // Sender of the message - KeyAsym *ecdsa.PrivateKey // Private Key of recipient - KeySym []byte // Key associated with the Topic - Topics [][]byte // Topics to filter messages with - PoW float64 // Proof of work as described in the Whisper spec - AllowP2P bool // Indicates whether this filter is interested in direct peer-to-peer messages - SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization - id string // unique identifier - - Messages map[common.Hash]*ReceivedMessage - mutex sync.RWMutex -} - -// Filters represents a collection of filters -type Filters struct { - watchers map[string]*Filter - - topicMatcher map[TopicType]map[*Filter]struct{} // map a topic to the filters that are interested in being notified when a message matches that topic - allTopicsMatcher map[*Filter]struct{} // list all the filters that will be notified of a new message, no matter what its topic is - - whisper *Whisper - mutex sync.RWMutex -} - -// NewFilters returns a newly created filter collection -func NewFilters(w *Whisper) *Filters { - return &Filters{ - watchers: make(map[string]*Filter), - topicMatcher: make(map[TopicType]map[*Filter]struct{}), - allTopicsMatcher: make(map[*Filter]struct{}), - whisper: w, - } -} - -// Install will add a new filter to the filter collection -func (fs *Filters) Install(watcher *Filter) (string, error) { - if watcher.KeySym != nil && watcher.KeyAsym != nil { - return "", fmt.Errorf("filters must choose between symmetric and asymmetric keys") - } - - if watcher.Messages == nil { - watcher.Messages = make(map[common.Hash]*ReceivedMessage) - } - - id, err := GenerateRandomID() - if err != nil { - return "", err - } - - fs.mutex.Lock() - defer fs.mutex.Unlock() - - if fs.watchers[id] != nil { - return "", fmt.Errorf("failed to generate unique ID") - } - - if watcher.expectsSymmetricEncryption() { - watcher.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym) - } - - watcher.id = id - fs.watchers[id] = watcher - fs.addTopicMatcher(watcher) - return id, err -} - -// Uninstall will remove a filter whose id has been specified from -// the filter collection -func (fs *Filters) Uninstall(id string) bool { - fs.mutex.Lock() - defer fs.mutex.Unlock() - if fs.watchers[id] != nil { - fs.removeFromTopicMatchers(fs.watchers[id]) - delete(fs.watchers, id) - return true - } - return false -} - -// addTopicMatcher adds a filter to the topic matchers. -// If the filter's Topics array is empty, it will be tried on every topic. -// Otherwise, it will be tried on the topics specified. -func (fs *Filters) addTopicMatcher(watcher *Filter) { - if len(watcher.Topics) == 0 { - fs.allTopicsMatcher[watcher] = struct{}{} - } else { - for _, t := range watcher.Topics { - topic := BytesToTopic(t) - if fs.topicMatcher[topic] == nil { - fs.topicMatcher[topic] = make(map[*Filter]struct{}) - } - fs.topicMatcher[topic][watcher] = struct{}{} - } - } -} - -// removeFromTopicMatchers removes a filter from the topic matchers -func (fs *Filters) removeFromTopicMatchers(watcher *Filter) { - delete(fs.allTopicsMatcher, watcher) - for _, topic := range watcher.Topics { - delete(fs.topicMatcher[BytesToTopic(topic)], watcher) - } -} - -// getWatchersByTopic returns a slice containing the filters that -// match a specific topic -func (fs *Filters) getWatchersByTopic(topic TopicType) []*Filter { - res := make([]*Filter, 0, len(fs.allTopicsMatcher)) - for watcher := range fs.allTopicsMatcher { - res = append(res, watcher) - } - for watcher := range fs.topicMatcher[topic] { - res = append(res, watcher) - } - return res -} - -// Get returns a filter from the collection with a specific ID -func (fs *Filters) Get(id string) *Filter { - fs.mutex.RLock() - defer fs.mutex.RUnlock() - return fs.watchers[id] -} - -// NotifyWatchers notifies any filter that has declared interest -// for the envelope's topic. -func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) { - var msg *ReceivedMessage - - fs.mutex.RLock() - defer fs.mutex.RUnlock() - - candidates := fs.getWatchersByTopic(env.Topic) - for _, watcher := range candidates { - if p2pMessage && !watcher.AllowP2P { - log.Trace(fmt.Sprintf("msg [%x], filter [%s]: p2p messages are not allowed", env.Hash(), watcher.id)) - continue - } - - var match bool - if msg != nil { - match = watcher.MatchMessage(msg) - } else { - match = watcher.MatchEnvelope(env) - if match { - msg = env.Open(watcher) - if msg == nil { - log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", watcher.id) - } - } else { - log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", watcher.id) - } - } - - if match && msg != nil { - log.Trace("processing message: decrypted", "hash", env.Hash().Hex()) - if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) { - watcher.Trigger(msg) - } - } - } -} - -func (f *Filter) expectsAsymmetricEncryption() bool { - return f.KeyAsym != nil -} - -func (f *Filter) expectsSymmetricEncryption() bool { - return f.KeySym != nil -} - -// Trigger adds a yet-unknown message to the filter's list of -// received messages. -func (f *Filter) Trigger(msg *ReceivedMessage) { - f.mutex.Lock() - defer f.mutex.Unlock() - - if _, exist := f.Messages[msg.EnvelopeHash]; !exist { - f.Messages[msg.EnvelopeHash] = msg - } -} - -// Retrieve will return the list of all received messages associated -// to a filter. -func (f *Filter) Retrieve() (all []*ReceivedMessage) { - f.mutex.Lock() - defer f.mutex.Unlock() - - all = make([]*ReceivedMessage, 0, len(f.Messages)) - for _, msg := range f.Messages { - all = append(all, msg) - } - - f.Messages = make(map[common.Hash]*ReceivedMessage) // delete old messages - return all -} - -// MatchMessage checks if the filter matches an already decrypted -// message (i.e. a Message that has already been handled by -// MatchEnvelope when checked by a previous filter). -// Topics are not checked here, since this is done by topic matchers. -func (f *Filter) MatchMessage(msg *ReceivedMessage) bool { - if f.PoW > 0 && msg.PoW < f.PoW { - return false - } - - if f.expectsAsymmetricEncryption() && msg.isAsymmetricEncryption() { - return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst) - } else if f.expectsSymmetricEncryption() && msg.isSymmetricEncryption() { - return f.SymKeyHash == msg.SymKeyHash - } - return false -} - -// MatchEnvelope checks if it's worth decrypting the message. If -// it returns `true`, client code is expected to attempt decrypting -// the message and subsequently call MatchMessage. -// Topics are not checked here, since this is done by topic matchers. -func (f *Filter) MatchEnvelope(envelope *Envelope) bool { - return f.PoW <= 0 || envelope.pow >= f.PoW -} - -// IsPubKeyEqual checks that two public keys are equal -func IsPubKeyEqual(a, b *ecdsa.PublicKey) bool { - if !ValidatePublicKey(a) { - return false - } else if !ValidatePublicKey(b) { - return false - } - // the curve is always the same, just compare the points - return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0 -} diff --git a/vendor/github.com/status-im/whisper/whisperv6/doc.go b/vendor/github.com/status-im/whisper/whisperv6/doc.go index 97733ed19..7ff7b1584 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/doc.go +++ b/vendor/github.com/status-im/whisper/whisperv6/doc.go @@ -122,8 +122,12 @@ type SyncMailRequest struct { // Validate checks request's fields if they are valid. func (r SyncMailRequest) Validate() error { + if r.Limit == 0 { + return errors.New("invalid 'Limit' value, expected value greater than 0") + } + if r.Limit > MaxLimitInSyncMailRequest { - return fmt.Errorf("invalid 'Limit' value, expected lower than %d", MaxLimitInSyncMailRequest) + return fmt.Errorf("invalid 'Limit' value, expected value lower than %d", MaxLimitInSyncMailRequest) } if r.Lower > r.Upper { diff --git a/vendor/github.com/status-im/whisper/whisperv6/whisper.go b/vendor/github.com/status-im/whisper/whisperv6/whisper.go index 5edc3c8eb..8d1e077c9 100644 --- a/vendor/github.com/status-im/whisper/whisperv6/whisper.go +++ b/vendor/github.com/status-im/whisper/whisperv6/whisper.go @@ -1058,7 +1058,15 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { } if err := whisper.mailServer.SyncMail(p, request); err != nil { - log.Error("failed to sync envelopes", "peer", p.peer.ID().String()) + log.Error( + "failed to sync envelopes", + "peer", p.peer.ID().String(), + ) + _ = whisper.SendSyncResponse( + p, + SyncResponse{Error: err.Error()}, + ) + return err } } else { log.Debug("requested to sync messages but mail servers is not registered", "peer", p.peer.ID().String()) diff --git a/vendor/modules.txt b/vendor/modules.txt index 4e7c27fd2..0c2ee836e 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -55,6 +55,7 @@ github.com/ethereum/go-ethereum/rpc github.com/ethereum/go-ethereum/ethapi github.com/ethereum/go-ethereum/eth/filters github.com/ethereum/go-ethereum/crypto/ecies +github.com/ethereum/go-ethereum/consensus/ethash github.com/ethereum/go-ethereum/common/math github.com/ethereum/go-ethereum/crypto/secp256k1 github.com/ethereum/go-ethereum/accounts/usbwallet @@ -72,7 +73,6 @@ github.com/ethereum/go-ethereum/core/rawdb github.com/ethereum/go-ethereum/core/state github.com/ethereum/go-ethereum/core/vm github.com/ethereum/go-ethereum/consensus/clique -github.com/ethereum/go-ethereum/consensus/ethash github.com/ethereum/go-ethereum/core/bloombits github.com/ethereum/go-ethereum/eth/fetcher github.com/ethereum/go-ethereum/eth/gasprice @@ -315,7 +315,7 @@ github.com/status-im/migrate/v4/database/sqlcipher github.com/status-im/rendezvous github.com/status-im/rendezvous/protocol github.com/status-im/rendezvous/server -# github.com/status-im/whisper v1.4.13 +# github.com/status-im/whisper v1.4.14 github.com/status-im/whisper/whisperv6 # github.com/stretchr/testify v1.3.0 github.com/stretchr/testify/require