add workflow_sync test
This commit is contained in:
parent
993c7bc76e
commit
9eea26e019
|
@ -31,7 +31,7 @@ def get_changed_workflows(remote,as_df=False):
|
|||
response = requests.get('http://'+remote+'/v1.0/workflow_sync/all',headers={'X-CR-API-KEY':app.config['API_TOKEN']})
|
||||
except:
|
||||
raise ApiError("endpoint error", 'had a problem connecting to '+remote)
|
||||
if response.status_code != 200:
|
||||
if not response.ok:
|
||||
raise ApiError("endpoint error", response.text)
|
||||
|
||||
remote = pd.DataFrame(json.loads(response.text))
|
||||
|
@ -102,7 +102,7 @@ def sync_changed_files(remote,workflow_spec_id):
|
|||
except:
|
||||
raise ApiError("endpoint error", 'had a problem connecting to '+remote)
|
||||
|
||||
if remotespectext.status_code != 200:
|
||||
if not remotespectext.ok:
|
||||
raise ApiError("endpoint error", response.text)
|
||||
|
||||
specdict = json.loads(remotespectext.text)
|
||||
|
@ -170,7 +170,7 @@ def sync_changed_files(remote,workflow_spec_id):
|
|||
except:
|
||||
raise ApiError("endpoint error", 'had a problem connecting to ' + remote)
|
||||
|
||||
if response.status_code != 200:
|
||||
if not response.ok:
|
||||
raise ApiError("endpoint error", response.text)
|
||||
|
||||
FileService.update_file(currentfile,response.content,updatefile['type'])
|
||||
|
@ -190,7 +190,7 @@ def get_changed_files(remote,workflow_spec_id,as_df=False):
|
|||
except:
|
||||
raise ApiError("endpoint error", 'had a problem connecting to '+remote)
|
||||
|
||||
if response.status_code != 200:
|
||||
if not response.ok:
|
||||
raise ApiError("endpoint error", response.text)
|
||||
|
||||
# This is probably very and may allow cross site attacks - fix later
|
||||
|
|
|
@ -204,6 +204,14 @@ class BaseTest(unittest.TestCase):
|
|||
data = myfile.read()
|
||||
return data
|
||||
|
||||
@staticmethod
|
||||
def workflow_sync_response(file_name):
|
||||
filepath = os.path.join(app.root_path, '..', 'tests', 'data', 'workflow_sync_responses', file_name)
|
||||
with open(filepath, 'r') as myfile:
|
||||
data = myfile.read()
|
||||
return data
|
||||
|
||||
|
||||
def assert_success(self, rv, msg=""):
|
||||
try:
|
||||
data = json.loads(rv.get_data(as_text=True))
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
from unittest.mock import patch
|
||||
|
||||
from crc import app
|
||||
from tests.base_test import BaseTest
|
||||
from crc.api.workflow_sync import get_all_spec_state, get_changed_workflows
|
||||
import json
|
||||
pip
|
||||
|
||||
class TestWorkflowSync(BaseTest):
|
||||
|
||||
@patch('crc.api.workflow_sync.requests.get')
|
||||
def test_get_no_changes(self, mock_get):
|
||||
othersys = get_all_spec_state()
|
||||
mock_get.return_value.ok = True
|
||||
mock_get.return_value.text = json.dumps(othersys)
|
||||
response = get_changed_workflows('somewhere over the rainbow')
|
||||
self.assertIsNotNone(response)
|
||||
self.assertEqual(response,[])
|
||||
|
Loading…
Reference in New Issue