From d2e005022bf95448b8bd93f94431009a3f6fc3a9 Mon Sep 17 00:00:00 2001 From: jasquat <2487833+jasquat@users.noreply.github.com> Date: Fri, 21 Jun 2024 16:22:42 -0400 Subject: [PATCH] use the greedy strategy when running messages from the background processor w/ burnettk (#1798) Co-authored-by: jasquat --- spiffworkflow-backend/bin/get_token | 8 +++++--- .../bin/run_message_start_event_with_api | 14 +++++++------- .../background_processing_service.py | 2 +- .../routes/process_instances_controller.py | 4 ---- .../services/message_service.py | 15 +++++++++++---- 5 files changed, 24 insertions(+), 19 deletions(-) diff --git a/spiffworkflow-backend/bin/get_token b/spiffworkflow-backend/bin/get_token index 9812ce378..03ae6bf6f 100755 --- a/spiffworkflow-backend/bin/get_token +++ b/spiffworkflow-backend/bin/get_token @@ -30,7 +30,7 @@ backend_client_secret = os.getenv("BACKEND_CLIENT_secret", "JXeQExm0JhQPLumgHtII openid_token_url = os.getenv("OPENID_TOKEN_URL") keycloak_base_url = os.getenv("KEYCLOAK_BASE_URL") if openid_token_url is None: - if keycloak_base_url is None: + if keycloak_base_url is not None: if "spiffworkflow.org" in backend_base_url: pattern = r".*api\.(\w+\.spiffworkflow.org).*" match = re.search(pattern, backend_base_url) @@ -39,8 +39,10 @@ if openid_token_url is None: env_domain = match.group(1) keycloak_base_url = "https://keycloak.${env_domain}" elif "localhost:7000" in backend_base_url: - keycloak_base_url = "http://localhost:7002" - openid_token_url = f"{keycloak_base_url}/realms/{realm_name}/protocol/openid-connect/token" + keycloak_base_url = "http://localhost:7000" + openid_token_url = f"{keycloak_base_url}/realms/{realm_name}/protocol/openid-connect/token" + else: + openid_token_url = f"{backend_base_url}/openid/token" def get_auth_token_object() -> dict: diff --git a/spiffworkflow-backend/bin/run_message_start_event_with_api b/spiffworkflow-backend/bin/run_message_start_event_with_api index 2e5c7064e..da7db6e58 100755 --- a/spiffworkflow-backend/bin/run_message_start_event_with_api +++ b/spiffworkflow-backend/bin/run_message_start_event_with_api @@ -1,17 +1,17 @@ #!/usr/bin/env bash function error_handler() { - >&2 echo "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}." + echo >&2 "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}." exit "$2" } trap 'error_handler ${LINENO} $?' ERR set -o errtrace -o errexit -o nounset -o pipefail -script_dir="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )" +script_dir="$( + cd -- "$(dirname "$0")" >/dev/null 2>&1 + pwd -P +)" -if [[ -z "${KEYCLOAK_BASE_URL:-}" ]]; then - export KEYCLOAK_BASE_URL=http://localhost:7002 -fi if [[ -z "${BACKEND_BASE_URL:-}" ]]; then export BACKEND_BASE_URL=http://localhost:7000 fi @@ -21,7 +21,7 @@ username="${2:-admin}" password="${3:-admin}" realm_name="${4:-spiffworkflow}" if [[ -z "${message_identifier}" ]]; then - >&2 echo "usage: $(basename "$0") [message_identifier] [username: OPTONAL] [password: OPTONAL] [realm_name: OPTONAL]" + echo >&2 "usage: $(basename "$0") [message_identifier] [username: OPTONAL] [password: OPTONAL] [realm_name: OPTONAL]" exit 1 fi @@ -29,7 +29,7 @@ function check_result_for_error() { local result="$1" error_code=$(jq '.error_code' <<<"$result") if [[ -n "$error_code" && "$error_code" != "null" ]]; then - >&2 echo "ERROR: Failed to run process instance. Received error: $result" + echo >&2 "ERROR: Failed to run process instance. Received error: $result" exit 1 fi } diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/background_processing_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/background_processing_service.py index c8c565d1c..ab70089fc 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/background_processing_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/background_processing_service.py @@ -50,7 +50,7 @@ class BackgroundProcessingService: """Since this runs in a scheduler, we need to specify the app context as well.""" with self.app.app_context(): ProcessInstanceLockService.set_thread_local_locking_context("bg:messages") - MessageService.correlate_all_message_instances() + MessageService.correlate_all_message_instances(execution_mode="synchronous") def remove_stale_locks(self) -> None: """If something has been locked for a certain amount of time it is probably stale so unlock it.""" diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py index 302a146e0..735e525e5 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/process_instances_controller.py @@ -48,7 +48,6 @@ from spiffworkflow_backend.services.authorization_service import AuthorizationSe from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService from spiffworkflow_backend.services.git_service import GitCommandError from spiffworkflow_backend.services.git_service import GitService -from spiffworkflow_backend.services.message_service import MessageService from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsNotEnqueuedError @@ -702,9 +701,6 @@ def _process_instance_run( ) from e raise e - if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER_IN_CREATE_APP"]: - MessageService.correlate_all_message_instances() - def _process_instance_create( process_model_identifier: str, diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py index b302506f4..dab6b0721 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/message_service.py @@ -72,7 +72,7 @@ class MessageService: if user is None: user = UserService.find_or_create_system_user() receiving_process_instance = MessageService.start_process_with_message( - message_triggerable_process_model, user + message_triggerable_process_model, user, execution_mode=execution_mode ) message_instance_receive = MessageInstanceModel.query.filter_by( process_instance_id=receiving_process_instance.id, @@ -127,18 +127,22 @@ class MessageService: raise exception @classmethod - def correlate_all_message_instances(cls) -> None: + def correlate_all_message_instances( + cls, + execution_mode: str | None = None, + ) -> None: """Look at ALL the Send and Receive Messages and attempt to find correlations.""" message_instances_send = MessageInstanceModel.query.filter_by(message_type="send", status="ready").all() for message_instance_send in message_instances_send: - cls.correlate_send_message(message_instance_send) + cls.correlate_send_message(message_instance_send, execution_mode=execution_mode) @classmethod def start_process_with_message( cls, message_triggerable_process_model: MessageTriggerableProcessModel, user: UserModel, + execution_mode: str | None = None, ) -> ProcessInstanceModel: """Start up a process instance, so it is ready to catch the event.""" receiving_process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier( @@ -150,7 +154,10 @@ class MessageService: cls._cancel_non_matching_start_events(processor_receive, message_triggerable_process_model) processor_receive.save() - processor_receive.do_engine_steps(save=True) + execution_strategy_name = None + if execution_mode == ProcessInstanceExecutionMode.synchronous.value: + execution_strategy_name = "greedy" + processor_receive.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name) return receiving_process_instance