Select Git revision
api_server.py
api_server.py 13.19 KiB
# SPDX-License-Identifier: Apache-2.0
# api_server.py
from fastapi import FastAPI, HTTPException, Depends, Request
from fastapi.responses import PlainTextResponse, RedirectResponse
#from fastapi.middleware.cors import CORSMiddleware
from starlette import status
from starlette.middleware.cors import CORSMiddleware
from app_utils import xml_utils, editor_utils, security_utils, orchestrator_utils, logger, catalogue_utils
import uvicorn
from config import *
import json
from fastapi.security import OAuth2PasswordBearer
import sys
# Text handed to FastAPI as the description of the API.
description = """
DSL Mapper APIs for MEDINA Project.
## Description
You can use this APIs to:
- Map obligations into Rego Reules and push them to the MEDINA Orchestrator.
"""

# Application object; every route in this module is registered on it.
app = FastAPI(
    version="0.0.1",
    title="DSL Mapper",
    contact={
        "name": "Franz Berger",
        # "url": "http://x-force.example.com/contact/",
        "email": "franz.berger@fabasoft.com",
    },
    description=description,
    # Optional metadata that is currently disabled:
    # terms_of_service="http://example.com/terms/",
    # license_info={
    #     "name": "Apache 2.0",
    #     "url": "https://www.apache.org/licenses/LICENSE-2.0.html",
    # },
)
# Register CORS handling on the application.
# NOTE(review): all origins, methods and headers are accepted while
# credentials are enabled -- confirm this is intended outside development.
# A stricter origin pattern is kept below (disabled) for reference.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_origins=["*"],
    # allow_origin_regex="https?://(.*-(test|dev)\.k8s\.medina\.esilab\.org|localhost(:8080)?)",
)
# Module-level state used by the recommender routes; everything starts as
# None and is only read (never assigned) by the code in this module.
data_df = metric_df = X = X_train = X_test = recommender = None
@app.on_event("startup")
async def startup_event():
    """
    Startup hook of the application; it currently performs no work.

    The code that used to load the local data and build the metric
    recommender is disabled and kept below for reference only.
    """
    # global recommender, data_df, metric_df, X, X_train, X_test
    # data_df = load_local_data()
    # X = load_local_features()
    # metric_mask = data_df["source_type"] == "MEDINA Metrics"
    # X_train = X[metric_mask]
    # metric_df = data_df[metric_mask]
    # metric_df.index = metric_df["ID"]
    # X_test = {k: x.reshape(1, -1) for k, x in zip(list(data_df[~metric_mask]["ID"]), X[~metric_mask])}
    # recommender = MetricRecommender(X_train, metric_df, X_test)
    return None
@app.get("/livez")
async def liveness_check() -> dict:
    """
    Perform liveness check in k8s

    Returns:
        dict: the constant payload ``{"API_liveness": "Live!"}``.
    """
    # The annotation matches the value actually returned (a dict, not a str),
    # so a framework that validates responses against it accepts the payload.
    return {"API_liveness": 'Live!'}
@app.get("/readyz")
async def readiness_check() -> dict:
    """
    Perform readiness check in k8s

    Returns:
        dict: the constant payload ``{"API_readiness": "Ready!"}``.
    """
    # The annotation matches the value actually returned (a dict, not a str),
    # so a framework that validates responses against it accepts the payload.
    return {"API_readiness": 'Ready!'}
@app.get("/ids", include_in_schema=False)
async def get_ids(
    request: Request,
    access_info: dict = Depends(security_utils.access_token_is_valid),
):
    """
    Echo the access information produced by the token dependency.

    The information is printed to stdout and then returned as a JSON-encoded
    string.
    """
    # token = security_utils.get_access_token_from_request(request)
    # TODO: this line is probably no longer needed
    # access_info = security_utils.access_token_is_valid(token)
    print(access_info)
    encoded_info = json.dumps(access_info)
    return encoded_info
    # Disabled alternatives kept for reference:
    # return json.dumps(req.headers["Authorization"])
    #
    # some_id = 517
    # resp = catalogue_utils.get_tom_from_catalogue(some_id)
    # json_resp = json.dumps(resp)
    # print(type(json_resp))
    # return json_resp
