mirror of
https://github.com/status-im/consul.git
synced 2025-01-11 06:16:08 +00:00
state: rename table name constants to new pattern
Using Apps Hungarian Notation for these constants makes the memdb queries more readable.
This commit is contained in:
parent
5b4703f0e4
commit
0c34e474c5
@ -18,15 +18,9 @@ import (
|
|||||||
"github.com/hashicorp/consul/types"
|
"github.com/hashicorp/consul/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
// indexServiceExtinction keeps track of the last raft index when the last instance
|
||||||
servicesTableName = "services"
|
|
||||||
gatewayServicesTableName = "gateway-services"
|
|
||||||
topologyTableName = "mesh-topology"
|
|
||||||
|
|
||||||
// serviceLastExtinctionIndexName keeps track of the last raft index when the last instance
|
|
||||||
// of any service was unregistered. This is used by blocking queries on missing services.
|
// of any service was unregistered. This is used by blocking queries on missing services.
|
||||||
serviceLastExtinctionIndexName = "service_last_extinction"
|
const indexServiceExtinction = "service_last_extinction"
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// minUUIDLookupLen is used as a minimum length of a node name required before
|
// minUUIDLookupLen is used as a minimum length of a node name required before
|
||||||
@ -2087,7 +2081,7 @@ func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *stru
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
idx := maxIndexTxn(tx, gatewayServicesTableName)
|
idx := maxIndexTxn(tx, tableGatewayServices)
|
||||||
|
|
||||||
return lib.MaxUint64(maxIdx, idx), results, nil
|
return lib.MaxUint64(maxIdx, idx), results, nil
|
||||||
}
|
}
|
||||||
@ -2361,7 +2355,7 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en
|
|||||||
// Delete all associated with gateway first, to avoid keeping mappings that were removed
|
// Delete all associated with gateway first, to avoid keeping mappings that were removed
|
||||||
sn := structs.NewServiceName(conf.GetName(), entMeta)
|
sn := structs.NewServiceName(conf.GetName(), entMeta)
|
||||||
|
|
||||||
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", sn); err != nil {
|
if _, err := tx.DeleteAll(tableGatewayServices, "gateway", sn); err != nil {
|
||||||
return fmt.Errorf("failed to truncate gateway services table: %v", err)
|
return fmt.Errorf("failed to truncate gateway services table: %v", err)
|
||||||
}
|
}
|
||||||
if err := truncateGatewayServiceTopologyMappings(tx, idx, sn, conf.GetKind()); err != nil {
|
if err := truncateGatewayServiceTopologyMappings(tx, idx, sn, conf.GetKind()); err != nil {
|
||||||
@ -2389,7 +2383,7 @@ func updateGatewayServices(tx WriteTxn, idx uint64, conf structs.ConfigEntry, en
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil {
|
||||||
return fmt.Errorf("failed updating gateway-services index: %v", err)
|
return fmt.Errorf("failed updating gateway-services index: %v", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -2499,7 +2493,7 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
existing, err := tx.First(gatewayServicesTableName, "id", service.Gateway, sn.CompoundServiceName(), service.Port)
|
existing, err := tx.First(tableGatewayServices, "id", service.Gateway, sn.CompoundServiceName(), service.Port)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("gateway service lookup failed: %s", err)
|
return fmt.Errorf("gateway service lookup failed: %s", err)
|
||||||
}
|
}
|
||||||
@ -2534,7 +2528,7 @@ func updateGatewayNamespace(tx WriteTxn, idx uint64, service *structs.GatewaySer
|
|||||||
func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error {
|
func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayService) error {
|
||||||
// Check if mapping already exists in table if it's already in the table
|
// Check if mapping already exists in table if it's already in the table
|
||||||
// Avoid insert if nothing changed
|
// Avoid insert if nothing changed
|
||||||
existing, err := tx.First(gatewayServicesTableName, "id", mapping.Gateway, mapping.Service, mapping.Port)
|
existing, err := tx.First(tableGatewayServices, "id", mapping.Gateway, mapping.Service, mapping.Port)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("gateway service lookup failed: %s", err)
|
return fmt.Errorf("gateway service lookup failed: %s", err)
|
||||||
}
|
}
|
||||||
@ -2549,11 +2543,11 @@ func updateGatewayService(tx WriteTxn, idx uint64, mapping *structs.GatewayServi
|
|||||||
}
|
}
|
||||||
mapping.ModifyIndex = idx
|
mapping.ModifyIndex = idx
|
||||||
|
|
||||||
if err := tx.Insert(gatewayServicesTableName, mapping); err != nil {
|
if err := tx.Insert(tableGatewayServices, mapping); err != nil {
|
||||||
return fmt.Errorf("failed inserting gateway service mapping: %s", err)
|
return fmt.Errorf("failed inserting gateway service mapping: %s", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil {
|
||||||
return fmt.Errorf("failed updating gateway-services index: %v", err)
|
return fmt.Errorf("failed updating gateway-services index: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2613,10 +2607,10 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
|
|||||||
// Otherwise the service was specified in the config entry, and the association should be maintained
|
// Otherwise the service was specified in the config entry, and the association should be maintained
|
||||||
// for when the service is re-registered
|
// for when the service is re-registered
|
||||||
if m.FromWildcard {
|
if m.FromWildcard {
|
||||||
if err := tx.Delete(gatewayServicesTableName, m); err != nil {
|
if err := tx.Delete(tableGatewayServices, m); err != nil {
|
||||||
return fmt.Errorf("failed to truncate gateway services table: %v", err)
|
return fmt.Errorf("failed to truncate gateway services table: %v", err)
|
||||||
}
|
}
|
||||||
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil {
|
||||||
return fmt.Errorf("failed updating gateway-services index: %v", err)
|
return fmt.Errorf("failed updating gateway-services index: %v", err)
|
||||||
}
|
}
|
||||||
if err := deleteGatewayServiceTopologyMapping(tx, idx, m); err != nil {
|
if err := deleteGatewayServiceTopologyMapping(tx, idx, m); err != nil {
|
||||||
@ -2630,18 +2624,18 @@ func cleanupGatewayWildcards(tx WriteTxn, idx uint64, svc *structs.ServiceNode)
|
|||||||
// serviceGateways returns all GatewayService entries with the given service name. This effectively looks up
|
// serviceGateways returns all GatewayService entries with the given service name. This effectively looks up
|
||||||
// all the gateways mapped to this service.
|
// all the gateways mapped to this service.
|
||||||
func serviceGateways(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func serviceGateways(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
return tx.Get(gatewayServicesTableName, "service", structs.NewServiceName(name, entMeta))
|
return tx.Get(tableGatewayServices, "service", structs.NewServiceName(name, entMeta))
|
||||||
}
|
}
|
||||||
|
|
||||||
func gatewayServices(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func gatewayServices(tx ReadTxn, name string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
return tx.Get(gatewayServicesTableName, "gateway", structs.NewServiceName(name, entMeta))
|
return tx.Get(tableGatewayServices, "gateway", structs.NewServiceName(name, entMeta))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayServices, error) {
|
func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayServices, error) {
|
||||||
tx := s.db.ReadTxn()
|
tx := s.db.ReadTxn()
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
iter, err := tx.Get(gatewayServicesTableName, "id")
|
iter, err := tx.Get(tableGatewayServices, "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed to dump gateway-services: %s", err)
|
return 0, nil, fmt.Errorf("failed to dump gateway-services: %s", err)
|
||||||
}
|
}
|
||||||
@ -2651,7 +2645,7 @@ func (s *Store) DumpGatewayServices(ws memdb.WatchSet) (uint64, structs.GatewayS
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
idx := maxIndexTxn(tx, gatewayServicesTableName)
|
idx := maxIndexTxn(tx, tableGatewayServices)
|
||||||
|
|
||||||
return lib.MaxUint64(maxIdx, idx), results, nil
|
return lib.MaxUint64(maxIdx, idx), results, nil
|
||||||
}
|
}
|
||||||
@ -2968,9 +2962,9 @@ func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.Se
|
|||||||
index = "upstream"
|
index = "upstream"
|
||||||
}
|
}
|
||||||
|
|
||||||
iter, err := tx.Get(topologyTableName, index, service)
|
iter, err := tx.Get(tableMeshTopology, index, service)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
|
return 0, nil, fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
ws.Add(iter.WatchCh())
|
ws.Add(iter.WatchCh())
|
||||||
|
|
||||||
@ -2993,7 +2987,7 @@ func linkedFromRegistrationTxn(tx ReadTxn, ws memdb.WatchSet, service structs.Se
|
|||||||
|
|
||||||
// TODO (freddy) This needs a tombstone to avoid the index sliding back on mapping deletion
|
// TODO (freddy) This needs a tombstone to avoid the index sliding back on mapping deletion
|
||||||
// Using the table index here means that blocking queries will wake up more often than they should
|
// Using the table index here means that blocking queries will wake up more often than they should
|
||||||
tableIdx := maxIndexTxn(tx, topologyTableName)
|
tableIdx := maxIndexTxn(tx, tableMeshTopology)
|
||||||
if tableIdx > idx {
|
if tableIdx > idx {
|
||||||
idx = tableIdx
|
idx = tableIdx
|
||||||
}
|
}
|
||||||
@ -3024,9 +3018,9 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS
|
|||||||
upstreamMeta := structs.EnterpriseMetaInitializer(u.DestinationNamespace)
|
upstreamMeta := structs.EnterpriseMetaInitializer(u.DestinationNamespace)
|
||||||
upstream := structs.NewServiceName(u.DestinationName, &upstreamMeta)
|
upstream := structs.NewServiceName(u.DestinationName, &upstreamMeta)
|
||||||
|
|
||||||
obj, err := tx.First(topologyTableName, "id", upstream, downstream)
|
obj, err := tx.First(tableMeshTopology, "id", upstream, downstream)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
|
return fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
sid := svc.CompoundServiceID()
|
sid := svc.CompoundServiceID()
|
||||||
uid := structs.UniqueID(node, sid.String())
|
uid := structs.UniqueID(node, sid.String())
|
||||||
@ -3057,22 +3051,22 @@ func updateMeshTopology(tx WriteTxn, idx uint64, node string, svc *structs.NodeS
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := tx.Insert(topologyTableName, mapping); err != nil {
|
if err := tx.Insert(tableMeshTopology, mapping); err != nil {
|
||||||
return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err)
|
return fmt.Errorf("failed inserting %s mapping: %s", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
|
||||||
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
|
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
inserted[upstream] = true
|
inserted[upstream] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
for u := range oldUpstreams {
|
for u := range oldUpstreams {
|
||||||
if !inserted[u] {
|
if !inserted[u] {
|
||||||
if _, err := tx.DeleteAll(topologyTableName, "id", u, downstream); err != nil {
|
if _, err := tx.DeleteAll(tableMeshTopology, "id", u, downstream); err != nil {
|
||||||
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
|
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
|
||||||
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
|
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -3090,9 +3084,9 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode)
|
|||||||
sid := service.CompoundServiceID()
|
sid := service.CompoundServiceID()
|
||||||
uid := structs.UniqueID(service.Node, sid.String())
|
uid := structs.UniqueID(service.Node, sid.String())
|
||||||
|
|
||||||
iter, err := tx.Get(topologyTableName, "downstream", sn)
|
iter, err := tx.Get(tableMeshTopology, "downstream", sn)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("%q lookup failed: %v", topologyTableName, err)
|
return fmt.Errorf("%q lookup failed: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
mappings := make([]*structs.UpstreamDownstream, 0)
|
mappings := make([]*structs.UpstreamDownstream, 0)
|
||||||
@ -3118,17 +3112,17 @@ func cleanupMeshTopology(tx WriteTxn, idx uint64, service *structs.ServiceNode)
|
|||||||
|
|
||||||
delete(copy.Refs, uid)
|
delete(copy.Refs, uid)
|
||||||
if len(copy.Refs) == 0 {
|
if len(copy.Refs) == 0 {
|
||||||
if err := tx.Delete(topologyTableName, m); err != nil {
|
if err := tx.Delete(tableMeshTopology, m); err != nil {
|
||||||
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
|
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
|
||||||
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
|
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
continue
|
continue
|
||||||
|
|
||||||
}
|
}
|
||||||
if err := tx.Insert(topologyTableName, copy); err != nil {
|
if err := tx.Insert(tableMeshTopology, copy); err != nil {
|
||||||
return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err)
|
return fmt.Errorf("failed inserting %s mapping: %s", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -3145,11 +3139,11 @@ func insertGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.Ga
|
|||||||
Downstream: gs.Gateway,
|
Downstream: gs.Gateway,
|
||||||
RaftIndex: gs.RaftIndex,
|
RaftIndex: gs.RaftIndex,
|
||||||
}
|
}
|
||||||
if err := tx.Insert(topologyTableName, &mapping); err != nil {
|
if err := tx.Insert(tableMeshTopology, &mapping); err != nil {
|
||||||
return fmt.Errorf("failed inserting %s mapping: %s", topologyTableName, err)
|
return fmt.Errorf("failed inserting %s mapping: %s", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
|
||||||
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
|
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -3161,11 +3155,11 @@ func deleteGatewayServiceTopologyMapping(tx WriteTxn, idx uint64, gs *structs.Ga
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := tx.DeleteAll(topologyTableName, "id", gs.Service, gs.Gateway); err != nil {
|
if _, err := tx.DeleteAll(tableMeshTopology, "id", gs.Service, gs.Gateway); err != nil {
|
||||||
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
|
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
|
||||||
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
|
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -3177,11 +3171,11 @@ func truncateGatewayServiceTopologyMappings(tx WriteTxn, idx uint64, gateway str
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if _, err := tx.DeleteAll(topologyTableName, "downstream", gateway); err != nil {
|
if _, err := tx.DeleteAll(tableMeshTopology, "downstream", gateway); err != nil {
|
||||||
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
|
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
|
||||||
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
|
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
@ -54,7 +54,7 @@ func catalogUpdateServiceIndexes(tx WriteTxn, serviceName string, idx uint64, _
|
|||||||
}
|
}
|
||||||
|
|
||||||
func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
|
func catalogUpdateServiceExtinctionIndex(tx WriteTxn, idx uint64, _ *structs.EnterpriseMeta) error {
|
||||||
if err := tx.Insert("index", &IndexEntry{serviceLastExtinctionIndexName, idx}); err != nil {
|
if err := tx.Insert("index", &IndexEntry{indexServiceExtinction, idx}); err != nil {
|
||||||
return fmt.Errorf("failed updating missing service extinction index: %s", err)
|
return fmt.Errorf("failed updating missing service extinction index: %s", err)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
@ -110,7 +110,7 @@ func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.En
|
|||||||
}
|
}
|
||||||
|
|
||||||
func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) {
|
func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) {
|
||||||
return tx.First("index", "id", serviceLastExtinctionIndexName)
|
return tx.First("index", "id", indexServiceExtinction)
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 {
|
func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 {
|
||||||
|
@ -14,6 +14,8 @@ const (
|
|||||||
tableNodes = "nodes"
|
tableNodes = "nodes"
|
||||||
tableServices = "services"
|
tableServices = "services"
|
||||||
tableChecks = "checks"
|
tableChecks = "checks"
|
||||||
|
tableGatewayServices = "gateway-services"
|
||||||
|
tableMeshTopology = "mesh-topology"
|
||||||
|
|
||||||
indexID = "id"
|
indexID = "id"
|
||||||
indexServiceName = "service"
|
indexServiceName = "service"
|
||||||
@ -205,11 +207,11 @@ func checksTableSchema() *memdb.TableSchema {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// gatewayServicesTableNameSchema returns a new table schema used to store information
|
// gatewayServicesTableSchema returns a new table schema used to store information
|
||||||
// about services associated with terminating gateways.
|
// about services associated with terminating gateways.
|
||||||
func gatewayServicesTableNameSchema() *memdb.TableSchema {
|
func gatewayServicesTableSchema() *memdb.TableSchema {
|
||||||
return &memdb.TableSchema{
|
return &memdb.TableSchema{
|
||||||
Name: gatewayServicesTableName,
|
Name: tableGatewayServices,
|
||||||
Indexes: map[string]*memdb.IndexSchema{
|
Indexes: map[string]*memdb.IndexSchema{
|
||||||
indexID: {
|
indexID: {
|
||||||
Name: indexID,
|
Name: indexID,
|
||||||
@ -249,11 +251,11 @@ func gatewayServicesTableNameSchema() *memdb.TableSchema {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// topologyTableNameSchema returns a new table schema used to store information
|
// meshTopologyTableSchema returns a new table schema used to store information
|
||||||
// relating upstream and downstream services
|
// relating upstream and downstream services
|
||||||
func topologyTableNameSchema() *memdb.TableSchema {
|
func meshTopologyTableSchema() *memdb.TableSchema {
|
||||||
return &memdb.TableSchema{
|
return &memdb.TableSchema{
|
||||||
Name: topologyTableName,
|
Name: tableMeshTopology,
|
||||||
Indexes: map[string]*memdb.IndexSchema{
|
Indexes: map[string]*memdb.IndexSchema{
|
||||||
indexID: {
|
indexID: {
|
||||||
Name: indexID,
|
Name: indexID,
|
||||||
@ -350,6 +352,6 @@ func init() {
|
|||||||
registerSchema(nodesTableSchema)
|
registerSchema(nodesTableSchema)
|
||||||
registerSchema(servicesTableSchema)
|
registerSchema(servicesTableSchema)
|
||||||
registerSchema(checksTableSchema)
|
registerSchema(checksTableSchema)
|
||||||
registerSchema(gatewayServicesTableNameSchema)
|
registerSchema(gatewayServicesTableSchema)
|
||||||
registerSchema(topologyTableNameSchema)
|
registerSchema(meshTopologyTableSchema)
|
||||||
}
|
}
|
||||||
|
@ -273,20 +273,20 @@ func deleteConfigEntryTxn(tx WriteTxn, idx uint64, kind, name string, entMeta *s
|
|||||||
sn := structs.NewServiceName(name, entMeta)
|
sn := structs.NewServiceName(name, entMeta)
|
||||||
|
|
||||||
if kind == structs.TerminatingGateway || kind == structs.IngressGateway {
|
if kind == structs.TerminatingGateway || kind == structs.IngressGateway {
|
||||||
if _, err := tx.DeleteAll(gatewayServicesTableName, "gateway", sn); err != nil {
|
if _, err := tx.DeleteAll(tableGatewayServices, "gateway", sn); err != nil {
|
||||||
return fmt.Errorf("failed to truncate gateway services table: %v", err)
|
return fmt.Errorf("failed to truncate gateway services table: %v", err)
|
||||||
}
|
}
|
||||||
if err := indexUpdateMaxTxn(tx, idx, gatewayServicesTableName); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableGatewayServices); err != nil {
|
||||||
return fmt.Errorf("failed updating gateway-services index: %v", err)
|
return fmt.Errorf("failed updating gateway-services index: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Also clean up associations in the mesh topology table for ingress gateways
|
// Also clean up associations in the mesh topology table for ingress gateways
|
||||||
if kind == structs.IngressGateway {
|
if kind == structs.IngressGateway {
|
||||||
if _, err := tx.DeleteAll(topologyTableName, "downstream", sn); err != nil {
|
if _, err := tx.DeleteAll(tableMeshTopology, "downstream", sn); err != nil {
|
||||||
return fmt.Errorf("failed to truncate %s table: %v", topologyTableName, err)
|
return fmt.Errorf("failed to truncate %s table: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
if err := indexUpdateMaxTxn(tx, idx, topologyTableName); err != nil {
|
if err := indexUpdateMaxTxn(tx, idx, tableMeshTopology); err != nil {
|
||||||
return fmt.Errorf("failed updating %s index: %v", topologyTableName, err)
|
return fmt.Errorf("failed updating %s index: %v", tableMeshTopology, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -3,8 +3,9 @@ package state
|
|||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
|
||||||
memdb "github.com/hashicorp/go-memdb"
|
memdb "github.com/hashicorp/go-memdb"
|
||||||
|
|
||||||
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@ -101,7 +102,7 @@ func updateUsage(tx WriteTxn, changes Changes) error {
|
|||||||
// This will happen when restoring from a snapshot, just take the max index
|
// This will happen when restoring from a snapshot, just take the max index
|
||||||
// of the tables we are tracking.
|
// of the tables we are tracking.
|
||||||
if idx == 0 {
|
if idx == 0 {
|
||||||
idx = maxIndexTxn(tx, "nodes", servicesTableName)
|
idx = maxIndexTxn(tx, "nodes", tableServices)
|
||||||
}
|
}
|
||||||
|
|
||||||
return writeUsageDeltas(tx, idx, usageDeltas)
|
return writeUsageDeltas(tx, idx, usageDeltas)
|
||||||
@ -110,7 +111,7 @@ func updateUsage(tx WriteTxn, changes Changes) error {
|
|||||||
func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) {
|
func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) {
|
||||||
serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges))
|
serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges))
|
||||||
for svc, delta := range serviceNameChanges {
|
for svc, delta := range serviceNameChanges {
|
||||||
serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.Name, &svc.EnterpriseMeta)
|
serviceIter, err := getWithTxn(tx, tableServices, "service", svc.Name, &svc.EnterpriseMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -226,7 +227,7 @@ func (s *Store) ServiceUsage() (uint64, ServiceUsage, error) {
|
|||||||
tx := s.db.ReadTxn()
|
tx := s.db.ReadTxn()
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
serviceInstances, err := firstUsageEntry(tx, servicesTableName)
|
serviceInstances, err := firstUsageEntry(tx, tableServices)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
return 0, ServiceUsage{}, fmt.Errorf("failed services lookup: %s", err)
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user