diff --git a/spiffworkflow-backend/migrations/env.py b/spiffworkflow-backend/migrations/env.py index 630e381a..68feded2 100644 --- a/spiffworkflow-backend/migrations/env.py +++ b/spiffworkflow-backend/migrations/env.py @@ -1,3 +1,5 @@ +from __future__ import with_statement + import logging from logging.config import fileConfig diff --git a/spiffworkflow-backend/migrations/versions/389800c352ee_.py b/spiffworkflow-backend/migrations/versions/99f1b5156b06_.py similarity index 92% rename from spiffworkflow-backend/migrations/versions/389800c352ee_.py rename to spiffworkflow-backend/migrations/versions/99f1b5156b06_.py index bfcf5da8..9407aeaf 100644 --- a/spiffworkflow-backend/migrations/versions/389800c352ee_.py +++ b/spiffworkflow-backend/migrations/versions/99f1b5156b06_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 389800c352ee +Revision ID: 99f1b5156b06 Revises: -Create Date: 2023-03-07 10:40:43.709777 +Create Date: 2023-03-14 17:23:22.667853 """ from alembic import op @@ -10,7 +10,7 @@ import sqlalchemy as sa from sqlalchemy.dialects import mysql # revision identifiers, used by Alembic. -revision = '389800c352ee' +revision = '99f1b5156b06' down_revision = None branch_labels = None depends_on = None @@ -166,8 +166,6 @@ def upgrade(): sa.Column('bpmn_version_control_type', sa.String(length=50), nullable=True), sa.Column('bpmn_version_control_identifier', sa.String(length=255), nullable=True), sa.Column('spiff_step', sa.Integer(), nullable=True), - sa.Column('locked_by', sa.String(length=80), nullable=True), - sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True), sa.ForeignKeyConstraint(['bpmn_process_definition_id'], ['bpmn_process_definition.id'], ), sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ), sa.ForeignKeyConstraint(['process_initiator_id'], ['user.id'], ), @@ -207,20 +205,6 @@ def upgrade(): sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('key') ) - op.create_table('task', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('guid', sa.String(length=36), nullable=False), - sa.Column('bpmn_process_id', sa.Integer(), nullable=False), - sa.Column('state', sa.String(length=10), nullable=False), - sa.Column('properties_json', sa.JSON(), nullable=False), - sa.Column('json_data_hash', sa.String(length=255), nullable=False), - sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True), - sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True), - sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ), - sa.PrimaryKeyConstraint('id') - ) - op.create_index(op.f('ix_task_guid'), 'task', ['guid'], unique=True) - op.create_index(op.f('ix_task_json_data_hash'), 'task', ['json_data_hash'], unique=False) op.create_table('task_definition', sa.Column('id', sa.Integer(), nullable=False), sa.Column('bpmn_process_definition_id', sa.Integer(), nullable=False), @@ -284,7 +268,7 @@ def upgrade(): sa.Column('payload', sa.JSON(), nullable=True), sa.Column('correlation_keys', sa.JSON(), nullable=True), sa.Column('status', sa.String(length=20), nullable=False), - sa.Column('user_id', sa.Integer(), nullable=False), + sa.Column('user_id', sa.Integer(), nullable=True), sa.Column('counterpart_id', sa.Integer(), nullable=True), sa.Column('failure_cause', sa.Text(), nullable=True), sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True), @@ -331,6 +315,23 @@ def upgrade(): sa.UniqueConstraint('process_instance_id', 'key', name='process_instance_metadata_unique') ) op.create_index(op.f('ix_process_instance_metadata_key'), 'process_instance_metadata', ['key'], unique=False) + op.create_table('process_instance_queue', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('process_instance_id', sa.Integer(), nullable=False), + sa.Column('run_at_in_seconds', sa.Integer(), nullable=True), + sa.Column('priority', sa.Integer(), nullable=True), + sa.Column('locked_by', sa.String(length=80), nullable=True), + sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True), + sa.Column('status', sa.String(length=50), nullable=True), + sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True), + sa.Column('created_at_in_seconds', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), 'process_instance_queue', ['locked_at_in_seconds'], unique=False) + op.create_index(op.f('ix_process_instance_queue_locked_by'), 'process_instance_queue', ['locked_by'], unique=False) + op.create_index(op.f('ix_process_instance_queue_process_instance_id'), 'process_instance_queue', ['process_instance_id'], unique=True) + op.create_index(op.f('ix_process_instance_queue_status'), 'process_instance_queue', ['status'], unique=False) op.create_table('spiff_step_details', sa.Column('id', sa.Integer(), nullable=False), sa.Column('process_instance_id', sa.Integer(), nullable=False), @@ -346,6 +347,24 @@ def upgrade(): sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('process_instance_id', 'spiff_step', name='process_instance_id_spiff_step') ) + op.create_table('task', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('guid', sa.String(length=36), nullable=False), + sa.Column('bpmn_process_id', sa.Integer(), nullable=False), + sa.Column('process_instance_id', sa.Integer(), nullable=False), + sa.Column('task_definition_id', sa.Integer(), nullable=False), + sa.Column('state', sa.String(length=10), nullable=False), + sa.Column('properties_json', sa.JSON(), nullable=False), + sa.Column('json_data_hash', sa.String(length=255), nullable=False), + sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True), + sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True), + sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ), + sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ), + sa.ForeignKeyConstraint(['task_definition_id'], ['task_definition.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index(op.f('ix_task_guid'), 'task', ['guid'], unique=True) + op.create_index(op.f('ix_task_json_data_hash'), 'task', ['json_data_hash'], unique=False) op.create_table('human_task_user', sa.Column('id', sa.Integer(), nullable=False), sa.Column('human_task_id', sa.Integer(), nullable=False), @@ -379,7 +398,15 @@ def downgrade(): op.drop_index(op.f('ix_human_task_user_user_id'), table_name='human_task_user') op.drop_index(op.f('ix_human_task_user_human_task_id'), table_name='human_task_user') op.drop_table('human_task_user') + op.drop_index(op.f('ix_task_json_data_hash'), table_name='task') + op.drop_index(op.f('ix_task_guid'), table_name='task') + op.drop_table('task') op.drop_table('spiff_step_details') + op.drop_index(op.f('ix_process_instance_queue_status'), table_name='process_instance_queue') + op.drop_index(op.f('ix_process_instance_queue_process_instance_id'), table_name='process_instance_queue') + op.drop_index(op.f('ix_process_instance_queue_locked_by'), table_name='process_instance_queue') + op.drop_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), table_name='process_instance_queue') + op.drop_table('process_instance_queue') op.drop_index(op.f('ix_process_instance_metadata_key'), table_name='process_instance_metadata') op.drop_table('process_instance_metadata') op.drop_index(op.f('ix_process_instance_file_data_digest'), table_name='process_instance_file_data') @@ -392,9 +419,6 @@ def downgrade(): op.drop_table('user_group_assignment') op.drop_index(op.f('ix_task_definition_bpmn_identifier'), table_name='task_definition') op.drop_table('task_definition') - op.drop_index(op.f('ix_task_json_data_hash'), table_name='task') - op.drop_index(op.f('ix_task_guid'), table_name='task') - op.drop_table('task') op.drop_table('secret') op.drop_table('refresh_token') op.drop_index(op.f('ix_process_instance_report_identifier'), table_name='process_instance_report') diff --git a/spiffworkflow-backend/migrations/versions/e2972eaf8469_.py b/spiffworkflow-backend/migrations/versions/e2972eaf8469_.py deleted file mode 100644 index f1796bfb..00000000 --- a/spiffworkflow-backend/migrations/versions/e2972eaf8469_.py +++ /dev/null @@ -1,58 +0,0 @@ -"""empty message - -Revision ID: e2972eaf8469 -Revises: 389800c352ee -Create Date: 2023-03-13 22:00:21.579493 - -""" -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import mysql - -# revision identifiers, used by Alembic. -revision = 'e2972eaf8469' -down_revision = '389800c352ee' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.create_table('process_instance_queue', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('process_instance_id', sa.Integer(), nullable=False), - sa.Column('run_at_in_seconds', sa.Integer(), nullable=True), - sa.Column('priority', sa.Integer(), nullable=True), - sa.Column('locked_by', sa.String(length=80), nullable=True), - sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True), - sa.Column('status', sa.String(length=50), nullable=True), - sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True), - sa.Column('created_at_in_seconds', sa.Integer(), nullable=True), - sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ), - sa.PrimaryKeyConstraint('id') - ) - op.create_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), 'process_instance_queue', ['locked_at_in_seconds'], unique=False) - op.create_index(op.f('ix_process_instance_queue_locked_by'), 'process_instance_queue', ['locked_by'], unique=False) - op.create_index(op.f('ix_process_instance_queue_process_instance_id'), 'process_instance_queue', ['process_instance_id'], unique=True) - op.create_index(op.f('ix_process_instance_queue_status'), 'process_instance_queue', ['status'], unique=False) - op.alter_column('message_instance', 'user_id', - existing_type=mysql.INTEGER(), - nullable=True) - op.drop_column('process_instance', 'locked_by') - op.drop_column('process_instance', 'locked_at_in_seconds') - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.add_column('process_instance', sa.Column('locked_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True)) - op.add_column('process_instance', sa.Column('locked_by', mysql.VARCHAR(length=80), nullable=True)) - op.alter_column('message_instance', 'user_id', - existing_type=mysql.INTEGER(), - nullable=False) - op.drop_index(op.f('ix_process_instance_queue_status'), table_name='process_instance_queue') - op.drop_index(op.f('ix_process_instance_queue_process_instance_id'), table_name='process_instance_queue') - op.drop_index(op.f('ix_process_instance_queue_locked_by'), table_name='process_instance_queue') - op.drop_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), table_name='process_instance_queue') - op.drop_table('process_instance_queue') - # ### end Alembic commands ### diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py index b35c8759..14746d6f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py @@ -1,5 +1,8 @@ """Task.""" import enum + +from sqlalchemy.orm import relationship +from spiffworkflow_backend.models.task_definition import TaskDefinitionModel from dataclasses import dataclass from typing import Any from typing import Optional @@ -45,11 +48,16 @@ class TaskModel(SpiffworkflowBaseDBModel): bpmn_process_id: int = db.Column( ForeignKey(BpmnProcessModel.id), nullable=False # type: ignore ) + process_instance_id: int = db.Column( + ForeignKey("process_instance.id"), nullable=False + ) # find this by looking up the "workflow_name" and "task_spec" from the properties_json - # task_definition_id: int = db.Column( - # ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore - # ) + task_definition_id: int = db.Column( + ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore + ) + task_definition = relationship("TaskDefinitionModel") + state: str = db.Column(db.String(10), nullable=False) properties_json: dict = db.Column(db.JSON, nullable=False) json_data_hash: str = db.Column(db.String(255), nullable=False, index=True) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index dd2b405a..b50ece2f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -53,6 +53,7 @@ from SpiffWorkflow.task import TaskState from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore from spiffworkflow_backend.exceptions.api_error import ApiError +from spiffworkflow_backend.models import task_definition from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.bpmn_process_definition import ( BpmnProcessDefinitionModel, @@ -457,6 +458,15 @@ class ProcessInstanceProcessor: self.process_model_service = ProcessModelService() bpmn_process_spec = None self.full_bpmn_process_dict = {} + + # this caches the bpmn_process_definition_identifier and task_identifier back to the bpmn_process_id + # in the database. This is to cut down on database queries while adding new tasks to the database. + # Structure: + # { "bpmn_process_definition_identifier": { "task_identifier": bpmn_process_id } } + # To use from a spiff_task: + # [spiff_task.workflow.spec.name][spiff_task.task_spec.name] + self.bpmn_definition_identifiers_to_bpmn_process_id_mappings = {} + subprocesses: Optional[IdToBpmnProcessSpecMapping] = None if process_instance_model.bpmn_process_definition_id is None: ( @@ -472,7 +482,7 @@ class ProcessInstanceProcessor: ) try: - (self.bpmn_process_instance, self.full_bpmn_process_dict) = ( + (self.bpmn_process_instance, self.full_bpmn_process_dict, self.bpmn_definition_identifiers_to_bpmn_process_id_mappings) = ( self.__get_bpmn_process_instance( process_instance_model, bpmn_process_spec, @@ -537,9 +547,20 @@ class ProcessInstanceProcessor: self.bpmn_process_instance ) + @classmethod + def _update_bpmn_definition_mappings( + cls, bpmn_definition_identifiers_to_bpmn_process_id_mappings: dict, bpmn_process_definition_identifier: str, task_definition: TaskDefinitionModel + ) -> None: + # import pdb; pdb.set_trace() + # if bpmn_process_definition_identifier == 'test_process_to_call' and task_definition.bpmn_identifier == "Root": + # import pdb; pdb.set_trace() + if bpmn_process_definition_identifier not in bpmn_definition_identifiers_to_bpmn_process_id_mappings: + bpmn_definition_identifiers_to_bpmn_process_id_mappings[bpmn_process_definition_identifier] = {} + bpmn_definition_identifiers_to_bpmn_process_id_mappings[bpmn_process_definition_identifier][task_definition.bpmn_identifier] = task_definition + @classmethod def _get_definition_dict_for_bpmn_process_definition( - cls, bpmn_process_definition: BpmnProcessDefinitionModel + cls, bpmn_process_definition: BpmnProcessDefinitionModel, bpmn_definition_identifiers_to_bpmn_process_id_mappings: dict ) -> dict: task_definitions = TaskDefinitionModel.query.filter_by( bpmn_process_definition_id=bpmn_process_definition.id @@ -550,6 +571,7 @@ class ProcessInstanceProcessor: bpmn_process_definition_dict["task_specs"][ task_definition.bpmn_identifier ] = task_definition.properties_json + cls._update_bpmn_definition_mappings(bpmn_definition_identifiers_to_bpmn_process_id_mappings, bpmn_process_definition.bpmn_identifier, task_definition) return bpmn_process_definition_dict @classmethod @@ -557,6 +579,7 @@ class ProcessInstanceProcessor: cls, bpmn_process_definition: BpmnProcessDefinitionModel, spiff_bpmn_process_dict: dict, + bpmn_definition_identifiers_to_bpmn_process_id_mappings: dict, ) -> None: # find all child subprocesses of a process bpmn_process_subprocess_definitions = ( @@ -595,6 +618,7 @@ class ProcessInstanceProcessor: task_definition.bpmn_process_definition_id ] ) + cls._update_bpmn_definition_mappings(bpmn_definition_identifiers_to_bpmn_process_id_mappings, bpmn_subprocess_definition_bpmn_identifier, task_definition) spiff_bpmn_process_dict["subprocess_specs"][ bpmn_subprocess_definition_bpmn_identifier ]["task_specs"][ @@ -643,7 +667,7 @@ class ProcessInstanceProcessor: @classmethod def _get_full_bpmn_process_dict( - cls, process_instance_model: ProcessInstanceModel + cls, process_instance_model: ProcessInstanceModel, bpmn_definition_identifiers_to_bpmn_process_id_mappings: dict ) -> dict: if process_instance_model.bpmn_process_definition_id is None: return {} @@ -658,11 +682,11 @@ class ProcessInstanceProcessor: if bpmn_process_definition is not None: spiff_bpmn_process_dict["spec"] = ( cls._get_definition_dict_for_bpmn_process_definition( - bpmn_process_definition + bpmn_process_definition, bpmn_definition_identifiers_to_bpmn_process_id_mappings ) ) cls._set_definition_dict_for_bpmn_subprocess_definitions( - bpmn_process_definition, spiff_bpmn_process_dict + bpmn_process_definition, spiff_bpmn_process_dict, bpmn_definition_identifiers_to_bpmn_process_id_mappings ) bpmn_process = process_instance_model.bpmn_process @@ -729,8 +753,10 @@ class ProcessInstanceProcessor: spec: Optional[BpmnProcessSpec] = None, validate_only: bool = False, subprocesses: Optional[IdToBpmnProcessSpecMapping] = None, - ) -> BpmnWorkflow: + ) -> Tuple[BpmnWorkflow, dict, dict]: full_bpmn_process_dict = {} + bpmn_definition_identifiers_to_bpmn_process_id_mappings = {} + print("GET BPMN PROCESS INSTANCE") if process_instance_model.bpmn_process_definition_id is not None: # turn off logging to avoid duplicated spiff logs spiff_logger = logging.getLogger("spiff") @@ -740,9 +766,10 @@ class ProcessInstanceProcessor: try: full_bpmn_process_dict = ( ProcessInstanceProcessor._get_full_bpmn_process_dict( - process_instance_model + process_instance_model, bpmn_definition_identifiers_to_bpmn_process_id_mappings ) ) + print("WE GOT FULL BPMN PROCESS DICT") bpmn_process_instance = ( ProcessInstanceProcessor._serializer.workflow_from_dict( full_bpmn_process_dict @@ -755,15 +782,17 @@ class ProcessInstanceProcessor: ProcessInstanceProcessor.set_script_engine(bpmn_process_instance) else: + print("WE NO HAVE FULL BPMN YET") bpmn_process_instance = ( ProcessInstanceProcessor.get_bpmn_process_instance_from_workflow_spec( spec, subprocesses ) ) + # import pdb; pdb.set_trace() bpmn_process_instance.data[ ProcessInstanceProcessor.VALIDATION_PROCESS_KEY ] = validate_only - return (bpmn_process_instance, full_bpmn_process_dict) + return (bpmn_process_instance, full_bpmn_process_dict, bpmn_definition_identifiers_to_bpmn_process_id_mappings) def slam_in_data(self, data: dict) -> None: """Slam_in_data.""" @@ -1025,6 +1054,7 @@ class ProcessInstanceProcessor: self, process_bpmn_properties: dict, bpmn_process_definition_parent: Optional[BpmnProcessDefinitionModel] = None, + store_bpmn_definition_mappings: bool = False, ) -> BpmnProcessDefinitionModel: process_bpmn_identifier = process_bpmn_properties["name"] new_hash_digest = sha256( @@ -1033,7 +1063,16 @@ class ProcessInstanceProcessor: bpmn_process_definition: Optional[BpmnProcessDefinitionModel] = ( BpmnProcessDefinitionModel.query.filter_by(hash=new_hash_digest).first() ) + print(f"process_bpmn_properties: {process_bpmn_properties}") + # import pdb; pdb.set_trace() + # if process_bpmn_identifier == "test_process_to_call": + # import pdb; pdb.set_trace() + # print("HEY22") + + print(f"self.process_instance_model.id: {self.process_instance_model.id}") if bpmn_process_definition is None: + # import pdb; pdb.set_trace() + print("NO DEFINITION") task_specs = process_bpmn_properties.pop("task_specs") bpmn_process_definition = BpmnProcessDefinitionModel( hash=new_hash_digest, @@ -1050,6 +1089,14 @@ class ProcessInstanceProcessor: typename=task_bpmn_properties["typename"], ) db.session.add(task_definition) + if store_bpmn_definition_mappings: + self._update_bpmn_definition_mappings(self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, process_bpmn_identifier, task_definition) + elif store_bpmn_definition_mappings: + # this should only ever happen when new process instances use a pre-existing bpmn process definitions + # otherwise this should get populated on processor initialization + task_definitions = TaskDefinitionModel.query.filter_by(bpmn_process_definition_id=bpmn_process_definition.id).all() + for task_definition in task_definitions: + self._update_bpmn_definition_mappings(self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, process_bpmn_identifier, task_definition) if bpmn_process_definition_parent is not None: bpmn_process_definition_relationship = ( @@ -1067,13 +1114,17 @@ class ProcessInstanceProcessor: return bpmn_process_definition def _add_bpmn_process_definitions(self, bpmn_spec_dict: dict) -> None: + # store only if mappings is currently empty. this also would mean this is a new instance that has never saved before + print("WE STORE BPM PROCESS DEF") + store_bpmn_definition_mappings = not self.bpmn_definition_identifiers_to_bpmn_process_id_mappings bpmn_process_definition_parent = self._store_bpmn_process_definition( - bpmn_spec_dict["spec"] + bpmn_spec_dict["spec"], store_bpmn_definition_mappings=store_bpmn_definition_mappings ) for process_bpmn_properties in bpmn_spec_dict["subprocess_specs"].values(): self._store_bpmn_process_definition( - process_bpmn_properties, bpmn_process_definition_parent + process_bpmn_properties, bpmn_process_definition_parent, store_bpmn_definition_mappings=store_bpmn_definition_mappings ) + # import pdb; pdb.set_trace() self.process_instance_model.bpmn_process_definition = ( bpmn_process_definition_parent ) @@ -1083,7 +1134,8 @@ class ProcessInstanceProcessor: Expects the save method to commit it. """ - bpmn_dict = json.loads(self.serialize()) + print("WE SAVE THINGS") + bpmn_dict = self.serialize() bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version") process_instance_data_dict = {} bpmn_spec_dict = {} @@ -1096,11 +1148,14 @@ class ProcessInstanceProcessor: # FIXME: always save new hash until we get updated Spiff without loopresettask # if self.process_instance_model.bpmn_process_definition_id is None: self._add_bpmn_process_definitions(bpmn_spec_dict) + # import pdb; pdb.set_trace() + print("WE NOW STORE BPMN PROCESS STUFFS") + print(f"bpmn_definition_identifiers_to_bpmn_process_id_mappings: {self.bpmn_definition_identifiers_to_bpmn_process_id_mappings}") subprocesses = process_instance_data_dict.pop("subprocesses") bpmn_process_parent, new_task_models, new_json_data_dicts = ( TaskService.add_bpmn_process( - process_instance_data_dict, self.process_instance_model + process_instance_data_dict, self.process_instance_model, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, spiff_workflow=self.bpmn_process_instance ) ) for subprocess_task_id, subprocess_properties in subprocesses.items(): @@ -1113,6 +1168,8 @@ class ProcessInstanceProcessor: self.process_instance_model, bpmn_process_parent, bpmn_process_guid=subprocess_task_id, + bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, + spiff_workflow=self.bpmn_process_instance ) new_task_models.update(subprocess_new_task_models) new_json_data_dicts.update(subprocess_new_json_data_models) @@ -1122,6 +1179,7 @@ class ProcessInstanceProcessor: def save(self) -> None: """Saves the current state of this processor to the database.""" + print("WE IN SAVE") self._add_bpmn_json_records() self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION @@ -1631,6 +1689,7 @@ class ProcessInstanceProcessor: secondary_engine_step_delegate=step_delegate, serializer=self._serializer, process_instance=self.process_instance_model, + bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, ) if execution_strategy_name is None: @@ -1722,11 +1781,12 @@ class ProcessInstanceProcessor: ) ) - def serialize(self) -> str: + def serialize(self) -> dict: """Serialize.""" self.check_task_data_size() self.preserve_script_engine_state() - return self._serializer.serialize_json(self.bpmn_process_instance) # type: ignore + # return self._serializer.workflow_to_dict(self.bpmn_process_instance) # type: ignore + return json.loads(self._serializer.serialize_json(self.bpmn_process_instance)) # type: ignore def next_user_tasks(self) -> list[SpiffTask]: """Next_user_tasks.""" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py index dbd0a912..f782241f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/task_service.py @@ -11,6 +11,7 @@ from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import TaskStateNames from sqlalchemy.dialects.mysql import insert as mysql_insert from sqlalchemy.dialects.postgresql import insert as postgres_insert +from uuid import UUID from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.db import db @@ -44,24 +45,13 @@ class TaskService: ) db.session.execute(on_duplicate_key_stmt) - @classmethod - def _update_task_data_on_task_model( - cls, task_model: TaskModel, task_data_dict: dict - ) -> Optional[JsonDataDict]: - task_data_json = json.dumps(task_data_dict, sort_keys=True) - task_data_hash: str = sha256(task_data_json.encode("utf8")).hexdigest() - json_data_dict: Optional[JsonDataDict] = None - if task_model.json_data_hash != task_data_hash: - json_data_dict = {"hash": task_data_hash, "data": task_data_dict} - task_model.json_data_hash = task_data_hash - return json_data_dict - @classmethod def update_task_model( cls, task_model: TaskModel, spiff_task: SpiffTask, serializer: BpmnWorkflowSerializer, + bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None, ) -> Optional[JsonDataDict]: """Updates properties_json and data on given task_model. @@ -83,6 +73,7 @@ class TaskService: spiff_task: SpiffTask, process_instance: ProcessInstanceModel, serializer: BpmnWorkflowSerializer, + bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None, ) -> Tuple[ Optional[BpmnProcessModel], TaskModel, @@ -98,12 +89,13 @@ class TaskService: new_json_data_dicts: dict[str, JsonDataDict] = {} if task_model is None: bpmn_process, new_task_models, new_json_data_dicts = cls.task_bpmn_process( - spiff_task, process_instance, serializer + spiff_task, process_instance, serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=bpmn_definition_identifiers_to_bpmn_process_id_mappings ) task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first() + task_definition = bpmn_definition_identifiers_to_bpmn_process_id_mappings[spiff_task.workflow.spec.name][spiff_task.task_spec.name] if task_model is None: task_model = TaskModel( - guid=spiff_task_guid, bpmn_process_id=bpmn_process.id + guid=spiff_task_guid, bpmn_process_id=bpmn_process.id, process_instance_id=process_instance.id, task_definition_id=task_definition.id ) return (bpmn_process, task_model, new_task_models, new_json_data_dicts) @@ -130,6 +122,7 @@ class TaskService: spiff_task: SpiffTask, process_instance: ProcessInstanceModel, serializer: BpmnWorkflowSerializer, + bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None, ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: subprocess_guid, subprocess = cls.task_subprocess(spiff_task) bpmn_process: Optional[BpmnProcessModel] = None @@ -140,12 +133,15 @@ class TaskService: # This is the top level workflow, which has no guid # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None if process_instance.bpmn_process_id is None: + spiff_workflow = spiff_task.workflow._get_outermost_workflow() bpmn_process, new_task_models, new_json_data_dicts = ( cls.add_bpmn_process( serializer.workflow_to_dict( - spiff_task.workflow._get_outermost_workflow() + spiff_workflow ), process_instance, + bpmn_definition_identifiers_to_bpmn_process_id_mappings=bpmn_definition_identifiers_to_bpmn_process_id_mappings, + spiff_workflow=spiff_workflow, ) ) else: @@ -153,12 +149,16 @@ class TaskService: guid=subprocess_guid ).first() if bpmn_process is None: + spiff_workflow = spiff_task.workflow bpmn_process, new_task_models, new_json_data_dicts = ( cls.add_bpmn_process( serializer.workflow_to_dict(subprocess), process_instance, process_instance.bpmn_process, subprocess_guid, + bpmn_definition_identifiers_to_bpmn_process_id_mappings=bpmn_definition_identifiers_to_bpmn_process_id_mappings, + spiff_workflow=spiff_workflow, + ) ) return (bpmn_process, new_task_models, new_json_data_dicts) @@ -170,6 +170,8 @@ class TaskService: process_instance: ProcessInstanceModel, bpmn_process_parent: Optional[BpmnProcessModel] = None, bpmn_process_guid: Optional[str] = None, + bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None, + spiff_workflow: Optional[BpmnWorkflow] = None, ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: """This creates and adds a bpmn_process to the Db session. @@ -183,6 +185,7 @@ class TaskService: new_json_data_dicts: dict[str, JsonDataDict] = {} bpmn_process = None + print("ADD BPMN PROCESS") if bpmn_process_parent is not None: bpmn_process = BpmnProcessModel.query.filter_by( parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid @@ -194,6 +197,9 @@ class TaskService: if bpmn_process is None: bpmn_process_is_new = True bpmn_process = BpmnProcessModel(guid=bpmn_process_guid) + for task_id, task_properties in tasks.items(): + if task_properties['task_spec'] == 'Start': + bpmn_process_dict['root'] = task_id bpmn_process.properties_json = bpmn_process_dict @@ -202,6 +208,7 @@ class TaskService: bpmn_process_data_json.encode("utf8") ).hexdigest() if bpmn_process.json_data_hash != bpmn_process_data_hash: + # print(f"bpmn_process_data_dict: {bpmn_process_data_dict}") new_json_data_dicts[bpmn_process_data_hash] = { "hash": bpmn_process_data_hash, "data": bpmn_process_data_dict, @@ -219,6 +226,16 @@ class TaskService: if bpmn_process_is_new: for task_id, task_properties in tasks.items(): + if task_properties['task_spec'] == 'Root': + continue + if task_properties['task_spec'] == 'Start': + task_properties['parent'] = None + process_dict = bpmn_process.properties_json + process_dict['root'] = task_id + # print(f"process_dict: {process_dict}") + bpmn_process.properties_json = process_dict + # print(f"bpmn_process.properties_json: {bpmn_process.properties_json}") + db.session.add(bpmn_process) task_data_dict = task_properties.pop("data") state_int = task_properties["state"] @@ -231,8 +248,15 @@ class TaskService: # .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first() # if task_definition is None: # subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid) + spiff_task = spiff_workflow.get_task(UUID(task_id)) + try: + task_definition = bpmn_definition_identifiers_to_bpmn_process_id_mappings[spiff_task.workflow.spec.name][spiff_task.task_spec.name] + except Exception as ex: + import pdb; pdb.set_trace() + print("HEY") + raise ex task_model = TaskModel( - guid=task_id, bpmn_process_id=bpmn_process.id + guid=task_id, bpmn_process_id=bpmn_process.id, process_instance_id=process_instance.id, task_definition_id=task_definition.id ) task_model.state = TaskStateNames[state_int] task_model.properties_json = task_properties @@ -245,3 +269,15 @@ class TaskService: new_json_data_dicts[json_data_dict["hash"]] = json_data_dict return (bpmn_process, new_task_models, new_json_data_dicts) + + @classmethod + def _update_task_data_on_task_model( + cls, task_model: TaskModel, task_data_dict: dict + ) -> Optional[JsonDataDict]: + task_data_json = json.dumps(task_data_dict, sort_keys=True) + task_data_hash: str = sha256(task_data_json.encode("utf8")).hexdigest() + json_data_dict: Optional[JsonDataDict] = None + if task_model.json_data_hash != task_data_hash: + json_data_dict = {"hash": task_data_hash, "data": task_data_dict} + task_model.json_data_hash = task_data_hash + return json_data_dict diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py index 386f2054..9a4a6c7d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/workflow_execution_service.py @@ -58,9 +58,11 @@ class TaskModelSavingDelegate(EngineStepDelegate): serializer: BpmnWorkflowSerializer, process_instance: ProcessInstanceModel, secondary_engine_step_delegate: Optional[EngineStepDelegate] = None, + bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None, ) -> None: self.secondary_engine_step_delegate = secondary_engine_step_delegate self.process_instance = process_instance + self.bpmn_definition_identifiers_to_bpmn_process_id_mappings = bpmn_definition_identifiers_to_bpmn_process_id_mappings self.current_task_model: Optional[TaskModel] = None self.task_models: dict[str, TaskModel] = {} @@ -78,7 +80,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): if self.should_update_task_model(): _bpmn_process, task_model, new_task_models, new_json_data_dicts = ( TaskService.find_or_create_task_model_from_spiff_task( - spiff_task, self.process_instance, self.serializer + spiff_task, self.process_instance, self.serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings ) ) self.current_task_model = task_model @@ -92,7 +94,7 @@ class TaskModelSavingDelegate(EngineStepDelegate): if self.current_task_model and self.should_update_task_model(): self.current_task_model.end_in_seconds = time.time() json_data_dict = TaskService.update_task_model( - self.current_task_model, spiff_task, self.serializer + self.current_task_model, spiff_task, self.serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings ) if json_data_dict is not None: self.json_data_dicts[json_data_dict["hash"]] = json_data_dict @@ -121,13 +123,13 @@ class TaskModelSavingDelegate(EngineStepDelegate): ): _bpmn_process, task_model, new_task_models, new_json_data_dicts = ( TaskService.find_or_create_task_model_from_spiff_task( - waiting_spiff_task, self.process_instance, self.serializer + waiting_spiff_task, self.process_instance, self.serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings ) ) self.task_models.update(new_task_models) self.json_data_dicts.update(new_json_data_dicts) json_data_dict = TaskService.update_task_model( - task_model, waiting_spiff_task, self.serializer + task_model, waiting_spiff_task, self.serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings ) self.task_models[task_model.guid] = task_model if json_data_dict is not None: diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py similarity index 98% rename from spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling.py rename to spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py index 9d481788..4566625a 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_error_handling_service.py @@ -33,6 +33,7 @@ class TestErrorHandlingService(BaseTest): process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier( process_model.id, user ) + print(f"process_instance.id: {process_instance.id}") pip = ProcessInstanceProcessor(process_instance) with pytest.raises(ApiError) as e: pip.do_engine_steps(save=True) diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py index 3452dcf1..6bd7a305 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_process_instance_processor.py @@ -378,6 +378,24 @@ class TestProcessInstanceProcessor(BaseTest): assert len(all_spiff_tasks) > 1 for spiff_task in all_spiff_tasks: assert spiff_task.state == TaskState.COMPLETED + if spiff_task.task_spec.name == 'test_process_to_call_script': + task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() + assert task.task_definition_id is not None + task_definition = task.task_definition + assert task_definition.bpmn_identifier == 'test_process_to_call_script' + assert task_definition.bpmn_process_definition.bpmn_identifier == 'test_process_to_call' + elif spiff_task.task_spec.name == 'top_level_subprocess_script': + task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() + assert task.task_definition_id is not None + task_definition = task.task_definition + assert task_definition.bpmn_identifier == 'top_level_subprocess_script' + assert task_definition.bpmn_process_definition.bpmn_identifier == 'top_level_subprocess' + if spiff_task.task_spec.name == 'top_level_script': + task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first() + assert task.task_definition_id is not None + task_definition = task.task_definition + assert task_definition.bpmn_identifier == 'top_level_script' + assert task_definition.bpmn_process_definition.bpmn_identifier == 'top_level_process' # FIXME: Checking task data cannot work with the feature/remove-loop-reset branch # of SiffWorkflow. This is because it saves script data to the python_env and NOT # to task.data. We may need to either create a new column on TaskModel to put the python_env