#@app.get("/ids")
#async def get_ids():
# """
# Return the list of toms ids from the (hard-coded) catalogue
# """
# return list(data_df["ID"].astype("str"))
@app.get("/recommend/{req_id}", include_in_schema=False)
async def recommend(
    req_id: str,
    request: Request,
    access_info: dict = Depends(security_utils.access_token_is_valid),
):
    """
    Given a tom id, it returns a set of recommended metrics as json (from hard-coded catalogue)

    Args:
        req_id: id of the tom, compared against the string form of the "ID" column.
        request: incoming request (not read here).
        access_info: result of the access-token dependency; only printed.

    Returns:
        The recommended metrics as parsed JSON records, or
        ``{"req_id": req_id, "error": "not found"}`` when the id is unknown.

    Raises:
        HTTPException: 503 when the module-level recommender data is not loaded.
    """
    from pprint import pprint

    # NOTE(review): this dumps the access information to stdout -- confirm
    # that it is acceptable for it to end up in the service output.
    print('---ACCESS INFO ARE----\n')
    pprint(access_info)
    print('-----------------------\n')

    # The module-level data starts as None; answer with an explicit status
    # instead of failing on a None subscript.
    if data_df is None or recommender is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Metric recommender data is not loaded",
        )

    if req_id not in list(data_df["ID"].astype("str")):
        return {"req_id": req_id, "error": "not found"}

    result = recommender.get_recommended_metrics(req_id).to_json(orient="records")
    records = json.loads(result)  # parsed once, used for both print and return
    print(json.dumps(records, indent=4))
    return records
@app.get("/recommend_metrics/{req_id}", include_in_schema=False)
async def recommend_metrics(
    req_id: str,
    request: Request,
    access_info: dict = Depends(security_utils.access_token_is_valid),
):
    """
    Given a tom id, it returns a list with the ids of recommended metrics (from hard-coded catalogue)

    Args:
        req_id: id of the tom, compared against the string form of the "ID" column.
        request: incoming request (not read here).
        access_info: result of the access-token dependency (not read here).

    Returns:
        A list with the "ID" values of the recommended metrics, or
        ``{"req_id": req_id, "error": "not found"}`` when the id is unknown.

    Raises:
        HTTPException: 503 when the module-level recommender data is not loaded.
    """
    # The module-level data starts as None; answer with an explicit status
    # instead of failing on a None subscript.
    if data_df is None or recommender is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Metric recommender data is not loaded",
        )

    if req_id not in list(data_df["ID"].astype("str")):
        return {"req_id": req_id, "error": "not found"}
    return list(recommender.get_recommended_metrics(req_id)["ID"])
"""
@app.get("/recommend_metrics_from_catalogue/{tom_code}")
async def get_recommended_metrics_from_catalogue(req_id: str, request:Request, access_info:dict = Depends(security_utils.access_token_is_valid)):
#Given a tom code, it returns a list with the names of recommended metrics (from online catalogue)
#TODO: check if tom code exists
return list(recommender.get_recommended_metrics(req_id)["ID"])
"""
@app.post("/map_obligations_to_rego/{reoid}", status_code=200)
async def map_obl2rego(
    reoid: str,
    request: Request,
    access_info: dict = Depends(security_utils.access_token_is_valid),
):
    """(WIP) Ask the orchestrator utilities to map the REO identified by ``reoid``.

    Returns a dict with a boolean ``status`` and a ``message``; ``status`` is
    True only when the orchestrator utilities answer ``'success'``.
    """
    # NOTE(review): fixed placeholder service id; it has four dash-separated
    # groups rather than the usual five of a UUID -- confirm it is intended.
    service_id = '00000000-0000-0000-000000000000'
    orchestrator_response = orchestrator_utils.map_reo(reoid, service_id)
    print('REO ID IS {0}'.format(reoid))
    logger.create_log(logger.INFO, request)

    mapped = orchestrator_response == 'success'
    if not mapped:
        failure = "Unable to map REO with id {0}. Error: {1}".format(reoid, orchestrator_response)
        logger.create_log(logger.ERROR, None, failure)
        return {"status": False, "message": failure}

    confirmation = "REO with id {0} correctly mapped and sent to orchestrator".format(reoid)
    logger.create_log(logger.INFO, None, confirmation)
    return {"status": True, "message": confirmation}
    # return {'connection_status' : 'ok','reo_id' : reoid}
@app.post("/create_reo_for_requirement/{username}", status_code=201, include_in_schema=False)
async def get_reo_for_tom(
    username: str,
    tom_code: str,
    request: Request,
    access_info: dict = Depends(security_utils.access_token_is_valid),
):
    """(WIP) Create a REO for the requirement (TOM) identified by ``tom_code``.

    Steps:
    a) look the TOM up in the catalogue by its code
    b) collect the metrics associated with the TOM in the catalogue plus the
       ones returned by ``recommend_metrics``
    c) translate the metrics into obligation data
    d) create an empty REO in the store through the editor utilities and fill
       a local REO object with the TOM and obligation data
    e) convert the REO to XML, write it to ``REO_UPDATING_FILE`` and update
       the REO in the store

    Args:
        username (string): user passed to the editor when creating the REO
        tom_code (string): code of a TOM to be assessed
        request (Request): incoming request, used for logging and forwarded
            to ``recommend_metrics``
        access_info (dict): result of the access-token dependency (not read here)

    Returns:
        dict: ``{"reo_id": <id of the created REO>}``

    Raises:
        HTTPException: 404 when no TOM with ``tom_code`` is in the catalogue;
            401 when the empty REO cannot be created through the editor.
    """
    logger.create_log(logger.INFO, request)

    # a)------------------------------------------------
    # first catalogue entry whose code matches, or None
    selected_tom = next(
        (tom for tom in catalogue_utils.get_all_toms_from_catalogue() if tom['code'] == tom_code),
        None,
    )
    if selected_tom is None:
        detail = 'The requirement code/id you have specified has not been found in the catalogue'
        logger.create_log(logger.INFO, None, detail)
        raise HTTPException(status_code=404, detail=detail)

    # b)------------------------------------------------
    recommended_metric_ids = await recommend_metrics(tom_code, request)
    if isinstance(recommended_metric_ids, dict):
        # recommend_metrics answers with an error dict for an unknown id;
        # that means "no recommendations", not a collection of metric names.
        recommended_metric_ids = []
    # TODO: get recommendations from catalogue

    # split the catalogue metrics into the ones tied to the TOM and the
    # recommended ones (a metric tied to the TOM is never counted twice)
    associated_metrics = []
    recommended_metrics = []
    for metric in catalogue_utils.get_all_metrics_from_catalogue():
        if metric['tom']['id'] == selected_tom['id']:
            associated_metrics.append(metric)
        elif metric['name'] in recommended_metric_ids:
            recommended_metrics.append(metric)
    metrics_count = len(associated_metrics) + len(recommended_metrics)
    print('Found {0} metrics for this requirement\n'.format(metrics_count))

    # extract tom data
    tom_data = catalogue_utils.get_tom_data(selected_tom)
    tom_data['framework'] = catalogue_utils.get_security_control_framework(
        selected_tom['securityControl']['id']
    )

    # c)------------------------------------------------
    # get_obligation_data returns a list per metric; the second argument
    # records whether the metric is 'associated' or 'recommended'
    obligation_data_list = []
    for metric in associated_metrics:
        obligation_data_list.extend(catalogue_utils.get_obligation_data(metric, 'catalogue'))
    for metric in recommended_metrics:
        obligation_data_list.extend(catalogue_utils.get_obligation_data(metric, 'recommender'))

    # d)------------------------------------------------
    # create REO in store
    try:
        reoid = editor_utils.create_reo(username, REO_XML_FILE)
    except Exception as e:
        detail = 'The user you are using cannot access to the Editor'
        logger.create_log(logger.INFO, None, detail)
        # keep the underlying failure visible instead of discarding it
        logger.create_log(logger.ERROR, None, 'Editor error while creating reo: {0!r}'.format(e))
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail=detail) from e
    message = 'Created empty reo in store with reoid: {0}'.format(reoid)
    logger.create_log(logger.INFO, None, message)

    # create reo dict from template; a dedicated template is used when there
    # are no obligations
    print('Datum template path: {0}'.format(DATUM_TEMPLATE))
    template_file = REO_XML_FILE if obligation_data_list else REO_XML_FILE_NO_OBL
    reo_obj = xml_utils.create_reo(REO_SCHEMA_FILE, template_file)
    new_reo_obj = xml_utils.update_reo(reo_obj, tom_data, obligation_data_list, reoid)
    message = 'Updated reo locally with reoid {0}'.format(new_reo_obj['@id'])
    logger.create_log(logger.INFO, None, message)

    # e)------------------------------------------------
    # convert reo dict to reo xml
    reo_schema = xml_utils.read_xml_schema(REO_SCHEMA_FILE)
    reoxml = xml_utils.dict2xml(new_reo_obj, reo_schema)
    # save reo xml within a temp file
    # NOTE(review): the same path is used by every request -- confirm that
    # concurrent calls cannot overwrite each other's file.
    with open(REO_UPDATING_FILE, 'w') as f:
        f.write(reoxml)
    print('The environment before updating reo in store is {0}'.format(env))

    # update reo object with reo xml in the store
    update_status = editor_utils.update_reo(REO_UPDATING_FILE, reoid)
    print('Updated reo in store with status {0}\n'.format(update_status))
    return {"reo_id": reoid}
