mirror of
https://github.com/status-im/status-go.git
synced 2025-01-25 05:58:59 +00:00
8015cc3e3b
It happens that an envelope is sent before it's tracked, resulting in long delays before the envelope is marked as sent. This commit changes the behavior of the code so that order is now irrelevant.
335 lines
9.8 KiB
Go
335 lines
9.8 KiB
Go
package transport
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"sync"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/status-im/status-go/eth-node/types"
|
|
)
|
|
|
|
// EnvelopeState describes where an envelope is in the local tracker's lifecycle.
type EnvelopeState int

// Envelope states known to the tracker. The numeric values are written out
// explicitly; note that the zero value of EnvelopeState matches none of them,
// so a lookup of an unknown hash in a map never compares equal to a real state.
const (
	// NotRegistered is returned when the requested hash is unknown to the tracker.
	NotRegistered EnvelopeState = -1
	// EnvelopePosted marks an envelope that was added to the local waku queue.
	EnvelopePosted EnvelopeState = 2
	// EnvelopeSent marks an envelope that was sent to at least one peer.
	EnvelopeSent EnvelopeState = 3
)
|
|
|
|
type EnvelopesMonitorConfig struct {
|
|
EnvelopeEventsHandler EnvelopeEventsHandler
|
|
MaxAttempts int
|
|
AwaitOnlyMailServerConfirmations bool
|
|
IsMailserver func(types.EnodeID) bool
|
|
Logger *zap.Logger
|
|
}
|
|
|
|
// EnvelopeEventsHandler used for two different event types.
|
|
type EnvelopeEventsHandler interface {
|
|
EnvelopeSent([][]byte)
|
|
EnvelopeExpired([][]byte, error)
|
|
MailServerRequestCompleted(types.Hash, types.Hash, []byte, error)
|
|
MailServerRequestExpired(types.Hash)
|
|
}
|
|
|
|
// NewEnvelopesMonitor returns a pointer to an instance of the EnvelopesMonitor.
|
|
func NewEnvelopesMonitor(w types.Waku, config EnvelopesMonitorConfig) *EnvelopesMonitor {
|
|
logger := config.Logger
|
|
|
|
if logger == nil {
|
|
logger = zap.NewNop()
|
|
}
|
|
|
|
var api types.PublicWakuAPI
|
|
if w != nil {
|
|
api = w.PublicWakuAPI()
|
|
}
|
|
|
|
return &EnvelopesMonitor{
|
|
w: w,
|
|
api: api,
|
|
handler: config.EnvelopeEventsHandler,
|
|
awaitOnlyMailServerConfirmations: config.AwaitOnlyMailServerConfirmations,
|
|
maxAttempts: config.MaxAttempts,
|
|
isMailserver: config.IsMailserver,
|
|
logger: logger.With(zap.Namespace("EnvelopesMonitor")),
|
|
|
|
// key is envelope hash (event.Hash)
|
|
envelopes: map[types.Hash]EnvelopeState{},
|
|
messages: map[types.Hash]*types.NewMessage{},
|
|
attempts: map[types.Hash]int{},
|
|
identifiers: make(map[types.Hash][][]byte),
|
|
|
|
// key is hash of the batch (event.Batch)
|
|
batches: map[types.Hash]map[types.Hash]struct{}{},
|
|
}
|
|
}
|
|
|
|
// EnvelopesMonitor is responsible for monitoring waku envelopes state.
|
|
type EnvelopesMonitor struct {
|
|
w types.Waku
|
|
api types.PublicWakuAPI
|
|
handler EnvelopeEventsHandler
|
|
maxAttempts int
|
|
|
|
mu sync.Mutex
|
|
envelopes map[types.Hash]EnvelopeState
|
|
batches map[types.Hash]map[types.Hash]struct{}
|
|
|
|
messages map[types.Hash]*types.NewMessage
|
|
attempts map[types.Hash]int
|
|
identifiers map[types.Hash][][]byte
|
|
|
|
awaitOnlyMailServerConfirmations bool
|
|
|
|
wg sync.WaitGroup
|
|
quit chan struct{}
|
|
isMailserver func(peer types.EnodeID) bool
|
|
|
|
logger *zap.Logger
|
|
}
|
|
|
|
// Start processing events.
|
|
func (m *EnvelopesMonitor) Start() {
|
|
m.quit = make(chan struct{})
|
|
m.wg.Add(1)
|
|
go func() {
|
|
m.handleEnvelopeEvents()
|
|
m.wg.Done()
|
|
}()
|
|
}
|
|
|
|
// Stop process events.
|
|
func (m *EnvelopesMonitor) Stop() {
|
|
close(m.quit)
|
|
m.wg.Wait()
|
|
}
|
|
|
|
// Add hash to a tracker.
|
|
func (m *EnvelopesMonitor) Add(identifiers [][]byte, envelopeHash types.Hash, message types.NewMessage) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.identifiers[envelopeHash] = identifiers
|
|
// If it's already been marked as sent, we notify the client
|
|
if m.envelopes[envelopeHash] == EnvelopeSent {
|
|
if m.handler != nil {
|
|
m.handler.EnvelopeSent(m.identifiers[envelopeHash])
|
|
}
|
|
} else {
|
|
// otherwise we keep track of the message
|
|
m.messages[envelopeHash] = &message
|
|
m.attempts[envelopeHash] = 1
|
|
m.envelopes[envelopeHash] = EnvelopePosted
|
|
}
|
|
}
|
|
|
|
func (m *EnvelopesMonitor) GetState(hash types.Hash) EnvelopeState {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
state, exist := m.envelopes[hash]
|
|
if !exist {
|
|
return NotRegistered
|
|
}
|
|
return state
|
|
}
|
|
|
|
// handleEnvelopeEvents processes waku envelope events
|
|
func (m *EnvelopesMonitor) handleEnvelopeEvents() {
|
|
events := make(chan types.EnvelopeEvent, 100) // must be buffered to prevent blocking waku
|
|
sub := m.w.SubscribeEnvelopeEvents(events)
|
|
defer func() {
|
|
close(events)
|
|
sub.Unsubscribe()
|
|
}()
|
|
for {
|
|
select {
|
|
case <-m.quit:
|
|
return
|
|
case event := <-events:
|
|
m.handleEvent(event)
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleEvent based on type of the event either triggers
|
|
// confirmation handler or removes hash from tracker
|
|
func (m *EnvelopesMonitor) handleEvent(event types.EnvelopeEvent) {
|
|
handlers := map[types.EventType]func(types.EnvelopeEvent){
|
|
types.EventEnvelopeSent: m.handleEventEnvelopeSent,
|
|
types.EventEnvelopeExpired: m.handleEventEnvelopeExpired,
|
|
types.EventBatchAcknowledged: m.handleAcknowledgedBatch,
|
|
types.EventEnvelopeReceived: m.handleEventEnvelopeReceived,
|
|
}
|
|
if handler, ok := handlers[event.Event]; ok {
|
|
handler(event)
|
|
}
|
|
}
|
|
|
|
func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) {
|
|
// Mailserver confirmations for WakuV2 are disabled
|
|
if (m.w == nil || m.w.Version() < 2) && m.awaitOnlyMailServerConfirmations {
|
|
if !m.isMailserver(event.Peer) {
|
|
return
|
|
}
|
|
}
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
confirmationExpected := event.Batch != (types.Hash{})
|
|
|
|
state, ok := m.envelopes[event.Hash]
|
|
|
|
// If confirmations are not expected, we keep track of the envelope
|
|
// being sent
|
|
if !ok && !confirmationExpected {
|
|
m.envelopes[event.Hash] = EnvelopeSent
|
|
return
|
|
}
|
|
|
|
// if message was already confirmed - skip it
|
|
if state == EnvelopeSent {
|
|
return
|
|
}
|
|
m.logger.Debug("envelope is sent", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String()))
|
|
if confirmationExpected {
|
|
if _, ok := m.batches[event.Batch]; !ok {
|
|
m.batches[event.Batch] = map[types.Hash]struct{}{}
|
|
}
|
|
m.batches[event.Batch][event.Hash] = struct{}{}
|
|
m.logger.Debug("waiting for a confirmation", zap.String("batch", event.Batch.String()))
|
|
} else {
|
|
m.logger.Debug("confirmation not expected, marking as sent")
|
|
m.envelopes[event.Hash] = EnvelopeSent
|
|
if m.handler != nil {
|
|
m.handler.EnvelopeSent(m.identifiers[event.Hash])
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *EnvelopesMonitor) handleAcknowledgedBatch(event types.EnvelopeEvent) {
|
|
|
|
if m.awaitOnlyMailServerConfirmations && !m.isMailserver(event.Peer) {
|
|
return
|
|
}
|
|
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
|
|
envelopes, ok := m.batches[event.Batch]
|
|
if !ok {
|
|
m.logger.Debug("batch is not found", zap.String("batch", event.Batch.String()))
|
|
}
|
|
m.logger.Debug("received a confirmation", zap.String("batch", event.Batch.String()), zap.String("peer", event.Peer.String()))
|
|
envelopeErrors, ok := event.Data.([]types.EnvelopeError)
|
|
if event.Data != nil && !ok {
|
|
m.logger.Error("received unexpected data in the the confirmation event", zap.Any("data", event.Data))
|
|
}
|
|
failedEnvelopes := map[types.Hash]struct{}{}
|
|
for i := range envelopeErrors {
|
|
envelopeError := envelopeErrors[i]
|
|
_, exist := m.envelopes[envelopeError.Hash]
|
|
if exist {
|
|
m.logger.Warn("envelope that was posted by us is discarded", zap.String("hash", envelopeError.Hash.String()), zap.String("peer", event.Peer.String()), zap.String("error", envelopeError.Description))
|
|
var err error
|
|
switch envelopeError.Code {
|
|
case types.EnvelopeTimeNotSynced:
|
|
err = errors.New("envelope wasn't delivered due to time sync issues")
|
|
}
|
|
m.handleEnvelopeFailure(envelopeError.Hash, err)
|
|
}
|
|
failedEnvelopes[envelopeError.Hash] = struct{}{}
|
|
}
|
|
|
|
for hash := range envelopes {
|
|
if _, exist := failedEnvelopes[hash]; exist {
|
|
continue
|
|
}
|
|
state, ok := m.envelopes[hash]
|
|
if !ok || state == EnvelopeSent {
|
|
continue
|
|
}
|
|
m.envelopes[hash] = EnvelopeSent
|
|
if m.handler != nil {
|
|
m.handler.EnvelopeSent(m.identifiers[hash])
|
|
}
|
|
}
|
|
delete(m.batches, event.Batch)
|
|
}
|
|
|
|
func (m *EnvelopesMonitor) handleEventEnvelopeExpired(event types.EnvelopeEvent) {
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
m.handleEnvelopeFailure(event.Hash, errors.New("envelope expired due to connectivity issues"))
|
|
}
|
|
|
|
// handleEnvelopeFailure is a common code path for processing envelopes failures. not thread safe, lock
|
|
// must be used on a higher level.
|
|
func (m *EnvelopesMonitor) handleEnvelopeFailure(hash types.Hash, err error) {
|
|
if state, ok := m.envelopes[hash]; ok {
|
|
message, exist := m.messages[hash]
|
|
if !exist {
|
|
m.logger.Error("message was deleted erroneously", zap.String("envelope hash", hash.String()))
|
|
}
|
|
attempt := m.attempts[hash]
|
|
identifiers := m.identifiers[hash]
|
|
m.clearMessageState(hash)
|
|
if state == EnvelopeSent {
|
|
return
|
|
}
|
|
if attempt < m.maxAttempts {
|
|
m.logger.Debug("retrying to send a message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1))
|
|
hex, err := m.api.Post(context.TODO(), *message)
|
|
if err != nil {
|
|
m.logger.Error("failed to retry sending message", zap.String("hash", hash.String()), zap.Int("attempt", attempt+1), zap.Error(err))
|
|
if m.handler != nil {
|
|
m.handler.EnvelopeExpired(identifiers, err)
|
|
}
|
|
|
|
}
|
|
envelopeID := types.BytesToHash(hex)
|
|
m.envelopes[envelopeID] = EnvelopePosted
|
|
m.messages[envelopeID] = message
|
|
m.attempts[envelopeID] = attempt + 1
|
|
m.identifiers[envelopeID] = identifiers
|
|
} else {
|
|
m.logger.Debug("envelope expired", zap.String("hash", hash.String()))
|
|
if m.handler != nil {
|
|
m.handler.EnvelopeExpired(identifiers, err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event types.EnvelopeEvent) {
|
|
if m.awaitOnlyMailServerConfirmations && !m.isMailserver(event.Peer) {
|
|
return
|
|
}
|
|
m.mu.Lock()
|
|
defer m.mu.Unlock()
|
|
state, ok := m.envelopes[event.Hash]
|
|
if !ok || state != EnvelopePosted {
|
|
return
|
|
}
|
|
m.logger.Debug("expected envelope received", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String()))
|
|
m.envelopes[event.Hash] = EnvelopeSent
|
|
if m.handler != nil {
|
|
m.handler.EnvelopeSent(m.identifiers[event.Hash])
|
|
}
|
|
}
|
|
|
|
// clearMessageState removes all message and envelope state.
|
|
// not thread-safe, should be protected on a higher level.
|
|
func (m *EnvelopesMonitor) clearMessageState(envelopeID types.Hash) {
|
|
delete(m.envelopes, envelopeID)
|
|
delete(m.messages, envelopeID)
|
|
delete(m.attempts, envelopeID)
|
|
delete(m.identifiers, envelopeID)
|
|
}
|