Fix the download/prioritize piece functions

This involves adding a pendingPieces field to torrent.
This commit is contained in:
Matt Joiner 2016-01-19 01:28:56 +11:00
parent df07d93330
commit 2f40c48d37
5 changed files with 116 additions and 90 deletions

114
client.go
View File

@ -10,7 +10,6 @@ import (
"expvar"
"fmt"
"io"
"io/ioutil"
"log"
"math/big"
mathRand "math/rand"
@ -29,6 +28,7 @@ import (
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/sync"
"github.com/anacrolix/utp"
"github.com/bradfitz/iter"
"github.com/edsrzf/mmap-go"
"github.com/anacrolix/torrent/bencode"
@ -1963,13 +1963,9 @@ func (t Torrent) AddPeers(pp []Peer) error {
func (t Torrent) DownloadAll() {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
for i := range iter.N(t.torrent.numPieces()) {
t.cl.raisePiecePriority(t.torrent, i, PiecePriorityNormal)
for i := range iter.N(t.torrent.Info.NumPieces()) {
t.torrent.pendPiece(i, t.cl)
}
// Nice to have the first and last pieces sooner for various interactive
// purposes.
t.cl.raisePiecePriority(t.torrent, 0, PiecePriorityReadahead)
t.cl.raisePiecePriority(t.torrent, t.torrent.numPieces()-1, PiecePriorityReadahead)
}
// Returns nil metainfo if it isn't in the cache. Checks that the retrieved
@ -2366,59 +2362,11 @@ func (me *Client) WaitAll() bool {
return true
}
func (me *Client) connAddRequest(c *connection, req request) (more bool) {
if len(c.Requests) >= 64 {
return false
}
more = c.Request(req)
return
}
func (me *Client) connRequestPiecePendingChunks(c *connection, t *torrent, piece int) (more bool) {
for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) {
req := request{pp.Integer(piece), cs}
if !me.connAddRequest(c, req) {
return false
}
}
return true
}
func (me *Client) fillRequests(t *torrent, c *connection) {
if c.Interested {
if c.PeerChoked {
return
}
if len(c.Requests) > c.requestsLowWater {
return
}
}
if !t.forUrgentPieces(func(piece int) (again bool) {
if !c.PeerHasPiece(piece) {
return true
}
return me.connRequestPiecePendingChunks(c, t, piece)
}) {
return
}
t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
for i := begin + 1; i < end; i++ {
if !c.PeerHasPiece(i) {
continue
}
if !me.connRequestPiecePendingChunks(c, t, i) {
return false
}
}
return true
})
}
func (me *Client) replenishConnRequests(t *torrent, c *connection) {
if !t.haveInfo() {
return
}
me.fillRequests(t, c)
t.fillRequests(c)
if len(c.Requests) == 0 && !c.PeerChoked {
// So we're not choked, but we don't want anything right now. We may
// have completed readahead, and the readahead window has not rolled
@ -2547,32 +2495,44 @@ func (me *Client) pieceHashed(t *torrent, piece int, correct bool) {
me.pieceChanged(t, int(piece))
}
func (me *Client) onCompletedPiece(t *torrent, piece int) {
for _, conn := range t.Conns {
conn.Have(piece)
for r := range conn.Requests {
if int(r.Index) == piece {
conn.Cancel(r)
}
}
// Could check here if peer doesn't have piece, but due to caching
// some peers may have said they have a piece but they don't.
me.upload(t, conn)
}
}
func (me *Client) onFailedPiece(t *torrent, piece int) {
if t.pieceAllDirty(piece) {
t.pendAllChunkSpecs(piece)
}
if !t.wantPiece(piece) {
return
}
me.openNewConns(t)
for _, conn := range t.Conns {
if conn.PeerHasPiece(piece) {
me.replenishConnRequests(t, conn)
}
}
}
func (me *Client) pieceChanged(t *torrent, piece int) {
correct := t.pieceComplete(piece)
defer t.publishPieceChange(piece)
defer me.event.Broadcast()
if !correct {
if t.pieceAllDirty(piece) {
t.pendAllChunkSpecs(piece)
}
if t.wantPiece(piece) {
me.openNewConns(t)
}
if correct {
me.onCompletedPiece(t, piece)
} else {
me.onFailedPiece(t, piece)
}
for _, conn := range t.Conns {
if correct {
conn.Have(piece)
for r := range conn.Requests {
if int(r.Index) == piece {
conn.Cancel(r)
}
}
me.upload(t, conn)
} else if t.wantPiece(piece) && conn.PeerHasPiece(piece) {
me.replenishConnRequests(t, conn)
}
}
me.event.Broadcast()
}
func (cl *Client) verifyPiece(t *torrent, piece int) {

View File

@ -1,9 +1,10 @@
package torrent_test
import (
"io"
"log"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/torrent"
)
@ -26,5 +27,5 @@ func Example_fileReader() {
defer r.Close()
// Access the parts of the torrent pertaining to f. Data will be
// downloaded as required, per the configuration of the torrent.Reader.
_ = io.NewSectionReader(r, f.Offset(), f.Length())
_ = missinggo.NewSectionReadSeeker(r, f.Offset(), f.Length())
}

13
file.go
View File

@ -75,15 +75,6 @@ func (f *File) State() (ret []FilePieceState) {
return
}
// Marks pieces in the region of the file for download. This is a helper
// wrapping Torrent.SetRegionPriority.
func (f *File) PrioritizeRegion(off, len int64) {
if off < 0 || off >= f.length {
return
}
if off+len > f.length {
len = f.length - off
}
off += f.offset
f.t.SetRegionPriority(off, len)
func (f *File) Download() {
f.t.DownloadPieces(f.t.torrent.byteRegionPieces(f.offset, f.length))
}

8
t.go
View File

@ -137,3 +137,11 @@ func (t Torrent) deleteReader(r *Reader) {
delete(t.torrent.readers, r)
t.torrent.readersChanged(t.cl)
}
func (t Torrent) DownloadPieces(begin, end int) {
t.cl.mu.Lock()
defer t.cl.mu.Unlock()
for i := begin; i < end; i++ {
t.torrent.pendPiece(i, t.cl)
}
}

View File

@ -96,6 +96,8 @@ type torrent struct {
gotMetainfo chan struct{}
readers map[*Reader]struct{}
pendingPieces map[int]struct{}
}
var (
@ -860,6 +862,9 @@ func (t *torrent) piecePriority(piece int) (ret piecePriority) {
if t.pieceComplete(piece) {
return
}
if _, ok := t.pendingPieces[piece]; ok {
ret = PiecePriorityNormal
}
raiseRet := func(prio piecePriority) {
if prio > ret {
ret = prio
@ -876,3 +881,64 @@ func (t *torrent) piecePriority(piece int) (ret piecePriority) {
})
return
}
func (t *torrent) pendPiece(piece int, cl *Client) {
if t.pendingPieces == nil {
t.pendingPieces = make(map[int]struct{}, t.Info.NumPieces())
}
if _, ok := t.pendingPieces[piece]; ok {
return
}
if t.havePiece(piece) {
return
}
t.pendingPieces[piece] = struct{}{}
for _, c := range t.Conns {
if !c.PeerHasPiece(piece) {
continue
}
}
}
func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
if !c.PeerHasPiece(piece) {
return true
}
for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) {
req := request{pp.Integer(piece), cs}
if !c.Request(req) {
return false
}
}
return true
}
func (t *torrent) fillRequests(c *connection) {
if c.Interested {
if c.PeerChoked {
return
}
if len(c.Requests) > c.requestsLowWater {
return
}
}
if !t.forUrgentPieces(func(piece int) (again bool) {
return t.connRequestPiecePendingChunks(c, piece)
}) {
return
}
t.forReaderWantedRegionPieces(func(begin, end int) (again bool) {
for i := begin + 1; i < end; i++ {
if !t.connRequestPiecePendingChunks(c, i) {
return false
}
}
return true
})
for i := range t.pendingPieces {
if !t.connRequestPiecePendingChunks(c, i) {
return
}
}
}