Improve configurability and add PutSized to sqlite storage

Matt Joiner 2021-01-18 19:29:53 +11:00
parent 434dfdf6e0
commit cb5f80ec11
5 changed files with 120 additions and 44 deletions

View File

@ -152,7 +152,12 @@ func TestAddDropManyTorrents(t *testing.T) {
}
func fileCachePieceResourceStorage(fc *filecache.Cache) storage.ClientImpl {
return storage.NewResourcePieces(fc.AsResourceProvider())
return storage.NewResourcePiecesOpts(
fc.AsResourceProvider(),
storage.ResourcePiecesOpts{
LeaveIncompleteChunks: true,
},
)
}
func TestMergingTrackersByAddingSpecs(t *testing.T) {

View File

@ -27,6 +27,7 @@ func BenchmarkPieceMarkComplete(tb testing.TB, pi PieceImpl, data []byte) {
}(off)
}
wg.Wait()
//pi.MarkNotComplete()
// This might not apply if users of this benchmark don't cache with the expected capacity.
c.Assert(pi.Completion(), qt.Equals, Completion{Complete: false, Ok: true})
c.Assert(pi.MarkComplete(), qt.IsNil)

View File

@ -14,12 +14,22 @@ import (
)
type piecePerResource struct {
p PieceProvider
rp PieceProvider
opts ResourcePiecesOpts
}
type ResourcePiecesOpts struct {
LeaveIncompleteChunks bool
AllowSizedPuts bool
}
func NewResourcePieces(p PieceProvider) ClientImpl {
return NewResourcePiecesOpts(p, ResourcePiecesOpts{})
}
func NewResourcePiecesOpts(p PieceProvider, opts ResourcePiecesOpts) ClientImpl {
return &piecePerResource{
p: p,
rp: p,
opts: opts,
}
}
@ -37,8 +47,8 @@ func (s piecePerResource) OpenTorrent(info *metainfo.Info, infoHash metainfo.Has
func (s piecePerResource) Piece(p metainfo.Piece) PieceImpl {
return piecePerResourcePiece{
mp: p,
rp: s.p,
mp: p,
piecePerResource: s,
}
}
@ -52,7 +62,7 @@ type ConsecutiveChunkWriter interface {
type piecePerResourcePiece struct {
mp metainfo.Piece
rp resource.Provider
piecePerResource
}
var _ io.WriterTo = piecePerResourcePiece{}
@ -90,6 +100,10 @@ func (s piecePerResourcePiece) Completion() Completion {
}
}
type SizedPutter interface {
PutSized(io.Reader, int64) error
}
func (s piecePerResourcePiece) MarkComplete() error {
incompleteChunks := s.getChunks()
r, w := io.Pipe()
@ -102,8 +116,19 @@ func (s piecePerResourcePiece) MarkComplete() error {
}
w.CloseWithError(err)
}()
err := s.completed().Put(r)
if err == nil {
completedInstance := s.completed()
err := func() error {
if sp, ok := completedInstance.(SizedPutter); ok && s.opts.AllowSizedPuts {
return sp.PutSized(r, s.mp.Length())
} else {
return completedInstance.Put(r)
}
}()
if err == nil && !s.opts.LeaveIncompleteChunks {
// I think we do this synchronously here since we don't want callers to act on the completed
// piece if we're concurrently still deleting chunks. The caller may decide to start
// downloading chunks again and won't expect us to delete them. It seems to be much faster
// to let the resource provider do this if possible.
var wg sync.WaitGroup
for _, c := range incompleteChunks {
wg.Add(1)

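The MarkComplete change above only performs a sized put when both conditions hold: the AllowSizedPuts option is set and the completed instance actually implements SizedPutter. Below is a minimal, self-contained sketch of that fallback pattern; memInstance and put are names invented for the illustration and are not part of the storage package.

package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

// SizedPutter mirrors the optional interface added in this commit.
type SizedPutter interface {
	PutSized(io.Reader, int64) error
}

// memInstance is a hypothetical in-memory instance used only to demonstrate
// the optional interface.
type memInstance struct {
	buf bytes.Buffer
}

func (m *memInstance) Put(r io.Reader) error {
	_, err := io.Copy(&m.buf, r)
	return err
}

func (m *memInstance) PutSized(r io.Reader, size int64) error {
	// Knowing the size up front lets the store preallocate, analogous to the
	// zeroblob insert followed by an incremental blob write in the sqlite code.
	m.buf.Grow(int(size))
	_, err := io.CopyN(&m.buf, r, size)
	return err
}

// put mimics the branch in MarkComplete: use PutSized when the instance
// supports it and sized puts are allowed, otherwise fall back to Put.
func put(dst interface{ Put(io.Reader) error }, r io.Reader, size int64, allowSized bool) error {
	if sp, ok := dst.(SizedPutter); ok && allowSized {
		return sp.PutSized(r, size)
	}
	return dst.Put(r)
}

func main() {
	data := "complete piece data"
	m := new(memInstance)
	if err := put(m, strings.NewReader(data), int64(len(data)), true); err != nil {
		panic(err)
	}
	fmt.Println(m.buf.Len()) // prints len(data)
}
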
View File

@ -52,6 +52,7 @@ func initSchema(conn conn) error {
-- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
-- can trim the database file size with partial vacuums without having to do a full vacuum, which
-- locks everything.
pragma page_size=16384;
pragma auto_vacuum=incremental;
create table if not exists blob (
@ -139,7 +140,10 @@ func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error)
conns.Close()
return
}
store := storage.NewResourcePieces(prov)
store := storage.NewResourcePiecesOpts(prov, storage.ResourcePiecesOpts{
LeaveIncompleteChunks: true,
AllowSizedPuts: true,
})
return struct {
storage.ClientImpl
io.Closer
@ -259,15 +263,20 @@ func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
if err != nil {
return
}
writes := make(chan writeRequest, 1<<(20-14))
prov := &provider{pool: pool, writes: writes, opts: opts}
runtime.SetFinalizer(prov, func(p *provider) {
// This is done in a finalizer, as it's easier than trying to synchronize on whether the
// channel has been closed. It also means that the provider writer can pass back errors from
// a closed ConnPool.
close(p.writes)
})
go providerWriter(writes, prov.pool)
prov := &provider{pool: pool, opts: opts}
if opts.BatchWrites {
if opts.NumConns < 2 {
err = errors.New("batch writes requires more than 1 conn")
return
}
writes := make(chan writeRequest)
prov.writes = writes
// This is retained for backwards compatibility. It may not be necessary.
runtime.SetFinalizer(prov, func(p *provider) {
p.Close()
})
go providerWriter(writes, prov.pool)
}
return prov, nil
}
@ -301,9 +310,11 @@ type ConnPool interface {
}
type provider struct {
pool ConnPool
writes chan<- writeRequest
opts ProviderOpts
pool ConnPool
writes chan<- writeRequest
opts ProviderOpts
closed sync.Once
closeErr error
}
var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
@ -337,7 +348,13 @@ func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (written i
}
func (me *provider) Close() error {
return me.pool.Close()
me.closed.Do(func() {
if me.writes != nil {
close(me.writes)
}
me.closeErr = me.pool.Close()
})
return me.closeErr
}
type writeRequest struct {
@ -350,6 +367,11 @@ var expvars = expvar.NewMap("sqliteStorage")
// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
// stronger typing on the writes channel.
func providerWriter(writes <-chan writeRequest, pool ConnPool) {
conn := pool.Get(context.TODO())
if conn == nil {
return
}
defer pool.Put(conn)
for {
first, ok := <-writes
if !ok {
@ -358,11 +380,6 @@ func providerWriter(writes <-chan writeRequest, pool ConnPool) {
var buf []func()
var cantFail error
func() {
conn := pool.Get(context.TODO())
if conn == nil {
return
}
defer pool.Put(conn)
defer sqlitex.Save(conn)(&cantFail)
firstErr := first.query(conn)
buf = append(buf, func() { first.done <- firstErr })
@ -513,28 +530,50 @@ func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, e
return conn.OpenBlob("main", "blob", "data", rowid, write)
}
func (i instance) PutSized(reader io.Reader, size int64) (err error) {
err = i.withConn(func(conn conn) error {
err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
nil,
i.location, size)
if err != nil {
return err
}
blob, err := i.openBlob(conn, true, false)
if err != nil {
return err
}
defer blob.Close()
_, err = io.Copy(blob, reader)
return err
}, true)
return
}
func (i instance) Put(reader io.Reader) (err error) {
var buf bytes.Buffer
_, err = io.Copy(&buf, reader)
if err != nil {
return err
}
err = i.withConn(func(conn conn) error {
for range iter.N(10) {
err = sqlitex.Exec(conn,
"insert or replace into blob(name, data) values(?, cast(? as blob))",
nil,
i.location, buf.Bytes())
if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
log.Print("sqlite busy")
time.Sleep(time.Second)
continue
if false {
return i.PutSized(&buf, int64(buf.Len()))
} else {
return i.withConn(func(conn conn) error {
for range iter.N(10) {
err = sqlitex.Exec(conn,
"insert or replace into blob(name, data) values(?, cast(? as blob))",
nil,
i.location, buf.Bytes())
if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
log.Print("sqlite busy")
time.Sleep(time.Second)
continue
}
break
}
break
}
return err
}, true)
return
return err
}, true)
}
}
type fileInfo struct {

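The NewProvider and Close changes above mean the writer goroutine only runs when BatchWrites is enabled, and the writes channel is now closed exactly once before the pool. Below is a rough, stdlib-only sketch of that single-writer batching pattern; writeReq, batchWriter and the drain loop are simplified stand-ins for writeRequest and providerWriter, and the real code additionally takes one connection from the pool and wraps each batch in a sqlitex savepoint.

package example

// writeReq is a simplified stand-in for the provider's writeRequest type.
type writeReq struct {
	query func() error // work to run on the single writer connection
	done  chan<- error // receives the result once the whole batch is applied
}

// batchWriter services requests until the channel is closed, which the
// provider's Close does exactly once via sync.Once. Requests arriving while
// the first one is handled are folded into the same batch.
func batchWriter(writes <-chan writeReq) {
	for {
		first, ok := <-writes
		if !ok {
			return
		}
		batch := []writeReq{first}
	drain:
		for {
			select {
			case next, ok := <-writes:
				if !ok {
					break drain
				}
				batch = append(batch, next)
			default:
				break drain
			}
		}
		// The real providerWriter runs the batch inside sqlitex.Save(conn), so
		// the whole batch commits or rolls back together.
		results := make([]error, len(batch))
		for i, r := range batch {
			results[i] = r.query()
		}
		// Acknowledge only after the batch has been applied.
		for i, r := range batch {
			r.done <- results[i]
		}
	}
}
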
View File

@ -42,7 +42,7 @@ func TestTextBlobSize(t *testing.T) {
func TestSimultaneousIncrementalBlob(t *testing.T) {
_, p := newConnsAndProv(t, NewPoolOpts{
NumConns: 2,
NumConns: 3,
ConcurrentBlobReads: true,
})
a, err := p.NewInstance("a")
@ -76,7 +76,11 @@ func BenchmarkMarkComplete(b *testing.B) {
rand.Read(data)
dbPath := filepath.Join(b.TempDir(), "storage.db")
b.Logf("storage db path: %q", dbPath)
ci, err := NewPiecesStorage(NewPoolOpts{Path: dbPath, Capacity: pieceSize})
ci, err := NewPiecesStorage(NewPoolOpts{
Path: dbPath,
Capacity: pieceSize,
ConcurrentBlobReads: true,
})
c.Assert(err, qt.IsNil)
defer ci.Close()
ti, err := ci.OpenTorrent(nil, metainfo.Hash{})
@ -89,6 +93,8 @@ func BenchmarkMarkComplete(b *testing.B) {
Length: pieceSize,
},
})
// Do it untimed the first time to prime the cache.
storage.BenchmarkPieceMarkComplete(b, pi, data)
b.ResetTimer()
for range iter.N(b.N) {
storage.BenchmarkPieceMarkComplete(b, pi, data)