mirror of
https://github.com/status-im/spiff-arena.git
synced 2025-02-07 07:34:17 +00:00
5225a8b4c pyl 259f74a1e Merge branch 'main' into bug/refresh-token d452208ef Merge pull request #135 from sartography/feature/permissions3 8e1075406 Merge branch 'main' into bug/refresh-token 2b01d2fe7 fixed authentication_callback and getting the user w/ burnettk 476e36c7d mypy changes 6403e62c0 Fix migration after merging main 594a32b67 merged in main and resolved conflicts w/ burnettk b285ba1a1 added updated columns to secrets and updated flask-bpmn 7c53fc9fa Merge remote-tracking branch 'origin/main' into feature/permissions3 201a6918a pyl changes a6112f7fb Merge branch 'main' into bug/refresh-token 87f65a6c6 auth_token should be dictionary, not string f163de61c pyl 1f443bb94 PublicAuthenticationService -> AuthenticationService 6c491a3df Don't refresh token here. They just logged in. We are validating the returned token. If it is bad, raise an error. 91b8649f8 id_token -> auth_token fc94774bb Move `store_refresh_token` to authentication_service 00d66e9c5 mypy c4e415dbe mypy 1e75716eb Pre commit a72b03e09 Rename method. We pass it auth_tokens, not id_tokens 9a6700a6d Too many things expect g.token. 
Reverting my change 74883fb23 Noe store refresh_token, and try to use it if auth_token is expired Renamed some methods to use correct token type be0557013 Cleanup - remove unused code cf01f0d51 Add refresh_token model 1c0c937af added method to delete all permissions so we can recreate them w/ burnettk aaeaac879 Merge remote-tracking branch 'origin/main' into feature/permissions3 44856fce2 added api endpoint to check if user has permissions based on given target uris w/ burnettk ae830054d precommit w/ burnettk 94d50efb1 created common method to check whether an api method should have auth w/ burnettk c955335d0 precommit w/ burnettk 37caf1a69 added a finance user to keycloak and fixed up the staging permission yml w/ burnettk 93c456294 merged in main and resolved conflicts w/ burnettk 06a7c6485 remaining tests are now passing w/ burnettk 50529d04c added test to make sure api gives a 403 if a permission is not found w/ burnettk 6a9d0a68a api calls are somewhat respecting permissions now and the process api tests are passing d07fbbeff attempting to respect permissions w/ burnettk git-subtree-dir: spiffworkflow-backend git-subtree-split: 5225a8b4c101133567d4f7efa33632d36c29c81d
350 lines
12 KiB
Python
350 lines
12 KiB
Python
"""Test_secret_service."""
|
|
import json
|
|
from typing import Optional
|
|
|
|
import pytest
|
|
from flask.app import Flask
|
|
from flask.testing import FlaskClient
|
|
from flask_bpmn.api.api_error import ApiError
|
|
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
|
from werkzeug.test import TestResponse # type: ignore
|
|
|
|
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
|
from spiffworkflow_backend.models.secret_model import SecretModel
|
|
from spiffworkflow_backend.models.secret_model import SecretModelSchema
|
|
from spiffworkflow_backend.models.user import UserModel
|
|
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
|
from spiffworkflow_backend.services.secret_service import SecretService
|
|
|
|
|
|
class SecretServiceTestHelpers(BaseTest):
    """Shared constants and setup helpers for the secret service tests."""

    # Key/value pair stored by ``add_test_secret``.
    test_key = "test_key"
    test_value = "test_value"
    # Identifiers and labels used by ``add_test_process``.
    test_process_group_id = "test"
    test_process_group_display_name = "My Test Process Group"
    test_process_model_id = "make_cookies"
    test_process_model_display_name = "Cooooookies"
    test_process_model_description = "Om nom nom delicious cookies"

    def add_test_secret(self, user: UserModel) -> SecretModel:
        """Add the class-level test key/value as a secret, passing ``user.id``."""
        service = SecretService()
        return service.add_secret(self.test_key, self.test_value, user.id)

    def add_test_process(
        self, client: FlaskClient, user: UserModel
    ) -> ProcessModelInfo:
        """Create the test process group and model, then look the model up.

        The group is created before the model, and the returned value is
        whatever ``ProcessModelService.get_process_model`` yields for the
        test model id / group id pair.
        """
        self.create_process_group(
            client,
            user,
            self.test_process_group_id,
            display_name=self.test_process_group_display_name,
        )
        self.create_process_model_with_api(
            client,
            process_group_id=self.test_process_group_id,
            process_model_id=self.test_process_model_id,
            process_model_display_name=self.test_process_model_display_name,
            process_model_description=self.test_process_model_description,
            user=user,
        )
        return ProcessModelService().get_process_model(
            self.test_process_model_id, self.test_process_group_id
        )
|
|
|
|
|
|
class TestSecretService(SecretServiceTestHelpers):
    """Exercise SecretService directly, without going through the HTTP API."""

    def test_add_secret(
        self,
        app: Flask,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """A newly added secret carries the given key, value and creator id."""
        test_secret = self.add_test_secret(with_super_admin_user)

        assert test_secret is not None
        assert test_secret.key == self.test_key
        assert test_secret.value == self.test_value
        assert test_secret.creator_user_id == with_super_admin_user.id

    def test_add_secret_duplicate_key_fails(
        self,
        app: Flask,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """Adding the same key twice raises ApiError ``create_secret_error``."""
        self.add_test_secret(with_super_admin_user)
        with pytest.raises(ApiError) as ae:
            self.add_test_secret(with_super_admin_user)
        # Must stay outside the ``with`` body: nothing after the raising call
        # inside the block would ever execute.
        assert ae.value.error_code == "create_secret_error"

    def test_get_secret(
        self,
        app: Flask,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """A stored secret can be fetched by key and has the stored value."""
        self.add_test_secret(with_super_admin_user)

        secret = SecretService.get_secret(self.test_key)
        assert secret is not None
        assert secret.value == self.test_value

    def test_get_secret_bad_key_fails(
        self,
        app: Flask,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """Fetching a key that was never stored raises ApiError."""
        self.add_test_secret(with_super_admin_user)

        with pytest.raises(ApiError):
            SecretService.get_secret("bad_key")

    def test_update_secret(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """The creator can update a secret; a later fetch sees the new value."""
        self.add_test_secret(with_super_admin_user)
        secret = SecretService.get_secret(self.test_key)
        assert secret
        assert secret.value == self.test_value
        SecretService.update_secret(
            self.test_key, "new_secret_value", with_super_admin_user.id
        )
        new_secret = SecretService.get_secret(self.test_key)
        assert new_secret
        assert new_secret.value == "new_secret_value"  # noqa: S105

    def test_update_secret_bad_user_fails(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """Updating with a user id other than the creator's raises ApiError."""
        self.add_test_secret(with_super_admin_user)
        # Any id different from the creator's; computed once so the call and
        # the expected message cannot drift apart.
        other_user_id = with_super_admin_user.id + 1
        with pytest.raises(ApiError) as ae:
            SecretService.update_secret(
                self.test_key, "new_secret_value", other_user_id
            )  # noqa: S105
        expected_message = (
            f"User: {other_user_id} cannot update the secret"
            f" with key : {self.test_key}"
        )
        assert ae.value.message == expected_message

    def test_update_secret_bad_secret_fails(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """Updating a key that does not exist raises ``update_secret_error``."""
        secret = self.add_test_secret(with_super_admin_user)
        with pytest.raises(ApiError) as ae:
            SecretService.update_secret(
                secret.key + "x", "some_new_value", with_super_admin_user.id
            )
        assert "Resource does not exist" in ae.value.message
        assert ae.value.error_code == "update_secret_error"

    def test_delete_secret(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """The creator can delete a secret, leaving no SecretModel rows."""
        self.add_test_secret(with_super_admin_user)
        secrets = SecretModel.query.all()
        assert len(secrets) == 1
        assert secrets[0].creator_user_id == with_super_admin_user.id
        SecretService.delete_secret(self.test_key, with_super_admin_user.id)
        secrets = SecretModel.query.all()
        assert len(secrets) == 0

    def test_delete_secret_bad_user_fails(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """Deleting with a user id other than the creator's raises ApiError."""
        self.add_test_secret(with_super_admin_user)
        # Any id different from the creator's.
        other_user_id = with_super_admin_user.id + 1
        with pytest.raises(ApiError) as ae:
            SecretService.delete_secret(self.test_key, other_user_id)
        assert (
            f"User: {other_user_id} cannot delete the secret with key"
            in ae.value.message
        )

    def test_delete_secret_bad_secret_fails(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """Deleting a key that does not exist raises ApiError."""
        self.add_test_secret(with_super_admin_user)
        with pytest.raises(ApiError) as ae:
            SecretService.delete_secret(self.test_key + "x", with_super_admin_user.id)
        assert "Resource does not exist" in ae.value.message
|
|
|
|
|
|
class TestSecretServiceApi(SecretServiceTestHelpers):
    """Exercise the ``/v1.0/secrets`` HTTP endpoints through the test client."""

    def test_add_secret(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """POSTing a serialized secret echoes back its key, value and creator."""
        secret_model = SecretModel(
            key=self.test_key,
            value=self.test_value,
            creator_user_id=with_super_admin_user.id,
        )
        data = json.dumps(SecretModelSchema().dump(secret_model))
        response: TestResponse = client.post(
            "/v1.0/secrets",
            headers=self.logged_in_headers(with_super_admin_user),
            content_type="application/json",
            data=data,
        )
        assert response.json
        secret: dict = response.json
        for key in ("key", "value", "creator_user_id"):
            assert key in secret
        assert secret["key"] == self.test_key
        assert secret["value"] == self.test_value
        assert secret["creator_user_id"] == with_super_admin_user.id

    def test_get_secret(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """GET on an existing key returns 200 and the stored value."""
        self.add_test_secret(with_super_admin_user)
        secret_response = client.get(
            f"/v1.0/secrets/{self.test_key}",
            headers=self.logged_in_headers(with_super_admin_user),
        )
        assert secret_response
        assert secret_response.status_code == 200
        assert secret_response.json
        assert secret_response.json["value"] == self.test_value

    def test_update_secret(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """PUT with a new value returns 200 and changes the stored row."""
        self.add_test_secret(with_super_admin_user)
        secret: Optional[SecretModel] = SecretService.get_secret(self.test_key)
        assert secret
        assert secret.value == self.test_value
        # Unsaved model used only to build the request payload.
        secret_model = SecretModel(
            key=self.test_key,
            value="new_secret_value",
            creator_user_id=with_super_admin_user.id,
        )
        response = client.put(
            f"/v1.0/secrets/{self.test_key}",
            headers=self.logged_in_headers(with_super_admin_user),
            content_type="application/json",
            data=json.dumps(SecretModelSchema().dump(secret_model)),
        )
        assert response.status_code == 200

        updated_secret = SecretModel.query.filter(
            SecretModel.key == self.test_key
        ).first()
        # ``first()`` yields None when no row matches; fail with a clear
        # assertion instead of an AttributeError on the next line.
        assert updated_secret is not None
        assert updated_secret.value == "new_secret_value"

    def test_delete_secret(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """DELETE on an existing key returns 200 and the key can no longer be fetched."""
        self.add_test_secret(with_super_admin_user)
        secret = SecretService.get_secret(self.test_key)
        assert secret
        assert secret.value == self.test_value
        secret_response = client.delete(
            f"/v1.0/secrets/{self.test_key}",
            headers=self.logged_in_headers(with_super_admin_user),
        )
        assert secret_response.status_code == 200
        with pytest.raises(ApiError):
            SecretService.get_secret(self.test_key)

    def test_delete_secret_bad_user(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """DELETE by a user other than the creator returns 401 ``delete_secret_error``."""
        user_1 = self.find_or_create_user()
        user_2 = self.find_or_create_user("test_user_2")
        self.add_test_secret(user_1)

        # Grant user_2 the "delete" permission on this URI, so the failure
        # asserted below is the ``delete_secret_error`` response and not a
        # missing-permission rejection.
        self.add_permissions_to_user(
            user_2,
            target_uri=f"/v1.0/secrets/{self.test_key}",
            permission_names=["delete"],
        )
        secret_response = client.delete(
            f"/v1.0/secrets/{self.test_key}",
            headers=self.logged_in_headers(user_2),
        )
        assert secret_response.status_code == 401
        assert secret_response.json
        assert secret_response.json["error_code"] == "delete_secret_error"

    def test_delete_secret_bad_key(
        self,
        app: Flask,
        client: FlaskClient,
        with_db_and_bpmn_file_cleanup: None,
        with_super_admin_user: UserModel,
    ) -> None:
        """DELETE on a key that was never created returns 404."""
        secret_response = client.delete(
            "/v1.0/secrets/bad_secret_key",
            headers=self.logged_in_headers(with_super_admin_user),
        )
        assert secret_response.status_code == 404
|