mirror of
https://github.com/status-im/spiff-arena.git
synced 2025-01-14 12:14:46 +00:00
Feature/data migrator tests (#546)
* using new spiffworkflow locally and the db can be recreated w/ burnettk * tests are passing w/ burnettk * added version 3 data migration for typenames on tasks and bpmn processes w/ burnettk * pyl w/ burnettk * attempting to add tests for data migrator and fix 1.3 for postgres * run version_1_3 migration differently from postgres versus mysql and sqlite * look up the task model again to make sure it is fresh w/ burnettk --------- Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
parent
4d842e8dbf
commit
c2dc4a738e
@ -42,7 +42,7 @@ if [[ "${1:-}" == "clean" ]]; then
|
|||||||
exit 1
|
exit 1
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [[ "${SPIFFWORKFLOW_BACKEND_DATABASE_TYPE:-mysql}" != "mysql" ]]; then
|
if [[ "${SPIFFWORKFLOW_BACKEND_DATABASE_TYPE:-mysql}" == "sqlite" ]]; then
|
||||||
rm -f ./src/instance/*.sqlite3
|
rm -f ./src/instance/*.sqlite3
|
||||||
else
|
else
|
||||||
mysql -h "$database_host" -uroot -e "DROP DATABASE IF EXISTS spiffworkflow_backend_local_development"
|
mysql -h "$database_host" -uroot -e "DROP DATABASE IF EXISTS spiffworkflow_backend_local_development"
|
||||||
|
@ -3,6 +3,7 @@ import json
|
|||||||
import uuid
|
import uuid
|
||||||
from hashlib import sha256
|
from hashlib import sha256
|
||||||
|
|
||||||
|
from flask import current_app
|
||||||
from spiffworkflow_backend.models.db import db
|
from spiffworkflow_backend.models.db import db
|
||||||
from spiffworkflow_backend.models.task import Task
|
from spiffworkflow_backend.models.task import Task
|
||||||
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
|
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
|
||||||
@ -221,7 +222,18 @@ class VersionOneThree:
|
|||||||
db.session.add(task_definition)
|
db.session.add(task_definition)
|
||||||
|
|
||||||
def update_tasks_where_last_change_is_null(self) -> None:
|
def update_tasks_where_last_change_is_null(self) -> None:
|
||||||
task_models = TaskModel.query.filter(TaskModel.properties_json.like('%last_state_change": null%')).all() # type: ignore
|
if current_app.config.get("SPIFFWORKFLOW_BACKEND_DATABASE_TYPE") == "postgres":
|
||||||
|
task_models = (
|
||||||
|
db.session.query(TaskModel)
|
||||||
|
.filter(
|
||||||
|
TaskModel.properties_json.op("->>")("last_state_changed") == None # type: ignore # noqa: E711
|
||||||
|
)
|
||||||
|
.all()
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
task_models = TaskModel.query.filter(
|
||||||
|
TaskModel.properties_json.like('%last_state_change": null%') # type: ignore
|
||||||
|
).all()
|
||||||
for task_model in task_models:
|
for task_model in task_models:
|
||||||
parent_task_model = task_model.parent_task_model()
|
parent_task_model = task_model.parent_task_model()
|
||||||
|
|
||||||
|
@ -0,0 +1,69 @@
|
|||||||
|
import copy
|
||||||
|
|
||||||
|
from flask.app import Flask
|
||||||
|
from flask.testing import FlaskClient
|
||||||
|
from spiffworkflow_backend.data_migrations.process_instance_migrator import ProcessInstanceMigrator
|
||||||
|
from spiffworkflow_backend.data_migrations.version_1_3 import VersionOneThree
|
||||||
|
from spiffworkflow_backend.models.db import db
|
||||||
|
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
|
||||||
|
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
|
||||||
|
|
||||||
|
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||||
|
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
|
||||||
|
|
||||||
|
|
||||||
|
class TestProcessInstanceMigrator(BaseTest):
|
||||||
|
def test_can_run_all_migrations(
|
||||||
|
self,
|
||||||
|
app: Flask,
|
||||||
|
client: FlaskClient,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
|
) -> None:
|
||||||
|
process_model = load_test_spec(
|
||||||
|
process_model_id="test_group/model_with_lanes",
|
||||||
|
bpmn_file_name="lanes.bpmn",
|
||||||
|
process_model_source_directory="model_with_lanes",
|
||||||
|
)
|
||||||
|
process_instance = self.create_process_instance_from_process_model(process_model=process_model)
|
||||||
|
process_instance.spiff_serializer_version = "1"
|
||||||
|
db.session.add(process_instance)
|
||||||
|
db.session.commit()
|
||||||
|
|
||||||
|
processor = ProcessInstanceProcessor(process_instance)
|
||||||
|
processor.do_engine_steps(save=True)
|
||||||
|
ProcessInstanceMigrator.run(process_instance)
|
||||||
|
|
||||||
|
def test_can_run_version_1_3_migration(
|
||||||
|
self,
|
||||||
|
app: Flask,
|
||||||
|
client: FlaskClient,
|
||||||
|
with_db_and_bpmn_file_cleanup: None,
|
||||||
|
) -> None:
|
||||||
|
process_model = load_test_spec(
|
||||||
|
process_model_id="test_group/model_with_lanes",
|
||||||
|
bpmn_file_name="lanes.bpmn",
|
||||||
|
process_model_source_directory="model_with_lanes",
|
||||||
|
)
|
||||||
|
process_instance = self.create_process_instance_from_process_model(process_model=process_model)
|
||||||
|
process_instance.spiff_serializer_version = "1"
|
||||||
|
db.session.add(process_instance)
|
||||||
|
db.session.commit()
|
||||||
|
processor = ProcessInstanceProcessor(process_instance)
|
||||||
|
processor.do_engine_steps(save=True)
|
||||||
|
|
||||||
|
task_model = TaskModel.query.filter_by(process_instance_id=process_instance.id).all()[-1]
|
||||||
|
assert task_model is not None
|
||||||
|
assert task_model.parent_task_model() is not None
|
||||||
|
assert task_model.parent_task_model().properties_json["last_state_change"] is not None
|
||||||
|
|
||||||
|
new_properties_json = copy.copy(task_model.properties_json)
|
||||||
|
new_properties_json["last_state_change"] = None
|
||||||
|
task_model.properties_json = new_properties_json
|
||||||
|
db.session.add(task_model)
|
||||||
|
db.session.commit()
|
||||||
|
task_model = TaskModel.query.filter_by(id=task_model.id).first()
|
||||||
|
assert task_model.properties_json["last_state_change"] is None
|
||||||
|
|
||||||
|
VersionOneThree().run()
|
||||||
|
task_model = TaskModel.query.filter_by(id=task_model.id).first()
|
||||||
|
assert task_model.properties_json["last_state_change"] is not None
|
Loading…
x
Reference in New Issue
Block a user