From 89879968bb69e4dd128ac2df0116f59e28e873be Mon Sep 17 00:00:00 2001 From: Jolan Malassigne Date: Wed, 24 May 2017 09:59:18 +0200 Subject: [PATCH] add cassandra driver and function to retrieve networkSettings to get port bound to 9042 --- Makefile | 8 +- README.md | 4 +- cli/build_cassandra.go | 7 + database/cassandra/README.md | 29 ++++ database/cassandra/cassandra.go | 223 +++++++++++++++++++++++++++ database/cassandra/cassandra_test.go | 53 +++++++ database/driver.go | 4 +- testing/docker.go | 8 +- testing/testing.go | 7 +- 9 files changed, 332 insertions(+), 11 deletions(-) create mode 100644 cli/build_cassandra.go create mode 100644 database/cassandra/cassandra.go create mode 100644 database/cassandra/cassandra_test.go diff --git a/Makefile b/Makefile index 416a2d2..5e3fa6d 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ SOURCE ?= file go-bindata github aws-s3 google-cloud-storage -DATABASE ?= postgres mysql redshift sqlite3 spanner +DATABASE ?= postgres mysql redshift cassandra sqlite3 spanner VERSION ?= $(shell git describe --tags 2>/dev/null | cut -c 2-) TEST_FLAGS ?= REPO_OWNER ?= $(shell cd .. && basename "$$(pwd)") @@ -7,9 +7,9 @@ REPO_OWNER ?= $(shell cd .. && basename "$$(pwd)") build-cli: clean -mkdir ./cli/build - cd ./cli && GOOS=linux GOARCH=amd64 go build -a -o build/migrate.linux-amd64 -ldflags='-X main.Version=$(VERSION)' -tags '$(DATABASE) $(SOURCE)' . - cd ./cli && GOOS=darwin GOARCH=amd64 go build -a -o build/migrate.darwin-amd64 -ldflags='-X main.Version=$(VERSION)' -tags '$(DATABASE) $(SOURCE)' . - cd ./cli && GOOS=windows GOARCH=amd64 go build -a -o build/migrate.windows-amd64.exe -ldflags='-X main.Version=$(VERSION)' -tags '$(DATABASE) $(SOURCE)' . + cd ./cli && GOOS=linux GOARCH=amd64 go build -a -o build/migrate.linux-amd64 -ldflags='-X main.Version=$(VERSION)' -tags '$(DATABASE) $(SOURCE)' . + cd ./cli && GOOS=darwin GOARCH=amd64 go build -a -o build/migrate.darwin-amd64 -ldflags='-X main.Version=$(VERSION)' -tags '$(DATABASE) $(SOURCE)' . + cd ./cli && GOOS=windows GOARCH=amd64 go build -a -o build/migrate.windows-amd64.exe -ldflags='-X main.Version=$(VERSION)' -tags '$(DATABASE) $(SOURCE)' . cd ./cli/build && find . -name 'migrate*' | xargs -I{} tar czf {}.tar.gz {} cd ./cli/build && shasum -a 256 * > sha256sum.txt cat ./cli/build/sha256sum.txt diff --git a/README.md b/README.md index 89ea696..599c3d5 100644 --- a/README.md +++ b/README.md @@ -24,8 +24,8 @@ Database drivers run migrations. [Add a new database?](database/driver.go) * [PostgreSQL](database/postgres) * [Redshift](database/redshift) * [Ql](database/ql) - * [Cassandra](database/cassandra) ([todo #164](https://github.com/mattes/migrate/issues/164)) - * [SQLite](database/sqlite3) + * [Cassandra](database/cassandra) + * [SQLite](database/sqlite) * [MySQL/ MariaDB](database/mysql) * [Neo4j](database/neo4j) ([todo #167](https://github.com/mattes/migrate/issues/167)) * [MongoDB](database/mongodb) ([todo #169](https://github.com/mattes/migrate/issues/169)) diff --git a/cli/build_cassandra.go b/cli/build_cassandra.go new file mode 100644 index 0000000..319b52d --- /dev/null +++ b/cli/build_cassandra.go @@ -0,0 +1,7 @@ +// +build cassandra + +package main + +import ( + _ "github.com/mattes/migrate/database/cassandra" +) diff --git a/database/cassandra/README.md b/database/cassandra/README.md index e69de29..e7765e2 100644 --- a/database/cassandra/README.md +++ b/database/cassandra/README.md @@ -0,0 +1,29 @@ +# Cassandra + +* Drop command will not work on Cassandra 2.X because it rely on +system_schema table which comes with 3.X +* Other commands should work properly but are **not tested** + + +## Usage +`cassandra://host:port/keyspace?param1=value¶m2=value2` + + +| URL Query | Default value | Description | +|------------|-------------|-----------| +| `x-migrations-table` | schema_migrations | Name of the migrations table | +| `port` | 9042 | The port to bind to | +| `consistency` | ALL | Migration consistency +| `protocol` | | Cassandra protocol version (3 or 4) +| `timeout` | 1 minute | Migration timeout + + +`timeout` is parsed using [time.ParseDuration(s string)](https://golang.org/pkg/time/#ParseDuration) + + +## Upgrading from v1 + +1. Write down the current migration version from schema_migrations +2. `DROP TABLE schema_migrations` +4. Download and install the latest migrate version. +5. Force the current migration version with `migrate force `. diff --git a/database/cassandra/cassandra.go b/database/cassandra/cassandra.go new file mode 100644 index 0000000..3dd9379 --- /dev/null +++ b/database/cassandra/cassandra.go @@ -0,0 +1,223 @@ +package cassandra + +import ( + "fmt" + "io" + "io/ioutil" + nurl "net/url" + "github.com/gocql/gocql" + "time" + "github.com/mattes/migrate/database" + "strconv" +) + +func init() { + db := new(Cassandra) + database.Register("cassandra", db) +} + +var DefaultMigrationsTable = "schema_migrations" +var dbLocked = false + +var ( + ErrNilConfig = fmt.Errorf("no config") + ErrNoKeyspace = fmt.Errorf("no keyspace provided") + ErrDatabaseDirty = fmt.Errorf("database is dirty") +) + +type Config struct { + MigrationsTable string + KeyspaceName string +} + +type Cassandra struct { + session *gocql.Session + isLocked bool + + // Open and WithInstance need to guarantee that config is never nil + config *Config +} + +func (p *Cassandra) Open(url string) (database.Driver, error) { + u, err := nurl.Parse(url) + if err != nil { + return nil, err + } + + // Check for missing mandatory attributes + if len(u.Path) == 0 { + return nil, ErrNoKeyspace + } + + migrationsTable := u.Query().Get("x-migrations-table") + if len(migrationsTable) == 0 { + migrationsTable = DefaultMigrationsTable + } + + p.config = &Config{ + KeyspaceName: u.Path, + MigrationsTable: migrationsTable, + } + + cluster := gocql.NewCluster(u.Host) + cluster.Keyspace = u.Path[1:len(u.Path)] + cluster.Consistency = gocql.All + cluster.Timeout = 1 * time.Minute + + + // Retrieve query string configuration + if len(u.Query().Get("consistency")) > 0 { + var consistency gocql.Consistency + consistency, err = parseConsistency(u.Query().Get("consistency")) + if err != nil { + return nil, err + } + + cluster.Consistency = consistency + } + if len(u.Query().Get("protocol")) > 0 { + var protoversion int + protoversion, err = strconv.Atoi(u.Query().Get("protocol")) + if err != nil { + return nil, err + } + cluster.ProtoVersion = protoversion + } + if len(u.Query().Get("timeout")) > 0 { + var timeout time.Duration + timeout, err = time.ParseDuration(u.Query().Get("timeout")) + if err != nil { + return nil, err + } + cluster.Timeout = timeout + } + + p.session, err = cluster.CreateSession() + + if err != nil { + return nil, err + } + + if err := p.ensureVersionTable(); err != nil { + return nil, err + } + + return p, nil +} + +func (p *Cassandra) Close() error { + p.session.Close() + return nil +} + +func (p *Cassandra) Lock() error { + if (dbLocked) { + return database.ErrLocked + } + dbLocked = true + return nil +} + +func (p *Cassandra) Unlock() error { + dbLocked = false + return nil +} + +func (p *Cassandra) Run(migration io.Reader) error { + migr, err := ioutil.ReadAll(migration) + if err != nil { + return err + } + // run migration + query := string(migr[:]) + if err := p.session.Query(query).Exec(); err != nil { + // TODO: cast to Cassandra error and get line number + return database.Error{OrigErr: err, Err: "migration failed", Query: migr} + } + + return nil +} + +func (p *Cassandra) SetVersion(version int, dirty bool) error { + query := `TRUNCATE "` + p.config.MigrationsTable + `"` + if err := p.session.Query(query).Exec(); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + if version >= 0 { + query = `INSERT INTO "` + p.config.MigrationsTable + `" (version, dirty) VALUES (?, ?)` + if err := p.session.Query(query, version, dirty).Exec(); err != nil { + return &database.Error{OrigErr: err, Query: []byte(query)} + } + } + + return nil +} + + +// Return current keyspace version +func (p *Cassandra) Version() (version int, dirty bool, err error) { + query := `SELECT version, dirty FROM "` + p.config.MigrationsTable + `" LIMIT 1` + err = p.session.Query(query).Scan(&version, &dirty) + switch { + case err == gocql.ErrNotFound: + return database.NilVersion, false, nil + + case err != nil: + if _, ok := err.(*gocql.Error); ok { + return database.NilVersion, false, nil + } + return 0, false, &database.Error{OrigErr: err, Query: []byte(query)} + + default: + return version, dirty, nil + } +} + +func (p *Cassandra) Drop() error { + // select all tables in current schema + query := fmt.Sprintf(`SELECT table_name from system_schema.tables WHERE keyspace_name='%s'`, p.config.KeyspaceName[1:]) // Skip '/' character + iter := p.session.Query(query).Iter() + var tableName string + for iter.Scan(&tableName) { + err := p.session.Query(fmt.Sprintf(`DROP TABLE %s`, tableName)).Exec() + if err != nil { + return err + } + } + // Re-create the version table + if err := p.ensureVersionTable(); err != nil { + return err + } + return nil +} + + +// Ensure version table exists +func (p *Cassandra) ensureVersionTable() error { + err := p.session.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (version bigint, dirty boolean, PRIMARY KEY(version))", p.config.MigrationsTable)).Exec() + if err != nil { + return err + } + if _, _, err = p.Version(); err != nil { + return err + } + return nil +} + + +// ParseConsistency wraps gocql.ParseConsistency +// to return an error instead of a panicking. +func parseConsistency(consistencyStr string) (consistency gocql.Consistency, err error) { + defer func() { + if r := recover(); r != nil { + var ok bool + err, ok = r.(error) + if !ok { + err = fmt.Errorf("Failed to parse consistency \"%s\": %v", consistencyStr, r) + } + } + }() + consistency = gocql.ParseConsistency(consistencyStr) + + return consistency, nil +} diff --git a/database/cassandra/cassandra_test.go b/database/cassandra/cassandra_test.go new file mode 100644 index 0000000..4e9150f --- /dev/null +++ b/database/cassandra/cassandra_test.go @@ -0,0 +1,53 @@ +package cassandra + +import ( + "fmt" + "testing" + dt "github.com/mattes/migrate/database/testing" + mt "github.com/mattes/migrate/testing" + "github.com/gocql/gocql" + "time" + "strconv" +) + +var versions = []mt.Version{ + {"cassandra:3.0.10", []string{}}, + {"cassandra:3.0", []string{}}, +} + +func isReady(i mt.Instance) bool { + // Cassandra exposes 5 ports (7000, 7001, 7199, 9042 & 9160) + // We only need the port bound to 9042, but we can only access to the first one + // through 'i.Port()' (which calls DockerContainer.firstPortMapping()) + // So we need to get port mapping to retrieve correct port number bound to 9042 + portMap := i.NetworkSettings().Ports + port, _ := strconv.Atoi(portMap["9042/tcp"][0].HostPort) + + cluster := gocql.NewCluster(i.Host()) + cluster.Port = port + //cluster.ProtoVersion = 4 + cluster.Consistency = gocql.All + cluster.Timeout = 1 * time.Minute + p, err := cluster.CreateSession() + if err != nil { + return false + } + // Create keyspace for tests + p.Query("CREATE KEYSPACE testks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor':1}").Exec() + return true +} + +func Test(t *testing.T) { + mt.ParallelTest(t, versions, isReady, + func(t *testing.T, i mt.Instance) { + p := &Cassandra{} + portMap := i.NetworkSettings().Ports + port, _ := strconv.Atoi(portMap["9042/tcp"][0].HostPort) + addr := fmt.Sprintf("cassandra://%v:%v/testks", i.Host(), port) + d, err := p.Open(addr) + if err != nil { + t.Fatalf("%v", err) + } + dt.Test(t, d, []byte("SELECT table_name from system_schema.tables")) + }) +} \ No newline at end of file diff --git a/database/driver.go b/database/driver.go index 53d5240..3983a6a 100644 --- a/database/driver.go +++ b/database/driver.go @@ -32,6 +32,8 @@ var drivers = make(map[string]Driver) // All other functions are tested by tests in database/testing. // Saves you some time and makes sure all database drivers behave the same way. // 5. Call Register in init(). +// 6. Create a migrate/cli/build_.go file +// 7. Add driver name in 'DATABASE' variable in Makefile // // Guidelines: // * Don't try to correct user input. Don't assume things. @@ -71,7 +73,7 @@ type Driver interface { // Dirty means, a previous migration failed and user interaction is required. Version() (version int, dirty bool, err error) - // Drop deletes everyting in the database. + // Drop deletes everything in the database. Drop() error } diff --git a/testing/docker.go b/testing/docker.go index 9017e97..de93bbf 100644 --- a/testing/docker.go +++ b/testing/docker.go @@ -3,7 +3,7 @@ package testing import ( "bufio" - "context" // TODO: is issue with go < 1.7? + "context" "encoding/json" "fmt" "io" @@ -12,7 +12,6 @@ import ( "strings" "testing" "time" - dockertypes "github.com/docker/docker/api/types" dockercontainer "github.com/docker/docker/api/types/container" dockernetwork "github.com/docker/docker/api/types/network" @@ -202,6 +201,11 @@ func (d *DockerContainer) Port() uint { return port } +func (d *DockerContainer) NetworkSettings() dockertypes.NetworkSettings { + netSettings := d.ContainerJSON.NetworkSettings + return *netSettings +} + type dockerImagePullOutput struct { Status string `json:"status"` ProgressDetails struct { diff --git a/testing/testing.go b/testing/testing.go index fb04ad7..0d03432 100644 --- a/testing/testing.go +++ b/testing/testing.go @@ -6,6 +6,8 @@ import ( "strconv" "testing" "time" + + dockertypes "github.com/docker/docker/api/types" ) type IsReadyFunc func(Instance) bool @@ -46,8 +48,8 @@ func ParallelTest(t *testing.T, versions []Version, readyFn IsReadyFunc, testFn // wait until database is ready tick := time.Tick(1000 * time.Millisecond) - timeout := time.After(time.Duration(delay+60) * time.Second) - outer: + timeout := time.After(time.Duration(delay + 60) * time.Second) + outer: for { select { case <-tick: @@ -87,5 +89,6 @@ func containerLogs(t *testing.T, c *DockerContainer) []byte { type Instance interface { Host() string Port() uint + NetworkSettings() dockertypes.NetworkSettings KeepForDebugging() }