2020-07-06 10:54:22 +02:00
|
|
|
package common
|
2019-09-02 11:29:06 +02:00
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"crypto/ecdsa"
|
|
|
|
"database/sql"
|
2020-07-21 17:41:10 +02:00
|
|
|
"sync"
|
2019-09-02 11:29:06 +02:00
|
|
|
"time"
|
|
|
|
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
"github.com/pkg/errors"
|
2020-01-02 10:10:19 +01:00
|
|
|
datasyncnode "github.com/vacp2p/mvds/node"
|
|
|
|
datasyncproto "github.com/vacp2p/mvds/protobuf"
|
|
|
|
"go.uber.org/zap"
|
|
|
|
|
2019-11-23 18:57:05 +01:00
|
|
|
"github.com/status-im/status-go/eth-node/crypto"
|
|
|
|
"github.com/status-im/status-go/eth-node/types"
|
2019-11-21 17:19:22 +01:00
|
|
|
"github.com/status-im/status-go/protocol/datasync"
|
|
|
|
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
|
|
|
|
"github.com/status-im/status-go/protocol/encryption"
|
2020-07-31 14:22:05 +02:00
|
|
|
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
|
2020-07-25 22:25:40 +01:00
|
|
|
"github.com/status-im/status-go/protocol/protobuf"
|
2020-01-13 20:17:30 +01:00
|
|
|
"github.com/status-im/status-go/protocol/transport"
|
2019-11-21 17:19:22 +01:00
|
|
|
v1protocol "github.com/status-im/status-go/protocol/v1"
|
2019-09-02 11:29:06 +02:00
|
|
|
)
|
|
|
|
|
|
|
|
// Properties applied to outgoing whisper messages.
const (
	// whisperTTL is the TTL set on whisper messages built by this package.
	// NOTE(review): presumably expressed in seconds - confirm against the transport.
	whisperTTL = 15
	// whisperDefaultPoW is the regular PoW target.
	whisperDefaultPoW = 0.002
	// whisperLargeSizePoW is the reduced PoW target used for large payloads.
	whisperLargeSizePoW = 0.000002
	// largeSizeInBytes is the payload size from which the reduced PoW target
	// applies; it is roughly 50KB.
	largeSizeInBytes = 50000
	// whisperPoWTime is the PoW time set on whisper messages built by this package.
	// NOTE(review): presumably expressed in seconds - confirm against the transport.
	whisperPoWTime = 5
)
|
|
|
|
|
2020-07-06 10:54:22 +02:00
|
|
|
// SentMessage reprent a message that has been passed to the transport layer
|
|
|
|
type SentMessage struct {
|
|
|
|
PublicKey *ecdsa.PublicKey
|
|
|
|
Spec *encryption.ProtocolMessageSpec
|
|
|
|
MessageIDs [][]byte
|
|
|
|
}
|
|
|
|
|
|
|
|
type MessageProcessor struct {
|
2020-07-22 09:41:40 +02:00
|
|
|
identity *ecdsa.PrivateKey
|
|
|
|
datasync *datasync.DataSync
|
|
|
|
protocol *encryption.Protocol
|
|
|
|
transport transport.Transport
|
|
|
|
logger *zap.Logger
|
|
|
|
|
|
|
|
// ephemeralKeys is a map that contains the ephemeral keys of the client, used
|
|
|
|
// to decrypt messages
|
|
|
|
ephemeralKeys map[string]*ecdsa.PrivateKey
|
|
|
|
ephemeralKeysMutex sync.Mutex
|
|
|
|
|
|
|
|
// sentMessagesSubscriptions contains all the subscriptions for sent messages
|
|
|
|
sentMessagesSubscriptions []chan<- *SentMessage
|
|
|
|
// sentMessagesSubscriptions contains all the subscriptions for scheduled messages
|
|
|
|
scheduledMessagesSubscriptions []chan<- *RawMessage
|
2020-07-06 10:54:22 +02:00
|
|
|
|
|
|
|
featureFlags FeatureFlags
|
2020-07-31 14:22:05 +02:00
|
|
|
|
|
|
|
// handleSharedSecrets is a callback that is called every time a new shared secret is negotiated
|
|
|
|
handleSharedSecrets func([]*sharedsecret.Secret) error
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
2020-07-06 10:54:22 +02:00
|
|
|
func NewMessageProcessor(
|
2019-09-02 11:29:06 +02:00
|
|
|
identity *ecdsa.PrivateKey,
|
|
|
|
database *sql.DB,
|
|
|
|
enc *encryption.Protocol,
|
2020-01-13 20:17:30 +01:00
|
|
|
transport transport.Transport,
|
2019-09-02 11:29:06 +02:00
|
|
|
logger *zap.Logger,
|
2020-07-06 10:54:22 +02:00
|
|
|
features FeatureFlags,
|
|
|
|
) (*MessageProcessor, error) {
|
2020-02-10 12:22:37 +01:00
|
|
|
dataSyncTransport := datasync.NewNodeTransport()
|
2019-09-02 11:29:06 +02:00
|
|
|
dataSyncNode, err := datasyncnode.NewPersistentNode(
|
|
|
|
database,
|
|
|
|
dataSyncTransport,
|
|
|
|
datasyncpeer.PublicKeyToPeerID(identity.PublicKey),
|
|
|
|
datasyncnode.BATCH,
|
|
|
|
datasync.CalculateSendTime,
|
|
|
|
logger,
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2020-07-06 10:54:22 +02:00
|
|
|
ds := datasync.New(dataSyncNode, dataSyncTransport, features.Datasync, logger)
|
2019-09-02 11:29:06 +02:00
|
|
|
|
2020-07-06 10:54:22 +02:00
|
|
|
p := &MessageProcessor{
|
2020-07-21 17:41:10 +02:00
|
|
|
identity: identity,
|
|
|
|
datasync: ds,
|
|
|
|
protocol: enc,
|
|
|
|
transport: transport,
|
|
|
|
logger: logger,
|
|
|
|
ephemeralKeys: make(map[string]*ecdsa.PrivateKey),
|
|
|
|
featureFlags: features,
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Initializing DataSync is required to encrypt and send messages.
|
|
|
|
// With DataSync enabled, messages are added to the DataSync
|
|
|
|
// but actual encrypt and send calls are postponed.
|
|
|
|
// sendDataSync is responsible for encrypting and sending postponed messages.
|
2020-07-06 10:54:22 +02:00
|
|
|
if features.Datasync {
|
2020-11-03 13:42:42 +01:00
|
|
|
// We set the max message size to 3/4 of the allowed message size, to leave
|
|
|
|
// room for encryption.
|
|
|
|
// Messages will be tried to send in any case, even if they exceed this
|
|
|
|
// value
|
|
|
|
ds.Init(p.sendDataSync, transport.MaxMessageSize()/4*3, logger)
|
2020-11-24 07:25:54 +01:00
|
|
|
ds.Start(datasync.DatasyncTicker)
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
return p, nil
|
|
|
|
}
|
|
|
|
|
2020-07-06 10:54:22 +02:00
|
|
|
func (p *MessageProcessor) Stop() {
|
2020-07-22 09:41:40 +02:00
|
|
|
for _, c := range p.sentMessagesSubscriptions {
|
2020-07-06 10:54:22 +02:00
|
|
|
close(c)
|
|
|
|
}
|
2020-07-31 14:22:05 +02:00
|
|
|
p.sentMessagesSubscriptions = nil
|
2019-09-02 11:29:06 +02:00
|
|
|
p.datasync.Stop() // idempotent op
|
|
|
|
}
|
|
|
|
|
2020-07-31 14:22:05 +02:00
|
|
|
func (p *MessageProcessor) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) {
|
|
|
|
p.handleSharedSecrets = handler
|
|
|
|
}
|
|
|
|
|
2020-05-13 15:24:52 +02:00
|
|
|
// SendPrivate takes encoded data, encrypts it and sends through the wire.
|
2020-07-06 10:54:22 +02:00
|
|
|
func (p *MessageProcessor) SendPrivate(
|
2019-09-02 11:29:06 +02:00
|
|
|
ctx context.Context,
|
2019-10-14 16:10:48 +02:00
|
|
|
recipient *ecdsa.PublicKey,
|
2021-02-23 17:47:45 +02:00
|
|
|
rawMessage *RawMessage,
|
2019-09-02 11:29:06 +02:00
|
|
|
) ([]byte, error) {
|
|
|
|
p.logger.Debug(
|
|
|
|
"sending a private message",
|
2020-07-21 17:41:10 +02:00
|
|
|
zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))),
|
2020-05-13 15:24:52 +02:00
|
|
|
zap.String("site", "SendPrivate"),
|
2019-09-02 11:29:06 +02:00
|
|
|
)
|
2020-07-22 09:41:40 +02:00
|
|
|
// Currently we don't support sending through datasync and setting custom waku fields,
|
|
|
|
// as the datasync interface is not rich enough to propagate that information, so we
|
|
|
|
// would have to add some complexity to handle this.
|
2021-01-18 10:12:03 +01:00
|
|
|
if rawMessage.ResendAutomatically && (rawMessage.Sender != nil || rawMessage.SkipEncryption || rawMessage.SendOnPersonalTopic) {
|
|
|
|
return nil, errors.New("setting identity, skip-encryption or personal topic and datasync not supported")
|
2020-07-22 09:41:40 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// Set sender identity if not specified
|
|
|
|
if rawMessage.Sender == nil {
|
|
|
|
rawMessage.Sender = p.identity
|
|
|
|
}
|
|
|
|
|
2021-02-23 17:47:45 +02:00
|
|
|
return p.sendPrivate(ctx, recipient, rawMessage)
|
2019-09-26 09:01:17 +02:00
|
|
|
}
|
2019-09-02 11:29:06 +02:00
|
|
|
|
2020-07-22 09:41:40 +02:00
|
|
|
// SendGroup takes encoded data, encrypts it and sends through the wire,
|
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 17:25:34 +01:00
|
|
|
// always return the messageID
|
2020-07-06 10:54:22 +02:00
|
|
|
func (p *MessageProcessor) SendGroup(
|
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 17:25:34 +01:00
|
|
|
ctx context.Context,
|
|
|
|
recipients []*ecdsa.PublicKey,
|
2020-07-28 15:22:22 +02:00
|
|
|
rawMessage RawMessage,
|
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 17:25:34 +01:00
|
|
|
) ([]byte, error) {
|
|
|
|
p.logger.Debug(
|
|
|
|
"sending a private group message",
|
2020-05-13 15:24:52 +02:00
|
|
|
zap.String("site", "SendGroup"),
|
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 17:25:34 +01:00
|
|
|
)
|
2020-07-22 09:41:40 +02:00
|
|
|
// Set sender if not specified
|
2020-07-21 17:41:10 +02:00
|
|
|
if rawMessage.Sender == nil {
|
|
|
|
rawMessage.Sender = p.identity
|
|
|
|
}
|
2020-07-22 09:41:40 +02:00
|
|
|
|
|
|
|
// Calculate messageID first and set on raw message
|
2020-07-28 15:22:22 +02:00
|
|
|
wrappedMessage, err := p.wrapMessageV1(&rawMessage)
|
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 17:25:34 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
|
|
}
|
2020-07-21 17:41:10 +02:00
|
|
|
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
2020-07-22 09:41:40 +02:00
|
|
|
rawMessage.ID = types.EncodeHex(messageID)
|
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 17:25:34 +01:00
|
|
|
|
2020-05-13 15:24:52 +02:00
|
|
|
// Send to each recipients
|
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 17:25:34 +01:00
|
|
|
for _, recipient := range recipients {
|
2020-07-28 15:22:22 +02:00
|
|
|
_, err = p.sendPrivate(ctx, recipient, &rawMessage)
|
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 17:25:34 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "failed to send message")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return messageID, nil
|
|
|
|
}
|
|
|
|
|
2019-09-26 09:01:17 +02:00
|
|
|
// sendPrivate sends data to the recipient identifying with a given public key.
|
2020-07-06 10:54:22 +02:00
|
|
|
func (p *MessageProcessor) sendPrivate(
|
2019-09-26 09:01:17 +02:00
|
|
|
ctx context.Context,
|
2019-10-14 16:10:48 +02:00
|
|
|
recipient *ecdsa.PublicKey,
|
2020-05-13 15:24:52 +02:00
|
|
|
rawMessage *RawMessage,
|
2019-09-26 09:01:17 +02:00
|
|
|
) ([]byte, error) {
|
2020-07-21 17:41:10 +02:00
|
|
|
p.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
|
|
|
|
|
2020-05-13 15:24:52 +02:00
|
|
|
wrappedMessage, err := p.wrapMessageV1(rawMessage)
|
2019-09-02 11:29:06 +02:00
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
|
|
}
|
|
|
|
|
2020-07-21 17:41:10 +02:00
|
|
|
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
2020-07-22 09:41:40 +02:00
|
|
|
rawMessage.ID = types.EncodeHex(messageID)
|
|
|
|
|
|
|
|
// Notify before dispatching, otherwise the dispatch subscription might happen
|
|
|
|
// earlier than the scheduled
|
|
|
|
p.notifyOnScheduledMessage(rawMessage)
|
2019-09-02 11:29:06 +02:00
|
|
|
|
2020-07-15 08:31:39 +02:00
|
|
|
if p.featureFlags.Datasync && rawMessage.ResendAutomatically {
|
2020-07-21 17:41:10 +02:00
|
|
|
// No need to call transport tracking.
|
|
|
|
// It is done in a data sync dispatch step.
|
2021-02-23 17:47:45 +02:00
|
|
|
datasyncID, err := p.addToDataSync(recipient, wrappedMessage)
|
|
|
|
if err != nil {
|
2019-09-02 11:29:06 +02:00
|
|
|
return nil, errors.Wrap(err, "failed to send message with datasync")
|
|
|
|
}
|
2021-02-23 17:47:45 +02:00
|
|
|
rawMessage.DataSyncID = datasyncID
|
2020-07-22 09:41:40 +02:00
|
|
|
} else if rawMessage.SkipEncryption {
|
|
|
|
// When SkipEncryption is set we don't pass the message to the encryption layer
|
2020-07-21 17:41:10 +02:00
|
|
|
messageIDs := [][]byte{messageID}
|
2021-01-18 10:12:03 +01:00
|
|
|
hash, newMessage, err := p.sendPrivateRawMessage(ctx, rawMessage, recipient, wrappedMessage, messageIDs)
|
2020-07-21 17:41:10 +02:00
|
|
|
if err != nil {
|
2020-11-18 10:16:51 +01:00
|
|
|
p.logger.Error("failed to send a private message", zap.Error(err))
|
2020-07-21 17:41:10 +02:00
|
|
|
return nil, errors.Wrap(err, "failed to send a message spec")
|
|
|
|
}
|
|
|
|
|
|
|
|
p.transport.Track(messageIDs, hash, newMessage)
|
|
|
|
|
2019-09-02 11:29:06 +02:00
|
|
|
} else {
|
2020-07-21 17:41:10 +02:00
|
|
|
messageSpec, err := p.protocol.BuildDirectMessage(rawMessage.Sender, recipient, wrappedMessage)
|
2019-09-02 11:29:06 +02:00
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "failed to encrypt message")
|
|
|
|
}
|
|
|
|
|
2020-07-31 14:22:05 +02:00
|
|
|
// The shared secret needs to be handle before we send a message
|
|
|
|
// otherwise the topic might not be set up before we receive a message
|
|
|
|
if p.handleSharedSecrets != nil {
|
|
|
|
err := p.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2020-06-30 09:50:59 +02:00
|
|
|
messageIDs := [][]byte{messageID}
|
|
|
|
hash, newMessage, err := p.sendMessageSpec(ctx, recipient, messageSpec, messageIDs)
|
2019-09-02 11:29:06 +02:00
|
|
|
if err != nil {
|
2020-11-18 10:16:51 +01:00
|
|
|
p.logger.Error("failed to send a private message", zap.Error(err))
|
2019-09-02 11:29:06 +02:00
|
|
|
return nil, errors.Wrap(err, "failed to send a message spec")
|
|
|
|
}
|
|
|
|
|
2020-06-30 09:50:59 +02:00
|
|
|
p.transport.Track(messageIDs, hash, newMessage)
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
return messageID, nil
|
|
|
|
}
|
|
|
|
|
2020-01-10 19:59:01 +01:00
|
|
|
// sendPairInstallation sends data to the recipients, using DH
|
2020-07-06 10:54:22 +02:00
|
|
|
func (p *MessageProcessor) SendPairInstallation(
|
2019-10-14 16:10:48 +02:00
|
|
|
ctx context.Context,
|
2020-01-10 19:59:01 +01:00
|
|
|
recipient *ecdsa.PublicKey,
|
2020-07-28 15:22:22 +02:00
|
|
|
rawMessage RawMessage,
|
2020-01-10 19:59:01 +01:00
|
|
|
) ([]byte, error) {
|
2020-07-21 17:41:10 +02:00
|
|
|
p.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
|
2020-01-10 19:59:01 +01:00
|
|
|
|
2020-07-28 15:22:22 +02:00
|
|
|
wrappedMessage, err := p.wrapMessageV1(&rawMessage)
|
2020-01-10 19:59:01 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
|
|
}
|
|
|
|
|
|
|
|
messageSpec, err := p.protocol.BuildDHMessage(p.identity, recipient, wrappedMessage)
|
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "failed to encrypt message")
|
|
|
|
}
|
|
|
|
|
2020-06-30 09:50:59 +02:00
|
|
|
messageID := v1protocol.MessageID(&p.identity.PublicKey, wrappedMessage)
|
|
|
|
messageIDs := [][]byte{messageID}
|
|
|
|
|
|
|
|
hash, newMessage, err := p.sendMessageSpec(ctx, recipient, messageSpec, messageIDs)
|
2020-01-10 19:59:01 +01:00
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "failed to send a message spec")
|
|
|
|
}
|
|
|
|
|
2020-06-30 09:50:59 +02:00
|
|
|
p.transport.Track(messageIDs, hash, newMessage)
|
2020-01-10 19:59:01 +01:00
|
|
|
|
|
|
|
return messageID, nil
|
|
|
|
}
|
|
|
|
|
2021-01-08 12:26:15 +01:00
|
|
|
func (p *MessageProcessor) encodeMembershipUpdate(
|
|
|
|
message v1protocol.MembershipUpdateMessage,
|
2020-07-25 22:25:40 +01:00
|
|
|
chatEntity ChatEntity,
|
2019-12-02 16:34:05 +01:00
|
|
|
) ([]byte, error) {
|
2019-10-14 16:10:48 +02:00
|
|
|
|
2020-07-27 12:13:22 +02:00
|
|
|
if chatEntity != nil {
|
|
|
|
chatEntityProtobuf := chatEntity.GetProtobuf()
|
2020-07-28 10:02:51 +02:00
|
|
|
switch chatEntityProtobuf := chatEntityProtobuf.(type) {
|
2020-07-27 12:13:22 +02:00
|
|
|
case *protobuf.ChatMessage:
|
2020-07-28 10:02:51 +02:00
|
|
|
message.Message = chatEntityProtobuf
|
2020-07-27 12:13:22 +02:00
|
|
|
case *protobuf.EmojiReaction:
|
2020-07-28 10:02:51 +02:00
|
|
|
message.EmojiReaction = chatEntityProtobuf
|
2020-07-27 12:13:22 +02:00
|
|
|
|
|
|
|
}
|
2019-10-14 16:10:48 +02:00
|
|
|
}
|
2020-07-27 12:13:22 +02:00
|
|
|
|
2019-11-21 17:19:22 +01:00
|
|
|
encodedMessage, err := v1protocol.EncodeMembershipUpdateMessage(message)
|
2019-10-14 16:10:48 +02:00
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "failed to encode membership update message")
|
|
|
|
}
|
|
|
|
|
2020-01-10 19:59:01 +01:00
|
|
|
return encodedMessage, nil
|
2019-10-14 16:10:48 +02:00
|
|
|
}
|
|
|
|
|
2021-01-08 12:26:15 +01:00
|
|
|
// EncodeMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
|
|
|
|
// All the events in a group are encoded and added to the payload
|
|
|
|
func (p *MessageProcessor) EncodeMembershipUpdate(
|
|
|
|
group *v1protocol.Group,
|
|
|
|
chatEntity ChatEntity,
|
|
|
|
) ([]byte, error) {
|
|
|
|
message := v1protocol.MembershipUpdateMessage{
|
|
|
|
ChatID: group.ChatID(),
|
|
|
|
Events: group.Events(),
|
|
|
|
}
|
|
|
|
|
|
|
|
return p.encodeMembershipUpdate(message, chatEntity)
|
|
|
|
}
|
|
|
|
|
|
|
|
// EncodeAbridgedMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
|
|
|
|
// Only the events relevant to the sender are encoded
|
|
|
|
func (p *MessageProcessor) EncodeAbridgedMembershipUpdate(
|
|
|
|
group *v1protocol.Group,
|
|
|
|
chatEntity ChatEntity,
|
|
|
|
) ([]byte, error) {
|
|
|
|
message := v1protocol.MembershipUpdateMessage{
|
|
|
|
ChatID: group.ChatID(),
|
|
|
|
Events: group.AbridgedEvents(&p.identity.PublicKey),
|
|
|
|
}
|
|
|
|
return p.encodeMembershipUpdate(message, chatEntity)
|
|
|
|
}
|
|
|
|
|
2020-05-13 15:24:52 +02:00
|
|
|
// SendPublic takes encoded data, encrypts it and sends through the wire.
|
2020-07-06 10:54:22 +02:00
|
|
|
func (p *MessageProcessor) SendPublic(
|
2019-12-02 16:34:05 +01:00
|
|
|
ctx context.Context,
|
|
|
|
chatName string,
|
2020-07-28 15:22:22 +02:00
|
|
|
rawMessage RawMessage,
|
2019-12-02 16:34:05 +01:00
|
|
|
) ([]byte, error) {
|
2020-07-22 09:41:40 +02:00
|
|
|
// Set sender
|
2020-07-21 17:41:10 +02:00
|
|
|
if rawMessage.Sender == nil {
|
|
|
|
rawMessage.Sender = p.identity
|
|
|
|
}
|
2019-09-02 11:29:06 +02:00
|
|
|
|
2020-07-28 15:22:22 +02:00
|
|
|
wrappedMessage, err := p.wrapMessageV1(&rawMessage)
|
2019-09-02 11:29:06 +02:00
|
|
|
if err != nil {
|
|
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
|
|
}
|
|
|
|
|
2020-07-31 14:22:05 +02:00
|
|
|
var newMessage *types.NewMessage
|
2020-09-02 16:11:16 +02:00
|
|
|
|
|
|
|
messageSpec, err := p.protocol.BuildPublicMessage(p.identity, wrappedMessage)
|
|
|
|
if err != nil {
|
2020-11-18 10:16:51 +01:00
|
|
|
p.logger.Error("failed to send a public message", zap.Error(err))
|
2020-09-02 16:11:16 +02:00
|
|
|
return nil, errors.Wrap(err, "failed to wrap a public message in the encryption layer")
|
|
|
|
}
|
|
|
|
|
2020-07-31 14:22:05 +02:00
|
|
|
if !rawMessage.SkipEncryption {
|
|
|
|
newMessage, err = MessageSpecToWhisper(messageSpec)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
newMessage = &types.NewMessage{
|
|
|
|
TTL: whisperTTL,
|
|
|
|
Payload: wrappedMessage,
|
|
|
|
PowTarget: calculatePoW(wrappedMessage),
|
|
|
|
PowTime: whisperPoWTime,
|
|
|
|
}
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
2020-07-22 09:41:40 +02:00
|
|
|
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
|
|
|
rawMessage.ID = types.EncodeHex(messageID)
|
|
|
|
|
|
|
|
// notify before dispatching
|
2020-07-28 15:22:22 +02:00
|
|
|
p.notifyOnScheduledMessage(&rawMessage)
|
2020-07-22 09:41:40 +02:00
|
|
|
|
2019-10-09 16:22:53 +02:00
|
|
|
hash, err := p.transport.SendPublic(ctx, newMessage, chatName)
|
2019-09-02 11:29:06 +02:00
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
2020-09-02 16:11:16 +02:00
|
|
|
sentMessage := &SentMessage{
|
|
|
|
Spec: messageSpec,
|
|
|
|
MessageIDs: [][]byte{messageID},
|
|
|
|
}
|
|
|
|
|
|
|
|
p.notifyOnSentMessage(sentMessage)
|
|
|
|
|
2019-09-02 11:29:06 +02:00
|
|
|
p.transport.Track([][]byte{messageID}, hash, newMessage)
|
|
|
|
|
|
|
|
return messageID, nil
|
|
|
|
}
|
|
|
|
|
2021-02-23 17:47:45 +02:00
|
|
|
// unwrapDatasyncMessage tries to unwrap message as datasync one and in case of success
|
|
|
|
// returns cloned messages with replaced payloads
|
|
|
|
func unwrapDatasyncMessage(m *v1protocol.StatusMessage, datasync *datasync.DataSync) ([]*v1protocol.StatusMessage, [][]byte, error) {
|
|
|
|
var statusMessages []*v1protocol.StatusMessage
|
|
|
|
|
|
|
|
payloads, acks, err := datasync.UnwrapPayloadsAndAcks(
|
|
|
|
m.SigPubKey(),
|
|
|
|
m.DecryptedPayload,
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
for _, payload := range payloads {
|
|
|
|
message, err := m.Clone()
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
message.DecryptedPayload = payload
|
|
|
|
statusMessages = append(statusMessages, message)
|
|
|
|
}
|
|
|
|
return statusMessages, acks, nil
|
|
|
|
}
|
|
|
|
|
2020-07-06 10:54:22 +02:00
|
|
|
// HandleMessages expects a whisper message as input, and it will go through
|
2019-09-02 11:29:06 +02:00
|
|
|
// a series of transformations until the message is parsed into an application
|
|
|
|
// layer message, or in case of Raw methods, the processing stops at the layer
|
2019-09-26 09:01:17 +02:00
|
|
|
// before.
|
|
|
|
// It returns an error only if the processing of required steps failed.
|
2021-02-23 17:47:45 +02:00
|
|
|
func (p *MessageProcessor) HandleMessages(shhMessage *types.Message, applicationLayer bool) ([]*v1protocol.StatusMessage, [][]byte, error) {
|
2019-09-02 11:29:06 +02:00
|
|
|
logger := p.logger.With(zap.String("site", "handleMessages"))
|
2019-12-17 21:51:01 +01:00
|
|
|
hlogger := logger.With(zap.ByteString("hash", shhMessage.Hash))
|
2019-11-21 17:19:22 +01:00
|
|
|
var statusMessage v1protocol.StatusMessage
|
2021-02-23 17:47:45 +02:00
|
|
|
var statusMessages []*v1protocol.StatusMessage
|
2019-09-02 11:29:06 +02:00
|
|
|
|
|
|
|
err := statusMessage.HandleTransport(shhMessage)
|
|
|
|
if err != nil {
|
|
|
|
hlogger.Error("failed to handle transport layer message", zap.Error(err))
|
2021-02-23 17:47:45 +02:00
|
|
|
return nil, nil, err
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
err = p.handleEncryptionLayer(context.Background(), &statusMessage)
|
|
|
|
if err != nil {
|
|
|
|
hlogger.Debug("failed to handle an encryption message", zap.Error(err))
|
|
|
|
}
|
|
|
|
|
2021-02-23 17:47:45 +02:00
|
|
|
statusMessages, acks, err := unwrapDatasyncMessage(&statusMessage, p.datasync)
|
2019-09-02 11:29:06 +02:00
|
|
|
if err != nil {
|
|
|
|
hlogger.Debug("failed to handle datasync message", zap.Error(err))
|
2021-02-23 17:47:45 +02:00
|
|
|
//that wasn't a datasync message, so use the original payload
|
|
|
|
statusMessages = append(statusMessages, &statusMessage)
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
for _, statusMessage := range statusMessages {
|
|
|
|
err := statusMessage.HandleApplicationMetadata()
|
|
|
|
if err != nil {
|
|
|
|
hlogger.Error("failed to handle application metadata layer message", zap.Error(err))
|
|
|
|
}
|
|
|
|
|
|
|
|
if applicationLayer {
|
|
|
|
err = statusMessage.HandleApplication()
|
|
|
|
if err != nil {
|
2019-11-04 11:08:22 +01:00
|
|
|
hlogger.Error("failed to handle application layer message", zap.Error(err))
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-02-23 17:47:45 +02:00
|
|
|
return statusMessages, acks, nil
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
2020-07-22 09:41:40 +02:00
|
|
|
// fetchDecryptionKey returns the private key associated with this public key, and returns true if it's an ephemeral key
|
|
|
|
func (p *MessageProcessor) fetchDecryptionKey(destination *ecdsa.PublicKey) (*ecdsa.PrivateKey, bool) {
|
2020-07-21 17:41:10 +02:00
|
|
|
destinationID := types.EncodeHex(crypto.FromECDSAPub(destination))
|
2020-07-22 09:41:40 +02:00
|
|
|
|
|
|
|
p.ephemeralKeysMutex.Lock()
|
2020-07-21 17:41:10 +02:00
|
|
|
decryptionKey, ok := p.ephemeralKeys[destinationID]
|
2020-07-22 09:41:40 +02:00
|
|
|
p.ephemeralKeysMutex.Unlock()
|
|
|
|
|
|
|
|
// the key is not there, fallback on identity
|
2020-07-21 17:41:10 +02:00
|
|
|
if !ok {
|
2020-07-22 09:41:40 +02:00
|
|
|
return p.identity, false
|
2020-07-21 17:41:10 +02:00
|
|
|
}
|
2020-07-22 09:41:40 +02:00
|
|
|
return decryptionKey, true
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *MessageProcessor) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error {
|
|
|
|
logger := p.logger.With(zap.String("site", "handleEncryptionLayer"))
|
|
|
|
publicKey := message.SigPubKey()
|
|
|
|
|
|
|
|
// if it's an ephemeral key, we don't negotiate a topic
|
|
|
|
decryptionKey, skipNegotiation := p.fetchDecryptionKey(message.Dst)
|
2019-09-02 11:29:06 +02:00
|
|
|
|
2020-07-21 17:41:10 +02:00
|
|
|
err := message.HandleEncryption(decryptionKey, publicKey, p.protocol, skipNegotiation)
|
2020-07-22 09:41:40 +02:00
|
|
|
|
|
|
|
// if it's an ephemeral key, we don't have to handle a device not found error
|
|
|
|
if err == encryption.ErrDeviceNotFound && !skipNegotiation {
|
2019-11-04 11:08:22 +01:00
|
|
|
if err := p.handleErrDeviceNotFound(ctx, publicKey); err != nil {
|
|
|
|
logger.Error("failed to handle ErrDeviceNotFound", zap.Error(err))
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "failed to process an encrypted message")
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-07-06 10:54:22 +02:00
|
|
|
func (p *MessageProcessor) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
|
2019-09-02 11:29:06 +02:00
|
|
|
now := time.Now().Unix()
|
|
|
|
advertise, err := p.protocol.ShouldAdvertiseBundle(publicKey, now)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if !advertise {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
messageSpec, err := p.protocol.BuildBundleAdvertiseMessage(p.identity, publicKey)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
|
|
|
defer cancel()
|
2020-06-30 09:50:59 +02:00
|
|
|
// We don't pass an array of messageIDs as no action needs to be taken
|
|
|
|
// when sending a bundle
|
|
|
|
_, _, err = p.sendMessageSpec(ctx, publicKey, messageSpec, nil)
|
2019-09-02 11:29:06 +02:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
p.protocol.ConfirmBundleAdvertisement(publicKey, now)
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-07-06 10:54:22 +02:00
|
|
|
func (p *MessageProcessor) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
|
2020-07-21 17:41:10 +02:00
|
|
|
wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
|
2019-09-02 11:29:06 +02:00
|
|
|
if err != nil {
|
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 17:25:34 +01:00
|
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
Move to protobuf for Message type (#1706)
* Use a single Message type `v1/message.go` and `message.go` are the same now, and they embed `protobuf.ChatMessage`
* Use `SendChatMessage` for sending chat messages, this is basically the old `Send` but a bit more flexible so we can send different message types (stickers,commands), and not just text.
* Remove dedup from services/shhext. Because now we process in status-protocol, dedup makes less sense, as those messages are going to be processed anyway, so removing for now, we can re-evaluate if bringing it to status-go or not.
* Change the various retrieveX method to a single one:
`RetrieveAll` will be processing those messages that it can process (Currently only `Message`), and return the rest in `RawMessages` (still transit). The format for the response is:
`Chats`: -> The chats updated by receiving the message
`Messages`: -> The messages retrieved (already matched to a chat)
`Contacts`: -> The contacts updated by the messages
`RawMessages` -> Anything else that can't be parsed, eventually as we move everything to status-protocol-go this will go away.
2019-12-05 17:25:34 +01:00
|
|
|
return wrappedMessage, nil
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
2021-02-23 17:47:45 +02:00
|
|
|
func (p *MessageProcessor) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) ([]byte, error) {
|
2019-09-02 11:29:06 +02:00
|
|
|
groupID := datasync.ToOneToOneGroupID(&p.identity.PublicKey, publicKey)
|
|
|
|
peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)
|
|
|
|
exist, err := p.datasync.IsPeerInGroup(groupID, peerID)
|
|
|
|
if err != nil {
|
2021-02-23 17:47:45 +02:00
|
|
|
return nil, errors.Wrap(err, "failed to check if peer is in group")
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
if !exist {
|
|
|
|
if err := p.datasync.AddPeer(groupID, peerID); err != nil {
|
2021-02-23 17:47:45 +02:00
|
|
|
return nil, errors.Wrap(err, "failed to add peer")
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
}
|
2021-02-23 17:47:45 +02:00
|
|
|
id, err := p.datasync.AppendMessage(groupID, message)
|
2019-09-02 11:29:06 +02:00
|
|
|
if err != nil {
|
2021-02-23 17:47:45 +02:00
|
|
|
return nil, errors.Wrap(err, "failed to append message to datasync")
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
2021-02-23 17:47:45 +02:00
|
|
|
return id[:], nil
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
// sendDataSync sends a message scheduled by the data sync layer.
|
2019-09-26 09:01:17 +02:00
|
|
|
// Data Sync layer calls this method "dispatch" function.
|
2020-07-06 10:54:22 +02:00
|
|
|
func (p *MessageProcessor) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, encodedMessage []byte, payload *datasyncproto.Payload) error {
|
2020-07-10 15:26:06 +02:00
|
|
|
// Calculate the messageIDs
|
2019-09-02 11:29:06 +02:00
|
|
|
messageIDs := make([][]byte, 0, len(payload.Messages))
|
|
|
|
for _, payload := range payload.Messages {
|
2019-11-21 17:19:22 +01:00
|
|
|
messageIDs = append(messageIDs, v1protocol.MessageID(&p.identity.PublicKey, payload.Body))
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
messageSpec, err := p.protocol.BuildDirectMessage(p.identity, publicKey, encodedMessage)
|
|
|
|
if err != nil {
|
|
|
|
return errors.Wrap(err, "failed to encrypt message")
|
|
|
|
}
|
|
|
|
|
2020-07-31 14:22:05 +02:00
|
|
|
// The shared secret needs to be handle before we send a message
|
|
|
|
// otherwise the topic might not be set up before we receive a message
|
|
|
|
if p.handleSharedSecrets != nil {
|
|
|
|
err := p.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2020-06-30 09:50:59 +02:00
|
|
|
hash, newMessage, err := p.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs)
|
2019-09-02 11:29:06 +02:00
|
|
|
if err != nil {
|
2020-11-18 10:16:51 +01:00
|
|
|
p.logger.Error("failed to send a datasync message", zap.Error(err))
|
2019-09-02 11:29:06 +02:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2019-10-09 16:22:53 +02:00
|
|
|
p.transport.Track(messageIDs, hash, newMessage)
|
2019-09-02 11:29:06 +02:00
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2020-07-31 14:22:05 +02:00
|
|
|
// sendPrivateRawMessage sends a message not wrapped in an encryption layer
|
2021-01-18 10:12:03 +01:00
|
|
|
func (p *MessageProcessor) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
|
2020-07-21 17:41:10 +02:00
|
|
|
newMessage := &types.NewMessage{
|
|
|
|
TTL: whisperTTL,
|
|
|
|
Payload: payload,
|
|
|
|
PowTarget: calculatePoW(payload),
|
|
|
|
PowTime: whisperPoWTime,
|
|
|
|
}
|
2021-01-18 10:12:03 +01:00
|
|
|
var hash []byte
|
|
|
|
var err error
|
2020-07-21 17:41:10 +02:00
|
|
|
|
2021-01-18 10:12:03 +01:00
|
|
|
if rawMessage.SendOnPersonalTopic {
|
|
|
|
hash, err = p.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey)
|
|
|
|
} else {
|
|
|
|
hash, err = p.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
|
|
|
|
}
|
2020-07-21 17:41:10 +02:00
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return hash, newMessage, nil
|
|
|
|
}
|
|
|
|
|
2019-09-26 09:01:17 +02:00
|
|
|
// sendMessageSpec analyses the spec properties and selects a proper transport method.
|
2020-07-06 10:54:22 +02:00
|
|
|
func (p *MessageProcessor) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) {
|
|
|
|
newMessage, err := MessageSpecToWhisper(messageSpec)
|
2019-09-02 11:29:06 +02:00
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
logger := p.logger.With(zap.String("site", "sendMessageSpec"))
|
|
|
|
|
|
|
|
var hash []byte
|
|
|
|
|
2020-07-31 14:22:05 +02:00
|
|
|
// process shared secret
|
|
|
|
if messageSpec.AgreedSecret {
|
2019-09-02 11:29:06 +02:00
|
|
|
logger.Debug("sending using shared secret")
|
2020-07-31 14:22:05 +02:00
|
|
|
hash, err = p.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key)
|
|
|
|
} else {
|
2019-09-02 11:29:06 +02:00
|
|
|
logger.Debug("sending partitioned topic")
|
2019-10-09 16:22:53 +02:00
|
|
|
hash, err = p.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
if err != nil {
|
|
|
|
return nil, nil, err
|
|
|
|
}
|
|
|
|
|
2020-07-06 10:54:22 +02:00
|
|
|
sentMessage := &SentMessage{
|
|
|
|
PublicKey: publicKey,
|
|
|
|
Spec: messageSpec,
|
|
|
|
MessageIDs: messageIDs,
|
|
|
|
}
|
2020-06-30 09:50:59 +02:00
|
|
|
|
2020-07-22 09:41:40 +02:00
|
|
|
p.notifyOnSentMessage(sentMessage)
|
|
|
|
|
|
|
|
return hash, newMessage, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// SubscribeToSentMessages returns a channel where we publish every time a message is sent
|
|
|
|
func (p *MessageProcessor) SubscribeToSentMessages() <-chan *SentMessage {
|
|
|
|
c := make(chan *SentMessage, 100)
|
|
|
|
p.sentMessagesSubscriptions = append(p.sentMessagesSubscriptions, c)
|
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
|
|
|
func (p *MessageProcessor) notifyOnSentMessage(sentMessage *SentMessage) {
|
2020-07-06 10:54:22 +02:00
|
|
|
// Publish on channels, drop if buffer is full
|
2020-07-22 09:41:40 +02:00
|
|
|
for _, c := range p.sentMessagesSubscriptions {
|
2020-07-06 10:54:22 +02:00
|
|
|
select {
|
|
|
|
case c <- sentMessage:
|
|
|
|
default:
|
2020-07-22 09:41:40 +02:00
|
|
|
p.logger.Warn("sent messages subscription channel full, dropping message")
|
2020-06-30 09:50:59 +02:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2019-09-02 11:29:06 +02:00
|
|
|
}
|
|
|
|
|
2020-07-22 09:41:40 +02:00
|
|
|
// SubscribeToScheduledMessages returns a channel where we publish every time a message is scheduled for sending
|
|
|
|
func (p *MessageProcessor) SubscribeToScheduledMessages() <-chan *RawMessage {
|
|
|
|
c := make(chan *RawMessage, 100)
|
|
|
|
p.scheduledMessagesSubscriptions = append(p.scheduledMessagesSubscriptions, c)
|
2020-07-06 10:54:22 +02:00
|
|
|
return c
|
|
|
|
}
|
|
|
|
|
2020-07-22 09:41:40 +02:00
|
|
|
func (p *MessageProcessor) notifyOnScheduledMessage(message *RawMessage) {
|
|
|
|
// Publish on channels, drop if buffer is full
|
|
|
|
for _, c := range p.scheduledMessagesSubscriptions {
|
|
|
|
select {
|
|
|
|
case c <- message:
|
|
|
|
default:
|
|
|
|
p.logger.Warn("scheduled messages subscription channel full, dropping message")
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2020-07-09 18:52:26 +02:00
|
|
|
func (p *MessageProcessor) JoinPublic(chatID string) error {
|
|
|
|
return p.transport.JoinPublic(chatID)
|
|
|
|
}
|
|
|
|
|
2020-07-22 09:41:40 +02:00
|
|
|
// AddEphemeralKey adds an ephemeral key that we will be listening to
|
|
|
|
// note that we never removed them from now, as waku/whisper does not
|
|
|
|
// recalculate topics on removal, so effectively there's no benefit.
|
|
|
|
// On restart they will be gone.
|
|
|
|
func (p *MessageProcessor) AddEphemeralKey(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) {
|
|
|
|
p.ephemeralKeysMutex.Lock()
|
2020-07-21 17:41:10 +02:00
|
|
|
p.ephemeralKeys[types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))] = privateKey
|
2020-07-22 09:41:40 +02:00
|
|
|
p.ephemeralKeysMutex.Unlock()
|
2020-07-21 17:41:10 +02:00
|
|
|
return p.transport.LoadKeyFilters(privateKey)
|
|
|
|
}
|
|
|
|
|
2020-07-06 10:54:22 +02:00
|
|
|
func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) {
|
2019-11-23 18:57:05 +01:00
|
|
|
var newMessage *types.NewMessage
|
2019-09-02 11:29:06 +02:00
|
|
|
|
|
|
|
payload, err := proto.Marshal(spec.Message)
|
|
|
|
if err != nil {
|
|
|
|
return newMessage, err
|
|
|
|
}
|
|
|
|
|
2019-11-23 18:57:05 +01:00
|
|
|
newMessage = &types.NewMessage{
|
2019-09-02 11:29:06 +02:00
|
|
|
TTL: whisperTTL,
|
|
|
|
Payload: payload,
|
2020-05-13 15:19:29 +02:00
|
|
|
PowTarget: calculatePoW(payload),
|
2019-09-02 11:29:06 +02:00
|
|
|
PowTime: whisperPoWTime,
|
|
|
|
}
|
|
|
|
return newMessage, nil
|
|
|
|
}
|
|
|
|
|
2020-05-13 15:19:29 +02:00
|
|
|
// calculatePoW returns the PoWTarget to be used.
|
2020-05-14 07:40:40 +02:00
|
|
|
// We check the size and arbitrarily set it to a lower PoW if the packet is
|
|
|
|
// greater than 50KB. We do this as the defaultPoW is too high for clients to send
|
2020-05-13 15:19:29 +02:00
|
|
|
// large messages.
|
|
|
|
func calculatePoW(payload []byte) float64 {
|
|
|
|
if len(payload) > largeSizeInBytes {
|
|
|
|
return whisperLargeSizePoW
|
|
|
|
}
|
|
|
|
return whisperDefaultPoW
|
|
|
|
}
|