From 2cdacfa03bd99b31a295579f06f0c7ec7ca9e83c Mon Sep 17 00:00:00 2001 From: jasquat Date: Fri, 3 Mar 2023 16:51:24 -0500 Subject: [PATCH] some stuff is passing but still needs the process_instance_data w/ burnettk --- spiffworkflow-backend/conftest.py | 3 + .../migrations/versions/61cd3e8462f5_.py | 30 ++++ .../migrations/versions/941f7b76d278_.py | 30 ++++ .../migrations/versions/b0b82fab4cf9_.py | 30 ++++ .../models/bpmn_process.py | 4 +- .../models/bpmn_process_definition.py | 5 +- .../models/process_instance.py | 8 +- .../src/spiffworkflow_backend/models/task.py | 6 +- .../services/process_instance_processor.py | 135 ++++++++++++++---- .../unit/test_message_service.py | 1 - 10 files changed, 216 insertions(+), 36 deletions(-) create mode 100644 spiffworkflow-backend/migrations/versions/61cd3e8462f5_.py create mode 100644 spiffworkflow-backend/migrations/versions/941f7b76d278_.py create mode 100644 spiffworkflow-backend/migrations/versions/b0b82fab4cf9_.py diff --git a/spiffworkflow-backend/conftest.py b/spiffworkflow-backend/conftest.py index 9c6c242e4..3e4804a8b 100644 --- a/spiffworkflow-backend/conftest.py +++ b/spiffworkflow-backend/conftest.py @@ -5,6 +5,7 @@ import shutil import pytest from flask.app import Flask from flask.testing import FlaskClient +from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from tests.spiffworkflow_backend.helpers.base_test import BaseTest from spiffworkflow_backend.models.db import db @@ -46,6 +47,8 @@ def app() -> Flask: def with_db_and_bpmn_file_cleanup() -> None: """Do it cleanly!""" meta = db.metadata + db.session.execute(db.update(BpmnProcessModel, values={"parent_process_id": None})) + for table in reversed(meta.sorted_tables): db.session.execute(table.delete()) db.session.commit() diff --git a/spiffworkflow-backend/migrations/versions/61cd3e8462f5_.py b/spiffworkflow-backend/migrations/versions/61cd3e8462f5_.py new file mode 100644 index 000000000..c6a5f000e --- /dev/null +++ b/spiffworkflow-backend/migrations/versions/61cd3e8462f5_.py @@ -0,0 +1,30 @@ +"""empty message + +Revision ID: 61cd3e8462f5 +Revises: 941f7b76d278 +Create Date: 2023-03-03 16:22:12.449757 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + +# revision identifiers, used by Alembic. +revision = '61cd3e8462f5' +down_revision = '941f7b76d278' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint('task_ibfk_2', 'task', type_='foreignkey') + op.drop_column('task', 'task_definition_id') + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('task', sa.Column('task_definition_id', mysql.INTEGER(), autoincrement=False, nullable=False)) + op.create_foreign_key('task_ibfk_2', 'task', 'task_definition', ['task_definition_id'], ['id']) + # ### end Alembic commands ### diff --git a/spiffworkflow-backend/migrations/versions/941f7b76d278_.py b/spiffworkflow-backend/migrations/versions/941f7b76d278_.py new file mode 100644 index 000000000..8705894a8 --- /dev/null +++ b/spiffworkflow-backend/migrations/versions/941f7b76d278_.py @@ -0,0 +1,30 @@ +"""empty message + +Revision ID: 941f7b76d278 +Revises: b0b82fab4cf9 +Create Date: 2023-03-03 14:40:46.574985 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = '941f7b76d278' +down_revision = 'b0b82fab4cf9' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('bpmn_process', sa.Column('guid', sa.String(length=36), nullable=True)) + op.create_index(op.f('ix_bpmn_process_guid'), 'bpmn_process', ['guid'], unique=True) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_bpmn_process_guid'), table_name='bpmn_process') + op.drop_column('bpmn_process', 'guid') + # ### end Alembic commands ### diff --git a/spiffworkflow-backend/migrations/versions/b0b82fab4cf9_.py b/spiffworkflow-backend/migrations/versions/b0b82fab4cf9_.py new file mode 100644 index 000000000..61d199450 --- /dev/null +++ b/spiffworkflow-backend/migrations/versions/b0b82fab4cf9_.py @@ -0,0 +1,30 @@ +"""empty message + +Revision ID: b0b82fab4cf9 +Revises: a63a61a21398 +Create Date: 2023-03-03 13:24:08.304492 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'b0b82fab4cf9' +down_revision = 'a63a61a21398' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('process_instance', sa.Column('bpmn_process_id', sa.Integer(), nullable=True)) + op.create_foreign_key(None, 'process_instance', 'bpmn_process', ['bpmn_process_id'], ['id']) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_constraint(None, 'process_instance', type_='foreignkey') + op.drop_column('process_instance', 'bpmn_process_id') + # ### end Alembic commands ### diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process.py index 0d7e556d9..b5a3137e6 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process.py @@ -1,4 +1,5 @@ from __future__ import annotations +from typing import Optional from sqlalchemy import ForeignKey @@ -15,8 +16,9 @@ from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel class BpmnProcessModel(SpiffworkflowBaseDBModel): __tablename__ = "bpmn_process" id: int = db.Column(db.Integer, primary_key=True) + guid: Optional[str] = db.Column(db.String(36), nullable=True, unique=True, index=True) - parent_process_id: int = db.Column(ForeignKey("bpmn_process.id"), nullable=True) + parent_process_id: Optional[int] = db.Column(ForeignKey("bpmn_process.id"), nullable=True) properties_json: dict = db.Column(db.JSON, nullable=False) json_data_hash: str = db.Column(db.String(255), nullable=False, index=True) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process_definition.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process_definition.py index af655408e..d689a8f2f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process_definition.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/bpmn_process_definition.py @@ -15,11 +15,14 @@ class BpmnProcessDefinitionModel(SpiffworkflowBaseDBModel): id: int = db.Column(db.Integer, primary_key=True) # this is a sha256 hash of spec and serializer_version + # note that a call activity is its own row in this table, with its own hash, + # and therefore it only gets stored once per version, and can be reused + # by multiple calling processes. hash: str = db.Column(db.String(255), nullable=False, index=True, unique=True) bpmn_identifier: str = db.Column(db.String(255), nullable=False, index=True) - properties_json: str = db.Column(db.JSON, nullable=False) + properties_json: dict = db.Column(db.JSON, nullable=False) # process or subprocess # FIXME: will probably ignore for now since we do not strictly need it diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py index e0fb3f3b2..5b3733710 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py @@ -1,5 +1,7 @@ """Process_instance.""" from __future__ import annotations +from typing import Optional +from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from typing import Any from typing import cast @@ -77,10 +79,14 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel): ) process_instance_data = relationship("ProcessInstanceDataModel", cascade="delete") - bpmn_process_definition_id: int = db.Column( + bpmn_process_definition_id: Optional[int] = db.Column( ForeignKey(BpmnProcessDefinitionModel.id), nullable=True # type: ignore ) bpmn_process_definition = relationship(BpmnProcessDefinitionModel) + bpmn_process_id: Optional[int] = db.Column( + ForeignKey(BpmnProcessModel.id), nullable=True # type: ignore + ) + bpmn_process = relationship(BpmnProcessModel) spiff_serializer_version = db.Column(db.String(50), nullable=True) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py index 8038a14be..b1bba7158 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/task.py @@ -48,9 +48,9 @@ class TaskModel(SpiffworkflowBaseDBModel): ) # find this by looking up the "workflow_name" and "task_spec" from the properties_json - task_definition_id: int = db.Column( - ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore - ) + # task_definition_id: int = db.Column( + # ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore + # ) state: str = db.Column(db.String(10), nullable=False) properties_json: dict = db.Column(db.JSON, nullable=False) json_data_hash: str = db.Column(db.String(255), nullable=False, index=True) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 7ec181659..c2db1a463 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1,5 +1,7 @@ """Process_instance_processor.""" import _strptime # type: ignore +from SpiffWorkflow.task import TaskStateNames # type: ignore +from spiffworkflow_backend.models.task import TaskModel # noqa: F401 import decimal import json import logging @@ -55,6 +57,7 @@ from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore from sqlalchemy import text from spiffworkflow_backend.exceptions.api_error import ApiError +from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.bpmn_process_definition import ( BpmnProcessDefinitionModel, ) @@ -67,6 +70,7 @@ from spiffworkflow_backend.models.file import FileType from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.human_task import HumanTaskModel from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel +from spiffworkflow_backend.models.json_data import JsonDataModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance_correlation import ( MessageInstanceCorrelationRuleModel, @@ -534,16 +538,27 @@ class ProcessInstanceProcessor: task_definitions = TaskDefinitionModel.query.filter_by( bpmn_process_definition_id=bpmn_process_definition.id ).all() - bpmn_process_definition_dict: dict = json.loads( - bpmn_process_definition.properties_json - ) + bpmn_process_definition_dict: dict = bpmn_process_definition.properties_json bpmn_process_definition_dict["task_specs"] = {} for task_definition in task_definitions: bpmn_process_definition_dict["task_specs"][ task_definition.bpmn_identifier - ] = json.loads(task_definition.properties_json) + ] = task_definition.properties_json return bpmn_process_definition_dict + @classmethod + def _get_bpmn_process_dict(cls, bpmn_process: BpmnProcessModel) -> dict: + json_data = JsonDataModel.query.filter_by(hash=bpmn_process.json_data_hash).first() + bpmn_process_dict = {'data': json_data.data, 'tasks': {}} + bpmn_process_dict.update(bpmn_process.properties_json) + tasks = TaskModel.query.filter_by(bpmn_process_id=bpmn_process.id).all() + for task in tasks: + json_data = JsonDataModel.query.filter_by(hash=task.json_data_hash).first() + bpmn_process_dict['tasks'][task.guid] = task.properties_json + bpmn_process_dict['tasks'][task.guid]['data'] = json_data.data + + return bpmn_process_dict + @classmethod def _get_full_bpmn_json(cls, process_instance_model: ProcessInstanceModel) -> dict: if process_instance_model.serialized_bpmn_definition_id is None: @@ -552,16 +567,14 @@ class ProcessInstanceProcessor: # print(f"serialized_bpmn_definition.static_json: {serialized_bpmn_definition.static_json}") # loaded_json: dict = json.loads(serialized_bpmn_definition.static_json) # or "{}") - serialized_bpmn_definition: dict = {} - bpmn_process_definition = BpmnProcessDefinitionModel.query.filter_by( - id=process_instance_model.bpmn_process_definition_id - ).first() + serialized_bpmn_definition: dict = { + "serializer_version": process_instance_model.spiff_serializer_version, + "spec": {}, + "subprocess_specs": {}, + "subprocesses": {} + } + bpmn_process_definition = process_instance_model.bpmn_process_definition if bpmn_process_definition is not None: - serialized_bpmn_definition = { - "serializer_version": process_instance_model.spiff_serializer_version, - "spec": {}, - "subprocess_specs": {}, - } serialized_bpmn_definition["spec"] = ( cls._get_definition_dict_for_bpmn_process_definition( bpmn_process_definition @@ -580,11 +593,21 @@ class ProcessInstanceProcessor: serialized_bpmn_definition["subprocess_specs"][ bpmn_subprocess_definition.bpmn_identifier ] = spec - loaded_json: dict = serialized_bpmn_definition - process_instance_data = process_instance_model.process_instance_data - loaded_json.update(json.loads(process_instance_data.runtime_json)) - return loaded_json + bpmn_process = process_instance_model.bpmn_process + if bpmn_process is not None: + bpmn_process_dict = cls._get_bpmn_process_dict(bpmn_process) + serialized_bpmn_definition.update(bpmn_process_dict) + + bpmn_subprocesses = BpmnProcessModel.query.filter_by(parent_process_id=bpmn_process.id).all() + for bpmn_subprocess in bpmn_subprocesses: + bpmn_process_dict = cls._get_bpmn_process_dict(bpmn_subprocess) + serialized_bpmn_definition['subprocesses'][bpmn_subprocess.guid] = bpmn_process_dict + + # process_instance_data = process_instance_model.process_instance_data + # loaded_json.update(json.loads(process_instance_data.runtime_json)) + + return serialized_bpmn_definition def current_user(self) -> Any: """Current_user.""" @@ -979,7 +1002,7 @@ class ProcessInstanceProcessor: bpmn_process_definition = BpmnProcessDefinitionModel( hash=new_hash_digest, bpmn_identifier=process_bpmn_identifier, - properties_json=json.dumps(process_bpmn_properties), + properties_json=process_bpmn_properties, ) db.session.add(bpmn_process_definition) @@ -987,7 +1010,7 @@ class ProcessInstanceProcessor: task_definition = TaskDefinitionModel( bpmn_process_definition=bpmn_process_definition, bpmn_identifier=task_bpmn_identifier, - properties_json=json.dumps(task_bpmn_properties), + properties_json=task_bpmn_properties, typename=task_bpmn_properties["typename"], ) db.session.add(task_definition) @@ -1019,12 +1042,71 @@ class ProcessInstanceProcessor: bpmn_process_definition_parent ) + def _add_bpmn_process(self, bpmn_process_dict: dict, bpmn_process_parent: Optional[BpmnProcessModel] = None, bpmn_process_guid: Optional[str] = None) -> BpmnProcessModel: + tasks = bpmn_process_dict.pop("tasks") + bpmn_process_data = bpmn_process_dict.pop("data") + + bpmn_process = None + if bpmn_process_parent is not None: + bpmn_process = BpmnProcessModel.query.filter_by(parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid).first() + elif self.process_instance_model.bpmn_process_id is not None: + bpmn_process = self.process_instance_model.bpmn_process + + if bpmn_process is None: + bpmn_process = BpmnProcessModel() + + bpmn_process.properties_json = bpmn_process_dict + + bpmn_process_data_json = json.dumps(bpmn_process_data, sort_keys=True).encode("utf8") + bpmn_process_data_hash = sha256(bpmn_process_data_json).hexdigest() + if bpmn_process.json_data_hash != bpmn_process_data_hash: + json_data = db.session.query(JsonDataModel.id).filter_by(hash=bpmn_process_data_hash).first() + if json_data is None: + json_data = JsonDataModel(hash=bpmn_process_data_hash, data=bpmn_process_data) + db.session.add(json_data) + bpmn_process.json_data_hash = bpmn_process_data_hash + + if bpmn_process_parent is None: + self.process_instance_model.bpmn_process = bpmn_process + elif bpmn_process.parent_process_id is None: + bpmn_process.parent_process_id = bpmn_process_parent.id + db.session.add(bpmn_process) + + for task_id, task_properties in tasks.items(): + task_data_dict = task_properties.pop('data') + state_int = task_properties['state'] + + task = TaskModel.query.filter_by(guid=task_id).first() + if task is None: + # bpmn_process_identifier = task_properties['workflow_name'] + # bpmn_identifier = task_properties['task_spec'] + # + # task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier).join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first() + # if task_definition is None: + # subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid) + task = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id) + task.state = TaskStateNames[state_int] + task.properties_json = task_properties + + task_data_json = json.dumps(task_data_dict, sort_keys=True).encode("utf8") + task_data_hash = sha256(task_data_json).hexdigest() + if task.json_data_hash != task_data_hash: + json_data = db.session.query(JsonDataModel.id).filter_by(hash=task_data_hash).first() + if json_data is None: + json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict) + db.session.add(json_data) + task.json_data_hash = task_data_hash + db.session.add(task) + + return bpmn_process + def _add_bpmn_json_records_new(self) -> None: """Adds serialized_bpmn_definition and process_instance_data records to the db session. Expects the save method to commit it. """ bpmn_dict = json.loads(self.serialize()) + # with open('tmp2.json', 'w') as f: f.write(json.dumps(bpmn_dict) bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version") process_instance_data_dict = {} bpmn_spec_dict = {} @@ -1038,19 +1120,14 @@ class ProcessInstanceProcessor: # if self.process_instance_model.bpmn_process_definition_id is None: self._add_bpmn_process_definitions(bpmn_spec_dict) - # process_instance_data = None - # if self.process_instance_model.process_instance_data_id is None: - # process_instance_data = ProcessInstanceDataModel() - # else: - # process_instance_data = self.process_instance_model.process_instance_data - # - # process_instance_data.runtime_json = json.dumps(process_instance_data_dict) - # db.session.add(process_instance_data) - # self.process_instance_model.process_instance_data = process_instance_data + subprocesses = process_instance_data_dict.pop("subprocesses") + bpmn_process_parent = self._add_bpmn_process(process_instance_data_dict) + for _subprocess_task_id, subprocess_properties in subprocesses.items(): + self._add_bpmn_process(subprocess_properties, bpmn_process_parent) def save(self) -> None: """Saves the current state of this processor to the database.""" - self._add_bpmn_json_records() + # self._add_bpmn_json_records() self._add_bpmn_json_records_new() self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py index e30d9dcdb..c0898a042 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py @@ -71,7 +71,6 @@ class TestMessageService(BaseTest): .all() ) assert len(waiting_messages) == 0 - # The process has completed assert self.process_instance.status == "complete"