chore: remove message chunking from datasync
It was made redundant by the recently introduced message segmentation layer.
parent 1794b93c16
commit 7b3013a010
@@ -129,11 +129,7 @@ func NewMessageSender(
 	// but actual encrypt and send calls are postponed.
 	// sendDataSync is responsible for encrypting and sending postponed messages.
 	if features.Datasync {
-		// We set the max message size to 3/4 of the allowed message size, to leave
-		// room for encryption.
-		// Messages will be tried to send in any case, even if they exceed this
-		// value
-		ds.Init(p.sendDataSync, transport.MaxMessageSize()/4*3, logger)
+		ds.Init(p.sendDataSync, logger)
 		ds.Start(datasync.DatasyncTicker)
 	}
 
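The comment removed above refers to the sizing expression transport.MaxMessageSize()/4*3, which budgeted roughly three quarters of the transport limit for the payload and reserved the rest for encryption overhead. A standalone sketch of that arithmetic; the 1 MiB limit is an assumed value for illustration, not taken from the code:

package main

import "fmt"

func main() {
	// Assumed transport limit for illustration; the real value comes from
	// transport.MaxMessageSize().
	maxMessageSize := uint32(1_048_576) // 1 MiB

	// Integer division first, then multiplication: 3/4 of the limit.
	// This was the chunking budget datasync used before this commit.
	chunkBudget := maxMessageSize / 4 * 3

	fmt.Println(chunkBudget) // 786432
}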
@@ -1238,7 +1234,7 @@ func (s *MessageSender) StartDatasync() {
 	ds := datasync.New(dataSyncNode, dataSyncTransport, true, s.logger)
 
 	if s.datasyncEnabled {
-		ds.Init(s.sendDataSync, s.transport.MaxMessageSize()/4*3, s.logger)
+		ds.Init(s.sendDataSync, s.logger)
 		ds.Start(datasync.DatasyncTicker)
 	}
 
@@ -19,24 +19,17 @@ import (
 
 const backoffInterval = 30
 
-var errNotInitialized = errors.New("Datasync transport not initialized")
+var errNotInitialized = errors.New("datasync transport not initialized")
 var DatasyncTicker = 300 * time.Millisecond
 
 // It's easier to calculate nextEpoch if we consider seconds as a unit rather than
 // 300 ms, so we multiply the result by the ratio
 var offsetToSecond = uint64(time.Second / DatasyncTicker)
 
-// payloadTagSize is the tag size for the protobuf.Payload message which is number of fields * 2 bytes
-var payloadTagSize = 14
-
-// timestampPayloadSize is the maximum size in bytes for the timestamp field (uint64)
-var timestampPayloadSize = 10
-
 type NodeTransport struct {
-	packets        chan transport.Packet
-	logger         *zap.Logger
-	maxMessageSize uint32
-	dispatch       func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error
+	packets  chan transport.Packet
+	logger   *zap.Logger
+	dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error
 }
 
 func NewNodeTransport() *NodeTransport {
@@ -45,9 +38,8 @@ func NewNodeTransport() *NodeTransport {
 	}
 }
 
-func (t *NodeTransport) Init(dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error, maxMessageSize uint32, logger *zap.Logger) {
+func (t *NodeTransport) Init(dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error, logger *zap.Logger) {
 	t.dispatch = dispatch
-	t.maxMessageSize = maxMessageSize
 	t.logger = logger
 }
 
@@ -64,110 +56,37 @@ func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf
 		return errNotInitialized
 	}
 
-	payloads := splitPayloadInBatches(&payload, int(t.maxMessageSize))
-	for _, payload := range payloads {
-		if !payload.IsValid() {
-			t.logger.Error("payload is invalid")
-			continue
-		}
-
-		marshalledPayload, err := proto.Marshal(payload)
-		if err != nil {
-			t.logger.Error("failed to marshal payload")
-			continue
-		}
-
-		publicKey, err := datasyncpeer.IDToPublicKey(peer)
-		if err != nil {
-			t.logger.Error("failed to conver id to public key", zap.Error(err))
-			continue
-		}
-		// We don't return an error otherwise datasync will keep
-		// re-trying sending at each epoch
-		err = t.dispatch(context.Background(), publicKey, marshalledPayload, payload)
-		if err != nil {
-			t.logger.Error("failed to send message", zap.Error(err))
-			continue
-		}
+	if !payload.IsValid() {
+		t.logger.Error("payload is invalid")
+		return nil
+	}
+
+	marshalledPayload, err := proto.Marshal(&payload)
+	if err != nil {
+		t.logger.Error("failed to marshal payload")
+		return nil
+	}
+
+	publicKey, err := datasyncpeer.IDToPublicKey(peer)
+	if err != nil {
+		t.logger.Error("failed to convert id to public key", zap.Error(err))
+		return nil
+	}
+	// We don't return an error otherwise datasync will keep
+	// re-trying sending at each epoch
+	err = t.dispatch(context.Background(), publicKey, marshalledPayload, &payload)
+	if err != nil {
+		t.logger.Error("failed to send message", zap.Error(err))
+		return nil
 	}
 
 	return nil
 }
 
-func splitPayloadInBatches(payload *protobuf.Payload, maxSizeBytes int) []*protobuf.Payload {
-	newPayload := &protobuf.Payload{}
-	var response []*protobuf.Payload
-	currentSize := payloadTagSize
-
-	// this is not going to be 100% accurate, but should be fine in most cases, faster
-	// than using proto.Size
-	for _, ack := range payload.Acks {
-		if len(ack)+currentSize+1 > maxSizeBytes {
-			// We check if it's valid as it might be that the initial message
-			// is too big, in this case we still batch it
-			if newPayload.IsValid() {
-				response = append(response, newPayload)
-			}
-			newPayload = &protobuf.Payload{Acks: [][]byte{ack}}
-			currentSize = len(ack) + payloadTagSize + 1
-		} else {
-			newPayload.Acks = append(newPayload.Acks, ack)
-			currentSize += len(ack)
-		}
-	}
-
-	for _, offer := range payload.Offers {
-		if len(offer)+currentSize+1 > maxSizeBytes {
-			if newPayload.IsValid() {
-				response = append(response, newPayload)
-			}
-			newPayload = &protobuf.Payload{Offers: [][]byte{offer}}
-			currentSize = len(offer) + payloadTagSize + 1
-		} else {
-			newPayload.Offers = append(newPayload.Offers, offer)
-			currentSize += len(offer)
-		}
-	}
-
-	for _, request := range payload.Requests {
-		if len(request)+currentSize+1 > maxSizeBytes {
-			if newPayload.IsValid() {
-				response = append(response, newPayload)
-			}
-			newPayload = &protobuf.Payload{Requests: [][]byte{request}}
-			currentSize = len(request) + payloadTagSize + 1
-		} else {
-			newPayload.Requests = append(newPayload.Requests, request)
-			currentSize += len(request)
-		}
-	}
-
-	for _, message := range payload.Messages {
-		// We add the body size, the length field for payload, the length field for group id,
-		// the length of timestamp, body and groupid
-		if currentSize+1+1+timestampPayloadSize+len(message.Body)+len(message.GroupId) > maxSizeBytes {
-			if newPayload.IsValid() {
-				response = append(response, newPayload)
-			}
-			newPayload = &protobuf.Payload{Messages: []*protobuf.Message{message}}
-			currentSize = timestampPayloadSize + len(message.Body) + len(message.GroupId) + payloadTagSize + 1 + 1
-		} else {
-			newPayload.Messages = append(newPayload.Messages, message)
-			currentSize += len(message.Body) + len(message.GroupId) + timestampPayloadSize
-		}
-	}
-
-	if newPayload.IsValid() {
-		response = append(response, newPayload)
-	}
-	return response
-}
-
 // CalculateSendTime calculates the next epoch
 // at which a message should be sent.
 // We randomize it a bit so that not all messages are sent on the same epoch
 func CalculateSendTime(count uint64, time int64) int64 {
 	return time + int64(uint64(math.Exp2(float64(count-1)))*backoffInterval*offsetToSecond) + int64(rand.Intn(30)) // nolint: gosec
 
 }
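The CalculateSendTime context kept above drives the retry backoff in units of 300 ms datasync epochs, which is what the offsetToSecond comment refers to. Below is a standalone sketch that reproduces the deterministic part of the formula with the constants from this file; the random jitter of up to 30 epochs is left out:

package main

import (
	"fmt"
	"math"
	"time"
)

// Constants copied from the datasync package shown in the diff.
const backoffInterval = 30

var datasyncTicker = 300 * time.Millisecond
var offsetToSecond = uint64(time.Second / datasyncTicker) // 1s / 300ms = 3

// nextEpochOffset mirrors CalculateSendTime without the jitter term:
// 2^(count-1) * backoffInterval * offsetToSecond epochs.
func nextEpochOffset(count uint64) int64 {
	return int64(uint64(math.Exp2(float64(count-1))) * backoffInterval * offsetToSecond)
}

func main() {
	for count := uint64(1); count <= 4; count++ {
		epochs := nextEpochOffset(count)
		// Each epoch is one 300 ms tick, so 90 epochs ≈ 27 s, 180 ≈ 54 s, ...
		fmt.Printf("retry %d: +%d epochs (~%s)\n", count, epochs, time.Duration(epochs)*datasyncTicker)
	}
}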
@@ -1,45 +0,0 @@
-package datasync
-
-import (
-	"testing"
-
-	"github.com/vacp2p/mvds/protobuf"
-
-	"github.com/stretchr/testify/require"
-)
-
-func TestSplitPayloadInBatches(t *testing.T) {
-	payload := &protobuf.Payload{Acks: [][]byte{{0x1}}}
-
-	response := splitPayloadInBatches(payload, 100)
-	require.NotNil(t, response)
-	require.Len(t, response, 1)
-
-	payload = &protobuf.Payload{Acks: [][]byte{{0x1}, {0x2}, {0x3}, {0x4}}}
-	// 1 is the maximum size of the actual ack, + the tag size + 1, the length of the field
-	response = splitPayloadInBatches(payload, 1+payloadTagSize+1)
-	require.NotNil(t, response)
-	require.Len(t, response, 4)
-
-	payload = &protobuf.Payload{Offers: [][]byte{{0x1}, {0x2}, {0x3}, {0x4}}}
-	response = splitPayloadInBatches(payload, 1+payloadTagSize+1)
-	require.NotNil(t, response)
-	require.Len(t, response, 4)
-
-	payload = &protobuf.Payload{Requests: [][]byte{{0x1}, {0x2}, {0x3}, {0x4}}}
-	response = splitPayloadInBatches(payload, 1+payloadTagSize+1)
-	require.NotNil(t, response)
-	require.Len(t, response, 4)
-
-	payload = &protobuf.Payload{Messages: []*protobuf.Message{
-		{GroupId: []byte{0x1}, Timestamp: 1, Body: []byte{0x1}},
-		{GroupId: []byte{0x2}, Timestamp: 1, Body: []byte{0x2}},
-		{GroupId: []byte{0x3}, Timestamp: 1, Body: []byte{0x3}},
-		{GroupId: []byte{0x4}, Timestamp: 1, Body: []byte{0x4}},
-	},
-	}
-	// 1 for the size of Messages + 2 for the sizes of the repeated MessageFields fields + 10 for the worst size of timestamps + 1 for the size of the body + 1 for the size of group id
-	response = splitPayloadInBatches(payload, 1+payloadTagSize+2+timestampPayloadSize+1+1)
-	require.NotNil(t, response)
-	require.Len(t, response, 4)
-}
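The deleted test above relied on the batch-size accounting of the removed splitPayloadInBatches: each batch started from payloadTagSize and grew by the size of every ack, offer, request, or message plus its length prefix. A small sketch of the acks case with the constants from the diff, showing why four one-byte acks end up in four batches:

package main

import "fmt"

func main() {
	// Constant copied from the removed code.
	payloadTagSize := 14
	// Limit used by the deleted test: one ack byte + tag size + length field.
	maxSizeBytes := 1 + payloadTagSize + 1 // 16

	currentSize := payloadTagSize // running size before any ack is added

	// First one-byte ack: 1 + 14 + 1 = 16, not over the limit, so it fits.
	fmt.Println(1+currentSize+1 > maxSizeBytes) // false
	currentSize += 1

	// Second one-byte ack: 1 + 15 + 1 = 17 > 16, so it starts a new batch;
	// the same happens for every following ack, hence four batches for four acks.
	fmt.Println(1+currentSize+1 > maxSizeBytes) // true
}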