From d5277af70dfe0c8456743ea0ed05b945fce366e3 Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Fri, 21 Apr 2023 09:19:02 -0700 Subject: [PATCH] Add manual virtual IP support to state store (#16815) --- agent/consul/fsm/commands_oss.go | 14 +++++ agent/consul/state/catalog.go | 79 ++++++++++++++++++++++++++++ agent/consul/state/catalog_schema.go | 46 +++++++++++++++- agent/consul/state/catalog_test.go | 79 ++++++++++++++++++++++++++++ agent/structs/structs.go | 2 + 5 files changed, 218 insertions(+), 2 deletions(-) diff --git a/agent/consul/fsm/commands_oss.go b/agent/consul/fsm/commands_oss.go index fdb3f96f31..6f0497c149 100644 --- a/agent/consul/fsm/commands_oss.go +++ b/agent/consul/fsm/commands_oss.go @@ -146,6 +146,7 @@ func init() { registerCommand(structs.PeeringTrustBundleDeleteType, (*FSM).applyPeeringTrustBundleDelete) registerCommand(structs.PeeringSecretsWriteType, (*FSM).applyPeeringSecretsWrite) registerCommand(structs.ResourceOperationType, (*FSM).applyResourceOperation) + registerCommand(structs.UpdateVirtualIPRequestType, (*FSM).applyManualVirtualIPs) } func (c *FSM) applyRegister(buf []byte, index uint64) interface{} { @@ -786,3 +787,16 @@ func (c *FSM) applyPeeringTrustBundleDelete(buf []byte, index uint64) interface{ func (f *FSM) applyResourceOperation(buf []byte, idx uint64) any { return f.deps.StorageBackend.Apply(buf, idx) } + +func (c *FSM) applyManualVirtualIPs(buf []byte, index uint64) interface{} { + var req state.ServiceVirtualIP + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + if err := c.state.AssignManualVirtualIPs(index, req.Service, req.ManualIPs); err != nil { + c.logger.Warn("AssignManualVirtualIPs failed", "error", err) + return err + } + return nil +} diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index a19a97b11c..2e1a7099ae 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1090,6 +1090,69 @@ func assignServiceVirtualIP(tx WriteTxn, idx uint64, psn structs.PeeredServiceNa return result.String(), nil } +func (s *Store) AssignManualVirtualIPs(idx uint64, psn structs.PeeredServiceName, ips []string) error { + tx := s.db.WriteTxn(idx) + defer tx.Abort() + + // First remove the given IPs from any existing services, to avoid duplicate assignments. + assignedIPs := map[string]struct{}{} + for _, ip := range ips { + assignedIPs[ip] = struct{}{} + } + for ip := range assignedIPs { + entry, err := tx.First(tableServiceVirtualIPs, indexManualVIPs, psn.ServiceName.PartitionOrDefault(), ip) + if err != nil { + return fmt.Errorf("failed service virtual IP lookup: %s", err) + } + + if entry == nil { + continue + } + + newEntry := entry.(ServiceVirtualIP) + if newEntry.Service.ServiceName.Matches(psn.ServiceName) { + continue + } + + // Rebuild this entry's list of manual IPs, removing any that are present + // in new list we're assigning. + var filteredIPs []string + for _, existingIP := range newEntry.ManualIPs { + if _, ok := assignedIPs[existingIP]; !ok { + filteredIPs = append(filteredIPs, existingIP) + } + } + + newEntry.ManualIPs = filteredIPs + newEntry.ModifyIndex = idx + if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil { + return fmt.Errorf("failed inserting service virtual IP entry: %s", err) + } + } + + entry, err := tx.First(tableServiceVirtualIPs, indexID, psn) + if err != nil { + return fmt.Errorf("failed service virtual IP lookup: %s", err) + } + + if entry == nil { + return nil + } + + newEntry := entry.(ServiceVirtualIP) + newEntry.ManualIPs = ips + newEntry.ModifyIndex = idx + + if err := tx.Insert(tableServiceVirtualIPs, newEntry); err != nil { + return fmt.Errorf("failed inserting service virtual IP entry: %s", err) + } + if err := updateVirtualIPMaxIndexes(tx, idx, psn.ServiceName.PartitionOrDefault(), psn.Peer); err != nil { + return err + } + + return tx.Commit() +} + func updateVirtualIPMaxIndexes(txn WriteTxn, idx uint64, partition, peerName string) error { // update per-partition max index if err := indexUpdateMaxTxn(txn, idx, partitionedIndexEntryName(tableServiceVirtualIPs, partition)); err != nil { @@ -2976,6 +3039,22 @@ func (s *Store) VirtualIPForService(psn structs.PeeredServiceName) (string, erro return result.String(), nil } +func (s *Store) ServiceManualVIPs(psn structs.PeeredServiceName) (*ServiceVirtualIP, error) { + tx := s.db.Txn(false) + defer tx.Abort() + + vip, err := tx.First(tableServiceVirtualIPs, indexID, psn) + if err != nil { + return nil, fmt.Errorf("failed service virtual IP lookup: %s", err) + } + if vip == nil { + return nil, nil + } + + entry := vip.(ServiceVirtualIP) + return &entry, nil +} + // VirtualIPsForAllImportedServices returns a slice of ServiceVirtualIP for all // VirtualIP-assignable services that have been imported by the partition represented in entMeta. // Namespace is ignored. diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index 6d88362357..c4f7bb8b03 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -39,6 +39,7 @@ const ( indexUUID = "uuid" indexMeta = "meta" indexCounterOnly = "counter" + indexManualVIPs = "manual-vips" ) // nodesTableSchema returns a new table schema used for storing struct.Node. @@ -608,8 +609,9 @@ func (q NodeCheckQuery) PartitionOrDefault() string { // ServiceVirtualIP is used to store a virtual IP associated with a service. // It is also used to store assigned virtual IPs when a snapshot is created. type ServiceVirtualIP struct { - Service structs.PeeredServiceName - IP net.IP + Service structs.PeeredServiceName + IP net.IP + ManualIPs []string structs.RaftIndex } @@ -628,6 +630,33 @@ func counterIndex(obj interface{}) (bool, error) { return false, fmt.Errorf("object is not a virtual IP entry") } +type ServiceManualVIPIndex struct{} + +func (index *ServiceManualVIPIndex) FromObject(obj interface{}) (bool, []byte, error) { + entry, ok := obj.(ServiceVirtualIP) + if !ok { + return false, nil, fmt.Errorf("object is not a ServiceVirtualIP") + } + + // Enforce lowercase and add null character as terminator + id := strings.ToLower(entry.Service.ServiceName.PartitionOrDefault()) + "\x00" + + return true, []byte(id), nil +} + +func (index *ServiceManualVIPIndex) FromArgs(args ...interface{}) ([]byte, error) { + if len(args) != 1 { + return nil, fmt.Errorf("must provide only a single argument") + } + arg, ok := args[0].(string) + if !ok { + return nil, fmt.Errorf("argument must be a string: %#v", args[0]) + } + + id := strings.ToLower(arg) + "\x00" + return []byte(id), nil +} + func serviceVirtualIPTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ Name: tableServiceVirtualIPs, @@ -643,6 +672,19 @@ func serviceVirtualIPTableSchema() *memdb.TableSchema { prefixIndex: prefixIndexFromQueryWithPeerWildcardable, }, }, + indexManualVIPs: { + Name: indexManualVIPs, + AllowMissing: true, + Unique: false, + Indexer: &memdb.CompoundMultiIndex{ + Indexes: []memdb.Indexer{ + &ServiceManualVIPIndex{}, + &memdb.StringSliceFieldIndex{ + Field: "ManualIPs", + }, + }, + }, + }, }, } } diff --git a/agent/consul/state/catalog_test.go b/agent/consul/state/catalog_test.go index 31e5fc9145..27d11f21c0 100644 --- a/agent/consul/state/catalog_test.go +++ b/agent/consul/state/catalog_test.go @@ -1959,6 +1959,85 @@ func TestStateStore_EnsureService_VirtualIPAssign(t *testing.T) { assert.Equal(t, ns5.Port, taggedAddress.Port) } +func TestStateStore_AssignManualVirtualIPs(t *testing.T) { + s := testStateStore(t) + setVirtualIPFlags(t, s) + + // Attempt to assign manual virtual IPs to a service that doesn't exist - should be a no-op. + psn := structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "foo"}} + require.NoError(t, s.AssignManualVirtualIPs(0, psn, []string{"7.7.7.7", "8.8.8.8"})) + serviceVIP, err := s.ServiceManualVIPs(psn) + require.NoError(t, err) + require.Nil(t, serviceVIP) + + // Create the service registration. + entMeta := structs.DefaultEnterpriseMetaInDefaultPartition() + ns1 := &structs.NodeService{ + ID: "foo", + Service: "foo", + Address: "1.1.1.1", + Port: 1111, + Connect: structs.ServiceConnect{Native: true}, + EnterpriseMeta: *entMeta, + } + + // Service successfully registers into the state store. + testRegisterNode(t, s, 0, "node1") + require.NoError(t, s.EnsureService(1, "node1", ns1)) + + // Make sure there's a virtual IP for the foo service. + vip, err := s.VirtualIPForService(psn) + require.NoError(t, err) + assert.Equal(t, "240.0.0.1", vip) + + // No manual IP should be set yet. + serviceVIP, err = s.ServiceManualVIPs(psn) + require.NoError(t, err) + require.Equal(t, "0.0.0.1", serviceVIP.IP.String()) + require.Empty(t, serviceVIP.ManualIPs) + + // Attempt to assign manual virtual IPs again. + require.NoError(t, s.AssignManualVirtualIPs(2, psn, []string{"7.7.7.7", "8.8.8.8"})) + serviceVIP, err = s.ServiceManualVIPs(psn) + require.NoError(t, err) + require.Equal(t, "0.0.0.1", serviceVIP.IP.String()) + require.Equal(t, serviceVIP.ManualIPs, []string{"7.7.7.7", "8.8.8.8"}) + + // Register another service + ns2 := &structs.NodeService{ + ID: "bar", + Service: "bar", + Address: "2.2.2.2", + Port: 2222, + Connect: structs.ServiceConnect{Native: true}, + EnterpriseMeta: *entMeta, + } + + // Service successfully registers into the state store. + require.NoError(t, s.EnsureService(3, "node1", ns2)) + + psn2 := structs.PeeredServiceName{ServiceName: structs.ServiceName{Name: "bar"}} + vip, err = s.VirtualIPForService(psn2) + require.NoError(t, err) + assert.Equal(t, "240.0.0.2", vip) + + // Attempt to assign manual virtual IPs for bar, with one IP overlapping with foo. + // This should cause the ip to be removed from foo's list of manual IPs. + require.NoError(t, s.AssignManualVirtualIPs(4, psn2, []string{"7.7.7.7", "9.9.9.9"})) + + serviceVIP, err = s.ServiceManualVIPs(psn) + require.NoError(t, err) + require.Equal(t, "0.0.0.1", serviceVIP.IP.String()) + require.Equal(t, []string{"8.8.8.8"}, serviceVIP.ManualIPs) + require.Equal(t, uint64(4), serviceVIP.ModifyIndex) + + serviceVIP, err = s.ServiceManualVIPs(psn2) + require.NoError(t, err) + require.Equal(t, "0.0.0.2", serviceVIP.IP.String()) + require.Equal(t, []string{"7.7.7.7", "9.9.9.9"}, serviceVIP.ManualIPs) + require.Equal(t, uint64(4), serviceVIP.ModifyIndex) +} + func TestStateStore_EnsureService_ReassignFreedVIPs(t *testing.T) { s := testStateStore(t) setVirtualIPFlags(t, s) diff --git a/agent/structs/structs.go b/agent/structs/structs.go index 3c7b09822f..ccde4f1947 100644 --- a/agent/structs/structs.go +++ b/agent/structs/structs.go @@ -88,6 +88,7 @@ const ( PeeringSecretsWriteType = 40 RaftLogVerifierCheckpoint = 41 // Only used for log verifier, no-op on FSM. ResourceOperationType = 42 + UpdateVirtualIPRequestType = 43 ) const ( @@ -156,6 +157,7 @@ var requestTypeStrings = map[MessageType]string{ PeeringSecretsWriteType: "PeeringSecret", RaftLogVerifierCheckpoint: "RaftLogVerifierCheckpoint", ResourceOperationType: "Resource", + UpdateVirtualIPRequestType: "UpdateManualVirtualIPRequestType", } const (