From 0dc2bc331695e7a1c0239cd5d70190fcd63d7387 Mon Sep 17 00:00:00 2001 From: Dan Date: Fri, 24 Feb 2023 14:53:22 -0500 Subject: [PATCH] # SpiffWorkflow: 1) Type Safe checking on correlation properties (no more str()) 2) A running workflows Correlations are once again at the key level. # Backend 1) Both send and receive messages can have correlation_keys - and we compare these to each other to quickly assure a match (if they both exist - otherwise we fall back to comparing the properties on the receive to the sending messages payload) 2) Cleaned up the migrations to just one file --- .../migrations/versions/4d47598d7181_.py | 35 ------ .../{4ab08bf12666_.py => 9f0b1662a8af_.py} | 108 ++++++++++-------- .../migrations/versions/ac40af4ddef3_.py | 33 ------ .../migrations/versions/b581ff2351ad_.py | 35 ------ .../models/message_instance.py | 85 ++++++++------ .../models/message_instance_correlation.py | 11 +- .../routes/messages_controller.py | 1 - .../services/error_handling_service.py | 3 +- .../services/process_instance_processor.py | 10 +- .../unit/test_message_instance.py | 2 +- .../unit/test_message_service.py | 16 ++- 11 files changed, 133 insertions(+), 206 deletions(-) delete mode 100644 spiffworkflow-backend/migrations/versions/4d47598d7181_.py rename spiffworkflow-backend/migrations/versions/{4ab08bf12666_.py => 9f0b1662a8af_.py} (75%) delete mode 100644 spiffworkflow-backend/migrations/versions/ac40af4ddef3_.py delete mode 100644 spiffworkflow-backend/migrations/versions/b581ff2351ad_.py diff --git a/spiffworkflow-backend/migrations/versions/4d47598d7181_.py b/spiffworkflow-backend/migrations/versions/4d47598d7181_.py deleted file mode 100644 index 721decab8..000000000 --- a/spiffworkflow-backend/migrations/versions/4d47598d7181_.py +++ /dev/null @@ -1,35 +0,0 @@ -"""empty message - -Revision ID: 4d47598d7181 -Revises: 4ab08bf12666 -Create Date: 2023-02-23 16:12:11.603900 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '4d47598d7181' -down_revision = '4ab08bf12666' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.create_table('correlation_property_cache', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('name', sa.String(length=50), nullable=False), - sa.Column('message_name', sa.String(length=50), nullable=False), - sa.Column('process_model_id', sa.String(length=255), nullable=False), - sa.Column('retrieval_expression', sa.String(length=255), nullable=True), - sa.PrimaryKeyConstraint('id') - ) - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.drop_table('correlation_property_cache') - # ### end Alembic commands ### diff --git a/spiffworkflow-backend/migrations/versions/4ab08bf12666_.py b/spiffworkflow-backend/migrations/versions/9f0b1662a8af_.py similarity index 75% rename from spiffworkflow-backend/migrations/versions/4ab08bf12666_.py rename to spiffworkflow-backend/migrations/versions/9f0b1662a8af_.py index fcc573158..6f3270f31 100644 --- a/spiffworkflow-backend/migrations/versions/4ab08bf12666_.py +++ b/spiffworkflow-backend/migrations/versions/9f0b1662a8af_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 4ab08bf12666 -Revises: b581ff2351ad -Create Date: 2023-02-23 07:25:54.135765 +Revision ID: 9f0b1662a8af +Revises: 63fc8d693b9f +Create Date: 2023-02-24 14:30:05.970959 """ from alembic import op @@ -10,19 +10,26 @@ import sqlalchemy as sa from sqlalchemy.dialects import mysql # revision identifiers, used by Alembic. -revision = '4ab08bf12666' -down_revision = 'b581ff2351ad' +revision = '9f0b1662a8af' +down_revision = '63fc8d693b9f' branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.create_table('message_instance_correlation', + op.create_table('correlation_property_cache', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('name', sa.String(length=50), nullable=False), + sa.Column('message_name', sa.String(length=50), nullable=False), + sa.Column('process_model_id', sa.String(length=255), nullable=False), + sa.Column('retrieval_expression', sa.String(length=255), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_table('message_instance_correlation_rule', sa.Column('id', sa.Integer(), nullable=False), sa.Column('message_instance_id', sa.Integer(), nullable=False), sa.Column('name', sa.String(length=50), nullable=False), - sa.Column('expected_value', sa.String(length=255), nullable=True), sa.Column('retrieval_expression', sa.String(length=255), nullable=True), sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True), sa.Column('created_at_in_seconds', sa.Integer(), nullable=True), @@ -30,26 +37,29 @@ def upgrade(): sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('message_instance_id', 'name', name='message_instance_id_name_unique') ) - op.create_index(op.f('ix_message_instance_correlation_expected_value'), 'message_instance_correlation', ['expected_value'], unique=False) - op.create_index(op.f('ix_message_instance_correlation_message_instance_id'), 'message_instance_correlation', ['message_instance_id'], unique=False) - - op.drop_index('ix_message_correlation_property_identifier', table_name='message_correlation_property') - op.drop_index('message_correlation_property_unique', table_name='message_correlation_property') - op.drop_constraint('message_correlation_ibfk_1', 'message_correlation', type_='foreignkey') - op.drop_table('message_correlation_property') - op.drop_index('ix_message_correlation_message_correlation_property_id', table_name='message_correlation') + op.create_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), 'message_instance_correlation_rule', ['message_instance_id'], unique=False) + op.drop_index('ix_message_model_identifier', table_name='message_model') + op.drop_index('ix_message_model_name', table_name='message_model') + op.drop_constraint('message_correlation_property_ibfk_1', 'message_correlation_property', type_='foreignkey') + op.drop_constraint('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', type_='foreignkey') + op.drop_constraint('message_instance_ibfk_1', 'message_instance', type_='foreignkey') + op.drop_table('message_model') + op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_='foreignkey') op.drop_index('ix_message_correlation_name', table_name='message_correlation') op.drop_index('ix_message_correlation_process_instance_id', table_name='message_correlation') op.drop_index('ix_message_correlation_value', table_name='message_correlation') - op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_='foreignkey') - op.drop_constraint('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', type_='foreignkey') +# op.drop_index('message_instance_id_name_unique', table_name='message_correlation') +# op.drop_index('ix_message_correlation_message_correlation_property_id', table_name='message_correlation') op.drop_table('message_correlation') - op.drop_index('ix_message_model_identifier', table_name='message_model') - op.drop_index('ix_message_model_name', table_name='message_model') - op.drop_constraint('message_instance_ibfk_1', 'message_instance', type_='foreignkey') - op.drop_table('message_model') + op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance') + op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance') +# op.drop_index('message_correlation_message_instance_unique', table_name='message_correlation_message_instance') op.drop_table('message_correlation_message_instance') + op.drop_index('ix_message_correlation_property_identifier', table_name='message_correlation_property') + op.drop_index('message_correlation_property_unique', table_name='message_correlation_property') + op.drop_table('message_correlation_property') op.add_column('message_instance', sa.Column('name', sa.String(length=255), nullable=True)) + op.add_column('message_instance', sa.Column('correlation_keys', sa.JSON(), nullable=True)) op.add_column('message_instance', sa.Column('user_id', sa.Integer(), nullable=False)) op.add_column('message_instance', sa.Column('counterpart_id', sa.Integer(), nullable=True)) op.alter_column('message_instance', 'process_instance_id', @@ -66,9 +76,8 @@ def upgrade(): def downgrade(): # ### commands auto generated by Alembic - please adjust! ### op.add_column('message_triggerable_process_model', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False)) - op.create_foreign_key('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', 'message_model', ['message_model_id'], ['id']) op.create_index('message_model_id', 'message_triggerable_process_model', ['message_model_id'], unique=False) - op.drop_column('message_triggerable_process_model', 'name') + op.drop_column('message_triggerable_process_model', 'message_name') op.add_column('message_instance', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False)) op.drop_constraint(None, 'message_instance', type_='foreignkey') op.create_foreign_key('message_instance_ibfk_1', 'message_instance', 'message_model', ['message_model_id'], ['id']) @@ -77,28 +86,36 @@ def downgrade(): nullable=False) op.drop_column('message_instance', 'counterpart_id') op.drop_column('message_instance', 'user_id') + op.drop_column('message_instance', 'correlation_keys') op.drop_column('message_instance', 'name') - op.create_table('message_correlation_message_instance', - sa.Column('message_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False), - sa.Column('message_correlation_id', mysql.INTEGER(), autoincrement=False, nullable=False), - sa.ForeignKeyConstraint(['message_correlation_id'], ['message_correlation.id'], name='message_correlation_message_instance_ibfk_1'), - sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], name='message_correlation_message_instance_ibfk_2'), - sa.PrimaryKeyConstraint('message_instance_id', 'message_correlation_id'), - mysql_collate='utf8mb4_0900_ai_ci', - mysql_default_charset='utf8mb4', - mysql_engine='InnoDB' - ) - op.create_table('message_model', + op.create_table('message_correlation_property', sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True), - sa.Column('name', mysql.VARCHAR(length=50), nullable=True), + sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False), + sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True), + sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True), + sa.ForeignKeyConstraint(['message_model_id'], ['message_model.id'], name='message_correlation_property_ibfk_1'), sa.PrimaryKeyConstraint('id'), mysql_collate='utf8mb4_0900_ai_ci', mysql_default_charset='utf8mb4', mysql_engine='InnoDB' ) - op.create_index('ix_message_model_name', 'message_model', ['name'], unique=False) - op.create_index('ix_message_model_identifier', 'message_model', ['identifier'], unique=False) + op.create_index('message_correlation_property_unique', 'message_correlation_property', ['identifier', 'message_model_id'], unique=False) + op.create_index('ix_message_correlation_property_identifier', 'message_correlation_property', ['identifier'], unique=False) + op.create_table('message_correlation_message_instance', + sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), + sa.Column('message_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False), + sa.Column('message_correlation_id', mysql.INTEGER(), autoincrement=False, nullable=False), + sa.ForeignKeyConstraint(['message_correlation_id'], ['message_correlation.id'], name='message_correlation_message_instance_ibfk_1'), + sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], name='message_correlation_message_instance_ibfk_2'), + sa.PrimaryKeyConstraint('id'), + mysql_collate='utf8mb4_0900_ai_ci', + mysql_default_charset='utf8mb4', + mysql_engine='InnoDB' + ) + op.create_index('message_correlation_message_instance_unique', 'message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'], unique=False) + op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False) + op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False) op.create_table('message_correlation', sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), sa.Column('process_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False), @@ -119,21 +136,18 @@ def downgrade(): op.create_index('ix_message_correlation_process_instance_id', 'message_correlation', ['process_instance_id'], unique=False) op.create_index('ix_message_correlation_name', 'message_correlation', ['name'], unique=False) op.create_index('ix_message_correlation_message_correlation_property_id', 'message_correlation', ['message_correlation_property_id'], unique=False) - op.create_table('message_correlation_property', + op.create_table('message_model', sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True), - sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False), - sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True), - sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True), - sa.ForeignKeyConstraint(['message_model_id'], ['message_model.id'], name='message_correlation_property_ibfk_1'), + sa.Column('name', mysql.VARCHAR(length=50), nullable=True), sa.PrimaryKeyConstraint('id'), mysql_collate='utf8mb4_0900_ai_ci', mysql_default_charset='utf8mb4', mysql_engine='InnoDB' ) - op.create_index('message_correlation_property_unique', 'message_correlation_property', ['identifier', 'message_model_id'], unique=False) - op.create_index('ix_message_correlation_property_identifier', 'message_correlation_property', ['identifier'], unique=False) - op.drop_index(op.f('ix_message_instance_correlation_message_instance_id'), table_name='message_instance_correlation') - op.drop_index(op.f('ix_message_instance_correlation_expected_value'), table_name='message_instance_correlation') - op.drop_table('message_instance_correlation') + op.create_index('ix_message_model_name', 'message_model', ['name'], unique=False) + op.create_index('ix_message_model_identifier', 'message_model', ['identifier'], unique=False) + op.drop_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), table_name='message_instance_correlation_rule') + op.drop_table('message_instance_correlation_rule') + op.drop_table('correlation_property_cache') # ### end Alembic commands ### diff --git a/spiffworkflow-backend/migrations/versions/ac40af4ddef3_.py b/spiffworkflow-backend/migrations/versions/ac40af4ddef3_.py deleted file mode 100644 index c66b35b84..000000000 --- a/spiffworkflow-backend/migrations/versions/ac40af4ddef3_.py +++ /dev/null @@ -1,33 +0,0 @@ -"""empty message - -Revision ID: ac40af4ddef3 -Revises: 63fc8d693b9f -Create Date: 2023-02-16 14:54:14.533029 - -""" -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import mysql - -# revision identifiers, used by Alembic. -revision = 'ac40af4ddef3' -down_revision = '63fc8d693b9f' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.drop_index('message_correlation_message_instance_unique', table_name='message_correlation_message_instance') - op.drop_column('message_correlation_message_instance', 'id') - op.create_primary_key('mcmi_pirmary_key','message_correlation_message_instance', ['message_instance_id', 'message_correlation_id']) - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.add_column('message_correlation_message_instance', sa.Column('id', mysql.INTEGER(), autoincrement=False, nullable=False)) - op.create_index('message_correlation_message_instance_unique', 'message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'], unique=False) - op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False) - op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False) - # ### end Alembic commands ### diff --git a/spiffworkflow-backend/migrations/versions/b581ff2351ad_.py b/spiffworkflow-backend/migrations/versions/b581ff2351ad_.py deleted file mode 100644 index 761edee36..000000000 --- a/spiffworkflow-backend/migrations/versions/b581ff2351ad_.py +++ /dev/null @@ -1,35 +0,0 @@ -"""empty message - -Revision ID: b581ff2351ad -Revises: ac40af4ddef3 -Create Date: 2023-02-19 11:19:24.371528 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = 'b581ff2351ad' -down_revision = 'ac40af4ddef3' -branch_labels = None -depends_on = None - - -def upgrade(): - - # Just some table cleanup so the join table is properly built - op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_="foreignkey") - op.drop_constraint('message_correlation_message_instance_ibfk_2', 'message_correlation_message_instance', type_="foreignkey") - op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance') - op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance') - op.create_foreign_key(None, 'message_correlation_message_instance', 'message_correlation', ['message_correlation_id'], ['id']) - op.create_foreign_key(None, 'message_correlation_message_instance', 'message_instance', ['message_instance_id'], ['id']) - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.drop_index(op.f('ix_message_correlation_property_correlation_key'), table_name='message_correlation_property') - op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False) - op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False) - # ### end Alembic commands ### diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py index 418bc7eb2..95f687a39 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py @@ -19,7 +19,7 @@ from spiffworkflow_backend.models.user import UserModel if TYPE_CHECKING: from spiffworkflow_backend.models.message_instance_correlation import ( # noqa: F401 - MessageInstanceCorrelationModel, + MessageInstanceCorrelationRuleModel, ) @@ -49,7 +49,9 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel): process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=True) # type: ignore name: str = db.Column(db.String(255)) message_type: str = db.Column(db.String(20), nullable=False) + # Only Send Messages have a payload payload: dict = db.Column(db.JSON) + correlation_keys: dict = db.Column(db.JSON) # The correlation keys of the process at the time the message was created. status: str = db.Column(db.String(20), nullable=False, default="ready") user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore user = relationship("UserModel") @@ -59,8 +61,8 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel): failure_cause: str = db.Column(db.Text()) updated_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer) - correlations = relationship( - "MessageInstanceCorrelationModel", back_populates="message_instance" + correlation_rules = relationship( + "MessageInstanceCorrelationRuleModel", back_populates="message_instance" ) @validates("message_type") @@ -74,40 +76,59 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel): return self.validate_enum_field(key, value, MessageStatuses) def correlates( - self, other_message_instance: Any, expression_engine: PythonScriptEngine + self, other: Any, expression_engine: PythonScriptEngine ) -> bool: - # This must be a receive message, and the other must be a send (otherwise we reverse the call) - # We evaluate the other messages payload and run our correlation's - # retrieval expressions against it, then compare it against our - # expected values -- IF we don't have an expected value, we accept - # any non-erroring result from the retrieval expression. - if self.name != other_message_instance.name: + """ Returns true if the this Message correlates with the given message. + + This must be a 'receive' message, and the other must be a 'send' or vice/versa. We evaluate + the other messages payload and run our correlation's retrieval expressions against it, then + compare it against our expected values (as stored in this messages' correlation_keys) + IF we don't have an expected value, we accept any non-error result from the retrieval + expression. """ + if self.is_send() and other.is_receive(): + # Flip the call. + return other.correlates(self, expression_engine) # type: ignore + + if self.name != other.name: return False - if self.message_type == MessageTypes.receive.value: - if other_message_instance.message_type != MessageTypes.send.value: - return False - payload = other_message_instance.payload - for corr in self.correlations: - try: - result = expression_engine._evaluate( - corr.retrieval_expression, payload - ) - except Exception: - # the failure of a payload evaluation may not mean that matches for these - # message instances can't happen with other messages. So don't error up. - # fixme: Perhaps log some sort of error. - return False - if corr.expected_value is None: - continue # We will accept any value - elif corr.expected_value != str( - result - ): # fixme: Don't require conversion to string - return False + if not self.is_receive(): + return False + if isinstance(self.correlation_keys, dict) and self.correlation_keys == other.correlation_keys: + # We know we have a match, and we can just return if we don't have to figure out the key return True - elif other_message_instance.message_type == MessageTypes.receive.value: - return other_message_instance.correlates(self, expression_engine) # type: ignore + + # Loop over the receives' correlation keys - if any of the keys fully match, then we match. + for expected_values in self.correlation_keys.values(): + if self.payload_matches_expected_values(other.payload, expected_values, expression_engine): + return True return False + def is_receive(self): + return self.message_type == MessageTypes.receive.value + + def is_send(self): + return self.message_type == MessageTypes.send.value + + def payload_matches_expected_values( + self, payload: dict, + expected_values: dict, + expression_engine: PythonScriptEngine) -> bool: + """Compares the payload of a 'send' message against a single correlation key's expected values.""" + for correlation_key in self.correlation_rules: + expected_value = expected_values.get(correlation_key.name, None) + if expected_value is None: # This key is not required for this instance to match. + continue + try: + result = expression_engine._evaluate(correlation_key.retrieval_expression, payload) + except Exception: + # the failure of a payload evaluation may not mean that matches for these + # message instances can't happen with other messages. So don't error up. + # fixme: Perhaps log some sort of error. + return False + if result != expected_value: + return False + return True + # This runs for ALL db flushes for ANY model, not just this one even if it's in the MessageInstanceModel class # so this may not be worth it or there may be a better way to do it diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py index 024d3bca8..7431a273f 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py @@ -10,16 +10,16 @@ from spiffworkflow_backend.models.message_instance import MessageInstanceModel @dataclass -class MessageInstanceCorrelationModel(SpiffworkflowBaseDBModel): +class MessageInstanceCorrelationRuleModel(SpiffworkflowBaseDBModel): """These are the correlations of a specific Message Instance. These will only exist on receive messages. It provides the expression to run on - a send messages payload which must match the expected value to be considered - a valid match. If the expected value is null, then it does not need to + a send messages payload which must match receive messages correlation_key dictionary + to be considered a valid match. If the expected value is null, then it does not need to match, but the expression should still evaluate and produce a result. """ - __tablename__ = "message_instance_correlation" + __tablename__ = "message_instance_correlation_rule" __table_args__ = ( db.UniqueConstraint( "message_instance_id", @@ -33,10 +33,9 @@ class MessageInstanceCorrelationModel(SpiffworkflowBaseDBModel): ForeignKey(MessageInstanceModel.id), nullable=False, index=True # type: ignore ) name: str = db.Column(db.String(50), nullable=False) - expected_value: str = db.Column(db.String(255), nullable=True, index=True) retrieval_expression: str = db.Column(db.String(255)) updated_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer) message_instance = relationship( - "MessageInstanceModel", back_populates="correlations" + "MessageInstanceModel", back_populates="correlation_rules" ) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py index 8821630c7..1c86fddbb 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py @@ -86,7 +86,6 @@ def message_send( name=message_name, payload=body["payload"], user_id=g.user.id, - correlations=[], ) db.session.add(message_instance) db.session.commit() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py index e335ab6f1..56b3f547a 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py @@ -94,8 +94,7 @@ class ErrorHandlingService: message_type="send", name=message_name, payload=message_payload, - user_id=g.user.id, - correlations=[], + user_id=g.user.id ) db.session.add(message_instance) db.session.commit() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 39ee59689..51f6ba81e 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -62,7 +62,7 @@ from spiffworkflow_backend.models.human_task import HumanTaskModel from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance_correlation import ( - MessageInstanceCorrelationModel, + MessageInstanceCorrelationRuleModel ) from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus @@ -1352,7 +1352,7 @@ class ProcessInstanceProcessor: message_type="send", name=bpmn_message.name, payload=bpmn_message.payload, - correlations=[], + correlation_keys=self.bpmn_process_instance.correlations ) db.session.add(message_instance) db.session.commit() @@ -1382,15 +1382,15 @@ class ProcessInstanceProcessor: user_id=self.process_instance_model.process_initiator_id, message_type="receive", name=event["name"], + correlation_keys=self.bpmn_process_instance.correlations ) for correlation_property in event["value"]: - message_correlation = MessageInstanceCorrelationModel( + message_correlation = MessageInstanceCorrelationRuleModel( message_instance_id=message_instance.id, name=correlation_property.name, - expected_value=correlation_property.expected_value, retrieval_expression=correlation_property.retrieval_expression, ) - message_instance.correlations.append(message_correlation) + message_instance.correlation_rules.append(message_correlation) db.session.add(message_instance) db.session.commit() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py index a216cd288..b48bc239c 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py @@ -54,7 +54,7 @@ class TestMessageInstance(BaseTest): user_id=process_instance.process_initiator_id, message_type="send", name=message_name, - payload={"Word": "Eat At Mashita's, delicious!"}, + payload={"Word": "Eat At Mashita's, its delicious!"}, ) db.session.add(queued_message) db.session.commit() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py index d262931a6..986951e9a 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py @@ -43,7 +43,7 @@ class TestMessageService(BaseTest): "amount": "100.00", } - self.start_sender_process(client, with_super_admin_user) + self.start_sender_process(client, with_super_admin_user, "test_from_api") self.assure_a_message_was_sent() self.assure_there_is_a_process_waiting_on_a_message() @@ -105,7 +105,7 @@ class TestMessageService(BaseTest): ) # Now start the main process - self.start_sender_process(client, with_super_admin_user) + self.start_sender_process(client, with_super_admin_user, "test_between_processes") self.assure_a_message_was_sent() # This is typically called in a background cron process, so we will manually call it @@ -145,9 +145,9 @@ class TestMessageService(BaseTest): assert message_receiver_process.status == "complete" def start_sender_process( - self, client: FlaskClient, with_super_admin_user: UserModel + self, client: FlaskClient, with_super_admin_user: UserModel, group_name:str = "test_group" ) -> None: - process_group_id = "test_group" + process_group_id = group_name self.create_process_group( client, with_super_admin_user, process_group_id, process_group_id ) @@ -186,7 +186,7 @@ class TestMessageService(BaseTest): assert len(send_messages) == 1 send_message = send_messages[0] assert ( - send_message.payload == self.payload + send_message.payload == self.payload ), "The send message should match up with the payload" assert send_message.name == "Request Approval" assert send_message.status == "ready" @@ -207,12 +207,10 @@ class TestMessageService(BaseTest): self, message: MessageInstanceModel ) -> None: # Correlation Properties should match up - po_curr = next(c for c in message.correlations if c.name == "po_number") - customer_curr = next(c for c in message.correlations if c.name == "customer_id") + po_curr = next(c for c in message.correlation_rules if c.name == "po_number") + customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id") assert po_curr is not None assert customer_curr is not None - assert po_curr.expected_value == "1001" - assert customer_curr.expected_value == "Sartography" def test_can_send_message_to_multiple_process_models( self,