Send events when mail request is sent or expired

This commit is contained in:
Dmitry 2018-11-27 13:03:05 +02:00 committed by Dmitry Shulyak
parent 76c2447643
commit 96d2199ed5
3 changed files with 73 additions and 2 deletions

View File

@ -17,9 +17,13 @@ const (
EventBatchAcknowledged EventType = "batch.acknowleged"
// EventEnvelopeAvailable fires when envelop is available for filters
EventEnvelopeAvailable EventType = "envelope.available"
// EventMailServerRequestSent fires when such request is sent.
EventMailServerRequestSent EventType = "mailserver.request.sent"
// EventMailServerRequestCompleted fires after mailserver sends all the requested messages
EventMailServerRequestCompleted EventType = "mailserver.request.completed"
// EventMailServerRequestExpired fires after mailserver the request TTL ends
// EventMailServerRequestExpired fires after mailserver the request TTL ends.
// This event is independent and concurrent to EventMailServerRequestCompleted.
// Request should be considered as expired only if expiry event was received first.
EventMailServerRequestExpired EventType = "mailserver.request.expired"
// EventMailServerEnvelopeArchived fires after an envelope has been archived
EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived"

View File

@ -407,12 +407,40 @@ func (whisper *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error {
// which are not supposed to be forwarded any further.
// The whisper protocol is agnostic of the format and contents of envelope.
func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error {
return whisper.RequestHistoricMessagesWithTimeout(peerID, envelope, 0)
}
func (whisper *Whisper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope *Envelope, timeout time.Duration) error {
p, err := whisper.getPeer(peerID)
if err != nil {
return err
}
whisper.envelopeFeed.Send(EnvelopeEvent{
Peer: p.peer.ID(),
Hash: envelope.Hash(),
Event: EventMailServerRequestSent,
})
p.trusted = true
return p2p.Send(p.ws, p2pRequestCode, envelope)
err = p2p.Send(p.ws, p2pRequestCode, envelope)
if timeout != 0 {
go whisper.expireRequestHistoricMessages(p.peer.ID(), envelope.Hash(), timeout)
}
return err
}
func (whisper *Whisper) expireRequestHistoricMessages(peer enode.ID, hash common.Hash, timeout time.Duration) {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-whisper.quit:
return
case <-timer.C:
whisper.envelopeFeed.Send(EnvelopeEvent{
Peer: peer,
Hash: hash,
Event: EventMailServerRequestExpired,
})
}
}
func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte) error {

View File

@ -1202,3 +1202,42 @@ func TestEventsWithoutConfirmation(t *testing.T) {
require.FailNow(t, "timed out waiting for an envelope.sent event")
}
}
func discardPipe() *p2p.MsgPipeRW {
rw1, rw2 := p2p.MsgPipe()
go func() {
for {
msg, err := rw1.ReadMsg()
if err != nil {
return
}
msg.Discard()
}
}()
return rw2
}
func TestRequestSentEventWithExpiry(t *testing.T) {
w := New(nil)
p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}})
rw := discardPipe()
defer rw.Close()
w.peers[newPeer(w, p, rw)] = struct{}{}
events := make(chan EnvelopeEvent, 1)
sub := w.SubscribeEnvelopeEvents(events)
defer sub.Unsubscribe()
e := &Envelope{Nonce: 1}
require.NoError(t, w.RequestHistoricMessagesWithTimeout(p.ID().Bytes(), e, time.Millisecond))
verifyEvent := func(etype EventType) {
select {
case <-time.After(time.Second):
require.FailNow(t, "error waiting for a event type %s", etype)
case ev := <-events:
require.Equal(t, etype, ev.Event)
require.Equal(t, p.ID(), ev.Peer)
require.Equal(t, e.Hash(), ev.Hash)
}
}
verifyEvent(EventMailServerRequestSent)
verifyEvent(EventMailServerRequestExpired)
}