mirror of
https://github.com/status-im/consul.git
synced 2025-01-16 08:45:37 +00:00
73b9b407ba
Registering gRPC balancers is thread-unsafe because they are stored in a global map variable that is accessed without holding a lock. Therefore, it's expected that balancers are registered _once_ at the beginning of your program (e.g. in a package `init` function) and certainly not after you've started dialing connections, etc. > NOTE: this function must only be called during initialization time > (i.e. in an init() function), and is not thread-safe. While this is fine for us in production, it's challenging for tests that spin up multiple agents in-memory. We currently register a balancer per- agent which holds agent-specific state that cannot safely be shared. This commit introduces our own registry that _is_ thread-safe, and implements the Builder interface such that we can call gRPC's `Register` method once, on start-up. It uses the same pattern as our resolver registry where we use the dial target's host (aka "authority"), which is unique per-agent, to determine which builder to use.
70 lines
2.1 KiB
Go
70 lines
2.1 KiB
Go
package balancer
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
gbalancer "google.golang.org/grpc/balancer"
|
|
)
|
|
|
|
// BuilderName should be given in gRPC service configuration to enable our
|
|
// custom balancer. It refers to this package's global registry, rather than
|
|
// an instance of Builder to enable us to add and remove builders at runtime,
|
|
// specifically during tests.
|
|
const BuilderName = "consul-internal"
|
|
|
|
// gRPC's balancer.Register method is thread-unsafe because it mutates a global
|
|
// map without holding a lock. As such, it's expected that you register custom
|
|
// balancers once at the start of your program (e.g. a package init function).
|
|
//
|
|
// In production, this is fine. Agents register a single instance of our builder
|
|
// and use it for the duration. Tests are where this becomes problematic, as we
|
|
// spin up several agents in-memory and register/deregister a builder for each,
|
|
// with its own agent-specific state, logger, etc.
|
|
//
|
|
// To avoid data races, we call gRPC's Register method once, on-package init,
|
|
// with a global registry struct that implements the Builder interface but
|
|
// delegates the building to N instances of our Builder that are registered and
|
|
// deregistered at runtime. We the dial target's host (aka "authority") which
|
|
// is unique per-agent to pick the correct builder.
|
|
func init() {
|
|
gbalancer.Register(globalRegistry)
|
|
}
|
|
|
|
var globalRegistry = ®istry{
|
|
byAuthority: make(map[string]*Builder),
|
|
}
|
|
|
|
type registry struct {
|
|
mu sync.RWMutex
|
|
byAuthority map[string]*Builder
|
|
}
|
|
|
|
func (r *registry) Build(cc gbalancer.ClientConn, opts gbalancer.BuildOptions) gbalancer.Balancer {
|
|
r.mu.RLock()
|
|
defer r.mu.RUnlock()
|
|
|
|
auth := opts.Target.URL.Host
|
|
builder, ok := r.byAuthority[auth]
|
|
if !ok {
|
|
panic(fmt.Sprintf("no gRPC balancer builder registered for authority: %q", auth))
|
|
}
|
|
return builder.Build(cc, opts)
|
|
}
|
|
|
|
func (r *registry) Name() string { return BuilderName }
|
|
|
|
func (r *registry) register(auth string, builder *Builder) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
r.byAuthority[auth] = builder
|
|
}
|
|
|
|
func (r *registry) deregister(auth string) {
|
|
r.mu.Lock()
|
|
defer r.mu.Unlock()
|
|
|
|
delete(r.byAuthority, auth)
|
|
}
|