Split shhext.tracker into envelopes and mail monitors

This commit is contained in:
Dmitry 2019-02-20 08:57:57 +02:00 committed by Dmitry Shulyak
parent c869160ee7
commit f2a400dc44
9 changed files with 537 additions and 437 deletions

View File

@ -169,7 +169,7 @@ func (api *PublicAPI) Post(ctx context.Context, req whisper.NewMessage) (hash he
if err == nil {
var envHash common.Hash
copy(envHash[:], hash[:]) // slice can't be used as key
api.service.tracker.Add(envHash)
api.service.envelopesMonitor.Add(envHash)
}
return hash, err
}

View File

@ -0,0 +1,188 @@
package shhext
import (
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/services/shhext/mailservers"
whisper "github.com/status-im/whisper/whisperv6"
)
// EnvelopesMonitor is responsible for monitoring whisper envelopes state.
type EnvelopesMonitor struct {
w *whisper.Whisper
handler EnvelopeEventsHandler
mailServerConfirmation bool
mu sync.Mutex
cache map[common.Hash]EnvelopeState
batches map[common.Hash]map[common.Hash]struct{}
mailPeers *mailservers.PeerStore
wg sync.WaitGroup
quit chan struct{}
}
// Start processing events.
func (m *EnvelopesMonitor) Start() {
m.quit = make(chan struct{})
m.wg.Add(1)
go func() {
m.handleEnvelopeEvents()
m.wg.Done()
}()
}
// Stop process events.
func (m *EnvelopesMonitor) Stop() {
close(m.quit)
m.wg.Wait()
}
// Add hash to a tracker.
func (m *EnvelopesMonitor) Add(hash common.Hash) {
m.mu.Lock()
defer m.mu.Unlock()
m.cache[hash] = EnvelopePosted
}
func (m *EnvelopesMonitor) GetState(hash common.Hash) EnvelopeState {
m.mu.Lock()
defer m.mu.Unlock()
state, exist := m.cache[hash]
if !exist {
return NotRegistered
}
return state
}
// handleEnvelopeEvents processes whisper envelope events
func (m *EnvelopesMonitor) handleEnvelopeEvents() {
events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper
sub := m.w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
for {
select {
case <-m.quit:
return
case event := <-events:
m.handleEvent(event)
}
}
}
// handleEvent based on type of the event either triggers
// confirmation handler or removes hash from tracker
func (m *EnvelopesMonitor) handleEvent(event whisper.EnvelopeEvent) {
handlers := map[whisper.EventType]func(whisper.EnvelopeEvent){
whisper.EventEnvelopeSent: m.handleEventEnvelopeSent,
whisper.EventEnvelopeExpired: m.handleEventEnvelopeExpired,
whisper.EventBatchAcknowledged: m.handleAcknowledgedBatch,
whisper.EventEnvelopeReceived: m.handleEventEnvelopeReceived,
}
if handler, ok := handlers[event.Event]; ok {
handler(event)
}
}
func (m *EnvelopesMonitor) handleEventEnvelopeSent(event whisper.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
}
m.mu.Lock()
defer m.mu.Unlock()
state, ok := m.cache[event.Hash]
// if we didn't send a message using extension - skip it
// if message was already confirmed - skip it
if !ok || state == EnvelopeSent {
return
}
log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer)
if event.Batch != (common.Hash{}) {
if _, ok := m.batches[event.Batch]; !ok {
m.batches[event.Batch] = map[common.Hash]struct{}{}
}
m.batches[event.Batch][event.Hash] = struct{}{}
log.Debug("waiting for a confirmation", "batch", event.Batch)
} else {
m.cache[event.Hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(event.Hash)
}
}
}
func (m *EnvelopesMonitor) isMailserver(peer enode.ID) bool {
return m.mailPeers.Exist(peer)
}
func (m *EnvelopesMonitor) handleAcknowledgedBatch(event whisper.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
}
m.mu.Lock()
defer m.mu.Unlock()
envelopes, ok := m.batches[event.Batch]
if !ok {
log.Debug("batch is not found", "batch", event.Batch)
}
log.Debug("received a confirmation", "batch", event.Batch, "peer", event.Peer)
for hash := range envelopes {
state, ok := m.cache[hash]
if !ok || state == EnvelopeSent {
continue
}
m.cache[hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(hash)
}
}
delete(m.batches, event.Batch)
}
func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) {
m.mu.Lock()
defer m.mu.Unlock()
if state, ok := m.cache[event.Hash]; ok {
delete(m.cache, event.Hash)
if state == EnvelopeSent {
return
}
log.Debug("envelope expired", "hash", event.Hash, "state", state)
if m.handler != nil {
m.handler.EnvelopeExpired(event.Hash)
}
}
}
func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event whisper.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) {
return
}
}
m.mu.Lock()
defer m.mu.Unlock()
state, ok := m.cache[event.Hash]
if !ok || state != EnvelopePosted {
return
}
log.Debug("expected envelope received", "hash", event.Hash, "peer", event.Peer)
delete(m.cache, event.Hash)
if m.handler != nil {
m.handler.EnvelopeSent(event.Hash)
}
}

