diff --git a/app_utils/security_utils.py b/app_utils/security_utils.py index 315155a02db93129c9359b441031a6232e08e218..79bdd75ed9e4390966a2be6a2c46c61f07ce40d2 100644 --- a/app_utils/security_utils.py +++ b/app_utils/security_utils.py @@ -1,48 +1,72 @@ # SPDX-License-Identifier: Apache-2.0 -# api_server.py - -from fastapi import FastAPI, HTTPException, Request, status, Security, Depends -from fastapi.security import OAuth2AuthorizationCodeBearer -from keycloak import KeycloakOpenID - -#from app_utils import catalogue_utils, xml_utils, editor_utils +#security_utils.py from config import * +from app_utils import logger +from keycloak import KeycloakOpenID +from fastapi.security import OAuth2AuthorizationCodeBearer +from fastapi import HTTPException, Request, status, Depends def create_oauth2_scheme(): oauth2_scheme = OAuth2AuthorizationCodeBearer( - authorizationUrl="https://sso.example.com/auth/", - tokenUrl="https://sso.example.com/auth/realms/example-realm/protocol/openid-connect/token") + authorizationUrl="https://sso.example.com/auth/", + tokenUrl="https://sso.example.com/auth/realms/example-realm/protocol/openid-connect/token") return oauth2_scheme + def configure_keycloak_client(): - #TODO: move to config file + # DONE: move to config file # Configure keycloak info - keycloak_openid = KeycloakOpenID(server_url = KEYCLOAK_URL, - client_id= KEYCLOAK_CLIENT_ID, - realm_name= KEYCLOAK_REALM, - verify = True, - client_secret_key= KEYCLOAK_CLIENT_SECRET) + keycloak_openid = KeycloakOpenID(server_url=KEYCLOAK_URL, + client_id=KEYCLOAK_CLIENT_ID, + realm_name=KEYCLOAK_REALM, + verify=True, + client_secret_key=KEYCLOAK_CLIENT_SECRET) + return keycloak_openid -def get_access_token_from_request(req:Request): +def get_access_token_from_request(req: Request): """Get token from request header Args: - req (Request): request + req (Request): request Returns: access_token (str): string containing the access token """ try: bearer_token = req.headers["Authorization"] - token = bearer_token.replace('Bearer ','') + token = bearer_token.replace('Bearer ', '') return token except Exception as ex: - raise HTTPException(status_code=400, detail="Cannot retrieve access token from header. Original exception type: {0}".format(type(ex).__name__)) + logger.create_log(logger.ERROR, None, optional_message=ex) + raise HTTPException(status_code=400, + detail="Cannot retrieve access token from header. Original exception type: {0}".format( + type(ex).__name__)) + + +def get_access_token_from_keycloak_with_ccg(): + """Get token from keycloak server by using Client Credential Grant flow + Args: + Returns: + access_token (str): string containing an access token + """ + + # Configure keycloak client + keycloak_openid = configure_keycloak_client() + # print(keycloak_openid.client_id) + # Get access token with Client Credential Grant flow + try: + complete_token = keycloak_openid.token(grant_type="client_credentials") + return complete_token['access_token'] + except Exception as e: + logger.create_log(logger.ERROR, None, optional_message=e) + raise HTTPException(status_code=400, + detail="Unable to retrieve an access token with Client Credential Grant flow. " + "Original exception type: {0}".format(type(e).__name__)) + - -def get_access_token_from_keycloak(): +def get_access_token_from_keycloak_with_password(): """Get token from keycloak server by using a default account Args: Returns: @@ -53,53 +77,60 @@ def get_access_token_from_keycloak(): keycloak_openid = configure_keycloak_client() # Get Access Token - complete_token = keycloak_openid.token(KEYCLOAK_USER, KEYCLOAK_PASSWORD) - token = complete_token['access_token'] - return token + try: + complete_token = keycloak_openid.token(KEYCLOAK_USER, KEYCLOAK_PASSWORD) + return complete_token['access_token'] + except Exception as e: + logger.create_log(logger.ERROR, None, optional_message=e) + raise HTTPException(status_code=400, + detail="Unable to retrieve an access token with the specified username and password.") -def get_access_token(request:Request=None): +def get_access_token(request: Request = None): """Get access token, either from request header if any, otherwise from 'nl2cnl_test' keycloak account Args: - req(request): request, default None + request: request, default None Returns: access_token(str): a string containing the access token """ try: token = get_access_token_from_request(request) - return token except Exception as e: - print('WARNING: impossible to retrieve an access token from request header. Using default account to get a token.') - token = get_access_token_from_keycloak() - return token + logger.create_log(logger.ERROR, None, optional_message=e) + print( + 'WARNING: impossible to retrieve an access token from request header. Using Client Credential Grant to get a token.') + token = get_access_token_from_keycloak_with_ccg() + + return token -def access_token_is_valid(access_token:str = Depends(get_access_token)): +def access_token_is_valid(access_token: str = Depends(get_access_token)): keycloak_openid = configure_keycloak_client() KEYCLOAK_PUBLIC_KEY = "-----BEGIN PUBLIC KEY-----\n" + keycloak_openid.public_key() + "\n-----END PUBLIC KEY-----" options = {"verify_signature": True, "verify_aud": False, "verify_exp": True} try: user_info = keycloak_openid.decode_token(access_token, key=KEYCLOAK_PUBLIC_KEY, options=options) except Exception as e: - raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized: invalid token.", headers={"WWW-Authenticate": "Bearer"}) + logger.create_log(logger.ERROR, None, optional_message=e) + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized: invalid token.", + headers={"WWW-Authenticate": "Bearer"}) return user_info def test_access_token_is_valid(access_token): result = access_token_is_valid(access_token) return result - - + if __name__ == "__main__": - - token = get_access_token('Request') """ info = (access_token_is_valid(token)) from pprint import pprint pprint(info) """ - #token = 'invalid token' - access_token = token - print(get_current_user()) + print("Testing Mapper: security_utils.py") + + token = get_access_token_from_keycloak_with_ccg() + + print(token)