added a debug endpoint to check celery backend and added return result to celery task w/ burnettk (#1280)

Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
jasquat 2024-03-27 00:31:01 +00:00 committed by GitHub
parent e724cc5856
commit 37902277cc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 64 additions and 2 deletions

View File

@ -273,6 +273,31 @@ paths:
schema: schema:
$ref: "#/components/schemas/OkTrue" $ref: "#/components/schemas/OkTrue"
/debug/celery-backend-results/{process_instance_id}:
parameters:
- name: process_instance_id
in: path
required: true
schema:
type: integer
- name: include_all_failures
in: query
required: false
schema:
type: boolean
get:
operationId: spiffworkflow_backend.routes.debug_controller.celery_backend_results
summary: Returns the results from the celery backend for a process instance
tags:
- Status
responses:
"200":
description: Returns results.
content:
application/json:
schema:
$ref: "#/components/schemas/OkTrue"
/onboarding: /onboarding:
get: get:
operationId: spiffworkflow_backend.routes.onboarding_controller.get_onboarding operationId: spiffworkflow_backend.routes.onboarding_controller.get_onboarding

View File

@ -18,7 +18,7 @@ ten_minutes = 60 * 10
@shared_task(ignore_result=False, time_limit=ten_minutes) @shared_task(ignore_result=False, time_limit=ten_minutes)
def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> None: def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict:
proc_index = current_process().index proc_index = current_process().index
ProcessInstanceLockService.set_thread_local_locking_context("celery:worker", additional_processing_identifier=proc_index) ProcessInstanceLockService.set_thread_local_locking_context("celery:worker", additional_processing_identifier=proc_index)
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first() process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
@ -40,11 +40,13 @@ def celery_task_process_instance_run(process_instance_id: int, task_guid: str |
db.session.commit() db.session.commit()
if task_runnability == TaskRunnability.has_ready_tasks: if task_runnability == TaskRunnability.has_ready_tasks:
queue_process_instance_if_appropriate(process_instance) queue_process_instance_if_appropriate(process_instance)
return {"ok": True, "process_instance_id": process_instance_id, "task_guid": task_guid}
except ProcessInstanceIsAlreadyLockedError as exception: except ProcessInstanceIsAlreadyLockedError as exception:
current_app.logger.info( current_app.logger.info(
f"Could not run process instance with worker: {current_app.config['PROCESS_UUID']} - {proc_index}. Error was:" f"Could not run process instance with worker: {current_app.config['PROCESS_UUID']} - {proc_index}. Error was:"
f" {str(exception)}" f" {str(exception)}"
) )
return {"ok": False, "process_instance_id": process_instance_id, "task_guid": task_guid, "exception": str(exception)}
except Exception as e: except Exception as e:
db.session.rollback() # in case the above left the database with a bad transaction db.session.rollback() # in case the above left the database with a bad transaction
error_message = ( error_message = (

View File

@ -1,8 +1,13 @@
"""APIs for dealing with process groups, process models, and process instances.""" import json
import redis
from flask import current_app
from flask import jsonify
from flask import make_response from flask import make_response
from flask import request from flask import request
from flask.wrappers import Response from flask.wrappers import Response
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.services.authentication_service import AuthenticationService from spiffworkflow_backend.services.authentication_service import AuthenticationService
from spiffworkflow_backend.services.monitoring_service import get_version_info_data from spiffworkflow_backend.services.monitoring_service import get_version_info_data
@ -17,3 +22,33 @@ def version_info() -> Response:
def url_info() -> Response: def url_info() -> Response:
return make_response({"url": request.url, "cache": AuthenticationService.ENDPOINT_CACHE}, 200) return make_response({"url": request.url, "cache": AuthenticationService.ENDPOINT_CACHE}, 200)
def celery_backend_results(
process_instance_id: int,
include_all_failures: bool = True,
) -> Response:
redis_client = redis.StrictRedis.from_url(current_app.config["SPIFFWORKFLOW_BACKEND_CELERY_RESULT_BACKEND"])
results: list = redis_client.keys("celery-task-meta-*") # type: ignore
if len(results) > 1000:
raise ApiError(
error_code="too_many_entries",
message=f"There are too many redis entries. You probably shouldn't use this api method. count {len(results)}",
status_code=400,
)
result_values = redis_client.mget(results) # type: ignore
return_body = []
for value in result_values:
if value is None:
continue
value_dict = json.loads(value.decode("utf-8"))
if (
value_dict["result"]
and "process_instance_id" in value_dict["result"]
and value_dict["result"]["process_instance_id"] == process_instance_id
) or (value_dict["status"] == "FAILURE" and include_all_failures is True):
return_body.append(value_dict)
return make_response(jsonify(return_body), 200)