Move peerConnMsgWriter into its own file
This commit is contained in:
parent
24ceed61da
commit
d37dea1f61
40
client.go
40
client.go
|
@ -2,7 +2,6 @@ package torrent
|
|||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
|
@ -21,13 +20,11 @@ import (
|
|||
"github.com/anacrolix/missinggo/perf"
|
||||
"github.com/anacrolix/missinggo/pubsub"
|
||||
"github.com/anacrolix/missinggo/slices"
|
||||
"github.com/anacrolix/missinggo/v2"
|
||||
"github.com/anacrolix/missinggo/v2/bitmap"
|
||||
"github.com/anacrolix/missinggo/v2/conntrack"
|
||||
"github.com/anacrolix/missinggo/v2/pproffd"
|
||||
"github.com/anacrolix/sync"
|
||||
"github.com/anacrolix/torrent/internal/limiter"
|
||||
request_strategy "github.com/anacrolix/torrent/request-strategy"
|
||||
"github.com/anacrolix/torrent/tracker"
|
||||
"github.com/anacrolix/torrent/webtorrent"
|
||||
"github.com/davecgh/go-spew/spew"
|
||||
"github.com/dustin/go-humanize"
|
||||
"github.com/google/btree"
|
||||
|
@ -35,15 +32,16 @@ import (
|
|||
"golang.org/x/time/rate"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/anacrolix/missinggo/v2"
|
||||
"github.com/anacrolix/missinggo/v2/conntrack"
|
||||
|
||||
"github.com/anacrolix/torrent/bencode"
|
||||
"github.com/anacrolix/torrent/internal/limiter"
|
||||
"github.com/anacrolix/torrent/iplist"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/mse"
|
||||
pp "github.com/anacrolix/torrent/peer_protocol"
|
||||
request_strategy "github.com/anacrolix/torrent/request-strategy"
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
"github.com/anacrolix/torrent/tracker"
|
||||
"github.com/anacrolix/torrent/webtorrent"
|
||||
)
|
||||
|
||||
// Clients contain zero or more Torrents. A Client manages a blocklist, the
|
||||
|
@ -969,32 +967,6 @@ func (cl *Client) runHandshookConn(c *PeerConn, t *Torrent) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (pc *PeerConn) startWriter() {
|
||||
w := &pc.messageWriter
|
||||
*w = peerConnWriter{
|
||||
fillWriteBuffer: func() {
|
||||
pc.locker().Lock()
|
||||
defer pc.locker().Unlock()
|
||||
pc.fillWriteBuffer()
|
||||
},
|
||||
closed: &pc.closed,
|
||||
logger: pc.logger,
|
||||
w: pc.w,
|
||||
keepAlive: func() bool {
|
||||
pc.locker().Lock()
|
||||
defer pc.locker().Unlock()
|
||||
return pc.useful()
|
||||
},
|
||||
writeBuffer: new(bytes.Buffer),
|
||||
}
|
||||
go func() {
|
||||
defer pc.locker().Unlock()
|
||||
defer pc.close()
|
||||
defer pc.locker().Lock()
|
||||
pc.messageWriter.run(time.Minute)
|
||||
}()
|
||||
}
|
||||
|
||||
// Maximum pending requests we allow peers to send us. If peer requests are buffered on read, this
|
||||
// instructs the amount of memory that might be used to cache pending writes. Assuming 512KiB
|
||||
// (1<<19) cached for sending, for 16KiB (1<<14) chunks.
|
||||
|
|
86
peerconn.go
86
peerconn.go
|
@ -18,7 +18,6 @@ import (
|
|||
"github.com/anacrolix/missinggo/v2/bitmap"
|
||||
"github.com/anacrolix/missinggo/v2/prioritybitmap"
|
||||
"github.com/anacrolix/multiless"
|
||||
"github.com/anacrolix/sync"
|
||||
|
||||
"github.com/anacrolix/torrent/bencode"
|
||||
"github.com/anacrolix/torrent/internal/chansync"
|
||||
|
@ -148,7 +147,7 @@ type PeerConn struct {
|
|||
w io.Writer
|
||||
r io.Reader
|
||||
|
||||
messageWriter peerConnWriter
|
||||
messageWriter peerConnMsgWriter
|
||||
|
||||
uploadTimer *time.Timer
|
||||
pex pexConnState
|
||||
|
@ -438,7 +437,7 @@ func (cn *PeerConn) write(msg pp.Message) bool {
|
|||
return notFull
|
||||
}
|
||||
|
||||
func (cn *peerConnWriter) write(msg pp.Message) bool {
|
||||
func (cn *peerConnMsgWriter) write(msg pp.Message) bool {
|
||||
cn.mu.Lock()
|
||||
defer cn.mu.Unlock()
|
||||
cn.writeBuffer.Write(msg.MustMarshalBinary())
|
||||
|
@ -446,7 +445,7 @@ func (cn *peerConnWriter) write(msg pp.Message) bool {
|
|||
return !cn.writeBufferFull()
|
||||
}
|
||||
|
||||
func (cn *peerConnWriter) writeBufferFull() bool {
|
||||
func (cn *peerConnMsgWriter) writeBufferFull() bool {
|
||||
return cn.writeBuffer.Len() >= writeBufferHighWaterLen
|
||||
}
|
||||
|
||||
|
@ -643,85 +642,6 @@ func (cn *PeerConn) fillWriteBuffer() {
|
|||
cn.upload(cn.write)
|
||||
}
|
||||
|
||||
type peerConnWriter struct {
|
||||
// Must not be called with the local mutex held, as it will call back into the write method.
|
||||
fillWriteBuffer func()
|
||||
closed *chansync.SetOnce
|
||||
logger log.Logger
|
||||
w io.Writer
|
||||
keepAlive func() bool
|
||||
|
||||
mu sync.Mutex
|
||||
writeCond chansync.BroadcastCond
|
||||
// Pointer so we can swap with the "front buffer".
|
||||
writeBuffer *bytes.Buffer
|
||||
}
|
||||
|
||||
// Routine that writes to the peer. Some of what to write is buffered by
|
||||
// activity elsewhere in the Client, and some is determined locally when the
|
||||
// connection is writable.
|
||||
func (cn *peerConnWriter) run(keepAliveTimeout time.Duration) {
|
||||
var (
|
||||
lastWrite time.Time = time.Now()
|
||||
keepAliveTimer *time.Timer
|
||||
)
|
||||
keepAliveTimer = time.AfterFunc(keepAliveTimeout, func() {
|
||||
cn.mu.Lock()
|
||||
defer cn.mu.Unlock()
|
||||
if time.Since(lastWrite) >= keepAliveTimeout {
|
||||
cn.writeCond.Broadcast()
|
||||
}
|
||||
keepAliveTimer.Reset(keepAliveTimeout)
|
||||
})
|
||||
cn.mu.Lock()
|
||||
defer cn.mu.Unlock()
|
||||
defer keepAliveTimer.Stop()
|
||||
frontBuf := new(bytes.Buffer)
|
||||
for {
|
||||
if cn.closed.IsSet() {
|
||||
return
|
||||
}
|
||||
if cn.writeBuffer.Len() == 0 {
|
||||
func() {
|
||||
cn.mu.Unlock()
|
||||
defer cn.mu.Lock()
|
||||
cn.fillWriteBuffer()
|
||||
}()
|
||||
}
|
||||
if cn.writeBuffer.Len() == 0 && time.Since(lastWrite) >= keepAliveTimeout && cn.keepAlive() {
|
||||
cn.writeBuffer.Write(pp.Message{Keepalive: true}.MustMarshalBinary())
|
||||
torrent.Add("written keepalives", 1)
|
||||
}
|
||||
if cn.writeBuffer.Len() == 0 {
|
||||
writeCond := cn.writeCond.WaitChan()
|
||||
cn.mu.Unlock()
|
||||
select {
|
||||
case <-cn.closed.Chan():
|
||||
case <-writeCond:
|
||||
}
|
||||
cn.mu.Lock()
|
||||
continue
|
||||
}
|
||||
// Flip the buffers.
|
||||
frontBuf, cn.writeBuffer = cn.writeBuffer, frontBuf
|
||||
cn.mu.Unlock()
|
||||
n, err := cn.w.Write(frontBuf.Bytes())
|
||||
cn.mu.Lock()
|
||||
if n != 0 {
|
||||
lastWrite = time.Now()
|
||||
keepAliveTimer.Reset(keepAliveTimeout)
|
||||
}
|
||||
if err != nil {
|
||||
cn.logger.WithDefaultLevel(log.Debug).Printf("error writing: %v", err)
|
||||
return
|
||||
}
|
||||
if n != frontBuf.Len() {
|
||||
panic("short write")
|
||||
}
|
||||
frontBuf.Reset()
|
||||
}
|
||||
}
|
||||
|
||||
func (cn *PeerConn) have(piece pieceIndex) {
|
||||
if cn.sentHaves.Get(bitmap.BitIndex(piece)) {
|
||||
return
|
||||
|
|
Loading…
Reference in New Issue