mirror of
https://github.com/status-im/spiff-arena.git
synced 2025-03-01 01:40:42 +00:00
improved-celery-logging (#1785)
* updated logging for celery workers w/ burnettk * fixed indentation error --------- Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
parent
278fa239f4
commit
4dbcd8a655
@ -29,6 +29,12 @@ class SpiffCeleryWorkerError(Exception):
|
|||||||
@shared_task(ignore_result=False, time_limit=ten_minutes)
|
@shared_task(ignore_result=False, time_limit=ten_minutes)
|
||||||
def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict:
|
def celery_task_process_instance_run(process_instance_id: int, task_guid: str | None = None) -> dict:
|
||||||
proc_index = current_process().index
|
proc_index = current_process().index
|
||||||
|
|
||||||
|
message = f"celery_task_process_instance_run: process_instance_id: {process_instance_id}"
|
||||||
|
if task_guid:
|
||||||
|
message += f" task_guid: {task_guid}"
|
||||||
|
current_app.logger.info(message)
|
||||||
|
|
||||||
ProcessInstanceLockService.set_thread_local_locking_context("celery:worker")
|
ProcessInstanceLockService.set_thread_local_locking_context("celery:worker")
|
||||||
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
|
process_instance = ProcessInstanceModel.query.filter_by(id=process_instance_id).first()
|
||||||
|
|
||||||
|
@ -25,4 +25,7 @@ def setup_loggers(logger: Any, *args: Any, **kwargs: Any) -> None:
|
|||||||
stdout_handler = logging.StreamHandler(sys.stdout)
|
stdout_handler = logging.StreamHandler(sys.stdout)
|
||||||
stdout_handler.setFormatter(log_formatter)
|
stdout_handler.setFormatter(log_formatter)
|
||||||
logger.addHandler(stdout_handler)
|
logger.addHandler(stdout_handler)
|
||||||
setup_logger_for_app(the_flask_app, logger)
|
setup_logger_for_app(the_flask_app, logger, force_run_with_celery=True)
|
||||||
|
# this handler is getting added somewhere but not sure where so set its
|
||||||
|
# level really high since we do not need it
|
||||||
|
logging.getLogger("spiff").setLevel(logging.CRITICAL)
|
||||||
|
@ -1,5 +1,6 @@
|
|||||||
import json
|
import json
|
||||||
import logging
|
import logging
|
||||||
|
import os
|
||||||
import re
|
import re
|
||||||
import sys
|
import sys
|
||||||
from typing import Any
|
from typing import Any
|
||||||
@ -83,7 +84,14 @@ class JsonFormatter(logging.Formatter):
|
|||||||
return json.dumps(message_dict, default=str)
|
return json.dumps(message_dict, default=str)
|
||||||
|
|
||||||
|
|
||||||
def setup_logger_for_app(app: Flask, primary_logger: Any) -> None:
|
def setup_logger_for_app(app: Flask, primary_logger: Any, force_run_with_celery: bool = False) -> None:
|
||||||
|
# In a celery worker it runs this code twice: once for the flask app and once again for the celery worker.
|
||||||
|
# The flask app adds a bunch of handlers that the celery worker's root.manager.loggerDict does not return
|
||||||
|
# however the handlers are still there with the wrong log level so this avoids running the logging
|
||||||
|
# code completely when flask runs it so the celery worker will run it only once.
|
||||||
|
if os.environ.get("SPIFFWORKFLOW_BACKEND_RUNNING_IN_CELERY_WORKER") and not force_run_with_celery:
|
||||||
|
return
|
||||||
|
|
||||||
upper_log_level_string = app.config["SPIFFWORKFLOW_BACKEND_LOG_LEVEL"].upper()
|
upper_log_level_string = app.config["SPIFFWORKFLOW_BACKEND_LOG_LEVEL"].upper()
|
||||||
log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
|
log_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
|
||||||
|
|
||||||
@ -101,7 +109,12 @@ def setup_logger_for_app(app: Flask, primary_logger: Any) -> None:
|
|||||||
spiff_logger_filehandler.setFormatter(log_formatter)
|
spiff_logger_filehandler.setFormatter(log_formatter)
|
||||||
|
|
||||||
# these loggers have been deemed too verbose to be useful
|
# these loggers have been deemed too verbose to be useful
|
||||||
obscure_loggers_to_exclude_from_main_logging = ["connexion", "flask_cors.extension", "flask_cors.core", "sqlalchemy"]
|
obscure_loggers_to_exclude_from_main_logging = [
|
||||||
|
"connexion",
|
||||||
|
"flask_cors.core",
|
||||||
|
"flask_cors.extension",
|
||||||
|
"sqlalchemy",
|
||||||
|
]
|
||||||
|
|
||||||
# if you actually want one of these excluded loggers, there is a config option to turn it on
|
# if you actually want one of these excluded loggers, there is a config option to turn it on
|
||||||
loggers_to_use = app.config.get("SPIFFWORKFLOW_BACKEND_LOGGERS_TO_USE", [])
|
loggers_to_use = app.config.get("SPIFFWORKFLOW_BACKEND_LOGGERS_TO_USE", [])
|
||||||
@ -132,14 +145,14 @@ def setup_logger_for_app(app: Flask, primary_logger: Any) -> None:
|
|||||||
for name in primary_logger.root.manager.loggerDict:
|
for name in primary_logger.root.manager.loggerDict:
|
||||||
# use a regex so spiffworkflow_backend isn't filtered out
|
# use a regex so spiffworkflow_backend isn't filtered out
|
||||||
if not re.match(r"^spiff\b", name):
|
if not re.match(r"^spiff\b", name):
|
||||||
sub_logger = logging.getLogger(name)
|
logger_for_name = logging.getLogger(name)
|
||||||
sub_logger.setLevel(log_level)
|
log_level_to_use = log_level
|
||||||
if spiff_logger_filehandler:
|
if spiff_logger_filehandler:
|
||||||
sub_logger.handlers = []
|
logger_for_name.handlers = []
|
||||||
sub_logger.propagate = False
|
logger_for_name.propagate = False
|
||||||
sub_logger.addHandler(spiff_logger_filehandler)
|
logger_for_name.addHandler(spiff_logger_filehandler)
|
||||||
else:
|
else:
|
||||||
if len(sub_logger.handlers) < 1:
|
if len(logger_for_name.handlers) < 1:
|
||||||
exclude_logger_name_from_logging = False
|
exclude_logger_name_from_logging = False
|
||||||
for logger_to_exclude in obscure_loggers_to_exclude_from_main_logging:
|
for logger_to_exclude in obscure_loggers_to_exclude_from_main_logging:
|
||||||
if name.startswith(logger_to_exclude):
|
if name.startswith(logger_to_exclude):
|
||||||
@ -147,7 +160,7 @@ def setup_logger_for_app(app: Flask, primary_logger: Any) -> None:
|
|||||||
|
|
||||||
# it's very verbose so set all obscure loggers to ERROR if not in DEBUG
|
# it's very verbose so set all obscure loggers to ERROR if not in DEBUG
|
||||||
if exclude_logger_name_from_logging or upper_log_level_string != "DEBUG":
|
if exclude_logger_name_from_logging or upper_log_level_string != "DEBUG":
|
||||||
sub_logger.setLevel("ERROR")
|
log_level_to_use = "ERROR"
|
||||||
|
|
||||||
# only need to set the log level here if it is not already excluded from main logging
|
# only need to set the log level here if it is not already excluded from main logging
|
||||||
if not exclude_logger_name_from_logging and upper_log_level_string == "DEBUG":
|
if not exclude_logger_name_from_logging and upper_log_level_string == "DEBUG":
|
||||||
@ -156,13 +169,16 @@ def setup_logger_for_app(app: Flask, primary_logger: Any) -> None:
|
|||||||
if name.startswith(logger_to_exclude_from_debug):
|
if name.startswith(logger_to_exclude_from_debug):
|
||||||
exclude_logger_name_from_debug = True
|
exclude_logger_name_from_debug = True
|
||||||
if exclude_logger_name_from_debug:
|
if exclude_logger_name_from_debug:
|
||||||
sub_logger.setLevel("INFO")
|
log_level_to_use = "INFO"
|
||||||
|
|
||||||
sub_logger.addHandler(logging.StreamHandler(sys.stdout))
|
logger_for_name.addHandler(logging.StreamHandler(sys.stdout))
|
||||||
|
|
||||||
for the_handler in sub_logger.handlers:
|
for the_handler in logger_for_name.handlers:
|
||||||
the_handler.setFormatter(log_formatter)
|
the_handler.setFormatter(log_formatter)
|
||||||
the_handler.setLevel(log_level)
|
level_number = logging.getLevelName(log_level_to_use)
|
||||||
|
the_handler.setLevel(level_number)
|
||||||
|
level_number = logging.getLevelName(log_level_to_use)
|
||||||
|
logger_for_name.setLevel(level_number)
|
||||||
|
|
||||||
|
|
||||||
def get_log_formatter(app: Flask) -> logging.Formatter:
|
def get_log_formatter(app: Flask) -> logging.Formatter:
|
||||||
|
@ -75,7 +75,6 @@ class ProcessInstanceLockService:
|
|||||||
@classmethod
|
@classmethod
|
||||||
def has_lock(cls, process_instance_id: int) -> bool:
|
def has_lock(cls, process_instance_id: int) -> bool:
|
||||||
ctx = cls.get_thread_local_locking_context()
|
ctx = cls.get_thread_local_locking_context()
|
||||||
current_app.logger.info(f"THREAD LOCK: {ctx}")
|
|
||||||
return process_instance_id in ctx["locks"]
|
return process_instance_id in ctx["locks"]
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
|
Loading…
x
Reference in New Issue
Block a user