2
0
mirror of synced 2025-02-24 14:48:27 +00:00
torrent/storage/sqlite/sqlite-storage.go

376 lines
7.9 KiB
Go
Raw Normal View History

package sqliteStorage
import (
"bytes"
2020-10-23 09:03:44 +11:00
"context"
"errors"
2020-10-13 09:36:58 +11:00
"fmt"
"io"
2020-10-27 17:07:49 +11:00
"log"
"os"
"sync"
"time"
"crawshaw.io/sqlite"
"crawshaw.io/sqlite/sqlitex"
"github.com/anacrolix/missinggo/iter"
"github.com/anacrolix/missinggo/v2/resource"
)
type conn = *sqlite.Conn
2020-10-27 17:07:49 +11:00
func initConn(conn conn, wal bool) error {
err := sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
if err != nil {
return err
}
if !wal {
err = sqlitex.ExecTransient(conn, `pragma journal_mode=off`, nil)
if err != nil {
return err
}
}
err = sqlitex.ExecTransient(conn, `pragma mmap_size=1000000000`, nil)
if err != nil {
return err
}
return nil
}
func initSchema(conn conn) error {
return sqlitex.ExecScript(conn, `
pragma auto_vacuum=incremental;
create table if not exists blob(
2020-10-13 09:36:58 +11:00
name text,
last_used timestamp default (datetime('now')),
data blob,
primary key (name)
);
create table if not exists setting(
name primary key on conflict replace,
value
);
create view if not exists deletable_blob as
with recursive excess(
usage_with,
last_used,
blob_rowid,
data_length
) as (
select * from (select (select sum(length(data)) from blob) as usage_with, last_used, rowid, length(data) from blob order by last_used, rowid limit 1)
where usage_with >= (select value from setting where name='capacity')
union all
select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess join blob
on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
where usage_with >= (select value from setting where name='capacity')
) select * from excess;
CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin
delete from blob where rowid in (select blob_rowid from deletable_blob);
end;
CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin
delete from blob where rowid in (select blob_rowid from deletable_blob);
end;
`)
}
2020-10-23 09:03:44 +11:00
// Emulates a pool from a single Conn.
type poolFromConn struct {
mu sync.Mutex
conn conn
}
func (me *poolFromConn) Get(ctx context.Context) conn {
me.mu.Lock()
return me.conn
}
func (me *poolFromConn) Put(conn conn) {
if conn != me.conn {
panic("expected to same conn")
}
me.mu.Unlock()
}
func NewProvider(conn *sqlite.Conn) (_ *provider, err error) {
2020-10-27 17:07:49 +11:00
err = initConn(conn, false)
if err != nil {
return
}
err = initSchema(conn)
2020-10-23 09:03:44 +11:00
return &provider{&poolFromConn{conn: conn}}, err
}
2020-10-27 17:07:49 +11:00
// Needs the pool size so it can initialize all the connections with pragmas.
func NewProviderPool(pool *sqlitex.Pool, numConns int, wal bool) (_ *provider, err error) {
_, err = initPoolConns(context.TODO(), pool, numConns, wal)
if err != nil {
return
}
2020-10-23 09:03:44 +11:00
conn := pool.Get(context.TODO())
defer pool.Put(conn)
err = initSchema(conn)
2020-10-23 09:03:44 +11:00
return &provider{pool: pool}, err
}
2020-10-27 17:07:49 +11:00
func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int, wal bool) (numInited int, err error) {
var conns []conn
defer func() {
for _, c := range conns {
pool.Put(c)
}
}()
for range iter.N(numConn) {
conn := pool.Get(ctx)
if conn == nil {
break
}
conns = append(conns, conn)
2020-10-27 17:07:49 +11:00
err = initConn(conn, wal)
if err != nil {
err = fmt.Errorf("initing conn %v: %w", len(conns), err)
return
}
numInited++
}
return
}
2020-10-23 09:03:44 +11:00
type pool interface {
Get(context.Context) conn
Put(conn)
}
type provider struct {
2020-10-23 09:03:44 +11:00
pool pool
}
func (p *provider) NewInstance(s string) (resource.Instance, error) {
return instance{s, p}, nil
}
type instance struct {
location string
p *provider
}
func (i instance) withConn(with func(conn conn)) {
2020-10-23 09:03:44 +11:00
conn := i.p.pool.Get(context.TODO())
2020-10-27 17:07:49 +11:00
//err := sqlitex.Exec(conn, "pragma synchronous", func(stmt *sqlite.Stmt) error {
// log.Print(stmt.ColumnText(0))
// return nil
//})
//if err != nil {
// log.Print(err)
//}
2020-10-23 09:03:44 +11:00
defer i.p.pool.Put(conn)
with(conn)
}
2020-10-23 09:03:44 +11:00
func (i instance) getConn() *sqlite.Conn {
return i.p.pool.Get(context.TODO())
}
2020-10-23 09:03:44 +11:00
func (i instance) putConn(conn *sqlite.Conn) {
i.p.pool.Put(conn)
}
func (i instance) Readdirnames() (names []string, err error) {
prefix := i.location + "/"
i.withConn(func(conn conn) {
err = sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
names = append(names, stmt.ColumnText(0)[len(prefix):])
return nil
}, prefix+"%")
})
2020-10-13 09:36:58 +11:00
//log.Printf("readdir %q gave %q", i.location, names)
return
}
func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
rows := 0
err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
rowid = stmt.ColumnInt64(0)
rows++
return nil
}, i.location)
if err != nil {
return
}
if rows == 1 {
return
}
if rows == 0 {
err = errors.New("blob not found")
return
}
panic(rows)
}
type connBlob struct {
*sqlite.Blob
onClose func()
}
func (me connBlob) Close() error {
err := me.Blob.Close()
me.onClose()
return err
}
func (i instance) Get() (ret io.ReadCloser, err error) {
2020-10-23 09:03:44 +11:00
conn := i.getConn()
blob, err := i.openBlob(conn, false, true)
if err != nil {
2020-10-23 09:03:44 +11:00
i.putConn(conn)
return
}
var once sync.Once
return connBlob{blob, func() {
2020-10-23 09:03:44 +11:00
once.Do(func() { i.putConn(conn) })
}}, nil
}
2020-10-13 09:36:58 +11:00
func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
rowid, err := i.getBlobRowid(conn)
if err != nil {
return nil, err
}
2020-10-23 09:03:44 +11:00
// This seems to cause locking issues with in-memory databases. Is it something to do with not
// having WAL?
2020-10-13 09:36:58 +11:00
if updateAccess {
err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
2020-10-13 09:36:58 +11:00
if err != nil {
err = fmt.Errorf("updating last_used: %w", err)
return nil, err
}
if conn.Changes() != 1 {
panic(conn.Changes())
}
}
return conn.OpenBlob("main", "blob", "data", rowid, write)
}
func (i instance) Put(reader io.Reader) (err error) {
var buf bytes.Buffer
_, err = io.Copy(&buf, reader)
if err != nil {
return err
}
i.withConn(func(conn conn) {
2020-10-27 17:07:49 +11:00
for range iter.N(10) {
err = sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, ?)", nil, i.location, buf.Bytes())
if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
log.Print("sqlite busy")
time.Sleep(time.Second)
continue
}
break
}
})
return
}
type fileInfo struct {
size int64
}
func (f fileInfo) Name() string {
panic("implement me")
}
func (f fileInfo) Size() int64 {
return f.size
}
func (f fileInfo) Mode() os.FileMode {
panic("implement me")
}
func (f fileInfo) ModTime() time.Time {
panic("implement me")
}
func (f fileInfo) IsDir() bool {
panic("implement me")
}
func (f fileInfo) Sys() interface{} {
panic("implement me")
}
func (i instance) Stat() (ret os.FileInfo, err error) {
i.withConn(func(conn conn) {
var blob *sqlite.Blob
2020-10-13 09:36:58 +11:00
blob, err = i.openBlob(conn, false, false)
if err != nil {
return
}
defer blob.Close()
ret = fileInfo{blob.Size()}
})
return
}
func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
i.withConn(func(conn conn) {
if false {
var blob *sqlite.Blob
blob, err = i.openBlob(conn, false, true)
if err != nil {
return
}
defer blob.Close()
if off >= blob.Size() {
err = io.EOF
return
}
if off+int64(len(p)) > blob.Size() {
p = p[:blob.Size()-off]
}
n, err = blob.ReadAt(p, off)
} else {
gotRow := false
err = sqlitex.Exec(
conn,
"select substr(data, ?, ?) from blob where name=?",
func(stmt *sqlite.Stmt) error {
if gotRow {
panic("found multiple matching blobs")
} else {
gotRow = true
}
n = stmt.ColumnBytes(0, p)
return nil
},
off+1, len(p), i.location,
)
if err != nil {
return
}
if !gotRow {
err = errors.New("blob not found")
return
}
if n < len(p) {
err = io.EOF
}
}
})
return
}
func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
panic("implement me")
}
func (i instance) Delete() (err error) {
i.withConn(func(conn conn) {
err = sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
})
return
}