
Rework to use a pool of blobs

Matt Joiner 2021-05-05 10:02:15 +10:00
parent 855144212c
commit 675a0ab0dc
3 changed files with 97 additions and 66 deletions
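
In outline: the direct storage client previously cached at most one open SQLite blob handle, and getBlob closed and reopened it on every call; this commit keeps a pool of open blob handles keyed by piece name, opening on first use and closing a handle only when the piece's completion state changes or the client shuts down. A minimal, self-contained sketch of that pattern follows (the handle type and names are illustrative stand-ins for *sqlite.Blob and the methods in the diff below, not the package's real API):

package main

import (
	"fmt"
	"io"
)

// handlePool keeps open handles keyed by name, like client.blobs in the diff.
type handlePool struct {
	open    func(name string) (io.Closer, error) // opens a handle on cache miss
	handles map[string]io.Closer                 // open handles keyed by name
}

// get returns the cached handle for name, opening and caching one if absent
// (the shape of the new getBlob).
func (p *handlePool) get(name string) (io.Closer, error) {
	if h, ok := p.handles[name]; ok {
		return h, nil
	}
	h, err := p.open(name)
	if err != nil {
		return nil, err
	}
	p.handles[name] = h
	return h, nil
}

// invalidate closes and forgets one handle, mirroring blobWouldExpire.
func (p *handlePool) invalidate(name string) {
	if h, ok := p.handles[name]; ok {
		h.Close()
		delete(p.handles, name)
	}
}

// closeAll closes every cached handle, as client.Close does in the diff.
func (p *handlePool) closeAll() {
	for _, h := range p.handles {
		h.Close()
	}
}

type fakeHandle struct{ name string }

func (f fakeHandle) Close() error { fmt.Println("closed", f.name); return nil }

func main() {
	p := &handlePool{
		open:    func(name string) (io.Closer, error) { return fakeHandle{name}, nil },
		handles: make(map[string]io.Closer),
	}
	a, _ := p.get("piece-a")
	b, _ := p.get("piece-a")
	fmt.Println(a == b) // true: the second access reuses the open handle
	p.invalidate("piece-a")
	p.closeAll()
}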

View File

@@ -31,16 +31,17 @@ func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, er
 		return
 	}
 	return &client{
-		prov: prov,
-		conn: prov.pool.Get(nil),
+		prov:  prov,
+		conn:  prov.pool.Get(nil),
+		blobs: make(map[string]*sqlite.Blob),
 	}, nil
 }
 
 type client struct {
-	l    sync.Mutex
-	prov *provider
-	conn conn
-	blob *sqlite.Blob
+	l     sync.Mutex
+	prov  *provider
+	conn  conn
+	blobs map[string]*sqlite.Blob
 }
 
 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
@@ -48,8 +49,8 @@ func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (stora
 }
 
 func (c *client) Close() error {
-	if c.blob != nil {
-		c.blob.Close()
+	for _, b := range c.blobs {
+		b.Close()
 	}
 	c.prov.pool.Put(c.conn)
 	return c.prov.Close()
@@ -82,7 +83,7 @@ func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
 	t.c.l.Lock()
 	defer t.c.l.Unlock()
 	name := p.Hash().HexString()
-	return piece{t.c.conn, name, &t.c.l, p.Length(), &t.c.blob}
+	return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()}
 }
 
 func (t torrent) Close() error {
@@ -91,29 +92,10 @@ func (t torrent) Close() error {
 
 type piece struct {
 	conn   conn
-	name   string
 	l      *sync.Mutex
+	name   string
+	blobs  map[string]*sqlite.Blob
 	length int64
-	blob   **sqlite.Blob
 }
 
-func (p2 piece) getBlob() *sqlite.Blob {
-	rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
-	if err != nil {
-		panic(err)
-	}
-	if *p2.blob != nil {
-		err := (*p2.blob).Close()
-		if err != nil {
-			panic(err)
-		}
-		*p2.blob = nil
-	}
-	*p2.blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
-	if err != nil {
-		panic(err)
-	}
-	return *p2.blob
-}
-
 func (p2 piece) ReadAt(p []byte, off int64) (n int, err error) {
@@ -140,10 +122,21 @@ func (p2 piece) MarkComplete() error {
 	if changes != 1 {
 		panic(changes)
 	}
+	p2.blobWouldExpire()
 	return nil
 }
 
+func (p2 piece) blobWouldExpire() {
+	blob, ok := p2.blobs[p2.name]
+	if !ok {
+		return
+	}
+	blob.Close()
+	delete(p2.blobs, p2.name)
+}
+
 func (p2 piece) MarkNotComplete() error {
+	p2.blobWouldExpire()
 	return sqlitex.Exec(p2.conn, "update blob set verified=false where name=?", nil, p2.name)
 }
 
@@ -160,3 +153,26 @@ func (p2 piece) Completion() (ret storage.Completion) {
 	}
 	return
 }
+
+func (p2 piece) closeBlobIfExists() {
+	if b, ok := p2.blobs[p2.name]; ok {
+		b.Close()
+		delete(p2.blobs, p2.name)
+	}
+}
+
+func (p2 piece) getBlob() *sqlite.Blob {
+	blob, ok := p2.blobs[p2.name]
+	if !ok {
+		rowid, err := rowidForBlob(p2.conn, p2.name, p2.length)
+		if err != nil {
+			panic(err)
+		}
+		blob, err = p2.conn.OpenBlob("main", "blob", "data", rowid, true)
+		if err != nil {
+			panic(err)
+		}
+		p2.blobs[p2.name] = blob
+	}
+	return blob
+}
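
Net effect of the change in this file: each piece's blob handle now stays open across reads and writes until MarkComplete or MarkNotComplete invalidates it (via blobWouldExpire) or the client closes, rather than being closed and reopened whenever a different piece was touched. The blobs map is shared through the client and, judging from torrent.Piece above, guarded by the client mutex, so pieces of one client presumably never race on it.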

View File

@@ -10,6 +10,7 @@ import (
 	"testing"
 
 	_ "github.com/anacrolix/envpprof"
+	"github.com/anacrolix/torrent/storage"
 	test_storage "github.com/anacrolix/torrent/storage/test"
 	qt "github.com/frankban/quicktest"
 	"github.com/stretchr/testify/assert"
@@ -68,42 +69,51 @@ func TestSimultaneousIncrementalBlob(t *testing.T) {
 func BenchmarkMarkComplete(b *testing.B) {
 	const pieceSize = test_storage.DefaultPieceSize
-	const capacity = test_storage.DefaultCapacity
+	const capacity = test_storage.DefaultNumPieces * pieceSize / 2
 	c := qt.New(b)
-	for _, memory := range []bool{false, true} {
-		b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
-			for _, batchWrites := range []bool{false, true} {
-				b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
-					dbPath := filepath.Join(b.TempDir(), "storage.db")
-					//b.Logf("storage db path: %q", dbPath)
-					newPoolOpts := NewPoolOpts{
-						Path:                  dbPath,
-						Capacity:              4*pieceSize - 1,
-						NoConcurrentBlobReads: false,
-						PageSize:              1 << 14,
-						Memory:                memory,
-					}
-					provOpts := func(opts *ProviderOpts) {
-						opts.BatchWrites = batchWrites
-					}
-					b.Run("SqlitePieceStorage", func(b *testing.B) {
-						ci, err := NewPiecesStorage(NewPiecesStorageOpts{
-							NewPoolOpts: newPoolOpts,
-							ProvOpts:    provOpts,
-						})
-						c.Assert(err, qt.IsNil)
-						defer ci.Close()
-						test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
-					})
-					b.Run("SqliteDirect", func(b *testing.B) {
-						ci, err := NewDirectStorage(NewDirectStorageOpts{
-							NewPoolOpts: newPoolOpts,
-							ProvOpts:    provOpts,
-						})
-						c.Assert(err, qt.IsNil)
-						defer ci.Close()
-						test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
-					})
-				})
-			}
-		})
-	}
+	for _, storage := range []struct {
+		name  string
+		maker func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser
+	}{
+		{"SqliteDirect", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser {
+			ci, err := NewDirectStorage(NewDirectStorageOpts{
+				NewPoolOpts: newPoolOpts,
+				ProvOpts:    provOpts,
+			})
+			c.Assert(err, qt.IsNil)
+			return ci
+		}},
+		{"SqlitePieceStorage", func(newPoolOpts NewPoolOpts, provOpts func(*ProviderOpts)) storage.ClientImplCloser {
+			ci, err := NewPiecesStorage(NewPiecesStorageOpts{
+				NewPoolOpts: newPoolOpts,
+				ProvOpts:    provOpts,
+			})
+			c.Assert(err, qt.IsNil)
+			return ci
+		}},
+	} {
+		b.Run(storage.name, func(b *testing.B) {
+			for _, memory := range []bool{false, true} {
+				b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
+					for _, batchWrites := range []bool{false, true} {
+						b.Run(fmt.Sprintf("BatchWrites=%v", batchWrites), func(b *testing.B) {
+							dbPath := filepath.Join(b.TempDir(), "storage.db")
+							//b.Logf("storage db path: %q", dbPath)
+							newPoolOpts := NewPoolOpts{
+								Path:                  dbPath,
+								Capacity:              capacity,
+								NoConcurrentBlobReads: false,
+								PageSize:              1 << 14,
+								Memory:                memory,
+							}
+							provOpts := func(opts *ProviderOpts) {
+								opts.BatchWrites = batchWrites
+							}
+							ci := storage.maker(newPoolOpts, provOpts)
+							defer ci.Close()
+							test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
+						})
+					}
+				})
+			}
+		})
+	}
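
The test rework above folds the two per-backend b.Run blocks into a table-driven loop, so every backend is exercised over the same Memory/BatchWrites grid and adding a backend only needs a table entry. The general shape of that pattern, as a self-contained sketch (newBackend and the case names are hypothetical stand-ins for the storage constructors):

package bench

import (
	"fmt"
	"testing"
)

// newBackend stands in for constructors like NewDirectStorage; it returns a
// cleanup func the way those return a storage.ClientImplCloser.
func newBackend(name string, memory bool) (cleanup func()) {
	return func() {}
}

func BenchmarkBackends(b *testing.B) {
	for _, backend := range []struct{ name string }{
		{"Direct"},
		{"Pieces"},
	} {
		b.Run(backend.name, func(b *testing.B) {
			for _, memory := range []bool{false, true} {
				b.Run(fmt.Sprintf("Memory=%v", memory), func(b *testing.B) {
					cleanup := newBackend(backend.name, memory)
					defer cleanup()
					for i := 0; i < b.N; i++ {
						// exercise the backend under test here
					}
				})
			}
		})
	}
}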

View File

@@ -23,7 +23,10 @@ const (
 
 func BenchmarkPieceMarkComplete(
 	b *testing.B, ci storage.ClientImpl,
-	pieceSize int64, numPieces int, capacity int64,
+	pieceSize int64, numPieces int,
+	// This drives any special handling around capacity that may be configured into the storage
+	// implementation.
+	capacity int64,
 ) {
 	const check = true
 	c := qt.New(b)
@@ -60,16 +63,18 @@ func BenchmarkPieceMarkComplete(
 		}(off)
 	}
 	wg.Wait()
+	b.StopTimer()
 	if capacity == 0 {
 		pi.MarkNotComplete()
 	}
+	b.StartTimer()
 	// This might not apply if users of this benchmark don't cache with the expected capacity.
 	c.Assert(pi.Completion(), qt.Equals, storage.Completion{Complete: false, Ok: true})
 	c.Assert(pi.MarkComplete(), qt.IsNil)
 	c.Assert(pi.Completion(), qt.Equals, storage.Completion{true, true})
 	if check {
 		readData, err := ioutil.ReadAll(io.NewSectionReader(pi, 0, int64(len(data))))
-		c.Assert(err, qt.IsNil)
+		c.Check(err, qt.IsNil)
 		c.Assert(len(readData), qt.Equals, len(data))
 		c.Assert(bytes.Equal(readData, data), qt.IsTrue)
 	}
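
The b.StopTimer/b.StartTimer pair added above keeps the capacity == 0 reset out of the measured time, so backends with and without a capacity limit are timed over the same work. The general shape of that technique, as a self-contained sketch (resetState and doOp are hypothetical):

package bench

import "testing"

func resetState() {} // hypothetical untimed reset, like pi.MarkNotComplete() above
func doOp()       {} // hypothetical measured operation, like pi.MarkComplete() above

func BenchmarkOp(b *testing.B) {
	for i := 0; i < b.N; i++ {
		b.StopTimer()
		resetState() // excluded from the reported ns/op
		b.StartTimer()
		doOp() // only this is timed
	}
}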