package datasync
import (
"context"
"crypto/ecdsa"
"errors"
"github.com/golang/protobuf/proto"
"github.com/vacp2p/mvds/protobuf"
"github.com/vacp2p/mvds/state"
"github.com/vacp2p/mvds/transport"
"go.uber.org/zap"
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
)
// errNotInitialized is returned by Send when the transport is used
// before Init has supplied a dispatch function.
var errNotInitialized = errors.New("datasync transport not initialized")
const (
	// payloadTagSize is the approximate protobuf framing overhead for the
	// protobuf.Payload message itself: number of fields * 2 bytes of
	// tag/length per field.
	payloadTagSize = 14

	// timestampPayloadSize is the maximum encoded size in bytes of the
	// timestamp field (a uint64 varint).
	timestampPayloadSize = 10
)
type NodeTransport struct {
2020-11-03 12:42:42 +00:00
packets chan transport . Packet
logger * zap . Logger
maxMessageSize uint32
dispatch func ( context . Context , * ecdsa . PublicKey , [ ] byte , * protobuf . Payload ) error
2019-07-26 07:17:29 +00:00
}
2020-02-10 11:22:37 +00:00
func NewNodeTransport ( ) * NodeTransport {
return & NodeTransport {
2019-07-26 07:17:29 +00:00
packets : make ( chan transport . Packet ) ,
}
}
2020-11-03 12:42:42 +00:00
func ( t * NodeTransport ) Init ( dispatch func ( context . Context , * ecdsa . PublicKey , [ ] byte , * protobuf . Payload ) error , maxMessageSize uint32 , logger * zap . Logger ) {
2019-07-26 07:17:29 +00:00
t . dispatch = dispatch
2020-11-03 12:42:42 +00:00
t . maxMessageSize = maxMessageSize
t . logger = logger
2019-07-26 07:17:29 +00:00
}
2020-02-10 11:22:37 +00:00
func ( t * NodeTransport ) AddPacket ( p transport . Packet ) {
2019-07-26 07:17:29 +00:00
t . packets <- p
}
2020-02-10 11:22:37 +00:00
func ( t * NodeTransport ) Watch ( ) transport . Packet {
2019-07-26 07:17:29 +00:00
return <- t . packets
}
2020-02-10 11:22:37 +00:00
func ( t * NodeTransport ) Send ( _ state . PeerID , peer state . PeerID , payload protobuf . Payload ) error {
2020-11-03 12:42:42 +00:00
var lastError error
2019-07-26 07:17:29 +00:00
if t . dispatch == nil {
return errNotInitialized
}
2020-11-03 12:42:42 +00:00
payloads := splitPayloadInBatches ( & payload , int ( t . maxMessageSize ) )
for _ , payload := range payloads {
data , err := proto . Marshal ( payload )
if err != nil {
return err
}
publicKey , err := datasyncpeer . IDToPublicKey ( peer )
if err != nil {
return err
}
err = t . dispatch ( context . Background ( ) , publicKey , data , payload )
if err != nil {
lastError = err
t . logger . Error ( "failed to send message" , zap . Error ( err ) )
continue
}
2019-07-26 07:17:29 +00:00
}
2020-11-03 12:42:42 +00:00
return lastError
}
func splitPayloadInBatches ( payload * protobuf . Payload , maxSizeBytes int ) [ ] * protobuf . Payload {
newPayload := & protobuf . Payload { }
var response [ ] * protobuf . Payload
currentSize := payloadTagSize
2019-07-26 07:17:29 +00:00
2020-11-03 12:42:42 +00:00
// this is not going to be 100% accurate, but should be fine in most cases, faster
// than using proto.Size
for _ , ack := range payload . Acks {
if len ( ack ) + currentSize + 1 > maxSizeBytes {
// We check if it's valid as it might be that the initial message
// is too big, in this case we still batch it
if newPayload . IsValid ( ) {
response = append ( response , newPayload )
}
newPayload = & protobuf . Payload { Acks : [ ] [ ] byte { ack } }
currentSize = len ( ack ) + payloadTagSize + 1
} else {
newPayload . Acks = append ( newPayload . Acks , ack )
currentSize += len ( ack )
}
2019-07-26 07:17:29 +00:00
}
2020-11-03 12:42:42 +00:00
for _ , offer := range payload . Offers {
if len ( offer ) + currentSize + 1 > maxSizeBytes {
if newPayload . IsValid ( ) {
response = append ( response , newPayload )
}
newPayload = & protobuf . Payload { Offers : [ ] [ ] byte { offer } }
currentSize = len ( offer ) + payloadTagSize + 1
} else {
newPayload . Offers = append ( newPayload . Offers , offer )
currentSize += len ( offer )
}
}
for _ , request := range payload . Requests {
if len ( request ) + currentSize + 1 > maxSizeBytes {
if newPayload . IsValid ( ) {
response = append ( response , newPayload )
}
newPayload = & protobuf . Payload { Requests : [ ] [ ] byte { request } }
currentSize = len ( request ) + payloadTagSize + 1
} else {
newPayload . Requests = append ( newPayload . Requests , request )
currentSize += len ( request )
}
}
for _ , message := range payload . Messages {
// We add the body size, the length field for payload, the length field for group id,
// the length of timestamp, body and groupid
if currentSize + 1 + 1 + timestampPayloadSize + len ( message . Body ) + len ( message . GroupId ) > maxSizeBytes {
if newPayload . IsValid ( ) {
response = append ( response , newPayload )
}
newPayload = & protobuf . Payload { Messages : [ ] * protobuf . Message { message } }
currentSize = timestampPayloadSize + len ( message . Body ) + len ( message . GroupId ) + payloadTagSize + 1 + 1
} else {
newPayload . Messages = append ( newPayload . Messages , message )
currentSize += len ( message . Body ) + len ( message . GroupId ) + timestampPayloadSize
}
}
if newPayload . IsValid ( ) {
response = append ( response , newPayload )
}
return response
2019-07-26 07:17:29 +00:00
}
// CalculateSendTime returns the next epoch at which a message should be
// sent, given how many times it has already been transmitted.
//
// TODO: this should match time being increased by whisper periods, i.e.
// only retransmit the first time once a message has expired.
func CalculateSendTime(count uint64, time int64) int64 {
	backoff := int64(count * 2)
	return time + backoff
}