mirror of
https://github.com/sartography/cr-connect-workflow.git
synced 2025-02-23 13:18:35 +00:00
SpiffWorkflow 1.2: Top-level imports moved to the appropriate modules:
- replace 'from SpiffWorkflow import WorkflowException' with 'from SpiffWorkflow.exceptions import WorkflowException'
- replace 'from SpiffWorkflow import TaskState' with 'from SpiffWorkflow.task import TaskState'
- replace 'from SpiffWorkflow import Task' with 'from SpiffWorkflow.task import Task'
SpiffWorkflow 1.2: Navigation code removed completely. It proved to be of little use, and was complex and difficult to maintain.
SpiffWorkflow 1.2: When inserting custom functions into the PythonExecutionEngine, be aware that the task data acts as the full context for execution, and will contain global functions and methods during the exec call.
SpiffWorkflow 1.2: All task specs now have a spec_type attribute containing a descriptive string such as "User Task", "Script Task", or "Start Event".
137 lines
6.4 KiB
Python
from crc import app, session
from crc.api.common import ApiError
from crc.models.task_log import TaskLogModel, TaskLogLevels, TaskLogQuery, TaskLogModelSchema
from crc.models.workflow import WorkflowModel
from crc.services.user_service import UserService

from sqlalchemy import desc

import dateparser
import pytz

class TaskLoggingService(object):
    """Provides common tools for logging information from running workflows.

    This logging information can be useful in metrics, and as a means of showing
    what is happening over time. For this reason we will record some of this data
    in the database, at least for now. We will also add this information to our
    standard logs so all the details exist somewhere, even if the information in
    the database is truncated or modified."""

    @staticmethod
    def add_log(task, level, code, message, study_id, workflow_id):
        if level not in TaskLogLevels:
            raise ApiError("invalid_logging_level", f"Please specify a valid log level. {TaskLogLevels}")
        try:
            user_uid = UserService.current_user().uid
        except ApiError:
            # No authenticated user is available (e.g. a log written outside a request).
            user_uid = "unknown"
        if workflow_id:
            workflow_spec_id = session.query(WorkflowModel.workflow_spec_id).\
                filter(WorkflowModel.id == workflow_id).\
                scalar()
        else:
            workflow_spec_id = None
        # Also write the entry to the standard application log, so the full
        # details exist somewhere even if the database record is later modified.
        log_message = f"Workflow Spec {workflow_spec_id}, study {study_id}, task {task.get_name()}, user {user_uid}: {message}"
        app.logger.log(TaskLogLevels[level].value, log_message)
        log_model = TaskLogModel(level=level,
                                 code=code,
                                 message=message,
                                 study_id=study_id,
                                 workflow_id=workflow_id,
                                 task=task.get_name(),
                                 user_uid=user_uid,
                                 workflow_spec_id=workflow_spec_id)
        session.add(log_model)
        session.commit()
        return log_model
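
    # Illustrative usage (a sketch, assuming a workflow context that supplies
    # `task`, `study_id`, and `workflow_id`; the level, code, and message here
    # are made up for the example):
    #
    #   TaskLoggingService.add_log(task, 'info', 'status_update',
    #                              'Document approved.', study_id, workflow_id)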

    def get_logs_for_workflow(self, workflow_id: int, level: str = None, code: str = None, size: int = None):
        logs = self.get_logs(workflow_id=workflow_id, level=level, code=code, size=size)
        return logs

    def get_logs_for_study(self, study_id: int, level: str = None, code: str = None, size: int = None):
        logs = self.get_logs(study_id=study_id, level=level, code=code, size=size)
        return logs

    @staticmethod
    def get_logs(study_id: int = None, workflow_id: int = None, level: str = None, code: str = None, size: int = None):
        """We should almost always get a study_id or a workflow_id.
        In *very* rare circumstances, an admin may want all the logs.
        This could be a *lot* of logs."""
        query = session.query(TaskLogModel)
        if study_id:
            query = query.filter(TaskLogModel.study_id == study_id)
        if workflow_id:
            query = query.filter(TaskLogModel.workflow_id == workflow_id)
        if level:
            query = query.filter(TaskLogModel.level == level)
        if code:
            query = query.filter(TaskLogModel.code == code)
        if size:
            query = query.limit(size)
        logs = query.all()
        return logs
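
    # Illustrative: fetch up to 100 'metrics' entries for a hypothetical study id:
    #
    #   logs = TaskLoggingService.get_logs(study_id=42, level='metrics', size=100)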

    @staticmethod
    def get_logs_for_study_paginated(study_id, tq: TaskLogQuery):
        """Returns an updated TaskLogQuery, with items in reverse chronological order by default."""
        query = session.query(TaskLogModel).filter(TaskLogModel.study_id == study_id)
        return TaskLoggingService.__paginate(query, tq)

    @staticmethod
    def __paginate(sql_query, task_log_query: TaskLogQuery):
        """Updates the given sql_query with parameters from the task_log_query, executes it,
        then updates the task_log_query with the results of the SQL query."""
        if not task_log_query:
            task_log_query = TaskLogQuery()
        if task_log_query.sort_column is None:
            # Default to reverse chronological order.
            task_log_query.sort_column = "timestamp"
            task_log_query.sort_reverse = True
        if task_log_query.code:
            sql_query = sql_query.filter(TaskLogModel.code.like(task_log_query.code + "%"))
        if task_log_query.level:
            sql_query = sql_query.filter(TaskLogModel.level.like(task_log_query.level + "%"))
        if task_log_query.user:
            sql_query = sql_query.filter(TaskLogModel.user_uid.like(task_log_query.user + "%"))
        if task_log_query.sort_reverse:
            sort_column = desc(task_log_query.sort_column)
        else:
            sort_column = task_log_query.sort_column
        # TaskLogQuery pages are 0-based; Flask-SQLAlchemy's paginate() is 1-based.
        paginator = sql_query.order_by(sort_column).paginate(page=task_log_query.page + 1,
                                                             per_page=task_log_query.per_page,
                                                             error_out=False)
        task_log_query.update_from_sqlalchemy_paginator(paginator)
        return task_log_query
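
    # Illustrative (a sketch; assumes TaskLogQuery accepts `page` and `per_page`
    # as constructor arguments): first page of 25 study logs, newest first:
    #
    #   tq = TaskLogQuery(page=0, per_page=25)
    #   tq = TaskLoggingService.get_logs_for_study_paginated(study_id, tq)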

    @staticmethod
    def get_log_data_for_download(study_id):
        # Admins can download the metrics logs for a study as an Excel file.
        # We only use a subset of the fields.
        logs = []
        headers = []
        result = session.query(TaskLogModel).\
            filter(TaskLogModel.study_id == study_id).\
            filter(TaskLogModel.level == 'metrics').\
            all()
        schemas = TaskLogModelSchema(many=True).dump(result)
        # We only use these fields
        fields = ['category', 'workflow', 'level', 'code', 'message', 'user_uid',
                  'timestamp', 'workflow_id', 'workflow_spec_id']
        for schema in schemas:
            # Build a dictionary using the items in fields
            log = {}
            for field in fields:
                if field == 'timestamp':
                    # Excel doesn't accept timezones, so we return a local
                    # datetime without the timezone.
                    # TODO: detect the local timezone with something like dateutil.tz.tzlocal()
                    parsed_timestamp = dateparser.parse(str(schema['timestamp']))
                    localtime = parsed_timestamp.astimezone(pytz.timezone('US/Eastern'))
                    log[field] = localtime.strftime('%Y-%m-%d %H:%M:%S')
                else:
                    log[field] = schema[field]
                if field.capitalize() not in headers:
                    headers.append(field.capitalize())
            logs.append(log)

        return logs, headers
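
# Usage sketch (not part of the service): one way the download data might feed
# an Excel export with openpyxl. The library choice and filename are
# assumptions for illustration only.
#
#   from openpyxl import Workbook
#
#   logs, headers = TaskLoggingService.get_log_data_for_download(study_id)
#   wb = Workbook()
#   ws = wb.active
#   ws.append(headers)                                # e.g. 'Category', 'Workflow', ...
#   for log in logs:
#       ws.append([log[h.lower()] for h in headers])  # values in header order
#   wb.save('study_metrics.xlsx')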