mirror of
https://github.com/status-im/status-go.git
synced 2025-01-18 10:42:07 +00:00
605fe40e32
This commit fixes a few issues with communities encryption: Key distribution was disconnected from the community description; this created a case where the key would arrive after the community description, which would result in the client thinking that it had been kicked. To overcome this, we added a message that signals to the user that they have been kicked. Also, we distribute the key with the community description so that there are no more issues with timing. This is a bit expensive for large communities, and it will require some further optimizations. Key distribution is now also connected to the request-to-join response, so there are no timing issues. Fixes an issue with key distribution (race condition) where the community would be modified before being compared, resulting in a comparison of two identical communities, which would result in no key being distributed. This commit only partially addresses the issue.
1421 lines
43 KiB
Go
1421 lines
43 KiB
Go
package common
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"crypto/ecdsa"
|
|
"database/sql"
|
|
"math"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/jinzhu/copier"
|
|
"github.com/pkg/errors"
|
|
datasyncnode "github.com/status-im/mvds/node"
|
|
datasyncproto "github.com/status-im/mvds/protobuf"
|
|
"github.com/status-im/mvds/state"
|
|
"go.uber.org/zap"
|
|
|
|
"github.com/status-im/status-go/eth-node/crypto"
|
|
"github.com/status-im/status-go/eth-node/types"
|
|
"github.com/status-im/status-go/protocol/datasync"
|
|
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
|
|
"github.com/status-im/status-go/protocol/encryption"
|
|
"github.com/status-im/status-go/protocol/encryption/sharedsecret"
|
|
"github.com/status-im/status-go/protocol/protobuf"
|
|
"github.com/status-im/status-go/protocol/transport"
|
|
v1protocol "github.com/status-im/status-go/protocol/v1"
|
|
)
|
|
|
|
// Properties applied to the whisper/waku envelopes built by this package.
const (
	// whisperTTL is the value placed in the TTL field of outgoing messages.
	whisperTTL = 15

	// whisperDefaultPoW is the default proof-of-work target.
	whisperDefaultPoW = 0.002

	// whisperLargeSizePoW is the proof-of-work target used for larger payloads.
	whisperLargeSizePoW = 0.000002

	// largeSizeInBytes is the payload size from which the lower
	// proof-of-work target should be used. Roughly 50KB.
	largeSizeInBytes = 50000

	// whisperPoWTime is the value placed in the PowTime field of outgoing messages.
	whisperPoWTime = 5
)
|
|
|
|
// RekeyCompatibility controls whether keys are also distributed through
// 1-to-1 messages, in addition to the newer format. It defaults to true.
var RekeyCompatibility = true
|
|
|
|
// SentMessage reprent a message that has been passed to the transport layer
|
|
type SentMessage struct {
|
|
PublicKey *ecdsa.PublicKey
|
|
Spec *encryption.ProtocolMessageSpec
|
|
MessageIDs [][]byte
|
|
}
|
|
|
|
// MessageEventType identifies the kind of event carried by a MessageEvent.
type MessageEventType uint32

