// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package submatview

import (
	"context"
	"sync"
	"time"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/lib/retry"
	"github.com/hashicorp/consul/proto/private/pbsubscribe"
)

// View receives events from, and return results to, Materializer. A view is
// responsible for converting the pbsubscribe.Event.Payload into the local
// type, and storing it so that it can be returned by Result().
type View interface {
	// Update is called when one or more events are received. The first call will
	// include _all_ events in the initial snapshot which may be an empty set.
	// Subsequent calls will contain one or more update events in the order they
	// are received.
	Update(events []*pbsubscribe.Event) error

	// Result returns the type-specific cache result based on the state. When no
	// events have been delivered yet the result should be an empty value type
	// suitable to return to clients in case there is an empty result on the
	// servers. The index the materialized view represents is maintained
	// separately and passed in in case the return type needs an Index field
	// populating. This allows implementations to not worry about maintaining
	// indexes seen during Update.
	Result(index uint64) interface{}

	// Reset the view to the zero state, done in preparation for receiving a new
	// snapshot.
	Reset()
}

// Result returned from the View.
type Result struct {
	Index uint64
	Value interface{}
	// Cached is true if the requested value was already available locally. If
	// the value is false, it indicates that GetFromView had to wait for an update,
	Cached bool
}

type Deps struct {
	View    View
	Logger  hclog.Logger
	Waiter  *retry.Waiter
	Request func(index uint64) *pbsubscribe.SubscribeRequest
}

// materializer consumes the event stream, handling any framing events, and
// allows for querying the materialized view.
type materializer struct {
	retryWaiter *retry.Waiter
	logger      hclog.Logger

	// lock protects the mutable state - all fields below it must only be accessed
	// while holding lock.
	lock     sync.Mutex
	index    uint64
	view     View
	updateCh chan struct{}
	err      error
}

func newMaterializer(logger hclog.Logger, view View, waiter *retry.Waiter) *materializer {
	m := materializer{
		view:        view,
		retryWaiter: waiter,
		logger:      logger,
		updateCh:    make(chan struct{}),
	}
	if m.retryWaiter == nil {
		m.retryWaiter = defaultWaiter()
	}
	return &m
}

// Query blocks until the index of the View is greater than opts.MinIndex,
// or the context is cancelled.
func (m *materializer) query(ctx context.Context, minIndex uint64) (Result, error) {
	m.lock.Lock()

	result := Result{
		Index: m.index,
		Value: m.view.Result(m.index),
	}

	updateCh := m.updateCh
	m.lock.Unlock()

	// If our index is > req.Index return right away. If index is zero then we
	// haven't loaded a snapshot at all yet which means we should wait for one on
	// the update chan.
	if result.Index > 0 && result.Index > minIndex {
		result.Cached = true
		return result, nil
	}

	for {
		select {
		case <-updateCh:
			// View updated, return the new result
			m.lock.Lock()
			result.Index = m.index

			switch {
			case m.err != nil:
				err := m.err
				m.lock.Unlock()
				return result, err
			case result.Index <= minIndex:
				// get a reference to the new updateCh, the previous one was closed
				updateCh = m.updateCh
				m.lock.Unlock()
				continue
			}

			result.Value = m.view.Result(m.index)
			m.lock.Unlock()
			return result, nil

		case <-ctx.Done():
			// Update the result value to the latest because callers may still
			// use the value when the error is context.DeadlineExceeded
			m.lock.Lock()
			result.Value = m.view.Result(m.index)
			m.lock.Unlock()
			return result, ctx.Err()
		}
	}
}

func (m *materializer) currentIndex() uint64 {
	var resp uint64

	m.lock.Lock()
	resp = m.index
	m.lock.Unlock()

	return resp
}

// notifyUpdateLocked closes the current update channel and recreates a new
// one. It must be called while holding the m.lock lock.
func (m *materializer) notifyUpdateLocked(err error) {
	m.err = err
	close(m.updateCh)
	m.updateCh = make(chan struct{})
}

// reset clears the state ready to start a new stream from scratch.
func (m *materializer) reset() {
	m.lock.Lock()
	defer m.lock.Unlock()

	m.view.Reset()
	m.index = 0
}

// updateView updates the view from a sequence of events and stores
// the corresponding Raft index.
func (m *materializer) updateView(events []*pbsubscribe.Event, index uint64) error {
	m.lock.Lock()
	defer m.lock.Unlock()

	if err := m.view.Update(events); err != nil {
		return err
	}

	m.index = index
	m.notifyUpdateLocked(nil)
	m.retryWaiter.Reset()
	return nil
}

func (m *materializer) handleError(req *pbsubscribe.SubscribeRequest, err error) {
	failures := m.retryWaiter.Failures()
	if isNonTemporaryOrConsecutiveFailure(err, failures) {
		m.lock.Lock()
		m.notifyUpdateLocked(err)
		m.lock.Unlock()
	}

	logger := m.logger.With(
		"err", err,
		"topic", req.Topic,
		"failure_count", failures+1,
	)

	if req.GetWildcardSubject() {
		logger = logger.With("wildcard_subject", true)
	} else if sub := req.GetNamedSubject(); sub != nil {
		logger = logger.With("key", sub.Key)
	} else {
		logger = logger.With("key", req.Key) // nolint:staticcheck // SA1019 intentional use of deprecated field
	}

	logger.Error("subscribe call failed")
}

// isNonTemporaryOrConsecutiveFailure returns true if the error is not a
// temporary error or if failures > 0.
func isNonTemporaryOrConsecutiveFailure(err error, failures int) bool {
	// temporary is an interface used by net and other std lib packages to
	// show error types represent temporary/recoverable errors.
	temp, ok := err.(interface {
		Temporary() bool
	})
	return !ok || !temp.Temporary() || failures > 0
}

func defaultWaiter() *retry.Waiter {
	return &retry.Waiter{
		MinFailures: 1,
		// Start backing off with small increments (200-400ms) which will double
		// each attempt. (200-400, 400-800, 800-1600, 1600-3200, 3200-6000, 6000
		// after that). (retry.Wait applies Max limit after jitter right now).
		Factor:  200 * time.Millisecond,
		MinWait: 0,
		MaxWait: 60 * time.Second,
		Jitter:  retry.NewJitter(100),
	}
}