Re-send user payload if previously sent envelope wasn't acknowledged (#1386)

* Split shhext.tracker into envelopes and mail monitors

* Send envelopes on every new attempt to deliver a message

* Re-send user payloads if previous envelopes weren't acknowledged

* Remove debug api across the codebase
This commit is contained in:
Dmitry Shulyak 2019-03-01 15:36:21 +02:00 committed by Igor Mandrigin
parent 38e5335e18
commit aa28f652e3
9 changed files with 264 additions and 415 deletions

View File

@ -307,8 +307,7 @@ type ShhextConfig struct {
// BackupDisabledDataDir is the file system folder the node should use for any data storage needs that it doesn't want backed up.
BackupDisabledDataDir string
// InstallationId id of the current installation
InstallationID string
DebugAPIEnabled bool
InstallationID string
// MailServerConfirmations should be true if client wants to receive confirmatons only from a selected mail servers.
MailServerConfirmations bool
// EnableConnectionManager turns on management of the mail server connections if true.
@ -322,6 +321,9 @@ type ShhextConfig struct {
// MaxServerFailures defines maximum allowed expired requests before server will be swapped to another one.
MaxServerFailures int
// MaxMessageDeliveryAttempts defines how many times we will try to deliver not-acknowledged envelopes.
MaxMessageDeliveryAttempts int
}
// Validate validates the ShhextConfig struct and returns an error if inconsistent values are found

View File

@ -164,14 +164,15 @@ func NewPublicAPI(s *Service) *PublicAPI {
}
// Post shamelessly copied from whisper codebase with slight modifications.
func (api *PublicAPI) Post(ctx context.Context, req whisper.NewMessage) (hash hexutil.Bytes, err error) {
hash, err = api.publicAPI.Post(ctx, req)
func (api *PublicAPI) Post(ctx context.Context, req whisper.NewMessage) (hexutil.Bytes, error) {
hexID, err := api.publicAPI.Post(ctx, req)
if err == nil {
var envHash common.Hash
copy(envHash[:], hash[:]) // slice can't be used as key
api.service.envelopesMonitor.Add(envHash)
api.service.envelopesMonitor.Add(common.BytesToHash(hexID), req)
} else {
return nil, err
}
return hash, err
mID := messageID(req)
return mID[:], err
}
func (api *PublicAPI) getPeer(rawurl string) (*enode.Node, error) {

View File

@ -1,74 +0,0 @@
package shhext
import (
"context"
"errors"
"fmt"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/status-im/status-go/services"
whisper "github.com/status-im/whisper/whisperv6"
)
var (
postSyncTimeout = 60 * time.Second
errEnvelopeExpired = errors.New("envelope expired before being sent")
errNoShhextAttachedAPI = errors.New("No shhext attached")
)
// DebugAPI represents a set of APIs from the `web3.debug` namespace.
type DebugAPI struct {
s *Service
}
// NewDebugAPI creates an instance of the debug API.
func NewDebugAPI(s *Service) *DebugAPI {
return &DebugAPI{s: s}
}
// PostSync sends an envelope through shhext_post and waits until it's sent.
func (api *DebugAPI) PostSync(ctx context.Context, req whisper.NewMessage) (hash hexutil.Bytes, err error) {
shhAPI := services.APIByNamespace(api.s.APIs(), "shhext")
if shhAPI == nil {
err = errNoShhextAttachedAPI
return
}
s, ok := shhAPI.(*PublicAPI)
if !ok {
err = errNoShhextAttachedAPI
return
}
hash, err = s.Post(ctx, req)
if err != nil {
return
}
ctxTimeout, cancel := context.WithTimeout(ctx, postSyncTimeout)
defer cancel()
err = api.waitForHash(ctxTimeout, hash)
return
}
// waitForHash waits for a specific hash to be sent
func (api *DebugAPI) waitForHash(ctx context.Context, hash hexutil.Bytes) error {
h := common.BytesToHash(hash)
events := make(chan whisper.EnvelopeEvent, 100)
sub := api.s.w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
for {
select {
case ev := <-events:
if ev.Hash == h {
if ev.Event == whisper.EventEnvelopeSent {
return nil
}
if ev.Event == whisper.EventEnvelopeExpired {
return errEnvelopeExpired
}
}
case <-ctx.Done():
return fmt.Errorf("wait for hash canceled: %v", ctx.Err())
}
}
}

View File

@ -1,6 +1,8 @@
package shhext
import (
"context"
"hash/fnv"
"sync"
"github.com/ethereum/go-ethereum/common"
@ -10,15 +12,51 @@ import (
whisper "github.com/status-im/whisper/whisperv6"
)
func messageID(message whisper.NewMessage) common.Hash {
hash := fnv.New32()
_, _ = hash.Write(message.Payload)
_, _ = hash.Write(message.Topic[:])
return common.BytesToHash(hash.Sum(nil))
}
// NewEnvelopesMonitor returns a pointer to an instance of the EnvelopesMonitor.
func NewEnvelopesMonitor(w *whisper.Whisper, handler EnvelopeEventsHandler, mailServerConfirmation bool, mailPeers *mailservers.PeerStore, maxAttempts int) *EnvelopesMonitor {
return &EnvelopesMonitor{
w: w,
whisperAPI: whisper.NewPublicWhisperAPI(w),
handler: handler,
mailServerConfirmation: mailServerConfirmation,
mailPeers: mailPeers,
maxAttempts: maxAttempts,
// key is envelope hash (event.Hash)
envelopes: map[common.Hash]EnvelopeState{},
messages: map[common.Hash]whisper.NewMessage{},
attempts: map[common.Hash]int{},
// key is messageID
messageToEnvelope: map[common.Hash]common.Hash{},
// key is hash of the batch (event.Batch)
batches: map[common.Hash]map[common.Hash]struct{}{},
}
}
// EnvelopesMonitor is responsible for monitoring whisper envelopes state.
type EnvelopesMonitor struct {
w *whisper.Whisper
whisperAPI *whisper.PublicWhisperAPI
handler EnvelopeEventsHandler
mailServerConfirmation bool
maxAttempts int
mu sync.Mutex
cache map[common.Hash]EnvelopeState
batches map[common.Hash]map[common.Hash]struct{}
mu sync.Mutex
envelopes map[common.Hash]EnvelopeState
batches map[common.Hash]map[common.Hash]struct{}
messageToEnvelope map[common.Hash]common.Hash
messages map[common.Hash]whisper.NewMessage
attempts map[common.Hash]int
mailPeers *mailservers.PeerStore
@ -43,16 +81,33 @@ func (m *EnvelopesMonitor) Stop() {
}
// Add hash to a tracker.
func (m *EnvelopesMonitor) Add(hash common.Hash) {
func (m *EnvelopesMonitor) Add(envelopeHash common.Hash, message whisper.NewMessage) {
m.mu.Lock()
defer m.mu.Unlock()
m.cache[hash] = EnvelopePosted
m.envelopes[envelopeHash] = EnvelopePosted
m.messages[envelopeHash] = message
m.attempts[envelopeHash] = 1
m.messageToEnvelope[messageID(message)] = envelopeHash
}
func (m *EnvelopesMonitor) GetState(hash common.Hash) EnvelopeState {
m.mu.Lock()
defer m.mu.Unlock()
state, exist := m.cache[hash]
state, exist := m.envelopes[hash]
if !exist {
return NotRegistered
}
return state
}
func (m *EnvelopesMonitor) GetMessageState(mID common.Hash) EnvelopeState {
m.mu.Lock()
defer m.mu.Unlock()
envelope, exist := m.messageToEnvelope[mID]
if !exist {
return NotRegistered
}
state, exist := m.envelopes[envelope]
if !exist {
return NotRegistered
}
@ -83,7 +138,6 @@ func (m *EnvelopesMonitor) handleEvent(event whisper.EnvelopeEvent) {
whisper.EventBatchAcknowledged: m.handleAcknowledgedBatch,
whisper.EventEnvelopeReceived: m.handleEventEnvelopeReceived,
}
if handler, ok := handlers[event.Event]; ok {
handler(event)
}
@ -99,7 +153,7 @@ func (m *EnvelopesMonitor) handleEventEnvelopeSent(event whisper.EnvelopeEvent)
m.mu.Lock()
defer m.mu.Unlock()
state, ok := m.cache[event.Hash]
state, ok := m.envelopes[event.Hash]
// if we didn't send a message using extension - skip it
// if message was already confirmed - skip it
if !ok || state == EnvelopeSent {
@ -113,9 +167,9 @@ func (m *EnvelopesMonitor) handleEventEnvelopeSent(event whisper.EnvelopeEvent)
m.batches[event.Batch][event.Hash] = struct{}{}
log.Debug("waiting for a confirmation", "batch", event.Batch)
} else {
m.cache[event.Hash] = EnvelopeSent
m.envelopes[event.Hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(event.Hash)
m.handler.EnvelopeSent(messageID(m.messages[event.Hash]))
}
}
}
@ -140,13 +194,13 @@ func (m *EnvelopesMonitor) handleAcknowledgedBatch(event whisper.EnvelopeEvent)
}
log.Debug("received a confirmation", "batch", event.Batch, "peer", event.Peer)
for hash := range envelopes {
state, ok := m.cache[hash]
state, ok := m.envelopes[hash]
if !ok || state == EnvelopeSent {
continue
}
m.cache[hash] = EnvelopeSent
m.envelopes[hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(hash)
m.handler.EnvelopeSent(messageID(m.messages[hash]))
}
}
delete(m.batches, event.Batch)
@ -155,15 +209,33 @@ func (m *EnvelopesMonitor) handleAcknowledgedBatch(event whisper.EnvelopeEvent)
func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) {
m.mu.Lock()
defer m.mu.Unlock()
if state, ok := m.cache[event.Hash]; ok {
delete(m.cache, event.Hash)
if state, ok := m.envelopes[event.Hash]; ok {
message, exist := m.messages[event.Hash]
if !exist {
log.Error("message was deleted erroneously", "envelope hash", event.Hash)
}
mID := messageID(message)
attempt := m.attempts[event.Hash]
m.clearMessageState(event.Hash)
if state == EnvelopeSent {
return
}
log.Debug("envelope expired", "hash", event.Hash, "state", state)
if m.handler != nil {
m.handler.EnvelopeExpired(event.Hash)
if attempt < m.maxAttempts {
log.Debug("retrying to send a message", "message id", mID, "attempt", attempt+1)
hex, err := m.whisperAPI.Post(context.TODO(), message)
if err != nil {
log.Error("failed to retry sending message", "message id", mID, "attempt", attempt+1)
}
envelopeID := common.BytesToHash(hex)
m.messageToEnvelope[mID] = envelopeID
m.envelopes[envelopeID] = EnvelopePosted
m.messages[envelopeID] = message
m.attempts[envelopeID] = attempt + 1
} else {
log.Debug("envelope expired", "hash", event.Hash, "state", state)
if m.handler != nil {
m.handler.EnvelopeExpired(mID)
}
}
}
}
@ -176,13 +248,23 @@ func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event whisper.EnvelopeEve
}
m.mu.Lock()
defer m.mu.Unlock()
state, ok := m.cache[event.Hash]
state, ok := m.envelopes[event.Hash]
if !ok || state != EnvelopePosted {
return
}
log.Debug("expected envelope received", "hash", event.Hash, "peer", event.Peer)
delete(m.cache, event.Hash)
m.envelopes[event.Hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(event.Hash)
m.handler.EnvelopeSent(messageID(m.messages[event.Hash]))
}
}
// clearMessageState removes all message and envelope state.
// not thread-safe, should be protected on a higher level.
func (m *EnvelopesMonitor) clearMessageState(envelopeID common.Hash) {
delete(m.envelopes, envelopeID)
mID := messageID(m.messages[envelopeID])
delete(m.messageToEnvelope, mID)
delete(m.messages, envelopeID)
delete(m.attempts, envelopeID)
}

View File

@ -21,22 +21,22 @@ func (s *EnvelopesMonitorSuite) SetupTest() {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
s.Require().NoError(err)
s.monitor = &EnvelopesMonitor{
cache: map[common.Hash]EnvelopeState{},
envelopes: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
mailPeers: mailservers.NewPeerStore(mailservers.NewCache(db)),
}
}
func (s *EnvelopesMonitorSuite) TestConfirmed() {
s.monitor.Add(testHash)
s.Contains(s.monitor.cache, testHash)
s.Equal(EnvelopePosted, s.monitor.cache[testHash])
s.monitor.Add(testHash, whisper.NewMessage{})
s.Contains(s.monitor.envelopes, testHash)
s.Equal(EnvelopePosted, s.monitor.envelopes[testHash])
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
})
s.Contains(s.monitor.cache, testHash)
s.Equal(EnvelopeSent, s.monitor.cache[testHash])
s.Contains(s.monitor.envelopes, testHash)
s.Equal(EnvelopeSent, s.monitor.envelopes[testHash])
}
func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() {
@ -45,22 +45,22 @@ func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() {
s.Require().NoError(err)
node := enode.NewV4(&pkey.PublicKey, nil, 0, 0)
s.Require().NoError(s.monitor.mailPeers.Update([]*enode.Node{node}))
s.monitor.Add(testHash)
s.Contains(s.monitor.cache, testHash)
s.Equal(EnvelopePosted, s.monitor.cache[testHash])
s.monitor.Add(testHash, whisper.NewMessage{})
s.Contains(s.monitor.envelopes, testHash)
s.Equal(EnvelopePosted, s.monitor.envelopes[testHash])
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
Batch: testBatch,
})
s.Equal(EnvelopePosted, s.monitor.cache[testHash])
s.Equal(EnvelopePosted, s.monitor.envelopes[testHash])
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventBatchAcknowledged,
Batch: testBatch,
Peer: node.ID(),
})
s.Contains(s.monitor.cache, testHash)
s.Equal(EnvelopeSent, s.monitor.cache[testHash])
s.Contains(s.monitor.envelopes, testHash)
s.Equal(EnvelopeSent, s.monitor.envelopes[testHash])
}
func (s *EnvelopesMonitorSuite) TestIgnored() {
@ -68,23 +68,23 @@ func (s *EnvelopesMonitorSuite) TestIgnored() {
Event: whisper.EventEnvelopeSent,
Hash: testHash,
})
s.NotContains(s.monitor.cache, testHash)
s.NotContains(s.monitor.envelopes, testHash)
}
func (s *EnvelopesMonitorSuite) TestRemoved() {
s.monitor.Add(testHash)
s.Contains(s.monitor.cache, testHash)
s.monitor.Add(testHash, whisper.NewMessage{})
s.Contains(s.monitor.envelopes, testHash)
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeExpired,
Hash: testHash,
})
s.NotContains(s.monitor.cache, testHash)
s.NotContains(s.monitor.envelopes, testHash)
}
func (s *EnvelopesMonitorSuite) TestIgnoreNotFromMailserver() {
// enables filter in the tracker to drop confirmations from non-mailserver peers
s.monitor.mailServerConfirmation = true
s.monitor.Add(testHash)
s.monitor.Add(testHash, whisper.NewMessage{})
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
@ -94,10 +94,10 @@ func (s *EnvelopesMonitorSuite) TestIgnoreNotFromMailserver() {
}
func (s *EnvelopesMonitorSuite) TestReceived() {
s.monitor.Add(testHash)
s.Contains(s.monitor.cache, testHash)
s.monitor.Add(testHash, whisper.NewMessage{})
s.Contains(s.monitor.envelopes, testHash)
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeReceived,
Hash: testHash})
s.NotContains(s.monitor.cache, testHash)
s.NotContains(s.monitor.envelopes, testHash)
}

