Because of the pseudo-random selection of a communication in a select statement, flushing was occuring prematurely. Also the buffer was needlessly large. For large messages, like outgoing pieces, it's probably better they just go straight to the wire anyway. This change will make it easier to implement sending of pieces that doesn't block control messages in the future. Any time that the buffer is empty, we'll be able to signal that it's an optimal time to send a piece.
547 lines
12 KiB
Go
547 lines
12 KiB
Go
package torrent
|
|
|
|
import (
|
|
"bufio"
|
|
"bytes"
|
|
"container/list"
|
|
"encoding"
|
|
"errors"
|
|
"expvar"
|
|
"fmt"
|
|
"io"
|
|
"net"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/anacrolix/torrent/bencode"
|
|
"github.com/anacrolix/torrent/internal/pieceordering"
|
|
pp "github.com/anacrolix/torrent/peer_protocol"
|
|
)
|
|
|
|
var optimizedCancels = expvar.NewInt("optimizedCancels")
|
|
|
|
type peerSource byte
|
|
|
|
const (
|
|
peerSourceIncoming = 'I'
|
|
peerSourceDHT = 'H'
|
|
peerSourcePEX = 'X'
|
|
)
|
|
|
|
// Maintains the state of a connection with a peer.
|
|
type connection struct {
|
|
conn net.Conn
|
|
rw io.ReadWriter // The real slim shady
|
|
encrypted bool
|
|
Discovery peerSource
|
|
uTP bool
|
|
closing chan struct{}
|
|
mu sync.Mutex // Only for closing.
|
|
post chan pp.Message
|
|
writeCh chan []byte
|
|
|
|
// The connection's preferred order to download pieces. The index is the
|
|
// piece, the value is its priority.
|
|
piecePriorities []int
|
|
// The piece request order based on piece priorities.
|
|
pieceRequestOrder *pieceordering.Instance
|
|
|
|
UnwantedChunksReceived int
|
|
UsefulChunksReceived int
|
|
chunksSent int
|
|
|
|
lastMessageReceived time.Time
|
|
completedHandshake time.Time
|
|
lastUsefulChunkReceived time.Time
|
|
lastChunkSent time.Time
|
|
|
|
// Stuff controlled by the local peer.
|
|
Interested bool
|
|
Choked bool
|
|
Requests map[request]struct{}
|
|
requestsLowWater int
|
|
// Indexed by metadata piece, set to true if posted and pending a
|
|
// response.
|
|
metadataRequests []bool
|
|
|
|
// Stuff controlled by the remote peer.
|
|
PeerID [20]byte
|
|
PeerInterested bool
|
|
PeerChoked bool
|
|
PeerRequests map[request]struct{}
|
|
PeerExtensionBytes peerExtensionBytes
|
|
// Whether the peer has the given piece. nil if they've not sent any
|
|
// related messages yet.
|
|
PeerPieces []bool
|
|
peerHasAll bool
|
|
|
|
PeerMaxRequests int // Maximum pending requests the peer allows.
|
|
PeerExtensionIDs map[string]byte
|
|
PeerClientName string
|
|
}
|
|
|
|
func newConnection() (c *connection) {
|
|
c = &connection{
|
|
Choked: true,
|
|
PeerChoked: true,
|
|
PeerMaxRequests: 250,
|
|
|
|
closing: make(chan struct{}),
|
|
writeCh: make(chan []byte),
|
|
post: make(chan pp.Message),
|
|
}
|
|
return
|
|
}
|
|
|
|
func (cn *connection) remoteAddr() net.Addr {
|
|
return cn.conn.RemoteAddr()
|
|
}
|
|
|
|
func (cn *connection) localAddr() net.Addr {
|
|
return cn.conn.LocalAddr()
|
|
}
|
|
|
|
// Adjust piece position in the request order for this connection based on the
|
|
// given piece priority.
|
|
func (cn *connection) pendPiece(piece int, priority piecePriority) {
|
|
if priority == PiecePriorityNone {
|
|
cn.pieceRequestOrder.DeletePiece(piece)
|
|
return
|
|
}
|
|
pp := cn.piecePriorities[piece]
|
|
// Priority regions not to scale. Within each region, piece is randomized
|
|
// according to connection.
|
|
|
|
// <-request first -- last->
|
|
// [ Now ]
|
|
// [ Next ]
|
|
// [ Readahead ]
|
|
// [ Normal ]
|
|
key := func() int {
|
|
switch priority {
|
|
case PiecePriorityNow:
|
|
return -3*len(cn.piecePriorities) + 3*pp
|
|
case PiecePriorityNext:
|
|
return -2*len(cn.piecePriorities) + 2*pp
|
|
case PiecePriorityReadahead:
|
|
return -len(cn.piecePriorities) + pp
|
|
case PiecePriorityNormal:
|
|
return pp
|
|
default:
|
|
panic(priority)
|
|
}
|
|
}()
|
|
cn.pieceRequestOrder.SetPiece(piece, key)
|
|
}
|
|
|
|
func (cn *connection) supportsExtension(ext string) bool {
|
|
_, ok := cn.PeerExtensionIDs[ext]
|
|
return ok
|
|
}
|
|
|
|
func (cn *connection) completedString(t *torrent) string {
|
|
if cn.PeerPieces == nil && !cn.peerHasAll {
|
|
return "?"
|
|
}
|
|
return fmt.Sprintf("%d/%d", func() int {
|
|
if cn.peerHasAll {
|
|
if t.haveInfo() {
|
|
return t.numPieces()
|
|
}
|
|
return -1
|
|
}
|
|
ret := 0
|
|
for _, b := range cn.PeerPieces {
|
|
if b {
|
|
ret++
|
|
}
|
|
}
|
|
return ret
|
|
}(), func() int {
|
|
if cn.peerHasAll || cn.PeerPieces == nil {
|
|
if t.haveInfo() {
|
|
return t.numPieces()
|
|
}
|
|
return -1
|
|
}
|
|
return len(cn.PeerPieces)
|
|
}())
|
|
}
|
|
|
|
// Correct the PeerPieces slice length. Return false if the existing slice is
|
|
// invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
|
|
// messages.
|
|
func (cn *connection) setNumPieces(num int) error {
|
|
if cn.peerHasAll {
|
|
return nil
|
|
}
|
|
if cn.PeerPieces == nil {
|
|
return nil
|
|
}
|
|
if len(cn.PeerPieces) == num {
|
|
} else if len(cn.PeerPieces) < num {
|
|
cn.PeerPieces = append(cn.PeerPieces, make([]bool, num-len(cn.PeerPieces))...)
|
|
} else if len(cn.PeerPieces) <= (num+7)/8*8 {
|
|
for _, have := range cn.PeerPieces[num:] {
|
|
if have {
|
|
return errors.New("peer has invalid piece")
|
|
}
|
|
}
|
|
cn.PeerPieces = cn.PeerPieces[:num]
|
|
} else {
|
|
return fmt.Errorf("peer bitfield is excessively long: expected %d, have %d", num, len(cn.PeerPieces))
|
|
}
|
|
if len(cn.PeerPieces) != num {
|
|
panic("wat")
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func eventAgeString(t time.Time) string {
|
|
if t.IsZero() {
|
|
return "never"
|
|
}
|
|
return fmt.Sprintf("%.2fs ago", time.Now().Sub(t).Seconds())
|
|
}
|
|
|
|
func (cn *connection) connectionFlags() (ret string) {
|
|
c := func(b byte) {
|
|
ret += string([]byte{b})
|
|
}
|
|
if cn.encrypted {
|
|
c('E')
|
|
}
|
|
if cn.Discovery != 0 {
|
|
c(byte(cn.Discovery))
|
|
}
|
|
if cn.uTP {
|
|
c('T')
|
|
}
|
|
return
|
|
}
|
|
|
|
// Inspired by https://trac.transmissionbt.com/wiki/PeerStatusText
|
|
func (cn *connection) statusFlags() (ret string) {
|
|
c := func(b byte) {
|
|
ret += string([]byte{b})
|
|
}
|
|
if cn.Interested {
|
|
c('i')
|
|
}
|
|
if cn.Choked {
|
|
c('c')
|
|
}
|
|
c('-')
|
|
ret += cn.connectionFlags()
|
|
c('-')
|
|
if cn.PeerInterested {
|
|
c('i')
|
|
}
|
|
if cn.PeerChoked {
|
|
c('c')
|
|
}
|
|
return
|
|
}
|
|
|
|
func (cn *connection) String() string {
|
|
var buf bytes.Buffer
|
|
cn.WriteStatus(&buf, nil)
|
|
return buf.String()
|
|
}
|
|
|
|
func (cn *connection) WriteStatus(w io.Writer, t *torrent) {
|
|
// \t isn't preserved in <pre> blocks?
|
|
fmt.Fprintf(w, "%+q: %s-%s\n", cn.PeerID, cn.localAddr(), cn.remoteAddr())
|
|
fmt.Fprintf(w, " last msg: %s, connected: %s, last useful chunk: %s\n",
|
|
eventAgeString(cn.lastMessageReceived),
|
|
eventAgeString(cn.completedHandshake),
|
|
eventAgeString(cn.lastUsefulChunkReceived))
|
|
fmt.Fprintf(w,
|
|
" %s completed, good chunks: %d/%d-%d reqq: %d-%d, flags: %s\n",
|
|
cn.completedString(t),
|
|
cn.UsefulChunksReceived,
|
|
cn.UnwantedChunksReceived+cn.UsefulChunksReceived,
|
|
cn.chunksSent,
|
|
len(cn.Requests),
|
|
len(cn.PeerRequests),
|
|
cn.statusFlags(),
|
|
)
|
|
}
|
|
|
|
func (c *connection) Close() {
|
|
c.mu.Lock()
|
|
defer c.mu.Unlock()
|
|
select {
|
|
case <-c.closing:
|
|
return
|
|
default:
|
|
}
|
|
close(c.closing)
|
|
// TODO: This call blocks sometimes, why?
|
|
go c.conn.Close()
|
|
}
|
|
|
|
func (c *connection) PeerHasPiece(piece int) bool {
|
|
if c.peerHasAll {
|
|
return true
|
|
}
|
|
if piece >= len(c.PeerPieces) {
|
|
return false
|
|
}
|
|
return c.PeerPieces[piece]
|
|
}
|
|
|
|
func (c *connection) Post(msg pp.Message) {
|
|
select {
|
|
case c.post <- msg:
|
|
case <-c.closing:
|
|
}
|
|
}
|
|
|
|
func (c *connection) RequestPending(r request) bool {
|
|
_, ok := c.Requests[r]
|
|
return ok
|
|
}
|
|
|
|
func (c *connection) requestMetadataPiece(index int) {
|
|
eID := c.PeerExtensionIDs["ut_metadata"]
|
|
if eID == 0 {
|
|
return
|
|
}
|
|
if index < len(c.metadataRequests) && c.metadataRequests[index] {
|
|
return
|
|
}
|
|
c.Post(pp.Message{
|
|
Type: pp.Extended,
|
|
ExtendedID: eID,
|
|
ExtendedPayload: func() []byte {
|
|
b, err := bencode.Marshal(map[string]int{
|
|
"msg_type": pp.RequestMetadataExtensionMsgType,
|
|
"piece": index,
|
|
})
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
return b
|
|
}(),
|
|
})
|
|
for index >= len(c.metadataRequests) {
|
|
c.metadataRequests = append(c.metadataRequests, false)
|
|
}
|
|
c.metadataRequests[index] = true
|
|
}
|
|
|
|
func (c *connection) requestedMetadataPiece(index int) bool {
|
|
return index < len(c.metadataRequests) && c.metadataRequests[index]
|
|
}
|
|
|
|
// Returns true if more requests can be sent.
|
|
func (c *connection) Request(chunk request) bool {
|
|
if len(c.Requests) >= c.PeerMaxRequests {
|
|
return false
|
|
}
|
|
if !c.PeerHasPiece(int(chunk.Index)) {
|
|
return true
|
|
}
|
|
if c.RequestPending(chunk) {
|
|
return true
|
|
}
|
|
c.SetInterested(true)
|
|
if c.PeerChoked {
|
|
return false
|
|
}
|
|
if c.Requests == nil {
|
|
c.Requests = make(map[request]struct{}, c.PeerMaxRequests)
|
|
}
|
|
c.Requests[chunk] = struct{}{}
|
|
c.requestsLowWater = len(c.Requests) / 2
|
|
c.Post(pp.Message{
|
|
Type: pp.Request,
|
|
Index: chunk.Index,
|
|
Begin: chunk.Begin,
|
|
Length: chunk.Length,
|
|
})
|
|
return true
|
|
}
|
|
|
|
// Returns true if an unsatisfied request was canceled.
|
|
func (c *connection) Cancel(r request) bool {
|
|
if c.Requests == nil {
|
|
return false
|
|
}
|
|
if _, ok := c.Requests[r]; !ok {
|
|
return false
|
|
}
|
|
delete(c.Requests, r)
|
|
c.Post(pp.Message{
|
|
Type: pp.Cancel,
|
|
Index: r.Index,
|
|
Begin: r.Begin,
|
|
Length: r.Length,
|
|
})
|
|
return true
|
|
}
|
|
|
|
// Returns true if an unsatisfied request was canceled.
|
|
func (c *connection) PeerCancel(r request) bool {
|
|
if c.PeerRequests == nil {
|
|
return false
|
|
}
|
|
if _, ok := c.PeerRequests[r]; !ok {
|
|
return false
|
|
}
|
|
delete(c.PeerRequests, r)
|
|
return true
|
|
}
|
|
|
|
func (c *connection) Choke() {
|
|
if c.Choked {
|
|
return
|
|
}
|
|
c.Post(pp.Message{
|
|
Type: pp.Choke,
|
|
})
|
|
c.PeerRequests = nil
|
|
c.Choked = true
|
|
}
|
|
|
|
func (c *connection) Unchoke() {
|
|
if !c.Choked {
|
|
return
|
|
}
|
|
c.Post(pp.Message{
|
|
Type: pp.Unchoke,
|
|
})
|
|
c.Choked = false
|
|
}
|
|
|
|
func (c *connection) SetInterested(interested bool) {
|
|
if c.Interested == interested {
|
|
return
|
|
}
|
|
c.Post(pp.Message{
|
|
Type: func() pp.MessageType {
|
|
if interested {
|
|
return pp.Interested
|
|
} else {
|
|
return pp.NotInterested
|
|
}
|
|
}(),
|
|
})
|
|
c.Interested = interested
|
|
}
|
|
|
|
var (
|
|
// Track connection writer buffer writes and flushes, to determine its
|
|
// efficiency.
|
|
connectionWriterFlush = expvar.NewInt("connectionWriterFlush")
|
|
connectionWriterWrite = expvar.NewInt("connectionWriterWrite")
|
|
)
|
|
|
|
// Writes buffers to the socket from the write channel.
|
|
func (conn *connection) writer() {
|
|
// Reduce write syscalls.
|
|
buf := bufio.NewWriter(conn.rw)
|
|
for {
|
|
if buf.Buffered() == 0 {
|
|
// There's nothing to write, so block until we get something.
|
|
select {
|
|
case b, ok := <-conn.writeCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
connectionWriterWrite.Add(1)
|
|
_, err := buf.Write(b)
|
|
if err != nil {
|
|
conn.Close()
|
|
return
|
|
}
|
|
case <-conn.closing:
|
|
return
|
|
}
|
|
} else {
|
|
// We already have something to write, so flush if there's nothing
|
|
// more to write.
|
|
select {
|
|
case b, ok := <-conn.writeCh:
|
|
if !ok {
|
|
return
|
|
}
|
|
connectionWriterWrite.Add(1)
|
|
_, err := buf.Write(b)
|
|
if err != nil {
|
|
conn.Close()
|
|
return
|
|
}
|
|
case <-conn.closing:
|
|
return
|
|
default:
|
|
connectionWriterFlush.Add(1)
|
|
err := buf.Flush()
|
|
if err != nil {
|
|
conn.Close()
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (conn *connection) writeOptimizer(keepAliveDelay time.Duration) {
|
|
defer close(conn.writeCh) // Responsible for notifying downstream routines.
|
|
pending := list.New() // Message queue.
|
|
var nextWrite []byte // Set to nil if we need to need to marshal the next message.
|
|
timer := time.NewTimer(keepAliveDelay)
|
|
defer timer.Stop()
|
|
lastWrite := time.Now()
|
|
for {
|
|
write := conn.writeCh // Set to nil if there's nothing to write.
|
|
if pending.Len() == 0 {
|
|
write = nil
|
|
} else if nextWrite == nil {
|
|
var err error
|
|
nextWrite, err = pending.Front().Value.(encoding.BinaryMarshaler).MarshalBinary()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}
|
|
event:
|
|
select {
|
|
case <-timer.C:
|
|
if pending.Len() != 0 {
|
|
break
|
|
}
|
|
keepAliveTime := lastWrite.Add(keepAliveDelay)
|
|
if time.Now().Before(keepAliveTime) {
|
|
timer.Reset(keepAliveTime.Sub(time.Now()))
|
|
break
|
|
}
|
|
pending.PushBack(pp.Message{Keepalive: true})
|
|
case msg, ok := <-conn.post:
|
|
if !ok {
|
|
return
|
|
}
|
|
if msg.Type == pp.Cancel {
|
|
for e := pending.Back(); e != nil; e = e.Prev() {
|
|
elemMsg := e.Value.(pp.Message)
|
|
if elemMsg.Type == pp.Request && msg.Index == elemMsg.Index && msg.Begin == elemMsg.Begin && msg.Length == elemMsg.Length {
|
|
pending.Remove(e)
|
|
optimizedCancels.Add(1)
|
|
break event
|
|
}
|
|
}
|
|
}
|
|
pending.PushBack(msg)
|
|
case write <- nextWrite:
|
|
pending.Remove(pending.Front())
|
|
nextWrite = nil
|
|
lastWrite = time.Now()
|
|
if pending.Len() == 0 {
|
|
timer.Reset(keepAliveDelay)
|
|
}
|
|
case <-conn.closing:
|
|
return
|
|
}
|
|
}
|
|
}
|