Split datasync messages in batches

When sending messages in quick succession, it might be that multiple
messages are batched together in datasync, resulting in a single large
payload.
This commit changes the behavior so that we can pass a max-message-size
and we split the message in batches before sending.

A more elegant way would be to split at the transport layer (i.e
waku/whisper), but that would be incompatible with older client.

We can still do that eventually to support larger messages.
This commit is contained in:
Andrea Maria Piana 2020-11-03 13:42:42 +01:00
parent 86e0ec8e10
commit 75e0809f50
11 changed files with 172 additions and 12 deletions

View File

@ -1 +1 @@
0.63.1 0.63.2

View File

@ -38,6 +38,11 @@ func (w *gethWakuWrapper) MinPow() float64 {
return w.waku.MinPow() return w.waku.MinPow()
} }
// MaxMessageSize returns the MaxMessageSize set
func (w *gethWakuWrapper) MaxMessageSize() uint32 {
return w.waku.MaxMessageSize()
}
// BloomFilter returns the aggregated bloom filter for all the topics of interest. // BloomFilter returns the aggregated bloom filter for all the topics of interest.
// The nodes are required to send only messages that match the advertised bloom filter. // The nodes are required to send only messages that match the advertised bloom filter.
// If a message does not match the bloom, it will tantamount to spam, and the peer will // If a message does not match the bloom, it will tantamount to spam, and the peer will

View File

@ -37,6 +37,11 @@ func (w *gethWhisperWrapper) MinPow() float64 {
return w.whisper.MinPow() return w.whisper.MinPow()
} }
// MaxMessageSize returns the MaxMessageSize set
func (w *gethWhisperWrapper) MaxMessageSize() uint32 {
return w.whisper.MaxMessageSize()
}
// BloomFilter returns the aggregated bloom filter for all the topics of interest. // BloomFilter returns the aggregated bloom filter for all the topics of interest.
// The nodes are required to send only messages that match the advertised bloom filter. // The nodes are required to send only messages that match the advertised bloom filter.
// If a message does not match the bloom, it will tantamount to spam, and the peer will // If a message does not match the bloom, it will tantamount to spam, and the peer will

View File

