mirror of https://github.com/status-im/consul.git
peering: refactor reconcile, cleanup (#13795)
Signed-off-by: acpana <8968914+acpana@users.noreply.github.com>
This commit is contained in:
parent
eb4f479e7e
commit
de5a991d8c
|
@ -747,10 +747,11 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
/// finished adding services
|
/// finished adding services
|
||||||
|
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
name string
|
name string
|
||||||
description string
|
description string
|
||||||
exportedService structs.ExportedServicesConfigEntry
|
exportedService structs.ExportedServicesConfigEntry
|
||||||
expectedImportedExportedServicesCount uint64 // same count for a server that imports the services form a server that exports them
|
expectedImportedServsCount uint64
|
||||||
|
expectedExportedServsCount uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
testCases := []testCase{
|
testCases := []testCase{
|
||||||
|
@ -770,7 +771,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedImportedExportedServicesCount: 4, // 3 services from above + the "consul" service
|
expectedImportedServsCount: 4, // 3 services from above + the "consul" service
|
||||||
|
expectedExportedServsCount: 4, // 3 services from above + the "consul" service
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "no sync",
|
name: "no sync",
|
||||||
|
@ -778,7 +780,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
exportedService: structs.ExportedServicesConfigEntry{
|
exportedService: structs.ExportedServicesConfigEntry{
|
||||||
Name: "default",
|
Name: "default",
|
||||||
},
|
},
|
||||||
expectedImportedExportedServicesCount: 0, // we want to see this decremented from 4 --> 0
|
expectedImportedServsCount: 0, // we want to see this decremented from 4 --> 0
|
||||||
|
expectedExportedServsCount: 0, // we want to see this decremented from 4 --> 0
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "just a, b services",
|
name: "just a, b services",
|
||||||
|
@ -804,7 +807,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedImportedExportedServicesCount: 2,
|
expectedImportedServsCount: 2,
|
||||||
|
expectedExportedServsCount: 2,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "unexport b service",
|
name: "unexport b service",
|
||||||
|
@ -822,7 +826,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedImportedExportedServicesCount: 1,
|
expectedImportedServsCount: 1,
|
||||||
|
expectedExportedServsCount: 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "export c service",
|
name: "export c service",
|
||||||
|
@ -848,7 +853,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedImportedExportedServicesCount: 2,
|
expectedImportedServsCount: 2,
|
||||||
|
expectedExportedServsCount: 2,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -872,13 +878,13 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
resp, err := peeringClient2.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
|
resp, err := peeringClient2.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
|
||||||
require.NoError(r, err)
|
require.NoError(r, err)
|
||||||
require.NotNil(r, resp.Peering)
|
require.NotNil(r, resp.Peering)
|
||||||
require.Equal(r, tc.expectedImportedExportedServicesCount, resp.Peering.ImportedServiceCount)
|
require.Equal(r, tc.expectedImportedServsCount, resp.Peering.ImportedServiceCount)
|
||||||
|
|
||||||
// on List
|
// on List
|
||||||
resp2, err2 := peeringClient2.PeeringList(ctx, &pbpeering.PeeringListRequest{})
|
resp2, err2 := peeringClient2.PeeringList(ctx, &pbpeering.PeeringListRequest{})
|
||||||
require.NoError(r, err2)
|
require.NoError(r, err2)
|
||||||
require.NotEmpty(r, resp2.Peerings)
|
require.NotEmpty(r, resp2.Peerings)
|
||||||
require.Equal(r, tc.expectedImportedExportedServicesCount, resp2.Peerings[0].ImportedServiceCount)
|
require.Equal(r, tc.expectedExportedServsCount, resp2.Peerings[0].ImportedServiceCount)
|
||||||
})
|
})
|
||||||
|
|
||||||
// Check that exported services count on S1 are what we expect
|
// Check that exported services count on S1 are what we expect
|
||||||
|
@ -887,13 +893,13 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
resp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s2"})
|
resp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s2"})
|
||||||
require.NoError(r, err)
|
require.NoError(r, err)
|
||||||
require.NotNil(r, resp.Peering)
|
require.NotNil(r, resp.Peering)
|
||||||
require.Equal(r, tc.expectedImportedExportedServicesCount, resp.Peering.ExportedServiceCount)
|
require.Equal(r, tc.expectedImportedServsCount, resp.Peering.ExportedServiceCount)
|
||||||
|
|
||||||
// on List
|
// on List
|
||||||
resp2, err2 := peeringClient.PeeringList(ctx, &pbpeering.PeeringListRequest{})
|
resp2, err2 := peeringClient.PeeringList(ctx, &pbpeering.PeeringListRequest{})
|
||||||
require.NoError(r, err2)
|
require.NoError(r, err2)
|
||||||
require.NotEmpty(r, resp2.Peerings)
|
require.NotEmpty(r, resp2.Peerings)
|
||||||
require.Equal(r, tc.expectedImportedExportedServicesCount, resp2.Peerings[0].ExportedServiceCount)
|
require.Equal(r, tc.expectedExportedServsCount, resp2.Peerings[0].ExportedServiceCount)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,7 +43,6 @@ func makeServiceResponse(
|
||||||
sn := structs.ServiceNameFromString(serviceName)
|
sn := structs.ServiceNameFromString(serviceName)
|
||||||
csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||||
if !ok {
|
if !ok {
|
||||||
logger.Error("did not increment or decrement exported services count", "service_name", serviceName)
|
|
||||||
return nil, fmt.Errorf("invalid type for service response: %T", update.Result)
|
return nil, fmt.Errorf("invalid type for service response: %T", update.Result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -63,7 +62,6 @@ func makeServiceResponse(
|
||||||
// We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that.
|
// We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that.
|
||||||
// Case #1 is a no-op for the importing peer.
|
// Case #1 is a no-op for the importing peer.
|
||||||
if len(csn.Nodes) == 0 {
|
if len(csn.Nodes) == 0 {
|
||||||
logger.Trace("decrementing exported services count", "service_name", sn.String())
|
|
||||||
mst.RemoveExportedService(sn)
|
mst.RemoveExportedService(sn)
|
||||||
|
|
||||||
return &pbpeerstream.ReplicationMessage_Response{
|
return &pbpeerstream.ReplicationMessage_Response{
|
||||||
|
@ -75,7 +73,6 @@ func makeServiceResponse(
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Trace("incrementing exported services count", "service_name", sn.String())
|
|
||||||
mst.TrackExportedService(sn)
|
mst.TrackExportedService(sn)
|
||||||
|
|
||||||
// If there are nodes in the response, we push them as an UPSERT operation.
|
// If there are nodes in the response, we push them as an UPSERT operation.
|
||||||
|
@ -220,7 +217,6 @@ func (s *Server) handleUpsert(
|
||||||
return fmt.Errorf("did not increment imported services count for service=%q: %w", sn.String(), err)
|
return fmt.Errorf("did not increment imported services count for service=%q: %w", sn.String(), err)
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Trace("incrementing imported services count", "service_name", sn.String())
|
|
||||||
mutableStatus.TrackImportedService(sn)
|
mutableStatus.TrackImportedService(sn)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -468,11 +464,9 @@ func (s *Server) handleDelete(
|
||||||
|
|
||||||
err := s.handleUpdateService(peerName, partition, sn, nil)
|
err := s.handleUpdateService(peerName, partition, sn, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("did not decrement imported services count", "service_name", sn.String(), "error", err)
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
logger.Trace("decrementing imported services count", "service_name", sn.String())
|
|
||||||
mutableStatus.RemoveImportedService(sn)
|
mutableStatus.RemoveImportedService(sn)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
@ -173,6 +173,14 @@ type Status struct {
|
||||||
ExportedServices map[string]struct{}
|
ExportedServices map[string]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Status) GetImportedServicesCount() uint64 {
|
||||||
|
return uint64(len(s.ImportedServices))
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *Status) GetExportedServicesCount() uint64 {
|
||||||
|
return uint64(len(s.ExportedServices))
|
||||||
|
}
|
||||||
|
|
||||||
func newMutableStatus(now func() time.Time, connected bool) *MutableStatus {
|
func newMutableStatus(now func() time.Time, connected bool) *MutableStatus {
|
||||||
return &MutableStatus{
|
return &MutableStatus{
|
||||||
Status: Status{
|
Status: Status{
|
||||||
|
|
|
@ -13,6 +13,7 @@ import (
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
grpcstatus "google.golang.org/grpc/status"
|
grpcstatus "google.golang.org/grpc/status"
|
||||||
|
"google.golang.org/protobuf/proto"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
|
@ -338,17 +339,7 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ
|
||||||
return &pbpeering.PeeringReadResponse{Peering: nil}, nil
|
return &pbpeering.PeeringReadResponse{Peering: nil}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
cp := copyPeeringWithNewState(peering, s.reconciledStreamStateHint(peering.ID, peering.State))
|
cp := s.reconcilePeering(peering)
|
||||||
|
|
||||||
// add imported services count
|
|
||||||
st, found := s.Tracker.StreamStatus(peering.ID)
|
|
||||||
if !found {
|
|
||||||
s.Logger.Trace("did not find peer in stream tracker when reading peer", "peerID", peering.ID)
|
|
||||||
} else {
|
|
||||||
cp.ImportedServiceCount = uint64(len(st.ImportedServices))
|
|
||||||
cp.ExportedServiceCount = uint64(len(st.ExportedServices))
|
|
||||||
}
|
|
||||||
|
|
||||||
return &pbpeering.PeeringReadResponse{Peering: cp}, nil
|
return &pbpeering.PeeringReadResponse{Peering: cp}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -379,34 +370,38 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ
|
||||||
// reconcile the actual peering state; need to copy over the ds for peering
|
// reconcile the actual peering state; need to copy over the ds for peering
|
||||||
var cPeerings []*pbpeering.Peering
|
var cPeerings []*pbpeering.Peering
|
||||||
for _, p := range peerings {
|
for _, p := range peerings {
|
||||||
cp := copyPeeringWithNewState(p, s.reconciledStreamStateHint(p.ID, p.State))
|
cp := s.reconcilePeering(p)
|
||||||
|
|
||||||
// add imported services count
|
|
||||||
st, found := s.Tracker.StreamStatus(p.ID)
|
|
||||||
if !found {
|
|
||||||
s.Logger.Trace("did not find peer in stream tracker when listing peers", "peerID", p.ID)
|
|
||||||
} else {
|
|
||||||
cp.ImportedServiceCount = uint64(len(st.ImportedServices))
|
|
||||||
cp.ExportedServiceCount = uint64(len(st.ExportedServices))
|
|
||||||
}
|
|
||||||
|
|
||||||
cPeerings = append(cPeerings, cp)
|
cPeerings = append(cPeerings, cp)
|
||||||
}
|
}
|
||||||
|
|
||||||
return &pbpeering.PeeringListResponse{Peerings: cPeerings}, nil
|
return &pbpeering.PeeringListResponse{Peerings: cPeerings}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(peering): Maybe get rid of this when actually monitoring the stream health
|
// TODO(peering): Get rid of this func when we stop using the stream tracker for imported/ exported services and the peering state
|
||||||
// reconciledStreamStateHint peaks into the streamTracker and determines whether a peering should be marked
|
// reconcilePeering enriches the peering with the following information:
|
||||||
// as PeeringState.Active or not
|
// -- PeeringState.Active if the peering is active
|
||||||
func (s *Server) reconciledStreamStateHint(pID string, pState pbpeering.PeeringState) pbpeering.PeeringState {
|
// -- ImportedServicesCount and ExportedServicesCount
|
||||||
streamState, found := s.Tracker.StreamStatus(pID)
|
// NOTE: we return a new peering with this additional data
|
||||||
|
func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering {
|
||||||
|
streamState, found := s.Tracker.StreamStatus(peering.ID)
|
||||||
|
if !found {
|
||||||
|
s.Logger.Warn("did not find peer in stream tracker; cannot populate imported and"+
|
||||||
|
" exported services count or reconcile peering state", "peerID", peering.ID)
|
||||||
|
return peering
|
||||||
|
} else {
|
||||||
|
cp := copyPeering(peering)
|
||||||
|
|
||||||
if found && streamState.Connected {
|
// reconcile pbpeering.PeeringState_Active
|
||||||
return pbpeering.PeeringState_ACTIVE
|
if streamState.Connected {
|
||||||
|
cp.State = pbpeering.PeeringState_ACTIVE
|
||||||
|
}
|
||||||
|
|
||||||
|
// add imported & exported services counts
|
||||||
|
cp.ImportedServiceCount = streamState.GetImportedServicesCount()
|
||||||
|
cp.ExportedServiceCount = streamState.GetExportedServicesCount()
|
||||||
|
|
||||||
|
return cp
|
||||||
}
|
}
|
||||||
|
|
||||||
// default, no reconciliation
|
|
||||||
return pState
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store.
|
// TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store.
|
||||||
|
@ -605,22 +600,9 @@ func (s *Server) getExistingOrCreateNewPeerID(peerName, partition string) (strin
|
||||||
return id, nil
|
return id, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func copyPeeringWithNewState(p *pbpeering.Peering, state pbpeering.PeeringState) *pbpeering.Peering {
|
func copyPeering(p *pbpeering.Peering) *pbpeering.Peering {
|
||||||
return &pbpeering.Peering{
|
var copyP pbpeering.Peering
|
||||||
ID: p.ID,
|
proto.Merge(©P, p)
|
||||||
Name: p.Name,
|
|
||||||
Partition: p.Partition,
|
|
||||||
DeletedAt: p.DeletedAt,
|
|
||||||
Meta: p.Meta,
|
|
||||||
PeerID: p.PeerID,
|
|
||||||
PeerCAPems: p.PeerCAPems,
|
|
||||||
PeerServerAddresses: p.PeerServerAddresses,
|
|
||||||
PeerServerName: p.PeerServerName,
|
|
||||||
CreateIndex: p.CreateIndex,
|
|
||||||
ModifyIndex: p.ModifyIndex,
|
|
||||||
ImportedServiceCount: p.ImportedServiceCount,
|
|
||||||
ExportedServiceCount: p.ExportedServiceCount,
|
|
||||||
|
|
||||||
State: state,
|
return ©P
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,9 +152,7 @@ func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) {
|
||||||
})
|
})
|
||||||
defer s2.Stop()
|
defer s2.Stop()
|
||||||
|
|
||||||
testutil.RunStep(t, "register services to get synced dc2", func(t *testing.T) {
|
testNodeServiceCheckRegistrations(t, c2, "dc2")
|
||||||
testNodeServiceCheckRegistrations(t, c2, "dc2")
|
|
||||||
})
|
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
Loading…
Reference in New Issue