Merge pull request #2832 from hashicorp/node-id-integrity

Adds node ID integrity checking to the catalog and the LAN and WAN clusters.
This commit is contained in:
James Phillips 2017-03-27 09:01:36 -07:00 committed by GitHub
commit 7360928203
12 changed files with 271 additions and 94 deletions

View File

@ -48,12 +48,18 @@ func nextConfig() *Config {
idx := int(atomic.AddUint64(&offset, numPortsPerIndex)) idx := int(atomic.AddUint64(&offset, numPortsPerIndex))
conf := DefaultConfig() conf := DefaultConfig()
nodeID, err := uuid.GenerateUUID()
if err != nil {
panic(err)
}
conf.Version = version.Version conf.Version = version.Version
conf.VersionPrerelease = "c.d" conf.VersionPrerelease = "c.d"
conf.AdvertiseAddr = "127.0.0.1" conf.AdvertiseAddr = "127.0.0.1"
conf.Bootstrap = true conf.Bootstrap = true
conf.Datacenter = "dc1" conf.Datacenter = "dc1"
conf.NodeName = fmt.Sprintf("Node %d", idx) conf.NodeName = fmt.Sprintf("Node %d", idx)
conf.NodeID = types.NodeID(nodeID)
conf.BindAddr = "127.0.0.1" conf.BindAddr = "127.0.0.1"
conf.Ports.DNS = basePortNumber + idx + portOffsetDNS conf.Ports.DNS = basePortNumber + idx + portOffsetDNS
conf.Ports.HTTP = basePortNumber + idx + portOffsetHTTP conf.Ports.HTTP = basePortNumber + idx + portOffsetHTTP
@ -314,6 +320,7 @@ func TestAgent_ReconnectConfigSettings(t *testing.T) {
func TestAgent_NodeID(t *testing.T) { func TestAgent_NodeID(t *testing.T) {
c := nextConfig() c := nextConfig()
c.NodeID = ""
dir, agent := makeAgent(t, c) dir, agent := makeAgent(t, c)
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer agent.Shutdown() defer agent.Shutdown()

View File

@ -15,7 +15,9 @@ import (
"github.com/hashicorp/consul/command/agent" "github.com/hashicorp/consul/command/agent"
"github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/consul/version" "github.com/hashicorp/consul/version"
"github.com/hashicorp/go-uuid"
"github.com/mitchellh/cli" "github.com/mitchellh/cli"
) )
@ -112,9 +114,15 @@ func nextConfig() *agent.Config {
idx := int(atomic.AddUint64(&offset, 1)) idx := int(atomic.AddUint64(&offset, 1))
conf := agent.DefaultConfig() conf := agent.DefaultConfig()
nodeID, err := uuid.GenerateUUID()
if err != nil {
panic(err)
}
conf.Bootstrap = true conf.Bootstrap = true
conf.Datacenter = "dc1" conf.Datacenter = "dc1"
conf.NodeName = fmt.Sprintf("Node %d", idx) conf.NodeName = fmt.Sprintf("Node %d", idx)
conf.NodeID = types.NodeID(nodeID)
conf.BindAddr = "127.0.0.1" conf.BindAddr = "127.0.0.1"
conf.Server = true conf.Server = true

View File

@ -90,11 +90,13 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error
} }
} }
_, err = c.srv.raftApply(structs.RegisterRequestType, args) resp, err := c.srv.raftApply(structs.RegisterRequestType, args)
if err != nil { if err != nil {
return err return err
} }
if respErr, ok := resp.(error); ok {
return respErr
}
return nil return nil
} }

View File

@ -32,6 +32,7 @@ func TestCatalog_Register(t *testing.T) {
Port: 8000, Port: 8000,
}, },
Check: &structs.HealthCheck{ Check: &structs.HealthCheck{
CheckID: types.CheckID("db-check"),
ServiceID: "db", ServiceID: "db",
}, },
} }
@ -61,6 +62,7 @@ func TestCatalog_Register_NodeID(t *testing.T) {
Port: 8000, Port: 8000,
}, },
Check: &structs.HealthCheck{ Check: &structs.HealthCheck{
CheckID: types.CheckID("db-check"),
ServiceID: "db", ServiceID: "db",
}, },
} }

