diff --git a/spiffworkflow-backend/migrations/env.py b/spiffworkflow-backend/migrations/env.py index 630e381a..68feded2 100644 --- a/spiffworkflow-backend/migrations/env.py +++ b/spiffworkflow-backend/migrations/env.py @@ -1,3 +1,5 @@ +from __future__ import with_statement + import logging from logging.config import fileConfig diff --git a/spiffworkflow-backend/migrations/versions/99f1b5156b06_.py b/spiffworkflow-backend/migrations/versions/434e6494e8ff_.py similarity index 98% rename from spiffworkflow-backend/migrations/versions/99f1b5156b06_.py rename to spiffworkflow-backend/migrations/versions/434e6494e8ff_.py index 9407aeaf..3663be8a 100644 --- a/spiffworkflow-backend/migrations/versions/99f1b5156b06_.py +++ b/spiffworkflow-backend/migrations/versions/434e6494e8ff_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 99f1b5156b06 +Revision ID: 434e6494e8ff Revises: -Create Date: 2023-03-14 17:23:22.667853 +Create Date: 2023-03-15 12:25:48.665481 """ from alembic import op @@ -10,7 +10,7 @@ import sqlalchemy as sa from sqlalchemy.dialects import mysql # revision identifiers, used by Alembic. -revision = '99f1b5156b06' +revision = '434e6494e8ff' down_revision = None branch_labels = None depends_on = None @@ -356,6 +356,7 @@ def upgrade(): sa.Column('state', sa.String(length=10), nullable=False), sa.Column('properties_json', sa.JSON(), nullable=False), sa.Column('json_data_hash', sa.String(length=255), nullable=False), + sa.Column('python_env_data_hash', sa.String(length=255), nullable=False), sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True), sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True), sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ), @@ -365,6 +366,7 @@ def upgrade(): ) op.create_index(op.f('ix_task_guid'), 'task', ['guid'], unique=True) op.create_index(op.f('ix_task_json_data_hash'), 'task', ['json_data_hash'], unique=False) + op.create_index(op.f('ix_task_python_env_data_hash'), 'task', ['python_env_data_hash'], unique=False) op.create_table('human_task_user', sa.Column('id', sa.Integer(), nullable=False), sa.Column('human_task_id', sa.Integer(), nullable=False), @@ -398,6 +400,7 @@ def downgrade(): op.drop_index(op.f('ix_human_task_user_user_id'), table_name='human_task_user') op.drop_index(op.f('ix_human_task_user_human_task_id'), table_name='human_task_user') op.drop_table('human_task_user') + op.drop_index(op.f('ix_task_python_env_data_hash'), table_name='task') op.drop_index(op.f('ix_task_json_data_hash'), table_name='task') op.drop_index(op.f('ix_task_guid'), table_name='task') op.drop_table('task') diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/json_data.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/json_data.py index 0723a50a..0713f527 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/json_data.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/json_data.py @@ -1,9 +1,14 @@ from __future__ import annotations +from typing import Optional from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel +class JsonDataModelNotFoundError(Exception): + pass + + # delta algorithm <- just to save it for when we want to try to implement it: # a = {"hey": { "hey2": 2, "hey3": 3, "hey6": 7 }, "hey30": 3, "hey40": 4} # b = {"hey": { "hey2": 4, "hey5": 3 }, "hey20": 2, "hey30": 3} @@ -27,3 +32,15 @@ class JsonDataModel(SpiffworkflowBaseDBModel): # this is a sha256 hash of spec and serializer_version hash: str = db.Column(db.String(255), nullable=False, index=True, unique=True) data: dict = db.Column(db.JSON, nullable=False) + + @classmethod + def find_object_by_hash(cls, hash: str) -> JsonDataModel: + json_data_model: Optional[JsonDataModel] = JsonDataModel.query.filter_by(hash=hash).first() + if json_data_model is None: + raise JsonDataModelNotFoundError(f"Could not find a json data model entry with hash: {hash}") + return json_data_model + + + @classmethod + def find_data_dict_by_hash(cls, hash: str) -> dict: + return cls.find_object_by_hash(hash).data diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py index 99ccb61b..fc0d3262 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py @@ -15,6 +15,7 @@ from sqlalchemy.orm import relationship from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel +from spiffworkflow_backend.models.json_data import JsonDataModel from spiffworkflow_backend.models.task_definition import TaskDefinitionModel @@ -59,11 +60,19 @@ class TaskModel(SpiffworkflowBaseDBModel): state: str = db.Column(db.String(10), nullable=False) properties_json: dict = db.Column(db.JSON, nullable=False) + json_data_hash: str = db.Column(db.String(255), nullable=False, index=True) + python_env_data_hash: str = db.Column(db.String(255), nullable=False, index=True) start_in_seconds: float = db.Column(db.DECIMAL(17, 6)) end_in_seconds: Union[float, None] = db.Column(db.DECIMAL(17, 6)) + def python_env_data(self) -> dict: + return JsonDataModel.find_data_dict_by_hash(self.python_env_data_hash) + + def json_data(self) -> dict: + return JsonDataModel.find_data_dict_by_hash(self.json_data_hash) + class Task: """Task.""" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 6b80fbcf..806d8716 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1957,18 +1957,19 @@ class ProcessInstanceProcessor: db.session.add(details_model) # ####### - json_data_dict = TaskService.update_task_model( + json_data_dict_list = TaskService.update_task_model( task_model, spiff_task, self._serializer ) - if json_data_dict is not None: - json_data = ( - db.session.query(JsonDataModel.id) - .filter_by(hash=json_data_dict["hash"]) - .first() - ) - if json_data is None: - json_data = JsonDataModel(**json_data_dict) - db.session.add(json_data) + for json_data_dict in json_data_dict_list: + if json_data_dict is not None: + json_data = ( + db.session.query(JsonDataModel.id) + .filter_by(hash=json_data_dict["hash"]) + .first() + ) + if json_data is None: + json_data = JsonDataModel(**json_data_dict) + db.session.add(json_data) # this is the thing that actually commits the db transaction (on behalf of the other updates above as well) self.save() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index 1d81bc59..5b2c7935 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -26,6 +26,8 @@ class JsonDataDict(TypedDict): class TaskService: + PYTHON_ENVIRONMENT_STATE_KEY = "spiff__python_env_state" + @classmethod def insert_or_update_json_data_records( cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict] @@ -51,7 +53,7 @@ class TaskService: task_model: TaskModel, spiff_task: SpiffTask, serializer: BpmnWorkflowSerializer, - ) -> Optional[JsonDataDict]: + ) -> list[Optional[JsonDataDict]]: """Updates properties_json and data on given task_model. This will NOT update start_in_seconds or end_in_seconds. @@ -59,12 +61,16 @@ class TaskService: """ new_properties_json = serializer.task_to_dict(spiff_task) spiff_task_data = new_properties_json.pop("data") + python_env_data_dict = cls._get_python_env_data_dict_from_spiff_task(spiff_task) task_model.properties_json = new_properties_json task_model.state = TaskStateNames[new_properties_json["state"]] json_data_dict = cls._update_task_data_on_task_model( - task_model, spiff_task_data + task_model, spiff_task_data, "json_data_hash" ) - return json_data_dict + python_env_dict = cls._update_task_data_on_task_model( + task_model, python_env_data_dict, "python_env_data_hash" + ) + return [json_data_dict, python_env_dict] @classmethod def find_or_create_task_model_from_spiff_task( @@ -241,10 +247,10 @@ class TaskService: task_data_dict = task_properties.pop("data") state_int = task_properties["state"] + spiff_task = spiff_workflow.get_task(UUID(task_id)) task_model = TaskModel.query.filter_by(guid=task_id).first() if task_model is None: - spiff_task = spiff_workflow.get_task(UUID(task_id)) task_model = cls._create_task( bpmn_process, process_instance, @@ -253,26 +259,33 @@ class TaskService: ) task_model.state = TaskStateNames[state_int] task_model.properties_json = task_properties + new_task_models[task_model.guid] = task_model json_data_dict = TaskService._update_task_data_on_task_model( - task_model, task_data_dict + task_model, task_data_dict, "json_data_hash" ) - new_task_models[task_model.guid] = task_model if json_data_dict is not None: new_json_data_dicts[json_data_dict["hash"]] = json_data_dict + python_env_data_dict = cls._get_python_env_data_dict_from_spiff_task(spiff_task) + python_env_dict = TaskService._update_task_data_on_task_model( + task_model, python_env_data_dict, "python_env_data_hash" + ) + if python_env_dict is not None: + new_json_data_dicts[python_env_dict["hash"]] = python_env_dict + return (bpmn_process, new_task_models, new_json_data_dicts) @classmethod def _update_task_data_on_task_model( - cls, task_model: TaskModel, task_data_dict: dict + cls, task_model: TaskModel, task_data_dict: dict, task_model_data_column: str ) -> Optional[JsonDataDict]: task_data_json = json.dumps(task_data_dict, sort_keys=True) task_data_hash: str = sha256(task_data_json.encode("utf8")).hexdigest() json_data_dict: Optional[JsonDataDict] = None - if task_model.json_data_hash != task_data_hash: + if getattr(task_model, task_model_data_column) != task_data_hash: json_data_dict = {"hash": task_data_hash, "data": task_data_dict} - task_model.json_data_hash = task_data_hash + setattr(task_model, task_model_data_column, task_data_hash) return json_data_dict @classmethod @@ -293,3 +306,7 @@ class TaskService: task_definition_id=task_definition.id, ) return task_model + + @classmethod + def _get_python_env_data_dict_from_spiff_task(cls, spiff_task: SpiffTask) -> dict: + return spiff_task.workflow.script_engine.environment.user_defined_state() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index be13342a..d9bf5bf8 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -78,6 +78,12 @@ class TaskModelSavingDelegate(EngineStepDelegate): """ return self.process_instance.bpmn_process_id is not None + def _update_json_data_dicts_using_list(self, json_data_dict_list: list[Optional[JsonDataDict]]) -> None: + for json_data_dict in json_data_dict_list: + if json_data_dict is not None: + self.json_data_dicts[json_data_dict["hash"]] = json_data_dict + + def will_complete_task(self, spiff_task: SpiffTask) -> None: if self.should_update_task_model(): _bpmn_process, task_model, new_task_models, new_json_data_dicts = ( @@ -98,11 +104,10 @@ class TaskModelSavingDelegate(EngineStepDelegate): def did_complete_task(self, spiff_task: SpiffTask) -> None: if self.current_task_model and self.should_update_task_model(): self.current_task_model.end_in_seconds = time.time() - json_data_dict = TaskService.update_task_model( + json_data_dict_list = TaskService.update_task_model( self.current_task_model, spiff_task, self.serializer ) - if json_data_dict is not None: - self.json_data_dicts[json_data_dict["hash"]] = json_data_dict + self._update_json_data_dicts_using_list(json_data_dict_list) self.task_models[self.current_task_model.guid] = self.current_task_model if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.did_complete_task(spiff_task) @@ -126,7 +131,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): | TaskState.MAYBE | TaskState.LIKELY ): - _bpmn_process, task_model, new_task_models, new_json_data_dicts = ( + bpmn_process, task_model, new_task_models, new_json_data_dicts = ( TaskService.find_or_create_task_model_from_spiff_task( waiting_spiff_task, self.process_instance, @@ -136,12 +141,11 @@ class TaskModelSavingDelegate(EngineStepDelegate): ) self.task_models.update(new_task_models) self.json_data_dicts.update(new_json_data_dicts) - json_data_dict = TaskService.update_task_model( + json_data_dict_list = TaskService.update_task_model( task_model, waiting_spiff_task, self.serializer ) self.task_models[task_model.guid] = task_model - if json_data_dict is not None: - self.json_data_dicts[json_data_dict["hash"]] = json_data_dict + self._update_json_data_dicts_using_list(json_data_dict_list) class StepDetailLoggingDelegate(EngineStepDelegate): diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py index 9d481788..44060449 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py @@ -99,6 +99,7 @@ class TestErrorHandlingService(BaseTest): # Both send and receive messages should be generated, matched # and considered complete. messages = db.session.query(MessageInstanceModel).all() + # import pdb; pdb.set_trace() assert 2 == len(messages) assert "completed" == messages[0].status assert "completed" == messages[1].status diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index ac1a286e..b8cbb268 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -358,53 +358,56 @@ class TestProcessInstanceProcessor(BaseTest): processor_final = ProcessInstanceProcessor(process_instance_relookup) assert process_instance_relookup.status == "complete" - # first_data_set = {"set_in_top_level_script": 1} - # second_data_set = {**first_data_set, **{"set_in_top_level_subprocess": 1}} - # third_data_set = { - # **second_data_set, - # **{"set_in_test_process_to_call_script": 1}, - # } - # expected_task_data = { - # "top_level_script": first_data_set, - # "manual_task": first_data_set, - # "top_level_subprocess_script": second_data_set, - # "top_level_subprocess": second_data_set, - # "test_process_to_call_script": third_data_set, - # "top_level_call_activity": third_data_set, - # "end_event_of_manual_task_model": third_data_set, - # } + first_data_set = {"set_in_top_level_script": 1} + second_data_set = {**first_data_set, **{"set_in_top_level_subprocess": 1, "we_move_on": False}} + third_data_set = { + **second_data_set, + **{"set_in_test_process_to_call_script": 1}, + } + fourth_data_set = { + **third_data_set, + **{'a': 1, 'we_move_on': True} + } + expected_task_data = { + "top_level_script": first_data_set, + "manual_task": first_data_set, + "top_level_subprocess_script": second_data_set, + "top_level_subprocess": second_data_set, + "test_process_to_call_script": third_data_set, + "top_level_call_activity": third_data_set, + "end_event_of_manual_task_model": third_data_set, + "top_level_subprocess_script_second": fourth_data_set, + "test_process_to_call_script_second": fourth_data_set, + } + + spiff_tasks_checked_once: list = [] + + def assert_spiff_task_is_in_process(spiff_task_name: str, bpmn_process_identifier: str) -> None: + if spiff_task.task_spec.name == spiff_task_name: + expected_python_env_data = expected_task_data[spiff_task.task_spec.name] + if spiff_task.task_spec.name in spiff_tasks_checked_once: + expected_python_env_data = expected_task_data[f"{spiff_task.task_spec.name}_second"] + task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() + assert task.task_definition_id is not None + task_definition = task.task_definition + assert task_definition.bpmn_identifier == spiff_task_name + assert ( + task_definition.bpmn_process_definition.bpmn_identifier + == bpmn_process_identifier + ) + print(f"spiff_task_name: {spiff_task_name}") + print(f"task.json_data(): {task.json_data()}") + print(f"task.python_env_data(): {task.python_env_data()}") + assert task.python_env_data() == expected_python_env_data + spiff_tasks_checked_once.append(spiff_task.task_spec.name) all_spiff_tasks = processor_final.bpmn_process_instance.get_tasks() assert len(all_spiff_tasks) > 1 for spiff_task in all_spiff_tasks: assert spiff_task.state == TaskState.COMPLETED - if spiff_task.task_spec.name == "test_process_to_call_script": - task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() - assert task.task_definition_id is not None - task_definition = task.task_definition - assert task_definition.bpmn_identifier == "test_process_to_call_script" - assert ( - task_definition.bpmn_process_definition.bpmn_identifier - == "test_process_to_call" - ) - elif spiff_task.task_spec.name == "top_level_subprocess_script": - task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() - assert task.task_definition_id is not None - task_definition = task.task_definition - assert task_definition.bpmn_identifier == "top_level_subprocess_script" - assert ( - task_definition.bpmn_process_definition.bpmn_identifier - == "top_level_subprocess" - ) - if spiff_task.task_spec.name == "top_level_script": - task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() - assert task.task_definition_id is not None - task_definition = task.task_definition - assert task_definition.bpmn_identifier == "top_level_script" - assert ( - task_definition.bpmn_process_definition.bpmn_identifier - == "top_level_process" - ) + assert_spiff_task_is_in_process("test_process_to_call_script", "test_process_to_call") + assert_spiff_task_is_in_process("top_level_subprocess_script", "top_level_subprocess") + assert_spiff_task_is_in_process("top_level_script", "top_level_process") # FIXME: Checking task data cannot work with the feature/remove-loop-reset branch # of SiffWorkflow. This is because it saves script data to the python_env and NOT # to task.data. We may need to either create a new column on TaskModel to put the python_env