From 3c8929c7e1c075ae2e71209df27fe5530d689827 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Mon, 19 Oct 2020 18:24:02 -0400 Subject: [PATCH] streaming: apply filter to a single item Instead of the whole map. This should save a lot of time performing reflecting on a large map. The filter does not change, so there is no reason to re-apply it to older entries. --- .../cache-types/streaming_health_services.go | 59 ++++++++++++------- 1 file changed, 38 insertions(+), 21 deletions(-) diff --git a/agent/cache-types/streaming_health_services.go b/agent/cache-types/streaming_health_services.go index 139fc614fe..e3d0adefd0 100644 --- a/agent/cache-types/streaming_health_services.go +++ b/agent/cache-types/streaming_health_services.go @@ -3,6 +3,7 @@ package cachetype import ( "context" "fmt" + "reflect" "time" "github.com/hashicorp/go-bexpr" @@ -138,15 +139,14 @@ func (s *streamingHealthState) Fetch(opts cache.FetchOptions) (cache.FetchResult } func newHealthView(filterExpr string) (*healthView, error) { - s := &healthView{state: make(map[string]structs.CheckServiceNode)} - - // We apply filtering to the raw CheckServiceNodes before we are done mutating - // state in Update to save from storing stuff in memory we'll only filter - // later. Because the state is just a map of those types, we can simply run - // that map through filter and it will remove any entries that don't match. - var err error - s.filter, err = bexpr.CreateFilter(filterExpr, nil, s.state) - return s, err + fe, err := newFilterEvaluator(filterExpr) + if err != nil { + return nil, err + } + return &healthView{ + state: make(map[string]structs.CheckServiceNode), + filter: fe, + }, nil } // healthView implements submatview.View for storing the view state @@ -156,7 +156,7 @@ func newHealthView(filterExpr string) (*healthView, error) { // involves re-sorting each time etc. though. type healthView struct { state map[string]structs.CheckServiceNode - filter *bexpr.Filter + filter filterEvaluator } // Update implements View @@ -171,24 +171,41 @@ func (s *healthView) Update(events []*pbsubscribe.Event) error { id := serviceHealth.CheckServiceNode.UniqueID() switch serviceHealth.Op { case pbsubscribe.CatalogOp_Register: - csn := pbservice.CheckServiceNodeToStructs(serviceHealth.CheckServiceNode) - s.state[id] = *csn + csn := *pbservice.CheckServiceNodeToStructs(serviceHealth.CheckServiceNode) + passed, err := s.filter.Evaluate(csn) + switch { + case err != nil: + return err + case passed: + s.state[id] = csn + } + case pbsubscribe.CatalogOp_Deregister: delete(s.state, id) } } - // TODO(streaming): should this filter be applied to only the new CheckServiceNode - // instead of the full map, which should already be filtered. - if s.filter != nil { - filtered, err := s.filter.Execute(s.state) - if err != nil { - return err - } - s.state = filtered.(map[string]structs.CheckServiceNode) - } return nil } +type filterEvaluator interface { + Evaluate(datum interface{}) (bool, error) +} + +func newFilterEvaluator(expr string) (filterEvaluator, error) { + if expr == "" { + return noopFilterEvaluator{}, nil + } + return bexpr.CreateEvaluatorForType(expr, nil, reflect.TypeOf(structs.CheckServiceNode{})) +} + +// noopFilterEvaluator may be used in place of a bexpr.Evaluator. The Evaluate +// method always return true, so no items will be filtered out. +type noopFilterEvaluator struct{} + +func (noopFilterEvaluator) Evaluate(_ interface{}) (bool, error) { + return true, nil +} + // Result returns the structs.IndexedCheckServiceNodes stored by this view. func (s *healthView) Result(index uint64) (interface{}, error) { result := structs.IndexedCheckServiceNodes{