we can save the top level spec to the database with its tasks w/ burnettk

This commit is contained in:
jasquat 2023-03-02 17:28:31 -05:00
parent cd746e78df
commit e28d212e38
No known key found for this signature in database
7 changed files with 220 additions and 1 deletions

View File

@@ -0,0 +1,44 @@
"""empty message
Revision ID: 2fe2830f45e1
Revises: 317dd5155137
Create Date: 2023-03-02 17:19:08.535027
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '2fe2830f45e1'  # this migration
down_revision = '317dd5155137'  # migration that must be applied first
branch_labels = None
depends_on = None
def upgrade():
    """Create json_data and the bpmn_process_definition parent/child link table."""
    # ### commands auto generated by Alembic - please adjust! ###
    # json_data: content-addressed JSON blobs; rows are looked up by the
    # unique sha256 `hash`, so identical payloads are stored only once.
    op.create_table('json_data',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('hash', sa.String(length=255), nullable=False),
    sa.Column('data', sa.JSON(), nullable=False),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_json_data_hash'), 'json_data', ['hash'], unique=True)
    # self-referential many-to-many between process definitions: both FKs
    # point at bpmn_process_definition (created in revision 317dd5155137).
    op.create_table('bpmn_process_definition_relationship',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('bpmn_process_definition_parent_id', sa.Integer(), nullable=False),
    sa.Column('bpmn_process_definition_child_id', sa.Integer(), nullable=False),
    sa.ForeignKeyConstraint(['bpmn_process_definition_child_id'], ['bpmn_process_definition.id'], ),
    sa.ForeignKeyConstraint(['bpmn_process_definition_parent_id'], ['bpmn_process_definition.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    # ### end Alembic commands ###
def downgrade():
    """Drop the tables created by upgrade(), indexes first, in reverse order."""
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table('bpmn_process_definition_relationship')
    op.drop_index(op.f('ix_json_data_hash'), table_name='json_data')
    op.drop_table('json_data')
    # ### end Alembic commands ###

View File

@@ -0,0 +1,85 @@
"""empty message
Revision ID: 317dd5155137
Revises: 8930711a75a4
Create Date: 2023-03-02 17:16:15.687837
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '317dd5155137'  # this migration
down_revision = '8930711a75a4'  # migration that must be applied first
branch_labels = None
depends_on = None
def upgrade():
    """Create bpmn_process, bpmn_process_definition, task_definition and task.

    Order matters: the two *_definition tables and bpmn_process must exist
    before task, which has foreign keys into both.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # bpmn_process: a running (sub)process instance; self-referential FK
    # links a subprocess row to its parent process row.
    op.create_table('bpmn_process',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('parent_process_id', sa.Integer(), nullable=True),
    sa.Column('properties_json', sa.JSON(), nullable=False),
    sa.Column('json_data_hash', sa.String(length=255), nullable=False),
    sa.Column('process_type', sa.String(length=30), nullable=False),
    sa.ForeignKeyConstraint(['parent_process_id'], ['bpmn_process.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_bpmn_process_json_data_hash'), 'bpmn_process', ['json_data_hash'], unique=False)
    # bpmn_process_definition: one row per distinct process spec, deduplicated
    # by the unique `hash` column.
    op.create_table('bpmn_process_definition',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('hash', sa.String(length=255), nullable=False),
    sa.Column('bpmn_identifier', sa.String(length=255), nullable=False),
    sa.Column('properties_json', sa.JSON(), nullable=False),
    sa.Column('type', sa.String(length=32), nullable=False),
    sa.Column('bpmn_version_control_type', sa.String(length=50), nullable=True),
    sa.Column('bpmn_version_control_identifier', sa.String(length=255), nullable=True),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_bpmn_process_definition_bpmn_identifier'), 'bpmn_process_definition', ['bpmn_identifier'], unique=False)
    op.create_index(op.f('ix_bpmn_process_definition_hash'), 'bpmn_process_definition', ['hash'], unique=True)
    op.create_index(op.f('ix_bpmn_process_definition_type'), 'bpmn_process_definition', ['type'], unique=False)
    # task_definition: one row per task spec within a process definition;
    # (definition id, bpmn identifier) pairs are unique.
    op.create_table('task_definition',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('bpmn_process_definition_id', sa.Integer(), nullable=False),
    sa.Column('bpmn_identifier', sa.String(length=255), nullable=False),
    sa.Column('properties_json', sa.JSON(), nullable=False),
    sa.Column('typename', sa.String(length=255), nullable=False),
    sa.ForeignKeyConstraint(['bpmn_process_definition_id'], ['bpmn_process_definition.id'], ),
    sa.PrimaryKeyConstraint('id'),
    sa.UniqueConstraint('bpmn_process_definition_id', 'bpmn_identifier', name='task_definition_unique')
    )
    op.create_index(op.f('ix_task_definition_bpmn_identifier'), 'task_definition', ['bpmn_identifier'], unique=False)
    # task: a runtime task instance, identified by a unique 36-char guid,
    # tying a task_definition to the bpmn_process it runs in.
    op.create_table('task',
    sa.Column('id', sa.Integer(), nullable=False),
    sa.Column('guid', sa.String(length=36), nullable=False),
    sa.Column('bpmn_process_id', sa.Integer(), nullable=False),
    sa.Column('task_definition_id', sa.Integer(), nullable=False),
    sa.Column('state', sa.String(length=10), nullable=False),
    sa.Column('properties_json', sa.JSON(), nullable=False),
    sa.Column('json_data_hash', sa.String(length=255), nullable=False),
    sa.ForeignKeyConstraint(['bpmn_process_id'], ['bpmn_process.id'], ),
    sa.ForeignKeyConstraint(['task_definition_id'], ['task_definition.id'], ),
    sa.PrimaryKeyConstraint('id')
    )
    op.create_index(op.f('ix_task_guid'), 'task', ['guid'], unique=True)
    op.create_index(op.f('ix_task_json_data_hash'), 'task', ['json_data_hash'], unique=False)
    # ### end Alembic commands ###
def downgrade():
    """Drop the tables created by upgrade() in reverse dependency order.

    task is dropped first (it references the other tables); indexes on each
    table are dropped before the table itself.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_index(op.f('ix_task_json_data_hash'), table_name='task')
    op.drop_index(op.f('ix_task_guid'), table_name='task')
    op.drop_table('task')
    op.drop_index(op.f('ix_task_definition_bpmn_identifier'), table_name='task_definition')
    op.drop_table('task_definition')
    op.drop_index(op.f('ix_bpmn_process_definition_type'), table_name='bpmn_process_definition')
    op.drop_index(op.f('ix_bpmn_process_definition_hash'), table_name='bpmn_process_definition')
    op.drop_index(op.f('ix_bpmn_process_definition_bpmn_identifier'), table_name='bpmn_process_definition')
    op.drop_table('bpmn_process_definition')
    op.drop_index(op.f('ix_bpmn_process_json_data_hash'), table_name='bpmn_process')
    op.drop_table('bpmn_process')
    # ### end Alembic commands ###

View File

@@ -57,5 +57,11 @@ from spiffworkflow_backend.models.process_instance_data import (
ProcessInstanceDataModel,
) # noqa: F401
from spiffworkflow_backend.models.bpmn_process import BpmnProcessModel # noqa: F401
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel # noqa: F401
from spiffworkflow_backend.models.task import TaskModel # noqa: F401
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel # noqa: F401
from spiffworkflow_backend.models.json_data import JsonDataModel # noqa: F401
from spiffworkflow_backend.models.bpmn_process_definition_relationship import BpmnProcessDefinitionRelationshipModel # noqa: F401
add_listeners()

View File

@@ -16,7 +16,7 @@ class BpmnProcessModel(SpiffworkflowBaseDBModel):
__tablename__ = "bpmn_process"
id: int = db.Column(db.Integer, primary_key=True)
parent_process_id: int = db.Column(ForeignKey("BpmnProcessModel.id"), nullable=True)
parent_process_id: int = db.Column(ForeignKey("bpmn_process.id"), nullable=True)
properties_json: dict = db.Column(db.JSON, nullable=False)
json_data_hash: str = db.Column(db.String(255), nullable=False, index=True)

View File

@@ -16,6 +16,7 @@ class BpmnProcessDefinitionModel(SpiffworkflowBaseDBModel):
# this is a sha256 hash of spec and serializer_version
hash: str = db.Column(db.String(255), nullable=False, index=True, unique=True)
bpmn_identifier: str = db.Column(db.String(255), nullable=False, index=True)
properties_json: str = db.Column(db.JSON, nullable=False)

View File

