Share the post write buffer with the synchronous one in the connection writer

This should prevent messages posted while generating synchronous messages don't get out of order.
This commit is contained in:
Matt Joiner 2018-02-03 14:40:03 +11:00
parent 9b1a769bef
commit 86aabb081c
1 changed files with 14 additions and 13 deletions

View File

@ -93,9 +93,9 @@ type connection struct {
pieceInclination []int
pieceRequestOrder prioritybitmap.PriorityBitmap
postedBuffer bytes.Buffer
uploadTimer *time.Timer
writerCond sync.Cond
writeBuffer bytes.Buffer
uploadTimer *time.Timer
writerCond sync.Cond
}
func (cn *connection) peerHasAllPieces() (all bool, known bool) {
@ -245,7 +245,7 @@ func (cn *connection) PeerHasPiece(piece int) bool {
func (cn *connection) Post(msg pp.Message) {
messageTypesPosted.Add(strconv.FormatInt(int64(msg.Type), 10), 1)
cn.postedBuffer.Write(msg.MustMarshalBinary())
cn.writeBuffer.Write(msg.MustMarshalBinary())
cn.tickleWriter()
}
@ -404,7 +404,7 @@ func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
// connection is writable.
func (cn *connection) writer(keepAliveTimeout time.Duration) {
var (
buf bytes.Buffer
// buf bytes.Buffer
lastWrite time.Time = time.Now()
)
var keepAliveTimer *time.Timer
@ -424,24 +424,25 @@ func (cn *connection) writer(keepAliveTimeout time.Duration) {
if cn.closed.IsSet() {
return
}
buf.Write(cn.postedBuffer.Bytes())
cn.postedBuffer.Reset()
if buf.Len() == 0 {
if cn.writeBuffer.Len() == 0 {
cn.fillWriteBuffer(func(msg pp.Message) bool {
cn.wroteMsg(&msg)
buf.Write(msg.MustMarshalBinary())
return buf.Len() < 1<<16
cn.writeBuffer.Write(msg.MustMarshalBinary())
return cn.writeBuffer.Len() < 1<<16
})
}
if buf.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
buf.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout {
cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
postedKeepalives.Add(1)
}
if buf.Len() == 0 {
if cn.writeBuffer.Len() == 0 {
// TODO: Minimize wakeups....
cn.writerCond.Wait()
continue
}
var buf bytes.Buffer
buf.Write(cn.writeBuffer.Bytes())
cn.writeBuffer.Reset()
cn.mu().Unlock()
// log.Printf("writing %d bytes", buf.Len())
n, err := cn.w.Write(buf.Bytes())