mirror of https://github.com/status-im/consul.git
Merge branch 'main' of github.com:hashicorp/consul into docs-ecs-mesh-gw
pulling merged changes into this branch
This commit is contained in:
commit
42123bec1f
|
@ -476,6 +476,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
|
||||
// Peerings
|
||||
require.NoError(t, fsm.state.PeeringWrite(31, &pbpeering.Peering{
|
||||
ID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
|
||||
Name: "baz",
|
||||
}))
|
||||
|
||||
|
|
|
@ -7,13 +7,13 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
|
@ -62,6 +62,10 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
|
|||
_, found := s1.peeringService.StreamStatus(token.PeerID)
|
||||
require.False(t, found)
|
||||
|
||||
var (
|
||||
s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
|
||||
)
|
||||
|
||||
// Bring up s2 and store s1's token so that it attempts to dial.
|
||||
_, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "s2.dc2"
|
||||
|
@ -73,6 +77,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
|
|||
// Simulate a peering initiation event by writing a peering with data from a peering token.
|
||||
// Eventually the leader in dc2 should dial and connect to the leader in dc1.
|
||||
p := &pbpeering.Peering{
|
||||
ID: s2PeerID,
|
||||
Name: "my-peer-s1",
|
||||
PeerID: token.PeerID,
|
||||
PeerCAPems: token.CA,
|
||||
|
@ -92,6 +97,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
|
|||
|
||||
// Delete the peering to trigger the termination sequence.
|
||||
deleted := &pbpeering.Peering{
|
||||
ID: s2PeerID,
|
||||
Name: "my-peer-s1",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}
|
||||
|
@ -151,6 +157,11 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
|
|||
var token structs.PeeringToken
|
||||
require.NoError(t, json.Unmarshal(tokenJSON, &token))
|
||||
|
||||
var (
|
||||
s1PeerID = token.PeerID
|
||||
s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
|
||||
)
|
||||
|
||||
// Bring up s2 and store s1's token so that it attempts to dial.
|
||||
_, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "s2.dc2"
|
||||
|
@ -162,6 +173,7 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
|
|||
// Simulate a peering initiation event by writing a peering with data from a peering token.
|
||||
// Eventually the leader in dc2 should dial and connect to the leader in dc1.
|
||||
p := &pbpeering.Peering{
|
||||
ID: s2PeerID,
|
||||
Name: "my-peer-s1",
|
||||
PeerID: token.PeerID,
|
||||
PeerCAPems: token.CA,
|
||||
|
@ -181,6 +193,7 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
|
|||
|
||||
// Delete the peering from the server peer to trigger the termination sequence.
|
||||
deleted := &pbpeering.Peering{
|
||||
ID: s1PeerID,
|
||||
Name: "my-peer-s2",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}
|
||||
|
@ -216,6 +229,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) {
|
|||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
var (
|
||||
peerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
|
||||
peerName = "my-peer-s2"
|
||||
defaultMeta = acl.DefaultEnterpriseMeta()
|
||||
lastIdx = uint64(0)
|
||||
|
@ -224,6 +238,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) {
|
|||
// Simulate a peering initiation event by writing a peering to the state store.
|
||||
lastIdx++
|
||||
require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID,
|
||||
Name: peerName,
|
||||
}))
|
||||
|
||||
|
@ -233,6 +248,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) {
|
|||
// Mark the peering for deletion to trigger the termination sequence.
|
||||
lastIdx++
|
||||
require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID,
|
||||
Name: peerName,
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}))
|
||||
|
|
|
@ -143,6 +143,17 @@ type peeringApply struct {
|
|||
srv *Server
|
||||
}
|
||||
|
||||
func (a *peeringApply) CheckPeeringUUID(id string) (bool, error) {
|
||||
state := a.srv.fsm.State()
|
||||
if _, existing, err := state.PeeringReadByID(nil, id); err != nil {
|
||||
return false, err
|
||||
} else if existing != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (a *peeringApply) PeeringWrite(req *pbpeering.PeeringWriteRequest) error {
|
||||
_, err := a.srv.raftApplyProtobuf(structs.PeeringWriteType, req)
|
||||
return err
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -191,50 +191,47 @@ func (s *Store) peeringListTxn(ws memdb.WatchSet, tx ReadTxn, entMeta acl.Enterp
|
|||
return idx, result, nil
|
||||
}
|
||||
|
||||
func generatePeeringUUID(tx ReadTxn) (string, error) {
|
||||
for {
|
||||
uuid, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to generate UUID: %w", err)
|
||||
}
|
||||
existing, err := peeringReadByIDTxn(tx, nil, uuid)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to read peering: %w", err)
|
||||
}
|
||||
if existing == nil {
|
||||
return uuid, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) PeeringWrite(idx uint64, p *pbpeering.Peering) error {
|
||||
tx := s.db.WriteTxn(idx)
|
||||
defer tx.Abort()
|
||||
|
||||
q := Query{
|
||||
Value: p.Name,
|
||||
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(p.Partition),
|
||||
// Check that the ID and Name are set.
|
||||
if p.ID == "" {
|
||||
return errors.New("Missing Peering ID")
|
||||
}
|
||||
existingRaw, err := tx.First(tablePeering, indexName, q)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed peering lookup: %w", err)
|
||||
if p.Name == "" {
|
||||
return errors.New("Missing Peering Name")
|
||||
}
|
||||
|
||||
existing, ok := existingRaw.(*pbpeering.Peering)
|
||||
if existingRaw != nil && !ok {
|
||||
return fmt.Errorf("invalid type %T", existingRaw)
|
||||
// ensure the name is unique (cannot conflict with another peering with a different ID)
|
||||
_, existing, err := peeringReadTxn(tx, nil, Query{
|
||||
Value: p.Name,
|
||||
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(p.Partition),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if existing != nil {
|
||||
if p.ID != existing.ID {
|
||||
return fmt.Errorf("A peering already exists with the name %q and a different ID %q", p.Name, existing.ID)
|
||||
}
|
||||
// Prevent modifications to Peering marked for deletion
|
||||
if !existing.IsActive() {
|
||||
return fmt.Errorf("cannot write to peering that is marked for deletion")
|
||||
}
|
||||
|
||||
p.CreateIndex = existing.CreateIndex
|
||||
p.ID = existing.ID
|
||||
|
||||
p.ModifyIndex = idx
|
||||
} else {
|
||||
idMatch, err := peeringReadByIDTxn(tx, nil, p.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if idMatch != nil {
|
||||
return fmt.Errorf("A peering already exists with the ID %q and a different name %q", p.Name, existing.ID)
|
||||
}
|
||||
|
||||
if !p.IsActive() {
|
||||
return fmt.Errorf("cannot create a new peering marked for deletion")
|
||||
}
|
||||
|
@ -242,13 +239,8 @@ func (s *Store) PeeringWrite(idx uint64, p *pbpeering.Peering) error {
|
|||
// TODO(peering): consider keeping PeeringState enum elsewhere?
|
||||
p.State = pbpeering.PeeringState_INITIAL
|
||||
p.CreateIndex = idx
|
||||
|
||||
p.ID, err = generatePeeringUUID(tx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate peering id: %w", err)
|
||||
}
|
||||
}
|
||||
p.ModifyIndex = idx
|
||||
}
|
||||
|
||||
if err := tx.Insert(tablePeering, p); err != nil {
|
||||
return fmt.Errorf("failed inserting peering: %w", err)
|
||||
|
|
|
@ -1,13 +1,10 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
|
@ -17,6 +14,12 @@ import (
|
|||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
const (
|
||||
testFooPeerID = "9e650110-ac74-4c5a-a6a8-9348b2bed4e9"
|
||||
testBarPeerID = "5ebcff30-5509-4858-8142-a8e580f1863f"
|
||||
testBazPeerID = "432feb2f-5476-4ae2-b33c-e43640ca0e86"
|
||||
)
|
||||
|
||||
func insertTestPeerings(t *testing.T, s *Store) {
|
||||
t.Helper()
|
||||
|
||||
|
@ -26,7 +29,7 @@ func insertTestPeerings(t *testing.T, s *Store) {
|
|||
err := tx.Insert(tablePeering, &pbpeering.Peering{
|
||||
Name: "foo",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
|
||||
ID: testFooPeerID,
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
|
@ -36,7 +39,7 @@ func insertTestPeerings(t *testing.T, s *Store) {
|
|||
err = tx.Insert(tablePeering, &pbpeering.Peering{
|
||||
Name: "bar",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "5ebcff30-5509-4858-8142-a8e580f1863f",
|
||||
ID: testBarPeerID,
|
||||
State: pbpeering.PeeringState_FAILING,
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
|
@ -97,16 +100,16 @@ func TestStateStore_PeeringReadByID(t *testing.T) {
|
|||
run := func(t *testing.T, tc testcase) {
|
||||
_, peering, err := s.PeeringReadByID(nil, tc.id)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expect, peering)
|
||||
prototest.AssertDeepEqual(t, tc.expect, peering)
|
||||
}
|
||||
tcs := []testcase{
|
||||
{
|
||||
name: "get foo",
|
||||
id: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
|
||||
id: testFooPeerID,
|
||||
expect: &pbpeering.Peering{
|
||||
Name: "foo",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
|
||||
ID: testFooPeerID,
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
|
@ -114,11 +117,11 @@ func TestStateStore_PeeringReadByID(t *testing.T) {
|
|||
},
|
||||
{
|
||||
name: "get bar",
|
||||
id: "5ebcff30-5509-4858-8142-a8e580f1863f",
|
||||
id: testBarPeerID,
|
||||
expect: &pbpeering.Peering{
|
||||
Name: "bar",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "5ebcff30-5509-4858-8142-a8e580f1863f",
|
||||
ID: testBarPeerID,
|
||||
State: pbpeering.PeeringState_FAILING,
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
|
@ -149,7 +152,7 @@ func TestStateStore_PeeringRead(t *testing.T) {
|
|||
run := func(t *testing.T, tc testcase) {
|
||||
_, peering, err := s.PeeringRead(nil, tc.query)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expect, peering)
|
||||
prototest.AssertDeepEqual(t, tc.expect, peering)
|
||||
}
|
||||
tcs := []testcase{
|
||||
{
|
||||
|
@ -160,7 +163,7 @@ func TestStateStore_PeeringRead(t *testing.T) {
|
|||
expect: &pbpeering.Peering{
|
||||
Name: "foo",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
|
||||
ID: testFooPeerID,
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
|
@ -189,6 +192,7 @@ func TestStore_Peering_Watch(t *testing.T) {
|
|||
|
||||
// set up initial write
|
||||
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testFooPeerID,
|
||||
Name: "foo",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -210,6 +214,7 @@ func TestStore_Peering_Watch(t *testing.T) {
|
|||
|
||||
lastIdx++
|
||||
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testBarPeerID,
|
||||
Name: "bar",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -229,6 +234,7 @@ func TestStore_Peering_Watch(t *testing.T) {
|
|||
// unrelated write shouldn't fire watch
|
||||
lastIdx++
|
||||
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testBarPeerID,
|
||||
Name: "bar",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -237,6 +243,7 @@ func TestStore_Peering_Watch(t *testing.T) {
|
|||
// foo write should fire watch
|
||||
lastIdx++
|
||||
err = s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testFooPeerID,
|
||||
Name: "foo",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
})
|
||||
|
@ -261,6 +268,7 @@ func TestStore_Peering_Watch(t *testing.T) {
|
|||
// mark for deletion before actually deleting
|
||||
lastIdx++
|
||||
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testBarPeerID,
|
||||
Name: "bar",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
})
|
||||
|
@ -293,7 +301,7 @@ func TestStore_PeeringList(t *testing.T) {
|
|||
{
|
||||
Name: "foo",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
|
||||
ID: testFooPeerID,
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
|
@ -301,7 +309,7 @@ func TestStore_PeeringList(t *testing.T) {
|
|||
{
|
||||
Name: "bar",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "5ebcff30-5509-4858-8142-a8e580f1863f",
|
||||
ID: testBarPeerID,
|
||||
State: pbpeering.PeeringState_FAILING,
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
|
@ -336,6 +344,7 @@ func TestStore_PeeringList_Watch(t *testing.T) {
|
|||
lastIdx++
|
||||
// insert a peering
|
||||
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testFooPeerID,
|
||||
Name: "foo",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
})
|
||||
|
@ -357,6 +366,7 @@ func TestStore_PeeringList_Watch(t *testing.T) {
|
|||
// update peering
|
||||
lastIdx++
|
||||
require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testFooPeerID,
|
||||
Name: "foo",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
|
@ -422,6 +432,7 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
{
|
||||
name: "create baz",
|
||||
input: &pbpeering.Peering{
|
||||
ID: testBazPeerID,
|
||||
Name: "baz",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
},
|
||||
|
@ -429,6 +440,7 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
{
|
||||
name: "update baz",
|
||||
input: &pbpeering.Peering{
|
||||
ID: testBazPeerID,
|
||||
Name: "baz",
|
||||
State: pbpeering.PeeringState_FAILING,
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
|
@ -437,6 +449,7 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
{
|
||||
name: "mark baz for deletion",
|
||||
input: &pbpeering.Peering{
|
||||
ID: testBazPeerID,
|
||||
Name: "baz",
|
||||
State: pbpeering.PeeringState_TERMINATED,
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
|
@ -446,6 +459,7 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
{
|
||||
name: "cannot update peering marked for deletion",
|
||||
input: &pbpeering.Peering{
|
||||
ID: testBazPeerID,
|
||||
Name: "baz",
|
||||
// Attempt to add metadata
|
||||
Meta: map[string]string{
|
||||
|
@ -458,6 +472,7 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
{
|
||||
name: "cannot create peering marked for deletion",
|
||||
input: &pbpeering.Peering{
|
||||
ID: testFooPeerID,
|
||||
Name: "foo",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
|
@ -472,54 +487,6 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStore_PeeringWrite_GenerateUUID(t *testing.T) {
|
||||
rand.Seed(1)
|
||||
|
||||
s := NewStateStore(nil)
|
||||
|
||||
entMeta := structs.NodeEnterpriseMetaInDefaultPartition()
|
||||
partition := entMeta.PartitionOrDefault()
|
||||
|
||||
for i := 1; i < 11; i++ {
|
||||
require.NoError(t, s.PeeringWrite(uint64(i), &pbpeering.Peering{
|
||||
Name: fmt.Sprintf("peering-%d", i),
|
||||
Partition: partition,
|
||||
}))
|
||||
}
|
||||
|
||||
idx, peerings, err := s.PeeringList(nil, *entMeta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(10), idx)
|
||||
require.Len(t, peerings, 10)
|
||||
|
||||
// Ensure that all assigned UUIDs are unique.
|
||||
uniq := make(map[string]struct{})
|
||||
for _, p := range peerings {
|
||||
uniq[p.ID] = struct{}{}
|
||||
}
|
||||
require.Len(t, uniq, 10)
|
||||
|
||||
// Ensure that the ID of an existing peering cannot be overwritten.
|
||||
updated := &pbpeering.Peering{
|
||||
Name: peerings[0].Name,
|
||||
Partition: peerings[0].Partition,
|
||||
}
|
||||
|
||||
// Attempt to overwrite ID.
|
||||
updated.ID, err = uuid.GenerateUUID()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, s.PeeringWrite(11, updated))
|
||||
|
||||
q := Query{
|
||||
Value: updated.Name,
|
||||
EnterpriseMeta: *entMeta,
|
||||
}
|
||||
idx, got, err := s.PeeringRead(nil, q)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(11), idx)
|
||||
require.Equal(t, peerings[0].ID, got.ID)
|
||||
}
|
||||
|
||||
func TestStore_PeeringDelete(t *testing.T) {
|
||||
s := NewStateStore(nil)
|
||||
insertTestPeerings(t, s)
|
||||
|
@ -532,6 +499,7 @@ func TestStore_PeeringDelete(t *testing.T) {
|
|||
|
||||
testutil.RunStep(t, "can delete after marking for deletion", func(t *testing.T) {
|
||||
require.NoError(t, s.PeeringWrite(11, &pbpeering.Peering{
|
||||
ID: testFooPeerID,
|
||||
Name: "foo",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}))
|
||||
|
@ -550,7 +518,7 @@ func TestStore_PeeringTerminateByID(t *testing.T) {
|
|||
insertTestPeerings(t, s)
|
||||
|
||||
// id corresponding to default/foo
|
||||
id := "9e650110-ac74-4c5a-a6a8-9348b2bed4e9"
|
||||
const id = testFooPeerID
|
||||
|
||||
require.NoError(t, s.PeeringTerminateByID(10, id))
|
||||
|
||||
|
@ -607,7 +575,7 @@ func TestStateStore_PeeringTrustBundleRead(t *testing.T) {
|
|||
run := func(t *testing.T, tc testcase) {
|
||||
_, ptb, err := s.PeeringTrustBundleRead(nil, tc.query)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expect, ptb)
|
||||
prototest.AssertDeepEqual(t, tc.expect, ptb)
|
||||
}
|
||||
|
||||
entMeta := structs.NodeEnterpriseMetaInDefaultPartition()
|
||||
|
@ -708,6 +676,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
|||
|
||||
lastIdx++
|
||||
require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testUUID(),
|
||||
Name: "my-peering",
|
||||
}))
|
||||
|
||||
|
@ -1000,6 +969,9 @@ func TestStateStore_PeeringsForService(t *testing.T) {
|
|||
var lastIdx uint64
|
||||
// Create peerings
|
||||
for _, tp := range tc.peerings {
|
||||
if tp.peering.ID == "" {
|
||||
tp.peering.ID = testUUID()
|
||||
}
|
||||
lastIdx++
|
||||
require.NoError(t, s.PeeringWrite(lastIdx, tp.peering))
|
||||
|
||||
|
@ -1009,6 +981,7 @@ func TestStateStore_PeeringsForService(t *testing.T) {
|
|||
lastIdx++
|
||||
|
||||
copied := pbpeering.Peering{
|
||||
ID: tp.peering.ID,
|
||||
Name: tp.peering.Name,
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}
|
||||
|
@ -1247,6 +1220,11 @@ func TestStore_TrustBundleListByService(t *testing.T) {
|
|||
var lastIdx uint64
|
||||
ws := memdb.NewWatchSet()
|
||||
|
||||
var (
|
||||
peerID1 = testUUID()
|
||||
peerID2 = testUUID()
|
||||
)
|
||||
|
||||
testutil.RunStep(t, "no results on initial setup", func(t *testing.T) {
|
||||
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
|
||||
require.NoError(t, err)
|
||||
|
@ -1279,6 +1257,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
|
|||
testutil.RunStep(t, "creating peering does not yield trust bundles", func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, store.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID1,
|
||||
Name: "peer1",
|
||||
}))
|
||||
|
||||
|
@ -1377,6 +1356,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
|
|||
testutil.RunStep(t, "bundles for other peers are ignored", func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, store.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID2,
|
||||
Name: "peer2",
|
||||
}))
|
||||
|
||||
|
@ -1431,6 +1411,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
|
|||
testutil.RunStep(t, "deleting the peering excludes its trust bundle", func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, store.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID1,
|
||||
Name: "peer1",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}))
|
||||
|
@ -1470,7 +1451,7 @@ func TestStateStore_Peering_ListDeleted(t *testing.T) {
|
|||
err := tx.Insert(tablePeering, &pbpeering.Peering{
|
||||
Name: "foo",
|
||||
Partition: acl.DefaultPartitionName,
|
||||
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
|
||||
ID: testFooPeerID,
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
|
@ -1480,7 +1461,7 @@ func TestStateStore_Peering_ListDeleted(t *testing.T) {
|
|||
err = tx.Insert(tablePeering, &pbpeering.Peering{
|
||||
Name: "bar",
|
||||
Partition: acl.DefaultPartitionName,
|
||||
ID: "5ebcff30-5509-4858-8142-a8e580f1863f",
|
||||
ID: testBarPeerID,
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
})
|
||||
|
@ -1489,7 +1470,7 @@ func TestStateStore_Peering_ListDeleted(t *testing.T) {
|
|||
err = tx.Insert(tablePeering, &pbpeering.Peering{
|
||||
Name: "baz",
|
||||
Partition: acl.DefaultPartitionName,
|
||||
ID: "432feb2f-5476-4ae2-b33c-e43640ca0e86",
|
||||
ID: testBazPeerID,
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
CreateIndex: 3,
|
||||
ModifyIndex: 3,
|
||||
|
|
|
@ -0,0 +1,40 @@
|
|||
// Code generated by mockery v2.12.2. DO NOT EDIT.
|
||||
|
||||
package watch
|
||||
|
||||
import (
|
||||
testing "testing"
|
||||
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
)
|
||||
|
||||
// MockStateStore is an autogenerated mock type for the StateStore type
|
||||
type MockStateStore struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
// AbandonCh provides a mock function with given fields:
|
||||
func (_m *MockStateStore) AbandonCh() <-chan struct{} {
|
||||
ret := _m.Called()
|
||||
|
||||
var r0 <-chan struct{}
|
||||
if rf, ok := ret.Get(0).(func() <-chan struct{}); ok {
|
||||
r0 = rf()
|
||||
} else {
|
||||
if ret.Get(0) != nil {
|
||||
r0 = ret.Get(0).(<-chan struct{})
|
||||
}
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
|
||||
func NewMockStateStore(t testing.TB) *MockStateStore {
|
||||
mock := &MockStateStore{}
|
||||
mock.Mock.Test(t)
|
||||
|
||||
t.Cleanup(func() { mock.AssertExpectations(t) })
|
||||
|
||||
return mock
|
||||
}
|
|
@ -0,0 +1,332 @@
|
|||
package watch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
hashstructure_v2 "github.com/mitchellh/hashstructure/v2"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrorNotFound = errors.New("no data found for query")
|
||||
ErrorNotChanged = errors.New("data did not change for query")
|
||||
|
||||
errNilContext = errors.New("cannot call ServerLocalNotify with a nil context")
|
||||
errNilGetStore = errors.New("cannot call ServerLocalNotify without a callback to get a StateStore")
|
||||
errNilQuery = errors.New("cannot call ServerLocalNotify without a callback to perform the query")
|
||||
errNilNotify = errors.New("cannot call ServerLocalNotify without a callback to send notifications")
|
||||
)
|
||||
|
||||
//go:generate mockery --name StateStore --inpackage --testonly
|
||||
type StateStore interface {
|
||||
AbandonCh() <-chan struct{}
|
||||
}
|
||||
|
||||
const (
|
||||
defaultWaiterMinFailures uint = 1
|
||||
defaultWaiterMinWait = time.Second
|
||||
defaultWaiterMaxWait = 60 * time.Second
|
||||
defaultWaiterFactor = 2 * time.Second
|
||||
)
|
||||
|
||||
var (
|
||||
defaultWaiterJitter = retry.NewJitter(100)
|
||||
)
|
||||
|
||||
func defaultWaiter() *retry.Waiter {
|
||||
return &retry.Waiter{
|
||||
MinFailures: defaultWaiterMinFailures,
|
||||
MinWait: defaultWaiterMinWait,
|
||||
MaxWait: defaultWaiterMaxWait,
|
||||
Jitter: defaultWaiterJitter,
|
||||
Factor: defaultWaiterFactor,
|
||||
}
|
||||
}
|
||||
|
||||
// noopDone can be passed to serverLocalNotifyWithWaiter
|
||||
func noopDone() {}
|
||||
|
||||
// ServerLocalBlockingQuery performs a blocking query similar to the pre-existing blockingQuery
|
||||
// method on the agent/consul.Server type. There are a few key differences.
|
||||
//
|
||||
// 1. This function makes use of Go 1.18 generics. The function is parameterized with two
|
||||
// types. The first is the ResultType which can be anything. Having this be parameterized
|
||||
// instead of using interface{} allows us to simplify the call sites so that no type
|
||||
// coercion from interface{} to the real type is necessary. The second parameterized type
|
||||
// is something that VERY loosely resembles a agent/consul/state.Store type. The StateStore
|
||||
// interface in this package has a single method to get the stores abandon channel so we
|
||||
// know when a snapshot restore is occurring and can act accordingly. We could have not
|
||||
// parameterized this type and used a real *state.Store instead but then we would have
|
||||
// concrete dependencies on the state package and it would make it a little harder to
|
||||
// test this function.
|
||||
//
|
||||
// We could have also avoided the need to use a ResultType parameter by taking the route
|
||||
// the original blockingQuery method did and to just assume all callers close around
|
||||
// a pointer to their results and can modify it as necessary. That way of doing things
|
||||
// feels a little gross so I have taken this one a different direction. The old way
|
||||
// also gets especially gross with how we have to push concerns of spurious wakeup
|
||||
// suppression down into every call site.
|
||||
//
|
||||
// 2. This method has no internal timeout and can potentially run forever until a state
|
||||
// change is observed. If there is a desire to have a timeout, that should be built into
|
||||
// the context.Context passed as the first argument.
|
||||
//
|
||||
// 3. This method bakes in some newer functionality around hashing of results to prevent sending
|
||||
// back data when nothing has actually changed. With the old blockingQuery method this has to
|
||||
// be done within the closure passed to the method which means the same bit of code is duplicated
|
||||
// in many places. As this functionality isn't necessary in many scenarios whether to opt-in to
|
||||
// that behavior is a argument to this function.
|
||||
//
|
||||
// Similar to the older method:
|
||||
//
|
||||
// 1. Errors returned from the query will be propagated back to the caller.
|
||||
//
|
||||
// The query function must follow these rules:
|
||||
//
|
||||
// 1. To access data it must use the passed in StoreType (which will be a state.Store when
|
||||
// everything gets stiched together outside of unit tests).
|
||||
// 2. It must return an index greater than the minIndex if the results returned by the query
|
||||
// have changed.
|
||||
// 3. Any channels added to the memdb.WatchSet must unblock when the results
|
||||
// returned by the query have changed.
|
||||
//
|
||||
// To ensure optimal performance of the query, the query function should make a
|
||||
// best-effort attempt to follow these guidelines:
|
||||
//
|
||||
// 1. Only return an index greater than the minIndex.
|
||||
// 2. Any channels added to the memdb.WatchSet should only unblock when the
|
||||
// results returned by the query have changed. This might be difficult
|
||||
// to do when blocking on non-existent data.
|
||||
//
|
||||
func ServerLocalBlockingQuery[ResultType any, StoreType StateStore](
|
||||
ctx context.Context,
|
||||
getStore func() StoreType,
|
||||
minIndex uint64,
|
||||
suppressSpuriousWakeup bool,
|
||||
query func(memdb.WatchSet, StoreType) (uint64, ResultType, error),
|
||||
) (uint64, ResultType, error) {
|
||||
var (
|
||||
notFound bool
|
||||
ranOnce bool
|
||||
priorHash uint64
|
||||
)
|
||||
|
||||
var zeroResult ResultType
|
||||
if getStore == nil {
|
||||
return 0, zeroResult, fmt.Errorf("no getStore function was provided to ServerLocalBlockingQuery")
|
||||
}
|
||||
if query == nil {
|
||||
return 0, zeroResult, fmt.Errorf("no query function was provided to ServerLocalBlockingQuery")
|
||||
}
|
||||
|
||||
for {
|
||||
state := getStore()
|
||||
|
||||
ws := memdb.NewWatchSet()
|
||||
|
||||
// Adding the AbandonCh to the WatchSet allows us to detect when
|
||||
// a snapshot restore happens that would otherwise not modify anything
|
||||
// within the individual state store. If we didn't do this then we
|
||||
// could end up blocking indefinitely.
|
||||
ws.Add(state.AbandonCh())
|
||||
|
||||
index, result, err := query(ws, state)
|
||||
|
||||
switch {
|
||||
case errors.Is(err, ErrorNotFound):
|
||||
// if minIndex is 0 then we should never block but we
|
||||
// also should not propagate the error
|
||||
if minIndex == 0 {
|
||||
return index, result, nil
|
||||
}
|
||||
|
||||
// update the min index if the previous result was not found. This
|
||||
// is an attempt to not return data unnecessarily when we end up
|
||||
// watching the root of a memdb Radix tree because the data being
|
||||
// watched doesn't exist yet.
|
||||
if notFound {
|
||||
minIndex = index
|
||||
}
|
||||
|
||||
notFound = true
|
||||
case err != nil:
|
||||
return index, result, err
|
||||
}
|
||||
|
||||
// when enabled we can prevent sending back data that hasn't changed.
|
||||
if suppressSpuriousWakeup {
|
||||
newHash, err := hashstructure_v2.Hash(result, hashstructure_v2.FormatV2, nil)
|
||||
if err != nil {
|
||||
return index, result, fmt.Errorf("error hashing data for spurious wakeup suppression: %w", err)
|
||||
}
|
||||
|
||||
// set minIndex to the returned index to prevent sending back identical data
|
||||
if ranOnce && priorHash == newHash {
|
||||
minIndex = index
|
||||
}
|
||||
ranOnce = true
|
||||
priorHash = newHash
|
||||
}
|
||||
|
||||
// one final check if we should be considered unblocked and
|
||||
// return the value. Some conditions in the switch above
|
||||
// alter the minIndex and prevent this return if it would
|
||||
// be desirable. One such case is when the actual data has
|
||||
// not changed since the last round through the query and
|
||||
// we would rather not do any further processing for unchanged
|
||||
// data. This mostly protects against watches for data that
|
||||
// doesn't exist from return the non-existant value constantly.
|
||||
if index > minIndex {
|
||||
return index, result, nil
|
||||
}
|
||||
|
||||
// Block until something changes. Because we have added the state
|
||||
// stores AbandonCh to this watch set, a snapshot restore will
|
||||
// cause things to unblock in addition to changes to the actual
|
||||
// queried data.
|
||||
if err := ws.WatchCtx(ctx); err != nil {
|
||||
// exit if the context was cancelled
|
||||
return index, result, nil
|
||||
}
|
||||
|
||||
select {
|
||||
case <-state.AbandonCh():
|
||||
return index, result, nil
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ServerLocalNotify will watch for changes in the State Store using the provided
|
||||
// query function and invoke the notify callback whenever the results of that query
|
||||
// function have changed. This function will return an error if parameter validations
|
||||
// fail but otherwise the background go routine to process the notifications will
|
||||
// be spawned and nil will be returned. Just like ServerLocalBlockingQuery this makes
|
||||
// use of Go Generics and for the same reasons as outlined in the documentation for
|
||||
// that function.
|
||||
func ServerLocalNotify[ResultType any, StoreType StateStore](
|
||||
ctx context.Context,
|
||||
correlationID string,
|
||||
getStore func() StoreType,
|
||||
query func(memdb.WatchSet, StoreType) (uint64, ResultType, error),
|
||||
notify func(ctx context.Context, correlationID string, result ResultType, err error),
|
||||
) error {
|
||||
return serverLocalNotify(
|
||||
ctx,
|
||||
correlationID,
|
||||
getStore,
|
||||
query,
|
||||
notify,
|
||||
// Public callers should not need to know when the internal go routines are finished.
|
||||
// Being able to provide a done function to the internal version of this function is
|
||||
// to allow our tests to be more determinstic and to eliminate arbitrary sleeps.
|
||||
noopDone,
|
||||
// Public callers do not get to override the error backoff configuration. Internally
|
||||
// we want to allow for this to enable our unit tests to run much more quickly.
|
||||
defaultWaiter(),
|
||||
)
|
||||
}
|
||||
|
||||
// serverLocalNotify is the internal version of ServerLocalNotify. It takes
|
||||
// two additional arguments of the waiter to use and a function to call
|
||||
// when the notification go routine has finished
|
||||
func serverLocalNotify[ResultType any, StoreType StateStore](
|
||||
ctx context.Context,
|
||||
correlationID string,
|
||||
getStore func() StoreType,
|
||||
query func(memdb.WatchSet, StoreType) (uint64, ResultType, error),
|
||||
notify func(ctx context.Context, correlationID string, result ResultType, err error),
|
||||
done func(),
|
||||
waiter *retry.Waiter,
|
||||
) error {
|
||||
if ctx == nil {
|
||||
return errNilContext
|
||||
}
|
||||
|
||||
if getStore == nil {
|
||||
return errNilGetStore
|
||||
}
|
||||
|
||||
if query == nil {
|
||||
return errNilQuery
|
||||
}
|
||||
|
||||
if notify == nil {
|
||||
return errNilNotify
|
||||
}
|
||||
|
||||
go serverLocalNotifyRoutine(
|
||||
ctx,
|
||||
correlationID,
|
||||
getStore,
|
||||
query,
|
||||
notify,
|
||||
done,
|
||||
waiter,
|
||||
)
|
||||
return nil
|
||||
}
|
||||
|
||||
// serverLocalNotifyRoutine is the function intended to be run within a new
|
||||
// go routine to process the updates. It will not check to ensure callbacks
|
||||
// are non-nil nor perform other parameter validation. It is assumed that
|
||||
// the in-package caller of this method will have already done that. It also
|
||||
// takes the backoff waiter in as an argument so that unit tests within this
|
||||
// package can override the default values that the exported ServerLocalNotify
|
||||
// function would have set up.
|
||||
func serverLocalNotifyRoutine[ResultType any, StoreType StateStore](
|
||||
ctx context.Context,
|
||||
correlationID string,
|
||||
getStore func() StoreType,
|
||||
query func(memdb.WatchSet, StoreType) (uint64, ResultType, error),
|
||||
notify func(ctx context.Context, correlationID string, result ResultType, err error),
|
||||
done func(),
|
||||
waiter *retry.Waiter,
|
||||
) {
|
||||
defer done()
|
||||
|
||||
var minIndex uint64
|
||||
|
||||
for {
|
||||
// Check if the context has been cancelled. Do not issue
|
||||
// more queries if it has been cancelled.
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Perform the blocking query
|
||||
index, result, err := ServerLocalBlockingQuery(ctx, getStore, minIndex, true, query)
|
||||
|
||||
// Check if the context has been cancelled. If it has we should not send more
|
||||
// notifications.
|
||||
if ctx.Err() != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Check the index to see if we should call notify
|
||||
if minIndex == 0 || minIndex < index {
|
||||
notify(ctx, correlationID, result, err)
|
||||
minIndex = index
|
||||
}
|
||||
|
||||
// Handle errors with backoff. Badly behaved blocking calls that returned
|
||||
// a zero index are considered as failures since we need to not get stuck
|
||||
// in a busy loop.
|
||||
if err == nil && index > 0 {
|
||||
waiter.Reset()
|
||||
} else {
|
||||
if waiter.Wait(ctx) != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// ensure we don't use zero indexes
|
||||
if err == nil && minIndex < 1 {
|
||||
minIndex = 1
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,424 @@
|
|||
package watch
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/lib/retry"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
mock "github.com/stretchr/testify/mock"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
type mockStoreProvider struct {
|
||||
mock.Mock
|
||||
}
|
||||
|
||||
func newMockStoreProvider(t *testing.T) *mockStoreProvider {
|
||||
t.Helper()
|
||||
provider := &mockStoreProvider{}
|
||||
t.Cleanup(func() {
|
||||
provider.AssertExpectations(t)
|
||||
})
|
||||
return provider
|
||||
}
|
||||
|
||||
func (m *mockStoreProvider) getStore() *MockStateStore {
|
||||
return m.Called().Get(0).(*MockStateStore)
|
||||
}
|
||||
|
||||
type testResult struct {
|
||||
value string
|
||||
}
|
||||
|
||||
func (m *mockStoreProvider) query(ws memdb.WatchSet, store *MockStateStore) (uint64, *testResult, error) {
|
||||
ret := m.Called(ws, store)
|
||||
|
||||
index := ret.Get(0).(uint64)
|
||||
result := ret.Get(1).(*testResult)
|
||||
err := ret.Error(2)
|
||||
|
||||
return index, result, err
|
||||
}
|
||||
|
||||
func (m *mockStoreProvider) notify(ctx context.Context, correlationID string, result *testResult, err error) {
|
||||
m.Called(ctx, correlationID, result, err)
|
||||
}
|
||||
|
||||
func TestServerLocalBlockingQuery_getStoreNotProvided(t *testing.T) {
|
||||
_, _, err := ServerLocalBlockingQuery(
|
||||
context.Background(),
|
||||
nil,
|
||||
0,
|
||||
true,
|
||||
func(memdb.WatchSet, *MockStateStore) (uint64, struct{}, error) {
|
||||
return 0, struct{}{}, nil
|
||||
},
|
||||
)
|
||||
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "no getStore function was provided")
|
||||
}
|
||||
|
||||
func TestServerLocalBlockingQuery_queryNotProvided(t *testing.T) {
|
||||
var query func(memdb.WatchSet, *MockStateStore) (uint64, struct{}, error)
|
||||
_, _, err := ServerLocalBlockingQuery(
|
||||
context.Background(),
|
||||
func() *MockStateStore { return nil },
|
||||
0,
|
||||
true,
|
||||
query,
|
||||
)
|
||||
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "no query function was provided")
|
||||
}
|
||||
|
||||
func TestServerLocalBlockingQuery_NonBlocking(t *testing.T) {
|
||||
abandonCh := make(chan struct{})
|
||||
t.Cleanup(func() { close(abandonCh) })
|
||||
|
||||
store := NewMockStateStore(t)
|
||||
store.On("AbandonCh").
|
||||
Return(closeChan(abandonCh)).
|
||||
Once()
|
||||
|
||||
provider := newMockStoreProvider(t)
|
||||
provider.On("getStore").Return(store).Once()
|
||||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(1), &testResult{value: "foo"}, nil).
|
||||
Once()
|
||||
|
||||
idx, result, err := ServerLocalBlockingQuery(
|
||||
context.Background(),
|
||||
provider.getStore,
|
||||
0,
|
||||
true,
|
||||
provider.query,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, idx)
|
||||
require.Equal(t, &testResult{value: "foo"}, result)
|
||||
}
|
||||
|
||||
func TestServerLocalBlockingQuery_NotFound(t *testing.T) {
|
||||
abandonCh := make(chan struct{})
|
||||
t.Cleanup(func() { close(abandonCh) })
|
||||
|
||||
store := NewMockStateStore(t)
|
||||
store.On("AbandonCh").
|
||||
Return(closeChan(abandonCh)).
|
||||
Once()
|
||||
|
||||
provider := newMockStoreProvider(t)
|
||||
provider.On("getStore").
|
||||
Return(store).
|
||||
Once()
|
||||
|
||||
var nilResult *testResult
|
||||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(1), nilResult, ErrorNotFound).
|
||||
Once()
|
||||
|
||||
idx, result, err := ServerLocalBlockingQuery(
|
||||
context.Background(),
|
||||
provider.getStore,
|
||||
0,
|
||||
true,
|
||||
provider.query,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 1, idx)
|
||||
require.Nil(t, result)
|
||||
}
|
||||
|
||||
func TestServerLocalBlockingQuery_NotFoundBlocks(t *testing.T) {
|
||||
abandonCh := make(chan struct{})
|
||||
t.Cleanup(func() { close(abandonCh) })
|
||||
|
||||
store := NewMockStateStore(t)
|
||||
store.On("AbandonCh").
|
||||
Return(closeChan(abandonCh)).
|
||||
Times(5)
|
||||
|
||||
provider := newMockStoreProvider(t)
|
||||
provider.On("getStore").
|
||||
Return(store).
|
||||
Times(3)
|
||||
|
||||
var nilResult *testResult
|
||||
// Initial data returned is not found and has an index less than the original
|
||||
// blocking index. This should not return data to the caller.
|
||||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(4), nilResult, ErrorNotFound).
|
||||
Run(addReadyWatchSet).
|
||||
Once()
|
||||
// There is an update to the data but the value still doesn't exist. Therefore
|
||||
// we should not return data to the caller.
|
||||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(6), nilResult, ErrorNotFound).
|
||||
Run(addReadyWatchSet).
|
||||
Once()
|
||||
// Finally we have some real data and can return it to the caller.
|
||||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(7), &testResult{value: "foo"}, nil).
|
||||
Once()
|
||||
|
||||
idx, result, err := ServerLocalBlockingQuery(
|
||||
context.Background(),
|
||||
provider.getStore,
|
||||
5,
|
||||
true,
|
||||
provider.query,
|
||||
)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 7, idx)
|
||||
require.Equal(t, &testResult{value: "foo"}, result)
|
||||
}
|
||||
|
||||
func TestServerLocalBlockingQuery_Error(t *testing.T) {
|
||||
abandonCh := make(chan struct{})
|
||||
t.Cleanup(func() { close(abandonCh) })
|
||||
|
||||
store := NewMockStateStore(t)
|
||||
store.On("AbandonCh").
|
||||
Return(closeChan(abandonCh)).
|
||||
Once()
|
||||
|
||||
provider := newMockStoreProvider(t)
|
||||
provider.On("getStore").
|
||||
Return(store).
|
||||
Once()
|
||||
|
||||
var nilResult *testResult
|
||||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(10), nilResult, fmt.Errorf("synthetic error")).
|
||||
Once()
|
||||
|
||||
idx, result, err := ServerLocalBlockingQuery(
|
||||
context.Background(),
|
||||
provider.getStore,
|
||||
4,
|
||||
true,
|
||||
provider.query,
|
||||
)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), "synthetic error")
|
||||
require.EqualValues(t, 10, idx)
|
||||
require.Nil(t, result)
|
||||
}
|
||||
|
||||
func TestServerLocalBlockingQuery_ContextCancellation(t *testing.T) {
|
||||
abandonCh := make(chan struct{})
|
||||
t.Cleanup(func() { close(abandonCh) })
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
store := NewMockStateStore(t)
|
||||
store.On("AbandonCh").
|
||||
Return(closeChan(abandonCh)).
|
||||
Once()
|
||||
|
||||
provider := newMockStoreProvider(t)
|
||||
provider.On("getStore").
|
||||
Return(store).
|
||||
Once()
|
||||
provider.On("query", mock.Anything, store).
|
||||
// Return an index that should not cause the blocking query to return.
|
||||
Return(uint64(4), &testResult{value: "foo"}, nil).
|
||||
Once().
|
||||
Run(func(_ mock.Arguments) {
|
||||
// Cancel the context so that the memdb WatchCtx call will error.
|
||||
cancel()
|
||||
})
|
||||
|
||||
idx, result, err := ServerLocalBlockingQuery(
|
||||
ctx,
|
||||
provider.getStore,
|
||||
8,
|
||||
true,
|
||||
provider.query,
|
||||
)
|
||||
// The internal cancellation error should not be propagated.
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 4, idx)
|
||||
require.Equal(t, &testResult{value: "foo"}, result)
|
||||
}
|
||||
|
||||
func TestServerLocalBlockingQuery_StateAbandoned(t *testing.T) {
|
||||
abandonCh := make(chan struct{})
|
||||
|
||||
store := NewMockStateStore(t)
|
||||
store.On("AbandonCh").
|
||||
Return(closeChan(abandonCh)).
|
||||
Twice()
|
||||
|
||||
provider := newMockStoreProvider(t)
|
||||
provider.On("getStore").
|
||||
Return(store).
|
||||
Once()
|
||||
provider.On("query", mock.Anything, store).
|
||||
// Return an index that should not cause the blocking query to return.
|
||||
Return(uint64(4), &testResult{value: "foo"}, nil).
|
||||
Once().
|
||||
Run(func(_ mock.Arguments) {
|
||||
// Cancel the context so that the memdb WatchCtx call will error.
|
||||
close(abandonCh)
|
||||
})
|
||||
|
||||
idx, result, err := ServerLocalBlockingQuery(
|
||||
context.Background(),
|
||||
provider.getStore,
|
||||
8,
|
||||
true,
|
||||
provider.query,
|
||||
)
|
||||
// The internal cancellation error should not be propagated.
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, 4, idx)
|
||||
require.Equal(t, &testResult{value: "foo"}, result)
|
||||
}
|
||||
|
||||
func TestServerLocalNotify_Validations(t *testing.T) {
|
||||
provider := newMockStoreProvider(t)
|
||||
|
||||
type testCase struct {
|
||||
ctx context.Context
|
||||
getStore func() *MockStateStore
|
||||
query func(memdb.WatchSet, *MockStateStore) (uint64, *testResult, error)
|
||||
notify func(context.Context, string, *testResult, error)
|
||||
err error
|
||||
}
|
||||
|
||||
cases := map[string]testCase{
|
||||
"nil-context": {
|
||||
getStore: provider.getStore,
|
||||
query: provider.query,
|
||||
notify: provider.notify,
|
||||
err: errNilContext,
|
||||
},
|
||||
"nil-getStore": {
|
||||
ctx: context.Background(),
|
||||
query: provider.query,
|
||||
notify: provider.notify,
|
||||
err: errNilGetStore,
|
||||
},
|
||||
"nil-query": {
|
||||
ctx: context.Background(),
|
||||
getStore: provider.getStore,
|
||||
notify: provider.notify,
|
||||
err: errNilQuery,
|
||||
},
|
||||
"nil-notify": {
|
||||
ctx: context.Background(),
|
||||
getStore: provider.getStore,
|
||||
query: provider.query,
|
||||
err: errNilNotify,
|
||||
},
|
||||
}
|
||||
|
||||
for name, tcase := range cases {
|
||||
t.Run(name, func(t *testing.T) {
|
||||
err := ServerLocalNotify(tcase.ctx, "test", tcase.getStore, tcase.query, tcase.notify)
|
||||
require.ErrorIs(t, err, tcase.err)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestServerLocalNotify(t *testing.T) {
|
||||
notifyCtx, notifyCancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(notifyCancel)
|
||||
|
||||
abandonCh := make(chan struct{})
|
||||
|
||||
store := NewMockStateStore(t)
|
||||
store.On("AbandonCh").
|
||||
Return(closeChan(abandonCh)).
|
||||
Times(3)
|
||||
|
||||
provider := newMockStoreProvider(t)
|
||||
provider.On("getStore").
|
||||
Return(store).
|
||||
Times(3)
|
||||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(4), &testResult{value: "foo"}, nil).
|
||||
Once()
|
||||
provider.On("notify", notifyCtx, t.Name(), &testResult{value: "foo"}, nil).Once()
|
||||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(6), &testResult{value: "bar"}, nil).
|
||||
Once()
|
||||
provider.On("notify", notifyCtx, t.Name(), &testResult{value: "bar"}, nil).Once()
|
||||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(7), &testResult{value: "baz"}, context.Canceled).
|
||||
Run(func(mock.Arguments) {
|
||||
notifyCancel()
|
||||
})
|
||||
|
||||
doneCtx, routineDone := context.WithCancel(context.Background())
|
||||
err := serverLocalNotify(notifyCtx, t.Name(), provider.getStore, provider.query, provider.notify, routineDone, defaultWaiter())
|
||||
require.NoError(t, err)
|
||||
|
||||
// Wait for the context cancellation which will happen when the "query" func is run the third time. The doneCtx gets "cancelled"
|
||||
// by the backgrounded go routine when it is actually finished. We need to wait for this to ensure that all mocked calls have been
|
||||
// made and that no extra calls get made.
|
||||
<-doneCtx.Done()
|
||||
}
|
||||
|
||||
func TestServerLocalNotify_internal(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
abandonCh := make(chan struct{})
|
||||
|
||||
store := NewMockStateStore(t)
|
||||
store.On("AbandonCh").
|
||||
Return(closeChan(abandonCh)).
|
||||
Times(4)
|
||||
|
||||
var nilResult *testResult
|
||||
|
||||
provider := newMockStoreProvider(t)
|
||||
provider.On("getStore").
|
||||
Return(store).
|
||||
Times(4)
|
||||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(0), nilResult, fmt.Errorf("injected error")).
|
||||
Times(3)
|
||||
provider.On("notify", ctx, "test", nilResult, fmt.Errorf("injected error")).
|
||||
Times(3)
|
||||
provider.On("query", mock.Anything, store).
|
||||
Return(uint64(7), &testResult{value: "foo"}, nil).
|
||||
Once()
|
||||
provider.On("notify", ctx, "test", &testResult{value: "foo"}, nil).
|
||||
Once().
|
||||
Run(func(mock.Arguments) {
|
||||
cancel()
|
||||
})
|
||||
waiter := retry.Waiter{
|
||||
MinFailures: 1,
|
||||
MinWait: time.Millisecond,
|
||||
MaxWait: 50 * time.Millisecond,
|
||||
Jitter: retry.NewJitter(100),
|
||||
Factor: 2 * time.Millisecond,
|
||||
}
|
||||
|
||||
// all the mock expectations should ensure things are working properly
|
||||
serverLocalNotifyRoutine(ctx, "test", provider.getStore, provider.query, provider.notify, noopDone, &waiter)
|
||||
}
|
||||
|
||||
func addReadyWatchSet(args mock.Arguments) {
|
||||
ws := args.Get(0).(memdb.WatchSet)
|
||||
ch := make(chan struct{})
|
||||
ws.Add(ch)
|
||||
close(ch)
|
||||
}
|
||||
|
||||
// small convenience to make this more readable. The alternative in a few
|
||||
// cases would be to do something like (<-chan struct{})(ch). I find that
|
||||
// syntax very difficult to read.
|
||||
func closeChan(ch chan struct{}) <-chan struct{} {
|
||||
return ch
|
||||
}
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/dns"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
)
|
||||
|
||||
|
@ -140,6 +141,7 @@ type Store interface {
|
|||
|
||||
// Apply provides a write-only interface for persisting Peering data.
|
||||
type Apply interface {
|
||||
CheckPeeringUUID(id string) (bool, error)
|
||||
PeeringWrite(req *pbpeering.PeeringWriteRequest) error
|
||||
PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
|
||||
PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
|
||||
|
@ -189,8 +191,16 @@ func (s *Service) GenerateToken(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
canRetry := true
|
||||
RETRY_ONCE:
|
||||
id, err := s.getExistingOrCreateNewPeerID(req.PeerName, req.Partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
writeReq := pbpeering.PeeringWriteRequest{
|
||||
Peering: &pbpeering.Peering{
|
||||
ID: id,
|
||||
Name: req.PeerName,
|
||||
// TODO(peering): Normalize from ACL token once this endpoint is guarded by ACLs.
|
||||
Partition: req.PartitionOrDefault(),
|
||||
|
@ -198,6 +208,15 @@ func (s *Service) GenerateToken(
|
|||
},
|
||||
}
|
||||
if err := s.Backend.Apply().PeeringWrite(&writeReq); err != nil {
|
||||
// There's a possible race where two servers call Generate Token at the
|
||||
// same time with the same peer name for the first time. They both
|
||||
// generate an ID and try to insert and only one wins. This detects the
|
||||
// collision and forces the loser to discard its generated ID and use
|
||||
// the one from the other server.
|
||||
if canRetry && strings.Contains(err.Error(), "A peering already exists with the name") {
|
||||
canRetry = false
|
||||
goto RETRY_ONCE
|
||||
}
|
||||
return nil, fmt.Errorf("failed to write peering: %w", err)
|
||||
}
|
||||
|
||||
|
@ -270,6 +289,11 @@ func (s *Service) Establish(
|
|||
serverAddrs[i] = addr
|
||||
}
|
||||
|
||||
id, err := s.getExistingOrCreateNewPeerID(req.PeerName, req.Partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// as soon as a peering is written with a list of ServerAddresses that is
|
||||
// non-empty, the leader routine will see the peering and attempt to
|
||||
// establish a connection with the remote peer.
|
||||
|
@ -278,6 +302,7 @@ func (s *Service) Establish(
|
|||
// RemotePeerID(PeerID) but at this point the other peer does not.
|
||||
writeReq := &pbpeering.PeeringWriteRequest{
|
||||
Peering: &pbpeering.Peering{
|
||||
ID: id,
|
||||
Name: req.PeerName,
|
||||
PeerCAPems: tok.CA,
|
||||
PeerServerAddresses: serverAddrs,
|
||||
|
@ -368,6 +393,16 @@ func (s *Service) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteR
|
|||
defer metrics.MeasureSince([]string{"peering", "write"}, time.Now())
|
||||
// TODO(peering): ACL check request token
|
||||
|
||||
if req.Peering == nil {
|
||||
return nil, fmt.Errorf("missing required peering body")
|
||||
}
|
||||
|
||||
id, err := s.getExistingOrCreateNewPeerID(req.Peering.Name, req.Peering.Partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Peering.ID = id
|
||||
|
||||
// TODO(peering): handle blocking queries
|
||||
err = s.Backend.Apply().PeeringWrite(req)
|
||||
if err != nil {
|
||||
|
@ -418,6 +453,7 @@ func (s *Service) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelet
|
|||
// We only need to include the name and partition for the peering to be identified.
|
||||
// All other data associated with the peering can be discarded because once marked
|
||||
// for deletion the peering is effectively gone.
|
||||
ID: existing.ID,
|
||||
Name: req.Name,
|
||||
Partition: req.Partition,
|
||||
DeletedAt: structs.TimeToProto(time.Now().UTC()),
|
||||
|
@ -837,6 +873,26 @@ func getTrustDomain(store Store, logger hclog.Logger) (string, error) {
|
|||
return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil
|
||||
}
|
||||
|
||||
func (s *Service) getExistingOrCreateNewPeerID(peerName, partition string) (string, error) {
|
||||
q := state.Query{
|
||||
Value: strings.ToLower(peerName),
|
||||
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(partition),
|
||||
}
|
||||
_, peering, err := s.Backend.Store().PeeringRead(nil, q)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if peering != nil {
|
||||
return peering.ID, nil
|
||||
}
|
||||
|
||||
id, err := lib.GenerateUUID(s.Backend.Apply().CheckPeeringUUID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool) {
|
||||
return s.streams.streamStatus(peer)
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
"github.com/hashicorp/consul/proto/prototest"
|
||||
|
@ -224,6 +225,7 @@ func TestPeeringService_Read(t *testing.T) {
|
|||
|
||||
// insert peering directly to state store
|
||||
p := &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "foo",
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
PeerCAPems: nil,
|
||||
|
@ -279,6 +281,7 @@ func TestPeeringService_Delete(t *testing.T) {
|
|||
s := newTestServer(t, nil)
|
||||
|
||||
p := &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "foo",
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
PeerCAPems: nil,
|
||||
|
@ -316,6 +319,7 @@ func TestPeeringService_List(t *testing.T) {
|
|||
// Note that the state store holds reference to the underlying
|
||||
// variables; do not modify them after writing.
|
||||
foo := &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "foo",
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
PeerCAPems: nil,
|
||||
|
@ -324,6 +328,7 @@ func TestPeeringService_List(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(10, foo))
|
||||
bar := &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "bar",
|
||||
State: pbpeering.PeeringState_ACTIVE,
|
||||
PeerCAPems: nil,
|
||||
|
@ -405,6 +410,7 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) {
|
|||
|
||||
lastIdx++
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "foo",
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
PeerServerName: "test",
|
||||
|
@ -413,6 +419,7 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) {
|
|||
|
||||
lastIdx++
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "bar",
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
PeerServerName: "test-bar",
|
||||
|
@ -513,6 +520,7 @@ func Test_StreamHandler_UpsertServices(t *testing.T) {
|
|||
)
|
||||
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(0, &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "my-peer",
|
||||
}))
|
||||
|
||||
|
@ -998,7 +1006,9 @@ func newDefaultDeps(t *testing.T, c *consul.Config) consul.Deps {
|
|||
}
|
||||
|
||||
func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string {
|
||||
t.Helper()
|
||||
err := store.PeeringWrite(index, &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: name,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -1009,3 +1019,9 @@ func setupTestPeering(t *testing.T, store *state.Store, name string, index uint6
|
|||
|
||||
return p.ID
|
||||
}
|
||||
|
||||
func testUUID(t *testing.T) string {
|
||||
v, err := lib.GenerateUUID(nil)
|
||||
require.NoError(t, err)
|
||||
return v
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/proto/pbcommon"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
|
@ -1030,6 +1031,10 @@ type testApplier struct {
|
|||
store *state.Store
|
||||
}
|
||||
|
||||
func (a *testApplier) CheckPeeringUUID(id string) (bool, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (a *testApplier) PeeringWrite(req *pbpeering.PeeringWriteRequest) error {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
@ -1216,6 +1221,7 @@ func writeEstablishedPeering(t *testing.T, store *state.Store, idx uint64, peerN
|
|||
require.NoError(t, err)
|
||||
|
||||
peering := pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: peerName,
|
||||
PeerID: remotePeerID,
|
||||
}
|
||||
|
@ -2169,5 +2175,10 @@ func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes)
|
|||
require.Equal(t, expect[i].Checks[j].PartitionOrDefault(), got[i].Checks[j].PartitionOrDefault(), "partition mismatch")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testUUID(t *testing.T) string {
|
||||
v, err := lib.GenerateUUID(nil)
|
||||
require.NoError(t, err)
|
||||
return v
|
||||
}
|
||||
|
|
|
@ -589,6 +589,7 @@ func (b *testSubscriptionBackend) ensureCARoots(t *testing.T, roots ...*structs.
|
|||
|
||||
func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string {
|
||||
err := store.PeeringWrite(index, &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: name,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
|
|
@ -14,11 +14,11 @@ require (
|
|||
require (
|
||||
github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect
|
||||
github.com/Microsoft/go-winio v0.4.17 // indirect
|
||||
github.com/Microsoft/hcsshim v0.8.23 // indirect
|
||||
github.com/Microsoft/hcsshim v0.8.24 // indirect
|
||||
github.com/armon/go-metrics v0.3.10 // indirect
|
||||
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
|
||||
github.com/containerd/cgroups v1.0.1 // indirect
|
||||
github.com/containerd/containerd v1.5.9 // indirect
|
||||
github.com/containerd/cgroups v1.0.3 // indirect
|
||||
github.com/containerd/containerd v1.5.13 // indirect
|
||||
github.com/davecgh/go-spew v1.1.1 // indirect
|
||||
github.com/docker/distribution v2.7.1+incompatible // indirect
|
||||
github.com/docker/go-connections v0.4.0 // indirect
|
||||
|
|
|
@ -55,8 +55,9 @@ github.com/Microsoft/hcsshim v0.8.9/go.mod h1:5692vkUqntj1idxauYlpoINNKeqCiG6Sg3
|
|||
github.com/Microsoft/hcsshim v0.8.14/go.mod h1:NtVKoYxQuTLx6gEq0L96c9Ju4JbRJ4nY2ow3VK6a9Lg=
|
||||
github.com/Microsoft/hcsshim v0.8.15/go.mod h1:x38A4YbHbdxJtc0sF6oIz+RG0npwSCAvn69iY6URG00=
|
||||
github.com/Microsoft/hcsshim v0.8.16/go.mod h1:o5/SZqmR7x9JNKsW3pu+nqHm0MF8vbA+VxGOoXdC600=
|
||||
github.com/Microsoft/hcsshim v0.8.23 h1:47MSwtKGXet80aIn+7h4YI6fwPmwIghAnsx2aOUrG2M=
|
||||
github.com/Microsoft/hcsshim v0.8.23/go.mod h1:4zegtUJth7lAvFyc6cH2gGQ5B3OFQim01nnU2M8jKDg=
|
||||
github.com/Microsoft/hcsshim v0.8.24 h1:jP+GMeRXIR1sH1kG4lJr9ShmSjVrua5jmFZDtfYGkn4=
|
||||
github.com/Microsoft/hcsshim v0.8.24/go.mod h1:4zegtUJth7lAvFyc6cH2gGQ5B3OFQim01nnU2M8jKDg=
|
||||
github.com/Microsoft/hcsshim/test v0.0.0-20201218223536-d3e5debf77da/go.mod h1:5hlzMzRKMLyo42nCZ9oml8AdTlq/0cvIaBv6tK1RehU=
|
||||
github.com/Microsoft/hcsshim/test v0.0.0-20210227013316-43a75bb4edd3/go.mod h1:mw7qgWloBUl75W/gVH3cQszUg1+gUITj7D6NY7ywVnY=
|
||||
github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ=
|
||||
|
@ -130,8 +131,9 @@ github.com/containerd/cgroups v0.0.0-20200531161412-0dbf7f05ba59/go.mod h1:pA0z1
|
|||
github.com/containerd/cgroups v0.0.0-20200710171044-318312a37340/go.mod h1:s5q4SojHctfxANBDvMeIaIovkq29IP48TKAxnhYRxvo=
|
||||
github.com/containerd/cgroups v0.0.0-20200824123100-0b889c03f102/go.mod h1:s5q4SojHctfxANBDvMeIaIovkq29IP48TKAxnhYRxvo=
|
||||
github.com/containerd/cgroups v0.0.0-20210114181951-8a68de567b68/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE=
|
||||
github.com/containerd/cgroups v1.0.1 h1:iJnMvco9XGvKUvNQkv88bE4uJXxRQH18efbKo9w5vHQ=
|
||||
github.com/containerd/cgroups v1.0.1/go.mod h1:0SJrPIenamHDcZhEcJMNBB85rHcUsw4f25ZfBiPYRkU=
|
||||
github.com/containerd/cgroups v1.0.3 h1:ADZftAkglvCiD44c77s5YmMqaP2pzVCFZvBmAlBdAP4=
|
||||
github.com/containerd/cgroups v1.0.3/go.mod h1:/ofk34relqNjSGyqPrmEULrO4Sc8LJhvJmWbUCUKqj8=
|
||||
github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw=
|
||||
github.com/containerd/console v0.0.0-20181022165439-0650fd9eeb50/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw=
|
||||
github.com/containerd/console v0.0.0-20191206165004-02ecf6a7291e/go.mod h1:8Pf4gM6VEbTNRIT26AyyU7hxdQU3MvAvxVI0sc00XBE=
|
||||
|
@ -150,8 +152,9 @@ github.com/containerd/containerd v1.5.0-beta.1/go.mod h1:5HfvG1V2FsKesEGQ17k5/T7
|
|||
github.com/containerd/containerd v1.5.0-beta.3/go.mod h1:/wr9AVtEM7x9c+n0+stptlo/uBBoBORwEx6ardVcmKU=
|
||||
github.com/containerd/containerd v1.5.0-beta.4/go.mod h1:GmdgZd2zA2GYIBZ0w09ZvgqEq8EfBp/m3lcVZIvPHhI=
|
||||
github.com/containerd/containerd v1.5.0-rc.0/go.mod h1:V/IXoMqNGgBlabz3tHD2TWDoTJseu1FGOKuoA4nNb2s=
|
||||
github.com/containerd/containerd v1.5.9 h1:rs6Xg1gtIxaeyG+Smsb/0xaSDu1VgFhOCKBXxMxbsF4=
|
||||
github.com/containerd/containerd v1.5.9/go.mod h1:fvQqCfadDGga5HZyn3j4+dx56qj2I9YwBrlSdalvJYQ=
|
||||
github.com/containerd/containerd v1.5.13 h1:XqvKw9i4P7/mFrC3TSM7yV5cwFZ9avXe6M3YANKnzEE=
|
||||
github.com/containerd/containerd v1.5.13/go.mod h1:3AlCrzKROjIuP3JALsY14n8YtntaUDBu7vek+rPN5Vc=
|
||||
github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
|
||||
github.com/containerd/continuity v0.0.0-20190815185530-f2a389ac0a02/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
|
||||
github.com/containerd/continuity v0.0.0-20191127005431-f65d91d395eb/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y=
|
||||
|
@ -699,6 +702,7 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q
|
|||
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
|
||||
github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX5oPXxHm3bOH+xeAttToC8pqch2ScQN/JoXYupl6xs=
|
||||
github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA=
|
||||
github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg=
|
||||
|
@ -715,6 +719,7 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
|
|||
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
|
||||
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
|
||||
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
|
||||
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
|
||||
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
|
||||
golang.org/x/crypto v0.0.0-20171113213409-9f005a07e0d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
|
||||
|
@ -761,6 +766,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB
|
|||
golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
|
||||
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
|
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
|
||||
|
@ -797,6 +803,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
|
|||
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
|
||||
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
|
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
|
||||
golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8=
|
||||
golang.org/x/net v0.0.0-20211108170745-6635138e15ea/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
|
||||
golang.org/x/net v0.0.0-20211216030914-fe4d6282115f h1:hEYJvxw1lSnWIl8X9ofsYMklzaDs90JI2az5YMd4fPM=
|
||||
|
@ -886,6 +893,7 @@ golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7w
|
|||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
|
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
|
@ -946,6 +954,7 @@ golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjs
|
|||
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
|
||||
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
|
||||
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
|
||||
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
|
||||
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
|
||||
|
|
|
@ -15,9 +15,9 @@ For every release of Consul on Kubernetes, a Helm chart, `consul-k8s-control-pla
|
|||
Starting with Consul Kubernetes 0.33.0, Consul Kubernetes versions all of its components (`consul-k8s` CLI, `consul-k8s-control-plane`, and Helm chart) with a single semantic version.
|
||||
|
||||
| Consul Version | Compatible consul-k8s Versions |
|
||||
| -------------- | ------------------------------- |
|
||||
| -------------- | -------------------------------- |
|
||||
| 1.12.x | 0.43.0 - latest |
|
||||
| 1.11.x | 0.39.0 - 0.42.0, 0.44.0 |
|
||||
| 1.11.x | 0.39.0 - 0.42.0, 0.44.0 - latest |
|
||||
| 1.10.x | 0.33.0 - 0.38.0 |
|
||||
|
||||
### Prior to version 0.33.0
|
||||
|
|
Loading…
Reference in New Issue