mirror of https://github.com/status-im/consul.git
peering: track imported services (#13718)
This commit is contained in:
parent
d523d005d9
commit
adb5ffa1a6
|
@ -17,6 +17,7 @@ import (
|
|||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
||||
func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
|
||||
|
@ -309,11 +310,6 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId
|
|||
Node: "aaa",
|
||||
PeerName: peer,
|
||||
},
|
||||
{
|
||||
CheckID: structs.SerfCheckID,
|
||||
Node: "aaa",
|
||||
PeerName: peer,
|
||||
},
|
||||
},
|
||||
}))
|
||||
|
||||
|
@ -336,11 +332,6 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId
|
|||
Node: "bbb",
|
||||
PeerName: peer,
|
||||
},
|
||||
{
|
||||
CheckID: structs.SerfCheckID,
|
||||
Node: "bbb",
|
||||
PeerName: peer,
|
||||
},
|
||||
},
|
||||
}))
|
||||
|
||||
|
@ -363,13 +354,269 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId
|
|||
Node: "ccc",
|
||||
PeerName: peer,
|
||||
},
|
||||
{
|
||||
CheckID: structs.SerfCheckID,
|
||||
Node: "ccc",
|
||||
PeerName: peer,
|
||||
},
|
||||
},
|
||||
}))
|
||||
|
||||
return lastIdx
|
||||
}
|
||||
|
||||
// TODO(peering): once we move away from leader only request for PeeringList, move this test to consul/server_test maybe
|
||||
func TestLeader_Peering_ImportedServicesCount(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
// TODO(peering): Configure with TLS
|
||||
_, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "s1.dc1"
|
||||
c.Datacenter = "dc1"
|
||||
c.TLSConfig.Domain = "consul"
|
||||
})
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Create a peering by generating a token
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(),
|
||||
grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBlock())
|
||||
require.NoError(t, err)
|
||||
defer conn.Close()
|
||||
|
||||
peeringClient := pbpeering.NewPeeringServiceClient(conn)
|
||||
|
||||
req := pbpeering.GenerateTokenRequest{
|
||||
PeerName: "my-peer-s2",
|
||||
}
|
||||
resp, err := peeringClient.GenerateToken(ctx, &req)
|
||||
require.NoError(t, err)
|
||||
|
||||
tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken)
|
||||
require.NoError(t, err)
|
||||
|
||||
var token structs.PeeringToken
|
||||
require.NoError(t, json.Unmarshal(tokenJSON, &token))
|
||||
|
||||
var (
|
||||
s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
|
||||
lastIdx = uint64(0)
|
||||
)
|
||||
|
||||
// Bring up s2 and store s1's token so that it attempts to dial.
|
||||
_, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "s2.dc2"
|
||||
c.Datacenter = "dc2"
|
||||
c.PrimaryDatacenter = "dc2"
|
||||
})
|
||||
testrpc.WaitForLeader(t, s2.RPC, "dc2")
|
||||
|
||||
// Simulate a peering initiation event by writing a peering with data from a peering token.
|
||||
// Eventually the leader in dc2 should dial and connect to the leader in dc1.
|
||||
p := &pbpeering.Peering{
|
||||
ID: s2PeerID,
|
||||
Name: "my-peer-s1",
|
||||
PeerID: token.PeerID,
|
||||
PeerCAPems: token.CA,
|
||||
PeerServerName: token.ServerName,
|
||||
PeerServerAddresses: token.ServerAddresses,
|
||||
}
|
||||
require.True(t, p.ShouldDial())
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, p))
|
||||
|
||||
/// add services to S1 to be synced to S2
|
||||
lastIdx++
|
||||
require.NoError(t, s1.FSM().State().EnsureRegistration(lastIdx, &structs.RegisterRequest{
|
||||
ID: types.NodeID(generateUUID()),
|
||||
Node: "aaa",
|
||||
Address: "10.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
Service: "a-service",
|
||||
ID: "a-service-1",
|
||||
Port: 8080,
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
{
|
||||
CheckID: "a-service-1-check",
|
||||
ServiceName: "a-service",
|
||||
ServiceID: "a-service-1",
|
||||
Node: "aaa",
|
||||
},
|
||||
},
|
||||
}))
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, s1.FSM().State().EnsureRegistration(lastIdx, &structs.RegisterRequest{
|
||||
ID: types.NodeID(generateUUID()),
|
||||
|
||||
Node: "bbb",
|
||||
Address: "10.0.0.2",
|
||||
Service: &structs.NodeService{
|
||||
Service: "b-service",
|
||||
ID: "b-service-1",
|
||||
Port: 8080,
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
{
|
||||
CheckID: "b-service-1-check",
|
||||
ServiceName: "b-service",
|
||||
ServiceID: "b-service-1",
|
||||
Node: "bbb",
|
||||
},
|
||||
},
|
||||
}))
|
||||
|
||||
lastIdx++
|
||||
require.NoError(t, s1.FSM().State().EnsureRegistration(lastIdx, &structs.RegisterRequest{
|
||||
ID: types.NodeID(generateUUID()),
|
||||
|
||||
Node: "ccc",
|
||||
Address: "10.0.0.3",
|
||||
Service: &structs.NodeService{
|
||||
Service: "c-service",
|
||||
ID: "c-service-1",
|
||||
Port: 8080,
|
||||
},
|
||||
Checks: structs.HealthChecks{
|
||||
{
|
||||
CheckID: "c-service-1-check",
|
||||
ServiceName: "c-service",
|
||||
ServiceID: "c-service-1",
|
||||
Node: "ccc",
|
||||
},
|
||||
},
|
||||
}))
|
||||
/// finished adding services
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
description string
|
||||
exportedService structs.ExportedServicesConfigEntry
|
||||
expectedImportedServicesCount uint64
|
||||
}
|
||||
|
||||
testCases := []testCase{
|
||||
{
|
||||
name: "wildcard",
|
||||
description: "for a wildcard exported services, we want to see all services synced",
|
||||
exportedService: structs.ExportedServicesConfigEntry{
|
||||
Name: "default",
|
||||
Services: []structs.ExportedService{
|
||||
{
|
||||
Name: structs.WildcardSpecifier,
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-peer-s2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedImportedServicesCount: 4, // 3 services from above + the "consul" service
|
||||
},
|
||||
{
|
||||
name: "no sync",
|
||||
description: "update the config entry to allow no service sync",
|
||||
exportedService: structs.ExportedServicesConfigEntry{
|
||||
Name: "default",
|
||||
},
|
||||
expectedImportedServicesCount: 0, // we want to see this decremented from 4 --> 0
|
||||
},
|
||||
{
|
||||
name: "just a, b services",
|
||||
description: "export just two services",
|
||||
exportedService: structs.ExportedServicesConfigEntry{
|
||||
Name: "default",
|
||||
Services: []structs.ExportedService{
|
||||
{
|
||||
Name: "a-service",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-peer-s2",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "b-service",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-peer-s2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedImportedServicesCount: 2,
|
||||
},
|
||||
{
|
||||
name: "unexport b service",
|
||||
description: "by unexporting b we want to see the count decrement eventually",
|
||||
exportedService: structs.ExportedServicesConfigEntry{
|
||||
Name: "default",
|
||||
Services: []structs.ExportedService{
|
||||
{
|
||||
Name: "a-service",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-peer-s2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedImportedServicesCount: 1,
|
||||
},
|
||||
{
|
||||
name: "export c service",
|
||||
description: "now export the c service and expect the count to increment",
|
||||
exportedService: structs.ExportedServicesConfigEntry{
|
||||
Name: "default",
|
||||
Services: []structs.ExportedService{
|
||||
{
|
||||
Name: "a-service",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-peer-s2",
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
Name: "c-service",
|
||||
Consumers: []structs.ServiceConsumer{
|
||||
{
|
||||
PeerName: "my-peer-s2",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expectedImportedServicesCount: 2,
|
||||
},
|
||||
}
|
||||
|
||||
conn2, err := grpc.DialContext(ctx, s2.config.RPCAddr.String(),
|
||||
grpc.WithContextDialer(newServerDialer(s2.config.RPCAddr.String())),
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBlock())
|
||||
require.NoError(t, err)
|
||||
defer conn2.Close()
|
||||
|
||||
peeringClient2 := pbpeering.NewPeeringServiceClient(conn2)
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, s1.fsm.State().EnsureConfigEntry(lastIdx, &tc.exportedService))
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
resp2, err := peeringClient2.PeeringList(ctx, &pbpeering.PeeringListRequest{})
|
||||
require.NoError(r, err)
|
||||
require.NotEmpty(r, resp2.Peerings)
|
||||
require.Equal(r, tc.expectedImportedServicesCount, resp2.Peerings[0].ImportedServiceCount)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -113,7 +113,9 @@ func marshalToProtoAny[T proto.Message](in any) (*anypb.Any, T, error) {
|
|||
func (s *Server) processResponse(
|
||||
peerName string,
|
||||
partition string,
|
||||
mutableStatus *MutableStatus,
|
||||
resp *pbpeerstream.ReplicationMessage_Response,
|
||||
logger hclog.Logger,
|
||||
) (*pbpeerstream.ReplicationMessage, error) {
|
||||
if !pbpeerstream.KnownTypeURL(resp.ResourceURL) {
|
||||
err := fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL)
|
||||
|
@ -137,7 +139,7 @@ func (s *Server) processResponse(
|
|||
), err
|
||||
}
|
||||
|
||||
if err := s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource); err != nil {
|
||||
if err := s.handleUpsert(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID, resp.Resource, logger); err != nil {
|
||||
return makeNACKReply(
|
||||
resp.ResourceURL,
|
||||
resp.Nonce,
|
||||
|
@ -149,7 +151,7 @@ func (s *Server) processResponse(
|
|||
return makeACKReply(resp.ResourceURL, resp.Nonce), nil
|
||||
|
||||
case pbpeerstream.Operation_OPERATION_DELETE:
|
||||
if err := s.handleDelete(peerName, partition, resp.ResourceURL, resp.ResourceID); err != nil {
|
||||
if err := s.handleDelete(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID, logger); err != nil {
|
||||
return makeNACKReply(
|
||||
resp.ResourceURL,
|
||||
resp.Nonce,
|
||||
|
@ -178,9 +180,11 @@ func (s *Server) processResponse(
|
|||
func (s *Server) handleUpsert(
|
||||
peerName string,
|
||||
partition string,
|
||||
mutableStatus *MutableStatus,
|
||||
resourceURL string,
|
||||
resourceID string,
|
||||
resource *anypb.Any,
|
||||
logger hclog.Logger,
|
||||
) error {
|
||||
switch resourceURL {
|
||||
case pbpeerstream.TypeURLService:
|
||||
|
@ -192,7 +196,16 @@ func (s *Server) handleUpsert(
|
|||
return fmt.Errorf("failed to unmarshal resource: %w", err)
|
||||
}
|
||||
|
||||
return s.handleUpdateService(peerName, partition, sn, csn)
|
||||
err := s.handleUpdateService(peerName, partition, sn, csn)
|
||||
if err != nil {
|
||||
logger.Error("did not increment imported services count", "service_name", sn.String(), "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Trace("incrementing imported services count", "service_name", sn.String())
|
||||
mutableStatus.TrackImportedService(sn)
|
||||
|
||||
return nil
|
||||
|
||||
case pbpeerstream.TypeURLRoots:
|
||||
roots := &pbpeering.PeeringTrustBundle{}
|
||||
|
@ -425,14 +438,26 @@ func (s *Server) handleUpsertRoots(
|
|||
func (s *Server) handleDelete(
|
||||
peerName string,
|
||||
partition string,
|
||||
mutableStatus *MutableStatus,
|
||||
resourceURL string,
|
||||
resourceID string,
|
||||
logger hclog.Logger,
|
||||
) error {
|
||||
switch resourceURL {
|
||||
case pbpeerstream.TypeURLService:
|
||||
sn := structs.ServiceNameFromString(resourceID)
|
||||
sn.OverridePartition(partition)
|
||||
return s.handleUpdateService(peerName, partition, sn, nil)
|
||||
|
||||
err := s.handleUpdateService(peerName, partition, sn, nil)
|
||||
if err != nil {
|
||||
logger.Error("did not decrement imported services count", "service_name", sn.String(), "error", err)
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Trace("decrementing imported services count", "service_name", sn.String())
|
||||
mutableStatus.RemoveImportedService(sn)
|
||||
|
||||
return nil
|
||||
|
||||
default:
|
||||
return fmt.Errorf("unexpected resourceURL: %s", resourceURL)
|
||||
|
|
|
@ -302,7 +302,7 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
|
|||
|
||||
if resp := msg.GetResponse(); resp != nil {
|
||||
// TODO(peering): Ensure there's a nonce
|
||||
reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, resp)
|
||||
reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp, logger)
|
||||
if err != nil {
|
||||
logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
|
||||
status.TrackReceiveError(err.Error())
|
||||
|
|
|
@ -475,6 +475,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
|||
LastNack: lastNack,
|
||||
LastNackMessage: lastNackMsg,
|
||||
LastReceiveSuccess: lastRecvSuccess,
|
||||
ImportedServices: map[string]struct{}{"api": {}},
|
||||
}
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
@ -532,6 +533,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
|||
LastReceiveSuccess: lastRecvSuccess,
|
||||
LastReceiveError: lastRecvError,
|
||||
LastReceiveErrorMessage: lastRecvErrorMsg,
|
||||
ImportedServices: map[string]struct{}{"api": {}},
|
||||
}
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
@ -559,6 +561,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
|||
LastReceiveSuccess: lastRecvSuccess,
|
||||
LastReceiveErrorMessage: io.EOF.Error(),
|
||||
LastReceiveError: lastRecvError,
|
||||
ImportedServices: map[string]struct{}{"api": {}},
|
||||
}
|
||||
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
|
@ -968,6 +971,9 @@ func (b *testStreamBackend) CatalogDeregister(req *structs.DeregisterRequest) er
|
|||
}
|
||||
|
||||
func Test_processResponse_Validation(t *testing.T) {
|
||||
peerName := "billing"
|
||||
peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5"
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
in *pbpeerstream.ReplicationMessage_Response
|
||||
|
@ -975,10 +981,18 @@ func Test_processResponse_Validation(t *testing.T) {
|
|||
wantErr bool
|
||||
}
|
||||
|
||||
srv, _ := newTestServer(t, nil)
|
||||
srv, store := newTestServer(t, nil)
|
||||
require.NoError(t, store.PeeringWrite(31, &pbpeering.Peering{
|
||||
ID: peerID,
|
||||
Name: peerName},
|
||||
))
|
||||
|
||||
// connect the stream
|
||||
mst, err := srv.Tracker.Connected(peerID)
|
||||
require.NoError(t, err)
|
||||
|
||||
run := func(t *testing.T, tc testCase) {
|
||||
reply, err := srv.processResponse("", "", tc.in)
|
||||
reply, err := srv.processResponse(peerName, "", mst, tc.in, srv.Logger)
|
||||
if tc.wantErr {
|
||||
require.Error(t, err)
|
||||
} else {
|
||||
|
@ -1218,8 +1232,8 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test
|
|||
}
|
||||
}
|
||||
|
||||
func TestHandleUpdateService(t *testing.T) {
|
||||
srv, _ := newTestServer(t, func(c *Config) {
|
||||
func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
|
||||
srv, store := newTestServer(t, func(c *Config) {
|
||||
backend := c.Backend.(*testStreamBackend)
|
||||
backend.leader = func() bool {
|
||||
return false
|
||||
|
@ -1227,13 +1241,15 @@ func TestHandleUpdateService(t *testing.T) {
|
|||
})
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
seed []*structs.RegisterRequest
|
||||
input *pbservice.IndexedCheckServiceNodes
|
||||
expect map[string]structs.CheckServiceNodes
|
||||
name string
|
||||
seed []*structs.RegisterRequest
|
||||
input *pbservice.IndexedCheckServiceNodes
|
||||
expect map[string]structs.CheckServiceNodes
|
||||
expectedImportedServicesCount int
|
||||
}
|
||||
|
||||
peerName := "billing"
|
||||
peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5"
|
||||
remoteMeta := pbcommon.NewEnterpriseMetaFromStructs(*structs.DefaultEnterpriseMetaInPartition("billing-ap"))
|
||||
|
||||
// "api" service is imported from the billing-ap partition, corresponding to the billing peer.
|
||||
|
@ -1241,14 +1257,43 @@ func TestHandleUpdateService(t *testing.T) {
|
|||
defaultMeta := *acl.DefaultEnterpriseMeta()
|
||||
apiSN := structs.NewServiceName("api", &defaultMeta)
|
||||
|
||||
// create a peering in the state store
|
||||
require.NoError(t, store.PeeringWrite(31, &pbpeering.Peering{
|
||||
ID: peerID,
|
||||
Name: peerName},
|
||||
))
|
||||
|
||||
// connect the stream
|
||||
mst, err := srv.Tracker.Connected(peerID)
|
||||
require.NoError(t, err)
|
||||
|
||||
run := func(t *testing.T, tc testCase) {
|
||||
// Seed the local catalog with some data to reconcile against.
|
||||
// and increment the tracker's imported services count
|
||||
for _, reg := range tc.seed {
|
||||
require.NoError(t, srv.Backend.CatalogRegister(reg))
|
||||
|
||||
mst.TrackImportedService(reg.Service.CompoundServiceName())
|
||||
}
|
||||
|
||||
var op pbpeerstream.Operation
|
||||
if len(tc.input.Nodes) == 0 {
|
||||
op = pbpeerstream.Operation_OPERATION_DELETE
|
||||
} else {
|
||||
op = pbpeerstream.Operation_OPERATION_UPSERT
|
||||
}
|
||||
|
||||
in := &pbpeerstream.ReplicationMessage_Response{
|
||||
ResourceURL: pbpeerstream.TypeURLService,
|
||||
ResourceID: apiSN.String(),
|
||||
Nonce: "1",
|
||||
Operation: op,
|
||||
Resource: makeAnyPB(t, tc.input),
|
||||
}
|
||||
|
||||
// Simulate an update arriving for billing/api.
|
||||
require.NoError(t, srv.handleUpdateService(peerName, acl.DefaultPartitionName, apiSN, tc.input))
|
||||
_, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, in, srv.Logger)
|
||||
require.NoError(t, err)
|
||||
|
||||
for svc, expect := range tc.expect {
|
||||
t.Run(svc, func(t *testing.T) {
|
||||
|
@ -1257,6 +1302,9 @@ func TestHandleUpdateService(t *testing.T) {
|
|||
requireEqualInstances(t, expect, got)
|
||||
})
|
||||
}
|
||||
|
||||
// assert the imported services count modifications
|
||||
require.Equal(t, tc.expectedImportedServicesCount, mst.GetImportedServicesCount())
|
||||
}
|
||||
|
||||
tt := []testCase{
|
||||
|
@ -1390,6 +1438,7 @@ func TestHandleUpdateService(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
expectedImportedServicesCount: 1,
|
||||
},
|
||||
{
|
||||
name: "upsert two service instances to different nodes",
|
||||
|
@ -1521,6 +1570,7 @@ func TestHandleUpdateService(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
expectedImportedServicesCount: 1,
|
||||
},
|
||||
{
|
||||
name: "receiving a nil input leads to deleting data in the catalog",
|
||||
|
@ -1574,10 +1624,11 @@ func TestHandleUpdateService(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
input: nil,
|
||||
input: &pbservice.IndexedCheckServiceNodes{},
|
||||
expect: map[string]structs.CheckServiceNodes{
|
||||
"api": {},
|
||||
},
|
||||
expectedImportedServicesCount: 0,
|
||||
},
|
||||
{
|
||||
name: "deleting one service name from a node does not delete other service names",
|
||||
|
@ -1632,7 +1683,7 @@ func TestHandleUpdateService(t *testing.T) {
|
|||
},
|
||||
},
|
||||
// Nil input is for the "api" service.
|
||||
input: nil,
|
||||
input: &pbservice.IndexedCheckServiceNodes{},
|
||||
expect: map[string]structs.CheckServiceNodes{
|
||||
"api": {},
|
||||
// Existing redis service was not affected by deletion.
|
||||
|
@ -1668,6 +1719,7 @@ func TestHandleUpdateService(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
expectedImportedServicesCount: 1,
|
||||
},
|
||||
{
|
||||
name: "service checks are cleaned up when not present in a response",
|
||||
|
@ -1738,6 +1790,7 @@ func TestHandleUpdateService(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
expectedImportedServicesCount: 2,
|
||||
},
|
||||
{
|
||||
name: "node checks are cleaned up when not present in a response",
|
||||
|
@ -1872,6 +1925,7 @@ func TestHandleUpdateService(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
expectedImportedServicesCount: 2,
|
||||
},
|
||||
{
|
||||
name: "replacing a service instance on a node cleans up the old instance",
|
||||
|
@ -2019,6 +2073,7 @@ func TestHandleUpdateService(t *testing.T) {
|
|||
},
|
||||
},
|
||||
},
|
||||
expectedImportedServicesCount: 2,
|
||||
},
|
||||
}
|
||||
|
||||
|
|
|
@ -4,9 +4,11 @@ import (
|
|||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
)
|
||||
|
||||
// Tracker contains a map of (PeerID -> Status).
|
||||
// Tracker contains a map of (PeerID -> MutableStatus).
|
||||
// As streams are opened and closed we track details about their status.
|
||||
type Tracker struct {
|
||||
mu sync.RWMutex
|
||||
|
@ -142,6 +144,10 @@ type Status struct {
|
|||
// - The error message when we failed to store a resource replicated FROM the peer.
|
||||
// - The last error message when receiving from the stream.
|
||||
LastReceiveErrorMessage string
|
||||
|
||||
// TODO(peering): consider keeping track of imported service counts thru raft
|
||||
// ImportedServices is set that keeps track of which service names are imported for the peer
|
||||
ImportedServices map[string]struct{}
|
||||
}
|
||||
|
||||
func newMutableStatus(now func() time.Time) *MutableStatus {
|
||||
|
@ -222,3 +228,28 @@ func (s *MutableStatus) GetStatus() Status {
|
|||
|
||||
return copy
|
||||
}
|
||||
|
||||
func (s *MutableStatus) RemoveImportedService(sn structs.ServiceName) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
delete(s.ImportedServices, sn.String())
|
||||
}
|
||||
|
||||
func (s *MutableStatus) TrackImportedService(sn structs.ServiceName) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
||||
if s.ImportedServices == nil {
|
||||
s.ImportedServices = make(map[string]struct{})
|
||||
}
|
||||
|
||||
s.ImportedServices[sn.String()] = struct{}{}
|
||||
}
|
||||
|
||||
func (s *MutableStatus) GetImportedServicesCount() int {
|
||||
s.mu.RLock()
|
||||
defer s.mu.RUnlock()
|
||||
|
||||
return len(s.ImportedServices)
|
||||
}
|
||||
|
|
|
@ -337,7 +337,17 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ
|
|||
if peering == nil {
|
||||
return &pbpeering.PeeringReadResponse{Peering: nil}, nil
|
||||
}
|
||||
|
||||
cp := copyPeeringWithNewState(peering, s.reconciledStreamStateHint(peering.ID, peering.State))
|
||||
|
||||
// add imported services count
|
||||
st, found := s.Tracker.StreamStatus(peering.ID)
|
||||
if !found {
|
||||
s.Logger.Trace("did not find peer in stream tracker when reading peer", "peerID", peering.ID)
|
||||
} else {
|
||||
cp.ImportedServiceCount = uint64(len(st.ImportedServices))
|
||||
}
|
||||
|
||||
return &pbpeering.PeeringReadResponse{Peering: cp}, nil
|
||||
}
|
||||
|
||||
|
@ -369,6 +379,15 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ
|
|||
var cPeerings []*pbpeering.Peering
|
||||
for _, p := range peerings {
|
||||
cp := copyPeeringWithNewState(p, s.reconciledStreamStateHint(p.ID, p.State))
|
||||
|
||||
// add imported services count
|
||||
st, found := s.Tracker.StreamStatus(p.ID)
|
||||
if !found {
|
||||
s.Logger.Trace("did not find peer in stream tracker when listing peers", "peerID", p.ID)
|
||||
} else {
|
||||
cp.ImportedServiceCount = uint64(len(st.ImportedServices))
|
||||
}
|
||||
|
||||
cPeerings = append(cPeerings, cp)
|
||||
}
|
||||
return &pbpeering.PeeringListResponse{Peerings: cPeerings}, nil
|
||||
|
@ -586,17 +605,19 @@ func (s *Server) getExistingOrCreateNewPeerID(peerName, partition string) (strin
|
|||
|
||||
func copyPeeringWithNewState(p *pbpeering.Peering, state pbpeering.PeeringState) *pbpeering.Peering {
|
||||
return &pbpeering.Peering{
|
||||
ID: p.ID,
|
||||
Name: p.Name,
|
||||
Partition: p.Partition,
|
||||
DeletedAt: p.DeletedAt,
|
||||
Meta: p.Meta,
|
||||
PeerID: p.PeerID,
|
||||
PeerCAPems: p.PeerCAPems,
|
||||
PeerServerAddresses: p.PeerServerAddresses,
|
||||
PeerServerName: p.PeerServerName,
|
||||
CreateIndex: p.CreateIndex,
|
||||
ModifyIndex: p.ModifyIndex,
|
||||
ID: p.ID,
|
||||
Name: p.Name,
|
||||
Partition: p.Partition,
|
||||
DeletedAt: p.DeletedAt,
|
||||
Meta: p.Meta,
|
||||
PeerID: p.PeerID,
|
||||
PeerCAPems: p.PeerCAPems,
|
||||
PeerServerAddresses: p.PeerServerAddresses,
|
||||
PeerServerName: p.PeerServerName,
|
||||
CreateIndex: p.CreateIndex,
|
||||
ModifyIndex: p.ModifyIndex,
|
||||
ImportedServiceCount: p.ImportedServiceCount,
|
||||
ExportedServiceCount: p.ExportedServiceCount,
|
||||
|
||||
State: state,
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue