consul/agent/proxycfg-glue/service_list.go

125 lines
3.6 KiB
Go

package proxycfgglue
import (
"context"
"sort"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/agent/submatview"
"github.com/hashicorp/consul/proto/private/pbcommon"
"github.com/hashicorp/consul/proto/private/pbsubscribe"
)
// CacheServiceList satisfies the proxycfg.ServiceList interface by sourcing
// data from the agent cache.
func CacheServiceList(c *cache.Cache) proxycfg.ServiceList {
return &cacheProxyDataSource[*structs.DCSpecificRequest]{c, cachetype.CatalogServiceListName}
}
func ServerServiceList(deps ServerDataSourceDeps, remoteSource proxycfg.ServiceList) proxycfg.ServiceList {
return &serverServiceList{deps, remoteSource}
}
type serverServiceList struct {
deps ServerDataSourceDeps
remoteSource proxycfg.ServiceList
}
func (s *serverServiceList) Notify(ctx context.Context, req *structs.DCSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error {
if req.Datacenter != s.deps.Datacenter {
return s.remoteSource.Notify(ctx, req, correlationID, ch)
}
return s.deps.ViewStore.NotifyCallback(
ctx,
&serviceListRequest{s.deps, req},
correlationID,
dispatchCacheUpdate(ch),
)
}
type serviceListRequest struct {
deps ServerDataSourceDeps
req *structs.DCSpecificRequest
}
func (r *serviceListRequest) Request(index uint64) *pbsubscribe.SubscribeRequest {
return &pbsubscribe.SubscribeRequest{
Topic: pbsubscribe.Topic_ServiceList,
Subject: &pbsubscribe.SubscribeRequest_WildcardSubject{WildcardSubject: true},
Index: index,
Datacenter: r.req.Datacenter,
Token: r.req.QueryOptions.Token,
}
}
func (r *serviceListRequest) CacheInfo() cache.RequestInfo { return r.req.CacheInfo() }
func (r *serviceListRequest) NewMaterializer() (submatview.Materializer, error) {
return submatview.NewLocalMaterializer(submatview.LocalMaterializerDeps{
Backend: r.deps.EventPublisher,
ACLResolver: r.deps.ACLResolver,
Deps: submatview.Deps{
View: newServiceListView(r.req.EnterpriseMeta),
Logger: r.deps.Logger,
Request: r.Request,
},
}), nil
}
func (serviceListRequest) Type() string { return "proxycfgglue.ServiceList" }
func newServiceListView(entMeta acl.EnterpriseMeta) *serviceListView {
view := &serviceListView{entMeta: entMeta}
view.Reset()
return view
}
type serviceListView struct {
entMeta acl.EnterpriseMeta
state map[string]structs.ServiceName
}
func (v *serviceListView) Reset() { v.state = make(map[string]structs.ServiceName) }
func (v *serviceListView) Update(events []*pbsubscribe.Event) error {
for _, event := range filterByEnterpriseMeta(events, v.entMeta) {
update := event.GetService()
if update == nil {
continue
}
var entMeta acl.EnterpriseMeta
pbcommon.EnterpriseMetaToStructs(update.EnterpriseMeta, &entMeta)
name := structs.NewServiceName(update.Name, &entMeta)
switch update.Op {
case pbsubscribe.CatalogOp_Register:
v.state[name.String()] = name
case pbsubscribe.CatalogOp_Deregister:
delete(v.state, name.String())
}
}
return nil
}
func (v *serviceListView) Result(index uint64) any {
serviceList := make(structs.ServiceList, 0, len(v.state))
for _, name := range v.state {
serviceList = append(serviceList, name)
}
sort.Slice(serviceList, func(a, b int) bool {
return serviceList[a].String() < serviceList[b].String()
})
return &structs.IndexedServiceList{
Services: serviceList,
QueryMeta: structs.QueryMeta{
Backend: structs.QueryBackendStreaming,
Index: index,
},
}
}