mirror of https://github.com/status-im/consul.git
Use a buffered channel for CA intermediate renew func
This commit is contained in:
parent
13c31ccfce
commit
781cae5809
|
@ -983,7 +983,7 @@ func (c *CAManager) RenewIntermediate(ctx context.Context, isPrimary bool) error
|
|||
if !isPrimary {
|
||||
renewalFunc = c.getIntermediateCASigned
|
||||
}
|
||||
errCh := make(chan error)
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- renewalFunc(provider, activeRoot)
|
||||
}()
|
||||
|
|
|
@ -19,6 +19,9 @@ import (
|
|||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
// TODO(kyhavlov): replace with t.Deadline()
|
||||
const CATestTimeout = 7 * time.Second
|
||||
|
||||
type mockCAServerDelegate struct {
|
||||
t *testing.T
|
||||
config *Config
|
||||
|
@ -77,6 +80,8 @@ func (m *mockCAServerDelegate) forwardDC(method, dc string, args interface{}, re
|
|||
case "ConnectCA.SignIntermediate":
|
||||
r := reply.(*string)
|
||||
*r = m.primaryRoot.RootCert
|
||||
default:
|
||||
return fmt.Errorf("received call to unsupported method %q", method)
|
||||
}
|
||||
|
||||
m.callbackCh <- fmt.Sprintf("forwardDC/%s", method)
|
||||
|
@ -101,6 +106,8 @@ func (m *mockCAServerDelegate) raftApply(t structs.MessageType, msg interface{})
|
|||
act, err = m.store.CACheckAndSetConfig(1, req.Config.ModifyIndex, req.Config)
|
||||
require.NoError(m.t, err)
|
||||
require.True(m.t, act)
|
||||
} else {
|
||||
return nil, fmt.Errorf("got invalid MessageType %v", t)
|
||||
}
|
||||
m.callbackCh <- fmt.Sprintf("raftApply/%s", t)
|
||||
return nil, nil
|
||||
|
@ -139,7 +146,7 @@ func waitForCh(t *testing.T, ch chan string, expected string) {
|
|||
if op != expected {
|
||||
t.Fatalf("got unexpected op %q, wanted %q", op, expected)
|
||||
}
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(CATestTimeout):
|
||||
t.Fatalf("never got op %q", expected)
|
||||
}
|
||||
}
|
||||
|
@ -174,13 +181,13 @@ func initTestManager(t *testing.T, manager *CAManager, delegate *mockCAServerDel
|
|||
for i := 0; i < 5; i++ {
|
||||
select {
|
||||
case <-delegate.callbackCh:
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(CATestTimeout):
|
||||
t.Fatal("failed waiting for initialization events")
|
||||
}
|
||||
}
|
||||
select {
|
||||
case <-initCh:
|
||||
case <-time.After(3 * time.Second):
|
||||
case <-time.After(CATestTimeout):
|
||||
t.Fatal("failed waiting for initialization")
|
||||
}
|
||||
}
|
||||
|
@ -195,10 +202,10 @@ func TestCAManager_Initialize(t *testing.T) {
|
|||
|
||||
// Call InitializeCA and then confirm the RPCs and provider calls
|
||||
// happen in the expected order.
|
||||
go func() {
|
||||
require.EqualValues(t, CAStateUninitialized, manager.state)
|
||||
require.NoError(t, manager.InitializeCA())
|
||||
require.EqualValues(t, CAStateReady, manager.state)
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
errCh <- manager.InitializeCA()
|
||||
}()
|
||||
|
||||
waitForCh(t, delegate.callbackCh, "forwardDC/ConnectCA.Roots")
|
||||
|
@ -208,6 +215,16 @@ func TestCAManager_Initialize(t *testing.T) {
|
|||
waitForCh(t, delegate.callbackCh, "provider/SetIntermediate")
|
||||
waitForCh(t, delegate.callbackCh, "raftApply/ConnectCA")
|
||||
waitForEmptyCh(t, delegate.callbackCh)
|
||||
|
||||
// Make sure the InitializeCA call returned successfully.
|
||||
select {
|
||||
case err := <-errCh:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(CATestTimeout):
|
||||
t.Fatal("never got result from errCh")
|
||||
}
|
||||
|
||||
require.EqualValues(t, CAStateReady, manager.state)
|
||||
}
|
||||
|
||||
func TestCAManager_UpdateConfigWhileRenewIntermediate(t *testing.T) {
|
||||
|
@ -235,9 +252,9 @@ func TestCAManager_UpdateConfigWhileRenewIntermediate(t *testing.T) {
|
|||
|
||||
// Call RenewIntermediate and then confirm the RPCs and provider calls
|
||||
// happen in the expected order.
|
||||
errCh := make(chan error)
|
||||
go func() {
|
||||
require.NoError(t, manager.RenewIntermediate(context.TODO(), false))
|
||||
require.EqualValues(t, CAStateReady, manager.state)
|
||||
errCh <- manager.RenewIntermediate(context.TODO(), false)
|
||||
}()
|
||||
|
||||
waitForCh(t, delegate.callbackCh, "provider/GenerateIntermediateCSR")
|
||||
|
@ -253,4 +270,14 @@ func TestCAManager_UpdateConfigWhileRenewIntermediate(t *testing.T) {
|
|||
waitForCh(t, delegate.callbackCh, "provider/SetIntermediate")
|
||||
waitForCh(t, delegate.callbackCh, "raftApply/ConnectCA")
|
||||
waitForEmptyCh(t, delegate.callbackCh)
|
||||
|
||||
// Make sure the RenewIntermediate call returned successfully.
|
||||
select {
|
||||
case err := <-errCh:
|
||||
require.NoError(t, err)
|
||||
case <-time.After(CATestTimeout):
|
||||
t.Fatal("never got result from errCh")
|
||||
}
|
||||
|
||||
require.EqualValues(t, CAStateReady, manager.state)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue