Remove allowed process stuff from secrets
This commit is contained in:
parent
85808cb6b4
commit
516ee5fcd7
|
@ -1,8 +1,8 @@
|
|||
"""empty message
|
||||
|
||||
Revision ID: 5f7d61fa371c
|
||||
Revision ID: 07ff3fbef405
|
||||
Revises:
|
||||
Create Date: 2022-10-11 14:45:41.213890
|
||||
Create Date: 2022-10-13 07:56:01.234090
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
|
@ -10,7 +10,7 @@ import sqlalchemy as sa
|
|||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = '5f7d61fa371c'
|
||||
revision = '07ff3fbef405'
|
||||
down_revision = None
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
@ -233,14 +233,6 @@ def upgrade():
|
|||
sa.PrimaryKeyConstraint('id'),
|
||||
sa.UniqueConstraint('principal_id', 'permission_target_id', 'permission', name='permission_assignment_uniq')
|
||||
)
|
||||
op.create_table('secret_allowed_process',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('secret_id', sa.Integer(), nullable=False),
|
||||
sa.Column('allowed_relative_path', sa.String(length=500), nullable=False),
|
||||
sa.ForeignKeyConstraint(['secret_id'], ['secret.id'], ),
|
||||
sa.PrimaryKeyConstraint('id'),
|
||||
sa.UniqueConstraint('secret_id', 'allowed_relative_path', name='unique_secret_path')
|
||||
)
|
||||
op.create_table('spiff_logging',
|
||||
sa.Column('id', sa.Integer(), nullable=False),
|
||||
sa.Column('process_instance_id', sa.Integer(), nullable=False),
|
||||
|
@ -313,7 +305,6 @@ def downgrade():
|
|||
op.drop_table('data_store')
|
||||
op.drop_table('task_event')
|
||||
op.drop_table('spiff_logging')
|
||||
op.drop_table('secret_allowed_process')
|
||||
op.drop_table('permission_assignment')
|
||||
op.drop_table('message_instance')
|
||||
op.drop_index(op.f('ix_message_correlation_value'), table_name='message_correlation')
|
|
@ -1195,70 +1195,6 @@ paths:
|
|||
"404":
|
||||
description: Secret does not exist
|
||||
|
||||
/secrets/allowed_process_paths:
|
||||
post:
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.add_allowed_process_path
|
||||
summary: Create an allowed process to a secret
|
||||
tags:
|
||||
- Secrets
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/SecretAllowedProcessPath"
|
||||
responses:
|
||||
"201":
|
||||
description: Allowed process created successfully
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/SecretAllowedProcessPath"
|
||||
/secrets/allowed_process_paths/{allowed_process_path_id}:
|
||||
parameters:
|
||||
- name: allowed_process_path_id
|
||||
in: path
|
||||
required: true
|
||||
description: The id of the allowed process path to delete
|
||||
schema:
|
||||
type: integer
|
||||
get:
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.allowed_process_path_get
|
||||
summary: Get an existing allowed process for a secret, by id
|
||||
tags:
|
||||
- Secrets
|
||||
responses:
|
||||
"200":
|
||||
description: Return a secret allowed process
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/SecretAllowedProcessPath"
|
||||
put:
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.allowed_process_path_update
|
||||
summary: Update an existing allowed process for a secret
|
||||
tags:
|
||||
- Secrets
|
||||
requestBody:
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/SecretAllowedProcessPath"
|
||||
responses:
|
||||
"200":
|
||||
description: Secret allowed process updated successfully
|
||||
content:
|
||||
application/json:
|
||||
schema:
|
||||
$ref: "#/components/schemas/SecretAllowedProcessPath"
|
||||
delete:
|
||||
operationId: spiffworkflow_backend.routes.process_api_blueprint.delete_allowed_process_path
|
||||
summary: Delete an existing allowed process for a secret
|
||||
tags:
|
||||
- Secrets
|
||||
responses:
|
||||
"204":
|
||||
description: The allowed process is deleted.
|
||||
|
||||
components:
|
||||
securitySchemes:
|
||||
jwt:
|
||||
|
@ -2024,12 +1960,6 @@ components:
|
|||
type: number
|
||||
example: 1
|
||||
nullable: false
|
||||
allowed_processes:
|
||||
description: The processes allowed to access this secret
|
||||
type: array
|
||||
items:
|
||||
$ref: "#/components/schemas/SecretAllowedProcessPath"
|
||||
nullable: true
|
||||
ProcessInstanceLog:
|
||||
properties:
|
||||
id:
|
||||
|
@ -2061,24 +1991,3 @@ components:
|
|||
description: The timestamp returned in the log
|
||||
type: number
|
||||
example: 123456789.12345
|
||||
SecretAllowedProcessPath:
|
||||
properties:
|
||||
id:
|
||||
description: The id of the allowed process path
|
||||
type: number
|
||||
example: 1
|
||||
nullable: true
|
||||
secret_key:
|
||||
description: The key of the secret associated with this allowed process path
|
||||
type: string
|
||||
example: 2
|
||||
nullable: true
|
||||
secret_id:
|
||||
description: The id of the secret associated with this allowed process path
|
||||
type: number
|
||||
example: 2
|
||||
nullable: true
|
||||
allowed_relative_path:
|
||||
description: The allowed process path
|
||||
type: string
|
||||
example: /group_one/group_two/model_a
|
||||
|
|
|
@ -45,9 +45,6 @@ from spiffworkflow_backend.models.process_instance import (
|
|||
from spiffworkflow_backend.models.process_instance_report import (
|
||||
ProcessInstanceReportModel,
|
||||
) # noqa: F401
|
||||
from spiffworkflow_backend.models.secret_model import (
|
||||
SecretAllowedProcessPathModel,
|
||||
) # noqa: F401
|
||||
from spiffworkflow_backend.models.secret_model import SecretModel # noqa: F401
|
||||
from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel # noqa: F401
|
||||
from spiffworkflow_backend.models.task_event import TaskEventModel # noqa: F401
|
||||
|
|
|
@ -5,8 +5,6 @@ from flask_bpmn.models.db import db
|
|||
from flask_bpmn.models.db import SpiffworkflowBaseDBModel
|
||||
from marshmallow import Schema
|
||||
from sqlalchemy import ForeignKey
|
||||
from sqlalchemy.orm import relationship
|
||||
from sqlalchemy.orm import RelationshipProperty
|
||||
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
|
||||
|
@ -21,29 +19,6 @@ class SecretModel(SpiffworkflowBaseDBModel):
|
|||
value: str = db.Column(db.String(255), nullable=False)
|
||||
creator_user_id: int = db.Column(ForeignKey(UserModel.id), nullable=False)
|
||||
|
||||
allowed_processes: RelationshipProperty = relationship(
|
||||
"SecretAllowedProcessPathModel", cascade="delete"
|
||||
)
|
||||
|
||||
|
||||
@dataclass()
|
||||
class SecretAllowedProcessPathModel(SpiffworkflowBaseDBModel):
|
||||
"""Allowed processes can be Process Groups or Process Models.
|
||||
|
||||
We store the path in either case.
|
||||
"""
|
||||
|
||||
__tablename__ = "secret_allowed_process"
|
||||
__table_args__ = (
|
||||
db.UniqueConstraint(
|
||||
"secret_id", "allowed_relative_path", name="unique_secret_path"
|
||||
),
|
||||
)
|
||||
|
||||
id: int = db.Column(db.Integer, primary_key=True)
|
||||
secret_id: int = db.Column(ForeignKey(SecretModel.id), nullable=False) # type: ignore
|
||||
allowed_relative_path: str = db.Column(db.String(500), nullable=False)
|
||||
|
||||
|
||||
class SecretModelSchema(Schema):
|
||||
"""SecretModelSchema."""
|
||||
|
@ -52,14 +27,4 @@ class SecretModelSchema(Schema):
|
|||
"""Meta."""
|
||||
|
||||
model = SecretModel
|
||||
fields = ["key", "value", "creator_user_id", "allowed_processes"]
|
||||
|
||||
|
||||
class SecretAllowedProcessSchema(Schema):
|
||||
"""SecretAllowedProcessSchema."""
|
||||
|
||||
class Meta:
|
||||
"""Meta."""
|
||||
|
||||
model = SecretAllowedProcessPathModel
|
||||
fields = ["id", "secret_id", "allowed_relative_path"]
|
||||
fields = ["key", "value", "creator_user_id"]
|
||||
|
|
|
@ -48,7 +48,6 @@ from spiffworkflow_backend.models.process_instance_report import (
|
|||
)
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfoSchema
|
||||
from spiffworkflow_backend.models.secret_model import SecretAllowedProcessSchema
|
||||
from spiffworkflow_backend.models.secret_model import SecretModel
|
||||
from spiffworkflow_backend.models.secret_model import SecretModelSchema
|
||||
from spiffworkflow_backend.models.spiff_logging import SpiffLoggingModel
|
||||
|
@ -1340,57 +1339,6 @@ def delete_secret(key: str) -> Response:
|
|||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||
|
||||
|
||||
def add_allowed_process_path(body: dict) -> Response:
|
||||
"""Add allowed process path."""
|
||||
secret = SecretService().get_secret(body["secret_key"])
|
||||
assert secret # noqa: S101
|
||||
allowed_process_path = SecretService.add_allowed_process(
|
||||
secret.id, g.user.id, body["allowed_relative_path"]
|
||||
)
|
||||
return Response(
|
||||
json.dumps(SecretAllowedProcessSchema().dump(allowed_process_path)),
|
||||
status=201,
|
||||
mimetype="application/json",
|
||||
)
|
||||
|
||||
|
||||
def allowed_process_path_get(allowed_process_path_id: int) -> Response:
|
||||
"""Get allowed process path by id."""
|
||||
allowed_process_path = SecretService.get_secret_allowed_process(
|
||||
allowed_process_path_id
|
||||
)
|
||||
assert allowed_process_path
|
||||
return Response(
|
||||
json.dumps(SecretAllowedProcessSchema().dump(allowed_process_path)),
|
||||
status=200,
|
||||
mimetype="application/json",
|
||||
)
|
||||
|
||||
|
||||
def allowed_process_path_update(
|
||||
allowed_process_path_id: int, body: dict[str, Any]
|
||||
) -> Response:
|
||||
"""Update an existing allowed process path."""
|
||||
allowed_process_path = SecretService.update_allowed_process_path(
|
||||
allowed_process_path_id,
|
||||
body["secret_id"],
|
||||
body["allowed_relative_path"],
|
||||
g.user.id,
|
||||
)
|
||||
assert allowed_process_path
|
||||
return Response(
|
||||
json.dumps(SecretAllowedProcessSchema().dump(allowed_process_path)),
|
||||
status=200,
|
||||
mimetype="application/json",
|
||||
)
|
||||
|
||||
|
||||
def delete_allowed_process_path(allowed_process_path_id: int) -> Response:
|
||||
"""Get allowed process paths."""
|
||||
SecretService().delete_allowed_process(allowed_process_path_id, g.user.id)
|
||||
return Response(json.dumps({"ok": True}), status=200, mimetype="application/json")
|
||||
|
||||
|
||||
def _get_required_parameter_or_raise(parameter: str, post_body: dict[str, Any]) -> Any:
|
||||
"""Get_required_parameter_or_raise."""
|
||||
return_value = None
|
||||
|
|
|
@ -3,9 +3,7 @@ from typing import Optional
|
|||
|
||||
from flask_bpmn.api.api_error import ApiError
|
||||
from flask_bpmn.models.db import db
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
||||
from spiffworkflow_backend.models.secret_model import SecretAllowedProcessPathModel
|
||||
from spiffworkflow_backend.models.secret_model import SecretModel
|
||||
|
||||
# from cryptography.fernet import Fernet
|
||||
|
@ -125,133 +123,3 @@ class SecretService:
|
|||
message=f"Cannot delete secret with key: {key}. Resource does not exist.",
|
||||
status_code=404,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def add_allowed_process(
|
||||
secret_id: int, user_id: str, allowed_relative_path: str
|
||||
) -> SecretAllowedProcessPathModel:
|
||||
"""Add_allowed_process."""
|
||||
secret_model = SecretModel.query.filter(SecretModel.id == secret_id).first()
|
||||
if secret_model:
|
||||
if secret_model.creator_user_id == user_id:
|
||||
secret_process_model = SecretAllowedProcessPathModel(
|
||||
secret_id=secret_model.id,
|
||||
allowed_relative_path=allowed_relative_path,
|
||||
)
|
||||
assert secret_process_model # noqa: S101
|
||||
db.session.add(secret_process_model)
|
||||
try:
|
||||
db.session.commit()
|
||||
except IntegrityError as ie:
|
||||
db.session.rollback()
|
||||
raise ApiError(
|
||||
error_code="add_allowed_process_error",
|
||||
message=f"Error adding allowed_process with secret {secret_model.id}, "
|
||||
f"and path: {allowed_relative_path}. Resource already exists. "
|
||||
f"Original error is {ie}",
|
||||
status_code=409,
|
||||
) from ie
|
||||
except Exception as e:
|
||||
# TODO: should we call db.session.rollback() here?
|
||||
# db.session.rollback()
|
||||
raise ApiError(
|
||||
error_code="add_allowed_process_error",
|
||||
message=f"Could not create an allowed process for secret with key: {secret_model.key} "
|
||||
f"with path: {allowed_relative_path}. "
|
||||
f"Original error is {e}",
|
||||
) from e
|
||||
return secret_process_model
|
||||
else:
|
||||
raise ApiError(
|
||||
error_code="add_allowed_process_error",
|
||||
message=f"User: {user_id} cannot modify the secret with key : {secret_model.key}",
|
||||
status_code=401,
|
||||
)
|
||||
else:
|
||||
raise ApiError(
|
||||
error_code="add_allowed_process_error",
|
||||
message=f"Cannot add allowed process to secret with key: {secret_id}. Resource does not exist.",
|
||||
status_code=404,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def get_secret_allowed_process(id: str) -> SecretAllowedProcessPathModel:
|
||||
"""Get_secret_allowed_process."""
|
||||
secret_allowed_process = SecretAllowedProcessPathModel.query.filter(
|
||||
SecretAllowedProcessPathModel.id == id
|
||||
).first()
|
||||
assert secret_allowed_process
|
||||
return secret_allowed_process
|
||||
|
||||
@staticmethod
|
||||
def update_allowed_process_path(
|
||||
allowed_process_id: int,
|
||||
secret_id: int,
|
||||
allowed_relative_path: str,
|
||||
user_id: int,
|
||||
) -> SecretAllowedProcessPathModel:
|
||||
"""Update_allowed_process_path."""
|
||||
secret = SecretModel.query.filter(SecretModel.id == secret_id).first()
|
||||
if secret.creator_user_id == user_id:
|
||||
allowed_process: SecretAllowedProcessPathModel = (
|
||||
SecretAllowedProcessPathModel.query.filter(
|
||||
SecretAllowedProcessPathModel.id == allowed_process_id
|
||||
).first()
|
||||
)
|
||||
if allowed_process:
|
||||
allowed_process.allowed_relative_path = allowed_relative_path
|
||||
db.session.add(allowed_process)
|
||||
try:
|
||||
db.session.commit()
|
||||
except Exception as e:
|
||||
db.session.rollback()
|
||||
message = (
|
||||
f"Could not find an allowed process with id {allowed_process_id}"
|
||||
f"Original error is {e}"
|
||||
)
|
||||
raise ApiError(code="update_allowed_process_error", message=message)
|
||||
return allowed_process
|
||||
else:
|
||||
message = (
|
||||
f"Could not find an allowed process with id: {allowed_process_id}"
|
||||
)
|
||||
raise ApiError(code="update_allowed_process_error", message=message)
|
||||
else:
|
||||
raise ApiError(
|
||||
code="update_allowed_process_error",
|
||||
message=f"User: {user_id} cannot modify the allowed processes for secret: {secret.key}",
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def delete_allowed_process(allowed_process_id: int, user_id: int) -> None:
|
||||
"""Delete_allowed_process."""
|
||||
allowed_process = SecretAllowedProcessPathModel.query.filter(
|
||||
SecretAllowedProcessPathModel.id == allowed_process_id
|
||||
).first()
|
||||
if allowed_process:
|
||||
secret = SecretModel.query.filter(
|
||||
SecretModel.id == allowed_process.secret_id
|
||||
).first()
|
||||
assert secret # noqa: S101
|
||||
if secret.creator_user_id == user_id:
|
||||
db.session.delete(allowed_process)
|
||||
try:
|
||||
db.session.commit()
|
||||
except Exception as e:
|
||||
raise ApiError(
|
||||
error_code="delete_allowed_process_error",
|
||||
message=f"There was an exception deleting allowed_process: {allowed_process_id}. "
|
||||
f"Original error is: {e}",
|
||||
) from e
|
||||
else:
|
||||
raise ApiError(
|
||||
error_code="delete_allowed_process_error",
|
||||
message=f"User: {user_id} cannot delete the allowed_process with id : {allowed_process_id}",
|
||||
status_code=401,
|
||||
)
|
||||
else:
|
||||
raise ApiError(
|
||||
error_code="delete_allowed_process_error",
|
||||
message=f"Cannot delete allowed_process: {allowed_process_id}. Resource does not exist.",
|
||||
status_code=404,
|
||||
)
|
||||
|
|
|
@ -10,11 +10,9 @@ from tests.spiffworkflow_backend.helpers.base_test import BaseTest
|
|||
from werkzeug.test import TestResponse
|
||||
|
||||
from spiffworkflow_backend.models.process_model import ProcessModelInfo
|
||||
from spiffworkflow_backend.models.secret_model import SecretAllowedProcessPathModel
|
||||
from spiffworkflow_backend.models.secret_model import SecretModel
|
||||
from spiffworkflow_backend.models.secret_model import SecretModelSchema
|
||||
from spiffworkflow_backend.models.user import UserModel
|
||||
from spiffworkflow_backend.services.file_system_service import FileSystemService
|
||||
from spiffworkflow_backend.services.process_model_service import ProcessModelService
|
||||
from spiffworkflow_backend.services.secret_service import SecretService
|
||||
|
||||
|
@ -56,23 +54,6 @@ class SecretServiceTestHelpers(BaseTest):
|
|||
)
|
||||
return process_model_info
|
||||
|
||||
def add_test_secret_allowed_process(
|
||||
self, client: FlaskClient, user: UserModel
|
||||
) -> SecretAllowedProcessPathModel:
|
||||
"""Add_test_secret_allowed_process."""
|
||||
process_model_info = self.add_test_process(client, user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
|
||||
test_secret = self.add_test_secret(user)
|
||||
allowed_process_model = SecretService().add_allowed_process(
|
||||
secret_id=test_secret.id,
|
||||
user_id=user.id,
|
||||
allowed_relative_path=process_model_relative_path,
|
||||
)
|
||||
return allowed_process_model
|
||||
|
||||
|
||||
class TestSecretService(SecretServiceTestHelpers):
|
||||
"""TestSecretService."""
|
||||
|
@ -191,165 +172,6 @@ class TestSecretService(SecretServiceTestHelpers):
|
|||
SecretService.delete_secret(self.test_key + "x", user.id)
|
||||
assert "Resource does not exist" in ae.value.message
|
||||
|
||||
def test_secret_add_allowed_process(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_secret_add_allowed_process."""
|
||||
user = self.find_or_create_user()
|
||||
test_secret = self.add_test_secret(user)
|
||||
process_model_info = self.add_test_process(client, user)
|
||||
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
allowed_process_model = SecretService().add_allowed_process(
|
||||
secret_id=test_secret.id,
|
||||
user_id=user.id,
|
||||
allowed_relative_path=process_model_relative_path,
|
||||
)
|
||||
|
||||
assert allowed_process_model is not None
|
||||
assert isinstance(allowed_process_model, SecretAllowedProcessPathModel)
|
||||
assert allowed_process_model.secret_id == test_secret.id
|
||||
assert (
|
||||
allowed_process_model.allowed_relative_path == process_model_relative_path
|
||||
)
|
||||
|
||||
assert len(test_secret.allowed_processes) == 1
|
||||
assert test_secret.allowed_processes[0] == allowed_process_model
|
||||
|
||||
def test_secret_add_allowed_process_same_process_fails(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Do not allow duplicate entries for secret_id/allowed_relative_path pairs.
|
||||
|
||||
We actually take care of this in the db model with a unique constraint
|
||||
on the 2 columns.
|
||||
"""
|
||||
user = self.find_or_create_user()
|
||||
test_secret = self.add_test_secret(user)
|
||||
process_model_info = self.add_test_process(client, user)
|
||||
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
SecretService().add_allowed_process(
|
||||
secret_id=test_secret.id,
|
||||
user_id=user.id,
|
||||
allowed_relative_path=process_model_relative_path,
|
||||
)
|
||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||
assert len(allowed_processes) == 1
|
||||
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService().add_allowed_process(
|
||||
secret_id=test_secret.id,
|
||||
user_id=user.id,
|
||||
allowed_relative_path=process_model_relative_path,
|
||||
)
|
||||
assert "Resource already exists" in ae.value.message
|
||||
|
||||
def test_secret_add_allowed_process_bad_user_fails(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_secret_add_allowed_process_bad_user."""
|
||||
user = self.find_or_create_user()
|
||||
process_model_info = self.add_test_process(client, user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
test_secret = self.add_test_secret(user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService().add_allowed_process(
|
||||
secret_id=test_secret.id,
|
||||
user_id=user.id + 1,
|
||||
allowed_relative_path=process_model_relative_path,
|
||||
)
|
||||
assert (
|
||||
ae.value.message
|
||||
== f"User: {user.id+1} cannot modify the secret with key : {self.test_key}"
|
||||
)
|
||||
|
||||
def test_secret_add_allowed_process_bad_secret_fails(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_secret_add_allowed_process_bad_secret_fails."""
|
||||
user = self.find_or_create_user()
|
||||
process_model_info = self.add_test_process(client, user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
test_secret = self.add_test_secret(user)
|
||||
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService().add_allowed_process(
|
||||
secret_id=test_secret.id + 1,
|
||||
user_id=user.id,
|
||||
allowed_relative_path=process_model_relative_path,
|
||||
)
|
||||
assert "Resource does not exist" in ae.value.message
|
||||
|
||||
def test_get_secret_allowed_process(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_get_secret_allowed_process."""
|
||||
user = self.find_or_create_user()
|
||||
test_secret = self.add_test_secret(user)
|
||||
process_model_info = self.add_test_process(client, user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
created_allowed_process = SecretService.add_allowed_process(
|
||||
test_secret.id, user.id, process_model_relative_path
|
||||
)
|
||||
get_allowed_process = SecretService.get_secret_allowed_process(
|
||||
created_allowed_process.id
|
||||
)
|
||||
assert created_allowed_process == get_allowed_process
|
||||
|
||||
def test_secret_delete_allowed_process(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_secret_delete_allowed_process."""
|
||||
user = self.find_or_create_user()
|
||||
allowed_process_model = self.add_test_secret_allowed_process(client, user)
|
||||
|
||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||
assert len(allowed_processes) == 1
|
||||
|
||||
SecretService().delete_allowed_process(allowed_process_model.id, user.id)
|
||||
|
||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||
assert len(allowed_processes) == 0
|
||||
|
||||
def test_secret_delete_allowed_process_bad_user_fails(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_secret_delete_allowed_process_bad_user_fails."""
|
||||
user = self.find_or_create_user()
|
||||
allowed_process_model = self.add_test_secret_allowed_process(client, user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService().delete_allowed_process(
|
||||
allowed_process_model.id, user.id + 1
|
||||
)
|
||||
message = ae.value.message
|
||||
assert (
|
||||
f"User: {user.id+1} cannot delete the allowed_process with id : {allowed_process_model.id}"
|
||||
in message
|
||||
)
|
||||
|
||||
def test_secret_delete_allowed_process_bad_allowed_process_fails(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_secret_delete_allowed_process_bad_allowed_process_fails."""
|
||||
user = self.find_or_create_user()
|
||||
allowed_process_model = self.add_test_secret_allowed_process(client, user)
|
||||
with pytest.raises(ApiError) as ae:
|
||||
SecretService().delete_allowed_process(
|
||||
allowed_process_model.id + 1, user.id
|
||||
)
|
||||
assert "Resource does not exist" in ae.value.message
|
||||
|
||||
|
||||
class TestSecretServiceApi(SecretServiceTestHelpers):
|
||||
"""TestSecretServiceApi."""
|
||||
|
@ -459,83 +281,3 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
|
|||
headers=self.logged_in_headers(user),
|
||||
)
|
||||
assert secret_response.status_code == 404
|
||||
|
||||
def test_add_secret_allowed_process(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test add secret allowed process."""
|
||||
user = self.find_or_create_user()
|
||||
test_secret = self.add_test_secret(user)
|
||||
process_model_info = self.add_test_process(client, user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
data = {
|
||||
"secret_key": test_secret.key,
|
||||
"allowed_relative_path": process_model_relative_path,
|
||||
}
|
||||
response: TestResponse = client.post(
|
||||
"/v1.0/secrets/allowed_process_paths",
|
||||
headers=self.logged_in_headers(user),
|
||||
content_type="application/json",
|
||||
data=json.dumps(data),
|
||||
)
|
||||
assert response.status_code == 201
|
||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||
assert len(allowed_processes) == 1
|
||||
assert allowed_processes[0].allowed_relative_path == process_model_relative_path
|
||||
assert allowed_processes[0].secret_id == test_secret.id
|
||||
|
||||
def test_get_secret_allowed_process(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test_get_secret_allowed_process."""
|
||||
user = self.find_or_create_user()
|
||||
test_secret = self.add_test_secret(user)
|
||||
process_model_info = self.add_test_process(client, user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
created_allowed_process = SecretService.add_allowed_process(
|
||||
test_secret.id, user.id, process_model_relative_path
|
||||
)
|
||||
# allowed_process = SecretService.get_secret_allowed_process(created_allowed_process.id)
|
||||
response = client.get(
|
||||
f"/v1.0/secrets/allowed_process_paths/{created_allowed_process.id}",
|
||||
headers=self.logged_in_headers(user),
|
||||
)
|
||||
assert response.status_code == 200
|
||||
response_allowed_process = response.json
|
||||
assert response_allowed_process["id"] == created_allowed_process.id
|
||||
assert (
|
||||
response_allowed_process["secret_id"] == created_allowed_process.secret_id
|
||||
)
|
||||
assert (
|
||||
response_allowed_process["allowed_relative_path"]
|
||||
== created_allowed_process.allowed_relative_path
|
||||
)
|
||||
|
||||
def test_delete_secret_allowed_process(
|
||||
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||
) -> None:
|
||||
"""Test delete secret allowed process."""
|
||||
user = self.find_or_create_user()
|
||||
test_secret = self.add_test_secret(user)
|
||||
process_model_info = self.add_test_process(client, user)
|
||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||
process_model_info
|
||||
)
|
||||
allowed_process = SecretService.add_allowed_process(
|
||||
test_secret.id, user.id, process_model_relative_path
|
||||
)
|
||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||
assert len(allowed_processes) == 1
|
||||
assert allowed_processes[0].secret_id == test_secret.id
|
||||
assert allowed_processes[0].allowed_relative_path == process_model_relative_path
|
||||
response = client.delete(
|
||||
f"/v1.0/secrets/allowed_process_paths/{allowed_process.id}",
|
||||
headers=self.logged_in_headers(user),
|
||||
)
|
||||
assert response.status_code == 200
|
||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||
assert len(allowed_processes) == 0
|
||||
|
|
Loading…
Reference in New Issue