diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index 57128b4950..71a42f0e25 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -18,15 +18,9 @@ import ( "github.com/hashicorp/consul/types" ) -const ( - servicesTableName = "services" - gatewayServicesTableName = "gateway-services" - topologyTableName = "mesh-topology" - - // serviceLastExtinctionIndexName keeps track of the last raft index when the last instance - // of any service was unregistered. This is used by blocking queries on missing services. - serviceLastExtinctionIndexName = "service_last_extinction" -) +// indexServiceExtinction keeps track of the last raft index when the last instance +// of any service was unregistered. This is used by blocking queries on missing services. +const indexServiceExtinction = "service_last_extinction" const ( // minUUIDLookupLen is used as a minimum length of a node name required before @@ -2087,7 +2081,7 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru if err != nil { return 0, nil, err } - idx := maxIndexTxn(tx, gatewayServicesTableName) + idx := maxIndexTxn(tx, tableGatewayServices) return lib.MaxUint64(maxIdx, idx), results, nil } @@ -2361,7 +2355,7 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en // Delete all associated with gateway first, to avoid keeping mappings that were removed sn := structs.NewServiceName(conf.GetName(), entMeta) - if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", sn); err != nil { + if _, err := tx.DeleteAll(tableGatewayServices, "gateway", sn); err != nil { return fmt.Errorf("failed to truncate gateway services table: %v", err) } if err := truncateGatewayServiceTopologyMappings(tx, idx, sn, conf.GetKind()); err != nil { @@ -2389,7 +2383,7 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en } } - if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil { + if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil { return fmt.Errorf("failed updating gateway-services index: %v", err) } return nil @@ -2499,7 +2493,7 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer continue } - existing, err := tx.First(gatewayServicesTableName, "id", service.Gateway, sn.CompoundServiceName(), service.Port) + existing, err := tx.First(tableGatewayServices, "id", service.Gateway, sn.CompoundServiceName(), service.Port) if err != nil { return fmt.Errorf("gateway service lookup failed: %s", err) } @@ -2534,7 +2528,7 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error { // Check if mapping already exists in table if it's already in the table // Avoid insert if nothing changed - existing, err := tx.First(gatewayServicesTableName, "id", mapping.Gateway, mapping.Service, mapping.Port) + existing, err := tx.First(tableGatewayServices, "id", mapping.Gateway, mapping.Service, mapping.Port) if err != nil { return fmt.Errorf("gateway service lookup failed: %s", err) } @@ -2549,11 +2543,11 @@ func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayServi } mapping.ModifyIndex = idx - if err := tx.Insert(gatewayServicesTableName, mapping); err != nil { + if err := tx.Insert(tableGatewayServices, mapping); err != nil { return fmt.Errorf("failed inserting gateway service mapping: %s", err) } - if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil { + if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil { return fmt.Errorf("failed updating gateway-services index: %v", err) } @@ -2613,10 +2607,10 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) // Otherwise the service was specified in the config entry, and the association should be maintained // for when the service is re-registered if m.FromWildcard { - if err := tx.Delete(gatewayServicesTableName, m); err != nil { + if err := tx.Delete(tableGatewayServices, m); err != nil { return fmt.Errorf("failed to truncate gateway services table: %v", err) } - if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil { + if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil { return fmt.Errorf("failed updating gateway-services index: %v", err) } if err := deleteGatewayServiceTopologyMapping(tx, idx, m); err != nil { @@ -2630,18 +2624,18 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode) // serviceGateways returns all GatewayService entries with the given service name. This effectively looks up // all the gateways mapped to this service. func serviceGateways(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get(gatewayServicesTableName, "service", structs.NewServiceName(name, entMeta)) + return tx.Get(tableGatewayServices, "service", structs.NewServiceName(name, entMeta)) } func gatewayServices(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) { - return tx.Get(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta)) + return tx.Get(tableGatewayServices, "gateway", structs.NewServiceName(name, entMeta)) } func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayServices, error) { tx := s.db.ReadTxn() defer tx.Abort() - iter, err := tx.Get(gatewayServicesTableName, "id") + iter, err := tx.Get(tableGatewayServices, "id") if err != nil { return 0, nil, fmt.Errorf("failed to dump gateway-services: %s", err) } @@ -2651,7 +2645,7 @@ func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayS if err != nil { return 0, nil, err } - idx := maxIndexTxn(tx, gatewayServicesTableName) + idx := maxIndexTxn(tx, tableGatewayServices) return lib.MaxUint64(maxIdx, idx), results, nil } @@ -2968,9 +2962,9 @@ func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.Se index = "upstream" } - iter, err := tx.Get(topologyTableName, index, service) + iter, err := tx.Get(tableMeshTopology, index, service) if err != nil { - return 0, nil, fmt.Errorf("%q lookup failed: %v", topologyTableName, err) + return 0, nil, fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err) } ws.Add(iter.WatchCh()) @@ -2993,7 +2987,7 @@ func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.Se // TODO (freddy) This needs a tombstone to avoid the index sliding back on mapping deletion // Using the table index here means that blocking queries will wake up more often than they should - tableIdx := maxIndexTxn(tx, topologyTableName) + tableIdx := maxIndexTxn(tx, tableMeshTopology) if tableIdx > idx { idx = tableIdx } @@ -3024,9 +3018,9 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS upstreamMeta := structs.EnterpriseMetaInitializer(u.DestinationNamespace) upstream := structs.NewServiceName(u.DestinationName, &upstreamMeta) - obj, err := tx.First(topologyTableName, "id", upstream, downstream) + obj, err := tx.First(tableMeshTopology, "id", upstream, downstream) if err != nil { - return fmt.Errorf("%q lookup failed: %v", topologyTableName, err) + return fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err) } sid := svc.CompoundServiceID() uid := structs.UniqueID(node, sid.String()) @@ -3057,22 +3051,22 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS }, } } - if err := tx.Insert(topologyTableName, mapping); err != nil { - return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err) + if err := tx.Insert(tableMeshTopology, mapping); err != nil { + return fmt.Errorf("failed inserting %s mapping: %s", tableMeshTopology, err) } - if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { - return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil { + return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err) } inserted[upstream] = true } for u := range oldUpstreams { if !inserted[u] { - if _, err := tx.DeleteAll(topologyTableName, "id", u, downstream); err != nil { - return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err) + if _, err := tx.DeleteAll(tableMeshTopology, "id", u, downstream); err != nil { + return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err) } - if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { - return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil { + return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err) } } } @@ -3090,9 +3084,9 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode) sid := service.CompoundServiceID() uid := structs.UniqueID(service.Node, sid.String()) - iter, err := tx.Get(topologyTableName, "downstream", sn) + iter, err := tx.Get(tableMeshTopology, "downstream", sn) if err != nil { - return fmt.Errorf("%q lookup failed: %v", topologyTableName, err) + return fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err) } mappings := make([]*structs.UpstreamDownstream, 0) @@ -3118,17 +3112,17 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode) delete(copy.Refs, uid) if len(copy.Refs) == 0 { - if err := tx.Delete(topologyTableName, m); err != nil { - return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err) + if err := tx.Delete(tableMeshTopology, m); err != nil { + return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err) } - if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { - return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil { + return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err) } continue } - if err := tx.Insert(topologyTableName, copy); err != nil { - return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err) + if err := tx.Insert(tableMeshTopology, copy); err != nil { + return fmt.Errorf("failed inserting %s mapping: %s", tableMeshTopology, err) } } return nil @@ -3145,11 +3139,11 @@ func insertGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.Ga Downstream: gs.Gateway, RaftIndex: gs.RaftIndex, } - if err := tx.Insert(topologyTableName, &mapping); err != nil { - return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err) + if err := tx.Insert(tableMeshTopology, &mapping); err != nil { + return fmt.Errorf("failed inserting %s mapping: %s", tableMeshTopology, err) } - if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { - return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil { + return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err) } return nil @@ -3161,11 +3155,11 @@ func deleteGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.Ga return nil } - if _, err := tx.DeleteAll(topologyTableName, "id", gs.Service, gs.Gateway); err != nil { - return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err) + if _, err := tx.DeleteAll(tableMeshTopology, "id", gs.Service, gs.Gateway); err != nil { + return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err) } - if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { - return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil { + return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err) } return nil @@ -3177,11 +3171,11 @@ func truncateGatewayServiceTopologyMappings(tx WriteTxn, idx uint64, gateway str return nil } - if _, err := tx.DeleteAll(topologyTableName, "downstream", gateway); err != nil { - return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err) + if _, err := tx.DeleteAll(tableMeshTopology, "downstream", gateway); err != nil { + return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err) } - if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { - return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil { + return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err) } return nil diff --git a/agent/consul/state/catalog_oss.go b/agent/consul/state/catalog_oss.go index ed92e34df9..4ca5b40d25 100644 --- a/agent/consul/state/catalog_oss.go +++ b/agent/consul/state/catalog_oss.go @@ -54,7 +54,7 @@ func catalogUpdateServiceIndexes(tx WriteTxn, serviceName string, idx uint64, _ } func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error { - if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil { + if err := tx.Insert("index", &IndexEntry{indexServiceExtinction, idx}); err != nil { return fmt.Errorf("failed updating missing service extinction index: %s", err) } return nil @@ -110,7 +110,7 @@ func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.En } func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) { - return tx.First("index", "id", serviceLastExtinctionIndexName) + return tx.First("index", "id", indexServiceExtinction) } func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 { diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index 91f7be358d..1c69cd6fa9 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -11,9 +11,11 @@ import ( ) const ( - tableNodes = "nodes" - tableServices = "services" - tableChecks = "checks" + tableNodes = "nodes" + tableServices = "services" + tableChecks = "checks" + tableGatewayServices = "gateway-services" + tableMeshTopology = "mesh-topology" indexID = "id" indexServiceName = "service" @@ -205,11 +207,11 @@ func checksTableSchema() *memdb.TableSchema { } } -// gatewayServicesTableNameSchema returns a new table schema used to store information +// gatewayServicesTableSchema returns a new table schema used to store information // about services associated with terminating gateways. -func gatewayServicesTableNameSchema() *memdb.TableSchema { +func gatewayServicesTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: gatewayServicesTableName, + Name: tableGatewayServices, Indexes: map[string]*memdb.IndexSchema{ indexID: { Name: indexID, @@ -249,11 +251,11 @@ func gatewayServicesTableNameSchema() *memdb.TableSchema { } } -// topologyTableNameSchema returns a new table schema used to store information +// meshTopologyTableSchema returns a new table schema used to store information // relating upstream and downstream services -func topologyTableNameSchema() *memdb.TableSchema { +func meshTopologyTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: topologyTableName, + Name: tableMeshTopology, Indexes: map[string]*memdb.IndexSchema{ indexID: { Name: indexID, @@ -350,6 +352,6 @@ func init() { registerSchema(nodesTableSchema) registerSchema(servicesTableSchema) registerSchema(checksTableSchema) - registerSchema(gatewayServicesTableNameSchema) - registerSchema(topologyTableNameSchema) + registerSchema(gatewayServicesTableSchema) + registerSchema(meshTopologyTableSchema) } diff --git a/agent/consul/state/config_entry.go b/agent/consul/state/config_entry.go index 3f8d54e9ff..7a6845896f 100644 --- a/agent/consul/state/config_entry.go +++ b/agent/consul/state/config_entry.go @@ -273,20 +273,20 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *s sn := structs.NewServiceName(name, entMeta) if kind == structs.TerminatingGateway || kind == structs.IngressGateway { - if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", sn); err != nil { + if _, err := tx.DeleteAll(tableGatewayServices, "gateway", sn); err != nil { return fmt.Errorf("failed to truncate gateway services table: %v", err) } - if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil { + if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil { return fmt.Errorf("failed updating gateway-services index: %v", err) } } // Also clean up associations in the mesh topology table for ingress gateways if kind == structs.IngressGateway { - if _, err := tx.DeleteAll(topologyTableName, "downstream", sn); err != nil { - return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err) + if _, err := tx.DeleteAll(tableMeshTopology, "downstream", sn); err != nil { + return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err) } - if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil { - return fmt.Errorf("failed updating %s index: %v", topologyTableName, err) + if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil { + return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err) } } diff --git a/agent/consul/state/connect_ca.go b/agent/consul/state/connect_ca.go index 7a467e75d9..aa5134fb1f 100644 --- a/agent/consul/state/connect_ca.go +++ b/agent/consul/state/connect_ca.go @@ -3,16 +3,17 @@ package state import ( "fmt" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/structs" ) const ( - caBuiltinProviderTableName = "connect-ca-builtin" - caBuiltinProviderSerialNumber = "connect-ca-builtin-serial" - caConfigTableName = "connect-ca-config" - caRootTableName = "connect-ca-roots" - caLeafIndexName = "connect-ca-leaf-certs" + tableConnectCABuiltin = "connect-ca-builtin" + tableConnectCABuiltinSerial = "connect-ca-builtin-serial" + tableConnectCAConfig = "connect-ca-config" + tableConnectCARoots = "connect-ca-roots" + tableConnectCALeafCerts = "connect-ca-leaf-certs" ) // caBuiltinProviderTableSchema returns a new table schema used for storing @@ -20,7 +21,7 @@ const ( // the internal Consul CA provider. func caBuiltinProviderTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: caBuiltinProviderTableName, + Name: tableConnectCABuiltin, Indexes: map[string]*memdb.IndexSchema{ "id": { Name: "id", @@ -38,7 +39,7 @@ func caBuiltinProviderTableSchema() *memdb.TableSchema { // the CA config for Connect. func caConfigTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: caConfigTableName, + Name: tableConnectCAConfig, Indexes: map[string]*memdb.IndexSchema{ // This table only stores one row, so this just ignores the ID field // and always overwrites the same config object. @@ -58,7 +59,7 @@ func caConfigTableSchema() *memdb.TableSchema { // CA roots for Connect. func caRootTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: caRootTableName, + Name: tableConnectCARoots, Indexes: map[string]*memdb.IndexSchema{ "id": { Name: "id", @@ -80,7 +81,7 @@ func init() { // CAConfig is used to pull the CA config from the snapshot. func (s *Snapshot) CAConfig() (*structs.CAConfiguration, error) { - c, err := s.tx.First(caConfigTableName, "id") + c, err := s.tx.First(tableConnectCAConfig, "id") if err != nil { return nil, err } @@ -101,7 +102,7 @@ func (s *Restore) CAConfig(config *structs.CAConfiguration) error { return nil } - if err := s.tx.Insert(caConfigTableName, config); err != nil { + if err := s.tx.Insert(tableConnectCAConfig, config); err != nil { return fmt.Errorf("failed restoring CA config: %s", err) } @@ -118,7 +119,7 @@ func (s *Store) CAConfig(ws memdb.WatchSet) (uint64, *structs.CAConfiguration, e func caConfigTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, *structs.CAConfiguration, error) { // Get the CA config - ch, c, err := tx.FirstWatch(caConfigTableName, "id") + ch, c, err := tx.FirstWatch(tableConnectCAConfig, "id") if err != nil { return 0, nil, fmt.Errorf("failed CA config lookup: %s", err) } @@ -153,7 +154,7 @@ func (s *Store) CACheckAndSetConfig(idx, cidx uint64, config *structs.CAConfigur defer tx.Abort() // Check for an existing config - existing, err := tx.First(caConfigTableName, "id") + existing, err := tx.First(tableConnectCAConfig, "id") if err != nil { return false, fmt.Errorf("failed CA config lookup: %s", err) } @@ -176,7 +177,7 @@ func (s *Store) CACheckAndSetConfig(idx, cidx uint64, config *structs.CAConfigur func (s *Store) caSetConfigTxn(idx uint64, tx *txn, config *structs.CAConfiguration) error { // Check for an existing config - prev, err := tx.First(caConfigTableName, "id") + prev, err := tx.First(tableConnectCAConfig, "id") if err != nil { return fmt.Errorf("failed CA config lookup: %s", err) } @@ -194,7 +195,7 @@ func (s *Store) caSetConfigTxn(idx uint64, tx *txn, config *structs.CAConfigurat } config.ModifyIndex = idx - if err := tx.Insert(caConfigTableName, config); err != nil { + if err := tx.Insert(tableConnectCAConfig, config); err != nil { return fmt.Errorf("failed updating CA config: %s", err) } return nil @@ -202,7 +203,7 @@ func (s *Store) caSetConfigTxn(idx uint64, tx *txn, config *structs.CAConfigurat // CARoots is used to pull all the CA roots for the snapshot. func (s *Snapshot) CARoots() (structs.CARoots, error) { - ixns, err := s.tx.Get(caRootTableName, "id") + ixns, err := s.tx.Get(tableConnectCARoots, "id") if err != nil { return nil, err } @@ -218,10 +219,10 @@ func (s *Snapshot) CARoots() (structs.CARoots, error) { // CARoots is used when restoring from a snapshot. func (s *Restore) CARoot(r *structs.CARoot) error { // Insert - if err := s.tx.Insert(caRootTableName, r); err != nil { + if err := s.tx.Insert(tableConnectCARoots, r); err != nil { return fmt.Errorf("failed restoring CA root: %s", err) } - if err := indexUpdateMaxTxn(s.tx, r.ModifyIndex, caRootTableName); err != nil { + if err := indexUpdateMaxTxn(s.tx, r.ModifyIndex, tableConnectCARoots); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -238,10 +239,10 @@ func (s *Store) CARoots(ws memdb.WatchSet) (uint64, structs.CARoots, error) { func caRootsTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, structs.CARoots, error) { // Get the index - idx := maxIndexTxn(tx, caRootTableName) + idx := maxIndexTxn(tx, tableConnectCARoots) // Get all - iter, err := tx.Get(caRootTableName, "id") + iter, err := tx.Get(tableConnectCARoots, "id") if err != nil { return 0, nil, fmt.Errorf("failed CA root lookup: %s", err) } @@ -293,7 +294,7 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro } // Get the current max index - if midx := maxIndexTxn(tx, caRootTableName); midx != cidx { + if midx := maxIndexTxn(tx, tableConnectCARoots); midx != cidx { return false, nil } @@ -304,7 +305,7 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro return false, ErrMissingCARootID } - existing, err := tx.First(caRootTableName, "id", r.ID) + existing, err := tx.First(tableConnectCARoots, "id", r.ID) if err != nil { return false, fmt.Errorf("failed CA root lookup: %s", err) } @@ -318,20 +319,20 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro } // Delete all - _, err := tx.DeleteAll(caRootTableName, "id") + _, err := tx.DeleteAll(tableConnectCARoots, "id") if err != nil { return false, err } // Insert all for _, r := range rs { - if err := tx.Insert(caRootTableName, r); err != nil { + if err := tx.Insert(tableConnectCARoots, r); err != nil { return false, err } } // Update the index - if err := tx.Insert("index", &IndexEntry{caRootTableName, idx}); err != nil { + if err := tx.Insert("index", &IndexEntry{tableConnectCARoots, idx}); err != nil { return false, fmt.Errorf("failed updating index: %s", err) } @@ -341,7 +342,7 @@ func (s *Store) CARootSetCAS(idx, cidx uint64, rs []*structs.CARoot) (bool, erro // CAProviderState is used to pull the built-in provider states from the snapshot. func (s *Snapshot) CAProviderState() ([]*structs.CAConsulProviderState, error) { - ixns, err := s.tx.Get(caBuiltinProviderTableName, "id") + ixns, err := s.tx.Get(tableConnectCABuiltin, "id") if err != nil { return nil, err } @@ -356,10 +357,10 @@ func (s *Snapshot) CAProviderState() ([]*structs.CAConsulProviderState, error) { // CAProviderState is used when restoring from a snapshot. func (s *Restore) CAProviderState(state *structs.CAConsulProviderState) error { - if err := s.tx.Insert(caBuiltinProviderTableName, state); err != nil { + if err := s.tx.Insert(tableConnectCABuiltin, state); err != nil { return fmt.Errorf("failed restoring built-in CA state: %s", err) } - if err := indexUpdateMaxTxn(s.tx, state.ModifyIndex, caBuiltinProviderTableName); err != nil { + if err := indexUpdateMaxTxn(s.tx, state.ModifyIndex, tableConnectCABuiltin); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -372,10 +373,10 @@ func (s *Store) CAProviderState(id string) (uint64, *structs.CAConsulProviderSta defer tx.Abort() // Get the index - idx := maxIndexTxn(tx, caBuiltinProviderTableName) + idx := maxIndexTxn(tx, tableConnectCABuiltin) // Get the provider config - c, err := tx.First(caBuiltinProviderTableName, "id", id) + c, err := tx.First(tableConnectCABuiltin, "id", id) if err != nil { return 0, nil, fmt.Errorf("failed built-in CA state lookup: %s", err) } @@ -394,7 +395,7 @@ func (s *Store) CASetProviderState(idx uint64, state *structs.CAConsulProviderSt defer tx.Abort() // Check for an existing config - existing, err := tx.First(caBuiltinProviderTableName, "id", state.ID) + existing, err := tx.First(tableConnectCABuiltin, "id", state.ID) if err != nil { return false, fmt.Errorf("failed built-in CA state lookup: %s", err) } @@ -407,12 +408,12 @@ func (s *Store) CASetProviderState(idx uint64, state *structs.CAConsulProviderSt } state.ModifyIndex = idx - if err := tx.Insert(caBuiltinProviderTableName, state); err != nil { + if err := tx.Insert(tableConnectCABuiltin, state); err != nil { return false, fmt.Errorf("failed updating built-in CA state: %s", err) } // Update the index - if err := tx.Insert("index", &IndexEntry{caBuiltinProviderTableName, idx}); err != nil { + if err := tx.Insert("index", &IndexEntry{tableConnectCABuiltin, idx}); err != nil { return false, fmt.Errorf("failed updating index: %s", err) } @@ -427,7 +428,7 @@ func (s *Store) CADeleteProviderState(idx uint64, id string) error { defer tx.Abort() // Check for an existing config - existing, err := tx.First(caBuiltinProviderTableName, "id", id) + existing, err := tx.First(tableConnectCABuiltin, "id", id) if err != nil { return fmt.Errorf("failed built-in CA state lookup: %s", err) } @@ -438,10 +439,10 @@ func (s *Store) CADeleteProviderState(idx uint64, id string) error { providerState := existing.(*structs.CAConsulProviderState) // Do the delete and update the index - if err := tx.Delete(caBuiltinProviderTableName, providerState); err != nil { + if err := tx.Delete(tableConnectCABuiltin, providerState); err != nil { return err } - if err := tx.Insert("index", &IndexEntry{caBuiltinProviderTableName, idx}); err != nil { + if err := tx.Insert("index", &IndexEntry{tableConnectCABuiltin, idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -452,7 +453,7 @@ func (s *Store) CALeafSetIndex(idx uint64, index uint64) error { tx := s.db.WriteTxn(idx) defer tx.Abort() - return indexUpdateMaxTxn(tx, index, caLeafIndexName) + return indexUpdateMaxTxn(tx, index, tableConnectCALeafCerts) } func (s *Store) CARootsAndConfig(ws memdb.WatchSet) (uint64, structs.CARoots, *structs.CAConfiguration, error) { @@ -481,7 +482,7 @@ func (s *Store) CAIncrementProviderSerialNumber(idx uint64) (uint64, error) { tx := s.db.WriteTxn(idx) defer tx.Abort() - existing, err := tx.First("index", "id", caBuiltinProviderSerialNumber) + existing, err := tx.First("index", "id", tableConnectCABuiltinSerial) if err != nil { return 0, fmt.Errorf("failed built-in CA serial number lookup: %s", err) } @@ -492,11 +493,11 @@ func (s *Store) CAIncrementProviderSerialNumber(idx uint64) (uint64, error) { } else { // Serials used to be based on the raft indexes in the provider table, // so bootstrap off of that. - last = maxIndexTxn(tx, caBuiltinProviderTableName) + last = maxIndexTxn(tx, tableConnectCABuiltin) } next := last + 1 - if err := tx.Insert("index", &IndexEntry{caBuiltinProviderSerialNumber, next}); err != nil { + if err := tx.Insert("index", &IndexEntry{tableConnectCABuiltinSerial, next}); err != nil { return 0, fmt.Errorf("failed updating index: %s", err) } diff --git a/agent/consul/state/connect_ca_test.go b/agent/consul/state/connect_ca_test.go index df14b49de1..c3509f404a 100644 --- a/agent/consul/state/connect_ca_test.go +++ b/agent/consul/state/connect_ca_test.go @@ -5,11 +5,12 @@ import ( "testing" "time" - "github.com/hashicorp/consul/agent/connect" - "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/go-memdb" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/agent/connect" + "github.com/hashicorp/consul/agent/structs" ) func TestStore_CAConfig(t *testing.T) { @@ -201,7 +202,7 @@ func TestStore_CARootSetList(t *testing.T) { assert.True(ok) // Make sure the index got updated. - assert.Equal(s.maxIndex(caRootTableName), uint64(1)) + assert.Equal(s.maxIndex(tableConnectCARoots), uint64(1)) assert.True(watchFired(ws), "watch fired") // Read it back out and verify it. @@ -239,7 +240,7 @@ func TestStore_CARootSet_emptyID(t *testing.T) { assert.False(ok) // Make sure the index got updated. - assert.Equal(s.maxIndex(caRootTableName), uint64(0)) + assert.Equal(s.maxIndex(tableConnectCARoots), uint64(0)) assert.False(watchFired(ws), "watch fired") // Read it back out and verify it. diff --git a/agent/consul/state/federation_state.go b/agent/consul/state/federation_state.go index 6854267d66..5c93500730 100644 --- a/agent/consul/state/federation_state.go +++ b/agent/consul/state/federation_state.go @@ -3,18 +3,19 @@ package state import ( "fmt" - "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/structs" ) -const federationStateTableName = "federation-states" +const tableFederationStates = "federation-states" func federationStateTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: federationStateTableName, + Name: tableFederationStates, Indexes: map[string]*memdb.IndexSchema{ - "id": { - Name: "id", + indexID: { + Name: indexID, AllowMissing: false, Unique: true, Indexer: &memdb.StringFieldIndex{ @@ -32,7 +33,7 @@ func init() { // FederationStates is used to pull all the federation states for the snapshot. func (s *Snapshot) FederationStates() ([]*structs.FederationState, error) { - configs, err := s.tx.Get(federationStateTableName, "id") + configs, err := s.tx.Get(tableFederationStates, "id") if err != nil { return nil, err } @@ -48,10 +49,10 @@ func (s *Snapshot) FederationStates() ([]*structs.FederationState, error) { // FederationState is used when restoring from a snapshot. func (s *Restore) FederationState(g *structs.FederationState) error { // Insert - if err := s.tx.Insert(federationStateTableName, g); err != nil { + if err := s.tx.Insert(tableFederationStates, g); err != nil { return fmt.Errorf("failed restoring federation state object: %s", err) } - if err := indexUpdateMaxTxn(s.tx, g.ModifyIndex, federationStateTableName); err != nil { + if err := indexUpdateMaxTxn(s.tx, g.ModifyIndex, tableFederationStates); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -91,7 +92,7 @@ func federationStateSetTxn(tx *txn, idx uint64, config *structs.FederationState) // Check for existing. var existing *structs.FederationState - existingRaw, err := tx.First(federationStateTableName, "id", config.Datacenter) + existingRaw, err := tx.First(tableFederationStates, "id", config.Datacenter) if err != nil { return fmt.Errorf("failed federation state lookup: %s", err) } @@ -117,10 +118,10 @@ func federationStateSetTxn(tx *txn, idx uint64, config *structs.FederationState) } // Insert the federation state and update the index - if err := tx.Insert(federationStateTableName, config); err != nil { + if err := tx.Insert(tableFederationStates, config); err != nil { return fmt.Errorf("failed inserting federation state: %s", err) } - if err := tx.Insert("index", &IndexEntry{federationStateTableName, idx}); err != nil { + if err := tx.Insert("index", &IndexEntry{tableFederationStates, idx}); err != nil { return fmt.Errorf("failed updating index: %v", err) } @@ -136,10 +137,10 @@ func (s *Store) FederationStateGet(ws memdb.WatchSet, datacenter string) (uint64 func federationStateGetTxn(tx ReadTxn, ws memdb.WatchSet, datacenter string) (uint64, *structs.FederationState, error) { // Get the index - idx := maxIndexTxn(tx, federationStateTableName) + idx := maxIndexTxn(tx, tableFederationStates) // Get the existing contents. - watchCh, existing, err := tx.FirstWatch(federationStateTableName, "id", datacenter) + watchCh, existing, err := tx.FirstWatch(tableFederationStates, "id", datacenter) if err != nil { return 0, nil, fmt.Errorf("failed federation state lookup: %s", err) } @@ -166,9 +167,9 @@ func (s *Store) FederationStateList(ws memdb.WatchSet) (uint64, []*structs.Feder func federationStateListTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []*structs.FederationState, error) { // Get the index - idx := maxIndexTxn(tx, federationStateTableName) + idx := maxIndexTxn(tx, tableFederationStates) - iter, err := tx.Get(federationStateTableName, "id") + iter, err := tx.Get(tableFederationStates, "id") if err != nil { return 0, nil, fmt.Errorf("failed federation state lookup: %s", err) } @@ -207,7 +208,7 @@ func (s *Store) FederationStateBatchDelete(idx uint64, datacenters []string) err func federationStateDeleteTxn(tx *txn, idx uint64, datacenter string) error { // Try to retrieve the existing federation state. - existing, err := tx.First(federationStateTableName, "id", datacenter) + existing, err := tx.First(tableFederationStates, "id", datacenter) if err != nil { return fmt.Errorf("failed federation state lookup: %s", err) } @@ -216,10 +217,10 @@ func federationStateDeleteTxn(tx *txn, idx uint64, datacenter string) error { } // Delete the federation state from the DB and update the index. - if err := tx.Delete(federationStateTableName, existing); err != nil { + if err := tx.Delete(tableFederationStates, existing); err != nil { return fmt.Errorf("failed removing federation state: %s", err) } - if err := tx.Insert("index", &IndexEntry{federationStateTableName, idx}); err != nil { + if err := tx.Insert("index", &IndexEntry{tableFederationStates, idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } return nil diff --git a/agent/consul/state/intention.go b/agent/consul/state/intention.go index 9cee39acf1..2a7f2c275f 100644 --- a/agent/consul/state/intention.go +++ b/agent/consul/state/intention.go @@ -12,18 +12,16 @@ import ( "github.com/hashicorp/consul/agent/structs" ) -const ( - intentionsTableName = "connect-intentions" -) +const tableConnectIntentions = "connect-intentions" // intentionsTableSchema returns a new table schema used for storing // intentions for Connect. func intentionsTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: intentionsTableName, + Name: tableConnectIntentions, Indexes: map[string]*memdb.IndexSchema{ - "id": { - Name: "id", + indexID: { + Name: indexID, AllowMissing: false, Unique: true, Indexer: &memdb.UUIDFieldIndex{ @@ -106,7 +104,7 @@ func init() { // Deprecated: service-intentions config entries are handled as config entries // in the snapshot. func (s *Snapshot) LegacyIntentions() (structs.Intentions, error) { - ixns, err := s.tx.Get(intentionsTableName, "id") + ixns, err := s.tx.Get(tableConnectIntentions, "id") if err != nil { return nil, err } @@ -125,10 +123,10 @@ func (s *Snapshot) LegacyIntentions() (structs.Intentions, error) { // in the snapshot. func (s *Restore) LegacyIntention(ixn *structs.Intention) error { // Insert the intention - if err := s.tx.Insert(intentionsTableName, ixn); err != nil { + if err := s.tx.Insert(tableConnectIntentions, ixn); err != nil { return fmt.Errorf("failed restoring intention: %s", err) } - if err := indexUpdateMaxTxn(s.tx, ixn.ModifyIndex, intentionsTableName); err != nil { + if err := indexUpdateMaxTxn(s.tx, ixn.ModifyIndex, tableConnectIntentions); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -181,7 +179,7 @@ func (s *Store) Intentions(ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) ( func (s *Store) legacyIntentionsListTxn(tx ReadTxn, ws memdb.WatchSet, entMeta *structs.EnterpriseMeta) (uint64, structs.Intentions, bool, error) { // Get the index - idx := maxIndexTxn(tx, intentionsTableName) + idx := maxIndexTxn(tx, tableConnectIntentions) if idx < 1 { idx = 1 } @@ -530,7 +528,7 @@ func legacyIntentionSetTxn(tx WriteTxn, idx uint64, ixn *structs.Intention) erro ixn.UpdatePrecedence() // Check for an existing intention - existing, err := tx.First(intentionsTableName, "id", ixn.ID) + existing, err := tx.First(tableConnectIntentions, "id", ixn.ID) if err != nil { return fmt.Errorf("failed intention lookup: %s", err) } @@ -544,7 +542,7 @@ func legacyIntentionSetTxn(tx WriteTxn, idx uint64, ixn *structs.Intention) erro ixn.ModifyIndex = idx // Check for duplicates on the 4-tuple. - duplicate, err := tx.First(intentionsTableName, "source_destination", + duplicate, err := tx.First(tableConnectIntentions, "source_destination", ixn.SourceNS, ixn.SourceName, ixn.DestinationNS, ixn.DestinationName) if err != nil { return fmt.Errorf("failed intention lookup: %s", err) @@ -564,10 +562,10 @@ func legacyIntentionSetTxn(tx WriteTxn, idx uint64, ixn *structs.Intention) erro } // Insert - if err := tx.Insert(intentionsTableName, ixn); err != nil { + if err := tx.Insert(tableConnectIntentions, ixn); err != nil { return err } - if err := tx.Insert("index", &IndexEntry{intentionsTableName, idx}); err != nil { + if err := tx.Insert("index", &IndexEntry{tableConnectIntentions, idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -592,13 +590,13 @@ func (s *Store) IntentionGet(ws memdb.WatchSet, id string) (uint64, *structs.Ser func (s *Store) legacyIntentionGetTxn(tx ReadTxn, ws memdb.WatchSet, id string) (uint64, *structs.Intention, error) { // Get the table index. - idx := maxIndexTxn(tx, intentionsTableName) + idx := maxIndexTxn(tx, tableConnectIntentions) if idx < 1 { idx = 1 } // Look up by its ID. - watchCh, intention, err := tx.FirstWatch(intentionsTableName, "id", id) + watchCh, intention, err := tx.FirstWatch(tableConnectIntentions, "id", id) if err != nil { return 0, nil, fmt.Errorf("failed intention lookup: %s", err) } @@ -635,13 +633,13 @@ func (s *Store) legacyIntentionGetExactTxn(tx ReadTxn, ws memdb.WatchSet, args * } // Get the table index. - idx := maxIndexTxn(tx, intentionsTableName) + idx := maxIndexTxn(tx, tableConnectIntentions) if idx < 1 { idx = 1 } // Look up by its full name. - watchCh, intention, err := tx.FirstWatch(intentionsTableName, "source_destination", + watchCh, intention, err := tx.FirstWatch(tableConnectIntentions, "source_destination", args.SourceNS, args.SourceName, args.DestinationNS, args.DestinationName) if err != nil { return 0, nil, fmt.Errorf("failed intention lookup: %s", err) @@ -683,7 +681,7 @@ func (s *Store) LegacyIntentionDelete(idx uint64, id string) error { // with the proper indexes into the state store. func legacyIntentionDeleteTxn(tx WriteTxn, idx uint64, queryID string) error { // Pull the query. - wrapped, err := tx.First(intentionsTableName, "id", queryID) + wrapped, err := tx.First(tableConnectIntentions, "id", queryID) if err != nil { return fmt.Errorf("failed intention lookup: %s", err) } @@ -692,10 +690,10 @@ func legacyIntentionDeleteTxn(tx WriteTxn, idx uint64, queryID string) error { } // Delete the query and update the index. - if err := tx.Delete(intentionsTableName, wrapped); err != nil { + if err := tx.Delete(tableConnectIntentions, wrapped); err != nil { return fmt.Errorf("failed intention delete: %s", err) } - if err := tx.Insert("index", &IndexEntry{intentionsTableName, idx}); err != nil { + if err := tx.Insert("index", &IndexEntry{tableConnectIntentions, idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -709,10 +707,10 @@ func (s *Store) LegacyIntentionDeleteAll(idx uint64) error { defer tx.Abort() // Delete the table and update the index. - if _, err := tx.DeleteAll(intentionsTableName, "id"); err != nil { + if _, err := tx.DeleteAll(tableConnectIntentions, "id"); err != nil { return fmt.Errorf("failed intention delete-all: %s", err) } - if err := tx.Insert("index", &IndexEntry{intentionsTableName, idx}); err != nil { + if err := tx.Insert("index", &IndexEntry{tableConnectIntentions, idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } // Also bump the index for the config entry table so that @@ -822,7 +820,7 @@ func (s *Store) IntentionMatch(ws memdb.WatchSet, args *structs.IntentionQueryMa func (s *Store) legacyIntentionMatchTxn(tx ReadTxn, ws memdb.WatchSet, args *structs.IntentionQueryMatch) (uint64, []structs.Intentions, error) { // Get the table index. - idx := maxIndexTxn(tx, intentionsTableName) + idx := maxIndexTxn(tx, tableConnectIntentions) if idx < 1 { idx = 1 } @@ -876,7 +874,7 @@ func legacyIntentionMatchOneTxn( matchType structs.IntentionMatchType, ) (uint64, structs.Intentions, error) { // Get the table index. - idx := maxIndexTxn(tx, intentionsTableName) + idx := maxIndexTxn(tx, tableConnectIntentions) if idx < 1 { idx = 1 } @@ -907,7 +905,7 @@ func intentionMatchOneTxn(tx ReadTxn, ws memdb.WatchSet, // Perform each call and accumulate the result. var result structs.Intentions for _, params := range getParams { - iter, err := tx.Get(intentionsTableName, string(matchType), params...) + iter, err := tx.Get(tableConnectIntentions, string(matchType), params...) if err != nil { return nil, fmt.Errorf("failed intention lookup: %s", err) } diff --git a/agent/consul/state/intention_oss.go b/agent/consul/state/intention_oss.go index dcdadfa437..a06949d427 100644 --- a/agent/consul/state/intention_oss.go +++ b/agent/consul/state/intention_oss.go @@ -3,11 +3,12 @@ package state import ( - "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/structs" ) func intentionListTxn(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) { // Get all intentions - return tx.Get(intentionsTableName, "id") + return tx.Get(tableConnectIntentions, "id") } diff --git a/agent/consul/state/intention_test.go b/agent/consul/state/intention_test.go index ac40f4c91c..7da6c9a178 100644 --- a/agent/consul/state/intention_test.go +++ b/agent/consul/state/intention_test.go @@ -90,7 +90,7 @@ func TestStore_IntentionSetGet_basic(t *testing.T) { require.NoError(t, s.LegacyIntentionSet(lastIndex, legacyIxn)) // Make sure the right index got updated. - require.Equal(t, lastIndex, s.maxIndex(intentionsTableName)) + require.Equal(t, lastIndex, s.maxIndex(tableConnectIntentions)) require.Equal(t, uint64(0), s.maxIndex(tableConfigEntries)) expected = &structs.Intention{ @@ -133,7 +133,7 @@ func TestStore_IntentionSetGet_basic(t *testing.T) { // Make sure the config entry index got updated instead of the old intentions one require.Equal(t, lastIndex, s.maxIndex(tableConfigEntries)) - require.Equal(t, uint64(0), s.maxIndex(intentionsTableName)) + require.Equal(t, uint64(0), s.maxIndex(tableConnectIntentions)) expected = &structs.Intention{ ID: srcID, @@ -178,7 +178,7 @@ func TestStore_IntentionSetGet_basic(t *testing.T) { require.NoError(t, s.LegacyIntentionSet(lastIndex, legacyIxn)) // Make sure the index got updated. - require.Equal(t, lastIndex, s.maxIndex(intentionsTableName)) + require.Equal(t, lastIndex, s.maxIndex(tableConnectIntentions)) require.Equal(t, uint64(0), s.maxIndex(tableConfigEntries)) expected.SourceNS = legacyIxn.SourceNS @@ -203,7 +203,7 @@ func TestStore_IntentionSetGet_basic(t *testing.T) { // Make sure the config entry index got updated instead of the old intentions one require.Equal(t, lastIndex, s.maxIndex(tableConfigEntries)) - require.Equal(t, uint64(0), s.maxIndex(intentionsTableName)) + require.Equal(t, uint64(0), s.maxIndex(tableConnectIntentions)) expected.Description = configEntry.Sources[0].Description expected.Action = structs.IntentionActionDeny @@ -240,7 +240,7 @@ func TestStore_IntentionSetGet_basic(t *testing.T) { require.Error(t, s.LegacyIntentionSet(lastIndex, legacyIxn)) // Make sure the index did NOT get updated. - require.Equal(t, lastIndex-1, s.maxIndex(intentionsTableName)) + require.Equal(t, lastIndex-1, s.maxIndex(tableConnectIntentions)) require.Equal(t, uint64(0), s.maxIndex(tableConfigEntries)) require.False(t, watchFired(ws), "watch not fired") } @@ -815,7 +815,7 @@ func TestStore_LegacyIntentionSet_emptyId(t *testing.T) { require.Contains(t, err.Error(), ErrMissingIntentionID.Error()) // Index is not updated if nothing is saved. - require.Equal(t, s.maxIndex(intentionsTableName), uint64(0)) + require.Equal(t, s.maxIndex(tableConnectIntentions), uint64(0)) require.Equal(t, uint64(0), s.maxIndex(tableConfigEntries)) require.False(t, watchFired(ws), "watch fired") @@ -1005,7 +1005,7 @@ func TestStore_IntentionDelete(t *testing.T) { require.NoError(t, s.LegacyIntentionSet(lastIndex, ixn)) // Make sure the index got updated. - require.Equal(t, s.maxIndex(intentionsTableName), lastIndex) + require.Equal(t, s.maxIndex(tableConnectIntentions), lastIndex) require.Equal(t, uint64(0), s.maxIndex(tableConfigEntries)) } else { conf := &structs.ServiceIntentionsConfigEntry{ @@ -1029,7 +1029,7 @@ func TestStore_IntentionDelete(t *testing.T) { // Make sure the index got updated. require.Equal(t, s.maxIndex(tableConfigEntries), lastIndex) - require.Equal(t, uint64(0), s.maxIndex(intentionsTableName)) + require.Equal(t, uint64(0), s.maxIndex(tableConnectIntentions)) } require.True(t, watchFired(ws), "watch fired") @@ -1045,7 +1045,7 @@ func TestStore_IntentionDelete(t *testing.T) { require.NoError(t, s.LegacyIntentionDelete(lastIndex, id)) // Make sure the index got updated. - require.Equal(t, s.maxIndex(intentionsTableName), lastIndex) + require.Equal(t, s.maxIndex(tableConnectIntentions), lastIndex) require.Equal(t, uint64(0), s.maxIndex(tableConfigEntries)) } else { lastIndex++ @@ -1053,7 +1053,7 @@ func TestStore_IntentionDelete(t *testing.T) { // Make sure the index got updated. require.Equal(t, s.maxIndex(tableConfigEntries), lastIndex) - require.Equal(t, uint64(0), s.maxIndex(intentionsTableName)) + require.Equal(t, uint64(0), s.maxIndex(tableConnectIntentions)) } require.True(t, watchFired(ws), "watch fired") diff --git a/agent/consul/state/system_metadata.go b/agent/consul/state/system_metadata.go index 52f9f43a3a..0655758a60 100644 --- a/agent/consul/state/system_metadata.go +++ b/agent/consul/state/system_metadata.go @@ -3,18 +3,19 @@ package state import ( "fmt" - "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/structs" ) -const systemMetadataTableName = "system-metadata" +const tableSystemMetadata = "system-metadata" func systemMetadataTableSchema() *memdb.TableSchema { return &memdb.TableSchema{ - Name: systemMetadataTableName, + Name: tableSystemMetadata, Indexes: map[string]*memdb.IndexSchema{ - "id": { - Name: "id", + indexID: { + Name: indexID, AllowMissing: false, Unique: true, Indexer: &memdb.StringFieldIndex{ @@ -31,7 +32,7 @@ func init() { // SystemMetadataEntries used to pull all the system metadata entries for the snapshot. func (s *Snapshot) SystemMetadataEntries() ([]*structs.SystemMetadataEntry, error) { - entries, err := s.tx.Get(systemMetadataTableName, "id") + entries, err := s.tx.Get(tableSystemMetadata, "id") if err != nil { return nil, err } @@ -47,10 +48,10 @@ func (s *Snapshot) SystemMetadataEntries() ([]*structs.SystemMetadataEntry, erro // SystemMetadataEntry is used when restoring from a snapshot. func (s *Restore) SystemMetadataEntry(entry *structs.SystemMetadataEntry) error { // Insert - if err := s.tx.Insert(systemMetadataTableName, entry); err != nil { + if err := s.tx.Insert(tableSystemMetadata, entry); err != nil { return fmt.Errorf("failed restoring system metadata object: %s", err) } - if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, systemMetadataTableName); err != nil { + if err := indexUpdateMaxTxn(s.tx, entry.ModifyIndex, tableSystemMetadata); err != nil { return fmt.Errorf("failed updating index: %s", err) } @@ -78,7 +79,7 @@ func systemMetadataSetTxn(tx *txn, idx uint64, entry *structs.SystemMetadataEntr // Check for existing. var existing *structs.SystemMetadataEntry - existingRaw, err := tx.First(systemMetadataTableName, "id", entry.Key) + existingRaw, err := tx.First(tableSystemMetadata, "id", entry.Key) if err != nil { return fmt.Errorf("failed system metadata lookup: %s", err) } @@ -97,10 +98,10 @@ func systemMetadataSetTxn(tx *txn, idx uint64, entry *structs.SystemMetadataEntr } // Insert the system metadata and update the index - if err := tx.Insert(systemMetadataTableName, entry); err != nil { + if err := tx.Insert(tableSystemMetadata, entry); err != nil { return fmt.Errorf("failed inserting system metadata: %s", err) } - if err := tx.Insert("index", &IndexEntry{systemMetadataTableName, idx}); err != nil { + if err := tx.Insert("index", &IndexEntry{tableSystemMetadata, idx}); err != nil { return fmt.Errorf("failed updating index: %v", err) } @@ -116,10 +117,10 @@ func (s *Store) SystemMetadataGet(ws memdb.WatchSet, key string) (uint64, *struc func systemMetadataGetTxn(tx ReadTxn, ws memdb.WatchSet, key string) (uint64, *structs.SystemMetadataEntry, error) { // Get the index - idx := maxIndexTxn(tx, systemMetadataTableName) + idx := maxIndexTxn(tx, tableSystemMetadata) // Get the existing contents. - watchCh, existing, err := tx.FirstWatch(systemMetadataTableName, "id", key) + watchCh, existing, err := tx.FirstWatch(tableSystemMetadata, "id", key) if err != nil { return 0, nil, fmt.Errorf("failed system metadata lookup: %s", err) } @@ -146,9 +147,9 @@ func (s *Store) SystemMetadataList(ws memdb.WatchSet) (uint64, []*structs.System func systemMetadataListTxn(tx ReadTxn, ws memdb.WatchSet) (uint64, []*structs.SystemMetadataEntry, error) { // Get the index - idx := maxIndexTxn(tx, systemMetadataTableName) + idx := maxIndexTxn(tx, tableSystemMetadata) - iter, err := tx.Get(systemMetadataTableName, "id") + iter, err := tx.Get(tableSystemMetadata, "id") if err != nil { return 0, nil, fmt.Errorf("failed system metadata lookup: %s", err) } @@ -174,7 +175,7 @@ func (s *Store) SystemMetadataDelete(idx uint64, entry *structs.SystemMetadataEn func systemMetadataDeleteTxn(tx *txn, idx uint64, key string) error { // Try to retrieve the existing system metadata. - existing, err := tx.First(systemMetadataTableName, "id", key) + existing, err := tx.First(tableSystemMetadata, "id", key) if err != nil { return fmt.Errorf("failed system metadata lookup: %s", err) } @@ -183,10 +184,10 @@ func systemMetadataDeleteTxn(tx *txn, idx uint64, key string) error { } // Delete the system metadata from the DB and update the index. - if err := tx.Delete(systemMetadataTableName, existing); err != nil { + if err := tx.Delete(tableSystemMetadata, existing); err != nil { return fmt.Errorf("failed removing system metadata: %s", err) } - if err := tx.Insert("index", &IndexEntry{systemMetadataTableName, idx}); err != nil { + if err := tx.Insert("index", &IndexEntry{tableSystemMetadata, idx}); err != nil { return fmt.Errorf("failed updating index: %s", err) } return nil diff --git a/agent/consul/state/usage.go b/agent/consul/state/usage.go index bde67d127d..459de23fc4 100644 --- a/agent/consul/state/usage.go +++ b/agent/consul/state/usage.go @@ -3,8 +3,9 @@ package state import ( "fmt" - "github.com/hashicorp/consul/agent/structs" memdb "github.com/hashicorp/go-memdb" + + "github.com/hashicorp/consul/agent/structs" ) const ( @@ -101,7 +102,7 @@ func updateUsage(tx WriteTxn, changes Changes) error { // This will happen when restoring from a snapshot, just take the max index // of the tables we are tracking. if idx == 0 { - idx = maxIndexTxn(tx, "nodes", servicesTableName) + idx = maxIndexTxn(tx, "nodes", tableServices) } return writeUsageDeltas(tx, idx, usageDeltas) @@ -110,7 +111,7 @@ func updateUsage(tx WriteTxn, changes Changes) error { func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) { serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges)) for svc, delta := range serviceNameChanges { - serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.Name, &svc.EnterpriseMeta) + serviceIter, err := getWithTxn(tx, tableServices, "service", svc.Name, &svc.EnterpriseMeta) if err != nil { return nil, err } @@ -226,7 +227,7 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) { tx := s.db.ReadTxn() defer tx.Abort() - serviceInstances, err := firstUsageEntry(tx, servicesTableName) + serviceInstances, err := firstUsageEntry(tx, tableServices) if err != nil { return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err) }