2
0
mirror of synced 2025-02-24 06:38:14 +00:00

Reimplement piece storage

This commit is contained in:
Matt Joiner 2016-03-29 11:14:34 +11:00
parent 0a3a5d6ae0
commit ee22446440
2 changed files with 171 additions and 69 deletions

View File

@ -19,6 +19,7 @@ import (
_ "github.com/anacrolix/envpprof" _ "github.com/anacrolix/envpprof"
"github.com/anacrolix/missinggo" "github.com/anacrolix/missinggo"
. "github.com/anacrolix/missinggo" . "github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/filecache"
"github.com/anacrolix/utp" "github.com/anacrolix/utp"
"github.com/bradfitz/iter" "github.com/bradfitz/iter"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -310,17 +311,12 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
leecherDataDir, err := ioutil.TempDir("", "") leecherDataDir, err := ioutil.TempDir("", "")
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(leecherDataDir) defer os.RemoveAll(leecherDataDir)
// cfg.TorrentDataOpener = func() TorrentDataOpener { fc, err := filecache.NewCache(leecherDataDir)
// fc, err := filecache.NewCache(leecherDataDir) require.NoError(t, err)
// require.NoError(t, err) if ps.SetLeecherStorageCapacity {
// if ps.SetLeecherStorageCapacity { fc.SetCapacity(ps.LeecherStorageCapacity)
// fc.SetCapacity(ps.LeecherStorageCapacity) }
// } cfg.DefaultStorage = storage.NewPieceFileStorage(storage.FileCacheFileStore{fc})
// store := pieceStore.New(fileCacheDataBackend.New(fc))
// return func(mi *metainfo.Info) storage.I {
// return store.OpenTorrentData(mi)
// }
// }()
leecher, err := NewClient(&cfg) leecher, err := NewClient(&cfg)
require.NoError(t, err) require.NoError(t, err)
defer leecher.Close() defer leecher.Close()
@ -719,55 +715,64 @@ func TestTorrentDroppedBeforeGotInfo(t *testing.T) {
} }
} }
// func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) { func writeTorrentData(ts storage.Torrent, info *metainfo.InfoEx, b []byte) {
// fileCacheDir, err := ioutil.TempDir("", "") for i := range iter.N(info.NumPieces()) {
// require.NoError(t, err) n, err := ts.Piece(info.Piece(i)).WriteAt(b, 0)
// defer os.RemoveAll(fileCacheDir) b = b[n:]
// fileCache, err := filecache.NewCache(fileCacheDir) log.Print(err)
// require.NoError(t, err) }
// greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent() }
// defer os.RemoveAll(greetingDataTempDir)
// filePieceStore := pieceStore.New(fileCacheDataBackend.New(fileCache))
// greetingData := filePieceStore.OpenTorrentData(&greetingMetainfo.Info.Info)
// written, err := greetingData.WriteAt([]byte(testutil.GreetingFileContents), 0)
// require.Equal(t, len(testutil.GreetingFileContents), written)
// require.NoError(t, err)
// for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
// // p := greetingMetainfo.Info.Piece(i)
// if alreadyCompleted {
// err := greetingData.PieceCompleted(i)
// assert.NoError(t, err)
// }
// }
// cfg := TestingConfig
// // TODO: Disable network option?
// cfg.DisableTCP = true
// cfg.DisableUTP = true
// // cfg.DefaultStorage = filePieceStore
// cl, err := NewClient(&cfg)
// require.NoError(t, err)
// defer cl.Close()
// tt, err := cl.AddTorrent(greetingMetainfo)
// require.NoError(t, err)
// psrs := tt.PieceStateRuns()
// assert.Len(t, psrs, 1)
// assert.EqualValues(t, 3, psrs[0].Length)
// assert.Equal(t, alreadyCompleted, psrs[0].Complete)
// if alreadyCompleted {
// r := tt.NewReader()
// b, err := ioutil.ReadAll(r)
// assert.NoError(t, err)
// assert.EqualValues(t, testutil.GreetingFileContents, b)
// }
// }
// func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) { func testAddTorrentPriorPieceCompletion(t *testing.T, alreadyCompleted bool) {
// testAddTorrentPriorPieceCompletion(t, true) fileCacheDir, err := ioutil.TempDir("", "")
// } require.NoError(t, err)
defer os.RemoveAll(fileCacheDir)
fileCache, err := filecache.NewCache(fileCacheDir)
require.NoError(t, err)
greetingDataTempDir, greetingMetainfo := testutil.GreetingTestTorrent()
defer os.RemoveAll(greetingDataTempDir)
filePieceStore := storage.NewPieceFileStorage(storage.FileCacheFileStore{fileCache})
greetingData, err := filePieceStore.OpenTorrent(&greetingMetainfo.Info)
require.NoError(t, err)
writeTorrentData(greetingData, &greetingMetainfo.Info, []byte(testutil.GreetingFileContents))
// require.Equal(t, len(testutil.GreetingFileContents), written)
// require.NoError(t, err)
for i := 0; i < greetingMetainfo.Info.NumPieces(); i++ {
p := greetingMetainfo.Info.Piece(i)
if alreadyCompleted {
err := greetingData.Piece(p).MarkComplete()
assert.NoError(t, err)
}
}
cfg := TestingConfig
// TODO: Disable network option?
cfg.DisableTCP = true
cfg.DisableUTP = true
cfg.DefaultStorage = filePieceStore
cl, err := NewClient(&cfg)
require.NoError(t, err)
defer cl.Close()
tt, err := cl.AddTorrent(greetingMetainfo)
require.NoError(t, err)
psrs := tt.PieceStateRuns()
assert.Len(t, psrs, 1)
assert.EqualValues(t, 3, psrs[0].Length)
assert.Equal(t, alreadyCompleted, psrs[0].Complete)
if alreadyCompleted {
r := tt.NewReader()
b, err := ioutil.ReadAll(r)
assert.NoError(t, err)
assert.EqualValues(t, testutil.GreetingFileContents, b)
}
}
// func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) { func TestAddTorrentPiecesAlreadyCompleted(t *testing.T) {
// testAddTorrentPriorPieceCompletion(t, false) testAddTorrentPriorPieceCompletion(t, true)
// } }
func TestAddTorrentPiecesNotAlreadyCompleted(t *testing.T) {
testAddTorrentPriorPieceCompletion(t, false)
}
func TestAddMetainfoWithNodes(t *testing.T) { func TestAddMetainfoWithNodes(t *testing.T) {
cfg := TestingConfig cfg := TestingConfig
@ -812,17 +817,12 @@ func testDownloadCancel(t *testing.T, ps testDownloadCancelParams) {
leecherDataDir, err := ioutil.TempDir("", "") leecherDataDir, err := ioutil.TempDir("", "")
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(leecherDataDir) defer os.RemoveAll(leecherDataDir)
// cfg.TorrentDataOpener = func() TorrentDataOpener { fc, err := filecache.NewCache(leecherDataDir)
// fc, err := filecache.NewCache(leecherDataDir) require.NoError(t, err)
// require.NoError(t, err) if ps.SetLeecherStorageCapacity {
// if ps.SetLeecherStorageCapacity { fc.SetCapacity(ps.LeecherStorageCapacity)
// fc.SetCapacity(ps.LeecherStorageCapacity) }
// } cfg.DefaultStorage = storage.NewPieceFileStorage(storage.FileCacheFileStore{fc})
// store := pieceStore.New(fileCacheDataBackend.New(fc))
// return func(mi *metainfo.Info) storage.I {
// return store.OpenTorrentData(mi)
// }
// }()
cfg.DataDir = leecherDataDir cfg.DataDir = leecherDataDir
leecher, _ := NewClient(&cfg) leecher, _ := NewClient(&cfg)
defer leecher.Close() defer leecher.Close()

102
storage/piece_file.go Normal file
View File

@ -0,0 +1,102 @@
package storage
import (
"os"
"path"
"github.com/anacrolix/missinggo"
"github.com/anacrolix/missinggo/filecache"
"github.com/anacrolix/torrent/metainfo"
)
type FileStore interface {
OpenFile(path string, flags int) (File, error)
Stat(path string) (os.FileInfo, error)
Rename(from, to string) error
}
type File interface {
WriteAt([]byte, int64) (int, error)
ReadAt([]byte, int64) (int, error)
Close() error
}
type FileCacheFileStore struct {
*filecache.Cache
}
func (me FileCacheFileStore) OpenFile(p string, f int) (File, error) {
return me.Cache.OpenFile(p, f)
}
type pieceFileStorage struct {
fs FileStore
}
func NewPieceFileStorage(fs FileStore) I {
return &pieceFileStorage{
fs: fs,
}
}
type pieceFileTorrentStorage struct {
s *pieceFileStorage
}
func (me *pieceFileStorage) OpenTorrent(info *metainfo.InfoEx) (Torrent, error) {
return &pieceFileTorrentStorage{me}, nil
}
func (me *pieceFileTorrentStorage) Close() error {
return nil
}
func (me *pieceFileTorrentStorage) Piece(p metainfo.Piece) Piece {
return pieceFileTorrentStoragePiece{me, p, me.s.fs}
}
type pieceFileTorrentStoragePiece struct {
ts *pieceFileTorrentStorage
p metainfo.Piece
fs FileStore
}
func (me pieceFileTorrentStoragePiece) completedPath() string {
return path.Join("completed", me.p.Hash().HexString())
}
func (me pieceFileTorrentStoragePiece) incompletePath() string {
return path.Join("incomplete", me.p.Hash().HexString())
}
func (me pieceFileTorrentStoragePiece) GetIsComplete() bool {
fi, err := me.ts.s.fs.Stat(me.completedPath())
return err == nil && fi.Size() == me.p.Length()
}
func (me pieceFileTorrentStoragePiece) MarkComplete() error {
return me.fs.Rename(me.incompletePath(), me.completedPath())
}
func (me pieceFileTorrentStoragePiece) ReadAt(b []byte, off int64) (n int, err error) {
f, err := me.fs.OpenFile(me.completedPath(), os.O_RDONLY)
if err != nil {
f, err = me.fs.OpenFile(me.incompletePath(), os.O_RDONLY)
if err != nil {
return
}
}
defer f.Close()
return f.ReadAt(b, off)
}
func (me pieceFileTorrentStoragePiece) WriteAt(b []byte, off int64) (n int, err error) {
f, err := me.fs.OpenFile(me.incompletePath(), os.O_WRONLY|os.O_CREATE)
if err != nil {
return
}
defer f.Close()
missinggo.LimitLen(&b, me.p.Length()-off)
return f.WriteAt(b, off)
}