Great complexifying of the responsive download strategy

Should be better after 4 days of experimentation...
Matt Joiner 2014-08-28 10:04:00 +10:00
parent 7e19c9c12b
commit 1507d803bd
1 changed file with 129 additions and 67 deletions


@ -1,8 +1,10 @@
package torrent
import (
"container/heap"
"fmt"
"io"
"math/rand"
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
)
@ -64,8 +66,7 @@ func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
for _, heatThreshold := range []int{1, 4, 15, 60} {
for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
pieceIndex := pp.Integer(e.Value.(int))
for _, chunkSpec := range t.Pieces[pieceIndex].shuffledPendingChunkSpecs() {
// for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
r := request{pieceIndex, chunkSpec}
if th[r] >= heatThreshold {
continue
@ -113,6 +114,7 @@ func NewResponsiveDownloadStrategy(readahead int64) *responsiveDownloadStrategy
lastReadOffset: make(map[*torrent]int64),
priorities: make(map[*torrent]map[request]struct{}),
requestHeat: make(map[*torrent]map[request]int),
rand: rand.New(rand.NewSource(1337)),
}
}
@ -123,6 +125,7 @@ type responsiveDownloadStrategy struct {
lastReadOffset map[*torrent]int64
priorities map[*torrent]map[request]struct{}
requestHeat map[*torrent]map[request]int
rand *rand.Rand // Avoid global lock
}
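
The new rand field exists to avoid the mutex that guards math/rand's package-level source. A minimal standalone sketch of the distinction (not part of the diff):

package main

import (
    "fmt"
    "math/rand"
)

func main() {
    // Package-level helpers such as rand.Perm share one source behind a
    // global mutex, so concurrent callers contend on that lock.
    fmt.Println(rand.Perm(5))

    // A dedicated *rand.Rand owns its source and takes no global lock; the
    // trade-off is that it is not safe for concurrent use on its own.
    r := rand.New(rand.NewSource(1337))
    fmt.Println(r.Perm(5))
}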
func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) {
@ -153,93 +156,152 @@ func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) {
rh[r]--
}
func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
th := me.requestHeat[t]
requestWrapper := func(req request) bool {
if c.RequestPending(req) {
return true
}
again := c.Request(req)
if c.RequestPending(req) {
th[req]++
}
return again
}
type requestFiller struct {
c *connection
t *torrent
s *responsiveDownloadStrategy
}
for req := range me.priorities[t] {
if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
// Wrapper around connection.Request that tracks request heat.
func (me *requestFiller) request(req request) bool {
if me.c.RequestPending(req) {
return true
}
again := me.c.Request(req)
if me.c.RequestPending(req) {
me.s.requestHeat[me.t][req]++
}
return again
}
// Adds additional constraints around the request heat wrapper.
func (me *requestFiller) conservativelyRequest(req request) bool {
again := me.request(req)
if len(me.c.Requests) >= 50 {
return false
}
return again
}
// Fill priority requests.
func (me *requestFiller) priorities() bool {
for req := range me.s.priorities[me.t] {
if _, ok := me.t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
panic(req)
}
if !requestWrapper(req) {
return
if !me.request(req) {
return false
}
}
return true
}
if len(c.Requests) >= 16 {
// Fill requests, with all contextual information available in the receiver.
func (me requestFiller) Run() {
if !me.priorities() {
return
}
requestWrapper = func() func(request) bool {
f := requestWrapper
return func(req request) bool {
if len(c.Requests) >= 32 {
return false
}
return f(req)
}
}()
if lastReadOffset, ok := me.lastReadOffset[t]; ok {
var nextAhead int64
for ahead := int64(0); ahead < me.Readahead; ahead = nextAhead {
off := lastReadOffset + ahead
req, ok := t.offsetRequest(off)
if !ok {
break
}
if !t.wantPiece(int(req.Index)) {
nextAhead = ahead + int64(t.PieceLength(req.Index))
continue
}
nextAhead = ahead + int64(req.Length)
if !t.wantChunk(req) {
continue
}
if th[req] >= func() int {
// Determine allowed redundancy based on how far into the
// readahead zone we're looking.
if ahead >= (2*me.Readahead+2)/3 {
return 1
} else if ahead >= (me.Readahead+2)/3 {
return 2
} else {
return 3
}
}() {
continue
}
if !requestWrapper(req) {
return
}
}
if len(me.c.Requests) > 25 {
return
}
if !me.readahead() {
return
}
if len(me.c.Requests) > 0 {
return
}
me.completePartial()
}
// t.assertIncompletePiecesByBytesLeftOrdering()
// Request partial pieces that aren't in the readahead zone.
func (me *requestFiller) completePartial() bool {
t := me.t
th := me.s.requestHeat[t]
for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
p := e.Value.(int)
if !t.PiecePartiallyDownloaded(p) && int(t.PieceLength(pp.Integer(p))) == t.UsualPieceSize() {
break
}
if lastReadOffset, ok := me.s.lastReadOffset[t]; ok {
if p >= int(lastReadOffset/int64(t.UsualPieceSize())) {
if int64(p+1)*int64(t.UsualPieceSize()) < lastReadOffset+me.s.Readahead {
continue
}
}
}
for chunkSpec := range t.Pieces[p].PendingChunkSpecs {
r := request{pp.Integer(p), chunkSpec}
if th[r] >= 2 {
if th[r] >= 1 {
continue
}
if !requestWrapper(r) {
return
if !me.conservativelyRequest(r) {
return false
}
}
}
return true
}
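
completePartial skips any piece that lies wholly inside the readahead window, leaving it for the readahead pass. A hedged sketch of that window arithmetic; the helper name and the sizes below are illustrative, not taken from the diff:

package main

import "fmt"

// insideReadaheadZone mirrors the skip condition above: piece p is skipped when
// it starts at or after the piece containing the last read offset and ends
// before the end of the readahead window.
func insideReadaheadZone(p int, pieceSize, lastReadOffset, readahead int64) bool {
    return p >= int(lastReadOffset/pieceSize) &&
        int64(p+1)*pieceSize < lastReadOffset+readahead
}

func main() {
    const pieceSize = 256 << 10 // 256 KiB pieces, illustrative only
    lastRead := int64(1 << 20)  // reader last touched offset 1 MiB
    readahead := int64(1 << 20) // 1 MiB readahead window
    for p := 0; p < 9; p++ {
        fmt.Println(p, insideReadaheadZone(p, pieceSize, lastRead, readahead))
    }
}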
// Returns all wanted chunk specs in the readahead zone.
func (me *requestFiller) pendingReadaheadChunks() (ret []request) {
t := me.t
lastReadOffset, ok := me.s.lastReadOffset[t]
if !ok {
return
}
ret = make([]request, 0, (me.s.Readahead+chunkSize-1)/chunkSize)
for pi := int(lastReadOffset / int64(t.UsualPieceSize())); pi < t.NumPieces() && int64(pi)*int64(t.UsualPieceSize()) < lastReadOffset+me.s.Readahead; pi++ {
if !t.wantPiece(pi) || !me.c.PeerHasPiece(pp.Integer(pi)) {
continue
}
for cs := range t.Pieces[pi].PendingChunkSpecs {
r := request{pp.Integer(pi), cs}
if _, ok := me.c.Requests[r]; ok {
continue
}
if off := t.requestOffset(r); off < lastReadOffset || off >= lastReadOffset+me.s.Readahead {
continue
}
ret = append(ret, r)
}
}
return
}
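
pendingReadaheadChunks depends on translating byte offsets into (piece, chunk) requests. A rough sketch of that mapping under a fixed piece/chunk layout; the types and offsetToRequest below are simplified stand-ins, not the library's offsetRequest:

package main

import "fmt"

const chunkSize = 16 << 10 // 16 KiB, the customary BitTorrent request size

type chunkSpec struct{ Begin, Length int64 }

type request struct {
    Index int
    chunkSpec
}

// offsetToRequest maps a torrent-level byte offset to the chunk request that
// contains it, assuming equal-sized pieces (the shorter final piece is ignored).
func offsetToRequest(off, pieceSize int64) request {
    index := int(off / pieceSize)
    begin := (off % pieceSize) / chunkSize * chunkSize
    return request{index, chunkSpec{begin, chunkSize}}
}

func main() {
    // With 256 KiB pieces, offset 1.5 MiB lands in piece 6 at chunk offset 0.
    fmt.Println(offsetToRequest(3<<19, 256<<10))
}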
// Min-heap of int.
type intHeap []int
func (h intHeap) Len() int { return len(h) }
func (h intHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h intHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}
func (me *requestFiller) readahead() bool {
rr := me.pendingReadaheadChunks()
if len(rr) == 0 {
return true
}
// Produce a partially sorted random permutation into the readahead chunks to somewhat preserve order while reducing wasted chunks due to overlap with other peers.
ii := new(intHeap)
*ii = me.s.rand.Perm(len(rr))
heap.Init(ii)
for _, i := range *ii {
if !me.conservativelyRequest(rr[i]) {
return false
}
}
return true
}
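
readahead heapifies a random permutation and then ranges over the heap's backing slice, so earlier chunks in the zone tend to be requested first while the exact order stays shuffled across connections. A standalone sketch of that partial ordering (reusing the intHeap from this diff):

package main

import (
    "container/heap"
    "fmt"
    "math/rand"
)

type intHeap []int

func (h intHeap) Len() int            { return len(h) }
func (h intHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() interface{} {
    old := *h
    n := len(old)
    x := old[n-1]
    *h = old[:n-1]
    return x
}

func main() {
    r := rand.New(rand.NewSource(1337))
    ii := intHeap(r.Perm(16))
    heap.Init(&ii)
    // After Init the slice satisfies the heap property (parent <= children):
    // small indices cluster toward the front without being fully sorted.
    fmt.Println(ii)
}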
func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
(requestFiller{c, t, me}).Run()
}
func (me *responsiveDownloadStrategy) TorrentGotChunk(t *torrent, req request) {