Rework storage.TorrentImpl to support shared capacity key

This commit is contained in:
Matt Joiner 2021-05-09 23:40:44 +10:00
parent 6e97ce952f
commit 5f8471e21b
11 changed files with 71 additions and 45 deletions

View File

@ -15,11 +15,9 @@ type badStorage struct{}
var _ storage.ClientImpl = badStorage{}
// OpenTorrent returns a TorrentImpl whose only populated field is Piece.
// Close and Capacity are left nil; consumers of TorrentImpl must tolerate
// nil function fields (see the nil-check on storage.Close in Torrent.close).
func (bs badStorage) OpenTorrent(*metainfo.Info, metainfo.Hash) (storage.TorrentImpl, error) {
	return storage.TorrentImpl{
		Piece: bs.Piece,
	}, nil
}
func (bs badStorage) Piece(p metainfo.Piece) storage.PieceImpl {

View File

@ -98,7 +98,7 @@ func BenchmarkConnectionMainReadLoop(b *testing.B) {
ts := &torrentStorage{}
t := &Torrent{
cl: cl,
storage: &storage.Torrent{TorrentImpl: ts},
storage: &storage.Torrent{TorrentImpl: storage.TorrentImpl{Piece: ts.Piece, Close: ts.Close}},
pieceStateChanges: pubsub.NewPubSub(),
}
require.NoError(b, t.setInfo(&metainfo.Info{

View File

@ -43,7 +43,11 @@ func (me *boltClient) Close() error {
}
// OpenTorrent binds a bolt-backed torrent store for infoHash and exposes its
// methods through the TorrentImpl function table.
func (me *boltClient) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
	t := &boltTorrent{me, infoHash}
	return TorrentImpl{
		Piece: t.Piece,
		Close: t.Close,
	}, nil
}
func (me *boltTorrent) Piece(p metainfo.Piece) PieceImpl {

View File

@ -67,14 +67,16 @@ func (me *fileClientImpl) Close() error {
return me.pc.Close()
}
func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
dir := fs.pathMaker(fs.baseDir, info, infoHash)
upvertedFiles := info.UpvertedFiles()
files := make([]file, 0, len(upvertedFiles))
for i, fileInfo := range upvertedFiles {
s, err := ToSafeFilePath(append([]string{info.Name}, fileInfo.Path...)...)
var s string
s, err = ToSafeFilePath(append([]string{info.Name}, fileInfo.Path...)...)
if err != nil {
return nil, fmt.Errorf("file %v has unsafe path %q: %w", i, fileInfo.Path, err)
err = fmt.Errorf("file %v has unsafe path %q: %w", i, fileInfo.Path, err)
return
}
f := file{
path: filepath.Join(dir, s),
@ -83,16 +85,21 @@ func (fs *fileClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Has
if f.length == 0 {
err = CreateNativeZeroLengthFile(f.path)
if err != nil {
return nil, fmt.Errorf("creating zero length file: %w", err)
err = fmt.Errorf("creating zero length file: %w", err)
return
}
}
files = append(files, f)
}
return &fileTorrentImpl{
t := &fileTorrentImpl{
files,
segments.NewIndex(common.LengthIterFromUpvertedFiles(upvertedFiles)),
infoHash,
fs.pc,
}
return TorrentImpl{
Piece: t.Piece,
Close: t.Close,
}, nil
}

View File

@ -17,9 +17,11 @@ type ClientImpl interface {
}
// Data storage bound to a torrent.
type TorrentImpl interface {
Piece(metainfo.Piece) PieceImpl
Close() error
type TorrentImpl struct {
Piece func(metainfo.Piece) PieceImpl
Close func() error
// Storages that share the same value, will provide a pointer to the same function.
Capacity *func() *int64
}
// Interacts with torrent piece data. Optional interfaces to implement include io.WriterTo, such as

View File

@ -30,14 +30,14 @@ func NewMMapWithCompletion(baseDir string, completion PieceCompletion) *mmapClie
}
}
func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (t TorrentImpl, err error) {
func (s *mmapClientImpl) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (_ TorrentImpl, err error) {
span, err := mMapTorrent(info, s.baseDir)
t = &mmapTorrentStorage{
t := &mmapTorrentStorage{
infoHash: infoHash,
span: span,
pc: s.pc,
}
return
return TorrentImpl{Piece: t.Piece, Close: t.Close}, err
}
func (s *mmapClientImpl) Close() error {

View File

@ -26,6 +26,7 @@ type ResourcePiecesOpts struct {
// Sized puts require being able to stream from a statement executed on another connection.
// Without them, we buffer the entire read and then put that.
NoSizedPuts bool
Capacity *int64
}
func NewResourcePieces(p PieceProvider) ClientImpl {
@ -49,10 +50,11 @@ func (piecePerResourceTorrentImpl) Close() error {
}
func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (TorrentImpl, error) {
return piecePerResourceTorrentImpl{
t := piecePerResourceTorrentImpl{
s,
make([]sync.RWMutex, info.NumPieces()),
}, nil
}
return TorrentImpl{Piece: t.Piece, Close: t.Close}, nil
}
func (s piecePerResourceTorrentImpl) Piece(p metainfo.Piece) PieceImpl {

View File

@ -61,9 +61,24 @@ func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, er
if opts.BlobFlushInterval != 0 {
cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
}
cl.capacity = cl.getCapacity
return cl, nil
}
// getCapacity reads the configured capacity from the settings table, returning
// nil when no 'capacity' row exists. Holds cl.l for the duration of the query.
func (cl *client) getCapacity() (ret *int64) {
	cl.l.Lock()
	defer cl.l.Unlock()
	query := "select value from setting where name='capacity'"
	if err := sqlitex.Exec(cl.conn, query, func(stmt *sqlite.Stmt) error {
		// A row was produced: capture the stored value.
		v := stmt.ColumnInt64(0)
		ret = &v
		return nil
	}); err != nil {
		// NOTE(review): a failed query is treated as a programming error here.
		panic(err)
	}
	return
}
type client struct {
l sync.Mutex
conn conn
@ -71,6 +86,7 @@ type client struct {
blobFlusher *time.Timer
opts NewDirectStorageOpts
closed bool
capacity func() *int64
}
func (c *client) blobFlusherFunc() {
@ -91,7 +107,8 @@ func (c *client) flushBlobs() {
}
// OpenTorrent opens per-torrent storage backed by this client's connection.
// Every torrent opened on the same client receives a pointer to the same
// capacity func, signalling (per the TorrentImpl.Capacity contract) that they
// share one capacity pool.
func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
	t := torrent{c}
	return storage.TorrentImpl{Piece: t.Piece, Close: t.Close, Capacity: &c.capacity}, nil
}
func (c *client) Close() error {

View File

@ -124,7 +124,7 @@ func (me *diskFullStorage) Close() error {
}
// OpenTorrent exposes the disk-full test storage through the TorrentImpl
// function table; Capacity is deliberately left nil.
func (d *diskFullStorage) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
	return storage.TorrentImpl{Piece: d.Piece, Close: d.Close}, nil
}
type pieceImpl struct {

View File

@ -192,7 +192,6 @@ func testClientTransfer(t *testing.T, ps testClientTransferParams) {
type fileCacheClientStorageFactoryParams struct {
Capacity int64
SetCapacity bool
Wrapper func(*filecache.Cache) storage.ClientImplCloser
}
func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) storageFactory {
@ -201,10 +200,22 @@ func newFileCacheClientStorageFactory(ps fileCacheClientStorageFactoryParams) st
if err != nil {
panic(err)
}
var sharedCapacity *int64
if ps.SetCapacity {
sharedCapacity = &ps.Capacity
fc.SetCapacity(ps.Capacity)
}
return ps.Wrapper(fc)
return struct {
storage.ClientImpl
io.Closer
}{
storage.NewResourcePiecesOpts(
fc.AsResourceProvider(),
storage.ResourcePiecesOpts{
Capacity: sharedCapacity,
}),
ioutil.NopCloser(nil),
}
}
}
@ -212,17 +223,13 @@ type storageFactory func(string) storage.ClientImplCloser
// TestClientTransferDefault exercises a transfer with the default
// file-cache-backed leecher storage (no capacity limit set).
func TestClientTransferDefault(t *testing.T) {
	testClientTransfer(t, testClientTransferParams{
		LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
	})
}
// TestClientTransferDefaultNoMetadata is the default-storage transfer test,
// but with the leecher starting before it has the torrent metadata.
func TestClientTransferDefaultNoMetadata(t *testing.T) {
	testClientTransfer(t, testClientTransferParams{
		LeecherStorage:               newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}),
		LeecherStartsWithoutMetadata: true,
	})
}
@ -244,16 +251,6 @@ func TestClientTransferRateLimitedDownload(t *testing.T) {
})
}
func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImplCloser {
return struct {
storage.ClientImpl
io.Closer
}{
storage.NewResourcePieces(fc.AsResourceProvider()),
ioutil.NopCloser(nil),
}
}
func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int64) {
testClientTransfer(t, testClientTransferParams{
LeecherStorage: newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
@ -261,7 +258,6 @@ func testClientTransferSmallCache(t *testing.T, setReadahead bool, readahead int
// Going below the piece length means it can't complete a piece so
// that it can be hashed.
Capacity: 5,
Wrapper: fileCachePieceResourceStorage,
}),
SetReadahead: setReadahead,
// Can't readahead too far or the cache will thrash and drop data we
@ -324,9 +320,7 @@ func sqliteLeecherStorageTestCase(numConns int) leecherStorageTestCase {
func TestClientTransferVarious(t *testing.T) {
// Leecher storage
for _, ls := range []leecherStorageTestCase{
{"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{
Wrapper: fileCachePieceResourceStorage,
}), 0},
{"Filecache", newFileCacheClientStorageFactory(fileCacheClientStorageFactoryParams{}), 0},
{"Boltdb", storage.NewBoltDB, 0},
{"SqliteDirect", func(s string) storage.ClientImplCloser {
path := filepath.Join(s, "sqlite3.db")

View File

@ -760,7 +760,9 @@ func (t *Torrent) close() (err error) {
func() {
t.storageLock.Lock()
defer t.storageLock.Unlock()
t.storage.Close()
if f := t.storage.Close; f != nil {
f()
}
}()
}
t.iterPeers(func(p *Peer) {