diff --git a/migrations/versions/33c37028fb51_.py b/migrations/versions/d4922c5cd32d_.py similarity index 97% rename from migrations/versions/33c37028fb51_.py rename to migrations/versions/d4922c5cd32d_.py index de49f87f..640d41b6 100644 --- a/migrations/versions/33c37028fb51_.py +++ b/migrations/versions/d4922c5cd32d_.py @@ -1,8 +1,8 @@ """empty message -Revision ID: 33c37028fb51 +Revision ID: d4922c5cd32d Revises: -Create Date: 2022-09-14 08:59:45.896805 +Create Date: 2022-09-15 11:08:25.133655 """ from alembic import op @@ -10,7 +10,7 @@ import sqlalchemy as sa # revision identifiers, used by Alembic. -revision = '33c37028fb51' +revision = 'd4922c5cd32d' down_revision = None branch_labels = None depends_on = None @@ -137,11 +137,12 @@ def upgrade(): op.create_index(op.f('ix_process_instance_report_process_model_identifier'), 'process_instance_report', ['process_model_identifier'], unique=False) op.create_table('secret', sa.Column('id', sa.Integer(), nullable=False), - sa.Column('key', sa.String(length=50), nullable=True), - sa.Column('value', sa.String(length=255), nullable=True), + sa.Column('key', sa.String(length=50), nullable=False), + sa.Column('value', sa.String(length=255), nullable=False), sa.Column('creator_user_id', sa.Integer(), nullable=False), sa.ForeignKeyConstraint(['creator_user_id'], ['user.id'], ), - sa.PrimaryKeyConstraint('id') + sa.PrimaryKeyConstraint('id'), + sa.UniqueConstraint('key') ) op.create_table('user_group_assignment', sa.Column('id', sa.Integer(), nullable=False), @@ -229,7 +230,6 @@ def upgrade(): sa.ForeignKeyConstraint(['secret_id'], ['secret.id'], ), sa.PrimaryKeyConstraint('id') ) - op.create_index(op.f('ix_secret_allowed_process_allowed_relative_path'), 'secret_allowed_process', ['allowed_relative_path'], unique=False) op.create_table('task_event', sa.Column('id', sa.Integer(), nullable=False), sa.Column('user_id', sa.Integer(), nullable=False), @@ -286,7 +286,6 @@ def downgrade(): op.drop_table('message_correlation_message_instance') op.drop_table('data_store') op.drop_table('task_event') - op.drop_index(op.f('ix_secret_allowed_process_allowed_relative_path'), table_name='secret_allowed_process') op.drop_table('secret_allowed_process') op.drop_table('message_instance') op.drop_index(op.f('ix_message_correlation_value'), table_name='message_correlation') diff --git a/src/spiffworkflow_backend/models/secret_model.py b/src/spiffworkflow_backend/models/secret_model.py index 783dfa40..6a9e1b4c 100644 --- a/src/spiffworkflow_backend/models/secret_model.py +++ b/src/spiffworkflow_backend/models/secret_model.py @@ -13,8 +13,8 @@ class SecretModel(SpiffworkflowBaseDBModel): __tablename__ = "secret" id: int = db.Column(db.Integer, primary_key=True) - key: str = db.Column(db.String(50)) - value: str = db.Column(db.String(255)) + key: str = db.Column(db.String(50), unique=True, nullable=False) + value: str = db.Column(db.String(255), nullable=False) creator_user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False) allowed_processes = relationship("SecretAllowedProcessPathModel", cascade="delete") @@ -39,4 +39,4 @@ class SecretAllowedProcessPathModel(SpiffworkflowBaseDBModel): __tablename__ = "secret_allowed_process" id: int = db.Column(db.Integer, primary_key=True) secret_id: int = db.Column(ForeignKey(SecretModel.id), nullable=False) # type: ignore - allowed_relative_path: str = db.Column(db.String(500), nullable=False, index=True) + allowed_relative_path: str = db.Column(db.String(500), nullable=False) diff --git a/src/spiffworkflow_backend/services/secret_service.py b/src/spiffworkflow_backend/services/secret_service.py index badaf62e..a41007a6 100644 --- a/src/spiffworkflow_backend/services/secret_service.py +++ b/src/spiffworkflow_backend/services/secret_service.py @@ -15,7 +15,7 @@ class SecretService: def add_secret( key: str, value: str, - creator_user_id: Optional[int] = None, + creator_user_id: int, ) -> SecretModel: """Add_secret.""" secret_model = SecretModel( @@ -43,13 +43,13 @@ class SecretService: @staticmethod def add_allowed_process( - secret_id: int, user_id: str, allowed_relative_path: str + key: str, user_id: str, allowed_relative_path: str ) -> SecretAllowedProcessPathModel: """Add_allowed_process.""" - creator = SecretModel.query.filter(SecretModel.id == secret_id).first() - if creator == user_id: + secret_model = SecretModel.query.filter(SecretModel.key == key).first() + if secret_model.creator_user_id == user_id: secret_process_model = SecretAllowedProcessPathModel( - secret_id=secret_id, allowed_relative_path=allowed_relative_path + secret_id=secret_model.id, allowed_relative_path=allowed_relative_path ) assert secret_process_model # noqa: S101 db.session.add(secret_process_model) @@ -58,7 +58,7 @@ class SecretService: except Exception as e: raise ApiError( code="create_allowed_process_failure", - message=f"Count not create an allowed process for for secret: {secret_id} " + message=f"Could not create an allowed process for secret with key: {key} " f"with path: {allowed_relative_path}. " f"Original error is {e}", ) from e @@ -66,27 +66,47 @@ class SecretService: else: raise ApiError( code="create_allowed_process_path_error", - message=f"User: {user_id} cannot modify the secret with id : {secret_id}", + message=f"User: {user_id} cannot modify the secret with key : {key}", ) + @staticmethod def update_secret( - self, key: str, value: str, creator_user_id: Optional[int] = None, ) -> None: """Does this pass pre commit?""" - ... + secret_model = SecretModel.query.filter(SecretModel.key == key).first() + if secret_model.creator_user_id == creator_user_id: + secret_model.value = value + db.session.add(secret_model) + try: + db.session.commit() + except Exception as e: + raise ApiError(code='update_secret_error', + message=f"There was an error updating the secret with key: {key}, and value: {value}") + else: + raise ApiError( + code="update_secret_error", + message=f"User: {creator_user_id} cannot update the secret with key : {key}", + ) @staticmethod - def delete_secret(key: str) -> None: + def delete_secret(key: str, user_id: int) -> None: """Delete secret.""" - secret = SecretModel.query.filter(SecretModel.key == key).first() - db.session.delete(secret) - try: - db.session.commit() - except Exception as e: + secret_model = SecretModel.query.filter(SecretModel.key == key).first() + if secret_model.creator_user_id == user_id: + db.session.delete(secret_model) + try: + db.session.commit() + except Exception as e: + raise ApiError( + code="delete_secret_error", + message=f"Could not delete secret with key: {key}. Original error is: {e}", + ) from e + else: raise ApiError( code="delete_secret_error", - message=f"Could not delete secret with key: {key}. Original error is: {e}", - ) from e + message=f"User: {user_id} cannot delete the secret with key : {key}", + ) + diff --git a/tests/spiffworkflow_backend/integration/test_secret_service.py b/tests/spiffworkflow_backend/integration/test_secret_service.py index c0da23c5..ae5c5ec6 100644 --- a/tests/spiffworkflow_backend/integration/test_secret_service.py +++ b/tests/spiffworkflow_backend/integration/test_secret_service.py @@ -1,9 +1,15 @@ """Test_secret_service.""" +import pytest + from flask.app import Flask from flask.testing import FlaskClient +from flask_bpmn.api.api_error import ApiError +from spiffworkflow_backend.services.file_system_service import FileSystemService +from spiffworkflow_backend.services.process_model_service import ProcessModelService + from tests.spiffworkflow_backend.helpers.base_test import BaseTest -from spiffworkflow_backend.models.secret_model import SecretModel +from spiffworkflow_backend.models.secret_model import SecretModel, SecretAllowedProcessPathModel from spiffworkflow_backend.models.user import UserModel from spiffworkflow_backend.services.secret_service import SecretService @@ -13,11 +19,32 @@ class TestSecretService(BaseTest): test_key = "test_key" test_value = "test_value" + test_process_group_id = "test" + test_process_group_display_name = "My Test Process Group" + test_process_model_id = "make_cookies" + test_process_model_display_name = "Cooooookies" + test_process_model_description = "Om nom nom delicious cookies" def add_test_secret(self, user: UserModel) -> SecretModel: """Add_test_secret.""" return SecretService().add_secret(self.test_key, self.test_value, user.id) + def add_test_process(self, client, user): + self.create_process_group( + client, user, self.test_process_group_id, display_name=self.test_process_group_display_name + ) + self.create_process_model_with_api( + client, + process_group_id=self.test_process_group_id, + process_model_id=self.test_process_model_id, + process_model_display_name=self.test_process_model_display_name, + process_model_description=self.test_process_model_description, + ) + process_model_info = ProcessModelService().get_process_model( + self.test_process_model_id, self.test_process_group_id + ) + return process_model_info + def test_add_secret(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None: """Test_add_secret.""" user = self.find_or_create_user() @@ -28,6 +55,13 @@ class TestSecretService(BaseTest): assert test_secret.value == self.test_value assert test_secret.creator_user_id == user.id + def test_add_secret_duplicate_key_fails(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None: + user = self.find_or_create_user() + self.add_test_secret(user) + with pytest.raises(ApiError) as ae: + self.add_test_secret(user) + assert "Duplicate entry" in ae.value.message + def test_get_secret(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None: """Test_get_secret.""" user = self.find_or_create_user() @@ -47,53 +81,61 @@ class TestSecretService(BaseTest): bad_secret = SecretService().get_secret("bad_key") assert bad_secret is None - # def test_secret_add_allowed_process( - # self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None - # ) -> None: - # """Test_secret_add_allowed_process.""" - # process_group_id = "test" - # process_group_display_name = "My Test Process Group" - # - # user = self.find_or_create_user() - # self.create_process_group( - # client, user, process_group_id, display_name=process_group_display_name - # ) - # - # process_model_id = "make_cookies" - # process_model_display_name = "Cooooookies" - # process_model_description = "Om nom nom delicious cookies" - # self.create_process_model( - # client, - # process_group_id=process_group_id, - # process_model_id=process_model_id, - # process_model_display_name=process_model_display_name, - # process_model_description=process_model_description, - # ) - # - # process_model_info = ProcessModelService().get_process_model( - # process_model_id, process_group_id - # ) - # process_model_relative_path = FileSystemService.process_model_relative_path( - # process_model_info - # ) - # - # test_secret = self.add_test_secret(user) - # allowed_process_model = SecretService().add_allowed_process( - # secret_id=test_secret.id, allowed_relative_path=process_model_relative_path - # ) - # assert allowed_process_model is not None - # assert isinstance(allowed_process_model, SecretAllowedProcessPathModel) - # assert allowed_process_model.secret_id == test_secret.id - # assert ( - # allowed_process_model.allowed_relative_path == process_model_relative_path - # ) - # - # assert len(test_secret.allowed_processes) == 1 - # assert test_secret.allowed_processes[0] == allowed_process_model + def test_secret_add_allowed_process( + self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None + ) -> None: + """Test_secret_add_allowed_process.""" + user = self.find_or_create_user() + process_model_info = self.add_test_process(client, user) + process_model_relative_path = FileSystemService.process_model_relative_path( + process_model_info + ) - def test_update_secret(self) -> None: + test_secret = self.add_test_secret(user) + allowed_process_model = SecretService().add_allowed_process( + key=test_secret.key, user_id=user.id, allowed_relative_path=process_model_relative_path + ) + assert allowed_process_model is not None + assert isinstance(allowed_process_model, SecretAllowedProcessPathModel) + assert allowed_process_model.secret_id == test_secret.id + assert ( + allowed_process_model.allowed_relative_path == process_model_relative_path + ) + + assert len(test_secret.allowed_processes) == 1 + assert test_secret.allowed_processes[0] == allowed_process_model + + def test_secret_add_allowed_process_bad_user( + self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None + ) -> None: + user = self.find_or_create_user() + process_model_info = self.add_test_process(client, user) + process_model_relative_path = FileSystemService.process_model_relative_path( + process_model_info + ) + test_secret = self.add_test_secret(user) + with pytest.raises(ApiError) as ae: + SecretService().add_allowed_process( + key=test_secret.key, user_id=user.id+1, allowed_relative_path=process_model_relative_path + ) + assert ae.value.message == f"User: {user.id+1} cannot modify the secret with key : test_key" + + def test_update_secret(self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None) -> None: """Test update secret.""" - ... + user = self.find_or_create_user() + self.add_test_secret(user) + secret = SecretService.get_secret(self.test_key) + assert secret == self.test_value + SecretService.update_secret(self.test_key, 'new_secret_value', user.id) + new_secret = SecretService.get_secret(self.test_key) + assert new_secret == 'new_secret_value' + + def test_update_secret_bad_user(self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None) -> None: + user = self.find_or_create_user() + self.add_test_secret(user) + with pytest.raises(ApiError) as ae: + SecretService.update_secret(self.test_key, 'new_secret_value', user.id+1) + assert ae.value.message == f"User: {user.id+1} cannot update the secret with key : test_key" def test_delete_secret( self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None @@ -101,7 +143,20 @@ class TestSecretService(BaseTest): """Test delete secret.""" user = self.find_or_create_user() self.add_test_secret(user) - SecretService.delete_secret(self.test_key) + secrets = SecretModel.query.all() + assert len(secrets) == 1 + assert secrets[0].creator_user_id == user.id + SecretService.delete_secret(self.test_key, user.id) + secrets = SecretModel.query.all() + assert len(secrets) == 0 + + def test_delete_secret_bad_user(self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None): + user = self.find_or_create_user() + self.add_test_secret(user) + with pytest.raises(ApiError) as ae: + SecretService.delete_secret(self.test_key, user.id+1) + assert ae.value.message == f"User: {user.id+1} cannot delete the secret with key : test_key" + # class TestSecretServiceApi(BaseTest):