diff --git a/crc/api/workflow.py b/crc/api/workflow.py index 0ab2b5cb..1cf4f097 100644 --- a/crc/api/workflow.py +++ b/crc/api/workflow.py @@ -213,20 +213,14 @@ def get_workflow(workflow_id, do_engine_steps=True): def restart_workflow(workflow_id, clear_data=False, delete_files=False): """Restart a workflow with the latest spec. Clear data allows user to restart the workflow without previous data.""" - lasttime = firsttime() workflow_model: WorkflowModel = session.query(WorkflowModel).filter_by(id=workflow_id).first() - lasttime = sincetime('load workflow model', lasttime) processor = WorkflowProcessor.reset(workflow_model, clear_data=clear_data, delete_files=delete_files) - lasttime = sincetime('reset model', lasttime) processor.do_engine_steps() - lasttime = sincetime('do steps', lasttime) processor.save() - lasttime = sincetime('save', lasttime) WorkflowService.update_task_assignments(processor) - lasttime = sincetime('update assignments', lasttime) workflow_api_model = WorkflowService.processor_to_workflow_api(processor) - lasttime = sincetime('create api model', lasttime) - return WorkflowApiSchema().dump(workflow_api_model) + api_model = WorkflowApiSchema().dump(workflow_api_model) + return api_model def get_task_events(action = None, workflow = None, study = None): diff --git a/crc/models/workflow.py b/crc/models/workflow.py index 0d8fbd41..9a58f0c5 100644 --- a/crc/models/workflow.py +++ b/crc/models/workflow.py @@ -3,6 +3,7 @@ import enum import marshmallow from marshmallow import EXCLUDE, post_load, fields, INCLUDE from sqlalchemy import func +from sqlalchemy.orm import deferred from crc import db, ma @@ -104,10 +105,10 @@ class WorkflowStatus(enum.Enum): class WorkflowModel(db.Model): __tablename__ = 'workflow' id = db.Column(db.Integer, primary_key=True) - bpmn_workflow_json = db.Column(db.JSON) + bpmn_workflow_json = deferred(db.Column(db.JSON)) status = db.Column(db.Enum(WorkflowStatus)) study_id = db.Column(db.Integer, db.ForeignKey('study.id')) - study = db.relationship("StudyModel", backref='workflow') + study = db.relationship("StudyModel", backref='workflow', lazy='select') workflow_spec_id = db.Column(db.String) total_tasks = db.Column(db.Integer, default=0) completed_tasks = db.Column(db.Integer, default=0) diff --git a/crc/services/study_service.py b/crc/services/study_service.py index 51e5df3e..74d38d52 100755 --- a/crc/services/study_service.py +++ b/crc/services/study_service.py @@ -272,7 +272,6 @@ class StudyService(object): """Returns a list of documents related to the study, and any file information that is available..""" - lastime = firsttime() # Get PB required docs, if Protocol Builder Service is enabled. if ProtocolBuilderService.is_enabled() and study_id is not None: try: @@ -282,17 +281,14 @@ class StudyService(object): pb_docs = [] else: pb_docs = [] - lasttime = sincetime("GET_REQUIRED DOCS", lastime) # Loop through all known document types, get the counts for those files, # and use pb_docs to mark those as required. doc_dictionary = DocumentService.get_dictionary() - lasttime = sincetime("GET DOC DICTIONARY", lastime) file_time = 0 documents = {} study_files = UserFileService.get_files_for_study(study_id=study_id) - lasttime = sincetime("GET_FILES_FOR_STUDY", lastime) for code, doc in doc_dictionary.items(): @@ -315,7 +311,6 @@ class StudyService(object): name_list.append(doc[cat_key]) doc['display_name'] = ' / '.join(name_list) - t = firsttime() # For each file, get associated workflow status doc_files = list(filter(lambda f: f.irb_doc_code == code, study_files)) @@ -323,7 +318,6 @@ class StudyService(object): doc['count'] = len(doc_files) doc['files'] = [] - file_time = file_time + firsttime() - t for file_model in doc_files: file = File.from_models(file_model, UserFileService.get_file_data(file_model.id), []) @@ -336,7 +330,6 @@ class StudyService(object): doc['status'] = status.value documents[code] = doc - lasttime = sincetime("PROCESS_DOCUMENTS", lastime) return Box(documents) @staticmethod diff --git a/crc/services/workflow_processor.py b/crc/services/workflow_processor.py index 796d72e4..ce4494f1 100644 --- a/crc/services/workflow_processor.py +++ b/crc/services/workflow_processor.py @@ -125,7 +125,6 @@ class WorkflowProcessor(object): if self.WORKFLOW_ID_KEY not in self.bpmn_workflow.data: if not workflow_model.id: session.add(workflow_model) - session.commit() # If the model is new, and has no id, save it, write it into the workflow model # and save it again. In this way, the workflow process is always aware of the # database model to which it is associated, and scripts running within the model @@ -133,6 +132,7 @@ class WorkflowProcessor(object): self.bpmn_workflow.data[WorkflowProcessor.WORKFLOW_ID_KEY] = workflow_model.id workflow_model.bpmn_workflow_json = WorkflowProcessor._serializer.serialize_workflow( self.bpmn_workflow, include_spec=True) + self.save() except MissingSpecError as ke: @@ -146,8 +146,8 @@ class WorkflowProcessor(object): def reset(workflow_model, clear_data=False, delete_files=False): # Try to execute a cancel notify try: - wp = WorkflowProcessor(workflow_model) - wp.cancel_notify() # The executes a notification to all endpoints + bpmn_workflow = WorkflowProcessor.__get_bpmn_workflow(workflow_model) + WorkflowProcessor.__cancel_notify(bpmn_workflow) except Exception as e: app.logger.error(f"Unable to send a cancel notify for workflow %s during a reset." f" Continuing with the reset anyway so we don't get in an unresolvable" @@ -168,12 +168,15 @@ class WorkflowProcessor(object): session.commit() return WorkflowProcessor(workflow_model) - def __get_bpmn_workflow(self, workflow_model: WorkflowModel, spec: WorkflowSpec, validate_only=False): + @staticmethod + @timeit + def __get_bpmn_workflow(workflow_model: WorkflowModel, spec: WorkflowSpec = None, validate_only=False): if workflow_model.bpmn_workflow_json: - bpmn_workflow = self._serializer.deserialize_workflow(workflow_model.bpmn_workflow_json, + bpmn_workflow = WorkflowProcessor._serializer.deserialize_workflow(workflow_model.bpmn_workflow_json, workflow_spec=spec) + bpmn_workflow.script_engine = WorkflowProcessor._script_engine else: - bpmn_workflow = BpmnWorkflow(spec, script_engine=self._script_engine) + bpmn_workflow = BpmnWorkflow(spec, script_engine=WorkflowProcessor._script_engine) bpmn_workflow.data[WorkflowProcessor.STUDY_ID_KEY] = workflow_model.study_id bpmn_workflow.data[WorkflowProcessor.VALIDATION_PROCESS_KEY] = validate_only return bpmn_workflow @@ -195,18 +198,13 @@ class WorkflowProcessor(object): def run_master_spec(spec_model, study): """Executes a BPMN specification for the given study, without recording any information to the database Useful for running the master specification, which should not persist. """ - lasttime = firsttime() spec_files = SpecFileService().get_files(spec_model, include_libraries=True) - lasttime = sincetime('load Files', lasttime) spec = WorkflowProcessor.get_spec(spec_files, spec_model) - lasttime = sincetime('get spec', lasttime) try: bpmn_workflow = BpmnWorkflow(spec, script_engine=WorkflowProcessor._script_engine) bpmn_workflow.data[WorkflowProcessor.STUDY_ID_KEY] = study.id bpmn_workflow.data[WorkflowProcessor.VALIDATION_PROCESS_KEY] = False - lasttime = sincetime('get_workflow', lasttime) bpmn_workflow.do_engine_steps() - lasttime = sincetime('run steps', lasttime) except WorkflowException as we: raise ApiError.from_task_spec("error_running_master_spec", str(we), we.sender) @@ -273,14 +271,19 @@ class WorkflowProcessor(object): raise ApiError.from_workflow_exception("task_error", str(we), we) def cancel_notify(self): + self.__cancel_notify(self.bpmn_workflow) + + @staticmethod + def __cancel_notify(bpmn_workflow): try: # A little hackly, but make the bpmn_workflow catch a cancel event. - self.bpmn_workflow.signal('cancel') # generate a cancel signal. - self.bpmn_workflow.catch(CancelEventDefinition()) - self.bpmn_workflow.do_engine_steps() + bpmn_workflow.signal('cancel') # generate a cancel signal. + bpmn_workflow.catch(CancelEventDefinition()) + bpmn_workflow.do_engine_steps() except WorkflowTaskExecException as we: raise ApiError.from_workflow_exception("task_error", str(we), we) + def serialize(self): return self._serializer.serialize_workflow(self.bpmn_workflow,include_spec=True) diff --git a/crc/services/workflow_service.py b/crc/services/workflow_service.py index cde31a55..13f4e270 100755 --- a/crc/services/workflow_service.py +++ b/crc/services/workflow_service.py @@ -631,14 +631,10 @@ class WorkflowService(object): def processor_to_workflow_api(processor: WorkflowProcessor, next_task=None): """Returns an API model representing the state of the current workflow, if requested, and possible, next_task is set to the current_task.""" - lasttime = firsttime() navigation = processor.bpmn_workflow.get_deep_nav_list() - lasttime = sincetime('WS: TO API: create navigation', lasttime) WorkflowService.update_navigation(navigation, processor) - lasttime = sincetime('WS: TO API: updateNav', lasttime) spec_service = WorkflowSpecService() spec = spec_service.get_spec(processor.workflow_spec_id) - lasttime = sincetime('WS: TO API: GET_SPEC', lasttime) workflow_api = WorkflowApi( id=processor.get_workflow_id(), status=processor.get_status(), @@ -652,7 +648,6 @@ class WorkflowService(object): title=spec.display_name, study_id=processor.workflow_model.study_id or None ) - lasttime = sincetime('WS: TO API: CREATE API', lasttime) if not next_task: # The Next Task can be requested to be a certain task, useful for parallel tasks. # This may or may not work, sometimes there is no next task to complete. next_task = processor.next_task() @@ -666,7 +661,6 @@ class WorkflowService(object): user_uids = WorkflowService.get_users_assigned_to_task(processor, next_task) if not UserService.in_list(user_uids, allow_admin_impersonate=True): workflow_api.next_task.state = WorkflowService.TASK_STATE_LOCKED - lasttime = sincetime('WS: TO API: NEXT_TASK', lasttime) return workflow_api @@ -942,9 +936,8 @@ class WorkflowService(object): db.session.query(TaskEventModel). \ filter(TaskEventModel.workflow_id == processor.workflow_model.id). \ filter(TaskEventModel.action == WorkflowService.TASK_ACTION_ASSIGNMENT).delete() - db.session.commit() - - for task in processor.get_current_user_tasks(): + tasks = processor.get_current_user_tasks() + for task in tasks: user_ids = WorkflowService.get_users_assigned_to_task(processor, task) for user_id in user_ids: WorkflowService.log_task_action(user_id, processor, task, WorkflowService.TASK_ACTION_ASSIGNMENT) @@ -1012,7 +1005,6 @@ class WorkflowService(object): # date=datetime.utcnow(), <=== For future reference, NEVER do this. Let the database set the time. ) db.session.add(task_event) - db.session.commit() @staticmethod def extract_form_data(latest_data, task):