diff --git a/consul/rpc.go b/consul/rpc.go index cfc1fc51b5..c6ea007a33 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -5,6 +5,7 @@ import ( "fmt" "github.com/armon/go-metrics" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/yamux" "github.com/inconshreveable/muxado" "github.com/ugorji/go/codec" "io" @@ -21,6 +22,7 @@ const ( rpcRaft rpcMultiplex rpcTLS + rpcMultiplexV2 ) const ( @@ -97,6 +99,9 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) { conn = tls.Server(conn, s.rpcTLS) s.handleConn(conn, true) + case rpcMultiplexV2: + s.handleMultiplexV2(conn) + default: s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v", buf[0]) conn.Close() @@ -105,6 +110,7 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) { } // handleMultiplex is used to multiplex a single incoming connection +// using the Muxado multiplexer func (s *Server) handleMultiplex(conn net.Conn) { defer conn.Close() server := muxado.Server(conn) @@ -120,6 +126,23 @@ func (s *Server) handleMultiplex(conn net.Conn) { } } +// handleMultiplexV2 is used to multiplex a single incoming connection +// using the Yamux multiplexer +func (s *Server) handleMultiplexV2(conn net.Conn) { + defer conn.Close() + server, _ := yamux.Server(conn, nil) + for { + sub, err := server.Accept() + if err != nil { + if err != io.EOF { + s.logger.Printf("[ERR] consul.rpc: multiplex conn accept failed: %v", err) + } + return + } + go s.handleConsulConn(sub) + } +} + // handleConsulConn is used to service a single Consul RPC connection func (s *Server) handleConsulConn(conn net.Conn) { defer conn.Close()