cr-connect-workflow/tests/test_workflow_spec_api.py

105 lines
5.2 KiB
Python
Raw Normal View History

2020-02-20 18:30:04 +00:00
import json
from crc import session
from crc.models.file import FileModel
2020-03-16 16:32:39 +00:00
from crc.models.workflow import WorkflowSpecModel, WorkflowSpecModelSchema, WorkflowModel, WorkflowSpecCategoryModel
2020-02-20 18:30:04 +00:00
from tests.base_test import BaseTest
class TestWorkflowSpec(BaseTest):
def test_list_workflow_specifications(self):
self.load_example_data()
spec = session.query(WorkflowSpecModel).first()
rv = self.app.get('/v1.0/workflow-specification',
follow_redirects=True,
content_type="application/json",headers=self.logged_in_headers())
2020-02-20 18:30:04 +00:00
self.assert_success(rv)
json_data = json.loads(rv.get_data(as_text=True))
specs = WorkflowSpecModelSchema(many=True).load(json_data, session=session)
spec2 = specs[0]
self.assertEqual(spec.id, spec2.id)
self.assertEqual(spec.display_name, spec2.display_name)
self.assertEqual(spec.description, spec2.description)
def test_add_new_workflow_specification(self):
self.load_example_data();
num_before = session.query(WorkflowSpecModel).count()
2020-03-16 16:10:32 +00:00
spec = WorkflowSpecModel(id='make_cookies', name='make_cookies', display_name='Cooooookies',
2020-02-20 18:30:04 +00:00
description='Om nom nom delicious cookies')
2020-03-11 19:27:22 +00:00
rv = self.app.post('/v1.0/workflow-specification',
headers=self.logged_in_headers(),
content_type="application/json",
2020-02-20 18:30:04 +00:00
data=json.dumps(WorkflowSpecModelSchema().dump(spec)))
self.assert_success(rv)
db_spec = session.query(WorkflowSpecModel).filter_by(id='make_cookies').first()
self.assertEqual(spec.display_name, db_spec.display_name)
num_after = session.query(WorkflowSpecModel).count()
self.assertEqual(num_after, num_before + 1)
def test_get_workflow_specification(self):
self.load_example_data()
db_spec = session.query(WorkflowSpecModel).first()
2020-03-11 19:27:22 +00:00
rv = self.app.get('/v1.0/workflow-specification/%s' % db_spec.id, headers=self.logged_in_headers())
2020-02-20 18:30:04 +00:00
self.assert_success(rv)
json_data = json.loads(rv.get_data(as_text=True))
api_spec = WorkflowSpecModelSchema().load(json_data, session=session)
self.assertEqual(db_spec, api_spec)
2020-03-16 16:32:39 +00:00
def test_update_workflow_specification(self):
self.load_example_data()
category = WorkflowSpecCategoryModel(id=0, name='trap', display_name="It's a trap!")
session.add(category)
session.commit()
db_spec_before: WorkflowSpecModel = session.query(WorkflowSpecModel).first()
spec_id = db_spec_before.id
self.assertIsNone(db_spec_before.workflow_spec_category_id)
db_spec_before.workflow_spec_category_id = 0
rv = self.app.put('/v1.0/workflow-specification/%s' % spec_id,
content_type="application/json",
headers=self.logged_in_headers(),
data=json.dumps(WorkflowSpecModelSchema().dump(db_spec_before)))
self.assert_success(rv)
json_data = json.loads(rv.get_data(as_text=True))
api_spec = WorkflowSpecModelSchema().load(json_data, session=session)
self.assertEqual(db_spec_before, api_spec)
db_spec_after: WorkflowSpecModel = session.query(WorkflowSpecModel).filter_by(id=spec_id).first()
self.assertIsNotNone(db_spec_after.workflow_spec_category_id)
self.assertIsNotNone(db_spec_after.workflow_spec_category)
self.assertEqual(db_spec_after.workflow_spec_category.display_name, category.display_name)
2020-02-20 18:30:04 +00:00
def test_delete_workflow_specification(self):
self.load_example_data()
spec_id = 'random_fact'
self.load_test_spec(spec_id)
2020-02-20 18:30:04 +00:00
num_specs_before = session.query(WorkflowSpecModel).filter_by(id=spec_id).count()
self.assertEqual(num_specs_before, 1)
num_files_before = session.query(FileModel).filter_by(workflow_spec_id=spec_id).count()
num_workflows_before = session.query(WorkflowModel).filter_by(workflow_spec_id=spec_id).count()
self.assertGreater(num_files_before + num_workflows_before, 0)
2020-03-11 19:27:22 +00:00
rv = self.app.delete('/v1.0/workflow-specification/' + spec_id, headers=self.logged_in_headers())
2020-02-20 18:30:04 +00:00
self.assert_success(rv)
num_specs_after = session.query(WorkflowSpecModel).filter_by(id=spec_id).count()
self.assertEqual(0, num_specs_after)
# Make sure that all items in the database with the workflow spec ID are deleted as well.
num_files_after = session.query(FileModel).filter_by(workflow_spec_id=spec_id).count()
num_workflows_after = session.query(WorkflowModel).filter_by(workflow_spec_id=spec_id).count()
self.assertEqual(num_files_after + num_workflows_after, 0)
# def test_validate_workflow_specification(self):
# self.load_example_data()
# db_spec = session.query(WorkflowSpecModel).first()
# rv = self.app.get('/v1.0/workflow-specification/%s/validate' % db_spec.id, headers=self.logged_in_headers())
# self.assert_success(rv)
# json_data = json.loads(rv.get_data(as_text=True))
# api_spec = WorkflowSpecModelSchema().load(json_data, session=session)
# self.assertEqual(db_spec, api_spec)