diff --git a/agent/rpc/peering/replication.go b/agent/rpc/peering/replication.go new file mode 100644 index 0000000000..e820d959fe --- /dev/null +++ b/agent/rpc/peering/replication.go @@ -0,0 +1,247 @@ +package peering + +import ( + "errors" + "fmt" + "strconv" + "strings" + + "github.com/golang/protobuf/ptypes" + "github.com/hashicorp/go-hclog" + "google.golang.org/genproto/googleapis/rpc/code" + "google.golang.org/protobuf/types/known/anypb" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/cache" + "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/proto/pbpeering" + "github.com/hashicorp/consul/proto/pbservice" + "github.com/hashicorp/consul/proto/pbstatus" + "github.com/hashicorp/consul/types" +) + +// pushService response handles sending exported service instance updates to the peer cluster. +// Each cache.UpdateEvent will contain all instances for a service name. +// If there are no instances in the event, we consider that to be a de-registration. +func pushServiceResponse(logger hclog.Logger, stream BidirectionalStream, status *lockableStreamStatus, update cache.UpdateEvent) error { + csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) + if !ok { + logger.Error(fmt.Sprintf("invalid type for response: %T, expected *pbservice.IndexedCheckServiceNodes", update.Result)) + + // Skip this update to avoid locking up peering due to a bad service update. + return nil + } + serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService) + + // If no nodes are present then it's due to one of: + // 1. The service is newly registered or exported and yielded a transient empty update. + // 2. All instances of the service were de-registered. + // 3. The service was un-exported. + // + // We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that. + // Case #1 is a no-op for the importing peer. + if len(csn.Nodes) == 0 { + resp := &pbpeering.ReplicationMessage{ + Payload: &pbpeering.ReplicationMessage_Response_{ + Response: &pbpeering.ReplicationMessage_Response{ + ResourceURL: pbpeering.TypeURLService, + // TODO(peering): Nonce management + Nonce: "", + ResourceID: serviceName, + Operation: pbpeering.ReplicationMessage_Response_DELETE, + }, + }, + } + logTraceSend(logger, resp) + if err := stream.Send(resp); err != nil { + status.trackSendError(err.Error()) + return fmt.Errorf("failed to send to stream: %v", err) + } + return nil + } + + // If there are nodes in the response, we push them as an UPSERT operation. + any, err := ptypes.MarshalAny(csn) + if err != nil { + // Log the error and skip this response to avoid locking up peering due to a bad update event. + logger.Error("failed to marshal service endpoints", "error", err) + return nil + } + resp := &pbpeering.ReplicationMessage{ + Payload: &pbpeering.ReplicationMessage_Response_{ + Response: &pbpeering.ReplicationMessage_Response{ + ResourceURL: pbpeering.TypeURLService, + // TODO(peering): Nonce management + Nonce: "", + ResourceID: serviceName, + Operation: pbpeering.ReplicationMessage_Response_UPSERT, + Resource: any, + }, + }, + } + logTraceSend(logger, resp) + if err := stream.Send(resp); err != nil { + status.trackSendError(err.Error()) + return fmt.Errorf("failed to send to stream: %v", err) + } + return nil +} + +func (s *Service) processResponse(peerName string, partition string, resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) { + var ( + err error + errCode code.Code + errMsg string + ) + + if resp.ResourceURL != pbpeering.TypeURLService { + errCode = code.Code_INVALID_ARGUMENT + err = fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL) + return makeReply(resp.ResourceURL, resp.Nonce, errCode, err.Error()), err + } + + switch resp.Operation { + case pbpeering.ReplicationMessage_Response_UPSERT: + if resp.Resource == nil { + break + } + err = s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource) + if err != nil { + errCode = code.Code_INTERNAL + errMsg = err.Error() + } + + case pbpeering.ReplicationMessage_Response_DELETE: + err = handleDelete(resp.ResourceURL, resp.ResourceID) + if err != nil { + errCode = code.Code_INTERNAL + errMsg = err.Error() + } + + default: + errCode = code.Code_INVALID_ARGUMENT + + op := pbpeering.ReplicationMessage_Response_Operation_name[int32(resp.Operation)] + if op == "" { + op = strconv.FormatInt(int64(resp.Operation), 10) + } + errMsg = fmt.Sprintf("unsupported operation: %q", op) + + err = errors.New(errMsg) + } + + return makeReply(resp.ResourceURL, resp.Nonce, errCode, errMsg), err +} + +func (s *Service) handleUpsert(peerName string, partition string, resourceURL string, resourceID string, resource *anypb.Any) error { + csn := &pbservice.IndexedCheckServiceNodes{} + err := ptypes.UnmarshalAny(resource, csn) + if err != nil { + return fmt.Errorf("failed to unmarshal resource, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + if csn == nil || len(csn.Nodes) == 0 { + return nil + } + + type checkTuple struct { + checkID types.CheckID + serviceID string + nodeID types.NodeID + + acl.EnterpriseMeta + } + + var ( + nodes = make(map[types.NodeID]*structs.Node) + services = make(map[types.NodeID][]*structs.NodeService) + checks = make(map[types.NodeID]map[checkTuple]*structs.HealthCheck) + ) + + for _, pbinstance := range csn.Nodes { + instance, err := pbservice.CheckServiceNodeToStructs(pbinstance) + if err != nil { + return fmt.Errorf("failed to convert instance, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + + nodes[instance.Node.ID] = instance.Node + services[instance.Node.ID] = append(services[instance.Node.ID], instance.Service) + + if _, ok := checks[instance.Node.ID]; !ok { + checks[instance.Node.ID] = make(map[checkTuple]*structs.HealthCheck) + } + for _, c := range instance.Checks { + tuple := checkTuple{ + checkID: c.CheckID, + serviceID: c.ServiceID, + nodeID: instance.Node.ID, + EnterpriseMeta: c.EnterpriseMeta, + } + checks[instance.Node.ID][tuple] = c + } + } + + for nodeID, node := range nodes { + // For all nodes, services, and checks we override the peer name and partition to be + // the local partition and local name for the peer. + node.PeerName, node.Partition = peerName, partition + + // First register the node + req := node.ToRegisterRequest() + if err := s.Backend.Apply().CatalogRegister(&req); err != nil { + return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + + // Then register all services on that node + for _, svc := range services[nodeID] { + svc.PeerName = peerName + svc.OverridePartition(partition) + + req.Service = svc + if err := s.Backend.Apply().CatalogRegister(&req); err != nil { + return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + } + req.Service = nil + + // Then register all checks on that node + var chks structs.HealthChecks + for _, c := range checks[nodeID] { + c.PeerName = peerName + c.OverridePartition(partition) + + chks = append(chks, c) + } + + req.Checks = chks + if err := s.Backend.Apply().CatalogRegister(&req); err != nil { + return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) + } + } + return nil +} + +func handleDelete(resourceURL string, resourceID string) error { + // TODO(peering): implement + return nil +} + +func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeering.ReplicationMessage { + var rpcErr *pbstatus.Status + if errCode != code.Code_OK || errMsg != "" { + rpcErr = &pbstatus.Status{ + Code: int32(errCode), + Message: errMsg, + } + } + + msg := &pbpeering.ReplicationMessage{ + Payload: &pbpeering.ReplicationMessage_Request_{ + Request: &pbpeering.ReplicationMessage_Request{ + ResourceURL: resourceURL, + Nonce: nonce, + Error: rpcErr, + }, + }, + } + return msg +} diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 9f15960ce3..a3955bb30c 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -5,32 +5,25 @@ import ( "errors" "fmt" "io" - "strconv" "strings" "time" "github.com/armon/go-metrics" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/grpc" "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" - "google.golang.org/protobuf/types/known/anypb" "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/dns" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/proto/pbpeering" - "github.com/hashicorp/consul/proto/pbservice" - "github.com/hashicorp/consul/proto/pbstatus" - "github.com/hashicorp/consul/types" ) var ( @@ -651,73 +644,6 @@ func (s *Service) HandleStream(req HandleStreamRequest) error { } } -// pushService response handles sending exported service instance updates to the peer cluster. -// Each cache.UpdateEvent will contain all instances for a service name. -// If there are no instances in the event, we consider that to be a de-registration. -func pushServiceResponse(logger hclog.Logger, stream BidirectionalStream, status *lockableStreamStatus, update cache.UpdateEvent) error { - csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes) - if !ok { - logger.Error(fmt.Sprintf("invalid type for response: %T, expected *pbservice.IndexedCheckServiceNodes", update.Result)) - - // Skip this update to avoid locking up peering due to a bad service update. - return nil - } - serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService) - - // If no nodes are present then it's due to one of: - // 1. The service is newly registered or exported and yielded a transient empty update. - // 2. All instances of the service were de-registered. - // 3. The service was un-exported. - // - // We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that. - // Case #1 is a no-op for the importing peer. - if len(csn.Nodes) == 0 { - resp := &pbpeering.ReplicationMessage{ - Payload: &pbpeering.ReplicationMessage_Response_{ - Response: &pbpeering.ReplicationMessage_Response{ - ResourceURL: pbpeering.TypeURLService, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: serviceName, - Operation: pbpeering.ReplicationMessage_Response_DELETE, - }, - }, - } - logTraceSend(logger, resp) - if err := stream.Send(resp); err != nil { - status.trackSendError(err.Error()) - return fmt.Errorf("failed to send to stream: %v", err) - } - return nil - } - - // If there are nodes in the response, we push them as an UPSERT operation. - any, err := ptypes.MarshalAny(csn) - if err != nil { - // Log the error and skip this response to avoid locking up peering due to a bad update event. - logger.Error("failed to marshal service endpoints", "error", err) - return nil - } - resp := &pbpeering.ReplicationMessage{ - Payload: &pbpeering.ReplicationMessage_Response_{ - Response: &pbpeering.ReplicationMessage_Response{ - ResourceURL: pbpeering.TypeURLService, - // TODO(peering): Nonce management - Nonce: "", - ResourceID: serviceName, - Operation: pbpeering.ReplicationMessage_Response_UPSERT, - Resource: any, - }, - }, - } - logTraceSend(logger, resp) - if err := stream.Send(resp); err != nil { - status.trackSendError(err.Error()) - return fmt.Errorf("failed to send to stream: %v", err) - } - return nil -} - func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool) { return s.streams.streamStatus(peer) } @@ -727,165 +653,6 @@ func (s *Service) ConnectedStreams() map[string]chan struct{} { return s.streams.connectedStreams() } -func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeering.ReplicationMessage { - var rpcErr *pbstatus.Status - if errCode != code.Code_OK || errMsg != "" { - rpcErr = &pbstatus.Status{ - Code: int32(errCode), - Message: errMsg, - } - } - - msg := &pbpeering.ReplicationMessage{ - Payload: &pbpeering.ReplicationMessage_Request_{ - Request: &pbpeering.ReplicationMessage_Request{ - ResourceURL: resourceURL, - Nonce: nonce, - Error: rpcErr, - }, - }, - } - return msg -} - -func (s *Service) processResponse(peerName string, partition string, resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) { - var ( - err error - errCode code.Code - errMsg string - ) - - if resp.ResourceURL != pbpeering.TypeURLService { - errCode = code.Code_INVALID_ARGUMENT - err = fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL) - return makeReply(resp.ResourceURL, resp.Nonce, errCode, err.Error()), err - } - - switch resp.Operation { - case pbpeering.ReplicationMessage_Response_UPSERT: - if resp.Resource == nil { - break - } - err = s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource) - if err != nil { - errCode = code.Code_INTERNAL - errMsg = err.Error() - } - - case pbpeering.ReplicationMessage_Response_DELETE: - err = handleDelete(resp.ResourceURL, resp.ResourceID) - if err != nil { - errCode = code.Code_INTERNAL - errMsg = err.Error() - } - - default: - errCode = code.Code_INVALID_ARGUMENT - - op := pbpeering.ReplicationMessage_Response_Operation_name[int32(resp.Operation)] - if op == "" { - op = strconv.FormatInt(int64(resp.Operation), 10) - } - errMsg = fmt.Sprintf("unsupported operation: %q", op) - - err = errors.New(errMsg) - } - - return makeReply(resp.ResourceURL, resp.Nonce, errCode, errMsg), err -} - -func (s *Service) handleUpsert(peerName string, partition string, resourceURL string, resourceID string, resource *anypb.Any) error { - csn := &pbservice.IndexedCheckServiceNodes{} - err := ptypes.UnmarshalAny(resource, csn) - if err != nil { - return fmt.Errorf("failed to unmarshal resource, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) - } - if csn == nil || len(csn.Nodes) == 0 { - return nil - } - - type checkTuple struct { - checkID types.CheckID - serviceID string - nodeID types.NodeID - - acl.EnterpriseMeta - } - - var ( - nodes = make(map[types.NodeID]*structs.Node) - services = make(map[types.NodeID][]*structs.NodeService) - checks = make(map[types.NodeID]map[checkTuple]*structs.HealthCheck) - ) - - for _, pbinstance := range csn.Nodes { - instance, err := pbservice.CheckServiceNodeToStructs(pbinstance) - if err != nil { - return fmt.Errorf("failed to convert instance, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) - } - - nodes[instance.Node.ID] = instance.Node - services[instance.Node.ID] = append(services[instance.Node.ID], instance.Service) - - if _, ok := checks[instance.Node.ID]; !ok { - checks[instance.Node.ID] = make(map[checkTuple]*structs.HealthCheck) - } - for _, c := range instance.Checks { - tuple := checkTuple{ - checkID: c.CheckID, - serviceID: c.ServiceID, - nodeID: instance.Node.ID, - EnterpriseMeta: c.EnterpriseMeta, - } - checks[instance.Node.ID][tuple] = c - } - } - - for nodeID, node := range nodes { - // For all nodes, services, and checks we override the peer name and partition to be - // the local partition and local name for the peer. - node.PeerName, node.Partition = peerName, partition - - // First register the node - req := node.ToRegisterRequest() - if err := s.Backend.Apply().CatalogRegister(&req); err != nil { - return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) - } - - // Then register all services on that node - for _, svc := range services[nodeID] { - svc.PeerName = peerName - svc.OverridePartition(partition) - - req.Service = svc - if err := s.Backend.Apply().CatalogRegister(&req); err != nil { - return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) - } - } - req.Service = nil - - // Then register all checks on that node - var chks structs.HealthChecks - for _, c := range checks[nodeID] { - c.PeerName = peerName - c.OverridePartition(partition) - - chks = append(chks, c) - } - - req.Checks = chks - if err := s.Backend.Apply().CatalogRegister(&req); err != nil { - return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err) - } - } - return nil -} - -func handleDelete(resourceURL string, resourceID string) error { - // TODO(peering): implement - return nil -} - func logTraceRecv(logger hclog.Logger, pb proto.Message) { logTraceProto(logger, pb, true) }