Rename MessageProcessor to MessageSender (#2264)

This commit is contained in:
Volodymyr Kozieiev 2021-06-23 17:13:48 +03:00 committed by GitHub
parent 81171ad9e6
commit 0e538c0a95
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 205 additions and 205 deletions

View File

@ -1 +1 @@
0.80.00
0.80.1

View File

@ -43,7 +43,7 @@ type SentMessage struct {
MessageIDs [][]byte
}
type MessageProcessor struct {
type MessageSender struct {
identity *ecdsa.PrivateKey
datasync *datasync.DataSync
protocol *encryption.Protocol
@ -67,14 +67,14 @@ type MessageProcessor struct {
handleSharedSecrets func([]*sharedsecret.Secret) error
}
func NewMessageProcessor(
func NewMessageSender(
identity *ecdsa.PrivateKey,
database *sql.DB,
enc *encryption.Protocol,
transport *transport.Transport,
logger *zap.Logger,
features FeatureFlags,
) (*MessageProcessor, error) {
) (*MessageSender, error) {
dataSyncTransport := datasync.NewNodeTransport()
dataSyncNode, err := datasyncnode.NewPersistentNode(
database,
@ -89,7 +89,7 @@ func NewMessageProcessor(
}
ds := datasync.New(dataSyncNode, dataSyncTransport, features.Datasync, logger)
p := &MessageProcessor{
p := &MessageSender{
identity: identity,
datasync: ds,
protocol: enc,
@ -116,25 +116,25 @@ func NewMessageProcessor(
return p, nil
}
func (p *MessageProcessor) Stop() {
for _, c := range p.sentMessagesSubscriptions {
func (s *MessageSender) Stop() {
for _, c := range s.sentMessagesSubscriptions {
close(c)
}
p.sentMessagesSubscriptions = nil
p.datasync.Stop() // idempotent op
s.sentMessagesSubscriptions = nil
s.datasync.Stop() // idempotent op
}
func (p *MessageProcessor) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) {
p.handleSharedSecrets = handler
func (s *MessageSender) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) {
s.handleSharedSecrets = handler
}
// SendPrivate takes encoded data, encrypts it and sends through the wire.
func (p *MessageProcessor) SendPrivate(
func (s *MessageSender) SendPrivate(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage *RawMessage,
) ([]byte, error) {
p.logger.Debug(
s.logger.Debug(
"sending a private message",
zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))),
zap.String("site", "SendPrivate"),
@ -148,47 +148,47 @@ func (p *MessageProcessor) SendPrivate(
// Set sender identity if not specified
if rawMessage.Sender == nil {
rawMessage.Sender = p.identity
rawMessage.Sender = s.identity
}
return p.sendPrivate(ctx, recipient, rawMessage)
return s.sendPrivate(ctx, recipient, rawMessage)
}
// SendCommunityMessage takes encoded data, encrypts it and sends through the wire
// using the community topic and their key
func (p *MessageProcessor) SendCommunityMessage(
func (s *MessageSender) SendCommunityMessage(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage RawMessage,
) ([]byte, error) {
p.logger.Debug(
s.logger.Debug(
"sending a community message",
zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))),
zap.String("site", "SendPrivate"),
)
rawMessage.Sender = p.identity
rawMessage.Sender = s.identity
return p.sendCommunity(ctx, recipient, &rawMessage)
return s.sendCommunity(ctx, recipient, &rawMessage)
}
// SendGroup takes encoded data, encrypts it and sends through the wire,
// always return the messageID
func (p *MessageProcessor) SendGroup(
func (s *MessageSender) SendGroup(
ctx context.Context,
recipients []*ecdsa.PublicKey,
rawMessage RawMessage,
) ([]byte, error) {
p.logger.Debug(
s.logger.Debug(
"sending a private group message",
zap.String("site", "SendGroup"),
)
// Set sender if not specified
if rawMessage.Sender == nil {
rawMessage.Sender = p.identity
rawMessage.Sender = s.identity
}
// Calculate messageID first and set on raw message
wrappedMessage, err := p.wrapMessageV1(&rawMessage)
wrappedMessage, err := s.wrapMessageV1(&rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
@ -197,7 +197,7 @@ func (p *MessageProcessor) SendGroup(
// Send to each recipients
for _, recipient := range recipients {
_, err = p.sendPrivate(ctx, recipient, &rawMessage)
_, err = s.sendPrivate(ctx, recipient, &rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to send message")
}
@ -206,14 +206,14 @@ func (p *MessageProcessor) SendGroup(
}
// sendCommunity sends data to the recipient identifying with a given public key.
func (p *MessageProcessor) sendCommunity(
func (s *MessageSender) sendCommunity(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage *RawMessage,
) ([]byte, error) {
p.logger.Debug("sending community message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
s.logger.Debug("sending community message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
wrappedMessage, err := p.wrapMessageV1(rawMessage)
wrappedMessage, err := s.wrapMessageV1(rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
@ -223,29 +223,29 @@ func (p *MessageProcessor) sendCommunity(
// Notify before dispatching, otherwise the dispatch subscription might happen
// earlier than the scheduled
p.notifyOnScheduledMessage(rawMessage)
s.notifyOnScheduledMessage(rawMessage)
messageIDs := [][]byte{messageID}
hash, newMessage, err := p.sendCommunityRawMessage(ctx, recipient, wrappedMessage, messageIDs)
hash, newMessage, err := s.sendCommunityRawMessage(ctx, recipient, wrappedMessage, messageIDs)
if err != nil {
p.logger.Error("failed to send a community message", zap.Error(err))
s.logger.Error("failed to send a community message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec")
}
p.transport.Track(messageIDs, hash, newMessage)
s.transport.Track(messageIDs, hash, newMessage)
return messageID, nil
}
// sendPrivate sends data to the recipient identifying with a given public key.
func (p *MessageProcessor) sendPrivate(
func (s *MessageSender) sendPrivate(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage *RawMessage,
) ([]byte, error) {
p.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
wrappedMessage, err := p.wrapMessageV1(rawMessage)
wrappedMessage, err := s.wrapMessageV1(rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
@ -255,24 +255,24 @@ func (p *MessageProcessor) sendPrivate(
// Notify before dispatching, otherwise the dispatch subscription might happen
// earlier than the scheduled
p.notifyOnScheduledMessage(rawMessage)
s.notifyOnScheduledMessage(rawMessage)
if p.featureFlags.Datasync && rawMessage.ResendAutomatically {
if s.featureFlags.Datasync && rawMessage.ResendAutomatically {
// No need to call transport tracking.
// It is done in a data sync dispatch step.
datasyncID, err := p.addToDataSync(recipient, wrappedMessage)
datasyncID, err := s.addToDataSync(recipient, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to send message with datasync")
}
// We don't need to receive confirmations from our own devices
if !IsPubKeyEqual(recipient, &p.identity.PublicKey) {
if !IsPubKeyEqual(recipient, &s.identity.PublicKey) {
confirmation := &RawMessageConfirmation{
DataSyncID: datasyncID,
MessageID: messageID,
PublicKey: crypto.CompressPubkey(recipient),
}
err = p.persistence.InsertPendingConfirmation(confirmation)
err = s.persistence.InsertPendingConfirmation(confirmation)
if err != nil {
return nil, err
}
@ -280,24 +280,24 @@ func (p *MessageProcessor) sendPrivate(
} else if rawMessage.SkipEncryption {
// When SkipEncryption is set we don't pass the message to the encryption layer
messageIDs := [][]byte{messageID}
hash, newMessage, err := p.sendPrivateRawMessage(ctx, rawMessage, recipient, wrappedMessage, messageIDs)
hash, newMessage, err := s.sendPrivateRawMessage(ctx, rawMessage, recipient, wrappedMessage, messageIDs)
if err != nil {
p.logger.Error("failed to send a private message", zap.Error(err))
s.logger.Error("failed to send a private message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec")
}
p.transport.Track(messageIDs, hash, newMessage)
s.transport.Track(messageIDs, hash, newMessage)
} else {
messageSpec, err := p.protocol.BuildDirectMessage(rawMessage.Sender, recipient, wrappedMessage)
messageSpec, err := s.protocol.BuildDirectMessage(rawMessage.Sender, recipient, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to encrypt message")
}
// The shared secret needs to be handle before we send a message
// otherwise the topic might not be set up before we receive a message
if p.handleSharedSecrets != nil {
err := p.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
if s.handleSharedSecrets != nil {
err := s.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
if err != nil {
return nil, err
}
@ -305,50 +305,50 @@ func (p *MessageProcessor) sendPrivate(
}
messageIDs := [][]byte{messageID}
hash, newMessage, err := p.sendMessageSpec(ctx, recipient, messageSpec, messageIDs)
hash, newMessage, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs)
if err != nil {
p.logger.Error("failed to send a private message", zap.Error(err))
s.logger.Error("failed to send a private message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec")
}
p.transport.Track(messageIDs, hash, newMessage)
s.transport.Track(messageIDs, hash, newMessage)
}
return messageID, nil
}
// sendPairInstallation sends data to the recipients, using DH
func (p *MessageProcessor) SendPairInstallation(
func (s *MessageSender) SendPairInstallation(
ctx context.Context,
recipient *ecdsa.PublicKey,
rawMessage RawMessage,
) ([]byte, error) {
p.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
wrappedMessage, err := p.wrapMessageV1(&rawMessage)
wrappedMessage, err := s.wrapMessageV1(&rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
messageSpec, err := p.protocol.BuildDHMessage(p.identity, recipient, wrappedMessage)
messageSpec, err := s.protocol.BuildDHMessage(s.identity, recipient, wrappedMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to encrypt message")
}
messageID := v1protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
messageID := v1protocol.MessageID(&s.identity.PublicKey, wrappedMessage)
messageIDs := [][]byte{messageID}
hash, newMessage, err := p.sendMessageSpec(ctx, recipient, messageSpec, messageIDs)
hash, newMessage, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs)
if err != nil {
return nil, errors.Wrap(err, "failed to send a message spec")
}
p.transport.Track(messageIDs, hash, newMessage)
s.transport.Track(messageIDs, hash, newMessage)
return messageID, nil
}
func (p *MessageProcessor) encodeMembershipUpdate(
func (s *MessageSender) encodeMembershipUpdate(
message v1protocol.MembershipUpdateMessage,
chatEntity ChatEntity,
) ([]byte, error) {
@ -374,7 +374,7 @@ func (p *MessageProcessor) encodeMembershipUpdate(
// EncodeMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
// All the events in a group are encoded and added to the payload
func (p *MessageProcessor) EncodeMembershipUpdate(
func (s *MessageSender) EncodeMembershipUpdate(
group *v1protocol.Group,
chatEntity ChatEntity,
) ([]byte, error) {
@ -383,43 +383,43 @@ func (p *MessageProcessor) EncodeMembershipUpdate(
Events: group.Events(),
}
return p.encodeMembershipUpdate(message, chatEntity)
return s.encodeMembershipUpdate(message, chatEntity)
}
// EncodeAbridgedMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
// Only the events relevant to the sender are encoded
func (p *MessageProcessor) EncodeAbridgedMembershipUpdate(
func (s *MessageSender) EncodeAbridgedMembershipUpdate(
group *v1protocol.Group,
chatEntity ChatEntity,
) ([]byte, error) {
message := v1protocol.MembershipUpdateMessage{
ChatID: group.ChatID(),
Events: group.AbridgedEvents(&p.identity.PublicKey),
Events: group.AbridgedEvents(&s.identity.PublicKey),
}
return p.encodeMembershipUpdate(message, chatEntity)
return s.encodeMembershipUpdate(message, chatEntity)
}
// SendPublic takes encoded data, encrypts it and sends through the wire.
func (p *MessageProcessor) SendPublic(
func (s *MessageSender) SendPublic(
ctx context.Context,
chatName string,
rawMessage RawMessage,
) ([]byte, error) {
// Set sender
if rawMessage.Sender == nil {
rawMessage.Sender = p.identity
rawMessage.Sender = s.identity
}
wrappedMessage, err := p.wrapMessageV1(&rawMessage)
wrappedMessage, err := s.wrapMessageV1(&rawMessage)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
}
var newMessage *types.NewMessage
messageSpec, err := p.protocol.BuildPublicMessage(p.identity, wrappedMessage)
messageSpec, err := s.protocol.BuildPublicMessage(s.identity, wrappedMessage)
if err != nil {
p.logger.Error("failed to send a public message", zap.Error(err))
s.logger.Error("failed to send a public message", zap.Error(err))
return nil, errors.Wrap(err, "failed to wrap a public message in the encryption layer")
}
@ -441,9 +441,9 @@ func (p *MessageProcessor) SendPublic(
rawMessage.ID = types.EncodeHex(messageID)
// notify before dispatching
p.notifyOnScheduledMessage(&rawMessage)
s.notifyOnScheduledMessage(&rawMessage)
hash, err := p.transport.SendPublic(ctx, newMessage, chatName)
hash, err := s.transport.SendPublic(ctx, newMessage, chatName)
if err != nil {
return nil, err
}
@ -453,9 +453,9 @@ func (p *MessageProcessor) SendPublic(
MessageIDs: [][]byte{messageID},
}
p.notifyOnSentMessage(sentMessage)
s.notifyOnSentMessage(sentMessage)
p.transport.Track([][]byte{messageID}, hash, newMessage)
s.transport.Track([][]byte{messageID}, hash, newMessage)
return messageID, nil
}
@ -489,8 +489,8 @@ func unwrapDatasyncMessage(m *v1protocol.StatusMessage, datasync *datasync.DataS
// layer message, or in case of Raw methods, the processing stops at the layer
// before.
// It returns an error only if the processing of required steps failed.
func (p *MessageProcessor) HandleMessages(shhMessage *types.Message, applicationLayer bool) ([]*v1protocol.StatusMessage, [][]byte, error) {
logger := p.logger.With(zap.String("site", "handleMessages"))
func (s *MessageSender) HandleMessages(shhMessage *types.Message, applicationLayer bool) ([]*v1protocol.StatusMessage, [][]byte, error) {
logger := s.logger.With(zap.String("site", "handleMessages"))
hlogger := logger.With(zap.ByteString("hash", shhMessage.Hash))
var statusMessage v1protocol.StatusMessage
var statusMessages []*v1protocol.StatusMessage
@ -501,12 +501,12 @@ func (p *MessageProcessor) HandleMessages(shhMessage *types.Message, application
return nil, nil, err
}
err = p.handleEncryptionLayer(context.Background(), &statusMessage)
err = s.handleEncryptionLayer(context.Background(), &statusMessage)
if err != nil {
hlogger.Debug("failed to handle an encryption message", zap.Error(err))
}
statusMessages, acks, err := unwrapDatasyncMessage(&statusMessage, p.datasync)
statusMessages, acks, err := unwrapDatasyncMessage(&statusMessage, s.datasync)
if err != nil {
hlogger.Debug("failed to handle datasync message", zap.Error(err))
//that wasn't a datasync message, so use the original payload
@ -531,32 +531,32 @@ func (p *MessageProcessor) HandleMessages(shhMessage *types.Message, application
}
// fetchDecryptionKey returns the private key associated with this public key, and returns true if it's an ephemeral key
func (p *MessageProcessor) fetchDecryptionKey(destination *ecdsa.PublicKey) (*ecdsa.PrivateKey, bool) {
func (s *MessageSender) fetchDecryptionKey(destination *ecdsa.PublicKey) (*ecdsa.PrivateKey, bool) {
destinationID := types.EncodeHex(crypto.FromECDSAPub(destination))
p.ephemeralKeysMutex.Lock()
decryptionKey, ok := p.ephemeralKeys[destinationID]
p.ephemeralKeysMutex.Unlock()
s.ephemeralKeysMutex.Lock()
decryptionKey, ok := s.ephemeralKeys[destinationID]
s.ephemeralKeysMutex.Unlock()
// the key is not there, fallback on identity
if !ok {
return p.identity, false
return s.identity, false
}
return decryptionKey, true
}
func (p *MessageProcessor) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error {
logger := p.logger.With(zap.String("site", "handleEncryptionLayer"))
func (s *MessageSender) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error {
logger := s.logger.With(zap.String("site", "handleEncryptionLayer"))
publicKey := message.SigPubKey()
// if it's an ephemeral key, we don't negotiate a topic
decryptionKey, skipNegotiation := p.fetchDecryptionKey(message.Dst)
decryptionKey, skipNegotiation := s.fetchDecryptionKey(message.Dst)
err := message.HandleEncryption(decryptionKey, publicKey, p.protocol, skipNegotiation)
err := message.HandleEncryption(decryptionKey, publicKey, s.protocol, skipNegotiation)
// if it's an ephemeral key, we don't have to handle a device not found error
if err == encryption.ErrDeviceNotFound && !skipNegotiation {
if err := p.handleErrDeviceNotFound(ctx, publicKey); err != nil {
if err := s.handleErrDeviceNotFound(ctx, publicKey); err != nil {
logger.Error("failed to handle ErrDeviceNotFound", zap.Error(err))
}
}
@ -567,9 +567,9 @@ func (p *MessageProcessor) handleEncryptionLayer(ctx context.Context, message *v
return nil
}
func (p *MessageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
func (s *MessageSender) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
now := time.Now().Unix()
advertise, err := p.protocol.ShouldAdvertiseBundle(publicKey, now)
advertise, err := s.protocol.ShouldAdvertiseBundle(publicKey, now)
if err != nil {
return err
}
@ -577,7 +577,7 @@ func (p *MessageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKe
return nil
}
messageSpec, err := p.protocol.BuildBundleAdvertiseMessage(p.identity, publicKey)
messageSpec, err := s.protocol.BuildBundleAdvertiseMessage(s.identity, publicKey)
if err != nil {
return err
}
@ -586,17 +586,17 @@ func (p *MessageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKe
defer cancel()
// We don't pass an array of messageIDs as no action needs to be taken
// when sending a bundle
_, _, err = p.sendMessageSpec(ctx, publicKey, messageSpec, nil)
_, _, err = s.sendMessageSpec(ctx, publicKey, messageSpec, nil)
if err != nil {
return err
}
p.protocol.ConfirmBundleAdvertisement(publicKey, now)
s.protocol.ConfirmBundleAdvertisement(publicKey, now)
return nil
}
func (p *MessageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
func (s *MessageSender) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
if err != nil {
return nil, errors.Wrap(err, "failed to wrap message")
@ -604,19 +604,19 @@ func (p *MessageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error)
return wrappedMessage, nil
}
func (p *MessageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) ([]byte, error) {
groupID := datasync.ToOneToOneGroupID(&p.identity.PublicKey, publicKey)
func (s *MessageSender) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) ([]byte, error) {
groupID := datasync.ToOneToOneGroupID(&s.identity.PublicKey, publicKey)
peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)
exist, err := p.datasync.IsPeerInGroup(groupID, peerID)
exist, err := s.datasync.IsPeerInGroup(groupID, peerID)
if err != nil {
return nil, errors.Wrap(err, "failed to check if peer is in group")
}
if !exist {
if err := p.datasync.AddPeer(groupID, peerID); err != nil {
if err := s.datasync.AddPeer(groupID, peerID); err != nil {
return nil, errors.Wrap(err, "failed to add peer")
}
}
id, err := p.datasync.AppendMessage(groupID, message)
id, err := s.datasync.AppendMessage(groupID, message)
if err != nil {
return nil, errors.Wrap(err, "failed to append message to datasync")
}
@ -626,41 +626,41 @@ func (p *MessageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []b
// sendDataSync sends a message scheduled by the data sync layer.
// Data Sync layer calls this method "dispatch" function.
func (p *MessageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error {
func (s *MessageSender) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, marshalledDatasyncPayload []byte, payload *datasyncproto.Payload) error {
// Calculate the messageIDs
messageIDs := make([][]byte, 0, len(payload.Messages))
for _, payload := range payload.Messages {
messageIDs = append(messageIDs, v1protocol.MessageID(&p.identity.PublicKey, payload.Body))
messageIDs = append(messageIDs, v1protocol.MessageID(&s.identity.PublicKey, payload.Body))
}
messageSpec, err := p.protocol.BuildDirectMessage(p.identity, publicKey, encodedMessage)
messageSpec, err := s.protocol.BuildDirectMessage(s.identity, publicKey, marshalledDatasyncPayload)
if err != nil {
return errors.Wrap(err, "failed to encrypt message")
}
// The shared secret needs to be handle before we send a message
// otherwise the topic might not be set up before we receive a message
if p.handleSharedSecrets != nil {
err := p.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
if s.handleSharedSecrets != nil {
err := s.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
if err != nil {
return err
}
}
hash, newMessage, err := p.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs)
hash, newMessage, err := s.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs)
if err != nil {
p.logger.Error("failed to send a datasync message", zap.Error(err))
s.logger.Error("failed to send a datasync message", zap.Error(err))
return err
}
p.transport.Track(messageIDs, hash, newMessage)
s.transport.Track(messageIDs, hash, newMessage)
return nil
}
// sendPrivateRawMessage sends a message not wrapped in an encryption layer
func (p *MessageProcessor) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
@ -671,9 +671,9 @@ func (p *MessageProcessor) sendPrivateRawMessage(ctx context.Context, rawMessage
var err error
if rawMessage.SendOnPersonalTopic {
hash, err = p.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey)
hash, err = s.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey)
} else {
hash, err = p.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
}
if err != nil {
return nil, nil, err
@ -684,7 +684,7 @@ func (p *MessageProcessor) sendPrivateRawMessage(ctx context.Context, rawMessage
// sendCommunityRawMessage sends a message not wrapped in an encryption layer
// to a community
func (p *MessageProcessor) sendCommunityRawMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
func (s *MessageSender) sendCommunityRawMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
newMessage := &types.NewMessage{
TTL: whisperTTL,
Payload: payload,
@ -692,7 +692,7 @@ func (p *MessageProcessor) sendCommunityRawMessage(ctx context.Context, publicKe
PowTime: whisperPoWTime,
}
hash, err := p.transport.SendCommunityMessage(ctx, newMessage, publicKey)
hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey)
if err != nil {
return nil, nil, err
}
@ -701,23 +701,23 @@ func (p *MessageProcessor) sendCommunityRawMessage(ctx context.Context, publicKe
}
// sendMessageSpec analyses the spec properties and selects a proper transport method.
func (p *MessageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
newMessage, err := MessageSpecToWhisper(messageSpec)
if err != nil {
return nil, nil, err
}
logger := p.logger.With(zap.String("site", "sendMessageSpec"))
logger := s.logger.With(zap.String("site", "sendMessageSpec"))
var hash []byte
// process shared secret
if messageSpec.AgreedSecret {
logger.Debug("sending using shared secret")
hash, err = p.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key)
hash, err = s.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key)
} else {
logger.Debug("sending partitioned topic")
hash, err = p.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
}
if err != nil {
return nil, nil, err
@ -729,61 +729,61 @@ func (p *MessageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa
MessageIDs: messageIDs,
}
p.notifyOnSentMessage(sentMessage)
s.notifyOnSentMessage(sentMessage)
return hash, newMessage, nil
}
// SubscribeToSentMessages returns a channel where we publish every time a message is sent
func (p *MessageProcessor) SubscribeToSentMessages() <-chan *SentMessage {
func (s *MessageSender) SubscribeToSentMessages() <-chan *SentMessage {
c := make(chan *SentMessage, 100)
p.sentMessagesSubscriptions = append(p.sentMessagesSubscriptions, c)
s.sentMessagesSubscriptions = append(s.sentMessagesSubscriptions, c)
return c
}
func (p *MessageProcessor) notifyOnSentMessage(sentMessage *SentMessage) {
func (s *MessageSender) notifyOnSentMessage(sentMessage *SentMessage) {
// Publish on channels, drop if buffer is full
for _, c := range p.sentMessagesSubscriptions {
for _, c := range s.sentMessagesSubscriptions {
select {
case c <- sentMessage:
default:
p.logger.Warn("sent messages subscription channel full, dropping message")
s.logger.Warn("sent messages subscription channel full, dropping message")
}
}
}
// SubscribeToScheduledMessages returns a channel where we publish every time a message is scheduled for sending
func (p *MessageProcessor) SubscribeToScheduledMessages() <-chan *RawMessage {
func (s *MessageSender) SubscribeToScheduledMessages() <-chan *RawMessage {
c := make(chan *RawMessage, 100)
p.scheduledMessagesSubscriptions = append(p.scheduledMessagesSubscriptions, c)
s.scheduledMessagesSubscriptions = append(s.scheduledMessagesSubscriptions, c)
return c
}
func (p *MessageProcessor) notifyOnScheduledMessage(message *RawMessage) {
func (s *MessageSender) notifyOnScheduledMessage(message *RawMessage) {
// Publish on channels, drop if buffer is full
for _, c := range p.scheduledMessagesSubscriptions {
for _, c := range s.scheduledMessagesSubscriptions {
select {
case c <- message:
default:
p.logger.Warn("scheduled messages subscription channel full, dropping message")
s.logger.Warn("scheduled messages subscription channel full, dropping message")
}
}
}
func (p *MessageProcessor) JoinPublic(id string) (*transport.Filter, error) {
return p.transport.JoinPublic(id)
func (s *MessageSender) JoinPublic(id string) (*transport.Filter, error) {
return s.transport.JoinPublic(id)
}
// AddEphemeralKey adds an ephemeral key that we will be listening to
// note that we never removed them from now, as waku/whisper does not
// recalculate topics on removal, so effectively there's no benefit.
// On restart they will be gone.
func (p *MessageProcessor) AddEphemeralKey(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) {
p.ephemeralKeysMutex.Lock()
p.ephemeralKeys[types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))] = privateKey
p.ephemeralKeysMutex.Unlock()
return p.transport.LoadKeyFilters(privateKey)
func (s *MessageSender) AddEphemeralKey(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) {
s.ephemeralKeysMutex.Lock()
s.ephemeralKeys[types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))] = privateKey
s.ephemeralKeysMutex.Unlock()
return s.transport.LoadKeyFilters(privateKey)
}
func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) {

View File

@ -26,20 +26,20 @@ import (
v1protocol "github.com/status-im/status-go/protocol/v1"
)
func TestMessageProcessorSuite(t *testing.T) {
suite.Run(t, new(MessageProcessorSuite))
func TestMessageSenderSuite(t *testing.T) {
suite.Run(t, new(MessageSenderSuite))
}
type MessageProcessorSuite struct {
type MessageSenderSuite struct {
suite.Suite
processor *MessageProcessor
sender *MessageSender
tmpDir string
testMessage protobuf.ChatMessage
logger *zap.Logger
}
func (s *MessageProcessorSuite) SetupTest() {
func (s *MessageSenderSuite) SetupTest() {
s.testMessage = protobuf.ChatMessage{
Text: "abc123",
ChatId: "testing-adamb",
@ -60,7 +60,7 @@ func (s *MessageProcessorSuite) SetupTest() {
identity, err := crypto.GenerateKey()
s.Require().NoError(err)
database, err := sqlite.Open(filepath.Join(s.tmpDir, "processor-test.sql"), "some-key")
database, err := sqlite.Open(filepath.Join(s.tmpDir, "sender-test.sql"), "some-key")
s.Require().NoError(err)
encryptionProtocol := encryption.New(
@ -85,7 +85,7 @@ func (s *MessageProcessorSuite) SetupTest() {
)
s.Require().NoError(err)
s.processor, err = NewMessageProcessor(
s.sender, err = NewMessageSender(
identity,
database,
encryptionProtocol,
@ -96,12 +96,12 @@ func (s *MessageProcessorSuite) SetupTest() {
s.Require().NoError(err)
}
func (s *MessageProcessorSuite) TearDownTest() {
func (s *MessageSenderSuite) TearDownTest() {
os.Remove(s.tmpDir)
_ = s.logger.Sync()
}
func (s *MessageProcessorSuite) TestHandleDecodedMessagesWrapped() {
func (s *MessageSenderSuite) TestHandleDecodedMessagesWrapped() {
relayerKey, err := crypto.GenerateKey()
s.Require().NoError(err)
@ -118,7 +118,7 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesWrapped() {
message.Sig = crypto.FromECDSAPub(&relayerKey.PublicKey)
message.Payload = wrappedPayload
decodedMessages, _, err := s.processor.HandleMessages(message, true)
decodedMessages, _, err := s.sender.HandleMessages(message, true)
s.Require().NoError(err)
s.Require().Equal(1, len(decodedMessages))
@ -130,7 +130,7 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesWrapped() {
s.Require().Equal(protobuf.ApplicationMetadataMessage_CHAT_MESSAGE, decodedMessages[0].Type)
}
func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasync() {
func (s *MessageSenderSuite) TestHandleDecodedMessagesDatasync() {
relayerKey, err := crypto.GenerateKey()
s.Require().NoError(err)
@ -154,7 +154,7 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasync() {
message.Sig = crypto.FromECDSAPub(&relayerKey.PublicKey)
message.Payload = marshalledDataSyncMessage
decodedMessages, _, err := s.processor.HandleMessages(message, true)
decodedMessages, _, err := s.sender.HandleMessages(message, true)
s.Require().NoError(err)
// We send two messages, the unwrapped one will be attributed to the relayer, while the wrapped one will be attributed to the author
@ -167,14 +167,14 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasync() {
s.Require().Equal(protobuf.ApplicationMetadataMessage_CHAT_MESSAGE, decodedMessages[0].Type)
}
func (s *MessageProcessorSuite) CalculatePoWTest() {
func (s *MessageSenderSuite) CalculatePoWTest() {
largeSizePayload := make([]byte, largeSizeInBytes)
s.Require().Equal(whisperLargeSizePoW, calculatePoW(largeSizePayload))
normalSizePayload := make([]byte, largeSizeInBytes-1)
s.Require().Equal(whisperDefaultPoW, calculatePoW(normalSizePayload))
}
func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasyncEncrypted() {
func (s *MessageSenderSuite) TestHandleDecodedMessagesDatasyncEncrypted() {
relayerKey, err := crypto.GenerateKey()
s.Require().NoError(err)
@ -206,7 +206,7 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasyncEncrypted() {
messageSpec, err := senderEncryptionProtocol.BuildDirectMessage(
relayerKey,
&s.processor.identity.PublicKey,
&s.sender.identity.PublicKey,
marshalledDataSyncMessage,
)
s.Require().NoError(err)
@ -218,7 +218,7 @@ func (s *MessageProcessorSuite) TestHandleDecodedMessagesDatasyncEncrypted() {
message.Sig = crypto.FromECDSAPub(&relayerKey.PublicKey)
message.Payload = encryptedPayload
decodedMessages, _, err := s.processor.HandleMessages(message, true)
decodedMessages, _, err := s.sender.HandleMessages(message, true)
s.Require().NoError(err)
// We send two messages, the unwrapped one will be attributed to the relayer,

View File

@ -72,7 +72,7 @@ func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf
continue
}
data, err := proto.Marshal(payload)
marshalledPayload, err := proto.Marshal(payload)
if err != nil {
t.logger.Error("failed to marshal payload")
continue
@ -85,7 +85,7 @@ func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf
}
// We don't return an error otherwise datasync will keep
// re-trying sending at each epoch
err = t.dispatch(context.Background(), publicKey, data, payload)
err = t.dispatch(context.Background(), publicKey, marshalledPayload, payload)
if err != nil {
t.logger.Error("failed to send message", zap.Error(err))
continue

View File

@ -189,7 +189,7 @@ func (p *Protocol) BuildDirectMessage(myIdentityKey *ecdsa.PrivateKey, publicKey
}
// Encrypt payload
directMessage, installations, err := p.encryptor.EncryptPayload(publicKey, myIdentityKey, activeInstallations, payload)
directMessagesByInstalls, installations, err := p.encryptor.EncryptPayload(publicKey, myIdentityKey, activeInstallations, payload)
if err != nil {
return nil, err
}
@ -197,7 +197,7 @@ func (p *Protocol) BuildDirectMessage(myIdentityKey *ecdsa.PrivateKey, publicKey
// Build message
message := &ProtocolMessage{
InstallationId: p.encryptor.config.InstallationID,
DirectMessage: directMessage,
DirectMessage: directMessagesByInstalls,
}
err = p.addBundle(myIdentityKey, message)

View File

@ -81,7 +81,7 @@ type Messenger struct {
persistence *sqlitePersistence
transport *transport.Transport
encryptor *encryption.Protocol
processor *common.MessageProcessor
sender *common.MessageSender
handler *MessageHandler
ensVerifier *ens.Verifier
pushNotificationClient *pushnotificationclient.Client
@ -249,7 +249,7 @@ func NewMessenger(
logger,
)
processor, err := common.NewMessageProcessor(
sender, err := common.NewMessageSender(
identity,
database,
encryptionProtocol,
@ -258,7 +258,7 @@ func NewMessenger(
c.featureFlags,
)
if err != nil {
return nil, errors.Wrap(err, "failed to create messageProcessor")
return nil, errors.Wrap(err, "failed to create messageSender")
}
// Initialize push notification server
@ -266,7 +266,7 @@ func NewMessenger(
if c.pushNotificationServerConfig != nil && c.pushNotificationServerConfig.Enabled {
c.pushNotificationServerConfig.Identity = identity
pushNotificationServerPersistence := pushnotificationserver.NewSQLitePersistence(database)
pushNotificationServer = pushnotificationserver.New(c.pushNotificationServerConfig, pushNotificationServerPersistence, processor)
pushNotificationServer = pushnotificationserver.New(c.pushNotificationServerConfig, pushNotificationServerPersistence, sender)
}
// Initialize push notification client
@ -282,7 +282,7 @@ func NewMessenger(
pushNotificationClientConfig.Logger = logger
pushNotificationClientConfig.InstallationID = installationID
pushNotificationClient := pushnotificationclient.New(pushNotificationClientPersistence, pushNotificationClientConfig, processor, sqlitePersistence)
pushNotificationClient := pushnotificationclient.New(pushNotificationClientPersistence, pushNotificationClientConfig, sender, sqlitePersistence)
ensVerifier := ens.New(node, logger, transp, database, c.verifyENSURL, c.verifyENSContractAddress)
@ -300,7 +300,7 @@ func NewMessenger(
persistence: sqlitePersistence,
transport: transp,
encryptor: encryptionProtocol,
processor: processor,
sender: sender,
handler: handler,
pushNotificationClient: pushNotificationClient,
pushNotificationServer: pushNotificationServer,
@ -328,7 +328,7 @@ func NewMessenger(
encryptionProtocol.Stop,
transp.ResetFilters,
transp.Stop,
func() error { processor.Stop(); return nil },
func() error { sender.Stop(); return nil },
// Currently this often fails, seems like it's safe to ignore them
// https://github.com/uber-go/zap/issues/328
func() error { _ = logger.Sync; return nil },
@ -444,7 +444,7 @@ func (m *Messenger) Start() (*MessengerResponse, error) {
}
// set shared secret handles
m.processor.SetHandleSharedSecrets(m.handleSharedSecrets)
m.sender.SetHandleSharedSecrets(m.handleSharedSecrets)
subscriptions, err := m.encryptor.Start(m.identity)
if err != nil {
@ -572,7 +572,7 @@ func (m *Messenger) publishContactCode() error {
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = m.processor.SendPublic(ctx, contactCodeTopic, rawMessage)
_, err = m.sender.SendPublic(ctx, contactCodeTopic, rawMessage)
if err != nil {
m.logger.Warn("failed to send a contact code", zap.Error(err))
}
@ -643,7 +643,7 @@ func (m *Messenger) handleStandaloneChatIdentity(chat *Chat) error {
}
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
_, err = m.processor.SendPublic(ctx, chat.ID, rawMessage)
_, err = m.sender.SendPublic(ctx, chat.ID, rawMessage)
if err != nil {
return err
}
@ -1200,7 +1200,7 @@ func (m *Messenger) CreateGroupChatWithMembers(ctx context.Context, name string,
return nil, err
}
encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil)
encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil)
if err != nil {
return nil, err
}
@ -1279,7 +1279,7 @@ func (m *Messenger) RemoveMemberFromGroupChat(ctx context.Context, chatID string
return nil, err
}
encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil)
encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil)
if err != nil {
return nil, err
}
@ -1364,7 +1364,7 @@ func (m *Messenger) AddMembersToGroupChat(ctx context.Context, chatID string, me
return nil, err
}
encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil)
encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil)
if err != nil {
return nil, err
}
@ -1425,7 +1425,7 @@ func (m *Messenger) ChangeGroupChatName(ctx context.Context, chatID string, name
return nil, err
}
encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil)
encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil)
if err != nil {
return nil, err
}
@ -1500,7 +1500,7 @@ func (m *Messenger) SendGroupChatInvitationRequest(ctx context.Context, chatID s
return nil, err
}
id, err := m.processor.SendPrivate(ctx, adminpk, &spec)
id, err := m.sender.SendPrivate(ctx, adminpk, &spec)
if err != nil {
return nil, err
}
@ -1568,7 +1568,7 @@ func (m *Messenger) SendGroupChatInvitationRejection(ctx context.Context, invita
return nil, err
}
id, err := m.processor.SendPrivate(ctx, userpk, &spec)
id, err := m.sender.SendPrivate(ctx, userpk, &spec)
if err != nil {
return nil, err
}
@ -1626,7 +1626,7 @@ func (m *Messenger) AddAdminsToGroupChat(ctx context.Context, chatID string, mem
return nil, err
}
encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil)
encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil)
if err != nil {
return nil, err
}
@ -1690,7 +1690,7 @@ func (m *Messenger) ConfirmJoiningGroup(ctx context.Context, chatID string) (*Me
return nil, err
}
encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil)
encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil)
if err != nil {
return nil, err
}
@ -1753,7 +1753,7 @@ func (m *Messenger) LeaveGroupChat(ctx context.Context, chatID string, remove bo
return nil, err
}
encodedMessage, err := m.processor.EncodeMembershipUpdate(group, nil)
encodedMessage, err := m.sender.EncodeMembershipUpdate(group, nil)
if err != nil {
return nil, err
}
@ -1837,7 +1837,7 @@ func (m *Messenger) sendToPairedDevices(ctx context.Context, spec common.RawMess
hasPairedDevices := m.hasPairedDevices()
// We send a message to any paired device
if hasPairedDevices {
_, err := m.processor.SendPrivate(ctx, &m.identity.PublicKey, &spec)
_, err := m.sender.SendPrivate(ctx, &m.identity.PublicKey, &spec)
if err != nil {
return err
}
@ -1849,7 +1849,7 @@ func (m *Messenger) dispatchPairInstallationMessage(ctx context.Context, spec co
var err error
var id []byte
id, err = m.processor.SendPairInstallation(ctx, &m.identity.PublicKey, spec)
id, err = m.sender.SendPairInstallation(ctx, &m.identity.PublicKey, spec)
if err != nil {
return nil, err
@ -1884,7 +1884,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, spec common.RawMessage)
//message for sending to paired devices later
specCopyForPairedDevices := spec
if !common.IsPubKeyEqual(publicKey, &m.identity.PublicKey) {
id, err = m.processor.SendPrivate(ctx, publicKey, &spec)
id, err = m.sender.SendPrivate(ctx, publicKey, &spec)
if err != nil {
return spec, err
@ -1899,7 +1899,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, spec common.RawMessage)
case ChatTypePublic, ChatTypeProfile:
logger.Debug("sending public message", zap.String("chatName", chat.Name))
id, err = m.processor.SendPublic(ctx, chat.ID, spec)
id, err = m.sender.SendPublic(ctx, chat.ID, spec)
if err != nil {
return spec, err
}
@ -1918,7 +1918,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, spec common.RawMessage)
}
logger.Debug("sending community chat message", zap.String("chatName", chat.Name))
id, err = m.processor.SendPublic(ctx, chat.ID, spec)
id, err = m.sender.SendPublic(ctx, chat.ID, spec)
if err != nil {
return spec, err
}
@ -1967,7 +1967,7 @@ func (m *Messenger) dispatchMessage(ctx context.Context, spec common.RawMessage)
spec.MessageType = protobuf.ApplicationMetadataMessage_MEMBERSHIP_UPDATE_MESSAGE
}
id, err = m.processor.SendGroup(ctx, spec.Recipients, spec)
id, err = m.sender.SendGroup(ctx, spec.Recipients, spec)
if err != nil {
return spec, err
}
@ -2479,7 +2479,7 @@ func (m *Messenger) handleRetrievedMessages(chatWithMessages map[transport.Filte
for _, shhMessage := range messages {
// Indicates tha all messages in the batch have been processed correctly
allMessagesProcessed := true
statusMessages, acks, err := m.processor.HandleMessages(shhMessage, true)
statusMessages, acks, err := m.sender.HandleMessages(shhMessage, true)
if err != nil {
logger.Info("failed to decode messages", zap.Error(err))
continue
@ -4083,7 +4083,7 @@ func (m *Messenger) StartPushNotificationsServer() error {
Logger: m.logger,
Identity: m.identity,
}
m.pushNotificationServer = pushnotificationserver.New(config, pushNotificationServerPersistence, m.processor)
m.pushNotificationServer = pushnotificationserver.New(config, pushNotificationServerPersistence, m.sender)
}
return m.pushNotificationServer.Start()
@ -4280,7 +4280,7 @@ func (m *Messenger) encodeChatEntity(chat *Chat, message common.ChatEntity) ([]b
return nil, err
}
encodedMessage, err = m.processor.EncodeAbridgedMembershipUpdate(group, message)
encodedMessage, err = m.sender.EncodeAbridgedMembershipUpdate(group, message)
if err != nil {
return nil, err
}

View File

@ -34,7 +34,7 @@ func (m *Messenger) publishOrg(org *communities.Community) error {
SkipEncryption: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_DESCRIPTION,
}
_, err = m.processor.SendPublic(context.Background(), org.IDString(), rawMessage)
_, err = m.sender.SendPublic(context.Background(), org.IDString(), rawMessage)
return err
}
@ -57,7 +57,7 @@ func (m *Messenger) publishOrgInvitation(org *communities.Community, invitation
SkipEncryption: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_INVITATION,
}
_, err = m.processor.SendPrivate(context.Background(), pk, &rawMessage)
_, err = m.sender.SendPrivate(context.Background(), pk, &rawMessage)
return err
}
@ -210,7 +210,7 @@ func (m *Messenger) RequestToJoinCommunity(request *requests.RequestToJoinCommun
SkipEncryption: true,
MessageType: protobuf.ApplicationMetadataMessage_COMMUNITY_REQUEST_TO_JOIN,
}
_, err = m.processor.SendCommunityMessage(context.Background(), community.PublicKey(), rawMessage)
_, err = m.sender.SendCommunityMessage(context.Background(), community.PublicKey(), rawMessage)
if err != nil {
return nil, err
}

View File

@ -2532,7 +2532,7 @@ func (s *MessageHandlerSuite) TestRun() {
// ChatID is not set at the beginning.
s.Empty(message.LocalChatID)
message.ID = strconv.Itoa(idx) // manually set the ID because messages does not go through messageProcessor
message.ID = strconv.Itoa(idx) // manually set the ID because messages does not go through messageSender
chat, err := s.messageHandler.matchChatEntity(&message, chatsMap, contactsMap, &testTimeSource{})
if tc.Error {
s.Require().Error(err)

View File

@ -179,8 +179,8 @@ type Client struct {
// randomReader only used for testing so we have deterministic encryption
reader io.Reader
//messageProcessor is a message processor used to send and being notified of messages
messageProcessor *common.MessageProcessor
//messageSender used to send and being notified of messages
messageSender *common.MessageSender
// registrationLoopQuitChan is a channel to indicate to the registration loop that should be terminating
registrationLoopQuitChan chan struct{}
@ -194,11 +194,11 @@ type Client struct {
registrationSubscriptions []chan struct{}
}
func New(persistence *Persistence, config *Config, processor *common.MessageProcessor, messagePersistence MessagePersistence) *Client {
func New(persistence *Persistence, config *Config, sender *common.MessageSender, messagePersistence MessagePersistence) *Client {
return &Client{
quit: make(chan struct{}),
config: config,
messageProcessor: processor,
messageSender: sender,
messagePersistence: messagePersistence,
persistence: persistence,
reader: rand.Reader,
@ -206,8 +206,8 @@ func New(persistence *Persistence, config *Config, processor *common.MessageProc
}
func (c *Client) Start() error {
if c.messageProcessor == nil {
return errors.New("can't start, missing message processor")
if c.messageSender == nil {
return errors.New("can't start, missing message sender")
}
err := c.loadLastPushNotificationRegistration()
@ -685,8 +685,8 @@ func (c *Client) generateSharedKey(publicKey *ecdsa.PublicKey) ([]byte, error) {
func (c *Client) subscribeForMessageEvents() {
go func() {
c.config.Logger.Debug("subscribing for message events")
sentMessagesSubscription := c.messageProcessor.SubscribeToSentMessages()
scheduledMessagesSubscription := c.messageProcessor.SubscribeToScheduledMessages()
sentMessagesSubscription := c.messageSender.SubscribeToSentMessages()
scheduledMessagesSubscription := c.messageSender.SubscribeToScheduledMessages()
for {
select {
// order is important, since both are asynchronous, we want to process
@ -1273,7 +1273,7 @@ func (c *Client) registerWithServer(registration *protobuf.PushNotificationRegis
SkipEncryption: true,
}
_, err = c.messageProcessor.SendPrivate(context.Background(), server.PublicKey, &rawMessage)
_, err = c.messageSender.SendPrivate(context.Background(), server.PublicKey, &rawMessage)
if err != nil {
return err
@ -1336,7 +1336,7 @@ func (c *Client) SendNotification(publicKey *ecdsa.PublicKey, installationIDs []
if err != nil {
return nil, err
}
_, err = c.messageProcessor.AddEphemeralKey(ephemeralKey)
_, err = c.messageSender.AddEphemeralKey(ephemeralKey)
if err != nil {
return nil, err
}
@ -1377,7 +1377,7 @@ func (c *Client) SendNotification(publicKey *ecdsa.PublicKey, installationIDs []
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_REQUEST,
}
_, err = c.messageProcessor.SendPrivate(context.Background(), serverPublicKey, &rawMessage)
_, err = c.messageSender.SendPrivate(context.Background(), serverPublicKey, &rawMessage)
if err != nil {
return nil, err
@ -1657,14 +1657,14 @@ func (c *Client) queryPushNotificationInfo(publicKey *ecdsa.PublicKey) error {
MessageType: protobuf.ApplicationMetadataMessage_PUSH_NOTIFICATION_QUERY,
}
_, err = c.messageProcessor.AddEphemeralKey(ephemeralKey)
_, err = c.messageSender.AddEphemeralKey(ephemeralKey)
if err != nil {
return err
}
// this is the topic of message
encodedPublicKey := hex.EncodeToString(hashedPublicKey)
messageID, err := c.messageProcessor.SendPublic(context.Background(), encodedPublicKey, rawMessage)
messageID, err := c.messageSender.SendPublic(context.Background(), encodedPublicKey, rawMessage)
if err != nil {
return err

View File

@ -34,19 +34,19 @@ type Config struct {
}
type Server struct {
persistence Persistence
config *Config
messageProcessor *common.MessageProcessor
persistence Persistence
config *Config
messageSender *common.MessageSender
// SentRequests keeps track of the requests sent to gorush, for testing only
SentRequests int64
}
func New(config *Config, persistence Persistence, messageProcessor *common.MessageProcessor) *Server {
func New(config *Config, persistence Persistence, messageSender *common.MessageSender) *Server {
if len(config.GorushURL) == 0 {
config.GorushURL = defaultGorushURL
}
return &Server{persistence: persistence, config: config, messageProcessor: messageProcessor}
return &Server{persistence: persistence, config: config, messageSender: messageSender}
}
func (s *Server) Start() error {
@ -112,7 +112,7 @@ func (s *Server) HandlePushNotificationRegistration(publicKey *ecdsa.PublicKey,
SkipEncryption: true,
}
_, err = s.messageProcessor.SendPrivate(context.Background(), publicKey, &rawMessage)
_, err = s.messageSender.SendPrivate(context.Background(), publicKey, &rawMessage)
return err
}
@ -135,7 +135,7 @@ func (s *Server) HandlePushNotificationQuery(publicKey *ecdsa.PublicKey, message
SkipEncryption: true,
}
_, err = s.messageProcessor.SendPrivate(context.Background(), publicKey, &rawMessage)
_, err = s.messageSender.SendPrivate(context.Background(), publicKey, &rawMessage)
return err
}
@ -178,7 +178,7 @@ func (s *Server) HandlePushNotificationRequest(publicKey *ecdsa.PublicKey,
SkipEncryption: true,
}
_, err = s.messageProcessor.SendPrivate(context.Background(), publicKey, &rawMessage)
_, err = s.messageSender.SendPrivate(context.Background(), publicKey, &rawMessage)
return err
}
@ -459,11 +459,11 @@ func (s *Server) sendPushNotification(requestAndRegistrations []*RequestAndRegis
// listenToPublicKeyQueryTopic listen to a topic derived from the hashed public key
func (s *Server) listenToPublicKeyQueryTopic(hashedPublicKey []byte) error {
if s.messageProcessor == nil {
if s.messageSender == nil {
return nil
}
encodedPublicKey := hex.EncodeToString(hashedPublicKey)
_, err := s.messageProcessor.JoinPublic(encodedPublicKey)
_, err := s.messageSender.JoinPublic(encodedPublicKey)
return err
}