chore_:optimise envelope monitor (#5172)

This commit is contained in:
frank 2024-05-31 21:17:44 +08:00 committed by GitHub
parent f41dd35d6c
commit 0a88ebdeae
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 158 additions and 51 deletions

View File

@ -3,7 +3,9 @@ package transport
import (
"context"
"errors"
"math"
"sync"
"time"
"go.uber.org/zap"
@ -67,15 +69,17 @@ func NewEnvelopesMonitor(w types.Waku, config EnvelopesMonitorConfig) *Envelopes
batches: map[types.Hash]map[types.Hash]struct{}{},
// key is stringified message identifier
identifierHashes: make(map[string][]types.Hash),
messageEnvelopeHashes: make(map[string][]types.Hash),
}
}
type monitoredEnvelope struct {
state EnvelopeState
attempts int
message *types.NewMessage
identifiers [][]byte
envelopeHashID types.Hash
state EnvelopeState
attempts int
message *types.NewMessage
messageIDs [][]byte
lastAttemptTime time.Time
}
// EnvelopesMonitor is responsible for monitoring waku envelopes state.
@ -87,9 +91,10 @@ type EnvelopesMonitor struct {
mu sync.Mutex
envelopes map[types.Hash]*monitoredEnvelope
batches map[types.Hash]map[types.Hash]struct{}
identifierHashes map[string][]types.Hash
envelopes map[types.Hash]*monitoredEnvelope
retryQueue []*monitoredEnvelope
batches map[types.Hash]map[types.Hash]struct{}
messageEnvelopeHashes map[string][]types.Hash
awaitOnlyMailServerConfirmations bool
@ -103,11 +108,15 @@ type EnvelopesMonitor struct {
// Start processing events.
func (m *EnvelopesMonitor) Start() {
m.quit = make(chan struct{})
m.wg.Add(1)
m.wg.Add(2)
go func() {
m.handleEnvelopeEvents()
m.wg.Done()
}()
go func() {
defer m.wg.Done()
m.retryLoop()
}()
}
// Stop process events.
@ -118,7 +127,7 @@ func (m *EnvelopesMonitor) Stop() {
// Add hashes to a tracker.
// Identifiers may be backed by multiple envelopes. It happens when message is split in segmentation layer.
func (m *EnvelopesMonitor) Add(identifiers [][]byte, envelopeHashes []types.Hash, messages []*types.NewMessage) error {
func (m *EnvelopesMonitor) Add(messageIDs [][]byte, envelopeHashes []types.Hash, messages []*types.NewMessage) error {
if len(envelopeHashes) != len(messages) {
return errors.New("hashes don't match messages")
}
@ -126,22 +135,24 @@ func (m *EnvelopesMonitor) Add(identifiers [][]byte, envelopeHashes []types.Hash
m.mu.Lock()
defer m.mu.Unlock()
for _, identifier := range identifiers {
m.identifierHashes[string(identifier)] = envelopeHashes
for _, messageID := range messageIDs {
m.messageEnvelopeHashes[string(messageID)] = envelopeHashes
}
for i, envelopeHash := range envelopeHashes {
if _, ok := m.envelopes[envelopeHash]; !ok {
m.envelopes[envelopeHash] = &monitoredEnvelope{
state: EnvelopePosted,
attempts: 1,
message: messages[i],
identifiers: identifiers,
envelopeHashID: envelopeHash,
state: EnvelopePosted,
attempts: 1,
lastAttemptTime: time.Now(),
message: messages[i],
messageIDs: messageIDs,
}
}
}
m.processIdentifiers(identifiers)
m.processMessageIDs(messageIDs)
return nil
}
@ -206,7 +217,7 @@ func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) {
// If confirmations are not expected, we keep track of the envelope
// being sent
if !ok && !confirmationExpected {
m.envelopes[event.Hash] = &monitoredEnvelope{state: EnvelopeSent}
m.envelopes[event.Hash] = &monitoredEnvelope{envelopeHashID: event.Hash, state: EnvelopeSent}
return
}
@ -224,7 +235,7 @@ func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) {
} else {
m.logger.Debug("confirmation not expected, marking as sent")
envelope.state = EnvelopeSent
m.processIdentifiers(envelope.identifiers)
m.processMessageIDs(envelope.messageIDs)
}
}
@ -271,7 +282,7 @@ func (m *EnvelopesMonitor) handleAcknowledgedBatch(event types.EnvelopeEvent) {
continue
}
envelope.state = EnvelopeSent
m.processIdentifiers(envelope.identifiers)
m.processMessageIDs(envelope.messageIDs)
}
delete(m.batches, event.Batch)
}
@ -291,31 +302,84 @@ func (m *EnvelopesMonitor) handleEnvelopeFailure(hash types.Hash, err error) {
return
}
if envelope.attempts < m.maxAttempts {
m.logger.Debug("retrying to send a message", zap.String("hash", hash.String()), zap.Int("attempt", envelope.attempts+1))
hex, err := m.api.Post(context.TODO(), *envelope.message)
if err != nil {
m.logger.Error("failed to retry sending message", zap.String("hash", hash.String()), zap.Int("attempt", envelope.attempts+1), zap.Error(err))
if m.handler != nil {
m.handler.EnvelopeExpired(envelope.identifiers, err)
}
}
envelopeID := types.BytesToHash(hex)
m.envelopes[envelopeID] = &monitoredEnvelope{
state: EnvelopePosted,
attempts: envelope.attempts + 1,
message: envelope.message,
identifiers: envelope.identifiers,
}
m.retryQueue = append(m.retryQueue, envelope)
} else {
m.logger.Debug("envelope expired", zap.String("hash", hash.String()))
m.removeFromRetryQueue(hash)
if m.handler != nil {
m.handler.EnvelopeExpired(envelope.identifiers, err)
m.handler.EnvelopeExpired(envelope.messageIDs, err)
}
}
}
}
func backoffDuration(attempts int) time.Duration {
baseDelay := 1 * time.Second
maxDelay := 30 * time.Second
backoff := baseDelay * time.Duration(math.Pow(2, float64(attempts)))
if backoff > maxDelay {
backoff = maxDelay
}
return backoff
}
// retryLoop handles the retry logic to send envelope in a loop
func (m *EnvelopesMonitor) retryLoop() {
ticker := time.NewTicker(500 * time.Millisecond) // Timer, triggers every 500 milliseconds
defer ticker.Stop()
for {
select {
case <-m.quit:
return
case <-ticker.C:
m.retryOnce()
}
}
}
// retryOnce retries once
func (m *EnvelopesMonitor) retryOnce() {
m.mu.Lock()
defer m.mu.Unlock()
for _, envelope := range m.retryQueue {
if envelope.attempts < m.maxAttempts {
elapsed := time.Since(envelope.lastAttemptTime)
if elapsed < backoffDuration(envelope.attempts) {
continue
}
m.logger.Debug("retrying to send a message", zap.String("hash", envelope.envelopeHashID.String()), zap.Int("attempt", envelope.attempts+1))
hex, err := m.api.Post(context.TODO(), *envelope.message)
if err != nil {
m.logger.Error("failed to retry sending message", zap.String("hash", envelope.envelopeHashID.String()), zap.Int("attempt", envelope.attempts+1), zap.Error(err))
if m.handler != nil {
m.handler.EnvelopeExpired(envelope.messageIDs, err)
}
} else {
m.removeFromRetryQueue(envelope.envelopeHashID)
envelope.envelopeHashID = types.BytesToHash(hex)
}
envelope.state = EnvelopePosted
envelope.attempts++
envelope.lastAttemptTime = time.Now()
m.envelopes[envelope.envelopeHashID] = envelope
}
}
}
// removeFromRetryQueue removes the specified envelope from the retry queue
func (m *EnvelopesMonitor) removeFromRetryQueue(envelopeID types.Hash) {
var newRetryQueue []*monitoredEnvelope
for _, envelope := range m.retryQueue {
if envelope.envelopeHashID != envelopeID {
newRetryQueue = append(newRetryQueue, envelope)
}
}
m.retryQueue = newRetryQueue
}
func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event types.EnvelopeEvent) {
if m.awaitOnlyMailServerConfirmations && !m.isMailserver(event.Peer) {
return
@ -328,20 +392,20 @@ func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event types.EnvelopeEvent
}
m.logger.Debug("expected envelope received", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String()))
envelope.state = EnvelopeSent
m.processIdentifiers(envelope.identifiers)
m.processMessageIDs(envelope.messageIDs)
}
func (m *EnvelopesMonitor) processIdentifiers(identifiers [][]byte) {
sentIdentifiers := make([][]byte, 0, len(identifiers))
func (m *EnvelopesMonitor) processMessageIDs(messageIDs [][]byte) {
sentMessageIDs := make([][]byte, 0, len(messageIDs))
for _, identifier := range identifiers {
hashes, ok := m.identifierHashes[string(identifier)]
for _, messageID := range messageIDs {
hashes, ok := m.messageEnvelopeHashes[string(messageID)]
if !ok {
continue
}
sent := true
// Consider identifier as sent if all corresponding envelopes are in EnvelopeSent state
// Consider message as sent if all corresponding envelopes are in EnvelopeSent state
for _, hash := range hashes {
envelope, ok := m.envelopes[hash]
if !ok || envelope.state != EnvelopeSent {
@ -350,12 +414,12 @@ func (m *EnvelopesMonitor) processIdentifiers(identifiers [][]byte) {
}
}
if sent {
sentIdentifiers = append(sentIdentifiers, identifier)
sentMessageIDs = append(sentMessageIDs, messageID)
}
}
if len(sentIdentifiers) > 0 && m.handler != nil {
m.handler.EnvelopeSent(sentIdentifiers)
if len(sentMessageIDs) > 0 && m.handler != nil {
m.handler.EnvelopeSent(sentMessageIDs)
}
}
@ -367,7 +431,7 @@ func (m *EnvelopesMonitor) clearMessageState(envelopeID types.Hash) {
return
}
delete(m.envelopes, envelopeID)
for _, identifier := range envelope.identifiers {
delete(m.identifierHashes, string(identifier))
for _, messageID := range envelope.messageIDs {
delete(m.messageEnvelopeHashes, string(messageID))
}
}

View File

@ -1,8 +1,10 @@
package transport
import (
"context"
"reflect"
"testing"
"time"
"go.uber.org/zap"
@ -51,7 +53,7 @@ func (s *EnvelopesMonitorSuite) SetupTest() {
nil,
EnvelopesMonitorConfig{
EnvelopeEventsHandler: s.eventsHandlerMock,
MaxAttempts: 0,
MaxAttempts: 6,
AwaitOnlyMailServerConfirmations: false,
IsMailserver: func(types.EnodeID) bool { return false },
Logger: zap.NewNop(),
@ -199,7 +201,7 @@ func (s *EnvelopesMonitorSuite) TestMultipleHashes_EnvelopeExpired() {
err := s.monitor.Add(messageIDs, hashes, messages)
s.Require().NoError(err)
// If any envelope fails, then identifiers are considered as not sent
// If any envelope fails, then messageIDs are considered as not sent
s.monitor.handleEvent(types.EnvelopeEvent{
Event: types.EventEnvelopeExpired,
Hash: hashes[0],
@ -214,7 +216,7 @@ func (s *EnvelopesMonitorSuite) TestMultipleHashes_EnvelopeExpired() {
})
s.Require().Empty(s.eventsHandlerMock.envelopeSentCalls)
s.Require().Empty(s.monitor.identifierHashes)
s.Require().Empty(s.monitor.messageEnvelopeHashes)
s.Require().Len(s.monitor.envelopes, 2)
}
@ -222,3 +224,44 @@ func (s *EnvelopesMonitorSuite) TestMultipleHashes_Failure() {
err := s.monitor.Add(testIDs, []types.Hash{{0x01}, {0x02}}, []*types.NewMessage{{}})
s.Require().Error(err)
}
func (s *EnvelopesMonitorSuite) TestRetryOnce() {
s.monitor.api = &mockWakuAPI{}
err := s.monitor.Add(testIDs, testHashes, []*types.NewMessage{{}})
s.Require().NoError(err)
envelope := s.monitor.envelopes[testHash]
envelope.attempts = 2
envelope.lastAttemptTime = time.Now().Add(-20 * time.Second)
s.monitor.retryQueue = append(s.monitor.retryQueue, envelope)
s.monitor.retryOnce()
s.Require().Equal(3, envelope.attempts)
s.Require().Len(s.monitor.retryQueue, 0)
s.Require().Equal(envelope.envelopeHashID, s.monitor.envelopes[envelope.envelopeHashID].envelopeHashID)
}
type mockWakuAPI struct{}
func (m *mockWakuAPI) Post(ctx context.Context, msg types.NewMessage) ([]byte, error) {
return []byte{0x01}, nil
}
func (m *mockWakuAPI) AddPrivateKey(ctx context.Context, privateKey types.HexBytes) (string, error) {
return "", nil
}
func (m *mockWakuAPI) GenerateSymKeyFromPassword(ctx context.Context, passwd string) (string, error) {
return "", nil
}
func (m *mockWakuAPI) DeleteKeyPair(ctx context.Context, key string) (bool, error) {
return false, nil
}
func (m *mockWakuAPI) NewMessageFilter(req types.Criteria) (string, error) {
return "", nil
}
func (m *mockWakuAPI) GetFilterMessages(id string) ([]*types.Message, error) {
return nil, nil
}
func (m *mockWakuAPI) BloomFilter() []byte {
return nil
}