From 355cbfa7668a9e5b89ad17bdb4ad91b2fb7b33a9 Mon Sep 17 00:00:00 2001 From: Dhia Ayachi Date: Tue, 21 Jun 2022 12:20:00 -0400 Subject: [PATCH 1/4] update github.com/containerd/containerd to 1.5.13 (#13520) --- test/integration/consul-container/go.mod | 6 +++--- test/integration/consul-container/go.sum | 15 ++++++++++++--- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/test/integration/consul-container/go.mod b/test/integration/consul-container/go.mod index b5a7b17384..4dea73bbab 100644 --- a/test/integration/consul-container/go.mod +++ b/test/integration/consul-container/go.mod @@ -14,11 +14,11 @@ require ( require ( github.com/Azure/go-ansiterm v0.0.0-20210617225240-d185dfc1b5a1 // indirect github.com/Microsoft/go-winio v0.4.17 // indirect - github.com/Microsoft/hcsshim v0.8.23 // indirect + github.com/Microsoft/hcsshim v0.8.24 // indirect github.com/armon/go-metrics v0.3.10 // indirect github.com/cenkalti/backoff/v4 v4.1.2 // indirect - github.com/containerd/cgroups v1.0.1 // indirect - github.com/containerd/containerd v1.5.9 // indirect + github.com/containerd/cgroups v1.0.3 // indirect + github.com/containerd/containerd v1.5.13 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/docker/distribution v2.7.1+incompatible // indirect github.com/docker/go-connections v0.4.0 // indirect diff --git a/test/integration/consul-container/go.sum b/test/integration/consul-container/go.sum index 7d67cf3caf..4bc58fd5c1 100644 --- a/test/integration/consul-container/go.sum +++ b/test/integration/consul-container/go.sum @@ -55,8 +55,9 @@ github.com/Microsoft/hcsshim v0.8.9/go.mod h1:5692vkUqntj1idxauYlpoINNKeqCiG6Sg3 github.com/Microsoft/hcsshim v0.8.14/go.mod h1:NtVKoYxQuTLx6gEq0L96c9Ju4JbRJ4nY2ow3VK6a9Lg= github.com/Microsoft/hcsshim v0.8.15/go.mod h1:x38A4YbHbdxJtc0sF6oIz+RG0npwSCAvn69iY6URG00= github.com/Microsoft/hcsshim v0.8.16/go.mod h1:o5/SZqmR7x9JNKsW3pu+nqHm0MF8vbA+VxGOoXdC600= -github.com/Microsoft/hcsshim v0.8.23 h1:47MSwtKGXet80aIn+7h4YI6fwPmwIghAnsx2aOUrG2M= github.com/Microsoft/hcsshim v0.8.23/go.mod h1:4zegtUJth7lAvFyc6cH2gGQ5B3OFQim01nnU2M8jKDg= +github.com/Microsoft/hcsshim v0.8.24 h1:jP+GMeRXIR1sH1kG4lJr9ShmSjVrua5jmFZDtfYGkn4= +github.com/Microsoft/hcsshim v0.8.24/go.mod h1:4zegtUJth7lAvFyc6cH2gGQ5B3OFQim01nnU2M8jKDg= github.com/Microsoft/hcsshim/test v0.0.0-20201218223536-d3e5debf77da/go.mod h1:5hlzMzRKMLyo42nCZ9oml8AdTlq/0cvIaBv6tK1RehU= github.com/Microsoft/hcsshim/test v0.0.0-20210227013316-43a75bb4edd3/go.mod h1:mw7qgWloBUl75W/gVH3cQszUg1+gUITj7D6NY7ywVnY= github.com/NYTimes/gziphandler v0.0.0-20170623195520-56545f4a5d46/go.mod h1:3wb06e3pkSAbeQ52E9H9iFoQsEEwGN64994WTCIhntQ= @@ -130,8 +131,9 @@ github.com/containerd/cgroups v0.0.0-20200531161412-0dbf7f05ba59/go.mod h1:pA0z1 github.com/containerd/cgroups v0.0.0-20200710171044-318312a37340/go.mod h1:s5q4SojHctfxANBDvMeIaIovkq29IP48TKAxnhYRxvo= github.com/containerd/cgroups v0.0.0-20200824123100-0b889c03f102/go.mod h1:s5q4SojHctfxANBDvMeIaIovkq29IP48TKAxnhYRxvo= github.com/containerd/cgroups v0.0.0-20210114181951-8a68de567b68/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE= -github.com/containerd/cgroups v1.0.1 h1:iJnMvco9XGvKUvNQkv88bE4uJXxRQH18efbKo9w5vHQ= github.com/containerd/cgroups v1.0.1/go.mod h1:0SJrPIenamHDcZhEcJMNBB85rHcUsw4f25ZfBiPYRkU= +github.com/containerd/cgroups v1.0.3 h1:ADZftAkglvCiD44c77s5YmMqaP2pzVCFZvBmAlBdAP4= +github.com/containerd/cgroups v1.0.3/go.mod h1:/ofk34relqNjSGyqPrmEULrO4Sc8LJhvJmWbUCUKqj8= github.com/containerd/console v0.0.0-20180822173158-c12b1e7919c1/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw= github.com/containerd/console v0.0.0-20181022165439-0650fd9eeb50/go.mod h1:Tj/on1eG8kiEhd0+fhSDzsPAFESxzBBvdyEgyryXffw= github.com/containerd/console v0.0.0-20191206165004-02ecf6a7291e/go.mod h1:8Pf4gM6VEbTNRIT26AyyU7hxdQU3MvAvxVI0sc00XBE= @@ -150,8 +152,9 @@ github.com/containerd/containerd v1.5.0-beta.1/go.mod h1:5HfvG1V2FsKesEGQ17k5/T7 github.com/containerd/containerd v1.5.0-beta.3/go.mod h1:/wr9AVtEM7x9c+n0+stptlo/uBBoBORwEx6ardVcmKU= github.com/containerd/containerd v1.5.0-beta.4/go.mod h1:GmdgZd2zA2GYIBZ0w09ZvgqEq8EfBp/m3lcVZIvPHhI= github.com/containerd/containerd v1.5.0-rc.0/go.mod h1:V/IXoMqNGgBlabz3tHD2TWDoTJseu1FGOKuoA4nNb2s= -github.com/containerd/containerd v1.5.9 h1:rs6Xg1gtIxaeyG+Smsb/0xaSDu1VgFhOCKBXxMxbsF4= github.com/containerd/containerd v1.5.9/go.mod h1:fvQqCfadDGga5HZyn3j4+dx56qj2I9YwBrlSdalvJYQ= +github.com/containerd/containerd v1.5.13 h1:XqvKw9i4P7/mFrC3TSM7yV5cwFZ9avXe6M3YANKnzEE= +github.com/containerd/containerd v1.5.13/go.mod h1:3AlCrzKROjIuP3JALsY14n8YtntaUDBu7vek+rPN5Vc= github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= github.com/containerd/continuity v0.0.0-20190815185530-f2a389ac0a02/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= github.com/containerd/continuity v0.0.0-20191127005431-f65d91d395eb/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= @@ -699,6 +702,7 @@ github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yvasiyarov/go-metrics v0.0.0-20140926110328-57bccd1ccd43/go.mod h1:aX5oPXxHm3bOH+xeAttToC8pqch2ScQN/JoXYupl6xs= github.com/yvasiyarov/gorelic v0.0.0-20141212073537-a9bba5b9ab50/go.mod h1:NUSPSUX/bi6SeDMUh6brw0nXpxHnc96TguQh0+r/ssA= github.com/yvasiyarov/newrelic_platform_go v0.0.0-20140908184405-b21fdbd4370f/go.mod h1:GlGEuHIJweS1mbCqG+7vt2nvWLzLLnRHbXz5JKd/Qbg= @@ -715,6 +719,7 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= golang.org/x/crypto v0.0.0-20171113213409-9f005a07e0d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -761,6 +766,7 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191107180719-034126e5016b/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -797,6 +803,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210410081132-afb366fc7cd1/go.mod h1:9tjilg8BloeKEkVJvy7fQ90B1CfIiPueXVOjqfkSzI8= golang.org/x/net v0.0.0-20211108170745-6635138e15ea/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211216030914-fe4d6282115f h1:hEYJvxw1lSnWIl8X9ofsYMklzaDs90JI2az5YMd4fPM= @@ -886,6 +893,7 @@ golang.org/x/sys v0.0.0-20210324051608-47abb6519492/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -946,6 +954,7 @@ golang.org/x/tools v0.0.0-20200304193943-95d2e580d8eb/go.mod h1:o4KQGtdN14AW+yjs golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= +golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= From cb01702cd227692db37f24a207ab3ca0785cbc37 Mon Sep 17 00:00:00 2001 From: Matt Keeler Date: Tue, 21 Jun 2022 13:36:49 -0400 Subject: [PATCH 2/4] Add server local blocking queries and watches (#13438) Co-authored-by: Dan Upton --- agent/consul/watch/mock_StateStore_test.go | 40 ++ agent/consul/watch/server_local.go | 332 ++++++++++++++++ agent/consul/watch/server_local_test.go | 424 +++++++++++++++++++++ 3 files changed, 796 insertions(+) create mode 100644 agent/consul/watch/mock_StateStore_test.go create mode 100644 agent/consul/watch/server_local.go create mode 100644 agent/consul/watch/server_local_test.go diff --git a/agent/consul/watch/mock_StateStore_test.go b/agent/consul/watch/mock_StateStore_test.go new file mode 100644 index 0000000000..08d58e2f04 --- /dev/null +++ b/agent/consul/watch/mock_StateStore_test.go @@ -0,0 +1,40 @@ +// Code generated by mockery v2.12.2. DO NOT EDIT. + +package watch + +import ( + testing "testing" + + mock "github.com/stretchr/testify/mock" +) + +// MockStateStore is an autogenerated mock type for the StateStore type +type MockStateStore struct { + mock.Mock +} + +// AbandonCh provides a mock function with given fields: +func (_m *MockStateStore) AbandonCh() <-chan struct{} { + ret := _m.Called() + + var r0 <-chan struct{} + if rf, ok := ret.Get(0).(func() <-chan struct{}); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(<-chan struct{}) + } + } + + return r0 +} + +// NewMockStateStore creates a new instance of MockStateStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations. +func NewMockStateStore(t testing.TB) *MockStateStore { + mock := &MockStateStore{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/agent/consul/watch/server_local.go b/agent/consul/watch/server_local.go new file mode 100644 index 0000000000..8085396fe2 --- /dev/null +++ b/agent/consul/watch/server_local.go @@ -0,0 +1,332 @@ +package watch + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/hashicorp/consul/lib/retry" + "github.com/hashicorp/go-memdb" + hashstructure_v2 "github.com/mitchellh/hashstructure/v2" +) + +var ( + ErrorNotFound = errors.New("no data found for query") + ErrorNotChanged = errors.New("data did not change for query") + + errNilContext = errors.New("cannot call ServerLocalNotify with a nil context") + errNilGetStore = errors.New("cannot call ServerLocalNotify without a callback to get a StateStore") + errNilQuery = errors.New("cannot call ServerLocalNotify without a callback to perform the query") + errNilNotify = errors.New("cannot call ServerLocalNotify without a callback to send notifications") +) + +//go:generate mockery --name StateStore --inpackage --testonly +type StateStore interface { + AbandonCh() <-chan struct{} +} + +const ( + defaultWaiterMinFailures uint = 1 + defaultWaiterMinWait = time.Second + defaultWaiterMaxWait = 60 * time.Second + defaultWaiterFactor = 2 * time.Second +) + +var ( + defaultWaiterJitter = retry.NewJitter(100) +) + +func defaultWaiter() *retry.Waiter { + return &retry.Waiter{ + MinFailures: defaultWaiterMinFailures, + MinWait: defaultWaiterMinWait, + MaxWait: defaultWaiterMaxWait, + Jitter: defaultWaiterJitter, + Factor: defaultWaiterFactor, + } +} + +// noopDone can be passed to serverLocalNotifyWithWaiter +func noopDone() {} + +// ServerLocalBlockingQuery performs a blocking query similar to the pre-existing blockingQuery +// method on the agent/consul.Server type. There are a few key differences. +// +// 1. This function makes use of Go 1.18 generics. The function is parameterized with two +// types. The first is the ResultType which can be anything. Having this be parameterized +// instead of using interface{} allows us to simplify the call sites so that no type +// coercion from interface{} to the real type is necessary. The second parameterized type +// is something that VERY loosely resembles a agent/consul/state.Store type. The StateStore +// interface in this package has a single method to get the stores abandon channel so we +// know when a snapshot restore is occurring and can act accordingly. We could have not +// parameterized this type and used a real *state.Store instead but then we would have +// concrete dependencies on the state package and it would make it a little harder to +// test this function. +// +// We could have also avoided the need to use a ResultType parameter by taking the route +// the original blockingQuery method did and to just assume all callers close around +// a pointer to their results and can modify it as necessary. That way of doing things +// feels a little gross so I have taken this one a different direction. The old way +// also gets especially gross with how we have to push concerns of spurious wakeup +// suppression down into every call site. +// +// 2. This method has no internal timeout and can potentially run forever until a state +// change is observed. If there is a desire to have a timeout, that should be built into +// the context.Context passed as the first argument. +// +// 3. This method bakes in some newer functionality around hashing of results to prevent sending +// back data when nothing has actually changed. With the old blockingQuery method this has to +// be done within the closure passed to the method which means the same bit of code is duplicated +// in many places. As this functionality isn't necessary in many scenarios whether to opt-in to +// that behavior is a argument to this function. +// +// Similar to the older method: +// +// 1. Errors returned from the query will be propagated back to the caller. +// +// The query function must follow these rules: +// +// 1. To access data it must use the passed in StoreType (which will be a state.Store when +// everything gets stiched together outside of unit tests). +// 2. It must return an index greater than the minIndex if the results returned by the query +// have changed. +// 3. Any channels added to the memdb.WatchSet must unblock when the results +// returned by the query have changed. +// +// To ensure optimal performance of the query, the query function should make a +// best-effort attempt to follow these guidelines: +// +// 1. Only return an index greater than the minIndex. +// 2. Any channels added to the memdb.WatchSet should only unblock when the +// results returned by the query have changed. This might be difficult +// to do when blocking on non-existent data. +// +func ServerLocalBlockingQuery[ResultType any, StoreType StateStore]( + ctx context.Context, + getStore func() StoreType, + minIndex uint64, + suppressSpuriousWakeup bool, + query func(memdb.WatchSet, StoreType) (uint64, ResultType, error), +) (uint64, ResultType, error) { + var ( + notFound bool + ranOnce bool + priorHash uint64 + ) + + var zeroResult ResultType + if getStore == nil { + return 0, zeroResult, fmt.Errorf("no getStore function was provided to ServerLocalBlockingQuery") + } + if query == nil { + return 0, zeroResult, fmt.Errorf("no query function was provided to ServerLocalBlockingQuery") + } + + for { + state := getStore() + + ws := memdb.NewWatchSet() + + // Adding the AbandonCh to the WatchSet allows us to detect when + // a snapshot restore happens that would otherwise not modify anything + // within the individual state store. If we didn't do this then we + // could end up blocking indefinitely. + ws.Add(state.AbandonCh()) + + index, result, err := query(ws, state) + + switch { + case errors.Is(err, ErrorNotFound): + // if minIndex is 0 then we should never block but we + // also should not propagate the error + if minIndex == 0 { + return index, result, nil + } + + // update the min index if the previous result was not found. This + // is an attempt to not return data unnecessarily when we end up + // watching the root of a memdb Radix tree because the data being + // watched doesn't exist yet. + if notFound { + minIndex = index + } + + notFound = true + case err != nil: + return index, result, err + } + + // when enabled we can prevent sending back data that hasn't changed. + if suppressSpuriousWakeup { + newHash, err := hashstructure_v2.Hash(result, hashstructure_v2.FormatV2, nil) + if err != nil { + return index, result, fmt.Errorf("error hashing data for spurious wakeup suppression: %w", err) + } + + // set minIndex to the returned index to prevent sending back identical data + if ranOnce && priorHash == newHash { + minIndex = index + } + ranOnce = true + priorHash = newHash + } + + // one final check if we should be considered unblocked and + // return the value. Some conditions in the switch above + // alter the minIndex and prevent this return if it would + // be desirable. One such case is when the actual data has + // not changed since the last round through the query and + // we would rather not do any further processing for unchanged + // data. This mostly protects against watches for data that + // doesn't exist from return the non-existant value constantly. + if index > minIndex { + return index, result, nil + } + + // Block until something changes. Because we have added the state + // stores AbandonCh to this watch set, a snapshot restore will + // cause things to unblock in addition to changes to the actual + // queried data. + if err := ws.WatchCtx(ctx); err != nil { + // exit if the context was cancelled + return index, result, nil + } + + select { + case <-state.AbandonCh(): + return index, result, nil + default: + } + } +} + +// ServerLocalNotify will watch for changes in the State Store using the provided +// query function and invoke the notify callback whenever the results of that query +// function have changed. This function will return an error if parameter validations +// fail but otherwise the background go routine to process the notifications will +// be spawned and nil will be returned. Just like ServerLocalBlockingQuery this makes +// use of Go Generics and for the same reasons as outlined in the documentation for +// that function. +func ServerLocalNotify[ResultType any, StoreType StateStore]( + ctx context.Context, + correlationID string, + getStore func() StoreType, + query func(memdb.WatchSet, StoreType) (uint64, ResultType, error), + notify func(ctx context.Context, correlationID string, result ResultType, err error), +) error { + return serverLocalNotify( + ctx, + correlationID, + getStore, + query, + notify, + // Public callers should not need to know when the internal go routines are finished. + // Being able to provide a done function to the internal version of this function is + // to allow our tests to be more determinstic and to eliminate arbitrary sleeps. + noopDone, + // Public callers do not get to override the error backoff configuration. Internally + // we want to allow for this to enable our unit tests to run much more quickly. + defaultWaiter(), + ) +} + +// serverLocalNotify is the internal version of ServerLocalNotify. It takes +// two additional arguments of the waiter to use and a function to call +// when the notification go routine has finished +func serverLocalNotify[ResultType any, StoreType StateStore]( + ctx context.Context, + correlationID string, + getStore func() StoreType, + query func(memdb.WatchSet, StoreType) (uint64, ResultType, error), + notify func(ctx context.Context, correlationID string, result ResultType, err error), + done func(), + waiter *retry.Waiter, +) error { + if ctx == nil { + return errNilContext + } + + if getStore == nil { + return errNilGetStore + } + + if query == nil { + return errNilQuery + } + + if notify == nil { + return errNilNotify + } + + go serverLocalNotifyRoutine( + ctx, + correlationID, + getStore, + query, + notify, + done, + waiter, + ) + return nil +} + +// serverLocalNotifyRoutine is the function intended to be run within a new +// go routine to process the updates. It will not check to ensure callbacks +// are non-nil nor perform other parameter validation. It is assumed that +// the in-package caller of this method will have already done that. It also +// takes the backoff waiter in as an argument so that unit tests within this +// package can override the default values that the exported ServerLocalNotify +// function would have set up. +func serverLocalNotifyRoutine[ResultType any, StoreType StateStore]( + ctx context.Context, + correlationID string, + getStore func() StoreType, + query func(memdb.WatchSet, StoreType) (uint64, ResultType, error), + notify func(ctx context.Context, correlationID string, result ResultType, err error), + done func(), + waiter *retry.Waiter, +) { + defer done() + + var minIndex uint64 + + for { + // Check if the context has been cancelled. Do not issue + // more queries if it has been cancelled. + if ctx.Err() != nil { + return + } + + // Perform the blocking query + index, result, err := ServerLocalBlockingQuery(ctx, getStore, minIndex, true, query) + + // Check if the context has been cancelled. If it has we should not send more + // notifications. + if ctx.Err() != nil { + return + } + + // Check the index to see if we should call notify + if minIndex == 0 || minIndex < index { + notify(ctx, correlationID, result, err) + minIndex = index + } + + // Handle errors with backoff. Badly behaved blocking calls that returned + // a zero index are considered as failures since we need to not get stuck + // in a busy loop. + if err == nil && index > 0 { + waiter.Reset() + } else { + if waiter.Wait(ctx) != nil { + return + } + } + + // ensure we don't use zero indexes + if err == nil && minIndex < 1 { + minIndex = 1 + } + } +} diff --git a/agent/consul/watch/server_local_test.go b/agent/consul/watch/server_local_test.go new file mode 100644 index 0000000000..6fa440979b --- /dev/null +++ b/agent/consul/watch/server_local_test.go @@ -0,0 +1,424 @@ +package watch + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/hashicorp/consul/lib/retry" + "github.com/hashicorp/go-memdb" + mock "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +type mockStoreProvider struct { + mock.Mock +} + +func newMockStoreProvider(t *testing.T) *mockStoreProvider { + t.Helper() + provider := &mockStoreProvider{} + t.Cleanup(func() { + provider.AssertExpectations(t) + }) + return provider +} + +func (m *mockStoreProvider) getStore() *MockStateStore { + return m.Called().Get(0).(*MockStateStore) +} + +type testResult struct { + value string +} + +func (m *mockStoreProvider) query(ws memdb.WatchSet, store *MockStateStore) (uint64, *testResult, error) { + ret := m.Called(ws, store) + + index := ret.Get(0).(uint64) + result := ret.Get(1).(*testResult) + err := ret.Error(2) + + return index, result, err +} + +func (m *mockStoreProvider) notify(ctx context.Context, correlationID string, result *testResult, err error) { + m.Called(ctx, correlationID, result, err) +} + +func TestServerLocalBlockingQuery_getStoreNotProvided(t *testing.T) { + _, _, err := ServerLocalBlockingQuery( + context.Background(), + nil, + 0, + true, + func(memdb.WatchSet, *MockStateStore) (uint64, struct{}, error) { + return 0, struct{}{}, nil + }, + ) + + require.Error(t, err) + require.Contains(t, err.Error(), "no getStore function was provided") +} + +func TestServerLocalBlockingQuery_queryNotProvided(t *testing.T) { + var query func(memdb.WatchSet, *MockStateStore) (uint64, struct{}, error) + _, _, err := ServerLocalBlockingQuery( + context.Background(), + func() *MockStateStore { return nil }, + 0, + true, + query, + ) + + require.Error(t, err) + require.Contains(t, err.Error(), "no query function was provided") +} + +func TestServerLocalBlockingQuery_NonBlocking(t *testing.T) { + abandonCh := make(chan struct{}) + t.Cleanup(func() { close(abandonCh) }) + + store := NewMockStateStore(t) + store.On("AbandonCh"). + Return(closeChan(abandonCh)). + Once() + + provider := newMockStoreProvider(t) + provider.On("getStore").Return(store).Once() + provider.On("query", mock.Anything, store). + Return(uint64(1), &testResult{value: "foo"}, nil). + Once() + + idx, result, err := ServerLocalBlockingQuery( + context.Background(), + provider.getStore, + 0, + true, + provider.query, + ) + require.NoError(t, err) + require.EqualValues(t, 1, idx) + require.Equal(t, &testResult{value: "foo"}, result) +} + +func TestServerLocalBlockingQuery_NotFound(t *testing.T) { + abandonCh := make(chan struct{}) + t.Cleanup(func() { close(abandonCh) }) + + store := NewMockStateStore(t) + store.On("AbandonCh"). + Return(closeChan(abandonCh)). + Once() + + provider := newMockStoreProvider(t) + provider.On("getStore"). + Return(store). + Once() + + var nilResult *testResult + provider.On("query", mock.Anything, store). + Return(uint64(1), nilResult, ErrorNotFound). + Once() + + idx, result, err := ServerLocalBlockingQuery( + context.Background(), + provider.getStore, + 0, + true, + provider.query, + ) + require.NoError(t, err) + require.EqualValues(t, 1, idx) + require.Nil(t, result) +} + +func TestServerLocalBlockingQuery_NotFoundBlocks(t *testing.T) { + abandonCh := make(chan struct{}) + t.Cleanup(func() { close(abandonCh) }) + + store := NewMockStateStore(t) + store.On("AbandonCh"). + Return(closeChan(abandonCh)). + Times(5) + + provider := newMockStoreProvider(t) + provider.On("getStore"). + Return(store). + Times(3) + + var nilResult *testResult + // Initial data returned is not found and has an index less than the original + // blocking index. This should not return data to the caller. + provider.On("query", mock.Anything, store). + Return(uint64(4), nilResult, ErrorNotFound). + Run(addReadyWatchSet). + Once() + // There is an update to the data but the value still doesn't exist. Therefore + // we should not return data to the caller. + provider.On("query", mock.Anything, store). + Return(uint64(6), nilResult, ErrorNotFound). + Run(addReadyWatchSet). + Once() + // Finally we have some real data and can return it to the caller. + provider.On("query", mock.Anything, store). + Return(uint64(7), &testResult{value: "foo"}, nil). + Once() + + idx, result, err := ServerLocalBlockingQuery( + context.Background(), + provider.getStore, + 5, + true, + provider.query, + ) + require.NoError(t, err) + require.EqualValues(t, 7, idx) + require.Equal(t, &testResult{value: "foo"}, result) +} + +func TestServerLocalBlockingQuery_Error(t *testing.T) { + abandonCh := make(chan struct{}) + t.Cleanup(func() { close(abandonCh) }) + + store := NewMockStateStore(t) + store.On("AbandonCh"). + Return(closeChan(abandonCh)). + Once() + + provider := newMockStoreProvider(t) + provider.On("getStore"). + Return(store). + Once() + + var nilResult *testResult + provider.On("query", mock.Anything, store). + Return(uint64(10), nilResult, fmt.Errorf("synthetic error")). + Once() + + idx, result, err := ServerLocalBlockingQuery( + context.Background(), + provider.getStore, + 4, + true, + provider.query, + ) + require.Error(t, err) + require.Contains(t, err.Error(), "synthetic error") + require.EqualValues(t, 10, idx) + require.Nil(t, result) +} + +func TestServerLocalBlockingQuery_ContextCancellation(t *testing.T) { + abandonCh := make(chan struct{}) + t.Cleanup(func() { close(abandonCh) }) + + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + store := NewMockStateStore(t) + store.On("AbandonCh"). + Return(closeChan(abandonCh)). + Once() + + provider := newMockStoreProvider(t) + provider.On("getStore"). + Return(store). + Once() + provider.On("query", mock.Anything, store). + // Return an index that should not cause the blocking query to return. + Return(uint64(4), &testResult{value: "foo"}, nil). + Once(). + Run(func(_ mock.Arguments) { + // Cancel the context so that the memdb WatchCtx call will error. + cancel() + }) + + idx, result, err := ServerLocalBlockingQuery( + ctx, + provider.getStore, + 8, + true, + provider.query, + ) + // The internal cancellation error should not be propagated. + require.NoError(t, err) + require.EqualValues(t, 4, idx) + require.Equal(t, &testResult{value: "foo"}, result) +} + +func TestServerLocalBlockingQuery_StateAbandoned(t *testing.T) { + abandonCh := make(chan struct{}) + + store := NewMockStateStore(t) + store.On("AbandonCh"). + Return(closeChan(abandonCh)). + Twice() + + provider := newMockStoreProvider(t) + provider.On("getStore"). + Return(store). + Once() + provider.On("query", mock.Anything, store). + // Return an index that should not cause the blocking query to return. + Return(uint64(4), &testResult{value: "foo"}, nil). + Once(). + Run(func(_ mock.Arguments) { + // Cancel the context so that the memdb WatchCtx call will error. + close(abandonCh) + }) + + idx, result, err := ServerLocalBlockingQuery( + context.Background(), + provider.getStore, + 8, + true, + provider.query, + ) + // The internal cancellation error should not be propagated. + require.NoError(t, err) + require.EqualValues(t, 4, idx) + require.Equal(t, &testResult{value: "foo"}, result) +} + +func TestServerLocalNotify_Validations(t *testing.T) { + provider := newMockStoreProvider(t) + + type testCase struct { + ctx context.Context + getStore func() *MockStateStore + query func(memdb.WatchSet, *MockStateStore) (uint64, *testResult, error) + notify func(context.Context, string, *testResult, error) + err error + } + + cases := map[string]testCase{ + "nil-context": { + getStore: provider.getStore, + query: provider.query, + notify: provider.notify, + err: errNilContext, + }, + "nil-getStore": { + ctx: context.Background(), + query: provider.query, + notify: provider.notify, + err: errNilGetStore, + }, + "nil-query": { + ctx: context.Background(), + getStore: provider.getStore, + notify: provider.notify, + err: errNilQuery, + }, + "nil-notify": { + ctx: context.Background(), + getStore: provider.getStore, + query: provider.query, + err: errNilNotify, + }, + } + + for name, tcase := range cases { + t.Run(name, func(t *testing.T) { + err := ServerLocalNotify(tcase.ctx, "test", tcase.getStore, tcase.query, tcase.notify) + require.ErrorIs(t, err, tcase.err) + }) + } +} + +func TestServerLocalNotify(t *testing.T) { + notifyCtx, notifyCancel := context.WithCancel(context.Background()) + t.Cleanup(notifyCancel) + + abandonCh := make(chan struct{}) + + store := NewMockStateStore(t) + store.On("AbandonCh"). + Return(closeChan(abandonCh)). + Times(3) + + provider := newMockStoreProvider(t) + provider.On("getStore"). + Return(store). + Times(3) + provider.On("query", mock.Anything, store). + Return(uint64(4), &testResult{value: "foo"}, nil). + Once() + provider.On("notify", notifyCtx, t.Name(), &testResult{value: "foo"}, nil).Once() + provider.On("query", mock.Anything, store). + Return(uint64(6), &testResult{value: "bar"}, nil). + Once() + provider.On("notify", notifyCtx, t.Name(), &testResult{value: "bar"}, nil).Once() + provider.On("query", mock.Anything, store). + Return(uint64(7), &testResult{value: "baz"}, context.Canceled). + Run(func(mock.Arguments) { + notifyCancel() + }) + + doneCtx, routineDone := context.WithCancel(context.Background()) + err := serverLocalNotify(notifyCtx, t.Name(), provider.getStore, provider.query, provider.notify, routineDone, defaultWaiter()) + require.NoError(t, err) + + // Wait for the context cancellation which will happen when the "query" func is run the third time. The doneCtx gets "cancelled" + // by the backgrounded go routine when it is actually finished. We need to wait for this to ensure that all mocked calls have been + // made and that no extra calls get made. + <-doneCtx.Done() +} + +func TestServerLocalNotify_internal(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + + abandonCh := make(chan struct{}) + + store := NewMockStateStore(t) + store.On("AbandonCh"). + Return(closeChan(abandonCh)). + Times(4) + + var nilResult *testResult + + provider := newMockStoreProvider(t) + provider.On("getStore"). + Return(store). + Times(4) + provider.On("query", mock.Anything, store). + Return(uint64(0), nilResult, fmt.Errorf("injected error")). + Times(3) + provider.On("notify", ctx, "test", nilResult, fmt.Errorf("injected error")). + Times(3) + provider.On("query", mock.Anything, store). + Return(uint64(7), &testResult{value: "foo"}, nil). + Once() + provider.On("notify", ctx, "test", &testResult{value: "foo"}, nil). + Once(). + Run(func(mock.Arguments) { + cancel() + }) + waiter := retry.Waiter{ + MinFailures: 1, + MinWait: time.Millisecond, + MaxWait: 50 * time.Millisecond, + Jitter: retry.NewJitter(100), + Factor: 2 * time.Millisecond, + } + + // all the mock expectations should ensure things are working properly + serverLocalNotifyRoutine(ctx, "test", provider.getStore, provider.query, provider.notify, noopDone, &waiter) +} + +func addReadyWatchSet(args mock.Arguments) { + ws := args.Get(0).(memdb.WatchSet) + ch := make(chan struct{}) + ws.Add(ch) + close(ch) +} + +// small convenience to make this more readable. The alternative in a few +// cases would be to do something like (<-chan struct{})(ch). I find that +// syntax very difficult to read. +func closeChan(ch chan struct{}) <-chan struct{} { + return ch +} From e8ea3d7c3b780cef8c8fa1e51465eb62b54d163f Mon Sep 17 00:00:00 2001 From: "R.B. Boyer" <4903+rboyer@users.noreply.github.com> Date: Tue, 21 Jun 2022 13:04:08 -0500 Subject: [PATCH 3/4] state: peering ID assignment cannot happen inside of the state store (#13525) Move peering ID assignment outisde of the FSM, so that the ID is written to the raft log and the same ID is used by all voters, and after restarts. --- agent/consul/fsm/snapshot_oss_test.go | 1 + agent/consul/leader_peering_test.go | 20 ++- agent/consul/peering_backend.go | 11 ++ agent/consul/state/peering.go | 60 ++++----- agent/consul/state/peering_test.go | 115 ++++++++---------- agent/rpc/peering/service.go | 56 +++++++++ agent/rpc/peering/service_test.go | 16 +++ agent/rpc/peering/stream_test.go | 13 +- .../rpc/peering/subscription_manager_test.go | 1 + 9 files changed, 189 insertions(+), 104 deletions(-) diff --git a/agent/consul/fsm/snapshot_oss_test.go b/agent/consul/fsm/snapshot_oss_test.go index 558abf4beb..29678d1d00 100644 --- a/agent/consul/fsm/snapshot_oss_test.go +++ b/agent/consul/fsm/snapshot_oss_test.go @@ -476,6 +476,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) { // Peerings require.NoError(t, fsm.state.PeeringWrite(31, &pbpeering.Peering{ + ID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5", Name: "baz", })) diff --git a/agent/consul/leader_peering_test.go b/agent/consul/leader_peering_test.go index 3e2f6c8ff9..169ca833f3 100644 --- a/agent/consul/leader_peering_test.go +++ b/agent/consul/leader_peering_test.go @@ -7,13 +7,13 @@ import ( "testing" "time" - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/api" "github.com/stretchr/testify/require" "google.golang.org/grpc" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" @@ -62,6 +62,10 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { _, found := s1.peeringService.StreamStatus(token.PeerID) require.False(t, found) + var ( + s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" + ) + // Bring up s2 and store s1's token so that it attempts to dial. _, s2 := testServerWithConfig(t, func(c *Config) { c.NodeName = "s2.dc2" @@ -73,6 +77,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { // Simulate a peering initiation event by writing a peering with data from a peering token. // Eventually the leader in dc2 should dial and connect to the leader in dc1. p := &pbpeering.Peering{ + ID: s2PeerID, Name: "my-peer-s1", PeerID: token.PeerID, PeerCAPems: token.CA, @@ -92,6 +97,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) { // Delete the peering to trigger the termination sequence. deleted := &pbpeering.Peering{ + ID: s2PeerID, Name: "my-peer-s1", DeletedAt: structs.TimeToProto(time.Now()), } @@ -151,6 +157,11 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { var token structs.PeeringToken require.NoError(t, json.Unmarshal(tokenJSON, &token)) + var ( + s1PeerID = token.PeerID + s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" + ) + // Bring up s2 and store s1's token so that it attempts to dial. _, s2 := testServerWithConfig(t, func(c *Config) { c.NodeName = "s2.dc2" @@ -162,6 +173,7 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { // Simulate a peering initiation event by writing a peering with data from a peering token. // Eventually the leader in dc2 should dial and connect to the leader in dc1. p := &pbpeering.Peering{ + ID: s2PeerID, Name: "my-peer-s1", PeerID: token.PeerID, PeerCAPems: token.CA, @@ -181,6 +193,7 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) { // Delete the peering from the server peer to trigger the termination sequence. deleted := &pbpeering.Peering{ + ID: s1PeerID, Name: "my-peer-s2", DeletedAt: structs.TimeToProto(time.Now()), } @@ -216,6 +229,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) { testrpc.WaitForLeader(t, s1.RPC, "dc1") var ( + peerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d" peerName = "my-peer-s2" defaultMeta = acl.DefaultEnterpriseMeta() lastIdx = uint64(0) @@ -224,6 +238,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) { // Simulate a peering initiation event by writing a peering to the state store. lastIdx++ require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID, Name: peerName, })) @@ -233,6 +248,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) { // Mark the peering for deletion to trigger the termination sequence. lastIdx++ require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID, Name: peerName, DeletedAt: structs.TimeToProto(time.Now()), })) diff --git a/agent/consul/peering_backend.go b/agent/consul/peering_backend.go index 0ba3463c88..047569f110 100644 --- a/agent/consul/peering_backend.go +++ b/agent/consul/peering_backend.go @@ -143,6 +143,17 @@ type peeringApply struct { srv *Server } +func (a *peeringApply) CheckPeeringUUID(id string) (bool, error) { + state := a.srv.fsm.State() + if _, existing, err := state.PeeringReadByID(nil, id); err != nil { + return false, err + } else if existing != nil { + return false, nil + } + + return true, nil +} + func (a *peeringApply) PeeringWrite(req *pbpeering.PeeringWriteRequest) error { _, err := a.srv.raftApplyProtobuf(structs.PeeringWriteType, req) return err diff --git a/agent/consul/state/peering.go b/agent/consul/state/peering.go index 3d115707d3..6515055b25 100644 --- a/agent/consul/state/peering.go +++ b/agent/consul/state/peering.go @@ -1,12 +1,12 @@ package state import ( + "errors" "fmt" "strings" "github.com/golang/protobuf/proto" "github.com/hashicorp/go-memdb" - "github.com/hashicorp/go-uuid" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/structs" @@ -191,50 +191,47 @@ func (s *Store) peeringListTxn(ws memdb.WatchSet, tx ReadTxn, entMeta acl.Enterp return idx, result, nil } -func generatePeeringUUID(tx ReadTxn) (string, error) { - for { - uuid, err := uuid.GenerateUUID() - if err != nil { - return "", fmt.Errorf("failed to generate UUID: %w", err) - } - existing, err := peeringReadByIDTxn(tx, nil, uuid) - if err != nil { - return "", fmt.Errorf("failed to read peering: %w", err) - } - if existing == nil { - return uuid, nil - } - } -} - func (s *Store) PeeringWrite(idx uint64, p *pbpeering.Peering) error { tx := s.db.WriteTxn(idx) defer tx.Abort() - q := Query{ - Value: p.Name, - EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(p.Partition), + // Check that the ID and Name are set. + if p.ID == "" { + return errors.New("Missing Peering ID") } - existingRaw, err := tx.First(tablePeering, indexName, q) - if err != nil { - return fmt.Errorf("failed peering lookup: %w", err) + if p.Name == "" { + return errors.New("Missing Peering Name") } - existing, ok := existingRaw.(*pbpeering.Peering) - if existingRaw != nil && !ok { - return fmt.Errorf("invalid type %T", existingRaw) + // ensure the name is unique (cannot conflict with another peering with a different ID) + _, existing, err := peeringReadTxn(tx, nil, Query{ + Value: p.Name, + EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(p.Partition), + }) + if err != nil { + return err } if existing != nil { + if p.ID != existing.ID { + return fmt.Errorf("A peering already exists with the name %q and a different ID %q", p.Name, existing.ID) + } // Prevent modifications to Peering marked for deletion if !existing.IsActive() { return fmt.Errorf("cannot write to peering that is marked for deletion") } p.CreateIndex = existing.CreateIndex - p.ID = existing.ID - + p.ModifyIndex = idx } else { + idMatch, err := peeringReadByIDTxn(tx, nil, p.ID) + if err != nil { + return err + } + if idMatch != nil { + return fmt.Errorf("A peering already exists with the ID %q and a different name %q", p.Name, existing.ID) + } + if !p.IsActive() { return fmt.Errorf("cannot create a new peering marked for deletion") } @@ -242,13 +239,8 @@ func (s *Store) PeeringWrite(idx uint64, p *pbpeering.Peering) error { // TODO(peering): consider keeping PeeringState enum elsewhere? p.State = pbpeering.PeeringState_INITIAL p.CreateIndex = idx - - p.ID, err = generatePeeringUUID(tx) - if err != nil { - return fmt.Errorf("failed to generate peering id: %w", err) - } + p.ModifyIndex = idx } - p.ModifyIndex = idx if err := tx.Insert(tablePeering, p); err != nil { return fmt.Errorf("failed inserting peering: %w", err) diff --git a/agent/consul/state/peering_test.go b/agent/consul/state/peering_test.go index 4aba5c3404..04389a8e95 100644 --- a/agent/consul/state/peering_test.go +++ b/agent/consul/state/peering_test.go @@ -1,13 +1,10 @@ package state import ( - "fmt" - "math/rand" "testing" "time" "github.com/hashicorp/go-memdb" - "github.com/hashicorp/go-uuid" "github.com/stretchr/testify/require" "github.com/hashicorp/consul/acl" @@ -17,6 +14,12 @@ import ( "github.com/hashicorp/consul/sdk/testutil" ) +const ( + testFooPeerID = "9e650110-ac74-4c5a-a6a8-9348b2bed4e9" + testBarPeerID = "5ebcff30-5509-4858-8142-a8e580f1863f" + testBazPeerID = "432feb2f-5476-4ae2-b33c-e43640ca0e86" +) + func insertTestPeerings(t *testing.T, s *Store) { t.Helper() @@ -26,7 +29,7 @@ func insertTestPeerings(t *testing.T, s *Store) { err := tx.Insert(tablePeering, &pbpeering.Peering{ Name: "foo", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9", + ID: testFooPeerID, State: pbpeering.PeeringState_INITIAL, CreateIndex: 1, ModifyIndex: 1, @@ -36,7 +39,7 @@ func insertTestPeerings(t *testing.T, s *Store) { err = tx.Insert(tablePeering, &pbpeering.Peering{ Name: "bar", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "5ebcff30-5509-4858-8142-a8e580f1863f", + ID: testBarPeerID, State: pbpeering.PeeringState_FAILING, CreateIndex: 2, ModifyIndex: 2, @@ -97,16 +100,16 @@ func TestStateStore_PeeringReadByID(t *testing.T) { run := func(t *testing.T, tc testcase) { _, peering, err := s.PeeringReadByID(nil, tc.id) require.NoError(t, err) - require.Equal(t, tc.expect, peering) + prototest.AssertDeepEqual(t, tc.expect, peering) } tcs := []testcase{ { name: "get foo", - id: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9", + id: testFooPeerID, expect: &pbpeering.Peering{ Name: "foo", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9", + ID: testFooPeerID, State: pbpeering.PeeringState_INITIAL, CreateIndex: 1, ModifyIndex: 1, @@ -114,11 +117,11 @@ func TestStateStore_PeeringReadByID(t *testing.T) { }, { name: "get bar", - id: "5ebcff30-5509-4858-8142-a8e580f1863f", + id: testBarPeerID, expect: &pbpeering.Peering{ Name: "bar", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "5ebcff30-5509-4858-8142-a8e580f1863f", + ID: testBarPeerID, State: pbpeering.PeeringState_FAILING, CreateIndex: 2, ModifyIndex: 2, @@ -149,7 +152,7 @@ func TestStateStore_PeeringRead(t *testing.T) { run := func(t *testing.T, tc testcase) { _, peering, err := s.PeeringRead(nil, tc.query) require.NoError(t, err) - require.Equal(t, tc.expect, peering) + prototest.AssertDeepEqual(t, tc.expect, peering) } tcs := []testcase{ { @@ -160,7 +163,7 @@ func TestStateStore_PeeringRead(t *testing.T) { expect: &pbpeering.Peering{ Name: "foo", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9", + ID: testFooPeerID, State: pbpeering.PeeringState_INITIAL, CreateIndex: 1, ModifyIndex: 1, @@ -189,6 +192,7 @@ func TestStore_Peering_Watch(t *testing.T) { // set up initial write err := s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testFooPeerID, Name: "foo", }) require.NoError(t, err) @@ -210,6 +214,7 @@ func TestStore_Peering_Watch(t *testing.T) { lastIdx++ err := s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testBarPeerID, Name: "bar", }) require.NoError(t, err) @@ -229,6 +234,7 @@ func TestStore_Peering_Watch(t *testing.T) { // unrelated write shouldn't fire watch lastIdx++ err := s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testBarPeerID, Name: "bar", }) require.NoError(t, err) @@ -237,6 +243,7 @@ func TestStore_Peering_Watch(t *testing.T) { // foo write should fire watch lastIdx++ err = s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testFooPeerID, Name: "foo", DeletedAt: structs.TimeToProto(time.Now()), }) @@ -261,6 +268,7 @@ func TestStore_Peering_Watch(t *testing.T) { // mark for deletion before actually deleting lastIdx++ err := s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testBarPeerID, Name: "bar", DeletedAt: structs.TimeToProto(time.Now()), }) @@ -293,7 +301,7 @@ func TestStore_PeeringList(t *testing.T) { { Name: "foo", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9", + ID: testFooPeerID, State: pbpeering.PeeringState_INITIAL, CreateIndex: 1, ModifyIndex: 1, @@ -301,7 +309,7 @@ func TestStore_PeeringList(t *testing.T) { { Name: "bar", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), - ID: "5ebcff30-5509-4858-8142-a8e580f1863f", + ID: testBarPeerID, State: pbpeering.PeeringState_FAILING, CreateIndex: 2, ModifyIndex: 2, @@ -336,6 +344,7 @@ func TestStore_PeeringList_Watch(t *testing.T) { lastIdx++ // insert a peering err := s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testFooPeerID, Name: "foo", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), }) @@ -357,6 +366,7 @@ func TestStore_PeeringList_Watch(t *testing.T) { // update peering lastIdx++ require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testFooPeerID, Name: "foo", DeletedAt: structs.TimeToProto(time.Now()), Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), @@ -422,6 +432,7 @@ func TestStore_PeeringWrite(t *testing.T) { { name: "create baz", input: &pbpeering.Peering{ + ID: testBazPeerID, Name: "baz", Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), }, @@ -429,6 +440,7 @@ func TestStore_PeeringWrite(t *testing.T) { { name: "update baz", input: &pbpeering.Peering{ + ID: testBazPeerID, Name: "baz", State: pbpeering.PeeringState_FAILING, Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), @@ -437,6 +449,7 @@ func TestStore_PeeringWrite(t *testing.T) { { name: "mark baz for deletion", input: &pbpeering.Peering{ + ID: testBazPeerID, Name: "baz", State: pbpeering.PeeringState_TERMINATED, DeletedAt: structs.TimeToProto(time.Now()), @@ -446,6 +459,7 @@ func TestStore_PeeringWrite(t *testing.T) { { name: "cannot update peering marked for deletion", input: &pbpeering.Peering{ + ID: testBazPeerID, Name: "baz", // Attempt to add metadata Meta: map[string]string{ @@ -458,6 +472,7 @@ func TestStore_PeeringWrite(t *testing.T) { { name: "cannot create peering marked for deletion", input: &pbpeering.Peering{ + ID: testFooPeerID, Name: "foo", DeletedAt: structs.TimeToProto(time.Now()), Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(), @@ -472,54 +487,6 @@ func TestStore_PeeringWrite(t *testing.T) { } } -func TestStore_PeeringWrite_GenerateUUID(t *testing.T) { - rand.Seed(1) - - s := NewStateStore(nil) - - entMeta := structs.NodeEnterpriseMetaInDefaultPartition() - partition := entMeta.PartitionOrDefault() - - for i := 1; i < 11; i++ { - require.NoError(t, s.PeeringWrite(uint64(i), &pbpeering.Peering{ - Name: fmt.Sprintf("peering-%d", i), - Partition: partition, - })) - } - - idx, peerings, err := s.PeeringList(nil, *entMeta) - require.NoError(t, err) - require.Equal(t, uint64(10), idx) - require.Len(t, peerings, 10) - - // Ensure that all assigned UUIDs are unique. - uniq := make(map[string]struct{}) - for _, p := range peerings { - uniq[p.ID] = struct{}{} - } - require.Len(t, uniq, 10) - - // Ensure that the ID of an existing peering cannot be overwritten. - updated := &pbpeering.Peering{ - Name: peerings[0].Name, - Partition: peerings[0].Partition, - } - - // Attempt to overwrite ID. - updated.ID, err = uuid.GenerateUUID() - require.NoError(t, err) - require.NoError(t, s.PeeringWrite(11, updated)) - - q := Query{ - Value: updated.Name, - EnterpriseMeta: *entMeta, - } - idx, got, err := s.PeeringRead(nil, q) - require.NoError(t, err) - require.Equal(t, uint64(11), idx) - require.Equal(t, peerings[0].ID, got.ID) -} - func TestStore_PeeringDelete(t *testing.T) { s := NewStateStore(nil) insertTestPeerings(t, s) @@ -532,6 +499,7 @@ func TestStore_PeeringDelete(t *testing.T) { testutil.RunStep(t, "can delete after marking for deletion", func(t *testing.T) { require.NoError(t, s.PeeringWrite(11, &pbpeering.Peering{ + ID: testFooPeerID, Name: "foo", DeletedAt: structs.TimeToProto(time.Now()), })) @@ -550,7 +518,7 @@ func TestStore_PeeringTerminateByID(t *testing.T) { insertTestPeerings(t, s) // id corresponding to default/foo - id := "9e650110-ac74-4c5a-a6a8-9348b2bed4e9" + const id = testFooPeerID require.NoError(t, s.PeeringTerminateByID(10, id)) @@ -607,7 +575,7 @@ func TestStateStore_PeeringTrustBundleRead(t *testing.T) { run := func(t *testing.T, tc testcase) { _, ptb, err := s.PeeringTrustBundleRead(nil, tc.query) require.NoError(t, err) - require.Equal(t, tc.expect, ptb) + prototest.AssertDeepEqual(t, tc.expect, ptb) } entMeta := structs.NodeEnterpriseMetaInDefaultPartition() @@ -708,6 +676,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) { lastIdx++ require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testUUID(), Name: "my-peering", })) @@ -1000,6 +969,9 @@ func TestStateStore_PeeringsForService(t *testing.T) { var lastIdx uint64 // Create peerings for _, tp := range tc.peerings { + if tp.peering.ID == "" { + tp.peering.ID = testUUID() + } lastIdx++ require.NoError(t, s.PeeringWrite(lastIdx, tp.peering)) @@ -1009,6 +981,7 @@ func TestStateStore_PeeringsForService(t *testing.T) { lastIdx++ copied := pbpeering.Peering{ + ID: tp.peering.ID, Name: tp.peering.Name, DeletedAt: structs.TimeToProto(time.Now()), } @@ -1247,6 +1220,11 @@ func TestStore_TrustBundleListByService(t *testing.T) { var lastIdx uint64 ws := memdb.NewWatchSet() + var ( + peerID1 = testUUID() + peerID2 = testUUID() + ) + testutil.RunStep(t, "no results on initial setup", func(t *testing.T) { idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta) require.NoError(t, err) @@ -1279,6 +1257,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { testutil.RunStep(t, "creating peering does not yield trust bundles", func(t *testing.T) { lastIdx++ require.NoError(t, store.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID1, Name: "peer1", })) @@ -1377,6 +1356,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { testutil.RunStep(t, "bundles for other peers are ignored", func(t *testing.T) { lastIdx++ require.NoError(t, store.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID2, Name: "peer2", })) @@ -1431,6 +1411,7 @@ func TestStore_TrustBundleListByService(t *testing.T) { testutil.RunStep(t, "deleting the peering excludes its trust bundle", func(t *testing.T) { lastIdx++ require.NoError(t, store.PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: peerID1, Name: "peer1", DeletedAt: structs.TimeToProto(time.Now()), })) @@ -1470,7 +1451,7 @@ func TestStateStore_Peering_ListDeleted(t *testing.T) { err := tx.Insert(tablePeering, &pbpeering.Peering{ Name: "foo", Partition: acl.DefaultPartitionName, - ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9", + ID: testFooPeerID, DeletedAt: structs.TimeToProto(time.Now()), CreateIndex: 1, ModifyIndex: 1, @@ -1480,7 +1461,7 @@ func TestStateStore_Peering_ListDeleted(t *testing.T) { err = tx.Insert(tablePeering, &pbpeering.Peering{ Name: "bar", Partition: acl.DefaultPartitionName, - ID: "5ebcff30-5509-4858-8142-a8e580f1863f", + ID: testBarPeerID, CreateIndex: 2, ModifyIndex: 2, }) @@ -1489,7 +1470,7 @@ func TestStateStore_Peering_ListDeleted(t *testing.T) { err = tx.Insert(tablePeering, &pbpeering.Peering{ Name: "baz", Partition: acl.DefaultPartitionName, - ID: "432feb2f-5476-4ae2-b33c-e43640ca0e86", + ID: testBazPeerID, DeletedAt: structs.TimeToProto(time.Now()), CreateIndex: 3, ModifyIndex: 3, diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index 6d8de85d32..5638702aac 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -24,6 +24,7 @@ import ( "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/dns" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/proto/pbpeering" ) @@ -140,6 +141,7 @@ type Store interface { // Apply provides a write-only interface for persisting Peering data. type Apply interface { + CheckPeeringUUID(id string) (bool, error) PeeringWrite(req *pbpeering.PeeringWriteRequest) error PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error @@ -189,8 +191,16 @@ func (s *Service) GenerateToken( return nil, err } + canRetry := true +RETRY_ONCE: + id, err := s.getExistingOrCreateNewPeerID(req.PeerName, req.Partition) + if err != nil { + return nil, err + } + writeReq := pbpeering.PeeringWriteRequest{ Peering: &pbpeering.Peering{ + ID: id, Name: req.PeerName, // TODO(peering): Normalize from ACL token once this endpoint is guarded by ACLs. Partition: req.PartitionOrDefault(), @@ -198,6 +208,15 @@ func (s *Service) GenerateToken( }, } if err := s.Backend.Apply().PeeringWrite(&writeReq); err != nil { + // There's a possible race where two servers call Generate Token at the + // same time with the same peer name for the first time. They both + // generate an ID and try to insert and only one wins. This detects the + // collision and forces the loser to discard its generated ID and use + // the one from the other server. + if canRetry && strings.Contains(err.Error(), "A peering already exists with the name") { + canRetry = false + goto RETRY_ONCE + } return nil, fmt.Errorf("failed to write peering: %w", err) } @@ -270,6 +289,11 @@ func (s *Service) Establish( serverAddrs[i] = addr } + id, err := s.getExistingOrCreateNewPeerID(req.PeerName, req.Partition) + if err != nil { + return nil, err + } + // as soon as a peering is written with a list of ServerAddresses that is // non-empty, the leader routine will see the peering and attempt to // establish a connection with the remote peer. @@ -278,6 +302,7 @@ func (s *Service) Establish( // RemotePeerID(PeerID) but at this point the other peer does not. writeReq := &pbpeering.PeeringWriteRequest{ Peering: &pbpeering.Peering{ + ID: id, Name: req.PeerName, PeerCAPems: tok.CA, PeerServerAddresses: serverAddrs, @@ -368,6 +393,16 @@ func (s *Service) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteR defer metrics.MeasureSince([]string{"peering", "write"}, time.Now()) // TODO(peering): ACL check request token + if req.Peering == nil { + return nil, fmt.Errorf("missing required peering body") + } + + id, err := s.getExistingOrCreateNewPeerID(req.Peering.Name, req.Peering.Partition) + if err != nil { + return nil, err + } + req.Peering.ID = id + // TODO(peering): handle blocking queries err = s.Backend.Apply().PeeringWrite(req) if err != nil { @@ -418,6 +453,7 @@ func (s *Service) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelet // We only need to include the name and partition for the peering to be identified. // All other data associated with the peering can be discarded because once marked // for deletion the peering is effectively gone. + ID: existing.ID, Name: req.Name, Partition: req.Partition, DeletedAt: structs.TimeToProto(time.Now().UTC()), @@ -837,6 +873,26 @@ func getTrustDomain(store Store, logger hclog.Logger) (string, error) { return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil } +func (s *Service) getExistingOrCreateNewPeerID(peerName, partition string) (string, error) { + q := state.Query{ + Value: strings.ToLower(peerName), + EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(partition), + } + _, peering, err := s.Backend.Store().PeeringRead(nil, q) + if err != nil { + return "", err + } + if peering != nil { + return peering.ID, nil + } + + id, err := lib.GenerateUUID(s.Backend.Apply().CheckPeeringUUID) + if err != nil { + return "", err + } + return id, nil +} + func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool) { return s.streams.streamStatus(peer) } diff --git a/agent/rpc/peering/service_test.go b/agent/rpc/peering/service_test.go index aba7973d00..af089f56c5 100644 --- a/agent/rpc/peering/service_test.go +++ b/agent/rpc/peering/service_test.go @@ -30,6 +30,7 @@ import ( "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/agent/token" "github.com/hashicorp/consul/api" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbservice" "github.com/hashicorp/consul/proto/prototest" @@ -224,6 +225,7 @@ func TestPeeringService_Read(t *testing.T) { // insert peering directly to state store p := &pbpeering.Peering{ + ID: testUUID(t), Name: "foo", State: pbpeering.PeeringState_INITIAL, PeerCAPems: nil, @@ -279,6 +281,7 @@ func TestPeeringService_Delete(t *testing.T) { s := newTestServer(t, nil) p := &pbpeering.Peering{ + ID: testUUID(t), Name: "foo", State: pbpeering.PeeringState_INITIAL, PeerCAPems: nil, @@ -316,6 +319,7 @@ func TestPeeringService_List(t *testing.T) { // Note that the state store holds reference to the underlying // variables; do not modify them after writing. foo := &pbpeering.Peering{ + ID: testUUID(t), Name: "foo", State: pbpeering.PeeringState_INITIAL, PeerCAPems: nil, @@ -324,6 +328,7 @@ func TestPeeringService_List(t *testing.T) { } require.NoError(t, s.Server.FSM().State().PeeringWrite(10, foo)) bar := &pbpeering.Peering{ + ID: testUUID(t), Name: "bar", State: pbpeering.PeeringState_ACTIVE, PeerCAPems: nil, @@ -405,6 +410,7 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) { lastIdx++ require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testUUID(t), Name: "foo", State: pbpeering.PeeringState_INITIAL, PeerServerName: "test", @@ -413,6 +419,7 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) { lastIdx++ require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.Peering{ + ID: testUUID(t), Name: "bar", State: pbpeering.PeeringState_INITIAL, PeerServerName: "test-bar", @@ -513,6 +520,7 @@ func Test_StreamHandler_UpsertServices(t *testing.T) { ) require.NoError(t, s.Server.FSM().State().PeeringWrite(0, &pbpeering.Peering{ + ID: testUUID(t), Name: "my-peer", })) @@ -998,7 +1006,9 @@ func newDefaultDeps(t *testing.T, c *consul.Config) consul.Deps { } func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string { + t.Helper() err := store.PeeringWrite(index, &pbpeering.Peering{ + ID: testUUID(t), Name: name, }) require.NoError(t, err) @@ -1009,3 +1019,9 @@ func setupTestPeering(t *testing.T, store *state.Store, name string, index uint6 return p.ID } + +func testUUID(t *testing.T) string { + v, err := lib.GenerateUUID(nil) + require.NoError(t, err) + return v +} diff --git a/agent/rpc/peering/stream_test.go b/agent/rpc/peering/stream_test.go index dc30fa6868..9bc8eff4e6 100644 --- a/agent/rpc/peering/stream_test.go +++ b/agent/rpc/peering/stream_test.go @@ -23,6 +23,7 @@ import ( "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/consul/stream" "github.com/hashicorp/consul/agent/structs" + "github.com/hashicorp/consul/lib" "github.com/hashicorp/consul/proto/pbcommon" "github.com/hashicorp/consul/proto/pbpeering" "github.com/hashicorp/consul/proto/pbservice" @@ -1030,6 +1031,10 @@ type testApplier struct { store *state.Store } +func (a *testApplier) CheckPeeringUUID(id string) (bool, error) { + panic("not implemented") +} + func (a *testApplier) PeeringWrite(req *pbpeering.PeeringWriteRequest) error { panic("not implemented") } @@ -1216,6 +1221,7 @@ func writeEstablishedPeering(t *testing.T, store *state.Store, idx uint64, peerN require.NoError(t, err) peering := pbpeering.Peering{ + ID: testUUID(t), Name: peerName, PeerID: remotePeerID, } @@ -2169,5 +2175,10 @@ func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes) require.Equal(t, expect[i].Checks[j].PartitionOrDefault(), got[i].Checks[j].PartitionOrDefault(), "partition mismatch") } } - +} + +func testUUID(t *testing.T) string { + v, err := lib.GenerateUUID(nil) + require.NoError(t, err) + return v } diff --git a/agent/rpc/peering/subscription_manager_test.go b/agent/rpc/peering/subscription_manager_test.go index a7c49090b5..d556ff23eb 100644 --- a/agent/rpc/peering/subscription_manager_test.go +++ b/agent/rpc/peering/subscription_manager_test.go @@ -589,6 +589,7 @@ func (b *testSubscriptionBackend) ensureCARoots(t *testing.T, roots ...*structs. func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string { err := store.PeeringWrite(index, &pbpeering.Peering{ + ID: testUUID(t), Name: name, }) require.NoError(t, err) From 02e71fb2a462eb0b9f8f77d6ee4194de3a1f98cc Mon Sep 17 00:00:00 2001 From: David Yu Date: Tue, 21 Jun 2022 12:10:43 -0700 Subject: [PATCH 4/4] docs: Consul K8s compat matrix update for 0.45.0+ to include Consul 1.11 compatibility (#13528) --- .../content/docs/k8s/installation/compatibility.mdx | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/website/content/docs/k8s/installation/compatibility.mdx b/website/content/docs/k8s/installation/compatibility.mdx index cb2370f835..2d651401d0 100644 --- a/website/content/docs/k8s/installation/compatibility.mdx +++ b/website/content/docs/k8s/installation/compatibility.mdx @@ -14,11 +14,11 @@ For every release of Consul on Kubernetes, a Helm chart, `consul-k8s-control-pla Starting with Consul Kubernetes 0.33.0, Consul Kubernetes versions all of its components (`consul-k8s` CLI, `consul-k8s-control-plane`, and Helm chart) with a single semantic version. -| Consul Version | Compatible consul-k8s Versions | -| -------------- | ------------------------------- | -| 1.12.x | 0.43.0 - latest | -| 1.11.x | 0.39.0 - 0.42.0, 0.44.0 | -| 1.10.x | 0.33.0 - 0.38.0 | +| Consul Version | Compatible consul-k8s Versions | +| -------------- | -------------------------------- | +| 1.12.x | 0.43.0 - latest | +| 1.11.x | 0.39.0 - 0.42.0, 0.44.0 - latest | +| 1.10.x | 0.33.0 - 0.38.0 | ### Prior to version 0.33.0