mirror of
https://github.com/sartography/spiffworkflow-backend.git
synced 2025-02-24 13:28:31 +00:00
Pre commit
This commit is contained in:
parent
8a34e9e89d
commit
de512fb373
@ -49,6 +49,7 @@ class SecretAllowedProcessPathModel(SpiffworkflowBaseDBModel):
|
|||||||
|
|
||||||
|
|
||||||
class SecretAllowedProcessSchema(Schema):
|
class SecretAllowedProcessSchema(Schema):
|
||||||
|
"""SecretAllowedProcessSchema."""
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
"""Meta."""
|
"""Meta."""
|
||||||
|
@ -1108,7 +1108,7 @@ def add_secret(body: Dict) -> Response:
|
|||||||
|
|
||||||
def update_secret(key: str, body: dict) -> None:
|
def update_secret(key: str, body: dict) -> None:
|
||||||
"""Update secret."""
|
"""Update secret."""
|
||||||
SecretService().update_secret(key, body['value'], body['creator_user_id'])
|
SecretService().update_secret(key, body["value"], body["creator_user_id"])
|
||||||
|
|
||||||
|
|
||||||
def delete_secret(key: str) -> None:
|
def delete_secret(key: str) -> None:
|
||||||
@ -1120,10 +1120,13 @@ def delete_secret(key: str) -> None:
|
|||||||
def add_allowed_process_path(body: dict) -> Any:
|
def add_allowed_process_path(body: dict) -> Any:
|
||||||
"""Get allowed process paths."""
|
"""Get allowed process paths."""
|
||||||
allowed_process_path = SecretService.add_allowed_process(
|
allowed_process_path = SecretService.add_allowed_process(
|
||||||
body['secret_id'], g.user.id, body["allowed_relative_path"]
|
body["secret_id"], g.user.id, body["allowed_relative_path"]
|
||||||
|
)
|
||||||
|
return Response(
|
||||||
|
json.dumps(SecretAllowedProcessSchema().dump(allowed_process_path)),
|
||||||
|
status=201,
|
||||||
|
mimetype="application/json",
|
||||||
)
|
)
|
||||||
return Response(json.dumps(SecretAllowedProcessSchema().dump(allowed_process_path)),
|
|
||||||
status=201, mimetype="application/json")
|
|
||||||
|
|
||||||
|
|
||||||
def delete_allowed_process_path(allowed_process_path_id: int) -> Any:
|
def delete_allowed_process_path(allowed_process_path_id: int) -> Any:
|
||||||
|
@ -184,8 +184,8 @@ def encode_auth_token(sub: str, token_type: Optional[str] = None) -> str:
|
|||||||
"""
|
"""
|
||||||
payload = {"sub": sub}
|
payload = {"sub": sub}
|
||||||
if token_type is None:
|
if token_type is None:
|
||||||
token_type = 'internal'
|
token_type = "internal" # noqa: S105
|
||||||
payload['token_type'] = token_type
|
payload["token_type"] = token_type
|
||||||
if "SECRET_KEY" in current_app.config:
|
if "SECRET_KEY" in current_app.config:
|
||||||
secret_key = current_app.config.get("SECRET_KEY")
|
secret_key = current_app.config.get("SECRET_KEY")
|
||||||
else:
|
else:
|
||||||
@ -326,10 +326,12 @@ def get_user_from_decoded_internal_token(decoded_token: dict) -> Optional[UserMo
|
|||||||
# user: UserModel = UserModel.query.filter()
|
# user: UserModel = UserModel.query.filter()
|
||||||
if user:
|
if user:
|
||||||
return user
|
return user
|
||||||
user = UserModel(username=service_id,
|
user = UserModel(
|
||||||
|
username=service_id,
|
||||||
uid=service_id,
|
uid=service_id,
|
||||||
service=service,
|
service=service,
|
||||||
service_id=service_id,
|
service_id=service_id,
|
||||||
name="API User")
|
name="API User",
|
||||||
|
)
|
||||||
|
|
||||||
return user
|
return user
|
||||||
|
@ -132,7 +132,9 @@ def setup_logger(app: Flask) -> None:
|
|||||||
|
|
||||||
spiff_logger_filehandler = None
|
spiff_logger_filehandler = None
|
||||||
if app.config["SPIFFWORKFLOW_BACKEND_LOG_TO_FILE"]:
|
if app.config["SPIFFWORKFLOW_BACKEND_LOG_TO_FILE"]:
|
||||||
spiff_logger_filehandler = logging.FileHandler(f"{app.root_path}/../../log/{app.env}.log")
|
spiff_logger_filehandler = logging.FileHandler(
|
||||||
|
f"{app.root_path}/../../log/{app.env}.log"
|
||||||
|
)
|
||||||
spiff_logger_filehandler.setLevel(spiff_log_level)
|
spiff_logger_filehandler.setLevel(spiff_log_level)
|
||||||
spiff_logger_filehandler.setFormatter(log_formatter)
|
spiff_logger_filehandler.setFormatter(log_formatter)
|
||||||
|
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
"""Secret_service."""
|
"""Secret_service."""
|
||||||
import logging
|
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from flask import current_app
|
|
||||||
from flask_bpmn.api.api_error import ApiError
|
from flask_bpmn.api.api_error import ApiError
|
||||||
from flask_bpmn.models.db import db
|
from flask_bpmn.models.db import db
|
||||||
from sqlalchemy.exc import IntegrityError
|
from sqlalchemy.exc import IntegrityError
|
||||||
@ -181,7 +179,7 @@ class SecretService:
|
|||||||
secret = SecretModel.query.filter(
|
secret = SecretModel.query.filter(
|
||||||
SecretModel.id == allowed_process.secret_id
|
SecretModel.id == allowed_process.secret_id
|
||||||
).first()
|
).first()
|
||||||
assert secret
|
assert secret # noqa: S101
|
||||||
if secret.creator_user_id == user_id:
|
if secret.creator_user_id == user_id:
|
||||||
db.session.delete(allowed_process)
|
db.session.delete(allowed_process)
|
||||||
try:
|
try:
|
||||||
|
@ -374,14 +374,17 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
|
|||||||
assert secret_response.status_code == 200
|
assert secret_response.status_code == 200
|
||||||
assert secret_response.json == self.test_value
|
assert secret_response.json == self.test_value
|
||||||
|
|
||||||
def test_update_secret(self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None) -> None:
|
def test_update_secret(
|
||||||
|
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||||
|
) -> None:
|
||||||
|
"""Test_update_secret."""
|
||||||
user = self.find_or_create_user()
|
user = self.find_or_create_user()
|
||||||
self.add_test_secret(user)
|
self.add_test_secret(user)
|
||||||
secret = SecretService.get_secret(self.test_key)
|
secret = SecretService.get_secret(self.test_key)
|
||||||
assert secret == self.test_value
|
assert secret == self.test_value
|
||||||
secret_model = SecretModel(key=self.test_key,
|
secret_model = SecretModel(
|
||||||
value="new_secret_value",
|
key=self.test_key, value="new_secret_value", creator_user_id=user.id
|
||||||
creator_user_id=user.id)
|
)
|
||||||
response = client.put(
|
response = client.put(
|
||||||
f"/v1.0/secrets/{self.test_key}",
|
f"/v1.0/secrets/{self.test_key}",
|
||||||
headers=self.logged_in_headers(user),
|
headers=self.logged_in_headers(user),
|
||||||
@ -390,7 +393,9 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
|
|||||||
)
|
)
|
||||||
assert response.status_code == 204
|
assert response.status_code == 204
|
||||||
|
|
||||||
secret_model = SecretModel.query.filter(SecretModel.key == self.test_key).first()
|
secret_model = SecretModel.query.filter(
|
||||||
|
SecretModel.key == self.test_key
|
||||||
|
).first()
|
||||||
assert secret_model.value == "new_secret_value"
|
assert secret_model.value == "new_secret_value"
|
||||||
|
|
||||||
def test_delete_secret(
|
def test_delete_secret(
|
||||||
@ -435,22 +440,25 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
|
|||||||
assert secret_response.status_code == 404
|
assert secret_response.status_code == 404
|
||||||
print("test_delete_secret_bad_key")
|
print("test_delete_secret_bad_key")
|
||||||
|
|
||||||
def test_add_secret_allowed_process(self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None) -> None:
|
def test_add_secret_allowed_process(
|
||||||
"""Test add secret allowed process"""
|
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||||
|
) -> None:
|
||||||
|
"""Test add secret allowed process."""
|
||||||
user = self.find_or_create_user()
|
user = self.find_or_create_user()
|
||||||
test_secret = self.add_test_secret(user)
|
test_secret = self.add_test_secret(user)
|
||||||
process_model_info = self.add_test_process(client, user)
|
process_model_info = self.add_test_process(client, user)
|
||||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||||
process_model_info
|
process_model_info
|
||||||
)
|
)
|
||||||
data = {"secret_id": test_secret.id,
|
data = {
|
||||||
"allowed_relative_path": process_model_relative_path
|
"secret_id": test_secret.id,
|
||||||
|
"allowed_relative_path": process_model_relative_path,
|
||||||
}
|
}
|
||||||
response: TestResponse = client.post(
|
response: TestResponse = client.post(
|
||||||
"/v1.0/secrets/allowed_process_paths",
|
"/v1.0/secrets/allowed_process_paths",
|
||||||
headers=self.logged_in_headers(user),
|
headers=self.logged_in_headers(user),
|
||||||
content_type='application/json',
|
content_type="application/json",
|
||||||
data=json.dumps(data)
|
data=json.dumps(data),
|
||||||
)
|
)
|
||||||
assert response.status_code == 201
|
assert response.status_code == 201
|
||||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||||
@ -458,15 +466,19 @@ class TestSecretServiceApi(SecretServiceTestHelpers):
|
|||||||
assert allowed_processes[0].allowed_relative_path == process_model_relative_path
|
assert allowed_processes[0].allowed_relative_path == process_model_relative_path
|
||||||
assert allowed_processes[0].secret_id == test_secret.id
|
assert allowed_processes[0].secret_id == test_secret.id
|
||||||
|
|
||||||
def test_delete_secret_allowed_process(self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None) -> None:
|
def test_delete_secret_allowed_process(
|
||||||
"""Test delete secret allowed process"""
|
self, app: Flask, client: FlaskClient, with_db_and_bpmn_file_cleanup: None
|
||||||
|
) -> None:
|
||||||
|
"""Test delete secret allowed process."""
|
||||||
user = self.find_or_create_user()
|
user = self.find_or_create_user()
|
||||||
test_secret = self.add_test_secret(user)
|
test_secret = self.add_test_secret(user)
|
||||||
process_model_info = self.add_test_process(client, user)
|
process_model_info = self.add_test_process(client, user)
|
||||||
process_model_relative_path = FileSystemService.process_model_relative_path(
|
process_model_relative_path = FileSystemService.process_model_relative_path(
|
||||||
process_model_info
|
process_model_info
|
||||||
)
|
)
|
||||||
allowed_process = SecretService.add_allowed_process(test_secret.id, user.id, process_model_relative_path)
|
allowed_process = SecretService.add_allowed_process(
|
||||||
|
test_secret.id, user.id, process_model_relative_path
|
||||||
|
)
|
||||||
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
allowed_processes = SecretAllowedProcessPathModel.query.all()
|
||||||
assert len(allowed_processes) == 1
|
assert len(allowed_processes) == 1
|
||||||
assert allowed_processes[0].secret_id == test_secret.id
|
assert allowed_processes[0].secret_id == test_secret.id
|
||||||
|
Loading…
x
Reference in New Issue
Block a user