Secrets now uses simple key/value instead of service and client
Finished update and delete Add checks for correct user
This commit is contained in:
parent
5acf06046d
commit
faffa9c37b
|
@ -1,8 +1,8 @@
|
|||
"""empty message
|
||||
|
||||
Revision ID: 33c37028fb51
|
||||
Revision ID: d4922c5cd32d
|
||||
Revises:
|
||||
Create Date: 2022-09-14 08:59:45.896805
|
||||
Create Date: 2022-09-15 11:08:25.133655
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
|
@ -10,7 +10,7 @@ import sqlalchemy as sa
|
|||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '33c37028fb51'
|
||||
revision = 'd4922c5cd32d'
|
||||
down_revision = None
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
@ -137,11 +137,12 @@ def upgrade():
|
|||
op.create_index(op.f('ix_process_instance_report_process_model_identifier'), 'process_instance_report', ['process_model_identifier'], unique=False)
|
||||
op.create_table('secret',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('key', sa.String(length=50), nullable=True),
|
||||
sa.Column('value', sa.String(length=255), nullable=True),
|
||||
sa.Column('key', sa.String(length=50), nullable=False),
|
||||
sa.Column('value', sa.String(length=255), nullable=False),
|
||||
sa.Column('creator_user_id', sa.Integer(), nullable=False),
|
||||
sa.ForeignKeyConstraint(['creator_user_id'], ['user.id'], ),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
sa.PrimaryKeyConstraint('id'),
|
||||
sa.UniqueConstraint('key')
|
||||
)
|
||||
op.create_table('user_group_assignment',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
|
@ -229,7 +230,6 @@ def upgrade():
|
|||
sa.ForeignKeyConstraint(['secret_id'], ['secret.id'], ),
|
||||
sa.PrimaryKeyConstraint('id')
|
||||
)
|
||||
op.create_index(op.f('ix_secret_allowed_process_allowed_relative_path'), 'secret_allowed_process', ['allowed_relative_path'], unique=False)
|
||||
op.create_table('task_event',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('user_id', sa.Integer(), nullable=False),
|
||||
|
@ -286,7 +286,6 @@ def downgrade():
|
|||
op.drop_table('message_correlation_message_instance')
|
||||
op.drop_table('data_store')
|
||||
op.drop_table('task_event')
|
||||
op.drop_index(op.f('ix_secret_allowed_process_allowed_relative_path'), table_name='secret_allowed_process')
|
||||
op.drop_table('secret_allowed_process')
|
||||
op.drop_table('message_instance')
|
||||
op.drop_index(op.f('ix_message_correlation_value'), table_name='message_correlation')
|
|
@ -13,8 +13,8 @@ class SecretModel(SpiffworkflowBaseDBModel):
|
|||
|
||||
__tablename__ = "secret"
|
||||
id: int = db.Column(db.Integer, primary_key=True)
|
||||
key: str = db.Column(db.String(50))
|
||||
value: str = db.Column(db.String(255))
|
||||
key: str = db.Column(db.String(50), unique=True, nullable=False)
|
||||
value: str = db.Column(db.String(255), nullable=False)
|
||||
creator_user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False)
|
||||
|
||||
allowed_processes = relationship("SecretAllowedProcessPathModel", cascade="delete")
|
||||
|
@ -39,4 +39,4 @@ class SecretAllowedProcessPathModel(SpiffworkflowBaseDBModel):
|
|||
__tablename__ = "secret_allowed_process"
|
||||
id: int = db.Column(db.Integer, primary_key=True)
|
||||
secret_id: int = db.Column(ForeignKey(SecretModel.id), nullable=False) # type: ignore
|
||||
allowed_relative_path: str = db.Column(db.String(500), nullable=False, index=True)
|
||||
allowed_relative_path: str = db.Column(db.String(500), nullable=False)
|
||||
|
|
|
@ -15,7 +15,7 @@ class SecretService:
|
|||
def add_secret(
|
||||
key: str,
|
||||
value: str,
|
||||
creator_user_id: Optional[int] = None,
|
||||
creator_user_id: int,
|
||||
) -> SecretModel:
|
||||
"""Add_secret."""
|
||||
secret_model = SecretModel(
|
||||
|
@ -43,13 +43,13 @@ class SecretService:
|
|||
|
||||
@staticmethod
|
||||
def add_allowed_process(
|
||||
secret_id: int, user_id: str, allowed_relative_path: str
|
||||
key: str, user_id: str, allowed_relative_path: str
|
||||
) -> SecretAllowedProcessPathModel:
|
||||
"""Add_allowed_process."""
|
||||
creator = SecretModel.query.filter(SecretModel.id == secret_id).first()
|
||||
if creator == user_id:
|
||||
secret_model = SecretModel.query.filter(SecretModel.key == key).first()
|
||||
if secret_model.creator_user_id == user_id:
|
||||
secret_process_model = SecretAllowedProcessPathModel(
|
||||
secret_id=secret_id, allowed_relative_path=allowed_relative_path
|
||||
secret_id=secret_model.id, allowed_relative_path=allowed_relative_path
|
||||
)
|
||||
assert secret_process_model # noqa: S101
|
||||
db.session.add(secret_process_model)
|
||||
|
@ -58,7 +58,7 @@ class SecretService:
|
|||
except Exception as e:
|
||||
raise ApiError(
|
||||
code="create_allowed_process_failure",
|
||||
message=f"Count not create an allowed process for for secret: {secret_id} "
|
||||
message=f"Could not create an allowed process for secret with key: {key} "
|
||||
f"with path: {allowed_relative_path}. "
|
||||
f"Original error is {e}",
|
||||
) from e
|
||||
|
@ -66,27 +66,47 @@ class SecretService:
|
|||
else:
|
||||
raise ApiError(
|
||||
code="create_allowed_process_path_error",
|
||||
message=f"User: {user_id} cannot modify the secret with id : {secret_id}",
|
||||
message=f"User: {user_id} cannot modify the secret with key : {key}",
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def update_secret(
|
||||
self,
|
||||
key: str,
|
||||
value: str,
|
||||
creator_user_id: Optional[int] = None,
|
||||
) -> None:
|
||||
"""Does this pass pre commit?"""
|
||||
...
|
||||
secret_model = SecretModel.query.filter(SecretModel.key == key).first()
|
||||
if secret_model.creator_user_id == creator_user_id:
|
||||
secret_model.value = value
|
||||
db.session.add(secret_model)
|
||||
try:
|
||||
db.session.commit()
|
||||
except Exception as e:
|
||||
raise ApiError(code='update_secret_error',
|
||||
message=f"There was an error updating the secret with key: {key}, and value: {value}")
|
||||
else:
|
||||
raise ApiError(
|
||||
code="update_secret_error",
|
||||
message=f"User: {creator_user_id} cannot update the secret with key : {key}",
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def delete_secret(key: str) -> None:
|
||||
def delete_secret(key: str, user_id: int) -> None:
|
||||
"""Delete secret."""
|
||||
secret = SecretModel.query.filter(SecretModel.key == key).first()
|
||||
db.session.delete(secret)
|
||||
try:
|
||||
db.session.commit()
|
||||
except Exception as e:
|
||||
secret_model = SecretModel.query.filter(SecretModel.key == key).first()
|
||||
if secret_model.creator_user_id == user_id:
|
||||
db.session.delete(secret_model)
|
||||
try:
|
||||
db.session.commit()
|
||||
except Exception as e:
|
||||
raise ApiError(
|
||||
code="delete_secret_error",
|
||||
message=f"Could not delete secret with key: {key}. Original error is: {e}",
|
||||
) from e
|
||||
else:
|
||||
raise ApiError(
|
||||
code="delete_secret_error",
|
||||
message=f"Could not delete secret with key: {key}. Original error is: {e}",
|
||||
) from e
|
||||
message=f"User: {user_id} cannot delete the secret with key : {key}",
|
||||
)
|
||||
|
||||
|
|
|
@ -1,9 +1,15 @@
|
|||
"""Test_secret_service."""
|
||||
import pytest
|
||||
|
||||
from flask.app import Flask
|
||||
from flask.testing import FlaskClient
|
||||
from flask_bpmn.api.api_error import ApiError
|
||||
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
|
||||
from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
||||
|
||||
from spiffworkflow_backend.models.secret_model import SecretModel
|
||||
from spiffworkflow_backend.models.secret_model import SecretModel, SecretAllowedProcessPathModel
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.services.secret_service import SecretService
|
||||
|
||||
|
@ -13,11 +19,32 @@ class TestSecretService(BaseTest):
|
|||
|
||||
test_key = "test_key"
|
||||
test_value = "test_value"
|
||||
test_process_group_id = "test"
|
||||
test_process_group_display_name = "My Test Process Group"
|
||||
test_process_model_id = "make_cookies"
|
||||
test_process_model_display_name = "Cooooookies"
|
||||
test_process_model_description = "Om nom nom delicious cookies"
|
||||
|
||||
def add_test_secret(self, user: UserModel) -> SecretModel:
|
||||
"""Add_test_secret."""
|
||||
return SecretService().add_secret(self.test_key, self.test_value, user.id)
|
||||
|
||||
def add_test_process(self, client, user):
|
||||
self.create_process_group(
|
||||
client, user, self.test_process_group_id, display_name=self.test_process_group_display_name
|
||||
)
|
||||
self.create_process_model_with_api(
|
||||
client,
|
||||
process_group_id=self.test_process_group_id,
|
||||
process_model_id=self.test_process_model_id,
|
||||
process_model_display_name=self.test_process_model_display_name,
|
||||
process_model_description=self.test_process_model_description,
|
||||
)
|
||||
process_model_info = ProcessModelService().get_process_model(
|
||||
self.test_process_model_id, self.test_process_group_id
|
||||
)
|
||||
return process_model_info
|
||||
|
||||
def test_add_secret(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_add_secret."""
|
||||
user = self.find_or_create_user()
|
||||
|
@ -28,6 +55,13 @@ class TestSecretService(BaseTest):
|
|||
assert test_secret.value == self.test_value
|
||||
assert test_secret.creator_user_id == user.id
|
||||
|
||||
def test_add_secret_duplicate_key_fails(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
self.add_test_secret(user)
|
||||
assert "Duplicate entry" in ae.value.message
|
||||
|
||||
def test_get_secret(self, app: Flask, with_db_and_bpmn_file_cleanup: None) -> None:
|
||||
"""Test_get_secret."""
|
||||
user = self.find_or_create_user()
|
||||
|
@ -47,53 +81,61 @@ class TestSecretService(BaseTest):
|
|||
bad_secret = SecretService().get_secret("bad_key")
|
||||
assert bad_secret is None
|
||||
|
||||
# def test_secret_add_allowed_process(
|
||||
# self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
# ) -> None:
|
||||
# """Test_secret_add_allowed_process."""
|
||||
# process_group_id = "test"
|
||||
# process_group_display_name = "My Test Process Group"
|
||||
#
|
||||
# user = self.find_or_create_user()
|
||||
# self.create_process_group(
|
||||
# client, user, process_group_id, display_name=process_group_display_name
|
||||
# )
|
||||
#
|
||||
# process_model_id = "make_cookies"
|
||||
# process_model_display_name = "Cooooookies"
|
||||
# process_model_description = "Om nom nom delicious cookies"
|
||||
# self.create_process_model(
|
||||
# client,
|
||||
# process_group_id=process_group_id,
|
||||
# process_model_id=process_model_id,
|
||||
# process_model_display_name=process_model_display_name,
|
||||
# process_model_description=process_model_description,
|
||||
# )
|
||||
#
|
||||
# process_model_info = ProcessModelService().get_process_model(
|
||||
# process_model_id, process_group_id
|
||||
# )
|
||||
# process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
# process_model_info
|
||||
# )
|
||||
#
|
||||
# test_secret = self.add_test_secret(user)
|
||||
# allowed_process_model = SecretService().add_allowed_process(
|
||||
# secret_id=test_secret.id, allowed_relative_path=process_model_relative_path
|
||||
# )
|
||||
# assert allowed_process_model is not None
|
||||
# assert isinstance(allowed_process_model, SecretAllowedProcessPathModel)
|
||||
# assert allowed_process_model.secret_id == test_secret.id
|
||||
# assert (
|
||||
# allowed_process_model.allowed_relative_path == process_model_relative_path
|
||||
# )
|
||||
#
|
||||
# assert len(test_secret.allowed_processes) == 1
|
||||
# assert test_secret.allowed_processes[0] == allowed_process_model
|
||||
def test_secret_add_allowed_process(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_secret_add_allowed_process."""
|
||||
user = self.find_or_create_user()
|
||||
process_model_info = self.add_test_process(client, user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
|
||||
def test_update_secret(self) -> None:
|
||||
test_secret = self.add_test_secret(user)
|
||||
allowed_process_model = SecretService().add_allowed_process(
|
||||
key=test_secret.key, user_id=user.id, allowed_relative_path=process_model_relative_path
|
||||
)
|
||||
assert allowed_process_model is not None
|
||||
assert isinstance(allowed_process_model, SecretAllowedProcessPathModel)
|
||||
assert allowed_process_model.secret_id == test_secret.id
|
||||
assert (
|
||||
allowed_process_model.allowed_relative_path == process_model_relative_path
|
||||
)
|
||||
|
||||
assert len(test_secret.allowed_processes) == 1
|
||||
assert test_secret.allowed_processes[0] == allowed_process_model
|
||||
|
||||
def test_secret_add_allowed_process_bad_user(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
user = self.find_or_create_user()
|
||||
process_model_info = self.add_test_process(client, user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
test_secret = self.add_test_secret(user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService().add_allowed_process(
|
||||
key=test_secret.key, user_id=user.id+1, allowed_relative_path=process_model_relative_path
|
||||
)
|
||||
assert ae.value.message == f"User: {user.id+1} cannot modify the secret with key : test_key"
|
||||
|
||||
def test_update_secret(self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None) -> None:
|
||||
"""Test update secret."""
|
||||
...
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
secret = SecretService.get_secret(self.test_key)
|
||||
assert secret == self.test_value
|
||||
SecretService.update_secret(self.test_key, 'new_secret_value', user.id)
|
||||
new_secret = SecretService.get_secret(self.test_key)
|
||||
assert new_secret == 'new_secret_value'
|
||||
|
||||
def test_update_secret_bad_user(self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None) -> None:
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService.update_secret(self.test_key, 'new_secret_value', user.id+1)
|
||||
assert ae.value.message == f"User: {user.id+1} cannot update the secret with key : test_key"
|
||||
|
||||
def test_delete_secret(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
|
@ -101,7 +143,20 @@ class TestSecretService(BaseTest):
|
|||
"""Test delete secret."""
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
SecretService.delete_secret(self.test_key)
|
||||
secrets = SecretModel.query.all()
|
||||
assert len(secrets) == 1
|
||||
assert secrets[0].creator_user_id == user.id
|
||||
SecretService.delete_secret(self.test_key, user.id)
|
||||
secrets = SecretModel.query.all()
|
||||
assert len(secrets) == 0
|
||||
|
||||
def test_delete_secret_bad_user(self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None):
|
||||
user = self.find_or_create_user()
|
||||
self.add_test_secret(user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService.delete_secret(self.test_key, user.id+1)
|
||||
assert ae.value.message == f"User: {user.id+1} cannot delete the secret with key : test_key"
|
||||
|
||||
|
||||
|
||||
# class TestSecretServiceApi(BaseTest):
|
||||
|
|
Loading…
Reference in New Issue