some tests are passing but we still need to deal with process_instance_data w/ burnettk

jasquat 2023-03-03 16:51:24 -05:00
parent 43513de4f5
commit 2cdacfa03b
10 changed files with 216 additions and 36 deletions

View File

@@ -5,6 +5,7 @@ import shutil
import pytest
from flask.app import Flask
from flask.testing import FlaskClient
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.db import db
@@ -46,6 +47,8 @@ def app() -> Flask:
def with_db_and_bpmn_file_cleanup() -> None:
"""Do it cleanly!"""
meta = db.metadata
db.session.execute(db.update(BpmnProcessModel, values={"parent_process_id": None}))
for table in reversed(meta.sorted_tables):
db.session.execute(table.delete())
db.session.commit()
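The new `db.update` line exists because `bpmn_process.parent_process_id` is a self-referential foreign key: bulk-deleting the table while child rows still point at parent rows can violate the constraint, so the fixture nulls the column before the generic delete loop runs. A minimal sketch of the same pattern in plain SQLAlchemy (the in-memory SQLite engine and toy `node` table are stand-ins, not part of this commit):

import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()

# A self-referential table, shaped like bpmn_process / parent_process_id.
node = sa.Table(
    "node",
    meta,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("parent_id", sa.Integer, sa.ForeignKey("node.id"), nullable=True),
)
meta.create_all(engine)

with engine.begin() as conn:
    conn.execute(sa.insert(node), [{"id": 1, "parent_id": None}, {"id": 2, "parent_id": 1}])
    # Null out self-references first so the bulk DELETE below cannot trip
    # over rows that still reference other rows in the same table.
    conn.execute(sa.update(node).values(parent_id=None))
    for table in reversed(meta.sorted_tables):
        conn.execute(table.delete())

(SQLite does not enforce foreign keys unless told to, so treat this as the shape of the fix rather than a failing reproduction.)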

View File

@@ -0,0 +1,30 @@
"""empty message
Revision ID: 61cd3e8462f5
Revises: 941f7b76d278
Create Date: 2023-03-03 16:22:12.449757
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = '61cd3e8462f5'
down_revision = '941f7b76d278'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint('task_ibfk_2', 'task', type_='foreignkey')
op.drop_column('task', 'task_definition_id')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('task', sa.Column('task_definition_id', mysql.INTEGER(), autoincrement=False, nullable=False))
op.create_foreign_key('task_ibfk_2', 'task', 'task_definition', ['task_definition_id'], ['id'])
# ### end Alembic commands ###
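This migration pairs with the `TaskModel` change later in this commit, where `task_definition_id` and its foreign key are commented out: per the comment left in that model, a task's definition is now found by looking up `workflow_name` and `task_spec` in its `properties_json` rather than through a hard foreign key.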

View File

@@ -0,0 +1,30 @@
"""empty message
Revision ID: 941f7b76d278
Revises: b0b82fab4cf9
Create Date: 2023-03-03 14:40:46.574985
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '941f7b76d278'
down_revision = 'b0b82fab4cf9'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('bpmn_process', sa.Column('guid', sa.String(length=36), nullable=True))
op.create_index(op.f('ix_bpmn_process_guid'), 'bpmn_process', ['guid'], unique=True)
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_index(op.f('ix_bpmn_process_guid'), table_name='bpmn_process')
op.drop_column('bpmn_process', 'guid')
# ### end Alembic commands ###
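Note that the unique index lands on a nullable column. In MySQL (the dialect these migrations target, judging by the `mysql` import in the previous one) a UNIQUE index allows any number of NULLs, which is what lets the processor changes below create the top-level `BpmnProcessModel` without a guid while still keeping subprocess guids distinct.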

View File

@@ -0,0 +1,30 @@
"""empty message
Revision ID: b0b82fab4cf9
Revises: a63a61a21398
Create Date: 2023-03-03 13:24:08.304492
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'b0b82fab4cf9'
down_revision = 'a63a61a21398'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('process_instance', sa.Column('bpmn_process_id', sa.Integer(), nullable=True))
op.create_foreign_key(None, 'process_instance', 'bpmn_process', ['bpmn_process_id'], ['id'])
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(None, 'process_instance', type_='foreignkey')
op.drop_column('process_instance', 'bpmn_process_id')
# ### end Alembic commands ###
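One caveat with this autogenerated pair: `op.create_foreign_key(None, ...)` lets the database pick the constraint name, but the matching `op.drop_constraint(None, ...)` in `downgrade` cannot run as written, because Alembic needs a concrete name to drop. The usual adjustment is to name the constraint explicitly; a hedged sketch (the name `fk_process_instance_bpmn_process_id` is an assumption, not something this commit defines):

from alembic import op
import sqlalchemy as sa

# Assumed, explicitly chosen constraint name; any stable name works.
FK_NAME = "fk_process_instance_bpmn_process_id"

def upgrade():
    op.add_column("process_instance", sa.Column("bpmn_process_id", sa.Integer(), nullable=True))
    op.create_foreign_key(FK_NAME, "process_instance", "bpmn_process", ["bpmn_process_id"], ["id"])

def downgrade():
    op.drop_constraint(FK_NAME, "process_instance", type_="foreignkey")
    op.drop_column("process_instance", "bpmn_process_id")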

View File

@@ -1,4 +1,5 @@
from __future__ import annotations
from typing import Optional
from sqlalchemy import ForeignKey
@@ -15,8 +16,9 @@ from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
class BpmnProcessModel(SpiffworkflowBaseDBModel):
__tablename__ = "bpmn_process"
id: int = db.Column(db.Integer, primary_key=True)
guid: Optional[str] = db.Column(db.String(36), nullable=True, unique=True, index=True)
parent_process_id: int = db.Column(ForeignKey("bpmn_process.id"), nullable=True)
parent_process_id: Optional[int] = db.Column(ForeignKey("bpmn_process.id"), nullable=True)
properties_json: dict = db.Column(db.JSON, nullable=False)
json_data_hash: str = db.Column(db.String(255), nullable=False, index=True)
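With `guid` nullable-but-unique and `parent_process_id` self-referential, the top-level process row can omit its guid while each subprocess row carries the guid that becomes its key in the serialized `subprocesses` dict. A small sketch of the lookup shape the processor relies on (the helper name is illustrative only, not part of this commit):

from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel

def child_processes_by_guid(parent: BpmnProcessModel) -> dict:
    # Subprocess rows are keyed by guid, mirroring the "subprocesses"
    # dict assembled in _get_full_bpmn_json below.
    children = BpmnProcessModel.query.filter_by(parent_process_id=parent.id).all()
    return {child.guid: child for child in children}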

View File

@@ -15,11 +15,14 @@ class BpmnProcessDefinitionModel(SpiffworkflowBaseDBModel):
id: int = db.Column(db.Integer, primary_key=True)
# this is a sha256 hash of spec and serializer_version
# note that a call activity is its own row in this table, with its own hash,
# and therefore it only gets stored once per version, and can be reused
# by multiple calling processes.
hash: str = db.Column(db.String(255), nullable=False, index=True, unique=True)
bpmn_identifier: str = db.Column(db.String(255), nullable=False, index=True)
properties_json: str = db.Column(db.JSON, nullable=False)
properties_json: dict = db.Column(db.JSON, nullable=False)
# process or subprocess
# FIXME: will probably ignore for now since we do not strictly need it
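The annotation flips from `str` to `dict` because SQLAlchemy's `JSON` column type does its own serialization: handing it a pre-dumped string (as the old `json.dumps(...)` calls in the processor did) stores a doubly encoded JSON string. A standalone sketch of the difference (in-memory SQLite and a toy table, for illustration only):

import json
import sqlalchemy as sa

engine = sa.create_engine("sqlite://")
meta = sa.MetaData()
demo = sa.Table(
    "demo",
    meta,
    sa.Column("id", sa.Integer, primary_key=True),
    sa.Column("props", sa.JSON),
)
meta.create_all(engine)

with engine.begin() as conn:
    conn.execute(sa.insert(demo).values(id=1, props={"a": 1}))              # stored as a JSON object
    conn.execute(sa.insert(demo).values(id=2, props=json.dumps({"a": 1})))  # double-encoded: a JSON string

with engine.connect() as conn:
    rows = conn.execute(sa.select(demo)).all()
    assert rows[0].props == {"a": 1}       # dict round-trips
    assert rows[1].props == '{"a": 1}'     # a str comes back, not a dict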

View File

@@ -1,5 +1,7 @@
"""Process_instance."""
from __future__ import annotations
from typing import Optional
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from typing import Any
from typing import cast
@@ -77,10 +79,14 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
)
process_instance_data = relationship("ProcessInstanceDataModel", cascade="delete")
bpmn_process_definition_id: int = db.Column(
bpmn_process_definition_id: Optional[int] = db.Column(
ForeignKey(BpmnProcessDefinitionModel.id), nullable=True # type: ignore
)
bpmn_process_definition = relationship(BpmnProcessDefinitionModel)
bpmn_process_id: Optional[int] = db.Column(
ForeignKey(BpmnProcessModel.id), nullable=True # type: ignore
)
bpmn_process = relationship(BpmnProcessModel)
spiff_serializer_version = db.Column(db.String(50), nullable=True)

View File

@@ -48,9 +48,9 @@ class TaskModel(SpiffworkflowBaseDBModel):
)
# find this by looking up the "workflow_name" and "task_spec" from the properties_json
task_definition_id: int = db.Column(
ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore
)
# task_definition_id: int = db.Column(
# ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore
# )
state: str = db.Column(db.String(10), nullable=False)
properties_json: dict = db.Column(db.JSON, nullable=False)
json_data_hash: str = db.Column(db.String(255), nullable=False, index=True)

View File

@@ -1,5 +1,7 @@
"""Process_instance_processor."""
import _strptime # type: ignore
from SpiffWorkflow.task import TaskStateNames # type: ignore
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
import decimal
import json
import logging
@@ -55,6 +57,7 @@ from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from sqlalchemy import text
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.bpmn_process_definition import (
BpmnProcessDefinitionModel,
)
@@ -67,6 +70,7 @@ from spiffworkflow_backend.models.file import FileType
from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
from spiffworkflow_backend.models.json_data import JsonDataModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationRuleModel,
@@ -534,16 +538,27 @@ class ProcessInstanceProcessor:
task_definitions = TaskDefinitionModel.query.filter_by(
bpmn_process_definition_id=bpmn_process_definition.id
).all()
bpmn_process_definition_dict: dict = json.loads(
bpmn_process_definition.properties_json
)
bpmn_process_definition_dict: dict = bpmn_process_definition.properties_json
bpmn_process_definition_dict["task_specs"] = {}
for task_definition in task_definitions:
bpmn_process_definition_dict["task_specs"][
task_definition.bpmn_identifier
] = json.loads(task_definition.properties_json)
] = task_definition.properties_json
return bpmn_process_definition_dict
@classmethod
def _get_bpmn_process_dict(cls, bpmn_process: BpmnProcessModel) -> dict:
json_data = JsonDataModel.query.filter_by(hash=bpmn_process.json_data_hash).first()
bpmn_process_dict = {'data': json_data.data, 'tasks': {}}
bpmn_process_dict.update(bpmn_process.properties_json)
tasks = TaskModel.query.filter_by(bpmn_process_id=bpmn_process.id).all()
for task in tasks:
json_data = JsonDataModel.query.filter_by(hash=task.json_data_hash).first()
bpmn_process_dict['tasks'][task.guid] = task.properties_json
bpmn_process_dict['tasks'][task.guid]['data'] = json_data.data
return bpmn_process_dict
@classmethod
def _get_full_bpmn_json(cls, process_instance_model: ProcessInstanceModel) -> dict:
if process_instance_model.serialized_bpmn_definition_id is None:
@@ -552,16 +567,14 @@ class ProcessInstanceProcessor:
# print(f"serialized_bpmn_definition.static_json: {serialized_bpmn_definition.static_json}")
# loaded_json: dict = json.loads(serialized_bpmn_definition.static_json) # or "{}")
serialized_bpmn_definition: dict = {}
bpmn_process_definition = BpmnProcessDefinitionModel.query.filter_by(
id=process_instance_model.bpmn_process_definition_id
).first()
serialized_bpmn_definition: dict = {
"serializer_version": process_instance_model.spiff_serializer_version,
"spec": {},
"subprocess_specs": {},
"subprocesses": {}
}
bpmn_process_definition = process_instance_model.bpmn_process_definition
if bpmn_process_definition is not None:
serialized_bpmn_definition = {
"serializer_version": process_instance_model.spiff_serializer_version,
"spec": {},
"subprocess_specs": {},
}
serialized_bpmn_definition["spec"] = (
cls._get_definition_dict_for_bpmn_process_definition(
bpmn_process_definition
@@ -580,11 +593,21 @@
serialized_bpmn_definition["subprocess_specs"][
bpmn_subprocess_definition.bpmn_identifier
] = spec
loaded_json: dict = serialized_bpmn_definition
process_instance_data = process_instance_model.process_instance_data
loaded_json.update(json.loads(process_instance_data.runtime_json))
return loaded_json
bpmn_process = process_instance_model.bpmn_process
if bpmn_process is not None:
bpmn_process_dict = cls._get_bpmn_process_dict(bpmn_process)
serialized_bpmn_definition.update(bpmn_process_dict)
bpmn_subprocesses = BpmnProcessModel.query.filter_by(parent_process_id=bpmn_process.id).all()
for bpmn_subprocess in bpmn_subprocesses:
bpmn_process_dict = cls._get_bpmn_process_dict(bpmn_subprocess)
serialized_bpmn_definition['subprocesses'][bpmn_subprocess.guid] = bpmn_process_dict
# process_instance_data = process_instance_model.process_instance_data
# loaded_json.update(json.loads(process_instance_data.runtime_json))
return serialized_bpmn_definition
def current_user(self) -> Any:
"""Current_user."""
@@ -979,7 +1002,7 @@ class ProcessInstanceProcessor:
bpmn_process_definition = BpmnProcessDefinitionModel(
hash=new_hash_digest,
bpmn_identifier=process_bpmn_identifier,
properties_json=json.dumps(process_bpmn_properties),
properties_json=process_bpmn_properties,
)
db.session.add(bpmn_process_definition)
@@ -987,7 +1010,7 @@
task_definition = TaskDefinitionModel(
bpmn_process_definition=bpmn_process_definition,
bpmn_identifier=task_bpmn_identifier,
properties_json=json.dumps(task_bpmn_properties),
properties_json=task_bpmn_properties,
typename=task_bpmn_properties["typename"],
)
db.session.add(task_definition)
@@ -1019,12 +1042,71 @@
bpmn_process_definition_parent
)
def _add_bpmn_process(self, bpmn_process_dict: dict, bpmn_process_parent: Optional[BpmnProcessModel] = None, bpmn_process_guid: Optional[str] = None) -> BpmnProcessModel:
tasks = bpmn_process_dict.pop("tasks")
bpmn_process_data = bpmn_process_dict.pop("data")
bpmn_process = None
if bpmn_process_parent is not None:
bpmn_process = BpmnProcessModel.query.filter_by(parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid).first()
elif self.process_instance_model.bpmn_process_id is not None:
bpmn_process = self.process_instance_model.bpmn_process
if bpmn_process is None:
bpmn_process = BpmnProcessModel()
bpmn_process.properties_json = bpmn_process_dict
bpmn_process_data_json = json.dumps(bpmn_process_data, sort_keys=True).encode("utf8")
bpmn_process_data_hash = sha256(bpmn_process_data_json).hexdigest()
if bpmn_process.json_data_hash != bpmn_process_data_hash:
json_data = db.session.query(JsonDataModel.id).filter_by(hash=bpmn_process_data_hash).first()
if json_data is None:
json_data = JsonDataModel(hash=bpmn_process_data_hash, data=bpmn_process_data)
db.session.add(json_data)
bpmn_process.json_data_hash = bpmn_process_data_hash
if bpmn_process_parent is None:
self.process_instance_model.bpmn_process = bpmn_process
elif bpmn_process.parent_process_id is None:
bpmn_process.parent_process_id = bpmn_process_parent.id
db.session.add(bpmn_process)
for task_id, task_properties in tasks.items():
task_data_dict = task_properties.pop('data')
state_int = task_properties['state']
task = TaskModel.query.filter_by(guid=task_id).first()
if task is None:
# bpmn_process_identifier = task_properties['workflow_name']
# bpmn_identifier = task_properties['task_spec']
#
# task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier).join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first()
# if task_definition is None:
# subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
task = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id)
task.state = TaskStateNames[state_int]
task.properties_json = task_properties
task_data_json = json.dumps(task_data_dict, sort_keys=True).encode("utf8")
task_data_hash = sha256(task_data_json).hexdigest()
if task.json_data_hash != task_data_hash:
json_data = db.session.query(JsonDataModel.id).filter_by(hash=task_data_hash).first()
if json_data is None:
json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict)
db.session.add(json_data)
task.json_data_hash = task_data_hash
db.session.add(task)
return bpmn_process
def _add_bpmn_json_records_new(self) -> None:
"""Adds serialized_bpmn_definition and process_instance_data records to the db session.
Expects the save method to commit them.
"""
bpmn_dict = json.loads(self.serialize())
# with open('tmp2.json', 'w') as f: f.write(json.dumps(bpmn_dict))
bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version")
process_instance_data_dict = {}
bpmn_spec_dict = {}
@@ -1038,19 +1120,14 @@
# if self.process_instance_model.bpmn_process_definition_id is None:
self._add_bpmn_process_definitions(bpmn_spec_dict)
# process_instance_data = None
# if self.process_instance_model.process_instance_data_id is None:
# process_instance_data = ProcessInstanceDataModel()
# else:
# process_instance_data = self.process_instance_model.process_instance_data
#
# process_instance_data.runtime_json = json.dumps(process_instance_data_dict)
# db.session.add(process_instance_data)
# self.process_instance_model.process_instance_data = process_instance_data
subprocesses = process_instance_data_dict.pop("subprocesses")
bpmn_process_parent = self._add_bpmn_process(process_instance_data_dict)
for _subprocess_task_id, subprocess_properties in subprocesses.items():
self._add_bpmn_process(subprocess_properties, bpmn_process_parent)
def save(self) -> None:
"""Saves the current state of this processor to the database."""
self._add_bpmn_json_records()
# self._add_bpmn_json_records()
self._add_bpmn_json_records_new()
self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION
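Both `_add_bpmn_process` and its per-task loop use the same content-addressing pattern: canonicalize the dict with `json.dumps(..., sort_keys=True)`, hash with sha256, and insert a `JsonDataModel` row only when that hash is not already present, so identical payloads are stored once no matter how many processes or tasks reference them. A hedged sketch of the pattern factored into a helper (the function name is illustrative; the commit inlines this logic instead):

import json
from hashlib import sha256

from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.json_data import JsonDataModel

def point_at_json_data(record, data: dict) -> None:
    """Set record.json_data_hash to a deduplicated JsonDataModel row for data.

    record is any model with a json_data_hash column (BpmnProcessModel, TaskModel).
    """
    canonical = json.dumps(data, sort_keys=True).encode("utf8")
    data_hash = sha256(canonical).hexdigest()
    if record.json_data_hash != data_hash:
        # Only create the blob row if no other record has stored it already.
        if db.session.query(JsonDataModel.id).filter_by(hash=data_hash).first() is None:
            db.session.add(JsonDataModel(hash=data_hash, data=data))
        record.json_data_hash = data_hash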

View File

@@ -71,7 +71,6 @@ class TestMessageService(BaseTest):
.all()
)
assert len(waiting_messages) == 0
# The process has completed
assert self.process_instance.status == "complete"