From bc3708bb0086316ee4561ec863a929eabfb4369e Mon Sep 17 00:00:00 2001 From: jasquat <2487833+jasquat@users.noreply.github.com> Date: Wed, 6 Mar 2024 16:43:06 -0500 Subject: [PATCH] async-support-on-message-start (#1173) * added async and sync support for the message start api w/ burnettk * removed debug dump call w/ burnettk --------- Co-authored-by: jasquat --- ... => get_bpmn_json_for_process_instance.py} | 8 +--- spiffworkflow-backend/bin/start_celery_worker | 5 ++- .../src/spiffworkflow_backend/api.yml | 9 +++++ .../background_processing/apscheduler.py | 5 ++- .../spiffworkflow_backend/config/default.py | 2 + .../routes/messages_controller.py | 3 +- .../services/message_service.py | 40 ++++++++++++++++--- .../ProcessInstanceCurrentTaskInfo.tsx | 1 + 8 files changed, 58 insertions(+), 15 deletions(-) rename spiffworkflow-backend/bin/{get_bpmn_json_for_process_instance => get_bpmn_json_for_process_instance.py} (92%) diff --git a/spiffworkflow-backend/bin/get_bpmn_json_for_process_instance b/spiffworkflow-backend/bin/get_bpmn_json_for_process_instance.py similarity index 92% rename from spiffworkflow-backend/bin/get_bpmn_json_for_process_instance rename to spiffworkflow-backend/bin/get_bpmn_json_for_process_instance.py index 0c0eada0..fe60de70 100755 --- a/spiffworkflow-backend/bin/get_bpmn_json_for_process_instance +++ b/spiffworkflow-backend/bin/get_bpmn_json_for_process_instance.py @@ -1,12 +1,8 @@ -import json -import os import sys from spiffworkflow_backend import create_app from spiffworkflow_backend.models.process_instance import ProcessInstanceModel -from spiffworkflow_backend.services.process_instance_processor import ( - ProcessInstanceProcessor, -) +from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor def main(process_instance_id: str) -> None: @@ -15,7 +11,7 @@ def main(process_instance_id: str) -> None: with app.app_context(): process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() - file_path = f"/var/tmp/{process_instance_id}_bpmn_json.json" + file_path = f"/var/tmp/{process_instance_id}_bpmn_json.json" # noqa: S108 if not process_instance: raise Exception(f"Could not find a process instance with id: {process_instance_id}") diff --git a/spiffworkflow-backend/bin/start_celery_worker b/spiffworkflow-backend/bin/start_celery_worker index 8eefaf6e..1eed7caf 100755 --- a/spiffworkflow-backend/bin/start_celery_worker +++ b/spiffworkflow-backend/bin/start_celery_worker @@ -1,7 +1,7 @@ #!/usr/bin/env bash function error_handler() { - >&2 echo "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}." + echo >&2 "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}." exit "$2" } trap 'error_handler ${LINENO} $?' ERR @@ -10,4 +10,7 @@ set -o errtrace -o errexit -o nounset -o pipefail export SPIFFWORKFLOW_BACKEND_CELERY_ENABLED=true export SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER_IN_CREATE_APP=false +# so we can raise if calling unsafe code in celery +export SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER=true + poetry run celery -A src.spiffworkflow_backend.background_processing.celery_worker worker --loglevel=info diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/api.yml b/spiffworkflow-backend/src/spiffworkflow_backend/api.yml index ab3e1dde..84df4260 100755 --- a/spiffworkflow-backend/src/spiffworkflow_backend/api.yml +++ b/spiffworkflow-backend/src/spiffworkflow_backend/api.yml @@ -2538,6 +2538,15 @@ paths: description: The unique name of the message. schema: type: string + - name: execution_mode + in: query + required: false + description: Either run in "synchronous" or "asynchronous" mode. + schema: + type: string + enum: + - synchronous + - asynchronous post: tags: - Messages diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/apscheduler.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/apscheduler.py index b15ee7f5..8acffc42 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/apscheduler.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/apscheduler.py @@ -33,7 +33,7 @@ def start_apscheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Bac else: _add_jobs_for_non_celery_based_configuration(app, scheduler) - _add_jobs_relevant_for_all_celery_configurations(app, scheduler) + _add_jobs_that_should_run_regardless_of_celery_config(app, scheduler) scheduler.start() @@ -78,12 +78,13 @@ def _add_jobs_for_non_celery_based_configuration(app: flask.app.Flask, scheduler ) -def _add_jobs_relevant_for_all_celery_configurations(app: flask.app.Flask, scheduler: BaseScheduler) -> None: +def _add_jobs_that_should_run_regardless_of_celery_config(app: flask.app.Flask, scheduler: BaseScheduler) -> None: not_started_polling_interval_in_seconds = app.config[ "SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS" ] # TODO: see if we can queue with celery instead on celery based configuration + # NOTE: pass in additional_processing_identifier if we move to celery scheduler.add_job( BackgroundProcessingService(app).process_message_instances_with_app_context, "interval", diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py index 356c39c2..7a005f1e 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/config/default.py @@ -207,6 +207,8 @@ config_from_env("SPIFFWORKFLOW_BACKEND_DEBUG_TASK_CONSISTENCY", default=False) # we load the CustomBpmnScriptEngine at import time, where we do not have access to current_app, # so instead of using config, we use os.environ directly over there. # config_from_env("SPIFFWORKFLOW_BACKEND_USE_RESTRICTED_SCRIPT_ENGINE", default=True) + + # adds the ProxyFix to Flask on http by processing the 'X-Forwarded-Proto' header # to make SpiffWorkflow aware that it should return https for the server urls etc rather than http. config_from_env("SPIFFWORKFLOW_BACKEND_USE_WERKZEUG_MIDDLEWARE_PROXY_FIX", default=False) diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py index d8578613..1ef1991c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/messages_controller.py @@ -65,6 +65,7 @@ def message_instance_list( def message_send( message_name: str, body: dict[str, Any], + execution_mode: str | None = None, ) -> flask.wrappers.Response: if "payload" not in body: raise ( @@ -87,7 +88,7 @@ def message_send( db.session.add(message_instance) db.session.commit() try: - receiver_message = MessageService.correlate_send_message(message_instance) + receiver_message = MessageService.correlate_send_message(message_instance, execution_mode=execution_mode) except Exception as e: db.session.delete(message_instance) db.session.commit() diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py index 60e03607..f5117a20 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py @@ -1,8 +1,14 @@ +import os + from SpiffWorkflow.bpmn import BpmnEvent # type: ignore from SpiffWorkflow.bpmn.specs.event_definitions.message import CorrelationProperty # type: ignore from SpiffWorkflow.bpmn.specs.mixins import StartEventMixin # type: ignore from SpiffWorkflow.spiff.specs.event_definitions import MessageEventDefinition # type: ignore +from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import ( + queue_process_instance_if_appropriate, +) +from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.message_instance import MessageInstanceModel from spiffworkflow_backend.models.message_instance import MessageStatuses @@ -12,6 +18,7 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.process_instance_processor import CustomBpmnScriptEngine from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor +from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService from spiffworkflow_backend.services.user_service import UserService @@ -22,7 +29,11 @@ class MessageServiceError(Exception): class MessageService: @classmethod - def correlate_send_message(cls, message_instance_send: MessageInstanceModel) -> MessageInstanceModel | None: + def correlate_send_message( + cls, + message_instance_send: MessageInstanceModel, + execution_mode: str | None = None, + ) -> MessageInstanceModel | None: """Connects the given send message to a 'receive' message if possible. :param message_instance_send: @@ -56,7 +67,9 @@ class MessageService: user: UserModel | None = message_instance_send.user if user is None: user = UserService.find_or_create_system_user() - receiving_process = MessageService.start_process_with_message(message_triggerable_process_model, user) + receiving_process = MessageService.start_process_with_message( + message_triggerable_process_model, user, execution_mode=execution_mode + ) message_instance_receive = MessageInstanceModel.query.filter_by( process_instance_id=receiving_process.id, message_type="receive", @@ -115,15 +128,32 @@ class MessageService: cls, message_triggerable_process_model: MessageTriggerableProcessModel, user: UserModel, + execution_mode: str | None = None, ) -> ProcessInstanceModel: """Start up a process instance, so it is ready to catch the event.""" + if os.environ.get("SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER") == "true": + raise MessageServiceError( + "Calling start_process_with_message in a celery worker. This is not supported! (We may need to add" + " additional_processing_identifier to this code path." + ) + process_instance_receive = ProcessInstanceService.create_process_instance_from_process_model_identifier( message_triggerable_process_model.process_model_identifier, user, ) - processor_receive = ProcessInstanceProcessor(process_instance_receive) - cls._cancel_non_matching_start_events(processor_receive, message_triggerable_process_model) - processor_receive.do_engine_steps(save=True) + with ProcessInstanceQueueService.dequeued(process_instance_receive): + processor_receive = ProcessInstanceProcessor(process_instance_receive) + cls._cancel_non_matching_start_events(processor_receive, message_triggerable_process_model) + processor_receive.save() + + if not queue_process_instance_if_appropriate( + process_instance_receive, execution_mode=execution_mode + ) and not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance_receive): + execution_strategy_name = None + if execution_mode == ProcessInstanceExecutionMode.synchronous.value: + execution_strategy_name = "greedy" + processor_receive.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name) + return process_instance_receive @classmethod diff --git a/spiffworkflow-frontend/src/components/ProcessInstanceCurrentTaskInfo.tsx b/spiffworkflow-frontend/src/components/ProcessInstanceCurrentTaskInfo.tsx index 3df3aa2b..de565154 100644 --- a/spiffworkflow-frontend/src/components/ProcessInstanceCurrentTaskInfo.tsx +++ b/spiffworkflow-frontend/src/components/ProcessInstanceCurrentTaskInfo.tsx @@ -24,6 +24,7 @@ export default function ProcessInstanceCurrentTaskInfo({ HttpService.makeCallToBackend({ path: `/tasks/${processInstance.id}/instruction`, successCallback: processTaskResult, + failureCallback: (error: any) => console.error(error.message), }); }, [processInstance]);