status-go/protocol/common/message_sender.go

1407 lines
43 KiB
Go

package common
import (
"bytes"
"context"
"crypto/ecdsa"
"database/sql"
"math"
"sync"
"time"
"github.com/golang/protobuf/proto"
"github.com/jinzhu/copier"
"github.com/pkg/errors"
datasyncnode "github.com/vacp2p/mvds/node"
datasyncproto "github.com/vacp2p/mvds/protobuf"
"go.uber.org/zap"
"github.com/status-im/status-go/eth-node/crypto"
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/protocol/datasync"
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
"github.com/status-im/status-go/protocol/encryption"
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
"github.com/status-im/status-go/protocol/protobuf"
"github.com/status-im/status-go/protocol/transport"
v1protocol "github.com/status-im/status-go/protocol/v1"
)
// Whisper message properties.
// These values are copied into the types.NewMessage envelopes built in this file.
const (
// whisperTTL is the value assigned to the TTL field of outgoing envelopes.
// NOTE(review): presumably expressed in seconds — confirm against the transport.
whisperTTL = 15
// whisperDefaultPoW is the default proof-of-work target.
// NOTE(review): presumably selected by calculatePoW for payloads smaller
// than largeSizeInBytes — confirm in calculatePoW.
whisperDefaultPoW = 0.002
// whisperLargeSizePoW is the PoWTarget for larger payload sizes
whisperLargeSizePoW = 0.000002
// largeSizeInBytes is the payload size from which a lower PoW should be used.
// Roughly this is 50KB
largeSizeInBytes = 50000
// whisperPoWTime is the value assigned to the PowTime field of outgoing envelopes.
whisperPoWTime = 5
)
// RekeyCompatibility indicates whether we should be sending
// keys in 1-to-1 messages as well as in the newer format.
// When it is true, community key exchange messages are always sent to each
// recipient individually, and the current group key is fetched and passed
// along when building the rekey group message.
var RekeyCompatibility = true
// SentMessage represents a message that has been passed to the transport layer.
type SentMessage struct {
// PublicKey is the recipient's public key; it is left unset for public
// and community chat messages.
PublicKey *ecdsa.PublicKey
// Spec is the encryption-layer message spec that was sent.
Spec *encryption.ProtocolMessageSpec
// MessageIDs are the application-level message IDs carried by the spec.
MessageIDs [][]byte
}
// MessageEventType identifies the kind of a MessageEvent.
type MessageEventType uint32
// Known message event kinds, numbered from 1.
// NOTE(review): these are untyped integer constants (the block does not
// name MessageEventType), so they convert implicitly wherever they are used.
const (
// MessageScheduled has the value 1.
MessageScheduled = iota + 1
// MessageSent has the value 2.
MessageSent
)
// MessageEvent is the value delivered on the channels stored in
// MessageSender.messageEventsSubscriptions.
type MessageEvent struct {
// Recipient is the public key the message is addressed to.
// NOTE(review): presumably nil for public/community messages — confirm
// against the notify helpers.
Recipient *ecdsa.PublicKey
// Type is the kind of event.
Type MessageEventType
// SentMessage describes a message handed to the transport layer.
// NOTE(review): presumably only set for MessageSent events — confirm.
SentMessage *SentMessage
// RawMessage is the raw message the event refers to.
// NOTE(review): presumably only set for MessageScheduled events — confirm.
RawMessage *RawMessage
}
// MessageSender wraps, encrypts and dispatches outgoing messages through the
// transport, and unwraps incoming ones layer by layer.
type MessageSender struct {
// identity is the node's private key; it is the default sender of messages.
identity *ecdsa.PrivateKey
// datasync is the datasync layer used for automatically resent messages.
datasync *datasync.DataSync
// database is the handle given to NewMessageSender; it also backs persistence.
database *sql.DB
// protocol is the encryption protocol used to build and decrypt message specs.
protocol *encryption.Protocol
// transport sends and tracks the resulting envelopes.
transport *transport.Transport
logger *zap.Logger
// persistence stores pending confirmations and queued hash ratchet messages.
persistence *RawMessagesPersistence
// datasyncEnabled mirrors featureFlags.Datasync at construction time.
datasyncEnabled bool
// ephemeralKeys is a map that contains the ephemeral keys of the client, used
// to decrypt messages
ephemeralKeys map[string]*ecdsa.PrivateKey
// ephemeralKeysMutex guards ephemeralKeys.
ephemeralKeysMutex sync.Mutex
// messageEventsSubscriptions contains all the subscriptions for message events
messageEventsSubscriptions []chan<- *MessageEvent
// featureFlags are the feature flags given to NewMessageSender.
featureFlags FeatureFlags
// handleSharedSecrets is a callback that is called every time a new shared secret is negotiated
handleSharedSecrets func([]*sharedsecret.Secret) error
}
// NewMessageSender builds a MessageSender from the given identity, database,
// encryption protocol, transport, logger and feature flags.
//
// A persistent datasync node is always created. The datasync layer itself is
// only initialised and started when features.Datasync is set. An error is
// returned when the persistent node cannot be created.
func NewMessageSender(
	identity *ecdsa.PrivateKey,
	database *sql.DB,
	enc *encryption.Protocol,
	transport *transport.Transport,
	logger *zap.Logger,
	features FeatureFlags,
) (*MessageSender, error) {
	nodeTransport := datasync.NewNodeTransport()

	node, err := datasyncnode.NewPersistentNode(
		database,
		nodeTransport,
		datasyncpeer.PublicKeyToPeerID(identity.PublicKey),
		datasyncnode.BATCH,
		datasync.CalculateSendTime,
		logger,
	)
	if err != nil {
		return nil, err
	}

	dsLayer := datasync.New(node, nodeTransport, features.Datasync, logger)

	sender := &MessageSender{
		identity:        identity,
		database:        database,
		protocol:        enc,
		transport:       transport,
		logger:          logger,
		datasync:        dsLayer,
		datasyncEnabled: features.Datasync,
		featureFlags:    features,
		persistence:     NewRawMessagesPersistence(database),
		ephemeralKeys:   make(map[string]*ecdsa.PrivateKey),
	}

	// Without datasync there is nothing more to set up.
	if !features.Datasync {
		return sender, nil
	}

	// With DataSync enabled, messages are first added to DataSync and the
	// actual encrypt-and-send step is postponed; sendDataSync is the dispatch
	// function that performs it.
	//
	// The max message size is 3/4 of the allowed message size, to leave room
	// for encryption. Messages exceeding it are still attempted.
	maxDatasyncSize := transport.MaxMessageSize() / 4 * 3
	dsLayer.Init(sender.sendDataSync, maxDatasyncSize, logger)
	dsLayer.Start(datasync.DatasyncTicker)

	return sender, nil
}
// Stop closes every message-event subscription channel, forgets them, and
// stops the datasync layer.
func (s *MessageSender) Stop() {
	for i := range s.messageEventsSubscriptions {
		close(s.messageEventsSubscriptions[i])
	}
	s.messageEventsSubscriptions = nil

	s.datasync.Stop() // idempotent op
}
// SetHandleSharedSecrets registers the callback that receives the shared
// secret of an encrypted message spec before that message is sent.
// Passing nil disables the callback, since callers check it for nil.
func (s *MessageSender) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) {
s.handleSharedSecrets = handler
}
// SendPrivate sends rawMessage to a single recipient and returns the
// message ID.
//
// Automatic resending (datasync) cannot be combined with a custom sender,
// with skipping the encryption layer, or with sending on the personal topic;
// such a combination is rejected with an error. When no sender is set on
// rawMessage, the node identity is used.
func (s *MessageSender) SendPrivate(
	ctx context.Context,
	recipient *ecdsa.PublicKey,
	rawMessage *RawMessage,
) ([]byte, error) {
	recipientHex := types.EncodeHex(crypto.FromECDSAPub(recipient))
	s.logger.Debug(
		"sending a private message",
		zap.String("public-key", recipientHex),
		zap.String("site", "SendPrivate"),
	)

	// The datasync interface is not rich enough to propagate custom waku
	// fields, so these options are mutually exclusive with automatic resend.
	hasCustomFields := rawMessage.Sender != nil ||
		rawMessage.SkipEncryptionLayer ||
		rawMessage.SendOnPersonalTopic
	if rawMessage.ResendAutomatically && hasCustomFields {
		return nil, errors.New("setting identity, skip-encryption or personal topic and datasync not supported")
	}

	// Default the sender to our own identity.
	if rawMessage.Sender == nil {
		rawMessage.Sender = s.identity
	}

	return s.sendPrivate(ctx, recipient, rawMessage)
}
// SendCommunityMessage sends rawMessage to a community, always using the
// node identity as the sender, and returns whatever sendCommunity returns.
// rawMessage is received by value, so the caller's copy is not modified.
func (s *MessageSender) SendCommunityMessage(
	ctx context.Context,
	rawMessage RawMessage,
) ([]byte, error) {
	communityID := types.EncodeHex(rawMessage.CommunityID)
	s.logger.Debug(
		"sending a community message",
		zap.String("communityId", communityID),
		zap.String("site", "SendCommunityMessage"),
	)

	message := &rawMessage
	message.Sender = s.identity

	return s.sendCommunity(ctx, message)
}
// SendPubsubTopicKey sends the protected topic key for a community to every
// recipient listed in rawMessage.Recipients, using the node identity as the
// sender. It returns the message ID computed before dispatch, or the first
// error encountered while sending.
func (s *MessageSender) SendPubsubTopicKey(
	ctx context.Context,
	rawMessage *RawMessage,
) ([]byte, error) {
	s.logger.Debug(
		"sending the protected topic key for a community",
		zap.String("communityId", types.EncodeHex(rawMessage.CommunityID)),
		zap.String("site", "SendPubsubTopicKey"),
	)

	rawMessage.Sender = s.identity

	id, err := s.getMessageID(rawMessage)
	if err != nil {
		return nil, err
	}
	rawMessage.ID = types.EncodeHex(id)

	// The scheduled notification has to go out before dispatching, otherwise
	// the dispatch notification could be observed first.
	s.notifyOnScheduledMessage(nil, rawMessage)

	// Deliver to each recipient in turn, stopping at the first failure.
	recipients := rawMessage.Recipients
	for _, to := range recipients {
		if _, sendErr := s.sendPrivate(ctx, to, rawMessage); sendErr != nil {
			return nil, errors.Wrap(sendErr, "failed to send message")
		}
	}

	return id, nil
}
// SendGroup sends rawMessage privately to each of the recipients and returns
// the message ID computed up front.
//
// The BeforeDispatch hook, when present, is run exactly once here and then
// cleared so the per-recipient sends do not run it again. Sending stops at
// the first recipient that fails.
func (s *MessageSender) SendGroup(
	ctx context.Context,
	recipients []*ecdsa.PublicKey,
	rawMessage RawMessage,
) ([]byte, error) {
	s.logger.Debug("sending a private group message", zap.String("site", "SendGroup"))

	message := &rawMessage

	// Default the sender to our own identity.
	if message.Sender == nil {
		message.Sender = s.identity
	}

	// The message ID is derived from the wrapped payload and stored on the
	// raw message before anything is dispatched.
	wrapped, err := s.wrapMessageV1(message)
	if err != nil {
		return nil, errors.Wrap(err, "failed to wrap message")
	}
	id := v1protocol.MessageID(&message.Sender.PublicKey, wrapped)
	message.ID = types.EncodeHex(id)

	// Run the hook once, then drop it.
	if hook := message.BeforeDispatch; hook != nil {
		if hookErr := hook(message); hookErr != nil {
			return nil, hookErr
		}
	}
	message.BeforeDispatch = nil

	for _, to := range recipients {
		if _, sendErr := s.sendPrivate(ctx, to, message); sendErr != nil {
			return nil, errors.Wrap(sendErr, "failed to send message")
		}
	}

	return id, nil
}
// getMessageID wraps rawMessage and derives its message ID from the sender's
// public key and the wrapped payload. rawMessage.Sender must already be set.
func (s *MessageSender) getMessageID(rawMessage *RawMessage) (types.HexBytes, error) {
	wrapped, err := s.wrapMessageV1(rawMessage)
	if err != nil {
		return nil, errors.Wrap(err, "failed to wrap message")
	}

	return v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrapped), nil
}
// ShouldCommunityMessageBeEncrypted reports whether a community message of
// the given type goes through the hash ratchet encryption path: chat, edit,
// delete and pin messages, and emoji reactions.
func ShouldCommunityMessageBeEncrypted(msgType protobuf.ApplicationMetadataMessage_Type) bool {
	switch msgType {
	case protobuf.ApplicationMetadataMessage_CHAT_MESSAGE,
		protobuf.ApplicationMetadataMessage_EDIT_MESSAGE,
		protobuf.ApplicationMetadataMessage_DELETE_MESSAGE,
		protobuf.ApplicationMetadataMessage_PIN_MESSAGE,
		protobuf.ApplicationMetadataMessage_EMOJI_REACTION:
		return true
	default:
		return false
	}
}
// sendCommunity sends a message that's to be sent in a community.
//
// Three paths exist:
//   - key exchange messages (CommunityKeyExMsgType != KeyExMsgNone) are sent
//     to every recipient and, on rekey, also over the community topic; this
//     path returns (nil, nil) on success;
//   - message types accepted by ShouldCommunityMessageBeEncrypted are hash
//     ratchet encrypted and sent on the community chat topic;
//   - everything else is sent unencrypted on the community topic derived
//     from the community public key.
//
// For the last two paths the message ID is returned.
func (s *MessageSender) sendCommunity(
	ctx context.Context,
	rawMessage *RawMessage,
) ([]byte, error) {
	// Set sender. This has to happen before anything dereferences
	// rawMessage.Sender, including the log line below.
	if rawMessage.Sender == nil {
		rawMessage.Sender = s.identity
	}

	s.logger.Debug("sending community message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(&rawMessage.Sender.PublicKey))))

	messageID, err := s.getMessageID(rawMessage)
	if err != nil {
		return nil, err
	}
	rawMessage.ID = types.EncodeHex(messageID)
	messageIDs := [][]byte{messageID}

	// NOTE(review): dispatchCommunityChatMessage runs BeforeDispatch and the
	// scheduled notification again for encrypted chat messages.
	if rawMessage.BeforeDispatch != nil {
		if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
			return nil, err
		}
	}

	// Notify before dispatching, otherwise the dispatch subscription might happen
	// earlier than the scheduled
	s.notifyOnScheduledMessage(nil, rawMessage)

	var hashes [][]byte
	var newMessages []*types.NewMessage

	// Check if it's a key exchange message. In this case we send it
	// to all the recipients
	if rawMessage.CommunityKeyExMsgType != KeyExMsgNone {
		forceRekey := rawMessage.CommunityKeyExMsgType == KeyExMsgRekey

		// If rekeycompatibility is on, we always
		// want to execute below, otherwise we execute
		// only when we want to fill up old keys to a given user
		if RekeyCompatibility || !forceRekey {
			keyExMessageSpecs, err := s.protocol.GetKeyExMessageSpecs(rawMessage.HashRatchetGroupID, s.identity, rawMessage.Recipients, forceRekey)
			if err != nil {
				return nil, err
			}
			// Specs are matched to recipients by index.
			for i, spec := range keyExMessageSpecs {
				recipient := rawMessage.Recipients[i]
				_, _, err = s.sendMessageSpec(ctx, recipient, spec, messageIDs)
				if err != nil {
					return nil, err
				}
			}
		}

		if forceRekey {
			var ratchet *encryption.HashRatchetKeyCompatibility
			// We have just rekeyed, pull the latest
			if RekeyCompatibility {
				ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
				if err != nil {
					return nil, err
				}
			}

			// We send the message over the community topic
			messages, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, ratchet)
			if err != nil {
				return nil, err
			}

			for _, m := range messages {
				payload, err := proto.Marshal(m.Message)
				if err != nil {
					return nil, err
				}
				rawMessage.Payload = payload

				newMessage := &types.NewMessage{
					TTL:       whisperTTL,
					Payload:   payload,
					PowTarget: calculatePoW(payload),
					PowTime:   whisperPoWTime,
				}
				newMessage.Ephemeral = rawMessage.Ephemeral
				newMessage.PubsubTopic = rawMessage.PubsubTopic

				// Each rekey message gets its own ID, derived from its payload.
				messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, payload)
				rawMessage.ID = types.EncodeHex(messageID)

				// notify before dispatching
				s.notifyOnScheduledMessage(nil, rawMessage)

				newMessages, err = s.segmentMessage(newMessage)
				if err != nil {
					return nil, err
				}
				for _, newMessage := range newMessages {
					_, err = s.transport.SendPublic(ctx, newMessage, types.EncodeHex(rawMessage.CommunityID))
					if err != nil {
						return nil, err
					}
				}
			}
		}

		return nil, nil
	}

	wrappedMessage, err := s.wrapMessageV1(rawMessage)
	if err != nil {
		return nil, err
	}

	// If it's a chat message, we send it on the community chat topic
	if ShouldCommunityMessageBeEncrypted(rawMessage.MessageType) {
		messageSpec, err := s.protocol.BuildHashRatchetMessage(rawMessage.HashRatchetGroupID, wrappedMessage)
		if err != nil {
			return nil, err
		}

		payload, err := proto.Marshal(messageSpec.Message)
		if err != nil {
			return nil, errors.Wrap(err, "failed to marshal")
		}

		hashes, newMessages, err = s.dispatchCommunityChatMessage(ctx, rawMessage, payload)
		if err != nil {
			return nil, err
		}

		sentMessage := &SentMessage{
			Spec:       messageSpec,
			MessageIDs: messageIDs,
		}
		s.notifyOnSentMessage(sentMessage)
	} else {
		payload := wrappedMessage

		// The community ID is the compressed community public key.
		pubkey, err := crypto.DecompressPubkey(rawMessage.CommunityID)
		if err != nil {
			return nil, errors.Wrap(err, "failed to decompress pubkey")
		}

		hashes, newMessages, err = s.dispatchCommunityMessage(ctx, pubkey, payload, messageIDs, rawMessage.PubsubTopic)
		if err != nil {
			s.logger.Error("failed to send a community message", zap.Error(err))
			return nil, errors.Wrap(err, "failed to send a message spec")
		}
	}

	for i, newMessage := range newMessages {
		s.logger.Debug("sent community message ", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hashes[i])))
		s.transport.Track(messageIDs, hashes[i], newMessage)
	}

	return messageID, nil
}
// sendPrivate sends rawMessage to the recipient identified by the given
// public key and returns the message ID.
//
// Depending on the message and feature flags, the wrapped payload is either
// queued in datasync (which dispatches it later), sent without the encryption
// layer, or encrypted and sent directly. rawMessage.Sender must be set.
func (s *MessageSender) sendPrivate(
	ctx context.Context,
	recipient *ecdsa.PublicKey,
	rawMessage *RawMessage,
) ([]byte, error) {
	s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))

	wrapped, err := s.wrapMessageV1(rawMessage)
	if err != nil {
		return nil, errors.Wrap(err, "failed to wrap message")
	}

	messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrapped)
	rawMessage.ID = types.EncodeHex(messageID)
	rawMessage.PubsubTopic = transport.DefaultShardPubsubTopic() // TODO: determine which pubsub topic should be used for 1:1 messages

	if hook := rawMessage.BeforeDispatch; hook != nil {
		if hookErr := hook(rawMessage); hookErr != nil {
			return nil, hookErr
		}
	}

	// The scheduled notification has to go out before dispatching, otherwise
	// the dispatch notification could be observed first.
	s.notifyOnScheduledMessage(recipient, rawMessage)

	ids := [][]byte{messageID}

	// trackSent logs each dispatched envelope and registers it with the
	// transport so it can be tracked.
	trackSent := func(logLine string, hashes [][]byte, sent []*types.NewMessage) {
		for i, envelope := range sent {
			s.logger.Debug(logLine, zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hashes[i])))
			s.transport.Track(ids, hashes[i], envelope)
		}
	}

	switch {
	case s.featureFlags.Datasync && rawMessage.ResendAutomatically:
		// Transport tracking is not needed here: it happens in the datasync
		// dispatch step.
		datasyncID, err := s.addToDataSync(recipient, wrapped)
		if err != nil {
			return nil, errors.Wrap(err, "failed to send message with datasync")
		}

		// Confirmations from our own devices are not needed.
		if IsPubKeyEqual(recipient, &s.identity.PublicKey) {
			break
		}

		pending := &RawMessageConfirmation{
			DataSyncID: datasyncID,
			MessageID:  messageID,
			PublicKey:  crypto.CompressPubkey(recipient),
		}
		if err = s.persistence.InsertPendingConfirmation(pending); err != nil {
			return nil, err
		}

	case rawMessage.SkipEncryptionLayer:
		// The message bypasses the encryption layer entirely.
		hashes, sent, err := s.sendPrivateRawMessage(ctx, rawMessage, recipient, wrapped, ids)
		if err != nil {
			s.logger.Error("failed to send a private message", zap.Error(err))
			return nil, errors.Wrap(err, "failed to send a message spec")
		}
		trackSent("sent private message skipProtocolLayer", hashes, sent)

	default:
		spec, err := s.protocol.BuildEncryptedMessage(rawMessage.Sender, recipient, wrapped)
		if err != nil {
			return nil, errors.Wrap(err, "failed to encrypt message")
		}

		// The shared secret must be handled before sending, otherwise the
		// topic might not be set up by the time a reply arrives.
		if s.handleSharedSecrets != nil {
			if err := s.handleSharedSecrets([]*sharedsecret.Secret{spec.SharedSecret}); err != nil {
				return nil, err
			}
		}

		hashes, sent, err := s.sendMessageSpec(ctx, recipient, spec, ids)
		if err != nil {
			s.logger.Error("failed to send a private message", zap.Error(err))
			return nil, errors.Wrap(err, "failed to send a message spec")
		}
		trackSent("sent private message without datasync", hashes, sent)
	}

	return messageID, nil
}
// SendPairInstallation sends rawMessage to the recipient using a DH message
// built with the node identity, tracks every resulting envelope, and returns
// the message ID.
func (s *MessageSender) SendPairInstallation(
	ctx context.Context,
	recipient *ecdsa.PublicKey,
	rawMessage RawMessage,
) ([]byte, error) {
	recipientHex := types.EncodeHex(crypto.FromECDSAPub(recipient))
	s.logger.Debug("sending private message", zap.String("recipient", recipientHex))

	wrapped, err := s.wrapMessageV1(&rawMessage)
	if err != nil {
		return nil, errors.Wrap(err, "failed to wrap message")
	}

	spec, err := s.protocol.BuildDHMessage(s.identity, recipient, wrapped)
	if err != nil {
		return nil, errors.Wrap(err, "failed to encrypt message")
	}

	// The ID is always derived from our own identity, not rawMessage.Sender.
	id := v1protocol.MessageID(&s.identity.PublicKey, wrapped)
	ids := [][]byte{id}

	hashes, sent, err := s.sendMessageSpec(ctx, recipient, spec, ids)
	if err != nil {
		return nil, errors.Wrap(err, "failed to send a message spec")
	}

	for i := range sent {
		s.transport.Track(ids, hashes[i], sent[i])
	}

	return id, nil
}
// encodeMembershipUpdate attaches the optional chat entity to message (as a
// chat message or an emoji reaction, depending on its protobuf type) and
// returns the encoded membership update. Other entity types are ignored.
func (s *MessageSender) encodeMembershipUpdate(
	message v1protocol.MembershipUpdateMessage,
	chatEntity ChatEntity,
) ([]byte, error) {
	if chatEntity != nil {
		switch pb := chatEntity.GetProtobuf().(type) {
		case *protobuf.ChatMessage:
			message.Message = pb
		case *protobuf.EmojiReaction:
			message.EmojiReaction = pb
		}
	}

	encoded, err := v1protocol.EncodeMembershipUpdateMessage(message)
	if err != nil {
		return nil, errors.Wrap(err, "failed to encode membership update message")
	}

	return encoded, nil
}
// EncodeMembershipUpdate returns the wire representation of a membership
// update for group, optionally carrying chatEntity.
// All the events in the group are included in the payload.
func (s *MessageSender) EncodeMembershipUpdate(
	group *v1protocol.Group,
	chatEntity ChatEntity,
) ([]byte, error) {
	var update v1protocol.MembershipUpdateMessage
	update.ChatID = group.ChatID()
	update.Events = group.Events()

	return s.encodeMembershipUpdate(update, chatEntity)
}
// EncodeAbridgedMembershipUpdate returns the wire representation of a
// membership update for group, optionally carrying chatEntity.
// Only the group's abridged events are included in the payload.
func (s *MessageSender) EncodeAbridgedMembershipUpdate(
	group *v1protocol.Group,
	chatEntity ChatEntity,
) ([]byte, error) {
	var update v1protocol.MembershipUpdateMessage
	update.ChatID = group.ChatID()
	update.Events = group.AbridgedEvents()

	return s.encodeMembershipUpdate(update, chatEntity)
}
// dispatchCommunityChatMessage sends an already wrapped (and encrypted)
// community chat payload on the public topic of rawMessage.LocalChatID.
//
// The payload is segmented if needed. The returned hashes and messages are
// index-aligned: hashes[i] is the transport hash of newMessages[i].
// BeforeDispatch (when set) and the scheduled notification are run before
// anything is sent.
func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMessage *RawMessage, wrappedMessage []byte) ([][]byte, []*types.NewMessage, error) {
	newMessage := &types.NewMessage{
		TTL:         whisperTTL,
		Payload:     wrappedMessage,
		PowTarget:   calculatePoW(wrappedMessage),
		PowTime:     whisperPoWTime,
		PubsubTopic: rawMessage.PubsubTopic,
	}

	if rawMessage.BeforeDispatch != nil {
		if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
			return nil, nil, err
		}
	}

	// notify before dispatching
	s.notifyOnScheduledMessage(nil, rawMessage)

	newMessages, err := s.segmentMessage(newMessage)
	if err != nil {
		return nil, nil, err
	}

	// Length 0 with reserved capacity: the hashes are appended below, so a
	// non-zero length would leave nil entries in front of the real hashes.
	hashes := make([][]byte, 0, len(newMessages))
	for _, newMessage := range newMessages {
		hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID)
		if err != nil {
			return nil, nil, err
		}
		hashes = append(hashes, hash)
	}

	return hashes, newMessages, nil
}
// SendPublic wraps rawMessage and sends it on the public chat chatName,
// returning the message ID.
//
// A public message spec is always built with the node identity. When
// rawMessage.SkipEncryptionLayer is set, the wrapped payload is sent as is
// instead of the spec. The envelope is segmented if needed, and every
// segment is tracked with its own transport hash. When no sender is set on
// rawMessage, the node identity is used.
func (s *MessageSender) SendPublic(
	ctx context.Context,
	chatName string,
	rawMessage RawMessage,
) ([]byte, error) {
	// Set sender
	if rawMessage.Sender == nil {
		rawMessage.Sender = s.identity
	}

	wrappedMessage, err := s.wrapMessageV1(&rawMessage)
	if err != nil {
		return nil, errors.Wrap(err, "failed to wrap message")
	}

	var newMessage *types.NewMessage

	// The spec is built even when the encryption layer is skipped, because
	// it is reported in the sent-message notification below.
	messageSpec, err := s.protocol.BuildPublicMessage(s.identity, wrappedMessage)
	if err != nil {
		s.logger.Error("failed to send a public message", zap.Error(err))
		return nil, errors.Wrap(err, "failed to wrap a public message in the encryption layer")
	}

	if !rawMessage.SkipEncryptionLayer {
		newMessage, err = MessageSpecToWhisper(messageSpec)
		if err != nil {
			return nil, err
		}
	} else {
		newMessage = &types.NewMessage{
			TTL:       whisperTTL,
			Payload:   wrappedMessage,
			PowTarget: calculatePoW(wrappedMessage),
			PowTime:   whisperPoWTime,
		}
	}

	newMessage.Ephemeral = rawMessage.Ephemeral
	newMessage.PubsubTopic = rawMessage.PubsubTopic

	messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
	rawMessage.ID = types.EncodeHex(messageID)

	if rawMessage.BeforeDispatch != nil {
		if err := rawMessage.BeforeDispatch(&rawMessage); err != nil {
			return nil, err
		}
	}

	// notify before dispatching
	s.notifyOnScheduledMessage(nil, &rawMessage)

	newMessages, err := s.segmentMessage(newMessage)
	if err != nil {
		return nil, err
	}

	// Length 0 with reserved capacity: the hashes are appended below, so a
	// non-zero length would leave nil entries in front of the real hashes
	// and the tracking loop would register nil hashes.
	hashes := make([][]byte, 0, len(newMessages))
	for _, newMessage := range newMessages {
		hash, err := s.transport.SendPublic(ctx, newMessage, chatName)
		if err != nil {
			return nil, err
		}
		hashes = append(hashes, hash)
		s.logger.Debug("sent public message", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash)))
	}

	sentMessage := &SentMessage{
		Spec:       messageSpec,
		MessageIDs: [][]byte{messageID},
	}
	s.notifyOnSentMessage(sentMessage)

	// hashes[i] is the transport hash of newMessages[i].
	for i, newMessage := range newMessages {
		s.transport.Track([][]byte{messageID}, hashes[i], newMessage)
	}

	return messageID, nil
}
// unwrapDatasyncMessage tries to interpret the encryption-layer payload of m
// as a datasync payload. On success it returns one clone of m per contained
// payload, each with its encryption-layer payload replaced, together with
// the acks found in the datasync payload.
func unwrapDatasyncMessage(m *v1protocol.StatusMessage, datasync *datasync.DataSync) ([]*v1protocol.StatusMessage, [][]byte, error) {
	payloads, acks, err := datasync.UnwrapPayloadsAndAcks(m.SigPubKey(), m.EncryptionLayer.Payload)
	if err != nil {
		return nil, nil, err
	}

	var unwrapped []*v1protocol.StatusMessage
	for i := range payloads {
		clone, cloneErr := m.Clone()
		if cloneErr != nil {
			return nil, nil, cloneErr
		}
		clone.EncryptionLayer.Payload = payloads[i]
		unwrapped = append(unwrapped, clone)
	}

	return unwrapped, acks, nil
}
// HandleMessages takes a raw waku/whisper message and runs it through the
// processing layers, returning the resulting status messages and the
// datasync acks found along the way.
//
// Two failures are absorbed rather than reported:
//   - a hash ratchet message whose group ID is not known yet is saved for
//     later (the result of saving it is returned as the error);
//   - a segment of a not-yet-complete message yields no messages, no error.
//
// After a successful message, queued hash ratchet messages for each key ID
// carried by it are processed as well, and the ones that succeed are deleted
// from the queue.
func (s *MessageSender) HandleMessages(wakuMessage *types.Message) ([]*v1protocol.StatusMessage, [][]byte, error) {
	logger := s.logger.With(zap.String("site", "HandleMessages"))
	hlogger := logger.With(zap.String("hash", types.HexBytes(wakuMessage.Hash).String()))

	response, err := s.handleMessage(wakuMessage)
	switch {
	case err == nil:
		// Nothing to recover from; continue below.
	case err == encryption.ErrHashRatchetGroupIDNotFound && len(response.Message.EncryptionLayer.HashRatchetInfo) == 1:
		// Hash ratchet with a group id not found yet, save the message for future processing
		info := response.Message.EncryptionLayer.HashRatchetInfo[0]
		return nil, nil, s.persistence.SaveHashRatchetMessage(info.GroupID, info.KeyID, wakuMessage)
	case err == ErrMessageSegmentsIncomplete:
		// This segment was stored, but the full set has not arrived yet.
		return nil, nil, nil
	default:
		return nil, nil, err
	}

	var (
		statusMessages []*v1protocol.StatusMessage
		acks           [][]byte
	)
	statusMessages = append(statusMessages, response.Messages()...)
	acks = append(acks, response.DatasyncAcks...)

	// Process queued hash ratchet messages
	for _, info := range response.Message.EncryptionLayer.HashRatchetInfo {
		queued, err := s.persistence.GetHashRatchetMessages(info.KeyID)
		if err != nil {
			return nil, nil, err
		}

		var processedIDs [][]byte
		for _, queuedMessage := range queued {
			queuedResponse, err := s.handleMessage(queuedMessage)
			if err != nil {
				// Leave it queued; it is simply not marked as processed.
				hlogger.Debug("failed to handle hash ratchet message", zap.Error(err))
				continue
			}
			statusMessages = append(statusMessages, queuedResponse.Messages()...)
			acks = append(acks, queuedResponse.DatasyncAcks...)
			processedIDs = append(processedIDs, queuedMessage.Hash)
		}

		if err := s.persistence.DeleteHashRatchetMessages(processedIDs); err != nil {
			s.logger.Warn("failed to delete hash ratchet messages", zap.Error(err))
			return nil, nil, err
		}
	}

	return statusMessages, acks, nil
}
// handleMessageResponse is the result of running a single transport message
// through handleMessage.
type handleMessageResponse struct {
// Message is the message as parsed from the transport layer onwards.
Message *v1protocol.StatusMessage
// DatasyncMessages holds one clone of Message per payload unwrapped from a
// datasync envelope; it is empty when the message was not a datasync one.
DatasyncMessages []*v1protocol.StatusMessage
// DatasyncAcks holds the acks unwrapped from a datasync envelope.
DatasyncAcks [][]byte
}
// Messages returns the datasync messages when there are any, and otherwise a
// single-element slice holding the plain message.
func (h *handleMessageResponse) Messages() []*v1protocol.StatusMessage {
	if len(h.DatasyncMessages) == 0 {
		return []*v1protocol.StatusMessage{h.Message}
	}
	return h.DatasyncMessages
}
// handleMessage runs one transport message through the transport,
// segmentation, encryption, datasync and application layers.
//
// Only some failures abort processing: a transport-layer error, incomplete
// or already completed segments (nil response is returned), and an unknown
// hash ratchet group ID (the partially filled response is returned along
// with the error). Any other layer failure is logged and processing goes on.
func (s *MessageSender) handleMessage(wakuMessage *types.Message) (*handleMessageResponse, error) {
	logger := s.logger.With(zap.String("site", "handleMessage"))
	hlogger := logger.With(zap.ByteString("hash", wakuMessage.Hash))

	response := &handleMessageResponse{
		Message:          &v1protocol.StatusMessage{},
		DatasyncMessages: []*v1protocol.StatusMessage{},
		DatasyncAcks:     [][]byte{},
	}

	if err := response.Message.HandleTransportLayer(wakuMessage); err != nil {
		hlogger.Error("failed to handle transport layer message", zap.Error(err))
		return nil, err
	}

	if err := s.handleSegmentationLayer(response.Message); err != nil {
		hlogger.Debug("failed to handle segmentation layer message", zap.Error(err))

		// Stop when the segments are not complete yet, or were already
		// completed before.
		if err == ErrMessageSegmentsIncomplete || err == ErrMessageSegmentsAlreadyCompleted {
			return nil, err
		}
	}

	if err := s.handleEncryptionLayer(context.Background(), response.Message); err != nil {
		hlogger.Debug("failed to handle an encryption message", zap.Error(err))

		// Hash ratchet with a group id not found yet, stop processing
		if err == encryption.ErrHashRatchetGroupIDNotFound {
			return response, err
		}
	}

	unwrapped, unwrappedAcks, err := unwrapDatasyncMessage(response.Message, s.datasync)
	if err == nil {
		response.DatasyncMessages = append(response.DatasyncMessages, unwrapped...)
		response.DatasyncAcks = append(response.DatasyncAcks, unwrappedAcks...)
	} else {
		hlogger.Debug("failed to handle datasync message", zap.Error(err))
	}

	for _, msg := range response.Messages() {
		if appErr := msg.HandleApplicationLayer(); appErr != nil {
			hlogger.Error("failed to handle application metadata layer message", zap.Error(appErr))
		}
	}

	return response, nil
}
// fetchDecryptionKey looks up the private key to decrypt a message addressed
// to destination. It returns the matching ephemeral key and true when one is
// registered, and the node identity and false otherwise.
func (s *MessageSender) fetchDecryptionKey(destination *ecdsa.PublicKey) (*ecdsa.PrivateKey, bool) {
	keyID := types.EncodeHex(crypto.FromECDSAPub(destination))

	s.ephemeralKeysMutex.Lock()
	ephemeralKey, found := s.ephemeralKeys[keyID]
	s.ephemeralKeysMutex.Unlock()

	if found {
		return ephemeralKey, true
	}

	// No ephemeral key registered: fall back on the identity.
	return s.identity, false
}
// handleEncryptionLayer decrypts message with the key matching its
// destination. For messages addressed to an ephemeral key no topic is
// negotiated and a device-not-found error is not acted upon. Otherwise, a
// device-not-found error triggers a bundle advertisement to the sender.
// The decryption error, if any, is logged and returned.
func (s *MessageSender) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error {
	logger := s.logger.With(zap.String("site", "handleEncryptionLayer"))
	senderKey := message.SigPubKey()

	// An ephemeral destination key means negotiation is skipped.
	decryptionKey, ephemeral := s.fetchDecryptionKey(message.TransportLayer.Dst)

	err := message.HandleEncryptionLayer(decryptionKey, senderKey, s.protocol, ephemeral)
	if err == nil {
		return nil
	}

	if !ephemeral && err == encryption.ErrDeviceNotFound {
		if advertiseErr := s.handleErrDeviceNotFound(ctx, senderKey); advertiseErr != nil {
			logger.Error("failed to handle ErrDeviceNotFound", zap.Error(advertiseErr))
		}
	}

	logger.Error("failed to handle an encrypted message", zap.Error(err))
	return err
}
// handleErrDeviceNotFound advertises our bundle to publicKey when the
// encryption protocol says an advertisement is due, and records that the
// advertisement was made. The send is bounded by a one second timeout.
func (s *MessageSender) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
	timestamp := time.Now().Unix()

	shouldAdvertise, err := s.protocol.ShouldAdvertiseBundle(publicKey, timestamp)
	if err != nil {
		return err
	}
	if !shouldAdvertise {
		return nil
	}

	spec, err := s.protocol.BuildBundleAdvertiseMessage(s.identity, publicKey)
	if err != nil {
		return err
	}

	sendCtx, cancel := context.WithTimeout(ctx, time.Second)
	defer cancel()

	// No message IDs are passed: nothing needs to be done once a bundle
	// has been sent.
	if _, _, err = s.sendMessageSpec(sendCtx, publicKey, spec, nil); err != nil {
		return err
	}

	s.protocol.ConfirmBundleAdvertisement(publicKey, timestamp)

	return nil
}
// wrapMessageV1 wraps the payload of rawMessage, together with its message
// type, using the sender key set on rawMessage.
func (s *MessageSender) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
	wrapped, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
	if err == nil {
		return wrapped, nil
	}
	return nil, errors.Wrap(err, "failed to wrap message")
}
// addToDataSync appends message to the one-to-one datasync group shared with
// publicKey, adding the peer to the group first when it is not a member yet.
// It returns the datasync ID assigned to the message.
func (s *MessageSender) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) ([]byte, error) {
	groupID := datasync.ToOneToOneGroupID(&s.identity.PublicKey, publicKey)
	peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)

	isMember, err := s.datasync.IsPeerInGroup(groupID, peerID)
	if err != nil {
		return nil, errors.Wrap(err, "failed to check if peer is in group")
	}

	if !isMember {
		err = s.datasync.AddPeer(groupID, peerID)
		if err != nil {
			return nil, errors.Wrap(err, "failed to add peer")
		}
	}

	datasyncID, err := s.datasync.AppendMessage(groupID, message)
	if err != nil {
		return nil, errors.Wrap(err, "failed to append message to datasync")
	}

	return datasyncID[:], nil
}
// sendDataSync is the dispatch function handed to the datasync layer: it
// encrypts the marshalled datasync payload for publicKey, sends it, and
// tracks every resulting envelope against the IDs of all the messages
// contained in the payload.
func (s *MessageSender) sendDataSync(ctx context.Context, publicKey *ecdsa.PublicKey, marshalledDatasyncPayload []byte, payload *datasyncproto.Payload) error {
	// One message ID per message carried by the datasync payload.
	count := len(payload.Messages)
	messageIDs := make([][]byte, 0, count)
	hexMessageIDs := make([]string, 0, count)
	for _, carried := range payload.Messages {
		id := v1protocol.MessageID(&s.identity.PublicKey, carried.Body)
		messageIDs = append(messageIDs, id)
		hexMessageIDs = append(hexMessageIDs, id.String())
	}

	spec, err := s.protocol.BuildEncryptedMessage(s.identity, publicKey, marshalledDatasyncPayload)
	if err != nil {
		return errors.Wrap(err, "failed to encrypt message")
	}

	// The shared secret must be handled before sending, otherwise the topic
	// might not be set up by the time a reply arrives.
	if s.handleSharedSecrets != nil {
		if secretsErr := s.handleSharedSecrets([]*sharedsecret.Secret{spec.SharedSecret}); secretsErr != nil {
			return secretsErr
		}
	}

	hashes, sent, err := s.sendMessageSpec(ctx, publicKey, spec, messageIDs)
	if err != nil {
		s.logger.Error("failed to send a datasync message", zap.Error(err))
		return err
	}

	for i := range sent {
		s.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.String("hash", types.EncodeHex(hashes[i])))
		s.transport.Track(messageIDs, hashes[i], sent[i])
	}

	return nil
}
// sendPrivateRawMessage sends a message not wrapped in an encryption layer.
//
// The payload is segmented if needed and each segment is sent either on the
// recipient's personal topic (rawMessage.SendOnPersonalTopic) or on the
// partitioned topic. The returned hashes and messages are index-aligned:
// hashes[i] is the transport hash of newMessages[i]. messageIDs is accepted
// for signature symmetry with the other senders and is not used here.
func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) {
	newMessage := &types.NewMessage{
		TTL:         whisperTTL,
		Payload:     payload,
		PowTarget:   calculatePoW(payload),
		PowTime:     whisperPoWTime,
		PubsubTopic: rawMessage.PubsubTopic,
	}

	newMessages, err := s.segmentMessage(newMessage)
	if err != nil {
		return nil, nil, err
	}

	// Length 0 with reserved capacity: the hashes are appended below, so a
	// non-zero length would leave nil entries in front of the real hashes.
	hashes := make([][]byte, 0, len(newMessages))
	var hash []byte
	for _, newMessage := range newMessages {
		if rawMessage.SendOnPersonalTopic {
			hash, err = s.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey)
		} else {
			hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
		}
		if err != nil {
			return nil, nil, err
		}
		hashes = append(hashes, hash)
	}

	return hashes, newMessages, nil
}
// dispatchCommunityMessage sends a message not wrapped in an encryption layer
// to a community, identified by its public key.
//
// The payload is segmented if needed. The returned hashes and messages are
// index-aligned: hashes[i] is the transport hash of newMessages[i].
// messageIDs is not used here; tracking is left to the caller.
func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte, pubsubTopic string) ([][]byte, []*types.NewMessage, error) {
	newMessage := &types.NewMessage{
		TTL:         whisperTTL,
		Payload:     payload,
		PowTarget:   calculatePoW(payload),
		PowTime:     whisperPoWTime,
		PubsubTopic: pubsubTopic,
	}

	newMessages, err := s.segmentMessage(newMessage)
	if err != nil {
		return nil, nil, err
	}

	// Length 0 with reserved capacity: the hashes are appended below, so a
	// non-zero length would leave nil entries in front of the real hashes.
	hashes := make([][]byte, 0, len(newMessages))
	for _, newMessage := range newMessages {
		hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey)
		if err != nil {
			return nil, nil, err
		}
		hashes = append(hashes, hash)
	}

	return hashes, newMessages, nil
}
// sendMessageSpec analyses the spec properties and selects a proper transport method.
//
// The spec is converted with MessageSpecToWhisper, split into segments by
// s.segmentMessage, and every segment is sent to publicKey: with the shared
// secret key when messageSpec.AgreedSecret is set, on the partitioned topic
// otherwise. Once all segments have been sent, subscribers are notified
// through notifyOnSentMessage with the spec and messageIDs.
//
// It returns one hash per sent segment, in the same order as the returned
// segment messages, so that hashes[i] belongs to newMessages[i]. Sending stops
// at the first error, in which case nil slices and the error are returned and
// no sent-message notification is published.
func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) {
	logger := s.logger.With(zap.String("site", "sendMessageSpec"))

	newMessage, err := MessageSpecToWhisper(messageSpec)
	if err != nil {
		return nil, nil, err
	}

	newMessages, err := s.segmentMessage(newMessage)
	if err != nil {
		return nil, nil, err
	}

	// Start with length 0 (capacity only): the hashes are appended below, and
	// a non-zero initial length would leave nil entries at the front and
	// break the hashes[i] <-> newMessages[i] correspondence.
	hashes := make([][]byte, 0, len(newMessages))
	for _, newMessage := range newMessages {
		var hash []byte
		// process shared secret
		if messageSpec.AgreedSecret {
			logger.Debug("sending using shared secret")
			hash, err = s.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key)
		} else {
			logger.Debug("sending partitioned topic")
			hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
		}
		if err != nil {
			return nil, nil, err
		}
		hashes = append(hashes, hash)
	}

	sentMessage := &SentMessage{
		PublicKey:  publicKey,
		Spec:       messageSpec,
		MessageIDs: messageIDs,
	}
	s.notifyOnSentMessage(sentMessage)

	return hashes, newMessages, nil
}
// SubscribeToMessageEvents registers a new subscription and returns its
// receive-only channel. The channel is buffered with room for 100 events.
func (s *MessageSender) SubscribeToMessageEvents() <-chan *MessageEvent {
	const subscriptionBuffer = 100

	events := make(chan *MessageEvent, subscriptionBuffer)
	s.messageEventsSubscriptions = append(s.messageEventsSubscriptions, events)

	return events
}
// notifyOnSentMessage publishes a MessageSent event carrying sentMessage to
// every registered subscription. The send never blocks: when a subscription's
// buffer is full the event is dropped for that subscription and a warning is
// logged.
func (s *MessageSender) notifyOnSentMessage(sentMessage *SentMessage) {
	sentEvent := &MessageEvent{
		SentMessage: sentMessage,
		Type:        MessageSent,
	}

	for _, subscription := range s.messageEventsSubscriptions {
		// Non-blocking publish; fall through to the warning if the buffer is full.
		select {
		case subscription <- sentEvent:
		default:
			s.logger.Warn("message events subscription channel full when publishing sent event, dropping message")
		}
	}
}
// notifyOnScheduledMessage publishes a MessageScheduled event carrying the
// recipient and the raw message to every registered subscription. The send
// never blocks: when a subscription's buffer is full the event is dropped for
// that subscription and a warning is logged.
func (s *MessageSender) notifyOnScheduledMessage(recipient *ecdsa.PublicKey, message *RawMessage) {
	scheduledEvent := &MessageEvent{
		RawMessage: message,
		Recipient:  recipient,
		Type:       MessageScheduled,
	}

	for _, subscription := range s.messageEventsSubscriptions {
		// Non-blocking publish; fall through to the warning if the buffer is full.
		select {
		case subscription <- scheduledEvent:
		default:
			s.logger.Warn("message events subscription channel full when publishing scheduled event, dropping message")
		}
	}
}
// JoinPublic delegates to the transport's JoinPublic for the given id and
// returns the resulting filter and error unchanged.
func (s *MessageSender) JoinPublic(id string) (*transport.Filter, error) {
	filter, err := s.transport.JoinPublic(id)
	return filter, err
}
// AddEphemeralKey adds an ephemeral key that we will be listening to.
// Note that we never remove these keys for now: waku/whisper does not
// recalculate topics on removal, so effectively there's no benefit.
// On restart they will be gone.
//
// The key is stored under the hex encoding of its public key, and the
// transport's key filters are then loaded for it.
func (s *MessageSender) AddEphemeralKey(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) {
	// The map key depends only on the argument, so build it before locking.
	keyID := types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))

	s.ephemeralKeysMutex.Lock()
	s.ephemeralKeys[keyID] = privateKey
	s.ephemeralKeysMutex.Unlock()

	return s.transport.LoadKeyFilters(privateKey)
}
// MessageSpecToWhisper marshals spec.Message with protobuf and wraps the
// resulting bytes in a types.NewMessage using the default whisper TTL and PoW
// time, with the PoW target chosen by calculatePoW from the payload size.
// A marshalling failure yields a nil message and the error.
func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) {
	encoded, err := proto.Marshal(spec.Message)
	if err != nil {
		return nil, err
	}

	return &types.NewMessage{
		TTL:       whisperTTL,
		Payload:   encoded,
		PowTarget: calculatePoW(encoded),
		PowTime:   whisperPoWTime,
	}, nil
}
// calculatePoW returns the PoWTarget to be used.
// We check the size and arbitrarily set it to a lower PoW if the packet is
// greater than 50KB. We do this as the defaultPoW is too high for clients to send
// large messages.
func calculatePoW(payload []byte) float64 {
if len(payload) > largeSizeInBytes {
return whisperLargeSizePoW
}
return whisperDefaultPoW
}
// StopDatasync stops the datasync instance held by the sender.
//
// It is a no-op when no datasync instance is set: StartDatasync returns
// without assigning one if the persistent node cannot be created, and calling
// Stop unconditionally could then dereference a nil instance.
func (s *MessageSender) StopDatasync() {
	if s.datasync == nil {
		return
	}
	s.datasync.Stop()
}
// StartDatasync creates a persistent datasync node backed by the sender's
// database and identity, wraps it in a datasync.DataSync and stores it on the
// sender.
//
// The datasync instance is only initialised and started when
// s.datasyncEnabled is set; it is stored on the sender either way. If the
// persistent node cannot be created, the error is logged and the sender's
// datasync field is left untouched (the method has no error return).
func (s *MessageSender) StartDatasync() {
	dataSyncTransport := datasync.NewNodeTransport()
	dataSyncNode, err := datasyncnode.NewPersistentNode(
		s.database,
		dataSyncTransport,
		datasyncpeer.PublicKeyToPeerID(s.identity.PublicKey),
		datasyncnode.BATCH,
		datasync.CalculateSendTime,
		s.logger,
	)
	if err != nil {
		// Surface the failure instead of returning silently.
		s.logger.Error("failed to create datasync node", zap.Error(err))
		return
	}

	ds := datasync.New(dataSyncNode, dataSyncTransport, true, s.logger)

	if s.datasyncEnabled {
		// Payloads are capped at 3/4 of the transport's maximum message size.
		ds.Init(s.sendDataSync, s.transport.MaxMessageSize()/4*3, s.logger)
		ds.Start(datasync.DatasyncTicker)
	}

	s.datasync = ds
}
// GetCurrentKeyForGroup returns the current key for the given group ID, as
// reported by the encryption protocol.
func (s *MessageSender) GetCurrentKeyForGroup(groupID []byte) (*encryption.HashRatchetKeyCompatibility, error) {
	currentKey, err := s.protocol.GetCurrentKeyForGroup(groupID)
	return currentKey, err
}
// GetKeysForGroup returns a slice of keys belonging to the given group ID, as
// reported by the encryption protocol.
func (s *MessageSender) GetKeysForGroup(groupID []byte) ([]*encryption.HashRatchetKeyCompatibility, error) {
	groupKeys, err := s.protocol.GetKeysForGroup(groupID)
	return groupKeys, err
}
// segmentMessage splits newMessage into smaller chunks if its payload exceeds
// maxSegmentSize.
//
// A payload that already fits is returned unchanged as a single-element slice.
// Otherwise the payload is cut into consecutive chunks of at most
// maxSegmentSize bytes; each chunk is wrapped in a protobuf.SegmentMessage
// carrying the Keccak256 hash of the entire payload, the chunk index and the
// total segment count. Every returned message is a copy of newMessage whose
// Payload is the marshalled segment and whose PowTarget is recalculated for
// that payload.
//
// An error is returned when maxSegmentSize is not positive and the payload
// does not fit (the chunking loop could otherwise never advance), or when
// copying or marshalling a segment fails.
func segmentMessage(newMessage *types.NewMessage, maxSegmentSize int) ([]*types.NewMessage, error) {
	payloadSize := len(newMessage.Payload)
	if payloadSize <= maxSegmentSize {
		return []*types.NewMessage{newMessage}, nil
	}

	// A non-positive chunk size would make the loop below spin forever
	// (size 0) or slice out of range (negative size).
	if maxSegmentSize <= 0 {
		return nil, errors.New("max segment size must be positive")
	}

	// createSegment clones newMessage and swaps in the chunk payload.
	createSegment := func(chunkPayload []byte) (*types.NewMessage, error) {
		segment := &types.NewMessage{}
		if err := copier.Copy(segment, newMessage); err != nil {
			return nil, err
		}

		segment.Payload = chunkPayload
		segment.PowTarget = calculatePoW(chunkPayload)
		return segment, nil
	}

	entireMessageHash := crypto.Keccak256(newMessage.Payload)
	segmentsCount := int(math.Ceil(float64(payloadSize) / float64(maxSegmentSize)))

	segments := make([]*types.NewMessage, 0, segmentsCount)

	for start, index := 0, 0; start < payloadSize; start, index = start+maxSegmentSize, index+1 {
		end := start + maxSegmentSize
		if end > payloadSize {
			end = payloadSize
		}

		segmentProto := &protobuf.SegmentMessage{
			EntireMessageHash: entireMessageHash,
			Index:             uint32(index),
			SegmentsCount:     uint32(segmentsCount),
			Payload:           newMessage.Payload[start:end],
		}
		chunkPayload, err := proto.Marshal(segmentProto)
		if err != nil {
			return nil, err
		}

		segment, err := createSegment(chunkPayload)
		if err != nil {
			return nil, err
		}

		segments = append(segments, segment)
	}

	return segments, nil
}
// segmentMessage splits newMessage using the package-level segmentMessage
// with a limit derived from the transport's maximum message size, and logs
// the number of resulting segments at debug level.
func (s *MessageSender) segmentMessage(newMessage *types.NewMessage) ([]*types.NewMessage, error) {
	// Use 3/4 of the allowed message size as the limit, leaving room for the
	// segment message metadata.
	maxSegmentSize := int(s.transport.MaxMessageSize() / 4 * 3)

	segments, err := segmentMessage(newMessage, maxSegmentSize)
	s.logger.Debug("message segmented", zap.Int("segments", len(segments)))

	return segments, err
}
// Errors reported while handling segmented messages.
var (
	// ErrMessageSegmentsIncomplete is returned while not all segments of a
	// message have been stored yet.
	ErrMessageSegmentsIncomplete = errors.New("message segments incomplete")
	// ErrMessageSegmentsAlreadyCompleted is returned when a segment belongs to
	// a message that is already marked as completed.
	ErrMessageSegmentsAlreadyCompleted = errors.New("message segments already completed")
	// ErrMessageSegmentsInvalidCount is returned when a segment declares fewer
	// than two segments in total.
	ErrMessageSegmentsInvalidCount = errors.New("invalid segments count")
	// ErrMessageSegmentsHashMismatch is returned when the reassembled payload
	// does not hash to the value announced by the segment.
	ErrMessageSegmentsHashMismatch = errors.New("hash of entire payload does not match")
)
// handleSegmentationLayer treats the transport payload of message as a
// protobuf.SegmentMessage, stores the segment, and, once every segment of the
// original message is available, replaces message.TransportLayer.Payload with
// the reassembled payload.
//
// It returns:
//   - a wrapped error when the payload cannot be unmarshalled as a segment;
//   - ErrMessageSegmentsAlreadyCompleted when the entire message is already
//     marked as completed;
//   - ErrMessageSegmentsInvalidCount when the segment declares fewer than two
//     segments;
//   - ErrMessageSegmentsIncomplete when the number of stored segments does not
//     yet match the declared count;
//   - ErrMessageSegmentsHashMismatch when the reassembled payload does not
//     hash to the announced value;
//   - any error reported by the persistence layer.
func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessage) error {
	hlogger := s.logger.
		With(zap.String("site", "handleSegmentationLayer")).
		With(zap.String("hash", types.HexBytes(message.TransportLayer.Hash).String()))

	var segment protobuf.SegmentMessage
	if err := proto.Unmarshal(message.TransportLayer.Payload, &segment); err != nil {
		return errors.Wrap(err, "failed to unmarshal SegmentMessage")
	}

	hlogger.Debug(
		"handling message segment",
		zap.String("EntireMessageHash", types.HexBytes(segment.EntireMessageHash).String()),
		zap.Uint32("Index", segment.Index),
		zap.Uint32("SegmentsCount", segment.SegmentsCount),
	)

	completed, err := s.persistence.IsMessageAlreadyCompleted(segment.EntireMessageHash)
	switch {
	case err != nil:
		return err
	case completed:
		return ErrMessageSegmentsAlreadyCompleted
	case segment.SegmentsCount < 2:
		return ErrMessageSegmentsInvalidCount
	}

	// Store this segment, then read back everything stored so far for the
	// same message hash and signing key.
	if err := s.persistence.SaveMessageSegment(&segment, message.TransportLayer.SigPubKey, time.Now().Unix()); err != nil {
		return err
	}

	stored, err := s.persistence.GetMessageSegments(segment.EntireMessageHash, message.TransportLayer.SigPubKey)
	if err != nil {
		return err
	}

	if int(segment.SegmentsCount) != len(stored) {
		return ErrMessageSegmentsIncomplete
	}

	// Concatenate the segment payloads in the order they were returned.
	var assembled bytes.Buffer
	for _, part := range stored {
		if _, err := assembled.Write(part.Payload); err != nil {
			return errors.Wrap(err, "failed to write segment payload")
		}
	}

	// Sanity check: the reassembled payload must hash to the announced value.
	assembledHash := crypto.Keccak256(assembled.Bytes())
	if !bytes.Equal(assembledHash, segment.EntireMessageHash) {
		return ErrMessageSegmentsHashMismatch
	}

	if err := s.persistence.CompleteMessageSegments(segment.EntireMessageHash, message.TransportLayer.SigPubKey, time.Now().Unix()); err != nil {
		return err
	}

	message.TransportLayer.Payload = assembled.Bytes()

	return nil
}
// CleanupSegments asks the persistence layer to remove message segments older
// than seven days and completed message segments older than one month. The
// first error encountered is returned and stops the cleanup.
func (s *MessageSender) CleanupSegments() error {
	segmentsCutoff := time.Now().AddDate(0, 0, -7).Unix()
	completedCutoff := time.Now().AddDate(0, -1, 0).Unix()

	if err := s.persistence.RemoveMessageSegmentsOlderThan(segmentsCutoff); err != nil {
		return err
	}

	if err := s.persistence.RemoveMessageSegmentsCompletedOlderThan(completedCutoff); err != nil {
		return err
	}

	return nil
}