Adding state store methods

This commit is contained in:
Armon Dadgar 2013-12-11 14:27:27 -08:00
parent 3b8888bdc8
commit 05d5eb08a8
2 changed files with 132 additions and 10 deletions

View File

@ -10,9 +10,19 @@ type namedQuery uint8
const ( const (
queryEnsureNode namedQuery = iota queryEnsureNode namedQuery = iota
queryGetNodes queryNode
queryNodes
queryEnsureService
queryNodeServices
) )
// NoodeServices maps the Service name to a tag and port
type ServiceEntry struct {
Tag string
Port int
}
type NodeServices map[string]ServiceEntry
// The StateStore is responsible for maintaining all the Consul // The StateStore is responsible for maintaining all the Consul
// state. It is manipulated by the FSM which maintains consistency // state. It is manipulated by the FSM which maintains consistency
// through the use of Raft. The goals of the StateStore are to provide // through the use of Raft. The goals of the StateStore are to provide
@ -78,8 +88,11 @@ func (s *StateStore) initialize() error {
// Prepare the queries // Prepare the queries
queries := map[namedQuery]string{ queries := map[namedQuery]string{
queryEnsureNode: "INSERT OR REPLACE INTO nodes (name, address) VALUES (?, ?)", queryEnsureNode: "INSERT OR REPLACE INTO nodes (name, address) VALUES (?, ?)",
queryGetNodes: "SELECT * FROM nodes", queryNode: "SELECT address FROM nodes where name=?",
queryNodes: "SELECT * FROM nodes",
queryEnsureService: "INSERT OR REPLACE INTO services (node, service, tag, port) VALUES (?, ?, ?, ?)",
queryNodeServices: "SELECT service, tag, port from services where node=?",
} }
for name, query := range queries { for name, query := range queries {
stmt, err := s.db.Prepare(query) stmt, err := s.db.Prepare(query)
@ -111,10 +124,26 @@ func (s *StateStore) EnsureNode(name string, address string) error {
return s.checkSet(stmt.Exec(name, address)) return s.checkSet(stmt.Exec(name, address))
} }
// GetNode returns all the address of the known and if it was found
func (s *StateStore) GetNode(name string) (bool, string) {
stmt := s.prepared[queryNode]
row := stmt.QueryRow(name)
var addr string
if err := row.Scan(&addr); err != nil {
if err == sql.ErrNoRows {
return false, addr
} else {
panic(fmt.Errorf("Failed to get node: %v", err))
}
}
return true, addr
}
// GetNodes returns all the known nodes, the slice alternates between // GetNodes returns all the known nodes, the slice alternates between
// the node name and address // the node name and address
func (s *StateStore) GetNodes() []string { func (s *StateStore) Nodes() []string {
stmt := s.prepared[queryGetNodes] stmt := s.prepared[queryNodes]
rows, err := stmt.Query() rows, err := stmt.Query()
if err != nil { if err != nil {
panic(fmt.Errorf("Failed to get nodes: %v", err)) panic(fmt.Errorf("Failed to get nodes: %v", err))
@ -130,3 +159,30 @@ func (s *StateStore) GetNodes() []string {
} }
return data return data
} }
// EnsureService is used to ensure a given node exposes a service
func (s *StateStore) EnsureService(name, service, tag string, port int) error {
stmt := s.prepared[queryEnsureService]
return s.checkSet(stmt.Exec(name, service, tag, port))
}
// NodeServices is used to return all the services of a given node
func (s *StateStore) NodeServices(name string) NodeServices {
stmt := s.prepared[queryNodeServices]
rows, err := stmt.Query(name)
if err != nil {
panic(fmt.Errorf("Failed to get node services: %v", err))
}
services := NodeServices(make(map[string]ServiceEntry))
var service string
var entry ServiceEntry
for rows.Next() {
if err := rows.Scan(&service, &entry.Tag, &entry.Port); err != nil {
panic(fmt.Errorf("Failed to get node services: %v", err))
}
services[service] = entry
}
return services
}

View File

@ -15,17 +15,83 @@ func TestEnsureNode(t *testing.T) {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
nodes := store.GetNodes() found, addr := store.GetNode("foo")
if nodes[0] != "foo" || nodes[1] != "127.0.0.1" { if !found || addr != "127.0.0.1" {
t.Fatalf("Bad: %v", nodes) t.Fatalf("Bad: %v %v", found, addr)
} }
if err := store.EnsureNode("foo", "127.0.0.2"); err != nil { if err := store.EnsureNode("foo", "127.0.0.2"); err != nil {
t.Fatalf("err: %v") t.Fatalf("err: %v")
} }
nodes = store.GetNodes() found, addr = store.GetNode("foo")
if nodes[0] != "foo" || nodes[1] != "127.0.0.2" { if !found || addr != "127.0.0.2" {
t.Fatalf("Bad: %v %v", found, addr)
}
}
func TestGetNodes(t *testing.T) {
store, err := NewStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureNode("bar", "127.0.0.2"); err != nil {
t.Fatalf("err: %v")
}
nodes := store.Nodes()
if len(nodes) != 4 {
t.Fatalf("Bad: %v", nodes)
}
if nodes[0] != "foo" && nodes[2] != "bar" {
t.Fatalf("Bad: %v", nodes) t.Fatalf("Bad: %v", nodes)
} }
} }
func TestEnsureService(t *testing.T) {
store, err := NewStateStore()
if err != nil {
t.Fatalf("err: %v", err)
}
defer store.Close()
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "api", "", 5000); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "api", "", 5001); err != nil {
t.Fatalf("err: %v")
}
if err := store.EnsureService("foo", "db", "master", 8000); err != nil {
t.Fatalf("err: %v")
}
services := store.NodeServices("foo")
entry, ok := services["api"]
if !ok {
t.Fatalf("missing api: %#v", services)
}
if entry.Tag != "" || entry.Port != 5001 {
t.Fatalf("Bad entry: %#v", entry)
}
entry, ok = services["db"]
if !ok {
t.Fatalf("missing db: %#v", services)
}
if entry.Tag != "master" || entry.Port != 8000 {
t.Fatalf("Bad entry: %#v", entry)
}
}