sync changes to oss files made in enterprise (#10670)

This commit is contained in:
R.B. Boyer 2021-07-22 13:58:08 -05:00 committed by GitHub
parent 188e8dc51f
commit fc9b1a277d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 585 additions and 83 deletions

View File

@ -367,6 +367,10 @@ func (f fakeGRPCConnPool) ClientConn(_ string) (*grpc.ClientConn, error) {
return nil, nil return nil, nil
} }
func (f fakeGRPCConnPool) ClientConnLeader() (*grpc.ClientConn, error) {
return nil, nil
}
func TestAgent_ReconnectConfigWanDisabled(t *testing.T) { func TestAgent_ReconnectConfigWanDisabled(t *testing.T) {
if testing.Short() { if testing.Short() {
t.Skip("too slow for testing.Short") t.Skip("too slow for testing.Short")

View File

@ -9,12 +9,8 @@ import (
"testing" "testing"
"time" "time"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/consul/agent/grpc"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/consul/agent/grpc/resolver"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/router"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -24,6 +20,11 @@ import (
"github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/tlsutil"
"github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"
) )
func testClientConfig(t *testing.T) (string, *Config) { func testClientConfig(t *testing.T) (string, *Config) {
@ -169,6 +170,10 @@ func TestClient_LANReap(t *testing.T) {
} }
func TestClient_JoinLAN_Invalid(t *testing.T) { func TestClient_JoinLAN_Invalid(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
t.Parallel() t.Parallel()
dir1, s1 := testServer(t) dir1, s1 := testServer(t)
defer os.RemoveAll(dir1) defer os.RemoveAll(dir1)
@ -497,7 +502,9 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
tls, err := tlsutil.NewConfigurator(c.TLSConfig, logger) tls, err := tlsutil.NewConfigurator(c.TLSConfig, logger)
require.NoError(t, err, "failed to create tls configuration") require.NoError(t, err, "failed to create tls configuration")
r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter), nil) builder := resolver.NewServerResolverBuilder(resolver.Config{Authority: c.NodeName})
r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter), builder)
resolver.Register(builder)
connPool := &pool.ConnPool{ connPool := &pool.ConnPool{
Server: false, Server: false,
@ -515,6 +522,8 @@ func newDefaultDeps(t *testing.T, c *Config) Deps {
Tokens: new(token.Store), Tokens: new(token.Store),
Router: r, Router: r,
ConnPool: connPool, ConnPool: connPool,
GRPCConnPool: grpc.NewClientConnPool(builder, grpc.TLSWrapper(tls.OutgoingRPCWrapper()), tls.UseTLS),
LeaderForwarder: builder,
EnterpriseDeps: newDefaultDepsEnterprise(t, logger, c), EnterpriseDeps: newDefaultDepsEnterprise(t, logger, c),
} }
} }

View File

@ -333,6 +333,10 @@ func getCAProviderWithLock(s *Server) (ca.Provider, *structs.CARoot) {
} }
func TestLeader_Vault_PrimaryCA_IntermediateRenew(t *testing.T) { func TestLeader_Vault_PrimaryCA_IntermediateRenew(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
ca.SkipIfVaultNotPresent(t) ca.SkipIfVaultNotPresent(t)
// no parallel execution because we change globals // no parallel execution because we change globals

View File

@ -17,9 +17,15 @@ type Deps struct {
Router *router.Router Router *router.Router
ConnPool *pool.ConnPool ConnPool *pool.ConnPool
GRPCConnPool GRPCClientConner GRPCConnPool GRPCClientConner
LeaderForwarder LeaderForwarder
EnterpriseDeps EnterpriseDeps
} }
type GRPCClientConner interface { type GRPCClientConner interface {
ClientConn(datacenter string) (*grpc.ClientConn, error) ClientConn(datacenter string) (*grpc.ClientConn, error)
ClientConnLeader() (*grpc.ClientConn, error)
}
type LeaderForwarder interface {
UpdateLeaderAddr(leaderAddr string)
} }

View File

@ -261,6 +261,10 @@ type Server struct {
// Used to do leader forwarding and provide fast lookup by server id and address // Used to do leader forwarding and provide fast lookup by server id and address
serverLookup *ServerLookup serverLookup *ServerLookup
// grpcLeaderForwarder is notified on leader change in order to keep the grpc
// resolver up to date.
grpcLeaderForwarder LeaderForwarder
// floodLock controls access to floodCh. // floodLock controls access to floodCh.
floodLock sync.RWMutex floodLock sync.RWMutex
floodCh []chan struct{} floodCh []chan struct{}
@ -583,6 +587,8 @@ func NewServer(config *Config, flat Deps) (*Server, error) {
go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s) s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s)
s.grpcLeaderForwarder = flat.LeaderForwarder
go s.trackLeaderChanges()
// Initialize Autopilot. This must happen before starting leadership monitoring // Initialize Autopilot. This must happen before starting leadership monitoring
// as establishing leadership could attempt to use autopilot and cause a panic. // as establishing leadership could attempt to use autopilot and cause a panic.
@ -627,15 +633,15 @@ func newFSMFromConfig(logger hclog.Logger, gc *state.TombstoneGC, config *Config
} }
func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler { func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler {
if !config.RPCConfig.EnableStreaming {
return agentgrpc.NoOpHandler{Logger: deps.Logger}
}
register := func(srv *grpc.Server) { register := func(srv *grpc.Server) {
if config.RPCConfig.EnableStreaming {
pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subscribe.NewServer( pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subscribe.NewServer(
&subscribeBackend{srv: s, connPool: deps.GRPCConnPool}, &subscribeBackend{srv: s, connPool: deps.GRPCConnPool},
deps.Logger.Named("grpc-api.subscription"))) deps.Logger.Named("grpc-api.subscription")))
} }
s.registerEnterpriseGRPCServices(deps, srv)
}
return agentgrpc.NewHandler(config.RPCAddr, register) return agentgrpc.NewHandler(config.RPCAddr, register)
} }
@ -1450,6 +1456,33 @@ func (s *Server) isReadyForConsistentReads() bool {
return atomic.LoadInt32(&s.readyForConsistentReads) == 1 return atomic.LoadInt32(&s.readyForConsistentReads) == 1
} }
// trackLeaderChanges registers an Observer with raft in order to receive updates
// about leader changes, in order to keep the grpc resolver up to date for leader forwarding.
func (s *Server) trackLeaderChanges() {
obsCh := make(chan raft.Observation, 16)
observer := raft.NewObserver(obsCh, false, func(o *raft.Observation) bool {
_, ok := o.Data.(raft.LeaderObservation)
return ok
})
s.raft.RegisterObserver(observer)
for {
select {
case obs := <-obsCh:
leaderObs, ok := obs.Data.(raft.LeaderObservation)
if !ok {
s.logger.Debug("got unknown observation type from raft", "type", reflect.TypeOf(obs.Data))
continue
}
s.grpcLeaderForwarder.UpdateLeaderAddr(string(leaderObs.Leader))
case <-s.shutdownCh:
s.raft.DeregisterObserver(observer)
return
}
}
}
// peersInfoContent is used to help operators understand what happened to the // peersInfoContent is used to help operators understand what happened to the
// peers.json file. This is written to a file called peers.info in the same // peers.json file. This is written to a file called peers.info in the same
// location. // location.

