Removed unused sqlite "provider" storage
This commit is contained in:
parent
8df24008ea
commit
19d5905b6c
|
@ -176,11 +176,12 @@ func rowidForBlob(c conn, name string, length int64, create bool) (rowid int64,
|
|||
}
|
||||
|
||||
func (t torrent) Piece(p metainfo.Piece) storage.PieceImpl {
|
||||
ret := piece{sb: SquirrelBlob{
|
||||
p.Hash().HexString(),
|
||||
p.Length(),
|
||||
t.c,
|
||||
},
|
||||
ret := piece{
|
||||
sb: SquirrelBlob{
|
||||
p.Hash().HexString(),
|
||||
p.Length(),
|
||||
t.c,
|
||||
},
|
||||
}
|
||||
ret.ReaderAt = &ret.sb
|
||||
ret.WriterAt = &ret.sb
|
||||
|
|
|
@ -4,28 +4,13 @@
|
|||
package sqliteStorage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
_ "embed"
|
||||
"errors"
|
||||
"expvar"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/url"
|
||||
"os"
|
||||
"runtime"
|
||||
"runtime/pprof"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"crawshaw.io/sqlite"
|
||||
"crawshaw.io/sqlite/sqlitex"
|
||||
"github.com/anacrolix/missinggo/iter"
|
||||
"github.com/anacrolix/missinggo/v2/resource"
|
||||
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
)
|
||||
|
||||
type conn = *sqlite.Conn
|
||||
|
@ -160,19 +145,6 @@ func InitSchema(conn conn, pageSize int, triggers bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
type NewPiecesStorageOpts struct {
|
||||
NewPoolOpts
|
||||
InitDbOpts
|
||||
ProvOpts func(*ProviderOpts)
|
||||
StorageOpts func(*storage.ResourcePiecesOpts)
|
||||
}
|
||||
|
||||
type NewPoolOpts struct {
|
||||
NewConnOpts
|
||||
InitConnOpts
|
||||
NumConns int
|
||||
}
|
||||
|
||||
type InitDbOpts struct {
|
||||
DontInitSchema bool
|
||||
PageSize int
|
||||
|
@ -181,21 +153,13 @@ type InitDbOpts struct {
|
|||
NoTriggers bool
|
||||
}
|
||||
|
||||
// There's some overlap here with NewPoolOpts, and I haven't decided what needs to be done. For now,
|
||||
// the fact that the pool opts are a superset, means our helper NewPiecesStorage can just take the
|
||||
// top-level option type.
|
||||
type PoolConf struct {
|
||||
NumConns int
|
||||
JournalMode string
|
||||
}
|
||||
|
||||
// Remove any capacity limits.
|
||||
func UnlimitCapacity(conn conn) error {
|
||||
func unlimitCapacity(conn conn) error {
|
||||
return sqlitex.Exec(conn, "delete from setting where key='capacity'", nil)
|
||||
}
|
||||
|
||||
// Set the capacity limit to exactly this value.
|
||||
func SetCapacity(conn conn, cap int64) error {
|
||||
func setCapacity(conn conn, cap int64) error {
|
||||
return sqlitex.Exec(conn, "insert into setting values ('capacity', ?)", nil, cap)
|
||||
}
|
||||
|
||||
|
@ -229,22 +193,14 @@ func initDatabase(conn conn, opts InitDbOpts) (err error) {
|
|||
return
|
||||
}
|
||||
}
|
||||
if opts.Capacity != 0 {
|
||||
err = SetCapacity(conn, opts.Capacity)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if opts.Capacity < 0 {
|
||||
err = unlimitCapacity(conn)
|
||||
} else if opts.Capacity > 0 {
|
||||
err = setCapacity(conn, opts.Capacity)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func initPoolDatabase(pool ConnPool, opts InitDbOpts) (err error) {
|
||||
withPoolConn(pool, func(c conn) {
|
||||
err = initDatabase(c, opts)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Go fmt, why you so shit?
|
||||
const openConnFlags = 0 |
|
||||
sqlite.SQLITE_OPEN_READWRITE |
|
||||
|
@ -255,545 +211,3 @@ const openConnFlags = 0 |
|
|||
func newConn(opts NewConnOpts) (conn, error) {
|
||||
return sqlite.OpenConn(newOpenUri(opts), openConnFlags)
|
||||
}
|
||||
|
||||
type poolWithNumConns struct {
|
||||
*sqlitex.Pool
|
||||
numConns int
|
||||
}
|
||||
|
||||
func (me poolWithNumConns) NumConns() int {
|
||||
return me.numConns
|
||||
}
|
||||
|
||||
func NewPool(opts NewPoolOpts) (_ ConnPool, err error) {
|
||||
if opts.NumConns == 0 {
|
||||
opts.NumConns = runtime.NumCPU()
|
||||
}
|
||||
switch opts.NumConns {
|
||||
case 1:
|
||||
conn, err := newConn(opts.NewConnOpts)
|
||||
return &poolFromConn{conn: conn}, err
|
||||
default:
|
||||
_pool, err := sqlitex.Open(newOpenUri(opts.NewConnOpts), openConnFlags, opts.NumConns)
|
||||
return poolWithNumConns{_pool, opts.NumConns}, err
|
||||
}
|
||||
}
|
||||
|
||||
// Emulates a ConnPool from a single Conn. Might be faster than using a sqlitex.Pool.
|
||||
type poolFromConn struct {
|
||||
mu sync.Mutex
|
||||
conn conn
|
||||
}
|
||||
|
||||
func (me *poolFromConn) Get(ctx context.Context) conn {
|
||||
me.mu.Lock()
|
||||
return me.conn
|
||||
}
|
||||
|
||||
func (me *poolFromConn) Put(conn conn) {
|
||||
if conn != me.conn {
|
||||
panic("expected to same conn")
|
||||
}
|
||||
me.mu.Unlock()
|
||||
}
|
||||
|
||||
func (me *poolFromConn) Close() error {
|
||||
return me.conn.Close()
|
||||
}
|
||||
|
||||
func (me *poolFromConn) NumConns() int { return 1 }
|
||||
|
||||
type ProviderOpts struct {
|
||||
BatchWrites bool
|
||||
}
|
||||
|
||||
// Needs the ConnPool size so it can initialize all the connections with pragmas. Takes ownership of
|
||||
// the ConnPool (since it has to initialize all the connections anyway).
|
||||
func NewProvider(pool ConnPool, opts ProviderOpts) (_ *provider, err error) {
|
||||
prov := &provider{pool: pool, opts: opts}
|
||||
if opts.BatchWrites {
|
||||
writes := make(chan writeRequest)
|
||||
prov.writes = writes
|
||||
// This is retained for backwards compatibility. It may not be necessary.
|
||||
runtime.SetFinalizer(prov, func(p *provider) {
|
||||
p.Close()
|
||||
})
|
||||
go providerWriter(writes, prov.pool)
|
||||
}
|
||||
return prov, nil
|
||||
}
|
||||
|
||||
type InitPoolOpts struct {
|
||||
NumConns int
|
||||
InitConnOpts
|
||||
}
|
||||
|
||||
func initPoolConns(ctx context.Context, pool ConnPool, opts InitConnOpts) (err error) {
|
||||
var conns []conn
|
||||
defer func() {
|
||||
for _, c := range conns {
|
||||
pool.Put(c)
|
||||
}
|
||||
}()
|
||||
for range iter.N(pool.NumConns()) {
|
||||
conn := pool.Get(ctx)
|
||||
if conn == nil {
|
||||
break
|
||||
}
|
||||
conns = append(conns, conn)
|
||||
err = initConn(conn, opts)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("initing conn %v: %w", len(conns), err)
|
||||
return
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type ConnPool interface {
|
||||
Get(context.Context) conn
|
||||
Put(conn)
|
||||
Close() error
|
||||
NumConns() int
|
||||
}
|
||||
|
||||
func withPoolConn(pool ConnPool, with func(conn)) {
|
||||
c := pool.Get(context.TODO())
|
||||
defer pool.Put(c)
|
||||
with(c)
|
||||
}
|
||||
|
||||
type provider struct {
|
||||
pool ConnPool
|
||||
writes chan<- writeRequest
|
||||
opts ProviderOpts
|
||||
closeMu sync.RWMutex
|
||||
closed bool
|
||||
closeErr error
|
||||
}
|
||||
|
||||
var _ storage.ConsecutiveChunkReader = (*provider)(nil)
|
||||
|
||||
func (p *provider) ReadConsecutiveChunks(prefix string) (io.ReadCloser, error) {
|
||||
p.closeMu.RLock()
|
||||
runner, err := p.getReadWithConnRunner()
|
||||
if err != nil {
|
||||
p.closeMu.RUnlock()
|
||||
return nil, err
|
||||
}
|
||||
r, w := io.Pipe()
|
||||
go func() {
|
||||
defer p.closeMu.RUnlock()
|
||||
err = runner(func(_ context.Context, conn conn) error {
|
||||
var written int64
|
||||
err = sqlitex.Exec(conn, `
|
||||
select
|
||||
data,
|
||||
cast(substr(name, ?+1) as integer) as offset
|
||||
from blob
|
||||
where name like ?||'%'
|
||||
order by offset`,
|
||||
func(stmt *sqlite.Stmt) error {
|
||||
offset := stmt.ColumnInt64(1)
|
||||
if offset != written {
|
||||
return fmt.Errorf("got chunk at offset %v, expected offset %v", offset, written)
|
||||
}
|
||||
// TODO: Avoid intermediate buffers here
|
||||
r := stmt.ColumnReader(0)
|
||||
w1, err := io.Copy(w, r)
|
||||
written += w1
|
||||
return err
|
||||
},
|
||||
len(prefix),
|
||||
prefix,
|
||||
)
|
||||
return err
|
||||
})
|
||||
w.CloseWithError(err)
|
||||
}()
|
||||
return r, nil
|
||||
}
|
||||
|
||||
func (me *provider) Close() error {
|
||||
me.closeMu.Lock()
|
||||
defer me.closeMu.Unlock()
|
||||
if me.closed {
|
||||
return me.closeErr
|
||||
}
|
||||
if me.writes != nil {
|
||||
close(me.writes)
|
||||
}
|
||||
me.closeErr = me.pool.Close()
|
||||
me.closed = true
|
||||
return me.closeErr
|
||||
}
|
||||
|
||||
type writeRequest struct {
|
||||
query withConn
|
||||
done chan<- error
|
||||
labels pprof.LabelSet
|
||||
}
|
||||
|
||||
var expvars = expvar.NewMap("sqliteStorage")
|
||||
|
||||
func runQueryWithLabels(query withConn, labels pprof.LabelSet, conn conn) (err error) {
|
||||
pprof.Do(context.Background(), labels, func(ctx context.Context) {
|
||||
// We pass in the context in the hope that the CPU profiler might incorporate sqlite
|
||||
// activity the action that triggered it. It doesn't seem that way, since those calls don't
|
||||
// take a context.Context themselves. It may come in useful in the goroutine profiles
|
||||
// though, and doesn't hurt to expose it here for other purposes should things change.
|
||||
err = query(ctx, conn)
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Intentionally avoids holding a reference to *provider to allow it to use a finalizer, and to have
|
||||
// stronger typing on the writes channel.
|
||||
func providerWriter(writes <-chan writeRequest, pool ConnPool) {
|
||||
conn := pool.Get(context.TODO())
|
||||
if conn == nil {
|
||||
return
|
||||
}
|
||||
defer pool.Put(conn)
|
||||
for {
|
||||
first, ok := <-writes
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
var buf []func()
|
||||
var cantFail error
|
||||
func() {
|
||||
defer sqlitex.Save(conn)(&cantFail)
|
||||
firstErr := runQueryWithLabels(first.query, first.labels, conn)
|
||||
buf = append(buf, func() { first.done <- firstErr })
|
||||
for {
|
||||
select {
|
||||
case wr, ok := <-writes:
|
||||
if ok {
|
||||
err := runQueryWithLabels(wr.query, wr.labels, conn)
|
||||
buf = append(buf, func() { wr.done <- err })
|
||||
continue
|
||||
}
|
||||
default:
|
||||
}
|
||||
break
|
||||
}
|
||||
}()
|
||||
// Not sure what to do if this failed.
|
||||
if cantFail != nil {
|
||||
expvars.Add("batchTransactionErrors", 1)
|
||||
}
|
||||
// Signal done after we know the transaction succeeded.
|
||||
for _, done := range buf {
|
||||
done()
|
||||
}
|
||||
expvars.Add("batchTransactions", 1)
|
||||
expvars.Add("batchedQueries", int64(len(buf)))
|
||||
//log.Printf("batched %v write queries", len(buf))
|
||||
}
|
||||
}
|
||||
|
||||
func (p *provider) NewInstance(s string) (resource.Instance, error) {
|
||||
return instance{s, p}, nil
|
||||
}
|
||||
|
||||
type instance struct {
|
||||
location string
|
||||
p *provider
|
||||
}
|
||||
|
||||
func getLabels(skip int) pprof.LabelSet {
|
||||
return pprof.Labels("sqlite-storage-action", func() string {
|
||||
var pcs [8]uintptr
|
||||
runtime.Callers(skip+3, pcs[:])
|
||||
fs := runtime.CallersFrames(pcs[:])
|
||||
f, _ := fs.Next()
|
||||
funcName := f.Func.Name()
|
||||
funcName = funcName[strings.LastIndexByte(funcName, '.')+1:]
|
||||
//log.Printf("func name: %q", funcName)
|
||||
return funcName
|
||||
}())
|
||||
}
|
||||
|
||||
func (p *provider) withConn(with withConn, write bool, skip int) error {
|
||||
p.closeMu.RLock()
|
||||
// I think we need to check this here because it may not be valid to send to the writes channel
|
||||
// if we're already closed. So don't try to move this check into getReadWithConnRunner.
|
||||
if p.closed {
|
||||
p.closeMu.RUnlock()
|
||||
return errors.New("closed")
|
||||
}
|
||||
if write && p.opts.BatchWrites {
|
||||
done := make(chan error)
|
||||
p.writes <- writeRequest{
|
||||
query: with,
|
||||
done: done,
|
||||
labels: getLabels(skip + 1),
|
||||
}
|
||||
p.closeMu.RUnlock()
|
||||
return <-done
|
||||
} else {
|
||||
defer p.closeMu.RUnlock()
|
||||
runner, err := p.getReadWithConnRunner()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return runner(with)
|
||||
}
|
||||
}
|
||||
|
||||
// Obtains a DB conn and returns a withConn for executing with it. If no error is returned from this
|
||||
// function, the runner *must* be used or the conn is leaked. You should check the provider isn't
|
||||
// closed before using this.
|
||||
func (p *provider) getReadWithConnRunner() (with func(withConn) error, err error) {
|
||||
conn := p.pool.Get(context.TODO())
|
||||
if conn == nil {
|
||||
err = errors.New("couldn't get pool conn")
|
||||
return
|
||||
}
|
||||
with = func(with withConn) error {
|
||||
defer p.pool.Put(conn)
|
||||
return runQueryWithLabels(with, getLabels(1), conn)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
type withConn func(context.Context, conn) error
|
||||
|
||||
func (i instance) withConn(with withConn, write bool) error {
|
||||
return i.p.withConn(with, write, 1)
|
||||
}
|
||||
|
||||
func (i instance) getConn() *sqlite.Conn {
|
||||
return i.p.pool.Get(context.TODO())
|
||||
}
|
||||
|
||||
func (i instance) putConn(conn *sqlite.Conn) {
|
||||
i.p.pool.Put(conn)
|
||||
}
|
||||
|
||||
func (i instance) Readdirnames() (names []string, err error) {
|
||||
prefix := i.location + "/"
|
||||
err = i.withConn(func(_ context.Context, conn conn) error {
|
||||
return sqlitex.Exec(conn, "select name from blob where name like ?", func(stmt *sqlite.Stmt) error {
|
||||
names = append(names, stmt.ColumnText(0)[len(prefix):])
|
||||
return nil
|
||||
}, prefix+"%")
|
||||
}, false)
|
||||
//log.Printf("readdir %q gave %q", i.location, names)
|
||||
return
|
||||
}
|
||||
|
||||
func (i instance) getBlobRowid(conn conn) (rowid int64, err error) {
|
||||
rows := 0
|
||||
err = sqlitex.Exec(conn, "select rowid from blob where name=?", func(stmt *sqlite.Stmt) error {
|
||||
rowid = stmt.ColumnInt64(0)
|
||||
rows++
|
||||
return nil
|
||||
}, i.location)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
if rows == 1 {
|
||||
return
|
||||
}
|
||||
if rows == 0 {
|
||||
err = errors.New("blob not found")
|
||||
return
|
||||
}
|
||||
panic(rows)
|
||||
}
|
||||
|
||||
type connBlob struct {
|
||||
*sqlite.Blob
|
||||
onClose func()
|
||||
}
|
||||
|
||||
func (me connBlob) Close() error {
|
||||
err := me.Blob.Close()
|
||||
me.onClose()
|
||||
return err
|
||||
}
|
||||
|
||||
func (i instance) Get() (ret io.ReadCloser, err error) {
|
||||
conn := i.getConn()
|
||||
if conn == nil {
|
||||
panic("nil sqlite conn")
|
||||
}
|
||||
blob, err := i.openBlob(conn, false, true)
|
||||
if err != nil {
|
||||
i.putConn(conn)
|
||||
return
|
||||
}
|
||||
var once sync.Once
|
||||
return connBlob{blob, func() {
|
||||
once.Do(func() { i.putConn(conn) })
|
||||
}}, nil
|
||||
}
|
||||
|
||||
func (i instance) openBlob(conn conn, write, updateAccess bool) (*sqlite.Blob, error) {
|
||||
rowid, err := i.getBlobRowid(conn)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// This seems to cause locking issues with in-memory databases. Is it something to do with not
|
||||
// having WAL?
|
||||
if updateAccess {
|
||||
err = sqlitex.Exec(conn, "update blob set last_used=datetime('now') where rowid=?", nil, rowid)
|
||||
if err != nil {
|
||||
err = fmt.Errorf("updating last_used: %w", err)
|
||||
return nil, err
|
||||
}
|
||||
if conn.Changes() != 1 {
|
||||
panic(conn.Changes())
|
||||
}
|
||||
}
|
||||
return conn.OpenBlob("main", "blob", "data", rowid, write)
|
||||
}
|
||||
|
||||
func (i instance) PutSized(reader io.Reader, size int64) (err error) {
|
||||
err = i.withConn(func(_ context.Context, conn conn) error {
|
||||
err := sqlitex.Exec(conn, "insert or replace into blob(name, data) values(?, zeroblob(?))",
|
||||
nil,
|
||||
i.location, size)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
blob, err := i.openBlob(conn, true, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer blob.Close()
|
||||
_, err = io.Copy(blob, reader)
|
||||
return err
|
||||
}, true)
|
||||
return
|
||||
}
|
||||
|
||||
func (i instance) Put(reader io.Reader) (err error) {
|
||||
var buf bytes.Buffer
|
||||
_, err = io.Copy(&buf, reader)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if false {
|
||||
return i.PutSized(&buf, int64(buf.Len()))
|
||||
} else {
|
||||
return i.withConn(func(_ context.Context, conn conn) error {
|
||||
for range iter.N(10) {
|
||||
err = sqlitex.Exec(conn,
|
||||
"insert or replace into blob(name, data) values(?, cast(? as blob))",
|
||||
nil,
|
||||
i.location, buf.Bytes())
|
||||
if err, ok := err.(sqlite.Error); ok && err.Code == sqlite.SQLITE_BUSY {
|
||||
log.Print("sqlite busy")
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
}
|
||||
break
|
||||
}
|
||||
return err
|
||||
}, true)
|
||||
}
|
||||
}
|
||||
|
||||
type fileInfo struct {
|
||||
size int64
|
||||
}
|
||||
|
||||
func (f fileInfo) Name() string {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (f fileInfo) Size() int64 {
|
||||
return f.size
|
||||
}
|
||||
|
||||
func (f fileInfo) Mode() os.FileMode {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (f fileInfo) ModTime() time.Time {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (f fileInfo) IsDir() bool {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (f fileInfo) Sys() interface{} {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (i instance) Stat() (ret os.FileInfo, err error) {
|
||||
err = i.withConn(func(_ context.Context, conn conn) error {
|
||||
var blob *sqlite.Blob
|
||||
blob, err = i.openBlob(conn, false, false)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer blob.Close()
|
||||
ret = fileInfo{blob.Size()}
|
||||
return nil
|
||||
}, false)
|
||||
return
|
||||
}
|
||||
|
||||
func (i instance) ReadAt(p []byte, off int64) (n int, err error) {
|
||||
err = i.withConn(func(_ context.Context, conn conn) error {
|
||||
if false {
|
||||
var blob *sqlite.Blob
|
||||
blob, err = i.openBlob(conn, false, true)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer blob.Close()
|
||||
if off >= blob.Size() {
|
||||
err = io.EOF
|
||||
return err
|
||||
}
|
||||
if off+int64(len(p)) > blob.Size() {
|
||||
p = p[:blob.Size()-off]
|
||||
}
|
||||
n, err = blob.ReadAt(p, off)
|
||||
} else {
|
||||
gotRow := false
|
||||
err = sqlitex.Exec(
|
||||
conn,
|
||||
"select substr(data, ?, ?) from blob where name=?",
|
||||
func(stmt *sqlite.Stmt) error {
|
||||
if gotRow {
|
||||
panic("found multiple matching blobs")
|
||||
} else {
|
||||
gotRow = true
|
||||
}
|
||||
n = stmt.ColumnBytes(0, p)
|
||||
return nil
|
||||
},
|
||||
off+1, len(p), i.location,
|
||||
)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if !gotRow {
|
||||
err = errors.New("blob not found")
|
||||
return err
|
||||
}
|
||||
if n < len(p) {
|
||||
err = io.EOF
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}, false)
|
||||
return
|
||||
}
|
||||
|
||||
func (i instance) WriteAt(bytes []byte, i2 int64) (int, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (i instance) Delete() error {
|
||||
return i.withConn(func(_ context.Context, conn conn) error {
|
||||
return sqlitex.Exec(conn, "delete from blob where name=?", nil, i.location)
|
||||
}, true)
|
||||
}
|
||||
|
|
|
@ -4,82 +4,19 @@
|
|||
package sqliteStorage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"path/filepath"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
_ "github.com/anacrolix/envpprof"
|
||||
"github.com/dustin/go-humanize"
|
||||
qt "github.com/frankban/quicktest"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/anacrolix/torrent/storage"
|
||||
test_storage "github.com/anacrolix/torrent/storage/test"
|
||||
"github.com/dustin/go-humanize"
|
||||
qt "github.com/frankban/quicktest"
|
||||
)
|
||||
|
||||
func newConnsAndProv(t *testing.T, opts NewPoolOpts) (ConnPool, *provider) {
|
||||
opts.Path = filepath.Join(t.TempDir(), "sqlite3.db")
|
||||
pool, err := NewPool(opts)
|
||||
qt.Assert(t, err, qt.IsNil)
|
||||
// sqlitex.Pool.Close doesn't like being called more than once. Let it slide for now.
|
||||
//t.Cleanup(func() { pool.Close() })
|
||||
qt.Assert(t, initPoolDatabase(pool, InitDbOpts{}), qt.IsNil)
|
||||
if !opts.Memory && opts.SetJournalMode == "" {
|
||||
opts.SetJournalMode = "wal"
|
||||
}
|
||||
qt.Assert(t, initPoolConns(context.TODO(), pool, opts.InitConnOpts), qt.IsNil)
|
||||
prov, err := NewProvider(pool, ProviderOpts{BatchWrites: pool.NumConns() > 1})
|
||||
require.NoError(t, err)
|
||||
t.Cleanup(func() { prov.Close() })
|
||||
return pool, prov
|
||||
}
|
||||
|
||||
func TestTextBlobSize(t *testing.T) {
|
||||
_, prov := newConnsAndProv(t, NewPoolOpts{})
|
||||
a, _ := prov.NewInstance("a")
|
||||
err := a.Put(bytes.NewBufferString("\x00hello"))
|
||||
qt.Assert(t, err, qt.IsNil)
|
||||
fi, err := a.Stat()
|
||||
qt.Assert(t, err, qt.IsNil)
|
||||
assert.EqualValues(t, 6, fi.Size())
|
||||
}
|
||||
|
||||
func TestSimultaneousIncrementalBlob(t *testing.T) {
|
||||
_, p := newConnsAndProv(t, NewPoolOpts{
|
||||
NumConns: 3,
|
||||
})
|
||||
a, err := p.NewInstance("a")
|
||||
require.NoError(t, err)
|
||||
const contents = "hello, world"
|
||||
require.NoError(t, a.Put(bytes.NewReader([]byte("hello, world"))))
|
||||
rc1, err := a.Get()
|
||||
require.NoError(t, err)
|
||||
rc2, err := a.Get()
|
||||
require.NoError(t, err)
|
||||
var b1, b2 []byte
|
||||
var e1, e2 error
|
||||
var wg sync.WaitGroup
|
||||
doRead := func(b *[]byte, e *error, rc io.ReadCloser, n int) {
|
||||
defer wg.Done()
|
||||
defer rc.Close()
|
||||
*b, *e = ioutil.ReadAll(rc)
|
||||
require.NoError(t, *e, n)
|
||||
assert.EqualValues(t, contents, *b)
|
||||
}
|
||||
wg.Add(2)
|
||||
go doRead(&b2, &e2, rc2, 2)
|
||||
go doRead(&b1, &e1, rc1, 1)
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
func BenchmarkMarkComplete(b *testing.B) {
|
||||
const pieceSize = test_storage.DefaultPieceSize
|
||||
const noTriggers = false
|
||||
|
|
Loading…
Reference in New Issue