* Fix race condition on read/write of shutdown bool variable of server and connection pool.

* In connection pool, there is no guarantee that .reap() cannot execute the same time as .Shutdown() is called. It also did not benefit to eval shutdown when a select is run on the shutdown channel.
* In server, same principle applies to handleConsulConn. Since we also have a shutdown channel, it makes more to use this than to loop on a bool variable.
This commit is contained in:
Ali Abbas 2014-11-26 10:25:37 +01:00
parent f126bb7381
commit d1b2e41680
2 changed files with 15 additions and 8 deletions

View File

@ -358,12 +358,12 @@ func (p *ConnPool) RPC(addr net.Addr, version int, method string, args interface
// Reap is used to close conns open over maxTime // Reap is used to close conns open over maxTime
func (p *ConnPool) reap() { func (p *ConnPool) reap() {
for !p.shutdown { for {
// Sleep for a while // Sleep for a while
select { select {
case <-time.After(time.Second):
case <-p.shutdownCh: case <-p.shutdownCh:
return return
case <-time.After(time.Second):
} }
// Reap all old conns // Reap all old conns

View File

@ -3,16 +3,17 @@ package consul
import ( import (
"crypto/tls" "crypto/tls"
"fmt" "fmt"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/yamux"
"github.com/inconshreveable/muxado"
"io" "io"
"math/rand" "math/rand"
"net" "net"
"strings" "strings"
"time" "time"
"github.com/armon/go-metrics"
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/go-msgpack/codec"
"github.com/hashicorp/yamux"
"github.com/inconshreveable/muxado"
) )
type RPCType byte type RPCType byte
@ -149,7 +150,13 @@ func (s *Server) handleMultiplexV2(conn net.Conn) {
func (s *Server) handleConsulConn(conn net.Conn) { func (s *Server) handleConsulConn(conn net.Conn) {
defer conn.Close() defer conn.Close()
rpcCodec := codec.GoRpc.ServerCodec(conn, msgpackHandle) rpcCodec := codec.GoRpc.ServerCodec(conn, msgpackHandle)
for !s.shutdown { for {
select {
case <-s.shutdownCh:
return
default:
}
if err := s.rpcServer.ServeRequest(rpcCodec); err != nil { if err := s.rpcServer.ServeRequest(rpcCodec); err != nil {
if err != io.EOF && !strings.Contains(err.Error(), "closed") { if err != io.EOF && !strings.Contains(err.Error(), "closed") {
s.logger.Printf("[ERR] consul.rpc: RPC error: %v (%v)", err, conn) s.logger.Printf("[ERR] consul.rpc: RPC error: %v (%v)", err, conn)