@app.get("/cnl_string/{m_id}", include_in_schema=False)
async def get_cnl(
    m_id,
    request: Request,
    access_info: dict = Depends(security_utils.access_token_is_valid),
):
    """
    Given the id of a metric, it returns the content of the metric as a string (from hard-coded catalogue)

    Args:
        m_id: label of the metric row in the module-level ``metric_df``.
        request: incoming request (not read here).
        access_info: result of the access-token dependency (not read here).

    Returns:
        str: an HTML snippet built from the "Target/Asset", "Metric Name",
        "Target Value Datatype", "Operator" and "Target Value" fields.

    Raises:
        HTTPException: 503 when ``metric_df`` is not loaded, 404 when ``m_id``
            is not a label of ``metric_df``.
    """
    # The module-level data starts as None; answer with an explicit status
    # instead of failing on a None attribute access.
    if metric_df is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Metric data is not loaded",
        )
    try:
        metric = metric_df.loc[m_id]
    except KeyError as err:
        raise HTTPException(status_code=404, detail="Metric not found") from err

    # f-strings so that the placeholders are filled with the metric values
    return (
        f'<strong>{metric["Target/Asset"]}</strong> must '
        f'<strong>"{metric["Metric Name"]}"</strong> '
        f'<strong>{metric["Target Value Datatype"]}</strong>'
        f'(<strong>{metric["Operator"]}</strong>,'
        f'<strong>{metric["Target Value"]}</strong>)'
    )
@app.get("/cnl/{m_id}", include_in_schema=False)
async def get_cnl(
    m_id,
    request: Request,
    access_info: dict = Depends(security_utils.access_token_is_valid),
):
    """
    Given the id of a metric, it returns the content of the metric as a json (from hard-coded catalogue)

    Args:
        m_id: label of the metric row in the module-level ``metric_df``.
        request: incoming request (not read here).
        access_info: result of the access-token dependency (not read here).

    Returns:
        The "Target/Asset", "Metric Name", "Target Value Datatype", "Operator"
        and "Target Value" fields of the metric, as parsed JSON.

    Raises:
        HTTPException: 503 when ``metric_df`` is not loaded, 404 when ``m_id``
            is not a label of ``metric_df``.
    """
    # The module-level data starts as None; answer with an explicit status
    # instead of failing on a None attribute access.
    if metric_df is None:
        raise HTTPException(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            detail="Metric data is not loaded",
        )
    try:
        row = metric_df.loc[m_id]
    except KeyError as err:
        raise HTTPException(status_code=404, detail="Metric not found") from err

    metric = row[["Target/Asset", "Metric Name", "Target Value Datatype", "Operator", "Target Value"]]
    # metric["HTML"] = get_cnl(m_id)
    return json.loads(metric.to_json())
@app.get("/get_all_toms/", response_class=PlainTextResponse, include_in_schema=False)
async def get_all_toms(
    request: Request,
    access_info: dict = Depends(security_utils.access_token_is_valid),
):
    """
    Return all the toms of the catalogue as plain text.

    Returns:
        str: the value of ``catalogue_utils.get_all_toms_from_catalogue()``;
        when it is not already text it is serialised to a JSON string, because
        the route is declared with a plain-text response.
    """
    toms = catalogue_utils.get_all_toms_from_catalogue()
    if isinstance(toms, (str, bytes)):
        return toms
    return json.dumps(toms)
@app.get("/get_tom/{tom_id}", response_class=PlainTextResponse, include_in_schema=False)
async def get_tom(
    tom_id: int,
    request: Request,
    access_info: dict = Depends(security_utils.access_token_is_valid),
):
    """
    Return the tom with the given id from the catalogue as plain text.

    Args:
        tom_id: id handed to ``catalogue_utils.get_tom_from_catalogue``.
        request: incoming request (not read here).
        access_info: result of the access-token dependency (not read here).

    Returns:
        str: the value of ``catalogue_utils.get_tom_from_catalogue(tom_id)``;
        when it is not already text it is serialised to a JSON string, because
        the route is declared with a plain-text response.
    """
    tom = catalogue_utils.get_tom_from_catalogue(tom_id)
    if isinstance(tom, (str, bytes)):
        return tom
    return json.dumps(tom)
if __name__ == "__main__":
    # Script entry point: serve the app of this module with uvicorn on the
    # loopback interface.
    uvicorn.run(
        "api_server:app",
        port=8000,
        host="127.0.0.1",
    )
# api_server.py