Merge pull request #10161 from hashicorp/dnephin/update-deps

Update a couple dependencies
This commit is contained in:
Daniel Nephin 2021-05-04 14:30:37 -04:00 committed by hc-github-team-consul-core
parent 48306a1cc2
commit a583415bed
13 changed files with 93 additions and 66 deletions

9
.changelog/10161.txt Normal file
View File

@ -0,0 +1,9 @@
```release-note:bug
memberlist: fixes a couple bugs which allowed malformed input to cause a crash in a Consul
client or server.
```
```release-note:bug
telemetry: fixes a bug with Prometheus metrics where Gauges and Summaries were incorrectly
being expired.
```

View File

@ -21,7 +21,6 @@ import (
connlimit "github.com/hashicorp/go-connlimit"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/raft"
autopilot "github.com/hashicorp/raft-autopilot"
raftboltdb "github.com/hashicorp/raft-boltdb"
@ -35,6 +34,7 @@ import (
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/usagemetrics"
"github.com/hashicorp/consul/agent/consul/wanfed"
agentgrpc "github.com/hashicorp/consul/agent/grpc"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
@ -251,7 +251,7 @@ type Server struct {
// serfWAN is the Serf cluster maintained between DC's
// which SHOULD only consist of Consul servers
serfWAN *serf.Serf
memberlistTransportWAN memberlist.IngestionAwareTransport
memberlistTransportWAN wanfed.IngestionAwareTransport
gatewayLocator *GatewayLocator
// serverLookup tracks server consuls in the local datacenter.
@ -500,7 +500,7 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
// This is always a *memberlist.NetTransport or something which wraps
// it which satisfies this interface.
s.memberlistTransportWAN = config.SerfWANConfig.MemberlistConfig.Transport.(memberlist.IngestionAwareTransport)
s.memberlistTransportWAN = config.SerfWANConfig.MemberlistConfig.Transport.(wanfed.IngestionAwareTransport)
// See big comment above why we are doing this.
if serfBindPortWAN == 0 {

View File

@ -8,10 +8,11 @@ import (
"strings"
"time"
"github.com/hashicorp/memberlist"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/memberlist"
)
const (
@ -29,9 +30,15 @@ const (
type MeshGatewayResolver func(datacenter string) string
type IngestionAwareTransport interface {
memberlist.NodeAwareTransport
IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error
IngestStream(conn net.Conn) error
}
func NewTransport(
tlsConfigurator *tlsutil.Configurator,
transport memberlist.NodeAwareTransport,
transport IngestionAwareTransport,
datacenter string,
gwResolver MeshGatewayResolver,
) (*Transport, error) {
@ -48,7 +55,7 @@ func NewTransport(
}
t := &Transport{
NodeAwareTransport: transport,
IngestionAwareTransport: transport,
tlsConfigurator: tlsConfigurator,
datacenter: datacenter,
gwResolver: gwResolver,
@ -58,7 +65,7 @@ func NewTransport(
}
type Transport struct {
memberlist.NodeAwareTransport
IngestionAwareTransport
tlsConfigurator *tlsutil.Configurator
datacenter string
@ -71,7 +78,7 @@ var _ memberlist.NodeAwareTransport = (*Transport)(nil)
// Shutdown implements memberlist.Transport.
func (t *Transport) Shutdown() error {
err1 := t.pool.Close()
err2 := t.NodeAwareTransport.Shutdown()
err2 := t.IngestionAwareTransport.Shutdown()
if err2 != nil {
// the more important error is err2
return err2
@ -118,7 +125,7 @@ func (t *Transport) WriteToAddress(b []byte, addr memberlist.Address) (time.Time
return time.Now(), nil
}
return t.NodeAwareTransport.WriteToAddress(b, addr)
return t.IngestionAwareTransport.WriteToAddress(b, addr)
}
// DialAddressTimeout implements memberlist.NodeAwareTransport.
@ -137,7 +144,7 @@ func (t *Transport) DialAddressTimeout(addr memberlist.Address, timeout time.Dur
return t.dial(dc, node, pool.ALPN_WANGossipStream, gwAddr)
}
return t.NodeAwareTransport.DialAddressTimeout(addr, timeout)
return t.IngestionAwareTransport.DialAddressTimeout(addr, timeout)
}
// NOTE: There is a close mirror of this method in agent/pool/pool.go:DialTimeoutWithRPCType

6
go.mod
View File

@ -12,7 +12,7 @@ require (
github.com/Microsoft/go-winio v0.4.3 // indirect
github.com/NYTimes/gziphandler v1.0.1
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
github.com/armon/go-metrics v0.3.7
github.com/armon/go-metrics v0.3.8
github.com/armon/go-radix v1.0.0
github.com/aws/aws-sdk-go v1.25.41
github.com/coredns/coredns v1.1.2
@ -50,9 +50,9 @@ require (
github.com/hashicorp/hcl v1.0.0
github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038
github.com/hashicorp/mdns v1.0.4 // indirect
github.com/hashicorp/memberlist v0.2.3
github.com/hashicorp/memberlist v0.2.4
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/raft v1.3.0
github.com/hashicorp/raft v1.3.1
github.com/hashicorp/raft-autopilot v0.1.2
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea
github.com/hashicorp/serf v0.9.5

12
go.sum
View File

@ -58,8 +58,8 @@ github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5
github.com/armon/go-metrics v0.0.0-20180917152333-f0300d1749da/go.mod h1:Q73ZrmVTwzkszR9V5SSuryQ31EELlFMUz1kKyl939pY=
github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg=
github.com/armon/go-metrics v0.3.0/go.mod h1:zXjbSimjXTd7vOpY8B0/2LpvNvDoXBuplAD+gJD3GYs=
github.com/armon/go-metrics v0.3.7 h1:c/oCtWzYpboy6+6f6LjXRlyW7NwA2SWf+a9KMlHq/bM=
github.com/armon/go-metrics v0.3.7/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-metrics v0.3.8 h1:oOxq3KPj0WhCuy50EhzwiyMyG2ovRQZpZLXQuOh2a/M=
github.com/armon/go-metrics v0.3.8/go.mod h1:4O98XIr/9W0sxpJ8UaYkvjk10Iff7SnFrb4QAOwNTFc=
github.com/armon/go-radix v0.0.0-20180808171621-7fddfc383310/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
github.com/armon/go-radix v1.0.0 h1:F4z6KzEeeQIMeLFa97iZU6vupzoecKdU5TX24SNppXI=
github.com/armon/go-radix v1.0.0/go.mod h1:ufUuZ+zHj4x4TnLV4JWEpy2hxWSpsRywHrMgIH9cCH8=
@ -274,14 +274,14 @@ github.com/hashicorp/mdns v1.0.1/go.mod h1:4gW7WsVCke5TE7EPeYliwHlRUyBtfCwuFwuMg
github.com/hashicorp/mdns v1.0.4 h1:sY0CMhFmjIPDMlTB+HfymFHCaYLhgifZ0QhjaYKD/UQ=
github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/YAJqrc=
github.com/hashicorp/memberlist v0.2.2/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hashicorp/memberlist v0.2.3 h1:BwZa5IjREr75J0am7nblP+X5i95Rmp8EEbMI5vkUWdA=
github.com/hashicorp/memberlist v0.2.3/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hashicorp/memberlist v0.2.4 h1:OOhYzSvFnkFQXm1ysE8RjXTHsqSRDyP4emusC9K7DYg=
github.com/hashicorp/memberlist v0.2.4/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69 h1:lc3c72qGlIMDqQpQH82Y4vaglRMMFdJbziYWriR4UcE=
github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69/go.mod h1:/z+jUGRBlwVpUZfjute9jWaF6/HuhjuFQuL1YXzVD1Q=
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.3.0 h1:Wox4J4R7J2FOJLtTa6hdk0VJfiNUSP32pYoYR738bkE=
github.com/hashicorp/raft v1.3.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/hashicorp/raft v1.3.1 h1:zDT8ke8y2aP4wf9zPTB2uSIeavJ3Hx/ceY4jxI2JxuY=
github.com/hashicorp/raft v1.3.1/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7HVbyzqM=
github.com/hashicorp/raft-autopilot v0.1.2 h1:yeqdUjWLjVJkBM+mcVxqwxi+w+aHsb9cEON2dz69OCs=
github.com/hashicorp/raft-autopilot v0.1.2/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea h1:xykPFhrBAS2J0VBzVa5e80b5ZtYuNQtgXjN40qBZlD4=

View File

@ -245,6 +245,8 @@ func (i *InmemSink) Data() []*IntervalMetrics {
copyCurrent := intervals[n-1]
current.RLock()
*copyCurrent = *current
// RWMutex is not safe to copy, so create a new instance on the copy
copyCurrent.RWMutex = sync.RWMutex{}
copyCurrent.Gauges = make(map[string]GaugeValue, len(current.Gauges))
for k, v := range current.Gauges {

View File

@ -355,6 +355,10 @@ func (m *Memberlist) ingestPacket(buf []byte, from net.Addr, timestamp time.Time
}
func (m *Memberlist) handleCommand(buf []byte, from net.Addr, timestamp time.Time) {
if len(buf) < 1 {
m.logger.Printf("[ERR] memberlist: missing message type byte %s", LogAddress(from))
return
}
// Decode the message type
msgType := messageType(buf[0])
buf = buf[1:]

View File

@ -82,16 +82,18 @@ func (a *Address) String() string {
return a.Addr
}
// IngestionAwareTransport is not used.
//
// Deprecated: IngestionAwareTransport is not used and may be removed in a future
// version. Define the interface locally instead of referencing this exported
// interface.
type IngestionAwareTransport interface {
Transport
// IngestPacket pulls a single packet off the conn, and only closes it if shouldClose is true.
IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error
// IngestStream hands off the conn to the transport and doesn't close it.
IngestStream(conn net.Conn) error
}
type NodeAwareTransport interface {
IngestionAwareTransport
Transport
WriteToAddress(b []byte, addr Address) (time.Time, error)
DialAddressTimeout(addr Address, timeout time.Duration) (net.Conn, error)
}
@ -102,22 +104,6 @@ type shimNodeAwareTransport struct {
var _ NodeAwareTransport = (*shimNodeAwareTransport)(nil)
func (t *shimNodeAwareTransport) IngestPacket(conn net.Conn, addr net.Addr, now time.Time, shouldClose bool) error {
iat, ok := t.Transport.(IngestionAwareTransport)
if !ok {
panic("shimNodeAwareTransport does not support IngestPacket")
}
return iat.IngestPacket(conn, addr, now, shouldClose)
}
func (t *shimNodeAwareTransport) IngestStream(conn net.Conn) error {
iat, ok := t.Transport.(IngestionAwareTransport)
if !ok {
panic("shimNodeAwareTransport does not support IngestStream")
}
return iat.IngestStream(conn)
}
func (t *shimNodeAwareTransport) WriteToAddress(b []byte, addr Address) (time.Time, error) {
return t.WriteTo(b, addr.Addr)
}

View File

@ -185,18 +185,18 @@ func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
err = fmt.Errorf("missing compound length byte")
return
}
numParts := uint8(buf[0])
numParts := int(buf[0])
buf = buf[1:]
// Check we have enough bytes
if len(buf) < int(numParts*2) {
if len(buf) < numParts*2 {
err = fmt.Errorf("truncated len slice")
return
}
// Decode the lengths
lengths := make([]uint16, numParts)
for i := 0; i < int(numParts); i++ {
for i := 0; i < numParts; i++ {
lengths[i] = binary.BigEndian.Uint16(buf[i*2 : i*2+2])
}
buf = buf[numParts*2:]
@ -204,7 +204,7 @@ func decodeCompoundMessage(buf []byte) (trunc int, parts [][]byte, err error) {
// Split each message
for idx, msgLen := range lengths {
if len(buf) < int(msgLen) {
trunc = int(numParts) - idx
trunc = numParts - idx
return
}

View File

@ -564,7 +564,9 @@ func NewRaft(conf *Config, fsm FSM, logs LogStore, stable StableStore, snaps Sna
r.logger.Error("failed to get log", "index", index, "error", err)
panic(err)
}
r.processConfigurationLogEntry(&entry)
if err := r.processConfigurationLogEntry(&entry); err != nil {
return nil, err
}
}
r.logger.Info("initial configuration",
"index", r.configurations.latestIndex,
@ -627,7 +629,10 @@ func (r *Raft) restoreSnapshot() error {
conf = snapshot.Configuration
index = snapshot.ConfigurationIndex
} else {
conf = decodePeers(snapshot.Peers, r.trans)
var err error
if conf, err = decodePeers(snapshot.Peers, r.trans); err != nil {
return err
}
index = snapshot.Index
}
r.setCommittedConfiguration(conf, index)

View File

@ -319,11 +319,11 @@ func encodePeers(configuration Configuration, trans Transport) []byte {
// decodePeers is used to deserialize an old list of peers into a Configuration.
// This is here for backwards compatibility with old log entries and snapshots;
// it should be removed eventually.
func decodePeers(buf []byte, trans Transport) Configuration {
func decodePeers(buf []byte, trans Transport) (Configuration, error) {
// Decode the buffer first.
var encPeers [][]byte
if err := decodeMsgPack(buf, &encPeers); err != nil {
panic(fmt.Errorf("failed to decode peers: %v", err))
return Configuration{}, fmt.Errorf("failed to decode peers: %v", err)
}
// Deserialize each peer.
@ -333,13 +333,11 @@ func decodePeers(buf []byte, trans Transport) Configuration {
servers = append(servers, Server{
Suffrage: Voter,
ID: ServerID(p),
Address: ServerAddress(p),
Address: p,
})
}
return Configuration{
Servers: servers,
}
return Configuration{Servers: servers}, nil
}
// EncodeConfiguration serializes a Configuration using MsgPack, or panics on

View File

@ -244,8 +244,7 @@ func (r *Raft) liveBootstrap(configuration Configuration) error {
}
r.setCurrentTerm(1)
r.setLastLog(entry.Index, entry.Term)
r.processConfigurationLogEntry(&entry)
return nil
return r.processConfigurationLogEntry(&entry)
}
// runCandidate runs the FSM for a candidate.
@ -1383,7 +1382,13 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
// Handle any new configuration changes
for _, newEntry := range newEntries {
r.processConfigurationLogEntry(newEntry)
if err := r.processConfigurationLogEntry(newEntry); err != nil {
r.logger.Warn("failed to append entry",
"index", newEntry.Index,
"error", err)
rpcErr = err
return
}
}
// Update the lastLog
@ -1415,14 +1420,21 @@ func (r *Raft) appendEntries(rpc RPC, a *AppendEntriesRequest) {
// processConfigurationLogEntry takes a log entry and updates the latest
// configuration if the entry results in a new configuration. This must only be
// called from the main thread, or from NewRaft() before any threads have begun.
func (r *Raft) processConfigurationLogEntry(entry *Log) {
if entry.Type == LogConfiguration {
func (r *Raft) processConfigurationLogEntry(entry *Log) error {
switch entry.Type {
case LogConfiguration:
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(DecodeConfiguration(entry.Data), entry.Index)
} else if entry.Type == LogAddPeerDeprecated || entry.Type == LogRemovePeerDeprecated {
case LogAddPeerDeprecated, LogRemovePeerDeprecated:
r.setCommittedConfiguration(r.configurations.latest, r.configurations.latestIndex)
r.setLatestConfiguration(decodePeers(entry.Data, r.trans), entry.Index)
conf, err := decodePeers(entry.Data, r.trans)
if err != nil {
return err
}
r.setLatestConfiguration(conf, entry.Index)
}
return nil
}
// requestVote is invoked when we get an request vote RPC call.
@ -1574,7 +1586,11 @@ func (r *Raft) installSnapshot(rpc RPC, req *InstallSnapshotRequest) {
reqConfiguration = DecodeConfiguration(req.Configuration)
reqConfigurationIndex = req.ConfigurationIndex
} else {
reqConfiguration = decodePeers(req.Peers, r.trans)
reqConfiguration, rpcErr = decodePeers(req.Peers, r.trans)
if rpcErr != nil {
r.logger.Error("failed to install snapshot", "error", rpcErr)
return
}
reqConfigurationIndex = req.LastLogIndex
}
version := getSnapshotVersion(r.protocolVersion)

6
vendor/modules.txt vendored
View File

@ -32,7 +32,7 @@ github.com/NYTimes/gziphandler
github.com/StackExchange/wmi
# github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e
github.com/armon/circbuf
# github.com/armon/go-metrics v0.3.7
# github.com/armon/go-metrics v0.3.8
github.com/armon/go-metrics
github.com/armon/go-metrics/circonus
github.com/armon/go-metrics/datadog
@ -477,11 +477,11 @@ github.com/hashicorp/hil/parser
github.com/hashicorp/hil/scanner
# github.com/hashicorp/mdns v1.0.4
github.com/hashicorp/mdns
# github.com/hashicorp/memberlist v0.2.3
# github.com/hashicorp/memberlist v0.2.4
github.com/hashicorp/memberlist
# github.com/hashicorp/net-rpc-msgpackrpc v0.0.0-20151116020338-a14192a58a69
github.com/hashicorp/net-rpc-msgpackrpc
# github.com/hashicorp/raft v1.3.0
# github.com/hashicorp/raft v1.3.1
github.com/hashicorp/raft
# github.com/hashicorp/raft-autopilot v0.1.2
github.com/hashicorp/raft-autopilot