From 2b079e4a9dc808f3058fcbde76fd38ab9fef6c21 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 24 Aug 2014 03:08:11 +1000 Subject: [PATCH] Got a nice working algorithm for responsive download strategy --- client.go | 9 ++-- cmd/torrentfs/main.go | 2 +- download_strategies.go | 111 +++++++++++++++++++++++------------------ 3 files changed, 67 insertions(+), 55 deletions(-) diff --git a/client.go b/client.go index b1dc099e..7a11c24a 100644 --- a/client.go +++ b/client.go @@ -1228,6 +1228,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er // Record that we have the chunk. delete(t.Pieces[req.Index].PendingChunkSpecs, req.chunkSpec) t.PiecesByBytesLeft.ValueChanged(t.Pieces[req.Index].bytesLeftElement) + me.dataReady(dataSpec{t.InfoHash, req}) if len(t.Pieces[req.Index].PendingChunkSpecs) == 0 { me.queuePieceCheck(t, req.Index) } @@ -1236,18 +1237,14 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er me.downloadStrategy.TorrentGotChunk(t, req) // Cancel pending requests for this chunk. - cancelled := false for _, c := range t.Conns { if me.connCancel(t, c, req) { - cancelled = true me.replenishConnRequests(t, c) } } - if cancelled { - log.Printf("cancelled concurrent requests for %v", req) - } - me.dataReady(dataSpec{t.InfoHash, req}) + me.downloadStrategy.AssertNotRequested(t, req) + return nil } diff --git a/cmd/torrentfs/main.go b/cmd/torrentfs/main.go index 1b3d9797..d542c717 100644 --- a/cmd/torrentfs/main.go +++ b/cmd/torrentfs/main.go @@ -29,7 +29,7 @@ var ( disableTrackers = flag.Bool("disableTrackers", false, "disables trackers") testPeer = flag.String("testPeer", "", "the address for a test peer") httpAddr = flag.String("httpAddr", "localhost:0", "HTTP server bind address") - readaheadBytes = flag.Int("readaheadBytes", 10*1024*1024, "bytes to readahead in each torrent from the last read piece") + readaheadBytes = flag.Int64("readaheadBytes", 10*1024*1024, "bytes to readahead in each torrent from the last read piece") testPeerAddr *net.TCPAddr listenAddr = flag.String("listenAddr", ":6882", "incoming connection address") ) diff --git a/download_strategies.go b/download_strategies.go index 476f607c..db31bc3a 100644 --- a/download_strategies.go +++ b/download_strategies.go @@ -16,12 +16,19 @@ type DownloadStrategy interface { TorrentGotChunk(t *torrent, r request) TorrentGotPiece(t *torrent, piece int) WriteStatus(w io.Writer) + AssertNotRequested(*torrent, request) } type DefaultDownloadStrategy struct { heat map[*torrent]map[request]int } +func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) { + if me.heat[t][r] != 0 { + panic("outstanding requests break invariant") + } +} + func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {} func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) { @@ -100,20 +107,22 @@ func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) { func (me *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {} func (*DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {} -func NewResponsiveDownloadStrategy(readahead int) *responsiveDownloadStrategy { +func NewResponsiveDownloadStrategy(readahead int64) *responsiveDownloadStrategy { return &responsiveDownloadStrategy{ - Readahead: readahead, - lastReadPiece: make(map[*torrent]int), - priorities: make(map[*torrent]map[request]struct{}), + Readahead: readahead, + lastReadOffset: make(map[*torrent]int64), + priorities: make(map[*torrent]map[request]struct{}), + requestHeat: make(map[*torrent]map[request]int), } } type responsiveDownloadStrategy struct { // How many bytes to preemptively download starting at the beginning of // the last piece read for a given torrent. - Readahead int - lastReadPiece map[*torrent]int - priorities map[*torrent]map[request]struct{} + Readahead int64 + lastReadOffset map[*torrent]int64 + priorities map[*torrent]map[request]struct{} + requestHeat map[*torrent]map[request]int } func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) { @@ -129,69 +138,69 @@ func (me *responsiveDownloadStrategy) WriteStatus(w io.Writer) { func (me *responsiveDownloadStrategy) TorrentStarted(t *torrent) { me.priorities[t] = make(map[request]struct{}) + me.requestHeat[t] = make(map[request]int) } func (me *responsiveDownloadStrategy) TorrentStopped(t *torrent) { - delete(me.lastReadPiece, t) + delete(me.lastReadOffset, t) delete(me.priorities, t) } -func (responsiveDownloadStrategy) DeleteRequest(*torrent, request) {} +func (me *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) { + rh := me.requestHeat[t] + if rh[r] <= 0 { + panic("request heat invariant broken") + } + rh[r]-- +} func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) { + requestWrapper := func(req request) bool { + if c.RequestPending(req) { + return true + } + again := c.Request(req) + if c.RequestPending(req) { + me.requestHeat[t][req]++ + } + return again + } + for req := range me.priorities[t] { if _, ok := t.Pieces[req.Index].PendingChunkSpecs[req.chunkSpec]; !ok { panic(req) } - if !c.Request(req) { + if !requestWrapper(req) { return } } - if len(c.Requests) >= 32 { + if len(c.Requests) >= 16 { return } - // Short circuit request fills at a level that might reduce receiving of - // unnecessary chunks. - requestWrapper := func(r request) bool { - if len(c.Requests) >= 64 { - return false - } - return c.Request(r) - } - - requestPiece := func(piece int) bool { - if piece >= t.NumPieces() { - return true - } - for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs() { - if !requestWrapper(request{pp.Integer(piece), cs}) { + requestWrapper = func() func(request) bool { + f := requestWrapper + return func(req request) bool { + if len(c.Requests) >= 32 { return false } + return f(req) } - return true - } + }() - if lastReadPiece, ok := me.lastReadPiece[t]; ok { - readaheadPieces := (me.Readahead + t.UsualPieceSize() - 1) / t.UsualPieceSize() - for i := 0; i < readaheadPieces; i++ { - if !requestPiece(lastReadPiece + i) { - return + if lastReadOffset, ok := me.lastReadOffset[t]; ok { + for off := lastReadOffset; off < lastReadOffset+chunkSize-1+me.Readahead; off += chunkSize { + req, ok := t.offsetRequest(off) + if !ok { + break } - } - } - - // Then finish off incomplete pieces in order of bytes remaining. - for e := t.PiecesByBytesLeft.Front(); e != nil; e = e.Next() { - index := e.Value.(int) - // Stop when we're onto untouched pieces. - if !t.PiecePartiallyDownloaded(index) { - break - } - // Request chunks in random order to reduce overlap with other - // connections. - for _, cs := range t.Pieces[index].shuffledPendingChunkSpecs() { - if !requestWrapper(request{pp.Integer(index), cs}) { + if me.requestHeat[t][req] >= 2 { + continue + } + if !t.wantChunk(req) { + continue + } + if !requestWrapper(req) { return } } @@ -209,7 +218,7 @@ func (me *responsiveDownloadStrategy) TorrentGotPiece(t *torrent, piece int) { } func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) { - s.lastReadPiece[t] = int(off / int64(t.UsualPieceSize())) + s.lastReadOffset[t] = off for _len > 0 { req, ok := t.offsetRequest(off) if !ok { @@ -226,3 +235,9 @@ func (s *responsiveDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int } } } + +func (s *responsiveDownloadStrategy) AssertNotRequested(t *torrent, r request) { + if s.requestHeat[t][r] != 0 { + panic("outstanding requests invariant broken") + } +}