diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/api.yml b/spiffworkflow-backend/src/spiffworkflow_backend/api.yml index 6af522772..942185d1f 100755 --- a/spiffworkflow-backend/src/spiffworkflow_backend/api.yml +++ b/spiffworkflow-backend/src/spiffworkflow_backend/api.yml @@ -273,6 +273,31 @@ paths: schema: $ref: "#/components/schemas/OkTrue" + /debug/celery-backend-results/{process_instance_id}: + parameters: + - name: process_instance_id + in: path + required: true + schema: + type: integer + - name: include_all_failures + in: query + required: false + schema: + type: boolean + get: + operationId: spiffworkflow_backend.routes.debug_controller.celery_backend_results + summary: Returns the results from the celery backend for a process instance + tags: + - Status + responses: + "200": + description: Returns results. + content: + application/json: + schema: + $ref: "#/components/schemas/OkTrue" + /onboarding: get: operationId: spiffworkflow_backend.routes.onboarding_controller.get_onboarding diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py index f080eb0ac..3484790d9 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py @@ -18,7 +18,7 @@ ten_minutes = 60 * 10 @shared_task(ignore_result=False, time_limit=ten_minutes) -def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> None: +def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict: proc_index = current_process().index ProcessInstanceLockService.set_thread_local_locking_context("celery:worker", additional_processing_identifier=proc_index) process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() @@ -40,11 +40,13 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str | db.session.commit() if task_runnability == TaskRunnability.has_ready_tasks: queue_process_instance_if_appropriate(process_instance) + return {"ok": True, "process_instance_id": process_instance_id, "task_guid": task_guid} except ProcessInstanceIsAlreadyLockedError as exception: current_app.logger.info( f"Could not run process instance with worker: {current_app.config['PROCESS_UUID']} - {proc_index}. Error was:" f" {str(exception)}" ) + return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)} except Exception as e: db.session.rollback() # in case the above left the database with a bad transaction error_message = ( diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/routes/debug_controller.py b/spiffworkflow-backend/src/spiffworkflow_backend/routes/debug_controller.py index 0df120b63..c5e48e59c 100644 --- a/spiffworkflow-backend/src/spiffworkflow_backend/routes/debug_controller.py +++ b/spiffworkflow-backend/src/spiffworkflow_backend/routes/debug_controller.py @@ -1,8 +1,13 @@ -"""APIs for dealing with process groups, process models, and process instances.""" +import json + +import redis +from flask import current_app +from flask import jsonify from flask import make_response from flask import request from flask.wrappers import Response +from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.services.authentication_service import AuthenticationService from spiffworkflow_backend.services.monitoring_service import get_version_info_data @@ -17,3 +22,33 @@ def version_info() -> Response: def url_info() -> Response: return make_response({"url": request.url, "cache": AuthenticationService.ENDPOINT_CACHE}, 200) + + +def celery_backend_results( + process_instance_id: int, + include_all_failures: bool = True, +) -> Response: + redis_client = redis.StrictRedis.from_url(current_app.config["SPIFFWORKFLOW_BACKEND_CELERY_RESULT_BACKEND"]) + results: list = redis_client.keys("celery-task-meta-*") # type: ignore + if len(results) > 1000: + raise ApiError( + error_code="too_many_entries", + message=f"There are too many redis entries. You probably shouldn't use this api method. count {len(results)}", + status_code=400, + ) + + result_values = redis_client.mget(results) # type: ignore + + return_body = [] + for value in result_values: + if value is None: + continue + value_dict = json.loads(value.decode("utf-8")) + if ( + value_dict["result"] + and "process_instance_id" in value_dict["result"] + and value_dict["result"]["process_instance_id"] == process_instance_id + ) or (value_dict["status"] == "FAILURE" and include_all_failures is True): + return_body.append(value_dict) + + return make_response(jsonify(return_body), 200)