mirror of https://github.com/status-im/consul.git
agent/consul/state: ConnectServiceNodes
This commit is contained in:
parent
7ed26e2c64
commit
2062e37270
|
@ -10,6 +10,10 @@ import (
|
||||||
"github.com/hashicorp/go-memdb"
|
"github.com/hashicorp/go-memdb"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
servicesTableName = "services"
|
||||||
|
)
|
||||||
|
|
||||||
// nodesTableSchema returns a new table schema used for storing node
|
// nodesTableSchema returns a new table schema used for storing node
|
||||||
// information.
|
// information.
|
||||||
func nodesTableSchema() *memdb.TableSchema {
|
func nodesTableSchema() *memdb.TableSchema {
|
||||||
|
@ -87,6 +91,15 @@ func servicesTableSchema() *memdb.TableSchema {
|
||||||
Lowercase: true,
|
Lowercase: true,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
"proxy_destination": &memdb.IndexSchema{
|
||||||
|
Name: "proxy_destination",
|
||||||
|
AllowMissing: true,
|
||||||
|
Unique: false,
|
||||||
|
Indexer: &memdb.StringFieldIndex{
|
||||||
|
Field: "ServiceProxyDestination",
|
||||||
|
Lowercase: true,
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -839,6 +852,39 @@ func (s *Store) ServiceTagNodes(ws memdb.WatchSet, service string, tag string) (
|
||||||
return idx, results, nil
|
return idx, results, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ConnectServiceNodes returns the nodes associated with a Connect
|
||||||
|
// compatible destination for the given service name. This will include
|
||||||
|
// both proxies and native integrations.
|
||||||
|
func (s *Store) ConnectServiceNodes(ws memdb.WatchSet, serviceName string) (uint64, structs.ServiceNodes, error) {
|
||||||
|
tx := s.db.Txn(false)
|
||||||
|
defer tx.Abort()
|
||||||
|
|
||||||
|
// Get the table index.
|
||||||
|
idx := maxIndexForService(tx, serviceName, false)
|
||||||
|
|
||||||
|
// Find all the proxies. When we support native integrations we'll have
|
||||||
|
// to perform another table lookup here.
|
||||||
|
services, err := tx.Get(servicesTableName, "proxy_destination", serviceName)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed service lookup: %s", err)
|
||||||
|
}
|
||||||
|
ws.Add(services.WatchCh())
|
||||||
|
|
||||||
|
// Store them
|
||||||
|
var results structs.ServiceNodes
|
||||||
|
for service := services.Next(); service != nil; service = services.Next() {
|
||||||
|
results = append(results, service.(*structs.ServiceNode))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fill in the node details.
|
||||||
|
results, err = s.parseServiceNodes(tx, ws, results)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, fmt.Errorf("failed parsing service nodes: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return idx, results, nil
|
||||||
|
}
|
||||||
|
|
||||||
// serviceTagFilter returns true (should filter) if the given service node
|
// serviceTagFilter returns true (should filter) if the given service node
|
||||||
// doesn't contain the given tag.
|
// doesn't contain the given tag.
|
||||||
func serviceTagFilter(sn *structs.ServiceNode, tag string) bool {
|
func serviceTagFilter(sn *structs.ServiceNode, tag string) bool {
|
||||||
|
|
|
@ -1572,6 +1572,48 @@ func TestStateStore_DeleteService(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStateStore_ConnectServiceNodes(t *testing.T) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
// Listing with no results returns an empty list.
|
||||||
|
ws := memdb.NewWatchSet()
|
||||||
|
idx, nodes, err := s.ConnectServiceNodes(ws, "db")
|
||||||
|
assert.Nil(err)
|
||||||
|
assert.Equal(idx, uint64(0))
|
||||||
|
assert.Len(nodes, 0)
|
||||||
|
|
||||||
|
// Create some nodes and services.
|
||||||
|
assert.Nil(s.EnsureNode(10, &structs.Node{Node: "foo", Address: "127.0.0.1"}))
|
||||||
|
assert.Nil(s.EnsureNode(11, &structs.Node{Node: "bar", Address: "127.0.0.2"}))
|
||||||
|
assert.Nil(s.EnsureService(12, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: nil, Address: "", Port: 5000}))
|
||||||
|
assert.Nil(s.EnsureService(13, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}))
|
||||||
|
assert.Nil(s.EnsureService(14, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000}))
|
||||||
|
assert.Nil(s.EnsureService(15, "bar", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000}))
|
||||||
|
assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}))
|
||||||
|
assert.True(watchFired(ws))
|
||||||
|
|
||||||
|
// Read everything back.
|
||||||
|
ws = memdb.NewWatchSet()
|
||||||
|
idx, nodes, err = s.ConnectServiceNodes(ws, "db")
|
||||||
|
assert.Nil(err)
|
||||||
|
assert.Equal(idx, uint64(idx))
|
||||||
|
assert.Len(nodes, 2)
|
||||||
|
|
||||||
|
for _, n := range nodes {
|
||||||
|
assert.Equal(structs.ServiceKindConnectProxy, n.ServiceKind)
|
||||||
|
assert.Equal("db", n.ServiceProxyDestination)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Registering some unrelated node should not fire the watch.
|
||||||
|
testRegisterNode(t, s, 17, "nope")
|
||||||
|
assert.False(watchFired(ws))
|
||||||
|
|
||||||
|
// But removing a node with the "db" service should fire the watch.
|
||||||
|
assert.Nil(s.DeleteNode(18, "bar"))
|
||||||
|
assert.True(watchFired(ws))
|
||||||
|
}
|
||||||
|
|
||||||
func TestStateStore_Service_Snapshot(t *testing.T) {
|
func TestStateStore_Service_Snapshot(t *testing.T) {
|
||||||
s := testStateStore(t)
|
s := testStateStore(t)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue