diff --git a/client_test.go b/client_test.go index faded767..597b67b7 100644 --- a/client_test.go +++ b/client_test.go @@ -19,6 +19,7 @@ import ( _ "github.com/anacrolix/envpprof" "github.com/anacrolix/missinggo" . "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/filecache" "github.com/anacrolix/utp" "github.com/bradfitz/iter" "github.com/stretchr/testify/assert" @@ -310,17 +311,12 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) { leecherDataDir, err := ioutil.TempDir("", "") require.NoError(t, err) defer os.RemoveAll(leecherDataDir) - // cfg.TorrentDataOpener = func() TorrentDataOpener { - // fc, err := filecache.NewCache(leecherDataDir) - // require.NoError(t, err) - // if ps.SetLeecherStorageCapacity { - // fc.SetCapacity(ps.LeecherStorageCapacity) - // } - // store := pieceStore.New(fileCacheDataBackend.New(fc)) - // return func(mi *metainfo.Info) storage.I { - // return store.OpenTorrentData(mi) - // } - // }() + fc, err := filecache.NewCache(leecherDataDir) + require.NoError(t, err) + if ps.SetLeecherStorageCapacity { + fc.SetCapacity(ps.LeecherStorageCapacity) + } + cfg.DefaultStorage = storage.NewPieceFileStorage(storage.FileCacheFileStore{fc}) leecher, err := NewClient(&cfg) require.NoError(t, err) defer leecher.Close() @@ -719,55 +715,64 @@ func TestTorrentDroppedBeforeGotInfo(t *testing.T) { } } -// func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) { -// fileCacheDir, err := ioutil.TempDir("", "") -// require.NoError(t, err) -// defer os.RemoveAll(fileCacheDir) -// fileCache, err := filecache.NewCache(fileCacheDir) -// require.NoError(t, err) -// greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent() -// defer os.RemoveAll(greetingDataTempDir) -// filePieceStore := pieceStore.New(fileCacheDataBackend.New(fileCache)) -// greetingData := filePieceStore.OpenTorrentData(&greetingMetainfo.Info.Info) -// written, err := greetingData.WriteAt([]byte(testutil.GreetingFileContents), 0) -// require.Equal(t, len(testutil.GreetingFileContents), written) -// require.NoError(t, err) -// for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ { -// // p := greetingMetainfo.Info.Piece(i) -// if alreadyCompleted { -// err := greetingData.PieceCompleted(i) -// assert.NoError(t, err) -// } -// } -// cfg := TestingConfig -// // TODO: Disable network option? -// cfg.DisableTCP = true -// cfg.DisableUTP = true -// // cfg.DefaultStorage = filePieceStore -// cl, err := NewClient(&cfg) -// require.NoError(t, err) -// defer cl.Close() -// tt, err := cl.AddTorrent(greetingMetainfo) -// require.NoError(t, err) -// psrs := tt.PieceStateRuns() -// assert.Len(t, psrs, 1) -// assert.EqualValues(t, 3, psrs[0].Length) -// assert.Equal(t, alreadyCompleted, psrs[0].Complete) -// if alreadyCompleted { -// r := tt.NewReader() -// b, err := ioutil.ReadAll(r) -// assert.NoError(t, err) -// assert.EqualValues(t, testutil.GreetingFileContents, b) -// } -// } +func writeTorrentData(ts storage.Torrent, info *metainfo.InfoEx, b []byte) { + for i := range iter.N(info.NumPieces()) { + n, err := ts.Piece(info.Piece(i)).WriteAt(b, 0) + b = b[n:] + log.Print(err) + } +} -// func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) { -// testAddTorrentPriorPieceCompletion(t, true) -// } +func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) { + fileCacheDir, err := ioutil.TempDir("", "") + require.NoError(t, err) + defer os.RemoveAll(fileCacheDir) + fileCache, err := filecache.NewCache(fileCacheDir) + require.NoError(t, err) + greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent() + defer os.RemoveAll(greetingDataTempDir) + filePieceStore := storage.NewPieceFileStorage(storage.FileCacheFileStore{fileCache}) + greetingData, err := filePieceStore.OpenTorrent(&greetingMetainfo.Info) + require.NoError(t, err) + writeTorrentData(greetingData, &greetingMetainfo.Info, []byte(testutil.GreetingFileContents)) + // require.Equal(t, len(testutil.GreetingFileContents), written) + // require.NoError(t, err) + for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ { + p := greetingMetainfo.Info.Piece(i) + if alreadyCompleted { + err := greetingData.Piece(p).MarkComplete() + assert.NoError(t, err) + } + } + cfg := TestingConfig + // TODO: Disable network option? + cfg.DisableTCP = true + cfg.DisableUTP = true + cfg.DefaultStorage = filePieceStore + cl, err := NewClient(&cfg) + require.NoError(t, err) + defer cl.Close() + tt, err := cl.AddTorrent(greetingMetainfo) + require.NoError(t, err) + psrs := tt.PieceStateRuns() + assert.Len(t, psrs, 1) + assert.EqualValues(t, 3, psrs[0].Length) + assert.Equal(t, alreadyCompleted, psrs[0].Complete) + if alreadyCompleted { + r := tt.NewReader() + b, err := ioutil.ReadAll(r) + assert.NoError(t, err) + assert.EqualValues(t, testutil.GreetingFileContents, b) + } +} -// func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) { -// testAddTorrentPriorPieceCompletion(t, false) -// } +func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) { + testAddTorrentPriorPieceCompletion(t, true) +} + +func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) { + testAddTorrentPriorPieceCompletion(t, false) +} func TestAddMetainfoWithNodes(t *testing.T) { cfg := TestingConfig @@ -812,17 +817,12 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) { leecherDataDir, err := ioutil.TempDir("", "") require.NoError(t, err) defer os.RemoveAll(leecherDataDir) - // cfg.TorrentDataOpener = func() TorrentDataOpener { - // fc, err := filecache.NewCache(leecherDataDir) - // require.NoError(t, err) - // if ps.SetLeecherStorageCapacity { - // fc.SetCapacity(ps.LeecherStorageCapacity) - // } - // store := pieceStore.New(fileCacheDataBackend.New(fc)) - // return func(mi *metainfo.Info) storage.I { - // return store.OpenTorrentData(mi) - // } - // }() + fc, err := filecache.NewCache(leecherDataDir) + require.NoError(t, err) + if ps.SetLeecherStorageCapacity { + fc.SetCapacity(ps.LeecherStorageCapacity) + } + cfg.DefaultStorage = storage.NewPieceFileStorage(storage.FileCacheFileStore{fc}) cfg.DataDir = leecherDataDir leecher, _ := NewClient(&cfg) defer leecher.Close() diff --git a/storage/piece_file.go b/storage/piece_file.go new file mode 100644 index 00000000..0e323751 --- /dev/null +++ b/storage/piece_file.go @@ -0,0 +1,102 @@ +package storage + +import ( + "os" + "path" + + "github.com/anacrolix/missinggo" + "github.com/anacrolix/missinggo/filecache" + + "github.com/anacrolix/torrent/metainfo" +) + +type FileStore interface { + OpenFile(path string, flags int) (File, error) + Stat(path string) (os.FileInfo, error) + Rename(from, to string) error +} + +type File interface { + WriteAt([]byte, int64) (int, error) + ReadAt([]byte, int64) (int, error) + Close() error +} + +type FileCacheFileStore struct { + *filecache.Cache +} + +func (me FileCacheFileStore) OpenFile(p string, f int) (File, error) { + return me.Cache.OpenFile(p, f) +} + +type pieceFileStorage struct { + fs FileStore +} + +func NewPieceFileStorage(fs FileStore) I { + return &pieceFileStorage{ + fs: fs, + } +} + +type pieceFileTorrentStorage struct { + s *pieceFileStorage +} + +func (me *pieceFileStorage) OpenTorrent(info *metainfo.InfoEx) (Torrent, error) { + return &pieceFileTorrentStorage{me}, nil +} + +func (me *pieceFileTorrentStorage) Close() error { + return nil +} + +func (me *pieceFileTorrentStorage) Piece(p metainfo.Piece) Piece { + return pieceFileTorrentStoragePiece{me, p, me.s.fs} +} + +type pieceFileTorrentStoragePiece struct { + ts *pieceFileTorrentStorage + p metainfo.Piece + fs FileStore +} + +func (me pieceFileTorrentStoragePiece) completedPath() string { + return path.Join("completed", me.p.Hash().HexString()) +} + +func (me pieceFileTorrentStoragePiece) incompletePath() string { + return path.Join("incomplete", me.p.Hash().HexString()) +} + +func (me pieceFileTorrentStoragePiece) GetIsComplete() bool { + fi, err := me.ts.s.fs.Stat(me.completedPath()) + return err == nil && fi.Size() == me.p.Length() +} + +func (me pieceFileTorrentStoragePiece) MarkComplete() error { + return me.fs.Rename(me.incompletePath(), me.completedPath()) +} + +func (me pieceFileTorrentStoragePiece) ReadAt(b []byte, off int64) (n int, err error) { + f, err := me.fs.OpenFile(me.completedPath(), os.O_RDONLY) + if err != nil { + f, err = me.fs.OpenFile(me.incompletePath(), os.O_RDONLY) + if err != nil { + return + } + } + defer f.Close() + return f.ReadAt(b, off) +} + +func (me pieceFileTorrentStoragePiece) WriteAt(b []byte, off int64) (n int, err error) { + f, err := me.fs.OpenFile(me.incompletePath(), os.O_WRONLY|os.O_CREATE) + if err != nil { + return + } + defer f.Close() + missinggo.LimitLen(&b, me.p.Length()-off) + return f.WriteAt(b, off) +}