Encrypt/decrypt secrets

This commit is contained in:
Jon Herron 2023-03-06 19:07:08 -05:00
parent 41196c4fdf
commit 4851b09d39
2 changed files with 29 additions and 22 deletions

View File

@ -13,6 +13,7 @@ from apscheduler.schedulers.base import BaseScheduler # type: ignore
from flask.json.provider import DefaultJSONProvider
from flask_cors import CORS # type: ignore
from flask_mail import Mail # type: ignore
from flask_simple_crypt import SimpleCrypt
from werkzeug.exceptions import NotFound
import spiffworkflow_backend.load_database_models # noqa: F401
@ -133,6 +134,11 @@ def create_app() -> flask.app.Flask:
configure_sentry(app)
cipher = SimpleCrypt()
app.config['FSC_EXPANSION_COUNT'] = 2048
cipher.init_app(app)
app.config['CIPHER'] = cipher
app.before_request(verify_token)
app.before_request(AuthorizationService.check_for_permission)
app.after_request(set_new_access_token_in_cookie)

View File

@ -1,41 +1,39 @@
"""Secret_service."""
from typing import Optional
from flask import current_app
from flask_simple_crypt import SimpleCrypt
from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.secret_model import SecretModel
# from cryptography.fernet import Fernet
#
#
# class EncryptionService:
# key = Fernet.generate_key() # this is your "password"
# cipher_suite = Fernet(key)
# encoded_text = cipher_suite.encrypt(b"Hello stackoverflow!")
# decoded_text = cipher_suite.decrypt(encoded_text)
class SecretService:
"""SecretService."""
# def encrypt_key(self, plain_key: str) -> str:
# """Encrypt_key."""
# # flask_secret = current_app.secret_key
# # print("encrypt_key")
# ...
CIPHER_ENCODING = "ascii"
# def decrypt_key(self, encrypted_key: str) -> str:
# """Decrypt key."""
# ...
@classmethod
def _encrypt(cls, value: str) -> str:
encrypted_bytes = current_app.config["CIPHER"].encrypt(value)
return encrypted_bytes.decode(cls.CIPHER_ENCODING)
@staticmethod
@classmethod
def _decrypt(cls, value: str) -> str:
bytes_to_decrypt = bytes(value, cls.CIPHER_ENCODING)
decrypted_bytes = current_app.config["CIPHER"].decrypt(bytes_to_decrypt)
return decrypted_bytes.decode(cls.CIPHER_ENCODING)
@classmethod
def add_secret(
cls,
key: str,
value: str,
user_id: int,
) -> SecretModel:
"""Add_secret."""
# encrypted_key = self.encrypt_key(key)
value = cls._encrypt(value)
secret_model = SecretModel(key=key, value=value, user_id=user_id)
db.session.add(secret_model)
try:
@ -50,11 +48,12 @@ class SecretService:
) from e
return secret_model
@staticmethod
def get_secret(key: str) -> SecretModel:
@classmethod
def get_secret(cls, key: str) -> SecretModel:
"""Get_secret."""
secret = db.session.query(SecretModel).filter(SecretModel.key == key).first()
if isinstance(secret, SecretModel):
secret.value = cls._decrypt(secret.value)
return secret
else:
raise ApiError(
@ -62,14 +61,16 @@ class SecretService:
message=f"Unable to locate a secret with the name: {key}. ",
)
@staticmethod
@classmethod
def update_secret(
cls,
key: str,
value: str,
user_id: Optional[int] = None,
create_if_not_exists: Optional[bool] = False,
) -> None:
"""Does this pass pre commit?"""
value = cls._encrypt(value)
secret_model = SecretModel.query.filter(SecretModel.key == key).first()
if secret_model:
secret_model.value = value