Messages as MicroServices (#1185)

* Purpose:  Allow the messages api endpoint to accept arbitrary JSON in it's body - and to respond with the latest task_data - this will allow you to create BPMN processes are that can function as microservices.

* Fixing a bug that prevented the synchronous execution of a message post.
* A message post directly uses the body of the post, do not specify it within a "payload".
* The message response is not just the process instance details, it now contains the "process_instance" and "task_data" at the top level of the returned json.
* The last completed task data is now returned as a part of a response to the message api endpoint, it is with the "task_data" attribute.

* CodeRabbit suggestions

* run_pyl fixes

* fix lint

---------

Co-authored-by: burnettk <burnettk@users.noreply.github.com>
This commit is contained in:
Dan Funk 2024-03-11 14:24:31 -04:00 committed by GitHub
parent 6823f53536
commit 86b31d20cb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 62 additions and 46 deletions

View File

@ -7,6 +7,7 @@ from flask_sqlalchemy.query import Query
from marshmallow import INCLUDE from marshmallow import INCLUDE
from marshmallow import Schema from marshmallow import Schema
from sqlalchemy import ForeignKey from sqlalchemy import ForeignKey
from sqlalchemy import desc
from sqlalchemy.orm import relationship from sqlalchemy.orm import relationship
from sqlalchemy.orm import validates from sqlalchemy.orm import validates
@ -198,6 +199,18 @@ class ProcessInstanceModel(SpiffworkflowBaseDBModel):
def immediately_runnable_statuses(cls) -> list[str]: def immediately_runnable_statuses(cls) -> list[str]:
return ["not_started", "running"] return ["not_started", "running"]
def get_data(self) -> dict:
"""Returns the data of the last completed task in this process instance."""
last_completed_task = (
TaskModel.query.filter_by(process_instance_id=self.id, state="COMPLETED")
.order_by(desc(TaskModel.end_in_seconds)) # type: ignore
.first()
)
if last_completed_task: # pragma: no cover
return last_completed_task.json_data() # type: ignore
else:
return {}
class ProcessInstanceModelSchema(Schema): class ProcessInstanceModelSchema(Schema):
class Meta: class Meta:

View File

@ -67,22 +67,13 @@ def message_send(
body: dict[str, Any], body: dict[str, Any],
execution_mode: str | None = None, execution_mode: str | None = None,
) -> flask.wrappers.Response: ) -> flask.wrappers.Response:
if "payload" not in body:
raise (
ApiError(
error_code="missing_payload",
message="Please include a 'payload' in the JSON body that contains the message contents.",
status_code=400,
)
)
process_instance = None process_instance = None
# Create the send message # Create the send message
message_instance = MessageInstanceModel( message_instance = MessageInstanceModel(
message_type="send", message_type="send",
name=message_name, name=message_name,
payload=body["payload"], payload=body,
user_id=g.user.id, user_id=g.user.id,
) )
db.session.add(message_instance) db.session.add(message_instance)
@ -110,8 +101,12 @@ def message_send(
) )
process_instance = ProcessInstanceModel.query.filter_by(id=receiver_message.process_instance_id).first() process_instance = ProcessInstanceModel.query.filter_by(id=receiver_message.process_instance_id).first()
response_json = {
"task_data": process_instance.get_data(),
"process_instance": ProcessInstanceModelSchema().dump(process_instance),
}
return Response( return Response(
json.dumps(ProcessInstanceModelSchema().dump(process_instance)), json.dumps(response_json),
status=200, status=200,
mimetype="application/json", mimetype="application/json",
) )

View File

