1. Created a TaskLoggingService to encapsulate what we are doing with Task Logging through BPMN Script calls and API calls from the front end.

2. Added two api endpoints that will allow us to get all the logs for a specific workflow or study.
3. Assured that requests for logs are paginated, sortable, and can be limited to a specific code if needed.
4. Assured that we only use logging levels that match the log levels of Python.
This commit is contained in:
Dan 2022-01-10 13:16:54 -05:00
parent 3081956128
commit 962d05c875
11 changed files with 378 additions and 86 deletions

View File

@ -441,13 +441,36 @@ paths:
responses:
'200':
description: list of Study Associate Objects
/study/{study_id}/log:
parameters:
- name: study_id
in: path
required: true
description: The id of the study for which logs should be returned.
schema:
type: integer
format: int32
put:
operationId: crc.api.study.get_logs_for_study
summary: Provides a list of logged events that occurred within a study
tags:
- Studies
requestBody:
description: Log Pagination Request
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/PaginatedTaskLog"
responses:
'200':
description: list of Study logs - events that have occurred within a study.
content:
application/json:
schema:
type: array
items:
$ref: "#/components/schemas/TaskLog"
/workflow-specification:
get:
operationId: crc.api.workflow.all_specifications
@ -1219,7 +1242,34 @@ paths:
application/json:
schema:
$ref: "#/components/schemas/Workflow"
/workflow/{workflow_id}/log:
parameters:
- name: workflow_id
in: path
required: true
description: The id of the workflow for which logs should be returned.
schema:
type: integer
format: int32
put:
operationId: crc.api.workflow.get_logs_for_workflow
summary: Provides a paginated list of logged events that occurred within a workflow
tags:
- Workflows and Tasks
requestBody:
description: Log Pagination Request
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/PaginatedTaskLog"
responses:
'200':
description: list of logs - events that have occurred within a specific workflow.
content:
application/json:
schema:
$ref: "#/components/schemas/PaginatedTaskLog"
/workflow/{workflow_id}/task/{task_id}/data:
parameters:
- name: workflow_id
@ -2023,6 +2073,51 @@ components:
value: "model.my_boolean_field_id && model.my_enum_field_value !== 'something'"
- id: "hide_expression"
value: "model.my_enum_field_value === 'something'"
PaginatedTaskLog:
properties:
code:
example: "email_sent"
type: string
page:
type: integer
example: 0
per_page:
type: integer
example: 10
sort_column:
type: string
example: "timestamp"
sort_reverse:
type: boolean
example: false
items:
type: array
items:
$ref: "#/components/schemas/TaskLog"
TaskLog:
properties:
level:
type: string
example: "info"
code:
example: "email_sent"
type: string
message:
example: "Approval email sent to Jake in Accounting"
type: string
workflow_id:
example: 42
type: integer
study_id:
example: 187
type: integer
user_uid:
example: "dhf8r"
type: string
timestamp:
type: string
format: date-time
example: "2021-01-07T11:36:40.001Z"
TaskEvent:
properties:
workflow:

View File

@ -8,7 +8,9 @@ from crc.api.common import ApiError, ApiErrorSchema
from crc.models.protocol_builder import ProtocolBuilderStatus
from crc.models.study import Study, StudyEvent, StudyEventType, StudyModel, StudySchema, StudyForUpdateSchema, \
StudyStatus, StudyAssociatedSchema
from crc.models.task_log import TaskLogModelSchema, TaskLogQuery, TaskLogQuerySchema
from crc.services.study_service import StudyService
from crc.services.task_logging_service import TaskLoggingService
from crc.services.user_service import UserService
from crc.services.workflow_service import WorkflowService
@ -85,6 +87,12 @@ def get_study_associates(study_id):
return StudyAssociatedSchema(many=True).dump(StudyService.get_study_associates(study_id))
def get_logs_for_study(study_id, body):
    """API endpoint: return a serialized, paginated page of task logs for a study.

    `body` is the JSON request payload (page, per_page, sort options, optional
    code filter) and is unpacked directly into a TaskLogQuery.
    """
    query = TaskLogQuery(**body)
    results = TaskLoggingService.get_logs_for_study(study_id, query)
    return TaskLogQuerySchema().dump(results)
