From 5f6a61c93fd2c7989eee9c81efc8afd5d066b580 Mon Sep 17 00:00:00 2001 From: Dan Date: Mon, 20 Feb 2023 11:50:24 -0500 Subject: [PATCH] * SpiffWorkflow event_definitions wanted to return a message event's correlation properties mested within correlation keys. But messages are directly related to properties, not to keys - and it forced a number of conversions that made for tricky code. So Messages now contain a dictionary of correlation properties only. * SpiffWorkflow did not serialize correlations - so they were lost between save and retrieve. * When comparing Correlation Property values - we are storing these values as strings in the database and can't convert them back to integers later, so I'm changing everying everywhere to compare after conversion to a string. Don't feel great about this one. * By using an SQL Alchemy join table, there is a lot of db queries we don't need to write. * A few handy fucntions on db models to make it easier to work with correlations. * Updated tests because I changed some of the BPMN models we were testing against. * Database migration to use the new constraint names with the alternate form of the join table between correlation mesages to instance messages. --- .../migrations/versions/b581ff2351ad_.py | 35 +++++++ .../message_correlation_message_instance.py | 4 +- .../models/message_correlation_property.py | 2 + .../models/message_instance.py | 20 ++-- .../models/message_model.py | 7 ++ .../routes/messages_controller.py | 23 +---- .../services/message_service.py | 3 +- .../services/process_instance_processor.py | 60 ++++-------- .../integration/test_process_api.py | 92 ++++++++++++------- 9 files changed, 138 insertions(+), 108 deletions(-) create mode 100644 spiffworkflow-backend/migrations/versions/b581ff2351ad_.py diff --git a/spiffworkflow-backend/migrations/versions/b581ff2351ad_.py b/spiffworkflow-backend/migrations/versions/b581ff2351ad_.py new file mode 100644 index 00000000..761edee3 --- /dev/null +++ b/spiffworkflow-backend/migrations/versions/b581ff2351ad_.py @@ -0,0 +1,35 @@ +"""empty message + +Revision ID: b581ff2351ad +Revises: ac40af4ddef3 +Create Date: 2023-02-19 11:19:24.371528 + +""" +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = 'b581ff2351ad' +down_revision = 'ac40af4ddef3' +branch_labels = None +depends_on = None + + +def upgrade(): + + # Just some table cleanup so the join table is properly built + op.drop_constraint('message_correlation_message_instance_ibfk_1', 'message_correlation_message_instance', type_="foreignkey") + op.drop_constraint('message_correlation_message_instance_ibfk_2', 'message_correlation_message_instance', type_="foreignkey") + op.drop_index('ix_message_correlation_message_instance_message_correlation_id', table_name='message_correlation_message_instance') + op.drop_index('ix_message_correlation_message_instance_message_instance_id', table_name='message_correlation_message_instance') + op.create_foreign_key(None, 'message_correlation_message_instance', 'message_correlation', ['message_correlation_id'], ['id']) + op.create_foreign_key(None, 'message_correlation_message_instance', 'message_instance', ['message_instance_id'], ['id']) + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.drop_index(op.f('ix_message_correlation_property_correlation_key'), table_name='message_correlation_property') + op.create_index('ix_message_correlation_message_instance_message_instance_id', 'message_correlation_message_instance', ['message_instance_id'], unique=False) + op.create_index('ix_message_correlation_message_instance_message_correlation_id', 'message_correlation_message_instance', ['message_correlation_id'], unique=False) + # ### end Alembic commands ### diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_correlation_message_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_correlation_message_instance.py index c61dc7cf..d31e4845 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_correlation_message_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_correlation_message_instance.py @@ -9,5 +9,5 @@ message_correlation_message_instance_table \ db.Column('message_instance_id', ForeignKey('message_instance.id'), primary_key=True), db.Column('message_correlation_id', - ForeignKey('message_correlation.id'),primary_key=True) - ) + ForeignKey('message_correlation.id'),primary_key=True), + ) \ No newline at end of file diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_correlation_property.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_correlation_property.py index 1e09dc0c..d2508cc4 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_correlation_property.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_correlation_property.py @@ -23,3 +23,5 @@ class MessageCorrelationPropertyModel(SpiffworkflowBaseDBModel): message_model_id = db.Column(ForeignKey(MessageModel.id), nullable=False) updated_at_in_seconds: int = db.Column(db.Integer) created_at_in_seconds: int = db.Column(db.Integer) + message_model = db.relationship("MessageModel", + backref="correlation_properties") diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py index d0d24550..e8e7b312 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_instance.py @@ -68,19 +68,23 @@ class MessageInstanceModel(SpiffworkflowBaseDBModel): """Validate_status.""" return self.validate_enum_field(key, value, MessageStatuses) + def correlation_dictionary(self): + correlation_dict = {} + for c in self.message_correlations: + correlation_dict[c.name]=c.value + return correlation_dict + def correlates(self, other_message_instance: Self) -> bool: if other_message_instance.message_model_id != self.message_model_id: return False - correlation_dict = {} - for c in other_message_instance.message_correlations: - correlation_dict[c.name]=c.value - return self.correlates_with_dictionary(correlation_dict) + return self.correlates_with_dictionary(other_message_instance.correlation_dictionary()) - def correlates_with_dictionary(self, correlation_dictionary: dict) -> bool: - """Returns true if the given dictionary matches the correlation names and values connected to this message instance""" + def correlates_with_dictionary(self, dict: dict) -> bool: + """Returns true if the given dictionary matches the correlation + names and values connected to this message instance""" for c in self.message_correlations: - # Fixme: Maybe we should look at typing the correlations and not forcing them to strings? - if c.name in correlation_dictionary and str(correlation_dictionary[c.name]) == c.value: + # Fixme: Maybe we should look at typing the correlations and not forcing them to strings? + if c.name in dict and str(dict[c.name]) == c.value: continue else: return False diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_model.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_model.py index 8ebd15c5..e5528033 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/models/message_model.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/message_model.py @@ -11,3 +11,10 @@ class MessageModel(SpiffworkflowBaseDBModel): id = db.Column(db.Integer, primary_key=True) identifier = db.Column(db.String(50), unique=True, index=True) name = db.Column(db.String(50), unique=True, index=True) + # correlation_properties is a backref and defined in the MessageCorrelationProperties class. + + def get_correlation_property(self, identifier): + for corr_prop in self.correlation_properties: + if corr_prop.identifier == identifier: + return corr_prop; + return None \ No newline at end of file diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py index 864a763c..5f82765c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py @@ -56,26 +56,6 @@ def message_instance_list( .paginate(page=page, per_page=per_page, error_out=False) ) - for message_instance in message_instances: - message_correlations: dict = {} - for ( - mcmi - ) in ( - message_instance.MessageInstanceModel.message_correlations_message_instances - ): - mc = MessageCorrelationModel.query.filter_by( - id=mcmi.message_correlation_id - ).all() - for m in mc: - if m.name not in message_correlations: - message_correlations[m.name] = {} - message_correlations[m.name][ - m.message_correlation_property.identifier - ] = m.value - message_instance.MessageInstanceModel.message_correlations = ( - message_correlations - ) - response_json = { "results": message_instances.items, "pagination": { @@ -120,7 +100,6 @@ def message_send( # Is there a running instance that is waiting for this message? message_instances = MessageInstanceModel.query.filter_by(message_model_id=message_model.id).all() - correlations = MessageCorrelationPropertyModel.query.filter_by(message_model_id=message_model.id).all() # do any waiting message instances have matching correlations? matching_message = None @@ -133,7 +112,7 @@ def message_send( process_instance = ProcessInstanceModel.query.filter_by(id = matching_message.process_instance_id).first() if matching_message and process_instance and process_instance.status != ProcessInstanceStatus.waiting.value: - ApiError( + raise ApiError( error_code="message_not_accepted", message=( f"The process that can accept message '{message_identifier}' with the given correlation keys" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py index 8277f584..b37013b3 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py @@ -149,11 +149,12 @@ class MessageService: ) ) + processor_receive = ProcessInstanceProcessor(process_instance_receive) processor_receive.bpmn_process_instance.catch_bpmn_message( message_model_name, message_payload, - correlations={}, + correlations=message_instance_receive.correlation_dictionary(), ) processor_receive.do_engine_steps(save=True) message_instance_receive.status = MessageStatuses.completed.value diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py index 6b2d091c..43e46680 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_processor.py @@ -1338,10 +1338,9 @@ class ProcessInstanceProcessor: db.session.add(self.process_instance_model) db.session.commit() - # messages have one correlation key (possibly wrong) - # correlation keys may have many correlation properties def process_bpmn_messages(self) -> None: """Process_bpmn_messages.""" + bpmn_messages = self.bpmn_process_instance.get_bpmn_messages() for bpmn_message in bpmn_messages: # only message sends are in get_bpmn_messages @@ -1352,6 +1351,7 @@ class ProcessInstanceProcessor: f"Invalid message name: {bpmn_message.name}.", ) + if not bpmn_message.correlations: raise ApiError( "message_correlations_missing", @@ -1362,54 +1362,28 @@ class ProcessInstanceProcessor: ) message_correlations = [] - for ( - message_correlation_key, - message_correlation_properties, - ) in bpmn_message.correlations.items(): - for ( - message_correlation_property_identifier, - message_correlation_property_value, - ) in message_correlation_properties.items(): - message_correlation_property = ( - MessageCorrelationPropertyModel.query.filter_by( - identifier=message_correlation_property_identifier, - ).first() - ) - if message_correlation_property is None: - raise ApiError( - "message_correlations_missing_from_process", - ( - "Could not find a known message correlation with" - f" identifier:{message_correlation_property_identifier}" - ), - ) - message_correlations.append( - { - "message_correlation_property": ( - message_correlation_property - ), - "name": message_correlation_property_identifier, - "value": message_correlation_property_value, - } + for (name, value) in bpmn_message.correlations.items(): + message_correlation_property = message_model.get_correlation_property(name) + if message_correlation_property is None: + raise ApiError( + "message_correlations_missing_from_process", + ( + "Could not find a known message correlation with" + f" identifier:{name}" + ), ) + message_correlations.append(MessageCorrelationModel( + process_instance_id=self.process_instance_model.id, + message_correlation_property_id=message_correlation_property.id, + name=name, + value=value)) message_instance = MessageInstanceModel( process_instance_id=self.process_instance_model.id, message_type="send", message_model_id=message_model.id, payload=bpmn_message.payload, + message_correlations=message_correlations ) - - correlation_models = [] - for message_correlation in message_correlations: - correlation_models.append(MessageCorrelationModel( - process_instance_id=self.process_instance_model.id, - message_correlation_property_id=message_correlation[ - "message_correlation_property" - ].id, - name=message_correlation["name"], - value=message_correlation["value"], - )) - message_instance.message_correlations = correlation_models db.session.add(message_instance) db.session.commit() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index a72796e0..f3c2e0d0 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -1347,11 +1347,12 @@ class TestProcessApi(BaseTest): bpmn_file_location=bpmn_file_location, ) - message_model_identifier = "message_send" + message_model_identifier = "request_approval" payload = { - "topica": "the_topica_string", - "topicb": "the_topicb_string", - "andThis": "another_item_non_key", + "customer_id": "sartography", + "po_number": "1001", + "amount": "One Billion Dollars! Mwhahahahahaha", + "description": "But seriously." } response = client.post( f"/v1.0/messages/{message_model_identifier}", @@ -1372,7 +1373,7 @@ class TestProcessApi(BaseTest): processor = ProcessInstanceProcessor(process_instance) process_instance_data = processor.get_data() assert process_instance_data - assert process_instance_data["the_payload"] == payload + assert process_instance_data["invoice"] == payload def test_message_send_when_providing_message_to_running_process_instance( self, @@ -1395,13 +1396,12 @@ class TestProcessApi(BaseTest): bpmn_file_location=bpmn_file_location, ) - message_model_identifier = "message_response" + message_model_identifier = "approval_result" payload = { - "the_payload": { - "topica": "the_payload.topica_string", - "topicb": "the_payload.topicb_string", - "andThis": "another_item_non_key", - } + "customer_id": "sartography", + "po_number": "1001", + "amount": "One Billion Dollars! Mwhahahahahaha", + "description": "Ya!, a-ok bud!" } response = self.create_process_instance_from_process_model_id_with_api( client, @@ -1415,9 +1415,26 @@ class TestProcessApi(BaseTest): f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/run", headers=self.logged_in_headers(with_super_admin_user), ) - assert response.json is not None + process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() + processor = ProcessInstanceProcessor(process_instance) + processor.do_engine_steps(save=True) + task = processor.get_all_user_tasks()[0] + human_task = process_instance.active_human_tasks[0] + spiff_task = processor.__class__.get_task_by_bpmn_identifier( + human_task.task_name, processor.bpmn_process_instance + ) + + ProcessInstanceService.complete_form_task( + processor, + task, + payload, + with_super_admin_user, + human_task, + ) + processor.save() + response = client.post( f"/v1.0/messages/{message_model_identifier}", content_type="application/json", @@ -1462,14 +1479,14 @@ class TestProcessApi(BaseTest): bpmn_file_location=bpmn_file_location, ) - message_model_identifier = "message_response" + message_model_identifier = "approval_result" payload = { - "the_payload": { - "topica": "the_payload.topica_string", - "topicb": "the_payload.topicb_string", - "andThis": "another_item_non_key", - } + "customer_id": "sartography", + "po_number": "1001", + "amount": "One Billion Dollars! Mwhahahahahaha", + "description": "But seriously." } + response = self.create_process_instance_from_process_model_id_with_api( client, process_model_identifier, @@ -1478,20 +1495,29 @@ class TestProcessApi(BaseTest): assert response.json is not None process_instance_id = response.json["id"] - response = client.post( - f"/v1.0/process-instances/{self.modify_process_identifier_for_path_param(process_model_identifier)}/{process_instance_id}/run", - headers=self.logged_in_headers(with_super_admin_user), - ) - - assert response.status_code == 200 - assert response.json is not None process_instance = ProcessInstanceModel.query.filter_by( id=process_instance_id ).first() processor = ProcessInstanceProcessor(process_instance) + processor.do_engine_steps(save=True) + task = processor.get_all_user_tasks()[0] + human_task = process_instance.active_human_tasks[0] + spiff_task = processor.__class__.get_task_by_bpmn_identifier( + human_task.task_name, processor.bpmn_process_instance + ) + + ProcessInstanceService.complete_form_task( + processor, + task, + payload, + with_super_admin_user, + human_task, + ) + processor.save() processor.suspend() + payload['description'] = "Message To Suspended" response = client.post( f"/v1.0/messages/{message_model_identifier}", content_type="application/json", @@ -1502,15 +1528,16 @@ class TestProcessApi(BaseTest): ) assert response.status_code == 400 assert response.json - assert response.json["error_code"] == "process_instance_is_suspended" + assert response.json["error_code"] == "message_not_accepted" processor.resume() + payload['description'] = "Message To Resumed" response = client.post( f"/v1.0/messages/{message_model_identifier}", content_type="application/json", headers=self.logged_in_headers(with_super_admin_user), data=json.dumps( - {"payload": payload, "process_instance_id": process_instance_id} + {"payload": payload} ), ) assert response.status_code == 200 @@ -1538,7 +1565,7 @@ class TestProcessApi(BaseTest): ) assert response.status_code == 400 assert response.json - assert response.json["error_code"] == "process_instance_is_terminated" + assert response.json["error_code"] == "message_not_accepted" def test_process_instance_can_be_terminated( self, @@ -2293,11 +2320,12 @@ class TestProcessApi(BaseTest): # process_model_source_directory="message_send_one_conversation", # bpmn_file_name="message_receiver", # ) - message_model_identifier = "message_send" + message_model_identifier = "request_approval" payload = { - "topica": "the_topica_string", - "topicb": "the_topicb_string", - "andThis": "another_item_non_key", + "customer_id": "sartography", + "po_number": "1001", + "amount": "One Billion Dollars! Mwhahahahahaha", + "description": "But seriously." } response = client.post( f"/v1.0/messages/{message_model_identifier}",