Merge pull request #166 from sartography/feature/split_up_bpmn_json

Feature/split up bpmn json
This commit is contained in:
jasquat 2023-03-07 14:11:23 -05:00 committed by GitHub
commit 282806624e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 672 additions and 187 deletions

View File

@ -142,6 +142,7 @@ jobs:
mysql version: "8.0" mysql version: "8.0"
mysql database: "spiffworkflow_backend_unit_testing" mysql database: "spiffworkflow_backend_unit_testing"
mysql root password: password mysql root password: password
collation server: 'utf8mb4_0900_as_cs'
if: matrix.database == 'mysql' if: matrix.database == 'mysql'
- name: Setup Postgres - name: Setup Postgres
@ -192,7 +193,7 @@ jobs:
python-version: "3.11" python-version: "3.11"
- name: Install Poetry - name: Install Poetry
run: | run: |
pipx install poetry pipx install --pip-args=--constraint=spiffworkflow-backend/.github/workflows/constraints.txt poetry
poetry --version poetry --version
- name: Poetry Install - name: Poetry Install
run: poetry install run: poetry install

View File

@ -1,5 +1,5 @@
pip==22.2.2 pip==22.2.2
nox==2022.11.21 nox==2022.11.21
nox-poetry==1.0.2 nox-poetry==1.0.2
poetry==1.2.2 poetry==1.3.2
virtualenv==20.16.5 virtualenv==20.16.5

View File

@ -19,7 +19,12 @@ RUN apt-get update \
# Setup image for installing Python dependencies. # Setup image for installing Python dependencies.
FROM base AS setup FROM base AS setup
RUN pip install poetry # poetry 1.4 seems to cause an issue where it errors with
# This error originates from the build backend, and is likely not a
# problem with poetry but with lazy-object-proxy (1.7.1) not supporting PEP 517 builds.
# You can verify this by running 'pip wheel --use-pep517 "lazy-object-proxy (==1.7.1) ; python_version >= "3.6""'.
# Pinnning to 1.3.2 to attempt to avoid it.
RUN pip install poetry==1.3.2
RUN useradd _gunicorn --no-create-home --user-group RUN useradd _gunicorn --no-create-home --user-group
RUN apt-get update \ RUN apt-get update \

View File

@ -7,6 +7,7 @@ from flask.app import Flask
from flask.testing import FlaskClient from flask.testing import FlaskClient
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
@ -46,6 +47,8 @@ def app() -> Flask:
def with_db_and_bpmn_file_cleanup() -> None: def with_db_and_bpmn_file_cleanup() -> None:
"""Do it cleanly!""" """Do it cleanly!"""
meta = db.metadata meta = db.metadata
db.session.execute(db.update(BpmnProcessModel, values={"parent_process_id": None}))
for table in reversed(meta.sorted_tables): for table in reversed(meta.sorted_tables):
db.session.execute(table.delete()) db.session.execute(table.delete())
db.session.commit() db.session.commit()

View File

