mirror of
https://github.com/status-im/consul.git
synced 2025-01-10 05:45:46 +00:00
5fb9df1640
* Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
129 lines
3.2 KiB
Go
129 lines
3.2 KiB
Go
// Copyright (c) HashiCorp, Inc.
|
|
// SPDX-License-Identifier: BUSL-1.1
|
|
|
|
package submatview
|
|
|
|
import (
|
|
"context"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"github.com/hashicorp/consul/proto/private/pbsubscribe"
|
|
)
|
|
|
|
// RPCMaterializer is a materializer for a streaming cache type
|
|
// and manages the actual streaming RPC call to the servers behind
|
|
// the scenes until the cache result is discarded when its TTL expires.
|
|
type RPCMaterializer struct {
|
|
deps Deps
|
|
client StreamClient
|
|
handler eventHandler
|
|
|
|
mat *materializer
|
|
}
|
|
|
|
var _ Materializer = (*RPCMaterializer)(nil)
|
|
|
|
// StreamClient provides a subscription to state change events.
|
|
type StreamClient interface {
|
|
Subscribe(ctx context.Context, in *pbsubscribe.SubscribeRequest, opts ...grpc.CallOption) (pbsubscribe.StateChangeSubscription_SubscribeClient, error)
|
|
}
|
|
|
|
// NewRPCMaterializer returns a new Materializer. Run must be called to start it.
|
|
func NewRPCMaterializer(client StreamClient, deps Deps) *RPCMaterializer {
|
|
m := RPCMaterializer{
|
|
deps: deps,
|
|
client: client,
|
|
mat: newMaterializer(deps.Logger, deps.View, deps.Waiter),
|
|
}
|
|
return &m
|
|
}
|
|
|
|
// Query implements Materializer
|
|
func (m *RPCMaterializer) Query(ctx context.Context, minIndex uint64) (Result, error) {
|
|
return m.mat.query(ctx, minIndex)
|
|
}
|
|
|
|
// Run receives events from the StreamClient and sends them to the View. It runs
|
|
// until ctx is cancelled, so it is expected to be run in a goroutine.
|
|
// Mirrors implementation of LocalMaterializer
|
|
//
|
|
// Run implements Materializer
|
|
func (m *RPCMaterializer) Run(ctx context.Context) {
|
|
for {
|
|
req := m.deps.Request(m.mat.currentIndex())
|
|
err := m.subscribeOnce(ctx, req)
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
m.mat.handleError(req, err)
|
|
|
|
if err := m.mat.retryWaiter.Wait(ctx); err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// subscribeOnce opens a new subscribe streaming call to the servers and runs
|
|
// for its lifetime or until the view is closed.
|
|
func (m *RPCMaterializer) subscribeOnce(ctx context.Context, req *pbsubscribe.SubscribeRequest) error {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
|
|
m.handler = initialHandler(req.Index)
|
|
|
|
s, err := m.client.Subscribe(ctx, req)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
for {
|
|
event, err := s.Recv()
|
|
switch {
|
|
case isGrpcStatus(err, codes.Aborted):
|
|
m.mat.reset()
|
|
return resetErr("stream reset requested")
|
|
case err != nil:
|
|
return err
|
|
}
|
|
|
|
m.handler, err = m.handler(m, event)
|
|
if err != nil {
|
|
m.mat.reset()
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func isGrpcStatus(err error, code codes.Code) bool {
|
|
s, ok := status.FromError(err)
|
|
return ok && s.Code() == code
|
|
}
|
|
|
|
// resetErr is the error produced when the server asks for the subscription to
// be reset. It has its own type so that it can be flagged as temporary, which
// lets the first retry happen without notifying clients.
type resetErr string

// Temporary implements the internal Temporary interface; a resetErr is always
// temporary.
func (resetErr) Temporary() bool {
	return true
}

// Error implements error by returning the underlying string unchanged.
func (e resetErr) Error() string {
	return string(e)
}
|
|
|
|
// updateView implements viewState
|
|
func (m *RPCMaterializer) updateView(events []*pbsubscribe.Event, index uint64) error {
|
|
return m.mat.updateView(events, index)
|
|
}
|
|
|
|
// reset implements viewState
|
|
func (m *RPCMaterializer) reset() {
|
|
m.mat.reset()
|
|
}
|