Add initial logic to use the new BPMN JSON tables (paired with burnettk)

This commit is contained in:
jasquat 2023-02-28 17:46:14 -05:00
parent d5acd5a16d
commit f74ce0f568
8 changed files with 144 additions and 23 deletions

View File

@ -0,0 +1,42 @@
"""empty message
Revision ID: dc8f4b664e78
Revises: fdf522e4c48a
Create Date: 2023-02-28 17:38:42.496187
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'dc8f4b664e78'
down_revision = 'fdf522e4c48a'
branch_labels = None
depends_on = None
def upgrade():
    """Create process_instance_data and link process_instance to serialized_bpmn_definition."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        'process_instance_data',
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('process_instance_id', sa.Integer(), nullable=False),
        sa.Column('runtime_json', sa.JSON(), nullable=True),
        sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], ),
        sa.PrimaryKeyConstraint('id')
    )
    op.add_column('process_instance', sa.Column('serialized_bpmn_definition_id', sa.Integer(), nullable=True))
    # Name the constraint explicitly: with None the database generates a name,
    # and the paired op.drop_constraint in downgrade() then has no portable way
    # to find it (alembic's drop_constraint requires a name).
    op.create_foreign_key(
        'fk_process_instance_serialized_bpmn_definition',
        'process_instance',
        'serialized_bpmn_definition',
        ['serialized_bpmn_definition_id'],
        ['id'],
    )
    op.create_index(op.f('ix_serialized_bpmn_definition_hash'), 'serialized_bpmn_definition', ['hash'], unique=False)
    op.add_column('spiff_step_details', sa.Column('delta_json', sa.JSON(), nullable=False))
    # ### end Alembic commands ###
def downgrade():
    """Revert upgrade(): drop the added column, index, foreign key, and table."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column('spiff_step_details', 'delta_json')
    op.drop_index(op.f('ix_serialized_bpmn_definition_hash'), table_name='serialized_bpmn_definition')
    # op.drop_constraint(None, ...) raises ValueError because a constraint name
    # is required -- drop the explicitly named foreign key instead.
    op.drop_constraint('fk_process_instance_serialized_bpmn_definition', 'process_instance', type_='foreignkey')
    op.drop_column('process_instance', 'serialized_bpmn_definition_id')
    op.drop_table('process_instance_data')
    # ### end Alembic commands ###

View File

@ -0,0 +1,32 @@
"""empty message
Revision ID: e494a3955ce5
Revises: dc8f4b664e78
Create Date: 2023-02-28 17:41:51.208350
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import mysql
# revision identifiers, used by Alembic.
revision = 'e494a3955ce5'
down_revision = 'dc8f4b664e78'
branch_labels = None
depends_on = None
def upgrade():
    """Relax spiff_step_details.delta_json so NULL values are allowed."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column(
        'spiff_step_details',
        'delta_json',
        existing_type=mysql.JSON(),
        nullable=True,
    )
    # ### end Alembic commands ###
def downgrade():
    """Restore the NOT NULL constraint on spiff_step_details.delta_json."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column(
        'spiff_step_details',
        'delta_json',
        existing_type=mysql.JSON(),
        nullable=False,
    )
    # ### end Alembic commands ###

View File

@ -51,6 +51,8 @@ from spiffworkflow_backend.models.process_instance_metadata import (
ProcessInstanceMetadataModel,
) # noqa: F401
from spiffworkflow_backend.models.serialized_bpmn_definition import SerializedBpmnDefinitionModel # noqa: F401
# NOTE: this model name was previously misspelled as ProcessesInstanceData
from spiffworkflow_backend.models.process_instance_data import ProcessInstanceDataModel # noqa: F401

View File

