Address comments

This commit is contained in:
Derek Chiang 2015-04-28 21:47:41 -04:00 committed by James Phillips
parent ab9262c656
commit f144d17b1c
5 changed files with 45 additions and 20 deletions

View File

@ -38,9 +38,6 @@ const (
"but no reason was provided. This is a default message."
defaultServiceMaintReason = "Maintenance mode is enabled for this " +
"service, but no reason was provided. This is a default message."
// An interval used to send network coordinates to servers
syncCoordinateStaggerIntv = 15 * time.Second
)
var (
@ -201,6 +198,9 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
return nil, err
}
// Start sending network coordinates to servers
go agent.sendCoordinates()
return agent, nil
}
@ -560,15 +560,14 @@ func (a *Agent) ResumeSync() {
a.state.Resume()
}
// SendCoordinates starts a loop that periodically sends the local coordinate
// sendCoordinates starts a loop that periodically sends the local coordinate
// to a server
func (a *Agent) SendCoordinates() {
func (a *Agent) sendCoordinates() {
for {
intv := aeScale(a.config.SyncCoordinateInterval, len(a.LANMembers()))
intv = intv + randomStagger(intv)
timer := time.After(intv)
select {
case <-timer:
case <-time.After(intv):
var c *coordinate.Coordinate
if a.config.Server {
c = a.server.GetLANCoordinate()
@ -585,7 +584,7 @@ func (a *Agent) SendCoordinates() {
var reply struct{}
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
a.logger.Printf("[ERR] coordinate update error: %s", err.Error())
a.logger.Printf("[ERR] agent: coordinate update error: %s", err.Error())
}
case <-a.shutdownCh:
return

View File

@ -709,9 +709,6 @@ func (c *Command) Run(args []string) int {
errWanCh := make(chan struct{})
go c.retryJoinWan(config, errWanCh)
// Start sending network coordinates to servers
go c.agent.SendCoordinates()
// Wait for exit
return c.handleSignals(config, errCh, errWanCh)
}

View File

@ -822,8 +822,6 @@ func TestAgentSendCoordinates(t *testing.T) {
testutil.WaitForLeader(t, agent1.RPC, "dc1")
go agent1.SendCoordinates()
go agent2.SendCoordinates()
time.Sleep(100 * time.Millisecond)
var reply structs.IndexedCoordinate
@ -837,4 +835,16 @@ func TestAgentSendCoordinates(t *testing.T) {
if reply.Coord == nil {
t.Fatalf("should get a coordinate")
}
var reply2 structs.IndexedCoordinate
req2 := structs.CoordinateGetRequest{
Datacenter: agent2.config.Datacenter,
Node: agent2.config.NodeName,
}
if err := agent1.RPC("Coordinate.Get", &req2, &reply2); err != nil {
t.Fatalf("err: %v", err)
}
if reply2.Coord == nil {
t.Fatalf("should get a coordinate")
}
}

View File

@ -8,11 +8,7 @@ type Coordinate struct {
srv *Server
}
// Get returns the the coordinate or a node.
//
// If the node is in the same datacenter, then the LAN coordinate of the node is
// returned. If the node is in a remote DC, then the WAN coordinate of the node
// is returned.
// Get returns the the LAN coordinate of a node.
func (c *Coordinate) Get(args *structs.CoordinateGetRequest, reply *structs.IndexedCoordinate) error {
if done, err := c.srv.forward("Coordinate.Get", args, args, reply); done {
return err
@ -30,6 +26,7 @@ func (c *Coordinate) Get(args *structs.CoordinateGetRequest, reply *structs.Inde
})
}
// Update updates the the LAN coordinate of a node.
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error {
if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done {
return err

View File

@ -18,7 +18,7 @@ func getRandomCoordinate() *coordinate.Coordinate {
n := 5
clients := make([]*coordinate.Client, n)
for i := 0; i < n; i++ {
clients[i] = coordinate.NewClient(config)
clients[i], _ = coordinate.NewClient(config)
}
for i := 0; i < n*100; i++ {
@ -41,7 +41,7 @@ func coordinatesEqual(a, b *coordinate.Coordinate) bool {
return dist < 0.1
}
func TestCoordinate(t *testing.T) {
func TestCoordinateUpdate(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@ -71,6 +71,28 @@ func TestCoordinate(t *testing.T) {
if !coordinatesEqual(d.Coord, arg.Coord) {
t.Fatalf("should be equal\n%v\n%v", d.Coord, arg.Coord)
}
}
func TestCoordinateGet(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
arg := structs.CoordinateUpdateRequest{
Datacenter: "dc1",
Node: "node1",
Op: structs.CoordinateSet,
Coord: getRandomCoordinate(),
}
var out struct{}
if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Get via RPC
var out2 *structs.IndexedCoordinate