Skip to content
Snippets Groups Projects
Commit 53ff563d authored by penenadpi's avatar penenadpi Committed by Anze Luzar
Browse files

HTML result summary with prioritization

Within this commit we're applying the following changes:
-HTML code generation in ResultsSummary
-Adding outcome prioritization
-Added results summary with colors
-Extended JSON info with additional fields
parent 4a5a4274
No related branches found
No related tags found
No related merge requests found
import requests
import sys
URL = "http://127.0.0.1:8000/scan"
multipart_form_data = {
"iac": ("hello-world.zip", open("hello-world.zip", "rb")),
"checks": (None, "git-leaks,tfsec,tflint,shellcheck"),
"iac": (sys.argv[1], open(sys.argv[1], "rb")),
"checks": (None, sys.argv[2]),
}
response = requests.post(URL, files=multipart_form_data)
print(response.json())
......
Generated HTML pages are stored here
JSON scan summaries are stored here
Individual scan tool logs are dumped here
......@@ -8,6 +8,7 @@ class Compatibility:
:param matrix: dictionary of available checks for given Iac type
"""
self.compatibility_matrix = matrix
self.scanned_files = dict()
def get_check_list(self, iac_type: str) -> list:
"""
......@@ -25,25 +26,64 @@ class Compatibility:
shell = False
py = False
yaml = False
java = False
html = False
types = list()
scanned_terraform = list()
scanned_shell = list()
scanned_py = list()
scanned_yaml = list()
scanned_java = list()
scanned_html = list()
try:
for filename in os.listdir(iac_directory):
f = os.path.join(iac_directory, filename)
if os.path.isfile(f):
if f.find(".tf") > -1 and (terraform is False):
if f.find(".tf") > -1:
# and (terraform is False):
types.append("terraform")
terraform = True
if f.find(".sh") > -1 and (shell is False):
scanned_terraform.append(filename)
if f.find(".sh") > -1:
# and (shell is False):
types.append("shell")
shell = True
if f.find(".py") > -1 and (py is False):
scanned_shell.append(filename)
if f.find(".py") > -1:
# and (py is False):
types.append("python")
py = True
if f.find(".yaml") > -1 and (yaml is False):
scanned_py.append(filename)
if f.find(".yaml") > -1:
# and (yaml is False):
types.append("yaml")
yaml = True
scanned_yaml.append(filename)
if f.find(".java") > -1:
# and (yaml is False):
types.append("java")
java = True
scanned_java.append(filename)
if f.find(".html") > -1:
# and (yaml is False):
types.append("html")
html = True
scanned_html.append(filename)
self.scanned_files["terraform"] = str(scanned_terraform)
self.scanned_files["python"] = str(scanned_py)
self.scanned_files["shell"] = str(scanned_shell)
self.scanned_files["yaml"] = str(scanned_yaml)
self.scanned_files["java"] = str(scanned_java)
self.scanned_files["html"] = str(scanned_html)
return types
except Exception as e:
raise Exception(f"Error when checking directory type: {str(e)}.")
......
......@@ -2,6 +2,9 @@ import os
import json
from iac_scan_runner.utils import write_html_to_file
class ResultsSummary:
def __init__(self):
"""
......@@ -15,49 +18,123 @@ class ResultsSummary:
Returns the list of available scanner check tools for given type of IaC archive
:return: list object conatining string names of checks
"""
return self.outcomes[check_name]
return self.outcomes[check_name]["status"]
def set_check_outcome(self, check_name: str, outcome: bool):
def set_check_outcome(self, check_name: str, outcome: str, file_list: str):
"""
Returns the list of available scanner check tools for given type of IaC archive
:return: list object conatining string names of checks
"""
outcomes[check_name] = outcome
self.outcomes[check] = {}
outcomes[check_name]["status"] = outcome
def summarize_outcome(self, check: str, outcome: str) -> bool:
def summarize_outcome(
self, check: str, outcome: str, scanned_files: dict, compatibility_matrix: dict
) -> str:
"""Summarize the check result to True/False depending on the return tool output
:param check: Name of the considered check of interest
:return: Whether the check passed (True) or failed (False)
"""
self.outcomes[check] = {}
self.outcomes[check]["log"] = outcome
file_list = ""
for t in compatibility_matrix:
if check in compatibility_matrix[t]:
file_list = str(scanned_files[t])
self.outcomes[check]["files"] = file_list
if check == "tfsec":
if outcome.find("No problems detected!") > -1:
self.outcomes[check] = True
return True
self.outcomes[check]["status"] = "Passed"
return "Passed"
else:
self.outcomes[check] = False
return False
self.outcomes[check]["status"] = "Problems"
return "Problems"
if check == "git-leaks":
if outcome.find("No leaks found") > -1:
self.outcomes[check] = True
return True
self.outcomes[check]["status"] = "Passed"
return "Passed"
else:
self.outcomes[check] = False
return False
self.outcomes[check]["status"] = "Problems"
return "Problems"
if check == "tflint":
if outcome == "":
self.outcomes[check] = True
return True
self.outcomes[check]["status"] = "Passed"
return "Passed"
else:
self.outcomes[check] = False
return False
self.outcomes[check]["status"] = "Problems"
return "Problems"
def summarize_no_files(self, check: str):
self.outcomes[check] = {}
self.outcomes[check]["status"] = "No files"
self.outcomes[check]["log"] = ""
self.outcomes[check]["files"] = ""
def show_outcomes(self):
print(self.outcomes)
def dump_outcomes(self, file_name: str):
file_path = "json_dumps/" + file_name + ".json"
file_path = "../outputs/json_dumps/" + file_name + ".json"
with open(file_path, "w") as fp:
json.dump(self.outcomes, fp)
def generate_html_prioritized(self, file_name: str):
html_page = "<!DOCTYPE html> <html> <style> table, th, td { border:1px solid black;}</style> <body> <h2>Scan results</h2> <table style='width:100%'> <tr> <th>Scan</th><th>Status</th><th>Files</th><th>Log</th> </tr>"
# parse scans
for scan in self.outcomes:
if self.outcomes[scan]["status"] == "Problems":
html_page = html_page + "<tr>"
html_page = html_page + "<td>" + scan + "</td>"
html_page = (
html_page
+ "<td bgcolor='red'>"
+ str(self.outcomes[scan]["status"])
+ "</td>"
)
html_page = html_page + "<td>" + self.outcomes[scan]["files"] + "</td>"
html_page = html_page + "<td>" + self.outcomes[scan]["log"] + "</td>"
html_page = html_page + "</tr>"
for scan in self.outcomes:
if self.outcomes[scan]["status"] == "Passed":
html_page = html_page + "<tr>"
html_page = html_page + "<td>" + scan + "</td>"
html_page = (
html_page
+ "<td bgcolor='green'>"
+ str(self.outcomes[scan]["status"])
+ "</td>"
)
html_page = html_page + "<td>" + self.outcomes[scan]["files"] + "</td>"
html_page = html_page + "<td>" + self.outcomes[scan]["log"] + "</td>"
html_page = html_page + "</tr>"
for scan in self.outcomes:
if self.outcomes[scan]["status"] == "No files":
html_page = html_page + "<tr>"
html_page = html_page + "<td>" + scan + "</td>"
html_page = (
html_page
+ "<td bgcolor='gray'>"
+ str(self.outcomes[scan]["status"])
+ "</td>"
)
html_page = html_page + "<td>" + self.outcomes[scan]["files"] + "</td>"
html_page = html_page + "<td>" + self.outcomes[scan]["log"] + "</td>"
html_page = html_page + "</tr>"
html_page = html_page + "</tr></table></body></html>"
write_html_to_file(file_name, html_page)
......@@ -78,12 +78,13 @@ class ScanRunner:
snyk = SnykCheck()
sonar_scanner = SonarScannerCheck()
# This matrix should be revised and extended, it is just a proof of concept here as for now
init_dict = {
"terraform": ["tfsec", "tflint", "terrascan", "git-leaks", "git-secrets"],
"yaml": ["git-leaks", "yamllint", "git-leaks", "git-secrets"],
"shell": ["shellcheck", "git-leaks", "git-secrets"],
"python": ["pylint", "bandit", "pyup-safety"],
"ansible": ["ansible-lint", "steanounk-scanner"],
"ansible": ["ansible-lint", "steampunk-scanner"],
"java": ["checkstyle"],
"js": ["es-lint"],
"html": ["htmlhint"],
......@@ -131,7 +132,6 @@ class ScanRunner:
iac_file_local.write(iac_file.file.read())
iac_file_local.close()
self.iac_dir = unpack_archive_to_dir(iac_filename_local, None)
# print(self.compatiblity.check_iac_type(self.iac_dir))
remove(iac_filename_local)
except Exception as e:
raise Exception(f"Error when initializing IaC directory: {str(e)}.")
......@@ -155,11 +155,12 @@ class ScanRunner:
dt = datetime.now()
ts = datetime.timestamp(dt)
dir_name = "scan_run_" + str(ts)
dir_name = "../outputs/logs/scan_run_" + str(ts)
os.mkdir(dir_name)
compatible_checks = self.checker.get_all_compatible_checks(self.iac_dir)
non_compatible_checks = list()
if scan_response_type == ScanResponseType.json:
scan_output = {}
......@@ -168,13 +169,12 @@ class ScanRunner:
if selected_checks:
for selected_check in selected_checks:
check = self.iac_checks[selected_check]
print("NEW CHECK")
print(check)
if check.enabled:
if selected_check in compatible_checks:
print("Selected:")
print(selected_check)
check_output = check.run(self.iac_dir)
print("compatible------")
if scan_response_type == ScanResponseType.json:
scan_output[selected_check] = check_output.to_dict()
......@@ -184,11 +184,30 @@ class ScanRunner:
write_string_to_file(
check.name, dir_name, scan_output[check.name]["output"]
)
self.results_summary.summarize_outcome(
selected_check, scan_output[check.name]["output"]
selected_check,
scan_output[check.name]["output"],
self.checker.scanned_files,
self.checker.compatibility_matrix,
)
self.results_summary.show_outcomes()
else:
non_compatible_checks.append(check.name)
write_string_to_file(check.name, dir_name, "No files to scan")
print("NO SCAN")
self.results_summary.summarize_no_files(check.name)
print(self.checker.scanned_files)
print("Non executed checks")
print(non_compatible_checks)
print(self.results_summary.show_outcomes())
self.results_summary.dump_outcomes(str(ts))
self.results_summary.generate_html_prioritized(str(ts))
else:
for iac_check in self.iac_checks.values():
......@@ -202,7 +221,7 @@ class ScanRunner:
)
write_string_to_file(
iac_check.name, dir_name, scan_output[iac_heck.name]["output"]
iac_check.name, dir_name, scan_output[iac_check.name]["output"]
)
return scan_output
......
......@@ -87,3 +87,15 @@ def write_string_to_file(check_name: str, dir_name: str, output_value: str):
file_name = dir_name + "/" + check_name + ".txt"
with open(file_name, "w") as text_file:
text_file.write(output_value)
def write_html_to_file(file_name: str, output_value: str):
"""
Writes string to given file inside specified directory
:param check_name: Name of the check
:param output_dir: Directory where log will be stored
:param output_value: Content written to given file
"""
file_name = "../outputs/generated_html/" + file_name + ".html"
with open(file_name, "w") as text_file:
text_file.write(output_value)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment