Select Git revision
catalogue_utils.py
catalogue_utils.py 8.27 KiB
# SPDX-License-Identifier: Apache-2.0
import json
from app_utils import security_utils
from coc_backend_api_client.models import SecurityMetricDTO, TomDTO
from coc_backend_api_client.types import Response
from coc_backend_api_client import AuthenticatedClient, Client
from config import *
#import coc_backend_api_client
def configure_authenticated_client(request=None):
from fastapi.exceptions import HTTPException
"""Create a new keycloak client and configure it
Args:
Returns:
client (AuthenticatedClient): an authenticated client from coc_backend_api_client
"""
token = security_utils.get_access_token(request)
"""
# TODO risky without ssl check
#TODO: change with token get from request
try:
token = security_utils.get_access_token_from_request(request)
except HTTPException as e:
print('WARNING: catalogue_utils->configure_authenticated_client was not able to retrieve an access token. Using admin account to get a token.')
token = security_utils.get_access_token_from_keycloak()
"""
client = AuthenticatedClient(base_url=CATALOGUE_API_URL, token=token,verify_ssl=False)
#print(client.get_headers())
return client
def get_all_metrics_from_catalogue(request = None):
"""Connects to the MEDINA catalogue through an authenticated client and returns all metrics
Args:
Returns:
metrics (list): a list of dicts, each containing info related to a metric
"""
client = configure_authenticated_client(request)
from coc_backend_api_client.api.security_metric_resource import get_all_security_metrics_using_get
response: Response[SecurityMetricDTO] = get_all_security_metrics_using_get.sync_detailed(client=client)
return json.loads(response.content.decode('utf-8'))
def get_metric_by_name_from_catalogue(metric_name, request = None):
"""Connects to the MEDINA catalogue through an authenticated client and returns a metric by name
Args:
metric_name (str): a string containing the metric name
Returns:
metric (dict): a dict containing all the info related to a metric
"""
client = configure_authenticated_client(request)
from coc_backend_api_client.api.security_metric_resource import get_all_security_metrics_using_get
response: Response[SecurityMetricDTO] = get_all_security_metrics_using_get.sync_detailed(name_equals = metric_name, client=client)
return json.loads(response.content.decode('utf-8'))[0]
def get_metric_from_catalogue(mid:int, request = None):
"""Connects to the MEDINA catalogue through an authenticated client and returns a metric by id
Args:
mid (int) : an integer corresponding to the internal id of metric in the catalogue
Returns:
metric (dict): a dict contaning info related to a metric
"""
client=configure_authenticated_client(request)
from coc_backend_api_client.api.security_metric_resource import get_security_metric_using_get
response: Response[SecurityMetricDTO] = get_security_metric_using_get.sync_detailed(id=mid, client=client)
return json.loads(response.content.decode('utf-8'))
def get_all_toms_from_catalogue(request=None):
"""Connects to the MEDINA catalogue through an authenticated client and returns all toms
Arg:
Returns:
toms (list): a list of dicts, each containing info related to a tom
"""
client = configure_authenticated_client(request)
from coc_backend_api_client.api.tom_resource import get_all_toms_using_get
response: Response[TomDTO] = get_all_toms_using_get.sync_detailed(client=client)
return json.loads(response.content.decode('utf-8'))
def get_tom_from_catalogue(tom_id:int, request=None):
"""Connects to the MEDINA catalogue through an authenticated client and returns a tom by id
Args:
tom_id (int): an integer corresponding to the internal id of tom in the catalogue
Returns:
tom (dict): a dict contaning info related to a tom
"""
client = configure_authenticated_client(request)
from coc_backend_api_client.api.tom_resource import get_tom_using_get
response: Response[TomDTO] = get_tom_using_get.sync_detailed(id=tom_id,client=client)
return json.loads(response.content.decode('utf-8'))
def get_security_control_framework(security_control_id:int, request=None):
"""Connects to the MEDINA catalogue through an authenticated client and retrieved the security control framework corresopondent to a tom
Args:
security_control_id (int): an integer corresponding to the internal id of the security_control in the catalogue
Returns:
framework (string): a string with the security framework the tom belongs to
"""
client = configure_authenticated_client(request)
from coc_backend_api_client.api.security_control_resource import get_security_control_using_get
response: Response[SecurityControlDTO] = get_security_control_using_get.sync_detailed(id = security_control_id, client = client)
resp = json.loads(response.content.decode('utf-8'))
sc_cat_id = resp['securityControlCategory']['id']
from coc_backend_api_client.api.security_control_category_resource import get_security_control_category_using_get
response: Response[SecurityControlCategoryDTO] = get_security_control_category_using_get.sync_detailed (id=sc_cat_id, client=client)
resp = json.loads(response.content.decode('utf-8'))
return resp['securityControlFramework']['name']
def get_tom_data(tom:dict):
"""Given a tom in a dict, it returns a dict containing only the information to put in the reo object
Args:
tom (dict): a dict containing a tom
Returns:
tom data (dict): a dict containing only tom data to put in the reo object
"""
return {
'tomCode' : tom['code'],
'tomName' : tom['name'],
'securityControl' : tom['securityControl']['name'],
'framework' : 'EUCS',
'type' : tom['type'],
'description' : tom['description'],
'assuranceLevel' : tom['assuranceLevel']
}
def get_obligation_data(metric:dict,source=None):
"""Given a metric in a dict, it returns a dict containing only the information to put in the reo object
Args:
metric (dict): a dict containing a metric
source (str): str with the source of the metric (associated, recommended, None)
Returns:
metric data (list of dict): a list of dict containing only metric data to put in the reo object
"""
#if targetResourceType is a list, create two obligations, else a list with a single dict
obl_data_list = []
trt_list = metric['targetResourceType'].split(', ')
for value in trt_list:
obl_data_list.append({'id' : metric['metricId'],
'description' : metric['description'],
'targetResourceType' : value,
'name' : metric['name'],
'targetValueDatatype' : metric['targetValueDatatype'],
'operator' : metric['operator'],
'targetValue' : metric['targetValue'],
'source' : source
})
return obl_data_list
def test_get_metric(metric_id:int):
resp = get_metric_from_catalogue(metric_id)
print(resp)
def test_get_metric_by_name(metric_name:str):
resp = get_metric_by_name_from_catalogue(metric_name)
print(resp)
def test_get_all_metrics():
resp = get_all_metrics_from_catalogue()
print(resp)
def test_get_all_toms():
resp = get_all_toms_from_catalogue()
print(resp)
def test_get_tom(tom_id:int):
resp = get_tom_from_catalogue(tom_id)
print(resp)
if __name__ == '__main__':
#some_id = 2
#test_get_metric(some_id)
#test_get_all_metrics()
#some_id = 1
#test_get_metric(some_id)
#test_get_all_metrics()
#some_id = 'OPS-01.1'
#some_id = 517
#test_get_tom(some_id)
#from pprint import pprint
m1 = get_metric_by_name_from_catalogue('TransportEncryptionEnforced')
#m2 = get_metric_by_name_from_catalogue('TLSVersion')
from pprint import pprint
pprint(get_obligation_data(m1))
#get_obligation_data(metric)
#pprint(test_get_all_toms())