def delete_study(study_id):
try:
StudyService.delete_study(study_id)

View File

@ -8,12 +8,14 @@ from crc.models.api_models import WorkflowApiSchema
from crc.models.file import FileModel
from crc.models.study import StudyModel, WorkflowMetadata, StudyStatus
from crc.models.task_event import TaskEventModel, TaskEvent, TaskEventSchema
from crc.models.task_log import TaskLogModelSchema, TaskLogQuery, TaskLogQuerySchema
from crc.models.workflow import WorkflowModel, WorkflowSpecModelSchema, WorkflowSpecModel, WorkflowSpecCategoryModel, \
WorkflowSpecCategoryModelSchema, WorkflowLibraryModel, WorkflowLibraryModelSchema
from crc.services.error_service import ValidationErrorService
from crc.services.file_service import FileService
from crc.services.lookup_service import LookupService
from crc.services.study_service import StudyService
from crc.services.task_logging_service import TaskLoggingService
from crc.services.user_service import UserService
from crc.services.workflow_processor import WorkflowProcessor
from crc.services.workflow_service import WorkflowService
@ -424,3 +426,9 @@ def _verify_user_and_role(processor, spiff_task):
raise ApiError.from_task("permission_denied",
f"This task must be completed by '{allowed_users}', "
f"but you are {user.uid}", spiff_task)
def get_logs_for_workflow(workflow_id, body):
    """API endpoint: return a serialized, paginated page of task logs for a workflow.

    `body` is the JSON request payload (page, per_page, sort options, optional
    code filter) and is unpacked directly into a TaskLogQuery.
    """
    query = TaskLogQuery(**body)
    results = TaskLoggingService.get_logs_for_workflow(workflow_id, query)
    return TaskLogQuerySchema().dump(results)

View File

@ -248,3 +248,4 @@ class WorkflowApiSchema(ma.Schema):
filtered_fields = {key: data[key] for key in keys}
filtered_fields['next_task'] = TaskSchema().make_task(data['next_task'])
return WorkflowApi(**filtered_fields)

View File

@ -1,15 +1,33 @@
import enum
import marshmallow
from crc import db, ma
from crc.models.study import StudyModel
from crc.models.workflow import WorkflowModel
from sqlalchemy import func
class MyEnumMeta(enum.EnumMeta):
    """Metaclass that makes ``in`` test against member *names* (strings).

    This lets callers validate an incoming level string with a plain
    containment check, e.g. ``if level not in TaskLogLevels: ...``.
    """

    def __contains__(cls, item):
        # Short-circuit on the first matching name instead of materializing
        # a throwaway list of all member names on every check.
        return any(item == member.name for member in cls.__members__.values())
class TaskLogLevels(enum.Enum, metaclass=MyEnumMeta):
    """Allowed task-log levels.

    Values intentionally mirror the Python ``logging`` module's numeric
    levels, so a member's ``.value`` can be passed straight to
    ``app.logger.log``.
    """
    critical = 50
    error = 40
    warning = 30
    info = 20
    debug = 10
class TaskLogModel(db.Model):
__tablename__ = 'task_log'
id = db.Column(db.Integer, primary_key=True)
level = db.Column(db.String)
code = db.Column(db.String)
message = db.Column(db.String)
user_uid = db.Column(db.String)
study_id = db.Column(db.Integer, db.ForeignKey(StudyModel.id), nullable=False)
workflow_id = db.Column(db.Integer, db.ForeignKey(WorkflowModel.id), nullable=False)
task = db.Column(db.String)
@ -17,7 +35,38 @@ class TaskLogModel(db.Model):
class TaskLogModelSchema(ma.Schema):
    """Serializes a single TaskLogModel row for API responses."""
    class Meta:
        model = TaskLogModel
        # An explicit field list keeps the serialized shape stable; user_uid
        # is included so each log entry can be attributed to the acting user.
        # (A stale duplicate `fields` assignment without user_uid was removed:
        # it was dead code, immediately overwritten by this one.)
        fields = ["id", "level", "code", "message", "study_id", "workflow_id", "user_uid", "timestamp"]
