mirror of https://github.com/status-im/consul.git
Handle server addresses update as client
This commit is contained in:
parent
584d3409c4
commit
4e40e1d222
|
@ -11,6 +11,7 @@ import (
|
||||||
"google.golang.org/protobuf/types/known/anypb"
|
"google.golang.org/protobuf/types/known/anypb"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/agent/cache"
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/structs"
|
"github.com/hashicorp/consul/agent/structs"
|
||||||
"github.com/hashicorp/consul/proto/pbpeering"
|
"github.com/hashicorp/consul/proto/pbpeering"
|
||||||
"github.com/hashicorp/consul/proto/pbpeerstream"
|
"github.com/hashicorp/consul/proto/pbpeerstream"
|
||||||
|
@ -256,6 +257,7 @@ func (s *Server) handleUpsert(
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleUpdateService handles both deletion and upsert events for a service.
|
// handleUpdateService handles both deletion and upsert events for a service.
|
||||||
|
//
|
||||||
// On an UPSERT event:
|
// On an UPSERT event:
|
||||||
// - All nodes, services, checks in the input pbNodes are re-applied through Raft.
|
// - All nodes, services, checks in the input pbNodes are re-applied through Raft.
|
||||||
// - Any nodes, services, or checks in the catalog that were not in the input pbNodes get deleted.
|
// - Any nodes, services, or checks in the catalog that were not in the input pbNodes get deleted.
|
||||||
|
@ -470,6 +472,33 @@ func (s *Server) handleUpsertRoots(
|
||||||
return s.Backend.PeeringTrustBundleWrite(req)
|
return s.Backend.PeeringTrustBundleWrite(req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *Server) handleUpsertServerAddrs(
|
||||||
|
peerName string,
|
||||||
|
partition string,
|
||||||
|
addrs *pbpeering.PeeringServerAddresses,
|
||||||
|
) error {
|
||||||
|
q := state.Query{
|
||||||
|
Value: peerName,
|
||||||
|
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(partition),
|
||||||
|
}
|
||||||
|
_, existing, err := s.GetStore().PeeringRead(nil, q)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to read peering: %w", err)
|
||||||
|
}
|
||||||
|
if existing == nil || !existing.IsActive() {
|
||||||
|
return fmt.Errorf("peering does not exist or has been marked for deletion")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clone to avoid mutating the existing data
|
||||||
|
p := proto.Clone(existing).(*pbpeering.Peering)
|
||||||
|
p.PeerServerAddresses = addrs.GetAddresses()
|
||||||
|
|
||||||
|
req := &pbpeering.PeeringWriteRequest{
|
||||||
|
Peering: p,
|
||||||
|
}
|
||||||
|
return s.Backend.PeeringWrite(req)
|
||||||
|
}
|
||||||
|
|
||||||
func (s *Server) handleDelete(
|
func (s *Server) handleDelete(
|
||||||
peerName string,
|
peerName string,
|
||||||
partition string,
|
partition string,
|
||||||
|
|
|
@ -104,6 +104,7 @@ type Backend interface {
|
||||||
PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
|
PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
|
||||||
CatalogRegister(req *structs.RegisterRequest) error
|
CatalogRegister(req *structs.RegisterRequest) error
|
||||||
CatalogDeregister(req *structs.DeregisterRequest) error
|
CatalogDeregister(req *structs.DeregisterRequest) error
|
||||||
|
PeeringWrite(req *pbpeering.PeeringWriteRequest) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// StateStore provides a read-only interface for querying Peering data.
|
// StateStore provides a read-only interface for querying Peering data.
|
||||||
|
|
|
@ -126,7 +126,7 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
|
||||||
|
|
||||||
// Receive a subscription from a peer. This message arrives while the
|
// Receive a subscription from a peer. This message arrives while the
|
||||||
// server is a leader and should work.
|
// server is a leader and should work.
|
||||||
testutil.RunStep(t, "send subscription request to leader and consume its two requests", func(t *testing.T) {
|
testutil.RunStep(t, "send subscription request to leader and consume its three requests", func(t *testing.T) {
|
||||||
sub := &pbpeerstream.ReplicationMessage{
|
sub := &pbpeerstream.ReplicationMessage{
|
||||||
Payload: &pbpeerstream.ReplicationMessage_Open_{
|
Payload: &pbpeerstream.ReplicationMessage_Open_{
|
||||||
Open: &pbpeerstream.ReplicationMessage_Open{
|
Open: &pbpeerstream.ReplicationMessage_Open{
|
||||||
|
@ -145,6 +145,10 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
|
||||||
msg2, err := client.Recv()
|
msg2, err := client.Recv()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
require.NotEmpty(t, msg2)
|
require.NotEmpty(t, msg2)
|
||||||
|
|
||||||
|
msg3, err := client.Recv()
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.NotEmpty(t, msg3)
|
||||||
})
|
})
|
||||||
|
|
||||||
// The ACK will be a new request but at this point the server is not the
|
// The ACK will be a new request but at this point the server is not the
|
||||||
|
@ -1314,7 +1318,7 @@ func TestStreamResources_Server_KeepsConnectionOpenWithHeartbeat(t *testing.T) {
|
||||||
|
|
||||||
// makeClient sets up a *MockClient with the initial subscription
|
// makeClient sets up a *MockClient with the initial subscription
|
||||||
// message handshake.
|
// message handshake.
|
||||||
func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID string) *MockClient {
|
func makeClient(t *testing.T, srv *testServer, peerID string) *MockClient {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
|
|
||||||
client := NewMockClient(context.Background())
|
client := NewMockClient(context.Background())
|
||||||
|
@ -1326,7 +1330,7 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s
|
||||||
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
|
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
|
||||||
// This matches gRPC's behavior when an error is returned by a server.
|
// This matches gRPC's behavior when an error is returned by a server.
|
||||||
if err := srv.StreamResources(client.ReplicationStream); err != nil {
|
if err := srv.StreamResources(client.ReplicationStream); err != nil {
|
||||||
errCh <- srv.StreamResources(client.ReplicationStream)
|
errCh <- err
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
@ -1345,8 +1349,15 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
receivedSub2, err := client.Recv()
|
receivedSub2, err := client.Recv()
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
receivedSub3, err := client.Recv()
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Issue a services and roots subscription pair to server
|
// This is required when the client subscribes to server address replication messages.
|
||||||
|
// We assert for the handler to be called at least once but the data doesn't matter.
|
||||||
|
srv.mockSnapshotHandler.expect("", 0, 0, nil)
|
||||||
|
|
||||||
|
// Issue services, roots, and server address subscription to server.
|
||||||
|
// Note that server address may not come as an initial message
|
||||||
for _, resourceURL := range []string{
|
for _, resourceURL := range []string{
|
||||||
pbpeerstream.TypeURLExportedService,
|
pbpeerstream.TypeURLExportedService,
|
||||||
pbpeerstream.TypeURLPeeringTrustBundle,
|
pbpeerstream.TypeURLPeeringTrustBundle,
|
||||||
|
@ -1390,6 +1401,7 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s
|
||||||
got := []*pbpeerstream.ReplicationMessage{
|
got := []*pbpeerstream.ReplicationMessage{
|
||||||
receivedSub1,
|
receivedSub1,
|
||||||
receivedSub2,
|
receivedSub2,
|
||||||
|
receivedSub3,
|
||||||
}
|
}
|
||||||
prototest.AssertElementsMatch(t, expect, got)
|
prototest.AssertElementsMatch(t, expect, got)
|
||||||
|
|
||||||
|
@ -1446,6 +1458,10 @@ func (b *testStreamBackend) PeeringSecretsWrite(req *pbpeering.SecretsWriteReque
|
||||||
return b.store.PeeringSecretsWrite(1, req)
|
return b.store.PeeringSecretsWrite(1, req)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *testStreamBackend) PeeringWrite(req *pbpeering.PeeringWriteRequest) error {
|
||||||
|
return b.store.PeeringWrite(1, req)
|
||||||
|
}
|
||||||
|
|
||||||
// CatalogRegister mocks catalog registrations through Raft by copying the logic of FSM.applyRegister.
|
// CatalogRegister mocks catalog registrations through Raft by copying the logic of FSM.applyRegister.
|
||||||
func (b *testStreamBackend) CatalogRegister(req *structs.RegisterRequest) error {
|
func (b *testStreamBackend) CatalogRegister(req *structs.RegisterRequest) error {
|
||||||
return b.store.EnsureRegistration(1, req)
|
return b.store.EnsureRegistration(1, req)
|
||||||
|
@ -2734,11 +2750,16 @@ func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes)
|
||||||
|
|
||||||
type testServer struct {
|
type testServer struct {
|
||||||
*Server
|
*Server
|
||||||
|
|
||||||
|
// mockSnapshotHandler is solely used for handling autopilot events
|
||||||
|
// which don't come from the state store.
|
||||||
|
mockSnapshotHandler *mockSnapshotHandler
|
||||||
}
|
}
|
||||||
|
|
||||||
func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.Store) {
|
func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.Store) {
|
||||||
|
t.Helper()
|
||||||
publisher := stream.NewEventPublisher(10 * time.Second)
|
publisher := stream.NewEventPublisher(10 * time.Second)
|
||||||
store := newStateStore(t, publisher)
|
store, handler := newStateStore(t, publisher)
|
||||||
|
|
||||||
ports := freeport.GetN(t, 1) // {grpc}
|
ports := freeport.GetN(t, 1) // {grpc}
|
||||||
|
|
||||||
|
@ -2776,6 +2797,7 @@ func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.
|
||||||
|
|
||||||
return &testServer{
|
return &testServer{
|
||||||
Server: srv,
|
Server: srv,
|
||||||
|
mockSnapshotHandler: handler,
|
||||||
}, store
|
}, store
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue