diff --git a/fs/file_handle.go b/fs/file_handle.go new file mode 100644 index 00000000..592153c3 --- /dev/null +++ b/fs/file_handle.go @@ -0,0 +1,76 @@ +package torrentfs + +import ( + "context" + "io" + "os" + + "github.com/anacrolix/missinggo" + "github.com/anacrolix/torrent" + + "bazil.org/fuse" + "bazil.org/fuse/fs" +) + +type fileHandle struct { + fn fileNode + r *torrent.Reader +} + +var _ interface { + fs.HandleReader + fs.HandleReleaser +} = fileHandle{} + +func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { + torrentfsReadRequests.Add(1) + if req.Dir { + panic("read on directory") + } + pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, os.SEEK_SET) + if err != nil { + panic(err) + } + if pos != me.fn.TorrentOffset+req.Offset { + panic("seek failed") + } + resp.Data = resp.Data[:req.Size] + readDone := make(chan struct{}) + ctx, cancel := context.WithCancel(ctx) + var readErr error + go func() { + defer close(readDone) + me.fn.FS.mu.Lock() + me.fn.FS.blockedReads++ + me.fn.FS.event.Broadcast() + me.fn.FS.mu.Unlock() + var n int + r := missinggo.ContextedReader{me.r, ctx} + n, readErr = r.Read(resp.Data) + if readErr == io.EOF { + readErr = nil + } + resp.Data = resp.Data[:n] + }() + defer func() { + <-readDone + me.fn.FS.mu.Lock() + me.fn.FS.blockedReads-- + me.fn.FS.event.Broadcast() + me.fn.FS.mu.Unlock() + }() + defer cancel() + + select { + case <-readDone: + return readErr + case <-me.fn.FS.destroyed: + return fuse.EIO + case <-ctx.Done(): + return fuse.EINTR + } +} + +func (me fileHandle) Release(context.Context, *fuse.ReleaseRequest) error { + return me.r.Close() +} diff --git a/fs/filenode.go b/fs/filenode.go index f59e5b0d..301d92d3 100644 --- a/fs/filenode.go +++ b/fs/filenode.go @@ -1,7 +1,7 @@ package torrentfs import ( - "fmt" + "os" "bazil.org/fuse" fusefs "bazil.org/fuse/fs" @@ -14,7 +14,9 @@ type fileNode struct { TorrentOffset int64 } -var _ fusefs.HandleReader = fileNode{} +var ( + _ fusefs.NodeOpener = fileNode{} +) func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error { attr.Size = fn.size @@ -22,30 +24,8 @@ func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error { return nil } -func (fn fileNode) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - torrentfsReadRequests.Add(1) - if req.Dir { - panic("read on directory") - } - size := req.Size - fileLeft := int64(fn.size) - req.Offset - if fileLeft < 0 { - fileLeft = 0 - } - if fileLeft < int64(size) { - size = int(fileLeft) - } - resp.Data = resp.Data[:size] - if len(resp.Data) == 0 { - return nil - } - torrentOff := fn.TorrentOffset + req.Offset - n, err := readFull(ctx, fn.FS, fn.t, torrentOff, resp.Data) - if err != nil { - return err - } - if n != size { - panic(fmt.Sprintf("%d < %d", n, size)) - } - return nil +func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) { + r := fn.t.NewReader() + r.Seek(fn.TorrentOffset, os.SEEK_SET) + return fileHandle{fn, r}, nil } diff --git a/fs/torrentfs.go b/fs/torrentfs.go index 0d9244b8..edd644a4 100644 --- a/fs/torrentfs.go +++ b/fs/torrentfs.go @@ -2,7 +2,6 @@ package torrentfs import ( "expvar" - "io" "os" "path" "strings" @@ -57,56 +56,6 @@ func (n *node) fsPath() string { return "/" + n.metadata.Name + "/" + n.path } -func blockingRead(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) { - fs.mu.Lock() - fs.blockedReads++ - fs.event.Broadcast() - fs.mu.Unlock() - var ( - _n int - _err error - ) - readDone := make(chan struct{}) - go func() { - defer close(readDone) - r := t.NewReader() - defer r.Close() - _, _err = r.Seek(off, os.SEEK_SET) - if _err != nil { - return - } - _n, _err = io.ReadFull(r, p) - }() - select { - case <-readDone: - n = _n - err = _err - case <-fs.destroyed: - err = fuse.EIO - case <-ctx.Done(): - err = fuse.EINTR - } - fs.mu.Lock() - fs.blockedReads-- - fs.event.Broadcast() - fs.mu.Unlock() - return -} - -func readFull(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) { - for len(p) != 0 { - var nn int - nn, err = blockingRead(ctx, fs, t, off, p) - if err != nil { - break - } - n += nn - off += int64(nn) - p = p[nn:] - } - return -} - type dirNode struct { node } diff --git a/fs/torrentfs_test.go b/fs/torrentfs_test.go index 19d5b1f1..2625cc3c 100644 --- a/fs/torrentfs_test.go +++ b/fs/torrentfs_test.go @@ -210,7 +210,9 @@ func TestDownloadOnDemand(t *testing.T) { resp := &fuse.ReadResponse{ Data: make([]byte, size), } - node.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{ + h, err := node.(fusefs.NodeOpener).Open(nil, nil, nil) + require.NoError(t, err) + h.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{ Size: int(size), }, resp) assert.EqualValues(t, testutil.GreetingFileContents, resp.Data) diff --git a/reader.go b/reader.go index 33ed4017..7da3b48a 100644 --- a/reader.go +++ b/reader.go @@ -119,10 +119,10 @@ func (r *Reader) piecesUncached() (ret pieceRange) { } func (r *Reader) Read(b []byte) (n int, err error) { - return r.ReadContext(b, context.Background()) + return r.ReadContext(context.Background(), b) } -func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) { +func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) { // This is set under the Client lock if the Context is canceled. var ctxErr error if ctx.Done() != nil { diff --git a/reader_test.go b/reader_test.go index dcda4152..f56e9473 100644 --- a/reader_test.go +++ b/reader_test.go @@ -20,6 +20,6 @@ func TestReaderReadContext(t *testing.T) { ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond)) r := tt.NewReader() defer r.Close() - _, err = r.ReadContext(make([]byte, 1), ctx) + _, err = r.ReadContext(ctx, make([]byte, 1)) require.EqualValues(t, context.DeadlineExceeded, err) }