From 40c5af6f73bf1b355caf21106b47b2fc7f8fd1fe Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 12 Aug 2015 18:48:15 -0700 Subject: [PATCH 1/5] Fixes #1165 by having threads wait for any outstanding connect to finish. --- consul/client_test.go | 41 +++++++++++++++++++++ consul/pool.go | 86 ++++++++++++++++++++++++++++++------------- 2 files changed, 101 insertions(+), 26 deletions(-) diff --git a/consul/client_test.go b/consul/client_test.go index 16e751d690..dad7173971 100644 --- a/consul/client_test.go +++ b/consul/client_test.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "os" + "sync" "testing" "time" @@ -189,6 +190,46 @@ func TestClient_RPC(t *testing.T) { }) } +func TestClient_RPC_Pool(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + dir2, c1 := testClient(t) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + // Try to join. + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := c1.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + if len(s1.LANMembers()) != 2 || len(c1.LANMembers()) != 2 { + t.Fatalf("bad len") + } + + // Blast out a bunch of RPC requests at the same time to try to get + // contention opening new connections. + var wg sync.WaitGroup + for i := 0; i < 150; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + var out struct{} + testutil.WaitForResult(func() (bool, error) { + err := c1.RPC("Status.Ping", struct{}{}, &out) + return err == nil, err + }, func(err error) { + t.Fatalf("err: %v", err) + }) + }() + } + + wg.Wait() +} + func TestClient_RPC_TLS(t *testing.T) { dir1, conf1 := testServerConfig(t, "a.testco.internal") conf1.VerifyIncoming = true diff --git a/consul/pool.go b/consul/pool.go index 3512fa6212..5b396b786b 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -134,6 +134,10 @@ type ConnPool struct { // Pool maps an address to a open connection pool map[string]*Conn + // limiter is used to throttle the number of connect attempts + // to a given address. + limiter map[string]chan int + // TLS wrapper tlsWrap tlsutil.DCWrapper @@ -153,6 +157,7 @@ func NewPool(logOutput io.Writer, maxTime time.Duration, maxStreams int, tlsWrap maxTime: maxTime, maxStreams: maxStreams, pool: make(map[string]*Conn), + limiter: make(map[string]chan int), tlsWrap: tlsWrap, shutdownCh: make(chan struct{}), } @@ -180,28 +185,68 @@ func (p *ConnPool) Shutdown() error { return nil } -// Acquire is used to get a connection that is -// pooled or to return a new connection +// acquire will return a pooled connection, if available. Otherwise it will +// wait for an existing connection attempt to finish, if one if in progress, +// and will return that one if it succeeds. If all else fails, it will return a +// newly-created connection and add it to the pool. func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) { - // Check for a pooled ocnn - if conn := p.getPooled(addr, version); conn != nil { - return conn, nil - } - - // Create a new connection - return p.getNewConn(dc, addr, version) -} - -// getPooled is used to return a pooled connection -func (p *ConnPool) getPooled(addr net.Addr, version int) *Conn { + // Check to see if there's a pooled connection available. This is up + // here since it should the the vastly more common case than the rest + // of the code here. p.Lock() c := p.pool[addr.String()] if c != nil { c.lastUsed = time.Now() atomic.AddInt32(&c.refCount, 1) + p.Unlock() + return c, nil + } + + // If not (while we are still locked), set up the throttling structure + // for this address. + var wait chan int + var ok bool + if wait, ok = p.limiter[addr.String()]; !ok { + wait = make(chan int, 1) + p.limiter[addr.String()] = wait } p.Unlock() - return c + + // Now throttle so we don't pound on a server if there are a ton of + // outstanding requests to one server. + wait <- 1 + defer func() { <- wait }() + + // In case we got throttled, check the pool one more time. + p.Lock() + c = p.pool[addr.String()] + if c != nil { + c.lastUsed = time.Now() + atomic.AddInt32(&c.refCount, 1) + p.Unlock() + return c, nil + } + p.Unlock() + + // Go ahead and make a new connection. + c, err := p.getNewConn(dc, addr, version) + if err != nil { + return nil, err + } + + // Return the new connection, adding it to the pool. If the connection + // the throttle was waiting for fails then all the threads will then try + // to connect, so we have to handle that potential race condition. + p.Lock() + if existing := p.pool[addr.String()]; existing != nil { + c.Close() + p.Unlock() + return existing, nil + } + + p.pool[addr.String()] = c + p.Unlock() + return c, nil } // getNewConn is used to return a new connection @@ -272,18 +317,7 @@ func (p *ConnPool) getNewConn(dc string, addr net.Addr, version int) (*Conn, err version: version, pool: p, } - - // Track this connection, handle potential race condition - p.Lock() - if existing := p.pool[addr.String()]; existing != nil { - c.Close() - p.Unlock() - return existing, nil - } else { - p.pool[addr.String()] = c - p.Unlock() - return c, nil - } + return c, nil } // clearConn is used to clear any cached connection, potentially in response to an erro From 8bca3eb644672165ed633ebb2ebc399c7bf9cfac Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 12 Aug 2015 19:17:52 -0700 Subject: [PATCH 2/5] Adds missing ref count for the race condition case. --- consul/pool.go | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/consul/pool.go b/consul/pool.go index 5b396b786b..1c2f86fe54 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -190,14 +190,20 @@ func (p *ConnPool) Shutdown() error { // and will return that one if it succeeds. If all else fails, it will return a // newly-created connection and add it to the pool. func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) { + // markforUse does all the bookkeeping for an existing connection we wish + // to use from the pool. + markForUse := func(c *Conn) { + c.lastUsed = time.Now() + atomic.AddInt32(&c.refCount, 1) + } + // Check to see if there's a pooled connection available. This is up // here since it should the the vastly more common case than the rest // of the code here. p.Lock() c := p.pool[addr.String()] if c != nil { - c.lastUsed = time.Now() - atomic.AddInt32(&c.refCount, 1) + markForUse(c) p.Unlock() return c, nil } @@ -221,8 +227,7 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) p.Lock() c = p.pool[addr.String()] if c != nil { - c.lastUsed = time.Now() - atomic.AddInt32(&c.refCount, 1) + markForUse(c) p.Unlock() return c, nil } @@ -236,10 +241,13 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) // Return the new connection, adding it to the pool. If the connection // the throttle was waiting for fails then all the threads will then try - // to connect, so we have to handle that potential race condition. + // to connect, so we have to handle that potential race condition and + // scuttle the connection we just made if someone else got there first. p.Lock() if existing := p.pool[addr.String()]; existing != nil { c.Close() + + markForUse(existing) p.Unlock() return existing, nil } From 072811f2898ee992af06b0eb5a9f3d09850f8be1 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Wed, 12 Aug 2015 20:14:48 -0700 Subject: [PATCH 3/5] Gets rid of follow up attempts if the lead thread can't connect. --- consul/pool.go | 79 +++++++++++++++++++++++++++----------------------- 1 file changed, 43 insertions(+), 36 deletions(-) diff --git a/consul/pool.go b/consul/pool.go index 1c2f86fe54..e6c7220eb3 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -135,8 +135,10 @@ type ConnPool struct { pool map[string]*Conn // limiter is used to throttle the number of connect attempts - // to a given address. - limiter map[string]chan int + // to a given address. The first thread will attempt a connection + // and put a channel in here, which all other threads will wait + // on to close. + limiter map[string]chan struct{} // TLS wrapper tlsWrap tlsutil.DCWrapper @@ -157,7 +159,7 @@ func NewPool(logOutput io.Writer, maxTime time.Duration, maxStreams int, tlsWrap maxTime: maxTime, maxStreams: maxStreams, pool: make(map[string]*Conn), - limiter: make(map[string]chan int), + limiter: make(map[string]chan struct{}), tlsWrap: tlsWrap, shutdownCh: make(chan struct{}), } @@ -209,52 +211,57 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) } // If not (while we are still locked), set up the throttling structure - // for this address. - var wait chan int + // for this address, which will make everyone else wait until our + // attempt is done. + var wait chan struct{} var ok bool if wait, ok = p.limiter[addr.String()]; !ok { - wait = make(chan int, 1) + wait = make(chan struct{}, 1) p.limiter[addr.String()] = wait } + isLeadThread := !ok p.Unlock() - // Now throttle so we don't pound on a server if there are a ton of - // outstanding requests to one server. - wait <- 1 - defer func() { <- wait }() + // If we are the lead thread, make the new connection and then wake + // everybody else up to see if we got it. + if isLeadThread { + defer func() { + p.Lock() + delete(p.limiter, addr.String()) + p.Unlock() - // In case we got throttled, check the pool one more time. + close(wait) + }() + + c, err := p.getNewConn(dc, addr, version) + if err != nil { + return nil, err + } + + p.Lock() + p.pool[addr.String()] = c + p.Unlock() + return c, nil + } + + // Otherwise, wait for the lead thread to attempt the connection + // and use what's in the pool at that point. + select { + case <-p.shutdownCh: + return nil, fmt.Errorf("rpc error: shutdown") + case <-wait: + } + + // See if the lead thread was able to get us a connection. p.Lock() - c = p.pool[addr.String()] - if c != nil { + if c := p.pool[addr.String()]; c != nil { markForUse(c) p.Unlock() return c, nil } + p.Unlock() - - // Go ahead and make a new connection. - c, err := p.getNewConn(dc, addr, version) - if err != nil { - return nil, err - } - - // Return the new connection, adding it to the pool. If the connection - // the throttle was waiting for fails then all the threads will then try - // to connect, so we have to handle that potential race condition and - // scuttle the connection we just made if someone else got there first. - p.Lock() - if existing := p.pool[addr.String()]; existing != nil { - c.Close() - - markForUse(existing) - p.Unlock() - return existing, nil - } - - p.pool[addr.String()] = c - p.Unlock() - return c, nil + return nil, fmt.Errorf("rpc error: lead thread didn't get connection") } // getNewConn is used to return a new connection From 1e8937b52bc5d3f92bb2fb88673b9d7410d81d00 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 13 Aug 2015 10:01:05 -0700 Subject: [PATCH 4/5] Cleans up locking and factors markForUse into a Conn method. --- consul/pool.go | 30 ++++++++++++------------------ 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/consul/pool.go b/consul/pool.go index e6c7220eb3..56fa80e0ab 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -114,6 +114,12 @@ func (c *Conn) returnClient(client *StreamClient) { } } +// markForUse does all the bookkeeping required to ready a connection for use. +func (c *Conn) markForUse() { + c.lastUsed = time.Now() + atomic.AddInt32(&c.refCount, 1) +} + // ConnPool is used to maintain a connection pool to other // Consul servers. This is used to reduce the latency of // RPC requests between servers. It is only used to pool @@ -192,20 +198,13 @@ func (p *ConnPool) Shutdown() error { // and will return that one if it succeeds. If all else fails, it will return a // newly-created connection and add it to the pool. func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) { - // markforUse does all the bookkeeping for an existing connection we wish - // to use from the pool. - markForUse := func(c *Conn) { - c.lastUsed = time.Now() - atomic.AddInt32(&c.refCount, 1) - } - // Check to see if there's a pooled connection available. This is up // here since it should the the vastly more common case than the rest // of the code here. p.Lock() c := p.pool[addr.String()] if c != nil { - markForUse(c) + c.markForUse() p.Unlock() return c, nil } @@ -225,20 +224,15 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) // If we are the lead thread, make the new connection and then wake // everybody else up to see if we got it. if isLeadThread { - defer func() { - p.Lock() - delete(p.limiter, addr.String()) - p.Unlock() - - close(wait) - }() - c, err := p.getNewConn(dc, addr, version) + p.Lock() + delete(p.limiter, addr.String()) + close(wait) if err != nil { + p.Unlock() return nil, err } - p.Lock() p.pool[addr.String()] = c p.Unlock() return c, nil @@ -255,7 +249,7 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) // See if the lead thread was able to get us a connection. p.Lock() if c := p.pool[addr.String()]; c != nil { - markForUse(c) + c.markForUse() p.Unlock() return c, nil } From 614bf446fc78111ab91d1a3b739bc8e20289400c Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 13 Aug 2015 11:38:02 -0700 Subject: [PATCH 5/5] Changes to an unbuffered channel, since we just close it. --- consul/pool.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/consul/pool.go b/consul/pool.go index 56fa80e0ab..0cd0a99dfd 100644 --- a/consul/pool.go +++ b/consul/pool.go @@ -215,7 +215,7 @@ func (p *ConnPool) acquire(dc string, addr net.Addr, version int) (*Conn, error) var wait chan struct{} var ok bool if wait, ok = p.limiter[addr.String()]; !ok { - wait = make(chan struct{}, 1) + wait = make(chan struct{}) p.limiter[addr.String()] = wait } isLeadThread := !ok