Maintain a torrent.Reader for each file handle
This means that readahead will work much better. Addresses https://github.com/anacrolix/torrent/issues/182.
This commit is contained in:
parent
58d66a1b62
commit
7d55f573f5
|
@ -2,7 +2,11 @@ package torrentfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"io"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/anacrolix/missinggo"
|
||||||
|
"github.com/anacrolix/torrent"
|
||||||
|
|
||||||
"bazil.org/fuse"
|
"bazil.org/fuse"
|
||||||
"bazil.org/fuse/fs"
|
"bazil.org/fuse/fs"
|
||||||
|
@ -10,34 +14,63 @@ import (
|
||||||
|
|
||||||
type fileHandle struct {
|
type fileHandle struct {
|
||||||
fn fileNode
|
fn fileNode
|
||||||
|
r *torrent.Reader
|
||||||
}
|
}
|
||||||
|
|
||||||
var _ fs.HandleReader = fileHandle{}
|
var _ interface {
|
||||||
|
fs.HandleReader
|
||||||
|
fs.HandleReleaser
|
||||||
|
} = fileHandle{}
|
||||||
|
|
||||||
func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
|
func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
|
||||||
torrentfsReadRequests.Add(1)
|
torrentfsReadRequests.Add(1)
|
||||||
if req.Dir {
|
if req.Dir {
|
||||||
panic("read on directory")
|
panic("read on directory")
|
||||||
}
|
}
|
||||||
size := req.Size
|
pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, os.SEEK_SET)
|
||||||
fileLeft := int64(me.fn.size) - req.Offset
|
|
||||||
if fileLeft < 0 {
|
|
||||||
fileLeft = 0
|
|
||||||
}
|
|
||||||
if fileLeft < int64(size) {
|
|
||||||
size = int(fileLeft)
|
|
||||||
}
|
|
||||||
resp.Data = resp.Data[:size]
|
|
||||||
if len(resp.Data) == 0 {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
torrentOff := me.fn.TorrentOffset + req.Offset
|
|
||||||
n, err := readFull(ctx, me.fn.FS, me.fn.t, torrentOff, resp.Data)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
panic(err)
|
||||||
}
|
}
|
||||||
if n != size {
|
if pos != me.fn.TorrentOffset+req.Offset {
|
||||||
panic(fmt.Sprintf("%d < %d", n, size))
|
panic("seek failed")
|
||||||
|
}
|
||||||
|
resp.Data = resp.Data[:req.Size]
|
||||||
|
readDone := make(chan struct{})
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
|
var readErr error
|
||||||
|
go func() {
|
||||||
|
defer close(readDone)
|
||||||
|
me.fn.FS.mu.Lock()
|
||||||
|
me.fn.FS.blockedReads++
|
||||||
|
me.fn.FS.event.Broadcast()
|
||||||
|
me.fn.FS.mu.Unlock()
|
||||||
|
var n int
|
||||||
|
r := missinggo.ContextedReader{me.r, ctx}
|
||||||
|
n, readErr = r.Read(resp.Data)
|
||||||
|
if readErr == io.EOF {
|
||||||
|
readErr = nil
|
||||||
|
}
|
||||||
|
resp.Data = resp.Data[:n]
|
||||||
|
}()
|
||||||
|
defer func() {
|
||||||
|
<-readDone
|
||||||
|
me.fn.FS.mu.Lock()
|
||||||
|
me.fn.FS.blockedReads--
|
||||||
|
me.fn.FS.event.Broadcast()
|
||||||
|
me.fn.FS.mu.Unlock()
|
||||||
|
}()
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-readDone:
|
||||||
|
return readErr
|
||||||
|
case <-me.fn.FS.destroyed:
|
||||||
|
return fuse.EIO
|
||||||
|
case <-ctx.Done():
|
||||||
|
return fuse.EINTR
|
||||||
}
|
}
|
||||||
return nil
|
}
|
||||||
|
|
||||||
|
func (me fileHandle) Release(context.Context, *fuse.ReleaseRequest) error {
|
||||||
|
return me.r.Close()
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,8 @@
|
||||||
package torrentfs
|
package torrentfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"os"
|
||||||
|
|
||||||
"bazil.org/fuse"
|
"bazil.org/fuse"
|
||||||
fusefs "bazil.org/fuse/fs"
|
fusefs "bazil.org/fuse/fs"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
|
@ -23,5 +25,7 @@ func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) {
|
func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) {
|
||||||
return fileHandle{fn}, nil
|
r := fn.t.NewReader()
|
||||||
|
r.Seek(fn.TorrentOffset, os.SEEK_SET)
|
||||||
|
return fileHandle{fn, r}, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -2,7 +2,6 @@ package torrentfs
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"expvar"
|
"expvar"
|
||||||
"io"
|
|
||||||
"os"
|
"os"
|
||||||
"path"
|
"path"
|
||||||
"strings"
|
"strings"
|
||||||
|
@ -57,56 +56,6 @@ func (n *node) fsPath() string {
|
||||||
return "/" + n.metadata.Name + "/" + n.path
|
return "/" + n.metadata.Name + "/" + n.path
|
||||||
}
|
}
|
||||||
|
|
||||||
func blockingRead(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) {
|
|
||||||
fs.mu.Lock()
|
|
||||||
fs.blockedReads++
|
|
||||||
fs.event.Broadcast()
|
|
||||||
fs.mu.Unlock()
|
|
||||||
var (
|
|
||||||
_n int
|
|
||||||
_err error
|
|
||||||
)
|
|
||||||
readDone := make(chan struct{})
|
|
||||||
go func() {
|
|
||||||
defer close(readDone)
|
|
||||||
r := t.NewReader()
|
|
||||||
defer r.Close()
|
|
||||||
_, _err = r.Seek(off, os.SEEK_SET)
|
|
||||||
if _err != nil {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
_n, _err = io.ReadFull(r, p)
|
|
||||||
}()
|
|
||||||
select {
|
|
||||||
case <-readDone:
|
|
||||||
n = _n
|
|
||||||
err = _err
|
|
||||||
case <-fs.destroyed:
|
|
||||||
err = fuse.EIO
|
|
||||||
case <-ctx.Done():
|
|
||||||
err = fuse.EINTR
|
|
||||||
}
|
|
||||||
fs.mu.Lock()
|
|
||||||
fs.blockedReads--
|
|
||||||
fs.event.Broadcast()
|
|
||||||
fs.mu.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func readFull(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) {
|
|
||||||
for len(p) != 0 {
|
|
||||||
var nn int
|
|
||||||
nn, err = blockingRead(ctx, fs, t, off, p)
|
|
||||||
if err != nil {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
n += nn
|
|
||||||
off += int64(nn)
|
|
||||||
p = p[nn:]
|
|
||||||
}
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
type dirNode struct {
|
type dirNode struct {
|
||||||
node
|
node
|
||||||
}
|
}
|
||||||
|
|
|
@ -119,10 +119,10 @@ func (r *Reader) piecesUncached() (ret pieceRange) {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reader) Read(b []byte) (n int, err error) {
|
func (r *Reader) Read(b []byte) (n int, err error) {
|
||||||
return r.ReadContext(b, context.Background())
|
return r.ReadContext(context.Background(), b)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
|
func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
|
||||||
// This is set under the Client lock if the Context is canceled.
|
// This is set under the Client lock if the Context is canceled.
|
||||||
var ctxErr error
|
var ctxErr error
|
||||||
if ctx.Done() != nil {
|
if ctx.Done() != nil {
|
||||||
|
|
|
@ -20,6 +20,6 @@ func TestReaderReadContext(t *testing.T) {
|
||||||
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond))
|
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond))
|
||||||
r := tt.NewReader()
|
r := tt.NewReader()
|
||||||
defer r.Close()
|
defer r.Close()
|
||||||
_, err = r.ReadContext(make([]byte, 1), ctx)
|
_, err = r.ReadContext(ctx, make([]byte, 1))
|
||||||
require.EqualValues(t, context.DeadlineExceeded, err)
|
require.EqualValues(t, context.DeadlineExceeded, err)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue