Do requests synchronously, and don't request from hashing or queued pieces
Calculating the desired state was a nice idea, but too hard to debug. This way should also be faster.
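Both sides of this diff lean on Go's callback-iterator convention: an iterator calls f once per item, f returns false to stop early, and the iterator reports whether it ran to completion. A minimal, self-contained sketch of that convention (iterInts is illustrative, not a function from this library):

package main

import "fmt"

// iterInts is a hypothetical stand-in for the library's piece/request
// iterators: it calls f for each value, stops as soon as f returns false,
// and reports whether iteration ran to completion.
func iterInts(n int, f func(int) bool) bool {
	for i := 0; i < n; i++ {
		if !f(i) {
			return false
		}
	}
	return true
}

func main() {
	var got []int
	completed := iterInts(10, func(i int) bool {
		got = append(got, i)
		return len(got) < 3 // Stop once we have three values.
	})
	fmt.Println(got, completed) // [0 1 2] false
}
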
parent 7dd532d46f
commit c921242f30

connection.go: 181 changed lines

@@ -303,9 +303,8 @@ func (cn *connection) WriteStatus(w io.Writer, t *Torrent) {
 		cn.statusFlags(),
 		cn.downloadRate()/(1<<10),
 	)
-	roi := cn.pieceRequestOrderIter()
 	fmt.Fprintf(w, " next pieces: %v%s\n",
-		iter.ToSlice(iter.Head(10, roi)),
+		iter.ToSlice(iter.Head(10, cn.iterPendingPiecesUntyped)),
 		func() string {
 			if cn.shouldRequestWithoutBias() {
 				return " (fastest)"

@@ -524,33 +523,55 @@ func (cn *connection) request(r request, mw messageWriter) bool {
 }
 
 func (cn *connection) fillWriteBuffer(msg func(pp.Message) bool) {
-	numFillBuffers.Add(1)
-	cancel, new, i := cn.desiredRequestState()
-	if !cn.SetInterested(i, msg) {
-		return
-	}
-	if cancel && len(cn.requests) != 0 {
-		fillBufferSentCancels.Add(1)
-		for r := range cn.requests {
-			cn.deleteRequest(r)
-			// log.Printf("%p: cancelling request: %v", cn, r)
-			if !msg(makeCancelMessage(r)) {
-				return
-			}
-		}
-	}
-	if len(new) != 0 {
-		fillBufferSentRequests.Add(1)
-		for _, r := range new {
-			if !cn.request(r, msg) {
-				// If we didn't completely top up the requests, we shouldn't
-				// mark the low water, since we'll want to top up the requests
-				// as soon as we have more write buffer space.
-				return
-			}
-		}
-	}
+	if !cn.t.networkingEnabled {
+		if !cn.SetInterested(false, msg) {
+			return
+		}
+		if len(cn.requests) != 0 {
+			for r := range cn.requests {
+				cn.deleteRequest(r)
+				// log.Printf("%p: cancelling request: %v", cn, r)
+				if !msg(makeCancelMessage(r)) {
+					return
+				}
+			}
+		}
+	}
+	if len(cn.requests) <= cn.requestsLowWater {
+		filledBuffer := false
+		cn.iterPendingPieces(func(pieceIndex int) bool {
+			cn.iterPendingRequests(pieceIndex, func(r request) bool {
+				if !cn.SetInterested(true, msg) {
+					filledBuffer = true
+					return false
+				}
+				if len(cn.requests) >= cn.nominalMaxRequests() {
+					return false
+				}
+				// Choking is looked at here because our interest is dependent
+				// on whether we'd make requests in its absence.
+				if cn.PeerChoked {
+					if !cn.peerAllowedFast.Get(bitmap.BitIndex(r.Index)) {
+						return false
+					}
+				}
+				if _, ok := cn.requests[r]; ok {
+					return true
+				}
+				filledBuffer = !cn.request(r, msg)
+				return !filledBuffer
+			})
+			return !filledBuffer
+		})
+		if filledBuffer {
+			// If we didn't completely top up the requests, we shouldn't mark
+			// the low water, since we'll want to top up the requests as soon
+			// as we have more write buffer space.
+			return
+		}
+		cn.requestsLowWater = len(cn.requests) / 2
+	}
 	cn.upload(msg)
 }

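The new fillWriteBuffer above is a nested callback iteration with an early-exit flag: the inner request loop sets filledBuffer and returns false, and the outer piece loop returns !filledBuffer so the abort propagates immediately. A runnable sketch of that control flow, under assumed stand-in types (ints for pieces, strings for requests):

package main

import "fmt"

// fill mirrors the control flow of the new fillWriteBuffer: iterate pieces,
// then the pending requests within each piece, and abort both levels as soon
// as the "write buffer" fills. All names here are illustrative.
func fill(
	iterPieces func(f func(piece int) bool) bool,
	iterRequests func(piece int, f func(r string) bool) bool,
	capacity int,
) (sent []string) {
	filledBuffer := false
	iterPieces(func(piece int) bool {
		iterRequests(piece, func(r string) bool {
			if len(sent) >= capacity {
				filledBuffer = true
				return false // Stop the inner iteration.
			}
			sent = append(sent, r)
			return true
		})
		return !filledBuffer // Propagate the abort to the outer iteration.
	})
	return
}

func main() {
	pieces := func(f func(int) bool) bool {
		for p := 0; p < 3; p++ {
			if !f(p) {
				return false
			}
		}
		return true
	}
	requests := func(piece int, f func(string) bool) bool {
		for c := 0; c < 2; c++ {
			if !f(fmt.Sprintf("piece %d chunk %d", piece, c)) {
				return false
			}
		}
		return true
	}
	fmt.Println(fill(pieces, requests, 3))
	// [piece 0 chunk 0 piece 0 chunk 1 piece 1 chunk 0]
}
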
@@ -639,53 +660,6 @@ func (cn *connection) PostBitfield() {
 	cn.sentHaves = cn.t.completedPieces.Copy()
 }
 
-// Determines interest and requests to send to a connected peer.
-func nextRequestState(
-	networkingEnabled bool,
-	currentRequests map[request]struct{},
-	peerChoking bool,
-	iterPendingRequests func(f func(request) bool),
-	requestsLowWater int,
-	requestsHighWater int,
-	allowedFast bitmap.Bitmap,
-) (
-	cancelExisting bool, // Cancel all our pending requests
-	newRequests []request, // Chunks to request that we currently aren't
-	interested bool, // Whether we should indicate interest, even if we don't request anything
-) {
-	if !networkingEnabled {
-		return true, nil, false
-	}
-	if len(currentRequests) > requestsLowWater {
-		return false, nil, true
-	}
-	// If we have existing requests, better maintain interest to ensure we get
-	// them. iterPendingRequests might not iterate over outstanding requests.
-	interested = len(currentRequests) != 0
-	iterPendingRequests(func(r request) bool {
-		interested = true
-		if peerChoking {
-			if allowedFast.IsEmpty() {
-				return false
-			}
-			if !allowedFast.Get(int(r.Index)) {
-				return true
-			}
-		}
-		if len(currentRequests)+len(newRequests) >= requestsHighWater {
-			return false
-		}
-		if _, ok := currentRequests[r]; !ok {
-			if newRequests == nil {
-				newRequests = make([]request, 0, requestsHighWater-len(currentRequests))
-			}
-			newRequests = append(newRequests, r)
-		}
-		return true
-	})
-	return
-}
-
 func (cn *connection) updateRequests() {
 	// log.Print("update requests")
 	cn.tickleWriter()

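The deleted nextRequestState was the "calculating the desired state" approach the commit message refers to: a pure function that computed the target request set, which the caller then converged on with cancels and new requests. A condensed, hypothetical sketch of that pattern, for contrast with the synchronous version above:

package main

import "fmt"

// desiredState condenses the removed pattern: compute the new requests we
// want (a pure function of its inputs, easy to test but, per the commit
// message, hard to debug in situ), and let the caller issue them afterwards.
// Names and types here are illustrative only.
func desiredState(current map[string]bool, pending []string, highWater int) (newRequests []string) {
	for _, r := range pending {
		if len(current)+len(newRequests) >= highWater {
			break
		}
		if !current[r] {
			newRequests = append(newRequests, r)
		}
	}
	return
}

func main() {
	current := map[string]bool{"c1": true}
	fmt.Println(desiredState(current, []string{"c1", "c2", "c3", "c4"}, 3))
	// [c2 c3]
}
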
@@ -707,18 +681,26 @@ func iterBitmapsDistinct(skip *bitmap.Bitmap, bms ...bitmap.Bitmap) iter.Func {
 	}
 }
 
-func (cn *connection) unbiasedPieceRequestOrder() iter.Func {
+func (cn *connection) iterUnbiasedPieceRequestOrder(f func(piece int) bool) bool {
 	now, readahead := cn.t.readerPiecePriorities()
 	var skip bitmap.Bitmap
 	if !cn.peerSentHaveAll {
-		// Pieces to skip include pieces the peer doesn't have
+		// Pieces to skip include pieces the peer doesn't have.
 		skip = bitmap.Flip(cn.peerPieces, 0, cn.t.numPieces())
 	}
 	// And pieces that we already have.
 	skip.Union(cn.t.completedPieces)
+	skip.Union(cn.t.piecesQueuedForHash)
 	// Return an iterator over the different priority classes, minus the skip
 	// pieces.
-	return iter.Chain(
+	return iter.All(
+		func(_piece interface{}) bool {
+			i := _piece.(pieceIndex)
+			if cn.t.hashingPiece(i) {
+				return true
+			}
+			return f(i)
+		},
 		iterBitmapsDistinct(&skip, now, readahead),
 		func(cb iter.Callback) {
 			cn.t.pendingPieces.IterTyped(func(piece int) bool {

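iterBitmapsDistinct (the context line above) walks several priority bitmaps while honoring a shared skip set; this hunk adds piecesQueuedForHash to that set and additionally filters hashing pieces in the callback. A map-based sketch of the distinct-iteration idea, with a hypothetical iterDistinct standing in for the bitmap version:

package main

import "fmt"

// iterDistinct walks several priority classes in order, never visiting a
// value twice and never visiting skipped values. A map[int]bool stands in
// for bitmap.Bitmap here.
func iterDistinct(skip map[int]bool, f func(int) bool, classes ...[]int) bool {
	for _, class := range classes {
		for _, v := range class {
			if skip[v] {
				continue
			}
			skip[v] = true // Mark so later classes don't repeat v.
			if !f(v) {
				return false
			}
		}
	}
	return true
}

func main() {
	skip := map[int]bool{2: true} // e.g. pieces we already have or queued for hash
	var order []int
	iterDistinct(skip, func(p int) bool {
		order = append(order, p)
		return true
	}, []int{1, 2, 3}, []int{3, 4})
	fmt.Println(order) // [1 3 4]
}
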
@@ -756,47 +738,36 @@ func (cn *connection) shouldRequestWithoutBias() bool {
 	return false
 }
 
-func (cn *connection) pieceRequestOrderIter() iter.Func {
+func (cn *connection) iterPendingPieces(f func(int) bool) bool {
 	if cn.t.requestStrategy == 3 {
-		return cn.unbiasedPieceRequestOrder()
+		return cn.iterUnbiasedPieceRequestOrder(f)
 	}
 	if cn.shouldRequestWithoutBias() {
-		return cn.unbiasedPieceRequestOrder()
+		return cn.iterUnbiasedPieceRequestOrder(f)
 	} else {
-		return cn.pieceRequestOrder.Iter
+		return cn.pieceRequestOrder.IterTyped(f)
 	}
 }
 
-func (cn *connection) iterPendingRequests(f func(request) bool) {
-	cn.pieceRequestOrderIter()(func(_piece interface{}) bool {
-		piece := _piece.(int)
-		return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
-			r := request{pp.Integer(piece), cs}
-			if cn.t.requestStrategy == 3 {
-				lr := cn.t.lastRequested[r]
-				if !lr.IsZero() {
-					if time.Since(lr) < cn.t.duplicateRequestTimeout {
-						return true
-					} else {
-						torrent.Add("requests duplicated due to timeout", 1)
-					}
-				}
-			}
-			return f(r)
-		})
-	})
-}
+func (cn *connection) iterPendingPiecesUntyped(f iter.Callback) {
+	cn.iterPendingPieces(func(i int) bool { return f(i) })
+}
 
-func (cn *connection) desiredRequestState() (bool, []request, bool) {
-	return nextRequestState(
-		cn.t.networkingEnabled,
-		cn.requests,
-		cn.PeerChoked,
-		cn.iterPendingRequests,
-		cn.requestsLowWater,
-		cn.nominalMaxRequests(),
-		cn.peerAllowedFast,
-	)
-}
+func (cn *connection) iterPendingRequests(piece int, f func(request) bool) bool {
+	return iterUndirtiedChunks(piece, cn.t, func(cs chunkSpec) bool {
+		r := request{pp.Integer(piece), cs}
+		if cn.t.requestStrategy == 3 {
+			lr := cn.t.lastRequested[r]
+			if !lr.IsZero() {
+				if time.Since(lr) < cn.t.duplicateRequestTimeout {
+					return true
+				} else {
+					torrent.Add("requests duplicated due to timeout", 1)
+				}
+			}
+		}
+		return f(r)
+	})
+}
 
 func iterUndirtiedChunks(piece int, t *Torrent, f func(chunkSpec) bool) bool {

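The request-strategy-3 branch above skips chunks that were requested too recently, only duplicating a request once duplicateRequestTimeout has elapsed. A self-contained sketch of that check (the map and names are stand-ins for Torrent.lastRequested and the timeout field; the real code uses a zero time.Time to mean "never requested"):

package main

import (
	"fmt"
	"time"
)

// shouldRequestAgain reports whether a chunk may be (re-)requested: yes if it
// was never requested, or if the last attempt is older than the timeout.
func shouldRequestAgain(lastRequested map[string]time.Time, key string, timeout time.Duration) bool {
	lr, ok := lastRequested[key]
	if !ok {
		return true // Never requested: fine to request now.
	}
	return time.Since(lr) >= timeout
}

func main() {
	last := map[string]time.Time{"piece3/chunk0": time.Now().Add(-2 * time.Second)}
	fmt.Println(shouldRequestAgain(last, "piece3/chunk0", 5*time.Second)) // false: too recent
	fmt.Println(shouldRequestAgain(last, "piece3/chunk0", time.Second))   // true: timed out
	fmt.Println(shouldRequestAgain(last, "piece4/chunk0", time.Second))   // true: never requested
}
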
@@ -49,8 +49,4 @@
 	pieceInclinationsReused = expvar.NewInt("pieceInclinationsReused")
 	pieceInclinationsNew = expvar.NewInt("pieceInclinationsNew")
 	pieceInclinationsPut = expvar.NewInt("pieceInclinationsPut")
-
-	fillBufferSentCancels = expvar.NewInt("fillBufferSentCancels")
-	fillBufferSentRequests = expvar.NewInt("fillBufferSentRequests")
-	numFillBuffers = expvar.NewInt("numFillBuffers")
 )

piece.go: 2 changed lines

@@ -215,7 +215,7 @@ func (p *Piece) SetPriority(prio piecePriority) {
 }
 
 func (p *Piece) uncachedPriority() (ret piecePriority) {
-	if p.t.pieceComplete(p.index) {
+	if p.t.pieceComplete(p.index) || p.t.pieceQueuedForHash(p.index) || p.t.hashingPiece(p.index) {
 		return PiecePriorityNone
 	}
 	for _, f := range p.files {

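This widened guard is where "don't request from hashing or queued pieces" takes effect for priorities: such pieces report PiecePriorityNone, so the request iterators never offer them. A trivial sketch with boolean stand-ins for the three predicates:

package main

import "fmt"

// priorityNone mirrors the widened guard in uncachedPriority: any of the
// three conditions zeroes the piece's priority.
func priorityNone(complete, queuedForHash, hashing bool) bool {
	return complete || queuedForHash || hashing
}

func main() {
	fmt.Println(priorityNone(false, true, false))  // true: queued for hash
	fmt.Println(priorityNone(false, false, false)) // false: still requestable
}
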
@@ -1675,7 +1675,7 @@ func (t *Torrent) onIncompletePiece(piece int) {
 	}
 }
 
-func (t *Torrent) verifyPiece(piece int) {
+func (t *Torrent) verifyPiece(piece pieceIndex) {
 	cl := t.cl
 	cl.mu.Lock()
 	defer cl.mu.Unlock()

@@ -1690,18 +1690,20 @@ func (t *Torrent) verifyPiece(piece int) {
 	if !p.t.piecesQueuedForHash.Remove(piece) {
 		panic("piece was not queued")
 	}
-	t.updatePiecePriority(piece)
 	if t.closed.IsSet() || t.pieceComplete(piece) {
+		t.updatePiecePriority(piece)
 		return
 	}
 	p.hashing = true
 	t.publishPieceChange(piece)
+	t.updatePiecePriority(piece)
 	t.storageLock.RLock()
 	cl.mu.Unlock()
 	sum := t.hashPiece(piece)
 	t.storageLock.RUnlock()
 	cl.mu.Lock()
 	p.hashing = false
+	t.updatePiecePriority(piece)
 	t.pieceHashed(piece, sum == p.hash)
 	t.publishPieceChange(piece)
 }

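The reshuffled updatePiecePriority calls sit around verifyPiece's lock hand-off: the client mutex is dropped while hashing so other connections keep making progress, while a storage read lock prevents the storage from being closed mid-hash. A runnable sketch of that hand-off (all names are stand-ins for the Client/Torrent fields):

package main

import (
	"fmt"
	"sync"
)

// hashWhileUnlocked sketches the lock choreography in verifyPiece. The
// caller holds clientMu, mirroring verifyPiece's precondition.
func hashWhileUnlocked(clientMu *sync.Mutex, storageMu *sync.RWMutex, hash func() string) string {
	storageMu.RLock() // Storage can't be closed while we read from it.
	clientMu.Unlock() // Let other client work proceed during the slow hash.
	sum := hash()
	storageMu.RUnlock()
	clientMu.Lock() // Reacquire before touching shared state again.
	return sum
}

func main() {
	var clientMu sync.Mutex
	var storageMu sync.RWMutex
	clientMu.Lock()
	fmt.Println(hashWhileUnlocked(&clientMu, &storageMu, func() string { return "deadbeef" }))
	clientMu.Unlock()
}
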
@@ -1732,6 +1734,7 @@ func (t *Torrent) queuePieceCheck(pieceIndex int) {
 	}
 	t.piecesQueuedForHash.Add(pieceIndex)
 	t.publishPieceChange(pieceIndex)
+	t.updatePiecePriority(pieceIndex)
 	go t.verifyPiece(pieceIndex)
 }