Update selected mail servers that are used in envelope tracker

This commit is contained in:
Dmitry 2018-11-21 12:22:30 +02:00 committed by Dmitry Shulyak
parent d61e838235
commit e60dbe3c1b
4 changed files with 108 additions and 16 deletions

View File

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/log"
gethnode "github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p/enode"
fcmlib "github.com/NaySoftware/go-fcm"
@ -571,3 +572,21 @@ func (b *StatusBackend) DisableInstallation(installationID string) error {
return nil
}
// UpdateMailservers on ShhExtService.
func (b *StatusBackend) UpdateMailservers(enodes []string) error {
st, err := b.statusNode.ShhExtService()
if err != nil {
return err
}
nodes := make([]*enode.Node, len(enodes))
for i, rawurl := range enodes {
node, err := enode.ParseV4(rawurl)
if err != nil {
return err
}
nodes[i] = node
}
st.UpdateMailservers(nodes)
return nil
}

View File

@ -462,6 +462,18 @@ func NotifyUsers(message, payloadJSON, tokensArray *C.char) (outCBytes *C.char)
return
}
// UpdateMailservers updates mail servers in status backend.
//export UpdateMailservers
func UpdateMailservers(data *C.char) *C.char {
var enodes []string
err := json.Unmarshal([]byte(C.GoString(data)), &enodes)
if err != nil {
return makeJSONResponse(err)
}
err = statusBackend.UpdateMailservers(enodes)
return makeJSONResponse(err)
}
// AddPeer adds an enode as a peer.
//export AddPeer
func AddPeer(enode *C.char) *C.char {

View File

@ -13,6 +13,7 @@ import (
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/services/shhext/chat"
"github.com/status-im/status-go/services/shhext/dedup"
@ -56,10 +57,11 @@ type Service struct {
}
type ServiceConfig struct {
DataDir string
InstallationID string
Debug bool
PFSEnabled bool
DataDir string
InstallationID string
Debug bool
PFSEnabled bool
MailServerConfirmations bool
}
// Make sure that Service implements node.Service interface.
@ -68,10 +70,12 @@ var _ node.Service = (*Service)(nil)
// New returns a new Service. dataDir is a folder path to a network-independent location
func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, config *ServiceConfig) *Service {
track := &tracker{
w: w,
handler: handler,
cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
w: w,
handler: handler,
cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
mailservers: map[enode.ID]struct{}{},
mailServerConfirmation: config.MailServerConfirmations,
}
return &Service{
w: w,
@ -84,6 +88,11 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, conf
}
}
// UpdateMailservers updates information about selected mail servers.
func (s *Service) UpdateMailservers(nodes []*enode.Node) {
s.tracker.UpdateMailservers(nodes)
}
// Protocols returns a new protocols list. In this case, there are none.
func (s *Service) Protocols() []p2p.Protocol {
return []p2p.Protocol{}
@ -182,13 +191,17 @@ func (s *Service) Stop() error {
// tracker responsible for processing events for envelopes that we are interested in
// and calling specified handler.
type tracker struct {
w *whisper.Whisper
handler EnvelopeEventsHandler
w *whisper.Whisper
handler EnvelopeEventsHandler
mailServerConfirmation bool
mu sync.Mutex
cache map[common.Hash]EnvelopeState
batches map[common.Hash]map[common.Hash]struct{}
mailMu sync.Mutex
mailservers map[enode.ID]struct{}
wg sync.WaitGroup
quit chan struct{}
}
@ -216,6 +229,15 @@ func (t *tracker) Add(hash common.Hash) {
t.cache[hash] = EnvelopePosted
}
func (t *tracker) UpdateMailservers(nodes []*enode.Node) {
t.mailMu.Lock()
defer t.mailMu.Unlock()
t.mailservers = map[enode.ID]struct{}{}
for _, n := range nodes {
t.mailservers[n.ID()] = struct{}{}
}
}
// Add request hash to a tracker.
func (t *tracker) AddRequest(hash common.Hash, timerC <-chan time.Time) {
t.mu.Lock()
@ -268,6 +290,12 @@ func (t *tracker) handleEvent(event whisper.EnvelopeEvent) {
}
func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) {
if t.mailServerConfirmation {
if !t.isMailserver(event.Peer) {
return
}
}
t.mu.Lock()
defer t.mu.Unlock()
@ -292,7 +320,28 @@ func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) {
}
}
func (t *tracker) isMailserver(peer enode.ID) bool {
t.mailMu.Lock()
defer t.mailMu.Unlock()
if len(t.mailservers) == 0 {
log.Error("mail servers are empty")
return false
}
_, exist := t.mailservers[peer]
if !exist {
log.Debug("confirmation received not from a mail server is skipped", "peer", peer)
return false
}
return true
}
func (t *tracker) handleAcknowledgedBatch(event whisper.EnvelopeEvent) {
if t.mailServerConfirmation {
if !t.isMailserver(event.Peer) {
return
}
}
t.mu.Lock()
defer t.mu.Unlock()

View File

@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/node"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/t/helpers"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/suite"
@ -88,10 +89,11 @@ func (s *ShhExtSuite) SetupTest() {
return s.whisper[i], nil
}))
config := &ServiceConfig{
InstallationID: "1",
DataDir: os.TempDir(),
Debug: true,
PFSEnabled: false,
InstallationID: "1",
DataDir: os.TempDir(),
Debug: true,
PFSEnabled: false,
MailServerConfirmations: true,
}
s.services[i] = New(s.whisper[i], nil, nil, config)
s.NoError(stack.Register(func(n *node.ServiceContext) (node.Service, error) {
@ -106,6 +108,7 @@ func (s *ShhExtSuite) SetupTest() {
func (s *ShhExtSuite) TestPostMessageWithConfirmation() {
mock := newHandlerMock(1)
s.services[0].tracker.handler = mock
s.services[0].UpdateMailservers([]*enode.Node{s.nodes[1].Server().Self()})
s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self())
symID, err := s.whisper[0].GenerateSymKey()
s.NoError(err)
@ -411,18 +414,23 @@ type TrackerSuite struct {
func (s *TrackerSuite) SetupTest() {
s.tracker = &tracker{
cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
mailservers: map[enode.ID]struct{}{},
mailServerConfirmation: true,
}
}
func (s *TrackerSuite) TestConfirmed() {
testPeer := enode.ID{1}
s.tracker.mailservers[testPeer] = struct{}{}
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
Peer: testPeer,
})
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopeSent, s.tracker.cache[testHash])
@ -430,18 +438,22 @@ func (s *TrackerSuite) TestConfirmed() {
func (s *TrackerSuite) TestConfirmedWithAcknowledge() {
testBatch := common.Hash{1}
testPeer := enode.ID{1}
s.tracker.Add(testHash)
s.tracker.mailservers[testPeer] = struct{}{}
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
Batch: testBatch,
Peer: testPeer,
})
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventBatchAcknowledged,
Batch: testBatch,
Peer: testPeer,
})
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopeSent, s.tracker.cache[testHash])