Merge branch 'noprio2'
commit 2a0b78e7ef

client.go (346 changes)

@@ -24,7 +24,6 @@ import (
	"github.com/anacrolix/missinggo"
	. "github.com/anacrolix/missinggo"
	"github.com/anacrolix/missinggo/perf"
	"github.com/anacrolix/missinggo/pubsub"
	"github.com/anacrolix/sync"
	"github.com/anacrolix/utp"

@@ -34,7 +33,6 @@ import (
	"github.com/anacrolix/torrent/bencode"
	filePkg "github.com/anacrolix/torrent/data/file"
	"github.com/anacrolix/torrent/dht"
	"github.com/anacrolix/torrent/internal/pieceordering"
	"github.com/anacrolix/torrent/iplist"
	"github.com/anacrolix/torrent/metainfo"
	"github.com/anacrolix/torrent/mse"
@ -252,6 +250,7 @@ func (cl *Client) WriteStatus(_w io.Writer) {
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: Make this non-blocking Read on Torrent.
|
||||
func dataReadAt(d Data, b []byte, off int64) (n int, err error) {
|
||||
// defer func() {
|
||||
// if err == io.ErrUnexpectedEOF && n != 0 {
|
||||
|
@ -276,47 +275,6 @@ func readaheadPieces(readahead, pieceLength int64) (ret int) {
|
|||
return
|
||||
}
|
||||
|
||||
func (cl *Client) readRaisePiecePriorities(t *torrent, off, readaheadBytes int64) {
|
||||
index := int(off / int64(t.usualPieceSize()))
|
||||
cl.raisePiecePriority(t, index, PiecePriorityNow)
|
||||
index++
|
||||
if index >= t.numPieces() {
|
||||
return
|
||||
}
|
||||
cl.raisePiecePriority(t, index, PiecePriorityNext)
|
||||
for range iter.N(readaheadPieces(readaheadBytes, t.Info.PieceLength)) {
|
||||
index++
|
||||
if index >= t.numPieces() {
|
||||
break
|
||||
}
|
||||
cl.raisePiecePriority(t, index, PiecePriorityReadahead)
|
||||
}
|
||||
}
|
||||
|
||||
func (cl *Client) addUrgentRequests(t *torrent, off int64, n int) {
|
||||
for n > 0 {
|
||||
req, ok := t.offsetRequest(off)
|
||||
if !ok {
|
||||
break
|
||||
}
|
||||
if _, ok := t.urgent[req]; !ok && !t.haveChunk(req) {
|
||||
if t.urgent == nil {
|
||||
t.urgent = make(map[request]struct{}, (n+int(t.chunkSize)-1)/int(t.chunkSize))
|
||||
}
|
||||
t.urgent[req] = struct{}{}
|
||||
cl.event.Broadcast() // Why?
|
||||
index := int(req.Index)
|
||||
cl.queueFirstHash(t, index)
|
||||
cl.pieceChanged(t, index)
|
||||
}
|
||||
reqOff := t.requestOffset(req)
|
||||
n1 := req.Length - pp.Integer(off-reqOff)
|
||||
off += int64(n1)
|
||||
n -= int(n1)
|
||||
}
|
||||
// log.Print(t.urgent)
|
||||
}
|
||||
|
||||
func (cl *Client) configDir() string {
|
||||
if cl.config.ConfigDir == "" {
|
||||
return filepath.Join(os.Getenv("HOME"), ".config/torrent")
|
||||
|
@ -330,30 +288,6 @@ func (cl *Client) ConfigDir() string {
|
|||
return cl.configDir()
|
||||
}
|
||||
|
||||
func (t *torrent) connPendPiece(c *connection, piece int) {
|
||||
c.pendPiece(piece, t.Pieces[piece].Priority, t)
|
||||
}
|
||||
|
||||
func (cl *Client) raisePiecePriority(t *torrent, piece int, priority piecePriority) {
|
||||
if t.Pieces[piece].Priority < priority {
|
||||
cl.prioritizePiece(t, piece, priority)
|
||||
}
|
||||
}
|
||||
|
||||
func (cl *Client) prioritizePiece(t *torrent, piece int, priority piecePriority) {
|
||||
if t.havePiece(piece) {
|
||||
priority = PiecePriorityNone
|
||||
}
|
||||
if priority != PiecePriorityNone {
|
||||
cl.queueFirstHash(t, piece)
|
||||
}
|
||||
p := &t.Pieces[piece]
|
||||
if p.Priority != priority {
|
||||
p.Priority = priority
|
||||
cl.pieceChanged(t, piece)
|
||||
}
|
||||
}
|
||||
|
||||
func loadPackedBlocklist(filename string) (ret iplist.Ranger, err error) {
|
||||
f, err := os.Open(filename)
|
||||
if os.IsNotExist(err) {
|
||||
|
@ -1169,9 +1103,6 @@ func (cl *Client) runHandshookConn(c *connection, t *torrent) (err error) {
|
|||
go c.writer()
|
||||
go c.writeOptimizer(time.Minute)
|
||||
cl.sendInitialMessages(c, t)
|
||||
if t.haveInfo() {
|
||||
t.initRequestOrdering(c)
|
||||
}
|
||||
err = cl.connectionLoop(t, c)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("error during connection loop: %s", err)
|
||||
|
@ -1237,26 +1168,6 @@ func (me *Client) sendInitialMessages(conn *connection, torrent *torrent) {
|
|||
}
|
||||
}
|
||||
|
||||
// Randomizes the piece order for this connection. Every connection will be
|
||||
// given a different ordering. Having it stored per connection saves having to
|
||||
// randomize during request filling, and constantly recalculate the ordering
|
||||
// based on piece priorities.
|
||||
func (t *torrent) initRequestOrdering(c *connection) {
|
||||
if c.pieceRequestOrder != nil || c.piecePriorities != nil {
|
||||
panic("double init of request ordering")
|
||||
}
|
||||
c.pieceRequestOrder = pieceordering.New()
|
||||
for i := range iter.N(t.Info.NumPieces()) {
|
||||
if !c.PeerHasPiece(i) {
|
||||
continue
|
||||
}
|
||||
if !t.wantPiece(i) {
|
||||
continue
|
||||
}
|
||||
t.connPendPiece(c, i)
|
||||
}
|
||||
}
|
||||
|
||||
func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) error {
|
||||
if !c.peerHasAll {
|
||||
if t.haveInfo() {
|
||||
|
@ -1274,14 +1185,13 @@ func (me *Client) peerGotPiece(t *torrent, c *connection, piece int) error {
|
|||
c.PeerPieces[piece] = true
|
||||
}
|
||||
if t.wantPiece(piece) {
|
||||
t.connPendPiece(c, piece)
|
||||
me.replenishConnRequests(t, c)
|
||||
c.updateRequests()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (me *Client) peerUnchoked(torrent *torrent, conn *connection) {
|
||||
me.replenishConnRequests(torrent, conn)
|
||||
conn.updateRequests()
|
||||
}
|
||||
|
||||
func (cl *Client) connCancel(t *torrent, cn *connection, r request) (ok bool) {
|
||||
|
@ -1447,11 +1357,7 @@ func (me *Client) sendChunk(t *torrent, c *connection, r request) error {
|
|||
c.chunksSent++
|
||||
b := make([]byte, r.Length)
|
||||
tp := &t.Pieces[r.Index]
|
||||
tp.pendingWritesMutex.Lock()
|
||||
for tp.pendingWrites != 0 {
|
||||
tp.noPendingWrites.Wait()
|
||||
}
|
||||
tp.pendingWritesMutex.Unlock()
|
||||
tp.waitNoPendingWrites()
|
||||
p := t.Info.Piece(int(r.Index))
|
||||
n, err := dataReadAt(t.data, b, p.Offset()+int64(r.Begin))
|
||||
if err != nil {
|
||||
|
@ -1506,10 +1412,10 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
|
|||
me.connDeleteRequest(t, c, r)
|
||||
}
|
||||
// We can then reset our interest.
|
||||
me.replenishConnRequests(t, c)
|
||||
c.updateRequests()
|
||||
case pp.Reject:
|
||||
me.connDeleteRequest(t, c, newRequest(msg.Index, msg.Begin, msg.Length))
|
||||
me.replenishConnRequests(t, c)
|
||||
c.updateRequests()
|
||||
case pp.Unchoke:
|
||||
c.PeerChoked = false
|
||||
me.peerUnchoked(t, c)
|
||||
|
@ -1576,7 +1482,7 @@ func (me *Client) connectionLoop(t *torrent, c *connection) error {
|
|||
}
|
||||
}())
|
||||
case pp.Piece:
|
||||
err = me.downloadedChunk(t, c, &msg)
|
||||
me.downloadedChunk(t, c, &msg)
|
||||
case pp.Extended:
|
||||
switch msg.ExtendedID {
|
||||
case pp.HandshakeExtendedID:
|
||||
|
@ -1721,12 +1627,6 @@ func (me *Client) deleteConnection(t *torrent, c *connection) bool {
|
|||
func (me *Client) dropConnection(t *torrent, c *connection) {
|
||||
me.event.Broadcast()
|
||||
c.Close()
|
||||
if c.piecePriorities != nil {
|
||||
t.connPiecePriorites.Put(c.piecePriorities)
|
||||
// I wonder if it's safe to set it to nil. Probably not. Since it's
|
||||
// only read, it doesn't particularly matter if a closing connection
|
||||
// shares the slice with another connection.
|
||||
}
|
||||
if me.deleteConnection(t, c) {
|
||||
me.openNewConns(t)
|
||||
}
|
||||
|
@ -1767,6 +1667,7 @@ func (me *Client) addConnection(t *torrent, c *connection) bool {
|
|||
panic(len(t.Conns))
|
||||
}
|
||||
t.Conns = append(t.Conns, c)
|
||||
c.t = t
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -1774,16 +1675,19 @@ func (t *torrent) needData() bool {
|
|||
if !t.haveInfo() {
|
||||
return true
|
||||
}
|
||||
if len(t.urgent) != 0 {
|
||||
return true
|
||||
}
|
||||
for i := range t.Pieces {
|
||||
p := &t.Pieces[i]
|
||||
if p.Priority != PiecePriorityNone {
|
||||
for i := range t.pendingPieces {
|
||||
if t.wantPiece(i) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
return !t.forReaderOffsetPieces(func(begin, end int) (again bool) {
|
||||
for i := begin; i < end; i++ {
|
||||
if !t.pieceComplete(i) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (cl *Client) usefulConn(t *torrent, c *connection) bool {
|
||||
|
@ -2048,16 +1952,6 @@ func (t Torrent) Files() (ret []File) {
|
|||
return
|
||||
}
|
||||
|
||||
// Marks the pieces in the given region for download.
|
||||
func (t Torrent) SetRegionPriority(off, len int64) {
|
||||
t.cl.mu.Lock()
|
||||
defer t.cl.mu.Unlock()
|
||||
pieceSize := int64(t.torrent.usualPieceSize())
|
||||
for i := off / pieceSize; i*pieceSize < off+len; i++ {
|
||||
t.cl.raisePiecePriority(t.torrent, int(i), PiecePriorityNormal)
|
||||
}
|
||||
}
|
||||
|
||||
func (t Torrent) AddPeers(pp []Peer) error {
|
||||
cl := t.cl
|
||||
cl.mu.Lock()
|
||||
|
@ -2071,13 +1965,9 @@ func (t Torrent) AddPeers(pp []Peer) error {
|
|||
func (t Torrent) DownloadAll() {
|
||||
t.cl.mu.Lock()
|
||||
defer t.cl.mu.Unlock()
|
||||
for i := range iter.N(t.torrent.numPieces()) {
|
||||
t.cl.raisePiecePriority(t.torrent, i, PiecePriorityNormal)
|
||||
for i := range iter.N(t.torrent.Info.NumPieces()) {
|
||||
t.torrent.pendPiece(i, t.cl)
|
||||
}
|
||||
// Nice to have the first and last pieces sooner for various interactive
|
||||
// purposes.
|
||||
t.cl.raisePiecePriority(t.torrent, 0, PiecePriorityReadahead)
|
||||
t.cl.raisePiecePriority(t.torrent, t.torrent.numPieces()-1, PiecePriorityReadahead)
|
||||
}
|
||||
|
||||
// Returns nil metainfo if it isn't in the cache. Checks that the retrieved
|
||||
|
@ -2474,71 +2364,15 @@ func (me *Client) WaitAll() bool {
|
|||
return true
|
||||
}
|
||||
|
||||
func (me *Client) fillRequests(t *torrent, c *connection) {
|
||||
if c.Interested {
|
||||
if c.PeerChoked {
|
||||
return
|
||||
}
|
||||
if len(c.Requests) > c.requestsLowWater {
|
||||
return
|
||||
}
|
||||
}
|
||||
addRequest := func(req request) (again bool) {
|
||||
// TODO: Couldn't this check also be done *after* the request?
|
||||
if len(c.Requests) >= 64 {
|
||||
return false
|
||||
}
|
||||
return c.Request(req)
|
||||
}
|
||||
for req := range t.urgent {
|
||||
if !addRequest(req) {
|
||||
return
|
||||
}
|
||||
}
|
||||
for e := c.pieceRequestOrder.First(); e != nil; e = e.Next() {
|
||||
pieceIndex := e.Piece()
|
||||
if !c.PeerHasPiece(pieceIndex) {
|
||||
panic("piece in request order but peer doesn't have it")
|
||||
}
|
||||
if !t.wantPiece(pieceIndex) {
|
||||
log.Printf("unwanted piece %d in connection request order\n%s", pieceIndex, c)
|
||||
c.pieceRequestOrder.DeletePiece(pieceIndex)
|
||||
continue
|
||||
}
|
||||
piece := &t.Pieces[pieceIndex]
|
||||
for _, cs := range piece.shuffledPendingChunkSpecs(t, pieceIndex) {
|
||||
r := request{pp.Integer(pieceIndex), cs}
|
||||
if !addRequest(r) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (me *Client) replenishConnRequests(t *torrent, c *connection) {
|
||||
if !t.haveInfo() {
|
||||
return
|
||||
}
|
||||
me.fillRequests(t, c)
|
||||
if len(c.Requests) == 0 && !c.PeerChoked {
|
||||
// So we're not choked, but we don't want anything right now. We may
|
||||
// have completed readahead, and the readahead window has not rolled
|
||||
// over to the next piece. Better to stay interested in case we're
|
||||
// going to want data in the near future.
|
||||
c.SetInterested(!t.haveAllPieces())
|
||||
}
|
||||
}
|
||||
|
||||
// Handle a received chunk from a peer.
|
||||
func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) error {
|
||||
func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) {
|
||||
chunksReceived.Add(1)
|
||||
|
||||
req := newRequest(msg.Index, msg.Begin, pp.Integer(len(msg.Piece)))
|
||||
|
||||
// Request has been satisfied.
|
||||
if me.connDeleteRequest(t, c, req) {
|
||||
defer me.replenishConnRequests(t, c)
|
||||
defer c.updateRequests()
|
||||
} else {
|
||||
unexpectedChunksReceived.Add(1)
|
||||
}
|
||||
|
@ -2550,7 +2384,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
|
|||
if !t.wantChunk(req) {
|
||||
unwantedChunksReceived.Add(1)
|
||||
c.UnwantedChunksReceived++
|
||||
return nil
|
||||
return
|
||||
}
|
||||
|
||||
c.UsefulChunksReceived++
|
||||
|
@ -2558,59 +2392,46 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
|
|||
|
||||
me.upload(t, c)
|
||||
|
||||
piece.pendingWritesMutex.Lock()
|
||||
piece.pendingWrites++
|
||||
piece.pendingWritesMutex.Unlock()
|
||||
go func() {
|
||||
defer func() {
|
||||
piece.pendingWritesMutex.Lock()
|
||||
piece.pendingWrites--
|
||||
if piece.pendingWrites == 0 {
|
||||
piece.noPendingWrites.Broadcast()
|
||||
}
|
||||
piece.pendingWritesMutex.Unlock()
|
||||
}()
|
||||
// Write the chunk out.
|
||||
tr := perf.NewTimer()
|
||||
err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
|
||||
if err != nil {
|
||||
log.Printf("error writing chunk: %s", err)
|
||||
return
|
||||
}
|
||||
tr.Stop("write chunk")
|
||||
me.mu.Lock()
|
||||
if c.peerTouchedPieces == nil {
|
||||
c.peerTouchedPieces = make(map[int]struct{})
|
||||
}
|
||||
c.peerTouchedPieces[index] = struct{}{}
|
||||
me.mu.Unlock()
|
||||
}()
|
||||
|
||||
// log.Println("got chunk", req)
|
||||
me.event.Broadcast()
|
||||
defer t.publishPieceChange(int(req.Index))
|
||||
// Need to record that it hasn't been written yet, before we attempt to do
|
||||
// anything with it.
|
||||
piece.incrementPendingWrites()
|
||||
// Record that we have the chunk.
|
||||
piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
|
||||
delete(t.urgent, req)
|
||||
|
||||
// Cancel pending requests for this chunk.
|
||||
for _, c := range t.Conns {
|
||||
if me.connCancel(t, c, req) {
|
||||
c.updateRequests()
|
||||
}
|
||||
}
|
||||
|
||||
me.mu.Unlock()
|
||||
// Write the chunk out.
|
||||
err := t.writeChunk(int(msg.Index), int64(msg.Begin), msg.Piece)
|
||||
me.mu.Lock()
|
||||
|
||||
piece.decrementPendingWrites()
|
||||
|
||||
if err != nil {
|
||||
log.Printf("error writing chunk: %s", err)
|
||||
t.pendRequest(req)
|
||||
return
|
||||
}
|
||||
|
||||
// It's important that the piece is potentially queued before we check if
|
||||
// the piece is still wanted, because if it is queued, it won't be wanted.
|
||||
if t.pieceAllDirty(index) {
|
||||
me.queuePieceCheck(t, int(req.Index))
|
||||
}
|
||||
if !t.wantPiece(int(req.Index)) {
|
||||
for _, c := range t.Conns {
|
||||
c.pieceRequestOrder.DeletePiece(int(req.Index))
|
||||
}
|
||||
}
|
||||
|
||||
// Cancel pending requests for this chunk.
|
||||
for _, c := range t.Conns {
|
||||
if me.connCancel(t, c, req) {
|
||||
me.replenishConnRequests(t, c)
|
||||
}
|
||||
if c.peerTouchedPieces == nil {
|
||||
c.peerTouchedPieces = make(map[int]struct{})
|
||||
}
|
||||
c.peerTouchedPieces[index] = struct{}{}
|
||||
|
||||
return nil
|
||||
me.event.Broadcast()
|
||||
t.publishPieceChange(int(req.Index))
|
||||
return
|
||||
}
|
||||
|
||||
// Return the connections that touched a piece, and clear the entry while
|
||||
|
@ -2654,42 +2475,45 @@ func (me *Client) pieceHashed(t *torrent, piece int, correct bool) {
|
|||
me.pieceChanged(t, int(piece))
|
||||
}
|
||||
|
||||
func (me *Client) onCompletedPiece(t *torrent, piece int) {
|
||||
delete(t.pendingPieces, piece)
|
||||
for _, conn := range t.Conns {
|
||||
conn.Have(piece)
|
||||
for r := range conn.Requests {
|
||||
if int(r.Index) == piece {
|
||||
conn.Cancel(r)
|
||||
}
|
||||
}
|
||||
// Could check here if peer doesn't have piece, but due to caching
|
||||
// some peers may have said they have a piece but they don't.
|
||||
me.upload(t, conn)
|
||||
}
|
||||
}
|
||||
|
||||
func (me *Client) onFailedPiece(t *torrent, piece int) {
|
||||
if t.pieceAllDirty(piece) {
|
||||
t.pendAllChunkSpecs(piece)
|
||||
}
|
||||
if !t.wantPiece(piece) {
|
||||
return
|
||||
}
|
||||
me.openNewConns(t)
|
||||
for _, conn := range t.Conns {
|
||||
if conn.PeerHasPiece(piece) {
|
||||
conn.updateRequests()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (me *Client) pieceChanged(t *torrent, piece int) {
|
||||
correct := t.pieceComplete(piece)
|
||||
p := &t.Pieces[piece]
|
||||
defer t.publishPieceChange(piece)
|
||||
defer me.event.Broadcast()
|
||||
if correct {
|
||||
p.Priority = PiecePriorityNone
|
||||
for req := range t.urgent {
|
||||
if int(req.Index) == piece {
|
||||
delete(t.urgent, req)
|
||||
}
|
||||
}
|
||||
me.onCompletedPiece(t, piece)
|
||||
} else {
|
||||
if t.pieceAllDirty(piece) {
|
||||
t.pendAllChunkSpecs(piece)
|
||||
}
|
||||
if t.wantPiece(piece) {
|
||||
me.openNewConns(t)
|
||||
}
|
||||
me.onFailedPiece(t, piece)
|
||||
}
|
||||
for _, conn := range t.Conns {
|
||||
if correct {
|
||||
conn.Have(piece)
|
||||
for r := range conn.Requests {
|
||||
if int(r.Index) == piece {
|
||||
conn.Cancel(r)
|
||||
}
|
||||
}
|
||||
conn.pieceRequestOrder.DeletePiece(int(piece))
|
||||
me.upload(t, conn)
|
||||
} else if t.wantPiece(piece) && conn.PeerHasPiece(piece) {
|
||||
t.connPendPiece(conn, int(piece))
|
||||
me.replenishConnRequests(t, conn)
|
||||
}
|
||||
}
|
||||
me.event.Broadcast()
|
||||
}
|
||||
|
||||
func (cl *Client) verifyPiece(t *torrent, piece int) {
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"io/ioutil"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
@ -317,6 +318,12 @@ func TestClientTransfer(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func exportClientStatus(cl *Client, path string) {
|
||||
http.HandleFunc(path, func(w http.ResponseWriter, r *http.Request) {
|
||||
cl.WriteStatus(w)
|
||||
})
|
||||
}
|
||||
|
||||
// Check that after completing leeching, a leecher transitions to a seeding
|
||||
// correctly. Connected in a chain like so: Seeder <-> Leecher <-> LeecherLeecher.
|
||||
func TestSeedAfterDownloading(t *testing.T) {
|
||||
|
@ -327,12 +334,14 @@ func TestSeedAfterDownloading(t *testing.T) {
|
|||
cfg.DataDir = greetingTempDir
|
||||
seeder, err := NewClient(&cfg)
|
||||
defer seeder.Close()
|
||||
exportClientStatus(seeder, "/s")
|
||||
seeder.AddTorrentSpec(TorrentSpecFromMetaInfo(mi))
|
||||
cfg.DataDir, err = ioutil.TempDir("", "")
|
||||
require.NoError(t, err)
|
||||
defer os.RemoveAll(cfg.DataDir)
|
||||
leecher, _ := NewClient(&cfg)
|
||||
defer leecher.Close()
|
||||
exportClientStatus(leecher, "/l")
|
||||
cfg.Seed = false
|
||||
cfg.TorrentDataOpener = nil
|
||||
cfg.DataDir, err = ioutil.TempDir("", "")
|
||||
|
@ -340,6 +349,7 @@ func TestSeedAfterDownloading(t *testing.T) {
|
|||
defer os.RemoveAll(cfg.DataDir)
|
||||
leecherLeecher, _ := NewClient(&cfg)
|
||||
defer leecherLeecher.Close()
|
||||
exportClientStatus(leecherLeecher, "/ll")
|
||||
leecherGreeting, _, _ := leecher.AddTorrentSpec(func() (ret *TorrentSpec) {
|
||||
ret = TorrentSpecFromMetaInfo(mi)
|
||||
ret.ChunkSize = 2
|
||||
|
@ -361,7 +371,7 @@ func TestSeedAfterDownloading(t *testing.T) {
|
|||
defer r.Close()
|
||||
b, err := ioutil.ReadAll(r)
|
||||
require.NoError(t, err)
|
||||
require.EqualValues(t, testutil.GreetingFileContents, b)
|
||||
assert.EqualValues(t, testutil.GreetingFileContents, b)
|
||||
}()
|
||||
leecherGreeting.AddPeers([]Peer{
|
||||
Peer{
|
||||
|
@ -529,10 +539,14 @@ func TestResponsive(t *testing.T) {
|
|||
reader.SetReadahead(0)
|
||||
reader.SetResponsive()
|
||||
b := make([]byte, 2)
|
||||
_, err = reader.ReadAt(b, 3)
|
||||
_, err = reader.Seek(3, os.SEEK_SET)
|
||||
require.NoError(t, err)
|
||||
_, err = io.ReadFull(reader, b)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, "lo", string(b))
|
||||
n, err := reader.ReadAt(b, 11)
|
||||
_, err = reader.Seek(11, os.SEEK_SET)
|
||||
require.NoError(t, err)
|
||||
n, err := io.ReadFull(reader, b)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, 2, n)
|
||||
assert.EqualValues(t, "d\n", string(b))
|
||||
|
@ -571,11 +585,15 @@ func TestTorrentDroppedDuringResponsiveRead(t *testing.T) {
|
|||
reader.SetReadahead(0)
|
||||
reader.SetResponsive()
|
||||
b := make([]byte, 2)
|
||||
_, err = reader.ReadAt(b, 3)
|
||||
_, err = reader.Seek(3, os.SEEK_SET)
|
||||
require.NoError(t, err)
|
||||
_, err = io.ReadFull(reader, b)
|
||||
assert.Nil(t, err)
|
||||
assert.EqualValues(t, "lo", string(b))
|
||||
go leecherTorrent.Drop()
|
||||
n, err := reader.ReadAt(b, 11)
|
||||
_, err = reader.Seek(11, os.SEEK_SET)
|
||||
require.NoError(t, err)
|
||||
n, err := reader.Read(b)
|
||||
assert.EqualError(t, err, "torrent closed")
|
||||
assert.EqualValues(t, 0, n)
|
||||
}
|
||||
|
|
|
@@ -15,6 +15,7 @@ import (
	"time"

	_ "github.com/anacrolix/envpprof"
	"github.com/anacrolix/missinggo"
	"github.com/dustin/go-humanize"
	"github.com/jessevdk/go-flags"

@@ -165,7 +166,7 @@ func main() {
		if file.DisplayPath() != rootGroup.Pick {
			continue
		}
		srcReader := io.NewSectionReader(t.NewReader(), file.Offset(), file.Length())
		srcReader := missinggo.NewSectionReadSeeker(t.NewReader(), file.Offset(), file.Length())
		io.Copy(dstWriter, srcReader)
		return
	}
|
|
|
@ -14,7 +14,6 @@ import (
|
|||
"time"
|
||||
|
||||
"github.com/anacrolix/torrent/bencode"
|
||||
"github.com/anacrolix/torrent/internal/pieceordering"
|
||||
pp "github.com/anacrolix/torrent/peer_protocol"
|
||||
)
|
||||
|
||||
|
@ -31,6 +30,7 @@ const (
|
|||
|
||||
// Maintains the state of a connection with a peer.
|
||||
type connection struct {
|
||||
t *torrent
|
||||
conn net.Conn
|
||||
rw io.ReadWriter // The real slim shady
|
||||
encrypted bool
|
||||
|
@ -41,12 +41,6 @@ type connection struct {
|
|||
post chan pp.Message
|
||||
writeCh chan []byte
|
||||
|
||||
// The connection's preferred order to download pieces. The index is the
|
||||
// piece, the value is its priority.
|
||||
piecePriorities []int
|
||||
// The piece request order based on piece priorities.
|
||||
pieceRequestOrder *pieceordering.Instance
|
||||
|
||||
UnwantedChunksReceived int
|
||||
UsefulChunksReceived int
|
||||
chunksSent int
|
||||
|
@ -105,42 +99,6 @@ func (cn *connection) localAddr() net.Addr {
|
|||
return cn.conn.LocalAddr()
|
||||
}
|
||||
|
||||
// Adjust piece position in the request order for this connection based on the
|
||||
// given piece priority.
|
||||
func (cn *connection) pendPiece(piece int, priority piecePriority, t *torrent) {
|
||||
if priority == PiecePriorityNone {
|
||||
cn.pieceRequestOrder.DeletePiece(piece)
|
||||
return
|
||||
}
|
||||
if cn.piecePriorities == nil {
|
||||
cn.piecePriorities = t.newConnPiecePriorities()
|
||||
}
|
||||
pp := cn.piecePriorities[piece]
|
||||
// Priority regions not to scale. Within each region, piece is randomized
|
||||
// according to connection.
|
||||
|
||||
// <-request first -- last->
|
||||
// [ Now ]
|
||||
// [ Next ]
|
||||
// [ Readahead ]
|
||||
// [ Normal ]
|
||||
key := func() int {
|
||||
switch priority {
|
||||
case PiecePriorityNow:
|
||||
return -3*len(cn.piecePriorities) + 3*pp
|
||||
case PiecePriorityNext:
|
||||
return -2*len(cn.piecePriorities) + 2*pp
|
||||
case PiecePriorityReadahead:
|
||||
return -len(cn.piecePriorities) + pp
|
||||
case PiecePriorityNormal:
|
||||
return pp
|
||||
default:
|
||||
panic(priority)
|
||||
}
|
||||
}()
|
||||
cn.pieceRequestOrder.SetPiece(piece, key)
|
||||
}
|
||||
|
||||
func (cn *connection) supportsExtension(ext string) bool {
|
||||
_, ok := cn.PeerExtensionIDs[ext]
|
||||
return ok
|
||||
|
@@ -577,3 +535,49 @@ func (cn *connection) Bitfield(haves []bool) {
	})
	cn.sentHaves = haves
}

func (c *connection) updateRequests() {
	if !c.t.haveInfo() {
		return
	}
	if c.Interested {
		if c.PeerChoked {
			return
		}
		if len(c.Requests) > c.requestsLowWater {
			return
		}
	}
	c.fillRequests()
	if len(c.Requests) == 0 && !c.PeerChoked {
		// So we're not choked, but we don't want anything right now. We may
		// have completed readahead, and the readahead window has not rolled
		// over to the next piece. Better to stay interested in case we're
		// going to want data in the near future.
		c.SetInterested(!c.t.haveAllPieces())
	}
}

func (c *connection) fillRequests() {
	if !c.t.forUrgentPieces(func(piece int) (again bool) {
		return c.t.connRequestPiecePendingChunks(c, piece)
	}) {
		return
	}
	c.t.forReaderOffsetPieces(func(begin, end int) (again bool) {
		for i := begin + 1; i < end; i++ {
			if !c.t.connRequestPiecePendingChunks(c, i) {
				return false
			}
		}
		return true
	})
	for i := range c.t.pendingPieces {
		if !c.t.wantPiece(i) {
			continue
		}
		if !c.t.connRequestPiecePendingChunks(c, i) {
			return
		}
	}
}
|
|
|
@ -4,10 +4,6 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/bradfitz/iter"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/anacrolix/torrent/internal/pieceordering"
|
||||
"github.com/anacrolix/torrent/peer_protocol"
|
||||
)
|
||||
|
||||
|
@ -52,49 +48,3 @@ func TestCancelRequestOptimized(t *testing.T) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
func pieceOrderingAsSlice(po *pieceordering.Instance) (ret []int) {
|
||||
for e := po.First(); e != nil; e = e.Next() {
|
||||
ret = append(ret, e.Piece())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func testRequestOrder(expected []int, ro *pieceordering.Instance, t *testing.T) {
|
||||
assert.EqualValues(t, pieceOrderingAsSlice(ro), expected)
|
||||
}
|
||||
|
||||
// Tests the request ordering based on a connections priorities.
|
||||
func TestPieceRequestOrder(t *testing.T) {
|
||||
c := connection{
|
||||
pieceRequestOrder: pieceordering.New(),
|
||||
piecePriorities: []int{1, 4, 0, 3, 2},
|
||||
}
|
||||
testRequestOrder(nil, c.pieceRequestOrder, t)
|
||||
c.pendPiece(2, PiecePriorityNone, nil)
|
||||
testRequestOrder(nil, c.pieceRequestOrder, t)
|
||||
c.pendPiece(1, PiecePriorityNormal, nil)
|
||||
c.pendPiece(2, PiecePriorityNormal, nil)
|
||||
testRequestOrder([]int{2, 1}, c.pieceRequestOrder, t)
|
||||
c.pendPiece(0, PiecePriorityNormal, nil)
|
||||
testRequestOrder([]int{2, 0, 1}, c.pieceRequestOrder, t)
|
||||
c.pendPiece(1, PiecePriorityReadahead, nil)
|
||||
testRequestOrder([]int{1, 2, 0}, c.pieceRequestOrder, t)
|
||||
c.pendPiece(4, PiecePriorityNow, nil)
|
||||
// now(4), r(1), normal(0, 2)
|
||||
testRequestOrder([]int{4, 1, 2, 0}, c.pieceRequestOrder, t)
|
||||
c.pendPiece(2, PiecePriorityReadahead, nil)
|
||||
// N(4), R(1, 2), N(0)
|
||||
testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t)
|
||||
c.pendPiece(1, PiecePriorityNow, nil)
|
||||
// now(4, 1), readahead(2), normal(0)
|
||||
// in the same order, the keys will be: -15+6, -15+12, -5, 1
|
||||
// so we test that a very low priority (for this connection), "now"
|
||||
// piece has been placed after a readahead piece.
|
||||
testRequestOrder([]int{4, 2, 1, 0}, c.pieceRequestOrder, t)
|
||||
// Note this intentially sets to None a piece that's not in the order.
|
||||
for i := range iter.N(5) {
|
||||
c.pendPiece(i, PiecePriorityNone, nil)
|
||||
}
|
||||
testRequestOrder(nil, c.pieceRequestOrder, t)
|
||||
}
|
||||
|
|
|
@@ -458,7 +458,9 @@ func (s *Server) query(node dHTAddr, q string, a map[string]interface{}, onRespo
		return
	}
	s.getNode(node, "").lastSentQuery = time.Now()
	t.mu.Lock()
	t.startTimer()
	t.mu.Unlock()
	s.addTransaction(t)
	return
}

@@ -1,9 +1,10 @@
package torrent_test

import (
	"io"
	"log"

	"github.com/anacrolix/missinggo"

	"github.com/anacrolix/torrent"
)

@@ -26,5 +27,5 @@ func Example_fileReader() {
	defer r.Close()
	// Access the parts of the torrent pertaining to f. Data will be
	// downloaded as required, per the configuration of the torrent.Reader.
	_ = io.NewSectionReader(r, f.Offset(), f.Length())
	_ = missinggo.NewSectionReadSeeker(r, f.Offset(), f.Length())
}
file.go (13 changes)

@@ -75,15 +75,6 @@ func (f *File) State() (ret []FilePieceState) {
	return
}

// Marks pieces in the region of the file for download. This is a helper
// wrapping Torrent.SetRegionPriority.
func (f *File) PrioritizeRegion(off, len int64) {
	if off < 0 || off >= f.length {
		return
	}
	if off+len > f.length {
		len = f.length - off
	}
	off += f.offset
	f.t.SetRegionPriority(off, len)
func (f *File) Download() {
	f.t.DownloadPieces(f.t.torrent.byteRegionPieces(f.offset, f.length))
}
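
The removed PrioritizeRegion helper is replaced by File.Download plus the piece-region arithmetic on torrent. A rough usage sketch, not part of this commit; the helper name writeOneFile is made up and error handling is omitted:

package example

import (
	"io"
	"os"

	"github.com/anacrolix/missinggo"
	"github.com/anacrolix/torrent"
)

// Mark one file of a torrent for download, then stream its byte region
// through a torrent Reader, the way cmd/torrent does. Assumes the torrent's
// metadata is already available.
func writeOneFile(t torrent.Torrent, name string) {
	for _, f := range t.Files() {
		if f.DisplayPath() != name {
			continue
		}
		f.Download() // replaces the removed PrioritizeRegion helper
		r := t.NewReader()
		defer r.Close()
		sr := missinggo.NewSectionReadSeeker(r, f.Offset(), f.Length())
		io.Copy(os.Stdout, sr)
		return
	}
}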

@@ -3,6 +3,7 @@ package torrentfs
import (
	"expvar"
	"fmt"
	"io"
	"os"
	"path"
	"strings"

@@ -82,7 +83,11 @@ func blockingRead(ctx context.Context, fs *TorrentFS, t torrent.Torrent, off int
	go func() {
		r := t.NewReader()
		defer r.Close()
		_n, _err = r.ReadAt(p, off)
		_, _err = r.Seek(off, os.SEEK_SET)
		if _err != nil {
			return
		}
		_n, _err = io.ReadFull(r, p)
		close(readDone)
	}()
	select {
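
Both the fs code above and the client tests switch from Reader.ReadAt to a Seek followed by io.ReadFull, so the reader's position (and with it the readahead window) is visible to the client. A hedged sketch of that calling pattern, with an illustrative helper name:

package example

import (
	"io"
	"os"

	"github.com/anacrolix/torrent"
)

// Read n bytes at off by repositioning the reader first; Seek updates the
// reader's position so the client can reprioritize pieces around it.
func readAtOffset(r *torrent.Reader, off int64, n int) ([]byte, error) {
	if _, err := r.Seek(off, os.SEEK_SET); err != nil {
		return nil, err
	}
	b := make([]byte, n)
	if _, err := io.ReadFull(r, b); err != nil {
		return nil, err
	}
	return b, nil
}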
|
|
@ -1,129 +0,0 @@
|
|||
// Implements ordering of torrent piece indices for such purposes as download
|
||||
// prioritization.
|
||||
package pieceordering
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
|
||||
"github.com/ryszard/goskiplist/skiplist"
|
||||
)
|
||||
|
||||
// Maintains piece integers by their ascending assigned keys.
|
||||
type Instance struct {
|
||||
// Contains the ascending priority keys. The keys contain a slice of piece
|
||||
// indices.
|
||||
sl *skiplist.SkipList
|
||||
// Maps from piece index back to its key, so that it can be remove
|
||||
// efficiently from the skip list.
|
||||
pieceKeys map[int]int
|
||||
}
|
||||
|
||||
func New() *Instance {
|
||||
return &Instance{
|
||||
sl: skiplist.NewIntMap(),
|
||||
}
|
||||
}
|
||||
|
||||
// Add the piece with the given key. If the piece is already present, change
|
||||
// its key.
|
||||
func (me *Instance) SetPiece(piece, key int) {
|
||||
if existingKey, ok := me.pieceKeys[piece]; ok {
|
||||
if existingKey == key {
|
||||
return
|
||||
}
|
||||
me.removeKeyPiece(existingKey, piece)
|
||||
}
|
||||
var itemSl []int
|
||||
if exItem, ok := me.sl.Get(key); ok {
|
||||
itemSl = exItem.([]int)
|
||||
}
|
||||
me.sl.Set(key, append(itemSl, piece))
|
||||
if me.pieceKeys == nil {
|
||||
me.pieceKeys = make(map[int]int)
|
||||
}
|
||||
me.pieceKeys[piece] = key
|
||||
me.shuffleItem(key)
|
||||
}
|
||||
|
||||
// Shuffle the piece indices that share a given key.
|
||||
func (me *Instance) shuffleItem(key int) {
|
||||
_item, ok := me.sl.Get(key)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
item := _item.([]int)
|
||||
for i := range item {
|
||||
j := i + rand.Intn(len(item)-i)
|
||||
item[i], item[j] = item[j], item[i]
|
||||
}
|
||||
me.sl.Set(key, item)
|
||||
}
|
||||
|
||||
func (me *Instance) removeKeyPiece(key, piece int) {
|
||||
item, ok := me.sl.Get(key)
|
||||
if !ok {
|
||||
panic("no item for key")
|
||||
}
|
||||
itemSl := item.([]int)
|
||||
for i, piece1 := range itemSl {
|
||||
if piece1 == piece {
|
||||
itemSl[i] = itemSl[len(itemSl)-1]
|
||||
itemSl = itemSl[:len(itemSl)-1]
|
||||
break
|
||||
}
|
||||
}
|
||||
if len(itemSl) == 0 {
|
||||
me.sl.Delete(key)
|
||||
} else {
|
||||
me.sl.Set(key, itemSl)
|
||||
}
|
||||
}
|
||||
|
||||
func (me *Instance) DeletePiece(piece int) {
|
||||
key, ok := me.pieceKeys[piece]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
me.removeKeyPiece(key, piece)
|
||||
delete(me.pieceKeys, piece)
|
||||
}
|
||||
|
||||
// Returns the piece with the lowest key.
|
||||
func (me *Instance) First() Element {
|
||||
i := me.sl.SeekToFirst()
|
||||
if i == nil {
|
||||
return nil
|
||||
}
|
||||
return &element{i, i.Value().([]int)}
|
||||
}
|
||||
|
||||
func (me *Instance) Empty() bool {
|
||||
return me.sl.Len() == 0
|
||||
}
|
||||
|
||||
type Element interface {
|
||||
Piece() int
|
||||
Next() Element
|
||||
}
|
||||
|
||||
type element struct {
|
||||
i skiplist.Iterator
|
||||
sl []int
|
||||
}
|
||||
|
||||
func (e *element) Next() Element {
|
||||
e.sl = e.sl[1:]
|
||||
if len(e.sl) > 0 {
|
||||
return e
|
||||
}
|
||||
ok := e.i.Next()
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
e.sl = e.i.Value().([]int)
|
||||
return e
|
||||
}
|
||||
|
||||
func (e *element) Piece() int {
|
||||
return e.sl[0]
|
||||
}
|
|
@ -1,109 +0,0 @@
|
|||
package pieceordering
|
||||
|
||||
import (
|
||||
"sort"
|
||||
"testing"
|
||||
|
||||
"github.com/bradfitz/iter"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func instanceSlice(i *Instance) (sl []int) {
|
||||
for e := i.First(); e != nil; e = e.Next() {
|
||||
sl = append(sl, e.Piece())
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func sameContents(a, b []int) bool {
|
||||
if len(a) != len(b) {
|
||||
panic("y u pass different length slices")
|
||||
}
|
||||
sort.IntSlice(a).Sort()
|
||||
sort.IntSlice(b).Sort()
|
||||
for i := range a {
|
||||
if a[i] != b[i] {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func checkOrder(t testing.TB, i *Instance, ppp ...[]int) {
|
||||
fatal := func() {
|
||||
t.Fatalf("have %v, expected %v", instanceSlice(i), ppp)
|
||||
}
|
||||
e := i.First()
|
||||
for _, pp := range ppp {
|
||||
var pp_ []int
|
||||
for len(pp_) != len(pp) {
|
||||
pp_ = append(pp_, e.Piece())
|
||||
e = e.Next()
|
||||
}
|
||||
if !sameContents(pp, pp_) {
|
||||
fatal()
|
||||
}
|
||||
}
|
||||
if e != nil {
|
||||
fatal()
|
||||
}
|
||||
}
|
||||
|
||||
func testPieceOrdering(t testing.TB) {
|
||||
i := New()
|
||||
assert.True(t, i.Empty())
|
||||
i.SetPiece(0, 1)
|
||||
assert.False(t, i.Empty())
|
||||
i.SetPiece(1, 0)
|
||||
checkOrder(t, i, []int{1, 0})
|
||||
i.SetPiece(1, 2)
|
||||
checkOrder(t, i, []int{0, 1})
|
||||
i.DeletePiece(1)
|
||||
checkOrder(t, i, []int{0})
|
||||
i.DeletePiece(2)
|
||||
i.DeletePiece(1)
|
||||
checkOrder(t, i, []int{0})
|
||||
i.DeletePiece(0)
|
||||
assert.True(t, i.Empty())
|
||||
checkOrder(t, i, nil)
|
||||
i.SetPiece(2, 1)
|
||||
assert.False(t, i.Empty())
|
||||
i.SetPiece(1, 1)
|
||||
i.SetPiece(3, 1)
|
||||
checkOrder(t, i, []int{3, 1, 2})
|
||||
// Move a piece that isn't the youngest in a key.
|
||||
i.SetPiece(1, -1)
|
||||
checkOrder(t, i, []int{1}, []int{3, 2})
|
||||
i.DeletePiece(2)
|
||||
i.DeletePiece(3)
|
||||
i.DeletePiece(1)
|
||||
assert.True(t, i.Empty())
|
||||
checkOrder(t, i, nil)
|
||||
// Deleting pieces that aren't present.
|
||||
i.DeletePiece(2)
|
||||
i.DeletePiece(3)
|
||||
i.DeletePiece(1)
|
||||
assert.True(t, i.Empty())
|
||||
checkOrder(t, i, nil)
|
||||
}
|
||||
|
||||
func TestPieceOrdering(t *testing.T) {
|
||||
testPieceOrdering(t)
|
||||
}
|
||||
|
||||
func BenchmarkPieceOrdering(b *testing.B) {
|
||||
for range iter.N(b.N) {
|
||||
testPieceOrdering(b)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkIteration(b *testing.B) {
|
||||
for range iter.N(b.N) {
|
||||
i := New()
|
||||
for p := range iter.N(500) {
|
||||
i.SetPiece(p, p)
|
||||
}
|
||||
for e := i.First(); e != nil; e = e.Next() {
|
||||
}
|
||||
}
|
||||
}
|
piece.go (34 changes)

@@ -30,7 +30,6 @@ type piece struct {
	Hashing          bool
	QueuedForHash    bool
	EverHashed       bool
	Priority         piecePriority
	PublicPieceState PieceState

	pendingWritesMutex sync.Mutex

@@ -62,6 +61,13 @@ func (p *piece) unpendChunkIndex(i int) {
	p.DirtyChunks[i] = true
}

func (p *piece) pendChunkIndex(i int) {
	if i >= len(p.DirtyChunks) {
		return
	}
	p.DirtyChunks[i] = false
}

func chunkIndexSpec(index int, pieceLength, chunkSize pp.Integer) chunkSpec {
	ret := chunkSpec{pp.Integer(index) * chunkSize, chunkSize}
	if ret.Begin+ret.Length > pieceLength {

@@ -93,3 +99,29 @@ func (p *piece) shuffledPendingChunkSpecs(t *torrent, piece int) (css []chunkSpe
	}
	return
}

func (p *piece) incrementPendingWrites() {
	p.pendingWritesMutex.Lock()
	p.pendingWrites++
	p.pendingWritesMutex.Unlock()
}

func (p *piece) decrementPendingWrites() {
	p.pendingWritesMutex.Lock()
	if p.pendingWrites == 0 {
		panic("assertion")
	}
	p.pendingWrites--
	if p.pendingWrites == 0 {
		p.noPendingWrites.Broadcast()
	}
	p.pendingWritesMutex.Unlock()
}

func (p *piece) waitNoPendingWrites() {
	p.pendingWritesMutex.Lock()
	for p.pendingWrites != 0 {
		p.noPendingWrites.Wait()
	}
	p.pendingWritesMutex.Unlock()
}
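
The three helpers above gate hashing and reads on in-flight chunk writes. A standalone sketch of the same counter-plus-sync.Cond pattern, using hypothetical names and no torrent types:

package example

import "sync"

// writeGate mirrors incrementPendingWrites/decrementPendingWrites/
// waitNoPendingWrites: writers bump a counter around each in-flight write,
// and waiters block until it drains to zero.
type writeGate struct {
	mu      sync.Mutex
	pending int
	drained sync.Cond
}

func newWriteGate() *writeGate {
	g := &writeGate{}
	g.drained.L = &g.mu
	return g
}

// begin records a write that has been started but not yet flushed.
func (g *writeGate) begin() {
	g.mu.Lock()
	g.pending++
	g.mu.Unlock()
}

// done marks a write finished and wakes waiters when none remain.
func (g *writeGate) done() {
	g.mu.Lock()
	g.pending--
	if g.pending == 0 {
		g.drained.Broadcast()
	}
	g.mu.Unlock()
}

// wait blocks until no writes are in flight.
func (g *writeGate) wait() {
	g.mu.Lock()
	for g.pending != 0 {
		g.drained.Wait()
	}
	g.mu.Unlock()
}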
reader.go (62 changes)

@@ -4,11 +4,14 @@ import (
	"errors"
	"io"
	"os"
	"sync"
)

// Accesses torrent data via a client.
type Reader struct {
	t *Torrent
	t *Torrent

	mu         sync.Mutex
	pos        int64
	responsive bool
	readahead  int64

@@ -25,18 +28,11 @@ func (r *Reader) SetResponsive() {
// Configure the number of bytes ahead of a read that should also be
// prioritized in preparation for further reads.
func (r *Reader) SetReadahead(readahead int64) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.readahead = readahead
}

func (r *Reader) raisePriorities(off int64, n int) {
	if r.responsive {
		r.t.cl.addUrgentRequests(r.t.torrent, off, n)
	}
	if !r.responsive || r.readahead != 0 {
		r.t.cl.readRaisePiecePriorities(r.t.torrent, off, int64(n)+r.readahead)
	}
}

func (r *Reader) readable(off int64) (ret bool) {
	// log.Println("readable", off)
	// defer func() {

@@ -77,26 +73,26 @@ func (r *Reader) available(off, max int64) (ret int64) {
	return
}

func (r *Reader) tickleClient() {
	r.t.torrent.readersChanged(r.t.cl)
}

func (r *Reader) waitReadable(off int64) {
	// We may have been sent back here because we were told we could read but
	// it failed.
	r.tickleClient()
	r.t.cl.event.Wait()
}

func (r *Reader) ReadAt(b []byte, off int64) (n int, err error) {
	for {
		var n1 int
		n1, err = r.readAt(b, off)
		n += n1
		b = b[n1:]
		off += int64(n1)
		if err != nil || len(b) == 0 || n1 == 0 {
			return
		}
	}
}

func (r *Reader) Read(b []byte) (n int, err error) {
	n, err = r.readAt(b, r.pos)
	r.mu.Lock()
	pos := r.pos
	r.mu.Unlock()
	n, err = r.readAt(b, pos)
	r.mu.Lock()
	r.pos += int64(n)
	r.mu.Unlock()
	r.posChanged()
	return
}

@@ -115,9 +111,7 @@ func (r *Reader) readAt(b []byte, pos int64) (n int, err error) {
	}
again:
	r.t.cl.mu.Lock()
	r.raisePriorities(pos, len(b))
	for !r.readable(pos) {
		r.raisePriorities(pos, len(b))
		r.waitReadable(pos)
	}
	avail := r.available(pos, int64(len(b)))

@@ -131,11 +125,7 @@ again:
	if int64(len(b1)) > ip.Length()-po {
		b1 = b1[:ip.Length()-po]
	}
	tp.pendingWritesMutex.Lock()
	for tp.pendingWrites != 0 {
		tp.noPendingWrites.Wait()
	}
	tp.pendingWritesMutex.Unlock()
	tp.waitNoPendingWrites()
	n, err = dataReadAt(r.t.torrent.data, b1, pos)
	if n != 0 {
		err = nil

@@ -154,11 +144,19 @@ again:
}

func (r *Reader) Close() error {
	r.t.deleteReader(r)
	r.t = nil
	return nil
}

func (r *Reader) posChanged() {
	r.t.cl.mu.Lock()
	defer r.t.cl.mu.Unlock()
	r.t.torrent.readersChanged(r.t.cl)
}

func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
	r.mu.Lock()
	switch whence {
	case os.SEEK_SET:
		r.pos = off

@@ -170,5 +168,7 @@ func (r *Reader) Seek(off int64, whence int) (ret int64, err error) {
		err = errors.New("bad whence")
	}
	ret = r.pos
	r.mu.Unlock()
	r.posChanged()
	return
}
|
t.go (29 changes)

@@ -34,13 +34,13 @@ func (t Torrent) Info() *metainfo.Info {
}

// Returns a Reader bound to the torrent's data. All read calls block until
// the data requested is actually available. Priorities are set to ensure the
// data requested will be downloaded as soon as possible.
// the data requested is actually available.
func (t Torrent) NewReader() (ret *Reader) {
	ret = &Reader{
		t:         &t,
		readahead: 5 * 1024 * 1024,
	}
	t.addReader(ret)
	return
}

@@ -119,3 +119,28 @@ func (t Torrent) MetaInfo() *metainfo.MetaInfo {
	defer t.cl.mu.Unlock()
	return t.torrent.MetaInfo()
}

func (t Torrent) addReader(r *Reader) {
	t.cl.mu.Lock()
	defer t.cl.mu.Unlock()
	if t.torrent.readers == nil {
		t.torrent.readers = make(map[*Reader]struct{})
	}
	t.torrent.readers[r] = struct{}{}
	t.torrent.readersChanged(t.cl)
}

func (t Torrent) deleteReader(r *Reader) {
	t.cl.mu.Lock()
	defer t.cl.mu.Unlock()
	delete(t.torrent.readers, r)
	t.torrent.readersChanged(t.cl)
}

func (t Torrent) DownloadPieces(begin, end int) {
	t.cl.mu.Lock()
	defer t.cl.mu.Unlock()
	for i := begin; i < end; i++ {
		t.torrent.pendPiece(i, t.cl)
	}
}
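
For a whole-torrent download the flow is now Torrent.DownloadAll (which pends every piece) plus Client.WaitAll. A minimal sketch under assumed configuration; the TorrentSpec contents and program structure are placeholders, not part of this commit:

package main

import (
	"log"

	"github.com/anacrolix/torrent"
)

func main() {
	cl, err := torrent.NewClient(&torrent.Config{})
	if err != nil {
		log.Fatal(err)
	}
	defer cl.Close()
	t, _, err := cl.AddTorrentSpec(&torrent.TorrentSpec{
		// InfoHash, Trackers, etc. would go here; values are placeholders.
	})
	if err != nil {
		log.Fatal(err)
	}
	t.DownloadAll() // marks every piece pending; readers can still raise priorities
	if cl.WaitAll() {
		log.Print("all torrents complete")
	}
}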
|
torrent.go (222 changes)
|
@ -6,13 +6,13 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"math/rand"
|
||||
"net"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/anacrolix/missinggo"
|
||||
"github.com/anacrolix/missinggo/perf"
|
||||
"github.com/anacrolix/missinggo/pubsub"
|
||||
"github.com/bradfitz/iter"
|
||||
|
||||
|
@ -62,9 +62,6 @@ type torrent struct {
|
|||
// Values are the piece indices that changed.
|
||||
pieceStateChanges *pubsub.PubSub
|
||||
chunkSize pp.Integer
|
||||
// Chunks that are wanted before all others. This is for
|
||||
// responsive/streaming readers that want to unblock ASAP.
|
||||
urgent map[request]struct{}
|
||||
// Total length of the torrent in bytes. Stored because it's not O(1) to
|
||||
// get this from the info dict.
|
||||
length int64
|
||||
|
@ -99,7 +96,9 @@ type torrent struct {
|
|||
// Closed when .Info is set.
|
||||
gotMetainfo chan struct{}
|
||||
|
||||
connPiecePriorites sync.Pool
|
||||
readers map[*Reader]struct{}
|
||||
|
||||
pendingPieces map[int]struct{}
|
||||
}
|
||||
|
||||
var (
|
||||
|
@ -111,16 +110,6 @@ func (t *torrent) setDisplayName(dn string) {
|
|||
t.displayName = dn
|
||||
}
|
||||
|
||||
func (t *torrent) newConnPiecePriorities() []int {
|
||||
_ret := t.connPiecePriorites.Get()
|
||||
if _ret != nil {
|
||||
piecePrioritiesReused.Add(1)
|
||||
return _ret.([]int)
|
||||
}
|
||||
piecePrioritiesNew.Add(1)
|
||||
return rand.Perm(t.numPieces())
|
||||
}
|
||||
|
||||
func (t *torrent) pieceComplete(piece int) bool {
|
||||
// TODO: This is called when setting metadata, and before storage is
|
||||
// assigned, which doesn't seem right.
|
||||
|
@ -261,7 +250,6 @@ func (t *torrent) setMetadata(md *metainfo.Info, infoBytes []byte) (err error) {
|
|||
missinggo.CopyExact(piece.Hash[:], hash)
|
||||
}
|
||||
for _, conn := range t.Conns {
|
||||
t.initRequestOrdering(conn)
|
||||
if err := conn.setNumPieces(t.numPieces()); err != nil {
|
||||
log.Printf("closing connection: %s", err)
|
||||
conn.Close()
|
||||
|
@ -324,7 +312,7 @@ func (t *torrent) Name() string {
|
|||
|
||||
func (t *torrent) pieceState(index int) (ret PieceState) {
|
||||
p := &t.Pieces[index]
|
||||
ret.Priority = p.Priority
|
||||
ret.Priority = t.piecePriority(index)
|
||||
if t.pieceComplete(index) {
|
||||
ret.Complete = true
|
||||
}
|
||||
|
@ -435,10 +423,11 @@ func (t *torrent) writeStatus(w io.Writer, cl *Client) {
|
|||
}
|
||||
fmt.Fprintln(w)
|
||||
}
|
||||
fmt.Fprintf(w, "Urgent:")
|
||||
for req := range t.urgent {
|
||||
fmt.Fprintf(w, " %v", req)
|
||||
}
|
||||
fmt.Fprintf(w, "Reader Pieces:")
|
||||
t.forReaderOffsetPieces(func(begin, end int) (again bool) {
|
||||
fmt.Fprintf(w, " %d:%d", begin, end)
|
||||
return true
|
||||
})
|
||||
fmt.Fprintln(w)
|
||||
fmt.Fprintf(w, "Trackers: ")
|
||||
for _, tier := range t.Trackers {
|
||||
|
@ -470,7 +459,7 @@ func (t *torrent) String() string {
|
|||
}
|
||||
|
||||
func (t *torrent) haveInfo() bool {
|
||||
return t != nil && t.Info != nil
|
||||
return t.Info != nil
|
||||
}
|
||||
|
||||
// TODO: Include URIs that weren't converted to tracker clients.
|
||||
|
@ -579,10 +568,14 @@ func (t *torrent) offsetRequest(off int64) (req request, ok bool) {
|
|||
}
|
||||
|
||||
func (t *torrent) writeChunk(piece int, begin int64, data []byte) (err error) {
|
||||
tr := perf.NewTimer()
|
||||
n, err := t.data.WriteAt(data, int64(piece)*t.Info.PieceLength+begin)
|
||||
if err == nil && n != len(data) {
|
||||
err = io.ErrShortWrite
|
||||
}
|
||||
if err == nil {
|
||||
tr.Stop("write chunk")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
|
@ -657,11 +650,7 @@ func (t *torrent) pieceLength(piece int) (len_ pp.Integer) {
|
|||
func (t *torrent) hashPiece(piece int) (ps pieceSum) {
|
||||
hash := pieceHash.New()
|
||||
p := &t.Pieces[piece]
|
||||
p.pendingWritesMutex.Lock()
|
||||
for p.pendingWrites != 0 {
|
||||
p.noPendingWrites.Wait()
|
||||
}
|
||||
p.pendingWritesMutex.Unlock()
|
||||
p.waitNoPendingWrites()
|
||||
pl := t.Info.Piece(int(piece)).Length()
|
||||
n, err := t.data.WriteSectionTo(hash, int64(piece)*t.Info.PieceLength, pl)
|
||||
if err != nil {
|
||||
|
@ -728,17 +717,8 @@ func (t *torrent) wantChunk(r request) bool {
|
|||
if t.Pieces[r.Index].pendingChunk(r.chunkSpec, t.chunkSize) {
|
||||
return true
|
||||
}
|
||||
_, ok := t.urgent[r]
|
||||
return ok
|
||||
}
|
||||
|
||||
func (t *torrent) urgentChunkInPiece(piece int) bool {
|
||||
p := pp.Integer(piece)
|
||||
for req := range t.urgent {
|
||||
if req.Index == p {
|
||||
return true
|
||||
}
|
||||
}
|
||||
// TODO: What about pieces that were wanted, but aren't now, and aren't
|
||||
// completed either? That used to be done here.
|
||||
return false
|
||||
}
|
||||
|
||||
|
@ -754,18 +734,42 @@ func (t *torrent) wantPiece(index int) bool {
|
|||
if p.Hashing {
|
||||
return false
|
||||
}
|
||||
if p.Priority == PiecePriorityNone {
|
||||
if !t.urgentChunkInPiece(index) {
|
||||
return false
|
||||
}
|
||||
if t.pieceComplete(index) {
|
||||
return false
|
||||
}
|
||||
// Put piece complete check last, since it's the slowest as it can involve
|
||||
// calling out into external data stores.
|
||||
return !t.pieceComplete(index)
|
||||
if _, ok := t.pendingPieces[index]; ok {
|
||||
return true
|
||||
}
|
||||
return !t.forReaderOffsetPieces(func(begin, end int) bool {
|
||||
return index < begin || index >= end
|
||||
})
|
||||
}
|
||||
|
||||
func (t *torrent) forNeededPieces(f func(piece int) (more bool)) (all bool) {
|
||||
return t.forReaderOffsetPieces(func(begin, end int) (more bool) {
|
||||
for i := begin; begin < end; i++ {
|
||||
if !f(i) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (t *torrent) connHasWantedPieces(c *connection) bool {
|
||||
return c.pieceRequestOrder != nil && !c.pieceRequestOrder.Empty()
|
||||
for i := range t.pendingPieces {
|
||||
if c.PeerHasPiece(i) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return !t.forReaderOffsetPieces(func(begin, end int) (again bool) {
|
||||
for i := begin; i < end; i++ {
|
||||
if c.PeerHasPiece(i) {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
})
|
||||
}
|
||||
|
||||
func (t *torrent) extentPieces(off, _len int64) (pieces []int) {
|
||||
|
@ -803,6 +807,9 @@ func (t *torrent) publishPieceChange(piece int) {
|
|||
}
|
||||
|
||||
func (t *torrent) pieceNumPendingChunks(piece int) int {
|
||||
if t.pieceComplete(piece) {
|
||||
return 0
|
||||
}
|
||||
return t.pieceNumChunks(piece) - t.Pieces[piece].numDirtyChunks()
|
||||
}
|
||||
|
||||
|
@@ -818,3 +825,126 @@ func (t *torrent) pieceAllDirty(piece int) bool {
	}
	return true
}

func (t *torrent) forUrgentPieces(f func(piece int) (again bool)) (all bool) {
	return t.forReaderOffsetPieces(func(begin, end int) (again bool) {
		if begin < end {
			if !f(begin) {
				return false
			}
		}
		return true
	})
}

func (t *torrent) readersChanged(cl *Client) {
	// Accept new connections.
	cl.event.Broadcast()
	for _, c := range t.Conns {
		c.updateRequests()
	}
	cl.openNewConns(t)
}

func (t *torrent) byteRegionPieces(off, size int64) (begin, end int) {
	if off >= t.length {
		return
	}
	if off < 0 {
		size += off
		off = 0
	}
	if size <= 0 {
		return
	}
	begin = int(off / t.Info.PieceLength)
	end = int((off + size + t.Info.PieceLength - 1) / t.Info.PieceLength)
	if end > t.Info.NumPieces() {
		end = t.Info.NumPieces()
	}
	return
}

// Returns true if all iterations complete without breaking.
func (t *torrent) forReaderOffsetPieces(f func(begin, end int) (more bool)) (all bool) {
	for r := range t.readers {
		r.mu.Lock()
		pos, readahead := r.pos, r.readahead
		r.mu.Unlock()
		if readahead < 1 {
			readahead = 1
		}
		begin, end := t.byteRegionPieces(pos, readahead)
		if begin >= end {
			continue
		}
		if !f(begin, end) {
			return false
		}
	}
	return true
}

func (t *torrent) piecePriority(piece int) (ret piecePriority) {
	ret = PiecePriorityNone
	if t.pieceComplete(piece) {
		return
	}
	if _, ok := t.pendingPieces[piece]; ok {
		ret = PiecePriorityNormal
	}
	raiseRet := func(prio piecePriority) {
		if prio > ret {
			ret = prio
		}
	}
	t.forReaderOffsetPieces(func(begin, end int) (again bool) {
		if piece == begin {
			raiseRet(PiecePriorityNow)
		}
		if begin <= piece && piece < end {
			raiseRet(PiecePriorityReadahead)
		}
		return true
	})
	return
}

func (t *torrent) pendPiece(piece int, cl *Client) {
	if t.pendingPieces == nil {
		t.pendingPieces = make(map[int]struct{}, t.Info.NumPieces())
	}
	if _, ok := t.pendingPieces[piece]; ok {
		return
	}
	if t.havePiece(piece) {
		return
	}
	t.pendingPieces[piece] = struct{}{}
	for _, c := range t.Conns {
		if !c.PeerHasPiece(piece) {
			continue
		}
		c.updateRequests()
	}
	cl.openNewConns(t)
	cl.pieceChanged(t, piece)
}

func (t *torrent) connRequestPiecePendingChunks(c *connection, piece int) (more bool) {
	if !c.PeerHasPiece(piece) {
		return true
	}
	for _, cs := range t.Pieces[piece].shuffledPendingChunkSpecs(t, piece) {
		req := request{pp.Integer(piece), cs}
		if !c.Request(req) {
			return false
		}
	}
	return true
}

func (t *torrent) pendRequest(req request) {
	ci := chunkIndex(req.chunkSpec, t.chunkSize)
	t.Pieces[req.Index].pendChunkIndex(ci)
}
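
A worked example of the byteRegionPieces arithmetic above, with assumed numbers (256 KiB pieces, a reader at byte offset 1 MiB with 5 MiB of readahead):

package example

import "fmt"

// demoByteRegion mirrors the begin/end computation: the region covers
// pieces [4, 24).
func demoByteRegion() {
	const pieceLength = int64(256 << 10)
	off, size := int64(1<<20), int64(5<<20)
	begin := off / pieceLength
	end := (off + size + pieceLength - 1) / pieceLength
	fmt.Println(begin, end) // prints: 4 24
}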