View File

@ -0,0 +1,103 @@
package shhext
import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/services/shhext/mailservers"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/suite"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
)
type EnvelopesMonitorSuite struct {
suite.Suite
monitor *EnvelopesMonitor
}
func (s *EnvelopesMonitorSuite) SetupTest() {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
s.Require().NoError(err)
s.monitor = &EnvelopesMonitor{
cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
mailPeers: mailservers.NewPeerStore(mailservers.NewCache(db)),
}
}
func (s *EnvelopesMonitorSuite) TestConfirmed() {
s.monitor.Add(testHash)
s.Contains(s.monitor.cache, testHash)
s.Equal(EnvelopePosted, s.monitor.cache[testHash])
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
})
s.Contains(s.monitor.cache, testHash)
s.Equal(EnvelopeSent, s.monitor.cache[testHash])
}
func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() {
testBatch := common.Hash{1}
pkey, err := crypto.GenerateKey()
s.Require().NoError(err)
node := enode.NewV4(&pkey.PublicKey, nil, 0, 0)
s.Require().NoError(s.monitor.mailPeers.Update([]*enode.Node{node}))
s.monitor.Add(testHash)
s.Contains(s.monitor.cache, testHash)
s.Equal(EnvelopePosted, s.monitor.cache[testHash])
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
Batch: testBatch,
})
s.Equal(EnvelopePosted, s.monitor.cache[testHash])
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventBatchAcknowledged,
Batch: testBatch,
Peer: node.ID(),
})
s.Contains(s.monitor.cache, testHash)
s.Equal(EnvelopeSent, s.monitor.cache[testHash])
}
func (s *EnvelopesMonitorSuite) TestIgnored() {
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
})
s.NotContains(s.monitor.cache, testHash)
}
func (s *EnvelopesMonitorSuite) TestRemoved() {
s.monitor.Add(testHash)
s.Contains(s.monitor.cache, testHash)
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeExpired,
Hash: testHash,
})
s.NotContains(s.monitor.cache, testHash)
}
func (s *EnvelopesMonitorSuite) TestIgnoreNotFromMailserver() {
// enables filter in the tracker to drop confirmations from non-mailserver peers
s.monitor.mailServerConfirmation = true
s.monitor.Add(testHash)
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
Peer: enode.ID{1}, // could be empty, doesn't impact test behaviour
})
s.Require().Equal(EnvelopePosted, s.monitor.GetState(testHash))
}
func (s *EnvelopesMonitorSuite) TestReceived() {
s.monitor.Add(testHash)
s.Contains(s.monitor.cache, testHash)
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeReceived,
Hash: testHash})
s.NotContains(s.monitor.cache, testHash)
}

View File