// Known message event kinds.
//
// NOTE(review): these constants are declared without an explicit type, so
// they are untyped integer constants rather than MessageEventType values.
const (
	// MessageScheduled has the value 1.
	MessageScheduled = iota + 1
	// MessageSent has the value 2.
	MessageSent
)
|
|
|
|
type MessageEvent struct {
|
|
Recipient *ecdsa.PublicKey
|
|
Type MessageEventType
|
|
SentMessage *SentMessage
|
|
RawMessage *RawMessage
|
|
}
|
|
|
|
type MessageSender struct {
|
|
identity *ecdsa.PrivateKey
|
|
datasync *datasync.DataSync
|
|
database *sql.DB
|
|
protocol *encryption.Protocol
|
|
transport *transport.Transport
|
|
logger *zap.Logger
|
|
persistence *RawMessagesPersistence
|
|
|
|
datasyncEnabled bool
|
|
|
|
// ephemeralKeys is a map that contains the ephemeral keys of the client, used
|
|
// to decrypt messages
|
|
ephemeralKeys map[string]*ecdsa.PrivateKey
|
|
ephemeralKeysMutex sync.Mutex
|
|
|
|
// messageEventsSubscriptions contains all the subscriptions for message events
|
|
messageEventsSubscriptions []chan<- *MessageEvent
|
|
|
|
featureFlags FeatureFlags
|
|
|
|
// handleSharedSecrets is a callback that is called every time a new shared secret is negotiated
|
|
handleSharedSecrets func([]*sharedsecret.Secret) error
|
|
}
|
|
|
|
func NewMessageSender(
|
|
identity *ecdsa.PrivateKey,
|
|
database *sql.DB,
|
|
enc *encryption.Protocol,
|
|
transport *transport.Transport,
|
|
logger *zap.Logger,
|
|
features FeatureFlags,
|
|
) (*MessageSender, error) {
|
|
p := &MessageSender{
|
|
identity: identity,
|
|
datasyncEnabled: features.Datasync,
|
|
protocol: enc,
|
|
database: database,
|
|
persistence: NewRawMessagesPersistence(database),
|
|
transport: transport,
|
|
logger: logger,
|
|
ephemeralKeys: make(map[string]*ecdsa.PrivateKey),
|
|
featureFlags: features,
|
|
}
|
|
|
|
return p, nil
|
|
}
|
|
|
|
func (s *MessageSender) Stop() {
|
|
for _, c := range s.messageEventsSubscriptions {
|
|
close(c)
|
|
}
|
|
s.messageEventsSubscriptions = nil
|
|
s.StopDatasync()
|
|
}
|
|
|
|
func (s *MessageSender) SetHandleSharedSecrets(handler func([]*sharedsecret.Secret) error) {
|
|
s.handleSharedSecrets = handler
|
|
}
|
|
|
|
func (s *MessageSender) StartDatasync(handler func(peer state.PeerID, payload *datasyncproto.Payload) error) error {
|
|
if !s.datasyncEnabled {
|
|
return nil
|
|
}
|
|
|
|
dataSyncTransport := datasync.NewNodeTransport()
|
|
dataSyncNode, err := datasyncnode.NewPersistentNode(
|
|
s.database,
|
|
dataSyncTransport,
|
|
datasyncpeer.PublicKeyToPeerID(s.identity.PublicKey),
|
|
datasyncnode.BATCH,
|
|
datasync.CalculateSendTime,
|
|
s.logger,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.datasync = datasync.New(dataSyncNode, dataSyncTransport, true, s.logger)
|
|
|
|
s.datasync.Init(handler, s.logger)
|
|
s.datasync.Start(datasync.DatasyncTicker)
|
|
|
|
return nil
|
|
}
|
|
|
|
// SendPrivate takes encoded data, encrypts it and sends through the wire.
|
|
func (s *MessageSender) SendPrivate(
|
|
ctx context.Context,
|
|
recipient *ecdsa.PublicKey,
|
|
rawMessage *RawMessage,
|
|
) ([]byte, error) {
|
|
s.logger.Debug(
|
|
"sending a private message",
|
|
zap.String("public-key", types.EncodeHex(crypto.FromECDSAPub(recipient))),
|
|
zap.String("site", "SendPrivate"),
|
|
)
|
|
// Currently we don't support sending through datasync and setting custom waku fields,
|
|
// as the datasync interface is not rich enough to propagate that information, so we
|
|
// would have to add some complexity to handle this.
|
|
if rawMessage.ResendAutomatically && (rawMessage.Sender != nil || rawMessage.SkipEncryptionLayer || rawMessage.SendOnPersonalTopic) {
|
|
return nil, errors.New("setting identity, skip-encryption or personal topic and datasync not supported")
|
|
}
|
|
|
|
// Set sender identity if not specified
|
|
if rawMessage.Sender == nil {
|
|
rawMessage.Sender = s.identity
|
|
}
|
|
|
|
return s.sendPrivate(ctx, recipient, rawMessage)
|
|
}
|
|
|
|
// SendCommunityMessage takes encoded data, encrypts it and sends through the wire
|
|
// using the community topic and their key
|
|
func (s *MessageSender) SendCommunityMessage(
|
|
ctx context.Context,
|
|
rawMessage RawMessage,
|
|
) ([]byte, error) {
|
|
s.logger.Debug(
|
|
"sending a community message",
|
|
zap.String("communityId", types.EncodeHex(rawMessage.CommunityID)),
|
|
zap.String("site", "SendCommunityMessage"),
|
|
)
|
|
rawMessage.Sender = s.identity
|
|
|
|
return s.sendCommunity(ctx, &rawMessage)
|
|
}
|
|
|
|
// SendPubsubTopicKey sends the protected topic key for a community to a list of recipients
|
|
func (s *MessageSender) SendPubsubTopicKey(
|
|
ctx context.Context,
|
|
rawMessage *RawMessage,
|
|
) ([]byte, error) {
|
|
s.logger.Debug(
|
|
"sending the protected topic key for a community",
|
|
zap.String("communityId", types.EncodeHex(rawMessage.CommunityID)),
|
|
zap.String("site", "SendPubsubTopicKey"),
|
|
)
|
|
rawMessage.Sender = s.identity
|
|
messageID, err := s.getMessageID(rawMessage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rawMessage.ID = types.EncodeHex(messageID)
|
|
|
|
// Notify before dispatching, otherwise the dispatch subscription might happen
|
|
// earlier than the scheduled
|
|
s.notifyOnScheduledMessage(nil, rawMessage)
|
|
|
|
// Send to each recipients
|
|
for _, recipient := range rawMessage.Recipients {
|
|
_, err = s.sendPrivate(ctx, recipient, rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to send message")
|
|
}
|
|
}
|
|
return messageID, nil
|
|
|
|
}
|
|
|
|
// SendGroup takes encoded data, encrypts it and sends through the wire,
|
|
// always return the messageID
|
|
func (s *MessageSender) SendGroup(
|
|
ctx context.Context,
|
|
recipients []*ecdsa.PublicKey,
|
|
rawMessage RawMessage,
|
|
) ([]byte, error) {
|
|
s.logger.Debug(
|
|
"sending a private group message",
|
|
zap.String("site", "SendGroup"),
|
|
)
|
|
// Set sender if not specified
|
|
if rawMessage.Sender == nil {
|
|
rawMessage.Sender = s.identity
|
|
}
|
|
|
|
// Calculate messageID first and set on raw message
|
|
wrappedMessage, err := s.wrapMessageV1(&rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
}
|
|
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
|
rawMessage.ID = types.EncodeHex(messageID)
|
|
|
|
// We call it only once, and we nil the function after so it doesn't get called again
|
|
if rawMessage.BeforeDispatch != nil {
|
|
if err := rawMessage.BeforeDispatch(&rawMessage); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Send to each recipients
|
|
for _, recipient := range recipients {
|
|
_, err = s.sendPrivate(ctx, recipient, &rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to send message")
|
|
}
|
|
}
|
|
return messageID, nil
|
|
}
|
|
|
|
func (s *MessageSender) getMessageID(rawMessage *RawMessage) (types.HexBytes, error) {
|
|
wrappedMessage, err := s.wrapMessageV1(rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
}
|
|
|
|
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
|
|
|
return messageID, nil
|
|
}
|
|
|
|
func ShouldCommunityMessageBeEncrypted(msgType protobuf.ApplicationMetadataMessage_Type) bool {
|
|
return msgType == protobuf.ApplicationMetadataMessage_CHAT_MESSAGE ||
|
|
msgType == protobuf.ApplicationMetadataMessage_EDIT_MESSAGE ||
|
|
msgType == protobuf.ApplicationMetadataMessage_DELETE_MESSAGE ||
|
|
msgType == protobuf.ApplicationMetadataMessage_PIN_MESSAGE ||
|
|
msgType == protobuf.ApplicationMetadataMessage_EMOJI_REACTION
|
|
}
|
|
|
|
// sendCommunity sends a message that's to be sent in a community
|
|
// If it's a chat message, it will go to the respective topic derived by the
|
|
// chat id, if it's not a chat message, it will go to the community topic.
|
|
func (s *MessageSender) sendCommunity(
|
|
ctx context.Context,
|
|
rawMessage *RawMessage,
|
|
) ([]byte, error) {
|
|
s.logger.Debug("sending community message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(&rawMessage.Sender.PublicKey))))
|
|
|
|
// Set sender
|
|
if rawMessage.Sender == nil {
|
|
rawMessage.Sender = s.identity
|
|
}
|
|
|
|
messageID, err := s.getMessageID(rawMessage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rawMessage.ID = types.EncodeHex(messageID)
|
|
|
|
if rawMessage.BeforeDispatch != nil {
|
|
if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
// Notify before dispatching, otherwise the dispatch subscription might happen
|
|
// earlier than the scheduled
|
|
s.notifyOnScheduledMessage(nil, rawMessage)
|
|
|
|
var hashes [][]byte
|
|
var newMessages []*types.NewMessage
|
|
|
|
forceRekey := rawMessage.CommunityKeyExMsgType == KeyExMsgRekey
|
|
|
|
// Check if it's a key exchange message. In this case we send it
|
|
// to all the recipients
|
|
if rawMessage.CommunityKeyExMsgType != KeyExMsgNone {
|
|
// If rekeycompatibility is on, we always
|
|
// want to execute below, otherwise we execute
|
|
// only when we want to fill up old keys to a given user
|
|
if RekeyCompatibility || !forceRekey {
|
|
keyExMessageSpecs, err := s.protocol.GetKeyExMessageSpecs(rawMessage.HashRatchetGroupID, s.identity, rawMessage.Recipients, forceRekey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for i, spec := range keyExMessageSpecs {
|
|
recipient := rawMessage.Recipients[i]
|
|
_, _, err = s.sendMessageSpec(ctx, recipient, spec, [][]byte{messageID})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
wrappedMessage, err := s.wrapMessageV1(rawMessage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// If it's a chat message, we send it on the community chat topic
|
|
if ShouldCommunityMessageBeEncrypted(rawMessage.MessageType) {
|
|
messageSpec, err := s.protocol.BuildHashRatchetMessage(rawMessage.HashRatchetGroupID, wrappedMessage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
payload, err := proto.Marshal(messageSpec.Message)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to marshal")
|
|
}
|
|
hashes, newMessages, err = s.dispatchCommunityChatMessage(ctx, rawMessage, payload, forceRekey)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
sentMessage := &SentMessage{
|
|
Spec: messageSpec,
|
|
MessageIDs: [][]byte{messageID},
|
|
}
|
|
|
|
s.notifyOnSentMessage(sentMessage)
|
|
|
|
} else {
|
|
|
|
pubkey, err := crypto.DecompressPubkey(rawMessage.CommunityID)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to decompress pubkey")
|
|
}
|
|
hashes, newMessages, err = s.dispatchCommunityMessage(ctx, pubkey, wrappedMessage, rawMessage.PubsubTopic, forceRekey, rawMessage)
|
|
if err != nil {
|
|
s.logger.Error("failed to send a community message", zap.Error(err))
|
|
return nil, errors.Wrap(err, "failed to send a message spec")
|
|
}
|
|
}
|
|
|
|
s.logger.Debug("sent community message ", zap.String("messageID", messageID.String()), zap.Strings("hashes", types.EncodeHexes(hashes)))
|
|
s.transport.Track(messageID, hashes, newMessages)
|
|
|
|
return messageID, nil
|
|
}
|
|
|
|
// sendPrivate sends data to the recipient identifying with a given public key.
|
|
func (s *MessageSender) sendPrivate(
|
|
ctx context.Context,
|
|
recipient *ecdsa.PublicKey,
|
|
rawMessage *RawMessage,
|
|
) ([]byte, error) {
|
|
s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
|
|
|
|
var wrappedMessage []byte
|
|
var err error
|
|
if rawMessage.SkipApplicationWrap {
|
|
wrappedMessage = rawMessage.Payload
|
|
} else {
|
|
wrappedMessage, err = s.wrapMessageV1(rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
}
|
|
}
|
|
|
|
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
|
rawMessage.ID = types.EncodeHex(messageID)
|
|
if rawMessage.BeforeDispatch != nil {
|
|
if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// Notify before dispatching, otherwise the dispatch subscription might happen
|
|
// earlier than the scheduled
|
|
s.notifyOnScheduledMessage(recipient, rawMessage)
|
|
|
|
if s.datasync != nil && s.featureFlags.Datasync && rawMessage.ResendAutomatically {
|
|
// No need to call transport tracking.
|
|
// It is done in a data sync dispatch step.
|
|
datasyncID, err := s.addToDataSync(recipient, wrappedMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to send message with datasync")
|
|
}
|
|
// We don't need to receive confirmations from our own devices
|
|
if !IsPubKeyEqual(recipient, &s.identity.PublicKey) {
|
|
confirmation := &RawMessageConfirmation{
|
|
DataSyncID: datasyncID,
|
|
MessageID: messageID,
|
|
PublicKey: crypto.CompressPubkey(recipient),
|
|
}
|
|
|
|
err = s.persistence.InsertPendingConfirmation(confirmation)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
} else if rawMessage.SkipEncryptionLayer {
|
|
|
|
messageBytes := wrappedMessage
|
|
if rawMessage.CommunityKeyExMsgType == KeyExMsgReuse {
|
|
groupID := rawMessage.HashRatchetGroupID
|
|
|
|
ratchets, err := s.protocol.GetKeysForGroup(groupID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
message, err := s.protocol.BuildHashRatchetKeyExchangeMessageWithPayload(s.identity, recipient, groupID, ratchets, wrappedMessage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
messageBytes, err = proto.Marshal(message.Message)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// When SkipProtocolLayer is set we don't pass the message to the encryption layer
|
|
hashes, newMessages, err := s.sendPrivateRawMessage(ctx, rawMessage, recipient, messageBytes)
|
|
if err != nil {
|
|
s.logger.Error("failed to send a private message", zap.Error(err))
|
|
return nil, errors.Wrap(err, "failed to send a message spec")
|
|
}
|
|
|
|
s.logger.Debug("sent private message skipProtocolLayer", zap.String("messageID", messageID.String()), zap.Strings("hashes", types.EncodeHexes(hashes)))
|
|
s.transport.Track(messageID, hashes, newMessages)
|
|
|
|
} else {
|
|
messageSpec, err := s.protocol.BuildEncryptedMessage(rawMessage.Sender, recipient, wrappedMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to encrypt message")
|
|
}
|
|
|
|
hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, [][]byte{messageID})
|
|
if err != nil {
|
|
s.logger.Error("failed to send a private message", zap.Error(err))
|
|
return nil, errors.Wrap(err, "failed to send a message spec")
|
|
}
|
|
|
|
s.logger.Debug("sent private message without datasync", zap.String("messageID", messageID.String()), zap.Strings("hashes", types.EncodeHexes(hashes)))
|
|
s.transport.Track(messageID, hashes, newMessages)
|
|
}
|
|
|
|
return messageID, nil
|
|
}
|
|
|
|
// sendPairInstallation sends data to the recipients, using DH
|
|
func (s *MessageSender) SendPairInstallation(
|
|
ctx context.Context,
|
|
recipient *ecdsa.PublicKey,
|
|
rawMessage RawMessage,
|
|
) ([]byte, error) {
|
|
s.logger.Debug("sending private message", zap.String("recipient", types.EncodeHex(crypto.FromECDSAPub(recipient))))
|
|
|
|
wrappedMessage, err := s.wrapMessageV1(&rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
}
|
|
|
|
messageSpec, err := s.protocol.BuildDHMessage(s.identity, recipient, wrappedMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to encrypt message")
|
|
}
|
|
|
|
messageID := v1protocol.MessageID(&s.identity.PublicKey, wrappedMessage)
|
|
|
|
hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, [][]byte{messageID})
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to send a message spec")
|
|
}
|
|
|
|
s.transport.Track(messageID, hashes, newMessages)
|
|
|
|
return messageID, nil
|
|
}
|
|
|
|
func (s *MessageSender) encodeMembershipUpdate(
|
|
message v1protocol.MembershipUpdateMessage,
|
|
chatEntity ChatEntity,
|
|
) ([]byte, error) {
|
|
|
|
if chatEntity != nil {
|
|
chatEntityProtobuf := chatEntity.GetProtobuf()
|
|
switch chatEntityProtobuf := chatEntityProtobuf.(type) {
|
|
case *protobuf.ChatMessage:
|
|
message.Message = chatEntityProtobuf
|
|
case *protobuf.EmojiReaction:
|
|
message.EmojiReaction = chatEntityProtobuf
|
|
|
|
}
|
|
}
|
|
|
|
encodedMessage, err := v1protocol.EncodeMembershipUpdateMessage(message)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to encode membership update message")
|
|
}
|
|
|
|
return encodedMessage, nil
|
|
}
|
|
|
|
// EncodeMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
|
|
// All the events in a group are encoded and added to the payload
|
|
func (s *MessageSender) EncodeMembershipUpdate(
|
|
group *v1protocol.Group,
|
|
chatEntity ChatEntity,
|
|
) ([]byte, error) {
|
|
message := v1protocol.MembershipUpdateMessage{
|
|
ChatID: group.ChatID(),
|
|
Events: group.Events(),
|
|
}
|
|
|
|
return s.encodeMembershipUpdate(message, chatEntity)
|
|
}
|
|
|
|
// EncodeAbridgedMembershipUpdate takes a group and an optional chat message and returns the protobuf representation to be sent on the wire.
|
|
// Only the events relevant to the current group are encoded
|
|
func (s *MessageSender) EncodeAbridgedMembershipUpdate(
|
|
group *v1protocol.Group,
|
|
chatEntity ChatEntity,
|
|
) ([]byte, error) {
|
|
message := v1protocol.MembershipUpdateMessage{
|
|
ChatID: group.ChatID(),
|
|
Events: group.AbridgedEvents(),
|
|
}
|
|
return s.encodeMembershipUpdate(message, chatEntity)
|
|
}
|
|
|
|
func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMessage *RawMessage, wrappedMessage []byte, rekey bool) ([][]byte, []*types.NewMessage, error) {
|
|
payload := wrappedMessage
|
|
var err error
|
|
if rekey && len(rawMessage.HashRatchetGroupID) != 0 {
|
|
|
|
var ratchet *encryption.HashRatchetKeyCompatibility
|
|
// We have just rekeyed, pull the latest
|
|
if RekeyCompatibility {
|
|
ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
}
|
|
// We send the message over the community topic
|
|
spec, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, wrappedMessage, ratchet)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
payload, err = proto.Marshal(spec.Message)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
newMessage := &types.NewMessage{
|
|
TTL: whisperTTL,
|
|
Payload: payload,
|
|
PowTarget: calculatePoW(payload),
|
|
PowTime: whisperPoWTime,
|
|
PubsubTopic: rawMessage.PubsubTopic,
|
|
}
|
|
|
|
if rawMessage.BeforeDispatch != nil {
|
|
if err := rawMessage.BeforeDispatch(rawMessage); err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
// notify before dispatching
|
|
s.notifyOnScheduledMessage(nil, rawMessage)
|
|
|
|
newMessages, err := s.segmentMessage(newMessage)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
hashes := make([][]byte, 0, len(newMessages))
|
|
for _, newMessage := range newMessages {
|
|
hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
hashes = append(hashes, hash)
|
|
}
|
|
|
|
return hashes, newMessages, nil
|
|
}
|
|
|
|
// SendPublic takes encoded data, encrypts it and sends through the wire.
|
|
func (s *MessageSender) SendPublic(
|
|
ctx context.Context,
|
|
chatName string,
|
|
rawMessage RawMessage,
|
|
) ([]byte, error) {
|
|
// Set sender
|
|
if rawMessage.Sender == nil {
|
|
rawMessage.Sender = s.identity
|
|
}
|
|
|
|
var wrappedMessage []byte
|
|
var err error
|
|
if rawMessage.SkipApplicationWrap {
|
|
wrappedMessage = rawMessage.Payload
|
|
} else {
|
|
wrappedMessage, err = s.wrapMessageV1(&rawMessage)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
}
|
|
}
|
|
|
|
var newMessage *types.NewMessage
|
|
|
|
messageSpec, err := s.protocol.BuildPublicMessage(s.identity, wrappedMessage)
|
|
if err != nil {
|
|
s.logger.Error("failed to send a public message", zap.Error(err))
|
|
return nil, errors.Wrap(err, "failed to wrap a public message in the encryption layer")
|
|
}
|
|
|
|
if len(rawMessage.HashRatchetGroupID) != 0 {
|
|
|
|
var ratchet *encryption.HashRatchetKeyCompatibility
|
|
var err error
|
|
// We have just rekeyed, pull the latest
|
|
ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
keyID, err := ratchet.GetKeyID()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.logger.Debug("adding key id to message", zap.String("keyid", types.Bytes2Hex(keyID)))
|
|
// We send the message over the community topic
|
|
spec, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, wrappedMessage, ratchet)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
newMessage, err = MessageSpecToWhisper(spec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
} else if !rawMessage.SkipEncryptionLayer {
|
|
newMessage, err = MessageSpecToWhisper(messageSpec)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
newMessage = &types.NewMessage{
|
|
TTL: whisperTTL,
|
|
Payload: wrappedMessage,
|
|
PowTarget: calculatePoW(wrappedMessage),
|
|
PowTime: whisperPoWTime,
|
|
}
|
|
}
|
|
|
|
newMessage.Ephemeral = rawMessage.Ephemeral
|
|
newMessage.PubsubTopic = rawMessage.PubsubTopic
|
|
|
|
messageID := v1protocol.MessageID(&rawMessage.Sender.PublicKey, wrappedMessage)
|
|
rawMessage.ID = types.EncodeHex(messageID)
|
|
|
|
if rawMessage.BeforeDispatch != nil {
|
|
if err := rawMessage.BeforeDispatch(&rawMessage); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
// notify before dispatching
|
|
s.notifyOnScheduledMessage(nil, &rawMessage)
|
|
|
|
newMessages, err := s.segmentMessage(newMessage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
hashes := make([][]byte, 0, len(newMessages))
|
|
for _, newMessage := range newMessages {
|
|
hash, err := s.transport.SendPublic(ctx, newMessage, chatName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
hashes = append(hashes, hash)
|
|
}
|
|
|
|
sentMessage := &SentMessage{
|
|
Spec: messageSpec,
|
|
MessageIDs: [][]byte{messageID},
|
|
}
|
|
|
|
s.notifyOnSentMessage(sentMessage)
|
|
|
|
s.logger.Debug("sent public message", zap.String("messageID", messageID.String()), zap.Strings("hashes", types.EncodeHexes(hashes)))
|
|
s.transport.Track(messageID, hashes, newMessages)
|
|
|
|
return messageID, nil
|
|
}
|
|
|
|
// unwrapDatasyncMessage tries to unwrap message as datasync one and in case of success
|
|
// returns cloned messages with replaced payloads
|
|
func (s *MessageSender) unwrapDatasyncMessage(m *v1protocol.StatusMessage, response *handleMessageResponse) error {
|
|
|
|
datasyncMessage, err := s.datasync.Unwrap(
|
|
m.SigPubKey(),
|
|
m.EncryptionLayer.Payload,
|
|
)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
response.DatasyncSender = m.SigPubKey()
|
|
response.DatasyncAcks = append(response.DatasyncAcks, datasyncMessage.Acks...)
|
|
response.DatasyncRequests = append(response.DatasyncRequests, datasyncMessage.Requests...)
|
|
for _, o := range datasyncMessage.GroupOffers {
|
|
for _, mID := range o.MessageIds {
|
|
response.DatasyncOffers = append(response.DatasyncOffers, DatasyncOffer{GroupID: o.GroupId, MessageID: mID})
|
|
}
|
|
}
|
|
|
|
for _, ds := range datasyncMessage.Messages {
|
|
message, err := m.Clone()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
message.EncryptionLayer.Payload = ds.Body
|
|
response.DatasyncMessages = append(response.DatasyncMessages, message)
|
|
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// HandleMessages expects a whisper message as input, and it will go through
|
|
// a series of transformations until the message is parsed into an application
|
|
// layer message, or in case of Raw methods, the processing stops at the layer
|
|
// before.
|
|
// It returns an error only if the processing of required steps failed.
|
|
func (s *MessageSender) HandleMessages(wakuMessage *types.Message) (*HandleMessageResponse, error) {
|
|
logger := s.logger.With(zap.String("site", "HandleMessages"))
|
|
hlogger := logger.With(zap.String("hash", types.HexBytes(wakuMessage.Hash).String()))
|
|
|
|
response, err := s.handleMessage(wakuMessage)
|
|
if err != nil {
|
|
// Hash ratchet with a group id not found yet, save the message for future processing
|
|
if err == encryption.ErrHashRatchetGroupIDNotFound && len(response.Message.EncryptionLayer.HashRatchetInfo) == 1 {
|
|
info := response.Message.EncryptionLayer.HashRatchetInfo[0]
|
|
return nil, s.persistence.SaveHashRatchetMessage(info.GroupID, info.KeyID, wakuMessage)
|
|
}
|
|
|
|
// The current message segment has been successfully retrieved.
|
|
// However, the collection of segments is not yet complete.
|
|
if err == ErrMessageSegmentsIncomplete {
|
|
return nil, nil
|
|
}
|
|
|
|
return nil, err
|
|
}
|
|
|
|
// Process queued hash ratchet messages
|
|
for _, hashRatchetInfo := range response.Message.EncryptionLayer.HashRatchetInfo {
|
|
messages, err := s.persistence.GetHashRatchetMessages(hashRatchetInfo.KeyID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var processedIds [][]byte
|
|
for _, message := range messages {
|
|
hlogger.Info("handling out of order encrypted messages", zap.String("hash", types.Bytes2Hex(message.Hash)))
|
|
r, err := s.handleMessage(message)
|
|
if err != nil {
|
|
hlogger.Debug("failed to handle hash ratchet message", zap.Error(err))
|
|
continue
|
|
}
|
|
response.DatasyncMessages = append(response.toPublicResponse().StatusMessages, r.Messages()...)
|
|
response.DatasyncAcks = append(response.DatasyncAcks, r.DatasyncAcks...)
|
|
|
|
processedIds = append(processedIds, message.Hash)
|
|
}
|
|
|
|
err = s.persistence.DeleteHashRatchetMessages(processedIds)
|
|
if err != nil {
|
|
s.logger.Warn("failed to delete hash ratchet messages", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return response.toPublicResponse(), nil
|
|
}
|
|
|
|
// DatasyncOffer pairs a single offered message ID with the group it was
// offered for.
type DatasyncOffer struct {
	// GroupID is the group ID of the offer.
	GroupID []byte
	// MessageID is one of the message IDs listed in the offer.
	MessageID []byte
}
|
|
|
|
type HandleMessageResponse struct {
|
|
Hash []byte
|
|
StatusMessages []*v1protocol.StatusMessage
|
|
DatasyncSender *ecdsa.PublicKey
|
|
DatasyncAcks [][]byte
|
|
DatasyncOffers []DatasyncOffer
|
|
DatasyncRequests [][]byte
|
|
}
|
|
|
|
func (h *handleMessageResponse) toPublicResponse() *HandleMessageResponse {
|
|
return &HandleMessageResponse{
|
|
Hash: h.Hash,
|
|
StatusMessages: h.Messages(),
|
|
DatasyncSender: h.DatasyncSender,
|
|
DatasyncAcks: h.DatasyncAcks,
|
|
DatasyncOffers: h.DatasyncOffers,
|
|
DatasyncRequests: h.DatasyncRequests,
|
|
}
|
|
}
|
|
|
|
type handleMessageResponse struct {
|
|
Hash []byte
|
|
Message *v1protocol.StatusMessage
|
|
DatasyncMessages []*v1protocol.StatusMessage
|
|
DatasyncSender *ecdsa.PublicKey
|
|
DatasyncAcks [][]byte
|
|
DatasyncOffers []DatasyncOffer
|
|
DatasyncRequests [][]byte
|
|
}
|
|
|
|
func (h *handleMessageResponse) Messages() []*v1protocol.StatusMessage {
|
|
if len(h.DatasyncMessages) > 0 {
|
|
return h.DatasyncMessages
|
|
}
|
|
return []*v1protocol.StatusMessage{h.Message}
|
|
}
|
|
|
|
func (s *MessageSender) handleMessage(wakuMessage *types.Message) (*handleMessageResponse, error) {
|
|
logger := s.logger.With(zap.String("site", "handleMessage"))
|
|
hlogger := logger.With(zap.String("hash", types.EncodeHex(wakuMessage.Hash)))
|
|
|
|
message := &v1protocol.StatusMessage{}
|
|
|
|
response := &handleMessageResponse{
|
|
Hash: wakuMessage.Hash,
|
|
Message: message,
|
|
DatasyncMessages: []*v1protocol.StatusMessage{},
|
|
DatasyncAcks: [][]byte{},
|
|
}
|
|
|
|
err := message.HandleTransportLayer(wakuMessage)
|
|
if err != nil {
|
|
hlogger.Error("failed to handle transport layer message", zap.Error(err))
|
|
return nil, err
|
|
}
|
|
|
|
err = s.handleSegmentationLayer(message)
|
|
if err != nil {
|
|
hlogger.Debug("failed to handle segmentation layer message", zap.Error(err))
|
|
|
|
// Segments not completed yet, stop processing
|
|
if err == ErrMessageSegmentsIncomplete {
|
|
return nil, err
|
|
}
|
|
// Segments already completed, stop processing
|
|
if err == ErrMessageSegmentsAlreadyCompleted {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
err = s.handleEncryptionLayer(context.Background(), message)
|
|
if err != nil {
|
|
hlogger.Debug("failed to handle an encryption message", zap.Error(err))
|
|
|
|
// Hash ratchet with a group id not found yet, stop processing
|
|
if err == encryption.ErrHashRatchetGroupIDNotFound {
|
|
return response, err
|
|
}
|
|
}
|
|
|
|
if s.datasync != nil && s.datasyncEnabled {
|
|
err := s.unwrapDatasyncMessage(message, response)
|
|
if err != nil {
|
|
hlogger.Debug("failed to handle datasync message", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
for _, msg := range response.Messages() {
|
|
err := msg.HandleApplicationLayer()
|
|
if err != nil {
|
|
hlogger.Error("failed to handle application metadata layer message", zap.Error(err))
|
|
}
|
|
}
|
|
|
|
return response, nil
|
|
}
|
|
|
|
// fetchDecryptionKey returns the private key associated with this public key, and returns true if it's an ephemeral key
|
|
func (s *MessageSender) fetchDecryptionKey(destination *ecdsa.PublicKey) (*ecdsa.PrivateKey, bool) {
|
|
destinationID := types.EncodeHex(crypto.FromECDSAPub(destination))
|
|
|
|
s.ephemeralKeysMutex.Lock()
|
|
decryptionKey, ok := s.ephemeralKeys[destinationID]
|
|
s.ephemeralKeysMutex.Unlock()
|
|
|
|
// the key is not there, fallback on identity
|
|
if !ok {
|
|
return s.identity, false
|
|
}
|
|
return decryptionKey, true
|
|
}
|
|
|
|
func (s *MessageSender) handleEncryptionLayer(ctx context.Context, message *v1protocol.StatusMessage) error {
|
|
logger := s.logger.With(zap.String("site", "handleEncryptionLayer"))
|
|
publicKey := message.SigPubKey()
|
|
|
|
// if it's an ephemeral key, we don't negotiate a topic
|
|
decryptionKey, skipNegotiation := s.fetchDecryptionKey(message.TransportLayer.Dst)
|
|
|
|
err := message.HandleEncryptionLayer(decryptionKey, publicKey, s.protocol, skipNegotiation)
|
|
|
|
// if it's an ephemeral key, we don't have to handle a device not found error
|
|
if err == encryption.ErrDeviceNotFound && !skipNegotiation {
|
|
if err := s.handleErrDeviceNotFound(ctx, publicKey); err != nil {
|
|
logger.Error("failed to handle ErrDeviceNotFound", zap.Error(err))
|
|
}
|
|
}
|
|
if err != nil {
|
|
logger.Error("failed to handle an encrypted message", zap.Error(err))
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *MessageSender) handleErrDeviceNotFound(ctx context.Context, publicKey *ecdsa.PublicKey) error {
|
|
now := time.Now().Unix()
|
|
advertise, err := s.protocol.ShouldAdvertiseBundle(publicKey, now)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !advertise {
|
|
return nil
|
|
}
|
|
|
|
messageSpec, err := s.protocol.BuildBundleAdvertiseMessage(s.identity, publicKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
|
defer cancel()
|
|
// We don't pass an array of messageIDs as no action needs to be taken
|
|
// when sending a bundle
|
|
_, _, err = s.sendMessageSpec(ctx, publicKey, messageSpec, nil)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
s.protocol.ConfirmBundleAdvertisement(publicKey, now)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *MessageSender) wrapMessageV1(rawMessage *RawMessage) ([]byte, error) {
|
|
wrappedMessage, err := v1protocol.WrapMessageV1(rawMessage.Payload, rawMessage.MessageType, rawMessage.Sender)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to wrap message")
|
|
}
|
|
return wrappedMessage, nil
|
|
}
|
|
|
|
func (s *MessageSender) addToDataSync(publicKey *ecdsa.PublicKey, message []byte) ([]byte, error) {
|
|
groupID := datasync.ToOneToOneGroupID(&s.identity.PublicKey, publicKey)
|
|
peerID := datasyncpeer.PublicKeyToPeerID(*publicKey)
|
|
exist, err := s.datasync.IsPeerInGroup(groupID, peerID)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to check if peer is in group")
|
|
}
|
|
if !exist {
|
|
if err := s.datasync.AddPeer(groupID, peerID); err != nil {
|
|
return nil, errors.Wrap(err, "failed to add peer")
|
|
}
|
|
}
|
|
id, err := s.datasync.AppendMessage(groupID, message)
|
|
if err != nil {
|
|
return nil, errors.Wrap(err, "failed to append message to datasync")
|
|
}
|
|
|
|
return id[:], nil
|
|
}
|
|
|
|
// sendPrivateRawMessage sends a message not wrapped in an encryption layer
|
|
func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte) ([][]byte, []*types.NewMessage, error) {
|
|
newMessage := &types.NewMessage{
|
|
TTL: whisperTTL,
|
|
Payload: payload,
|
|
PowTarget: calculatePoW(payload),
|
|
PowTime: whisperPoWTime,
|
|
PubsubTopic: rawMessage.PubsubTopic,
|
|
}
|
|
|
|
newMessages, err := s.segmentMessage(newMessage)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
hashes := make([][]byte, 0, len(newMessages))
|
|
var hash []byte
|
|
for _, newMessage := range newMessages {
|
|
if rawMessage.SendOnPersonalTopic {
|
|
hash, err = s.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey)
|
|
} else {
|
|
hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
|
|
}
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
hashes = append(hashes, hash)
|
|
}
|
|
|
|
return hashes, newMessages, nil
|
|
}
|
|
|
|
// sendCommunityMessage sends a message not wrapped in an encryption layer
|
|
// to a community
|
|
func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey *ecdsa.PublicKey, wrappedMessage []byte, pubsubTopic string, rekey bool, rawMessage *RawMessage) ([][]byte, []*types.NewMessage, error) {
|
|
payload := wrappedMessage
|
|
if rekey && len(rawMessage.HashRatchetGroupID) != 0 {
|
|
|
|
var ratchet *encryption.HashRatchetKeyCompatibility
|
|
var err error
|
|
// We have just rekeyed, pull the latest
|
|
if RekeyCompatibility {
|
|
ratchet, err = s.protocol.GetCurrentKeyForGroup(rawMessage.HashRatchetGroupID)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
}
|
|
keyID, err := ratchet.GetKeyID()
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
s.logger.Debug("adding key id to message", zap.String("keyid", types.Bytes2Hex(keyID)))
|
|
// We send the message over the community topic
|
|
spec, err := s.protocol.BuildHashRatchetReKeyGroupMessage(s.identity, rawMessage.Recipients, rawMessage.HashRatchetGroupID, wrappedMessage, ratchet)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
payload, err = proto.Marshal(spec.Message)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
}
|
|
|
|
newMessage := &types.NewMessage{
|
|
TTL: whisperTTL,
|
|
Payload: payload,
|
|
PowTarget: calculatePoW(payload),
|
|
PowTime: whisperPoWTime,
|
|
PubsubTopic: pubsubTopic,
|
|
}
|
|
|
|
newMessages, err := s.segmentMessage(newMessage)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
hashes := make([][]byte, 0, len(newMessages))
|
|
for _, newMessage := range newMessages {
|
|
hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
hashes = append(hashes, hash)
|
|
}
|
|
|
|
return hashes, newMessages, nil
|
|
}
|
|
|
|
func (s *MessageSender) SendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) {
|
|
return s.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs)
|
|
}
|
|
|
|
// sendMessageSpec analyses the spec properties and selects a proper transport method.
|
|
func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) {
|
|
logger := s.logger.With(zap.String("site", "sendMessageSpec"))
|
|
|
|
newMessage, err := MessageSpecToWhisper(messageSpec)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
newMessages, err := s.segmentMessage(newMessage)
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
hashes := make([][]byte, 0, len(newMessages))
|
|
var hash []byte
|
|
for _, newMessage := range newMessages {
|
|
// The shared secret needs to be handle before we send a message
|
|
// otherwise the topic might not be set up before we receive a message
|
|
if messageSpec.SharedSecret != nil && s.handleSharedSecrets != nil {
|
|
err := s.handleSharedSecrets([]*sharedsecret.Secret{messageSpec.SharedSecret})
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
|
|
}
|
|
// process shared secret
|
|
if messageSpec.AgreedSecret {
|
|
logger.Debug("sending using shared secret")
|
|
hash, err = s.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key)
|
|
} else {
|
|
logger.Debug("sending partitioned topic")
|
|
hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
|
|
}
|
|
if err != nil {
|
|
return nil, nil, err
|
|
}
|
|
hashes = append(hashes, hash)
|
|
}
|
|
|
|
sentMessage := &SentMessage{
|
|
PublicKey: publicKey,
|
|
Spec: messageSpec,
|
|
MessageIDs: messageIDs,
|
|
}
|
|
|
|
s.notifyOnSentMessage(sentMessage)
|
|
|
|
return hashes, newMessages, nil
|
|
}
|
|
|
|
func (s *MessageSender) SubscribeToMessageEvents() <-chan *MessageEvent {
|
|
c := make(chan *MessageEvent, 100)
|
|
s.messageEventsSubscriptions = append(s.messageEventsSubscriptions, c)
|
|
return c
|
|
}
|
|
|
|
func (s *MessageSender) notifyOnSentMessage(sentMessage *SentMessage) {
|
|
event := &MessageEvent{
|
|
Type: MessageSent,
|
|
SentMessage: sentMessage,
|
|
}
|
|
// Publish on channels, drop if buffer is full
|
|
for _, c := range s.messageEventsSubscriptions {
|
|
select {
|
|
case c <- event:
|
|
default:
|
|
s.logger.Warn("message events subscription channel full when publishing sent event, dropping message")
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
func (s *MessageSender) notifyOnScheduledMessage(recipient *ecdsa.PublicKey, message *RawMessage) {
|
|
event := &MessageEvent{
|
|
Recipient: recipient,
|
|
Type: MessageScheduled,
|
|
RawMessage: message,
|
|
}
|
|
|
|
// Publish on channels, drop if buffer is full
|
|
for _, c := range s.messageEventsSubscriptions {
|
|
select {
|
|
case c <- event:
|
|
default:
|
|
s.logger.Warn("message events subscription channel full when publishing scheduled event, dropping message")
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *MessageSender) JoinPublic(id string) (*transport.Filter, error) {
|
|
return s.transport.JoinPublic(id)
|
|
}
|
|
|
|
// AddEphemeralKey adds an ephemeral key that we will be listening to
|
|
// note that we never removed them from now, as waku/whisper does not
|
|
// recalculate topics on removal, so effectively there's no benefit.
|
|
// On restart they will be gone.
|
|
func (s *MessageSender) AddEphemeralKey(privateKey *ecdsa.PrivateKey) (*transport.Filter, error) {
|
|
s.ephemeralKeysMutex.Lock()
|
|
s.ephemeralKeys[types.EncodeHex(crypto.FromECDSAPub(&privateKey.PublicKey))] = privateKey
|
|
s.ephemeralKeysMutex.Unlock()
|
|
return s.transport.LoadKeyFilters(privateKey)
|
|
}
|
|
|
|
func MessageSpecToWhisper(spec *encryption.ProtocolMessageSpec) (*types.NewMessage, error) {
|
|
var newMessage *types.NewMessage
|
|
|
|
payload, err := proto.Marshal(spec.Message)
|
|
if err != nil {
|
|
return newMessage, err
|
|
}
|
|
|
|
newMessage = &types.NewMessage{
|
|
TTL: whisperTTL,
|
|
Payload: payload,
|
|
PowTarget: calculatePoW(payload),
|
|
PowTime: whisperPoWTime,
|
|
}
|
|
return newMessage, nil
|
|
}
|
|
|
|
// calculatePoW returns the PoWTarget to be used.
|
|
// We check the size and arbitrarily set it to a lower PoW if the packet is
|
|
// greater than 50KB. We do this as the defaultPoW is too high for clients to send
|
|
// large messages.
|
|
func calculatePoW(payload []byte) float64 {
|
|
if len(payload) > largeSizeInBytes {
|
|
return whisperLargeSizePoW
|
|
}
|
|
return whisperDefaultPoW
|
|
}
|
|
|
|
func (s *MessageSender) StopDatasync() {
|
|
if s.datasync != nil {
|
|
s.datasync.Stop()
|
|
}
|
|
}
|
|
|
|
// GetCurrentKeyForGroup returns the latest key timestampID belonging to a key group
|
|
func (s *MessageSender) GetCurrentKeyForGroup(groupID []byte) (*encryption.HashRatchetKeyCompatibility, error) {
|
|
return s.protocol.GetCurrentKeyForGroup(groupID)
|
|
}
|
|
|
|
// GetKeyIDsForGroup returns a slice of key IDs belonging to a given group ID
|
|
func (s *MessageSender) GetKeysForGroup(groupID []byte) ([]*encryption.HashRatchetKeyCompatibility, error) {
|
|
return s.protocol.GetKeysForGroup(groupID)
|
|
}
|
|
|
|
// Segments message into smaller chunks if the size exceeds the maximum allowed
|
|
func segmentMessage(newMessage *types.NewMessage, maxSegmentSize int) ([]*types.NewMessage, error) {
|
|
if len(newMessage.Payload) <= maxSegmentSize {
|
|
return []*types.NewMessage{newMessage}, nil
|
|
}
|
|
|
|
createSegment := func(chunkPayload []byte) (*types.NewMessage, error) {
|
|
copy := &types.NewMessage{}
|
|
err := copier.Copy(copy, newMessage)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
copy.Payload = chunkPayload
|
|
copy.PowTarget = calculatePoW(chunkPayload)
|
|
return copy, nil
|
|
}
|
|
|
|
entireMessageHash := crypto.Keccak256(newMessage.Payload)
|
|
payloadSize := len(newMessage.Payload)
|
|
segmentsCount := int(math.Ceil(float64(payloadSize) / float64(maxSegmentSize)))
|
|
|
|
var segmentMessages []*types.NewMessage
|
|
|
|
for start, index := 0, 0; start < payloadSize; start += maxSegmentSize {
|
|
end := start + maxSegmentSize
|
|
if end > payloadSize {
|
|
end = payloadSize
|
|
}
|
|
|
|
chunk := newMessage.Payload[start:end]
|
|
|
|
segmentMessageProto := &protobuf.SegmentMessage{
|
|
EntireMessageHash: entireMessageHash,
|
|
Index: uint32(index),
|
|
SegmentsCount: uint32(segmentsCount),
|
|
Payload: chunk,
|
|
}
|
|
chunkPayload, err := proto.Marshal(segmentMessageProto)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
segmentMessage, err := createSegment(chunkPayload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
segmentMessages = append(segmentMessages, segmentMessage)
|
|
index++
|
|
}
|
|
|
|
return segmentMessages, nil
|
|
}
|
|
|
|
func (s *MessageSender) segmentMessage(newMessage *types.NewMessage) ([]*types.NewMessage, error) {
|
|
// We set the max message size to 3/4 of the allowed message size, to leave
|
|
// room for segment message metadata.
|
|
newMessages, err := segmentMessage(newMessage, int(s.transport.MaxMessageSize()/4*3))
|
|
s.logger.Debug("message segmented", zap.Int("segments", len(newMessages)))
|
|
return newMessages, err
|
|
}
|
|
|
|
// Sentinel errors reported by the segmentation layer.
var (
	// ErrMessageSegmentsIncomplete is returned while not all segments of a
	// message have been stored yet.
	ErrMessageSegmentsIncomplete = errors.New("message segments incomplete")
	// ErrMessageSegmentsAlreadyCompleted is returned for a segment of a
	// message that persistence reports as already completed.
	ErrMessageSegmentsAlreadyCompleted = errors.New("message segments already completed")
	// ErrMessageSegmentsInvalidCount is returned when a segment announces
	// fewer than two segments.
	ErrMessageSegmentsInvalidCount = errors.New("invalid segments count")
	// ErrMessageSegmentsHashMismatch is returned when the hash of the
	// reassembled payload differs from the announced one.
	ErrMessageSegmentsHashMismatch = errors.New("hash of entire payload does not match")
)
|
|
|
|
func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessage) error {
|
|
logger := s.logger.With(zap.String("site", "handleSegmentationLayer"))
|
|
hlogger := logger.With(zap.String("hash", types.HexBytes(message.TransportLayer.Hash).String()))
|
|
|
|
var segmentMessage protobuf.SegmentMessage
|
|
err := proto.Unmarshal(message.TransportLayer.Payload, &segmentMessage)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to unmarshal SegmentMessage")
|
|
}
|
|
|
|
hlogger.Debug("handling message segment", zap.String("EntireMessageHash", types.HexBytes(segmentMessage.EntireMessageHash).String()),
|
|
zap.Uint32("Index", segmentMessage.Index), zap.Uint32("SegmentsCount", segmentMessage.SegmentsCount))
|
|
|
|
alreadyCompleted, err := s.persistence.IsMessageAlreadyCompleted(segmentMessage.EntireMessageHash)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if alreadyCompleted {
|
|
return ErrMessageSegmentsAlreadyCompleted
|
|
}
|
|
|
|
if segmentMessage.SegmentsCount < 2 {
|
|
return ErrMessageSegmentsInvalidCount
|
|
}
|
|
|
|
err = s.persistence.SaveMessageSegment(&segmentMessage, message.TransportLayer.SigPubKey, time.Now().Unix())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
segments, err := s.persistence.GetMessageSegments(segmentMessage.EntireMessageHash, message.TransportLayer.SigPubKey)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(segments) != int(segmentMessage.SegmentsCount) {
|
|
return ErrMessageSegmentsIncomplete
|
|
}
|
|
|
|
// Combine payload
|
|
var entirePayload bytes.Buffer
|
|
for _, segment := range segments {
|
|
_, err := entirePayload.Write(segment.Payload)
|
|
if err != nil {
|
|
return errors.Wrap(err, "failed to write segment payload")
|
|
}
|
|
}
|
|
|
|
// Sanity check
|
|
entirePayloadHash := crypto.Keccak256(entirePayload.Bytes())
|
|
if !bytes.Equal(entirePayloadHash, segmentMessage.EntireMessageHash) {
|
|
return ErrMessageSegmentsHashMismatch
|
|
}
|
|
|
|
err = s.persistence.CompleteMessageSegments(segmentMessage.EntireMessageHash, message.TransportLayer.SigPubKey, time.Now().Unix())
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
message.TransportLayer.Payload = entirePayload.Bytes()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *MessageSender) CleanupSegments() error {
|
|
weekAgo := time.Now().AddDate(0, 0, -7).Unix()
|
|
monthAgo := time.Now().AddDate(0, -1, 0).Unix()
|
|
|
|
err := s.persistence.RemoveMessageSegmentsOlderThan(weekAgo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = s.persistence.RemoveMessageSegmentsCompletedOlderThan(monthAgo)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|