47 lines
1.9 KiB
Python
47 lines
1.9 KiB
Python
import json
|
|
import time
|
|
|
|
from tests.base_test import BaseTest
|
|
from crc.models.workflow import WorkflowStatus, WorkflowModel
|
|
from crc import db
|
|
from crc.api.common import ApiError
|
|
from crc.models.task_event import TaskEventModel, TaskEventSchema
|
|
from crc.services.workflow_service import WorkflowService
|
|
from crc.services.workflow_processor import WorkflowProcessor
|
|
|
|
|
|
class TestTimerEvent(BaseTest):
|
|
|
|
|
|
def test_timer_event(self):
|
|
workflow = self.create_workflow('timer_event')
|
|
processor = WorkflowProcessor(workflow)
|
|
processor.do_engine_steps()
|
|
task = processor.next_task()
|
|
processor.complete_task(task)
|
|
tasks = processor.get_ready_user_tasks()
|
|
self.assertEqual(tasks,[])
|
|
processor.save()
|
|
time.sleep(.3) # our timer is at .25 sec so we have to wait for it
|
|
# get done waiting
|
|
WorkflowService.do_waiting()
|
|
wf = db.session.query(WorkflowModel).filter(WorkflowModel.id == workflow.id).first()
|
|
self.assertTrue(wf.status != WorkflowStatus.waiting)
|
|
|
|
def test_waiting_event_error(self):
|
|
workflow = self.create_workflow('timer_event_error')
|
|
processor = WorkflowProcessor(workflow)
|
|
processor.do_engine_steps()
|
|
processor.save()
|
|
time.sleep(.3) # our timer is at .25 sec so we have to wait for it
|
|
# get done waiting
|
|
wf = db.session.query(WorkflowModel).filter(WorkflowModel.id == workflow.id).first()
|
|
self.assertTrue(wf.status == WorkflowStatus.waiting)
|
|
with self.assertLogs('crc', level='ERROR') as cm:
|
|
WorkflowService.do_waiting()
|
|
self.assertEqual(1, len(cm.output))
|
|
self.assertRegexpMatches(cm.output[0], f"workflow #%i" % workflow.id)
|
|
self.assertRegexpMatches(cm.output[0], f"study #%i" % workflow.study_id)
|
|
|
|
self.assertTrue(wf.status == WorkflowStatus.waiting)
|