@ -0,0 +1,130 @@
package shhext
import (
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
whisper "github.com/status-im/whisper/whisperv6"
)
// EnvelopeState in local tracker
type EnvelopeState int
const (
// NotRegistered returned if asked hash wasn't registered in the tracker.
NotRegistered EnvelopeState = -1
// EnvelopePosted is set when envelope was added to a local whisper queue.
EnvelopePosted EnvelopeState = iota
// EnvelopeSent is set when envelope is sent to atleast one peer.
EnvelopeSent
// MailServerRequestSent is set when p2p request is sent to the mailserver
MailServerRequestSent
)
// MailRequestMonitor is responsible for monitoring history request to mailservers.
type MailRequestMonitor struct {
w *whisper.Whisper
handler EnvelopeEventsHandler
mu sync.Mutex
cache map[common.Hash]EnvelopeState
requestsRegistry *RequestsRegistry
wg sync.WaitGroup
quit chan struct{}
}
// Start processing events.
func (m *MailRequestMonitor) Start() {
m.quit = make(chan struct{})
m.wg.Add(1)
go func() {
m.handleEnvelopeEvents()
m.wg.Done()
}()
}
// Stop process events.
func (m *MailRequestMonitor) Stop() {
close(m.quit)
m.wg.Wait()
}
func (m *MailRequestMonitor) GetState(hash common.Hash) EnvelopeState {
m.mu.Lock()
defer m.mu.Unlock()
state, exist := m.cache[hash]
if !exist {
return NotRegistered
}
return state
}
// handleEnvelopeEvents processes whisper envelope events
func (m *MailRequestMonitor) handleEnvelopeEvents() {
events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper
sub := m.w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
for {
select {
case <-m.quit:
return
case event := <-events:
m.handleEvent(event)
}
}
}
// handleEvent based on type of the event either triggers
// confirmation handler or removes hash from MailRequestMonitor
func (m *MailRequestMonitor) handleEvent(event whisper.EnvelopeEvent) {
handlers := map[whisper.EventType]func(whisper.EnvelopeEvent){
whisper.EventMailServerRequestSent: m.handleRequestSent,
whisper.EventMailServerRequestCompleted: m.handleEventMailServerRequestCompleted,
whisper.EventMailServerRequestExpired: m.handleEventMailServerRequestExpired,
}
if handler, ok := handlers[event.Event]; ok {
handler(event)
}
}
func (m *MailRequestMonitor) handleRequestSent(event whisper.EnvelopeEvent) {
m.mu.Lock()
defer m.mu.Unlock()
m.cache[event.Hash] = MailServerRequestSent
}
func (m *MailRequestMonitor) handleEventMailServerRequestCompleted(event whisper.EnvelopeEvent) {
m.mu.Lock()
defer m.mu.Unlock()
m.requestsRegistry.Unregister(event.Hash)
state, ok := m.cache[event.Hash]
if !ok || state != MailServerRequestSent {
return
}
log.Debug("mailserver response received", "hash", event.Hash)
delete(m.cache, event.Hash)
if m.handler != nil {
if resp, ok := event.Data.(*whisper.MailServerResponse); ok {
m.handler.MailServerRequestCompleted(event.Hash, resp.LastEnvelopeHash, resp.Cursor, resp.Error)
}
}
}
func (m *MailRequestMonitor) handleEventMailServerRequestExpired(event whisper.EnvelopeEvent) {
m.mu.Lock()
defer m.mu.Unlock()
m.requestsRegistry.Unregister(event.Hash)
state, ok := m.cache[event.Hash]
if !ok || state != MailServerRequestSent {
return
}
log.Debug("mailserver response expired", "hash", event.Hash)
delete(m.cache, event.Hash)
if m.handler != nil {
m.handler.MailServerRequestExpired(event.Hash)
}
}

View File

