diff --git a/agent/agent_test.go b/agent/agent_test.go index c7567f5337..c07b20097f 100644 --- a/agent/agent_test.go +++ b/agent/agent_test.go @@ -367,6 +367,10 @@ func (f fakeGRPCConnPool) ClientConn(_ string) (*grpc.ClientConn, error) { return nil, nil } +func (f fakeGRPCConnPool) ClientConnLeader() (*grpc.ClientConn, error) { + return nil, nil +} + func TestAgent_ReconnectConfigWanDisabled(t *testing.T) { if testing.Short() { t.Skip("too slow for testing.Short") diff --git a/agent/consul/client_test.go b/agent/consul/client_test.go index c79a38f050..6e50735cd5 100644 --- a/agent/consul/client_test.go +++ b/agent/consul/client_test.go @@ -9,12 +9,8 @@ import ( "testing" "time" - "github.com/hashicorp/go-hclog" - msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/hashicorp/serf/serf" - "github.com/stretchr/testify/require" - "golang.org/x/time/rate" - + "github.com/hashicorp/consul/agent/grpc" + "github.com/hashicorp/consul/agent/grpc/resolver" "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" @@ -24,6 +20,11 @@ import ( "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" "github.com/hashicorp/consul/tlsutil" + "github.com/hashicorp/go-hclog" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/serf/serf" + "github.com/stretchr/testify/require" + "golang.org/x/time/rate" ) func testClientConfig(t *testing.T) (string, *Config) { @@ -169,6 +170,10 @@ func TestClient_LANReap(t *testing.T) { } func TestClient_JoinLAN_Invalid(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + t.Parallel() dir1, s1 := testServer(t) defer os.RemoveAll(dir1) @@ -497,7 +502,9 @@ func newDefaultDeps(t *testing.T, c *Config) Deps { tls, err := tlsutil.NewConfigurator(c.TLSConfig, logger) require.NoError(t, err, "failed to create tls configuration") - r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter), nil) + builder := resolver.NewServerResolverBuilder(resolver.Config{Authority: c.NodeName}) + r := router.NewRouter(logger, c.Datacenter, fmt.Sprintf("%s.%s", c.NodeName, c.Datacenter), builder) + resolver.Register(builder) connPool := &pool.ConnPool{ Server: false, @@ -515,6 +522,8 @@ func newDefaultDeps(t *testing.T, c *Config) Deps { Tokens: new(token.Store), Router: r, ConnPool: connPool, + GRPCConnPool: grpc.NewClientConnPool(builder, grpc.TLSWrapper(tls.OutgoingRPCWrapper()), tls.UseTLS), + LeaderForwarder: builder, EnterpriseDeps: newDefaultDepsEnterprise(t, logger, c), } } diff --git a/agent/consul/leader_connect_test.go b/agent/consul/leader_connect_test.go index 712a7efd6c..17d0233489 100644 --- a/agent/consul/leader_connect_test.go +++ b/agent/consul/leader_connect_test.go @@ -333,6 +333,10 @@ func getCAProviderWithLock(s *Server) (ca.Provider, *structs.CARoot) { } func TestLeader_Vault_PrimaryCA_IntermediateRenew(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + ca.SkipIfVaultNotPresent(t) // no parallel execution because we change globals diff --git a/agent/consul/options.go b/agent/consul/options.go index f2d536cc5c..efcf32ab23 100644 --- a/agent/consul/options.go +++ b/agent/consul/options.go @@ -17,9 +17,15 @@ type Deps struct { Router *router.Router ConnPool *pool.ConnPool GRPCConnPool GRPCClientConner + LeaderForwarder LeaderForwarder EnterpriseDeps } type GRPCClientConner interface { ClientConn(datacenter string) (*grpc.ClientConn, error) + ClientConnLeader() (*grpc.ClientConn, error) +} + +type LeaderForwarder interface { + UpdateLeaderAddr(leaderAddr string) } diff --git a/agent/consul/server.go b/agent/consul/server.go index 10e9e7b0dc..e23e9e0037 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -261,6 +261,10 @@ type Server struct { // Used to do leader forwarding and provide fast lookup by server id and address serverLookup *ServerLookup + // grpcLeaderForwarder is notified on leader change in order to keep the grpc + // resolver up to date. + grpcLeaderForwarder LeaderForwarder + // floodLock controls access to floodCh. floodLock sync.RWMutex floodCh []chan struct{} @@ -583,6 +587,8 @@ func NewServer(config *Config, flat Deps) (*Server, error) { go reporter.Run(&lib.StopChannelContext{StopCh: s.shutdownCh}) s.grpcHandler = newGRPCHandlerFromConfig(flat, config, s) + s.grpcLeaderForwarder = flat.LeaderForwarder + go s.trackLeaderChanges() // Initialize Autopilot. This must happen before starting leadership monitoring // as establishing leadership could attempt to use autopilot and cause a panic. @@ -627,15 +633,15 @@ func newFSMFromConfig(logger hclog.Logger, gc *state.TombstoneGC, config *Config } func newGRPCHandlerFromConfig(deps Deps, config *Config, s *Server) connHandler { - if !config.RPCConfig.EnableStreaming { - return agentgrpc.NoOpHandler{Logger: deps.Logger} + register := func(srv *grpc.Server) { + if config.RPCConfig.EnableStreaming { + pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subscribe.NewServer( + &subscribeBackend{srv: s, connPool: deps.GRPCConnPool}, + deps.Logger.Named("grpc-api.subscription"))) + } + s.registerEnterpriseGRPCServices(deps, srv) } - register := func(srv *grpc.Server) { - pbsubscribe.RegisterStateChangeSubscriptionServer(srv, subscribe.NewServer( - &subscribeBackend{srv: s, connPool: deps.GRPCConnPool}, - deps.Logger.Named("grpc-api.subscription"))) - } return agentgrpc.NewHandler(config.RPCAddr, register) } @@ -1450,6 +1456,33 @@ func (s *Server) isReadyForConsistentReads() bool { return atomic.LoadInt32(&s.readyForConsistentReads) == 1 } +// trackLeaderChanges registers an Observer with raft in order to receive updates +// about leader changes, in order to keep the grpc resolver up to date for leader forwarding. +func (s *Server) trackLeaderChanges() { + obsCh := make(chan raft.Observation, 16) + observer := raft.NewObserver(obsCh, false, func(o *raft.Observation) bool { + _, ok := o.Data.(raft.LeaderObservation) + return ok + }) + s.raft.RegisterObserver(observer) + + for { + select { + case obs := <-obsCh: + leaderObs, ok := obs.Data.(raft.LeaderObservation) + if !ok { + s.logger.Debug("got unknown observation type from raft", "type", reflect.TypeOf(obs.Data)) + continue + } + + s.grpcLeaderForwarder.UpdateLeaderAddr(string(leaderObs.Leader)) + case <-s.shutdownCh: + s.raft.DeregisterObserver(observer) + return + } + } +} + // peersInfoContent is used to help operators understand what happened to the // peers.json file. This is written to a file called peers.info in the same // location. diff --git a/agent/consul/server_oss.go b/agent/consul/server_oss.go index 911e901174..5b43c0cfed 100644 --- a/agent/consul/server_oss.go +++ b/agent/consul/server_oss.go @@ -4,9 +4,12 @@ package consul import ( "github.com/hashicorp/serf/serf" + "google.golang.org/grpc" ) func (s *Server) removeFailedNodeEnterprise(remove func(*serf.Serf, string) error, node, wanNode string) error { // nothing to do for oss return nil } + +func (s *Server) registerEnterpriseGRPCServices(deps Deps, srv *grpc.Server) {} diff --git a/agent/consul/state/catalog.go b/agent/consul/state/catalog.go index a518d62399..0356079750 100644 --- a/agent/consul/state/catalog.go +++ b/agent/consul/state/catalog.go @@ -1826,6 +1826,12 @@ func (q NodeServiceQuery) NamespaceOrDefault() string { return q.EnterpriseMeta.NamespaceOrDefault() } +// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer +// receiver for this method. Remove once that is fixed. +func (q NodeServiceQuery) PartitionOrDefault() string { + return q.EnterpriseMeta.PartitionOrDefault() +} + // deleteCheckTxn is the inner method used to call a health // check deletion within an existing transaction. func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error { diff --git a/agent/consul/state/catalog_schema.go b/agent/consul/state/catalog_schema.go index 565b4bf1b6..a57c8cf7d2 100644 --- a/agent/consul/state/catalog_schema.go +++ b/agent/consul/state/catalog_schema.go @@ -540,3 +540,9 @@ type NodeCheckQuery struct { func (q NodeCheckQuery) NamespaceOrDefault() string { return q.EnterpriseMeta.NamespaceOrDefault() } + +// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer +// receiver for this method. Remove once that is fixed. +func (q NodeCheckQuery) PartitionOrDefault() string { + return q.EnterpriseMeta.PartitionOrDefault() +} diff --git a/agent/consul/state/query.go b/agent/consul/state/query.go index 81f95121d8..4ca1051f4f 100644 --- a/agent/consul/state/query.go +++ b/agent/consul/state/query.go @@ -21,6 +21,12 @@ func (q Query) NamespaceOrDefault() string { return q.EnterpriseMeta.NamespaceOrDefault() } +// PartitionOrDefault exists because structs.EnterpriseMeta uses a pointer +// receiver for this method. Remove once that is fixed. +func (q Query) PartitionOrDefault() string { + return q.EnterpriseMeta.PartitionOrDefault() +} + // indexFromQuery builds an index key where Query.Value is lowercase, and is // a required value. func indexFromQuery(arg interface{}) ([]byte, error) { diff --git a/agent/consul/state/schema_test.go b/agent/consul/state/schema_test.go index 5b1ff585be..f90b28027d 100644 --- a/agent/consul/state/schema_test.go +++ b/agent/consul/state/schema_test.go @@ -107,10 +107,12 @@ func (tc indexerTestCase) run(t *testing.T, indexer memdb.Indexer) { } if i, ok := indexer.(memdb.MultiIndexer); ok { - valid, actual, err := i.FromObject(tc.writeMulti.source) - require.NoError(t, err) - require.True(t, valid) - require.Equal(t, tc.writeMulti.expected, actual) + t.Run("writeIndexMulti", func(t *testing.T) { + valid, actual, err := i.FromObject(tc.writeMulti.source) + require.NoError(t, err) + require.True(t, valid) + require.Equal(t, tc.writeMulti.expected, actual) + }) } for _, extra := range tc.extra { diff --git a/agent/consul/subscribe_backend.go b/agent/consul/subscribe_backend.go index 851b14aa07..a1ba47236b 100644 --- a/agent/consul/subscribe_backend.go +++ b/agent/consul/subscribe_backend.go @@ -31,6 +31,7 @@ var _ subscribe.Backend = (*subscribeBackend)(nil) // or if it matches the Datacenter in config. // // TODO: extract this so that it can be used with other grpc services. +// TODO: rename to ForwardToDC func (s subscribeBackend) Forward(dc string, f func(*grpc.ClientConn) error) (handled bool, err error) { if dc == "" || dc == s.srv.config.Datacenter { return false, nil diff --git a/agent/grpc/client.go b/agent/grpc/client.go index e58d5c6496..d3709744af 100644 --- a/agent/grpc/client.go +++ b/agent/grpc/client.go @@ -50,15 +50,25 @@ func NewClientConnPool(servers ServerLocator, tls TLSWrapper, useTLSForDC func(d // existing connections in the pool, a new one will be created, stored in the pool, // then returned. func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error) { + return c.dial(datacenter, "server") +} + +// TODO: godoc +func (c *ClientConnPool) ClientConnLeader() (*grpc.ClientConn, error) { + return c.dial("local", "leader") +} + +func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.ClientConn, error) { c.connsLock.Lock() defer c.connsLock.Unlock() - if conn, ok := c.conns[datacenter]; ok { + target := fmt.Sprintf("consul://%s/%s.%s", c.servers.Authority(), serverType, datacenter) + if conn, ok := c.conns[target]; ok { return conn, nil } conn, err := grpc.Dial( - fmt.Sprintf("consul://%s/server.%s", c.servers.Authority(), datacenter), + target, // use WithInsecure mode here because we handle the TLS wrapping in the // custom dialer based on logic around whether the server has TLS enabled. grpc.WithInsecure(), @@ -86,7 +96,7 @@ func (c *ClientConnPool) ClientConn(datacenter string) (*grpc.ClientConn, error) return nil, err } - c.conns[datacenter] = conn + c.conns[target] = conn return conn, nil } diff --git a/agent/grpc/client_test.go b/agent/grpc/client_test.go index 49922a3098..d4789b4f0e 100644 --- a/agent/grpc/client_test.go +++ b/agent/grpc/client_test.go @@ -113,6 +113,44 @@ func TestClientConnPool_IntegrationWithGRPCResolver_Failover(t *testing.T) { require.NotEqual(t, resp.ServerName, first.ServerName) } +func TestClientConnPool_ForwardToLeader_Failover(t *testing.T) { + count := 3 + conf := newConfig(t) + res := resolver.NewServerResolverBuilder(conf) + registerWithGRPC(t, res) + pool := NewClientConnPool(res, nil, useTLSForDcAlwaysTrue) + + var servers []testServer + for i := 0; i < count; i++ { + name := fmt.Sprintf("server-%d", i) + srv := newTestServer(t, name, "dc1") + res.AddServer(srv.Metadata()) + servers = append(servers, srv) + t.Cleanup(srv.shutdown) + } + + // Set the leader address to the first server. + res.UpdateLeaderAddr(servers[0].addr.String()) + + conn, err := pool.ClientConnLeader() + require.NoError(t, err) + client := testservice.NewSimpleClient(conn) + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + t.Cleanup(cancel) + + first, err := client.Something(ctx, &testservice.Req{}) + require.NoError(t, err) + require.Equal(t, first.ServerName, servers[0].name) + + // Update the leader address and make another request. + res.UpdateLeaderAddr(servers[1].addr.String()) + + resp, err := client.Something(ctx, &testservice.Req{}) + require.NoError(t, err) + require.Equal(t, resp.ServerName, servers[1].name) +} + func newConfig(t *testing.T) resolver.Config { n := t.Name() s := strings.Replace(n, "/", "", -1) diff --git a/agent/grpc/resolver/resolver.go b/agent/grpc/resolver/resolver.go index b3eae815ff..5700168457 100644 --- a/agent/grpc/resolver/resolver.go +++ b/agent/grpc/resolver/resolver.go @@ -16,13 +16,15 @@ import ( // ServerResolvers updated when changes occur. type ServerResolverBuilder struct { cfg Config + // leaderResolver is used to track the address of the leader in the local DC. + leaderResolver leaderResolver // servers is an index of Servers by Server.ID. The map contains server IDs // for all datacenters. servers map[string]*metadata.Server // resolvers is an index of connections to the serverResolver which manages // addresses of servers for that connection. resolvers map[resolver.ClientConn]*serverResolver - // lock for servers and resolvers. + // lock for all stateful fields (excludes config which is immutable). lock sync.RWMutex } @@ -89,9 +91,23 @@ func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.Client if resolver, ok := s.resolvers[cc]; ok { return resolver, nil } + if cc == s.leaderResolver.clientConn { + return s.leaderResolver, nil + } + + serverType, datacenter, err := parseEndpoint(target.Endpoint) + if err != nil { + return nil, err + } + if serverType == "leader" { + // TODO: is this safe? can we ever have multiple CC for the leader? Seems + // like we can only have one given the caching in ClientConnPool.Dial + s.leaderResolver.clientConn = cc + s.leaderResolver.updateClientConn() + return s.leaderResolver, nil + } // Make a new resolver for the dc and add it to the list of active ones. - datacenter := strings.TrimPrefix(target.Endpoint, "server.") resolver := &serverResolver{ datacenter: datacenter, clientConn: cc, @@ -107,6 +123,16 @@ func (s *ServerResolverBuilder) Build(target resolver.Target, cc resolver.Client return resolver, nil } +// parseEndpoint parses a string, expecting a format of "serverType.datacenter" +func parseEndpoint(target string) (string, string, error) { + parts := strings.SplitN(target, ".", 2) + if len(parts) != 2 { + return "", "", fmt.Errorf("unexpected endpoint address: %v", target) + } + + return parts[0], parts[1], nil +} + func (s *ServerResolverBuilder) Authority() string { return s.cfg.Authority } @@ -168,6 +194,15 @@ func (s *ServerResolverBuilder) getDCAddrs(dc string) []resolver.Address { return addrs } +// UpdateLeaderAddr updates the leader address in the local DC's resolver. +func (s *ServerResolverBuilder) UpdateLeaderAddr(leaderAddr string) { + s.lock.Lock() + defer s.lock.Unlock() + + s.leaderResolver.addr = leaderAddr + s.leaderResolver.updateClientConn() +} + // serverResolver is a grpc Resolver that will keep a grpc.ClientConn up to date // on the list of server addresses to use. type serverResolver struct { @@ -224,4 +259,29 @@ func (r *serverResolver) Close() { } // ResolveNow is not used -func (*serverResolver) ResolveNow(_ resolver.ResolveNowOption) {} +func (*serverResolver) ResolveNow(resolver.ResolveNowOption) {} + +type leaderResolver struct { + addr string + clientConn resolver.ClientConn +} + +func (l leaderResolver) ResolveNow(resolver.ResolveNowOption) {} + +func (l leaderResolver) Close() {} + +func (l leaderResolver) updateClientConn() { + if l.addr == "" || l.clientConn == nil { + return + } + addrs := []resolver.Address{ + { + Addr: l.addr, + Type: resolver.Backend, + ServerName: "leader", + }, + } + l.clientConn.UpdateState(resolver.State{Addresses: addrs}) +} + +var _ resolver.Resolver = (*leaderResolver)(nil) diff --git a/agent/http.go b/agent/http.go index ad2bbe9779..a26675ce69 100644 --- a/agent/http.go +++ b/agent/http.go @@ -31,6 +31,7 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/logging" + "github.com/hashicorp/consul/proto/pbcommon" ) var HTTPSummaries = []prometheus.SummaryDefinition{ @@ -901,6 +902,14 @@ func (s *HTTPHandlers) parseConsistency(resp http.ResponseWriter, req *http.Requ return false } +// parseConsistencyReadRequest is used to parse the ?consistent query param. +func parseConsistencyReadRequest(resp http.ResponseWriter, req *http.Request, b *pbcommon.ReadRequest) { + query := req.URL.Query() + if _, ok := query["consistent"]; ok { + b.RequireConsistent = true + } +} + // parseDC is used to parse the ?dc query param func (s *HTTPHandlers) parseDC(req *http.Request, dc *string) { if other := req.URL.Query().Get("dc"); other != "" { diff --git a/agent/rpc/subscribe/subscribe_test.go b/agent/rpc/subscribe/subscribe_test.go index d2c13716dc..3599589ee6 100644 --- a/agent/rpc/subscribe/subscribe_test.go +++ b/agent/rpc/subscribe/subscribe_test.go @@ -277,16 +277,16 @@ func assertDeepEqual(t *testing.T, x, y interface{}, opts ...cmp.Option) { type testBackend struct { store *state.Store - authorizer func(token string) acl.Authorizer + authorizer func(token string, entMeta *structs.EnterpriseMeta) acl.Authorizer forwardConn *gogrpc.ClientConn } func (b testBackend) ResolveTokenAndDefaultMeta( token string, - _ *structs.EnterpriseMeta, + entMeta *structs.EnterpriseMeta, _ *acl.AuthorizerContext, ) (acl.Authorizer, error) { - return b.authorizer(token), nil + return b.authorizer(token, entMeta), nil } func (b testBackend) Forward(_ string, fn func(*gogrpc.ClientConn) error) (handled bool, err error) { @@ -306,7 +306,7 @@ func newTestBackend() (*testBackend, error) { return nil, err } store := state.NewStateStoreWithEventPublisher(gc) - allowAll := func(_ string) acl.Authorizer { + allowAll := func(string, *structs.EnterpriseMeta) acl.Authorizer { return acl.AllowAll() } return &testBackend{store: store, authorizer: allowAll}, nil @@ -612,7 +612,7 @@ node "node1" { require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil)) // TODO: is there any easy way to do this with the acl package? - backend.authorizer = func(tok string) acl.Authorizer { + backend.authorizer = func(tok string, _ *structs.EnterpriseMeta) acl.Authorizer { if tok == token { return authorizer } @@ -811,7 +811,7 @@ node "node1" { require.Equal(t, acl.Deny, authorizer.NodeRead("denied", nil)) // TODO: is there any easy way to do this with the acl package? - backend.authorizer = func(tok string) acl.Authorizer { + backend.authorizer = func(tok string, _ *structs.EnterpriseMeta) acl.Authorizer { if tok == token { return authorizer } diff --git a/agent/setup.go b/agent/setup.go index 7b363cd86b..5abecde79d 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -106,6 +106,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer) (BaseDeps, error) builder := resolver.NewServerResolverBuilder(resolver.Config{}) resolver.Register(builder) d.GRPCConnPool = grpc.NewClientConnPool(builder, grpc.TLSWrapper(d.TLSConfigurator.OutgoingRPCWrapper()), d.TLSConfigurator.UseTLS) + d.LeaderForwarder = builder d.Router = router.NewRouter(d.Logger, cfg.Datacenter, fmt.Sprintf("%s.%s", cfg.NodeName, cfg.Datacenter), builder) diff --git a/agent/structs/structs_oss_test.go b/agent/structs/structs_oss_test.go index f7811e74ac..94793e2680 100644 --- a/agent/structs/structs_oss_test.go +++ b/agent/structs/structs_oss_test.go @@ -1,3 +1,5 @@ +// +build !consulent + package structs import ( diff --git a/api/api.go b/api/api.go index a35980a9a1..ec48355684 100644 --- a/api/api.go +++ b/api/api.go @@ -76,6 +76,10 @@ const ( // HTTPNamespaceEnvVar defines an environment variable name which sets // the HTTP Namespace to be used by default. This can still be overridden. HTTPNamespaceEnvName = "CONSUL_NAMESPACE" + + // HTTPPartitionEnvName defines an environment variable name which sets + // the HTTP Partition to be used by default. This can still be overridden. + HTTPPartitionEnvName = "CONSUL_PARTITION" ) // QueryOptions are used to parameterize a query @@ -84,6 +88,10 @@ type QueryOptions struct { // Note: Namespaces are available only in Consul Enterprise Namespace string + // Partition overrides the `default` partition + // Note: Partitions are available only in Consul Enterprise + Partition string + // Providing a datacenter overwrites the DC provided // by the Config Datacenter string @@ -191,6 +199,10 @@ type WriteOptions struct { // Note: Namespaces are available only in Consul Enterprise Namespace string + // Partition overrides the `default` partition + // Note: Partitions are available only in Consul Enterprise + Partition string + // Providing a datacenter overwrites the DC provided // by the Config Datacenter string @@ -314,6 +326,10 @@ type Config struct { // when no other Namespace is present in the QueryOptions Namespace string + // Partition is the name of the partition to send along for the request + // when no other Partition is present in the QueryOptions + Partition string + TLSConfig TLSConfig } @@ -466,6 +482,10 @@ func defaultConfig(logger hclog.Logger, transportFn func() *http.Transport) *Con config.Namespace = v } + if v := os.Getenv(HTTPPartitionEnvName); v != "" { + config.Partition = v + } + return config } @@ -732,6 +752,9 @@ func (r *request) setQueryOptions(q *QueryOptions) { if q.Namespace != "" { r.params.Set("ns", q.Namespace) } + if q.Partition != "" { + r.params.Set("partition", q.Partition) + } if q.Datacenter != "" { r.params.Set("dc", q.Datacenter) } @@ -834,6 +857,9 @@ func (r *request) setWriteOptions(q *WriteOptions) { if q.Namespace != "" { r.params.Set("ns", q.Namespace) } + if q.Partition != "" { + r.params.Set("partition", q.Partition) + } if q.Datacenter != "" { r.params.Set("dc", q.Datacenter) } @@ -908,6 +934,9 @@ func (c *Client) newRequest(method, path string) *request { if c.config.Namespace != "" { r.params.Set("ns", c.config.Namespace) } + if c.config.Partition != "" { + r.params.Set("partition", c.config.Partition) + } if c.config.WaitTime != 0 { r.params.Set("wait", durToMsec(r.config.WaitTime)) } diff --git a/api/api_test.go b/api/api_test.go index 5b7f557cae..134b8d9252 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -738,6 +738,7 @@ func TestAPI_SetQueryOptions(t *testing.T) { r := c.newRequest("GET", "/v1/kv/foo") q := &QueryOptions{ Namespace: "operator", + Partition: "asdf", Datacenter: "foo", AllowStale: true, RequireConsistent: true, @@ -752,6 +753,9 @@ func TestAPI_SetQueryOptions(t *testing.T) { if r.params.Get("ns") != "operator" { t.Fatalf("bad: %v", r.params) } + if r.params.Get("partition") != "asdf" { + t.Fatalf("bad: %v", r.params) + } if r.params.Get("dc") != "foo" { t.Fatalf("bad: %v", r.params) } @@ -799,6 +803,7 @@ func TestAPI_SetWriteOptions(t *testing.T) { r := c.newRequest("GET", "/v1/kv/foo") q := &WriteOptions{ Namespace: "operator", + Partition: "asdf", Datacenter: "foo", Token: "23456", } @@ -806,6 +811,9 @@ func TestAPI_SetWriteOptions(t *testing.T) { if r.params.Get("ns") != "operator" { t.Fatalf("bad: %v", r.params) } + if r.params.Get("partition") != "asdf" { + t.Fatalf("bad: %v", r.params) + } if r.params.Get("dc") != "foo" { t.Fatalf("bad: %v", r.params) } diff --git a/logging/names.go b/logging/names.go index b968718bc6..17db364af7 100644 --- a/logging/names.go +++ b/logging/names.go @@ -49,6 +49,7 @@ const ( Session string = "session" Sentinel string = "sentinel" Snapshot string = "snapshot" + Partition string = "partition" TerminatingGateway string = "terminating_gateway" TLSUtil string = "tlsutil" Transaction string = "txn" diff --git a/proto/pbcommon/common.go b/proto/pbcommon/common.go index eb396ae58e..97241341c1 100644 --- a/proto/pbcommon/common.go +++ b/proto/pbcommon/common.go @@ -74,6 +74,14 @@ func (q *QueryOptions) SetStaleIfError(staleIfError time.Duration) { q.StaleIfError = staleIfError } +func (q QueryOptions) HasTimedOut(start time.Time, rpcHoldTimeout, maxQueryTime, defaultQueryTime time.Duration) bool { + o := structs.QueryOptions{ + MaxQueryTime: q.MaxQueryTime, + MinQueryIndex: q.MinQueryIndex, + } + return o.HasTimedOut(start, rpcHoldTimeout, maxQueryTime, defaultQueryTime) +} + // SetFilter is needed to implement the structs.QueryOptionsCompat interface func (q *QueryOptions) SetFilter(filter string) { q.Filter = filter @@ -121,6 +129,10 @@ func (w WriteRequest) AllowStaleRead() bool { return false } +func (w WriteRequest) HasTimedOut(start time.Time, rpcHoldTimeout, _, _ time.Duration) bool { + return time.Since(start) > rpcHoldTimeout +} + func (td TargetDatacenter) RequestDatacenter() string { return td.Datacenter } diff --git a/proto/pbcommon/common.pb.binary.go b/proto/pbcommon/common.pb.binary.go index df36ec0ded..4fa371c5fe 100644 --- a/proto/pbcommon/common.pb.binary.go +++ b/proto/pbcommon/common.pb.binary.go @@ -37,6 +37,16 @@ func (msg *WriteRequest) UnmarshalBinary(b []byte) error { return proto.Unmarshal(b, msg) } +// MarshalBinary implements encoding.BinaryMarshaler +func (msg *ReadRequest) MarshalBinary() ([]byte, error) { + return proto.Marshal(msg) +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler +func (msg *ReadRequest) UnmarshalBinary(b []byte) error { + return proto.Unmarshal(b, msg) +} + // MarshalBinary implements encoding.BinaryMarshaler func (msg *QueryOptions) MarshalBinary() ([]byte, error) { return proto.Marshal(msg) diff --git a/proto/pbcommon/common.pb.go b/proto/pbcommon/common.pb.go index 06a6fa1f09..1299fa61d5 100644 --- a/proto/pbcommon/common.pb.go +++ b/proto/pbcommon/common.pb.go @@ -152,6 +152,67 @@ func (m *WriteRequest) GetToken() string { return "" } +// ReadRequest is a type that may be embedded into any requests for read +// operations. +// It is a replacement for QueryOptions now that we no longer need any of those +// fields because we are moving away from using blocking queries. +// It is also similar to WriteRequest. It is a separate type so that in the +// future we can introduce fields that may only be relevant for reads. +type ReadRequest struct { + // Token is the ACL token ID. If not provided, the 'anonymous' + // token is assumed for backwards compatibility. + Token string `protobuf:"bytes,1,opt,name=Token,proto3" json:"Token,omitempty"` + // RequireConsistent indicates that the request must be sent to the leader. + RequireConsistent bool `protobuf:"varint,2,opt,name=RequireConsistent,proto3" json:"RequireConsistent,omitempty"` +} + +func (m *ReadRequest) Reset() { *m = ReadRequest{} } +func (m *ReadRequest) String() string { return proto.CompactTextString(m) } +func (*ReadRequest) ProtoMessage() {} +func (*ReadRequest) Descriptor() ([]byte, []int) { + return fileDescriptor_a6f5ac44994d718c, []int{3} +} +func (m *ReadRequest) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ReadRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ReadRequest.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ReadRequest) XXX_Merge(src proto.Message) { + xxx_messageInfo_ReadRequest.Merge(m, src) +} +func (m *ReadRequest) XXX_Size() int { + return m.Size() +} +func (m *ReadRequest) XXX_DiscardUnknown() { + xxx_messageInfo_ReadRequest.DiscardUnknown(m) +} + +var xxx_messageInfo_ReadRequest proto.InternalMessageInfo + +func (m *ReadRequest) GetToken() string { + if m != nil { + return m.Token + } + return "" +} + +func (m *ReadRequest) GetRequireConsistent() bool { + if m != nil { + return m.RequireConsistent + } + return false +} + // QueryOptions is used to specify various flags for read queries type QueryOptions struct { // Token is the ACL token ID. If not provided, the 'anonymous' @@ -209,7 +270,7 @@ func (m *QueryOptions) Reset() { *m = QueryOptions{} } func (m *QueryOptions) String() string { return proto.CompactTextString(m) } func (*QueryOptions) ProtoMessage() {} func (*QueryOptions) Descriptor() ([]byte, []int) { - return fileDescriptor_a6f5ac44994d718c, []int{3} + return fileDescriptor_a6f5ac44994d718c, []int{4} } func (m *QueryOptions) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -336,7 +397,7 @@ func (m *QueryMeta) Reset() { *m = QueryMeta{} } func (m *QueryMeta) String() string { return proto.CompactTextString(m) } func (*QueryMeta) ProtoMessage() {} func (*QueryMeta) Descriptor() ([]byte, []int) { - return fileDescriptor_a6f5ac44994d718c, []int{4} + return fileDescriptor_a6f5ac44994d718c, []int{5} } func (m *QueryMeta) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -398,15 +459,15 @@ func (m *QueryMeta) GetConsistencyLevel() string { type EnterpriseMeta struct { // Namespace in which the entity exists. Namespace string `protobuf:"bytes,1,opt,name=Namespace,proto3" json:"Namespace,omitempty"` - // Tenant in which the entity exists. - Tenant string `protobuf:"bytes,2,opt,name=Tenant,proto3" json:"Tenant,omitempty"` + // Partition in which the entity exists. + Partition string `protobuf:"bytes,2,opt,name=Partition,proto3" json:"Partition,omitempty"` } func (m *EnterpriseMeta) Reset() { *m = EnterpriseMeta{} } func (m *EnterpriseMeta) String() string { return proto.CompactTextString(m) } func (*EnterpriseMeta) ProtoMessage() {} func (*EnterpriseMeta) Descriptor() ([]byte, []int) { - return fileDescriptor_a6f5ac44994d718c, []int{5} + return fileDescriptor_a6f5ac44994d718c, []int{6} } func (m *EnterpriseMeta) XXX_Unmarshal(b []byte) error { return m.Unmarshal(b) @@ -439,6 +500,7 @@ func init() { proto.RegisterType((*RaftIndex)(nil), "common.RaftIndex") proto.RegisterType((*TargetDatacenter)(nil), "common.TargetDatacenter") proto.RegisterType((*WriteRequest)(nil), "common.WriteRequest") + proto.RegisterType((*ReadRequest)(nil), "common.ReadRequest") proto.RegisterType((*QueryOptions)(nil), "common.QueryOptions") proto.RegisterType((*QueryMeta)(nil), "common.QueryMeta") proto.RegisterType((*EnterpriseMeta)(nil), "common.EnterpriseMeta") @@ -447,45 +509,46 @@ func init() { func init() { proto.RegisterFile("proto/pbcommon/common.proto", fileDescriptor_a6f5ac44994d718c) } var fileDescriptor_a6f5ac44994d718c = []byte{ - // 606 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0xcd, 0x6e, 0xd3, 0x4c, - 0x14, 0x8d, 0xbf, 0x2f, 0x0d, 0xf1, 0x4d, 0x5b, 0x95, 0x51, 0x85, 0x4c, 0x41, 0x4e, 0x14, 0x21, - 0x54, 0x55, 0x10, 0x4b, 0x65, 0x57, 0x56, 0x6d, 0xda, 0xa2, 0x8a, 0x98, 0x0a, 0x13, 0x84, 0xc4, - 0x6e, 0xe2, 0xdc, 0x38, 0x23, 0xec, 0x19, 0x33, 0x1e, 0xb7, 0xe9, 0x1b, 0xb0, 0x64, 0x59, 0xb1, - 0xe2, 0x41, 0x78, 0x80, 0x2e, 0xbb, 0x64, 0x55, 0xa0, 0x79, 0x03, 0x9e, 0x00, 0x79, 0xec, 0xb6, - 0x2e, 0xed, 0x22, 0xac, 0x92, 0x73, 0x7c, 0xce, 0xcc, 0xfd, 0x39, 0x36, 0x3c, 0x88, 0xa5, 0x50, - 0xc2, 0x89, 0x07, 0xbe, 0x88, 0x22, 0xc1, 0x9d, 0xfc, 0xa7, 0xa3, 0x59, 0x52, 0xcb, 0xd1, 0x8a, - 0x1d, 0x08, 0x11, 0x84, 0xe8, 0x68, 0x76, 0x90, 0x8e, 0x9c, 0x61, 0x2a, 0xa9, 0x62, 0x17, 0xba, - 0x95, 0xe5, 0x40, 0x04, 0x22, 0x3f, 0x28, 0xfb, 0x97, 0xb3, 0xed, 0x08, 0x4c, 0x8f, 0x8e, 0xd4, - 0x1e, 0x1f, 0xe2, 0x84, 0x38, 0xd0, 0xe8, 0x4a, 0xa4, 0x0a, 0x35, 0xb4, 0x8c, 0x96, 0xb1, 0x5a, - 0xdd, 0x5a, 0xf8, 0x7d, 0xd6, 0x34, 0x07, 0x38, 0x89, 0xe5, 0x46, 0xfb, 0x69, 0xdb, 0x2b, 0x2b, - 0x32, 0x83, 0x2b, 0x86, 0x6c, 0x74, 0x94, 0x1b, 0xfe, 0xbb, 0xd5, 0x50, 0x52, 0xb4, 0xd7, 0x61, - 0xa9, 0x4f, 0x65, 0x80, 0x6a, 0x9b, 0x2a, 0xea, 0x23, 0x57, 0x28, 0x89, 0x0d, 0x70, 0x85, 0xf4, - 0xa5, 0xa6, 0x57, 0x62, 0xda, 0x6b, 0x30, 0xff, 0x4e, 0x32, 0x85, 0x1e, 0x7e, 0x4c, 0x31, 0x51, - 0x64, 0x19, 0xe6, 0xfa, 0xe2, 0x03, 0xf2, 0x42, 0x9a, 0x83, 0x8d, 0xea, 0xa7, 0xaf, 0x4d, 0xa3, - 0xfd, 0xa5, 0x0a, 0xf3, 0xaf, 0x53, 0x94, 0x47, 0xfb, 0x71, 0xd6, 0x7a, 0x72, 0xbb, 0x98, 0x3c, - 0x82, 0x05, 0x97, 0x71, 0x2d, 0x2c, 0x55, 0xee, 0x5d, 0x27, 0xc9, 0x0b, 0x98, 0x77, 0xe9, 0x44, - 0x13, 0x7d, 0x16, 0xa1, 0xf5, 0x7f, 0xcb, 0x58, 0x6d, 0xac, 0xdf, 0xef, 0xe4, 0x83, 0xee, 0x5c, - 0x0c, 0xba, 0xb3, 0x5d, 0x0c, 0x7a, 0xab, 0x7e, 0x72, 0xd6, 0xac, 0x1c, 0xff, 0x68, 0x1a, 0xde, - 0x35, 0x63, 0xd6, 0xe1, 0x66, 0x18, 0x8a, 0xc3, 0x37, 0x8a, 0x86, 0x68, 0x55, 0x5b, 0xc6, 0x6a, - 0xdd, 0x2b, 0x31, 0xe4, 0x09, 0xdc, 0xcd, 0x9a, 0x63, 0x12, 0xbb, 0x82, 0x27, 0x2c, 0x51, 0xc8, - 0x95, 0x35, 0xa7, 0x65, 0x37, 0x1f, 0x90, 0x15, 0xa8, 0xbf, 0x4d, 0xb0, 0x4b, 0xfd, 0x31, 0x5a, - 0x35, 0x2d, 0xba, 0xc4, 0x64, 0x1f, 0x96, 0x5c, 0x3a, 0xd1, 0xa7, 0x5e, 0x54, 0x65, 0xdd, 0x99, - 0xbd, 0xec, 0x1b, 0x66, 0xf2, 0x1c, 0x6a, 0x2e, 0x9d, 0x6c, 0x06, 0x68, 0xd5, 0x67, 0x3f, 0xa6, - 0xb0, 0x90, 0xc7, 0xb0, 0xe8, 0xa6, 0x89, 0xf2, 0xf0, 0x80, 0x86, 0x6c, 0x48, 0x15, 0x5a, 0xa6, - 0xae, 0xf7, 0x2f, 0x36, 0x1b, 0xb4, 0xbe, 0x75, 0x6f, 0xb4, 0x23, 0xa5, 0x90, 0x16, 0xfc, 0xc3, - 0xa0, 0xcb, 0x46, 0x72, 0x0f, 0x6a, 0xbb, 0x2c, 0xcc, 0x62, 0xd4, 0xd0, 0xeb, 0x2e, 0x50, 0x11, - 0x8e, 0x6f, 0x06, 0x98, 0x7a, 0x29, 0x2e, 0x2a, 0x9a, 0x25, 0xa3, 0x14, 0x73, 0x2f, 0x07, 0x64, - 0x07, 0x1a, 0x3d, 0x9a, 0xa8, 0xae, 0xe0, 0x8a, 0xfa, 0x4a, 0xe7, 0x62, 0xc6, 0x4a, 0xca, 0x3e, - 0xd2, 0x82, 0xc6, 0x4b, 0x2e, 0x0e, 0x79, 0x0f, 0xe9, 0x10, 0xa5, 0x4e, 0x4e, 0xdd, 0x2b, 0x53, - 0x64, 0x0d, 0x96, 0x2e, 0x77, 0xea, 0x1f, 0xf5, 0xf0, 0x00, 0x43, 0x9d, 0x0c, 0xd3, 0xbb, 0xc1, - 0x17, 0xe5, 0xef, 0xc2, 0xe2, 0x4e, 0xf6, 0x42, 0xc4, 0x92, 0x25, 0xa8, 0x5b, 0x78, 0x08, 0xe6, - 0x2b, 0x1a, 0x61, 0x12, 0x53, 0x1f, 0x8b, 0x80, 0x5f, 0x11, 0xd9, 0x30, 0xfa, 0xc8, 0x29, 0xcf, - 0xbb, 0x30, 0xbd, 0x02, 0x6d, 0xf5, 0x4e, 0x7e, 0xd9, 0x95, 0x93, 0x73, 0xdb, 0x38, 0x3d, 0xb7, - 0x8d, 0x9f, 0xe7, 0xb6, 0xf1, 0x79, 0x6a, 0x57, 0x8e, 0xa7, 0x76, 0xe5, 0x74, 0x6a, 0x57, 0xbe, - 0x4f, 0xed, 0xca, 0xfb, 0xb5, 0x80, 0xa9, 0x71, 0x3a, 0xe8, 0xf8, 0x22, 0x72, 0xc6, 0x34, 0x19, - 0x33, 0x5f, 0xc8, 0xd8, 0xf1, 0x05, 0x4f, 0xd2, 0xd0, 0xb9, 0xfe, 0x2d, 0x1a, 0xd4, 0x34, 0x7e, - 0xf6, 0x27, 0x00, 0x00, 0xff, 0xff, 0x6f, 0x3a, 0xc6, 0x28, 0xa4, 0x04, 0x00, 0x00, + // 620 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x54, 0x41, 0x4f, 0xd4, 0x40, + 0x14, 0xde, 0xe2, 0xb2, 0x6e, 0xdf, 0x02, 0xc1, 0x09, 0x31, 0x15, 0x4d, 0x97, 0x6c, 0x8c, 0x21, + 0x44, 0xb7, 0x09, 0xde, 0xf0, 0x04, 0x0b, 0x1a, 0xe2, 0x56, 0x74, 0xc4, 0x90, 0x78, 0x9b, 0xed, + 0xbe, 0xed, 0x4e, 0x6c, 0x3b, 0x75, 0x3a, 0x85, 0xe5, 0x1f, 0x78, 0xf4, 0x48, 0x3c, 0xf9, 0x43, + 0xfc, 0x01, 0x1c, 0x39, 0x7a, 0x42, 0x65, 0xff, 0x81, 0xbf, 0xc0, 0x74, 0x5a, 0xa0, 0x08, 0x18, + 0x3c, 0xed, 0x7e, 0xdf, 0x7c, 0xdf, 0xeb, 0x9b, 0xf7, 0xbe, 0x16, 0xee, 0xc7, 0x52, 0x28, 0xe1, + 0xc4, 0x3d, 0x4f, 0x84, 0xa1, 0x88, 0x9c, 0xfc, 0xa7, 0xad, 0x59, 0x52, 0xcb, 0xd1, 0xbc, 0xed, + 0x0b, 0xe1, 0x07, 0xe8, 0x68, 0xb6, 0x97, 0x0e, 0x9c, 0x7e, 0x2a, 0x99, 0xe2, 0xa7, 0xba, 0xf9, + 0x39, 0x5f, 0xf8, 0x22, 0x2f, 0x94, 0xfd, 0xcb, 0xd9, 0x56, 0x08, 0x26, 0x65, 0x03, 0xb5, 0x19, + 0xf5, 0x71, 0x44, 0x1c, 0x68, 0x74, 0x24, 0x32, 0x85, 0x1a, 0x5a, 0xc6, 0x82, 0xb1, 0x58, 0x5d, + 0x9b, 0xfe, 0x7d, 0xdc, 0x34, 0x7b, 0x38, 0x8a, 0xe5, 0x4a, 0xeb, 0x49, 0x8b, 0x96, 0x15, 0x99, + 0xc1, 0x15, 0x7d, 0x3e, 0xd8, 0xcf, 0x0d, 0x13, 0x57, 0x1a, 0x4a, 0x8a, 0xd6, 0x32, 0xcc, 0x6e, + 0x33, 0xe9, 0xa3, 0x5a, 0x67, 0x8a, 0x79, 0x18, 0x29, 0x94, 0xc4, 0x06, 0x38, 0x47, 0xfa, 0xa1, + 0x26, 0x2d, 0x31, 0xad, 0x25, 0x98, 0xda, 0x91, 0x5c, 0x21, 0xc5, 0x8f, 0x29, 0x26, 0x8a, 0xcc, + 0xc1, 0xe4, 0xb6, 0xf8, 0x80, 0x51, 0x21, 0xcd, 0xc1, 0x4a, 0xf5, 0xd3, 0xd7, 0xa6, 0xd1, 0xda, + 0x81, 0x06, 0x45, 0xd6, 0xff, 0xa7, 0x94, 0x3c, 0x86, 0x3b, 0x99, 0x80, 0x4b, 0xec, 0x88, 0x28, + 0xe1, 0x89, 0xc2, 0x48, 0xe9, 0xde, 0xeb, 0xf4, 0xf2, 0x41, 0x51, 0xf8, 0x4b, 0x15, 0xa6, 0xde, + 0xa4, 0x28, 0xf7, 0xb7, 0xe2, 0x6c, 0xa6, 0xc9, 0x35, 0xa5, 0x1f, 0xc2, 0xb4, 0xcb, 0x23, 0x2d, + 0x2c, 0x8d, 0x84, 0x5e, 0x24, 0xc9, 0x0b, 0x98, 0x72, 0xd9, 0x48, 0x13, 0xdb, 0x3c, 0x44, 0xeb, + 0xd6, 0x82, 0xb1, 0xd8, 0x58, 0xbe, 0xd7, 0xce, 0x37, 0xd8, 0x3e, 0xdd, 0x60, 0x7b, 0xbd, 0xd8, + 0xe0, 0x5a, 0xfd, 0xf0, 0xb8, 0x59, 0x39, 0xf8, 0xd1, 0x34, 0xe8, 0x05, 0x63, 0x36, 0xba, 0xd5, + 0x20, 0x10, 0x7b, 0x6f, 0x15, 0x0b, 0xd0, 0xaa, 0xea, 0x2b, 0x94, 0x98, 0xab, 0x6f, 0x3a, 0x79, + 0xcd, 0x4d, 0xc9, 0x3c, 0xd4, 0xdf, 0x25, 0xd8, 0x61, 0xde, 0x10, 0xad, 0x9a, 0x16, 0x9d, 0x61, + 0xb2, 0x05, 0xb3, 0x2e, 0x1b, 0xe9, 0xaa, 0xa7, 0x5d, 0x59, 0xb7, 0x6f, 0xde, 0xf6, 0x25, 0x33, + 0x79, 0x06, 0x35, 0x97, 0x8d, 0x56, 0x7d, 0xb4, 0xea, 0x37, 0x2f, 0x53, 0x58, 0xc8, 0x23, 0x98, + 0x71, 0xd3, 0x44, 0x51, 0xdc, 0x65, 0x01, 0xef, 0x33, 0x85, 0x96, 0xa9, 0xfb, 0xfd, 0x8b, 0xcd, + 0x06, 0xad, 0x9f, 0xba, 0x39, 0xd8, 0x90, 0x52, 0x48, 0x0b, 0xfe, 0x63, 0xd0, 0x65, 0x23, 0xb9, + 0x0b, 0xb5, 0xe7, 0x3c, 0xc8, 0xf2, 0xd9, 0xd0, 0xeb, 0x2e, 0x50, 0x11, 0x8e, 0x6f, 0x06, 0x98, + 0x7a, 0x29, 0x2e, 0x2a, 0x96, 0x25, 0xa3, 0xf4, 0xfe, 0xd0, 0x1c, 0x90, 0x0d, 0x68, 0x74, 0x59, + 0xa2, 0x3a, 0x22, 0x52, 0xcc, 0xcb, 0xe3, 0x76, 0xc3, 0x4e, 0xca, 0x3e, 0xb2, 0x00, 0x8d, 0x97, + 0x91, 0xd8, 0x8b, 0xba, 0xc8, 0xfa, 0x28, 0x75, 0x72, 0xea, 0xb4, 0x4c, 0x91, 0x25, 0x98, 0x3d, + 0xdb, 0xa9, 0xb7, 0xdf, 0xc5, 0x5d, 0x0c, 0x74, 0x32, 0x4c, 0x7a, 0x89, 0x2f, 0xda, 0xef, 0xc2, + 0xcc, 0x46, 0xf6, 0xa6, 0xc5, 0x92, 0x27, 0xa8, 0xaf, 0xf0, 0x00, 0xcc, 0x57, 0x2c, 0xc4, 0x24, + 0x66, 0x1e, 0x16, 0x01, 0x3f, 0x27, 0xb2, 0xd3, 0xd7, 0x4c, 0x2a, 0xae, 0x43, 0x30, 0x91, 0x9f, + 0x9e, 0x11, 0x6b, 0xdd, 0xc3, 0x5f, 0x76, 0xe5, 0xf0, 0xc4, 0x36, 0x8e, 0x4e, 0x6c, 0xe3, 0xe7, + 0x89, 0x6d, 0x7c, 0x1e, 0xdb, 0x95, 0x83, 0xb1, 0x5d, 0x39, 0x1a, 0xdb, 0x95, 0xef, 0x63, 0xbb, + 0xf2, 0x7e, 0xc9, 0xe7, 0x6a, 0x98, 0xf6, 0xda, 0x9e, 0x08, 0x9d, 0x21, 0x4b, 0x86, 0xdc, 0x13, + 0x32, 0x76, 0x3c, 0x11, 0x25, 0x69, 0xe0, 0x5c, 0xfc, 0xd4, 0xf5, 0x6a, 0x1a, 0x3f, 0xfd, 0x13, + 0x00, 0x00, 0xff, 0xff, 0x9c, 0xf6, 0xbd, 0xcc, 0x03, 0x05, 0x00, 0x00, } func (m *RaftIndex) Marshal() (dAtA []byte, err error) { @@ -581,6 +644,46 @@ func (m *WriteRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { return len(dAtA) - i, nil } +func (m *ReadRequest) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ReadRequest) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ReadRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.RequireConsistent { + i-- + if m.RequireConsistent { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i-- + dAtA[i] = 0x10 + } + if len(m.Token) > 0 { + i -= len(m.Token) + copy(dAtA[i:], m.Token) + i = encodeVarintCommon(dAtA, i, uint64(len(m.Token))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + func (m *QueryOptions) Marshal() (dAtA []byte, err error) { size := m.Size() dAtA = make([]byte, size) @@ -768,10 +871,10 @@ func (m *EnterpriseMeta) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l - if len(m.Tenant) > 0 { - i -= len(m.Tenant) - copy(dAtA[i:], m.Tenant) - i = encodeVarintCommon(dAtA, i, uint64(len(m.Tenant))) + if len(m.Partition) > 0 { + i -= len(m.Partition) + copy(dAtA[i:], m.Partition) + i = encodeVarintCommon(dAtA, i, uint64(len(m.Partition))) i-- dAtA[i] = 0x12 } @@ -837,6 +940,22 @@ func (m *WriteRequest) Size() (n int) { return n } +func (m *ReadRequest) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Token) + if l > 0 { + n += 1 + l + sovCommon(uint64(l)) + } + if m.RequireConsistent { + n += 2 + } + return n +} + func (m *QueryOptions) Size() (n int) { if m == nil { return 0 @@ -908,7 +1027,7 @@ func (m *EnterpriseMeta) Size() (n int) { if l > 0 { n += 1 + l + sovCommon(uint64(l)) } - l = len(m.Tenant) + l = len(m.Partition) if l > 0 { n += 1 + l + sovCommon(uint64(l)) } @@ -1182,6 +1301,111 @@ func (m *WriteRequest) Unmarshal(dAtA []byte) error { } return nil } +func (m *ReadRequest) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ReadRequest: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ReadRequest: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Token", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthCommon + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthCommon + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Token = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field RequireConsistent", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCommon + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.RequireConsistent = bool(v != 0) + default: + iNdEx = preIndex + skippy, err := skipCommon(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthCommon + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthCommon + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} func (m *QueryOptions) Unmarshal(dAtA []byte) error { l := len(dAtA) iNdEx := 0 @@ -1750,7 +1974,7 @@ func (m *EnterpriseMeta) Unmarshal(dAtA []byte) error { iNdEx = postIndex case 2: if wireType != 2 { - return fmt.Errorf("proto: wrong wireType = %d for field Tenant", wireType) + return fmt.Errorf("proto: wrong wireType = %d for field Partition", wireType) } var stringLen uint64 for shift := uint(0); ; shift += 7 { @@ -1778,7 +2002,7 @@ func (m *EnterpriseMeta) Unmarshal(dAtA []byte) error { if postIndex > l { return io.ErrUnexpectedEOF } - m.Tenant = string(dAtA[iNdEx:postIndex]) + m.Partition = string(dAtA[iNdEx:postIndex]) iNdEx = postIndex default: iNdEx = preIndex diff --git a/proto/pbcommon/common.proto b/proto/pbcommon/common.proto index db946d097e..2a0d2c6dc6 100644 --- a/proto/pbcommon/common.proto +++ b/proto/pbcommon/common.proto @@ -38,6 +38,24 @@ message WriteRequest { string Token = 1; } +// ReadRequest is a type that may be embedded into any requests for read +// operations. +// It is a replacement for QueryOptions now that we no longer need any of those +// fields because we are moving away from using blocking queries. +// It is also similar to WriteRequest. It is a separate type so that in the +// future we can introduce fields that may only be relevant for reads. +message ReadRequest { + option (gogoproto.goproto_getters) = true; + + // Token is the ACL token ID. If not provided, the 'anonymous' + // token is assumed for backwards compatibility. + string Token = 1; + + // RequireConsistent indicates that the request must be sent to the leader. + bool RequireConsistent = 2; +} + + // QueryOptions is used to specify various flags for read queries message QueryOptions { // The autogenerated getters will implement half of the @@ -139,6 +157,6 @@ message QueryMeta { message EnterpriseMeta { // Namespace in which the entity exists. string Namespace = 1; - // Tenant in which the entity exists. - string Tenant = 2; + // Partition in which the entity exists. + string Partition = 2; } \ No newline at end of file