Skip to content
Snippets Groups Projects
Select Git revision
  • 22512136e115f9575b7c677a8e2cbca7f6574086
  • main default
2 results

api_server.py

Blame
  • api_server.py 13.19 KiB
    # SPDX-License-Identifier: Apache-2.0
    
    # api_server.py
    
    from fastapi import FastAPI, HTTPException, Depends, Request
    from fastapi.responses import PlainTextResponse, RedirectResponse
    #from fastapi.middleware.cors import CORSMiddleware
    from starlette import status
    from starlette.middleware.cors import CORSMiddleware
    from app_utils import xml_utils, editor_utils, security_utils, orchestrator_utils, logger, catalogue_utils
    import uvicorn
    from config import *
    import json
    from fastapi.security import OAuth2PasswordBearer
    import sys
    
    description = """
    DSL Mapper APIs for MEDINA Project.
    
    ## Description
    You can use this APIs to:
    - Map obligations into Rego Reules and push them to the MEDINA Orchestrator.
    
    """
    
    app = FastAPI(
        title="DSL Mapper",
        description=description,
        version="0.0.1",
        # terms_of_service="http://example.com/terms/",
        contact={
            "name": "Franz Berger",
            # "url": "http://x-force.example.com/contact/",
            "email": "franz.berger@fabasoft.com",
        }
        # license_info={
        #     "name": "Apache 2.0",
        #     "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
        # },
    )
    
    
    
    app.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        #allow_origin_regex="https?://(.*-(test|dev)\.k8s\.medina\.esilab\.org|localhost(:8080)?)",
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    
    
    data_df = None
    metric_df = None
    X = None
    X_train = None
    X_test = None
    recommender = None
    
    
    @app.on_event("startup")
    async def startup_event():
        
        #global recommender, data_df, metric_df, X, X_train, X_test
        #data_df = load_local_data()
        #X = load_local_features()
        #metric_mask = data_df["source_type"] == "MEDINA Metrics"
        #X_train = X[metric_mask]
        #metric_df = data_df[metric_mask]
        #metric_df.index = metric_df["ID"]
        #X_test = {k: x.reshape(1, -1) for k, x in zip(list(data_df[~metric_mask]["ID"]), X[~metric_mask])}
        #recommender = MetricRecommender(X_train, metric_df, X_test)
    
        pass
    
    @app.get("/livez")
    async def liveness_check() -> str:
        """
        Perform liveness check in k8s
        """
        return {"API_liveness": 'Live!'}
    
    @app.get("/readyz")
    async def readiness_check() -> str:
        """
        Perform readiness check in k8s
        """
        return {"API_readiness": 'Ready!'}
    
    
    
    
    @app.get("/ids", include_in_schema=False)
    async def get_ids(request:Request,access_info:dict = Depends(security_utils.access_token_is_valid)):
        #token = security_utils.get_access_token_from_request(request)
    
        #tODO: questo rigo non serve più, credo
        #access_info = security_utils.access_token_is_valid(token)
    
        print(access_info)
        return json.dumps(access_info)
    
        #return json.dumps(req.headers["Authorization"])
    
    """
        some_id = 517
        resp = catalogue_utils.get_tom_from_catalogue(some_id)
        json_resp = json.dumps(resp)
        print(type(json_resp))
        return json_resp
    """
    #@app.get("/ids")
    #async def get_ids():
    #    """
    #    Return the list of toms ids from the (hard-coded) catalogue
    #    """
    #    return list(data_df["ID"].astype("str"))
    
    
    @app.get("/recommend/{req_id}", include_in_schema=False)
    async def recommend(req_id: str, request:Request, access_info:dict = Depends(security_utils.access_token_is_valid)):
        """
        Given a tom id, it returns a set of recommended metrics as json (from hard-coded catalogue)
        """
        print('---ACCESS INFO ARE----\n')
        from pprint import pprint
        pprint(access_info)
        print('-----------------------\n')
        # return {"name":req_id}
        if req_id not in list(data_df["ID"].astype("str")):
            return {"req_id": req_id, "error": "not found"}
        result = recommender.get_recommended_metrics(req_id).to_json(orient="records")
        print(json.dumps(json.loads(result), indent=4))
        return json.loads(result)
    
    @app.get("/recommend_metrics/{req_id}", include_in_schema=False)
    async def recommend_metrics(req_id: str, request:Request, access_info:dict = Depends(security_utils.access_token_is_valid)):
        """
        Given a tom id, it returns a list with the ids of recommended metrics (from hard-coded catalogue)
        """
        if req_id not in list(data_df["ID"].astype("str")):
            return {"req_id": req_id, "error": "not found"}
        return list(recommender.get_recommended_metrics(req_id)["ID"])
    
    """
    @app.get("/recommend_metrics_from_catalogue/{tom_code}")
    async def get_recommended_metrics_from_catalogue(req_id: str, request:Request, access_info:dict = Depends(security_utils.access_token_is_valid)):
        
        #Given a tom code, it returns a list with the names of recommended metrics (from online catalogue)
        
        #TODO: check if tom code exists
        return list(recommender.get_recommended_metrics(req_id)["ID"])
    """
    
    
    @app.post("/map_obligations_to_rego/{reoid}", status_code=200)
    async def map_obl2rego(reoid:str, request:Request, access_info:dict = Depends(security_utils.access_token_is_valid)):
        """(WIP) Empty endpoint to test connection between Editor and Mapper
        """
        service_id = '00000000-0000-0000-000000000000'
        orchestrator_response = orchestrator_utils.map_reo(reoid, service_id)
    
        print('REO ID IS {0}'.format(reoid))
        logger.create_log(logger.INFO, request)
    
        if orchestrator_response == 'success':
            message = "REO with id {0} correctly mapped and sent to orchestrator".format(reoid)
            logger.create_log(logger.INFO, None, message)
            return {"status": True, "message": message}
        else:
            message = "Unable to map REO with id {0}. Error: {1}".format(reoid,orchestrator_response)
            logger.create_log(logger.ERROR, None, message)
            return {"status": False, "message": message}
    
        #return {'connection_status' : 'ok','reo_id' : reoid}
    
    @app.post("/create_reo_for_requirement/{username}", status_code=201, include_in_schema=False)
    async def get_reo_for_tom(username:str, tom_code:str, request:Request, access_info:dict = Depends(security_utils.access_token_is_valid)):
        """(WIP) Given a tom code:
        a) get tom information from the catalogue
        b) get a list of associated metrics (from catalogue) and recommended metrics (from Metric Recommender)
        c) traslate metrics into obligations (get obligations info from metrics)
        d) create a REO object with tom info and obligations info
        e) store the REO object into the CNL Store through the CNL Editor API
    
        Currently, given a tom code, the function return a reo id
            Args:
                tom_code (string): code of a TOM to be assessed
            Returns:
                status: return the reoid of the created object
    
        """
        USERNAME = username
        logger.create_log(logger.INFO, request)
    
        # a)------------------------------------------------
        # read toms from catalogue and get tom_id for tom_code
        selected_tom = None
        toms_list = catalogue_utils.get_all_toms_from_catalogue()
        for tom in toms_list:
            if tom['code'] == tom_code:
                selected_tom = tom
                break
    
        if selected_tom is None:
            #no tom found for that id
            detail = 'The requirement code/id you have specified has not been found in the catalogue'
            logger.create_log(logger.INFO, None, detail)
            raise HTTPException(status_code=404, detail=detail)
        # b)------------------------------------------------
        else:
            # print('Selected requirement with code: {0}\n'.format(selected_tom['code']))
            recommended_metric_ids = await recommend_metrics(tom_code,request)
            #TODO: get recommendations from catalogue
            #read metrics associated to a tom from catalogue
            associated_metrics = []
            recommended_metrics = []
            metrics_list = catalogue_utils.get_all_metrics_from_catalogue()
    
            for metric in metrics_list:
                #print('Evaluationg metric: {0}'.format(metric['name']))
                if metric['tom']['id'] == selected_tom['id']:
                    associated_metrics.append(metric)
                    #print('AAAAA: {0}'.format(metric['name']))
    
                elif metric['name']in recommended_metric_ids:
                    recommended_metrics.append(metric)
                    #print('RRRRR: {0}'.format(metric['name']))
    
            #put all metrics together (retrieved plus recommended) in a list of dicts
            metrics = associated_metrics + recommended_metrics
            print('Found {0} metrics for this requirement\n'.format(len(metrics)))
            #extract tom data
            tom_data = catalogue_utils.get_tom_data(selected_tom)
            #print(selected_tom['securityControl']['id'])
            tom_framework = catalogue_utils.get_security_control_framework(selected_tom['securityControl']['id'])
            tom_data['framework'] = tom_framework
            #print('Tom framework is {}'.format(tom_framework))
    
        #c)------------------------------------------------
            #extract from each metric obligation data (list of dict)
            # pass the metric_source depending if metric is 'associated' or 'recommended'
            obligation_data_list = []
            if(metrics):
                for metric in associated_metrics:
                    #creating multiple obligations if TargetResourceType is a List (managed inside get_obligation_data)
                    obligation_data_list.extend(catalogue_utils.get_obligation_data(metric,'catalogue'))
                for metric in recommended_metrics:
                    obligation_data_list.extend(catalogue_utils.get_obligation_data(metric,'recommender'))
    
        #d)------------------------------------------------
            #create REO in store
            #get username from token
            try:
                #USERNAME = access_info['preferred_username']
                #print('Username is: {0}'.format(access_info['preferred_username']))
                reoid = editor_utils.create_reo(USERNAME, REO_XML_FILE)
            except Exception as e:
                detail = 'The user you are using cannot access to the Editor'
                logger.create_log(logger.INFO, None, detail)
                raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=detail)
            #reoid = editor_utils.create_reo(USERNAME, REO_XML_FILE)
            message = 'Created empty reo in store with reoid: {0}'.format(reoid)
            logger.create_log(logger.INFO, None, message)
    
            #create reo dict from template
            #check: if no obligation update file with no obl
            print('Datum template path: {0}'.format(DATUM_TEMPLATE))
            if(obligation_data_list):
                reo_obj = xml_utils.create_reo(REO_SCHEMA_FILE, REO_XML_FILE)
                new_reo_obj = xml_utils.update_reo (reo_obj, tom_data, obligation_data_list, reoid)
            else: #no obligations
                reo_obj = xml_utils.create_reo(REO_SCHEMA_FILE, REO_XML_FILE_NO_OBL)
                new_reo_obj = xml_utils.update_reo (reo_obj, tom_data, obligation_data_list, reoid)
    
            message = 'Updated reo locally with reoid {0}'.format(new_reo_obj['@id'])
            logger.create_log(logger.INFO, None, message)
    
            with open(REO_UPDATING_FILE, 'w') as f:
                json.dump(new_reo_obj,f)
            #from pprint import pprint
            #pprint(new_reo_obj)
        #e)------------------------------------------------
            #update REO in store
            #convert reo dict to reo xml
    
            reo_schema = xml_utils.read_xml_schema(REO_SCHEMA_FILE)
            reoxml = xml_utils.dict2xml(new_reo_obj, reo_schema)
    
            #save reo xml within a temp file
            with open(REO_UPDATING_FILE, 'w') as f:
                f.write(reoxml)
            print('The environment before updating reo in store is {0}'.format(env))
            #update reo object with reo xml in the store
            #REO_UPDATING_FILE = reo_xml_obl = 'data/xml/reo_temp_with_obl.xml'
            update_status = editor_utils.update_reo(REO_UPDATING_FILE,reoid)
            print('Updated reo in store with status {0}\n'.format(update_status))
            #print(update_status)
    
            return {"reo_id" : reoid}
            #return new_reo_obj
    
    
    @app.get("/cnl_string/{m_id}", include_in_schema=False)
    async def get_cnl(m_id, request:Request,access_info:dict = Depends(security_utils.access_token_is_valid)):
        """
        Given the id of a metric, it returns the content of the metric as a string (from hard-coded catalogue)
        """
        metric = metric_df.loc[m_id]
        return '<strong>{metric["Target/Asset"]}</strong> must <strong>"{metric["Metric Name"]}"</strong> <strong>{metric["Target Value Datatype"]}</strong>(<strong>{metric["Operator"]}</strong>,<strong>{metric["Target Value"]}</strong>)'
    
    @app.get("/cnl/{m_id}", include_in_schema=False)
    async def get_cnl(m_id,request:Request,access_info:dict = Depends(security_utils.access_token_is_valid)):
        """
        Given the id of a metric, it returns the content of the metric as a json (from hard-coded catalogue)
        """
        metric = metric_df.loc[m_id] [["Target/Asset","Metric Name", "Target Value Datatype", "Operator", "Target Value"]]
        # metric["HTML"] = get_cnl(m_id)
        return json.loads(metric.to_json())
    
    
    
    @app.get("/get_all_toms/", response_class=PlainTextResponse, include_in_schema=False)
    async def get_all_toms(request:Request,access_info:dict = Depends(security_utils.access_token_is_valid)):
        return catalogue_utils.get_all_toms_from_catalogue()
    
    
    @app.get("/get_tom/{tom_id}", response_class=PlainTextResponse, include_in_schema=False)
    async def get_tom(tom_id:int,request:Request,access_info:dict = Depends(security_utils.access_token_is_valid)):
        return catalogue_utils.get_tom_from_catalogue(tom_id)
    
    if __name__ == "__main__":
        uvicorn.run("api_server:app", host="127.0.0.1", port = 8000)
    # api_server.py