Skip to content
Snippets Groups Projects
Commit cc068560 authored by michelafazzolari@gmail.com's avatar michelafazzolari@gmail.com
Browse files

Update security_utils.py

parent ccf1d5dc
No related branches found
No related tags found
No related merge requests found
# SPDX-License-Identifier: Apache-2.0
# api_server.py
from fastapi import FastAPI, HTTPException, Request, status, Security, Depends
from fastapi.security import OAuth2AuthorizationCodeBearer
from keycloak import KeycloakOpenID
#from app_utils import catalogue_utils, xml_utils, editor_utils
#security_utils.py
from config import *
from app_utils import logger
from keycloak import KeycloakOpenID
from fastapi.security import OAuth2AuthorizationCodeBearer
from fastapi import HTTPException, Request, status, Depends
def create_oauth2_scheme():
......@@ -16,14 +14,16 @@ def create_oauth2_scheme():
tokenUrl="https://sso.example.com/auth/realms/example-realm/protocol/openid-connect/token")
return oauth2_scheme
def configure_keycloak_client():
#TODO: move to config file
# DONE: move to config file
# Configure keycloak info
keycloak_openid = KeycloakOpenID(server_url=KEYCLOAK_URL,
client_id=KEYCLOAK_CLIENT_ID,
realm_name=KEYCLOAK_REALM,
verify=True,
client_secret_key=KEYCLOAK_CLIENT_SECRET)
return keycloak_openid
......@@ -39,10 +39,34 @@ def get_access_token_from_request(req:Request):
token = bearer_token.replace('Bearer ', '')
return token
except Exception as ex:
raise HTTPException(status_code=400, detail="Cannot retrieve access token from header. Original exception type: {0}".format(type(ex).__name__))
logger.create_log(logger.ERROR, None, optional_message=ex)
raise HTTPException(status_code=400,
detail="Cannot retrieve access token from header. Original exception type: {0}".format(
type(ex).__name__))
def get_access_token_from_keycloak_with_ccg():
"""Get token from keycloak server by using Client Credential Grant flow
Args:
Returns:
access_token (str): string containing an access token
"""
# Configure keycloak client
keycloak_openid = configure_keycloak_client()
# print(keycloak_openid.client_id)
# Get access token with Client Credential Grant flow
try:
complete_token = keycloak_openid.token(grant_type="client_credentials")
return complete_token['access_token']
except Exception as e:
logger.create_log(logger.ERROR, None, optional_message=e)
raise HTTPException(status_code=400,
detail="Unable to retrieve an access token with Client Credential Grant flow. "
"Original exception type: {0}".format(type(e).__name__))
def get_access_token_from_keycloak():
def get_access_token_from_keycloak_with_password():
"""Get token from keycloak server by using a default account
Args:
Returns:
......@@ -53,24 +77,30 @@ def get_access_token_from_keycloak():
keycloak_openid = configure_keycloak_client()
# Get Access Token
try:
complete_token = keycloak_openid.token(KEYCLOAK_USER, KEYCLOAK_PASSWORD)
token = complete_token['access_token']
return token
return complete_token['access_token']
except Exception as e:
logger.create_log(logger.ERROR, None, optional_message=e)
raise HTTPException(status_code=400,
detail="Unable to retrieve an access token with the specified username and password.")
def get_access_token(request: Request = None):
"""Get access token, either from request header if any, otherwise from 'nl2cnl_test' keycloak account
Args:
req(request): request, default None
request: request, default None
Returns:
access_token(str): a string containing the access token
"""
try:
token = get_access_token_from_request(request)
return token
except Exception as e:
print('WARNING: impossible to retrieve an access token from request header. Using default account to get a token.')
token = get_access_token_from_keycloak()
logger.create_log(logger.ERROR, None, optional_message=e)
print(
'WARNING: impossible to retrieve an access token from request header. Using Client Credential Grant to get a token.')
token = get_access_token_from_keycloak_with_ccg()
return token
......@@ -81,7 +111,9 @@ def access_token_is_valid(access_token:str = Depends(get_access_token)):
try:
user_info = keycloak_openid.decode_token(access_token, key=KEYCLOAK_PUBLIC_KEY, options=options)
except Exception as e:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized: invalid token.", headers={"WWW-Authenticate": "Bearer"})
logger.create_log(logger.ERROR, None, optional_message=e)
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Unauthorized: invalid token.",
headers={"WWW-Authenticate": "Bearer"})
return user_info
......@@ -90,16 +122,15 @@ def test_access_token_is_valid(access_token):
return result
if __name__ == "__main__":
token = get_access_token('Request')
"""
info = (access_token_is_valid(token))
from pprint import pprint
pprint(info)
"""
#token = 'invalid token'
access_token = token
print(get_current_user())
print("Testing Mapper: security_utils.py")
token = get_access_token_from_keycloak_with_ccg()
print(token)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment