From dac7100cf320860de13e8a897b66cb5dbc1735b3 Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Tue, 26 Aug 2014 18:46:41 -0700 Subject: [PATCH 1/7] Add support for Cassandra to migration drivers --- driver/cassandra/cassandra.go | 96 +++++++++++++++++++++++++ driver/cassandra/cassandra_test.go | 109 +++++++++++++++++++++++++++++ driver/driver.go | 11 ++- migrate/migrate.go | 9 +-- 4 files changed, 220 insertions(+), 5 deletions(-) create mode 100644 driver/cassandra/cassandra.go create mode 100644 driver/cassandra/cassandra_test.go diff --git a/driver/cassandra/cassandra.go b/driver/cassandra/cassandra.go new file mode 100644 index 0000000..591ca6f --- /dev/null +++ b/driver/cassandra/cassandra.go @@ -0,0 +1,96 @@ +// Package cassandra implements the Driver interface. +package cassandra + +import ( + "net/url" + "time" + + "github.com/gocql/gocql" + "github.com/mattes/migrate/file" + "github.com/mattes/migrate/migrate/direction" +) + +type Driver struct { + session *gocql.Session +} + +const tableName = "schema_migrations" + +// Cassandra Driver URL format: +// cassandra://host:port/keyspace +// +// Example: +// cassandra://localhost/SpaceOfKeys +func (driver *Driver) Initialize(rawurl string) error { + u, err := url.Parse(rawurl) + + cluster := gocql.NewCluster(u.Host) + cluster.Keyspace = u.Path[1:len(u.Path)] + cluster.Consistency = gocql.All + cluster.Timeout = 1 * time.Minute + + driver.session, err = cluster.CreateSession() + + if err != nil { + return err + } + + if err := driver.ensureVersionTableExists(); err != nil { + return err + } + return nil +} + +func (driver *Driver) Close() error { + driver.session.Close() + return nil +} + +func (driver *Driver) ensureVersionTableExists() error { + err := driver.session.Query("CREATE TABLE IF NOT EXISTS " + tableName + " (version bigint primary key);").Exec() + if err != nil { + return err + } + return nil +} + +func (driver *Driver) FilenameExtension() string { + return "cql" +} + +func (driver *Driver) Migrate(f file.File, pipe chan interface{}) { + defer close(pipe) + pipe <- f + + if f.Direction == direction.Up { + err := driver.session.Query("INSERT INTO "+tableName+" (version) VALUES (?)", f.Version).Exec() + if err != nil { + pipe <- err + return + } + } else if f.Direction == direction.Down { + err := driver.session.Query("DELETE FROM "+tableName+" WHERE version = ?", f.Version).Exec() + if err != nil { + pipe <- err + return + } + } + + if err := f.ReadContent(); err != nil { + pipe <- err + return + } + + err := driver.session.Query(string(f.Content)).Exec() + + if err != nil { + pipe <- err + return + } +} + +func (driver *Driver) Version() (uint64, error) { + var version int64 + err := driver.session.Query("SELECT version FROM " + tableName + " ORDER BY version DESC LIMIT 1").Scan(&version) + return uint64(version), err +} diff --git a/driver/cassandra/cassandra_test.go b/driver/cassandra/cassandra_test.go new file mode 100644 index 0000000..4fee7cb --- /dev/null +++ b/driver/cassandra/cassandra_test.go @@ -0,0 +1,109 @@ +package cassandra + +import ( + "net/url" + "testing" + "time" + + "github.com/gocql/gocql" + "github.com/mattes/migrate/file" + "github.com/mattes/migrate/migrate/direction" + pipep "github.com/mattes/migrate/pipe" +) + +func TestMigrate(t *testing.T) { + var session *gocql.Session + driverUrl := "cassandra://localhost/migratetest" + + // prepare a clean test database + u, err := url.Parse(driverUrl) + if err != nil { + t.Fatal(err) + } + + cluster := gocql.NewCluster(u.Host) + cluster.Keyspace = u.Path[1:len(u.Path)] + cluster.Consistency = gocql.All + cluster.Timeout = 1 * time.Minute + + session, err = cluster.CreateSession() + + if err != nil { + t.Fatal(err) + } + + if err := session.Query(`DROP TABLE IF EXISTS yolo`).Exec(); err != nil { + t.Fatal(err) + } + if err := session.Query(`DROP TABLE IF EXISTS ` + tableName).Exec(); err != nil { + t.Fatal(err) + } + + d := &Driver{} + if err := d.Initialize(driverUrl); err != nil { + t.Fatal(err) + } + + files := []file.File{ + { + Path: "/foobar", + FileName: "001_foobar.up.sql", + Version: 1, + Name: "foobar", + Direction: direction.Up, + Content: []byte(` + CREATE TABLE yolo ( + id varint primary key + ); + `), + }, + { + Path: "/foobar", + FileName: "002_foobar.down.sql", + Version: 1, + Name: "foobar", + Direction: direction.Down, + Content: []byte(` + DROP TABLE yolo; + `), + }, + { + Path: "/foobar", + FileName: "002_foobar.up.sql", + Version: 2, + Name: "foobar", + Direction: direction.Up, + Content: []byte(` + CREATE TABLE error ( + id THIS WILL CAUSE AN ERROR + ) + `), + }, + } + + pipe := pipep.New() + go d.Migrate(files[0], pipe) + errs := pipep.ReadErrors(pipe) + if len(errs) > 0 { + t.Fatal(errs) + } + + pipe = pipep.New() + go d.Migrate(files[1], pipe) + errs = pipep.ReadErrors(pipe) + if len(errs) > 0 { + t.Fatal(errs) + } + + pipe = pipep.New() + go d.Migrate(files[2], pipe) + errs = pipep.ReadErrors(pipe) + if len(errs) == 0 { + t.Error("Expected test case to fail") + } + + if err := d.Close(); err != nil { + t.Fatal(err) + } + +} diff --git a/driver/driver.go b/driver/driver.go index 179375b..af696e4 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -4,10 +4,12 @@ package driver import ( "errors" "fmt" + neturl "net/url" // alias to allow `url string` func signature in New + + "github.com/dinedal/migrate/driver/cassandra" "github.com/mattes/migrate/driver/bash" "github.com/mattes/migrate/driver/postgres" "github.com/mattes/migrate/file" - neturl "net/url" // alias to allow `url string` func signature in New ) // Driver is the interface type that needs to implemented by all drivers. @@ -58,6 +60,13 @@ func New(url string) (Driver, error) { return nil, err } return d, nil + case "cassandra": + d := &cassandra.Driver{} + verifyFilenameExtension("cassanda", d) + if err := d.Initialize(url); err != nil { + return nil, err + } + return d, nil default: return nil, errors.New(fmt.Sprintf("Driver '%s' not found.", u.Scheme)) } diff --git a/migrate/migrate.go b/migrate/migrate.go index 8609089..401ab0f 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -4,16 +4,17 @@ package migrate import ( "fmt" - "github.com/mattes/migrate/driver" - "github.com/mattes/migrate/file" - "github.com/mattes/migrate/migrate/direction" - pipep "github.com/mattes/migrate/pipe" "io/ioutil" "os" "os/signal" "path" "strconv" "strings" + + "github.com/dinedal/migrate/driver" + "github.com/mattes/migrate/file" + "github.com/mattes/migrate/migrate/direction" + pipep "github.com/mattes/migrate/pipe" ) // Up applies all available migrations From ca8cd2c56b26e0a86c2584e18c855f9a81950cd6 Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Tue, 26 Aug 2014 18:48:46 -0700 Subject: [PATCH 2/7] Fix import paths to reflect upstream --- driver/driver.go | 2 +- migrate/migrate.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/driver/driver.go b/driver/driver.go index af696e4..517daa4 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -6,8 +6,8 @@ import ( "fmt" neturl "net/url" // alias to allow `url string` func signature in New - "github.com/dinedal/migrate/driver/cassandra" "github.com/mattes/migrate/driver/bash" + "github.com/mattes/migrate/driver/cassandra" "github.com/mattes/migrate/driver/postgres" "github.com/mattes/migrate/file" ) diff --git a/migrate/migrate.go b/migrate/migrate.go index 401ab0f..fb39cd6 100644 --- a/migrate/migrate.go +++ b/migrate/migrate.go @@ -11,7 +11,7 @@ import ( "strconv" "strings" - "github.com/dinedal/migrate/driver" + "github.com/mattes/migrate/driver" "github.com/mattes/migrate/file" "github.com/mattes/migrate/migrate/direction" pipep "github.com/mattes/migrate/pipe" From c7e6d5436534ac56292eb71a10b391ab46a58b76 Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Tue, 26 Aug 2014 19:43:05 -0700 Subject: [PATCH 3/7] Use counter for migration version information instead of rows --- driver/cassandra/cassandra.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/driver/cassandra/cassandra.go b/driver/cassandra/cassandra.go index 591ca6f..4415daf 100644 --- a/driver/cassandra/cassandra.go +++ b/driver/cassandra/cassandra.go @@ -15,6 +15,7 @@ type Driver struct { } const tableName = "schema_migrations" +const versionRow = 1 // Cassandra Driver URL format: // cassandra://host:port/keyspace @@ -47,7 +48,7 @@ func (driver *Driver) Close() error { } func (driver *Driver) ensureVersionTableExists() error { - err := driver.session.Query("CREATE TABLE IF NOT EXISTS " + tableName + " (version bigint primary key);").Exec() + err := driver.session.Query("CREATE TABLE IF NOT EXISTS " + tableName + " (version counter, versionRow bigint primary key);").Exec() if err != nil { return err } @@ -63,13 +64,13 @@ func (driver *Driver) Migrate(f file.File, pipe chan interface{}) { pipe <- f if f.Direction == direction.Up { - err := driver.session.Query("INSERT INTO "+tableName+" (version) VALUES (?)", f.Version).Exec() + err := driver.session.Query("UPDATE "+tableName+" SET version = version + 1 where versionRow = ?", versionRow).Exec() if err != nil { pipe <- err return } } else if f.Direction == direction.Down { - err := driver.session.Query("DELETE FROM "+tableName+" WHERE version = ?", f.Version).Exec() + err := driver.session.Query("UPDATE "+tableName+" SET version = version - 1 where versionRow = ?", versionRow).Exec() if err != nil { pipe <- err return @@ -91,6 +92,6 @@ func (driver *Driver) Migrate(f file.File, pipe chan interface{}) { func (driver *Driver) Version() (uint64, error) { var version int64 - err := driver.session.Query("SELECT version FROM " + tableName + " ORDER BY version DESC LIMIT 1").Scan(&version) + err := driver.session.Query("SELECT version FROM "+tableName+" WHERE versionRow = ?", versionRow).Scan(&version) return uint64(version), err } From 28f6567fa07ed10b9f3326871668282176af940f Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Tue, 26 Aug 2014 19:45:56 -0700 Subject: [PATCH 4/7] Add cassandra to travis --- .travis.yml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.travis.yml b/.travis.yml index 8763f0f..ef71221 100644 --- a/.travis.yml +++ b/.travis.yml @@ -8,7 +8,12 @@ go: addons: postgresql: "9.3" +services: + - cassandra + before_script: + - > + /usr/local/cassandra/bin/cqlsh -e "CREATE KEYSPACE migratetest WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor' : 1};" - psql -c 'create database migratetest;' -U postgres script: go test -p 1 ./... \ No newline at end of file From a191c8bf362e8784a7146ba65b79fa0340d34a8f Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Tue, 26 Aug 2014 20:19:13 -0700 Subject: [PATCH 5/7] Fix issue where version wouldn't correctly populate --- driver/cassandra/cassandra.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/driver/cassandra/cassandra.go b/driver/cassandra/cassandra.go index 4415daf..a6d1905 100644 --- a/driver/cassandra/cassandra.go +++ b/driver/cassandra/cassandra.go @@ -49,6 +49,12 @@ func (driver *Driver) Close() error { func (driver *Driver) ensureVersionTableExists() error { err := driver.session.Query("CREATE TABLE IF NOT EXISTS " + tableName + " (version counter, versionRow bigint primary key);").Exec() + if err != nil { + return err + } + + driver.session.Query("UPDATE "+tableName+" SET version = version + 1 where versionRow = ?", versionRow).Exec() + if err != nil { return err } @@ -93,5 +99,5 @@ func (driver *Driver) Migrate(f file.File, pipe chan interface{}) { func (driver *Driver) Version() (uint64, error) { var version int64 err := driver.session.Query("SELECT version FROM "+tableName+" WHERE versionRow = ?", versionRow).Scan(&version) - return uint64(version), err + return uint64(version) - 1, err } From 988db1b4defe9e2039ee5120ade9458d40c4fc86 Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Tue, 26 Aug 2014 20:24:01 -0700 Subject: [PATCH 6/7] Only set inital version if there is an error on getting it the first time --- driver/cassandra/cassandra.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/driver/cassandra/cassandra.go b/driver/cassandra/cassandra.go index a6d1905..26099ed 100644 --- a/driver/cassandra/cassandra.go +++ b/driver/cassandra/cassandra.go @@ -53,7 +53,10 @@ func (driver *Driver) ensureVersionTableExists() error { return err } - driver.session.Query("UPDATE "+tableName+" SET version = version + 1 where versionRow = ?", versionRow).Exec() + _, err = driver.Version() + if err != nil { + driver.session.Query("UPDATE "+tableName+" SET version = version + 1 where versionRow = ?", versionRow).Exec() + } if err != nil { return err From cad776681eac00d3d85bad349d74b8b548a35542 Mon Sep 17 00:00:00 2001 From: Paul Bergeron Date: Tue, 26 Aug 2014 20:26:10 -0700 Subject: [PATCH 7/7] Remove extraneous error handle --- driver/cassandra/cassandra.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/driver/cassandra/cassandra.go b/driver/cassandra/cassandra.go index 26099ed..e8e7035 100644 --- a/driver/cassandra/cassandra.go +++ b/driver/cassandra/cassandra.go @@ -58,9 +58,6 @@ func (driver *Driver) ensureVersionTableExists() error { driver.session.Query("UPDATE "+tableName+" SET version = version + 1 where versionRow = ?", versionRow).Exec() } - if err != nil { - return err - } return nil }