From df951bd601720fb6bfb8f17ed9e8126ab7024f66 Mon Sep 17 00:00:00 2001 From: "Chris S. Kim" Date: Mon, 22 Aug 2022 10:07:00 -0400 Subject: [PATCH] Expose external gRPC port in autopilot The grpc_port was added to a NodeService's meta in ea58f235f5da416224ba615405269661ba1f4d8d --- .../autopilotevents/mock_StateStore_test.go | 32 +++++++++++++++ .../autopilotevents/ready_servers_events.go | 39 ++++++++++++++++++- 2 files changed, 70 insertions(+), 1 deletion(-) diff --git a/agent/consul/autopilotevents/mock_StateStore_test.go b/agent/consul/autopilotevents/mock_StateStore_test.go index 0262b410ae..dd048e58eb 100644 --- a/agent/consul/autopilotevents/mock_StateStore_test.go +++ b/agent/consul/autopilotevents/mock_StateStore_test.go @@ -4,6 +4,8 @@ package autopilotevents import ( acl "github.com/hashicorp/consul/acl" + memdb "github.com/hashicorp/go-memdb" + mock "github.com/stretchr/testify/mock" structs "github.com/hashicorp/consul/agent/structs" @@ -48,6 +50,36 @@ func (_m *MockStateStore) GetNodeID(_a0 types.NodeID, _a1 *acl.EnterpriseMeta, _ return r0, r1, r2 } +// NodeService provides a mock function with given fields: ws, nodeName, serviceID, entMeta, peerName +func (_m *MockStateStore) NodeService(ws memdb.WatchSet, nodeName string, serviceID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeService, error) { + ret := _m.Called(ws, nodeName, serviceID, entMeta, peerName) + + var r0 uint64 + if rf, ok := ret.Get(0).(func(memdb.WatchSet, string, string, *acl.EnterpriseMeta, string) uint64); ok { + r0 = rf(ws, nodeName, serviceID, entMeta, peerName) + } else { + r0 = ret.Get(0).(uint64) + } + + var r1 *structs.NodeService + if rf, ok := ret.Get(1).(func(memdb.WatchSet, string, string, *acl.EnterpriseMeta, string) *structs.NodeService); ok { + r1 = rf(ws, nodeName, serviceID, entMeta, peerName) + } else { + if ret.Get(1) != nil { + r1 = ret.Get(1).(*structs.NodeService) + } + } + + var r2 error + if rf, ok := ret.Get(2).(func(memdb.WatchSet, string, string, *acl.EnterpriseMeta, string) error); ok { + r2 = rf(ws, nodeName, serviceID, entMeta, peerName) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + // NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. func NewMockStateStore(t testing.TB) *MockStateStore { mock := &MockStateStore{} diff --git a/agent/consul/autopilotevents/ready_servers_events.go b/agent/consul/autopilotevents/ready_servers_events.go index ad3221e9a9..cbc9819491 100644 --- a/agent/consul/autopilotevents/ready_servers_events.go +++ b/agent/consul/autopilotevents/ready_servers_events.go @@ -4,9 +4,11 @@ import ( "fmt" "net" "sort" + "strconv" "sync" "time" + "github.com/hashicorp/go-memdb" autopilot "github.com/hashicorp/raft-autopilot" "github.com/hashicorp/consul/acl" @@ -26,6 +28,7 @@ type ReadyServerInfo struct { ID string Address string TaggedAddresses map[string]string + ExtGRPCPort int Version string } @@ -122,6 +125,7 @@ func NewReadyServersEventPublisher(config Config) *ReadyServersEventPublisher { //go:generate mockery --name StateStore --inpackage --filename mock_StateStore_test.go type StateStore interface { GetNodeID(types.NodeID, *acl.EnterpriseMeta, string) (uint64, *structs.Node, error) + NodeService(ws memdb.WatchSet, nodeName string, serviceID string, entMeta *acl.EnterpriseMeta, peerName string) (uint64, *structs.NodeService, error) } //go:generate mockery --name Publisher --inpackage --filename mock_Publisher_test.go @@ -226,6 +230,7 @@ func (r *ReadyServersEventPublisher) autopilotStateToReadyServers(state *autopil Address: host, Version: srv.Server.Version, TaggedAddresses: r.getTaggedAddresses(srv), + ExtGRPCPort: r.getGRPCPort(srv), }) } } @@ -254,7 +259,7 @@ func (r *ReadyServersEventPublisher) getTaggedAddresses(srv *autopilot.ServerSta // code and reason about and having those addresses be updated within 30s is good enough. _, node, err := r.GetStore().GetNodeID(types.NodeID(srv.Server.ID), structs.NodeEnterpriseMetaInDefaultPartition(), structs.DefaultPeerKeyword) if err != nil || node == nil { - // no catalog information means we should return a nil addres map + // no catalog information means we should return a nil address map return nil } @@ -276,6 +281,38 @@ func (r *ReadyServersEventPublisher) getTaggedAddresses(srv *autopilot.ServerSta return addrs } +// getGRPCPort will get the external gRPC port for a Consul server. +// Returns 0 if there is none assigned or if an error is encountered. +func (r *ReadyServersEventPublisher) getGRPCPort(srv *autopilot.ServerState) int { + if r.GetStore == nil { + return 0 + } + + _, n, err := r.GetStore().GetNodeID(types.NodeID(srv.Server.ID), structs.NodeEnterpriseMetaInDefaultPartition(), structs.DefaultPeerKeyword) + if err != nil || n == nil { + return 0 + } + + _, ns, err := r.GetStore().NodeService( + nil, + n.Node, + structs.ConsulServiceID, + structs.NodeEnterpriseMetaInDefaultPartition(), + structs.DefaultPeerKeyword, + ) + if err != nil || ns == nil || ns.Meta == nil { + return 0 + } + if str, ok := ns.Meta["grpc_port"]; ok { + grpcPort, err := strconv.Atoi(str) + if err == nil { + return grpcPort + } + } + + return 0 +} + // newReadyServersEvent will create a stream.Event with the provided ready server info. func (r *ReadyServersEventPublisher) newReadyServersEvent(servers EventPayloadReadyServers) stream.Event { now := time.Now()