diff --git a/connection.go b/connection.go index c8e28920..753bc64f 100644 --- a/connection.go +++ b/connection.go @@ -514,6 +514,7 @@ func nextRequestState( } func (cn *connection) updateRequests() { + // log.Print("update requests") cn.tickleWriter() } @@ -640,7 +641,7 @@ func (cn *connection) updatePiecePriority(piece int) bool { prio += piece / 3 default: } - return cn.pieceRequestOrder.Set(piece, prio) + return cn.pieceRequestOrder.Set(piece, prio) || cn.shouldRequestWithoutBias() } func (cn *connection) getPieceInclination() []int { diff --git a/piece.go b/piece.go index fcec8623..4726faf7 100644 --- a/piece.go +++ b/piece.go @@ -14,10 +14,12 @@ import ( // Describes the importance of obtaining a particular piece. type piecePriority byte -func (pp *piecePriority) Raise(maybe piecePriority) { +func (pp *piecePriority) Raise(maybe piecePriority) bool { if maybe > *pp { *pp = maybe + return true } + return false } // Priority for use in PriorityBitmap @@ -200,3 +202,30 @@ func (p *Piece) torrentBeginOffset() int64 { func (p *Piece) torrentEndOffset() int64 { return p.torrentBeginOffset() + int64(p.length()) } + +func (p *Piece) SetPriority(prio piecePriority) { + p.t.cl.mu.Lock() + defer p.t.cl.mu.Unlock() + p.priority = prio + p.t.updatePiecePriority(p.index) +} + +func (p *Piece) uncachedPriority() (ret piecePriority) { + if p.t.pieceComplete(p.index) { + return PiecePriorityNone + } + for _, f := range p.files { + ret.Raise(f.prio) + } + if p.t.readerNowPieces.Contains(p.index) { + ret.Raise(PiecePriorityNow) + } + // if t.readerNowPieces.Contains(piece - 1) { + // return PiecePriorityNext + // } + if p.t.readerReadaheadPieces.Contains(p.index) { + ret.Raise(PiecePriorityReadahead) + } + ret.Raise(p.priority) + return +} diff --git a/reader.go b/reader.go index 00c3ff32..105e1f19 100644 --- a/reader.go +++ b/reader.go @@ -243,6 +243,7 @@ func (r *reader) posChanged() { return } r.pieces = to + // log.Printf("reader pos changed %v->%v", from, to) r.t.readerPosChanged(from, to) } diff --git a/t.go b/t.go index 4d288b7a..d8ab0a7f 100644 --- a/t.go +++ b/t.go @@ -149,13 +149,24 @@ func (t *Torrent) deleteReader(r *reader) { func (t *Torrent) DownloadPieces(begin, end int) { t.cl.mu.Lock() defer t.cl.mu.Unlock() - t.pendPieceRange(begin, end) + for i := begin; i < end; i++ { + if t.pieces[i].priority.Raise(PiecePriorityNormal) { + t.updatePiecePriority(i) + } + } } func (t *Torrent) CancelPieces(begin, end int) { t.cl.mu.Lock() defer t.cl.mu.Unlock() - t.unpendPieceRange(begin, end) + for i := begin; i < end; i++ { + p := &t.pieces[i] + if p.priority == PiecePriorityNone { + continue + } + p.priority = PiecePriorityNone + t.updatePiecePriority(i) + } } func (t *Torrent) initFiles() { @@ -189,11 +200,9 @@ func (t *Torrent) AddPeers(pp []Peer) { } // Marks the entire torrent for download. Requires the info first, see -// GotInfo. +// GotInfo. Sets piece priorities for historical reasons. func (t *Torrent) DownloadAll() { - t.cl.mu.Lock() - defer t.cl.mu.Unlock() - t.pendPieceRange(0, t.numPieces()) + t.DownloadPieces(0, t.numPieces()) } func (t *Torrent) String() string { diff --git a/torrent.go b/torrent.go index f6addaa2..cec8a045 100644 --- a/torrent.go +++ b/torrent.go @@ -117,8 +117,8 @@ type Torrent struct { readerNowPieces bitmap.Bitmap readerReadaheadPieces bitmap.Bitmap - // The indexes of pieces we want with normal priority, that aren't - // currently available. + // A cache of pieces we need to get. Calculated from various piece and + // file priorities and completion states elsewhere. pendingPieces prioritybitmap.PriorityBitmap // A cache of completed piece indices. completedPieces bitmap.Bitmap @@ -816,6 +816,7 @@ func (t *Torrent) wantPieceIndex(index int) bool { if t.pendingPieces.Contains(index) { return true } + // log.Printf("piece %d not pending", index) return !t.forReaderOffsetPieces(func(begin, end int) bool { return index < begin || index >= end }) @@ -911,8 +912,10 @@ func (t *Torrent) maybeNewConns() { } func (t *Torrent) piecePriorityChanged(piece int) { + // log.Printf("piece %d priority changed", piece) for c := range t.conns { if c.updatePiecePriority(piece) { + // log.Print("conn piece priority changed") c.updateRequests() } } @@ -922,11 +925,17 @@ func (t *Torrent) piecePriorityChanged(piece int) { func (t *Torrent) updatePiecePriority(piece int) { p := &t.pieces[piece] - newPrio := t.piecePriorityUncached(piece) - if newPrio == p.priority { - return + newPrio := p.uncachedPriority() + // log.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio) + if newPrio == PiecePriorityNone { + if !t.pendingPieces.Remove(piece) { + return + } + } else { + if !t.pendingPieces.Set(piece, newPrio.BitmapPriority()) { + return + } } - p.priority = newPrio t.piecePriorityChanged(piece) } @@ -979,63 +988,11 @@ func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all } func (t *Torrent) piecePriority(piece int) piecePriority { - if !t.haveInfo() { + prio, ok := t.pendingPieces.GetPriority(piece) + if !ok { return PiecePriorityNone } - return t.pieces[piece].priority -} - -func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) { - for _, f := range t.pieces[piece].files { - ret.Raise(f.prio) - } - if t.readerNowPieces.Contains(piece) { - ret.Raise(PiecePriorityNow) - } - // if t.readerNowPieces.Contains(piece - 1) { - // return PiecePriorityNext - // } - if t.readerReadaheadPieces.Contains(piece) { - ret.Raise(PiecePriorityReadahead) - } - if t.pendingPieces.Contains(piece) { - ret.Raise(PiecePriorityNormal) - } - if t.pieceComplete(piece) { - return PiecePriorityNone - } - return -} - -func (t *Torrent) pendPiece(piece int) { - if t.pendingPieces.Contains(piece) { - return - } - if t.havePiece(piece) { - return - } - t.pendingPieces.Set(piece, PiecePriorityNormal.BitmapPriority()) - t.updatePiecePriority(piece) -} - -func (t *Torrent) unpendPieces(unpend bitmap.Bitmap) { - unpend.IterTyped(func(piece int) (more bool) { - t.pendingPieces.Remove(piece) - t.updatePiecePriority(piece) - return true - }) -} - -func (t *Torrent) pendPieceRange(begin, end int) { - for i := begin; i < end; i++ { - t.pendPiece(i) - } -} - -func (t *Torrent) unpendPieceRange(begin, end int) { - var bm bitmap.Bitmap - bm.AddRange(begin, end) - t.unpendPieces(bm) + return piecePriority(-prio) } func (t *Torrent) pendRequest(req request) { @@ -1078,6 +1035,8 @@ func (t *Torrent) updatePieceCompletion(piece int) { changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok p.storageCompletionOk = pcu.Ok t.completedPieces.Set(piece, pcu.Complete) + // log.Printf("piece %d uncached completion: %v", piece, pcu.Complete) + // log.Printf("piece %d changed: %v", piece, changed) if changed { t.pieceCompletionChanged(piece) } @@ -1144,13 +1103,7 @@ func (t *Torrent) needData() bool { if !t.haveInfo() { return true } - if t.pendingPieces.Len() != 0 { - return true - } - // Read as "not all complete". - return !t.readerPieces().IterTyped(func(piece int) bool { - return t.pieceComplete(piece) - }) + return t.pendingPieces.Len() != 0 } func appendMissingStrings(old, new []string) (ret []string) { @@ -1572,6 +1525,7 @@ func (t *Torrent) onIncompletePiece(piece int) { t.pendAllChunkSpecs(piece) } if !t.wantPieceIndex(piece) { + // log.Printf("piece %d incomplete and unwanted", piece) return } // We could drop any connections that we told we have a piece that we diff --git a/torrent_test.go b/torrent_test.go index 0f70cb97..3889aff2 100644 --- a/torrent_test.go +++ b/torrent_test.go @@ -93,10 +93,10 @@ func BenchmarkUpdatePiecePriorities(b *testing.B) { r.Seek(3500000, 0) } assert.Len(b, t.readers, 7) - t.pendPieceRange(0, t.numPieces()) for i := 0; i < t.numPieces(); i += 3 { t.completedPieces.Set(i, true) } + t.DownloadPieces(0, t.numPieces()) for range iter.N(b.N) { t.updateAllPiecePriorities() }