Encrypt/decrypt secrets

This commit is contained in:
Jon Herron 2023-03-06 19:07:08 -05:00
parent f994c2f5d1
commit 6dfb096a5e
2 changed files with 29 additions and 22 deletions

View File

@ -13,6 +13,7 @@ from apscheduler.schedulers.base import BaseScheduler # type: ignore
from flask.json.provider import DefaultJSONProvider from flask.json.provider import DefaultJSONProvider
from flask_cors import CORS # type: ignore from flask_cors import CORS # type: ignore
from flask_mail import Mail # type: ignore from flask_mail import Mail # type: ignore
from flask_simple_crypt import SimpleCrypt
from werkzeug.exceptions import NotFound from werkzeug.exceptions import NotFound
import spiffworkflow_backend.load_database_models # noqa: F401 import spiffworkflow_backend.load_database_models # noqa: F401
@ -133,6 +134,11 @@ def create_app() -> flask.app.Flask:
configure_sentry(app) configure_sentry(app)
cipher = SimpleCrypt()
app.config['FSC_EXPANSION_COUNT'] = 2048
cipher.init_app(app)
app.config['CIPHER'] = cipher
app.before_request(verify_token) app.before_request(verify_token)
app.before_request(AuthorizationService.check_for_permission) app.before_request(AuthorizationService.check_for_permission)
app.after_request(set_new_access_token_in_cookie) app.after_request(set_new_access_token_in_cookie)

View File

@ -1,41 +1,39 @@
"""Secret_service.""" """Secret_service."""
from typing import Optional from typing import Optional
from flask import current_app
from flask_simple_crypt import SimpleCrypt
from spiffworkflow_backend.exceptions.api_error import ApiError from spiffworkflow_backend.exceptions.api_error import ApiError
from spiffworkflow_backend.models.db import db from spiffworkflow_backend.models.db import db
from spiffworkflow_backend.models.secret_model import SecretModel from spiffworkflow_backend.models.secret_model import SecretModel
# from cryptography.fernet import Fernet
#
#
# class EncryptionService:
# key = Fernet.generate_key() # this is your "password"
# cipher_suite = Fernet(key)
# encoded_text = cipher_suite.encrypt(b"Hello stackoverflow!")
# decoded_text = cipher_suite.decrypt(encoded_text)
class SecretService: class SecretService:
"""SecretService.""" """SecretService."""
# def encrypt_key(self, plain_key: str) -> str: CIPHER_ENCODING = "ascii"
# """Encrypt_key."""
# # flask_secret = current_app.secret_key
# # print("encrypt_key")
# ...
# def decrypt_key(self, encrypted_key: str) -> str: @classmethod
# """Decrypt key.""" def _encrypt(cls, value: str) -> str:
# ... encrypted_bytes = current_app.config["CIPHER"].encrypt(value)
return encrypted_bytes.decode(cls.CIPHER_ENCODING)
@staticmethod @classmethod
def _decrypt(cls, value: str) -> str:
bytes_to_decrypt = bytes(value, cls.CIPHER_ENCODING)
decrypted_bytes = current_app.config["CIPHER"].decrypt(bytes_to_decrypt)
return decrypted_bytes.decode(cls.CIPHER_ENCODING)
@classmethod
def add_secret( def add_secret(
cls,
key: str, key: str,
value: str, value: str,
user_id: int, user_id: int,
) -> SecretModel: ) -> SecretModel:
"""Add_secret.""" """Add_secret."""
# encrypted_key = self.encrypt_key(key) value = cls._encrypt(value)
secret_model = SecretModel(key=key, value=value, user_id=user_id) secret_model = SecretModel(key=key, value=value, user_id=user_id)
db.session.add(secret_model) db.session.add(secret_model)
try: try:
@ -50,11 +48,12 @@ class SecretService:
) from e ) from e
return secret_model return secret_model
@staticmethod @classmethod
def get_secret(key: str) -> SecretModel: def get_secret(cls, key: str) -> SecretModel:
"""Get_secret.""" """Get_secret."""
secret = db.session.query(SecretModel).filter(SecretModel.key == key).first() secret = db.session.query(SecretModel).filter(SecretModel.key == key).first()
if isinstance(secret, SecretModel): if isinstance(secret, SecretModel):
secret.value = cls._decrypt(secret.value)
return secret return secret
else: else:
raise ApiError( raise ApiError(
@ -62,14 +61,16 @@ class SecretService:
message=f"Unable to locate a secret with the name: {key}. ", message=f"Unable to locate a secret with the name: {key}. ",
) )
@staticmethod @classmethod
def update_secret( def update_secret(
cls,
key: str, key: str,
value: str, value: str,
user_id: Optional[int] = None, user_id: Optional[int] = None,
create_if_not_exists: Optional[bool] = False, create_if_not_exists: Optional[bool] = False,
) -> None: ) -> None:
"""Does this pass pre commit?""" """Does this pass pre commit?"""
value = cls._encrypt(value)
secret_model = SecretModel.query.filter(SecretModel.key == key).first() secret_model = SecretModel.query.filter(SecretModel.key == key).first()
if secret_model: if secret_model:
secret_model.value = value secret_model.value = value