@ -90,9 +90,7 @@ class MessageService:
message_instance_receive.status = "running" message_instance_receive.status = "running"
cls.process_message_receive( cls.process_message_receive(
receiving_process, receiving_process, message_instance_receive, message_instance_send, execution_mode=execution_mode
message_instance_receive,
message_instance_send,
) )
message_instance_receive.status = "completed" message_instance_receive.status = "completed"
message_instance_receive.counterpart_id = message_instance_send.id message_instance_receive.counterpart_id = message_instance_send.id
@ -146,13 +144,7 @@ class MessageService:
cls._cancel_non_matching_start_events(processor_receive, message_triggerable_process_model) cls._cancel_non_matching_start_events(processor_receive, message_triggerable_process_model)
processor_receive.save() processor_receive.save()
if not queue_process_instance_if_appropriate( processor_receive.do_engine_steps(save=True)
process_instance_receive, execution_mode=execution_mode
) and not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance_receive):
execution_strategy_name = None
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
execution_strategy_name = "greedy"
processor_receive.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
return process_instance_receive return process_instance_receive
@ -195,6 +187,7 @@ class MessageService:
process_instance_receive: ProcessInstanceModel, process_instance_receive: ProcessInstanceModel,
message_instance_receive: MessageInstanceModel, message_instance_receive: MessageInstanceModel,
message_instance_send: MessageInstanceModel, message_instance_send: MessageInstanceModel,
execution_mode: str | None = None,
) -> None: ) -> None:
correlation_properties = [] correlation_properties = []
for cr in message_instance_receive.correlation_rules: for cr in message_instance_receive.correlation_rules:
@ -216,7 +209,15 @@ class MessageService:
) )
processor_receive = ProcessInstanceProcessor(process_instance_receive) processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive.bpmn_process_instance.send_event(bpmn_event) processor_receive.bpmn_process_instance.send_event(bpmn_event)
processor_receive.do_engine_steps(save=True) execution_strategy_name = None
if not queue_process_instance_if_appropriate(
process_instance_receive, execution_mode=execution_mode
) and not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance_receive):
execution_strategy_name = None
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
execution_strategy_name = "greedy"
processor_receive.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
message_instance_receive.status = MessageStatuses.completed.value message_instance_receive.status = MessageStatuses.completed.value
db.session.add(message_instance_receive) db.session.add(message_instance_receive)
db.session.commit() db.session.commit()

View File

