Sync messages in a loop until error or no cursor (#1502)

This commit is contained in:
Adam Babik 2019-06-26 18:17:41 +02:00 committed by GitHub
parent 5335a2b4fd
commit 1ab2e88bf5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 88 additions and 294 deletions

2
go.mod
View File

@ -25,7 +25,7 @@ require (
github.com/status-im/doubleratchet v2.0.0+incompatible github.com/status-im/doubleratchet v2.0.0+incompatible
github.com/status-im/migrate/v4 v4.3.1-status github.com/status-im/migrate/v4 v4.3.1-status
github.com/status-im/rendezvous v1.3.0 github.com/status-im/rendezvous v1.3.0
github.com/status-im/whisper v1.4.13 github.com/status-im/whisper v1.4.14
github.com/stretchr/testify v1.3.0 github.com/stretchr/testify v1.3.0
github.com/syndtr/goleveldb v1.0.0 github.com/syndtr/goleveldb v1.0.0
golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5 golang.org/x/crypto v0.0.0-20190530122614-20be4c3c3ed5

4
go.sum
View File

@ -438,8 +438,8 @@ github.com/status-im/migrate/v4 v4.3.1-status h1:tJwsEYLgbFkvlTSMk89APwRDfpr4yG8
github.com/status-im/migrate/v4 v4.3.1-status/go.mod h1:r8HggRBZ/k7TRwByq/Hp3P/ubFppIna0nvyavVK0pjA= github.com/status-im/migrate/v4 v4.3.1-status/go.mod h1:r8HggRBZ/k7TRwByq/Hp3P/ubFppIna0nvyavVK0pjA=
github.com/status-im/rendezvous v1.3.0 h1:7RK/MXXW+tlm0asKm1u7Qp7Yni6AO29a7j8+E4Lbjg4= github.com/status-im/rendezvous v1.3.0 h1:7RK/MXXW+tlm0asKm1u7Qp7Yni6AO29a7j8+E4Lbjg4=
github.com/status-im/rendezvous v1.3.0/go.mod h1:+hzjuP+j/XzLPeF6E50b88pWOTLdTcwjvNYt+Gh1W1s= github.com/status-im/rendezvous v1.3.0/go.mod h1:+hzjuP+j/XzLPeF6E50b88pWOTLdTcwjvNYt+Gh1W1s=
github.com/status-im/whisper v1.4.13 h1:V8aC66CkpTYFS/l9lSp5NW2xHOlpRnGZxKdJmDVLFxE= github.com/status-im/whisper v1.4.14 h1:9VHqx4+PUYfhDnYYtDxHkg/3cfVvkHjPNciY4LO83yc=
github.com/status-im/whisper v1.4.13/go.mod h1:WS6z39YJQ8WJa9s+DmTuEM/s2nVF6Iz3B1SZYw5cYf0= github.com/status-im/whisper v1.4.14/go.mod h1:WS6z39YJQ8WJa9s+DmTuEM/s2nVF6Iz3B1SZYw5cYf0=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A= github.com/stretchr/objx v0.1.1 h1:2vfRuCMp5sSVIDSqO8oNnWJq7mPa6KVP3iPIwFBuy8A=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=

View File

@ -24,10 +24,12 @@ $ echo '{"jsonrpc":"2.0","method":"shh_markTrustedPeer", "params": ["enode://c42
4. Finally, trigger the sync command: 4. Finally, trigger the sync command:
``` ```
# sudo might be not needed in your setup # sudo might be not needed in your setup
$ echo '{"jsonrpc":"2.0","method":"shhext_syncMessages","params":[{"mailServerPeer":"enode://c42f368a23fa98ee546fd247220759062323249ef657d26d357a777443aec04db1b29a3a22ef3e7c548e18493ddaf51a31b0aed6079bd6ebe5ae838fcfaf3a49@206.189.243.162:30504", "to": 1550479953, "from": 1550393583}],"id":1}' | \ $ echo '{"jsonrpc":"2.0","method":"shhext_syncMessages","params":[{"mailServerPeer":"enode://c42f368a23fa98ee546fd247220759062323249ef657d26d357a777443aec04db1b29a3a22ef3e7c548e18493ddaf51a31b0aed6079bd6ebe5ae838fcfaf3a49@206.189.243.162:30504", "to": 1550479953, "from": 1550393583, "limit": 1000}],"id":1}' | \
sudo socat -d -d - UNIX-CONNECT:/docker/statusd-mail/data/geth.ipc sudo socat -d -d - UNIX-CONNECT:/docker/statusd-mail/data/geth.ipc
``` ```
You can add `"followCursor": true` if you want it to automatically download messages until the cursor is empty meaning all data was synced.
### Debugging ### Debugging
To verify that your mail server received any responses, watch logs and seek for logs like this: To verify that your mail server received any responses, watch logs and seek for logs like this:

View File

@ -523,10 +523,12 @@ func (s *WMailServer) processRequestInBundles(
} }
// TODO(adam): this is invalid code. If the limit is 1000,
// it will only send 999 items and send a cursor.
lastEnvelopeHash = key.EnvelopeHash() lastEnvelopeHash = key.EnvelopeHash()
processedEnvelopes++ processedEnvelopes++
envelopeSize := uint32(len(rawValue)) envelopeSize := uint32(len(rawValue))
limitReached := processedEnvelopes == limit limitReached := processedEnvelopes >= limit
newSize := bundleSize + envelopeSize newSize := bundleSize + envelopeSize
// If we still have some room for messages, add and continue // If we still have some room for messages, add and continue

View File

@ -139,6 +139,9 @@ type SyncMessagesRequest struct {
// Cursor is used as starting point for paginated requests // Cursor is used as starting point for paginated requests
Cursor string `json:"cursor"` Cursor string `json:"cursor"`
// FollowCursor if true loads messages until cursor is empty.
FollowCursor bool `json:"followCursor"`
// Topics is a list of Whisper topics. // Topics is a list of Whisper topics.
// If empty, a full bloom filter will be used. // If empty, a full bloom filter will be used.
Topics []whisper.TopicType `json:"topics"` Topics []whisper.TopicType `json:"topics"`
@ -373,44 +376,36 @@ func createSyncMessagesResponse(r whisper.SyncEventResponse) SyncMessagesRespons
// SyncMessages sends a request to a given MailServerPeer to sync historic messages. // SyncMessages sends a request to a given MailServerPeer to sync historic messages.
// MailServerPeers needs to be added as a trusted peer first. // MailServerPeers needs to be added as a trusted peer first.
func (api *PublicAPI) SyncMessages(ctx context.Context, r SyncMessagesRequest) (SyncMessagesResponse, error) { func (api *PublicAPI) SyncMessages(ctx context.Context, r SyncMessagesRequest) (SyncMessagesResponse, error) {
log.Info("SyncMessages start", "request", r)
var response SyncMessagesResponse var response SyncMessagesResponse
mailServerEnode, err := enode.ParseV4(r.MailServerPeer) mailServerEnode, err := enode.ParseV4(r.MailServerPeer)
if err != nil { if err != nil {
return response, fmt.Errorf("invalid MailServerPeer: %v", err) return response, fmt.Errorf("invalid MailServerPeer: %v", err)
} }
mailServerID := mailServerEnode.ID().Bytes()
request, err := createSyncMailRequest(r) request, err := createSyncMailRequest(r)
if err != nil { if err != nil {
return response, fmt.Errorf("failed to create a sync mail request: %v", err) return response, fmt.Errorf("failed to create a sync mail request: %v", err)
} }
if err := api.service.w.SyncMessages(mailServerEnode.ID().Bytes(), request); err != nil {
return response, fmt.Errorf("failed to send a sync request: %v", err)
}
// Wait for the response which is received asynchronously as a p2p packet.
// This packet handler will send an event which contains the response payload.
events := make(chan whisper.EnvelopeEvent)
sub := api.service.w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
for { for {
select { log.Info("Sending a request to sync messages", "request", request)
case event := <-events:
if event.Event != whisper.EventMailServerSyncFinished {
continue
}
log.Info("received EventMailServerSyncFinished event", "data", event.Data) resp, err := api.service.syncMessages(ctx, mailServerID, request)
if err != nil {
if resp, ok := event.Data.(whisper.SyncEventResponse); ok { return response, err
return createSyncMessagesResponse(resp), nil
}
return response, fmt.Errorf("did not understand the response event data")
case <-ctx.Done():
return response, ctx.Err()
} }
log.Info("Syncing messages response", "error", resp.Error, "cursor", fmt.Sprintf("%#x", resp.Cursor))
if resp.Error != "" || len(resp.Cursor) == 0 || !r.FollowCursor {
return createSyncMessagesResponse(resp), nil
}
request.Cursor = resp.Cursor
} }
} }

View File

@ -1,7 +1,9 @@
package shhext package shhext
import ( import (
"context"
"crypto/ecdsa" "crypto/ecdsa"
"fmt"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -179,3 +181,46 @@ func (s *Service) Stop() error {
return s.Service.Stop() return s.Service.Stop()
} }
func (s *Service) syncMessages(ctx context.Context, mailServerID []byte, r whisper.SyncMailRequest) (resp whisper.SyncEventResponse, err error) {
err = s.w.SyncMessages(mailServerID, r)
if err != nil {
return
}
// Wait for the response which is received asynchronously as a p2p packet.
// This packet handler will send an event which contains the response payload.
events := make(chan whisper.EnvelopeEvent, 1024)
sub := s.w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
// Add explicit timeout context, otherwise the request
// can hang indefinitely if not specified by the sender.
// Sender is usually through netcat or some bash tool
// so it's not really possible to specify the timeout.
timeoutCtx, cancel := context.WithTimeout(ctx, time.Second*30)
defer cancel()
for {
select {
case event := <-events:
if event.Event != whisper.EventMailServerSyncFinished {
continue
}
log.Info("received EventMailServerSyncFinished event", "data", event.Data)
var ok bool
resp, ok = event.Data.(whisper.SyncEventResponse)
if !ok {
err = fmt.Errorf("did not understand the response event data")
return
}
return
case <-timeoutCtx.Done():
err = timeoutCtx.Err()
return
}
}
}

View File

@ -1,262 +0,0 @@
// Copyright 2016 The go-ethereum Authors
// This file is part of the go-ethereum library.
//
// The go-ethereum library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The go-ethereum library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see <http://www.gnu.org/licenses/>.
package whisperv6
import (
"crypto/ecdsa"
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
)
// Filter represents a Whisper message filter
type Filter struct {
Src *ecdsa.PublicKey // Sender of the message
KeyAsym *ecdsa.PrivateKey // Private Key of recipient
KeySym []byte // Key associated with the Topic
Topics [][]byte // Topics to filter messages with
PoW float64 // Proof of work as described in the Whisper spec
AllowP2P bool // Indicates whether this filter is interested in direct peer-to-peer messages
SymKeyHash common.Hash // The Keccak256Hash of the symmetric key, needed for optimization
id string // unique identifier
Messages map[common.Hash]*ReceivedMessage
mutex sync.RWMutex
}
// Filters represents a collection of filters
type Filters struct {
watchers map[string]*Filter
topicMatcher map[TopicType]map[*Filter]struct{} // map a topic to the filters that are interested in being notified when a message matches that topic
allTopicsMatcher map[*Filter]struct{} // list all the filters that will be notified of a new message, no matter what its topic is
whisper *Whisper
mutex sync.RWMutex
}
// NewFilters returns a newly created filter collection
func NewFilters(w *Whisper) *Filters {
return &Filters{
watchers: make(map[string]*Filter),
topicMatcher: make(map[TopicType]map[*Filter]struct{}),
allTopicsMatcher: make(map[*Filter]struct{}),
whisper: w,
}
}
// Install will add a new filter to the filter collection
func (fs *Filters) Install(watcher *Filter) (string, error) {
if watcher.KeySym != nil && watcher.KeyAsym != nil {
return "", fmt.Errorf("filters must choose between symmetric and asymmetric keys")
}
if watcher.Messages == nil {
watcher.Messages = make(map[common.Hash]*ReceivedMessage)
}
id, err := GenerateRandomID()
if err != nil {
return "", err
}
fs.mutex.Lock()
defer fs.mutex.Unlock()
if fs.watchers[id] != nil {
return "", fmt.Errorf("failed to generate unique ID")
}
if watcher.expectsSymmetricEncryption() {
watcher.SymKeyHash = crypto.Keccak256Hash(watcher.KeySym)
}
watcher.id = id
fs.watchers[id] = watcher
fs.addTopicMatcher(watcher)
return id, err
}
// Uninstall will remove a filter whose id has been specified from
// the filter collection
func (fs *Filters) Uninstall(id string) bool {
fs.mutex.Lock()
defer fs.mutex.Unlock()
if fs.watchers[id] != nil {
fs.removeFromTopicMatchers(fs.watchers[id])
delete(fs.watchers, id)
return true
}
return false
}
// addTopicMatcher adds a filter to the topic matchers.
// If the filter's Topics array is empty, it will be tried on every topic.
// Otherwise, it will be tried on the topics specified.
func (fs *Filters) addTopicMatcher(watcher *Filter) {
if len(watcher.Topics) == 0 {
fs.allTopicsMatcher[watcher] = struct{}{}
} else {
for _, t := range watcher.Topics {
topic := BytesToTopic(t)
if fs.topicMatcher[topic] == nil {
fs.topicMatcher[topic] = make(map[*Filter]struct{})
}
fs.topicMatcher[topic][watcher] = struct{}{}
}
}
}
// removeFromTopicMatchers removes a filter from the topic matchers
func (fs *Filters) removeFromTopicMatchers(watcher *Filter) {
delete(fs.allTopicsMatcher, watcher)
for _, topic := range watcher.Topics {
delete(fs.topicMatcher[BytesToTopic(topic)], watcher)
}
}
// getWatchersByTopic returns a slice containing the filters that
// match a specific topic
func (fs *Filters) getWatchersByTopic(topic TopicType) []*Filter {
res := make([]*Filter, 0, len(fs.allTopicsMatcher))
for watcher := range fs.allTopicsMatcher {
res = append(res, watcher)
}
for watcher := range fs.topicMatcher[topic] {
res = append(res, watcher)
}
return res
}
// Get returns a filter from the collection with a specific ID
func (fs *Filters) Get(id string) *Filter {
fs.mutex.RLock()
defer fs.mutex.RUnlock()
return fs.watchers[id]
}
// NotifyWatchers notifies any filter that has declared interest
// for the envelope's topic.
func (fs *Filters) NotifyWatchers(env *Envelope, p2pMessage bool) {
var msg *ReceivedMessage
fs.mutex.RLock()
defer fs.mutex.RUnlock()
candidates := fs.getWatchersByTopic(env.Topic)
for _, watcher := range candidates {
if p2pMessage && !watcher.AllowP2P {
log.Trace(fmt.Sprintf("msg [%x], filter [%s]: p2p messages are not allowed", env.Hash(), watcher.id))
continue
}
var match bool
if msg != nil {
match = watcher.MatchMessage(msg)
} else {
match = watcher.MatchEnvelope(env)
if match {
msg = env.Open(watcher)
if msg == nil {
log.Trace("processing message: failed to open", "message", env.Hash().Hex(), "filter", watcher.id)
}
} else {
log.Trace("processing message: does not match", "message", env.Hash().Hex(), "filter", watcher.id)
}
}
if match && msg != nil {
log.Trace("processing message: decrypted", "hash", env.Hash().Hex())
if watcher.Src == nil || IsPubKeyEqual(msg.Src, watcher.Src) {
watcher.Trigger(msg)
}
}
}
}
func (f *Filter) expectsAsymmetricEncryption() bool {
return f.KeyAsym != nil
}
func (f *Filter) expectsSymmetricEncryption() bool {
return f.KeySym != nil
}
// Trigger adds a yet-unknown message to the filter's list of
// received messages.
func (f *Filter) Trigger(msg *ReceivedMessage) {
f.mutex.Lock()
defer f.mutex.Unlock()
if _, exist := f.Messages[msg.EnvelopeHash]; !exist {
f.Messages[msg.EnvelopeHash] = msg
}
}
// Retrieve will return the list of all received messages associated
// to a filter.
func (f *Filter) Retrieve() (all []*ReceivedMessage) {
f.mutex.Lock()
defer f.mutex.Unlock()
all = make([]*ReceivedMessage, 0, len(f.Messages))
for _, msg := range f.Messages {
all = append(all, msg)
}
f.Messages = make(map[common.Hash]*ReceivedMessage) // delete old messages
return all
}
// MatchMessage checks if the filter matches an already decrypted
// message (i.e. a Message that has already been handled by
// MatchEnvelope when checked by a previous filter).
// Topics are not checked here, since this is done by topic matchers.
func (f *Filter) MatchMessage(msg *ReceivedMessage) bool {
if f.PoW > 0 && msg.PoW < f.PoW {
return false
}
if f.expectsAsymmetricEncryption() && msg.isAsymmetricEncryption() {
return IsPubKeyEqual(&f.KeyAsym.PublicKey, msg.Dst)
} else if f.expectsSymmetricEncryption() && msg.isSymmetricEncryption() {
return f.SymKeyHash == msg.SymKeyHash
}
return false
}
// MatchEnvelope checks if it's worth decrypting the message. If
// it returns `true`, client code is expected to attempt decrypting
// the message and subsequently call MatchMessage.
// Topics are not checked here, since this is done by topic matchers.
func (f *Filter) MatchEnvelope(envelope *Envelope) bool {
return f.PoW <= 0 || envelope.pow >= f.PoW
}
// IsPubKeyEqual checks that two public keys are equal
func IsPubKeyEqual(a, b *ecdsa.PublicKey) bool {
if !ValidatePublicKey(a) {
return false
} else if !ValidatePublicKey(b) {
return false
}
// the curve is always the same, just compare the points
return a.X.Cmp(b.X) == 0 && a.Y.Cmp(b.Y) == 0
}

View File

@ -122,8 +122,12 @@ type SyncMailRequest struct {
// Validate checks request's fields if they are valid. // Validate checks request's fields if they are valid.
func (r SyncMailRequest) Validate() error { func (r SyncMailRequest) Validate() error {
if r.Limit == 0 {
return errors.New("invalid 'Limit' value, expected value greater than 0")
}
if r.Limit > MaxLimitInSyncMailRequest { if r.Limit > MaxLimitInSyncMailRequest {
return fmt.Errorf("invalid 'Limit' value, expected lower than %d", MaxLimitInSyncMailRequest) return fmt.Errorf("invalid 'Limit' value, expected value lower than %d", MaxLimitInSyncMailRequest)
} }
if r.Lower > r.Upper { if r.Lower > r.Upper {

View File

@ -1058,7 +1058,15 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
} }
if err := whisper.mailServer.SyncMail(p, request); err != nil { if err := whisper.mailServer.SyncMail(p, request); err != nil {
log.Error("failed to sync envelopes", "peer", p.peer.ID().String()) log.Error(
"failed to sync envelopes",
"peer", p.peer.ID().String(),
)
_ = whisper.SendSyncResponse(
p,
SyncResponse{Error: err.Error()},
)
return err
} }
} else { } else {
log.Debug("requested to sync messages but mail servers is not registered", "peer", p.peer.ID().String()) log.Debug("requested to sync messages but mail servers is not registered", "peer", p.peer.ID().String())

4
vendor/modules.txt vendored
View File

@ -55,6 +55,7 @@ github.com/ethereum/go-ethereum/rpc
github.com/ethereum/go-ethereum/ethapi github.com/ethereum/go-ethereum/ethapi
github.com/ethereum/go-ethereum/eth/filters github.com/ethereum/go-ethereum/eth/filters
github.com/ethereum/go-ethereum/crypto/ecies github.com/ethereum/go-ethereum/crypto/ecies
github.com/ethereum/go-ethereum/consensus/ethash
github.com/ethereum/go-ethereum/common/math github.com/ethereum/go-ethereum/common/math
github.com/ethereum/go-ethereum/crypto/secp256k1 github.com/ethereum/go-ethereum/crypto/secp256k1
github.com/ethereum/go-ethereum/accounts/usbwallet github.com/ethereum/go-ethereum/accounts/usbwallet
@ -72,7 +73,6 @@ github.com/ethereum/go-ethereum/core/rawdb
github.com/ethereum/go-ethereum/core/state github.com/ethereum/go-ethereum/core/state
github.com/ethereum/go-ethereum/core/vm github.com/ethereum/go-ethereum/core/vm
github.com/ethereum/go-ethereum/consensus/clique github.com/ethereum/go-ethereum/consensus/clique
github.com/ethereum/go-ethereum/consensus/ethash
github.com/ethereum/go-ethereum/core/bloombits github.com/ethereum/go-ethereum/core/bloombits
github.com/ethereum/go-ethereum/eth/fetcher github.com/ethereum/go-ethereum/eth/fetcher
github.com/ethereum/go-ethereum/eth/gasprice github.com/ethereum/go-ethereum/eth/gasprice
@ -315,7 +315,7 @@ github.com/status-im/migrate/v4/database/sqlcipher
github.com/status-im/rendezvous github.com/status-im/rendezvous
github.com/status-im/rendezvous/protocol github.com/status-im/rendezvous/protocol
github.com/status-im/rendezvous/server github.com/status-im/rendezvous/server
# github.com/status-im/whisper v1.4.13 # github.com/status-im/whisper v1.4.14
github.com/status-im/whisper/whisperv6 github.com/status-im/whisper/whisperv6
# github.com/stretchr/testify v1.3.0 # github.com/stretchr/testify v1.3.0
github.com/stretchr/testify/require github.com/stretchr/testify/require