Give each connection its own piece priority order

This commit is contained in:
Matt Joiner 2014-12-01 03:36:25 -06:00
parent 0a71fe4bee
commit 78ef36066d
3 changed files with 54 additions and 118 deletions

View File

@ -836,6 +836,9 @@ func (me *Client) runConnection(sock net.Conn, torrent *torrent, discovery peerS
Port: uint16(addr.Port),
})
}
if torrent.haveInfo() {
conn.initPieceOrder(torrent.NumPieces())
}
err = me.connectionLoop(torrent, conn)
if err != nil {
err = fmt.Errorf("during Connection loop with peer %q: %s", conn.PeerID, err)
@ -1758,36 +1761,11 @@ func (me *Client) WaitAll() bool {
return true
}
func (cl *Client) assertRequestHeat() {
dds, ok := cl.downloadStrategy.(*DefaultDownloadStrategy)
if !ok {
return
}
for _, t := range cl.torrents {
m := make(map[request]int, 3000)
for _, cn := range t.Conns {
for r := range cn.Requests {
m[r]++
}
}
for r, h := range dds.heat[t] {
if m[r] != h {
panic(fmt.Sprintln(m[r], h))
}
}
}
}
func (me *Client) replenishConnRequests(t *torrent, c *connection) {
if !t.haveInfo() {
return
}
for _, p := range me.downloadStrategy.FillRequests(t, c) {
// Make sure the state of pieces that would have been requested is
// known.
me.queueFirstHash(t, p)
}
//me.assertRequestHeat()
me.downloadStrategy.FillRequests(t, c)
if len(c.Requests) == 0 && !c.PeerChoked {
c.SetInterested(false)
}
@ -1916,11 +1894,6 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
}
}
}
// Do this even if the piece is correct because new first-hashings may
// need to be scheduled.
if conn.PeerHasPiece(piece) {
me.replenishConnRequests(t, conn)
}
}
if t.haveAllPieces() && me.noUpload {
t.CeaseNetworking()

View File

@ -8,6 +8,7 @@ import (
"expvar"
"fmt"
"io"
"math/rand"
"net"
"sync"
"time"
@ -27,13 +28,14 @@ const (
// Maintains the state of a connection with a peer.
type connection struct {
Socket net.Conn
Discovery peerSource
uTP bool
closing chan struct{}
mu sync.Mutex // Only for closing.
post chan pp.Message
writeCh chan []byte
Socket net.Conn
Discovery peerSource
uTP bool
closing chan struct{}
mu sync.Mutex // Only for closing.
post chan pp.Message
writeCh chan []byte
pieceOrder []int
UnwantedChunksReceived int
UsefulChunksReceived int
@ -109,6 +111,7 @@ func (cn *connection) piecesPeerHasCount() (count int) {
// invalid, such as by receiving badly sized BITFIELD, or invalid HAVE
// messages.
func (cn *connection) setNumPieces(num int) error {
cn.initPieceOrder(num)
if cn.PeerPieces == nil {
return nil
}
@ -131,6 +134,15 @@ func (cn *connection) setNumPieces(num int) error {
return nil
}
func (cn *connection) initPieceOrder(numPieces int) {
if cn.pieceOrder == nil {
cn.pieceOrder = rand.Perm(numPieces)
}
if len(cn.pieceOrder) != numPieces {
panic("piece order initialized with wrong length")
}
}
func eventAgeString(t time.Time) string {
if t.IsZero() {
return "never"

View File

@ -10,10 +10,8 @@ import (
)
type DownloadStrategy interface {
// Tops up the outgoing pending requests. Returns the indices of pieces
// that would be requested. This is used to determine if pieces require
// hashing so the completed state is known.
FillRequests(*torrent, *connection) (pieces []int)
// Tops up the outgoing pending requests.
FillRequests(*torrent, *connection)
TorrentStarted(*torrent)
TorrentStopped(*torrent)
DeleteRequest(*torrent, request)
@ -25,97 +23,53 @@ type DownloadStrategy interface {
PendingData(*torrent) bool
}
type DefaultDownloadStrategy struct {
heat map[*torrent]map[request]int
}
type DefaultDownloadStrategy struct{}
func (me *DefaultDownloadStrategy) PendingData(t *torrent) bool {
return !t.haveAllPieces()
}
func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {
if me.heat[t][r] != 0 {
panic("outstanding requests break invariant")
}
}
func (me *DefaultDownloadStrategy) AssertNotRequested(t *torrent, r request) {}
func (me *DefaultDownloadStrategy) WriteStatus(w io.Writer) {}
func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) (pieces []int) {
func (s *DefaultDownloadStrategy) FillRequests(t *torrent, c *connection) {
if c.Interested {
if c.PeerChoked {
return
}
if len(c.Requests) >= (c.PeerMaxRequests+1)/2 {
if len(c.Requests) != 0 {
return
}
}
th := s.heat[t]
addRequest := func(req request) (again bool) {
piece := t.Pieces[req.Index]
if piece.Hashing || piece.QueuedForHash {
// We can't be sure we want this.
return true
}
if piece.Complete() {
// We already have this.
return true
}
if c.RequestPending(req) {
return true
}
again = c.Request(req)
if c.RequestPending(req) {
th[req]++
}
return
return c.Request(req)
}
// Then finish off incomplete pieces in order of bytes remaining.
for _, heatThreshold := range []int{1, 4, 15, 60} {
for e := t.IncompletePiecesByBytesLeft.Front(); e != nil; e = e.Next() {
pieceIndex := pp.Integer(e.Value.(int))
piece := t.Pieces[pieceIndex]
if !piece.EverHashed {
pieces = append(pieces, int(pieceIndex))
continue
}
for chunkSpec := range t.Pieces[pieceIndex].PendingChunkSpecs {
r := request{pieceIndex, chunkSpec}
if th[r] >= heatThreshold {
continue
}
if !addRequest(r) {
return
}
for i := range t.Pieces {
pieceIndex := c.pieceOrder[i]
if !c.PeerHasPiece(pp.Integer(pieceIndex)) {
continue
}
if !t.wantPiece(pieceIndex) {
continue
}
piece := t.Pieces[pieceIndex]
for _, cs := range piece.shuffledPendingChunkSpecs() {
r := request{pp.Integer(pieceIndex), cs}
if !addRequest(r) {
return
}
}
}
return
}
func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {
if s.heat[t] != nil {
panic("torrent already started")
}
if s.heat == nil {
s.heat = make(map[*torrent]map[request]int, 10)
}
s.heat[t] = make(map[request]int, t.ChunkCount())
}
func (s *DefaultDownloadStrategy) TorrentStarted(t *torrent) {}
func (s *DefaultDownloadStrategy) TorrentStopped(t *torrent) {
if _, ok := s.heat[t]; !ok {
panic("torrent not yet started")
}
delete(s.heat, t)
}
func (s *DefaultDownloadStrategy) DeleteRequest(t *torrent, r request) {
m := s.heat[t]
if m[r] <= 0 {
panic("no pending requests")
}
m[r]--
}
func (me *DefaultDownloadStrategy) TorrentGotChunk(t *torrent, c request) {}
@ -176,18 +130,10 @@ type requestFiller struct {
c *connection
t *torrent
s *responsiveDownloadStrategy
// The set of pieces that were considered for requesting.
pieces map[int]struct{}
}
// Wrapper around connection.request that tracks request heat.
func (me *requestFiller) request(req request) bool {
if me.pieces == nil {
me.pieces = make(map[int]struct{})
}
// log.Print(req)
me.pieces[int(req.Index)] = struct{}{}
if me.c.RequestPending(req) {
return true
}
@ -340,12 +286,9 @@ func (me *requestFiller) readahead() bool {
return true
}
func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) (pieces []int) {
func (me *responsiveDownloadStrategy) FillRequests(t *torrent, c *connection) {
rf := requestFiller{c: c, t: t, s: me}
rf.Run()
for p := range rf.pieces {
pieces = append(pieces, p)
}
return
}
@ -385,5 +328,13 @@ func (s *responsiveDownloadStrategy) AssertNotRequested(t *torrent, r request) {
}
func (me *responsiveDownloadStrategy) PendingData(t *torrent) bool {
return len(me.FillRequests(t, me.dummyConn)) != 0
if len(me.priorities[t]) != 0 {
return true
}
for index := range t.Pieces {
if t.wantPiece(index) {
return true
}
}
return false
}