diff --git a/whisper/whisperv6/events.go b/whisper/whisperv6/events.go index 1665539..fe7570e 100644 --- a/whisper/whisperv6/events.go +++ b/whisper/whisperv6/events.go @@ -13,10 +13,14 @@ const ( EventEnvelopeSent EventType = "envelope.sent" // EventEnvelopeExpired fires when envelop expired EventEnvelopeExpired EventType = "envelope.expired" + // EventEnvelopeAvailable fires when envelop is available for filters + EventEnvelopeAvailable EventType = "envelope.available" // EventMailServerRequestCompleted fires after mailserver sends all the requested messages EventMailServerRequestCompleted EventType = "mailserver.request.completed" // EventMailServerRequestExpired fires after mailserver the request TTL ends EventMailServerRequestExpired EventType = "mailserver.request.expired" + // EventMailServerEnvelopeArchived fires after an envelope has been archived + EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived" ) // EnvelopeEvent used for envelopes events. @@ -24,4 +28,5 @@ type EnvelopeEvent struct { Event EventType Hash common.Hash Peer discover.NodeID + Data interface{} } diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go index 91d4482..6a937a2 100644 --- a/whisper/whisperv6/whisper.go +++ b/whisper/whisperv6/whisper.go @@ -49,6 +49,12 @@ type Statistics struct { totalMessagesCleared int } +// MailServerResponse is the response payload sent by the mailserver +type MailServerResponse struct { + LastEnvelopeHash common.Hash + Cursor []byte +} + const ( maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node overflowIdx // Indicator of message queue overflow @@ -378,8 +384,8 @@ func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelop return p2p.Send(p.ws, p2pRequestCode, envelope) } -func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, requestID common.Hash) error { - size, r, err := rlp.EncodeToReader(requestID) +func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte) error { + size, r, err := rlp.EncodeToReader(payload) if err != nil { return err } @@ -835,15 +841,49 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error { } case p2pRequestCompleteCode: if p.trusted { - var requestID common.Hash - if err := packet.Decode(&requestID); err != nil { + var payload []byte + if err := packet.Decode(&payload); err != nil { log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err) return errors.New("invalid request response message") } + // check if payload is + // - requestID or + // - requestID + lastEnvelopeHash or + // - requestID + lastEnvelopeHash + cursor + // requestID is the hash of the request envelope. + // lastEnvelopeHash is the last envelope sent by the mail server + // cursor is the db key, 36 bytes: 4 for the timestamp + 32 for the envelope hash. + // length := len(payload) + + if len(payload) < common.HashLength || len(payload) > common.HashLength*3+4 { + log.Warn("invalid response message, peer will be disconnected", "peer", p.peer.ID(), "err", err, "payload size", len(payload)) + return errors.New("invalid response size") + } + + var ( + requestID common.Hash + lastEnvelopeHash common.Hash + cursor []byte + ) + + requestID = common.BytesToHash(payload[:common.HashLength]) + + if len(payload) >= common.HashLength*2 { + lastEnvelopeHash = common.BytesToHash(payload[common.HashLength : common.HashLength*2]) + } + + if len(payload) >= common.HashLength*2+36 { + cursor = payload[common.HashLength*2 : common.HashLength*2+36] + } + whisper.envelopeFeed.Send(EnvelopeEvent{ Hash: requestID, Event: EventMailServerRequestCompleted, + Data: &MailServerResponse{ + LastEnvelopeHash: lastEnvelopeHash, + Cursor: cursor, + }, }) } default: @@ -927,6 +967,10 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) { whisper.postEvent(envelope, isP2P) // notify the local node about the new message if whisper.mailServer != nil { whisper.mailServer.Archive(envelope) + whisper.envelopeFeed.Send(EnvelopeEvent{ + Hash: envelope.Hash(), + Event: EventMailServerEnvelopeArchived, + }) } } return true, nil @@ -985,9 +1029,17 @@ func (whisper *Whisper) processQueue() { case e = <-whisper.messageQueue: whisper.filters.NotifyWatchers(e, false) + whisper.envelopeFeed.Send(EnvelopeEvent{ + Hash: e.Hash(), + Event: EventEnvelopeAvailable, + }) case e = <-whisper.p2pMsgQueue: whisper.filters.NotifyWatchers(e, true) + whisper.envelopeFeed.Send(EnvelopeEvent{ + Hash: e.Hash(), + Event: EventEnvelopeAvailable, + }) } } }