agent: First pass at multi-DC support

This commit is contained in:
Armon Dadgar 2014-08-28 15:00:49 -07:00
parent 337fe4085a
commit 602828472d
6 changed files with 71 additions and 10 deletions

View File

@ -24,6 +24,10 @@ func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (int
return nil, nil return nil, nil
} }
// Get the datacenter
var dc string
s.parseDC(req, &dc)
event := &UserEvent{} event := &UserEvent{}
event.Name = strings.TrimPrefix(req.URL.Path, "/v1/event/fire/") event.Name = strings.TrimPrefix(req.URL.Path, "/v1/event/fire/")
if event.Name == "" { if event.Name == "" {
@ -53,7 +57,7 @@ func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (int
} }
// Try to fire the event // Try to fire the event
if err := s.agent.UserEvent(event); err != nil { if err := s.agent.UserEvent(dc, event); err != nil {
return nil, err return nil, err
} }

View File

@ -54,7 +54,7 @@ func TestEventFire(t *testing.T) {
func TestEventList(t *testing.T) { func TestEventList(t *testing.T) {
httpTest(t, func(srv *HTTPServer) { httpTest(t, func(srv *HTTPServer) {
p := &UserEvent{Name: "test"} p := &UserEvent{Name: "test"}
if err := srv.agent.UserEvent(p); err != nil { if err := srv.agent.UserEvent("", p); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -90,7 +90,7 @@ func TestEventList(t *testing.T) {
func TestEventList_Blocking(t *testing.T) { func TestEventList_Blocking(t *testing.T) {
httpTest(t, func(srv *HTTPServer) { httpTest(t, func(srv *HTTPServer) {
p := &UserEvent{Name: "test"} p := &UserEvent{Name: "test"}
if err := srv.agent.UserEvent(p); err != nil { if err := srv.agent.UserEvent("", p); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
@ -118,7 +118,7 @@ func TestEventList_Blocking(t *testing.T) {
go func() { go func() {
time.Sleep(50 * time.Millisecond) time.Sleep(50 * time.Millisecond)
p := &UserEvent{Name: "second"} p := &UserEvent{Name: "second"}
if err := srv.agent.UserEvent(p); err != nil { if err := srv.agent.UserEvent("", p); err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
}() }()

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"regexp" "regexp"
"github.com/hashicorp/consul/consul/structs"
"github.com/ugorji/go/codec" "github.com/ugorji/go/codec"
) )
@ -69,7 +70,7 @@ func validateUserEventParams(params *UserEvent) error {
} }
// UserEvent is used to fire an event via the Serf layer on the LAN // UserEvent is used to fire an event via the Serf layer on the LAN
func (a *Agent) UserEvent(params *UserEvent) error { func (a *Agent) UserEvent(dc string, params *UserEvent) error {
// Validate the params // Validate the params
if err := validateUserEventParams(params); err != nil { if err := validateUserEventParams(params); err != nil {
return err return err
@ -82,11 +83,28 @@ func (a *Agent) UserEvent(params *UserEvent) error {
if err != nil { if err != nil {
return fmt.Errorf("UserEvent encoding failed: %v", err) return fmt.Errorf("UserEvent encoding failed: %v", err)
} }
// Check if this is the local DC, fire locally
if dc == "" || dc == a.config.Datacenter {
if a.server != nil { if a.server != nil {
return a.server.UserEvent(params.Name, payload) return a.server.UserEvent(params.Name, payload)
} else { } else {
return a.client.UserEvent(params.Name, payload) return a.client.UserEvent(params.Name, payload)
} }
} else {
// Send an RPC to remote datacenter to service this
args := structs.EventFireRequest{
Datacenter: dc,
Name: params.Name,
Payload: payload,
}
// Any server can process in the remote DC, since the
// gossip will take over anyways
args.AllowStale = true
var out structs.EventFireResponse
return a.RPC("Internal.EventFire", &args, &out)
}
} }
// handleEvents is used to process incoming user events // handleEvents is used to process incoming user events

View File

@ -162,13 +162,13 @@ func TestFireReceiveEvent(t *testing.T) {
agent.state.AddService(srv1) agent.state.AddService(srv1)
p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"} p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"}
err := agent.UserEvent(p1) err := agent.UserEvent("", p1)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }
p2 := &UserEvent{Name: "deploy"} p2 := &UserEvent{Name: "deploy"}
err = agent.UserEvent(p2) err = agent.UserEvent("", p2)
if err != nil { if err != nil {
t.Fatalf("err: %v", err) t.Fatalf("err: %v", err)
} }

View File

@ -46,3 +46,19 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest,
return nil return nil
}) })
} }
// EventFire is a bit of an odd endpoint, but it allows for a cross-DC RPC
// call to fire an event. The primary use case is to enable user events being
// triggered in a remote DC.
func (m *Internal) EventFire(args *structs.EventFireRequest,
reply *structs.EventFireResponse) error {
if done, err := m.srv.forward("Internal.EventFire", args, args, reply); done {
return err
}
// Set the query meta data
m.srv.setQueryMeta(&reply.QueryMeta)
// Fire the event
return m.srv.UserEvent(args.Name, args.Payload)
}

View File

@ -493,6 +493,29 @@ type ACLPolicy struct {
QueryMeta QueryMeta
} }
// EventFireRequest is used to ask a server to fire
// a Serf event. It is a bit odd, since it doesn't depend on
// the catalog or leader. Any node can respond, so it's not quite
// like a standard write request. This is used only internally.
type EventFireRequest struct {
Datacenter string
Name string
Payload []byte
// Not using WriteRequest so that any server can process
// the request. It is a bit unusual...
QueryOptions
}
func (r *EventFireRequest) RequestDatacenter() string {
return r.Datacenter
}
// EventFireResponse is used to respond to a fire request.
type EventFireResponse struct {
QueryMeta
}
// msgpackHandle is a shared handle for encoding/decoding of structs // msgpackHandle is a shared handle for encoding/decoding of structs
var msgpackHandle = &codec.MsgpackHandle{} var msgpackHandle = &codec.MsgpackHandle{}