Bails out of blocking queries when a state restore occurs.

This commit is contained in:
James Phillips 2017-01-25 19:00:32 -08:00
parent 77dc6fb08e
commit c370d4ff29
No known key found for this signature in database
GPG Key ID: 77183E682AC5FC11
2 changed files with 103 additions and 1 deletions

View File

@ -417,7 +417,16 @@ RUN_QUERY:
err := fn(ws, state)
if err == nil && queryMeta.Index > 0 && queryMeta.Index <= queryOpts.MinQueryIndex {
if expired := ws.Watch(timeout.C); !expired {
goto RUN_QUERY
// If a restore may have woken us up then bail out from
// the query immediately. This is slightly race-ey since
// this might have been interrupted for other reasons,
// but it's OK to kick it back to the caller in either
// case.
select {
case <-state.AbandonCh():
default:
goto RUN_QUERY
}
}
}
return err

View File

@ -1,12 +1,15 @@
package consul
import (
"bytes"
"os"
"testing"
"time"
"github.com/hashicorp/consul/consul/state"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/testutil"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
@ -70,3 +73,93 @@ func TestRPC_NoLeader_Retry(t *testing.T) {
t.Fatalf("bad: %v", err)
}
}
func TestRPC_blockingQuery(t *testing.T) {
dir, s := testServer(t)
defer os.RemoveAll(dir)
defer s.Shutdown()
// Perform a non-blocking query.
{
var opts structs.QueryOptions
var meta structs.QueryMeta
var calls int
fn := func(ws memdb.WatchSet, state *state.StateStore) error {
calls++
return nil
}
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
t.Fatalf("err: %v", err)
}
if calls != 1 {
t.Fatalf("bad: %d", calls)
}
}
// Perform a blocking query that gets woken up and loops around once.
{
opts := structs.QueryOptions{
MinQueryIndex: 3,
}
var meta structs.QueryMeta
var calls int
fn := func(ws memdb.WatchSet, state *state.StateStore) error {
if calls == 0 {
meta.Index = 3
fakeCh := make(chan struct{})
close(fakeCh)
ws.Add(fakeCh)
} else {
meta.Index = 4
}
calls++
return nil
}
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
t.Fatalf("err: %v", err)
}
if calls != 2 {
t.Fatalf("bad: %d", calls)
}
}
// Perform a query that blocks and gets interrupted when the state store
// is abandoned.
{
opts := structs.QueryOptions{
MinQueryIndex: 3,
}
var meta structs.QueryMeta
var calls int
fn := func(ws memdb.WatchSet, state *state.StateStore) error {
if calls == 0 {
meta.Index = 3
snap, err := s.fsm.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Release()
buf := bytes.NewBuffer(nil)
sink := &MockSink{buf, false}
if err := snap.Persist(sink); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.fsm.Restore(sink); err != nil {
t.Fatalf("err: %v", err)
}
}
calls++
return nil
}
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
t.Fatalf("err: %v", err)
}
if calls != 1 {
t.Fatalf("bad: %d", calls)
}
}
}