Allow subscribing to torrent piece state changes

This commit is contained in:
Matt Joiner 2015-09-06 12:33:22 +10:00
parent 5ea0c26717
commit eebd09c0fe
5 changed files with 36 additions and 4 deletions

View File

@ -1945,7 +1945,8 @@ func newTorrent(ih InfoHash) (t *torrent, err error) {
gotMetainfo: make(chan struct{}),
HalfOpen: make(map[string]struct{}),
HalfOpen: make(map[string]struct{}),
pieceStateChanges: pubsub.NewPubSub(),
}
t.wantPeers.L = &t.stateMu
return
@ -2587,6 +2588,7 @@ func (me *Client) downloadedChunk(t *torrent, c *connection, msg *pp.Message) er
// log.Println("got chunk", req)
piece.Event.Broadcast()
defer t.publishPieceChange(int(req.Index))
// Record that we have the chunk.
piece.unpendChunkIndex(chunkIndex(req.chunkSpec, t.chunkSize))
delete(t.urgent, req)
@ -2656,6 +2658,7 @@ func (me *Client) pieceHashed(t *torrent, piece pp.Integer, correct bool) {
func (me *Client) pieceChanged(t *torrent, piece int) {
correct := t.pieceComplete(piece)
p := t.Pieces[piece]
defer t.publishPieceChange(piece)
defer p.Event.Broadcast()
if correct {
p.Priority = PiecePriorityNone

View File

@ -274,6 +274,16 @@ func TestClientTransfer(t *testing.T) {
ret.ChunkSize = 2
return
}())
// TODO: The piece state publishing is kinda jammed in here until I have a
// more thorough test.
go func() {
s := leecherGreeting.pieceStateChanges.Subscribe()
defer s.Close()
for i := range s.Values {
log.Print(i)
}
log.Print("finished")
}()
leecherGreeting.AddPeers([]Peer{
Peer{
IP: missinggo.AddrIP(seeder.ListenAddr()),

View File

@ -30,6 +30,7 @@ type piece struct {
EverHashed bool
Event sync.Cond
Priority piecePriority
PublicPieceState PieceState
pendingWritesMutex sync.Mutex
pendingWrites int

5
t.go
View File

@ -1,6 +1,7 @@
package torrent
import (
"github.com/anacrolix/missinggo/pubsub"
"github.com/anacrolix/torrent/metainfo"
)
@ -68,3 +69,7 @@ func (t Torrent) BytesCompleted() int64 {
defer t.cl.mu.RUnlock()
return t.bytesCompleted()
}
func (t Torrent) SubscribePieceStateChanges() *pubsub.Subscription {
return t.torrent.pieceStateChanges.Subscribe()
}

View File

@ -11,6 +11,7 @@ import (
"time"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/pubsub"
"github.com/bradfitz/iter"
"github.com/anacrolix/torrent/bencode"
@ -60,9 +61,11 @@ type torrent struct {
// announcing, and communicating with peers.
ceasingNetworking chan struct{}
InfoHash InfoHash
Pieces []*piece
chunkSize pp.Integer
InfoHash InfoHash
Pieces []*piece
// Values are the piece indices that changed.
pieceStateChanges *pubsub.PubSub
chunkSize pp.Integer
// Chunks that are wanted before all others. This is for
// responsive/streaming readers that want to unblock ASAP.
urgent map[request]struct{}
@ -540,6 +543,7 @@ func (t *torrent) close() (err error) {
for _, conn := range t.Conns {
conn.Close()
}
t.pieceStateChanges.Close()
return
}
@ -761,3 +765,12 @@ func (t *torrent) worstBadConn(cl *Client) *connection {
}
return nil
}
func (t *torrent) publishPieceChange(piece int) {
cur := t.pieceState(piece)
p := t.Pieces[piece]
if cur != p.PublicPieceState {
t.pieceStateChanges.Publish(piece)
}
p.PublicPieceState = cur
}