diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go
index fd5e474d41..3bcafa986e 100644
--- a/agent/rpc/peering/service.go
+++ b/agent/rpc/peering/service.go
@@ -344,7 +344,11 @@ func (s *Service) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadReq
 	if err != nil {
 		return nil, err
 	}
-	return &pbpeering.PeeringReadResponse{Peering: peering}, nil
+	if peering == nil {
+		return &pbpeering.PeeringReadResponse{Peering: nil}, nil
+	}
+	cp := copyPeeringWithNewState(peering, s.reconciledStreamStateHint(peering.ID, peering.State))
+	return &pbpeering.PeeringReadResponse{Peering: cp}, nil
 }
 
 func (s *Service) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequest) (*pbpeering.PeeringListResponse, error) {
@@ -370,7 +374,28 @@ func (s *Service) PeeringList(ctx context.Context, req *pbpeering.PeeringListReq
 	if err != nil {
 		return nil, err
 	}
-	return &pbpeering.PeeringListResponse{Peerings: peerings}, nil
+
+	// reconcile the actual peering state; copy each peering so the object owned by the state store is not mutated
+	var cPeerings []*pbpeering.Peering
+	for _, p := range peerings {
+		cp := copyPeeringWithNewState(p, s.reconciledStreamStateHint(p.ID, p.State))
+		cPeerings = append(cPeerings, cp)
+	}
+	return &pbpeering.PeeringListResponse{Peerings: cPeerings}, nil
+}
+
+// TODO(peering): Maybe get rid of this when actually monitoring the stream health.
+// reconciledStreamStateHint peeks into the streamTracker and determines whether a peering
+// should be marked as PeeringState_ACTIVE or not.
+func (s *Service) reconciledStreamStateHint(pID string, pState pbpeering.PeeringState) pbpeering.PeeringState {
+	streamState, found := s.streams.streamStatus(pID)
+
+	if found && streamState.Connected {
+		return pbpeering.PeeringState_ACTIVE
+	}
+
+	// default: no reconciliation
+	return pState
 }
 
 // TODO(peering): As of writing, this method is only used in tests to set up Peerings in the state store.
@@ -930,3 +955,21 @@ func logTraceProto(logger hclog.Logger, pb proto.Message, received bool) {
 
 	logger.Trace("replication message", "direction", dir, "protobuf", out)
 }
+
+func copyPeeringWithNewState(p *pbpeering.Peering, state pbpeering.PeeringState) *pbpeering.Peering {
+	return &pbpeering.Peering{
+		ID:                  p.ID,
+		Name:                p.Name,
+		Partition:           p.Partition,
+		DeletedAt:           p.DeletedAt,
+		Meta:                p.Meta,
+		PeerID:              p.PeerID,
+		PeerCAPems:          p.PeerCAPems,
+		PeerServerAddresses: p.PeerServerAddresses,
+		PeerServerName:      p.PeerServerName,
+		CreateIndex:         p.CreateIndex,
+		ModifyIndex:         p.ModifyIndex,
+
+		State: state,
+	}
+}
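
Note on the service.go change: connecting or disconnecting a stream never writes a state transition to the state store; instead, the read path overlays live stream health onto the stored state at query time, and the TODO suggests this hint goes away once stream health is monitored for real. Below is a minimal, self-contained sketch of the reconciliation rule — the types are stand-ins I made up, not pbpeering.Peering or the real streamTracker; only the decision logic mirrors the patch.

// Reviewer sketch (not part of the patch): stand-in types; only the
// reconciliation rule mirrors reconciledStreamStateHint above.
package main

import "fmt"

type peeringState string

const (
	stateInitial peeringState = "INITIAL"
	stateActive  peeringState = "ACTIVE"
)

// trackerStub stands in for the streamTracker: stream connectivity keyed by peering ID.
type trackerStub map[string]bool

// reconciledStateHint applies the same rule as the patch: a connected
// stream forces ACTIVE; otherwise the stored state passes through.
func (tr trackerStub) reconciledStateHint(id string, stored peeringState) peeringState {
	if connected, found := tr[id]; found && connected {
		return stateActive
	}
	return stored
}

func main() {
	tr := trackerStub{"peer-a": true}
	fmt.Println(tr.reconciledStateHint("peer-a", stateInitial)) // ACTIVE: stream is up
	fmt.Println(tr.reconciledStateHint("peer-b", stateInitial)) // INITIAL: no stream tracked
}

Because the hint is computed per read, two consecutive reads can disagree if the stream flaps between them; callers that need a stable answer should poll, as the api tests below do.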
diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go
index 48e45e27b8..7db5a349cf 100644
--- a/agent/rpc/peering/service_test.go
+++ b/agent/rpc/peering/service_test.go
@@ -853,6 +853,26 @@ func Test_StreamHandler_UpsertServices(t *testing.T) {
 			run(t, tc)
 		})
 	}
+
+	// call PeeringRead and check that the peering state is active
+	{
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		t.Cleanup(cancel)
+
+		resp, err := srv.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: localPeerName})
+		require.NoError(t, err)
+		require.Equal(t, pbpeering.PeeringState_ACTIVE, resp.Peering.State)
+	}
+
+	// call PeeringList and check that the peering state is active
+	{
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		t.Cleanup(cancel)
+
+		resp, err := srv.PeeringList(ctx, &pbpeering.PeeringListRequest{})
+		require.NoError(t, err)
+		require.Equal(t, pbpeering.PeeringState_ACTIVE, resp.Peerings[0].State)
+	}
 }
 
 // newTestServer is copied from partition/service_test.go, with the addition of certs/cas.
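
Why the read path copies instead of setting State in place: PeeringRead and PeeringList appear to hand back objects owned by the state store, so mutating them would corrupt stored data. A sketch of the aliasing hazard the copy avoids, using a made-up stand-in struct rather than pbpeering.Peering:

// Reviewer sketch (not part of the patch): a stand-in showing why
// copyPeeringWithNewState returns a fresh struct instead of setting
// State on the stored object.
package main

import "fmt"

type peering struct {
	ID    string
	State string
}

// store hands out pointers to its own objects, as a memdb-style store does;
// callers must treat them as read-only.
var store = map[string]*peering{"p1": {ID: "p1", State: "INITIAL"}}

func copyWithNewState(p *peering, state string) *peering {
	cp := *p // shallow copy; only State is overridden
	cp.State = state
	return &cp
}

func main() {
	resp := copyWithNewState(store["p1"], "ACTIVE")
	fmt.Println(resp.State)        // ACTIVE: the response reflects stream health
	fmt.Println(store["p1"].State) // INITIAL: the stored object is untouched
}

One maintenance caveat grounded in the patch: copyPeeringWithNewState copies field by field, so any field added to pbpeering.Peering later must also be added there or it will be silently dropped from read responses.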
diff --git a/api/peering_test.go b/api/peering_test.go
index 015da723f3..b8bfaa264f 100644
--- a/api/peering_test.go
+++ b/api/peering_test.go
@@ -140,11 +140,21 @@ func TestAPI_Peering_GenerateToken(t *testing.T) {
 
 func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) {
 	t.Parallel()
-	c, s := makeClientWithCA(t)
+	c, s := makeClient(t) // this is "dc1"
 	defer s.Stop()
 	s.WaitForSerfCheck(t)
 
-	ctx, cancel := context.WithTimeout(context.Background(), DefaultCtxDuration)
+	// make a "client" server in second DC for peering
+	c2, s2 := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
+		conf.Datacenter = "dc2"
+	})
+	defer s2.Stop()
+
+	testutil.RunStep(t, "register services to get synced in dc2", func(t *testing.T) {
+		testNodeServiceCheckRegistrations(t, c2, "dc2")
+	})
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
 	defer cancel()
 
 	var token1 string
@@ -175,12 +185,6 @@ func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) {
 		require.Equal(t, map[string]string{"foo": "bar"}, resp.Meta)
 	})
 
-	// make a "client" server in second DC for peering
-	c2, s2 := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
-		conf.Datacenter = "dc2"
-	})
-	defer s2.Stop()
-
 	testutil.RunStep(t, "establish peering", func(t *testing.T) {
 		i := PeeringEstablishRequest{
 			Datacenter: c2.config.Datacenter,
@@ -201,8 +205,42 @@ func TestAPI_Peering_GenerateToken_Read_Establish_Delete(t *testing.T) {
 
 			// require that the peering state is not undefined
 			require.Equal(r, PeeringStateInitial, resp.State)
 			require.Equal(r, map[string]string{"foo": "bar"}, resp.Meta)
+		})
+	})
 
-			// TODO(peering) -- let's go all the way and test in code either here or somewhere else that PeeringState does move to Active
+	testutil.RunStep(t, "look for active state of peering in dc2", func(t *testing.T) {
+		// read and list the peer to make sure the status transitions to active
+		retry.Run(t, func(r *retry.R) {
+			peering, qm, err := c2.Peerings().Read(ctx, "peer1", nil)
+			require.NoError(r, err)
+			require.NotNil(r, qm)
+			require.NotNil(r, peering)
+			require.Equal(r, PeeringStateActive, peering.State)
+
+			peerings, qm, err := c2.Peerings().List(ctx, nil)
+
+			require.NoError(r, err)
+			require.NotNil(r, qm)
+			require.NotNil(r, peerings)
+			require.Equal(r, PeeringStateActive, peerings[0].State)
+		})
+	})
+
+	testutil.RunStep(t, "look for active state of peering in dc1", func(t *testing.T) {
+		// read and list the peer to make sure the status transitions to active
+		retry.Run(t, func(r *retry.R) {
+			peering, qm, err := c.Peerings().Read(ctx, "peer1", nil)
+			require.NoError(r, err)
+			require.NotNil(r, qm)
+			require.NotNil(r, peering)
+			require.Equal(r, PeeringStateActive, peering.State)
+
+			peerings, qm, err := c.Peerings().List(ctx, nil)
+
+			require.NoError(r, err)
+			require.NotNil(r, qm)
+			require.NotNil(r, peerings)
+			require.Equal(r, PeeringStateActive, peerings[0].State)
 		})
 	})
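
Since ACTIVE is a read-time hint derived from stream health, API consumers should poll for it rather than expect a persisted transition — that is what the retry.Run blocks above do. As a usage illustration, here is a hypothetical helper built on the same api-package calls the test exercises; Peerings().Read and PeeringStateActive come from the patch, while waitForActivePeering, its timing, and the peer name are invented for the example.

// Reviewer sketch (not part of the patch): poll a peering until it
// reports ACTIVE or the context expires.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/hashicorp/consul/api"
)

// waitForActivePeering polls the named peering. State is overlaid from
// stream health at read time, so polling (not a single read) is the
// right shape here.
func waitForActivePeering(ctx context.Context, c *api.Client, name string) error {
	for {
		p, _, err := c.Peerings().Read(ctx, name, nil)
		if err == nil && p != nil && p.State == api.PeeringStateActive {
			return nil
		}
		select {
		case <-ctx.Done():
			return fmt.Errorf("peering %q did not become active: %w", name, ctx.Err())
		case <-time.After(500 * time.Millisecond):
		}
	}
}

func main() {
	client, err := api.NewClient(api.DefaultConfig())
	if err != nil {
		panic(err)
	}
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := waitForActivePeering(ctx, client, "peer1"); err != nil {
		panic(err)
	}
	fmt.Println("peering peer1 is active")
}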