2020-10-11 12:58:27 +11:00
|
|
|
package sqliteStorage
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
2020-10-23 09:03:44 +11:00
|
|
|
"context"
|
2020-10-11 12:58:27 +11:00
|
|
|
"errors"
|
2020-10-13 09:36:58 +11:00
|
|
|
"fmt"
|
2020-10-11 12:58:27 +11:00
|
|
|
"io"
|
2020-10-27 17:07:49 +11:00
|
|
|
"log"
|
2020-10-30 12:20:54 +11:00
|
|
|
"net/url"
|
2020-10-11 12:58:27 +11:00
|
|
|
"os"
|
2020-10-30 12:20:54 +11:00
|
|
|
"runtime"
|
2020-10-11 12:58:27 +11:00
|
|
|
"sync"
|
|
|
|
"time"
|
|
|
|
|
|
|
|
"crawshaw.io/sqlite"
|
|
|
|
"crawshaw.io/sqlite/sqlitex"
|
2020-10-27 11:08:37 +11:00
|
|
|
"github.com/anacrolix/missinggo/iter"
|
2020-10-11 12:58:27 +11:00
|
|
|
"github.com/anacrolix/missinggo/v2/resource"
|
2020-10-30 12:20:54 +11:00
|
|
|
"github.com/anacrolix/torrent/storage"
|
2020-10-11 12:58:27 +11:00
|
|
|
)
|
|
|
|
|
|
|
|
// conn is a package-local alias for a raw SQLite connection, used throughout
// this file for pool Get/Put and query helpers.
type conn = *sqlite.Conn
|
|
|
|
|
2020-10-27 17:07:49 +11:00
|
|
|
func initConn(conn conn, wal bool) error {
|
2020-11-03 13:10:17 +11:00
|
|
|
// Recursive triggers are required because we need to trim the blob_meta size after trimming to
|
|
|
|
// capacity. Hopefully we don't hit the recursion limit, and if we do, there's an error thrown.
|
|
|
|
err := sqlitex.ExecTransient(conn, "pragma recursive_triggers=on", nil)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
err = sqlitex.ExecTransient(conn, `pragma synchronous=off`, nil)
|
2020-10-27 17:07:49 +11:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
if !wal {
|
|
|
|
err = sqlitex.ExecTransient(conn, `pragma journal_mode=off`, nil)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2020-10-30 18:40:47 +11:00
|
|
|
err = sqlitex.ExecTransient(conn, `pragma mmap_size=1000000000000`, nil)
|
2020-10-27 17:07:49 +11:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
2020-10-27 11:08:37 +11:00
|
|
|
}
|
|
|
|
|
|
|
|
// initSchema creates (idempotently, via "if not exists") the blob table, the
// blob_meta size accumulator, the setting table, the deletable_blob view used
// for capacity-based eviction, and the triggers that keep blob_meta's 'size'
// entry in sync on insert/update/delete. Safe to run on every startup.
func initSchema(conn conn) error {
	return sqlitex.ExecScript(conn, `
-- We have to opt into this before creating any tables, or before a vacuum to enable it. It means we
-- can trim the database file size with partial vacuums without having to do a full vacuum, which
-- locks everything.
pragma auto_vacuum=incremental;

create table if not exists blob (
	name text,
	last_used timestamp default (datetime('now')),
	data blob,
	primary key (name)
);

create table if not exists blob_meta (
	key text primary key,
	value
);

-- While sqlite *seems* to be faster to get sum(length(data)) instead of
-- sum(length(cast(data as blob))), it may still require a large table scan at start-up or with a
-- cold-cache. With this we can be assured that it doesn't.
insert or ignore into blob_meta values ('size', 0);

create table if not exists setting (
	name primary key on conflict replace,
	value
);

create view if not exists deletable_blob as
with recursive excess (
	usage_with,
	last_used,
	blob_rowid,
	data_length
) as (
	select *
	from (
		select
			(select value from blob_meta where key='size') as usage_with,
			last_used,
			rowid,
			length(cast(data as blob))
		from blob order by last_used, rowid limit 1
	)
	where usage_with >= (select value from setting where name='capacity')
	union all
	select
		usage_with-data_length,
		blob.last_used,
		blob.rowid,
		length(cast(data as blob))
	from excess join blob
	on blob.rowid=(select rowid from blob where (last_used, rowid) > (excess.last_used, blob_rowid))
	where usage_with >= (select value from setting where name='capacity')
)
select * from excess;

create trigger if not exists after_insert_blob
after insert on blob
begin
	update blob_meta set value=value+length(cast(new.data as blob)) where key='size';
	delete from blob where rowid in (select blob_rowid from deletable_blob);
end;

create trigger if not exists after_update_blob
after update of data on blob
begin
	update blob_meta set value=value+length(cast(new.data as blob))-length(cast(old.data as blob)) where key='size';
	delete from blob where rowid in (select blob_rowid from deletable_blob);
end;

create trigger if not exists after_delete_blob
after delete on blob
begin
	update blob_meta set value=value-length(cast(old.data as blob)) where key='size';
end;
`)
}
|
|
|
|
|
2020-10-30 12:20:54 +11:00
|
|
|
// A convenience function that creates a connection pool, resource provider, and a pieces storage
|
|
|
|
// ClientImpl and returns them all with a Close attached.
|
|
|
|
func NewPiecesStorage(opts NewPoolOpts) (_ storage.ClientImplCloser, err error) {
|
|
|
|
conns, provOpts, err := NewPool(opts)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
prov, err := NewProvider(conns, provOpts)
|
|
|
|
if err != nil {
|
2020-10-30 19:46:51 +11:00
|
|
|
conns.Close()
|
2020-10-30 12:20:54 +11:00
|
|
|
return
|
|
|
|
}
|
|
|
|
store := storage.NewResourcePieces(prov)
|
|
|
|
return struct {
|
|
|
|
storage.ClientImpl
|
|
|
|
io.Closer
|
|
|
|
}{
|
|
|
|
store,
|
2020-10-30 19:46:51 +11:00
|
|
|
prov,
|
2020-10-30 12:20:54 +11:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// NewPoolOpts configures NewPool.
type NewPoolOpts struct {
	// Path to the database file. Ignored when Memory is set.
	Path string
	// Memory replaces Path with ":memory:".
	Memory bool
	// NumConns is the pool size; defaults to runtime.NumCPU() when zero.
	NumConns int
	// Forces WAL, disables shared caching.
	ConcurrentBlobReads bool
}
|
|
|
|
|
|
|
|
// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
// top-level option type.
type ProviderOpts struct {
	// NumConns must match the pool size so NewProvider can initialize every connection.
	NumConns int
	// Concurrent blob reads require WAL.
	ConcurrentBlobRead bool
	// BatchWrites routes write queries through a single batching goroutine (providerWriter).
	BatchWrites bool
}
|
|
|
|
|
|
|
|
func NewPool(opts NewPoolOpts) (_ ConnPool, _ ProviderOpts, err error) {
|
|
|
|
if opts.NumConns == 0 {
|
|
|
|
opts.NumConns = runtime.NumCPU()
|
|
|
|
}
|
|
|
|
if opts.Memory {
|
|
|
|
opts.Path = ":memory:"
|
|
|
|
}
|
|
|
|
values := make(url.Values)
|
|
|
|
if !opts.ConcurrentBlobReads {
|
|
|
|
values.Add("cache", "shared")
|
|
|
|
}
|
|
|
|
path := fmt.Sprintf("file:%s?%s", opts.Path, values.Encode())
|
|
|
|
conns, err := func() (ConnPool, error) {
|
|
|
|
switch opts.NumConns {
|
|
|
|
case 1:
|
2020-10-30 19:46:51 +11:00
|
|
|
conn, err := sqlite.OpenConn(path, 0)
|
2020-10-30 12:20:54 +11:00
|
|
|
return &poolFromConn{conn: conn}, err
|
|
|
|
default:
|
|
|
|
return sqlitex.Open(path, 0, opts.NumConns)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
return conns, ProviderOpts{
|
|
|
|
NumConns: opts.NumConns,
|
|
|
|
ConcurrentBlobRead: opts.ConcurrentBlobReads,
|
2020-10-30 19:46:51 +11:00
|
|
|
BatchWrites: true,
|
2020-10-30 12:20:54 +11:00
|
|
|
}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
type poolFromConn struct {
	// mu serializes access: held from Get until the matching Put.
	mu   sync.Mutex
	conn conn
}
|
|
|
|
|
|
|
|
// Get locks the pool and hands out its single connection. The caller must
// return it with Put to release the lock.
// NOTE(review): ctx is ignored, so a blocked Get cannot be cancelled — confirm
// whether callers rely on cancellation here.
func (me *poolFromConn) Get(ctx context.Context) conn {
	me.mu.Lock()
	return me.conn
}
|
|
|
|
|
|
|
|
func (me *poolFromConn) Put(conn conn) {
|
|
|
|
if conn != me.conn {
|
|
|
|
panic("expected to same conn")
|
|
|
|
}
|
|
|
|
me.mu.Unlock()
|
|
|
|
}
|
|
|
|
|
2020-10-30 12:20:54 +11:00
|
|
|
// Close closes the underlying single connection.
func (me *poolFromConn) Close() error {
	return me.conn.Close()
}
|
|
|
|
|
2020-10-30 19:46:51 +11:00
|
|
|
// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
// the ConnPool (since it has to initialize all the connections anyway).
func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
	// Initialize every connection with the required pragmas.
	// NOTE(review): wal is hard-coded true here regardless of opts — confirm intended.
	_, err = initPoolConns(context.TODO(), pool, opts.NumConns, true)
	if err != nil {
		return
	}
	// Run the schema script on one connection; it's idempotent.
	conn := pool.Get(context.TODO())
	defer pool.Put(conn)
	err = initSchema(conn)
	if err != nil {
		return
	}
	writes := make(chan writeRequest)
	prov := &provider{pool: pool, writes: writes, opts: opts}
	runtime.SetFinalizer(prov, func(p *provider) {
		// This is done in a finalizer, as it's easier than trying to synchronize on whether the
		// channel has been closed. It also means that the provider writer can pass back errors from
		// a closed ConnPool.
		close(p.writes)
	})
	// The batching writer drains `writes` until the finalizer closes the channel.
	go providerWriter(writes, prov.pool)
	return prov, nil
}
|
|
|
|
|
2020-10-30 12:20:54 +11:00
|
|
|
func initPoolConns(ctx context.Context, pool ConnPool, numConn int, wal bool) (numInited int, err error) {
|
2020-10-27 11:08:37 +11:00
|
|
|
var conns []conn
|
|
|
|
defer func() {
|
|
|
|
for _, c := range conns {
|
|
|
|
pool.Put(c)
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
for range iter.N(numConn) {
|
|
|
|
conn := pool.Get(ctx)
|
|
|
|
if conn == nil {
|
|
|
|
break
|
|
|
|
}
|
|
|
|
conns = append(conns, conn)
|
2020-10-27 17:07:49 +11:00
|
|
|
err = initConn(conn, wal)
|
2020-10-27 11:08:37 +11:00
|
|
|
if err != nil {
|
|
|
|
err = fmt.Errorf("initing conn %v: %w", len(conns), err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
numInited++
|
|
|
|
}
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-10-30 12:20:54 +11:00
|
|
|
// ConnPool abstracts the two pool implementations used here (sqlitex.Pool and
// poolFromConn). Get blocks until a connection is available; Put must be
// called with the exact connection previously returned by Get.
type ConnPool interface {
	Get(context.Context) conn
	Put(conn)
	Close() error
}
|
|
|
|
|
|
|
|
// provider implements resource.Provider on top of a ConnPool. Writes may be
// funneled through the `writes` channel to a batching goroutine (see
// providerWriter) when opts.BatchWrites is set.
type provider struct {
	pool   ConnPool
	writes chan<- writeRequest
	opts   ProviderOpts
}
|
|
|
|
|
2020-11-02 15:35:07 +11:00
|
|
|
// Compile-time check that provider satisfies storage.ConsecutiveChunkWriter.
var _ storage.ConsecutiveChunkWriter = (*provider)(nil)
|
|
|
|
|
|
|
|
func (p *provider) WriteConsecutiveChunks(prefix string, w io.Writer) (err error) {
|
|
|
|
p.withConn(func(conn conn) {
|
|
|
|
err = io.EOF
|
|
|
|
err = sqlitex.Exec(conn, `
|
|
|
|
select
|
|
|
|
cast(data as blob),
|
|
|
|
cast(substr(name, ?+1) as integer) as offset
|
|
|
|
from blob
|
|
|
|
where name like ?||'%'
|
|
|
|
order by offset`,
|
|
|
|
func(stmt *sqlite.Stmt) error {
|
|
|
|
r := stmt.ColumnReader(0)
|
|
|
|
//offset := stmt.ColumnInt64(1)
|
|
|
|
//log.Printf("got %v bytes at offset %v", r.Len(), offset)
|
|
|
|
_, err := io.Copy(w, r)
|
|
|
|
return err
|
|
|
|
},
|
|
|
|
len(prefix),
|
|
|
|
prefix,
|
|
|
|
)
|
|
|
|
}, false)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2020-10-30 19:46:51 +11:00
|
|
|
// Close closes the ConnPool the provider took ownership of in NewProvider.
func (me *provider) Close() error {
	return me.pool.Close()
}
|
|
|
|
|
|
|
|
// writeRequest is a unit of batched work for providerWriter.
type writeRequest struct {
	// query runs against the connection inside the batch transaction.
	query func(*sqlite.Conn)
	// done is closed once the batch containing this request has committed.
	done chan<- struct{}
}
|
|
|
|
|
2020-11-02 15:35:07 +11:00
|
|
|
// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
// stronger typing on the writes channel.
//
// providerWriter drains writes, coalescing all immediately-available requests
// into one savepoint-wrapped batch per pool checkout, then signals each
// request's done channel. Exits when the channel is closed (by the provider's
// finalizer).
func providerWriter(writes <-chan writeRequest, pool ConnPool) {
	for {
		// Block for the first request of the next batch.
		first, ok := <-writes
		if !ok {
			return
		}
		buf := []writeRequest{first}
	buffer:
		// Greedily drain whatever else is already queued, without blocking.
		for {
			select {
			case wr, ok := <-writes:
				if !ok {
					break buffer
				}
				buf = append(buf, wr)
			default:
				break buffer
			}
		}
		var cantFail error
		func() {
			conn := pool.Get(context.TODO())
			defer pool.Put(conn)
			// All queries in the batch run inside one savepoint; cantFail
			// captures any rollback-triggering error.
			defer sqlitex.Save(conn)(&cantFail)
			for _, wr := range buf {
				wr.query(conn)
			}
		}()
		// The queries themselves don't report errors here, so a failed
		// savepoint is considered unrecoverable.
		if cantFail != nil {
			panic(cantFail)
		}
		// Wake every waiter only after the batch has committed.
		for _, wr := range buf {
			close(wr.done)
		}
		//log.Printf("batched %v write queries", len(buf))
	}
}
|
|
|
|
|
|
|
|
func (p *provider) NewInstance(s string) (resource.Instance, error) {
|
|
|
|
return instance{s, p}, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// instance is a resource.Instance backed by a single row of the blob table,
// keyed by location (the blob name).
type instance struct {
	location string
	p        *provider
}
|
|
|
|
|
2020-11-02 15:35:07 +11:00
|
|
|
func (p *provider) withConn(with func(conn conn), write bool) {
|
|
|
|
if write && p.opts.BatchWrites {
|
2020-10-30 19:46:51 +11:00
|
|
|
done := make(chan struct{})
|
2020-11-02 15:35:07 +11:00
|
|
|
p.writes <- writeRequest{
|
2020-10-30 19:46:51 +11:00
|
|
|
query: with,
|
|
|
|
done: done,
|
|
|
|
}
|
|
|
|
<-done
|
|
|
|
} else {
|
2020-11-02 15:35:07 +11:00
|
|
|
conn := p.pool.Get(context.TODO())
|
|
|
|
defer p.pool.Put(conn)
|
2020-10-30 19:46:51 +11:00
|
|
|
with(conn)
|
|
|
|
}
|
2020-10-11 12:58:27 +11:00
|
|
|
}
|
|
|
|
|
2020-11-02 15:35:07 +11:00
|
|
|
// withConn delegates to the provider's connection/batching logic.
func (i instance) withConn(with func(conn conn), write bool) {
	i.p.withConn(with, write)
}
|
|
|
|
|
2020-10-23 09:03:44 +11:00
|
|
|
// getConn checks a connection out of the provider's pool. The caller must
// return it with putConn.
func (i instance) getConn() *sqlite.Conn {
	return i.p.pool.Get(context.TODO())
}
|
|
|
|
|
2020-10-23 09:03:44 +11:00
|
|
|
// putConn returns a connection previously obtained via getConn.
func (i instance) putConn(conn *sqlite.Conn) {
	i.p.pool.Put(conn)
}
|
|
|
|
|
|
|
|
// Readdirnames lists the names of blobs "under" this instance's location,
// treating '/' as a directory separator, with the location prefix stripped.
func (i instance) Readdirnames() (names []string, err error) {
	prefix := i.location + "/"
	i.withConn(func(conn conn) {
		// NOTE(review): LIKE wildcards ('%', '_') in the prefix are not
		// escaped, so a location containing them could over-match — confirm
		// locations never contain these characters.
		err = sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
			names = append(names, stmt.ColumnText(0)[len(prefix):])
			return nil
		}, prefix+"%")
	}, false)
	//log.Printf("readdir %q gave %q", i.location, names)
	return
}
|
|
|
|
|
|
|
|
func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
|
|
|
|
rows := 0
|
2020-10-23 09:01:15 +11:00
|
|
|
err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
|
2020-10-11 12:58:27 +11:00
|
|
|
rowid = stmt.ColumnInt64(0)
|
|
|
|
rows++
|
|
|
|
return nil
|
|
|
|
}, i.location)
|
|
|
|
if err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if rows == 1 {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
if rows == 0 {
|
|
|
|
err = errors.New("blob not found")
|
|
|
|
return
|
|
|
|
}
|
|
|
|
panic(rows)
|
|
|
|
}
|
|
|
|
|
|
|
|
// connBlob couples an open sqlite Blob with a callback that releases the
// connection it was opened on. See instance.Get.
type connBlob struct {
	*sqlite.Blob
	// onClose runs after the blob is closed, typically returning the conn to the pool.
	onClose func()
}
|
|
|
|
|
|
|
|
func (me connBlob) Close() error {
|
|
|
|
err := me.Blob.Close()
|
|
|
|
me.onClose()
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Get opens the instance's blob for reading and bumps its last_used time.
// The checked-out connection is held until the returned ReadCloser is closed;
// the sync.Once guards against double-Put on repeated Close calls.
func (i instance) Get() (ret io.ReadCloser, err error) {
	conn := i.getConn()
	blob, err := i.openBlob(conn, false, true)
	if err != nil {
		i.putConn(conn)
		return
	}
	var once sync.Once
	return connBlob{blob, func() {
		once.Do(func() { i.putConn(conn) })
	}}, nil
}
|
|
|
|
|
2020-10-13 09:36:58 +11:00
|
|
|
// openBlob opens incremental blob I/O on this instance's row, optionally for
// writing, and optionally refreshing last_used (used by the eviction view).
func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
	rowid, err := i.getBlobRowid(conn)
	if err != nil {
		return nil, err
	}
	// This seems to cause locking issues with in-memory databases. Is it something to do with not
	// having WAL?
	if updateAccess {
		err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
		if err != nil {
			err = fmt.Errorf("updating last_used: %w", err)
			return nil, err
		}
		// We just resolved the rowid, so exactly one row must have changed.
		if conn.Changes() != 1 {
			panic(conn.Changes())
		}
	}
	return conn.OpenBlob("main", "blob", "data", rowid, write)
}
|
|
|
|
|
|
|
|
func (i instance) Put(reader io.Reader) (err error) {
|
|
|
|
var buf bytes.Buffer
|
|
|
|
_, err = io.Copy(&buf, reader)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
i.withConn(func(conn conn) {
|
2020-10-27 17:07:49 +11:00
|
|
|
for range iter.N(10) {
|
2020-10-30 10:47:50 +11:00
|
|
|
err = sqlitex.Exec(conn,
|
|
|
|
"insert or replace into blob(name, data) values(?, cast(? as blob))",
|
|
|
|
nil,
|
|
|
|
i.location, buf.Bytes())
|
2020-10-27 17:07:49 +11:00
|
|
|
if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
|
|
|
|
log.Print("sqlite busy")
|
|
|
|
time.Sleep(time.Second)
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
break
|
|
|
|
}
|
2020-10-30 19:46:51 +11:00
|
|
|
}, true)
|
2020-10-11 12:58:27 +11:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
// fileInfo is a minimal os.FileInfo carrying only a size; every other method
// panics (only Size is used by callers here — see instance.Stat).
type fileInfo struct {
	size int64
}
|
|
|
|
|
|
|
|
// Name is unimplemented; fileInfo is only ever consumed for its Size.
func (f fileInfo) Name() string {
	panic("implement me")
}
|
|
|
|
|
|
|
|
// Size reports the blob length captured at Stat time.
func (f fileInfo) Size() int64 {
	return f.size
}
|
|
|
|
|
|
|
|
// Mode is unimplemented; fileInfo is only ever consumed for its Size.
func (f fileInfo) Mode() os.FileMode {
	panic("implement me")
}
|
|
|
|
|
|
|
|
// ModTime is unimplemented; fileInfo is only ever consumed for its Size.
func (f fileInfo) ModTime() time.Time {
	panic("implement me")
}
|
|
|
|
|
|
|
|
// IsDir is unimplemented; fileInfo is only ever consumed for its Size.
func (f fileInfo) IsDir() bool {
	panic("implement me")
}
|
|
|
|
|
|
|
|
// Sys is unimplemented; fileInfo is only ever consumed for its Size.
func (f fileInfo) Sys() interface{} {
	panic("implement me")
}
|
|
|
|
|
|
|
|
// Stat reports the blob's size by briefly opening it (without touching
// last_used). Returns the "blob not found" error from openBlob when absent.
func (i instance) Stat() (ret os.FileInfo, err error) {
	i.withConn(func(conn conn) {
		var blob *sqlite.Blob
		blob, err = i.openBlob(conn, false, false)
		if err != nil {
			return
		}
		defer blob.Close()
		ret = fileInfo{blob.Size()}
	}, false)
	return
}
|
|
|
|
|
|
|
|
// ReadAt reads len(p) bytes of the blob starting at off, with io.ReaderAt
// semantics (io.EOF on short reads past the end of the data).
func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
	i.withConn(func(conn conn) {
		// NOTE(review): the `if false` branch is a disabled alternative using
		// incremental blob I/O, kept for reference; only the else branch runs.
		if false {
			var blob *sqlite.Blob
			blob, err = i.openBlob(conn, false, true)
			if err != nil {
				return
			}
			defer blob.Close()
			if off >= blob.Size() {
				err = io.EOF
				return
			}
			if off+int64(len(p)) > blob.Size() {
				p = p[:blob.Size()-off]
			}
			n, err = blob.ReadAt(p, off)
		} else {
			gotRow := false
			err = sqlitex.Exec(
				conn,
				// substr is 1-based, hence off+1 below.
				"select substr(cast(data as blob), ?, ?) from blob where name=?",
				func(stmt *sqlite.Stmt) error {
					// name is the primary key, so at most one row can match.
					if gotRow {
						panic("found multiple matching blobs")
					} else {
						gotRow = true
					}
					n = stmt.ColumnBytes(0, p)
					return nil
				},
				off+1, len(p), i.location,
			)
			if err != nil {
				return
			}
			if !gotRow {
				err = errors.New("blob not found")
				return
			}
			// Short read means we ran off the end of the blob.
			if n < len(p) {
				err = io.EOF
			}
		}
	}, false)
	return
}
|
|
|
|
|
|
|
|
// WriteAt is unimplemented; writes go through Put, which replaces the whole blob.
func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
	panic("implement me")
}
|
|
|
|
|
|
|
|
// Delete removes this instance's blob row (a no-op if absent). The delete
// trigger keeps blob_meta's size entry in sync.
func (i instance) Delete() (err error) {
	i.withConn(func(conn conn) {
		err = sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
	}, true)
	return
}
|