cr-connect-workflow/tests/test_wait_event.py
Kelly McDonald 5f722d675f Add a function that gets run via a background scheduler to look for any workflows that are in a 'waiting' state - it runs the update waiting tasks and does do_engine_steps
We have a test for the function that runs, but an assumption was made that the scheduler module has its own unit tests and we do not need to test that.

fixes #346
2021-06-09 10:42:34 -04:00

31 lines
1.0 KiB
Python

import json
import time
from crc.services.workflow_processor import WorkflowProcessor
from tests.base_test import BaseTest
from crc.models.workflow import WorkflowStatus, WorkflowModel
from crc import db
from crc.api.common import ApiError
from crc.models.task_event import TaskEventModel, TaskEventSchema
from crc.services.workflow_service import WorkflowService
class TestTimerEvent(BaseTest):
    """Tests for WorkflowService.do_waiting() using the 'timer_event' workflow."""

    def test_timer_event(self):
        """A workflow held up by a timer must not be 'waiting' after do_waiting().

        Steps:
          1. Create the 'timer_event' workflow and run the engine.
          2. Complete the next task and confirm no user task is ready.
          3. Persist the processor, then sleep past the timer duration.
          4. Call WorkflowService.do_waiting() and re-read the workflow row
             to check that its status is no longer WorkflowStatus.waiting.
        """
        workflow = self.create_workflow('timer_event')
        processor = WorkflowProcessor(workflow)
        processor.do_engine_steps()

        # Finish the first task; afterwards nothing should be ready for a user.
        task = processor.next_task()
        processor.complete_task(task)
        tasks = processor.get_ready_user_tasks()
        self.assertEqual(tasks, [])
        processor.save()

        # The timer is set to .25 sec, so sleep slightly longer than that to
        # be sure it has expired before the waiting workflows are processed.
        time.sleep(.3)
        WorkflowService.do_waiting()

        # Re-read the row from the database rather than trusting the
        # in-memory `workflow` object.
        wf = db.session.query(WorkflowModel).filter(WorkflowModel.id == workflow.id).first()
        # Fail with a clear assertion (not an AttributeError) if the row is gone.
        self.assertIsNotNone(wf)
        # assertNotEqual reports the offending status value on failure.
        self.assertNotEqual(wf.status, WorkflowStatus.waiting)