class TaskLogQuery:
    """Request/response carrier for paginated, filterable task-log lookups
    made over the API.

    Carries the caller's paging/sorting parameters in, and is refreshed with
    the matching page of results on the way out.
    """

    def __init__(self, code=None, page=1, per_page=10, sort_column=None, sort_reverse=False, items=None):
        # Optional filter: restrict results to entries with this log code.
        self.code = code
        self.page = page
        self.per_page = per_page
        self.sort_column = sort_column
        self.sort_reverse = sort_reverse
        self.items = items
        # Result-side fields: populated once a query has actually run.
        self.pages = 0
        self.has_next = False
        self.has_prev = False

    def update_from_sqlalchemy_paginator(self, paginator):
        """Copy the result page and paging metadata off a SQLAlchemy paginator."""
        for attr in ("items", "page", "per_page", "pages", "has_next", "has_prev"):
            setattr(self, attr, getattr(paginator, attr))
class TaskLogQuerySchema(ma.Schema):
    """Serializes a TaskLogQuery: the query parameters plus the resulting
    page of log items."""
    class Meta:
        model = TaskLogModel
        # Field names must match TaskLogQuery's attributes exactly, or
        # marshmallow silently omits them from the dump: the object exposes
        # `sort_reverse` and `has_prev` (the previous list said
        # `reverse_sort` / `has_previous`, which do not exist). `sort_reverse`
        # also matches the PaginatedTaskLog schema in the OpenAPI spec.
        fields = ["code", "page", "per_page", "sort_column", "sort_reverse", "items", "pages",
                  "has_next", "has_prev"]
    items = marshmallow.fields.List(marshmallow.fields.Nested(TaskLogModelSchema))

View File

@ -1,6 +1,7 @@
from crc import session
from crc.models.task_log import TaskLogModel, TaskLogModelSchema
from crc.models.task_log import TaskLogModel, TaskLogModelSchema, TaskLogQuery
from crc.scripts.script import Script
from crc.services.task_logging_service import TaskLoggingService
class GetLogsByWorkflow(Script):
@ -8,6 +9,7 @@ class GetLogsByWorkflow(Script):
def get_description(self):
return """Script to retrieve logs for the current workflow.
Accepts an optional `code` argument that is used to filter the DB query.
Accepts an optional 'size' otherwise will return the most recent 10 records.
"""
def do_task_validate_only(self, task, study_id, workflow_id, *args, **kwargs):
@ -21,18 +23,16 @@ class GetLogsByWorkflow(Script):
def do_task(self, task, study_id, workflow_id, *args, **kwargs):
code = None
size = 10
if 'code' in kwargs:
code = kwargs['code']
elif len(args) > 0:
code = args[0]
if code is not None:
log_models = session.query(TaskLogModel).\
filter(TaskLogModel.code == code).\
filter(TaskLogModel.workflow_id == workflow_id).\
all()
else:
log_models = session.query(TaskLogModel). \
filter(TaskLogModel.workflow_id == workflow_id). \
all()
if 'size' in kwargs:
size = kwargs['size']
elif len(args) > 1:
size = args[1]
query = TaskLogQuery(code=code, per_page=size)
log_models = TaskLoggingService.get_logs_for_workflow(workflow_id, query).items
return TaskLogModelSchema(many=True).dump(log_models)

View File

