Raft storage backend (#16619)

This commit is contained in:
Dan Upton 2023-04-04 17:30:06 +01:00 committed by GitHub
parent afc8f978a2
commit 671d5825ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
41 changed files with 3044 additions and 149 deletions

View File

@ -10,13 +10,14 @@ import (
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
)
func TestBasicController(t *testing.T) {
@ -37,6 +38,7 @@ func TestBasicController(t *testing.T) {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
StorageBackend: fsm.NullStorageBackend,
}).State()
for i := 0; i < 200; i++ {
@ -93,6 +95,7 @@ func TestBasicController_Transform(t *testing.T) {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
StorageBackend: fsm.NullStorageBackend,
}).State()
go New(publisher, reconciler).Subscribe(&stream.SubscribeRequest{
@ -138,6 +141,7 @@ func TestBasicController_Retry(t *testing.T) {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
StorageBackend: fsm.NullStorageBackend,
}).State()
queueInitialized := make(chan *countingWorkQueue)
@ -383,6 +387,7 @@ func TestConfigEntrySubscriptions(t *testing.T) {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
StorageBackend: fsm.NullStorageBackend,
}).State()
for i := 0; i < 200; i++ {
@ -519,6 +524,7 @@ func TestDiscoveryChainController(t *testing.T) {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
StorageBackend: fsm.NullStorageBackend,
}).State()
controller := New(publisher, reconciler)

View File

@ -145,6 +145,7 @@ func init() {
registerCommand(structs.PeeringTrustBundleWriteType, (*FSM).applyPeeringTrustBundleWrite)
registerCommand(structs.PeeringTrustBundleDeleteType, (*FSM).applyPeeringTrustBundleDelete)
registerCommand(structs.PeeringSecretsWriteType, (*FSM).applyPeeringSecretsWrite)
registerCommand(structs.ResourceOperationType, (*FSM).applyResourceOperation)
}
func (c *FSM) applyRegister(buf []byte, index uint64) interface{} {
@ -781,3 +782,7 @@ func (c *FSM) applyPeeringTrustBundleDelete(buf []byte, index uint64) interface{
}
return c.state.PeeringTrustBundleDelete(index, q)
}
func (f *FSM) applyResourceOperation(buf []byte, idx uint64) any {
return f.deps.StorageBackend.Apply(buf, idx)
}

View File

@ -5,6 +5,8 @@ package fsm
import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"
"reflect"
@ -19,11 +21,17 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/internal/storage"
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/prototest"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/types"
)
@ -1501,6 +1509,49 @@ func TestFSM_ConfigEntry_DeleteCAS(t *testing.T) {
require.True(t, didDelete)
}
func TestFSM_Resources(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
handle := &testRaftHandle{}
storageBackend := newStorageBackend(t, handle)
fsm := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store { return state.NewStateStore(nil) },
StorageBackend: storageBackend,
})
handle.apply = func(msg []byte) (any, error) {
buf := append(
[]byte{uint8(structs.ResourceOperationType)},
msg...,
)
return fsm.Apply(makeLog(buf)), nil
}
resource, err := storageBackend.WriteCAS(context.Background(), &pbresource.Resource{
Id: &pbresource.ID{
Type: &pbresource.Type{
Group: "test",
GroupVersion: "v1",
Kind: "foo",
},
Tenancy: &pbresource.Tenancy{
Partition: "default",
PeerName: "local",
Namespace: "default",
},
Name: "bar",
Uid: "a",
},
})
require.NoError(t, err)
storedResource, err := storageBackend.Read(context.Background(), storage.EventualConsistency, resource.Id)
require.NoError(t, err)
prototest.AssertDeepEqual(t, resource, storedResource)
}
// This adapts another test by chunking the encoded data and then performing
// out-of-order applies of half the logs. It then snapshots, restores to a new
// FSM, and applies the rest. The goal is to verify that chunking snapshotting
@ -1513,8 +1564,14 @@ func TestFSM_Chunking_Lifecycle(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
require.NoError(t, err)
fsm := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: newStorageBackend(t, nil),
})
var logOfLogs [][]*raft.Log
for i := 0; i < 10; i++ {
@ -1589,8 +1646,13 @@ func TestFSM_Chunking_Lifecycle(t *testing.T) {
err = snap.Persist(sink)
require.NoError(t, err)
fsm2, err := New(nil, logger)
require.NoError(t, err)
fsm2 := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: newStorageBackend(t, nil),
})
err = fsm2.Restore(sink)
require.NoError(t, err)
@ -1717,3 +1779,27 @@ func TestFSM_Chunking_TermChange(t *testing.T) {
}
}
}
func newStorageBackend(t *testing.T, handle raftstorage.Handle) *raftstorage.Backend {
t.Helper()
backend, err := raftstorage.NewBackend(handle, testutil.Logger(t))
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
go backend.Run(ctx)
return backend
}
type testRaftHandle struct {
apply func(msg []byte) (any, error)
}
func (h *testRaftHandle) Apply(msg []byte) (any, error) { return h.apply(msg) }
func (testRaftHandle) IsLeader() bool { return true }
func (testRaftHandle) EnsureStrongConsistency(context.Context) error { return nil }
func (testRaftHandle) DialLeader() (*grpc.ClientConn, error) {
return nil, errors.New("DialLeader not implemented")
}

View File

