
Rework Reader waiting

Matt Joiner 2021-09-02 20:53:49 +10:00
parent f86355ac0b
commit 175b826e73
6 changed files with 53 additions and 73 deletions

View File

@@ -1097,12 +1097,12 @@ func (cl *Client) newTorrent(ih metainfo.Hash, specStorage storage.ClientImpl) (
storageOpener: storageClient,
maxEstablishedConns: cl.config.EstablishedConnsPerTorrent,
networkingEnabled: true,
metadataChanged: sync.Cond{
L: cl.locker(),
},
webSeeds: make(map[string]*Peer),
}
t.networkingEnabled.Set()
t._pendingPieces.NewSet = priorityBitmapStableNewSet
t.logger = cl.logger.WithContextValue(t)
t.setChunkSize(defaultChunkSize)
@@ -1198,7 +1198,7 @@ func (t *Torrent) MergeSpec(spec *TorrentSpec) error {
}
t.addTrackers(spec.Trackers)
t.maybeNewConns()
t.dataDownloadDisallowed = spec.DisallowDataDownload
t.dataDownloadDisallowed.SetBool(spec.DisallowDataDownload)
t.dataUploadDisallowed = spec.DisallowDataUpload
return nil
}
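
Note: networkingEnabled and dataDownloadDisallowed become chansync.Flag fields later in this commit (see the torrent.go hunks below), so the bool in the composite literal gives way to an explicit t.networkingEnabled.Set(), and MergeSpec switches from a plain assignment to SetBool. A minimal sketch of how such a flag is driven and observed, assuming nothing beyond the methods that appear in this diff (Set, SetBool, Bool, On, Off):

package main

import (
    "fmt"

    "github.com/anacrolix/chansync"
)

func main() {
    var networkingEnabled chansync.Flag
    networkingEnabled.Set() // stands in for the old networkingEnabled: true literal field

    var dataDownloadDisallowed chansync.Flag
    dataDownloadDisallowed.SetBool(false) // as MergeSpec now does with spec.DisallowDataDownload

    select {
    case <-dataDownloadDisallowed.On(): // receivable once the flag is set, as used in reader.waitAvailable below
        fmt.Println("data download disallowed")
    default:
        fmt.Println("data download allowed:", !dataDownloadDisallowed.Bool())
    }
}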

View File

@@ -52,8 +52,11 @@ func metadataPieceSize(totalSize int, piece int) int {
}
// Return the request that would include the given offset into the torrent data.
func torrentOffsetRequest(torrentLength, pieceSize, chunkSize, offset int64) (
r Request, ok bool) {
func torrentOffsetRequest(
torrentLength, pieceSize, chunkSize, offset int64,
) (
r Request, ok bool,
) {
if offset < 0 || offset >= torrentLength {
return
}
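
The hunk above only reflows torrentOffsetRequest's signature; the body is not shown here. For orientation, the arithmetic such a function performs is roughly the sketch below (illustrative only, returning plain int64 values instead of the package's Request type): the piece index comes from integer division, the request begin is the in-piece offset rounded down to a chunk boundary, and the length is clamped so the request never runs past its piece or the end of the torrent.

func torrentOffsetRequestSketch(torrentLength, pieceSize, chunkSize, offset int64) (piece, begin, length int64, ok bool) {
    if offset < 0 || offset >= torrentLength {
        return 0, 0, 0, false
    }
    piece = offset / pieceSize
    // Offset within the piece, rounded down to a chunk boundary.
    begin = (offset % pieceSize) / chunkSize * chunkSize
    // The last piece may be truncated by the end of the torrent.
    pieceLength := pieceSize
    if (piece+1)*pieceSize > torrentLength {
        pieceLength = torrentLength - piece*pieceSize
    }
    // A request never extends past the end of its piece.
    length = chunkSize
    if begin+length > pieceLength {
        length = pieceLength - begin
    }
    return piece, begin, length, true
}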

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"sync"
"github.com/anacrolix/chansync"
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/torrent/metainfo"
@@ -21,6 +22,8 @@ type Piece struct {
// length can be determined by the request chunkSize in use.
_dirtyChunks bitmap.Bitmap
readerCond chansync.BroadcastCond
numVerifies int64
hashing bool
marking bool
@@ -70,7 +73,7 @@ func (p *Piece) numDirtyChunks() pp.Integer {
func (p *Piece) unpendChunkIndex(i int) {
p._dirtyChunks.Add(bitmap.BitIndex(i))
p.t.tickleReaders()
p.readerCond.Broadcast()
}
func (p *Piece) pendChunkIndex(i int) {
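
readerCond above gives each Piece its own wake-up channel, replacing the client-wide cl.event that tickleReaders used to broadcast on. The chansync.BroadcastCond pattern, using only the two methods that appear in this commit (Signaled and Broadcast), is to snapshot Signaled() together with the state check and then wait on that snapshot, so a Broadcast racing with the check cannot be missed. A small illustrative sketch (pieceState and its fields are hypothetical, not the package's types):

package example

import (
    "sync"

    "github.com/anacrolix/chansync"
)

type pieceState struct {
    mu         sync.Mutex
    dirty      bool
    readerCond chansync.BroadcastCond
}

// waitDirty blocks until the piece has been marked dirty.
func (p *pieceState) waitDirty() {
    for {
        p.mu.Lock()
        // Snapshot the signal channel alongside the state check, under the same lock.
        signaled := p.readerCond.Signaled()
        dirty := p.dirty
        p.mu.Unlock()
        if dirty {
            return
        }
        <-signaled // closed by the next Broadcast
    }
}

// markDirty mutates the state, then wakes every waiter holding an earlier snapshot.
func (p *pieceState) markDirty() {
    p.mu.Lock()
    p.dirty = true
    p.mu.Unlock()
    p.readerCond.Broadcast()
}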

View File

@@ -97,11 +97,6 @@ func (r *reader) available(off, max int64) (ret int64) {
return
}
func (r *reader) waitReadable(off int64) {
// We may have been sent back here because we were told we could read but it failed.
r.t.cl.event.Wait()
}
// Calculates the pieces this reader wants downloaded, ignoring the cached value at r.pieces.
func (r *reader) piecesUncached() (ret pieceRange) {
ra := r.readahead
@@ -122,27 +117,11 @@ func (r *reader) Read(b []byte) (n int, err error) {
}
func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
// This is set under the Client lock if the Context is canceled. I think we coordinate on a
// separate variable so as to avoid false negatives with race conditions due to Contexts being
// synchronized.
var ctxErr error
if ctx.Done() != nil {
ctx, cancel := context.WithCancel(ctx)
// Abort the goroutine when the function returns.
defer cancel()
go func() {
<-ctx.Done()
r.t.cl.lock()
ctxErr = ctx.Err()
r.t.tickleReaders()
r.t.cl.unlock()
}()
}
// Hmmm, if a Read gets stuck, this means you can't change position for other purposes. That
// seems reasonable, but unusual.
r.opMu.Lock()
defer r.opMu.Unlock()
n, err = r.readOnceAt(b, r.pos, &ctxErr)
n, err = r.readOnceAt(ctx, b, r.pos)
if n == 0 {
if err == nil && len(b) > 0 {
panic("expected error")
@@ -163,32 +142,44 @@ func (r *reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
return
}
var closedChan = make(chan struct{})
func init() {
close(closedChan)
}
// Wait until some data should be available to read. Tickles the client if it isn't. Returns how
// much should be readable without blocking.
func (r *reader) waitAvailable(pos, wanted int64, ctxErr *error, wait bool) (avail int64, err error) {
r.t.cl.lock()
defer r.t.cl.unlock()
func (r *reader) waitAvailable(ctx context.Context, pos, wanted int64, wait bool) (avail int64, err error) {
t := r.t
for {
r.t.cl.rLock()
avail = r.available(pos, wanted)
readerCond := t.piece(int((r.offset + pos) / t.info.PieceLength)).readerCond.Signaled()
r.t.cl.rUnlock()
if avail != 0 {
return
}
if r.t.closed.IsSet() {
var dontWait <-chan struct{}
if !wait || wanted == 0 {
dontWait = closedChan
}
select {
case <-r.t.closed.Done():
err = errors.New("torrent closed")
return
}
if *ctxErr != nil {
err = *ctxErr
case <-ctx.Done():
err = ctx.Err()
return
}
if r.t.dataDownloadDisallowed || !r.t.networkingEnabled {
err = errors.New("downloading disabled and data not already available")
case <-r.t.dataDownloadDisallowed.On():
err = errors.New("torrent data downloading disabled")
case <-r.t.networkingEnabled.Off():
err = errors.New("torrent networking disabled")
return
}
if !wait || wanted == 0 {
case <-dontWait:
return
case <-readerCond:
}
r.waitReadable(pos)
}
}
@@ -199,14 +190,14 @@ func (r *reader) torrentOffset(readerPos int64) int64 {
}
// Performs at most one successful read to torrent storage.
func (r *reader) readOnceAt(b []byte, pos int64, ctxErr *error) (n int, err error) {
func (r *reader) readOnceAt(ctx context.Context, b []byte, pos int64) (n int, err error) {
if pos >= r.length {
err = io.EOF
return
}
for {
var avail int64
avail, err = r.waitAvailable(pos, int64(len(b)), ctxErr, n == 0)
avail, err = r.waitAvailable(ctx, pos, int64(len(b)), n == 0)
if avail == 0 {
return
}
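
Two things change in the reader.go hunks above. First, context cancellation is no longer relayed through a helper goroutine that copies ctx.Err() into a shared variable under the client lock; ctx is passed down and its Done channel joins the select in waitAvailable directly. Second, the pre-closed closedChan is what lets that select degrade into a non-blocking poll: receiving from an already-closed channel succeeds immediately, while receiving from a nil channel blocks forever, so the dontWait case is ready only when !wait || wanted == 0. A standalone sketch of that channel trick (names here are illustrative):

package main

import "fmt"

var closedChan = make(chan struct{})

func init() {
    close(closedChan)
}

// poll reports whether work became available; it returns immediately when wait is false.
func poll(wait bool, work <-chan struct{}) bool {
    var dontWait <-chan struct{} // nil: this case never fires while wait is true
    if !wait {
        dontWait = closedChan // already closed: this case is always ready
    }
    select {
    case <-work:
        return true
    case <-dontWait:
        return false
    }
}

func main() {
    work := make(chan struct{})
    fmt.Println(poll(false, work)) // false, without blocking
    close(work)
    fmt.Println(poll(true, work)) // true
}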

View File

@@ -13,12 +13,12 @@ import (
"net/url"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"
"unsafe"
"github.com/RoaringBitmap/roaring"
"github.com/anacrolix/chansync"
"github.com/anacrolix/dht/v2"
"github.com/anacrolix/log"
"github.com/anacrolix/missinggo/iter"
@@ -29,6 +29,7 @@ import (
"github.com/anacrolix/missinggo/v2/bitmap"
"github.com/anacrolix/missinggo/v2/prioritybitmap"
"github.com/anacrolix/multiless"
"github.com/anacrolix/sync"
"github.com/davecgh/go-spew/spew"
"github.com/pion/datachannel"
@@ -52,12 +53,12 @@ type Torrent struct {
cl *Client
logger log.Logger
networkingEnabled bool
dataDownloadDisallowed bool
networkingEnabled chansync.Flag
dataDownloadDisallowed chansync.Flag
dataUploadDisallowed bool
userOnWriteChunkErr func(error)
closed missinggo.Event
closed chansync.SetOnce
infoHash metainfo.Hash
pieces []Piece
// Values are the piece indices that changed.
@@ -192,13 +193,9 @@ func (t *Torrent) pendingPieces() *prioritybitmap.PriorityBitmap {
return &t._pendingPieces
}
func (t *Torrent) tickleReaders() {
t.cl.event.Broadcast()
}
// Returns a channel that is closed when the Torrent is closed.
func (t *Torrent) Closed() <-chan struct{} {
return t.closed.LockedChan(t.cl.locker())
func (t *Torrent) Closed() chansync.Done {
return t.closed.Done()
}
// KnownSwarm returns the known subset of the peers in the Torrent's swarm, including active,
@@ -794,7 +791,6 @@ func (t *Torrent) numPiecesCompleted() (num pieceIndex) {
func (t *Torrent) close() (err error) {
t.closed.Set()
t.tickleReaders()
if t.storage != nil {
go func() {
t.storageLock.Lock()
@@ -1162,7 +1158,6 @@ func (t *Torrent) pendRequest(req Request) {
}
func (t *Torrent) pieceCompletionChanged(piece pieceIndex) {
t.tickleReaders()
t.cl.event.Broadcast()
if t.pieceComplete(piece) {
t.onPieceCompleted(piece)
@@ -1501,7 +1496,7 @@ func (t *Torrent) runHandshookConnLoggingErr(pc *PeerConn) {
func (t *Torrent) startWebsocketAnnouncer(u url.URL) torrentTrackerAnnouncer {
wtc, release := t.cl.websocketTrackers.Get(u.String())
go func() {
<-t.closed.LockedChan(t.cl.locker())
<-t.closed.Done()
release()
}()
wst := websocketTrackerStatus{u, wtc}
@@ -1664,7 +1659,7 @@ func (t *Torrent) timeboxedAnnounceToDht(s DhtServer) error {
return err
}
select {
case <-t.closed.LockedChan(t.cl.locker()):
case <-t.closed.Done():
case <-time.After(5 * time.Minute):
}
stop()
@@ -1815,7 +1810,7 @@ func (t *Torrent) addPeerConn(c *PeerConn) (err error) {
}
func (t *Torrent) wantConns() bool {
if !t.networkingEnabled {
if !t.networkingEnabled.Bool() {
return false
}
if t.closed.IsSet() {
@@ -1949,6 +1944,7 @@ func (t *Torrent) cancelRequestsForPiece(piece pieceIndex) {
func (t *Torrent) onPieceCompleted(piece pieceIndex) {
t.pendAllChunkSpecs(piece)
t.cancelRequestsForPiece(piece)
t.piece(piece).readerCond.Broadcast()
for conn := range t.conns {
conn.have(piece)
t.maybeDropMutuallyCompletePeer(&conn.Peer)
@@ -2137,27 +2133,15 @@ func (t *Torrent) onWriteChunkErr(err error) {
}
func (t *Torrent) DisallowDataDownload() {
t.cl.lock()
defer t.cl.unlock()
t.disallowDataDownloadLocked()
}
func (t *Torrent) disallowDataDownloadLocked() {
t.dataDownloadDisallowed = true
t.iterPeers(func(c *Peer) {
c.updateRequests()
})
t.tickleReaders()
t.dataDownloadDisallowed.Set()
}
func (t *Torrent) AllowDataDownload() {
t.cl.lock()
defer t.cl.unlock()
t.dataDownloadDisallowed = false
t.tickleReaders()
t.iterPeers(func(c *Peer) {
c.updateRequests()
})
t.dataDownloadDisallowed.Clear()
}
// Enables uploading data, if it was disabled.
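
Summing up the torrent.go changes above: closed becomes a chansync.SetOnce, and networkingEnabled/dataDownloadDisallowed become chansync.Flag values, so readers, announcers, and the tracker scraper can wait on them with a plain channel receive instead of the client-lock-protected missinggo.Event plus the tickleReaders/cl.event.Broadcast plumbing. A hedged sketch of the SetOnce side, using only the methods visible in this diff (Set, IsSet, Done):

package main

import (
    "fmt"
    "time"

    "github.com/anacrolix/chansync"
)

func main() {
    var closed chansync.SetOnce

    // Consumer side: no client lock needed, unlike missinggo.Event.LockedChan.
    go func() {
        <-closed.Done()
        fmt.Println("torrent closed; releasing tracker/webtorrent resources")
    }()

    if !closed.IsSet() {
        closed.Set() // the first and only effective transition to the closed state
    }
    time.Sleep(10 * time.Millisecond) // give the consumer a moment to run
}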

View File

@@ -216,7 +216,6 @@ func (me *trackerScraper) Run() {
me.t.cl.lock()
wantPeers := me.t.wantPeersEvent.C()
closed := me.t.closed.C()
me.t.cl.unlock()
// If we want peers, reduce the interval to the minimum if it's appropriate.
@@ -234,7 +233,7 @@
}
select {
case <-closed:
case <-me.t.closed.Done():
return
case <-reconsider:
// Recalculate the interval.
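
The scraper change above follows from the same switch: closed.Done() needs no lock, so the channel no longer has to be captured (closed := me.t.closed.C()) inside the locked section, and the select receives from it directly. A sketch of the resulting wait; the wantPeers case is an assumption here (it is captured in the locked section above, but its select case is not shown in this hunk):

select {
case <-me.t.closed.Done():
    return // torrent closed: stop announcing
case <-reconsider:
    // Recalculate the interval.
case <-wantPeers:
    // Peers are wanted sooner; reduce the interval if appropriate.
}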