@ -1,6 +1,7 @@
from crc import session
from crc.models.task_log import TaskLogModel, TaskLogModelSchema
from crc.models.task_log import TaskLogModel, TaskLogModelSchema, TaskLogQuery
from crc.scripts.script import Script
from crc.services.task_logging_service import TaskLoggingService
class GetLogsByWorkflow(Script):
@ -8,6 +9,7 @@ class GetLogsByWorkflow(Script):
def get_description(self):
return """Script to retrieve logs for the current study.
Accepts an optional `code` argument that is used to filter the DB query.
Accepts an optional 'size' otherwise will return the most recent 10 records.
"""
def do_task_validate_only(self, task, study_id, workflow_id, *args, **kwargs):
@ -21,18 +23,16 @@ class GetLogsByWorkflow(Script):
def do_task(self, task, study_id, workflow_id, *args, **kwargs):
code = None
size = 10
if 'code' in kwargs:
code = kwargs['code']
elif len(args) > 0:
code = args[0]
if code is not None:
log_models = session.query(TaskLogModel).\
filter(TaskLogModel.code == code).\
filter(TaskLogModel.study_id == study_id).\
all()
else:
log_models = session.query(TaskLogModel). \
filter(TaskLogModel.study_id == study_id). \
all()
if 'size' in kwargs:
size = kwargs['size']
elif len(args) > 1:
size = args[1]
query = TaskLogQuery(code=code, per_page=size)
log_models = TaskLoggingService.get_logs_for_study(study_id, query).items
return TaskLogModelSchema(many=True).dump(log_models)

View File