@ -0,0 +1,85 @@
package shhext
import (
"errors"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/suite"
)
var (
testHash = common.Hash{0x01}
)
func TestMailRequestMonitorSuite(t *testing.T) {
suite.Run(t, new(MailRequestMonitorSuite))
}
type MailRequestMonitorSuite struct {
suite.Suite
monitor *MailRequestMonitor
}
func (s *MailRequestMonitorSuite) SetupTest() {
s.monitor = &MailRequestMonitor{
cache: map[common.Hash]EnvelopeState{},
requestsRegistry: NewRequestsRegistry(0),
}
}
func (s *MailRequestMonitorSuite) TestRequestCompleted() {
mock := newHandlerMock(1)
s.monitor.handler = mock
s.monitor.cache[testHash] = MailServerRequestSent
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestCompleted,
Hash: testHash,
Data: &whisper.MailServerResponse{},
})
select {
case requestID := <-mock.requestsCompleted:
s.Equal(testHash, requestID)
s.NotContains(s.monitor.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for a request to be completed")
}
}
func (s *MailRequestMonitorSuite) TestRequestFailed() {
mock := newHandlerMock(1)
s.monitor.handler = mock
s.monitor.cache[testHash] = MailServerRequestSent
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestCompleted,
Hash: testHash,
Data: &whisper.MailServerResponse{Error: errors.New("test error")},
})
select {
case requestID := <-mock.requestsFailed:
s.Equal(testHash, requestID)
s.NotContains(s.monitor.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for a request to be failed")
}
}
func (s *MailRequestMonitorSuite) TestRequestExpiration() {
mock := newHandlerMock(1)
s.monitor.handler = mock
s.monitor.cache[testHash] = MailServerRequestSent
s.monitor.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestExpired,
Hash: testHash,
})
select {
case requestID := <-mock.requestsExpired:
s.Equal(testHash, requestID)
s.NotContains(s.monitor.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for request expiration")
}
}

View File

