Adds endpoint to get workflow stats. Adds a test for the endpoint.
parent 1119bb2b6c
commit bbfe9291e0
crc/api.yml  (+21)

@@ -500,6 +500,27 @@ paths:
       responses:
         '204':
           description: The workflow was removed
+  /workflow/{workflow_id}/stats:
+    parameters:
+      - name: workflow_id
+        in: path
+        required: true
+        description: The id of the workflow
+        schema:
+          type: integer
+          format: int32
+    get:
+      operationId: crc.api.stats.get_workflow_stats
+      summary: Stats about a given workflow
+      tags:
+        - Workflows and Tasks
+      responses:
+        '200':
+          description: Returns counts of complete, incomplete, and total number of tasks
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/Workflow"
   # /v1.0/workflow/0/task/0
   /workflow/{workflow_id}/task/{task_id}:
     parameters:
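The new route is served under the /v1.0 prefix (the test further below calls '/v1.0/workflow/%i/stats'). A minimal client sketch, not part of the commit, might look like this. Note that the spec's 200 response references #/components/schemas/Workflow, but get_workflow_stats actually dumps WorkflowStatsModelSchema, so the num_tasks_* field names below are assumptions taken from the columns set in update_workflow_stats; the base URL and auth header are likewise assumed.

# Hypothetical client call -- not part of this commit. Assumes a local dev server on
# port 5000, an existing workflow with id 1, and whatever auth header the deployment
# expects (the test below uses self.logged_in_headers() for the same purpose).
import requests

AUTH_HEADERS = {"Authorization": "Bearer <token>"}  # assumed auth scheme

resp = requests.get("http://localhost:5000/v1.0/workflow/1/stats", headers=AUTH_HEADERS)
resp.raise_for_status()
stats = resp.json()
# Field names assumed from the columns written by update_workflow_stats:
print(stats.get("num_tasks_complete"), stats.get("num_tasks_incomplete"), stats.get("num_tasks_total"))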
crc/api/stats.py  (new file, +57)

@@ -0,0 +1,57 @@
+from datetime import datetime
+
+from flask import g
+
+from crc import session
+from crc.models.stats import WorkflowStatsModel, WorkflowStatsModelSchema, TaskEventModel, TaskEventModelSchema
+
+
+def get_workflow_stats(workflow_id):
+    schema = WorkflowStatsModelSchema()
+    workflow_model = session.query(WorkflowStatsModel).filter_by(id=workflow_id).first()
+    return schema.dump(workflow_model)
+
+
+def update_workflow_stats(workflow_model, workflow_api_model):
+    stats = session.query(WorkflowStatsModel) \
+        .filter_by(study_id=workflow_model.study_id) \
+        .filter_by(workflow_id=workflow_model.id) \
+        .filter_by(workflow_spec_id=workflow_model.workflow_spec_id) \
+        .filter_by(spec_version=workflow_model.spec_version) \
+        .first()
+
+    if stats is None:
+        stats = WorkflowStatsModel(
+            study_id=workflow_model.study_id,
+            workflow_id=workflow_model.id,
+            workflow_spec_id=workflow_model.workflow_spec_id,
+            spec_version=workflow_model.spec_version,
+        )
+
+    complete_states = ['CANCELLED', 'COMPLETED']
+    incomplete_states = ['MAYBE', 'LIKELY', 'FUTURE', 'WAITING', 'READY']
+    tasks = list(workflow_api_model.user_tasks)
+    stats.num_tasks_total = len(tasks)
+    stats.num_tasks_complete = sum(1 for t in tasks if t.state in complete_states)
+    stats.num_tasks_incomplete = sum(1 for t in tasks if t.state in incomplete_states)
+    stats.last_updated = datetime.now()
+
+    session.add(stats)
+    session.commit()
+    return WorkflowStatsModelSchema().dump(stats)
+
+
+def log_task_complete(workflow_model, task_id):
+    task_event = TaskEventModel(
+        study_id=workflow_model.study_id,
+        user_uid=g.user.uid,
+        workflow_id=workflow_model.id,
+        workflow_spec_id=workflow_model.workflow_spec_id,
+        spec_version=workflow_model.spec_version,
+        task_id=task_id,
+        task_state='COMPLETE',
+        date=datetime.now(),
+    )
+    session.add(task_event)
+    session.commit()
+    return TaskEventModelSchema().dump(task_event)
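crc/models/stats.py is referenced by the new module but is not shown in this commit view. As rough orientation only, a sketch consistent with the fields used above might look like the following; column types, table names, and the schema base class are assumptions, and TaskEventModel would follow the same pattern with user_uid, task_id, task_state, and date added.

# Hypothetical sketch only -- the real crc/models/stats.py is not part of this diff.
# Column names come from the usage above; types and schema setup are assumptions.
from marshmallow_sqlalchemy import SQLAlchemyAutoSchema

from crc import db


class WorkflowStatsModel(db.Model):
    __tablename__ = 'workflow_stats'
    id = db.Column(db.Integer, primary_key=True)
    study_id = db.Column(db.Integer)
    workflow_id = db.Column(db.Integer)
    workflow_spec_id = db.Column(db.String)
    spec_version = db.Column(db.String)
    num_tasks_total = db.Column(db.Integer)
    num_tasks_complete = db.Column(db.Integer)
    num_tasks_incomplete = db.Column(db.Integer)
    last_updated = db.Column(db.DateTime)


class WorkflowStatsModelSchema(SQLAlchemyAutoSchema):
    class Meta:
        model = WorkflowStatsModel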
crc/api/workflow.py

@@ -1,15 +1,13 @@
 import uuid
 from datetime import datetime
 from flask import g

-from crc.api.file import delete_file
+from crc.api.stats import update_workflow_stats, log_task_complete
 from crc import session
 from crc.api.common import ApiError, ApiErrorSchema
 from crc.api.file import delete_file
 from crc.models.api_models import Task, WorkflowApi, WorkflowApiSchema
 from crc.models.file import FileModel
 from crc.models.workflow import WorkflowModel, WorkflowSpecModelSchema, WorkflowSpecModel
 from crc.services.workflow_processor import WorkflowProcessor
-from crc.models.file import FileModel
-from crc.models.stats import WorkflowStatsModel, WorkflowStatsModelSchema, TaskEventModel, TaskEventModelSchema


 def all_specifications():

@@ -102,22 +100,6 @@ def get_task(workflow_id, task_id):
     return workflow.bpmn_workflow().get_task(task_id)


-def log_task_complete(workflow_model, task_id):
-    task_event = TaskEventModel(
-        study_id=workflow_model.study_id,
-        user_uid=g.user.uid,
-        workflow_id=workflow_model.id,
-        workflow_spec_id=workflow_model.workflow_spec_id,
-        spec_version=workflow_model.spec_version,
-        task_id=task_id,
-        task_state='COMPLETE',
-        date=datetime.now(),
-    )
-    session.add(task_event)
-    session.commit()
-    return TaskEventModelSchema().dump(task_event)


 def update_task(workflow_id, task_id, body):
     workflow_model = session.query(WorkflowModel).filter_by(id=workflow_id).first()
     processor = WorkflowProcessor(workflow_model)

@@ -131,39 +113,7 @@ def update_task(workflow_id, task_id, body):
     session.add(workflow_model)
     session.commit()

-    workflow_api_model =__get_workflow_api_model(processor)
+    workflow_api_model = __get_workflow_api_model(processor)
+    update_workflow_stats(workflow_model, workflow_api_model)
+    log_task_complete(workflow_model, task_id)
     return WorkflowApiSchema().dump(workflow_api_model)


-def update_workflow_stats(workflow_model, workflow_api_model):
-    stats = session.query(WorkflowStatsModel)\
-        .filter_by(study_id=workflow_model.study_id)\
-        .filter_by(workflow_id=workflow_model.id)\
-        .filter_by(workflow_spec_id=workflow_model.workflow_spec_id)\
-        .filter_by(spec_version=workflow_model.spec_version)\
-        .first()
-
-    if stats is None:
-        stats = WorkflowStatsModel(
-            study_id=workflow_model.study_id,
-            workflow_id=workflow_model.id,
-            workflow_spec_id=workflow_model.workflow_spec_id,
-            spec_version=workflow_model.spec_version,
-        )
-
-    complete_states = ['CANCELLED', 'COMPLETED']
-    incomplete_states = ['MAYBE', 'LIKELY', 'FUTURE', 'WAITING', 'READY']
-    tasks = list(workflow_api_model.user_tasks)
-    stats.num_tasks_total = len(tasks)
-    stats.num_tasks_complete = sum(1 for t in tasks if t.state in complete_states)
-    stats.num_tasks_incomplete = sum(1 for t in tasks if t.state in incomplete_states)
-    stats.last_updated = datetime.now()
-
-    session.add(stats)
-    session.commit()
-    return WorkflowStatsModelSchema().dump(stats)
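The complete/incomplete buckets in update_workflow_stats appear to be SpiffWorkflow task state names: CANCELLED and COMPLETED count as complete; MAYBE, LIKELY, FUTURE, WAITING, and READY count as incomplete. A standalone illustration of that tally with dummy tasks (the real code iterates workflow_api_model.user_tasks, whose items expose a .state attribute):

# Illustration only -- Task here is a stand-in for the API model's user tasks.
from collections import namedtuple

Task = namedtuple('Task', ['name', 'state'])

tasks = [Task('get_name', 'COMPLETED'), Task('approve', 'READY'), Task('notify', 'FUTURE')]

complete_states = ['CANCELLED', 'COMPLETED']
incomplete_states = ['MAYBE', 'LIKELY', 'FUTURE', 'WAITING', 'READY']

num_complete = sum(1 for t in tasks if t.state in complete_states)
num_incomplete = sum(1 for t in tasks if t.state in incomplete_states)
print(num_complete, num_incomplete, len(tasks))  # 1 2 3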
tests/test_tasks_api.py

@@ -4,10 +4,10 @@ import os
 from crc import session, app
 from crc.models.api_models import WorkflowApiSchema, Task
 from crc.models.file import FileModelSchema
+from crc.models.stats import WorkflowStatsModel, TaskEventModel
 from crc.models.study import StudyModel
 from crc.models.workflow import WorkflowSpecModelSchema, WorkflowModel, WorkflowStatus
 from crc.services.workflow_processor import WorkflowProcessor
-from crc.models.stats import WorkflowStatsModel, TaskEventModel
 from tests.base_test import BaseTest

@@ -42,15 +42,15 @@ class TestTasksApi(BaseTest):
         self.assert_success(rv)
         json_data = json.loads(rv.get_data(as_text=True))

-        num_stats = session.query(WorkflowStatsModel)\
-            .filter_by(workflow_id=workflow.id)\
-            .filter_by(workflow_spec_id=workflow.workflow_spec_id)\
+        num_stats = session.query(WorkflowStatsModel) \
+            .filter_by(workflow_id=workflow.id) \
+            .filter_by(workflow_spec_id=workflow.workflow_spec_id) \
             .count()
         self.assertGreater(num_stats, 0)

-        num_task_events = session.query(TaskEventModel)\
-            .filter_by(workflow_id=workflow.id)\
-            .filter_by(task_id=task.id)\
+        num_task_events = session.query(TaskEventModel) \
+            .filter_by(workflow_id=workflow.id) \
+            .filter_by(task_id=task.id) \
             .count()
         self.assertGreater(num_task_events, 0)

@@ -240,3 +240,30 @@ class TestTasksApi(BaseTest):
         workflow_api = self.get_workflow_api(workflow, hard_reset=True)
         self.assertTrue(workflow_api.spec_version.startswith("v2 "))
         self.assertTrue(workflow_api.is_latest_spec)
+
+    def test_get_workflow_stats(self):
+        self.load_example_data()
+        workflow = self.create_workflow('exclusive_gateway')
+
+        response_before = self.app.get('/v1.0/workflow/%i/stats' % workflow.id,
+                                       content_type="application/json",
+                                       headers=self.logged_in_headers())
+        self.assert_success(response_before)
+        db_stats_before = session.query(WorkflowStatsModel).filter_by(workflow_id=workflow.id).first()
+        self.assertIsNone(db_stats_before)
+
+        # Start the workflow.
+        tasks = self.get_workflow_api(workflow).user_tasks
+        self.complete_form(workflow, tasks[0], {"has_bananas": True})
+
+        response_after = self.app.get('/v1.0/workflow/%i/stats' % workflow.id,
+                                      content_type="application/json",
+                                      headers=self.logged_in_headers())
+        self.assert_success(response_after)
+        db_stats_after = session.query(WorkflowStatsModel).filter_by(workflow_id=workflow.id).first()
+        self.assertIsNotNone(db_stats_after)
+        self.assertGreater(db_stats_after.num_tasks_complete, 0)
+        self.assertGreater(db_stats_after.num_tasks_incomplete, 0)
+        self.assertGreater(db_stats_after.num_tasks_total, 0)
+        self.assertEqual(db_stats_after.num_tasks_total,
+                         db_stats_after.num_tasks_complete + db_stats_after.num_tasks_incomplete)