mirror of https://github.com/status-im/consul.git
consul: Support a stale read query
This commit is contained in:
parent
a9d4e2357e
commit
fa90f1cd0d
|
@ -232,6 +232,57 @@ func TestCatalogListNodes(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCatalogListNodes_StaleRaad(t *testing.T) {
|
||||||
|
dir1, s1 := testServer(t)
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
client1 := rpcClient(t, s1)
|
||||||
|
defer client1.Close()
|
||||||
|
|
||||||
|
dir2, s2 := testServer(t)
|
||||||
|
defer os.RemoveAll(dir2)
|
||||||
|
defer s2.Shutdown()
|
||||||
|
client2 := rpcClient(t, s2)
|
||||||
|
defer client2.Close()
|
||||||
|
|
||||||
|
// Try to join
|
||||||
|
addr := fmt.Sprintf("127.0.0.1:%d",
|
||||||
|
s1.config.SerfLANConfig.MemberlistConfig.BindPort)
|
||||||
|
if _, err := s2.JoinLAN([]string{addr}); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for a leader
|
||||||
|
time.Sleep(100 * time.Millisecond)
|
||||||
|
|
||||||
|
// Use the follower as the client
|
||||||
|
var client *rpc.Client
|
||||||
|
if !s1.IsLeader() {
|
||||||
|
client = client1
|
||||||
|
|
||||||
|
// Inject fake data on the follower!
|
||||||
|
s1.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||||
|
} else {
|
||||||
|
client = client2
|
||||||
|
|
||||||
|
// Inject fake data on the follower!
|
||||||
|
s2.fsm.State().EnsureNode(1, structs.Node{"foo", "127.0.0.1"})
|
||||||
|
}
|
||||||
|
|
||||||
|
args := structs.DCSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
QueryOptions: structs.QueryOptions{AllowStale: true},
|
||||||
|
}
|
||||||
|
var out structs.IndexedNodes
|
||||||
|
if err := client.Call("Catalog.ListNodes", &args, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(out.Nodes) != 3 {
|
||||||
|
t.Fatalf("bad: %v", out)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func BenchmarkCatalogListNodes(t *testing.B) {
|
func BenchmarkCatalogListNodes(t *testing.B) {
|
||||||
dir1, s1 := testServer(nil)
|
dir1, s1 := testServer(nil)
|
||||||
defer os.RemoveAll(dir1)
|
defer os.RemoveAll(dir1)
|
||||||
|
|
|
@ -142,6 +142,11 @@ func (s *Server) forward(method string, info structs.RPCInfo, args interface{},
|
||||||
return true, err
|
return true, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check if we can allow a stale read
|
||||||
|
if info.IsRead() && info.AllowStaleRead() {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Handle leader forwarding
|
// Handle leader forwarding
|
||||||
if !s.IsLeader() {
|
if !s.IsLeader() {
|
||||||
err := s.forwardLeader(method, args, reply)
|
err := s.forwardLeader(method, args, reply)
|
||||||
|
|
Loading…
Reference in New Issue