diff --git a/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_definitions.py b/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_definitions.py index ef76270c..67043b85 100644 --- a/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_definitions.py +++ b/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_definitions.py @@ -165,7 +165,6 @@ class CorrelationProperty: self.name = name # This is the property name self.retrieval_expression = retrieval_expression # This is how it's generated self.correlation_keys = correlation_keys # These are the keys it's used by - self.expected_value = expected_value # This is the (optional) expected value class MessageEventDefinition(NamedEventDefinition): """The default message event.""" @@ -206,34 +205,23 @@ class MessageEventDefinition(NamedEventDefinition): if payload is not None: my_task.set_data(**payload) - def get_correlation_keys(self, task, payload): + def get_correlations(self, task, payload): correlation_keys = {} for property in self.correlation_properties: for key in property.correlation_keys: if key not in correlation_keys: correlation_keys[key] = {} - correlation_keys[key][property.name] = task.workflow.script_engine._evaluate(property.expression, payload) + try: + correlation_keys[key][property.name] = task.workflow.script_engine._evaluate(property.retrieval_expression, payload) + except WorkflowException as we: + we.add_note( + f"Failed to evaluate correlation property '{property.name}'" + f" invalid expression '{property.retrieval_expression}'") + we.task_spec = task.task_spec + raise we return correlation_keys - def get_correlations(self, task, payload): - correlations = {} - for property in self.correlation_properties: - try: - correlations[property.name] = task.workflow.script_engine._evaluate(property.retrieval_expression, payload) - except WorkflowException as we: - we.add_note(f"Failed to evaluate correlation property '{property.name}'" - f" invalid expression '{property.retrieval_expression}'") - we.task_spec = task.task_spec - raise we - return correlations - def get_awaiting_correlations(self, task): - for prop in self.correlation_properties: - if prop.name in task.workflow.correlations: - prop.expected_value = task.workflow.correlations.get(prop.name) - else: - prop.expected_value = None - return self.correlation_properties class NoneEventDefinition(EventDefinition): """ diff --git a/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_types.py b/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_types.py index 846c6031..e1a01853 100644 --- a/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_types.py +++ b/SpiffWorkflow/SpiffWorkflow/bpmn/specs/events/event_types.py @@ -36,7 +36,7 @@ class CatchingEvent(Simple, BpmnSpecMixin): def catches(self, my_task, event_definition, correlations=None): if self.event_definition == event_definition: - return all([ str(correlations.get(key)) == str(my_task.workflow.correlations.get(key)) for key in correlations ]) + return all([correlations.get(key) == my_task.workflow.correlations.get(key) for key in correlations ]) else: return False diff --git a/SpiffWorkflow/SpiffWorkflow/bpmn/workflow.py b/SpiffWorkflow/SpiffWorkflow/bpmn/workflow.py index e6a41441..3f693981 100644 --- a/SpiffWorkflow/SpiffWorkflow/bpmn/workflow.py +++ b/SpiffWorkflow/SpiffWorkflow/bpmn/workflow.py @@ -179,8 +179,7 @@ class BpmnWorkflow(Workflow): if isinstance(event_definition, TimerEventDefinition): value = event_definition.timer_value(task) elif isinstance(event_definition, MessageEventDefinition): - value = event_definition.get_awaiting_correlations(task) - + value = event_definition.correlation_properties events.append({ 'event_type': event_definition.event_type, 'name': event_definition.name if isinstance(event_definition, NamedEventDefinition) else None, diff --git a/SpiffWorkflow/tests/SpiffWorkflow/bpmn/CollaborationTest.py b/SpiffWorkflow/tests/SpiffWorkflow/bpmn/CollaborationTest.py index d2e27307..77e54726 100644 --- a/SpiffWorkflow/tests/SpiffWorkflow/bpmn/CollaborationTest.py +++ b/SpiffWorkflow/tests/SpiffWorkflow/bpmn/CollaborationTest.py @@ -56,7 +56,6 @@ class CollaborationTest(BpmnWorkflowTestCase): self.assertEqual(1, len(events)) self.assertEqual("Message", events[0]['event_type']) self.assertEqual("Love Letter Response", events[0]['name']) - self.assertEqual('Peggy', events[0]['value'][0].expected_value) self.assertEqual(['lover'], events[0]['value'][0].correlation_keys) self.assertEqual('from_name', events[0]['value'][0].retrieval_expression) self.assertEqual('lover_name', events[0]['value'][0].name) diff --git a/spiffworkflow-backend/migrations/versions/4d47598d7181_.py b/spiffworkflow-backend/migrations/versions/4d47598d7181_.py deleted file mode 100644 index 721decab..00000000 --- a/spiffworkflow-backend/migrations/versions/4d47598d7181_.py +++ /dev/null @@ -1,35 +0,0 @@ -"""empty message - -Revision ID: 4d47598d7181 -Revises: 4ab08bf12666 -Create Date: 2023-02-23 16:12:11.603900 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = '4d47598d7181' -down_revision = '4ab08bf12666' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.create_table('correlation_property_cache', - sa.Column('id', sa.Integer(), nullable=False), - sa.Column('name', sa.String(length=50), nullable=False), - sa.Column('message_name', sa.String(length=50), nullable=False), - sa.Column('process_model_id', sa.String(length=255), nullable=False), - sa.Column('retrieval_expression', sa.String(length=255), nullable=True), - sa.PrimaryKeyConstraint('id') - ) - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.drop_table('correlation_property_cache') - # ### end Alembic commands ### diff --git a/spiffworkflow-backend/migrations/versions/4ab08bf12666_.py b/spiffworkflow-backend/migrations/versions/9f0b1662a8af_.py similarity index 75% rename from spiffworkflow-backend/migrations/versions/4ab08bf12666_.py rename to spiffworkflow-backend/migrations/versions/9f0b1662a8af_.py index fcc57315..6f3270f3 100644 --- a/spiffworkflow-backend/migrations/versions/4ab08bf12666_.py +++ b/spiffworkflow-backend/migrations/versions/9f0b1662a8af_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 4ab08bf12666 -Revises: b581ff2351ad -Create Date: 2023-02-23 07:25:54.135765 +Revision ID: 9f0b1662a8af +Revises: 63fc8d693b9f +Create Date: 2023-02-24 14:30:05.970959 """ from alembic import op @@ -10,19 +10,26 @@ import sqlalchemy as sa from sqlalchemy.dialects import mysql # revision identifiers, used by Alembic. -revision = '4ab08bf12666' -down_revision = 'b581ff2351ad' +revision = '9f0b1662a8af' +down_revision = '63fc8d693b9f' branch_labels = None depends_on = None def upgrade(): # ### commands auto generated by Alembic - please adjust! ### - op.create_table('message_instance_correlation', + op.create_table('correlation_property_cache', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('name', sa.String(length=50), nullable=False), + sa.Column('message_name', sa.String(length=50), nullable=False), + sa.Column('process_model_id', sa.String(length=255), nullable=False), + sa.Column('retrieval_expression', sa.String(length=255), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + op.create_table('message_instance_correlation_rule', sa.Column('id', sa.Integer(), nullable=False), sa.Column('message_instance_id', sa.Integer(), nullable=False), sa.Column('name', sa.String(length=50), nullable=False), - sa.Column('expected_value', sa.String(length=255), nullable=True), sa.Column('retrieval_expression', sa.String(length=255), nullable=True), sa.Column('updated_at_in_seconds', sa.Integer(), nullable=True), sa.Column('created_at_in_seconds', sa.Integer(), nullable=True), @@ -30,26 +37,29 @@ def upgrade(): sa.PrimaryKeyConstraint('id'), sa.UniqueConstraint('message_instance_id', 'name', name='message_instance_id_name_unique') ) - op.create_index(op.f('ix_message_instance_correlation_expected_value'), 'message_instance_correlation', ['expected_value'], unique=False) - op.create_index(op.f('ix_message_instance_correlation_message_instance_id'), 'message_instance_correlation', ['message_instance_id'], unique=False) - - op.drop_index('ix_message_correlation_property_identifier', table_name='message_correlation_property') - op.drop_index('message_correlation_property_unique', table_name='message_correlation_property') - op.drop_constraint('message_correlation_ibfk_1', 'message_correlation', type_='foreignkey') - op.drop_table('message_correlation_property') - op.drop_index('ix_message_correlation_message_correlation_property_id', table_name='message_correlation') + op.create_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), 'message_instance_correlation_rule', ['message_instance_id'], unique=False) + op.drop_index('ix_message_model_identifier', table_name='message_model') + op.drop_index('ix_message_model_name', table_name='message_model') + op.drop_constraint('message_correlation_property_ibfk_1', 'message_correlation_property', type_='foreignkey') + op.drop_constraint('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', type_='foreignkey') + op.drop_constraint('message_instance_ibfk_1', 'message_instance', type_='foreignkey') + op.drop_table('message_model') + op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_='foreignkey') op.drop_index('ix_message_correlation_name', table_name='message_correlation') op.drop_index('ix_message_correlation_process_instance_id', table_name='message_correlation') op.drop_index('ix_message_correlation_value', table_name='message_correlation') - op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_='foreignkey') - op.drop_constraint('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', type_='foreignkey') +# op.drop_index('message_instance_id_name_unique', table_name='message_correlation') +# op.drop_index('ix_message_correlation_message_correlation_property_id', table_name='message_correlation') op.drop_table('message_correlation') - op.drop_index('ix_message_model_identifier', table_name='message_model') - op.drop_index('ix_message_model_name', table_name='message_model') - op.drop_constraint('message_instance_ibfk_1', 'message_instance', type_='foreignkey') - op.drop_table('message_model') + op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance') + op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance') +# op.drop_index('message_correlation_message_instance_unique', table_name='message_correlation_message_instance') op.drop_table('message_correlation_message_instance') + op.drop_index('ix_message_correlation_property_identifier', table_name='message_correlation_property') + op.drop_index('message_correlation_property_unique', table_name='message_correlation_property') + op.drop_table('message_correlation_property') op.add_column('message_instance', sa.Column('name', sa.String(length=255), nullable=True)) + op.add_column('message_instance', sa.Column('correlation_keys', sa.JSON(), nullable=True)) op.add_column('message_instance', sa.Column('user_id', sa.Integer(), nullable=False)) op.add_column('message_instance', sa.Column('counterpart_id', sa.Integer(), nullable=True)) op.alter_column('message_instance', 'process_instance_id', @@ -66,9 +76,8 @@ def upgrade(): def downgrade(): # ### commands auto generated by Alembic - please adjust! ### op.add_column('message_triggerable_process_model', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False)) - op.create_foreign_key('message_triggerable_process_model_ibfk_1', 'message_triggerable_process_model', 'message_model', ['message_model_id'], ['id']) op.create_index('message_model_id', 'message_triggerable_process_model', ['message_model_id'], unique=False) - op.drop_column('message_triggerable_process_model', 'name') + op.drop_column('message_triggerable_process_model', 'message_name') op.add_column('message_instance', sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False)) op.drop_constraint(None, 'message_instance', type_='foreignkey') op.create_foreign_key('message_instance_ibfk_1', 'message_instance', 'message_model', ['message_model_id'], ['id']) @@ -77,28 +86,36 @@ def downgrade(): nullable=False) op.drop_column('message_instance', 'counterpart_id') op.drop_column('message_instance', 'user_id') + op.drop_column('message_instance', 'correlation_keys') op.drop_column('message_instance', 'name') - op.create_table('message_correlation_message_instance', - sa.Column('message_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False), - sa.Column('message_correlation_id', mysql.INTEGER(), autoincrement=False, nullable=False), - sa.ForeignKeyConstraint(['message_correlation_id'], ['message_correlation.id'], name='message_correlation_message_instance_ibfk_1'), - sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], name='message_correlation_message_instance_ibfk_2'), - sa.PrimaryKeyConstraint('message_instance_id', 'message_correlation_id'), - mysql_collate='utf8mb4_0900_ai_ci', - mysql_default_charset='utf8mb4', - mysql_engine='InnoDB' - ) - op.create_table('message_model', + op.create_table('message_correlation_property', sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True), - sa.Column('name', mysql.VARCHAR(length=50), nullable=True), + sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False), + sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True), + sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True), + sa.ForeignKeyConstraint(['message_model_id'], ['message_model.id'], name='message_correlation_property_ibfk_1'), sa.PrimaryKeyConstraint('id'), mysql_collate='utf8mb4_0900_ai_ci', mysql_default_charset='utf8mb4', mysql_engine='InnoDB' ) - op.create_index('ix_message_model_name', 'message_model', ['name'], unique=False) - op.create_index('ix_message_model_identifier', 'message_model', ['identifier'], unique=False) + op.create_index('message_correlation_property_unique', 'message_correlation_property', ['identifier', 'message_model_id'], unique=False) + op.create_index('ix_message_correlation_property_identifier', 'message_correlation_property', ['identifier'], unique=False) + op.create_table('message_correlation_message_instance', + sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), + sa.Column('message_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False), + sa.Column('message_correlation_id', mysql.INTEGER(), autoincrement=False, nullable=False), + sa.ForeignKeyConstraint(['message_correlation_id'], ['message_correlation.id'], name='message_correlation_message_instance_ibfk_1'), + sa.ForeignKeyConstraint(['message_instance_id'], ['message_instance.id'], name='message_correlation_message_instance_ibfk_2'), + sa.PrimaryKeyConstraint('id'), + mysql_collate='utf8mb4_0900_ai_ci', + mysql_default_charset='utf8mb4', + mysql_engine='InnoDB' + ) + op.create_index('message_correlation_message_instance_unique', 'message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'], unique=False) + op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False) + op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False) op.create_table('message_correlation', sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), sa.Column('process_instance_id', mysql.INTEGER(), autoincrement=False, nullable=False), @@ -119,21 +136,18 @@ def downgrade(): op.create_index('ix_message_correlation_process_instance_id', 'message_correlation', ['process_instance_id'], unique=False) op.create_index('ix_message_correlation_name', 'message_correlation', ['name'], unique=False) op.create_index('ix_message_correlation_message_correlation_property_id', 'message_correlation', ['message_correlation_property_id'], unique=False) - op.create_table('message_correlation_property', + op.create_table('message_model', sa.Column('id', mysql.INTEGER(), autoincrement=True, nullable=False), sa.Column('identifier', mysql.VARCHAR(length=50), nullable=True), - sa.Column('message_model_id', mysql.INTEGER(), autoincrement=False, nullable=False), - sa.Column('updated_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True), - sa.Column('created_at_in_seconds', mysql.INTEGER(), autoincrement=False, nullable=True), - sa.ForeignKeyConstraint(['message_model_id'], ['message_model.id'], name='message_correlation_property_ibfk_1'), + sa.Column('name', mysql.VARCHAR(length=50), nullable=True), sa.PrimaryKeyConstraint('id'), mysql_collate='utf8mb4_0900_ai_ci', mysql_default_charset='utf8mb4', mysql_engine='InnoDB' ) - op.create_index('message_correlation_property_unique', 'message_correlation_property', ['identifier', 'message_model_id'], unique=False) - op.create_index('ix_message_correlation_property_identifier', 'message_correlation_property', ['identifier'], unique=False) - op.drop_index(op.f('ix_message_instance_correlation_message_instance_id'), table_name='message_instance_correlation') - op.drop_index(op.f('ix_message_instance_correlation_expected_value'), table_name='message_instance_correlation') - op.drop_table('message_instance_correlation') + op.create_index('ix_message_model_name', 'message_model', ['name'], unique=False) + op.create_index('ix_message_model_identifier', 'message_model', ['identifier'], unique=False) + op.drop_index(op.f('ix_message_instance_correlation_rule_message_instance_id'), table_name='message_instance_correlation_rule') + op.drop_table('message_instance_correlation_rule') + op.drop_table('correlation_property_cache') # ### end Alembic commands ### diff --git a/spiffworkflow-backend/migrations/versions/ac40af4ddef3_.py b/spiffworkflow-backend/migrations/versions/ac40af4ddef3_.py deleted file mode 100644 index c66b35b8..00000000 --- a/spiffworkflow-backend/migrations/versions/ac40af4ddef3_.py +++ /dev/null @@ -1,33 +0,0 @@ -"""empty message - -Revision ID: ac40af4ddef3 -Revises: 63fc8d693b9f -Create Date: 2023-02-16 14:54:14.533029 - -""" -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import mysql - -# revision identifiers, used by Alembic. -revision = 'ac40af4ddef3' -down_revision = '63fc8d693b9f' -branch_labels = None -depends_on = None - - -def upgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.drop_index('message_correlation_message_instance_unique', table_name='message_correlation_message_instance') - op.drop_column('message_correlation_message_instance', 'id') - op.create_primary_key('mcmi_pirmary_key','message_correlation_message_instance', ['message_instance_id', 'message_correlation_id']) - # ### end Alembic commands ### - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.add_column('message_correlation_message_instance', sa.Column('id', mysql.INTEGER(), autoincrement=False, nullable=False)) - op.create_index('message_correlation_message_instance_unique', 'message_correlation_message_instance', ['message_instance_id', 'message_correlation_id'], unique=False) - op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False) - op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False) - # ### end Alembic commands ### diff --git a/spiffworkflow-backend/migrations/versions/b581ff2351ad_.py b/spiffworkflow-backend/migrations/versions/b581ff2351ad_.py deleted file mode 100644 index 761edee3..00000000 --- a/spiffworkflow-backend/migrations/versions/b581ff2351ad_.py +++ /dev/null @@ -1,35 +0,0 @@ -"""empty message - -Revision ID: b581ff2351ad -Revises: ac40af4ddef3 -Create Date: 2023-02-19 11:19:24.371528 - -""" -from alembic import op -import sqlalchemy as sa - - -# revision identifiers, used by Alembic. -revision = 'b581ff2351ad' -down_revision = 'ac40af4ddef3' -branch_labels = None -depends_on = None - - -def upgrade(): - - # Just some table cleanup so the join table is properly built - op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_="foreignkey") - op.drop_constraint('message_correlation_message_instance_ibfk_2', 'message_correlation_message_instance', type_="foreignkey") - op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance') - op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance') - op.create_foreign_key(None, 'message_correlation_message_instance', 'message_correlation', ['message_correlation_id'], ['id']) - op.create_foreign_key(None, 'message_correlation_message_instance', 'message_instance', ['message_instance_id'], ['id']) - - -def downgrade(): - # ### commands auto generated by Alembic - please adjust! ### - op.drop_index(op.f('ix_message_correlation_property_correlation_key'), table_name='message_correlation_property') - op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False) - op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False) - # ### end Alembic commands ### diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py index 418bc7eb..95f687a3 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py @@ -19,7 +19,7 @@ from spiffworkflow_backend.models.user import UserModel if TYPE_CHECKING: from spiffworkflow_backend.models.message_instance_correlation import ( # noqa: F401 - MessageInstanceCorrelationModel, + MessageInstanceCorrelationRuleModel, ) @@ -49,7 +49,9 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel): process_instance_id: int = db.Column(ForeignKey(ProcessInstanceModel.id), nullable=True) # type: ignore name: str = db.Column(db.String(255)) message_type: str = db.Column(db.String(20), nullable=False) + # Only Send Messages have a payload payload: dict = db.Column(db.JSON) + correlation_keys: dict = db.Column(db.JSON) # The correlation keys of the process at the time the message was created. status: str = db.Column(db.String(20), nullable=False, default="ready") user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) # type: ignore user = relationship("UserModel") @@ -59,8 +61,8 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel): failure_cause: str = db.Column(db.Text()) updated_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer) - correlations = relationship( - "MessageInstanceCorrelationModel", back_populates="message_instance" + correlation_rules = relationship( + "MessageInstanceCorrelationRuleModel", back_populates="message_instance" ) @validates("message_type") @@ -74,40 +76,59 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel): return self.validate_enum_field(key, value, MessageStatuses) def correlates( - self, other_message_instance: Any, expression_engine: PythonScriptEngine + self, other: Any, expression_engine: PythonScriptEngine ) -> bool: - # This must be a receive message, and the other must be a send (otherwise we reverse the call) - # We evaluate the other messages payload and run our correlation's - # retrieval expressions against it, then compare it against our - # expected values -- IF we don't have an expected value, we accept - # any non-erroring result from the retrieval expression. - if self.name != other_message_instance.name: + """ Returns true if the this Message correlates with the given message. + + This must be a 'receive' message, and the other must be a 'send' or vice/versa. We evaluate + the other messages payload and run our correlation's retrieval expressions against it, then + compare it against our expected values (as stored in this messages' correlation_keys) + IF we don't have an expected value, we accept any non-error result from the retrieval + expression. """ + if self.is_send() and other.is_receive(): + # Flip the call. + return other.correlates(self, expression_engine) # type: ignore + + if self.name != other.name: return False - if self.message_type == MessageTypes.receive.value: - if other_message_instance.message_type != MessageTypes.send.value: - return False - payload = other_message_instance.payload - for corr in self.correlations: - try: - result = expression_engine._evaluate( - corr.retrieval_expression, payload - ) - except Exception: - # the failure of a payload evaluation may not mean that matches for these - # message instances can't happen with other messages. So don't error up. - # fixme: Perhaps log some sort of error. - return False - if corr.expected_value is None: - continue # We will accept any value - elif corr.expected_value != str( - result - ): # fixme: Don't require conversion to string - return False + if not self.is_receive(): + return False + if isinstance(self.correlation_keys, dict) and self.correlation_keys == other.correlation_keys: + # We know we have a match, and we can just return if we don't have to figure out the key return True - elif other_message_instance.message_type == MessageTypes.receive.value: - return other_message_instance.correlates(self, expression_engine) # type: ignore + + # Loop over the receives' correlation keys - if any of the keys fully match, then we match. + for expected_values in self.correlation_keys.values(): + if self.payload_matches_expected_values(other.payload, expected_values, expression_engine): + return True return False + def is_receive(self): + return self.message_type == MessageTypes.receive.value + + def is_send(self): + return self.message_type == MessageTypes.send.value + + def payload_matches_expected_values( + self, payload: dict, + expected_values: dict, + expression_engine: PythonScriptEngine) -> bool: + """Compares the payload of a 'send' message against a single correlation key's expected values.""" + for correlation_key in self.correlation_rules: + expected_value = expected_values.get(correlation_key.name, None) + if expected_value is None: # This key is not required for this instance to match. + continue + try: + result = expression_engine._evaluate(correlation_key.retrieval_expression, payload) + except Exception: + # the failure of a payload evaluation may not mean that matches for these + # message instances can't happen with other messages. So don't error up. + # fixme: Perhaps log some sort of error. + return False + if result != expected_value: + return False + return True + # This runs for ALL db flushes for ANY model, not just this one even if it's in the MessageInstanceModel class # so this may not be worth it or there may be a better way to do it diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py index 024d3bca..7431a273 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance_correlation.py @@ -10,16 +10,16 @@ from spiffworkflow_backend.models.message_instance import MessageInstanceModel @dataclass -class MessageInstanceCorrelationModel(SpiffworkflowBaseDBModel): +class MessageInstanceCorrelationRuleModel(SpiffworkflowBaseDBModel): """These are the correlations of a specific Message Instance. These will only exist on receive messages. It provides the expression to run on - a send messages payload which must match the expected value to be considered - a valid match. If the expected value is null, then it does not need to + a send messages payload which must match receive messages correlation_key dictionary + to be considered a valid match. If the expected value is null, then it does not need to match, but the expression should still evaluate and produce a result. """ - __tablename__ = "message_instance_correlation" + __tablename__ = "message_instance_correlation_rule" __table_args__ = ( db.UniqueConstraint( "message_instance_id", @@ -33,10 +33,9 @@ class MessageInstanceCorrelationModel(SpiffworkflowBaseDBModel): ForeignKey(MessageInstanceModel.id), nullable=False, index=True # type: ignore ) name: str = db.Column(db.String(50), nullable=False) - expected_value: str = db.Column(db.String(255), nullable=True, index=True) retrieval_expression: str = db.Column(db.String(255)) updated_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer) message_instance = relationship( - "MessageInstanceModel", back_populates="correlations" + "MessageInstanceModel", back_populates="correlation_rules" ) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py index 8821630c..1c86fddb 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py @@ -86,7 +86,6 @@ def message_send( name=message_name, payload=body["payload"], user_id=g.user.id, - correlations=[], ) db.session.add(message_instance) db.session.commit() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py index e335ab6f..56b3f547 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/error_handling_service.py @@ -94,8 +94,7 @@ class ErrorHandlingService: message_type="send", name=message_name, payload=message_payload, - user_id=g.user.id, - correlations=[], + user_id=g.user.id ) db.session.add(message_instance) db.session.commit() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 39ee5968..51f6ba81 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -62,7 +62,7 @@ from spiffworkflow_backend.models.human_task import HumanTaskModel from spiffworkflow_backend.models.human_task_user import HumanTaskUserModel from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance_correlation import ( - MessageInstanceCorrelationModel, + MessageInstanceCorrelationRuleModel ) from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.process_instance import ProcessInstanceStatus @@ -1352,7 +1352,7 @@ class ProcessInstanceProcessor: message_type="send", name=bpmn_message.name, payload=bpmn_message.payload, - correlations=[], + correlation_keys=self.bpmn_process_instance.correlations ) db.session.add(message_instance) db.session.commit() @@ -1382,15 +1382,15 @@ class ProcessInstanceProcessor: user_id=self.process_instance_model.process_initiator_id, message_type="receive", name=event["name"], + correlation_keys=self.bpmn_process_instance.correlations ) for correlation_property in event["value"]: - message_correlation = MessageInstanceCorrelationModel( + message_correlation = MessageInstanceCorrelationRuleModel( message_instance_id=message_instance.id, name=correlation_property.name, - expected_value=correlation_property.expected_value, retrieval_expression=correlation_property.retrieval_expression, ) - message_instance.correlations.append(message_correlation) + message_instance.correlation_rules.append(message_correlation) db.session.add(message_instance) db.session.commit() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py index a216cd28..b48bc239 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_instance.py @@ -54,7 +54,7 @@ class TestMessageInstance(BaseTest): user_id=process_instance.process_initiator_id, message_type="send", name=message_name, - payload={"Word": "Eat At Mashita's, delicious!"}, + payload={"Word": "Eat At Mashita's, its delicious!"}, ) db.session.add(queued_message) db.session.commit() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py index d262931a..986951e9 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/unit/test_message_service.py @@ -43,7 +43,7 @@ class TestMessageService(BaseTest): "amount": "100.00", } - self.start_sender_process(client, with_super_admin_user) + self.start_sender_process(client, with_super_admin_user, "test_from_api") self.assure_a_message_was_sent() self.assure_there_is_a_process_waiting_on_a_message() @@ -105,7 +105,7 @@ class TestMessageService(BaseTest): ) # Now start the main process - self.start_sender_process(client, with_super_admin_user) + self.start_sender_process(client, with_super_admin_user, "test_between_processes") self.assure_a_message_was_sent() # This is typically called in a background cron process, so we will manually call it @@ -145,9 +145,9 @@ class TestMessageService(BaseTest): assert message_receiver_process.status == "complete" def start_sender_process( - self, client: FlaskClient, with_super_admin_user: UserModel + self, client: FlaskClient, with_super_admin_user: UserModel, group_name:str = "test_group" ) -> None: - process_group_id = "test_group" + process_group_id = group_name self.create_process_group( client, with_super_admin_user, process_group_id, process_group_id ) @@ -186,7 +186,7 @@ class TestMessageService(BaseTest): assert len(send_messages) == 1 send_message = send_messages[0] assert ( - send_message.payload == self.payload + send_message.payload == self.payload ), "The send message should match up with the payload" assert send_message.name == "Request Approval" assert send_message.status == "ready" @@ -207,12 +207,10 @@ class TestMessageService(BaseTest): self, message: MessageInstanceModel ) -> None: # Correlation Properties should match up - po_curr = next(c for c in message.correlations if c.name == "po_number") - customer_curr = next(c for c in message.correlations if c.name == "customer_id") + po_curr = next(c for c in message.correlation_rules if c.name == "po_number") + customer_curr = next(c for c in message.correlation_rules if c.name == "customer_id") assert po_curr is not None assert customer_curr is not None - assert po_curr.expected_value == "1001" - assert customer_curr.expected_value == "Sartography" def test_can_send_message_to_multiple_process_models( self,