diff --git a/internal/controller/supervisor.go b/internal/controller/supervisor.go index 8174b86d6d..5983ff4c4b 100644 --- a/internal/controller/supervisor.go +++ b/internal/controller/supervisor.go @@ -62,8 +62,8 @@ func (s *supervisor) run(ctx context.Context) { return // Task stopped running. - case err := <-s.errCh: - stopBackoffTimer := s.handleError(err) + case <-s.errCh: + stopBackoffTimer := s.handleError() if stopBackoffTimer != nil { defer stopBackoffTimer() } @@ -121,9 +121,7 @@ func (s *supervisor) stopTask() { s.running = false } -func (s *supervisor) handleError(err error) func() bool { - // TODO(spatel): Fix unused err flagged by lint - _ = err +func (s *supervisor) handleError() func() bool { s.running = false if time.Since(s.startedAt) > flapThreshold { diff --git a/internal/storage/inmem/backend_test.go b/internal/storage/inmem/backend_test.go index 65cfca85d4..e37de15afa 100644 --- a/internal/storage/inmem/backend_test.go +++ b/internal/storage/inmem/backend_test.go @@ -4,26 +4,28 @@ package inmem_test import ( + "context" "testing" "github.com/stretchr/testify/require" + + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/internal/storage/conformance" + "github.com/hashicorp/consul/internal/storage/inmem" ) func TestBackend_Conformance(t *testing.T) { - // TODO(spatel): temporarily commenting out to get a green pipleine. - require.True(t, true) + conformance.Test(t, conformance.TestOptions{ + NewBackend: func(t *testing.T) storage.Backend { + backend, err := inmem.NewBackend() + require.NoError(t, err) - // conformance.Test(t, conformance.TestOptions{ - // NewBackend: func(t *testing.T) storage.Backend { - // backend, err := inmem.NewBackend() - // require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + go backend.Run(ctx) - // ctx, cancel := context.WithCancel(context.Background()) - // t.Cleanup(cancel) - // go backend.Run(ctx) - - // return backend - // }, - // SupportsStronglyConsistentList: true, - // }) + return backend + }, + SupportsStronglyConsistentList: true, + }) } diff --git a/internal/storage/inmem/watch.go b/internal/storage/inmem/watch.go index de340bb39e..85c657e3a3 100644 --- a/internal/storage/inmem/watch.go +++ b/internal/storage/inmem/watch.go @@ -26,7 +26,6 @@ type Watch struct { // Next returns the next WatchEvent, blocking until one is available. func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) { - var idx uint64 for { e, err := w.nextEvent(ctx) if err == stream.ErrSubForceClosed { @@ -36,6 +35,31 @@ func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) { return nil, err } + event := e.Payload.(eventPayload).event + if w.query.matches(event.Resource) { + return event, nil + } + } +} + +func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) { + if len(w.events) != 0 { + event := w.events[0] + w.events = w.events[1:] + return &event, nil + } + + var idx uint64 + for { + e, err := w.sub.Next(ctx) + if err != nil { + return nil, err + } + + if e.IsFramingEvent() { + continue + } + // This works around a *very* rare race-condition in the EventPublisher where // it's possible to see duplicate events when events are published at the same // time as the first subscription is created on a {topic, subject} pair. @@ -56,30 +80,6 @@ func (w *Watch) Next(ctx context.Context) (*pbresource.WatchEvent, error) { } idx = e.Index - event := e.Payload.(eventPayload).event - if w.query.matches(event.Resource) { - return event, nil - } - } -} - -func (w *Watch) nextEvent(ctx context.Context) (*stream.Event, error) { - if len(w.events) != 0 { - event := w.events[0] - w.events = w.events[1:] - return &event, nil - } - - for { - e, err := w.sub.Next(ctx) - if err != nil { - return nil, err - } - - if e.IsFramingEvent() { - continue - } - switch t := e.Payload.(type) { case eventPayload: return &e, nil diff --git a/internal/storage/raft/conformance_test.go b/internal/storage/raft/conformance_test.go index 230f4ca119..ef79087e17 100644 --- a/internal/storage/raft/conformance_test.go +++ b/internal/storage/raft/conformance_test.go @@ -16,33 +16,32 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "github.com/hashicorp/consul/internal/storage" + "github.com/hashicorp/consul/internal/storage/conformance" "github.com/hashicorp/consul/internal/storage/raft" "github.com/hashicorp/consul/sdk/testutil" ) func TestBackend_Conformance(t *testing.T) { - // TODO(spatel): Temporarily disable to get a green pipeline - require.True(t, true) + t.Run("Leader", func(t *testing.T) { + conformance.Test(t, conformance.TestOptions{ + NewBackend: func(t *testing.T) storage.Backend { + leader, _ := newRaftCluster(t) + return leader + }, + SupportsStronglyConsistentList: true, + }) + }) - // t.Run("Leader", func(t *testing.T) { - // conformance.Test(t, conformance.TestOptions{ - // NewBackend: func(t *testing.T) storage.Backend { - // leader, _ := newRaftCluster(t) - // return leader - // }, - // SupportsStronglyConsistentList: true, - // }) - // }) - - // t.Run("Follower", func(t *testing.T) { - // conformance.Test(t, conformance.TestOptions{ - // NewBackend: func(t *testing.T) storage.Backend { - // _, follower := newRaftCluster(t) - // return follower - // }, - // SupportsStronglyConsistentList: true, - // }) - // }) + t.Run("Follower", func(t *testing.T) { + conformance.Test(t, conformance.TestOptions{ + NewBackend: func(t *testing.T) storage.Backend { + _, follower := newRaftCluster(t) + return follower + }, + SupportsStronglyConsistentList: true, + }) + }) } func newRaftCluster(t *testing.T) (*raft.Backend, *raft.Backend) {