Handle race condition when sending envelopes

It happens that an envelope is sent before it's tracked, resulting in
long delays before the envelope is marked as sent.
This commit changes the behavior of the code so that order is now
irrelevant.
This commit is contained in:
Andrea Maria Piana 2023-06-13 16:08:22 +01:00
parent e058b50a2e
commit 8015cc3e3b
5 changed files with 39 additions and 20 deletions

View File

@ -1 +1 @@
0.157.3
0.157.4

2
go.mod
View File

@ -64,7 +64,7 @@ require (
go.uber.org/zap v1.24.0
golang.org/x/crypto v0.7.0
golang.org/x/image v0.0.0-20210220032944-ac19c3e999fb
google.golang.org/protobuf v1.30.1-0.20230508203708-b8fc77060104 // indirect
google.golang.org/protobuf v1.30.1-0.20230508203708-b8fc77060104
gopkg.in/go-playground/assert.v1 v1.2.1 // indirect
gopkg.in/go-playground/validator.v9 v9.31.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0

View File

@ -217,10 +217,11 @@ func (interceptor EnvelopeEventsInterceptor) EnvelopeSent(identifiers [][]byte)
err := interceptor.Messenger.processSentMessages(ids)
if err != nil {
interceptor.Messenger.logger.Info("Messenger failed to process sent messages", zap.Error(err))
} else {
interceptor.EnvelopeEventsHandler.EnvelopeSent(identifiers)
interceptor.Messenger.logger.Info("messenger failed to process sent messages", zap.Error(err))
}
// We notify the client, regardless whether we were able to mark them as sent
interceptor.EnvelopeEventsHandler.EnvelopeSent(identifiers)
} else {
// NOTE(rasom): In case if interceptor.Messenger is not nil and
// some error occurred on processing sent message we don't want
@ -2090,7 +2091,7 @@ func (m *Messenger) SendChatMessages(ctx context.Context, messages []*common.Mes
return &response, nil
}
// SendChatMessage takes a minimal message and sends it based on the corresponding chat
// sendChatMessage takes a minimal message and sends it based on the corresponding chat
func (m *Messenger) sendChatMessage(ctx context.Context, message *common.Message) (*MessengerResponse, error) {
displayName, err := m.settings.DisplayName()
if err != nil {

View File

@ -115,10 +115,18 @@ func (m *EnvelopesMonitor) Stop() {
func (m *EnvelopesMonitor) Add(identifiers [][]byte, envelopeHash types.Hash, message types.NewMessage) {
m.mu.Lock()
defer m.mu.Unlock()
m.envelopes[envelopeHash] = EnvelopePosted
m.identifiers[envelopeHash] = identifiers
// If it's already been marked as sent, we notify the client
if m.envelopes[envelopeHash] == EnvelopeSent {
if m.handler != nil {
m.handler.EnvelopeSent(m.identifiers[envelopeHash])
}
} else {
// otherwise we keep track of the message
m.messages[envelopeHash] = &message
m.attempts[envelopeHash] = 1
m.envelopes[envelopeHash] = EnvelopePosted
}
}
func (m *EnvelopesMonitor) GetState(hash types.Hash) EnvelopeState {
@ -174,15 +182,22 @@ func (m *EnvelopesMonitor) handleEventEnvelopeSent(event types.EnvelopeEvent) {
m.mu.Lock()
defer m.mu.Unlock()
confirmationExpected := event.Batch != (types.Hash{})
state, ok := m.envelopes[event.Hash]
// if we didn't send a message using extension - skip it
// If confirmations are not expected, we keep track of the envelope
// being sent
if !ok && !confirmationExpected {
m.envelopes[event.Hash] = EnvelopeSent
return
}
// if message was already confirmed - skip it
if !ok || state == EnvelopeSent {
if state == EnvelopeSent {
return
}
m.logger.Debug("envelope is sent", zap.String("hash", event.Hash.String()), zap.String("peer", event.Peer.String()))
confirmationExpected := event.Batch != (types.Hash{})
if confirmationExpected {
if _, ok := m.batches[event.Batch]; !ok {
m.batches[event.Batch] = map[types.Hash]struct{}{}

View File

@ -53,6 +53,17 @@ func (s *EnvelopesMonitorSuite) TestEnvelopePosted() {
s.Equal(EnvelopeSent, s.monitor.envelopes[testHash])
}
func (s *EnvelopesMonitorSuite) TestEnvelopePostedOutOfOrder() {
s.monitor.handleEvent(types.EnvelopeEvent{
Event: types.EventEnvelopeSent,
Hash: testHash,
})
s.monitor.Add(testIDs, testHash, types.NewMessage{})
s.Require().Contains(s.monitor.envelopes, testHash)
s.Require().Equal(EnvelopeSent, s.monitor.envelopes[testHash])
}
func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() {
testBatch := types.Hash{1}
pkey, err := crypto.GenerateKey()
@ -76,14 +87,6 @@ func (s *EnvelopesMonitorSuite) TestConfirmedWithAcknowledge() {
s.Equal(EnvelopeSent, s.monitor.envelopes[testHash])
}
func (s *EnvelopesMonitorSuite) TestIgnored() {
s.monitor.handleEvent(types.EnvelopeEvent{
Event: types.EventEnvelopeSent,
Hash: testHash,
})
s.NotContains(s.monitor.envelopes, testHash)
}
func (s *EnvelopesMonitorSuite) TestRemoved() {
s.monitor.Add(testIDs, testHash, types.NewMessage{})
s.Contains(s.monitor.envelopes, testHash)