2019-08-05 08:10:13 +00:00
package whisper
import (
"context"
"errors"
"sync"
2019-10-09 12:20:51 +00:00
whispertypes "github.com/status-im/status-protocol-go/transport/whisper/types"
statusproto "github.com/status-im/status-protocol-go/types"
2019-08-05 08:10:13 +00:00
"go.uber.org/zap"
)
// EnvelopeState describes how far an envelope has progressed according to
// the local tracker.
type EnvelopeState int

// NotRegistered returned if asked hash wasn't registered in the tracker.
const NotRegistered EnvelopeState = -1

// States of a tracked envelope. Numbering starts at 1 (not 0), so the zero
// value of EnvelopeState matches none of the named states.
const (
	// EnvelopePosted is set when envelope was added to a local whisper queue.
	EnvelopePosted EnvelopeState = iota + 1
	// EnvelopeSent is set when envelope is sent to at least one peer.
	EnvelopeSent
)
type EnvelopesMonitorConfig struct {
EnvelopeEventsHandler EnvelopeEventsHandler
MaxAttempts int
MailserverConfirmationsEnabled bool
2019-10-09 12:20:51 +00:00
IsMailserver func ( whispertypes . EnodeID ) bool
2019-08-05 08:10:13 +00:00
Logger * zap . Logger
}
// EnvelopeEventsHandler used for two different event types.
type EnvelopeEventsHandler interface {
EnvelopeSent ( [ ] [ ] byte )
EnvelopeExpired ( [ ] [ ] byte , error )
2019-10-09 12:20:51 +00:00
MailServerRequestCompleted ( statusproto . Hash , statusproto . Hash , [ ] byte , error )
MailServerRequestExpired ( statusproto . Hash )
2019-08-05 08:10:13 +00:00
}
// NewEnvelopesMonitor returns a pointer to an instance of the EnvelopesMonitor.
2019-10-09 12:20:51 +00:00
func NewEnvelopesMonitor ( w whispertypes . Whisper , config EnvelopesMonitorConfig ) * EnvelopesMonitor {
2019-08-05 08:10:13 +00:00
logger := config . Logger
if logger == nil {
logger = zap . NewNop ( )
}
2019-10-09 12:20:51 +00:00
var whisperAPI whispertypes . PublicWhisperAPI
if w != nil {
whisperAPI = w . PublicWhisperAPI ( )
}
2019-08-05 08:10:13 +00:00
return & EnvelopesMonitor {
w : w ,
2019-10-09 12:20:51 +00:00
whisperAPI : whisperAPI ,
2019-08-05 08:10:13 +00:00
handler : config . EnvelopeEventsHandler ,
mailServerConfirmation : config . MailserverConfirmationsEnabled ,
maxAttempts : config . MaxAttempts ,
isMailserver : config . IsMailserver ,
logger : logger . With ( zap . Namespace ( "EnvelopesMonitor" ) ) ,
// key is envelope hash (event.Hash)
2019-10-09 12:20:51 +00:00
envelopes : map [ statusproto . Hash ] EnvelopeState { } ,
messages : map [ statusproto . Hash ] * whispertypes . NewMessage { } ,
attempts : map [ statusproto . Hash ] int { } ,
identifiers : make ( map [ statusproto . Hash ] [ ] [ ] byte ) ,
2019-08-05 08:10:13 +00:00
// key is hash of the batch (event.Batch)
2019-10-09 12:20:51 +00:00
batches : map [ statusproto . Hash ] map [ statusproto . Hash ] struct { } { } ,
2019-08-05 08:10:13 +00:00
}
}
// EnvelopesMonitor is responsible for monitoring whisper envelopes state.
type EnvelopesMonitor struct {
2019-10-09 12:20:51 +00:00
w whispertypes . Whisper
whisperAPI whispertypes . PublicWhisperAPI
2019-08-05 08:10:13 +00:00
handler EnvelopeEventsHandler
mailServerConfirmation bool
maxAttempts int
mu sync . Mutex
2019-10-09 12:20:51 +00:00
envelopes map [ statusproto . Hash ] EnvelopeState
batches map [ statusproto . Hash ] map [ statusproto . Hash ] struct { }
2019-08-05 08:10:13 +00:00
2019-10-09 12:20:51 +00:00
messages map [ statusproto . Hash ] * whispertypes . NewMessage
attempts map [ statusproto . Hash ] int
identifiers map [ statusproto . Hash ] [ ] [ ] byte
2019-08-05 08:10:13 +00:00
wg sync . WaitGroup
quit chan struct { }
2019-10-09 12:20:51 +00:00
isMailserver func ( peer whispertypes . EnodeID ) bool
2019-08-05 08:10:13 +00:00
logger * zap . Logger
}
// Start creates the quit channel and launches the goroutine that processes
// envelope events until Stop is called.
func (m *EnvelopesMonitor) Start() {
	m.quit = make(chan struct{})

	m.wg.Add(1)
	go func() {
		defer m.wg.Done()
		m.handleEnvelopeEvents()
	}()
}
// Stop signals the event-processing goroutine to exit and waits for it to
// finish.
//
// Calling Stop on a monitor that was never started, or calling it again
// after a previous Stop, is a no-op instead of a panic on closing a nil or
// already closed channel. Stop is not safe to call concurrently with itself
// or with Start.
func (m *EnvelopesMonitor) Stop() {
	if m.quit == nil {
		return
	}
	close(m.quit)
	m.wg.Wait()
	// The goroutine has exited, so nothing reads quit any more; clearing it
	// makes a repeated Stop harmless and lets Start create a fresh channel.
	m.quit = nil
}
// Add hash to a tracker.
2019-10-09 12:20:51 +00:00
func ( m * EnvelopesMonitor ) Add ( identifiers [ ] [ ] byte , envelopeHash statusproto . Hash , message whispertypes . NewMessage ) {
2019-08-05 08:10:13 +00:00
m . mu . Lock ( )
defer m . mu . Unlock ( )
m . envelopes [ envelopeHash ] = EnvelopePosted
m . identifiers [ envelopeHash ] = identifiers
2019-10-09 12:20:51 +00:00
m . messages [ envelopeHash ] = & message
2019-08-05 08:10:13 +00:00
m . attempts [ envelopeHash ] = 1
}
2019-10-09 12:20:51 +00:00
func ( m * EnvelopesMonitor ) GetState ( hash statusproto . Hash ) EnvelopeState {
2019-08-05 08:10:13 +00:00
m . mu . Lock ( )
defer m . mu . Unlock ( )
state , exist := m . envelopes [ hash ]
if ! exist {
return NotRegistered
}
return state
}
// handleEnvelopeEvents processes whisper envelope events
func ( m * EnvelopesMonitor ) handleEnvelopeEvents ( ) {
2019-10-09 12:20:51 +00:00
events := make ( chan whispertypes . EnvelopeEvent , 100 ) // must be buffered to prevent blocking whisper
2019-08-05 08:10:13 +00:00
sub := m . w . SubscribeEnvelopeEvents ( events )
2019-10-09 12:20:51 +00:00
defer func ( ) {
close ( events )
sub . Unsubscribe ( )
} ( )
2019-08-05 08:10:13 +00:00
for {
select {
case <- m . quit :
return
case event := <- events :
m . handleEvent ( event )
}
}
}
// handleEvent based on type of the event either triggers
// confirmation handler or removes hash from tracker
2019-10-09 12:20:51 +00:00
func ( m * EnvelopesMonitor ) handleEvent ( event whispertypes . EnvelopeEvent ) {
handlers := map [ whispertypes . EventType ] func ( whispertypes . EnvelopeEvent ) {
whispertypes . EventEnvelopeSent : m . handleEventEnvelopeSent ,
whispertypes . EventEnvelopeExpired : m . handleEventEnvelopeExpired ,
whispertypes . EventBatchAcknowledged : m . handleAcknowledgedBatch ,
whispertypes . EventEnvelopeReceived : m . handleEventEnvelopeReceived ,
2019-08-05 08:10:13 +00:00
}
if handler , ok := handlers [ event . Event ] ; ok {
handler ( event )
}
}
2019-10-09 12:20:51 +00:00
func ( m * EnvelopesMonitor ) handleEventEnvelopeSent ( event whispertypes . EnvelopeEvent ) {
2019-08-05 08:10:13 +00:00
if m . mailServerConfirmation {
if ! m . isMailserver ( event . Peer ) {
return
}
}
m . mu . Lock ( )
defer m . mu . Unlock ( )
state , ok := m . envelopes [ event . Hash ]
// if we didn't send a message using extension - skip it
// if message was already confirmed - skip it
if ! ok || state == EnvelopeSent {
return
}
m . logger . Debug ( "envelope is sent" , zap . String ( "hash" , event . Hash . String ( ) ) , zap . String ( "peer" , event . Peer . String ( ) ) )
2019-10-09 12:20:51 +00:00
if event . Batch != ( statusproto . Hash { } ) {
2019-08-05 08:10:13 +00:00
if _ , ok := m . batches [ event . Batch ] ; ! ok {
2019-10-09 12:20:51 +00:00
m . batches [ event . Batch ] = map [ statusproto . Hash ] struct { } { }
2019-08-05 08:10:13 +00:00
}
m . batches [ event . Batch ] [ event . Hash ] = struct { } { }
m . logger . Debug ( "waiting for a confirmation" , zap . String ( "batch" , event . Batch . String ( ) ) )
} else {
m . envelopes [ event . Hash ] = EnvelopeSent
if m . handler != nil {
m . handler . EnvelopeSent ( m . identifiers [ event . Hash ] )
}
}
}
2019-10-09 12:20:51 +00:00
func ( m * EnvelopesMonitor ) handleAcknowledgedBatch ( event whispertypes . EnvelopeEvent ) {
2019-08-05 08:10:13 +00:00
if m . mailServerConfirmation {
if ! m . isMailserver ( event . Peer ) {
return
}
}
m . mu . Lock ( )
defer m . mu . Unlock ( )
envelopes , ok := m . batches [ event . Batch ]
if ! ok {
m . logger . Debug ( "batch is not found" , zap . String ( "batch" , event . Batch . String ( ) ) )
}
m . logger . Debug ( "received a confirmation" , zap . String ( "batch" , event . Batch . String ( ) ) , zap . String ( "peer" , event . Peer . String ( ) ) )
2019-10-09 12:20:51 +00:00
envelopeErrors , ok := event . Data . ( [ ] whispertypes . EnvelopeError )
2019-08-05 08:10:13 +00:00
if event . Data != nil && ! ok {
m . logger . Error ( "received unexpected data in the the confirmation event" , zap . String ( "batch" , event . Batch . String ( ) ) )
}
2019-10-09 12:20:51 +00:00
failedEnvelopes := map [ statusproto . Hash ] struct { } { }
2019-08-05 08:10:13 +00:00
for i := range envelopeErrors {
envelopeError := envelopeErrors [ i ]
_ , exist := m . envelopes [ envelopeError . Hash ]
if exist {
m . logger . Warn ( "envelope that was posted by us is discarded" , zap . String ( "hash" , envelopeError . Hash . String ( ) ) , zap . String ( "peer" , event . Peer . String ( ) ) , zap . String ( "error" , envelopeError . Description ) )
var err error
switch envelopeError . Code {
2019-10-09 12:20:51 +00:00
case whispertypes . EnvelopeTimeNotSynced :
2019-08-05 08:10:13 +00:00
err = errors . New ( "envelope wasn't delivered due to time sync issues" )
}
m . handleEnvelopeFailure ( envelopeError . Hash , err )
}
failedEnvelopes [ envelopeError . Hash ] = struct { } { }
}
for hash := range envelopes {
if _ , exist := failedEnvelopes [ hash ] ; exist {
continue
}
state , ok := m . envelopes [ hash ]
if ! ok || state == EnvelopeSent {
continue
}
m . envelopes [ hash ] = EnvelopeSent
if m . handler != nil {
m . handler . EnvelopeSent ( m . identifiers [ hash ] )
}
}
delete ( m . batches , event . Batch )
}
2019-10-09 12:20:51 +00:00
func ( m * EnvelopesMonitor ) handleEventEnvelopeExpired ( event whispertypes . EnvelopeEvent ) {
2019-08-05 08:10:13 +00:00
m . mu . Lock ( )
defer m . mu . Unlock ( )
m . handleEnvelopeFailure ( event . Hash , errors . New ( "envelope expired due to connectivity issues" ) )
}
// handleEnvelopeFailure is a common code path for processing envelopes failures. not thread safe, lock
// must be used on a higher level.
2019-10-09 12:20:51 +00:00
func ( m * EnvelopesMonitor ) handleEnvelopeFailure ( hash statusproto . Hash , err error ) {
2019-08-05 08:10:13 +00:00
if state , ok := m . envelopes [ hash ] ; ok {
message , exist := m . messages [ hash ]
if ! exist {
m . logger . Error ( "message was deleted erroneously" , zap . String ( "envelope hash" , hash . String ( ) ) )
}
attempt := m . attempts [ hash ]
identifiers := m . identifiers [ hash ]
m . clearMessageState ( hash )
if state == EnvelopeSent {
return
}
if attempt < m . maxAttempts {
m . logger . Debug ( "retrying to send a message" , zap . String ( "hash" , hash . String ( ) ) , zap . Int ( "attempt" , attempt + 1 ) )
2019-10-09 12:20:51 +00:00
hex , err := m . whisperAPI . Post ( context . TODO ( ) , * message )
2019-08-05 08:10:13 +00:00
if err != nil {
m . logger . Error ( "failed to retry sending message" , zap . String ( "hash" , hash . String ( ) ) , zap . Int ( "attempt" , attempt + 1 ) , zap . Error ( err ) )
if m . handler != nil {
m . handler . EnvelopeExpired ( identifiers , err )
}
}
2019-10-09 12:20:51 +00:00
envelopeID := statusproto . BytesToHash ( hex )
2019-08-05 08:10:13 +00:00
m . envelopes [ envelopeID ] = EnvelopePosted
m . messages [ envelopeID ] = message
m . attempts [ envelopeID ] = attempt + 1
2019-07-31 07:50:08 +00:00
m . identifiers [ envelopeID ] = identifiers
2019-08-05 08:10:13 +00:00
} else {
m . logger . Debug ( "envelope expired" , zap . String ( "hash" , hash . String ( ) ) )
if m . handler != nil {
m . handler . EnvelopeExpired ( identifiers , err )
}
}
}
}
2019-10-09 12:20:51 +00:00
func ( m * EnvelopesMonitor ) handleEventEnvelopeReceived ( event whispertypes . EnvelopeEvent ) {
2019-08-05 08:10:13 +00:00
if m . mailServerConfirmation {
if ! m . isMailserver ( event . Peer ) {
return
}
}
m . mu . Lock ( )
defer m . mu . Unlock ( )
state , ok := m . envelopes [ event . Hash ]
if ! ok || state != EnvelopePosted {
return
}
m . logger . Debug ( "expected envelope received" , zap . String ( "hash" , event . Hash . String ( ) ) , zap . String ( "peer" , event . Peer . String ( ) ) )
m . envelopes [ event . Hash ] = EnvelopeSent
if m . handler != nil {
m . handler . EnvelopeSent ( m . identifiers [ event . Hash ] )
}
}
// clearMessageState removes all message and envelope state.
// not thread-safe, should be protected on a higher level.
2019-10-09 12:20:51 +00:00
func ( m * EnvelopesMonitor ) clearMessageState ( envelopeID statusproto . Hash ) {
2019-08-05 08:10:13 +00:00
delete ( m . envelopes , envelopeID )
delete ( m . messages , envelopeID )
delete ( m . attempts , envelopeID )
delete ( m . identifiers , envelopeID )
}