package proxycfg

import (
	"context"
	"errors"
	"fmt"
	"net"
	"reflect"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/mitchellh/copystructure"

	"github.com/hashicorp/consul/agent/cache"
	cachetype "github.com/hashicorp/consul/agent/cache-types"
	"github.com/hashicorp/consul/agent/structs"
	"github.com/hashicorp/consul/logging"
)

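// CacheNotifier is the subset of the agent cache this package depends on:
// Notify registers a watch on cache type t for request r and delivers update
// events tagged with correlationID to ch until ctx is cancelled.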
type CacheNotifier interface {
	Notify(ctx context.Context, t string, r cache.Request,
		correlationID string, ch chan<- cache.UpdateEvent) error
}

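// Health is the interface used to watch service health results. It mirrors
// CacheNotifier.Notify but takes a concrete ServiceSpecificRequest.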
type Health interface {
	Notify(ctx context.Context, req structs.ServiceSpecificRequest, correlationID string, ch chan<- cache.UpdateEvent) error
}

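// Correlation IDs and ID prefixes used to tag watch updates, plus the timing
// defaults for snapshot coalescing and prepared query polling.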
const (
	coalesceTimeout                    = 200 * time.Millisecond
	rootsWatchID                       = "roots"
	leafWatchID                        = "leaf"
	intentionsWatchID                  = "intentions"
	serviceListWatchID                 = "service-list"
	federationStateListGatewaysWatchID = "federation-state-list-mesh-gateways"
	consulServerListWatchID            = "consul-server-list"
	datacentersWatchID                 = "datacenters"
	serviceResolversWatchID            = "service-resolvers"
	gatewayServicesWatchID             = "gateway-services"
	gatewayConfigWatchID               = "gateway-config"
	externalServiceIDPrefix            = "external-service:"
	serviceLeafIDPrefix                = "service-leaf:"
	serviceConfigIDPrefix              = "service-config:"
	serviceResolverIDPrefix            = "service-resolver:"
	serviceIntentionsIDPrefix          = "service-intentions:"
	intentionUpstreamsID               = "intention-upstreams"
	meshConfigEntryID                  = "mesh"
	svcChecksWatchIDPrefix             = cachetype.ServiceHTTPChecksName + ":"
	preparedQueryIDPrefix              = string(structs.UpstreamDestTypePreparedQuery) + ":"
	defaultPreparedQueryPollInterval   = 30 * time.Second
)

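// stateConfig holds the dependencies and agent-level settings shared by every
// proxy state and its kind handler.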
type stateConfig struct {
	logger                hclog.Logger
	source                *structs.QuerySource
	cache                 CacheNotifier
	health                Health
	dnsConfig             DNSConfig
	serverSNIFn           ServerSNIFunc
	intentionDefaultAllow bool
}

// state holds all the state needed to maintain the config for a registered
// connect-proxy service. When a proxy registration is changed, the entire state
// is discarded and a new one created.
type state struct {
	logger          hclog.Logger
	serviceInstance serviceInstance
	handler         kindHandler

	// cancel is set by Watch and called by Close to stop the goroutine started
	// in Watch.
	cancel func()

	ch     chan cache.UpdateEvent
	snapCh chan ConfigSnapshot
	reqCh  chan chan *ConfigSnapshot
}

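// DNSConfig holds the agent's primary and alternate DNS domains.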
type DNSConfig struct {
	Domain    string
	AltDomain string
}

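// ServerSNIFunc maps a datacenter and node name to the SNI name used to
// address Consul servers in that datacenter.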
type ServerSNIFunc func(dc, nodeName string) string

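// serviceInstance is an immutable copy of the watch-relevant fields of a
// proxy's NodeService registration, along with the ACL token used for its
// watches.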
type serviceInstance struct {
	kind            structs.ServiceKind
	service         string
	proxyID         structs.ServiceID
	address         string
	port            int
	meta            map[string]string
	taggedAddresses map[string]structs.ServiceAddress
	proxyCfg        structs.ConnectProxyConfig
	token           string
}

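// copyProxyConfig returns a deep copy of the NodeService's ConnectProxyConfig
// with each non-prepared-query upstream's destination partition and namespace
// defaulted to those of the proxy.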
func copyProxyConfig(ns *structs.NodeService) (structs.ConnectProxyConfig, error) {
	if ns == nil {
		return structs.ConnectProxyConfig{}, nil
	}
	// Deep-copy the proxy config so the modifications below don't leak back
	// into the registered NodeService.
	proxyCfgRaw, err := copystructure.Copy(ns.Proxy)
	if err != nil {
		return structs.ConnectProxyConfig{}, err
	}
	proxyCfg, ok := proxyCfgRaw.(structs.ConnectProxyConfig)
	if !ok {
		return structs.ConnectProxyConfig{}, errors.New("failed to copy proxy config")
	}

	// we can safely modify these since we just copied them
	for idx := range proxyCfg.Upstreams {
		us := &proxyCfg.Upstreams[idx]
		if us.DestinationType != structs.UpstreamDestTypePreparedQuery {
			// Default the upstream's target partition and namespace to those of
			// the proxy. Doing this here avoids much more complex logic in a
			// number of other places, and makes tracking these upstreams simpler
			// since we can dedup them with the maps that track which upstream IDs
			// are being watched.
			if us.DestinationPartition == "" {
				proxyCfg.Upstreams[idx].DestinationPartition = ns.EnterpriseMeta.PartitionOrDefault()
			}
			if us.DestinationNamespace == "" {
				proxyCfg.Upstreams[idx].DestinationNamespace = ns.EnterpriseMeta.NamespaceOrDefault()
			}
		}
	}

	return proxyCfg, nil
}

// newState populates the state struct by copying relevant fields from the
// NodeService and token. We copy them so they can be used from a separate
// goroutine later without having to reason about races with the caller's
// NodeService (especially its embedded maps and slices).
//
// The returned state needs its required dependencies to be set before Watch
// can be called.
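//
// An illustrative sketch of the intended lifecycle (the surrounding manager
// code is assumed, not shown here):
//
//	s, err := newState(ns, token, config) // copy the registration, pick a handler
//	// ...set any remaining required dependencies...
//	snapCh, err := s.Watch()              // start watches and the run loop
//	for snap := range snapCh {
//		// deliver the coalesced snapshot to consumers
//	}
//	s.Close() // cancels the watches; the run loop closes snapCh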
func newState(ns *structs.NodeService, token string, config stateConfig) (*state, error) {
	// The buffer size of 10 is fairly arbitrary, but it allows the 3 mandatory
	// watches and a reasonable number of upstream watches to deliver their
	// initial messages in parallel without blocking the cache.Notify loops.
	// Blocking them briefly is not a huge deal, so we don't need to be more
	// conservative to handle larger numbers of upstreams correctly, but the
	// buffer gives some headroom so normal operation is non-blocking in most
	// typical cases.
	ch := make(chan cache.UpdateEvent, 10)

	s, err := newServiceInstanceFromNodeService(ns, token)
	if err != nil {
		return nil, err
	}

	handler, err := newKindHandler(config, s, ch)
	if err != nil {
		return nil, err
	}

	return &state{
		logger:          config.logger.With("proxy", s.proxyID, "kind", s.kind),
		serviceInstance: s,
		handler:         handler,
		ch:              ch,
		snapCh:          make(chan ConfigSnapshot, 1),
		reqCh:           make(chan chan *ConfigSnapshot, 1),
	}, nil
}

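// newKindHandler returns the kindHandler implementation for the instance's
// service kind, attaching a kind-specific named logger where appropriate, or
// an error for unsupported kinds.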
func newKindHandler(config stateConfig, s serviceInstance, ch chan cache.UpdateEvent) (kindHandler, error) {
	var handler kindHandler
	h := handlerState{stateConfig: config, serviceInstance: s, ch: ch}

	switch s.kind {
	case structs.ServiceKindConnectProxy:
		handler = &handlerConnectProxy{handlerState: h}
	case structs.ServiceKindTerminatingGateway:
		h.stateConfig.logger = config.logger.Named(logging.TerminatingGateway)
		handler = &handlerTerminatingGateway{handlerState: h}
	case structs.ServiceKindMeshGateway:
		h.stateConfig.logger = config.logger.Named(logging.MeshGateway)
		handler = &handlerMeshGateway{handlerState: h}
	case structs.ServiceKindIngressGateway:
		handler = &handlerIngressGateway{handlerState: h}
	default:
		return nil, errors.New("not a connect-proxy, terminating-gateway, mesh-gateway, or ingress-gateway")
	}

	return handler, nil
}

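// newServiceInstanceFromNodeService copies the watch-relevant fields out of
// the NodeService so that later mutations of the registration cannot race
// with the state's goroutine.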
func newServiceInstanceFromNodeService(ns *structs.NodeService, token string) (serviceInstance, error) {
	proxyCfg, err := copyProxyConfig(ns)
	if err != nil {
		return serviceInstance{}, err
	}

	taggedAddresses := make(map[string]structs.ServiceAddress)
	for k, v := range ns.TaggedAddresses {
		taggedAddresses[k] = v
	}

	meta := make(map[string]string)
	for k, v := range ns.Meta {
		meta[k] = v
	}

	return serviceInstance{
		kind:            ns.Kind,
		service:         ns.Service,
		proxyID:         ns.CompoundServiceID(),
		address:         ns.Address,
		port:            ns.Port,
		meta:            meta,
		taggedAddresses: taggedAddresses,
		proxyCfg:        proxyCfg,
		token:           token,
	}, nil
}

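// kindHandler is implemented once per supported service kind. initialize sets
// up the initial watches and returns the seed ConfigSnapshot; handleUpdate
// applies a single watch update event to that snapshot.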
type kindHandler interface {
	initialize(ctx context.Context) (ConfigSnapshot, error)
	handleUpdate(ctx context.Context, u cache.UpdateEvent, snap *ConfigSnapshot) error
}

// Watch initializes watches on all necessary cache data for the current proxy
// registration state and returns a chan on which to observe updates to the
// ConfigSnapshot that contains all necessary config state. The chan is closed
// when the state is Closed.
func (s *state) Watch() (<-chan ConfigSnapshot, error) {
	var ctx context.Context
	ctx, s.cancel = context.WithCancel(context.Background())

	snap, err := s.handler.initialize(ctx)
	if err != nil {
		s.cancel()
		return nil, err
	}

	go s.run(ctx, &snap)

	return s.snapCh, nil
}

// Close discards the state and stops any long-running watches.
func (s *state) Close() error {
	if s.cancel != nil {
		s.cancel()
	}
	return nil
}

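// handlerState is the state embedded in every kind handler: the shared
// dependencies, the copied registration, and the channel on which watch
// updates are delivered.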
type handlerState struct {
	stateConfig     // TODO: un-embed
	serviceInstance // TODO: un-embed
	ch              chan cache.UpdateEvent
}

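// newConfigSnapshotFromServiceInstance seeds a ConfigSnapshot with the static
// fields from the registration and agent config; the watched data is filled
// in later by the kind handler.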
func newConfigSnapshotFromServiceInstance(s serviceInstance, config stateConfig) ConfigSnapshot {
	// TODO: use serviceInstance type in ConfigSnapshot
	return ConfigSnapshot{
		Kind:                  s.kind,
		Service:               s.service,
		ProxyID:               s.proxyID,
		Address:               s.address,
		Port:                  s.port,
		ServiceMeta:           s.meta,
		TaggedAddresses:       s.taggedAddresses,
		Proxy:                 s.proxyCfg,
		Datacenter:            config.source.Datacenter,
		Locality:              GatewayKey{Datacenter: config.source.Datacenter, Partition: s.proxyID.PartitionOrDefault()},
		ServerSNIFn:           config.serverSNIFn,
		IntentionDefaultAllow: config.intentionDefaultAllow,
	}
}

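// run is the state's main loop: it applies watch updates to the snapshot,
// coalesces deliveries on snapCh using coalesceTimer, and serves
// CurrentSnapshot requests from reqCh. It exits when ctx is cancelled and
// closes snapCh on the way out.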
func (s *state) run(ctx context.Context, snap *ConfigSnapshot) {
	// Close the channel we return from Watch when we stop so consumers can stop
	// watching and clean up their goroutines. It's important we do this here and
	// not in Close since this routine sends on this chan and so might panic if it
	// gets closed from another goroutine.
	defer close(s.snapCh)

	// Using time.Timer.C directly in the select below turns out to be really
	// fiddly, since you can't tell whether a timer has been stopped or is
	// still waiting, and therefore when it needs to be reset. So just use a
	// chan to send ourselves messages instead.
	sendCh := make(chan struct{})
	var coalesceTimer *time.Timer

	for {
		select {
		case <-ctx.Done():
			return
		case u := <-s.ch:
			s.logger.Trace("A blocking query returned; handling snapshot update", "correlationID", u.CorrelationID)

			if err := s.handler.handleUpdate(ctx, u, snap); err != nil {
				s.logger.Error("Failed to handle update from watch",
					"id", u.CorrelationID, "error", err,
				)
				continue
			}

		case <-sendCh:
			// Allow the next change to trigger a send
			coalesceTimer = nil
			// Make a deep copy of snap so we don't mutate any of the embedded structs
			// etc on future updates.
			snapCopy, err := snap.Clone()
			if err != nil {
				s.logger.Error("Failed to copy config snapshot for proxy", "error", err)
				continue
			}

			select {
			// Try to send
			case s.snapCh <- *snapCopy:
				s.logger.Trace("Delivered new snapshot to proxy config watchers")

				// Skip rest of loop - there is nothing to send since nothing changed on
				// this iteration
				continue

			// Avoid blocking if a snapshot is already buffered in snapCh as this can result in a deadlock.
			// See PR #9689 for more details.
			default:
				s.logger.Trace("Failed to deliver new snapshot to proxy config watchers")

				// Arm coalesceTimer again so we retry delivery of the updated snapshot shortly.
				coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
					sendCh <- struct{}{}
				})

				// Skip the check at the bottom of the loop; we just queued a timer-based refresh above.
				continue
			}

		case replyCh := <-s.reqCh:
			s.logger.Trace("A proxy config snapshot was requested")

			if !snap.Valid() {
				// Not valid yet; just respond with nil and move on to the next task.
				replyCh <- nil

				s.logger.Trace("The proxy's config snapshot is not valid yet")
				continue
			}
			// Make a deep copy of snap so we don't mutate any of the embedded structs
			// etc on future updates.
			snapCopy, err := snap.Clone()
			if err != nil {
				s.logger.Error("Failed to copy config snapshot for proxy", "error", err)
				continue
			}
			replyCh <- snapCopy

			// Skip rest of loop - there is nothing to send since nothing changed on
			// this iteration
			continue
		}

		// Check if snap is complete enough to be a valid config to deliver to a
		// proxy yet.
		if snap.Valid() {
			// Don't send it right away, set a short timer that will wait for updates
			// from any of the other cache values and deliver them all together.
			if coalesceTimer == nil {
				coalesceTimer = time.AfterFunc(coalesceTimeout, func() {
					// This runs in another goroutine so we can't just do the send
					// directly here as access to snap is racy. Instead, signal the main
					// loop above.
					sendCh <- struct{}{}
				})
			}
		}
	}
}

// CurrentSnapshot synchronously returns the current ConfigSnapshot if there is
// one ready. If we don't have one yet because not all of the necessary parts
// have been returned (e.g. both roots and leaf cert), nil is returned.
func (s *state) CurrentSnapshot() *ConfigSnapshot {
	// Make a chan for the response to be sent on
	ch := make(chan *ConfigSnapshot, 1)
	s.reqCh <- ch
	// Wait for the response
	return <-ch
}

// Changed returns whether the passed NodeService has changed in any of the
// fields we care about for config state watching, or whether the token has
// changed.
func (s *state) Changed(ns *structs.NodeService, token string) bool {
	if ns == nil {
		return true
	}

	proxyCfg, err := copyProxyConfig(ns)
	if err != nil {
		s.logger.Warn("Failed to parse new proxy config; the proxy config will be treated as changed", "error", err)
	}

	i := s.serviceInstance
	return ns.Kind != i.kind ||
		i.proxyID != ns.CompoundServiceID() ||
		i.address != ns.Address ||
		i.port != ns.Port ||
		!reflect.DeepEqual(i.proxyCfg, proxyCfg) ||
		i.token != token
}

// hostnameEndpoints returns all CheckServiceNodes that have hostnames instead of IPs as the address.
// Envoy cannot resolve hostnames provided through EDS, so we exclusively use CDS for these clusters.
// If there is a mix of hostnames and addresses we exclusively use the hostnames, since clusters cannot discover
// services with both EDS and DNS.
func hostnameEndpoints(logger hclog.Logger, localKey GatewayKey, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
	var (
		hasIP       bool
		hasHostname bool
		resp        structs.CheckServiceNodes
	)

	for _, n := range nodes {
		_, addr, _ := n.BestAddress(!localKey.Matches(n.Node.Datacenter, n.Node.PartitionOrDefault()))
		if net.ParseIP(addr) != nil {
			hasIP = true
			continue
		}
		hasHostname = true
		resp = append(resp, n)
	}

	if hasHostname && hasIP {
		dc := nodes[0].Node.Datacenter
		sn := nodes[0].Service.CompoundServiceName()

		logger.Warn("service contains instances with mix of hostnames and IP addresses; only hostnames will be passed to Envoy",
			"dc", dc, "service", sn.String())
	}
	return resp
}

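// gatewayWatchOpts collects the parameters needed to establish a mesh gateway
// watch for a particular upstream.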
type gatewayWatchOpts struct {
	notifier   CacheNotifier
	notifyCh   chan cache.UpdateEvent
	source     structs.QuerySource
	token      string
	key        GatewayKey
	upstreamID UpstreamID
}

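// watchMeshGateway sets up a watch on the mesh gateways in the datacenter and
// partition identified by opts.key, delivering updates to opts.notifyCh under
// a correlation ID of the form "mesh-gateway:<gateway key>:<upstream ID>".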
func watchMeshGateway(ctx context.Context, opts gatewayWatchOpts) error {
	return opts.notifier.Notify(ctx, cachetype.InternalServiceDumpName, &structs.ServiceDumpRequest{
		Datacenter:     opts.key.Datacenter,
		QueryOptions:   structs.QueryOptions{Token: opts.token},
		ServiceKind:    structs.ServiceKindMeshGateway,
		UseServiceKind: true,
		Source:         opts.source,
		EnterpriseMeta: *structs.DefaultEnterpriseMetaInPartition(opts.key.Partition),
	}, fmt.Sprintf("mesh-gateway:%s:%s", opts.key.String(), opts.upstreamID.String()), opts.notifyCh)
}