From 4022c7af97f562a600adc657ec40d0e3f7946897 Mon Sep 17 00:00:00 2001 From: penenadpi <penenadpi@gmail.com> Date: Fri, 12 Aug 2022 05:01:11 -0400 Subject: [PATCH] Auto output directory, refactoring, TODOs, try-catch Within this commit we're applying the following changes: -HTML code formatting and additional docstring comments -Automatic outputs directory creation fix -Resolving formatting issues and TODOs -Try-catch blocks for file writing -Adding TODOs --- install-checks.sh | 8 +++ outputs/generated_html/README | 1 - outputs/json_dumps/README | 1 - outputs/logs/README | 1 - src/iac_scan_runner/compatibility.py | 11 ++-- src/iac_scan_runner/results_summary.py | 47 ++++++++-------- src/iac_scan_runner/scan_runner.py | 78 +++++++------------------- src/iac_scan_runner/utils.py | 26 ++++----- 8 files changed, 69 insertions(+), 104 deletions(-) delete mode 100644 outputs/generated_html/README delete mode 100644 outputs/json_dumps/README delete mode 100644 outputs/logs/README diff --git a/install-checks.sh b/install-checks.sh index 843cc1d..9806812 100755 --- a/install-checks.sh +++ b/install-checks.sh @@ -5,6 +5,10 @@ export ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" export VIRTUALENV_DIR="${ROOT_DIR}/.venv" export TOOLS_DIR="${ROOT_DIR}/tools" +export OUT_DIR="${ROOT_DIR}/outputs" +export HTML_DIR="${OUT_DIR}/generated_html" +export JSON_DIR="${OUT_DIR}/json_dumps" +export LOG_DIR="${OUT_DIR}/logs" export TMP_DIR="${TOOLS_DIR}/tmp" export NODE_MODULES_DIR="${ROOT_DIR}/node_modules" export CONFIG_DIR="${ROOT_DIR}/config" @@ -153,6 +157,10 @@ createDirIfNot "${TOOLS_DIR}" createDirIfNot "${TMP_DIR}" createDirIfNot "${NODE_MODULES_DIR}" createDirIfNot "${CONFIG_DIR}" +createDirIfNot "${OUT_DIR}" +createDirIfNot "${HTML_DIR}" +createDirIfNot "${JSON_DIR}" +createDirIfNot "${LOG_DIR}" installPythonModules installRequiredNpmModulesIfNot downloadCheckStyleJarIfNot diff --git a/outputs/generated_html/README b/outputs/generated_html/README deleted file mode 100644 index 781acfd..0000000 --- a/outputs/generated_html/README +++ /dev/null @@ -1 +0,0 @@ -Generated HTML pages are stored here diff --git a/outputs/json_dumps/README b/outputs/json_dumps/README deleted file mode 100644 index 25f79dd..0000000 --- a/outputs/json_dumps/README +++ /dev/null @@ -1 +0,0 @@ -JSON scan summaries are stored here diff --git a/outputs/logs/README b/outputs/logs/README deleted file mode 100644 index b47e3eb..0000000 --- a/outputs/logs/README +++ /dev/null @@ -1 +0,0 @@ -Individual scan tool logs are dumped here diff --git a/src/iac_scan_runner/compatibility.py b/src/iac_scan_runner/compatibility.py index 65158da..d82e897 100644 --- a/src/iac_scan_runner/compatibility.py +++ b/src/iac_scan_runner/compatibility.py @@ -1,5 +1,5 @@ import os - +from typing import List class Compatibility: # TODO: This matrix should be revised and extended, it is just a proof of concept here as for now @@ -10,7 +10,7 @@ class Compatibility: "python": ["pylint", "bandit", "pyup-safety"], "ansible": ["ansible-lint", "steampunk-scanner"], "java": ["checkstyle"], - "js": ["es-lint"], + "js": ["es-lint", "ts-lint"], "html": ["htmlhint"], "docker": ["hadolint"], } @@ -22,7 +22,7 @@ class Compatibility: """ self.scanned_files = dict() - def get_check_list(self, iac_type: str) -> list: + def get_check_list(self, iac_type: str) -> List[str]: """ Returns the list of available scanner check tools for given type of IaC archive :iac_type: Type of IaC file for which we consider the list of compatible scans @@ -30,7 +30,7 @@ class Compatibility: """ return self.compatibility_matrix[iac_type] - def check_iac_type(self, iac_directory: str) -> list: + def check_iac_type(self, iac_directory: str) -> List[str]: """Check the type of IaC archive :param iac_dircetory: Extracted IaC archive path :return: List of specific file types within the given IaC directory @@ -46,6 +46,7 @@ class Compatibility: scanned_html = [] # TODO: List of supported file types should be extended + # TODO: Remove hardcoded check names try: for filename in os.listdir(iac_directory): f = os.path.join(iac_directory, filename) @@ -84,7 +85,7 @@ class Compatibility: except Exception as e: raise Exception(f"Error when checking directory type: {str(e)}.") - def get_all_compatible_checks(self, iac_directory: str) -> list: + def get_all_compatible_checks(self, iac_directory: str) -> List[str]: """ Returns the list of available scanner check tools for given type of IaC archive :param iac_dircetory: Extracted IaC archive path diff --git a/src/iac_scan_runner/results_summary.py b/src/iac_scan_runner/results_summary.py index c707446..8256e61 100644 --- a/src/iac_scan_runner/results_summary.py +++ b/src/iac_scan_runner/results_summary.py @@ -32,7 +32,7 @@ class ResultsSummary: ) -> str: """Summarize the check result to True/False depending on the return tool output :param check: Name of the considered check of interest - :return: Whether the check passed (True) or failed (False) + :return: Whether the check passed or problems were detected """ self.outcomes[check] = {} self.outcomes[check]["log"] = outcome @@ -44,7 +44,8 @@ class ResultsSummary: self.outcomes[check]["files"] = file_list - # TODO: This part should be extended to cover all relevant cases + # TODO: This part should be extended to cover all relevant cases and code refactored + # TODO: The check names hould not be hardcoded but replaced with parametrized values instead if check == "tfsec": if outcome.find("No problems detected!") > -1: self.outcomes[check]["status"] = "Passed" @@ -74,6 +75,7 @@ class ResultsSummary: Sets the outcome of the selected check to "no files" case :param check: Name of the considered check of interest """ + # TODO: Fields should be extracted into new Python object, probably extending the existing CheckOutput self.outcomes[check] = {} self.outcomes[check]["status"] = "No files" self.outcomes[check]["log"] = "" @@ -90,30 +92,35 @@ class ResultsSummary: Summarizes scan results into JSON file :param file_name: Name of the generated JSON file containing the scan summary """ + #TODO: Replace hardcoded path with parameter file_path = "../outputs/json_dumps/" + file_name + ".json" - with open(file_path, "w") as fp: - json.dump(self.outcomes, fp) + try: + with open(file_path, "w") as fp: + json.dump(self.outcomes, fp) + except Exception as e: + raise Exception(f"Error dumping json of scan results: {str(e)}.") + def generate_html_prioritized(self, file_name: str): """ - Summarizes scan results into HTML file + Summarizes scan results (status, list of scanned files and scan tool log) into HTML file with the following visualization ordering: 1) problems detected 2) passed 3) no files scanned :param file_name: Name of the generated HTML file containing the page summary """ - html_page = "<!DOCTYPE html> <html> <style> table, th, td { border:1px solid black;}</style> <body> <h2>Scan results</h2> <table style='width:100%'> <tr> <th>Scan</th><th>Status</th><th>Files</th><th>Log</th> </tr>" - # parse scans + html_page = ( + "<!DOCTYPE html> <html> <style> table, th, td { border:1px solid black;}</style>" + + "<body> <h2>Scan results</h2>" + + "<table style='width:100%'>" + + "<tr> <th>Scan</th> <th>Status</th> <th>Files</th> <th>Log</th> </tr>" + ) + for scan in self.outcomes: if self.outcomes[scan]["status"] == "Problems": html_page = html_page + "<tr>" html_page = html_page + "<td>" + scan + "</td>" - html_page = ( - html_page - + "<td bgcolor='red'>" - + str(self.outcomes[scan]["status"]) - + "</td>" - ) + html_page = html_page + "<td bgcolor='red'>" + str(self.outcomes[scan]["status"]) + "</td>" html_page = html_page + "<td>" + self.outcomes[scan]["files"] + "</td>" html_page = html_page + "<td>" + self.outcomes[scan]["log"] + "</td>" @@ -124,12 +131,7 @@ class ResultsSummary: if self.outcomes[scan]["status"] == "Passed": html_page = html_page + "<tr>" html_page = html_page + "<td>" + scan + "</td>" - html_page = ( - html_page - + "<td bgcolor='green'>" - + str(self.outcomes[scan]["status"]) - + "</td>" - ) + html_page = html_page + "<td bgcolor='green'>" + str(self.outcomes[scan]["status"]) + "</td>" html_page = html_page + "<td>" + self.outcomes[scan]["files"] + "</td>" html_page = html_page + "<td>" + self.outcomes[scan]["log"] + "</td>" @@ -140,12 +142,7 @@ class ResultsSummary: if self.outcomes[scan]["status"] == "No files": html_page = html_page + "<tr>" html_page = html_page + "<td>" + scan + "</td>" - html_page = ( - html_page - + "<td bgcolor='gray'>" - + str(self.outcomes[scan]["status"]) - + "</td>" - ) + html_page = html_page + "<td bgcolor='gray'>" + str(self.outcomes[scan]["status"]) + "</td>" html_page = html_page + "<td>" + self.outcomes[scan]["files"] + "</td>" html_page = html_page + "<td>" + self.outcomes[scan]["log"] + "</td>" html_page = html_page + "</tr>" diff --git a/src/iac_scan_runner/scan_runner.py b/src/iac_scan_runner/scan_runner.py index 943fbbf..48387fb 100644 --- a/src/iac_scan_runner/scan_runner.py +++ b/src/iac_scan_runner/scan_runner.py @@ -39,17 +39,16 @@ from iac_scan_runner.utils import ( write_string_to_file, ) from pydantic import SecretStr - - import uuid import os - class ScanRunner: def __init__(self): """Initialize new scan runner that can perform IaC scanning with multiple IaC checks""" self.iac_checks = {} self.iac_dir = None + self.compatibility_matrix = Compatibility() + self.results_summary = ResultsSummary() def init_checks(self): """Initiate predefined check objects""" @@ -78,9 +77,6 @@ class ScanRunner: snyk = SnykCheck() sonar_scanner = SonarScannerCheck() - self.checker = Compatibility() - self.results_summary = ResultsSummary() - self.iac_checks = { opera_tosca_parser.name: opera_tosca_parser, ansible_lint.name: ansible_lint, @@ -105,7 +101,7 @@ class ScanRunner: cloc.name: cloc, checkstyle.name: checkstyle, snyk.name: snyk, - sonar_scanner.name: sonar_scanner, + sonar_scanner.name: sonar_scanner } def _init_iac_dir(self, iac_file: UploadFile): @@ -130,9 +126,7 @@ class ScanRunner: except Exception as e: raise Exception(f"Error when cleaning IaC directory: {str(e)}.") - def _run_checks( - self, selected_checks: Optional[List], scan_response_type: ScanResponseType - ) -> Union[dict, str]: + def _run_checks(self, selected_checks: Optional[List], scan_response_type: ScanResponseType) -> Union[dict, str]: """ Run the specified IaC checks :param selected_checks: List of selected checks to be executed on IaC @@ -140,12 +134,13 @@ class ScanRunner: :return: Dict or string with output for running checks """ random_uuid = str(uuid.uuid4()) + # TODO: Replace this hardcoded path with a parameter dir_name = "../outputs/logs/scan_run_" + random_uuid os.mkdir(dir_name) - compatible_checks = self.checker.get_all_compatible_checks(self.iac_dir) - non_compatible_checks = list() + compatible_checks = self.compatibility_matrix.get_all_compatible_checks(self.iac_dir) + non_compatible_checks = [] if scan_response_type == ScanResponseType.json: scan_output = {} @@ -156,47 +151,36 @@ class ScanRunner: check = self.iac_checks[selected_check] if check.enabled: if selected_check in compatible_checks: - check_output = check.run(self.iac_dir) - if scan_response_type == ScanResponseType.json: scan_output[selected_check] = check_output.to_dict() else: + # TODO: Discuss the format of this output scan_output += f"### {selected_check} ###\n{check_output.to_string()}\n\n" - write_string_to_file( - check.name, dir_name, scan_output[check.name]["output"] - ) - - self.results_summary.summarize_outcome( - selected_check, - scan_output[check.name]["output"], - self.checker.scanned_files, - Compatibility.compatibility_matrix, - ) + write_string_to_file(check.name, dir_name, scan_output[check.name]["output"]) + self.results_summary.summarize_outcome(selected_check, scan_output[check.name]["output"], self.compatibility_matrix.scanned_files, Compatibility.compatibility_matrix) + else: non_compatible_checks.append(check.name) - write_string_to_file(check.name, dir_name, "No files to scan") - self.results_summary.summarize_no_files(check.name) - self.results_summary.dump_outcomes(random_uuid) self.results_summary.generate_html_prioritized(random_uuid) else: for iac_check in self.iac_checks.values(): - if iac_check.enabled: check_output = iac_check.run(self.iac_dir) if scan_response_type == ScanResponseType.json: scan_output[iac_check.name] = check_output.to_dict() else: + # TODO: Discuss the format of this output scan_output += ( f"### {iac_check.name} ###\n{check_output.to_string()}\n\n" ) - + # TODO: Discuss the format of this output write_string_to_file( iac_check.name, dir_name, scan_output[iac_check.name]["output"] ) @@ -235,12 +219,7 @@ class ScanRunner: else: raise Exception(f"Nonexistent check: {check_name}") - def configure_check( - self, - check_name: str, - config_file: Optional[UploadFile], - secret: Optional[SecretStr], - ) -> str: + def configure_check(self, check_name: str, config_file: Optional[UploadFile], secret: Optional[SecretStr]) -> str: """ Configures the selected check with the supplied optional configuration file or/and secret :param check_name: Name of the check @@ -253,9 +232,7 @@ class ScanRunner: if check.enabled: config_filename_local = None if config_file: - config_filename_local = generate_random_pathname( - "", "-" + config_file.filename - ) + config_filename_local = generate_random_pathname("", "-" + config_file.filename) with open( f"{env.CONFIG_DIR}/{config_filename_local}", "wb+" ) as config_file_local: @@ -265,11 +242,9 @@ class ScanRunner: check.configured = True return check_output.output else: - raise Exception( - f"Check: {check_name} is disabled. You need to enable it first." - ) + raise Exception(f'Check: {check_name} is disabled. You need to enable it first.') else: - raise Exception(f"Nonexistent check: {check_name}") + raise Exception(f'Nonexistent check: {check_name}') def scan_iac( self, iac_file: UploadFile, checks: List, scan_response_type: ScanResponseType @@ -281,22 +256,11 @@ class ScanRunner: :param scan_response_type: Scan response type (JSON or HTML) :return: Dict or string with scan result """ - nonexistent_checks = list( - set(checks) - - set( - map( - lambda check: check.name, - filter( - lambda check: check.enabled and check.configured, - self.iac_checks.values(), - ), - ) - ) - ) + nonexistent_checks = list(set(checks) - set( + map(lambda check: check.name, + filter(lambda check: check.enabled and check.configured, self.iac_checks.values())))) if nonexistent_checks: - raise Exception( - f"Nonexistent, disabled or un-configured checks: {nonexistent_checks}." - ) + raise Exception(f'Nonexistent, disabled or un-configured checks: {nonexistent_checks}.') self._init_iac_dir(iac_file) scan_output = self._run_checks(checks, scan_response_type) diff --git a/src/iac_scan_runner/utils.py b/src/iac_scan_runner/utils.py index 3b993c0..6c90b05 100644 --- a/src/iac_scan_runner/utils.py +++ b/src/iac_scan_runner/utils.py @@ -17,12 +17,7 @@ def run_command(command: str, directory: str = ".") -> CheckOutput: :return: CheckOutput object """ try: - return CheckOutput( - check_output(command, cwd=directory, shell=True, stderr=STDOUT).decode( - "utf-8" - ), - 0, - ) + return CheckOutput(check_output(command, cwd=directory, shell=True, stderr=STDOUT).decode('utf-8'), 0) except CalledProcessError as e: return CheckOutput(str(e.output.decode("utf-8")), e.returncode) @@ -39,9 +34,7 @@ def determine_archive_format(archive_path: str) -> str: return "tar" else: raise Exception( - "Unsupported archive format: '{}'. The packaging format should be one of: zip, tar.".format( - archive_path - ) + "Unsupported archive format: '{}'. The packaging format should be one of: zip, tar.".format(archive_path) ) @@ -85,8 +78,11 @@ def write_string_to_file(check_name: str, dir_name: str, output_value: str): :param output_value: Content written to given file """ file_name = dir_name + "/" + check_name + ".txt" - with open(file_name, "w") as text_file: - text_file.write(output_value) + try: + with open(file_name, "w") as text_file: + text_file.write(output_value) + except Exception as e: + raise Exception(f"Error while writing string to file: {str(e)}.") def write_html_to_file(file_name: str, output_value: str): """ @@ -96,6 +92,8 @@ def write_html_to_file(file_name: str, output_value: str): :param output_value: Content written to given file """ file_name = "../outputs/generated_html/" + file_name + ".html" - with open(file_name, "w") as text_file: - text_file.write(output_value) - + try: + with open(file_name, "w") as text_file: + text_file.write(output_value) + except Exception as e: + raise Exception(f"Error storing HTML to file: {str(e)}.") -- GitLab