package controller

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"

	"github.com/hashicorp/go-hclog"

	"github.com/hashicorp/consul/internal/resource"
	"github.com/hashicorp/consul/proto-public/pbresource"
)

// Manager is responsible for scheduling the execution of controllers.
//
// Controllers are added with Register, started with Run, and (for controllers
// that are not placed on every server) gated on the leadership state reported
// through SetRaftLeader.
type Manager struct {
	// client is handed to every controller runner started by Run.
	client pbresource.ResourceServiceClient
	// logger is the base logger from which a per-controller logger is derived
	// when the controller does not carry its own.
	logger hclog.Logger

	// raftLeader holds the most recent value passed to SetRaftLeader. It is
	// atomic so it can be read without taking mu.
	raftLeader atomic.Bool

	// mu guards the fields below:
	//   - running is set once Run has been called; Register and Run panic if
	//     it is already set.
	//   - controllers accumulates everything passed to Register.
	//   - leaseChans holds one notification channel per raft-backed lease
	//     created by newLeaseLocked; SetRaftLeader signals each of them.
	mu          sync.Mutex
	running     bool
	controllers []Controller
	leaseChans  []chan struct{}
}

// NewManager builds a Manager around the given resource service client. The
// logger is retained by the Manager and acts as the base logger for any
// controller that did not specify one using WithLogger.
func NewManager(client pbresource.ResourceServiceClient, logger hclog.Logger) *Manager {
	mgr := new(Manager)
	mgr.client = client
	mgr.logger = logger
	return mgr
}

// Register adds ctrl to the set of controllers the Manager will execute.
//
// It must be called before Run: registering once the Manager is running
// panics, as does registering a controller that has no reconciler.
func (m *Manager) Register(ctrl Controller) {
	m.mu.Lock()
	defer m.mu.Unlock()

	// Both failure modes are programmer errors, so they panic rather than
	// return an error. The running check takes precedence.
	switch {
	case m.running:
		panic("cannot register additional controllers after calling Run")
	case ctrl.reconciler == nil:
		msg := fmt.Sprintf("cannot register controller without a reconciler %s", ctrl)
		panic(msg)
	}

	m.controllers = append(m.controllers, ctrl)
}

// Run marks the Manager as running and launches one supervised goroutine per
// registered controller. Each goroutine is driven by ctx. Run itself returns
// as soon as the goroutines have been started. Calling Run a second time
// panics.
func (m *Manager) Run(ctx context.Context) {
	m.mu.Lock()
	defer m.mu.Unlock()

	if m.running {
		panic("cannot call Run more than once")
	}
	m.running = true

	for _, ctrl := range m.controllers {
		// Prefer the controller's own logger; otherwise derive one from the
		// Manager's logger, tagged with the controller's managed type.
		ctrlLogger := ctrl.logger
		if ctrlLogger == nil {
			gvk := resource.ToGVK(ctrl.managedType)
			ctrlLogger = m.logger.With("managed_type", gvk)
		}

		runner := &controllerRunner{
			ctrl:   ctrl,
			client: m.client,
			logger: ctrlLogger,
		}

		// The lease is created here, in the calling goroutine, because
		// newLeaseLocked requires m.mu to be held.
		lease := m.newLeaseLocked(ctrl)
		sup := newSupervisor(runner.run, lease)
		go sup.run(ctx)
	}
}

// SetRaftLeader records the latest Raft leadership state and pokes every
// raft-backed lease so that its supervisor re-evaluates whether the
// controller should be running. Controllers are currently only executed on
// the Raft leader, so calling this method causes the Manager to spin them
// up/down accordingly.
func (m *Manager) SetRaftLeader(leader bool) {
	// Publish the new state first so anything woken below observes it.
	m.raftLeader.Store(leader)

	m.mu.Lock()
	defer m.mu.Unlock()

	for i := range m.leaseChans {
		select {
		case m.leaseChans[i] <- struct{}{}:
			// Notification queued (or handed straight to a waiting receiver).
		default:
			// Never block here: the supervisor may be busy and not receiving.
			// Each channel has a buffer of 1, so a full buffer means a
			// notification is already pending and the change will not be
			// missed when the lease is next re-evaluated.
		}
	}
}

// newLeaseLocked returns the Lease for ctrl. Controllers placed on each server
// get an eternalLease. Every other controller gets a raftLease, whose
// notification channel is recorded in m.leaseChans so SetRaftLeader can
// signal it.
//
// The caller must hold m.mu, since m.leaseChans is mutated.
func (m *Manager) newLeaseLocked(ctrl Controller) Lease {
	if ctrl.placement != PlacementEachServer {
		// Buffer of 1 lets SetRaftLeader leave a pending notification without
		// blocking.
		notify := make(chan struct{}, 1)
		m.leaseChans = append(m.leaseChans, notify)
		return &raftLease{m: m, ch: notify}
	}

	return eternalLease{}
}