mirror of https://github.com/status-im/consul.git
submatview: move error return to NewMaterializer
So that we don't have to create views ahead of time, when we will never use that view.
This commit is contained in:
parent
55a677b7d1
commit
4fb2ba9de7
|
@ -59,15 +59,17 @@ func (c *Client) getServiceNodes(
|
||||||
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
|
) (structs.IndexedCheckServiceNodes, cache.ResultMeta, error) {
|
||||||
var out structs.IndexedCheckServiceNodes
|
var out structs.IndexedCheckServiceNodes
|
||||||
|
|
||||||
|
// TODO: if UseStreaming, elif !UseCache, else cache
|
||||||
|
|
||||||
if !req.QueryOptions.UseCache {
|
if !req.QueryOptions.UseCache {
|
||||||
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
|
err := c.NetRPC.RPC("Health.ServiceNodes", &req, &out)
|
||||||
return out, cache.ResultMeta{}, err
|
return out, cache.ResultMeta{}, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if req.Source.Node == "" {
|
if req.Source.Node == "" {
|
||||||
sr, err := newServiceRequest(req, c.MaterializerDeps)
|
sr := serviceRequest{
|
||||||
if err != nil {
|
ServiceSpecificRequest: req,
|
||||||
return out, cache.ResultMeta{}, err
|
deps: c.MaterializerDeps,
|
||||||
}
|
}
|
||||||
|
|
||||||
result, err := c.ViewStore.Get(ctx, sr)
|
result, err := c.ViewStore.Get(ctx, sr)
|
||||||
|
@ -109,21 +111,8 @@ func (c *Client) Notify(
|
||||||
return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch)
|
return c.Cache.Notify(ctx, cacheName, &req, correlationID, ch)
|
||||||
}
|
}
|
||||||
|
|
||||||
func newServiceRequest(req structs.ServiceSpecificRequest, deps MaterializerDeps) (serviceRequest, error) {
|
|
||||||
view, err := newHealthView(req)
|
|
||||||
if err != nil {
|
|
||||||
return serviceRequest{}, err
|
|
||||||
}
|
|
||||||
return serviceRequest{
|
|
||||||
ServiceSpecificRequest: req,
|
|
||||||
view: view,
|
|
||||||
deps: deps,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
type serviceRequest struct {
|
type serviceRequest struct {
|
||||||
structs.ServiceSpecificRequest
|
structs.ServiceSpecificRequest
|
||||||
view *healthView
|
|
||||||
deps MaterializerDeps
|
deps MaterializerDeps
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -135,11 +124,15 @@ func (r serviceRequest) Type() string {
|
||||||
return "service-health"
|
return "service-health"
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r serviceRequest) NewMaterializer() *submatview.Materializer {
|
func (r serviceRequest) NewMaterializer() (*submatview.Materializer, error) {
|
||||||
|
view, err := newHealthView(r.ServiceSpecificRequest)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
return submatview.NewMaterializer(submatview.Deps{
|
return submatview.NewMaterializer(submatview.Deps{
|
||||||
View: r.view,
|
View: view,
|
||||||
Client: r.deps.Client,
|
Client: r.deps.Client,
|
||||||
Logger: r.deps.Logger,
|
Logger: r.deps.Logger,
|
||||||
Request: newMaterializerRequest(r.ServiceSpecificRequest),
|
Request: newMaterializerRequest(r.ServiceSpecificRequest),
|
||||||
})
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -73,7 +73,7 @@ func (s *Store) Run(ctx context.Context) {
|
||||||
// TODO: godoc
|
// TODO: godoc
|
||||||
type Request interface {
|
type Request interface {
|
||||||
cache.Request
|
cache.Request
|
||||||
NewMaterializer() *Materializer
|
NewMaterializer() (*Materializer, error)
|
||||||
Type() string
|
Type() string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -82,7 +82,10 @@ type Request interface {
|
||||||
// See agent/cache.Cache.Get for complete documentation.
|
// See agent/cache.Cache.Get for complete documentation.
|
||||||
func (s *Store) Get(ctx context.Context, req Request) (Result, error) {
|
func (s *Store) Get(ctx context.Context, req Request) (Result, error) {
|
||||||
info := req.CacheInfo()
|
info := req.CacheInfo()
|
||||||
key, e := s.getEntry(req)
|
key, e, err := s.getEntry(req)
|
||||||
|
if err != nil {
|
||||||
|
return Result{}, err
|
||||||
|
}
|
||||||
defer s.releaseEntry(key)
|
defer s.releaseEntry(key)
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, info.Timeout)
|
ctx, cancel := context.WithTimeout(ctx, info.Timeout)
|
||||||
|
@ -108,7 +111,10 @@ func (s *Store) Notify(
|
||||||
updateCh chan<- cache.UpdateEvent,
|
updateCh chan<- cache.UpdateEvent,
|
||||||
) error {
|
) error {
|
||||||
info := req.CacheInfo()
|
info := req.CacheInfo()
|
||||||
key, e := s.getEntry(req)
|
key, e, err := s.getEntry(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
defer s.releaseEntry(key)
|
defer s.releaseEntry(key)
|
||||||
|
@ -150,7 +156,7 @@ func (s *Store) Notify(
|
||||||
|
|
||||||
// getEntry from the store, and increment the requests counter. releaseEntry
|
// getEntry from the store, and increment the requests counter. releaseEntry
|
||||||
// must be called when the request is finished to decrement the counter.
|
// must be called when the request is finished to decrement the counter.
|
||||||
func (s *Store) getEntry(req Request) (string, entry) {
|
func (s *Store) getEntry(req Request) (string, entry, error) {
|
||||||
info := req.CacheInfo()
|
info := req.CacheInfo()
|
||||||
key := makeEntryKey(req.Type(), info)
|
key := makeEntryKey(req.Type(), info)
|
||||||
|
|
||||||
|
@ -160,11 +166,15 @@ func (s *Store) getEntry(req Request) (string, entry) {
|
||||||
if ok {
|
if ok {
|
||||||
e.requests++
|
e.requests++
|
||||||
s.byKey[key] = e
|
s.byKey[key] = e
|
||||||
return key, e
|
return key, e, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
mat, err := req.NewMaterializer()
|
||||||
|
if err != nil {
|
||||||
|
return "", e, err
|
||||||
}
|
}
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
mat := req.NewMaterializer()
|
|
||||||
go mat.Run(ctx)
|
go mat.Run(ctx)
|
||||||
|
|
||||||
e = entry{
|
e = entry{
|
||||||
|
@ -173,7 +183,7 @@ func (s *Store) getEntry(req Request) (string, entry) {
|
||||||
requests: 1,
|
requests: 1,
|
||||||
}
|
}
|
||||||
s.byKey[key] = e
|
s.byKey[key] = e
|
||||||
return key, e
|
return key, e, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// idleTTL is the duration of time an entry should remain in the Store after the
|
// idleTTL is the duration of time an entry should remain in the Store after the
|
||||||
|
|
|
@ -146,7 +146,7 @@ func (r *fakeRequest) CacheInfo() cache.RequestInfo {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *fakeRequest) NewMaterializer() *Materializer {
|
func (r *fakeRequest) NewMaterializer() (*Materializer, error) {
|
||||||
return NewMaterializer(Deps{
|
return NewMaterializer(Deps{
|
||||||
View: &fakeView{srvs: make(map[string]*pbservice.CheckServiceNode)},
|
View: &fakeView{srvs: make(map[string]*pbservice.CheckServiceNode)},
|
||||||
Client: r.client,
|
Client: r.client,
|
||||||
|
@ -162,7 +162,7 @@ func (r *fakeRequest) NewMaterializer() *Materializer {
|
||||||
}
|
}
|
||||||
return req
|
return req
|
||||||
},
|
},
|
||||||
})
|
}), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *fakeRequest) Type() string {
|
func (r *fakeRequest) Type() string {
|
||||||
|
|
Loading…
Reference in New Issue