diff --git a/agent/proxycfg/manager.go b/agent/proxycfg/manager.go index 71b6270fef..b01787f2c1 100644 --- a/agent/proxycfg/manager.go +++ b/agent/proxycfg/manager.go @@ -5,6 +5,7 @@ package proxycfg import ( "errors" + "github.com/hashicorp/consul/lib/channels" "runtime/debug" "sync" @@ -259,37 +260,15 @@ func (m *Manager) notify(snap *ConfigSnapshot) { // it will drain the chan and then re-attempt delivery so that a slow consumer // gets the latest config earlier. This MUST be called from a method where m.mu // is held to be safe since it assumes we are the only goroutine sending on ch. -func (m *Manager) deliverLatest(snap *ConfigSnapshot, ch chan proxysnapshot.ProxySnapshot) { - // Send if chan is empty - select { - case ch <- snap: - return - default: - } - - // Not empty, drain the chan of older snapshots and redeliver. For now we only - // use 1-buffered chans but this will still work if we change that later. -OUTER: - for { - select { - case <-ch: - continue - default: - break OUTER - } - } - - // Now send again - select { - case ch <- snap: - return - default: - // This should not be possible since we should be the only sender, enforced - // by m.mu but error and drop the update rather than panic. - m.Logger.Error("failed to deliver ConfigSnapshot to proxy", - "proxy", snap.ProxyID.String(), +func (m *Manager) deliverLatest(snap proxysnapshot.ProxySnapshot, ch chan proxysnapshot.ProxySnapshot) { + m.Logger.Trace("delivering latest proxy snapshot to proxy", "proxyID", snap.(*ConfigSnapshot).ProxyID) + err := channels.DeliverLatest(snap, ch) + if err != nil { + m.Logger.Error("failed to deliver proxyState to proxy", + "proxy", snap.(*ConfigSnapshot).ProxyID, ) } + } // Watch registers a watch on a proxy. It might not exist yet in which case this diff --git a/internal/mesh/proxy-tracker/proxy_tracker.go b/internal/mesh/proxy-tracker/proxy_tracker.go index d9dae03a13..f60cacb5cd 100644 --- a/internal/mesh/proxy-tracker/proxy_tracker.go +++ b/internal/mesh/proxy-tracker/proxy_tracker.go @@ -6,6 +6,7 @@ package proxytracker import ( "errors" "fmt" + "github.com/hashicorp/consul/lib/channels" "sync" "github.com/hashicorp/go-hclog" @@ -207,32 +208,8 @@ func (pt *ProxyTracker) PushChange(proxyID *pbresource.ID, proxyState proxysnaps func (pt *ProxyTracker) deliverLatest(proxyID *pbresource.ID, proxyState proxysnapshot.ProxySnapshot, ch chan proxysnapshot.ProxySnapshot) { pt.config.Logger.Trace("delivering latest proxy snapshot to proxy", "proxyID", proxyID) - // Send if chan is empty - select { - case ch <- proxyState: - return - default: - } - - // Not empty, drain the chan of older snapshots and redeliver. For now we only - // use 1-buffered chans but this will still work if we change that later. -OUTER: - for { - select { - case <-ch: - continue - default: - break OUTER - } - } - - // Now send again - select { - case ch <- proxyState: - return - default: - // This should not be possible since we should be the only sender, enforced - // by m.mu but error and drop the update rather than panic. + err := channels.DeliverLatest(proxyState, ch) + if err != nil { pt.config.Logger.Error("failed to deliver proxyState to proxy", "proxy", proxyID.String(), ) diff --git a/lib/channels/deliver_latest.go b/lib/channels/deliver_latest.go new file mode 100644 index 0000000000..2e43c135e9 --- /dev/null +++ b/lib/channels/deliver_latest.go @@ -0,0 +1,35 @@ +// Copyright (c) HashiCorp, Inc. +// SPDX-License-Identifier: BUSL-1.1 +package channels + +import "fmt" + +// DeliverLatest will drain the channel discarding any messages if there are any and sends the current message. +func DeliverLatest[T any](val T, ch chan T) error { + // Send if chan is empty + select { + case ch <- val: + return nil + default: + } + + // If it falls through to here, the channel is not empty. + // Drain the channel. + done := false + for !done { + select { + case <-ch: + continue + default: + done = true + } + } + + // Attempt to send again. If it is not empty, throw an error + select { + case ch <- val: + return nil + default: + return fmt.Errorf("failed to deliver latest event: chan full again after draining") + } +}