mirror of https://github.com/status-im/consul.git
Fix bug with prepared queries using sameness-groups. (#19970)
This commit fixes an issue where the partition was not properly set on the peering query failover target created from sameness-groups. Before this change, it was always empty, meaning that the data would be queried with respect to the default partition always. This resulted in a situation where a PQ that was attempting to use a sameness-group for failover would select peers from the default partition, rather than the partition of the sameness-group itself.
This commit is contained in:
parent
79e02f8a89
commit
bbdbf3e4f8
|
@ -0,0 +1,3 @@
|
|||
```release-note:bug
|
||||
prepared-query: (Enterprise-only) Fix issue where sameness-group failover targets to peers would attempt to query data from the default partition, rather than the sameness-group's partition always.
|
||||
```
|
|
@ -3357,6 +3357,7 @@ type executeServers struct {
|
|||
|
||||
func createExecuteServers(t *testing.T) *executeServers {
|
||||
es := newExecuteServers(t)
|
||||
es.initPeering(t, "")
|
||||
es.initWanFed(t)
|
||||
es.exportPeeringServices(t)
|
||||
es.initTokens(t)
|
||||
|
@ -3365,7 +3366,6 @@ func createExecuteServers(t *testing.T) *executeServers {
|
|||
}
|
||||
|
||||
func newExecuteServers(t *testing.T) *executeServers {
|
||||
|
||||
// Setup server
|
||||
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||
c.PrimaryDatacenter = "dc1"
|
||||
|
@ -3416,67 +3416,6 @@ func newExecuteServers(t *testing.T) *executeServers {
|
|||
testrpc.WaitForLeader(t, s1.RPC, "dc1", testrpc.WithToken("root"))
|
||||
testrpc.WaitForLeader(t, s3.RPC, "dc3")
|
||||
|
||||
acceptingPeerName := "my-peer-accepting-server"
|
||||
dialingPeerName := "my-peer-dialing-server"
|
||||
|
||||
// Set up peering between dc1 (dialing) and dc3 (accepting) and export the foo service
|
||||
{
|
||||
// Create a peering by generating a token.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
options := structs.QueryOptions{Token: "root"}
|
||||
ctx, err := grpcexternal.ContextWithQueryOptions(ctx, options)
|
||||
require.NoError(t, err)
|
||||
|
||||
conn, err := grpc.DialContext(ctx, s3.config.RPCAddr.String(),
|
||||
grpc.WithContextDialer(newServerDialer(s3.config.RPCAddr.String())),
|
||||
//nolint:staticcheck
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBlock())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
conn.Close()
|
||||
})
|
||||
|
||||
peeringClient := pbpeering.NewPeeringServiceClient(conn)
|
||||
req := pbpeering.GenerateTokenRequest{
|
||||
PeerName: dialingPeerName,
|
||||
}
|
||||
resp, err := peeringClient.GenerateToken(ctx, &req)
|
||||
require.NoError(t, err)
|
||||
|
||||
conn, err = grpc.DialContext(ctx, s1.config.RPCAddr.String(),
|
||||
grpc.WithContextDialer(newServerDialer(s1.config.RPCAddr.String())),
|
||||
//nolint:staticcheck
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBlock())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
conn.Close()
|
||||
})
|
||||
|
||||
peeringClient = pbpeering.NewPeeringServiceClient(conn)
|
||||
establishReq := pbpeering.EstablishRequest{
|
||||
PeerName: acceptingPeerName,
|
||||
PeeringToken: resp.PeeringToken,
|
||||
}
|
||||
establishResp, err := peeringClient.Establish(ctx, &establishReq)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, establishResp)
|
||||
|
||||
readResp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: acceptingPeerName})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, readResp)
|
||||
|
||||
// Wait for the stream to be connected.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
status, found := s1.peerStreamServer.StreamStatus(readResp.GetPeering().GetID())
|
||||
require.True(r, found)
|
||||
require.True(r, status.Connected)
|
||||
})
|
||||
}
|
||||
|
||||
es := executeServers{
|
||||
server: &serverTestMetadata{
|
||||
server: s1,
|
||||
|
@ -3487,8 +3426,8 @@ func newExecuteServers(t *testing.T) *executeServers {
|
|||
server: s3,
|
||||
codec: codec3,
|
||||
datacenter: "dc3",
|
||||
dialingPeerName: dialingPeerName,
|
||||
acceptingPeerName: acceptingPeerName,
|
||||
dialingPeerName: "my-peer-dialing-server",
|
||||
acceptingPeerName: "my-peer-accepting-server",
|
||||
},
|
||||
}
|
||||
|
||||
|
@ -3562,3 +3501,64 @@ func (es *executeServers) initWanFed(t *testing.T) {
|
|||
datacenter: "dc2",
|
||||
}
|
||||
}
|
||||
|
||||
func (es *executeServers) initPeering(t *testing.T, localPartition string) {
|
||||
// Create a peering by generating a token.
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
t.Cleanup(cancel)
|
||||
|
||||
options := structs.QueryOptions{Token: "root"}
|
||||
ctx, err := grpcexternal.ContextWithQueryOptions(ctx, options)
|
||||
require.NoError(t, err)
|
||||
|
||||
conn, err := grpc.DialContext(ctx, es.peeringServer.server.config.RPCAddr.String(),
|
||||
grpc.WithContextDialer(newServerDialer(es.peeringServer.server.config.RPCAddr.String())),
|
||||
//nolint:staticcheck
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBlock())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
conn.Close()
|
||||
})
|
||||
|
||||
peeringClient := pbpeering.NewPeeringServiceClient(conn)
|
||||
req := pbpeering.GenerateTokenRequest{
|
||||
PeerName: es.peeringServer.dialingPeerName,
|
||||
}
|
||||
resp, err := peeringClient.GenerateToken(ctx, &req)
|
||||
require.NoError(t, err)
|
||||
|
||||
conn, err = grpc.DialContext(ctx, es.server.server.config.RPCAddr.String(),
|
||||
grpc.WithContextDialer(newServerDialer(es.server.server.config.RPCAddr.String())),
|
||||
//nolint:staticcheck
|
||||
grpc.WithInsecure(),
|
||||
grpc.WithBlock())
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() {
|
||||
conn.Close()
|
||||
})
|
||||
|
||||
peeringClient = pbpeering.NewPeeringServiceClient(conn)
|
||||
establishReq := pbpeering.EstablishRequest{
|
||||
Partition: localPartition,
|
||||
PeerName: es.peeringServer.acceptingPeerName,
|
||||
PeeringToken: resp.PeeringToken,
|
||||
}
|
||||
establishResp, err := peeringClient.Establish(ctx, &establishReq)
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, establishResp)
|
||||
|
||||
readResp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{
|
||||
Partition: localPartition,
|
||||
Name: es.peeringServer.acceptingPeerName,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
require.NotNil(t, readResp)
|
||||
|
||||
// Wait for the stream to be connected.
|
||||
retry.Run(t, func(r *retry.R) {
|
||||
status, found := es.server.server.peerStreamServer.StreamStatus(readResp.GetPeering().GetID())
|
||||
require.True(r, found)
|
||||
require.True(r, status.Connected)
|
||||
})
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue