diff --git a/agent/grpc/handler.go b/agent/grpc/handler.go index 82577ed699..ab23537ffa 100644 --- a/agent/grpc/handler.go +++ b/agent/grpc/handler.go @@ -10,10 +10,8 @@ import ( "google.golang.org/grpc" ) -// NewHandler returns a Handler for addr. +// NewHandler returns a gRPC server that accepts connections from Handle(conn). func NewHandler(addr net.Addr) *Handler { - conns := make(chan net.Conn) - // We don't need to pass tls.Config to the server since it's multiplexed // behind the RPC listener, which already has TLS configured. srv := grpc.NewServer( @@ -24,23 +22,21 @@ func NewHandler(addr net.Addr) *Handler { // TODO(streaming): add gRPC services to srv here return &Handler{ - conns: conns, srv: srv, - listener: &chanListener{addr: addr, conns: conns}, + listener: &chanListener{addr: addr, conns: make(chan net.Conn)}, } } // Handler implements a handler for the rpc server listener, and the // agent.Component interface for managing the lifecycle of the grpc.Server. type Handler struct { - conns chan net.Conn srv *grpc.Server listener *chanListener } // Handle the connection by sending it to a channel for the grpc.Server to receive. func (h *Handler) Handle(conn net.Conn) { - h.conns <- conn + h.listener.conns <- conn } func (h *Handler) Run() error { diff --git a/agent/grpc/internal/testservice/simple.pb.binary.go b/agent/grpc/internal/testservice/simple.pb.binary.go new file mode 100644 index 0000000000..ef203aaa64 --- /dev/null +++ b/agent/grpc/internal/testservice/simple.pb.binary.go @@ -0,0 +1,28 @@ +// Code generated by protoc-gen-go-binary. DO NOT EDIT. +// source: agent/grpc/internal/testservice/simple.proto + +package testservice + +import ( + "github.com/golang/protobuf/proto" +) + +// MarshalBinary implements encoding.BinaryMarshaler +func (msg *Req) MarshalBinary() ([]byte, error) { + return proto.Marshal(msg) +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler +func (msg *Req) UnmarshalBinary(b []byte) error { + return proto.Unmarshal(b, msg) +} + +// MarshalBinary implements encoding.BinaryMarshaler +func (msg *Resp) MarshalBinary() ([]byte, error) { + return proto.Marshal(msg) +} + +// UnmarshalBinary implements encoding.BinaryUnmarshaler +func (msg *Resp) UnmarshalBinary(b []byte) error { + return proto.Unmarshal(b, msg) +} diff --git a/agent/grpc/internal/testservice/simple.pb.go b/agent/grpc/internal/testservice/simple.pb.go new file mode 100644 index 0000000000..ee6ebc1ec2 --- /dev/null +++ b/agent/grpc/internal/testservice/simple.pb.go @@ -0,0 +1,691 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: agent/grpc/internal/testservice/simple.proto + +package testservice + +import ( + context "context" + fmt "fmt" + proto "github.com/golang/protobuf/proto" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" + io "io" + math "math" + math_bits "math/bits" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Req struct { + Datacenter string `protobuf:"bytes,1,opt,name=Datacenter,proto3" json:"Datacenter,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Req) Reset() { *m = Req{} } +func (m *Req) String() string { return proto.CompactTextString(m) } +func (*Req) ProtoMessage() {} +func (*Req) Descriptor() ([]byte, []int) { + return fileDescriptor_3009a77c573f826d, []int{0} +} +func (m *Req) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Req) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Req.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Req) XXX_Merge(src proto.Message) { + xxx_messageInfo_Req.Merge(m, src) +} +func (m *Req) XXX_Size() int { + return m.Size() +} +func (m *Req) XXX_DiscardUnknown() { + xxx_messageInfo_Req.DiscardUnknown(m) +} + +var xxx_messageInfo_Req proto.InternalMessageInfo + +func (m *Req) GetDatacenter() string { + if m != nil { + return m.Datacenter + } + return "" +} + +type Resp struct { + ServerName string `protobuf:"bytes,1,opt,name=ServerName,proto3" json:"ServerName,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Resp) Reset() { *m = Resp{} } +func (m *Resp) String() string { return proto.CompactTextString(m) } +func (*Resp) ProtoMessage() {} +func (*Resp) Descriptor() ([]byte, []int) { + return fileDescriptor_3009a77c573f826d, []int{1} +} +func (m *Resp) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Resp) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Resp.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Resp) XXX_Merge(src proto.Message) { + xxx_messageInfo_Resp.Merge(m, src) +} +func (m *Resp) XXX_Size() int { + return m.Size() +} +func (m *Resp) XXX_DiscardUnknown() { + xxx_messageInfo_Resp.DiscardUnknown(m) +} + +var xxx_messageInfo_Resp proto.InternalMessageInfo + +func (m *Resp) GetServerName() string { + if m != nil { + return m.ServerName + } + return "" +} + +func init() { + proto.RegisterType((*Req)(nil), "testservice.Req") + proto.RegisterType((*Resp)(nil), "testservice.Resp") +} + +func init() { + proto.RegisterFile("agent/grpc/internal/testservice/simple.proto", fileDescriptor_3009a77c573f826d) +} + +var fileDescriptor_3009a77c573f826d = []byte{ + // 200 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x49, 0x4c, 0x4f, 0xcd, + 0x2b, 0xd1, 0x4f, 0x2f, 0x2a, 0x48, 0xd6, 0xcf, 0xcc, 0x2b, 0x49, 0x2d, 0xca, 0x4b, 0xcc, 0xd1, + 0x2f, 0x49, 0x2d, 0x2e, 0x29, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0xd5, 0x2f, 0xce, 0xcc, 0x2d, + 0xc8, 0x49, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x46, 0x92, 0x51, 0x52, 0xe5, 0x62, + 0x0e, 0x4a, 0x2d, 0x14, 0x92, 0xe3, 0xe2, 0x72, 0x49, 0x2c, 0x49, 0x4c, 0x4e, 0x05, 0xe9, 0x96, + 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x0c, 0x42, 0x12, 0x51, 0x52, 0xe3, 0x62, 0x09, 0x4a, 0x2d, 0x2e, + 0x00, 0xa9, 0x0b, 0x4e, 0x2d, 0x2a, 0x4b, 0x2d, 0xf2, 0x4b, 0xcc, 0x4d, 0x85, 0xa9, 0x43, 0x88, + 0x18, 0xe5, 0x72, 0xb1, 0x05, 0x83, 0xed, 0x12, 0x32, 0xe2, 0xe2, 0x0c, 0xce, 0xcf, 0x4d, 0x2d, + 0xc9, 0xc8, 0xcc, 0x4b, 0x17, 0x12, 0xd0, 0x43, 0xb2, 0x53, 0x2f, 0x28, 0xb5, 0x50, 0x4a, 0x10, + 0x4d, 0xa4, 0xb8, 0x40, 0x89, 0x41, 0x48, 0x9f, 0x8b, 0xc5, 0x2d, 0x27, 0xbf, 0x9c, 0x48, 0xe5, + 0x06, 0x8c, 0x4e, 0x02, 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, + 0xe3, 0x8c, 0xc7, 0x72, 0x0c, 0x49, 0x6c, 0x60, 0x3f, 0x1a, 0x03, 0x02, 0x00, 0x00, 0xff, 0xff, + 0x61, 0xd3, 0x5e, 0xba, 0x13, 0x01, 0x00, 0x00, +} + +// Reference imports to suppress errors if they are not otherwise used. +var _ context.Context +var _ grpc.ClientConn + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +const _ = grpc.SupportPackageIsVersion4 + +// SimpleClient is the client API for Simple service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream. +type SimpleClient interface { + Something(ctx context.Context, in *Req, opts ...grpc.CallOption) (*Resp, error) + Flow(ctx context.Context, in *Req, opts ...grpc.CallOption) (Simple_FlowClient, error) +} + +type simpleClient struct { + cc *grpc.ClientConn +} + +func NewSimpleClient(cc *grpc.ClientConn) SimpleClient { + return &simpleClient{cc} +} + +func (c *simpleClient) Something(ctx context.Context, in *Req, opts ...grpc.CallOption) (*Resp, error) { + out := new(Resp) + err := c.cc.Invoke(ctx, "/testservice.Simple/Something", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +func (c *simpleClient) Flow(ctx context.Context, in *Req, opts ...grpc.CallOption) (Simple_FlowClient, error) { + stream, err := c.cc.NewStream(ctx, &_Simple_serviceDesc.Streams[0], "/testservice.Simple/Flow", opts...) + if err != nil { + return nil, err + } + x := &simpleFlowClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Simple_FlowClient interface { + Recv() (*Resp, error) + grpc.ClientStream +} + +type simpleFlowClient struct { + grpc.ClientStream +} + +func (x *simpleFlowClient) Recv() (*Resp, error) { + m := new(Resp) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// SimpleServer is the server API for Simple service. +type SimpleServer interface { + Something(context.Context, *Req) (*Resp, error) + Flow(*Req, Simple_FlowServer) error +} + +// UnimplementedSimpleServer can be embedded to have forward compatible implementations. +type UnimplementedSimpleServer struct { +} + +func (*UnimplementedSimpleServer) Something(ctx context.Context, req *Req) (*Resp, error) { + return nil, status.Errorf(codes.Unimplemented, "method Something not implemented") +} +func (*UnimplementedSimpleServer) Flow(req *Req, srv Simple_FlowServer) error { + return status.Errorf(codes.Unimplemented, "method Flow not implemented") +} + +func RegisterSimpleServer(s *grpc.Server, srv SimpleServer) { + s.RegisterService(&_Simple_serviceDesc, srv) +} + +func _Simple_Something_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(Req) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SimpleServer).Something(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/testservice.Simple/Something", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SimpleServer).Something(ctx, req.(*Req)) + } + return interceptor(ctx, in, info, handler) +} + +func _Simple_Flow_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Req) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(SimpleServer).Flow(m, &simpleFlowServer{stream}) +} + +type Simple_FlowServer interface { + Send(*Resp) error + grpc.ServerStream +} + +type simpleFlowServer struct { + grpc.ServerStream +} + +func (x *simpleFlowServer) Send(m *Resp) error { + return x.ServerStream.SendMsg(m) +} + +var _Simple_serviceDesc = grpc.ServiceDesc{ + ServiceName: "testservice.Simple", + HandlerType: (*SimpleServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Something", + Handler: _Simple_Something_Handler, + }, + }, + Streams: []grpc.StreamDesc{ + { + StreamName: "Flow", + Handler: _Simple_Flow_Handler, + ServerStreams: true, + }, + }, + Metadata: "agent/grpc/internal/testservice/simple.proto", +} + +func (m *Req) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Req) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Req) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.Datacenter) > 0 { + i -= len(m.Datacenter) + copy(dAtA[i:], m.Datacenter) + i = encodeVarintSimple(dAtA, i, uint64(len(m.Datacenter))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func (m *Resp) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Resp) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *Resp) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.XXX_unrecognized != nil { + i -= len(m.XXX_unrecognized) + copy(dAtA[i:], m.XXX_unrecognized) + } + if len(m.ServerName) > 0 { + i -= len(m.ServerName) + copy(dAtA[i:], m.ServerName) + i = encodeVarintSimple(dAtA, i, uint64(len(m.ServerName))) + i-- + dAtA[i] = 0xa + } + return len(dAtA) - i, nil +} + +func encodeVarintSimple(dAtA []byte, offset int, v uint64) int { + offset -= sovSimple(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *Req) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.Datacenter) + if l > 0 { + n += 1 + l + sovSimple(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *Resp) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + l = len(m.ServerName) + if l > 0 { + n += 1 + l + sovSimple(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovSimple(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozSimple(x uint64) (n int) { + return sovSimple(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Req) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSimple + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Req: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Req: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Datacenter", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSimple + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSimple + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSimple + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Datacenter = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSimple(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthSimple + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthSimple + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *Resp) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSimple + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Resp: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Resp: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field ServerName", wireType) + } + var stringLen uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowSimple + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + stringLen |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + intStringLen := int(stringLen) + if intStringLen < 0 { + return ErrInvalidLengthSimple + } + postIndex := iNdEx + intStringLen + if postIndex < 0 { + return ErrInvalidLengthSimple + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.ServerName = string(dAtA[iNdEx:postIndex]) + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipSimple(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthSimple + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthSimple + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipSimple(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowSimple + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowSimple + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowSimple + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthSimple + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthSimple + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowSimple + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipSimple(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthSimple + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthSimple = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowSimple = fmt.Errorf("proto: integer overflow") +) diff --git a/agent/grpc/internal/testservice/simple.proto b/agent/grpc/internal/testservice/simple.proto new file mode 100644 index 0000000000..bffa86def5 --- /dev/null +++ b/agent/grpc/internal/testservice/simple.proto @@ -0,0 +1,17 @@ +syntax = "proto3"; + +package testservice; + +// Simple service is used to test gRPC plumbing. +service Simple { + rpc Something(Req) returns (Resp) {} + rpc Flow(Req) returns (stream Resp) {} +} + +message Req { + string Datacenter = 1; +} + +message Resp { + string ServerName = 1; +} \ No newline at end of file diff --git a/agent/grpc/stats.go b/agent/grpc/stats.go index 40821244cf..cbf443878e 100644 --- a/agent/grpc/stats.go +++ b/agent/grpc/stats.go @@ -70,7 +70,6 @@ func (i *activeStreamCounter) Intercept( _ *grpc.StreamServerInfo, handler grpc.StreamHandler, ) error { - count := atomic.AddUint64(&i.count, 1) metrics.SetGauge([]string{"grpc", "server", "active_streams"}, float32(count)) defer func() { diff --git a/agent/grpc/stats_test.go b/agent/grpc/stats_test.go new file mode 100644 index 0000000000..cc99100701 --- /dev/null +++ b/agent/grpc/stats_test.go @@ -0,0 +1,124 @@ +package grpc + +import ( + "context" + "net" + "testing" + "time" + + "github.com/armon/go-metrics" + "github.com/hashicorp/consul/agent/grpc/internal/testservice" + "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc" +) + +func TestHandler_EmitsStats(t *testing.T) { + sink := patchGlobalMetrics(t) + + addr := &net.IPAddr{IP: net.ParseIP("127.0.0.1")} + handler := NewHandler(addr) + + testservice.RegisterSimpleServer(handler.srv, &simple{}) + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + defer lis.Close() + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + g, ctx := errgroup.WithContext(ctx) + g.Go(func() error { + return handler.srv.Serve(lis) + }) + t.Cleanup(func() { + if err := handler.Shutdown(); err != nil { + t.Logf("grpc server shutdown: %v", err) + } + if err := g.Wait(); err != nil { + t.Logf("grpc server error: %v", err) + } + }) + + conn, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithInsecure()) + require.NoError(t, err) + defer conn.Close() + + client := testservice.NewSimpleClient(conn) + fClient, err := client.Flow(ctx, &testservice.Req{Datacenter: "mine"}) + require.NoError(t, err) + + // Wait for the first event so that we know the stream is sending. + _, err = fClient.Recv() + require.NoError(t, err) + + expectedCounter := []metricCall{ + {key: []string{"testing", "grpc", "server", "request"}, val: 1}, + } + require.Equal(t, expectedCounter, sink.incrCounterCalls) + expectedGauge := []metricCall{ + {key: []string{"testing", "grpc", "server", "active_conns"}, val: 1}, + {key: []string{"testing", "grpc", "server", "active_streams"}, val: 1}, + // TODO: why is the count reset to 0 before the client receives the second message? + {key: []string{"testing", "grpc", "server", "active_streams"}, val: 0}, + } + require.Equal(t, expectedGauge, sink.gaugeCalls) +} + +type simple struct { + name string +} + +func (s *simple) Flow(_ *testservice.Req, flow testservice.Simple_FlowServer) error { + if err := flow.Send(&testservice.Resp{ServerName: "one"}); err != nil { + return err + } + if err := flow.Send(&testservice.Resp{ServerName: "two"}); err != nil { + return err + } + return nil +} + +func (s *simple) Something(_ context.Context, _ *testservice.Req) (*testservice.Resp, error) { + return &testservice.Resp{ServerName: "the-fake-service-name"}, nil +} + +func patchGlobalMetrics(t *testing.T) *fakeMetricsSink { + t.Helper() + + sink := &fakeMetricsSink{} + cfg := &metrics.Config{ + ServiceName: "testing", + TimerGranularity: time.Millisecond, // Timers are in milliseconds + ProfileInterval: time.Second, // Poll runtime every second + FilterDefault: true, + } + _, err := metrics.NewGlobal(cfg, sink) + require.NoError(t, err) + t.Cleanup(func() { + _, err = metrics.NewGlobal(cfg, &metrics.BlackholeSink{}) + require.NoError(t, err, "failed to reset global metrics") + }) + return sink +} + +type fakeMetricsSink struct { + metrics.BlackholeSink + gaugeCalls []metricCall + incrCounterCalls []metricCall +} + +func (f *fakeMetricsSink) SetGaugeWithLabels(key []string, val float32, labels []metrics.Label) { + f.gaugeCalls = append(f.gaugeCalls, metricCall{key: key, val: val, labels: labels}) +} + +func (f *fakeMetricsSink) IncrCounterWithLabels(key []string, val float32, labels []metrics.Label) { + f.incrCounterCalls = append(f.incrCounterCalls, metricCall{key: key, val: val, labels: labels}) +} + +type metricCall struct { + key []string + val float32 + labels []metrics.Label +}