Fix datasync retry
This commit fixes a bug on the mvds library where the nextEpoch would be incorrectly summed to the retry time, resulting in messages not being retried, or retried much less frequently the longer the app was running. It also updates the retry timing to backoff exponentially at multiple of 30 seconds.
This commit is contained in:
parent
002f9a5597
commit
6ad047d56f
3
go.mod
3
go.mod
|
@ -48,7 +48,6 @@ require (
|
||||||
github.com/pborman/uuid v1.2.0
|
github.com/pborman/uuid v1.2.0
|
||||||
github.com/pkg/errors v0.9.1
|
github.com/pkg/errors v0.9.1
|
||||||
github.com/prometheus/client_golang v1.5.0
|
github.com/prometheus/client_golang v1.5.0
|
||||||
github.com/prometheus/common v0.9.1
|
|
||||||
github.com/russolsen/ohyeah v0.0.0-20160324131710-f4938c005315 // indirect
|
github.com/russolsen/ohyeah v0.0.0-20160324131710-f4938c005315 // indirect
|
||||||
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
|
github.com/russolsen/same v0.0.0-20160222130632-f089df61f51d // indirect
|
||||||
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
|
github.com/russolsen/transit v0.0.0-20180705123435-0794b4c4505a
|
||||||
|
@ -63,7 +62,7 @@ require (
|
||||||
github.com/stretchr/testify v1.5.1
|
github.com/stretchr/testify v1.5.1
|
||||||
github.com/syndtr/goleveldb v1.0.0
|
github.com/syndtr/goleveldb v1.0.0
|
||||||
github.com/tsenart/tb v0.0.0-20181025101425-0d2499c8b6e9
|
github.com/tsenart/tb v0.0.0-20181025101425-0d2499c8b6e9
|
||||||
github.com/vacp2p/mvds v0.0.23
|
github.com/vacp2p/mvds v0.0.24-0.20201124060106-26d8e94130d8
|
||||||
github.com/wealdtech/go-ens/v3 v3.3.0
|
github.com/wealdtech/go-ens/v3 v3.3.0
|
||||||
go.uber.org/multierr v1.4.0 // indirect
|
go.uber.org/multierr v1.4.0 // indirect
|
||||||
go.uber.org/zap v1.13.0
|
go.uber.org/zap v1.13.0
|
||||||
|
|
4
go.sum
4
go.sum
|
@ -701,8 +701,8 @@ github.com/tyler-smith/go-bip39 v1.0.2/go.mod h1:sJ5fKU0s6JVwZjjcUEX2zFOnvq0ASQ2
|
||||||
github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
|
github.com/uber/jaeger-client-go v0.0.0-20180607151842-f7e0d4744fa6/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
|
||||||
github.com/uber/jaeger-lib v0.0.0-20180615202729-a51202d6f4a7/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
|
github.com/uber/jaeger-lib v0.0.0-20180615202729-a51202d6f4a7/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6+uUTzImX/AauajbLI56U=
|
||||||
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
|
github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0=
|
||||||
github.com/vacp2p/mvds v0.0.23 h1:BKdn7tyGvl/J/Pwv6FlcW6Xbzm+17jv141GB1mFXyOU=
|
github.com/vacp2p/mvds v0.0.24-0.20201124060106-26d8e94130d8 h1:aSQuY64yglPb7I6lZRXt/xWD4ExM1DZo8Gpb7xCz6zk=
|
||||||
github.com/vacp2p/mvds v0.0.23/go.mod h1:uUmtiahU7efOVl/5w5yk9jOze5xYpDZDrSrT8TvHXjQ=
|
github.com/vacp2p/mvds v0.0.24-0.20201124060106-26d8e94130d8/go.mod h1:uUmtiahU7efOVl/5w5yk9jOze5xYpDZDrSrT8TvHXjQ=
|
||||||
github.com/wealdtech/go-ens/v3 v3.3.0 h1:xJJlFLEbdnzU7dLFwyg5fXlZVprwoLPlAPVZdpYCCrY=
|
github.com/wealdtech/go-ens/v3 v3.3.0 h1:xJJlFLEbdnzU7dLFwyg5fXlZVprwoLPlAPVZdpYCCrY=
|
||||||
github.com/wealdtech/go-ens/v3 v3.3.0/go.mod h1:P2OEBvgkhXLrPzPN+eR5z2/wFIGwHyijTDvpuC1xLlo=
|
github.com/wealdtech/go-ens/v3 v3.3.0/go.mod h1:P2OEBvgkhXLrPzPN+eR5z2/wFIGwHyijTDvpuC1xLlo=
|
||||||
github.com/wealdtech/go-multicodec v1.2.0 h1:9AHSxcSE9F9r6ZvQLAO0EXCdM08QfYohaXmW3k6sSh4=
|
github.com/wealdtech/go-multicodec v1.2.0 h1:9AHSxcSE9F9r6ZvQLAO0EXCdM08QfYohaXmW3k6sSh4=
|
||||||
|
|
|
@ -108,7 +108,7 @@ func NewMessageProcessor(
|
||||||
// Messages will be tried to send in any case, even if they exceed this
|
// Messages will be tried to send in any case, even if they exceed this
|
||||||
// value
|
// value
|
||||||
ds.Init(p.sendDataSync, transport.MaxMessageSize()/4*3, logger)
|
ds.Init(p.sendDataSync, transport.MaxMessageSize()/4*3, logger)
|
||||||
ds.Start(300 * time.Millisecond)
|
ds.Start(datasync.DatasyncTicker)
|
||||||
}
|
}
|
||||||
|
|
||||||
return p, nil
|
return p, nil
|
||||||
|
|
|
@ -4,6 +4,8 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"crypto/ecdsa"
|
"crypto/ecdsa"
|
||||||
"errors"
|
"errors"
|
||||||
|
"math"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
"github.com/vacp2p/mvds/protobuf"
|
"github.com/vacp2p/mvds/protobuf"
|
||||||
|
@ -14,7 +16,14 @@ import (
|
||||||
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
|
datasyncpeer "github.com/status-im/status-go/protocol/datasync/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const backoffInterval = 30
|
||||||
|
|
||||||
var errNotInitialized = errors.New("Datasync transport not initialized")
|
var errNotInitialized = errors.New("Datasync transport not initialized")
|
||||||
|
var DatasyncTicker = 300 * time.Millisecond
|
||||||
|
|
||||||
|
// It's easier to calculate nextEpoch if we consider seconds as a unit rather than
|
||||||
|
// 300 ms, so we multiply the result by the ratio
|
||||||
|
var offsetToSecond = uint64(time.Second / DatasyncTicker)
|
||||||
|
|
||||||
// payloadTagSize is the tag size for the protobuf.Payload message which is number of fields * 2 bytes
|
// payloadTagSize is the tag size for the protobuf.Payload message which is number of fields * 2 bytes
|
||||||
var payloadTagSize = 14
|
var payloadTagSize = 14
|
||||||
|
@ -149,5 +158,5 @@ func splitPayloadInBatches(payload *protobuf.Payload, maxSizeBytes int) []*proto
|
||||||
// CalculateSendTime calculates the next epoch
|
// CalculateSendTime calculates the next epoch
|
||||||
// at which a message should be sent.
|
// at which a message should be sent.
|
||||||
func CalculateSendTime(count uint64, time int64) int64 {
|
func CalculateSendTime(count uint64, time int64) int64 {
|
||||||
return time + int64(count*2) // @todo this should match that time is increased by whisper periods, aka we only retransmit the first time when a message has expired.
|
return time + int64(uint64(math.Exp2(float64(count-1)))*backoffInterval*offsetToSecond)
|
||||||
}
|
}
|
||||||
|
|
|
@ -545,7 +545,7 @@ func (n *Node) insertSyncState(groupID *state.GroupID, messageID state.MessageID
|
||||||
|
|
||||||
func (n *Node) updateSendEpoch(s state.State) state.State {
|
func (n *Node) updateSendEpoch(s state.State) state.State {
|
||||||
s.SendCount += 1
|
s.SendCount += 1
|
||||||
s.SendEpoch += n.nextEpoch(s.SendCount, n.epoch)
|
s.SendEpoch = n.nextEpoch(s.SendCount, n.epoch)
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -426,7 +426,7 @@ github.com/tsenart/tb
|
||||||
# github.com/tyler-smith/go-bip39 v1.0.2
|
# github.com/tyler-smith/go-bip39 v1.0.2
|
||||||
github.com/tyler-smith/go-bip39
|
github.com/tyler-smith/go-bip39
|
||||||
github.com/tyler-smith/go-bip39/wordlists
|
github.com/tyler-smith/go-bip39/wordlists
|
||||||
# github.com/vacp2p/mvds v0.0.23
|
# github.com/vacp2p/mvds v0.0.24-0.20201124060106-26d8e94130d8
|
||||||
github.com/vacp2p/mvds/node
|
github.com/vacp2p/mvds/node
|
||||||
github.com/vacp2p/mvds/node/migrations
|
github.com/vacp2p/mvds/node/migrations
|
||||||
github.com/vacp2p/mvds/peers
|
github.com/vacp2p/mvds/peers
|
||||||
|
|
Loading…
Reference in New Issue