@ -43,7 +43,8 @@ type EnvelopeEventsHandler interface {
type Service struct {
w *whisper.Whisper
config params.ShhextConfig
tracker *tracker
envelopesMonitor *EnvelopesMonitor
mailMonitor *MailRequestMonitor
requestsRegistry *RequestsRegistry
server *p2p.Server
nodeID *ecdsa.PrivateKey
@ -72,19 +73,25 @@ func New(w *whisper.Whisper, handler EnvelopeEventsHandler, db *leveldb.DB, conf
delay = config.RequestsDelay
}
requestsRegistry := NewRequestsRegistry(delay)
track := &tracker{
mailMonitor := &MailRequestMonitor{
w: w,
handler: handler,
cache: map[common.Hash]EnvelopeState{},
requestsRegistry: requestsRegistry,
}
envelopesMonitor := &EnvelopesMonitor{
w: w,
handler: handler,
cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
mailPeers: ps,
mailServerConfirmation: config.MailServerConfirmations,
requestsRegistry: requestsRegistry,
}
return &Service{
w: w,
config: config,
tracker: track,
envelopesMonitor: envelopesMonitor,
mailMonitor: mailMonitor,
requestsRegistry: requestsRegistry,
deduplicator: dedup.NewDeduplicator(w, db),
debug: config.DebugAPIEnabled,
@ -267,7 +274,8 @@ func (s *Service) Start(server *p2p.Server) error {
s.lastUsedMonitor = mailservers.NewLastUsedConnectionMonitor(s.peerStore, s.cache, s.w)
s.lastUsedMonitor.Start()
}
s.tracker.Start()
s.envelopesMonitor.Start()
s.mailMonitor.Start()
s.nodeID = server.PrivateKey
s.server = server
return nil
@ -282,6 +290,7 @@ func (s *Service) Stop() error {
if s.config.EnableLastUsedMonitor {
s.lastUsedMonitor.Stop()
}
s.tracker.Stop()
s.envelopesMonitor.Stop()
s.mailMonitor.Stop()
return nil
}

View File

@ -126,7 +126,7 @@ func (s *ShhExtSuite) SetupTest() {
s.Require().NoError(stack.Start())
s.nodes[i] = stack
}
s.services[0].tracker.handler = newHandlerMock(1)
s.services[0].envelopesMonitor.handler = newHandlerMock(1)
}
func (s *ShhExtSuite) TestInitProtocol() {
@ -141,7 +141,7 @@ func (s *ShhExtSuite) TestInitProtocol() {
func (s *ShhExtSuite) TestPostMessageWithConfirmation() {
mock := newHandlerMock(1)
s.services[0].tracker.handler = mock
s.services[0].envelopesMonitor.handler = mock
s.Require().NoError(s.services[0].UpdateMailservers([]*enode.Node{s.nodes[1].Server().Self()}))
s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self())
symID, err := s.whisper[0].GenerateSymKey()
@ -167,7 +167,7 @@ func (s *ShhExtSuite) TestPostMessageWithConfirmation() {
func (s *ShhExtSuite) TestWaitMessageExpired() {
mock := newHandlerMock(1)
s.services[0].tracker.handler = mock
s.services[0].envelopesMonitor.handler = mock
symID, err := s.whisper[0].GenerateSymKey()
s.NoError(err)
client, err := s.nodes[0].Attach()
@ -362,7 +362,7 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() {
})
s.Require().NoError(err)
s.Require().NotNil(hash)
s.Require().NoError(waitForHashInTracker(api.service.tracker, common.BytesToHash(hash), MailServerRequestSent, time.Second))
s.Require().NoError(waitForHashInMonitor(api.service.mailMonitor, common.BytesToHash(hash), MailServerRequestSent, time.Second))
// Send a request without a symmetric key. In this case,
// a public key extracted from MailServerPeer will be used.
hash, err = api.RequestMessages(context.TODO(), MessagesRequest{
@ -371,12 +371,12 @@ func (s *ShhExtSuite) TestRequestMessagesSuccess() {
})
s.Require().NoError(err)
s.Require().NotNil(hash)
s.Require().NoError(waitForHashInTracker(api.service.tracker, common.BytesToHash(hash), MailServerRequestSent, time.Second))
s.Require().NoError(waitForHashInMonitor(api.service.mailMonitor, common.BytesToHash(hash), MailServerRequestSent, time.Second))
}
func (s *ShhExtSuite) TestDebugPostSync() {
mock := newHandlerMock(1)
s.services[0].tracker.handler = mock
s.services[0].envelopesMonitor.handler = mock
symID, err := s.whisper[0].GenerateSymKey()
s.NoError(err)
s.nodes[0].Server().AddPeer(s.nodes[1].Server().Self())
@ -445,7 +445,7 @@ func (s *ShhExtSuite) TestDebugPostSync() {
func (s *ShhExtSuite) TestEnvelopeExpiredOnDebugPostSync() {
mock := newHandlerMock(1)
s.services[0].tracker.handler = mock
s.services[0].envelopesMonitor.handler = mock
symID, err := s.whisper[0].GenerateSymKey()
s.NoError(err)
client, err := s.nodes[0].Attach()
@ -472,7 +472,7 @@ func (s *ShhExtSuite) TearDown() {
}
}
func waitForHashInTracker(track *tracker, hash common.Hash, state EnvelopeState, deadline time.Duration) error {
func waitForHashInMonitor(mon *MailRequestMonitor, hash common.Hash, state EnvelopeState, deadline time.Duration) error {
after := time.After(deadline)
ticker := time.Tick(100 * time.Millisecond)
for {
@ -480,7 +480,7 @@ func waitForHashInTracker(track *tracker, hash common.Hash, state EnvelopeState,
case <-after:
return fmt.Errorf("failed while waiting for %s to get into state %d", hash, state)
case <-ticker:
if track.GetState(hash) == state {
if mon.GetState(hash) == state {
return nil
}
}
@ -495,8 +495,8 @@ type WhisperNodeMockSuite struct {
localNode *enode.Node
remoteRW *p2p.MsgPipeRW
localService *Service
localTracker *tracker
localService *Service
localEnvelopeMonitor *EnvelopesMonitor
}
func (s *WhisperNodeMockSuite) SetupTest() {
@ -522,8 +522,8 @@ func (s *WhisperNodeMockSuite) SetupTest() {
s.localService = New(w, nil, db, params.ShhextConfig{MailServerConfirmations: true})
s.Require().NoError(s.localService.UpdateMailservers([]*enode.Node{node}))
s.localTracker = s.localService.tracker
s.localTracker.Start()
s.localEnvelopeMonitor = s.localService.envelopesMonitor
s.localEnvelopeMonitor.Start()
s.localWhisperAPI = whisper.NewPublicWhisperAPI(w)
s.localAPI = NewPublicAPI(s.localService)
@ -605,7 +605,7 @@ func (s *WhisperConfirmationSuite) TestEnvelopeReceived() {
envHash := common.BytesToHash(envBytes)
s.Require().NoError(err)
s.Require().NoError(utils.Eventually(func() error {
if state := s.localTracker.GetState(envHash); state != EnvelopePosted {
if state := s.localEnvelopeMonitor.GetState(envHash); state != EnvelopePosted {
return fmt.Errorf("envelope with hash %s wasn't posted", envHash.String())
}
return nil
@ -629,7 +629,7 @@ func (s *WhisperConfirmationSuite) TestEnvelopeReceived() {
// wait for message to be removed because it was delivered by remoteRW
s.Require().NoError(utils.Eventually(func() error {
if state := s.localTracker.GetState(envHash); state != NotRegistered {
if state := s.localEnvelopeMonitor.GetState(envHash); state != NotRegistered {
return fmt.Errorf("envelope with hash %s wasn't removed from tracker", envHash.String())
}
return nil

View File

@ -1,246 +0,0 @@
package shhext
import (
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/services/shhext/mailservers"
whisper "github.com/status-im/whisper/whisperv6"
)
// EnvelopeState in local tracker
type EnvelopeState int
const (
// NotRegistered returned if asked hash wasn't registered in the tracker.
NotRegistered EnvelopeState = -1
// EnvelopePosted is set when envelope was added to a local whisper queue.
EnvelopePosted EnvelopeState = iota
// EnvelopeSent is set when envelope is sent to atleast one peer.
EnvelopeSent
// MailServerRequestSent is set when p2p request is sent to the mailserver
MailServerRequestSent
)
// tracker responsible for processing events for envelopes that we are interested in
// and calling specified handler.
type tracker struct {
w *whisper.Whisper
handler EnvelopeEventsHandler
mailServerConfirmation bool
mu sync.Mutex
cache map[common.Hash]EnvelopeState
batches map[common.Hash]map[common.Hash]struct{}
mailPeers *mailservers.PeerStore
requestsRegistry *RequestsRegistry
wg sync.WaitGroup
quit chan struct{}
}
// Start processing events.
func (t *tracker) Start() {
t.quit = make(chan struct{})
t.wg.Add(1)
go func() {
t.handleEnvelopeEvents()
t.wg.Done()
}()
}
// Stop process events.
func (t *tracker) Stop() {
close(t.quit)
t.wg.Wait()
}
// Add hash to a tracker.
func (t *tracker) Add(hash common.Hash) {
t.mu.Lock()
defer t.mu.Unlock()
t.cache[hash] = EnvelopePosted
}
func (t *tracker) GetState(hash common.Hash) EnvelopeState {
t.mu.Lock()
defer t.mu.Unlock()
state, exist := t.cache[hash]
if !exist {
return NotRegistered
}
return state
}
// handleEnvelopeEvents processes whisper envelope events
func (t *tracker) handleEnvelopeEvents() {
events := make(chan whisper.EnvelopeEvent, 100) // must be buffered to prevent blocking whisper
sub := t.w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
for {
select {
case <-t.quit:
return
case event := <-events:
t.handleEvent(event)
}
}
}
// handleEvent based on type of the event either triggers
// confirmation handler or removes hash from tracker
func (t *tracker) handleEvent(event whisper.EnvelopeEvent) {
handlers := map[whisper.EventType]func(whisper.EnvelopeEvent){
whisper.EventEnvelopeSent: t.handleEventEnvelopeSent,
whisper.EventEnvelopeReceived: t.handleEventEnvelopeReceived,
whisper.EventEnvelopeExpired: t.handleEventEnvelopeExpired,
whisper.EventBatchAcknowledged: t.handleAcknowledgedBatch,
whisper.EventMailServerRequestSent: t.handleRequestSent,
whisper.EventMailServerRequestCompleted: t.handleEventMailServerRequestCompleted,
whisper.EventMailServerRequestExpired: t.handleEventMailServerRequestExpired,
}
if handler, ok := handlers[event.Event]; ok {
handler(event)
}
}
func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) {
if t.mailServerConfirmation {
if !t.isMailserver(event.Peer) {
return
}
}
t.mu.Lock()
defer t.mu.Unlock()
state, ok := t.cache[event.Hash]
// if we didn't send a message using extension - skip it
// if message was already confirmed - skip it
if !ok || state == EnvelopeSent {
return
}
log.Debug("envelope is sent", "hash", event.Hash, "peer", event.Peer)
if event.Batch != (common.Hash{}) {
if _, ok := t.batches[event.Batch]; !ok {
t.batches[event.Batch] = map[common.Hash]struct{}{}
}
t.batches[event.Batch][event.Hash] = struct{}{}
log.Debug("waiting for a confirmation", "batch", event.Batch)
} else {
t.cache[event.Hash] = EnvelopeSent
if t.handler != nil {
t.handler.EnvelopeSent(event.Hash)
}
}
}
func (t *tracker) handleEventEnvelopeReceived(event whisper.EnvelopeEvent) {
if t.mailServerConfirmation {
if !t.isMailserver(event.Peer) {
return
}
}
t.mu.Lock()
defer t.mu.Unlock()
state, ok := t.cache[event.Hash]
if !ok || state != EnvelopePosted {
return
}
log.Debug("expected envelope received", "hash", event.Hash, "peer", event.Peer)
delete(t.cache, event.Hash)
if t.handler != nil {
t.handler.EnvelopeSent(event.Hash)
}
}
func (t *tracker) isMailserver(peer enode.ID) bool {
return t.mailPeers.Exist(peer)
}
func (t *tracker) handleAcknowledgedBatch(event whisper.EnvelopeEvent) {
if t.mailServerConfirmation {
if !t.isMailserver(event.Peer) {
return
}
}
t.mu.Lock()
defer t.mu.Unlock()
envelopes, ok := t.batches[event.Batch]
if !ok {
log.Debug("batch is not found", "batch", event.Batch)
}
log.Debug("received a confirmation", "batch", event.Batch, "peer", event.Peer)
for hash := range envelopes {
state, ok := t.cache[hash]
if !ok || state == EnvelopeSent {
continue
}
t.cache[hash] = EnvelopeSent
if t.handler != nil {
t.handler.EnvelopeSent(hash)
}
}
delete(t.batches, event.Batch)
}
func (t *tracker) handleEventEnvelopeExpired(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
if state, ok := t.cache[event.Hash]; ok {
delete(t.cache, event.Hash)
if state == EnvelopeSent {
return
}
log.Debug("envelope expired", "hash", event.Hash, "state", state)
if t.handler != nil {
t.handler.EnvelopeExpired(event.Hash)
}
}
}
func (t *tracker) handleRequestSent(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
t.cache[event.Hash] = MailServerRequestSent
}
func (t *tracker) handleEventMailServerRequestCompleted(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
t.requestsRegistry.Unregister(event.Hash)
state, ok := t.cache[event.Hash]
if !ok || state != MailServerRequestSent {
return
}
log.Debug("mailserver response received", "hash", event.Hash)
delete(t.cache, event.Hash)
if t.handler != nil {
if resp, ok := event.Data.(*whisper.MailServerResponse); ok {
t.handler.MailServerRequestCompleted(event.Hash, resp.LastEnvelopeHash, resp.Cursor, resp.Error)
}
}
}
func (t *tracker) handleEventMailServerRequestExpired(event whisper.EnvelopeEvent) {
t.mu.Lock()
defer t.mu.Unlock()
t.requestsRegistry.Unregister(event.Hash)
state, ok := t.cache[event.Hash]
if !ok || state != MailServerRequestSent {
return
}
log.Debug("mailserver response expired", "hash", event.Hash)
delete(t.cache, event.Hash)
if t.handler != nil {
t.handler.MailServerRequestExpired(event.Hash)
}
}

View File

@ -1,169 +0,0 @@
package shhext
import (
"errors"
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/services/shhext/mailservers"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/suite"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/storage"
)
var (
testHash = common.Hash{0x01}
)
func TestTrackerSuite(t *testing.T) {
suite.Run(t, new(TrackerSuite))
}
type TrackerSuite struct {
suite.Suite
tracker *tracker
}
func (s *TrackerSuite) SetupTest() {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
s.Require().NoError(err)
s.tracker = &tracker{
cache: map[common.Hash]EnvelopeState{},
batches: map[common.Hash]map[common.Hash]struct{}{},
mailPeers: mailservers.NewPeerStore(mailservers.NewCache(db)),
requestsRegistry: NewRequestsRegistry(0),
}
}
func (s *TrackerSuite) TestConfirmed() {
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
})
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopeSent, s.tracker.cache[testHash])
}
func (s *TrackerSuite) TestConfirmedWithAcknowledge() {
testBatch := common.Hash{1}
pkey, err := crypto.GenerateKey()
s.Require().NoError(err)
node := enode.NewV4(&pkey.PublicKey, nil, 0, 0)
s.Require().NoError(s.tracker.mailPeers.Update([]*enode.Node{node}))
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
Batch: testBatch,
})
s.Equal(EnvelopePosted, s.tracker.cache[testHash])
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventBatchAcknowledged,
Batch: testBatch,
Peer: node.ID(),
})
s.Contains(s.tracker.cache, testHash)
s.Equal(EnvelopeSent, s.tracker.cache[testHash])
}
func (s *TrackerSuite) TestIgnored() {
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
})
s.NotContains(s.tracker.cache, testHash)
}
func (s *TrackerSuite) TestRemoved() {
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeExpired,
Hash: testHash,
})
s.NotContains(s.tracker.cache, testHash)
}
func (s *TrackerSuite) TestReceived() {
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeReceived,
Hash: testHash})
s.NotContains(s.tracker.cache, testHash)
}
func (s *TrackerSuite) TestRequestCompleted() {
mock := newHandlerMock(1)
s.tracker.handler = mock
s.tracker.cache[testHash] = MailServerRequestSent
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestCompleted,
Hash: testHash,
Data: &whisper.MailServerResponse{},
})
select {
case requestID := <-mock.requestsCompleted:
s.Equal(testHash, requestID)
s.NotContains(s.tracker.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for a request to be completed")
}
}
func (s *TrackerSuite) TestRequestFailed() {
mock := newHandlerMock(1)
s.tracker.handler = mock
s.tracker.cache[testHash] = MailServerRequestSent
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestCompleted,
Hash: testHash,
Data: &whisper.MailServerResponse{Error: errors.New("test error")},
})
select {
case requestID := <-mock.requestsFailed:
s.Equal(testHash, requestID)
s.NotContains(s.tracker.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for a request to be failed")
}
}
func (s *TrackerSuite) TestRequestExpiration() {
mock := newHandlerMock(1)
s.tracker.handler = mock
s.tracker.cache[testHash] = MailServerRequestSent
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventMailServerRequestExpired,
Hash: testHash,
})
select {
case requestID := <-mock.requestsExpired:
s.Equal(testHash, requestID)
s.NotContains(s.tracker.cache, testHash)
case <-time.After(10 * time.Second):
s.Fail("timed out while waiting for request expiration")
}
}
func (s *TrackerSuite) TestIgnoreNotFromMailserver() {
// enables filter in the tracker to drop confirmations from non-mailserver peers
s.tracker.mailServerConfirmation = true
s.tracker.Add(testHash)
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeSent,
Hash: testHash,
Peer: enode.ID{1}, // could be empty, doesn't impact test behaviour
})
s.Require().Equal(EnvelopePosted, s.tracker.GetState(testHash))
}