mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 13:55:55 +00:00
Switch state store to MDBTables
This commit is contained in:
parent
f3788b0472
commit
7e84a75563
@ -1,7 +1,6 @@
|
|||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"github.com/armon/gomdb"
|
"github.com/armon/gomdb"
|
||||||
"github.com/hashicorp/consul/consul/structs"
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
@ -10,10 +9,9 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
dbNodes = "nodes" // Maps node -> addr
|
dbNodes = "nodes"
|
||||||
dbServices = "services" // Maps node||servId -> structs.NodeService
|
dbServices = "services"
|
||||||
dbServiceIndex = "serviceIndex" // Maps serv||tag||node||servId -> structs.ServiceNode
|
dbMaxMapSize = 1024 * 1024 * 1024 // 1GB maximum size
|
||||||
dbMaxMapSize = 1024 * 1024 * 1024 // 1GB maximum size
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -28,15 +26,18 @@ var (
|
|||||||
// implementation uses the Lightning Memory-Mapped Database (MDB).
|
// implementation uses the Lightning Memory-Mapped Database (MDB).
|
||||||
// This gives us Multi-Version Concurrency Control for "free"
|
// This gives us Multi-Version Concurrency Control for "free"
|
||||||
type StateStore struct {
|
type StateStore struct {
|
||||||
path string
|
path string
|
||||||
env *mdb.Env
|
env *mdb.Env
|
||||||
|
nodeTable *MDBTable
|
||||||
|
serviceTable *MDBTable
|
||||||
|
tables MDBTables
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateSnapshot is used to provide a point-in-time snapshot
|
// StateSnapshot is used to provide a point-in-time snapshot
|
||||||
// It works by starting a readonly transaction against all tables.
|
// It works by starting a readonly transaction against all tables.
|
||||||
type StateSnapshot struct {
|
type StateSnapshot struct {
|
||||||
tx *mdb.Txn
|
store *StateStore
|
||||||
dbis []mdb.DBI
|
tx *MDBTxn
|
||||||
}
|
}
|
||||||
|
|
||||||
// Close is used to abort the transaction and allow for cleanup
|
// Close is used to abort the transaction and allow for cleanup
|
||||||
@ -100,349 +101,210 @@ func (s *StateStore) initialize() error {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create all the tables
|
// Setup our tables
|
||||||
tx, _, err := s.startTxn(false, dbNodes, dbServices, dbServiceIndex)
|
s.nodeTable = &MDBTable{
|
||||||
if err != nil {
|
Env: s.env,
|
||||||
tx.Abort()
|
Name: dbNodes,
|
||||||
|
Indexes: map[string]*MDBIndex{
|
||||||
|
"id": &MDBIndex{
|
||||||
|
Unique: true,
|
||||||
|
Fields: []string{"Node"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Encoder: func(obj interface{}) []byte {
|
||||||
|
buf, err := structs.Encode(255, obj)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return buf[1:]
|
||||||
|
},
|
||||||
|
Decoder: func(buf []byte) interface{} {
|
||||||
|
out := new(structs.Node)
|
||||||
|
if err := structs.Decode(buf, out); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := s.nodeTable.Init(); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return tx.Commit()
|
|
||||||
}
|
|
||||||
|
|
||||||
// startTxn is used to start a transaction and open all the associated sub-databases
|
s.serviceTable = &MDBTable{
|
||||||
func (s *StateStore) startTxn(readonly bool, open ...string) (*mdb.Txn, []mdb.DBI, error) {
|
Env: s.env,
|
||||||
var txFlags uint = 0
|
Name: dbServices,
|
||||||
var dbFlags uint = 0
|
Indexes: map[string]*MDBIndex{
|
||||||
if readonly {
|
"id": &MDBIndex{
|
||||||
txFlags |= mdb.RDONLY
|
Unique: true,
|
||||||
} else {
|
Fields: []string{"Node", "ServiceID"},
|
||||||
dbFlags |= mdb.CREATE
|
},
|
||||||
|
"service": &MDBIndex{
|
||||||
|
AllowBlank: true,
|
||||||
|
Fields: []string{"ServiceName", "ServiceTag"},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
Encoder: func(obj interface{}) []byte {
|
||||||
|
buf, err := structs.Encode(255, obj)
|
||||||
|
if err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return buf[1:]
|
||||||
|
},
|
||||||
|
Decoder: func(buf []byte) interface{} {
|
||||||
|
out := new(structs.ServiceNode)
|
||||||
|
if err := structs.Decode(buf, out); err != nil {
|
||||||
|
panic(err)
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := s.serviceTable.Init(); err != nil {
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
tx, err := s.env.BeginTxn(nil, txFlags)
|
// Store the set of tables
|
||||||
if err != nil {
|
s.tables = []*MDBTable{s.nodeTable, s.serviceTable}
|
||||||
return nil, nil, err
|
return nil
|
||||||
}
|
|
||||||
|
|
||||||
var dbs []mdb.DBI
|
|
||||||
for _, name := range open {
|
|
||||||
dbi, err := tx.DBIOpen(name, dbFlags)
|
|
||||||
if err != nil {
|
|
||||||
tx.Abort()
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
dbs = append(dbs, dbi)
|
|
||||||
}
|
|
||||||
|
|
||||||
return tx, dbs, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnsureNode is used to ensure a given node exists, with the provided address
|
// EnsureNode is used to ensure a given node exists, with the provided address
|
||||||
func (s *StateStore) EnsureNode(name string, address string) error {
|
func (s *StateStore) EnsureNode(node structs.Node) error {
|
||||||
tx, dbis, err := s.startTxn(false, dbNodes)
|
return s.nodeTable.Insert(node)
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
defer tx.Abort()
|
|
||||||
|
|
||||||
if err := tx.Put(dbis[0], encNull(name), encNull(address), 0); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return tx.Commit()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNode returns all the address of the known and if it was found
|
// GetNode returns all the address of the known and if it was found
|
||||||
func (s *StateStore) GetNode(name string) (bool, string) {
|
func (s *StateStore) GetNode(name string) (bool, string) {
|
||||||
tx, dbis, err := s.startTxn(true, dbNodes)
|
res, err := s.nodeTable.Get("id", name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node: %v", err))
|
panic(fmt.Errorf("Failed to get node: %v", err))
|
||||||
}
|
}
|
||||||
defer tx.Abort()
|
if len(res) == 0 {
|
||||||
|
|
||||||
val, err := tx.Get(dbis[0], []byte(name))
|
|
||||||
if err == mdb.NotFound {
|
|
||||||
return false, ""
|
return false, ""
|
||||||
} else if err != nil {
|
|
||||||
panic(fmt.Errorf("Failed to get node: %v", err))
|
|
||||||
}
|
}
|
||||||
return true, decNull(sliceCopy(val))
|
return true, res[0].(*structs.Node).Address
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetNodes returns all the known nodes, the slice alternates between
|
// GetNodes returns all the known nodes, the slice alternates between
|
||||||
// the node name and address
|
// the node name and address
|
||||||
func (s *StateStore) Nodes() []string {
|
func (s *StateStore) Nodes() structs.Nodes {
|
||||||
tx, dbis, err := s.startTxn(true, dbNodes)
|
res, err := s.nodeTable.Get("id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get nodes: %v", err))
|
panic(fmt.Errorf("Failed to get nodes: %v", err))
|
||||||
}
|
}
|
||||||
defer tx.Abort()
|
results := make([]structs.Node, len(res))
|
||||||
|
for i, r := range res {
|
||||||
cursor, err := tx.CursorOpen(dbis[0])
|
results[i] = *r.(*structs.Node)
|
||||||
if err != nil {
|
|
||||||
panic(fmt.Errorf("Failed to get nodes: %v", err))
|
|
||||||
}
|
}
|
||||||
|
return results
|
||||||
var nodes []string
|
|
||||||
for {
|
|
||||||
key, val, err := cursor.Get(nil, mdb.NEXT)
|
|
||||||
if err == mdb.NotFound {
|
|
||||||
break
|
|
||||||
} else if err != nil {
|
|
||||||
panic(fmt.Errorf("Failed to get nodes: %v", err))
|
|
||||||
}
|
|
||||||
nodes = append(nodes, decNull(sliceCopy(key)), decNull(sliceCopy(val)))
|
|
||||||
}
|
|
||||||
return nodes
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// EnsureService is used to ensure a given node exposes a service
|
// EnsureService is used to ensure a given node exposes a service
|
||||||
func (s *StateStore) EnsureService(name, id, service, tag string, port int) error {
|
func (s *StateStore) EnsureService(name, id, service, tag string, port int) error {
|
||||||
// Start a txn
|
// Ensure the node exists
|
||||||
tx, dbis, err := s.startTxn(false, dbNodes, dbServices, dbServiceIndex)
|
res, err := s.nodeTable.Get("id", name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer tx.Abort()
|
if len(res) == 0 {
|
||||||
nodes := dbis[0]
|
return fmt.Errorf("Missing node registration")
|
||||||
services := dbis[1]
|
|
||||||
index := dbis[2]
|
|
||||||
|
|
||||||
// Get the existing services
|
|
||||||
existing := filterNodeServices(tx, services, name)
|
|
||||||
|
|
||||||
// Get the node
|
|
||||||
addr, err := tx.Get(nodes, []byte(name))
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update the service entry
|
// Create the entry
|
||||||
key := []byte(fmt.Sprintf("%s||%s", name, id))
|
entry := structs.ServiceNode{
|
||||||
nService := structs.NodeService{
|
|
||||||
ID: id,
|
|
||||||
Service: service,
|
|
||||||
Tag: tag,
|
|
||||||
Port: port,
|
|
||||||
}
|
|
||||||
val, err := structs.Encode(255, &nService)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := tx.Put(services, key, val, 0); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove previous entry if any
|
|
||||||
if exist, ok := existing.Services[id]; ok {
|
|
||||||
key := []byte(fmt.Sprintf("%s||%s||%s||%s", service, exist.Tag, name, id))
|
|
||||||
if err := tx.Del(index, key, nil); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Update the index entry
|
|
||||||
key = []byte(fmt.Sprintf("%s||%s||%s||%s", service, tag, name, id))
|
|
||||||
node := structs.ServiceNode{
|
|
||||||
Node: name,
|
Node: name,
|
||||||
Address: string(addr),
|
|
||||||
ServiceID: id,
|
ServiceID: id,
|
||||||
|
ServiceName: service,
|
||||||
ServiceTag: tag,
|
ServiceTag: tag,
|
||||||
ServicePort: port,
|
ServicePort: port,
|
||||||
}
|
}
|
||||||
val, err = structs.Encode(255, &node)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if err := tx.Put(index, key, val, 0); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return tx.Commit()
|
// Ensure the service entry is set
|
||||||
|
return s.serviceTable.Insert(&entry)
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeServices is used to return all the services of a given node
|
// NodeServices is used to return all the services of a given node
|
||||||
func (s *StateStore) NodeServices(name string) *structs.NodeServices {
|
func (s *StateStore) NodeServices(name string) *structs.NodeServices {
|
||||||
tx, dbis, err := s.startTxn(true, dbNodes, dbServices)
|
tx, err := s.tables.StartTxn(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
||||||
}
|
}
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
ns := filterNodeServices(tx, dbis[1], name)
|
return s.parseNodeServices(tx, name)
|
||||||
|
}
|
||||||
|
|
||||||
// Get the address of the ndoe
|
// parseNodeServices is used to get the services belonging to a
|
||||||
val, err := tx.Get(dbis[0], []byte(name))
|
// node, using a given txn
|
||||||
if err == mdb.NotFound {
|
func (s *StateStore) parseNodeServices(tx *MDBTxn, name string) *structs.NodeServices {
|
||||||
return ns
|
ns := &structs.NodeServices{
|
||||||
} else if err != nil {
|
Services: make(map[string]*structs.NodeService),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get the node first
|
||||||
|
res, err := s.nodeTable.GetTxn(tx, "id", name)
|
||||||
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node: %v", err))
|
panic(fmt.Errorf("Failed to get node: %v", err))
|
||||||
}
|
}
|
||||||
ns.Address = decNull(sliceCopy(val))
|
if len(res) == 0 {
|
||||||
|
return ns
|
||||||
|
}
|
||||||
|
|
||||||
return ns
|
// Set the address
|
||||||
}
|
node := res[0].(*structs.Node)
|
||||||
|
ns.Address = node.Address
|
||||||
|
|
||||||
// filterNodeServices is used to filter the services to a specific node
|
// Get the services
|
||||||
func filterNodeServices(tx *mdb.Txn, services mdb.DBI, name string) *structs.NodeServices {
|
res, err = s.serviceTable.GetTxn(tx, "id", name)
|
||||||
keyPrefix := []byte(fmt.Sprintf("%s||", name))
|
|
||||||
return parseNodeServices(tx, services, keyPrefix)
|
|
||||||
}
|
|
||||||
|
|
||||||
// parseNodeServices is used to parse the results of a queryNodeServices
|
|
||||||
func parseNodeServices(tx *mdb.Txn, dbi mdb.DBI, prefix []byte) *structs.NodeServices {
|
|
||||||
// Create the cursor
|
|
||||||
cursor, err := tx.CursorOpen(dbi)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get nodes: %v", err))
|
panic(fmt.Errorf("Failed to get node: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
ns := &structs.NodeServices{
|
// Add each service
|
||||||
Services: make(map[string]structs.NodeService),
|
for _, r := range res {
|
||||||
}
|
service := r.(*structs.ServiceNode)
|
||||||
var id string
|
srv := &structs.NodeService{
|
||||||
var entry structs.NodeService
|
ID: service.ServiceID,
|
||||||
var key, val []byte
|
Service: service.ServiceName,
|
||||||
first := true
|
Tag: service.ServiceTag,
|
||||||
|
Port: service.ServicePort,
|
||||||
for {
|
|
||||||
if first {
|
|
||||||
first = false
|
|
||||||
key, val, err = cursor.Get(prefix, mdb.SET_RANGE)
|
|
||||||
} else {
|
|
||||||
key, val, err = cursor.Get(nil, mdb.NEXT)
|
|
||||||
}
|
}
|
||||||
if err == mdb.NotFound {
|
ns.Services[srv.ID] = srv
|
||||||
break
|
|
||||||
} else if err != nil {
|
|
||||||
panic(fmt.Errorf("Failed to get node services: %v", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Bail if this does not match our filter
|
|
||||||
if !bytes.HasPrefix(key, prefix) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// Split to get service name
|
|
||||||
parts := bytes.SplitN(sliceCopy(key), []byte("||"), 2)
|
|
||||||
id = string(parts[1])
|
|
||||||
|
|
||||||
// Setup the entry
|
|
||||||
if val[0] != 255 {
|
|
||||||
panic(fmt.Errorf("Bad service value: %v", val))
|
|
||||||
}
|
|
||||||
if err := structs.Decode(val[1:], &entry); err != nil {
|
|
||||||
panic(fmt.Errorf("Failed to get node services: %v", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
// Add to the map
|
|
||||||
ns.Services[id] = entry
|
|
||||||
}
|
}
|
||||||
return ns
|
return ns
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteNodeService is used to delete a node service
|
// DeleteNodeService is used to delete a node service
|
||||||
func (s *StateStore) DeleteNodeService(node, id string) error {
|
func (s *StateStore) DeleteNodeService(node, id string) error {
|
||||||
tx, dbis, err := s.startTxn(false, dbServices, dbServiceIndex)
|
_, err := s.serviceTable.Delete("id", node, id)
|
||||||
if err != nil {
|
return err
|
||||||
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
|
||||||
}
|
|
||||||
defer tx.Abort()
|
|
||||||
services := dbis[0]
|
|
||||||
index := dbis[1]
|
|
||||||
|
|
||||||
// Get the existing services
|
|
||||||
existing := filterNodeServices(tx, services, node)
|
|
||||||
exist, ok := existing.Services[id]
|
|
||||||
|
|
||||||
// Bail if no existing entry
|
|
||||||
if !ok {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete the node service entry
|
|
||||||
key := []byte(fmt.Sprintf("%s||%s", node, id))
|
|
||||||
if err = tx.Del(services, key, nil); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete the sevice index entry
|
|
||||||
key = []byte(fmt.Sprintf("%s||%s||%s||%s", exist.Service, exist.Tag, node, id))
|
|
||||||
if err := tx.Del(index, key, nil); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return tx.Commit()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// DeleteNode is used to delete a node and all it's services
|
// DeleteNode is used to delete a node and all it's services
|
||||||
func (s *StateStore) DeleteNode(node string) error {
|
func (s *StateStore) DeleteNode(node string) error {
|
||||||
tx, dbis, err := s.startTxn(false, dbNodes, dbServices, dbServiceIndex)
|
if _, err := s.serviceTable.Delete("id", node); err != nil {
|
||||||
if err != nil {
|
|
||||||
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
|
||||||
}
|
|
||||||
defer tx.Abort()
|
|
||||||
nodes := dbis[0]
|
|
||||||
services := dbis[1]
|
|
||||||
index := dbis[2]
|
|
||||||
|
|
||||||
// Delete the node
|
|
||||||
err = tx.Del(nodes, []byte(node), nil)
|
|
||||||
if err == mdb.NotFound {
|
|
||||||
err = nil
|
|
||||||
} else if err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
if _, err := s.nodeTable.Delete("id", node); err != nil {
|
||||||
// Get the existing services
|
return err
|
||||||
existing := filterNodeServices(tx, services, node)
|
|
||||||
|
|
||||||
// Nuke all the services
|
|
||||||
for id, entry := range existing.Services {
|
|
||||||
// Delete the node service entry
|
|
||||||
key := []byte(fmt.Sprintf("%s||%s", node, id))
|
|
||||||
if err = tx.Del(services, key, nil); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete the sevice index entry
|
|
||||||
key = []byte(fmt.Sprintf("%s||%s||%s||%s", entry.Service, entry.Tag, node, id))
|
|
||||||
if err := tx.Del(index, key, nil); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
return tx.Commit()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Services is used to return all the services with a list of associated tags
|
// Services is used to return all the services with a list of associated tags
|
||||||
func (s *StateStore) Services() map[string][]string {
|
func (s *StateStore) Services() map[string][]string {
|
||||||
tx, dbis, err := s.startTxn(true, dbServiceIndex)
|
// TODO: Optimize to not table scan.. We can do a distinct
|
||||||
|
// type of query to avoid this
|
||||||
|
res, err := s.serviceTable.Get("id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
||||||
}
|
}
|
||||||
defer tx.Abort()
|
|
||||||
index := dbis[0]
|
|
||||||
|
|
||||||
cursor, err := tx.CursorOpen(index)
|
|
||||||
if err != nil {
|
|
||||||
panic(fmt.Errorf("Failed to get services: %v", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
services := make(map[string][]string)
|
services := make(map[string][]string)
|
||||||
for {
|
for _, r := range res {
|
||||||
key, _, err := cursor.Get(nil, mdb.NEXT)
|
srv := r.(*structs.ServiceNode)
|
||||||
if err == mdb.NotFound {
|
|
||||||
break
|
|
||||||
} else if err != nil {
|
|
||||||
panic(fmt.Errorf("Failed to get services: %v", err))
|
|
||||||
}
|
|
||||||
parts := bytes.SplitN(sliceCopy(key), []byte("||"), 3)
|
|
||||||
service := string(parts[0])
|
|
||||||
tag := string(parts[1])
|
|
||||||
|
|
||||||
tags := services[service]
|
tags := services[srv.ServiceName]
|
||||||
if !strContains(tags, tag) {
|
if !strContains(tags, srv.ServiceTag) {
|
||||||
tags = append(tags, tag)
|
tags = append(tags, srv.ServiceTag)
|
||||||
services[service] = tags
|
services[srv.ServiceName] = tags
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return services
|
return services
|
||||||
@ -450,142 +312,83 @@ func (s *StateStore) Services() map[string][]string {
|
|||||||
|
|
||||||
// ServiceNodes returns the nodes associated with a given service
|
// ServiceNodes returns the nodes associated with a given service
|
||||||
func (s *StateStore) ServiceNodes(service string) structs.ServiceNodes {
|
func (s *StateStore) ServiceNodes(service string) structs.ServiceNodes {
|
||||||
tx, dbis, err := s.startTxn(true, dbServiceIndex)
|
tx, err := s.tables.StartTxn(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
||||||
}
|
}
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
prefix := []byte(fmt.Sprintf("%s||", service))
|
|
||||||
return parseServiceNodes(tx, dbis[0], prefix)
|
res, err := s.serviceTable.Get("service", service)
|
||||||
|
return parseServiceNodes(tx, s.nodeTable, res, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServiceTagNodes returns the nodes associated with a given service matching a tag
|
// ServiceTagNodes returns the nodes associated with a given service matching a tag
|
||||||
func (s *StateStore) ServiceTagNodes(service, tag string) structs.ServiceNodes {
|
func (s *StateStore) ServiceTagNodes(service, tag string) structs.ServiceNodes {
|
||||||
tx, dbis, err := s.startTxn(true, dbServiceIndex)
|
tx, err := s.tables.StartTxn(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node servicess: %v", err))
|
panic(fmt.Errorf("Failed to start txn: %v", err))
|
||||||
}
|
}
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
prefix := []byte(fmt.Sprintf("%s||%s||", service, tag))
|
|
||||||
return parseServiceNodes(tx, dbis[0], prefix)
|
res, err := s.serviceTable.Get("service", service, tag)
|
||||||
|
return parseServiceNodes(tx, s.nodeTable, res, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// parseServiceNodes parses results ServiceNodes and ServiceTagNodes
|
// parseServiceNodes parses results ServiceNodes and ServiceTagNodes
|
||||||
func parseServiceNodes(tx *mdb.Txn, index mdb.DBI, prefix []byte) structs.ServiceNodes {
|
func parseServiceNodes(tx *MDBTxn, table *MDBTable, res []interface{}, err error) structs.ServiceNodes {
|
||||||
cursor, err := tx.CursorOpen(index)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get node services: %v", err))
|
panic(fmt.Errorf("Failed to get node services: %v", err))
|
||||||
}
|
}
|
||||||
|
println(fmt.Sprintf("res: %#v", res))
|
||||||
|
|
||||||
var nodes structs.ServiceNodes
|
nodes := make(structs.ServiceNodes, len(res))
|
||||||
var node structs.ServiceNode
|
for i, r := range res {
|
||||||
var key, val []byte
|
srv := r.(*structs.ServiceNode)
|
||||||
first := true
|
|
||||||
|
|
||||||
for {
|
// Get the address of the node
|
||||||
if first {
|
nodeRes, err := table.GetTxn(tx, "id", srv.Node)
|
||||||
first = false
|
if err != nil || len(nodeRes) != 1 {
|
||||||
key, val, err = cursor.Get(prefix, mdb.SET_RANGE)
|
panic(fmt.Errorf("Failed to join node: %v", err))
|
||||||
} else {
|
|
||||||
key, val, err = cursor.Get(nil, mdb.NEXT)
|
|
||||||
}
|
|
||||||
if err == mdb.NotFound {
|
|
||||||
break
|
|
||||||
} else if err != nil {
|
|
||||||
panic(fmt.Errorf("Failed to get node services: %v", err))
|
|
||||||
}
|
}
|
||||||
|
srv.Address = nodeRes[0].(*structs.Node).Address
|
||||||
|
|
||||||
// Bail if this does not match our filter
|
nodes[i] = *srv
|
||||||
if !bytes.HasPrefix(key, prefix) {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
|
|
||||||
// Setup the node
|
|
||||||
if val[0] != 255 {
|
|
||||||
panic(fmt.Errorf("Bad service value: %v", val))
|
|
||||||
}
|
|
||||||
if err := structs.Decode(val[1:], &node); err != nil {
|
|
||||||
panic(fmt.Errorf("Failed to get node services: %v", err))
|
|
||||||
}
|
|
||||||
|
|
||||||
nodes = append(nodes, node)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return nodes
|
return nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
// Snapshot is used to create a point in time snapshot
|
// Snapshot is used to create a point in time snapshot
|
||||||
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
|
func (s *StateStore) Snapshot() (*StateSnapshot, error) {
|
||||||
// Begin a new txn
|
// Begin a new txn on all tables
|
||||||
tx, dbis, err := s.startTxn(true, dbNodes, dbServices, dbServiceIndex)
|
tx, err := s.tables.StartTxn(true)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
tx.Abort()
|
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return the snapshot
|
// Return the snapshot
|
||||||
snap := &StateSnapshot{
|
snap := &StateSnapshot{
|
||||||
tx: tx,
|
store: s,
|
||||||
dbis: dbis,
|
tx: tx,
|
||||||
}
|
}
|
||||||
return snap, nil
|
return snap, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Nodes returns all the known nodes, the slice alternates between
|
// Nodes returns all the known nodes, the slice alternates between
|
||||||
// the node name and address
|
// the node name and address
|
||||||
func (s *StateSnapshot) Nodes() []string {
|
func (s *StateSnapshot) Nodes() structs.Nodes {
|
||||||
cursor, err := s.tx.CursorOpen(s.dbis[0])
|
res, err := s.store.nodeTable.GetTxn(s.tx, "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Failed to get nodes: %v", err))
|
panic(fmt.Errorf("Failed to get nodes: %v", err))
|
||||||
}
|
}
|
||||||
|
results := make([]structs.Node, len(res))
|
||||||
var nodes []string
|
for i, r := range res {
|
||||||
for {
|
results[i] = *r.(*structs.Node)
|
||||||
key, val, err := cursor.Get(nil, mdb.NEXT)
|
|
||||||
if err == mdb.NotFound {
|
|
||||||
break
|
|
||||||
} else if err != nil {
|
|
||||||
panic(fmt.Errorf("Failed to get nodes: %v", err))
|
|
||||||
}
|
|
||||||
nodes = append(nodes, decNull(sliceCopy(key)), decNull(sliceCopy(val)))
|
|
||||||
}
|
}
|
||||||
return nodes
|
return results
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeServices is used to return all the services of a given node
|
// NodeServices is used to return all the services of a given node
|
||||||
func (s *StateSnapshot) NodeServices(name string) *structs.NodeServices {
|
func (s *StateSnapshot) NodeServices(name string) *structs.NodeServices {
|
||||||
// Get the node services
|
return s.store.parseNodeServices(s.tx, name)
|
||||||
ns := filterNodeServices(s.tx, s.dbis[1], name)
|
|
||||||
|
|
||||||
// Get the address of the node
|
|
||||||
val, err := s.tx.Get(s.dbis[0], []byte(name))
|
|
||||||
if err == mdb.NotFound {
|
|
||||||
return ns
|
|
||||||
} else if err != nil {
|
|
||||||
panic(fmt.Errorf("Failed to get node: %v", err))
|
|
||||||
}
|
|
||||||
ns.Address = decNull(sliceCopy(val))
|
|
||||||
return ns
|
|
||||||
}
|
|
||||||
|
|
||||||
// copies a slice to prevent access to lmdb private data
|
|
||||||
func sliceCopy(in []byte) []byte {
|
|
||||||
c := make([]byte, len(in))
|
|
||||||
copy(c, in)
|
|
||||||
return c
|
|
||||||
}
|
|
||||||
|
|
||||||
// encodes a potentially empty string using a sentinel
|
|
||||||
func encNull(s string) []byte {
|
|
||||||
if s == "" {
|
|
||||||
return nullSentinel
|
|
||||||
}
|
|
||||||
return []byte(s)
|
|
||||||
}
|
|
||||||
|
|
||||||
// decodes the potential sentinel to an empty string
|
|
||||||
func decNull(s []byte) string {
|
|
||||||
if bytes.Compare(s, nullSentinel) == 0 {
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
return string(s)
|
|
||||||
}
|
}
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
package consul
|
package consul
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"github.com/hashicorp/consul/consul/structs"
|
||||||
"sort"
|
"sort"
|
||||||
"testing"
|
"testing"
|
||||||
)
|
)
|
||||||
@ -12,7 +13,7 @@ func TestEnsureNode(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
|
|
||||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -21,7 +22,7 @@ func TestEnsureNode(t *testing.T) {
|
|||||||
t.Fatalf("Bad: %v %v", found, addr)
|
t.Fatalf("Bad: %v %v", found, addr)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := store.EnsureNode("foo", "127.0.0.2"); err != nil {
|
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.2"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -38,19 +39,19 @@ func TestGetNodes(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
|
|
||||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := store.EnsureNode("bar", "127.0.0.2"); err != nil {
|
if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
nodes := store.Nodes()
|
nodes := store.Nodes()
|
||||||
if len(nodes) != 4 {
|
if len(nodes) != 2 {
|
||||||
t.Fatalf("Bad: %v", nodes)
|
t.Fatalf("Bad: %v", nodes)
|
||||||
}
|
}
|
||||||
if nodes[2] != "foo" && nodes[0] != "bar" {
|
if nodes[1].Node != "foo" && nodes[0].Node != "bar" {
|
||||||
t.Fatalf("Bad: %v", nodes)
|
t.Fatalf("Bad: %v", nodes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -62,7 +63,7 @@ func TestEnsureService(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
|
|
||||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -104,7 +105,7 @@ func TestEnsureService_DuplicateNode(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
|
|
||||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -154,7 +155,7 @@ func TestDeleteNodeService(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
|
|
||||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -180,7 +181,7 @@ func TestDeleteNodeService_One(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
|
|
||||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -214,7 +215,7 @@ func TestDeleteNode(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
|
|
||||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -245,11 +246,11 @@ func TestGetServices(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
|
|
||||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := store.EnsureNode("bar", "127.0.0.2"); err != nil {
|
if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -292,11 +293,11 @@ func TestServiceNodes(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
|
|
||||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := store.EnsureNode("bar", "127.0.0.2"); err != nil {
|
if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -380,11 +381,11 @@ func TestServiceTagNodes(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
|
|
||||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := store.EnsureNode("bar", "127.0.0.2"); err != nil {
|
if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -425,11 +426,11 @@ func TestStoreSnapshot(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer store.Close()
|
defer store.Close()
|
||||||
|
|
||||||
if err := store.EnsureNode("foo", "127.0.0.1"); err != nil {
|
if err := store.EnsureNode(structs.Node{"foo", "127.0.0.1"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := store.EnsureNode("bar", "127.0.0.2"); err != nil {
|
if err := store.EnsureNode(structs.Node{"bar", "127.0.0.2"}); err != nil {
|
||||||
t.Fatalf("err: %v")
|
t.Fatalf("err: %v")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -454,7 +455,7 @@ func TestStoreSnapshot(t *testing.T) {
|
|||||||
|
|
||||||
// Check snapshot has old values
|
// Check snapshot has old values
|
||||||
nodes := snap.Nodes()
|
nodes := snap.Nodes()
|
||||||
if len(nodes) != 4 {
|
if len(nodes) != 2 {
|
||||||
t.Fatalf("bad: %v", nodes)
|
t.Fatalf("bad: %v", nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -479,13 +480,13 @@ func TestStoreSnapshot(t *testing.T) {
|
|||||||
if err := store.EnsureService("bar", "db", "db", "master", 8000); err != nil {
|
if err := store.EnsureService("bar", "db", "db", "master", 8000); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
if err := store.EnsureNode("baz", "127.0.0.3"); err != nil {
|
if err := store.EnsureNode(structs.Node{"baz", "127.0.0.3"}); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check snapshot has old values
|
// Check snapshot has old values
|
||||||
nodes = snap.Nodes()
|
nodes = snap.Nodes()
|
||||||
if len(nodes) != 4 {
|
if len(nodes) != 2 {
|
||||||
t.Fatalf("bad: %v", nodes)
|
t.Fatalf("bad: %v", nodes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user