diff --git a/crc/__init__.py b/crc/__init__.py index 1d5c5473..62381589 100644 --- a/crc/__init__.py +++ b/crc/__init__.py @@ -12,6 +12,7 @@ from flask_mail import Mail from flask_migrate import Migrate from flask_sqlalchemy import SQLAlchemy from sentry_sdk.integrations.flask import FlaskIntegration +from apscheduler.schedulers.background import BackgroundScheduler logging.basicConfig(level=logging.INFO) @@ -33,6 +34,7 @@ db = SQLAlchemy(app) session = db.session """:type: sqlalchemy.orm.Session""" +scheduler = BackgroundScheduler() # Mail settings mail = Mail(app) @@ -46,6 +48,9 @@ from crc.api import admin connexion_app.add_api('api.yml', base_path='/v1.0') +def setup_scheduler(): + from crc.services.workflow_service import WorkflowService + scheduler.add_job(WorkflowService.do_waiting()) # Convert list of allowed origins to list of regexes origins_re = [r"^https?:\/\/%s(.*)" % o.replace('.', '\.') for o in app.config['CORS_ALLOW_ORIGINS']] diff --git a/crc/services/workflow_processor.py b/crc/services/workflow_processor.py index 8636077b..4600a926 100644 --- a/crc/services/workflow_processor.py +++ b/crc/services/workflow_processor.py @@ -364,6 +364,13 @@ class WorkflowProcessor(object): def get_status(self): return self.status_of(self.bpmn_workflow) + def update_waiting_tasks(self): + try: + self.bpmn_workflow.refresh_waiting_tasks() + except WorkflowTaskExecException as we: + raise ApiError.from_task("task_error", str(we), we.task) + + def do_engine_steps(self): try: self.bpmn_workflow.do_engine_steps() diff --git a/crc/services/workflow_service.py b/crc/services/workflow_service.py index 395048b4..35c34aaf 100644 --- a/crc/services/workflow_service.py +++ b/crc/services/workflow_service.py @@ -90,6 +90,15 @@ class WorkflowService(object): if user: db.session.delete(user) + @staticmethod + def do_waiting(): + records = db.session.query(WorkflowModel).filter(WorkflowModel.status==WorkflowStatus.waiting).all() + for workflow_model in records: + processor = WorkflowProcessor(workflow_model) + processor.bpmn_workflow.update_waiting_tasks() + processor.bpmn_workflow.do_engine_steps() + + @staticmethod def test_spec(spec_id, validate_study_id=None, required_only=False): """Runs a spec through it's paces to see if it results in any errors.