From adb5ffa1a6dd025822d264b75c203a4e3674fcf0 Mon Sep 17 00:00:00 2001 From: alex <8968914+acpana@users.noreply.github.com> Date: Fri, 15 Jul 2022 10:20:43 -0700 Subject: [PATCH] peering: track imported services (#13718) --- agent/consul/leader_peering_test.go | 277 +++++++++++++++++- .../services/peerstream/replication.go | 33 ++- .../services/peerstream/stream_resources.go | 2 +- .../services/peerstream/stream_test.go | 77 ++++- .../services/peerstream/stream_tracker.go | 33 ++- agent/rpc/peering/service.go | 43 ++- 6 files changed, 422 insertions(+), 43 deletions(-) diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 222b59279a..1587fc30c7 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -17,6 +17,7 @@ import ( "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" + "github.com/hashicorp/consul/types" ) func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { @@ -309,11 +310,6 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId Node: "aaa", PeerName: peer, }, - { - CheckID: structs.SerfCheckID, - Node: "aaa", - PeerName: peer, - }, }, })) @@ -336,11 +332,6 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId Node: "bbb", PeerName: peer, }, - { - CheckID: structs.SerfCheckID, - Node: "bbb", - PeerName: peer, - }, }, })) @@ -363,13 +354,269 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId Node: "ccc", PeerName: peer, }, - { - CheckID: structs.SerfCheckID, - Node: "ccc", - PeerName: peer, - }, }, })) return lastIdx } + +// TODO(peering): once we move away from leader only request for PeeringList, move this test to consul/server_test maybe +func TestLeader_Peering_ImportedServicesCount(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + // TODO(peering): Configure with TLS + _, s1 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "s1.dc1" + c.Datacenter = "dc1" + c.TLSConfig.Domain = "consul" + }) + testrpc.WaitForLeader(t, s1.RPC, "dc1") + + // Create a peering by generating a token + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + t.Cleanup(cancel) + + conn, err := grpc.DialContext(ctx, s1.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn.Close() + + peeringClient := pbpeering.NewPeeringServiceClient(conn) + + req := pbpeering.GenerateTokenRequest{ + PeerName: "my-peer-s2", + } + resp, err := peeringClient.GenerateToken(ctx, &req) + require.NoError(t, err) + + tokenJSON, err := base64.StdEncoding.DecodeString(resp.PeeringToken) + require.NoError(t, err) + + var token structs.PeeringToken + require.NoError(t, json.Unmarshal(tokenJSON, &token)) + + var ( + s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" + lastIdx = uint64(0) + ) + + // Bring up s2 and store s1's token so that it attempts to dial. + _, s2 := testServerWithConfig(t, func(c *Config) { + c.NodeName = "s2.dc2" + c.Datacenter = "dc2" + c.PrimaryDatacenter = "dc2" + }) + testrpc.WaitForLeader(t, s2.RPC, "dc2") + + // Simulate a peering initiation event by writing a peering with data from a peering token. + // Eventually the leader in dc2 should dial and connect to the leader in dc1. + p := &pbpeering.Peering{ + ID: s2PeerID, + Name: "my-peer-s1", + PeerID: token.PeerID, + PeerCAPems: token.CA, + PeerServerName: token.ServerName, + PeerServerAddresses: token.ServerAddresses, + } + require.True(t, p.ShouldDial()) + + lastIdx++ + require.NoError(t, s2.fsm.State().PeeringWrite(lastIdx, p)) + + /// add services to S1 to be synced to S2 + lastIdx++ + require.NoError(t, s1.FSM().State().EnsureRegistration(lastIdx, &structs.RegisterRequest{ + ID: types.NodeID(generateUUID()), + Node: "aaa", + Address: "10.0.0.1", + Service: &structs.NodeService{ + Service: "a-service", + ID: "a-service-1", + Port: 8080, + }, + Checks: structs.HealthChecks{ + { + CheckID: "a-service-1-check", + ServiceName: "a-service", + ServiceID: "a-service-1", + Node: "aaa", + }, + }, + })) + + lastIdx++ + require.NoError(t, s1.FSM().State().EnsureRegistration(lastIdx, &structs.RegisterRequest{ + ID: types.NodeID(generateUUID()), + + Node: "bbb", + Address: "10.0.0.2", + Service: &structs.NodeService{ + Service: "b-service", + ID: "b-service-1", + Port: 8080, + }, + Checks: structs.HealthChecks{ + { + CheckID: "b-service-1-check", + ServiceName: "b-service", + ServiceID: "b-service-1", + Node: "bbb", + }, + }, + })) + + lastIdx++ + require.NoError(t, s1.FSM().State().EnsureRegistration(lastIdx, &structs.RegisterRequest{ + ID: types.NodeID(generateUUID()), + + Node: "ccc", + Address: "10.0.0.3", + Service: &structs.NodeService{ + Service: "c-service", + ID: "c-service-1", + Port: 8080, + }, + Checks: structs.HealthChecks{ + { + CheckID: "c-service-1-check", + ServiceName: "c-service", + ServiceID: "c-service-1", + Node: "ccc", + }, + }, + })) + /// finished adding services + + type testCase struct { + name string + description string + exportedService structs.ExportedServicesConfigEntry + expectedImportedServicesCount uint64 + } + + testCases := []testCase{ + { + name: "wildcard", + description: "for a wildcard exported services, we want to see all services synced", + exportedService: structs.ExportedServicesConfigEntry{ + Name: "default", + Services: []structs.ExportedService{ + { + Name: structs.WildcardSpecifier, + Consumers: []structs.ServiceConsumer{ + { + PeerName: "my-peer-s2", + }, + }, + }, + }, + }, + expectedImportedServicesCount: 4, // 3 services from above + the "consul" service + }, + { + name: "no sync", + description: "update the config entry to allow no service sync", + exportedService: structs.ExportedServicesConfigEntry{ + Name: "default", + }, + expectedImportedServicesCount: 0, // we want to see this decremented from 4 --> 0 + }, + { + name: "just a, b services", + description: "export just two services", + exportedService: structs.ExportedServicesConfigEntry{ + Name: "default", + Services: []structs.ExportedService{ + { + Name: "a-service", + Consumers: []structs.ServiceConsumer{ + { + PeerName: "my-peer-s2", + }, + }, + }, + { + Name: "b-service", + Consumers: []structs.ServiceConsumer{ + { + PeerName: "my-peer-s2", + }, + }, + }, + }, + }, + expectedImportedServicesCount: 2, + }, + { + name: "unexport b service", + description: "by unexporting b we want to see the count decrement eventually", + exportedService: structs.ExportedServicesConfigEntry{ + Name: "default", + Services: []structs.ExportedService{ + { + Name: "a-service", + Consumers: []structs.ServiceConsumer{ + { + PeerName: "my-peer-s2", + }, + }, + }, + }, + }, + expectedImportedServicesCount: 1, + }, + { + name: "export c service", + description: "now export the c service and expect the count to increment", + exportedService: structs.ExportedServicesConfigEntry{ + Name: "default", + Services: []structs.ExportedService{ + { + Name: "a-service", + Consumers: []structs.ServiceConsumer{ + { + PeerName: "my-peer-s2", + }, + }, + }, + { + Name: "c-service", + Consumers: []structs.ServiceConsumer{ + { + PeerName: "my-peer-s2", + }, + }, + }, + }, + }, + expectedImportedServicesCount: 2, + }, + } + + conn2, err := grpc.DialContext(ctx, s2.config.RPCAddr.String(), + grpc.WithContextDialer(newServerDialer(s2.config.RPCAddr.String())), + grpc.WithInsecure(), + grpc.WithBlock()) + require.NoError(t, err) + defer conn2.Close() + + peeringClient2 := pbpeering.NewPeeringServiceClient(conn2) + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + lastIdx++ + require.NoError(t, s1.fsm.State().EnsureConfigEntry(lastIdx, &tc.exportedService)) + + retry.Run(t, func(r *retry.R) { + resp2, err := peeringClient2.PeeringList(ctx, &pbpeering.PeeringListRequest{}) + require.NoError(r, err) + require.NotEmpty(r, resp2.Peerings) + require.Equal(r, tc.expectedImportedServicesCount, resp2.Peerings[0].ImportedServiceCount) + }) + }) + } +} diff --git a/agent/grpc-external/services/peerstream/replication.go b/agent/grpc-external/services/peerstream/replication.go index 7ec58b5f4e..e21b48a638 100644 --- a/agent/grpc-external/services/peerstream/replication.go +++ b/agent/grpc-external/services/peerstream/replication.go @@ -113,7 +113,9 @@ func marshalToProtoAny[T proto.Message](in any) (*anypb.Any, T, error) { func (s *Server) processResponse( peerName string, partition string, + mutableStatus *MutableStatus, resp *pbpeerstream.ReplicationMessage_Response, + logger hclog.Logger, ) (*pbpeerstream.ReplicationMessage, error) { if !pbpeerstream.KnownTypeURL(resp.ResourceURL) { err := fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL) @@ -137,7 +139,7 @@ func (s *Server) processResponse( ), err } - if err := s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource); err != nil { + if err := s.handleUpsert(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID, resp.Resource, logger); err != nil { return makeNACKReply( resp.ResourceURL, resp.Nonce, @@ -149,7 +151,7 @@ func (s *Server) processResponse( return makeACKReply(resp.ResourceURL, resp.Nonce), nil case pbpeerstream.Operation_OPERATION_DELETE: - if err := s.handleDelete(peerName, partition, resp.ResourceURL, resp.ResourceID); err != nil { + if err := s.handleDelete(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID, logger); err != nil { return makeNACKReply( resp.ResourceURL, resp.Nonce, @@ -178,9 +180,11 @@ func (s *Server) processResponse( func (s *Server) handleUpsert( peerName string, partition string, + mutableStatus *MutableStatus, resourceURL string, resourceID string, resource *anypb.Any, + logger hclog.Logger, ) error { switch resourceURL { case pbpeerstream.TypeURLService: @@ -192,7 +196,16 @@ func (s *Server) handleUpsert( return fmt.Errorf("failed to unmarshal resource: %w", err) } - return s.handleUpdateService(peerName, partition, sn, csn) + err := s.handleUpdateService(peerName, partition, sn, csn) + if err != nil { + logger.Error("did not increment imported services count", "service_name", sn.String(), "error", err) + return err + } + + logger.Trace("incrementing imported services count", "service_name", sn.String()) + mutableStatus.TrackImportedService(sn) + + return nil case pbpeerstream.TypeURLRoots: roots := &pbpeering.PeeringTrustBundle{} @@ -425,14 +438,26 @@ func (s *Server) handleUpsertRoots( func (s *Server) handleDelete( peerName string, partition string, + mutableStatus *MutableStatus, resourceURL string, resourceID string, + logger hclog.Logger, ) error { switch resourceURL { case pbpeerstream.TypeURLService: sn := structs.ServiceNameFromString(resourceID) sn.OverridePartition(partition) - return s.handleUpdateService(peerName, partition, sn, nil) + + err := s.handleUpdateService(peerName, partition, sn, nil) + if err != nil { + logger.Error("did not decrement imported services count", "service_name", sn.String(), "error", err) + return err + } + + logger.Trace("decrementing imported services count", "service_name", sn.String()) + mutableStatus.RemoveImportedService(sn) + + return nil default: return fmt.Errorf("unexpected resourceURL: %s", resourceURL) diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 6113400827..eabd011412 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -302,7 +302,7 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error { if resp := msg.GetResponse(); resp != nil { // TODO(peering): Ensure there's a nonce - reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, resp) + reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp, logger) if err != nil { logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID) status.TrackReceiveError(err.Error()) diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index de1455a632..612513158e 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -475,6 +475,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { LastNack: lastNack, LastNackMessage: lastNackMsg, LastReceiveSuccess: lastRecvSuccess, + ImportedServices: map[string]struct{}{"api": {}}, } retry.Run(t, func(r *retry.R) { @@ -532,6 +533,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { LastReceiveSuccess: lastRecvSuccess, LastReceiveError: lastRecvError, LastReceiveErrorMessage: lastRecvErrorMsg, + ImportedServices: map[string]struct{}{"api": {}}, } retry.Run(t, func(r *retry.R) { @@ -559,6 +561,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { LastReceiveSuccess: lastRecvSuccess, LastReceiveErrorMessage: io.EOF.Error(), LastReceiveError: lastRecvError, + ImportedServices: map[string]struct{}{"api": {}}, } retry.Run(t, func(r *retry.R) { @@ -968,6 +971,9 @@ func (b *testStreamBackend) CatalogDeregister(req *structs.DeregisterRequest) er } func Test_processResponse_Validation(t *testing.T) { + peerName := "billing" + peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5" + type testCase struct { name string in *pbpeerstream.ReplicationMessage_Response @@ -975,10 +981,18 @@ func Test_processResponse_Validation(t *testing.T) { wantErr bool } - srv, _ := newTestServer(t, nil) + srv, store := newTestServer(t, nil) + require.NoError(t, store.PeeringWrite(31, &pbpeering.Peering{ + ID: peerID, + Name: peerName}, + )) + + // connect the stream + mst, err := srv.Tracker.Connected(peerID) + require.NoError(t, err) run := func(t *testing.T, tc testCase) { - reply, err := srv.processResponse("", "", tc.in) + reply, err := srv.processResponse(peerName, "", mst, tc.in, srv.Logger) if tc.wantErr { require.Error(t, err) } else { @@ -1218,8 +1232,8 @@ func expectReplEvents(t *testing.T, client *MockClient, checkFns ...func(t *test } } -func TestHandleUpdateService(t *testing.T) { - srv, _ := newTestServer(t, func(c *Config) { +func Test_processResponse_handleUpsert_handleDelete(t *testing.T) { + srv, store := newTestServer(t, func(c *Config) { backend := c.Backend.(*testStreamBackend) backend.leader = func() bool { return false @@ -1227,13 +1241,15 @@ func TestHandleUpdateService(t *testing.T) { }) type testCase struct { - name string - seed []*structs.RegisterRequest - input *pbservice.IndexedCheckServiceNodes - expect map[string]structs.CheckServiceNodes + name string + seed []*structs.RegisterRequest + input *pbservice.IndexedCheckServiceNodes + expect map[string]structs.CheckServiceNodes + expectedImportedServicesCount int } peerName := "billing" + peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5" remoteMeta := pbcommon.NewEnterpriseMetaFromStructs(*structs.DefaultEnterpriseMetaInPartition("billing-ap")) // "api" service is imported from the billing-ap partition, corresponding to the billing peer. @@ -1241,14 +1257,43 @@ func TestHandleUpdateService(t *testing.T) { defaultMeta := *acl.DefaultEnterpriseMeta() apiSN := structs.NewServiceName("api", &defaultMeta) + // create a peering in the state store + require.NoError(t, store.PeeringWrite(31, &pbpeering.Peering{ + ID: peerID, + Name: peerName}, + )) + + // connect the stream + mst, err := srv.Tracker.Connected(peerID) + require.NoError(t, err) + run := func(t *testing.T, tc testCase) { // Seed the local catalog with some data to reconcile against. + // and increment the tracker's imported services count for _, reg := range tc.seed { require.NoError(t, srv.Backend.CatalogRegister(reg)) + + mst.TrackImportedService(reg.Service.CompoundServiceName()) + } + + var op pbpeerstream.Operation + if len(tc.input.Nodes) == 0 { + op = pbpeerstream.Operation_OPERATION_DELETE + } else { + op = pbpeerstream.Operation_OPERATION_UPSERT + } + + in := &pbpeerstream.ReplicationMessage_Response{ + ResourceURL: pbpeerstream.TypeURLService, + ResourceID: apiSN.String(), + Nonce: "1", + Operation: op, + Resource: makeAnyPB(t, tc.input), } // Simulate an update arriving for billing/api. - require.NoError(t, srv.handleUpdateService(peerName, acl.DefaultPartitionName, apiSN, tc.input)) + _, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, in, srv.Logger) + require.NoError(t, err) for svc, expect := range tc.expect { t.Run(svc, func(t *testing.T) { @@ -1257,6 +1302,9 @@ func TestHandleUpdateService(t *testing.T) { requireEqualInstances(t, expect, got) }) } + + // assert the imported services count modifications + require.Equal(t, tc.expectedImportedServicesCount, mst.GetImportedServicesCount()) } tt := []testCase{ @@ -1390,6 +1438,7 @@ func TestHandleUpdateService(t *testing.T) { }, }, }, + expectedImportedServicesCount: 1, }, { name: "upsert two service instances to different nodes", @@ -1521,6 +1570,7 @@ func TestHandleUpdateService(t *testing.T) { }, }, }, + expectedImportedServicesCount: 1, }, { name: "receiving a nil input leads to deleting data in the catalog", @@ -1574,10 +1624,11 @@ func TestHandleUpdateService(t *testing.T) { }, }, }, - input: nil, + input: &pbservice.IndexedCheckServiceNodes{}, expect: map[string]structs.CheckServiceNodes{ "api": {}, }, + expectedImportedServicesCount: 0, }, { name: "deleting one service name from a node does not delete other service names", @@ -1632,7 +1683,7 @@ func TestHandleUpdateService(t *testing.T) { }, }, // Nil input is for the "api" service. - input: nil, + input: &pbservice.IndexedCheckServiceNodes{}, expect: map[string]structs.CheckServiceNodes{ "api": {}, // Existing redis service was not affected by deletion. @@ -1668,6 +1719,7 @@ func TestHandleUpdateService(t *testing.T) { }, }, }, + expectedImportedServicesCount: 1, }, { name: "service checks are cleaned up when not present in a response", @@ -1738,6 +1790,7 @@ func TestHandleUpdateService(t *testing.T) { }, }, }, + expectedImportedServicesCount: 2, }, { name: "node checks are cleaned up when not present in a response", @@ -1872,6 +1925,7 @@ func TestHandleUpdateService(t *testing.T) { }, }, }, + expectedImportedServicesCount: 2, }, { name: "replacing a service instance on a node cleans up the old instance", @@ -2019,6 +2073,7 @@ func TestHandleUpdateService(t *testing.T) { }, }, }, + expectedImportedServicesCount: 2, }, } diff --git a/agent/grpc-external/services/peerstream/stream_tracker.go b/agent/grpc-external/services/peerstream/stream_tracker.go index 5ec0f7ebf7..4d4c8746cd 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker.go +++ b/agent/grpc-external/services/peerstream/stream_tracker.go @@ -4,9 +4,11 @@ import ( "fmt" "sync" "time" + + "github.com/hashicorp/consul/agent/structs" ) -// Tracker contains a map of (PeerID -> Status). +// Tracker contains a map of (PeerID -> MutableStatus). // As streams are opened and closed we track details about their status. type Tracker struct { mu sync.RWMutex @@ -142,6 +144,10 @@ type Status struct { // - The error message when we failed to store a resource replicated FROM the peer. // - The last error message when receiving from the stream. LastReceiveErrorMessage string + + // TODO(peering): consider keeping track of imported service counts thru raft + // ImportedServices is set that keeps track of which service names are imported for the peer + ImportedServices map[string]struct{} } func newMutableStatus(now func() time.Time) *MutableStatus { @@ -222,3 +228,28 @@ func (s *MutableStatus) GetStatus() Status { return copy } + +func (s *MutableStatus) RemoveImportedService(sn structs.ServiceName) { + s.mu.Lock() + defer s.mu.Unlock() + + delete(s.ImportedServices, sn.String()) +} + +func (s *MutableStatus) TrackImportedService(sn structs.ServiceName) { + s.mu.Lock() + defer s.mu.Unlock() + + if s.ImportedServices == nil { + s.ImportedServices = make(map[string]struct{}) + } + + s.ImportedServices[sn.String()] = struct{}{} +} + +func (s *MutableStatus) GetImportedServicesCount() int { + s.mu.RLock() + defer s.mu.RUnlock() + + return len(s.ImportedServices) +} diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 94b7d73a30..47e39c2b7b 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -337,7 +337,17 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ if peering == nil { return &pbpeering.PeeringReadResponse{Peering: nil}, nil } + cp := copyPeeringWithNewState(peering, s.reconciledStreamStateHint(peering.ID, peering.State)) + + // add imported services count + st, found := s.Tracker.StreamStatus(peering.ID) + if !found { + s.Logger.Trace("did not find peer in stream tracker when reading peer", "peerID", peering.ID) + } else { + cp.ImportedServiceCount = uint64(len(st.ImportedServices)) + } + return &pbpeering.PeeringReadResponse{Peering: cp}, nil } @@ -369,6 +379,15 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ var cPeerings []*pbpeering.Peering for _, p := range peerings { cp := copyPeeringWithNewState(p, s.reconciledStreamStateHint(p.ID, p.State)) + + // add imported services count + st, found := s.Tracker.StreamStatus(p.ID) + if !found { + s.Logger.Trace("did not find peer in stream tracker when listing peers", "peerID", p.ID) + } else { + cp.ImportedServiceCount = uint64(len(st.ImportedServices)) + } + cPeerings = append(cPeerings, cp) } return &pbpeering.PeeringListResponse{Peerings: cPeerings}, nil @@ -586,17 +605,19 @@ func (s *Server) getExistingOrCreateNewPeerID(peerName, partition string) (strin func copyPeeringWithNewState(p *pbpeering.Peering, state pbpeering.PeeringState) *pbpeering.Peering { return &pbpeering.Peering{ - ID: p.ID, - Name: p.Name, - Partition: p.Partition, - DeletedAt: p.DeletedAt, - Meta: p.Meta, - PeerID: p.PeerID, - PeerCAPems: p.PeerCAPems, - PeerServerAddresses: p.PeerServerAddresses, - PeerServerName: p.PeerServerName, - CreateIndex: p.CreateIndex, - ModifyIndex: p.ModifyIndex, + ID: p.ID, + Name: p.Name, + Partition: p.Partition, + DeletedAt: p.DeletedAt, + Meta: p.Meta, + PeerID: p.PeerID, + PeerCAPems: p.PeerCAPems, + PeerServerAddresses: p.PeerServerAddresses, + PeerServerName: p.PeerServerName, + CreateIndex: p.CreateIndex, + ModifyIndex: p.ModifyIndex, + ImportedServiceCount: p.ImportedServiceCount, + ExportedServiceCount: p.ExportedServiceCount, State: state, }