Apply mail server cursor patch

This commit is contained in:
Dmitry 2018-09-25 09:38:44 +03:00
parent 07151c8376
commit 3dff91373a
2 changed files with 61 additions and 4 deletions

View File

@ -13,10 +13,14 @@ const (
EventEnvelopeSent EventType = "envelope.sent" EventEnvelopeSent EventType = "envelope.sent"
// EventEnvelopeExpired fires when envelop expired // EventEnvelopeExpired fires when envelop expired
EventEnvelopeExpired EventType = "envelope.expired" EventEnvelopeExpired EventType = "envelope.expired"
// EventEnvelopeAvailable fires when envelop is available for filters
EventEnvelopeAvailable EventType = "envelope.available"
// EventMailServerRequestCompleted fires after mailserver sends all the requested messages // EventMailServerRequestCompleted fires after mailserver sends all the requested messages
EventMailServerRequestCompleted EventType = "mailserver.request.completed" EventMailServerRequestCompleted EventType = "mailserver.request.completed"
// EventMailServerRequestExpired fires after mailserver the request TTL ends // EventMailServerRequestExpired fires after mailserver the request TTL ends
EventMailServerRequestExpired EventType = "mailserver.request.expired" EventMailServerRequestExpired EventType = "mailserver.request.expired"
// EventMailServerEnvelopeArchived fires after an envelope has been archived
EventMailServerEnvelopeArchived EventType = "mailserver.envelope.archived"
) )
// EnvelopeEvent used for envelopes events. // EnvelopeEvent used for envelopes events.
@ -24,4 +28,5 @@ type EnvelopeEvent struct {
Event EventType Event EventType
Hash common.Hash Hash common.Hash
Peer discover.NodeID Peer discover.NodeID
Data interface{}
} }

View File

@ -60,6 +60,12 @@ const (
restrictConnectionBetweenLightClientsIdx // Restrict connection between two light clients restrictConnectionBetweenLightClientsIdx // Restrict connection between two light clients
) )
// MailServerResponse is the response payload sent by the mailserver
type MailServerResponse struct {
LastEnvelopeHash common.Hash
Cursor []byte
}
// Whisper represents a dark communication interface through the Ethereum // Whisper represents a dark communication interface through the Ethereum
// network, using its very own P2P communication layer. // network, using its very own P2P communication layer.
type Whisper struct { type Whisper struct {
@ -402,8 +408,8 @@ func (whisper *Whisper) RequestHistoricMessages(peerID []byte, envelope *Envelop
return p2p.Send(p.ws, p2pRequestCode, envelope) return p2p.Send(p.ws, p2pRequestCode, envelope)
} }
func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, requestID common.Hash) error { func (whisper *Whisper) SendHistoricMessageResponse(peer *Peer, payload []byte) error {
size, r, err := rlp.EncodeToReader(requestID) size, r, err := rlp.EncodeToReader(payload)
if err != nil { if err != nil {
return err return err
} }
@ -857,15 +863,49 @@ func (whisper *Whisper) runMessageLoop(p *Peer, rw p2p.MsgReadWriter) error {
} }
case p2pRequestCompleteCode: case p2pRequestCompleteCode:
if p.trusted { if p.trusted {
var requestID common.Hash var payload []byte
if err := packet.Decode(&requestID); err != nil { if err := packet.Decode(&payload); err != nil {
log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err) log.Warn("failed to decode response message, peer will be disconnected", "peer", p.peer.ID(), "err", err)
return errors.New("invalid request response message") return errors.New("invalid request response message")
} }
// check if payload is
// - requestID or
// - requestID + lastEnvelopeHash or
// - requestID + lastEnvelopeHash + cursor
// requestID is the hash of the request envelope.
// lastEnvelopeHash is the last envelope sent by the mail server
// cursor is the db key, 36 bytes: 4 for the timestamp + 32 for the envelope hash.
// length := len(payload)
if len(payload) < common.HashLength || len(payload) > common.HashLength*3+4 {
log.Warn("invalid response message, peer will be disconnected", "peer", p.peer.ID(), "err", err, "payload size", len(payload))
return errors.New("invalid response size")
}
var (
requestID common.Hash
lastEnvelopeHash common.Hash
cursor []byte
)
requestID = common.BytesToHash(payload[:common.HashLength])
if len(payload) >= common.HashLength*2 {
lastEnvelopeHash = common.BytesToHash(payload[common.HashLength : common.HashLength*2])
}
if len(payload) >= common.HashLength*2+36 {
cursor = payload[common.HashLength*2 : common.HashLength*2+36]
}
whisper.envelopeFeed.Send(EnvelopeEvent{ whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: requestID, Hash: requestID,
Event: EventMailServerRequestCompleted, Event: EventMailServerRequestCompleted,
Data: &MailServerResponse{
LastEnvelopeHash: lastEnvelopeHash,
Cursor: cursor,
},
}) })
} }
default: default:
@ -949,6 +989,10 @@ func (whisper *Whisper) add(envelope *Envelope, isP2P bool) (bool, error) {
whisper.postEvent(envelope, isP2P) // notify the local node about the new message whisper.postEvent(envelope, isP2P) // notify the local node about the new message
if whisper.mailServer != nil { if whisper.mailServer != nil {
whisper.mailServer.Archive(envelope) whisper.mailServer.Archive(envelope)
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: envelope.Hash(),
Event: EventMailServerEnvelopeArchived,
})
} }
} }
return true, nil return true, nil
@ -991,9 +1035,17 @@ func (whisper *Whisper) processQueue() {
case e = <-whisper.messageQueue: case e = <-whisper.messageQueue:
whisper.filters.NotifyWatchers(e, false) whisper.filters.NotifyWatchers(e, false)
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: e.Hash(),
Event: EventEnvelopeAvailable,
})
case e = <-whisper.p2pMsgQueue: case e = <-whisper.p2pMsgQueue:
whisper.filters.NotifyWatchers(e, true) whisper.filters.NotifyWatchers(e, true)
whisper.envelopeFeed.Send(EnvelopeEvent{
Hash: e.Hash(),
Event: EventEnvelopeAvailable,
})
} }
} }
} }