From fd7a403e1190ef7f8cfc35a13bf388945d94e37e Mon Sep 17 00:00:00 2001 From: alex <8968914+acpana@users.noreply.github.com> Date: Thu, 26 May 2022 17:55:16 -0700 Subject: [PATCH] monitor leadership in peering service (#13257) Signed-off-by: acpana <8968914+acpana@users.noreply.github.com> Co-authored-by: Chris S. Kim Co-authored-by: Freddy --- agent/consul/peering_backend.go | 27 +++++++++++++++++ agent/consul/server.go | 1 + agent/consul/server_test.go | 52 ++++++++++++++++++++++++++++++-- agent/rpc/peering/service.go | 14 +++++++++ agent/rpc/peering/stream_test.go | 25 +++++++++++++-- 5 files changed, 113 insertions(+), 6 deletions(-) diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go index 84e1676bdb..b82f52f230 100644 --- a/agent/consul/peering_backend.go +++ b/agent/consul/peering_backend.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "strconv" + "sync" "google.golang.org/grpc" @@ -19,6 +20,7 @@ type peeringBackend struct { srv *Server connPool GRPCClientConner apply *peeringApply + monitor *leadershipMonitor } var _ peering.Backend = (*peeringBackend)(nil) @@ -29,6 +31,7 @@ func NewPeeringBackend(srv *Server, connPool GRPCClientConner) peering.Backend { srv: srv, connPool: connPool, apply: &peeringApply{srv: srv}, + monitor: &leadershipMonitor{}, } } @@ -101,6 +104,10 @@ func (b *peeringBackend) Apply() peering.Apply { return b.apply } +func (b *peeringBackend) LeadershipMonitor() peering.LeadershipMonitor { + return b.monitor +} + func (b *peeringBackend) EnterpriseCheckPartitions(partition string) error { return b.enterpriseCheckPartitions(partition) } @@ -109,6 +116,25 @@ func (b *peeringBackend) IsLeader() bool { return b.srv.IsLeader() } +type leadershipMonitor struct { + lock sync.RWMutex + leaderAddr string +} + +func (m *leadershipMonitor) UpdateLeaderAddr(addr string) { + m.lock.Lock() + defer m.lock.Unlock() + + m.leaderAddr = addr +} + +func (m *leadershipMonitor) GetLeaderAddr() string { + m.lock.RLock() + defer m.lock.RUnlock() + + return m.leaderAddr +} + type peeringApply struct { srv *Server } @@ -140,3 +166,4 @@ func (a *peeringApply) CatalogRegister(req *structs.RegisterRequest) error { } var _ peering.Apply = (*peeringApply)(nil) +var _ peering.LeadershipMonitor = (*leadershipMonitor)(nil) diff --git a/agent/consul/server.go b/agent/consul/server.go index e2ad87fd53..16cc22baf4 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -1659,6 +1659,7 @@ func (s *Server) trackLeaderChanges() { } s.grpcLeaderForwarder.UpdateLeaderAddr(s.config.Datacenter, string(leaderObs.LeaderAddr)) + s.peeringService.Backend.LeadershipMonitor().UpdateLeaderAddr(string(leaderObs.LeaderAddr)) case <-s.shutdownCh: s.raft.DeregisterObserver(observer) return diff --git a/agent/consul/server_test.go b/agent/consul/server_test.go index 72014f8f05..9cd9762d34 100644 --- a/agent/consul/server_test.go +++ b/agent/consul/server_test.go @@ -19,9 +19,6 @@ import ( "github.com/hashicorp/raft" "google.golang.org/grpc" - "github.com/hashicorp/consul/agent/rpc/middleware" - "github.com/hashicorp/consul/ipaddr" - "github.com/hashicorp/go-uuid" "golang.org/x/time/rate" @@ -29,8 +26,10 @@ import ( "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/metadata" + "github.com/hashicorp/consul/agent/rpc/middleware" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" + "github.com/hashicorp/consul/ipaddr" "github.com/hashicorp/consul/sdk/freeport" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil/retry" @@ -1952,3 +1951,50 @@ func TestServer_RPC_RateLimit(t *testing.T) { } }) } + +// TestServer_Peering_LeadershipMonitor tests that a peering service can receive the leader address +// through the LeadershipMonitor IRL. +func TestServer_Peering_LeadershipMonitor(t *testing.T) { + if testing.Short() { + t.Skip("too slow for testing.Short") + } + + t.Parallel() + + // given two servers: s1 (leader), s2 (follower) + _, conf1 := testServerConfig(t) + s1, err := newServer(t, conf1) + if err != nil { + t.Fatalf("err: %v", err) + } + defer s1.Shutdown() + + _, conf2 := testServerConfig(t) + conf2.Bootstrap = false + s2, err := newServer(t, conf2) + if err != nil { + t.Fatalf("err: %v", err) + } + defer s2.Shutdown() + + // Try to join + joinLAN(t, s2, s1) + + // Verify Raft has established a peer + retry.Run(t, func(r *retry.R) { + r.Check(wantRaft([]*Server{s1, s2})) + }) + + testrpc.WaitForLeader(t, s1.RPC, "dc1") + testrpc.WaitForLeader(t, s2.RPC, "dc1") + waitForLeaderEstablishment(t, s1) + + // the actual tests + // when leadership has been established s2 should have the address of s1 + // in its leadership monitor in the peering service + peeringLeaderAddr := s2.peeringService.Backend.LeadershipMonitor().GetLeaderAddr() + + require.Equal(t, s1.config.RPCAddr.String(), peeringLeaderAddr) + // test corollary by transitivity to future-proof against any setup bugs + require.NotEqual(t, s2.config.RPCAddr.String(), peeringLeaderAddr) +} diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index c50c825eb7..8ee07e7a89 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -107,6 +107,20 @@ type Backend interface { Store() Store Apply() Apply + LeadershipMonitor() LeadershipMonitor +} + +// LeadershipMonitor provides a way for the consul server to update the peering service about +// the server's leadership status. +// Server addresses should look like: ip:port +type LeadershipMonitor interface { + // UpdateLeaderAddr is called on a raft.LeaderObservation in a go routine in the consul server; + // see trackLeaderChanges() + UpdateLeaderAddr(leaderAddr string) + + // GetLeaderAddr provides the best hint for the current address of the leader. + // There is no guarantee that this is the actual address of the leader. + GetLeaderAddr() string } // Store provides a read-only interface for querying Peering data. diff --git a/agent/rpc/peering/stream_test.go b/agent/rpc/peering/stream_test.go index 27016a1582..d00080c443 100644 --- a/agent/rpc/peering/stream_test.go +++ b/agent/rpc/peering/stream_test.go @@ -886,9 +886,28 @@ func makeClient( } type testStreamBackend struct { - pub state.EventPublisher - store *state.Store - leader func() bool + pub state.EventPublisher + store *state.Store + leader func() bool + leadershipMonitor *leadershipMonitor +} + +var _ LeadershipMonitor = (*leadershipMonitor)(nil) + +type leadershipMonitor struct { +} + +func (l *leadershipMonitor) UpdateLeaderAddr(addr string) { + // noop +} + +func (l *leadershipMonitor) GetLeaderAddr() string { + // noop + return "" +} + +func (b *testStreamBackend) LeadershipMonitor() LeadershipMonitor { + return b.leadershipMonitor } func (b *testStreamBackend) IsLeader() bool {