@ -35,6 +35,7 @@ type Waku interface {
AddSymKeyFromPassword(password string) (string, error) AddSymKeyFromPassword(password string) (string, error)
DeleteSymKey(id string) bool DeleteSymKey(id string) bool
GetSymKey(id string) ([]byte, error) GetSymKey(id string) ([]byte, error)
MaxMessageSize() uint32
Subscribe(opts *SubscriptionOptions) (string, error) Subscribe(opts *SubscriptionOptions) (string, error)
GetFilter(id string) Filter GetFilter(id string) Filter

View File

@ -21,6 +21,7 @@ type Whisper interface {
SetTimeSource(timesource func() time.Time) SetTimeSource(timesource func() time.Time)
// GetCurrentTime returns current time. // GetCurrentTime returns current time.
GetCurrentTime() time.Time GetCurrentTime() time.Time
MaxMessageSize() uint32
// GetPrivateKey retrieves the private key of the specified identity. // GetPrivateKey retrieves the private key of the specified identity.
GetPrivateKey(id string) (*ecdsa.PrivateKey, error) GetPrivateKey(id string) (*ecdsa.PrivateKey, error)

View File

@ -103,7 +103,11 @@ func NewMessageProcessor(
// but actual encrypt and send calls are postponed. // but actual encrypt and send calls are postponed.
// sendDataSync is responsible for encrypting and sending postponed messages. // sendDataSync is responsible for encrypting and sending postponed messages.
if features.Datasync { if features.Datasync {
ds.Init(p.sendDataSync) // We set the max message size to 3/4 of the allowed message size, to leave
// room for encryption.
// Messages will be tried to send in any case, even if they exceed this
// value
ds.Init(p.sendDataSync, transport.MaxMessageSize()/4*3, logger)
ds.Start(300 * time.Millisecond) ds.Start(300 * time.Millisecond)
} }

View File

@ -9,14 +9,23 @@ import (
"github.com/vacp2p/mvds/protobuf" "github.com/vacp2p/mvds/protobuf"
"github.com/vacp2p/mvds/state" "github.com/vacp2p/mvds/state"
"github.com/vacp2p/mvds/transport" "github.com/vacp2p/mvds/transport"
"go.uber.org/zap"
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer" datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
) )
var errNotInitialized = errors.New("Datasync transport not initialized") var errNotInitialized = errors.New("Datasync transport not initialized")
// payloadTagSize is the tag size for the protobuf.Payload message which is number of fields * 2 bytes
var payloadTagSize = 14
// timestampPayloadSize is the maximum size in bytes for the timestamp field (uint64)
var timestampPayloadSize = 10
type NodeTransport struct { type NodeTransport struct {
packets chan transport.Packet packets chan transport.Packet
logger *zap.Logger
maxMessageSize uint32
dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error
} }
@ -26,8 +35,10 @@ func NewNodeTransport() *NodeTransport {
} }
} }
func (t *NodeTransport) Init(dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error) { func (t *NodeTransport) Init(dispatch func(context.Context, *ecdsa.PublicKey, []byte, *protobuf.Payload) error, maxMessageSize uint32, logger *zap.Logger) {
t.dispatch = dispatch t.dispatch = dispatch
t.maxMessageSize = maxMessageSize
t.logger = logger
} }
func (t *NodeTransport) AddPacket(p transport.Packet) { func (t *NodeTransport) AddPacket(p transport.Packet) {
@ -39,11 +50,15 @@ func (t *NodeTransport) Watch() transport.Packet {
} }
func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf.Payload) error { func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf.Payload) error {
var lastError error
if t.dispatch == nil { if t.dispatch == nil {
return errNotInitialized return errNotInitialized
} }
data, err := proto.Marshal(&payload) payloads := splitPayloadInBatches(&payload, int(t.maxMessageSize))
for _, payload := range payloads {
data, err := proto.Marshal(payload)
if err != nil { if err != nil {
return err return err
} }
@ -52,8 +67,83 @@ func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf
if err != nil { if err != nil {
return err return err
} }
err = t.dispatch(context.Background(), publicKey, data, payload)
if err != nil {
lastError = err
t.logger.Error("failed to send message", zap.Error(err))
continue
}
}
return lastError
}
return t.dispatch(context.TODO(), publicKey, data, &payload) func splitPayloadInBatches(payload *protobuf.Payload, maxSizeBytes int) []*protobuf.Payload {
newPayload := &protobuf.Payload{}
var response []*protobuf.Payload
currentSize := payloadTagSize
// this is not going to be 100% accurate, but should be fine in most cases, faster
// than using proto.Size
for _, ack := range payload.Acks {
if len(ack)+currentSize+1 > maxSizeBytes {
// We check if it's valid as it might be that the initial message
// is too big, in this case we still batch it
if newPayload.IsValid() {
response = append(response, newPayload)
}
newPayload = &protobuf.Payload{Acks: [][]byte{ack}}
currentSize = len(ack) + payloadTagSize + 1
} else {
newPayload.Acks = append(newPayload.Acks, ack)
currentSize += len(ack)
}
}
for _, offer := range payload.Offers {
if len(offer)+currentSize+1 > maxSizeBytes {
if newPayload.IsValid() {
response = append(response, newPayload)
}
newPayload = &protobuf.Payload{Offers: [][]byte{offer}}
currentSize = len(offer) + payloadTagSize + 1
} else {
newPayload.Offers = append(newPayload.Offers, offer)
currentSize += len(offer)
}
}
for _, request := range payload.Requests {
if len(request)+currentSize+1 > maxSizeBytes {
if newPayload.IsValid() {
response = append(response, newPayload)
}
newPayload = &protobuf.Payload{Requests: [][]byte{request}}
currentSize = len(request) + payloadTagSize + 1
} else {
newPayload.Requests = append(newPayload.Requests, request)
currentSize += len(request)
}
}
for _, message := range payload.Messages {
// We add the body size, the length field for payload, the length field for group id,
// the length of timestamp, body and groupid
if currentSize+1+1+timestampPayloadSize+len(message.Body)+len(message.GroupId) > maxSizeBytes {
if newPayload.IsValid() {
response = append(response, newPayload)
}
newPayload = &protobuf.Payload{Messages: []*protobuf.Message{message}}
currentSize = timestampPayloadSize + len(message.Body) + len(message.GroupId) + payloadTagSize + 1 + 1
} else {
newPayload.Messages = append(newPayload.Messages, message)
currentSize += len(message.Body) + len(message.GroupId) + timestampPayloadSize
}
}
if newPayload.IsValid() {
response = append(response, newPayload)
}
return response
} }
// CalculateSendTime calculates the next epoch // CalculateSendTime calculates the next epoch

