diff --git a/whisperv6/events.go b/whisperv6/events.go index bfbce00..2faabe1 100644 --- a/whisperv6/events.go +++ b/whisperv6/events.go @@ -17,9 +17,13 @@ const ( EventBatchAcknowledged EventType = "batch.acknowleged" // EventEnvelopeAvailable fires when envelop is available for filters EventEnvelopeAvailable EventType = "envelope.available" + // EventMailServerRequestSent fires when such request is sent. + EventMailServerRequestSent EventType = "mailserver.request.sent" // EventMailServerRequestCompleted fires after mailserver sends all the requested messages EventMailServerRequestCompleted EventType = "mailserver.request.completed" - // EventMailServerRequestExpired fires after mailserver the request TTL ends + // EventMailServerRequestExpired fires after mailserver the request TTL ends. + // This event is independent and concurrent to EventMailServerRequestCompleted. + // Request should be considered as expired only if expiry event was received first. EventMailServerRequestExpired EventType = "mailserver.request.expired" // EventMailServerEnvelopeArchived fires after an envelope has been archived EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived" diff --git a/whisperv6/whisper.go b/whisperv6/whisper.go index cbe44c4..c8bed75 100644 --- a/whisperv6/whisper.go +++ b/whisperv6/whisper.go @@ -407,12 +407,40 @@ func (whisper *Whisper) AllowP2PMessagesFromPeer(peerID []byte) error { // which are not supposed to be forwarded any further. // The whisper protocol is agnostic of the format and contents of envelope. func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelope) error { + return whisper.RequestHistoricMessagesWithTimeout(peerID, envelope, 0) +} + +func (whisper *Whisper) RequestHistoricMessagesWithTimeout(peerID []byte, envelope *Envelope, timeout time.Duration) error { p, err := whisper.getPeer(peerID) if err != nil { return err } + whisper.envelopeFeed.Send(EnvelopeEvent{ + Peer: p.peer.ID(), + Hash: envelope.Hash(), + Event: EventMailServerRequestSent, + }) p.trusted = true - return p2p.Send(p.ws, p2pRequestCode, envelope) + err = p2p.Send(p.ws, p2pRequestCode, envelope) + if timeout != 0 { + go whisper.expireRequestHistoricMessages(p.peer.ID(), envelope.Hash(), timeout) + } + return err +} + +func (whisper *Whisper) expireRequestHistoricMessages(peer enode.ID, hash common.Hash, timeout time.Duration) { + timer := time.NewTimer(timeout) + defer timer.Stop() + select { + case <-whisper.quit: + return + case <-timer.C: + whisper.envelopeFeed.Send(EnvelopeEvent{ + Peer: peer, + Hash: hash, + Event: EventMailServerRequestExpired, + }) + } } func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte) error { diff --git a/whisperv6/whisper_test.go b/whisperv6/whisper_test.go index 52c2ccb..fe3a5f0 100644 --- a/whisperv6/whisper_test.go +++ b/whisperv6/whisper_test.go @@ -1202,3 +1202,42 @@ func TestEventsWithoutConfirmation(t *testing.T) { require.FailNow(t, "timed out waiting for an envelope.sent event") } } + +func discardPipe() *p2p.MsgPipeRW { + rw1, rw2 := p2p.MsgPipe() + go func() { + for { + msg, err := rw1.ReadMsg() + if err != nil { + return + } + msg.Discard() + } + }() + return rw2 +} + +func TestRequestSentEventWithExpiry(t *testing.T) { + w := New(nil) + p := p2p.NewPeer(enode.ID{1}, "1", []p2p.Cap{{"shh", 6}}) + rw := discardPipe() + defer rw.Close() + w.peers[newPeer(w, p, rw)] = struct{}{} + events := make(chan EnvelopeEvent, 1) + sub := w.SubscribeEnvelopeEvents(events) + defer sub.Unsubscribe() + e := &Envelope{Nonce: 1} + require.NoError(t, w.RequestHistoricMessagesWithTimeout(p.ID().Bytes(), e, time.Millisecond)) + verifyEvent := func(etype EventType) { + select { + case <-time.After(time.Second): + require.FailNow(t, "error waiting for a event type %s", etype) + case ev := <-events: + require.Equal(t, etype, ev.Event) + require.Equal(t, p.ID(), ev.Peer) + require.Equal(t, e.Hash(), ev.Hash) + } + } + verifyEvent(EventMailServerRequestSent) + verifyEvent(EventMailServerRequestExpired) +}