View File

@ -4,9 +4,12 @@ package consul
import ( import (
"github.com/hashicorp/serf/serf" "github.com/hashicorp/serf/serf"
"google.golang.org/grpc"
) )
func (s *Server) removeFailedNodeEnterprise(remove func(*serf.Serf, string) error, node, wanNode string) error { func (s *Server) removeFailedNodeEnterprise(remove func(*serf.Serf, string) error, node, wanNode string) error {
// nothing to do for oss // nothing to do for oss
return nil return nil
} }
func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {}

View File

@ -1826,6 +1826,12 @@ func (q NodeServiceQuery) NamespaceOrDefault() string {
return q.EnterpriseMeta.NamespaceOrDefault() return q.EnterpriseMeta.NamespaceOrDefault()
} }
// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer
// receiver for this method. Remove once that is fixed.
func (q NodeServiceQuery) PartitionOrDefault() string {
return q.EnterpriseMeta.PartitionOrDefault()
}
// deleteCheckTxn is the inner method used to call a health // deleteCheckTxn is the inner method used to call a health
// check deletion within an existing transaction. // check deletion within an existing transaction.
func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error { func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error {

View File

@ -540,3 +540,9 @@ type NodeCheckQuery struct {
func (q NodeCheckQuery) NamespaceOrDefault() string { func (q NodeCheckQuery) NamespaceOrDefault() string {
return q.EnterpriseMeta.NamespaceOrDefault() return q.EnterpriseMeta.NamespaceOrDefault()
} }
// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer
// receiver for this method. Remove once that is fixed.
func (q NodeCheckQuery) PartitionOrDefault() string {
return q.EnterpriseMeta.PartitionOrDefault()
}

View File

@ -21,6 +21,12 @@ func (q Query) NamespaceOrDefault() string {
return q.EnterpriseMeta.NamespaceOrDefault() return q.EnterpriseMeta.NamespaceOrDefault()
} }
// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer
// receiver for this method. Remove once that is fixed.
func (q Query) PartitionOrDefault() string {
return q.EnterpriseMeta.PartitionOrDefault()
}
// indexFromQuery builds an index key where Query.Value is lowercase, and is // indexFromQuery builds an index key where Query.Value is lowercase, and is
// a required value. // a required value.
func indexFromQuery(arg interface{}) ([]byte, error) { func indexFromQuery(arg interface{}) ([]byte, error) {

View File

@ -107,10 +107,12 @@ func (tc indexerTestCase) run(t *testing.T, indexer memdb.Indexer) {
} }
if i, ok := indexer.(memdb.MultiIndexer); ok { if i, ok := indexer.(memdb.MultiIndexer); ok {
t.Run("writeIndexMulti", func(t *testing.T) {
valid, actual, err := i.FromObject(tc.writeMulti.source) valid, actual, err := i.FromObject(tc.writeMulti.source)
require.NoError(t, err) require.NoError(t, err)
require.True(t, valid) require.True(t, valid)
require.Equal(t, tc.writeMulti.expected, actual) require.Equal(t, tc.writeMulti.expected, actual)
})
} }
for _, extra := range tc.extra { for _, extra := range tc.extra {

View File

@ -31,6 +31,7 @@ var _ subscribe.Backend = (*subscribeBackend)(nil)
// or if it matches the Datacenter in config. // or if it matches the Datacenter in config.
// //
// TODO: extract this so that it can be used with other grpc services. // TODO: extract this so that it can be used with other grpc services.
// TODO: rename to ForwardToDC
func (s subscribeBackend) Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error) { func (s subscribeBackend) Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error) {
if dc == "" || dc == s.srv.config.Datacenter { if dc == "" || dc == s.srv.config.Datacenter {
return false, nil return false, nil

View File

@ -50,15 +50,25 @@ func NewClientConnPool(servers ServerLocator, tls TLSWrapper, useTLSForDC func(d
// existing connections in the pool, a new one will be created, stored in the pool, // existing connections in the pool, a new one will be created, stored in the pool,
// then returned. // then returned.
func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error) { func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error) {
return c.dial(datacenter, "server")
}
// TODO: godoc
func (c *ClientConnPool) ClientConnLeader() (*grpc.ClientConn, error) {
return c.dial("local", "leader")
}
func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.ClientConn, error) {
c.connsLock.Lock() c.connsLock.Lock()
defer c.connsLock.Unlock() defer c.connsLock.Unlock()
if conn, ok := c.conns[datacenter]; ok { target := fmt.Sprintf("consul://%s/%s.%s", c.servers.Authority(), serverType, datacenter)
if conn, ok := c.conns[target]; ok {
return conn, nil return conn, nil
} }
conn, err := grpc.Dial( conn, err := grpc.Dial(
fmt.Sprintf("consul://%s/server.%s", c.servers.Authority(), datacenter), target,
// use WithInsecure mode here because we handle the TLS wrapping in the // use WithInsecure mode here because we handle the TLS wrapping in the
// custom dialer based on logic around whether the server has TLS enabled. // custom dialer based on logic around whether the server has TLS enabled.
grpc.WithInsecure(), grpc.WithInsecure(),
@ -86,7 +96,7 @@ func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error)
return nil, err return nil, err
} }
c.conns[datacenter] = conn c.conns[target] = conn
return conn, nil return conn, nil
} }

View File

@ -113,6 +113,44 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) {
require.NotEqual(t, resp.ServerName, first.ServerName) require.NotEqual(t, resp.ServerName, first.ServerName)
} }
func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) {
count := 3
conf := newConfig(t)
res := resolver.NewServerResolverBuilder(conf)
registerWithGRPC(t, res)
pool := NewClientConnPool(res, nil, useTLSForDcAlwaysTrue)
var servers []testServer
for i := 0; i < count; i++ {
name := fmt.Sprintf("server-%d", i)
srv := newTestServer(t, name, "dc1")
res.AddServer(srv.Metadata())
servers = append(servers, srv)
t.Cleanup(srv.shutdown)
}
// Set the leader address to the first server.
res.UpdateLeaderAddr(servers[0].addr.String())
conn, err := pool.ClientConnLeader()
require.NoError(t, err)
client := testservice.NewSimpleClient(conn)
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
t.Cleanup(cancel)
first, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
require.Equal(t, first.ServerName, servers[0].name)
// Update the leader address and make another request.
res.UpdateLeaderAddr(servers[1].addr.String())
resp, err := client.Something(ctx, &testservice.Req{})
require.NoError(t, err)
require.Equal(t, resp.ServerName, servers[1].name)
}
func newConfig(t *testing.T) resolver.Config { func newConfig(t *testing.T) resolver.Config {
n := t.Name() n := t.Name()
s := strings.Replace(n, "/", "", -1) s := strings.Replace(n, "/", "", -1)

View File

@ -16,13 +16,15 @@ import (
// ServerResolvers updated when changes occur. // ServerResolvers updated when changes occur.
type ServerResolverBuilder struct { type ServerResolverBuilder struct {
cfg Config cfg Config
// leaderResolver is used to track the address of the leader in the local DC.
leaderResolver leaderResolver
// servers is an index of Servers by Server.ID. The map contains server IDs // servers is an index of Servers by Server.ID. The map contains server IDs
// for all datacenters. // for all datacenters.
servers map[string]*metadata.Server servers map[string]*metadata.Server
// resolvers is an index of connections to the serverResolver which manages // resolvers is an index of connections to the serverResolver which manages
// addresses of servers for that connection. // addresses of servers for that connection.
resolvers map[resolver.ClientConn]*serverResolver resolvers map[resolver.ClientConn]*serverResolver
// lock for servers and resolvers. // lock for all stateful fields (excludes config which is immutable).
lock sync.RWMutex lock sync.RWMutex
} }
@ -89,9 +91,23 @@ func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.Client
if resolver, ok := s.resolvers[cc]; ok { if resolver, ok := s.resolvers[cc]; ok {
return resolver, nil return resolver, nil
} }
if cc == s.leaderResolver.clientConn {
return s.leaderResolver, nil
}
serverType, datacenter, err := parseEndpoint(target.Endpoint)
if err != nil {
return nil, err
}
if serverType == "leader" {
// TODO: is this safe? can we ever have multiple CC for the leader? Seems
// like we can only have one given the caching in ClientConnPool.Dial
s.leaderResolver.clientConn = cc
s.leaderResolver.updateClientConn()
return s.leaderResolver, nil
}
// Make a new resolver for the dc and add it to the list of active ones. // Make a new resolver for the dc and add it to the list of active ones.
datacenter := strings.TrimPrefix(target.Endpoint, "server.")
resolver := &serverResolver{ resolver := &serverResolver{
datacenter: datacenter, datacenter: datacenter,
clientConn: cc, clientConn: cc,
@ -107,6 +123,16 @@ func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.Client
return resolver, nil return resolver, nil
} }
// parseEndpoint parses a string, expecting a format of "serverType.datacenter"
func parseEndpoint(target string) (string, string, error) {
parts := strings.SplitN(target, ".", 2)
if len(parts) != 2 {
return "", "", fmt.Errorf("unexpected endpoint address: %v", target)
}
return parts[0], parts[1], nil
}
func (s *ServerResolverBuilder) Authority() string { func (s *ServerResolverBuilder) Authority() string {
return s.cfg.Authority return s.cfg.Authority
} }
@ -168,6 +194,15 @@ func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address {
return addrs return addrs
} }
// UpdateLeaderAddr updates the leader address in the local DC's resolver.
func (s *ServerResolverBuilder) UpdateLeaderAddr(leaderAddr string) {
s.lock.Lock()
defer s.lock.Unlock()
s.leaderResolver.addr = leaderAddr
s.leaderResolver.updateClientConn()
}
// serverResolver is a grpc Resolver that will keep a grpc.ClientConn up to date // serverResolver is a grpc Resolver that will keep a grpc.ClientConn up to date
// on the list of server addresses to use. // on the list of server addresses to use.
type serverResolver struct { type serverResolver struct {
@ -224,4 +259,29 @@ func (r *serverResolver) Close() {
} }
// ResolveNow is not used // ResolveNow is not used
func (*serverResolver) ResolveNow(_ resolver.ResolveNowOption) {} func (*serverResolver) ResolveNow(resolver.ResolveNowOption) {}
type leaderResolver struct {
addr string
clientConn resolver.ClientConn
}
func (l leaderResolver) ResolveNow(resolver.ResolveNowOption) {}
func (l leaderResolver) Close() {}
func (l leaderResolver) updateClientConn() {
if l.addr == "" || l.clientConn == nil {
return
}
addrs := []resolver.Address{
{
Addr: l.addr,
Type: resolver.Backend,
ServerName: "leader",
},
}
l.clientConn.UpdateState(resolver.State{Addresses: addrs})
}
var _ resolver.Resolver = (*leaderResolver)(nil)

View File

@ -31,6 +31,7 @@ import (
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/logging"
"github.com/hashicorp/consul/proto/pbcommon"
) )
var HTTPSummaries = []prometheus.SummaryDefinition{ var HTTPSummaries = []prometheus.SummaryDefinition{
@ -901,6 +902,14 @@ func (s *HTTPHandlers) parseConsistency(resp http.ResponseWriter, req *http.Requ
return false return false
} }
// parseConsistencyReadRequest is used to parse the ?consistent query param.
func parseConsistencyReadRequest(resp http.ResponseWriter, req *http.Request, b *pbcommon.ReadRequest) {
query := req.URL.Query()
if _, ok := query["consistent"]; ok {
b.RequireConsistent = true
}
}
// parseDC is used to parse the ?dc query param // parseDC is used to parse the ?dc query param
func (s *HTTPHandlers) parseDC(req *http.Request, dc *string) { func (s *HTTPHandlers) parseDC(req *http.Request, dc *string) {
if other := req.URL.Query().Get("dc"); other != "" { if other := req.URL.Query().Get("dc"); other != "" {

View File

@ -277,16 +277,16 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) {
type testBackend struct { type testBackend struct {
store *state.Store store *state.Store
authorizer func(token string) acl.Authorizer authorizer func(token string, entMeta *structs.EnterpriseMeta) acl.Authorizer
forwardConn *gogrpc.ClientConn forwardConn *gogrpc.ClientConn
} }
func (b testBackend) ResolveTokenAndDefaultMeta( func (b testBackend) ResolveTokenAndDefaultMeta(
token string, token string,
_ *structs.EnterpriseMeta, entMeta *structs.EnterpriseMeta,
_ *acl.AuthorizerContext, _ *acl.AuthorizerContext,
) (acl.Authorizer, error) { ) (acl.Authorizer, error) {
return b.authorizer(token), nil return b.authorizer(token, entMeta), nil
} }
func (b testBackend) Forward(_ string, fn func(*gogrpc.ClientConn) error) (handled bool, err error) { func (b testBackend) Forward(_ string, fn func(*gogrpc.ClientConn) error) (handled bool, err error) {
@ -306,7 +306,7 @@ func newTestBackend() (*testBackend, error) {
return nil, err return nil, err
} }
store := state.NewStateStoreWithEventPublisher(gc) store := state.NewStateStoreWithEventPublisher(gc)
allowAll := func(_ string) acl.Authorizer { allowAll := func(string, *structs.EnterpriseMeta) acl.Authorizer {
return acl.AllowAll() return acl.AllowAll()
} }
return &testBackend{store: store, authorizer: allowAll}, nil return &testBackend{store: store, authorizer: allowAll}, nil
@ -612,7 +612,7 @@ node "node1" {
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil)) require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
// TODO: is there any easy way to do this with the acl package? // TODO: is there any easy way to do this with the acl package?
backend.authorizer = func(tok string) acl.Authorizer { backend.authorizer = func(tok string, _ *structs.EnterpriseMeta) acl.Authorizer {
if tok == token { if tok == token {
return authorizer return authorizer
} }
@ -811,7 +811,7 @@ node "node1" {
require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil)) require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil))
// TODO: is there any easy way to do this with the acl package? // TODO: is there any easy way to do this with the acl package?
backend.authorizer = func(tok string) acl.Authorizer { backend.authorizer = func(tok string, _ *structs.EnterpriseMeta) acl.Authorizer {
if tok == token { if tok == token {
return authorizer return authorizer
} }

View File

@ -106,6 +106,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error)
builder := resolver.NewServerResolverBuilder(resolver.Config{}) builder := resolver.NewServerResolverBuilder(resolver.Config{})
resolver.Register(builder) resolver.Register(builder)
d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()), d.TLSConfigurator.UseTLS) d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()), d.TLSConfigurator.UseTLS)
d.LeaderForwarder = builder
d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder) d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder)

View File

@ -1,3 +1,5 @@
// +build !consulent
package structs package structs
import ( import (

View File

@ -76,6 +76,10 @@ const (
// HTTPNamespaceEnvVar defines an environment variable name which sets // HTTPNamespaceEnvVar defines an environment variable name which sets
// the HTTP Namespace to be used by default. This can still be overridden. // the HTTP Namespace to be used by default. This can still be overridden.
HTTPNamespaceEnvName = "CONSUL_NAMESPACE" HTTPNamespaceEnvName = "CONSUL_NAMESPACE"
// HTTPPartitionEnvName defines an environment variable name which sets
// the HTTP Partition to be used by default. This can still be overridden.
HTTPPartitionEnvName = "CONSUL_PARTITION"
) )
// QueryOptions are used to parameterize a query // QueryOptions are used to parameterize a query
@ -84,6 +88,10 @@ type QueryOptions struct {
// Note: Namespaces are available only in Consul Enterprise // Note: Namespaces are available only in Consul Enterprise
Namespace string Namespace string
// Partition overrides the `default` partition
// Note: Partitions are available only in Consul Enterprise
Partition string
// Providing a datacenter overwrites the DC provided // Providing a datacenter overwrites the DC provided
// by the Config // by the Config
Datacenter string Datacenter string
@ -191,6 +199,10 @@ type WriteOptions struct {
// Note: Namespaces are available only in Consul Enterprise // Note: Namespaces are available only in Consul Enterprise
Namespace string Namespace string
// Partition overrides the `default` partition
// Note: Partitions are available only in Consul Enterprise
Partition string
// Providing a datacenter overwrites the DC provided // Providing a datacenter overwrites the DC provided
// by the Config // by the Config
Datacenter string Datacenter string
@ -314,6 +326,10 @@ type Config struct {
// when no other Namespace is present in the QueryOptions // when no other Namespace is present in the QueryOptions
Namespace string Namespace string
// Partition is the name of the partition to send along for the request
// when no other Partition is present in the QueryOptions
Partition string
TLSConfig TLSConfig TLSConfig TLSConfig
} }
@ -466,6 +482,10 @@ func defaultConfig(logger hclog.Logger, transportFn func() *http.Transport) *Con
config.Namespace = v config.Namespace = v
} }
if v := os.Getenv(HTTPPartitionEnvName); v != "" {
config.Partition = v
}
return config return config
} }
@ -732,6 +752,9 @@ func (r *request) setQueryOptions(q *QueryOptions) {
if q.Namespace != "" { if q.Namespace != "" {
r.params.Set("ns", q.Namespace) r.params.Set("ns", q.Namespace)
} }
if q.Partition != "" {
r.params.Set("partition", q.Partition)
}
if q.Datacenter != "" { if q.Datacenter != "" {
r.params.Set("dc", q.Datacenter) r.params.Set("dc", q.Datacenter)
} }
@ -834,6 +857,9 @@ func (r *request) setWriteOptions(q *WriteOptions) {
if q.Namespace != "" { if q.Namespace != "" {
r.params.Set("ns", q.Namespace) r.params.Set("ns", q.Namespace)
} }
if q.Partition != "" {
r.params.Set("partition", q.Partition)
}
if q.Datacenter != "" { if q.Datacenter != "" {
r.params.Set("dc", q.Datacenter) r.params.Set("dc", q.Datacenter)
} }
@ -908,6 +934,9 @@ func (c *Client) newRequest(method, path string) *request {
if c.config.Namespace != "" { if c.config.Namespace != "" {
r.params.Set("ns", c.config.Namespace) r.params.Set("ns", c.config.Namespace)
} }
if c.config.Partition != "" {
r.params.Set("partition", c.config.Partition)
}
if c.config.WaitTime != 0 { if c.config.WaitTime != 0 {
r.params.Set("wait", durToMsec(r.config.WaitTime)) r.params.Set("wait", durToMsec(r.config.WaitTime))
} }

View File

@ -738,6 +738,7 @@ func TestAPI_SetQueryOptions(t *testing.T) {
r := c.newRequest("GET", "/v1/kv/foo") r := c.newRequest("GET", "/v1/kv/foo")
q := &QueryOptions{ q := &QueryOptions{
Namespace: "operator", Namespace: "operator",
Partition: "asdf",
Datacenter: "foo", Datacenter: "foo",
AllowStale: true, AllowStale: true,
RequireConsistent: true, RequireConsistent: true,
@ -752,6 +753,9 @@ func TestAPI_SetQueryOptions(t *testing.T) {
if r.params.Get("ns") != "operator" { if r.params.Get("ns") != "operator" {
t.Fatalf("bad: %v", r.params) t.Fatalf("bad: %v", r.params)
} }
if r.params.Get("partition") != "asdf" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("dc") != "foo" { if r.params.Get("dc") != "foo" {
t.Fatalf("bad: %v", r.params) t.Fatalf("bad: %v", r.params)
} }
@ -799,6 +803,7 @@ func TestAPI_SetWriteOptions(t *testing.T) {
r := c.newRequest("GET", "/v1/kv/foo") r := c.newRequest("GET", "/v1/kv/foo")
q := &WriteOptions{ q := &WriteOptions{
Namespace: "operator", Namespace: "operator",
Partition: "asdf",
Datacenter: "foo", Datacenter: "foo",
Token: "23456", Token: "23456",
} }
@ -806,6 +811,9 @@ func TestAPI_SetWriteOptions(t *testing.T) {
if r.params.Get("ns") != "operator" { if r.params.Get("ns") != "operator" {
t.Fatalf("bad: %v", r.params) t.Fatalf("bad: %v", r.params)
} }
if r.params.Get("partition") != "asdf" {
t.Fatalf("bad: %v", r.params)
}
if r.params.Get("dc") != "foo" { if r.params.Get("dc") != "foo" {
t.Fatalf("bad: %v", r.params) t.Fatalf("bad: %v", r.params)
} }

View File

@ -49,6 +49,7 @@ const (
Session string = "session" Session string = "session"
Sentinel string = "sentinel" Sentinel string = "sentinel"
Snapshot string = "snapshot" Snapshot string = "snapshot"
Partition string = "partition"
TerminatingGateway string = "terminating_gateway" TerminatingGateway string = "terminating_gateway"
TLSUtil string = "tlsutil" TLSUtil string = "tlsutil"
Transaction string = "txn" Transaction string = "txn"

View File

@ -74,6 +74,14 @@ func (q *QueryOptions) SetStaleIfError(staleIfError time.Duration) {
q.StaleIfError = staleIfError q.StaleIfError = staleIfError
} }
func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool {
o := structs.QueryOptions{
MaxQueryTime: q.MaxQueryTime,
MinQueryIndex: q.MinQueryIndex,
}
return o.HasTimedOut(start, rpcHoldTimeout, maxQueryTime, defaultQueryTime)
}
// SetFilter is needed to implement the structs.QueryOptionsCompat interface // SetFilter is needed to implement the structs.QueryOptionsCompat interface
func (q *QueryOptions) SetFilter(filter string) { func (q *QueryOptions) SetFilter(filter string) {
q.Filter = filter q.Filter = filter
@ -121,6 +129,10 @@ func (w WriteRequest) AllowStaleRead() bool {
return false return false
} }
func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) bool {
return time.Since(start) > rpcHoldTimeout
}
func (td TargetDatacenter) RequestDatacenter() string { func (td TargetDatacenter) RequestDatacenter() string {
return td.Datacenter return td.Datacenter
} }

View File

@ -37,6 +37,16 @@ func (msg *WriteRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg) return proto.Unmarshal(b, msg)
} }
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ReadRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ReadRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler // MarshalBinary implements encoding.BinaryMarshaler
func (msg *QueryOptions) MarshalBinary() ([]byte, error) { func (msg *QueryOptions) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg) return proto.Marshal(msg)

View File

@ -152,6 +152,67 @@ func (m *WriteRequest) GetToken() string {
return "" return ""
} }
// ReadRequest is a type that may be embedded into any requests for read
// operations.
// It is a replacement for QueryOptions now that we no longer need any of those
// fields because we are moving away from using blocking queries.
// It is also similar to WriteRequest. It is a separate type so that in the
// future we can introduce fields that may only be relevant for reads.
type ReadRequest struct {
// Token is the ACL token ID. If not provided, the 'anonymous'
// token is assumed for backwards compatibility.
Token string `protobuf:"bytes,1,opt,name=Token,proto3" json:"Token,omitempty"`
// RequireConsistent indicates that the request must be sent to the leader.
RequireConsistent bool `protobuf:"varint,2,opt,name=RequireConsistent,proto3" json:"RequireConsistent,omitempty"`
}
func (m *ReadRequest) Reset() { *m = ReadRequest{} }
func (m *ReadRequest) String() string { return proto.CompactTextString(m) }
func (*ReadRequest) ProtoMessage() {}
func (*ReadRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_a6f5ac44994d718c, []int{3}
}
func (m *ReadRequest) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *ReadRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_ReadRequest.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *ReadRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReadRequest.Merge(m, src)
}
func (m *ReadRequest) XXX_Size() int {
return m.Size()
}
func (m *ReadRequest) XXX_DiscardUnknown() {
xxx_messageInfo_ReadRequest.DiscardUnknown(m)
}
var xxx_messageInfo_ReadRequest proto.InternalMessageInfo
func (m *ReadRequest) GetToken() string {
if m != nil {
return m.Token
}
return ""
}
func (m *ReadRequest) GetRequireConsistent() bool {
if m != nil {
return m.RequireConsistent
}
return false
}
// QueryOptions is used to specify various flags for read queries // QueryOptions is used to specify various flags for read queries
type QueryOptions struct { type QueryOptions struct {
// Token is the ACL token ID. If not provided, the 'anonymous' // Token is the ACL token ID. If not provided, the 'anonymous'
@ -209,7 +270,7 @@ func (m *QueryOptions) Reset() { *m = QueryOptions{} }
func (m *QueryOptions) String() string { return proto.CompactTextString(m) } func (m *QueryOptions) String() string { return proto.CompactTextString(m) }
func (*QueryOptions) ProtoMessage() {} func (*QueryOptions) ProtoMessage() {}
func (*QueryOptions) Descriptor() ([]byte, []int) { func (*QueryOptions) Descriptor() ([]byte, []int) {
return fileDescriptor_a6f5ac44994d718c, []int{3} return fileDescriptor_a6f5ac44994d718c, []int{4}
} }
func (m *QueryOptions) XXX_Unmarshal(b []byte) error { func (m *QueryOptions) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -336,7 +397,7 @@ func (m *QueryMeta) Reset() { *m = QueryMeta{} }
func (m *QueryMeta) String() string { return proto.CompactTextString(m) } func (m *QueryMeta) String() string { return proto.CompactTextString(m) }
func (*QueryMeta) ProtoMessage() {} func (*QueryMeta) ProtoMessage() {}
func (*QueryMeta) Descriptor() ([]byte, []int) { func (*QueryMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_a6f5ac44994d718c, []int{4} return fileDescriptor_a6f5ac44994d718c, []int{5}
} }
func (m *QueryMeta) XXX_Unmarshal(b []byte) error { func (m *QueryMeta) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -398,15 +459,15 @@ func (m *QueryMeta) GetConsistencyLevel() string {
type EnterpriseMeta struct { type EnterpriseMeta struct {
// Namespace in which the entity exists. // Namespace in which the entity exists.
Namespace string `protobuf:"bytes,1,opt,name=Namespace,proto3" json:"Namespace,omitempty"` Namespace string `protobuf:"bytes,1,opt,name=Namespace,proto3" json:"Namespace,omitempty"`
// Tenant in which the entity exists. // Partition in which the entity exists.
Tenant string `protobuf:"bytes,2,opt,name=Tenant,proto3" json:"Tenant,omitempty"` Partition string `protobuf:"bytes,2,opt,name=Partition,proto3" json:"Partition,omitempty"`
} }
func (m *EnterpriseMeta) Reset() { *m = EnterpriseMeta{} } func (m *EnterpriseMeta) Reset() { *m = EnterpriseMeta{} }
func (m *EnterpriseMeta) String() string { return proto.CompactTextString(m) } func (m *EnterpriseMeta) String() string { return proto.CompactTextString(m) }
func (*EnterpriseMeta) ProtoMessage() {} func (*EnterpriseMeta) ProtoMessage() {}
func (*EnterpriseMeta) Descriptor() ([]byte, []int) { func (*EnterpriseMeta) Descriptor() ([]byte, []int) {
return fileDescriptor_a6f5ac44994d718c, []int{5} return fileDescriptor_a6f5ac44994d718c, []int{6}
} }
func (m *EnterpriseMeta) XXX_Unmarshal(b []byte) error { func (m *EnterpriseMeta) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b) return m.Unmarshal(b)
@ -439,6 +500,7 @@ func init() {
proto.RegisterType((*RaftIndex)(nil), "common.RaftIndex") proto.RegisterType((*RaftIndex)(nil), "common.RaftIndex")
proto.RegisterType((*TargetDatacenter)(nil), "common.TargetDatacenter") proto.RegisterType((*TargetDatacenter)(nil), "common.TargetDatacenter")
proto.RegisterType((*WriteRequest)(nil), "common.WriteRequest") proto.RegisterType((*WriteRequest)(nil), "common.WriteRequest")
proto.RegisterType((*ReadRequest)(nil), "common.ReadRequest")
proto.RegisterType((*QueryOptions)(nil), "common.QueryOptions") proto.RegisterType((*QueryOptions)(nil), "common.QueryOptions")
proto.RegisterType((*QueryMeta)(nil), "common.QueryMeta") proto.RegisterType((*QueryMeta)(nil), "common.QueryMeta")
proto.RegisterType((*EnterpriseMeta)(nil), "common.EnterpriseMeta") proto.RegisterType((*EnterpriseMeta)(nil), "common.EnterpriseMeta")
@ -447,45 +509,46 @@ func init() {
func init() { proto.RegisterFile("proto/pbcommon/common.proto", fileDescriptor_a6f5ac44994d718c) } func init() { proto.RegisterFile("proto/pbcommon/common.proto", fileDescriptor_a6f5ac44994d718c) }
var fileDescriptor_a6f5ac44994d718c = []byte{ var fileDescriptor_a6f5ac44994d718c = []byte{
// 606 bytes of a gzipped FileDescriptorProto // 620 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0xcd, 0x6e, 0xd3, 0x4c, 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x41, 0x4f, 0xd4, 0x40,
0x14, 0x8d, 0xbf, 0x2f, 0x0d, 0xf1, 0x4d, 0x5b, 0x95, 0x51, 0x85, 0x4c, 0x41, 0x4e, 0x14, 0x21, 0x14, 0xde, 0xe2, 0xb2, 0x6e, 0xdf, 0x02, 0xc1, 0x09, 0x31, 0x15, 0x4d, 0x97, 0x6c, 0x8c, 0x21,
0x54, 0x55, 0x10, 0x4b, 0x65, 0x57, 0x56, 0x6d, 0xda, 0xa2, 0x8a, 0x98, 0x0a, 0x13, 0x84, 0xc4, 0x44, 0xb7, 0x09, 0xde, 0xf0, 0x04, 0x0b, 0x1a, 0xe2, 0x56, 0x74, 0xc4, 0x90, 0x78, 0x9b, 0xed,
0x6e, 0xe2, 0xdc, 0x38, 0x23, 0xec, 0x19, 0x33, 0x1e, 0xb7, 0xe9, 0x1b, 0xb0, 0x64, 0x59, 0xb1, 0xbe, 0xed, 0x4e, 0x6c, 0x3b, 0x75, 0x3a, 0x85, 0xe5, 0x1f, 0x78, 0xf4, 0x48, 0x3c, 0xf9, 0x43,
0xe2, 0x41, 0x78, 0x80, 0x2e, 0xbb, 0x64, 0x55, 0xa0, 0x79, 0x03, 0x9e, 0x00, 0x79, 0xec, 0xb6, 0xfc, 0x01, 0x1c, 0x39, 0x7a, 0x42, 0x65, 0xff, 0x81, 0xbf, 0xc0, 0x74, 0x5a, 0xa0, 0x08, 0x18,
0x2e, 0xed, 0x22, 0xac, 0x92, 0x73, 0x7c, 0xce, 0xcc, 0xfd, 0x39, 0x36, 0x3c, 0x88, 0xa5, 0x50, 0x3c, 0xed, 0x7e, 0xdf, 0x7c, 0xdf, 0xeb, 0x9b, 0xf7, 0xbe, 0x16, 0xee, 0xc7, 0x52, 0x28, 0xe1,
0xc2, 0x89, 0x07, 0xbe, 0x88, 0x22, 0xc1, 0x9d, 0xfc, 0xa7, 0xa3, 0x59, 0x52, 0xcb, 0xd1, 0x8a, 0xc4, 0x3d, 0x4f, 0x84, 0xa1, 0x88, 0x9c, 0xfc, 0xa7, 0xad, 0x59, 0x52, 0xcb, 0xd1, 0xbc, 0xed,
0x1d, 0x08, 0x11, 0x84, 0xe8, 0x68, 0x76, 0x90, 0x8e, 0x9c, 0x61, 0x2a, 0xa9, 0x62, 0x17, 0xba, 0x0b, 0xe1, 0x07, 0xe8, 0x68, 0xb6, 0x97, 0x0e, 0x9c, 0x7e, 0x2a, 0x99, 0xe2, 0xa7, 0xba, 0xf9,
0x95, 0xe5, 0x40, 0x04, 0x22, 0x3f, 0x28, 0xfb, 0x97, 0xb3, 0xed, 0x08, 0x4c, 0x8f, 0x8e, 0xd4, 0x39, 0x5f, 0xf8, 0x22, 0x2f, 0x94, 0xfd, 0xcb, 0xd9, 0x56, 0x08, 0x26, 0x65, 0x03, 0xb5, 0x19,
0x1e, 0x1f, 0xe2, 0x84, 0x38, 0xd0, 0xe8, 0x4a, 0xa4, 0x0a, 0x35, 0xb4, 0x8c, 0x96, 0xb1, 0x5a, 0xf5, 0x71, 0x44, 0x1c, 0x68, 0x74, 0x24, 0x32, 0x85, 0x1a, 0x5a, 0xc6, 0x82, 0xb1, 0x58, 0x5d,
0xdd, 0x5a, 0xf8, 0x7d, 0xd6, 0x34, 0x07, 0x38, 0x89, 0xe5, 0x46, 0xfb, 0x69, 0xdb, 0x2b, 0x2b, 0x9b, 0xfe, 0x7d, 0xdc, 0x34, 0x7b, 0x38, 0x8a, 0xe5, 0x4a, 0xeb, 0x49, 0x8b, 0x96, 0x15, 0x99,
0x32, 0x83, 0x2b, 0x86, 0x6c, 0x74, 0x94, 0x1b, 0xfe, 0xbb, 0xd5, 0x50, 0x52, 0xb4, 0xd7, 0x61, 0xc1, 0x15, 0x7d, 0x3e, 0xd8, 0xcf, 0x0d, 0x13, 0x57, 0x1a, 0x4a, 0x8a, 0xd6, 0x32, 0xcc, 0x6e,
0xa9, 0x4f, 0x65, 0x80, 0x6a, 0x9b, 0x2a, 0xea, 0x23, 0x57, 0x28, 0x89, 0x0d, 0x70, 0x85, 0xf4, 0x33, 0xe9, 0xa3, 0x5a, 0x67, 0x8a, 0x79, 0x18, 0x29, 0x94, 0xc4, 0x06, 0x38, 0x47, 0xfa, 0xa1,
0xa5, 0xa6, 0x57, 0x62, 0xda, 0x6b, 0x30, 0xff, 0x4e, 0x32, 0x85, 0x1e, 0x7e, 0x4c, 0x31, 0x51, 0x26, 0x2d, 0x31, 0xad, 0x25, 0x98, 0xda, 0x91, 0x5c, 0x21, 0xc5, 0x8f, 0x29, 0x26, 0x8a, 0xcc,
0x64, 0x19, 0xe6, 0xfa, 0xe2, 0x03, 0xf2, 0x42, 0x9a, 0x83, 0x8d, 0xea, 0xa7, 0xaf, 0x4d, 0xa3, 0xc1, 0xe4, 0xb6, 0xf8, 0x80, 0x51, 0x21, 0xcd, 0xc1, 0x4a, 0xf5, 0xd3, 0xd7, 0xa6, 0xd1, 0xda,
0xfd, 0xa5, 0x0a, 0xf3, 0xaf, 0x53, 0x94, 0x47, 0xfb, 0x71, 0xd6, 0x7a, 0x72, 0xbb, 0x98, 0x3c, 0x81, 0x06, 0x45, 0xd6, 0xff, 0xa7, 0x94, 0x3c, 0x86, 0x3b, 0x99, 0x80, 0x4b, 0xec, 0x88, 0x28,
0x82, 0x05, 0x97, 0x71, 0x2d, 0x2c, 0x55, 0xee, 0x5d, 0x27, 0xc9, 0x0b, 0x98, 0x77, 0xe9, 0x44, 0xe1, 0x89, 0xc2, 0x48, 0xe9, 0xde, 0xeb, 0xf4, 0xf2, 0x41, 0x51, 0xf8, 0x4b, 0x15, 0xa6, 0xde,
0x13, 0x7d, 0x16, 0xa1, 0xf5, 0x7f, 0xcb, 0x58, 0x6d, 0xac, 0xdf, 0xef, 0xe4, 0x83, 0xee, 0x5c, 0xa4, 0x28, 0xf7, 0xb7, 0xe2, 0x6c, 0xa6, 0xc9, 0x35, 0xa5, 0x1f, 0xc2, 0xb4, 0xcb, 0x23, 0x2d,
0x0c, 0xba, 0xb3, 0x5d, 0x0c, 0x7a, 0xab, 0x7e, 0x72, 0xd6, 0xac, 0x1c, 0xff, 0x68, 0x1a, 0xde, 0x2c, 0x8d, 0x84, 0x5e, 0x24, 0xc9, 0x0b, 0x98, 0x72, 0xd9, 0x48, 0x13, 0xdb, 0x3c, 0x44, 0xeb,
0x35, 0x63, 0xd6, 0xe1, 0x66, 0x18, 0x8a, 0xc3, 0x37, 0x8a, 0x86, 0x68, 0x55, 0x5b, 0xc6, 0x6a, 0xd6, 0x82, 0xb1, 0xd8, 0x58, 0xbe, 0xd7, 0xce, 0x37, 0xd8, 0x3e, 0xdd, 0x60, 0x7b, 0xbd, 0xd8,
0xdd, 0x2b, 0x31, 0xe4, 0x09, 0xdc, 0xcd, 0x9a, 0x63, 0x12, 0xbb, 0x82, 0x27, 0x2c, 0x51, 0xc8, 0xe0, 0x5a, 0xfd, 0xf0, 0xb8, 0x59, 0x39, 0xf8, 0xd1, 0x34, 0xe8, 0x05, 0x63, 0x36, 0xba, 0xd5,
0x95, 0x35, 0xa7, 0x65, 0x37, 0x1f, 0x90, 0x15, 0xa8, 0xbf, 0x4d, 0xb0, 0x4b, 0xfd, 0x31, 0x5a, 0x20, 0x10, 0x7b, 0x6f, 0x15, 0x0b, 0xd0, 0xaa, 0xea, 0x2b, 0x94, 0x98, 0xab, 0x6f, 0x3a, 0x79,
0x35, 0x2d, 0xba, 0xc4, 0x64, 0x1f, 0x96, 0x5c, 0x3a, 0xd1, 0xa7, 0x5e, 0x54, 0x65, 0xdd, 0x99, 0xcd, 0x4d, 0xc9, 0x3c, 0xd4, 0xdf, 0x25, 0xd8, 0x61, 0xde, 0x10, 0xad, 0x9a, 0x16, 0x9d, 0x61,
0xbd, 0xec, 0x1b, 0x66, 0xf2, 0x1c, 0x6a, 0x2e, 0x9d, 0x6c, 0x06, 0x68, 0xd5, 0x67, 0x3f, 0xa6, 0xb2, 0x05, 0xb3, 0x2e, 0x1b, 0xe9, 0xaa, 0xa7, 0x5d, 0x59, 0xb7, 0x6f, 0xde, 0xf6, 0x25, 0x33,
0xb0, 0x90, 0xc7, 0xb0, 0xe8, 0xa6, 0x89, 0xf2, 0xf0, 0x80, 0x86, 0x6c, 0x48, 0x15, 0x5a, 0xa6, 0x79, 0x06, 0x35, 0x97, 0x8d, 0x56, 0x7d, 0xb4, 0xea, 0x37, 0x2f, 0x53, 0x58, 0xc8, 0x23, 0x98,
0xae, 0xf7, 0x2f, 0x36, 0x1b, 0xb4, 0xbe, 0x75, 0x6f, 0xb4, 0x23, 0xa5, 0x90, 0x16, 0xfc, 0xc3, 0x71, 0xd3, 0x44, 0x51, 0xdc, 0x65, 0x01, 0xef, 0x33, 0x85, 0x96, 0xa9, 0xfb, 0xfd, 0x8b, 0xcd,
0xa0, 0xcb, 0x46, 0x72, 0x0f, 0x6a, 0xbb, 0x2c, 0xcc, 0x62, 0xd4, 0xd0, 0xeb, 0x2e, 0x50, 0x11, 0x06, 0xad, 0x9f, 0xba, 0x39, 0xd8, 0x90, 0x52, 0x48, 0x0b, 0xfe, 0x63, 0xd0, 0x65, 0x23, 0xb9,
0x8e, 0x6f, 0x06, 0x98, 0x7a, 0x29, 0x2e, 0x2a, 0x9a, 0x25, 0xa3, 0x14, 0x73, 0x2f, 0x07, 0x64, 0x0b, 0xb5, 0xe7, 0x3c, 0xc8, 0xf2, 0xd9, 0xd0, 0xeb, 0x2e, 0x50, 0x11, 0x8e, 0x6f, 0x06, 0x98,
0x07, 0x1a, 0x3d, 0x9a, 0xa8, 0xae, 0xe0, 0x8a, 0xfa, 0x4a, 0xe7, 0x62, 0xc6, 0x4a, 0xca, 0x3e, 0x7a, 0x29, 0x2e, 0x2a, 0x96, 0x25, 0xa3, 0xf4, 0xfe, 0xd0, 0x1c, 0x90, 0x0d, 0x68, 0x74, 0x59,
0xd2, 0x82, 0xc6, 0x4b, 0x2e, 0x0e, 0x79, 0x0f, 0xe9, 0x10, 0xa5, 0x4e, 0x4e, 0xdd, 0x2b, 0x53, 0xa2, 0x3a, 0x22, 0x52, 0xcc, 0xcb, 0xe3, 0x76, 0xc3, 0x4e, 0xca, 0x3e, 0xb2, 0x00, 0x8d, 0x97,
0x64, 0x0d, 0x96, 0x2e, 0x77, 0xea, 0x1f, 0xf5, 0xf0, 0x00, 0x43, 0x9d, 0x0c, 0xd3, 0xbb, 0xc1, 0x91, 0xd8, 0x8b, 0xba, 0xc8, 0xfa, 0x28, 0x75, 0x72, 0xea, 0xb4, 0x4c, 0x91, 0x25, 0x98, 0x3d,
0x17, 0xe5, 0xef, 0xc2, 0xe2, 0x4e, 0xf6, 0x42, 0xc4, 0x92, 0x25, 0xa8, 0x5b, 0x78, 0x08, 0xe6, 0xdb, 0xa9, 0xb7, 0xdf, 0xc5, 0x5d, 0x0c, 0x74, 0x32, 0x4c, 0x7a, 0x89, 0x2f, 0xda, 0xef, 0xc2,
0x2b, 0x1a, 0x61, 0x12, 0x53, 0x1f, 0x8b, 0x80, 0x5f, 0x11, 0xd9, 0x30, 0xfa, 0xc8, 0x29, 0xcf, 0xcc, 0x46, 0xf6, 0xa6, 0xc5, 0x92, 0x27, 0xa8, 0xaf, 0xf0, 0x00, 0xcc, 0x57, 0x2c, 0xc4, 0x24,
0xbb, 0x30, 0xbd, 0x02, 0x6d, 0xf5, 0x4e, 0x7e, 0xd9, 0x95, 0x93, 0x73, 0xdb, 0x38, 0x3d, 0xb7, 0x66, 0x1e, 0x16, 0x01, 0x3f, 0x27, 0xb2, 0xd3, 0xd7, 0x4c, 0x2a, 0xae, 0x43, 0x30, 0x91, 0x9f,
0x8d, 0x9f, 0xe7, 0xb6, 0xf1, 0x79, 0x6a, 0x57, 0x8e, 0xa7, 0x76, 0xe5, 0x74, 0x6a, 0x57, 0xbe, 0x9e, 0x11, 0x6b, 0xdd, 0xc3, 0x5f, 0x76, 0xe5, 0xf0, 0xc4, 0x36, 0x8e, 0x4e, 0x6c, 0xe3, 0xe7,
0x4f, 0xed, 0xca, 0xfb, 0xb5, 0x80, 0xa9, 0x71, 0x3a, 0xe8, 0xf8, 0x22, 0x72, 0xc6, 0x34, 0x19, 0x89, 0x6d, 0x7c, 0x1e, 0xdb, 0x95, 0x83, 0xb1, 0x5d, 0x39, 0x1a, 0xdb, 0x95, 0xef, 0x63, 0xbb,
0x33, 0x5f, 0xc8, 0xd8, 0xf1, 0x05, 0x4f, 0xd2, 0xd0, 0xb9, 0xfe, 0x2d, 0x1a, 0xd4, 0x34, 0x7e, 0xf2, 0x7e, 0xc9, 0xe7, 0x6a, 0x98, 0xf6, 0xda, 0x9e, 0x08, 0x9d, 0x21, 0x4b, 0x86, 0xdc, 0x13,
0xf6, 0x27, 0x00, 0x00, 0xff, 0xff, 0x6f, 0x3a, 0xc6, 0x28, 0xa4, 0x04, 0x00, 0x00, 0x32, 0x76, 0x3c, 0x11, 0x25, 0x69, 0xe0, 0x5c, 0xfc, 0xd4, 0xf5, 0x6a, 0x1a, 0x3f, 0xfd, 0x13,
0x00, 0x00, 0xff, 0xff, 0x9c, 0xf6, 0xbd, 0xcc, 0x03, 0x05, 0x00, 0x00,
} }
func (m *RaftIndex) Marshal() (dAtA []byte, err error) { func (m *RaftIndex) Marshal() (dAtA []byte, err error) {
@ -581,6 +644,46 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
return len(dAtA) - i, nil return len(dAtA) - i, nil
} }
func (m *ReadRequest) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ReadRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.RequireConsistent {
i--
if m.RequireConsistent {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i--
dAtA[i] = 0x10
}
if len(m.Token) > 0 {
i -= len(m.Token)
copy(dAtA[i:], m.Token)
i = encodeVarintCommon(dAtA, i, uint64(len(m.Token)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *QueryOptions) Marshal() (dAtA []byte, err error) { func (m *QueryOptions) Marshal() (dAtA []byte, err error) {
size := m.Size() size := m.Size()
dAtA = make([]byte, size) dAtA = make([]byte, size)
@ -768,10 +871,10 @@ func (m *EnterpriseMeta) MarshalToSizedBuffer(dAtA []byte) (int, error) {
_ = i _ = i
var l int var l int
_ = l _ = l
if len(m.Tenant) > 0 { if len(m.Partition) > 0 {
i -= len(m.Tenant) i -= len(m.Partition)
copy(dAtA[i:], m.Tenant) copy(dAtA[i:], m.Partition)
i = encodeVarintCommon(dAtA, i, uint64(len(m.Tenant))) i = encodeVarintCommon(dAtA, i, uint64(len(m.Partition)))
i-- i--
dAtA[i] = 0x12 dAtA[i] = 0x12
} }
@ -837,6 +940,22 @@ func (m *WriteRequest) Size() (n int) {
return n return n
} }
func (m *ReadRequest) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Token)
if l > 0 {
n += 1 + l + sovCommon(uint64(l))
}
if m.RequireConsistent {
n += 2
}
return n
}
func (m *QueryOptions) Size() (n int) { func (m *QueryOptions) Size() (n int) {
if m == nil { if m == nil {
return 0 return 0
@ -908,7 +1027,7 @@ func (m *EnterpriseMeta) Size() (n int) {
if l > 0 { if l > 0 {
n += 1 + l + sovCommon(uint64(l)) n += 1 + l + sovCommon(uint64(l))
} }
l = len(m.Tenant) l = len(m.Partition)
if l > 0 { if l > 0 {
n += 1 + l + sovCommon(uint64(l)) n += 1 + l + sovCommon(uint64(l))
} }
@ -1182,6 +1301,111 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error {
} }
return nil return nil
} }
func (m *ReadRequest) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCommon
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ReadRequest: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ReadRequest: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Token", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCommon
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthCommon
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthCommon
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Token = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field RequireConsistent", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCommon
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.RequireConsistent = bool(v != 0)
default:
iNdEx = preIndex
skippy, err := skipCommon(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthCommon
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthCommon
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *QueryOptions) Unmarshal(dAtA []byte) error { func (m *QueryOptions) Unmarshal(dAtA []byte) error {
l := len(dAtA) l := len(dAtA)
iNdEx := 0 iNdEx := 0
@ -1750,7 +1974,7 @@ func (m *EnterpriseMeta) Unmarshal(dAtA []byte) error {
iNdEx = postIndex iNdEx = postIndex
case 2: case 2:
if wireType != 2 { if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Tenant", wireType) return fmt.Errorf("proto: wrong wireType = %d for field Partition", wireType)
} }
var stringLen uint64 var stringLen uint64
for shift := uint(0); ; shift += 7 { for shift := uint(0); ; shift += 7 {
@ -1778,7 +2002,7 @@ func (m *EnterpriseMeta) Unmarshal(dAtA []byte) error {
if postIndex > l { if postIndex > l {
return io.ErrUnexpectedEOF return io.ErrUnexpectedEOF
} }
m.Tenant = string(dAtA[iNdEx:postIndex]) m.Partition = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex iNdEx = postIndex
default: default:
iNdEx = preIndex iNdEx = preIndex

View File

@ -38,6 +38,24 @@ message WriteRequest {
string Token = 1; string Token = 1;
} }
// ReadRequest is a type that may be embedded into any requests for read
// operations.
// It is a replacement for QueryOptions now that we no longer need any of those
// fields because we are moving away from using blocking queries.
// It is also similar to WriteRequest. It is a separate type so that in the
// future we can introduce fields that may only be relevant for reads.
message ReadRequest {
option (gogoproto.goproto_getters) = true;
// Token is the ACL token ID. If not provided, the 'anonymous'
// token is assumed for backwards compatibility.
string Token = 1;
// RequireConsistent indicates that the request must be sent to the leader.
bool RequireConsistent = 2;
}
// QueryOptions is used to specify various flags for read queries // QueryOptions is used to specify various flags for read queries
message QueryOptions { message QueryOptions {
// The autogenerated getters will implement half of the // The autogenerated getters will implement half of the
@ -139,6 +157,6 @@ message QueryMeta {
message EnterpriseMeta { message EnterpriseMeta {
// Namespace in which the entity exists. // Namespace in which the entity exists.
string Namespace = 1; string Namespace = 1;
// Tenant in which the entity exists. // Partition in which the entity exists.
string Tenant = 2; string Partition = 2;
} }