diff --git a/client.go b/client.go index 322c7e70..0c383caf 100644 --- a/client.go +++ b/client.go @@ -2509,7 +2509,7 @@ func (me *Client) fillRequests(t *torrent, c *connection) { continue } piece := &t.Pieces[pieceIndex] - for _, cs := range piece.shuffledPendingChunkSpecs(t.pieceLength(pieceIndex), pp.Integer(t.chunkSize)) { + for _, cs := range piece.shuffledPendingChunkSpecs(t, pieceIndex) { r := request{pp.Integer(pieceIndex), cs} if !addRequest(r) { return @@ -2546,7 +2546,8 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er unexpectedChunksReceived.Add(1) } - piece := &t.Pieces[req.Index] + index := int(req.Index) + piece := &t.Pieces[index] // Do we actually want this chunk? if !t.wantChunk(req) { @@ -2584,7 +2585,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er if c.peerTouchedPieces == nil { c.peerTouchedPieces = make(map[int]struct{}) } - c.peerTouchedPieces[int(req.Index)] = struct{}{} + c.peerTouchedPieces[index] = struct{}{} me.mu.Unlock() }() @@ -2596,7 +2597,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er delete(t.urgent, req) // It's important that the piece is potentially queued before we check if // the piece is still wanted, because if it is queued, it won't be wanted. - if piece.numPendingChunks() == 0 { + if t.pieceAllDirty(index) { me.queuePieceCheck(t, int(req.Index)) } if !t.wantPiece(int(req.Index)) { @@ -2663,15 +2664,14 @@ func (me *Client) pieceChanged(t *torrent, piece int) { defer me.event.Broadcast() if correct { p.Priority = PiecePriorityNone - p.PendingChunkSpecs = nil for req := range t.urgent { if int(req.Index) == piece { delete(t.urgent, req) } } } else { - if p.numPendingChunks() == 0 { - t.pendAllChunkSpecs(int(piece)) + if t.pieceAllDirty(piece) { + t.pendAllChunkSpecs(piece) } if t.wantPiece(piece) { me.openNewConns(t) diff --git a/client_test.go b/client_test.go index 80d1535c..f073f5ee 100644 --- a/client_test.go +++ b/client_test.go @@ -106,9 +106,8 @@ func TestTorrentInitialState(t *testing.T) { if len(tor.Pieces) != 3 { t.Fatal("wrong number of pieces") } - p := &tor.Pieces[0] tor.pendAllChunkSpecs(0) - assert.EqualValues(t, 3, p.numPendingChunks()) + assert.EqualValues(t, 3, tor.pieceNumPendingChunks(0)) assert.EqualValues(t, chunkSpec{4, 1}, chunkIndexSpec(2, tor.pieceLength(0), tor.chunkSize)) } diff --git a/piece.go b/piece.go index 35c8a7b2..c25bd2e4 100644 --- a/piece.go +++ b/piece.go @@ -4,6 +4,8 @@ import ( "math/rand" "sync" + "github.com/bradfitz/iter" + pp "github.com/anacrolix/torrent/peer_protocol" ) @@ -22,14 +24,14 @@ const ( type piece struct { // The completed piece SHA1 hash, from the metainfo "pieces" field. Hash pieceSum - // Chunks we don't have. The offset and length can be determined by the - // request chunkSize in use. - PendingChunkSpecs []bool - Hashing bool - QueuedForHash bool - EverHashed bool - Priority piecePriority - PublicPieceState PieceState + // Chunks we've written to since the last check. The chunk offset and + // length can be determined by the request chunkSize in use. + DirtyChunks []bool + Hashing bool + QueuedForHash bool + EverHashed bool + Priority piecePriority + PublicPieceState PieceState pendingWritesMutex sync.Mutex pendingWrites int @@ -37,15 +39,16 @@ type piece struct { } func (p *piece) pendingChunk(cs chunkSpec, chunkSize pp.Integer) bool { - if p.PendingChunkSpecs == nil { - return false + ci := chunkIndex(cs, chunkSize) + if ci >= len(p.DirtyChunks) { + return true } - return p.PendingChunkSpecs[chunkIndex(cs, chunkSize)] + return !p.DirtyChunks[ci] } -func (p *piece) numPendingChunks() (ret int) { - for _, pending := range p.PendingChunkSpecs { - if pending { +func (p *piece) numDirtyChunks() (ret int) { + for _, dirty := range p.DirtyChunks { + if dirty { ret++ } } @@ -53,10 +56,10 @@ func (p *piece) numPendingChunks() (ret int) { } func (p *piece) unpendChunkIndex(i int) { - if p.PendingChunkSpecs == nil { - return + for i >= len(p.DirtyChunks) { + p.DirtyChunks = append(p.DirtyChunks, false) } - p.PendingChunkSpecs[i] = false + p.DirtyChunks[i] = true } func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec { @@ -67,14 +70,18 @@ func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec { return ret } -func (p *piece) shuffledPendingChunkSpecs(pieceLength, chunkSize pp.Integer) (css []chunkSpec) { - if p.numPendingChunks() == 0 { +func (p *piece) shuffledPendingChunkSpecs(t *torrent, piece int) (css []chunkSpec) { + // defer func() { + // log.Println(piece, css) + // }() + numPending := t.pieceNumPendingChunks(piece) + if numPending == 0 { return } - css = make([]chunkSpec, 0, p.numPendingChunks()) - for i, pending := range p.PendingChunkSpecs { - if pending { - css = append(css, chunkIndexSpec(i, pieceLength, chunkSize)) + css = make([]chunkSpec, 0, numPending) + for ci := range iter.N(t.pieceNumChunks(piece)) { + if ci >= len(p.DirtyChunks) || !p.DirtyChunks[ci] { + css = append(css, t.chunkIndexSpec(ci, piece)) } } if len(css) <= 1 { diff --git a/torrent.go b/torrent.go index b2478656..928f5ed9 100644 --- a/torrent.go +++ b/torrent.go @@ -22,18 +22,22 @@ import ( "github.com/anacrolix/torrent/tracker" ) +func (t *torrent) chunkIndexSpec(chunkIndex, piece int) chunkSpec { + return chunkIndexSpec(chunkIndex, t.pieceLength(piece), t.chunkSize) +} + func (t *torrent) pieceNumPendingBytes(index int) (count pp.Integer) { if t.pieceComplete(index) { - return 0 + return } piece := &t.Pieces[index] - pieceLength := t.pieceLength(index) + count = t.pieceLength(index) if !piece.EverHashed { - return pieceLength + return } - for i, pending := range piece.PendingChunkSpecs { - if pending { - count += chunkIndexSpec(i, pieceLength, t.chunkSize).Length + for i, dirty := range piece.DirtyChunks { + if dirty { + count -= t.chunkIndexSpec(i, index).Length } } return @@ -588,9 +592,7 @@ func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) { func (t *torrent) bitfield() (bf []bool) { for i := range t.Pieces { - p := &t.Pieces[i] - // TODO: Check this logic. - bf = append(bf, p.EverHashed && p.numPendingChunks() == 0) + bf = append(bf, t.havePiece(i)) } return } @@ -626,18 +628,12 @@ func (t *torrent) pieceChunks(piece int) (css []chunkSpec) { return } +func (t *torrent) pieceNumChunks(piece int) int { + return int((t.pieceLength(piece) + t.chunkSize - 1) / t.chunkSize) +} + func (t *torrent) pendAllChunkSpecs(pieceIndex int) { - piece := &t.Pieces[pieceIndex] - if piece.PendingChunkSpecs == nil { - // Allocate to exact size. - piece.PendingChunkSpecs = make([]bool, (t.pieceLength(pieceIndex)+t.chunkSize-1)/t.chunkSize) - } - // Pend all the chunks. - pcss := piece.PendingChunkSpecs - for i := range pcss { - pcss[i] = true - } - return + t.Pieces[pieceIndex].DirtyChunks = nil } type Peer struct { @@ -650,6 +646,9 @@ type Peer struct { } func (t *torrent) pieceLength(piece int) (len_ pp.Integer) { + if piece < 0 || piece > t.Info.NumPieces() { + return + } if int(piece) == t.numPieces()-1 { len_ = pp.Integer(t.Length() % t.Info.PieceLength) } @@ -707,7 +706,10 @@ func (t *torrent) havePiece(index int) bool { return t.haveInfo() && t.pieceComplete(index) } -func (t *torrent) haveChunk(r request) bool { +func (t *torrent) haveChunk(r request) (ret bool) { + // defer func() { + // log.Println("have chunk", r, ret) + // }() if !t.haveInfo() { return false } @@ -715,9 +717,6 @@ func (t *torrent) haveChunk(r request) bool { return true } p := &t.Pieces[r.Index] - if p.PendingChunkSpecs == nil { - return false - } return !p.pendingChunk(r.chunkSpec, t.chunkSize) } @@ -806,3 +805,20 @@ func (t *torrent) publishPieceChange(piece int) { } p.PublicPieceState = cur } + +func (t *torrent) pieceNumPendingChunks(piece int) int { + return t.pieceNumChunks(piece) - t.Pieces[piece].numDirtyChunks() +} + +func (t *torrent) pieceAllDirty(piece int) bool { + p := &t.Pieces[piece] + if len(p.DirtyChunks) != t.pieceNumChunks(piece) { + return false + } + for _, dirty := range p.DirtyChunks { + if !dirty { + return false + } + } + return true +}