Track dirty chunks, instead of pending chunk specs

This is the first step toward having purely Reader-based priorities. If a chunk is pending, that currently implies that we want to download it. I want to move that kind of state out to the readers.
This commit is contained in:
Matt Joiner 2016-01-13 17:11:59 +11:00
parent 153c13db43
commit 2beb5f8bd4
4 changed files with 78 additions and 56 deletions

View File

@ -2509,7 +2509,7 @@ func (me *Client) fillRequests(t *torrent, c *connection) {
continue
}
piece := &t.Pieces[pieceIndex]
for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex), pp.Integer(t.chunkSize)) {
for _, cs := range piece.shuffledPendingChunkSpecs(t, pieceIndex) {
r := request{pp.Integer(pieceIndex), cs}
if !addRequest(r) {
return
@ -2546,7 +2546,8 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
unexpectedChunksReceived.Add(1)
}
piece := &t.Pieces[req.Index]
index := int(req.Index)
piece := &t.Pieces[index]
// Do we actually want this chunk?
if !t.wantChunk(req) {
@ -2584,7 +2585,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
if c.peerTouchedPieces == nil {
c.peerTouchedPieces = make(map[int]struct{})
}
c.peerTouchedPieces[int(req.Index)] = struct{}{}
c.peerTouchedPieces[index] = struct{}{}
me.mu.Unlock()
}()
@ -2596,7 +2597,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
delete(t.urgent, req)
// It's important that the piece is potentially queued before we check if
// the piece is still wanted, because if it is queued, it won't be wanted.
if piece.numPendingChunks() == 0 {
if t.pieceAllDirty(index) {
me.queuePieceCheck(t, int(req.Index))
}
if !t.wantPiece(int(req.Index)) {
@ -2663,15 +2664,14 @@ func (me *Client) pieceChanged(t *torrent, piece int) {
defer me.event.Broadcast()
if correct {
p.Priority = PiecePriorityNone
p.PendingChunkSpecs = nil
for req := range t.urgent {
if int(req.Index) == piece {
delete(t.urgent, req)
}
}
} else {
if p.numPendingChunks() == 0 {
t.pendAllChunkSpecs(int(piece))
if t.pieceAllDirty(piece) {
t.pendAllChunkSpecs(piece)
}
if t.wantPiece(piece) {
me.openNewConns(t)

View File

@ -106,9 +106,8 @@ func TestTorrentInitialState(t *testing.T) {
if len(tor.Pieces) != 3 {
t.Fatal("wrong number of pieces")
}
p := &tor.Pieces[0]
tor.pendAllChunkSpecs(0)
assert.EqualValues(t, 3, p.numPendingChunks())
assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0))
assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize))
}

View File

@ -4,6 +4,8 @@ import (
"math/rand"
"sync"
"github.com/bradfitz/iter"
pp "github.com/anacrolix/torrent/peer_protocol"
)
@ -22,14 +24,14 @@ const (
type piece struct {
// The completed piece SHA1 hash, from the metainfo "pieces" field.
Hash pieceSum
// Chunks we don't have. The offset and length can be determined by the
// request chunkSize in use.
PendingChunkSpecs []bool
Hashing bool
QueuedForHash bool
EverHashed bool
Priority piecePriority
PublicPieceState PieceState
// Chunks we've written to since the last check. The chunk offset and
// length can be determined by the request chunkSize in use.
DirtyChunks []bool
Hashing bool
QueuedForHash bool
EverHashed bool
Priority piecePriority
PublicPieceState PieceState
pendingWritesMutex sync.Mutex
pendingWrites int
@ -37,15 +39,16 @@ type piece struct {
}
func (p *piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool {
if p.PendingChunkSpecs == nil {
return false
ci := chunkIndex(cs, chunkSize)
if ci >= len(p.DirtyChunks) {
return true
}
return p.PendingChunkSpecs[chunkIndex(cs, chunkSize)]
return !p.DirtyChunks[ci]
}
func (p *piece) numPendingChunks() (ret int) {
for _, pending := range p.PendingChunkSpecs {
if pending {
func (p *piece) numDirtyChunks() (ret int) {
for _, dirty := range p.DirtyChunks {
if dirty {
ret++
}
}
@ -53,10 +56,10 @@ func (p *piece) numPendingChunks() (ret int) {
}
func (p *piece) unpendChunkIndex(i int) {
if p.PendingChunkSpecs == nil {
return
for i >= len(p.DirtyChunks) {
p.DirtyChunks = append(p.DirtyChunks, false)
}
p.PendingChunkSpecs[i] = false
p.DirtyChunks[i] = true
}
func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec {
@ -67,14 +70,18 @@ func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec {
return ret
}
func (p *piece) shuffledPendingChunkSpecs(pieceLength, chunkSize pp.Integer) (css []chunkSpec) {
if p.numPendingChunks() == 0 {
func (p *piece) shuffledPendingChunkSpecs(t *torrent, piece int) (css []chunkSpec) {
// defer func() {
// log.Println(piece, css)
// }()
numPending := t.pieceNumPendingChunks(piece)
if numPending == 0 {
return
}
css = make([]chunkSpec, 0, p.numPendingChunks())
for i, pending := range p.PendingChunkSpecs {
if pending {
css = append(css, chunkIndexSpec(i, pieceLength, chunkSize))
css = make([]chunkSpec, 0, numPending)
for ci := range iter.N(t.pieceNumChunks(piece)) {
if ci >= len(p.DirtyChunks) || !p.DirtyChunks[ci] {
css = append(css, t.chunkIndexSpec(ci, piece))
}
}
if len(css) <= 1 {

View File

@ -22,18 +22,22 @@ import (
"github.com/anacrolix/torrent/tracker"
)
func (t *torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec {
return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize)
}
func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) {
if t.pieceComplete(index) {
return 0
return
}
piece := &t.Pieces[index]
pieceLength := t.pieceLength(index)
count = t.pieceLength(index)
if !piece.EverHashed {
return pieceLength
return
}
for i, pending := range piece.PendingChunkSpecs {
if pending {
count += chunkIndexSpec(i, pieceLength, t.chunkSize).Length
for i, dirty := range piece.DirtyChunks {
if dirty {
count -= t.chunkIndexSpec(i, index).Length
}
}
return
@ -588,9 +592,7 @@ func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
func (t *torrent) bitfield() (bf []bool) {
for i := range t.Pieces {
p := &t.Pieces[i]
// TODO: Check this logic.
bf = append(bf, p.EverHashed && p.numPendingChunks() == 0)
bf = append(bf, t.havePiece(i))
}
return
}
@ -626,18 +628,12 @@ func (t *torrent) pieceChunks(piece int) (css []chunkSpec) {
return
}
func (t *torrent) pieceNumChunks(piece int) int {
return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize)
}
func (t *torrent) pendAllChunkSpecs(pieceIndex int) {
piece := &t.Pieces[pieceIndex]
if piece.PendingChunkSpecs == nil {
// Allocate to exact size.
piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+t.chunkSize-1)/t.chunkSize)
}
// Pend all the chunks.
pcss := piece.PendingChunkSpecs
for i := range pcss {
pcss[i] = true
}
return
t.Pieces[pieceIndex].DirtyChunks = nil
}
type Peer struct {
@ -650,6 +646,9 @@ type Peer struct {
}
func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
if piece < 0 || piece > t.Info.NumPieces() {
return
}
if int(piece) == t.numPieces()-1 {
len_ = pp.Integer(t.Length() % t.Info.PieceLength)
}
@ -707,7 +706,10 @@ func (t *torrent) havePiece(index int) bool {
return t.haveInfo() && t.pieceComplete(index)
}
func (t *torrent) haveChunk(r request) bool {
func (t *torrent) haveChunk(r request) (ret bool) {
// defer func() {
// log.Println("have chunk", r, ret)
// }()
if !t.haveInfo() {
return false
}
@ -715,9 +717,6 @@ func (t *torrent) haveChunk(r request) bool {
return true
}
p := &t.Pieces[r.Index]
if p.PendingChunkSpecs == nil {
return false
}
return !p.pendingChunk(r.chunkSpec, t.chunkSize)
}
@ -806,3 +805,20 @@ func (t *torrent) publishPieceChange(piece int) {
}
p.PublicPieceState = cur
}
func (t *torrent) pieceNumPendingChunks(piece int) int {
return t.pieceNumChunks(piece) - t.Pieces[piece].numDirtyChunks()
}
func (t *torrent) pieceAllDirty(piece int) bool {
p := &t.Pieces[piece]
if len(p.DirtyChunks) != t.pieceNumChunks(piece) {
return false
}
for _, dirty := range p.DirtyChunks {
if !dirty {
return false
}
}
return true
}