feat: enable messages segmentation

closes: status-im/status-desktop#12188
This commit is contained in:
Patryk Osmaczko 2023-11-10 14:15:24 +01:00 committed by osmaczko
parent 6bb806caad
commit de29ec71ae
2 changed files with 166 additions and 60 deletions

View File

@ -319,8 +319,8 @@ func (s *MessageSender) sendCommunity(
// earlier than the scheduled // earlier than the scheduled
s.notifyOnScheduledMessage(nil, rawMessage) s.notifyOnScheduledMessage(nil, rawMessage)
var hash []byte var hashes [][]byte
var newMessage *types.NewMessage var newMessages []*types.NewMessage
// Check if it's a key exchange message. In this case we send it // Check if it's a key exchange message. In this case we send it
// to all the recipients // to all the recipients
@ -367,7 +367,7 @@ func (s *MessageSender) sendCommunity(
} }
rawMessage.Payload = payload rawMessage.Payload = payload
newMessage = &types.NewMessage{ newMessage := &types.NewMessage{
TTL: whisperTTL, TTL: whisperTTL,
Payload: payload, Payload: payload,
PowTarget: calculatePoW(payload), PowTarget: calculatePoW(payload),
@ -383,13 +383,17 @@ func (s *MessageSender) sendCommunity(
// notify before dispatching // notify before dispatching
s.notifyOnScheduledMessage(nil, rawMessage) s.notifyOnScheduledMessage(nil, rawMessage)
_, err = s.transport.SendPublic(ctx, newMessage, types.EncodeHex(rawMessage.CommunityID)) newMessages, err = s.segmentMessage(newMessage)
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, newMessage := range newMessages {
_, err = s.transport.SendPublic(ctx, newMessage, types.EncodeHex(rawMessage.CommunityID))
if err != nil {
return nil, err
}
}
} }
} }
return nil, nil return nil, nil
} }
@ -410,7 +414,7 @@ func (s *MessageSender) sendCommunity(
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to marshal") return nil, errors.Wrap(err, "failed to marshal")
} }
hash, newMessage, err = s.dispatchCommunityChatMessage(ctx, rawMessage, payload) hashes, newMessages, err = s.dispatchCommunityChatMessage(ctx, rawMessage, payload)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -430,16 +434,18 @@ func (s *MessageSender) sendCommunity(
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to decompress pubkey") return nil, errors.Wrap(err, "failed to decompress pubkey")
} }
hash, newMessage, err = s.dispatchCommunityMessage(ctx, pubkey, payload, messageIDs, rawMessage.PubsubTopic) hashes, newMessages, err = s.dispatchCommunityMessage(ctx, pubkey, payload, messageIDs, rawMessage.PubsubTopic)
if err != nil { if err != nil {
s.logger.Error("failed to send a community message", zap.Error(err)) s.logger.Error("failed to send a community message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec") return nil, errors.Wrap(err, "failed to send a message spec")
} }
s.logger.Debug("sent community message ", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash)))
} }
s.transport.Track(messageIDs, hash, newMessage) for i, newMessage := range newMessages {
s.logger.Debug("sent community message ", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hashes[i])))
s.transport.Track(messageIDs, hashes[i], newMessage)
}
return messageID, nil return messageID, nil
} }
@ -493,15 +499,16 @@ func (s *MessageSender) sendPrivate(
} else if rawMessage.SkipEncryptionLayer { } else if rawMessage.SkipEncryptionLayer {
// When SkipProtocolLayer is set we don't pass the message to the encryption layer // When SkipProtocolLayer is set we don't pass the message to the encryption layer
messageIDs := [][]byte{messageID} messageIDs := [][]byte{messageID}
hash, newMessage, err := s.sendPrivateRawMessage(ctx, rawMessage, recipient, wrappedMessage, messageIDs) hashes, newMessages, err := s.sendPrivateRawMessage(ctx, rawMessage, recipient, wrappedMessage, messageIDs)
if err != nil { if err != nil {
s.logger.Error("failed to send a private message", zap.Error(err)) s.logger.Error("failed to send a private message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec") return nil, errors.Wrap(err, "failed to send a message spec")
} }
s.logger.Debug("sent private message skipProtocolLayer", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) for i, newMessage := range newMessages {
s.logger.Debug("sent private message skipProtocolLayer", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hashes[i])))
s.transport.Track(messageIDs, hash, newMessage) s.transport.Track(messageIDs, hashes[i], newMessage)
}
} else { } else {
messageSpec, err := s.protocol.BuildEncryptedMessage(rawMessage.Sender, recipient, wrappedMessage) messageSpec, err := s.protocol.BuildEncryptedMessage(rawMessage.Sender, recipient, wrappedMessage)
@ -520,15 +527,16 @@ func (s *MessageSender) sendPrivate(
} }
messageIDs := [][]byte{messageID} messageIDs := [][]byte{messageID}
hash, newMessage, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs) hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs)
if err != nil { if err != nil {
s.logger.Error("failed to send a private message", zap.Error(err)) s.logger.Error("failed to send a private message", zap.Error(err))
return nil, errors.Wrap(err, "failed to send a message spec") return nil, errors.Wrap(err, "failed to send a message spec")
} }
s.logger.Debug("sent private message without datasync", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) for i, newMessage := range newMessages {
s.logger.Debug("sent private message without datasync", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hashes[i])))
s.transport.Track(messageIDs, hash, newMessage) s.transport.Track(messageIDs, hashes[i], newMessage)
}
} }
return messageID, nil return messageID, nil
@ -555,12 +563,14 @@ func (s *MessageSender) SendPairInstallation(
messageID := v1protocol.MessageID(&s.identity.PublicKey, wrappedMessage) messageID := v1protocol.MessageID(&s.identity.PublicKey, wrappedMessage)
messageIDs := [][]byte{messageID} messageIDs := [][]byte{messageID}
hash, newMessage, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs) hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs)
if err != nil { if err != nil {
return nil, errors.Wrap(err, "failed to send a message spec") return nil, errors.Wrap(err, "failed to send a message spec")
} }
s.transport.Track(messageIDs, hash, newMessage) for i, newMessage := range newMessages {
s.transport.Track(messageIDs, hashes[i], newMessage)
}
return messageID, nil return messageID, nil
} }
@ -616,7 +626,7 @@ func (s *MessageSender) EncodeAbridgedMembershipUpdate(
return s.encodeMembershipUpdate(message, chatEntity) return s.encodeMembershipUpdate(message, chatEntity)
} }
func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMessage *RawMessage, wrappedMessage []byte) ([]byte, *types.NewMessage, error) { func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMessage *RawMessage, wrappedMessage []byte) ([][]byte, []*types.NewMessage, error) {
newMessage := &types.NewMessage{ newMessage := &types.NewMessage{
TTL: whisperTTL, TTL: whisperTTL,
@ -635,12 +645,21 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes
// notify before dispatching // notify before dispatching
s.notifyOnScheduledMessage(nil, rawMessage) s.notifyOnScheduledMessage(nil, rawMessage)
hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID) newMessages, err := s.segmentMessage(newMessage)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
return hash, newMessage, nil hashes := make([][]byte, len(newMessages))
for _, newMessage := range newMessages {
hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID)
if err != nil {
return nil, nil, err
}
hashes = append(hashes, hash)
}
return hashes, newMessages, nil
} }
// SendPublic takes encoded data, encrypts it and sends through the wire. // SendPublic takes encoded data, encrypts it and sends through the wire.
@ -696,12 +715,20 @@ func (s *MessageSender) SendPublic(
// notify before dispatching // notify before dispatching
s.notifyOnScheduledMessage(nil, &rawMessage) s.notifyOnScheduledMessage(nil, &rawMessage)
hash, err := s.transport.SendPublic(ctx, newMessage, chatName) newMessages, err := s.segmentMessage(newMessage)
if err != nil { if err != nil {
return nil, err return nil, err
} }
s.logger.Debug("sent public message", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) hashes := make([][]byte, len(newMessages))
for _, newMessage := range newMessages {
hash, err := s.transport.SendPublic(ctx, newMessage, chatName)
if err != nil {
return nil, err
}
hashes = append(hashes, hash)
s.logger.Debug("sent public message", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash)))
}
sentMessage := &SentMessage{ sentMessage := &SentMessage{
Spec: messageSpec, Spec: messageSpec,
@ -710,7 +737,9 @@ func (s *MessageSender) SendPublic(
s.notifyOnSentMessage(sentMessage) s.notifyOnSentMessage(sentMessage)
s.transport.Track([][]byte{messageID}, hash, newMessage) for i, newMessage := range newMessages {
s.transport.Track([][]byte{messageID}, hashes[i], newMessage)
}
return messageID, nil return messageID, nil
} }
@ -746,7 +775,7 @@ func unwrapDatasyncMessage(m *v1protocol.StatusMessage, datasync *datasync.DataS
// It returns an error only if the processing of required steps failed. // It returns an error only if the processing of required steps failed.
func (s *MessageSender) HandleMessages(wakuMessage *types.Message) ([]*v1protocol.StatusMessage, [][]byte, error) { func (s *MessageSender) HandleMessages(wakuMessage *types.Message) ([]*v1protocol.StatusMessage, [][]byte, error) {
logger := s.logger.With(zap.String("site", "HandleMessages")) logger := s.logger.With(zap.String("site", "HandleMessages"))
hlogger := logger.With(zap.ByteString("hash", wakuMessage.Hash)) hlogger := logger.With(zap.String("hash", types.HexBytes(wakuMessage.Hash).String()))
var statusMessages []*v1protocol.StatusMessage var statusMessages []*v1protocol.StatusMessage
var acks [][]byte var acks [][]byte
@ -993,21 +1022,22 @@ func (s *MessageSender) sendDataSync(ctx context.Context, publicKey *ecdsa.Publi
} }
hash, newMessage, err := s.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs) hashes, newMessages, err := s.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs)
if err != nil { if err != nil {
s.logger.Error("failed to send a datasync message", zap.Error(err)) s.logger.Error("failed to send a datasync message", zap.Error(err))
return err return err
} }
s.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.String("hash", types.EncodeHex(hash))) for i, newMessage := range newMessages {
s.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.String("hash", types.EncodeHex(hashes[i])))
s.transport.Track(messageIDs, hash, newMessage) s.transport.Track(messageIDs, hashes[i], newMessage)
}
return nil return nil
} }
// sendPrivateRawMessage sends a message not wrapped in an encryption layer // sendPrivateRawMessage sends a message not wrapped in an encryption layer
func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) { func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) {
newMessage := &types.NewMessage{ newMessage := &types.NewMessage{
TTL: whisperTTL, TTL: whisperTTL,
Payload: payload, Payload: payload,
@ -1015,24 +1045,32 @@ func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *R
PowTime: whisperPoWTime, PowTime: whisperPoWTime,
PubsubTopic: rawMessage.PubsubTopic, PubsubTopic: rawMessage.PubsubTopic,
} }
var hash []byte
var err error
if rawMessage.SendOnPersonalTopic { newMessages, err := s.segmentMessage(newMessage)
hash, err = s.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey)
} else {
hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
}
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
return hash, newMessage, nil hashes := make([][]byte, len(newMessages))
var hash []byte
for _, newMessage := range newMessages {
if rawMessage.SendOnPersonalTopic {
hash, err = s.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey)
} else {
hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
}
if err != nil {
return nil, nil, err
}
hashes = append(hashes, hash)
}
return hashes, newMessages, nil
} }
// sendCommunityMessage sends a message not wrapped in an encryption layer // sendCommunityMessage sends a message not wrapped in an encryption layer
// to a community // to a community
func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte, pubsubTopic string) ([]byte, *types.NewMessage, error) { func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte, pubsubTopic string) ([][]byte, []*types.NewMessage, error) {
newMessage := &types.NewMessage{ newMessage := &types.NewMessage{
TTL: whisperTTL, TTL: whisperTTL,
Payload: payload, Payload: payload,
@ -1041,37 +1079,54 @@ func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey
PubsubTopic: pubsubTopic, PubsubTopic: pubsubTopic,
} }
hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey) newMessages, err := s.segmentMessage(newMessage)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
return hash, newMessage, nil hashes := make([][]byte, len(newMessages))
for _, newMessage := range newMessages {
hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey)
if err != nil {
return nil, nil, err
}
hashes = append(hashes, hash)
}
return hashes, newMessages, nil
} }
// sendMessageSpec analyses the spec properties and selects a proper transport method. // sendMessageSpec analyses the spec properties and selects a proper transport method.
func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) { func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) {
logger := s.logger.With(zap.String("site", "sendMessageSpec"))
newMessage, err := MessageSpecToWhisper(messageSpec) newMessage, err := MessageSpecToWhisper(messageSpec)
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
logger := s.logger.With(zap.String("site", "sendMessageSpec")) newMessages, err := s.segmentMessage(newMessage)
var hash []byte
// process shared secret
if messageSpec.AgreedSecret {
logger.Debug("sending using shared secret")
hash, err = s.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key)
} else {
logger.Debug("sending partitioned topic")
hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
}
if err != nil { if err != nil {
return nil, nil, err return nil, nil, err
} }
hashes := make([][]byte, len(newMessages))
var hash []byte
for _, newMessage := range newMessages {
// process shared secret
if messageSpec.AgreedSecret {
logger.Debug("sending using shared secret")
hash, err = s.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key)
} else {
logger.Debug("sending partitioned topic")
hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey)
}
if err != nil {
return nil, nil, err
}
hashes = append(hashes, hash)
}
sentMessage := &SentMessage{ sentMessage := &SentMessage{
PublicKey: publicKey, PublicKey: publicKey,
Spec: messageSpec, Spec: messageSpec,
@ -1080,7 +1135,7 @@ func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.Pu
s.notifyOnSentMessage(sentMessage) s.notifyOnSentMessage(sentMessage)
return hash, newMessage, nil return hashes, newMessages, nil
} }
func (s *MessageSender) SubscribeToMessageEvents() <-chan *MessageEvent { func (s *MessageSender) SubscribeToMessageEvents() <-chan *MessageEvent {
@ -1256,6 +1311,14 @@ func segmentMessage(newMessage *types.NewMessage, maxSegmentSize int) ([]*types.
return segmentMessages, nil return segmentMessages, nil
} }
func (s *MessageSender) segmentMessage(newMessage *types.NewMessage) ([]*types.NewMessage, error) {
// We set the max message size to 3/4 of the allowed message size, to leave
// room for segment message metadata.
newMessages, err := segmentMessage(newMessage, int(s.transport.MaxMessageSize()/4*3))
s.logger.Debug("message segmented", zap.Int("segments", len(newMessages)))
return newMessages, err
}
var ErrMessageSegmentsIncomplete = errors.New("message segments incomplete") var ErrMessageSegmentsIncomplete = errors.New("message segments incomplete")
var ErrMessageSegmentsAlreadyCompleted = errors.New("message segments already completed") var ErrMessageSegmentsAlreadyCompleted = errors.New("message segments already completed")
var ErrMessageSegmentsInvalidCount = errors.New("invalid segments count") var ErrMessageSegmentsInvalidCount = errors.New("invalid segments count")
@ -1263,7 +1326,7 @@ var ErrMessageSegmentsHashMismatch = errors.New("hash of entire payload does not
func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessage) error { func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessage) error {
logger := s.logger.With(zap.String("site", "handleSegmentationLayer")) logger := s.logger.With(zap.String("site", "handleSegmentationLayer"))
hlogger := logger.With(zap.ByteString("hash", message.TransportLayer.Hash)) hlogger := logger.With(zap.String("hash", types.HexBytes(message.TransportLayer.Hash).String()))
var segmentMessage protobuf.SegmentMessage var segmentMessage protobuf.SegmentMessage
err := proto.Unmarshal(message.TransportLayer.Payload, &segmentMessage) err := proto.Unmarshal(message.TransportLayer.Payload, &segmentMessage)
@ -1271,7 +1334,7 @@ func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessag
return errors.Wrap(err, "failed to unmarshal SegmentMessage") return errors.Wrap(err, "failed to unmarshal SegmentMessage")
} }
hlogger.Debug("handling message segment", zap.ByteString("EntireMessageHash", segmentMessage.EntireMessageHash), hlogger.Debug("handling message segment", zap.String("EntireMessageHash", types.HexBytes(segmentMessage.EntireMessageHash).String()),
zap.Uint32("Index", segmentMessage.Index), zap.Uint32("SegmentsCount", segmentMessage.SegmentsCount)) zap.Uint32("Index", segmentMessage.Index), zap.Uint32("SegmentsCount", segmentMessage.SegmentsCount))
alreadyCompleted, err := s.persistence.IsMessageAlreadyCompleted(segmentMessage.EntireMessageHash) alreadyCompleted, err := s.persistence.IsMessageAlreadyCompleted(segmentMessage.EntireMessageHash)

