unit tests are passing w/ burnettk

jasquat 2023-03-15 11:25:15 -04:00
parent 190e02dd65
commit 6abc3dc69d
9 changed files with 211 additions and 118 deletions

View File

@ -1,3 +1,5 @@
from __future__ import with_statement
import logging
from logging.config import fileConfig

View File

@ -1,8 +1,8 @@
"""empty message
Revision ID: 389800c352ee
Revision ID: 99f1b5156b06
Revises:
Create Date: 2023-03-07 10:40:43.709777
Create Date: 2023-03-14 17:23:22.667853
"""
from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = '389800c352ee'
revision = '99f1b5156b06'
down_revision = None
branch_labels = None
depends_on = None
@ -166,8 +166,6 @@ def upgrade():
sa.Column('bpmn_version_control_type', sa.String(length=50), nullable=True),
sa.Column('bpmn_version_control_identifier', sa.String(length=255), nullable=True),
sa.Column('spiff_step', sa.Integer(), nullable=True),
sa.Column('locked_by', sa.String(length=80), nullable=True),
sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['bpmn_process_definition_id'], ['bpmn_process_definition.id'], ),
sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ),
sa.ForeignKeyConstraint(['process_initiator_id'], ['user.id'], ),
@ -207,20 +205,6 @@ def upgrade():
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('key')
)
op.create_table('task',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('guid', sa.String(length=36), nullable=False),
sa.Column('bpmn_process_id', sa.Integer(), nullable=False),
sa.Column('state', sa.String(length=10), nullable=False),
sa.Column('properties_json', sa.JSON(), nullable=False),
sa.Column('json_data_hash', sa.String(length=255), nullable=False),
sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_task_guid'), 'task', ['guid'], unique=True)
op.create_index(op.f('ix_task_json_data_hash'), 'task', ['json_data_hash'], unique=False)
op.create_table('task_definition',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('bpmn_process_definition_id', sa.Integer(), nullable=False),
@ -284,7 +268,7 @@ def upgrade():
sa.Column('payload', sa.JSON(), nullable=True),
sa.Column('correlation_keys', sa.JSON(), nullable=True),
sa.Column('status', sa.String(length=20), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=True),
sa.Column('counterpart_id', sa.Integer(), nullable=True),
sa.Column('failure_cause', sa.Text(), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
@ -331,6 +315,23 @@ def upgrade():
sa.UniqueConstraint('process_instance_id', 'key', name='process_instance_metadata_unique')
)
op.create_index(op.f('ix_process_instance_metadata_key'), 'process_instance_metadata', ['key'], unique=False)
op.create_table('process_instance_queue',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('run_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('priority', sa.Integer(), nullable=True),
sa.Column('locked_by', sa.String(length=80), nullable=True),
sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('status', sa.String(length=50), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), 'process_instance_queue', ['locked_at_in_seconds'], unique=False)
op.create_index(op.f('ix_process_instance_queue_locked_by'), 'process_instance_queue', ['locked_by'], unique=False)
op.create_index(op.f('ix_process_instance_queue_process_instance_id'), 'process_instance_queue', ['process_instance_id'], unique=True)
op.create_index(op.f('ix_process_instance_queue_status'), 'process_instance_queue', ['status'], unique=False)
op.create_table('spiff_step_details',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
@ -346,6 +347,24 @@ def upgrade():
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('process_instance_id', 'spiff_step', name='process_instance_id_spiff_step')
)
op.create_table('task',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('guid', sa.String(length=36), nullable=False),
sa.Column('bpmn_process_id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('task_definition_id', sa.Integer(), nullable=False),
sa.Column('state', sa.String(length=10), nullable=False),
sa.Column('properties_json', sa.JSON(), nullable=False),
sa.Column('json_data_hash', sa.String(length=255), nullable=False),
sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.ForeignKeyConstraint(['task_definition_id'], ['task_definition.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_task_guid'), 'task', ['guid'], unique=True)
op.create_index(op.f('ix_task_json_data_hash'), 'task', ['json_data_hash'], unique=False)
op.create_table('human_task_user',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('human_task_id', sa.Integer(), nullable=False),
@ -379,7 +398,15 @@ def downgrade():
op.drop_index(op.f('ix_human_task_user_user_id'), table_name='human_task_user')
op.drop_index(op.f('ix_human_task_user_human_task_id'), table_name='human_task_user')
op.drop_table('human_task_user')
op.drop_index(op.f('ix_task_json_data_hash'), table_name='task')
op.drop_index(op.f('ix_task_guid'), table_name='task')
op.drop_table('task')
op.drop_table('spiff_step_details')
op.drop_index(op.f('ix_process_instance_queue_status'), table_name='process_instance_queue')
op.drop_index(op.f('ix_process_instance_queue_process_instance_id'), table_name='process_instance_queue')
op.drop_index(op.f('ix_process_instance_queue_locked_by'), table_name='process_instance_queue')
op.drop_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), table_name='process_instance_queue')
op.drop_table('process_instance_queue')
op.drop_index(op.f('ix_process_instance_metadata_key'), table_name='process_instance_metadata')
op.drop_table('process_instance_metadata')
op.drop_index(op.f('ix_process_instance_file_data_digest'), table_name='process_instance_file_data')
@ -392,9 +419,6 @@ def downgrade():
op.drop_table('user_group_assignment')
op.drop_index(op.f('ix_task_definition_bpmn_identifier'), table_name='task_definition')
op.drop_table('task_definition')
op.drop_index(op.f('ix_task_json_data_hash'), table_name='task')
op.drop_index(op.f('ix_task_guid'), table_name='task')
op.drop_table('task')
op.drop_table('secret')
op.drop_table('refresh_token')
op.drop_index(op.f('ix_process_instance_report_identifier'), table_name='process_instance_report')
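Because the regenerated base revision 99f1b5156b06 replaces 389800c352ee and folds the queue and task changes into the initial migration, databases built from the old revision chain will no longer match. A minimal reset sketch using Alembic's command API; the alembic.ini path is an assumption about this repo's layout:

from alembic import command
from alembic.config import Config

config = Config("alembic.ini")  # assumption: adjust to where this repo keeps its alembic config
command.downgrade(config, "base")  # tear down everything created by the old chain
command.upgrade(config, "head")    # rebuild the schema from the new 99f1b5156b06 base

For throwaway dev databases, dropping and recreating the database accomplishes the same thing.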

View File

@ -1,58 +0,0 @@
"""empty message
Revision ID: e2972eaf8469
Revises: 389800c352ee
Create Date: 2023-03-13 22:00:21.579493
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = 'e2972eaf8469'
down_revision = '389800c352ee'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('process_instance_queue',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('run_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('priority', sa.Integer(), nullable=True),
sa.Column('locked_by', sa.String(length=80), nullable=True),
sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('status', sa.String(length=50), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), 'process_instance_queue', ['locked_at_in_seconds'], unique=False)
op.create_index(op.f('ix_process_instance_queue_locked_by'), 'process_instance_queue', ['locked_by'], unique=False)
op.create_index(op.f('ix_process_instance_queue_process_instance_id'), 'process_instance_queue', ['process_instance_id'], unique=True)
op.create_index(op.f('ix_process_instance_queue_status'), 'process_instance_queue', ['status'], unique=False)
op.alter_column('message_instance', 'user_id',
existing_type=mysql.INTEGER(),
nullable=True)
op.drop_column('process_instance', 'locked_by')
op.drop_column('process_instance', 'locked_at_in_seconds')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('process_instance', sa.Column('locked_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True))
op.add_column('process_instance', sa.Column('locked_by', mysql.VARCHAR(length=80), nullable=True))
op.alter_column('message_instance', 'user_id',
existing_type=mysql.INTEGER(),
nullable=False)
op.drop_index(op.f('ix_process_instance_queue_status'), table_name='process_instance_queue')
op.drop_index(op.f('ix_process_instance_queue_process_instance_id'), table_name='process_instance_queue')
op.drop_index(op.f('ix_process_instance_queue_locked_by'), table_name='process_instance_queue')
op.drop_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), table_name='process_instance_queue')
op.drop_table('process_instance_queue')
# ### end Alembic commands ###
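This revision, deleted here because it is folded into the regenerated base revision above, had moved locked_by and locked_at_in_seconds from process_instance onto process_instance_queue. A hedged sketch of how such queue-row lock columns could be claimed atomically; the worker id and the raw UPDATE are illustrative assumptions, not this repo's actual locking code:

import time
from sqlalchemy import create_engine, text

engine = create_engine("mysql+mysqlconnector://user:pass@localhost/spiffworkflow")  # assumption: connection URL

def try_lock(process_instance_id: int, worker_id: str) -> bool:
    # claim the queue row only if nobody else holds it; rowcount says whether we won the race
    with engine.begin() as conn:
        result = conn.execute(
            text(
                "UPDATE process_instance_queue"
                " SET locked_by = :worker, locked_at_in_seconds = :now"
                " WHERE process_instance_id = :id AND locked_by IS NULL"
            ),
            {"worker": worker_id, "now": int(time.time()), "id": process_instance_id},
        )
        return result.rowcount == 1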

View File

@ -1,5 +1,8 @@
"""Task."""
import enum
from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from dataclasses import dataclass
from typing import Any
from typing import Optional
@ -45,11 +48,16 @@ class TaskModel(SpiffworkflowBaseDBModel):
bpmn_process_id: int = db.Column(
ForeignKey(BpmnProcessModel.id), nullable=False # type: ignore
)
process_instance_id: int = db.Column(
ForeignKey("process_instance.id"), nullable=False
)
# find this by looking up the "workflow_name" and "task_spec" from the properties_json
# task_definition_id: int = db.Column(
# ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore
# )
task_definition_id: int = db.Column(
ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore
)
task_definition = relationship("TaskDefinitionModel")
state: str = db.Column(db.String(10), nullable=False)
properties_json: dict = db.Column(db.JSON, nullable=False)
json_data_hash: str = db.Column(db.String(255), nullable=False, index=True)
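With process_instance_id and task_definition_id now stored on every task row, a task's definition is reachable without re-parsing properties_json. A small usage sketch; the guid value is a placeholder:

from spiffworkflow_backend.models.task import TaskModel

task = TaskModel.query.filter_by(guid="some-task-guid").first()  # placeholder guid
if task is not None:
    # the new relationship added above resolves the definition row directly
    print(task.task_definition.bpmn_identifier)
    # and the new foreign key lets you fetch all tasks for the same instance
    siblings = TaskModel.query.filter_by(process_instance_id=task.process_instance_id).all()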

View File

@ -53,6 +53,7 @@ from SpiffWorkflow.task import TaskState
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models import task_definition
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.bpmn_process_definition import (
BpmnProcessDefinitionModel,
@ -457,6 +458,15 @@ class ProcessInstanceProcessor:
self.process_model_service = ProcessModelService()
bpmn_process_spec = None
self.full_bpmn_process_dict = {}
# this caches each bpmn_process_definition_identifier / task_identifier pair back to its
# TaskDefinitionModel from the database. This is to cut down on database queries while adding new tasks to the database.
# Structure:
# { "bpmn_process_definition_identifier": { "task_identifier": task_definition } }
# To use from a spiff_task:
# [spiff_task.workflow.spec.name][spiff_task.task_spec.name]
self.bpmn_definition_identifiers_to_bpmn_process_id_mappings = {}
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None
if process_instance_model.bpmn_process_definition_id is None:
(
@ -472,7 +482,7 @@ class ProcessInstanceProcessor:
)
try:
(self.bpmn_process_instance, self.full_bpmn_process_dict) = (
(self.bpmn_process_instance, self.full_bpmn_process_dict, self.bpmn_definition_identifiers_to_bpmn_process_id_mappings) = (
self.__get_bpmn_process_instance(
process_instance_model,
bpmn_process_spec,
@ -537,9 +547,20 @@ class ProcessInstanceProcessor:
self.bpmn_process_instance
)
@classmethod
def _update_bpmn_definition_mappings(
cls, bpmn_definition_identifiers_to_bpmn_process_id_mappings: dict, bpmn_process_definition_identifier: str, task_definition: TaskDefinitionModel
) -> None:
if bpmn_process_definition_identifier not in bpmn_definition_identifiers_to_bpmn_process_id_mappings:
bpmn_definition_identifiers_to_bpmn_process_id_mappings[bpmn_process_definition_identifier] = {}
bpmn_definition_identifiers_to_bpmn_process_id_mappings[bpmn_process_definition_identifier][task_definition.bpmn_identifier] = task_definition
@classmethod
def _get_definition_dict_for_bpmn_process_definition(
cls, bpmn_process_definition: BpmnProcessDefinitionModel
cls, bpmn_process_definition: BpmnProcessDefinitionModel, bpmn_definition_identifiers_to_bpmn_process_id_mappings: dict
) -> dict:
task_definitions = TaskDefinitionModel.query.filter_by(
bpmn_process_definition_id=bpmn_process_definition.id
@ -550,6 +571,7 @@ class ProcessInstanceProcessor:
bpmn_process_definition_dict["task_specs"][
task_definition.bpmn_identifier
] = task_definition.properties_json
cls._update_bpmn_definition_mappings(bpmn_definition_identifiers_to_bpmn_process_id_mappings, bpmn_process_definition.bpmn_identifier, task_definition)
return bpmn_process_definition_dict
@classmethod
@ -557,6 +579,7 @@ class ProcessInstanceProcessor:
cls,
bpmn_process_definition: BpmnProcessDefinitionModel,
spiff_bpmn_process_dict: dict,
bpmn_definition_identifiers_to_bpmn_process_id_mappings: dict,
) -> None:
# find all child subprocesses of a process
bpmn_process_subprocess_definitions = (
@ -595,6 +618,7 @@ class ProcessInstanceProcessor:
task_definition.bpmn_process_definition_id
]
)
cls._update_bpmn_definition_mappings(bpmn_definition_identifiers_to_bpmn_process_id_mappings, bpmn_subprocess_definition_bpmn_identifier, task_definition)
spiff_bpmn_process_dict["subprocess_specs"][
bpmn_subprocess_definition_bpmn_identifier
]["task_specs"][
@ -643,7 +667,7 @@ class ProcessInstanceProcessor:
@classmethod
def _get_full_bpmn_process_dict(
cls, process_instance_model: ProcessInstanceModel
cls, process_instance_model: ProcessInstanceModel, bpmn_definition_identifiers_to_bpmn_process_id_mappings: dict
) -> dict:
if process_instance_model.bpmn_process_definition_id is None:
return {}
@ -658,11 +682,11 @@ class ProcessInstanceProcessor:
if bpmn_process_definition is not None:
spiff_bpmn_process_dict["spec"] = (
cls._get_definition_dict_for_bpmn_process_definition(
bpmn_process_definition
bpmn_process_definition, bpmn_definition_identifiers_to_bpmn_process_id_mappings
)
)
cls._set_definition_dict_for_bpmn_subprocess_definitions(
bpmn_process_definition, spiff_bpmn_process_dict
bpmn_process_definition, spiff_bpmn_process_dict, bpmn_definition_identifiers_to_bpmn_process_id_mappings
)
bpmn_process = process_instance_model.bpmn_process
@ -729,8 +753,10 @@ class ProcessInstanceProcessor:
spec: Optional[BpmnProcessSpec] = None,
validate_only: bool = False,
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None,
) -> BpmnWorkflow:
) -> Tuple[BpmnWorkflow, dict, dict]:
full_bpmn_process_dict = {}
bpmn_definition_identifiers_to_bpmn_process_id_mappings = {}
print("GET BPMN PROCESS INSTANCE")
if process_instance_model.bpmn_process_definition_id is not None:
# turn off logging to avoid duplicated spiff logs
spiff_logger = logging.getLogger("spiff")
@ -740,9 +766,10 @@ class ProcessInstanceProcessor:
try:
full_bpmn_process_dict = (
ProcessInstanceProcessor._get_full_bpmn_process_dict(
process_instance_model
process_instance_model, bpmn_definition_identifiers_to_bpmn_process_id_mappings
)
)
print("WE GOT FULL BPMN PROCESS DICT")
bpmn_process_instance = (
ProcessInstanceProcessor._serializer.workflow_from_dict(
full_bpmn_process_dict
@ -755,15 +782,17 @@ class ProcessInstanceProcessor:
ProcessInstanceProcessor.set_script_engine(bpmn_process_instance)
else:
print("WE NO HAVE FULL BPMN YET")
bpmn_process_instance = (
ProcessInstanceProcessor.get_bpmn_process_instance_from_workflow_spec(
spec, subprocesses
)
)
bpmn_process_instance.data[
ProcessInstanceProcessor.VALIDATION_PROCESS_KEY
] = validate_only
return (bpmn_process_instance, full_bpmn_process_dict)
return (bpmn_process_instance, full_bpmn_process_dict, bpmn_definition_identifiers_to_bpmn_process_id_mappings)
def slam_in_data(self, data: dict) -> None:
"""Slam_in_data."""
@ -1025,6 +1054,7 @@ class ProcessInstanceProcessor:
self,
process_bpmn_properties: dict,
bpmn_process_definition_parent: Optional[BpmnProcessDefinitionModel] = None,
store_bpmn_definition_mappings: bool = False,
) -> BpmnProcessDefinitionModel:
process_bpmn_identifier = process_bpmn_properties["name"]
new_hash_digest = sha256(
@ -1033,7 +1063,16 @@ class ProcessInstanceProcessor:
bpmn_process_definition: Optional[BpmnProcessDefinitionModel] = (
BpmnProcessDefinitionModel.query.filter_by(hash=new_hash_digest).first()
)
print(f"process_bpmn_properties: {process_bpmn_properties}")
# import pdb; pdb.set_trace()
# if process_bpmn_identifier == "test_process_to_call":
# import pdb; pdb.set_trace()
# print("HEY22")
print(f"self.process_instance_model.id: {self.process_instance_model.id}")
if bpmn_process_definition is None:
# import pdb; pdb.set_trace()
print("NO DEFINITION")
task_specs = process_bpmn_properties.pop("task_specs")
bpmn_process_definition = BpmnProcessDefinitionModel(
hash=new_hash_digest,
@ -1050,6 +1089,14 @@ class ProcessInstanceProcessor:
typename=task_bpmn_properties["typename"],
)
db.session.add(task_definition)
if store_bpmn_definition_mappings:
self._update_bpmn_definition_mappings(self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, process_bpmn_identifier, task_definition)
elif store_bpmn_definition_mappings:
# this should only ever happen when a new process instance reuses a pre-existing
# bpmn process definition; otherwise the mappings get populated on processor initialization
task_definitions = TaskDefinitionModel.query.filter_by(bpmn_process_definition_id=bpmn_process_definition.id).all()
for task_definition in task_definitions:
self._update_bpmn_definition_mappings(self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, process_bpmn_identifier, task_definition)
if bpmn_process_definition_parent is not None:
bpmn_process_definition_relationship = (
@ -1067,13 +1114,17 @@ class ProcessInstanceProcessor:
return bpmn_process_definition
def _add_bpmn_process_definitions(self, bpmn_spec_dict: dict) -> None:
# store only if mappings is currently empty. this also would mean this is a new instance that has never saved before
print("WE STORE BPM PROCESS DEF")
store_bpmn_definition_mappings = not self.bpmn_definition_identifiers_to_bpmn_process_id_mappings
bpmn_process_definition_parent = self._store_bpmn_process_definition(
bpmn_spec_dict["spec"]
bpmn_spec_dict["spec"], store_bpmn_definition_mappings=store_bpmn_definition_mappings
)
for process_bpmn_properties in bpmn_spec_dict["subprocess_specs"].values():
self._store_bpmn_process_definition(
process_bpmn_properties, bpmn_process_definition_parent
process_bpmn_properties, bpmn_process_definition_parent, store_bpmn_definition_mappings=store_bpmn_definition_mappings
)
self.process_instance_model.bpmn_process_definition = (
bpmn_process_definition_parent
)
@ -1083,7 +1134,8 @@ class ProcessInstanceProcessor:
Expects the save method to commit it.
"""
bpmn_dict = json.loads(self.serialize())
print("WE SAVE THINGS")
bpmn_dict = self.serialize()
bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version")
process_instance_data_dict = {}
bpmn_spec_dict = {}
@ -1096,11 +1148,14 @@ class ProcessInstanceProcessor:
# FIXME: always save new hash until we get updated Spiff without loopresettask
# if self.process_instance_model.bpmn_process_definition_id is None:
self._add_bpmn_process_definitions(bpmn_spec_dict)
subprocesses = process_instance_data_dict.pop("subprocesses")
bpmn_process_parent, new_task_models, new_json_data_dicts = (
TaskService.add_bpmn_process(
process_instance_data_dict, self.process_instance_model
process_instance_data_dict, self.process_instance_model, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings, spiff_workflow=self.bpmn_process_instance
)
)
for subprocess_task_id, subprocess_properties in subprocesses.items():
@ -1113,6 +1168,8 @@ class ProcessInstanceProcessor:
self.process_instance_model,
bpmn_process_parent,
bpmn_process_guid=subprocess_task_id,
bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings,
spiff_workflow=self.bpmn_process_instance
)
new_task_models.update(subprocess_new_task_models)
new_json_data_dicts.update(subprocess_new_json_data_models)
@ -1122,6 +1179,7 @@ class ProcessInstanceProcessor:
def save(self) -> None:
"""Saves the current state of this processor to the database."""
print("WE IN SAVE")
self._add_bpmn_json_records()
self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION
@ -1631,6 +1689,7 @@ class ProcessInstanceProcessor:
secondary_engine_step_delegate=step_delegate,
serializer=self._serializer,
process_instance=self.process_instance_model,
bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings,
)
if execution_strategy_name is None:
@ -1722,11 +1781,12 @@ class ProcessInstanceProcessor:
)
)
def serialize(self) -> str:
def serialize(self) -> dict:
"""Serialize."""
self.check_task_data_size()
self.preserve_script_engine_state()
return self._serializer.serialize_json(self.bpmn_process_instance) # type: ignore
return json.loads(self._serializer.serialize_json(self.bpmn_process_instance)) # type: ignore
def next_user_tasks(self) -> list[SpiffTask]:
"""Next_user_tasks."""

View File

@ -11,6 +11,7 @@ from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskStateNames
from sqlalchemy.dialects.mysql import insert as mysql_insert
from sqlalchemy.dialects.postgresql import insert as postgres_insert
from uuid import UUID
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.db import db
@ -44,24 +45,13 @@ class TaskService:
)
db.session.execute(on_duplicate_key_stmt)
@classmethod
def _update_task_data_on_task_model(
cls, task_model: TaskModel, task_data_dict: dict
) -> Optional[JsonDataDict]:
task_data_json = json.dumps(task_data_dict, sort_keys=True)
task_data_hash: str = sha256(task_data_json.encode("utf8")).hexdigest()
json_data_dict: Optional[JsonDataDict] = None
if task_model.json_data_hash != task_data_hash:
json_data_dict = {"hash": task_data_hash, "data": task_data_dict}
task_model.json_data_hash = task_data_hash
return json_data_dict
@classmethod
def update_task_model(
cls,
task_model: TaskModel,
spiff_task: SpiffTask,
serializer: BpmnWorkflowSerializer,
bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None,
) -> Optional[JsonDataDict]:
"""Updates properties_json and data on given task_model.
@ -83,6 +73,7 @@ class TaskService:
spiff_task: SpiffTask,
process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer,
bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None,
) -> Tuple[
Optional[BpmnProcessModel],
TaskModel,
@ -98,12 +89,13 @@ class TaskService:
new_json_data_dicts: dict[str, JsonDataDict] = {}
if task_model is None:
bpmn_process, new_task_models, new_json_data_dicts = cls.task_bpmn_process(
spiff_task, process_instance, serializer
spiff_task, process_instance, serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=bpmn_definition_identifiers_to_bpmn_process_id_mappings
)
task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first()
task_definition = bpmn_definition_identifiers_to_bpmn_process_id_mappings[spiff_task.workflow.spec.name][spiff_task.task_spec.name]
if task_model is None:
task_model = TaskModel(
guid=spiff_task_guid, bpmn_process_id=bpmn_process.id
guid=spiff_task_guid, bpmn_process_id=bpmn_process.id, process_instance_id=process_instance.id, task_definition_id=task_definition.id
)
return (bpmn_process, task_model, new_task_models, new_json_data_dicts)
@ -130,6 +122,7 @@ class TaskService:
spiff_task: SpiffTask,
process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer,
bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None,
) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]:
subprocess_guid, subprocess = cls.task_subprocess(spiff_task)
bpmn_process: Optional[BpmnProcessModel] = None
@ -140,12 +133,15 @@ class TaskService:
# This is the top level workflow, which has no guid
# check for bpmn_process_id because mypy doesn't realize bpmn_process can be None
if process_instance.bpmn_process_id is None:
spiff_workflow = spiff_task.workflow._get_outermost_workflow()
bpmn_process, new_task_models, new_json_data_dicts = (
cls.add_bpmn_process(
serializer.workflow_to_dict(
spiff_task.workflow._get_outermost_workflow()
spiff_workflow
),
process_instance,
bpmn_definition_identifiers_to_bpmn_process_id_mappings=bpmn_definition_identifiers_to_bpmn_process_id_mappings,
spiff_workflow=spiff_workflow,
)
)
else:
@ -153,12 +149,16 @@ class TaskService:
guid=subprocess_guid
).first()
if bpmn_process is None:
spiff_workflow = spiff_task.workflow
bpmn_process, new_task_models, new_json_data_dicts = (
cls.add_bpmn_process(
serializer.workflow_to_dict(subprocess),
process_instance,
process_instance.bpmn_process,
subprocess_guid,
bpmn_definition_identifiers_to_bpmn_process_id_mappings=bpmn_definition_identifiers_to_bpmn_process_id_mappings,
spiff_workflow=spiff_workflow,
)
)
return (bpmn_process, new_task_models, new_json_data_dicts)
@ -170,6 +170,8 @@ class TaskService:
process_instance: ProcessInstanceModel,
bpmn_process_parent: Optional[BpmnProcessModel] = None,
bpmn_process_guid: Optional[str] = None,
bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None,
spiff_workflow: Optional[BpmnWorkflow] = None,
) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]:
"""This creates and adds a bpmn_process to the Db session.
@ -183,6 +185,7 @@ class TaskService:
new_json_data_dicts: dict[str, JsonDataDict] = {}
bpmn_process = None
print("ADD BPMN PROCESS")
if bpmn_process_parent is not None:
bpmn_process = BpmnProcessModel.query.filter_by(
parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid
@ -194,6 +197,9 @@ class TaskService:
if bpmn_process is None:
bpmn_process_is_new = True
bpmn_process = BpmnProcessModel(guid=bpmn_process_guid)
for task_id, task_properties in tasks.items():
if task_properties['task_spec'] == 'Start':
bpmn_process_dict['root'] = task_id
bpmn_process.properties_json = bpmn_process_dict
@ -202,6 +208,7 @@ class TaskService:
bpmn_process_data_json.encode("utf8")
).hexdigest()
if bpmn_process.json_data_hash != bpmn_process_data_hash:
# print(f"bpmn_process_data_dict: {bpmn_process_data_dict}")
new_json_data_dicts[bpmn_process_data_hash] = {
"hash": bpmn_process_data_hash,
"data": bpmn_process_data_dict,
@ -219,6 +226,16 @@ class TaskService:
if bpmn_process_is_new:
for task_id, task_properties in tasks.items():
if task_properties['task_spec'] == 'Root':
continue
if task_properties['task_spec'] == 'Start':
task_properties['parent'] = None
process_dict = bpmn_process.properties_json
process_dict['root'] = task_id
bpmn_process.properties_json = process_dict
db.session.add(bpmn_process)
task_data_dict = task_properties.pop("data")
state_int = task_properties["state"]
@ -231,8 +248,15 @@ class TaskService:
spiff_task = spiff_workflow.get_task(UUID(task_id))
# resolve the stored TaskDefinitionModel from the in-memory mappings instead of a db query;
# a KeyError here means the mappings were never populated for this process definition
task_definition = bpmn_definition_identifiers_to_bpmn_process_id_mappings[spiff_task.workflow.spec.name][spiff_task.task_spec.name]
task_model = TaskModel(
guid=task_id, bpmn_process_id=bpmn_process.id
guid=task_id, bpmn_process_id=bpmn_process.id, process_instance_id=process_instance.id, task_definition_id=task_definition.id
)
task_model.state = TaskStateNames[state_int]
task_model.properties_json = task_properties
@ -245,3 +269,15 @@ class TaskService:
new_json_data_dicts[json_data_dict["hash"]] = json_data_dict
return (bpmn_process, new_task_models, new_json_data_dicts)
@classmethod
def _update_task_data_on_task_model(
cls, task_model: TaskModel, task_data_dict: dict
) -> Optional[JsonDataDict]:
task_data_json = json.dumps(task_data_dict, sort_keys=True)
task_data_hash: str = sha256(task_data_json.encode("utf8")).hexdigest()
json_data_dict: Optional[JsonDataDict] = None
if task_model.json_data_hash != task_data_hash:
json_data_dict = {"hash": task_data_hash, "data": task_data_dict}
task_model.json_data_hash = task_data_hash
return json_data_dict
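_update_task_data_on_task_model (moved to the bottom of this class) implements content-addressed task data: the dict is serialized with sort_keys=True so equal dicts always hash the same, and a new json_data row is only queued when the digest changes. A standalone sketch of the hashing step:

import json
from hashlib import sha256

def stable_hash(task_data: dict) -> str:
    # sort_keys=True makes the digest independent of dict insertion order
    return sha256(json.dumps(task_data, sort_keys=True).encode("utf8")).hexdigest()

assert stable_hash({"a": 1, "b": 2}) == stable_hash({"b": 2, "a": 1})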

View File

@ -58,9 +58,11 @@ class TaskModelSavingDelegate(EngineStepDelegate):
serializer: BpmnWorkflowSerializer,
process_instance: ProcessInstanceModel,
secondary_engine_step_delegate: Optional[EngineStepDelegate] = None,
bpmn_definition_identifiers_to_bpmn_process_id_mappings: Optional[dict] = None,
) -> None:
self.secondary_engine_step_delegate = secondary_engine_step_delegate
self.process_instance = process_instance
self.bpmn_definition_identifiers_to_bpmn_process_id_mappings = bpmn_definition_identifiers_to_bpmn_process_id_mappings
self.current_task_model: Optional[TaskModel] = None
self.task_models: dict[str, TaskModel] = {}
@ -78,7 +80,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
if self.should_update_task_model():
_bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task(
spiff_task, self.process_instance, self.serializer
spiff_task, self.process_instance, self.serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings
)
)
self.current_task_model = task_model
@ -92,7 +94,7 @@ class TaskModelSavingDelegate(EngineStepDelegate):
if self.current_task_model and self.should_update_task_model():
self.current_task_model.end_in_seconds = time.time()
json_data_dict = TaskService.update_task_model(
self.current_task_model, spiff_task, self.serializer
self.current_task_model, spiff_task, self.serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings
)
if json_data_dict is not None:
self.json_data_dicts[json_data_dict["hash"]] = json_data_dict
@ -121,13 +123,13 @@ class TaskModelSavingDelegate(EngineStepDelegate):
):
_bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task(
waiting_spiff_task, self.process_instance, self.serializer
waiting_spiff_task, self.process_instance, self.serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings
)
)
self.task_models.update(new_task_models)
self.json_data_dicts.update(new_json_data_dicts)
json_data_dict = TaskService.update_task_model(
task_model, waiting_spiff_task, self.serializer
task_model, waiting_spiff_task, self.serializer, bpmn_definition_identifiers_to_bpmn_process_id_mappings=self.bpmn_definition_identifiers_to_bpmn_process_id_mappings
)
self.task_models[task_model.guid] = task_model
if json_data_dict is not None:
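TaskModelSavingDelegate accumulates its writes in dicts keyed by task guid and by data hash, so repeated visits to the same task or identical payloads collapse into one row each before the flush. A skeletal sketch of that batching pattern; the hook signature is simplified from the real EngineStepDelegate:

class BatchingDelegateSketch:
    """Collects rows during engine steps; duplicates collapse by key."""

    def __init__(self) -> None:
        self.task_models: dict = {}      # guid -> TaskModel
        self.json_data_dicts: dict = {}  # hash -> {"hash": ..., "data": ...}

    def record(self, task_model, json_data_dict) -> None:
        self.task_models[task_model.guid] = task_model
        if json_data_dict is not None:
            # identical hash means identical content, so overwriting is harmless
            self.json_data_dicts[json_data_dict["hash"]] = json_data_dict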

View File

@ -33,6 +33,7 @@ class TestErrorHandlingService(BaseTest):
process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
process_model.id, user
)
print(f"process_instance.id: {process_instance.id}")
pip = ProcessInstanceProcessor(process_instance)
with pytest.raises(ApiError) as e:
pip.do_engine_steps(save=True)

View File

@ -378,6 +378,24 @@ class TestProcessInstanceProcessor(BaseTest):
assert len(all_spiff_tasks) > 1
for spiff_task in all_spiff_tasks:
assert spiff_task.state == TaskState.COMPLETED
if spiff_task.task_spec.name == 'test_process_to_call_script':
task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
assert task.task_definition_id is not None
task_definition = task.task_definition
assert task_definition.bpmn_identifier == 'test_process_to_call_script'
assert task_definition.bpmn_process_definition.bpmn_identifier == 'test_process_to_call'
elif spiff_task.task_spec.name == 'top_level_subprocess_script':
task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
assert task.task_definition_id is not None
task_definition = task.task_definition
assert task_definition.bpmn_identifier == 'top_level_subprocess_script'
assert task_definition.bpmn_process_definition.bpmn_identifier == 'top_level_subprocess'
elif spiff_task.task_spec.name == 'top_level_script':
task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
assert task.task_definition_id is not None
task_definition = task.task_definition
assert task_definition.bpmn_identifier == 'top_level_script'
assert task_definition.bpmn_process_definition.bpmn_identifier == 'top_level_process'
# FIXME: Checking task data cannot work with the feature/remove-loop-reset branch
# of SpiffWorkflow. This is because it saves script data to the python_env and NOT
# to task.data. We may need to either create a new column on TaskModel to put the python_env
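The three near-identical assertion branches above could be collapsed into a table-driven check. A hypothetical refactor sketch; the expected mapping simply restates the assertions already present:

expected = {
    "test_process_to_call_script": ("test_process_to_call_script", "test_process_to_call"),
    "top_level_subprocess_script": ("top_level_subprocess_script", "top_level_subprocess"),
    "top_level_script": ("top_level_script", "top_level_process"),
}
for spiff_task in all_spiff_tasks:
    assert spiff_task.state == TaskState.COMPLETED
    if spiff_task.task_spec.name in expected:
        task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
        task_identifier, process_identifier = expected[spiff_task.task_spec.name]
        assert task.task_definition.bpmn_identifier == task_identifier
        assert task.task_definition.bpmn_process_definition.bpmn_identifier == process_identifier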