2
0
mirror of synced 2025-02-22 13:48:21 +00:00
torrent/peer-conn-msg-writer.go
Matt Joiner fed765b2a0
Fix write error handling
Fixes https://github.com/anacrolix/torrent/issues/798.

Prior to this fix, it looks like the writer would just keep writing chunks of the front buffer (incorrectly if there was an error), until presumably the writer would be killed by read hangup elsewhere.
2022-12-25 19:24:16 +11:00

131 lines
3.1 KiB
Go

package torrent
import (
"bytes"
"io"
"time"
"github.com/anacrolix/chansync"
"github.com/anacrolix/log"
"github.com/anacrolix/sync"
pp "github.com/anacrolix/torrent/peer_protocol"
)
func (pc *PeerConn) initMessageWriter() {
w := &pc.messageWriter
*w = peerConnMsgWriter{
fillWriteBuffer: func() {
pc.locker().Lock()
defer pc.locker().Unlock()
if pc.closed.IsSet() {
return
}
pc.fillWriteBuffer()
},
closed: &pc.closed,
logger: pc.logger,
w: pc.w,
keepAlive: func() bool {
pc.locker().RLock()
defer pc.locker().RUnlock()
return pc.useful()
},
writeBuffer: new(bytes.Buffer),
}
}
func (pc *PeerConn) startMessageWriter() {
pc.initMessageWriter()
go pc.messageWriterRunner()
}
func (pc *PeerConn) messageWriterRunner() {
defer pc.locker().Unlock()
defer pc.close()
defer pc.locker().Lock()
pc.messageWriter.run(pc.t.cl.config.KeepAliveTimeout)
}
type peerConnMsgWriter struct {
// Must not be called with the local mutex held, as it will call back into the write method.
fillWriteBuffer func()
closed *chansync.SetOnce
logger log.Logger
w io.Writer
keepAlive func() bool
mu sync.Mutex
writeCond chansync.BroadcastCond
// Pointer so we can swap with the "front buffer".
writeBuffer *bytes.Buffer
}
// Routine that writes to the peer. Some of what to write is buffered by
// activity elsewhere in the Client, and some is determined locally when the
// connection is writable.
func (cn *peerConnMsgWriter) run(keepAliveTimeout time.Duration) {
lastWrite := time.Now()
keepAliveTimer := time.NewTimer(keepAliveTimeout)
frontBuf := new(bytes.Buffer)
for {
if cn.closed.IsSet() {
return
}
cn.fillWriteBuffer()
keepAlive := cn.keepAlive()
cn.mu.Lock()
if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && keepAlive {
cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
torrent.Add("written keepalives", 1)
}
if cn.writeBuffer.Len() == 0 {
writeCond := cn.writeCond.Signaled()
cn.mu.Unlock()
select {
case <-cn.closed.Done():
case <-writeCond:
case <-keepAliveTimer.C:
}
continue
}
// Flip the buffers.
frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
cn.mu.Unlock()
if frontBuf.Len() == 0 {
panic("expected non-empty front buffer")
}
var err error
for frontBuf.Len() != 0 {
// Limit write size for WebRTC. See https://github.com/pion/datachannel/issues/59.
next := frontBuf.Next(1<<16 - 1)
var n int
n, err = cn.w.Write(next)
if err == nil && n != len(next) {
panic("expected full write")
}
if err != nil {
break
}
}
if err != nil {
cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
return
}
lastWrite = time.Now()
keepAliveTimer.Reset(keepAliveTimeout)
}
}
func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
cn.mu.Lock()
defer cn.mu.Unlock()
cn.writeBuffer.Write(msg.MustMarshalBinary())
cn.writeCond.Broadcast()
return !cn.writeBufferFull()
}
func (cn *peerConnMsgWriter) writeBufferFull() bool {
return cn.writeBuffer.Len() >= writeBufferHighWaterLen
}