Add support for setting node metadata fields

This commit is contained in:
Kyle Havlovitz 2017-01-05 14:10:26 -08:00
parent 04f8e5adc0
commit 52d6fd831e
No known key found for this signature in database
GPG Key ID: 8A5E6B173056AD6C
12 changed files with 153 additions and 13 deletions

View File

@ -246,13 +246,16 @@ func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter,
return nil, err
}
// Load checks/services.
// Load checks/services/metadata.
if err := agent.loadServices(config); err != nil {
return nil, err
}
if err := agent.loadChecks(config); err != nil {
return nil, err
}
if err := agent.loadMetadata(config); err != nil {
return nil, err
}
// Start watching for critical services to deregister, based on their
// checks.
@ -1677,6 +1680,37 @@ func (a *Agent) restoreCheckState(snap map[types.CheckID]*structs.HealthCheck) {
}
}
// loadMetadata loads and validates node metadata fields from the config and
// updates them on the local agent.
func (a *Agent) loadMetadata(conf *Config) error {
a.state.Lock()
defer a.state.Unlock()
for key, value := range conf.Meta {
if strings.Contains(key, ":") {
return fmt.Errorf("Key name cannot contain ':' character: %s", key)
}
if strings.HasPrefix(key, "consul-") {
return fmt.Errorf("Key prefix 'consul-' is reserved for internal use")
}
a.state.metadata[key] = value
}
a.state.changeMade()
return nil
}
// unloadMetadata resets the local metadata state
func (a *Agent) unloadMetadata() error {
a.state.Lock()
defer a.state.Unlock()
a.state.metadata = make(map[string]string)
return nil
}
// serviceMaintCheckID returns the ID of a given service's maintenance check
func serviceMaintCheckID(serviceID string) types.CheckID {
return types.CheckID(structs.ServiceMaintPrefix + serviceID)

View File

@ -20,6 +20,7 @@ type AgentSelf struct {
Coord *coordinate.Coordinate
Member serf.Member
Stats map[string]map[string]string
Meta map[string]string
}
func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
@ -47,6 +48,7 @@ func (s *HTTPServer) AgentSelf(resp http.ResponseWriter, req *http.Request) (int
Coord: c,
Member: s.agent.LocalMember(),
Stats: s.agent.Stats(),
Meta: s.agent.state.Metadata(),
}, nil
}

View File

@ -67,6 +67,14 @@ func (s *HTTPServer) CatalogNodes(resp http.ResponseWriter, req *http.Request) (
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
return nil, nil
}
// Try to parse node metadata filter params
if filter, ok := req.URL.Query()["node-meta"]; ok && len(filter) > 0 {
pair := strings.SplitN(filter[0], ":", 2)
args.NodeMetaKey = pair[0]
if len(pair) == 2 {
args.NodeMetaValue = pair[1]
}
}
var out structs.IndexedNodes
defer setMeta(resp, &out.QueryMeta)

View File

@ -1071,7 +1071,7 @@ func (c *Command) handleReload(config *Config) (*Config, error) {
snap := c.agent.snapshotCheckState()
defer c.agent.restoreCheckState(snap)
// First unload all checks and services. This lets us begin the reload
// First unload all checks, services, and metadata. This lets us begin the reload
// with a clean slate.
if err := c.agent.unloadServices(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed unloading services: %s", err))
@ -1081,8 +1081,12 @@ func (c *Command) handleReload(config *Config) (*Config, error) {
errs = multierror.Append(errs, fmt.Errorf("Failed unloading checks: %s", err))
return nil, errs
}
if err := c.agent.unloadMetadata(); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed unloading metadata: %s", err))
return nil, errs
}
// Reload services and check definitions.
// Reload service/check definitions and metadata.
if err := c.agent.loadServices(newConf); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading services: %s", err))
return nil, errs
@ -1091,6 +1095,10 @@ func (c *Command) handleReload(config *Config) (*Config, error) {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading checks: %s", err))
return nil, errs
}
if err := c.agent.loadMetadata(newConf); err != nil {
errs = multierror.Append(errs, fmt.Errorf("Failed reloading metadata: %s", err))
return nil, errs
}
// Get the new client listener addr
httpAddr, err := newConf.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)

View File

@ -342,6 +342,9 @@ type Config struct {
// they are configured with TranslateWanAddrs set to true.
TaggedAddresses map[string]string
// Node metadata
Meta map[string]string `mapstructure:"node_meta" json:"-"`
// LeaveOnTerm controls if Serf does a graceful leave when receiving
// the TERM signal. Defaults true on clients, false on servers. This can
// be changed on reload.
@ -1577,6 +1580,14 @@ func MergeConfig(a, b *Config) *Config {
result.HTTPAPIResponseHeaders[field] = value
}
}
if len(b.Meta) != 0 {
if result.Meta == nil {
result.Meta = make(map[string]string)
}
for field, value := range b.Meta {
result.Meta[field] = value
}
}
// Copy the start join addresses
result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin))

View File

