mirror of https://github.com/status-im/consul.git
various partition related todos (#11822)
This commit is contained in:
parent
11ab84f840
commit
631c649291
40
agent/acl.go
40
agent/acl.go
|
@ -43,19 +43,20 @@ func (a *Agent) vetServiceRegister(token string, service *structs.NodeService) e
|
|||
|
||||
func (a *Agent) vetServiceRegisterWithAuthorizer(authz acl.Authorizer, service *structs.NodeService) error {
|
||||
var authzContext acl.AuthorizerContext
|
||||
service.FillAuthzContext(&authzContext)
|
||||
|
||||
// Vet the service itself.
|
||||
service.FillAuthzContext(&authzContext)
|
||||
if authz.ServiceWrite(service.Service, &authzContext) != acl.Allow {
|
||||
serviceName := service.CompoundServiceName()
|
||||
return acl.PermissionDenied("Missing service:write on %s", serviceName.String())
|
||||
return acl.PermissionDenied("Missing service:write on %s",
|
||||
structs.ServiceIDString(service.Service, &service.EnterpriseMeta))
|
||||
}
|
||||
|
||||
// Vet any service that might be getting overwritten.
|
||||
if existing := a.State.Service(service.CompoundServiceID()); existing != nil {
|
||||
existing.FillAuthzContext(&authzContext)
|
||||
if authz.ServiceWrite(existing.Service, &authzContext) != acl.Allow {
|
||||
serviceName := service.CompoundServiceName()
|
||||
return acl.PermissionDenied("Missing service:write on %s", serviceName.String())
|
||||
return acl.PermissionDenied("Missing service:write on %s",
|
||||
structs.ServiceIDString(service.Service, &service.EnterpriseMeta))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -64,8 +65,8 @@ func (a *Agent) vetServiceRegisterWithAuthorizer(authz acl.Authorizer, service *
|
|||
if service.Kind == structs.ServiceKindConnectProxy {
|
||||
service.FillAuthzContext(&authzContext)
|
||||
if authz.ServiceWrite(service.Proxy.DestinationServiceName, &authzContext) != acl.Allow {
|
||||
// TODO(partitions) fix this to include namespace and partition
|
||||
return acl.PermissionDenied("Missing service:write on %s", service.Proxy.DestinationServiceName)
|
||||
return acl.PermissionDenied("Missing service:write on %s",
|
||||
structs.ServiceIDString(service.Proxy.DestinationServiceName, &service.EnterpriseMeta))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -79,8 +80,8 @@ func (a *Agent) vetServiceUpdateWithAuthorizer(authz acl.Authorizer, serviceID s
|
|||
if existing := a.State.Service(serviceID); existing != nil {
|
||||
existing.FillAuthzContext(&authzContext)
|
||||
if authz.ServiceWrite(existing.Service, &authzContext) != acl.Allow {
|
||||
serviceName := existing.CompoundServiceName()
|
||||
return acl.PermissionDenied("Missing service:write on %s", serviceName.String())
|
||||
return acl.PermissionDenied("Missing service:write on %s",
|
||||
structs.ServiceIDString(existing.Service, &existing.EnterpriseMeta))
|
||||
}
|
||||
} else {
|
||||
return NotFoundError{Reason: fmt.Sprintf("Unknown service %q", serviceID)}
|
||||
|
@ -90,18 +91,19 @@ func (a *Agent) vetServiceUpdateWithAuthorizer(authz acl.Authorizer, serviceID s
|
|||
}
|
||||
|
||||
func (a *Agent) vetCheckRegisterWithAuthorizer(authz acl.Authorizer, check *structs.HealthCheck) error {
|
||||
// TODO(partitions)
|
||||
|
||||
var authzContext acl.AuthorizerContext
|
||||
check.FillAuthzContext(&authzContext)
|
||||
|
||||
// Vet the check itself.
|
||||
if len(check.ServiceName) > 0 {
|
||||
if authz.ServiceWrite(check.ServiceName, &authzContext) != acl.Allow {
|
||||
return acl.PermissionDenied("Missing service:write on %v", structs.ServiceIDString(check.ServiceName, &check.EnterpriseMeta))
|
||||
return acl.PermissionDenied("Missing service:write on %s",
|
||||
structs.ServiceIDString(check.ServiceName, &check.EnterpriseMeta))
|
||||
}
|
||||
} else {
|
||||
if authz.NodeWrite(a.config.NodeName, &authzContext) != acl.Allow {
|
||||
return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.AgentEnterpriseMeta()))
|
||||
return acl.PermissionDenied("Missing node:write on %s",
|
||||
structs.NodeNameString(a.config.NodeName, a.AgentEnterpriseMeta()))
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -109,11 +111,13 @@ func (a *Agent) vetCheckRegisterWithAuthorizer(authz acl.Authorizer, check *stru
|
|||
if existing := a.State.Check(check.CompoundCheckID()); existing != nil {
|
||||
if len(existing.ServiceName) > 0 {
|
||||
if authz.ServiceWrite(existing.ServiceName, &authzContext) != acl.Allow {
|
||||
return acl.PermissionDenied("Missing service:write on %s", structs.ServiceIDString(existing.ServiceName, &existing.EnterpriseMeta))
|
||||
return acl.PermissionDenied("Missing service:write on %s",
|
||||
structs.ServiceIDString(existing.ServiceName, &existing.EnterpriseMeta))
|
||||
}
|
||||
} else {
|
||||
if authz.NodeWrite(a.config.NodeName, &authzContext) != acl.Allow {
|
||||
return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.AgentEnterpriseMeta()))
|
||||
return acl.PermissionDenied("Missing node:write on %s",
|
||||
structs.NodeNameString(a.config.NodeName, a.AgentEnterpriseMeta()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -129,11 +133,13 @@ func (a *Agent) vetCheckUpdateWithAuthorizer(authz acl.Authorizer, checkID struc
|
|||
if existing := a.State.Check(checkID); existing != nil {
|
||||
if len(existing.ServiceName) > 0 {
|
||||
if authz.ServiceWrite(existing.ServiceName, &authzContext) != acl.Allow {
|
||||
return acl.PermissionDenied("Missing service:write on %s", structs.ServiceIDString(existing.ServiceName, &existing.EnterpriseMeta))
|
||||
return acl.PermissionDenied("Missing service:write on %s",
|
||||
structs.ServiceIDString(existing.ServiceName, &existing.EnterpriseMeta))
|
||||
}
|
||||
} else {
|
||||
if authz.NodeWrite(a.config.NodeName, &authzContext) != acl.Allow {
|
||||
return acl.PermissionDenied("Missing node:write on %s", structs.NodeNameString(a.config.NodeName, a.AgentEnterpriseMeta()))
|
||||
return acl.PermissionDenied("Missing node:write on %s",
|
||||
structs.NodeNameString(a.config.NodeName, a.AgentEnterpriseMeta()))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
|
|
|
@ -306,7 +306,6 @@ func (s *HTTPHandlers) AgentServices(resp http.ResponseWriter, req *http.Request
|
|||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
var entMeta structs.EnterpriseMeta
|
||||
if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil {
|
||||
return nil, err
|
||||
|
@ -393,7 +392,6 @@ func (s *HTTPHandlers) AgentService(resp http.ResponseWriter, req *http.Request)
|
|||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
var entMeta structs.EnterpriseMeta
|
||||
if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil {
|
||||
return nil, err
|
||||
|
@ -470,7 +468,6 @@ func (s *HTTPHandlers) AgentChecks(resp http.ResponseWriter, req *http.Request)
|
|||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
var entMeta structs.EnterpriseMeta
|
||||
if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil {
|
||||
return nil, err
|
||||
|
@ -637,7 +634,6 @@ func (s *HTTPHandlers) AgentJoin(resp http.ResponseWriter, req *http.Request) (i
|
|||
wan := false
|
||||
if other := req.URL.Query().Get("wan"); other != "" {
|
||||
wan = true
|
||||
// TODO(partitions) : block wan join
|
||||
}
|
||||
|
||||
// Get the address
|
||||
|
@ -722,7 +718,6 @@ func (s *HTTPHandlers) AgentRegisterCheck(resp http.ResponseWriter, req *http.Re
|
|||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
var args structs.CheckDefinition
|
||||
if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil {
|
||||
return nil, err
|
||||
|
@ -794,7 +789,6 @@ func (s *HTTPHandlers) AgentDeregisterCheck(resp http.ResponseWriter, req *http.
|
|||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
if err := s.parseEntMetaNoWildcard(req, &checkID.EnterpriseMeta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -887,7 +881,6 @@ func (s *HTTPHandlers) agentCheckUpdate(resp http.ResponseWriter, req *http.Requ
|
|||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
if err := s.parseEntMetaNoWildcard(req, &cid.EnterpriseMeta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -963,7 +956,6 @@ func (s *HTTPHandlers) AgentHealthServiceByID(resp http.ResponseWriter, req *htt
|
|||
return nil, &BadRequestError{Reason: "Missing serviceID"}
|
||||
}
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
var entMeta structs.EnterpriseMeta
|
||||
if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil {
|
||||
return nil, err
|
||||
|
@ -1022,7 +1014,6 @@ func (s *HTTPHandlers) AgentHealthServiceByName(resp http.ResponseWriter, req *h
|
|||
return nil, &BadRequestError{Reason: "Missing service Name"}
|
||||
}
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
var entMeta structs.EnterpriseMeta
|
||||
if err := s.parseEntMetaNoWildcard(req, &entMeta); err != nil {
|
||||
return nil, err
|
||||
|
@ -1086,7 +1077,6 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.
|
|||
var args structs.ServiceDefinition
|
||||
// Fixup the type decode of TTL or Interval if a check if provided.
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1253,7 +1243,6 @@ func (s *HTTPHandlers) AgentDeregisterService(resp http.ResponseWriter, req *htt
|
|||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
if err := s.parseEntMetaNoWildcard(req, &sid.EnterpriseMeta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1311,7 +1300,6 @@ func (s *HTTPHandlers) AgentServiceMaintenance(resp http.ResponseWriter, req *ht
|
|||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
if err := s.parseEntMetaNoWildcard(req, &sid.EnterpriseMeta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1369,6 +1357,7 @@ func (s *HTTPHandlers) AgentNodeMaintenance(resp http.ResponseWriter, req *http.
|
|||
// Get the provided token, if any, and vet against any ACL policies.
|
||||
var token string
|
||||
s.parseToken(req, &token)
|
||||
|
||||
authz, err := s.agent.delegate.ResolveTokenAndDefaultMeta(token, nil, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -1576,7 +1565,6 @@ func (s *HTTPHandlers) AgentConnectCALeafCert(resp http.ResponseWriter, req *htt
|
|||
}
|
||||
var qOpts structs.QueryOptions
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
if err := s.parseEntMetaNoWildcard(req, &args.EnterpriseMeta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1632,7 +1620,6 @@ func (s *HTTPHandlers) AgentConnectAuthorize(resp http.ResponseWriter, req *http
|
|||
|
||||
var authReq structs.ConnectAuthorizeRequest
|
||||
|
||||
// TODO(partitions): should this default to the agent's partition?
|
||||
if err := s.parseEntMetaNoWildcard(req, &authReq.EnterpriseMeta); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
|
@ -637,7 +637,6 @@ func (r *ACLResolver) resolvePoliciesForIdentity(identity structs.ACLIdentity) (
|
|||
|
||||
policies = append(policies, syntheticPolicies...)
|
||||
filtered := r.filterPoliciesByScope(policies)
|
||||
// TODO(partitions,acls): filter these by the partition/namespace of the token trying to use them?
|
||||
return filtered, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -4,9 +4,10 @@
|
|||
package consul
|
||||
|
||||
import (
|
||||
"github.com/hashicorp/go-hclog"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/go-hclog"
|
||||
)
|
||||
|
||||
// EnterpriseACLResolverDelegate stub
|
||||
|
|
|
@ -387,8 +387,6 @@ func (s *Server) initializeACLs(ctx context.Context) error {
|
|||
if s.InPrimaryDatacenter() {
|
||||
s.logger.Info("initializing acls")
|
||||
|
||||
// TODO(partitions): initialize acls in all of the partitions?
|
||||
|
||||
// Create/Upgrade the builtin global-management policy
|
||||
_, policy, err := s.fsm.State().ACLPolicyGetByID(nil, structs.ACLPolicyGlobalManagementID, structs.DefaultEnterpriseMetaInDefaultPartition())
|
||||
if err != nil {
|
||||
|
@ -965,8 +963,10 @@ func (s *Server) reconcileReaped(known map[string]struct{}, nodeEntMeta *structs
|
|||
func (s *Server) reconcileMember(member serf.Member) error {
|
||||
// Check if this is a member we should handle
|
||||
if !s.shouldHandleMember(member) {
|
||||
// TODO(partition): log the partition name
|
||||
s.logger.Warn("skipping reconcile of node", "member", member)
|
||||
s.logger.Warn("skipping reconcile of node",
|
||||
"member", member,
|
||||
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
defer metrics.MeasureSince([]string{"leader", "reconcileMember"}, time.Now())
|
||||
|
@ -986,8 +986,8 @@ func (s *Server) reconcileMember(member serf.Member) error {
|
|||
}
|
||||
if err != nil {
|
||||
s.logger.Error("failed to reconcile member",
|
||||
// TODO(partition): log the partition name
|
||||
"member", member,
|
||||
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
|
||||
"error", err,
|
||||
)
|
||||
|
||||
|
@ -1089,7 +1089,10 @@ func (s *Server) handleAliveMember(member serf.Member, nodeEntMeta *structs.Ente
|
|||
}
|
||||
}
|
||||
AFTER_CHECK:
|
||||
s.logger.Info("member joined, marking health alive", "member", member.Name)
|
||||
s.logger.Info("member joined, marking health alive",
|
||||
"member", member.Name,
|
||||
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
|
||||
)
|
||||
|
||||
// Register with the catalog.
|
||||
req := structs.RegisterRequest{
|
||||
|
@ -1131,12 +1134,13 @@ func (s *Server) handleFailedMember(member serf.Member, nodeEntMeta *structs.Ent
|
|||
}
|
||||
|
||||
if node == nil {
|
||||
s.logger.Info("ignoring failed event for member because it does not exist in the catalog", "member", member.Name)
|
||||
s.logger.Info("ignoring failed event for member because it does not exist in the catalog",
|
||||
"member", member.Name,
|
||||
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO(partitions): get the ent meta by parsing serf tags
|
||||
|
||||
if node.Address == member.Addr.String() {
|
||||
// Check if the serfCheck is in the critical state
|
||||
_, checks, err := state.NodeChecks(nil, member.Name, nodeEntMeta)
|
||||
|
@ -1149,7 +1153,10 @@ func (s *Server) handleFailedMember(member serf.Member, nodeEntMeta *structs.Ent
|
|||
}
|
||||
}
|
||||
}
|
||||
s.logger.Info("member failed, marking health critical", "member", member.Name)
|
||||
s.logger.Info("member failed, marking health critical",
|
||||
"member", member.Name,
|
||||
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
|
||||
)
|
||||
|
||||
// Register with the catalog
|
||||
req := structs.RegisterRequest{
|
||||
|
@ -1195,8 +1202,13 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member, nodeE
|
|||
// Do not deregister ourself. This can only happen if the current leader
|
||||
// is leaving. Instead, we should allow a follower to take-over and
|
||||
// deregister us later.
|
||||
//
|
||||
// TODO(partitions): check partitions here too? server names should be unique in general though
|
||||
if member.Name == s.config.NodeName {
|
||||
s.logger.Warn("deregistering self should be done by follower", "name", s.config.NodeName)
|
||||
s.logger.Warn("deregistering self should be done by follower",
|
||||
"name", s.config.NodeName,
|
||||
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1218,7 +1230,11 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member, nodeE
|
|||
}
|
||||
|
||||
// Deregister the node
|
||||
s.logger.Info("deregistering member", "member", member.Name, "reason", reason)
|
||||
s.logger.Info("deregistering member",
|
||||
"member", member.Name,
|
||||
"partition", getSerfMemberEnterpriseMeta(member).PartitionOrDefault(),
|
||||
"reason", reason,
|
||||
)
|
||||
req := structs.DeregisterRequest{
|
||||
Datacenter: s.config.Datacenter,
|
||||
Node: member.Name,
|
||||
|
|
|
@ -90,7 +90,6 @@ func Compile(query *structs.PreparedQuery) (*CompiledTemplate, error) {
|
|||
// prefix it will be expected to run with. The results might not make
|
||||
// sense and create a valid service to lookup, but it should render
|
||||
// without any errors.
|
||||
// TODO(partitions) should this have a partition on it?
|
||||
if _, err = ct.Render(ct.query.Name, structs.QuerySource{}); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -158,7 +157,6 @@ func (ct *CompiledTemplate) Render(name string, source structs.QuerySource) (*st
|
|||
Type: ast.TypeString,
|
||||
Value: source.Segment,
|
||||
},
|
||||
// TODO(partitions): should NodePartition be projected here?
|
||||
},
|
||||
FuncMap: map[string]ast.Function{
|
||||
"match": match,
|
||||
|
|
|
@ -128,18 +128,33 @@ func (s *Store) EnsureRegistration(idx uint64, req *structs.RegisterRequest) err
|
|||
return tx.Commit()
|
||||
}
|
||||
|
||||
func (s *Store) ensureCheckIfNodeMatches(tx WriteTxn, idx uint64, preserveIndexes bool, node string, check *structs.HealthCheck) error {
|
||||
// TODO(partitions): do we have to check partition here? probably not
|
||||
if check.Node != node {
|
||||
func (s *Store) ensureCheckIfNodeMatches(
|
||||
tx WriteTxn,
|
||||
idx uint64,
|
||||
preserveIndexes bool,
|
||||
node string,
|
||||
nodePartition string,
|
||||
check *structs.HealthCheck,
|
||||
) error {
|
||||
if check.Node != node || !structs.EqualPartitions(nodePartition, check.PartitionOrDefault()) {
|
||||
return fmt.Errorf("check node %q does not match node %q",
|
||||
check.Node, node)
|
||||
printNodeName(check.Node, check.PartitionOrDefault()),
|
||||
printNodeName(node, nodePartition),
|
||||
)
|
||||
}
|
||||
if err := s.ensureCheckTxn(tx, idx, preserveIndexes, check); err != nil {
|
||||
return fmt.Errorf("failed inserting check: %s on node %q", err, check.Node)
|
||||
return fmt.Errorf("failed inserting check on node %q: %v", printNodeName(check.Node, check.PartitionOrDefault()), err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func printNodeName(nodeName, partition string) string {
|
||||
if structs.IsDefaultPartition(partition) {
|
||||
return nodeName
|
||||
}
|
||||
return partition + "/" + nodeName
|
||||
}
|
||||
|
||||
// ensureRegistrationTxn is used to make sure a node, service, and check
|
||||
// registration is performed within a single transaction to avoid race
|
||||
// conditions on state updates.
|
||||
|
@ -205,12 +220,12 @@ func (s *Store) ensureRegistrationTxn(tx WriteTxn, idx uint64, preserveIndexes b
|
|||
|
||||
// Add the checks, if any.
|
||||
if req.Check != nil {
|
||||
if err := s.ensureCheckIfNodeMatches(tx, idx, preserveIndexes, req.Node, req.Check); err != nil {
|
||||
if err := s.ensureCheckIfNodeMatches(tx, idx, preserveIndexes, req.Node, req.PartitionOrDefault(), req.Check); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, check := range req.Checks {
|
||||
if err := s.ensureCheckIfNodeMatches(tx, idx, preserveIndexes, req.Node, check); err != nil {
|
||||
if err := s.ensureCheckIfNodeMatches(tx, idx, preserveIndexes, req.Node, req.PartitionOrDefault(), check); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
@ -526,9 +541,6 @@ func (s *Store) DeleteNode(idx uint64, nodeName string, entMeta *structs.Enterpr
|
|||
tx := s.db.WriteTxn(idx)
|
||||
defer tx.Abort()
|
||||
|
||||
// TODO(partition): double check all freshly modified state store functions
|
||||
// that take an ent meta do this trick
|
||||
|
||||
// TODO: accept non-pointer value
|
||||
if entMeta == nil {
|
||||
entMeta = structs.NodeEnterpriseMetaInDefaultPartition()
|
||||
|
|
|
@ -78,9 +78,6 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
return snap, err
|
||||
}
|
||||
|
||||
// default the namespace to the namespace of this proxy service
|
||||
currentNamespace := s.proxyID.NamespaceOrDefault()
|
||||
|
||||
if s.proxyCfg.Mode == structs.ProxyModeTransparent {
|
||||
// When in transparent proxy we will infer upstreams from intentions with this source
|
||||
err := s.cache.Notify(ctx, cachetype.IntentionUpstreamsName, &structs.ServiceSpecificRequest{
|
||||
|
@ -131,15 +128,15 @@ func (s *handlerConnectProxy) initialize(ctx context.Context) (ConfigSnapshot, e
|
|||
continue
|
||||
}
|
||||
|
||||
ns := currentNamespace
|
||||
if u.DestinationNamespace != "" {
|
||||
ns = u.DestinationNamespace
|
||||
}
|
||||
|
||||
// Default the partition and namespace to the namespace of this proxy service.
|
||||
partition := s.proxyID.PartitionOrDefault()
|
||||
if u.DestinationPartition != "" {
|
||||
partition = u.DestinationPartition
|
||||
}
|
||||
ns := s.proxyID.NamespaceOrDefault()
|
||||
if u.DestinationNamespace != "" {
|
||||
ns = u.DestinationNamespace
|
||||
}
|
||||
|
||||
cfg, err := parseReducedUpstreamConfig(u.Config)
|
||||
if err != nil {
|
||||
|
|
|
@ -18,6 +18,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/rpcclient/health"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
|
@ -103,6 +104,7 @@ func TestManager_BasicLifecycle(t *testing.T) {
|
|||
upstreams := structs.TestUpstreams(t)
|
||||
for i := range upstreams {
|
||||
upstreams[i].DestinationNamespace = structs.IntentionDefaultNamespace
|
||||
upstreams[i].DestinationPartition = api.PartitionDefaultName
|
||||
}
|
||||
webProxy := &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
|
|
|
@ -113,14 +113,18 @@ func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error
|
|||
// we can safely modify these since we just copied them
|
||||
for idx := range proxyCfg.Upstreams {
|
||||
us := &proxyCfg.Upstreams[idx]
|
||||
if us.DestinationType != structs.UpstreamDestTypePreparedQuery && us.DestinationNamespace == "" {
|
||||
if us.DestinationType != structs.UpstreamDestTypePreparedQuery {
|
||||
// default the upstreams target namespace and partition to those of the proxy
|
||||
// doing this here prevents needing much more complex logic a bunch of other
|
||||
// places and makes tracking these upstreams simpler as we can dedup them
|
||||
// with the maps tracking upstream ids being watched.
|
||||
proxyCfg.Upstreams[idx].DestinationNamespace = ns.EnterpriseMeta.NamespaceOrDefault()
|
||||
if us.DestinationPartition == "" {
|
||||
proxyCfg.Upstreams[idx].DestinationPartition = ns.EnterpriseMeta.PartitionOrDefault()
|
||||
}
|
||||
if us.DestinationNamespace == "" {
|
||||
proxyCfg.Upstreams[idx].DestinationNamespace = ns.EnterpriseMeta.NamespaceOrDefault()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return proxyCfg, nil
|
||||
|
|
|
@ -547,6 +547,25 @@ func (u *Upstream) String() string {
|
|||
return u.Identifier()
|
||||
}
|
||||
|
||||
// Identifier returns a string representation that uniquely identifies the
|
||||
// upstream in a canonical but human readable way.
|
||||
func (us *Upstream) Identifier() string {
|
||||
name := us.enterpriseIdentifierPrefix() + us.DestinationName
|
||||
typ := us.DestinationType
|
||||
|
||||
if us.Datacenter != "" {
|
||||
name += "?dc=" + us.Datacenter
|
||||
}
|
||||
|
||||
// Service is default type so never prefix it. This is more readable and long
|
||||
// term it is the only type that matters so we can drop the prefix and have
|
||||
// nicer naming in metrics etc.
|
||||
if typ == "" || typ == UpstreamDestTypeService {
|
||||
return name
|
||||
}
|
||||
return typ + ":" + name
|
||||
}
|
||||
|
||||
// UpstreamFromAPI is a helper for converting api.Upstream to Upstream.
|
||||
func UpstreamFromAPI(u api.Upstream) Upstream {
|
||||
return Upstream{
|
||||
|
|
|
@ -13,24 +13,6 @@ func (us *Upstream) DestinationID() ServiceID {
|
|||
}
|
||||
}
|
||||
|
||||
// Identifier returns a string representation that uniquely identifies the
|
||||
// upstream in a canonical but human readable way.
|
||||
func (us *Upstream) Identifier() string {
|
||||
name := us.DestinationName
|
||||
typ := us.DestinationType
|
||||
|
||||
if typ != UpstreamDestTypePreparedQuery && us.DestinationNamespace != "" && us.DestinationNamespace != IntentionDefaultNamespace {
|
||||
name = us.DestinationNamespace + "/" + us.DestinationName
|
||||
}
|
||||
if us.Datacenter != "" {
|
||||
name += "?dc=" + us.Datacenter
|
||||
}
|
||||
|
||||
// Service is default type so never prefix it. This is more readable and long
|
||||
// term it is the only type that matters so we can drop the prefix and have
|
||||
// nicer naming in metrics etc.
|
||||
if typ == "" || typ == UpstreamDestTypeService {
|
||||
return name
|
||||
}
|
||||
return typ + ":" + name
|
||||
func (us *Upstream) enterpriseIdentifierPrefix() string {
|
||||
return ""
|
||||
}
|
||||
|
|
|
@ -422,7 +422,6 @@ type RegisterRequest struct {
|
|||
// node portion of this update will not apply.
|
||||
SkipNodeUpdate bool
|
||||
|
||||
// TODO(partitions): ensure the partition part is used for node reg
|
||||
// EnterpriseMeta is the embedded enterprise metadata
|
||||
EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
|
||||
|
||||
|
@ -473,7 +472,6 @@ type DeregisterRequest struct {
|
|||
Node string
|
||||
ServiceID string
|
||||
CheckID types.CheckID
|
||||
// TODO(partitions): ensure the partition part is used for node reg
|
||||
EnterpriseMeta `hcl:",squash" mapstructure:",squash"`
|
||||
WriteRequest
|
||||
}
|
||||
|
@ -916,7 +914,6 @@ type ServiceNode struct {
|
|||
ServiceProxy ConnectProxyConfig
|
||||
ServiceConnect ServiceConnect
|
||||
|
||||
// TODO(partitions): ensure that Node+Service are both in the same Partition
|
||||
EnterpriseMeta `hcl:",squash" mapstructure:",squash" bexpr:"-"`
|
||||
|
||||
RaftIndex `bexpr:"-"`
|
||||
|
@ -1125,7 +1122,6 @@ type NodeService struct {
|
|||
// somewhere this is used in API output.
|
||||
LocallyRegisteredAsSidecar bool `json:"-" bexpr:"-"`
|
||||
|
||||
// TODO(partitions): ensure that Node+Service are both in the same Partition
|
||||
EnterpriseMeta `hcl:",squash" mapstructure:",squash" bexpr:"-"`
|
||||
|
||||
RaftIndex `bexpr:"-"`
|
||||
|
|
Loading…
Reference in New Issue