507 lines
17 KiB

package peerstream
import (
At the start of each peering stream establishment (not initiation, but the
thing that reconnects) we need to do a little bit of light differential
snapshot correction to initially synchronize the local state store.
Then if we ever fail to apply a replication message we should either tear
down the entire connection (and thus force a resync on reconnect) or
request a resync operation.
// makeServiceResponse handles preparing exported service instance updates to the peer cluster.
// Each cache.UpdateEvent will contain all instances for a service name.
// If there are no instances in the event, we consider that to be a de-registration.
func makeServiceResponse(
logger hclog.Logger,
update cache.UpdateEvent,
) (*pbpeerstream.ReplicationMessage_Response, error) {
any, csn, err := marshalToProtoAny[*pbservice.IndexedCheckServiceNodes](update.Result)
if err != nil {
return nil, fmt.Errorf("failed to marshal: %w", err)
serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService)
// If no nodes are present then it's due to one of:
// 1. The service is newly registered or exported and yielded a transient empty update.
// 2. All instances of the service were de-registered.
// 3. The service was un-exported.
// We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that.
// Case #1 is a no-op for the importing peer.
if len(csn.Nodes) == 0 {
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: serviceName,
Operation: pbpeerstream.Operation_OPERATION_DELETE,
}, nil
// If there are nodes in the response, we push them as an UPSERT operation.
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLService,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: serviceName,
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
}, nil
func makeCARootsResponse(
logger hclog.Logger,
update cache.UpdateEvent,
) (*pbpeerstream.ReplicationMessage_Response, error) {
any, _, err := marshalToProtoAny[*pbpeering.PeeringTrustBundle](update.Result)
if err != nil {
return nil, fmt.Errorf("failed to marshal: %w", err)
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLRoots,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: "roots",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
}, nil
// marshalToProtoAny takes any input and returns:
// the protobuf.Any type, the asserted T type, and any errors
// during marshalling or type assertion.
// `in` MUST be of type T or it returns an error.
func marshalToProtoAny[T proto.Message](in any) (*anypb.Any, T, error) {
typ, ok := in.(T)
if !ok {
var outType T
return nil, typ, fmt.Errorf("input type is not %T: %T", outType, in)
any, err := ptypes.MarshalAny(typ)
if err != nil {
return nil, typ, err
return any, typ, nil
func (s *Server) processResponse(
peerName string,
partition string,
mutableStatus *MutableStatus,
resp *pbpeerstream.ReplicationMessage_Response,
logger hclog.Logger,
) (*pbpeerstream.ReplicationMessage, error) {
if !pbpeerstream.KnownTypeURL(resp.ResourceURL) {
err := fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL)
return makeNACKReply(
), err
switch resp.Operation {
case pbpeerstream.Operation_OPERATION_UPSERT:
if resp.Resource == nil {
err := fmt.Errorf("received upsert response with no content")
return makeNACKReply(
), err
if err := s.handleUpsert(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID, resp.Resource, logger); err != nil {
return makeNACKReply(
fmt.Sprintf("upsert error, ResourceURL: %q, ResourceID: %q: %v", resp.ResourceURL, resp.ResourceID, err),
), fmt.Errorf("upsert error: %w", err)
return makeACKReply(resp.ResourceURL, resp.Nonce), nil
case pbpeerstream.Operation_OPERATION_DELETE:
if err := s.handleDelete(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID, logger); err != nil {
return makeNACKReply(
fmt.Sprintf("delete error, ResourceURL: %q, ResourceID: %q: %v", resp.ResourceURL, resp.ResourceID, err),
), fmt.Errorf("delete error: %w", err)
return makeACKReply(resp.ResourceURL, resp.Nonce), nil
var errMsg string
if op := pbpeerstream.Operation_name[int32(resp.Operation)]; op != "" {
errMsg = fmt.Sprintf("unsupported operation: %q", op)
} else {
errMsg = fmt.Sprintf("unsupported operation: %d", resp.Operation)
return makeNACKReply(
), errors.New(errMsg)
func (s *Server) handleUpsert(
peerName string,
partition string,
mutableStatus *MutableStatus,
resourceURL string,
resourceID string,
resource *anypb.Any,
logger hclog.Logger,
) error {
switch resourceURL {
case pbpeerstream.TypeURLService:
sn := structs.ServiceNameFromString(resourceID)
csn := &pbservice.IndexedCheckServiceNodes{}
if err := ptypes.UnmarshalAny(resource, csn); err != nil {
return fmt.Errorf("failed to unmarshal resource: %w", err)
err := s.handleUpdateService(peerName, partition, sn, csn)
if err != nil {
logger.Error("did not increment imported services count", "service_name", sn.String(), "error", err)
return err
logger.Trace("incrementing imported services count", "service_name", sn.String())
return nil
case pbpeerstream.TypeURLRoots:
roots := &pbpeering.PeeringTrustBundle{}
if err := ptypes.UnmarshalAny(resource, roots); err != nil {
return fmt.Errorf("failed to unmarshal resource: %w", err)
return s.handleUpsertRoots(peerName, partition, roots)
return fmt.Errorf("unexpected resourceURL: %s", resourceURL)
// handleUpdateService handles both deletion and upsert events for a service.
// On an UPSERT event:
// - All nodes, services, checks in the input pbNodes are re-applied through Raft.
// - Any nodes, services, or checks in the catalog that were not in the input pbNodes get deleted.
// On a DELETE event:
// - A reconciliation against nil or empty input pbNodes leads to deleting all stored catalog resources
// associated with the service name.
func (s *Server) handleUpdateService(
peerName string,
partition string,
sn structs.ServiceName,
pbNodes *pbservice.IndexedCheckServiceNodes,
) error {
// Capture instances in the state store for reconciliation later.
_, storedInstances, err := s.GetStore().CheckServiceNodes(nil, sn.Name, &sn.EnterpriseMeta, peerName)
if err != nil {
return fmt.Errorf("failed to read imported services: %w", err)
structsNodes, err := pbNodes.CheckServiceNodesToStruct()
if err != nil {
return fmt.Errorf("failed to convert protobuf instances to structs: %w", err)
// Normalize the data into a convenient form for operation.
snap := newHealthSnapshot(structsNodes, partition, peerName)
for _, nodeSnap := range snap.Nodes {
// First register the node
req := nodeSnap.Node.ToRegisterRequest()
if err := s.Backend.CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register node: %w", err)
// Then register all services on that node
for _, svcSnap := range nodeSnap.Services {
req.Service = svcSnap.Service
if err := s.Backend.CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register service: %w", err)
req.Service = nil
// Then register all checks on that node
var chks structs.HealthChecks
for _, svcSnap := range nodeSnap.Services {
for _, c := range svcSnap.Checks {
chks = append(chks, c)
req.Checks = chks
if err := s.Backend.CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register check: %w", err)
// Now that the data received has been stored in the state store, the rest of this
// function is responsible for cleaning up data in the catalog that wasn't in the snapshot.
// nodeCheckTuple uniquely identifies a node check in the catalog.
// The partition is not needed because we are only operating on one partition's catalog.
type nodeCheckTuple struct {
checkID types.CheckID
node string
var (
// unusedNodes tracks node names that were not present in the latest response.
// Missing nodes are not assumed to be deleted because there may be other service names
// registered on them.
// Inside we also track a map of node checks associated with the node.
unusedNodes = make(map[string]struct{})
// deletedNodeChecks tracks node checks that were not present in the latest response.
// A single node check will be attached to all service instances of a node, so this
// deduplication prevents issuing multiple deregistrations for a single check.
deletedNodeChecks = make(map[nodeCheckTuple]struct{})
for _, csn := range storedInstances {
if _, ok := snap.Nodes[csn.Node.Node]; !ok {
unusedNodes[csn.Node.Node] = struct{}{}
// Since the node is not in the snapshot we can know the associated service
// instance is not in the snapshot either, since a service instance can't
// exist without a node.
// This will also delete all service checks.
err := s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: csn.Node.Node,
ServiceID: csn.Service.ID,
EnterpriseMeta: csn.Service.EnterpriseMeta,
PeerName: peerName,
if err != nil {
return fmt.Errorf("failed to deregister service %q: %w", csn.Service.CompoundServiceID(), err)
// We can't know if a node check was deleted from the exporting cluster
// (but not the node itself) if the node wasn't in the snapshot,
// so we do not loop over checks here.
// If the unusedNode gets deleted below that will also delete node checks.
// Delete the service instance if not in the snapshot.
sid := csn.Service.CompoundServiceID()
if _, ok := snap.Nodes[csn.Node.Node].Services[sid]; !ok {
err := s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: csn.Node.Node,
ServiceID: csn.Service.ID,
EnterpriseMeta: csn.Service.EnterpriseMeta,
PeerName: peerName,
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/ns:%s/service_id:%s",
csn.Service.PartitionOrDefault(), peerName, csn.Node.Node, csn.Service.NamespaceOrDefault(), csn.Service.ID)
return fmt.Errorf("failed to deregister service %q: %w", ident, err)
// When a service is deleted all associated checks also get deleted as a side effect.
// Reconcile checks.
for _, chk := range csn.Checks {
if _, ok := snap.Nodes[csn.Node.Node].Services[sid].Checks[chk.CheckID]; !ok {
// Checks without a ServiceID are node checks.
// If the node exists but the check does not then the check was deleted.
if chk.ServiceID == "" {
// Deduplicate node checks to avoid deregistering a check multiple times.
tuple := nodeCheckTuple{
checkID: chk.CheckID,
node: chk.Node,
deletedNodeChecks[tuple] = struct{}{}
// If the check isn't a node check then it's a service check.
// Service checks that were not present can be deleted immediately because
// checks for a given service ID will only be attached to a single CheckServiceNode.
err := s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: chk.Node,
CheckID: chk.CheckID,
EnterpriseMeta: chk.EnterpriseMeta,
PeerName: peerName,
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/ns:%s/check_id:%s",
chk.PartitionOrDefault(), peerName, chk.Node, chk.NamespaceOrDefault(), chk.CheckID)
return fmt.Errorf("failed to deregister check %q: %w", ident, err)
// Delete all deduplicated node checks.
for chk := range deletedNodeChecks {
nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault())
err := s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: chk.node,
CheckID: chk.checkID,
EnterpriseMeta: *nodeMeta,
PeerName: peerName,
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s/check_id:%s", nodeMeta.PartitionOrDefault(), peerName, chk.node, chk.checkID)
return fmt.Errorf("failed to deregister node check %q: %w", ident, err)
// Delete any nodes that do not have any other services registered on them.
for node := range unusedNodes {
nodeMeta := structs.NodeEnterpriseMetaInPartition(sn.PartitionOrDefault())
_, ns, err := s.GetStore().NodeServices(nil, node, nodeMeta, peerName)
if err != nil {
return fmt.Errorf("failed to query services on node: %w", err)
if ns != nil && len(ns.Services) >= 1 {
// At least one service is still registered on this node, so we keep it.
// All services on the node were deleted, so the node is also cleaned up.
err = s.Backend.CatalogDeregister(&structs.DeregisterRequest{
Node: node,
PeerName: peerName,
EnterpriseMeta: *nodeMeta,
if err != nil {
ident := fmt.Sprintf("partition:%s/peer:%s/node:%s", nodeMeta.PartitionOrDefault(), peerName, node)
return fmt.Errorf("failed to deregister node %q: %w", ident, err)
return nil
func (s *Server) handleUpsertRoots(
peerName string,
partition string,
trustBundle *pbpeering.PeeringTrustBundle,
) error {
// We override the partition and peer name so that the trust bundle gets stored
// in the importing partition with a reference to the peer it was imported from.
trustBundle.Partition = partition
trustBundle.PeerName = peerName
req := &pbpeering.PeeringTrustBundleWriteRequest{
PeeringTrustBundle: trustBundle,
return s.Backend.PeeringTrustBundleWrite(req)
func (s *Server) handleDelete(
peerName string,
partition string,
mutableStatus *MutableStatus,
resourceURL string,
resourceID string,
logger hclog.Logger,
) error {
switch resourceURL {
case pbpeerstream.TypeURLService:
sn := structs.ServiceNameFromString(resourceID)
err := s.handleUpdateService(peerName, partition, sn, nil)
if err != nil {
logger.Error("did not decrement imported services count", "service_name", sn.String(), "error", err)
return err
logger.Trace("decrementing imported services count", "service_name", sn.String())
return nil
return fmt.Errorf("unexpected resourceURL: %s", resourceURL)
func makeACKReply(resourceURL, nonce string) *pbpeerstream.ReplicationMessage {
return makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{
ResourceURL: resourceURL,
ResponseNonce: nonce,
func makeNACKReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbpeerstream.ReplicationMessage {
var rpcErr *pbstatus.Status
if errCode != code.Code_OK || errMsg != "" {
rpcErr = &pbstatus.Status{
Code: int32(errCode),
Message: errMsg,
return makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{
ResourceURL: resourceURL,
ResponseNonce: nonce,
Error: rpcErr,
// makeReplicationRequest is a convenience method to make a Request-type ReplicationMessage.
func makeReplicationRequest(req *pbpeerstream.ReplicationMessage_Request) *pbpeerstream.ReplicationMessage {
return &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: req,
// makeReplicationResponse is a convenience method to make a Response-type ReplicationMessage.
func makeReplicationResponse(resp *pbpeerstream.ReplicationMessage_Response) *pbpeerstream.ReplicationMessage {
return &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Response_{
Response: resp,