diff --git a/command/agent/event_endpoint.go b/command/agent/event_endpoint.go index b93af5ce90..9ad99d983c 100644 --- a/command/agent/event_endpoint.go +++ b/command/agent/event_endpoint.go @@ -36,6 +36,10 @@ func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (int return nil, nil } + // Get the ACL token + var token string + s.parseToken(req, &token) + // Get the filters if filt := req.URL.Query().Get("node"); filt != "" { event.NodeFilter = filt @@ -57,7 +61,7 @@ func (s *HTTPServer) EventFire(resp http.ResponseWriter, req *http.Request) (int } // Try to fire the event - if err := s.agent.UserEvent(dc, event); err != nil { + if err := s.agent.UserEvent(dc, token, event); err != nil { return nil, err } diff --git a/command/agent/event_endpoint_test.go b/command/agent/event_endpoint_test.go index 81b3846955..e3f6760bf6 100644 --- a/command/agent/event_endpoint_test.go +++ b/command/agent/event_endpoint_test.go @@ -13,6 +13,8 @@ import ( func TestEventFire(t *testing.T) { httpTest(t, func(srv *HTTPServer) { + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + body := bytes.NewBuffer([]byte("test")) url := "/v1/event/fire/test?node=Node&service=foo&tag=bar" req, err := http.NewRequest("PUT", url, body) @@ -53,8 +55,10 @@ func TestEventFire(t *testing.T) { func TestEventList(t *testing.T) { httpTest(t, func(srv *HTTPServer) { + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + p := &UserEvent{Name: "test"} - if err := srv.agent.UserEvent("", p); err != nil { + if err := srv.agent.UserEvent("dc1", "root", p); err != nil { t.Fatalf("err: %v", err) } @@ -89,13 +93,15 @@ func TestEventList(t *testing.T) { func TestEventList_Filter(t *testing.T) { httpTest(t, func(srv *HTTPServer) { + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + p := &UserEvent{Name: "test"} - if err := srv.agent.UserEvent("", p); err != nil { + if err := srv.agent.UserEvent("dc1", "root", p); err != nil { t.Fatalf("err: %v", err) } p = &UserEvent{Name: "foo"} - if err := srv.agent.UserEvent("", p); err != nil { + if err := srv.agent.UserEvent("dc1", "root", p); err != nil { t.Fatalf("err: %v", err) } @@ -130,8 +136,10 @@ func TestEventList_Filter(t *testing.T) { func TestEventList_Blocking(t *testing.T) { httpTest(t, func(srv *HTTPServer) { + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + p := &UserEvent{Name: "test"} - if err := srv.agent.UserEvent("", p); err != nil { + if err := srv.agent.UserEvent("dc1", "root", p); err != nil { t.Fatalf("err: %v", err) } @@ -159,7 +167,7 @@ func TestEventList_Blocking(t *testing.T) { go func() { time.Sleep(50 * time.Millisecond) p := &UserEvent{Name: "second"} - if err := srv.agent.UserEvent("", p); err != nil { + if err := srv.agent.UserEvent("dc1", "root", p); err != nil { t.Fatalf("err: %v", err) } }() @@ -192,6 +200,8 @@ func TestEventList_Blocking(t *testing.T) { func TestEventList_EventBufOrder(t *testing.T) { httpTest(t, func(srv *HTTPServer) { + testutil.WaitForLeader(t, srv.agent.RPC, "dc1") + // Fire some events in a non-sequential order expected := &UserEvent{Name: "foo"} @@ -202,7 +212,7 @@ func TestEventList_EventBufOrder(t *testing.T) { expected, &UserEvent{Name: "bar"}, } { - if err := srv.agent.UserEvent("", e); err != nil { + if err := srv.agent.UserEvent("dc1", "root", e); err != nil { t.Fatalf("err: %v", err) } } diff --git a/command/agent/user_event.go b/command/agent/user_event.go index 64891981f7..d019a310a6 100644 --- a/command/agent/user_event.go +++ b/command/agent/user_event.go @@ -71,7 +71,7 @@ func validateUserEventParams(params *UserEvent) error { } // UserEvent is used to fire an event via the Serf layer on the LAN -func (a *Agent) UserEvent(dc string, params *UserEvent) error { +func (a *Agent) UserEvent(dc, token string, params *UserEvent) error { // Validate the params if err := validateUserEventParams(params); err != nil { return err @@ -85,27 +85,21 @@ func (a *Agent) UserEvent(dc string, params *UserEvent) error { return fmt.Errorf("UserEvent encoding failed: %v", err) } - // Check if this is the local DC, fire locally - if dc == "" || dc == a.config.Datacenter { - if a.server != nil { - return a.server.UserEvent(params.Name, payload) - } else { - return a.client.UserEvent(params.Name, payload) - } - } else { - // Send an RPC to remote datacenter to service this - args := structs.EventFireRequest{ - Datacenter: dc, - Name: params.Name, - Payload: payload, - } - - // Any server can process in the remote DC, since the - // gossip will take over anyways - args.AllowStale = true - var out structs.EventFireResponse - return a.RPC("Internal.EventFire", &args, &out) + // Send an RPC to service this + args := structs.EventFireRequest{ + Datacenter: dc, + Name: params.Name, + Payload: payload, } + + // Pass along the ACL token, if any + args.Token = token + + // Any server can process in the remote DC, since the + // gossip will take over anyways + args.AllowStale = true + var out structs.EventFireResponse + return a.RPC("Internal.EventFire", &args, &out) } // handleEvents is used to process incoming user events diff --git a/command/agent/user_event_test.go b/command/agent/user_event_test.go index 336bf04943..6a4c9919c9 100644 --- a/command/agent/user_event_test.go +++ b/command/agent/user_event_test.go @@ -153,6 +153,8 @@ func TestFireReceiveEvent(t *testing.T) { defer os.RemoveAll(dir) defer agent.Shutdown() + testutil.WaitForLeader(t, agent.RPC, "dc1") + srv1 := &structs.NodeService{ ID: "mysql", Service: "mysql", @@ -162,13 +164,13 @@ func TestFireReceiveEvent(t *testing.T) { agent.state.AddService(srv1, "") p1 := &UserEvent{Name: "deploy", ServiceFilter: "web"} - err := agent.UserEvent("", p1) + err := agent.UserEvent("dc1", "root", p1) if err != nil { t.Fatalf("err: %v", err) } p2 := &UserEvent{Name: "deploy"} - err = agent.UserEvent("", p2) + err = agent.UserEvent("dc1", "root", p2) if err != nil { t.Fatalf("err: %v", err) } @@ -186,3 +188,66 @@ func TestFireReceiveEvent(t *testing.T) { t.Fatalf("bad: %#v", last) } } + +func TestUserEventToken(t *testing.T) { + conf := nextConfig() + + // Set the default policies to deny + conf.ACLDefaultPolicy = "deny" + + dir, agent := makeAgent(t, conf) + defer os.RemoveAll(dir) + defer agent.Shutdown() + + testutil.WaitForLeader(t, agent.RPC, "dc1") + + // Create an ACL token + args := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: testEventPolicy, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var token string + if err := agent.RPC("ACL.Apply", &args, &token); err != nil { + t.Fatalf("err: %v", err) + } + + type tcase struct { + name string + expect bool + } + cases := []tcase{ + {"foo", false}, + {"bar", false}, + {"baz", true}, + {"zip", false}, + } + for _, c := range cases { + event := &UserEvent{Name: c.name} + err := agent.UserEvent("dc1", token, event) + allowed := false + if err == nil || err.Error() != permissionDenied { + allowed = true + } + if allowed != c.expect { + t.Fatalf("bad: %#v result: %v", c, allowed) + } + } +} + +const testEventPolicy = ` +event "foo" { + policy = "deny" +} +event "bar" { + policy = "read" +} +event "baz" { + policy = "write" +} +`