Merge pull request #184 from sartography/feature/add_task_definition_to_task

Feature/add task definition to task
This commit is contained in:
jasquat 2023-03-15 16:18:08 -04:00 committed by GitHub
commit a6768a679d
13 changed files with 415 additions and 202 deletions

View File

@ -1,8 +1,8 @@
"""empty message """empty message
Revision ID: 389800c352ee Revision ID: 434e6494e8ff
Revises: Revises:
Create Date: 2023-03-07 10:40:43.709777 Create Date: 2023-03-15 12:25:48.665481
""" """
from alembic import op from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
from sqlalchemy.dialects import mysql from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = '389800c352ee' revision = '434e6494e8ff'
down_revision = None down_revision = None
branch_labels = None branch_labels = None
depends_on = None depends_on = None
@ -166,8 +166,6 @@ def upgrade():
sa.Column('bpmn_version_control_type', sa.String(length=50), nullable=True), sa.Column('bpmn_version_control_type', sa.String(length=50), nullable=True),
sa.Column('bpmn_version_control_identifier', sa.String(length=255), nullable=True), sa.Column('bpmn_version_control_identifier', sa.String(length=255), nullable=True),
sa.Column('spiff_step', sa.Integer(), nullable=True), sa.Column('spiff_step', sa.Integer(), nullable=True),
sa.Column('locked_by', sa.String(length=80), nullable=True),
sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['bpmn_process_definition_id'], ['bpmn_process_definition.id'], ), sa.ForeignKeyConstraint(['bpmn_process_definition_id'], ['bpmn_process_definition.id'], ),
sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ), sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ),
sa.ForeignKeyConstraint(['process_initiator_id'], ['user.id'], ), sa.ForeignKeyConstraint(['process_initiator_id'], ['user.id'], ),
@ -207,20 +205,6 @@ def upgrade():
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('key') sa.UniqueConstraint('key')
) )
op.create_table('task',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('guid', sa.String(length=36), nullable=False),
sa.Column('bpmn_process_id', sa.Integer(), nullable=False),
sa.Column('state', sa.String(length=10), nullable=False),
sa.Column('properties_json', sa.JSON(), nullable=False),
sa.Column('json_data_hash', sa.String(length=255), nullable=False),
sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_task_guid'), 'task', ['guid'], unique=True)
op.create_index(op.f('ix_task_json_data_hash'), 'task', ['json_data_hash'], unique=False)
op.create_table('task_definition', op.create_table('task_definition',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('bpmn_process_definition_id', sa.Integer(), nullable=False), sa.Column('bpmn_process_definition_id', sa.Integer(), nullable=False),
@ -284,7 +268,7 @@ def upgrade():
sa.Column('payload', sa.JSON(), nullable=True), sa.Column('payload', sa.JSON(), nullable=True),
sa.Column('correlation_keys', sa.JSON(), nullable=True), sa.Column('correlation_keys', sa.JSON(), nullable=True),
sa.Column('status', sa.String(length=20), nullable=False), sa.Column('status', sa.String(length=20), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False), sa.Column('user_id', sa.Integer(), nullable=True),
sa.Column('counterpart_id', sa.Integer(), nullable=True), sa.Column('counterpart_id', sa.Integer(), nullable=True),
sa.Column('failure_cause', sa.Text(), nullable=True), sa.Column('failure_cause', sa.Text(), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True), sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
@ -331,6 +315,23 @@ def upgrade():
sa.UniqueConstraint('process_instance_id', 'key', name='process_instance_metadata_unique') sa.UniqueConstraint('process_instance_id', 'key', name='process_instance_metadata_unique')
) )
op.create_index(op.f('ix_process_instance_metadata_key'), 'process_instance_metadata', ['key'], unique=False) op.create_index(op.f('ix_process_instance_metadata_key'), 'process_instance_metadata', ['key'], unique=False)
op.create_table('process_instance_queue',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('run_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('priority', sa.Integer(), nullable=True),
sa.Column('locked_by', sa.String(length=80), nullable=True),
sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('status', sa.String(length=50), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), 'process_instance_queue', ['locked_at_in_seconds'], unique=False)
op.create_index(op.f('ix_process_instance_queue_locked_by'), 'process_instance_queue', ['locked_by'], unique=False)
op.create_index(op.f('ix_process_instance_queue_process_instance_id'), 'process_instance_queue', ['process_instance_id'], unique=True)
op.create_index(op.f('ix_process_instance_queue_status'), 'process_instance_queue', ['status'], unique=False)
op.create_table('spiff_step_details', op.create_table('spiff_step_details',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False), sa.Column('process_instance_id', sa.Integer(), nullable=False),
@ -346,6 +347,26 @@ def upgrade():
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('process_instance_id', 'spiff_step', name='process_instance_id_spiff_step') sa.UniqueConstraint('process_instance_id', 'spiff_step', name='process_instance_id_spiff_step')
) )
op.create_table('task',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('guid', sa.String(length=36), nullable=False),
sa.Column('bpmn_process_id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('task_definition_id', sa.Integer(), nullable=False),
sa.Column('state', sa.String(length=10), nullable=False),
sa.Column('properties_json', sa.JSON(), nullable=False),
sa.Column('json_data_hash', sa.String(length=255), nullable=False),
sa.Column('python_env_data_hash', sa.String(length=255), nullable=False),
sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.ForeignKeyConstraint(['task_definition_id'], ['task_definition.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_task_guid'), 'task', ['guid'], unique=True)
op.create_index(op.f('ix_task_json_data_hash'), 'task', ['json_data_hash'], unique=False)
op.create_index(op.f('ix_task_python_env_data_hash'), 'task', ['python_env_data_hash'], unique=False)
op.create_table('human_task_user', op.create_table('human_task_user',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('human_task_id', sa.Integer(), nullable=False), sa.Column('human_task_id', sa.Integer(), nullable=False),
@ -379,7 +400,16 @@ def downgrade():
op.drop_index(op.f('ix_human_task_user_user_id'), table_name='human_task_user') op.drop_index(op.f('ix_human_task_user_user_id'), table_name='human_task_user')
op.drop_index(op.f('ix_human_task_user_human_task_id'), table_name='human_task_user') op.drop_index(op.f('ix_human_task_user_human_task_id'), table_name='human_task_user')
op.drop_table('human_task_user') op.drop_table('human_task_user')
op.drop_index(op.f('ix_task_python_env_data_hash'), table_name='task')
op.drop_index(op.f('ix_task_json_data_hash'), table_name='task')
op.drop_index(op.f('ix_task_guid'), table_name='task')
op.drop_table('task')
op.drop_table('spiff_step_details') op.drop_table('spiff_step_details')
op.drop_index(op.f('ix_process_instance_queue_status'), table_name='process_instance_queue')
op.drop_index(op.f('ix_process_instance_queue_process_instance_id'), table_name='process_instance_queue')
op.drop_index(op.f('ix_process_instance_queue_locked_by'), table_name='process_instance_queue')
op.drop_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), table_name='process_instance_queue')
op.drop_table('process_instance_queue')
op.drop_index(op.f('ix_process_instance_metadata_key'), table_name='process_instance_metadata') op.drop_index(op.f('ix_process_instance_metadata_key'), table_name='process_instance_metadata')
op.drop_table('process_instance_metadata') op.drop_table('process_instance_metadata')
op.drop_index(op.f('ix_process_instance_file_data_digest'), table_name='process_instance_file_data') op.drop_index(op.f('ix_process_instance_file_data_digest'), table_name='process_instance_file_data')
@ -392,9 +422,6 @@ def downgrade():
op.drop_table('user_group_assignment') op.drop_table('user_group_assignment')
op.drop_index(op.f('ix_task_definition_bpmn_identifier'), table_name='task_definition') op.drop_index(op.f('ix_task_definition_bpmn_identifier'), table_name='task_definition')
op.drop_table('task_definition') op.drop_table('task_definition')
op.drop_index(op.f('ix_task_json_data_hash'), table_name='task')
op.drop_index(op.f('ix_task_guid'), table_name='task')
op.drop_table('task')
op.drop_table('secret') op.drop_table('secret')
op.drop_table('refresh_token') op.drop_table('refresh_token')
op.drop_index(op.f('ix_process_instance_report_identifier'), table_name='process_instance_report') op.drop_index(op.f('ix_process_instance_report_identifier'), table_name='process_instance_report')

View File

@ -1,58 +0,0 @@
"""empty message
Revision ID: e2972eaf8469
Revises: 389800c352ee
Create Date: 2023-03-13 22:00:21.579493
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = 'e2972eaf8469'
down_revision = '389800c352ee'
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('process_instance_queue',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
sa.Column('run_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('priority', sa.Integer(), nullable=True),
sa.Column('locked_by', sa.String(length=80), nullable=True),
sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('status', sa.String(length=50), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), 'process_instance_queue', ['locked_at_in_seconds'], unique=False)
op.create_index(op.f('ix_process_instance_queue_locked_by'), 'process_instance_queue', ['locked_by'], unique=False)
op.create_index(op.f('ix_process_instance_queue_process_instance_id'), 'process_instance_queue', ['process_instance_id'], unique=True)
op.create_index(op.f('ix_process_instance_queue_status'), 'process_instance_queue', ['status'], unique=False)
op.alter_column('message_instance', 'user_id',
existing_type=mysql.INTEGER(),
nullable=True)
op.drop_column('process_instance', 'locked_by')
op.drop_column('process_instance', 'locked_at_in_seconds')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.add_column('process_instance', sa.Column('locked_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True))
op.add_column('process_instance', sa.Column('locked_by', mysql.VARCHAR(length=80), nullable=True))
op.alter_column('message_instance', 'user_id',
existing_type=mysql.INTEGER(),
nullable=False)
op.drop_index(op.f('ix_process_instance_queue_status'), table_name='process_instance_queue')
op.drop_index(op.f('ix_process_instance_queue_process_instance_id'), table_name='process_instance_queue')
op.drop_index(op.f('ix_process_instance_queue_locked_by'), table_name='process_instance_queue')
op.drop_index(op.f('ix_process_instance_queue_locked_at_in_seconds'), table_name='process_instance_queue')
op.drop_table('process_instance_queue')
# ### end Alembic commands ###

View File

@ -1894,8 +1894,8 @@ lxml = "*"
[package.source] [package.source]
type = "git" type = "git"
url = "https://github.com/sartography/SpiffWorkflow" url = "https://github.com/sartography/SpiffWorkflow"
reference = "feature/remove-loop-reset" reference = "main"
resolved_reference = "13034aaf12f62aa3914744ca05bc9a3e3b3c3452" resolved_reference = "f162aac43af3af18d1a55186aeccea154fb8b05d"
[[package]] [[package]]
name = "SQLAlchemy" name = "SQLAlchemy"
@ -2274,7 +2274,7 @@ testing = ["flake8 (<5)", "func-timeout", "jaraco.functools", "jaraco.itertools"
[metadata] [metadata]
lock-version = "1.1" lock-version = "1.1"
python-versions = ">=3.9,<3.12" python-versions = ">=3.9,<3.12"
content-hash = "7ab6d5021406b573edfdca4f9e0f5e62c41a6f6ea09d34154df72454887e3670" content-hash = "b9ea32912509637f1378d060771de7548d93953aa3db12d6a48098f7dc15205f"
[metadata.files] [metadata.files]
alabaster = [ alabaster = [

View File

@ -27,7 +27,7 @@ flask-marshmallow = "*"
flask-migrate = "*" flask-migrate = "*"
flask-restful = "*" flask-restful = "*"
werkzeug = "*" werkzeug = "*"
SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "feature/remove-loop-reset"} SpiffWorkflow = {git = "https://github.com/sartography/SpiffWorkflow", rev = "main"}
# SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" } # SpiffWorkflow = {develop = true, path = "../SpiffWorkflow" }
sentry-sdk = "^1.10" sentry-sdk = "^1.10"
sphinx-autoapi = "^2.0" sphinx-autoapi = "^2.0"

View File

@ -1,6 +1,7 @@
from __future__ import annotations from __future__ import annotations
from sqlalchemy import ForeignKey from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
@ -24,6 +25,8 @@ class BpmnProcessModel(SpiffworkflowBaseDBModel):
properties_json: dict = db.Column(db.JSON, nullable=False) properties_json: dict = db.Column(db.JSON, nullable=False)
json_data_hash: str = db.Column(db.String(255), nullable=False, index=True) json_data_hash: str = db.Column(db.String(255), nullable=False, index=True)
tasks = relationship("TaskModel", cascade="delete") # type: ignore
# subprocess or top_level_process # subprocess or top_level_process
# process_type: str = db.Column(db.String(30), nullable=False) # process_type: str = db.Column(db.String(30), nullable=False)

View File

@ -4,6 +4,10 @@ from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
class JsonDataModelNotFoundError(Exception):
pass
# delta algorithm <- just to save it for when we want to try to implement it: # delta algorithm <- just to save it for when we want to try to implement it:
# a = {"hey": { "hey2": 2, "hey3": 3, "hey6": 7 }, "hey30": 3, "hey40": 4} # a = {"hey": { "hey2": 2, "hey3": 3, "hey6": 7 }, "hey30": 3, "hey40": 4}
# b = {"hey": { "hey2": 4, "hey5": 3 }, "hey20": 2, "hey30": 3} # b = {"hey": { "hey2": 4, "hey5": 3 }, "hey20": 2, "hey30": 3}
@ -27,3 +31,18 @@ class JsonDataModel(SpiffworkflowBaseDBModel):
# this is a sha256 hash of spec and serializer_version # this is a sha256 hash of spec and serializer_version
hash: str = db.Column(db.String(255), nullable=False, index=True, unique=True) hash: str = db.Column(db.String(255), nullable=False, index=True, unique=True)
data: dict = db.Column(db.JSON, nullable=False) data: dict = db.Column(db.JSON, nullable=False)
@classmethod
def find_object_by_hash(cls, hash: str) -> JsonDataModel:
json_data_model: JsonDataModel | None = JsonDataModel.query.filter_by(
hash=hash
).first()
if json_data_model is None:
raise JsonDataModelNotFoundError(
f"Could not find a json data model entry with hash: {hash}"
)
return json_data_model
@classmethod
def find_data_dict_by_hash(cls, hash: str) -> dict:
return cls.find_object_by_hash(hash).data

View File

@ -71,7 +71,8 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
bpmn_process_id: int | None = db.Column( bpmn_process_id: int | None = db.Column(
ForeignKey(BpmnProcessModel.id), nullable=True # type: ignore ForeignKey(BpmnProcessModel.id), nullable=True # type: ignore
) )
bpmn_process = relationship(BpmnProcessModel) bpmn_process = relationship(BpmnProcessModel, cascade="delete")
tasks = relationship("TaskModel", cascade="delete") # type: ignore
spiff_serializer_version = db.Column(db.String(50), nullable=True) spiff_serializer_version = db.Column(db.String(50), nullable=True)

View File

@ -10,10 +10,13 @@ from marshmallow import Schema
from marshmallow_enum import EnumField # type: ignore from marshmallow_enum import EnumField # type: ignore
from SpiffWorkflow.task import TaskStateNames # type: ignore from SpiffWorkflow.task import TaskStateNames # type: ignore
from sqlalchemy import ForeignKey from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.json_data import JsonDataModel
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
class MultiInstanceType(enum.Enum): class MultiInstanceType(enum.Enum):
@ -45,18 +48,31 @@ class TaskModel(SpiffworkflowBaseDBModel):
bpmn_process_id: int = db.Column( bpmn_process_id: int = db.Column(
ForeignKey(BpmnProcessModel.id), nullable=False # type: ignore ForeignKey(BpmnProcessModel.id), nullable=False # type: ignore
) )
process_instance_id: int = db.Column(
ForeignKey("process_instance.id"), nullable=False
)
# find this by looking up the "workflow_name" and "task_spec" from the properties_json # find this by looking up the "workflow_name" and "task_spec" from the properties_json
# task_definition_id: int = db.Column( task_definition_id: int = db.Column(
# ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore
# ) )
task_definition = relationship("TaskDefinitionModel")
state: str = db.Column(db.String(10), nullable=False) state: str = db.Column(db.String(10), nullable=False)
properties_json: dict = db.Column(db.JSON, nullable=False) properties_json: dict = db.Column(db.JSON, nullable=False)
json_data_hash: str = db.Column(db.String(255), nullable=False, index=True) json_data_hash: str = db.Column(db.String(255), nullable=False, index=True)
python_env_data_hash: str = db.Column(db.String(255), nullable=False, index=True)
start_in_seconds: float = db.Column(db.DECIMAL(17, 6)) start_in_seconds: float = db.Column(db.DECIMAL(17, 6))
end_in_seconds: Union[float, None] = db.Column(db.DECIMAL(17, 6)) end_in_seconds: Union[float, None] = db.Column(db.DECIMAL(17, 6))
def python_env_data(self) -> dict:
return JsonDataModel.find_data_dict_by_hash(self.python_env_data_hash)
def json_data(self) -> dict:
return JsonDataModel.find_data_dict_by_hash(self.json_data_hash)
class Task: class Task:
"""Task.""" """Task."""

View File

@ -456,7 +456,16 @@ class ProcessInstanceProcessor:
self.process_instance_model = process_instance_model self.process_instance_model = process_instance_model
self.process_model_service = ProcessModelService() self.process_model_service = ProcessModelService()
bpmn_process_spec = None bpmn_process_spec = None
self.full_bpmn_process_dict = {} self.full_bpmn_process_dict: dict = {}
# this caches the bpmn_process_definition_identifier and task_identifier back to the bpmn_process_id
# in the database. This is to cut down on database queries while adding new tasks to the database.
# Structure:
# { "bpmn_process_definition_identifier": { "task_identifier": task_definition } }
# To use from a spiff_task:
# [spiff_task.workflow.spec.name][spiff_task.task_spec.name]
self.bpmn_definition_to_task_definitions_mappings: dict = {}
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None subprocesses: Optional[IdToBpmnProcessSpecMapping] = None
if process_instance_model.bpmn_process_definition_id is None: if process_instance_model.bpmn_process_definition_id is None:
( (
@ -472,13 +481,15 @@ class ProcessInstanceProcessor:
) )
try: try:
(self.bpmn_process_instance, self.full_bpmn_process_dict) = ( (
self.__get_bpmn_process_instance( self.bpmn_process_instance,
process_instance_model, self.full_bpmn_process_dict,
bpmn_process_spec, self.bpmn_definition_to_task_definitions_mappings,
validate_only, ) = self.__get_bpmn_process_instance(
subprocesses=subprocesses, process_instance_model,
) bpmn_process_spec,
validate_only,
subprocesses=subprocesses,
) )
self.set_script_engine(self.bpmn_process_instance) self.set_script_engine(self.bpmn_process_instance)
@ -537,9 +548,29 @@ class ProcessInstanceProcessor:
self.bpmn_process_instance self.bpmn_process_instance
) )
@classmethod
def _update_bpmn_definition_mappings(
cls,
bpmn_definition_to_task_definitions_mappings: dict,
bpmn_process_definition_identifier: str,
task_definition: TaskDefinitionModel,
) -> None:
if (
bpmn_process_definition_identifier
not in bpmn_definition_to_task_definitions_mappings
):
bpmn_definition_to_task_definitions_mappings[
bpmn_process_definition_identifier
] = {}
bpmn_definition_to_task_definitions_mappings[
bpmn_process_definition_identifier
][task_definition.bpmn_identifier] = task_definition
@classmethod @classmethod
def _get_definition_dict_for_bpmn_process_definition( def _get_definition_dict_for_bpmn_process_definition(
cls, bpmn_process_definition: BpmnProcessDefinitionModel cls,
bpmn_process_definition: BpmnProcessDefinitionModel,
bpmn_definition_to_task_definitions_mappings: dict,
) -> dict: ) -> dict:
task_definitions = TaskDefinitionModel.query.filter_by( task_definitions = TaskDefinitionModel.query.filter_by(
bpmn_process_definition_id=bpmn_process_definition.id bpmn_process_definition_id=bpmn_process_definition.id
@ -550,6 +581,11 @@ class ProcessInstanceProcessor:
bpmn_process_definition_dict["task_specs"][ bpmn_process_definition_dict["task_specs"][
task_definition.bpmn_identifier task_definition.bpmn_identifier
] = task_definition.properties_json ] = task_definition.properties_json
cls._update_bpmn_definition_mappings(
bpmn_definition_to_task_definitions_mappings,
bpmn_process_definition.bpmn_identifier,
task_definition,
)
return bpmn_process_definition_dict return bpmn_process_definition_dict
@classmethod @classmethod
@ -557,6 +593,7 @@ class ProcessInstanceProcessor:
cls, cls,
bpmn_process_definition: BpmnProcessDefinitionModel, bpmn_process_definition: BpmnProcessDefinitionModel,
spiff_bpmn_process_dict: dict, spiff_bpmn_process_dict: dict,
bpmn_definition_to_task_definitions_mappings: dict,
) -> None: ) -> None:
# find all child subprocesses of a process # find all child subprocesses of a process
bpmn_process_subprocess_definitions = ( bpmn_process_subprocess_definitions = (
@ -595,6 +632,11 @@ class ProcessInstanceProcessor:
task_definition.bpmn_process_definition_id task_definition.bpmn_process_definition_id
] ]
) )
cls._update_bpmn_definition_mappings(
bpmn_definition_to_task_definitions_mappings,
bpmn_subprocess_definition_bpmn_identifier,
task_definition,
)
spiff_bpmn_process_dict["subprocess_specs"][ spiff_bpmn_process_dict["subprocess_specs"][
bpmn_subprocess_definition_bpmn_identifier bpmn_subprocess_definition_bpmn_identifier
]["task_specs"][ ]["task_specs"][
@ -643,7 +685,9 @@ class ProcessInstanceProcessor:
@classmethod @classmethod
def _get_full_bpmn_process_dict( def _get_full_bpmn_process_dict(
cls, process_instance_model: ProcessInstanceModel cls,
process_instance_model: ProcessInstanceModel,
bpmn_definition_to_task_definitions_mappings: dict,
) -> dict: ) -> dict:
if process_instance_model.bpmn_process_definition_id is None: if process_instance_model.bpmn_process_definition_id is None:
return {} return {}
@ -658,11 +702,14 @@ class ProcessInstanceProcessor:
if bpmn_process_definition is not None: if bpmn_process_definition is not None:
spiff_bpmn_process_dict["spec"] = ( spiff_bpmn_process_dict["spec"] = (
cls._get_definition_dict_for_bpmn_process_definition( cls._get_definition_dict_for_bpmn_process_definition(
bpmn_process_definition bpmn_process_definition,
bpmn_definition_to_task_definitions_mappings,
) )
) )
cls._set_definition_dict_for_bpmn_subprocess_definitions( cls._set_definition_dict_for_bpmn_subprocess_definitions(
bpmn_process_definition, spiff_bpmn_process_dict bpmn_process_definition,
spiff_bpmn_process_dict,
bpmn_definition_to_task_definitions_mappings,
) )
bpmn_process = process_instance_model.bpmn_process bpmn_process = process_instance_model.bpmn_process
@ -729,8 +776,9 @@ class ProcessInstanceProcessor:
spec: Optional[BpmnProcessSpec] = None, spec: Optional[BpmnProcessSpec] = None,
validate_only: bool = False, validate_only: bool = False,
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None, subprocesses: Optional[IdToBpmnProcessSpecMapping] = None,
) -> BpmnWorkflow: ) -> Tuple[BpmnWorkflow, dict, dict]:
full_bpmn_process_dict = {} full_bpmn_process_dict = {}
bpmn_definition_to_task_definitions_mappings: dict = {}
if process_instance_model.bpmn_process_definition_id is not None: if process_instance_model.bpmn_process_definition_id is not None:
# turn off logging to avoid duplicated spiff logs # turn off logging to avoid duplicated spiff logs
spiff_logger = logging.getLogger("spiff") spiff_logger = logging.getLogger("spiff")
@ -740,7 +788,8 @@ class ProcessInstanceProcessor:
try: try:
full_bpmn_process_dict = ( full_bpmn_process_dict = (
ProcessInstanceProcessor._get_full_bpmn_process_dict( ProcessInstanceProcessor._get_full_bpmn_process_dict(
process_instance_model process_instance_model,
bpmn_definition_to_task_definitions_mappings,
) )
) )
bpmn_process_instance = ( bpmn_process_instance = (
@ -763,7 +812,11 @@ class ProcessInstanceProcessor:
bpmn_process_instance.data[ bpmn_process_instance.data[
ProcessInstanceProcessor.VALIDATION_PROCESS_KEY ProcessInstanceProcessor.VALIDATION_PROCESS_KEY
] = validate_only ] = validate_only
return (bpmn_process_instance, full_bpmn_process_dict) return (
bpmn_process_instance,
full_bpmn_process_dict,
bpmn_definition_to_task_definitions_mappings,
)
def slam_in_data(self, data: dict) -> None: def slam_in_data(self, data: dict) -> None:
"""Slam_in_data.""" """Slam_in_data."""
@ -1025,6 +1078,7 @@ class ProcessInstanceProcessor:
self, self,
process_bpmn_properties: dict, process_bpmn_properties: dict,
bpmn_process_definition_parent: Optional[BpmnProcessDefinitionModel] = None, bpmn_process_definition_parent: Optional[BpmnProcessDefinitionModel] = None,
store_bpmn_definition_mappings: bool = False,
) -> BpmnProcessDefinitionModel: ) -> BpmnProcessDefinitionModel:
process_bpmn_identifier = process_bpmn_properties["name"] process_bpmn_identifier = process_bpmn_properties["name"]
new_hash_digest = sha256( new_hash_digest = sha256(
@ -1033,6 +1087,7 @@ class ProcessInstanceProcessor:
bpmn_process_definition: Optional[BpmnProcessDefinitionModel] = ( bpmn_process_definition: Optional[BpmnProcessDefinitionModel] = (
BpmnProcessDefinitionModel.query.filter_by(hash=new_hash_digest).first() BpmnProcessDefinitionModel.query.filter_by(hash=new_hash_digest).first()
) )
if bpmn_process_definition is None: if bpmn_process_definition is None:
task_specs = process_bpmn_properties.pop("task_specs") task_specs = process_bpmn_properties.pop("task_specs")
bpmn_process_definition = BpmnProcessDefinitionModel( bpmn_process_definition = BpmnProcessDefinitionModel(
@ -1050,6 +1105,24 @@ class ProcessInstanceProcessor:
typename=task_bpmn_properties["typename"], typename=task_bpmn_properties["typename"],
) )
db.session.add(task_definition) db.session.add(task_definition)
if store_bpmn_definition_mappings:
self._update_bpmn_definition_mappings(
self.bpmn_definition_to_task_definitions_mappings,
process_bpmn_identifier,
task_definition,
)
elif store_bpmn_definition_mappings:
# this should only ever happen when new process instances use a pre-existing bpmn process definitions
# otherwise this should get populated on processor initialization
task_definitions = TaskDefinitionModel.query.filter_by(
bpmn_process_definition_id=bpmn_process_definition.id
).all()
for task_definition in task_definitions:
self._update_bpmn_definition_mappings(
self.bpmn_definition_to_task_definitions_mappings,
process_bpmn_identifier,
task_definition,
)
if bpmn_process_definition_parent is not None: if bpmn_process_definition_parent is not None:
bpmn_process_definition_relationship = ( bpmn_process_definition_relationship = (
@ -1067,12 +1140,19 @@ class ProcessInstanceProcessor:
return bpmn_process_definition return bpmn_process_definition
def _add_bpmn_process_definitions(self, bpmn_spec_dict: dict) -> None: def _add_bpmn_process_definitions(self, bpmn_spec_dict: dict) -> None:
# store only if mappings is currently empty. this also would mean this is a new instance that has never saved before
store_bpmn_definition_mappings = (
not self.bpmn_definition_to_task_definitions_mappings
)
bpmn_process_definition_parent = self._store_bpmn_process_definition( bpmn_process_definition_parent = self._store_bpmn_process_definition(
bpmn_spec_dict["spec"] bpmn_spec_dict["spec"],
store_bpmn_definition_mappings=store_bpmn_definition_mappings,
) )
for process_bpmn_properties in bpmn_spec_dict["subprocess_specs"].values(): for process_bpmn_properties in bpmn_spec_dict["subprocess_specs"].values():
self._store_bpmn_process_definition( self._store_bpmn_process_definition(
process_bpmn_properties, bpmn_process_definition_parent process_bpmn_properties,
bpmn_process_definition_parent,
store_bpmn_definition_mappings=store_bpmn_definition_mappings,
) )
self.process_instance_model.bpmn_process_definition = ( self.process_instance_model.bpmn_process_definition = (
bpmn_process_definition_parent bpmn_process_definition_parent
@ -1083,7 +1163,7 @@ class ProcessInstanceProcessor:
Expects the save method to commit it. Expects the save method to commit it.
""" """
bpmn_dict = json.loads(self.serialize()) bpmn_dict = self.serialize()
bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version") bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version")
process_instance_data_dict = {} process_instance_data_dict = {}
bpmn_spec_dict = {} bpmn_spec_dict = {}
@ -1093,14 +1173,18 @@ class ProcessInstanceProcessor:
else: else:
process_instance_data_dict[bpmn_key] = bpmn_dict[bpmn_key] process_instance_data_dict[bpmn_key] = bpmn_dict[bpmn_key]
# FIXME: always save new hash until we get updated Spiff without loopresettask # we may have to already process bpmn_defintions if we ever care about the Root task again
# if self.process_instance_model.bpmn_process_definition_id is None: if self.process_instance_model.bpmn_process_definition_id is None:
self._add_bpmn_process_definitions(bpmn_spec_dict) self._add_bpmn_process_definitions(bpmn_spec_dict)
subprocesses = process_instance_data_dict.pop("subprocesses") subprocesses = process_instance_data_dict.pop("subprocesses")
bpmn_process_parent, new_task_models, new_json_data_dicts = ( bpmn_process_parent, new_task_models, new_json_data_dicts = (
TaskService.add_bpmn_process( TaskService.add_bpmn_process(
process_instance_data_dict, self.process_instance_model bpmn_process_dict=process_instance_data_dict,
process_instance=self.process_instance_model,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
spiff_workflow=self.bpmn_process_instance,
serializer=self._serializer,
) )
) )
for subprocess_task_id, subprocess_properties in subprocesses.items(): for subprocess_task_id, subprocess_properties in subprocesses.items():
@ -1109,10 +1193,13 @@ class ProcessInstanceProcessor:
subprocess_new_task_models, subprocess_new_task_models,
subprocess_new_json_data_models, subprocess_new_json_data_models,
) = TaskService.add_bpmn_process( ) = TaskService.add_bpmn_process(
subprocess_properties, bpmn_process_dict=subprocess_properties,
self.process_instance_model, process_instance=self.process_instance_model,
bpmn_process_parent, bpmn_process_parent=bpmn_process_parent,
bpmn_process_guid=subprocess_task_id, bpmn_process_guid=subprocess_task_id,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
spiff_workflow=self.bpmn_process_instance,
serializer=self._serializer,
) )
new_task_models.update(subprocess_new_task_models) new_task_models.update(subprocess_new_task_models)
new_json_data_dicts.update(subprocess_new_json_data_models) new_json_data_dicts.update(subprocess_new_json_data_models)
@ -1631,6 +1718,7 @@ class ProcessInstanceProcessor:
secondary_engine_step_delegate=step_delegate, secondary_engine_step_delegate=step_delegate,
serializer=self._serializer, serializer=self._serializer,
process_instance=self.process_instance_model, process_instance=self.process_instance_model,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
) )
if execution_strategy_name is None: if execution_strategy_name is None:
@ -1722,11 +1810,11 @@ class ProcessInstanceProcessor:
) )
) )
def serialize(self) -> str: def serialize(self) -> dict:
"""Serialize.""" """Serialize."""
self.check_task_data_size() self.check_task_data_size()
self.preserve_script_engine_state() self.preserve_script_engine_state()
return self._serializer.serialize_json(self.bpmn_process_instance) # type: ignore return self._serializer.workflow_to_dict(self.bpmn_process_instance) # type: ignore
def next_user_tasks(self) -> list[SpiffTask]: def next_user_tasks(self) -> list[SpiffTask]:
"""Next_user_tasks.""" """Next_user_tasks."""
@ -1870,18 +1958,19 @@ class ProcessInstanceProcessor:
db.session.add(details_model) db.session.add(details_model)
# ####### # #######
json_data_dict = TaskService.update_task_model( json_data_dict_list = TaskService.update_task_model(
task_model, spiff_task, self._serializer task_model, spiff_task, self._serializer
) )
if json_data_dict is not None: for json_data_dict in json_data_dict_list:
json_data = ( if json_data_dict is not None:
db.session.query(JsonDataModel.id) json_data = (
.filter_by(hash=json_data_dict["hash"]) db.session.query(JsonDataModel.id)
.first() .filter_by(hash=json_data_dict["hash"])
) .first()
if json_data is None: )
json_data = JsonDataModel(**json_data_dict) if json_data is None:
db.session.add(json_data) json_data = JsonDataModel(**json_data_dict)
db.session.add(json_data)
# this is the thing that actually commits the db transaction (on behalf of the other updates above as well) # this is the thing that actually commits the db transaction (on behalf of the other updates above as well)
self.save() self.save()

View File

@ -3,6 +3,7 @@ from hashlib import sha256
from typing import Optional from typing import Optional
from typing import Tuple from typing import Tuple
from typing import TypedDict from typing import TypedDict
from uuid import UUID
from flask import current_app from flask import current_app
from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow # type: ignore from SpiffWorkflow.bpmn.serializer.workflow import BpmnWorkflow # type: ignore
@ -25,6 +26,8 @@ class JsonDataDict(TypedDict):
class TaskService: class TaskService:
PYTHON_ENVIRONMENT_STATE_KEY = "spiff__python_env_state"
@classmethod @classmethod
def insert_or_update_json_data_records( def insert_or_update_json_data_records(
cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict] cls, json_data_hash_to_json_data_dict_mapping: dict[str, JsonDataDict]
@ -35,7 +38,7 @@ class TaskService:
if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "mysql": if current_app.config["SPIFFWORKFLOW_BACKEND_DATABASE_TYPE"] == "mysql":
insert_stmt = mysql_insert(JsonDataModel).values(list_of_dicts) insert_stmt = mysql_insert(JsonDataModel).values(list_of_dicts)
on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update( on_duplicate_key_stmt = insert_stmt.on_duplicate_key_update(
data=insert_stmt.inserted.data, status="U" data=insert_stmt.inserted.data
) )
else: else:
insert_stmt = postgres_insert(JsonDataModel).values(list_of_dicts) insert_stmt = postgres_insert(JsonDataModel).values(list_of_dicts)
@ -44,25 +47,13 @@ class TaskService:
) )
db.session.execute(on_duplicate_key_stmt) db.session.execute(on_duplicate_key_stmt)
@classmethod
def _update_task_data_on_task_model(
cls, task_model: TaskModel, task_data_dict: dict
) -> Optional[JsonDataDict]:
task_data_json = json.dumps(task_data_dict, sort_keys=True)
task_data_hash: str = sha256(task_data_json.encode("utf8")).hexdigest()
json_data_dict: Optional[JsonDataDict] = None
if task_model.json_data_hash != task_data_hash:
json_data_dict = {"hash": task_data_hash, "data": task_data_dict}
task_model.json_data_hash = task_data_hash
return json_data_dict
@classmethod @classmethod
def update_task_model( def update_task_model(
cls, cls,
task_model: TaskModel, task_model: TaskModel,
spiff_task: SpiffTask, spiff_task: SpiffTask,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
) -> Optional[JsonDataDict]: ) -> list[Optional[JsonDataDict]]:
"""Updates properties_json and data on given task_model. """Updates properties_json and data on given task_model.
This will NOT update start_in_seconds or end_in_seconds. This will NOT update start_in_seconds or end_in_seconds.
@ -70,12 +61,18 @@ class TaskService:
""" """
new_properties_json = serializer.task_to_dict(spiff_task) new_properties_json = serializer.task_to_dict(spiff_task)
spiff_task_data = new_properties_json.pop("data") spiff_task_data = new_properties_json.pop("data")
python_env_data_dict = cls._get_python_env_data_dict_from_spiff_task(
spiff_task, serializer
)
task_model.properties_json = new_properties_json task_model.properties_json = new_properties_json
task_model.state = TaskStateNames[new_properties_json["state"]] task_model.state = TaskStateNames[new_properties_json["state"]]
json_data_dict = cls._update_task_data_on_task_model( json_data_dict = cls._update_task_data_on_task_model(
task_model, spiff_task_data task_model, spiff_task_data, "json_data_hash"
) )
return json_data_dict python_env_dict = cls._update_task_data_on_task_model(
task_model, python_env_data_dict, "python_env_data_hash"
)
return [json_data_dict, python_env_dict]
@classmethod @classmethod
def find_or_create_task_model_from_spiff_task( def find_or_create_task_model_from_spiff_task(
@ -83,6 +80,7 @@ class TaskService:
spiff_task: SpiffTask, spiff_task: SpiffTask,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
bpmn_definition_to_task_definitions_mappings: dict,
) -> Tuple[ ) -> Tuple[
Optional[BpmnProcessModel], Optional[BpmnProcessModel],
TaskModel, TaskModel,
@ -98,12 +96,21 @@ class TaskService:
new_json_data_dicts: dict[str, JsonDataDict] = {} new_json_data_dicts: dict[str, JsonDataDict] = {}
if task_model is None: if task_model is None:
bpmn_process, new_task_models, new_json_data_dicts = cls.task_bpmn_process( bpmn_process, new_task_models, new_json_data_dicts = cls.task_bpmn_process(
spiff_task, process_instance, serializer spiff_task,
process_instance,
serializer,
bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings,
) )
task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first() task_model = TaskModel.query.filter_by(guid=spiff_task_guid).first()
if task_model is None: if task_model is None:
task_definition = bpmn_definition_to_task_definitions_mappings[
spiff_task.workflow.spec.name
][spiff_task.task_spec.name]
task_model = TaskModel( task_model = TaskModel(
guid=spiff_task_guid, bpmn_process_id=bpmn_process.id guid=spiff_task_guid,
bpmn_process_id=bpmn_process.id,
process_instance_id=process_instance.id,
task_definition_id=task_definition.id,
) )
return (bpmn_process, task_model, new_task_models, new_json_data_dicts) return (bpmn_process, task_model, new_task_models, new_json_data_dicts)
@ -130,6 +137,7 @@ class TaskService:
spiff_task: SpiffTask, spiff_task: SpiffTask,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
bpmn_definition_to_task_definitions_mappings: dict,
) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]:
subprocess_guid, subprocess = cls.task_subprocess(spiff_task) subprocess_guid, subprocess = cls.task_subprocess(spiff_task)
bpmn_process: Optional[BpmnProcessModel] = None bpmn_process: Optional[BpmnProcessModel] = None
@ -140,12 +148,14 @@ class TaskService:
# This is the top level workflow, which has no guid # This is the top level workflow, which has no guid
# check for bpmn_process_id because mypy doesn't realize bpmn_process can be None # check for bpmn_process_id because mypy doesn't realize bpmn_process can be None
if process_instance.bpmn_process_id is None: if process_instance.bpmn_process_id is None:
spiff_workflow = spiff_task.workflow._get_outermost_workflow()
bpmn_process, new_task_models, new_json_data_dicts = ( bpmn_process, new_task_models, new_json_data_dicts = (
cls.add_bpmn_process( cls.add_bpmn_process(
serializer.workflow_to_dict( bpmn_process_dict=serializer.workflow_to_dict(spiff_workflow),
spiff_task.workflow._get_outermost_workflow() process_instance=process_instance,
), bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings,
process_instance, spiff_workflow=spiff_workflow,
serializer=serializer,
) )
) )
else: else:
@ -153,12 +163,16 @@ class TaskService:
guid=subprocess_guid guid=subprocess_guid
).first() ).first()
if bpmn_process is None: if bpmn_process is None:
spiff_workflow = spiff_task.workflow
bpmn_process, new_task_models, new_json_data_dicts = ( bpmn_process, new_task_models, new_json_data_dicts = (
cls.add_bpmn_process( cls.add_bpmn_process(
serializer.workflow_to_dict(subprocess), bpmn_process_dict=serializer.workflow_to_dict(subprocess),
process_instance, process_instance=process_instance,
process_instance.bpmn_process, bpmn_process_parent=process_instance.bpmn_process,
subprocess_guid, bpmn_process_guid=subprocess_guid,
bpmn_definition_to_task_definitions_mappings=bpmn_definition_to_task_definitions_mappings,
spiff_workflow=spiff_workflow,
serializer=serializer,
) )
) )
return (bpmn_process, new_task_models, new_json_data_dicts) return (bpmn_process, new_task_models, new_json_data_dicts)
@ -168,6 +182,9 @@ class TaskService:
cls, cls,
bpmn_process_dict: dict, bpmn_process_dict: dict,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
bpmn_definition_to_task_definitions_mappings: dict,
spiff_workflow: BpmnWorkflow,
serializer: BpmnWorkflowSerializer,
bpmn_process_parent: Optional[BpmnProcessModel] = None, bpmn_process_parent: Optional[BpmnProcessModel] = None,
bpmn_process_guid: Optional[str] = None, bpmn_process_guid: Optional[str] = None,
) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]: ) -> Tuple[BpmnProcessModel, dict[str, TaskModel], dict[str, JsonDataDict]]:
@ -195,6 +212,12 @@ class TaskService:
bpmn_process_is_new = True bpmn_process_is_new = True
bpmn_process = BpmnProcessModel(guid=bpmn_process_guid) bpmn_process = BpmnProcessModel(guid=bpmn_process_guid)
# Point the root id to the Start task instead of the Root task
# since we are ignoring the Root task.
for task_id, task_properties in tasks.items():
if task_properties["task_spec"] == "Start":
bpmn_process_dict["root"] = task_id
bpmn_process.properties_json = bpmn_process_dict bpmn_process.properties_json = bpmn_process_dict
bpmn_process_data_json = json.dumps(bpmn_process_data_dict, sort_keys=True) bpmn_process_data_json = json.dumps(bpmn_process_data_dict, sort_keys=True)
@ -219,29 +242,85 @@ class TaskService:
if bpmn_process_is_new: if bpmn_process_is_new:
for task_id, task_properties in tasks.items(): for task_id, task_properties in tasks.items():
# The Root task is added to the spec by Spiff when the bpmn process is instantiated
# within Spiff. We do not actually need it and it's missing from our initial
# bpmn process defintion so let's avoid using it.
if task_properties["task_spec"] == "Root":
continue
if task_properties["task_spec"] == "Start":
task_properties["parent"] = None
task_data_dict = task_properties.pop("data") task_data_dict = task_properties.pop("data")
state_int = task_properties["state"] state_int = task_properties["state"]
spiff_task = spiff_workflow.get_task(UUID(task_id))
task_model = TaskModel.query.filter_by(guid=task_id).first() task_model = TaskModel.query.filter_by(guid=task_id).first()
if task_model is None: if task_model is None:
# bpmn_process_identifier = task_properties['workflow_name'] task_model = cls._create_task(
# bpmn_identifier = task_properties['task_spec'] bpmn_process,
# process_instance,
# task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier) spiff_task,
# .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first() bpmn_definition_to_task_definitions_mappings,
# if task_definition is None:
# subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
task_model = TaskModel(
guid=task_id, bpmn_process_id=bpmn_process.id
) )
task_model.state = TaskStateNames[state_int] task_model.state = TaskStateNames[state_int]
task_model.properties_json = task_properties task_model.properties_json = task_properties
new_task_models[task_model.guid] = task_model
json_data_dict = TaskService._update_task_data_on_task_model( json_data_dict = TaskService._update_task_data_on_task_model(
task_model, task_data_dict task_model, task_data_dict, "json_data_hash"
) )
new_task_models[task_model.guid] = task_model
if json_data_dict is not None: if json_data_dict is not None:
new_json_data_dicts[json_data_dict["hash"]] = json_data_dict new_json_data_dicts[json_data_dict["hash"]] = json_data_dict
python_env_data_dict = cls._get_python_env_data_dict_from_spiff_task(
spiff_task, serializer
)
python_env_dict = TaskService._update_task_data_on_task_model(
task_model, python_env_data_dict, "python_env_data_hash"
)
if python_env_dict is not None:
new_json_data_dicts[python_env_dict["hash"]] = python_env_dict
return (bpmn_process, new_task_models, new_json_data_dicts) return (bpmn_process, new_task_models, new_json_data_dicts)
@classmethod
def _update_task_data_on_task_model(
cls, task_model: TaskModel, task_data_dict: dict, task_model_data_column: str
) -> Optional[JsonDataDict]:
task_data_json = json.dumps(task_data_dict, sort_keys=True)
task_data_hash: str = sha256(task_data_json.encode("utf8")).hexdigest()
json_data_dict: Optional[JsonDataDict] = None
if getattr(task_model, task_model_data_column) != task_data_hash:
json_data_dict = {"hash": task_data_hash, "data": task_data_dict}
setattr(task_model, task_model_data_column, task_data_hash)
return json_data_dict
@classmethod
def _create_task(
cls,
bpmn_process: BpmnProcessModel,
process_instance: ProcessInstanceModel,
spiff_task: SpiffTask,
bpmn_definition_to_task_definitions_mappings: dict,
) -> TaskModel:
task_definition = bpmn_definition_to_task_definitions_mappings[
spiff_task.workflow.spec.name
][spiff_task.task_spec.name]
task_model = TaskModel(
guid=str(spiff_task.id),
bpmn_process_id=bpmn_process.id,
process_instance_id=process_instance.id,
task_definition_id=task_definition.id,
)
return task_model
@classmethod
def _get_python_env_data_dict_from_spiff_task(
cls, spiff_task: SpiffTask, serializer: BpmnWorkflowSerializer
) -> dict:
user_defined_state = (
spiff_task.workflow.script_engine.environment.user_defined_state()
)
# this helps to convert items like datetime objects to be json serializable
converted_data: dict = serializer.data_converter.convert(user_defined_state)
return converted_data

View File

@ -57,10 +57,14 @@ class TaskModelSavingDelegate(EngineStepDelegate):
self, self,
serializer: BpmnWorkflowSerializer, serializer: BpmnWorkflowSerializer,
process_instance: ProcessInstanceModel, process_instance: ProcessInstanceModel,
bpmn_definition_to_task_definitions_mappings: dict,
secondary_engine_step_delegate: Optional[EngineStepDelegate] = None, secondary_engine_step_delegate: Optional[EngineStepDelegate] = None,
) -> None: ) -> None:
self.secondary_engine_step_delegate = secondary_engine_step_delegate self.secondary_engine_step_delegate = secondary_engine_step_delegate
self.process_instance = process_instance self.process_instance = process_instance
self.bpmn_definition_to_task_definitions_mappings = (
bpmn_definition_to_task_definitions_mappings
)
self.current_task_model: Optional[TaskModel] = None self.current_task_model: Optional[TaskModel] = None
self.task_models: dict[str, TaskModel] = {} self.task_models: dict[str, TaskModel] = {}
@ -74,11 +78,21 @@ class TaskModelSavingDelegate(EngineStepDelegate):
""" """
return self.process_instance.bpmn_process_id is not None return self.process_instance.bpmn_process_id is not None
def _update_json_data_dicts_using_list(
self, json_data_dict_list: list[Optional[JsonDataDict]]
) -> None:
for json_data_dict in json_data_dict_list:
if json_data_dict is not None:
self.json_data_dicts[json_data_dict["hash"]] = json_data_dict
def will_complete_task(self, spiff_task: SpiffTask) -> None: def will_complete_task(self, spiff_task: SpiffTask) -> None:
if self.should_update_task_model(): if self.should_update_task_model():
_bpmn_process, task_model, new_task_models, new_json_data_dicts = ( _bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task( TaskService.find_or_create_task_model_from_spiff_task(
spiff_task, self.process_instance, self.serializer spiff_task,
self.process_instance,
self.serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
) )
) )
self.current_task_model = task_model self.current_task_model = task_model
@ -91,11 +105,10 @@ class TaskModelSavingDelegate(EngineStepDelegate):
def did_complete_task(self, spiff_task: SpiffTask) -> None: def did_complete_task(self, spiff_task: SpiffTask) -> None:
if self.current_task_model and self.should_update_task_model(): if self.current_task_model and self.should_update_task_model():
self.current_task_model.end_in_seconds = time.time() self.current_task_model.end_in_seconds = time.time()
json_data_dict = TaskService.update_task_model( json_data_dict_list = TaskService.update_task_model(
self.current_task_model, spiff_task, self.serializer self.current_task_model, spiff_task, self.serializer
) )
if json_data_dict is not None: self._update_json_data_dicts_using_list(json_data_dict_list)
self.json_data_dicts[json_data_dict["hash"]] = json_data_dict
self.task_models[self.current_task_model.guid] = self.current_task_model self.task_models[self.current_task_model.guid] = self.current_task_model
if self.secondary_engine_step_delegate: if self.secondary_engine_step_delegate:
self.secondary_engine_step_delegate.did_complete_task(spiff_task) self.secondary_engine_step_delegate.did_complete_task(spiff_task)
@ -119,19 +132,21 @@ class TaskModelSavingDelegate(EngineStepDelegate):
| TaskState.MAYBE | TaskState.MAYBE
| TaskState.LIKELY | TaskState.LIKELY
): ):
_bpmn_process, task_model, new_task_models, new_json_data_dicts = ( bpmn_process, task_model, new_task_models, new_json_data_dicts = (
TaskService.find_or_create_task_model_from_spiff_task( TaskService.find_or_create_task_model_from_spiff_task(
waiting_spiff_task, self.process_instance, self.serializer waiting_spiff_task,
self.process_instance,
self.serializer,
bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
) )
) )
self.task_models.update(new_task_models) self.task_models.update(new_task_models)
self.json_data_dicts.update(new_json_data_dicts) self.json_data_dicts.update(new_json_data_dicts)
json_data_dict = TaskService.update_task_model( json_data_dict_list = TaskService.update_task_model(
task_model, waiting_spiff_task, self.serializer task_model, waiting_spiff_task, self.serializer
) )
self.task_models[task_model.guid] = task_model self.task_models[task_model.guid] = task_model
if json_data_dict is not None: self._update_json_data_dicts_using_list(json_data_dict_list)
self.json_data_dicts[json_data_dict["hash"]] = json_data_dict
class StepDetailLoggingDelegate(EngineStepDelegate): class StepDetailLoggingDelegate(EngineStepDelegate):

View File

@ -99,6 +99,7 @@ class TestErrorHandlingService(BaseTest):
# Both send and receive messages should be generated, matched # Both send and receive messages should be generated, matched
# and considered complete. # and considered complete.
messages = db.session.query(MessageInstanceModel).all() messages = db.session.query(MessageInstanceModel).all()
# import pdb; pdb.set_trace()
assert 2 == len(messages) assert 2 == len(messages)
assert "completed" == messages[0].status assert "completed" == messages[0].status
assert "completed" == messages[1].status assert "completed" == messages[1].status

View File

@ -358,41 +358,62 @@ class TestProcessInstanceProcessor(BaseTest):
processor_final = ProcessInstanceProcessor(process_instance_relookup) processor_final = ProcessInstanceProcessor(process_instance_relookup)
assert process_instance_relookup.status == "complete" assert process_instance_relookup.status == "complete"
# first_data_set = {"set_in_top_level_script": 1} first_data_set = {"set_in_top_level_script": 1}
# second_data_set = {**first_data_set, **{"set_in_top_level_subprocess": 1}} second_data_set = {
# third_data_set = { **first_data_set,
# **second_data_set, **{"set_in_top_level_subprocess": 1, "we_move_on": False},
# **{"set_in_test_process_to_call_script": 1}, }
# } third_data_set = {
# expected_task_data = { **second_data_set,
# "top_level_script": first_data_set, **{"set_in_test_process_to_call_script": 1},
# "manual_task": first_data_set, }
# "top_level_subprocess_script": second_data_set, fourth_data_set = {**third_data_set, **{"a": 1, "we_move_on": True}}
# "top_level_subprocess": second_data_set, expected_task_data = {
# "test_process_to_call_script": third_data_set, "top_level_script": first_data_set,
# "top_level_call_activity": third_data_set, "manual_task": first_data_set,
# "end_event_of_manual_task_model": third_data_set, "top_level_subprocess_script": second_data_set,
# } "top_level_subprocess": second_data_set,
"test_process_to_call_script": third_data_set,
"top_level_call_activity": third_data_set,
"end_event_of_manual_task_model": third_data_set,
"top_level_subprocess_script_second": fourth_data_set,
"test_process_to_call_script_second": fourth_data_set,
}
spiff_tasks_checked_once: list = []
# TODO: also check task data here from the spiff_task directly to ensure we hydrated spiff correctly
def assert_spiff_task_is_in_process(
spiff_task_name: str, bpmn_process_identifier: str
) -> None:
if spiff_task.task_spec.name == spiff_task_name:
expected_python_env_data = expected_task_data[spiff_task.task_spec.name]
if spiff_task.task_spec.name in spiff_tasks_checked_once:
expected_python_env_data = expected_task_data[
f"{spiff_task.task_spec.name}_second"
]
task = TaskModel.query.filter_by(guid=str(spiff_task.id)).first()
assert task.task_definition_id is not None
task_definition = task.task_definition
assert task_definition.bpmn_identifier == spiff_task_name
assert (
task_definition.bpmn_process_definition.bpmn_identifier
== bpmn_process_identifier
)
assert task.python_env_data() == expected_python_env_data
spiff_tasks_checked_once.append(spiff_task.task_spec.name)
all_spiff_tasks = processor_final.bpmn_process_instance.get_tasks() all_spiff_tasks = processor_final.bpmn_process_instance.get_tasks()
assert len(all_spiff_tasks) > 1 assert len(all_spiff_tasks) > 1
for spiff_task in all_spiff_tasks: for spiff_task in all_spiff_tasks:
assert spiff_task.state == TaskState.COMPLETED assert spiff_task.state == TaskState.COMPLETED
# FIXME: Checking task data cannot work with the feature/remove-loop-reset branch assert_spiff_task_is_in_process(
# of SiffWorkflow. This is because it saves script data to the python_env and NOT "test_process_to_call_script", "test_process_to_call"
# to task.data. We may need to either create a new column on TaskModel to put the python_env )
# data or we could just shove it back onto the task data when adding to the database. assert_spiff_task_is_in_process(
# Right now everything works in practice because the python_env data is on the top level workflow "top_level_subprocess_script", "top_level_subprocess"
# and so is always there but is also always the most recent. If we want to replace spiff_step_details )
# with TaskModel then we'll need some way to store python_env on each task. assert_spiff_task_is_in_process("top_level_script", "top_level_process")
# spiff_task_name = spiff_task.task_spec.name
# if spiff_task_name in expected_task_data:
# spiff_task_data = expected_task_data[spiff_task_name]
# failure_message = (
# f"Found unexpected task data on {spiff_task_name}. "
# f"Expected: {spiff_task_data}, Found: {spiff_task.data}"
# )
# assert spiff_task.data == spiff_task_data, failure_message
def test_does_not_recreate_human_tasks_on_multiple_saves( def test_does_not_recreate_human_tasks_on_multiple_saves(
self, self,