added bpmn process definition to bpmn process w/ burnettk

This commit is contained in:
jasquat 2023-03-16 16:59:37 -04:00
parent d75d66c33f
commit d3393fc6d0
5 changed files with 115 additions and 102 deletions

View File

@ -1,8 +1,8 @@
"""empty message
Revision ID: 077a27ef1246
Revision ID: 8ee0f1c23cc7
Revises:
Create Date: 2023-03-15 16:36:23.278887
Create Date: 2023-03-16 16:24:47.364768
"""
from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = '077a27ef1246'
revision = '8ee0f1c23cc7'
down_revision = None
branch_labels = None
depends_on = None
@ -18,19 +18,6 @@ depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('bpmn_process',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('guid', sa.String(length=36), nullable=True),
sa.Column('parent_process_id', sa.Integer(), nullable=True),
sa.Column('properties_json', sa.JSON(), nullable=False),
sa.Column('json_data_hash', sa.String(length=255), nullable=False),
sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.ForeignKeyConstraint(['parent_process_id'], ['bpmn_process.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_bpmn_process_guid'), 'bpmn_process', ['guid'], unique=True)
op.create_index(op.f('ix_bpmn_process_json_data_hash'), 'bpmn_process', ['json_data_hash'], unique=False)
op.create_table('bpmn_process_definition',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('hash', sa.String(length=255), nullable=False),
@ -129,6 +116,21 @@ def upgrade():
sa.UniqueConstraint('service', 'service_id', name='service_key'),
sa.UniqueConstraint('username')
)
op.create_table('bpmn_process',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('guid', sa.String(length=36), nullable=True),
sa.Column('bpmn_process_definition_id', sa.Integer(), nullable=False),
sa.Column('parent_process_id', sa.Integer(), nullable=True),
sa.Column('properties_json', sa.JSON(), nullable=False),
sa.Column('json_data_hash', sa.String(length=255), nullable=False),
sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.ForeignKeyConstraint(['bpmn_process_definition_id'], ['bpmn_process_definition.id'], ),
sa.ForeignKeyConstraint(['parent_process_id'], ['bpmn_process.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_bpmn_process_guid'), 'bpmn_process', ['guid'], unique=True)
op.create_index(op.f('ix_bpmn_process_json_data_hash'), 'bpmn_process', ['json_data_hash'], unique=False)
op.create_table('bpmn_process_definition_relationship',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('bpmn_process_definition_parent_id', sa.Integer(), nullable=False),
@ -149,30 +151,6 @@ def upgrade():
sa.UniqueConstraint('group_id'),
sa.UniqueConstraint('user_id')
)
op.create_table('process_instance',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_model_identifier', sa.String(length=255), nullable=False),
sa.Column('process_model_display_name', sa.String(length=255), nullable=False),
sa.Column('process_initiator_id', sa.Integer(), nullable=False),
sa.Column('bpmn_process_definition_id', sa.Integer(), nullable=True),
sa.Column('bpmn_process_id', sa.Integer(), nullable=True),
sa.Column('spiff_serializer_version', sa.String(length=50), nullable=True),
sa.Column('bpmn_json', sa.JSON(), nullable=True),
sa.Column('start_in_seconds', sa.Integer(), nullable=True),
sa.Column('end_in_seconds', sa.Integer(), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('status', sa.String(length=50), nullable=True),
sa.Column('bpmn_version_control_type', sa.String(length=50), nullable=True),
sa.Column('bpmn_version_control_identifier', sa.String(length=255), nullable=True),
sa.Column('spiff_step', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['bpmn_process_definition_id'], ['bpmn_process_definition.id'], ),
sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ),
sa.ForeignKeyConstraint(['process_initiator_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_process_instance_process_model_display_name'), 'process_instance', ['process_model_display_name'], unique=False)
op.create_index(op.f('ix_process_instance_process_model_identifier'), 'process_instance', ['process_model_identifier'], unique=False)
op.create_table('process_instance_report',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('identifier', sa.String(length=50), nullable=False),
@ -235,6 +213,41 @@ def upgrade():
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('username', 'group_id', name='user_group_assignment_staged_unique')
)
op.create_table('permission_assignment',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('principal_id', sa.Integer(), nullable=False),
sa.Column('permission_target_id', sa.Integer(), nullable=False),
sa.Column('grant_type', sa.String(length=50), nullable=False),
sa.Column('permission', sa.String(length=50), nullable=False),
sa.ForeignKeyConstraint(['permission_target_id'], ['permission_target.id'], ),
sa.ForeignKeyConstraint(['principal_id'], ['principal.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('principal_id', 'permission_target_id', 'permission', name='permission_assignment_uniq')
)
op.create_table('process_instance',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_model_identifier', sa.String(length=255), nullable=False),
sa.Column('process_model_display_name', sa.String(length=255), nullable=False),
sa.Column('process_initiator_id', sa.Integer(), nullable=False),
sa.Column('bpmn_process_definition_id', sa.Integer(), nullable=True),
sa.Column('bpmn_process_id', sa.Integer(), nullable=True),
sa.Column('spiff_serializer_version', sa.String(length=50), nullable=True),
sa.Column('bpmn_json', sa.JSON(), nullable=True),
sa.Column('start_in_seconds', sa.Integer(), nullable=True),
sa.Column('end_in_seconds', sa.Integer(), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('status', sa.String(length=50), nullable=True),
sa.Column('bpmn_version_control_type', sa.String(length=50), nullable=True),
sa.Column('bpmn_version_control_identifier', sa.String(length=255), nullable=True),
sa.Column('spiff_step', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['bpmn_process_definition_id'], ['bpmn_process_definition.id'], ),
sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ),
sa.ForeignKeyConstraint(['process_initiator_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_process_instance_process_model_display_name'), 'process_instance', ['process_model_display_name'], unique=False)
op.create_index(op.f('ix_process_instance_process_model_identifier'), 'process_instance', ['process_model_identifier'], unique=False)
op.create_table('message_instance',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=True),
@ -252,17 +265,6 @@ def upgrade():
sa.ForeignKeyConstraint(['user_id'], ['user.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_table('permission_assignment',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('principal_id', sa.Integer(), nullable=False),
sa.Column('permission_target_id', sa.Integer(), nullable=False),
sa.Column('grant_type', sa.String(length=50), nullable=False),
sa.Column('permission', sa.String(length=50), nullable=False),
sa.ForeignKeyConstraint(['permission_target_id'], ['permission_target.id'], ),
sa.ForeignKeyConstraint(['principal_id'], ['principal.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('principal_id', 'permission_target_id', 'permission', name='permission_assignment_uniq')
)
op.create_table('process_instance_file_data',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('process_instance_id', sa.Integer(), nullable=False),
@ -418,8 +420,11 @@ def downgrade():
op.drop_table('process_instance_metadata')
op.drop_index(op.f('ix_process_instance_file_data_digest'), table_name='process_instance_file_data')
op.drop_table('process_instance_file_data')
op.drop_table('permission_assignment')
op.drop_table('message_instance')
op.drop_index(op.f('ix_process_instance_process_model_identifier'), table_name='process_instance')
op.drop_index(op.f('ix_process_instance_process_model_display_name'), table_name='process_instance')
op.drop_table('process_instance')
op.drop_table('permission_assignment')
op.drop_table('user_group_assignment_waiting')
op.drop_table('user_group_assignment')
op.drop_index(op.f('ix_task_definition_bpmn_identifier'), table_name='task_definition')
@ -429,11 +434,11 @@ def downgrade():
op.drop_index(op.f('ix_process_instance_report_identifier'), table_name='process_instance_report')
op.drop_index(op.f('ix_process_instance_report_created_by_id'), table_name='process_instance_report')
op.drop_table('process_instance_report')
op.drop_index(op.f('ix_process_instance_process_model_identifier'), table_name='process_instance')
op.drop_index(op.f('ix_process_instance_process_model_display_name'), table_name='process_instance')
op.drop_table('process_instance')
op.drop_table('principal')
op.drop_table('bpmn_process_definition_relationship')
op.drop_index(op.f('ix_bpmn_process_json_data_hash'), table_name='bpmn_process')
op.drop_index(op.f('ix_bpmn_process_guid'), table_name='bpmn_process')
op.drop_table('bpmn_process')
op.drop_table('user')
op.drop_table('spiff_logging')
op.drop_index(op.f('ix_spec_reference_cache_type'), table_name='spec_reference_cache')
@ -450,7 +455,4 @@ def downgrade():
op.drop_index(op.f('ix_bpmn_process_definition_hash'), table_name='bpmn_process_definition')
op.drop_index(op.f('ix_bpmn_process_definition_bpmn_identifier'), table_name='bpmn_process_definition')
op.drop_table('bpmn_process_definition')
op.drop_index(op.f('ix_bpmn_process_json_data_hash'), table_name='bpmn_process')
op.drop_index(op.f('ix_bpmn_process_guid'), table_name='bpmn_process')
op.drop_table('bpmn_process')
# ### end Alembic commands ###

View File

@ -3,6 +3,7 @@ from __future__ import annotations
from sqlalchemy import ForeignKey
from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
@ -18,6 +19,11 @@ class BpmnProcessModel(SpiffworkflowBaseDBModel):
id: int = db.Column(db.Integer, primary_key=True)
guid: str | None = db.Column(db.String(36), nullable=True, unique=True, index=True)
bpmn_process_definition_id: int = db.Column(
ForeignKey(BpmnProcessDefinitionModel.id), nullable=False # type: ignore
)
bpmn_process_definition = relationship(BpmnProcessDefinitionModel)
parent_process_id: int | None = db.Column(ForeignKey("bpmn_process.id"), nullable=True)
properties_json: dict = db.Column(db.JSON, nullable=False)

View File

@ -449,7 +449,10 @@ class ProcessInstanceProcessor:
# this caches the bpmn_process_definition_identifier and task_identifier back to the bpmn_process_id
# in the database. This is to cut down on database queries while adding new tasks to the database.
# Structure:
# { "bpmn_process_definition_identifier": { "task_identifier": task_definition } }
# { "[[BPMN_PROCESS_DEFINITION_IDENTIFIER]]": {
# "[[TASK_IDENTIFIER]]": [[TASK_DEFINITION]],
# "bpmn_process_definition": [[BPMN_PROCESS_DEFINITION]] }
# }
# To use from a spiff_task:
# [spiff_task.workflow.spec.name][spiff_task.task_spec.name]
self.bpmn_definition_to_task_definitions_mappings: dict = {}
@ -523,13 +526,21 @@ class ProcessInstanceProcessor:
cls,
bpmn_definition_to_task_definitions_mappings: dict,
bpmn_process_definition_identifier: str,
task_definition: TaskDefinitionModel,
task_definition: Optional[TaskDefinitionModel] = None,
bpmn_process_definition: Optional[BpmnProcessDefinitionModel] = None,
) -> None:
if bpmn_process_definition_identifier not in bpmn_definition_to_task_definitions_mappings:
bpmn_definition_to_task_definitions_mappings[bpmn_process_definition_identifier] = {}
bpmn_definition_to_task_definitions_mappings[bpmn_process_definition_identifier][
task_definition.bpmn_identifier
] = task_definition
if task_definition is not None:
bpmn_definition_to_task_definitions_mappings[bpmn_process_definition_identifier][
task_definition.bpmn_identifier
] = task_definition
if bpmn_process_definition is not None:
bpmn_definition_to_task_definitions_mappings[bpmn_process_definition_identifier][
"bpmn_process_definition"
] = bpmn_process_definition
@classmethod
def _get_definition_dict_for_bpmn_process_definition(
@ -537,6 +548,11 @@ class ProcessInstanceProcessor:
bpmn_process_definition: BpmnProcessDefinitionModel,
bpmn_definition_to_task_definitions_mappings: dict,
) -> dict:
cls._update_bpmn_definition_mappings(
bpmn_definition_to_task_definitions_mappings,
bpmn_process_definition.bpmn_identifier,
bpmn_process_definition=bpmn_process_definition,
)
task_definitions = TaskDefinitionModel.query.filter_by(
bpmn_process_definition_id=bpmn_process_definition.id
).all()
@ -549,7 +565,7 @@ class ProcessInstanceProcessor:
cls._update_bpmn_definition_mappings(
bpmn_definition_to_task_definitions_mappings,
bpmn_process_definition.bpmn_identifier,
task_definition,
task_definition=task_definition,
)
return bpmn_process_definition_dict
@ -573,6 +589,11 @@ class ProcessInstanceProcessor:
bpmn_subprocess_definition_bpmn_identifiers = {}
for bpmn_subprocess_definition in bpmn_process_subprocess_definitions:
cls._update_bpmn_definition_mappings(
bpmn_definition_to_task_definitions_mappings,
bpmn_subprocess_definition.bpmn_identifier,
bpmn_process_definition=bpmn_subprocess_definition,
)
bpmn_process_definition_dict: dict = bpmn_subprocess_definition.properties_json
spiff_bpmn_process_dict["subprocess_specs"][
bpmn_subprocess_definition.bpmn_identifier
@ -594,7 +615,7 @@ class ProcessInstanceProcessor:
cls._update_bpmn_definition_mappings(
bpmn_definition_to_task_definitions_mappings,
bpmn_subprocess_definition_bpmn_identifier,
task_definition,
task_definition=task_definition,
)
spiff_bpmn_process_dict["subprocess_specs"][bpmn_subprocess_definition_bpmn_identifier]["task_specs"][
task_definition.bpmn_identifier
@ -988,6 +1009,11 @@ class ProcessInstanceProcessor:
properties_json=process_bpmn_properties,
)
db.session.add(bpmn_process_definition)
self._update_bpmn_definition_mappings(
self.bpmn_definition_to_task_definitions_mappings,
bpmn_process_definition.bpmn_identifier,
bpmn_process_definition=bpmn_process_definition,
)
for task_bpmn_identifier, task_bpmn_properties in task_specs.items():
task_definition = TaskDefinitionModel(
@ -1001,11 +1027,16 @@ class ProcessInstanceProcessor:
self._update_bpmn_definition_mappings(
self.bpmn_definition_to_task_definitions_mappings,
process_bpmn_identifier,
task_definition,
task_definition=task_definition,
)
elif store_bpmn_definition_mappings:
# this should only ever happen when new process instances use a pre-existing bpmn process definitions
# otherwise this should get populated on processor initialization
self._update_bpmn_definition_mappings(
self.bpmn_definition_to_task_definitions_mappings,
process_bpmn_identifier,
bpmn_process_definition=bpmn_process_definition,
)
task_definitions = TaskDefinitionModel.query.filter_by(
bpmn_process_definition_id=bpmn_process_definition.id
).all()
@ -1013,7 +1044,7 @@ class ProcessInstanceProcessor:
self._update_bpmn_definition_mappings(
self.bpmn_definition_to_task_definitions_mappings,
process_bpmn_identifier,
task_definition,
task_definition=task_definition,
)
if bpmn_process_definition_parent is not None:
@ -1044,10 +1075,10 @@ class ProcessInstanceProcessor:
)
self.process_instance_model.bpmn_process_definition = bpmn_process_definition_parent
def _add_bpmn_json_records(self) -> None:
"""Adds serialized_bpmn_definition and process_instance_data records to the db session.
def _add_bpmn_process_defintions(self) -> None:
"""Adds serialized_bpmn_definition records to the db session.
Expects the save method to commit it.
Expects the calling method to commit it.
"""
if self.process_instance_model.bpmn_process_definition_id is not None:
return None
@ -1063,40 +1094,10 @@ class ProcessInstanceProcessor:
else:
process_instance_data_dict[bpmn_key] = bpmn_dict[bpmn_key]
# if self.process_instance_model.bpmn_process_definition_id is None:
self._add_bpmn_process_definitions(bpmn_spec_dict)
# subprocesses = process_instance_data_dict.pop("subprocesses")
# bpmn_process_parent, new_task_models, new_json_data_dicts = TaskService.add_bpmn_process(
# bpmn_process_dict=process_instance_data_dict,
# process_instance=self.process_instance_model,
# bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
# spiff_workflow=self.bpmn_process_instance,
# serializer=self._serializer,
# )
# for subprocess_task_id, subprocess_properties in subprocesses.items():
# (
# _bpmn_subprocess,
# subprocess_new_task_models,
# subprocess_new_json_data_models,
# ) = TaskService.add_bpmn_process(
# bpmn_process_dict=subprocess_properties,
# process_instance=self.process_instance_model,
# bpmn_process_parent=bpmn_process_parent,
# bpmn_process_guid=subprocess_task_id,
# bpmn_definition_to_task_definitions_mappings=self.bpmn_definition_to_task_definitions_mappings,
# spiff_workflow=self.bpmn_process_instance,
# serializer=self._serializer,
# )
# new_task_models.update(subprocess_new_task_models)
# new_json_data_dicts.update(subprocess_new_json_data_models)
# db.session.bulk_save_objects(new_task_models.values())
#
# TaskService.insert_or_update_json_data_records(new_json_data_dicts)
def save(self) -> None:
"""Saves the current state of this processor to the database."""
# self._add_bpmn_json_records()
self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION
complete_states = [TaskState.CANCELLED, TaskState.COMPLETED]
@ -1557,7 +1558,7 @@ class ProcessInstanceProcessor:
self._script_engine.environment.revise_state_with_task_data(task)
return self.spiff_step_details_mapping(task, start, end)
self._add_bpmn_json_records()
self._add_bpmn_process_defintions()
step_delegate = StepDetailLoggingDelegate(self.increment_spiff_step, spiff_step_details_mapping_builder)
task_model_delegate = TaskModelSavingDelegate(

View File

@ -211,6 +211,11 @@ class TaskService:
elif bpmn_process.parent_process_id is None:
bpmn_process.parent_process_id = bpmn_process_parent.id
bpmn_process_definition = bpmn_definition_to_task_definitions_mappings[spiff_workflow.spec.name][
"bpmn_process_definition"
]
bpmn_process.bpmn_process_definition = bpmn_process_definition
# Since we bulk insert tasks later we need to add the bpmn_process to the session
# to ensure we have an id.
db.session.add(bpmn_process)

View File

@ -294,7 +294,6 @@ class WorkflowExecutionService:
)
try:
# import pdb; pdb.set_trace()
self.bpmn_process_instance.refresh_waiting_tasks()
# TODO: implicit re-entrant locks here `with_dequeued`