View File

@ -3641,3 +3641,46 @@ func (s *MessengerCommunitiesSuite) TestCommunityRekeyAfterBanDisableCompatibili
s.Require().NoError(owner.Shutdown()) s.Require().NoError(owner.Shutdown())
} }
func (s *MessengerCommunitiesSuite) TestRetrieveBigCommunity() {
bigEmoji := make([]byte, 4*1024*1024) // 4 MB
description := &requests.CreateCommunity{
Membership: protobuf.CommunityPermissions_AUTO_ACCEPT,
Name: "status",
Color: "#ffffff",
Description: "status community description",
Emoji: string(bigEmoji),
}
// checks that private messages are segmented
// (community is advertised through `SendPrivate`)
response, err := s.owner.CreateCommunity(description, true)
s.Require().NoError(err)
s.Require().NotNil(response)
s.Require().Len(response.Communities(), 1)
community := response.Communities()[0]
s.advertiseCommunityTo(community, s.owner, s.alice)
s.joinCommunity(community, s.owner, s.alice)
// checks that public messages are segmented
// (community is advertised through `SendPublic`)
updatedDescription := "status updated community description"
_, err = s.owner.EditCommunity(&requests.EditCommunity{
CommunityID: community.ID(),
CreateCommunity: requests.CreateCommunity{
Membership: protobuf.CommunityPermissions_AUTO_ACCEPT,
Name: "status",
Color: "#ffffff",
Description: updatedDescription,
Emoji: string(bigEmoji),
},
})
s.Require().NoError(err)
// alice receives updated description
_, err = WaitOnMessengerResponse(s.alice, func(r *MessengerResponse) bool {
return len(r.Communities()) > 0 && r.Communities()[0].DescriptionText() == updatedDescription
}, "updated description not received")
s.Require().NoError(err)
}