From 7b3013a010bda65c28d97e897ad6d6027700b985 Mon Sep 17 00:00:00 2001 From: Patryk Osmaczko Date: Fri, 17 Nov 2023 16:11:06 +0100 Subject: [PATCH] chore: remove message chunking from datasync It was redundant to recently introduced messages segmentation layer. --- protocol/common/message_sender.go | 8 +- protocol/datasync/transport.go | 131 ++++++---------------------- protocol/datasync/transport_test.go | 45 ---------- 3 files changed, 27 insertions(+), 157 deletions(-) delete mode 100644 protocol/datasync/transport_test.go diff --git a/protocol/common/message_sender.go b/protocol/common/message_sender.go index 33c6c86dd..dff0ab4a1 100644 --- a/protocol/common/message_sender.go +++ b/protocol/common/message_sender.go @@ -129,11 +129,7 @@ func NewMessageSender( // but actual encrypt and send calls are postponed. // sendDataSync is responsible for encrypting and sending postponed messages. if features.Datasync { - // We set the max message size to 3/4 of the allowed message size, to leave - // room for encryption. - // Messages will be tried to send in any case, even if they exceed this - // value - ds.Init(p.sendDataSync, transport.MaxMessageSize()/4*3, logger) + ds.Init(p.sendDataSync, logger) ds.Start(datasync.DatasyncTicker) } @@ -1238,7 +1234,7 @@ func (s *MessageSender) StartDatasync() { ds := datasync.New(dataSyncNode, dataSyncTransport, true, s.logger) if s.datasyncEnabled { - ds.Init(s.sendDataSync, s.transport.MaxMessageSize()/4*3, s.logger) + ds.Init(s.sendDataSync, s.logger) ds.Start(datasync.DatasyncTicker) } diff --git a/protocol/datasync/transport.go b/protocol/datasync/transport.go index fa05b5bff..3f1801749 100644 --- a/protocol/datasync/transport.go +++ b/protocol/datasync/transport.go @@ -19,24 +19,17 @@ import ( const backoffInterval = 30 -var errNotInitialized = errors.New("Datasync transport not initialized") +var errNotInitialized = errors.New("datasync transport not initialized") var DatasyncTicker = 300 * time.Millisecond // It's easier to calculate nextEpoch if we consider seconds as a unit rather than // 300 ms, so we multiply the result by the ratio var offsetToSecond = uint64(time.Second / DatasyncTicker) -// payloadTagSize is the tag size for the protobuf.Payload message which is number of fields * 2 bytes -var payloadTagSize = 14 - -// timestampPayloadSize is the maximum size in bytes for the timestamp field (uint64) -var timestampPayloadSize = 10 - type NodeTransport struct { - packets chan transport.Packet - logger *zap.Logger - maxMessageSize uint32 - dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error + packets chan transport.Packet + logger *zap.Logger + dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error } func NewNodeTransport() *NodeTransport { @@ -45,9 +38,8 @@ func NewNodeTransport() *NodeTransport { } } -func (t *NodeTransport) Init(dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error, maxMessageSize uint32, logger *zap.Logger) { +func (t *NodeTransport) Init(dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error, logger *zap.Logger) { t.dispatch = dispatch - t.maxMessageSize = maxMessageSize t.logger = logger } @@ -64,110 +56,37 @@ func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf return errNotInitialized } - payloads := splitPayloadInBatches(&payload, int(t.maxMessageSize)) - for _, payload := range payloads { + if !payload.IsValid() { + t.logger.Error("payload is invalid") + return nil + } - if !payload.IsValid() { - t.logger.Error("payload is invalid") - continue - } + marshalledPayload, err := proto.Marshal(&payload) + if err != nil { + t.logger.Error("failed to marshal payload") + return nil + } - marshalledPayload, err := proto.Marshal(payload) - if err != nil { - t.logger.Error("failed to marshal payload") - continue - } + publicKey, err := datasyncpeer.IDToPublicKey(peer) + if err != nil { + t.logger.Error("failed to convert id to public key", zap.Error(err)) + return nil + } - publicKey, err := datasyncpeer.IDToPublicKey(peer) - if err != nil { - t.logger.Error("failed to conver id to public key", zap.Error(err)) - continue - } - // We don't return an error otherwise datasync will keep - // re-trying sending at each epoch - err = t.dispatch(context.Background(), publicKey, marshalledPayload, payload) - if err != nil { - t.logger.Error("failed to send message", zap.Error(err)) - continue - } + // We don't return an error otherwise datasync will keep + // re-trying sending at each epoch + err = t.dispatch(context.Background(), publicKey, marshalledPayload, &payload) + if err != nil { + t.logger.Error("failed to send message", zap.Error(err)) + return nil } return nil } -func splitPayloadInBatches(payload *protobuf.Payload, maxSizeBytes int) []*protobuf.Payload { - newPayload := &protobuf.Payload{} - var response []*protobuf.Payload - currentSize := payloadTagSize - - // this is not going to be 100% accurate, but should be fine in most cases, faster - // than using proto.Size - for _, ack := range payload.Acks { - if len(ack)+currentSize+1 > maxSizeBytes { - // We check if it's valid as it might be that the initial message - // is too big, in this case we still batch it - if newPayload.IsValid() { - response = append(response, newPayload) - } - newPayload = &protobuf.Payload{Acks: [][]byte{ack}} - currentSize = len(ack) + payloadTagSize + 1 - } else { - newPayload.Acks = append(newPayload.Acks, ack) - currentSize += len(ack) - } - } - - for _, offer := range payload.Offers { - if len(offer)+currentSize+1 > maxSizeBytes { - if newPayload.IsValid() { - response = append(response, newPayload) - } - newPayload = &protobuf.Payload{Offers: [][]byte{offer}} - currentSize = len(offer) + payloadTagSize + 1 - } else { - newPayload.Offers = append(newPayload.Offers, offer) - currentSize += len(offer) - } - } - - for _, request := range payload.Requests { - if len(request)+currentSize+1 > maxSizeBytes { - if newPayload.IsValid() { - response = append(response, newPayload) - } - newPayload = &protobuf.Payload{Requests: [][]byte{request}} - currentSize = len(request) + payloadTagSize + 1 - } else { - newPayload.Requests = append(newPayload.Requests, request) - currentSize += len(request) - } - } - - for _, message := range payload.Messages { - // We add the body size, the length field for payload, the length field for group id, - // the length of timestamp, body and groupid - if currentSize+1+1+timestampPayloadSize+len(message.Body)+len(message.GroupId) > maxSizeBytes { - if newPayload.IsValid() { - response = append(response, newPayload) - } - newPayload = &protobuf.Payload{Messages: []*protobuf.Message{message}} - currentSize = timestampPayloadSize + len(message.Body) + len(message.GroupId) + payloadTagSize + 1 + 1 - } else { - newPayload.Messages = append(newPayload.Messages, message) - currentSize += len(message.Body) + len(message.GroupId) + timestampPayloadSize - } - } - - if newPayload.IsValid() { - response = append(response, newPayload) - } - return response -} - // CalculateSendTime calculates the next epoch // at which a message should be sent. // We randomize it a bit so that not all messages are sent on the same epoch func CalculateSendTime(count uint64, time int64) int64 { return time + int64(uint64(math.Exp2(float64(count-1)))*backoffInterval*offsetToSecond) + int64(rand.Intn(30)) // nolint: gosec - } diff --git a/protocol/datasync/transport_test.go b/protocol/datasync/transport_test.go deleted file mode 100644 index b9f7e6fea..000000000 --- a/protocol/datasync/transport_test.go +++ /dev/null @@ -1,45 +0,0 @@ -package datasync - -import ( - "testing" - - "github.com/vacp2p/mvds/protobuf" - - "github.com/stretchr/testify/require" -) - -func TestSplitPayloadInBatches(t *testing.T) { - payload := &protobuf.Payload{Acks: [][]byte{{0x1}}} - - response := splitPayloadInBatches(payload, 100) - require.NotNil(t, response) - require.Len(t, response, 1) - - payload = &protobuf.Payload{Acks: [][]byte{{0x1}, {0x2}, {0x3}, {0x4}}} - // 1 is the maximum size of the actual ack, + the tag size + 1, the length of the field - response = splitPayloadInBatches(payload, 1+payloadTagSize+1) - require.NotNil(t, response) - require.Len(t, response, 4) - - payload = &protobuf.Payload{Offers: [][]byte{{0x1}, {0x2}, {0x3}, {0x4}}} - response = splitPayloadInBatches(payload, 1+payloadTagSize+1) - require.NotNil(t, response) - require.Len(t, response, 4) - - payload = &protobuf.Payload{Requests: [][]byte{{0x1}, {0x2}, {0x3}, {0x4}}} - response = splitPayloadInBatches(payload, 1+payloadTagSize+1) - require.NotNil(t, response) - require.Len(t, response, 4) - - payload = &protobuf.Payload{Messages: []*protobuf.Message{ - {GroupId: []byte{0x1}, Timestamp: 1, Body: []byte{0x1}}, - {GroupId: []byte{0x2}, Timestamp: 1, Body: []byte{0x2}}, - {GroupId: []byte{0x3}, Timestamp: 1, Body: []byte{0x3}}, - {GroupId: []byte{0x4}, Timestamp: 1, Body: []byte{0x4}}, - }, - } - // 1 for the size of Messages + 2 for the sizes of the repeated MessageFields fields + 10 for the worst size of timestamps + 1 for the size of the body + 1 for the size of group id - response = splitPayloadInBatches(payload, 1+payloadTagSize+2+timestampPayloadSize+1+1) - require.NotNil(t, response) - require.Len(t, response, 4) -}