From 86b31d20cb9120a7f07b68128a543120536ad198 Mon Sep 17 00:00:00 2001
From: Dan Funk
Date: Mon, 11 Mar 2024 14:24:31 -0400
Subject: [PATCH] Messages as MicroServices (#1185)

* Purpose: Allow the messages api endpoint to accept arbitrary JSON in its body - and to respond with the latest task_data - this will allow you to create BPMN processes that can function as microservices.
* Fixing a bug that prevented the synchronous execution of a message post.
* A message post directly uses the body of the post; do not specify it within a "payload".
* The message response is no longer just the process instance details; it now contains the "process_instance" and "task_data" at the top level of the returned json.
* The data from the last completed task is now returned as part of the response from the message api endpoint, under the "task_data" attribute.
* CodeRabbit suggestions
* run_pyl fixes
* fix lint

---------

Co-authored-by: burnettk
---
 .../models/process_instance.py | 13 ++++++
 .../routes/messages_controller.py | 17 +++----
 .../services/message_service.py | 23 +++++-----
 .../integration/test_messages.py | 11 +++--
 .../integration/test_process_api.py | 44 ++++++++++---------
 5 files changed, 62 insertions(+), 46 deletions(-)

diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py
index 988bae10..57e08d58 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/models/process_instance.py
@@ -7,6 +7,7 @@
 from flask_sqlalchemy.query import Query
 from marshmallow import INCLUDE
 from marshmallow import Schema
 from sqlalchemy import ForeignKey
+from sqlalchemy import desc
 from sqlalchemy.orm import relationship
 from sqlalchemy.orm import validates
@@ -198,6 +199,18 @@
     def immediately_runnable_statuses(cls) -> list[str]:
         return ["not_started", "running"]
 
+    def get_data(self) -> dict:
+        """Returns the data of the last completed task in this process instance."""
+        last_completed_task = (
+            TaskModel.query.filter_by(process_instance_id=self.id, state="COMPLETED")
+            .order_by(desc(TaskModel.end_in_seconds))  # type: ignore
+            .first()
+        )
+        if last_completed_task:  # pragma: no cover
+            return last_completed_task.json_data()  # type: ignore
+        else:
+            return {}
+
 
 class ProcessInstanceModelSchema(Schema):
     class Meta:
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py
index 1ef1991c..e867e26e 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py
@@ -67,22 +67,13 @@ def message_send(
     body: dict[str, Any],
     execution_mode: str | None = None,
 ) -> flask.wrappers.Response:
-    if "payload" not in body:
-        raise (
-            ApiError(
-                error_code="missing_payload",
-                message="Please include a 'payload' in the JSON body that contains the message contents.",
-                status_code=400,
-            )
-        )
-
     process_instance = None
 
     # Create the send message
     message_instance = MessageInstanceModel(
         message_type="send",
         name=message_name,
-        payload=body["payload"],
+        payload=body,
         user_id=g.user.id,
     )
     db.session.add(message_instance)
@@ -110,8 +101,12 @@
     )
 
     process_instance = ProcessInstanceModel.query.filter_by(id=receiver_message.process_instance_id).first()
+
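# Illustrative sketch (not part of the diff): what the new get_data() method
# effectively does -- look up the most recently completed task for a process
# instance and hand back its stored JSON data, or {} if nothing has completed
# yet. Identifiers mirror the diff above (TaskModel, end_in_seconds,
# json_data); the function name is made up.
from sqlalchemy import desc


def data_of_last_completed_task(process_instance_id: int) -> dict:
    last_completed_task = (
        TaskModel.query.filter_by(process_instance_id=process_instance_id, state="COMPLETED")
        .order_by(desc(TaskModel.end_in_seconds))  # newest completion first
        .first()
    )
    # json_data() deserializes the task's stored data blob
    return last_completed_task.json_data() if last_completed_task else {}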
response_json = { + "task_data": process_instance.get_data(), + "process_instance": ProcessInstanceModelSchema().dump(process_instance), + } return Response( - json.dumps(ProcessInstanceModelSchema().dump(process_instance)), + json.dumps(response_json), status=200, mimetype="application/json", ) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py index f5117a20..f0219e06 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py @@ -90,9 +90,7 @@ class MessageService: message_instance_receive.status = "running" cls.process_message_receive( - receiving_process, - message_instance_receive, - message_instance_send, + receiving_process, message_instance_receive, message_instance_send, execution_mode=execution_mode ) message_instance_receive.status = "completed" message_instance_receive.counterpart_id = message_instance_send.id @@ -146,13 +144,7 @@ class MessageService: cls._cancel_non_matching_start_events(processor_receive, message_triggerable_process_model) processor_receive.save() - if not queue_process_instance_if_appropriate( - process_instance_receive, execution_mode=execution_mode - ) and not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance_receive): - execution_strategy_name = None - if execution_mode == ProcessInstanceExecutionMode.synchronous.value: - execution_strategy_name = "greedy" - processor_receive.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name) + processor_receive.do_engine_steps(save=True) return process_instance_receive @@ -195,6 +187,7 @@ class MessageService: process_instance_receive: ProcessInstanceModel, message_instance_receive: MessageInstanceModel, message_instance_send: MessageInstanceModel, + execution_mode: str | None = None, ) -> None: correlation_properties = [] for cr in message_instance_receive.correlation_rules: @@ -216,7 +209,15 @@ class MessageService: ) processor_receive = ProcessInstanceProcessor(process_instance_receive) processor_receive.bpmn_process_instance.send_event(bpmn_event) - processor_receive.do_engine_steps(save=True) + execution_strategy_name = None + + if not queue_process_instance_if_appropriate( + process_instance_receive, execution_mode=execution_mode + ) and not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance_receive): + execution_strategy_name = None + if execution_mode == ProcessInstanceExecutionMode.synchronous.value: + execution_strategy_name = "greedy" + processor_receive.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name) message_instance_receive.status = MessageStatuses.completed.value db.session.add(message_instance_receive) db.session.commit() diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_messages.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_messages.py index 4a88387e..50bc8f7c 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_messages.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_messages.py @@ -42,15 +42,20 @@ class TestMessages(BaseTest): with pytest.raises(ApiError): message_send( "Approval Result", - {"payload": {"po_number": 1001, "customer_id": "jon"}}, + {"po_number": 1001, "customer_id": "jon"}, ) # No error when calling with the correct parameters - message_send( + 
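# Illustrative sketch (not part of the diff): how the execution_mode that now
# flows into process_message_receive() picks an execution strategy. This is a
# simplification of the logic above; the helper name is made up.
def choose_execution_strategy(execution_mode: str | None) -> str | None:
    # A "synchronous" message post runs the receiving instance to completion
    # with the greedy strategy before the API responds; any other mode keeps
    # the default strategy (None), and the instance may be queued to run later.
    if execution_mode == "synchronous":
        return "greedy"
    return None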
response = message_send( "Approval Result", - {"payload": {"po_number": 1001, "customer_id": "Sartography"}}, + {"po_number": 1001, "customer_id": "Sartography"}, ) + # The response's task data should also match up with the correlation keys. + response_json = response.json + assert response_json["task_data"]["po_number"] == 1001 + assert response_json["task_data"]["customer_id"] == "Sartography" + # There is no longer a waiting message waiting_messages = ( MessageInstanceModel.query.filter_by(message_type="receive") diff --git a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py index ac3e6a1c..85efad6d 100644 --- a/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py +++ b/spiffworkflow-backend/tests/spiffworkflow_backend/integration/test_process_api.py @@ -550,7 +550,7 @@ class TestProcessApi(BaseTest): bpmn_file_name="simple_form", ) # When adding a process model with one Process, no decisions, and some json files, only one process is recorded. - assert len(ReferenceCacheModel.basic_query().all()) == 1 + assert ReferenceCacheModel.basic_query().count() == 1 self.create_group_and_model_with_bpmn( client=client, @@ -560,7 +560,7 @@ class TestProcessApi(BaseTest): bpmn_file_location="call_activity_nested", ) # When adding a process model with 4 processes and a decision, 5 new records will be in the Cache - assert len(ReferenceCacheModel.basic_query().all()) == 6 + assert ReferenceCacheModel.basic_query().count() == 6 # get the results response = client.get( @@ -592,7 +592,7 @@ class TestProcessApi(BaseTest): bpmn_file_name="simple_form", ) # When adding a process model with one Process, no decisions, and some json files, only one process is recorded. - assert len(ReferenceCacheModel.basic_query().all()) == 1 + assert ReferenceCacheModel.basic_query().count() == 1 self.create_group_and_model_with_bpmn( client=client, @@ -602,7 +602,7 @@ class TestProcessApi(BaseTest): bpmn_file_location="call_activity_nested", ) # When adding a process model with 4 processes and a decision, 5 new records will be in the Cache - assert len(ReferenceCacheModel.basic_query().all()) == 6 + assert ReferenceCacheModel.basic_query().count() == 6 user_one = self.create_user_with_permission(username="user_one", target_uri="/v1.0/process-groups/test_group_one:*") self.add_permissions_to_user(user=user_one, target_uri="/v1.0/processes", permission_names=["read"]) @@ -642,7 +642,7 @@ class TestProcessApi(BaseTest): bpmn_file_name="simple_form", ) # When adding a process model with one Process, no decisions, and some json files, only one process is recorded. 
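# Illustrative note (not part of the diff): the assertions below switch from
# len(Query.all()) to Query.count(). count() issues SELECT COUNT(*), so only an
# integer comes back from the database, instead of loading every cached
# reference row just to measure the list. The helper name is made up.
def cached_reference_count() -> int:
    # equivalent to len(ReferenceCacheModel.basic_query().all()), but counted
    # in the database rather than in Python
    return ReferenceCacheModel.basic_query().count()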
- assert len(ReferenceCacheModel.basic_query().all()) == 1 + assert ReferenceCacheModel.basic_query().count() == 1 # but no callers are recorded assert ProcessCallerService.count() == 0 @@ -654,7 +654,7 @@ class TestProcessApi(BaseTest): bpmn_file_location="call_activity_nested", ) # When adding a process model with 4 processes and a decision, 5 new records will be in the Cache - assert len(ReferenceCacheModel.basic_query().all()) == 6 + assert ReferenceCacheModel.basic_query().count() == 6 # and 4 callers recorded assert ProcessCallerService.count() == 4 @@ -1462,13 +1462,14 @@ class TestProcessApi(BaseTest): f"/v1.0/messages/{message_model_identifier}", content_type="application/json", headers=self.logged_in_headers(with_super_admin_user), - data=json.dumps({"payload": payload}), + data=json.dumps(payload), ) assert response.status_code == 200 json_data = response.json assert json_data - assert json_data["status"] == "complete" - process_instance_id = json_data["id"] + assert json_data["process_instance"]["status"] == "complete" + assert json_data["task_data"]["invoice"] == payload + process_instance_id = json_data["process_instance"]["id"] process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() assert process_instance @@ -1537,13 +1538,14 @@ class TestProcessApi(BaseTest): f"/v1.0/messages/{message_model_identifier}", content_type="application/json", headers=self.logged_in_headers(with_super_admin_user), - data=json.dumps({"payload": payload, "process_instance_id": process_instance_id}), + data=json.dumps(payload), ) assert response.status_code == 200 json_data = response.json assert json_data - assert json_data["status"] == "complete" - process_instance_id = json_data["id"] + assert json_data["process_instance"]["status"] == "complete" + assert json_data["task_data"]["the_payload"] == payload + process_instance_id = json_data["process_instance"]["id"] process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() assert process_instance @@ -1609,7 +1611,7 @@ class TestProcessApi(BaseTest): f"/v1.0/messages/{message_model_identifier}", content_type="application/json", headers=self.logged_in_headers(with_super_admin_user), - data=json.dumps({"payload": payload, "process_instance_id": process_instance_id}), + data=json.dumps(payload), ) assert response.status_code == 400 assert response.json @@ -1621,13 +1623,13 @@ class TestProcessApi(BaseTest): f"/v1.0/messages/{message_model_identifier}", content_type="application/json", headers=self.logged_in_headers(with_super_admin_user), - data=json.dumps({"payload": payload}), + data=json.dumps(payload), ) assert response.status_code == 200 json_data = response.json assert json_data - assert json_data["status"] == "complete" - process_instance_id = json_data["id"] + assert json_data["process_instance"]["status"] == "complete" + process_instance_id = json_data["process_instance"]["id"] process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() assert process_instance processor = ProcessInstanceProcessor(process_instance) @@ -1640,7 +1642,7 @@ class TestProcessApi(BaseTest): f"/v1.0/messages/{message_model_identifier}", content_type="application/json", headers=self.logged_in_headers(with_super_admin_user), - data=json.dumps({"payload": payload, "process_instance_id": process_instance_id}), + data=json.dumps(payload), ) assert response.status_code == 400 assert response.json @@ -2282,22 +2284,22 @@ class TestProcessApi(BaseTest): 
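# Illustrative sketch (not part of the diff): the response shape the message
# tests now assert against. "client" and "headers" stand in for the Flask test
# client and logged_in_headers() used throughout this file; the message name
# and payload are made up.
import json


def assert_message_response_shape(client, headers) -> None:
    response = client.post(
        "/v1.0/messages/example_message",
        content_type="application/json",
        headers=headers,
        data=json.dumps({"po_number": 1001}),  # arbitrary JSON body, no "payload" wrapper
    )
    assert response.status_code == 200
    assert set(response.json.keys()) >= {"process_instance", "task_data"}
    assert response.json["process_instance"]["status"] == "complete"
    # what ends up under "task_data" depends on the receiving BPMN process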
f"/v1.0/messages/{message_model_identifier}", content_type="application/json", headers=self.logged_in_headers(with_super_admin_user), - data=json.dumps({"payload": payload}), + data=json.dumps(payload), ) assert response.status_code == 200 assert response.json is not None - process_instance_id_one = response.json["id"] + process_instance_id_one = response.json["process_instance"]["id"] payload["po_number"] = "1002" response = client.post( f"/v1.0/messages/{message_model_identifier}", content_type="application/json", headers=self.logged_in_headers(with_super_admin_user), - data=json.dumps({"payload": payload}), + data=json.dumps(payload), ) assert response.status_code == 200 assert response.json is not None - process_instance_id_two = response.json["id"] + process_instance_id_two = response.json["process_instance"]["id"] response = client.get( f"/v1.0/messages?process_instance_id={process_instance_id_one}",