From 4fae75da94b1ab6dc13a5fa7d5087bfbfa04406f Mon Sep 17 00:00:00 2001 From: Dmitry Shulyak Date: Fri, 19 Apr 2019 08:14:44 +0300 Subject: [PATCH] Post request.completed only after envelopes from that request were added to a filter (#27) * Post completion event after every message was added to a filter * Add test that request.completed received after all envelope.avaiable * Process all p2p envelopes and completion events sequantially * Stop whisper after test --- whisperv6/whisper.go | 51 ++++++++++++++++++++++++++------------- whisperv6/whisper_test.go | 41 +++++++++++++++++++++++++++++++ 2 files changed, 75 insertions(+), 17 deletions(-) diff --git a/whisperv6/whisper.go b/whisperv6/whisper.go index f0e6b53..15cef7d 100644 --- a/whisperv6/whisper.go +++ b/whisperv6/whisper.go @@ -90,9 +90,9 @@ type Whisper struct { peerMu sync.RWMutex // Mutex to sync the active peer set peers map[*Peer]struct{} // Set of currently active peers - messageQueue chan *Envelope // Message queue for normal whisper messages - p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further) - quit chan struct{} // Channel used for graceful exit + messageQueue chan *Envelope // Message queue for normal whisper messages + p2pMsgQueue chan interface{} // Message queue for peer-to-peer messages (not to be forwarded any further) and history delivery confirmations. + quit chan struct{} // Channel used for graceful exit settings syncmap.Map // holds configuration settings that can be dynamically changed @@ -125,7 +125,7 @@ func New(cfg *Config) *Whisper { expirations: make(map[uint32]mapset.Set), peers: make(map[*Peer]struct{}), messageQueue: make(chan *Envelope, messageQueueLimit), - p2pMsgQueue: make(chan *Envelope, messageQueueLimit), + p2pMsgQueue: make(chan interface{}, messageQueueLimit), quit: make(chan struct{}), syncAllowance: DefaultSyncAllowance, timeSource: time.Now, @@ -815,6 +815,7 @@ func (whisper *Whisper) Start(*p2p.Server) error { for i := 0; i < numCPU; i++ { go whisper.processQueue() } + go whisper.processP2P() return nil } @@ -1005,7 +1006,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { if err = packet.Decode(&envelopes); err == nil { for _, envelope := range envelopes { - whisper.postEvent(envelope, true) + whisper.postP2P(envelope) } continue } @@ -1019,7 +1020,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { } if err = packet.Decode(&envelope); err == nil { - whisper.postEvent(envelope, true) + whisper.postP2P(envelope) continue } @@ -1103,7 +1104,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { } if event != nil { - whisper.envelopeFeed.Send(*event) + whisper.postP2P(*event) } } @@ -1208,14 +1209,19 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { return true, nil } +func (whisper *Whisper) postP2P(event interface{}) { + whisper.p2pMsgQueue <- event +} + // postEvent queues the message for further processing. func (whisper *Whisper) postEvent(envelope *Envelope, isP2P bool) { if isP2P { - whisper.p2pMsgQueue <- envelope + whisper.postP2P(envelope) } else { whisper.checkOverflow() whisper.messageQueue <- envelope } + } // checkOverflow checks if message queue overflow occurs and reports it if necessary. @@ -1237,25 +1243,36 @@ func (whisper *Whisper) checkOverflow() { // processQueue delivers the messages to the watchers during the lifetime of the whisper node. func (whisper *Whisper) processQueue() { - var e *Envelope for { select { case <-whisper.quit: return - - case e = <-whisper.messageQueue: + case e := <-whisper.messageQueue: whisper.filters.NotifyWatchers(e, false) whisper.envelopeFeed.Send(EnvelopeEvent{ Hash: e.Hash(), Event: EventEnvelopeAvailable, }) + } + } +} - case e = <-whisper.p2pMsgQueue: - whisper.filters.NotifyWatchers(e, true) - whisper.envelopeFeed.Send(EnvelopeEvent{ - Hash: e.Hash(), - Event: EventEnvelopeAvailable, - }) +func (whisper *Whisper) processP2P() { + for { + select { + case <-whisper.quit: + return + case e := <-whisper.p2pMsgQueue: + switch event := e.(type) { + case *Envelope: + whisper.filters.NotifyWatchers(event, true) + whisper.envelopeFeed.Send(EnvelopeEvent{ + Hash: event.Hash(), + Event: EventEnvelopeAvailable, + }) + case EnvelopeEvent: + whisper.envelopeFeed.Send(event) + } } } } diff --git a/whisperv6/whisper_test.go b/whisperv6/whisper_test.go index e01aafa..d0c6a44 100644 --- a/whisperv6/whisper_test.go +++ b/whisperv6/whisper_test.go @@ -1527,3 +1527,44 @@ func (m *mockMailServer) SyncMail(p *Peer, r SyncMailRequest) error { args := m.Called(p, r) return args.Error(0) } + +func TestMailserverCompletionEvent(t *testing.T) { + w := New(nil) + require.NoError(t, w.Start(nil)) + defer w.Stop() + + rw1, rw2 := p2p.MsgPipe() + peer := newPeer(w, p2p.NewPeer(enode.ID{1}, "1", nil), rw1) + peer.trusted = true + w.peers[peer] = struct{}{} + + events := make(chan EnvelopeEvent) + sub := w.SubscribeEnvelopeEvents(events) + defer sub.Unsubscribe() + + envelopes := []*Envelope{{Data: []byte{1}}, {Data: []byte{2}}} + go func() { + require.NoError(t, p2p.Send(rw2, p2pMessageCode, envelopes)) + require.NoError(t, p2p.Send(rw2, p2pRequestCompleteCode, [100]byte{})) // 2 hashes + cursor size + rw2.Close() + }() + require.EqualError(t, w.runMessageLoop(peer, rw1), "p2p: read or write on closed message pipe") + + after := time.After(2 * time.Second) + count := 0 + for { + select { + case <-after: + require.FailNow(t, "timed out waiting for all events") + case ev := <-events: + switch ev.Event { + case EventEnvelopeAvailable: + count++ + case EventMailServerRequestCompleted: + require.Equal(t, count, len(envelopes), + "all envelope.avaiable events mut be recevied before request is compelted") + return + } + } + } +}