mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 21:35:52 +00:00
Add internal endpoint to fetch peered upstream candidates from VirtualIP table (#13642)
For initial cluster peering TProxy support we consider all imported services of a partition to be potential upstreams. We leverage the VirtualIP table because it stores plain service names (e.g. "api", not "api-sidecar-proxy").
This commit is contained in:
parent
653cb42944
commit
d8b7940e40
@ -647,6 +647,7 @@ func (a *Agent) Start(ctx context.Context) error {
|
|||||||
IntentionUpstreams: proxycfgglue.CacheIntentionUpstreams(a.cache),
|
IntentionUpstreams: proxycfgglue.CacheIntentionUpstreams(a.cache),
|
||||||
InternalServiceDump: proxycfgglue.CacheInternalServiceDump(a.cache),
|
InternalServiceDump: proxycfgglue.CacheInternalServiceDump(a.cache),
|
||||||
LeafCertificate: proxycfgglue.CacheLeafCertificate(a.cache),
|
LeafCertificate: proxycfgglue.CacheLeafCertificate(a.cache),
|
||||||
|
PeeredUpstreams: proxycfgglue.CachePeeredUpstreams(a.cache),
|
||||||
PreparedQuery: proxycfgglue.CachePrepraredQuery(a.cache),
|
PreparedQuery: proxycfgglue.CachePrepraredQuery(a.cache),
|
||||||
ResolvedServiceConfig: proxycfgglue.CacheResolvedServiceConfig(a.cache),
|
ResolvedServiceConfig: proxycfgglue.CacheResolvedServiceConfig(a.cache),
|
||||||
ServiceList: proxycfgglue.CacheServiceList(a.cache),
|
ServiceList: proxycfgglue.CacheServiceList(a.cache),
|
||||||
|
@ -10,7 +10,7 @@ import (
|
|||||||
// Recommended name for registration.
|
// Recommended name for registration.
|
||||||
const IntentionUpstreamsName = "intention-upstreams"
|
const IntentionUpstreamsName = "intention-upstreams"
|
||||||
|
|
||||||
// GatewayUpstreams supports fetching upstreams for a given gateway name.
|
// IntentionUpstreams supports fetching upstreams for a given service name.
|
||||||
type IntentionUpstreams struct {
|
type IntentionUpstreams struct {
|
||||||
RegisterOptionsBlockingRefresh
|
RegisterOptionsBlockingRefresh
|
||||||
RPC RPC
|
RPC RPC
|
||||||
|
51
agent/cache-types/peered_upstreams.go
Normal file
51
agent/cache-types/peered_upstreams.go
Normal file
@ -0,0 +1,51 @@
|
|||||||
|
package cachetype
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Recommended name for registration.
|
||||||
|
const PeeredUpstreamsName = "peered-upstreams"
|
||||||
|
|
||||||
|
// PeeredUpstreams supports fetching imported upstream candidates of a given partition.
|
||||||
|
type PeeredUpstreams struct {
|
||||||
|
RegisterOptionsBlockingRefresh
|
||||||
|
RPC RPC
|
||||||
|
}
|
||||||
|
|
||||||
|
func (i *PeeredUpstreams) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
|
||||||
|
var result cache.FetchResult
|
||||||
|
|
||||||
|
reqReal, ok := req.(*structs.PartitionSpecificRequest)
|
||||||
|
if !ok {
|
||||||
|
return result, fmt.Errorf(
|
||||||
|
"Internal cache failure: request wrong type: %T", req)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Lightweight copy this object so that manipulating QueryOptions doesn't race.
|
||||||
|
dup := *reqReal
|
||||||
|
reqReal = &dup
|
||||||
|
|
||||||
|
// Set the minimum query index to our current index so we block
|
||||||
|
reqReal.QueryOptions.MinQueryIndex = opts.MinIndex
|
||||||
|
reqReal.QueryOptions.MaxQueryTime = opts.Timeout
|
||||||
|
|
||||||
|
// Always allow stale - there's no point in hitting leader if the request is
|
||||||
|
// going to be served from cache and end up arbitrarily stale anyway. This
|
||||||
|
// allows cached service-discover to automatically read scale across all
|
||||||
|
// servers too.
|
||||||
|
reqReal.AllowStale = true
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
var reply structs.IndexedPeeredServiceList
|
||||||
|
if err := i.RPC.RPC("Internal.PeeredUpstreams", reqReal, &reply); err != nil {
|
||||||
|
return result, err
|
||||||
|
}
|
||||||
|
|
||||||
|
result.Value = &reply
|
||||||
|
result.Index = reply.QueryMeta.Index
|
||||||
|
return result, nil
|
||||||
|
}
|
60
agent/cache-types/peered_upstreams_test.go
Normal file
60
agent/cache-types/peered_upstreams_test.go
Normal file
@ -0,0 +1,60 @@
|
|||||||
|
package cachetype
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/stretchr/testify/mock"
|
||||||
|
"github.com/stretchr/testify/require"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestPeeredUpstreams(t *testing.T) {
|
||||||
|
rpc := TestRPC(t)
|
||||||
|
defer rpc.AssertExpectations(t)
|
||||||
|
typ := &PeeredUpstreams{RPC: rpc}
|
||||||
|
|
||||||
|
// Expect the proper RPC call. This also sets the expected value
|
||||||
|
// since that is return-by-pointer in the arguments.
|
||||||
|
var resp *structs.IndexedPeeredServiceList
|
||||||
|
rpc.On("RPC", "Internal.PeeredUpstreams", mock.Anything, mock.Anything).Return(nil).
|
||||||
|
Run(func(args mock.Arguments) {
|
||||||
|
req := args.Get(1).(*structs.PartitionSpecificRequest)
|
||||||
|
require.Equal(t, uint64(24), req.MinQueryIndex)
|
||||||
|
require.Equal(t, 1*time.Second, req.QueryOptions.MaxQueryTime)
|
||||||
|
require.True(t, req.AllowStale)
|
||||||
|
|
||||||
|
reply := args.Get(2).(*structs.IndexedPeeredServiceList)
|
||||||
|
reply.Index = 48
|
||||||
|
resp = reply
|
||||||
|
})
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
result, err := typ.Fetch(cache.FetchOptions{
|
||||||
|
MinIndex: 24,
|
||||||
|
Timeout: 1 * time.Second,
|
||||||
|
}, &structs.PartitionSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
|
||||||
|
})
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, cache.FetchResult{
|
||||||
|
Value: resp,
|
||||||
|
Index: 48,
|
||||||
|
}, result)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestPeeredUpstreams_badReqType(t *testing.T) {
|
||||||
|
rpc := TestRPC(t)
|
||||||
|
defer rpc.AssertExpectations(t)
|
||||||
|
typ := &PeeredUpstreams{RPC: rpc}
|
||||||
|
|
||||||
|
// Fetch
|
||||||
|
_, err := typ.Fetch(cache.FetchOptions{}, cache.TestRequest(
|
||||||
|
t, cache.RequestInfo{Key: "foo", MinIndex: 64}))
|
||||||
|
require.Error(t, err)
|
||||||
|
require.Contains(t, err.Error(), "wrong type")
|
||||||
|
}
|
@ -14,6 +14,7 @@ import (
|
|||||||
"github.com/hashicorp/consul-net-rpc/net/rpc"
|
"github.com/hashicorp/consul-net-rpc/net/rpc"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/api"
|
"github.com/hashicorp/consul/api"
|
||||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||||
@ -1209,6 +1210,104 @@ func registerTestRoutingConfigTopologyEntries(t *testing.T, codec rpc.ClientCode
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func registerLocalAndRemoteServicesVIPEnabled(t *testing.T, state *state.Store) {
|
||||||
|
t.Helper()
|
||||||
|
|
||||||
|
_, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotNil(t, entry)
|
||||||
|
require.Equal(t, "true", entry.Value)
|
||||||
|
|
||||||
|
// Register a local connect-native service
|
||||||
|
require.NoError(t, state.EnsureRegistration(10, &structs.RegisterRequest{
|
||||||
|
Node: "foo",
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Service: "api",
|
||||||
|
Connect: structs.ServiceConnect{
|
||||||
|
Native: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}))
|
||||||
|
// Should be assigned VIP
|
||||||
|
psn := structs.PeeredServiceName{ServiceName: structs.NewServiceName("api", nil)}
|
||||||
|
vip, err := state.VirtualIPForService(psn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, "240.0.0.1", vip)
|
||||||
|
|
||||||
|
// Register an imported service and its proxy
|
||||||
|
require.NoError(t, state.EnsureRegistration(11, &structs.RegisterRequest{
|
||||||
|
Node: "bar",
|
||||||
|
SkipNodeUpdate: true,
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindTypical,
|
||||||
|
Service: "web",
|
||||||
|
ID: "web-1",
|
||||||
|
},
|
||||||
|
PeerName: "peer-a",
|
||||||
|
}))
|
||||||
|
require.NoError(t, state.EnsureRegistration(12, &structs.RegisterRequest{
|
||||||
|
Node: "bar",
|
||||||
|
Address: "127.0.0.2",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
|
ID: "web-proxy",
|
||||||
|
Service: "web-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "web",
|
||||||
|
},
|
||||||
|
LocallyRegisteredAsSidecar: true,
|
||||||
|
},
|
||||||
|
PeerName: "peer-a",
|
||||||
|
}))
|
||||||
|
// Should be assigned one VIP for the real service name
|
||||||
|
psn = structs.PeeredServiceName{Peer: "peer-a", ServiceName: structs.NewServiceName("web", nil)}
|
||||||
|
vip, err = state.VirtualIPForService(psn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, "240.0.0.2", vip)
|
||||||
|
// web-proxy should not have a VIP
|
||||||
|
psn = structs.PeeredServiceName{Peer: "peer-a", ServiceName: structs.NewServiceName("web-proxy", nil)}
|
||||||
|
vip, err = state.VirtualIPForService(psn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Empty(t, vip)
|
||||||
|
|
||||||
|
// Register an imported service and its proxy from another peer
|
||||||
|
require.NoError(t, state.EnsureRegistration(11, &structs.RegisterRequest{
|
||||||
|
Node: "gir",
|
||||||
|
SkipNodeUpdate: true,
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindTypical,
|
||||||
|
Service: "web",
|
||||||
|
ID: "web-1",
|
||||||
|
},
|
||||||
|
PeerName: "peer-b",
|
||||||
|
}))
|
||||||
|
require.NoError(t, state.EnsureRegistration(12, &structs.RegisterRequest{
|
||||||
|
Node: "gir",
|
||||||
|
Address: "127.0.0.3",
|
||||||
|
Service: &structs.NodeService{
|
||||||
|
Kind: structs.ServiceKindConnectProxy,
|
||||||
|
ID: "web-proxy",
|
||||||
|
Service: "web-proxy",
|
||||||
|
Proxy: structs.ConnectProxyConfig{
|
||||||
|
DestinationServiceName: "web",
|
||||||
|
},
|
||||||
|
LocallyRegisteredAsSidecar: true,
|
||||||
|
},
|
||||||
|
PeerName: "peer-b",
|
||||||
|
}))
|
||||||
|
// Should be assigned one VIP for the real service name
|
||||||
|
psn = structs.PeeredServiceName{Peer: "peer-b", ServiceName: structs.NewServiceName("web", nil)}
|
||||||
|
vip, err = state.VirtualIPForService(psn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, "240.0.0.3", vip)
|
||||||
|
// web-proxy should not have a VIP
|
||||||
|
psn = structs.PeeredServiceName{Peer: "peer-b", ServiceName: structs.NewServiceName("web-proxy", nil)}
|
||||||
|
vip, err = state.VirtualIPForService(psn)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Empty(t, vip)
|
||||||
|
}
|
||||||
|
|
||||||
func registerIntentionUpstreamEntries(t *testing.T, codec rpc.ClientCodec, token string) {
|
func registerIntentionUpstreamEntries(t *testing.T, codec rpc.ClientCodec, token string) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
|
@ -292,7 +292,7 @@ func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply *
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// IntentionUpstreams returns the upstreams of a service. Upstreams are inferred from intentions.
|
// IntentionUpstreams returns a service's upstreams which are inferred from intentions.
|
||||||
// If intentions allow a connection from the target to some candidate service, the candidate service is considered
|
// If intentions allow a connection from the target to some candidate service, the candidate service is considered
|
||||||
// an upstream of the target.
|
// an upstream of the target.
|
||||||
func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceList) error {
|
func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceList) error {
|
||||||
@ -309,9 +309,9 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl
|
|||||||
return m.internalUpstreams(args, reply, structs.IntentionTargetService)
|
return m.internalUpstreams(args, reply, structs.IntentionTargetService)
|
||||||
}
|
}
|
||||||
|
|
||||||
// IntentionUpstreamsDestination returns the upstreams of a service. Upstreams are inferred from intentions.
|
// IntentionUpstreamsDestination returns a service's upstreams which are inferred from intentions.
|
||||||
// If intentions allow a connection from the target to some candidate destination, the candidate destination is considered
|
// If intentions allow a connection from the target to some candidate destination, the candidate destination is considered
|
||||||
// an upstream of the target.this is performs the same logic as IntentionUpstreams endpoint but for destination upstreams only.
|
// an upstream of the target. This performs the same logic as IntentionUpstreams endpoint but for destination upstreams only.
|
||||||
func (m *Internal) IntentionUpstreamsDestination(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceList) error {
|
func (m *Internal) IntentionUpstreamsDestination(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceList) error {
|
||||||
// Exit early if Connect hasn't been enabled.
|
// Exit early if Connect hasn't been enabled.
|
||||||
if !m.srv.config.ConnectEnabled {
|
if !m.srv.config.ConnectEnabled {
|
||||||
@ -571,6 +571,49 @@ func (m *Internal) ExportedPeeredServices(args *structs.DCSpecificRequest, reply
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PeeredUpstreams returns all imported services as upstreams for any service in a given partition.
|
||||||
|
// Cluster peering does not replicate intentions so all imported services are considered potential upstreams.
|
||||||
|
func (m *Internal) PeeredUpstreams(args *structs.PartitionSpecificRequest, reply *structs.IndexedPeeredServiceList) error {
|
||||||
|
// Exit early if Connect hasn't been enabled.
|
||||||
|
if !m.srv.config.ConnectEnabled {
|
||||||
|
return ErrConnectNotEnabled
|
||||||
|
}
|
||||||
|
if done, err := m.srv.ForwardRPC("Internal.PeeredUpstreams", args, reply); done {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO(peering): ACL for filtering
|
||||||
|
// authz, err := m.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, nil)
|
||||||
|
// if err != nil {
|
||||||
|
// return err
|
||||||
|
// }
|
||||||
|
|
||||||
|
if err := m.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return m.srv.blockingQuery(
|
||||||
|
&args.QueryOptions,
|
||||||
|
&reply.QueryMeta,
|
||||||
|
func(ws memdb.WatchSet, state *state.Store) error {
|
||||||
|
index, vips, err := state.VirtualIPsForAllImportedServices(ws, args.EnterpriseMeta)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
result := make([]structs.PeeredServiceName, 0, len(vips))
|
||||||
|
for _, vip := range vips {
|
||||||
|
result = append(result, vip.Service)
|
||||||
|
}
|
||||||
|
|
||||||
|
reply.Index, reply.Services = index, result
|
||||||
|
|
||||||
|
// TODO(peering): low priority: consider ACL filtering
|
||||||
|
// m.srv.filterACLWithAuthorizer(authz, reply)
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC
|
// EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC
|
||||||
// call to fire an event. The primary use case is to enable user events being
|
// call to fire an event. The primary use case is to enable user events being
|
||||||
// triggered in a remote DC.
|
// triggered in a remote DC.
|
||||||
|
@ -2776,3 +2776,40 @@ func TestInternal_CatalogOverview_ACLDeny(t *testing.T) {
|
|||||||
arg.Token = opReadToken.SecretID
|
arg.Token = opReadToken.SecretID
|
||||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.CatalogOverview", &arg, &out))
|
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.CatalogOverview", &arg, &out))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestInternal_PeeredUpstreams(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("too slow for testing.Short")
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Parallel()
|
||||||
|
_, s1 := testServerWithConfig(t)
|
||||||
|
|
||||||
|
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
state := s1.fsm.State()
|
||||||
|
|
||||||
|
// Services
|
||||||
|
// api local
|
||||||
|
// web peer: peer-a
|
||||||
|
// web-proxy peer: peer-a
|
||||||
|
// web peer: peer-b
|
||||||
|
// web-proxy peer: peer-b
|
||||||
|
registerLocalAndRemoteServicesVIPEnabled(t, state)
|
||||||
|
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
|
||||||
|
args := structs.PartitionSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
|
||||||
|
}
|
||||||
|
var out structs.IndexedPeeredServiceList
|
||||||
|
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Internal.PeeredUpstreams", &args, &out))
|
||||||
|
|
||||||
|
require.Len(t, out.Services, 2)
|
||||||
|
expect := []structs.PeeredServiceName{
|
||||||
|
{Peer: "peer-a", ServiceName: structs.NewServiceName("web", structs.DefaultEnterpriseMetaInDefaultPartition())},
|
||||||
|
{Peer: "peer-b", ServiceName: structs.NewServiceName("web", structs.DefaultEnterpriseMetaInDefaultPartition())},
|
||||||
|
}
|
||||||
|
require.Equal(t, expect, out.Services)
|
||||||
|
}
|
||||||
|
@ -117,7 +117,13 @@ func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *Restore) ServiceVirtualIP(req ServiceVirtualIP) error {
|
func (s *Restore) ServiceVirtualIP(req ServiceVirtualIP) error {
|
||||||
return s.tx.Insert(tableServiceVirtualIPs, req)
|
if err := s.tx.Insert(tableServiceVirtualIPs, req); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if err := updateVirtualIPMaxIndexes(s.tx, req.ModifyIndex, req.Service.ServiceName.PartitionOrDefault(), req.Service.Peer); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Restore) FreeVirtualIP(req FreeVirtualIP) error {
|
func (s *Restore) FreeVirtualIP(req FreeVirtualIP) error {
|
||||||
@ -898,7 +904,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
|
|||||||
|
|
||||||
sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta}
|
sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta}
|
||||||
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn}
|
psn := structs.PeeredServiceName{Peer: svc.PeerName, ServiceName: sn}
|
||||||
vip, err := assignServiceVirtualIP(tx, psn)
|
vip, err := assignServiceVirtualIP(tx, idx, psn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("failed updating virtual IP: %s", err)
|
return fmt.Errorf("failed updating virtual IP: %s", err)
|
||||||
}
|
}
|
||||||
@ -923,7 +929,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
|
|||||||
}
|
}
|
||||||
if conf != nil {
|
if conf != nil {
|
||||||
termGatewayConf := conf.(*structs.TerminatingGatewayConfigEntry)
|
termGatewayConf := conf.(*structs.TerminatingGatewayConfigEntry)
|
||||||
addrs, err := getTermGatewayVirtualIPs(tx, termGatewayConf.Services, &svc.EnterpriseMeta)
|
addrs, err := getTermGatewayVirtualIPs(tx, idx, termGatewayConf.Services, &svc.EnterpriseMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -978,7 +984,7 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
|
|||||||
|
|
||||||
// assignServiceVirtualIP assigns a virtual IP to the target service and updates
|
// assignServiceVirtualIP assigns a virtual IP to the target service and updates
|
||||||
// the global virtual IP counter if necessary.
|
// the global virtual IP counter if necessary.
|
||||||
func assignServiceVirtualIP(tx WriteTxn, psn structs.PeeredServiceName) (string, error) {
|
func assignServiceVirtualIP(tx WriteTxn, idx uint64, psn structs.PeeredServiceName) (string, error) {
|
||||||
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, psn)
|
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, psn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", fmt.Errorf("failed service virtual IP lookup: %s", err)
|
return "", fmt.Errorf("failed service virtual IP lookup: %s", err)
|
||||||
@ -1052,10 +1058,17 @@ func assignServiceVirtualIP(tx WriteTxn, psn structs.PeeredServiceName) (string,
|
|||||||
assignedVIP := ServiceVirtualIP{
|
assignedVIP := ServiceVirtualIP{
|
||||||
Service: psn,
|
Service: psn,
|
||||||
IP: newEntry.IP,
|
IP: newEntry.IP,
|
||||||
|
RaftIndex: structs.RaftIndex{
|
||||||
|
ModifyIndex: idx,
|
||||||
|
CreateIndex: idx,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
if err := tx.Insert(tableServiceVirtualIPs, assignedVIP); err != nil {
|
if err := tx.Insert(tableServiceVirtualIPs, assignedVIP); err != nil {
|
||||||
return "", fmt.Errorf("failed inserting service virtual IP entry: %s", err)
|
return "", fmt.Errorf("failed inserting service virtual IP entry: %s", err)
|
||||||
}
|
}
|
||||||
|
if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
result, err := addIPOffset(startingVirtualIP, assignedVIP.IP)
|
result, err := addIPOffset(startingVirtualIP, assignedVIP.IP)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -1064,6 +1077,20 @@ func assignServiceVirtualIP(tx WriteTxn, psn structs.PeeredServiceName) (string,
|
|||||||
return result.String(), nil
|
return result.String(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func updateVirtualIPMaxIndexes(txn WriteTxn, idx uint64, partition, peerName string) error {
|
||||||
|
// update per-partition max index
|
||||||
|
if err := indexUpdateMaxTxn(txn, idx, partitionedIndexEntryName(tableServiceVirtualIPs, partition)); err != nil {
|
||||||
|
return fmt.Errorf("failed while updating partitioned index: %w", err)
|
||||||
|
}
|
||||||
|
if peerName != "" {
|
||||||
|
// track a separate max index for imported services
|
||||||
|
if err := indexUpdateMaxTxn(txn, idx, partitionedIndexEntryName(tableServiceVirtualIPs+".imported", partition)); err != nil {
|
||||||
|
return fmt.Errorf("failed while updating partitioned index for imported services: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func addIPOffset(a, b net.IP) (net.IP, error) {
|
func addIPOffset(a, b net.IP) (net.IP, error) {
|
||||||
a4 := a.To4()
|
a4 := a.To4()
|
||||||
b4 := b.To4()
|
b4 := b.To4()
|
||||||
@ -2899,6 +2926,35 @@ func (s *Store) VirtualIPForService(psn structs.PeeredServiceName) (string, erro
|
|||||||
return result.String(), nil
|
return result.String(), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// VirtualIPsForAllImportedServices returns a slice of ServiceVirtualIP for all
|
||||||
|
// VirtualIP-assignable services that have been imported by the partition represented in entMeta.
|
||||||
|
// Namespace is ignored.
|
||||||
|
func (s *Store) VirtualIPsForAllImportedServices(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []ServiceVirtualIP, error) {
|
||||||
|
tx := s.db.ReadTxn()
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
q := Query{
|
||||||
|
EnterpriseMeta: entMeta,
|
||||||
|
// Wildcard peername is used by prefix index to fetch all remote peers for a partition.
|
||||||
|
PeerName: "*",
|
||||||
|
}
|
||||||
|
iter, err := tx.Get(tableServiceVirtualIPs, indexID+"_prefix", q)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed service virtual IP lookup: %s", err)
|
||||||
|
}
|
||||||
|
ws.Add(iter.WatchCh())
|
||||||
|
|
||||||
|
idx := maxIndexTxn(tx, partitionedIndexEntryName(tableServiceVirtualIPs+".imported", entMeta.PartitionOrDefault()))
|
||||||
|
|
||||||
|
var vips []ServiceVirtualIP
|
||||||
|
for raw := iter.Next(); raw != nil; raw = iter.Next() {
|
||||||
|
vip := raw.(ServiceVirtualIP)
|
||||||
|
vips = append(vips, vip)
|
||||||
|
}
|
||||||
|
|
||||||
|
return idx, vips, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Store) ServiceNamesOfKind(ws memdb.WatchSet, kind structs.ServiceKind) (uint64, []*KindServiceName, error) {
|
func (s *Store) ServiceNamesOfKind(ws memdb.WatchSet, kind structs.ServiceKind) (uint64, []*KindServiceName, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
@ -3333,13 +3389,18 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getTermGatewayVirtualIPs(tx WriteTxn, services []structs.LinkedService, entMeta *acl.EnterpriseMeta) (map[string]structs.ServiceAddress, error) {
|
func getTermGatewayVirtualIPs(
|
||||||
|
tx WriteTxn,
|
||||||
|
idx uint64,
|
||||||
|
services []structs.LinkedService,
|
||||||
|
entMeta *acl.EnterpriseMeta,
|
||||||
|
) (map[string]structs.ServiceAddress, error) {
|
||||||
addrs := make(map[string]structs.ServiceAddress, len(services))
|
addrs := make(map[string]structs.ServiceAddress, len(services))
|
||||||
for _, s := range services {
|
for _, s := range services {
|
||||||
sn := structs.ServiceName{Name: s.Name, EnterpriseMeta: *entMeta}
|
sn := structs.ServiceName{Name: s.Name, EnterpriseMeta: *entMeta}
|
||||||
// Terminating Gateways cannot route to services in peered clusters
|
// Terminating Gateways cannot route to services in peered clusters
|
||||||
psn := structs.PeeredServiceName{ServiceName: sn, Peer: structs.DefaultPeerKeyword}
|
psn := structs.PeeredServiceName{ServiceName: sn, Peer: structs.DefaultPeerKeyword}
|
||||||
vip, err := assignServiceVirtualIP(tx, psn)
|
vip, err := assignServiceVirtualIP(tx, idx, psn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -3353,7 +3414,7 @@ func getTermGatewayVirtualIPs(tx WriteTxn, services []structs.LinkedService, ent
|
|||||||
func updateTerminatingGatewayVirtualIPs(tx WriteTxn, idx uint64, conf *structs.TerminatingGatewayConfigEntry, entMeta *acl.EnterpriseMeta) error {
|
func updateTerminatingGatewayVirtualIPs(tx WriteTxn, idx uint64, conf *structs.TerminatingGatewayConfigEntry, entMeta *acl.EnterpriseMeta) error {
|
||||||
// Build the current map of services with virtual IPs for this gateway
|
// Build the current map of services with virtual IPs for this gateway
|
||||||
services := conf.Services
|
services := conf.Services
|
||||||
addrs, err := getTermGatewayVirtualIPs(tx, services, entMeta)
|
addrs, err := getTermGatewayVirtualIPs(tx, idx, services, entMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -303,7 +303,12 @@ func updateKindServiceNamesIndex(tx WriteTxn, idx uint64, kind structs.ServiceKi
|
|||||||
func indexFromPeeredServiceName(psn structs.PeeredServiceName) ([]byte, error) {
|
func indexFromPeeredServiceName(psn structs.PeeredServiceName) ([]byte, error) {
|
||||||
peer := structs.LocalPeerKeyword
|
peer := structs.LocalPeerKeyword
|
||||||
if psn.Peer != "" {
|
if psn.Peer != "" {
|
||||||
peer = psn.Peer
|
// This prefix is unusual but necessary for reads which want
|
||||||
|
// to isolate peered resources.
|
||||||
|
// This allows you to prefix query for "peer:":
|
||||||
|
// internal/name
|
||||||
|
// peer:peername/name
|
||||||
|
peer = "peer:" + psn.Peer
|
||||||
}
|
}
|
||||||
|
|
||||||
var b indexBuilder
|
var b indexBuilder
|
||||||
|
@ -700,6 +700,21 @@ func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase {
|
|||||||
source: obj,
|
source: obj,
|
||||||
expected: []byte("internal\x00foo\x00"),
|
expected: []byte("internal\x00foo\x00"),
|
||||||
},
|
},
|
||||||
|
prefix: []indexValue{
|
||||||
|
{
|
||||||
|
source: Query{
|
||||||
|
Value: "foo",
|
||||||
|
},
|
||||||
|
expected: []byte("internal\x00foo\x00"),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
source: Query{
|
||||||
|
Value: "foo",
|
||||||
|
PeerName: "*", // test wildcard PeerName
|
||||||
|
},
|
||||||
|
expected: []byte("peer:"),
|
||||||
|
},
|
||||||
|
},
|
||||||
extra: []indexerTestCase{
|
extra: []indexerTestCase{
|
||||||
{
|
{
|
||||||
read: indexValue{
|
read: indexValue{
|
||||||
@ -709,11 +724,11 @@ func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase {
|
|||||||
},
|
},
|
||||||
Peer: "Billing",
|
Peer: "Billing",
|
||||||
},
|
},
|
||||||
expected: []byte("billing\x00foo\x00"),
|
expected: []byte("peer:billing\x00foo\x00"),
|
||||||
},
|
},
|
||||||
write: indexValue{
|
write: indexValue{
|
||||||
source: peeredObj,
|
source: peeredObj,
|
||||||
expected: []byte("billing\x00foo\x00"),
|
expected: []byte("peer:billing\x00foo\x00"),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -607,6 +607,8 @@ func (q NodeCheckQuery) PartitionOrDefault() string {
|
|||||||
type ServiceVirtualIP struct {
|
type ServiceVirtualIP struct {
|
||||||
Service structs.PeeredServiceName
|
Service structs.PeeredServiceName
|
||||||
IP net.IP
|
IP net.IP
|
||||||
|
|
||||||
|
structs.RaftIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
// FreeVirtualIP is used to store a virtual IP freed up by a service deregistration.
|
// FreeVirtualIP is used to store a virtual IP freed up by a service deregistration.
|
||||||
@ -631,9 +633,11 @@ func serviceVirtualIPTableSchema() *memdb.TableSchema {
|
|||||||
Name: indexID,
|
Name: indexID,
|
||||||
AllowMissing: false,
|
AllowMissing: false,
|
||||||
Unique: true,
|
Unique: true,
|
||||||
Indexer: indexerSingle[structs.PeeredServiceName, ServiceVirtualIP]{
|
Indexer: indexerSingleWithPrefix[structs.PeeredServiceName, ServiceVirtualIP, Query]{
|
||||||
readIndex: indexFromPeeredServiceName,
|
readIndex: indexFromPeeredServiceName,
|
||||||
writeIndex: indexFromServiceVirtualIP,
|
writeIndex: indexFromServiceVirtualIP,
|
||||||
|
// Read all peers in a cluster / partition
|
||||||
|
prefixIndex: prefixIndexFromQueryWithPeerWildcardable,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
@ -52,6 +52,29 @@ func prefixIndexFromQueryWithPeer(arg any) ([]byte, error) {
|
|||||||
return nil, fmt.Errorf("unexpected type %T for Query prefix index", arg)
|
return nil, fmt.Errorf("unexpected type %T for Query prefix index", arg)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// prefixIndexFromQueryWithPeerWildcardable allows for a wildcard "*" peerName
|
||||||
|
// to query for all peers _excluding_ structs.LocalPeerKeyword.
|
||||||
|
// Assumes that non-local peers are prefixed with "peer:".
|
||||||
|
func prefixIndexFromQueryWithPeerWildcardable(v Query) ([]byte, error) {
|
||||||
|
var b indexBuilder
|
||||||
|
|
||||||
|
peername := v.PeerOrEmpty()
|
||||||
|
if peername == "" {
|
||||||
|
b.String(strings.ToLower(structs.LocalPeerKeyword))
|
||||||
|
} else if peername == "*" {
|
||||||
|
// use b.Raw so we don't add null terminator to prefix
|
||||||
|
b.Raw([]byte("peer:"))
|
||||||
|
return b.Bytes(), nil
|
||||||
|
} else {
|
||||||
|
b.String(strings.ToLower("peer:" + peername))
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.Value != "" {
|
||||||
|
b.String(strings.ToLower(v.Value))
|
||||||
|
}
|
||||||
|
return b.Bytes(), nil
|
||||||
|
}
|
||||||
|
|
||||||
func prefixIndexFromQueryNoNamespace(arg interface{}) ([]byte, error) {
|
func prefixIndexFromQueryNoNamespace(arg interface{}) ([]byte, error) {
|
||||||
return prefixIndexFromQuery(arg)
|
return prefixIndexFromQuery(arg)
|
||||||
}
|
}
|
||||||
|
@ -83,6 +83,12 @@ func CacheLeafCertificate(c *cache.Cache) proxycfg.LeafCertificate {
|
|||||||
return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName}
|
return &cacheProxyDataSource[*cachetype.ConnectCALeafRequest]{c, cachetype.ConnectCALeafName}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CachePeeredUpstreams satisfies the proxycfg.PeeredUpstreams interface
|
||||||
|
// by sourcing data from the agent cache.
|
||||||
|
func CachePeeredUpstreams(c *cache.Cache) proxycfg.PeeredUpstreams {
|
||||||
|
return &cacheProxyDataSource[*structs.PartitionSpecificRequest]{c, cachetype.PeeredUpstreamsName}
|
||||||
|
}
|
||||||
|
|
||||||
// CachePrepraredQuery satisfies the proxycfg.PreparedQuery interface by
|
// CachePrepraredQuery satisfies the proxycfg.PreparedQuery interface by
|
||||||
// sourcing data from the agent cache.
|
// sourcing data from the agent cache.
|
||||||
func CachePrepraredQuery(c *cache.Cache) proxycfg.PreparedQuery {
|
func CachePrepraredQuery(c *cache.Cache) proxycfg.PreparedQuery {
|
||||||
|
@ -69,6 +69,8 @@ type DataSources struct {
|
|||||||
// notification channel.
|
// notification channel.
|
||||||
LeafCertificate LeafCertificate
|
LeafCertificate LeafCertificate
|
||||||
|
|
||||||
|
PeeredUpstreams PeeredUpstreams
|
||||||
|
|
||||||
// PreparedQuery provides updates about the results of a prepared query.
|
// PreparedQuery provides updates about the results of a prepared query.
|
||||||
PreparedQuery PreparedQuery
|
PreparedQuery PreparedQuery
|
||||||
|
|
||||||
@ -170,6 +172,12 @@ type LeafCertificate interface {
|
|||||||
Notify(ctx context.Context, req *cachetype.ConnectCALeafRequest, correlationID string, ch chan<- UpdateEvent) error
|
Notify(ctx context.Context, req *cachetype.ConnectCALeafRequest, correlationID string, ch chan<- UpdateEvent) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PeeredUpstreams is the interface used to consume updates about upstreams
|
||||||
|
// for all peered targets in a given partition.
|
||||||
|
type PeeredUpstreams interface {
|
||||||
|
Notify(ctx context.Context, req *structs.PartitionSpecificRequest, correlationID string, ch chan<- UpdateEvent) error
|
||||||
|
}
|
||||||
|
|
||||||
// PreparedQuery is the interface used to consume updates about the results of
|
// PreparedQuery is the interface used to consume updates about the results of
|
||||||
// a prepared query.
|
// a prepared query.
|
||||||
type PreparedQuery interface {
|
type PreparedQuery interface {
|
||||||
|
@ -682,6 +682,30 @@ func (r *ServiceDumpRequest) CacheMinIndex() uint64 {
|
|||||||
return r.QueryOptions.MinQueryIndex
|
return r.QueryOptions.MinQueryIndex
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// PartitionSpecificRequest is used to query about a specific partition.
|
||||||
|
type PartitionSpecificRequest struct {
|
||||||
|
Datacenter string
|
||||||
|
|
||||||
|
acl.EnterpriseMeta
|
||||||
|
QueryOptions
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *PartitionSpecificRequest) RequestDatacenter() string {
|
||||||
|
return r.Datacenter
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *PartitionSpecificRequest) CacheInfo() cache.RequestInfo {
|
||||||
|
return cache.RequestInfo{
|
||||||
|
Token: r.Token,
|
||||||
|
Datacenter: r.Datacenter,
|
||||||
|
MinIndex: r.MinQueryIndex,
|
||||||
|
Timeout: r.MaxQueryTime,
|
||||||
|
MaxAge: r.MaxAge,
|
||||||
|
MustRevalidate: r.MustRevalidate,
|
||||||
|
Key: r.EnterpriseMeta.PartitionOrDefault(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ServiceSpecificRequest is used to query about a specific service
|
// ServiceSpecificRequest is used to query about a specific service
|
||||||
type ServiceSpecificRequest struct {
|
type ServiceSpecificRequest struct {
|
||||||
Datacenter string
|
Datacenter string
|
||||||
@ -2211,6 +2235,11 @@ type IndexedServiceList struct {
|
|||||||
QueryMeta
|
QueryMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type IndexedPeeredServiceList struct {
|
||||||
|
Services []PeeredServiceName
|
||||||
|
QueryMeta
|
||||||
|
}
|
||||||
|
|
||||||
type IndexedServiceNodes struct {
|
type IndexedServiceNodes struct {
|
||||||
ServiceNodes ServiceNodes
|
ServiceNodes ServiceNodes
|
||||||
QueryMeta
|
QueryMeta
|
||||||
|
Loading…
x
Reference in New Issue
Block a user