Cleanup from unblocking the pipeline 🧹 (#17121)

This commit is contained in:
Dan Upton 2023-04-26 13:59:58 +01:00 committed by GitHub
parent faae7bb5f2
commit eeaa636164
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 64 additions and 65 deletions

View File

@ -62,8 +62,8 @@ func (s *supervisor) run(ctx context.Context) {
return return
// Task stopped running. // Task stopped running.
case err := <-s.errCh: case <-s.errCh:
stopBackoffTimer := s.handleError(err) stopBackoffTimer := s.handleError()
if stopBackoffTimer != nil { if stopBackoffTimer != nil {
defer stopBackoffTimer() defer stopBackoffTimer()
} }
@ -121,9 +121,7 @@ func (s *supervisor) stopTask() {
s.running = false s.running = false
} }
func (s *supervisor) handleError(err error) func() bool { func (s *supervisor) handleError() func() bool {
// TODO(spatel): Fix unused err flagged by lint
_ = err
s.running = false s.running = false
if time.Since(s.startedAt) > flapThreshold { if time.Since(s.startedAt) > flapThreshold {

View File

@ -4,26 +4,28 @@
package inmem_test package inmem_test
import ( import (
"context"
"testing" "testing"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/internal/storage/conformance"
"github.com/hashicorp/consul/internal/storage/inmem"
) )
func TestBackend_Conformance(t *testing.T) { func TestBackend_Conformance(t *testing.T) {
// TODO(spatel): temporarily commenting out to get a green pipleine. conformance.Test(t, conformance.TestOptions{
require.True(t, true) NewBackend: func(t *testing.T) storage.Backend {
backend, err := inmem.NewBackend()
require.NoError(t, err)
// conformance.Test(t, conformance.TestOptions{ ctx, cancel := context.WithCancel(context.Background())
// NewBackend: func(t *testing.T) storage.Backend { t.Cleanup(cancel)
// backend, err := inmem.NewBackend() go backend.Run(ctx)
// require.NoError(t, err)
// ctx, cancel := context.WithCancel(context.Background()) return backend
// t.Cleanup(cancel) },
// go backend.Run(ctx) SupportsStronglyConsistentList: true,
})
// return backend
// },
// SupportsStronglyConsistentList: true,
// })
} }

View File

@ -26,7 +26,6 @@ type Watch struct {
// Next returns the next WatchEvent, blocking until one is available. // Next returns the next WatchEvent, blocking until one is available.
func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) { func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) {
var idx uint64
for { for {
e, err := w.nextEvent(ctx) e, err := w.nextEvent(ctx)
if err == stream.ErrSubForceClosed { if err == stream.ErrSubForceClosed {
@ -36,6 +35,31 @@ func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) {
return nil, err return nil, err
} }
event := e.Payload.(eventPayload).event
if w.query.matches(event.Resource) {
return event, nil
}
}
}
func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) {
if len(w.events) != 0 {
event := w.events[0]
w.events = w.events[1:]
return &event, nil
}
var idx uint64
for {
e, err := w.sub.Next(ctx)
if err != nil {
return nil, err
}
if e.IsFramingEvent() {
continue
}
// This works around a *very* rare race-condition in the EventPublisher where // This works around a *very* rare race-condition in the EventPublisher where
// it's possible to see duplicate events when events are published at the same // it's possible to see duplicate events when events are published at the same
// time as the first subscription is created on a {topic, subject} pair. // time as the first subscription is created on a {topic, subject} pair.
@ -56,30 +80,6 @@ func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) {
} }
idx = e.Index idx = e.Index
event := e.Payload.(eventPayload).event
if w.query.matches(event.Resource) {
return event, nil
}
}
}
func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) {
if len(w.events) != 0 {
event := w.events[0]
w.events = w.events[1:]
return &event, nil
}
for {
e, err := w.sub.Next(ctx)
if err != nil {
return nil, err
}
if e.IsFramingEvent() {
continue
}
switch t := e.Payload.(type) { switch t := e.Payload.(type) {
case eventPayload: case eventPayload:
return &e, nil return &e, nil

View File

@ -16,33 +16,32 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"github.com/hashicorp/consul/internal/storage"
"github.com/hashicorp/consul/internal/storage/conformance"
"github.com/hashicorp/consul/internal/storage/raft" "github.com/hashicorp/consul/internal/storage/raft"
"github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/sdk/testutil"
) )
func TestBackend_Conformance(t *testing.T) { func TestBackend_Conformance(t *testing.T) {
// TODO(spatel): Temporarily disable to get a green pipeline t.Run("Leader", func(t *testing.T) {
require.True(t, true) conformance.Test(t, conformance.TestOptions{
NewBackend: func(t *testing.T) storage.Backend {
leader, _ := newRaftCluster(t)
return leader
},
SupportsStronglyConsistentList: true,
})
})
// t.Run("Leader", func(t *testing.T) { t.Run("Follower", func(t *testing.T) {
// conformance.Test(t, conformance.TestOptions{ conformance.Test(t, conformance.TestOptions{
// NewBackend: func(t *testing.T) storage.Backend { NewBackend: func(t *testing.T) storage.Backend {
// leader, _ := newRaftCluster(t) _, follower := newRaftCluster(t)
// return leader return follower
// }, },
// SupportsStronglyConsistentList: true, SupportsStronglyConsistentList: true,
// }) })
// }) })
// t.Run("Follower", func(t *testing.T) {
// conformance.Test(t, conformance.TestOptions{
// NewBackend: func(t *testing.T) storage.Backend {
// _, follower := newRaftCluster(t)
// return follower
// },
// SupportsStronglyConsistentList: true,
// })
// })
} }
func newRaftCluster(t *testing.T) (*raft.Backend, *raft.Backend) { func newRaftCluster(t *testing.T) (*raft.Backend, *raft.Backend) {