@ -281,6 +281,19 @@ func TestDecodeConfig(t *testing.T) {
t.Fatalf("bad: %#v", config)
}
// Node metadata fields
input = `{"node_meta": {"thing1": "1", "thing2": "2"}}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
if err != nil {
t.Fatalf("err: %s", err)
}
if v, ok := config.Meta["thing1"]; !ok || v != "1" {
t.Fatalf("bad: %#v", config)
}
if v, ok := config.Meta["thing2"]; !ok || v != "2" {
t.Fatalf("bad: %#v", config)
}
// leave_on_terminate
input = `{"leave_on_terminate": true}`
config, err = DecodeConfig(bytes.NewReader([]byte(input)))
@ -1519,6 +1532,9 @@ func TestMergeConfig(t *testing.T) {
DogStatsdAddr: "nope",
DogStatsdTags: []string{"nope"},
},
Meta: map[string]string{
"key": "value1",
},
}
b := &Config{
@ -1620,6 +1636,9 @@ func TestMergeConfig(t *testing.T) {
DogStatsdAddr: "127.0.0.1:7254",
DogStatsdTags: []string{"tag_1:val_1", "tag_2:val_2"},
},
Meta: map[string]string{
"key": "value2",
},
DisableUpdateCheck: true,
DisableAnonymousSignature: true,
HTTPAPIResponseHeaders: map[string]string{

View File

@ -61,6 +61,9 @@ type localState struct {
// Used to track checks that are being deferred
deferCheck map[types.CheckID]*time.Timer
// metadata tracks the local metadata fields
metadata map[string]string
// consulCh is used to inform of a change to the known
// consul nodes. This may be used to retry a sync run
consulCh chan struct{}
@ -82,6 +85,7 @@ func (l *localState) Init(config *Config, logger *log.Logger) {
l.checkTokens = make(map[types.CheckID]string)
l.checkCriticalTime = make(map[types.CheckID]time.Time)
l.deferCheck = make(map[types.CheckID]*time.Timer)
l.metadata = make(map[string]string)
l.consulCh = make(chan struct{}, 1)
l.triggerCh = make(chan struct{}, 1)
}
@ -339,6 +343,19 @@ func (l *localState) CriticalChecks() map[types.CheckID]CriticalCheck {
return checks
}
// Metadata returns the local node metadata fields that the
// agent is aware of and are being kept in sync with the server
func (l *localState) Metadata() map[string]string {
metadata := make(map[string]string)
l.RLock()
defer l.RUnlock()
for key, value := range l.metadata {
metadata[key] = value
}
return metadata
}
// antiEntropy is a long running method used to perform anti-entropy
// between local and remote state.
func (l *localState) antiEntropy(shutdownCh chan struct{}) {
@ -412,10 +429,10 @@ func (l *localState) setSyncState() error {
l.Lock()
defer l.Unlock()
// Check the node info (currently limited to tagged addresses since
// everything else is managed by the Serf layer)
// Check the node info
if out1.NodeServices == nil || out1.NodeServices.Node == nil ||
!reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) {
!reflect.DeepEqual(out1.NodeServices.Node.TaggedAddresses, l.config.TaggedAddresses) ||
!reflect.DeepEqual(out1.NodeServices.Node.Meta, l.metadata) {
l.nodeInfoInSync = false
}
@ -619,6 +636,7 @@ func (l *localState) syncService(id string) error {
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
NodeMeta: l.metadata,
Service: l.services[id],
WriteRequest: structs.WriteRequest{Token: l.serviceToken(id)},
}
@ -680,6 +698,7 @@ func (l *localState) syncCheck(id types.CheckID) error {
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
NodeMeta: l.metadata,
Service: service,
Check: l.checks[id],
WriteRequest: structs.WriteRequest{Token: l.checkToken(id)},
@ -706,6 +725,7 @@ func (l *localState) syncNodeInfo() error {
Node: l.config.NodeName,
Address: l.config.AdvertiseAddr,
TaggedAddresses: l.config.TaggedAddresses,
NodeMeta: l.metadata,
WriteRequest: structs.WriteRequest{Token: l.config.GetTokenForAgent()},
}
var out struct{}

View File

@ -156,6 +156,14 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
return err
}
var metaFilter []interface{}
if args.NodeMetaKey != "" {
metaFilter = append(metaFilter, args.NodeMetaKey)
if args.NodeMetaValue != "" {
metaFilter = append(metaFilter, args.NodeMetaValue)
}
}
// Get the list of nodes.
state := c.srv.fsm.State()
return c.srv.blockingRPC(
@ -163,7 +171,7 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
&reply.QueryMeta,
state.GetQueryWatch("Nodes"),
func() error {
index, nodes, err := state.Nodes()
index, nodes, err := state.Nodes(metaFilter...)
if err != nil {
return err
}

View File

@ -78,6 +78,15 @@ func nodesTableSchema() *memdb.TableSchema {
Lowercase: true,
},
},
"meta": &memdb.IndexSchema{
Name: "meta",
AllowMissing: true,
Unique: false,
Indexer: &memdb.StringMapFieldIndex{
Field: "Meta",
Lowercase: false,
},
},
},
}
}

View File

@ -426,6 +426,7 @@ func (s *StateStore) ensureRegistrationTxn(tx *memdb.Txn, idx uint64, watches *D
Node: req.Node,
Address: req.Address,
TaggedAddresses: req.TaggedAddresses,
Meta: req.NodeMeta,
}
if err := s.ensureNodeTxn(tx, idx, watches, node); err != nil {
return fmt.Errorf("failed inserting node: %s", err)
@ -527,7 +528,7 @@ func (s *StateStore) GetNode(id string) (uint64, *structs.Node, error) {
}
// Nodes is used to return all of the known nodes.
func (s *StateStore) Nodes() (uint64, structs.Nodes, error) {
func (s *StateStore) Nodes(metaFilter ...interface{}) (uint64, structs.Nodes, error) {
tx := s.db.Txn(false)
defer tx.Abort()
@ -535,7 +536,13 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes, error) {
idx := maxIndexTxn(tx, s.getWatchTables("Nodes")...)
// Retrieve all of the nodes
nodes, err := tx.Get("nodes", "id")
var nodes memdb.ResultIterator
var err error
if len(metaFilter) > 0 {
nodes, err = tx.Get("nodes", "meta", metaFilter...)
} else {
nodes, err = tx.Get("nodes", "id")
}
if err != nil {
return 0, nil, fmt.Errorf("failed nodes lookup: %s", err)
}
@ -854,6 +861,7 @@ func (s *StateStore) parseServiceNodes(tx *memdb.Txn, services structs.ServiceNo
node := n.(*structs.Node)
s.Address = node.Address
s.TaggedAddresses = node.TaggedAddresses
s.NodeMeta = node.Meta
results = append(results, s)
}
@ -1392,6 +1400,7 @@ func (s *StateStore) parseNodes(tx *memdb.Txn, idx uint64,
Node: node.Node,
Address: node.Address,
TaggedAddresses: node.TaggedAddresses,
Meta: node.Meta,
}
// Query the node services

View File

@ -173,6 +173,7 @@ type RegisterRequest struct {
Node string
Address string
TaggedAddresses map[string]string
NodeMeta map[string]string
Service *NodeService
Check *HealthCheck
Checks HealthChecks
@ -195,7 +196,8 @@ func (r *RegisterRequest) ChangesNode(node *Node) bool {
// Check if any of the node-level fields are being changed.
if r.Node != node.Node ||
r.Address != node.Address ||
!reflect.DeepEqual(r.TaggedAddresses, node.TaggedAddresses) {
!reflect.DeepEqual(r.TaggedAddresses, node.TaggedAddresses) ||
!reflect.DeepEqual(r.NodeMeta, node.Meta) {
return true
}
@ -227,8 +229,10 @@ type QuerySource struct {
// DCSpecificRequest is used to query about a specific DC
type DCSpecificRequest struct {
Datacenter string
Source QuerySource
Datacenter string
NodeMetaKey string
NodeMetaValue string
Source QuerySource
QueryOptions
}
@ -278,6 +282,7 @@ type Node struct {
Node string
Address string
TaggedAddresses map[string]string
Meta map[string]string
RaftIndex
}
@ -296,6 +301,7 @@ type ServiceNode struct {
Node string
Address string
TaggedAddresses map[string]string
NodeMeta map[string]string
ServiceID string
ServiceName string
ServiceTags []string
@ -488,6 +494,7 @@ type NodeInfo struct {
Node string
Address string
TaggedAddresses map[string]string
Meta map[string]string
Services []*NodeService
Checks HealthChecks
}

View File

@ -151,6 +151,9 @@ func testServiceNode() *ServiceNode {
TaggedAddresses: map[string]string{
"hello": "world",
},
NodeMeta: map[string]string{
"tag": "value",
},
ServiceID: "service1",
ServiceName: "dogs",
ServiceTags: []string{"prod", "v1"},
@ -172,12 +175,13 @@ func TestStructs_ServiceNode_PartialClone(t *testing.T) {
// Make sure the parts that weren't supposed to be cloned didn't get
// copied over, then zero-value them out so we can do a DeepEqual() on
// the rest of the contents.
if clone.Address != "" || len(clone.TaggedAddresses) != 0 {
if clone.Address != "" || len(clone.TaggedAddresses) != 0 || len(clone.NodeMeta) != 0 {
t.Fatalf("bad: %v", clone)
}
sn.Address = ""
sn.TaggedAddresses = nil
sn.NodeMeta = nil
if !reflect.DeepEqual(sn, clone) {
t.Fatalf("bad: %v", clone)
}
@ -197,6 +201,7 @@ func TestStructs_ServiceNode_Conversions(t *testing.T) {
// them out before we do the compare.
sn.Address = ""
sn.TaggedAddresses = nil
sn.NodeMeta = nil
if !reflect.DeepEqual(sn, sn2) {
t.Fatalf("bad: %v", sn2)
}