From aa28f652e37c23b4b2c9350e1cca3a5b58905417 Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 1 Mar 2019 15:36:21 +0200 Subject: [PATCH] Re-send user payload if previously sent envelope wasn't acknowledged (#1386) * Split shhext.tracker into envelopes and mail monitors * Send envelopes on every new attempt to deliver a message * Re-send user payloads if previous envelopes weren't acknowledged * Remove debug api across the codebase --- params/config.go | 6 +- services/shhext/api.go | 13 +- services/shhext/debug.go | 74 --------- services/shhext/envelopes.go | 126 ++++++++++++--- services/shhext/envelopes_test.go | 40 ++--- services/shhext/service.go | 21 +-- services/shhext/service_test.go | 244 ++++++++++++++++-------------- t/e2e/services/base_api_test.go | 1 - t/e2e/services/debug_api_test.go | 154 ------------------- 9 files changed, 264 insertions(+), 415 deletions(-) delete mode 100644 services/shhext/debug.go delete mode 100644 t/e2e/services/debug_api_test.go diff --git a/params/config.go b/params/config.go index 112ad033e..61d0c04cc 100644 --- a/params/config.go +++ b/params/config.go @@ -307,8 +307,7 @@ type ShhextConfig struct { // BackupDisabledDataDir is the file system folder the node should use for any data storage needs that it doesn't want backed up. BackupDisabledDataDir string // InstallationId id of the current installation - InstallationID string - DebugAPIEnabled bool + InstallationID string // MailServerConfirmations should be true if client wants to receive confirmatons only from a selected mail servers. MailServerConfirmations bool // EnableConnectionManager turns on management of the mail server connections if true. @@ -322,6 +321,9 @@ type ShhextConfig struct { // MaxServerFailures defines maximum allowed expired requests before server will be swapped to another one. MaxServerFailures int + + // MaxMessageDeliveryAttempts defines how many times we will try to deliver not-acknowledged envelopes. + MaxMessageDeliveryAttempts int } // Validate validates the ShhextConfig struct and returns an error if inconsistent values are found diff --git a/services/shhext/api.go b/services/shhext/api.go index fc2643075..381995656 100644 --- a/services/shhext/api.go +++ b/services/shhext/api.go @@ -164,14 +164,15 @@ func NewPublicAPI(s *Service) *PublicAPI { } // Post shamelessly copied from whisper codebase with slight modifications. -func (api *PublicAPI) Post(ctx context.Context, req whisper.NewMessage) (hash hexutil.Bytes, err error) { - hash, err = api.publicAPI.Post(ctx, req) +func (api *PublicAPI) Post(ctx context.Context, req whisper.NewMessage) (hexutil.Bytes, error) { + hexID, err := api.publicAPI.Post(ctx, req) if err == nil { - var envHash common.Hash - copy(envHash[:], hash[:]) // slice can't be used as key - api.service.envelopesMonitor.Add(envHash) + api.service.envelopesMonitor.Add(common.BytesToHash(hexID), req) + } else { + return nil, err } - return hash, err + mID := messageID(req) + return mID[:], err } func (api *PublicAPI) getPeer(rawurl string) (*enode.Node, error) { diff --git a/services/shhext/debug.go b/services/shhext/debug.go deleted file mode 100644 index 111ec8ad9..000000000 --- a/services/shhext/debug.go +++ /dev/null @@ -1,74 +0,0 @@ -package shhext - -import ( - "context" - "errors" - "fmt" - "time" - - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/status-im/status-go/services" - whisper "github.com/status-im/whisper/whisperv6" -) - -var ( - postSyncTimeout = 60 * time.Second - errEnvelopeExpired = errors.New("envelope expired before being sent") - errNoShhextAttachedAPI = errors.New("No shhext attached") -) - -// DebugAPI represents a set of APIs from the `web3.debug` namespace. -type DebugAPI struct { - s *Service -} - -// NewDebugAPI creates an instance of the debug API. -func NewDebugAPI(s *Service) *DebugAPI { - return &DebugAPI{s: s} -} - -// PostSync sends an envelope through shhext_post and waits until it's sent. -func (api *DebugAPI) PostSync(ctx context.Context, req whisper.NewMessage) (hash hexutil.Bytes, err error) { - shhAPI := services.APIByNamespace(api.s.APIs(), "shhext") - if shhAPI == nil { - err = errNoShhextAttachedAPI - return - } - s, ok := shhAPI.(*PublicAPI) - if !ok { - err = errNoShhextAttachedAPI - return - } - hash, err = s.Post(ctx, req) - if err != nil { - return - } - ctxTimeout, cancel := context.WithTimeout(ctx, postSyncTimeout) - defer cancel() - err = api.waitForHash(ctxTimeout, hash) - return -} - -// waitForHash waits for a specific hash to be sent -func (api *DebugAPI) waitForHash(ctx context.Context, hash hexutil.Bytes) error { - h := common.BytesToHash(hash) - events := make(chan whisper.EnvelopeEvent, 100) - sub := api.s.w.SubscribeEnvelopeEvents(events) - defer sub.Unsubscribe() - for { - select { - case ev := <-events: - if ev.Hash == h { - if ev.Event == whisper.EventEnvelopeSent { - return nil - } - if ev.Event == whisper.EventEnvelopeExpired { - return errEnvelopeExpired - } - } - case <-ctx.Done(): - return fmt.Errorf("wait for hash canceled: %v", ctx.Err()) - } - } -} diff --git a/services/shhext/envelopes.go b/services/shhext/envelopes.go index 872dbc140..83e918023 100644 --- a/services/shhext/envelopes.go +++ b/services/shhext/envelopes.go @@ -1,6 +1,8 @@ package shhext import ( + "context" + "hash/fnv" "sync" "github.com/ethereum/go-ethereum/common" @@ -10,15 +12,51 @@ import ( whisper "github.com/status-im/whisper/whisperv6" ) +func messageID(message whisper.NewMessage) common.Hash { + hash := fnv.New32() + _, _ = hash.Write(message.Payload) + _, _ = hash.Write(message.Topic[:]) + return common.BytesToHash(hash.Sum(nil)) +} + +// NewEnvelopesMonitor returns a pointer to an instance of the EnvelopesMonitor. +func NewEnvelopesMonitor(w *whisper.Whisper, handler EnvelopeEventsHandler, mailServerConfirmation bool, mailPeers *mailservers.PeerStore, maxAttempts int) *EnvelopesMonitor { + return &EnvelopesMonitor{ + w: w, + whisperAPI: whisper.NewPublicWhisperAPI(w), + handler: handler, + mailServerConfirmation: mailServerConfirmation, + mailPeers: mailPeers, + maxAttempts: maxAttempts, + + // key is envelope hash (event.Hash) + envelopes: map[common.Hash]EnvelopeState{}, + messages: map[common.Hash]whisper.NewMessage{}, + attempts: map[common.Hash]int{}, + + // key is messageID + messageToEnvelope: map[common.Hash]common.Hash{}, + + // key is hash of the batch (event.Batch) + batches: map[common.Hash]map[common.Hash]struct{}{}, + } +} + // EnvelopesMonitor is responsible for monitoring whisper envelopes state. type EnvelopesMonitor struct { w *whisper.Whisper + whisperAPI *whisper.PublicWhisperAPI handler EnvelopeEventsHandler mailServerConfirmation bool + maxAttempts int - mu sync.Mutex - cache map[common.Hash]EnvelopeState - batches map[common.Hash]map[common.Hash]struct{} + mu sync.Mutex + envelopes map[common.Hash]EnvelopeState + batches map[common.Hash]map[common.Hash]struct{} + + messageToEnvelope map[common.Hash]common.Hash + messages map[common.Hash]whisper.NewMessage + attempts map[common.Hash]int mailPeers *mailservers.PeerStore @@ -43,16 +81,33 @@ func (m *EnvelopesMonitor) Stop() { } // Add hash to a tracker. -func (m *EnvelopesMonitor) Add(hash common.Hash) { +func (m *EnvelopesMonitor) Add(envelopeHash common.Hash, message whisper.NewMessage) { m.mu.Lock() defer m.mu.Unlock() - m.cache[hash] = EnvelopePosted + m.envelopes[envelopeHash] = EnvelopePosted + m.messages[envelopeHash] = message + m.attempts[envelopeHash] = 1 + m.messageToEnvelope[messageID(message)] = envelopeHash } func (m *EnvelopesMonitor) GetState(hash common.Hash) EnvelopeState { m.mu.Lock() defer m.mu.Unlock() - state, exist := m.cache[hash] + state, exist := m.envelopes[hash] + if !exist { + return NotRegistered + } + return state +} + +func (m *EnvelopesMonitor) GetMessageState(mID common.Hash) EnvelopeState { + m.mu.Lock() + defer m.mu.Unlock() + envelope, exist := m.messageToEnvelope[mID] + if !exist { + return NotRegistered + } + state, exist := m.envelopes[envelope] if !exist { return NotRegistered } @@ -83,7 +138,6 @@ func (m *EnvelopesMonitor) handleEvent(event whisper.EnvelopeEvent) { whisper.EventBatchAcknowledged: m.handleAcknowledgedBatch, whisper.EventEnvelopeReceived: m.handleEventEnvelopeReceived, } - if handler, ok := handlers[event.Event]; ok { handler(event) } @@ -99,7 +153,7 @@ func (m *EnvelopesMonitor) handleEventEnvelopeSent(event whisper.EnvelopeEvent) m.mu.Lock() defer m.mu.Unlock() - state, ok := m.cache[event.Hash] + state, ok := m.envelopes[event.Hash] // if we didn't send a message using extension - skip it // if message was already confirmed - skip it if !ok || state == EnvelopeSent { @@ -113,9 +167,9 @@ func (m *EnvelopesMonitor) handleEventEnvelopeSent(event whisper.EnvelopeEvent) m.batches[event.Batch][event.Hash] = struct{}{} log.Debug("waiting for a confirmation", "batch", event.Batch) } else { - m.cache[event.Hash] = EnvelopeSent + m.envelopes[event.Hash] = EnvelopeSent if m.handler != nil { - m.handler.EnvelopeSent(event.Hash) + m.handler.EnvelopeSent(messageID(m.messages[event.Hash])) } } } @@ -140,13 +194,13 @@ func (m *EnvelopesMonitor) handleAcknowledgedBatch(event whisper.EnvelopeEvent) } log.Debug("received a confirmation", "batch", event.Batch, "peer", event.Peer) for hash := range envelopes { - state, ok := m.cache[hash] + state, ok := m.envelopes[hash] if !ok || state == EnvelopeSent { continue } - m.cache[hash] = EnvelopeSent + m.envelopes[hash] = EnvelopeSent if m.handler != nil { - m.handler.EnvelopeSent(hash) + m.handler.EnvelopeSent(messageID(m.messages[hash])) } } delete(m.batches, event.Batch) @@ -155,15 +209,33 @@ func (m *EnvelopesMonitor) handleAcknowledgedBatch(event whisper.EnvelopeEvent) func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) { m.mu.Lock() defer m.mu.Unlock() - - if state, ok := m.cache[event.Hash]; ok { - delete(m.cache, event.Hash) + if state, ok := m.envelopes[event.Hash]; ok { + message, exist := m.messages[event.Hash] + if !exist { + log.Error("message was deleted erroneously", "envelope hash", event.Hash) + } + mID := messageID(message) + attempt := m.attempts[event.Hash] + m.clearMessageState(event.Hash) if state == EnvelopeSent { return } - log.Debug("envelope expired", "hash", event.Hash, "state", state) - if m.handler != nil { - m.handler.EnvelopeExpired(event.Hash) + if attempt < m.maxAttempts { + log.Debug("retrying to send a message", "message id", mID, "attempt", attempt+1) + hex, err := m.whisperAPI.Post(context.TODO(), message) + if err != nil { + log.Error("failed to retry sending message", "message id", mID, "attempt", attempt+1) + } + envelopeID := common.BytesToHash(hex) + m.messageToEnvelope[mID] = envelopeID + m.envelopes[envelopeID] = EnvelopePosted + m.messages[envelopeID] = message + m.attempts[envelopeID] = attempt + 1 + } else { + log.Debug("envelope expired", "hash", event.Hash, "state", state) + if m.handler != nil { + m.handler.EnvelopeExpired(mID) + } } } } @@ -176,13 +248,23 @@ func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event whisper.EnvelopeEve } m.mu.Lock() defer m.mu.Unlock() - state, ok := m.cache[event.Hash] + state, ok := m.envelopes[event.Hash] if !ok || state != EnvelopePosted { return } log.Debug("expected envelope received", "hash", event.Hash, "peer", event.Peer) - delete(m.cache, event.Hash) + m.envelopes[event.Hash] = EnvelopeSent if m.handler != nil { - m.handler.EnvelopeSent(event.Hash) + m.handler.EnvelopeSent(messageID(m.messages[event.Hash])) } } + +// clearMessageState removes all message and envelope state. +// not thread-safe, should be protected on a higher level. +func (m *EnvelopesMonitor) clearMessageState(envelopeID common.Hash) { + delete(m.envelopes, envelopeID) + mID := messageID(m.messages[envelopeID]) + delete(m.messageToEnvelope, mID) + delete(m.messages, envelopeID) + delete(m.attempts, envelopeID) +} diff --git a/services/shhext/envelopes_test.go b/services/shhext/envelopes_test.go index c85371538..d3938b472 100644 --- a/services/shhext/envelopes_test.go +++ b/services/shhext/envelopes_test.go @@ -21,22 +21,22 @@ func (s *EnvelopesMonitorSuite) SetupTest() { db, err := leveldb.Open(storage.NewMemStorage(), nil) s.Require().NoError(err) s.monitor = &EnvelopesMonitor{ - cache: map[common.Hash]EnvelopeState{}, + envelopes: map[common.Hash]EnvelopeState{}, batches: map[common.Hash]map[common.Hash]struct{}{}, mailPeers: mailservers.NewPeerStore(mailservers.NewCache(db)), } } func (s *EnvelopesMonitorSuite) TestConfirmed() { - s.monitor.Add(testHash) - s.Contains(s.monitor.cache, testHash) - s.Equal(EnvelopePosted, s.monitor.cache[testHash]) + s.monitor.Add(testHash, whisper.NewMessage{}) + s.Contains(s.monitor.envelopes, testHash) + s.Equal(EnvelopePosted, s.monitor.envelopes[testHash]) s.monitor.handleEvent(whisper.EnvelopeEvent{ Event: whisper.EventEnvelopeSent, Hash: testHash, }) - s.Contains(s.monitor.cache, testHash) - s.Equal(EnvelopeSent, s.monitor.cache[testHash]) + s.Contains(s.monitor.envelopes, testHash) + s.Equal(EnvelopeSent, s.monitor.envelopes[testHash]) } func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() { @@ -45,22 +45,22 @@ func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() { s.Require().NoError(err) node := enode.NewV4(&pkey.PublicKey, nil, 0, 0) s.Require().NoError(s.monitor.mailPeers.Update([]*enode.Node{node})) - s.monitor.Add(testHash) - s.Contains(s.monitor.cache, testHash) - s.Equal(EnvelopePosted, s.monitor.cache[testHash]) + s.monitor.Add(testHash, whisper.NewMessage{}) + s.Contains(s.monitor.envelopes, testHash) + s.Equal(EnvelopePosted, s.monitor.envelopes[testHash]) s.monitor.handleEvent(whisper.EnvelopeEvent{ Event: whisper.EventEnvelopeSent, Hash: testHash, Batch: testBatch, }) - s.Equal(EnvelopePosted, s.monitor.cache[testHash]) + s.Equal(EnvelopePosted, s.monitor.envelopes[testHash]) s.monitor.handleEvent(whisper.EnvelopeEvent{ Event: whisper.EventBatchAcknowledged, Batch: testBatch, Peer: node.ID(), }) - s.Contains(s.monitor.cache, testHash) - s.Equal(EnvelopeSent, s.monitor.cache[testHash]) + s.Contains(s.monitor.envelopes, testHash) + s.Equal(EnvelopeSent, s.monitor.envelopes[testHash]) } func (s *EnvelopesMonitorSuite) TestIgnored() { @@ -68,23 +68,23 @@ func (s *EnvelopesMonitorSuite) TestIgnored() { Event: whisper.EventEnvelopeSent, Hash: testHash, }) - s.NotContains(s.monitor.cache, testHash) + s.NotContains(s.monitor.envelopes, testHash) } func (s *EnvelopesMonitorSuite) TestRemoved() { - s.monitor.Add(testHash) - s.Contains(s.monitor.cache, testHash) + s.monitor.Add(testHash, whisper.NewMessage{}) + s.Contains(s.monitor.envelopes, testHash) s.monitor.handleEvent(whisper.EnvelopeEvent{ Event: whisper.EventEnvelopeExpired, Hash: testHash, }) - s.NotContains(s.monitor.cache, testHash) + s.NotContains(s.monitor.envelopes, testHash) } func (s *EnvelopesMonitorSuite) TestIgnoreNotFromMailserver() { // enables filter in the tracker to drop confirmations from non-mailserver peers s.monitor.mailServerConfirmation = true - s.monitor.Add(testHash) + s.monitor.Add(testHash, whisper.NewMessage{}) s.monitor.handleEvent(whisper.EnvelopeEvent{ Event: whisper.EventEnvelopeSent, Hash: testHash, @@ -94,10 +94,10 @@ func (s *EnvelopesMonitorSuite) TestIgnoreNotFromMailserver() { } func (s *EnvelopesMonitorSuite) TestReceived() { - s.monitor.Add(testHash) - s.Contains(s.monitor.cache, testHash) + s.monitor.Add(testHash, whisper.NewMessage{}) + s.Contains(s.monitor.envelopes, testHash) s.monitor.handleEvent(whisper.EnvelopeEvent{ Event: whisper.EventEnvelopeReceived, Hash: testHash}) - s.NotContains(s.monitor.cache, testHash) + s.NotContains(s.monitor.envelopes, testHash) } diff --git a/services/shhext/service.go b/services/shhext/service.go index c6c3b78f1..ccd80877f 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -50,7 +50,6 @@ type Service struct { nodeID *ecdsa.PrivateKey deduplicator *dedup.Deduplicator protocol *chat.ProtocolService - debug bool dataDir string installationID string pfsEnabled bool @@ -79,14 +78,7 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, conf cache: map[common.Hash]EnvelopeState{}, requestsRegistry: requestsRegistry, } - envelopesMonitor := &EnvelopesMonitor{ - w: w, - handler: handler, - cache: map[common.Hash]EnvelopeState{}, - batches: map[common.Hash]map[common.Hash]struct{}{}, - mailPeers: ps, - mailServerConfirmation: config.MailServerConfirmations, - } + envelopesMonitor := NewEnvelopesMonitor(w, handler, config.MailServerConfirmations, ps, config.MaxMessageDeliveryAttempts) return &Service{ w: w, config: config, @@ -94,7 +86,6 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, conf mailMonitor: mailMonitor, requestsRegistry: requestsRegistry, deduplicator: dedup.NewDeduplicator(w, db), - debug: config.DebugAPIEnabled, dataDir: config.BackupDisabledDataDir, installationID: config.InstallationID, pfsEnabled: config.PFSEnabled, @@ -250,16 +241,6 @@ func (s *Service) APIs() []rpc.API { Public: true, }, } - - if s.debug { - apis = append(apis, rpc.API{ - Namespace: "debug", - Version: "1.0", - Service: NewDebugAPI(s), - Public: true, - }) - } - return apis } diff --git a/services/shhext/service_test.go b/services/shhext/service_test.go index 3ead7260c..6b16de3d3 100644 --- a/services/shhext/service_test.go +++ b/services/shhext/service_test.go @@ -2,12 +2,12 @@ package shhext import ( "context" - "errors" "fmt" "io/ioutil" "math" "net" "os" + "sync/atomic" "testing" "time" @@ -30,6 +30,7 @@ const ( // internal whisper protocol codes statusCode = 0 messagesCode = 1 + batchAcknowledgeCode = 11 p2pRequestCompleteCode = 125 ) @@ -112,7 +113,6 @@ func (s *ShhExtSuite) SetupTest() { config := params.ShhextConfig{ InstallationID: "1", BackupDisabledDataDir: directory, - DebugAPIEnabled: true, PFSEnabled: true, MailServerConfirmations: true, ConnectionTarget: 10, @@ -149,17 +149,19 @@ func (s *ShhExtSuite) TestPostMessageWithConfirmation() { client, err := s.nodes[0].Attach() s.NoError(err) var hash common.Hash - s.NoError(client.Call(&hash, "shhext_post", whisper.NewMessage{ + message := whisper.NewMessage{ SymKeyID: symID, PowTarget: whisper.DefaultMinimumPoW, PowTime: 200, Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01}, Payload: []byte("hello"), - })) + } + mid := messageID(message) + s.NoError(client.Call(&hash, "shhext_post", message)) s.NoError(err) select { case confirmed := <-mock.confirmations: - s.Equal(hash, confirmed) + s.Equal(mid, confirmed) case <-time.After(5 * time.Second): s.Fail("timed out while waiting for confirmation") } @@ -173,18 +175,20 @@ func (s *ShhExtSuite) TestWaitMessageExpired() { client, err := s.nodes[0].Attach() s.NoError(err) var hash common.Hash - s.NoError(client.Call(&hash, "shhext_post", whisper.NewMessage{ + message := whisper.NewMessage{ SymKeyID: symID, PowTarget: whisper.DefaultMinimumPoW, PowTime: 200, TTL: 1, Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01}, Payload: []byte("hello"), - })) + } + mid := messageID(message) + s.NoError(client.Call(&hash, "shhext_post", message)) s.NoError(err) select { case expired := <-mock.expirations: - s.Equal(hash, expired) + s.Equal(mid, expired) case confirmed := <-mock.confirmations: s.Fail("unexpected confirmation for hash", confirmed) case <-time.After(10 * time.Second): @@ -374,98 +378,6 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() { s.Require().NoError(waitForHashInMonitor(api.service.mailMonitor, common.BytesToHash(hash), MailServerRequestSent, time.Second)) } -func (s *ShhExtSuite) TestDebugPostSync() { - mock := newHandlerMock(1) - s.services[0].envelopesMonitor.handler = mock - symID, err := s.whisper[0].GenerateSymKey() - s.NoError(err) - s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self()) - client, err := s.nodes[0].Attach() - s.NoError(err) - var hash common.Hash - - var testCases = []struct { - name string - msg whisper.NewMessage - postSyncTimeout time.Duration - expectedErr error - }{ - { - name: "timeout", - msg: whisper.NewMessage{ - SymKeyID: symID, - PowTarget: whisper.DefaultMinimumPoW, - PowTime: 200, - Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01}, - Payload: []byte("hello"), - }, - postSyncTimeout: postSyncTimeout, - expectedErr: nil, - }, - { - name: "invalid message", - msg: whisper.NewMessage{ - PowTarget: whisper.DefaultMinimumPoW, - PowTime: 200, - Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01}, - Payload: []byte("hello"), - }, - postSyncTimeout: postSyncTimeout, - expectedErr: whisper.ErrSymAsym, - }, - { - name: "context deadline exceeded", - msg: whisper.NewMessage{ - SymKeyID: symID, - PowTarget: whisper.DefaultMinimumPoW, - PowTime: 10, - Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01}, - TTL: 100, - Payload: []byte("hello"), - }, - postSyncTimeout: 1 * time.Millisecond, - expectedErr: errors.New("context deadline exceeded"), - }, - } - - for _, tc := range testCases { - s.T().Run(tc.name, func(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), tc.postSyncTimeout) - defer cancel() - err := client.CallContext(ctx, &hash, "debug_postSync", tc.msg) - - if tc.expectedErr != nil { - s.Equal(tc.expectedErr.Error(), err.Error()) - } else { - s.NoError(err) - } - }) - } -} - -func (s *ShhExtSuite) TestEnvelopeExpiredOnDebugPostSync() { - mock := newHandlerMock(1) - s.services[0].envelopesMonitor.handler = mock - symID, err := s.whisper[0].GenerateSymKey() - s.NoError(err) - client, err := s.nodes[0].Attach() - s.NoError(err) - var hash common.Hash - - ctx, cancel := context.WithTimeout(context.Background(), postSyncTimeout) - defer cancel() - err = client.CallContext(ctx, &hash, "debug_postSync", whisper.NewMessage{ - SymKeyID: symID, - PowTarget: whisper.DefaultMinimumPoW, - PowTime: 200, - Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01}, - Payload: []byte("hello"), - TTL: 1, - }) - - s.Equal(errEnvelopeExpired.Error(), err.Error()) -} - func (s *ShhExtSuite) TearDown() { for _, n := range s.nodes { s.NoError(n.Stop()) @@ -495,8 +407,8 @@ type WhisperNodeMockSuite struct { localNode *enode.Node remoteRW *p2p.MsgPipeRW - localService *Service - localEnvelopeMonitor *EnvelopesMonitor + localService *Service + localEnvelopesMonitor *EnvelopesMonitor } func (s *WhisperNodeMockSuite) SetupTest() { @@ -507,6 +419,7 @@ func (s *WhisperNodeMockSuite) SetupTest() { MaxMessageSize: 100 << 10, } w := whisper.New(conf) + s.Require().NoError(w.Start(nil)) pkey, err := crypto.GenerateKey() s.Require().NoError(err) node := enode.NewV4(&pkey.PublicKey, net.ParseIP("127.0.0.1"), 1, 1) @@ -520,10 +433,11 @@ func (s *WhisperNodeMockSuite) SetupTest() { s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, []interface{}{whisper.ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, true})) s.Require().NoError(p2p.SendItems(rw1, statusCode, whisper.ProtocolVersion, whisper.ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, true)) - s.localService = New(w, nil, db, params.ShhextConfig{MailServerConfirmations: true}) + s.localService = New(w, nil, db, params.ShhextConfig{MailServerConfirmations: true, MaxMessageDeliveryAttempts: 3}) s.Require().NoError(s.localService.UpdateMailservers([]*enode.Node{node})) - s.localEnvelopeMonitor = s.localService.envelopesMonitor - s.localEnvelopeMonitor.Start() + + s.localEnvelopesMonitor = s.localService.envelopesMonitor + s.localEnvelopesMonitor.Start() s.localWhisperAPI = whisper.NewPublicWhisperAPI(w) s.localAPI = NewPublicAPI(s.localService) @@ -531,6 +445,19 @@ func (s *WhisperNodeMockSuite) SetupTest() { s.remoteRW = rw1 } +func (s *WhisperNodeMockSuite) PostMessage(message whisper.NewMessage) common.Hash { + envBytes, err := s.localAPI.Post(context.TODO(), message) + envHash := common.BytesToHash(envBytes) + s.Require().NoError(err) + s.Require().NoError(utils.Eventually(func() error { + if state := s.localEnvelopesMonitor.GetMessageState(envHash); state != EnvelopePosted { + return fmt.Errorf("envelope with hash %s wasn't posted", envHash.String()) + } + return nil + }, 2*time.Second, 100*time.Millisecond)) + return envHash +} + func TestRequestMessagesSync(t *testing.T) { suite.Run(t, new(RequestMessagesSyncSuite)) } @@ -597,19 +524,11 @@ type WhisperConfirmationSuite struct { func (s *WhisperConfirmationSuite) TestEnvelopeReceived() { symID, err := s.localWhisperAPI.GenerateSymKeyFromPassword(context.TODO(), "test") s.Require().NoError(err) - envBytes, err := s.localAPI.Post(context.TODO(), whisper.NewMessage{ + envHash := s.PostMessage(whisper.NewMessage{ SymKeyID: symID, TTL: 1000, Topic: whisper.TopicType{0x01}, }) - envHash := common.BytesToHash(envBytes) - s.Require().NoError(err) - s.Require().NoError(utils.Eventually(func() error { - if state := s.localEnvelopeMonitor.GetState(envHash); state != EnvelopePosted { - return fmt.Errorf("envelope with hash %s wasn't posted", envHash.String()) - } - return nil - }, 2*time.Second, 100*time.Millisecond)) // enable auto-replies once message got registered internally go func() { @@ -629,9 +548,102 @@ func (s *WhisperConfirmationSuite) TestEnvelopeReceived() { // wait for message to be removed because it was delivered by remoteRW s.Require().NoError(utils.Eventually(func() error { - if state := s.localEnvelopeMonitor.GetState(envHash); state != NotRegistered { - return fmt.Errorf("envelope with hash %s wasn't removed from tracker", envHash.String()) + if state := s.localEnvelopesMonitor.GetMessageState(envHash); state == EnvelopePosted { + return fmt.Errorf("envelope with hash %s wasn't posted", envHash.String()) } return nil }, 2*time.Second, 100*time.Millisecond)) } + +func TestWhisperRetriesSuite(t *testing.T) { + suite.Run(t, new(WhisperRetriesSuite)) +} + +type WhisperRetriesSuite struct { + WhisperNodeMockSuite +} + +func (s *WhisperRetriesSuite) TestUseAllAvaiableAttempts() { + var attempts int32 + go func() { + for { + msg, err := s.remoteRW.ReadMsg() + s.Require().NoError(err) + s.Require().NoError(msg.Discard()) + if msg.Code != messagesCode { + continue + } + atomic.AddInt32(&attempts, 1) + } + }() + symID, err := s.localWhisperAPI.GenerateSymKeyFromPassword(context.TODO(), "test") + s.Require().NoError(err) + message := whisper.NewMessage{ + SymKeyID: symID, + PowTarget: whisper.DefaultMinimumPoW, + PowTime: 200, + TTL: 1, + Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01}, + Payload: []byte("hello"), + } + s.Require().NotNil(s.PostMessage(message)) + s.Require().NoError(utils.Eventually(func() error { + madeAttempts := atomic.LoadInt32(&attempts) + if madeAttempts != int32(s.localEnvelopesMonitor.maxAttempts) { + return fmt.Errorf("made unexpected number of attempts to deliver a message: %d != %d", s.localEnvelopesMonitor.maxAttempts, madeAttempts) + } + return nil + }, 10*time.Second, 500*time.Millisecond)) +} + +func (s *WhisperRetriesSuite) testDelivery(target int) { + go func() { + attempt := 0 + for { + msg, err := s.remoteRW.ReadMsg() + s.Require().NoError(err) + if msg.Code != messagesCode { + s.Require().NoError(msg.Discard()) + continue + } + attempt++ + if attempt != target { + s.Require().NoError(msg.Discard()) + continue + } + data, err := ioutil.ReadAll(msg.Payload) + s.Require().NoError(err) + // without this hack event from the whisper read loop will be sent sooner than event from write loop + // i don't think that this is realistic situation and can be reproduced only in test with in-memory + // connection mock + time.Sleep(time.Nanosecond) + s.Require().NoError(p2p.Send(s.remoteRW, batchAcknowledgeCode, crypto.Keccak256Hash(data))) + } + }() + symID, err := s.localWhisperAPI.GenerateSymKeyFromPassword(context.TODO(), "test") + s.Require().NoError(err) + message := whisper.NewMessage{ + SymKeyID: symID, + PowTarget: whisper.DefaultMinimumPoW, + PowTime: 200, + TTL: 1, + Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01}, + Payload: []byte("hello"), + } + mID := messageID(message) + s.Require().NotNil(s.PostMessage(message)) + s.Require().NoError(utils.Eventually(func() error { + if state := s.localEnvelopesMonitor.GetMessageState(mID); state != EnvelopeSent { + return fmt.Errorf("message with ID %s wasn't sent", mID.String()) + } + return nil + }, 3*time.Second, 100*time.Millisecond)) +} + +func (s *WhisperRetriesSuite) TestDeliveredFromFirstAttempt() { + s.testDelivery(1) +} + +func (s *WhisperRetriesSuite) TestDeliveredFromSecondAttempt() { + s.testDelivery(2) +} diff --git a/t/e2e/services/base_api_test.go b/t/e2e/services/base_api_test.go index 89b11d6fc..bbe115734 100644 --- a/t/e2e/services/base_api_test.go +++ b/t/e2e/services/base_api_test.go @@ -76,7 +76,6 @@ func (s *BaseJSONRPCSuite) SetupTest(upstreamEnabled, statusServiceEnabled, debu nodeConfig.IPCEnabled = false nodeConfig.EnableStatusService = statusServiceEnabled - nodeConfig.ShhextConfig.DebugAPIEnabled = debugAPIEnabled if debugAPIEnabled { nodeConfig.AddAPIModule("debug") } diff --git a/t/e2e/services/debug_api_test.go b/t/e2e/services/debug_api_test.go deleted file mode 100644 index d7b637713..000000000 --- a/t/e2e/services/debug_api_test.go +++ /dev/null @@ -1,154 +0,0 @@ -package services - -import ( - "encoding/json" - "fmt" - "io/ioutil" - "os" - "testing" - "time" - - "github.com/ethereum/go-ethereum/common/hexutil" - "github.com/ethereum/go-ethereum/p2p" - "github.com/status-im/status-go/node" - "github.com/status-im/status-go/params" - whisper "github.com/status-im/whisper/whisperv6" - "github.com/stretchr/testify/require" - "github.com/stretchr/testify/suite" - - "github.com/status-im/status-go/t/helpers" - . "github.com/status-im/status-go/t/utils" -) - -func TestDebugAPISuite(t *testing.T) { - s := new(DebugAPISuite) - s.upstream = false - suite.Run(t, s) -} - -func TestDebugAPISuiteUpstream(t *testing.T) { - s := new(DebugAPISuite) - s.upstream = true - suite.Run(t, s) -} - -type DebugAPISuite struct { - BaseJSONRPCSuite - upstream bool -} - -func (s *DebugAPISuite) TestAccessibleDebugAPIsUnexported() { - if s.upstream && GetNetworkID() == params.StatusChainNetworkID { - s.T().Skip() - return - } - - err := s.SetupTest(s.upstream, false, false) - s.NoError(err) - // Debug APIs should be unavailable - s.AssertAPIMethodUnexported("debug_postSync") - err = s.Backend.StopNode() - s.NoError(err) - - err = s.SetupTest(s.upstream, false, true) - s.NoError(err) - defer func() { - err := s.Backend.StopNode() - s.NoError(err) - }() - // Debug APIs should be available - s.AssertAPIMethodExported("debug_postSync") -} - -func (s *DebugAPISuite) TestDebugPostSyncSuccess() { - // Test upstream if that's not StatusChain - if s.upstream && GetNetworkID() == params.StatusChainNetworkID { - s.T().Skip() - return - } - - err := s.SetupTest(s.upstream, false, true) - s.NoError(err) - defer func() { - err := s.Backend.StopNode() - s.NoError(err) - }() - - dir, err := ioutil.TempDir("", "test-debug") - s.NoError(err) - defer os.RemoveAll(dir) //nolint: errcheck - s.addPeerToCurrentNode(dir) - - symID := s.generateSymKey() - result := s.sendPostConfirmMessage(symID) - - var r struct { - Error struct { - Message string `json:"message"` - } `json:"error"` - Result hexutil.Bytes `json:"result"` - } - s.NoError(json.Unmarshal([]byte(result), &r)) - s.Empty(r.Error.Message) - s.NotEmpty(r.Result) -} - -// generateSymKey generates and stores a symetric key. -func (s *DebugAPISuite) generateSymKey() string { - w, err := s.Backend.StatusNode().WhisperService() - s.Require().NoError(err) - symID, err := w.GenerateSymKey() - s.Require().NoError(err) - - return symID -} - -// sendPostConfirmMessage calls debug_postSync endpoint with valid -// parameters. -func (s *DebugAPISuite) sendPostConfirmMessage(symID string) string { - req := whisper.NewMessage{ - SymKeyID: symID, - PowTarget: whisper.DefaultMinimumPoW, - PowTime: 200, - Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01}, - Payload: []byte("hello"), - } - body, err := json.Marshal(req) - s.NoError(err) - - basicCall := fmt.Sprintf( - `{"jsonrpc":"2.0","method":"debug_postSync","params":[%s],"id":67}`, - body) - - resp, err := s.Backend.CallPrivateRPC(basicCall) - s.NoError(err) - return resp -} - -// addPeers adds a peer to the running node -func (s *DebugAPISuite) addPeerToCurrentNode(dir string) { - s.Require().NotNil(s.Backend) - node1 := s.Backend.StatusNode().GethNode() - s.NotNil(node1) - node2 := s.newPeer("test2", dir).GethNode() - s.NotNil(node2) - - errCh := helpers.WaitForPeerAsync(s.Backend.StatusNode().Server(), - node2.Server().Self().String(), - p2p.PeerEventTypeAdd, - time.Second*5) - - node1.Server().AddPeer(node2.Server().Self()) - require.NoError(s.T(), <-errCh) -} - -// newNode creates, configures and starts a new peer. -func (s *DebugAPISuite) newPeer(name, dir string) *node.StatusNode { - // network id is irrelevant - cfg, err := MakeTestNodeConfigWithDataDir(name, dir, 777) - s.Require().NoError(err) - n := node.New() - s.Require().NoError(n.Start(cfg)) - - return n -}