From 4dbcd8a6556a57b0708ddf83f857572d1d4943d8 Mon Sep 17 00:00:00 2001
From: jasquat <2487833+jasquat@users.noreply.github.com>
Date: Fri, 21 Jun 2024 09:51:36 -0400
Subject: [PATCH] improved-celery-logging (#1785)

* updated logging for celery workers w/ burnettk

* fixed indentation error

---------

Co-authored-by: jasquat
---
 .../celery_tasks/process_instance_task.py     |  6 +++
 .../background_processing/celery_worker.py    |  5 ++-
 .../services/logging_service.py               | 42 +++++++++++++------
 .../services/process_instance_lock_service.py |  1 -
 4 files changed, 39 insertions(+), 15 deletions(-)

diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py
index 6b26c2a4..038ff8d1 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_tasks/process_instance_task.py
@@ -29,6 +29,12 @@ class SpiffCeleryWorkerError(Exception):
 @shared_task(ignore_result=False, time_limit=ten_minutes)
 def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict:
     proc_index = current_process().index
+
+    message = f"celery_task_process_instance_run: process_instance_id: {process_instance_id}"
+    if task_guid:
+        message += f" task_guid: {task_guid}"
+    current_app.logger.info(message)
+
     ProcessInstanceLockService.set_thread_local_locking_context("celery:worker")
     process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
 
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_worker.py b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_worker.py
index 0bbda886..49f1ed0d 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_worker.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/background_processing/celery_worker.py
@@ -25,4 +25,7 @@ def setup_loggers(logger: Any, *args: Any, **kwargs: Any) -> None:
     stdout_handler = logging.StreamHandler(sys.stdout)
     stdout_handler.setFormatter(log_formatter)
     logger.addHandler(stdout_handler)
-    setup_logger_for_app(the_flask_app, logger)
+    setup_logger_for_app(the_flask_app, logger, force_run_with_celery=True)
+    # this handler gets added somewhere, though we have not tracked down where,
+    # so set its level very high since we do not need it
+    logging.getLogger("spiff").setLevel(logging.CRITICAL)
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/logging_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/logging_service.py
index 247613bd..56bfe976 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/logging_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/logging_service.py
@@ -1,5 +1,6 @@
 import json
 import logging
+import os
 import re
 import sys
 from typing import Any
@@ -83,7 +84,14 @@ class JsonFormatter(logging.Formatter):
         return json.dumps(message_dict, default=str)
 
 
-def setup_logger_for_app(app: Flask, primary_logger: Any) -> None:
+def setup_logger_for_app(app: Flask, primary_logger: Any, force_run_with_celery: bool = False) -> None:
+    # In a celery worker this code runs twice: once for the flask app and once again for the celery
+    # worker. The flask app adds a bunch of handlers that the celery worker's root.manager.loggerDict
+    # does not return, but those handlers are still present with the wrong log level. Skipping this code
+    # entirely when flask runs it inside a celery worker means the celery worker runs it exactly once.
+    if os.environ.get("SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER") and not force_run_with_celery:
+        return
+
     upper_log_level_string = app.config["SPIFFWORKFLOW_BACKEND_LOG_LEVEL"].upper()
     log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
 
@@ -101,7 +109,12 @@ def setup_logger_for_app(app: Flask, primary_logger: Any) -> None:
         spiff_logger_filehandler.setFormatter(log_formatter)
 
     # these loggers have been deemed too verbose to be useful
-    obscure_loggers_to_exclude_from_main_logging = ["connexion", "flask_cors.extension", "flask_cors.core", "sqlalchemy"]
+    obscure_loggers_to_exclude_from_main_logging = [
+        "connexion",
+        "flask_cors.core",
+        "flask_cors.extension",
+        "sqlalchemy",
+    ]
 
     # if you actually want one of these excluded loggers, there is a config option to turn it on
     loggers_to_use = app.config.get("SPIFFWORKFLOW_BACKEND_LOGGERS_TO_USE", [])
@@ -132,14 +145,14 @@ def setup_logger_for_app(app: Flask, primary_logger: Any) -> None:
     for name in primary_logger.root.manager.loggerDict:
         # use a regex so spiffworkflow_backend isn't filtered out
         if not re.match(r"^spiff\b", name):
-            sub_logger = logging.getLogger(name)
-            sub_logger.setLevel(log_level)
+            logger_for_name = logging.getLogger(name)
+            log_level_to_use = log_level
             if spiff_logger_filehandler:
-                sub_logger.handlers = []
-                sub_logger.propagate = False
-                sub_logger.addHandler(spiff_logger_filehandler)
+                logger_for_name.handlers = []
+                logger_for_name.propagate = False
+                logger_for_name.addHandler(spiff_logger_filehandler)
             else:
-                if len(sub_logger.handlers) < 1:
+                if len(logger_for_name.handlers) < 1:
                     exclude_logger_name_from_logging = False
                     for logger_to_exclude in obscure_loggers_to_exclude_from_main_logging:
                         if name.startswith(logger_to_exclude):
@@ -147,7 +160,7 @@ def setup_logger_for_app(app: Flask, primary_logger: Any) -> None:
 
                     # it's very verbose so set all obscure loggers to ERROR if not in DEBUG
                     if exclude_logger_name_from_logging or upper_log_level_string != "DEBUG":
-                        sub_logger.setLevel("ERROR")
+                        log_level_to_use = "ERROR"
 
                     # only need to set the log level here if it is not already excluded from main logging
                     if not exclude_logger_name_from_logging and upper_log_level_string == "DEBUG":
@@ -156,13 +169,16 @@ def setup_logger_for_app(app: Flask, primary_logger: Any) -> None:
                             if name.startswith(logger_to_exclude_from_debug):
                                 exclude_logger_name_from_debug = True
                         if exclude_logger_name_from_debug:
-                            sub_logger.setLevel("INFO")
+                            log_level_to_use = "INFO"
 
-                    sub_logger.addHandler(logging.StreamHandler(sys.stdout))
+                    logger_for_name.addHandler(logging.StreamHandler(sys.stdout))
 
-                for the_handler in sub_logger.handlers:
+                for the_handler in logger_for_name.handlers:
                     the_handler.setFormatter(log_formatter)
-                    the_handler.setLevel(log_level)
+                    level_number = logging.getLevelName(log_level_to_use)
+                    the_handler.setLevel(level_number)
+            level_number = logging.getLevelName(log_level_to_use)
+            logger_for_name.setLevel(level_number)
 
 
 def get_log_formatter(app: Flask) -> logging.Formatter:
diff --git a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py
index 99735631..da049c04 100644
--- a/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py
+++ b/spiffworkflow-backend/src/spiffworkflow_backend/services/process_instance_lock_service.py
@@ -75,7 +75,6 @@ class ProcessInstanceLockService:
     @classmethod
    def has_lock(cls, process_instance_id: int) -> bool:
         ctx = cls.get_thread_local_locking_context()
-        current_app.logger.info(f"THREAD LOCK: {ctx}")
         return process_instance_id in ctx["locks"]
 
     @classmethod
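
A note on the celery_worker.py hunk: it starts mid-function, so the decorator that wires setup_loggers into Celery is not visible in the diff. The logger-first signature matches Celery's after_setup_logger signal, so the wiring presumably looks like the sketch below (an inference from the signature, not something the diff confirms):

from typing import Any

from celery.signals import after_setup_logger


@after_setup_logger.connect  # fires after Celery has configured the worker's logger
def setup_loggers(logger: Any, *args: Any, **kwargs: Any) -> None:
    ...  # body as shown in the hunk above

The new force_run_with_celery=True argument exists to punch through the guard that the logging_service.py hunk adds, described next.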
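
The guard in setup_logger_for_app relies on call order inside a worker process: building the flask app triggers one call, and the Celery signal handler triggers a second. A hypothetical sketch of that sequencing, assuming the worker entrypoint (not part of this diff) exports the environment variable before building the app, and with create_app standing in for whatever app factory it uses:

import os

# assumed: exported by the worker entrypoint before the flask app is built
os.environ["SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER"] = "true"

from spiffworkflow_backend import create_app  # assumed import path for the app factory

app = create_app()
# create_app calls setup_logger_for_app(app, logging), which returns
# immediately because the env var is set and force_run_with_celery is False

# later, Celery fires its logging signal and setup_loggers runs:
#   setup_logger_for_app(the_flask_app, logger, force_run_with_celery=True)
# force_run_with_celery bypasses the guard, so the worker configures
# logging exactly once, against its own logger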
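
Within the logger loop, the re.match(r"^spiff\b", name) check (unchanged by this diff, but what the loop pivots on) decides which loggers get reconfigured. The \b word boundary skips the SpiffWorkflow library's own "spiff" loggers, while "spiffworkflow_backend" fails to match and therefore still gets configured. A quick demonstration:

import re

# "spiff.metrics" stands in for any dotted child of the library's logger
for name in ["spiff", "spiff.metrics", "spiffworkflow_backend", "sqlalchemy"]:
    skipped = bool(re.match(r"^spiff\b", name))
    print(f"{name}: {'skipped' if skipped else 'reconfigured'}")

# spiff: skipped
# spiff.metrics: skipped
# spiffworkflow_backend: reconfigured
# sqlalchemy: reconfigured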
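
The loop refactor also changes how levels are applied: instead of calling setLevel as a side effect at each branch, the chosen level accumulates in log_level_to_use and is applied once per handler and once per logger, converted via logging.getLevelName. Given a registered level name, getLevelName returns the numeric level (and the name when given a number), so handlers and loggers end up with numeric levels:

import logging

assert logging.getLevelName("ERROR") == 40  # name -> number
assert logging.getLevelName(40) == "ERROR"  # number -> name

handler = logging.StreamHandler()
handler.setLevel(logging.getLevelName("ERROR"))  # same effect as handler.setLevel(40)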