Fixing a bug in SpiffWorkflow (new version in poetry.lock)

Don't explode when back-filling process models and hitting and error
Assure processes are executable when setting them as the default primary process.
The SpecReferenceCache now uses a unique constraint across two fields. (requires a new db)
This commit is contained in:
Dan 2022-11-15 14:50:41 -05:00
parent 7444e0a62c
commit de198940dd
6 changed files with 79 additions and 101 deletions

View File

@ -1,8 +1,8 @@
"""empty message """empty message
Revision ID: 7cc9bdcc309f Revision ID: b7790c9c8174
Revises: Revises:
Create Date: 2022-11-15 09:53:53.349712 Create Date: 2022-11-15 14:11:47.309399
""" """
from alembic import op from alembic import op
@ -10,7 +10,7 @@ import sqlalchemy as sa
# revision identifiers, used by Alembic. # revision identifiers, used by Alembic.
revision = '7cc9bdcc309f' revision = 'b7790c9c8174'
down_revision = None down_revision = None
branch_labels = None branch_labels = None
depends_on = None depends_on = None
@ -49,10 +49,11 @@ def upgrade():
sa.Column('has_lanes', sa.Boolean(), nullable=True), sa.Column('has_lanes', sa.Boolean(), nullable=True),
sa.Column('is_executable', sa.Boolean(), nullable=True), sa.Column('is_executable', sa.Boolean(), nullable=True),
sa.Column('is_primary', sa.Boolean(), nullable=True), sa.Column('is_primary', sa.Boolean(), nullable=True),
sa.PrimaryKeyConstraint('id') sa.PrimaryKeyConstraint('id'),
sa.UniqueConstraint('identifier', 'type', name='_identifier_type_unique')
) )
op.create_index(op.f('ix_spec_reference_cache_display_name'), 'spec_reference_cache', ['display_name'], unique=False) op.create_index(op.f('ix_spec_reference_cache_display_name'), 'spec_reference_cache', ['display_name'], unique=False)
op.create_index(op.f('ix_spec_reference_cache_identifier'), 'spec_reference_cache', ['identifier'], unique=True) op.create_index(op.f('ix_spec_reference_cache_identifier'), 'spec_reference_cache', ['identifier'], unique=False)
op.create_index(op.f('ix_spec_reference_cache_type'), 'spec_reference_cache', ['type'], unique=False) op.create_index(op.f('ix_spec_reference_cache_type'), 'spec_reference_cache', ['type'], unique=False)
op.create_table('spiff_logging', op.create_table('spiff_logging',
sa.Column('id', sa.Integer(), nullable=False), sa.Column('id', sa.Integer(), nullable=False),

View File

@ -643,7 +643,7 @@ werkzeug = "*"
type = "git" type = "git"
url = "https://github.com/sartography/flask-bpmn" url = "https://github.com/sartography/flask-bpmn"
reference = "main" reference = "main"
resolved_reference = "886bfdc31aade43e9683439e6d29b06acb235081" resolved_reference = "6f6762ec83bb6eec24f7cc799d4d5fa7867c7474"
[[package]] [[package]]
name = "Flask-Cors" name = "Flask-Cors"
@ -1876,7 +1876,7 @@ lxml = "*"
type = "git" type = "git"
url = "https://github.com/sartography/SpiffWorkflow" url = "https://github.com/sartography/SpiffWorkflow"
reference = "main" reference = "main"
resolved_reference = "14d3d8c3f69af880eaf994be1689ee9fcc72e829" resolved_reference = "025bc30f27366e06dd1286b7563e4b1cb04c1c46"
[[package]] [[package]]
name = "SQLAlchemy" name = "SQLAlchemy"

View File

@ -5,6 +5,7 @@ from flask_bpmn.models.db import db
from flask_bpmn.models.db import SpiffworkflowBaseDBModel from flask_bpmn.models.db import SpiffworkflowBaseDBModel
from flask_marshmallow import Schema # type: ignore from flask_marshmallow import Schema # type: ignore
from marshmallow import INCLUDE from marshmallow import INCLUDE
from sqlalchemy import UniqueConstraint
@dataclass() @dataclass()
@ -35,9 +36,10 @@ class SpecReferenceCache(SpiffworkflowBaseDBModel):
"""A cache of information about all the Processes and Decisions defined in all files.""" """A cache of information about all the Processes and Decisions defined in all files."""
__tablename__ = "spec_reference_cache" __tablename__ = "spec_reference_cache"
__table_args__ = (UniqueConstraint('identifier', 'type', name='_identifier_type_unique'),
)
id = db.Column(db.Integer, primary_key=True) id = db.Column(db.Integer, primary_key=True)
identifier = db.Column(db.String(255), unique=True, index=True) identifier = db.Column(db.String(255), index=True)
display_name = db.Column(db.String(255), index=True) display_name = db.Column(db.String(255), index=True)
process_model_id = db.Column(db.String(255)) process_model_id = db.Column(db.String(255))
type = db.Column(db.String(255), index=True) # either 'process' or 'decision' type = db.Column(db.String(255), index=True) # either 'process' or 'decision'

View File

@ -1,10 +1,10 @@
"""Data_setup_service.""" """Data_setup_service."""
from flask import current_app from flask import current_app
from flask_bpmn.models.db import db
from spiffworkflow_backend.services.process_model_service import ProcessModelService from spiffworkflow_backend.services.process_model_service import ProcessModelService
from spiffworkflow_backend.services.spec_file_service import SpecFileService from spiffworkflow_backend.services.spec_file_service import SpecFileService
class DataSetupService: class DataSetupService:
"""DataSetupService.""" """DataSetupService."""
@ -15,81 +15,41 @@ class DataSetupService:
@classmethod @classmethod
def save_all_process_models(cls) -> list: def save_all_process_models(cls) -> list:
"""Save_all.""" """Build a cache of all processes, messages, correlation keys, and start events that
exist within processes located on the file system, so we can quickly reference them
from the database. """
# Clear out all of the cached data.
SpecFileService.clear_caches()
current_app.logger.debug("DataSetupService.save_all_process_models() start") current_app.logger.debug("DataSetupService.save_all_process_models() start")
failing_process_models = [] failing_process_models = []
process_models = ProcessModelService().get_process_models() process_models = ProcessModelService().get_process_models()
for process_model in process_models: for process_model in process_models:
process_model_files = SpecFileService.get_files(
process_model, extension_filter=".bpmn"
)
for process_model_file in process_model_files:
bpmn_xml_file_contents = SpecFileService.get_data(
process_model, process_model_file.name
)
bad_files = [
"B.1.0.bpmn",
"C.1.0.bpmn",
"C.2.0.bpmn",
"C.6.0.bpmn",
"TC-5.1.bpmn",
]
if process_model_file.name in bad_files:
continue
current_app.logger.debug( current_app.logger.debug(
f"primary_file_name: {process_model_file.name}" f"Process Model: {process_model.display_name}"
) )
try: try:
SpecFileService.update_file( refs = SpecFileService.get_references_for_process(process_model)
process_model, for ref in refs:
process_model_file.name, try:
bpmn_xml_file_contents, SpecFileService.update_caches(ref)
)
except Exception as ex: except Exception as ex:
failing_process_models.append( failing_process_models.append(
( (
f"{process_model.process_group}/{process_model.id}/{process_model_file.name}", f"{ref.process_model_id}/{ref.file_name}",
str(ex), str(ex),
) )
) )
# files = SpecFileService.get_files( except Exception as ex2:
# process_model, extension_filter="bpmn"
# )
# bpmn_etree_element: EtreeElement = (
# SpecFileService.get_etree_element_from_binary_data(
# bpmn_xml_file_contents, process_model.primary_file_name
# )
# )
# if len(files) == 1:
# try:
# new_bpmn_process_identifier = (
# SpecFileService.get_bpmn_process_identifier(
# bpmn_etree_element
# )
# )
# if (
# process_model.primary_process_id
# != new_bpmn_process_identifier
# ):
# print(
# "primary_process_id: ", process_model.primary_process_id
# )
# # attributes_to_update = {
# # "primary_process_id": new_bpmn_process_identifier
# # }
# # ProcessModelService().update_spec(
# # process_model, attributes_to_update
# # )
# # except Exception as exception:
# except Exception:
# print(f"BAD ONE: {process_model.id}")
# # raise exception
else:
failing_process_models.append( failing_process_models.append(
( (
f"{process_model.process_group}/{process_model.id}", f"{process_model.id}",
"primary_file_name not set", str(ex2),
) )
) )
current_app.logger.debug("DataSetupService.save_all_process_models() end") current_app.logger.debug("DataSetupService.save_all_process_models() end")
db.session.commit()
return failing_process_models return failing_process_models

View File

@ -677,6 +677,7 @@ class ProcessInstanceProcessor:
"""Backfill_missing_spec_reference_records.""" """Backfill_missing_spec_reference_records."""
process_models = ProcessModelService().get_process_models() process_models = ProcessModelService().get_process_models()
for process_model in process_models: for process_model in process_models:
try:
refs = SpecFileService.reference_map( refs = SpecFileService.reference_map(
SpecFileService.get_references_for_process(process_model) SpecFileService.get_references_for_process(process_model)
) )
@ -684,6 +685,8 @@ class ProcessInstanceProcessor:
if bpmn_process_identifier in bpmn_process_identifiers: if bpmn_process_identifier in bpmn_process_identifiers:
SpecFileService.update_process_cache(refs[bpmn_process_identifier]) SpecFileService.update_process_cache(refs[bpmn_process_identifier])
return FileSystemService.full_path_to_process_model_file(process_model) return FileSystemService.full_path_to_process_model_file(process_model)
except Exception as e:
current_app.logger.warning('Failed to parse process ', process_model.id)
return None return None
@staticmethod @staticmethod

View File

@ -113,7 +113,7 @@ class SpecFileService(FileSystemService):
for sub_parser in sub_parsers: for sub_parser in sub_parsers:
if parser_type == "process": if parser_type == "process":
has_lanes = sub_parser.has_lanes() has_lanes = sub_parser.has_lanes()
sub_parser.process_executable is_executable = sub_parser.process_executable
start_messages = sub_parser.start_messages() start_messages = sub_parser.start_messages()
is_primary = ( is_primary = (
sub_parser.get_id() == process_model_info.primary_process_id sub_parser.get_id() == process_model_info.primary_process_id
@ -156,11 +156,12 @@ class SpecFileService(FileSystemService):
file = SpecFileService.to_file_object(file_name, full_file_path) file = SpecFileService.to_file_object(file_name, full_file_path)
references = SpecFileService.get_references_for_file(file, process_model_info) references = SpecFileService.get_references_for_file(file, process_model_info)
primary_process_ref = next((ref for ref in references if ref.is_primary), None) primary_process_ref = next((ref for ref in references if ref.is_primary and ref.is_executable), None)
for ref in references: for ref in references:
# If no valid primary process is defined, default to the first process in the # If no valid primary process is defined, default to the first process in the
# updated file. # updated file.
if not primary_process_ref and ref.type == "process": if not primary_process_ref and ref.type == "process" and ref.is_executable:
ref.is_primary = True ref.is_primary = True
if ref.is_primary: if ref.is_primary:
@ -172,10 +173,7 @@ class SpecFileService(FileSystemService):
"is_review": ref.has_lanes, "is_review": ref.has_lanes,
}, },
) )
SpecFileService.update_process_cache(ref) SpecFileService.update_caches(ref)
SpecFileService.update_message_cache(ref)
SpecFileService.update_message_trigger_cache(ref, process_model_info)
SpecFileService.update_correlation_cache(ref)
return file return file
@staticmethod @staticmethod
@ -227,12 +225,27 @@ class SpecFileService(FileSystemService):
# fixme: Place all the caching stuff in a different service. # fixme: Place all the caching stuff in a different service.
@staticmethod
def update_caches(ref):
SpecFileService.update_process_cache(ref)
SpecFileService.update_message_cache(ref)
SpecFileService.update_message_trigger_cache(ref)
SpecFileService.update_correlation_cache(ref)
@staticmethod
def clear_caches():
db.session.query(SpecReferenceCache).delete()
db.session.query(MessageCorrelationPropertyModel).delete()
db.session.query(MessageTriggerableProcessModel).delete()
db.session.query(MessageModel).delete()
@staticmethod @staticmethod
def update_process_cache(ref: SpecReference) -> None: def update_process_cache(ref: SpecReference) -> None:
"""Update_process_cache.""" """Update_process_cache."""
process_id_lookup = SpecReferenceCache.query.filter_by( process_id_lookup = SpecReferenceCache.query.\
identifier=ref.identifier filter_by(identifier=ref.identifier).\
).first() filter_by(type=ref.type).\
first()
if process_id_lookup is None: if process_id_lookup is None:
process_id_lookup = SpecReferenceCache.from_spec_reference(ref) process_id_lookup = SpecReferenceCache.from_spec_reference(ref)
db.session.add(process_id_lookup) db.session.add(process_id_lookup)
@ -269,9 +282,7 @@ class SpecFileService(FileSystemService):
db.session.commit() db.session.commit()
@staticmethod @staticmethod
def update_message_trigger_cache( def update_message_trigger_cache(ref: SpecReference) -> None:
ref: SpecReference, process_model_info: ProcessModelInfo
) -> None:
"""Assure we know which messages can trigger the start of a process.""" """Assure we know which messages can trigger the start of a process."""
for message_model_identifier in ref.start_messages: for message_model_identifier in ref.start_messages:
message_model = MessageModel.query.filter_by( message_model = MessageModel.query.filter_by(
@ -287,24 +298,25 @@ class SpecFileService(FileSystemService):
message_model_id=message_model.id, message_model_id=message_model.id,
).first() ).first()
) )
if message_triggerable_process_model is None: if message_triggerable_process_model is None:
message_triggerable_process_model = MessageTriggerableProcessModel( message_triggerable_process_model = (
MessageTriggerableProcessModel(
message_model_id=message_model.id, message_model_id=message_model.id,
process_model_identifier=process_model_info.id, process_model_identifier=ref.process_model_id,
process_group_identifier="process_group_identifier", process_group_identifier="process_group_identifier"
)
) )
db.session.add(message_triggerable_process_model) db.session.add(message_triggerable_process_model)
db.session.commit() db.session.commit()
else: else:
if ( if (
message_triggerable_process_model.process_model_identifier message_triggerable_process_model.process_model_identifier
!= process_model_info.id != ref.process_model_id
# or message_triggerable_process_model.process_group_identifier # or message_triggerable_process_model.process_group_identifier
# != process_model_info.process_group_id # != process_model_info.process_group_id
): ):
raise ValidationException( raise ValidationException(
f"Message model is already used to start process model {process_model_info.id}" f"Message model is already used to start process model {ref.process_model_id}"
) )
@staticmethod @staticmethod