consul/agent/submatview/rpc_materializer.go
R.B. Boyer f507f62f3c
peering: initial sync (#12842)
- Add endpoints related to peering: read, list, generate token, initiate peering
- Update node/service/check table indexing to account for peers
- Foundational changes for pushing service updates to a peer
- Plumb peer name through Health.ServiceNodes path

see: ENT-1765, ENT-1280, ENT-1283, ENT-1283, ENT-1756, ENT-1739, ENT-1750, ENT-1679,
     ENT-1709, ENT-1704, ENT-1690, ENT-1689, ENT-1702, ENT-1701, ENT-1683, ENT-1663,
     ENT-1650, ENT-1678, ENT-1628, ENT-1658, ENT-1640, ENT-1637, ENT-1597, ENT-1634,
     ENT-1613, ENT-1616, ENT-1617, ENT-1591, ENT-1588, ENT-1596, ENT-1572, ENT-1555

Co-authored-by: R.B. Boyer <rb@hashicorp.com>
Co-authored-by: freddygv <freddy@hashicorp.com>
Co-authored-by: Chris S. Kim <ckim@hashicorp.com>
Co-authored-by: Evan Culver <eculver@hashicorp.com>
Co-authored-by: Nitya Dhanushkodi <nitya@hashicorp.com>
2022-04-21 17:34:40 -05:00

126 lines
3.1 KiB
Go

package submatview
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/consul/proto/pbsubscribe"
)
// RPCMaterializer is a materializer for a streaming cache type
// and manages the actual streaming RPC call to the servers behind
// the scenes until the cache result is discarded when its TTL expires.
//
// Query, updateView and reset are thin delegates to the embedded materializer
// state; Run drives the subscribe/retry loop.
type RPCMaterializer struct {
// deps supplies the request builder (Request) used on every subscribe
// attempt, plus the Logger, View and Waiter handed to newMaterializer.
deps Deps
// client opens the subscribe stream to the servers.
client StreamClient
// handler is the current event handler. It is re-initialised at the start of
// each subscribe attempt and replaced by whatever each handler call returns.
handler eventHandler
// mat holds the materializer state that the methods on this type delegate to.
mat *materializer
}
// Compile-time assertion that *RPCMaterializer satisfies Materializer.
var _ Materializer = (*RPCMaterializer)(nil)
// StreamClient provides a subscription to state change events.
type StreamClient interface {
// Subscribe opens a subscription stream for the given request. The
// RPCMaterializer reads events from the returned stream by calling Recv in
// a loop until it returns an error.
Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error)
}
// NewRPCMaterializer builds an RPCMaterializer that subscribes through client
// and keeps its view state in a materializer created from deps.Logger,
// deps.View and deps.Waiter. The returned value is idle until Run is called.
func NewRPCMaterializer(client StreamClient, deps Deps) *RPCMaterializer {
	return &RPCMaterializer{
		deps:   deps,
		client: client,
		mat:    newMaterializer(deps.Logger, deps.View, deps.Waiter),
	}
}
// Query implements Materializer by forwarding ctx and minIndex to the
// underlying materializer's query and returning its result unchanged.
func (m *RPCMaterializer) Query(ctx context.Context, minIndex uint64) (Result, error) {
	result, err := m.mat.query(ctx, minIndex)
	return result, err
}
// Run implements Materializer. It repeatedly subscribes through the
// StreamClient and feeds the received events to the View, so it blocks and is
// expected to be started in its own goroutine. Mirrors the implementation of
// LocalMaterializer.
//
// Each iteration builds a fresh request from the materializer's current index,
// runs one subscription to completion, reports the resulting error to the
// materializer and then waits on the retry waiter before trying again. The
// loop ends once ctx is cancelled or the retry wait returns an error.
func (m *RPCMaterializer) Run(ctx context.Context) {
	for {
		index := m.mat.currentIndex()
		req := m.deps.Request(index)

		subErr := m.subscribeOnce(ctx, req)
		// A cancelled context means we are shutting down; do not record the
		// subscription error in that case.
		if ctx.Err() != nil {
			return
		}
		m.mat.handleError(req, subErr)

		waitErr := m.mat.retryWaiter.Wait(ctx)
		if waitErr != nil {
			return
		}
	}
}
// subscribeOnce opens one subscribe streaming call to the servers and consumes
// it until the stream or an event handler returns an error; it never returns
// nil once the stream is open.
//
// The stream runs under a child context that is cancelled when this method
// returns. The event handler is re-initialised from req.Index before
// subscribing. A gRPC Aborted status from the stream resets the materializer
// and is reported as a resetErr; any other receive error is returned as is. A
// handler error also resets the materializer before being returned.
func (m *RPCMaterializer) subscribeOnce(ctx context.Context, req *pbsubscribe.SubscribeRequest) error {
	streamCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	m.handler = initialHandler(req.Index)

	stream, err := m.client.Subscribe(streamCtx, req)
	if err != nil {
		return err
	}

	for {
		event, recvErr := stream.Recv()
		if isGrpcStatus(recvErr, codes.Aborted) {
			m.mat.reset()
			return resetErr("stream reset requested")
		}
		if recvErr != nil {
			return recvErr
		}

		// Each handler returns the handler to use for the next event; store
		// it before inspecting the error, as the original assignment did.
		next, handleErr := m.handler(m, event)
		m.handler = next
		if handleErr != nil {
			m.mat.reset()
			return handleErr
		}
	}
}
// isGrpcStatus reports whether status.FromError can extract a gRPC status from
// err and that status carries the given code.
func isGrpcStatus(err error, code codes.Code) bool {
	st, ok := status.FromError(err)
	if !ok {
		return false
	}
	return st.Code() == code
}
// resetErr is the error returned when the server asks for the subscription to
// be reset. It is a distinct type so it can be marked as temporary, allowing a
// first retry to be attempted without notifying clients.
type resetErr string

// Error implements error by returning the underlying string.
func (r resetErr) Error() string {
	return string(r)
}

// Temporary implements the internal Temporary interface; a reset is always
// reported as temporary.
func (r resetErr) Temporary() bool {
	return true
}
// updateView implements viewState by passing events and index straight to the
// underlying materializer and returning its error unchanged.
func (m *RPCMaterializer) updateView(events []*pbsubscribe.Event, index uint64) error {
	err := m.mat.updateView(events, index)
	return err
}
// reset implements viewState by resetting the underlying materializer.
func (m *RPCMaterializer) reset() {
	mat := m.mat
	mat.reset()
}