[OSS] Add upsert handling for receiving CheckServiceNode (#13061)

This commit is contained in:
Freddy 2022-05-12 15:04:44 -06:00 committed by GitHub
parent b788691fa6
commit 4e215dc411
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 628 additions and 85 deletions

View File

@ -100,6 +100,10 @@ func (m *EnterpriseMeta) UnsetPartition() {
// do nothing
}
func (m *EnterpriseMeta) OverridePartition(_ string) {
// do nothing
}
func NewEnterpriseMetaWithPartition(_, _ string) EnterpriseMeta {
return emptyEnterpriseMeta
}

View File

@ -112,17 +112,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
n := node.(*structs.Node)
nodeEntMeta := n.GetEnterpriseMeta()
req := structs.RegisterRequest{
ID: n.ID,
Node: n.Node,
Datacenter: n.Datacenter,
Address: n.Address,
TaggedAddresses: n.TaggedAddresses,
NodeMeta: n.Meta,
RaftIndex: n.RaftIndex,
EnterpriseMeta: *nodeEntMeta,
PeerName: n.PeerName,
}
req := n.ToRegisterRequest()
// Register the node itself
if _, err := sink.Write([]byte{byte(structs.RegisterRequestType)}); err != nil {

View File

@ -8,6 +8,7 @@ import (
"fmt"
"net"
"github.com/hashicorp/consul/agent/rpc/peering"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
@ -207,7 +208,13 @@ func (s *Server) establishStream(ctx context.Context, logger hclog.Logger, peer
return err
}
err = s.peeringService.HandleStream(peer.ID, peer.PeerID, stream)
err = s.peeringService.HandleStream(peering.HandleStreamRequest{
LocalID: peer.ID,
RemoteID: peer.PeerID,
PeerName: peer.Name,
Partition: peer.Partition,
Stream: stream,
})
if err == nil {
// This will cancel the retry-er context, letting us break out of this loop when we want to shut down the stream.
cancel()

View File

@ -123,4 +123,9 @@ func (a *peeringApply) PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDR
return err
}
func (a *peeringApply) CatalogRegister(req *structs.RegisterRequest) error {
_, err := a.srv.leaderRaftApply("Catalog.Register", structs.RegisterRequestType, req)
return err
}
var _ peering.Apply = (*peeringApply)(nil)

View File

@ -30,6 +30,7 @@ import (
"github.com/hashicorp/consul/proto/pbpeering"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/pbstatus"
"github.com/hashicorp/consul/types"
)
var (
@ -105,6 +106,7 @@ type Backend interface {
// Store provides a read-only interface for querying Peering data.
type Store interface {
PeeringRead(ws memdb.WatchSet, q state.Query) (uint64, *pbpeering.Peering, error)
PeeringReadByID(ws memdb.WatchSet, id string) (uint64, *pbpeering.Peering, error)
PeeringList(ws memdb.WatchSet, entMeta acl.EnterpriseMeta) (uint64, []*pbpeering.Peering, error)
ExportedServicesForPeer(ws memdb.WatchSet, peerID string) (uint64, []structs.ServiceName, error)
AbandonCh() <-chan struct{}
@ -115,6 +117,7 @@ type Apply interface {
PeeringWrite(req *pbpeering.PeeringWriteRequest) error
PeeringDelete(req *pbpeering.PeeringDeleteRequest) error
PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
CatalogRegister(req *structs.RegisterRequest) error
}
// GenerateToken implements the PeeringService RPC method to generate a
@ -405,44 +408,75 @@ func (s *Service) StreamResources(stream pbpeering.PeeringService_StreamResource
return grpcstatus.Error(codes.InvalidArgument, fmt.Sprintf("subscription request to unknown resource URL: %s", req.ResourceURL))
}
// TODO(peering): Validate that a peering exists for this peer
_, p, err := s.Backend.Store().PeeringReadByID(nil, req.PeerID)
if err != nil {
s.logger.Error("failed to look up peer", "peer_id", req.PeerID, "error", err)
return grpcstatus.Error(codes.Internal, "failed to find PeerID: "+req.PeerID)
}
if p == nil {
return grpcstatus.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: "+req.PeerID)
}
// TODO(peering): If the peering is marked as deleted, send a Terminated message and return
// TODO(peering): Store subscription request so that an event publisher can separately handle pushing messages for it
s.logger.Info("accepted initial replication request from peer", "peer_id", req.PeerID)
// For server peers both of these ID values are the same, because we generated a token with a local ID,
// and the client peer dials using that same ID.
return s.HandleStream(req.PeerID, req.PeerID, stream)
return s.HandleStream(HandleStreamRequest{
LocalID: req.PeerID,
RemoteID: req.PeerID,
PeerName: p.Name,
Partition: p.Partition,
Stream: stream,
})
}
type HandleStreamRequest struct {
// LocalID is the UUID for the peering in the local Consul datacenter.
LocalID string
// RemoteID is the UUID for the peering from the perspective of the peer.
RemoteID string
// PeerName is the name of the peering.
PeerName string
// Partition is the local partition associated with the peer.
Partition string
// Stream is the open stream to the peer cluster.
Stream BidirectionalStream
}
// The localID provided is the locally-generated identifier for the peering.
// The remoteID is an identifier that the remote peer recognizes for the peering.
func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStream) error {
logger := s.logger.Named("stream").With("peer_id", localID)
func (s *Service) HandleStream(req HandleStreamRequest) error {
logger := s.logger.Named("stream").With("peer_id", req.LocalID)
logger.Trace("handling stream for peer")
status, err := s.streams.connected(localID)
status, err := s.streams.connected(req.LocalID)
if err != nil {
return fmt.Errorf("failed to register stream: %v", err)
}
// TODO(peering) Also need to clear subscriptions associated with the peer
defer s.streams.disconnected(localID)
defer s.streams.disconnected(req.LocalID)
mgr := newSubscriptionManager(stream.Context(), logger, s.Backend)
subCh := mgr.subscribe(stream.Context(), localID)
mgr := newSubscriptionManager(req.Stream.Context(), logger, s.Backend)
subCh := mgr.subscribe(req.Stream.Context(), req.LocalID)
sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
ResourceURL: pbpeering.TypeURLService,
PeerID: remoteID,
PeerID: req.RemoteID,
},
},
}
logTraceSend(logger, sub)
if err := stream.Send(sub); err != nil {
if err := req.Stream.Send(sub); err != nil {
if err == io.EOF {
logger.Info("stream ended by peer")
status.trackReceiveError(err.Error())
@ -458,7 +492,7 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr
go func() {
defer close(recvChan)
for {
msg, err := stream.Recv()
msg, err := req.Stream.Recv()
if err == io.EOF {
logger.Info("stream ended by peer")
status.trackReceiveError(err.Error())
@ -494,13 +528,13 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr
}
logTraceSend(logger, term)
if err := stream.Send(term); err != nil {
if err := req.Stream.Send(term); err != nil {
status.trackSendError(err.Error())
return fmt.Errorf("failed to send to stream: %v", err)
}
logger.Trace("deleting stream status")
s.streams.deleteStatus(localID)
s.streams.deleteStatus(req.LocalID)
return nil
@ -528,7 +562,8 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr
}
if resp := msg.GetResponse(); resp != nil {
req, err := processResponse(resp)
// TODO(peering): Ensure there's a nonce
reply, err := s.processResponse(req.PeerName, req.Partition, resp)
if err != nil {
logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
status.trackReceiveError(err.Error())
@ -536,8 +571,8 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr
status.trackReceiveSuccess()
}
logTraceSend(logger, req)
if err := stream.Send(req); err != nil {
logTraceSend(logger, reply)
if err := req.Stream.Send(reply); err != nil {
status.trackSendError(err.Error())
return fmt.Errorf("failed to send to stream: %v", err)
}
@ -549,7 +584,7 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr
logger.Info("received peering termination message, cleaning up imported resources")
// Once marked as terminated, a separate deferred deletion routine will clean up imported resources.
if err := s.Backend.Apply().PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: localID}); err != nil {
if err := s.Backend.Apply().PeeringTerminateByID(&pbpeering.PeeringTerminateByIDRequest{ID: req.LocalID}); err != nil {
return err
}
return nil
@ -558,7 +593,7 @@ func (s *Service) HandleStream(localID, remoteID string, stream BidirectionalStr
case update := <-subCh:
switch {
case strings.HasPrefix(update.CorrelationID, subExportedService):
if err := pushServiceResponse(logger, stream, status, update); err != nil {
if err := pushServiceResponse(logger, req.Stream, status, update); err != nil {
return fmt.Errorf("failed to push data for %q: %w", update.CorrelationID, err)
}
@ -667,7 +702,7 @@ func makeReply(resourceURL, nonce string, errCode code.Code, errMsg string) *pbp
return msg
}
func processResponse(resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) {
func (s *Service) processResponse(peerName string, partition string, resp *pbpeering.ReplicationMessage_Response) (*pbpeering.ReplicationMessage, error) {
var (
err error
errCode code.Code
@ -682,7 +717,10 @@ func processResponse(resp *pbpeering.ReplicationMessage_Response) (*pbpeering.Re
switch resp.Operation {
case pbpeering.ReplicationMessage_Response_UPSERT:
err = handleUpsert(resp.ResourceURL, resp.Resource)
if resp.Resource == nil {
break
}
err = s.handleUpsert(peerName, partition, resp.ResourceURL, resp.ResourceID, resp.Resource)
if err != nil {
errCode = code.Code_INTERNAL
errMsg = err.Error()
@ -710,8 +748,90 @@ func processResponse(resp *pbpeering.ReplicationMessage_Response) (*pbpeering.Re
return makeReply(resp.ResourceURL, resp.Nonce, errCode, errMsg), err
}
func handleUpsert(resourceURL string, resource *anypb.Any) error {
// TODO(peering): implement
func (s *Service) handleUpsert(peerName string, partition string, resourceURL string, resourceID string, resource *anypb.Any) error {
csn := &pbservice.IndexedCheckServiceNodes{}
err := ptypes.UnmarshalAny(resource, csn)
if err != nil {
return fmt.Errorf("failed to unmarshal resource, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
}
if csn == nil || len(csn.Nodes) == 0 {
return nil
}
type checkTuple struct {
checkID types.CheckID
serviceID string
nodeID types.NodeID
acl.EnterpriseMeta
}
var (
nodes = make(map[types.NodeID]*structs.Node)
services = make(map[types.NodeID][]*structs.NodeService)
checks = make(map[types.NodeID]map[checkTuple]*structs.HealthCheck)
)
for _, pbinstance := range csn.Nodes {
instance, err := pbservice.CheckServiceNodeToStructs(pbinstance)
if err != nil {
return fmt.Errorf("failed to convert instance, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
}
nodes[instance.Node.ID] = instance.Node
services[instance.Node.ID] = append(services[instance.Node.ID], instance.Service)
if _, ok := checks[instance.Node.ID]; !ok {
checks[instance.Node.ID] = make(map[checkTuple]*structs.HealthCheck)
}
for _, c := range instance.Checks {
tuple := checkTuple{
checkID: c.CheckID,
serviceID: c.ServiceID,
nodeID: instance.Node.ID,
EnterpriseMeta: c.EnterpriseMeta,
}
checks[instance.Node.ID][tuple] = c
}
}
for nodeID, node := range nodes {
// For all nodes, services, and checks we override the peer name and partition to be
// the local partition and local name for the peer.
node.PeerName, node.Partition = peerName, partition
// First register the node
req := node.ToRegisterRequest()
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
}
// Then register all services on that node
for _, svc := range services[nodeID] {
svc.PeerName = peerName
svc.OverridePartition(partition)
req.Service = svc
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
}
}
req.Service = nil
// Then register all checks on that node
var chks structs.HealthChecks
for _, c := range checks[nodeID] {
c.PeerName = peerName
c.OverridePartition(partition)
chks = append(chks, c)
}
req.Checks = chks
if err := s.Backend.Apply().CatalogRegister(&req); err != nil {
return fmt.Errorf("failed to register, ResourceURL: %q, ResourceID: %q, err: %w", resourceURL, resourceID, err)
}
}
return nil
}

View File

@ -11,14 +11,18 @@ import (
"testing"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-uuid"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
gogrpc "google.golang.org/grpc"
"github.com/hashicorp/consul/agent/consul/state"
grpc "github.com/hashicorp/consul/agent/grpc/private"
"github.com/hashicorp/consul/agent/grpc/private/resolver"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/proto/pbservice"
"github.com/hashicorp/consul/proto/prototest"
"github.com/hashicorp/consul/acl"
@ -307,6 +311,359 @@ func TestPeeringService_List(t *testing.T) {
prototest.AssertDeepEqual(t, expect, resp)
}
func Test_StreamHandler_UpsertServices(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}
type testCase struct {
name string
msg *pbpeering.ReplicationMessage_Response
input structs.CheckServiceNodes
expect structs.CheckServiceNodes
}
s := newTestServer(t, nil)
testrpc.WaitForLeader(t, s.Server.RPC, "dc1")
srv := peering.NewService(testutil.Logger(t), consul.NewPeeringBackend(s.Server, nil))
require.NoError(t, s.Server.FSM().State().PeeringWrite(0, &pbpeering.Peering{
Name: "my-peer",
}))
_, p, err := s.Server.FSM().State().PeeringRead(nil, state.Query{Value: "my-peer"})
require.NoError(t, err)
client := peering.NewMockClient(context.Background())
errCh := make(chan error, 1)
client.ErrCh = errCh
go func() {
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server.
err := srv.StreamResources(client.ReplicationStream)
if err != nil {
errCh <- err
}
}()
sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
PeerID: p.ID,
ResourceURL: pbpeering.TypeURLService,
},
},
}
require.NoError(t, client.Send(sub))
// Receive subscription request from peer for our services
_, err = client.Recv()
require.NoError(t, err)
remoteEntMeta := structs.DefaultEnterpriseMetaInPartition("remote-partition")
localEntMeta := acl.DefaultEnterpriseMeta()
localPeerName := "my-peer"
// Scrub data we don't need for the assertions below.
scrubCheckServiceNodes := func(instances structs.CheckServiceNodes) {
for _, csn := range instances {
csn.Node.RaftIndex = structs.RaftIndex{}
csn.Service.TaggedAddresses = nil
csn.Service.Weights = nil
csn.Service.RaftIndex = structs.RaftIndex{}
csn.Service.Proxy = structs.ConnectProxyConfig{}
for _, c := range csn.Checks {
c.RaftIndex = structs.RaftIndex{}
c.Definition = structs.HealthCheckDefinition{}
}
}
}
run := func(t *testing.T, tc testCase) {
pbCSN := &pbservice.IndexedCheckServiceNodes{}
for _, csn := range tc.input {
pbCSN.Nodes = append(pbCSN.Nodes, pbservice.NewCheckServiceNodeFromStructs(&csn))
}
any, err := ptypes.MarshalAny(pbCSN)
require.NoError(t, err)
tc.msg.Resource = any
resp := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Response_{
Response: tc.msg,
},
}
require.NoError(t, client.Send(resp))
msg, err := client.RecvWithTimeout(1 * time.Second)
require.NoError(t, err)
req := msg.GetRequest()
require.NotNil(t, req)
require.Equal(t, tc.msg.Nonce, req.Nonce)
require.Nil(t, req.Error)
_, got, err := s.Server.FSM().State().CombinedCheckServiceNodes(nil, structs.NewServiceName("api", nil), localPeerName)
require.NoError(t, err)
scrubCheckServiceNodes(got)
require.Equal(t, tc.expect, got)
}
// NOTE: These test cases do not run against independent state stores, they show sequential updates for a given service.
// Every new upsert must replace the data from the previous case.
tt := []testCase{
{
name: "upsert an instance on a node",
msg: &pbpeering.ReplicationMessage_Response{
ResourceURL: pbpeering.TypeURLService,
ResourceID: "api",
Nonce: "1",
Operation: pbpeering.ReplicationMessage_Response_UPSERT,
},
input: structs.CheckServiceNodes{
{
Node: &structs.Node{
ID: "112e2243-ab62-4e8a-9317-63306972183c",
Node: "node-1",
Address: "10.0.0.1",
Datacenter: "dc1",
Partition: remoteEntMeta.PartitionOrEmpty(),
},
Service: &structs.NodeService{
Kind: "",
ID: "api-1",
Service: "api",
Port: 8080,
EnterpriseMeta: *remoteEntMeta,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-1-check",
Node: "node-1",
Status: api.HealthPassing,
EnterpriseMeta: *remoteEntMeta,
},
{
CheckID: "api-1-check",
ServiceID: "api-1",
ServiceName: "api",
Node: "node-1",
Status: api.HealthCritical,
EnterpriseMeta: *remoteEntMeta,
},
},
},
},
expect: structs.CheckServiceNodes{
{
Node: &structs.Node{
ID: "112e2243-ab62-4e8a-9317-63306972183c",
Node: "node-1",
Address: "10.0.0.1",
Datacenter: "dc1",
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: localPeerName,
},
Service: &structs.NodeService{
Kind: "",
ID: "api-1",
Service: "api",
Port: 8080,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-1-check",
Node: "node-1",
Status: api.HealthPassing,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
{
CheckID: "api-1-check",
ServiceID: "api-1",
ServiceName: "api",
Node: "node-1",
Status: api.HealthCritical,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
},
},
},
},
{
name: "upsert two instances on the same node",
msg: &pbpeering.ReplicationMessage_Response{
ResourceURL: pbpeering.TypeURLService,
ResourceID: "api",
Nonce: "2",
Operation: pbpeering.ReplicationMessage_Response_UPSERT,
},
input: structs.CheckServiceNodes{
{
Node: &structs.Node{
ID: "112e2243-ab62-4e8a-9317-63306972183c",
Node: "node-1",
Address: "10.0.0.1",
Datacenter: "dc1",
Partition: remoteEntMeta.PartitionOrEmpty(),
},
Service: &structs.NodeService{
Kind: "",
ID: "api-1",
Service: "api",
Port: 8080,
EnterpriseMeta: *remoteEntMeta,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-1-check",
Node: "node-1",
Status: api.HealthPassing,
EnterpriseMeta: *remoteEntMeta,
},
{
CheckID: "api-1-check",
ServiceID: "api-1",
ServiceName: "api",
Node: "node-1",
Status: api.HealthCritical,
EnterpriseMeta: *remoteEntMeta,
},
},
},
{
Node: &structs.Node{
ID: "112e2243-ab62-4e8a-9317-63306972183c",
Node: "node-1",
Address: "10.0.0.1",
Datacenter: "dc1",
Partition: remoteEntMeta.PartitionOrEmpty(),
},
Service: &structs.NodeService{
Kind: "",
ID: "api-2",
Service: "api",
Port: 9090,
EnterpriseMeta: *remoteEntMeta,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-1-check",
Node: "node-1",
Status: api.HealthPassing,
EnterpriseMeta: *remoteEntMeta,
},
{
CheckID: "api-2-check",
ServiceID: "api-2",
ServiceName: "api",
Node: "node-1",
Status: api.HealthWarning,
EnterpriseMeta: *remoteEntMeta,
},
},
},
},
expect: structs.CheckServiceNodes{
{
Node: &structs.Node{
ID: "112e2243-ab62-4e8a-9317-63306972183c",
Node: "node-1",
Address: "10.0.0.1",
Datacenter: "dc1",
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: localPeerName,
},
Service: &structs.NodeService{
Kind: "",
ID: "api-1",
Service: "api",
Port: 8080,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-1-check",
Node: "node-1",
Status: api.HealthPassing,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
{
CheckID: "api-1-check",
ServiceID: "api-1",
ServiceName: "api",
Node: "node-1",
Status: api.HealthCritical,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
},
},
{
Node: &structs.Node{
ID: "112e2243-ab62-4e8a-9317-63306972183c",
Node: "node-1",
Address: "10.0.0.1",
Datacenter: "dc1",
Partition: localEntMeta.PartitionOrEmpty(),
PeerName: localPeerName,
},
Service: &structs.NodeService{
Kind: "",
ID: "api-2",
Service: "api",
Port: 9090,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
Checks: []*structs.HealthCheck{
{
CheckID: "node-1-check",
Node: "node-1",
Status: api.HealthPassing,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
{
CheckID: "api-2-check",
ServiceID: "api-2",
ServiceName: "api",
Node: "node-1",
Status: api.HealthWarning,
EnterpriseMeta: *localEntMeta,
PeerName: localPeerName,
},
},
},
},
},
}
for _, tc := range tt {
runStep(t, tc.name, func(t *testing.T) {
run(t, tc)
})
}
}
func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}
// newTestServer is copied from partition/service_test.go, with the addition of certs/cas.
// TODO(peering): these are endpoint tests and should live in the agent/consul
// package. Instead, these can be written around a mock client (see testing.go)

View File

@ -32,16 +32,23 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) {
}
run := func(t *testing.T, tc testCase) {
srv := NewService(testutil.Logger(t), nil)
client := newMockClient(context.Background())
publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher)
srv := NewService(testutil.Logger(t), &testStreamBackend{
store: store,
pub: publisher,
})
client := NewMockClient(context.Background())
errCh := make(chan error, 1)
client.errCh = errCh
client.ErrCh = errCh
go func() {
// Pass errors from server handler into errCh so that they can be seen by the client on Recv().
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server.
err := srv.StreamResources(client.replicationStream)
err := srv.StreamResources(client.ReplicationStream)
if err != nil {
errCh <- err
}
@ -103,6 +110,18 @@ func TestStreamResources_Server_FirstRequest(t *testing.T) {
},
wantErr: status.Error(codes.InvalidArgument, "subscription request to unknown resource URL: nomad.Job"),
},
{
name: "unknown peer",
input: &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
PeerID: "63b60245-c475-426b-b314-4588d210859d",
ResourceURL: pbpeering.TypeURLService,
},
},
},
wantErr: status.Error(codes.InvalidArgument, "initial subscription for unknown PeerID: 63b60245-c475-426b-b314-4588d210859d"),
},
}
for _, tc := range tt {
@ -127,21 +146,30 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
}
srv.streams.timeNow = it.Now
client := newMockClient(context.Background())
client := NewMockClient(context.Background())
errCh := make(chan error, 1)
client.errCh = errCh
client.ErrCh = errCh
go func() {
// Pass errors from server handler into errCh so that they can be seen by the client on Recv().
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server.
if err := srv.StreamResources(client.replicationStream); err != nil {
if err := srv.StreamResources(client.ReplicationStream); err != nil {
errCh <- err
}
}()
peering := pbpeering.Peering{
Name: "my-peer",
}
require.NoError(t, store.PeeringWrite(0, &peering))
_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peer"})
require.NoError(t, err)
// Receive a subscription from a peer
peerID := "63b60245-c475-426b-b314-4588d210859d"
peerID := p.ID
sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
@ -150,7 +178,7 @@ func TestStreamResources_Server_Terminate(t *testing.T) {
},
},
}
err := client.Send(sub)
err = client.Send(sub)
require.NoError(t, err)
testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) {
@ -209,14 +237,23 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
}
srv.streams.timeNow = it.Now
client := newMockClient(context.Background())
client := NewMockClient(context.Background())
errCh := make(chan error, 1)
go func() {
errCh <- srv.StreamResources(client.replicationStream)
errCh <- srv.StreamResources(client.ReplicationStream)
}()
peerID := "63b60245-c475-426b-b314-4588d210859d"
peering := pbpeering.Peering{
Name: "my-peer",
}
require.NoError(t, store.PeeringWrite(0, &peering))
_, p, err := store.PeeringRead(nil, state.Query{Value: "my-peer"})
require.NoError(t, err)
peerID := p.ID
sub := &pbpeering.ReplicationMessage{
Payload: &pbpeering.ReplicationMessage_Request_{
Request: &pbpeering.ReplicationMessage_Request{
@ -225,7 +262,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
},
},
}
err := client.Send(sub)
err = client.Send(sub)
require.NoError(t, err)
testutil.RunStep(t, "new stream gets tracked", func(t *testing.T) {
@ -483,15 +520,15 @@ func TestStreamResources_Server_ServiceUpdates(t *testing.T) {
pub: publisher,
})
client := newMockClient(context.Background())
client := NewMockClient(context.Background())
errCh := make(chan error, 1)
client.errCh = errCh
client.ErrCh = errCh
go func() {
// Pass errors from server handler into errCh so that they can be seen by the client on Recv().
// Pass errors from server handler into ErrCh so that they can be seen by the client on Recv().
// This matches gRPC's behavior when an error is returned by a server.
if err := srv.StreamResources(client.replicationStream); err != nil {
if err := srv.StreamResources(client.ReplicationStream); err != nil {
errCh <- err
}
}()
@ -683,7 +720,7 @@ func (b *testStreamBackend) Apply() Apply {
return nil
}
func Test_processResponse(t *testing.T) {
func Test_processResponse_Validation(t *testing.T) {
type testCase struct {
name string
in *pbpeering.ReplicationMessage_Response
@ -691,8 +728,15 @@ func Test_processResponse(t *testing.T) {
wantErr bool
}
publisher := stream.NewEventPublisher(10 * time.Second)
store := newStateStore(t, publisher)
srv := NewService(testutil.Logger(t), &testStreamBackend{
store: store,
pub: publisher,
})
run := func(t *testing.T, tc testCase) {
reply, err := processResponse(tc.in)
reply, err := srv.processResponse("", "", tc.in)
if tc.wantErr {
require.Error(t, err)
} else {

View File

@ -47,6 +47,8 @@ func (e *exportedServiceRequest) CacheInfo() cache.RequestInfo {
// NewMaterializer implements submatview.Request
func (e *exportedServiceRequest) NewMaterializer() (submatview.Materializer, error) {
reqFn := func(index uint64) *pbsubscribe.SubscribeRequest {
// TODO(peering): We need to be able to receive both connect proxies and typical service instances for a given name.
// Using the Topic_ServiceHealth will ignore proxies unless the ServiceName is a proxy name.
r := &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceHealth,
Key: e.req.ServiceName,

View File

@ -75,52 +75,52 @@ func TestPeeringToken(peerID string) structs.PeeringToken {
}
}
type mockClient struct {
mu sync.Mutex
errCh chan error
type MockClient struct {
mu sync.Mutex
replicationStream *mockStream
ErrCh chan error
ReplicationStream *MockStream
}
func (c *mockClient) Send(r *pbpeering.ReplicationMessage) error {
c.replicationStream.recvCh <- r
func (c *MockClient) Send(r *pbpeering.ReplicationMessage) error {
c.ReplicationStream.recvCh <- r
return nil
}
func (c *mockClient) Recv() (*pbpeering.ReplicationMessage, error) {
func (c *MockClient) Recv() (*pbpeering.ReplicationMessage, error) {
select {
case err := <-c.errCh:
case err := <-c.ErrCh:
return nil, err
case r := <-c.replicationStream.sendCh:
case r := <-c.ReplicationStream.sendCh:
return r, nil
case <-time.After(10 * time.Millisecond):
return nil, io.EOF
}
}
func (c *mockClient) RecvWithTimeout(dur time.Duration) (*pbpeering.ReplicationMessage, error) {
func (c *MockClient) RecvWithTimeout(dur time.Duration) (*pbpeering.ReplicationMessage, error) {
select {
case err := <-c.errCh:
case err := <-c.ErrCh:
return nil, err
case r := <-c.replicationStream.sendCh:
case r := <-c.ReplicationStream.sendCh:
return r, nil
case <-time.After(dur):
return nil, io.EOF
}
}
func (c *mockClient) Close() {
close(c.replicationStream.recvCh)
func (c *MockClient) Close() {
close(c.ReplicationStream.recvCh)
}
func newMockClient(ctx context.Context) *mockClient {
return &mockClient{
replicationStream: newTestReplicationStream(ctx),
func NewMockClient(ctx context.Context) *MockClient {
return &MockClient{
ReplicationStream: newTestReplicationStream(ctx),
}
}
// mockStream mocks peering.PeeringService_StreamResourcesServer
type mockStream struct {
// MockStream mocks peering.PeeringService_StreamResourcesServer
type MockStream struct {
sendCh chan *pbpeering.ReplicationMessage
recvCh chan *pbpeering.ReplicationMessage
@ -128,10 +128,10 @@ type mockStream struct {
mu sync.Mutex
}
var _ pbpeering.PeeringService_StreamResourcesServer = (*mockStream)(nil)
var _ pbpeering.PeeringService_StreamResourcesServer = (*MockStream)(nil)
func newTestReplicationStream(ctx context.Context) *mockStream {
return &mockStream{
func newTestReplicationStream(ctx context.Context) *MockStream {
return &MockStream{
sendCh: make(chan *pbpeering.ReplicationMessage, 1),
recvCh: make(chan *pbpeering.ReplicationMessage, 1),
ctx: ctx,
@ -139,13 +139,13 @@ func newTestReplicationStream(ctx context.Context) *mockStream {
}
// Send implements pbpeering.PeeringService_StreamResourcesServer
func (s *mockStream) Send(r *pbpeering.ReplicationMessage) error {
func (s *MockStream) Send(r *pbpeering.ReplicationMessage) error {
s.sendCh <- r
return nil
}
// Recv implements pbpeering.PeeringService_StreamResourcesServer
func (s *mockStream) Recv() (*pbpeering.ReplicationMessage, error) {
func (s *MockStream) Recv() (*pbpeering.ReplicationMessage, error) {
r := <-s.recvCh
if r == nil {
return nil, io.EOF
@ -154,32 +154,32 @@ func (s *mockStream) Recv() (*pbpeering.ReplicationMessage, error) {
}
// Context implements grpc.ServerStream and grpc.ClientStream
func (s *mockStream) Context() context.Context {
func (s *MockStream) Context() context.Context {
return s.ctx
}
// SendMsg implements grpc.ServerStream and grpc.ClientStream
func (s *mockStream) SendMsg(m interface{}) error {
func (s *MockStream) SendMsg(m interface{}) error {
return nil
}
// RecvMsg implements grpc.ServerStream and grpc.ClientStream
func (s *mockStream) RecvMsg(m interface{}) error {
func (s *MockStream) RecvMsg(m interface{}) error {
return nil
}
// SetHeader implements grpc.ServerStream
func (s *mockStream) SetHeader(metadata.MD) error {
func (s *MockStream) SetHeader(metadata.MD) error {
return nil
}
// SendHeader implements grpc.ServerStream
func (s *mockStream) SendHeader(metadata.MD) error {
func (s *MockStream) SendHeader(metadata.MD) error {
return nil
}
// SetTrailer implements grpc.ServerStream
func (s *mockStream) SetTrailer(metadata.MD) {}
func (s *MockStream) SetTrailer(metadata.MD) {}
type incrementalTime struct {
base time.Time

View File

@ -855,6 +855,20 @@ func (n *Node) BestAddress(wan bool) string {
return n.Address
}
func (n *Node) ToRegisterRequest() RegisterRequest {
return RegisterRequest{
ID: n.ID,
Node: n.Node,
Datacenter: n.Datacenter,
Address: n.Address,
TaggedAddresses: n.TaggedAddresses,
NodeMeta: n.Meta,
RaftIndex: n.RaftIndex,
EnterpriseMeta: *n.GetEnterpriseMeta(),
PeerName: n.PeerName,
}
}
type Nodes []*Node
// IsSame return whether nodes are similar without taking into account