From 0e538c0a95f6fb3f9fa67fd0292fec14d56c45cd Mon Sep 17 00:00:00 2001 From: Volodymyr Kozieiev Date: Wed, 23 Jun 2021 17:13:48 +0300 Subject: [PATCH] Rename MessageProcessor to MessageSender (#2264) --- VERSION | 2 +- ...message_processor.go => message_sender.go} | 258 +++++++++--------- ...ocessor_test.go => message_sender_test.go} | 32 +-- protocol/datasync/transport.go | 4 +- protocol/encryption/protocol.go | 4 +- protocol/messenger.go | 56 ++-- protocol/messenger_communities.go | 6 +- protocol/messenger_test.go | 2 +- protocol/pushnotificationclient/client.go | 26 +- protocol/pushnotificationserver/server.go | 20 +- 10 files changed, 205 insertions(+), 205 deletions(-) rename protocol/common/{message_processor.go => message_sender.go} (70%) rename protocol/common/{message_processor_test.go => message_sender_test.go} (87%) diff --git a/VERSION b/VERSION index 964b0a805..e72f24026 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.80.00 +0.80.1 diff --git a/protocol/common/message_processor.go b/protocol/common/message_sender.go similarity index 70% rename from protocol/common/message_processor.go rename to protocol/common/message_sender.go index a3516d0c1..00c590438 100644 --- a/protocol/common/message_processor.go +++ b/protocol/common/message_sender.go @@ -43,7 +43,7 @@ type SentMessage struct { MessageIDs [][]byte } -type MessageProcessor struct { +type MessageSender struct { identity *ecdsa.PrivateKey datasync *datasync.DataSync protocol *encryption.Protocol @@ -67,14 +67,14 @@ type MessageProcessor struct { handleSharedSecrets func([]*sharedsecret.Secret) error } -func NewMessageProcessor( +func NewMessageSender( identity *ecdsa.PrivateKey, database *sql.DB, enc *encryption.Protocol, transport *transport.Transport, logger *zap.Logger, features FeatureFlags, -) (*MessageProcessor, error) { +) (*MessageSender, error) { dataSyncTransport := datasync.NewNodeTransport() dataSyncNode, err := datasyncnode.NewPersistentNode( database, @@ -89,7 +89,7 @@ func NewMessageProcessor( } ds := datasync.New(dataSyncNode, dataSyncTransport, features.Datasync, logger) - p := &MessageProcessor{ + p := &MessageSender{ identity: identity, datasync: ds, protocol: enc, @@ -116,25 +116,25 @@ func NewMessageProcessor( return p, nil } -func (p *MessageProcessor) Stop() { - for _, c := range p.sentMessagesSubscriptions { +func (s *MessageSender) Stop() { + for _, c := range s.sentMessagesSubscriptions { close(c) } - p.sentMessagesSubscriptions = nil - p.datasync.Stop() // idempotent op + s.sentMessagesSubscriptions = nil + s.datasync.Stop() // idempotent op } -func (p *MessageProcessor) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) { - p.handleSharedSecrets = handler +func (s *MessageSender) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) { + s.handleSharedSecrets = handler } // SendPrivate takes encoded data, encrypts it and sends through the wire. -func (p *MessageProcessor) SendPrivate( +func (s *MessageSender) SendPrivate( ctx context.Context, recipient *ecdsa.PublicKey, rawMessage *RawMessage, ) ([]byte, error) { - p.logger.Debug( + s.logger.Debug( "sending a private message", zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))), zap.String("site", "SendPrivate"), @@ -148,47 +148,47 @@ func (p *MessageProcessor) SendPrivate( // Set sender identity if not specified if rawMessage.Sender == nil { - rawMessage.Sender = p.identity + rawMessage.Sender = s.identity } - return p.sendPrivate(ctx, recipient, rawMessage) + return s.sendPrivate(ctx, recipient, rawMessage) } // SendCommunityMessage takes encoded data, encrypts it and sends through the wire // using the community topic and their key -func (p *MessageProcessor) SendCommunityMessage( +func (s *MessageSender) SendCommunityMessage( ctx context.Context, recipient *ecdsa.PublicKey, rawMessage RawMessage, ) ([]byte, error) { - p.logger.Debug( + s.logger.Debug( "sending a community message", zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))), zap.String("site", "SendPrivate"), ) - rawMessage.Sender = p.identity + rawMessage.Sender = s.identity - return p.sendCommunity(ctx, recipient, &rawMessage) + return s.sendCommunity(ctx, recipient, &rawMessage) } // SendGroup takes encoded data, encrypts it and sends through the wire, // always return the messageID -func (p *MessageProcessor) SendGroup( +func (s *MessageSender) SendGroup( ctx context.Context, recipients []*ecdsa.PublicKey, rawMessage RawMessage, ) ([]byte, error) { - p.logger.Debug( + s.logger.Debug( "sending a private group message", zap.String("site", "SendGroup"), ) // Set sender if not specified if rawMessage.Sender == nil { - rawMessage.Sender = p.identity + rawMessage.Sender = s.identity } // Calculate messageID first and set on raw message - wrappedMessage, err := p.wrapMessageV1(&rawMessage) + wrappedMessage, err := s.wrapMessageV1(&rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") } @@ -197,7 +197,7 @@ func (p *MessageProcessor) SendGroup( // Send to each recipients for _, recipient := range recipients { - _, err = p.sendPrivate(ctx, recipient, &rawMessage) + _, err = s.sendPrivate(ctx, recipient, &rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to send message") } @@ -206,14 +206,14 @@ func (p *MessageProcessor) SendGroup( } // sendCommunity sends data to the recipient identifying with a given public key. -func (p *MessageProcessor) sendCommunity( +func (s *MessageSender) sendCommunity( ctx context.Context, recipient *ecdsa.PublicKey, rawMessage *RawMessage, ) ([]byte, error) { - p.logger.Debug("sending community message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient)))) + s.logger.Debug("sending community message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient)))) - wrappedMessage, err := p.wrapMessageV1(rawMessage) + wrappedMessage, err := s.wrapMessageV1(rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") } @@ -223,29 +223,29 @@ func (p *MessageProcessor) sendCommunity( // Notify before dispatching, otherwise the dispatch subscription might happen // earlier than the scheduled - p.notifyOnScheduledMessage(rawMessage) + s.notifyOnScheduledMessage(rawMessage) messageIDs := [][]byte{messageID} - hash, newMessage, err := p.sendCommunityRawMessage(ctx, recipient, wrappedMessage, messageIDs) + hash, newMessage, err := s.sendCommunityRawMessage(ctx, recipient, wrappedMessage, messageIDs) if err != nil { - p.logger.Error("failed to send a community message", zap.Error(err)) + s.logger.Error("failed to send a community message", zap.Error(err)) return nil, errors.Wrap(err, "failed to send a message spec") } - p.transport.Track(messageIDs, hash, newMessage) + s.transport.Track(messageIDs, hash, newMessage) return messageID, nil } // sendPrivate sends data to the recipient identifying with a given public key. -func (p *MessageProcessor) sendPrivate( +func (s *MessageSender) sendPrivate( ctx context.Context, recipient *ecdsa.PublicKey, rawMessage *RawMessage, ) ([]byte, error) { - p.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient)))) + s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient)))) - wrappedMessage, err := p.wrapMessageV1(rawMessage) + wrappedMessage, err := s.wrapMessageV1(rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") } @@ -255,24 +255,24 @@ func (p *MessageProcessor) sendPrivate( // Notify before dispatching, otherwise the dispatch subscription might happen // earlier than the scheduled - p.notifyOnScheduledMessage(rawMessage) + s.notifyOnScheduledMessage(rawMessage) - if p.featureFlags.Datasync && rawMessage.ResendAutomatically { + if s.featureFlags.Datasync && rawMessage.ResendAutomatically { // No need to call transport tracking. // It is done in a data sync dispatch step. - datasyncID, err := p.addToDataSync(recipient, wrappedMessage) + datasyncID, err := s.addToDataSync(recipient, wrappedMessage) if err != nil { return nil, errors.Wrap(err, "failed to send message with datasync") } // We don't need to receive confirmations from our own devices - if !IsPubKeyEqual(recipient, &p.identity.PublicKey) { + if !IsPubKeyEqual(recipient, &s.identity.PublicKey) { confirmation := &RawMessageConfirmation{ DataSyncID: datasyncID, MessageID: messageID, PublicKey: crypto.CompressPubkey(recipient), } - err = p.persistence.InsertPendingConfirmation(confirmation) + err = s.persistence.InsertPendingConfirmation(confirmation) if err != nil { return nil, err } @@ -280,24 +280,24 @@ func (p *MessageProcessor) sendPrivate( } else if rawMessage.SkipEncryption { // When SkipEncryption is set we don't pass the message to the encryption layer messageIDs := [][]byte{messageID} - hash, newMessage, err := p.sendPrivateRawMessage(ctx, rawMessage, recipient, wrappedMessage, messageIDs) + hash, newMessage, err := s.sendPrivateRawMessage(ctx, rawMessage, recipient, wrappedMessage, messageIDs) if err != nil { - p.logger.Error("failed to send a private message", zap.Error(err)) + s.logger.Error("failed to send a private message", zap.Error(err)) return nil, errors.Wrap(err, "failed to send a message spec") } - p.transport.Track(messageIDs, hash, newMessage) + s.transport.Track(messageIDs, hash, newMessage) } else { - messageSpec, err := p.protocol.BuildDirectMessage(rawMessage.Sender, recipient, wrappedMessage) + messageSpec, err := s.protocol.BuildDirectMessage(rawMessage.Sender, recipient, wrappedMessage) if err != nil { return nil, errors.Wrap(err, "failed to encrypt message") } // The shared secret needs to be handle before we send a message // otherwise the topic might not be set up before we receive a message - if p.handleSharedSecrets != nil { - err := p.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret}) + if s.handleSharedSecrets != nil { + err := s.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret}) if err != nil { return nil, err } @@ -305,50 +305,50 @@ func (p *MessageProcessor) sendPrivate( } messageIDs := [][]byte{messageID} - hash, newMessage, err := p.sendMessageSpec(ctx, recipient, messageSpec, messageIDs) + hash, newMessage, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs) if err != nil { - p.logger.Error("failed to send a private message", zap.Error(err)) + s.logger.Error("failed to send a private message", zap.Error(err)) return nil, errors.Wrap(err, "failed to send a message spec") } - p.transport.Track(messageIDs, hash, newMessage) + s.transport.Track(messageIDs, hash, newMessage) } return messageID, nil } // sendPairInstallation sends data to the recipients, using DH -func (p *MessageProcessor) SendPairInstallation( +func (s *MessageSender) SendPairInstallation( ctx context.Context, recipient *ecdsa.PublicKey, rawMessage RawMessage, ) ([]byte, error) { - p.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient)))) + s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient)))) - wrappedMessage, err := p.wrapMessageV1(&rawMessage) + wrappedMessage, err := s.wrapMessageV1(&rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") } - messageSpec, err := p.protocol.BuildDHMessage(p.identity, recipient, wrappedMessage) + messageSpec, err := s.protocol.BuildDHMessage(s.identity, recipient, wrappedMessage) if err != nil { return nil, errors.Wrap(err, "failed to encrypt message") } - messageID := v1protocol.MessageID(&p.identity.PublicKey, wrappedMessage) + messageID := v1protocol.MessageID(&s.identity.PublicKey, wrappedMessage) messageIDs := [][]byte{messageID} - hash, newMessage, err := p.sendMessageSpec(ctx, recipient, messageSpec, messageIDs) + hash, newMessage, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs) if err != nil { return nil, errors.Wrap(err, "failed to send a message spec") } - p.transport.Track(messageIDs, hash, newMessage) + s.transport.Track(messageIDs, hash, newMessage) return messageID, nil } -func (p *MessageProcessor) encodeMembershipUpdate( +func (s *MessageSender) encodeMembershipUpdate( message v1protocol.MembershipUpdateMessage, chatEntity ChatEntity, ) ([]byte, error) { @@ -374,7 +374,7 @@ func (p *MessageProcessor) encodeMembershipUpdate( // EncodeMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire. // All the events in a group are encoded and added to the payload -func (p *MessageProcessor) EncodeMembershipUpdate( +func (s *MessageSender) EncodeMembershipUpdate( group *v1protocol.Group, chatEntity ChatEntity, ) ([]byte, error) { @@ -383,43 +383,43 @@ func (p *MessageProcessor) EncodeMembershipUpdate( Events: group.Events(), } - return p.encodeMembershipUpdate(message, chatEntity) + return s.encodeMembershipUpdate(message, chatEntity) } // EncodeAbridgedMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire. // Only the events relevant to the sender are encoded -func (p *MessageProcessor) EncodeAbridgedMembershipUpdate( +func (s *MessageSender) EncodeAbridgedMembershipUpdate( group *v1protocol.Group, chatEntity ChatEntity, ) ([]byte, error) { message := v1protocol.MembershipUpdateMessage{ ChatID: group.ChatID(), - Events: group.AbridgedEvents(&p.identity.PublicKey), + Events: group.AbridgedEvents(&s.identity.PublicKey), } - return p.encodeMembershipUpdate(message, chatEntity) + return s.encodeMembershipUpdate(message, chatEntity) } // SendPublic takes encoded data, encrypts it and sends through the wire. -func (p *MessageProcessor) SendPublic( +func (s *MessageSender) SendPublic( ctx context.Context, chatName string, rawMessage RawMessage, ) ([]byte, error) { // Set sender if rawMessage.Sender == nil { - rawMessage.Sender = p.identity + rawMessage.Sender = s.identity } - wrappedMessage, err := p.wrapMessageV1(&rawMessage) + wrappedMessage, err := s.wrapMessageV1(&rawMessage) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") } var newMessage *types.NewMessage - messageSpec, err := p.protocol.BuildPublicMessage(p.identity, wrappedMessage) + messageSpec, err := s.protocol.BuildPublicMessage(s.identity, wrappedMessage) if err != nil { - p.logger.Error("failed to send a public message", zap.Error(err)) + s.logger.Error("failed to send a public message", zap.Error(err)) return nil, errors.Wrap(err, "failed to wrap a public message in the encryption layer") } @@ -441,9 +441,9 @@ func (p *MessageProcessor) SendPublic( rawMessage.ID = types.EncodeHex(messageID) // notify before dispatching - p.notifyOnScheduledMessage(&rawMessage) + s.notifyOnScheduledMessage(&rawMessage) - hash, err := p.transport.SendPublic(ctx, newMessage, chatName) + hash, err := s.transport.SendPublic(ctx, newMessage, chatName) if err != nil { return nil, err } @@ -453,9 +453,9 @@ func (p *MessageProcessor) SendPublic( MessageIDs: [][]byte{messageID}, } - p.notifyOnSentMessage(sentMessage) + s.notifyOnSentMessage(sentMessage) - p.transport.Track([][]byte{messageID}, hash, newMessage) + s.transport.Track([][]byte{messageID}, hash, newMessage) return messageID, nil } @@ -489,8 +489,8 @@ func unwrapDatasyncMessage(m *v1protocol.StatusMessage, datasync *datasync.DataS // layer message, or in case of Raw methods, the processing stops at the layer // before. // It returns an error only if the processing of required steps failed. -func (p *MessageProcessor) HandleMessages(shhMessage *types.Message, applicationLayer bool) ([]*v1protocol.StatusMessage, [][]byte, error) { - logger := p.logger.With(zap.String("site", "handleMessages")) +func (s *MessageSender) HandleMessages(shhMessage *types.Message, applicationLayer bool) ([]*v1protocol.StatusMessage, [][]byte, error) { + logger := s.logger.With(zap.String("site", "handleMessages")) hlogger := logger.With(zap.ByteString("hash", shhMessage.Hash)) var statusMessage v1protocol.StatusMessage var statusMessages []*v1protocol.StatusMessage @@ -501,12 +501,12 @@ func (p *MessageProcessor) HandleMessages(shhMessage *types.Message, application return nil, nil, err } - err = p.handleEncryptionLayer(context.Background(), &statusMessage) + err = s.handleEncryptionLayer(context.Background(), &statusMessage) if err != nil { hlogger.Debug("failed to handle an encryption message", zap.Error(err)) } - statusMessages, acks, err := unwrapDatasyncMessage(&statusMessage, p.datasync) + statusMessages, acks, err := unwrapDatasyncMessage(&statusMessage, s.datasync) if err != nil { hlogger.Debug("failed to handle datasync message", zap.Error(err)) //that wasn't a datasync message, so use the original payload @@ -531,32 +531,32 @@ func (p *MessageProcessor) HandleMessages(shhMessage *types.Message, application } // fetchDecryptionKey returns the private key associated with this public key, and returns true if it's an ephemeral key -func (p *MessageProcessor) fetchDecryptionKey(destination *ecdsa.PublicKey) (*ecdsa.PrivateKey, bool) { +func (s *MessageSender) fetchDecryptionKey(destination *ecdsa.PublicKey) (*ecdsa.PrivateKey, bool) { destinationID := types.EncodeHex(crypto.FromECDSAPub(destination)) - p.ephemeralKeysMutex.Lock() - decryptionKey, ok := p.ephemeralKeys[destinationID] - p.ephemeralKeysMutex.Unlock() + s.ephemeralKeysMutex.Lock() + decryptionKey, ok := s.ephemeralKeys[destinationID] + s.ephemeralKeysMutex.Unlock() // the key is not there, fallback on identity if !ok { - return p.identity, false + return s.identity, false } return decryptionKey, true } -func (p *MessageProcessor) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error { - logger := p.logger.With(zap.String("site", "handleEncryptionLayer")) +func (s *MessageSender) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error { + logger := s.logger.With(zap.String("site", "handleEncryptionLayer")) publicKey := message.SigPubKey() // if it's an ephemeral key, we don't negotiate a topic - decryptionKey, skipNegotiation := p.fetchDecryptionKey(message.Dst) + decryptionKey, skipNegotiation := s.fetchDecryptionKey(message.Dst) - err := message.HandleEncryption(decryptionKey, publicKey, p.protocol, skipNegotiation) + err := message.HandleEncryption(decryptionKey, publicKey, s.protocol, skipNegotiation) // if it's an ephemeral key, we don't have to handle a device not found error if err == encryption.ErrDeviceNotFound && !skipNegotiation { - if err := p.handleErrDeviceNotFound(ctx, publicKey); err != nil { + if err := s.handleErrDeviceNotFound(ctx, publicKey); err != nil { logger.Error("failed to handle ErrDeviceNotFound", zap.Error(err)) } } @@ -567,9 +567,9 @@ func (p *MessageProcessor) handleEncryptionLayer(ctx context.Context, message *v return nil } -func (p *MessageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error { +func (s *MessageSender) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error { now := time.Now().Unix() - advertise, err := p.protocol.ShouldAdvertiseBundle(publicKey, now) + advertise, err := s.protocol.ShouldAdvertiseBundle(publicKey, now) if err != nil { return err } @@ -577,7 +577,7 @@ func (p *MessageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKe return nil } - messageSpec, err := p.protocol.BuildBundleAdvertiseMessage(p.identity, publicKey) + messageSpec, err := s.protocol.BuildBundleAdvertiseMessage(s.identity, publicKey) if err != nil { return err } @@ -586,17 +586,17 @@ func (p *MessageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKe defer cancel() // We don't pass an array of messageIDs as no action needs to be taken // when sending a bundle - _, _, err = p.sendMessageSpec(ctx, publicKey, messageSpec, nil) + _, _, err = s.sendMessageSpec(ctx, publicKey, messageSpec, nil) if err != nil { return err } - p.protocol.ConfirmBundleAdvertisement(publicKey, now) + s.protocol.ConfirmBundleAdvertisement(publicKey, now) return nil } -func (p *MessageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) { +func (s *MessageSender) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) { wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender) if err != nil { return nil, errors.Wrap(err, "failed to wrap message") @@ -604,19 +604,19 @@ func (p *MessageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) return wrappedMessage, nil } -func (p *MessageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) ([]byte, error) { - groupID := datasync.ToOneToOneGroupID(&p.identity.PublicKey, publicKey) +func (s *MessageSender) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) ([]byte, error) { + groupID := datasync.ToOneToOneGroupID(&s.identity.PublicKey, publicKey) peerID := datasyncpeer.PublicKeyToPeerID(*publicKey) - exist, err := p.datasync.IsPeerInGroup(groupID, peerID) + exist, err := s.datasync.IsPeerInGroup(groupID, peerID) if err != nil { return nil, errors.Wrap(err, "failed to check if peer is in group") } if !exist { - if err := p.datasync.AddPeer(groupID, peerID); err != nil { + if err := s.datasync.AddPeer(groupID, peerID); err != nil { return nil, errors.Wrap(err, "failed to add peer") } } - id, err := p.datasync.AppendMessage(groupID, message) + id, err := s.datasync.AppendMessage(groupID, message) if err != nil { return nil, errors.Wrap(err, "failed to append message to datasync") } @@ -626,41 +626,41 @@ func (p *MessageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []b // sendDataSync sends a message scheduled by the data sync layer. // Data Sync layer calls this method "dispatch" function. -func (p *MessageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error { +func (s *MessageSender) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, marshalledDatasyncPayload []byte, payload *datasyncproto.Payload) error { // Calculate the messageIDs messageIDs := make([][]byte, 0, len(payload.Messages)) for _, payload := range payload.Messages { - messageIDs = append(messageIDs, v1protocol.MessageID(&p.identity.PublicKey, payload.Body)) + messageIDs = append(messageIDs, v1protocol.MessageID(&s.identity.PublicKey, payload.Body)) } - messageSpec, err := p.protocol.BuildDirectMessage(p.identity, publicKey, encodedMessage) + messageSpec, err := s.protocol.BuildDirectMessage(s.identity, publicKey, marshalledDatasyncPayload) if err != nil { return errors.Wrap(err, "failed to encrypt message") } // The shared secret needs to be handle before we send a message // otherwise the topic might not be set up before we receive a message - if p.handleSharedSecrets != nil { - err := p.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret}) + if s.handleSharedSecrets != nil { + err := s.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret}) if err != nil { return err } } - hash, newMessage, err := p.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs) + hash, newMessage, err := s.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs) if err != nil { - p.logger.Error("failed to send a datasync message", zap.Error(err)) + s.logger.Error("failed to send a datasync message", zap.Error(err)) return err } - p.transport.Track(messageIDs, hash, newMessage) + s.transport.Track(messageIDs, hash, newMessage) return nil } // sendPrivateRawMessage sends a message not wrapped in an encryption layer -func (p *MessageProcessor) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) { +func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) { newMessage := &types.NewMessage{ TTL: whisperTTL, Payload: payload, @@ -671,9 +671,9 @@ func (p *MessageProcessor) sendPrivateRawMessage(ctx context.Context, rawMessage var err error if rawMessage.SendOnPersonalTopic { - hash, err = p.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey) + hash, err = s.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey) } else { - hash, err = p.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey) + hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey) } if err != nil { return nil, nil, err @@ -684,7 +684,7 @@ func (p *MessageProcessor) sendPrivateRawMessage(ctx context.Context, rawMessage // sendCommunityRawMessage sends a message not wrapped in an encryption layer // to a community -func (p *MessageProcessor) sendCommunityRawMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) { +func (s *MessageSender) sendCommunityRawMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) { newMessage := &types.NewMessage{ TTL: whisperTTL, Payload: payload, @@ -692,7 +692,7 @@ func (p *MessageProcessor) sendCommunityRawMessage(ctx context.Context, publicKe PowTime: whisperPoWTime, } - hash, err := p.transport.SendCommunityMessage(ctx, newMessage, publicKey) + hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey) if err != nil { return nil, nil, err } @@ -701,23 +701,23 @@ func (p *MessageProcessor) sendCommunityRawMessage(ctx context.Context, publicKe } // sendMessageSpec analyses the spec properties and selects a proper transport method. -func (p *MessageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) { +func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) { newMessage, err := MessageSpecToWhisper(messageSpec) if err != nil { return nil, nil, err } - logger := p.logger.With(zap.String("site", "sendMessageSpec")) + logger := s.logger.With(zap.String("site", "sendMessageSpec")) var hash []byte // process shared secret if messageSpec.AgreedSecret { logger.Debug("sending using shared secret") - hash, err = p.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key) + hash, err = s.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key) } else { logger.Debug("sending partitioned topic") - hash, err = p.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey) + hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey) } if err != nil { return nil, nil, err @@ -729,61 +729,61 @@ func (p *MessageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa MessageIDs: messageIDs, } - p.notifyOnSentMessage(sentMessage) + s.notifyOnSentMessage(sentMessage) return hash, newMessage, nil } // SubscribeToSentMessages returns a channel where we publish every time a message is sent -func (p *MessageProcessor) SubscribeToSentMessages() <-chan *SentMessage { +func (s *MessageSender) SubscribeToSentMessages() <-chan *SentMessage { c := make(chan *SentMessage, 100) - p.sentMessagesSubscriptions = append(p.sentMessagesSubscriptions, c) + s.sentMessagesSubscriptions = append(s.sentMessagesSubscriptions, c) return c } -func (p *MessageProcessor) notifyOnSentMessage(sentMessage *SentMessage) { +func (s *MessageSender) notifyOnSentMessage(sentMessage *SentMessage) { // Publish on channels, drop if buffer is full - for _, c := range p.sentMessagesSubscriptions { + for _, c := range s.sentMessagesSubscriptions { select { case c <- sentMessage: default: - p.logger.Warn("sent messages subscription channel full, dropping message") + s.logger.Warn("sent messages subscription channel full, dropping message") } } } // SubscribeToScheduledMessages returns a channel where we publish every time a message is scheduled for sending -func (p *MessageProcessor) SubscribeToScheduledMessages() <-chan *RawMessage { +func (s *MessageSender) SubscribeToScheduledMessages() <-chan *RawMessage { c := make(chan *RawMessage, 100) - p.scheduledMessagesSubscriptions = append(p.scheduledMessagesSubscriptions, c) + s.scheduledMessagesSubscriptions = append(s.scheduledMessagesSubscriptions, c) return c } -func (p *MessageProcessor) notifyOnScheduledMessage(message *RawMessage) { +func (s *MessageSender) notifyOnScheduledMessage(message *RawMessage) { // Publish on channels, drop if buffer is full - for _, c := range p.scheduledMessagesSubscriptions { + for _, c := range s.scheduledMessagesSubscriptions { select { case c <- message: default: - p.logger.Warn("scheduled messages subscription channel full, dropping message") + s.logger.Warn("scheduled messages subscription channel full, dropping message") } } } -func (p *MessageProcessor) JoinPublic(id string) (*transport.Filter, error) { - return p.transport.JoinPublic(id) +func (s *MessageSender) JoinPublic(id string) (*transport.Filter, error) { + return s.transport.JoinPublic(id) } // AddEphemeralKey adds an ephemeral key that we will be listening to // note that we never removed them from now, as waku/whisper does not // recalculate topics on removal, so effectively there's no benefit. // On restart they will be gone. -func (p *MessageProcessor) AddEphemeralKey(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) { - p.ephemeralKeysMutex.Lock() - p.ephemeralKeys[types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))] = privateKey - p.ephemeralKeysMutex.Unlock() - return p.transport.LoadKeyFilters(privateKey) +func (s *MessageSender) AddEphemeralKey(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) { + s.ephemeralKeysMutex.Lock() + s.ephemeralKeys[types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))] = privateKey + s.ephemeralKeysMutex.Unlock() + return s.transport.LoadKeyFilters(privateKey) } func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) { diff --git a/protocol/common/message_processor_test.go b/protocol/common/message_sender_test.go similarity index 87% rename from protocol/common/message_processor_test.go rename to protocol/common/message_sender_test.go index f0d5fe1f8..5b53d5f99 100644 --- a/protocol/common/message_processor_test.go +++ b/protocol/common/message_sender_test.go @@ -26,20 +26,20 @@ import ( v1protocol "github.com/status-im/status-go/protocol/v1" ) -func TestMessageProcessorSuite(t *testing.T) { - suite.Run(t, new(MessageProcessorSuite)) +func TestMessageSenderSuite(t *testing.T) { + suite.Run(t, new(MessageSenderSuite)) } -type MessageProcessorSuite struct { +type MessageSenderSuite struct { suite.Suite - processor *MessageProcessor + sender *MessageSender tmpDir string testMessage protobuf.ChatMessage logger *zap.Logger } -func (s *MessageProcessorSuite) SetupTest() { +func (s *MessageSenderSuite) SetupTest() { s.testMessage = protobuf.ChatMessage{ Text: "abc123", ChatId: "testing-adamb", @@ -60,7 +60,7 @@ func (s *MessageProcessorSuite) SetupTest() { identity, err := crypto.GenerateKey() s.Require().NoError(err) - database, err := sqlite.Open(filepath.Join(s.tmpDir, "processor-test.sql"), "some-key") + database, err := sqlite.Open(filepath.Join(s.tmpDir, "sender-test.sql"), "some-key") s.Require().NoError(err) encryptionProtocol := encryption.New( @@ -85,7 +85,7 @@ func (s *MessageProcessorSuite) SetupTest() { ) s.Require().NoError(err) - s.processor, err = NewMessageProcessor( + s.sender, err = NewMessageSender( identity, database, encryptionProtocol, @@ -96,12 +96,12 @@ func (s *MessageProcessorSuite) SetupTest() { s.Require().NoError(err) } -func (s *MessageProcessorSuite) TearDownTest() { +func (s *MessageSenderSuite) TearDownTest() { os.Remove(s.tmpDir) _ = s.logger.Sync() } -func (s *MessageProcessorSuite) TestHandleDecodedMessagesWrapped() { +func (s *MessageSenderSuite) TestHandleDecodedMessagesWrapped() { relayerKey, err := crypto.GenerateKey() s.Require().NoError(err) @@ -118,7 +118,7 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesWrapped() { message.Sig = crypto.FromECDSAPub(&relayerKey.PublicKey) message.Payload = wrappedPayload - decodedMessages, _, err := s.processor.HandleMessages(message, true) + decodedMessages, _, err := s.sender.HandleMessages(message, true) s.Require().NoError(err) s.Require().Equal(1, len(decodedMessages)) @@ -130,7 +130,7 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesWrapped() { s.Require().Equal(protobuf.ApplicationMetadataMessage_CHAT_MESSAGE, decodedMessages[0].Type) } -func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasync() { +func (s *MessageSenderSuite) TestHandleDecodedMessagesDatasync() { relayerKey, err := crypto.GenerateKey() s.Require().NoError(err) @@ -154,7 +154,7 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasync() { message.Sig = crypto.FromECDSAPub(&relayerKey.PublicKey) message.Payload = marshalledDataSyncMessage - decodedMessages, _, err := s.processor.HandleMessages(message, true) + decodedMessages, _, err := s.sender.HandleMessages(message, true) s.Require().NoError(err) // We send two messages, the unwrapped one will be attributed to the relayer, while the wrapped one will be attributed to the author @@ -167,14 +167,14 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasync() { s.Require().Equal(protobuf.ApplicationMetadataMessage_CHAT_MESSAGE, decodedMessages[0].Type) } -func (s *MessageProcessorSuite) CalculatePoWTest() { +func (s *MessageSenderSuite) CalculatePoWTest() { largeSizePayload := make([]byte, largeSizeInBytes) s.Require().Equal(whisperLargeSizePoW, calculatePoW(largeSizePayload)) normalSizePayload := make([]byte, largeSizeInBytes-1) s.Require().Equal(whisperDefaultPoW, calculatePoW(normalSizePayload)) } -func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasyncEncrypted() { +func (s *MessageSenderSuite) TestHandleDecodedMessagesDatasyncEncrypted() { relayerKey, err := crypto.GenerateKey() s.Require().NoError(err) @@ -206,7 +206,7 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasyncEncrypted() { messageSpec, err := senderEncryptionProtocol.BuildDirectMessage( relayerKey, - &s.processor.identity.PublicKey, + &s.sender.identity.PublicKey, marshalledDataSyncMessage, ) s.Require().NoError(err) @@ -218,7 +218,7 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasyncEncrypted() { message.Sig = crypto.FromECDSAPub(&relayerKey.PublicKey) message.Payload = encryptedPayload - decodedMessages, _, err := s.processor.HandleMessages(message, true) + decodedMessages, _, err := s.sender.HandleMessages(message, true) s.Require().NoError(err) // We send two messages, the unwrapped one will be attributed to the relayer, diff --git a/protocol/datasync/transport.go b/protocol/datasync/transport.go index ca10648d0..fa05b5bff 100644 --- a/protocol/datasync/transport.go +++ b/protocol/datasync/transport.go @@ -72,7 +72,7 @@ func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf continue } - data, err := proto.Marshal(payload) + marshalledPayload, err := proto.Marshal(payload) if err != nil { t.logger.Error("failed to marshal payload") continue @@ -85,7 +85,7 @@ func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf } // We don't return an error otherwise datasync will keep // re-trying sending at each epoch - err = t.dispatch(context.Background(), publicKey, data, payload) + err = t.dispatch(context.Background(), publicKey, marshalledPayload, payload) if err != nil { t.logger.Error("failed to send message", zap.Error(err)) continue diff --git a/protocol/encryption/protocol.go b/protocol/encryption/protocol.go index 8ef81b7fc..95812795c 100644 --- a/protocol/encryption/protocol.go +++ b/protocol/encryption/protocol.go @@ -189,7 +189,7 @@ func (p *Protocol) BuildDirectMessage(myIdentityKey *ecdsa.PrivateKey, publicKey } // Encrypt payload - directMessage, installations, err := p.encryptor.EncryptPayload(publicKey, myIdentityKey, activeInstallations, payload) + directMessagesByInstalls, installations, err := p.encryptor.EncryptPayload(publicKey, myIdentityKey, activeInstallations, payload) if err != nil { return nil, err } @@ -197,7 +197,7 @@ func (p *Protocol) BuildDirectMessage(myIdentityKey *ecdsa.PrivateKey, publicKey // Build message message := &ProtocolMessage{ InstallationId: p.encryptor.config.InstallationID, - DirectMessage: directMessage, + DirectMessage: directMessagesByInstalls, } err = p.addBundle(myIdentityKey, message) diff --git a/protocol/messenger.go b/protocol/messenger.go index fac2a2e2c..1ec576c52 100644 --- a/protocol/messenger.go +++ b/protocol/messenger.go @@ -81,7 +81,7 @@ type Messenger struct { persistence *sqlitePersistence transport *transport.Transport encryptor *encryption.Protocol - processor *common.MessageProcessor + sender *common.MessageSender handler *MessageHandler ensVerifier *ens.Verifier pushNotificationClient *pushnotificationclient.Client @@ -249,7 +249,7 @@ func NewMessenger( logger, ) - processor, err := common.NewMessageProcessor( + sender, err := common.NewMessageSender( identity, database, encryptionProtocol, @@ -258,7 +258,7 @@ func NewMessenger( c.featureFlags, ) if err != nil { - return nil, errors.Wrap(err, "failed to create messageProcessor") + return nil, errors.Wrap(err, "failed to create messageSender") } // Initialize push notification server @@ -266,7 +266,7 @@ func NewMessenger( if c.pushNotificationServerConfig != nil && c.pushNotificationServerConfig.Enabled { c.pushNotificationServerConfig.Identity = identity pushNotificationServerPersistence := pushnotificationserver.NewSQLitePersistence(database) - pushNotificationServer = pushnotificationserver.New(c.pushNotificationServerConfig, pushNotificationServerPersistence, processor) + pushNotificationServer = pushnotificationserver.New(c.pushNotificationServerConfig, pushNotificationServerPersistence, sender) } // Initialize push notification client @@ -282,7 +282,7 @@ func NewMessenger( pushNotificationClientConfig.Logger = logger pushNotificationClientConfig.InstallationID = installationID - pushNotificationClient := pushnotificationclient.New(pushNotificationClientPersistence, pushNotificationClientConfig, processor, sqlitePersistence) + pushNotificationClient := pushnotificationclient.New(pushNotificationClientPersistence, pushNotificationClientConfig, sender, sqlitePersistence) ensVerifier := ens.New(node, logger, transp, database, c.verifyENSURL, c.verifyENSContractAddress) @@ -300,7 +300,7 @@ func NewMessenger( persistence: sqlitePersistence, transport: transp, encryptor: encryptionProtocol, - processor: processor, + sender: sender, handler: handler, pushNotificationClient: pushNotificationClient, pushNotificationServer: pushNotificationServer, @@ -328,7 +328,7 @@ func NewMessenger( encryptionProtocol.Stop, transp.ResetFilters, transp.Stop, - func() error { processor.Stop(); return nil }, + func() error { sender.Stop(); return nil }, // Currently this often fails, seems like it's safe to ignore them // https://github.com/uber-go/zap/issues/328 func() error { _ = logger.Sync; return nil }, @@ -444,7 +444,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) { } // set shared secret handles - m.processor.SetHandleSharedSecrets(m.handleSharedSecrets) + m.sender.SetHandleSharedSecrets(m.handleSharedSecrets) subscriptions, err := m.encryptor.Start(m.identity) if err != nil { @@ -572,7 +572,7 @@ func (m *Messenger) publishContactCode() error { } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - _, err = m.processor.SendPublic(ctx, contactCodeTopic, rawMessage) + _, err = m.sender.SendPublic(ctx, contactCodeTopic, rawMessage) if err != nil { m.logger.Warn("failed to send a contact code", zap.Error(err)) } @@ -643,7 +643,7 @@ func (m *Messenger) handleStandaloneChatIdentity(chat *Chat) error { } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() - _, err = m.processor.SendPublic(ctx, chat.ID, rawMessage) + _, err = m.sender.SendPublic(ctx, chat.ID, rawMessage) if err != nil { return err } @@ -1200,7 +1200,7 @@ func (m *Messenger) CreateGroupChatWithMembers(ctx context.Context, name string, return nil, err } - encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil) + encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil) if err != nil { return nil, err } @@ -1279,7 +1279,7 @@ func (m *Messenger) RemoveMemberFromGroupChat(ctx context.Context, chatID string return nil, err } - encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil) + encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil) if err != nil { return nil, err } @@ -1364,7 +1364,7 @@ func (m *Messenger) AddMembersToGroupChat(ctx context.Context, chatID string, me return nil, err } - encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil) + encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil) if err != nil { return nil, err } @@ -1425,7 +1425,7 @@ func (m *Messenger) ChangeGroupChatName(ctx context.Context, chatID string, name return nil, err } - encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil) + encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil) if err != nil { return nil, err } @@ -1500,7 +1500,7 @@ func (m *Messenger) SendGroupChatInvitationRequest(ctx context.Context, chatID s return nil, err } - id, err := m.processor.SendPrivate(ctx, adminpk, &spec) + id, err := m.sender.SendPrivate(ctx, adminpk, &spec) if err != nil { return nil, err } @@ -1568,7 +1568,7 @@ func (m *Messenger) SendGroupChatInvitationRejection(ctx context.Context, invita return nil, err } - id, err := m.processor.SendPrivate(ctx, userpk, &spec) + id, err := m.sender.SendPrivate(ctx, userpk, &spec) if err != nil { return nil, err } @@ -1626,7 +1626,7 @@ func (m *Messenger) AddAdminsToGroupChat(ctx context.Context, chatID string, mem return nil, err } - encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil) + encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil) if err != nil { return nil, err } @@ -1690,7 +1690,7 @@ func (m *Messenger) ConfirmJoiningGroup(ctx context.Context, chatID string) (*Me return nil, err } - encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil) + encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil) if err != nil { return nil, err } @@ -1753,7 +1753,7 @@ func (m *Messenger) LeaveGroupChat(ctx context.Context, chatID string, remove bo return nil, err } - encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil) + encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil) if err != nil { return nil, err } @@ -1837,7 +1837,7 @@ func (m *Messenger) sendToPairedDevices(ctx context.Context, spec common.RawMess hasPairedDevices := m.hasPairedDevices() // We send a message to any paired device if hasPairedDevices { - _, err := m.processor.SendPrivate(ctx, &m.identity.PublicKey, &spec) + _, err := m.sender.SendPrivate(ctx, &m.identity.PublicKey, &spec) if err != nil { return err } @@ -1849,7 +1849,7 @@ func (m *Messenger) dispatchPairInstallationMessage(ctx context.Context, spec co var err error var id []byte - id, err = m.processor.SendPairInstallation(ctx, &m.identity.PublicKey, spec) + id, err = m.sender.SendPairInstallation(ctx, &m.identity.PublicKey, spec) if err != nil { return nil, err @@ -1884,7 +1884,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, spec common.RawMessage) //message for sending to paired devices later specCopyForPairedDevices := spec if !common.IsPubKeyEqual(publicKey, &m.identity.PublicKey) { - id, err = m.processor.SendPrivate(ctx, publicKey, &spec) + id, err = m.sender.SendPrivate(ctx, publicKey, &spec) if err != nil { return spec, err @@ -1899,7 +1899,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, spec common.RawMessage) case ChatTypePublic, ChatTypeProfile: logger.Debug("sending public message", zap.String("chatName", chat.Name)) - id, err = m.processor.SendPublic(ctx, chat.ID, spec) + id, err = m.sender.SendPublic(ctx, chat.ID, spec) if err != nil { return spec, err } @@ -1918,7 +1918,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, spec common.RawMessage) } logger.Debug("sending community chat message", zap.String("chatName", chat.Name)) - id, err = m.processor.SendPublic(ctx, chat.ID, spec) + id, err = m.sender.SendPublic(ctx, chat.ID, spec) if err != nil { return spec, err } @@ -1967,7 +1967,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, spec common.RawMessage) spec.MessageType = protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE } - id, err = m.processor.SendGroup(ctx, spec.Recipients, spec) + id, err = m.sender.SendGroup(ctx, spec.Recipients, spec) if err != nil { return spec, err } @@ -2479,7 +2479,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte for _, shhMessage := range messages { // Indicates tha all messages in the batch have been processed correctly allMessagesProcessed := true - statusMessages, acks, err := m.processor.HandleMessages(shhMessage, true) + statusMessages, acks, err := m.sender.HandleMessages(shhMessage, true) if err != nil { logger.Info("failed to decode messages", zap.Error(err)) continue @@ -4083,7 +4083,7 @@ func (m *Messenger) StartPushNotificationsServer() error { Logger: m.logger, Identity: m.identity, } - m.pushNotificationServer = pushnotificationserver.New(config, pushNotificationServerPersistence, m.processor) + m.pushNotificationServer = pushnotificationserver.New(config, pushNotificationServerPersistence, m.sender) } return m.pushNotificationServer.Start() @@ -4280,7 +4280,7 @@ func (m *Messenger) encodeChatEntity(chat *Chat, message common.ChatEntity) ([]b return nil, err } - encodedMessage, err = m.processor.EncodeAbridgedMembershipUpdate(group, message) + encodedMessage, err = m.sender.EncodeAbridgedMembershipUpdate(group, message) if err != nil { return nil, err } diff --git a/protocol/messenger_communities.go b/protocol/messenger_communities.go index 3e41e2c23..164f96b34 100644 --- a/protocol/messenger_communities.go +++ b/protocol/messenger_communities.go @@ -34,7 +34,7 @@ func (m *Messenger) publishOrg(org *communities.Community) error { SkipEncryption: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION, } - _, err = m.processor.SendPublic(context.Background(), org.IDString(), rawMessage) + _, err = m.sender.SendPublic(context.Background(), org.IDString(), rawMessage) return err } @@ -57,7 +57,7 @@ func (m *Messenger) publishOrgInvitation(org *communities.Community, invitation SkipEncryption: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_INVITATION, } - _, err = m.processor.SendPrivate(context.Background(), pk, &rawMessage) + _, err = m.sender.SendPrivate(context.Background(), pk, &rawMessage) return err } @@ -210,7 +210,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun SkipEncryption: true, MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN, } - _, err = m.processor.SendCommunityMessage(context.Background(), community.PublicKey(), rawMessage) + _, err = m.sender.SendCommunityMessage(context.Background(), community.PublicKey(), rawMessage) if err != nil { return nil, err } diff --git a/protocol/messenger_test.go b/protocol/messenger_test.go index 32b28c8fd..c1a3c043a 100644 --- a/protocol/messenger_test.go +++ b/protocol/messenger_test.go @@ -2532,7 +2532,7 @@ func (s *MessageHandlerSuite) TestRun() { // ChatID is not set at the beginning. s.Empty(message.LocalChatID) - message.ID = strconv.Itoa(idx) // manually set the ID because messages does not go through messageProcessor + message.ID = strconv.Itoa(idx) // manually set the ID because messages does not go through messageSender chat, err := s.messageHandler.matchChatEntity(&message, chatsMap, contactsMap, &testTimeSource{}) if tc.Error { s.Require().Error(err) diff --git a/protocol/pushnotificationclient/client.go b/protocol/pushnotificationclient/client.go index 353ac1df6..d3e5045be 100644 --- a/protocol/pushnotificationclient/client.go +++ b/protocol/pushnotificationclient/client.go @@ -179,8 +179,8 @@ type Client struct { // randomReader only used for testing so we have deterministic encryption reader io.Reader - //messageProcessor is a message processor used to send and being notified of messages - messageProcessor *common.MessageProcessor + //messageSender used to send and being notified of messages + messageSender *common.MessageSender // registrationLoopQuitChan is a channel to indicate to the registration loop that should be terminating registrationLoopQuitChan chan struct{} @@ -194,11 +194,11 @@ type Client struct { registrationSubscriptions []chan struct{} } -func New(persistence *Persistence, config *Config, processor *common.MessageProcessor, messagePersistence MessagePersistence) *Client { +func New(persistence *Persistence, config *Config, sender *common.MessageSender, messagePersistence MessagePersistence) *Client { return &Client{ quit: make(chan struct{}), config: config, - messageProcessor: processor, + messageSender: sender, messagePersistence: messagePersistence, persistence: persistence, reader: rand.Reader, @@ -206,8 +206,8 @@ func New(persistence *Persistence, config *Config, processor *common.MessageProc } func (c *Client) Start() error { - if c.messageProcessor == nil { - return errors.New("can't start, missing message processor") + if c.messageSender == nil { + return errors.New("can't start, missing message sender") } err := c.loadLastPushNotificationRegistration() @@ -685,8 +685,8 @@ func (c *Client) generateSharedKey(publicKey *ecdsa.PublicKey) ([]byte, error) { func (c *Client) subscribeForMessageEvents() { go func() { c.config.Logger.Debug("subscribing for message events") - sentMessagesSubscription := c.messageProcessor.SubscribeToSentMessages() - scheduledMessagesSubscription := c.messageProcessor.SubscribeToScheduledMessages() + sentMessagesSubscription := c.messageSender.SubscribeToSentMessages() + scheduledMessagesSubscription := c.messageSender.SubscribeToScheduledMessages() for { select { // order is important, since both are asynchronous, we want to process @@ -1273,7 +1273,7 @@ func (c *Client) registerWithServer(registration *protobuf.PushNotificationRegis SkipEncryption: true, } - _, err = c.messageProcessor.SendPrivate(context.Background(), server.PublicKey, &rawMessage) + _, err = c.messageSender.SendPrivate(context.Background(), server.PublicKey, &rawMessage) if err != nil { return err @@ -1336,7 +1336,7 @@ func (c *Client) SendNotification(publicKey *ecdsa.PublicKey, installationIDs [] if err != nil { return nil, err } - _, err = c.messageProcessor.AddEphemeralKey(ephemeralKey) + _, err = c.messageSender.AddEphemeralKey(ephemeralKey) if err != nil { return nil, err } @@ -1377,7 +1377,7 @@ func (c *Client) SendNotification(publicKey *ecdsa.PublicKey, installationIDs [] MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_REQUEST, } - _, err = c.messageProcessor.SendPrivate(context.Background(), serverPublicKey, &rawMessage) + _, err = c.messageSender.SendPrivate(context.Background(), serverPublicKey, &rawMessage) if err != nil { return nil, err @@ -1657,14 +1657,14 @@ func (c *Client) queryPushNotificationInfo(publicKey *ecdsa.PublicKey) error { MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_QUERY, } - _, err = c.messageProcessor.AddEphemeralKey(ephemeralKey) + _, err = c.messageSender.AddEphemeralKey(ephemeralKey) if err != nil { return err } // this is the topic of message encodedPublicKey := hex.EncodeToString(hashedPublicKey) - messageID, err := c.messageProcessor.SendPublic(context.Background(), encodedPublicKey, rawMessage) + messageID, err := c.messageSender.SendPublic(context.Background(), encodedPublicKey, rawMessage) if err != nil { return err diff --git a/protocol/pushnotificationserver/server.go b/protocol/pushnotificationserver/server.go index 6cde204cc..83e8f03a9 100644 --- a/protocol/pushnotificationserver/server.go +++ b/protocol/pushnotificationserver/server.go @@ -34,19 +34,19 @@ type Config struct { } type Server struct { - persistence Persistence - config *Config - messageProcessor *common.MessageProcessor + persistence Persistence + config *Config + messageSender *common.MessageSender // SentRequests keeps track of the requests sent to gorush, for testing only SentRequests int64 } -func New(config *Config, persistence Persistence, messageProcessor *common.MessageProcessor) *Server { +func New(config *Config, persistence Persistence, messageSender *common.MessageSender) *Server { if len(config.GorushURL) == 0 { config.GorushURL = defaultGorushURL } - return &Server{persistence: persistence, config: config, messageProcessor: messageProcessor} + return &Server{persistence: persistence, config: config, messageSender: messageSender} } func (s *Server) Start() error { @@ -112,7 +112,7 @@ func (s *Server) HandlePushNotificationRegistration(publicKey *ecdsa.PublicKey, SkipEncryption: true, } - _, err = s.messageProcessor.SendPrivate(context.Background(), publicKey, &rawMessage) + _, err = s.messageSender.SendPrivate(context.Background(), publicKey, &rawMessage) return err } @@ -135,7 +135,7 @@ func (s *Server) HandlePushNotificationQuery(publicKey *ecdsa.PublicKey, message SkipEncryption: true, } - _, err = s.messageProcessor.SendPrivate(context.Background(), publicKey, &rawMessage) + _, err = s.messageSender.SendPrivate(context.Background(), publicKey, &rawMessage) return err } @@ -178,7 +178,7 @@ func (s *Server) HandlePushNotificationRequest(publicKey *ecdsa.PublicKey, SkipEncryption: true, } - _, err = s.messageProcessor.SendPrivate(context.Background(), publicKey, &rawMessage) + _, err = s.messageSender.SendPrivate(context.Background(), publicKey, &rawMessage) return err } @@ -459,11 +459,11 @@ func (s *Server) sendPushNotification(requestAndRegistrations []*RequestAndRegis // listenToPublicKeyQueryTopic listen to a topic derived from the hashed public key func (s *Server) listenToPublicKeyQueryTopic(hashedPublicKey []byte) error { - if s.messageProcessor == nil { + if s.messageSender == nil { return nil } encodedPublicKey := hex.EncodeToString(hashedPublicKey) - _, err := s.messageProcessor.JoinPublic(encodedPublicKey) + _, err := s.messageSender.JoinPublic(encodedPublicKey) return err }