@ -4,6 +4,7 @@
package fsm
import (
"errors"
"fmt"
"io"
"sync"
@ -18,6 +19,7 @@ import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
"github.com/hashicorp/consul/logging"
)
@ -72,7 +74,11 @@ func New(gc *state.TombstoneGC, logger hclog.Logger) (*FSM, error) {
newStateStore := func() *state.Store {
return state.NewStateStore(gc)
}
return NewFromDeps(Deps{Logger: logger, NewStateStore: newStateStore}), nil
return NewFromDeps(Deps{
Logger: logger,
NewStateStore: newStateStore,
StorageBackend: NullStorageBackend,
}), nil
}
// Deps are dependencies used to construct the FSM.
@ -86,6 +92,33 @@ type Deps struct {
NewStateStore func() *state.Store
Publisher *stream.EventPublisher
// StorageBackend is the storage backend used by the resource service, it
// manages its own state and has methods for handling Raft logs, snapshotting,
// and restoring snapshots.
StorageBackend StorageBackend
}
// StorageBackend contains the methods on the Raft resource storage backend that
// are used by the FSM. See the internal/storage/raft package docs for more info.
type StorageBackend interface {
Apply(buf []byte, idx uint64) any
Snapshot() (*raftstorage.Snapshot, error)
Restore() (*raftstorage.Restoration, error)
}
// NullStorageBackend can be used as the StorageBackend dependency in tests
// that won't exercize resource storage or snapshotting.
var NullStorageBackend StorageBackend = nullStorageBackend{}
type nullStorageBackend struct{}
func (nullStorageBackend) Apply([]byte, uint64) any { return errors.New("NullStorageBackend in use") }
func (nullStorageBackend) Snapshot() (*raftstorage.Snapshot, error) {
return nil, errors.New("NullStorageBackend in use")
}
func (nullStorageBackend) Restore() (*raftstorage.Restoration, error) {
return nil, errors.New("NullStorageBackend in use")
}
// NewFromDeps creates a new FSM from its dependencies.
@ -93,6 +126,9 @@ func NewFromDeps(deps Deps) *FSM {
if deps.Logger == nil {
deps.Logger = hclog.New(&hclog.LoggerOptions{})
}
if deps.StorageBackend == nil {
panic("StorageBackend is required")
}
fsm := &FSM{
deps: deps,
@ -172,9 +208,15 @@ func (c *FSM) Snapshot() (raft.FSMSnapshot, error) {
return nil, err
}
storageSnapshot, err := c.deps.StorageBackend.Snapshot()
if err != nil {
return nil, err
}
return &snapshot{
state: c.state.Snapshot(),
chunkState: chunkState,
storageSnapshot: storageSnapshot,
}, nil
}
@ -189,6 +231,12 @@ func (c *FSM) Restore(old io.ReadCloser) error {
restore := stateNew.Restore()
defer restore.Abort()
storageRestoration, err := c.deps.StorageBackend.Restore()
if err != nil {
return err
}
defer storageRestoration.Abort()
handler := func(header *SnapshotHeader, msg structs.MessageType, dec *codec.Decoder) error {
switch {
case msg == structs.ChunkingStateType:
@ -201,6 +249,14 @@ func (c *FSM) Restore(old io.ReadCloser) error {
if err := c.chunker.RestoreState(chunkState); err != nil {
return err
}
case msg == structs.ResourceOperationType:
var b []byte
if err := dec.Decode(&b); err != nil {
return err
}
if err := storageRestoration.Apply(b); err != nil {
return err
}
case restorers[msg] != nil:
fn := restorers[msg]
if err := fn(header, restore, dec); err != nil {
@ -222,6 +278,7 @@ func (c *FSM) Restore(old io.ReadCloser) error {
if err := restore.Commit(); err != nil {
return err
}
storageRestoration.Commit()
// External code might be calling State(), so we need to synchronize
// here to make sure we swap in the new state store atomically.

View File

@ -9,11 +9,13 @@ import (
"github.com/armon/go-metrics"
"github.com/armon/go-metrics/prometheus"
"github.com/hashicorp/go-raftchunking"
"github.com/hashicorp/raft"
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-raftchunking"
"github.com/hashicorp/raft"
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
)
var SnapshotSummaries = []prometheus.SummaryDefinition{
@ -29,6 +31,7 @@ var SnapshotSummaries = []prometheus.SummaryDefinition{
type snapshot struct {
state *state.Snapshot
chunkState *raftchunking.State
storageSnapshot *raftstorage.Snapshot
}
// SnapshotHeader is the first entry in our snapshot

View File

@ -7,10 +7,11 @@ import (
"fmt"
"net"
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
"github.com/hashicorp/raft"
"github.com/mitchellh/mapstructure"
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/private/pbpeering"
@ -104,6 +105,9 @@ func persistOSS(s *snapshot, sink raft.SnapshotSink, encoder *codec.Encoder) err
if err := s.persistPeeringSecrets(sink, encoder); err != nil {
return err
}
if err := s.persistResources(sink, encoder); err != nil {
return err
}
return nil
}
@ -609,6 +613,25 @@ func (s *snapshot) persistPeeringSecrets(sink raft.SnapshotSink, encoder *codec.
return nil
}
func (s *snapshot) persistResources(sink raft.SnapshotSink, encoder *codec.Encoder) error {
for {
v, err := s.storageSnapshot.Next()
if err != nil {
return err
}
if v == nil {
return nil
}
if _, err := sink.Write([]byte{byte(structs.ResourceOperationType)}); err != nil {
return err
}
if err := encoder.Encode(v); err != nil {
return err
}
}
}
func restoreRegistration(header *SnapshotHeader, restore *state.Restore, decoder *codec.Decoder) error {
var req structs.RegisterRequest
if err := decoder.Decode(&req); err != nil {

View File

@ -10,18 +10,30 @@ import (
"bytes"
"testing"
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul-net-rpc/go-msgpack/codec"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/sdk/testutil"
)
func TestRestoreFromEnterprise(t *testing.T) {
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
require.NoError(t, err)
handle := &testRaftHandle{}
storageBackend := newStorageBackend(t, handle)
handle.apply = func(buf []byte) (any, error) { return storageBackend.Apply(buf, 123), nil }
fsm := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: storageBackend,
})
// To verify if a proper message is displayed when Consul OSS tries to
// unsuccessfully restore entries from a Consul Ent snapshot.
buf := bytes.NewBuffer(nil)

View File

@ -5,6 +5,7 @@ package fsm
import (
"bytes"
"context"
"net"
"testing"
"time"
@ -18,7 +19,9 @@ import (
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/lib/stringslice"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/pbpeering"
"github.com/hashicorp/consul/proto/private/prototest"
"github.com/hashicorp/consul/sdk/testutil"
@ -28,8 +31,18 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
t.Parallel()
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
require.NoError(t, err)
handle := &testRaftHandle{}
storageBackend := newStorageBackend(t, handle)
handle.apply = func(buf []byte) (any, error) { return storageBackend.Apply(buf, 123), nil }
fsm := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: storageBackend,
})
// Add some state
node1 := &structs.Node{
@ -531,6 +544,25 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(t, err)
require.Equal(t, vip, "240.0.0.3")
// Resources
resource, err := storageBackend.WriteCAS(context.Background(), &pbresource.Resource{
Id: &pbresource.ID{
Type: &pbresource.Type{
Group: "test",
GroupVersion: "v1",
Kind: "foo",
},
Tenancy: &pbresource.Tenancy{
Partition: "default",
PeerName: "local",
Namespace: "default",
},
Name: "bar",
Uid: "a",
},
})
require.NoError(t, err)
// Snapshot
snap, err := fsm.Snapshot()
require.NoError(t, err)
@ -569,8 +601,15 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(t, encoder.Encode(&token2))
// Try to restore on a new FSM
fsm2, err := New(nil, logger)
require.NoError(t, err)
storageBackend2 := newStorageBackend(t, nil)
fsm2 := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: storageBackend2,
})
// Do a restore
require.NoError(t, fsm2.Restore(sink))
@ -856,6 +895,11 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.Len(t, ptbRestored.RootPEMs, 1)
require.Equal(t, "qux certificate bundle", ptbRestored.RootPEMs[0])
// Verify resources are restored.
resourceRestored, err := storageBackend2.Read(context.Background(), storage.EventualConsistency, resource.Id)
require.NoError(t, err)
prototest.AssertDeepEqual(t, resource, resourceRestored)
// Snapshot
snap, err = fsm2.Snapshot()
require.NoError(t, err)
@ -881,8 +925,14 @@ func TestFSM_BadRestore_OSS(t *testing.T) {
t.Parallel()
// Create an FSM with some state.
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
require.NoError(t, err)
fsm := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: newStorageBackend(t, nil),
})
fsm.state.EnsureNode(1, &structs.Node{Node: "foo", Address: "127.0.0.1"})
abandonCh := fsm.state.AbandonCh()
@ -912,8 +962,14 @@ func TestFSM_BadSnapshot_NilCAConfig(t *testing.T) {
// Create an FSM with no config entry.
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
require.NoError(t, err)
fsm := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: newStorageBackend(t, nil),
})
// Snapshot
snap, err := fsm.Snapshot()
@ -926,8 +982,13 @@ func TestFSM_BadSnapshot_NilCAConfig(t *testing.T) {
require.NoError(t, snap.Persist(sink))
// Try to restore on a new FSM
fsm2, err := New(nil, logger)
require.NoError(t, err)
fsm2 := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: newStorageBackend(t, nil),
})
// Do a restore
require.NoError(t, fsm2.Restore(sink))
@ -963,8 +1024,14 @@ func Test_restoreServiceVirtualIP(t *testing.T) {
dec := codec.NewDecoder(buf, structs.MsgpackHandle)
logger := testutil.Logger(t)
fsm, err := New(nil, logger)
require.NoError(t, err)
fsm := NewFromDeps(Deps{
Logger: logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: newStorageBackend(t, nil),
})
restore := fsm.State().Restore()

View File

@ -11,14 +11,15 @@ import (
"testing"
"time"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/controller"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)
func TestBoundAPIGatewayBindRoute(t *testing.T) {
@ -3557,6 +3558,7 @@ func TestAPIGatewayController(t *testing.T) {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
StorageBackend: fsm.NullStorageBackend,
})
var index uint64
@ -3676,6 +3678,7 @@ func TestNewAPIGatewayController(t *testing.T) {
return state.NewStateStoreWithEventPublisher(nil, publisher)
},
Publisher: publisher,
StorageBackend: fsm.NullStorageBackend,
})
updater := &Updater{

View File

@ -33,6 +33,7 @@ func TestHealthCheckRace(t *testing.T) {
NewStateStore: func() *state.Store {
return state.NewStateStore(nil)
},
StorageBackend: consulfsm.NullStorageBackend,
})
state := fsm.State()

View File

@ -0,0 +1,80 @@
package consul
import (
"context"
"errors"
"net"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/hashicorp/consul/agent/pool"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/internal/storage/raft"
)
// raftHandle is the glue layer between the Raft resource storage backend and
// the exising Raft logic in Server.
type raftHandle struct{ s *Server }
func (h *raftHandle) IsLeader() bool {
return h.s.IsLeader()
}
func (h *raftHandle) EnsureStrongConsistency(ctx context.Context) error {
return h.s.consistentReadWithContext(ctx)
}
func (h *raftHandle) Apply(msg []byte) (any, error) {
return h.s.raftApplyEncoded(
structs.ResourceOperationType,
append([]byte{uint8(structs.ResourceOperationType)}, msg...),
)
}
func (h *raftHandle) DialLeader() (*grpc.ClientConn, error) {
leaderAddr, _ := h.s.raft.LeaderWithID()
if leaderAddr == "" {
return nil, errors.New("leader unknown")
}
dc := h.s.config.Datacenter
tlsCfg := h.s.tlsConfigurator
return grpc.Dial(string(leaderAddr),
// TLS is handled in the dialer below.
grpc.WithTransportCredentials(insecure.NewCredentials()),
// This dialer negotiates a connection on the multiplexed server port using
// our type-byte prefix scheme (see Server.handleConn for other side of it).
grpc.WithContextDialer(func(ctx context.Context, addr string) (net.Conn, error) {
var d net.Dialer
conn, err := d.DialContext(ctx, "tcp", addr)
if err != nil {
return nil, err
}
if tlsCfg.UseTLS(dc) {
if _, err := conn.Write([]byte{byte(pool.RPCTLS)}); err != nil {
conn.Close()
return nil, err
}
tc, err := tlsCfg.OutgoingRPCWrapper()(dc, conn)
if err != nil {
conn.Close()
return nil, err
}
conn = tc
}
if _, err := conn.Write([]byte{byte(pool.RPCRaftForwarding)}); err != nil {
conn.Close()
return nil, err
}
return conn, nil
}),
)
}
var _ raft.Handle = (*raftHandle)(nil)

View File

@ -246,6 +246,9 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
case pool.RPCGRPC:
s.grpcHandler.Handle(conn)
case pool.RPCRaftForwarding:
s.handleRaftForwarding(conn)
default:
if !s.handleEnterpriseRPCConn(typ, conn, isTLS) {
s.rpcLogger().Error("unrecognized RPC byte",
@ -314,6 +317,9 @@ func (s *Server) handleNativeTLS(conn net.Conn) {
case pool.ALPN_RPCGRPC:
s.grpcHandler.Handle(tlsConn)
case pool.ALPN_RPCRaftForwarding:
s.handleRaftForwarding(tlsConn)
case pool.ALPN_WANGossipPacket:
if err := s.handleALPN_WANGossipPacketStream(tlsConn); err != nil && err != io.EOF {
s.rpcLogger().Error(
@ -496,6 +502,19 @@ func (s *Server) handleRaftRPC(conn net.Conn) {
s.raftLayer.Handoff(conn)
}
func (s *Server) handleRaftForwarding(conn net.Conn) {
if tlsConn, ok := conn.(*tls.Conn); ok {
err := s.tlsConfigurator.AuthorizeServerConn(s.config.Datacenter, tlsConn)
if err != nil {
s.rpcLogger().Warn(err.Error(), "from", conn.RemoteAddr(), "operation", "raft forwarding")
conn.Close()
return
}
}
s.raftStorageBackend.HandleConnection(conn)
}
func (s *Server) handleALPN_WANGossipPacketStream(conn net.Conn) error {
defer conn.Close()
@ -903,21 +922,19 @@ func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{},
}
// raftApplyMsgpack encodes the msg using msgpack and calls raft.Apply. See
// raftApplyWithEncoder.
// raftApplyEncoded.
func (s *Server) raftApplyMsgpack(t structs.MessageType, msg interface{}) (interface{}, error) {
return s.raftApplyWithEncoder(t, msg, structs.Encode)
}
// raftApplyProtobuf encodes the msg using protobuf and calls raft.Apply. See
// raftApplyWithEncoder.
// raftApplyEncoded.
func (s *Server) raftApplyProtobuf(t structs.MessageType, msg interface{}) (interface{}, error) {
return s.raftApplyWithEncoder(t, msg, structs.EncodeProtoInterface)
}
// raftApplyWithEncoder encodes a message, and then calls raft.Apply with the
// encoded message. Returns the FSM response along with any errors. If the
// FSM.Apply response is an error it will be returned as the error return
// value with a nil response.
// encoded message. See raftApplyEncoded.
func (s *Server) raftApplyWithEncoder(
t structs.MessageType,
msg interface{},
@ -930,7 +947,13 @@ func (s *Server) raftApplyWithEncoder(
if err != nil {
return nil, fmt.Errorf("Failed to encode request: %v", err)
}
return s.raftApplyEncoded(t, buf)
}
// raftApplyEncoded calls raft.Apply with the encoded message. Returns the FSM
// response along with any errors. If the FSM.Apply response is an error it will
// be returned as the error return value with a nil response.
func (s *Server) raftApplyEncoded(t structs.MessageType, buf []byte) (any, error) {
// Warn if the command is very large
if n := len(buf); n > raftWarnSize {
s.rpcLogger().Warn("Attempting to apply large raft entry", "size_in_bytes", n)
@ -1177,37 +1200,53 @@ func (s *Server) setQueryMeta(m blockingQueryResponseMeta, token string) {
}
}
// consistentRead is used to ensure we do not perform a stale
// read. This is done by verifying leadership before the read.
func (s *Server) consistentRead() error {
func (s *Server) consistentReadWithContext(ctx context.Context) error {
defer metrics.MeasureSince([]string{"rpc", "consistentRead"}, time.Now())
future := s.raft.VerifyLeader()
if err := future.Error(); err != nil {
return err // fail fast if leader verification fails
}
// poll consistent read readiness, wait for up to RPCHoldTimeout milliseconds
if s.isReadyForConsistentReads() {
return nil
}
jitter := lib.RandomStagger(s.config.RPCHoldTimeout / structs.JitterFraction)
deadline := time.Now().Add(s.config.RPCHoldTimeout)
for time.Now().Before(deadline) {
// Poll until the context reaches its deadline, or for RPCHoldTimeout if the
// context has no deadline.
pollFor := s.config.RPCHoldTimeout
if deadline, ok := ctx.Deadline(); ok {
pollFor = time.Until(deadline)
}
interval := pollFor / structs.JitterFraction
if interval <= 0 {
return structs.ErrNotReadyForConsistentReads
}
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-time.After(jitter):
// Drop through and check before we loop again.
case <-ticker.C:
if s.isReadyForConsistentReads() {
return nil
}
case <-ctx.Done():
return structs.ErrNotReadyForConsistentReads
case <-s.shutdownCh:
return fmt.Errorf("shutdown waiting for leader")
}
if s.isReadyForConsistentReads() {
return nil
}
}
}
return structs.ErrNotReadyForConsistentReads
// consistentRead is used to ensure we do not perform a stale
// read. This is done by verifying leadership before the read.
func (s *Server) consistentRead() error {
ctx, cancel := context.WithTimeout(context.Background(), s.config.RPCHoldTimeout)
defer cancel()
return s.consistentReadWithContext(ctx)
}
// rpcQueryTimeout calculates the timeout for the query, ensures it is

View File

@ -65,6 +65,8 @@ import (
"github.com/hashicorp/consul/agent/rpc/peering"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/internal/storage"
raftstorage "github.com/hashicorp/consul/internal/storage/raft"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/routine"
"github.com/hashicorp/consul/logging"
@ -251,6 +253,9 @@ type Server struct {
// transition notifications from the Raft layer.
raftNotifyCh <-chan bool
// raftStorageBackend is the Raft-backed storage backend for resources.
raftStorageBackend *raftstorage.Backend
// reconcileCh is used to pass events from the serf handler
// into the leader manager, so that the strong state can be
// updated
@ -450,14 +455,6 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
loggers := newLoggerStore(serverLogger)
fsmDeps := fsm.Deps{
Logger: flat.Logger,
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(gc, flat.EventPublisher)
},
Publisher: flat.EventPublisher,
}
if incomingRPCLimiter == nil {
incomingRPCLimiter = rpcRate.NullRequestLimitsHandler()
}
@ -484,14 +481,27 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
shutdownCh: shutdownCh,
leaderRoutineManager: routine.NewManager(logger.Named(logging.Leader)),
aclAuthMethodValidators: authmethod.NewCache(),
fsm: fsm.NewFromDeps(fsmDeps),
publisher: flat.EventPublisher,
incomingRPCLimiter: incomingRPCLimiter,
routineManager: routine.NewManager(logger.Named(logging.ConsulServer)),
}
incomingRPCLimiter.Register(s)
s.raftStorageBackend, err = raftstorage.NewBackend(&raftHandle{s}, logger.Named("raft-storage-backend"))
if err != nil {
return nil, fmt.Errorf("failed to create storage backend: %w", err)
}
go s.raftStorageBackend.Run(&lib.StopChannelContext{StopCh: shutdownCh})
s.fsm = fsm.NewFromDeps(fsm.Deps{
Logger: flat.Logger,
NewStateStore: func() *state.Store {
return state.NewStateStoreWithEventPublisher(gc, flat.EventPublisher)
},
Publisher: flat.EventPublisher,
StorageBackend: s.raftStorageBackend,
})
s.hcpManager = hcp.NewManager(hcp.ManagerConfig{
Client: flat.HCP.Client,
StatusFn: s.hcpServerStatus(flat),
@ -738,7 +748,7 @@ func NewServer(config *Config, flat Deps, externalGRPCServer *grpc.Server, incom
go s.overviewManager.Run(&lib.StopChannelContext{StopCh: s.shutdownCh})
// Initialize external gRPC server
s.setupExternalGRPC(config, logger)
s.setupExternalGRPC(config, s.raftStorageBackend, logger)
// Initialize internal gRPC server.
//
@ -1038,11 +1048,22 @@ func (s *Server) setupRaft() error {
return fmt.Errorf("recovery failed to parse peers.json: %v", err)
}
// It's safe to pass nil as the handle argument here because we won't call
// the backend's data access methods (only Apply, Snapshot, and Restore).
backend, err := raftstorage.NewBackend(nil, hclog.NewNullLogger())
if err != nil {
return fmt.Errorf("recovery failed: %w", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go backend.Run(ctx)
tmpFsm := fsm.NewFromDeps(fsm.Deps{
Logger: s.logger,
NewStateStore: func() *state.Store {
return state.NewStateStore(s.tombstoneGC)
},
StorageBackend: backend,
})
if err := raft.RecoverCluster(s.config.RaftConfig, tmpFsm,
log, stable, snap, trans, configuration); err != nil {
@ -1174,7 +1195,7 @@ func (s *Server) setupRPC() error {
}
// Initialize and register services on external gRPC server.
func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
func (s *Server) setupExternalGRPC(config *Config, backend storage.Backend, logger hclog.Logger) {
s.externalACLServer = aclgrpc.NewServer(aclgrpc.Config{
ACLsEnabled: s.config.ACLsEnabled,
@ -1240,7 +1261,9 @@ func (s *Server) setupExternalGRPC(config *Config, logger hclog.Logger) {
})
s.peerStreamServer.Register(s.externalGRPCServer)
resource.NewServer(resource.Config{}).Register(s.externalGRPCServer)
resource.NewServer(resource.Config{
Backend: backend,
}).Register(s.externalGRPCServer)
}
// Shutdown is used to shutdown the server
@ -1861,6 +1884,7 @@ func (s *Server) trackLeaderChanges() {
s.grpcLeaderForwarder.UpdateLeaderAddr(s.config.Datacenter, string(leaderObs.LeaderAddr))
s.peeringBackend.SetLeaderAddress(string(leaderObs.LeaderAddr))
s.raftStorageBackend.LeaderChanged()
// Trigger sending an update to HCP status
s.hcpManager.SendUpdate()

View File

@ -15,7 +15,7 @@ func (s *Server) List(ctx context.Context, req *pbresource.ListRequest) (*pbreso
return nil, err
}
resources, err := s.backend.List(ctx, readConsistencyFrom(ctx), storage.UnversionedTypeFrom(req.Type), req.Tenancy, req.NamePrefix)
resources, err := s.Backend.List(ctx, readConsistencyFrom(ctx), storage.UnversionedTypeFrom(req.Type), req.Tenancy, req.NamePrefix)
if err != nil {
return nil, err
}

View File

@ -70,7 +70,7 @@ func TestList_Many(t *testing.T) {
},
Version: "",
}
server.backend.WriteCAS(tc.ctx, r)
server.Backend.WriteCAS(tc.ctx, r)
}
rsp, err := client.List(tc.ctx, &pbresource.ListRequest{
@ -90,7 +90,7 @@ func TestList_GroupVersionMismatch(t *testing.T) {
server := testServer(t)
client := testClient(t, server)
server.registry.Register(resource.Registration{Type: typev1})
server.backend.WriteCAS(tc.ctx, &pbresource.Resource{Id: id2})
server.Backend.WriteCAS(tc.ctx, &pbresource.Resource{Id: id2})
rsp, err := client.List(tc.ctx, &pbresource.ListRequest{
Type: typev1,
@ -110,7 +110,7 @@ func TestList_VerifyReadConsistencyArg(t *testing.T) {
mockBackend := NewMockBackend(t)
server := NewServer(Config{
registry: resource.NewRegistry(),
backend: mockBackend,
Backend: mockBackend,
})
server.registry.Register(resource.Registration{Type: typev1})
resource1 := &pbresource.Resource{Id: id1, Version: "1"}

View File

@ -20,7 +20,7 @@ func (s *Server) Read(ctx context.Context, req *pbresource.ReadRequest) (*pbreso
return nil, err
}
resource, err := s.backend.Read(ctx, readConsistencyFrom(ctx), req.Id)
resource, err := s.Backend.Read(ctx, readConsistencyFrom(ctx), req.Id)
if err != nil {
if errors.Is(err, storage.ErrNotFound) {
return nil, status.Error(codes.NotFound, err.Error())

View File

@ -53,7 +53,7 @@ func TestRead_GroupVersionMismatch(t *testing.T) {
client := testClient(t, server)
resource1 := &pbresource.Resource{Id: id1, Version: ""}
_, err := server.backend.WriteCAS(tc.ctx, resource1)
_, err := server.Backend.WriteCAS(tc.ctx, resource1)
require.NoError(t, err)
_, err = client.Read(tc.ctx, &pbresource.ReadRequest{Id: id2})
@ -71,7 +71,7 @@ func TestRead_Success(t *testing.T) {
server.registry.Register(resource.Registration{Type: typev1})
client := testClient(t, server)
resource1 := &pbresource.Resource{Id: id1, Version: ""}
resource1, err := server.backend.WriteCAS(tc.ctx, resource1)
resource1, err := server.Backend.WriteCAS(tc.ctx, resource1)
require.NoError(t, err)
rsp, err := client.Read(tc.ctx, &pbresource.ReadRequest{Id: id1})
@ -88,7 +88,7 @@ func TestRead_VerifyReadConsistencyArg(t *testing.T) {
mockBackend := NewMockBackend(t)
server := NewServer(Config{
registry: resource.NewRegistry(),
backend: mockBackend,
Backend: mockBackend,
})
server.registry.Register(resource.Registration{Type: typev1})
resource1 := &pbresource.Resource{Id: id1, Version: "1"}

View File

@ -22,7 +22,9 @@ type Server struct {
type Config struct {
registry Registry
backend Backend
// Backend is the storage backend that will be used for resource persistence.
Backend Backend
}
//go:generate mockery --name Registry --inpackage

View File

@ -17,7 +17,7 @@ import (
)
func TestWrite_TODO(t *testing.T) {
server := NewServer(Config{})
server := testServer(t)
client := testClient(t, server)
resp, err := client.Write(context.Background(), &pbresource.WriteRequest{})
require.NoError(t, err)
@ -25,7 +25,7 @@ func TestWrite_TODO(t *testing.T) {
}
func TestWriteStatus_TODO(t *testing.T) {
server := NewServer(Config{})
server := testServer(t)
client := testClient(t, server)
resp, err := client.WriteStatus(context.Background(), &pbresource.WriteStatusRequest{})
require.NoError(t, err)
@ -33,7 +33,7 @@ func TestWriteStatus_TODO(t *testing.T) {
}
func TestDelete_TODO(t *testing.T) {
server := NewServer(Config{})
server := testServer(t)
client := testClient(t, server)
resp, err := client.Delete(context.Background(), &pbresource.DeleteRequest{})
require.NoError(t, err)
@ -48,7 +48,7 @@ func testServer(t *testing.T) *Server {
go backend.Run(testContext(t))
registry := resource.NewRegistry()
return NewServer(Config{registry: registry, backend: backend})
return NewServer(Config{registry: registry, Backend: backend})
}
func testClient(t *testing.T, server *Server) pbresource.ResourceServiceClient {

View File

@ -15,7 +15,7 @@ func (s *Server) WatchList(req *pbresource.WatchListRequest, stream pbresource.R
}
unversionedType := storage.UnversionedTypeFrom(req.Type)
watch, err := s.backend.WatchList(
watch, err := s.Backend.WatchList(
stream.Context(),
unversionedType,
req.Tenancy,

View File

@ -55,7 +55,7 @@ func TestWatchList_GroupVersionMatches(t *testing.T) {
rspCh := handleResourceStream(t, stream)
// insert and verify upsert event received
r1, err := server.backend.WriteCAS(ctx, resourcev1)
r1, err := server.Backend.WriteCAS(ctx, resourcev1)
require.NoError(t, err)
rsp := mustGetResource(t, rspCh)
require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, rsp.Operation)
@ -63,14 +63,14 @@ func TestWatchList_GroupVersionMatches(t *testing.T) {
// update and verify upsert event received
r2 := clone(r1)
r2, err = server.backend.WriteCAS(ctx, r2)
r2, err = server.Backend.WriteCAS(ctx, r2)
require.NoError(t, err)
rsp = mustGetResource(t, rspCh)
require.Equal(t, pbresource.WatchEvent_OPERATION_UPSERT, rsp.Operation)
prototest.AssertDeepEqual(t, r2, rsp.Resource)
// delete and verify delete event received
err = server.backend.DeleteCAS(ctx, r2.Id, r2.Version)
err = server.Backend.DeleteCAS(ctx, r2.Id, r2.Version)
require.NoError(t, err)
rsp = mustGetResource(t, rspCh)
require.Equal(t, pbresource.WatchEvent_OPERATION_DELETE, rsp.Operation)
@ -97,16 +97,16 @@ func TestWatchList_GroupVersionMismatch(t *testing.T) {
rspCh := handleResourceStream(t, stream)
// insert
r1, err := server.backend.WriteCAS(ctx, resourcev1)
r1, err := server.Backend.WriteCAS(ctx, resourcev1)
require.NoError(t, err)
// update
r2 := clone(r1)
r2, err = server.backend.WriteCAS(ctx, r2)
r2, err = server.Backend.WriteCAS(ctx, r2)
require.NoError(t, err)
// delete
err = server.backend.DeleteCAS(ctx, r2.Id, r2.Version)
err = server.Backend.DeleteCAS(ctx, r2.Id, r2.Version)
require.NoError(t, err)
// verify no events received

View File

@ -14,9 +14,10 @@ import (
middleware "github.com/grpc-ecosystem/go-grpc-middleware"
recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
"github.com/hashicorp/consul/agent/consul/rate"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"github.com/hashicorp/consul/agent/consul/rate"
)
var (
@ -60,15 +61,14 @@ func NewHandler(logger Logger, addr net.Addr, register func(server *grpc.Server)
srv := grpc.NewServer(opts...)
register(srv)
lis := &chanListener{addr: addr, conns: make(chan net.Conn), done: make(chan struct{})}
return &Handler{srv: srv, listener: lis}
return &Handler{srv: srv, listener: NewListener(addr)}
}
// Handler implements a handler for the rpc server listener, and the
// agent.Component interface for managing the lifecycle of the grpc.Server.
type Handler struct {
srv *grpc.Server
listener *chanListener
listener *Listener
}
// Handle the connection by sending it to a channel for the grpc.Server to receive.
@ -85,38 +85,6 @@ func (h *Handler) Shutdown() error {
return nil
}
// chanListener implements net.Listener for grpc.Server.
type chanListener struct {
conns chan net.Conn
addr net.Addr
done chan struct{}
}
// Accept blocks until a connection is received from Handle, and then returns the
// connection. Accept implements part of the net.Listener interface for grpc.Server.
func (l *chanListener) Accept() (net.Conn, error) {
select {
case c := <-l.conns:
return c, nil
case <-l.done:
return nil, &net.OpError{
Op: "accept",
Net: l.addr.Network(),
Addr: l.addr,
Err: fmt.Errorf("listener closed"),
}
}
}
func (l *chanListener) Addr() net.Addr {
return l.addr
}
func (l *chanListener) Close() error {
close(l.done)
return nil
}
// NoOpHandler implements the same methods as Handler, but performs no handling.
// It may be used in place of Handler to disable the grpc server.
type NoOpHandler struct {

View File

@ -0,0 +1,61 @@
package internal
import (
"fmt"
"net"
)
// Listener implements the net.Listener interface and allows you to manually
// pass connections to it. This is useful when you need to accept connections
// and do something with them yourself first (e.g. handling our multiplexing
// scheme) before giving them to the gRPC server.
type Listener struct {
addr net.Addr
conns chan net.Conn
done chan struct{}
}
var _ net.Listener = (*Listener)(nil)
// NewListener creates a Listener with the given address.
func NewListener(addr net.Addr) *Listener {
return &Listener{
addr: addr,
conns: make(chan net.Conn),
done: make(chan struct{}),
}
}
// Handle makes the given connection available to Accept.
func (l *Listener) Handle(conn net.Conn) {
select {
case l.conns <- conn:
case <-l.done:
_ = conn.Close()
}
}
// Accept a connection.
func (l *Listener) Accept() (net.Conn, error) {
select {
case c := <-l.conns:
return c, nil
case <-l.done:
return nil, &net.OpError{
Op: "accept",
Net: l.addr.Network(),
Addr: l.addr,
Err: fmt.Errorf("listener closed"),
}
}
}
// Addr returns the listener's address.
func (l *Listener) Addr() net.Addr { return l.addr }
// Close the listener.
func (l *Listener) Close() error {
close(l.done)
return nil
}

View File

@ -22,6 +22,10 @@ var rpcRateLimitSpecs = map[string]rate.OperationSpec{
"/hashicorp.consul.internal.peering.PeeringService/TrustBundleRead": {Type: rate.OperationTypeRead, Category: rate.OperationCategoryPeering},
"/hashicorp.consul.internal.peerstream.PeerStreamService/ExchangeSecret": {Type: rate.OperationTypeWrite, Category: rate.OperationCategoryPeerStream},
"/hashicorp.consul.internal.peerstream.PeerStreamService/StreamResources": {Type: rate.OperationTypeRead, Category: rate.OperationCategoryPeerStream},
"/hashicorp.consul.internal.storage.raft.ForwardingService/Delete": {Type: rate.OperationTypeExempt, Category: rate.OperationCategoryResource},
"/hashicorp.consul.internal.storage.raft.ForwardingService/List": {Type: rate.OperationTypeExempt, Category: rate.OperationCategoryResource},
"/hashicorp.consul.internal.storage.raft.ForwardingService/Read": {Type: rate.OperationTypeExempt, Category: rate.OperationCategoryResource},
"/hashicorp.consul.internal.storage.raft.ForwardingService/Write": {Type: rate.OperationTypeExempt, Category: rate.OperationCategoryResource},
"/hashicorp.consul.resource.ResourceService/Delete": {Type: rate.OperationTypeWrite, Category: rate.OperationCategoryResource},
"/hashicorp.consul.resource.ResourceService/List": {Type: rate.OperationTypeRead, Category: rate.OperationCategoryResource},
"/hashicorp.consul.resource.ResourceService/Read": {Type: rate.OperationTypeRead, Category: rate.OperationCategoryResource},

View File

@ -46,11 +46,12 @@ const (
// ever is.
RPCTLSInsecure RPCType = 7
RPCGRPC RPCType = 8
RPCRaftForwarding RPCType = 9
// RPCMaxTypeValue is the maximum rpc type byte value currently used for the
// various protocols riding over our "rpc" port.
//
// Currently our 0-8 values are mutually exclusive with any valid first byte
// Currently our 0-9 values are mutually exclusive with any valid first byte
// of a TLS header. The first TLS header byte will begin with a TLS content
// type and the values 0-19 are all explicitly unassigned and marked as
// requiring coordination. RFC 7983 does the marking and goes into some
@ -62,7 +63,7 @@ const (
//
// NOTE: if you add new RPCTypes beyond this value, you must similarly bump
// this value.
RPCMaxTypeValue = 8
RPCMaxTypeValue = 9
)
const (
@ -73,6 +74,7 @@ const (
ALPN_RPCSnapshot = "consul/rpc-snapshot" // RPCSnapshot
ALPN_RPCGossip = "consul/rpc-gossip" // RPCGossip
ALPN_RPCGRPC = "consul/rpc-grpc" // RPCGRPC
ALPN_RPCRaftForwarding = "consul/raft-forwarding" // RPCRaftForwarding
// wan federation additions
ALPN_WANGossipPacket = "consul/wan-gossip/packet"
ALPN_WANGossipStream = "consul/wan-gossip/stream"
@ -85,6 +87,7 @@ var RPCNextProtos = []string{
ALPN_RPCSnapshot,
ALPN_RPCGossip,
ALPN_RPCGRPC,
ALPN_RPCRaftForwarding,
ALPN_WANGossipPacket,
ALPN_WANGossipStream,
}

View File

@ -87,6 +87,7 @@ const (
PeeringTrustBundleDeleteType = 39
PeeringSecretsWriteType = 40
RaftLogVerifierCheckpoint = 41 // Only used for log verifier, no-op on FSM.
ResourceOperationType = 42
)
const (
@ -154,6 +155,7 @@ var requestTypeStrings = map[MessageType]string{
PeeringTrustBundleDeleteType: "PeeringTrustBundleDelete",
PeeringSecretsWriteType: "PeeringSecret",
RaftLogVerifierCheckpoint: "RaftLogVerifierCheckpoint",
ResourceOperationType: "Resource",
}
const (

View File

@ -7,11 +7,12 @@ import (
"strings"
"testing"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/mitchellh/cli"
"github.com/stretchr/testify/require"
)
func TestIntentionListCommand_noTabs(t *testing.T) {

View File

@ -135,7 +135,7 @@ func testRead(t *testing.T, opts TestOptions) {
var e storage.GroupVersionMismatchError
if errors.As(err, &e) {
require.Equal(t, id.Type, e.RequestedType)
prototest.AssertDeepEqual(t, id.Type, e.RequestedType)
prototest.AssertDeepEqual(t, res, e.Stored, ignoreVersion)
} else {
t.Fatalf("expected storage.GroupVersionMismatchError, got: %T", err)

View File

@ -0,0 +1,78 @@
package inmem
import (
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/proto-public/pbresource"
)
// Snapshot obtains a point-in-time snapshot of the store that can later be
// persisted and restored later.
func (s *Store) Snapshot() (*Snapshot, error) {
tx := s.txn(false)
iter, err := tx.Get(tableNameResources, indexNameID)
if err != nil {
return nil, err
}
return &Snapshot{iter: iter}, nil
}
// Snapshot is a point-in-time snapshot of a store.
type Snapshot struct {
iter memdb.ResultIterator
}
// Next returns the next resource in the snapshot. nil will be returned when
// the end of the snapshot has been reached.
func (s *Snapshot) Next() *pbresource.Resource {
v := s.iter.Next()
if v == nil {
return nil
}
return v.(*pbresource.Resource)
}
// Restore starts the process of restoring a snapshot.
//
// Callers *must* call Abort or Commit when done, to free resources.
func (s *Store) Restore() (*Restoration, error) {
db, err := newDB()
if err != nil {
return nil, err
}
return &Restoration{
s: s,
db: db,
tx: db.Txn(true),
}, nil
}
// Restoration is a handle that can be used to restore a snapshot.
type Restoration struct {
s *Store
db *memdb.MemDB
tx *memdb.Txn
}
// Apply the given resource to the store.
func (r *Restoration) Apply(res *pbresource.Resource) error {
return r.tx.Insert(tableNameResources, res)
}
// Commit the restoration. Replaces the in-memory database wholesale and closes
// any watches.
func (r *Restoration) Commit() {
r.tx.Commit()
r.s.mu.Lock()
defer r.s.mu.Unlock()
r.s.db = r.db
r.s.pub.RefreshTopic(eventTopic)
}
// Abort the restoration. It's safe to always call this in a defer statement
// because aborting a committed restoration is a no-op.
func (r *Restoration) Abort() { r.tx.Abort() }

View File

@ -0,0 +1,96 @@
package inmem_test
import (
"context"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/internal/storage/inmem"
"github.com/hashicorp/consul/proto-public/pbresource"
"github.com/hashicorp/consul/proto/private/prototest"
)
func TestSnapshotRestore(t *testing.T) {
oldStore, err := inmem.NewStore()
require.NoError(t, err)
a := &pbresource.Resource{
Id: &pbresource.ID{
Type: &pbresource.Type{
Group: "mesh",
GroupVersion: "v1",
Kind: "service",
},
Tenancy: &pbresource.Tenancy{
Partition: "default",
PeerName: "local",
Namespace: "default",
},
Name: "billing",
Uid: "a",
},
Version: "1",
}
require.NoError(t, oldStore.WriteCAS(a, ""))
newStore, err := inmem.NewStore()
require.NoError(t, err)
// Write something to the new store to make sure it gets blown away.
b := &pbresource.Resource{
Id: &pbresource.ID{
Type: &pbresource.Type{
Group: "mesh",
GroupVersion: "v1",
Kind: "service",
},
Tenancy: &pbresource.Tenancy{
Partition: "default",
PeerName: "local",
Namespace: "default",
},
Name: "api",
Uid: "a",
},
Version: "1",
}
require.NoError(t, newStore.WriteCAS(b, ""))
snap, err := oldStore.Snapshot()
require.NoError(t, err)
// Start a watch on the new store to make sure it gets closed.
watch, err := newStore.WatchList(storage.UnversionedTypeFrom(b.Id.Type), b.Id.Tenancy, "")
require.NoError(t, err)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
// Expect the initial state on the watch.
_, err = watch.Next(ctx)
require.NoError(t, err)
restore, err := newStore.Restore()
require.NoError(t, err)
defer restore.Abort()
for r := snap.Next(); r != nil; r = snap.Next() {
restore.Apply(r)
}
restore.Commit()
// Check that resource we wrote to oldStore has been restored to newStore.
rsp, err := newStore.Read(a.Id)
require.NoError(t, err)
prototest.AssertDeepEqual(t, a, rsp)
// Check that resource written to newStore was removed by snapshot restore.
_, err = newStore.Read(b.Id)
require.ErrorIs(t, err, storage.ErrNotFound)
// Check the watch has been closed.
_, err = watch.Next(ctx)
require.ErrorIs(t, err, storage.ErrWatchClosed)
}

View File

@ -21,7 +21,9 @@ import (
// package, but also handles reads in our Raft backend, and can be used as a
// local cache when storing data in external systems (e.g. RDBMS, K/V stores).
type Store struct {
mu sync.RWMutex // guards db, because Restore.Commit will replace it wholesale.
db *memdb.MemDB
pub *stream.EventPublisher
// eventLock is used to serialize operations that result in the publishing of
@ -65,7 +67,8 @@ func (s *Store) Run(ctx context.Context) { s.pub.Run(ctx) }
//
// For more information, see the storage.Backend documentation.
func (s *Store) Read(id *pbresource.ID) (*pbresource.Resource, error) {
tx := s.db.Txn(false)
tx := s.txn(false)
defer tx.Abort()
val, err := tx.First(tableNameResources, indexNameID, id)
@ -101,7 +104,7 @@ func (s *Store) WriteCAS(res *pbresource.Resource, vsn string) error {
s.eventLock.Lock()
defer s.eventLock.Unlock()
tx := s.db.Txn(true)
tx := s.txn(true)
defer tx.Abort()
existing, err := tx.First(tableNameResources, indexNameID, res.Id)
@ -150,7 +153,7 @@ func (s *Store) DeleteCAS(id *pbresource.ID, vsn string) error {
s.eventLock.Lock()
defer s.eventLock.Unlock()
tx := s.db.Txn(true)
tx := s.txn(true)
defer tx.Abort()
existing, err := tx.First(tableNameResources, indexNameID, id)
@ -195,7 +198,7 @@ func (s *Store) DeleteCAS(id *pbresource.ID, vsn string) error {
//
// For more information, see the storage.Backend documentation.
func (s *Store) List(typ storage.UnversionedType, ten *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
tx := s.db.Txn(false)
tx := s.txn(false)
defer tx.Abort()
return listTxn(tx, query{typ, ten, namePrefix})
@ -261,7 +264,7 @@ func (s *Store) WatchList(typ storage.UnversionedType, ten *pbresource.Tenancy,
//
// For more information, see the storage.Backend documentation.
func (s *Store) OwnerReferences(id *pbresource.ID) ([]*pbresource.ID, error) {
tx := s.db.Txn(false)
tx := s.txn(false)
defer tx.Abort()
iter, err := tx.Get(tableNameResources, indexNameOwner, id)
@ -275,3 +278,10 @@ func (s *Store) OwnerReferences(id *pbresource.ID) ([]*pbresource.ID, error) {
}
return refs, nil
}
func (s *Store) txn(write bool) *memdb.Txn {
s.mu.RLock()
defer s.mu.RUnlock()
return s.db.Txn(write)
}

View File

@ -28,6 +28,9 @@ type Watch struct {
func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) {
for {
e, err := w.nextEvent(ctx)
if err == stream.ErrSubForceClosed {
return nil, storage.ErrWatchClosed
}
if err != nil {
return nil, err
}
@ -162,7 +165,7 @@ func (s *Store) watchSnapshot(req stream.SubscribeRequest, snap stream.SnapshotA
return 0, fmt.Errorf("unhandled subject type: %T", req.Subject)
}
tx := s.db.Txn(false)
tx := s.txn(false)
defer tx.Abort()
idx, err := currentEventIndex(tx)

View File

@ -0,0 +1,344 @@
package raft
import (
"context"
"fmt"
"net"
"strconv"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/internal/storage/inmem"
"github.com/hashicorp/consul/proto-public/pbresource"
pbstorage "github.com/hashicorp/consul/proto/private/pbstorage"
)
// NewBackend returns a storage backend that uses Raft for durable persistence
// and serves reads from an in-memory database. It's suitable for production use.
//
// It's not an entirely clean abstraction because rather than owning the Raft
// subsystem directly, it has to integrate with the existing FSM and related
// machinery from before generic resources.
//
// The given Handle will be used to apply logs and interrogate leadership state.
// In certain restricted circumstances, Handle may be nil, such as during tests
// that only exercise snapshot restoration, or when initializing a throwaway FSM
// during peers.json recovery - but calling any of the data access methods (read
// or write) will result in a panic.
//
// With Raft, writes and strongly consistent reads must be done on the leader.
// Backend implements a gRPC server, which followers will use to transparently
// forward operations to the leader. To do so, they will obtain a connection
// using Handle.DialLeader. Connections are cached for re-use, so when there's
// a new leader, you must call LeaderChanged to refresh the connection. Leaders
// must accept connections and hand them off by calling Backend.HandleConnection.
// Backend's gRPC client and server *DO NOT* handle TLS themselves, as they are
// intended to communicate over Consul's multiplexed server port (which handles
// TLS).
//
// You must call Run before using the backend.
func NewBackend(h Handle, l hclog.Logger) (*Backend, error) {
s, err := inmem.NewStore()
if err != nil {
return nil, err
}
b := &Backend{handle: h, store: s}
b.forwardingServer = newForwardingServer(b)
b.forwardingClient = newForwardingClient(h, l)
return b, nil
}
// Handle provides glue for interacting with the Raft subsystem via existing
// machinery on consul.Server.
type Handle interface {
// Apply the given log message.
Apply(msg []byte) (any, error)
// IsLeader determines if this server is the Raft leader (so can handle writes).
IsLeader() bool
// EnsureStrongConsistency checks the server is able to handle consistent reads by
// verifying its leadership and checking the FSM has applied all queued writes.
EnsureStrongConsistency(ctx context.Context) error
// DialLeader dials a gRPC connection to the leader for forwarding.
DialLeader() (*grpc.ClientConn, error)
}
// Backend is a Raft-backed storage backend implementation.
type Backend struct {
handle Handle
store *inmem.Store
forwardingServer *forwardingServer
forwardingClient *forwardingClient
}
// Run until the given context is canceled. This method blocks, so should be
// called in a goroutine.
func (b *Backend) Run(ctx context.Context) {
group, groupCtx := errgroup.WithContext(ctx)
group.Go(func() error {
b.store.Run(groupCtx)
return nil
})
group.Go(func() error {
return b.forwardingServer.run(groupCtx)
})
group.Wait()
}
// Read implements the storage.Backend interface.
func (b *Backend) Read(ctx context.Context, consistency storage.ReadConsistency, id *pbresource.ID) (*pbresource.Resource, error) {
// Easy case. Both leaders and followers can read from the local store.
if consistency == storage.EventualConsistency {
return b.store.Read(id)
}
if consistency != storage.StrongConsistency {
return nil, fmt.Errorf("%w: unknown consistency: %s", storage.ErrInconsistent, consistency)
}
// We are the leader. Handle the request ourself.
if b.handle.IsLeader() {
return b.leaderRead(ctx, id)
}
// Forward the request to the leader.
rsp, err := b.forwardingClient.read(ctx, &pbstorage.ReadRequest{Id: id})
if err != nil {
return nil, err
}
return rsp.GetResource(), nil
}
func (b *Backend) leaderRead(ctx context.Context, id *pbresource.ID) (*pbresource.Resource, error) {
if err := b.ensureStrongConsistency(ctx); err != nil {
return nil, err
}
return b.store.Read(id)
}
// WriteCAS implements the storage.Backend interface.
func (b *Backend) WriteCAS(ctx context.Context, res *pbresource.Resource) (*pbresource.Resource, error) {
req := &pbstorage.WriteRequest{Resource: res}
if b.handle.IsLeader() {
rsp, err := b.raftApply(&pbstorage.Log{
Type: pbstorage.LogType_LOG_TYPE_WRITE,
Request: &pbstorage.Log_Write{
Write: req,
},
})
if err != nil {
return nil, err
}
return rsp.GetWrite().GetResource(), nil
}
rsp, err := b.forwardingClient.write(ctx, req)
if err != nil {
return nil, err
}
return rsp.GetResource(), nil
}
// DeleteCAS implements the storage.Backend interface.
func (b *Backend) DeleteCAS(ctx context.Context, id *pbresource.ID, version string) error {
req := &pbstorage.DeleteRequest{
Id: id,
Version: version,
}
if b.handle.IsLeader() {
_, err := b.raftApply(&pbstorage.Log{
Type: pbstorage.LogType_LOG_TYPE_DELETE,
Request: &pbstorage.Log_Delete{
Delete: req,
},
})
return err
}
return b.forwardingClient.delete(ctx, req)
}
// List implements the storage.Backend interface.
func (b *Backend) List(ctx context.Context, consistency storage.ReadConsistency, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
// Easy case. Both leaders and followers can read from the local store.
if consistency == storage.EventualConsistency {
return b.store.List(resType, tenancy, namePrefix)
}
if consistency != storage.StrongConsistency {
return nil, fmt.Errorf("%w: unknown consistency: %s", storage.ErrInconsistent, consistency)
}
// We are the leader. Handle the request ourself.
if b.handle.IsLeader() {
return b.leaderList(ctx, resType, tenancy, namePrefix)
}
// Forward the request to the leader.
rsp, err := b.forwardingClient.list(ctx, &pbstorage.ListRequest{
Type: &pbresource.Type{
Group: resType.Group,
Kind: resType.Kind,
},
Tenancy: tenancy,
NamePrefix: namePrefix,
})
if err != nil {
return nil, err
}
return rsp.GetResources(), nil
}
func (b *Backend) leaderList(ctx context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) ([]*pbresource.Resource, error) {
if err := b.ensureStrongConsistency(ctx); err != nil {
return nil, err
}
return b.store.List(resType, tenancy, namePrefix)
}
// WatchList implements the storage.Backend interface.
func (b *Backend) WatchList(_ context.Context, resType storage.UnversionedType, tenancy *pbresource.Tenancy, namePrefix string) (storage.Watch, error) {
return b.store.WatchList(resType, tenancy, namePrefix)
}
// OwnerReferences implements the storage.Backend interface.
func (b *Backend) OwnerReferences(_ context.Context, id *pbresource.ID) ([]*pbresource.ID, error) {
return b.store.OwnerReferences(id)
}
// Apply is called by the FSM with the bytes of a Raft log entry, with Consul's
// envelope (i.e. type prefix and msgpack wrapper) stripped off.
func (b *Backend) Apply(buf []byte, idx uint64) any {
var req pbstorage.Log
if err := req.UnmarshalBinary(buf); err != nil {
return fmt.Errorf("failed to decode request: %w", err)
}
switch req.Type {
case pbstorage.LogType_LOG_TYPE_WRITE:
res := req.GetWrite().GetResource()
oldVsn := res.Version
res.Version = strconv.Itoa(int(idx))
if err := b.store.WriteCAS(res, oldVsn); err != nil {
return err
}
return &pbstorage.LogResponse{
Response: &pbstorage.LogResponse_Write{
Write: &pbstorage.WriteResponse{Resource: res},
},
}
case pbstorage.LogType_LOG_TYPE_DELETE:
req := req.GetDelete()
if err := b.store.DeleteCAS(req.Id, req.Version); err != nil {
return err
}
return &pbstorage.LogResponse{
Response: &pbstorage.LogResponse_Delete{},
}
}
return fmt.Errorf("unexpected request type: %s", req.Type)
}
// LeaderChanged should be called whenever the current Raft leader changes, to
// drop and re-create the gRPC connection used for forwarding.
func (b *Backend) LeaderChanged() { b.forwardingClient.leaderChanged() }
// HandleConnection should be called whenever a forwarding connection is opened.
func (b *Backend) HandleConnection(conn net.Conn) { b.forwardingServer.listener.Handle(conn) }
// raftApply round trips the given request through the Raft log and FSM.
func (b *Backend) raftApply(req *pbstorage.Log) (*pbstorage.LogResponse, error) {
msg, err := req.MarshalBinary()
if err != nil {
return nil, err
}
rsp, err := b.handle.Apply(msg)
if err != nil {
return nil, err
}
switch t := rsp.(type) {
case *pbstorage.LogResponse:
return t, nil
default:
return nil, fmt.Errorf("unexpected response from Raft apply: %T", rsp)
}
}
func (b *Backend) ensureStrongConsistency(ctx context.Context) error {
if err := b.handle.EnsureStrongConsistency(ctx); err != nil {
return fmt.Errorf("%w: %v", storage.ErrInconsistent, err)
}
return nil
}
// Snapshot obtains a point-in-time snapshot of the backend's state, so that it
// can be written to disk as a backup or sent to bootstrap a follower.
func (b *Backend) Snapshot() (*Snapshot, error) {
s, err := b.store.Snapshot()
if err != nil {
return nil, err
}
return &Snapshot{s}, nil
}
// Snapshot is a point-in-time snapshot of a backend's state.
type Snapshot struct{ s *inmem.Snapshot }
// Next returns the next resource in the snapshot, protobuf encoded. nil bytes
// will be returned when the end of the snapshot has been reached.
func (s *Snapshot) Next() ([]byte, error) {
res := s.s.Next()
if res == nil {
return nil, nil
}
return res.MarshalBinary()
}
// Restore starts the process of restoring a snapshot (i.e. from an on-disk
// backup, or to bootstrap from a leader).
//
// Callers *must* call Abort or Commit when done, to free resources.
func (b *Backend) Restore() (*Restoration, error) {
r, err := b.store.Restore()
if err != nil {
return nil, err
}
return &Restoration{r}, nil
}
// Restoration is a handle that can be used to restore a snapshot.
type Restoration struct{ r *inmem.Restoration }
// Apply the given protobuf-encoded resource to the backend.
func (r *Restoration) Apply(msg []byte) error {
var res pbresource.Resource
if err := res.UnmarshalBinary(msg); err != nil {
return err
}
return r.r.Apply(&res)
}
// Commit the restoration.
func (r *Restoration) Commit() { r.r.Commit() }
// Abort the restoration. It's safe to always call this in a defer statement
// because aborting a committed restoration is a no-op.
func (r *Restoration) Abort() { r.r.Abort() }

View File

@ -0,0 +1,170 @@
package raft_test
import (
"context"
"errors"
"math/rand"
"net"
"sync/atomic"
"testing"
"time"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/internal/storage/conformance"
"github.com/hashicorp/consul/internal/storage/raft"
"github.com/hashicorp/consul/sdk/testutil"
)
func TestBackend_Conformance(t *testing.T) {
t.Run("Leader", func(t *testing.T) {
conformance.Test(t, conformance.TestOptions{
NewBackend: func(t *testing.T) storage.Backend {
leader, _ := newRaftCluster(t)
return leader
},
SupportsStronglyConsistentList: true,
})
})
t.Run("Follower", func(t *testing.T) {
conformance.Test(t, conformance.TestOptions{
NewBackend: func(t *testing.T) storage.Backend {
_, follower := newRaftCluster(t)
return follower
},
SupportsStronglyConsistentList: true,
})
})
}
func newRaftCluster(t *testing.T) (*raft.Backend, *raft.Backend) {
t.Helper()
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
lis, err := net.Listen("tcp", ":0")
require.NoError(t, err)
lc, err := grpc.DialContext(ctx, lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
require.NoError(t, err)
lh := &leaderHandle{replCh: make(chan log, 10)}
leader, err := raft.NewBackend(lh, testutil.Logger(t))
require.NoError(t, err)
lh.backend = leader
go leader.Run(ctx)
go func() {
for {
conn, err := lis.Accept()
if errors.Is(err, net.ErrClosed) {
return
}
require.NoError(t, err)
go leader.HandleConnection(conn)
}
}()
follower, err := raft.NewBackend(&followerHandle{leaderConn: lc}, testutil.Logger(t))
require.NoError(t, err)
go follower.Run(ctx)
follower.LeaderChanged()
go lh.replicate(t, follower)
return leader, follower
}
type followerHandle struct {
leaderConn *grpc.ClientConn
}
func (followerHandle) Apply([]byte) (any, error) {
return nil, errors.New("not leader")
}
func (followerHandle) IsLeader() bool {
return false
}
func (followerHandle) EnsureStrongConsistency(context.Context) error {
return errors.New("not leader")
}
func (f *followerHandle) DialLeader() (*grpc.ClientConn, error) {
return f.leaderConn, nil
}
type leaderHandle struct {
index uint64
replCh chan log
backend *raft.Backend
}
type log struct {
idx uint64
msg []byte
}
func (l *leaderHandle) Apply(msg []byte) (any, error) {
idx := atomic.AddUint64(&l.index, 1)
// Apply the operation to the leader synchronously and capture its response
// to return to the caller.
rsp := l.backend.Apply(msg, idx)
// Replicate the operation to the follower asynchronously.
l.replCh <- log{idx, msg}
if err, ok := rsp.(error); ok {
return nil, err
}
return rsp, nil
}
func (leaderHandle) IsLeader() bool {
return true
}
func (leaderHandle) EnsureStrongConsistency(context.Context) error {
return nil
}
func (leaderHandle) DialLeader() (*grpc.ClientConn, error) {
return nil, errors.New("leader should not dial itself")
}
func (h *leaderHandle) replicate(t *testing.T, follower *raft.Backend) {
doneCh := make(chan struct{})
t.Cleanup(func() { close(doneCh) })
timer := time.NewTimer(replicationLag())
defer timer.Stop()
for {
select {
case <-timer.C:
select {
case l := <-h.replCh:
_ = follower.Apply(l.msg, l.idx)
default:
}
timer.Reset(replicationLag())
case <-doneCh:
return
}
}
}
func replicationLag() time.Duration {
if testing.Short() {
return 0
}
return time.Duration(rand.Intn(50)) * time.Millisecond
}

View File

@ -0,0 +1,265 @@
package raft
import (
"context"
"errors"
"net"
"sync"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
"github.com/hashicorp/go-hclog"
grpcinternal "github.com/hashicorp/consul/agent/grpc-internal"
"github.com/hashicorp/consul/internal/storage"
pbstorage "github.com/hashicorp/consul/proto/private/pbstorage"
)
// forwardingServer implements the gRPC forwarding service.
type forwardingServer struct {
backend *Backend
listener *grpcinternal.Listener
}
var _ pbstorage.ForwardingServiceServer = (*forwardingServer)(nil)
func newForwardingServer(backend *Backend) *forwardingServer {
return &forwardingServer{
backend: backend,
// The address here doesn't actually matter. gRPC uses it as an identifier
// internally, but we only bind the server to a single listener.
listener: grpcinternal.NewListener(&net.TCPAddr{
IP: net.ParseIP("0.0.0.0"),
Port: 0,
}),
}
}
func (s *forwardingServer) Write(ctx context.Context, req *pbstorage.WriteRequest) (*pbstorage.WriteResponse, error) {
rsp, err := s.raftApply(ctx, &pbstorage.Log{
Type: pbstorage.LogType_LOG_TYPE_WRITE,
Request: &pbstorage.Log_Write{Write: req},
})
if err != nil {
return nil, err
}
return rsp.GetWrite(), nil
}
func (s *forwardingServer) Delete(ctx context.Context, req *pbstorage.DeleteRequest) (*emptypb.Empty, error) {
_, err := s.raftApply(ctx, &pbstorage.Log{
Type: pbstorage.LogType_LOG_TYPE_DELETE,
Request: &pbstorage.Log_Delete{Delete: req},
})
if err != nil {
return nil, err
}
return &emptypb.Empty{}, nil
}
func (s *forwardingServer) Read(ctx context.Context, req *pbstorage.ReadRequest) (*pbstorage.ReadResponse, error) {
res, err := s.backend.leaderRead(ctx, req.Id)
if err != nil {
return nil, wrapError(err)
}
return &pbstorage.ReadResponse{Resource: res}, nil
}
func (s *forwardingServer) List(ctx context.Context, req *pbstorage.ListRequest) (*pbstorage.ListResponse, error) {
res, err := s.backend.leaderList(ctx, storage.UnversionedTypeFrom(req.Type), req.Tenancy, req.NamePrefix)
if err != nil {
return nil, wrapError(err)
}
return &pbstorage.ListResponse{Resources: res}, nil
}
func (s *forwardingServer) raftApply(_ context.Context, req *pbstorage.Log) (*pbstorage.LogResponse, error) {
msg, err := req.MarshalBinary()
if err != nil {
return nil, wrapError(err)
}
rsp, err := s.backend.handle.Apply(msg)
if err != nil {
return nil, wrapError(err)
}
switch t := rsp.(type) {
case *pbstorage.LogResponse:
return t, nil
default:
return nil, status.Errorf(codes.Internal, "unexpected response from Raft apply: %T", rsp)
}
}
func (s *forwardingServer) run(ctx context.Context) error {
server := grpc.NewServer()
pbstorage.RegisterForwardingServiceServer(server, s)
go func() {
<-ctx.Done()
server.Stop()
}()
return server.Serve(s.listener)
}
// forwardingClient is used to forward operations to the leader.
type forwardingClient struct {
handle Handle
logger hclog.Logger
mu sync.RWMutex
conn *grpc.ClientConn
}
func newForwardingClient(h Handle, l hclog.Logger) *forwardingClient {
return &forwardingClient{
handle: h,
logger: l,
}
}
func (c *forwardingClient) leaderChanged() {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn == nil {
return
}
if err := c.conn.Close(); err != nil {
c.logger.Error("failed to close connection to previous leader", "error", err)
}
c.conn = nil
}
func (c *forwardingClient) getConn() (*grpc.ClientConn, error) {
c.mu.Lock()
defer c.mu.Unlock()
if c.conn != nil {
return c.conn, nil
}
conn, err := c.handle.DialLeader()
if err != nil {
c.logger.Error("failed to dial leader", "error", err)
return nil, err
}
c.conn = conn
return conn, nil
}
func (c *forwardingClient) getClient() (pbstorage.ForwardingServiceClient, error) {
conn, err := c.getConn()
if err != nil {
return nil, err
}
return pbstorage.NewForwardingServiceClient(conn), nil
}
func (c *forwardingClient) delete(ctx context.Context, req *pbstorage.DeleteRequest) error {
client, err := c.getClient()
if err != nil {
return err
}
_, err = client.Delete(ctx, req)
return unwrapError(err)
}
func (c *forwardingClient) write(ctx context.Context, req *pbstorage.WriteRequest) (*pbstorage.WriteResponse, error) {
client, err := c.getClient()
if err != nil {
return nil, err
}
rsp, err := client.Write(ctx, req)
return rsp, unwrapError(err)
}
func (c *forwardingClient) read(ctx context.Context, req *pbstorage.ReadRequest) (*pbstorage.ReadResponse, error) {
client, err := c.getClient()
if err != nil {
return nil, err
}
rsp, err := client.Read(ctx, req)
return rsp, unwrapError(err)
}
func (c *forwardingClient) list(ctx context.Context, req *pbstorage.ListRequest) (*pbstorage.ListResponse, error) {
client, err := c.getClient()
if err != nil {
return nil, err
}
rsp, err := client.List(ctx, req)
return rsp, unwrapError(err)
}
var (
errorToCode = map[error]codes.Code{
// Note: OutOfRange is used to represent GroupVersionMismatchError, but is
// handled specially in wrapError and unwrapError because it has extra details.
storage.ErrNotFound: codes.NotFound,
storage.ErrCASFailure: codes.Aborted,
storage.ErrWrongUid: codes.AlreadyExists,
storage.ErrInconsistent: codes.FailedPrecondition,
}
codeToError = func() map[codes.Code]error {
inverted := make(map[codes.Code]error, len(errorToCode))
for k, v := range errorToCode {
inverted[v] = k
}
return inverted
}()
)
// wrapError converts the given error to a gRPC status to send over the wire.
func wrapError(err error) error {
var gvm storage.GroupVersionMismatchError
if errors.As(err, &gvm) {
s, err := status.New(codes.OutOfRange, err.Error()).
WithDetails(&pbstorage.GroupVersionMismatchErrorDetails{
RequestedType: gvm.RequestedType,
Stored: gvm.Stored,
})
if err == nil {
return s.Err()
}
}
code, ok := errorToCode[err]
if !ok {
code = codes.Internal
}
return status.Error(code, err.Error())
}
// unwrapError converts the given gRPC status error back to a storage package
// error.
func unwrapError(err error) error {
s, ok := status.FromError(err)
if !ok {
return err
}
for _, d := range s.Details() {
if gvm, ok := d.(*pbstorage.GroupVersionMismatchErrorDetails); ok {
return storage.GroupVersionMismatchError{
RequestedType: gvm.RequestedType,
Stored: gvm.Stored,
}
}
}
unwrapped, ok := codeToError[s.Code()]
if !ok {
return err
}
return unwrapped
}

View File

@ -32,6 +32,11 @@ var (
// not be achieved because of a consistency or availability issue (e.g. loss of
// quorum, or when interacting with a Raft follower).
ErrInconsistent = errors.New("cannot satisfy consistency requirements")
// ErrWatchClosed is returned by Watch.Next when the watch is closed, e.g. when
// a snapshot is restored and the watch's events are no longer valid. Consumers
// should discard any materialized state and start a new watch.
ErrWatchClosed = errors.New("watch closed")
)
// ReadConsistency is used to specify the required consistency guarantees for

View File

@ -21,6 +21,7 @@ lint:
- ONEOF_LOWER_SNAKE_CASE
- ENUM_VALUE_PREFIX
service_suffix: ""
rpc_allow_google_protobuf_empty_responses: true
breaking:
use:
- FILE

View File

@ -0,0 +1,108 @@
// Code generated by protoc-gen-go-binary. DO NOT EDIT.
// source: private/pbstorage/raft.proto
package pbstorage
import (
"google.golang.org/protobuf/proto"
)
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *Log) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *Log) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *LogResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *LogResponse) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *WriteRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *WriteRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *WriteResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *WriteResponse) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *DeleteRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *DeleteRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ReadRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ReadRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ReadResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ReadResponse) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ListRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ListRequest) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *ListResponse) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *ListResponse) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *GroupVersionMismatchErrorDetails) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *GroupVersionMismatchErrorDetails) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,116 @@
syntax = "proto3";
// This package contains types used by the Raft storage backend that lives in
// the internal/storage/raft Go package.
package hashicorp.consul.internal.storage.raft;
import "annotations/ratelimit/ratelimit.proto";
import "google/protobuf/empty.proto";
import "pbresource/resource.proto";
// Forwarding service is used for forwarding write and consistent read
// operations to the Raft leader. It is served on Consul's multiplexed
// server port, which is the same port used for regular Raft traffic.
service ForwardingService {
// Write handles a forwarded write operation.
rpc Write(WriteRequest) returns (WriteResponse) {
option (hashicorp.consul.internal.ratelimit.spec) = {
operation_type: OPERATION_TYPE_EXEMPT,
operation_category: OPERATION_CATEGORY_RESOURCE
};
}
// Delete handles a forwarded delete operation.
rpc Delete(DeleteRequest) returns (google.protobuf.Empty) {
option (hashicorp.consul.internal.ratelimit.spec) = {
operation_type: OPERATION_TYPE_EXEMPT,
operation_category: OPERATION_CATEGORY_RESOURCE
};
}
// Read handles a forwarded read operation.
rpc Read(ReadRequest) returns (ReadResponse) {
option (hashicorp.consul.internal.ratelimit.spec) = {
operation_type: OPERATION_TYPE_EXEMPT,
operation_category: OPERATION_CATEGORY_RESOURCE
};
}
// List handles a forwarded list operation.
rpc List(ListRequest) returns (ListResponse) {
option (hashicorp.consul.internal.ratelimit.spec) = {
operation_type: OPERATION_TYPE_EXEMPT,
operation_category: OPERATION_CATEGORY_RESOURCE
};
}
}
// LogType describes the type of operation being written to the Raft log.
enum LogType {
LOG_TYPE_UNSPECIFIED = 0;
LOG_TYPE_WRITE = 1;
LOG_TYPE_DELETE = 2;
}
// Log is protobuf-encoded and written to the Raft log.
message Log {
LogType type = 1;
oneof request {
WriteRequest write = 2;
DeleteRequest delete = 3;
}
}
// LogResponse contains the FSM's response to applying a log.
message LogResponse {
oneof response {
WriteResponse write = 1;
google.protobuf.Empty delete = 2;
}
}
// WriteRequest contains the parameters for a write operation.
message WriteRequest {
hashicorp.consul.resource.Resource resource = 1;
}
// WriteResponse contains the results of a write operation.
message WriteResponse {
hashicorp.consul.resource.Resource resource = 1;
}
// DeleteRequest contains the parameters for a write operation.
message DeleteRequest {
hashicorp.consul.resource.ID id = 1;
string version = 2;
}
// ReadRequest contains the parameters for a consistent read operation.
message ReadRequest {
hashicorp.consul.resource.ID id = 1;
}
// ReadResponse contains the results of a consistent read operation.
message ReadResponse {
hashicorp.consul.resource.Resource resource = 1;
}
// ListRequest contains the parameters for a consistent list operation.
message ListRequest {
hashicorp.consul.resource.Type type = 1;
hashicorp.consul.resource.Tenancy tenancy = 2;
string name_prefix = 3;
}
// ListResponse contains the results of a consistent list operation.
message ListResponse {
repeated hashicorp.consul.resource.Resource resources = 1;
}
// GroupVersionMismatchErrorDetails contains the error details that will be
// returned when the leader encounters a storage.GroupVersionMismatchError.
message GroupVersionMismatchErrorDetails {
hashicorp.consul.resource.Type requested_type = 1;
hashicorp.consul.resource.Resource stored = 2;
}

View File

@ -0,0 +1,220 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.2.0
// - protoc (unknown)
// source: private/pbstorage/raft.proto
package pbstorage
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
emptypb "google.golang.org/protobuf/types/known/emptypb"
)
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7
// ForwardingServiceClient is the client API for ForwardingService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type ForwardingServiceClient interface {
// Write handles a forwarded write operation.
Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error)
// Delete handles a forwarded delete operation.
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error)
// Read handles a forwarded read operation.
Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error)
// List handles a forwarded list operation.
List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error)
}
type forwardingServiceClient struct {
cc grpc.ClientConnInterface
}
func NewForwardingServiceClient(cc grpc.ClientConnInterface) ForwardingServiceClient {
return &forwardingServiceClient{cc}
}
func (c *forwardingServiceClient) Write(ctx context.Context, in *WriteRequest, opts ...grpc.CallOption) (*WriteResponse, error) {
out := new(WriteResponse)
err := c.cc.Invoke(ctx, "/hashicorp.consul.internal.storage.raft.ForwardingService/Write", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *forwardingServiceClient) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) {
out := new(emptypb.Empty)
err := c.cc.Invoke(ctx, "/hashicorp.consul.internal.storage.raft.ForwardingService/Delete", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *forwardingServiceClient) Read(ctx context.Context, in *ReadRequest, opts ...grpc.CallOption) (*ReadResponse, error) {
out := new(ReadResponse)
err := c.cc.Invoke(ctx, "/hashicorp.consul.internal.storage.raft.ForwardingService/Read", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *forwardingServiceClient) List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) {
out := new(ListResponse)
err := c.cc.Invoke(ctx, "/hashicorp.consul.internal.storage.raft.ForwardingService/List", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ForwardingServiceServer is the server API for ForwardingService service.
// All implementations should embed UnimplementedForwardingServiceServer
// for forward compatibility
type ForwardingServiceServer interface {
// Write handles a forwarded write operation.
Write(context.Context, *WriteRequest) (*WriteResponse, error)
// Delete handles a forwarded delete operation.
Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error)
// Read handles a forwarded read operation.
Read(context.Context, *ReadRequest) (*ReadResponse, error)
// List handles a forwarded list operation.
List(context.Context, *ListRequest) (*ListResponse, error)
}
// UnimplementedForwardingServiceServer should be embedded to have forward compatible implementations.
type UnimplementedForwardingServiceServer struct {
}
func (UnimplementedForwardingServiceServer) Write(context.Context, *WriteRequest) (*WriteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Write not implemented")
}
func (UnimplementedForwardingServiceServer) Delete(context.Context, *DeleteRequest) (*emptypb.Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Delete not implemented")
}
func (UnimplementedForwardingServiceServer) Read(context.Context, *ReadRequest) (*ReadResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Read not implemented")
}
func (UnimplementedForwardingServiceServer) List(context.Context, *ListRequest) (*ListResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method List not implemented")
}
// UnsafeForwardingServiceServer may be embedded to opt out of forward compatibility for this service.
// Use of this interface is not recommended, as added methods to ForwardingServiceServer will
// result in compilation errors.
type UnsafeForwardingServiceServer interface {
mustEmbedUnimplementedForwardingServiceServer()
}
func RegisterForwardingServiceServer(s grpc.ServiceRegistrar, srv ForwardingServiceServer) {
s.RegisterService(&ForwardingService_ServiceDesc, srv)
}
func _ForwardingService_Write_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(WriteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ForwardingServiceServer).Write(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/hashicorp.consul.internal.storage.raft.ForwardingService/Write",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ForwardingServiceServer).Write(ctx, req.(*WriteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ForwardingService_Delete_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(DeleteRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ForwardingServiceServer).Delete(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/hashicorp.consul.internal.storage.raft.ForwardingService/Delete",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ForwardingServiceServer).Delete(ctx, req.(*DeleteRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ForwardingService_Read_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ReadRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ForwardingServiceServer).Read(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/hashicorp.consul.internal.storage.raft.ForwardingService/Read",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ForwardingServiceServer).Read(ctx, req.(*ReadRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ForwardingService_List_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ListRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ForwardingServiceServer).List(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/hashicorp.consul.internal.storage.raft.ForwardingService/List",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ForwardingServiceServer).List(ctx, req.(*ListRequest))
}
return interceptor(ctx, in, info, handler)
}
// ForwardingService_ServiceDesc is the grpc.ServiceDesc for ForwardingService service.
// It's only intended for direct use with grpc.RegisterService,
// and not to be introspected or modified (even as a copy)
var ForwardingService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "hashicorp.consul.internal.storage.raft.ForwardingService",
HandlerType: (*ForwardingServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "Write",
Handler: _ForwardingService_Write_Handler,
},
{
MethodName: "Delete",
Handler: _ForwardingService_Delete_Handler,
},
{
MethodName: "Read",
Handler: _ForwardingService_Read_Handler,
},
{
MethodName: "List",
Handler: _ForwardingService_List_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "private/pbstorage/raft.proto",
}