View File

@ -156,7 +156,11 @@ func (c *Client) setupSerf(conf *serf.Config, ch chan serf.Event, path string) (
conf.SnapshotPath = filepath.Join(c.config.DataDir, path) conf.SnapshotPath = filepath.Join(c.config.DataDir, path)
conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion] conf.ProtocolVersion = protocolVersionMap[c.config.ProtocolVersion]
conf.RejoinAfterLeave = c.config.RejoinAfterLeave conf.RejoinAfterLeave = c.config.RejoinAfterLeave
conf.Merge = &lanMergeDelegate{dc: c.config.Datacenter} conf.Merge = &lanMergeDelegate{
dc: c.config.Datacenter,
nodeID: c.config.NodeID,
nodeName: c.config.NodeName,
}
if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil { if err := lib.EnsurePath(conf.SnapshotPath, false); err != nil {
return nil, err return nil, err
} }

View File

@ -848,87 +848,6 @@ func TestLeader_RollRaftServer(t *testing.T) {
} }
} }
func TestLeader_ChangeServerAddress(t *testing.T) {
conf := func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
}
dir1, s1 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
dir2, s2 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir2)
defer s2.Shutdown()
dir3, s3 := testServerWithConfig(t, conf)
defer os.RemoveAll(dir3)
defer s3.Shutdown()
servers := []*Server{s1, s2, s3}
// Try to join
addr := fmt.Sprintf("127.0.0.1:%d",
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
if _, err := s2.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
if _, err := s3.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
for _, s := range servers {
if err := testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}); err != nil {
t.Fatal("should have 3 peers")
}
}
// Shut down a server, freeing up its address/port
s3.Shutdown()
if err := testutil.WaitForResult(func() (bool, error) {
alive := 0
for _, m := range s1.LANMembers() {
if m.Status == serf.StatusAlive {
alive++
}
}
return alive == 2, nil
}); err != nil {
t.Fatal("should have 2 alive members")
}
// Bring up a new server with s3's address that will get a different ID
dir4, s4 := testServerWithConfig(t, func(c *Config) {
c.Bootstrap = false
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
c.NodeID = s3.config.NodeID
})
defer os.RemoveAll(dir4)
defer s4.Shutdown()
if _, err := s4.JoinLAN([]string{addr}); err != nil {
t.Fatalf("err: %v", err)
}
servers[2] = s4
// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
if err := testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}); err != nil {
t.Fatal("should have 3 members")
}
}
}
func TestLeader_ChangeServerID(t *testing.T) { func TestLeader_ChangeServerID(t *testing.T) {
conf := func(c *Config) { conf := func(c *Config) {
c.Bootstrap = false c.Bootstrap = false

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"github.com/hashicorp/consul/consul/agent" "github.com/hashicorp/consul/consul/agent"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
) )
@ -11,11 +12,33 @@ import (
// ring. We check that the peers are in the same datacenter and abort the // ring. We check that the peers are in the same datacenter and abort the
// merge if there is a mis-match. // merge if there is a mis-match.
type lanMergeDelegate struct { type lanMergeDelegate struct {
dc string dc string
nodeID types.NodeID
nodeName string
} }
func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error { func (md *lanMergeDelegate) NotifyMerge(members []*serf.Member) error {
nodeMap := make(map[types.NodeID]string)
for _, m := range members { for _, m := range members {
if rawID, ok := m.Tags["id"]; ok && rawID != "" {
nodeID := types.NodeID(rawID)
// See if there's another node that conflicts with us.
if (nodeID == md.nodeID) && (m.Name != md.nodeName) {
return fmt.Errorf("Member '%s' has conflicting node ID '%s' with this agent's ID",
m.Name, nodeID)
}
// See if there are any two nodes that conflict with each
// other. This lets us only do joins into a hygienic
// cluster now that node IDs are critical for operation.
if other, ok := nodeMap[nodeID]; ok {
return fmt.Errorf("Member '%s' has conflicting node ID '%s' with member '%s'",
m.Name, nodeID, other)
}
nodeMap[nodeID] = m.Name
}
ok, dc := isConsulNode(*m) ok, dc := isConsulNode(*m)
if ok { if ok {
if dc != md.dc { if dc != md.dc {

160
consul/merge_test.go Normal file
View File

@ -0,0 +1,160 @@
package consul
import (
"strings"
"testing"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/serf/serf"
)
func makeNode(dc, name, id string, server bool) *serf.Member {
var role string
if server {
role = "consul"
} else {
role = "node"
}
return &serf.Member{
Name: name,
Tags: map[string]string{
"role": role,
"dc": dc,
"id": id,
"port": "8300",
"build": "0.7.5",
"vsn": "2",
"vsn_max": "3",
"vsn_min": "2",
},
}
}
func TestMerge_LAN(t *testing.T) {
cases := []struct {
members []*serf.Member
expect string
}{
// Client in the wrong datacenter.
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"96430788-246f-4379-94ce-257f7429e340",
false),
},
expect: "wrong datacenter",
},
// Server in the wrong datacenter.
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"96430788-246f-4379-94ce-257f7429e340",
true),
},
expect: "wrong datacenter",
},
// Node ID conflict with delegate's ID.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"ee954a2f-80de-4b34-8780-97b942a50a99",
true),
},
expect: "with this agent's ID",
},
// Cluster with existing conflicting node IDs.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true),
makeNode("dc1",
"node2",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true),
},
expect: "with member",
},
// Good cluster.
{
members: []*serf.Member{
makeNode("dc1",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true),
makeNode("dc1",
"node2",
"cda916bc-a357-4a19-b886-59419fcee50c",
true),
},
expect: "",
},
}
delegate := &lanMergeDelegate{
dc: "dc1",
nodeID: types.NodeID("ee954a2f-80de-4b34-8780-97b942a50a99"),
nodeName: "node0",
}
for i, c := range cases {
if err := delegate.NotifyMerge(c.members); c.expect == "" {
if err != nil {
t.Fatalf("case %d: err: %v", i+1, err)
}
} else {
if err == nil || !strings.Contains(err.Error(), c.expect) {
t.Fatalf("case %d: err: %v", i+1, err)
}
}
}
}
func TestMerge_WAN(t *testing.T) {
cases := []struct {
members []*serf.Member
expect string
}{
// Not a server
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"96430788-246f-4379-94ce-257f7429e340",
false),
},
expect: "not a server",
},
// Good cluster.
{
members: []*serf.Member{
makeNode("dc2",
"node1",
"6185913b-98d7-4441-bd8f-f7f7d854a4af",
true),
makeNode("dc3",
"node2",
"cda916bc-a357-4a19-b886-59419fcee50c",
true),
},
expect: "",
},
}
delegate := &wanMergeDelegate{}
for i, c := range cases {
if err := delegate.NotifyMerge(c.members); c.expect == "" {
if err != nil {
t.Fatalf("case %d: err: %v", i+1, err)
}
} else {
if err == nil || !strings.Contains(err.Error(), c.expect) {
t.Fatalf("case %d: err: %v", i+1, err)
}
}
}
}

