mirror of
https://github.com/status-im/consul.git
synced 2025-01-13 07:14:37 +00:00
consul: always fire events from server nodes
This commit is contained in:
parent
d5e13280a3
commit
6e084f6897
@ -85,16 +85,15 @@ func (a *Agent) UserEvent(dc, token string, params *UserEvent) error {
|
|||||||
return fmt.Errorf("UserEvent encoding failed: %v", err)
|
return fmt.Errorf("UserEvent encoding failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send an RPC to service this
|
// Service the event fire over RPC. This ensures that we authorize
|
||||||
|
// the request against the token first.
|
||||||
args := structs.EventFireRequest{
|
args := structs.EventFireRequest{
|
||||||
Datacenter: dc,
|
Datacenter: dc,
|
||||||
Name: params.Name,
|
Name: params.Name,
|
||||||
Payload: payload,
|
Payload: payload,
|
||||||
|
QueryOptions: structs.QueryOptions{Token: token},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pass along the ACL token, if any
|
|
||||||
args.Token = token
|
|
||||||
|
|
||||||
// Any server can process in the remote DC, since the
|
// Any server can process in the remote DC, since the
|
||||||
// gossip will take over anyways
|
// gossip will take over anyways
|
||||||
args.AllowStale = true
|
args.AllowStale = true
|
||||||
|
@ -201,11 +201,6 @@ func (c *Client) RemoveFailedNode(node string) error {
|
|||||||
return c.serf.RemoveFailedNode(node)
|
return c.serf.RemoveFailedNode(node)
|
||||||
}
|
}
|
||||||
|
|
||||||
// UserEvent is used to fire an event via the Serf layer
|
|
||||||
func (c *Client) UserEvent(name string, payload []byte) error {
|
|
||||||
return c.serf.UserEvent(userEventName(name), payload, false)
|
|
||||||
}
|
|
||||||
|
|
||||||
// KeyManagerLAN returns the LAN Serf keyring manager
|
// KeyManagerLAN returns the LAN Serf keyring manager
|
||||||
func (c *Client) KeyManagerLAN() *serf.KeyManager {
|
func (c *Client) KeyManagerLAN() *serf.KeyManager {
|
||||||
return c.serf.KeyManager()
|
return c.serf.KeyManager()
|
||||||
|
@ -276,26 +276,18 @@ func TestClientServer_UserEvent(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
// Fire the user event
|
// Fire the user event
|
||||||
err := c1.UserEvent("foo", []byte("bar"))
|
if err := s1.UserEvent("foo", []byte("baz")); err != nil {
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
err = s1.UserEvent("bar", []byte("baz"))
|
|
||||||
if err != nil {
|
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for all the events
|
// Wait for all the events
|
||||||
var serverFoo, serverBar, clientFoo, clientBar bool
|
var clientReceived, serverReceived bool
|
||||||
for i := 0; i < 4; i++ {
|
for i := 0; i < 2; i++ {
|
||||||
select {
|
select {
|
||||||
case e := <-clientOut:
|
case e := <-clientOut:
|
||||||
switch e.Name {
|
switch e.Name {
|
||||||
case "foo":
|
case "foo":
|
||||||
clientFoo = true
|
clientReceived = true
|
||||||
case "bar":
|
|
||||||
clientBar = true
|
|
||||||
default:
|
default:
|
||||||
t.Fatalf("Bad: %#v", e)
|
t.Fatalf("Bad: %#v", e)
|
||||||
}
|
}
|
||||||
@ -303,9 +295,7 @@ func TestClientServer_UserEvent(t *testing.T) {
|
|||||||
case e := <-serverOut:
|
case e := <-serverOut:
|
||||||
switch e.Name {
|
switch e.Name {
|
||||||
case "foo":
|
case "foo":
|
||||||
serverFoo = true
|
serverReceived = true
|
||||||
case "bar":
|
|
||||||
serverBar = true
|
|
||||||
default:
|
default:
|
||||||
t.Fatalf("Bad: %#v", e)
|
t.Fatalf("Bad: %#v", e)
|
||||||
}
|
}
|
||||||
@ -315,7 +305,7 @@ func TestClientServer_UserEvent(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if !(serverFoo && serverBar && clientFoo && clientBar) {
|
if !serverReceived || !clientReceived {
|
||||||
t.Fatalf("missing events")
|
t.Fatalf("missing events")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user