API Gateway Controller Logic (#16058)

* Add initial API gateway controller logic

---------

Co-authored-by: Nathan Coleman <nathan.coleman@hashicorp.com>
Co-authored-by: Andrew Stucki <andrew.stucki@hashicorp.com>
Co-authored-by: Thomas Eckert <teckert@hashicorp.com>
This commit is contained in:
sarahalsmiller 2023-02-03 15:55:48 -06:00 committed by GitHub
parent 2f149d60cc
commit 143b2bc1f0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 511 additions and 13 deletions

View File

@ -0,0 +1,77 @@
package consul
import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/structs"
)
// FSMDataStore implements the DataStore interface using the Consul server and finite state manager.
type FSMDataStore struct {
server *Server
fsm *fsm.FSM
}
func NewFSMDataStore(server *Server, fsm *fsm.FSM) *FSMDataStore {
return &FSMDataStore{
server: server,
fsm: fsm,
}
}
// GetConfigEntry takes in a kind, name, and meta and returns a configentry and an error from the FSM state
func (f *FSMDataStore) GetConfigEntry(kind string, name string, meta *acl.EnterpriseMeta) (structs.ConfigEntry, error) {
store := f.fsm.State()
_, entry, err := store.ConfigEntry(nil, kind, name, meta)
if err != nil {
return nil, err
}
return entry, nil
}
// GetConfigEntriesByKind takes in a kind and returns all instances of that kind of config entry from the FSM state
func (f *FSMDataStore) GetConfigEntriesByKind(kind string) ([]structs.ConfigEntry, error) {
store := f.fsm.State()
_, entries, err := store.ConfigEntriesByKind(nil, kind, acl.WildcardEnterpriseMeta())
if err != nil {
return nil, err
}
return entries, nil
}
// Update takes a config entry and upserts it in the FSM state
func (f *FSMDataStore) Update(entry structs.ConfigEntry) error {
_, err := f.server.leaderRaftApply("ConfigEntry.Apply", structs.ConfigEntryRequestType, &structs.ConfigEntryRequest{
Op: structs.ConfigEntryUpsertCAS,
Entry: entry,
})
return err
}
// UpdateStatus takes a config entry, an error, and updates the status field as needed in the FSM state
func (f *FSMDataStore) UpdateStatus(entry structs.ControlledConfigEntry, err error) error {
if err == nil {
//TODO additional status messages for success?
return nil
}
status := structs.Status{
Conditions: []structs.Condition{{
Status: err.Error() + ": Accepted == false",
},
},
}
entry.SetStatus(status)
return f.Update(entry)
}
// Delete takes a config entry and deletes it from the FSM state
func (f *FSMDataStore) Delete(entry structs.ConfigEntry) error {
_, err := f.server.leaderRaftApply("ConfigEntry.Delete", structs.ConfigEntryRequestType, &structs.ConfigEntryRequest{
Op: structs.ConfigEntryDelete,
Entry: entry,
})
return err
}

View File

