From e60dbe3c1ba0e4a241ada3eed462682dadc2cea8 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Wed, 21 Nov 2018 12:22:30 +0200 Subject: [PATCH] Update selected mail servers that are used in envelope tracker --- api/backend.go | 19 +++++++++ lib/library.go | 12 ++++++ services/shhext/service.go | 69 ++++++++++++++++++++++++++++----- services/shhext/service_test.go | 24 +++++++++--- 4 files changed, 108 insertions(+), 16 deletions(-) diff --git a/api/backend.go b/api/backend.go index 8a9240688..edcaf12e6 100644 --- a/api/backend.go +++ b/api/backend.go @@ -11,6 +11,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" gethnode "github.com/ethereum/go-ethereum/node" + "github.com/ethereum/go-ethereum/p2p/enode" fcmlib "github.com/NaySoftware/go-fcm" @@ -571,3 +572,21 @@ func (b *StatusBackend) DisableInstallation(installationID string) error { return nil } + +// UpdateMailservers on ShhExtService. +func (b *StatusBackend) UpdateMailservers(enodes []string) error { + st, err := b.statusNode.ShhExtService() + if err != nil { + return err + } + nodes := make([]*enode.Node, len(enodes)) + for i, rawurl := range enodes { + node, err := enode.ParseV4(rawurl) + if err != nil { + return err + } + nodes[i] = node + } + st.UpdateMailservers(nodes) + return nil +} diff --git a/lib/library.go b/lib/library.go index e8ab8d988..4a6dbb171 100644 --- a/lib/library.go +++ b/lib/library.go @@ -462,6 +462,18 @@ func NotifyUsers(message, payloadJSON, tokensArray *C.char) (outCBytes *C.char) return } +// UpdateMailservers updates mail servers in status backend. +//export UpdateMailservers +func UpdateMailservers(data *C.char) *C.char { + var enodes []string + err := json.Unmarshal([]byte(C.GoString(data)), &enodes) + if err != nil { + return makeJSONResponse(err) + } + err = statusBackend.UpdateMailservers(enodes) + return makeJSONResponse(err) +} + // AddPeer adds an enode as a peer. //export AddPeer func AddPeer(enode *C.char) *C.char { diff --git a/services/shhext/service.go b/services/shhext/service.go index 4bdb444b1..91412924d 100644 --- a/services/shhext/service.go +++ b/services/shhext/service.go @@ -13,6 +13,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/rpc" "github.com/status-im/status-go/services/shhext/chat" "github.com/status-im/status-go/services/shhext/dedup" @@ -56,10 +57,11 @@ type Service struct { } type ServiceConfig struct { - DataDir string - InstallationID string - Debug bool - PFSEnabled bool + DataDir string + InstallationID string + Debug bool + PFSEnabled bool + MailServerConfirmations bool } // Make sure that Service implements node.Service interface. @@ -68,10 +70,12 @@ var _ node.Service = (*Service)(nil) // New returns a new Service. dataDir is a folder path to a network-independent location func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, config *ServiceConfig) *Service { track := &tracker{ - w: w, - handler: handler, - cache: map[common.Hash]EnvelopeState{}, - batches: map[common.Hash]map[common.Hash]struct{}{}, + w: w, + handler: handler, + cache: map[common.Hash]EnvelopeState{}, + batches: map[common.Hash]map[common.Hash]struct{}{}, + mailservers: map[enode.ID]struct{}{}, + mailServerConfirmation: config.MailServerConfirmations, } return &Service{ w: w, @@ -84,6 +88,11 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, conf } } +// UpdateMailservers updates information about selected mail servers. +func (s *Service) UpdateMailservers(nodes []*enode.Node) { + s.tracker.UpdateMailservers(nodes) +} + // Protocols returns a new protocols list. In this case, there are none. func (s *Service) Protocols() []p2p.Protocol { return []p2p.Protocol{} @@ -182,13 +191,17 @@ func (s *Service) Stop() error { // tracker responsible for processing events for envelopes that we are interested in // and calling specified handler. type tracker struct { - w *whisper.Whisper - handler EnvelopeEventsHandler + w *whisper.Whisper + handler EnvelopeEventsHandler + mailServerConfirmation bool mu sync.Mutex cache map[common.Hash]EnvelopeState batches map[common.Hash]map[common.Hash]struct{} + mailMu sync.Mutex + mailservers map[enode.ID]struct{} + wg sync.WaitGroup quit chan struct{} } @@ -216,6 +229,15 @@ func (t *tracker) Add(hash common.Hash) { t.cache[hash] = EnvelopePosted } +func (t *tracker) UpdateMailservers(nodes []*enode.Node) { + t.mailMu.Lock() + defer t.mailMu.Unlock() + t.mailservers = map[enode.ID]struct{}{} + for _, n := range nodes { + t.mailservers[n.ID()] = struct{}{} + } +} + // Add request hash to a tracker. func (t *tracker) AddRequest(hash common.Hash, timerC <-chan time.Time) { t.mu.Lock() @@ -268,6 +290,12 @@ func (t *tracker) handleEvent(event whisper.EnvelopeEvent) { } func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) { + if t.mailServerConfirmation { + if !t.isMailserver(event.Peer) { + return + } + } + t.mu.Lock() defer t.mu.Unlock() @@ -292,7 +320,28 @@ func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) { } } +func (t *tracker) isMailserver(peer enode.ID) bool { + t.mailMu.Lock() + defer t.mailMu.Unlock() + if len(t.mailservers) == 0 { + log.Error("mail servers are empty") + return false + } + _, exist := t.mailservers[peer] + if !exist { + log.Debug("confirmation received not from a mail server is skipped", "peer", peer) + return false + } + return true +} + func (t *tracker) handleAcknowledgedBatch(event whisper.EnvelopeEvent) { + if t.mailServerConfirmation { + if !t.isMailserver(event.Peer) { + return + } + } + t.mu.Lock() defer t.mu.Unlock() diff --git a/services/shhext/service_test.go b/services/shhext/service_test.go index b106752c7..254e5c40b 100644 --- a/services/shhext/service_test.go +++ b/services/shhext/service_test.go @@ -12,6 +12,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/node" "github.com/ethereum/go-ethereum/p2p" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/status-im/status-go/t/helpers" whisper "github.com/status-im/whisper/whisperv6" "github.com/stretchr/testify/suite" @@ -88,10 +89,11 @@ func (s *ShhExtSuite) SetupTest() { return s.whisper[i], nil })) config := &ServiceConfig{ - InstallationID: "1", - DataDir: os.TempDir(), - Debug: true, - PFSEnabled: false, + InstallationID: "1", + DataDir: os.TempDir(), + Debug: true, + PFSEnabled: false, + MailServerConfirmations: true, } s.services[i] = New(s.whisper[i], nil, nil, config) s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) { @@ -106,6 +108,7 @@ func (s *ShhExtSuite) SetupTest() { func (s *ShhExtSuite) TestPostMessageWithConfirmation() { mock := newHandlerMock(1) s.services[0].tracker.handler = mock + s.services[0].UpdateMailservers([]*enode.Node{s.nodes[1].Server().Self()}) s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self()) symID, err := s.whisper[0].GenerateSymKey() s.NoError(err) @@ -411,18 +414,23 @@ type TrackerSuite struct { func (s *TrackerSuite) SetupTest() { s.tracker = &tracker{ - cache: map[common.Hash]EnvelopeState{}, - batches: map[common.Hash]map[common.Hash]struct{}{}, + cache: map[common.Hash]EnvelopeState{}, + batches: map[common.Hash]map[common.Hash]struct{}{}, + mailservers: map[enode.ID]struct{}{}, + mailServerConfirmation: true, } } func (s *TrackerSuite) TestConfirmed() { + testPeer := enode.ID{1} + s.tracker.mailservers[testPeer] = struct{}{} s.tracker.Add(testHash) s.Contains(s.tracker.cache, testHash) s.Equal(EnvelopePosted, s.tracker.cache[testHash]) s.tracker.handleEvent(whisper.EnvelopeEvent{ Event: whisper.EventEnvelopeSent, Hash: testHash, + Peer: testPeer, }) s.Contains(s.tracker.cache, testHash) s.Equal(EnvelopeSent, s.tracker.cache[testHash]) @@ -430,18 +438,22 @@ func (s *TrackerSuite) TestConfirmed() { func (s *TrackerSuite) TestConfirmedWithAcknowledge() { testBatch := common.Hash{1} + testPeer := enode.ID{1} s.tracker.Add(testHash) + s.tracker.mailservers[testPeer] = struct{}{} s.Contains(s.tracker.cache, testHash) s.Equal(EnvelopePosted, s.tracker.cache[testHash]) s.tracker.handleEvent(whisper.EnvelopeEvent{ Event: whisper.EventEnvelopeSent, Hash: testHash, Batch: testBatch, + Peer: testPeer, }) s.Equal(EnvelopePosted, s.tracker.cache[testHash]) s.tracker.handleEvent(whisper.EnvelopeEvent{ Event: whisper.EventBatchAcknowledged, Batch: testBatch, + Peer: testPeer, }) s.Contains(s.tracker.cache, testHash) s.Equal(EnvelopeSent, s.tracker.cache[testHash])