mirror of https://github.com/status-im/consul.git
Merge pull request #3982 from hashicorp/yamux_update
Update yamux to pick up performance improvements
This commit is contained in:
commit
007ebe4370
|
@ -123,6 +123,12 @@ func (s *Session) IsClosed() bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CloseChan returns a read-only channel which is closed as
|
||||||
|
// soon as the session is closed.
|
||||||
|
func (s *Session) CloseChan() <-chan struct{} {
|
||||||
|
return s.shutdownCh
|
||||||
|
}
|
||||||
|
|
||||||
// NumStreams returns the number of currently open streams
|
// NumStreams returns the number of currently open streams
|
||||||
func (s *Session) NumStreams() int {
|
func (s *Session) NumStreams() int {
|
||||||
s.streamLock.Lock()
|
s.streamLock.Lock()
|
||||||
|
@ -323,8 +329,17 @@ func (s *Session) waitForSend(hdr header, body io.Reader) error {
|
||||||
// potential shutdown. Since there's the expectation that sends can happen
|
// potential shutdown. Since there's the expectation that sends can happen
|
||||||
// in a timely manner, we enforce the connection write timeout here.
|
// in a timely manner, we enforce the connection write timeout here.
|
||||||
func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error {
|
func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) error {
|
||||||
timer := time.NewTimer(s.config.ConnectionWriteTimeout)
|
t := timerPool.Get()
|
||||||
defer timer.Stop()
|
timer := t.(*time.Timer)
|
||||||
|
timer.Reset(s.config.ConnectionWriteTimeout)
|
||||||
|
defer func() {
|
||||||
|
timer.Stop()
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
timerPool.Put(t)
|
||||||
|
}()
|
||||||
|
|
||||||
ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
|
ready := sendReady{Hdr: hdr, Body: body, Err: errCh}
|
||||||
select {
|
select {
|
||||||
|
@ -349,8 +364,17 @@ func (s *Session) waitForSendErr(hdr header, body io.Reader, errCh chan error) e
|
||||||
// the send happens right here, we enforce the connection write timeout if we
|
// the send happens right here, we enforce the connection write timeout if we
|
||||||
// can't queue the header to be sent.
|
// can't queue the header to be sent.
|
||||||
func (s *Session) sendNoWait(hdr header) error {
|
func (s *Session) sendNoWait(hdr header) error {
|
||||||
timer := time.NewTimer(s.config.ConnectionWriteTimeout)
|
t := timerPool.Get()
|
||||||
defer timer.Stop()
|
timer := t.(*time.Timer)
|
||||||
|
timer.Reset(s.config.ConnectionWriteTimeout)
|
||||||
|
defer func() {
|
||||||
|
timer.Stop()
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
timerPool.Put(t)
|
||||||
|
}()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case s.sendCh <- sendReady{Hdr: hdr}:
|
case s.sendCh <- sendReady{Hdr: hdr}:
|
||||||
|
@ -408,11 +432,20 @@ func (s *Session) recv() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure that the index of the handler (typeData/typeWindowUpdate/etc) matches the message type
|
||||||
|
var (
|
||||||
|
handlers = []func(*Session, header) error{
|
||||||
|
typeData: (*Session).handleStreamMessage,
|
||||||
|
typeWindowUpdate: (*Session).handleStreamMessage,
|
||||||
|
typePing: (*Session).handlePing,
|
||||||
|
typeGoAway: (*Session).handleGoAway,
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
// recvLoop continues to receive data until a fatal error is encountered
|
// recvLoop continues to receive data until a fatal error is encountered
|
||||||
func (s *Session) recvLoop() error {
|
func (s *Session) recvLoop() error {
|
||||||
defer close(s.recvDoneCh)
|
defer close(s.recvDoneCh)
|
||||||
hdr := header(make([]byte, headerSize))
|
hdr := header(make([]byte, headerSize))
|
||||||
var handler func(header) error
|
|
||||||
for {
|
for {
|
||||||
// Read the header
|
// Read the header
|
||||||
if _, err := io.ReadFull(s.bufRead, hdr); err != nil {
|
if _, err := io.ReadFull(s.bufRead, hdr); err != nil {
|
||||||
|
@ -428,22 +461,12 @@ func (s *Session) recvLoop() error {
|
||||||
return ErrInvalidVersion
|
return ErrInvalidVersion
|
||||||
}
|
}
|
||||||
|
|
||||||
// Switch on the type
|
mt := hdr.MsgType()
|
||||||
switch hdr.MsgType() {
|
if mt < typeData || mt > typeGoAway {
|
||||||
case typeData:
|
|
||||||
handler = s.handleStreamMessage
|
|
||||||
case typeWindowUpdate:
|
|
||||||
handler = s.handleStreamMessage
|
|
||||||
case typeGoAway:
|
|
||||||
handler = s.handleGoAway
|
|
||||||
case typePing:
|
|
||||||
handler = s.handlePing
|
|
||||||
default:
|
|
||||||
return ErrInvalidMsgType
|
return ErrInvalidMsgType
|
||||||
}
|
}
|
||||||
|
|
||||||
// Invoke the handler
|
if err := handlers[mt](s, hdr); err != nil {
|
||||||
if err := handler(hdr); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,8 +47,8 @@ type Stream struct {
|
||||||
recvNotifyCh chan struct{}
|
recvNotifyCh chan struct{}
|
||||||
sendNotifyCh chan struct{}
|
sendNotifyCh chan struct{}
|
||||||
|
|
||||||
readDeadline time.Time
|
readDeadline atomic.Value // time.Time
|
||||||
writeDeadline time.Time
|
writeDeadline atomic.Value // time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
// newStream is used to construct a new stream within
|
// newStream is used to construct a new stream within
|
||||||
|
@ -67,6 +67,8 @@ func newStream(session *Session, id uint32, state streamState) *Stream {
|
||||||
recvNotifyCh: make(chan struct{}, 1),
|
recvNotifyCh: make(chan struct{}, 1),
|
||||||
sendNotifyCh: make(chan struct{}, 1),
|
sendNotifyCh: make(chan struct{}, 1),
|
||||||
}
|
}
|
||||||
|
s.readDeadline.Store(time.Time{})
|
||||||
|
s.writeDeadline.Store(time.Time{})
|
||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -122,8 +124,9 @@ START:
|
||||||
WAIT:
|
WAIT:
|
||||||
var timeout <-chan time.Time
|
var timeout <-chan time.Time
|
||||||
var timer *time.Timer
|
var timer *time.Timer
|
||||||
if !s.readDeadline.IsZero() {
|
readDeadline := s.readDeadline.Load().(time.Time)
|
||||||
delay := s.readDeadline.Sub(time.Now())
|
if !readDeadline.IsZero() {
|
||||||
|
delay := readDeadline.Sub(time.Now())
|
||||||
timer = time.NewTimer(delay)
|
timer = time.NewTimer(delay)
|
||||||
timeout = timer.C
|
timeout = timer.C
|
||||||
}
|
}
|
||||||
|
@ -188,7 +191,7 @@ START:
|
||||||
|
|
||||||
// Send the header
|
// Send the header
|
||||||
s.sendHdr.encode(typeData, flags, s.id, max)
|
s.sendHdr.encode(typeData, flags, s.id, max)
|
||||||
if err := s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
|
if err = s.session.waitForSendErr(s.sendHdr, body, s.sendErr); err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -200,8 +203,9 @@ START:
|
||||||
|
|
||||||
WAIT:
|
WAIT:
|
||||||
var timeout <-chan time.Time
|
var timeout <-chan time.Time
|
||||||
if !s.writeDeadline.IsZero() {
|
writeDeadline := s.writeDeadline.Load().(time.Time)
|
||||||
delay := s.writeDeadline.Sub(time.Now())
|
if !writeDeadline.IsZero() {
|
||||||
|
delay := writeDeadline.Sub(time.Now())
|
||||||
timeout = time.After(delay)
|
timeout = time.After(delay)
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
|
@ -238,18 +242,25 @@ func (s *Stream) sendWindowUpdate() error {
|
||||||
|
|
||||||
// Determine the delta update
|
// Determine the delta update
|
||||||
max := s.session.config.MaxStreamWindowSize
|
max := s.session.config.MaxStreamWindowSize
|
||||||
delta := max - atomic.LoadUint32(&s.recvWindow)
|
var bufLen uint32
|
||||||
|
s.recvLock.Lock()
|
||||||
|
if s.recvBuf != nil {
|
||||||
|
bufLen = uint32(s.recvBuf.Len())
|
||||||
|
}
|
||||||
|
delta := (max - bufLen) - s.recvWindow
|
||||||
|
|
||||||
// Determine the flags if any
|
// Determine the flags if any
|
||||||
flags := s.sendFlags()
|
flags := s.sendFlags()
|
||||||
|
|
||||||
// Check if we can omit the update
|
// Check if we can omit the update
|
||||||
if delta < (max/2) && flags == 0 {
|
if delta < (max/2) && flags == 0 {
|
||||||
|
s.recvLock.Unlock()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update our window
|
// Update our window
|
||||||
atomic.AddUint32(&s.recvWindow, delta)
|
s.recvWindow += delta
|
||||||
|
s.recvLock.Unlock()
|
||||||
|
|
||||||
// Send the header
|
// Send the header
|
||||||
s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
|
s.controlHdr.encode(typeWindowUpdate, flags, s.id, delta)
|
||||||
|
@ -392,16 +403,18 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
|
||||||
if length == 0 {
|
if length == 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if remain := atomic.LoadUint32(&s.recvWindow); length > remain {
|
|
||||||
s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, remain, length)
|
|
||||||
return ErrRecvWindowExceeded
|
|
||||||
}
|
|
||||||
|
|
||||||
// Wrap in a limited reader
|
// Wrap in a limited reader
|
||||||
conn = &io.LimitedReader{R: conn, N: int64(length)}
|
conn = &io.LimitedReader{R: conn, N: int64(length)}
|
||||||
|
|
||||||
// Copy into buffer
|
// Copy into buffer
|
||||||
s.recvLock.Lock()
|
s.recvLock.Lock()
|
||||||
|
|
||||||
|
if length > s.recvWindow {
|
||||||
|
s.session.logger.Printf("[ERR] yamux: receive window exceeded (stream: %d, remain: %d, recv: %d)", s.id, s.recvWindow, length)
|
||||||
|
return ErrRecvWindowExceeded
|
||||||
|
}
|
||||||
|
|
||||||
if s.recvBuf == nil {
|
if s.recvBuf == nil {
|
||||||
// Allocate the receive buffer just-in-time to fit the full data frame.
|
// Allocate the receive buffer just-in-time to fit the full data frame.
|
||||||
// This way we can read in the whole packet without further allocations.
|
// This way we can read in the whole packet without further allocations.
|
||||||
|
@ -414,7 +427,7 @@ func (s *Stream) readData(hdr header, flags uint16, conn io.Reader) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decrement the receive window
|
// Decrement the receive window
|
||||||
atomic.AddUint32(&s.recvWindow, ^uint32(length-1))
|
s.recvWindow -= length
|
||||||
s.recvLock.Unlock()
|
s.recvLock.Unlock()
|
||||||
|
|
||||||
// Unblock any readers
|
// Unblock any readers
|
||||||
|
@ -435,13 +448,13 @@ func (s *Stream) SetDeadline(t time.Time) error {
|
||||||
|
|
||||||
// SetReadDeadline sets the deadline for future Read calls.
|
// SetReadDeadline sets the deadline for future Read calls.
|
||||||
func (s *Stream) SetReadDeadline(t time.Time) error {
|
func (s *Stream) SetReadDeadline(t time.Time) error {
|
||||||
s.readDeadline = t
|
s.readDeadline.Store(t)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetWriteDeadline sets the deadline for future Write calls
|
// SetWriteDeadline sets the deadline for future Write calls
|
||||||
func (s *Stream) SetWriteDeadline(t time.Time) error {
|
func (s *Stream) SetWriteDeadline(t time.Time) error {
|
||||||
s.writeDeadline = t
|
s.writeDeadline.Store(t)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,20 @@
|
||||||
package yamux
|
package yamux
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
timerPool = &sync.Pool{
|
||||||
|
New: func() interface{} {
|
||||||
|
timer := time.NewTimer(time.Hour * 1e6)
|
||||||
|
timer.Stop()
|
||||||
|
return timer
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
// asyncSendErr is used to try an async send of an error
|
// asyncSendErr is used to try an async send of an error
|
||||||
func asyncSendErr(ch chan error, err error) {
|
func asyncSendErr(ch chan error, err error) {
|
||||||
if ch == nil {
|
if ch == nil {
|
||||||
|
|
|
@ -72,7 +72,7 @@
|
||||||
{"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"},
|
{"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"},
|
||||||
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","comment":"v0.7.0-66-g6c4672d","revision":"b6017ae61f4420ed0c02d5eeeb9ff3fc02953f14","revisionTime":"2018-01-19T22:43:00Z"},
|
{"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","comment":"v0.7.0-66-g6c4672d","revision":"b6017ae61f4420ed0c02d5eeeb9ff3fc02953f14","revisionTime":"2018-01-19T22:43:00Z"},
|
||||||
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"QGImnWfhk0ILLZszcf3vRs/Ft7g=","comment":"v0.7.0-66-g6c4672d","revision":"b6017ae61f4420ed0c02d5eeeb9ff3fc02953f14","revisionTime":"2018-01-19T22:43:00Z"},
|
{"path":"github.com/hashicorp/serf/serf","checksumSHA1":"QGImnWfhk0ILLZszcf3vRs/Ft7g=","comment":"v0.7.0-66-g6c4672d","revision":"b6017ae61f4420ed0c02d5eeeb9ff3fc02953f14","revisionTime":"2018-01-19T22:43:00Z"},
|
||||||
{"path":"github.com/hashicorp/yamux","checksumSHA1":"ZhK6IO2XN81Y+3RAjTcVm1Ic7oU=","revision":"d1caa6c97c9fc1cc9e83bbe34d0603f9ff0ce8bd","revisionTime":"2016-07-20T23:31:40Z"},
|
{"path":"github.com/hashicorp/yamux","checksumSHA1":"NnWv17i1tpvBNJtpdRRWpE6j4LY=","revision":"2658be15c5f05e76244154714161f17e3e77de2e","revisionTime":"2018-03-14T20:07:45Z"},
|
||||||
{"path":"github.com/mattn/go-isatty","checksumSHA1":"xZuhljnmBysJPta/lMyYmJdujCg=","revision":"66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8","revisionTime":"2016-08-06T12:27:52Z"},
|
{"path":"github.com/mattn/go-isatty","checksumSHA1":"xZuhljnmBysJPta/lMyYmJdujCg=","revision":"66b8e73f3f5cda9f96b69efd03dd3d7fc4a5cdb8","revisionTime":"2016-08-06T12:27:52Z"},
|
||||||
{"path":"github.com/miekg/dns","checksumSHA1":"Jo+pItYOocIRdoFL0fc4nHhUEJY=","revision":"bbca4873b326f5dc54bfe31148446d4ed79a5a02","revisionTime":"2017-08-08T22:19:10Z"},
|
{"path":"github.com/miekg/dns","checksumSHA1":"Jo+pItYOocIRdoFL0fc4nHhUEJY=","revision":"bbca4873b326f5dc54bfe31148446d4ed79a5a02","revisionTime":"2017-08-08T22:19:10Z"},
|
||||||
{"path":"github.com/mitchellh/cli","checksumSHA1":"GzfpPGtV2UJH9hFsKwzGjKrhp/A=","revision":"dff723fff508858a44c1f4bd0911f00d73b0202f","revisionTime":"2017-09-05T22:10:09Z"},
|
{"path":"github.com/mitchellh/cli","checksumSHA1":"GzfpPGtV2UJH9hFsKwzGjKrhp/A=","revision":"dff723fff508858a44c1f4bd0911f00d73b0202f","revisionTime":"2017-09-05T22:10:09Z"},
|
||||||
|
|
Loading…
Reference in New Issue