Added a File class that we wrap around the FileModel so the API endpoints don't change, but File no longer holds references to versions or dates of the file_data model; we figure this out based on a clean database structure. The ApprovalFile is directly related to the file_data_model, so there is no chance that a reviewer would review the incorrect version of a file (file.py). Noticed that our FileType enum called "bpmn" "bpmm"; hope this doesn't screw someone up. Workflows are directly related to the data_models that create the workflow spec they need, so the files should always be there. There are no more hashes, and thus no more hash errors where it can't find the files to rebuild the workflow (workflow.py). Not much to report here, other than I broke every single test in the system at one point, so I'm super concerned about this and will be testing it a lot before creating the pull request.
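The File wrapper described above is essentially a read model over FileModel: the API keeps returning the same file fields, but version and date information is no longer stored on the file itself and is instead derived from the related file_data records. A rough, hypothetical sketch of that idea (the field names and the from_model helper are assumptions for illustration, not the project's actual crc.models.file code):

# Hypothetical sketch only; field names are illustrative assumptions,
# not the real FileModel/File definitions in crc.models.file.
class File(object):
    def __init__(self, id, name, content_type, workflow_spec_id):
        self.id = id
        self.name = name
        self.content_type = content_type
        self.workflow_spec_id = workflow_spec_id
        # Deliberately no version or last-updated fields: those are derived
        # from the related file_data records, not stored on the file.

    @classmethod
    def from_model(cls, model):
        # Build the API-facing object from the database model so the
        # endpoint payload shape stays unchanged.
        return cls(id=model.id, name=model.name,
                   content_type=model.content_type,
                   workflow_spec_id=model.workflow_spec_id)

The endpoint module itself follows.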
import uuid

from crc import session
from crc.api.common import ApiError, ApiErrorSchema
from crc.models.api_models import WorkflowApi, WorkflowApiSchema, NavigationItem, NavigationItemSchema
from crc.models.file import FileModel, LookupDataSchema
from crc.models.stats import TaskEventModel
from crc.models.workflow import WorkflowModel, WorkflowSpecModelSchema, WorkflowSpecModel, WorkflowSpecCategoryModel, \
    WorkflowSpecCategoryModelSchema
from crc.services.file_service import FileService
from crc.services.lookup_service import LookupService
from crc.services.study_service import StudyService
from crc.services.workflow_processor import WorkflowProcessor
from crc.services.workflow_service import WorkflowService


def all_specifications():
    schema = WorkflowSpecModelSchema(many=True)
    return schema.dump(session.query(WorkflowSpecModel).all())


def add_workflow_specification(body):
    new_spec: WorkflowSpecModel = WorkflowSpecModelSchema().load(body, session=session)
    session.add(new_spec)
    session.commit()
    return WorkflowSpecModelSchema().dump(new_spec)


def get_workflow_specification(spec_id):
    if spec_id is None:
        raise ApiError('unknown_spec', 'Please provide a valid Workflow Specification ID.')

    spec: WorkflowSpecModel = session.query(WorkflowSpecModel).filter_by(id=spec_id).first()

    if spec is None:
        raise ApiError('unknown_spec', 'The Workflow Specification "' + spec_id + '" is not recognized.')

    return WorkflowSpecModelSchema().dump(spec)


def validate_workflow_specification(spec_id):
    errors = []
    try:
        WorkflowService.test_spec(spec_id)
    except ApiError as ae:
        errors.append(ae)
    return ApiErrorSchema(many=True).dump(errors)

def update_workflow_specification(spec_id, body):
    if spec_id is None:
        raise ApiError('unknown_spec', 'Please provide a valid Workflow Spec ID.')
    spec = session.query(WorkflowSpecModel).filter_by(id=spec_id).first()

    if spec is None:
        raise ApiError('unknown_spec', 'The spec "' + spec_id + '" is not recognized.')

    schema = WorkflowSpecModelSchema()
    spec = schema.load(body, session=session, instance=spec, partial=True)
    session.add(spec)
    session.commit()
    return schema.dump(spec)

def delete_workflow_specification(spec_id):
    if spec_id is None:
        raise ApiError('unknown_spec', 'Please provide a valid Workflow Specification ID.')

    spec: WorkflowSpecModel = session.query(WorkflowSpecModel).filter_by(id=spec_id).first()

    if spec is None:
        raise ApiError('unknown_spec', 'The Workflow Specification "' + spec_id + '" is not recognized.')

    # Delete all items in the database related to the deleted workflow spec.
    files = session.query(FileModel).filter_by(workflow_spec_id=spec_id).all()
    for file in files:
        FileService.delete_file(file.id)

    session.query(TaskEventModel).filter(TaskEventModel.workflow_spec_id == spec_id).delete()

    # Delete all stats and workflow models related to this specification
    for workflow in session.query(WorkflowModel).filter_by(workflow_spec_id=spec_id):
        StudyService.delete_workflow(workflow)
    session.query(WorkflowSpecModel).filter_by(id=spec_id).delete()
    session.commit()

def __get_workflow_api_model(processor: WorkflowProcessor, next_task=None):
    """Returns an API model representing the state of the current workflow;
    if requested, and possible, next_task is set to the current task."""

    nav_dict = processor.bpmn_workflow.get_nav_list()
    navigation = []
    for nav_item in nav_dict:
        spiff_task = processor.bpmn_workflow.get_task(nav_item['task_id'])
        if 'description' in nav_item:
            nav_item['title'] = nav_item.pop('description')
            # fixme: duplicate code from the workflow_service. Should only do this in one place.
            if ' ' in nav_item['title']:
                nav_item['title'] = nav_item['title'].partition(' ')[2]
        else:
            nav_item['title'] = ""
        if spiff_task:
            nav_item['task'] = WorkflowService.spiff_task_to_api_task(spiff_task, add_docs_and_forms=False)
            nav_item['title'] = nav_item['task'].title  # Prefer the task title.
        else:
            nav_item['task'] = None
        if 'is_decision' not in nav_item:
            nav_item['is_decision'] = False

        navigation.append(NavigationItem(**nav_item))
        NavigationItemSchema().dump(nav_item)
    workflow_api = WorkflowApi(
        id=processor.get_workflow_id(),
        status=processor.get_status(),
        next_task=None,
        navigation=navigation,
        workflow_spec_id=processor.workflow_spec_id,
        spec_version=processor.get_version_string(),
        is_latest_spec=processor.is_latest_spec,
        total_tasks=processor.workflow_model.total_tasks,
        completed_tasks=processor.workflow_model.completed_tasks,
        last_updated=processor.workflow_model.last_updated
    )
    if not next_task:  # The Next Task can be requested to be a certain task, useful for parallel tasks.
        # This may or may not work, sometimes there is no next task to complete.
        next_task = processor.next_task()
    if next_task:
        workflow_api.next_task = WorkflowService.spiff_task_to_api_task(next_task, add_docs_and_forms=True)

    return workflow_api

def get_workflow(workflow_id, soft_reset=False, hard_reset=False):
    workflow_model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first()
    processor = WorkflowProcessor(workflow_model, soft_reset=soft_reset, hard_reset=hard_reset)
    workflow_api_model = __get_workflow_api_model(processor)
    return WorkflowApiSchema().dump(workflow_api_model)


def delete_workflow(workflow_id):
    StudyService.delete_workflow(workflow_id)

def set_current_task(workflow_id, task_id):
    workflow_model = session.query(WorkflowModel).filter_by(id=workflow_id).first()
    processor = WorkflowProcessor(workflow_model)
    task_id = uuid.UUID(task_id)
    task = processor.bpmn_workflow.get_task(task_id)
    if task.state != task.COMPLETED and task.state != task.READY:
        raise ApiError("invalid_state", "You may not move the token to a task whose state is not "
                                        "currently set to COMPLETE or READY.")

    # Only reset the token if the task doesn't already have it.
    if task.state == task.COMPLETED:
        task.reset_token(reset_data=False)  # we could optionally clear the previous data.
    processor.save()
    WorkflowService.log_task_action(processor, task, WorkflowService.TASK_ACTION_TOKEN_RESET)
    workflow_api_model = __get_workflow_api_model(processor, task)
    return WorkflowApiSchema().dump(workflow_api_model)

def update_task(workflow_id, task_id, body):
    workflow_model = session.query(WorkflowModel).filter_by(id=workflow_id).first()
    processor = WorkflowProcessor(workflow_model)
    task_id = uuid.UUID(task_id)
    task = processor.bpmn_workflow.get_task(task_id)
    if task.state != task.READY:
        raise ApiError("invalid_state", "You may not update a task unless it is in the READY state. "
                                        "Consider calling a token reset to make this task Ready.")
    task.update_data(body)
    processor.complete_task(task)
    processor.do_engine_steps()
    processor.save()
    WorkflowService.log_task_action(processor, task, WorkflowService.TASK_ACTION_COMPLETE)

    workflow_api_model = __get_workflow_api_model(processor)
    return WorkflowApiSchema().dump(workflow_api_model)

def list_workflow_spec_categories():
    schema = WorkflowSpecCategoryModelSchema(many=True)
    return schema.dump(session.query(WorkflowSpecCategoryModel).all())


def get_workflow_spec_category(cat_id):
    schema = WorkflowSpecCategoryModelSchema()
    return schema.dump(session.query(WorkflowSpecCategoryModel).filter_by(id=cat_id).first())


def add_workflow_spec_category(body):
    schema = WorkflowSpecCategoryModelSchema()
    new_cat: WorkflowSpecCategoryModel = schema.load(body, session=session)
    session.add(new_cat)
    session.commit()
    return schema.dump(new_cat)


def update_workflow_spec_category(cat_id, body):
    if cat_id is None:
        raise ApiError('unknown_category', 'Please provide a valid Workflow Spec Category ID.')

    category = session.query(WorkflowSpecCategoryModel).filter_by(id=cat_id).first()

    if category is None:
        raise ApiError('unknown_category', 'The category "' + cat_id + '" is not recognized.')

    schema = WorkflowSpecCategoryModelSchema()
    category = schema.load(body, session=session, instance=category, partial=True)
    session.add(category)
    session.commit()
    return schema.dump(category)


def delete_workflow_spec_category(cat_id):
    session.query(WorkflowSpecCategoryModel).filter_by(id=cat_id).delete()
    session.commit()

def lookup(workflow_id, task_id, field_id, query, limit):
    """
    Given a field in a task, attempts to find the lookup table or function associated
    with that field and runs a full-text query against it to locate the values and
    labels that would be returned to a type-ahead box.
    """
    workflow_model = session.query(WorkflowModel).filter_by(id=workflow_id).first()
    if not workflow_model:
        raise ApiError("unknown_workflow", "No workflow found with id: %i" % workflow_id)
    processor = WorkflowProcessor(workflow_model)
    task_id = uuid.UUID(task_id)
    spiff_task = processor.bpmn_workflow.get_task(task_id)
    if not spiff_task:
        raise ApiError("unknown_task", "No task with %s found in workflow: %i" % (task_id, workflow_id))
    field = None
    for f in spiff_task.task_spec.form.fields:
        if f.id == field_id:
            field = f
    if not field:
        raise ApiError("unknown_field", "No field named %s in task %s" % (field_id, spiff_task.task_spec.name))

    lookup_data = LookupService.lookup(spiff_task, field, query, limit)
    return LookupDataSchema(many=True).dump(lookup_data)