mirror of
https://github.com/status-im/consul.git
synced 2025-01-09 13:26:07 +00:00
5fb9df1640
* Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
269 lines
6.5 KiB
Go
269 lines
6.5 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package raft
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net"
|
|
"sync"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
"google.golang.org/protobuf/types/known/emptypb"
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
grpcinternal "github.com/hashicorp/consul/agent/grpc-internal"
|
|
"github.com/hashicorp/consul/internal/storage"
|
|
pbstorage "github.com/hashicorp/consul/proto/private/pbstorage"
|
|
)
|
|
|
|
// forwardingServer implements the gRPC forwarding service.
|
|
type forwardingServer struct {
|
|
backend *Backend
|
|
listener *grpcinternal.Listener
|
|
}
|
|
|
|
var _ pbstorage.ForwardingServiceServer = (*forwardingServer)(nil)
|
|
|
|
func newForwardingServer(backend *Backend) *forwardingServer {
|
|
return &forwardingServer{
|
|
backend: backend,
|
|
|
|
// The address here doesn't actually matter. gRPC uses it as an identifier
|
|
// internally, but we only bind the server to a single listener.
|
|
listener: grpcinternal.NewListener(&net.TCPAddr{
|
|
IP: net.ParseIP("0.0.0.0"),
|
|
Port: 0,
|
|
}),
|
|
}
|
|
}
|
|
|
|
func (s *forwardingServer) Write(ctx context.Context, req *pbstorage.WriteRequest) (*pbstorage.WriteResponse, error) {
|
|
rsp, err := s.raftApply(ctx, &pbstorage.Log{
|
|
Type: pbstorage.LogType_LOG_TYPE_WRITE,
|
|
Request: &pbstorage.Log_Write{Write: req},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return rsp.GetWrite(), nil
|
|
}
|
|
|
|
func (s *forwardingServer) Delete(ctx context.Context, req *pbstorage.DeleteRequest) (*emptypb.Empty, error) {
|
|
_, err := s.raftApply(ctx, &pbstorage.Log{
|
|
Type: pbstorage.LogType_LOG_TYPE_DELETE,
|
|
Request: &pbstorage.Log_Delete{Delete: req},
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &emptypb.Empty{}, nil
|
|
}
|
|
|
|
func (s *forwardingServer) Read(ctx context.Context, req *pbstorage.ReadRequest) (*pbstorage.ReadResponse, error) {
|
|
res, err := s.backend.leaderRead(ctx, req.Id)
|
|
if err != nil {
|
|
return nil, wrapError(err)
|
|
}
|
|
return &pbstorage.ReadResponse{Resource: res}, nil
|
|
}
|
|
|
|
func (s *forwardingServer) List(ctx context.Context, req *pbstorage.ListRequest) (*pbstorage.ListResponse, error) {
|
|
res, err := s.backend.leaderList(ctx, storage.UnversionedTypeFrom(req.Type), req.Tenancy, req.NamePrefix)
|
|
if err != nil {
|
|
return nil, wrapError(err)
|
|
}
|
|
return &pbstorage.ListResponse{Resources: res}, nil
|
|
}
|
|
|
|
func (s *forwardingServer) raftApply(_ context.Context, req *pbstorage.Log) (*pbstorage.LogResponse, error) {
|
|
msg, err := req.MarshalBinary()
|
|
if err != nil {
|
|
return nil, wrapError(err)
|
|
}
|
|
|
|
rsp, err := s.backend.handle.Apply(msg)
|
|
if err != nil {
|
|
return nil, wrapError(err)
|
|
}
|
|
|
|
switch t := rsp.(type) {
|
|
case *pbstorage.LogResponse:
|
|
return t, nil
|
|
default:
|
|
return nil, status.Errorf(codes.Internal, "unexpected response from Raft apply: %T", rsp)
|
|
}
|
|
}
|
|
|
|
func (s *forwardingServer) run(ctx context.Context) error {
|
|
server := grpc.NewServer()
|
|
pbstorage.RegisterForwardingServiceServer(server, s)
|
|
|
|
go func() {
|
|
<-ctx.Done()
|
|
server.Stop()
|
|
}()
|
|
|
|
return server.Serve(s.listener)
|
|
}
|
|
|
|
// forwardingClient is used to forward operations to the leader.
|
|
type forwardingClient struct {
|
|
handle Handle
|
|
logger hclog.Logger
|
|
|
|
mu sync.RWMutex
|
|
conn *grpc.ClientConn
|
|
}
|
|
|
|
func newForwardingClient(h Handle, l hclog.Logger) *forwardingClient {
|
|
return &forwardingClient{
|
|
handle: h,
|
|
logger: l,
|
|
}
|
|
}
|
|
|
|
func (c *forwardingClient) leaderChanged() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.conn == nil {
|
|
return
|
|
}
|
|
|
|
if err := c.conn.Close(); err != nil {
|
|
c.logger.Error("failed to close connection to previous leader", "error", err)
|
|
}
|
|
c.conn = nil
|
|
}
|
|
|
|
func (c *forwardingClient) getConn() (*grpc.ClientConn, error) {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
|
|
if c.conn != nil {
|
|
return c.conn, nil
|
|
}
|
|
|
|
conn, err := c.handle.DialLeader()
|
|
if err != nil {
|
|
c.logger.Error("failed to dial leader", "error", err)
|
|
return nil, err
|
|
}
|
|
c.conn = conn
|
|
|
|
return conn, nil
|
|
}
|
|
|
|
func (c *forwardingClient) getClient() (pbstorage.ForwardingServiceClient, error) {
|
|
conn, err := c.getConn()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return pbstorage.NewForwardingServiceClient(conn), nil
|
|
}
|
|
|
|
func (c *forwardingClient) delete(ctx context.Context, req *pbstorage.DeleteRequest) error {
|
|
client, err := c.getClient()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = client.Delete(ctx, req)
|
|
return unwrapError(err)
|
|
}
|
|
|
|
func (c *forwardingClient) write(ctx context.Context, req *pbstorage.WriteRequest) (*pbstorage.WriteResponse, error) {
|
|
client, err := c.getClient()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rsp, err := client.Write(ctx, req)
|
|
return rsp, unwrapError(err)
|
|
}
|
|
|
|
func (c *forwardingClient) read(ctx context.Context, req *pbstorage.ReadRequest) (*pbstorage.ReadResponse, error) {
|
|
client, err := c.getClient()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rsp, err := client.Read(ctx, req)
|
|
return rsp, unwrapError(err)
|
|
}
|
|
|
|
func (c *forwardingClient) list(ctx context.Context, req *pbstorage.ListRequest) (*pbstorage.ListResponse, error) {
|
|
client, err := c.getClient()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
rsp, err := client.List(ctx, req)
|
|
return rsp, unwrapError(err)
|
|
}
|
|
|
|
var (
|
|
errorToCode = map[error]codes.Code{
|
|
// Note: OutOfRange is used to represent GroupVersionMismatchError, but is
|
|
// handled specially in wrapError and unwrapError because it has extra details.
|
|
storage.ErrNotFound: codes.NotFound,
|
|
storage.ErrCASFailure: codes.Aborted,
|
|
storage.ErrWrongUid: codes.AlreadyExists,
|
|
storage.ErrInconsistent: codes.FailedPrecondition,
|
|
}
|
|
|
|
codeToError = func() map[codes.Code]error {
|
|
inverted := make(map[codes.Code]error, len(errorToCode))
|
|
for k, v := range errorToCode {
|
|
inverted[v] = k
|
|
}
|
|
return inverted
|
|
}()
|
|
)
|
|
|
|
// wrapError converts the given error to a gRPC status to send over the wire.
|
|
func wrapError(err error) error {
|
|
var gvm storage.GroupVersionMismatchError
|
|
if errors.As(err, &gvm) {
|
|
s, err := status.New(codes.OutOfRange, err.Error()).
|
|
WithDetails(&pbstorage.GroupVersionMismatchErrorDetails{
|
|
RequestedType: gvm.RequestedType,
|
|
Stored: gvm.Stored,
|
|
})
|
|
if err == nil {
|
|
return s.Err()
|
|
}
|
|
}
|
|
|
|
code, ok := errorToCode[err]
|
|
if !ok {
|
|
code = codes.Internal
|
|
}
|
|
return status.Error(code, err.Error())
|
|
}
|
|
|
|
// unwrapError converts the given gRPC status error back to a storage package
|
|
// error.
|
|
func unwrapError(err error) error {
|
|
s, ok := status.FromError(err)
|
|
if !ok {
|
|
return err
|
|
}
|
|
|
|
for _, d := range s.Details() {
|
|
if gvm, ok := d.(*pbstorage.GroupVersionMismatchErrorDetails); ok {
|
|
return storage.GroupVersionMismatchError{
|
|
RequestedType: gvm.RequestedType,
|
|
Stored: gvm.Stored,
|
|
}
|
|
}
|
|
}
|
|
|
|
unwrapped, ok := codeToError[s.Code()]
|
|
if !ok {
|
|
return err
|
|
}
|
|
return unwrapped
|
|
}
|