diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index dff0ab4a1..039639cef 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -97,24 +97,9 @@ func NewMessageSender( logger *zap.Logger, features FeatureFlags, ) (*MessageSender, error) { - dataSyncTransport := datasync.NewNodeTransport() - dataSyncNode, err := datasyncnode.NewPersistentNode( - database, - dataSyncTransport, - datasyncpeer.PublicKeyToPeerID(identity.PublicKey), - datasyncnode.BATCH, - datasync.CalculateSendTime, - logger, - ) - if err != nil { - return nil, err - } - ds := datasync.New(dataSyncNode, dataSyncTransport, features.Datasync, logger) - p := &MessageSender{ identity: identity, datasyncEnabled: features.Datasync, - datasync: ds, protocol: enc, database: database, persistence: NewRawMessagesPersistence(database), @@ -128,9 +113,11 @@ func NewMessageSender( // With DataSync enabled, messages are added to the DataSync // but actual encrypt and send calls are postponed. // sendDataSync is responsible for encrypting and sending postponed messages. - if features.Datasync { - ds.Init(p.sendDataSync, logger) - ds.Start(datasync.DatasyncTicker) + if p.datasyncEnabled { + err := p.StartDatasync() + if err != nil { + return nil, err + } } return p, nil @@ -141,7 +128,7 @@ func (s *MessageSender) Stop() { close(c) } s.messageEventsSubscriptions = nil - s.datasync.Stop() // idempotent op + s.StopDatasync() } func (s *MessageSender) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) { @@ -876,12 +863,14 @@ func (s *MessageSender) handleMessage(wakuMessage *types.Message) (*handleMessag } } - datasyncMessages, as, err := unwrapDatasyncMessage(response.Message, s.datasync) - if err != nil { - hlogger.Debug("failed to handle datasync message", zap.Error(err)) - } else { - response.DatasyncMessages = append(response.DatasyncMessages, datasyncMessages...) - response.DatasyncAcks = append(response.DatasyncAcks, as...) + if s.datasync != nil && s.datasyncEnabled { + datasyncMessages, as, err := unwrapDatasyncMessage(response.Message, s.datasync) + if err != nil { + hlogger.Debug("failed to handle datasync message", zap.Error(err)) + } else { + response.DatasyncMessages = append(response.DatasyncMessages, datasyncMessages...) + response.DatasyncAcks = append(response.DatasyncAcks, as...) + } } for _, msg := range response.Messages() { @@ -1215,10 +1204,16 @@ func calculatePoW(payload []byte) float64 { } func (s *MessageSender) StopDatasync() { - s.datasync.Stop() + if s.datasync != nil { + s.datasync.Stop() + } } -func (s *MessageSender) StartDatasync() { +func (s *MessageSender) StartDatasync() error { + if !s.datasyncEnabled { + return nil + } + dataSyncTransport := datasync.NewNodeTransport() dataSyncNode, err := datasyncnode.NewPersistentNode( s.database, @@ -1229,16 +1224,15 @@ func (s *MessageSender) StartDatasync() { s.logger, ) if err != nil { - return - } - ds := datasync.New(dataSyncNode, dataSyncTransport, true, s.logger) - - if s.datasyncEnabled { - ds.Init(s.sendDataSync, s.logger) - ds.Start(datasync.DatasyncTicker) + return err } - s.datasync = ds + s.datasync = datasync.New(dataSyncNode, dataSyncTransport, true, s.logger) + + s.datasync.Init(s.sendDataSync, s.logger) + s.datasync.Start(datasync.DatasyncTicker) + + return nil } // GetCurrentKeyForGroup returns the latest key timestampID belonging to a key group diff --git a/protocol/common/message_sender_test.go b/protocol/common/message_sender_test.go index 2aa5394c5..cb6bb0b3c 100644 --- a/protocol/common/message_sender_test.go +++ b/protocol/common/message_sender_test.go @@ -90,7 +90,9 @@ func (s *MessageSenderSuite) SetupTest() { encryptionProtocol, whisperTransport, s.logger, - FeatureFlags{}, + FeatureFlags{ + Datasync: true, + }, ) s.Require().NoError(err) } diff --git a/protocol/messenger_mailserver.go b/protocol/messenger_mailserver.go index 00cc74f33..298a4bd09 100644 --- a/protocol/messenger_mailserver.go +++ b/protocol/messenger_mailserver.go @@ -937,7 +937,10 @@ func (m *Messenger) ConnectionChanged(state connection.State) { } if m.connectionState.Offline && !state.Offline { - m.sender.StartDatasync() + err := m.sender.StartDatasync() + if err != nil { + m.logger.Error("failed to start datasync", zap.Error(err)) + } } m.connectionState = state