From 58d66a1b622fffa3aa101cf2e85ed093f35b6e59 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Sun, 27 Aug 2017 14:19:58 +1000 Subject: [PATCH 1/2] fs: Move file Read behaviour onto a new handle type --- fs/file_handle.go | 43 +++++++++++++++++++++++++++++++++++++++++++ fs/filenode.go | 34 +++++----------------------------- fs/torrentfs_test.go | 4 +++- 3 files changed, 51 insertions(+), 30 deletions(-) create mode 100644 fs/file_handle.go diff --git a/fs/file_handle.go b/fs/file_handle.go new file mode 100644 index 00000000..11a5d74d --- /dev/null +++ b/fs/file_handle.go @@ -0,0 +1,43 @@ +package torrentfs + +import ( + "context" + "fmt" + + "bazil.org/fuse" + "bazil.org/fuse/fs" +) + +type fileHandle struct { + fn fileNode +} + +var _ fs.HandleReader = fileHandle{} + +func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { + torrentfsReadRequests.Add(1) + if req.Dir { + panic("read on directory") + } + size := req.Size + fileLeft := int64(me.fn.size) - req.Offset + if fileLeft < 0 { + fileLeft = 0 + } + if fileLeft < int64(size) { + size = int(fileLeft) + } + resp.Data = resp.Data[:size] + if len(resp.Data) == 0 { + return nil + } + torrentOff := me.fn.TorrentOffset + req.Offset + n, err := readFull(ctx, me.fn.FS, me.fn.t, torrentOff, resp.Data) + if err != nil { + return err + } + if n != size { + panic(fmt.Sprintf("%d < %d", n, size)) + } + return nil +} diff --git a/fs/filenode.go b/fs/filenode.go index f59e5b0d..2c83fbf1 100644 --- a/fs/filenode.go +++ b/fs/filenode.go @@ -1,8 +1,6 @@ package torrentfs import ( - "fmt" - "bazil.org/fuse" fusefs "bazil.org/fuse/fs" "golang.org/x/net/context" @@ -14,7 +12,9 @@ type fileNode struct { TorrentOffset int64 } -var _ fusefs.HandleReader = fileNode{} +var ( + _ fusefs.NodeOpener = fileNode{} +) func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error { attr.Size = fn.size @@ -22,30 +22,6 @@ func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error { return nil } -func (fn fileNode) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - torrentfsReadRequests.Add(1) - if req.Dir { - panic("read on directory") - } - size := req.Size - fileLeft := int64(fn.size) - req.Offset - if fileLeft < 0 { - fileLeft = 0 - } - if fileLeft < int64(size) { - size = int(fileLeft) - } - resp.Data = resp.Data[:size] - if len(resp.Data) == 0 { - return nil - } - torrentOff := fn.TorrentOffset + req.Offset - n, err := readFull(ctx, fn.FS, fn.t, torrentOff, resp.Data) - if err != nil { - return err - } - if n != size { - panic(fmt.Sprintf("%d < %d", n, size)) - } - return nil +func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) { + return fileHandle{fn}, nil } diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index 19d5b1f1..2625cc3c 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -210,7 +210,9 @@ func TestDownloadOnDemand(t *testing.T) { resp := &fuse.ReadResponse{ Data: make([]byte, size), } - node.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{ + h, err := node.(fusefs.NodeOpener).Open(nil, nil, nil) + require.NoError(t, err) + h.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{ Size: int(size), }, resp) assert.EqualValues(t, testutil.GreetingFileContents, resp.Data) From 7d55f573f537e9b08290216cc119f9c2b489ca79 Mon Sep 17 00:00:00 2001 From: Matt Joiner Date: Mon, 28 Aug 2017 01:42:02 +1000 Subject: [PATCH 2/2] Maintain a torrent.Reader for each file handle This means that readahead will work much better. Addresses https://github.com/anacrolix/torrent/issues/182. --- fs/file_handle.go | 73 ++++++++++++++++++++++++++++++++++------------- fs/filenode.go | 6 +++- fs/torrentfs.go | 51 --------------------------------- reader.go | 4 +-- reader_test.go | 2 +- 5 files changed, 61 insertions(+), 75 deletions(-) diff --git a/fs/file_handle.go b/fs/file_handle.go index 11a5d74d..592153c3 100644 --- a/fs/file_handle.go +++ b/fs/file_handle.go @@ -2,7 +2,11 @@ package torrentfs import ( "context" - "fmt" + "io" + "os" + + "github.com/anacrolix/missinggo" + "github.com/anacrolix/torrent" "bazil.org/fuse" "bazil.org/fuse/fs" @@ -10,34 +14,63 @@ import ( type fileHandle struct { fn fileNode + r *torrent.Reader } -var _ fs.HandleReader = fileHandle{} +var _ interface { + fs.HandleReader + fs.HandleReleaser +} = fileHandle{} func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { torrentfsReadRequests.Add(1) if req.Dir { panic("read on directory") } - size := req.Size - fileLeft := int64(me.fn.size) - req.Offset - if fileLeft < 0 { - fileLeft = 0 - } - if fileLeft < int64(size) { - size = int(fileLeft) - } - resp.Data = resp.Data[:size] - if len(resp.Data) == 0 { - return nil - } - torrentOff := me.fn.TorrentOffset + req.Offset - n, err := readFull(ctx, me.fn.FS, me.fn.t, torrentOff, resp.Data) + pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, os.SEEK_SET) if err != nil { - return err + panic(err) } - if n != size { - panic(fmt.Sprintf("%d < %d", n, size)) + if pos != me.fn.TorrentOffset+req.Offset { + panic("seek failed") + } + resp.Data = resp.Data[:req.Size] + readDone := make(chan struct{}) + ctx, cancel := context.WithCancel(ctx) + var readErr error + go func() { + defer close(readDone) + me.fn.FS.mu.Lock() + me.fn.FS.blockedReads++ + me.fn.FS.event.Broadcast() + me.fn.FS.mu.Unlock() + var n int + r := missinggo.ContextedReader{me.r, ctx} + n, readErr = r.Read(resp.Data) + if readErr == io.EOF { + readErr = nil + } + resp.Data = resp.Data[:n] + }() + defer func() { + <-readDone + me.fn.FS.mu.Lock() + me.fn.FS.blockedReads-- + me.fn.FS.event.Broadcast() + me.fn.FS.mu.Unlock() + }() + defer cancel() + + select { + case <-readDone: + return readErr + case <-me.fn.FS.destroyed: + return fuse.EIO + case <-ctx.Done(): + return fuse.EINTR } - return nil +} + +func (me fileHandle) Release(context.Context, *fuse.ReleaseRequest) error { + return me.r.Close() } diff --git a/fs/filenode.go b/fs/filenode.go index 2c83fbf1..301d92d3 100644 --- a/fs/filenode.go +++ b/fs/filenode.go @@ -1,6 +1,8 @@ package torrentfs import ( + "os" + "bazil.org/fuse" fusefs "bazil.org/fuse/fs" "golang.org/x/net/context" @@ -23,5 +25,7 @@ func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error { } func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) { - return fileHandle{fn}, nil + r := fn.t.NewReader() + r.Seek(fn.TorrentOffset, os.SEEK_SET) + return fileHandle{fn, r}, nil } diff --git a/fs/torrentfs.go b/fs/torrentfs.go index 0d9244b8..edd644a4 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -2,7 +2,6 @@ package torrentfs import ( "expvar" - "io" "os" "path" "strings" @@ -57,56 +56,6 @@ func (n *node) fsPath() string { return "/" + n.metadata.Name + "/" + n.path } -func blockingRead(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) { - fs.mu.Lock() - fs.blockedReads++ - fs.event.Broadcast() - fs.mu.Unlock() - var ( - _n int - _err error - ) - readDone := make(chan struct{}) - go func() { - defer close(readDone) - r := t.NewReader() - defer r.Close() - _, _err = r.Seek(off, os.SEEK_SET) - if _err != nil { - return - } - _n, _err = io.ReadFull(r, p) - }() - select { - case <-readDone: - n = _n - err = _err - case <-fs.destroyed: - err = fuse.EIO - case <-ctx.Done(): - err = fuse.EINTR - } - fs.mu.Lock() - fs.blockedReads-- - fs.event.Broadcast() - fs.mu.Unlock() - return -} - -func readFull(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) { - for len(p) != 0 { - var nn int - nn, err = blockingRead(ctx, fs, t, off, p) - if err != nil { - break - } - n += nn - off += int64(nn) - p = p[nn:] - } - return -} - type dirNode struct { node } diff --git a/reader.go b/reader.go index 33ed4017..7da3b48a 100644 --- a/reader.go +++ b/reader.go @@ -119,10 +119,10 @@ func (r *Reader) piecesUncached() (ret pieceRange) { } func (r *Reader) Read(b []byte) (n int, err error) { - return r.ReadContext(b, context.Background()) + return r.ReadContext(context.Background(), b) } -func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) { +func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { // This is set under the Client lock if the Context is canceled. var ctxErr error if ctx.Done() != nil { diff --git a/reader_test.go b/reader_test.go index dcda4152..f56e9473 100644 --- a/reader_test.go +++ b/reader_test.go @@ -20,6 +20,6 @@ func TestReaderReadContext(t *testing.T) { ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond)) r := tt.NewReader() defer r.Close() - _, err = r.ReadContext(make([]byte, 1), ctx) + _, err = r.ReadContext(ctx, make([]byte, 1)) require.EqualValues(t, context.DeadlineExceeded, err) }