From 75e0809f50ffdc0c9be275a0393e6d66c3b2c90d Mon Sep 17 00:00:00 2001 From: Andrea Maria Piana Date: Tue, 3 Nov 2020 13:42:42 +0100 Subject: [PATCH] Split datasync messages in batches When sending messages in quick succession, it might be that multiple messages are batched together in datasync, resulting in a single large payload. This commit changes the behavior so that we can pass a max-message-size and we split the message in batches before sending. A more elegant way would be to split at the transport layer (i.e waku/whisper), but that would be incompatible with older client. We can still do that eventually to support larger messages. --- VERSION | 2 +- eth-node/bridge/geth/waku.go | 5 + eth-node/bridge/geth/whisper.go | 5 + eth-node/types/waku.go | 1 + eth-node/types/whisper.go | 1 + protocol/common/message_processor.go | 6 +- protocol/datasync/transport.go | 110 ++++++++++++++++-- protocol/datasync/transport_test.go | 45 +++++++ protocol/transport/transport.go | 1 + protocol/transport/waku/waku_service.go | 4 + protocol/transport/whisper/whisper_service.go | 4 + 11 files changed, 172 insertions(+), 12 deletions(-) create mode 100644 protocol/datasync/transport_test.go diff --git a/VERSION b/VERSION index 630f2e0ce..2d9cdfc21 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -0.63.1 +0.63.2 diff --git a/eth-node/bridge/geth/waku.go b/eth-node/bridge/geth/waku.go index 56c76515a..83b1d1ee7 100644 --- a/eth-node/bridge/geth/waku.go +++ b/eth-node/bridge/geth/waku.go @@ -38,6 +38,11 @@ func (w *gethWakuWrapper) MinPow() float64 { return w.waku.MinPow() } +// MaxMessageSize returns the MaxMessageSize set +func (w *gethWakuWrapper) MaxMessageSize() uint32 { + return w.waku.MaxMessageSize() +} + // BloomFilter returns the aggregated bloom filter for all the topics of interest. // The nodes are required to send only messages that match the advertised bloom filter. // If a message does not match the bloom, it will tantamount to spam, and the peer will diff --git a/eth-node/bridge/geth/whisper.go b/eth-node/bridge/geth/whisper.go index 81c95c436..c274c6cc0 100644 --- a/eth-node/bridge/geth/whisper.go +++ b/eth-node/bridge/geth/whisper.go @@ -37,6 +37,11 @@ func (w *gethWhisperWrapper) MinPow() float64 { return w.whisper.MinPow() } +// MaxMessageSize returns the MaxMessageSize set +func (w *gethWhisperWrapper) MaxMessageSize() uint32 { + return w.whisper.MaxMessageSize() +} + // BloomFilter returns the aggregated bloom filter for all the topics of interest. // The nodes are required to send only messages that match the advertised bloom filter. // If a message does not match the bloom, it will tantamount to spam, and the peer will diff --git a/eth-node/types/waku.go b/eth-node/types/waku.go index ce81c2cf3..ee5ab15b6 100644 --- a/eth-node/types/waku.go +++ b/eth-node/types/waku.go @@ -35,6 +35,7 @@ type Waku interface { AddSymKeyFromPassword(password string) (string, error) DeleteSymKey(id string) bool GetSymKey(id string) ([]byte, error) + MaxMessageSize() uint32 Subscribe(opts *SubscriptionOptions) (string, error) GetFilter(id string) Filter diff --git a/eth-node/types/whisper.go b/eth-node/types/whisper.go index f3266bbab..68f17ec5d 100644 --- a/eth-node/types/whisper.go +++ b/eth-node/types/whisper.go @@ -21,6 +21,7 @@ type Whisper interface { SetTimeSource(timesource func() time.Time) // GetCurrentTime returns current time. GetCurrentTime() time.Time + MaxMessageSize() uint32 // GetPrivateKey retrieves the private key of the specified identity. GetPrivateKey(id string) (*ecdsa.PrivateKey, error) diff --git a/protocol/common/message_processor.go b/protocol/common/message_processor.go index a67f76f1a..e81eea616 100644 --- a/protocol/common/message_processor.go +++ b/protocol/common/message_processor.go @@ -103,7 +103,11 @@ func NewMessageProcessor( // but actual encrypt and send calls are postponed. // sendDataSync is responsible for encrypting and sending postponed messages. if features.Datasync { - ds.Init(p.sendDataSync) + // We set the max message size to 3/4 of the allowed message size, to leave + // room for encryption. + // Messages will be tried to send in any case, even if they exceed this + // value + ds.Init(p.sendDataSync, transport.MaxMessageSize()/4*3, logger) ds.Start(300 * time.Millisecond) } diff --git a/protocol/datasync/transport.go b/protocol/datasync/transport.go index 6daa7a38d..7e1c60862 100644 --- a/protocol/datasync/transport.go +++ b/protocol/datasync/transport.go @@ -9,15 +9,24 @@ import ( "github.com/vacp2p/mvds/protobuf" "github.com/vacp2p/mvds/state" "github.com/vacp2p/mvds/transport" + "go.uber.org/zap" datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer" ) var errNotInitialized = errors.New("Datasync transport not initialized") +// payloadTagSize is the tag size for the protobuf.Payload message which is number of fields * 2 bytes +var payloadTagSize = 14 + +// timestampPayloadSize is the maximum size in bytes for the timestamp field (uint64) +var timestampPayloadSize = 10 + type NodeTransport struct { - packets chan transport.Packet - dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error + packets chan transport.Packet + logger *zap.Logger + maxMessageSize uint32 + dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error } func NewNodeTransport() *NodeTransport { @@ -26,8 +35,10 @@ func NewNodeTransport() *NodeTransport { } } -func (t *NodeTransport) Init(dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error) { +func (t *NodeTransport) Init(dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error, maxMessageSize uint32, logger *zap.Logger) { t.dispatch = dispatch + t.maxMessageSize = maxMessageSize + t.logger = logger } func (t *NodeTransport) AddPacket(p transport.Packet) { @@ -39,21 +50,100 @@ func (t *NodeTransport) Watch() transport.Packet { } func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf.Payload) error { + var lastError error if t.dispatch == nil { return errNotInitialized } - data, err := proto.Marshal(&payload) - if err != nil { - return err + payloads := splitPayloadInBatches(&payload, int(t.maxMessageSize)) + for _, payload := range payloads { + + data, err := proto.Marshal(payload) + if err != nil { + return err + } + + publicKey, err := datasyncpeer.IDToPublicKey(peer) + if err != nil { + return err + } + err = t.dispatch(context.Background(), publicKey, data, payload) + if err != nil { + lastError = err + t.logger.Error("failed to send message", zap.Error(err)) + continue + } + } + return lastError +} + +func splitPayloadInBatches(payload *protobuf.Payload, maxSizeBytes int) []*protobuf.Payload { + newPayload := &protobuf.Payload{} + var response []*protobuf.Payload + currentSize := payloadTagSize + + // this is not going to be 100% accurate, but should be fine in most cases, faster + // than using proto.Size + for _, ack := range payload.Acks { + if len(ack)+currentSize+1 > maxSizeBytes { + // We check if it's valid as it might be that the initial message + // is too big, in this case we still batch it + if newPayload.IsValid() { + response = append(response, newPayload) + } + newPayload = &protobuf.Payload{Acks: [][]byte{ack}} + currentSize = len(ack) + payloadTagSize + 1 + } else { + newPayload.Acks = append(newPayload.Acks, ack) + currentSize += len(ack) + } } - publicKey, err := datasyncpeer.IDToPublicKey(peer) - if err != nil { - return err + for _, offer := range payload.Offers { + if len(offer)+currentSize+1 > maxSizeBytes { + if newPayload.IsValid() { + response = append(response, newPayload) + } + newPayload = &protobuf.Payload{Offers: [][]byte{offer}} + currentSize = len(offer) + payloadTagSize + 1 + } else { + newPayload.Offers = append(newPayload.Offers, offer) + currentSize += len(offer) + } } - return t.dispatch(context.TODO(), publicKey, data, &payload) + for _, request := range payload.Requests { + if len(request)+currentSize+1 > maxSizeBytes { + if newPayload.IsValid() { + response = append(response, newPayload) + } + newPayload = &protobuf.Payload{Requests: [][]byte{request}} + currentSize = len(request) + payloadTagSize + 1 + } else { + newPayload.Requests = append(newPayload.Requests, request) + currentSize += len(request) + } + } + + for _, message := range payload.Messages { + // We add the body size, the length field for payload, the length field for group id, + // the length of timestamp, body and groupid + if currentSize+1+1+timestampPayloadSize+len(message.Body)+len(message.GroupId) > maxSizeBytes { + if newPayload.IsValid() { + response = append(response, newPayload) + } + newPayload = &protobuf.Payload{Messages: []*protobuf.Message{message}} + currentSize = timestampPayloadSize + len(message.Body) + len(message.GroupId) + payloadTagSize + 1 + 1 + } else { + newPayload.Messages = append(newPayload.Messages, message) + currentSize += len(message.Body) + len(message.GroupId) + timestampPayloadSize + } + } + + if newPayload.IsValid() { + response = append(response, newPayload) + } + return response } // CalculateSendTime calculates the next epoch diff --git a/protocol/datasync/transport_test.go b/protocol/datasync/transport_test.go new file mode 100644 index 000000000..b9f7e6fea --- /dev/null +++ b/protocol/datasync/transport_test.go @@ -0,0 +1,45 @@ +package datasync + +import ( + "testing" + + "github.com/vacp2p/mvds/protobuf" + + "github.com/stretchr/testify/require" +) + +func TestSplitPayloadInBatches(t *testing.T) { + payload := &protobuf.Payload{Acks: [][]byte{{0x1}}} + + response := splitPayloadInBatches(payload, 100) + require.NotNil(t, response) + require.Len(t, response, 1) + + payload = &protobuf.Payload{Acks: [][]byte{{0x1}, {0x2}, {0x3}, {0x4}}} + // 1 is the maximum size of the actual ack, + the tag size + 1, the length of the field + response = splitPayloadInBatches(payload, 1+payloadTagSize+1) + require.NotNil(t, response) + require.Len(t, response, 4) + + payload = &protobuf.Payload{Offers: [][]byte{{0x1}, {0x2}, {0x3}, {0x4}}} + response = splitPayloadInBatches(payload, 1+payloadTagSize+1) + require.NotNil(t, response) + require.Len(t, response, 4) + + payload = &protobuf.Payload{Requests: [][]byte{{0x1}, {0x2}, {0x3}, {0x4}}} + response = splitPayloadInBatches(payload, 1+payloadTagSize+1) + require.NotNil(t, response) + require.Len(t, response, 4) + + payload = &protobuf.Payload{Messages: []*protobuf.Message{ + {GroupId: []byte{0x1}, Timestamp: 1, Body: []byte{0x1}}, + {GroupId: []byte{0x2}, Timestamp: 1, Body: []byte{0x2}}, + {GroupId: []byte{0x3}, Timestamp: 1, Body: []byte{0x3}}, + {GroupId: []byte{0x4}, Timestamp: 1, Body: []byte{0x4}}, + }, + } + // 1 for the size of Messages + 2 for the sizes of the repeated MessageFields fields + 10 for the worst size of timestamps + 1 for the size of the body + 1 for the size of group id + response = splitPayloadInBatches(payload, 1+payloadTagSize+2+timestampPayloadSize+1+1) + require.NotNil(t, response) + require.Len(t, response, 4) +} diff --git a/protocol/transport/transport.go b/protocol/transport/transport.go index 1d804c62e..de15705bd 100644 --- a/protocol/transport/transport.go +++ b/protocol/transport/transport.go @@ -17,6 +17,7 @@ type Transport interface { JoinPublic(chatID string) error LeavePublic(chatID string) error GetCurrentTime() uint64 + MaxMessageSize() uint32 SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) SendPrivateWithSharedSecret(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error) diff --git a/protocol/transport/waku/waku_service.go b/protocol/transport/waku/waku_service.go index c184a245d..1e81c6b99 100644 --- a/protocol/transport/waku/waku_service.go +++ b/protocol/transport/waku/waku_service.go @@ -315,6 +315,10 @@ func (a *Transport) GetCurrentTime() uint64 { return uint64(a.waku.GetCurrentTime().UnixNano() / int64(time.Millisecond)) } +func (a *Transport) MaxMessageSize() uint32 { + return a.waku.MaxMessageSize() +} + func (a *Transport) Stop() error { if a.envelopesMonitor != nil { a.envelopesMonitor.Stop() diff --git a/protocol/transport/whisper/whisper_service.go b/protocol/transport/whisper/whisper_service.go index fd043eb8b..4959918bf 100644 --- a/protocol/transport/whisper/whisper_service.go +++ b/protocol/transport/whisper/whisper_service.go @@ -366,6 +366,10 @@ func (a *Transport) GetCurrentTime() uint64 { return uint64(a.shh.GetCurrentTime().UnixNano() / int64(time.Millisecond)) } +func (a *Transport) MaxMessageSize() uint32 { + return a.shh.MaxMessageSize() +} + func (a *Transport) Stop() error { if a.envelopesMonitor != nil { a.envelopesMonitor.Stop()