diff --git a/src/iac_scan_runner/test/test_scanner_simple.py b/examples/test_scanner_simple.py similarity index 63% rename from src/iac_scan_runner/test/test_scanner_simple.py rename to examples/test_scanner_simple.py index 3614bec2211ba53584a5f36a841db72b2873c55d..4f6ceb18e3c69c1213ee4b6f7ec4d5776d3b15fc 100644 --- a/src/iac_scan_runner/test/test_scanner_simple.py +++ b/examples/test_scanner_simple.py @@ -1,9 +1,11 @@ import requests +import sys URL = "http://127.0.0.1:8000/scan" + multipart_form_data = { - "iac": ("hello-world.zip", open("hello-world.zip", "rb")), - "checks": (None, "git-leaks,tfsec,tflint,shellcheck"), + "iac": (sys.argv[1], open(sys.argv[1], "rb")), + "checks": (None, sys.argv[2]), } response = requests.post(URL, files=multipart_form_data) print(response.json()) diff --git a/outputs/generated_html/README b/outputs/generated_html/README new file mode 100644 index 0000000000000000000000000000000000000000..781acfd5d6cea98d6109d74a75b12954d41cb160 --- /dev/null +++ b/outputs/generated_html/README @@ -0,0 +1 @@ +Generated HTML pages are stored here diff --git a/outputs/json_dumps/README b/outputs/json_dumps/README new file mode 100644 index 0000000000000000000000000000000000000000..25f79dd06bce810466f56e3a9ebff62684a68dc3 --- /dev/null +++ b/outputs/json_dumps/README @@ -0,0 +1 @@ +JSON scan summaries are stored here diff --git a/outputs/logs/README b/outputs/logs/README new file mode 100644 index 0000000000000000000000000000000000000000..b47e3eb053cf3a8dd86cb133efb77a6441732e55 --- /dev/null +++ b/outputs/logs/README @@ -0,0 +1 @@ +Individual scan tool logs are dumped here diff --git a/src/iac_scan_runner/compatibility.py b/src/iac_scan_runner/compatibility.py index e2cc2f7bc60616427d6f207910db60a9bd75bfa7..2e82ccae6339e17d12bf1094c2081f44bb91fcf6 100644 --- a/src/iac_scan_runner/compatibility.py +++ b/src/iac_scan_runner/compatibility.py @@ -8,6 +8,7 @@ class Compatibility: :param matrix: dictionary of available checks for given Iac type """ self.compatibility_matrix = matrix + self.scanned_files = dict() def get_check_list(self, iac_type: str) -> list: """ @@ -25,25 +26,64 @@ class Compatibility: shell = False py = False yaml = False + java = False + html = False types = list() + + scanned_terraform = list() + scanned_shell = list() + scanned_py = list() + scanned_yaml = list() + scanned_java = list() + scanned_html = list() + try: for filename in os.listdir(iac_directory): f = os.path.join(iac_directory, filename) if os.path.isfile(f): - if f.find(".tf") > -1 and (terraform is False): + if f.find(".tf") > -1: + # and (terraform is False): types.append("terraform") terraform = True - if f.find(".sh") > -1 and (shell is False): + scanned_terraform.append(filename) + + if f.find(".sh") > -1: + # and (shell is False): types.append("shell") shell = True - if f.find(".py") > -1 and (py is False): + scanned_shell.append(filename) + + if f.find(".py") > -1: + # and (py is False): types.append("python") py = True - if f.find(".yaml") > -1 and (yaml is False): + scanned_py.append(filename) + + if f.find(".yaml") > -1: + # and (yaml is False): types.append("yaml") yaml = True + scanned_yaml.append(filename) + + if f.find(".java") > -1: + # and (yaml is False): + types.append("java") + java = True + scanned_java.append(filename) + + if f.find(".html") > -1: + # and (yaml is False): + types.append("html") + html = True + scanned_html.append(filename) + self.scanned_files["terraform"] = str(scanned_terraform) + self.scanned_files["python"] = str(scanned_py) + self.scanned_files["shell"] = str(scanned_shell) + self.scanned_files["yaml"] = str(scanned_yaml) + self.scanned_files["java"] = str(scanned_java) + self.scanned_files["html"] = str(scanned_html) return types except Exception as e: raise Exception(f"Error when checking directory type: {str(e)}.") diff --git a/src/iac_scan_runner/results_summary.py b/src/iac_scan_runner/results_summary.py index d13657253caf0012f3cead0ae5167422a88cccdd..ec91afcba56061d4e49cf92cf50885b0e4f379b5 100644 --- a/src/iac_scan_runner/results_summary.py +++ b/src/iac_scan_runner/results_summary.py @@ -2,6 +2,9 @@ import os import json +from iac_scan_runner.utils import write_html_to_file + + class ResultsSummary: def __init__(self): """ @@ -15,49 +18,123 @@ class ResultsSummary: Returns the list of available scanner check tools for given type of IaC archive :return: list object conatining string names of checks """ - return self.outcomes[check_name] + return self.outcomes[check_name]["status"] - def set_check_outcome(self, check_name: str, outcome: bool): + def set_check_outcome(self, check_name: str, outcome: str, file_list: str): """ Returns the list of available scanner check tools for given type of IaC archive :return: list object conatining string names of checks """ - outcomes[check_name] = outcome + self.outcomes[check] = {} + outcomes[check_name]["status"] = outcome - def summarize_outcome(self, check: str, outcome: str) -> bool: + def summarize_outcome( + self, check: str, outcome: str, scanned_files: dict, compatibility_matrix: dict + ) -> str: """Summarize the check result to True/False depending on the return tool output :param check: Name of the considered check of interest :return: Whether the check passed (True) or failed (False) """ + self.outcomes[check] = {} + self.outcomes[check]["log"] = outcome + + file_list = "" + for t in compatibility_matrix: + if check in compatibility_matrix[t]: + file_list = str(scanned_files[t]) + + self.outcomes[check]["files"] = file_list + if check == "tfsec": if outcome.find("No problems detected!") > -1: - self.outcomes[check] = True - return True + self.outcomes[check]["status"] = "Passed" + return "Passed" else: - self.outcomes[check] = False - return False + self.outcomes[check]["status"] = "Problems" + return "Problems" if check == "git-leaks": if outcome.find("No leaks found") > -1: - self.outcomes[check] = True - return True + self.outcomes[check]["status"] = "Passed" + return "Passed" else: - self.outcomes[check] = False - return False + self.outcomes[check]["status"] = "Problems" + return "Problems" if check == "tflint": if outcome == "": - self.outcomes[check] = True - return True + self.outcomes[check]["status"] = "Passed" + return "Passed" else: - self.outcomes[check] = False - return False + self.outcomes[check]["status"] = "Problems" + return "Problems" + + def summarize_no_files(self, check: str): + self.outcomes[check] = {} + self.outcomes[check]["status"] = "No files" + self.outcomes[check]["log"] = "" + self.outcomes[check]["files"] = "" def show_outcomes(self): print(self.outcomes) def dump_outcomes(self, file_name: str): - file_path = "json_dumps/" + file_name + ".json" + file_path = "../outputs/json_dumps/" + file_name + ".json" with open(file_path, "w") as fp: json.dump(self.outcomes, fp) + + def generate_html_prioritized(self, file_name: str): + html_page = "<!DOCTYPE html> <html> <style> table, th, td { border:1px solid black;}</style> <body> <h2>Scan results</h2> <table style='width:100%'> <tr> <th>Scan</th><th>Status</th><th>Files</th><th>Log</th> </tr>" + # parse scans + for scan in self.outcomes: + + if self.outcomes[scan]["status"] == "Problems": + + html_page = html_page + "<tr>" + html_page = html_page + "<td>" + scan + "</td>" + html_page = ( + html_page + + "<td bgcolor='red'>" + + str(self.outcomes[scan]["status"]) + + "</td>" + ) + + html_page = html_page + "<td>" + self.outcomes[scan]["files"] + "</td>" + html_page = html_page + "<td>" + self.outcomes[scan]["log"] + "</td>" + html_page = html_page + "</tr>" + + for scan in self.outcomes: + + if self.outcomes[scan]["status"] == "Passed": + html_page = html_page + "<tr>" + html_page = html_page + "<td>" + scan + "</td>" + html_page = ( + html_page + + "<td bgcolor='green'>" + + str(self.outcomes[scan]["status"]) + + "</td>" + ) + + html_page = html_page + "<td>" + self.outcomes[scan]["files"] + "</td>" + html_page = html_page + "<td>" + self.outcomes[scan]["log"] + "</td>" + html_page = html_page + "</tr>" + + for scan in self.outcomes: + + if self.outcomes[scan]["status"] == "No files": + html_page = html_page + "<tr>" + html_page = html_page + "<td>" + scan + "</td>" + html_page = ( + html_page + + "<td bgcolor='gray'>" + + str(self.outcomes[scan]["status"]) + + "</td>" + ) + html_page = html_page + "<td>" + self.outcomes[scan]["files"] + "</td>" + html_page = html_page + "<td>" + self.outcomes[scan]["log"] + "</td>" + html_page = html_page + "</tr>" + + html_page = html_page + "</tr></table></body></html>" + + write_html_to_file(file_name, html_page) diff --git a/src/iac_scan_runner/scan_runner.py b/src/iac_scan_runner/scan_runner.py index 0400571665a6d40e6efd76728a59dc55962b5c49..72659afc388be6fdaf539360de815b4dc5cb3ff2 100644 --- a/src/iac_scan_runner/scan_runner.py +++ b/src/iac_scan_runner/scan_runner.py @@ -78,12 +78,13 @@ class ScanRunner: snyk = SnykCheck() sonar_scanner = SonarScannerCheck() + # This matrix should be revised and extended, it is just a proof of concept here as for now init_dict = { "terraform": ["tfsec", "tflint", "terrascan", "git-leaks", "git-secrets"], "yaml": ["git-leaks", "yamllint", "git-leaks", "git-secrets"], "shell": ["shellcheck", "git-leaks", "git-secrets"], "python": ["pylint", "bandit", "pyup-safety"], - "ansible": ["ansible-lint", "steanounk-scanner"], + "ansible": ["ansible-lint", "steampunk-scanner"], "java": ["checkstyle"], "js": ["es-lint"], "html": ["htmlhint"], @@ -131,7 +132,6 @@ class ScanRunner: iac_file_local.write(iac_file.file.read()) iac_file_local.close() self.iac_dir = unpack_archive_to_dir(iac_filename_local, None) - # print(self.compatiblity.check_iac_type(self.iac_dir)) remove(iac_filename_local) except Exception as e: raise Exception(f"Error when initializing IaC directory: {str(e)}.") @@ -155,11 +155,12 @@ class ScanRunner: dt = datetime.now() ts = datetime.timestamp(dt) - dir_name = "scan_run_" + str(ts) + dir_name = "../outputs/logs/scan_run_" + str(ts) os.mkdir(dir_name) compatible_checks = self.checker.get_all_compatible_checks(self.iac_dir) + non_compatible_checks = list() if scan_response_type == ScanResponseType.json: scan_output = {} @@ -168,13 +169,12 @@ class ScanRunner: if selected_checks: for selected_check in selected_checks: check = self.iac_checks[selected_check] - + print("NEW CHECK") + print(check) if check.enabled: if selected_check in compatible_checks: - print("Selected:") - print(selected_check) + check_output = check.run(self.iac_dir) - print("compatible------") if scan_response_type == ScanResponseType.json: scan_output[selected_check] = check_output.to_dict() @@ -184,11 +184,30 @@ class ScanRunner: write_string_to_file( check.name, dir_name, scan_output[check.name]["output"] ) + self.results_summary.summarize_outcome( - selected_check, scan_output[check.name]["output"] + selected_check, + scan_output[check.name]["output"], + self.checker.scanned_files, + self.checker.compatibility_matrix, ) - self.results_summary.show_outcomes() - self.results_summary.dump_outcomes(str(ts)) + else: + non_compatible_checks.append(check.name) + + write_string_to_file(check.name, dir_name, "No files to scan") + + print("NO SCAN") + + self.results_summary.summarize_no_files(check.name) + + print(self.checker.scanned_files) + print("Non executed checks") + print(non_compatible_checks) + + print(self.results_summary.show_outcomes()) + self.results_summary.dump_outcomes(str(ts)) + self.results_summary.generate_html_prioritized(str(ts)) + else: for iac_check in self.iac_checks.values(): @@ -202,7 +221,7 @@ class ScanRunner: ) write_string_to_file( - iac_check.name, dir_name, scan_output[iac_heck.name]["output"] + iac_check.name, dir_name, scan_output[iac_check.name]["output"] ) return scan_output diff --git a/src/iac_scan_runner/utils.py b/src/iac_scan_runner/utils.py index 865a34a5c6bb31de3ea7b48e80895261792947e4..3b993c0647948f5160e0c1b4aff2598ab920a9b8 100644 --- a/src/iac_scan_runner/utils.py +++ b/src/iac_scan_runner/utils.py @@ -87,3 +87,15 @@ def write_string_to_file(check_name: str, dir_name: str, output_value: str): file_name = dir_name + "/" + check_name + ".txt" with open(file_name, "w") as text_file: text_file.write(output_value) + +def write_html_to_file(file_name: str, output_value: str): + """ + Writes string to given file inside specified directory + :param check_name: Name of the check + :param output_dir: Directory where log will be stored + :param output_value: Content written to given file + """ + file_name = "../outputs/generated_html/" + file_name + ".html" + with open(file_name, "w") as text_file: + text_file.write(output_value) +