diff --git a/spiffworkflow-backend/migrations/versions/389800c352ee_.py b/spiffworkflow-backend/migrations/versions/434e6494e8ff_.py similarity index 91% rename from spiffworkflow-backend/migrations/versions/389800c352ee_.py rename to spiffworkflow-backend/migrations/versions/434e6494e8ff_.py index bfcf5da8c..3663be8a9 100644 --- a/spiffworkflow-backend/migrations/versions/389800c352ee_.py +++ b/spiffworkflow-backend/migrations/versions/434e6494e8ff_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 389800c352ee +Revision ID: 434e6494e8ff Revises: -Create Date: 2023-03-07 10:40:43.709777 +Create Date: 2023-03-15 12:25:48.665481 """ from alembic import op @@ -10,7 +10,7 @@ import sqlalchemy as sa from sqlalchemy.dialects import mysql # revision identifiers, used by Alembic. -revision = '389800c352ee' +revision = '434e6494e8ff' down_revision = None branch_labels = None depends_on = None @@ -166,8 +166,6 @@ def upgrade(): sa.Column('bpmn_version_control_type', sa.String(length=50), nullable=True), sa.Column('bpmn_version_control_identifier', sa.String(length=255), nullable=True), sa.Column('spiff_step', sa.Integer(), nullable=True), - sa.Column('locked_by', sa.String(length=80), nullable=True), - sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True), sa.ForeignKeyConstraint(['bpmn_process_definition_id'], ['bpmn_process_definition.id'], ), sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ), sa.ForeignKeyConstraint(['process_initiator_id'], ['user.id'], ), @@ -207,20 +205,6 @@ def upgrade(): sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('key') ) - op.create_table('task', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('guid', sa.String(length=36), nullable=False), - sa.Column('bpmn_process_id', sa.Integer(), nullable=False), - sa.Column('state', sa.String(length=10), nullable=False), - sa.Column('properties_json', sa.JSON(), nullable=False), - sa.Column('json_data_hash', sa.String(length=255), nullable=False), - sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True), - sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True), - sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ), - sa.PrimaryKeyConstraint('id') - ) - op.create_index(op.f('ix_task_guid'), 'task', ['guid'], unique=True) - op.create_index(op.f('ix_task_json_data_hash'), 'task', ['json_data_hash'], unique=False) op.create_table('task_definition', sa.Column('id', sa.Integer(), nullable=False), sa.Column('bpmn_process_definition_id', sa.Integer(), nullable=False), @@ -284,7 +268,7 @@ def upgrade(): sa.Column('payload', sa.JSON(), nullable=True), sa.Column('correlation_keys', sa.JSON(), nullable=True), sa.Column('status', sa.String(length=20), nullable=False), - sa.Column('user_id', sa.Integer(), nullable=False), + sa.Column('user_id', sa.Integer(), nullable=True), sa.Column('counterpart_id', sa.Integer(), nullable=True), sa.Column('failure_cause', sa.Text(), nullable=True), sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True), @@ -331,6 +315,23 @@ def upgrade(): sa.UniqueConstraint('process_instance_id', 'key', name='process_instance_metadata_unique') ) op.create_index(op.f('ix_process_instance_metadata_key'), 'process_instance_metadata', ['key'], unique=False) + op.create_table('process_instance_queue', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('process_instance_id', sa.Integer(), nullable=False), + sa.Column('run_at_in_seconds', sa.Integer(), nullable=True), + sa.Column('priority', sa.Integer(), 
nullable=True), + sa.Column('locked_by', sa.String(length=80), nullable=True), + sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True), + sa.Column('status', sa.String(length=50), nullable=True), + sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True), + sa.Column('created_at_in_seconds', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), 'process_instance_queue', ['locked_at_in_seconds'], unique=False) + op.create_index(op.f('ix_process_instance_queue_locked_by'), 'process_instance_queue', ['locked_by'], unique=False) + op.create_index(op.f('ix_process_instance_queue_process_instance_id'), 'process_instance_queue', ['process_instance_id'], unique=True) + op.create_index(op.f('ix_process_instance_queue_status'), 'process_instance_queue', ['status'], unique=False) op.create_table('spiff_step_details', sa.Column('id', sa.Integer(), nullable=False), sa.Column('process_instance_id', sa.Integer(), nullable=False), @@ -346,6 +347,26 @@ def upgrade(): sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('process_instance_id', 'spiff_step', name='process_instance_id_spiff_step') ) + op.create_table('task', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('guid', sa.String(length=36), nullable=False), + sa.Column('bpmn_process_id', sa.Integer(), nullable=False), + sa.Column('process_instance_id', sa.Integer(), nullable=False), + sa.Column('task_definition_id', sa.Integer(), nullable=False), + sa.Column('state', sa.String(length=10), nullable=False), + sa.Column('properties_json', sa.JSON(), nullable=False), + sa.Column('json_data_hash', sa.String(length=255), nullable=False), + sa.Column('python_env_data_hash', sa.String(length=255), nullable=False), + sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True), + sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True), + sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ), + sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ), + sa.ForeignKeyConstraint(['task_definition_id'], ['task_definition.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_task_guid'), 'task', ['guid'], unique=True) + op.create_index(op.f('ix_task_json_data_hash'), 'task', ['json_data_hash'], unique=False) + op.create_index(op.f('ix_task_python_env_data_hash'), 'task', ['python_env_data_hash'], unique=False) op.create_table('human_task_user', sa.Column('id', sa.Integer(), nullable=False), sa.Column('human_task_id', sa.Integer(), nullable=False), @@ -379,7 +400,16 @@ def downgrade(): op.drop_index(op.f('ix_human_task_user_user_id'), table_name='human_task_user') op.drop_index(op.f('ix_human_task_user_human_task_id'), table_name='human_task_user') op.drop_table('human_task_user') + op.drop_index(op.f('ix_task_python_env_data_hash'), table_name='task') + op.drop_index(op.f('ix_task_json_data_hash'), table_name='task') + op.drop_index(op.f('ix_task_guid'), table_name='task') + op.drop_table('task') op.drop_table('spiff_step_details') + op.drop_index(op.f('ix_process_instance_queue_status'), table_name='process_instance_queue') + op.drop_index(op.f('ix_process_instance_queue_process_instance_id'), table_name='process_instance_queue') + op.drop_index(op.f('ix_process_instance_queue_locked_by'), table_name='process_instance_queue') + 
op.drop_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), table_name='process_instance_queue') + op.drop_table('process_instance_queue') op.drop_index(op.f('ix_process_instance_metadata_key'), table_name='process_instance_metadata') op.drop_table('process_instance_metadata') op.drop_index(op.f('ix_process_instance_file_data_digest'), table_name='process_instance_file_data') @@ -392,9 +422,6 @@ def downgrade(): op.drop_table('user_group_assignment') op.drop_index(op.f('ix_task_definition_bpmn_identifier'), table_name='task_definition') op.drop_table('task_definition') - op.drop_index(op.f('ix_task_json_data_hash'), table_name='task') - op.drop_index(op.f('ix_task_guid'), table_name='task') - op.drop_table('task') op.drop_table('secret') op.drop_table('refresh_token') op.drop_index(op.f('ix_process_instance_report_identifier'), table_name='process_instance_report') diff --git a/spiffworkflow-backend/migrations/versions/e2972eaf8469_.py b/spiffworkflow-backend/migrations/versions/e2972eaf8469_.py deleted file mode 100644 index f1796bfbc..000000000 --- a/spiffworkflow-backend/migrations/versions/e2972eaf8469_.py +++ /dev/null @@ -1,58 +0,0 @@ -"""empty message - -Revision ID: e2972eaf8469 -Revises: 389800c352ee -Create Date: 2023-03-13 22:00:21.579493 - -""" -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import mysql - -# revision identifiers, used by Alembic. -revision = 'e2972eaf8469' -down_revision = '389800c352ee' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.create_table('process_instance_queue', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('process_instance_id', sa.Integer(), nullable=False), - sa.Column('run_at_in_seconds', sa.Integer(), nullable=True), - sa.Column('priority', sa.Integer(), nullable=True), - sa.Column('locked_by', sa.String(length=80), nullable=True), - sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True), - sa.Column('status', sa.String(length=50), nullable=True), - sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True), - sa.Column('created_at_in_seconds', sa.Integer(), nullable=True), - sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ), - sa.PrimaryKeyConstraint('id') - ) - op.create_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), 'process_instance_queue', ['locked_at_in_seconds'], unique=False) - op.create_index(op.f('ix_process_instance_queue_locked_by'), 'process_instance_queue', ['locked_by'], unique=False) - op.create_index(op.f('ix_process_instance_queue_process_instance_id'), 'process_instance_queue', ['process_instance_id'], unique=True) - op.create_index(op.f('ix_process_instance_queue_status'), 'process_instance_queue', ['status'], unique=False) - op.alter_column('message_instance', 'user_id', - existing_type=mysql.INTEGER(), - nullable=True) - op.drop_column('process_instance', 'locked_by') - op.drop_column('process_instance', 'locked_at_in_seconds') - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! 
### - op.add_column('process_instance', sa.Column('locked_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True)) - op.add_column('process_instance', sa.Column('locked_by', mysql.VARCHAR(length=80), nullable=True)) - op.alter_column('message_instance', 'user_id', - existing_type=mysql.INTEGER(), - nullable=False) - op.drop_index(op.f('ix_process_instance_queue_status'), table_name='process_instance_queue') - op.drop_index(op.f('ix_process_instance_queue_process_instance_id'), table_name='process_instance_queue') - op.drop_index(op.f('ix_process_instance_queue_locked_by'), table_name='process_instance_queue') - op.drop_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), table_name='process_instance_queue') - op.drop_table('process_instance_queue') - # ### end Alembic commands ### diff --git a/spiffworkflow-backend/poetry.lock b/spiffworkflow-backend/poetry.lock index 33503bb7c..a8d70db3b 100644 --- a/spiffworkflow-backend/poetry.lock +++ b/spiffworkflow-backend/poetry.lock @@ -1894,8 +1894,8 @@ lxml = "*" [package.source] type = "git" url = "https://github.com/sartography/SpiffWorkflow" -reference = "feature/remove-loop-reset" -resolved_reference = "13034aaf12f62aa3914744ca05bc9a3e3b3c3452" +reference = "main" +resolved_reference = "f162aac43af3af18d1a55186aeccea154fb8b05d" [[package]] name = "SQLAlchemy" @@ -2274,7 +2274,7 @@ testing = ["flake8 (<5)", "func-timeout", "jaraco.functools", "jaraco.itertools" [metadata] lock-version = "1.1" python-versions = ">=3.9,<3.12" -content-hash = "7ab6d5021406b573edfdca4f9e0f5e62c41a6f6ea09d34154df72454887e3670" +content-hash = "b9ea32912509637f1378d060771de7548d93953aa3db12d6a48098f7dc15205f" [metadata.files] alabaster = [ diff --git a/spiffworkflow-backend/pyproject.toml b/spiffworkflow-backend/pyproject.toml index 3b3f09aad..87f3a5d5c 100644 --- a/spiffworkflow-backend/pyproject.toml +++ b/spiffworkflow-backend/pyproject.toml @@ -27,7 +27,7 @@ flask-marshmallow = "*" flask-migrate = "*" flask-restful = "*" werkzeug = "*" -SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "feature/remove-loop-reset"} +SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"} # SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" } sentry-sdk = "^1.10" sphinx-autoapi = "^2.0" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process.py index 67e295e98..f7e301e43 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process.py @@ -1,6 +1,7 @@ from __future__ import annotations from sqlalchemy import ForeignKey +from sqlalchemy.orm import relationship from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel @@ -24,6 +25,8 @@ class BpmnProcessModel(SpiffworkflowBaseDBModel): properties_json: dict = db.Column(db.JSON, nullable=False) json_data_hash: str = db.Column(db.String(255), nullable=False, index=True) + tasks = relationship("TaskModel", cascade="delete") # type: ignore + # subprocess or top_level_process # process_type: str = db.Column(db.String(30), nullable=False) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/json_data.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/json_data.py index 0723a50a0..95993e2e5 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/json_data.py +++ 
b/spiffworkflow-backend/src/spiffworkflow_backend/models/json_data.py @@ -4,6 +4,10 @@ from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel +class JsonDataModelNotFoundError(Exception): + pass + + # delta algorithm <- just to save it for when we want to try to implement it: # a = {"hey": { "hey2": 2, "hey3": 3, "hey6": 7 }, "hey30": 3, "hey40": 4} # b = {"hey": { "hey2": 4, "hey5": 3 }, "hey20": 2, "hey30": 3} @@ -27,3 +31,18 @@ class JsonDataModel(SpiffworkflowBaseDBModel): # this is a sha256 hash of spec and serializer_version hash: str = db.Column(db.String(255), nullable=False, index=True, unique=True) data: dict = db.Column(db.JSON, nullable=False) + + @classmethod + def find_object_by_hash(cls, hash: str) -> JsonDataModel: + json_data_model: JsonDataModel | None = JsonDataModel.query.filter_by( + hash=hash + ).first() + if json_data_model is None: + raise JsonDataModelNotFoundError( + f"Could not find a json data model entry with hash: {hash}" + ) + return json_data_model + + @classmethod + def find_data_dict_by_hash(cls, hash: str) -> dict: + return cls.find_object_by_hash(hash).data diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py index f155494a4..dc66c86f8 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py @@ -71,7 +71,8 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel): bpmn_process_id: int | None = db.Column( ForeignKey(BpmnProcessModel.id), nullable=True # type: ignore ) - bpmn_process = relationship(BpmnProcessModel) + bpmn_process = relationship(BpmnProcessModel, cascade="delete") + tasks = relationship("TaskModel", cascade="delete") # type: ignore spiff_serializer_version = db.Column(db.String(50), nullable=True) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py index b35c87598..fc0d3262b 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py @@ -10,10 +10,13 @@ from marshmallow import Schema from marshmallow_enum import EnumField # type: ignore from SpiffWorkflow.task import TaskStateNames # type: ignore from sqlalchemy import ForeignKey +from sqlalchemy.orm import relationship from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel +from spiffworkflow_backend.models.json_data import JsonDataModel +from spiffworkflow_backend.models.task_definition import TaskDefinitionModel class MultiInstanceType(enum.Enum): @@ -45,18 +48,31 @@ class TaskModel(SpiffworkflowBaseDBModel): bpmn_process_id: int = db.Column( ForeignKey(BpmnProcessModel.id), nullable=False # type: ignore ) + process_instance_id: int = db.Column( + ForeignKey("process_instance.id"), nullable=False + ) # find this by looking up the "workflow_name" and "task_spec" from the properties_json - # task_definition_id: int = db.Column( - # ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore - # ) + task_definition_id: int = db.Column( + ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore + ) + task_definition = relationship("TaskDefinitionModel") + state: str = db.Column(db.String(10), 
nullable=False) properties_json: dict = db.Column(db.JSON, nullable=False) + json_data_hash: str = db.Column(db.String(255), nullable=False, index=True) + python_env_data_hash: str = db.Column(db.String(255), nullable=False, index=True) start_in_seconds: float = db.Column(db.DECIMAL(17, 6)) end_in_seconds: Union[float, None] = db.Column(db.DECIMAL(17, 6)) + def python_env_data(self) -> dict: + return JsonDataModel.find_data_dict_by_hash(self.python_env_data_hash) + + def json_data(self) -> dict: + return JsonDataModel.find_data_dict_by_hash(self.json_data_hash) + class Task: """Task.""" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index dd2b405a9..89cea4ae8 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -456,7 +456,16 @@ class ProcessInstanceProcessor: self.process_instance_model = process_instance_model self.process_model_service = ProcessModelService() bpmn_process_spec = None - self.full_bpmn_process_dict = {} + self.full_bpmn_process_dict: dict = {} + + # this caches the bpmn_process_definition_identifier and task_identifier back to the bpmn_process_id + # in the database. This is to cut down on database queries while adding new tasks to the database. + # Structure: + # { "bpmn_process_definition_identifier": { "task_identifier": task_definition } } + # To use from a spiff_task: + # [spiff_task.workflow.spec.name][spiff_task.task_spec.name] + self.bpmn_definition_to_task_definitions_mappings: dict = {} + subprocesses: Optional[IdToBpmnProcessSpecMapping] = None if process_instance_model.bpmn_process_definition_id is None: ( @@ -472,13 +481,15 @@ class ProcessInstanceProcessor: ) try: - (self.bpmn_process_instance, self.full_bpmn_process_dict) = ( - self.__get_bpmn_process_instance( - process_instance_model, - bpmn_process_spec, - validate_only, - subprocesses=subprocesses, - ) + ( + self.bpmn_process_instance, + self.full_bpmn_process_dict, + self.bpmn_definition_to_task_definitions_mappings, + ) = self.__get_bpmn_process_instance( + process_instance_model, + bpmn_process_spec, + validate_only, + subprocesses=subprocesses, ) self.set_script_engine(self.bpmn_process_instance) @@ -537,9 +548,29 @@ class ProcessInstanceProcessor: self.bpmn_process_instance ) + @classmethod + def _update_bpmn_definition_mappings( + cls, + bpmn_definition_to_task_definitions_mappings: dict, + bpmn_process_definition_identifier: str, + task_definition: TaskDefinitionModel, + ) -> None: + if ( + bpmn_process_definition_identifier + not in bpmn_definition_to_task_definitions_mappings + ): + bpmn_definition_to_task_definitions_mappings[ + bpmn_process_definition_identifier + ] = {} + bpmn_definition_to_task_definitions_mappings[ + bpmn_process_definition_identifier + ][task_definition.bpmn_identifier] = task_definition + @classmethod def _get_definition_dict_for_bpmn_process_definition( - cls, bpmn_process_definition: BpmnProcessDefinitionModel + cls, + bpmn_process_definition: BpmnProcessDefinitionModel, + bpmn_definition_to_task_definitions_mappings: dict, ) -> dict: task_definitions = TaskDefinitionModel.query.filter_by( bpmn_process_definition_id=bpmn_process_definition.id @@ -550,6 +581,11 @@ class ProcessInstanceProcessor: bpmn_process_definition_dict["task_specs"][ task_definition.bpmn_identifier ] = 
task_definition.properties_json + cls._update_bpmn_definition_mappings( + bpmn_definition_to_task_definitions_mappings, + bpmn_process_definition.bpmn_identifier, + task_definition, + ) return bpmn_process_definition_dict @classmethod @@ -557,6 +593,7 @@ class ProcessInstanceProcessor: cls, bpmn_process_definition: BpmnProcessDefinitionModel, spiff_bpmn_process_dict: dict, + bpmn_definition_to_task_definitions_mappings: dict, ) -> None: # find all child subprocesses of a process bpmn_process_subprocess_definitions = ( @@ -595,6 +632,11 @@ class ProcessInstanceProcessor: task_definition.bpmn_process_definition_id ] ) + cls._update_bpmn_definition_mappings( + bpmn_definition_to_task_definitions_mappings, + bpmn_subprocess_definition_bpmn_identifier, + task_definition, + ) spiff_bpmn_process_dict["subprocess_specs"][ bpmn_subprocess_definition_bpmn_identifier ]["task_specs"][ @@ -643,7 +685,9 @@ class ProcessInstanceProcessor: @classmethod def _get_full_bpmn_process_dict( - cls, process_instance_model: ProcessInstanceModel + cls, + process_instance_model: ProcessInstanceModel, + bpmn_definition_to_task_definitions_mappings: dict, ) -> dict: if process_instance_model.bpmn_process_definition_id is None: return {} @@ -658,11 +702,14 @@ class ProcessInstanceProcessor: if bpmn_process_definition is not None: spiff_bpmn_process_dict["spec"] = ( cls._get_definition_dict_for_bpmn_process_definition( - bpmn_process_definition + bpmn_process_definition, + bpmn_definition_to_task_definitions_mappings, ) ) cls._set_definition_dict_for_bpmn_subprocess_definitions( - bpmn_process_definition, spiff_bpmn_process_dict + bpmn_process_definition, + spiff_bpmn_process_dict, + bpmn_definition_to_task_definitions_mappings, ) bpmn_process = process_instance_model.bpmn_process @@ -729,8 +776,9 @@ class ProcessInstanceProcessor: spec: Optional[BpmnProcessSpec] = None, validate_only: bool = False, subprocesses: Optional[IdToBpmnProcessSpecMapping] = None, - ) -> BpmnWorkflow: + ) -> Tuple[BpmnWorkflow, dict, dict]: full_bpmn_process_dict = {} + bpmn_definition_to_task_definitions_mappings: dict = {} if process_instance_model.bpmn_process_definition_id is not None: # turn off logging to avoid duplicated spiff logs spiff_logger = logging.getLogger("spiff") @@ -740,7 +788,8 @@ class ProcessInstanceProcessor: try: full_bpmn_process_dict = ( ProcessInstanceProcessor._get_full_bpmn_process_dict( - process_instance_model + process_instance_model, + bpmn_definition_to_task_definitions_mappings, ) ) bpmn_process_instance = ( @@ -763,7 +812,11 @@ class ProcessInstanceProcessor: bpmn_process_instance.data[ ProcessInstanceProcessor.VALIDATION_PROCESS_KEY ] = validate_only - return (bpmn_process_instance, full_bpmn_process_dict) + return ( + bpmn_process_instance, + full_bpmn_process_dict, + bpmn_definition_to_task_definitions_mappings, + ) def slam_in_data(self, data: dict) -> None: """Slam_in_data.""" @@ -1025,6 +1078,7 @@ class ProcessInstanceProcessor: self, process_bpmn_properties: dict, bpmn_process_definition_parent: Optional[BpmnProcessDefinitionModel] = None, + store_bpmn_definition_mappings: bool = False, ) -> BpmnProcessDefinitionModel: process_bpmn_identifier = process_bpmn_properties["name"] new_hash_digest = sha256( @@ -1033,6 +1087,7 @@ class ProcessInstanceProcessor: bpmn_process_definition: Optional[BpmnProcessDefinitionModel] = ( BpmnProcessDefinitionModel.query.filter_by(hash=new_hash_digest).first() ) + if bpmn_process_definition is None: task_specs = process_bpmn_properties.pop("task_specs") 
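For orientation, a minimal, self-contained sketch of the lookup structure that bpmn_definition_to_task_definitions_mappings builds up. The identifiers come from the tests further down in this diff; the dataclass is only a stand-in for TaskDefinitionModel, so this is illustrative rather than the actual model code.

from dataclasses import dataclass

@dataclass
class StubTaskDefinition:  # stand-in for TaskDefinitionModel
    id: int
    bpmn_identifier: str

def update_mappings(mappings: dict, process_identifier: str, task_definition: StubTaskDefinition) -> None:
    # outer key: bpmn process definition identifier (spiff_task.workflow.spec.name)
    # inner key: task spec identifier (spiff_task.task_spec.name)
    mappings.setdefault(process_identifier, {})[task_definition.bpmn_identifier] = task_definition

mappings: dict = {}
update_mappings(mappings, "top_level_process", StubTaskDefinition(1, "top_level_script"))
update_mappings(mappings, "top_level_subprocess", StubTaskDefinition(2, "top_level_subprocess_script"))

# resolving a definition for a spiff task then becomes a dict lookup instead of a query per task:
assert mappings["top_level_process"]["top_level_script"].id == 1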
bpmn_process_definition = BpmnProcessDefinitionModel( @@ -1050,6 +1105,24 @@ class ProcessInstanceProcessor: typename=task_bpmn_properties["typename"], ) db.session.add(task_definition) + if store_bpmn_definition_mappings: + self._update_bpmn_definition_mappings( + self.bpmn_definition_to_task_definitions_mappings, + process_bpmn_identifier, + task_definition, + ) + elif store_bpmn_definition_mappings: + # this should only ever happen when new process instances use a pre-existing bpmn process definitions + # otherwise this should get populated on processor initialization + task_definitions = TaskDefinitionModel.query.filter_by( + bpmn_process_definition_id=bpmn_process_definition.id + ).all() + for task_definition in task_definitions: + self._update_bpmn_definition_mappings( + self.bpmn_definition_to_task_definitions_mappings, + process_bpmn_identifier, + task_definition, + ) if bpmn_process_definition_parent is not None: bpmn_process_definition_relationship = ( @@ -1067,12 +1140,19 @@ class ProcessInstanceProcessor: return bpmn_process_definition def _add_bpmn_process_definitions(self, bpmn_spec_dict: dict) -> None: + # store only if mappings is currently empty. this also would mean this is a new instance that has never saved before + store_bpmn_definition_mappings = ( + not self.bpmn_definition_to_task_definitions_mappings + ) bpmn_process_definition_parent = self._store_bpmn_process_definition( - bpmn_spec_dict["spec"] + bpmn_spec_dict["spec"], + store_bpmn_definition_mappings=store_bpmn_definition_mappings, ) for process_bpmn_properties in bpmn_spec_dict["subprocess_specs"].values(): self._store_bpmn_process_definition( - process_bpmn_properties, bpmn_process_definition_parent + process_bpmn_properties, + bpmn_process_definition_parent, + store_bpmn_definition_mappings=store_bpmn_definition_mappings, ) self.process_instance_model.bpmn_process_definition = ( bpmn_process_definition_parent @@ -1083,7 +1163,7 @@ class ProcessInstanceProcessor: Expects the save method to commit it. 
""" - bpmn_dict = json.loads(self.serialize()) + bpmn_dict = self.serialize() bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version") process_instance_data_dict = {} bpmn_spec_dict = {} @@ -1093,14 +1173,18 @@ class ProcessInstanceProcessor: else: process_instance_data_dict[bpmn_key] = bpmn_dict[bpmn_key] - # FIXME: always save new hash until we get updated Spiff without loopresettask - # if self.process_instance_model.bpmn_process_definition_id is None: - self._add_bpmn_process_definitions(bpmn_spec_dict) + # we may have to already process bpmn_defintions if we ever care about the Root task again + if self.process_instance_model.bpmn_process_definition_id is None: + self._add_bpmn_process_definitions(bpmn_spec_dict) subprocesses = process_instance_data_dict.pop("subprocesses") bpmn_process_parent, new_task_models, new_json_data_dicts = ( TaskService.add_bpmn_process( - process_instance_data_dict, self.process_instance_model + bpmn_process_dict=process_instance_data_dict, + process_instance=self.process_instance_model, + bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, + spiff_workflow=self.bpmn_process_instance, + serializer=self._serializer, ) ) for subprocess_task_id, subprocess_properties in subprocesses.items(): @@ -1109,10 +1193,13 @@ class ProcessInstanceProcessor: subprocess_new_task_models, subprocess_new_json_data_models, ) = TaskService.add_bpmn_process( - subprocess_properties, - self.process_instance_model, - bpmn_process_parent, + bpmn_process_dict=subprocess_properties, + process_instance=self.process_instance_model, + bpmn_process_parent=bpmn_process_parent, bpmn_process_guid=subprocess_task_id, + bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, + spiff_workflow=self.bpmn_process_instance, + serializer=self._serializer, ) new_task_models.update(subprocess_new_task_models) new_json_data_dicts.update(subprocess_new_json_data_models) @@ -1631,6 +1718,7 @@ class ProcessInstanceProcessor: secondary_engine_step_delegate=step_delegate, serializer=self._serializer, process_instance=self.process_instance_model, + bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, ) if execution_strategy_name is None: @@ -1722,11 +1810,11 @@ class ProcessInstanceProcessor: ) ) - def serialize(self) -> str: + def serialize(self) -> dict: """Serialize.""" self.check_task_data_size() self.preserve_script_engine_state() - return self._serializer.serialize_json(self.bpmn_process_instance) # type: ignore + return self._serializer.workflow_to_dict(self.bpmn_process_instance) # type: ignore def next_user_tasks(self) -> list[SpiffTask]: """Next_user_tasks.""" @@ -1870,18 +1958,19 @@ class ProcessInstanceProcessor: db.session.add(details_model) # ####### - json_data_dict = TaskService.update_task_model( + json_data_dict_list = TaskService.update_task_model( task_model, spiff_task, self._serializer ) - if json_data_dict is not None: - json_data = ( - db.session.query(JsonDataModel.id) - .filter_by(hash=json_data_dict["hash"]) - .first() - ) - if json_data is None: - json_data = JsonDataModel(**json_data_dict) - db.session.add(json_data) + for json_data_dict in json_data_dict_list: + if json_data_dict is not None: + json_data = ( + db.session.query(JsonDataModel.id) + .filter_by(hash=json_data_dict["hash"]) + .first() + ) + if json_data is None: + json_data = JsonDataModel(**json_data_dict) + db.session.add(json_data) # this is the thing that actually commits the 
db transaction (on behalf of the other updates above as well) self.save() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index dbd0a912d..e6ae791ee 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -3,6 +3,7 @@ from hashlib import sha256 from typing import Optional from typing import Tuple from typing import TypedDict +from uuid import UUID from flask import current_app from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow # type: ignore @@ -25,6 +26,8 @@ class JsonDataDict(TypedDict): class TaskService: + PYTHON_ENVIRONMENT_STATE_KEY = "spiff__python_env_state" + @classmethod def insert_or_update_json_data_records( cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict] @@ -35,7 +38,7 @@ class TaskService: if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "mysql": insert_stmt = mysql_insert(JsonDataModel).values(list_of_dicts) on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update( - data=insert_stmt.inserted.data, status="U" + data=insert_stmt.inserted.data ) else: insert_stmt = postgres_insert(JsonDataModel).values(list_of_dicts) @@ -44,25 +47,13 @@ class TaskService: ) db.session.execute(on_duplicate_key_stmt) - @classmethod - def _update_task_data_on_task_model( - cls, task_model: TaskModel, task_data_dict: dict - ) -> Optional[JsonDataDict]: - task_data_json = json.dumps(task_data_dict, sort_keys=True) - task_data_hash: str = sha256(task_data_json.encode("utf8")).hexdigest() - json_data_dict: Optional[JsonDataDict] = None - if task_model.json_data_hash != task_data_hash: - json_data_dict = {"hash": task_data_hash, "data": task_data_dict} - task_model.json_data_hash = task_data_hash - return json_data_dict - @classmethod def update_task_model( cls, task_model: TaskModel, spiff_task: SpiffTask, serializer: BpmnWorkflowSerializer, - ) -> Optional[JsonDataDict]: + ) -> list[Optional[JsonDataDict]]: """Updates properties_json and data on given task_model. This will NOT update start_in_seconds or end_in_seconds. 
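A minimal, runnable sketch of the content-addressed storage pattern used here for task data and python environment data, assuming only what the diff shows (a sha256 over a sorted-key JSON dump, with rows keyed by that hash); the in-memory dict stands in for the json_data table.

import json
from hashlib import sha256

json_data_rows: dict[str, dict] = {}  # hash -> data, mimicking JsonDataModel rows

def store_json_data(data: dict) -> str:
    # identical dicts always serialize identically because keys are sorted
    serialized = json.dumps(data, sort_keys=True)
    data_hash = sha256(serialized.encode("utf8")).hexdigest()
    # insert-or-update keyed on the hash: duplicate payloads collapse into one row
    json_data_rows[data_hash] = data
    return data_hash

# a TaskModel would store both of these hashes and resolve them later
# via JsonDataModel.find_data_dict_by_hash(...)
json_data_hash = store_json_data({"we_move_on": False})
python_env_data_hash = store_json_data({"set_in_top_level_script": 1})
assert json_data_rows[json_data_hash] == {"we_move_on": False}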
@@ -70,12 +61,18 @@ class TaskService: """ new_properties_json = serializer.task_to_dict(spiff_task) spiff_task_data = new_properties_json.pop("data") + python_env_data_dict = cls._get_python_env_data_dict_from_spiff_task( + spiff_task, serializer + ) task_model.properties_json = new_properties_json task_model.state = TaskStateNames[new_properties_json["state"]] json_data_dict = cls._update_task_data_on_task_model( - task_model, spiff_task_data + task_model, spiff_task_data, "json_data_hash" ) - return json_data_dict + python_env_dict = cls._update_task_data_on_task_model( + task_model, python_env_data_dict, "python_env_data_hash" + ) + return [json_data_dict, python_env_dict] @classmethod def find_or_create_task_model_from_spiff_task( @@ -83,6 +80,7 @@ class TaskService: spiff_task: SpiffTask, process_instance: ProcessInstanceModel, serializer: BpmnWorkflowSerializer, + bpmn_definition_to_task_definitions_mappings: dict, ) -> Tuple[ Optional[BpmnProcessModel], TaskModel, @@ -98,12 +96,21 @@ class TaskService: new_json_data_dicts: dict[str, JsonDataDict] = {} if task_model is None: bpmn_process, new_task_models, new_json_data_dicts = cls.task_bpmn_process( - spiff_task, process_instance, serializer + spiff_task, + process_instance, + serializer, + bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings, ) task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first() if task_model is None: + task_definition = bpmn_definition_to_task_definitions_mappings[ + spiff_task.workflow.spec.name + ][spiff_task.task_spec.name] task_model = TaskModel( - guid=spiff_task_guid, bpmn_process_id=bpmn_process.id + guid=spiff_task_guid, + bpmn_process_id=bpmn_process.id, + process_instance_id=process_instance.id, + task_definition_id=task_definition.id, ) return (bpmn_process, task_model, new_task_models, new_json_data_dicts) @@ -130,6 +137,7 @@ class TaskService: spiff_task: SpiffTask, process_instance: ProcessInstanceModel, serializer: BpmnWorkflowSerializer, + bpmn_definition_to_task_definitions_mappings: dict, ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: subprocess_guid, subprocess = cls.task_subprocess(spiff_task) bpmn_process: Optional[BpmnProcessModel] = None @@ -140,12 +148,14 @@ class TaskService: # This is the top level workflow, which has no guid # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None if process_instance.bpmn_process_id is None: + spiff_workflow = spiff_task.workflow._get_outermost_workflow() bpmn_process, new_task_models, new_json_data_dicts = ( cls.add_bpmn_process( - serializer.workflow_to_dict( - spiff_task.workflow._get_outermost_workflow() - ), - process_instance, + bpmn_process_dict=serializer.workflow_to_dict(spiff_workflow), + process_instance=process_instance, + bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings, + spiff_workflow=spiff_workflow, + serializer=serializer, ) ) else: @@ -153,12 +163,16 @@ class TaskService: guid=subprocess_guid ).first() if bpmn_process is None: + spiff_workflow = spiff_task.workflow bpmn_process, new_task_models, new_json_data_dicts = ( cls.add_bpmn_process( - serializer.workflow_to_dict(subprocess), - process_instance, - process_instance.bpmn_process, - subprocess_guid, + bpmn_process_dict=serializer.workflow_to_dict(subprocess), + process_instance=process_instance, + bpmn_process_parent=process_instance.bpmn_process, + bpmn_process_guid=subprocess_guid, + 
bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings, + spiff_workflow=spiff_workflow, + serializer=serializer, ) ) return (bpmn_process, new_task_models, new_json_data_dicts) @@ -168,6 +182,9 @@ class TaskService: cls, bpmn_process_dict: dict, process_instance: ProcessInstanceModel, + bpmn_definition_to_task_definitions_mappings: dict, + spiff_workflow: BpmnWorkflow, + serializer: BpmnWorkflowSerializer, bpmn_process_parent: Optional[BpmnProcessModel] = None, bpmn_process_guid: Optional[str] = None, ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: @@ -195,6 +212,12 @@ class TaskService: bpmn_process_is_new = True bpmn_process = BpmnProcessModel(guid=bpmn_process_guid) + # Point the root id to the Start task instead of the Root task + # since we are ignoring the Root task. + for task_id, task_properties in tasks.items(): + if task_properties["task_spec"] == "Start": + bpmn_process_dict["root"] = task_id + bpmn_process.properties_json = bpmn_process_dict bpmn_process_data_json = json.dumps(bpmn_process_data_dict, sort_keys=True) @@ -219,29 +242,85 @@ class TaskService: if bpmn_process_is_new: for task_id, task_properties in tasks.items(): + # The Root task is added to the spec by Spiff when the bpmn process is instantiated + # within Spiff. We do not actually need it and it's missing from our initial + # bpmn process defintion so let's avoid using it. + if task_properties["task_spec"] == "Root": + continue + if task_properties["task_spec"] == "Start": + task_properties["parent"] = None + task_data_dict = task_properties.pop("data") state_int = task_properties["state"] + spiff_task = spiff_workflow.get_task(UUID(task_id)) task_model = TaskModel.query.filter_by(guid=task_id).first() if task_model is None: - # bpmn_process_identifier = task_properties['workflow_name'] - # bpmn_identifier = task_properties['task_spec'] - # - # task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier) - # .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first() - # if task_definition is None: - # subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid) - task_model = TaskModel( - guid=task_id, bpmn_process_id=bpmn_process.id + task_model = cls._create_task( + bpmn_process, + process_instance, + spiff_task, + bpmn_definition_to_task_definitions_mappings, ) task_model.state = TaskStateNames[state_int] task_model.properties_json = task_properties + new_task_models[task_model.guid] = task_model json_data_dict = TaskService._update_task_data_on_task_model( - task_model, task_data_dict + task_model, task_data_dict, "json_data_hash" ) - new_task_models[task_model.guid] = task_model if json_data_dict is not None: new_json_data_dicts[json_data_dict["hash"]] = json_data_dict + python_env_data_dict = cls._get_python_env_data_dict_from_spiff_task( + spiff_task, serializer + ) + python_env_dict = TaskService._update_task_data_on_task_model( + task_model, python_env_data_dict, "python_env_data_hash" + ) + if python_env_dict is not None: + new_json_data_dicts[python_env_dict["hash"]] = python_env_dict + return (bpmn_process, new_task_models, new_json_data_dicts) + + @classmethod + def _update_task_data_on_task_model( + cls, task_model: TaskModel, task_data_dict: dict, task_model_data_column: str + ) -> Optional[JsonDataDict]: + task_data_json = json.dumps(task_data_dict, sort_keys=True) + task_data_hash: str = sha256(task_data_json.encode("utf8")).hexdigest() + 
json_data_dict: Optional[JsonDataDict] = None + if getattr(task_model, task_model_data_column) != task_data_hash: + json_data_dict = {"hash": task_data_hash, "data": task_data_dict} + setattr(task_model, task_model_data_column, task_data_hash) + return json_data_dict + + @classmethod + def _create_task( + cls, + bpmn_process: BpmnProcessModel, + process_instance: ProcessInstanceModel, + spiff_task: SpiffTask, + bpmn_definition_to_task_definitions_mappings: dict, + ) -> TaskModel: + task_definition = bpmn_definition_to_task_definitions_mappings[ + spiff_task.workflow.spec.name + ][spiff_task.task_spec.name] + task_model = TaskModel( + guid=str(spiff_task.id), + bpmn_process_id=bpmn_process.id, + process_instance_id=process_instance.id, + task_definition_id=task_definition.id, + ) + return task_model + + @classmethod + def _get_python_env_data_dict_from_spiff_task( + cls, spiff_task: SpiffTask, serializer: BpmnWorkflowSerializer + ) -> dict: + user_defined_state = ( + spiff_task.workflow.script_engine.environment.user_defined_state() + ) + # this helps to convert items like datetime objects to be json serializable + converted_data: dict = serializer.data_converter.convert(user_defined_state) + return converted_data diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 386f20543..63a54baeb 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -57,10 +57,14 @@ class TaskModelSavingDelegate(EngineStepDelegate): self, serializer: BpmnWorkflowSerializer, process_instance: ProcessInstanceModel, + bpmn_definition_to_task_definitions_mappings: dict, secondary_engine_step_delegate: Optional[EngineStepDelegate] = None, ) -> None: self.secondary_engine_step_delegate = secondary_engine_step_delegate self.process_instance = process_instance + self.bpmn_definition_to_task_definitions_mappings = ( + bpmn_definition_to_task_definitions_mappings + ) self.current_task_model: Optional[TaskModel] = None self.task_models: dict[str, TaskModel] = {} @@ -74,11 +78,21 @@ class TaskModelSavingDelegate(EngineStepDelegate): """ return self.process_instance.bpmn_process_id is not None + def _update_json_data_dicts_using_list( + self, json_data_dict_list: list[Optional[JsonDataDict]] + ) -> None: + for json_data_dict in json_data_dict_list: + if json_data_dict is not None: + self.json_data_dicts[json_data_dict["hash"]] = json_data_dict + def will_complete_task(self, spiff_task: SpiffTask) -> None: if self.should_update_task_model(): _bpmn_process, task_model, new_task_models, new_json_data_dicts = ( TaskService.find_or_create_task_model_from_spiff_task( - spiff_task, self.process_instance, self.serializer + spiff_task, + self.process_instance, + self.serializer, + bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, ) ) self.current_task_model = task_model @@ -91,11 +105,10 @@ class TaskModelSavingDelegate(EngineStepDelegate): def did_complete_task(self, spiff_task: SpiffTask) -> None: if self.current_task_model and self.should_update_task_model(): self.current_task_model.end_in_seconds = time.time() - json_data_dict = TaskService.update_task_model( + json_data_dict_list = TaskService.update_task_model( self.current_task_model, spiff_task, self.serializer ) - if json_data_dict is not None: - 
self.json_data_dicts[json_data_dict["hash"]] = json_data_dict + self._update_json_data_dicts_using_list(json_data_dict_list) self.task_models[self.current_task_model.guid] = self.current_task_model if self.secondary_engine_step_delegate: self.secondary_engine_step_delegate.did_complete_task(spiff_task) @@ -119,19 +132,21 @@ class TaskModelSavingDelegate(EngineStepDelegate): | TaskState.MAYBE | TaskState.LIKELY ): - _bpmn_process, task_model, new_task_models, new_json_data_dicts = ( + bpmn_process, task_model, new_task_models, new_json_data_dicts = ( TaskService.find_or_create_task_model_from_spiff_task( - waiting_spiff_task, self.process_instance, self.serializer + waiting_spiff_task, + self.process_instance, + self.serializer, + bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings, ) ) self.task_models.update(new_task_models) self.json_data_dicts.update(new_json_data_dicts) - json_data_dict = TaskService.update_task_model( + json_data_dict_list = TaskService.update_task_model( task_model, waiting_spiff_task, self.serializer ) self.task_models[task_model.guid] = task_model - if json_data_dict is not None: - self.json_data_dicts[json_data_dict["hash"]] = json_data_dict + self._update_json_data_dicts_using_list(json_data_dict_list) class StepDetailLoggingDelegate(EngineStepDelegate): diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py similarity index 99% rename from spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling.py rename to spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py index 9d481788f..440604495 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py @@ -99,6 +99,7 @@ class TestErrorHandlingService(BaseTest): # Both send and receive messages should be generated, matched # and considered complete. 
messages = db.session.query(MessageInstanceModel).all() + # import pdb; pdb.set_trace() assert 2 == len(messages) assert "completed" == messages[0].status assert "completed" == messages[1].status diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 3452dcf13..827a3b3de 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -358,41 +358,62 @@ class TestProcessInstanceProcessor(BaseTest): processor_final = ProcessInstanceProcessor(process_instance_relookup) assert process_instance_relookup.status == "complete" - # first_data_set = {"set_in_top_level_script": 1} - # second_data_set = {**first_data_set, **{"set_in_top_level_subprocess": 1}} - # third_data_set = { - # **second_data_set, - # **{"set_in_test_process_to_call_script": 1}, - # } - # expected_task_data = { - # "top_level_script": first_data_set, - # "manual_task": first_data_set, - # "top_level_subprocess_script": second_data_set, - # "top_level_subprocess": second_data_set, - # "test_process_to_call_script": third_data_set, - # "top_level_call_activity": third_data_set, - # "end_event_of_manual_task_model": third_data_set, - # } + first_data_set = {"set_in_top_level_script": 1} + second_data_set = { + **first_data_set, + **{"set_in_top_level_subprocess": 1, "we_move_on": False}, + } + third_data_set = { + **second_data_set, + **{"set_in_test_process_to_call_script": 1}, + } + fourth_data_set = {**third_data_set, **{"a": 1, "we_move_on": True}} + expected_task_data = { + "top_level_script": first_data_set, + "manual_task": first_data_set, + "top_level_subprocess_script": second_data_set, + "top_level_subprocess": second_data_set, + "test_process_to_call_script": third_data_set, + "top_level_call_activity": third_data_set, + "end_event_of_manual_task_model": third_data_set, + "top_level_subprocess_script_second": fourth_data_set, + "test_process_to_call_script_second": fourth_data_set, + } + + spiff_tasks_checked_once: list = [] + + # TODO: also check task data here from the spiff_task directly to ensure we hydrated spiff correctly + def assert_spiff_task_is_in_process( + spiff_task_name: str, bpmn_process_identifier: str + ) -> None: + if spiff_task.task_spec.name == spiff_task_name: + expected_python_env_data = expected_task_data[spiff_task.task_spec.name] + if spiff_task.task_spec.name in spiff_tasks_checked_once: + expected_python_env_data = expected_task_data[ + f"{spiff_task.task_spec.name}_second" + ] + task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() + assert task.task_definition_id is not None + task_definition = task.task_definition + assert task_definition.bpmn_identifier == spiff_task_name + assert ( + task_definition.bpmn_process_definition.bpmn_identifier + == bpmn_process_identifier + ) + assert task.python_env_data() == expected_python_env_data + spiff_tasks_checked_once.append(spiff_task.task_spec.name) all_spiff_tasks = processor_final.bpmn_process_instance.get_tasks() assert len(all_spiff_tasks) > 1 for spiff_task in all_spiff_tasks: assert spiff_task.state == TaskState.COMPLETED - # FIXME: Checking task data cannot work with the feature/remove-loop-reset branch - # of SiffWorkflow. This is because it saves script data to the python_env and NOT - # to task.data. 
We may need to either create a new column on TaskModel to put the python_env - # data or we could just shove it back onto the task data when adding to the database. - # Right now everything works in practice because the python_env data is on the top level workflow - # and so is always there but is also always the most recent. If we want to replace spiff_step_details - # with TaskModel then we'll need some way to store python_env on each task. - # spiff_task_name = spiff_task.task_spec.name - # if spiff_task_name in expected_task_data: - # spiff_task_data = expected_task_data[spiff_task_name] - # failure_message = ( - # f"Found unexpected task data on {spiff_task_name}. " - # f"Expected: {spiff_task_data}, Found: {spiff_task.data}" - # ) - # assert spiff_task.data == spiff_task_data, failure_message + assert_spiff_task_is_in_process( + "test_process_to_call_script", "test_process_to_call" + ) + assert_spiff_task_is_in_process( + "top_level_subprocess_script", "top_level_subprocess" + ) + assert_spiff_task_is_in_process("top_level_script", "top_level_process") def test_does_not_recreate_human_tasks_on_multiple_saves( self,
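For reference when reading the assertions above, a short sketch of how a task's stored data resolves through the new JsonDataModel helpers added in this change. The helper function name is hypothetical and the Flask app/session setup is assumed to exist as in the other backend tests, so this is illustrative rather than a drop-in test.

from spiffworkflow_backend.models.json_data import JsonDataModel, JsonDataModelNotFoundError
from spiffworkflow_backend.models.task import TaskModel

def python_env_data_for_task_guid(task_guid: str) -> dict:
    # TaskModel now stores two content hashes; each resolves to a JsonDataModel row.
    task = TaskModel.query.filter_by(guid=task_guid).first()
    if task is None:
        raise ValueError(f"no task found for guid {task_guid}")
    try:
        # equivalent to task.python_env_data(), shown long-hand for clarity
        return JsonDataModel.find_data_dict_by_hash(task.python_env_data_hash)
    except JsonDataModelNotFoundError:
        # the hash is set but the corresponding json_data row was never committed
        return {}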