Mark envelope as confirmed when it was received from a mail server (#1378)

This commit is contained in:
Dmitry Shulyak 2019-02-20 19:10:59 +02:00 committed by Igor Mandrigin
parent 381a1941a0
commit 224bc7857c
8 changed files with 117 additions and 20 deletions

6
Gopkg.lock generated
View File

@ -822,12 +822,12 @@
version = "v1.1.0"
[[projects]]
digest = "1:2c5092efed72e4c33a9d5f2ca6970609ed959a07b08a6b85fe6e7b70df3ed210"
digest = "1:684e59281a3fd4a35437992b008f43f98a0cf5b25cde717397325b10f94ea69c"
name = "github.com/status-im/whisper"
packages = ["whisperv6"]
pruneopts = "NUT"
revision = "9cdf6385f8a9d8a1cc5ab90dcbbc8cd7efca1e86"
version = "v1.4.6"
revision = "88b3fdf3bd4b497ee3312b4c7b471cce167f40aa"
version = "v1.4.8"
[[projects]]
digest = "1:572c783a763db6383aca3179976eb80e4c900f52eba56cba8bb2e3cea7ce720e"

View File

@ -46,7 +46,7 @@
[[constraint]]
name = "github.com/status-im/whisper"
version = "=v1.4.6"
version = "=v1.4.8"
[[constraint]]
name = "golang.org/x/text"

View File

@ -18,6 +18,7 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/status-im/status-go/params"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/t/utils"
whisper "github.com/status-im/whisper/whisperv6"
"github.com/stretchr/testify/suite"
"github.com/syndtr/goleveldb/leveldb"
@ -28,6 +29,7 @@ import (
const (
// internal whisper protocol codes
statusCode = 0
messagesCode = 1
p2pRequestCompleteCode = 125
)
@ -485,19 +487,19 @@ func waitForHashInTracker(track *tracker, hash common.Hash, state EnvelopeState,
}
}
func TestRequestMessagesSync(t *testing.T) {
suite.Run(t, new(RequestMessagesSyncSuite))
}
type RequestMessagesSyncSuite struct {
type WhisperNodeMockSuite struct {
suite.Suite
localAPI *PublicAPI
localNode *enode.Node
remoteRW *p2p.MsgPipeRW
localWhisperAPI *whisper.PublicWhisperAPI
localAPI *PublicAPI
localNode *enode.Node
remoteRW *p2p.MsgPipeRW
localService *Service
localTracker *tracker
}
func (s *RequestMessagesSyncSuite) SetupTest() {
func (s *WhisperNodeMockSuite) SetupTest() {
db, err := leveldb.Open(storage.NewMemStorage(), nil)
s.Require().NoError(err)
conf := &whisper.Config{
@ -518,13 +520,25 @@ func (s *RequestMessagesSyncSuite) SetupTest() {
s.Require().NoError(p2p.ExpectMsg(rw1, statusCode, []interface{}{whisper.ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), false, true}))
s.Require().NoError(p2p.SendItems(rw1, statusCode, whisper.ProtocolVersion, whisper.ProtocolVersion, math.Float64bits(w.MinPow()), w.BloomFilter(), true, true))
service := New(w, nil, db, params.ShhextConfig{})
s.localService = New(w, nil, db, params.ShhextConfig{MailServerConfirmations: true})
s.Require().NoError(s.localService.UpdateMailservers([]*enode.Node{node}))
s.localTracker = s.localService.tracker
s.localTracker.Start()
s.localAPI = NewPublicAPI(service)
s.localWhisperAPI = whisper.NewPublicWhisperAPI(w)
s.localAPI = NewPublicAPI(s.localService)
s.localNode = node
s.remoteRW = rw1
}
func TestRequestMessagesSync(t *testing.T) {
suite.Run(t, new(RequestMessagesSyncSuite))
}
type RequestMessagesSyncSuite struct {
WhisperNodeMockSuite
}
func (s *RequestMessagesSyncSuite) TestExpired() {
// intentionally discarding all requests, so that request will timeout
go func() {
@ -571,3 +585,53 @@ func (s *RequestMessagesSyncSuite) TestCompletedFromFirstAttempt() {
func (s *RequestMessagesSyncSuite) TestCompletedFromSecondAttempt() {
s.testCompletedFromAttempt(2)
}
func TestWhisperConfirmations(t *testing.T) {
suite.Run(t, new(WhisperConfirmationSuite))
}
type WhisperConfirmationSuite struct {
WhisperNodeMockSuite
}
func (s *WhisperConfirmationSuite) TestEnvelopeReceived() {
symID, err := s.localWhisperAPI.GenerateSymKeyFromPassword(context.TODO(), "test")
s.Require().NoError(err)
envBytes, err := s.localAPI.Post(context.TODO(), whisper.NewMessage{
SymKeyID: symID,
TTL: 1000,
Topic: whisper.TopicType{0x01},
})
envHash := common.BytesToHash(envBytes)
s.Require().NoError(err)
s.Require().NoError(utils.Eventually(func() error {
if state := s.localTracker.GetState(envHash); state != EnvelopePosted {
return fmt.Errorf("envelope with hash %s wasn't posted", envHash.String())
}
return nil
}, 2*time.Second, 100*time.Millisecond))
// enable auto-replies once message got registered internally
go func() {
for {
msg, err := s.remoteRW.ReadMsg()
s.Require().NoError(err)
if msg.Code != messagesCode {
s.Require().NoError(msg.Discard())
continue
}
// reply with same envelopes. we could probably just write same data to remoteRW, but this works too.
var envs []*whisper.Envelope
s.Require().NoError(msg.Decode(&envs))
s.Require().NoError(p2p.Send(s.remoteRW, messagesCode, envs))
}
}()
// wait for message to be removed because it was delivered by remoteRW
s.Require().NoError(utils.Eventually(func() error {
if state := s.localTracker.GetState(envHash); state != NotRegistered {
return fmt.Errorf("envelope with hash %s wasn't removed from tracker", envHash.String())
}
return nil
}, 2*time.Second, 100*time.Millisecond))
}

View File

@ -96,6 +96,7 @@ func (t *tracker) handleEnvelopeEvents() {
func (t *tracker) handleEvent(event whisper.EnvelopeEvent) {
handlers := map[whisper.EventType]func(whisper.EnvelopeEvent){
whisper.EventEnvelopeSent: t.handleEventEnvelopeSent,
whisper.EventEnvelopeReceived: t.handleEventEnvelopeReceived,
whisper.EventEnvelopeExpired: t.handleEventEnvelopeExpired,
whisper.EventBatchAcknowledged: t.handleAcknowledgedBatch,
whisper.EventMailServerRequestSent: t.handleRequestSent,
@ -139,6 +140,25 @@ func (t *tracker) handleEventEnvelopeSent(event whisper.EnvelopeEvent) {
}
}
func (t *tracker) handleEventEnvelopeReceived(event whisper.EnvelopeEvent) {
if t.mailServerConfirmation {
if !t.isMailserver(event.Peer) {
return
}
}
t.mu.Lock()
defer t.mu.Unlock()
state, ok := t.cache[event.Hash]
if !ok || state != EnvelopePosted {
return
}
log.Debug("expected envelope received", "hash", event.Hash, "peer", event.Peer)
delete(t.cache, event.Hash)
if t.handler != nil {
t.handler.EnvelopeSent(event.Hash)
}
}
func (t *tracker) isMailserver(peer enode.ID) bool {
return t.mailPeers.Exist(peer)
}

View File

@ -94,6 +94,15 @@ func (s *TrackerSuite) TestRemoved() {
s.NotContains(s.tracker.cache, testHash)
}
func (s *TrackerSuite) TestReceived() {
s.tracker.Add(testHash)
s.Contains(s.tracker.cache, testHash)
s.tracker.handleEvent(whisper.EnvelopeEvent{
Event: whisper.EventEnvelopeReceived,
Hash: testHash})
s.NotContains(s.tracker.cache, testHash)
}
func (s *TrackerSuite) TestRequestCompleted() {
mock := newHandlerMock(1)
s.tracker.handler = mock

View File

@ -13,6 +13,10 @@ const (
EventEnvelopeSent EventType = "envelope.sent"
// EventEnvelopeExpired fires when envelop expired
EventEnvelopeExpired EventType = "envelope.expired"
// EventEnvelopeReceived is sent once envelope was received from a peer.
// EventEnvelopeReceived must be sent to the feed even if envelope was previously in the cache.
// And event, ideally, should contain information about peer that sent envelope to us.
EventEnvelopeReceived EventType = "envelope.received"
// EventBatchAcknowledged is sent when batch of envelopes was acknowleged by a peer.
EventBatchAcknowledged EventType = "batch.acknowleged"
// EventEnvelopeAvailable fires when envelop is available for filters

View File

@ -205,10 +205,6 @@ func (peer *Peer) expire() {
// broadcast iterates over the collection of envelopes and transmits yet unknown
// ones over the network.
func (peer *Peer) broadcast() error {
if peer.peer.IsFlaky() {
log.Trace("Waiting for a peer to restore communication", "ID", peer.peer.ID())
return nil
}
envelopes := peer.host.Envelopes()
bundle := make([]*Envelope, 0, len(envelopes))
for _, envelope := range envelopes {

View File

@ -876,7 +876,6 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
log.Warn("failed to decode envelopes, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid envelopes")
}
trouble := false
for _, env := range envelopes {
cached, err := whisper.add(env, whisper.LightClientMode())
@ -884,6 +883,11 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
trouble = true
log.Error("bad envelope received, peer will be disconnected", "peer", p.peer.ID(), "err", err)
}
whisper.envelopeFeed.Send(EnvelopeEvent{
Event: EventEnvelopeReceived,
Hash: env.Hash(),
Peer: p.peer.ID(),
})
if cached {
p.mark(env)
}