mirror of
https://github.com/status-im/consul.git
synced 2025-01-12 14:55:02 +00:00
200c0fb3e9
This ensures that servers are removed from RPC routing when they are reaped.
73 lines
2.0 KiB
Go
73 lines
2.0 KiB
Go
package router
|
|
|
|
import (
|
|
"log"
|
|
|
|
"github.com/hashicorp/consul/agent/metadata"
|
|
"github.com/hashicorp/consul/types"
|
|
"github.com/hashicorp/serf/serf"
|
|
)
|
|
|
|
// routerFn selects one of the router operations to map to incoming Serf events.
|
|
type routerFn func(types.AreaID, *metadata.Server) error
|
|
|
|
// handleMemberEvents attempts to apply the given Serf member event to the given
|
|
// router function.
|
|
func handleMemberEvent(logger *log.Logger, fn routerFn, areaID types.AreaID, e serf.Event) {
|
|
me, ok := e.(serf.MemberEvent)
|
|
if !ok {
|
|
logger.Printf("[ERR] consul: Bad event type %#v", e)
|
|
return
|
|
}
|
|
|
|
for _, m := range me.Members {
|
|
ok, parts := metadata.IsConsulServer(m)
|
|
if !ok {
|
|
logger.Printf("[WARN]: consul: Non-server %q in server-only area %q",
|
|
m.Name, areaID)
|
|
continue
|
|
}
|
|
|
|
if err := fn(areaID, parts); err != nil {
|
|
logger.Printf("[ERR] consul: Failed to process %s event for server %q in area %q: %v",
|
|
me.Type.String(), m.Name, areaID, err)
|
|
continue
|
|
}
|
|
|
|
logger.Printf("[INFO] consul: Handled %s event for server %q in area %q",
|
|
me.Type.String(), m.Name, areaID)
|
|
}
|
|
}
|
|
|
|
// HandleSerfEvents is a long-running goroutine that pushes incoming events from
|
|
// a Serf manager's channel into the given router. This will return when the
|
|
// shutdown channel is closed.
|
|
func HandleSerfEvents(logger *log.Logger, router *Router, areaID types.AreaID, shutdownCh <-chan struct{}, eventCh <-chan serf.Event) {
|
|
for {
|
|
select {
|
|
case <-shutdownCh:
|
|
return
|
|
|
|
case e := <-eventCh:
|
|
switch e.EventType() {
|
|
case serf.EventMemberJoin:
|
|
handleMemberEvent(logger, router.AddServer, areaID, e)
|
|
|
|
case serf.EventMemberLeave, serf.EventMemberReap:
|
|
handleMemberEvent(logger, router.RemoveServer, areaID, e)
|
|
|
|
case serf.EventMemberFailed:
|
|
handleMemberEvent(logger, router.FailServer, areaID, e)
|
|
|
|
// All of these event types are ignored.
|
|
case serf.EventMemberUpdate:
|
|
case serf.EventUser:
|
|
case serf.EventQuery:
|
|
|
|
default:
|
|
logger.Printf("[WARN] consul: Unhandled Serf Event: %#v", e)
|
|
}
|
|
}
|
|
}
|
|
}
|