View File

@ -50,7 +50,6 @@ type Service struct {
nodeID *ecdsa.PrivateKey
deduplicator *dedup.Deduplicator
protocol *chat.ProtocolService
debug bool
dataDir string
installationID string
pfsEnabled bool
@ -79,14 +78,7 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, conf
cache: map[common.Hash]EnvelopeState{},
requestsRegistry: requestsRegistry,
}
envelopesMonitor := &EnvelopesMonitor{
w: w,
handler: handler,
cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
mailPeers: ps,
mailServerConfirmation: config.MailServerConfirmations,
}
envelopesMonitor := NewEnvelopesMonitor(w, handler, config.MailServerConfirmations, ps, config.MaxMessageDeliveryAttempts)
return &Service{
w: w,
config: config,
@ -94,7 +86,6 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, conf
mailMonitor: mailMonitor,
requestsRegistry: requestsRegistry,
deduplicator: dedup.NewDeduplicator(w, db),
debug: config.DebugAPIEnabled,
dataDir: config.BackupDisabledDataDir,
installationID: config.InstallationID,
pfsEnabled: config.PFSEnabled,
@ -250,16 +241,6 @@ func (s *Service) APIs() []rpc.API {
Public: true,
},
}
if s.debug {
apis = append(apis, rpc.API{
Namespace: "debug",
Version: "1.0",
Service: NewDebugAPI(s),
Public: true,
})
}
return apis
}

