Fix flaky tests in the agent/grpc/public/services/serverdiscovery package (#13173)

Occasionally we had seen the TestWatchServers_ACLToken_PermissionDenied be flagged as flaky in circleci. This change should fix that.

Why it fixes it is complicated. The test was failing with a panic when a mocked ACL Resolver was being called more times than expected. I struggled for a while to determine how that could be. This test should call authorize once and only once and the error returned should cause the stream to be terminated and the error returned to the gRPC client. Another oddity was no amount of running this test locally seemed to be able to reproduce the issue. I ran the test hundreds of thousands of time and it always passed.

It turns out that there is nothing wrong with the test. It just so happens that the panic from unexpected invocation of a mocked call happened during the test but was caused by a previous test (specifically the TestWatchServers_StreamLifecycle test)

The stream from the previous test remained open after all the test Cleanup functions were run and it just so happened that when the EventPublisher eventually picked up that the context was cancelled during cleanup, it force closes all subscriptions which causes some loops to be re-entered and the streams to be reauthorized. Its that looping in response to forced subscription closures that causes the mock to eventually panic. All the components, publisher, server, client all operate based on contexts. We cancel all those contexts but there is no syncrhonous way to know when they are stopped.

We could have implemented a syncrhonous stop but in the context of an actual running Consul, context cancellation + async stopping is perfectly fine. What we (Dan and I) eventually thought was that the behavior of grpc streams such as this when a server was shutting down wasn’t super helpful. What we would want is for a client to be able to distinguish between subscription closed because something may have changed requiring re-authentication and subscription closed because the server is shutting down. That way we can send back appropriate error messages to detail that the server is shutting down and not confuse users with potentially needing to resubscribe.

So thats what this PR does. We have introduced a shutting down state to our event subscriptions and the various streaming gRPC services that rely on the event publisher will all just behave correctly and actually stop the stream (not attempt transparent reauthorization) if this particular error is the one we get from the stream. Additionally the error that gets transmitted back through gRPC when this does occur indicates to the consumer that the server is going away. That is more helpful so that a client can then attempt to reconnect to another server.
This commit is contained in:
Matt Keeler 2022-05-23 08:59:13 -04:00 committed by GitHub
parent 7ccbb3489b
commit 3c1e17cbd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 19 additions and 1 deletions

View File

@ -351,7 +351,7 @@ func (s *subscriptions) closeAll() {
for _, byRequest := range s.byToken { for _, byRequest := range s.byToken {
for _, sub := range byRequest { for _, sub := range byRequest {
sub.forceClose() sub.shutDown()
} }
} }
} }

View File

@ -21,12 +21,21 @@ const (
// subStateUnsub indicates the subscription was closed by the caller, and // subStateUnsub indicates the subscription was closed by the caller, and
// will not return new events. // will not return new events.
subStateUnsub = 2 subStateUnsub = 2
// subStateShutting down indicates the subscription was closed due to
// the server being shut down.
subStateShuttingDown = 3
) )
// ErrSubForceClosed is a error signalling the subscription has been // ErrSubForceClosed is a error signalling the subscription has been
// closed. The client should Unsubscribe, then re-Subscribe. // closed. The client should Unsubscribe, then re-Subscribe.
var ErrSubForceClosed = errors.New("subscription closed by server, client must reset state and resubscribe") var ErrSubForceClosed = errors.New("subscription closed by server, client must reset state and resubscribe")
// ErrShuttingDown is an error to signal that the subscription has
// been closed because the server is shutting down. The client should
// subscribe to a different server to get streaming event updates.
var ErrShuttingDown = errors.New("subscription closed by server, server is shutting down")
// Subscription provides events on a Topic. Events may be filtered by Key. // Subscription provides events on a Topic. Events may be filtered by Key.
// Events are returned by Next(), and may start with a Snapshot of events. // Events are returned by Next(), and may start with a Snapshot of events.
type Subscription struct { type Subscription struct {
@ -117,6 +126,8 @@ func (s *Subscription) requireStateOpen() error {
switch atomic.LoadUint32(&s.state) { switch atomic.LoadUint32(&s.state) {
case subStateForceClosed: case subStateForceClosed:
return ErrSubForceClosed return ErrSubForceClosed
case subStateShuttingDown:
return ErrShuttingDown
case subStateUnsub: case subStateUnsub:
return fmt.Errorf("subscription was closed by unsubscribe") return fmt.Errorf("subscription was closed by unsubscribe")
default: default:
@ -145,6 +156,13 @@ func (s *Subscription) forceClose() {
} }
} }
// Close the subscription and indicate that the server is being shut down.
func (s *Subscription) shutDown() {
if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateShuttingDown) {
close(s.closed)
}
}
// Unsubscribe the subscription, freeing resources. // Unsubscribe the subscription, freeing resources.
func (s *Subscription) Unsubscribe() { func (s *Subscription) Unsubscribe() {
if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateUnsub) { if atomic.CompareAndSwapUint32(&s.state, subStateOpen, subStateUnsub) {