2014-06-30 00:22:05 +10:00
package torrent
import (
2014-08-28 10:04:00 +10:00
"container/heap"
2014-08-22 17:33:17 +10:00
"fmt"
"io"
2014-08-28 10:04:00 +10:00
"math/rand"
2014-08-21 21:08:56 +10:00
pp "bitbucket.org/anacrolix/go.torrent/peer_protocol"
2014-06-30 00:22:05 +10:00
)
type DownloadStrategy interface {
2014-08-25 22:14:10 +10:00
FillRequests ( * torrent , * connection )
TorrentStarted ( * torrent )
TorrentStopped ( * torrent )
DeleteRequest ( * torrent , request )
2014-07-24 13:42:31 +10:00
TorrentPrioritize ( t * torrent , off , _len int64 )
2014-08-25 22:14:10 +10:00
TorrentGotChunk ( * torrent , request )
2014-07-24 13:42:31 +10:00
TorrentGotPiece ( t * torrent , piece int )
2014-08-22 17:33:17 +10:00
WriteStatus ( w io . Writer )
2014-08-24 03:08:11 +10:00
AssertNotRequested ( * torrent , request )
2014-06-30 00:22:05 +10:00
}
// DefaultDownloadStrategy requests chunks of incomplete pieces while keeping
// count of how many requests are outstanding for every chunk.
type DefaultDownloadStrategy struct {
	// heat holds, per started torrent, the number of pending requests for
	// each chunk. It is allocated lazily by TorrentStarted.
	heat map[*torrent]map[request]int
}
2014-08-24 03:08:11 +10:00
// AssertNotRequested panics when the heat recorded for r on t is non-zero,
// i.e. when requests for that chunk are still counted as outstanding.
func (s *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {
	if outstanding := s.heat[t][r]; outstanding != 0 {
		panic("outstanding requests break invariant")
	}
}
2014-08-22 17:33:17 +10:00
// WriteStatus satisfies DownloadStrategy; this strategy reports nothing.
func (s *DefaultDownloadStrategy) WriteStatus(w io.Writer) {
	// Intentionally empty.
}
2014-06-30 00:22:05 +10:00
func ( s * DefaultDownloadStrategy ) FillRequests ( t * torrent , c * connection ) {
2014-07-01 00:03:07 +10:00
if c . Interested {
if c . PeerChoked {
return
}
2014-07-16 17:11:45 +10:00
if len ( c . Requests ) >= ( c . PeerMaxRequests + 1 ) / 2 {
2014-07-01 00:03:07 +10:00
return
}
}
2014-06-30 00:22:05 +10:00
th := s . heat [ t ]
addRequest := func ( req request ) ( again bool ) {
piece := t . Pieces [ req . Index ]
if piece . Hashing || piece . QueuedForHash {
// We can't be sure we want this.
return true
}
if piece . Complete ( ) {
// We already have this.
return true
}
if c . RequestPending ( req ) {
return true
}
again = c . Request ( req )
if c . RequestPending ( req ) {
th [ req ] ++
}
return
}
// Then finish off incomplete pieces in order of bytes remaining.
2014-07-16 17:11:45 +10:00
for _ , heatThreshold := range [ ] int { 1 , 4 , 15 , 60 } {
2014-08-25 05:31:34 +10:00
for e := t . IncompletePiecesByBytesLeft . Front ( ) ; e != nil ; e = e . Next ( ) {
2014-07-10 00:26:58 +10:00
pieceIndex := pp . Integer ( e . Value . ( int ) )
2014-08-28 10:04:00 +10:00
for chunkSpec := range t . Pieces [ pieceIndex ] . PendingChunkSpecs {
2014-06-30 00:22:05 +10:00
r := request { pieceIndex , chunkSpec }
2014-07-16 17:11:45 +10:00
if th [ r ] >= heatThreshold {
2014-06-30 00:22:05 +10:00
continue
}
2014-07-10 00:26:58 +10:00
if ! addRequest ( r ) {
2014-06-30 00:22:05 +10:00
return
}
}
}
}
}
// TorrentStarted creates the heat table for t, allocating the outer map on
// first use. It panics if t already has a heat table.
func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {
	if existing := s.heat[t]; existing != nil {
		panic("torrent already started")
	}
	if s.heat == nil {
		s.heat = make(map[*torrent]map[request]int, 10)
	}
	// Sized for one entry per chunk of the torrent.
	s.heat[t] = make(map[request]int, t.ChunkCount())
}
// TorrentStopped discards the heat table kept for t. It panics if t has no
// entry, i.e. TorrentStarted was never called for it.
func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) {
	_, started := s.heat[t]
	if !started {
		panic("torrent not yet started")
	}
	delete(s.heat, t)
}
// DeleteRequest lowers the heat of r on t by one. It panics if no request
// for r is currently counted.
func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
	heat := s.heat[t]
	if heat[r] < 1 {
		panic("no pending requests")
	}
	heat[r]--
}
2014-07-24 13:42:31 +10:00
// TorrentGotChunk satisfies DownloadStrategy; this strategy ignores the event.
func (s *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {
	// Intentionally empty.
}
// TorrentGotPiece satisfies DownloadStrategy; this strategy ignores the event.
func (s *DefaultDownloadStrategy) TorrentGotPiece(t *torrent, piece int) {
	// Intentionally empty.
}
// TorrentPrioritize satisfies DownloadStrategy; this strategy has no notion
// of priority and ignores the call.
func (s *DefaultDownloadStrategy) TorrentPrioritize(t *torrent, off, _len int64) {
	// Intentionally empty.
}
2014-08-24 03:08:11 +10:00
func NewResponsiveDownloadStrategy ( readahead int64 ) * responsiveDownloadStrategy {
2014-07-24 13:42:31 +10:00
return & responsiveDownloadStrategy {
2014-08-24 03:08:11 +10:00
Readahead : readahead ,
lastReadOffset : make ( map [ * torrent ] int64 ) ,
priorities : make ( map [ * torrent ] map [ request ] struct { } ) ,
requestHeat : make ( map [ * torrent ] map [ request ] int ) ,
2014-08-28 10:04:00 +10:00
rand : rand . New ( rand . NewSource ( 1337 ) ) ,
2014-07-24 13:42:31 +10:00
}
}
type responsiveDownloadStrategy struct {
2014-06-30 00:22:05 +10:00
// How many bytes to preemptively download starting at the beginning of
// the last piece read for a given torrent.
2014-08-24 03:08:11 +10:00
Readahead int64
lastReadOffset map [ * torrent ] int64
priorities map [ * torrent ] map [ request ] struct { }
requestHeat map [ * torrent ] map [ request ] int
2014-08-28 10:04:00 +10:00
rand * rand . Rand // Avoid global lock
2014-08-22 17:33:17 +10:00
}
func ( me * responsiveDownloadStrategy ) WriteStatus ( w io . Writer ) {
fmt . Fprintf ( w , "Priorities:\n" )
for t , pp := range me . priorities {
fmt . Fprintf ( w , "\t%s:" , t . Name ( ) )
for r := range pp {
2014-08-25 22:14:10 +10:00
fmt . Fprintf ( w , " %v" , r )
2014-08-22 17:33:17 +10:00
}
fmt . Fprintln ( w )
}
2014-06-30 00:22:05 +10:00
}
2014-07-24 13:42:31 +10:00
func ( me * responsiveDownloadStrategy ) TorrentStarted ( t * torrent ) {
2014-08-22 17:35:15 +10:00
me . priorities [ t ] = make ( map [ request ] struct { } )
2014-08-24 03:08:11 +10:00
me . requestHeat [ t ] = make ( map [ request ] int )
2014-07-24 13:42:31 +10:00
}
func ( me * responsiveDownloadStrategy ) TorrentStopped ( t * torrent ) {
2014-08-24 03:08:11 +10:00
delete ( me . lastReadOffset , t )
2014-07-24 13:42:31 +10:00
delete ( me . priorities , t )
}
2014-08-24 03:08:11 +10:00
// DeleteRequest lowers the request heat of r on t by one. It panics if no
// request for r is currently counted.
func (s *responsiveDownloadStrategy) DeleteRequest(t *torrent, r request) {
	heat := s.requestHeat[t]
	if heat[r] < 1 {
		panic("request heat invariant broken")
	}
	heat[r]--
}
2014-06-30 00:22:05 +10:00
2014-08-28 10:04:00 +10:00
// requestFiller bundles everything needed to fill requests for a single
// connection of a single torrent under the responsive strategy.
type requestFiller struct {
	c *connection                 // connection the requests are issued on
	t *torrent                    // torrent the connection belongs to
	s *responsiveDownloadStrategy // strategy owning priorities and heat
}
// request issues req on the connection and, if it ends up pending there,
// bumps the strategy's heat for it. A request that is already pending is
// left alone. The result reports whether more requests may be attempted.
func (f *requestFiller) request(req request) bool {
	conn := f.c
	if conn.RequestPending(req) {
		return true
	}
	more := conn.Request(req)
	if conn.RequestPending(req) {
		f.s.requestHeat[f.t][req]++
	}
	return more
}
// Adds additional constraints around the request heat wrapper.
func ( me * requestFiller ) conservativelyRequest ( req request ) bool {
again := me . request ( req )
if len ( me . c . Requests ) >= 50 {
return false
2014-08-24 03:08:11 +10:00
}
2014-08-28 10:04:00 +10:00
return again
}
2014-08-24 03:08:11 +10:00
2014-08-28 10:04:00 +10:00
// Fill priority requests.
func ( me * requestFiller ) priorities ( ) bool {
for req := range me . s . priorities [ me . t ] {
if _ , ok := me . t . Pieces [ req . Index ] . PendingChunkSpecs [ req . chunkSpec ] ; ! ok {
2014-08-22 17:35:15 +10:00
panic ( req )
}
2014-08-28 10:04:00 +10:00
if ! me . request ( req ) {
return false
2014-08-22 17:35:15 +10:00
}
}
2014-08-28 10:04:00 +10:00
return true
}
2014-08-22 17:35:15 +10:00
2014-08-28 10:04:00 +10:00
// Fill requests, with all contextual information available in the receiver.
func ( me requestFiller ) Run ( ) {
if ! me . priorities ( ) {
2014-07-24 13:42:31 +10:00
return
}
2014-08-28 10:04:00 +10:00
if len ( me . c . Requests ) > 25 {
return
}
if ! me . readahead ( ) {
return
}
if len ( me . c . Requests ) > 0 {
return
}
me . completePartial ( )
}
2014-07-24 13:42:31 +10:00
2014-08-28 10:04:00 +10:00
// Request partial pieces that aren't in the readahead zone.
func ( me * requestFiller ) completePartial ( ) bool {
t := me . t
th := me . s . requestHeat [ t ]
for e := t . IncompletePiecesByBytesLeft . Front ( ) ; e != nil ; e = e . Next ( ) {
p := e . Value . ( int )
if ! t . PiecePartiallyDownloaded ( p ) && int ( t . PieceLength ( pp . Integer ( p ) ) ) == t . UsualPieceSize ( ) {
break
2014-06-30 00:22:05 +10:00
}
2014-08-28 10:04:00 +10:00
if lastReadOffset , ok := me . s . lastReadOffset [ t ] ; ok {
if p >= int ( lastReadOffset / int64 ( t . UsualPieceSize ( ) ) ) {
if int64 ( p + 1 ) * int64 ( t . UsualPieceSize ( ) ) < lastReadOffset + me . s . Readahead {
continue
2014-08-25 05:31:34 +10:00
}
2014-08-28 10:04:00 +10:00
}
}
for chunkSpec := range t . Pieces [ p ] . PendingChunkSpecs {
r := request { pp . Integer ( p ) , chunkSpec }
if th [ r ] >= 1 {
2014-08-25 05:31:34 +10:00
continue
}
2014-08-28 10:04:00 +10:00
if ! me . conservativelyRequest ( r ) {
return false
2014-06-30 00:22:05 +10:00
}
}
}
2014-08-28 10:04:00 +10:00
return true
}
2014-08-25 05:31:34 +10:00
2014-08-28 10:04:00 +10:00
// Returns all wanted chunk specs in the readahead zone.
func ( me * requestFiller ) pendingReadaheadChunks ( ) ( ret [ ] request ) {
t := me . t
lastReadOffset , ok := me . s . lastReadOffset [ t ]
if ! ok {
return
}
ret = make ( [ ] request , 0 , ( me . s . Readahead + chunkSize - 1 ) / chunkSize )
for pi := int ( lastReadOffset / int64 ( t . UsualPieceSize ( ) ) ) ; pi < t . NumPieces ( ) && int64 ( pi ) * int64 ( t . UsualPieceSize ( ) ) < lastReadOffset + me . s . Readahead ; pi ++ {
if ! t . wantPiece ( pi ) || ! me . c . PeerHasPiece ( pp . Integer ( pi ) ) {
continue
2014-08-25 05:31:34 +10:00
}
2014-08-28 10:04:00 +10:00
for cs := range t . Pieces [ pi ] . PendingChunkSpecs {
r := request { pp . Integer ( pi ) , cs }
if _ , ok := me . c . Requests [ r ] ; ok {
2014-08-25 05:31:34 +10:00
continue
}
2014-08-28 10:04:00 +10:00
if off := t . requestOffset ( r ) ; off < lastReadOffset || off >= lastReadOffset + me . s . Readahead {
continue
2014-08-25 05:31:34 +10:00
}
2014-08-28 10:04:00 +10:00
ret = append ( ret , r )
2014-08-25 05:31:34 +10:00
}
}
2014-08-28 10:04:00 +10:00
return
}
// intHeap is a slice of ints providing the Len/Less/Swap/Push/Pop method set
// required by container/heap, ordered so that the smallest value is on top.
type intHeap []int

// Len reports the number of stored values.
func (h intHeap) Len() int {
	return len(h)
}

// Less reports whether the value at i sorts before the value at j.
func (h intHeap) Less(i, j int) bool {
	return h[j] > h[i]
}

// Swap exchanges the values at i and j.
func (h intHeap) Swap(i, j int) {
	tmp := h[i]
	h[i] = h[j]
	h[j] = tmp
}

// Push appends x, which must be an int, to the end of the slice.
func (h *intHeap) Push(x interface{}) {
	*h = append(*h, x.(int))
}

// Pop removes the last value of the slice and returns it.
func (h *intHeap) Pop() interface{} {
	last := len(*h) - 1
	x := (*h)[last]
	*h = (*h)[:last]
	return x
}
// readahead requests the pending chunks of the readahead zone and reports
// false when the connection refuses further requests.
//
// The chunks are visited in the array order of a heapified random
// permutation of their indices: roughly ascending, but shuffled enough that
// different peers are less likely to be asked for the same chunks.
func (f *requestFiller) readahead() bool {
	chunks := f.pendingReadaheadChunks()
	if len(chunks) == 0 {
		return true
	}
	order := intHeap(f.s.rand.Perm(len(chunks)))
	heap.Init(&order)
	for _, i := range order {
		if !f.conservativelyRequest(chunks[i]) {
			return false
		}
	}
	return true
}
func ( me * responsiveDownloadStrategy ) FillRequests ( t * torrent , c * connection ) {
( requestFiller { c , t , me } ) . Run ( )
2014-06-30 00:22:05 +10:00
}
2014-07-24 13:42:31 +10:00
func ( me * responsiveDownloadStrategy ) TorrentGotChunk ( t * torrent , req request ) {
2014-08-22 17:35:15 +10:00
delete ( me . priorities [ t ] , req )
2014-07-24 13:42:31 +10:00
}
func ( me * responsiveDownloadStrategy ) TorrentGotPiece ( t * torrent , piece int ) {
2014-08-22 17:35:15 +10:00
for _ , cs := range t . pieceChunks ( piece ) {
delete ( me . priorities [ t ] , request { pp . Integer ( piece ) , cs } )
2014-07-24 13:42:31 +10:00
}
}
func ( s * responsiveDownloadStrategy ) TorrentPrioritize ( t * torrent , off , _len int64 ) {
2014-08-24 03:08:11 +10:00
s . lastReadOffset [ t ] = off
2014-07-24 13:42:31 +10:00
for _len > 0 {
req , ok := t . offsetRequest ( off )
if ! ok {
panic ( "bad offset" )
}
reqOff := t . requestOffset ( req )
// Gain the alignment adjustment.
_len += off - reqOff
// Lose the length of this block.
_len -= int64 ( req . Length )
off = reqOff + int64 ( req . Length )
if t . wantChunk ( req ) {
2014-08-22 17:35:15 +10:00
s . priorities [ t ] [ req ] = struct { } { }
2014-07-24 13:42:31 +10:00
}
}
}
2014-08-24 03:08:11 +10:00
// AssertNotRequested panics when the request heat recorded for r on t is
// non-zero, i.e. when requests for that chunk are still outstanding.
func (s *responsiveDownloadStrategy) AssertNotRequested(t *torrent, r request) {
	if outstanding := s.requestHeat[t][r]; outstanding != 0 {
		panic("outstanding requests invariant broken")
	}
}