refactor: eliminate datasync initialization logic duplication

This commit is contained in:
Patryk Osmaczko 2023-11-17 16:26:49 +01:00 committed by osmaczko
parent 7b3013a010
commit 4a9220bf96
3 changed files with 36 additions and 37 deletions

View File

@ -97,24 +97,9 @@ func NewMessageSender(
logger *zap.Logger, logger *zap.Logger,
features FeatureFlags, features FeatureFlags,
) (*MessageSender, error) { ) (*MessageSender, error) {
dataSyncTransport := datasync.NewNodeTransport()
dataSyncNode, err := datasyncnode.NewPersistentNode(
database,
dataSyncTransport,
datasyncpeer.PublicKeyToPeerID(identity.PublicKey),
datasyncnode.BATCH,
datasync.CalculateSendTime,
logger,
)
if err != nil {
return nil, err
}
ds := datasync.New(dataSyncNode, dataSyncTransport, features.Datasync, logger)
p := &MessageSender{ p := &MessageSender{
identity: identity, identity: identity,
datasyncEnabled: features.Datasync, datasyncEnabled: features.Datasync,
datasync: ds,
protocol: enc, protocol: enc,
database: database, database: database,
persistence: NewRawMessagesPersistence(database), persistence: NewRawMessagesPersistence(database),
@ -128,9 +113,11 @@ func NewMessageSender(
// With DataSync enabled, messages are added to the DataSync // With DataSync enabled, messages are added to the DataSync
// but actual encrypt and send calls are postponed. // but actual encrypt and send calls are postponed.
// sendDataSync is responsible for encrypting and sending postponed messages. // sendDataSync is responsible for encrypting and sending postponed messages.
if features.Datasync { if p.datasyncEnabled {
ds.Init(p.sendDataSync, logger) err := p.StartDatasync()
ds.Start(datasync.DatasyncTicker) if err != nil {
return nil, err
}
} }
return p, nil return p, nil
@ -141,7 +128,7 @@ func (s *MessageSender) Stop() {
close(c) close(c)
} }
s.messageEventsSubscriptions = nil s.messageEventsSubscriptions = nil
s.datasync.Stop() // idempotent op s.StopDatasync()
} }
func (s *MessageSender) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) { func (s *MessageSender) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) {
@ -876,6 +863,7 @@ func (s *MessageSender) handleMessage(wakuMessage *types.Message) (*handleMessag
} }
} }
if s.datasync != nil && s.datasyncEnabled {
datasyncMessages, as, err := unwrapDatasyncMessage(response.Message, s.datasync) datasyncMessages, as, err := unwrapDatasyncMessage(response.Message, s.datasync)
if err != nil { if err != nil {
hlogger.Debug("failed to handle datasync message", zap.Error(err)) hlogger.Debug("failed to handle datasync message", zap.Error(err))
@ -883,6 +871,7 @@ func (s *MessageSender) handleMessage(wakuMessage *types.Message) (*handleMessag
response.DatasyncMessages = append(response.DatasyncMessages, datasyncMessages...) response.DatasyncMessages = append(response.DatasyncMessages, datasyncMessages...)
response.DatasyncAcks = append(response.DatasyncAcks, as...) response.DatasyncAcks = append(response.DatasyncAcks, as...)
} }
}
for _, msg := range response.Messages() { for _, msg := range response.Messages() {
err := msg.HandleApplicationLayer() err := msg.HandleApplicationLayer()
@ -1215,10 +1204,16 @@ func calculatePoW(payload []byte) float64 {
} }
func (s *MessageSender) StopDatasync() { func (s *MessageSender) StopDatasync() {
if s.datasync != nil {
s.datasync.Stop() s.datasync.Stop()
} }
}
func (s *MessageSender) StartDatasync() error {
if !s.datasyncEnabled {
return nil
}
func (s *MessageSender) StartDatasync() {
dataSyncTransport := datasync.NewNodeTransport() dataSyncTransport := datasync.NewNodeTransport()
dataSyncNode, err := datasyncnode.NewPersistentNode( dataSyncNode, err := datasyncnode.NewPersistentNode(
s.database, s.database,
@ -1229,16 +1224,15 @@ func (s *MessageSender) StartDatasync() {
s.logger, s.logger,
) )
if err != nil { if err != nil {
return return err
}
ds := datasync.New(dataSyncNode, dataSyncTransport, true, s.logger)
if s.datasyncEnabled {
ds.Init(s.sendDataSync, s.logger)
ds.Start(datasync.DatasyncTicker)
} }
s.datasync = ds s.datasync = datasync.New(dataSyncNode, dataSyncTransport, true, s.logger)
s.datasync.Init(s.sendDataSync, s.logger)
s.datasync.Start(datasync.DatasyncTicker)
return nil
} }
// GetCurrentKeyForGroup returns the latest key timestampID belonging to a key group // GetCurrentKeyForGroup returns the latest key timestampID belonging to a key group

View File

@ -90,7 +90,9 @@ func (s *MessageSenderSuite) SetupTest() {
encryptionProtocol, encryptionProtocol,
whisperTransport, whisperTransport,
s.logger, s.logger,
FeatureFlags{}, FeatureFlags{
Datasync: true,
},
) )
s.Require().NoError(err) s.Require().NoError(err)
} }

View File

@ -937,7 +937,10 @@ func (m *Messenger) ConnectionChanged(state connection.State) {
} }
if m.connectionState.Offline && !state.Offline { if m.connectionState.Offline && !state.Offline {
m.sender.StartDatasync() err := m.sender.StartDatasync()
if err != nil {
m.logger.Error("failed to start datasync", zap.Error(err))
}
} }
m.connectionState = state m.connectionState = state