Don't return an error on datasync Send
if an error is returned on the Send function, datasync will keep retrying a message at each epoch. If the message cannot be sent (for example is too large), then no messages will be sent until logout.
This commit is contained in:
parent
0c79e50ca8
commit
938e0d77dd
|
@ -59,7 +59,6 @@ func (t *NodeTransport) Watch() transport.Packet {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf.Payload) error {
|
func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf.Payload) error {
|
||||||
var lastError error
|
|
||||||
if t.dispatch == nil {
|
if t.dispatch == nil {
|
||||||
return errNotInitialized
|
return errNotInitialized
|
||||||
}
|
}
|
||||||
|
@ -74,21 +73,25 @@ func (t *NodeTransport) Send(_ state.PeerID, peer state.PeerID, payload protobuf
|
||||||
|
|
||||||
data, err := proto.Marshal(payload)
|
data, err := proto.Marshal(payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
t.logger.Error("failed to marshal payload")
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
publicKey, err := datasyncpeer.IDToPublicKey(peer)
|
publicKey, err := datasyncpeer.IDToPublicKey(peer)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
t.logger.Error("failed to conver id to public key", zap.Error(err))
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
|
// We don't return an error otherwise datasync will keep
|
||||||
|
// re-trying sending at each epoch
|
||||||
err = t.dispatch(context.Background(), publicKey, data, payload)
|
err = t.dispatch(context.Background(), publicKey, data, payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
lastError = err
|
|
||||||
t.logger.Error("failed to send message", zap.Error(err))
|
t.logger.Error("failed to send message", zap.Error(err))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return lastError
|
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func splitPayloadInBatches(payload *protobuf.Payload, maxSizeBytes int) []*protobuf.Payload {
|
func splitPayloadInBatches(payload *protobuf.Payload, maxSizeBytes int) []*protobuf.Payload {
|
||||||
|
|
Loading…
Reference in New Issue