It's working and the tests are usually passing
I still need to handle "prefetch"-style downloading, and some functions haven't been committed to force this issue.
This commit is contained in:
parent
8bbfcfcaa4
commit
06445f2a1e
203
client.go
203
client.go
|
@ -10,6 +10,7 @@ import (
|
|||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"log"
|
||||
"math/big"
|
||||
mathRand "math/rand"
|
||||
|
@ -28,13 +29,11 @@ import (
|
|||
"github.com/anacrolix/missinggo/pubsub"
|
||||
"github.com/anacrolix/sync"
|
||||
"github.com/anacrolix/utp"
|
||||
"github.com/bradfitz/iter"
|
||||
"github.com/edsrzf/mmap-go"
|
||||
|
||||
"github.com/anacrolix/torrent/bencode"
|
||||
filePkg "github.com/anacrolix/torrent/data/file"
|
||||
"github.com/anacrolix/torrent/dht"
|
||||
"github.com/anacrolix/torrent/internal/pieceordering"
|
||||
"github.com/anacrolix/torrent/iplist"
|
||||
"github.com/anacrolix/torrent/metainfo"
|
||||
"github.com/anacrolix/torrent/mse"
|
||||
|
@ -276,47 +275,6 @@ func readaheadPieces(readahead, pieceLength int64) (ret int) {
|
|||
return
|
||||
}
|
||||
|
||||
func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) {
|
||||
index := int(off / int64(t.usualPieceSize()))
|
||||
cl.raisePiecePriority(t, index, PiecePriorityNow)
|
||||
index++
|
||||
if index >= t.numPieces() {
|
||||
return
|
||||
}
|
||||
cl.raisePiecePriority(t, index, PiecePriorityNext)
|
||||
for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) {
|
||||
index++
|
||||
if index >= t.numPieces() {
|
||||
break
|
||||
}
|
||||
cl.raisePiecePriority(t, index, PiecePriorityReadahead)
|
||||
}
|
||||
}
|
||||
|
||||
func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) {
|
||||
for n > 0 {
|
||||
req, ok := t.offsetRequest(off)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) {
|
||||
if t.urgent == nil {
|
||||
t.urgent = make(map[request]struct{}, (n+int(t.chunkSize)-1)/int(t.chunkSize))
|
||||
}
|
||||
t.urgent[req] = struct{}{}
|
||||
cl.event.Broadcast() // Why?
|
||||
index := int(req.Index)
|
||||
cl.queueFirstHash(t, index)
|
||||
cl.pieceChanged(t, index)
|
||||
}
|
||||
reqOff := t.requestOffset(req)
|
||||
n1 := req.Length - pp.Integer(off-reqOff)
|
||||
off += int64(n1)
|
||||
n -= int(n1)
|
||||
}
|
||||
// log.Print(t.urgent)
|
||||
}
|
||||
|
||||
func (cl *Client) configDir() string {
|
||||
if cl.config.ConfigDir == "" {
|
||||
return filepath.Join(os.Getenv("HOME"), ".config/torrent")
|
||||
|
@ -330,30 +288,6 @@ func (cl *Client) ConfigDir() string {
|
|||
return cl.configDir()
|
||||
}
|
||||
|
||||
func (t *torrent) connPendPiece(c *connection, piece int) {
|
||||
c.pendPiece(piece, t.Pieces[piece].Priority, t)
|
||||
}
|
||||
|
||||
func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
|
||||
if t.Pieces[piece].Priority < priority {
|
||||
cl.prioritizePiece(t, piece, priority)
|
||||
}
|
||||
}
|
||||
|
||||
func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) {
|
||||
if t.havePiece(piece) {
|
||||
priority = PiecePriorityNone
|
||||
}
|
||||
if priority != PiecePriorityNone {
|
||||
cl.queueFirstHash(t, piece)
|
||||
}
|
||||
p := &t.Pieces[piece]
|
||||
if p.Priority != priority {
|
||||
p.Priority = priority
|
||||
cl.pieceChanged(t, piece)
|
||||
}
|
||||
}
|
||||
|
||||
func loadPackedBlocklist(filename string) (ret iplist.Ranger, err error) {
|
||||
f, err := os.Open(filename)
|
||||
if os.IsNotExist(err) {
|
||||
|
@ -1169,9 +1103,6 @@ func (cl *Client) runHandshookConn(c *connection, t *torrent) (err error) {
|
|||
go c.writer()
|
||||
go c.writeOptimizer(time.Minute)
|
||||
cl.sendInitialMessages(c, t)
|
||||
if t.haveInfo() {
|
||||
t.initRequestOrdering(c)
|
||||
}
|
||||
err = cl.connectionLoop(t, c)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error during connection loop: %s", err)
|
||||
|
@ -1237,26 +1168,6 @@ func (me *Client) sendInitialMessages(conn *connection, torrent *torrent) {
|
|||
}
|
||||
}
|
||||
|
||||
// Randomizes the piece order for this connection. Every connection will be
|
||||
// given a different ordering. Having it stored per connection saves having to
|
||||
// randomize during request filling, and constantly recalculate the ordering
|
||||
// based on piece priorities.
|
||||
func (t *torrent) initRequestOrdering(c *connection) {
|
||||
if c.pieceRequestOrder != nil || c.piecePriorities != nil {
|
||||
panic("double init of request ordering")
|
||||
}
|
||||
c.pieceRequestOrder = pieceordering.New()
|
||||
for i := range iter.N(t.Info.NumPieces()) {
|
||||
if !c.PeerHasPiece(i) {
|
||||
continue
|
||||
}
|
||||
if !t.wantPiece(i) {
|
||||
continue
|
||||
}
|
||||
t.connPendPiece(c, i)
|
||||
}
|
||||
}
|
||||
|
||||
func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) error {
|
||||
if !c.peerHasAll {
|
||||
if t.haveInfo() {
|
||||
|
@ -1274,7 +1185,6 @@ func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) error {
|
|||
c.PeerPieces[piece] = true
|
||||
}
|
||||
if t.wantPiece(piece) {
|
||||
t.connPendPiece(c, piece)
|
||||
me.replenishConnRequests(t, c)
|
||||
}
|
||||
return nil
|
||||
|
@ -1721,12 +1631,6 @@ func (me *Client) deleteConnection(t *torrent, c *connection) bool {
|
|||
func (me *Client) dropConnection(t *torrent, c *connection) {
|
||||
me.event.Broadcast()
|
||||
c.Close()
|
||||
if c.piecePriorities != nil {
|
||||
t.connPiecePriorites.Put(c.piecePriorities)
|
||||
// I wonder if it's safe to set it to nil. Probably not. Since it's
|
||||
// only read, it doesn't particularly matter if a closing connection
|
||||
// shares the slice with another connection.
|
||||
}
|
||||
if me.deleteConnection(t, c) {
|
||||
me.openNewConns(t)
|
||||
}
|
||||
|
@ -1774,16 +1678,14 @@ func (t *torrent) needData() bool {
|
|||
if !t.haveInfo() {
|
||||
return true
|
||||
}
|
||||
if len(t.urgent) != 0 {
|
||||
return true
|
||||
}
|
||||
for i := range t.Pieces {
|
||||
p := &t.Pieces[i]
|
||||
if p.Priority != PiecePriorityNone {
|
||||
return true
|
||||
return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
|
||||
for i := begin; i < end; i++ {
|
||||
if !t.pieceComplete(i) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return false
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (cl *Client) usefulConn(t *torrent, c *connection) bool {
|
||||
|
@ -2048,16 +1950,6 @@ func (t Torrent) Files() (ret []File) {
|
|||
return
|
||||
}
|
||||
|
||||
// Marks the pieces in the given region for download.
|
||||
func (t Torrent) SetRegionPriority(off, len int64) {
|
||||
t.cl.mu.Lock()
|
||||
defer t.cl.mu.Unlock()
|
||||
pieceSize := int64(t.torrent.usualPieceSize())
|
||||
for i := off / pieceSize; i*pieceSize < off+len; i++ {
|
||||
t.cl.raisePiecePriority(t.torrent, int(i), PiecePriorityNormal)
|
||||
}
|
||||
}
|
||||
|
||||
func (t Torrent) AddPeers(pp []Peer) error {
|
||||
cl := t.cl
|
||||
cl.mu.Lock()
|
||||
|
@ -2474,6 +2366,24 @@ func (me *Client) WaitAll() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (me *Client) connAddRequest(c *connection, req request) (more bool) {
|
||||
if len(c.Requests) >= 64 {
|
||||
return false
|
||||
}
|
||||
more = c.Request(req)
|
||||
return
|
||||
}
|
||||
|
||||
func (me *Client) connRequestPiecePendingChunks(c *connection, t *torrent, piece int) (more bool) {
|
||||
for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) {
|
||||
req := request{pp.Integer(piece), cs}
|
||||
if !me.connAddRequest(c, req) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (me *Client) fillRequests(t *torrent, c *connection) {
|
||||
if c.Interested {
|
||||
if c.PeerChoked {
|
||||
|
@ -2483,37 +2393,25 @@ func (me *Client) fillRequests(t *torrent, c *connection) {
|
|||
return
|
||||
}
|
||||
}
|
||||
addRequest := func(req request) (again bool) {
|
||||
// TODO: Couldn't this check also be done *after* the request?
|
||||
if len(c.Requests) >= 64 {
|
||||
return false
|
||||
if !t.forUrgentPieces(func(piece int) (again bool) {
|
||||
if !c.PeerHasPiece(piece) {
|
||||
return true
|
||||
}
|
||||
return c.Request(req)
|
||||
return me.connRequestPiecePendingChunks(c, t, piece)
|
||||
}) {
|
||||
return
|
||||
}
|
||||
for req := range t.urgent {
|
||||
if !addRequest(req) {
|
||||
return
|
||||
}
|
||||
}
|
||||
for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
|
||||
pieceIndex := e.Piece()
|
||||
if !c.PeerHasPiece(pieceIndex) {
|
||||
panic("piece in request order but peer doesn't have it")
|
||||
}
|
||||
if !t.wantPiece(pieceIndex) {
|
||||
log.Printf("unwanted piece %d in connection request order\n%s", pieceIndex, c)
|
||||
c.pieceRequestOrder.DeletePiece(pieceIndex)
|
||||
continue
|
||||
}
|
||||
piece := &t.Pieces[pieceIndex]
|
||||
for _, cs := range piece.shuffledPendingChunkSpecs(t, pieceIndex) {
|
||||
r := request{pp.Integer(pieceIndex), cs}
|
||||
if !addRequest(r) {
|
||||
return
|
||||
t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
|
||||
for i := begin + 1; i < end; i++ {
|
||||
if !c.PeerHasPiece(i) {
|
||||
continue
|
||||
}
|
||||
if !me.connRequestPiecePendingChunks(c, t, i) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (me *Client) replenishConnRequests(t *torrent, c *connection) {
|
||||
|
@ -2562,6 +2460,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
|
|||
piece.pendingWrites++
|
||||
piece.pendingWritesMutex.Unlock()
|
||||
go func() {
|
||||
defer me.event.Broadcast()
|
||||
defer func() {
|
||||
piece.pendingWritesMutex.Lock()
|
||||
piece.pendingWrites--
|
||||
|
@ -2591,17 +2490,11 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
|
|||
defer t.publishPieceChange(int(req.Index))
|
||||
// Record that we have the chunk.
|
||||
piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
|
||||
delete(t.urgent, req)
|
||||
// It's important that the piece is potentially queued before we check if
|
||||
// the piece is still wanted, because if it is queued, it won't be wanted.
|
||||
if t.pieceAllDirty(index) {
|
||||
me.queuePieceCheck(t, int(req.Index))
|
||||
}
|
||||
if !t.wantPiece(int(req.Index)) {
|
||||
for _, c := range t.Conns {
|
||||
c.pieceRequestOrder.DeletePiece(int(req.Index))
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel pending requests for this chunk.
|
||||
for _, c := range t.Conns {
|
||||
|
@ -2656,17 +2549,9 @@ func (me *Client) pieceHashed(t *torrent, piece int, correct bool) {
|
|||
|
||||
func (me *Client) pieceChanged(t *torrent, piece int) {
|
||||
correct := t.pieceComplete(piece)
|
||||
p := &t.Pieces[piece]
|
||||
defer t.publishPieceChange(piece)
|
||||
defer me.event.Broadcast()
|
||||
if correct {
|
||||
p.Priority = PiecePriorityNone
|
||||
for req := range t.urgent {
|
||||
if int(req.Index) == piece {
|
||||
delete(t.urgent, req)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
if !correct {
|
||||
if t.pieceAllDirty(piece) {
|
||||
t.pendAllChunkSpecs(piece)
|
||||
}
|
||||
|
@ -2682,10 +2567,8 @@ func (me *Client) pieceChanged(t *torrent, piece int) {
|
|||
conn.Cancel(r)
|
||||
}
|
||||
}
|
||||
conn.pieceRequestOrder.DeletePiece(int(piece))
|
||||
me.upload(t, conn)
|
||||
} else if t.wantPiece(piece) && conn.PeerHasPiece(piece) {
|
||||
t.connPendPiece(conn, int(piece))
|
||||
me.replenishConnRequests(t, conn)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -529,10 +529,14 @@ func TestResponsive(t *testing.T) {
|
|||
reader.SetReadahead(0)
|
||||
reader.SetResponsive()
|
||||
b := make([]byte, 2)
|
||||
_, err = reader.ReadAt(b, 3)
|
||||
_, err = reader.Seek(3, os.SEEK_SET)
|
||||
require.NoError(t, err)
|
||||
_, err = io.ReadFull(reader, b)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, "lo", string(b))
|
||||
n, err := reader.ReadAt(b, 11)
|
||||
_, err = reader.Seek(11, os.SEEK_SET)
|
||||
require.NoError(t, err)
|
||||
n, err := io.ReadFull(reader, b)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, 2, n)
|
||||
assert.EqualValues(t, "d\n", string(b))
|
||||
|
@ -571,11 +575,15 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
|
|||
reader.SetReadahead(0)
|
||||
reader.SetResponsive()
|
||||
b := make([]byte, 2)
|
||||
_, err = reader.ReadAt(b, 3)
|
||||
_, err = reader.Seek(3, os.SEEK_SET)
|
||||
require.NoError(t, err)
|
||||
_, err = io.ReadFull(reader, b)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, "lo", string(b))
|
||||
go leecherTorrent.Drop()
|
||||
n, err := reader.ReadAt(b, 11)
|
||||
_, err = reader.Seek(11, os.SEEK_SET)
|
||||
require.NoError(t, err)
|
||||
n, err := reader.Read(b)
|
||||
assert.EqualError(t, err, "torrent closed")
|
||||
assert.EqualValues(t, 0, n)
|
||||
}
|
||||
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/anacrolix/torrent/bencode"
|
||||
"github.com/anacrolix/torrent/internal/pieceordering"
|
||||
pp "github.com/anacrolix/torrent/peer_protocol"
|
||||
)
|
||||
|
||||
|
@ -41,12 +40,6 @@ type connection struct {
|
|||
post chan pp.Message
|
||||
writeCh chan []byte
|
||||
|
||||
// The connection's preferred order to download pieces. The index is the
|
||||
// piece, the value is its priority.
|
||||
piecePriorities []int
|
||||
// The piece request order based on piece priorities.
|
||||
pieceRequestOrder *pieceordering.Instance
|
||||
|
||||
UnwantedChunksReceived int
|
||||
UsefulChunksReceived int
|
||||
chunksSent int
|
||||
|
@ -105,42 +98,6 @@ func (cn *connection) localAddr() net.Addr {
|
|||
return cn.conn.LocalAddr()
|
||||
}
|
||||
|
||||
// Adjust piece position in the request order for this connection based on the
|
||||
// given piece priority.
|
||||
func (cn *connection) pendPiece(piece int, priority piecePriority, t *torrent) {
|
||||
if priority == PiecePriorityNone {
|
||||
cn.pieceRequestOrder.DeletePiece(piece)
|
||||
return
|
||||
}
|
||||
if cn.piecePriorities == nil {
|
||||
cn.piecePriorities = t.newConnPiecePriorities()
|
||||
}
|
||||
pp := cn.piecePriorities[piece]
|
||||
// Priority regions not to scale. Within each region, piece is randomized
|
||||
// according to connection.
|
||||
|
||||
// <-request first -- last->
|
||||
// [ Now ]
|
||||
// [ Next ]
|
||||
// [ Readahead ]
|
||||
// [ Normal ]
|
||||
key := func() int {
|
||||
switch priority {
|
||||
case PiecePriorityNow:
|
||||
return -3*len(cn.piecePriorities) + 3*pp
|
||||
case PiecePriorityNext:
|
||||
return -2*len(cn.piecePriorities) + 2*pp
|
||||
case PiecePriorityReadahead:
|
||||
return -len(cn.piecePriorities) + pp
|
||||
case PiecePriorityNormal:
|
||||
return pp
|
||||
default:
|
||||
panic(priority)
|
||||
}
|
||||
}()
|
||||
cn.pieceRequestOrder.SetPiece(piece, key)
|
||||
}
|
||||
|
||||
func (cn *connection) supportsExtension(ext string) bool {
|
||||
_, ok := cn.PeerExtensionIDs[ext]
|
||||
return ok
|
||||
|
|
|
@ -4,7 +4,6 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/bradfitz/iter"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/anacrolix/torrent/internal/pieceordering"
|
||||
|
@ -63,38 +62,3 @@ func pieceOrderingAsSlice(po *pieceordering.Instance) (ret []int) {
|
|||
func testRequestOrder(expected []int, ro *pieceordering.Instance, t *testing.T) {
|
||||
assert.EqualValues(t, pieceOrderingAsSlice(ro), expected)
|
||||
}
|
||||
|
||||
// Tests the request ordering based on a connections priorities.
|
||||
func TestPieceRequestOrder(t *testing.T) {
|
||||
c := connection{
|
||||
pieceRequestOrder: pieceordering.New(),
|
||||
piecePriorities: []int{1, 4, 0, 3, 2},
|
||||
}
|
||||
testRequestOrder(nil, c.pieceRequestOrder, t)
|
||||
c.pendPiece(2, PiecePriorityNone, nil)
|
||||
testRequestOrder(nil, c.pieceRequestOrder, t)
|
||||
c.pendPiece(1, PiecePriorityNormal, nil)
|
||||
c.pendPiece(2, PiecePriorityNormal, nil)
|
||||
testRequestOrder([]int{2, 1}, c.pieceRequestOrder, t)
|
||||
c.pendPiece(0, PiecePriorityNormal, nil)
|
||||
testRequestOrder([]int{2, 0, 1}, c.pieceRequestOrder, t)
|
||||
c.pendPiece(1, PiecePriorityReadahead, nil)
|
||||
testRequestOrder([]int{1, 2, 0}, c.pieceRequestOrder, t)
|
||||
c.pendPiece(4, PiecePriorityNow, nil)
|
||||
// now(4), r(1), normal(0, 2)
|
||||
testRequestOrder([]int{4, 1, 2, 0}, c.pieceRequestOrder, t)
|
||||
c.pendPiece(2, PiecePriorityReadahead, nil)
|
||||
// N(4), R(1, 2), N(0)
|
||||
testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t)
|
||||
c.pendPiece(1, PiecePriorityNow, nil)
|
||||
// now(4, 1), readahead(2), normal(0)
|
||||
// in the same order, the keys will be: -15+6, -15+12, -5, 1
|
||||
// so we test that a very low priority (for this connection), "now"
|
||||
// piece has been placed after a readahead piece.
|
||||
testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t)
|
||||
// Note this intentially sets to None a piece that's not in the order.
|
||||
for i := range iter.N(5) {
|
||||
c.pendPiece(i, PiecePriorityNone, nil)
|
||||
}
|
||||
testRequestOrder(nil, c.pieceRequestOrder, t)
|
||||
}
|
||||
|
|
1
piece.go
1
piece.go
|
@ -30,7 +30,6 @@ type piece struct {
|
|||
Hashing bool
|
||||
QueuedForHash bool
|
||||
EverHashed bool
|
||||
Priority piecePriority
|
||||
PublicPieceState PieceState
|
||||
|
||||
pendingWritesMutex sync.Mutex
|
||||
|
|
49
reader.go
49
reader.go
|
@ -4,11 +4,14 @@ import (
|
|||
"errors"
|
||||
"io"
|
||||
"os"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// Accesses torrent data via a client.
|
||||
type Reader struct {
|
||||
t *Torrent
|
||||
t *Torrent
|
||||
|
||||
mu sync.Mutex
|
||||
pos int64
|
||||
responsive bool
|
||||
readahead int64
|
||||
|
@ -25,18 +28,11 @@ func (r *Reader) SetResponsive() {
|
|||
// Configure the number of bytes ahead of a read that should also be
|
||||
// prioritized in preparation for further reads.
|
||||
func (r *Reader) SetReadahead(readahead int64) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
r.readahead = readahead
|
||||
}
|
||||
|
||||
func (r *Reader) raisePriorities(off int64, n int) {
|
||||
if r.responsive {
|
||||
r.t.cl.addUrgentRequests(r.t.torrent, off, n)
|
||||
}
|
||||
if !r.responsive || r.readahead != 0 {
|
||||
r.t.cl.readRaisePiecePriorities(r.t.torrent, off, int64(n)+r.readahead)
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reader) readable(off int64) (ret bool) {
|
||||
// log.Println("readable", off)
|
||||
// defer func() {
|
||||
|
@ -81,22 +77,15 @@ func (r *Reader) waitReadable(off int64) {
|
|||
r.t.cl.event.Wait()
|
||||
}
|
||||
|
||||
func (r *Reader) ReadAt(b []byte, off int64) (n int, err error) {
|
||||
for {
|
||||
var n1 int
|
||||
n1, err = r.readAt(b, off)
|
||||
n += n1
|
||||
b = b[n1:]
|
||||
off += int64(n1)
|
||||
if err != nil || len(b) == 0 || n1 == 0 {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (r *Reader) Read(b []byte) (n int, err error) {
|
||||
n, err = r.readAt(b, r.pos)
|
||||
r.mu.Lock()
|
||||
pos := r.pos
|
||||
r.mu.Unlock()
|
||||
n, err = r.readAt(b, pos)
|
||||
r.mu.Lock()
|
||||
r.pos += int64(n)
|
||||
r.mu.Unlock()
|
||||
r.posChanged()
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -115,9 +104,7 @@ func (r *Reader) readAt(b []byte, pos int64) (n int, err error) {
|
|||
}
|
||||
again:
|
||||
r.t.cl.mu.Lock()
|
||||
r.raisePriorities(pos, len(b))
|
||||
for !r.readable(pos) {
|
||||
r.raisePriorities(pos, len(b))
|
||||
r.waitReadable(pos)
|
||||
}
|
||||
avail := r.available(pos, int64(len(b)))
|
||||
|
@ -154,11 +141,19 @@ again:
|
|||
}
|
||||
|
||||
func (r *Reader) Close() error {
|
||||
r.t.deleteReader(r)
|
||||
r.t = nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reader) posChanged() {
|
||||
r.t.cl.mu.Lock()
|
||||
defer r.t.cl.mu.Unlock()
|
||||
r.t.torrent.readersChanged(r.t.cl)
|
||||
}
|
||||
|
||||
func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
|
||||
r.mu.Lock()
|
||||
switch whence {
|
||||
case os.SEEK_SET:
|
||||
r.pos = off
|
||||
|
@ -170,5 +165,7 @@ func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
|
|||
err = errors.New("bad whence")
|
||||
}
|
||||
ret = r.pos
|
||||
r.mu.Unlock()
|
||||
r.posChanged()
|
||||
return
|
||||
}
|
||||
|
|
18
t.go
18
t.go
|
@ -41,6 +41,7 @@ func (t Torrent) NewReader() (ret *Reader) {
|
|||
t: &t,
|
||||
readahead: 5 * 1024 * 1024,
|
||||
}
|
||||
t.addReader(ret)
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -119,3 +120,20 @@ func (t Torrent) MetaInfo() *metainfo.MetaInfo {
|
|||
defer t.cl.mu.Unlock()
|
||||
return t.torrent.MetaInfo()
|
||||
}
|
||||
|
||||
func (t Torrent) addReader(r *Reader) {
|
||||
t.cl.mu.Lock()
|
||||
defer t.cl.mu.Unlock()
|
||||
if t.torrent.readers == nil {
|
||||
t.torrent.readers = make(map[*Reader]struct{})
|
||||
}
|
||||
t.torrent.readers[r] = struct{}{}
|
||||
t.torrent.readersChanged(t.cl)
|
||||
}
|
||||
|
||||
func (t Torrent) deleteReader(r *Reader) {
|
||||
t.cl.mu.Lock()
|
||||
defer t.cl.mu.Unlock()
|
||||
delete(t.torrent.readers, r)
|
||||
t.torrent.readersChanged(t.cl)
|
||||
}
|
||||
|
|
132
torrent.go
132
torrent.go
|
@ -6,7 +6,6 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sort"
|
||||
"sync"
|
||||
|
@ -62,9 +61,6 @@ type torrent struct {
|
|||
// Values are the piece indices that changed.
|
||||
pieceStateChanges *pubsub.PubSub
|
||||
chunkSize pp.Integer
|
||||
// Chunks that are wanted before all others. This is for
|
||||
// responsive/streaming readers that want to unblock ASAP.
|
||||
urgent map[request]struct{}
|
||||
// Total length of the torrent in bytes. Stored because it's not O(1) to
|
||||
// get this from the info dict.
|
||||
length int64
|
||||
|
@ -99,7 +95,7 @@ type torrent struct {
|
|||
// Closed when .Info is set.
|
||||
gotMetainfo chan struct{}
|
||||
|
||||
connPiecePriorites sync.Pool
|
||||
readers map[*Reader]struct{}
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -111,16 +107,6 @@ func (t *torrent) setDisplayName(dn string) {
|
|||
t.displayName = dn
|
||||
}
|
||||
|
||||
func (t *torrent) newConnPiecePriorities() []int {
|
||||
_ret := t.connPiecePriorites.Get()
|
||||
if _ret != nil {
|
||||
piecePrioritiesReused.Add(1)
|
||||
return _ret.([]int)
|
||||
}
|
||||
piecePrioritiesNew.Add(1)
|
||||
return rand.Perm(t.numPieces())
|
||||
}
|
||||
|
||||
func (t *torrent) pieceComplete(piece int) bool {
|
||||
// TODO: This is called when setting metadata, and before storage is
|
||||
// assigned, which doesn't seem right.
|
||||
|
@ -261,7 +247,6 @@ func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
|
|||
missinggo.CopyExact(piece.Hash[:], hash)
|
||||
}
|
||||
for _, conn := range t.Conns {
|
||||
t.initRequestOrdering(conn)
|
||||
if err := conn.setNumPieces(t.numPieces()); err != nil {
|
||||
log.Printf("closing connection: %s", err)
|
||||
conn.Close()
|
||||
|
@ -324,7 +309,7 @@ func (t *torrent) Name() string {
|
|||
|
||||
func (t *torrent) pieceState(index int) (ret PieceState) {
|
||||
p := &t.Pieces[index]
|
||||
ret.Priority = p.Priority
|
||||
ret.Priority = t.piecePriority(index)
|
||||
if t.pieceComplete(index) {
|
||||
ret.Complete = true
|
||||
}
|
||||
|
@ -436,9 +421,10 @@ func (t *torrent) writeStatus(w io.Writer, cl *Client) {
|
|||
fmt.Fprintln(w)
|
||||
}
|
||||
fmt.Fprintf(w, "Urgent:")
|
||||
for req := range t.urgent {
|
||||
fmt.Fprintf(w, " %v", req)
|
||||
}
|
||||
t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
|
||||
fmt.Fprintf(w, " %d:%d", begin, end)
|
||||
return true
|
||||
})
|
||||
fmt.Fprintln(w)
|
||||
fmt.Fprintf(w, "Trackers: ")
|
||||
for _, tier := range t.Trackers {
|
||||
|
@ -728,17 +714,8 @@ func (t *torrent) wantChunk(r request) bool {
|
|||
if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
|
||||
return true
|
||||
}
|
||||
_, ok := t.urgent[r]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (t *torrent) urgentChunkInPiece(piece int) bool {
|
||||
p := pp.Integer(piece)
|
||||
for req := range t.urgent {
|
||||
if req.Index == p {
|
||||
return true
|
||||
}
|
||||
}
|
||||
// TODO: What about pieces that were wanted, but aren't now, and aren't
|
||||
// completed either? That used to be done here.
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -754,18 +731,21 @@ func (t *torrent) wantPiece(index int) bool {
|
|||
if p.Hashing {
|
||||
return false
|
||||
}
|
||||
if p.Priority == PiecePriorityNone {
|
||||
if !t.urgentChunkInPiece(index) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
// Put piece complete check last, since it's the slowest as it can involve
|
||||
// calling out into external data stores.
|
||||
return !t.pieceComplete(index)
|
||||
}
|
||||
|
||||
func (t *torrent) connHasWantedPieces(c *connection) bool {
|
||||
return c.pieceRequestOrder != nil && !c.pieceRequestOrder.Empty()
|
||||
return !t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
|
||||
for i := begin; i < end; i++ {
|
||||
if c.PeerHasPiece(i) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
|
||||
|
@ -818,3 +798,81 @@ func (t *torrent) pieceAllDirty(piece int) bool {
|
|||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
|
||||
return t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
|
||||
if begin < end {
|
||||
if !f(begin) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (t *torrent) readersChanged(cl *Client) {
|
||||
for _, c := range t.Conns {
|
||||
cl.replenishConnRequests(t, c)
|
||||
}
|
||||
cl.openNewConns(t)
|
||||
}
|
||||
|
||||
func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
|
||||
if off >= t.length {
|
||||
return
|
||||
}
|
||||
if off < 0 {
|
||||
size += off
|
||||
off = 0
|
||||
}
|
||||
if size <= 0 {
|
||||
return
|
||||
}
|
||||
begin = int(off / t.Info.PieceLength)
|
||||
end = int((off + size + t.Info.PieceLength - 1) / t.Info.PieceLength)
|
||||
if end > t.Info.NumPieces() {
|
||||
end = t.Info.NumPieces()
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (t *torrent) forReaderWantedRegionPieces(f func(begin, end int) (more bool)) (all bool) {
|
||||
for r := range t.readers {
|
||||
r.mu.Lock()
|
||||
pos, readahead := r.pos, r.readahead
|
||||
r.mu.Unlock()
|
||||
if readahead < 1 {
|
||||
readahead = 1
|
||||
}
|
||||
begin, end := t.byteRegionPieces(pos, readahead)
|
||||
if begin >= end {
|
||||
continue
|
||||
}
|
||||
if !f(begin, end) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (t *torrent) piecePriority(piece int) (ret piecePriority) {
|
||||
ret = PiecePriorityNone
|
||||
if t.pieceComplete(piece) {
|
||||
return
|
||||
}
|
||||
raiseRet := func(prio piecePriority) {
|
||||
if prio > ret {
|
||||
ret = prio
|
||||
}
|
||||
}
|
||||
t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
|
||||
if piece == begin {
|
||||
raiseRet(PiecePriorityNow)
|
||||
}
|
||||
if begin <= piece && piece < end {
|
||||
raiseRet(PiecePriorityReadahead)
|
||||
}
|
||||
return true
|
||||
})
|
||||
return
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue