mirror of https://github.com/status-im/consul.git
Prevent consul peer-exports by discovery chain.
This commit is contained in:
parent
f366edcb8d
commit
8742fbe14f
|
@ -632,7 +632,7 @@ func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply
|
||||||
&args.QueryOptions,
|
&args.QueryOptions,
|
||||||
&reply.QueryMeta,
|
&reply.QueryMeta,
|
||||||
func(ws memdb.WatchSet, state *state.Store) error {
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
index, serviceMap, err := state.ExportedServicesForAllPeersByName(ws, args.EnterpriseMeta)
|
index, serviceMap, err := state.ExportedServicesForAllPeersByName(ws, args.Datacenter, args.EnterpriseMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -678,7 +678,7 @@ func (m *Internal) ExportedServicesForPeer(args *structs.ServiceDumpRequest, rep
|
||||||
reply.Services = nil
|
reply.Services = nil
|
||||||
return errNotFound
|
return errNotFound
|
||||||
}
|
}
|
||||||
idx, exportedSvcs, err := store.ExportedServicesForPeer(ws, p.ID, "")
|
idx, exportedSvcs, err := store.ExportedServicesForPeer(ws, p.ID, args.Datacenter)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("error while listing exported services for peer %q: %w", args.PeerName, err)
|
return fmt.Errorf("error while listing exported services for peer %q: %w", args.PeerName, err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -1139,8 +1139,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedImportedServsCount: 4, // 3 services from above + the "consul" service
|
expectedImportedServsCount: 3, // 3 services from above
|
||||||
expectedExportedServsCount: 4, // 3 services from above + the "consul" service
|
expectedExportedServsCount: 3, // 3 services from above
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "no sync",
|
name: "no sync",
|
||||||
|
@ -1148,8 +1148,8 @@ func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
exportedService: structs.ExportedServicesConfigEntry{
|
exportedService: structs.ExportedServicesConfigEntry{
|
||||||
Name: "default",
|
Name: "default",
|
||||||
},
|
},
|
||||||
expectedImportedServsCount: 0, // we want to see this decremented from 4 --> 0
|
expectedImportedServsCount: 0, // we want to see this decremented from 3 --> 0
|
||||||
expectedExportedServsCount: 0, // we want to see this decremented from 4 --> 0
|
expectedExportedServsCount: 0, // we want to see this decremented from 3 --> 0
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "just a, b services",
|
name: "just a, b services",
|
||||||
|
|
|
@ -722,7 +722,7 @@ func (s *Store) ExportedServicesForPeer(ws memdb.WatchSet, peerID string, dc str
|
||||||
return exportedServicesForPeerTxn(ws, tx, peering, dc)
|
return exportedServicesForPeerTxn(ws, tx, peering, dc)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) {
|
func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, dc string, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error) {
|
||||||
tx := s.db.ReadTxn()
|
tx := s.db.ReadTxn()
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
|
@ -733,7 +733,7 @@ func (s *Store) ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl
|
||||||
|
|
||||||
out := make(map[string]structs.ServiceList)
|
out := make(map[string]structs.ServiceList)
|
||||||
for _, peering := range peerings {
|
for _, peering := range peerings {
|
||||||
idx, list, err := exportedServicesForPeerTxn(ws, tx, peering, "")
|
idx, list, err := exportedServicesForPeerTxn(ws, tx, peering, dc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err)
|
return 0, nil, fmt.Errorf("failed to list exported services for peer %q: %w", peering.ID, err)
|
||||||
}
|
}
|
||||||
|
@ -761,6 +761,11 @@ func exportedServicesForPeerTxn(
|
||||||
peering *pbpeering.Peering,
|
peering *pbpeering.Peering,
|
||||||
dc string,
|
dc string,
|
||||||
) (uint64, *structs.ExportedServiceList, error) {
|
) (uint64, *structs.ExportedServiceList, error) {
|
||||||
|
// The DC must be specified in order to compile discovery chains.
|
||||||
|
if dc == "" {
|
||||||
|
return 0, nil, fmt.Errorf("datacenter cannot be empty")
|
||||||
|
}
|
||||||
|
|
||||||
maxIdx := peering.ModifyIndex
|
maxIdx := peering.ModifyIndex
|
||||||
|
|
||||||
entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition)
|
entMeta := structs.NodeEnterpriseMetaInPartition(peering.Partition)
|
||||||
|
@ -838,10 +843,6 @@ func exportedServicesForPeerTxn(
|
||||||
maxIdx = idx
|
maxIdx = idx
|
||||||
}
|
}
|
||||||
for _, sn := range discoChains {
|
for _, sn := range discoChains {
|
||||||
// Prevent exporting the "consul" service.
|
|
||||||
if sn.Name == structs.ConsulServiceName {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
discoSet[sn] = struct{}{}
|
discoSet[sn] = struct{}{}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -868,18 +869,24 @@ func exportedServicesForPeerTxn(
|
||||||
}
|
}
|
||||||
info.Protocol = protocol
|
info.Protocol = protocol
|
||||||
|
|
||||||
if dc != "" && !structs.IsProtocolHTTPLike(protocol) {
|
|
||||||
// We only need to populate the targets for replication purposes for L4 protocols, which
|
|
||||||
// do not ultimately get intercepted by the mesh gateways.
|
|
||||||
idx, targets, err := discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta)
|
idx, targets, err := discoveryChainOriginalTargetsTxn(tx, ws, dc, svc.Name, &svc.EnterpriseMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed to get discovery chain targets for service %q: %w", svc, err)
|
return fmt.Errorf("failed to get discovery chain targets for service %q: %w", svc, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if idx > maxIdx {
|
if idx > maxIdx {
|
||||||
maxIdx = idx
|
maxIdx = idx
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Prevent the consul service from being exported by a discovery chain.
|
||||||
|
for _, t := range targets {
|
||||||
|
if t.Service == structs.ConsulServiceName {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// We only need to populate the targets for replication purposes for L4 protocols, which
|
||||||
|
// do not ultimately get intercepted by the mesh gateways.
|
||||||
|
if !structs.IsProtocolHTTPLike(protocol) {
|
||||||
sort.Slice(targets, func(i, j int) bool {
|
sort.Slice(targets, func(i, j int) bool {
|
||||||
return targets[i].ID < targets[j].ID
|
return targets[i].ID < targets[j].ID
|
||||||
})
|
})
|
||||||
|
|
|
@ -1820,6 +1820,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||||
ID: "billing", Service: "billing", Port: 5000,
|
ID: "billing", Service: "billing", Port: 5000,
|
||||||
}))
|
}))
|
||||||
|
lastIdx++
|
||||||
// The consul service should never be exported.
|
// The consul service should never be exported.
|
||||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||||
ID: structs.ConsulServiceID, Service: structs.ConsulServiceName, Port: 8000,
|
ID: structs.ConsulServiceID, Service: structs.ConsulServiceName, Port: 8000,
|
||||||
|
@ -1877,12 +1878,13 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
Service: "payments-proxy",
|
Service: "payments-proxy",
|
||||||
Port: 5000,
|
Port: 5000,
|
||||||
}))
|
}))
|
||||||
|
lastIdx++
|
||||||
// The consul service should never be exported.
|
// The consul service should never be exported.
|
||||||
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
require.NoError(t, s.EnsureService(lastIdx, "foo", &structs.NodeService{
|
||||||
Kind: structs.ServiceKindConnectProxy,
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
ID: structs.ConsulServiceID,
|
ID: structs.ConsulServiceID + "-2",
|
||||||
Service: structs.ConsulServiceName,
|
Service: structs.ConsulServiceName,
|
||||||
Port: 8000,
|
Port: 8001,
|
||||||
}))
|
}))
|
||||||
|
|
||||||
// Ensure everything is L7-capable.
|
// Ensure everything is L7-capable.
|
||||||
|
@ -1941,9 +1943,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
// NOTE: no consul here
|
// NOTE: no consul here
|
||||||
},
|
},
|
||||||
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
|
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
|
||||||
newSN("consul-redirect"): {
|
// NOTE: no consul-redirect here
|
||||||
Protocol: "http",
|
|
||||||
},
|
|
||||||
newSN("billing"): {
|
newSN("billing"): {
|
||||||
Protocol: "http",
|
Protocol: "http",
|
||||||
},
|
},
|
||||||
|
@ -1987,9 +1987,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
||||||
// NOTE: no consul here
|
// NOTE: no consul here
|
||||||
},
|
},
|
||||||
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
|
DiscoChains: map[structs.ServiceName]structs.ExportedDiscoveryChainInfo{
|
||||||
newSN("consul-redirect"): {
|
// NOTE: no consul-redirect here
|
||||||
Protocol: "http",
|
|
||||||
},
|
|
||||||
newSN("payments"): {
|
newSN("payments"): {
|
||||||
Protocol: "http",
|
Protocol: "http",
|
||||||
},
|
},
|
||||||
|
|
|
@ -646,7 +646,6 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
|
||||||
var resp *pbpeerstream.ReplicationMessage_Response
|
var resp *pbpeerstream.ReplicationMessage_Response
|
||||||
switch {
|
switch {
|
||||||
case strings.HasPrefix(update.CorrelationID, subExportedServiceList):
|
case strings.HasPrefix(update.CorrelationID, subExportedServiceList):
|
||||||
// TODO either filter here or at the source.
|
|
||||||
resp, err = makeExportedServiceListResponse(status, update)
|
resp, err = makeExportedServiceListResponse(status, update)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Log the error and skip this response to avoid locking up peering due to a bad update event.
|
// Log the error and skip this response to avoid locking up peering due to a bad update event.
|
||||||
|
|
|
@ -89,8 +89,6 @@ func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, p
|
||||||
state.publicUpdateCh = publicUpdateCh
|
state.publicUpdateCh = publicUpdateCh
|
||||||
state.updateCh = updateCh
|
state.updateCh = updateCh
|
||||||
|
|
||||||
// TODO should I worry about all these goroutines emitting a `consul` object?
|
|
||||||
|
|
||||||
// Wrap our bare state store queries in goroutines that emit events.
|
// Wrap our bare state store queries in goroutines that emit events.
|
||||||
go m.notifyExportedServicesForPeerID(ctx, state, peerID)
|
go m.notifyExportedServicesForPeerID(ctx, state, peerID)
|
||||||
go m.notifyServerAddrUpdates(ctx, state.updateCh)
|
go m.notifyServerAddrUpdates(ctx, state.updateCh)
|
||||||
|
|
|
@ -40,7 +40,7 @@ func (s *serverExportedPeeredServices) Notify(ctx context.Context, req *structs.
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
index, serviceMap, err := store.ExportedServicesForAllPeersByName(ws, req.EnterpriseMeta)
|
index, serviceMap, err := store.ExportedServicesForAllPeersByName(ws, req.Datacenter, req.EnterpriseMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ func TestServerExportedPeeredServices(t *testing.T) {
|
||||||
t.Cleanup(cancel)
|
t.Cleanup(cancel)
|
||||||
|
|
||||||
store := state.NewStateStore(nil)
|
store := state.NewStateStore(nil)
|
||||||
|
require.NoError(t, store.CASetConfig(0, &structs.CAConfiguration{ClusterID: "cluster-id"}))
|
||||||
|
|
||||||
for _, peer := range []string{"peer-1", "peer-2", "peer-3"} {
|
for _, peer := range []string{"peer-1", "peer-2", "peer-3"} {
|
||||||
require.NoError(t, store.PeeringWrite(nextIndex(), &pbpeering.PeeringWriteRequest{
|
require.NoError(t, store.PeeringWrite(nextIndex(), &pbpeering.PeeringWriteRequest{
|
||||||
|
@ -59,7 +60,7 @@ func TestServerExportedPeeredServices(t *testing.T) {
|
||||||
GetStore: func() Store { return store },
|
GetStore: func() Store { return store },
|
||||||
ACLResolver: newStaticResolver(authz),
|
ACLResolver: newStaticResolver(authz),
|
||||||
})
|
})
|
||||||
require.NoError(t, dataSource.Notify(ctx, &structs.DCSpecificRequest{}, "", eventCh))
|
require.NoError(t, dataSource.Notify(ctx, &structs.DCSpecificRequest{Datacenter: "dc1"}, "", eventCh))
|
||||||
|
|
||||||
testutil.RunStep(t, "initial state", func(t *testing.T) {
|
testutil.RunStep(t, "initial state", func(t *testing.T) {
|
||||||
result := getEventResult[*structs.IndexedExportedServiceList](t, eventCh)
|
result := getEventResult[*structs.IndexedExportedServiceList](t, eventCh)
|
||||||
|
|
|
@ -36,7 +36,7 @@ type ServerDataSourceDeps struct {
|
||||||
type Store interface {
|
type Store interface {
|
||||||
watch.StateStore
|
watch.StateStore
|
||||||
|
|
||||||
ExportedServicesForAllPeersByName(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error)
|
ExportedServicesForAllPeersByName(ws memdb.WatchSet, dc string, entMeta acl.EnterpriseMeta) (uint64, map[string]structs.ServiceList, error)
|
||||||
FederationStateList(ws memdb.WatchSet) (uint64, []*structs.FederationState, error)
|
FederationStateList(ws memdb.WatchSet) (uint64, []*structs.FederationState, error)
|
||||||
GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.EnterpriseMeta) (uint64, structs.GatewayServices, error)
|
GatewayServices(ws memdb.WatchSet, gateway string, entMeta *acl.EnterpriseMeta) (uint64, structs.GatewayServices, error)
|
||||||
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
|
IntentionTopology(ws memdb.WatchSet, target structs.ServiceName, downstreams bool, defaultDecision acl.EnforcementDecision, intentionTarget structs.IntentionTargetType) (uint64, structs.ServiceList, error)
|
||||||
|
|
Loading…
Reference in New Issue