mirror of https://github.com/status-im/consul.git
agent: move watch plans into agent
This commit is contained in:
parent
3835788d6a
commit
902f4caff2
|
@ -311,6 +311,11 @@ func (a *Agent) Start() error {
|
|||
a.httpServers = append(a.httpServers, srv)
|
||||
}
|
||||
|
||||
// register watches
|
||||
if err := a.registerWatches(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// start retry join
|
||||
go a.retryJoin()
|
||||
go a.retryJoinWan()
|
||||
|
@ -486,6 +491,34 @@ func (a *Agent) serveHTTP(l net.Listener, srv *HTTPServer) error {
|
|||
}
|
||||
}
|
||||
|
||||
func (a *Agent) registerWatches() error {
|
||||
if len(a.config.WatchPlans) == 0 {
|
||||
return nil
|
||||
}
|
||||
addrs, err := a.config.HTTPAddrs()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if len(addrs) == 0 {
|
||||
return fmt.Errorf("watch plans require an HTTP or HTTPS endpoint")
|
||||
}
|
||||
|
||||
for _, wp := range a.config.WatchPlans {
|
||||
go func(wp *watch.Plan) {
|
||||
wp.Handler = makeWatchHandler(a.LogOutput, wp.Exempt["handler"])
|
||||
wp.LogOutput = a.LogOutput
|
||||
addr := addrs[0].String()
|
||||
if addrs[0].Net == "unix" {
|
||||
addr = "unix://" + addr
|
||||
}
|
||||
if err := wp.Run(addr); err != nil {
|
||||
a.logger.Println("[ERR] Failed to run watch: %v", err)
|
||||
}
|
||||
}(wp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// consulConfig is used to return a consul configuration
|
||||
func (a *Agent) consulConfig() (*consul.Config, error) {
|
||||
// Start with the provided config or default config
|
||||
|
|
|
@ -5,7 +5,6 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
|
@ -390,8 +389,9 @@ func (cmd *Command) readConfig() *Config {
|
|||
}
|
||||
|
||||
// Get the handler
|
||||
if err := verifyWatchHandler(wp.Exempt["handler"]); err != nil {
|
||||
cmd.UI.Error(fmt.Sprintf("Failed to setup watch handler (%#v): %v", params, err))
|
||||
h := wp.Exempt["handler"]
|
||||
if _, ok := h.(string); h == nil || !ok {
|
||||
cmd.UI.Error("Watch handler must be a string")
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -632,39 +632,6 @@ func startupTelemetry(config *Config) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) registerWatches(config *Config) error {
|
||||
var err error
|
||||
|
||||
var httpAddr net.Addr
|
||||
if config.Ports.HTTP != -1 {
|
||||
httpAddr, err = config.ClientListener(config.Addresses.HTTP, config.Ports.HTTP)
|
||||
} else if config.Ports.HTTPS != -1 {
|
||||
httpAddr, err = config.ClientListener(config.Addresses.HTTPS, config.Ports.HTTPS)
|
||||
} else if len(config.WatchPlans) > 0 {
|
||||
return fmt.Errorf("Error: cannot use watches if both HTTP and HTTPS are disabled")
|
||||
}
|
||||
if err != nil {
|
||||
cmd.UI.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err))
|
||||
}
|
||||
|
||||
// Register the watches
|
||||
for _, wp := range config.WatchPlans {
|
||||
go func(wp *watch.Plan) {
|
||||
wp.Handler = makeWatchHandler(cmd.logOutput, wp.Exempt["handler"])
|
||||
wp.LogOutput = cmd.logOutput
|
||||
addr := httpAddr.String()
|
||||
// If it's a unix socket, prefix with unix:// so the client initializes correctly
|
||||
if httpAddr.Network() == "unix" {
|
||||
addr = "unix://" + addr
|
||||
}
|
||||
if err := wp.Run(addr); err != nil {
|
||||
cmd.UI.Error(fmt.Sprintf("Error running watch: %v", err))
|
||||
}
|
||||
}(wp)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (cmd *Command) Run(args []string) int {
|
||||
code := cmd.run(args)
|
||||
if cmd.logger != nil {
|
||||
|
@ -737,11 +704,6 @@ func (cmd *Command) run(args []string) int {
|
|||
return 1
|
||||
}
|
||||
|
||||
if err := cmd.registerWatches(config); err != nil {
|
||||
cmd.UI.Error(err.Error())
|
||||
return 1
|
||||
}
|
||||
|
||||
// Let the agent know we've finished registration
|
||||
agent.StartSync()
|
||||
|
||||
|
|
|
@ -20,18 +20,6 @@ const (
|
|||
WatchBufSize = 4 * 1024 // 4KB
|
||||
)
|
||||
|
||||
// verifyWatchHandler does the pre-check for our handler configuration
|
||||
func verifyWatchHandler(params interface{}) error {
|
||||
if params == nil {
|
||||
return fmt.Errorf("Must provide watch handler")
|
||||
}
|
||||
_, ok := params.(string)
|
||||
if !ok {
|
||||
return fmt.Errorf("Watch handler must be a string")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// makeWatchHandler returns a handler for the given watch
|
||||
func makeWatchHandler(logOutput io.Writer, params interface{}) watch.HandlerFunc {
|
||||
script := params.(string)
|
||||
|
|
|
@ -6,22 +6,6 @@ import (
|
|||
"testing"
|
||||
)
|
||||
|
||||
func TestVerifyWatchHandler(t *testing.T) {
|
||||
t.Parallel()
|
||||
if err := verifyWatchHandler(nil); err == nil {
|
||||
t.Fatalf("should err")
|
||||
}
|
||||
if err := verifyWatchHandler(123); err == nil {
|
||||
t.Fatalf("should err")
|
||||
}
|
||||
if err := verifyWatchHandler([]string{"foo"}); err == nil {
|
||||
t.Fatalf("should err")
|
||||
}
|
||||
if err := verifyWatchHandler("foo"); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestMakeWatchHandler(t *testing.T) {
|
||||
t.Parallel()
|
||||
defer os.Remove("handler_out")
|
||||
|
|
Loading…
Reference in New Issue