Merge pull request #20 from balboah/master

Cassandra driver tweaks
This commit is contained in:
Matthias Kadenbach 2014-11-17 12:42:36 -08:00
commit 150ce9b524
3 changed files with 68 additions and 27 deletions

View File

@ -10,4 +10,5 @@ migrate help # for more info
## Authors
* Paul Bergeron, https://github.com/dinedal
* Paul Bergeron, https://github.com/dinedal
* Johnny Bergström, https://github.com/balboah

View File

@ -3,6 +3,7 @@ package cassandra
import (
"net/url"
"strings"
"time"
"github.com/gocql/gocql"
@ -14,8 +15,27 @@ type Driver struct {
session *gocql.Session
}
const tableName = "schema_migrations"
const versionRow = 1
const (
tableName = "schema_migrations"
versionRow = 1
)
type counterStmt bool
func (c counterStmt) String() string {
sign := ""
if bool(c) {
sign = "+"
} else {
sign = "-"
}
return "UPDATE " + tableName + " SET version = version " + sign + " 1 where versionRow = ?"
}
const (
up counterStmt = true
down counterStmt = false
)
// Cassandra Driver URL format:
// cassandra://host:port/keyspace
@ -55,7 +75,7 @@ func (driver *Driver) ensureVersionTableExists() error {
_, err = driver.Version()
if err != nil {
driver.session.Query("UPDATE "+tableName+" SET version = version + 1 where versionRow = ?", versionRow).Exec()
driver.session.Query(up.String(), versionRow).Exec()
}
return nil
@ -65,35 +85,52 @@ func (driver *Driver) FilenameExtension() string {
return "cql"
}
func (driver *Driver) version(d direction.Direction, invert bool) error {
var stmt counterStmt
switch d {
case direction.Up:
stmt = up
case direction.Down:
stmt = down
}
if invert {
stmt = !stmt
}
return driver.session.Query(stmt.String(), versionRow).Exec()
}
func (driver *Driver) Migrate(f file.File, pipe chan interface{}) {
defer close(pipe)
var err error
defer func() {
if err != nil {
// Invert version direction if we couldn't apply the changes for some reason.
if err := driver.version(f.Direction, true); err != nil {
pipe <- err
}
pipe <- err
}
close(pipe)
}()
pipe <- f
if f.Direction == direction.Up {
err := driver.session.Query("UPDATE "+tableName+" SET version = version + 1 where versionRow = ?", versionRow).Exec()
if err != nil {
pipe <- err
return
}
} else if f.Direction == direction.Down {
err := driver.session.Query("UPDATE "+tableName+" SET version = version - 1 where versionRow = ?", versionRow).Exec()
if err != nil {
pipe <- err
return
}
}
if err := f.ReadContent(); err != nil {
pipe <- err
if err = driver.version(f.Direction, false); err != nil {
return
}
err := driver.session.Query(string(f.Content)).Exec()
if err != nil {
pipe <- err
if err = f.ReadContent(); err != nil {
return
}
for _, query := range strings.Split(string(f.Content), ";") {
query = strings.TrimSpace(query)
if len(query) == 0 {
continue
}
if err = driver.session.Query(query).Exec(); err != nil {
return
}
}
}
func (driver *Driver) Version() (uint64, error) {

View File

@ -53,8 +53,11 @@ func TestMigrate(t *testing.T) {
Direction: direction.Up,
Content: []byte(`
CREATE TABLE yolo (
id varint primary key
id varint primary key,
msg text
);
CREATE INDEX ON yolo (msg);
`),
},
{