add get and update for allowed process paths

This commit is contained in:
mike cullerton 2022-10-05 18:06:33 -04:00
parent 35dd02d1ce
commit 453c4b1e24
5 changed files with 131 additions and 2 deletions

View File

@ -1226,6 +1226,35 @@ paths:
description: The id of the allowed process path to delete
schema:
type: integer
get:
operationId: spiffworkflow_backend.routes.process_api_blueprint.allowed_process_path_get
summary: Get an existing allowed process for a secret, by id
tags:
- Secrets
responses:
"200":
description: Return a secret allowed process
content:
application/json:
schema:
$ref: "#/components/schemas/SecretAllowedProcessPath"
put:
operationId: spiffworkflow_backend.routes.process_api_blueprint.allowed_process_path_update
summary: Update an existing allowed process for a secret
tags:
- Secrets
requestBody:
content:
application/json:
schema:
$ref: "#/components/schemas/SecretAllowedProcessPath"
responses:
"200":
description: Secret allowed process updated successfully
content:
application/json:
schema:
$ref: "#/components/schemas/SecretAllowedProcessPath"
delete:
operationId: spiffworkflow_backend.routes.process_api_blueprint.delete_allowed_process_path
summary: Delete an existing allowed process for a secret

View File

@ -62,4 +62,5 @@ class SecretAllowedProcessSchema(Schema):
"""Meta."""
model = SecretAllowedProcessPathModel
fields = ["secret_id", "allowed_relative_path"]
fields = ["id", "secret_id", "allowed_relative_path"]

View File

@ -1299,7 +1299,7 @@ def delete_secret(key: str) -> Response:
def add_allowed_process_path(body: dict) -> Response:
"""Get allowed process paths."""
"""Add allowed process path."""
secret = SecretService().get_secret(body["secret_key"])
assert secret # noqa: S101
allowed_process_path = SecretService.add_allowed_process(
@ -1312,6 +1312,33 @@ def add_allowed_process_path(body: dict) -> Response:
)
def allowed_process_path_get(allowed_process_path_id: int) -> Response:
"""Get allowed process path by id"""
allowed_process_path = SecretService.get_secret_allowed_process(allowed_process_path_id)
assert allowed_process_path
return Response(
json.dumps(SecretAllowedProcessSchema().dump(allowed_process_path)),
status=200,
mimetype="application/json"
)
def allowed_process_path_update(allowed_process_path_id: int, body: dict[str, Any]) -> Response:
"""Update an existing allowed process path"""
allowed_process_path = SecretService.update_allowed_process_path(
allowed_process_path_id,
body['secret_id'],
body['allowed_relative_path'],
g.user.id
)
assert allowed_process_path
return Response(
json.dumps(SecretAllowedProcessSchema().dump(allowed_process_path)),
status=200,
mimetype="application/json"
)
def delete_allowed_process_path(allowed_process_path_id: int) -> Response:
"""Get allowed process paths."""
SecretService().delete_allowed_process(allowed_process_path_id, g.user.id)

View File

@ -174,6 +174,44 @@ class SecretService:
status_code=404,
)
@staticmethod
def get_secret_allowed_process(id: str) -> SecretAllowedProcessPathModel:
secret_allowed_process = SecretAllowedProcessPathModel.\
query.\
filter(SecretAllowedProcessPathModel.id == id).\
first()
assert secret_allowed_process
return secret_allowed_process
@staticmethod
def update_allowed_process_path(
allowed_process_id: int, secret_id: int, allowed_relative_path: str, user_id: int
) -> SecretAllowedProcessPathModel:
secret = SecretModel.query.filter(SecretModel.id == secret_id).first()
if secret.creator_user_id == user_id:
allowed_process: SecretAllowedProcessPathModel = SecretAllowedProcessPathModel.query.filter(
SecretAllowedProcessPathModel.id == allowed_process_id
).first()
if allowed_process:
allowed_process.allowed_relative_path = allowed_relative_path
db.session.add(allowed_process)
try:
db.session.commit()
except Exception as e:
db.session.rollback()
message = f"Could not find an allowed process with id {allowed_process_id}"\
f"Original error is {e}"
raise ApiError(code='update_allowed_process_error',
message=message)
return allowed_process
else:
message = f"Could not find an allowed process with id: {allowed_process_id}"
raise ApiError(code='update_allowed_process_error',
message=message)
else:
raise ApiError(code='update_allowed_process_error',
message=f"User: {user_id} cannot modify the allowed processes for secret: {secret.key}")
@staticmethod
def delete_allowed_process(allowed_process_id: int, user_id: int) -> None:
"""Delete_allowed_process."""

View File

@ -289,6 +289,19 @@ class TestSecretService(SecretServiceTestHelpers):
)
assert "Resource does not exist" in ae.value.message
def test_get_secret_allowed_process(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
) -> None:
user = self.find_or_create_user()
test_secret = self.add_test_secret(user)
process_model_info = self.add_test_process(client, user)
process_model_relative_path = FileSystemService.process_model_relative_path(
process_model_info
)
created_allowed_process = SecretService.add_allowed_process(test_secret.id, user.id, process_model_relative_path)
get_allowed_process = SecretService.get_secret_allowed_process(created_allowed_process.id)
assert created_allowed_process == get_allowed_process
def test_secret_delete_allowed_process(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
) -> None:
@ -468,6 +481,27 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
assert allowed_processes[0].allowed_relative_path == process_model_relative_path
assert allowed_processes[0].secret_id == test_secret.id
def test_get_secret_allowed_process(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
) -> None:
user = self.find_or_create_user()
test_secret = self.add_test_secret(user)
process_model_info = self.add_test_process(client, user)
process_model_relative_path = FileSystemService.process_model_relative_path(
process_model_info
)
created_allowed_process = SecretService.add_allowed_process(test_secret.id, user.id, process_model_relative_path)
# allowed_process = SecretService.get_secret_allowed_process(created_allowed_process.id)
response = client.get(
f"/v1.0/secrets/allowed_process_paths/{created_allowed_process.id}",
headers=self.logged_in_headers(user),
)
assert response.status_code == 200
response_allowed_process = response.json
assert response_allowed_process['id'] == created_allowed_process.id
assert response_allowed_process['secret_id'] == created_allowed_process.secret_id
assert response_allowed_process['allowed_relative_path'] == created_allowed_process.allowed_relative_path
def test_delete_secret_allowed_process(
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
) -> None: