diff --git a/command/agent/agent.go b/command/agent/agent.go index d2a9d3e97b..08964d3257 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -47,6 +47,9 @@ type Agent struct { checkTTLs map[string]*CheckTTL checkLock sync.Mutex + // eventCh is used to receive user events + eventCh chan serf.UserEvent + shutdown bool shutdownCh chan struct{} shutdownLock sync.Mutex @@ -89,6 +92,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { logOutput: logOutput, checkMonitors: make(map[string]*CheckMonitor), checkTTLs: make(map[string]*CheckTTL), + eventCh: make(chan serf.UserEvent, 1024), shutdownCh: make(chan struct{}), } @@ -108,6 +112,9 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { return nil, err } + // Start handling events + go agent.handleEvents() + // Write out the PID file if necessary err = agent.storePid() if err != nil { @@ -219,6 +226,14 @@ func (a *Agent) consulConfig() *consul.Config { // Setup the ServerUp callback base.ServerUp = a.state.ConsulServerUp + // Setup the user event callback + base.UserEventHandler = func(e serf.UserEvent) { + select { + case a.eventCh <- e: + case <-a.shutdownCh: + } + } + // Setup the loggers base.LogOutput = a.logOutput return base @@ -372,15 +387,6 @@ func (a *Agent) WANMembers() []serf.Member { } } -// UserEvent is used to fire an event via the Serf layer on the LAN -func (a *Agent) UserEvent(name string, payload []byte) error { - if a.server != nil { - return a.server.UserEvent(name, payload) - } else { - return a.client.UserEvent(name, payload) - } -} - // StartSync is called once Services and Checks are registered. // This is called to prevent a race between clients and the anti-entropy routines func (a *Agent) StartSync() { diff --git a/command/agent/user_event.go b/command/agent/user_event.go new file mode 100644 index 0000000000..501b58d4ed --- /dev/null +++ b/command/agent/user_event.go @@ -0,0 +1,208 @@ +package agent + +import ( + "bytes" + "fmt" + "regexp" + + "github.com/ugorji/go/codec" +) + +const ( + // userEventMaxVersion is the maximum protocol version we understand + userEventMaxVersion = 1 +) + +// UserEventParam is used to parameterize a user event +type UserEventParam struct { + // Name of the event + Name string + + // Optional payload + Payload []byte + + // NodeFilter is a regular expression to filter on nodes + NodeFilter string + + // ServiceFilter is a regular expression to filter on services + ServiceFilter string + + // TagFilter is a regular expression to filter on tags of a service, + // must be provided with ServiceFilter + TagFilter string +} + +// userEventEnc is the encoded version +type userEventEnc struct { + Version int `codec:"v"` + ID string + Name string `codec:"n"` + Payload []byte `codec:"p,omitempty"` + NodeFilter string `codec:"nf,omitempty"` + ServiceFilter string `codec:"sf,omitempty"` + TagFilter string `codec:"tf,omitempty"` +} + +// validateUserEventParams is used to sanity check the inputs +func validateUserEventParams(params *UserEventParam) error { + // Validate the inputs + if params.Name == "" { + return fmt.Errorf("User event missing name") + } + if params.TagFilter != "" && params.ServiceFilter == "" { + return fmt.Errorf("Cannot provide tag filter without service filter") + } + if params.NodeFilter != "" { + if _, err := regexp.Compile(params.NodeFilter); err != nil { + return fmt.Errorf("Invalid node filter: %v", err) + } + } + if params.ServiceFilter != "" { + if _, err := regexp.Compile(params.ServiceFilter); err != nil { + return fmt.Errorf("Invalid service filter: %v", err) + } + } + if params.TagFilter != "" { + if _, err := regexp.Compile(params.TagFilter); err != nil { + return fmt.Errorf("Invalid tag filter: %v", err) + } + } + return nil +} + +// UserEvent is used to fire an event via the Serf layer on the LAN +func (a *Agent) UserEvent(params *UserEventParam) error { + // Validate the params + if err := validateUserEventParams(params); err != nil { + return err + } + + // Format message + msg := userEventEnc{ + Version: userEventMaxVersion, + ID: generateUUID(), + Name: params.Name, + Payload: params.Payload, + NodeFilter: params.NodeFilter, + ServiceFilter: params.ServiceFilter, + TagFilter: params.TagFilter, + } + payload, err := encodeUserEvent(&msg) + if err != nil { + return fmt.Errorf("UserEvent encoding failed: %v", err) + } + if a.server != nil { + return a.server.UserEvent(params.Name, payload) + } else { + return a.client.UserEvent(params.Name, payload) + } +} + +// handleEvents is used to process incoming user events +func (a *Agent) handleEvents() { + for { + select { + case e := <-a.eventCh: + // Decode the event + msg := new(userEventEnc) + if err := decodeUserEvent(e.Payload, msg); err != nil { + a.logger.Printf("[ERR] agent: Failed to decode event: %v", err) + continue + } + + // Skip if we don't pass filtering + if !a.shouldProcessUserEvent(msg) { + continue + } + + // TODO: Process event + + case <-a.shutdownCh: + return + } + } +} + +// shouldProcessUserEvent checks if an event makes it through our filters +func (a *Agent) shouldProcessUserEvent(msg *userEventEnc) bool { + // Check the version + if msg.Version > userEventMaxVersion { + a.logger.Printf("[WARN] agent: Event version %d may have unsupported features (%s)", + msg.Version, msg.Name) + } + + // Apply the filters + if msg.NodeFilter != "" { + re, err := regexp.Compile(msg.NodeFilter) + if err != nil { + a.logger.Printf("[ERR] agent: Failed to parse node filter '%s' for event '%s': %v", + msg.NodeFilter, msg.Name, err) + return false + } + if !re.MatchString(a.config.NodeName) { + return false + } + } + + if msg.ServiceFilter != "" { + re, err := regexp.Compile(msg.ServiceFilter) + if err != nil { + a.logger.Printf("[ERR] agent: Failed to parse service filter '%s' for event '%s': %v", + msg.ServiceFilter, msg.Name, err) + return false + } + + var tagRe *regexp.Regexp + if msg.TagFilter != "" { + re, err := regexp.Compile(msg.TagFilter) + if err != nil { + a.logger.Printf("[ERR] agent: Failed to parse tag filter '%s' for event '%s': %v", + msg.TagFilter, msg.Name, err) + return false + } + tagRe = re + } + + // Scan for a match + services := a.state.Services() + found := false + OUTER: + for name, info := range services { + // Check the service name + if !re.MatchString(name) { + continue + } + if tagRe == nil { + found = true + break + } + + // Look for a matching tag + for _, tag := range info.Tags { + if !tagRe.MatchString(tag) { + continue + } + found = true + break OUTER + } + } + + // No matching services + if !found { + return false + } + } + return true +} + +// Decode is used to decode a MsgPack encoded object +func decodeUserEvent(buf []byte, out interface{}) error { + return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out) +} + +// encodeUserEvent is used to encode user event +func encodeUserEvent(msg interface{}) ([]byte, error) { + var buf bytes.Buffer + err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg) + return buf.Bytes(), err +} diff --git a/command/agent/util.go b/command/agent/util.go index e2fdf44abd..b753505b50 100644 --- a/command/agent/util.go +++ b/command/agent/util.go @@ -1,6 +1,8 @@ package agent import ( + crand "crypto/rand" + "fmt" "math" "math/rand" "os" @@ -59,3 +61,18 @@ func ExecScript(script string) (*exec.Cmd, error) { cmd := exec.Command(shell, flag, script) return cmd, nil } + +// generateUUID is used to generate a random UUID +func generateUUID() string { + buf := make([]byte, 16) + if _, err := crand.Read(buf); err != nil { + panic(fmt.Errorf("failed to read random bytes: %v", err)) + } + + return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x", + buf[0:4], + buf[4:6], + buf[6:8], + buf[8:10], + buf[10:16]) +}