@@ -1,4 +1,5 @@
from __future__ import annotations
from sqlalchemy.orm import relationship
from sqlalchemy import UniqueConstraint
from sqlalchemy import ForeignKey
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
@@ -17,6 +18,7 @@ class TaskDefinitionModel(SpiffworkflowBaseDBModel):
id: int = db.Column(db.Integer, primary_key=True)
bpmn_process_definition_id: int = db.Column(ForeignKey(BpmnProcessDefinitionModel.id), nullable=False) # type: ignore
bpmn_process_definition = relationship(BpmnProcessDefinitionModel)
bpmn_identifier: str = db.Column(db.String(255), nullable=False, index=True)
properties_json: dict = db.Column(db.JSON, nullable=False)

View File

@@ -55,6 +55,7 @@ from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from sqlalchemy import text
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.bpmn_process_definition import BpmnProcessDefinitionModel
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.file import File
from spiffworkflow_backend.models.file import FileType
@@ -80,6 +81,7 @@ from spiffworkflow_backend.models.serialized_bpmn_definition import (
) # noqa: F401
from spiffworkflow_backend.models.spec_reference import SpecReferenceCache
from spiffworkflow_backend.models.spiff_step_details import SpiffStepDetailsModel
from spiffworkflow_backend.models.task_definition import TaskDefinitionModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.scripts.script import Script
from spiffworkflow_backend.services.custom_parser import MyCustomParser
@@ -906,9 +908,88 @@ class ProcessInstanceProcessor:
db.session.add(process_instance_data)
self.process_instance_model.process_instance_data = process_instance_data
def _store_bpmn_process_definitions(self, process_bpmn_properties: dict) -> None:
    """Persist a top-level process spec and its task specs to the db session.

    Deduplicates by a sha256 hash of the fully serialized spec: if a
    BpmnProcessDefinitionModel row with that hash already exists, nothing is
    added. Expects the caller (save) to commit the session.

    :param process_bpmn_properties: serialized workflow spec dict; must
        contain a 'name' key and a 'task_specs' mapping.
    """
    process_bpmn_identifier = process_bpmn_properties["name"]
    # Hash the complete spec (task_specs included, keys sorted for
    # determinism) so any change to the spec yields a new definition row.
    new_hash_digest = sha256(
        json.dumps(process_bpmn_properties, sort_keys=True).encode("utf8")
    ).hexdigest()
    bpmn_process_definition = BpmnProcessDefinitionModel.query.filter_by(
        hash=new_hash_digest
    ).first()
    if bpmn_process_definition is None:
        # task_specs live in their own table; strip them from the
        # definition row's properties_json. (Pop AFTER hashing so the
        # hash still covers the task specs.)
        task_specs = process_bpmn_properties.pop("task_specs")
        bpmn_process_definition = BpmnProcessDefinitionModel(
            hash=new_hash_digest,
            bpmn_identifier=process_bpmn_identifier,
            properties_json=json.dumps(process_bpmn_properties),
            type="process",
        )
        db.session.add(bpmn_process_definition)

        for task_bpmn_identifier, task_bpmn_properties in task_specs.items():
            task_definition = TaskDefinitionModel(
                bpmn_process_definition=bpmn_process_definition,
                bpmn_identifier=task_bpmn_identifier,
                properties_json=json.dumps(task_bpmn_properties),
                typename=task_bpmn_properties["typename"],
            )
            db.session.add(task_definition)
def _add_bpmn_json_records_new(self) -> None:
    """Split the serialized workflow into spec and runtime parts; store the spec.

    The serialized bpmn dict is partitioned into the static spec keys
    ('spec', 'subprocess_specs', 'serializer_version') and everything else
    (runtime process-instance data). The top-level spec is handed to
    _store_bpmn_process_definitions; expects save() to commit the session.
    """
    bpmn_dict = json.loads(self.serialize())
    bpmn_dict_keys = ("spec", "subprocess_specs", "serializer_version")
    bpmn_spec_dict = {}
    # Runtime-state portion of the serialization; not persisted here yet.
    # TODO(review): wire this up to process_instance_data storage.
    process_instance_data_dict = {}
    for bpmn_key, bpmn_value in bpmn_dict.items():
        if bpmn_key in bpmn_dict_keys:
            bpmn_spec_dict[bpmn_key] = bpmn_value
        else:
            process_instance_data_dict[bpmn_key] = bpmn_value

    # Only the top-level 'spec' is stored for now; subprocess_specs and
    # serializer_version handling is still to come.
    self._store_bpmn_process_definitions(bpmn_spec_dict["spec"])
def save(self) -> None:
"""Saves the current state of this processor to the database."""
self._add_bpmn_json_records()
self._add_bpmn_json_records_new()
complete_states = [TaskState.CANCELLED, TaskState.COMPLETED]
user_tasks = list(self.get_all_user_tasks())