2017-06-21 12:58:46 +00:00
|
|
|
package clickhouse
|
|
|
|
|
|
|
|
import (
|
|
|
|
"database/sql"
|
|
|
|
"fmt"
|
|
|
|
"io"
|
|
|
|
"io/ioutil"
|
|
|
|
"net/url"
|
2018-08-03 06:08:47 +00:00
|
|
|
"strings"
|
2017-06-21 12:58:46 +00:00
|
|
|
"time"
|
|
|
|
|
2018-10-10 22:11:48 +00:00
|
|
|
"github.com/golang-migrate/migrate/v4"
|
|
|
|
"github.com/golang-migrate/migrate/v4/database"
|
2019-02-26 23:56:57 +00:00
|
|
|
"github.com/hashicorp/go-multierror"
|
2017-06-21 12:58:46 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
var DefaultMigrationsTable = "schema_migrations"
|
|
|
|
|
|
|
|
var ErrNilConfig = fmt.Errorf("no config")
|
|
|
|
|
|
|
|
type Config struct {
|
2018-08-03 06:08:47 +00:00
|
|
|
DatabaseName string
|
|
|
|
MigrationsTable string
|
|
|
|
MultiStatementEnabled bool
|
2017-06-21 12:58:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func init() {
|
|
|
|
database.Register("clickhouse", &ClickHouse{})
|
|
|
|
}
|
2017-06-21 14:24:55 +00:00
|
|
|
|
2017-06-21 12:58:46 +00:00
|
|
|
func WithInstance(conn *sql.DB, config *Config) (database.Driver, error) {
|
|
|
|
if config == nil {
|
|
|
|
return nil, ErrNilConfig
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := conn.Ping(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
ch := &ClickHouse{
|
|
|
|
conn: conn,
|
|
|
|
config: config,
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := ch.init(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return ch, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
type ClickHouse struct {
|
|
|
|
conn *sql.DB
|
|
|
|
config *Config
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ch *ClickHouse) Open(dsn string) (database.Driver, error) {
|
|
|
|
purl, err := url.Parse(dsn)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
q := migrate.FilterCustomQuery(purl)
|
|
|
|
q.Scheme = "tcp"
|
|
|
|
conn, err := sql.Open("clickhouse", q.String())
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
ch = &ClickHouse{
|
|
|
|
conn: conn,
|
|
|
|
config: &Config{
|
2018-08-03 06:08:47 +00:00
|
|
|
MigrationsTable: purl.Query().Get("x-migrations-table"),
|
|
|
|
DatabaseName: purl.Query().Get("database"),
|
|
|
|
MultiStatementEnabled: purl.Query().Get("x-multi-statement") == "true",
|
2017-06-21 12:58:46 +00:00
|
|
|
},
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := ch.init(); err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
|
|
|
|
|
|
|
return ch, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ch *ClickHouse) init() error {
|
|
|
|
if len(ch.config.DatabaseName) == 0 {
|
2017-06-21 20:10:43 +00:00
|
|
|
if err := ch.conn.QueryRow("SELECT currentDatabase()").Scan(&ch.config.DatabaseName); err != nil {
|
2017-06-21 12:58:46 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2017-06-21 14:14:43 +00:00
|
|
|
|
2017-06-21 12:58:46 +00:00
|
|
|
if len(ch.config.MigrationsTable) == 0 {
|
|
|
|
ch.config.MigrationsTable = DefaultMigrationsTable
|
|
|
|
}
|
2017-06-21 14:14:43 +00:00
|
|
|
|
|
|
|
return ch.ensureVersionTable()
|
2017-06-21 12:58:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (ch *ClickHouse) Run(r io.Reader) error {
|
|
|
|
migration, err := ioutil.ReadAll(r)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2018-08-03 06:08:47 +00:00
|
|
|
|
|
|
|
if ch.config.MultiStatementEnabled {
|
|
|
|
// split query by semi-colon
|
|
|
|
queries := strings.Split(string(migration), ";")
|
|
|
|
for _, q := range queries {
|
|
|
|
tq := strings.TrimSpace(q)
|
|
|
|
if tq == "" {
|
|
|
|
continue
|
|
|
|
}
|
2019-03-22 21:40:35 +00:00
|
|
|
if _, err := ch.conn.Exec(q); err != nil {
|
2018-08-03 06:08:47 +00:00
|
|
|
return database.Error{OrigErr: err, Err: "migration failed", Query: []byte(q)}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2017-06-21 12:58:46 +00:00
|
|
|
if _, err := ch.conn.Exec(string(migration)); err != nil {
|
|
|
|
return database.Error{OrigErr: err, Err: "migration failed", Query: migration}
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
func (ch *ClickHouse) Version() (int, bool, error) {
|
|
|
|
var (
|
|
|
|
version int
|
|
|
|
dirty uint8
|
|
|
|
query = "SELECT version, dirty FROM `" + ch.config.MigrationsTable + "` ORDER BY sequence DESC LIMIT 1"
|
|
|
|
)
|
|
|
|
if err := ch.conn.QueryRow(query).Scan(&version, &dirty); err != nil {
|
|
|
|
if err == sql.ErrNoRows {
|
|
|
|
return database.NilVersion, false, nil
|
|
|
|
}
|
|
|
|
return 0, false, &database.Error{OrigErr: err, Query: []byte(query)}
|
|
|
|
}
|
|
|
|
return version, dirty == 1, nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (ch *ClickHouse) SetVersion(version int, dirty bool) error {
|
|
|
|
var (
|
|
|
|
bool = func(v bool) uint8 {
|
|
|
|
if v {
|
|
|
|
return 1
|
|
|
|
}
|
|
|
|
return 0
|
|
|
|
}
|
|
|
|
tx, err = ch.conn.Begin()
|
|
|
|
)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-06-21 14:12:05 +00:00
|
|
|
|
2017-06-21 12:58:46 +00:00
|
|
|
query := "INSERT INTO " + ch.config.MigrationsTable + " (version, dirty, sequence) VALUES (?, ?, ?)"
|
|
|
|
if _, err := tx.Exec(query, version, bool(dirty), time.Now().UnixNano()); err != nil {
|
|
|
|
return &database.Error{OrigErr: err, Query: []byte(query)}
|
|
|
|
}
|
2017-06-21 14:12:05 +00:00
|
|
|
|
2017-06-21 12:58:46 +00:00
|
|
|
return tx.Commit()
|
|
|
|
}
|
|
|
|
|
2019-02-26 23:56:57 +00:00
|
|
|
// ensureVersionTable checks if versions table exists and, if not, creates it.
|
|
|
|
// Note that this function locks the database, which deviates from the usual
|
|
|
|
// convention of "caller locks" in the ClickHouse type.
|
|
|
|
func (ch *ClickHouse) ensureVersionTable() (err error) {
|
|
|
|
if err = ch.Lock(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
defer func() {
|
|
|
|
if e := ch.Unlock(); e != nil {
|
|
|
|
if err == nil {
|
|
|
|
err = e
|
|
|
|
} else {
|
|
|
|
err = multierror.Append(err, e)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
2017-06-21 12:58:46 +00:00
|
|
|
var (
|
|
|
|
table string
|
|
|
|
query = "SHOW TABLES FROM " + ch.config.DatabaseName + " LIKE '" + ch.config.MigrationsTable + "'"
|
|
|
|
)
|
|
|
|
// check if migration table exists
|
|
|
|
if err := ch.conn.QueryRow(query).Scan(&table); err != nil {
|
|
|
|
if err != sql.ErrNoRows {
|
|
|
|
return &database.Error{OrigErr: err, Query: []byte(query)}
|
|
|
|
}
|
|
|
|
} else {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
// if not, create the empty migration table
|
|
|
|
query = `
|
|
|
|
CREATE TABLE ` + ch.config.MigrationsTable + ` (
|
2018-12-28 12:53:55 +00:00
|
|
|
version Int64,
|
2017-06-21 12:58:46 +00:00
|
|
|
dirty UInt8,
|
|
|
|
sequence UInt64
|
|
|
|
) Engine=TinyLog
|
|
|
|
`
|
|
|
|
if _, err := ch.conn.Exec(query); err != nil {
|
|
|
|
return &database.Error{OrigErr: err, Query: []byte(query)}
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2019-04-26 22:47:16 +00:00
|
|
|
func (ch *ClickHouse) Drop() (err error) {
|
|
|
|
query := "SHOW TABLES FROM " + ch.config.DatabaseName
|
|
|
|
tables, err := ch.conn.Query(query)
|
|
|
|
|
2017-06-21 12:58:46 +00:00
|
|
|
if err != nil {
|
|
|
|
return &database.Error{OrigErr: err, Query: []byte(query)}
|
|
|
|
}
|
2019-04-26 22:47:16 +00:00
|
|
|
defer func() {
|
|
|
|
if errClose := tables.Close(); errClose != nil {
|
|
|
|
err = multierror.Append(err, errClose)
|
|
|
|
}
|
|
|
|
}()
|
2017-06-21 12:58:46 +00:00
|
|
|
for tables.Next() {
|
|
|
|
var table string
|
|
|
|
if err := tables.Scan(&table); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2017-06-21 14:12:05 +00:00
|
|
|
|
2017-06-21 12:58:46 +00:00
|
|
|
query = "DROP TABLE IF EXISTS " + ch.config.DatabaseName + "." + table
|
2017-06-21 14:12:05 +00:00
|
|
|
|
2017-06-21 12:58:46 +00:00
|
|
|
if _, err := ch.conn.Exec(query); err != nil {
|
|
|
|
return &database.Error{OrigErr: err, Query: []byte(query)}
|
|
|
|
}
|
|
|
|
}
|
2019-02-26 23:56:57 +00:00
|
|
|
return nil
|
2017-06-21 12:58:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (ch *ClickHouse) Lock() error { return nil }
|
|
|
|
func (ch *ClickHouse) Unlock() error { return nil }
|
|
|
|
func (ch *ClickHouse) Close() error { return ch.conn.Close() }
|