2020-01-13 19:17:30 +00:00
package transport
import (
2021-04-22 13:36:18 +00:00
"context"
"errors"
2024-05-31 13:17:44 +00:00
"math"
2021-04-22 13:36:18 +00:00
"sync"
2024-05-31 13:17:44 +00:00
"time"
2021-04-22 13:36:18 +00:00
2020-01-13 19:17:30 +00:00
"go.uber.org/zap"
2020-01-15 11:36:49 +00:00
"github.com/status-im/status-go/eth-node/types"
2020-01-13 19:17:30 +00:00
)
2021-04-22 13:36:18 +00:00
// EnvelopeState is the state of an envelope as seen by the local tracker.
type EnvelopeState int

// The numeric values are written out explicitly because they are not
// contiguous: the zero value of EnvelopeState does not correspond to any
// named state.
const (
	// NotRegistered is returned when the requested hash is not registered in the tracker.
	NotRegistered EnvelopeState = -1
	// EnvelopePosted is set once the envelope was added to a local waku queue.
	EnvelopePosted EnvelopeState = 2
	// EnvelopeSent is set once the envelope was sent to at least one peer.
	EnvelopeSent EnvelopeState = 3
)
2020-01-13 19:17:30 +00:00
type EnvelopesMonitorConfig struct {
2021-11-19 12:17:59 +00:00
EnvelopeEventsHandler EnvelopeEventsHandler
MaxAttempts int
AwaitOnlyMailServerConfirmations bool
IsMailserver func ( types . EnodeID ) bool
Logger * zap . Logger
2020-01-13 19:17:30 +00:00
}
// EnvelopeEventsHandler used for two different event types.
type EnvelopeEventsHandler interface {
	// EnvelopeSent is called with the identifiers of messages whose
	// envelopes have all reached the EnvelopeSent state.
	EnvelopeSent([][]byte)
	// EnvelopeExpired is called with the message identifiers attached to an
	// envelope that failed, together with the failure reason (which may be nil).
	EnvelopeExpired([][]byte, error)
	// MailServerRequestCompleted is not invoked anywhere in this file.
	// NOTE(review): argument meaning must be confirmed against the caller.
	MailServerRequestCompleted(types.Hash, types.Hash, []byte, error)
	// MailServerRequestExpired is not invoked anywhere in this file.
	// NOTE(review): argument meaning must be confirmed against the caller.
	MailServerRequestExpired(types.Hash)
}
2021-04-22 13:36:18 +00:00
// NewEnvelopesMonitor returns a pointer to an instance of the EnvelopesMonitor.
func NewEnvelopesMonitor ( w types . Waku , config EnvelopesMonitorConfig ) * EnvelopesMonitor {
logger := config . Logger
if logger == nil {
logger = zap . NewNop ( )
}
var api types . PublicWakuAPI
if w != nil {
api = w . PublicWakuAPI ( )
}
return & EnvelopesMonitor {
2021-11-19 12:17:59 +00:00
w : w ,
api : api ,
handler : config . EnvelopeEventsHandler ,
awaitOnlyMailServerConfirmations : config . AwaitOnlyMailServerConfirmations ,
maxAttempts : config . MaxAttempts ,
isMailserver : config . IsMailserver ,
logger : logger . With ( zap . Namespace ( "EnvelopesMonitor" ) ) ,
2021-04-22 13:36:18 +00:00
// key is envelope hash (event.Hash)
2023-11-22 18:28:40 +00:00
envelopes : map [ types . Hash ] * monitoredEnvelope { } ,
2021-04-22 13:36:18 +00:00
// key is hash of the batch (event.Batch)
batches : map [ types . Hash ] map [ types . Hash ] struct { } { } ,
2023-11-17 14:45:19 +00:00
// key is stringified message identifier
2024-05-31 13:17:44 +00:00
messageEnvelopeHashes : make ( map [ string ] [ ] types . Hash ) ,
2021-04-22 13:36:18 +00:00
}
}
2023-11-22 18:28:40 +00:00
type monitoredEnvelope struct {
2024-05-31 13:17:44 +00:00
envelopeHashID types . Hash
state EnvelopeState
attempts int
message * types . NewMessage
messageIDs [ ] [ ] byte
lastAttemptTime time . Time
2023-11-22 18:28:40 +00:00
}
2021-04-22 13:36:18 +00:00
// EnvelopesMonitor is responsible for monitoring waku envelopes state.
type EnvelopesMonitor struct {
2021-11-19 12:17:59 +00:00
w types . Waku
api types . PublicWakuAPI
handler EnvelopeEventsHandler
maxAttempts int
2021-04-22 13:36:18 +00:00
2023-11-22 18:28:40 +00:00
mu sync . Mutex
2021-04-22 13:36:18 +00:00
2024-05-31 13:17:44 +00:00
envelopes map [ types . Hash ] * monitoredEnvelope
retryQueue [ ] * monitoredEnvelope
batches map [ types . Hash ] map [ types . Hash ] struct { }
messageEnvelopeHashes map [ string ] [ ] types . Hash
2021-04-22 13:36:18 +00:00
2021-11-19 12:17:59 +00:00
awaitOnlyMailServerConfirmations bool
2021-04-22 13:36:18 +00:00
wg sync . WaitGroup
quit chan struct { }
isMailserver func ( peer types . EnodeID ) bool
logger * zap . Logger
}
// Start processing events.
func ( m * EnvelopesMonitor ) Start ( ) {
m . quit = make ( chan struct { } )
2024-05-31 13:17:44 +00:00
m . wg . Add ( 2 )
2021-04-22 13:36:18 +00:00
go func ( ) {
m . handleEnvelopeEvents ( )
m . wg . Done ( )
} ( )
2024-05-31 13:17:44 +00:00
go func ( ) {
defer m . wg . Done ( )
m . retryLoop ( )
} ( )
2021-04-22 13:36:18 +00:00
}
// Stop signals the background goroutines to exit by closing the quit channel
// and blocks until they have finished. The channel is created by Start, so
// Stop must only be called after Start.
func (m *EnvelopesMonitor) Stop() {
	close(m.quit)
	m.wg.Wait()
}
2023-11-17 14:45:19 +00:00
// Add hashes to a tracker.
// Identifiers may be backed by multiple envelopes. It happens when message is split in segmentation layer.
2024-05-31 13:17:44 +00:00
func ( m * EnvelopesMonitor ) Add ( messageIDs [ ] [ ] byte , envelopeHashes [ ] types . Hash , messages [ ] * types . NewMessage ) error {
2023-11-17 14:45:19 +00:00
if len ( envelopeHashes ) != len ( messages ) {
return errors . New ( "hashes don't match messages" )
}
2023-11-22 18:28:40 +00:00
2024-02-05 13:37:56 +00:00
m . mu . Lock ( )
defer m . mu . Unlock ( )
2024-05-31 13:17:44 +00:00
for _ , messageID := range messageIDs {
2024-06-11 07:45:01 +00:00
m . messageEnvelopeHashes [ types . HexBytes ( messageID ) . String ( ) ] = envelopeHashes
2023-11-17 14:45:19 +00:00
}
for i , envelopeHash := range envelopeHashes {
if _ , ok := m . envelopes [ envelopeHash ] ; ! ok {
m . envelopes [ envelopeHash ] = & monitoredEnvelope {
2024-05-31 13:17:44 +00:00
envelopeHashID : envelopeHash ,
state : EnvelopePosted ,
attempts : 1 ,
lastAttemptTime : time . Now ( ) ,
message : messages [ i ] ,
messageIDs : messageIDs ,
2023-11-17 14:45:19 +00:00
}
2023-06-13 15:08:22 +00:00
}
}
2023-11-17 14:45:19 +00:00
2024-05-31 13:17:44 +00:00
m . processMessageIDs ( messageIDs )
2023-11-17 14:45:19 +00:00
return nil
2021-04-22 13:36:18 +00:00
}
func ( m * EnvelopesMonitor ) GetState ( hash types . Hash ) EnvelopeState {
m . mu . Lock ( )
defer m . mu . Unlock ( )
2023-11-22 18:28:40 +00:00
envelope , exist := m . envelopes [ hash ]
2021-04-22 13:36:18 +00:00
if ! exist {
return NotRegistered
}
2023-11-22 18:28:40 +00:00
return envelope . state
2021-04-22 13:36:18 +00:00
}
// handleEnvelopeEvents subscribes to waku envelope events and dispatches each
// of them to handleEvent until the quit channel is closed.
func (m *EnvelopesMonitor) handleEnvelopeEvents() {
	// must be buffered to prevent blocking waku
	eventCh := make(chan types.EnvelopeEvent, 100)
	subscription := m.w.SubscribeEnvelopeEvents(eventCh)
	defer func() {
		subscription.Unsubscribe()
	}()

	for {
		select {
		case received := <-eventCh:
			m.handleEvent(received)
		case <-m.quit:
			return
		}
	}
}
// handleEvent routes the event to the handler that matches its type. Events
// of any other type are ignored.
func (m *EnvelopesMonitor) handleEvent(event types.EnvelopeEvent) {
	switch event.Event {
	case types.EventEnvelopeSent:
		m.handleEventEnvelopeSent(event)
	case types.EventEnvelopeExpired:
		m.handleEventEnvelopeExpired(event)
	case types.EventBatchAcknowledged:
		m.handleAcknowledgedBatch(event)
	case types.EventEnvelopeReceived:
		m.handleEventEnvelopeReceived(event)
	}
}
func ( m * EnvelopesMonitor ) handleEventEnvelopeSent ( event types . EnvelopeEvent ) {
2021-11-02 18:27:37 +00:00
// Mailserver confirmations for WakuV2 are disabled
2021-11-19 12:17:59 +00:00
if ( m . w == nil || m . w . Version ( ) < 2 ) && m . awaitOnlyMailServerConfirmations {
2021-04-22 13:36:18 +00:00
if ! m . isMailserver ( event . Peer ) {
return
}
}
m . mu . Lock ( )
defer m . mu . Unlock ( )
2023-06-13 15:08:22 +00:00
confirmationExpected := event . Batch != ( types . Hash { } )
2023-11-22 18:28:40 +00:00
envelope , ok := m . envelopes [ event . Hash ]
2021-11-02 18:27:37 +00:00
2023-06-13 15:08:22 +00:00
// If confirmations are not expected, we keep track of the envelope
// being sent
if ! ok && ! confirmationExpected {
2024-05-31 13:17:44 +00:00
m . envelopes [ event . Hash ] = & monitoredEnvelope { envelopeHashID : event . Hash , state : EnvelopeSent }
2023-06-13 15:08:22 +00:00
return
}
2021-04-22 13:36:18 +00:00
// if message was already confirmed - skip it
2023-11-22 18:28:40 +00:00
if envelope . state == EnvelopeSent {
2021-04-22 13:36:18 +00:00
return
}
m . logger . Debug ( "envelope is sent" , zap . String ( "hash" , event . Hash . String ( ) ) , zap . String ( "peer" , event . Peer . String ( ) ) )
2021-11-19 12:17:59 +00:00
if confirmationExpected {
2021-04-22 13:36:18 +00:00
if _ , ok := m . batches [ event . Batch ] ; ! ok {
m . batches [ event . Batch ] = map [ types . Hash ] struct { } { }
}
m . batches [ event . Batch ] [ event . Hash ] = struct { } { }
m . logger . Debug ( "waiting for a confirmation" , zap . String ( "batch" , event . Batch . String ( ) ) )
2021-11-19 12:21:43 +00:00
} else {
m . logger . Debug ( "confirmation not expected, marking as sent" )
2023-11-22 18:28:40 +00:00
envelope . state = EnvelopeSent
2024-05-31 13:17:44 +00:00
m . processMessageIDs ( envelope . messageIDs )
2021-04-22 13:36:18 +00:00
}
}
func ( m * EnvelopesMonitor ) handleAcknowledgedBatch ( event types . EnvelopeEvent ) {
2021-11-19 12:17:59 +00:00
if m . awaitOnlyMailServerConfirmations && ! m . isMailserver ( event . Peer ) {
return
2021-04-22 13:36:18 +00:00
}
m . mu . Lock ( )
defer m . mu . Unlock ( )
envelopes , ok := m . batches [ event . Batch ]
if ! ok {
m . logger . Debug ( "batch is not found" , zap . String ( "batch" , event . Batch . String ( ) ) )
}
m . logger . Debug ( "received a confirmation" , zap . String ( "batch" , event . Batch . String ( ) ) , zap . String ( "peer" , event . Peer . String ( ) ) )
envelopeErrors , ok := event . Data . ( [ ] types . EnvelopeError )
if event . Data != nil && ! ok {
m . logger . Error ( "received unexpected data in the the confirmation event" , zap . Any ( "data" , event . Data ) )
}
failedEnvelopes := map [ types . Hash ] struct { } { }
for i := range envelopeErrors {
envelopeError := envelopeErrors [ i ]
_ , exist := m . envelopes [ envelopeError . Hash ]
if exist {
m . logger . Warn ( "envelope that was posted by us is discarded" , zap . String ( "hash" , envelopeError . Hash . String ( ) ) , zap . String ( "peer" , event . Peer . String ( ) ) , zap . String ( "error" , envelopeError . Description ) )
var err error
switch envelopeError . Code {
case types . EnvelopeTimeNotSynced :
err = errors . New ( "envelope wasn't delivered due to time sync issues" )
}
m . handleEnvelopeFailure ( envelopeError . Hash , err )
}
failedEnvelopes [ envelopeError . Hash ] = struct { } { }
}
for hash := range envelopes {
if _ , exist := failedEnvelopes [ hash ] ; exist {
continue
}
2023-11-22 18:28:40 +00:00
envelope , ok := m . envelopes [ hash ]
if ! ok || envelope . state == EnvelopeSent {
2021-04-22 13:36:18 +00:00
continue
}
2023-11-22 18:28:40 +00:00
envelope . state = EnvelopeSent
2024-05-31 13:17:44 +00:00
m . processMessageIDs ( envelope . messageIDs )
2021-04-22 13:36:18 +00:00
}
delete ( m . batches , event . Batch )
}
// handleEventEnvelopeExpired treats an expiry event as a failure of the
// envelope identified by event.Hash.
func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event types.EnvelopeEvent) {
	m.mu.Lock()
	defer m.mu.Unlock()

	expiredErr := errors.New("envelope expired due to connectivity issues")
	m.handleEnvelopeFailure(event.Hash, expiredErr)
}
// handleEnvelopeFailure is a common code path for processing envelopes failures. not thread safe, lock
// must be used on a higher level.
func ( m * EnvelopesMonitor ) handleEnvelopeFailure ( hash types . Hash , err error ) {
2023-11-22 18:28:40 +00:00
if envelope , ok := m . envelopes [ hash ] ; ok {
2021-04-22 13:36:18 +00:00
m . clearMessageState ( hash )
2023-11-22 18:28:40 +00:00
if envelope . state == EnvelopeSent {
2021-04-22 13:36:18 +00:00
return
}
2023-11-22 18:28:40 +00:00
if envelope . attempts < m . maxAttempts {
2024-05-31 13:17:44 +00:00
m . retryQueue = append ( m . retryQueue , envelope )
} else {
m . logger . Debug ( "envelope expired" , zap . String ( "hash" , hash . String ( ) ) )
m . removeFromRetryQueue ( hash )
if m . handler != nil {
m . handler . EnvelopeExpired ( envelope . messageIDs , err )
}
}
}
}
// backoffDuration returns how long to wait before the next retry after the
// given number of attempts: one second doubled per attempt, capped at 30
// seconds.
func backoffDuration(attempts int) time.Duration {
	const (
		baseDelay = 1 * time.Second
		maxDelay  = 30 * time.Second
	)

	factor := math.Pow(2, float64(attempts))
	// Apply the cap in floating point, before converting to time.Duration.
	// For large attempt counts the product overflows int64 (and the
	// float-to-integer conversion itself goes out of range), which would
	// yield a negative or otherwise bogus delay that slips past the cap.
	if factor >= float64(maxDelay/baseDelay) {
		return maxDelay
	}
	return baseDelay * time.Duration(factor)
}
// retryLoop runs retryOnce on every tick of a 500 millisecond ticker and
// returns once the quit channel is closed.
func (m *EnvelopesMonitor) retryLoop() {
	const retryInterval = 500 * time.Millisecond

	ticker := time.NewTicker(retryInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			m.retryOnce()
		case <-m.quit:
			return
		}
	}
}
// retryOnce retries once
func ( m * EnvelopesMonitor ) retryOnce ( ) {
m . mu . Lock ( )
defer m . mu . Unlock ( )
for _ , envelope := range m . retryQueue {
if envelope . attempts < m . maxAttempts {
elapsed := time . Since ( envelope . lastAttemptTime )
if elapsed < backoffDuration ( envelope . attempts ) {
continue
}
m . logger . Debug ( "retrying to send a message" , zap . String ( "hash" , envelope . envelopeHashID . String ( ) ) , zap . Int ( "attempt" , envelope . attempts + 1 ) )
2023-11-22 18:28:40 +00:00
hex , err := m . api . Post ( context . TODO ( ) , * envelope . message )
2021-04-22 13:36:18 +00:00
if err != nil {
2024-05-31 13:17:44 +00:00
m . logger . Error ( "failed to retry sending message" , zap . String ( "hash" , envelope . envelopeHashID . String ( ) ) , zap . Int ( "attempt" , envelope . attempts + 1 ) , zap . Error ( err ) )
2021-04-22 13:36:18 +00:00
if m . handler != nil {
2024-05-31 13:17:44 +00:00
m . handler . EnvelopeExpired ( envelope . messageIDs , err )
2021-04-22 13:36:18 +00:00
}
2024-05-31 13:17:44 +00:00
} else {
m . removeFromRetryQueue ( envelope . envelopeHashID )
envelope . envelopeHashID = types . BytesToHash ( hex )
2021-04-22 13:36:18 +00:00
}
2024-05-31 13:17:44 +00:00
envelope . state = EnvelopePosted
envelope . attempts ++
envelope . lastAttemptTime = time . Now ( )
m . envelopes [ envelope . envelopeHashID ] = envelope
}
}
}
// removeFromRetryQueue removes the specified envelope from the retry queue
func ( m * EnvelopesMonitor ) removeFromRetryQueue ( envelopeID types . Hash ) {
var newRetryQueue [ ] * monitoredEnvelope
for _ , envelope := range m . retryQueue {
if envelope . envelopeHashID != envelopeID {
newRetryQueue = append ( newRetryQueue , envelope )
2021-04-22 13:36:18 +00:00
}
}
2024-05-31 13:17:44 +00:00
m . retryQueue = newRetryQueue
2021-04-22 13:36:18 +00:00
}
func ( m * EnvelopesMonitor ) handleEventEnvelopeReceived ( event types . EnvelopeEvent ) {
2021-11-19 12:17:59 +00:00
if m . awaitOnlyMailServerConfirmations && ! m . isMailserver ( event . Peer ) {
return
2021-04-22 13:36:18 +00:00
}
m . mu . Lock ( )
defer m . mu . Unlock ( )
2023-11-22 18:28:40 +00:00
envelope , ok := m . envelopes [ event . Hash ]
if ! ok || envelope . state != EnvelopePosted {
2021-04-22 13:36:18 +00:00
return
}
m . logger . Debug ( "expected envelope received" , zap . String ( "hash" , event . Hash . String ( ) ) , zap . String ( "peer" , event . Peer . String ( ) ) )
2023-11-22 18:28:40 +00:00
envelope . state = EnvelopeSent
2024-05-31 13:17:44 +00:00
m . processMessageIDs ( envelope . messageIDs )
2023-11-17 14:45:19 +00:00
}
2024-05-31 13:17:44 +00:00
func ( m * EnvelopesMonitor ) processMessageIDs ( messageIDs [ ] [ ] byte ) {
sentMessageIDs := make ( [ ] [ ] byte , 0 , len ( messageIDs ) )
2023-11-17 14:45:19 +00:00
2024-05-31 13:17:44 +00:00
for _ , messageID := range messageIDs {
2024-06-11 07:45:01 +00:00
hashes , ok := m . messageEnvelopeHashes [ types . HexBytes ( messageID ) . String ( ) ]
2023-11-17 14:45:19 +00:00
if ! ok {
continue
}
sent := true
2024-05-31 13:17:44 +00:00
// Consider message as sent if all corresponding envelopes are in EnvelopeSent state
2023-11-17 14:45:19 +00:00
for _ , hash := range hashes {
envelope , ok := m . envelopes [ hash ]
if ! ok || envelope . state != EnvelopeSent {
sent = false
break
}
}
if sent {
2024-05-31 13:17:44 +00:00
sentMessageIDs = append ( sentMessageIDs , messageID )
2023-11-17 14:45:19 +00:00
}
}
2024-05-31 13:17:44 +00:00
if len ( sentMessageIDs ) > 0 && m . handler != nil {
m . handler . EnvelopeSent ( sentMessageIDs )
2021-04-22 13:36:18 +00:00
}
}
// clearMessageState removes all message and envelope state.
// not thread-safe, should be protected on a higher level.
func ( m * EnvelopesMonitor ) clearMessageState ( envelopeID types . Hash ) {
2023-11-17 14:45:19 +00:00
envelope , ok := m . envelopes [ envelopeID ]
if ! ok {
return
}
2021-04-22 13:36:18 +00:00
delete ( m . envelopes , envelopeID )
2024-05-31 13:17:44 +00:00
for _ , messageID := range envelope . messageIDs {
2024-06-11 07:45:01 +00:00
delete ( m . messageEnvelopeHashes , types . HexBytes ( messageID ) . String ( ) )
2023-11-17 14:45:19 +00:00
}
2021-04-22 13:36:18 +00:00
}