some additional checks to make sure duplicate message instances are not created

This commit is contained in:
jasquat 2022-09-29 13:32:44 -04:00
parent 8bcf3d08b5
commit 5946bd9d05
3 changed files with 30 additions and 8 deletions

View File

@ -437,10 +437,20 @@ def message_instance_list(
process_instance_id=process_instance_id
)
message_instances = message_instances_query.order_by(
MessageInstanceModel.created_at_in_seconds.desc(), # type: ignore
MessageInstanceModel.id.desc(), # type: ignore
).paginate(page, per_page, False)
message_instances = (
message_instances_query.order_by(
MessageInstanceModel.created_at_in_seconds.desc(), # type: ignore
MessageInstanceModel.id.desc(), # type: ignore
)
.join(MessageModel)
.join(ProcessInstanceModel)
.add_columns(
MessageModel.identifier.label("message_identifier"),
ProcessInstanceModel.process_model_identifier,
ProcessInstanceModel.process_group_identifier,
)
.paginate(page, per_page, False)
)
response_json = {
"results": message_instances.items,

View File

@ -39,9 +39,10 @@ class MessageService:
message_type="send", status="ready"
).all()
message_instances_receive = MessageInstanceModel.query.filter_by(
message_type="receive"
message_type="receive", status="ready"
).all()
for message_instance_send in message_instances_send:
# print(f"message_instance_send.id: {message_instance_send.id}")
# check again in case another background process picked up the message
# while the previous one was running
if message_instance_send.status != "ready":
@ -79,6 +80,10 @@ class MessageService:
message_instance_send.status = "ready"
else:
if message_instance_receive.status != "ready":
continue
message_instance_receive.status = "running"
cls.process_message_receive(
message_instance_receive,
message_instance_send.message_model.name,

View File

@ -695,7 +695,6 @@ class ProcessInstanceProcessor:
bpmn_messages = self.bpmn_process_instance.get_bpmn_messages()
for bpmn_message in bpmn_messages:
# only message sends are in get_bpmn_messages
message_type = "send"
message_model = MessageModel.query.filter_by(name=bpmn_message.name).first()
if message_model is None:
raise ApiError(
@ -736,10 +735,9 @@ class ProcessInstanceProcessor:
"value": message_correlation_property_value,
}
)
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
message_type=message_type,
message_type="send",
message_model_id=message_model.id,
payload=bpmn_message.payload,
)
@ -792,6 +790,15 @@ class ProcessInstanceProcessor:
f"Invalid message name: {waiting_task.task_spec.event_definition.name}.",
)
# Ensure we are only creating one message instance for each waiting message
message_instance = MessageInstanceModel.query.filter_by(
process_instance_id=self.process_instance_model.id,
message_type="receive",
message_model_id=message_model.id,
).first()
if message_instance:
continue
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
message_type="receive",