Add manual virtual IP support to state store (#16815)

This commit is contained in:
Kyle Havlovitz 2023-04-21 09:19:02 -07:00 committed by GitHub
parent b19359ce2b
commit d5277af70d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 218 additions and 2 deletions

View File

@ -146,6 +146,7 @@ func init() {
registerCommand(structs.PeeringTrustBundleDeleteType, (*FSM).applyPeeringTrustBundleDelete) registerCommand(structs.PeeringTrustBundleDeleteType, (*FSM).applyPeeringTrustBundleDelete)
registerCommand(structs.PeeringSecretsWriteType, (*FSM).applyPeeringSecretsWrite) registerCommand(structs.PeeringSecretsWriteType, (*FSM).applyPeeringSecretsWrite)
registerCommand(structs.ResourceOperationType, (*FSM).applyResourceOperation) registerCommand(structs.ResourceOperationType, (*FSM).applyResourceOperation)
registerCommand(structs.UpdateVirtualIPRequestType, (*FSM).applyManualVirtualIPs)
} }
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
@ -786,3 +787,16 @@ func (c *FSM) applyPeeringTrustBundleDelete(buf []byte, index uint64) interface{
func (f *FSM) applyResourceOperation(buf []byte, idx uint64) any { func (f *FSM) applyResourceOperation(buf []byte, idx uint64) any {
return f.deps.StorageBackend.Apply(buf, idx) return f.deps.StorageBackend.Apply(buf, idx)
} }
func (c *FSM) applyManualVirtualIPs(buf []byte, index uint64) interface{} {
var req state.ServiceVirtualIP
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
if err := c.state.AssignManualVirtualIPs(index, req.Service, req.ManualIPs); err != nil {
c.logger.Warn("AssignManualVirtualIPs failed", "error", err)
return err
}
return nil
}

View File

@ -1090,6 +1090,69 @@ func assignServiceVirtualIP(tx WriteTxn, idx uint64, psn structs.PeeredServiceNa
return result.String(), nil return result.String(), nil
} }
func (s *Store) AssignManualVirtualIPs(idx uint64, psn structs.PeeredServiceName, ips []string) error {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
// First remove the given IPs from any existing services, to avoid duplicate assignments.
assignedIPs := map[string]struct{}{}
for _, ip := range ips {
assignedIPs[ip] = struct{}{}
}
for ip := range assignedIPs {
entry, err := tx.First(tableServiceVirtualIPs, indexManualVIPs, psn.ServiceName.PartitionOrDefault(), ip)
if err != nil {
return fmt.Errorf("failed service virtual IP lookup: %s", err)
}
if entry == nil {
continue
}
newEntry := entry.(ServiceVirtualIP)
if newEntry.Service.ServiceName.Matches(psn.ServiceName) {
continue
}
// Rebuild this entry's list of manual IPs, removing any that are present
// in new list we're assigning.
var filteredIPs []string
for _, existingIP := range newEntry.ManualIPs {
if _, ok := assignedIPs[existingIP]; !ok {
filteredIPs = append(filteredIPs, existingIP)
}
}
newEntry.ManualIPs = filteredIPs
newEntry.ModifyIndex = idx
if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil {
return fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
}
entry, err := tx.First(tableServiceVirtualIPs, indexID, psn)
if err != nil {
return fmt.Errorf("failed service virtual IP lookup: %s", err)
}
if entry == nil {
return nil
}
newEntry := entry.(ServiceVirtualIP)
newEntry.ManualIPs = ips
newEntry.ModifyIndex = idx
if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil {
return fmt.Errorf("failed inserting service virtual IP entry: %s", err)
}
if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil {
return err
}
return tx.Commit()
}
func updateVirtualIPMaxIndexes(txn WriteTxn, idx uint64, partition, peerName string) error { func updateVirtualIPMaxIndexes(txn WriteTxn, idx uint64, partition, peerName string) error {
// update per-partition max index // update per-partition max index
if err := indexUpdateMaxTxn(txn, idx, partitionedIndexEntryName(tableServiceVirtualIPs, partition)); err != nil { if err := indexUpdateMaxTxn(txn, idx, partitionedIndexEntryName(tableServiceVirtualIPs, partition)); err != nil {
@ -2976,6 +3039,22 @@ func (s *Store) VirtualIPForService(psn structs.PeeredServiceName) (string, erro
return result.String(), nil return result.String(), nil
} }
func (s *Store) ServiceManualVIPs(psn structs.PeeredServiceName) (*ServiceVirtualIP, error) {
tx := s.db.Txn(false)
defer tx.Abort()
vip, err := tx.First(tableServiceVirtualIPs, indexID, psn)
if err != nil {
return nil, fmt.Errorf("failed service virtual IP lookup: %s", err)
}
if vip == nil {
return nil, nil
}
entry := vip.(ServiceVirtualIP)
return &entry, nil
}
// VirtualIPsForAllImportedServices returns a slice of ServiceVirtualIP for all // VirtualIPsForAllImportedServices returns a slice of ServiceVirtualIP for all
// VirtualIP-assignable services that have been imported by the partition represented in entMeta. // VirtualIP-assignable services that have been imported by the partition represented in entMeta.
// Namespace is ignored. // Namespace is ignored.

View File

@ -39,6 +39,7 @@ const (
indexUUID = "uuid" indexUUID = "uuid"
indexMeta = "meta" indexMeta = "meta"
indexCounterOnly = "counter" indexCounterOnly = "counter"
indexManualVIPs = "manual-vips"
) )
// nodesTableSchema returns a new table schema used for storing struct.Node. // nodesTableSchema returns a new table schema used for storing struct.Node.
@ -608,8 +609,9 @@ func (q NodeCheckQuery) PartitionOrDefault() string {
// ServiceVirtualIP is used to store a virtual IP associated with a service. // ServiceVirtualIP is used to store a virtual IP associated with a service.
// It is also used to store assigned virtual IPs when a snapshot is created. // It is also used to store assigned virtual IPs when a snapshot is created.
type ServiceVirtualIP struct { type ServiceVirtualIP struct {
Service structs.PeeredServiceName Service structs.PeeredServiceName
IP net.IP IP net.IP
ManualIPs []string
structs.RaftIndex structs.RaftIndex
} }
@ -628,6 +630,33 @@ func counterIndex(obj interface{}) (bool, error) {
return false, fmt.Errorf("object is not a virtual IP entry") return false, fmt.Errorf("object is not a virtual IP entry")
} }
type ServiceManualVIPIndex struct{}
func (index *ServiceManualVIPIndex) FromObject(obj interface{}) (bool, []byte, error) {
entry, ok := obj.(ServiceVirtualIP)
if !ok {
return false, nil, fmt.Errorf("object is not a ServiceVirtualIP")
}
// Enforce lowercase and add null character as terminator
id := strings.ToLower(entry.Service.ServiceName.PartitionOrDefault()) + "\x00"
return true, []byte(id), nil
}
func (index *ServiceManualVIPIndex) FromArgs(args ...interface{}) ([]byte, error) {
if len(args) != 1 {
return nil, fmt.Errorf("must provide only a single argument")
}
arg, ok := args[0].(string)
if !ok {
return nil, fmt.Errorf("argument must be a string: %#v", args[0])
}
id := strings.ToLower(arg) + "\x00"
return []byte(id), nil
}
func serviceVirtualIPTableSchema() *memdb.TableSchema { func serviceVirtualIPTableSchema() *memdb.TableSchema {
return &memdb.TableSchema{ return &memdb.TableSchema{
Name: tableServiceVirtualIPs, Name: tableServiceVirtualIPs,
@ -643,6 +672,19 @@ func serviceVirtualIPTableSchema() *memdb.TableSchema {
prefixIndex: prefixIndexFromQueryWithPeerWildcardable, prefixIndex: prefixIndexFromQueryWithPeerWildcardable,
}, },
}, },
indexManualVIPs: {
Name: indexManualVIPs,
AllowMissing: true,
Unique: false,
Indexer: &memdb.CompoundMultiIndex{
Indexes: []memdb.Indexer{
&ServiceManualVIPIndex{},
&memdb.StringSliceFieldIndex{
Field: "ManualIPs",
},
},
},
},
}, },
} }
} }

