Adding tests and stuff

This commit is contained in:
Derek Chiang 2015-04-09 16:23:14 -04:00 committed by James Phillips
parent 13f6cd376f
commit a9ea503c69
6 changed files with 156 additions and 2 deletions

View File

@ -17,6 +17,7 @@ import (
"github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/coordinate"
"github.com/hashicorp/serf/serf"
)
@ -556,6 +557,23 @@ func (a *Agent) ResumeSync() {
a.state.Resume()
}
// StartSendingCoordinate starts a goroutine that periodically sends the local coordinate
// to a server
func (a *Agent) StartSendingCoordinate() {
go func() {
var c coordinate.Coordinate
if a.config.Server {
c = a.server
}
req := structs.CoordinateUpdateRequest{
Datacenter: a.config.Datacenter,
Node: a.config.NodeName,
QueryOptions: structs.QueryOptions{Token: a.config.ACLToken},
}
}()
}
// persistService saves a service definition to a JSON file in the data dir
func (a *Agent) persistService(service *structs.NodeService) error {
svcPath := filepath.Join(a.config.DataDir, servicesDir, stringHash(service.ID))

View File

@ -1,5 +1,46 @@
package consul
import (
"github.com/hashicorp/consul/consul/structs"
)
type Coordinate struct {
srv *Server
}
// Get returns the the coordinate or a node.
//
// If the node is in the same datacenter, then the LAN coordinate of the node is
// returned. If the node is in a remote DC, then the WAN coordinate of the node
// is returned.
func (c *Coordinate) Get(args *structs.CoordinateGetRequest, reply *structs.Coordinate) error {
if done, err := c.srv.forward("Coordinate.Get", args, args, reply); done {
return err
}
if args.OriginDC == c.srv.config.Datacenter {
state := c.srv.fsm.State()
_, coord, err := state.CoordinateGet(args.Node)
if err != nil {
return err
}
*reply = *coord
} else {
reply.Node = args.Node
reply.Coord = c.srv.serfWAN.GetCoordinate()
}
return nil
}
func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) error {
if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done {
return err
}
_, err := c.srv.raftApply(structs.CoordinateRequestType, args)
if err != nil {
c.srv.logger.Printf("[ERR] consul.coordinate: Update failed: %v", err)
return err
}
return nil
}

View File

@ -0,0 +1,64 @@
package consul
import (
"math/rand"
"os"
"testing"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/serf/coordinate"
)
func getRandomCoordinate() *coordinate.Coordinate {
config := coordinate.DefaultConfig()
coord := coordinate.NewCoordinate(config)
for i := 0; i < len(coord.Vec); i++ {
coord.Vec[i] = rand.Float64()
}
return coord
}
func coordinatesEqual(a, b *coordinate.Coordinate) bool {
config := coordinate.DefaultConfig()
client := coordinate.NewClient(config)
dist, err := client.DistanceBetween(a, b)
if err != nil {
panic(err)
}
return dist < 0.00001
}
func TestCoordinate_Update(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
client := rpcClient(t, s1)
defer client.Close()
testutil.WaitForLeader(t, client.Call, "dc1")
arg := structs.CoordinateUpdateRequest{
NodeSpecificRequest: structs.NodeSpecificRequest{
Datacenter: "dc1",
Node: "node1",
},
Op: structs.CoordinateSet,
Coord: getRandomCoordinate(),
}
var out struct{}
if err := client.Call("Coordinate.Update", &arg, &out); err != nil {
t.Fatalf("err: %v", err)
}
// Verify
state := s1.fsm.State()
_, d, err := state.CoordinateGet("node1")
if err != nil {
t.Fatalf("err: %v", err)
}
if coordinatesEqual(d.Coord, arg.Coord) {
t.Fatalf("should be equal\n%v\n%v", d.Coord, arg.Coord)
}
}

View File

@ -89,6 +89,8 @@ func (c *consulFSM) Apply(log *raft.Log) interface{} {
return c.applyACLOperation(buf[1:], log.Index)
case structs.TombstoneRequestType:
return c.applyTombstoneOperation(buf[1:], log.Index)
case structs.CoordinateRequestType:
return c.applyCoordinateOperation(buf[1:], log.Index)
default:
if ignoreUnknown {
c.logger.Printf("[WARN] consul.fsm: ignoring unknown message type (%d), upgrade to newer version", msgType)
@ -246,6 +248,22 @@ func (c *consulFSM) applyTombstoneOperation(buf []byte, index uint64) interface{
}
}
func (c *consulFSM) applyCoordinateOperation(buf []byte, index uint64) interface{} {
var req structs.CoordinateUpdateRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}
defer metrics.MeasureSince([]string{"consul", "fsm", "coordinate", string(req.Op)}, time.Now())
switch req.Op {
case structs.CoordinateSet:
coord := &structs.Coordinate{req.Node, req.Coord}
return c.state.CoordinateUpdate(index, coord)
default:
c.logger.Printf("[WARN] consul.fsm: Invalid Coordinate operation '%s'", req.Op)
return fmt.Errorf("Invalid Coordinate operation '%s'", req.Op)
}
}
func (c *consulFSM) Snapshot() (raft.FSMSnapshot, error) {
defer func(start time.Time) {
c.logger.Printf("[INFO] consul.fsm: snapshot created in %v", time.Now().Sub(start))

View File

@ -407,6 +407,7 @@ func (s *Server) setupRPC(tlsWrap tlsutil.DCWrapper) error {
s.rpcServer.Register(s.endpoints.Session)
s.rpcServer.Register(s.endpoints.Internal)
s.rpcServer.Register(s.endpoints.ACL)
s.rpcServer.Register(s.endpoints.Coordinate)
list, err := net.ListenTCP("tcp", s.config.RPCAddr)
if err != nil {

View File

@ -32,6 +32,7 @@ const (
SessionRequestType
ACLRequestType
TombstoneRequestType
CoordinateRequestType
)
const (
@ -625,13 +626,24 @@ type Coordinate struct {
Coord *coordinate.Coordinate
}
// CoordinateGetRequest is used to request the network coordinate of a given node
type CoordinateGetRequest struct {
Nodes []string
NodeSpecificRequest
OriginDC string
}
type CoordinateOp string
const (
CoordinateSet CoordinateOp = "set"
)
// CoordinateUpdateRequest is used to update the network coordinate of a given node
type CoordinateUpdateRequest struct {
Node string
NodeSpecificRequest
Op CoordinateOp
Coord *coordinate.Coordinate
WriteRequest
}
// EventFireRequest is used to ask a server to fire