Merge pull request #14285 from hashicorp/NET-638-push-server-address-updates-to-the-peer

peering: Subscribe to server address changes and push updates to peers
This commit is contained in:
Chris S. Kim 2022-09-07 09:30:45 -04:00 committed by GitHub
commit 1c4a6eef4f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 1128 additions and 436 deletions

3
.changelog/14285.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
connect: Server address changes are streamed to peers
```

View File

@ -4,6 +4,8 @@ package autopilotevents
import ( import (
acl "github.com/hashicorp/consul/acl" acl "github.com/hashicorp/consul/acl"
memdb "github.com/hashicorp/go-memdb"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs" structs "github.com/hashicorp/consul/agent/structs"
@ -48,6 +50,36 @@ func (_m *MockStateStore) GetNodeID(_a0 types.NodeID, _a1 *acl.EnterpriseMeta, _
return r0, r1, r2 return r0, r1, r2
} }
// NodeService provides a mock function with given fields: ws, nodeName, serviceID, entMeta, peerName
func (_m *MockStateStore) NodeService(ws memdb.WatchSet, nodeName string, serviceID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeService, error) {
ret := _m.Called(ws, nodeName, serviceID, entMeta, peerName)
var r0 uint64
if rf, ok := ret.Get(0).(func(memdb.WatchSet, string, string, *acl.EnterpriseMeta, string) uint64); ok {
r0 = rf(ws, nodeName, serviceID, entMeta, peerName)
} else {
r0 = ret.Get(0).(uint64)
}
var r1 *structs.NodeService
if rf, ok := ret.Get(1).(func(memdb.WatchSet, string, string, *acl.EnterpriseMeta, string) *structs.NodeService); ok {
r1 = rf(ws, nodeName, serviceID, entMeta, peerName)
} else {
if ret.Get(1) != nil {
r1 = ret.Get(1).(*structs.NodeService)
}
}
var r2 error
if rf, ok := ret.Get(2).(func(memdb.WatchSet, string, string, *acl.EnterpriseMeta, string) error); ok {
r2 = rf(ws, nodeName, serviceID, entMeta, peerName)
} else {
r2 = ret.Error(2)
}
return r0, r1, r2
}
// NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. // NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockStateStore(t testing.TB) *MockStateStore { func NewMockStateStore(t testing.TB) *MockStateStore {
mock := &MockStateStore{} mock := &MockStateStore{}

View File

@ -4,9 +4,11 @@ import (
"fmt" "fmt"
"net" "net"
"sort" "sort"
"strconv"
"sync" "sync"
"time" "time"
"github.com/hashicorp/go-memdb"
autopilot "github.com/hashicorp/raft-autopilot" autopilot "github.com/hashicorp/raft-autopilot"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
@ -26,6 +28,7 @@ type ReadyServerInfo struct {
ID string ID string
Address string Address string
TaggedAddresses map[string]string TaggedAddresses map[string]string
ExtGRPCPort int
Version string Version string
} }
@ -122,6 +125,7 @@ func NewReadyServersEventPublisher(config Config) *ReadyServersEventPublisher {
//go:generate mockery --name StateStore --inpackage --filename mock_StateStore_test.go //go:generate mockery --name StateStore --inpackage --filename mock_StateStore_test.go
type StateStore interface { type StateStore interface {
GetNodeID(types.NodeID, *acl.EnterpriseMeta, string) (uint64, *structs.Node, error) GetNodeID(types.NodeID, *acl.EnterpriseMeta, string) (uint64, *structs.Node, error)
NodeService(ws memdb.WatchSet, nodeName string, serviceID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeService, error)
} }
//go:generate mockery --name Publisher --inpackage --filename mock_Publisher_test.go //go:generate mockery --name Publisher --inpackage --filename mock_Publisher_test.go
@ -226,6 +230,7 @@ func (r *ReadyServersEventPublisher) autopilotStateToReadyServers(state *autopil
Address: host, Address: host,
Version: srv.Server.Version, Version: srv.Server.Version,
TaggedAddresses: r.getTaggedAddresses(srv), TaggedAddresses: r.getTaggedAddresses(srv),
ExtGRPCPort: r.getGRPCPort(srv),
}) })
} }
} }
@ -254,7 +259,7 @@ func (r *ReadyServersEventPublisher) getTaggedAddresses(srv *autopilot.ServerSta
// code and reason about and having those addresses be updated within 30s is good enough. // code and reason about and having those addresses be updated within 30s is good enough.
_, node, err := r.GetStore().GetNodeID(types.NodeID(srv.Server.ID), structs.NodeEnterpriseMetaInDefaultPartition(), structs.DefaultPeerKeyword) _, node, err := r.GetStore().GetNodeID(types.NodeID(srv.Server.ID), structs.NodeEnterpriseMetaInDefaultPartition(), structs.DefaultPeerKeyword)
if err != nil || node == nil { if err != nil || node == nil {
// no catalog information means we should return a nil addres map // no catalog information means we should return a nil address map
return nil return nil
} }
@ -276,6 +281,38 @@ func (r *ReadyServersEventPublisher) getTaggedAddresses(srv *autopilot.ServerSta
return addrs return addrs
} }
// getGRPCPort will get the external gRPC port for a Consul server.
// Returns 0 if there is none assigned or if an error is encountered.
func (r *ReadyServersEventPublisher) getGRPCPort(srv *autopilot.ServerState) int {
if r.GetStore == nil {
return 0
}
_, n, err := r.GetStore().GetNodeID(types.NodeID(srv.Server.ID), structs.NodeEnterpriseMetaInDefaultPartition(), structs.DefaultPeerKeyword)
if err != nil || n == nil {
return 0
}
_, ns, err := r.GetStore().NodeService(
nil,
n.Node,
structs.ConsulServiceID,
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
)
if err != nil || ns == nil || ns.Meta == nil {
return 0
}
if str, ok := ns.Meta["grpc_port"]; ok {
grpcPort, err := strconv.Atoi(str)
if err == nil {
return grpcPort
}
}
return 0
}
// newReadyServersEvent will create a stream.Event with the provided ready server info. // newReadyServersEvent will create a stream.Event with the provided ready server info.
func (r *ReadyServersEventPublisher) newReadyServersEvent(servers EventPayloadReadyServers) stream.Event { func (r *ReadyServersEventPublisher) newReadyServersEvent(servers EventPayloadReadyServers) stream.Event {
now := time.Now() now := time.Now()

View File

@ -4,6 +4,7 @@ import (
"testing" "testing"
time "time" time "time"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
autopilot "github.com/hashicorp/raft-autopilot" autopilot "github.com/hashicorp/raft-autopilot"
mock "github.com/stretchr/testify/mock" mock "github.com/stretchr/testify/mock"
@ -164,9 +165,21 @@ func TestAutopilotStateToReadyServersWithTaggedAddresses(t *testing.T) {
types.NodeID("792ae13c-d765-470b-852c-e073fdb6e849"), types.NodeID("792ae13c-d765-470b-852c-e073fdb6e849"),
structs.NodeEnterpriseMetaInDefaultPartition(), structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword, structs.DefaultPeerKeyword,
).Times(2).Return(
uint64(0),
&structs.Node{Node: "node-1", TaggedAddresses: map[string]string{"wan": "5.4.3.2"}},
nil,
)
store.On("NodeService",
memdb.WatchSet(nil),
"node-1",
structs.ConsulServiceID,
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
).Once().Return( ).Once().Return(
uint64(0), uint64(0),
&structs.Node{TaggedAddresses: map[string]string{"wan": "5.4.3.2"}}, nil,
nil, nil,
) )
@ -174,9 +187,21 @@ func TestAutopilotStateToReadyServersWithTaggedAddresses(t *testing.T) {
types.NodeID("65e79ff4-bbce-467b-a9d6-725c709fa985"), types.NodeID("65e79ff4-bbce-467b-a9d6-725c709fa985"),
structs.NodeEnterpriseMetaInDefaultPartition(), structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword, structs.DefaultPeerKeyword,
).Times(2).Return(
uint64(0),
&structs.Node{Node: "node-2", TaggedAddresses: map[string]string{"wan": "1.2.3.4"}},
nil,
)
store.On("NodeService",
memdb.WatchSet(nil),
"node-2",
structs.ConsulServiceID,
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
).Once().Return( ).Once().Return(
uint64(0), uint64(0),
&structs.Node{TaggedAddresses: map[string]string{"wan": "1.2.3.4"}}, nil,
nil, nil,
) )
@ -184,9 +209,119 @@ func TestAutopilotStateToReadyServersWithTaggedAddresses(t *testing.T) {
types.NodeID("db11f0ac-0cbe-4215-80cc-b4e843f4df1e"), types.NodeID("db11f0ac-0cbe-4215-80cc-b4e843f4df1e"),
structs.NodeEnterpriseMetaInDefaultPartition(), structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword, structs.DefaultPeerKeyword,
).Times(2).Return(
uint64(0),
&structs.Node{Node: "node-3", TaggedAddresses: map[string]string{"wan": "9.8.7.6"}},
nil,
)
store.On("NodeService",
memdb.WatchSet(nil),
"node-3",
structs.ConsulServiceID,
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
).Once().Return( ).Once().Return(
uint64(0), uint64(0),
&structs.Node{TaggedAddresses: map[string]string{"wan": "9.8.7.6"}}, nil,
nil,
)
r := NewReadyServersEventPublisher(Config{
GetStore: func() StateStore { return store },
})
actual := r.autopilotStateToReadyServers(exampleState)
require.ElementsMatch(t, expected, actual)
}
func TestAutopilotStateToReadyServersWithExtGRPCPort(t *testing.T) {
expected := EventPayloadReadyServers{
{
ID: "792ae13c-d765-470b-852c-e073fdb6e849",
Address: "198.18.0.2",
ExtGRPCPort: 1234,
Version: "v1.12.0",
},
{
ID: "65e79ff4-bbce-467b-a9d6-725c709fa985",
Address: "198.18.0.3",
ExtGRPCPort: 2345,
Version: "v1.12.0",
},
{
ID: "db11f0ac-0cbe-4215-80cc-b4e843f4df1e",
Address: "198.18.0.4",
ExtGRPCPort: 3456,
Version: "v1.12.0",
},
}
store := &MockStateStore{}
t.Cleanup(func() { store.AssertExpectations(t) })
store.On("GetNodeID",
types.NodeID("792ae13c-d765-470b-852c-e073fdb6e849"),
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
).Times(2).Return(
uint64(0),
&structs.Node{Node: "node-1"},
nil,
)
store.On("NodeService",
memdb.WatchSet(nil),
"node-1",
structs.ConsulServiceID,
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
).Once().Return(
uint64(0),
&structs.NodeService{Meta: map[string]string{"grpc_port": "1234"}},
nil,
)
store.On("GetNodeID",
types.NodeID("65e79ff4-bbce-467b-a9d6-725c709fa985"),
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
).Times(2).Return(
uint64(0),
&structs.Node{Node: "node-2"},
nil,
)
store.On("NodeService",
memdb.WatchSet(nil),
"node-2",
structs.ConsulServiceID,
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
).Once().Return(
uint64(0),
&structs.NodeService{Meta: map[string]string{"grpc_port": "2345"}},
nil,
)
store.On("GetNodeID",
types.NodeID("db11f0ac-0cbe-4215-80cc-b4e843f4df1e"),
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
).Times(2).Return(
uint64(0),
&structs.Node{Node: "node-3"},
nil,
)
store.On("NodeService",
memdb.WatchSet(nil),
"node-3",
structs.ConsulServiceID,
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
).Once().Return(
uint64(0),
&structs.NodeService{Meta: map[string]string{"grpc_port": "3456"}},
nil, nil,
) )
@ -493,9 +628,21 @@ func TestReadyServerEventsSnapshotHandler(t *testing.T) {
types.NodeID("792ae13c-d765-470b-852c-e073fdb6e849"), types.NodeID("792ae13c-d765-470b-852c-e073fdb6e849"),
structs.NodeEnterpriseMetaInDefaultPartition(), structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword, structs.DefaultPeerKeyword,
).Times(2).Return(
uint64(0),
&structs.Node{Node: "node-1", TaggedAddresses: map[string]string{"wan": "5.4.3.2"}},
nil,
)
store.On("NodeService",
memdb.WatchSet(nil),
"node-1",
structs.ConsulServiceID,
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
).Once().Return( ).Once().Return(
uint64(0), uint64(0),
&structs.Node{TaggedAddresses: map[string]string{"wan": "5.4.3.2"}}, nil,
nil, nil,
) )
@ -503,9 +650,21 @@ func TestReadyServerEventsSnapshotHandler(t *testing.T) {
types.NodeID("65e79ff4-bbce-467b-a9d6-725c709fa985"), types.NodeID("65e79ff4-bbce-467b-a9d6-725c709fa985"),
structs.NodeEnterpriseMetaInDefaultPartition(), structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword, structs.DefaultPeerKeyword,
).Times(2).Return(
uint64(0),
&structs.Node{Node: "node-2", TaggedAddresses: map[string]string{"wan": "1.2.3.4"}},
nil,
)
store.On("NodeService",
memdb.WatchSet(nil),
"node-2",
structs.ConsulServiceID,
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
).Once().Return( ).Once().Return(
uint64(0), uint64(0),
&structs.Node{TaggedAddresses: map[string]string{"wan": "1.2.3.4"}}, nil,
nil, nil,
) )
@ -513,9 +672,21 @@ func TestReadyServerEventsSnapshotHandler(t *testing.T) {
types.NodeID("db11f0ac-0cbe-4215-80cc-b4e843f4df1e"), types.NodeID("db11f0ac-0cbe-4215-80cc-b4e843f4df1e"),
structs.NodeEnterpriseMetaInDefaultPartition(), structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword, structs.DefaultPeerKeyword,
).Times(2).Return(
uint64(0),
&structs.Node{Node: "node-3", TaggedAddresses: map[string]string{"wan": "9.8.7.6"}},
nil,
)
store.On("NodeService",
memdb.WatchSet(nil),
"node-3",
structs.ConsulServiceID,
structs.NodeEnterpriseMetaInDefaultPartition(),
structs.DefaultPeerKeyword,
).Once().Return( ).Once().Return(
uint64(0), uint64(0),
&structs.Node{TaggedAddresses: map[string]string{"wan": "9.8.7.6"}}, nil,
nil, nil,
) )

View File

@ -5,12 +5,13 @@ import (
"fmt" "fmt"
"strings" "strings"
"github.com/hashicorp/go-hclog" "github.com/golang/protobuf/proto"
"google.golang.org/genproto/googleapis/rpc/code" "google.golang.org/genproto/googleapis/rpc/code"
newproto "google.golang.org/protobuf/proto" newproto "google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/anypb" "google.golang.org/protobuf/types/known/anypb"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbpeerstream" "github.com/hashicorp/consul/proto/pbpeerstream"
@ -35,7 +36,6 @@ import (
// Each cache.UpdateEvent will contain all instances for a service name. // Each cache.UpdateEvent will contain all instances for a service name.
// If there are no instances in the event, we consider that to be a de-registration. // If there are no instances in the event, we consider that to be a de-registration.
func makeServiceResponse( func makeServiceResponse(
logger hclog.Logger,
mst *MutableStatus, mst *MutableStatus,
update cache.UpdateEvent, update cache.UpdateEvent,
) (*pbpeerstream.ReplicationMessage_Response, error) { ) (*pbpeerstream.ReplicationMessage_Response, error) {
@ -87,7 +87,6 @@ func makeServiceResponse(
} }
func makeCARootsResponse( func makeCARootsResponse(
logger hclog.Logger,
update cache.UpdateEvent, update cache.UpdateEvent,
) (*pbpeerstream.ReplicationMessage_Response, error) { ) (*pbpeerstream.ReplicationMessage_Response, error) {
any, _, err := marshalToProtoAny[*pbpeering.PeeringTrustBundle](update.Result) any, _, err := marshalToProtoAny[*pbpeering.PeeringTrustBundle](update.Result)
@ -105,6 +104,24 @@ func makeCARootsResponse(
}, nil }, nil
} }
func makeServerAddrsResponse(
update cache.UpdateEvent,
) (*pbpeerstream.ReplicationMessage_Response, error) {
any, _, err := marshalToProtoAny[*pbpeering.PeeringServerAddresses](update.Result)
if err != nil {
return nil, fmt.Errorf("failed to marshal: %w", err)
}
return &pbpeerstream.ReplicationMessage_Response{
ResourceURL: pbpeerstream.TypeURLPeeringServerAddresses,
// TODO(peering): Nonce management
Nonce: "",
ResourceID: "server-addrs",
Operation: pbpeerstream.Operation_OPERATION_UPSERT,
Resource: any,
}, nil
}
// marshalToProtoAny takes any input and returns: // marshalToProtoAny takes any input and returns:
// the protobuf.Any type, the asserted T type, and any errors // the protobuf.Any type, the asserted T type, and any errors
// during marshalling or type assertion. // during marshalling or type assertion.
@ -127,7 +144,6 @@ func (s *Server) processResponse(
partition string, partition string,
mutableStatus *MutableStatus, mutableStatus *MutableStatus,
resp *pbpeerstream.ReplicationMessage_Response, resp *pbpeerstream.ReplicationMessage_Response,
logger hclog.Logger,
) (*pbpeerstream.ReplicationMessage, error) { ) (*pbpeerstream.ReplicationMessage, error) {
if !pbpeerstream.KnownTypeURL(resp.ResourceURL) { if !pbpeerstream.KnownTypeURL(resp.ResourceURL) {
err := fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL) err := fmt.Errorf("received response for unknown resource type %q", resp.ResourceURL)
@ -151,7 +167,7 @@ func (s *Server) processResponse(
), err ), err
} }
if err := s.handleUpsert(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID, resp.Resource, logger); err != nil { if err := s.handleUpsert(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID, resp.Resource); err != nil {
return makeNACKReply( return makeNACKReply(
resp.ResourceURL, resp.ResourceURL,
resp.Nonce, resp.Nonce,
@ -163,7 +179,7 @@ func (s *Server) processResponse(
return makeACKReply(resp.ResourceURL, resp.Nonce), nil return makeACKReply(resp.ResourceURL, resp.Nonce), nil
case pbpeerstream.Operation_OPERATION_DELETE: case pbpeerstream.Operation_OPERATION_DELETE:
if err := s.handleDelete(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID, logger); err != nil { if err := s.handleDelete(peerName, partition, mutableStatus, resp.ResourceURL, resp.ResourceID); err != nil {
return makeNACKReply( return makeNACKReply(
resp.ResourceURL, resp.ResourceURL,
resp.Nonce, resp.Nonce,
@ -196,7 +212,6 @@ func (s *Server) handleUpsert(
resourceURL string, resourceURL string,
resourceID string, resourceID string,
resource *anypb.Any, resource *anypb.Any,
logger hclog.Logger,
) error { ) error {
if resource.TypeUrl != resourceURL { if resource.TypeUrl != resourceURL {
return fmt.Errorf("mismatched resourceURL %q and Any typeUrl %q", resourceURL, resource.TypeUrl) return fmt.Errorf("mismatched resourceURL %q and Any typeUrl %q", resourceURL, resource.TypeUrl)
@ -229,12 +244,20 @@ func (s *Server) handleUpsert(
return s.handleUpsertRoots(peerName, partition, roots) return s.handleUpsertRoots(peerName, partition, roots)
case pbpeerstream.TypeURLPeeringServerAddresses:
addrs := &pbpeering.PeeringServerAddresses{}
if err := resource.UnmarshalTo(addrs); err != nil {
return fmt.Errorf("failed to unmarshal resource: %w", err)
}
return s.handleUpsertServerAddrs(peerName, partition, addrs)
default: default:
return fmt.Errorf("unexpected resourceURL: %s", resourceURL) return fmt.Errorf("unexpected resourceURL: %s", resourceURL)
} }
} }
// handleUpdateService handles both deletion and upsert events for a service. // handleUpdateService handles both deletion and upsert events for a service.
//
// On an UPSERT event: // On an UPSERT event:
// - All nodes, services, checks in the input pbNodes are re-applied through Raft. // - All nodes, services, checks in the input pbNodes are re-applied through Raft.
// - Any nodes, services, or checks in the catalog that were not in the input pbNodes get deleted. // - Any nodes, services, or checks in the catalog that were not in the input pbNodes get deleted.
@ -449,13 +472,39 @@ func (s *Server) handleUpsertRoots(
return s.Backend.PeeringTrustBundleWrite(req) return s.Backend.PeeringTrustBundleWrite(req)
} }
func (s *Server) handleUpsertServerAddrs(
peerName string,
partition string,
addrs *pbpeering.PeeringServerAddresses,
) error {
q := state.Query{
Value: peerName,
EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(partition),
}
_, existing, err := s.GetStore().PeeringRead(nil, q)
if err != nil {
return fmt.Errorf("failed to read peering: %w", err)
}
if existing == nil || !existing.IsActive() {
return fmt.Errorf("peering does not exist or has been marked for deletion")
}
// Clone to avoid mutating the existing data
p := proto.Clone(existing).(*pbpeering.Peering)
p.PeerServerAddresses = addrs.GetAddresses()
req := &pbpeering.PeeringWriteRequest{
Peering: p,
}
return s.Backend.PeeringWrite(req)
}
func (s *Server) handleDelete( func (s *Server) handleDelete(
peerName string, peerName string,
partition string, partition string,
mutableStatus *MutableStatus, mutableStatus *MutableStatus,
resourceURL string, resourceURL string,
resourceID string, resourceID string,
logger hclog.Logger,
) error { ) error {
switch resourceURL { switch resourceURL {
case pbpeerstream.TypeURLExportedService: case pbpeerstream.TypeURLExportedService:

View File

@ -105,6 +105,7 @@ type Backend interface {
PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
CatalogRegister(req *structs.RegisterRequest) error CatalogRegister(req *structs.RegisterRequest) error
CatalogDeregister(req *structs.DeregisterRequest) error CatalogDeregister(req *structs.DeregisterRequest) error
PeeringWrite(req *pbpeering.PeeringWriteRequest) error
} }
// StateStore provides a read-only interface for querying Peering data. // StateStore provides a read-only interface for querying Peering data.

View File

@ -161,8 +161,22 @@ func (s *Server) StreamResources(stream pbpeerstream.PeerStreamService_StreamRes
if p == nil { if p == nil {
return grpcstatus.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: "+req.PeerID) return grpcstatus.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: "+req.PeerID)
} }
if !p.IsActive() {
// If peering is terminated, then our peer sent the termination message.
// For other non-active states, send the termination message.
if p.State != pbpeering.PeeringState_TERMINATED {
term := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Terminated_{
Terminated: &pbpeerstream.ReplicationMessage_Terminated{},
},
}
logTraceSend(logger, term)
// TODO(peering): If the peering is marked as deleted, send a Terminated message and return // we don't care if send fails; stream will be killed by termination message or grpc error
_ = stream.Send(term)
}
return grpcstatus.Error(codes.Aborted, "peering is marked as deleted: "+req.PeerID)
}
secrets, err := s.GetStore().PeeringSecretsRead(nil, req.PeerID) secrets, err := s.GetStore().PeeringSecretsRead(nil, req.PeerID)
if err != nil { if err != nil {
@ -347,6 +361,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
for _, resourceURL := range []string{ for _, resourceURL := range []string{
pbpeerstream.TypeURLExportedService, pbpeerstream.TypeURLExportedService,
pbpeerstream.TypeURLPeeringTrustBundle, pbpeerstream.TypeURLPeeringTrustBundle,
pbpeerstream.TypeURLPeeringServerAddresses,
} { } {
sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{ sub := makeReplicationRequest(&pbpeerstream.ReplicationMessage_Request{
ResourceURL: resourceURL, ResourceURL: resourceURL,
@ -544,14 +559,11 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
// At this point we have a valid ResourceURL and we are subscribed to it. // At this point we have a valid ResourceURL and we are subscribed to it.
switch { switch {
case req.ResponseNonce == "" && req.Error != nil: case req.Error == nil: // ACK
return grpcstatus.Error(codes.InvalidArgument, "initial subscription request for a resource type must not contain an error")
case req.ResponseNonce != "" && req.Error == nil: // ACK
// TODO(peering): handle ACK fully // TODO(peering): handle ACK fully
status.TrackAck() status.TrackAck()
case req.ResponseNonce != "" && req.Error != nil: // NACK case req.Error != nil: // NACK
// TODO(peering): handle NACK fully // TODO(peering): handle NACK fully
logger.Warn("client peer was unable to apply resource", "code", req.Error.Code, "error", req.Error.Message) logger.Warn("client peer was unable to apply resource", "code", req.Error.Code, "error", req.Error.Message)
status.TrackNack(fmt.Sprintf("client peer was unable to apply resource: %s", req.Error.Message)) status.TrackNack(fmt.Sprintf("client peer was unable to apply resource: %s", req.Error.Message))
@ -567,7 +579,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
if resp := msg.GetResponse(); resp != nil { if resp := msg.GetResponse(); resp != nil {
// TODO(peering): Ensure there's a nonce // TODO(peering): Ensure there's a nonce
reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp, logger) reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp)
if err != nil { if err != nil {
logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID) logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
status.TrackRecvError(err.Error()) status.TrackRecvError(err.Error())
@ -613,7 +625,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
var resp *pbpeerstream.ReplicationMessage_Response var resp *pbpeerstream.ReplicationMessage_Response
switch { switch {
case strings.HasPrefix(update.CorrelationID, subExportedService): case strings.HasPrefix(update.CorrelationID, subExportedService):
resp, err = makeServiceResponse(logger, status, update) resp, err = makeServiceResponse(status, update)
if err != nil { if err != nil {
// Log the error and skip this response to avoid locking up peering due to a bad update event. // Log the error and skip this response to avoid locking up peering due to a bad update event.
logger.Error("failed to create service response", "error", err) logger.Error("failed to create service response", "error", err)
@ -624,13 +636,20 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
// TODO(Peering): figure out how to sync this separately // TODO(Peering): figure out how to sync this separately
case update.CorrelationID == subCARoot: case update.CorrelationID == subCARoot:
resp, err = makeCARootsResponse(logger, update) resp, err = makeCARootsResponse(update)
if err != nil { if err != nil {
// Log the error and skip this response to avoid locking up peering due to a bad update event. // Log the error and skip this response to avoid locking up peering due to a bad update event.
logger.Error("failed to create ca roots response", "error", err) logger.Error("failed to create ca roots response", "error", err)
continue continue
} }
case update.CorrelationID == subServerAddrs:
resp, err = makeServerAddrsResponse(update)
if err != nil {
logger.Error("failed to create server address response", "error", err)
continue
}
default: default:
logger.Warn("unrecognized update type from subscription manager: " + update.CorrelationID) logger.Warn("unrecognized update type from subscription manager: " + update.CorrelationID)
continue continue
@ -641,6 +660,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
replResp := makeReplicationResponse(resp) replResp := makeReplicationResponse(resp)
if err := streamSend(replResp); err != nil { if err := streamSend(replResp); err != nil {
// note: govet warns of context leak but it is cleaned up in a defer
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err) return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
} }
} }

View File

@ -126,7 +126,7 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
// Receive a subscription from a peer. This message arrives while the // Receive a subscription from a peer. This message arrives while the
// server is a leader and should work. // server is a leader and should work.
testutil.RunStep(t, "send subscription request to leader and consume its two requests", func(t *testing.T) { testutil.RunStep(t, "send subscription request to leader and consume its three requests", func(t *testing.T) {
sub := &pbpeerstream.ReplicationMessage{ sub := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Open_{ Payload: &pbpeerstream.ReplicationMessage_Open_{
Open: &pbpeerstream.ReplicationMessage_Open{ Open: &pbpeerstream.ReplicationMessage_Open{
@ -145,6 +145,10 @@ func TestStreamResources_Server_LeaderBecomesFollower(t *testing.T) {
msg2, err := client.Recv() msg2, err := client.Recv()
require.NoError(t, err) require.NoError(t, err)
require.NotEmpty(t, msg2) require.NotEmpty(t, msg2)
msg3, err := client.Recv()
require.NoError(t, err)
require.NotEmpty(t, msg3)
}) })
// The ACK will be a new request but at this point the server is not the // The ACK will be a new request but at this point the server is not the
@ -1126,7 +1130,7 @@ func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) {
} }
srv, store := newTestServer(t, func(c *Config) { srv, store := newTestServer(t, func(c *Config) {
c.incomingHeartbeatTimeout = 5 * time.Millisecond c.incomingHeartbeatTimeout = 50 * time.Millisecond
}) })
srv.Tracker.setClock(it.Now) srv.Tracker.setClock(it.Now)
@ -1312,7 +1316,7 @@ func TestStreamResources_Server_KeepsConnectionOpenWithHeartbeat(t *testing.T) {
// makeClient sets up a *MockClient with the initial subscription // makeClient sets up a *MockClient with the initial subscription
// message handshake. // message handshake.
func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID string) *MockClient { func makeClient(t *testing.T, srv *testServer, peerID string) *MockClient {
t.Helper() t.Helper()
client := NewMockClient(context.Background()) client := NewMockClient(context.Background())
@ -1324,7 +1328,7 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv(). // Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server. // This matches gRPC's behavior when an error is returned by a server.
if err := srv.StreamResources(client.ReplicationStream); err != nil { if err := srv.StreamResources(client.ReplicationStream); err != nil {
errCh <- srv.StreamResources(client.ReplicationStream) errCh <- err
} }
}() }()
@ -1343,11 +1347,19 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s
require.NoError(t, err) require.NoError(t, err)
receivedSub2, err := client.Recv() receivedSub2, err := client.Recv()
require.NoError(t, err) require.NoError(t, err)
receivedSub3, err := client.Recv()
require.NoError(t, err)
// Issue a services and roots subscription pair to server // This is required when the client subscribes to server address replication messages.
// We assert for the handler to be called at least once but the data doesn't matter.
srv.mockSnapshotHandler.expect("", 0, 0, nil)
// Issue services, roots, and server address subscription to server.
// Note that server address may not come as an initial message
for _, resourceURL := range []string{ for _, resourceURL := range []string{
pbpeerstream.TypeURLExportedService, pbpeerstream.TypeURLExportedService,
pbpeerstream.TypeURLPeeringTrustBundle, pbpeerstream.TypeURLPeeringTrustBundle,
pbpeerstream.TypeURLPeeringServerAddresses,
} { } {
init := &pbpeerstream.ReplicationMessage{ init := &pbpeerstream.ReplicationMessage{
Payload: &pbpeerstream.ReplicationMessage_Request_{ Payload: &pbpeerstream.ReplicationMessage_Request_{
@ -1383,10 +1395,22 @@ func makeClient(t *testing.T, srv pbpeerstream.PeerStreamServiceServer, peerID s
}, },
}, },
}, },
{
Payload: &pbpeerstream.ReplicationMessage_Request_{
Request: &pbpeerstream.ReplicationMessage_Request{
ResourceURL: pbpeerstream.TypeURLPeeringServerAddresses,
// The PeerID field is only set for the messages coming FROM
// the establishing side and are going to be empty from the
// other side.
PeerID: "",
},
},
},
} }
got := []*pbpeerstream.ReplicationMessage{ got := []*pbpeerstream.ReplicationMessage{
receivedSub1, receivedSub1,
receivedSub2, receivedSub2,
receivedSub3,
} }
prototest.AssertElementsMatch(t, expect, got) prototest.AssertElementsMatch(t, expect, got)
@ -1443,6 +1467,10 @@ func (b *testStreamBackend) PeeringSecretsWrite(req *pbpeering.SecretsWriteReque
return b.store.PeeringSecretsWrite(1, req) return b.store.PeeringSecretsWrite(1, req)
} }
func (b *testStreamBackend) PeeringWrite(req *pbpeering.PeeringWriteRequest) error {
return b.store.PeeringWrite(1, req)
}
// CatalogRegister mocks catalog registrations through Raft by copying the logic of FSM.applyRegister. // CatalogRegister mocks catalog registrations through Raft by copying the logic of FSM.applyRegister.
func (b *testStreamBackend) CatalogRegister(req *structs.RegisterRequest) error { func (b *testStreamBackend) CatalogRegister(req *structs.RegisterRequest) error {
return b.store.EnsureRegistration(1, req) return b.store.EnsureRegistration(1, req)
@ -1496,7 +1524,7 @@ func Test_makeServiceResponse_ExportedServicesCount(t *testing.T) {
}, },
}, },
}} }}
_, err := makeServiceResponse(srv.Logger, mst, update) _, err := makeServiceResponse(mst, update)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 1, mst.GetExportedServicesCount()) require.Equal(t, 1, mst.GetExportedServicesCount())
@ -1508,7 +1536,7 @@ func Test_makeServiceResponse_ExportedServicesCount(t *testing.T) {
Result: &pbservice.IndexedCheckServiceNodes{ Result: &pbservice.IndexedCheckServiceNodes{
Nodes: []*pbservice.CheckServiceNode{}, Nodes: []*pbservice.CheckServiceNode{},
}} }}
_, err := makeServiceResponse(srv.Logger, mst, update) _, err := makeServiceResponse(mst, update)
require.NoError(t, err) require.NoError(t, err)
require.Equal(t, 0, mst.GetExportedServicesCount()) require.Equal(t, 0, mst.GetExportedServicesCount())
@ -1539,7 +1567,7 @@ func Test_processResponse_Validation(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
run := func(t *testing.T, tc testCase) { run := func(t *testing.T, tc testCase) {
reply, err := srv.processResponse(peerName, "", mst, tc.in, srv.Logger) reply, err := srv.processResponse(peerName, "", mst, tc.in)
if tc.wantErr { if tc.wantErr {
require.Error(t, err) require.Error(t, err)
} else { } else {
@ -1865,7 +1893,7 @@ func Test_processResponse_handleUpsert_handleDelete(t *testing.T) {
} }
// Simulate an update arriving for billing/api. // Simulate an update arriving for billing/api.
_, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, in, srv.Logger) _, err = srv.processResponse(peerName, acl.DefaultPartitionName, mst, in)
require.NoError(t, err) require.NoError(t, err)
for svc, expect := range tc.expect { for svc, expect := range tc.expect {
@ -2731,11 +2759,16 @@ func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes)
type testServer struct { type testServer struct {
*Server *Server
// mockSnapshotHandler is solely used for handling autopilot events
// which don't come from the state store.
mockSnapshotHandler *mockSnapshotHandler
} }
func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.Store) { func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.Store) {
t.Helper()
publisher := stream.NewEventPublisher(10 * time.Second) publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher) store, handler := newStateStore(t, publisher)
ports := freeport.GetN(t, 1) // {grpc} ports := freeport.GetN(t, 1) // {grpc}
@ -2772,6 +2805,7 @@ func newTestServer(t *testing.T, configFn func(c *Config)) (*testServer, *state.
return &testServer{ return &testServer{
Server: srv, Server: srv,
mockSnapshotHandler: handler,
}, store }, store
} }

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strconv"
"strings" "strings"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
@ -12,6 +13,7 @@ import (
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/autopilotevents"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -42,6 +44,7 @@ type subscriptionManager struct {
getStore func() StateStore getStore func() StateStore
serviceSubReady <-chan struct{} serviceSubReady <-chan struct{}
trustBundlesSubReady <-chan struct{} trustBundlesSubReady <-chan struct{}
serverAddrsSubReady <-chan struct{}
} }
// TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering. // TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
@ -67,6 +70,7 @@ func newSubscriptionManager(
getStore: getStore, getStore: getStore,
serviceSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLExportedService), serviceSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLExportedService),
trustBundlesSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLPeeringTrustBundle), trustBundlesSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLPeeringTrustBundle),
serverAddrsSubReady: remoteSubTracker.SubscribedChan(pbpeerstream.TypeURLPeeringServerAddresses),
} }
} }
@ -83,6 +87,7 @@ func (m *subscriptionManager) subscribe(ctx context.Context, peerID, peerName, p
// Wrap our bare state store queries in goroutines that emit events. // Wrap our bare state store queries in goroutines that emit events.
go m.notifyExportedServicesForPeerID(ctx, state, peerID) go m.notifyExportedServicesForPeerID(ctx, state, peerID)
go m.notifyServerAddrUpdates(ctx, state.updateCh)
if m.config.ConnectEnabled { if m.config.ConnectEnabled {
go m.notifyMeshGatewaysForPartition(ctx, state, state.partition) go m.notifyMeshGatewaysForPartition(ctx, state, state.partition)
// If connect is enabled, watch for updates to CA roots. // If connect is enabled, watch for updates to CA roots.
@ -262,6 +267,17 @@ func (m *subscriptionManager) handleEvent(ctx context.Context, state *subscripti
state.sendPendingEvents(ctx, m.logger, pending) state.sendPendingEvents(ctx, m.logger, pending)
case u.CorrelationID == subServerAddrs:
addrs, ok := u.Result.(*pbpeering.PeeringServerAddresses)
if !ok {
return fmt.Errorf("invalid type for response: %T", u.Result)
}
pending := &pendingPayload{}
if err := pending.Add(serverAddrsPayloadID, u.CorrelationID, addrs); err != nil {
return err
}
state.sendPendingEvents(ctx, m.logger, pending)
default: default:
return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID) return fmt.Errorf("unknown correlation ID: %s", u.CorrelationID)
} }
@ -333,6 +349,8 @@ func (m *subscriptionManager) notifyRootCAUpdatesForPartition(
} }
} }
const subCARoot = "roots"
// subscribeCARoots subscribes to state.EventTopicCARoots for changes to CA roots. // subscribeCARoots subscribes to state.EventTopicCARoots for changes to CA roots.
// Upon receiving an event it will send the payload in updateCh. // Upon receiving an event it will send the payload in updateCh.
func (m *subscriptionManager) subscribeCARoots( func (m *subscriptionManager) subscribeCARoots(
@ -414,8 +432,6 @@ func (m *subscriptionManager) subscribeCARoots(
} }
} }
const subCARoot = "roots"
func (m *subscriptionManager) syncNormalServices( func (m *subscriptionManager) syncNormalServices(
ctx context.Context, ctx context.Context,
state *subscriptionState, state *subscriptionState,
@ -721,3 +737,112 @@ const syntheticProxyNameSuffix = "-sidecar-proxy"
func generateProxyNameForDiscoveryChain(sn structs.ServiceName) structs.ServiceName { func generateProxyNameForDiscoveryChain(sn structs.ServiceName) structs.ServiceName {
return structs.NewServiceName(sn.Name+syntheticProxyNameSuffix, &sn.EnterpriseMeta) return structs.NewServiceName(sn.Name+syntheticProxyNameSuffix, &sn.EnterpriseMeta)
} }
const subServerAddrs = "server-addrs"
func (m *subscriptionManager) notifyServerAddrUpdates(
ctx context.Context,
updateCh chan<- cache.UpdateEvent,
) {
// Wait until this is subscribed-to.
select {
case <-m.serverAddrsSubReady:
case <-ctx.Done():
return
}
var idx uint64
// TODO(peering): retry logic; fail past a threshold
for {
var err error
// Typically, this function will block inside `m.subscribeServerAddrs` and only return on error.
// Errors are logged and the watch is retried.
idx, err = m.subscribeServerAddrs(ctx, idx, updateCh)
if errors.Is(err, stream.ErrSubForceClosed) {
m.logger.Trace("subscription force-closed due to an ACL change or snapshot restore, will attempt resume")
} else if !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
m.logger.Warn("failed to subscribe to server addresses, will attempt resume", "error", err.Error())
} else {
m.logger.Trace(err.Error())
}
select {
case <-ctx.Done():
return
default:
}
}
}
func (m *subscriptionManager) subscribeServerAddrs(
ctx context.Context,
idx uint64,
updateCh chan<- cache.UpdateEvent,
) (uint64, error) {
// following code adapted from serverdiscovery/watch_servers.go
sub, err := m.backend.Subscribe(&stream.SubscribeRequest{
Topic: autopilotevents.EventTopicReadyServers,
Subject: stream.SubjectNone,
Token: "", // using anonymous token for now
Index: idx,
})
if err != nil {
return 0, fmt.Errorf("failed to subscribe to ReadyServers events: %w", err)
}
defer sub.Unsubscribe()
for {
event, err := sub.Next(ctx)
switch {
case errors.Is(err, context.Canceled):
return 0, err
case err != nil:
return idx, err
}
// We do not send framing events (e.g. EndOfSnapshot, NewSnapshotToFollow)
// because we send a full list of ready servers on every event, rather than expecting
// clients to maintain a state-machine in the way they do for service health.
if event.IsFramingEvent() {
continue
}
// Note: this check isn't strictly necessary because the event publishing
// machinery will ensure the index increases monotonically, but it can be
// tricky to faithfully reproduce this in tests (e.g. the EventPublisher
// garbage collects topic buffers and snapshots aggressively when streams
// disconnect) so this avoids a bunch of confusing setup code.
if event.Index <= idx {
continue
}
idx = event.Index
payload, ok := event.Payload.(autopilotevents.EventPayloadReadyServers)
if !ok {
return 0, fmt.Errorf("unexpected event payload type: %T", payload)
}
var serverAddrs = make([]string, 0, len(payload))
for _, srv := range payload {
if srv.ExtGRPCPort == 0 {
continue
}
grpcAddr := srv.Address + ":" + strconv.Itoa(srv.ExtGRPCPort)
serverAddrs = append(serverAddrs, grpcAddr)
}
if len(serverAddrs) == 0 {
m.logger.Warn("did not find any server addresses with external gRPC ports to publish")
continue
}
updateCh <- cache.UpdateEvent{
CorrelationID: subServerAddrs,
Result: &pbpeering.PeeringServerAddresses{
Addresses: serverAddrs,
},
}
}
}

View File

@ -3,14 +3,17 @@ package peerstream
import ( import (
"context" "context"
"sort" "sort"
"sync"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache" "github.com/hashicorp/consul/agent/cache"
"github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/connect"
"github.com/hashicorp/consul/agent/consul/autopilotevents"
"github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/structs"
@ -627,20 +630,100 @@ func TestSubscriptionManager_CARoots(t *testing.T) {
}) })
} }
func TestSubscriptionManager_ServerAddrs(t *testing.T) {
backend := newTestSubscriptionBackend(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
// Create a peering
_, id := backend.ensurePeering(t, "my-peering")
partition := acl.DefaultEnterpriseMeta().PartitionOrEmpty()
payload := autopilotevents.EventPayloadReadyServers{
autopilotevents.ReadyServerInfo{
ID: "9aeb73f6-e83e-43c1-bdc9-ca5e43efe3e4",
Address: "198.18.0.1",
Version: "1.13.1",
ExtGRPCPort: 8502,
},
}
// mock handler only gets called once during the initial subscription
backend.handler.expect("", 0, 1, payload)
// Only configure a tracker for server address events.
tracker := newResourceSubscriptionTracker()
tracker.Subscribe(pbpeerstream.TypeURLPeeringServerAddresses)
mgr := newSubscriptionManager(ctx,
testutil.Logger(t),
Config{
Datacenter: "dc1",
ConnectEnabled: true,
},
connect.TestTrustDomain,
backend,
func() StateStore {
return backend.store
},
tracker)
subCh := mgr.subscribe(ctx, id, "my-peering", partition)
testutil.RunStep(t, "initial events", func(t *testing.T) {
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, subServerAddrs, got.CorrelationID)
addrs, ok := got.Result.(*pbpeering.PeeringServerAddresses)
require.True(t, ok)
require.Equal(t, []string{"198.18.0.1:8502"}, addrs.GetAddresses())
},
)
})
testutil.RunStep(t, "added server", func(t *testing.T) {
payload = append(payload, autopilotevents.ReadyServerInfo{
ID: "eec8721f-c42b-48da-a5a5-07565158015e",
Address: "198.18.0.2",
Version: "1.13.1",
ExtGRPCPort: 9502,
})
backend.Publish([]stream.Event{
{
Topic: autopilotevents.EventTopicReadyServers,
Index: 2,
Payload: payload,
},
})
expectEvents(t, subCh,
func(t *testing.T, got cache.UpdateEvent) {
require.Equal(t, subServerAddrs, got.CorrelationID)
addrs, ok := got.Result.(*pbpeering.PeeringServerAddresses)
require.True(t, ok)
require.Equal(t, []string{"198.18.0.1:8502", "198.18.0.2:9502"}, addrs.GetAddresses())
},
)
})
}
type testSubscriptionBackend struct { type testSubscriptionBackend struct {
state.EventPublisher state.EventPublisher
store *state.Store store *state.Store
handler *mockSnapshotHandler
lastIdx uint64 lastIdx uint64
} }
func newTestSubscriptionBackend(t *testing.T) *testSubscriptionBackend { func newTestSubscriptionBackend(t *testing.T) *testSubscriptionBackend {
publisher := stream.NewEventPublisher(10 * time.Second) publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher) store, handler := newStateStore(t, publisher)
backend := &testSubscriptionBackend{ backend := &testSubscriptionBackend{
EventPublisher: publisher, EventPublisher: publisher,
store: store, store: store,
handler: handler,
} }
backend.ensureCAConfig(t, &structs.CAConfiguration{ backend.ensureCAConfig(t, &structs.CAConfiguration{
@ -739,20 +822,35 @@ func setupTestPeering(t *testing.T, store *state.Store, name string, index uint6
return p.ID return p.ID
} }
func newStateStore(t *testing.T, publisher *stream.EventPublisher) *state.Store { func newStateStore(t *testing.T, publisher *stream.EventPublisher) (*state.Store, *mockSnapshotHandler) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
gc, err := state.NewTombstoneGC(time.Second, time.Millisecond) gc, err := state.NewTombstoneGC(time.Second, time.Millisecond)
require.NoError(t, err) require.NoError(t, err)
handler := newMockSnapshotHandler(t)
store := state.NewStateStoreWithEventPublisher(gc, publisher) store := state.NewStateStoreWithEventPublisher(gc, publisher)
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot, false)) require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealth, store.ServiceHealthSnapshot, false))
require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealthConnect, store.ServiceHealthSnapshot, false)) require.NoError(t, publisher.RegisterHandler(state.EventTopicServiceHealthConnect, store.ServiceHealthSnapshot, false))
require.NoError(t, publisher.RegisterHandler(state.EventTopicCARoots, store.CARootsSnapshot, false)) require.NoError(t, publisher.RegisterHandler(state.EventTopicCARoots, store.CARootsSnapshot, false))
go publisher.Run(ctx) require.NoError(t, publisher.RegisterHandler(autopilotevents.EventTopicReadyServers, handler.handle, false))
return store // WaitGroup used to make sure that the publisher returns
// before handler's t.Cleanup is called (otherwise an event
// might fire during an assertion and cause a data race).
var wg sync.WaitGroup
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(func() {
cancel()
wg.Wait()
})
wg.Add(1)
go func() {
publisher.Run(ctx)
wg.Done()
}()
return store, handler
} }
func expectEvents( func expectEvents(
@ -870,3 +968,39 @@ func pbCheck(node, svcID, svcName, status string, entMeta *pbcommon.EnterpriseMe
EnterpriseMeta: entMeta, EnterpriseMeta: entMeta,
} }
} }
// mockSnapshotHandler is copied from server_discovery/server_test.go
type mockSnapshotHandler struct {
mock.Mock
}
func newMockSnapshotHandler(t *testing.T) *mockSnapshotHandler {
handler := &mockSnapshotHandler{}
t.Cleanup(func() {
handler.AssertExpectations(t)
})
return handler
}
func (m *mockSnapshotHandler) handle(req stream.SubscribeRequest, buf stream.SnapshotAppender) (uint64, error) {
ret := m.Called(req, buf)
return ret.Get(0).(uint64), ret.Error(1)
}
func (m *mockSnapshotHandler) expect(token string, requestIndex uint64, eventIndex uint64, payload autopilotevents.EventPayloadReadyServers) {
m.On("handle", stream.SubscribeRequest{
Topic: autopilotevents.EventTopicReadyServers,
Subject: stream.SubjectNone,
Token: token,
Index: requestIndex,
}, mock.Anything).Run(func(args mock.Arguments) {
buf := args.Get(1).(stream.SnapshotAppender)
buf.Append([]stream.Event{
{
Topic: autopilotevents.EventTopicReadyServers,
Index: eventIndex,
Payload: payload,
},
})
}).Return(eventIndex, nil)
}

View File

@ -93,6 +93,9 @@ func (s *subscriptionState) cleanupEventVersions(logger hclog.Logger) {
case id == caRootsPayloadID: case id == caRootsPayloadID:
keep = true keep = true
case id == serverAddrsPayloadID:
keep = true
case strings.HasPrefix(id, servicePayloadIDPrefix): case strings.HasPrefix(id, servicePayloadIDPrefix):
name := strings.TrimPrefix(id, servicePayloadIDPrefix) name := strings.TrimPrefix(id, servicePayloadIDPrefix)
sn := structs.ServiceNameFromString(name) sn := structs.ServiceNameFromString(name)
@ -129,6 +132,7 @@ type pendingEvent struct {
} }
const ( const (
serverAddrsPayloadID = "server-addrs"
caRootsPayloadID = "roots" caRootsPayloadID = "roots"
meshGatewayPayloadID = "mesh-gateway" meshGatewayPayloadID = "mesh-gateway"
servicePayloadIDPrefix = "service:" servicePayloadIDPrefix = "service:"

View File

@ -107,6 +107,16 @@ func (msg *PeeringTrustBundle) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg) return proto.Unmarshal(b, msg)
} }
// MarshalBinary implements encoding.BinaryMarshaler
func (msg *PeeringServerAddresses) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg)
}
// UnmarshalBinary implements encoding.BinaryUnmarshaler
func (msg *PeeringServerAddresses) UnmarshalBinary(b []byte) error {
return proto.Unmarshal(b, msg)
}
// MarshalBinary implements encoding.BinaryMarshaler // MarshalBinary implements encoding.BinaryMarshaler
func (msg *PeeringReadRequest) MarshalBinary() ([]byte, error) { func (msg *PeeringReadRequest) MarshalBinary() ([]byte, error) {
return proto.Marshal(msg) return proto.Marshal(msg)

File diff suppressed because it is too large Load Diff

View File

@ -225,6 +225,12 @@ message PeeringTrustBundle {
uint64 ModifyIndex = 7; uint64 ModifyIndex = 7;
} }
// PeeringServerAddresses contains the latest snapshot of all known
// server addresses for a peer.
message PeeringServerAddresses {
repeated string Addresses = 1;
}
// @consul-rpc-glue: LeaderReadTODO // @consul-rpc-glue: LeaderReadTODO
message PeeringReadRequest { message PeeringReadRequest {
string Name = 1; string Name = 1;

View File

@ -5,11 +5,12 @@ const (
TypeURLExportedService = apiTypePrefix + "hashicorp.consul.internal.peerstream.ExportedService" TypeURLExportedService = apiTypePrefix + "hashicorp.consul.internal.peerstream.ExportedService"
TypeURLPeeringTrustBundle = apiTypePrefix + "hashicorp.consul.internal.peering.PeeringTrustBundle" TypeURLPeeringTrustBundle = apiTypePrefix + "hashicorp.consul.internal.peering.PeeringTrustBundle"
TypeURLPeeringServerAddresses = apiTypePrefix + "hashicorp.consul.internal.peering.PeeringServerAddresses"
) )
func KnownTypeURL(s string) bool { func KnownTypeURL(s string) bool {
switch s { switch s {
case TypeURLExportedService, TypeURLPeeringTrustBundle: case TypeURLExportedService, TypeURLPeeringTrustBundle, TypeURLPeeringServerAddresses:
return true return true
} }
return false return false