View File

@ -1959,6 +1959,85 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) {
assert.Equal(t, ns5.Port, taggedAddress.Port) assert.Equal(t, ns5.Port, taggedAddress.Port)
} }
func TestStateStore_AssignManualVirtualIPs(t *testing.T) {
s := testStateStore(t)
setVirtualIPFlags(t, s)
// Attempt to assign manual virtual IPs to a service that doesn't exist - should be a no-op.
psn := structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "foo"}}
require.NoError(t, s.AssignManualVirtualIPs(0, psn, []string{"7.7.7.7", "8.8.8.8"}))
serviceVIP, err := s.ServiceManualVIPs(psn)
require.NoError(t, err)
require.Nil(t, serviceVIP)
// Create the service registration.
entMeta := structs.DefaultEnterpriseMetaInDefaultPartition()
ns1 := &structs.NodeService{
ID: "foo",
Service: "foo",
Address: "1.1.1.1",
Port: 1111,
Connect: structs.ServiceConnect{Native: true},
EnterpriseMeta: *entMeta,
}
// Service successfully registers into the state store.
testRegisterNode(t, s, 0, "node1")
require.NoError(t, s.EnsureService(1, "node1", ns1))
// Make sure there's a virtual IP for the foo service.
vip, err := s.VirtualIPForService(psn)
require.NoError(t, err)
assert.Equal(t, "240.0.0.1", vip)
// No manual IP should be set yet.
serviceVIP, err = s.ServiceManualVIPs(psn)
require.NoError(t, err)
require.Equal(t, "0.0.0.1", serviceVIP.IP.String())
require.Empty(t, serviceVIP.ManualIPs)
// Attempt to assign manual virtual IPs again.
require.NoError(t, s.AssignManualVirtualIPs(2, psn, []string{"7.7.7.7", "8.8.8.8"}))
serviceVIP, err = s.ServiceManualVIPs(psn)
require.NoError(t, err)
require.Equal(t, "0.0.0.1", serviceVIP.IP.String())
require.Equal(t, serviceVIP.ManualIPs, []string{"7.7.7.7", "8.8.8.8"})
// Register another service
ns2 := &structs.NodeService{
ID: "bar",
Service: "bar",
Address: "2.2.2.2",
Port: 2222,
Connect: structs.ServiceConnect{Native: true},
EnterpriseMeta: *entMeta,
}
// Service successfully registers into the state store.
require.NoError(t, s.EnsureService(3, "node1", ns2))
psn2 := structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "bar"}}
vip, err = s.VirtualIPForService(psn2)
require.NoError(t, err)
assert.Equal(t, "240.0.0.2", vip)
// Attempt to assign manual virtual IPs for bar, with one IP overlapping with foo.
// This should cause the ip to be removed from foo's list of manual IPs.
require.NoError(t, s.AssignManualVirtualIPs(4, psn2, []string{"7.7.7.7", "9.9.9.9"}))
serviceVIP, err = s.ServiceManualVIPs(psn)
require.NoError(t, err)
require.Equal(t, "0.0.0.1", serviceVIP.IP.String())
require.Equal(t, []string{"8.8.8.8"}, serviceVIP.ManualIPs)
require.Equal(t, uint64(4), serviceVIP.ModifyIndex)
serviceVIP, err = s.ServiceManualVIPs(psn2)
require.NoError(t, err)
require.Equal(t, "0.0.0.2", serviceVIP.IP.String())
require.Equal(t, []string{"7.7.7.7", "9.9.9.9"}, serviceVIP.ManualIPs)
require.Equal(t, uint64(4), serviceVIP.ModifyIndex)
}
func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) { func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) {
s := testStateStore(t) s := testStateStore(t)
setVirtualIPFlags(t, s) setVirtualIPFlags(t, s)

View File

@ -88,6 +88,7 @@ const (
PeeringSecretsWriteType = 40 PeeringSecretsWriteType = 40
RaftLogVerifierCheckpoint = 41 // Only used for log verifier, no-op on FSM. RaftLogVerifierCheckpoint = 41 // Only used for log verifier, no-op on FSM.
ResourceOperationType = 42 ResourceOperationType = 42
UpdateVirtualIPRequestType = 43
) )
const ( const (
@ -156,6 +157,7 @@ var requestTypeStrings = map[MessageType]string{
PeeringSecretsWriteType: "PeeringSecret", PeeringSecretsWriteType: "PeeringSecret",
RaftLogVerifierCheckpoint: "RaftLogVerifierCheckpoint", RaftLogVerifierCheckpoint: "RaftLogVerifierCheckpoint",
ResourceOperationType: "Resource", ResourceOperationType: "Resource",
UpdateVirtualIPRequestType: "UpdateManualVirtualIPRequestType",
} }
const ( const (