diff --git a/src/iac_scan_runner/compatibility.py b/src/iac_scan_runner/compatibility.py index 2e82ccae6339e17d12bf1094c2081f44bb91fcf6..65158daf68b1b5a7b27fa010e9e14136569343d3 100644 --- a/src/iac_scan_runner/compatibility.py +++ b/src/iac_scan_runner/compatibility.py @@ -2,80 +2,76 @@ import os class Compatibility: - def __init__(self, matrix: dict): + # TODO: This matrix should be revised and extended, it is just a proof of concept here as for now + compatibility_matrix = { + "terraform": ["tfsec", "tflint", "terrascan", "git-leaks", "git-secrets"], + "yaml": ["git-leaks", "yamllint", "git-leaks", "git-secrets"], + "shell": ["shellcheck", "git-leaks", "git-secrets"], + "python": ["pylint", "bandit", "pyup-safety"], + "ansible": ["ansible-lint", "steampunk-scanner"], + "java": ["checkstyle"], + "js": ["es-lint"], + "html": ["htmlhint"], + "docker": ["hadolint"], + } + + def __init__(self): """ Initialize new IaC Compatibility matrix - :param matrix: dictionary of available checks for given Iac type + :param matrix: Dictionary of available checks for given IaC type """ - self.compatibility_matrix = matrix self.scanned_files = dict() def get_check_list(self, iac_type: str) -> list: """ Returns the list of available scanner check tools for given type of IaC archive - :return: list object conatining string names of checks + :iac_type: Type of IaC file for which we consider the list of compatible scans + :return: List with names of checks as strings """ return self.compatibility_matrix[iac_type] - def check_iac_type(self, iac_directory: str): - """Check the type of iac archive - :param iac_dircetory: Extracted iac archive path" - :return: Specific type of iac" + def check_iac_type(self, iac_directory: str) -> list: + """Check the type of IaC archive + :param iac_dircetory: Extracted IaC archive path + :return: List of specific file types within the given IaC directory """ - terraform = False - shell = False - py = False - yaml = False - java = False - html = False - types = list() + types = [] - scanned_terraform = list() - scanned_shell = list() - scanned_py = list() - scanned_yaml = list() - scanned_java = list() - scanned_html = list() + scanned_terraform = [] + scanned_shell = [] + scanned_py = [] + scanned_yaml = [] + scanned_java = [] + scanned_html = [] + # TODO: List of supported file types should be extended try: for filename in os.listdir(iac_directory): f = os.path.join(iac_directory, filename) if os.path.isfile(f): if f.find(".tf") > -1: - # and (terraform is False): types.append("terraform") - terraform = True scanned_terraform.append(filename) if f.find(".sh") > -1: - # and (shell is False): types.append("shell") - shell = True scanned_shell.append(filename) if f.find(".py") > -1: - # and (py is False): types.append("python") - py = True scanned_py.append(filename) if f.find(".yaml") > -1: - # and (yaml is False): types.append("yaml") - yaml = True scanned_yaml.append(filename) if f.find(".java") > -1: - # and (yaml is False): types.append("java") - java = True scanned_java.append(filename) if f.find(".html") > -1: - # and (yaml is False): types.append("html") - html = True scanned_html.append(filename) self.scanned_files["terraform"] = str(scanned_terraform) @@ -91,14 +87,14 @@ class Compatibility: def get_all_compatible_checks(self, iac_directory: str) -> list: """ Returns the list of available scanner check tools for given type of IaC archive - :return: list object conatining string names of checks + :param iac_dircetory: Extracted IaC archive path + :return: List with names of compatible checks as strings """ - checks_list = list() + checks_list = [] types_list = self.check_iac_type(iac_directory) for iac_type in types_list: type_checks = self.compatibility_matrix[iac_type] for check in type_checks: checks_list.append(check) - print(checks_list) return checks_list diff --git a/src/iac_scan_runner/results_summary.py b/src/iac_scan_runner/results_summary.py index ec91afcba56061d4e49cf92cf50885b0e4f379b5..c707446264700b053be7d4a4e3229e982dad28a9 100644 --- a/src/iac_scan_runner/results_summary.py +++ b/src/iac_scan_runner/results_summary.py @@ -1,15 +1,11 @@ -import os import json - - from iac_scan_runner.utils import write_html_to_file - class ResultsSummary: def __init__(self): """ Initialize new IaC Compatibility matrix - :param matrix: dictionary of available checks for given Iac type + :param matrix: dictionary of available checks for given IaC type """ self.outcomes = dict() @@ -22,12 +18,15 @@ class ResultsSummary: def set_check_outcome(self, check_name: str, outcome: str, file_list: str): """ - Returns the list of available scanner check tools for given type of IaC archive - :return: list object conatining string names of checks + Sets the outcome and list of scanned files for specific scanning tool + :param check_name: Name of a scan tool + :param outcome: Scan verdict - passed, failed or no files scanned + :param file_list: List of files that were scanned using the particular tool """ - self.outcomes[check] = {} + self.outcomes[check_name] = {} outcomes[check_name]["status"] = outcome - + outcomes[check_name]["files"] = outcome + def summarize_outcome( self, check: str, outcome: str, scanned_files: dict, compatibility_matrix: dict ) -> str: @@ -44,7 +43,8 @@ class ResultsSummary: file_list = str(scanned_files[t]) self.outcomes[check]["files"] = file_list - + + # TODO: This part should be extended to cover all relevant cases if check == "tfsec": if outcome.find("No problems detected!") > -1: self.outcomes[check]["status"] = "Passed" @@ -70,21 +70,36 @@ class ResultsSummary: return "Problems" def summarize_no_files(self, check: str): + """ + Sets the outcome of the selected check to "no files" case + :param check: Name of the considered check of interest + """ self.outcomes[check] = {} self.outcomes[check]["status"] = "No files" self.outcomes[check]["log"] = "" self.outcomes[check]["files"] = "" def show_outcomes(self): + """ + Prints out current summary of the performed checks containing the log and list of scanned files + """ print(self.outcomes) def dump_outcomes(self, file_name: str): + """ + Summarizes scan results into JSON file + :param file_name: Name of the generated JSON file containing the scan summary + """ file_path = "../outputs/json_dumps/" + file_name + ".json" with open(file_path, "w") as fp: json.dump(self.outcomes, fp) def generate_html_prioritized(self, file_name: str): + """ + Summarizes scan results into HTML file + :param file_name: Name of the generated HTML file containing the page summary + """ html_page = "<!DOCTYPE html> <html> <style> table, th, td { border:1px solid black;}</style> <body> <h2>Scan results</h2> <table style='width:100%'> <tr> <th>Scan</th><th>Status</th><th>Files</th><th>Log</th> </tr>" # parse scans for scan in self.outcomes: diff --git a/src/iac_scan_runner/scan_runner.py b/src/iac_scan_runner/scan_runner.py index d9b52d93f8694b0f2bdb809c2baa3d692ea74f8c..943fbbfbe9c3dd80c08e9d87e2be217dc1f09ecb 100644 --- a/src/iac_scan_runner/scan_runner.py +++ b/src/iac_scan_runner/scan_runner.py @@ -78,20 +78,7 @@ class ScanRunner: snyk = SnykCheck() sonar_scanner = SonarScannerCheck() - # This matrix should be revised and extended, it is just a proof of concept here as for now - init_dict = { - "terraform": ["tfsec", "tflint", "terrascan", "git-leaks", "git-secrets"], - "yaml": ["git-leaks", "yamllint", "git-leaks", "git-secrets"], - "shell": ["shellcheck", "git-leaks", "git-secrets"], - "python": ["pylint", "bandit", "pyup-safety"], - "ansible": ["ansible-lint", "steampunk-scanner"], - "java": ["checkstyle"], - "js": ["es-lint"], - "html": ["htmlhint"], - "docker": ["hadolint"], - } - - self.checker = Compatibility(init_dict) + self.checker = Compatibility() self.results_summary = ResultsSummary() self.iac_checks = { @@ -167,8 +154,6 @@ class ScanRunner: if selected_checks: for selected_check in selected_checks: check = self.iac_checks[selected_check] - print("NEW CHECK") - print(check) if check.enabled: if selected_check in compatible_checks: @@ -187,22 +172,16 @@ class ScanRunner: selected_check, scan_output[check.name]["output"], self.checker.scanned_files, - self.checker.compatibility_matrix, + Compatibility.compatibility_matrix, ) else: non_compatible_checks.append(check.name) write_string_to_file(check.name, dir_name, "No files to scan") - print("NO SCAN") - self.results_summary.summarize_no_files(check.name) - print(self.checker.scanned_files) - print("Non executed checks") - print(non_compatible_checks) - print(self.results_summary.show_outcomes()) self.results_summary.dump_outcomes(random_uuid) self.results_summary.generate_html_prioritized(random_uuid)