add agent locality and replicate it across peer streams (#16522)

Eric Haberkorn 2023-03-07 14:05:23 -05:00 committed by GitHub
parent b649a5e8e4
commit dbaf8bf49c
21 changed files with 732 additions and 472 deletions


@@ -1493,6 +1493,7 @@ func newConsulConfig(runtimeCfg *config.RuntimeConfig, logger hclog.Logger) (*co
cfg.RequestLimitsMode = runtimeCfg.RequestLimitsMode.String()
cfg.RequestLimitsReadRate = runtimeCfg.RequestLimitsReadRate
cfg.RequestLimitsWriteRate = runtimeCfg.RequestLimitsWriteRate
+cfg.Locality = runtimeCfg.StructLocality()
enterpriseConsulConfig(cfg, runtimeCfg)
return cfg, nil


@@ -833,6 +833,7 @@ func (b *builder) build() (rt RuntimeConfig, err error) {
// gossip configuration
GossipLANGossipInterval: b.durationVal("gossip_lan..gossip_interval", c.GossipLAN.GossipInterval),
GossipLANGossipNodes: intVal(c.GossipLAN.GossipNodes),
+Locality: c.Locality,
GossipLANProbeInterval: b.durationVal("gossip_lan..probe_interval", c.GossipLAN.ProbeInterval),
GossipLANProbeTimeout: b.durationVal("gossip_lan..probe_timeout", c.GossipLAN.ProbeTimeout),
GossipLANSuspicionMult: intVal(c.GossipLAN.SuspicionMult),


@@ -186,6 +186,7 @@ type Config struct {
LeaveOnTerm *bool `mapstructure:"leave_on_terminate" json:"leave_on_terminate,omitempty"`
LicensePath *string `mapstructure:"license_path" json:"license_path,omitempty"`
Limits Limits `mapstructure:"limits" json:"-"`
+Locality Locality `mapstructure:"locality" json:"-"`
LogLevel *string `mapstructure:"log_level" json:"log_level,omitempty"`
LogJSON *bool `mapstructure:"log_json" json:"log_json,omitempty"`
LogFile *string `mapstructure:"log_file" json:"log_file,omitempty"`

@@ -311,6 +312,15 @@ type GossipWANConfig struct {
RetransmitMult *int `mapstructure:"retransmit_mult"`
}
+// Locality identifies where a given entity is running.
+type Locality struct {
+// Region is region the zone belongs to.
+Region *string `mapstructure:"region"`
+// Zone is the zone the entity is running in.
+Zone *string `mapstructure:"zone"`
+}
type Consul struct {
Coordinate struct {
UpdateBatchSize *int `mapstructure:"update_batch_size"`
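
The new locality block is decoded with the same mapstructure tags as the rest of the agent configuration, and the pointer fields let the builder tell an unset value apart from an empty one. A minimal sketch of that decoding, assuming github.com/mitchellh/mapstructure as used by the config package (the HCL/JSON parsing and builder plumbing are omitted):

package main

import (
	"fmt"

	"github.com/mitchellh/mapstructure"
)

// Locality mirrors the struct added above; pointers distinguish
// "not configured" from "configured but empty".
type Locality struct {
	Region *string `mapstructure:"region"`
	Zone   *string `mapstructure:"zone"`
}

func main() {
	// Roughly what a parsed locality { region = "us-west-1" zone = "us-west-1a" }
	// block looks like before decoding (illustrative input only).
	raw := map[string]interface{}{
		"region": "us-west-1",
		"zone":   "us-west-1a",
	}

	var loc Locality
	if err := mapstructure.Decode(raw, &loc); err != nil {
		panic(err)
	}
	fmt.Println(*loc.Region, *loc.Zone) // us-west-1 us-west-1a
}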


@@ -796,6 +796,8 @@ type RuntimeConfig struct {
// hcl: leave_on_terminate = (true|false)
LeaveOnTerm bool
+Locality Locality
// Logging configuration used to initialize agent logging.
Logging logging.Config

@@ -1713,6 +1715,13 @@ func (c *RuntimeConfig) VersionWithMetadata() string {
return version
}
+func (c *RuntimeConfig) StructLocality() structs.Locality {
+return structs.Locality{
+Region: stringVal(c.Locality.Region),
+Zone: stringVal(c.Locality.Zone),
+}
+}
// Sanitized returns a JSON/HCL compatible representation of the runtime
// configuration where all fields with potential secrets had their
// values replaced by 'hidden'. In addition, network addresses and
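
StructLocality flattens the optional pointers from the user-facing config into the plain strings that structs.Locality carries; anything left unset simply becomes an empty string. A small stand-in sketch of that defaulting, where stringVal is a hypothetical local helper mirroring the config package's nil-safe dereference:

package main

import "fmt"

// stringVal is a hypothetical stand-in for the config package's
// nil-safe *string dereference helper.
func stringVal(s *string) string {
	if s == nil {
		return ""
	}
	return *s
}

func main() {
	region := "us-west-1"
	var zone *string // zone left unset in the agent config

	// Mirrors RuntimeConfig.StructLocality: unset fields default to "".
	fmt.Printf("region=%q zone=%q\n", stringVal(&region), stringVal(zone))
	// Output: region="us-west-1" zone=""
}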


@@ -7092,6 +7092,7 @@ func TestRuntimeConfig_Sanitize(t *testing.T) {
},
},
},
+Locality: Locality{Region: strPtr("us-west-1"), Zone: strPtr("us-west-1a")},
}
b, err := json.MarshalIndent(rt.Sanitized(), "", " ")


@@ -234,6 +234,10 @@
"LeaveDrainTime": "0s",
"LeaveOnTerm": false,
"LocalProxyConfigResyncInterval": "0s",
+"Locality": {
+"Region": "us-west-1",
+"Zone": "us-west-1a"
+},
"Logging": {
"EnableSyslog": false,
"LogFilePath": "",


@@ -436,6 +436,8 @@ type Config struct {
PeeringTestAllowPeerRegistrations bool
+Locality structs.Locality
// Embedded Consul Enterprise specific configuration
*EnterpriseConfig
}


@@ -385,6 +385,7 @@ func (s *Server) establishStream(ctx context.Context,
Remote: &pbpeering.RemoteInfo{
Partition: peer.Partition,
Datacenter: s.config.Datacenter,
+Locality: pbpeering.LocalityFromStruct(s.config.Locality),
},
},
},


@@ -661,6 +661,11 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
t.Skip("too slow for testing.Short")
}
+acceptorLocality := structs.Locality{
+Region: "us-west-2",
+Zone: "us-west-2a",
+}
ca := connect.TestCA(t, nil)
_, acceptingServer := testServerWithConfig(t, func(c *Config) {
c.NodeName = "accepting-server"

@@ -676,6 +681,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
"RootCert": ca.RootCert,
},
}
+c.Locality = acceptorLocality
})
testrpc.WaitForLeader(t, acceptingServer.RPC, "dc1")

@@ -683,6 +689,10 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
t.Cleanup(cancel)
+dialerLocality := structs.Locality{
+Region: "us-west-1",
+Zone: "us-west-1a",
+}
conn, err := grpc.DialContext(ctx, acceptingServer.config.RPCAddr.String(),
grpc.WithContextDialer(newServerDialer(acceptingServer.config.RPCAddr.String())),
//nolint:staticcheck

@@ -705,6 +715,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
// Ensure that the token contains the correct partition and dc
require.Equal(t, "dc1", token.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, token.Remote.Partition)
+require.Equal(t, acceptorLocality, token.Remote.Locality)
// Bring up dialingServer and store acceptingServer's token so that it attempts to dial.
_, dialingServer := testServerWithConfig(t, func(c *Config) {

@@ -712,6 +723,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
c.Datacenter = "dc2"
c.PrimaryDatacenter = "dc2"
c.PeeringEnabled = true
+c.Locality = dialerLocality
})
testrpc.WaitForLeader(t, dialingServer.RPC, "dc2")

@@ -743,6 +755,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
require.NoError(t, err)
require.Equal(t, "dc1", p.Peering.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, p.Peering.Remote.Partition)
+require.Equal(t, pbpeering.LocalityFromStruct(acceptorLocality), p.Peering.Remote.Locality)
// Retry fetching the until the peering is active in the acceptor.
ctx, cancel = context.WithTimeout(context.Background(), 10*time.Second)

@@ -758,6 +771,7 @@ func TestLeader_Peering_RemoteInfo(t *testing.T) {
require.NotNil(t, p)
require.Equal(t, "dc2", p.Peering.Remote.Datacenter)
require.Contains(t, []string{"", "default"}, p.Peering.Remote.Partition)
+require.Equal(t, pbpeering.LocalityFromStruct(dialerLocality), p.Peering.Remote.Locality)
}
// Test that the dialing peer attempts to reestablish connections when the accepting peer


@@ -860,6 +860,7 @@ func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler
Datacenter: config.Datacenter,
ConnectEnabled: config.ConnectEnabled,
PeeringEnabled: config.PeeringEnabled,
+Locality: config.Locality,
})
s.peeringServer = p
o := operator.NewServer(operator.Config{


@@ -591,6 +591,7 @@ func (s *Store) PeeringWrite(idx uint64, req *pbpeering.PeeringWriteRequest) err
req.Peering.Remote = &pbpeering.RemoteInfo{
Partition: existing.Remote.Partition,
Datacenter: existing.Remote.Datacenter,
+Locality: existing.Remote.Locality,
}
}


@@ -1261,6 +1261,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
+Locality: &pbpeering.Locality{
+Region: "us-west-1",
+Zone: "us-west-1a",
+},
},
},
},

@@ -1272,6 +1276,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
+Locality: &pbpeering.Locality{
+Region: "us-west-1",
+Zone: "us-west-1a",
+},
},
},
secrets: &pbpeering.PeeringSecrets{

@@ -1303,6 +1311,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
+Locality: &pbpeering.Locality{
+Region: "us-west-1",
+Zone: "us-west-1a",
+},
},
},
secrets: &pbpeering.PeeringSecrets{

@@ -1332,6 +1344,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
+Locality: &pbpeering.Locality{
+Region: "us-west-1",
+Zone: "us-west-1a",
+},
},
},
secrets: &pbpeering.PeeringSecrets{

@@ -1361,6 +1377,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
+Locality: &pbpeering.Locality{
+Region: "us-west-1",
+Zone: "us-west-1a",
+},
},
},
// Secrets for baz should have been deleted

@@ -1389,6 +1409,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
+Locality: &pbpeering.Locality{
+Region: "us-west-1",
+Zone: "us-west-1a",
+},
},
// Meta should be unchanged.
Meta: nil,

@@ -1416,6 +1440,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
+Locality: &pbpeering.Locality{
+Region: "us-west-1",
+Zone: "us-west-1a",
+},
},
},
secrets: nil,

@@ -1443,6 +1471,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
+Locality: &pbpeering.Locality{
+Region: "us-west-1",
+Zone: "us-west-1a",
+},
},
},
// Secrets for baz should have been deleted

@@ -1469,6 +1501,10 @@ func TestStore_PeeringWrite(t *testing.T) {
Remote: &pbpeering.RemoteInfo{
Partition: "part1",
Datacenter: "datacenter1",
+Locality: &pbpeering.Locality{
+Region: "us-west-1",
+Zone: "us-west-1a",
+},
},
},
// Secrets for baz should have been deleted


@@ -87,6 +87,7 @@ type Config struct {
Datacenter string
ConnectEnabled bool
PeeringEnabled bool
+Locality structs.Locality
}
func NewServer(cfg Config) *Server {

@@ -327,6 +328,7 @@ func (s *Server) GenerateToken(
Remote: structs.PeeringTokenRemote{
Partition: req.PartitionOrDefault(),
Datacenter: s.Datacenter,
+Locality: s.Config.Locality,
},
}

@@ -445,6 +447,7 @@ func (s *Server) Establish(
Remote: &pbpeering.RemoteInfo{
Partition: tok.Remote.Partition,
Datacenter: tok.Remote.Datacenter,
+Locality: pbpeering.LocalityFromStruct(tok.Remote.Locality),
},
}
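
Taken together, GenerateToken stamps the acceptor's configured locality into the Remote section of the peering token, and Establish copies it back out into the stored peering on the dialing cluster. A rough sketch of that round trip, assuming the token body is JSON-encoded and base64-wrapped as in the rest of the peering flow (import paths are approximate for this point in the tree):

package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"

	// Import paths are approximate for the Consul tree at this commit.
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/proto/pbpeering"
)

func main() {
	// Acceptor side: GenerateToken embeds its own locality in the token.
	tok := structs.PeeringToken{
		Remote: structs.PeeringTokenRemote{
			Partition:  "default",
			Datacenter: "dc1",
			Locality:   structs.Locality{Region: "us-west-2", Zone: "us-west-2a"},
		},
	}
	body, _ := json.Marshal(tok)
	encoded := base64.StdEncoding.EncodeToString(body)

	// Dialer side: Establish decodes the token and persists the remote info,
	// converting the locality into its protobuf form.
	decoded, _ := base64.StdEncoding.DecodeString(encoded)
	var parsed structs.PeeringToken
	_ = json.Unmarshal(decoded, &parsed)

	remote := &pbpeering.RemoteInfo{
		Partition:  parsed.Remote.Partition,
		Datacenter: parsed.Remote.Datacenter,
		Locality:   pbpeering.LocalityFromStruct(parsed.Remote.Locality),
	}
	fmt.Println(remote.Datacenter, remote.Locality.Region, remote.Locality.Zone)
	// dc1 us-west-2 us-west-2a
}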


@@ -14,6 +14,7 @@ type PeeringToken struct {
type PeeringTokenRemote struct {
Partition string
Datacenter string
+Locality Locality
}
type IndexedExportedServiceList struct {


@@ -3021,3 +3021,12 @@ func TimeToProto(s time.Time) *timestamppb.Timestamp {
func IsZeroProtoTime(t *timestamppb.Timestamp) bool {
return t.Seconds == 0 && t.Nanos == 0
}
+// Locality identifies where a given entity is running.
+type Locality struct {
+// Region is region the zone belongs to.
+Region string `json:",omitempty"`
+// Zone is the zone the entity is running in.
+Zone string `json:",omitempty"`
+}


@@ -44,6 +44,16 @@ type PeeringRemoteInfo struct {
Partition string
// Datacenter is the remote peer's datacenter.
Datacenter string
+Locality Locality
+}
+// Locality identifies where a given entity is running.
+type Locality struct {
+// Region is region the zone belongs to.
+Region string
+// Zone is the zone the entity is running in.
+Zone string
}
type Peering struct { type Peering struct {


@@ -62,6 +62,20 @@ func GenerateTokenResponseFromAPI(t *api.PeeringGenerateTokenResponse, s *Genera
}
s.PeeringToken = t.PeeringToken
}
+func LocalityToAPI(s *Locality, t *api.Locality) {
+if s == nil {
+return
+}
+t.Region = s.Region
+t.Zone = s.Zone
+}
+func LocalityFromAPI(t *api.Locality, s *Locality) {
+if s == nil {
+return
+}
+s.Region = t.Region
+s.Zone = t.Zone
+}
func PeeringToAPI(s *Peering, t *api.Peering) {
if s == nil {
return

@@ -112,6 +126,9 @@ func RemoteInfoToAPI(s *RemoteInfo, t *api.PeeringRemoteInfo) {
}
t.Partition = s.Partition
t.Datacenter = s.Datacenter
+if s.Locality != nil {
+LocalityToAPI(s.Locality, &t.Locality)
+}
}
func RemoteInfoFromAPI(t *api.PeeringRemoteInfo, s *RemoteInfo) {
if s == nil {

@@ -119,4 +136,9 @@ func RemoteInfoFromAPI(t *api.PeeringRemoteInfo, s *RemoteInfo) {
}
s.Partition = t.Partition
s.Datacenter = t.Datacenter
+{
+var x Locality
+LocalityFromAPI(&t.Locality, &x)
+s.Locality = &x
+}
}
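
These converters are generated by mog from the annotation added to the proto file below, and RemoteInfoToAPI/RemoteInfoFromAPI use them to carry locality across the protobuf/API boundary. A brief usage sketch (import paths are approximate):

package main

import (
	"fmt"

	// Import paths are approximate for the Consul tree at this commit.
	"github.com/hashicorp/consul/api"
	"github.com/hashicorp/consul/proto/pbpeering"
)

func main() {
	in := &pbpeering.RemoteInfo{
		Partition:  "default",
		Datacenter: "dc1",
		Locality:   &pbpeering.Locality{Region: "us-west-1", Zone: "us-west-1a"},
	}

	// proto -> api: the locality is copied only when set on the proto side.
	var out api.PeeringRemoteInfo
	pbpeering.RemoteInfoToAPI(in, &out)
	fmt.Println(out.Locality.Region, out.Locality.Zone) // us-west-1 us-west-1a

	// api -> proto: the generated code always allocates a Locality value.
	var back pbpeering.RemoteInfo
	pbpeering.RemoteInfoFromAPI(&out, &back)
	fmt.Println(back.Locality.Region, back.Locality.Zone) // us-west-1 us-west-1a
}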


@@ -276,7 +276,14 @@ func (r *RemoteInfo) IsEmpty() bool {
if r == nil {
return true
}
-return r.Partition == "" && r.Datacenter == ""
+return r.Partition == "" && r.Datacenter == "" && r.Locality.IsEmpty()
+}
+func (l *Locality) IsEmpty() bool {
+if l == nil {
+return true
+}
+return l.Region == "" && l.Zone == ""
}
// convenience

@@ -324,3 +331,10 @@ func (o *PeeringTrustBundle) DeepCopy() *PeeringTrustBundle {
}
return cp
}
+func LocalityFromStruct(l structs.Locality) *Locality {
+return &Locality{
+Region: l.Region,
+Zone: l.Zone,
+}
+}
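
Because LocalityFromStruct always returns a non-nil pointer even for a zero value, RemoteInfo.IsEmpty delegates to Locality.IsEmpty so that a remote carrying only an empty locality is still treated as empty. A small illustrative check (import paths are approximate):

package main

import (
	"fmt"

	// Import paths are approximate for the Consul tree at this commit.
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/proto/pbpeering"
)

func main() {
	// A zero structs.Locality still converts to a non-nil *pbpeering.Locality...
	empty := pbpeering.LocalityFromStruct(structs.Locality{})
	fmt.Println(empty != nil, empty.IsEmpty()) // true true

	// ...so a RemoteInfo holding only that empty locality still reports empty.
	ri := &pbpeering.RemoteInfo{Locality: empty}
	fmt.Println(ri.IsEmpty()) // true

	ri.Locality = pbpeering.LocalityFromStruct(structs.Locality{Region: "us-west-1"})
	fmt.Println(ri.IsEmpty()) // false
}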


@@ -107,6 +107,16 @@ func (msg *RemoteInfo) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
+// MarshalBinary implements encoding.BinaryMarshaler
+func (msg *Locality) MarshalBinary() ([]byte, error) {
+return proto.Marshal(msg)
+}
+// UnmarshalBinary implements encoding.BinaryUnmarshaler
+func (msg *Locality) UnmarshalBinary(b []byte) error {
+return proto.Unmarshal(b, msg)
+}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *StreamStatus) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)

File diff suppressed because it is too large.


@@ -237,6 +237,22 @@ message RemoteInfo {
string Partition = 1;
// Datacenter is the remote peer's datacenter.
string Datacenter = 2;
+// Locality identifies where the peer is running.
+Locality Locality = 3;
+}
+// mog annotation:
+//
+// target=github.com/hashicorp/consul/api.Locality
+// output=peering.gen.go
+// name=API
+message Locality {
+// Region is region the zone belongs to.
+string Region = 1;
+// Zone is the zone the entity is running in.
+string Zone = 2;
}
// StreamStatus represents information about an active peering stream.