mirror of https://github.com/status-im/consul.git
agent: working on user events
This commit is contained in:
parent
b5fb9d6dfb
commit
314743c111
|
@ -47,6 +47,9 @@ type Agent struct {
|
|||
checkTTLs map[string]*CheckTTL
|
||||
checkLock sync.Mutex
|
||||
|
||||
// eventCh is used to receive user events
|
||||
eventCh chan serf.UserEvent
|
||||
|
||||
shutdown bool
|
||||
shutdownCh chan struct{}
|
||||
shutdownLock sync.Mutex
|
||||
|
@ -89,6 +92,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
|
|||
logOutput: logOutput,
|
||||
checkMonitors: make(map[string]*CheckMonitor),
|
||||
checkTTLs: make(map[string]*CheckTTL),
|
||||
eventCh: make(chan serf.UserEvent, 1024),
|
||||
shutdownCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
|
@ -108,6 +112,9 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
// Start handling events
|
||||
go agent.handleEvents()
|
||||
|
||||
// Write out the PID file if necessary
|
||||
err = agent.storePid()
|
||||
if err != nil {
|
||||
|
@ -219,6 +226,14 @@ func (a *Agent) consulConfig() *consul.Config {
|
|||
// Setup the ServerUp callback
|
||||
base.ServerUp = a.state.ConsulServerUp
|
||||
|
||||
// Setup the user event callback
|
||||
base.UserEventHandler = func(e serf.UserEvent) {
|
||||
select {
|
||||
case a.eventCh <- e:
|
||||
case <-a.shutdownCh:
|
||||
}
|
||||
}
|
||||
|
||||
// Setup the loggers
|
||||
base.LogOutput = a.logOutput
|
||||
return base
|
||||
|
@ -372,15 +387,6 @@ func (a *Agent) WANMembers() []serf.Member {
|
|||
}
|
||||
}
|
||||
|
||||
// UserEvent is used to fire an event via the Serf layer on the LAN
|
||||
func (a *Agent) UserEvent(name string, payload []byte) error {
|
||||
if a.server != nil {
|
||||
return a.server.UserEvent(name, payload)
|
||||
} else {
|
||||
return a.client.UserEvent(name, payload)
|
||||
}
|
||||
}
|
||||
|
||||
// StartSync is called once Services and Checks are registered.
|
||||
// This is called to prevent a race between clients and the anti-entropy routines
|
||||
func (a *Agent) StartSync() {
|
||||
|
|
|
@ -0,0 +1,208 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"fmt"
|
||||
"regexp"
|
||||
|
||||
"github.com/ugorji/go/codec"
|
||||
)
|
||||
|
||||
const (
|
||||
// userEventMaxVersion is the maximum protocol version we understand
|
||||
userEventMaxVersion = 1
|
||||
)
|
||||
|
||||
// UserEventParam is used to parameterize a user event
|
||||
type UserEventParam struct {
|
||||
// Name of the event
|
||||
Name string
|
||||
|
||||
// Optional payload
|
||||
Payload []byte
|
||||
|
||||
// NodeFilter is a regular expression to filter on nodes
|
||||
NodeFilter string
|
||||
|
||||
// ServiceFilter is a regular expression to filter on services
|
||||
ServiceFilter string
|
||||
|
||||
// TagFilter is a regular expression to filter on tags of a service,
|
||||
// must be provided with ServiceFilter
|
||||
TagFilter string
|
||||
}
|
||||
|
||||
// userEventEnc is the encoded version
|
||||
type userEventEnc struct {
|
||||
Version int `codec:"v"`
|
||||
ID string
|
||||
Name string `codec:"n"`
|
||||
Payload []byte `codec:"p,omitempty"`
|
||||
NodeFilter string `codec:"nf,omitempty"`
|
||||
ServiceFilter string `codec:"sf,omitempty"`
|
||||
TagFilter string `codec:"tf,omitempty"`
|
||||
}
|
||||
|
||||
// validateUserEventParams is used to sanity check the inputs
|
||||
func validateUserEventParams(params *UserEventParam) error {
|
||||
// Validate the inputs
|
||||
if params.Name == "" {
|
||||
return fmt.Errorf("User event missing name")
|
||||
}
|
||||
if params.TagFilter != "" && params.ServiceFilter == "" {
|
||||
return fmt.Errorf("Cannot provide tag filter without service filter")
|
||||
}
|
||||
if params.NodeFilter != "" {
|
||||
if _, err := regexp.Compile(params.NodeFilter); err != nil {
|
||||
return fmt.Errorf("Invalid node filter: %v", err)
|
||||
}
|
||||
}
|
||||
if params.ServiceFilter != "" {
|
||||
if _, err := regexp.Compile(params.ServiceFilter); err != nil {
|
||||
return fmt.Errorf("Invalid service filter: %v", err)
|
||||
}
|
||||
}
|
||||
if params.TagFilter != "" {
|
||||
if _, err := regexp.Compile(params.TagFilter); err != nil {
|
||||
return fmt.Errorf("Invalid tag filter: %v", err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// UserEvent is used to fire an event via the Serf layer on the LAN
|
||||
func (a *Agent) UserEvent(params *UserEventParam) error {
|
||||
// Validate the params
|
||||
if err := validateUserEventParams(params); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Format message
|
||||
msg := userEventEnc{
|
||||
Version: userEventMaxVersion,
|
||||
ID: generateUUID(),
|
||||
Name: params.Name,
|
||||
Payload: params.Payload,
|
||||
NodeFilter: params.NodeFilter,
|
||||
ServiceFilter: params.ServiceFilter,
|
||||
TagFilter: params.TagFilter,
|
||||
}
|
||||
payload, err := encodeUserEvent(&msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("UserEvent encoding failed: %v", err)
|
||||
}
|
||||
if a.server != nil {
|
||||
return a.server.UserEvent(params.Name, payload)
|
||||
} else {
|
||||
return a.client.UserEvent(params.Name, payload)
|
||||
}
|
||||
}
|
||||
|
||||
// handleEvents is used to process incoming user events
|
||||
func (a *Agent) handleEvents() {
|
||||
for {
|
||||
select {
|
||||
case e := <-a.eventCh:
|
||||
// Decode the event
|
||||
msg := new(userEventEnc)
|
||||
if err := decodeUserEvent(e.Payload, msg); err != nil {
|
||||
a.logger.Printf("[ERR] agent: Failed to decode event: %v", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip if we don't pass filtering
|
||||
if !a.shouldProcessUserEvent(msg) {
|
||||
continue
|
||||
}
|
||||
|
||||
// TODO: Process event
|
||||
|
||||
case <-a.shutdownCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// shouldProcessUserEvent checks if an event makes it through our filters
|
||||
func (a *Agent) shouldProcessUserEvent(msg *userEventEnc) bool {
|
||||
// Check the version
|
||||
if msg.Version > userEventMaxVersion {
|
||||
a.logger.Printf("[WARN] agent: Event version %d may have unsupported features (%s)",
|
||||
msg.Version, msg.Name)
|
||||
}
|
||||
|
||||
// Apply the filters
|
||||
if msg.NodeFilter != "" {
|
||||
re, err := regexp.Compile(msg.NodeFilter)
|
||||
if err != nil {
|
||||
a.logger.Printf("[ERR] agent: Failed to parse node filter '%s' for event '%s': %v",
|
||||
msg.NodeFilter, msg.Name, err)
|
||||
return false
|
||||
}
|
||||
if !re.MatchString(a.config.NodeName) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
if msg.ServiceFilter != "" {
|
||||
re, err := regexp.Compile(msg.ServiceFilter)
|
||||
if err != nil {
|
||||
a.logger.Printf("[ERR] agent: Failed to parse service filter '%s' for event '%s': %v",
|
||||
msg.ServiceFilter, msg.Name, err)
|
||||
return false
|
||||
}
|
||||
|
||||
var tagRe *regexp.Regexp
|
||||
if msg.TagFilter != "" {
|
||||
re, err := regexp.Compile(msg.TagFilter)
|
||||
if err != nil {
|
||||
a.logger.Printf("[ERR] agent: Failed to parse tag filter '%s' for event '%s': %v",
|
||||
msg.TagFilter, msg.Name, err)
|
||||
return false
|
||||
}
|
||||
tagRe = re
|
||||
}
|
||||
|
||||
// Scan for a match
|
||||
services := a.state.Services()
|
||||
found := false
|
||||
OUTER:
|
||||
for name, info := range services {
|
||||
// Check the service name
|
||||
if !re.MatchString(name) {
|
||||
continue
|
||||
}
|
||||
if tagRe == nil {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
|
||||
// Look for a matching tag
|
||||
for _, tag := range info.Tags {
|
||||
if !tagRe.MatchString(tag) {
|
||||
continue
|
||||
}
|
||||
found = true
|
||||
break OUTER
|
||||
}
|
||||
}
|
||||
|
||||
// No matching services
|
||||
if !found {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Decode is used to decode a MsgPack encoded object
|
||||
func decodeUserEvent(buf []byte, out interface{}) error {
|
||||
return codec.NewDecoder(bytes.NewReader(buf), msgpackHandle).Decode(out)
|
||||
}
|
||||
|
||||
// encodeUserEvent is used to encode user event
|
||||
func encodeUserEvent(msg interface{}) ([]byte, error) {
|
||||
var buf bytes.Buffer
|
||||
err := codec.NewEncoder(&buf, msgpackHandle).Encode(msg)
|
||||
return buf.Bytes(), err
|
||||
}
|
|
@ -1,6 +1,8 @@
|
|||
package agent
|
||||
|
||||
import (
|
||||
crand "crypto/rand"
|
||||
"fmt"
|
||||
"math"
|
||||
"math/rand"
|
||||
"os"
|
||||
|
@ -59,3 +61,18 @@ func ExecScript(script string) (*exec.Cmd, error) {
|
|||
cmd := exec.Command(shell, flag, script)
|
||||
return cmd, nil
|
||||
}
|
||||
|
||||
// generateUUID is used to generate a random UUID
|
||||
func generateUUID() string {
|
||||
buf := make([]byte, 16)
|
||||
if _, err := crand.Read(buf); err != nil {
|
||||
panic(fmt.Errorf("failed to read random bytes: %v", err))
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%08x-%04x-%04x-%04x-%12x",
|
||||
buf[0:4],
|
||||
buf[4:6],
|
||||
buf[6:8],
|
||||
buf[8:10],
|
||||
buf[10:16])
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue