Post request.completed only after envelopes from that request were added to a filter (#27)

* Post completion event after every message was added to a filter

* Add test that request.completed received after all envelope.avaiable

* Process all p2p envelopes and completion events sequantially

* Stop whisper after test
This commit is contained in:
Dmitry Shulyak 2019-04-19 08:14:44 +03:00 committed by GitHub
parent efca6fbb0c
commit 4fae75da94
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 75 additions and 17 deletions

View File

@ -91,7 +91,7 @@ type Whisper struct {
peers map[*Peer]struct{} // Set of currently active peers peers map[*Peer]struct{} // Set of currently active peers
messageQueue chan *Envelope // Message queue for normal whisper messages messageQueue chan *Envelope // Message queue for normal whisper messages
p2pMsgQueue chan *Envelope // Message queue for peer-to-peer messages (not to be forwarded any further) p2pMsgQueue chan interface{} // Message queue for peer-to-peer messages (not to be forwarded any further) and history delivery confirmations.
quit chan struct{} // Channel used for graceful exit quit chan struct{} // Channel used for graceful exit
settings syncmap.Map // holds configuration settings that can be dynamically changed settings syncmap.Map // holds configuration settings that can be dynamically changed
@ -125,7 +125,7 @@ func New(cfg *Config) *Whisper {
expirations: make(map[uint32]mapset.Set), expirations: make(map[uint32]mapset.Set),
peers: make(map[*Peer]struct{}), peers: make(map[*Peer]struct{}),
messageQueue: make(chan *Envelope, messageQueueLimit), messageQueue: make(chan *Envelope, messageQueueLimit),
p2pMsgQueue: make(chan *Envelope, messageQueueLimit), p2pMsgQueue: make(chan interface{}, messageQueueLimit),
quit: make(chan struct{}), quit: make(chan struct{}),
syncAllowance: DefaultSyncAllowance, syncAllowance: DefaultSyncAllowance,
timeSource: time.Now, timeSource: time.Now,
@ -815,6 +815,7 @@ func (whisper *Whisper) Start(*p2p.Server) error {
for i := 0; i < numCPU; i++ { for i := 0; i < numCPU; i++ {
go whisper.processQueue() go whisper.processQueue()
} }
go whisper.processP2P()
return nil return nil
} }
@ -1005,7 +1006,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
if err = packet.Decode(&envelopes); err == nil { if err = packet.Decode(&envelopes); err == nil {
for _, envelope := range envelopes { for _, envelope := range envelopes {
whisper.postEvent(envelope, true) whisper.postP2P(envelope)
} }
continue continue
} }
@ -1019,7 +1020,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
} }
if err = packet.Decode(&envelope); err == nil { if err = packet.Decode(&envelope); err == nil {
whisper.postEvent(envelope, true) whisper.postP2P(envelope)
continue continue
} }
@ -1103,7 +1104,7 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
} }
if event != nil { if event != nil {
whisper.envelopeFeed.Send(*event) whisper.postP2P(*event)
} }
} }
@ -1208,14 +1209,19 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
return true, nil return true, nil
} }
func (whisper *Whisper) postP2P(event interface{}) {
whisper.p2pMsgQueue <- event
}
// postEvent queues the message for further processing. // postEvent queues the message for further processing.
func (whisper *Whisper) postEvent(envelope *Envelope, isP2P bool) { func (whisper *Whisper) postEvent(envelope *Envelope, isP2P bool) {
if isP2P { if isP2P {
whisper.p2pMsgQueue <- envelope whisper.postP2P(envelope)
} else { } else {
whisper.checkOverflow() whisper.checkOverflow()
whisper.messageQueue <- envelope whisper.messageQueue <- envelope
} }
} }
// checkOverflow checks if message queue overflow occurs and reports it if necessary. // checkOverflow checks if message queue overflow occurs and reports it if necessary.
@ -1237,25 +1243,36 @@ func (whisper *Whisper) checkOverflow() {
// processQueue delivers the messages to the watchers during the lifetime of the whisper node. // processQueue delivers the messages to the watchers during the lifetime of the whisper node.
func (whisper *Whisper) processQueue() { func (whisper *Whisper) processQueue() {
var e *Envelope
for { for {
select { select {
case <-whisper.quit: case <-whisper.quit:
return return
case e := <-whisper.messageQueue:
case e = <-whisper.messageQueue:
whisper.filters.NotifyWatchers(e, false) whisper.filters.NotifyWatchers(e, false)
whisper.envelopeFeed.Send(EnvelopeEvent{ whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: e.Hash(), Hash: e.Hash(),
Event: EventEnvelopeAvailable, Event: EventEnvelopeAvailable,
}) })
}
}
}
case e = <-whisper.p2pMsgQueue: func (whisper *Whisper) processP2P() {
whisper.filters.NotifyWatchers(e, true) for {
select {
case <-whisper.quit:
return
case e := <-whisper.p2pMsgQueue:
switch event := e.(type) {
case *Envelope:
whisper.filters.NotifyWatchers(event, true)
whisper.envelopeFeed.Send(EnvelopeEvent{ whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: e.Hash(), Hash: event.Hash(),
Event: EventEnvelopeAvailable, Event: EventEnvelopeAvailable,
}) })
case EnvelopeEvent:
whisper.envelopeFeed.Send(event)
}
} }
} }
} }

View File

@ -1527,3 +1527,44 @@ func (m *mockMailServer) SyncMail(p *Peer, r SyncMailRequest) error {
args := m.Called(p, r) args := m.Called(p, r)
return args.Error(0) return args.Error(0)
} }
func TestMailserverCompletionEvent(t *testing.T) {
w := New(nil)
require.NoError(t, w.Start(nil))
defer w.Stop()
rw1, rw2 := p2p.MsgPipe()
peer := newPeer(w, p2p.NewPeer(enode.ID{1}, "1", nil), rw1)
peer.trusted = true
w.peers[peer] = struct{}{}
events := make(chan EnvelopeEvent)
sub := w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
envelopes := []*Envelope{{Data: []byte{1}}, {Data: []byte{2}}}
go func() {
require.NoError(t, p2p.Send(rw2, p2pMessageCode, envelopes))
require.NoError(t, p2p.Send(rw2, p2pRequestCompleteCode, [100]byte{})) // 2 hashes + cursor size
rw2.Close()
}()
require.EqualError(t, w.runMessageLoop(peer, rw1), "p2p: read or write on closed message pipe")
after := time.After(2 * time.Second)
count := 0
for {
select {
case <-after:
require.FailNow(t, "timed out waiting for all events")
case ev := <-events:
switch ev.Event {
case EventEnvelopeAvailable:
count++
case EventMailServerRequestCompleted:
require.Equal(t, count, len(envelopes),
"all envelope.avaiable events mut be recevied before request is compelted")
return
}
}
}
}