mirror of https://github.com/status-im/consul.git
GRPC stub for the ResourceService (#16528)
This commit is contained in:
parent
040647e0ba
commit
176945aa86
|
@ -48,6 +48,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/grpc-external/services/connectca"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/services/dataplane"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/services/peerstream"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/services/resource"
|
||||
"github.com/hashicorp/consul/agent/grpc-external/services/serverdiscovery"
|
||||
agentgrpc "github.com/hashicorp/consul/agent/grpc-internal"
|
||||
"github.com/hashicorp/consul/agent/grpc-internal/services/subscribe"
|
||||
|
@ -728,69 +729,8 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
|
|||
s.overviewManager = NewOverviewManager(s.logger, s.fsm, s.config.MetricsReportingInterval)
|
||||
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
|
||||
|
||||
// Initialize external gRPC server - register services on external gRPC server.
|
||||
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
|
||||
ACLsEnabled: s.config.ACLsEnabled,
|
||||
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
|
||||
return s.ForwardGRPC(s.grpcConnPool, info, fn)
|
||||
},
|
||||
InPrimaryDatacenter: s.InPrimaryDatacenter(),
|
||||
LoadAuthMethod: func(methodName string, entMeta *acl.EnterpriseMeta) (*structs.ACLAuthMethod, aclgrpc.Validator, error) {
|
||||
return s.loadAuthMethod(methodName, entMeta)
|
||||
},
|
||||
LocalTokensEnabled: s.LocalTokensEnabled,
|
||||
Logger: logger.Named("grpc-api.acl"),
|
||||
NewLogin: func() aclgrpc.Login { return s.aclLogin() },
|
||||
NewTokenWriter: func() aclgrpc.TokenWriter { return s.aclTokenWriter() },
|
||||
PrimaryDatacenter: s.config.PrimaryDatacenter,
|
||||
ValidateEnterpriseRequest: s.validateEnterpriseRequest,
|
||||
})
|
||||
s.externalACLServer.Register(s.externalGRPCServer)
|
||||
|
||||
s.externalConnectCAServer = connectca.NewServer(connectca.Config{
|
||||
Publisher: s.publisher,
|
||||
GetStore: func() connectca.StateStore { return s.FSM().State() },
|
||||
Logger: logger.Named("grpc-api.connect-ca"),
|
||||
ACLResolver: s.ACLResolver,
|
||||
CAManager: s.caManager,
|
||||
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
|
||||
return s.ForwardGRPC(s.grpcConnPool, info, fn)
|
||||
},
|
||||
ConnectEnabled: s.config.ConnectEnabled,
|
||||
})
|
||||
s.externalConnectCAServer.Register(s.externalGRPCServer)
|
||||
|
||||
dataplane.NewServer(dataplane.Config{
|
||||
GetStore: func() dataplane.StateStore { return s.FSM().State() },
|
||||
Logger: logger.Named("grpc-api.dataplane"),
|
||||
ACLResolver: s.ACLResolver,
|
||||
Datacenter: s.config.Datacenter,
|
||||
}).Register(s.externalGRPCServer)
|
||||
|
||||
serverdiscovery.NewServer(serverdiscovery.Config{
|
||||
Publisher: s.publisher,
|
||||
ACLResolver: s.ACLResolver,
|
||||
Logger: logger.Named("grpc-api.server-discovery"),
|
||||
}).Register(s.externalGRPCServer)
|
||||
|
||||
s.peeringBackend = NewPeeringBackend(s)
|
||||
s.operatorBackend = NewOperatorBackend(s)
|
||||
s.peerStreamServer = peerstream.NewServer(peerstream.Config{
|
||||
Backend: s.peeringBackend,
|
||||
GetStore: func() peerstream.StateStore { return s.FSM().State() },
|
||||
Logger: logger.Named("grpc-api.peerstream"),
|
||||
ACLResolver: s.ACLResolver,
|
||||
Datacenter: s.config.Datacenter,
|
||||
ConnectEnabled: s.config.ConnectEnabled,
|
||||
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
|
||||
// Only forward the request if the dc in the request matches the server's datacenter.
|
||||
if info.RequestDatacenter() != "" && info.RequestDatacenter() != config.Datacenter {
|
||||
return false, fmt.Errorf("requests to generate peering tokens cannot be forwarded to remote datacenters")
|
||||
}
|
||||
return s.ForwardGRPC(s.grpcConnPool, info, fn)
|
||||
},
|
||||
})
|
||||
s.peerStreamServer.Register(s.externalGRPCServer)
|
||||
// Initialize external gRPC server
|
||||
s.setupExternalGRPC(config, logger)
|
||||
|
||||
// Initialize internal gRPC server.
|
||||
//
|
||||
|
@ -1220,6 +1160,76 @@ func (s *Server) setupRPC() error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// Initialize and register services on external gRPC server.
|
||||
func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
|
||||
|
||||
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
|
||||
ACLsEnabled: s.config.ACLsEnabled,
|
||||
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
|
||||
return s.ForwardGRPC(s.grpcConnPool, info, fn)
|
||||
},
|
||||
InPrimaryDatacenter: s.InPrimaryDatacenter(),
|
||||
LoadAuthMethod: func(methodName string, entMeta *acl.EnterpriseMeta) (*structs.ACLAuthMethod, aclgrpc.Validator, error) {
|
||||
return s.loadAuthMethod(methodName, entMeta)
|
||||
},
|
||||
LocalTokensEnabled: s.LocalTokensEnabled,
|
||||
Logger: logger.Named("grpc-api.acl"),
|
||||
NewLogin: func() aclgrpc.Login { return s.aclLogin() },
|
||||
NewTokenWriter: func() aclgrpc.TokenWriter { return s.aclTokenWriter() },
|
||||
PrimaryDatacenter: s.config.PrimaryDatacenter,
|
||||
ValidateEnterpriseRequest: s.validateEnterpriseRequest,
|
||||
})
|
||||
s.externalACLServer.Register(s.externalGRPCServer)
|
||||
|
||||
s.externalConnectCAServer = connectca.NewServer(connectca.Config{
|
||||
Publisher: s.publisher,
|
||||
GetStore: func() connectca.StateStore { return s.FSM().State() },
|
||||
Logger: logger.Named("grpc-api.connect-ca"),
|
||||
ACLResolver: s.ACLResolver,
|
||||
CAManager: s.caManager,
|
||||
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
|
||||
return s.ForwardGRPC(s.grpcConnPool, info, fn)
|
||||
},
|
||||
ConnectEnabled: s.config.ConnectEnabled,
|
||||
})
|
||||
s.externalConnectCAServer.Register(s.externalGRPCServer)
|
||||
|
||||
dataplane.NewServer(dataplane.Config{
|
||||
GetStore: func() dataplane.StateStore { return s.FSM().State() },
|
||||
Logger: logger.Named("grpc-api.dataplane"),
|
||||
ACLResolver: s.ACLResolver,
|
||||
Datacenter: s.config.Datacenter,
|
||||
}).Register(s.externalGRPCServer)
|
||||
|
||||
serverdiscovery.NewServer(serverdiscovery.Config{
|
||||
Publisher: s.publisher,
|
||||
ACLResolver: s.ACLResolver,
|
||||
Logger: logger.Named("grpc-api.server-discovery"),
|
||||
}).Register(s.externalGRPCServer)
|
||||
|
||||
s.peeringBackend = NewPeeringBackend(s)
|
||||
s.operatorBackend = NewOperatorBackend(s)
|
||||
|
||||
s.peerStreamServer = peerstream.NewServer(peerstream.Config{
|
||||
Backend: s.peeringBackend,
|
||||
GetStore: func() peerstream.StateStore { return s.FSM().State() },
|
||||
Logger: logger.Named("grpc-api.peerstream"),
|
||||
ACLResolver: s.ACLResolver,
|
||||
Datacenter: s.config.Datacenter,
|
||||
ConnectEnabled: s.config.ConnectEnabled,
|
||||
ForwardRPC: func(info structs.RPCInfo, fn func(*grpc.ClientConn) error) (bool, error) {
|
||||
// Only forward the request if the dc in the request matches the server's datacenter.
|
||||
if info.RequestDatacenter() != "" && info.RequestDatacenter() != config.Datacenter {
|
||||
return false, fmt.Errorf("requests to generate peering tokens cannot be forwarded to remote datacenters")
|
||||
}
|
||||
return s.ForwardGRPC(s.grpcConnPool, info, fn)
|
||||
},
|
||||
})
|
||||
s.peerStreamServer.Register(s.externalGRPCServer)
|
||||
|
||||
resource.NewServer(resource.Config{}).Register(s.externalGRPCServer)
|
||||
}
|
||||
|
||||
// Shutdown is used to shutdown the server
|
||||
func (s *Server) Shutdown() error {
|
||||
s.logger.Info("shutting down server")
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
Config
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
}
|
||||
|
||||
func NewServer(cfg Config) *Server {
|
||||
return &Server{cfg}
|
||||
}
|
||||
|
||||
var _ pbresource.ResourceServiceServer = (*Server)(nil)
|
||||
|
||||
func (s *Server) Register(grpcServer *grpc.Server) {
|
||||
pbresource.RegisterResourceServiceServer(grpcServer, s)
|
||||
}
|
||||
|
||||
func (s *Server) Read(ctx context.Context, req *pbresource.ReadRequest) (*pbresource.ReadResponse, error) {
|
||||
// TODO
|
||||
return &pbresource.ReadResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *Server) Write(ctx context.Context, req *pbresource.WriteRequest) (*pbresource.WriteResponse, error) {
|
||||
// TODO
|
||||
return &pbresource.WriteResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *Server) WriteStatus(ctx context.Context, req *pbresource.WriteStatusRequest) (*pbresource.WriteStatusResponse, error) {
|
||||
// TODO
|
||||
return &pbresource.WriteStatusResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *Server) List(ctx context.Context, req *pbresource.ListRequest) (*pbresource.ListResponse, error) {
|
||||
// TODO
|
||||
return &pbresource.ListResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *Server) Delete(ctx context.Context, req *pbresource.DeleteRequest) (*pbresource.DeleteResponse, error) {
|
||||
// TODO
|
||||
return &pbresource.DeleteResponse{}, nil
|
||||
}
|
||||
|
||||
func (s *Server) Watch(req *pbresource.WatchRequest, ws pbresource.ResourceService_WatchServer) error {
|
||||
// TODO
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,75 @@
|
|||
package resource
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul/agent/grpc-external/testutils"
|
||||
"github.com/hashicorp/consul/proto-public/pbresource"
|
||||
)
|
||||
|
||||
func testClient(t *testing.T, server *Server) pbresource.ResourceServiceClient {
|
||||
t.Helper()
|
||||
|
||||
addr := testutils.RunTestServer(t, server)
|
||||
|
||||
//nolint:staticcheck
|
||||
conn, err := grpc.DialContext(context.Background(), addr.String(), grpc.WithInsecure())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
require.NoError(t, conn.Close())
|
||||
})
|
||||
|
||||
return pbresource.NewResourceServiceClient(conn)
|
||||
}
|
||||
|
||||
func TestRead_TODO(t *testing.T) {
|
||||
server := NewServer(Config{})
|
||||
client := testClient(t, server)
|
||||
resp, err := client.Read(context.Background(), &pbresource.ReadRequest{})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
}
|
||||
|
||||
func TestWrite_TODO(t *testing.T) {
|
||||
server := NewServer(Config{})
|
||||
client := testClient(t, server)
|
||||
resp, err := client.Write(context.Background(), &pbresource.WriteRequest{})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
}
|
||||
|
||||
func TestWriteStatus_TODO(t *testing.T) {
|
||||
server := NewServer(Config{})
|
||||
client := testClient(t, server)
|
||||
resp, err := client.WriteStatus(context.Background(), &pbresource.WriteStatusRequest{})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
}
|
||||
|
||||
func TestList_TODO(t *testing.T) {
|
||||
server := NewServer(Config{})
|
||||
client := testClient(t, server)
|
||||
resp, err := client.List(context.Background(), &pbresource.ListRequest{})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
}
|
||||
|
||||
func TestDelete_TODO(t *testing.T) {
|
||||
server := NewServer(Config{})
|
||||
client := testClient(t, server)
|
||||
resp, err := client.Delete(context.Background(), &pbresource.DeleteRequest{})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, resp)
|
||||
}
|
||||
|
||||
func TestWatch_TODO(t *testing.T) {
|
||||
server := NewServer(Config{})
|
||||
client := testClient(t, server)
|
||||
wc, err := client.Watch(context.Background(), &pbresource.WatchRequest{})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, wc)
|
||||
}
|
|
@ -22,6 +22,12 @@ var rpcRateLimitSpecs = map[string]rate.OperationType{
|
|||
"/hashicorp.consul.internal.peering.PeeringService/TrustBundleRead": rate.OperationTypeRead,
|
||||
"/hashicorp.consul.internal.peerstream.PeerStreamService/ExchangeSecret": rate.OperationTypeWrite,
|
||||
"/hashicorp.consul.internal.peerstream.PeerStreamService/StreamResources": rate.OperationTypeRead,
|
||||
"/hashicorp.consul.resource.ResourceService/Delete": rate.OperationTypeWrite,
|
||||
"/hashicorp.consul.resource.ResourceService/List": rate.OperationTypeRead,
|
||||
"/hashicorp.consul.resource.ResourceService/Read": rate.OperationTypeRead,
|
||||
"/hashicorp.consul.resource.ResourceService/Watch": rate.OperationTypeRead,
|
||||
"/hashicorp.consul.resource.ResourceService/Write": rate.OperationTypeWrite,
|
||||
"/hashicorp.consul.resource.ResourceService/WriteStatus": rate.OperationTypeWrite,
|
||||
"/hashicorp.consul.serverdiscovery.ServerDiscoveryService/WatchServers": rate.OperationTypeRead,
|
||||
"/subscribe.StateChangeSubscription/Subscribe": rate.OperationTypeRead,
|
||||
}
|
||||
|
|
|
@ -0,0 +1,178 @@
|
|||
// Code generated by protoc-gen-go-binary. DO NOT EDIT.
|
||||
// source: pbresource/resource.proto
|
||||
|
||||
package pbresource
|
||||
|
||||
import (
|
||||
"google.golang.org/protobuf/proto"
|
||||
)
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *Type) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *Type) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *Tenancy) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *Tenancy) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *ID) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *ID) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *Resource) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *Resource) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WatchEvent) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *WatchEvent) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *ReadRequest) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *ReadRequest) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *ReadResponse) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *ReadResponse) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *ListRequest) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *ListRequest) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *ListResponse) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *ListResponse) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WriteRequest) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *WriteRequest) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WriteResponse) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *WriteResponse) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WriteStatusResponse) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *WriteStatusResponse) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WriteStatusRequest) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *WriteStatusRequest) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *DeleteRequest) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *DeleteRequest) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *DeleteResponse) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *DeleteResponse) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WatchRequest) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *WatchRequest) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
||||
|
||||
// MarshalBinary implements encoding.BinaryMarshaler
|
||||
func (msg *WatchResponse) MarshalBinary() ([]byte, error) {
|
||||
return proto.Marshal(msg)
|
||||
}
|
||||
|
||||
// UnmarshalBinary implements encoding.BinaryUnmarshaler
|
||||
func (msg *WatchResponse) UnmarshalBinary(b []byte) error {
|
||||
return proto.Unmarshal(b, msg)
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,141 @@
|
|||
syntax = "proto3";
|
||||
|
||||
package hashicorp.consul.resource;
|
||||
|
||||
import "annotations/ratelimit/ratelimit.proto";
|
||||
import "google/protobuf/any.proto";
|
||||
|
||||
message Type {
|
||||
string group = 1;
|
||||
string group_version = 2;
|
||||
string kind = 3;
|
||||
}
|
||||
|
||||
message Tenancy {
|
||||
string partition = 1;
|
||||
string namespace = 2;
|
||||
string peer_name = 3;
|
||||
}
|
||||
|
||||
message ID {
|
||||
string uid = 1;
|
||||
string name = 2;
|
||||
Type type = 3;
|
||||
Tenancy tenancy = 4;
|
||||
}
|
||||
|
||||
message Resource {
|
||||
ID id = 1;
|
||||
ID owner = 2;
|
||||
string version = 3;
|
||||
string generation = 4;
|
||||
|
||||
map<string, string> metadata = 5;
|
||||
reserved 6; // status
|
||||
|
||||
google.protobuf.Any data = 7;
|
||||
}
|
||||
|
||||
message WatchEvent {
|
||||
enum Operation {
|
||||
OPERATION_UNSPECIFIED = 0;
|
||||
OPERATION_UPSERT = 1;
|
||||
OPERATION_DELETE = 2;
|
||||
}
|
||||
|
||||
Operation operation = 1;
|
||||
Resource resource = 2;
|
||||
}
|
||||
|
||||
service ResourceService {
|
||||
rpc Read(ReadRequest) returns (ReadResponse) {
|
||||
option (hashicorp.consul.internal.ratelimit.spec) = {operation_type: OPERATION_TYPE_READ};
|
||||
}
|
||||
|
||||
rpc Write(WriteRequest) returns (WriteResponse) {
|
||||
option (hashicorp.consul.internal.ratelimit.spec) = {operation_type: OPERATION_TYPE_WRITE};
|
||||
}
|
||||
|
||||
rpc WriteStatus(WriteStatusRequest) returns (WriteStatusResponse) {
|
||||
option (hashicorp.consul.internal.ratelimit.spec) = {operation_type: OPERATION_TYPE_WRITE};
|
||||
}
|
||||
|
||||
rpc List(ListRequest) returns (ListResponse) {
|
||||
option (hashicorp.consul.internal.ratelimit.spec) = {operation_type: OPERATION_TYPE_READ};
|
||||
}
|
||||
|
||||
rpc Delete(DeleteRequest) returns (DeleteResponse) {
|
||||
option (hashicorp.consul.internal.ratelimit.spec) = {operation_type: OPERATION_TYPE_WRITE};
|
||||
}
|
||||
|
||||
rpc Watch(WatchRequest) returns (stream WatchResponse) {
|
||||
option (hashicorp.consul.internal.ratelimit.spec) = {operation_type: OPERATION_TYPE_READ};
|
||||
}
|
||||
}
|
||||
|
||||
enum Condition {
|
||||
CONDITION_UNSPECIFIED = 0;
|
||||
CONDITION_ACCEPTED = 1;
|
||||
CONDITION_INVALID = 2;
|
||||
CONDITION_PERSISTENT_FAILURE = 3;
|
||||
}
|
||||
|
||||
message ReadRequest {
|
||||
ID id = 1;
|
||||
}
|
||||
|
||||
message ReadResponse {
|
||||
Resource resource = 1;
|
||||
}
|
||||
|
||||
message ListRequest {
|
||||
Type type = 1;
|
||||
Tenancy tenancy = 2;
|
||||
string name_prefix = 3;
|
||||
}
|
||||
|
||||
message ListResponse {
|
||||
repeated Resource resources = 1;
|
||||
}
|
||||
|
||||
message WriteRequest {
|
||||
Resource resource = 1;
|
||||
}
|
||||
|
||||
message WriteResponse {
|
||||
Resource resource = 1;
|
||||
}
|
||||
|
||||
message WriteStatusResponse {
|
||||
Resource resource = 1;
|
||||
}
|
||||
|
||||
message WriteStatusRequest {
|
||||
ID id = 1;
|
||||
string version = 2;
|
||||
string key = 3;
|
||||
Condition condition = 4;
|
||||
string state = 5;
|
||||
repeated string messages = 6;
|
||||
}
|
||||
|
||||
message DeleteRequest {
|
||||
ID id = 1;
|
||||
string version = 2;
|
||||
}
|
||||
|
||||
message DeleteResponse {}
|
||||
|
||||
message WatchRequest {
|
||||
ID id = 1;
|
||||
}
|
||||
|
||||
message WatchResponse {
|
||||
enum Operation {
|
||||
OPERATION_UNSPECIFIED = 0;
|
||||
OPERATION_UPSERT = 1;
|
||||
OPERATION_DELETE = 2;
|
||||
}
|
||||
Operation operation = 1;
|
||||
Resource resource = 2;
|
||||
}
|
|
@ -0,0 +1,311 @@
|
|||
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
|
||||
// versions:
|
||||
// - protoc-gen-go-grpc v1.2.0
|
||||
// - protoc (unknown)
|
||||
// source: pbresource/resource.proto
|
||||
|
||||
package pbresource
|
||||
|
||||
import (
|
||||
context "context"
|
||||
grpc "google.golang.org/grpc"
|
||||
codes "google.golang.org/grpc/codes"
|
||||
status "google.golang.org/grpc/status"
|
||||
)
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
// Requires gRPC-Go v1.32.0 or later.
|
||||
const _ = grpc.SupportPackageIsVersion7
|
||||
|
||||
// ResourceServiceClient is the client API for ResourceService service.
|
||||
//
|
||||
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
|
||||
type ResourceServiceClient interface {
|
||||
Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error)
|
||||
Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error)
|
||||
WriteStatus(ctx context.Context, in *WriteStatusRequest, opts ...grpc.CallOption) (*WriteStatusResponse, error)
|
||||
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
|
||||
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error)
|
||||
Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceService_WatchClient, error)
|
||||
}
|
||||
|
||||
type resourceServiceClient struct {
|
||||
cc grpc.ClientConnInterface
|
||||
}
|
||||
|
||||
func NewResourceServiceClient(cc grpc.ClientConnInterface) ResourceServiceClient {
|
||||
return &resourceServiceClient{cc}
|
||||
}
|
||||
|
||||
func (c *resourceServiceClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) {
|
||||
out := new(ReadResponse)
|
||||
err := c.cc.Invoke(ctx, "/hashicorp.consul.resource.ResourceService/Read", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceServiceClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) {
|
||||
out := new(WriteResponse)
|
||||
err := c.cc.Invoke(ctx, "/hashicorp.consul.resource.ResourceService/Write", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceServiceClient) WriteStatus(ctx context.Context, in *WriteStatusRequest, opts ...grpc.CallOption) (*WriteStatusResponse, error) {
|
||||
out := new(WriteStatusResponse)
|
||||
err := c.cc.Invoke(ctx, "/hashicorp.consul.resource.ResourceService/WriteStatus", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceServiceClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) {
|
||||
out := new(ListResponse)
|
||||
err := c.cc.Invoke(ctx, "/hashicorp.consul.resource.ResourceService/List", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceServiceClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) {
|
||||
out := new(DeleteResponse)
|
||||
err := c.cc.Invoke(ctx, "/hashicorp.consul.resource.ResourceService/Delete", in, out, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
func (c *resourceServiceClient) Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceService_WatchClient, error) {
|
||||
stream, err := c.cc.NewStream(ctx, &ResourceService_ServiceDesc.Streams[0], "/hashicorp.consul.resource.ResourceService/Watch", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &resourceServiceWatchClient{stream}
|
||||
if err := x.ClientStream.SendMsg(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := x.ClientStream.CloseSend(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type ResourceService_WatchClient interface {
|
||||
Recv() (*WatchResponse, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type resourceServiceWatchClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *resourceServiceWatchClient) Recv() (*WatchResponse, error) {
|
||||
m := new(WatchResponse)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
// ResourceServiceServer is the server API for ResourceService service.
|
||||
// All implementations should embed UnimplementedResourceServiceServer
|
||||
// for forward compatibility
|
||||
type ResourceServiceServer interface {
|
||||
Read(context.Context, *ReadRequest) (*ReadResponse, error)
|
||||
Write(context.Context, *WriteRequest) (*WriteResponse, error)
|
||||
WriteStatus(context.Context, *WriteStatusRequest) (*WriteStatusResponse, error)
|
||||
List(context.Context, *ListRequest) (*ListResponse, error)
|
||||
Delete(context.Context, *DeleteRequest) (*DeleteResponse, error)
|
||||
Watch(*WatchRequest, ResourceService_WatchServer) error
|
||||
}
|
||||
|
||||
// UnimplementedResourceServiceServer should be embedded to have forward compatible implementations.
|
||||
type UnimplementedResourceServiceServer struct {
|
||||
}
|
||||
|
||||
func (UnimplementedResourceServiceServer) Read(context.Context, *ReadRequest) (*ReadResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Read not implemented")
|
||||
}
|
||||
func (UnimplementedResourceServiceServer) Write(context.Context, *WriteRequest) (*WriteResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Write not implemented")
|
||||
}
|
||||
func (UnimplementedResourceServiceServer) WriteStatus(context.Context, *WriteStatusRequest) (*WriteStatusResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method WriteStatus not implemented")
|
||||
}
|
||||
func (UnimplementedResourceServiceServer) List(context.Context, *ListRequest) (*ListResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
|
||||
}
|
||||
func (UnimplementedResourceServiceServer) Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) {
|
||||
return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
|
||||
}
|
||||
func (UnimplementedResourceServiceServer) Watch(*WatchRequest, ResourceService_WatchServer) error {
|
||||
return status.Errorf(codes.Unimplemented, "method Watch not implemented")
|
||||
}
|
||||
|
||||
// UnsafeResourceServiceServer may be embedded to opt out of forward compatibility for this service.
|
||||
// Use of this interface is not recommended, as added methods to ResourceServiceServer will
|
||||
// result in compilation errors.
|
||||
type UnsafeResourceServiceServer interface {
|
||||
mustEmbedUnimplementedResourceServiceServer()
|
||||
}
|
||||
|
||||
func RegisterResourceServiceServer(s grpc.ServiceRegistrar, srv ResourceServiceServer) {
|
||||
s.RegisterService(&ResourceService_ServiceDesc, srv)
|
||||
}
|
||||
|
||||
func _ResourceService_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(ReadRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceServiceServer).Read(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/hashicorp.consul.resource.ResourceService/Read",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceServiceServer).Read(ctx, req.(*ReadRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceService_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(WriteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceServiceServer).Write(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/hashicorp.consul.resource.ResourceService/Write",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceServiceServer).Write(ctx, req.(*WriteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceService_WriteStatus_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(WriteStatusRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceServiceServer).WriteStatus(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/hashicorp.consul.resource.ResourceService/WriteStatus",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceServiceServer).WriteStatus(ctx, req.(*WriteStatusRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceService_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(ListRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceServiceServer).List(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/hashicorp.consul.resource.ResourceService/List",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceServiceServer).List(ctx, req.(*ListRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceService_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
|
||||
in := new(DeleteRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if interceptor == nil {
|
||||
return srv.(ResourceServiceServer).Delete(ctx, in)
|
||||
}
|
||||
info := &grpc.UnaryServerInfo{
|
||||
Server: srv,
|
||||
FullMethod: "/hashicorp.consul.resource.ResourceService/Delete",
|
||||
}
|
||||
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
|
||||
return srv.(ResourceServiceServer).Delete(ctx, req.(*DeleteRequest))
|
||||
}
|
||||
return interceptor(ctx, in, info, handler)
|
||||
}
|
||||
|
||||
func _ResourceService_Watch_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
m := new(WatchRequest)
|
||||
if err := stream.RecvMsg(m); err != nil {
|
||||
return err
|
||||
}
|
||||
return srv.(ResourceServiceServer).Watch(m, &resourceServiceWatchServer{stream})
|
||||
}
|
||||
|
||||
type ResourceService_WatchServer interface {
|
||||
Send(*WatchResponse) error
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type resourceServiceWatchServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *resourceServiceWatchServer) Send(m *WatchResponse) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
// ResourceService_ServiceDesc is the grpc.ServiceDesc for ResourceService service.
|
||||
// It's only intended for direct use with grpc.RegisterService,
|
||||
// and not to be introspected or modified (even as a copy)
|
||||
var ResourceService_ServiceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "hashicorp.consul.resource.ResourceService",
|
||||
HandlerType: (*ResourceServiceServer)(nil),
|
||||
Methods: []grpc.MethodDesc{
|
||||
{
|
||||
MethodName: "Read",
|
||||
Handler: _ResourceService_Read_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "Write",
|
||||
Handler: _ResourceService_Write_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "WriteStatus",
|
||||
Handler: _ResourceService_WriteStatus_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "List",
|
||||
Handler: _ResourceService_List_Handler,
|
||||
},
|
||||
{
|
||||
MethodName: "Delete",
|
||||
Handler: _ResourceService_Delete_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
StreamName: "Watch",
|
||||
Handler: _ResourceService_Watch_Handler,
|
||||
ServerStreams: true,
|
||||
},
|
||||
},
|
||||
Metadata: "pbresource/resource.proto",
|
||||
}
|
Loading…
Reference in New Issue