mirror of https://github.com/status-im/consul.git
Finish cleanup from ServiceConfigRequest changes
This commit is contained in:
parent
770c5552d6
commit
3492f9e0d6
|
@ -1948,7 +1948,6 @@ type addServiceLockedRequest struct {
|
|||
// agent using Agent.AddService.
|
||||
type AddServiceRequest struct {
|
||||
Service *structs.NodeService
|
||||
nodeName string
|
||||
chkTypes []*structs.CheckType
|
||||
persist bool
|
||||
token string
|
||||
|
@ -3108,7 +3107,6 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
|||
err = a.addServiceLocked(addServiceLockedRequest{
|
||||
AddServiceRequest: AddServiceRequest{
|
||||
Service: ns,
|
||||
nodeName: a.config.NodeName,
|
||||
chkTypes: chkTypes,
|
||||
persist: false, // don't rewrite the file with the same data we just read
|
||||
token: service.Token,
|
||||
|
@ -3129,7 +3127,6 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
|||
err = a.addServiceLocked(addServiceLockedRequest{
|
||||
AddServiceRequest: AddServiceRequest{
|
||||
Service: sidecar,
|
||||
nodeName: a.config.NodeName,
|
||||
chkTypes: sidecarChecks,
|
||||
persist: false, // don't rewrite the file with the same data we just read
|
||||
token: sidecarToken,
|
||||
|
@ -3228,7 +3225,6 @@ func (a *Agent) loadServices(conf *config.RuntimeConfig, snap map[structs.CheckI
|
|||
err = a.addServiceLocked(addServiceLockedRequest{
|
||||
AddServiceRequest: AddServiceRequest{
|
||||
Service: p.Service,
|
||||
nodeName: a.config.NodeName,
|
||||
chkTypes: nil,
|
||||
persist: false, // don't rewrite the file with the same data we just read
|
||||
token: p.Token,
|
||||
|
|
|
@ -994,7 +994,6 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.
|
|||
|
||||
addReq := AddServiceRequest{
|
||||
Service: ns,
|
||||
nodeName: s.agent.config.NodeName,
|
||||
chkTypes: chkTypes,
|
||||
persist: true,
|
||||
token: token,
|
||||
|
@ -1008,7 +1007,6 @@ func (s *HTTPHandlers) AgentRegisterService(resp http.ResponseWriter, req *http.
|
|||
if sidecar != nil {
|
||||
addReq := AddServiceRequest{
|
||||
Service: sidecar,
|
||||
nodeName: s.agent.config.NodeName,
|
||||
chkTypes: sidecarChecks,
|
||||
persist: true,
|
||||
token: sidecarToken,
|
||||
|
|
|
@ -25,8 +25,6 @@ func TestResolvedServiceConfig(t *testing.T) {
|
|||
require.Equal(uint64(24), req.QueryOptions.MinQueryIndex)
|
||||
require.Equal(1*time.Second, req.QueryOptions.MaxQueryTime)
|
||||
require.Equal("foo", req.Name)
|
||||
require.Equal("foo-1", req.ID)
|
||||
require.Equal("foo-node", req.NodeName)
|
||||
require.True(req.AllowStale)
|
||||
|
||||
reply := args.Get(2).(*structs.ServiceConfigResponse)
|
||||
|
@ -50,8 +48,6 @@ func TestResolvedServiceConfig(t *testing.T) {
|
|||
}, &structs.ServiceConfigRequest{
|
||||
Datacenter: "dc1",
|
||||
Name: "foo",
|
||||
ID: "foo-1",
|
||||
NodeName: "foo-node",
|
||||
})
|
||||
require.NoError(err)
|
||||
require.Equal(cache.FetchResult{
|
||||
|
|
|
@ -333,7 +333,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
|
|||
|
||||
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
|
||||
// blocking query, this function will be rerun and these state store lookups will both be current.
|
||||
// We use the default enterprise meta to look up the global proxy defaults because their are not namespaced.
|
||||
// We use the default enterprise meta to look up the global proxy defaults because they are not namespaced.
|
||||
_, proxyEntry, err := state.ConfigEntry(ws, structs.ProxyDefaults, structs.ProxyConfigGlobal, structs.DefaultEnterpriseMeta())
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -1145,269 +1145,6 @@ func TestConfigEntry_ResolveServiceConfig_Upstreams(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestConfigEntry_ResolveServiceConfig_Upstreams_RegistrationBlocking(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
t.Parallel()
|
||||
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
|
||||
|
||||
nodeName := "foo-node"
|
||||
|
||||
// Create a dummy proxy/service config in the state store to look up.
|
||||
state := s1.fsm.State()
|
||||
require.NoError(t, state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"foo": 1,
|
||||
},
|
||||
}))
|
||||
require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
Protocol: "http",
|
||||
}))
|
||||
require.NoError(t, state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "bar",
|
||||
Protocol: "grpc",
|
||||
}))
|
||||
require.NoError(t, state.EnsureNode(4, &structs.Node{
|
||||
ID: "9c6e733c-c39d-4555-8d41-0f174a31c489",
|
||||
Node: nodeName,
|
||||
}))
|
||||
|
||||
args := structs.ServiceConfigRequest{
|
||||
Name: "foo",
|
||||
Datacenter: s1.config.Datacenter,
|
||||
Upstreams: []string{"bar", "baz"},
|
||||
}
|
||||
var out structs.ServiceConfigResponse
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out))
|
||||
|
||||
var index uint64
|
||||
expected := structs.ServiceConfigResponse{
|
||||
ProxyConfig: map[string]interface{}{
|
||||
"foo": int64(1),
|
||||
"protocol": "http",
|
||||
},
|
||||
// This mesh gateway configuration is pulled from foo-proxy's registration
|
||||
UpstreamConfigs: map[string]map[string]interface{}{
|
||||
"bar": {
|
||||
"protocol": "grpc",
|
||||
},
|
||||
},
|
||||
// Don't know what this is deterministically
|
||||
QueryMeta: out.QueryMeta,
|
||||
}
|
||||
require.Equal(t, expected, out)
|
||||
index = out.Index
|
||||
|
||||
// Now setup a blocking query for 'foo' while we add the proxy registration for foo-proxy.
|
||||
// Adding the foo proxy registration should cause the blocking query to fire because it is
|
||||
// watched when the ID and NodeName are provided.
|
||||
{
|
||||
// Async cause a change
|
||||
start := time.Now()
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
require.NoError(t, state.EnsureService(index+1, nodeName, &structs.NodeService{
|
||||
ID: "foo-proxy",
|
||||
Service: "foo-proxy",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
MeshGateway: structs.MeshGatewayConfig{
|
||||
Mode: structs.MeshGatewayModeLocal,
|
||||
},
|
||||
},
|
||||
}))
|
||||
}()
|
||||
|
||||
// Re-run the query
|
||||
var out structs.ServiceConfigResponse
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
|
||||
&structs.ServiceConfigRequest{
|
||||
Name: "foo",
|
||||
Datacenter: "dc1",
|
||||
Upstreams: []string{"bar", "baz"},
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: index,
|
||||
MaxQueryTime: time.Second,
|
||||
},
|
||||
},
|
||||
&out,
|
||||
))
|
||||
|
||||
// Should block at least 100ms
|
||||
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
|
||||
|
||||
// Check the indexes
|
||||
require.Equal(t, out.Index, index+1)
|
||||
|
||||
// The mesh gateway config from the proxy registration should no longer be present
|
||||
expected := structs.ServiceConfigResponse{
|
||||
ProxyConfig: map[string]interface{}{
|
||||
"foo": int64(1),
|
||||
"protocol": "http",
|
||||
},
|
||||
UpstreamConfigs: map[string]map[string]interface{}{
|
||||
"bar": {
|
||||
"protocol": "grpc",
|
||||
"mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)},
|
||||
},
|
||||
"baz": {
|
||||
"mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)},
|
||||
},
|
||||
},
|
||||
// Don't know what this is deterministically
|
||||
QueryMeta: out.QueryMeta,
|
||||
}
|
||||
require.Equal(t, expected, out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigEntry_ResolveServiceConfig_Upstreams_DegistrationBlocking(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
}
|
||||
t.Parallel()
|
||||
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testrpc.WaitForTestAgent(t, s1.RPC, "dc1")
|
||||
|
||||
nodeName := "foo-node"
|
||||
|
||||
// Create a dummy proxy/service config in the state store to look up.
|
||||
state := s1.fsm.State()
|
||||
require.NoError(t, state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
|
||||
Kind: structs.ProxyDefaults,
|
||||
Name: structs.ProxyConfigGlobal,
|
||||
Config: map[string]interface{}{
|
||||
"foo": 1,
|
||||
},
|
||||
}))
|
||||
require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "foo",
|
||||
Protocol: "http",
|
||||
}))
|
||||
require.NoError(t, state.EnsureConfigEntry(3, &structs.ServiceConfigEntry{
|
||||
Kind: structs.ServiceDefaults,
|
||||
Name: "bar",
|
||||
Protocol: "grpc",
|
||||
}))
|
||||
require.NoError(t, state.EnsureNode(4, &structs.Node{
|
||||
ID: "9c6e733c-c39d-4555-8d41-0f174a31c489",
|
||||
Node: nodeName,
|
||||
}))
|
||||
|
||||
registration := structs.NodeService{
|
||||
ID: "foo-proxy",
|
||||
Service: "foo-proxy",
|
||||
Proxy: structs.ConnectProxyConfig{
|
||||
MeshGateway: structs.MeshGatewayConfig{
|
||||
Mode: structs.MeshGatewayModeLocal,
|
||||
},
|
||||
},
|
||||
}
|
||||
require.NoError(t, state.EnsureService(5, nodeName, ®istration))
|
||||
|
||||
args := structs.ServiceConfigRequest{
|
||||
Name: "foo",
|
||||
Datacenter: s1.config.Datacenter,
|
||||
MeshGateway: registration.Proxy.MeshGateway,
|
||||
Upstreams: []string{"bar", "baz"},
|
||||
}
|
||||
var out structs.ServiceConfigResponse
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig", &args, &out))
|
||||
|
||||
var index uint64
|
||||
expected := structs.ServiceConfigResponse{
|
||||
ProxyConfig: map[string]interface{}{
|
||||
"foo": int64(1),
|
||||
"protocol": "http",
|
||||
},
|
||||
// This mesh gateway configuration is pulled from foo-proxy's registration
|
||||
UpstreamConfigs: map[string]map[string]interface{}{
|
||||
"bar": {
|
||||
"protocol": "grpc",
|
||||
"mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)},
|
||||
},
|
||||
"baz": {
|
||||
"mesh_gateway": map[string]interface{}{"Mode": string(structs.MeshGatewayModeLocal)},
|
||||
},
|
||||
},
|
||||
// Don't know what this is deterministically
|
||||
QueryMeta: out.QueryMeta,
|
||||
}
|
||||
require.Equal(t, expected, out)
|
||||
index = out.Index
|
||||
|
||||
// Now setup a blocking query for 'foo' while we erase the proxy registration for foo-proxy.
|
||||
// Deleting the foo proxy registration should cause the blocking query to fire because it is
|
||||
// watched when the ID and NodeName are provided.
|
||||
{
|
||||
// Async cause a change
|
||||
start := time.Now()
|
||||
go func() {
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
require.NoError(t, state.DeleteService(index+1, nodeName, "foo-proxy", nil))
|
||||
}()
|
||||
|
||||
// Re-run the query
|
||||
var out structs.ServiceConfigResponse
|
||||
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
|
||||
&structs.ServiceConfigRequest{
|
||||
Name: "foo",
|
||||
Datacenter: "dc1",
|
||||
MeshGateway: registration.Proxy.MeshGateway,
|
||||
Upstreams: []string{"bar", "baz"},
|
||||
QueryOptions: structs.QueryOptions{
|
||||
MinQueryIndex: index,
|
||||
MaxQueryTime: time.Second,
|
||||
},
|
||||
},
|
||||
&out,
|
||||
))
|
||||
|
||||
// Should block at least 100ms
|
||||
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")
|
||||
|
||||
// Check the indexes
|
||||
require.Equal(t, out.Index, index+1)
|
||||
|
||||
// The mesh gateway config from the proxy registration should no longer be present
|
||||
expected := structs.ServiceConfigResponse{
|
||||
ProxyConfig: map[string]interface{}{
|
||||
"foo": int64(1),
|
||||
"protocol": "http",
|
||||
},
|
||||
UpstreamConfigs: map[string]map[string]interface{}{
|
||||
"bar": {
|
||||
"protocol": "grpc",
|
||||
},
|
||||
},
|
||||
// Don't know what this is deterministically
|
||||
QueryMeta: out.QueryMeta,
|
||||
}
|
||||
require.Equal(t, expected, out)
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("too slow for testing.Short")
|
||||
|
|
|
@ -1136,24 +1136,6 @@ func (s *Store) NodeService(nodeName string, serviceID string, entMeta *structs.
|
|||
return idx, service, nil
|
||||
}
|
||||
|
||||
// NodeServiceWatch is used to retrieve a specific service associated with the given
|
||||
// node, and add it to the watch set.
|
||||
func (s *Store) NodeServiceWatch(ws memdb.WatchSet, nodeName string, serviceID string, entMeta *structs.EnterpriseMeta) (uint64, *structs.NodeService, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := catalogServicesMaxIndex(tx, entMeta)
|
||||
|
||||
// Query the service
|
||||
service, err := getNodeServiceWatchTxn(tx, ws, nodeName, serviceID, entMeta)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
|
||||
}
|
||||
|
||||
return idx, service, nil
|
||||
}
|
||||
|
||||
func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
|
||||
// Query the service
|
||||
_, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
|
||||
|
@ -1168,21 +1150,6 @@ func getNodeServiceTxn(tx ReadTxn, nodeName, serviceID string, entMeta *structs.
|
|||
return nil, nil
|
||||
}
|
||||
|
||||
func getNodeServiceWatchTxn(tx ReadTxn, ws memdb.WatchSet, nodeName, serviceID string, entMeta *structs.EnterpriseMeta) (*structs.NodeService, error) {
|
||||
// Query the service
|
||||
watchCh, service, err := firstWatchCompoundWithTxn(tx, "services", "id", entMeta, nodeName, serviceID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed querying service for node %q: %s", nodeName, err)
|
||||
}
|
||||
ws.Add(watchCh)
|
||||
|
||||
if service != nil {
|
||||
return service.(*structs.ServiceNode).ToNodeService(), nil
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func (s *Store) nodeServices(ws memdb.WatchSet, nodeNameOrID string, entMeta *structs.EnterpriseMeta, allowWildcard bool) (bool, uint64, *structs.Node, memdb.ResultIterator, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
|
Loading…
Reference in New Issue