use the greedy strategy when running messages from the background processor w/ burnettk (#1798)
Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
parent
df91abef7c
commit
d2e005022b
|
@ -30,7 +30,7 @@ backend_client_secret = os.getenv("BACKEND_CLIENT_secret", "JXeQExm0JhQPLumgHtII
|
|||
openid_token_url = os.getenv("OPENID_TOKEN_URL")
|
||||
keycloak_base_url = os.getenv("KEYCLOAK_BASE_URL")
|
||||
if openid_token_url is None:
|
||||
if keycloak_base_url is None:
|
||||
if keycloak_base_url is not None:
|
||||
if "spiffworkflow.org" in backend_base_url:
|
||||
pattern = r".*api\.(\w+\.spiffworkflow.org).*"
|
||||
match = re.search(pattern, backend_base_url)
|
||||
|
@ -39,8 +39,10 @@ if openid_token_url is None:
|
|||
env_domain = match.group(1)
|
||||
keycloak_base_url = "https://keycloak.${env_domain}"
|
||||
elif "localhost:7000" in backend_base_url:
|
||||
keycloak_base_url = "http://localhost:7002"
|
||||
openid_token_url = f"{keycloak_base_url}/realms/{realm_name}/protocol/openid-connect/token"
|
||||
keycloak_base_url = "http://localhost:7000"
|
||||
openid_token_url = f"{keycloak_base_url}/realms/{realm_name}/protocol/openid-connect/token"
|
||||
else:
|
||||
openid_token_url = f"{backend_base_url}/openid/token"
|
||||
|
||||
|
||||
def get_auth_token_object() -> dict:
|
||||
|
|
|
@ -1,17 +1,17 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
function error_handler() {
|
||||
>&2 echo "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}."
|
||||
echo >&2 "Exited with BAD EXIT CODE '${2}' in ${0} script at line: ${1}."
|
||||
exit "$2"
|
||||
}
|
||||
trap 'error_handler ${LINENO} $?' ERR
|
||||
set -o errtrace -o errexit -o nounset -o pipefail
|
||||
|
||||
script_dir="$( cd -- "$(dirname "$0")" >/dev/null 2>&1 ; pwd -P )"
|
||||
script_dir="$(
|
||||
cd -- "$(dirname "$0")" >/dev/null 2>&1
|
||||
pwd -P
|
||||
)"
|
||||
|
||||
if [[ -z "${KEYCLOAK_BASE_URL:-}" ]]; then
|
||||
export KEYCLOAK_BASE_URL=http://localhost:7002
|
||||
fi
|
||||
if [[ -z "${BACKEND_BASE_URL:-}" ]]; then
|
||||
export BACKEND_BASE_URL=http://localhost:7000
|
||||
fi
|
||||
|
@ -21,7 +21,7 @@ username="${2:-admin}"
|
|||
password="${3:-admin}"
|
||||
realm_name="${4:-spiffworkflow}"
|
||||
if [[ -z "${message_identifier}" ]]; then
|
||||
>&2 echo "usage: $(basename "$0") [message_identifier] [username: OPTONAL] [password: OPTONAL] [realm_name: OPTONAL]"
|
||||
echo >&2 "usage: $(basename "$0") [message_identifier] [username: OPTONAL] [password: OPTONAL] [realm_name: OPTONAL]"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
|
@ -29,7 +29,7 @@ function check_result_for_error() {
|
|||
local result="$1"
|
||||
error_code=$(jq '.error_code' <<<"$result")
|
||||
if [[ -n "$error_code" && "$error_code" != "null" ]]; then
|
||||
>&2 echo "ERROR: Failed to run process instance. Received error: $result"
|
||||
echo >&2 "ERROR: Failed to run process instance. Received error: $result"
|
||||
exit 1
|
||||
fi
|
||||
}
|
||||
|
|
|
@ -50,7 +50,7 @@ class BackgroundProcessingService:
|
|||
"""Since this runs in a scheduler, we need to specify the app context as well."""
|
||||
with self.app.app_context():
|
||||
ProcessInstanceLockService.set_thread_local_locking_context("bg:messages")
|
||||
MessageService.correlate_all_message_instances()
|
||||
MessageService.correlate_all_message_instances(execution_mode="synchronous")
|
||||
|
||||
def remove_stale_locks(self) -> None:
|
||||
"""If something has been locked for a certain amount of time it is probably stale so unlock it."""
|
||||
|
|
|
@ -48,7 +48,6 @@ from spiffworkflow_backend.services.authorization_service import AuthorizationSe
|
|||
from spiffworkflow_backend.services.error_handling_service import ErrorHandlingService
|
||||
from spiffworkflow_backend.services.git_service import GitCommandError
|
||||
from spiffworkflow_backend.services.git_service import GitService
|
||||
from spiffworkflow_backend.services.message_service import MessageService
|
||||
from spiffworkflow_backend.services.process_instance_processor import ProcessInstanceProcessor
|
||||
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsAlreadyLockedError
|
||||
from spiffworkflow_backend.services.process_instance_queue_service import ProcessInstanceIsNotEnqueuedError
|
||||
|
@ -702,9 +701,6 @@ def _process_instance_run(
|
|||
) from e
|
||||
raise e
|
||||
|
||||
if not current_app.config["SPIFFWORKFLOW_BACKEND_RUN_BACKGROUND_SCHEDULER_IN_CREATE_APP"]:
|
||||
MessageService.correlate_all_message_instances()
|
||||
|
||||
|
||||
def _process_instance_create(
|
||||
process_model_identifier: str,
|
||||
|
|
|
@ -72,7 +72,7 @@ class MessageService:
|
|||
if user is None:
|
||||
user = UserService.find_or_create_system_user()
|
||||
receiving_process_instance = MessageService.start_process_with_message(
|
||||
message_triggerable_process_model, user
|
||||
message_triggerable_process_model, user, execution_mode=execution_mode
|
||||
)
|
||||
message_instance_receive = MessageInstanceModel.query.filter_by(
|
||||
process_instance_id=receiving_process_instance.id,
|
||||
|
@ -127,18 +127,22 @@ class MessageService:
|
|||
raise exception
|
||||
|
||||
@classmethod
|
||||
def correlate_all_message_instances(cls) -> None:
|
||||
def correlate_all_message_instances(
|
||||
cls,
|
||||
execution_mode: str | None = None,
|
||||
) -> None:
|
||||
"""Look at ALL the Send and Receive Messages and attempt to find correlations."""
|
||||
message_instances_send = MessageInstanceModel.query.filter_by(message_type="send", status="ready").all()
|
||||
|
||||
for message_instance_send in message_instances_send:
|
||||
cls.correlate_send_message(message_instance_send)
|
||||
cls.correlate_send_message(message_instance_send, execution_mode=execution_mode)
|
||||
|
||||
@classmethod
|
||||
def start_process_with_message(
|
||||
cls,
|
||||
message_triggerable_process_model: MessageTriggerableProcessModel,
|
||||
user: UserModel,
|
||||
execution_mode: str | None = None,
|
||||
) -> ProcessInstanceModel:
|
||||
"""Start up a process instance, so it is ready to catch the event."""
|
||||
receiving_process_instance = ProcessInstanceService.create_process_instance_from_process_model_identifier(
|
||||
|
@ -150,7 +154,10 @@ class MessageService:
|
|||
cls._cancel_non_matching_start_events(processor_receive, message_triggerable_process_model)
|
||||
processor_receive.save()
|
||||
|
||||
processor_receive.do_engine_steps(save=True)
|
||||
execution_strategy_name = None
|
||||
if execution_mode == ProcessInstanceExecutionMode.synchronous.value:
|
||||
execution_strategy_name = "greedy"
|
||||
processor_receive.do_engine_steps(save=True, execution_strategy_name=execution_strategy_name)
|
||||
|
||||
return receiving_process_instance
|
||||
|
||||
|
|
Loading…
Reference in New Issue