sqlite storage: Include capacity management
This commit is contained in:
parent
55d4bcaf26
commit
e30084223d
|
@ -1,11 +1,29 @@
|
|||
with recursive excess(
|
||||
pragma auto_vacuum=incremental;
|
||||
create table if not exists blob(
|
||||
name text,
|
||||
last_used timestamp default (datetime('now')),
|
||||
data blob,
|
||||
primary key (name)
|
||||
);
|
||||
|
||||
create view if not exists deletable_blob as
|
||||
with recursive excess_blob(
|
||||
usage_with,
|
||||
last_used,
|
||||
blob_rowid,
|
||||
data_length
|
||||
) as (
|
||||
select * from (select (select sum(length(data)) from blob), last_used, rowid, length(data) from blob order by last_used, rowid limit 1)
|
||||
select * from (select (select sum(length(data)) from blob) as usage_with, last_used, rowid, length(data) from blob order by last_used, rowid limit 1)
|
||||
where usage_with >= (select value from setting where name='capacity')
|
||||
union all
|
||||
select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess join blob
|
||||
select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess_blob join blob
|
||||
on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
|
||||
) select * from excess limit 10;
|
||||
where usage_with >= (select value from setting where name='capacity')
|
||||
) select * from excess;
|
||||
|
||||
CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin
|
||||
delete from blob where rowid in (select blob_rowid from deletable_blob);
|
||||
end;
|
||||
CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin
|
||||
delete from blob where rowid in (select blob_rowid from deletable_blob);
|
||||
end;
|
||||
|
|
|
@ -12,19 +12,53 @@ import (
|
|||
|
||||
"crawshaw.io/sqlite"
|
||||
"crawshaw.io/sqlite/sqlitex"
|
||||
"github.com/anacrolix/missinggo/iter"
|
||||
"github.com/anacrolix/missinggo/v2/resource"
|
||||
)
|
||||
|
||||
type conn = *sqlite.Conn
|
||||
|
||||
func initConn(conn conn) error {
|
||||
return sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
|
||||
}
|
||||
|
||||
func initSchema(conn conn) error {
|
||||
return sqlitex.ExecScript(conn, `
|
||||
pragma auto_vacuum=incremental;
|
||||
|
||||
create table if not exists blob(
|
||||
name text,
|
||||
last_used timestamp default (datetime('now')),
|
||||
data blob,
|
||||
primary key (name)
|
||||
);
|
||||
|
||||
create table if not exists setting(
|
||||
name primary key on conflict replace,
|
||||
value
|
||||
);
|
||||
|
||||
create view if not exists deletable_blob as
|
||||
with recursive excess(
|
||||
usage_with,
|
||||
last_used,
|
||||
blob_rowid,
|
||||
data_length
|
||||
) as (
|
||||
select * from (select (select sum(length(data)) from blob) as usage_with, last_used, rowid, length(data) from blob order by last_used, rowid limit 1)
|
||||
where usage_with >= (select value from setting where name='capacity')
|
||||
union all
|
||||
select usage_with-data_length, blob.last_used, blob.rowid, length(data) from excess join blob
|
||||
on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
|
||||
where usage_with >= (select value from setting where name='capacity')
|
||||
) select * from excess;
|
||||
|
||||
CREATE TRIGGER if not exists trim_blobs_to_capacity_after_update after update on blob begin
|
||||
delete from blob where rowid in (select blob_rowid from deletable_blob);
|
||||
end;
|
||||
CREATE TRIGGER if not exists trim_blobs_to_capacity_after_insert after insert on blob begin
|
||||
delete from blob where rowid in (select blob_rowid from deletable_blob);
|
||||
end;
|
||||
`)
|
||||
}
|
||||
|
||||
|
@ -46,18 +80,49 @@ func (me *poolFromConn) Put(conn conn) {
|
|||
me.mu.Unlock()
|
||||
}
|
||||
|
||||
func NewProvider(conn *sqlite.Conn) (*provider, error) {
|
||||
err := initConn(conn)
|
||||
func NewProvider(conn *sqlite.Conn) (_ *provider, err error) {
|
||||
err = initConn(conn)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
err = initSchema(conn)
|
||||
return &provider{&poolFromConn{conn: conn}}, err
|
||||
}
|
||||
|
||||
func NewProviderPool(pool *sqlitex.Pool) (*provider, error) {
|
||||
func NewProviderPool(pool *sqlitex.Pool, numConns int) (_ *provider, err error) {
|
||||
_, err = initPoolConns(context.TODO(), pool, numConns)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
conn := pool.Get(context.TODO())
|
||||
defer pool.Put(conn)
|
||||
err := initConn(conn)
|
||||
err = initSchema(conn)
|
||||
return &provider{pool: pool}, err
|
||||
}
|
||||
|
||||
func initPoolConns(ctx context.Context, pool *sqlitex.Pool, numConn int) (numInited int, err error) {
|
||||
var conns []conn
|
||||
defer func() {
|
||||
for _, c := range conns {
|
||||
pool.Put(c)
|
||||
}
|
||||
}()
|
||||
for range iter.N(numConn) {
|
||||
conn := pool.Get(ctx)
|
||||
if conn == nil {
|
||||
break
|
||||
}
|
||||
conns = append(conns, conn)
|
||||
err = initConn(conn)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("initing conn %v: %w", len(conns), err)
|
||||
return
|
||||
}
|
||||
numInited++
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type pool interface {
|
||||
Get(context.Context) conn
|
||||
Put(conn)
|
||||
|
|
|
@ -24,7 +24,7 @@ func TestSimultaneousIncrementalBlob(t *testing.T) {
|
|||
10)
|
||||
require.NoError(t, err)
|
||||
defer pool.Close()
|
||||
p, err := NewProviderPool(pool)
|
||||
p, err := NewProviderPool(pool, 10)
|
||||
require.NoError(t, err)
|
||||
a, err := p.NewInstance("a")
|
||||
require.NoError(t, err)
|
||||
|
|
Loading…
Reference in New Issue