Merge branch 'fs-handle-reader'

This commit is contained in:
Matt Joiner 2017-08-28 17:28:05 +10:00
commit 6794ace27c
6 changed files with 90 additions and 83 deletions

76
fs/file_handle.go Normal file
View File

@ -0,0 +1,76 @@
package torrentfs
import (
"context"
"io"
"os"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/torrent"
"bazil.org/fuse"
"bazil.org/fuse/fs"
)
type fileHandle struct {
fn fileNode
r *torrent.Reader
}
var _ interface {
fs.HandleReader
fs.HandleReleaser
} = fileHandle{}
func (me fileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
torrentfsReadRequests.Add(1)
if req.Dir {
panic("read on directory")
}
pos, err := me.r.Seek(me.fn.TorrentOffset+req.Offset, os.SEEK_SET)
if err != nil {
panic(err)
}
if pos != me.fn.TorrentOffset+req.Offset {
panic("seek failed")
}
resp.Data = resp.Data[:req.Size]
readDone := make(chan struct{})
ctx, cancel := context.WithCancel(ctx)
var readErr error
go func() {
defer close(readDone)
me.fn.FS.mu.Lock()
me.fn.FS.blockedReads++
me.fn.FS.event.Broadcast()
me.fn.FS.mu.Unlock()
var n int
r := missinggo.ContextedReader{me.r, ctx}
n, readErr = r.Read(resp.Data)
if readErr == io.EOF {
readErr = nil
}
resp.Data = resp.Data[:n]
}()
defer func() {
<-readDone
me.fn.FS.mu.Lock()
me.fn.FS.blockedReads--
me.fn.FS.event.Broadcast()
me.fn.FS.mu.Unlock()
}()
defer cancel()
select {
case <-readDone:
return readErr
case <-me.fn.FS.destroyed:
return fuse.EIO
case <-ctx.Done():
return fuse.EINTR
}
}
func (me fileHandle) Release(context.Context, *fuse.ReleaseRequest) error {
return me.r.Close()
}

View File

@ -1,7 +1,7 @@
package torrentfs
import (
"fmt"
"os"
"bazil.org/fuse"
fusefs "bazil.org/fuse/fs"
@ -14,7 +14,9 @@ type fileNode struct {
TorrentOffset int64
}
var _ fusefs.HandleReader = fileNode{}
var (
_ fusefs.NodeOpener = fileNode{}
)
func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
attr.Size = fn.size
@ -22,30 +24,8 @@ func (fn fileNode) Attr(ctx context.Context, attr *fuse.Attr) error {
return nil
}
func (fn fileNode) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
torrentfsReadRequests.Add(1)
if req.Dir {
panic("read on directory")
}
size := req.Size
fileLeft := int64(fn.size) - req.Offset
if fileLeft < 0 {
fileLeft = 0
}
if fileLeft < int64(size) {
size = int(fileLeft)
}
resp.Data = resp.Data[:size]
if len(resp.Data) == 0 {
return nil
}
torrentOff := fn.TorrentOffset + req.Offset
n, err := readFull(ctx, fn.FS, fn.t, torrentOff, resp.Data)
if err != nil {
return err
}
if n != size {
panic(fmt.Sprintf("%d < %d", n, size))
}
return nil
func (fn fileNode) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.OpenResponse) (fusefs.Handle, error) {
r := fn.t.NewReader()
r.Seek(fn.TorrentOffset, os.SEEK_SET)
return fileHandle{fn, r}, nil
}

View File

@ -2,7 +2,6 @@ package torrentfs
import (
"expvar"
"io"
"os"
"path"
"strings"
@ -57,56 +56,6 @@ func (n *node) fsPath() string {
return "/" + n.metadata.Name + "/" + n.path
}
func blockingRead(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) {
fs.mu.Lock()
fs.blockedReads++
fs.event.Broadcast()
fs.mu.Unlock()
var (
_n int
_err error
)
readDone := make(chan struct{})
go func() {
defer close(readDone)
r := t.NewReader()
defer r.Close()
_, _err = r.Seek(off, os.SEEK_SET)
if _err != nil {
return
}
_n, _err = io.ReadFull(r, p)
}()
select {
case <-readDone:
n = _n
err = _err
case <-fs.destroyed:
err = fuse.EIO
case <-ctx.Done():
err = fuse.EINTR
}
fs.mu.Lock()
fs.blockedReads--
fs.event.Broadcast()
fs.mu.Unlock()
return
}
func readFull(ctx context.Context, fs *TorrentFS, t *torrent.Torrent, off int64, p []byte) (n int, err error) {
for len(p) != 0 {
var nn int
nn, err = blockingRead(ctx, fs, t, off, p)
if err != nil {
break
}
n += nn
off += int64(nn)
p = p[nn:]
}
return
}
type dirNode struct {
node
}

View File

@ -210,7 +210,9 @@ func TestDownloadOnDemand(t *testing.T) {
resp := &fuse.ReadResponse{
Data: make([]byte, size),
}
node.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{
h, err := node.(fusefs.NodeOpener).Open(nil, nil, nil)
require.NoError(t, err)
h.(fusefs.HandleReader).Read(netContext.Background(), &fuse.ReadRequest{
Size: int(size),
}, resp)
assert.EqualValues(t, testutil.GreetingFileContents, resp.Data)

View File

@ -119,10 +119,10 @@ func (r *Reader) piecesUncached() (ret pieceRange) {
}
func (r *Reader) Read(b []byte) (n int, err error) {
return r.ReadContext(b, context.Background())
return r.ReadContext(context.Background(), b)
}
func (r *Reader) ReadContext(b []byte, ctx context.Context) (n int, err error) {
func (r *Reader) ReadContext(ctx context.Context, b []byte) (n int, err error) {
// This is set under the Client lock if the Context is canceled.
var ctxErr error
if ctx.Done() != nil {

View File

@ -20,6 +20,6 @@ func TestReaderReadContext(t *testing.T) {
ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Millisecond))
r := tt.NewReader()
defer r.Close()
_, err = r.ReadContext(make([]byte, 1), ctx)
_, err = r.ReadContext(ctx, make([]byte, 1))
require.EqualValues(t, context.DeadlineExceeded, err)
}