diff --git a/command/agent/event_endpoint.go b/command/agent/event_endpoint.go index d0ba9ed68c..b0078b5b07 100644 --- a/command/agent/event_endpoint.go +++ b/command/agent/event_endpoint.go @@ -24,6 +24,10 @@ func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (int return nil, nil } + // Get the datacenter + var dc string + s.parseDC(req, &dc) + event := &UserEvent{} event.Name = strings.TrimPrefix(req.URL.Path, "/v1/event/fire/") if event.Name == "" { @@ -53,7 +57,7 @@ func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (int } // Try to fire the event - if err := s.agent.UserEvent(event); err != nil { + if err := s.agent.UserEvent(dc, event); err != nil { return nil, err } diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go index 6c64a44159..acdb1464a1 100644 --- a/command/agent/event_endpoint_test.go +++ b/command/agent/event_endpoint_test.go @@ -54,7 +54,7 @@ func TestEventFire(t *testing.T) { func TestEventList(t *testing.T) { httpTest(t, func(srv *HTTPServer) { p := &UserEvent{Name: "test"} - if err := srv.agent.UserEvent(p); err != nil { + if err := srv.agent.UserEvent("", p); err != nil { t.Fatalf("err: %v", err) } @@ -90,7 +90,7 @@ func TestEventList(t *testing.T) { func TestEventList_Blocking(t *testing.T) { httpTest(t, func(srv *HTTPServer) { p := &UserEvent{Name: "test"} - if err := srv.agent.UserEvent(p); err != nil { + if err := srv.agent.UserEvent("", p); err != nil { t.Fatalf("err: %v", err) } @@ -118,7 +118,7 @@ func TestEventList_Blocking(t *testing.T) { go func() { time.Sleep(50 * time.Millisecond) p := &UserEvent{Name: "second"} - if err := srv.agent.UserEvent(p); err != nil { + if err := srv.agent.UserEvent("", p); err != nil { t.Fatalf("err: %v", err) } }() diff --git a/command/agent/user_event.go b/command/agent/user_event.go index 6004a1bda9..be1037d51d 100644 --- a/command/agent/user_event.go +++ b/command/agent/user_event.go @@ -5,6 +5,7 @@ import ( "fmt" "regexp" + "github.com/hashicorp/consul/consul/structs" "github.com/ugorji/go/codec" ) @@ -69,7 +70,7 @@ func validateUserEventParams(params *UserEvent) error { } // UserEvent is used to fire an event via the Serf layer on the LAN -func (a *Agent) UserEvent(params *UserEvent) error { +func (a *Agent) UserEvent(dc string, params *UserEvent) error { // Validate the params if err := validateUserEventParams(params); err != nil { return err @@ -82,10 +83,27 @@ func (a *Agent) UserEvent(params *UserEvent) error { if err != nil { return fmt.Errorf("UserEvent encoding failed: %v", err) } - if a.server != nil { - return a.server.UserEvent(params.Name, payload) + + // Check if this is the local DC, fire locally + if dc == "" || dc == a.config.Datacenter { + if a.server != nil { + return a.server.UserEvent(params.Name, payload) + } else { + return a.client.UserEvent(params.Name, payload) + } } else { - return a.client.UserEvent(params.Name, payload) + // Send an RPC to remote datacenter to service this + args := structs.EventFireRequest{ + Datacenter: dc, + Name: params.Name, + Payload: payload, + } + + // Any server can process in the remote DC, since the + // gossip will take over anyways + args.AllowStale = true + var out structs.EventFireResponse + return a.RPC("Internal.EventFire", &args, &out) } } diff --git a/command/agent/user_event_test.go b/command/agent/user_event_test.go index 76a3e4bf62..3c2fcbe34e 100644 --- a/command/agent/user_event_test.go +++ b/command/agent/user_event_test.go @@ -162,13 +162,13 @@ func TestFireReceiveEvent(t *testing.T) { agent.state.AddService(srv1) p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"} - err := agent.UserEvent(p1) + err := agent.UserEvent("", p1) if err != nil { t.Fatalf("err: %v", err) } p2 := &UserEvent{Name: "deploy"} - err = agent.UserEvent(p2) + err = agent.UserEvent("", p2) if err != nil { t.Fatalf("err: %v", err) } diff --git a/consul/internal_endpoint.go b/consul/internal_endpoint.go index 53252e78ee..5a38b31a22 100644 --- a/consul/internal_endpoint.go +++ b/consul/internal_endpoint.go @@ -46,3 +46,19 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest, return nil }) } + +// EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC +// call to fire an event. The primary use case is to enable user events being +// triggered in a remote DC. +func (m *Internal) EventFire(args *structs.EventFireRequest, + reply *structs.EventFireResponse) error { + if done, err := m.srv.forward("Internal.EventFire", args, args, reply); done { + return err + } + + // Set the query meta data + m.srv.setQueryMeta(&reply.QueryMeta) + + // Fire the event + return m.srv.UserEvent(args.Name, args.Payload) +} diff --git a/consul/structs/structs.go b/consul/structs/structs.go index 95f273f4fc..910a1d3e05 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -493,6 +493,29 @@ type ACLPolicy struct { QueryMeta } +// EventFireRequest is used to ask a server to fire +// a Serf event. It is a bit odd, since it doesn't depend on +// the catalog or leader. Any node can respond, so it's not quite +// like a standard write request. This is used only internally. +type EventFireRequest struct { + Datacenter string + Name string + Payload []byte + + // Not using WriteRequest so that any server can process + // the request. It is a bit unusual... + QueryOptions +} + +func (r *EventFireRequest) RequestDatacenter() string { + return r.Datacenter +} + +// EventFireResponse is used to respond to a fire request. +type EventFireResponse struct { + QueryMeta +} + // msgpackHandle is a shared handle for encoding/decoding of structs var msgpackHandle = &codec.MsgpackHandle{}