Optimize piece priorities when reader position changes
Gives a decent boost to throughput and reduces a lot of CPU when reading very quickly from Reader.
This commit is contained in:
parent
b81913c860
commit
0e221dbdcd
1
TODO
1
TODO
|
@ -13,4 +13,3 @@
|
|||
* Implement BEP 40.
|
||||
* Rewrite tracker package to be announce-centric, rather than client. Currently the clients are private and adapted onto by the Announce() func.
|
||||
* Move tracker management code in the torrent package to its own file.
|
||||
* Optimize Reader.posChanged, it triggers all piece priorities to be recomputed.
|
||||
|
|
28
reader.go
28
reader.go
|
@ -11,7 +11,13 @@ import (
|
|||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// Accesses torrent data via a client.
|
||||
// Piece range by piece index, [begin, end).
|
||||
type pieceRange struct {
|
||||
begin, end int
|
||||
}
|
||||
|
||||
// Accesses Torrent data via a Client. Reads block until the data is
|
||||
// available. Seeks and readahead also drive Client behaviour.
|
||||
type Reader struct {
|
||||
t *Torrent
|
||||
responsive bool
|
||||
|
@ -24,6 +30,10 @@ type Reader struct {
|
|||
mu sync.Locker
|
||||
pos int64
|
||||
readahead int64
|
||||
// The cached piece range this reader wants downloaded. The zero value
|
||||
// corresponds to nothing. We cache this so that changes can be detected,
|
||||
// and bubbled up to the Torrent only as required.
|
||||
pieces pieceRange
|
||||
}
|
||||
|
||||
var _ io.ReadCloser = &Reader{}
|
||||
|
@ -92,6 +102,17 @@ func (r *Reader) waitReadable(off int64) {
|
|||
r.t.cl.event.Wait()
|
||||
}
|
||||
|
||||
// Calculates the pieces this reader wants downloaded, ignoring the cached
|
||||
// value at r.pieces.
|
||||
func (r *Reader) piecesUncached() (ret pieceRange) {
|
||||
ra := r.readahead
|
||||
if ra < 1 {
|
||||
ra = 1
|
||||
}
|
||||
ret.begin, ret.end = r.t.byteRegionPieces(r.pos, ra)
|
||||
return
|
||||
}
|
||||
|
||||
func (r *Reader) Read(b []byte) (n int, err error) {
|
||||
return r.ReadContext(b, context.Background())
|
||||
}
|
||||
|
@ -193,6 +214,11 @@ func (r *Reader) Close() error {
|
|||
}
|
||||
|
||||
func (r *Reader) posChanged() {
|
||||
p := r.piecesUncached()
|
||||
if p == r.pieces {
|
||||
return
|
||||
}
|
||||
r.pieces = p
|
||||
r.t.readersChanged()
|
||||
}
|
||||
|
||||
|
|
1
t.go
1
t.go
|
@ -36,6 +36,7 @@ func (t *Torrent) NewReader() (ret *Reader) {
|
|||
t: t,
|
||||
readahead: 5 * 1024 * 1024,
|
||||
}
|
||||
ret.pieces = ret.piecesUncached()
|
||||
t.addReader(ret)
|
||||
return
|
||||
}
|
||||
|
|
13
torrent.go
13
torrent.go
|
@ -872,6 +872,7 @@ func (t *Torrent) updatePiecePriorities() {
|
|||
}
|
||||
}
|
||||
|
||||
// Returns the range of pieces [begin, end) that contains the extent of bytes.
|
||||
func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
|
||||
if off >= t.length {
|
||||
return
|
||||
|
@ -896,17 +897,11 @@ func (t *Torrent) byteRegionPieces(off, size int64) (begin, end int) {
|
|||
// callers depend on this method to enumerate readers.
|
||||
func (t *Torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
|
||||
for r := range t.readers {
|
||||
// r.mu.Lock()
|
||||
pos, readahead := r.pos, r.readahead
|
||||
// r.mu.Unlock()
|
||||
if readahead < 1 {
|
||||
readahead = 1
|
||||
}
|
||||
begin, end := t.byteRegionPieces(pos, readahead)
|
||||
if begin >= end {
|
||||
p := r.pieces
|
||||
if p.begin >= p.end {
|
||||
continue
|
||||
}
|
||||
if !f(begin, end) {
|
||||
if !f(p.begin, p.end) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue