mirror of https://github.com/status-im/consul.git
consul: add virtual IP generation for connect services
This commit is contained in:
parent
0c7a2257ec
commit
4f2cfee4b0
|
@ -885,3 +885,27 @@ func (c *Catalog) GatewayServices(args *structs.ServiceSpecificRequest, reply *s
|
|||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func (c *Catalog) VirtualIPForService(args *structs.ServiceSpecificRequest, reply *string) error {
|
||||
if done, err := c.srv.ForwardRPC("Catalog.VirtualIPForService", args, reply); done {
|
||||
return err
|
||||
}
|
||||
|
||||
var authzContext acl.AuthorizerContext
|
||||
authz, err := c.srv.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := c.srv.validateEnterpriseRequest(&args.EnterpriseMeta, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
|
||||
return acl.ErrPermissionDenied
|
||||
}
|
||||
|
||||
state := c.srv.fsm.State()
|
||||
*reply, err = state.VirtualIPForService(structs.NewServiceName(args.ServiceName, &args.EnterpriseMeta))
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -3905,3 +3905,100 @@ node "node" {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatalog_VirtualIPForService(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.Build = "1.11.0"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
err := s1.fsm.State().EnsureRegistration(1, &structs.RegisterRequest{
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
Service: "api",
|
||||
Connect: structs.ServiceConnect{
|
||||
Native: true,
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
args := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: "api",
|
||||
}
|
||||
var out string
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.VirtualIPForService", &args, &out))
|
||||
require.Equal(t, "240.0.0.1", out)
|
||||
}
|
||||
|
||||
func TestCatalog_VirtualIPForService_ACLDeny(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
t.Parallel()
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
c.ACLsEnabled = true
|
||||
c.ACLMasterToken = "root"
|
||||
c.ACLResolverSettings.ACLDefaultPolicy = "deny"
|
||||
c.Build = "1.11.0"
|
||||
})
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
err := s1.fsm.State().EnsureRegistration(1, &structs.RegisterRequest{
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
Service: "api",
|
||||
Connect: structs.ServiceConnect{
|
||||
Native: true,
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// Call the endpoint with no token and expect permission denied.
|
||||
args := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
ServiceName: "api",
|
||||
}
|
||||
var out string
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.VirtualIPForService", &args, &out)
|
||||
require.Contains(t, err.Error(), acl.ErrPermissionDenied.Error())
|
||||
require.Equal(t, "", out)
|
||||
|
||||
id := createToken(t, codec, `
|
||||
service "api" {
|
||||
policy = "read"
|
||||
}`)
|
||||
|
||||
// Now try with the token and it will go through.
|
||||
args.Token = id
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Catalog.VirtualIPForService", &args, &out))
|
||||
require.Equal(t, "240.0.0.1", out)
|
||||
|
||||
// Make sure we still get permission denied for an unknown service.
|
||||
args.ServiceName = "nope"
|
||||
var out2 string
|
||||
err = msgpackrpc.CallWithCodec(codec, "Catalog.VirtualIPForService", &args, &out2)
|
||||
require.Contains(t, err.Error(), acl.ErrPermissionDenied.Error())
|
||||
require.Equal(t, "", out2)
|
||||
}
|
||||
|
|
|
@ -33,9 +33,14 @@ func init() {
|
|||
registerRestorer(structs.ACLAuthMethodSetRequestType, restoreAuthMethod)
|
||||
registerRestorer(structs.FederationStateRequestType, restoreFederationState)
|
||||
registerRestorer(structs.SystemMetadataRequestType, restoreSystemMetadata)
|
||||
registerRestorer(structs.ServiceVirtualIPRequestType, restoreServiceVirtualIP)
|
||||
registerRestorer(structs.FreeVirtualIPRequestType, restoreFreeVirtualIP)
|
||||
}
|
||||
|
||||
func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||
if err := s.persistVirtualIPs(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := s.persistNodes(sink, encoder); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -510,6 +515,38 @@ func (s *snapshot) persistIndex(sink raft.SnapshotSink, encoder *codec.Encoder)
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *snapshot) persistVirtualIPs(sink raft.SnapshotSink, encoder *codec.Encoder) error {
|
||||
serviceVIPs, err := s.state.ServiceVirtualIPs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for entry := serviceVIPs.Next(); entry != nil; entry = serviceVIPs.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.ServiceVirtualIPRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(entry.(state.ServiceVirtualIP)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
freeVIPs, err := s.state.FreeVirtualIPs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
for entry := freeVIPs.Next(); entry != nil; entry = freeVIPs.Next() {
|
||||
if _, err := sink.Write([]byte{byte(structs.FreeVirtualIPRequestType)}); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := encoder.Encode(entry.(state.FreeVirtualIP)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func restoreRegistration(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req structs.RegisterRequest
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
|
@ -790,3 +827,25 @@ func restoreSystemMetadata(header *SnapshotHeader, restore *state.Restore, decod
|
|||
}
|
||||
return restore.SystemMetadataEntry(&req)
|
||||
}
|
||||
|
||||
func restoreServiceVirtualIP(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req state.ServiceVirtualIP
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.ServiceVirtualIP(req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func restoreFreeVirtualIP(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
|
||||
var req state.FreeVirtualIP
|
||||
if err := decoder.Decode(&req); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := restore.FreeVirtualIP(req); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -60,6 +60,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
Port: 80,
|
||||
Connect: connectConf,
|
||||
})
|
||||
|
||||
fsm.state.EnsureService(4, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000})
|
||||
fsm.state.EnsureService(5, "baz", &structs.NodeService{ID: "web", Service: "web", Tags: nil, Address: "127.0.0.2", Port: 80})
|
||||
fsm.state.EnsureService(6, "baz", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000})
|
||||
|
@ -434,6 +435,35 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, fsm.state.EnsureConfigEntry(27, meshConfig))
|
||||
|
||||
// Connect-native services for virtual IP generation
|
||||
systemMetadataEntry = &structs.SystemMetadataEntry{
|
||||
Key: structs.SystemMetadataVirtualIPsEnabled,
|
||||
Value: "true",
|
||||
}
|
||||
require.NoError(t, fsm.state.SystemMetadataSet(28, systemMetadataEntry))
|
||||
|
||||
fsm.state.EnsureService(29, "foo", &structs.NodeService{
|
||||
ID: "frontend",
|
||||
Service: "frontend",
|
||||
Address: "127.0.0.1",
|
||||
Port: 8000,
|
||||
Connect: connectConf,
|
||||
})
|
||||
vip, err := fsm.state.VirtualIPForService(structs.NewServiceName("frontend", nil))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, vip, "240.0.0.1")
|
||||
|
||||
fsm.state.EnsureService(30, "foo", &structs.NodeService{
|
||||
ID: "backend",
|
||||
Service: "backend",
|
||||
Address: "127.0.0.1",
|
||||
Port: 9000,
|
||||
Connect: connectConf,
|
||||
})
|
||||
vip, err = fsm.state.VirtualIPForService(structs.NewServiceName("backend", nil))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, vip, "240.0.0.2")
|
||||
|
||||
// Snapshot
|
||||
snap, err := fsm.Snapshot()
|
||||
require.NoError(t, err)
|
||||
|
@ -519,7 +549,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
|
||||
_, fooSrv, err := fsm2.state.NodeServices(nil, "foo", nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, fooSrv.Services, 2)
|
||||
require.Len(t, fooSrv.Services, 4)
|
||||
require.Contains(t, fooSrv.Services["db"].Tags, "primary")
|
||||
require.True(t, stringslice.Contains(fooSrv.Services["db"].Tags, "primary"))
|
||||
require.Equal(t, 5001, fooSrv.Services["db"].Port)
|
||||
|
@ -538,6 +568,14 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
require.Equal(t, uint64(7), checks[0].CreateIndex)
|
||||
require.Equal(t, uint64(25), checks[0].ModifyIndex)
|
||||
|
||||
// Verify virtual IPs are consistent.
|
||||
vip, err = fsm2.state.VirtualIPForService(structs.NewServiceName("frontend", nil))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, vip, "240.0.0.1")
|
||||
vip, err = fsm2.state.VirtualIPForService(structs.NewServiceName("backend", nil))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, vip, "240.0.0.2")
|
||||
|
||||
// Verify key is set
|
||||
_, d, err := fsm2.state.KVSGet(nil, "/test", nil)
|
||||
require.NoError(t, err)
|
||||
|
@ -700,8 +738,8 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
// Verify system metadata is restored.
|
||||
_, systemMetadataLoaded, err := fsm2.state.SystemMetadataList(nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, systemMetadataLoaded, 1)
|
||||
require.Equal(t, systemMetadataEntry, systemMetadataLoaded[0])
|
||||
require.Len(t, systemMetadataLoaded, 2)
|
||||
require.Equal(t, systemMetadataEntry, systemMetadataLoaded[1])
|
||||
|
||||
// Verify service-intentions is restored
|
||||
_, serviceIxnEntry, err := fsm2.state.ConfigEntry(nil, structs.ServiceIntentions, "foo", structs.DefaultEnterpriseMetaInDefaultPartition())
|
||||
|
|
|
@ -55,6 +55,8 @@ var (
|
|||
// minCentralizedConfigVersion is the minimum Consul version in which centralized
|
||||
// config is supported
|
||||
minCentralizedConfigVersion = version.Must(version.NewVersion("1.5.0"))
|
||||
|
||||
minVirtualIPVersion = version.Must(version.NewVersion("1.11.0"))
|
||||
)
|
||||
|
||||
// monitorLeadership is used to monitor if we acquire or lose our role
|
||||
|
@ -186,6 +188,10 @@ RECONCILE:
|
|||
s.logger.Error("failed to reconcile", "error", err)
|
||||
goto WAIT
|
||||
}
|
||||
if err := s.setVirtualIPFlag(); err != nil {
|
||||
s.logger.Error("failed to set virtual IP flag", "error", err)
|
||||
goto WAIT
|
||||
}
|
||||
|
||||
// Initial reconcile worked, now we can process the channel
|
||||
// updates
|
||||
|
@ -213,6 +219,7 @@ WAIT:
|
|||
goto RECONCILE
|
||||
case member := <-reconcileCh:
|
||||
s.reconcileMember(member)
|
||||
s.setVirtualIPFlag()
|
||||
case index := <-s.tombstoneGC.ExpireCh():
|
||||
go s.reapTombstones(index)
|
||||
case errCh := <-s.reassertLeaderCh:
|
||||
|
@ -315,6 +322,10 @@ func (s *Server) establishLeadership(ctx context.Context) error {
|
|||
return err
|
||||
}
|
||||
|
||||
if err := s.setVirtualIPFlag(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.setConsistentReadReady()
|
||||
|
||||
s.logger.Debug("successfully established leadership", "duration", time.Since(start))
|
||||
|
@ -881,6 +892,25 @@ func (s *Server) bootstrapConfigEntries(entries []structs.ConfigEntry) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *Server) setVirtualIPFlag() error {
|
||||
// Return early if the flag is already set.
|
||||
val, err := s.getSystemMetadata(structs.SystemMetadataVirtualIPsEnabled)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if val != "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
if ok, _ := ServersInDCMeetMinimumVersion(s, s.config.Datacenter, minVirtualIPVersion); !ok {
|
||||
s.logger.Warn(fmt.Sprintf("can't allocate Virtual IPs until all servers >= %s",
|
||||
minVirtualIPVersion.String()))
|
||||
return nil
|
||||
}
|
||||
|
||||
return s.setSystemMetadataKey(structs.SystemMetadataVirtualIPsEnabled, "true")
|
||||
}
|
||||
|
||||
// reconcileReaped is used to reconcile nodes that have failed and been reaped
|
||||
// from Serf but remain in the catalog. This is done by looking for unknown nodes with serfHealth checks registered.
|
||||
// We generate a "reap" event to cause the node to be cleaned up.
|
||||
|
@ -996,6 +1026,7 @@ func (s *Server) reconcileMember(member serf.Member) error {
|
|||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -2120,3 +2120,86 @@ func TestDatacenterSupportsIntentionsAsConfigEntries(t *testing.T) {
|
|||
)
|
||||
})
|
||||
}
|
||||
|
||||
func TestLeader_EnableVirtualIPs(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
|
||||
conf := func(c *Config) {
|
||||
c.Bootstrap = false
|
||||
c.BootstrapExpect = 3
|
||||
c.Datacenter = "dc1"
|
||||
c.Build = "1.11.0"
|
||||
}
|
||||
dir1, s1 := testServerWithConfig(t, conf)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
dir2, s2 := testServerWithConfig(t, conf)
|
||||
defer os.RemoveAll(dir2)
|
||||
defer s2.Shutdown()
|
||||
|
||||
dir3, s3 := testServerWithConfig(t, func(c *Config) {
|
||||
conf(c)
|
||||
c.Build = "1.10.0"
|
||||
})
|
||||
defer os.RemoveAll(dir3)
|
||||
defer s3.Shutdown()
|
||||
|
||||
// Try to join and wait for all servers to get promoted
|
||||
joinLAN(t, s2, s1)
|
||||
joinLAN(t, s3, s1)
|
||||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Should have nothing stored.
|
||||
state := s1.fsm.State()
|
||||
_, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled)
|
||||
require.NoError(t, err)
|
||||
require.Nil(t, entry)
|
||||
|
||||
// Register a connect-native service and make sure we don't have a virtual IP yet.
|
||||
err = state.EnsureRegistration(10, &structs.RegisterRequest{
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
Service: &structs.NodeService{
|
||||
Service: "api",
|
||||
Connect: structs.ServiceConnect{
|
||||
Native: true,
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
vip, err := state.VirtualIPForService(structs.NewServiceName("api", nil))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "", vip)
|
||||
|
||||
// Leave s3 and wait for the version to get updated.
|
||||
require.NoError(t, s3.Leave())
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
_, entry, err := state.SystemMetadataGet(nil, structs.SystemMetadataVirtualIPsEnabled)
|
||||
require.NoError(r, err)
|
||||
require.NotNil(r, entry)
|
||||
require.Equal(r, "true", entry.Value)
|
||||
})
|
||||
|
||||
// Update the connect-native service - now there should be a virtual IP assigned.
|
||||
err = state.EnsureRegistration(20, &structs.RegisterRequest{
|
||||
Node: "foo",
|
||||
Address: "127.0.0.2",
|
||||
Service: &structs.NodeService{
|
||||
Service: "api",
|
||||
Connect: structs.ServiceConnect{
|
||||
Native: true,
|
||||
},
|
||||
},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
vip, err = state.VirtualIPForService(structs.NewServiceName("api", nil))
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, "240.0.0.1", vip)
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package state
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
|
@ -27,6 +28,14 @@ const (
|
|||
minUUIDLookupLen = 2
|
||||
)
|
||||
|
||||
var (
|
||||
// startingVirtualIP is the start of the virtual IP range we assign to services.
|
||||
// The effective CIDR range is startingVirtualIP to (startingVirtualIP + virtualIPMaxOffset).
|
||||
startingVirtualIP = net.IP{240, 0, 0, 0}
|
||||
|
||||
virtualIPMaxOffset = net.IP{15, 255, 255, 254}
|
||||
)
|
||||
|
||||
func resizeNodeLookupKey(s string) string {
|
||||
l := len(s)
|
||||
|
||||
|
@ -72,6 +81,24 @@ func (s *Snapshot) Checks(node string, entMeta *structs.EnterpriseMeta) (memdb.R
|
|||
})
|
||||
}
|
||||
|
||||
// ServiceVirtualIPs is used to pull the service virtual IP mappings for use during snapshots.
|
||||
func (s *Snapshot) ServiceVirtualIPs() (memdb.ResultIterator, error) {
|
||||
iter, err := s.tx.Get(tableServiceVirtualIPs, indexID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
// FreeVirtualIPs is used to pull the freed virtual IPs for use during snapshots.
|
||||
func (s *Snapshot) FreeVirtualIPs() (memdb.ResultIterator, error) {
|
||||
iter, err := s.tx.Get(tableFreeVirtualIPs, indexID)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
// Registration is used to make sure a node, service, and check registration is
|
||||
// performed within a single transaction to avoid race conditions on state
|
||||
// updates.
|
||||
|
@ -79,6 +106,14 @@ func (s *Restore) Registration(idx uint64, req *structs.RegisterRequest) error {
|
|||
return s.store.ensureRegistrationTxn(s.tx, idx, true, req, true)
|
||||
}
|
||||
|
||||
func (s *Restore) ServiceVirtualIP(req ServiceVirtualIP) error {
|
||||
return s.tx.Insert(tableServiceVirtualIPs, req)
|
||||
}
|
||||
|
||||
func (s *Restore) FreeVirtualIP(req FreeVirtualIP) error {
|
||||
return s.tx.Insert(tableFreeVirtualIPs, req)
|
||||
}
|
||||
|
||||
// EnsureRegistration is used to make sure a node, service, and check
|
||||
// registration is performed within a single transaction to avoid race
|
||||
// conditions on state updates.
|
||||
|
@ -706,11 +741,35 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
|
|||
if err = checkGatewayWildcardsAndUpdate(tx, idx, svc); err != nil {
|
||||
return fmt.Errorf("failed updating gateway mapping: %s", err)
|
||||
}
|
||||
|
||||
// Update upstream/downstream mappings if it's a connect service
|
||||
if svc.Kind == structs.ServiceKindConnectProxy {
|
||||
if svc.Kind == structs.ServiceKindConnectProxy || svc.Connect.Native {
|
||||
if err = updateMeshTopology(tx, idx, node, svc, existing); err != nil {
|
||||
return fmt.Errorf("failed updating upstream/downstream association")
|
||||
}
|
||||
|
||||
supported, err := virtualIPsSupported(tx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Update the virtual IP for the service
|
||||
if supported {
|
||||
service := svc.Service
|
||||
if svc.Kind == structs.ServiceKindConnectProxy {
|
||||
service = svc.Proxy.DestinationServiceName
|
||||
}
|
||||
|
||||
sn := structs.ServiceName{Name: service, EnterpriseMeta: svc.EnterpriseMeta}
|
||||
vip, err := assignServiceVirtualIP(tx, sn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed updating virtual IP: %s", err)
|
||||
}
|
||||
if svc.TaggedAddresses == nil {
|
||||
svc.TaggedAddresses = make(map[string]structs.ServiceAddress)
|
||||
}
|
||||
svc.TaggedAddresses[structs.TaggedAddressVirtualIP] = structs.ServiceAddress{Address: vip, Port: svc.Port}
|
||||
}
|
||||
}
|
||||
|
||||
// Create the service node entry and populate the indexes. Note that
|
||||
|
@ -751,6 +810,120 @@ func ensureServiceTxn(tx WriteTxn, idx uint64, node string, preserveIndexes bool
|
|||
return catalogInsertService(tx, entry)
|
||||
}
|
||||
|
||||
// assignServiceVirtualIP assigns a virtual IP to the target service and updates
|
||||
// the global virtual IP counter if necessary.
|
||||
func assignServiceVirtualIP(tx WriteTxn, sn structs.ServiceName) (string, error) {
|
||||
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, sn)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed service virtual IP lookup: %s", err)
|
||||
}
|
||||
|
||||
// Service already has a virtual IP assigned, nothing to do.
|
||||
if serviceVIP != nil {
|
||||
sVIP := serviceVIP.(ServiceVirtualIP).IP
|
||||
result, err := addIPOffset(startingVirtualIP, sVIP)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
return result.String(), nil
|
||||
}
|
||||
|
||||
// Get the next available virtual IP, drawing from any freed from deleted services
|
||||
// first and then falling back to the global counter if none are available.
|
||||
latestVIP, err := tx.First(tableFreeVirtualIPs, indexCounterOnly, false)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed virtual IP index lookup: %s", err)
|
||||
}
|
||||
if latestVIP == nil {
|
||||
latestVIP, err = tx.First(tableFreeVirtualIPs, indexCounterOnly, true)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed virtual IP index lookup: %s", err)
|
||||
}
|
||||
}
|
||||
if latestVIP != nil {
|
||||
if err := tx.Delete(tableFreeVirtualIPs, latestVIP); err != nil {
|
||||
return "", fmt.Errorf("failed updating freed virtual IP table: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
var latest FreeVirtualIP
|
||||
if latestVIP == nil {
|
||||
latest = FreeVirtualIP{
|
||||
IP: net.IPv4zero,
|
||||
IsCounter: true,
|
||||
}
|
||||
} else {
|
||||
latest = latestVIP.(FreeVirtualIP)
|
||||
}
|
||||
|
||||
// Store the next virtual IP from the counter if there aren't any freed IPs to draw from.
|
||||
// Then increment to store the next free virtual IP.
|
||||
newEntry := FreeVirtualIP{
|
||||
IP: latest.IP,
|
||||
IsCounter: latest.IsCounter,
|
||||
}
|
||||
if latest.IsCounter {
|
||||
newEntry.IP = make(net.IP, len(latest.IP))
|
||||
copy(newEntry.IP, latest.IP)
|
||||
for i := len(newEntry.IP) - 1; i >= 0; i-- {
|
||||
newEntry.IP[i]++
|
||||
if newEntry.IP[i] != 0 {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Out of virtual IPs, fail registration.
|
||||
if newEntry.IP.Equal(virtualIPMaxOffset) {
|
||||
return "", fmt.Errorf("cannot allocate any more unique service virtual IPs")
|
||||
}
|
||||
|
||||
if err := tx.Insert(tableFreeVirtualIPs, newEntry); err != nil {
|
||||
return "", fmt.Errorf("failed updating freed virtual IP table: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
assignedVIP := ServiceVirtualIP{
|
||||
Service: sn,
|
||||
IP: newEntry.IP,
|
||||
}
|
||||
if err := tx.Insert(tableServiceVirtualIPs, assignedVIP); err != nil {
|
||||
return "", fmt.Errorf("failed inserting service virtual IP entry: %s", err)
|
||||
}
|
||||
|
||||
result, err := addIPOffset(startingVirtualIP, assignedVIP.IP)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return result.String(), nil
|
||||
}
|
||||
|
||||
func addIPOffset(a, b net.IP) (net.IP, error) {
|
||||
a4 := a.To4()
|
||||
b4 := b.To4()
|
||||
if a4 == nil || b4 == nil {
|
||||
return nil, errors.New("ip is not ipv4")
|
||||
}
|
||||
|
||||
var raw uint64
|
||||
for i := 0; i < 4; i++ {
|
||||
raw = raw<<8 + uint64(a4[i]) + uint64(b4[i])
|
||||
}
|
||||
return net.IPv4(byte(raw>>24), byte(raw>>16), byte(raw>>8), byte(raw)), nil
|
||||
}
|
||||
|
||||
func virtualIPsSupported(tx ReadTxn, ws memdb.WatchSet) (bool, error) {
|
||||
_, entry, err := systemMetadataGetTxn(tx, ws, structs.SystemMetadataVirtualIPsEnabled)
|
||||
if err != nil {
|
||||
return false, fmt.Errorf("failed system metadata lookup: %s", err)
|
||||
}
|
||||
if entry == nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return entry.Value != "", nil
|
||||
}
|
||||
|
||||
// Services returns all services along with a list of associated tags.
|
||||
func (s *Store) Services(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Services, error) {
|
||||
tx := s.db.Txn(false)
|
||||
|
@ -1515,6 +1688,9 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
|||
if err := cleanupGatewayWildcards(tx, idx, svc); err != nil {
|
||||
return fmt.Errorf("failed to clean up gateway-service associations for %q: %v", name.String(), err)
|
||||
}
|
||||
if err := freeServiceVirtualIP(tx, svc.ServiceName, entMeta); err != nil {
|
||||
return fmt.Errorf("failed to clean up virtual IP for %q: %v", name.String(), err)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return fmt.Errorf("Could not find any service %s: %s", svc.ServiceName, err)
|
||||
|
@ -1523,6 +1699,40 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
|||
return nil
|
||||
}
|
||||
|
||||
// freeServiceVirtualIP is used to free a virtual IP for a service after the last instance
|
||||
// is removed.
|
||||
func freeServiceVirtualIP(tx WriteTxn, svc string, entMeta *structs.EnterpriseMeta) error {
|
||||
supported, err := virtualIPsSupported(tx, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !supported {
|
||||
return nil
|
||||
}
|
||||
|
||||
sn := structs.NewServiceName(svc, entMeta)
|
||||
serviceVIP, err := tx.First(tableServiceVirtualIPs, indexID, sn)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service virtual IP lookup: %s", err)
|
||||
}
|
||||
// Service has no virtual IP assigned, nothing to do.
|
||||
if serviceVIP == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete the service virtual IP and add it to the freed IPs list.
|
||||
if err := tx.Delete(tableServiceVirtualIPs, serviceVIP); err != nil {
|
||||
return fmt.Errorf("failed updating freed virtual IP table: %v", err)
|
||||
}
|
||||
|
||||
newEntry := FreeVirtualIP{IP: serviceVIP.(ServiceVirtualIP).IP}
|
||||
if err := tx.Insert(tableFreeVirtualIPs, newEntry); err != nil {
|
||||
return fmt.Errorf("failed updating freed virtual IP table: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// EnsureCheck is used to store a check registration in the db.
|
||||
func (s *Store) EnsureCheck(idx uint64, hc *structs.HealthCheck) error {
|
||||
tx := s.db.WriteTxn(idx)
|
||||
|
@ -2297,6 +2507,25 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
|
|||
return lib.MaxUint64(maxIdx, idx), results, nil
|
||||
}
|
||||
|
||||
func (s *Store) VirtualIPForService(sn structs.ServiceName) (string, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
vip, err := tx.First(tableServiceVirtualIPs, indexID, sn)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed service virtual IP lookup: %s", err)
|
||||
}
|
||||
if vip == nil {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
result, err := addIPOffset(startingVirtualIP, vip.(ServiceVirtualIP).IP)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return result.String(), nil
|
||||
}
|
||||
|
||||
// parseCheckServiceNodes is used to parse through a given set of services,
|
||||
// and query for an associated node and a set of checks. This is the inner
|
||||
// method used to return a rich set of results from a more simple query.
|
||||
|
|
|
@ -73,6 +73,11 @@ func TestServiceHealthSnapshot(t *testing.T) {
|
|||
func TestServiceHealthSnapshot_ConnectTopic(t *testing.T) {
|
||||
store := NewStateStore(nil)
|
||||
|
||||
require.NoError(t, store.SystemMetadataSet(0, &structs.SystemMetadataEntry{
|
||||
Key: structs.SystemMetadataVirtualIPsEnabled,
|
||||
Value: "true",
|
||||
}))
|
||||
|
||||
counter := newIndexCounter()
|
||||
err := store.EnsureRegistration(counter.Next(), testServiceRegistration(t, "db"))
|
||||
require.NoError(t, err)
|
||||
|
@ -1574,6 +1579,10 @@ func TestServiceHealthEventsFromChanges(t *testing.T) {
|
|||
|
||||
func (tc eventsTestCase) run(t *testing.T) {
|
||||
s := NewStateStore(nil)
|
||||
require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{
|
||||
Key: structs.SystemMetadataVirtualIPsEnabled,
|
||||
Value: "true",
|
||||
}))
|
||||
|
||||
setupIndex := uint64(10)
|
||||
mutateIndex := uint64(100)
|
||||
|
@ -1936,7 +1945,14 @@ func evServiceUnchanged(e *stream.Event) error {
|
|||
// evConnectNative option converts the base event to represent a connect-native
|
||||
// service instance.
|
||||
func evConnectNative(e *stream.Event) error {
|
||||
getPayloadCheckServiceNode(e.Payload).Service.Connect.Native = true
|
||||
csn := getPayloadCheckServiceNode(e.Payload)
|
||||
csn.Service.Connect.Native = true
|
||||
csn.Service.TaggedAddresses = map[string]structs.ServiceAddress{
|
||||
structs.TaggedAddressVirtualIP: {
|
||||
Address: "240.0.0.1",
|
||||
Port: csn.Service.Port,
|
||||
},
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -1969,6 +1985,13 @@ func evSidecar(e *stream.Event) error {
|
|||
csn.Service.Proxy.DestinationServiceName = svc
|
||||
csn.Service.Proxy.DestinationServiceID = svc
|
||||
|
||||
csn.Service.TaggedAddresses = map[string]structs.ServiceAddress{
|
||||
structs.TaggedAddressVirtualIP: {
|
||||
Address: "240.0.0.1",
|
||||
Port: csn.Service.Port,
|
||||
},
|
||||
}
|
||||
|
||||
// Convert the check to point to the right ID now. This isn't totally
|
||||
// realistic - sidecars should have alias checks etc but this is good enough
|
||||
// to test this code path.
|
||||
|
@ -1990,7 +2013,12 @@ func evSidecar(e *stream.Event) error {
|
|||
// amount to simulate a service change. Can be used with evSidecar since it's a
|
||||
// relative change (+10).
|
||||
func evMutatePort(e *stream.Event) error {
|
||||
getPayloadCheckServiceNode(e.Payload).Service.Port += 10
|
||||
csn := getPayloadCheckServiceNode(e.Payload)
|
||||
csn.Service.Port += 10
|
||||
if addr, ok := csn.Service.TaggedAddresses[structs.TaggedAddressVirtualIP]; ok {
|
||||
addr.Port = csn.Service.Port
|
||||
csn.Service.TaggedAddresses[structs.TaggedAddressVirtualIP] = addr
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -2067,6 +2095,10 @@ func evRenameService(e *stream.Event) error {
|
|||
// We don't need to update our own details, only the name of the destination
|
||||
csn.Service.Proxy.DestinationServiceName += "_changed"
|
||||
|
||||
taggedAddr := csn.Service.TaggedAddresses[structs.TaggedAddressVirtualIP]
|
||||
taggedAddr.Address = "240.0.0.2"
|
||||
csn.Service.TaggedAddresses[structs.TaggedAddressVirtualIP] = taggedAddr
|
||||
|
||||
if e.Topic == topicServiceHealthConnect {
|
||||
payload := e.Payload.(EventPayloadCheckServiceNode)
|
||||
payload.overrideKey = csn.Service.Proxy.DestinationServiceName
|
||||
|
|
|
@ -4,6 +4,8 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
@ -386,3 +388,27 @@ func testIndexerTableServices() map[string]indexerTestCase {
|
|||
},
|
||||
}
|
||||
}
|
||||
|
||||
func testIndexerTableServiceVirtualIPs() map[string]indexerTestCase {
|
||||
obj := ServiceVirtualIP{
|
||||
Service: structs.ServiceName{
|
||||
Name: "foo",
|
||||
},
|
||||
IP: net.ParseIP("127.0.0.1"),
|
||||
}
|
||||
|
||||
return map[string]indexerTestCase{
|
||||
indexID: {
|
||||
read: indexValue{
|
||||
source: structs.ServiceName{
|
||||
Name: "foo",
|
||||
},
|
||||
expected: []byte("foo\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: obj,
|
||||
expected: []byte("foo\x00"),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package state
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
|
@ -11,11 +12,13 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
tableNodes = "nodes"
|
||||
tableServices = "services"
|
||||
tableChecks = "checks"
|
||||
tableGatewayServices = "gateway-services"
|
||||
tableMeshTopology = "mesh-topology"
|
||||
tableNodes = "nodes"
|
||||
tableServices = "services"
|
||||
tableChecks = "checks"
|
||||
tableGatewayServices = "gateway-services"
|
||||
tableMeshTopology = "mesh-topology"
|
||||
tableServiceVirtualIPs = "service-virtual-ips"
|
||||
tableFreeVirtualIPs = "free-virtual-ips"
|
||||
|
||||
indexID = "id"
|
||||
indexService = "service"
|
||||
|
@ -30,6 +33,7 @@ const (
|
|||
indexGateway = "gateway"
|
||||
indexUUID = "uuid"
|
||||
indexMeta = "meta"
|
||||
indexCounterOnly = "counter"
|
||||
)
|
||||
|
||||
// nodesTableSchema returns a new table schema used for storing struct.Node.
|
||||
|
@ -598,3 +602,62 @@ func (q NodeCheckQuery) NamespaceOrDefault() string {
|
|||
func (q NodeCheckQuery) PartitionOrDefault() string {
|
||||
return q.EnterpriseMeta.PartitionOrDefault()
|
||||
}
|
||||
|
||||
// ServiceVirtualIP is used to store a virtual IP associated with a service.
|
||||
// It is also used to store assigned virtual IPs when a snapshot is created.
|
||||
type ServiceVirtualIP struct {
|
||||
Service structs.ServiceName
|
||||
IP net.IP
|
||||
}
|
||||
|
||||
// FreeVirtualIP is used to store a virtual IP freed up by a service deregistration.
|
||||
// It is also used to store free virtual IPs when a snapshot is created.
|
||||
type FreeVirtualIP struct {
|
||||
IP net.IP
|
||||
IsCounter bool
|
||||
}
|
||||
|
||||
func serviceVirtualIPTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: tableServiceVirtualIPs,
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
indexID: {
|
||||
Name: indexID,
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &ServiceNameIndex{
|
||||
Field: "Service",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func freeVirtualIPTableSchema() *memdb.TableSchema {
|
||||
return &memdb.TableSchema{
|
||||
Name: tableFreeVirtualIPs,
|
||||
Indexes: map[string]*memdb.IndexSchema{
|
||||
indexID: {
|
||||
Name: indexID,
|
||||
AllowMissing: false,
|
||||
Unique: true,
|
||||
Indexer: &memdb.StringFieldIndex{
|
||||
Field: "IP",
|
||||
},
|
||||
},
|
||||
indexCounterOnly: {
|
||||
Name: indexCounterOnly,
|
||||
AllowMissing: false,
|
||||
Unique: false,
|
||||
Indexer: &memdb.ConditionalIndex{
|
||||
Conditional: func(obj interface{}) (bool, error) {
|
||||
if vip, ok := obj.(FreeVirtualIP); ok {
|
||||
return vip.IsCounter, nil
|
||||
}
|
||||
return false, fmt.Errorf("object is not a virtual IP entry")
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1548,6 +1548,142 @@ func TestStateStore_EnsureService_connectProxy(t *testing.T) {
|
|||
assert.Equal(&expect1, out.Services["connect-proxy"])
|
||||
}
|
||||
|
||||
func TestStateStore_EnsureService_virtualIps(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
s := testStateStore(t)
|
||||
require.NoError(t, s.SystemMetadataSet(0, &structs.SystemMetadataEntry{
|
||||
Key: structs.SystemMetadataVirtualIPsEnabled,
|
||||
Value: "true",
|
||||
}))
|
||||
|
||||
// Create the service registration.
|
||||
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
|
||||
ns1 := &structs.NodeService{
|
||||
ID: "foo",
|
||||
Service: "foo",
|
||||
Address: "1.1.1.1",
|
||||
Port: 1111,
|
||||
Weights: &structs.Weights{
|
||||
Passing: 1,
|
||||
Warning: 1,
|
||||
},
|
||||
Connect: structs.ServiceConnect{Native: true},
|
||||
EnterpriseMeta: *entMeta,
|
||||
}
|
||||
|
||||
// Service successfully registers into the state store.
|
||||
testRegisterNode(t, s, 0, "node1")
|
||||
require.NoError(t, s.EnsureService(10, "node1", ns1))
|
||||
|
||||
// Make sure there's a virtual IP for the foo service.
|
||||
vip, err := s.VirtualIPForService(structs.ServiceName{Name: "foo"})
|
||||
require.NoError(t, err)
|
||||
assert.Equal("240.0.0.1", vip)
|
||||
|
||||
// Retrieve and verify
|
||||
_, out, err := s.NodeServices(nil, "node1", nil)
|
||||
require.NoError(t, err)
|
||||
assert.NotNil(out)
|
||||
assert.Len(out.Services, 1)
|
||||
|
||||
taggedAddress := out.Services["foo"].TaggedAddresses[structs.TaggedAddressVirtualIP]
|
||||
assert.Equal(vip, taggedAddress.Address)
|
||||
assert.Equal(ns1.Port, taggedAddress.Port)
|
||||
|
||||
// Create the service registration.
|
||||
ns2 := &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "redis-proxy",
|
||||
Service: "redis-proxy",
|
||||
Address: "2.2.2.2",
|
||||
Port: 2222,
|
||||
Weights: &structs.Weights{
|
||||
Passing: 1,
|
||||
Warning: 1,
|
||||
},
|
||||
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "redis"},
|
||||
EnterpriseMeta: *entMeta,
|
||||
}
|
||||
require.NoError(t, s.EnsureService(11, "node1", ns2))
|
||||
|
||||
// Make sure the virtual IP has been incremented for the redis service.
|
||||
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"})
|
||||
require.NoError(t, err)
|
||||
assert.Equal("240.0.0.2", vip)
|
||||
|
||||
// Retrieve and verify
|
||||
_, out, err = s.NodeServices(nil, "node1", nil)
|
||||
assert.Nil(err)
|
||||
assert.NotNil(out)
|
||||
assert.Len(out.Services, 2)
|
||||
|
||||
taggedAddress = out.Services["redis-proxy"].TaggedAddresses[structs.TaggedAddressVirtualIP]
|
||||
assert.Equal(vip, taggedAddress.Address)
|
||||
assert.Equal(ns2.Port, taggedAddress.Port)
|
||||
|
||||
// Delete the first service and make sure it no longer has a virtual IP assigned.
|
||||
require.NoError(t, s.DeleteService(12, "node1", "foo", entMeta))
|
||||
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "connect-proxy"})
|
||||
require.NoError(t, err)
|
||||
assert.Equal("", vip)
|
||||
|
||||
// Register another instance of redis-proxy and make sure the virtual IP is unchanged.
|
||||
ns3 := &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "redis-proxy2",
|
||||
Service: "redis-proxy",
|
||||
Address: "3.3.3.3",
|
||||
Port: 3333,
|
||||
Weights: &structs.Weights{
|
||||
Passing: 1,
|
||||
Warning: 1,
|
||||
},
|
||||
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "redis"},
|
||||
EnterpriseMeta: *entMeta,
|
||||
}
|
||||
require.NoError(t, s.EnsureService(13, "node1", ns3))
|
||||
|
||||
// Make sure the virtual IP is unchanged for the redis service.
|
||||
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "redis"})
|
||||
require.NoError(t, err)
|
||||
assert.Equal("240.0.0.2", vip)
|
||||
|
||||
// Make sure the new instance has the same virtual IP.
|
||||
_, out, err = s.NodeServices(nil, "node1", nil)
|
||||
require.NoError(t, err)
|
||||
taggedAddress = out.Services["redis-proxy2"].TaggedAddresses[structs.TaggedAddressVirtualIP]
|
||||
assert.Equal(vip, taggedAddress.Address)
|
||||
assert.Equal(ns3.Port, taggedAddress.Port)
|
||||
|
||||
// Register another service to take its virtual IP.
|
||||
ns4 := &structs.NodeService{
|
||||
Kind: structs.ServiceKindConnectProxy,
|
||||
ID: "web-proxy",
|
||||
Service: "web-proxy",
|
||||
Address: "4.4.4.4",
|
||||
Port: 4444,
|
||||
Weights: &structs.Weights{
|
||||
Passing: 1,
|
||||
Warning: 1,
|
||||
},
|
||||
Proxy: structs.ConnectProxyConfig{DestinationServiceName: "web"},
|
||||
EnterpriseMeta: *entMeta,
|
||||
}
|
||||
require.NoError(t, s.EnsureService(14, "node1", ns4))
|
||||
|
||||
// Make sure the virtual IP has allocated from the previously freed service.
|
||||
vip, err = s.VirtualIPForService(structs.ServiceName{Name: "web"})
|
||||
require.NoError(t, err)
|
||||
assert.Equal("240.0.0.1", vip)
|
||||
|
||||
// Retrieve and verify
|
||||
_, out, err = s.NodeServices(nil, "node1", nil)
|
||||
require.NoError(t, err)
|
||||
taggedAddress = out.Services["web-proxy"].TaggedAddresses[structs.TaggedAddressVirtualIP]
|
||||
assert.Equal(vip, taggedAddress.Address)
|
||||
assert.Equal(ns4.Port, taggedAddress.Port)
|
||||
}
|
||||
|
||||
func TestStateStore_Services(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
|
|
@ -32,12 +32,14 @@ func newDBSchema() *memdb.DBSchema {
|
|||
preparedQueriesTableSchema,
|
||||
rolesTableSchema,
|
||||
servicesTableSchema,
|
||||
serviceVirtualIPTableSchema,
|
||||
sessionChecksTableSchema,
|
||||
sessionsTableSchema,
|
||||
systemMetadataTableSchema,
|
||||
tokensTableSchema,
|
||||
tombstonesTableSchema,
|
||||
usageTableSchema,
|
||||
freeVirtualIPTableSchema,
|
||||
)
|
||||
withEnterpriseSchema(db)
|
||||
return db
|
||||
|
|
|
@ -43,12 +43,13 @@ func TestNewDBSchema_Indexers(t *testing.T) {
|
|||
tableACLRoles: testIndexerTableACLRoles,
|
||||
tableACLTokens: testIndexerTableACLTokens,
|
||||
// catalog
|
||||
tableChecks: testIndexerTableChecks,
|
||||
tableServices: testIndexerTableServices,
|
||||
tableNodes: testIndexerTableNodes,
|
||||
tableCoordinates: testIndexerTableCoordinates,
|
||||
tableMeshTopology: testIndexerTableMeshTopology,
|
||||
tableGatewayServices: testIndexerTableGatewayServices,
|
||||
tableChecks: testIndexerTableChecks,
|
||||
tableServices: testIndexerTableServices,
|
||||
tableNodes: testIndexerTableNodes,
|
||||
tableCoordinates: testIndexerTableCoordinates,
|
||||
tableMeshTopology: testIndexerTableMeshTopology,
|
||||
tableGatewayServices: testIndexerTableGatewayServices,
|
||||
tableServiceVirtualIPs: testIndexerTableServiceVirtualIPs,
|
||||
// KV
|
||||
tableKVs: testIndexerTableKVs,
|
||||
tableTombstones: testIndexerTableTombstones,
|
||||
|
|
|
@ -32,7 +32,7 @@ func TestLeader_SystemMetadata_CRUD(t *testing.T) {
|
|||
|
||||
state := srv.fsm.State()
|
||||
|
||||
// Initially empty
|
||||
// Initially has no entries
|
||||
_, entries, err := state.SystemMetadataList(nil)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, entries, 0)
|
||||
|
|
|
@ -1082,6 +1082,19 @@ func (l *State) updateSyncState() error {
|
|||
ls.Service.Tags = make([]string, len(rs.Tags))
|
||||
copy(ls.Service.Tags, rs.Tags)
|
||||
}
|
||||
|
||||
// Merge any tagged addresses with the consul- prefix (set by the server)
|
||||
// back into the local state.
|
||||
if !reflect.DeepEqual(ls.Service.TaggedAddresses, rs.TaggedAddresses) {
|
||||
if ls.Service.TaggedAddresses == nil {
|
||||
ls.Service.TaggedAddresses = make(map[string]structs.ServiceAddress)
|
||||
}
|
||||
for k, v := range rs.TaggedAddresses {
|
||||
if strings.HasPrefix(k, structs.MetaKeyReservedPrefix) {
|
||||
ls.Service.TaggedAddresses[k] = v
|
||||
}
|
||||
}
|
||||
}
|
||||
ls.InSync = ls.Service.IsSame(rs)
|
||||
}
|
||||
|
||||
|
|
|
@ -374,8 +374,17 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
|
|||
assert.Len(services.NodeServices.Services, 5)
|
||||
|
||||
// All the services should match
|
||||
vips := make(map[string]struct{})
|
||||
srv1.TaggedAddresses = nil
|
||||
srv2.TaggedAddresses = nil
|
||||
for id, serv := range services.NodeServices.Services {
|
||||
serv.CreateIndex, serv.ModifyIndex = 0, 0
|
||||
if serv.TaggedAddresses != nil {
|
||||
serviceVIP := serv.TaggedAddresses[structs.TaggedAddressVirtualIP].Address
|
||||
assert.NotEmpty(serviceVIP)
|
||||
vips[serviceVIP] = struct{}{}
|
||||
}
|
||||
serv.TaggedAddresses = nil
|
||||
switch id {
|
||||
case "mysql-proxy":
|
||||
assert.Equal(srv1, serv)
|
||||
|
@ -392,6 +401,7 @@ func TestAgentAntiEntropy_Services_ConnectProxy(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
assert.Len(vips, 4)
|
||||
assert.Nil(servicesInSync(a.State, 4, structs.DefaultEnterpriseMetaInDefaultPartition()))
|
||||
|
||||
// Remove one of the services
|
||||
|
|
|
@ -70,6 +70,8 @@ const (
|
|||
ChunkingStateType = 29
|
||||
FederationStateRequestType = 30
|
||||
SystemMetadataRequestType = 31
|
||||
ServiceVirtualIPRequestType = 32
|
||||
FreeVirtualIPRequestType = 33
|
||||
)
|
||||
|
||||
// if a new request type is added above it must be
|
||||
|
@ -110,6 +112,8 @@ var requestTypeStrings = map[MessageType]string{
|
|||
ChunkingStateType: "ChunkingState",
|
||||
FederationStateRequestType: "FederationState",
|
||||
SystemMetadataRequestType: "SystemMetadata",
|
||||
ServiceVirtualIPRequestType: "ServiceVirtualIP",
|
||||
FreeVirtualIPRequestType: "FreeVirtualIP",
|
||||
}
|
||||
|
||||
const (
|
||||
|
@ -127,7 +131,7 @@ const (
|
|||
ServiceMaintPrefix = "_service_maintenance:"
|
||||
|
||||
// The meta key prefix reserved for Consul's internal use
|
||||
metaKeyReservedPrefix = "consul-"
|
||||
MetaKeyReservedPrefix = "consul-"
|
||||
|
||||
// metaMaxKeyPairs is maximum number of metadata key pairs allowed to be registered
|
||||
metaMaxKeyPairs = 64
|
||||
|
@ -148,6 +152,9 @@ const (
|
|||
// MetaExternalSource is the metadata key used when a resource is managed by a source outside Consul like nomad/k8s
|
||||
MetaExternalSource = "external-source"
|
||||
|
||||
// TaggedAddressVirtualIP is the key used to store tagged virtual IPs generated by Consul.
|
||||
TaggedAddressVirtualIP = "consul-virtual"
|
||||
|
||||
// MaxLockDelay provides a maximum LockDelay value for
|
||||
// a session. Any value above this will not be respected.
|
||||
MaxLockDelay = 60 * time.Second
|
||||
|
@ -847,9 +854,9 @@ func validateMetaPair(key, value string, allowConsulPrefix bool, allowedConsulKe
|
|||
if len(key) > metaKeyMaxLength {
|
||||
return fmt.Errorf("Key is too long (limit: %d characters)", metaKeyMaxLength)
|
||||
}
|
||||
if strings.HasPrefix(key, metaKeyReservedPrefix) {
|
||||
if strings.HasPrefix(key, MetaKeyReservedPrefix) {
|
||||
if _, ok := allowedConsulKeys[key]; !allowConsulPrefix && !ok {
|
||||
return fmt.Errorf("Key prefix '%s' is reserved for internal use", metaKeyReservedPrefix)
|
||||
return fmt.Errorf("Key prefix '%s' is reserved for internal use", MetaKeyReservedPrefix)
|
||||
}
|
||||
}
|
||||
if len(value) > metaValueMaxLength {
|
||||
|
|
|
@ -1561,7 +1561,7 @@ func TestStructs_ValidateServiceAndNodeMetadata(t *testing.T) {
|
|||
},
|
||||
"reserved key prefix denied": {
|
||||
map[string]string{
|
||||
metaKeyReservedPrefix + "key": "value1",
|
||||
MetaKeyReservedPrefix + "key": "value1",
|
||||
},
|
||||
false,
|
||||
"reserved for internal use",
|
||||
|
@ -1570,7 +1570,7 @@ func TestStructs_ValidateServiceAndNodeMetadata(t *testing.T) {
|
|||
},
|
||||
"reserved key prefix allowed": {
|
||||
map[string]string{
|
||||
metaKeyReservedPrefix + "key": "value1",
|
||||
MetaKeyReservedPrefix + "key": "value1",
|
||||
},
|
||||
true,
|
||||
"",
|
||||
|
@ -1640,13 +1640,13 @@ func TestStructs_validateMetaPair(t *testing.T) {
|
|||
// key too long
|
||||
{longKey, "value", "Key is too long", false, nil},
|
||||
// reserved prefix
|
||||
{metaKeyReservedPrefix + "key", "value", "reserved for internal use", false, nil},
|
||||
{MetaKeyReservedPrefix + "key", "value", "reserved for internal use", false, nil},
|
||||
// reserved prefix, allowed
|
||||
{metaKeyReservedPrefix + "key", "value", "", true, nil},
|
||||
{MetaKeyReservedPrefix + "key", "value", "", true, nil},
|
||||
// reserved prefix, not allowed via an allowlist
|
||||
{metaKeyReservedPrefix + "bad", "value", "reserved for internal use", false, map[string]struct{}{metaKeyReservedPrefix + "good": {}}},
|
||||
{MetaKeyReservedPrefix + "bad", "value", "reserved for internal use", false, map[string]struct{}{MetaKeyReservedPrefix + "good": {}}},
|
||||
// reserved prefix, allowed via an allowlist
|
||||
{metaKeyReservedPrefix + "good", "value", "", true, map[string]struct{}{metaKeyReservedPrefix + "good": {}}},
|
||||
{MetaKeyReservedPrefix + "good", "value", "", true, map[string]struct{}{MetaKeyReservedPrefix + "good": {}}},
|
||||
// value too long
|
||||
{"key", longValue, "Value is too long", false, nil},
|
||||
}
|
||||
|
|
|
@ -28,6 +28,7 @@ const (
|
|||
SystemMetadataIntentionFormatKey = "intention-format"
|
||||
SystemMetadataIntentionFormatConfigValue = "config-entry"
|
||||
SystemMetadataIntentionFormatLegacyValue = "legacy"
|
||||
SystemMetadataVirtualIPsEnabled = "virtual-ips"
|
||||
)
|
||||
|
||||
type SystemMetadataEntry struct {
|
||||
|
|
Loading…
Reference in New Issue