Got a nice working algorithm for responsive download strategy
This commit is contained in:
parent
1ff7414869
commit
2b079e4a9d
|
@ -1228,6 +1228,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
|
|||
// Record that we have the chunk.
|
||||
delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec)
|
||||
t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement)
|
||||
me.dataReady(dataSpec{t.InfoHash, req})
|
||||
if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 {
|
||||
me.queuePieceCheck(t, req.Index)
|
||||
}
|
||||
|
@ -1236,18 +1237,14 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
|
|||
me.downloadStrategy.TorrentGotChunk(t, req)
|
||||
|
||||
// Cancel pending requests for this chunk.
|
||||
cancelled := false
|
||||
for _, c := range t.Conns {
|
||||
if me.connCancel(t, c, req) {
|
||||
cancelled = true
|
||||
me.replenishConnRequests(t, c)
|
||||
}
|
||||
}
|
||||
if cancelled {
|
||||
log.Printf("cancelled concurrent requests for %v", req)
|
||||
}
|
||||
|
||||
me.dataReady(dataSpec{t.InfoHash, req})
|
||||
me.downloadStrategy.AssertNotRequested(t, req)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -29,7 +29,7 @@ var (
|
|||
disableTrackers = flag.Bool("disableTrackers", false, "disables trackers")
|
||||
testPeer = flag.String("testPeer", "", "the address for a test peer")
|
||||
httpAddr = flag.String("httpAddr", "localhost:0", "HTTP server bind address")
|
||||
readaheadBytes = flag.Int("readaheadBytes", 10*1024*1024, "bytes to readahead in each torrent from the last read piece")
|
||||
readaheadBytes = flag.Int64("readaheadBytes", 10*1024*1024, "bytes to readahead in each torrent from the last read piece")
|
||||
testPeerAddr *net.TCPAddr
|
||||
listenAddr = flag.String("listenAddr", ":6882", "incoming connection address")
|
||||
)
|
||||
|
|
|
@ -16,12 +16,19 @@ type DownloadStrategy interface {
|
|||
TorrentGotChunk(t *torrent, r request)
|
||||
TorrentGotPiece(t *torrent, piece int)
|
||||
WriteStatus(w io.Writer)
|
||||
AssertNotRequested(*torrent, request)
|
||||
}
|
||||
|
||||
type DefaultDownloadStrategy struct {
|
||||
heat map[*torrent]map[request]int
|
||||
}
|
||||
|
||||
func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {
|
||||
if me.heat[t][r] != 0 {
|
||||
panic("outstanding requests break invariant")
|
||||
}
|
||||
}
|
||||
|
||||
func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {}
|
||||
|
||||
func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
|
||||
|
@ -100,20 +107,22 @@ func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {
|
|||
func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {}
|
||||
func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {}
|
||||
|
||||
func NewResponsiveDownloadStrategy(readahead int) *responsiveDownloadStrategy {
|
||||
func NewResponsiveDownloadStrategy(readahead int64) *responsiveDownloadStrategy {
|
||||
return &responsiveDownloadStrategy{
|
||||
Readahead: readahead,
|
||||
lastReadPiece: make(map[*torrent]int),
|
||||
priorities: make(map[*torrent]map[request]struct{}),
|
||||
Readahead: readahead,
|
||||
lastReadOffset: make(map[*torrent]int64),
|
||||
priorities: make(map[*torrent]map[request]struct{}),
|
||||
requestHeat: make(map[*torrent]map[request]int),
|
||||
}
|
||||
}
|
||||
|
||||
type responsiveDownloadStrategy struct {
|
||||
// How many bytes to preemptively download starting at the beginning of
|
||||
// the last piece read for a given torrent.
|
||||
Readahead int
|
||||
lastReadPiece map[*torrent]int
|
||||
priorities map[*torrent]map[request]struct{}
|
||||
Readahead int64
|
||||
lastReadOffset map[*torrent]int64
|
||||
priorities map[*torrent]map[request]struct{}
|
||||
requestHeat map[*torrent]map[request]int
|
||||
}
|
||||
|
||||
func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) {
|
||||
|
@ -129,69 +138,69 @@ func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) {
|
|||
|
||||
func (me *responsiveDownloadStrategy) TorrentStarted(t *torrent) {
|
||||
me.priorities[t] = make(map[request]struct{})
|
||||
me.requestHeat[t] = make(map[request]int)
|
||||
}
|
||||
|
||||
func (me *responsiveDownloadStrategy) TorrentStopped(t *torrent) {
|
||||
delete(me.lastReadPiece, t)
|
||||
delete(me.lastReadOffset, t)
|
||||
delete(me.priorities, t)
|
||||
}
|
||||
func (responsiveDownloadStrategy) DeleteRequest(*torrent, request) {}
|
||||
func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) {
|
||||
rh := me.requestHeat[t]
|
||||
if rh[r] <= 0 {
|
||||
panic("request heat invariant broken")
|
||||
}
|
||||
rh[r]--
|
||||
}
|
||||
|
||||
func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
|
||||
requestWrapper := func(req request) bool {
|
||||
if c.RequestPending(req) {
|
||||
return true
|
||||
}
|
||||
again := c.Request(req)
|
||||
if c.RequestPending(req) {
|
||||
me.requestHeat[t][req]++
|
||||
}
|
||||
return again
|
||||
}
|
||||
|
||||
for req := range me.priorities[t] {
|
||||
if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok {
|
||||
panic(req)
|
||||
}
|
||||
if !c.Request(req) {
|
||||
if !requestWrapper(req) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if len(c.Requests) >= 32 {
|
||||
if len(c.Requests) >= 16 {
|
||||
return
|
||||
}
|
||||
|
||||
// Short circuit request fills at a level that might reduce receiving of
|
||||
// unnecessary chunks.
|
||||
requestWrapper := func(r request) bool {
|
||||
if len(c.Requests) >= 64 {
|
||||
return false
|
||||
}
|
||||
return c.Request(r)
|
||||
}
|
||||
|
||||
requestPiece := func(piece int) bool {
|
||||
if piece >= t.NumPieces() {
|
||||
return true
|
||||
}
|
||||
for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs() {
|
||||
if !requestWrapper(request{pp.Integer(piece), cs}) {
|
||||
requestWrapper = func() func(request) bool {
|
||||
f := requestWrapper
|
||||
return func(req request) bool {
|
||||
if len(c.Requests) >= 32 {
|
||||
return false
|
||||
}
|
||||
return f(req)
|
||||
}
|
||||
return true
|
||||
}
|
||||
}()
|
||||
|
||||
if lastReadPiece, ok := me.lastReadPiece[t]; ok {
|
||||
readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize()
|
||||
for i := 0; i < readaheadPieces; i++ {
|
||||
if !requestPiece(lastReadPiece + i) {
|
||||
return
|
||||
if lastReadOffset, ok := me.lastReadOffset[t]; ok {
|
||||
for off := lastReadOffset; off < lastReadOffset+chunkSize-1+me.Readahead; off += chunkSize {
|
||||
req, ok := t.offsetRequest(off)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Then finish off incomplete pieces in order of bytes remaining.
|
||||
for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() {
|
||||
index := e.Value.(int)
|
||||
// Stop when we're onto untouched pieces.
|
||||
if !t.PiecePartiallyDownloaded(index) {
|
||||
break
|
||||
}
|
||||
// Request chunks in random order to reduce overlap with other
|
||||
// connections.
|
||||
for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() {
|
||||
if !requestWrapper(request{pp.Integer(index), cs}) {
|
||||
if me.requestHeat[t][req] >= 2 {
|
||||
continue
|
||||
}
|
||||
if !t.wantChunk(req) {
|
||||
continue
|
||||
}
|
||||
if !requestWrapper(req) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
@ -209,7 +218,7 @@ func (me *responsiveDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {
|
|||
}
|
||||
|
||||
func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {
|
||||
s.lastReadPiece[t] = int(off / int64(t.UsualPieceSize()))
|
||||
s.lastReadOffset[t] = off
|
||||
for _len > 0 {
|
||||
req, ok := t.offsetRequest(off)
|
||||
if !ok {
|
||||
|
@ -226,3 +235,9 @@ func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *responsiveDownloadStrategy) AssertNotRequested(t *torrent, r request) {
|
||||
if s.requestHeat[t][r] != 0 {
|
||||
panic("outstanding requests invariant broken")
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue