From de29ec71ae9fe36334616511deafc85f4199f710 Mon Sep 17 00:00:00 2001 From: Patryk Osmaczko Date: Fri, 10 Nov 2023 14:15:24 +0100 Subject: [PATCH] feat: enable messages segmentation closes: status-im/status-desktop#12188 --- protocol/common/message_sender.go | 183 +++++++++++++++++-------- protocol/communities_messenger_test.go | 43 ++++++ 2 files changed, 166 insertions(+), 60 deletions(-) diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 3e83d0ec3..0d6567705 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -319,8 +319,8 @@ func (s *MessageSender) sendCommunity( // earlier than the scheduled s.notifyOnScheduledMessage(nil, rawMessage) - var hash []byte - var newMessage *types.NewMessage + var hashes [][]byte + var newMessages []*types.NewMessage // Check if it's a key exchange message. In this case we send it // to all the recipients @@ -367,7 +367,7 @@ func (s *MessageSender) sendCommunity( } rawMessage.Payload = payload - newMessage = &types.NewMessage{ + newMessage := &types.NewMessage{ TTL: whisperTTL, Payload: payload, PowTarget: calculatePoW(payload), @@ -383,13 +383,17 @@ func (s *MessageSender) sendCommunity( // notify before dispatching s.notifyOnScheduledMessage(nil, rawMessage) - _, err = s.transport.SendPublic(ctx, newMessage, types.EncodeHex(rawMessage.CommunityID)) + newMessages, err = s.segmentMessage(newMessage) if err != nil { return nil, err } - + for _, newMessage := range newMessages { + _, err = s.transport.SendPublic(ctx, newMessage, types.EncodeHex(rawMessage.CommunityID)) + if err != nil { + return nil, err + } + } } - } return nil, nil } @@ -410,7 +414,7 @@ func (s *MessageSender) sendCommunity( if err != nil { return nil, errors.Wrap(err, "failed to marshal") } - hash, newMessage, err = s.dispatchCommunityChatMessage(ctx, rawMessage, payload) + hashes, newMessages, err = s.dispatchCommunityChatMessage(ctx, rawMessage, payload) if err != nil { return nil, err } @@ -430,16 +434,18 @@ func (s *MessageSender) sendCommunity( if err != nil { return nil, errors.Wrap(err, "failed to decompress pubkey") } - hash, newMessage, err = s.dispatchCommunityMessage(ctx, pubkey, payload, messageIDs, rawMessage.PubsubTopic) + hashes, newMessages, err = s.dispatchCommunityMessage(ctx, pubkey, payload, messageIDs, rawMessage.PubsubTopic) if err != nil { s.logger.Error("failed to send a community message", zap.Error(err)) return nil, errors.Wrap(err, "failed to send a message spec") } - - s.logger.Debug("sent community message ", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) } - s.transport.Track(messageIDs, hash, newMessage) + for i, newMessage := range newMessages { + s.logger.Debug("sent community message ", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hashes[i]))) + s.transport.Track(messageIDs, hashes[i], newMessage) + } + return messageID, nil } @@ -493,15 +499,16 @@ func (s *MessageSender) sendPrivate( } else if rawMessage.SkipEncryptionLayer { // When SkipProtocolLayer is set we don't pass the message to the encryption layer messageIDs := [][]byte{messageID} - hash, newMessage, err := s.sendPrivateRawMessage(ctx, rawMessage, recipient, wrappedMessage, messageIDs) + hashes, newMessages, err := s.sendPrivateRawMessage(ctx, rawMessage, recipient, wrappedMessage, messageIDs) if err != nil { s.logger.Error("failed to send a private message", zap.Error(err)) return nil, errors.Wrap(err, "failed to send a message spec") } - s.logger.Debug("sent private message skipProtocolLayer", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) - - s.transport.Track(messageIDs, hash, newMessage) + for i, newMessage := range newMessages { + s.logger.Debug("sent private message skipProtocolLayer", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hashes[i]))) + s.transport.Track(messageIDs, hashes[i], newMessage) + } } else { messageSpec, err := s.protocol.BuildEncryptedMessage(rawMessage.Sender, recipient, wrappedMessage) @@ -520,15 +527,16 @@ func (s *MessageSender) sendPrivate( } messageIDs := [][]byte{messageID} - hash, newMessage, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs) + hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs) if err != nil { s.logger.Error("failed to send a private message", zap.Error(err)) return nil, errors.Wrap(err, "failed to send a message spec") } - s.logger.Debug("sent private message without datasync", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) - - s.transport.Track(messageIDs, hash, newMessage) + for i, newMessage := range newMessages { + s.logger.Debug("sent private message without datasync", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hashes[i]))) + s.transport.Track(messageIDs, hashes[i], newMessage) + } } return messageID, nil @@ -555,12 +563,14 @@ func (s *MessageSender) SendPairInstallation( messageID := v1protocol.MessageID(&s.identity.PublicKey, wrappedMessage) messageIDs := [][]byte{messageID} - hash, newMessage, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs) + hashes, newMessages, err := s.sendMessageSpec(ctx, recipient, messageSpec, messageIDs) if err != nil { return nil, errors.Wrap(err, "failed to send a message spec") } - s.transport.Track(messageIDs, hash, newMessage) + for i, newMessage := range newMessages { + s.transport.Track(messageIDs, hashes[i], newMessage) + } return messageID, nil } @@ -616,7 +626,7 @@ func (s *MessageSender) EncodeAbridgedMembershipUpdate( return s.encodeMembershipUpdate(message, chatEntity) } -func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMessage *RawMessage, wrappedMessage []byte) ([]byte, *types.NewMessage, error) { +func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMessage *RawMessage, wrappedMessage []byte) ([][]byte, []*types.NewMessage, error) { newMessage := &types.NewMessage{ TTL: whisperTTL, @@ -635,12 +645,21 @@ func (s *MessageSender) dispatchCommunityChatMessage(ctx context.Context, rawMes // notify before dispatching s.notifyOnScheduledMessage(nil, rawMessage) - hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID) + newMessages, err := s.segmentMessage(newMessage) if err != nil { return nil, nil, err } - return hash, newMessage, nil + hashes := make([][]byte, len(newMessages)) + for _, newMessage := range newMessages { + hash, err := s.transport.SendPublic(ctx, newMessage, rawMessage.LocalChatID) + if err != nil { + return nil, nil, err + } + hashes = append(hashes, hash) + } + + return hashes, newMessages, nil } // SendPublic takes encoded data, encrypts it and sends through the wire. @@ -696,12 +715,20 @@ func (s *MessageSender) SendPublic( // notify before dispatching s.notifyOnScheduledMessage(nil, &rawMessage) - hash, err := s.transport.SendPublic(ctx, newMessage, chatName) + newMessages, err := s.segmentMessage(newMessage) if err != nil { return nil, err } - s.logger.Debug("sent public message", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) + hashes := make([][]byte, len(newMessages)) + for _, newMessage := range newMessages { + hash, err := s.transport.SendPublic(ctx, newMessage, chatName) + if err != nil { + return nil, err + } + hashes = append(hashes, hash) + s.logger.Debug("sent public message", zap.String("messageID", messageID.String()), zap.String("hash", types.EncodeHex(hash))) + } sentMessage := &SentMessage{ Spec: messageSpec, @@ -710,7 +737,9 @@ func (s *MessageSender) SendPublic( s.notifyOnSentMessage(sentMessage) - s.transport.Track([][]byte{messageID}, hash, newMessage) + for i, newMessage := range newMessages { + s.transport.Track([][]byte{messageID}, hashes[i], newMessage) + } return messageID, nil } @@ -746,7 +775,7 @@ func unwrapDatasyncMessage(m *v1protocol.StatusMessage, datasync *datasync.DataS // It returns an error only if the processing of required steps failed. func (s *MessageSender) HandleMessages(wakuMessage *types.Message) ([]*v1protocol.StatusMessage, [][]byte, error) { logger := s.logger.With(zap.String("site", "HandleMessages")) - hlogger := logger.With(zap.ByteString("hash", wakuMessage.Hash)) + hlogger := logger.With(zap.String("hash", types.HexBytes(wakuMessage.Hash).String())) var statusMessages []*v1protocol.StatusMessage var acks [][]byte @@ -993,21 +1022,22 @@ func (s *MessageSender) sendDataSync(ctx context.Context, publicKey *ecdsa.Publi } - hash, newMessage, err := s.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs) + hashes, newMessages, err := s.sendMessageSpec(ctx, publicKey, messageSpec, messageIDs) if err != nil { s.logger.Error("failed to send a datasync message", zap.Error(err)) return err } - s.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.String("hash", types.EncodeHex(hash))) - - s.transport.Track(messageIDs, hash, newMessage) + for i, newMessage := range newMessages { + s.logger.Debug("sent private messages", zap.Any("messageIDs", hexMessageIDs), zap.String("hash", types.EncodeHex(hashes[i]))) + s.transport.Track(messageIDs, hashes[i], newMessage) + } return nil } // sendPrivateRawMessage sends a message not wrapped in an encryption layer -func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([]byte, *types.NewMessage, error) { +func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *RawMessage, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) { newMessage := &types.NewMessage{ TTL: whisperTTL, Payload: payload, @@ -1015,24 +1045,32 @@ func (s *MessageSender) sendPrivateRawMessage(ctx context.Context, rawMessage *R PowTime: whisperPoWTime, PubsubTopic: rawMessage.PubsubTopic, } - var hash []byte - var err error - if rawMessage.SendOnPersonalTopic { - hash, err = s.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey) - } else { - hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey) - } + newMessages, err := s.segmentMessage(newMessage) if err != nil { return nil, nil, err } - return hash, newMessage, nil + hashes := make([][]byte, len(newMessages)) + var hash []byte + for _, newMessage := range newMessages { + if rawMessage.SendOnPersonalTopic { + hash, err = s.transport.SendPrivateOnPersonalTopic(ctx, newMessage, publicKey) + } else { + hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey) + } + if err != nil { + return nil, nil, err + } + hashes = append(hashes, hash) + } + + return hashes, newMessages, nil } // sendCommunityMessage sends a message not wrapped in an encryption layer // to a community -func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte, pubsubTopic string) ([]byte, *types.NewMessage, error) { +func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey *ecdsa.PublicKey, payload []byte, messageIDs [][]byte, pubsubTopic string) ([][]byte, []*types.NewMessage, error) { newMessage := &types.NewMessage{ TTL: whisperTTL, Payload: payload, @@ -1041,37 +1079,54 @@ func (s *MessageSender) dispatchCommunityMessage(ctx context.Context, publicKey PubsubTopic: pubsubTopic, } - hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey) + newMessages, err := s.segmentMessage(newMessage) if err != nil { return nil, nil, err } - return hash, newMessage, nil + hashes := make([][]byte, len(newMessages)) + for _, newMessage := range newMessages { + hash, err := s.transport.SendCommunityMessage(ctx, newMessage, publicKey) + if err != nil { + return nil, nil, err + } + hashes = append(hashes, hash) + } + + return hashes, newMessages, nil } // sendMessageSpec analyses the spec properties and selects a proper transport method. -func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([]byte, *types.NewMessage, error) { +func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.PublicKey, messageSpec *encryption.ProtocolMessageSpec, messageIDs [][]byte) ([][]byte, []*types.NewMessage, error) { + logger := s.logger.With(zap.String("site", "sendMessageSpec")) + newMessage, err := MessageSpecToWhisper(messageSpec) if err != nil { return nil, nil, err } - logger := s.logger.With(zap.String("site", "sendMessageSpec")) - - var hash []byte - - // process shared secret - if messageSpec.AgreedSecret { - logger.Debug("sending using shared secret") - hash, err = s.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key) - } else { - logger.Debug("sending partitioned topic") - hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey) - } + newMessages, err := s.segmentMessage(newMessage) if err != nil { return nil, nil, err } + hashes := make([][]byte, len(newMessages)) + var hash []byte + for _, newMessage := range newMessages { + // process shared secret + if messageSpec.AgreedSecret { + logger.Debug("sending using shared secret") + hash, err = s.transport.SendPrivateWithSharedSecret(ctx, newMessage, publicKey, messageSpec.SharedSecret.Key) + } else { + logger.Debug("sending partitioned topic") + hash, err = s.transport.SendPrivateWithPartitioned(ctx, newMessage, publicKey) + } + if err != nil { + return nil, nil, err + } + hashes = append(hashes, hash) + } + sentMessage := &SentMessage{ PublicKey: publicKey, Spec: messageSpec, @@ -1080,7 +1135,7 @@ func (s *MessageSender) sendMessageSpec(ctx context.Context, publicKey *ecdsa.Pu s.notifyOnSentMessage(sentMessage) - return hash, newMessage, nil + return hashes, newMessages, nil } func (s *MessageSender) SubscribeToMessageEvents() <-chan *MessageEvent { @@ -1256,6 +1311,14 @@ func segmentMessage(newMessage *types.NewMessage, maxSegmentSize int) ([]*types. return segmentMessages, nil } +func (s *MessageSender) segmentMessage(newMessage *types.NewMessage) ([]*types.NewMessage, error) { + // We set the max message size to 3/4 of the allowed message size, to leave + // room for segment message metadata. + newMessages, err := segmentMessage(newMessage, int(s.transport.MaxMessageSize()/4*3)) + s.logger.Debug("message segmented", zap.Int("segments", len(newMessages))) + return newMessages, err +} + var ErrMessageSegmentsIncomplete = errors.New("message segments incomplete") var ErrMessageSegmentsAlreadyCompleted = errors.New("message segments already completed") var ErrMessageSegmentsInvalidCount = errors.New("invalid segments count") @@ -1263,7 +1326,7 @@ var ErrMessageSegmentsHashMismatch = errors.New("hash of entire payload does not func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessage) error { logger := s.logger.With(zap.String("site", "handleSegmentationLayer")) - hlogger := logger.With(zap.ByteString("hash", message.TransportLayer.Hash)) + hlogger := logger.With(zap.String("hash", types.HexBytes(message.TransportLayer.Hash).String())) var segmentMessage protobuf.SegmentMessage err := proto.Unmarshal(message.TransportLayer.Payload, &segmentMessage) @@ -1271,7 +1334,7 @@ func (s *MessageSender) handleSegmentationLayer(message *v1protocol.StatusMessag return errors.Wrap(err, "failed to unmarshal SegmentMessage") } - hlogger.Debug("handling message segment", zap.ByteString("EntireMessageHash", segmentMessage.EntireMessageHash), + hlogger.Debug("handling message segment", zap.String("EntireMessageHash", types.HexBytes(segmentMessage.EntireMessageHash).String()), zap.Uint32("Index", segmentMessage.Index), zap.Uint32("SegmentsCount", segmentMessage.SegmentsCount)) alreadyCompleted, err := s.persistence.IsMessageAlreadyCompleted(segmentMessage.EntireMessageHash) diff --git a/protocol/communities_messenger_test.go b/protocol/communities_messenger_test.go index 749798f22..71987699e 100644 --- a/protocol/communities_messenger_test.go +++ b/protocol/communities_messenger_test.go @@ -3641,3 +3641,46 @@ func (s *MessengerCommunitiesSuite) TestCommunityRekeyAfterBanDisableCompatibili s.Require().NoError(owner.Shutdown()) } + +func (s *MessengerCommunitiesSuite) TestRetrieveBigCommunity() { + bigEmoji := make([]byte, 4*1024*1024) // 4 MB + description := &requests.CreateCommunity{ + Membership: protobuf.CommunityPermissions_AUTO_ACCEPT, + Name: "status", + Color: "#ffffff", + Description: "status community description", + Emoji: string(bigEmoji), + } + + // checks that private messages are segmented + // (community is advertised through `SendPrivate`) + response, err := s.owner.CreateCommunity(description, true) + s.Require().NoError(err) + s.Require().NotNil(response) + s.Require().Len(response.Communities(), 1) + community := response.Communities()[0] + + s.advertiseCommunityTo(community, s.owner, s.alice) + s.joinCommunity(community, s.owner, s.alice) + + // checks that public messages are segmented + // (community is advertised through `SendPublic`) + updatedDescription := "status updated community description" + _, err = s.owner.EditCommunity(&requests.EditCommunity{ + CommunityID: community.ID(), + CreateCommunity: requests.CreateCommunity{ + Membership: protobuf.CommunityPermissions_AUTO_ACCEPT, + Name: "status", + Color: "#ffffff", + Description: updatedDescription, + Emoji: string(bigEmoji), + }, + }) + s.Require().NoError(err) + + // alice receives updated description + _, err = WaitOnMessengerResponse(s.alice, func(r *MessengerResponse) bool { + return len(r.Communities()) > 0 && r.Communities()[0].DescriptionText() == updatedDescription + }, "updated description not received") + s.Require().NoError(err) +}