Add separate piece priorities, and cache pending pieces

Should fix a bug where prioritizing files is not alone sufficient to trigger downloading.
This commit is contained in:
Matt Joiner 2018-01-25 17:18:36 +11:00
parent e3c098441a
commit 52524925d2
6 changed files with 71 additions and 77 deletions

View File

@ -514,6 +514,7 @@ func nextRequestState(
} }
func (cn *connection) updateRequests() { func (cn *connection) updateRequests() {
// log.Print("update requests")
cn.tickleWriter() cn.tickleWriter()
} }
@ -640,7 +641,7 @@ func (cn *connection) updatePiecePriority(piece int) bool {
prio += piece / 3 prio += piece / 3
default: default:
} }
return cn.pieceRequestOrder.Set(piece, prio) return cn.pieceRequestOrder.Set(piece, prio) || cn.shouldRequestWithoutBias()
} }
func (cn *connection) getPieceInclination() []int { func (cn *connection) getPieceInclination() []int {

View File

@ -14,10 +14,12 @@ import (
// Describes the importance of obtaining a particular piece. // Describes the importance of obtaining a particular piece.
type piecePriority byte type piecePriority byte
func (pp *piecePriority) Raise(maybe piecePriority) { func (pp *piecePriority) Raise(maybe piecePriority) bool {
if maybe > *pp { if maybe > *pp {
*pp = maybe *pp = maybe
return true
} }
return false
} }
// Priority for use in PriorityBitmap // Priority for use in PriorityBitmap
@ -200,3 +202,30 @@ func (p *Piece) torrentBeginOffset() int64 {
func (p *Piece) torrentEndOffset() int64 { func (p *Piece) torrentEndOffset() int64 {
return p.torrentBeginOffset() + int64(p.length()) return p.torrentBeginOffset() + int64(p.length())
} }
func (p *Piece) SetPriority(prio piecePriority) {
p.t.cl.mu.Lock()
defer p.t.cl.mu.Unlock()
p.priority = prio
p.t.updatePiecePriority(p.index)
}
func (p *Piece) uncachedPriority() (ret piecePriority) {
if p.t.pieceComplete(p.index) {
return PiecePriorityNone
}
for _, f := range p.files {
ret.Raise(f.prio)
}
if p.t.readerNowPieces.Contains(p.index) {
ret.Raise(PiecePriorityNow)
}
// if t.readerNowPieces.Contains(piece - 1) {
// return PiecePriorityNext
// }
if p.t.readerReadaheadPieces.Contains(p.index) {
ret.Raise(PiecePriorityReadahead)
}
ret.Raise(p.priority)
return
}

View File

@ -243,6 +243,7 @@ func (r *reader) posChanged() {
return return
} }
r.pieces = to r.pieces = to
// log.Printf("reader pos changed %v->%v", from, to)
r.t.readerPosChanged(from, to) r.t.readerPosChanged(from, to)
} }

21
t.go
View File

@ -149,13 +149,24 @@ func (t *Torrent) deleteReader(r *reader) {
func (t *Torrent) DownloadPieces(begin, end int) { func (t *Torrent) DownloadPieces(begin, end int) {
t.cl.mu.Lock() t.cl.mu.Lock()
defer t.cl.mu.Unlock() defer t.cl.mu.Unlock()
t.pendPieceRange(begin, end) for i := begin; i < end; i++ {
if t.pieces[i].priority.Raise(PiecePriorityNormal) {
t.updatePiecePriority(i)
}
}
} }
func (t *Torrent) CancelPieces(begin, end int) { func (t *Torrent) CancelPieces(begin, end int) {
t.cl.mu.Lock() t.cl.mu.Lock()
defer t.cl.mu.Unlock() defer t.cl.mu.Unlock()
t.unpendPieceRange(begin, end) for i := begin; i < end; i++ {
p := &t.pieces[i]
if p.priority == PiecePriorityNone {
continue
}
p.priority = PiecePriorityNone
t.updatePiecePriority(i)
}
} }
func (t *Torrent) initFiles() { func (t *Torrent) initFiles() {
@ -189,11 +200,9 @@ func (t *Torrent) AddPeers(pp []Peer) {
} }
// Marks the entire torrent for download. Requires the info first, see // Marks the entire torrent for download. Requires the info first, see
// GotInfo. // GotInfo. Sets piece priorities for historical reasons.
func (t *Torrent) DownloadAll() { func (t *Torrent) DownloadAll() {
t.cl.mu.Lock() t.DownloadPieces(0, t.numPieces())
defer t.cl.mu.Unlock()
t.pendPieceRange(0, t.numPieces())
} }
func (t *Torrent) String() string { func (t *Torrent) String() string {

View File

@ -117,8 +117,8 @@ type Torrent struct {
readerNowPieces bitmap.Bitmap readerNowPieces bitmap.Bitmap
readerReadaheadPieces bitmap.Bitmap readerReadaheadPieces bitmap.Bitmap
// The indexes of pieces we want with normal priority, that aren't // A cache of pieces we need to get. Calculated from various piece and
// currently available. // file priorities and completion states elsewhere.
pendingPieces prioritybitmap.PriorityBitmap pendingPieces prioritybitmap.PriorityBitmap
// A cache of completed piece indices. // A cache of completed piece indices.
completedPieces bitmap.Bitmap completedPieces bitmap.Bitmap
@ -816,6 +816,7 @@ func (t *Torrent) wantPieceIndex(index int) bool {
if t.pendingPieces.Contains(index) { if t.pendingPieces.Contains(index) {
return true return true
} }
// log.Printf("piece %d not pending", index)
return !t.forReaderOffsetPieces(func(begin, end int) bool { return !t.forReaderOffsetPieces(func(begin, end int) bool {
return index < begin || index >= end return index < begin || index >= end
}) })
@ -911,8 +912,10 @@ func (t *Torrent) maybeNewConns() {
} }
func (t *Torrent) piecePriorityChanged(piece int) { func (t *Torrent) piecePriorityChanged(piece int) {
// log.Printf("piece %d priority changed", piece)
for c := range t.conns { for c := range t.conns {
if c.updatePiecePriority(piece) { if c.updatePiecePriority(piece) {
// log.Print("conn piece priority changed")
c.updateRequests() c.updateRequests()
} }
} }
@ -922,11 +925,17 @@ func (t *Torrent) piecePriorityChanged(piece int) {
func (t *Torrent) updatePiecePriority(piece int) { func (t *Torrent) updatePiecePriority(piece int) {
p := &t.pieces[piece] p := &t.pieces[piece]
newPrio := t.piecePriorityUncached(piece) newPrio := p.uncachedPriority()
if newPrio == p.priority { // log.Printf("torrent %p: piece %d: uncached priority: %v", t, piece, newPrio)
return if newPrio == PiecePriorityNone {
if !t.pendingPieces.Remove(piece) {
return
}
} else {
if !t.pendingPieces.Set(piece, newPrio.BitmapPriority()) {
return
}
} }
p.priority = newPrio
t.piecePriorityChanged(piece) t.piecePriorityChanged(piece)
} }
@ -979,63 +988,11 @@ func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all
} }
func (t *Torrent) piecePriority(piece int) piecePriority { func (t *Torrent) piecePriority(piece int) piecePriority {
if !t.haveInfo() { prio, ok := t.pendingPieces.GetPriority(piece)
if !ok {
return PiecePriorityNone return PiecePriorityNone
} }
return t.pieces[piece].priority return piecePriority(-prio)
}
func (t *Torrent) piecePriorityUncached(piece int) (ret piecePriority) {
for _, f := range t.pieces[piece].files {
ret.Raise(f.prio)
}
if t.readerNowPieces.Contains(piece) {
ret.Raise(PiecePriorityNow)
}
// if t.readerNowPieces.Contains(piece - 1) {
// return PiecePriorityNext
// }
if t.readerReadaheadPieces.Contains(piece) {
ret.Raise(PiecePriorityReadahead)
}
if t.pendingPieces.Contains(piece) {
ret.Raise(PiecePriorityNormal)
}
if t.pieceComplete(piece) {
return PiecePriorityNone
}
return
}
func (t *Torrent) pendPiece(piece int) {
if t.pendingPieces.Contains(piece) {
return
}
if t.havePiece(piece) {
return
}
t.pendingPieces.Set(piece, PiecePriorityNormal.BitmapPriority())
t.updatePiecePriority(piece)
}
func (t *Torrent) unpendPieces(unpend bitmap.Bitmap) {
unpend.IterTyped(func(piece int) (more bool) {
t.pendingPieces.Remove(piece)
t.updatePiecePriority(piece)
return true
})
}
func (t *Torrent) pendPieceRange(begin, end int) {
for i := begin; i < end; i++ {
t.pendPiece(i)
}
}
func (t *Torrent) unpendPieceRange(begin, end int) {
var bm bitmap.Bitmap
bm.AddRange(begin, end)
t.unpendPieces(bm)
} }
func (t *Torrent) pendRequest(req request) { func (t *Torrent) pendRequest(req request) {
@ -1078,6 +1035,8 @@ func (t *Torrent) updatePieceCompletion(piece int) {
changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok changed := t.completedPieces.Get(piece) != pcu.Complete || p.storageCompletionOk != pcu.Ok
p.storageCompletionOk = pcu.Ok p.storageCompletionOk = pcu.Ok
t.completedPieces.Set(piece, pcu.Complete) t.completedPieces.Set(piece, pcu.Complete)
// log.Printf("piece %d uncached completion: %v", piece, pcu.Complete)
// log.Printf("piece %d changed: %v", piece, changed)
if changed { if changed {
t.pieceCompletionChanged(piece) t.pieceCompletionChanged(piece)
} }
@ -1144,13 +1103,7 @@ func (t *Torrent) needData() bool {
if !t.haveInfo() { if !t.haveInfo() {
return true return true
} }
if t.pendingPieces.Len() != 0 { return t.pendingPieces.Len() != 0
return true
}
// Read as "not all complete".
return !t.readerPieces().IterTyped(func(piece int) bool {
return t.pieceComplete(piece)
})
} }
func appendMissingStrings(old, new []string) (ret []string) { func appendMissingStrings(old, new []string) (ret []string) {
@ -1572,6 +1525,7 @@ func (t *Torrent) onIncompletePiece(piece int) {
t.pendAllChunkSpecs(piece) t.pendAllChunkSpecs(piece)
} }
if !t.wantPieceIndex(piece) { if !t.wantPieceIndex(piece) {
// log.Printf("piece %d incomplete and unwanted", piece)
return return
} }
// We could drop any connections that we told we have a piece that we // We could drop any connections that we told we have a piece that we

View File

@ -93,10 +93,10 @@ func BenchmarkUpdatePiecePriorities(b *testing.B) {
r.Seek(3500000, 0) r.Seek(3500000, 0)
} }
assert.Len(b, t.readers, 7) assert.Len(b, t.readers, 7)
t.pendPieceRange(0, t.numPieces())
for i := 0; i < t.numPieces(); i += 3 { for i := 0; i < t.numPieces(); i += 3 {
t.completedPieces.Set(i, true) t.completedPieces.Set(i, true)
} }
t.DownloadPieces(0, t.numPieces())
for range iter.N(b.N) { for range iter.N(b.N) {
t.updateAllPiecePriorities() t.updateAllPiecePriorities()
} }