@ -42,15 +42,20 @@ class TestMessages(BaseTest):
with pytest.raises(ApiError): with pytest.raises(ApiError):
message_send( message_send(
"Approval Result", "Approval Result",
{"payload": {"po_number": 1001, "customer_id": "jon"}}, {"po_number": 1001, "customer_id": "jon"},
) )
# No error when calling with the correct parameters # No error when calling with the correct parameters
message_send( response = message_send(
"Approval Result", "Approval Result",
{"payload": {"po_number": 1001, "customer_id": "Sartography"}}, {"po_number": 1001, "customer_id": "Sartography"},
) )
# The response's task data should also match up with the correlation keys.
response_json = response.json
assert response_json["task_data"]["po_number"] == 1001
assert response_json["task_data"]["customer_id"] == "Sartography"
# There is no longer a waiting message # There is no longer a waiting message
waiting_messages = ( waiting_messages = (
MessageInstanceModel.query.filter_by(message_type="receive") MessageInstanceModel.query.filter_by(message_type="receive")

View File

@ -550,7 +550,7 @@ class TestProcessApi(BaseTest):
bpmn_file_name="simple_form", bpmn_file_name="simple_form",
) )
# When adding a process model with one Process, no decisions, and some json files, only one process is recorded. # When adding a process model with one Process, no decisions, and some json files, only one process is recorded.
assert len(ReferenceCacheModel.basic_query().all()) == 1 assert ReferenceCacheModel.basic_query().count() == 1
self.create_group_and_model_with_bpmn( self.create_group_and_model_with_bpmn(
client=client, client=client,
@ -560,7 +560,7 @@ class TestProcessApi(BaseTest):
bpmn_file_location="call_activity_nested", bpmn_file_location="call_activity_nested",
) )
# When adding a process model with 4 processes and a decision, 5 new records will be in the Cache # When adding a process model with 4 processes and a decision, 5 new records will be in the Cache
assert len(ReferenceCacheModel.basic_query().all()) == 6 assert ReferenceCacheModel.basic_query().count() == 6
# get the results # get the results
response = client.get( response = client.get(
@ -592,7 +592,7 @@ class TestProcessApi(BaseTest):
bpmn_file_name="simple_form", bpmn_file_name="simple_form",
) )
# When adding a process model with one Process, no decisions, and some json files, only one process is recorded. # When adding a process model with one Process, no decisions, and some json files, only one process is recorded.
assert len(ReferenceCacheModel.basic_query().all()) == 1 assert ReferenceCacheModel.basic_query().count() == 1
self.create_group_and_model_with_bpmn( self.create_group_and_model_with_bpmn(
client=client, client=client,
@ -602,7 +602,7 @@ class TestProcessApi(BaseTest):
bpmn_file_location="call_activity_nested", bpmn_file_location="call_activity_nested",
) )
# When adding a process model with 4 processes and a decision, 5 new records will be in the Cache # When adding a process model with 4 processes and a decision, 5 new records will be in the Cache
assert len(ReferenceCacheModel.basic_query().all()) == 6 assert ReferenceCacheModel.basic_query().count() == 6
user_one = self.create_user_with_permission(username="user_one", target_uri="/v1.0/process-groups/test_group_one:*") user_one = self.create_user_with_permission(username="user_one", target_uri="/v1.0/process-groups/test_group_one:*")
self.add_permissions_to_user(user=user_one, target_uri="/v1.0/processes", permission_names=["read"]) self.add_permissions_to_user(user=user_one, target_uri="/v1.0/processes", permission_names=["read"])
@ -642,7 +642,7 @@ class TestProcessApi(BaseTest):
bpmn_file_name="simple_form", bpmn_file_name="simple_form",
) )
# When adding a process model with one Process, no decisions, and some json files, only one process is recorded. # When adding a process model with one Process, no decisions, and some json files, only one process is recorded.
assert len(ReferenceCacheModel.basic_query().all()) == 1 assert ReferenceCacheModel.basic_query().count() == 1
# but no callers are recorded # but no callers are recorded
assert ProcessCallerService.count() == 0 assert ProcessCallerService.count() == 0
@ -654,7 +654,7 @@ class TestProcessApi(BaseTest):
bpmn_file_location="call_activity_nested", bpmn_file_location="call_activity_nested",
) )
# When adding a process model with 4 processes and a decision, 5 new records will be in the Cache # When adding a process model with 4 processes and a decision, 5 new records will be in the Cache
assert len(ReferenceCacheModel.basic_query().all()) == 6 assert ReferenceCacheModel.basic_query().count() == 6
# and 4 callers recorded # and 4 callers recorded
assert ProcessCallerService.count() == 4 assert ProcessCallerService.count() == 4
@ -1462,13 +1462,14 @@ class TestProcessApi(BaseTest):
f"/v1.0/messages/{message_model_identifier}", f"/v1.0/messages/{message_model_identifier}",
content_type="application/json", content_type="application/json",
headers=self.logged_in_headers(with_super_admin_user), headers=self.logged_in_headers(with_super_admin_user),
data=json.dumps({"payload": payload}), data=json.dumps(payload),
) )
assert response.status_code == 200 assert response.status_code == 200
json_data = response.json json_data = response.json
assert json_data assert json_data
assert json_data["status"] == "complete" assert json_data["process_instance"]["status"] == "complete"
process_instance_id = json_data["id"] assert json_data["task_data"]["invoice"] == payload
process_instance_id = json_data["process_instance"]["id"]
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
assert process_instance assert process_instance
@ -1537,13 +1538,14 @@ class TestProcessApi(BaseTest):
f"/v1.0/messages/{message_model_identifier}", f"/v1.0/messages/{message_model_identifier}",
content_type="application/json", content_type="application/json",
headers=self.logged_in_headers(with_super_admin_user), headers=self.logged_in_headers(with_super_admin_user),
data=json.dumps({"payload": payload, "process_instance_id": process_instance_id}), data=json.dumps(payload),
) )
assert response.status_code == 200 assert response.status_code == 200
json_data = response.json json_data = response.json
assert json_data assert json_data
assert json_data["status"] == "complete" assert json_data["process_instance"]["status"] == "complete"
process_instance_id = json_data["id"] assert json_data["task_data"]["the_payload"] == payload
process_instance_id = json_data["process_instance"]["id"]
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
assert process_instance assert process_instance
@ -1609,7 +1611,7 @@ class TestProcessApi(BaseTest):
f"/v1.0/messages/{message_model_identifier}", f"/v1.0/messages/{message_model_identifier}",
content_type="application/json", content_type="application/json",
headers=self.logged_in_headers(with_super_admin_user), headers=self.logged_in_headers(with_super_admin_user),
data=json.dumps({"payload": payload, "process_instance_id": process_instance_id}), data=json.dumps(payload),
) )
assert response.status_code == 400 assert response.status_code == 400
assert response.json assert response.json
@ -1621,13 +1623,13 @@ class TestProcessApi(BaseTest):
f"/v1.0/messages/{message_model_identifier}", f"/v1.0/messages/{message_model_identifier}",
content_type="application/json", content_type="application/json",
headers=self.logged_in_headers(with_super_admin_user), headers=self.logged_in_headers(with_super_admin_user),
data=json.dumps({"payload": payload}), data=json.dumps(payload),
) )
assert response.status_code == 200 assert response.status_code == 200
json_data = response.json json_data = response.json
assert json_data assert json_data
assert json_data["status"] == "complete" assert json_data["process_instance"]["status"] == "complete"
process_instance_id = json_data["id"] process_instance_id = json_data["process_instance"]["id"]
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
assert process_instance assert process_instance
processor = ProcessInstanceProcessor(process_instance) processor = ProcessInstanceProcessor(process_instance)
@ -1640,7 +1642,7 @@ class TestProcessApi(BaseTest):
f"/v1.0/messages/{message_model_identifier}", f"/v1.0/messages/{message_model_identifier}",
content_type="application/json", content_type="application/json",
headers=self.logged_in_headers(with_super_admin_user), headers=self.logged_in_headers(with_super_admin_user),
data=json.dumps({"payload": payload, "process_instance_id": process_instance_id}), data=json.dumps(payload),
) )
assert response.status_code == 400 assert response.status_code == 400
assert response.json assert response.json
@ -2282,22 +2284,22 @@ class TestProcessApi(BaseTest):
f"/v1.0/messages/{message_model_identifier}", f"/v1.0/messages/{message_model_identifier}",
content_type="application/json", content_type="application/json",
headers=self.logged_in_headers(with_super_admin_user), headers=self.logged_in_headers(with_super_admin_user),
data=json.dumps({"payload": payload}), data=json.dumps(payload),
) )
assert response.status_code == 200 assert response.status_code == 200
assert response.json is not None assert response.json is not None
process_instance_id_one = response.json["id"] process_instance_id_one = response.json["process_instance"]["id"]
payload["po_number"] = "1002" payload["po_number"] = "1002"
response = client.post( response = client.post(
f"/v1.0/messages/{message_model_identifier}", f"/v1.0/messages/{message_model_identifier}",
content_type="application/json", content_type="application/json",
headers=self.logged_in_headers(with_super_admin_user), headers=self.logged_in_headers(with_super_admin_user),
data=json.dumps({"payload": payload}), data=json.dumps(payload),
) )
assert response.status_code == 200 assert response.status_code == 200
assert response.json is not None assert response.json is not None
process_instance_id_two = response.json["id"] process_instance_id_two = response.json["process_instance"]["id"]
response = client.get( response = client.get(
f"/v1.0/messages?process_instance_id={process_instance_id_one}", f"/v1.0/messages?process_instance_id={process_instance_id_one}",