mirror of
https://github.com/status-im/consul.git
synced 2025-01-25 05:00:32 +00:00
301 lines
9.0 KiB
Go
301 lines
9.0 KiB
Go
package peering
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/golang/protobuf/ptypes"
|
|
"github.com/hashicorp/go-hclog"
|
|
"google.golang.org/genproto/googleapis/rpc/code"
|
|
"google.golang.org/protobuf/types/known/anypb"
|
|
|
|
"github.com/hashicorp/consul/agent/cache"
|
|
"github.com/hashicorp/consul/agent/structs"
|
|
"github.com/hashicorp/consul/proto/pbpeering"
|
|
"github.com/hashicorp/consul/proto/pbservice"
|
|
"github.com/hashicorp/consul/proto/pbstatus"
|
|
)
|
|
|
|
/*
|
|
TODO(peering):
|
|
|
|
At the start of each peering stream establishment (not initiation, but the
|
|
thing that reconnects) we need to do a little bit of light differential
|
|
snapshot correction to initially synchronize the local state store.
|
|
|
|
Then if we ever fail to apply a replication message we should either tear
|
|
down the entire connection (and thus force a resync on reconnect) or
|
|
request a resync operation.
|
|
*/
|
|
|
|
// pushService response handles sending exported service instance updates to the peer cluster.
|
|
// Each cache.UpdateEvent will contain all instances for a service name.
|
|
// If there are no instances in the event, we consider that to be a de-registration.
|
|
func pushServiceResponse(
|
|
logger hclog.Logger,
|
|
stream BidirectionalStream,
|
|
status *lockableStreamStatus,
|
|
update cache.UpdateEvent,
|
|
) error {
|
|
csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
|
if !ok {
|
|
logger.Error(fmt.Sprintf("invalid type for response: %T, expected *pbservice.IndexedCheckServiceNodes", update.Result))
|
|
|
|
// Skip this update to avoid locking up peering due to a bad service update.
|
|
return nil
|
|
}
|
|
|
|
var serviceName string
|
|
if strings.HasPrefix(update.CorrelationID, subExportedService) {
|
|
serviceName = strings.TrimPrefix(update.CorrelationID, subExportedService)
|
|
} else {
|
|
serviceName = strings.TrimPrefix(update.CorrelationID, subExportedProxyService) + syntheticProxyNameSuffix
|
|
}
|
|
|
|
// If no nodes are present then it's due to one of:
|
|
// 1. The service is newly registered or exported and yielded a transient empty update.
|
|
// 2. All instances of the service were de-registered.
|
|
// 3. The service was un-exported.
|
|
//
|
|
// We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that.
|
|
// Case #1 is a no-op for the importing peer.
|
|
if len(csn.Nodes) == 0 {
|
|
resp := &pbpeering.ReplicationMessage{
|
|
Payload: &pbpeering.ReplicationMessage_Response_{
|
|
Response: &pbpeering.ReplicationMessage_Response{
|
|
ResourceURL: pbpeering.TypeURLService,
|
|
// TODO(peering): Nonce management
|
|
Nonce: "",
|
|
ResourceID: serviceName,
|
|
Operation: pbpeering.ReplicationMessage_Response_DELETE,
|
|
},
|
|
},
|
|
}
|
|
logTraceSend(logger, resp)
|
|
if err := stream.Send(resp); err != nil {
|
|
status.trackSendError(err.Error())
|
|
return fmt.Errorf("failed to send to stream: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// If there are nodes in the response, we push them as an UPSERT operation.
|
|
any, err := ptypes.MarshalAny(csn)
|
|
if err != nil {
|
|
// Log the error and skip this response to avoid locking up peering due to a bad update event.
|
|
logger.Error("failed to marshal service endpoints", "error", err)
|
|
return nil
|
|
}
|
|
resp := &pbpeering.ReplicationMessage{
|
|
Payload: &pbpeering.ReplicationMessage_Response_{
|
|
Response: &pbpeering.ReplicationMessage_Response{
|
|
ResourceURL: pbpeering.TypeURLService,
|
|
// TODO(peering): Nonce management
|
|
Nonce: "",
|
|
ResourceID: serviceName,
|
|
Operation: pbpeering.ReplicationMessage_Response_UPSERT,
|
|
Resource: any,
|
|
},
|
|
},
|
|
}
|
|
logTraceSend(logger, resp)
|
|
if err := stream.Send(resp); err != nil {
|
|
status.trackSendError(err.Error())
|
|
return fmt.Errorf("failed to send to stream: %v", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) processResponse(peerName string, partition string, resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) {
|
|
if resp.ResourceURL != pbpeering.TypeURLService {
|
|
err := fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL)
|
|
return makeReply(
|
|
resp.ResourceURL,
|
|
resp.Nonce,
|
|
code.Code_INVALID_ARGUMENT,
|
|
err.Error(),
|
|
), err
|
|
}
|
|
|
|
switch resp.Operation {
|
|
case pbpeering.ReplicationMessage_Response_UPSERT:
|
|
if resp.Resource == nil {
|
|
err := fmt.Errorf("received upsert response with no content")
|
|
return makeReply(
|
|
resp.ResourceURL,
|
|
resp.Nonce,
|
|
code.Code_INVALID_ARGUMENT,
|
|
err.Error(),
|
|
), err
|
|
}
|
|
|
|
if err := s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource); err != nil {
|
|
return makeReply(
|
|
resp.ResourceURL,
|
|
resp.Nonce,
|
|
code.Code_INTERNAL,
|
|
fmt.Sprintf("upsert error, ResourceURL: %q, ResourceID: %q: %v", resp.ResourceURL, resp.ResourceID, err),
|
|
), fmt.Errorf("upsert error: %w", err)
|
|
}
|
|
|
|
return makeReply(resp.ResourceURL, resp.Nonce, code.Code_OK, ""), nil
|
|
|
|
case pbpeering.ReplicationMessage_Response_DELETE:
|
|
if err := s.handleDelete(peerName, partition, resp.ResourceURL, resp.ResourceID); err != nil {
|
|
return makeReply(
|
|
resp.ResourceURL,
|
|
resp.Nonce,
|
|
code.Code_INTERNAL,
|
|
fmt.Sprintf("delete error, ResourceURL: %q, ResourceID: %q: %v", resp.ResourceURL, resp.ResourceID, err),
|
|
), fmt.Errorf("delete error: %w", err)
|
|
}
|
|
return makeReply(resp.ResourceURL, resp.Nonce, code.Code_OK, ""), nil
|
|
|
|
default:
|
|
var errMsg string
|
|
if op := pbpeering.ReplicationMessage_Response_Operation_name[int32(resp.Operation)]; op != "" {
|
|
errMsg = fmt.Sprintf("unsupported operation: %q", op)
|
|
} else {
|
|
errMsg = fmt.Sprintf("unsupported operation: %d", resp.Operation)
|
|
}
|
|
return makeReply(
|
|
resp.ResourceURL,
|
|
resp.Nonce,
|
|
code.Code_INVALID_ARGUMENT,
|
|
errMsg,
|
|
), errors.New(errMsg)
|
|
}
|
|
}
|
|
|
|
func (s *Service) handleUpsert(
|
|
peerName string,
|
|
partition string,
|
|
resourceURL string,
|
|
resourceID string,
|
|
resource *anypb.Any,
|
|
) error {
|
|
switch resourceURL {
|
|
case pbpeering.TypeURLService:
|
|
sn := structs.ServiceNameFromString(resourceID)
|
|
sn.OverridePartition(partition)
|
|
|
|
csn := &pbservice.IndexedCheckServiceNodes{}
|
|
if err := ptypes.UnmarshalAny(resource, csn); err != nil {
|
|
return fmt.Errorf("failed to unmarshal resource: %w", err)
|
|
}
|
|
|
|
return s.handleUpsertService(peerName, partition, sn, csn)
|
|
default:
|
|
return fmt.Errorf("unexpected resourceURL: %s", resourceURL)
|
|
}
|
|
}
|
|
|
|
func (s *Service) handleUpsertService(
|
|
peerName string,
|
|
partition string,
|
|
sn structs.ServiceName,
|
|
csn *pbservice.IndexedCheckServiceNodes,
|
|
) error {
|
|
if csn == nil || len(csn.Nodes) == 0 {
|
|
return s.handleDeleteService(peerName, partition, sn)
|
|
}
|
|
|
|
// Convert exported data into structs format.
|
|
structsNodes := make([]structs.CheckServiceNode, 0, len(csn.Nodes))
|
|
for _, pb := range csn.Nodes {
|
|
instance, err := pbservice.CheckServiceNodeToStructs(pb)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to convert instance: %w", err)
|
|
}
|
|
structsNodes = append(structsNodes, *instance)
|
|
}
|
|
|
|
// Normalize the data into a convenient form for operation.
|
|
snap := newHealthSnapshot(structsNodes, partition, peerName)
|
|
|
|
for _, nodeSnap := range snap.Nodes {
|
|
// First register the node
|
|
req := nodeSnap.Node.ToRegisterRequest()
|
|
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
|
|
return fmt.Errorf("failed to register node: %w", err)
|
|
}
|
|
|
|
// Then register all services on that node
|
|
for _, svcSnap := range nodeSnap.Services {
|
|
req.Service = svcSnap.Service
|
|
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
|
|
return fmt.Errorf("failed to register service: %w", err)
|
|
}
|
|
}
|
|
req.Service = nil
|
|
|
|
// Then register all checks on that node
|
|
var chks structs.HealthChecks
|
|
for _, svcSnap := range nodeSnap.Services {
|
|
for _, c := range svcSnap.Checks {
|
|
chks = append(chks, c)
|
|
}
|
|
}
|
|
|
|
req.Checks = chks
|
|
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
|
|
return fmt.Errorf("failed to register check: %w", err)
|
|
}
|
|
}
|
|
|
|
// TODO(peering): cleanup and deregister existing data that is now missing safely somehow
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Service) handleDelete(
|
|
peerName string,
|
|
partition string,
|
|
resourceURL string,
|
|
resourceID string,
|
|
) error {
|
|
switch resourceURL {
|
|
case pbpeering.TypeURLService:
|
|
sn := structs.ServiceNameFromString(resourceID)
|
|
sn.OverridePartition(partition)
|
|
return s.handleDeleteService(peerName, partition, sn)
|
|
|
|
default:
|
|
return fmt.Errorf("unexpected resourceURL: %s", resourceURL)
|
|
}
|
|
}
|
|
|
|
func (s *Service) handleDeleteService(
|
|
peerName string,
|
|
partition string,
|
|
sn structs.ServiceName,
|
|
) error {
|
|
// Deregister: ServiceID == DeleteService ANd checks
|
|
// Deregister: ServiceID(empty) CheckID(empty) == DeleteNode
|
|
|
|
// TODO(peering): implement
|
|
return nil
|
|
}
|
|
|
|
func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeering.ReplicationMessage {
|
|
var rpcErr *pbstatus.Status
|
|
if errCode != code.Code_OK || errMsg != "" {
|
|
rpcErr = &pbstatus.Status{
|
|
Code: int32(errCode),
|
|
Message: errMsg,
|
|
}
|
|
}
|
|
|
|
// TODO: shouldn't this be response?
|
|
return &pbpeering.ReplicationMessage{
|
|
Payload: &pbpeering.ReplicationMessage_Request_{
|
|
Request: &pbpeering.ReplicationMessage_Request{
|
|
ResourceURL: resourceURL,
|
|
Nonce: nonce,
|
|
Error: rpcErr,
|
|
},
|
|
},
|
|
}
|
|
}
|