consul: Support for incoming Yamux connections

This commit is contained in:
Armon Dadgar 2014-05-27 11:00:35 -07:00
parent cb8cac40f3
commit b03ead39a1

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"github.com/armon/go-metrics" "github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/yamux"
"github.com/inconshreveable/muxado" "github.com/inconshreveable/muxado"
"github.com/ugorji/go/codec" "github.com/ugorji/go/codec"
"io" "io"
@ -21,6 +22,7 @@ const (
rpcRaft rpcRaft
rpcMultiplex rpcMultiplex
rpcTLS rpcTLS
rpcMultiplexV2
) )
const ( const (
@ -97,6 +99,9 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
conn = tls.Server(conn, s.rpcTLS) conn = tls.Server(conn, s.rpcTLS)
s.handleConn(conn, true) s.handleConn(conn, true)
case rpcMultiplexV2:
s.handleMultiplexV2(conn)
default: default:
s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v", buf[0]) s.logger.Printf("[ERR] consul.rpc: unrecognized RPC byte: %v", buf[0])
conn.Close() conn.Close()
@ -105,6 +110,7 @@ func (s *Server) handleConn(conn net.Conn, isTLS bool) {
} }
// handleMultiplex is used to multiplex a single incoming connection // handleMultiplex is used to multiplex a single incoming connection
// using the Muxado multiplexer
func (s *Server) handleMultiplex(conn net.Conn) { func (s *Server) handleMultiplex(conn net.Conn) {
defer conn.Close() defer conn.Close()
server := muxado.Server(conn) server := muxado.Server(conn)
@ -120,6 +126,23 @@ func (s *Server) handleMultiplex(conn net.Conn) {
} }
} }
// handleMultiplexV2 is used to multiplex a single incoming connection
// using the Yamux multiplexer
func (s *Server) handleMultiplexV2(conn net.Conn) {
defer conn.Close()
server, _ := yamux.Server(conn, nil)
for {
sub, err := server.Accept()
if err != nil {
if err != io.EOF {
s.logger.Printf("[ERR] consul.rpc: multiplex conn accept failed: %v", err)
}
return
}
go s.handleConsulConn(sub)
}
}
// handleConsulConn is used to service a single Consul RPC connection // handleConsulConn is used to service a single Consul RPC connection
func (s *Server) handleConsulConn(conn net.Conn) { func (s *Server) handleConsulConn(conn net.Conn) {
defer conn.Close() defer conn.Close()