2014-08-27 01:46:41 +00:00
|
|
|
// Package cassandra implements the Driver interface.
|
|
|
|
package cassandra
|
|
|
|
|
|
|
|
import (
|
2015-01-27 22:40:46 +00:00
|
|
|
"fmt"
|
|
|
|
"net/url"
|
2016-03-19 21:36:47 +00:00
|
|
|
"strconv"
|
2015-01-27 22:40:46 +00:00
|
|
|
"strings"
|
|
|
|
"time"
|
2015-10-22 20:29:26 +00:00
|
|
|
|
|
|
|
"github.com/gocql/gocql"
|
|
|
|
"github.com/mattes/migrate/driver"
|
|
|
|
"github.com/mattes/migrate/file"
|
|
|
|
"github.com/mattes/migrate/migrate/direction"
|
2014-08-27 01:46:41 +00:00
|
|
|
)
|
|
|
|
|
|
|
|
type Driver struct {
|
|
|
|
session *gocql.Session
|
|
|
|
}
|
|
|
|
|
2014-11-17 10:42:19 +00:00
|
|
|
const (
	// tableName is the table that stores the migration version counter.
	tableName = "schema_migrations"

	// versionRow is the primary-key value of the single row that holds the
	// counter.
	versionRow = 1
)
|
|
|
|
|
|
|
|
// counterStmt selects how the stored version counter is adjusted:
// true increments it, false decrements it.
type counterStmt bool
|
|
|
|
|
2016-12-19 03:25:52 +00:00
|
|
|
func (c counterStmt) Exec(session *gocql.Session) error {
|
|
|
|
var version int64
|
|
|
|
if err := session.Query("SELECT version FROM "+tableName+" WHERE versionRow = ?", versionRow).Scan(&version); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2014-11-17 10:42:19 +00:00
|
|
|
if bool(c) {
|
2016-12-19 03:25:52 +00:00
|
|
|
version++
|
2014-11-17 10:42:19 +00:00
|
|
|
} else {
|
2016-12-19 03:25:52 +00:00
|
|
|
version--
|
2014-11-17 10:42:19 +00:00
|
|
|
}
|
2016-12-19 03:25:52 +00:00
|
|
|
|
|
|
|
return session.Query("UPDATE "+tableName+" SET version = ? WHERE versionRow = ?", version, versionRow).Exec()
|
2014-11-17 10:42:19 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
const (
|
|
|
|
up counterStmt = true
|
|
|
|
down counterStmt = false
|
|
|
|
)
|
2014-08-27 01:46:41 +00:00
|
|
|
|
|
|
|
// Cassandra Driver URL format:
|
2016-07-23 18:47:11 +00:00
|
|
|
// cassandra://host:port/keyspace?protocol=version&consistency=level
|
2014-08-27 01:46:41 +00:00
|
|
|
//
|
2016-07-23 18:47:11 +00:00
|
|
|
// Examples:
|
2016-03-19 21:36:47 +00:00
|
|
|
// cassandra://localhost/SpaceOfKeys?protocol=4
|
2016-07-23 18:47:11 +00:00
|
|
|
// cassandra://localhost/SpaceOfKeys?protocol=4&consistency=all
|
|
|
|
// cassandra://localhost/SpaceOfKeys?consistency=quorum
|
2014-08-27 01:46:41 +00:00
|
|
|
func (driver *Driver) Initialize(rawurl string) error {
|
|
|
|
u, err := url.Parse(rawurl)
|
2016-06-12 21:59:23 +00:00
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("failed to parse connectil url: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
if u.Path == "" {
|
|
|
|
return fmt.Errorf("no keyspace provided in connection url")
|
|
|
|
}
|
2014-08-27 01:46:41 +00:00
|
|
|
|
|
|
|
cluster := gocql.NewCluster(u.Host)
|
|
|
|
cluster.Keyspace = u.Path[1:len(u.Path)]
|
|
|
|
cluster.Consistency = gocql.All
|
|
|
|
cluster.Timeout = 1 * time.Minute
|
|
|
|
|
2016-07-23 18:47:11 +00:00
|
|
|
if len(u.Query().Get("consistency")) > 0 {
|
2016-07-25 16:02:50 +00:00
|
|
|
consistency, err := parseConsistency(u.Query().Get("consistency"))
|
2016-07-23 18:47:11 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
cluster.Consistency = consistency
|
|
|
|
}
|
|
|
|
|
2016-03-19 21:36:47 +00:00
|
|
|
if len(u.Query().Get("protocol")) > 0 {
|
|
|
|
protoversion, err := strconv.Atoi(u.Query().Get("protocol"))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
cluster.ProtoVersion = protoversion
|
|
|
|
}
|
|
|
|
|
2015-01-27 22:40:46 +00:00
|
|
|
// Check if url user struct is null
|
|
|
|
if u.User != nil {
|
|
|
|
password, passwordSet := u.User.Password()
|
|
|
|
|
|
|
|
if passwordSet == false {
|
2015-02-04 20:49:59 +00:00
|
|
|
return fmt.Errorf("Missing password. Please provide password.")
|
2015-01-27 22:40:46 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
cluster.Authenticator = gocql.PasswordAuthenticator{
|
|
|
|
Username: u.User.Username(),
|
|
|
|
Password: password,
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
2014-08-27 01:46:41 +00:00
|
|
|
driver.session, err = cluster.CreateSession()
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
if err := driver.ensureVersionTableExists(); err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (driver *Driver) Close() error {
|
|
|
|
driver.session.Close()
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (driver *Driver) ensureVersionTableExists() error {
|
2016-12-19 03:25:52 +00:00
|
|
|
err := driver.session.Query("CREATE TABLE IF NOT EXISTS " + tableName + " (version int, versionRow bigint primary key);").Exec()
|
2014-08-27 03:19:13 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2014-08-27 03:24:01 +00:00
|
|
|
_, err = driver.Version()
|
|
|
|
if err != nil {
|
2016-12-19 03:25:52 +00:00
|
|
|
if err.Error() == "not found" {
|
|
|
|
return driver.session.Query("UPDATE "+tableName+" SET version = ? WHERE versionRow = ?", 1, versionRow).Exec()
|
|
|
|
}
|
|
|
|
return err
|
2014-08-27 03:24:01 +00:00
|
|
|
}
|
2014-08-27 03:19:13 +00:00
|
|
|
|
2014-08-27 01:46:41 +00:00
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
func (driver *Driver) FilenameExtension() string {
|
|
|
|
return "cql"
|
|
|
|
}
|
|
|
|
|
2014-11-17 10:42:19 +00:00
|
|
|
func (driver *Driver) version(d direction.Direction, invert bool) error {
|
|
|
|
var stmt counterStmt
|
|
|
|
switch d {
|
|
|
|
case direction.Up:
|
|
|
|
stmt = up
|
|
|
|
case direction.Down:
|
|
|
|
stmt = down
|
|
|
|
}
|
|
|
|
if invert {
|
|
|
|
stmt = !stmt
|
|
|
|
}
|
2016-12-19 03:25:52 +00:00
|
|
|
return stmt.Exec(driver.session)
|
2014-11-17 10:42:19 +00:00
|
|
|
}
|
2014-08-27 01:46:41 +00:00
|
|
|
|
2014-11-17 10:42:19 +00:00
|
|
|
func (driver *Driver) Migrate(f file.File, pipe chan interface{}) {
|
|
|
|
var err error
|
|
|
|
defer func() {
|
2014-08-27 01:46:41 +00:00
|
|
|
if err != nil {
|
2014-11-17 10:42:19 +00:00
|
|
|
// Invert version direction if we couldn't apply the changes for some reason.
|
|
|
|
if err := driver.version(f.Direction, true); err != nil {
|
|
|
|
pipe <- err
|
|
|
|
}
|
2014-08-27 01:46:41 +00:00
|
|
|
pipe <- err
|
|
|
|
}
|
2014-11-17 10:42:19 +00:00
|
|
|
close(pipe)
|
|
|
|
}()
|
2014-08-27 01:46:41 +00:00
|
|
|
|
2014-11-17 10:42:19 +00:00
|
|
|
pipe <- f
|
|
|
|
if err = driver.version(f.Direction, false); err != nil {
|
2014-08-27 01:46:41 +00:00
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2014-11-17 10:42:19 +00:00
|
|
|
if err = f.ReadContent(); err != nil {
|
2014-08-27 01:46:41 +00:00
|
|
|
return
|
|
|
|
}
|
2014-11-17 10:42:19 +00:00
|
|
|
|
2014-11-17 10:54:21 +00:00
|
|
|
for _, query := range strings.Split(string(f.Content), ";") {
|
|
|
|
query = strings.TrimSpace(query)
|
|
|
|
if len(query) == 0 {
|
|
|
|
continue
|
|
|
|
}
|
|
|
|
|
|
|
|
if err = driver.session.Query(query).Exec(); err != nil {
|
|
|
|
return
|
|
|
|
}
|
|
|
|
}
|
2014-08-27 01:46:41 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
func (driver *Driver) Version() (uint64, error) {
|
|
|
|
var version int64
|
2014-08-27 02:43:05 +00:00
|
|
|
err := driver.session.Query("SELECT version FROM "+tableName+" WHERE versionRow = ?", versionRow).Scan(&version)
|
2014-08-27 03:19:13 +00:00
|
|
|
return uint64(version) - 1, err
|
2014-08-27 01:46:41 +00:00
|
|
|
}
|
2015-06-11 10:11:28 +00:00
|
|
|
|
|
|
|
func init() {
|
2015-10-22 20:29:26 +00:00
|
|
|
driver.RegisterDriver("cassandra", &Driver{})
|
2015-06-11 10:11:28 +00:00
|
|
|
}
|
2016-07-23 18:47:11 +00:00
|
|
|
|
2016-07-25 16:02:50 +00:00
|
|
|
// ParseConsistency wraps gocql.ParseConsistency to return an error
|
|
|
|
// instead of a panicing.
|
|
|
|
func parseConsistency(consistencyStr string) (consistency gocql.Consistency, err error) {
|
|
|
|
defer func() {
|
|
|
|
if r := recover(); r != nil {
|
|
|
|
var ok bool
|
|
|
|
err, ok = r.(error)
|
|
|
|
if !ok {
|
2016-07-25 17:25:45 +00:00
|
|
|
err = fmt.Errorf("Failed to parse consistency \"%s\": %v", consistencyStr, r)
|
2016-07-25 16:02:50 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
consistency = gocql.ParseConsistency(consistencyStr)
|
2016-07-23 18:47:11 +00:00
|
|
|
|
2016-07-25 16:02:50 +00:00
|
|
|
return consistency, nil
|
2016-07-23 18:47:11 +00:00
|
|
|
}
|