View File

@ -0,0 +1,45 @@
package datasync
import (
"testing"
"github.com/vacp2p/mvds/protobuf"
"github.com/stretchr/testify/require"
)
func TestSplitPayloadInBatches(t *testing.T) {
payload := &protobuf.Payload{Acks: [][]byte{{0x1}}}
response := splitPayloadInBatches(payload, 100)
require.NotNil(t, response)
require.Len(t, response, 1)
payload = &protobuf.Payload{Acks: [][]byte{{0x1}, {0x2}, {0x3}, {0x4}}}
// 1 is the maximum size of the actual ack, + the tag size + 1, the length of the field
response = splitPayloadInBatches(payload, 1+payloadTagSize+1)
require.NotNil(t, response)
require.Len(t, response, 4)
payload = &protobuf.Payload{Offers: [][]byte{{0x1}, {0x2}, {0x3}, {0x4}}}
response = splitPayloadInBatches(payload, 1+payloadTagSize+1)
require.NotNil(t, response)
require.Len(t, response, 4)
payload = &protobuf.Payload{Requests: [][]byte{{0x1}, {0x2}, {0x3}, {0x4}}}
response = splitPayloadInBatches(payload, 1+payloadTagSize+1)
require.NotNil(t, response)
require.Len(t, response, 4)
payload = &protobuf.Payload{Messages: []*protobuf.Message{
{GroupId: []byte{0x1}, Timestamp: 1, Body: []byte{0x1}},
{GroupId: []byte{0x2}, Timestamp: 1, Body: []byte{0x2}},
{GroupId: []byte{0x3}, Timestamp: 1, Body: []byte{0x3}},
{GroupId: []byte{0x4}, Timestamp: 1, Body: []byte{0x4}},
},
}
// 1 for the size of Messages + 2 for the sizes of the repeated MessageFields fields + 10 for the worst size of timestamps + 1 for the size of the body + 1 for the size of group id
response = splitPayloadInBatches(payload, 1+payloadTagSize+2+timestampPayloadSize+1+1)
require.NotNil(t, response)
require.Len(t, response, 4)
}

View File

@ -17,6 +17,7 @@ type Transport interface {
JoinPublic(chatID string) error JoinPublic(chatID string) error
LeavePublic(chatID string) error LeavePublic(chatID string) error
GetCurrentTime() uint64 GetCurrentTime() uint64
MaxMessageSize() uint32
SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error) SendPublic(ctx context.Context, newMessage *types.NewMessage, chatName string) ([]byte, error)
SendPrivateWithSharedSecret(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error) SendPrivateWithSharedSecret(ctx context.Context, newMessage *types.NewMessage, publicKey *ecdsa.PublicKey, secret []byte) ([]byte, error)

View File

@ -315,6 +315,10 @@ func (a *Transport) GetCurrentTime() uint64 {
return uint64(a.waku.GetCurrentTime().UnixNano() / int64(time.Millisecond)) return uint64(a.waku.GetCurrentTime().UnixNano() / int64(time.Millisecond))
} }
func (a *Transport) MaxMessageSize() uint32 {
return a.waku.MaxMessageSize()
}
func (a *Transport) Stop() error { func (a *Transport) Stop() error {
if a.envelopesMonitor != nil { if a.envelopesMonitor != nil {
a.envelopesMonitor.Stop() a.envelopesMonitor.Stop()

View File

@ -366,6 +366,10 @@ func (a *Transport) GetCurrentTime() uint64 {
return uint64(a.shh.GetCurrentTime().UnixNano() / int64(time.Millisecond)) return uint64(a.shh.GetCurrentTime().UnixNano() / int64(time.Millisecond))
} }
func (a *Transport) MaxMessageSize() uint32 {
return a.shh.MaxMessageSize()
}
func (a *Transport) Stop() error { func (a *Transport) Stop() error {
if a.envelopesMonitor != nil { if a.envelopesMonitor != nil {
a.envelopesMonitor.Stop() a.envelopesMonitor.Stop()