Mirror of https://github.com/sartography/cr-connect-workflow.git, synced 2025-02-23 05:08:32 +00:00
1. Defer the loading of the bpmn_json when loading the workflow model; it can be stupidly large. (see models/workflow.py)
2. Shave a little more time off the reset by not re-creating the whole WorkflowProcessor, and clean up all the firsttime/sincetime statements.
parent 82a90846fc
commit dde8873c9e
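Note on change 1: SQLAlchemy's deferred() keeps a column out of the default SELECT, so loading a WorkflowModel no longer pulls the (potentially huge) serialized BPMN JSON until something actually touches the attribute. A minimal sketch of the pattern, using an illustrative standalone model rather than the project's actual schema:

# Sketch: deferring a large column with SQLAlchemy (illustrative model, not CR-Connect's).
from sqlalchemy import Column, Integer, JSON, create_engine
from sqlalchemy.orm import declarative_base, deferred, Session

Base = declarative_base()

class Workflow(Base):
    __tablename__ = 'workflow'
    id = Column(Integer, primary_key=True)
    # deferred(): omitted from the initial SELECT, loaded lazily with a
    # second query on first attribute access.
    bpmn_json = deferred(Column(JSON))

engine = create_engine('sqlite://')
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(Workflow(bpmn_json={'huge': 'payload'}))
    session.commit()

    wf = session.query(Workflow).first()  # SELECT id FROM workflow -- no JSON yet
    print(wf.bpmn_json)                   # first access triggers the deferred load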
@@ -213,20 +213,14 @@ def get_workflow(workflow_id, do_engine_steps=True):
 def restart_workflow(workflow_id, clear_data=False, delete_files=False):
     """Restart a workflow with the latest spec.
     Clear data allows user to restart the workflow without previous data."""
-    lasttime = firsttime()
     workflow_model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first()
-    lasttime = sincetime('load workflow model', lasttime)
     processor = WorkflowProcessor.reset(workflow_model, clear_data=clear_data, delete_files=delete_files)
-    lasttime = sincetime('reset model', lasttime)
     processor.do_engine_steps()
-    lasttime = sincetime('do steps', lasttime)
     processor.save()
-    lasttime = sincetime('save', lasttime)
     WorkflowService.update_task_assignments(processor)
-    lasttime = sincetime('update assignments', lasttime)
     workflow_api_model = WorkflowService.processor_to_workflow_api(processor)
-    lasttime = sincetime('create api model', lasttime)
-    return WorkflowApiSchema().dump(workflow_api_model)
+    api_model = WorkflowApiSchema().dump(workflow_api_model)
+    return api_model


 def get_task_events(action = None, workflow = None, study = None):
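The firsttime()/sincetime() calls being stripped out above (and in the hunks below) are ad-hoc profiling helpers; their bodies are not shown in this diff, but the call sites imply a simple wall-clock stopwatch. An assumed implementation, for readers unfamiliar with the helpers:

# Assumed shape of the profiling helpers being removed (the actual bodies
# live elsewhere in the repo and may differ, e.g. logging instead of print).
import time

def firsttime():
    """Start a timing run: return the current wall-clock time."""
    return time.time()

def sincetime(label, lasttime):
    """Report the time elapsed since `lasttime` under `label`, and reset the clock."""
    thistime = time.time()
    print(f'{label}: {thistime - lasttime:.3f}s')
    return thistime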
@@ -3,6 +3,7 @@ import enum
 import marshmallow
 from marshmallow import EXCLUDE, post_load, fields, INCLUDE
 from sqlalchemy import func
+from sqlalchemy.orm import deferred

 from crc import db, ma

@@ -104,10 +105,10 @@ class WorkflowStatus(enum.Enum):
 class WorkflowModel(db.Model):
     __tablename__ = 'workflow'
     id = db.Column(db.Integer, primary_key=True)
-    bpmn_workflow_json = db.Column(db.JSON)
+    bpmn_workflow_json = deferred(db.Column(db.JSON))
     status = db.Column(db.Enum(WorkflowStatus))
     study_id = db.Column(db.Integer, db.ForeignKey('study.id'))
-    study = db.relationship("StudyModel", backref='workflow')
+    study = db.relationship("StudyModel", backref='workflow', lazy='select')
     workflow_spec_id = db.Column(db.String)
     total_tasks = db.Column(db.Integer, default=0)
     completed_tasks = db.Column(db.Integer, default=0)
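With bpmn_workflow_json deferred, callers that do need the JSON immediately can opt back in per-query. A hedged sketch using SQLAlchemy's undefer option, assuming the session and WorkflowModel from the hunks above (the query shape is illustrative):

# Sketch: opting back into the deferred column when the JSON is actually needed.
from sqlalchemy.orm import undefer

# Cheap: status/progress queries never touch the big JSON column.
model = session.query(WorkflowModel).filter_by(id=workflow_id).first()

# Explicit: force the JSON to load in the same round trip when we know
# we are about to deserialize the workflow anyway.
model = (session.query(WorkflowModel)
         .options(undefer(WorkflowModel.bpmn_workflow_json))
         .filter_by(id=workflow_id)
         .first())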
@@ -272,7 +272,6 @@ class StudyService(object):
         """Returns a list of documents related to the study, and any file information
         that is available.."""

-        lastime = firsttime()
         # Get PB required docs, if Protocol Builder Service is enabled.
         if ProtocolBuilderService.is_enabled() and study_id is not None:
             try:
@@ -282,17 +281,14 @@ class StudyService(object):
                 pb_docs = []
         else:
             pb_docs = []
-        lasttime = sincetime("GET_REQUIRED DOCS", lastime)
         # Loop through all known document types, get the counts for those files,
         # and use pb_docs to mark those as required.
         doc_dictionary = DocumentService.get_dictionary()
-        lasttime = sincetime("GET DOC DICTIONARY", lastime)


         file_time = 0
         documents = {}
         study_files = UserFileService.get_files_for_study(study_id=study_id)
-        lasttime = sincetime("GET_FILES_FOR_STUDY", lastime)

         for code, doc in doc_dictionary.items():

@@ -315,7 +311,6 @@ class StudyService(object):
                 name_list.append(doc[cat_key])
             doc['display_name'] = ' / '.join(name_list)

-            t = firsttime()

             # For each file, get associated workflow status
             doc_files = list(filter(lambda f: f.irb_doc_code == code, study_files))
@@ -323,7 +318,6 @@ class StudyService(object):
             doc['count'] = len(doc_files)
             doc['files'] = []

-            file_time = file_time + firsttime() - t

             for file_model in doc_files:
                 file = File.from_models(file_model, UserFileService.get_file_data(file_model.id), [])
@@ -336,7 +330,6 @@ class StudyService(object):
                 doc['status'] = status.value

             documents[code] = doc
-        lasttime = sincetime("PROCESS_DOCUMENTS", lastime)
         return Box(documents)

     @staticmethod
@@ -125,7 +125,6 @@ class WorkflowProcessor(object):
             if self.WORKFLOW_ID_KEY not in self.bpmn_workflow.data:
                 if not workflow_model.id:
                     session.add(workflow_model)
-                    session.commit()
                 # If the model is new, and has no id, save it, write it into the workflow model
                 # and save it again. In this way, the workflow process is always aware of the
                 # database model to which it is associated, and scripts running within the model
@@ -133,6 +132,7 @@ class WorkflowProcessor(object):
                 self.bpmn_workflow.data[WorkflowProcessor.WORKFLOW_ID_KEY] = workflow_model.id
                 workflow_model.bpmn_workflow_json = WorkflowProcessor._serializer.serialize_workflow(
                     self.bpmn_workflow, include_spec=True)

+                self.save()

         except MissingSpecError as ke:
@@ -146,8 +146,8 @@ class WorkflowProcessor(object):
     def reset(workflow_model, clear_data=False, delete_files=False):
         # Try to execute a cancel notify
         try:
-            wp = WorkflowProcessor(workflow_model)
-            wp.cancel_notify() # The executes a notification to all endpoints
+            bpmn_workflow = WorkflowProcessor.__get_bpmn_workflow(workflow_model)
+            WorkflowProcessor.__cancel_notify(bpmn_workflow)
         except Exception as e:
             app.logger.error(f"Unable to send a cancel notify for workflow %s during a reset."
                              f" Continuing with the reset anyway so we don't get in an unresolvable"
@@ -168,12 +168,15 @@ class WorkflowProcessor(object):
         session.commit()
         return WorkflowProcessor(workflow_model)

-    def __get_bpmn_workflow(self, workflow_model: WorkflowModel, spec: WorkflowSpec, validate_only=False):
+    @staticmethod
+    @timeit
+    def __get_bpmn_workflow(workflow_model: WorkflowModel, spec: WorkflowSpec = None, validate_only=False):
         if workflow_model.bpmn_workflow_json:
-            bpmn_workflow = self._serializer.deserialize_workflow(workflow_model.bpmn_workflow_json,
+            bpmn_workflow = WorkflowProcessor._serializer.deserialize_workflow(workflow_model.bpmn_workflow_json,
                                                                   workflow_spec=spec)
+            bpmn_workflow.script_engine = WorkflowProcessor._script_engine
         else:
-            bpmn_workflow = BpmnWorkflow(spec, script_engine=self._script_engine)
+            bpmn_workflow = BpmnWorkflow(spec, script_engine=WorkflowProcessor._script_engine)
             bpmn_workflow.data[WorkflowProcessor.STUDY_ID_KEY] = workflow_model.study_id
             bpmn_workflow.data[WorkflowProcessor.VALIDATION_PROCESS_KEY] = validate_only
         return bpmn_workflow
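The point of making __get_bpmn_workflow (and __cancel_notify, below) static is that reset() can now deserialize the stored workflow and fire the cancel event without paying for full WorkflowProcessor construction. In outline, the new reset path looks like this (condensed from the hunks in this commit; error-message details and the clear-data/delete-files body are elided):

# Condensed from this commit: the reset path no longer builds a full
# WorkflowProcessor just to send the cancel notification.
@staticmethod
def reset(workflow_model, clear_data=False, delete_files=False):
    try:
        # Deserialize straight from the stored JSON -- no processor __init__.
        bpmn_workflow = WorkflowProcessor.__get_bpmn_workflow(workflow_model)
        WorkflowProcessor.__cancel_notify(bpmn_workflow)
    except Exception:
        app.logger.error("Unable to send a cancel notify during reset; continuing.")
    ...  # clear data / delete files, commit, then build the processor once, at the end
    return WorkflowProcessor(workflow_model)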
@@ -195,18 +198,13 @@ class WorkflowProcessor(object):
     def run_master_spec(spec_model, study):
         """Executes a BPMN specification for the given study, without recording any information to the database
         Useful for running the master specification, which should not persist. """
-        lasttime = firsttime()
         spec_files = SpecFileService().get_files(spec_model, include_libraries=True)
-        lasttime = sincetime('load Files', lasttime)
         spec = WorkflowProcessor.get_spec(spec_files, spec_model)
-        lasttime = sincetime('get spec', lasttime)
         try:
             bpmn_workflow = BpmnWorkflow(spec, script_engine=WorkflowProcessor._script_engine)
             bpmn_workflow.data[WorkflowProcessor.STUDY_ID_KEY] = study.id
             bpmn_workflow.data[WorkflowProcessor.VALIDATION_PROCESS_KEY] = False
-            lasttime = sincetime('get_workflow', lasttime)
             bpmn_workflow.do_engine_steps()
-            lasttime = sincetime('run steps', lasttime)
         except WorkflowException as we:
             raise ApiError.from_task_spec("error_running_master_spec", str(we), we.sender)

@@ -273,14 +271,19 @@ class WorkflowProcessor(object):
             raise ApiError.from_workflow_exception("task_error", str(we), we)

     def cancel_notify(self):
+        self.__cancel_notify(self.bpmn_workflow)
+
+    @staticmethod
+    def __cancel_notify(bpmn_workflow):
         try:
             # A little hackly, but make the bpmn_workflow catch a cancel event.
-            self.bpmn_workflow.signal('cancel') # generate a cancel signal.
-            self.bpmn_workflow.catch(CancelEventDefinition())
-            self.bpmn_workflow.do_engine_steps()
+            bpmn_workflow.signal('cancel') # generate a cancel signal.
+            bpmn_workflow.catch(CancelEventDefinition())
+            bpmn_workflow.do_engine_steps()
         except WorkflowTaskExecException as we:
             raise ApiError.from_workflow_exception("task_error", str(we), we)


     def serialize(self):
         return self._serializer.serialize_workflow(self.bpmn_workflow,include_spec=True)

@@ -631,14 +631,10 @@ class WorkflowService(object):
     def processor_to_workflow_api(processor: WorkflowProcessor, next_task=None):
         """Returns an API model representing the state of the current workflow, if requested, and
         possible, next_task is set to the current_task."""
-        lasttime = firsttime()
         navigation = processor.bpmn_workflow.get_deep_nav_list()
-        lasttime = sincetime('WS: TO API: create navigation', lasttime)
         WorkflowService.update_navigation(navigation, processor)
-        lasttime = sincetime('WS: TO API: updateNav', lasttime)
         spec_service = WorkflowSpecService()
         spec = spec_service.get_spec(processor.workflow_spec_id)
-        lasttime = sincetime('WS: TO API: GET_SPEC', lasttime)
         workflow_api = WorkflowApi(
             id=processor.get_workflow_id(),
             status=processor.get_status(),
@@ -652,7 +648,6 @@ class WorkflowService(object):
             title=spec.display_name,
             study_id=processor.workflow_model.study_id or None
         )
-        lasttime = sincetime('WS: TO API: CREATE API', lasttime)
         if not next_task: # The Next Task can be requested to be a certain task, useful for parallel tasks.
             # This may or may not work, sometimes there is no next task to complete.
             next_task = processor.next_task()
@@ -666,7 +661,6 @@ class WorkflowService(object):
             user_uids = WorkflowService.get_users_assigned_to_task(processor, next_task)
             if not UserService.in_list(user_uids, allow_admin_impersonate=True):
                 workflow_api.next_task.state = WorkflowService.TASK_STATE_LOCKED
-        lasttime = sincetime('WS: TO API: NEXT_TASK', lasttime)

         return workflow_api

@@ -942,9 +936,8 @@ class WorkflowService(object):
         db.session.query(TaskEventModel). \
             filter(TaskEventModel.workflow_id == processor.workflow_model.id). \
             filter(TaskEventModel.action == WorkflowService.TASK_ACTION_ASSIGNMENT).delete()
-        db.session.commit()
-
-        for task in processor.get_current_user_tasks():
+        tasks = processor.get_current_user_tasks()
+        for task in tasks:
             user_ids = WorkflowService.get_users_assigned_to_task(processor, task)
             for user_id in user_ids:
                 WorkflowService.log_task_action(user_id, processor, task, WorkflowService.TASK_ACTION_ASSIGNMENT)
@@ -1012,7 +1005,6 @@ class WorkflowService(object):
             # date=datetime.utcnow(), <=== For future reference, NEVER do this. Let the database set the time.
         )
         db.session.add(task_event)
-        db.session.commit()

     @staticmethod
     def extract_form_data(latest_data, task):
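Taken together, the last two hunks read as a small write-batching change: update_task_assignments drops its intermediate db.session.commit(), and log_task_action apparently stops committing per event, so the deleted assignments and the re-logged ones land in one transaction committed later. A sketch of the pattern, mirroring the hunks above (where the single final commit actually happens is an assumption):

# Sketch of the batching pattern (illustrative; mirrors the hunks above).
def update_task_assignments(processor):
    db.session.query(TaskEventModel). \
        filter(TaskEventModel.workflow_id == processor.workflow_model.id). \
        filter(TaskEventModel.action == WorkflowService.TASK_ACTION_ASSIGNMENT).delete()
    # No commit here: the delete and the adds below share one transaction.
    tasks = processor.get_current_user_tasks()
    for task in tasks:
        for user_id in WorkflowService.get_users_assigned_to_task(processor, task):
            WorkflowService.log_task_action(  # adds a TaskEventModel, no per-event commit
                user_id, processor, task, WorkflowService.TASK_ACTION_ASSIGNMENT)
    db.session.commit()  # assumed: one commit for the whole batch, downstream of the loop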