View File

@ -2,12 +2,12 @@ package shhext
import (
"context"
"errors"
"fmt"
"io/ioutil"
"math"
"net"
"os"
"sync/atomic"
"testing"
"time"
@ -30,6 +30,7 @@ const (
// internal whisper protocol codes
statusCode = 0
messagesCode = 1
batchAcknowledgeCode = 11
p2pRequestCompleteCode = 125
)
@ -112,7 +113,6 @@ func (s *ShhExtSuite) SetupTest() {
config := params.ShhextConfig{
InstallationID: "1",
BackupDisabledDataDir: directory,
DebugAPIEnabled: true,
PFSEnabled: true,
MailServerConfirmations: true,
ConnectionTarget: 10,
@ -149,17 +149,19 @@ func (s *ShhExtSuite) TestPostMessageWithConfirmation() {
client, err := s.nodes[0].Attach()
s.NoError(err)
var hash common.Hash
s.NoError(client.Call(&hash, "shhext_post", whisper.NewMessage{
message := whisper.NewMessage{
SymKeyID: symID,
PowTarget: whisper.DefaultMinimumPoW,
PowTime: 200,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
}))
}
mid := messageID(message)
s.NoError(client.Call(&hash, "shhext_post", message))
s.NoError(err)
select {
case confirmed := <-mock.confirmations:
s.Equal(hash, confirmed)
s.Equal(mid, confirmed)
case <-time.After(5 * time.Second):
s.Fail("timed out while waiting for confirmation")
}
@ -173,18 +175,20 @@ func (s *ShhExtSuite) TestWaitMessageExpired() {
client, err := s.nodes[0].Attach()
s.NoError(err)
var hash common.Hash
s.NoError(client.Call(&hash, "shhext_post", whisper.NewMessage{
message := whisper.NewMessage{
SymKeyID: symID,
PowTarget: whisper.DefaultMinimumPoW,
PowTime: 200,
TTL: 1,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
}))
}
mid := messageID(message)
s.NoError(client.Call(&hash, "shhext_post", message))
s.NoError(err)
select {
case expired := <-mock.expirations:
s.Equal(hash, expired)
s.Equal(mid, expired)
case confirmed := <-mock.confirmations:
s.Fail("unexpected confirmation for hash", confirmed)
case <-time.After(10 * time.Second):
@ -374,98 +378,6 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() {
s.Require().NoError(waitForHashInMonitor(api.service.mailMonitor, common.BytesToHash(hash), MailServerRequestSent, time.Second))
}
func (s *ShhExtSuite) TestDebugPostSync() {
mock := newHandlerMock(1)
s.services[0].envelopesMonitor.handler = mock
symID, err := s.whisper[0].GenerateSymKey()
s.NoError(err)
s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self())
client, err := s.nodes[0].Attach()
s.NoError(err)
var hash common.Hash
var testCases = []struct {
name string
msg whisper.NewMessage
postSyncTimeout time.Duration
expectedErr error
}{
{
name: "timeout",
msg: whisper.NewMessage{
SymKeyID: symID,
PowTarget: whisper.DefaultMinimumPoW,
PowTime: 200,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
},
postSyncTimeout: postSyncTimeout,
expectedErr: nil,
},
{
name: "invalid message",
msg: whisper.NewMessage{
PowTarget: whisper.DefaultMinimumPoW,
PowTime: 200,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
},
postSyncTimeout: postSyncTimeout,
expectedErr: whisper.ErrSymAsym,
},
{
name: "context deadline exceeded",
msg: whisper.NewMessage{
SymKeyID: symID,
PowTarget: whisper.DefaultMinimumPoW,
PowTime: 10,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
TTL: 100,
Payload: []byte("hello"),
},
postSyncTimeout: 1 * time.Millisecond,
expectedErr: errors.New("context deadline exceeded"),
},
}
for _, tc := range testCases {
s.T().Run(tc.name, func(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), tc.postSyncTimeout)
defer cancel()
err := client.CallContext(ctx, &hash, "debug_postSync", tc.msg)
if tc.expectedErr != nil {
s.Equal(tc.expectedErr.Error(), err.Error())
} else {
s.NoError(err)
}
})
}
}
func (s *ShhExtSuite) TestEnvelopeExpiredOnDebugPostSync() {
mock := newHandlerMock(1)
s.services[0].envelopesMonitor.handler = mock
symID, err := s.whisper[0].GenerateSymKey()
s.NoError(err)
client, err := s.nodes[0].Attach()
s.NoError(err)
var hash common.Hash
ctx, cancel := context.WithTimeout(context.Background(), postSyncTimeout)
defer cancel()
err = client.CallContext(ctx, &hash, "debug_postSync", whisper.NewMessage{
SymKeyID: symID,
PowTarget: whisper.DefaultMinimumPoW,
PowTime: 200,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
TTL: 1,
})
s.Equal(errEnvelopeExpired.Error(), err.Error())
}
func (s *ShhExtSuite) TearDown() {
for _, n := range s.nodes {
s.NoError(n.Stop())
@ -495,8 +407,8 @@ type WhisperNodeMockSuite struct {
localNode *enode.Node
remoteRW *p2p.MsgPipeRW
localService *Service
localEnvelopeMonitor *EnvelopesMonitor
localService *Service
localEnvelopesMonitor *EnvelopesMonitor
}
func (s *WhisperNodeMockSuite) SetupTest() {
@ -507,6 +419,7 @@ func (s *WhisperNodeMockSuite) SetupTest() {
MaxMessageSize: 100 << 10,
}
w := whisper.New(conf)
s.Require().NoError(w.Start(nil))
pkey, err := crypto.GenerateKey()
s.Require().NoError(err)
node := enode.NewV4(&pkey.PublicKey, net.ParseIP("127.0.0.1"), 1, 1)
@ -520,10 +433,11 @@ func (s *WhisperNodeMockSuite) SetupTest() {
s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, []interface{}{whisper.ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, true}))
s.Require().NoError(p2p.SendItems(rw1, statusCode, whisper.ProtocolVersion, whisper.ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, true))
s.localService = New(w, nil, db, params.ShhextConfig{MailServerConfirmations: true})
s.localService = New(w, nil, db, params.ShhextConfig{MailServerConfirmations: true, MaxMessageDeliveryAttempts: 3})
s.Require().NoError(s.localService.UpdateMailservers([]*enode.Node{node}))
s.localEnvelopeMonitor = s.localService.envelopesMonitor
s.localEnvelopeMonitor.Start()
s.localEnvelopesMonitor = s.localService.envelopesMonitor
s.localEnvelopesMonitor.Start()
s.localWhisperAPI = whisper.NewPublicWhisperAPI(w)
s.localAPI = NewPublicAPI(s.localService)
@ -531,6 +445,19 @@ func (s *WhisperNodeMockSuite) SetupTest() {
s.remoteRW = rw1
}
func (s *WhisperNodeMockSuite) PostMessage(message whisper.NewMessage) common.Hash {
envBytes, err := s.localAPI.Post(context.TODO(), message)
envHash := common.BytesToHash(envBytes)
s.Require().NoError(err)
s.Require().NoError(utils.Eventually(func() error {
if state := s.localEnvelopesMonitor.GetMessageState(envHash); state != EnvelopePosted {
return fmt.Errorf("envelope with hash %s wasn't posted", envHash.String())
}
return nil
}, 2*time.Second, 100*time.Millisecond))
return envHash
}
func TestRequestMessagesSync(t *testing.T) {
suite.Run(t, new(RequestMessagesSyncSuite))
}
@ -597,19 +524,11 @@ type WhisperConfirmationSuite struct {
func (s *WhisperConfirmationSuite) TestEnvelopeReceived() {
symID, err := s.localWhisperAPI.GenerateSymKeyFromPassword(context.TODO(), "test")
s.Require().NoError(err)
envBytes, err := s.localAPI.Post(context.TODO(), whisper.NewMessage{
envHash := s.PostMessage(whisper.NewMessage{
SymKeyID: symID,
TTL: 1000,
Topic: whisper.TopicType{0x01},
})
envHash := common.BytesToHash(envBytes)
s.Require().NoError(err)
s.Require().NoError(utils.Eventually(func() error {
if state := s.localEnvelopeMonitor.GetState(envHash); state != EnvelopePosted {
return fmt.Errorf("envelope with hash %s wasn't posted", envHash.String())
}
return nil
}, 2*time.Second, 100*time.Millisecond))
// enable auto-replies once message got registered internally
go func() {
@ -629,9 +548,102 @@ func (s *WhisperConfirmationSuite) TestEnvelopeReceived() {
// wait for message to be removed because it was delivered by remoteRW
s.Require().NoError(utils.Eventually(func() error {
if state := s.localEnvelopeMonitor.GetState(envHash); state != NotRegistered {
return fmt.Errorf("envelope with hash %s wasn't removed from tracker", envHash.String())
if state := s.localEnvelopesMonitor.GetMessageState(envHash); state == EnvelopePosted {
return fmt.Errorf("envelope with hash %s wasn't posted", envHash.String())
}
return nil
}, 2*time.Second, 100*time.Millisecond))
}
func TestWhisperRetriesSuite(t *testing.T) {
suite.Run(t, new(WhisperRetriesSuite))
}
type WhisperRetriesSuite struct {
WhisperNodeMockSuite
}
func (s *WhisperRetriesSuite) TestUseAllAvaiableAttempts() {
var attempts int32
go func() {
for {
msg, err := s.remoteRW.ReadMsg()
s.Require().NoError(err)
s.Require().NoError(msg.Discard())
if msg.Code != messagesCode {
continue
}
atomic.AddInt32(&attempts, 1)
}
}()
symID, err := s.localWhisperAPI.GenerateSymKeyFromPassword(context.TODO(), "test")
s.Require().NoError(err)
message := whisper.NewMessage{
SymKeyID: symID,
PowTarget: whisper.DefaultMinimumPoW,
PowTime: 200,
TTL: 1,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
}
s.Require().NotNil(s.PostMessage(message))
s.Require().NoError(utils.Eventually(func() error {
madeAttempts := atomic.LoadInt32(&attempts)
if madeAttempts != int32(s.localEnvelopesMonitor.maxAttempts) {
return fmt.Errorf("made unexpected number of attempts to deliver a message: %d != %d", s.localEnvelopesMonitor.maxAttempts, madeAttempts)
}
return nil
}, 10*time.Second, 500*time.Millisecond))
}
func (s *WhisperRetriesSuite) testDelivery(target int) {
go func() {
attempt := 0
for {
msg, err := s.remoteRW.ReadMsg()
s.Require().NoError(err)
if msg.Code != messagesCode {
s.Require().NoError(msg.Discard())
continue
}
attempt++
if attempt != target {
s.Require().NoError(msg.Discard())
continue
}
data, err := ioutil.ReadAll(msg.Payload)
s.Require().NoError(err)
// without this hack event from the whisper read loop will be sent sooner than event from write loop
// i don't think that this is realistic situation and can be reproduced only in test with in-memory
// connection mock
time.Sleep(time.Nanosecond)
s.Require().NoError(p2p.Send(s.remoteRW, batchAcknowledgeCode, crypto.Keccak256Hash(data)))
}
}()
symID, err := s.localWhisperAPI.GenerateSymKeyFromPassword(context.TODO(), "test")
s.Require().NoError(err)
message := whisper.NewMessage{
SymKeyID: symID,
PowTarget: whisper.DefaultMinimumPoW,
PowTime: 200,
TTL: 1,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
}
mID := messageID(message)
s.Require().NotNil(s.PostMessage(message))
s.Require().NoError(utils.Eventually(func() error {
if state := s.localEnvelopesMonitor.GetMessageState(mID); state != EnvelopeSent {
return fmt.Errorf("message with ID %s wasn't sent", mID.String())
}
return nil
}, 3*time.Second, 100*time.Millisecond))
}
func (s *WhisperRetriesSuite) TestDeliveredFromFirstAttempt() {
s.testDelivery(1)
}
func (s *WhisperRetriesSuite) TestDeliveredFromSecondAttempt() {
s.testDelivery(2)
}

View File

@ -76,7 +76,6 @@ func (s *BaseJSONRPCSuite) SetupTest(upstreamEnabled, statusServiceEnabled, debu
nodeConfig.IPCEnabled = false
nodeConfig.EnableStatusService = statusServiceEnabled
nodeConfig.ShhextConfig.DebugAPIEnabled = debugAPIEnabled
if debugAPIEnabled {
nodeConfig.AddAPIModule("debug")
}

View File

@ -1,154 +0,0 @@
package services
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"testing"
"time"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/p2p"
"github.com/status-im/status-go/node"
"github.com/status-im/status-go/params"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/status-im/status-go/t/helpers"
. "github.com/status-im/status-go/t/utils"
)
func TestDebugAPISuite(t *testing.T) {
s := new(DebugAPISuite)
s.upstream = false
suite.Run(t, s)
}
func TestDebugAPISuiteUpstream(t *testing.T) {
s := new(DebugAPISuite)
s.upstream = true
suite.Run(t, s)
}
type DebugAPISuite struct {
BaseJSONRPCSuite
upstream bool
}
func (s *DebugAPISuite) TestAccessibleDebugAPIsUnexported() {
if s.upstream && GetNetworkID() == params.StatusChainNetworkID {
s.T().Skip()
return
}
err := s.SetupTest(s.upstream, false, false)
s.NoError(err)
// Debug APIs should be unavailable
s.AssertAPIMethodUnexported("debug_postSync")
err = s.Backend.StopNode()
s.NoError(err)
err = s.SetupTest(s.upstream, false, true)
s.NoError(err)
defer func() {
err := s.Backend.StopNode()
s.NoError(err)
}()
// Debug APIs should be available
s.AssertAPIMethodExported("debug_postSync")
}
func (s *DebugAPISuite) TestDebugPostSyncSuccess() {
// Test upstream if that's not StatusChain
if s.upstream && GetNetworkID() == params.StatusChainNetworkID {
s.T().Skip()
return
}
err := s.SetupTest(s.upstream, false, true)
s.NoError(err)
defer func() {
err := s.Backend.StopNode()
s.NoError(err)
}()
dir, err := ioutil.TempDir("", "test-debug")
s.NoError(err)
defer os.RemoveAll(dir) //nolint: errcheck
s.addPeerToCurrentNode(dir)
symID := s.generateSymKey()
result := s.sendPostConfirmMessage(symID)
var r struct {
Error struct {
Message string `json:"message"`
} `json:"error"`
Result hexutil.Bytes `json:"result"`
}
s.NoError(json.Unmarshal([]byte(result), &r))
s.Empty(r.Error.Message)
s.NotEmpty(r.Result)
}
// generateSymKey generates and stores a symetric key.
func (s *DebugAPISuite) generateSymKey() string {
w, err := s.Backend.StatusNode().WhisperService()
s.Require().NoError(err)
symID, err := w.GenerateSymKey()
s.Require().NoError(err)
return symID
}
// sendPostConfirmMessage calls debug_postSync endpoint with valid
// parameters.
func (s *DebugAPISuite) sendPostConfirmMessage(symID string) string {
req := whisper.NewMessage{
SymKeyID: symID,
PowTarget: whisper.DefaultMinimumPoW,
PowTime: 200,
Topic: whisper.TopicType{0x01, 0x01, 0x01, 0x01},
Payload: []byte("hello"),
}
body, err := json.Marshal(req)
s.NoError(err)
basicCall := fmt.Sprintf(
`{"jsonrpc":"2.0","method":"debug_postSync","params":[%s],"id":67}`,
body)
resp, err := s.Backend.CallPrivateRPC(basicCall)
s.NoError(err)
return resp
}
// addPeers adds a peer to the running node
func (s *DebugAPISuite) addPeerToCurrentNode(dir string) {
s.Require().NotNil(s.Backend)
node1 := s.Backend.StatusNode().GethNode()
s.NotNil(node1)
node2 := s.newPeer("test2", dir).GethNode()
s.NotNil(node2)
errCh := helpers.WaitForPeerAsync(s.Backend.StatusNode().Server(),
node2.Server().Self().String(),
p2p.PeerEventTypeAdd,
time.Second*5)
node1.Server().AddPeer(node2.Server().Self())
require.NoError(s.T(), <-errCh)
}
// newNode creates, configures and starts a new peer.
func (s *DebugAPISuite) newPeer(name, dir string) *node.StatusNode {
// network id is irrelevant
cfg, err := MakeTestNodeConfigWithDataDir(name, dir, 777)
s.Require().NoError(err)
n := node.New()
s.Require().NoError(n.Start(cfg))
return n
}