
Expose a variety of blob cleanup styles

Matt Joiner 2021-05-06 15:17:31 +10:00
parent abe003b6b3
commit 96574468c5
3 changed files with 84 additions and 21 deletions
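In short, NewDirectStorageOpts now carries three blob cleanup styles: close-after-use (CacheBlobs left false), timed flushing (BlobFlushInterval), and GC-driven closing (GcBlobs). A minimal caller sketch, assuming the import path (the diff names only the package, sqliteStorage); the option fields and the NewDirectStorage signature are taken from the changes below:

package main

import (
	"time"

	// Import path assumed; the diff identifies only the package name, sqliteStorage.
	sqliteStorage "github.com/anacrolix/torrent/storage/sqlite"
)

func main() {
	var opts sqliteStorage.NewDirectStorageOpts
	opts.Path = "storage.db" // from the embedded NewConnOpts, as used in the benchmark below
	// Any combination of the cleanup styles can be enabled:
	opts.CacheBlobs = true               // keep blob handles cached across piece I/O
	opts.BlobFlushInterval = time.Second // close all cached blobs on a recurring timer
	// opts.GcBlobs = true               // or let GC finalizers close unreferenced blobs
	ci, err := sqliteStorage.NewDirectStorage(opts)
	if err != nil {
		panic(err)
	}
	defer ci.Close()
}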

View File

@@ -2,7 +2,9 @@ package sqliteStorage
 import (
 	"errors"
+	"runtime"
 	"sync"
+	"time"
 
 	"crawshaw.io/sqlite"
 	"crawshaw.io/sqlite/sqlitex"
@@ -14,6 +16,9 @@ type NewDirectStorageOpts struct {
 	NewConnOpts
 	InitDbOpts
 	InitConnOpts
+	GcBlobs           bool
+	CacheBlobs        bool
+	BlobFlushInterval time.Duration
 }
 
 // A convenience function that creates a connection pool, resource provider, and a pieces storage
@@ -32,16 +37,41 @@ func NewDirectStorage(opts NewDirectStorageOpts) (_ storage.ClientImplCloser, err error) {
 	if err != nil {
 		return
 	}
-	return &client{
+	cl := &client{
 		conn:  conn,
 		blobs: make(map[string]*sqlite.Blob),
-	}, nil
+		opts:  opts,
+	}
+	if opts.BlobFlushInterval != 0 {
+		cl.blobFlusher = time.AfterFunc(opts.BlobFlushInterval, cl.blobFlusherFunc)
+	}
+	return cl, nil
 }
 
 type client struct {
-	l     sync.Mutex
-	conn  conn
-	blobs map[string]*sqlite.Blob
+	l           sync.Mutex
+	conn        conn
+	blobs       map[string]*sqlite.Blob
+	blobFlusher *time.Timer
+	opts        NewDirectStorageOpts
+	closed      bool
 }
 
+func (c *client) blobFlusherFunc() {
+	c.l.Lock()
+	defer c.l.Unlock()
+	c.flushBlobs()
+	if !c.closed {
+		c.blobFlusher.Reset(c.opts.BlobFlushInterval)
+	}
+}
+
+func (c *client) flushBlobs() {
+	for key, b := range c.blobs {
+		// Need the lock to prevent racing with the GC finalizers.
+		b.Close()
+		delete(c.blobs, key)
+	}
+}
+
 func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
@@ -49,8 +79,12 @@ func (c *client) OpenTorrent(info *metainfo.Info, infoHash metainfo.Hash) (storage.TorrentImpl, error) {
 }
 
 func (c *client) Close() error {
-	for _, b := range c.blobs {
-		b.Close()
-	}
+	c.l.Lock()
+	defer c.l.Unlock()
+	c.flushBlobs()
+	c.closed = true
+	if c.opts.BlobFlushInterval != 0 {
+		c.blobFlusher.Stop()
+	}
 	return c.conn.Close()
 }
@@ -82,7 +116,11 @@ func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
 	t.c.l.Lock()
 	defer t.c.l.Unlock()
 	name := p.Hash().HexString()
-	return piece{t.c.conn, &t.c.l, name, t.c.blobs, p.Length()}
+	return piece{
+		name,
+		p.Length(),
+		t.c,
+	}
 }
 
 func (t torrent) Close() error {
@@ -90,11 +128,9 @@ func (t torrent) Close() error {
 }
 
 type piece struct {
-	conn   conn
-	l      *sync.Mutex
 	name   string
-	blobs  map[string]*sqlite.Blob
 	length int64
+	*client
 }
 
 func (p2 piece) doAtIoWithBlob(
@@ -104,13 +140,21 @@ func (p2 piece) doAtIoWithBlob(
 ) (n int, err error) {
 	p2.l.Lock()
 	defer p2.l.Unlock()
-	//defer p2.blobWouldExpire()
+	if !p2.opts.CacheBlobs {
+		defer p2.forgetBlob()
+	}
 	n, err = atIo(p2.getBlob())(p, off)
-	var se sqlite.Error
-	if !errors.As(err, &se) || se.Code != sqlite.SQLITE_ABORT {
+	if err == nil {
+		return
+	}
+	var se sqlite.Error
+	if !errors.As(err, &se) {
 		return
 	}
-	p2.blobWouldExpire()
+	if se.Code != sqlite.SQLITE_ABORT && !(p2.opts.GcBlobs && se.Code == sqlite.SQLITE_ERROR && se.Msg == "invalid blob") {
+		return
+	}
+	p2.forgetBlob()
 	return atIo(p2.getBlob())(p, off)
 }
@@ -140,7 +184,7 @@ func (p2 piece) MarkComplete() error {
 	return nil
 }
 
-func (p2 piece) blobWouldExpire() {
+func (p2 piece) forgetBlob() {
 	blob, ok := p2.blobs[p2.name]
 	if !ok {
 		return
@@ -185,6 +229,14 @@ func (p2 piece) getBlob() *sqlite.Blob {
 		if err != nil {
 			panic(err)
 		}
+		if p2.opts.GcBlobs {
+			herp := new(byte)
+			runtime.SetFinalizer(herp, func(*byte) {
+				p2.l.Lock()
+				defer p2.l.Unlock()
+				blob.Close()
+			})
+		}
 		p2.blobs[p2.name] = blob
 	}
 	return blob
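GcBlobs is the subtle style: getBlob pins a finalizer to a throwaway one-byte allocation, and when the collector eventually runs it, the cached blob is closed under the client lock (hence the comment in flushBlobs about racing with GC finalizers). Because a finalizer can close a blob that is still in the cache, doAtIoWithBlob treats SQLITE_ERROR with message "invalid blob" as a signal to forget the stale handle and retry with a fresh one. A standalone sketch of the finalizer pattern, with illustrative names not taken from the commit:

package main

import (
	"fmt"
	"runtime"
)

func main() {
	done := make(chan struct{})
	// Cleanup hangs off a sentinel allocation rather than the resource itself.
	sentinel := new(byte)
	runtime.SetFinalizer(sentinel, func(*byte) {
		fmt.Println("blob handle closed by finalizer")
		close(done)
	})
	sentinel = nil // drop the only reference, making the finalizer eligible
	runtime.GC()   // finalizer timing is up to the runtime; this only nudges it
	<-done
}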

View File

@@ -251,7 +251,8 @@ type InitDbOpts struct {
 	DontInitSchema bool
 	PageSize       int
 	// If non-zero, overrides the existing setting.
-	Capacity int64
+	Capacity   int64
+	NoTriggers bool
 }
 
 // There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
@@ -300,7 +301,7 @@ func initDatabase(conn conn, opts InitDbOpts) (err error) {
 	if opts.PageSize == 0 {
 		opts.PageSize = 1 << 14
 	}
-	err = InitSchema(conn, opts.PageSize, true)
+	err = InitSchema(conn, opts.PageSize, !opts.NoTriggers)
 	if err != nil {
 		return
 	}
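NoTriggers feeds straight into InitSchema's final argument; judging by the benchmark comment in the next file, the triggers are what push out old pieces when a capacity is set, so disabling them leaves eviction to the caller. A hypothetical configuration, reusing the sqliteStorage import from the earlier sketch plus github.com/anacrolix/torrent/storage for the return type:

// Sketch: direct storage without schema triggers; eviction becomes manual.
func newStorageNoTriggers(path string) (storage.ClientImplCloser, error) {
	var opts sqliteStorage.NewDirectStorageOpts
	opts.Path = path
	opts.NoTriggers = true // initDatabase now passes !NoTriggers to InitSchema
	opts.Capacity = 0      // without triggers a capacity limit would not be enforced
	return sqliteStorage.NewDirectStorage(opts)
}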

View File

@@ -9,6 +9,7 @@ import (
 	"path/filepath"
 	"sync"
 	"testing"
+	"time"
 
 	_ "github.com/anacrolix/envpprof"
 	"github.com/anacrolix/torrent/storage"
@@ -72,7 +73,12 @@ func TestSimultaneousIncrementalBlob(t *testing.T) {
 
 func BenchmarkMarkComplete(b *testing.B) {
 	const pieceSize = test_storage.DefaultPieceSize
-	const capacity = test_storage.DefaultNumPieces * pieceSize / 2
+	const noTriggers = false
+	var capacity int64 = test_storage.DefaultNumPieces * pieceSize / 2
+	if noTriggers {
+		// Since we won't push out old pieces, we have to mark them incomplete manually.
+		capacity = 0
+	}
 	runBench := func(b *testing.B, ci storage.ClientImpl) {
 		test_storage.BenchmarkPieceMarkComplete(b, ci, pieceSize, test_storage.DefaultNumPieces, capacity)
 	}
@@ -84,6 +90,10 @@ func BenchmarkMarkComplete(b *testing.B) {
 		opts.Memory = memory
 		opts.Path = filepath.Join(b.TempDir(), "storage.db")
 		opts.Capacity = capacity
+		opts.CacheBlobs = true
+		//opts.GcBlobs = true
+		opts.BlobFlushInterval = time.Second
+		opts.NoTriggers = noTriggers
 		directBench := func(b *testing.B) {
 			ci, err := NewDirectStorage(opts)
 			if errors.Is(err, UnexpectedJournalMode) {
@@ -93,10 +103,10 @@ func BenchmarkMarkComplete(b *testing.B) {
 			defer ci.Close()
 			runBench(b, ci)
 		}
-		for _, journalMode := range []string{"", "wal", "off", "delete", "memory"} {
+		for _, journalMode := range []string{"", "wal", "off", "truncate", "delete", "persist", "memory"} {
 			opts.SetJournalMode = journalMode
 			b.Run("JournalMode="+journalMode, func(b *testing.B) {
-				for _, mmapSize := range []int64{-1, 0, 1 << 24, 1 << 25, 1 << 26} {
+				for _, mmapSize := range []int64{-1, 0, 1 << 23, 1 << 24, 1 << 25} {
					if memory && mmapSize >= 0 {
						continue
					}