@ -1,5 +1,7 @@
"""Process_instance."""
from __future__ import annotations
from typing import Optional
from spiffworkflow_backend.models.serialized_bpmn_definition import SerializedBpmnDefinitionModel # noqa: F401
from typing import Any
from typing import cast
@ -60,6 +62,9 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
process_initiator_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore
process_initiator = relationship("UserModel")
serialized_bpmn_definition_id: Optional[int] = db.Column(ForeignKey(SerializedBpmnDefinitionModel.id), nullable=True) # type: ignore
serialized_bpmn_definition = relationship("SerializedBpmnDefinitionModel")
active_human_tasks = relationship(
"HumanTaskModel",
primaryjoin=(
@ -79,21 +84,6 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
cascade="delete",
) # type: ignore
# static
# "subprocess_specs",
# "spec",
# "serializer_version",
# runtime
# "bpmn_messages",
# "correlations",
# "data",
# "last_task",
# "root",
# "subprocesses",
# "success",
# "tasks"
bpmn_json: str | None = deferred(db.Column(db.JSON)) # type: ignore
start_in_seconds: int | None = db.Column(db.Integer)
end_in_seconds: int | None = db.Column(db.Integer)

View File

@ -0,0 +1,29 @@
"""Process_instance."""
from __future__ import annotations
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from sqlalchemy.orm import deferred
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
# the last three here should maybe become columns on process instance someday
# runtime_json
# "bpmn_messages",
# "correlations",
# "data",
# "subprocesses",
# "tasks",
# "last_task", # guid generated by spiff
# "root", # guid generated by spiff
# "success", # boolean
class ProcessInstanceDataModel(SpiffworkflowBaseDBModel):
    """Per-instance (runtime) portion of a serialized workflow.

    The static, shareable portion lives in serialized_bpmn_definition; this
    table holds the JSON that changes as a single process instance executes.
    """

    __tablename__ = "process_instance_data"

    id: int = db.Column(db.Integer, primary_key=True)
    # One row per process instance.
    process_instance_id: int = db.Column(
        ForeignKey(ProcessInstanceModel.id), nullable=False  # type: ignore
    )
    # Deferred so the potentially large JSON blob is only loaded on access.
    runtime_json: str | None = deferred(db.Column(db.JSON))  # type: ignore

View File

@ -20,12 +20,12 @@ from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel
# "data",
# "subprocesses",
# "tasks"
#
# new columns on process_instance
# "last_task", # guid generated by spiff
# "root", # guid generated by spiff
# "success", # boolean
#
# new columns on process_instance
#
# delta algorithm:
# a = {"hey": { "hey2": 2, "hey3": 3, "hey6": 7 }, "hey30": 3, "hey40": 4}
# b = {"hey": { "hey2": 4, "hey5": 3 }, "hey20": 2, "hey30": 3}
@ -46,5 +46,5 @@ class SerializedBpmnDefinitionModel(SpiffworkflowBaseDBModel):
__tablename__ = "serialized_bpmn_definition"
id: int = db.Column(db.Integer, primary_key=True)
hash: str = db.Column(db.String(255), nullable=False)
hash: str = db.Column(db.String(255), nullable=False, index=True)
static_json: str | None = deferred(db.Column(db.JSON)) # type: ignore

View File

@ -31,7 +31,7 @@ class SpiffStepDetailsModel(SpiffworkflowBaseDBModel):
task_id: str = db.Column(db.String(50), nullable=False)
task_state: str = db.Column(db.String(50), nullable=False)
bpmn_task_identifier: str = db.Column(db.String(255), nullable=False)
delta_json: list = deferred(db.Column(db.JSON, nullable=False)) # type: ignore
delta_json: list = deferred(db.Column(db.JSON)) # type: ignore
start_in_seconds: float = db.Column(db.DECIMAL(17, 6), nullable=False)

View File

@ -1,4 +1,8 @@
"""Process_instance_processor."""
from hashlib import sha256
from spiffworkflow_backend.models import serialized_bpmn_definition
from spiffworkflow_backend.models.process_instance_data import ProcessInstanceDataModel
from spiffworkflow_backend.models.serialized_bpmn_definition import SerializedBpmnDefinitionModel # noqa: F401
import _strptime # type: ignore
import decimal
import json
@ -438,7 +442,7 @@ class ProcessInstanceProcessor:
self.process_model_service = ProcessModelService()
bpmn_process_spec = None
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None
if process_instance_model.bpmn_json is None:
if process_instance_model.serialized_bpmn_definition_id is None:
(
bpmn_process_spec,
subprocesses,
@ -515,6 +519,18 @@ class ProcessInstanceProcessor:
self.bpmn_process_instance
)
@classmethod
def get_full_bpmn_json(cls, process_instance_model: ProcessInstanceModel) -> Optional[dict]:
    """Merge the shared static serialization with this instance's runtime data.

    Returns None when the instance has no serialized definition yet.
    """
    if process_instance_model.serialized_bpmn_definition_id is None:
        return None
    serialized_bpmn_definition = process_instance_model.serialized_bpmn_definition
    process_instance_data = ProcessInstanceDataModel.query.filter_by(
        process_instance_id=process_instance_model.id
    ).first()
    full_bpmn_json = json.loads(serialized_bpmn_definition.static_json)
    # BUG FIX: dict.update() mutates in place and returns None, so the
    # previous `return a.update(b)` always returned None. Merge first, then
    # return the merged dict. Also guard against missing runtime data.
    if process_instance_data is not None:
        full_bpmn_json.update(json.loads(process_instance_data.runtime_json))
    return full_bpmn_json
def current_user(self) -> Any:
"""Current_user."""
current_user = None
@ -550,7 +566,7 @@ class ProcessInstanceProcessor:
subprocesses: Optional[IdToBpmnProcessSpecMapping] = None,
) -> BpmnWorkflow:
"""__get_bpmn_process_instance."""
if process_instance_model.bpmn_json:
if process_instance_model.serialized_bpmn_definition_id:
# turn off logging to avoid duplicated spiff logs
spiff_logger = logging.getLogger("spiff")
original_spiff_logger_log_level = spiff_logger.level
@ -559,7 +575,7 @@ class ProcessInstanceProcessor:
try:
bpmn_process_instance = (
ProcessInstanceProcessor._serializer.deserialize_json(
process_instance_model.bpmn_json
json.dumps(ProcessInstanceProcessor.get_full_bpmn_json(process_instance_model))
)
)
except Exception as err:
@ -834,7 +850,17 @@ class ProcessInstanceProcessor:
def save(self) -> None:
"""Saves the current state of this processor to the database."""
self.process_instance_model.bpmn_json = self.serialize()
# self.process_instance_model.bpmn_json = self.serialize()
bpmn_json = self.serialize()
if self.process_instance_model.serialized_bpmn_definition_id is None:
new_hash = {k: bpmn_json[k] for k in ('spec', 'subprocess_spec', 'serializer_version')}
new_hash_digest = sha256(json.dumps(new_hash, sort_keys=True).encode('utf8')).hexdigest()
serialized_bpmn_definition = SerializedBpmnDefinitionModel(hash=new_hash_digest).first()
if serialized_bpmn_definition is None:
serialized_bpmn_definition = SerializedBpmnDefinitionModel(hash=new_hash_digest, static_json=json.dumps(new_hash))
db.session.add(serialized_bpmn_definition)
complete_states = [TaskState.CANCELLED, TaskState.COMPLETED]
user_tasks = list(self.get_all_user_tasks())