diff --git a/driver/neo4j/README.md b/driver/neo4j/README.md new file mode 100644 index 0000000..5a708cf --- /dev/null +++ b/driver/neo4j/README.md @@ -0,0 +1,20 @@ +# Neo4j Driver + +* Runs migrations in transcations. + That means that if a migration failes, it will be safely rolled back. + +* Stores migration version details with the label ``SchemaMigrations``. + An unique constraint for the field :SchemaMigrations(version) will be auto-generated. + +* Neo4j cannot perform schema and data updates in a transaction, therefore it's necessary to use different migration files + +## Usage + +```bash +migrate -url neo4j://user:password@host:port/db/data -path ./db/migrations create add_field_to_table +migrate -url neo4j://user:password@host:port/db/data -path ./db/migrations up +migrate help # for more info +``` +## Author + +* Carlos Forero, https://github.com/carlosforero \ No newline at end of file diff --git a/driver/neo4j/neo4j.go b/driver/neo4j/neo4j.go new file mode 100644 index 0000000..9670abe --- /dev/null +++ b/driver/neo4j/neo4j.go @@ -0,0 +1,157 @@ +// Package neo4j implements the Driver interface. +package neo4j + +import ( + "fmt" + "bytes" + "strings" + "errors" + "github.com/jmcvetta/neoism" + "github.com/mattes/migrate/driver" + "github.com/mattes/migrate/file" + "github.com/mattes/migrate/migrate/direction" +) + +type Driver struct { + db *neoism.Database +} + +const labelName = "SchemaMigration" + +func (driver *Driver) Initialize(url string) error { + url = strings.Replace(url,"neo4j","http",1) + + db, err := neoism.Connect(url) + if err != nil { + return err + } + + driver.db = db + + if err := driver.ensureVersionConstraintExists(); err != nil { + return err + } + return nil +} + +func (driver *Driver) Close() error { + driver.db = nil + return nil +} + +func (driver *Driver) FilenameExtension() string { + return "cql" +} + +func (driver *Driver) ensureVersionConstraintExists() error { + uc, _ := driver.db.UniqueConstraints("SchemaMigration", "version") + if len(uc) == 0 { + _, err := driver.db.CreateUniqueConstraint("SchemaMigration", "version") + return err + } + return nil +} + +func (driver *Driver) setVersion(d direction.Direction, v uint64, invert bool) error { + + cqUp := neoism.CypherQuery { + Statement: `CREATE (n:SchemaMigration {version: {Version}}) RETURN n`, + Parameters: neoism.Props{"Version": v}, + } + + cqDown := neoism.CypherQuery { + Statement: `MATCH (n:SchemaMigration {version: {Version}}) DELETE n`, + Parameters: neoism.Props{"Version": v}, + } + + var cq neoism.CypherQuery + switch d { + case direction.Up: + if invert { cq = cqDown } else { cq = cqUp } + case direction.Down: + if invert { cq = cqUp } else { cq = cqDown } + } + return driver.db.Cypher(&cq) +} + +func (driver *Driver) Migrate(f file.File, pipe chan interface{}) { + var err error + + defer func() { + if err != nil { + // Invert version direction if we couldn't apply the changes for some reason. + if err := driver.setVersion(f.Direction, f.Version, true); err != nil { + pipe <- err + } + pipe <- err + } + close(pipe) + }() + + pipe <- f + + + if err = driver.setVersion(f.Direction, f.Version, false); err != nil { + pipe <- err + return + } + + if err = f.ReadContent(); err != nil { + pipe <- err + return + } + + cQueries := []*neoism.CypherQuery{} + + // Neoism doesn't support multiple statements per query. + cqlStmts := bytes.Split(f.Content, []byte(";")) + + for _, cqlStmt := range cqlStmts { + cqlStmt = bytes.TrimSpace(cqlStmt) + if len(cqlStmt) > 0 { + cq := neoism.CypherQuery{Statement: string(cqlStmt)} + cQueries = append( cQueries, &cq ) + } + } + + var tx *neoism.Tx + + tx, err = driver.db.Begin(cQueries) + if err != nil { + pipe <- err + for _, err := range tx.Errors { + pipe <- errors.New(fmt.Sprintf("%v", err.Message)) + } + if err = tx.Rollback(); err != nil { + pipe <- err + } + return + } + + if err = tx.Commit(); err != nil { + pipe <- err + for _, err := range tx.Errors { + pipe <- errors.New(fmt.Sprintf("%v", err.Message)) + } + return + } +} + +func (driver *Driver) Version() (uint64, error) { + res := []struct {Version uint64 `json:"n.version"`}{} + + cq := neoism.CypherQuery{ + Statement: `MATCH (n:SchemaMigration) + RETURN n.version ORDER BY n.version DESC LIMIT 1`, + Result: &res, + } + + if err := driver.db.Cypher(&cq); err != nil || len(res) == 0 { + return 0, err + } + return res[0].Version, nil +} + +func init() { + driver.RegisterDriver("neo4j", &Driver{}) +} diff --git a/driver/neo4j/neo4j_test.go b/driver/neo4j/neo4j_test.go new file mode 100644 index 0000000..09d06b4 --- /dev/null +++ b/driver/neo4j/neo4j_test.go @@ -0,0 +1,98 @@ +package neo4j + +import ( + "testing" + "os" + + "github.com/jmcvetta/neoism" + "github.com/mattes/migrate/file" + "github.com/mattes/migrate/migrate/direction" + pipep "github.com/mattes/migrate/pipe" +) + +// TestMigrate runs some additional tests on Migrate(). +// Basic testing is already done in migrate/migrate_test.go +func TestMigrate(t *testing.T) { + host := os.Getenv("NEO4J_PORT_7474_TCP_ADDR") + port := os.Getenv("NEO4J_PORT_7474_TCP_PORT") + + driverUrl := "http://neo4j:test@" + host + ":" + port + "/db/data" + + // prepare clean database + db, err := neoism.Connect(driverUrl) + if err != nil { + t.Fatal(err) + } + + cq := neoism.CypherQuery{ + Statement: `DROP INDEX ON :Yolo(name)`, + } + + // If an error dropping the index then ignore it + db.Cypher(&cq) + + driverUrl = "neo4j://neo4j:test@" + host + ":" + port + "/db/data" + + d := &Driver{} + if err := d.Initialize(driverUrl); err != nil { + t.Fatal(err) + } + + files := []file.File{ + { + Path: "/foobar", + FileName: "001_foobar.up.cql", + Version: 1, + Name: "foobar", + Direction: direction.Up, + Content: []byte(` + CREATE INDEX ON :Yolo(name) + `), + }, + { + Path: "/foobar", + FileName: "001_foobar.down.cql", + Version: 1, + Name: "foobar", + Direction: direction.Down, + Content: []byte(` + DROP INDEX ON :Yolo(name) + `), + }, + { + Path: "/foobar", + FileName: "002_foobar.up.cql", + Version: 2, + Name: "foobar", + Direction: direction.Up, + Content: []byte(` + CREATE INDEX :Yolo(name) THIS WILL CAUSE AN ERROR + `), + }, + } + + pipe := pipep.New() + go d.Migrate(files[0], pipe) + errs := pipep.ReadErrors(pipe) + if len(errs) > 0 { + t.Fatal(errs) + } + + pipe = pipep.New() + go d.Migrate(files[1], pipe) + errs = pipep.ReadErrors(pipe) + if len(errs) > 0 { + t.Fatal(errs) + } + + pipe = pipep.New() + go d.Migrate(files[2], pipe) + errs = pipep.ReadErrors(pipe) + if len(errs) == 0 { + t.Error("Expected test case to fail") + } + + if err := d.Close(); err != nil { + t.Fatal(err) + } +} diff --git a/main.go b/main.go index b6da438..e63ceab 100644 --- a/main.go +++ b/main.go @@ -16,6 +16,7 @@ import ( _ "github.com/mattes/migrate/driver/mysql" _ "github.com/mattes/migrate/driver/postgres" _ "github.com/mattes/migrate/driver/sqlite3" + _ "github.com/mattes/migrate/driver/neo4j" "github.com/mattes/migrate/file" "github.com/mattes/migrate/migrate" "github.com/mattes/migrate/migrate/direction"