most of the message processing is mapped out and also save correlations to db

This commit is contained in:
jasquat 2022-08-03 16:29:47 -04:00
parent 4ba4440b49
commit 2bf14a0514
2 changed files with 96 additions and 32 deletions

View File

@ -3,12 +3,17 @@ from typing import Optional
import flask
from flask_bpmn.models.db import db
from SpiffWorkflow.bpmn.specs.events.event_definitions import MessageEventDefinition # type: ignore
from sqlalchemy import and_
from sqlalchemy import or_
from sqlalchemy import select
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
class MessageServiceWithAppContext:
@ -28,6 +33,10 @@ class MessageServiceWithAppContext:
MessageService().process_queued_messages()
class MessageServiceError(Exception):
"""MessageServiceError."""
class MessageService:
"""MessageService."""
@ -40,42 +49,75 @@ class MessageService:
message_type="receive"
).all()
for queued_message_send in queued_messages_send:
queued_message_receive = self.get_related_queued_message(
queued_message_receive = self.get_queued_message_receive(
queued_message_send, queued_messages_receive
)
if queued_message_receive:
self.process_related_message(
self.process_message_receive(
queued_message_send, queued_message_receive
)
def process_related_message(
def process_message_receive(
self,
queued_message: MessageInstanceModel,
related_queued_message: MessageInstanceModel,
queued_message_send: MessageInstanceModel,
queued_message_receive: MessageInstanceModel,
) -> None:
"""Process_related_message."""
print(f"queued_message: {queued_message}")
print(f"related_queued_message: {related_queued_message}")
"""Process_message_receive."""
process_instance_send = ProcessInstanceModel.query.filter_by(
id=queued_message_send.process_instance_id
).first()
if process_instance_send is None:
raise MessageServiceError(
f"Process instance cannot be found for message: {queued_message_send.id}."
f"Tried with id {queued_message_send.process_instance_id}"
)
def get_related_queued_message(
self,
queued_message: MessageInstanceModel,
related_queued_messages: list[MessageInstanceModel],
) -> Optional[MessageInstanceModel]:
"""Get_related_queued_message."""
message_correlations = MessageCorrelationModel.query.filter_by(
message_instance_id=queued_message.id
).all()
message_correlation_filter = []
for message_correlation in message_correlations:
message_correlation_filter.append(
and_(
MessageCorrelationModel.name == message_correlation.name,
MessageCorrelationModel.value == message_correlation.value,
processor_send = ProcessInstanceProcessor(process_instance_send)
spiff_task_send = processor_send.bpmn_process_instance.get_task_by_id(
queued_message_send.bpmn_element_id
)
if spiff_task_send is None:
raise MessageServiceError(
"Processor failed to obtain task.",
)
message_event_send = MessageEventDefinition(
spiff_task_send.id, payload=spiff_task_send.payload
)
process_instance_receive = ProcessInstanceModel.query.filter_by(
id=queued_message_receive.process_instance_id
).first()
if process_instance_receive is None:
raise MessageServiceError(
(
f"Process instance cannot be found for queued message: {queued_message_receive.id}."
f"Tried with id {queued_message_receive.process_instance_id}",
)
)
for queued_message_related in related_queued_messages:
processor_receive = ProcessInstanceProcessor(process_instance_receive)
processor_receive.bpmn_process_instance.catch(message_event_send)
def get_queued_message_receive(
self,
queued_message_send: MessageInstanceModel,
queued_messages_receive: list[MessageInstanceModel],
) -> Optional[MessageInstanceModel]:
"""Get_queued_message_receive."""
message_correlations_send = MessageCorrelationModel.query.filter_by(
message_instance_id=queued_message_send.id
).all()
message_correlation_filter = []
for message_correlation_send in message_correlations_send:
message_correlation_filter.append(
and_(
MessageCorrelationModel.name == message_correlation_send.name,
MessageCorrelationModel.value == message_correlation_send.value,
)
)
for queued_message_receive in queued_messages_receive:
# sqlalchemy supports select / where statements like active record apparantly
# https://docs.sqlalchemy.org/en/14/core/tutorial.html#conjunctions
@ -85,18 +127,18 @@ class MessageService:
.where(
and_(
MessageCorrelationModel.message_instance_id
== queued_message_related.id,
== queued_message_receive.id,
or_(*message_correlation_filter),
)
)
)
message_correlations_related = db.session.execute(
message_correlations_receive = db.session.execute(
message_correlation_select
)
# since the query matches on name, value, and queued_message_related.id, if the counts
# since the query matches on name, value, and queued_message_receive.id, if the counts
# message correlations found are the same, then this should be the relevant message
if message_correlations_related.scalar() == len(message_correlations):
return queued_message_related
if message_correlations_receive.scalar() == len(message_correlations_send):
return queued_message_receive
return None

View File

@ -35,6 +35,7 @@ from SpiffWorkflow.util.deep_merge import DeepMerge # type: ignore
from spiffworkflow_backend.models.active_task import ActiveTaskModel
from spiffworkflow_backend.models.file import File
from spiffworkflow_backend.models.file import FileType
from spiffworkflow_backend.models.message_correlation import MessageCorrelationModel
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance import MessageModel
from spiffworkflow_backend.models.principal import PrincipalModel
@ -462,7 +463,7 @@ class ProcessInstanceProcessor:
for bpmn_event in self.bpmn_process_instance.bpmn_events:
message_type = None
# TODO: message: who knows the of the message model?
# TODO: message: who knows the name of the message model?
# will it be in the bpmn_event?
message_model = MessageModel.query.filter_by(
name=bpmn_event.message_name
@ -486,13 +487,29 @@ class ProcessInstanceProcessor:
f"Invalid event type for a message: {bpmn_event.event}.",
)
queued_message = MessageInstanceModel(
if not bpmn_event.message_correlations:
raise ApiError(
"message_correlations_missing",
f"Could not find any message correlations bpmn_event: {bpmn_event}",
)
message_instance = MessageInstanceModel(
process_instance_id=self.process_instance_model.id,
bpmn_element_id=bpmn_event.task_name,
message_type=message_type,
message_model_id=message_model.id,
)
db.session.add(queued_message)
db.session.add(message_instance)
db.session.commit()
# TODO: find out what spiff will call the correlations
for message_correlation in bpmn_event.message_correlations:
message_correlation = MessageCorrelationModel(
message_instance_id=message_instance.id,
name=message_correlation.name,
value=message_correlation.value,
)
db.session.add(message_correlation)
db.session.commit()
def do_engine_steps(self, exit_at: None = None) -> None:
@ -662,6 +679,11 @@ class ProcessInstanceProcessor:
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
return [t for t in all_tasks if t.state in [TaskState.WAITING, TaskState.READY]]
def get_task_by_id(self, task_id: str) -> SpiffTask:
"""Get_task_by_id."""
all_tasks = self.bpmn_process_instance.get_tasks(TaskState.ANY_MASK)
return [t for t in all_tasks if t.id == task_id]
def get_nav_item(self, task: SpiffTask) -> Any:
"""Get_nav_item."""
for nav_item in self.bpmn_process_instance.get_nav_list():