View File

@ -396,7 +396,11 @@ func (s *Server) setupSerf(conf *serf.Config, ch chan serf.Event, path string, w
if wan { if wan {
conf.Merge = &wanMergeDelegate{} conf.Merge = &wanMergeDelegate{}
} else { } else {
conf.Merge = &lanMergeDelegate{dc: s.config.Datacenter} conf.Merge = &lanMergeDelegate{
dc: s.config.Datacenter,
nodeID: s.config.NodeID,
nodeName: s.config.NodeName,
}
} }
// Until Consul supports this fully, we disable automatic resolution. // Until Consul supports this fully, we disable automatic resolution.

View File

@ -165,22 +165,44 @@ func (s *StateStore) EnsureNode(idx uint64, node *structs.Node) error {
// registration or modify an existing one in the state store. It allows // registration or modify an existing one in the state store. It allows
// passing in a memdb transaction so it may be part of a larger txn. // passing in a memdb transaction so it may be part of a larger txn.
func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error { func (s *StateStore) ensureNodeTxn(tx *memdb.Txn, idx uint64, node *structs.Node) error {
// Check for an existing node // See if there's an existing node with this UUID, and make sure the
existing, err := tx.First("nodes", "id", node.Node) // name is the same.
if err != nil { var n *structs.Node
return fmt.Errorf("node name lookup failed: %s", err) if node.ID != "" {
existing, err := tx.First("nodes", "uuid", string(node.ID))
if err != nil {
return fmt.Errorf("node lookup failed: %s", err)
}
if existing != nil {
n = existing.(*structs.Node)
if n.Node != node.Node {
return fmt.Errorf("node ID %q for node %q aliases existing node %q",
node.ID, node.Node, n.Node)
}
}
} }
// Get the indexes // Check for an existing node by name to support nodes with no IDs.
if existing != nil { if n == nil {
node.CreateIndex = existing.(*structs.Node).CreateIndex existing, err := tx.First("nodes", "id", node.Node)
if err != nil {
return fmt.Errorf("node name lookup failed: %s", err)
}
if existing != nil {
n = existing.(*structs.Node)
}
}
// Get the indexes.
if n != nil {
node.CreateIndex = n.CreateIndex
node.ModifyIndex = idx node.ModifyIndex = idx
} else { } else {
node.CreateIndex = idx node.CreateIndex = idx
node.ModifyIndex = idx node.ModifyIndex = idx
} }
// Insert the node and update the index // Insert the node and update the index.
if err := tx.Insert("nodes", node); err != nil { if err := tx.Insert("nodes", node); err != nil {
return fmt.Errorf("failed inserting node: %s", err) return fmt.Errorf("failed inserting node: %s", err)
} }

View File

@ -4,6 +4,7 @@ import (
"fmt" "fmt"
"reflect" "reflect"
"sort" "sort"
"strings"
"testing" "testing"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
@ -419,6 +420,23 @@ func TestStateStore_EnsureNode(t *testing.T) {
if idx != 3 { if idx != 3 {
t.Fatalf("bad index: %d", idx) t.Fatalf("bad index: %d", idx)
} }
// Add an ID to the node
in.ID = types.NodeID("cda916bc-a357-4a19-b886-59419fcee50c")
if err := s.EnsureNode(4, in); err != nil {
t.Fatalf("err: %v", err)
}
// Now try to add another node with the same ID
in = &structs.Node{
Node: "nope",
ID: types.NodeID("cda916bc-a357-4a19-b886-59419fcee50c"),
Address: "1.2.3.4",
}
err = s.EnsureNode(5, in)
if err == nil || !strings.Contains(err.Error(), "aliases existing node") {
t.Fatalf("err: %v", err)
}
} }
func TestStateStore_GetNodes(t *testing.T) { func TestStateStore_GetNodes(t *testing.T) {

View File

@ -25,6 +25,7 @@ import (
"strings" "strings"
"github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-uuid"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
@ -55,6 +56,7 @@ type TestAddressConfig struct {
// TestServerConfig is the main server configuration struct. // TestServerConfig is the main server configuration struct.
type TestServerConfig struct { type TestServerConfig struct {
NodeName string `json:"node_name"` NodeName string `json:"node_name"`
NodeID string `json:"node_id"`
NodeMeta map[string]string `json:"node_meta,omitempty"` NodeMeta map[string]string `json:"node_meta,omitempty"`
Performance *TestPerformanceConfig `json:"performance,omitempty"` Performance *TestPerformanceConfig `json:"performance,omitempty"`
Bootstrap bool `json:"bootstrap,omitempty"` Bootstrap bool `json:"bootstrap,omitempty"`
@ -83,8 +85,14 @@ type ServerConfigCallback func(c *TestServerConfig)
// defaultServerConfig returns a new TestServerConfig struct // defaultServerConfig returns a new TestServerConfig struct
// with all of the listen ports incremented by one. // with all of the listen ports incremented by one.
func defaultServerConfig() *TestServerConfig { func defaultServerConfig() *TestServerConfig {
nodeID, err := uuid.GenerateUUID()
if err != nil {
panic(err)
}
return &TestServerConfig{ return &TestServerConfig{
NodeName: fmt.Sprintf("node%d", randomPort()), NodeName: fmt.Sprintf("node%d", randomPort()),
NodeID: nodeID,
DisableCheckpoint: true, DisableCheckpoint: true,
Performance: &TestPerformanceConfig{ Performance: &TestPerformanceConfig{
RaftMultiplier: 1, RaftMultiplier: 1,