@ -2,33 +2,201 @@ package gateways
import (
"context"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/consul/agent/consul/controller"
"github.com/hashicorp/consul/agent/consul/fsm"
"github.com/hashicorp/consul/agent/consul/state"
"github.com/hashicorp/consul/agent/consul/stream"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog"
"github.com/pkg/errors"
)
type apiGatewayReconciler struct {
fsm *fsm.FSM
logger hclog.Logger
store DataStore
}
func (r apiGatewayReconciler) Reconcile(ctx context.Context, req controller.Request) error {
return nil
}
func NewAPIGatewayController(fsm *fsm.FSM, publisher state.EventPublisher, logger hclog.Logger) controller.Controller {
// NewAPIGatewayController returns a new APIGateway controller
func NewAPIGatewayController(store DataStore, publisher state.EventPublisher, logger hclog.Logger) controller.Controller {
reconciler := apiGatewayReconciler{
fsm: fsm,
logger: logger,
store: store,
}
return controller.New(publisher, reconciler).Subscribe(
return controller.New(publisher, &reconciler).Subscribe(
&stream.SubscribeRequest{
Topic: state.EventTopicAPIGateway,
Subject: stream.SubjectWildcard,
},
)
}
// Reconcile takes in a controller request and ensures this api gateways corresponding BoundAPIGateway exists and is
// up to date
func (r *apiGatewayReconciler) Reconcile(ctx context.Context, req controller.Request) error {
r.logger.Debug("started reconciling gateway", "gateway", req.Name)
metaGateway, err := r.initGatewayMeta(req)
if err != nil {
return err
} else if metaGateway == nil {
//delete meta gateway
r.logger.Info("cleaning up deleted gateway object", "request", req)
if err := r.store.Delete(&structs.BoundAPIGatewayConfigEntry{
Kind: structs.BoundAPIGateway,
Name: req.Name,
EnterpriseMeta: *req.Meta,
}); err != nil {
msg := "error cleaning up deleted gateway object"
r.logger.Error(msg, err)
return errors.Wrap(err, msg)
}
return nil
}
r.ensureBoundGateway(metaGateway)
routes, err := r.retrieveAllRoutesFromStore()
if err != nil {
return err
}
boundGateways, routeErrors := BindRoutesToGateways([]*gatewayMeta{metaGateway}, routes...)
//In this loop there should only be 1 bound gateway returned, but looping over all returned gateways
//to make sure nothing gets dropped and handle case where 0 gateways are returned
for _, boundGateway := range boundGateways {
// now update the gateway state
r.logger.Debug("persisting gateway state", "state", boundGateway)
if err := r.store.Update(boundGateway); err != nil {
msg := "error persisting state"
r.logger.Error(msg, "error", err)
return errors.Wrap(err, msg)
}
// then update the gateway status
r.logger.Debug("persisting gateway status", "gateway", metaGateway.Gateway)
if err := r.store.UpdateStatus(metaGateway.Gateway, err); err != nil {
return err
}
}
// and update the route statuses
for route, routeError := range routeErrors {
configEntry := r.resourceReferenceToBoundRoute(route)
r.logger.Error("route binding error:", routeError)
if err := r.store.UpdateStatus(configEntry, routeError); err != nil {
return err
}
}
return nil
}
func (r *apiGatewayReconciler) retrieveAllRoutesFromStore() ([]structs.BoundRoute, error) {
tcpRoutes, err := r.store.GetConfigEntriesByKind(structs.TCPRoute)
if err != nil {
return nil, err
}
//TODO not implemented
//httpRoutes, err := r.store.GetConfigEntriesByKind(structs.HTTPRoute)
//if err != nil {
// return nil, err
//}
routes := []structs.BoundRoute{}
for _, r := range tcpRoutes {
if r == nil {
continue
}
routes = append(routes, r.(*structs.TCPRouteConfigEntry))
}
//TODO not implemented
//for _, r := range httpRoutes {
// routes = append(routes, r.(*structs.HTTPRouteConfigEntry))
//}
return routes, nil
}
func (r *apiGatewayReconciler) initGatewayMeta(req controller.Request) (*gatewayMeta, error) {
metaGateway := &gatewayMeta{}
apiGateway, err := r.store.GetConfigEntry(req.Kind, req.Name, req.Meta)
if err != nil {
return nil, err
}
if apiGateway == nil {
//gateway doesn't exist
return nil, nil
}
metaGateway.Gateway = apiGateway.(*structs.APIGatewayConfigEntry)
boundGateway, err := r.store.GetConfigEntry(structs.BoundAPIGateway, req.Name, req.Meta)
if err != nil {
return nil, err
}
//initialize object, values get copied over in ensureBoundGateway if they don't exist
metaGateway.BoundGateway = boundGateway.(*structs.BoundAPIGatewayConfigEntry)
return metaGateway, nil
}
func (r *apiGatewayReconciler) resourceReferenceToBoundRoute(ref structs.ResourceReference) structs.ControlledConfigEntry {
//TODO currently have to retrieve from the store to persist parent field on update call, is there a better way to do this?
boundRoute, err := r.store.GetConfigEntry(ref.Kind, ref.Name, &ref.EnterpriseMeta)
if err != nil {
return nil
}
switch ref.Kind {
case structs.TCPRoute:
return boundRoute.(*structs.TCPRouteConfigEntry)
case structs.HTTPRoute:
return boundRoute.(*structs.HTTPRouteConfigEntry)
}
return nil
}
// ensureBoundGateway copies all relevant data from a gatewayMeta's APIGateway to BoundAPIGateway
func (r *apiGatewayReconciler) ensureBoundGateway(gw *gatewayMeta) {
if gw.BoundGateway == nil {
gw.BoundGateway = &structs.BoundAPIGatewayConfigEntry{
Kind: structs.BoundAPIGateway,
Name: gw.Gateway.Name,
EnterpriseMeta: gw.Gateway.EnterpriseMeta,
}
}
r.ensureListeners(gw)
}
func (r *apiGatewayReconciler) ensureListeners(gw *gatewayMeta) {
//rebuild the list from scratch, just copying over the ones that already exist
listeners := []structs.BoundAPIGatewayListener{}
for _, l := range gw.Gateway.Listeners {
boundListener := getBoundGatewayListener(l, gw.BoundGateway.Listeners)
if boundListener != nil {
//listener is already on gateway, copy onto our new list
listeners = append(listeners, *boundListener)
continue
}
//create new listener to add to our gateway
listeners = append(listeners, structs.BoundAPIGatewayListener{
Name: l.Name,
})
}
gw.BoundGateway.Listeners = listeners
}
func getBoundGatewayListener(listener structs.APIGatewayListener, boundListeners []structs.BoundAPIGatewayListener) *structs.BoundAPIGatewayListener {
for _, bl := range boundListeners {
if bl.Name == listener.Name {
return &bl
}
}
return nil
}

View File

@ -0,0 +1,122 @@
package gateways
import (
"context"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/consul/controller"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/mock"
"testing"
)
func Test_apiGatewayReconciler_Reconcile(t *testing.T) {
type fields struct {
logger hclog.Logger
store DataStore
}
type args struct {
ctx context.Context
req controller.Request
}
tests := []struct {
name string
fields fields
args args
wantErr bool
}{
{
name: "happy path - update available",
fields: fields{
store: datastoreWithUpdate(t),
logger: hclog.Default(),
},
args: args{
ctx: context.Background(),
req: controller.Request{
Kind: structs.APIGateway,
Name: "test-gateway",
Meta: acl.DefaultEnterpriseMeta(),
},
},
wantErr: false,
},
{
name: "delete happy path",
fields: fields{
store: datastoreWithDelete(t),
logger: hclog.Default(),
},
args: args{
ctx: context.Background(),
req: controller.Request{
Kind: structs.APIGateway,
Name: "test-gateway",
Meta: acl.DefaultEnterpriseMeta(),
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
r := apiGatewayReconciler{
logger: tt.fields.logger,
store: tt.fields.store,
}
if err := r.Reconcile(tt.args.ctx, tt.args.req); (err != nil) != tt.wantErr {
t.Errorf("Reconcile() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}
func datastoreWithUpdate(t *testing.T) *MockDataStore {
ds := NewMockDataStore(t)
ds.On("GetConfigEntry", structs.APIGateway, mock.Anything, mock.Anything).Return(&structs.APIGatewayConfigEntry{
Kind: structs.APIGateway,
Name: "test-gateway",
Listeners: []structs.APIGatewayListener{
{
Name: "test-listener",
Protocol: "tcp",
Port: 8080,
},
},
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
}, nil)
ds.On("GetConfigEntry", structs.BoundAPIGateway, mock.Anything, mock.Anything).Return(
&structs.BoundAPIGatewayConfigEntry{
Kind: structs.BoundAPIGateway,
Name: "test-gateway",
Listeners: []structs.BoundAPIGatewayListener{},
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
}, nil)
ds.On("GetConfigEntriesByKind", structs.TCPRoute).Return([]structs.ConfigEntry{
&structs.TCPRouteConfigEntry{
Kind: structs.TCPRoute,
Name: "test-route",
Parents: []structs.ResourceReference{
{
Kind: structs.APIGateway,
Name: "test-gateway",
SectionName: "test-listener",
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
},
},
EnterpriseMeta: *acl.DefaultEnterpriseMeta(),
},
}, nil)
ds.On("Update", mock.Anything).Return(nil)
ds.On("UpdateStatus", mock.Anything, mock.Anything).Return(nil)
return ds
}
func datastoreWithDelete(t *testing.T) *MockDataStore {
ds := NewMockDataStore(t)
ds.On("GetConfigEntry", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil)
ds.On("Delete", mock.Anything).Return(nil)
return ds
}

View File

@ -0,0 +1,15 @@
package gateways
import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
)
//go:generate mockery --name DataStore --inpackage
type DataStore interface {
GetConfigEntry(kind string, name string, meta *acl.EnterpriseMeta) (structs.ConfigEntry, error)
GetConfigEntriesByKind(kind string) ([]structs.ConfigEntry, error)
Update(entry structs.ConfigEntry) error
UpdateStatus(entry structs.ControlledConfigEntry, err error) error
Delete(entry structs.ConfigEntry) error
}

View File

@ -0,0 +1,115 @@
// Code generated by mockery v2.12.2. DO NOT EDIT.
package gateways
import (
acl "github.com/hashicorp/consul/acl"
mock "github.com/stretchr/testify/mock"
structs "github.com/hashicorp/consul/agent/structs"
testing "testing"
)
// MockDataStore is an autogenerated mock type for the DataStore type
type MockDataStore struct {
mock.Mock
}
// Delete provides a mock function with given fields: entry
func (_m *MockDataStore) Delete(entry structs.ConfigEntry) error {
ret := _m.Called(entry)
var r0 error
if rf, ok := ret.Get(0).(func(structs.ConfigEntry) error); ok {
r0 = rf(entry)
} else {
r0 = ret.Error(0)
}
return r0
}
// GetConfigEntriesByKind provides a mock function with given fields: kind
func (_m *MockDataStore) GetConfigEntriesByKind(kind string) ([]structs.ConfigEntry, error) {
ret := _m.Called(kind)
var r0 []structs.ConfigEntry
if rf, ok := ret.Get(0).(func(string) []structs.ConfigEntry); ok {
r0 = rf(kind)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).([]structs.ConfigEntry)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string) error); ok {
r1 = rf(kind)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// GetConfigEntry provides a mock function with given fields: kind, name, meta
func (_m *MockDataStore) GetConfigEntry(kind string, name string, meta *acl.EnterpriseMeta) (structs.ConfigEntry, error) {
ret := _m.Called(kind, name, meta)
var r0 structs.ConfigEntry
if rf, ok := ret.Get(0).(func(string, string, *acl.EnterpriseMeta) structs.ConfigEntry); ok {
r0 = rf(kind, name, meta)
} else {
if ret.Get(0) != nil {
r0 = ret.Get(0).(structs.ConfigEntry)
}
}
var r1 error
if rf, ok := ret.Get(1).(func(string, string, *acl.EnterpriseMeta) error); ok {
r1 = rf(kind, name, meta)
} else {
r1 = ret.Error(1)
}
return r0, r1
}
// Update provides a mock function with given fields: entry
func (_m *MockDataStore) Update(entry structs.ConfigEntry) error {
ret := _m.Called(entry)
var r0 error
if rf, ok := ret.Get(0).(func(structs.ConfigEntry) error); ok {
r0 = rf(entry)
} else {
r0 = ret.Error(0)
}
return r0
}
// UpdateStatus provides a mock function with given fields: entry, err
func (_m *MockDataStore) UpdateStatus(entry structs.ControlledConfigEntry, err error) error {
ret := _m.Called(entry, err)
var r0 error
if rf, ok := ret.Get(0).(func(structs.ControlledConfigEntry, error) error); ok {
r0 = rf(entry, err)
} else {
r0 = ret.Error(0)
}
return r0
}
// NewMockDataStore creates a new instance of MockDataStore. It also registers the testing.TB interface on the mock and a cleanup function to assert the mocks expectations.
func NewMockDataStore(t testing.TB) *MockDataStore {
mock := &MockDataStore{}
mock.Mock.Test(t)
t.Cleanup(func() { mock.AssertExpectations(t) })
return mock
}

View File

@ -76,7 +76,8 @@ func (s *Server) runConfigEntryControllers(ctx context.Context) error {
group.Go(func() error {
logger := s.logger.Named(logging.APIGatewayController)
return gateways.NewAPIGatewayController(s.fsm, s.publisher, logger).Run(ctx)
datastore := NewFSMDataStore(s, s.fsm)
return gateways.NewAPIGatewayController(datastore, s.publisher, logger).Run(ctx)
})
group.Go(func() error {