diff --git a/connection.go b/connection.go index cea8dab6..24da542b 100644 --- a/connection.go +++ b/connection.go @@ -303,9 +303,8 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) { cn.statusFlags(), cn.downloadRate()/(1<<10), ) - roi := cn.pieceRequestOrderIter() fmt.Fprintf(w, " next pieces: %v%s\n", - iter.ToSlice(iter.Head(10, roi)), + iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)), func() string { if cn.shouldRequestWithoutBias() { return " (fastest)" @@ -524,33 +523,55 @@ func (cn *connection) request(r request, mw messageWriter) bool { } func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) { - numFillBuffers.Add(1) - cancel, new, i := cn.desiredRequestState() - if !cn.SetInterested(i, msg) { - return - } - if cancel && len(cn.requests) != 0 { - fillBufferSentCancels.Add(1) - for r := range cn.requests { - cn.deleteRequest(r) - // log.Printf("%p: cancelling request: %v", cn, r) - if !msg(makeCancelMessage(r)) { - return + if !cn.t.networkingEnabled { + if !cn.SetInterested(false, msg) { + return + } + if len(cn.requests) != 0 { + for r := range cn.requests { + cn.deleteRequest(r) + // log.Printf("%p: cancelling request: %v", cn, r) + if !msg(makeCancelMessage(r)) { + return + } } } } - if len(new) != 0 { - fillBufferSentRequests.Add(1) - for _, r := range new { - if !cn.request(r, msg) { - // If we didn't completely top up the requests, we shouldn't - // mark the low water, since we'll want to top up the requests - // as soon as we have more write buffer space. - return - } + if len(cn.requests) <= cn.requestsLowWater { + filledBuffer := false + cn.iterPendingPieces(func(pieceIndex int) bool { + cn.iterPendingRequests(pieceIndex, func(r request) bool { + if !cn.SetInterested(true, msg) { + filledBuffer = true + return false + } + if len(cn.requests) >= cn.nominalMaxRequests() { + return false + } + // Choking is looked at here because our interest is dependent + // on whether we'd make requests in its absence. + if cn.PeerChoked { + if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) { + return false + } + } + if _, ok := cn.requests[r]; ok { + return true + } + filledBuffer = !cn.request(r, msg) + return !filledBuffer + }) + return !filledBuffer + }) + if filledBuffer { + // If we didn't completely top up the requests, we shouldn't mark + // the low water, since we'll want to top up the requests as soon + // as we have more write buffer space. + return } cn.requestsLowWater = len(cn.requests) / 2 } + cn.upload(msg) } @@ -639,53 +660,6 @@ func (cn *connection) PostBitfield() { cn.sentHaves = cn.t.completedPieces.Copy() } -// Determines interest and requests to send to a connected peer. -func nextRequestState( - networkingEnabled bool, - currentRequests map[request]struct{}, - peerChoking bool, - iterPendingRequests func(f func(request) bool), - requestsLowWater int, - requestsHighWater int, - allowedFast bitmap.Bitmap, -) ( - cancelExisting bool, // Cancel all our pending requests - newRequests []request, // Chunks to request that we currently aren't - interested bool, // Whether we should indicate interest, even if we don't request anything -) { - if !networkingEnabled { - return true, nil, false - } - if len(currentRequests) > requestsLowWater { - return false, nil, true - } - // If we have existing requests, better maintain interest to ensure we get - // them. iterPendingRequests might not iterate over outstanding requests. - interested = len(currentRequests) != 0 - iterPendingRequests(func(r request) bool { - interested = true - if peerChoking { - if allowedFast.IsEmpty() { - return false - } - if !allowedFast.Get(int(r.Index)) { - return true - } - } - if len(currentRequests)+len(newRequests) >= requestsHighWater { - return false - } - if _, ok := currentRequests[r]; !ok { - if newRequests == nil { - newRequests = make([]request, 0, requestsHighWater-len(currentRequests)) - } - newRequests = append(newRequests, r) - } - return true - }) - return -} - func (cn *connection) updateRequests() { // log.Print("update requests") cn.tickleWriter() @@ -707,18 +681,26 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func { } } -func (cn *connection) unbiasedPieceRequestOrder() iter.Func { +func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece int) bool) bool { now, readahead := cn.t.readerPiecePriorities() var skip bitmap.Bitmap if !cn.peerSentHaveAll { - // Pieces to skip include pieces the peer doesn't have + // Pieces to skip include pieces the peer doesn't have. skip = bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces()) } // And pieces that we already have. skip.Union(cn.t.completedPieces) + skip.Union(cn.t.piecesQueuedForHash) // Return an iterator over the different priority classes, minus the skip // pieces. - return iter.Chain( + return iter.All( + func(_piece interface{}) bool { + i := _piece.(pieceIndex) + if cn.t.hashingPiece(i) { + return true + } + return f(i) + }, iterBitmapsDistinct(&skip, now, readahead), func(cb iter.Callback) { cn.t.pendingPieces.IterTyped(func(piece int) bool { @@ -756,47 +738,36 @@ func (cn *connection) shouldRequestWithoutBias() bool { return false } -func (cn *connection) pieceRequestOrderIter() iter.Func { +func (cn *connection) iterPendingPieces(f func(int) bool) bool { if cn.t.requestStrategy == 3 { - return cn.unbiasedPieceRequestOrder() + return cn.iterUnbiasedPieceRequestOrder(f) } if cn.shouldRequestWithoutBias() { - return cn.unbiasedPieceRequestOrder() + return cn.iterUnbiasedPieceRequestOrder(f) } else { - return cn.pieceRequestOrder.Iter + return cn.pieceRequestOrder.IterTyped(f) } } -func (cn *connection) iterPendingRequests(f func(request) bool) { - cn.pieceRequestOrderIter()(func(_piece interface{}) bool { - piece := _piece.(int) - return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool { - r := request{pp.Integer(piece), cs} - if cn.t.requestStrategy == 3 { - lr := cn.t.lastRequested[r] - if !lr.IsZero() { - if time.Since(lr) < cn.t.duplicateRequestTimeout { - return true - } else { - torrent.Add("requests duplicated due to timeout", 1) - } - } - } - return f(r) - }) - }) +func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) { + cn.iterPendingPieces(func(i int) bool { return f(i) }) } -func (cn *connection) desiredRequestState() (bool, []request, bool) { - return nextRequestState( - cn.t.networkingEnabled, - cn.requests, - cn.PeerChoked, - cn.iterPendingRequests, - cn.requestsLowWater, - cn.nominalMaxRequests(), - cn.peerAllowedFast, - ) +func (cn *connection) iterPendingRequests(piece int, f func(request) bool) bool { + return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool { + r := request{pp.Integer(piece), cs} + if cn.t.requestStrategy == 3 { + lr := cn.t.lastRequested[r] + if !lr.IsZero() { + if time.Since(lr) < cn.t.duplicateRequestTimeout { + return true + } else { + torrent.Add("requests duplicated due to timeout", 1) + } + } + } + return f(r) + }) } func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool { diff --git a/global.go b/global.go index 43b84ee1..bbec4b5f 100644 --- a/global.go +++ b/global.go @@ -49,8 +49,4 @@ var ( pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused") pieceInclinationsNew = expvar.NewInt("pieceInclinationsNew") pieceInclinationsPut = expvar.NewInt("pieceInclinationsPut") - - fillBufferSentCancels = expvar.NewInt("fillBufferSentCancels") - fillBufferSentRequests = expvar.NewInt("fillBufferSentRequests") - numFillBuffers = expvar.NewInt("numFillBuffers") ) diff --git a/piece.go b/piece.go index d29d1dc8..5e0c82a8 100644 --- a/piece.go +++ b/piece.go @@ -215,7 +215,7 @@ func (p *Piece) SetPriority(prio piecePriority) { } func (p *Piece) uncachedPriority() (ret piecePriority) { - if p.t.pieceComplete(p.index) { + if p.t.pieceComplete(p.index) || p.t.pieceQueuedForHash(p.index) || p.t.hashingPiece(p.index) { return PiecePriorityNone } for _, f := range p.files { diff --git a/torrent.go b/torrent.go index 5736dee5..15913979 100644 --- a/torrent.go +++ b/torrent.go @@ -1675,7 +1675,7 @@ func (t *Torrent) onIncompletePiece(piece int) { } } -func (t *Torrent) verifyPiece(piece int) { +func (t *Torrent) verifyPiece(piece pieceIndex) { cl := t.cl cl.mu.Lock() defer cl.mu.Unlock() @@ -1690,18 +1690,20 @@ func (t *Torrent) verifyPiece(piece int) { if !p.t.piecesQueuedForHash.Remove(piece) { panic("piece was not queued") } + t.updatePiecePriority(piece) if t.closed.IsSet() || t.pieceComplete(piece) { - t.updatePiecePriority(piece) return } p.hashing = true t.publishPieceChange(piece) + t.updatePiecePriority(piece) t.storageLock.RLock() cl.mu.Unlock() sum := t.hashPiece(piece) t.storageLock.RUnlock() cl.mu.Lock() p.hashing = false + t.updatePiecePriority(piece) t.pieceHashed(piece, sum == p.hash) t.publishPieceChange(piece) } @@ -1732,6 +1734,7 @@ func (t *Torrent) queuePieceCheck(pieceIndex int) { } t.piecesQueuedForHash.Add(pieceIndex) t.publishPieceChange(pieceIndex) + t.updatePiecePriority(pieceIndex) go t.verifyPiece(pieceIndex) }