Correctly process confirmations

This commit is contained in:
Volodymyr Kozieiev 2021-01-26 13:08:52 +02:00 committed by Andrea Maria Piana
parent f0c9236508
commit 1b9bc4450c
5 changed files with 38 additions and 44 deletions

View File

@ -1 +1 @@
0.90.0 0.91.0

View File

@ -28,7 +28,7 @@ const (
// And event, ideally, should contain information about peer that sent envelope to us. // And event, ideally, should contain information about peer that sent envelope to us.
EventEnvelopeReceived EventType = "envelope.received" EventEnvelopeReceived EventType = "envelope.received"
// EventBatchAcknowledged is sent when batch of envelopes was acknowledged by a peer. // EventBatchAcknowledged is sent when batch of envelopes was acknowledged by a peer.
EventBatchAcknowledged EventType = "batch.acknowleged" EventBatchAcknowledged EventType = "batch.acknowledged"
// EventEnvelopeAvailable fires when envelop is available for filters // EventEnvelopeAvailable fires when envelop is available for filters
EventEnvelopeAvailable EventType = "envelope.available" EventEnvelopeAvailable EventType = "envelope.available"
// EventMailServerRequestSent fires when such request is sent. // EventMailServerRequestSent fires when such request is sent.

View File

@ -25,7 +25,7 @@ const (
type EnvelopesMonitorConfig struct { type EnvelopesMonitorConfig struct {
EnvelopeEventsHandler EnvelopeEventsHandler EnvelopeEventsHandler EnvelopeEventsHandler
MaxAttempts int MaxAttempts int
MailserverConfirmationsEnabled bool AwaitOnlyMailServerConfirmations bool
IsMailserver func(types.EnodeID) bool IsMailserver func(types.EnodeID) bool
Logger *zap.Logger Logger *zap.Logger
} }
@ -55,7 +55,7 @@ func NewEnvelopesMonitor(w types.Waku, config EnvelopesMonitorConfig) *Envelopes
w: w, w: w,
api: api, api: api,
handler: config.EnvelopeEventsHandler, handler: config.EnvelopeEventsHandler,
mailServerConfirmation: config.MailserverConfirmationsEnabled, awaitOnlyMailServerConfirmations: config.AwaitOnlyMailServerConfirmations,
maxAttempts: config.MaxAttempts, maxAttempts: config.MaxAttempts,
isMailserver: config.IsMailserver, isMailserver: config.IsMailserver,
logger: logger.With(zap.Namespace("EnvelopesMonitor")), logger: logger.With(zap.Namespace("EnvelopesMonitor")),
@ -76,7 +76,6 @@ type EnvelopesMonitor struct {
w types.Waku w types.Waku
api types.PublicWakuAPI api types.PublicWakuAPI
handler EnvelopeEventsHandler handler EnvelopeEventsHandler
mailServerConfirmation bool
maxAttempts int maxAttempts int
mu sync.Mutex mu sync.Mutex
@ -87,6 +86,8 @@ type EnvelopesMonitor struct {
attempts map[types.Hash]int attempts map[types.Hash]int
identifiers map[types.Hash][][]byte identifiers map[types.Hash][][]byte
awaitOnlyMailServerConfirmations bool
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
isMailserver func(peer types.EnodeID) bool isMailserver func(peer types.EnodeID) bool
@ -164,7 +165,7 @@ func (m *EnvelopesMonitor) handleEvent(event types.EnvelopeEvent) {
func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) { func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) {
// Mailserver confirmations for WakuV2 are disabled // Mailserver confirmations for WakuV2 are disabled
if (m.w == nil || m.w.Version() < 2) && m.mailServerConfirmation { if (m.w == nil || m.w.Version() < 2) && m.awaitOnlyMailServerConfirmations {
if !m.isMailserver(event.Peer) { if !m.isMailserver(event.Peer) {
return return
} }
@ -181,26 +182,21 @@ func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) {
return return
} }
m.logger.Debug("envelope is sent", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String())) m.logger.Debug("envelope is sent", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String()))
if event.Batch != (types.Hash{}) { confirmationExpected := event.Batch != (types.Hash{})
if confirmationExpected {
if _, ok := m.batches[event.Batch]; !ok { if _, ok := m.batches[event.Batch]; !ok {
m.batches[event.Batch] = map[types.Hash]struct{}{} m.batches[event.Batch] = map[types.Hash]struct{}{}
} }
m.batches[event.Batch][event.Hash] = struct{}{} m.batches[event.Batch][event.Hash] = struct{}{}
m.logger.Debug("waiting for a confirmation", zap.String("batch", event.Batch.String())) m.logger.Debug("waiting for a confirmation", zap.String("batch", event.Batch.String()))
} else {
m.envelopes[event.Hash] = EnvelopeSent
if m.handler != nil {
m.handler.EnvelopeSent(m.identifiers[event.Hash])
}
} }
} }
func (m *EnvelopesMonitor) handleAcknowledgedBatch(event types.EnvelopeEvent) { func (m *EnvelopesMonitor) handleAcknowledgedBatch(event types.EnvelopeEvent) {
if m.mailServerConfirmation {
if !m.isMailserver(event.Peer) { if m.awaitOnlyMailServerConfirmations && !m.isMailserver(event.Peer) {
return return
} }
}
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
@ -291,11 +287,9 @@ func (m *EnvelopesMonitor) handleEnvelopeFailure(hash types.Hash, err error) {
} }
func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event types.EnvelopeEvent) { func (m *EnvelopesMonitor) handleEventEnvelopeReceived(event types.EnvelopeEvent) {
if m.mailServerConfirmation { if m.awaitOnlyMailServerConfirmations && !m.isMailserver(event.Peer) {
if !m.isMailserver(event.Peer) {
return return
} }
}
m.mu.Lock() m.mu.Lock()
defer m.mu.Unlock() defer m.mu.Unlock()
state, ok := m.envelopes[event.Hash] state, ok := m.envelopes[event.Hash]

View File

@ -34,14 +34,14 @@ func (s *EnvelopesMonitorSuite) SetupTest() {
EnvelopesMonitorConfig{ EnvelopesMonitorConfig{
EnvelopeEventsHandler: nil, EnvelopeEventsHandler: nil,
MaxAttempts: 0, MaxAttempts: 0,
MailserverConfirmationsEnabled: false, AwaitOnlyMailServerConfirmations: false,
IsMailserver: func(types.EnodeID) bool { return false }, IsMailserver: func(types.EnodeID) bool { return false },
Logger: zap.NewNop(), Logger: zap.NewNop(),
}, },
) )
} }
func (s *EnvelopesMonitorSuite) TestConfirmed() { func (s *EnvelopesMonitorSuite) TestEnvelopePosted() {
s.monitor.Add(testIDs, testHash, types.NewMessage{}) s.monitor.Add(testIDs, testHash, types.NewMessage{})
s.Contains(s.monitor.envelopes, testHash) s.Contains(s.monitor.envelopes, testHash)
s.Equal(EnvelopePosted, s.monitor.envelopes[testHash]) s.Equal(EnvelopePosted, s.monitor.envelopes[testHash])
@ -50,7 +50,7 @@ func (s *EnvelopesMonitorSuite) TestConfirmed() {
Hash: testHash, Hash: testHash,
}) })
s.Contains(s.monitor.envelopes, testHash) s.Contains(s.monitor.envelopes, testHash)
s.Equal(EnvelopeSent, s.monitor.envelopes[testHash]) s.Equal(EnvelopePosted, s.monitor.envelopes[testHash])
} }
func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() { func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() {
@ -96,7 +96,7 @@ func (s *EnvelopesMonitorSuite) TestRemoved() {
func (s *EnvelopesMonitorSuite) TestIgnoreNotFromMailserver() { func (s *EnvelopesMonitorSuite) TestIgnoreNotFromMailserver() {
// enables filter in the tracker to drop confirmations from non-mailserver peers // enables filter in the tracker to drop confirmations from non-mailserver peers
s.monitor.mailServerConfirmation = true s.monitor.awaitOnlyMailServerConfirmations = true
s.monitor.Add(testIDs, testHash, types.NewMessage{}) s.monitor.Add(testIDs, testHash, types.NewMessage{})
s.monitor.handleEvent(types.EnvelopeEvent{ s.monitor.handleEvent(types.EnvelopeEvent{
Event: types.EventEnvelopeSent, Event: types.EventEnvelopeSent,

View File

@ -126,7 +126,7 @@ func (s *Service) InitProtocol(nodeName string, identity *ecdsa.PrivateKey, db *
envelopesMonitorConfig := &transport.EnvelopesMonitorConfig{ envelopesMonitorConfig := &transport.EnvelopesMonitorConfig{
MaxAttempts: s.config.MaxMessageDeliveryAttempts, MaxAttempts: s.config.MaxMessageDeliveryAttempts,
MailserverConfirmationsEnabled: s.config.MailServerConfirmations, AwaitOnlyMailServerConfirmations: s.config.MailServerConfirmations,
IsMailserver: func(peer types.EnodeID) bool { IsMailserver: func(peer types.EnodeID) bool {
return s.peerStore.Exist(peer) return s.peerStore.Exist(peer)
}, },