diff --git a/VERSION b/VERSION index 28cfb23fc..ad7e0bcae 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.91.3 +0.91.4 diff --git a/eth-node/types/envelopes.go b/eth-node/types/envelopes.go index 62c22ba95..7e5283fbc 100644 --- a/eth-node/types/envelopes.go +++ b/eth-node/types/envelopes.go @@ -28,7 +28,7 @@ const ( // And event, ideally, should contain information about peer that sent envelope to us. EventEnvelopeReceived EventType = "envelope.received" // EventBatchAcknowledged is sent when batch of envelopes was acknowledged by a peer. - EventBatchAcknowledged EventType = "batch.acknowleged" + EventBatchAcknowledged EventType = "batch.acknowledged" // EventEnvelopeAvailable fires when envelop is available for filters EventEnvelopeAvailable EventType = "envelope.available" // EventMailServerRequestSent fires when such request is sent. diff --git a/protocol/transport/envelopes_monitor.go b/protocol/transport/envelopes_monitor.go index f65215f95..7ecc2d698 100644 --- a/protocol/transport/envelopes_monitor.go +++ b/protocol/transport/envelopes_monitor.go @@ -23,11 +23,11 @@ const ( ) type EnvelopesMonitorConfig struct { - EnvelopeEventsHandler EnvelopeEventsHandler - MaxAttempts int - MailserverConfirmationsEnabled bool - IsMailserver func(types.EnodeID) bool - Logger *zap.Logger + EnvelopeEventsHandler EnvelopeEventsHandler + MaxAttempts int + AwaitOnlyMailServerConfirmations bool + IsMailserver func(types.EnodeID) bool + Logger *zap.Logger } // EnvelopeEventsHandler used for two different event types. @@ -52,13 +52,13 @@ func NewEnvelopesMonitor(w types.Waku, config EnvelopesMonitorConfig) *Envelopes } return &EnvelopesMonitor{ - w: w, - api: api, - handler: config.EnvelopeEventsHandler, - mailServerConfirmation: config.MailserverConfirmationsEnabled, - maxAttempts: config.MaxAttempts, - isMailserver: config.IsMailserver, - logger: logger.With(zap.Namespace("EnvelopesMonitor")), + w: w, + api: api, + handler: config.EnvelopeEventsHandler, + awaitOnlyMailServerConfirmations: config.AwaitOnlyMailServerConfirmations, + maxAttempts: config.MaxAttempts, + isMailserver: config.IsMailserver, + logger: logger.With(zap.Namespace("EnvelopesMonitor")), // key is envelope hash (event.Hash) envelopes: map[types.Hash]EnvelopeState{}, @@ -73,11 +73,10 @@ func NewEnvelopesMonitor(w types.Waku, config EnvelopesMonitorConfig) *Envelopes // EnvelopesMonitor is responsible for monitoring waku envelopes state. type EnvelopesMonitor struct { - w types.Waku - api types.PublicWakuAPI - handler EnvelopeEventsHandler - mailServerConfirmation bool - maxAttempts int + w types.Waku + api types.PublicWakuAPI + handler EnvelopeEventsHandler + maxAttempts int mu sync.Mutex envelopes map[types.Hash]EnvelopeState @@ -87,6 +86,8 @@ type EnvelopesMonitor struct { attempts map[types.Hash]int identifiers map[types.Hash][][]byte + awaitOnlyMailServerConfirmations bool + wg sync.WaitGroup quit chan struct{} isMailserver func(peer types.EnodeID) bool @@ -164,7 +165,7 @@ func (m *EnvelopesMonitor) handleEvent(event types.EnvelopeEvent) { func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) { // Mailserver confirmations for WakuV2 are disabled - if (m.w == nil || m.w.Version() < 2) && m.mailServerConfirmation { + if (m.w == nil || m.w.Version() < 2) && m.awaitOnlyMailServerConfirmations { if !m.isMailserver(event.Peer) { return } @@ -181,25 +182,20 @@ func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) { return } m.logger.Debug("envelope is sent", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String())) - if event.Batch != (types.Hash{}) { + confirmationExpected := event.Batch != (types.Hash{}) + if confirmationExpected { if _, ok := m.batches[event.Batch]; !ok { m.batches[event.Batch] = map[types.Hash]struct{}{} } m.batches[event.Batch][event.Hash] = struct{}{} m.logger.Debug("waiting for a confirmation", zap.String("batch", event.Batch.String())) - } else { - m.envelopes[event.Hash] = EnvelopeSent - if m.handler != nil { - m.handler.EnvelopeSent(m.identifiers[event.Hash]) - } } } func (m *EnvelopesMonitor) handleAcknowledgedBatch(event types.EnvelopeEvent) { - if m.mailServerConfirmation { - if !m.isMailserver(event.Peer) { - return - } + + if m.awaitOnlyMailServerConfirmations && !m.isMailserver(event.Peer) { + return } m.mu.Lock() @@ -291,10 +287,8 @@ func (m *EnvelopesMonitor) handleEnvelopeFailure(hash types.Hash, err error) { } func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event types.EnvelopeEvent) { - if m.mailServerConfirmation { - if !m.isMailserver(event.Peer) { - return - } + if m.awaitOnlyMailServerConfirmations && !m.isMailserver(event.Peer) { + return } m.mu.Lock() defer m.mu.Unlock() diff --git a/protocol/transport/envelopes_monitor_test.go b/protocol/transport/envelopes_monitor_test.go index c93ede598..c3862ca7a 100644 --- a/protocol/transport/envelopes_monitor_test.go +++ b/protocol/transport/envelopes_monitor_test.go @@ -32,16 +32,16 @@ func (s *EnvelopesMonitorSuite) SetupTest() { s.monitor = NewEnvelopesMonitor( nil, EnvelopesMonitorConfig{ - EnvelopeEventsHandler: nil, - MaxAttempts: 0, - MailserverConfirmationsEnabled: false, - IsMailserver: func(types.EnodeID) bool { return false }, - Logger: zap.NewNop(), + EnvelopeEventsHandler: nil, + MaxAttempts: 0, + AwaitOnlyMailServerConfirmations: false, + IsMailserver: func(types.EnodeID) bool { return false }, + Logger: zap.NewNop(), }, ) } -func (s *EnvelopesMonitorSuite) TestConfirmed() { +func (s *EnvelopesMonitorSuite) TestEnvelopePosted() { s.monitor.Add(testIDs, testHash, types.NewMessage{}) s.Contains(s.monitor.envelopes, testHash) s.Equal(EnvelopePosted, s.monitor.envelopes[testHash]) @@ -50,7 +50,7 @@ func (s *EnvelopesMonitorSuite) TestConfirmed() { Hash: testHash, }) s.Contains(s.monitor.envelopes, testHash) - s.Equal(EnvelopeSent, s.monitor.envelopes[testHash]) + s.Equal(EnvelopePosted, s.monitor.envelopes[testHash]) } func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() { @@ -96,7 +96,7 @@ func (s *EnvelopesMonitorSuite) TestRemoved() { func (s *EnvelopesMonitorSuite) TestIgnoreNotFromMailserver() { // enables filter in the tracker to drop confirmations from non-mailserver peers - s.monitor.mailServerConfirmation = true + s.monitor.awaitOnlyMailServerConfirmations = true s.monitor.Add(testIDs, testHash, types.NewMessage{}) s.monitor.handleEvent(types.EnvelopeEvent{ Event: types.EventEnvelopeSent, diff --git a/services/ext/service.go b/services/ext/service.go index 4b9e60383..bf4944bec 100644 --- a/services/ext/service.go +++ b/services/ext/service.go @@ -125,8 +125,8 @@ func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, db * } envelopesMonitorConfig := &transport.EnvelopesMonitorConfig{ - MaxAttempts: s.config.MaxMessageDeliveryAttempts, - MailserverConfirmationsEnabled: s.config.MailServerConfirmations, + MaxAttempts: s.config.MaxMessageDeliveryAttempts, + AwaitOnlyMailServerConfirmations: s.config.MailServerConfirmations, IsMailserver: func(peer types.EnodeID) bool { return s.peerStore.Exist(peer) },