@ -1,7 +1,10 @@
from SpiffWorkflow.exceptions import WorkflowTaskExecException
from crc import session
from crc.api.common import ApiError
from crc.models.task_log import TaskLogModel, TaskLogModelSchema
from crc.models.task_log import TaskLogModel, TaskLogModelSchema, TaskLogLevels
from crc.scripts.script import Script
from crc.services.task_logging_service import TaskLoggingService
class TaskLog(Script):
@ -17,22 +20,8 @@ class TaskLog(Script):
Message is a more descriptive string, including any info you want to log.
"""
def do_task_validate_only(self, task, study_id, workflow_id, *args, **kwargs):
if len(args) == 3 or ('level' in kwargs and 'code' in kwargs and 'message' in kwargs):
log_model = TaskLogModel(level='info',
code='mocked_code',
message='This is my logging message',
study_id=study_id,
workflow_id=workflow_id,
task=task.get_name())
return TaskLogModelSchema().dump(log_model)
else:
raise ApiError.from_task(code='missing_arguments',
message='You must include a level, code, and message to log.',
task=task)
def do_task(self, task, study_id, workflow_id, *args, **kwargs):
def get_arguments(self, task, *args, **kwargs):
# Returns a level, code, and message from the given arguments, or raises an error.
if len(args) == 3 or ('level' in kwargs and 'code' in kwargs and 'message' in kwargs):
if 'level' in kwargs:
level = kwargs['level']
@ -46,19 +35,30 @@ class TaskLog(Script):
message = kwargs['message']
else:
message = args[2]
task_name = task.get_name()
log_model = TaskLogModel(level=level,
code=code,
message=message,
study_id=study_id,
workflow_id=workflow_id,
task=task_name)
session.add(log_model)
session.commit()
return TaskLogModelSchema().dump(log_model)
if level not in TaskLogLevels:
raise WorkflowTaskExecException(task, f'You must supply a valid log level, one of ({TaskLogLevels})'
f' when calling the log() script. You specified "{level}"')
return level, code, message
else:
raise ApiError.from_task(code='missing_arguments',
message='You must include a level, code, and message to log.',
task=task)
raise WorkflowTaskExecException(task, 'You must include a level, code, and message'
' when calling the log() script')
def do_task_validate_only(self, task, study_id, workflow_id, *args, **kwargs):
level, code, message = self.get_arguments(task, *args, **kwargs)
log_model = TaskLogModel(level=level,
code=code,
message=message,
study_id=study_id,
workflow_id=workflow_id,
task=task.get_name())
return TaskLogModelSchema().dump(log_model)
def do_task(self, task, study_id, workflow_id, *args, **kwargs):
level, code, message = self.get_arguments(task, *args, **kwargs)
log_model = TaskLoggingService.add_log(task, level, code, message, study_id, workflow_id)
return TaskLogModelSchema().dump(log_model)

View File

@ -0,0 +1,80 @@
import markdown
import re
from flask import render_template
from flask_mail import Message
from jinja2 import Template
from sqlalchemy import desc
from crc import app, db, mail, session
from crc.api.common import ApiError
from crc.models.email import EmailModel
from crc.models.file import FileDataModel
from crc.models.study import StudyModel
from crc.models.task_log import TaskLogModel, TaskLogLevels, TaskLogQuery
from crc.models.user import UserModel
from crc.services.jinja_service import JinjaService
from crc.services.user_service import UserService
class TaskLoggingService(object):
    """Provides common tools for logging information from running workflows.

    Each entry is written both to the application's standard logger and to
    the database (task_log table), so the data is available for metrics and
    UI display even if one of the two stores is later truncated or modified.
    """

    @staticmethod
    def add_log(task, level, code, message, study_id, workflow_id):
        """Record one log entry for a task and return the persisted TaskLogModel.

        Raises ApiError if `level` is not one of the allowed TaskLogLevels.
        """
        if level not in TaskLogLevels:
            raise ApiError("invalid_logging_level", f"Please specify a valid log level. {TaskLogLevels}")
        try:
            user_uid = UserService.current_user().uid
        except ApiError:
            # Logging must not fail just because no user is in session
            # (e.g. scripts executed outside a request context).
            user_uid = "unknown"
        # Mirror the entry into the standard application log with full context.
        log_message = f"Workflow {workflow_id}, study {study_id}, task {task.get_name()}, user {user_uid}: {message}"
        app.logger.log(TaskLogLevels[level].value, log_message)
        log_model = TaskLogModel(level=level,
                                 code=code,
                                 user_uid=user_uid,
                                 message=message,
                                 study_id=study_id,
                                 workflow_id=workflow_id,
                                 task=task.get_name())
        session.add(log_model)
        session.commit()
        return log_model

    @staticmethod
    def get_logs_for_workflow(workflow_id, tq: TaskLogQuery):
        """Return the TaskLogQuery updated with this workflow's logs, in
        reverse chronological order by default."""
        query = session.query(TaskLogModel).filter(TaskLogModel.workflow_id == workflow_id)
        return TaskLoggingService.__paginate(query, tq)

    @staticmethod
    def get_logs_for_study(study_id, tq: TaskLogQuery):
        """Return the TaskLogQuery updated with this study's logs, in
        reverse chronological order by default."""
        query = session.query(TaskLogModel).filter(TaskLogModel.study_id == study_id)
        return TaskLoggingService.__paginate(query, tq)

    @staticmethod
    def __paginate(sql_query, task_log_query: TaskLogQuery):
        """Apply the TaskLogQuery's filter/sort/paging to sql_query, execute it,
        and copy the resulting page back onto the (returned) TaskLogQuery."""
        if not task_log_query:
            task_log_query = TaskLogQuery()
        if task_log_query.sort_column is None:
            # Default ordering: newest entries first.
            task_log_query.sort_column = "timestamp"
            task_log_query.sort_reverse = True
        if task_log_query.code is not None:
            sql_query = sql_query.filter(TaskLogModel.code == task_log_query.code)
        if task_log_query.sort_reverse:
            sort_column = desc(task_log_query.sort_column)
        else:
            sort_column = task_log_query.sort_column
        # error_out=False: an out-of-range page yields an empty page, not a 404.
        paginator = sql_query.order_by(sort_column).paginate(task_log_query.page, task_log_query.per_page,
                                                             error_out=False)
        task_log_query.update_from_sqlalchemy_paginator(paginator)
        return task_log_query

View File

@ -8,7 +8,7 @@
<bpmn:incoming>Flow_0pc42yp</bpmn:incoming>
<bpmn:outgoing>Flow_0n34cdi</bpmn:outgoing>
<bpmn:script>log_model_info = log(level='info', code='test_code', message='You forgot to include the correct data.')
log_model_debug = log(level='degug', code='debug_test_code', message='This is my debugging message')</bpmn:script>
log_model_debug = log(level='debug', code='debug_test_code', message='This is my debugging message')</bpmn:script>
</bpmn:scriptTask>
<bpmn:manualTask id="Activity_DisplayLog" name="DisplayLog">
<bpmn:documentation># Logging Models Pre

View File

@ -1,10 +1,15 @@
import json
from tests.base_test import BaseTest
from crc import session
from crc.models.api_models import Task
from crc.models.task_log import TaskLogModel
from crc.models.user import UserModel
from crc import session, WorkflowService
from crc.models.api_models import Task, TaskSchema
from crc.models.task_log import TaskLogModel, TaskLogModelSchema, TaskLogQuery, TaskLogQuerySchema
from crc.models.study import StudyModel
from crc.scripts.log import TaskLog
from crc.services.workflow_processor import WorkflowProcessor
from crc.services.task_logging_service import TaskLoggingService
import types
@ -51,34 +56,16 @@ class TestTaskLogging(BaseTest):
def test_get_logs_for_study(self):
self.load_example_data()
study = session.query(StudyModel).first()
workflow = self.create_workflow('hello_world', study=study)
workflow_api = self.get_workflow_api(workflow)
task = workflow_api.next_task
processor = WorkflowProcessor(workflow)
task = processor.next_task()
task_model = Task(id=task.id,
name=task.name,
title=task.title,
type=task.type,
state=task.state,
lane=task.lane,
form=task.form,
documentation=task.documentation,
data=task.data,
multi_instance_type=task.multi_instance_type,
multi_instance_count=task.multi_instance_count,
multi_instance_index=task.multi_instance_index,
process_name=task.process_name,
properties=task.properties)
task_model.get_name = types.MethodType(lambda x: x.name, task_model)
TaskLog().do_task(task_model, study.id, workflow.id,
TaskLog().do_task(task, study.id, workflow.id,
level='critical',
code='critical_code',
message='This is my critical message.')
TaskLog().do_task(task_model, study.id, workflow.id,
TaskLog().do_task(task, study.id, workflow.id,
level='debug',
code='debug_code',
message='This is my debug message.')
@ -90,10 +77,74 @@ class TestTaskLogging(BaseTest):
# log('debug', 'debug_code', f'This message has a { some_text }!')
workflow = self.create_workflow('get_logging_for_study', study=study)
workflow_api = self.get_workflow_api(workflow)
task = workflow_api.next_task
workflow_api = self.complete_form(workflow, task, {})
task = workflow_api.next_task
workflow_logs = task.data['workflow_logs']
study_logs = task.data['study_logs']
task_api = workflow_api.next_task
workflow_api = self.complete_form(workflow, task_api, {})
task_api = workflow_api.next_task
workflow_logs = task_api.data['workflow_logs']
study_logs = task_api.data['study_logs']
self.assertEqual(3, len(workflow_logs))
self.assertEqual(5, len(study_logs))
def test_logging_api(self):
    """End-to-end check of the study and workflow log API endpoints.

    Runs the 'logging_task' workflow (its script task writes one info-level
    log entry), then PUTs a default TaskLogQuery to each endpoint and
    verifies the serialized entry — and that both endpoints agree.
    """
    workflow = self.create_workflow('logging_task')
    # Fetching the workflow API executes the script task, producing the log.
    workflow_api = self.get_workflow_api(workflow)
    task = workflow_api.next_task
    user = session.query(UserModel).filter_by(uid=self.test_uid).first()
    url = f'/v1.0/study/{workflow.study_id}/log'
    task_log_query = TaskLogQuery()  # default paging: page 1, 10 per page, no code filter
    # NOTE(review): dump() returns a dict; the test client's `data=` normally
    # expects a string — consider json.dumps(...) here. Verify it round-trips.
    rv = self.app.put(url, headers=self.logged_in_headers(user), content_type="application/json",
                      data=TaskLogQuerySchema().dump(task_log_query))
    self.assert_success(rv)
    log_query = json.loads(rv.get_data(as_text=True))
    logs = log_query['items']
    self.assertEqual(1, len(logs))
    self.assertEqual(workflow.id, logs[0]['workflow_id'])
    self.assertEqual(workflow.study_id, logs[0]['study_id'])
    self.assertEqual('info', logs[0]['level'])
    self.assertEqual(self.test_uid, logs[0]['user_uid'])
    self.assertEqual('You forgot to include the correct data.', logs[0]['message'])
    # The same single entry must come back through the workflow endpoint.
    url = f'/v1.0/workflow/{workflow.id}/log'
    rv = self.app.put(url, headers=self.logged_in_headers(user), content_type="application/json",
                      data=TaskLogQuerySchema().dump(task_log_query))
    self.assert_success(rv)
    wf_logs = json.loads(rv.get_data(as_text=True))['items']
    self.assertEqual(wf_logs, logs, "Logs returned for the workflow should be identical to those returned from study")
def test_logging_service_paginates_and_sorts(self):
    """Exercises TaskLoggingService pagination and sorting directly.

    Writes 40 log entries (10 each of critical/debug/error/info), then checks
    page size, page count, next/prev flags, and ascending/descending sort on
    the `level` column.
    """
    self.load_example_data()
    study = session.query(StudyModel).first()
    workflow_model = self.create_workflow('hello_world', study=study)
    workflow_processor = WorkflowProcessor(workflow_model)
    task = workflow_processor.next_task()
    for i in range(0, 10):
        TaskLog().do_task(task, study.id, workflow_model.id, level='critical', code='critical_code',
                          message=f'This is my critical message # {i}.')
        TaskLog().do_task(task, study.id, workflow_model.id, level='debug', code='debug_code',
                          message=f'This is my debug message # {i}.')
        TaskLog().do_task(task, study.id, workflow_model.id, level='error', code='debug_code',
                          message=f'This is my error message # {i}.')
        TaskLog().do_task(task, study.id, workflow_model.id, level='info', code='debug_code',
                          message=f'This is my info message # {i}.')
    # A page size larger than the data set returns everything.
    results = TaskLoggingService.get_logs_for_study(study.id, TaskLogQuery(per_page=100))
    self.assertEqual(40, len(results.items), "There should be 40 logs total")
    logs = TaskLoggingService.get_logs_for_study(study.id, TaskLogQuery(per_page=5))
    self.assertEqual(5, len(logs.items), "I can limit results to 5")
    self.assertEqual(1, logs.page)
    self.assertEqual(8, logs.pages)  # 40 entries / 5 per page
    self.assertEqual(5, logs.per_page)
    self.assertEqual(True, logs.has_next)
    self.assertEqual(False, logs.has_prev)
    # Ascending sort on 'level': 'critical' sorts first alphabetically.
    logs = TaskLoggingService.get_logs_for_study(study.id, TaskLogQuery(per_page=5, sort_column="level"))
    for i in range(0, 5):
        self.assertEqual('critical', logs.items[i].level, "It is possible to sort on a column")
    # Descending sort: 'info' sorts first.
    logs = TaskLoggingService.get_logs_for_study(study.id, TaskLogQuery(per_page=5, sort_column="level", sort_reverse=True))
    for i in range(0, 5):
        self.assertEqual('info', logs.items[i].level, "It is possible to sort on a column")