From 0030a469380eaf6914594c3544f990bcb9161ae0 Mon Sep 17 00:00:00 2001 From: Dan Date: Thu, 23 Feb 2023 14:15:49 -0500 Subject: [PATCH] run_pyl --- .../migrations/versions/4ab08bf12666_.py | 139 ++++++++++++++++++ .../models/message_instance.py | 36 ++++- .../models/message_instance_correlation.py | 17 ++- .../message_triggerable_process_model.py | 2 - .../routes/messages_controller.py | 33 +++-- .../services/error_handling_service.py | 4 +- .../services/message_service.py | 73 +++++---- .../services/process_instance_processor.py | 33 +++-- .../services/spec_file_service.py | 2 - .../integration/test_process_api.py | 6 +- .../unit/test_message_instance.py | 8 +- .../unit/test_message_service.py | 9 +- 12 files changed, 269 insertions(+), 93 deletions(-) create mode 100644 spiffworkflow-backend/migrations/versions/4ab08bf12666_.py diff --git a/spiffworkflow-backend/migrations/versions/4ab08bf12666_.py b/spiffworkflow-backend/migrations/versions/4ab08bf12666_.py new file mode 100644 index 00000000..d0af4066 --- /dev/null +++ b/spiffworkflow-backend/migrations/versions/4ab08bf12666_.py @@ -0,0 +1,139 @@ +"""empty message + +Revision ID: 4ab08bf12666 +Revises: b581ff2351ad +Create Date: 2023-02-23 07:25:54.135765 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import mysql + +# revision identifiers, used by Alembic. +revision = '4ab08bf12666' +down_revision = 'b581ff2351ad' +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('message_instance_correlation', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('message_instance_id', sa.Integer(), nullable=False), + sa.Column('name', sa.String(length=50), nullable=False), + sa.Column('expected_value', sa.String(length=255), nullable=True), + sa.Column('retrieval_expression', sa.String(length=255), nullable=True), + sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True), + sa.Column('created_at_in_seconds', sa.Integer(), nullable=True), + sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], ), + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('message_instance_id', 'name', name='message_instance_id_name_unique') + ) + op.create_index(op.f('ix_message_instance_correlation_expected_value'), 'message_instance_correlation', ['expected_value'], unique=False) + op.create_index(op.f('ix_message_instance_correlation_message_instance_id'), 'message_instance_correlation', ['message_instance_id'], unique=False) + + op.drop_index('ix_message_correlation_property_identifier', table_name='message_correlation_property') + op.drop_index('message_correlation_property_unique', table_name='message_correlation_property') + op.drop_constraint(u'message_correlation_ibfk_1', 'message_correlation', type_='foreignkey') + op.drop_table('message_correlation_property') + op.drop_index('ix_message_correlation_message_correlation_property_id', table_name='message_correlation') + op.drop_index('ix_message_correlation_name', table_name='message_correlation') + op.drop_index('ix_message_correlation_process_instance_id', table_name='message_correlation') + op.drop_index('ix_message_correlation_value', table_name='message_correlation') + op.drop_constraint(u'message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_='foreignkey') + op.drop_constraint(u'message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', type_='foreignkey') + op.drop_table('message_correlation') + op.drop_index('ix_message_model_identifier', table_name='message_model') + op.drop_index('ix_message_model_name', table_name='message_model') + op.drop_constraint(u'message_instance_ibfk_1', 'message_instance', type_='foreignkey') + op.drop_table('message_model') + op.drop_table('message_correlation_message_instance') + op.add_column('message_instance', sa.Column('name', sa.String(length=255), nullable=True)) + op.add_column('message_instance', sa.Column('user_id', sa.Integer(), nullable=False)) + op.add_column('message_instance', sa.Column('counterpart_id', sa.Integer(), nullable=True)) + op.alter_column('message_instance', 'process_instance_id', + existing_type=mysql.INTEGER(), + nullable=True) + op.create_foreign_key(None, 'message_instance', 'user', ['user_id'], ['id']) + op.drop_column('message_instance', 'message_model_id') + op.add_column('message_triggerable_process_model', sa.Column('message_name', sa.String(length=255), nullable=True)) + op.drop_index('message_model_id', table_name='message_triggerable_process_model') + op.drop_column('message_triggerable_process_model', 'message_model_id') + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.add_column('message_triggerable_process_model', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False)) + op.create_foreign_key('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', 'message_model', ['message_model_id'], ['id']) + op.create_index('message_model_id', 'message_triggerable_process_model', ['message_model_id'], unique=False) + op.drop_column('message_triggerable_process_model', 'name') + op.add_column('message_instance', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False)) + op.drop_constraint(None, 'message_instance', type_='foreignkey') + op.create_foreign_key('message_instance_ibfk_1', 'message_instance', 'message_model', ['message_model_id'], ['id']) + op.alter_column('message_instance', 'process_instance_id', + existing_type=mysql.INTEGER(), + nullable=False) + op.drop_column('message_instance', 'counterpart_id') + op.drop_column('message_instance', 'user_id') + op.drop_column('message_instance', 'name') + op.create_table('message_correlation_message_instance', + sa.Column('message_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False), + sa.Column('message_correlation_id', mysql.INTEGER(), autoincrement=False, nullable=False), + sa.ForeignKeyConstraint(['message_correlation_id'], ['message_correlation.id'], name='message_correlation_message_instance_ibfk_1'), + sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], name='message_correlation_message_instance_ibfk_2'), + sa.PrimaryKeyConstraint('message_instance_id', 'message_correlation_id'), + mysql_collate='utf8mb4_0900_ai_ci', + mysql_default_charset='utf8mb4', + mysql_engine='InnoDB' + ) + op.create_table('message_model', + sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), + sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True), + sa.Column('name', mysql.VARCHAR(length=50), nullable=True), + sa.PrimaryKeyConstraint('id'), + mysql_collate='utf8mb4_0900_ai_ci', + mysql_default_charset='utf8mb4', + mysql_engine='InnoDB' + ) + op.create_index('ix_message_model_name', 'message_model', ['name'], unique=False) + op.create_index('ix_message_model_identifier', 'message_model', ['identifier'], unique=False) + op.create_table('message_correlation', + sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), + sa.Column('process_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False), + sa.Column('message_correlation_property_id', mysql.INTEGER(), autoincrement=False, nullable=False), + sa.Column('name', mysql.VARCHAR(length=255), nullable=False), + sa.Column('value', mysql.VARCHAR(length=255), nullable=False), + sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True), + sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True), + sa.ForeignKeyConstraint(['message_correlation_property_id'], ['message_correlation_property.id'], name='message_correlation_ibfk_1'), + sa.ForeignKeyConstraint(['process_instance_id'], ['process_instance.id'], name='message_correlation_ibfk_2'), + sa.PrimaryKeyConstraint('id'), + mysql_collate='utf8mb4_0900_ai_ci', + mysql_default_charset='utf8mb4', + mysql_engine='InnoDB' + ) + op.create_index('message_instance_id_name_unique', 'message_correlation', ['process_instance_id', 'message_correlation_property_id', 'name'], unique=False) + op.create_index('ix_message_correlation_value', 'message_correlation', ['value'], unique=False) + op.create_index('ix_message_correlation_process_instance_id', 'message_correlation', ['process_instance_id'], unique=False) + op.create_index('ix_message_correlation_name', 'message_correlation', ['name'], unique=False) + op.create_index('ix_message_correlation_message_correlation_property_id', 'message_correlation', ['message_correlation_property_id'], unique=False) + op.create_table('message_correlation_property', + sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), + sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True), + sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False), + sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True), + sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True), + sa.ForeignKeyConstraint(['message_model_id'], ['message_model.id'], name='message_correlation_property_ibfk_1'), + sa.PrimaryKeyConstraint('id'), + mysql_collate='utf8mb4_0900_ai_ci', + mysql_default_charset='utf8mb4', + mysql_engine='InnoDB' + ) + op.create_index('message_correlation_property_unique', 'message_correlation_property', ['identifier', 'message_model_id'], unique=False) + op.create_index('ix_message_correlation_property_identifier', 'message_correlation_property', ['identifier'], unique=False) + op.drop_index(op.f('ix_message_instance_correlation_message_instance_id'), table_name='message_instance_correlation') + op.drop_index(op.f('ix_message_instance_correlation_expected_value'), table_name='message_instance_correlation') + op.drop_table('message_instance_correlation') + # ### end Alembic commands ### diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py index 196e05ca..94d93dc3 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py @@ -3,17 +3,24 @@ import enum from dataclasses import dataclass from typing import Any from typing import Optional +from typing import TYPE_CHECKING +from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine # type: ignore from sqlalchemy import ForeignKey from sqlalchemy.event import listens_for -from sqlalchemy.orm import Session, relationship +from sqlalchemy.orm import relationship +from sqlalchemy.orm import Session from sqlalchemy.orm import validates from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.user import UserModel -from SpiffWorkflow.bpmn.PythonScriptEngine import PythonScriptEngine # type: ignore + +if TYPE_CHECKING: + from spiffworkflow_backend.models.message_instance_correlation import ( # noqa: F401 + MessageInstanceCorrelationModel, + ) class MessageTypes(enum.Enum): @@ -46,11 +53,15 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel): status: str = db.Column(db.String(20), nullable=False, default="ready") user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore user = relationship("UserModel") - counterpart_id: int = db.Column(db.Integer) # Not enforcing self-referential foreign key so we can delete messages. + counterpart_id: int = db.Column( + db.Integer + ) # Not enforcing self-referential foreign key so we can delete messages. failure_cause: str = db.Column(db.Text()) updated_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer) - correlations = relationship("MessageInstanceCorrelationModel", back_populates="message_instance") + correlations = relationship( + "MessageInstanceCorrelationModel", back_populates="message_instance" + ) @validates("message_type") def validate_message_type(self, key: str, value: Any) -> Any: @@ -62,7 +73,9 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel): """Validate_status.""" return self.validate_enum_field(key, value, MessageStatuses) - def correlates(self, other_message_instance: Any, expression_engine: PythonScriptEngine) -> bool: + def correlates( + self, other_message_instance: Any, expression_engine: PythonScriptEngine + ) -> bool: # This must be a receive message, and the other must be a send (otherwise we reverse the call) # We evaluate the other messages payload and run our correlation's # retrieval expressions against it, then compare it against our @@ -76,26 +89,33 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel): payload = other_message_instance.payload for corr in self.correlations: try: - result = expression_engine._evaluate(corr.retrieval_expression, payload) - except Exception as e: + result = expression_engine._evaluate( + corr.retrieval_expression, payload + ) + except Exception: # the failure of a payload evaluation may not mean that matches for these # message instances can't happen with other messages. So don't error up. # fixme: Perhaps log some sort of error. return False if corr.expected_value is None: continue # We will accept any value - elif corr.expected_value != str(result): # fixme: Don't require conversion to string + elif corr.expected_value != str( + result + ): # fixme: Don't require conversion to string return False return True elif other_message_instance.message_type == MessageTypes.receive.value: return other_message_instance.correlates(self, expression_engine) return False + # This runs for ALL db flushes for ANY model, not just this one even if it's in the MessageInstanceModel class # so this may not be worth it or there may be a better way to do it # # https://stackoverflow.com/questions/32555829/flask-validates-decorator-multiple-fields-simultaneously/33025472#33025472 # https://docs.sqlalchemy.org/en/14/orm/session_events.html#before-flush + + @listens_for(Session, "before_flush") # type: ignore def ensure_failure_cause_is_set_if_message_instance_failed( session: Any, _flush_context: Optional[Any], _instances: Optional[Any] diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py index a0035d75..b0c9b429 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py @@ -8,13 +8,16 @@ from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel + @dataclass class MessageInstanceCorrelationModel(SpiffworkflowBaseDBModel): - """These are the correlations of a specific Message Instance - these will - only exist on receive messages. It provides the expression to run on a - send messages payload which must match the expected value to be considered - a valid match. If the expected value is null, then it does not need to - match, but the expression should still evaluate and produce a result.""" + """These are the correlations of a specific Message Instance. + + These will only exist on receive messages. It provides the expression to run on + a send messages payload which must match the expected value to be considered + a valid match. If the expected value is null, then it does not need to + match, but the expression should still evaluate and produce a result. + """ __tablename__ = "message_instance_correlation" __table_args__ = ( @@ -34,4 +37,6 @@ class MessageInstanceCorrelationModel(SpiffworkflowBaseDBModel): retrieval_expression: str = db.Column(db.String(255)) updated_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer) - message_instance = relationship("MessageInstanceModel", back_populates="correlations") \ No newline at end of file + message_instance = relationship( + "MessageInstanceModel", back_populates="correlations" + ) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_triggerable_process_model.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_triggerable_process_model.py index ae10e929..24a66f3a 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_triggerable_process_model.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_triggerable_process_model.py @@ -1,6 +1,4 @@ """Message_correlation_property.""" -from sqlalchemy import ForeignKey - from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import SpiffworkflowBaseDBModel diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py index 51f29842..8821630c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py @@ -12,7 +12,7 @@ from flask.wrappers import Response from spiffworkflow_backend import db from spiffworkflow_backend.exceptions.api_error import ApiError -from spiffworkflow_backend.models.message_instance import MessageInstanceModel, MessageStatuses +from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceModelSchema from spiffworkflow_backend.services.message_service import MessageService @@ -37,7 +37,7 @@ def message_instance_list( MessageInstanceModel.created_at_in_seconds.desc(), # type: ignore MessageInstanceModel.id.desc(), # type: ignore ) - .outerjoin(ProcessInstanceModel) # Not all messages were created by a process + .outerjoin(ProcessInstanceModel) # Not all messages were created by a process .add_columns( ProcessInstanceModel.process_model_identifier, ProcessInstanceModel.process_model_display_name, @@ -66,7 +66,6 @@ def message_send( body: Dict[str, Any], ) -> flask.wrappers.Response: """Message_start.""" - if "payload" not in body: raise ( ApiError( @@ -83,10 +82,9 @@ def message_send( # Create the send message message_instance = MessageInstanceModel( - process_instance_id=None, message_type="send", name=message_name, - payload=body['payload'], + payload=body["payload"], user_id=g.user.id, correlations=[], ) @@ -102,20 +100,23 @@ def message_send( db.session.delete(message_instance) db.session.commit() raise ( - ApiError( - error_code="message_not_accepted", - message=( - "No running process instances correlate with the given message name of" - f" '{message_name}'. And this message name is not" - " currently associated with any process Start Event. Nothing to do." - ), - status_code=400, - ) + ApiError( + error_code="message_not_accepted", + message=( + "No running process instances correlate with the given message" + f" name of '{message_name}'. And this message name is not" + " currently associated with any process Start Event. Nothing" + " to do." + ), + status_code=400, + ) ) - process_instance = ProcessInstanceModel.query.filter_by(id=receiver_message.process_instance_id).first() + process_instance = ProcessInstanceModel.query.filter_by( + id=receiver_message.process_instance_id + ).first() return Response( json.dumps(ProcessInstanceModelSchema().dump(process_instance)), status=200, mimetype="application/json", - ) \ No newline at end of file + ) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py index c60a7afc..e335ab6f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py @@ -91,7 +91,6 @@ class ErrorHandlingService: # Create the send message message_instance = MessageInstanceModel( - process_instance_id=None, message_type="send", name=message_name, payload=message_payload, @@ -102,8 +101,7 @@ class ErrorHandlingService: db.session.commit() process_instance = MessageService.start_process_with_message( - message_triggerable_process_model, - message_instance + message_triggerable_process_model, message_instance ) return Response( diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py index 6f7f6b57..ed570c5c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py @@ -7,7 +7,10 @@ from spiffworkflow_backend.models.message_triggerable_process_model import ( ) from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.services.process_instance_processor import ( - ProcessInstanceProcessor, CustomBpmnScriptEngine, + CustomBpmnScriptEngine, +) +from spiffworkflow_backend.services.process_instance_processor import ( + ProcessInstanceProcessor, ) from spiffworkflow_backend.services.process_instance_service import ( ProcessInstanceService, @@ -16,16 +19,20 @@ from spiffworkflow_backend.services.process_instance_service import ( class MessageServiceError(Exception): """MessageServiceError.""" - def __init__(self, msg, is_fatal=False): - self.is_fatal = is_fatal - super().__init__(msg) + class MessageService: """MessageService.""" @classmethod - def correlate_send_message(cls, message_instance_send: MessageInstanceModel): + def correlate_send_message( + cls, message_instance_send: MessageInstanceModel + ) -> MessageInstanceModel | None: + """Connects the given send message to a 'receive' message if possible. + :param message_instance_send: + :return: the message instance that received this message. + """ # Thread safe via db locking - don't try to progress the same send message over multiple instances if message_instance_send.status != MessageStatuses.ready.value: return None @@ -35,13 +42,14 @@ class MessageService: # Find available messages that might match available_receive_messages = MessageInstanceModel.query.filter_by( - name=message_instance_send.name, - status=MessageStatuses.ready.value + name=message_instance_send.name, status=MessageStatuses.ready.value ).all() message_instance_receive = None try: for message_instance in available_receive_messages: - if message_instance.correlates(message_instance_send, CustomBpmnScriptEngine()): + if message_instance.correlates( + message_instance_send, CustomBpmnScriptEngine() + ): message_instance_receive = message_instance if message_instance_receive is None: @@ -52,18 +60,26 @@ class MessageService: ).first() ) if message_triggerable_process_model: - receiving_process = MessageService.start_process_with_message(message_triggerable_process_model, - message_instance_send) + receiving_process = MessageService.start_process_with_message( + message_triggerable_process_model, message_instance_send + ) message_instance_receive = MessageInstanceModel.query.filter_by( process_instance_id=receiving_process.id, - message_type="receive", status="ready" + message_type="receive", + status="ready", ).first() else: - receiving_process = MessageService.get_process_instance_for_message_instance( - message_instance_receive) + receiving_process = ( + MessageService.get_process_instance_for_message_instance( + message_instance_receive + ) + ) # Assure we can send the message, otherwise keep going. - if message_instance_receive is None or not receiving_process.can_receive_message(): + if ( + message_instance_receive is None + or not receiving_process.can_receive_message() + ): message_instance_send.status = "ready" message_instance_send.status = "ready" db.session.add(message_instance_send) @@ -102,7 +118,7 @@ class MessageService: @classmethod def correlate_all_message_instances(cls) -> None: - """Look at ALL the Send and Receive Messages and attempt to find correlations """ + """Look at ALL the Send and Receive Messages and attempt to find correlations.""" message_instances_send = MessageInstanceModel.query.filter_by( message_type="send", status="ready" ).all() @@ -110,11 +126,10 @@ class MessageService: for message_instance_send in message_instances_send: cls.correlate_send_message(message_instance_send) - @staticmethod def start_process_with_message( - message_triggerable_process_model: MessageTriggerableProcessModel, - message_instance: MessageInstanceModel + message_triggerable_process_model: MessageTriggerableProcessModel, + message_instance: MessageInstanceModel, ) -> ProcessInstanceModel: """Start up a process instance, so it is ready to catch the event.""" process_instance_receive = ProcessInstanceService.create_process_instance_from_process_model_identifier( @@ -126,8 +141,8 @@ class MessageService: return process_instance_receive @staticmethod - def get_process_instance_for_message_instance ( - message_instance_receive: MessageInstanceModel, + def get_process_instance_for_message_instance( + message_instance_receive: MessageInstanceModel, ) -> ProcessInstanceModel: """Process_message_receive.""" process_instance_receive = ProcessInstanceModel.query.filter_by( @@ -141,23 +156,21 @@ class MessageService: f" {message_instance_receive.id}.Tried with id" f" {message_instance_receive.process_instance_id}" ), - ), is_fatal=True + ) ) return process_instance_receive @staticmethod - def process_message_receive ( - process_instance_receive: ProcessInstanceModel, - message_instance_receive: MessageInstanceModel, - message_model_name: str, - message_payload: dict, + def process_message_receive( + process_instance_receive: ProcessInstanceModel, + message_instance_receive: MessageInstanceModel, + message_model_name: str, + message_payload: dict, ) -> None: - """ """ - + """process_message_receive.""" processor_receive = ProcessInstanceProcessor(process_instance_receive) processor_receive.bpmn_process_instance.catch_bpmn_message( - message_model_name, - message_payload + message_model_name, message_payload ) processor_receive.do_engine_steps(save=True) message_instance_receive.status = MessageStatuses.completed.value diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 01a88a83..a2cac1b9 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -61,7 +61,9 @@ from spiffworkflow_backend.models.group import GroupModel from spiffworkflow_backend.models.human_task import HumanTaskModel from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel -from spiffworkflow_backend.models.message_instance_correlation import MessageInstanceCorrelationModel +from spiffworkflow_backend.models.message_instance_correlation import ( + MessageInstanceCorrelationModel, +) from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus from spiffworkflow_backend.models.process_instance_metadata import ( @@ -1340,7 +1342,7 @@ class ProcessInstanceProcessor: for bpmn_message in bpmn_messages: message_instance = MessageInstanceModel( process_instance_id=self.process_instance_model.id, - user_id=self.process_instance_model.process_initiator_id, # TODO: use the correct swimlane user when that is set up + user_id=self.process_instance_model.process_initiator_id, # TODO: use the correct swimlane user when that is set up message_type="send", name=bpmn_message.name, payload=bpmn_message.payload, @@ -1352,15 +1354,20 @@ class ProcessInstanceProcessor: def queue_waiting_receive_messages(self) -> None: """Queue_waiting_receive_messages.""" waiting_events = self.bpmn_process_instance.waiting_events() - waiting_message_events = filter(lambda e: e['event_type'] == "Message", waiting_events) + waiting_message_events = filter( + lambda e: e["event_type"] == "Message", waiting_events + ) for event in waiting_message_events: - # Ensure we are only creating one message instance for each waiting message - if MessageInstanceModel.query.filter_by( - process_instance_id=self.process_instance_model.id, - message_type="receive", - name=event['name']).count() > 0: + if ( + MessageInstanceModel.query.filter_by( + process_instance_id=self.process_instance_model.id, + message_type="receive", + name=event["name"], + ).count() + > 0 + ): continue # Create a new Message Instance @@ -1368,14 +1375,14 @@ class ProcessInstanceProcessor: process_instance_id=self.process_instance_model.id, user_id=self.process_instance_model.process_initiator_id, message_type="receive", - name=event['name'], + name=event["name"], ) - for correlation_property in event['value']: - message_correlation = MessageInstanceCorrelationModel ( + for correlation_property in event["value"]: + message_correlation = MessageInstanceCorrelationModel( message_instance_id=message_instance.id, name=correlation_property.name, expected_value=correlation_property.expected_value, - retrieval_expression=correlation_property.retrieval_expression + retrieval_expression=correlation_property.retrieval_expression, ) message_instance.correlations.append(message_correlation) db.session.add(message_instance) @@ -1467,7 +1474,7 @@ class ProcessInstanceProcessor: spiff_logger = logging.getLogger("spiff") for handler in spiff_logger.handlers: if hasattr(handler, "bulk_insert_logs"): - handler.bulk_insert_logs() # type: ignore + handler.bulk_insert_logs() db.session.commit() if save: diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/spec_file_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/spec_file_service.py index 189842ac..3886493d 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/spec_file_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/spec_file_service.py @@ -332,7 +332,6 @@ class SpecFileService(FileSystemService): @staticmethod def update_message_cache(ref: SpecReference) -> None: """Assure we have a record in the database of all possible message ids and names.""" - pass # for message_model_identifier in ref.messages.keys(): # message_model = MessageModel.query.filter_by( # identifier=message_model_identifier @@ -374,7 +373,6 @@ class SpecFileService(FileSystemService): @staticmethod def update_correlation_cache(ref: SpecReference) -> None: """Update_correlation_cache.""" - pass # for correlation_identifier in ref.correlations.keys(): # correlation_property_retrieval_expressions = ref.correlations[ # correlation_identifier diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index 380a8b77..1a6911c2 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -2330,7 +2330,7 @@ class TestProcessApi(BaseTest): assert response.json is not None process_instance_id_one = response.json["id"] - payload['po_number'] = "1002" + payload["po_number"] = "1002" response = client.post( f"/v1.0/messages/{message_model_identifier}", content_type="application/json", @@ -2347,7 +2347,9 @@ class TestProcessApi(BaseTest): ) assert response.status_code == 200 assert response.json is not None - assert len(response.json["results"]) == 2 # Two messages, one is the completed receive, the other is new send + assert ( + len(response.json["results"]) == 2 + ) # Two messages, one is the completed receive, the other is new send assert ( response.json["results"][0]["process_instance_id"] == process_instance_id_one diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py index c1be2d1a..a216cd28 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py @@ -53,8 +53,8 @@ class TestMessageInstance(BaseTest): process_instance_id=process_instance.id, user_id=process_instance.process_initiator_id, message_type="send", - name =message_name, - payload={"Word":"Eat At Mashita's, delicious!"} + name=message_name, + payload={"Word": "Eat At Mashita's, delicious!"}, ) db.session.add(queued_message) db.session.commit() @@ -138,7 +138,7 @@ class TestMessageInstance(BaseTest): process_instance_id=process_instance.id, user_id=process_instance.process_initiator_id, message_type="BAD_MESSAGE_TYPE", - name=message_name + name=message_name, ) assert ( str(exception.value) @@ -213,5 +213,3 @@ class TestMessageInstance(BaseTest): db.session.commit() assert queued_message.id is not None assert queued_message.failure_cause == "THIS TEST FAILURE" - - diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py index 201a4dd8..d262931a 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py @@ -208,9 +208,7 @@ class TestMessageService(BaseTest): ) -> None: # Correlation Properties should match up po_curr = next(c for c in message.correlations if c.name == "po_number") - customer_curr = next( - c for c in message.correlations if c.name == "customer_id" - ) + customer_curr = next(c for c in message.correlations if c.name == "customer_id") assert po_curr is not None assert customer_curr is not None assert po_curr.expected_value == "1001" @@ -273,13 +271,12 @@ class TestMessageService(BaseTest): assert len(orig_send_messages) == 2 assert MessageInstanceModel.query.filter_by(message_type="receive").count() == 1 - # process message instances MessageService.correlate_all_message_instances() # Once complete the original send messages should be completed and two new instances # should now exist, one for each of the process instances ... -# for osm in orig_send_messages: -# assert osm.status == "completed" + # for osm in orig_send_messages: + # assert osm.status == "completed" process_instance_result = ProcessInstanceModel.query.all() assert len(process_instance_result) == 3