135 lines
4.8 KiB
Diff
135 lines
4.8 KiB
Diff
diff --git a/whisper/whisperv6/events.go b/whisper/whisperv6/events.go
|
|
index 1665539..fe7570e 100644
|
|
--- a/whisper/whisperv6/events.go
|
|
+++ b/whisper/whisperv6/events.go
|
|
@@ -13,10 +13,14 @@ const (
|
|
EventEnvelopeSent EventType = "envelope.sent"
|
|
// EventEnvelopeExpired fires when envelop expired
|
|
EventEnvelopeExpired EventType = "envelope.expired"
|
|
+ // EventEnvelopeAvailable fires when envelop is available for filters
|
|
+ EventEnvelopeAvailable EventType = "envelope.available"
|
|
// EventMailServerRequestCompleted fires after mailserver sends all the requested messages
|
|
EventMailServerRequestCompleted EventType = "mailserver.request.completed"
|
|
// EventMailServerRequestExpired fires after mailserver the request TTL ends
|
|
EventMailServerRequestExpired EventType = "mailserver.request.expired"
|
|
+ // EventMailServerEnvelopeArchived fires after an envelope has been archived
|
|
+ EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived"
|
|
)
|
|
|
|
// EnvelopeEvent used for envelopes events.
|
|
@@ -24,4 +28,5 @@ type EnvelopeEvent struct {
|
|
Event EventType
|
|
Hash common.Hash
|
|
Peer discover.NodeID
|
|
+ Data interface{}
|
|
}
|
|
diff --git a/whisper/whisperv6/whisper.go b/whisper/whisperv6/whisper.go
|
|
index 91d4482..6a937a2 100644
|
|
--- a/whisper/whisperv6/whisper.go
|
|
+++ b/whisper/whisperv6/whisper.go
|
|
@@ -49,6 +49,12 @@ type Statistics struct {
|
|
totalMessagesCleared int
|
|
}
|
|
|
|
+// MailServerResponse is the response payload sent by the mailserver
|
|
+type MailServerResponse struct {
|
|
+ LastEnvelopeHash common.Hash
|
|
+ Cursor []byte
|
|
+}
|
|
+
|
|
const (
|
|
maxMsgSizeIdx = iota // Maximal message length allowed by the whisper node
|
|
overflowIdx // Indicator of message queue overflow
|
|
@@ -378,8 +384,8 @@ func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelop
|
|
return p2p.Send(p.ws, p2pRequestCode, envelope)
|
|
}
|
|
|
|
-func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, requestID common.Hash) error {
|
|
- size, r, err := rlp.EncodeToReader(requestID)
|
|
+func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte) error {
|
|
+ size, r, err := rlp.EncodeToReader(payload)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
@@ -835,15 +841,49 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
|
|
}
|
|
case p2pRequestCompleteCode:
|
|
if p.trusted {
|
|
- var requestID common.Hash
|
|
- if err := packet.Decode(&requestID); err != nil {
|
|
+ var payload []byte
|
|
+ if err := packet.Decode(&payload); err != nil {
|
|
log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
|
|
return errors.New("invalid request response message")
|
|
}
|
|
|
|
+ // check if payload is
|
|
+ // - requestID or
|
|
+ // - requestID + lastEnvelopeHash or
|
|
+ // - requestID + lastEnvelopeHash + cursor
|
|
+ // requestID is the hash of the request envelope.
|
|
+ // lastEnvelopeHash is the last envelope sent by the mail server
|
|
+ // cursor is the db key, 36 bytes: 4 for the timestamp + 32 for the envelope hash.
|
|
+ // length := len(payload)
|
|
+
|
|
+ if len(payload) < common.HashLength || len(payload) > common.HashLength*3+4 {
|
|
+ log.Warn("invalid response message, peer will be disconnected", "peer", p.peer.ID(), "err", err, "payload size", len(payload))
|
|
+ return errors.New("invalid response size")
|
|
+ }
|
|
+
|
|
+ var (
|
|
+ requestID common.Hash
|
|
+ lastEnvelopeHash common.Hash
|
|
+ cursor []byte
|
|
+ )
|
|
+
|
|
+ requestID = common.BytesToHash(payload[:common.HashLength])
|
|
+
|
|
+ if len(payload) >= common.HashLength*2 {
|
|
+ lastEnvelopeHash = common.BytesToHash(payload[common.HashLength : common.HashLength*2])
|
|
+ }
|
|
+
|
|
+ if len(payload) >= common.HashLength*2+36 {
|
|
+ cursor = payload[common.HashLength*2 : common.HashLength*2+36]
|
|
+ }
|
|
+
|
|
whisper.envelopeFeed.Send(EnvelopeEvent{
|
|
Hash: requestID,
|
|
Event: EventMailServerRequestCompleted,
|
|
+ Data: &MailServerResponse{
|
|
+ LastEnvelopeHash: lastEnvelopeHash,
|
|
+ Cursor: cursor,
|
|
+ },
|
|
})
|
|
}
|
|
default:
|
|
@@ -927,6 +967,10 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
|
|
whisper.postEvent(envelope, isP2P) // notify the local node about the new message
|
|
if whisper.mailServer != nil {
|
|
whisper.mailServer.Archive(envelope)
|
|
+ whisper.envelopeFeed.Send(EnvelopeEvent{
|
|
+ Hash: envelope.Hash(),
|
|
+ Event: EventMailServerEnvelopeArchived,
|
|
+ })
|
|
}
|
|
}
|
|
return true, nil
|
|
@@ -985,9 +1029,17 @@ func (whisper *Whisper) processQueue() {
|
|
|
|
case e = <-whisper.messageQueue:
|
|
whisper.filters.NotifyWatchers(e, false)
|
|
+ whisper.envelopeFeed.Send(EnvelopeEvent{
|
|
+ Hash: e.Hash(),
|
|
+ Event: EventEnvelopeAvailable,
|
|
+ })
|
|
|
|
case e = <-whisper.p2pMsgQueue:
|
|
whisper.filters.NotifyWatchers(e, true)
|
|
+ whisper.envelopeFeed.Send(EnvelopeEvent{
|
|
+ Hash: e.Hash(),
|
|
+ Event: EventEnvelopeAvailable,
|
|
+ })
|
|
}
|
|
}
|
|
}
|