Fix Internal.ServiceDump blocking (#6076)

maxIndexWatchTxn was only watching the IndexEntry of the max index of all the entries. It needed to watch all of them regardless of which was the max.

Also plumbed the query source through in the proxy config to help better track requests.
This commit is contained in:
Matt Keeler 2019-07-04 11:17:49 -04:00 committed by Paul Banks
parent 52c608df78
commit 3d562bee5c
2 changed files with 9 additions and 1 deletions

View File

@ -255,8 +255,8 @@ func maxIndexWatchTxn(tx *memdb.Txn, ws memdb.WatchSet, tables ...string) uint64
} }
if idx, ok := ti.(*IndexEntry); ok && idx.Value > lindex { if idx, ok := ti.(*IndexEntry); ok && idx.Value > lindex {
lindex = idx.Value lindex = idx.Value
ws.Add(ch)
} }
ws.Add(ch)
} }
return lindex return lindex
} }

View File

@ -150,6 +150,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri
QueryOptions: structs.QueryOptions{Token: s.token}, QueryOptions: structs.QueryOptions{Token: s.token},
ServiceKind: structs.ServiceKindMeshGateway, ServiceKind: structs.ServiceKindMeshGateway,
UseServiceKind: true, UseServiceKind: true,
Source: *s.source,
}, correlationId, s.ch) }, correlationId, s.ch)
case structs.MeshGatewayModeLocal: case structs.MeshGatewayModeLocal:
return s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{ return s.cache.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
@ -157,6 +158,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri
QueryOptions: structs.QueryOptions{Token: s.token}, QueryOptions: structs.QueryOptions{Token: s.token},
ServiceKind: structs.ServiceKindMeshGateway, ServiceKind: structs.ServiceKindMeshGateway,
UseServiceKind: true, UseServiceKind: true,
Source: *s.source,
}, correlationId, s.ch) }, correlationId, s.ch)
default: default:
// This includes both the None and Default modes on purpose // This includes both the None and Default modes on purpose
@ -171,6 +173,7 @@ func (s *state) watchConnectProxyService(ctx context.Context, correlationId stri
// Note that Identifier doesn't type-prefix for service any more as it's // Note that Identifier doesn't type-prefix for service any more as it's
// the default and makes metrics and other things much cleaner. It's // the default and makes metrics and other things much cleaner. It's
// simpler for us if we have the type to make things unambiguous. // simpler for us if we have the type to make things unambiguous.
Source: *s.source,
}, correlationId, s.ch) }, correlationId, s.ch)
} }
} }
@ -182,6 +185,7 @@ func (s *state) initWatchesConnectProxy() error {
err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter, Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token}, QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
}, rootsWatchID, s.ch) }, rootsWatchID, s.ch)
if err != nil { if err != nil {
return err return err
@ -230,6 +234,7 @@ func (s *state) initWatchesConnectProxy() error {
QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval}, QueryOptions: structs.QueryOptions{Token: s.token, MaxAge: defaultPreparedQueryPollInterval},
QueryIDOrName: u.DestinationName, QueryIDOrName: u.DestinationName,
Connect: true, Connect: true,
Source: *s.source,
}, "upstream:"+u.Identifier(), s.ch) }, "upstream:"+u.Identifier(), s.ch)
case structs.UpstreamDestTypeService: case structs.UpstreamDestTypeService:
fallthrough fallthrough
@ -290,6 +295,7 @@ func (s *state) initWatchesMeshGateway() error {
err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{ err := s.cache.Notify(s.ctx, cachetype.ConnectCARootName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter, Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token}, QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
}, rootsWatchID, s.ch) }, rootsWatchID, s.ch)
if err != nil { if err != nil {
return err return err
@ -299,6 +305,7 @@ func (s *state) initWatchesMeshGateway() error {
err = s.cache.Notify(s.ctx, cachetype.CatalogListServicesName, &structs.DCSpecificRequest{ err = s.cache.Notify(s.ctx, cachetype.CatalogListServicesName, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter, Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token}, QueryOptions: structs.QueryOptions{Token: s.token},
Source: *s.source,
}, serviceListWatchID, s.ch) }, serviceListWatchID, s.ch)
if err != nil { if err != nil {
@ -721,6 +728,7 @@ func (s *state) handleUpdateMeshGateway(u cache.UpdateEvent, snap *ConfigSnapsho
QueryOptions: structs.QueryOptions{Token: s.token}, QueryOptions: structs.QueryOptions{Token: s.token},
ServiceKind: structs.ServiceKindMeshGateway, ServiceKind: structs.ServiceKindMeshGateway,
UseServiceKind: true, UseServiceKind: true,
Source: *s.source,
}, fmt.Sprintf("mesh-gateway:%s", dc), s.ch) }, fmt.Sprintf("mesh-gateway:%s", dc), s.ch)
if err != nil { if err != nil {