Skip to content
Snippets Groups Projects
Select Git revision
  • 974cea2d9bbba9b984694f4ce107f3255f076b08
  • main default
2 results

catalogue_utils.py

Blame
  • catalogue_utils.py 8.27 KiB
    # SPDX-License-Identifier: Apache-2.0
    import json
    from app_utils import security_utils
    from coc_backend_api_client.models import SecurityMetricDTO, TomDTO
    from coc_backend_api_client.types import Response
    from coc_backend_api_client import AuthenticatedClient, Client
    from config import *
    
    #import coc_backend_api_client
    
    def configure_authenticated_client(request=None):
        """Create an authenticated client for the catalogue API.

        The access token is obtained through ``security_utils.get_access_token``
        and the client is pointed at ``CATALOGUE_API_URL``.

        Args:
            request: optional incoming request, passed unchanged to
                ``security_utils.get_access_token``.

        Returns:
            client (AuthenticatedClient): an authenticated client from
                coc_backend_api_client.
        """
        token = security_utils.get_access_token(request)
        # TODO: risky without SSL check -- certificate verification is disabled
        # (verify_ssl=False). Enable it once the catalogue endpoint can be
        # verified.
        # TODO: take the token directly from the request.
        client = AuthenticatedClient(
            base_url=CATALOGUE_API_URL,
            token=token,
            verify_ssl=False,
        )
        return client
    
    
    def get_all_metrics_from_catalogue(request = None):
        """Fetch every security metric from the MEDINA catalogue.

        Args:
            request: optional incoming request, forwarded to
                configure_authenticated_client.

        Returns:
            metrics (list): the decoded JSON body of the catalogue response,
                expected to be a list of dicts, one per metric.
        """
        api_client = configure_authenticated_client(request)
        from coc_backend_api_client.api.security_metric_resource import (
            get_all_security_metrics_using_get as metrics_endpoint,
        )
        catalogue_reply: Response[SecurityMetricDTO] = metrics_endpoint.sync_detailed(
            client=api_client
        )
        payload = catalogue_reply.content.decode('utf-8')
        return json.loads(payload)
    
    def get_metric_by_name_from_catalogue(metric_name, request = None):
        """Fetch a metric from the MEDINA catalogue by its name.

        The catalogue is queried with ``name_equals=metric_name`` and the first
        element of the decoded response is returned.

        Args:
            metric_name (str): a string containing the metric name.
            request: optional incoming request, forwarded to
                configure_authenticated_client.

        Returns:
            metric (dict): a dict containing all the info related to the metric.

        Raises:
            IndexError: if the catalogue returns an empty list for
                ``metric_name``.
        """
        client = configure_authenticated_client(request)
        from coc_backend_api_client.api.security_metric_resource import get_all_security_metrics_using_get
        response: Response[SecurityMetricDTO] = get_all_security_metrics_using_get.sync_detailed(
            name_equals=metric_name, client=client
        )
        matches = json.loads(response.content.decode('utf-8'))
        try:
            return matches[0]
        except IndexError as err:
            # Same exception type as before, but say which metric was missing
            # instead of the bare "list index out of range".
            raise IndexError(
                f"no metric named {metric_name!r} found in the catalogue"
            ) from err
    
    def get_metric_from_catalogue(mid:int, request = None):
        """Fetch a single metric from the MEDINA catalogue by its internal id.

        Args:
            mid (int): the internal id of the metric in the catalogue.
            request: optional incoming request, forwarded to
                configure_authenticated_client.

        Returns:
            metric (dict): the decoded JSON body of the catalogue response,
                containing the info related to the metric.
        """
        api_client = configure_authenticated_client(request)
        from coc_backend_api_client.api.security_metric_resource import (
            get_security_metric_using_get as metric_endpoint,
        )
        catalogue_reply: Response[SecurityMetricDTO] = metric_endpoint.sync_detailed(
            client=api_client, id=mid
        )
        payload = catalogue_reply.content.decode('utf-8')
        return json.loads(payload)
    
    
    def get_all_toms_from_catalogue(request=None):
        """Fetch every TOM from the MEDINA catalogue.

        Args:
            request: optional incoming request, forwarded to
                configure_authenticated_client.

        Returns:
            toms (list): the decoded JSON body of the catalogue response,
                expected to be a list of dicts, one per TOM.
        """
        api_client = configure_authenticated_client(request)
        from coc_backend_api_client.api.tom_resource import (
            get_all_toms_using_get as toms_endpoint,
        )
        catalogue_reply: Response[TomDTO] = toms_endpoint.sync_detailed(client=api_client)
        payload = catalogue_reply.content.decode('utf-8')
        return json.loads(payload)
    
    
    def get_tom_from_catalogue(tom_id:int, request=None):
        """Fetch a single TOM from the MEDINA catalogue by its internal id.

        Args:
            tom_id (int): the internal id of the TOM in the catalogue.
            request: optional incoming request, forwarded to
                configure_authenticated_client.

        Returns:
            tom (dict): the decoded JSON body of the catalogue response,
                containing the info related to the TOM.
        """
        api_client = configure_authenticated_client(request)
        from coc_backend_api_client.api.tom_resource import (
            get_tom_using_get as tom_endpoint,
        )
        catalogue_reply: Response[TomDTO] = tom_endpoint.sync_detailed(
            client=api_client, id=tom_id
        )
        payload = catalogue_reply.content.decode('utf-8')
        return json.loads(payload)
    
    def get_security_control_framework(security_control_id:int, request=None):
        """Return the name of the framework a security control belongs to.

        Two catalogue lookups are made with the same authenticated client:
        first the security control itself, to read the id of its category
        (``securityControlCategory.id``), then that category, to read
        ``securityControlFramework.name``.

        Args:
            security_control_id (int): the internal id of the security control
                in the catalogue.
            request: optional incoming request, forwarded to
                configure_authenticated_client.

        Returns:
            framework (str): the name of the security control framework.
        """
        client = configure_authenticated_client(request)

        # The responses are left unannotated: SecurityControlDTO and
        # SecurityControlCategoryDTO are not among the names this module
        # imports explicitly.
        from coc_backend_api_client.api.security_control_resource import get_security_control_using_get
        control_response = get_security_control_using_get.sync_detailed(id=security_control_id, client=client)
        control = json.loads(control_response.content.decode('utf-8'))
        category_id = control['securityControlCategory']['id']

        from coc_backend_api_client.api.security_control_category_resource import get_security_control_category_using_get
        category_response = get_security_control_category_using_get.sync_detailed(id=category_id, client=client)
        category = json.loads(category_response.content.decode('utf-8'))
        return category['securityControlFramework']['name']
        
        
    
    def get_tom_data(tom:dict):
        """Extract from a TOM dict the fields that go into the reo object.

        Args:
            tom (dict): a dict containing a TOM; the keys 'code', 'name',
                'securityControl' (with a nested 'name'), 'type',
                'description' and 'assuranceLevel' are read.

        Returns:
            tom data (dict): a dict containing only the TOM data to put in the
                reo object; 'framework' is always set to 'EUCS'.
        """
        reo_tom = dict(
            tomCode=tom['code'],
            tomName=tom['name'],
            securityControl=tom['securityControl']['name'],
            framework='EUCS',
            type=tom['type'],
            description=tom['description'],
            assuranceLevel=tom['assuranceLevel'],
        )
        return reo_tom
    
    
    def get_obligation_data(metric:dict,source=None):
        """Build the obligation entries of the reo object for a metric.

        ``metric['targetResourceType']`` may name several resource types
        separated by commas. One obligation is produced per resource type;
        all other fields are copied unchanged into each obligation.

        Args:
            metric (dict): a dict containing a metric.
            source (str): the source of the metric (associated, recommended,
                None).

        Returns:
            metric data (list of dict): one dict per target resource type,
                containing only the metric data to put in the reo object.
        """
        # Split on the bare comma and strip each part, so "A, B", "A,B" and
        # "A,  B" all yield the same two resource types.
        resource_types = [
            part.strip() for part in metric['targetResourceType'].split(',')
        ]
        return [
            {
                'id': metric['metricId'],
                'description': metric['description'],
                'targetResourceType': resource_type,
                'name': metric['name'],
                'targetValueDatatype': metric['targetValueDatatype'],
                'operator': metric['operator'],
                'targetValue': metric['targetValue'],
                'source': source,
            }
            for resource_type in resource_types
        ]
    
    
    def test_get_metric(metric_id:int):
        """Manual check: fetch a metric by id and print the result."""
        print(get_metric_from_catalogue(metric_id))
    
    def test_get_metric_by_name(metric_name:str):
        """Manual check: fetch a metric by name and print the result."""
        print(get_metric_by_name_from_catalogue(metric_name))
    
    def test_get_all_metrics():
        """Manual check: fetch all metrics and print the result."""
        print(get_all_metrics_from_catalogue())
    
    
    def test_get_all_toms():
        """Manual check: fetch all TOMs and print the result."""
        print(get_all_toms_from_catalogue())
    
    
    def test_get_tom(tom_id:int):
        """Manual check: fetch a TOM by id and print the result."""
        print(get_tom_from_catalogue(tom_id))
    
    if __name__ == '__main__':
        # Manual smoke check: fetch one metric by name and pretty-print the
        # obligations derived from it.
        #
        # Other ad-hoc checks that can be swapped in here:
        #   test_get_metric(1)
        #   test_get_metric(2)
        #   test_get_all_metrics()
        #   test_get_tom(517)
        #   test_get_all_toms()
        #   get_metric_by_name_from_catalogue('TLSVersion')
        from pprint import pprint

        pprint(
            get_obligation_data(
                get_metric_by_name_from_catalogue('TransportEncryptionEnforced')
            )
        )