diff --git a/consul/rpc.go b/consul/rpc.go index ebb86aa26b..b5899d465b 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -3,10 +3,12 @@ package consul import ( "fmt" "github.com/hashicorp/consul/consul/structs" + "github.com/inconshreveable/muxado" "github.com/ugorji/go/codec" "io" "math/rand" "net" + "strings" "time" ) @@ -15,6 +17,7 @@ type RPCType byte const ( rpcConsul RPCType = iota rpcRaft + rpcMultiplex ) const ( @@ -62,6 +65,9 @@ func (s *Server) handleConn(conn net.Conn) { case rpcRaft: s.raftLayer.Handoff(conn) + case rpcMultiplex: + s.handleMultiplex(conn) + default: s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v", buf[0]) conn.Close() @@ -69,6 +75,22 @@ func (s *Server) handleConn(conn net.Conn) { } } +// handleMultiplex is used to multiplex a single incoming connection +func (s *Server) handleMultiplex(conn net.Conn) { + defer conn.Close() + server := muxado.Server(conn) + for { + sub, err := server.Accept() + if err != nil { + if !strings.Contains(err.Error(), "closed") { + s.logger.Printf("[ERR] consul.rpc: multiplex conn accept failed: %v", err) + } + return + } + go s.handleConsulConn(sub) + } +} + // handleConsulConn is used to service a single Consul RPC connection func (s *Server) handleConsulConn(conn net.Conn) { defer func() {