consul/agent/consul/rpc_test.go

220 lines
4.7 KiB
Go

package consul
import (
"bytes"
"os"
"testing"
"time"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/net-rpc-msgpackrpc"
)
func TestRPC_NoLeader_Fail(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.RPCHoldTimeout = 1 * time.Millisecond
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
}
var out struct{}
// Make sure we eventually fail with a no leader error, which we should
// see given the short timeout.
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
if err == nil || err.Error() != structs.ErrNoLeader.Error() {
t.Fatalf("bad: %v", err)
}
// Now make sure it goes through.
testrpc.WaitForLeader(t, s1.RPC, "dc1")
err = msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
if err != nil {
t.Fatalf("bad: %v", err)
}
}
func TestRPC_NoLeader_Retry(t *testing.T) {
t.Parallel()
dir1, s1 := testServerWithConfig(t, func(c *Config) {
c.RPCHoldTimeout = 10 * time.Second
})
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
arg := structs.RegisterRequest{
Datacenter: "dc1",
Node: "foo",
Address: "127.0.0.1",
}
var out struct{}
// This isn't sure-fire but tries to check that we don't have a
// leader going into the RPC, so we exercise the retry logic.
if ok, _ := s1.getLeader(); ok {
t.Fatalf("should not have a leader yet")
}
// The timeout is long enough to ride out any reasonable leader
// election.
err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out)
if err != nil {
t.Fatalf("bad: %v", err)
}
}
type MockSink struct {
*bytes.Buffer
cancel bool
}
func (m *MockSink) ID() string {
return "Mock"
}
func (m *MockSink) Cancel() error {
m.cancel = true
return nil
}
func (m *MockSink) Close() error {
return nil
}
func TestRPC_blockingQuery(t *testing.T) {
t.Parallel()
dir, s := testServer(t)
defer os.RemoveAll(dir)
defer s.Shutdown()
// Perform a non-blocking query.
{
var opts structs.QueryOptions
var meta structs.QueryMeta
var calls int
fn := func(ws memdb.WatchSet, state *state.Store) error {
calls++
return nil
}
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
t.Fatalf("err: %v", err)
}
if calls != 1 {
t.Fatalf("bad: %d", calls)
}
}
// Perform a blocking query that gets woken up and loops around once.
{
opts := structs.QueryOptions{
MinQueryIndex: 3,
}
var meta structs.QueryMeta
var calls int
fn := func(ws memdb.WatchSet, state *state.Store) error {
if calls == 0 {
meta.Index = 3
fakeCh := make(chan struct{})
close(fakeCh)
ws.Add(fakeCh)
} else {
meta.Index = 4
}
calls++
return nil
}
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
t.Fatalf("err: %v", err)
}
if calls != 2 {
t.Fatalf("bad: %d", calls)
}
}
// Perform a query that blocks and gets interrupted when the state store
// is abandoned.
{
opts := structs.QueryOptions{
MinQueryIndex: 3,
}
var meta structs.QueryMeta
var calls int
fn := func(ws memdb.WatchSet, state *state.Store) error {
if calls == 0 {
meta.Index = 3
snap, err := s.fsm.Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
defer snap.Release()
buf := bytes.NewBuffer(nil)
sink := &MockSink{buf, false}
if err := snap.Persist(sink); err != nil {
t.Fatalf("err: %v", err)
}
if err := s.fsm.Restore(sink); err != nil {
t.Fatalf("err: %v", err)
}
}
calls++
return nil
}
if err := s.blockingQuery(&opts, &meta, fn); err != nil {
t.Fatalf("err: %v", err)
}
if calls != 1 {
t.Fatalf("bad: %d", calls)
}
}
}
func TestRPC_ReadyForConsistentReads(t *testing.T) {
t.Parallel()
dir, s := testServerWithConfig(t, func(c *Config) {
c.RPCHoldTimeout = 2 * time.Millisecond
})
defer os.RemoveAll(dir)
defer s.Shutdown()
testrpc.WaitForLeader(t, s.RPC, "dc1")
if !s.isReadyForConsistentReads() {
t.Fatal("Server should be ready for consistent reads")
}
s.resetConsistentReadReady()
err := s.consistentRead()
if err.Error() != "Not ready to serve consistent reads" {
t.Fatal("Server should NOT be ready for consistent reads")
}
go func() {
time.Sleep(100 * time.Millisecond)
s.setConsistentReadReady()
}()
retry.Run(t, func(r *retry.R) {
if err := s.consistentRead(); err != nil {
r.Fatalf("Expected server to be ready for consistent reads, got error %v", err)
}
})
}