feature/support-google-oauth (#1125)
* support decrypting and validating jwt tokens from google auth w/ burnettk * moved code as suggested by coderabbit --------- Co-authored-by: jasquat <jasquat@users.noreply.github.com>
This commit is contained in:
parent
840dd74cea
commit
4cf70a8e9b
|
@ -9,6 +9,7 @@ from hmac import compare_digest
|
||||||
from typing import Any
|
from typing import Any
|
||||||
|
|
||||||
from cryptography.hazmat.backends import default_backend
|
from cryptography.hazmat.backends import default_backend
|
||||||
|
from cryptography.hazmat.primitives.asymmetric import rsa
|
||||||
from cryptography.x509 import load_der_x509_certificate
|
from cryptography.x509 import load_der_x509_certificate
|
||||||
|
|
||||||
from spiffworkflow_backend.models.user import SPIFF_GENERATED_JWT_ALGORITHM
|
from spiffworkflow_backend.models.user import SPIFF_GENERATED_JWT_ALGORITHM
|
||||||
|
@ -149,6 +150,31 @@ class AuthenticationService:
|
||||||
json_key_configs: dict = next(jk for jk in jwks_configs["keys"] if jk["kid"] == key_id)
|
json_key_configs: dict = next(jk for jk in jwks_configs["keys"] if jk["kid"] == key_id)
|
||||||
return json_key_configs
|
return json_key_configs
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def public_key_from_rsa_public_numbers(cls, json_key_configs: dict) -> Any:
|
||||||
|
modulus = base64.urlsafe_b64decode(json_key_configs["n"] + "===")
|
||||||
|
exponent = base64.urlsafe_b64decode(json_key_configs["e"] + "===")
|
||||||
|
public_key_numbers = rsa.RSAPublicNumbers(
|
||||||
|
int.from_bytes(exponent, byteorder="big"), int.from_bytes(modulus, byteorder="big")
|
||||||
|
)
|
||||||
|
return public_key_numbers.public_key(backend=default_backend())
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def public_key_from_x5c(cls, key_id: str, json_key_configs: dict) -> Any:
|
||||||
|
x5c = json_key_configs["x5c"][0]
|
||||||
|
decoded_certificate = base64.b64decode(x5c)
|
||||||
|
|
||||||
|
# our backend-based openid provider implementation (which you should never use in prod)
|
||||||
|
# uses a public/private key pair. we played around with adding an x509 cert so we could
|
||||||
|
# follow the exact same mechanism for getting the public key that we use for keycloak,
|
||||||
|
# but using an x509 cert for no reason seemed a little overboard for this toy-openid use case,
|
||||||
|
# when we already have the public key that can work hardcoded in our config.
|
||||||
|
if key_id == SPIFF_OPEN_ID_KEY_ID:
|
||||||
|
return decoded_certificate
|
||||||
|
else:
|
||||||
|
x509_cert = load_der_x509_certificate(decoded_certificate, default_backend())
|
||||||
|
return x509_cert.public_key()
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def parse_jwt_token(cls, authentication_identifier: str, token: str) -> dict:
|
def parse_jwt_token(cls, authentication_identifier: str, token: str) -> dict:
|
||||||
header = jwt.get_unverified_header(token)
|
header = jwt.get_unverified_header(token)
|
||||||
|
@ -164,22 +190,14 @@ class AuthenticationService:
|
||||||
options={"verify_exp": False},
|
options={"verify_exp": False},
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
json_key_configs = cls.jwks_public_key_for_key_id(authentication_identifier, key_id)
|
|
||||||
x5c = json_key_configs["x5c"][0]
|
|
||||||
algorithm = str(header.get("alg"))
|
algorithm = str(header.get("alg"))
|
||||||
decoded_certificate = base64.b64decode(x5c)
|
json_key_configs = cls.jwks_public_key_for_key_id(authentication_identifier, key_id)
|
||||||
|
|
||||||
# our backend-based openid provider implementation (which you should never use in prod)
|
|
||||||
# uses a public/private key pair. we played around with adding an x509 cert so we could
|
|
||||||
# follow the exact same mechanism for getting the public key that we use for keycloak,
|
|
||||||
# but using an x509 cert for no reason seemed a little overboard for this toy-openid use case,
|
|
||||||
# when we already have the public key that can work hardcoded in our config.
|
|
||||||
public_key: Any = None
|
public_key: Any = None
|
||||||
if key_id == SPIFF_OPEN_ID_KEY_ID:
|
|
||||||
public_key = decoded_certificate
|
if "x5c" not in json_key_configs:
|
||||||
|
public_key = cls.public_key_from_rsa_public_numbers(json_key_configs)
|
||||||
else:
|
else:
|
||||||
x509_cert = load_der_x509_certificate(decoded_certificate, default_backend())
|
public_key = cls.public_key_from_x5c(key_id, json_key_configs)
|
||||||
public_key = x509_cert.public_key()
|
|
||||||
|
|
||||||
# tokens generated from the cli have an aud like: [ "realm-management", "account" ]
|
# tokens generated from the cli have an aud like: [ "realm-management", "account" ]
|
||||||
# while tokens generated from frontend have an aud like: "spiffworkflow-backend."
|
# while tokens generated from frontend have an aud like: "spiffworkflow-backend."
|
||||||
|
|
Loading…
Reference in New Issue