@ -1,8 +1,8 @@
"""empty message """empty message
Revision ID: ab91b129b473 Revision ID: 389800c352ee
Revises: Revises:
Create Date: 2023-03-03 14:00:59.134381 Create Date: 2023-03-07 10:40:43.709777
""" """
from alembic import op from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
from sqlalchemy.dialects import mysql from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = 'ab91b129b473' revision = '389800c352ee'
down_revision = None down_revision = None
branch_labels = None branch_labels = None
depends_on = None depends_on = None
@ -18,6 +18,33 @@ depends_on = None
def upgrade(): def upgrade():
# ### commands auto generated by Alembic - please adjust! ### # ### commands auto generated by Alembic - please adjust! ###
op.create_table('bpmn_process',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('guid', sa.String(length=36), nullable=True),
sa.Column('parent_process_id', sa.Integer(), nullable=True),
sa.Column('properties_json', sa.JSON(), nullable=False),
sa.Column('json_data_hash', sa.String(length=255), nullable=False),
sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.ForeignKeyConstraint(['parent_process_id'], ['bpmn_process.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_bpmn_process_guid'), 'bpmn_process', ['guid'], unique=True)
op.create_index(op.f('ix_bpmn_process_json_data_hash'), 'bpmn_process', ['json_data_hash'], unique=False)
op.create_table('bpmn_process_definition',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('hash', sa.String(length=255), nullable=False),
sa.Column('bpmn_identifier', sa.String(length=255), nullable=False),
sa.Column('properties_json', sa.JSON(), nullable=False),
sa.Column('type', sa.String(length=32), nullable=True),
sa.Column('bpmn_version_control_type', sa.String(length=50), nullable=True),
sa.Column('bpmn_version_control_identifier', sa.String(length=255), nullable=True),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_bpmn_process_definition_bpmn_identifier'), 'bpmn_process_definition', ['bpmn_identifier'], unique=False)
op.create_index(op.f('ix_bpmn_process_definition_hash'), 'bpmn_process_definition', ['hash'], unique=True)
op.create_table('correlation_property_cache', op.create_table('correlation_property_cache',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('name', sa.String(length=50), nullable=False), sa.Column('name', sa.String(length=50), nullable=False),
@ -32,6 +59,13 @@ def upgrade():
sa.Column('identifier', sa.String(length=255), nullable=True), sa.Column('identifier', sa.String(length=255), nullable=True),
sa.PrimaryKeyConstraint('id') sa.PrimaryKeyConstraint('id')
) )
op.create_table('json_data',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('hash', sa.String(length=255), nullable=False),
sa.Column('data', sa.JSON(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_json_data_hash'), 'json_data', ['hash'], unique=True)
op.create_table('message_triggerable_process_model', op.create_table('message_triggerable_process_model',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('message_name', sa.String(length=255), nullable=True), sa.Column('message_name', sa.String(length=255), nullable=True),
@ -47,18 +81,6 @@ def upgrade():
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('uri') sa.UniqueConstraint('uri')
) )
op.create_table('process_instance_data',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('runtime_json', sa.JSON(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_table('serialized_bpmn_definition',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('hash', sa.String(length=255), nullable=False),
sa.Column('static_json', sa.JSON(), nullable=False),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_serialized_bpmn_definition_hash'), 'serialized_bpmn_definition', ['hash'], unique=True)
op.create_table('spec_reference_cache', op.create_table('spec_reference_cache',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('identifier', sa.String(length=255), nullable=True), sa.Column('identifier', sa.String(length=255), nullable=True),
@ -107,6 +129,15 @@ def upgrade():
sa.UniqueConstraint('service', 'service_id', name='service_key'), sa.UniqueConstraint('service', 'service_id', name='service_key'),
sa.UniqueConstraint('username') sa.UniqueConstraint('username')
) )
op.create_table('bpmn_process_definition_relationship',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('bpmn_process_definition_parent_id', sa.Integer(), nullable=False),
sa.Column('bpmn_process_definition_child_id', sa.Integer(), nullable=False),
sa.ForeignKeyConstraint(['bpmn_process_definition_child_id'], ['bpmn_process_definition.id'], ),
sa.ForeignKeyConstraint(['bpmn_process_definition_parent_id'], ['bpmn_process_definition.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('bpmn_process_definition_parent_id', 'bpmn_process_definition_child_id', name='bpmn_process_definition_relationship_unique')
)
op.create_table('principal', op.create_table('principal',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=True), sa.Column('user_id', sa.Integer(), nullable=True),
@ -123,8 +154,9 @@ def upgrade():
sa.Column('process_model_identifier', sa.String(length=255), nullable=False), sa.Column('process_model_identifier', sa.String(length=255), nullable=False),
sa.Column('process_model_display_name', sa.String(length=255), nullable=False), sa.Column('process_model_display_name', sa.String(length=255), nullable=False),
sa.Column('process_initiator_id', sa.Integer(), nullable=False), sa.Column('process_initiator_id', sa.Integer(), nullable=False),
sa.Column('serialized_bpmn_definition_id', sa.Integer(), nullable=True), sa.Column('bpmn_process_definition_id', sa.Integer(), nullable=True),
sa.Column('process_instance_data_id', sa.Integer(), nullable=True), sa.Column('bpmn_process_id', sa.Integer(), nullable=True),
sa.Column('spiff_serializer_version', sa.String(length=50), nullable=True),
sa.Column('bpmn_json', sa.JSON(), nullable=True), sa.Column('bpmn_json', sa.JSON(), nullable=True),
sa.Column('start_in_seconds', sa.Integer(), nullable=True), sa.Column('start_in_seconds', sa.Integer(), nullable=True),
sa.Column('end_in_seconds', sa.Integer(), nullable=True), sa.Column('end_in_seconds', sa.Integer(), nullable=True),
@ -136,9 +168,9 @@ def upgrade():
sa.Column('spiff_step', sa.Integer(), nullable=True), sa.Column('spiff_step', sa.Integer(), nullable=True),
sa.Column('locked_by', sa.String(length=80), nullable=True), sa.Column('locked_by', sa.String(length=80), nullable=True),
sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True), sa.Column('locked_at_in_seconds', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['bpmn_process_definition_id'], ['bpmn_process_definition.id'], ),
sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ),
sa.ForeignKeyConstraint(['process_initiator_id'], ['user.id'], ), sa.ForeignKeyConstraint(['process_initiator_id'], ['user.id'], ),
sa.ForeignKeyConstraint(['process_instance_data_id'], ['process_instance_data.id'], ),
sa.ForeignKeyConstraint(['serialized_bpmn_definition_id'], ['serialized_bpmn_definition.id'], ),
sa.PrimaryKeyConstraint('id') sa.PrimaryKeyConstraint('id')
) )
op.create_index(op.f('ix_process_instance_process_model_display_name'), 'process_instance', ['process_model_display_name'], unique=False) op.create_index(op.f('ix_process_instance_process_model_display_name'), 'process_instance', ['process_model_display_name'], unique=False)
@ -175,6 +207,33 @@ def upgrade():
sa.PrimaryKeyConstraint('id'), sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('key') sa.UniqueConstraint('key')
) )
op.create_table('task',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('guid', sa.String(length=36), nullable=False),
sa.Column('bpmn_process_id', sa.Integer(), nullable=False),
sa.Column('state', sa.String(length=10), nullable=False),
sa.Column('properties_json', sa.JSON(), nullable=False),
sa.Column('json_data_hash', sa.String(length=255), nullable=False),
sa.Column('start_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.Column('end_in_seconds', sa.DECIMAL(precision=17, scale=6), nullable=True),
sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ),
sa.PrimaryKeyConstraint('id')
)
op.create_index(op.f('ix_task_guid'), 'task', ['guid'], unique=True)
op.create_index(op.f('ix_task_json_data_hash'), 'task', ['json_data_hash'], unique=False)
op.create_table('task_definition',
sa.Column('id', sa.Integer(), nullable=False),
sa.Column('bpmn_process_definition_id', sa.Integer(), nullable=False),
sa.Column('bpmn_identifier', sa.String(length=255), nullable=False),
sa.Column('properties_json', sa.JSON(), nullable=False),
sa.Column('typename', sa.String(length=255), nullable=False),
sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True),
sa.Column('created_at_in_seconds', sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(['bpmn_process_definition_id'], ['bpmn_process_definition.id'], ),
sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('bpmn_process_definition_id', 'bpmn_identifier', name='task_definition_unique')
)
op.create_index(op.f('ix_task_definition_bpmn_identifier'), 'task_definition', ['bpmn_identifier'], unique=False)
op.create_table('user_group_assignment', op.create_table('user_group_assignment',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),
sa.Column('user_id', sa.Integer(), nullable=False), sa.Column('user_id', sa.Integer(), nullable=False),
@ -331,6 +390,11 @@ def downgrade():
op.drop_table('human_task') op.drop_table('human_task')
op.drop_table('user_group_assignment_waiting') op.drop_table('user_group_assignment_waiting')
op.drop_table('user_group_assignment') op.drop_table('user_group_assignment')
op.drop_index(op.f('ix_task_definition_bpmn_identifier'), table_name='task_definition')
op.drop_table('task_definition')
op.drop_index(op.f('ix_task_json_data_hash'), table_name='task')
op.drop_index(op.f('ix_task_guid'), table_name='task')
op.drop_table('task')
op.drop_table('secret') op.drop_table('secret')
op.drop_table('refresh_token') op.drop_table('refresh_token')
op.drop_index(op.f('ix_process_instance_report_identifier'), table_name='process_instance_report') op.drop_index(op.f('ix_process_instance_report_identifier'), table_name='process_instance_report')
@ -340,18 +404,24 @@ def downgrade():
op.drop_index(op.f('ix_process_instance_process_model_display_name'), table_name='process_instance') op.drop_index(op.f('ix_process_instance_process_model_display_name'), table_name='process_instance')
op.drop_table('process_instance') op.drop_table('process_instance')
op.drop_table('principal') op.drop_table('principal')
op.drop_table('bpmn_process_definition_relationship')
op.drop_table('user') op.drop_table('user')
op.drop_table('spiff_logging') op.drop_table('spiff_logging')
op.drop_index(op.f('ix_spec_reference_cache_type'), table_name='spec_reference_cache') op.drop_index(op.f('ix_spec_reference_cache_type'), table_name='spec_reference_cache')
op.drop_index(op.f('ix_spec_reference_cache_identifier'), table_name='spec_reference_cache') op.drop_index(op.f('ix_spec_reference_cache_identifier'), table_name='spec_reference_cache')
op.drop_index(op.f('ix_spec_reference_cache_display_name'), table_name='spec_reference_cache') op.drop_index(op.f('ix_spec_reference_cache_display_name'), table_name='spec_reference_cache')
op.drop_table('spec_reference_cache') op.drop_table('spec_reference_cache')
op.drop_index(op.f('ix_serialized_bpmn_definition_hash'), table_name='serialized_bpmn_definition')
op.drop_table('serialized_bpmn_definition')
op.drop_table('process_instance_data')
op.drop_table('permission_target') op.drop_table('permission_target')
op.drop_index(op.f('ix_message_triggerable_process_model_process_model_identifier'), table_name='message_triggerable_process_model') op.drop_index(op.f('ix_message_triggerable_process_model_process_model_identifier'), table_name='message_triggerable_process_model')
op.drop_table('message_triggerable_process_model') op.drop_table('message_triggerable_process_model')
op.drop_index(op.f('ix_json_data_hash'), table_name='json_data')
op.drop_table('json_data')
op.drop_table('group') op.drop_table('group')
op.drop_table('correlation_property_cache') op.drop_table('correlation_property_cache')
op.drop_index(op.f('ix_bpmn_process_definition_hash'), table_name='bpmn_process_definition')
op.drop_index(op.f('ix_bpmn_process_definition_bpmn_identifier'), table_name='bpmn_process_definition')
op.drop_table('bpmn_process_definition')
op.drop_index(op.f('ix_bpmn_process_json_data_hash'), table_name='bpmn_process')
op.drop_index(op.f('ix_bpmn_process_guid'), table_name='bpmn_process')
op.drop_table('bpmn_process')
# ### end Alembic commands ### # ### end Alembic commands ###

View File

@ -50,15 +50,21 @@ from spiffworkflow_backend.models.group import GroupModel # noqa: F401
from spiffworkflow_backend.models.process_instance_metadata import ( from spiffworkflow_backend.models.process_instance_metadata import (
ProcessInstanceMetadataModel, ProcessInstanceMetadataModel,
) # noqa: F401 ) # noqa: F401
from spiffworkflow_backend.models.serialized_bpmn_definition import (
SerializedBpmnDefinitionModel,
) # noqa: F401
from spiffworkflow_backend.models.process_instance_data import (
ProcessInstanceDataModel,
) # noqa: F401
from spiffworkflow_backend.models.process_instance_file_data import ( from spiffworkflow_backend.models.process_instance_file_data import (
ProcessInstanceFileDataModel, ProcessInstanceFileDataModel,
) # noqa: F401 ) # noqa: F401
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel # noqa: F401
from spiffworkflow_backend.models.bpmn_process_definition import (
BpmnProcessDefinitionModel,
) # noqa: F401
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.task_definition import (
TaskDefinitionModel,
) # noqa: F401
from spiffworkflow_backend.models.json_data import JsonDataModel # noqa: F401
from spiffworkflow_backend.models.bpmn_process_definition_relationship import (
BpmnProcessDefinitionRelationshipModel,
) # noqa: F401
add_listeners() add_listeners()

View File

@ -0,0 +1,32 @@
from __future__ import annotations
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
# properties_json attributes:
# "last_task", # guid generated by spiff
# "root", # guid generated by spiff
# "success", # boolean
# "bpmn_messages", # if top-level process
# "correlations", # if top-level process
class BpmnProcessModel(SpiffworkflowBaseDBModel):
__tablename__ = "bpmn_process"
id: int = db.Column(db.Integer, primary_key=True)
guid: str | None = db.Column(db.String(36), nullable=True, unique=True, index=True)
parent_process_id: int | None = db.Column(
ForeignKey("bpmn_process.id"), nullable=True
)
properties_json: dict = db.Column(db.JSON, nullable=False)
json_data_hash: str = db.Column(db.String(255), nullable=False, index=True)
# subprocess or top_level_process
# process_type: str = db.Column(db.String(30), nullable=False)
# FIXME: find out how to set this but it'd be cool
start_in_seconds: float = db.Column(db.DECIMAL(17, 6))
end_in_seconds: float | None = db.Column(db.DECIMAL(17, 6))

View File

@ -0,0 +1,37 @@
from __future__ import annotations
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
# contents of top-level attributes from spiff:
# "subprocess_specs",
# "spec",
#
# each subprocess will have its own row in this table.
# there is a join table to link them together: bpmn_process_definition_relationship
class BpmnProcessDefinitionModel(SpiffworkflowBaseDBModel):
__tablename__ = "bpmn_process_definition"
id: int = db.Column(db.Integer, primary_key=True)
# this is a sha256 hash of spec and serializer_version
# note that a call activity is its own row in this table, with its own hash,
# and therefore it only gets stored once per version, and can be reused
# by multiple calling processes.
hash: str = db.Column(db.String(255), nullable=False, index=True, unique=True)
bpmn_identifier: str = db.Column(db.String(255), nullable=False, index=True)
properties_json: dict = db.Column(db.JSON, nullable=False)
# process or subprocess
# FIXME: will probably ignore for now since we do not strictly need it
# make this nullable false and index it once we actually start using it
type: str = db.Column(db.String(32), nullable=True)
# TODO: remove these from process_instance
bpmn_version_control_type: str = db.Column(db.String(50))
bpmn_version_control_identifier: str = db.Column(db.String(255))
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)

View File

@ -0,0 +1,29 @@
from __future__ import annotations
from sqlalchemy import ForeignKey
from sqlalchemy import UniqueConstraint
from spiffworkflow_backend.models.bpmn_process_definition import (
BpmnProcessDefinitionModel,
)
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
class BpmnProcessDefinitionRelationshipModel(SpiffworkflowBaseDBModel):
__tablename__ = "bpmn_process_definition_relationship"
__table_args__ = (
UniqueConstraint(
"bpmn_process_definition_parent_id",
"bpmn_process_definition_child_id",
name="bpmn_process_definition_relationship_unique",
),
)
id: int = db.Column(db.Integer, primary_key=True)
bpmn_process_definition_parent_id: int = db.Column(
ForeignKey(BpmnProcessDefinitionModel.id), nullable=False # type: ignore
)
bpmn_process_definition_child_id: int = db.Column(
ForeignKey(BpmnProcessDefinitionModel.id), nullable=False # type: ignore
)

View File

@ -1,30 +1,10 @@
"""Process_instance."""
from __future__ import annotations from __future__ import annotations
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
# top level serialized keys # delta algorithm <- just to save it for when we want to try to implement it:
#
# static
# "subprocess_specs",
# "spec",
# "serializer_version",
#
# runtime
# "bpmn_messages",
# "correlations",
# "data",
# "subprocesses",
# "tasks"
# "last_task", # guid generated by spiff
# "root", # guid generated by spiff
# "success", # boolean
#
# new columns on process_instance
#
# delta algorithm:
# a = {"hey": { "hey2": 2, "hey3": 3, "hey6": 7 }, "hey30": 3, "hey40": 4} # a = {"hey": { "hey2": 2, "hey3": 3, "hey6": 7 }, "hey30": 3, "hey40": 4}
# b = {"hey": { "hey2": 4, "hey5": 3 }, "hey20": 2, "hey30": 3} # b = {"hey": { "hey2": 4, "hey5": 3 }, "hey20": 2, "hey30": 3}
# a_keys = set(a.keys()) # a_keys = set(a.keys())
@ -40,8 +20,10 @@ from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
# for added_key in added_keys: # for added_key in added_keys:
# added[added_key] = b[added_key] # added[added_key] = b[added_key]
# final_tuple = [added, removed, changed] # final_tuple = [added, removed, changed]
class SerializedBpmnDefinitionModel(SpiffworkflowBaseDBModel): class JsonDataModel(SpiffworkflowBaseDBModel):
__tablename__ = "serialized_bpmn_definition" __tablename__ = "json_data"
id: int = db.Column(db.Integer, primary_key=True) id: int = db.Column(db.Integer, primary_key=True)
# this is a sha256 hash of spec and serializer_version
hash: str = db.Column(db.String(255), nullable=False, index=True, unique=True) hash: str = db.Column(db.String(255), nullable=False, index=True, unique=True)
static_json: str = db.Column(db.JSON, nullable=False) data: dict = db.Column(db.JSON, nullable=False)

View File

@ -15,12 +15,12 @@ from sqlalchemy.orm import relationship
from sqlalchemy.orm import validates from sqlalchemy.orm import validates
from spiffworkflow_backend.helpers.spiff_enum import SpiffEnum from spiffworkflow_backend.helpers.spiff_enum import SpiffEnum
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.bpmn_process_definition import (
BpmnProcessDefinitionModel,
)
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
from spiffworkflow_backend.models.process_instance_data import ProcessInstanceDataModel
from spiffworkflow_backend.models.serialized_bpmn_definition import (
SerializedBpmnDefinitionModel,
) # noqa: F401
from spiffworkflow_backend.models.task import Task from spiffworkflow_backend.models.task import Task
from spiffworkflow_backend.models.task import TaskSchema from spiffworkflow_backend.models.task import TaskSchema
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
@ -64,15 +64,16 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
process_initiator_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore process_initiator_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
process_initiator = relationship("UserModel") process_initiator = relationship("UserModel")
serialized_bpmn_definition_id: int | None = db.Column( bpmn_process_definition_id: int | None = db.Column(
ForeignKey(SerializedBpmnDefinitionModel.id), nullable=True # type: ignore ForeignKey(BpmnProcessDefinitionModel.id), nullable=True # type: ignore
) )
serialized_bpmn_definition = relationship("SerializedBpmnDefinitionModel") bpmn_process_definition = relationship(BpmnProcessDefinitionModel)
bpmn_process_id: int | None = db.Column(
ForeignKey(BpmnProcessModel.id), nullable=True # type: ignore
)
bpmn_process = relationship(BpmnProcessModel)
process_instance_data_id: int | None = db.Column( spiff_serializer_version = db.Column(db.String(50), nullable=True)
ForeignKey(ProcessInstanceDataModel.id), nullable=True # type: ignore
)
process_instance_data = relationship("ProcessInstanceDataModel", cascade="delete")
active_human_tasks = relationship( active_human_tasks = relationship(
"HumanTaskModel", "HumanTaskModel",

View File

@ -1,22 +0,0 @@
"""Process_instance."""
from __future__ import annotations
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
# the last three here should maybe become columns on process instance someday
# runtime_json
# "bpmn_messages",
# "correlations",
# "data",
# "subprocesses",
# "tasks",
# "last_task", # guid generated by spiff
# "root", # guid generated by spiff
# "success", # boolean
class ProcessInstanceDataModel(SpiffworkflowBaseDBModel):
__tablename__ = "process_instance_data"
id: int = db.Column(db.Integer, primary_key=True)
# this is not deferred because there is no reason to query this model if you do not want the runtime_json
runtime_json: str = db.Column(db.JSON, nullable=False)

View File

@ -1,5 +1,6 @@
"""Task.""" """Task."""
import enum import enum
from dataclasses import dataclass
from typing import Any from typing import Any
from typing import Optional from typing import Optional
from typing import Union from typing import Union
@ -8,6 +9,11 @@ import marshmallow
from marshmallow import Schema from marshmallow import Schema
from marshmallow_enum import EnumField # type: ignore from marshmallow_enum import EnumField # type: ignore
from SpiffWorkflow.task import TaskStateNames # type: ignore from SpiffWorkflow.task import TaskStateNames # type: ignore
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
class MultiInstanceType(enum.Enum): class MultiInstanceType(enum.Enum):
@ -19,6 +25,39 @@ class MultiInstanceType(enum.Enum):
sequential = "sequential" sequential = "sequential"
# properties_json attributes:
# "id": "a56e1403-2838-4f03-a31f-f99afe16f38d",
# "parent": null,
# "children": [
# "af6ba340-71e7-46d7-b2d4-e3db1751785d"
# ],
# "last_state_change": 1677775475.18116,
# "state": 32,
# "task_spec": "Root",
# "triggered": false,
# "workflow_name": "Process_category_number_one_call_activity_call_activity_test_bd2e724",
# "internal_data": {},
@dataclass
class TaskModel(SpiffworkflowBaseDBModel):
__tablename__ = "task"
id: int = db.Column(db.Integer, primary_key=True)
guid: str = db.Column(db.String(36), nullable=False, unique=True, index=True)
bpmn_process_id: int = db.Column(
ForeignKey(BpmnProcessModel.id), nullable=False # type: ignore
)
# find this by looking up the "workflow_name" and "task_spec" from the properties_json
# task_definition_id: int = db.Column(
# ForeignKey(TaskDefinitionModel.id), nullable=False # type: ignore
# )
state: str = db.Column(db.String(10), nullable=False)
properties_json: dict = db.Column(db.JSON, nullable=False)
json_data_hash: str = db.Column(db.String(255), nullable=False, index=True)
start_in_seconds: float = db.Column(db.DECIMAL(17, 6))
end_in_seconds: float | None = db.Column(db.DECIMAL(17, 6))
class Task: class Task:
"""Task.""" """Task."""

View File

@ -0,0 +1,35 @@
from __future__ import annotations
from sqlalchemy import ForeignKey
from sqlalchemy import UniqueConstraint
from sqlalchemy.orm import relationship
from spiffworkflow_backend.models.bpmn_process_definition import (
BpmnProcessDefinitionModel,
)
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
class TaskDefinitionModel(SpiffworkflowBaseDBModel):
__tablename__ = "task_definition"
__table_args__ = (
UniqueConstraint(
"bpmn_process_definition_id",
"bpmn_identifier",
name="task_definition_unique",
),
)
id: int = db.Column(db.Integer, primary_key=True)
bpmn_process_definition_id: int = db.Column(
ForeignKey(BpmnProcessDefinitionModel.id), nullable=False # type: ignore
)
bpmn_process_definition = relationship(BpmnProcessDefinitionModel)
bpmn_identifier: str = db.Column(db.String(255), nullable=False, index=True)
properties_json: dict = db.Column(db.JSON, nullable=False)
typename: str = db.Column(db.String(255), nullable=False)
updated_at_in_seconds: int = db.Column(db.Integer)
created_at_in_seconds: int = db.Column(db.Integer)

View File

@ -549,13 +549,11 @@ def process_instance_task_list(
step_details = step_detail_query.all() step_details = step_detail_query.all()
process_instance_data = process_instance.process_instance_data processor = ProcessInstanceProcessor(process_instance)
process_instance_data_json = ( full_bpmn_process_dict = processor.full_bpmn_process_dict
"{}" if process_instance_data is None else process_instance_data.runtime_json
) tasks = full_bpmn_process_dict["tasks"]
process_instance_data_dict = json.loads(process_instance_data_json) subprocesses = full_bpmn_process_dict["subprocesses"]
tasks = process_instance_data_dict["tasks"]
subprocesses = process_instance_data_dict["subprocesses"]
steps_by_id = {step_detail.task_id: step_detail for step_detail in step_details} steps_by_id = {step_detail.task_id: step_detail for step_detail in step_details}
@ -588,18 +586,19 @@ def process_instance_task_list(
spiff_task_id, TaskState.FUTURE spiff_task_id, TaskState.FUTURE
) )
process_instance_data.runtime_json = json.dumps(process_instance_data_dict) bpmn_process_instance = ProcessInstanceProcessor._serializer.workflow_from_dict(
full_bpmn_process_dict
)
processor = ProcessInstanceProcessor(process_instance)
spiff_task = processor.__class__.get_task_by_bpmn_identifier( spiff_task = processor.__class__.get_task_by_bpmn_identifier(
step_details[-1].bpmn_task_identifier, processor.bpmn_process_instance step_details[-1].bpmn_task_identifier, bpmn_process_instance
) )
if spiff_task is not None and spiff_task.state != TaskState.READY: if spiff_task is not None and spiff_task.state != TaskState.READY:
spiff_task.complete() spiff_task.complete()
spiff_tasks = None spiff_tasks = None
if all_tasks: if all_tasks:
spiff_tasks = processor.bpmn_process_instance.get_tasks(TaskState.ANY_MASK) spiff_tasks = bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
else: else:
spiff_tasks = processor.get_all_user_tasks() spiff_tasks = processor.get_all_user_tasks()

View File

@ -51,23 +51,31 @@ from SpiffWorkflow.serializer.exceptions import MissingSpecError # type: ignore
from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG # type: ignore from SpiffWorkflow.spiff.serializer.config import SPIFF_SPEC_CONFIG # type: ignore
from SpiffWorkflow.task import Task as SpiffTask # type: ignore from SpiffWorkflow.task import Task as SpiffTask # type: ignore
from SpiffWorkflow.task import TaskState from SpiffWorkflow.task import TaskState
from SpiffWorkflow.task import TaskStateNames
from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from sqlalchemy import text from sqlalchemy import text
from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel
from spiffworkflow_backend.models.bpmn_process_definition import (
BpmnProcessDefinitionModel,
)
from spiffworkflow_backend.models.bpmn_process_definition_relationship import (
BpmnProcessDefinitionRelationshipModel,
) # noqa: F401
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.file import File from spiffworkflow_backend.models.file import File
from spiffworkflow_backend.models.file import FileType from spiffworkflow_backend.models.file import FileType
from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.group import GroupModel
from spiffworkflow_backend.models.human_task import HumanTaskModel from spiffworkflow_backend.models.human_task import HumanTaskModel
from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel
from spiffworkflow_backend.models.json_data import JsonDataModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance_correlation import ( from spiffworkflow_backend.models.message_instance_correlation import (
MessageInstanceCorrelationRuleModel, MessageInstanceCorrelationRuleModel,
) )
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus
from spiffworkflow_backend.models.process_instance_data import ProcessInstanceDataModel
from spiffworkflow_backend.models.process_instance_metadata import ( from spiffworkflow_backend.models.process_instance_metadata import (
ProcessInstanceMetadataModel, ProcessInstanceMetadataModel,
) )
@ -75,11 +83,10 @@ from spiffworkflow_backend.models.process_model import ProcessModelInfo
from spiffworkflow_backend.models.script_attributes_context import ( from spiffworkflow_backend.models.script_attributes_context import (
ScriptAttributesContext, ScriptAttributesContext,
) )
from spiffworkflow_backend.models.serialized_bpmn_definition import (
SerializedBpmnDefinitionModel,
) # noqa: F401
from spiffworkflow_backend.models.spec_reference import SpecReferenceCache from spiffworkflow_backend.models.spec_reference import SpecReferenceCache
from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.script import Script from spiffworkflow_backend.scripts.script import Script
from spiffworkflow_backend.services.custom_parser import MyCustomParser from spiffworkflow_backend.services.custom_parser import MyCustomParser
@ -437,8 +444,9 @@ class ProcessInstanceProcessor:
self.process_instance_model = process_instance_model self.process_instance_model = process_instance_model
self.process_model_service = ProcessModelService() self.process_model_service = ProcessModelService()
bpmn_process_spec = None bpmn_process_spec = None
self.full_bpmn_process_dict = {}
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None subprocesses: Optional[IdToBpmnProcessSpecMapping] = None
if process_instance_model.serialized_bpmn_definition_id is None: if process_instance_model.bpmn_process_definition_id is None:
( (
bpmn_process_spec, bpmn_process_spec,
subprocesses, subprocesses,
@ -452,11 +460,13 @@ class ProcessInstanceProcessor:
) )
try: try:
self.bpmn_process_instance = self.__get_bpmn_process_instance( (self.bpmn_process_instance, self.full_bpmn_process_dict) = (
process_instance_model, self.__get_bpmn_process_instance(
bpmn_process_spec, process_instance_model,
validate_only, bpmn_process_spec,
subprocesses=subprocesses, validate_only,
subprocesses=subprocesses,
)
) )
self.set_script_engine(self.bpmn_process_instance) self.set_script_engine(self.bpmn_process_instance)
@ -516,14 +526,123 @@ class ProcessInstanceProcessor:
) )
@classmethod @classmethod
def _get_full_bpmn_json(cls, process_instance_model: ProcessInstanceModel) -> dict: def _get_definition_dict_for_bpmn_process_definition(
if process_instance_model.serialized_bpmn_definition_id is None: cls, bpmn_process_definition: BpmnProcessDefinitionModel
) -> dict:
task_definitions = TaskDefinitionModel.query.filter_by(
bpmn_process_definition_id=bpmn_process_definition.id
).all()
bpmn_process_definition_dict: dict = bpmn_process_definition.properties_json
bpmn_process_definition_dict["task_specs"] = {}
for task_definition in task_definitions:
bpmn_process_definition_dict["task_specs"][
task_definition.bpmn_identifier
] = task_definition.properties_json
return bpmn_process_definition_dict
@classmethod
def _set_definition_dict_for_bpmn_subprocess_definitions(
cls,
bpmn_process_definition: BpmnProcessDefinitionModel,
spiff_bpmn_process_dict: dict,
) -> None:
# find all child subprocesses of a process
bpmn_process_subprocess_definitions = (
BpmnProcessDefinitionModel.query.join(
BpmnProcessDefinitionRelationshipModel,
BpmnProcessDefinitionModel.id
== BpmnProcessDefinitionRelationshipModel.bpmn_process_definition_child_id,
)
.filter_by(bpmn_process_definition_parent_id=bpmn_process_definition.id)
.all()
)
bpmn_subprocess_definition_bpmn_identifiers = {}
for bpmn_subprocess_definition in bpmn_process_subprocess_definitions:
bpmn_process_definition_dict: dict = (
bpmn_subprocess_definition.properties_json
)
spiff_bpmn_process_dict["subprocess_specs"][
bpmn_subprocess_definition.bpmn_identifier
] = bpmn_process_definition_dict
spiff_bpmn_process_dict["subprocess_specs"][
bpmn_subprocess_definition.bpmn_identifier
]["task_specs"] = {}
bpmn_subprocess_definition_bpmn_identifiers[
bpmn_subprocess_definition.id
] = bpmn_subprocess_definition.bpmn_identifier
task_definitions = TaskDefinitionModel.query.filter(
TaskDefinitionModel.bpmn_process_definition_id.in_( # type: ignore
bpmn_subprocess_definition_bpmn_identifiers.keys()
)
).all()
for task_definition in task_definitions:
bpmn_subprocess_definition_bpmn_identifier = (
bpmn_subprocess_definition_bpmn_identifiers[
task_definition.bpmn_process_definition_id
]
)
spiff_bpmn_process_dict["subprocess_specs"][
bpmn_subprocess_definition_bpmn_identifier
]["task_specs"][
task_definition.bpmn_identifier
] = task_definition.properties_json
@classmethod
def _get_bpmn_process_dict(cls, bpmn_process: BpmnProcessModel) -> dict:
json_data = JsonDataModel.query.filter_by(
hash=bpmn_process.json_data_hash
).first()
bpmn_process_dict = {"data": json_data.data, "tasks": {}}
bpmn_process_dict.update(bpmn_process.properties_json)
tasks = TaskModel.query.filter_by(bpmn_process_id=bpmn_process.id).all()
for task in tasks:
json_data = JsonDataModel.query.filter_by(hash=task.json_data_hash).first()
bpmn_process_dict["tasks"][task.guid] = task.properties_json
bpmn_process_dict["tasks"][task.guid]["data"] = json_data.data
return bpmn_process_dict
@classmethod
def _get_full_bpmn_process_dict(
cls, process_instance_model: ProcessInstanceModel
) -> dict:
if process_instance_model.bpmn_process_definition_id is None:
return {} return {}
serialized_bpmn_definition = process_instance_model.serialized_bpmn_definition
process_instance_data = process_instance_model.process_instance_data spiff_bpmn_process_dict: dict = {
loaded_json: dict = json.loads(serialized_bpmn_definition.static_json or "{}") "serializer_version": process_instance_model.spiff_serializer_version,
loaded_json.update(json.loads(process_instance_data.runtime_json)) "spec": {},
return loaded_json "subprocess_specs": {},
"subprocesses": {},
}
bpmn_process_definition = process_instance_model.bpmn_process_definition
if bpmn_process_definition is not None:
spiff_bpmn_process_dict["spec"] = (
cls._get_definition_dict_for_bpmn_process_definition(
bpmn_process_definition
)
)
cls._set_definition_dict_for_bpmn_subprocess_definitions(
bpmn_process_definition, spiff_bpmn_process_dict
)
bpmn_process = process_instance_model.bpmn_process
if bpmn_process is not None:
bpmn_process_dict = cls._get_bpmn_process_dict(bpmn_process)
spiff_bpmn_process_dict.update(bpmn_process_dict)
bpmn_subprocesses = BpmnProcessModel.query.filter_by(
parent_process_id=bpmn_process.id
).all()
for bpmn_subprocess in bpmn_subprocesses:
bpmn_process_dict = cls._get_bpmn_process_dict(bpmn_subprocess)
spiff_bpmn_process_dict["subprocesses"][
bpmn_subprocess.guid
] = bpmn_process_dict
return spiff_bpmn_process_dict
def current_user(self) -> Any: def current_user(self) -> Any:
"""Current_user.""" """Current_user."""
@ -559,20 +678,22 @@ class ProcessInstanceProcessor:
validate_only: bool = False, validate_only: bool = False,
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None, subprocesses: Optional[IdToBpmnProcessSpecMapping] = None,
) -> BpmnWorkflow: ) -> BpmnWorkflow:
"""__get_bpmn_process_instance.""" full_bpmn_process_dict = {}
if process_instance_model.serialized_bpmn_definition_id is not None: if process_instance_model.bpmn_process_definition_id is not None:
# turn off logging to avoid duplicated spiff logs # turn off logging to avoid duplicated spiff logs
spiff_logger = logging.getLogger("spiff") spiff_logger = logging.getLogger("spiff")
original_spiff_logger_log_level = spiff_logger.level original_spiff_logger_log_level = spiff_logger.level
spiff_logger.setLevel(logging.WARNING) spiff_logger.setLevel(logging.WARNING)
try: try:
full_bpmn_json = ProcessInstanceProcessor._get_full_bpmn_json( full_bpmn_process_dict = (
process_instance_model ProcessInstanceProcessor._get_full_bpmn_process_dict(
process_instance_model
)
) )
bpmn_process_instance = ( bpmn_process_instance = (
ProcessInstanceProcessor._serializer.deserialize_json( ProcessInstanceProcessor._serializer.workflow_from_dict(
json.dumps(full_bpmn_json) full_bpmn_process_dict
) )
) )
except Exception as err: except Exception as err:
@ -590,7 +711,7 @@ class ProcessInstanceProcessor:
bpmn_process_instance.data[ bpmn_process_instance.data[
ProcessInstanceProcessor.VALIDATION_PROCESS_KEY ProcessInstanceProcessor.VALIDATION_PROCESS_KEY
] = validate_only ] = validate_only
return bpmn_process_instance return (bpmn_process_instance, full_bpmn_process_dict)
def slam_in_data(self, data: dict) -> None: def slam_in_data(self, data: dict) -> None:
"""Slam_in_data.""" """Slam_in_data."""
@ -730,9 +851,7 @@ class ProcessInstanceProcessor:
Rerturns: {process_name: [task_1, task_2, ...], ...} Rerturns: {process_name: [task_1, task_2, ...], ...}
""" """
bpmn_definition_dict = json.loads( bpmn_definition_dict = self.full_bpmn_process_dict
self.process_instance_model.serialized_bpmn_definition.static_json or "{}"
)
processes: dict[str, list[str]] = {bpmn_definition_dict["spec"]["name"]: []} processes: dict[str, list[str]] = {bpmn_definition_dict["spec"]["name"]: []}
for task_name, _task_spec in bpmn_definition_dict["spec"]["task_specs"].items(): for task_name, _task_spec in bpmn_definition_dict["spec"]["task_specs"].items():
processes[bpmn_definition_dict["spec"]["name"]].append(task_name) processes[bpmn_definition_dict["spec"]["name"]].append(task_name)
@ -780,9 +899,7 @@ class ProcessInstanceProcessor:
NOTE: this may not fully work for tasks that are NOT call activities since their task_name may not be unique NOTE: this may not fully work for tasks that are NOT call activities since their task_name may not be unique
but in our current use case we only care about the call activities here. but in our current use case we only care about the call activities here.
""" """
bpmn_definition_dict = json.loads( bpmn_definition_dict = self.full_bpmn_process_dict
self.process_instance_model.serialized_bpmn_definition.static_json or "{}"
)
spiff_task_json = bpmn_definition_dict["spec"]["task_specs"] or {} spiff_task_json = bpmn_definition_dict["spec"]["task_specs"] or {}
if "subprocess_specs" in bpmn_definition_dict: if "subprocess_specs" in bpmn_definition_dict:
for _subprocess_name, subprocess_details in bpmn_definition_dict[ for _subprocess_name, subprocess_details in bpmn_definition_dict[
@ -807,9 +924,7 @@ class ProcessInstanceProcessor:
Also note that subprocess_task_id might in fact be a call activity, because spiff treats Also note that subprocess_task_id might in fact be a call activity, because spiff treats
call activities like subprocesses in terms of the serialization. call activities like subprocesses in terms of the serialization.
""" """
process_instance_data_dict = json.loads( process_instance_data_dict = self.full_bpmn_process_dict
self.process_instance_model.process_instance_data.runtime_json or "{}"
)
spiff_task_json = self.get_all_task_specs() spiff_task_json = self.get_all_task_specs()
subprocesses_by_child_task_ids = {} subprocesses_by_child_task_ids = {}
@ -854,15 +969,151 @@ class ProcessInstanceProcessor:
) )
return subprocesses_by_child_task_ids return subprocesses_by_child_task_ids
def _store_bpmn_process_definition(
self,
process_bpmn_properties: dict,
bpmn_process_definition_parent: Optional[BpmnProcessDefinitionModel] = None,
) -> BpmnProcessDefinitionModel:
process_bpmn_identifier = process_bpmn_properties["name"]
new_hash_digest = sha256(
json.dumps(process_bpmn_properties, sort_keys=True).encode("utf8")
).hexdigest()
bpmn_process_definition: Optional[BpmnProcessDefinitionModel] = (
BpmnProcessDefinitionModel.query.filter_by(hash=new_hash_digest).first()
)
if bpmn_process_definition is None:
task_specs = process_bpmn_properties.pop("task_specs")
bpmn_process_definition = BpmnProcessDefinitionModel(
hash=new_hash_digest,
bpmn_identifier=process_bpmn_identifier,
properties_json=process_bpmn_properties,
)
db.session.add(bpmn_process_definition)
for task_bpmn_identifier, task_bpmn_properties in task_specs.items():
task_definition = TaskDefinitionModel(
bpmn_process_definition=bpmn_process_definition,
bpmn_identifier=task_bpmn_identifier,
properties_json=task_bpmn_properties,
typename=task_bpmn_properties["typename"],
)
db.session.add(task_definition)
if bpmn_process_definition_parent is not None:
bpmn_process_definition_relationship = (
BpmnProcessDefinitionRelationshipModel.query.filter_by(
bpmn_process_definition_parent_id=bpmn_process_definition_parent.id,
bpmn_process_definition_child_id=bpmn_process_definition.id,
).first()
)
if bpmn_process_definition_relationship is None:
bpmn_process_definition_relationship = BpmnProcessDefinitionRelationshipModel(
bpmn_process_definition_parent_id=bpmn_process_definition_parent.id,
bpmn_process_definition_child_id=bpmn_process_definition.id,
)
db.session.add(bpmn_process_definition_relationship)
return bpmn_process_definition
def _add_bpmn_process_definitions(self, bpmn_spec_dict: dict) -> None:
bpmn_process_definition_parent = self._store_bpmn_process_definition(
bpmn_spec_dict["spec"]
)
for process_bpmn_properties in bpmn_spec_dict["subprocess_specs"].values():
self._store_bpmn_process_definition(
process_bpmn_properties, bpmn_process_definition_parent
)
self.process_instance_model.bpmn_process_definition = (
bpmn_process_definition_parent
)
def _add_bpmn_process(
self,
bpmn_process_dict: dict,
bpmn_process_parent: Optional[BpmnProcessModel] = None,
bpmn_process_guid: Optional[str] = None,
) -> BpmnProcessModel:
tasks = bpmn_process_dict.pop("tasks")
bpmn_process_data = bpmn_process_dict.pop("data")
bpmn_process = None
if bpmn_process_parent is not None:
bpmn_process = BpmnProcessModel.query.filter_by(
parent_process_id=bpmn_process_parent.id, guid=bpmn_process_guid
).first()
elif self.process_instance_model.bpmn_process_id is not None:
bpmn_process = self.process_instance_model.bpmn_process
if bpmn_process is None:
bpmn_process = BpmnProcessModel(guid=bpmn_process_guid)
bpmn_process.properties_json = bpmn_process_dict
bpmn_process_data_json = json.dumps(bpmn_process_data, sort_keys=True).encode(
"utf8"
)
bpmn_process_data_hash = sha256(bpmn_process_data_json).hexdigest()
if bpmn_process.json_data_hash != bpmn_process_data_hash:
json_data = (
db.session.query(JsonDataModel.id)
.filter_by(hash=bpmn_process_data_hash)
.first()
)
if json_data is None:
json_data = JsonDataModel(
hash=bpmn_process_data_hash, data=bpmn_process_data
)
db.session.add(json_data)
bpmn_process.json_data_hash = bpmn_process_data_hash
if bpmn_process_parent is None:
self.process_instance_model.bpmn_process = bpmn_process
elif bpmn_process.parent_process_id is None:
bpmn_process.parent_process_id = bpmn_process_parent.id
db.session.add(bpmn_process)
for task_id, task_properties in tasks.items():
task_data_dict = task_properties.pop("data")
state_int = task_properties["state"]
task = TaskModel.query.filter_by(guid=task_id).first()
if task is None:
# bpmn_process_identifier = task_properties['workflow_name']
# bpmn_identifier = task_properties['task_spec']
#
# task_definition = TaskDefinitionModel.query.filter_by(bpmn_identifier=bpmn_identifier)
# .join(BpmnProcessDefinitionModel).filter(BpmnProcessDefinitionModel.bpmn_identifier==bpmn_process_identifier).first()
# if task_definition is None:
# subprocess_task = TaskModel.query.filter_by(guid=bpmn_process.guid)
task = TaskModel(guid=task_id, bpmn_process_id=bpmn_process.id)
task.state = TaskStateNames[state_int]
task.properties_json = task_properties
task_data_json = json.dumps(task_data_dict, sort_keys=True).encode("utf8")
task_data_hash = sha256(task_data_json).hexdigest()
if task.json_data_hash != task_data_hash:
json_data = (
db.session.query(JsonDataModel.id)
.filter_by(hash=task_data_hash)
.first()
)
if json_data is None:
json_data = JsonDataModel(hash=task_data_hash, data=task_data_dict)
db.session.add(json_data)
task.json_data_hash = task_data_hash
db.session.add(task)
return bpmn_process
def _add_bpmn_json_records(self) -> None: def _add_bpmn_json_records(self) -> None:
"""Adds serialized_bpmn_definition and process_instance_data records to the db session. """Adds serialized_bpmn_definition and process_instance_data records to the db session.
Expects the save method to commit it. Expects the save method to commit it.
""" """
bpmn_dict = json.loads(self.serialize()) bpmn_dict = json.loads(self.serialize())
# with open('tmp2.json', 'w') as f: f.write(json.dumps(bpmn_dict)
bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version") bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version")
bpmn_spec_dict = {}
process_instance_data_dict = {} process_instance_data_dict = {}
bpmn_spec_dict = {}
for bpmn_key in bpmn_dict.keys(): for bpmn_key in bpmn_dict.keys():
if bpmn_key in bpmn_dict_keys: if bpmn_key in bpmn_dict_keys:
bpmn_spec_dict[bpmn_key] = bpmn_dict[bpmn_key] bpmn_spec_dict[bpmn_key] = bpmn_dict[bpmn_key]
@ -870,40 +1121,25 @@ class ProcessInstanceProcessor:
process_instance_data_dict[bpmn_key] = bpmn_dict[bpmn_key] process_instance_data_dict[bpmn_key] = bpmn_dict[bpmn_key]
# FIXME: always save new hash until we get updated Spiff without loopresettask # FIXME: always save new hash until we get updated Spiff without loopresettask
# if self.process_instance_model.serialized_bpmn_definition_id is None: # if self.process_instance_model.bpmn_process_definition_id is None:
new_hash_digest = sha256( self._add_bpmn_process_definitions(bpmn_spec_dict)
json.dumps(bpmn_spec_dict, sort_keys=True).encode("utf8")
).hexdigest()
serialized_bpmn_definition = SerializedBpmnDefinitionModel.query.filter_by(
hash=new_hash_digest
).first()
if serialized_bpmn_definition is None:
serialized_bpmn_definition = SerializedBpmnDefinitionModel(
hash=new_hash_digest, static_json=json.dumps(bpmn_spec_dict)
)
db.session.add(serialized_bpmn_definition)
if (
self.process_instance_model.serialized_bpmn_definition_id is None
or self.process_instance_model.serialized_bpmn_definition.hash
!= new_hash_digest
):
self.process_instance_model.serialized_bpmn_definition = (
serialized_bpmn_definition
)
process_instance_data = None # FIXME: Update tasks in the did_complete_task instead to set the final info.
if self.process_instance_model.process_instance_data_id is None: # We will need to somehow cache all tasks initially though before each task is run.
process_instance_data = ProcessInstanceDataModel() # Maybe always do this for first run - just need to know it's the first run.
else: subprocesses = process_instance_data_dict.pop("subprocesses")
process_instance_data = self.process_instance_model.process_instance_data bpmn_process_parent = self._add_bpmn_process(process_instance_data_dict)
for subprocess_task_id, subprocess_properties in subprocesses.items():
process_instance_data.runtime_json = json.dumps(process_instance_data_dict) self._add_bpmn_process(
db.session.add(process_instance_data) subprocess_properties,
self.process_instance_model.process_instance_data = process_instance_data bpmn_process_parent,
bpmn_process_guid=subprocess_task_id,
)
def save(self) -> None: def save(self) -> None:
"""Saves the current state of this processor to the database.""" """Saves the current state of this processor to the database."""
self._add_bpmn_json_records() self._add_bpmn_json_records()
self.process_instance_model.spiff_serializer_version = self.SERIALIZER_VERSION
complete_states = [TaskState.CANCELLED, TaskState.COMPLETED] complete_states = [TaskState.CANCELLED, TaskState.COMPLETED]
user_tasks = list(self.get_all_user_tasks()) user_tasks = list(self.get_all_user_tasks())
@ -1002,7 +1238,7 @@ class ProcessInstanceProcessor:
) )
db.session.add(spiff_step_detail) db.session.add(spiff_step_detail)
db.session.commit() db.session.commit()
self.log_spiff_step_details(spiff_step_detail_mapping) # self.log_spiff_step_details(spiff_step_detail_mapping)
if len(human_tasks) > 0: if len(human_tasks) > 0:
for at in human_tasks: for at in human_tasks:
@ -1039,7 +1275,7 @@ class ProcessInstanceProcessor:
spiff_step_detail = SpiffStepDetailsModel(**step) spiff_step_detail = SpiffStepDetailsModel(**step)
db.session.add(spiff_step_detail) db.session.add(spiff_step_detail)
db.session.commit() db.session.commit()
self.log_spiff_step_details(step) # self.log_spiff_step_details(step)
def manual_complete_task(self, task_id: str, execute: bool) -> None: def manual_complete_task(self, task_id: str, execute: bool) -> None:
"""Mark the task complete optionally executing it.""" """Mark the task complete optionally executing it."""
@ -1504,7 +1740,7 @@ class ProcessInstanceProcessor:
raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe raise ApiError.from_workflow_exception("task_error", str(swe), swe) from swe
finally: finally:
self.log_spiff_step_details(step_details) # self.log_spiff_step_details(step_details)
db.session.bulk_insert_mappings(SpiffStepDetailsModel, step_details) db.session.bulk_insert_mappings(SpiffStepDetailsModel, step_details)
spiff_logger = logging.getLogger("spiff") spiff_logger = logging.getLogger("spiff")
for handler in spiff_logger.handlers: for handler in spiff_logger.handlers:

View File

@ -1301,10 +1301,11 @@ class TestProcessApi(BaseTest):
assert create_response.json is not None assert create_response.json is not None
assert create_response.status_code == 201 assert create_response.status_code == 201
process_instance_id = create_response.json["id"] process_instance_id = create_response.json["id"]
client.post( run_response = client.post(
f"/v1.0/process-instances/{modified_process_model_identifier}/{process_instance_id}/run", f"/v1.0/process-instances/{modified_process_model_identifier}/{process_instance_id}/run",
headers=self.logged_in_headers(with_super_admin_user), headers=self.logged_in_headers(with_super_admin_user),
) )
assert run_response.status_code == 200
show_response = client.get( show_response = client.get(
f"/v1.0/process-instances/{modified_process_model_identifier}/{process_instance_id}?process_identifier={spec_reference.identifier}", f"/v1.0/process-instances/{modified_process_model_identifier}/{process_instance_id}?process_identifier={spec_reference.identifier}",
headers=self.logged_in_headers(with_super_admin_user), headers=self.logged_in_headers(with_super_admin_user),

View File

@ -13,9 +13,7 @@ from spiffworkflow_backend.services.process_instance_service import (
class TestDotNotation(BaseTest): class TestDotNotation(BaseTest):
"""TestVariousBpmnConstructs.""" def test_dot_notation_in_message_path(
def test_dot_notation(
self, self,
app: Flask, app: Flask,
client: FlaskClient, client: FlaskClient,

View File

@ -71,7 +71,6 @@ class TestMessageService(BaseTest):
.all() .all()
) )
assert len(waiting_messages) == 0 assert len(waiting_messages) == 0
# The process has completed # The process has completed
assert self.process_instance.status == "complete" assert self.process_instance.status == "complete"

View File

@ -3,6 +3,7 @@ import pytest
from flask import g from flask import g
from flask.app import Flask from flask.app import Flask
from flask.testing import FlaskClient from flask.testing import FlaskClient
from SpiffWorkflow.task import TaskState # type: ignore
from tests.spiffworkflow_backend.helpers.base_test import BaseTest from tests.spiffworkflow_backend.helpers.base_test import BaseTest
from tests.spiffworkflow_backend.helpers.test_data import load_test_spec from tests.spiffworkflow_backend.helpers.test_data import load_test_spec
@ -261,6 +262,36 @@ class TestProcessInstanceProcessor(BaseTest):
assert process_instance.status == ProcessInstanceStatus.complete.value assert process_instance.status == ProcessInstanceStatus.complete.value
def test_can_load_up_processor_after_running_model_with_call_activities(
self,
app: Flask,
client: FlaskClient,
with_db_and_bpmn_file_cleanup: None,
with_super_admin_user: UserModel,
) -> None:
"""Test_does_not_recreate_human_tasks_on_multiple_saves."""
initiator_user = self.find_or_create_user("initiator_user")
process_model = load_test_spec(
process_model_id="test_group/call_activity_nested",
process_model_source_directory="call_activity_nested",
)
process_instance = self.create_process_instance_from_process_model(
process_model=process_model, user=initiator_user
)
processor = ProcessInstanceProcessor(process_instance)
processor.do_engine_steps(save=True)
# ensure this does not raise
processor = ProcessInstanceProcessor(process_instance)
# this task will be found within subprocesses
spiff_task = processor.__class__.get_task_by_bpmn_identifier(
"do_nothing", processor.bpmn_process_instance
)
assert spiff_task is not None
assert spiff_task.state == TaskState.COMPLETED
def test_does_not_recreate_human_tasks_on_multiple_saves( def test_does_not_recreate_human_tasks_on_multiple_saves(
self, self,
app: Flask, app: Flask,

View File

@ -17,7 +17,7 @@ else
shift shift
fi fi
if [[ -z "CYPRESS_SPIFFWORKFLOW_FRONTEND_AUTH_WITH_KEYCLOAK" ]]; then if [[ -z "${CYPRESS_SPIFFWORKFLOW_FRONTEND_AUTH_WITH_KEYCLOAK:-}" ]]; then
export CYPRESS_SPIFFWORKFLOW_FRONTEND_AUTH_WITH_KEYCLOAK=true export CYPRESS_SPIFFWORKFLOW_FRONTEND_AUTH_WITH_KEYCLOAK=true
fi fi

View File

@ -179,13 +179,14 @@ describe('process-instances', () => {
cy.get(statusSelect).contains(processStatus).click(); cy.get(statusSelect).contains(processStatus).click();
cy.get(statusSelect).click(); cy.get(statusSelect).click();
cy.getBySel('filter-button').click(); cy.getBySel('filter-button').click();
// FIXME: wait a little bit for the useEffects to be able to fully set processInstanceFilters
cy.wait(1000);
cy.url().should('include', `status=${processStatus}`); cy.url().should('include', `status=${processStatus}`);
cy.assertAtLeastOneItemInPaginatedResults(); cy.assertAtLeastOneItemInPaginatedResults();
cy.getBySel(`process-instance-status-${processStatus}`); cy.getBySel(`process-instance-status-${processStatus}`);
// there should really only be one, but in CI there are sometimes more // there should really only be one, but in CI there are sometimes more
cy.get('div[aria-label="Clear all selected items"]:first').click(); cy.get('div[aria-label="Clear all selected items"]:first').click();
cy.get('div[aria-label="Clear all selected items"]').should(
'not.exist'
);
} }
}); });

View File

@ -32,13 +32,16 @@ const approveWithUser = (
describe('pp1', () => { describe('pp1', () => {
it('can run PP1', () => { it('can run PP1', () => {
cy.login('core5.contributor', 'core5.contributor'); cy.login('core-a1.contributor', 'core-a1.contributor');
cy.visit('/'); cy.visit('/');
cy.contains('Start New +').click(); cy.contains('Start New +').click();
cy.contains('Raise New Demand Request'); cy.contains('Raise New Demand Request');
cy.runPrimaryBpmnFile(true); cy.runPrimaryBpmnFile(true);
cy.contains('Procurement').click(); cy.contains('Please select the type of request to Start the process.');
// cy.contains('Submit').click(); // wait a second to ensure we can click the radio button
cy.wait(1000);
cy.get('input#root-procurement').click();
cy.wait(1000);
cy.get('button') cy.get('button')
.contains(/^Submit$/) .contains(/^Submit$/)
.click(); .click();
@ -77,7 +80,8 @@ describe('pp1', () => {
.click(); .click();
cy.contains( cy.contains(
'Review and provide any supporting information or files for your request.' 'Review and provide any supporting information or files for your request.',
{ timeout: 20000 }
); );
cy.contains('Submit the Request').click(); cy.contains('Submit the Request').click();
cy.get('input[value="Submit the Request"]').click(); cy.get('input[value="Submit the Request"]').click();
@ -91,14 +95,14 @@ describe('pp1', () => {
processInstanceId, processInstanceId,
'Task: Reminder: Request Additional Budget' 'Task: Reminder: Request Additional Budget'
); );
approveWithUser('ppg.ba.sme', processInstanceId); approveWithUser('ppg.ba-a1.sme', processInstanceId);
approveWithUser('security.sme', processInstanceId); approveWithUser('security-a1.sme', processInstanceId);
approveWithUser( approveWithUser(
'infra.sme', 'infra-a1.sme',
processInstanceId, processInstanceId,
'Task: Update Application Landscape' 'Task: Update Application Landscape'
); );
approveWithUser('legal.sme', processInstanceId); approveWithUser('legal-a1.sme', processInstanceId);
}); });
}); });
}); });

View File

@ -42,13 +42,11 @@ Cypress.Commands.add('navigateToAdmin', () => {
}); });
Cypress.Commands.add('login', (username, password) => { Cypress.Commands.add('login', (username, password) => {
// Cypress.Commands.add('login', (selector, ...args) => {
cy.visit('/admin'); cy.visit('/admin');
console.log('username', username);
if (!username) { if (!username) {
const username = username = Cypress.env('SPIFFWORKFLOW_FRONTEND_USERNAME') || 'ciadmin1';
Cypress.env('SPIFFWORKFLOW_FRONTEND_USERNAME') || 'ciadmin1'; password = Cypress.env('SPIFFWORKFLOW_FRONTEND_PASSWORD') || 'ciadmin1';
const password =
Cypress.env('SPIFFWORKFLOW_FRONTEND_PASSWORD') || 'ciadmin1';
} }
cy.get('#username').type(username); cy.get('#username').type(username);
cy.get('#password').type(password); cy.get('#password').type(password);
@ -111,7 +109,7 @@ Cypress.Commands.add(
if (expectAutoRedirectToHumanTask) { if (expectAutoRedirectToHumanTask) {
// the url changes immediately, so also make sure we get some content from the next page, "Task:", or else when we try to interact with the page, it'll re-render and we'll get an error with cypress. // the url changes immediately, so also make sure we get some content from the next page, "Task:", or else when we try to interact with the page, it'll re-render and we'll get an error with cypress.
cy.url().should('include', `/tasks/`); cy.url().should('include', `/tasks/`);
cy.contains('Task: '); cy.contains('Task: ', { timeout: 10000 });
} else { } else {
cy.contains(/Process Instance.*[kK]icked [oO]ff/); cy.contains(/Process Instance.*[kK]icked [oO]ff/);
cy.reload(true); cy.reload(true);