async-support-on-message-start (#1173)

* added async and sync support for the message start api w/ burnettk

* removed debug dump call w/ burnettk

---------

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2024-03-06 16:43:06 -05:00 committed by GitHub
parent a762efca33
commit bc3708bb00
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 58 additions and 15 deletions

View File

@ -1,12 +1,8 @@
import json
import os
import sys
from spiffworkflow_backend import create_app
from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.services.process_instance_processor import (
ProcessInstanceProcessor,
)
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
def main(process_instance_id: str) -> None:
@ -15,7 +11,7 @@ def main(process_instance_id: str) -> None:
with app.app_context():
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
file_path = f"/var/tmp/{process_instance_id}_bpmn_json.json"
file_path = f"/var/tmp/{process_instance_id}_bpmn_json.json" # noqa: S108
if not process_instance:
raise Exception(f"Could not find a process instance with id: {process_instance_id}")

View File

@ -1,7 +1,7 @@
#!/usr/bin/env bash
function error_handler() {
>&2 echo "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}."
echo >&2 "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}."
exit "$2"
}
trap 'error_handler ${LINENO} $?' ERR
@ -10,4 +10,7 @@ set -o errtrace -o errexit -o nounset -o pipefail
export SPIFFWORKFLOW_BACKEND_CELERY_ENABLED=true
export SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER_IN_CREATE_APP=false
# so we can raise if calling unsafe code in celery
export SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER=true
poetry run celery -A src.spiffworkflow_backend.background_processing.celery_worker worker --loglevel=info

View File

@ -2538,6 +2538,15 @@ paths:
description: The unique name of the message.
schema:
type: string
- name: execution_mode
in: query
required: false
description: Either run in "synchronous" or "asynchronous" mode.
schema:
type: string
enum:
- synchronous
- asynchronous
post:
tags:
- Messages

View File

@ -33,7 +33,7 @@ def start_apscheduler(app: flask.app.Flask, scheduler_class: BaseScheduler = Bac
else:
_add_jobs_for_non_celery_based_configuration(app, scheduler)
_add_jobs_relevant_for_all_celery_configurations(app, scheduler)
_add_jobs_that_should_run_regardless_of_celery_config(app, scheduler)
scheduler.start()
@ -78,12 +78,13 @@ def _add_jobs_for_non_celery_based_configuration(app: flask.app.Flask, scheduler
)
def _add_jobs_relevant_for_all_celery_configurations(app: flask.app.Flask, scheduler: BaseScheduler) -> None:
def _add_jobs_that_should_run_regardless_of_celery_config(app: flask.app.Flask, scheduler: BaseScheduler) -> None:
not_started_polling_interval_in_seconds = app.config[
"SPIFFWORKFLOW_BACKEND_BACKGROUND_SCHEDULER_NOT_STARTED_POLLING_INTERVAL_IN_SECONDS"
]
# TODO: see if we can queue with celery instead on celery based configuration
# NOTE: pass in additional_processing_identifier if we move to celery
scheduler.add_job(
BackgroundProcessingService(app).process_message_instances_with_app_context,
"interval",

View File

@ -207,6 +207,8 @@ config_from_env("SPIFFWORKFLOW_BACKEND_DEBUG_TASK_CONSISTENCY", default=False)
# we load the CustomBpmnScriptEngine at import time, where we do not have access to current_app,
# so instead of using config, we use os.environ directly over there.
# config_from_env("SPIFFWORKFLOW_BACKEND_USE_RESTRICTED_SCRIPT_ENGINE", default=True)
# adds the ProxyFix to Flask on http by processing the 'X-Forwarded-Proto' header
# to make SpiffWorkflow aware that it should return https for the server urls etc rather than http.
config_from_env("SPIFFWORKFLOW_BACKEND_USE_WERKZEUG_MIDDLEWARE_PROXY_FIX", default=False)

View File

@ -65,6 +65,7 @@ def message_instance_list(
def message_send(
message_name: str,
body: dict[str, Any],
execution_mode: str | None = None,
) -> flask.wrappers.Response:
if "payload" not in body:
raise (
@ -87,7 +88,7 @@ def message_send(
db.session.add(message_instance)
db.session.commit()
try:
receiver_message = MessageService.correlate_send_message(message_instance)
receiver_message = MessageService.correlate_send_message(message_instance, execution_mode=execution_mode)
except Exception as e:
db.session.delete(message_instance)
db.session.commit()

View File

@ -1,8 +1,14 @@
import os
from SpiffWorkflow.bpmn import BpmnEvent # type: ignore
from SpiffWorkflow.bpmn.specs.event_definitions.message import CorrelationProperty # type: ignore
from SpiffWorkflow.bpmn.specs.mixins import StartEventMixin # type: ignore
from SpiffWorkflow.spiff.specs.event_definitions import MessageEventDefinition # type: ignore
from spiffworkflow_backend.background_processing.celery_tasks.process_instance_task_producer import (
queue_process_instance_if_appropriate,
)
from spiffworkflow_backend.helpers.spiff_enum import ProcessInstanceExecutionMode
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.message_instance import MessageInstanceModel
from spiffworkflow_backend.models.message_instance import MessageStatuses
@ -12,6 +18,7 @@ from spiffworkflow_backend.models.process_instance import ProcessInstanceModel
from spiffworkflow_backend.models.user import UserModel
from spiffworkflow_backend.services.process_instance_processor import CustomBpmnScriptEngine
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceQueueService
from spiffworkflow_backend.services.process_instance_service import ProcessInstanceService
from spiffworkflow_backend.services.user_service import UserService
@ -22,7 +29,11 @@ class MessageServiceError(Exception):
class MessageService:
@classmethod
def correlate_send_message(cls, message_instance_send: MessageInstanceModel) -> MessageInstanceModel | None:
def correlate_send_message(
cls,
message_instance_send: MessageInstanceModel,
execution_mode: str | None = None,
) -> MessageInstanceModel | None:
"""Connects the given send message to a 'receive' message if possible.
:param message_instance_send:
@ -56,7 +67,9 @@ class MessageService:
user: UserModel | None = message_instance_send.user
if user is None:
user = UserService.find_or_create_system_user()
receiving_process = MessageService.start_process_with_message(message_triggerable_process_model, user)
receiving_process = MessageService.start_process_with_message(
message_triggerable_process_model, user, execution_mode=execution_mode
)
message_instance_receive = MessageInstanceModel.query.filter_by(
process_instance_id=receiving_process.id,
message_type="receive",
@ -115,15 +128,32 @@ class MessageService:
cls,
message_triggerable_process_model: MessageTriggerableProcessModel,
user: UserModel,
execution_mode: str | None = None,
) -> ProcessInstanceModel:
"""Start up a process instance, so it is ready to catch the event."""
if os.environ.get("SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER") == "true":
raise MessageServiceError(
"Calling start_process_with_message in a celery worker. This is not supported! (We may need to add"
" additional_processing_identifier to this code path."
)
process_instance_receive = ProcessInstanceService.create_process_instance_from_process_model_identifier(
message_triggerable_process_model.process_model_identifier,
user,
)
with ProcessInstanceQueueService.dequeued(process_instance_receive):
processor_receive = ProcessInstanceProcessor(process_instance_receive)
cls._cancel_non_matching_start_events(processor_receive, message_triggerable_process_model)
processor_receive.do_engine_steps(save=True)
processor_receive.save()
if not queue_process_instance_if_appropriate(
process_instance_receive, execution_mode=execution_mode
) and not ProcessInstanceQueueService.is_enqueued_to_run_in_the_future(process_instance_receive):
execution_strategy_name = None
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
execution_strategy_name = "greedy"
processor_receive.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
return process_instance_receive
@classmethod

View File

@ -24,6 +24,7 @@ export default function ProcessInstanceCurrentTaskInfo({
HttpService.makeCallToBackend({
path: `/tasks/${processInstance.id}/instruction`,
successCallback: processTaskResult,
failureCallback: (error: any) => console.error(error.message),
});
}, [processInstance]);