diff --git a/src/iac_scan_runner/compatibility.py b/src/iac_scan_runner/compatibility.py new file mode 100644 index 0000000000000000000000000000000000000000..e2cc2f7bc60616427d6f207910db60a9bd75bfa7 --- /dev/null +++ b/src/iac_scan_runner/compatibility.py @@ -0,0 +1,64 @@ +import os + + +class Compatibility: + def __init__(self, matrix: dict): + """ + Initialize new IaC Compatibility matrix + :param matrix: dictionary of available checks for given Iac type + """ + self.compatibility_matrix = matrix + + def get_check_list(self, iac_type: str) -> list: + """ + Returns the list of available scanner check tools for given type of IaC archive + :return: list object conatining string names of checks + """ + return self.compatibility_matrix[iac_type] + + def check_iac_type(self, iac_directory: str): + """Check the type of iac archive + :param iac_dircetory: Extracted iac archive path" + :return: Specific type of iac" + """ + terraform = False + shell = False + py = False + yaml = False + + types = list() + try: + for filename in os.listdir(iac_directory): + f = os.path.join(iac_directory, filename) + if os.path.isfile(f): + if f.find(".tf") > -1 and (terraform is False): + types.append("terraform") + terraform = True + if f.find(".sh") > -1 and (shell is False): + types.append("shell") + shell = True + if f.find(".py") > -1 and (py is False): + types.append("python") + py = True + if f.find(".yaml") > -1 and (yaml is False): + types.append("yaml") + yaml = True + + return types + except Exception as e: + raise Exception(f"Error when checking directory type: {str(e)}.") + + def get_all_compatible_checks(self, iac_directory: str) -> list: + """ + Returns the list of available scanner check tools for given type of IaC archive + :return: list object conatining string names of checks + """ + checks_list = list() + types_list = self.check_iac_type(iac_directory) + for iac_type in types_list: + type_checks = self.compatibility_matrix[iac_type] + for check in type_checks: + checks_list.append(check) + + print(checks_list) + return checks_list diff --git a/src/iac_scan_runner/scan_runner.py b/src/iac_scan_runner/scan_runner.py index def2b0ee4d27efd32ec6884779faa790f8f6f441..8bce7f7a11ba53c9ffd846c2a6c94086127a5f0d 100644 --- a/src/iac_scan_runner/scan_runner.py +++ b/src/iac_scan_runner/scan_runner.py @@ -4,6 +4,9 @@ from typing import Optional, List, Union import iac_scan_runner.vars as env from fastapi import UploadFile + +from iac_scan_runner.compatibility import Compatibility + from iac_scan_runner.checks.ansible_lint import AnsibleLintCheck from iac_scan_runner.checks.bandit import BanditCheck from iac_scan_runner.checks.checkstyle import CheckStyle @@ -29,10 +32,18 @@ from iac_scan_runner.checks.tfsec import TfsecCheck from iac_scan_runner.checks.ts_lint import TSLintCheck from iac_scan_runner.checks.yamllint import YamlLintCheck from iac_scan_runner.scan_response_type import ScanResponseType -from iac_scan_runner.utils import generate_random_pathname, unpack_archive_to_dir +from iac_scan_runner.utils import ( + generate_random_pathname, + unpack_archive_to_dir, + write_string_to_file, +) from pydantic import SecretStr +from datetime import datetime +import os + + class ScanRunner: def __init__(self): """Initialize new scan runner that can perform IaC scanning with multiple IaC checks""" @@ -66,6 +77,20 @@ class ScanRunner: snyk = SnykCheck() sonar_scanner = SonarScannerCheck() + init_dict = { + "terraform": ["tfsec", "tflint", "terrascan", "git-leaks", "git-secrets"], + "yaml": ["git-leaks", "yamllint", "git-leaks", "git-secrets"], + "shell": ["shellcheck", "git-leaks", "git-secrets"], + "python": ["pylint", "bandit", "pyup-safety"], + "ansible": ["ansible-lint", "steanounk-scanner"], + "java": ["checkstyle"], + "js": ["es-lint"], + "html": ["htmlhint"], + "docker": ["hadolint"], + } + + self.checker = Compatibility(init_dict) + self.iac_checks = { opera_tosca_parser.name: opera_tosca_parser, ansible_lint.name: ansible_lint, @@ -90,7 +115,7 @@ class ScanRunner: cloc.name: cloc, checkstyle.name: checkstyle, snyk.name: snyk, - sonar_scanner.name: sonar_scanner + sonar_scanner.name: sonar_scanner, } def _init_iac_dir(self, iac_file: UploadFile): @@ -100,28 +125,40 @@ class ScanRunner: """ try: iac_filename_local = generate_random_pathname(iac_file.filename) - with open(iac_filename_local, 'wb+') as iac_file_local: + with open(iac_filename_local, "wb+") as iac_file_local: iac_file_local.write(iac_file.file.read()) iac_file_local.close() self.iac_dir = unpack_archive_to_dir(iac_filename_local, None) + # print(self.compatiblity.check_iac_type(self.iac_dir)) remove(iac_filename_local) except Exception as e: - raise Exception(f'Error when initializing IaC directory: {str(e)}.') + raise Exception(f"Error when initializing IaC directory: {str(e)}.") def _cleanup_iac_dir(self): """Remove the created IaC directory""" try: rmtree(self.iac_dir, True) except Exception as e: - raise Exception(f'Error when cleaning IaC directory: {str(e)}.') + raise Exception(f"Error when cleaning IaC directory: {str(e)}.") - def _run_checks(self, selected_checks: Optional[List], scan_response_type: ScanResponseType) -> Union[dict, str]: + def _run_checks( + self, selected_checks: Optional[List], scan_response_type: ScanResponseType + ) -> Union[dict, str]: """ Run the specified IaC checks :param selected_checks: List of selected checks to be executed on IaC :param scan_response_type: Scan response type (JSON or HTML) :return: Dict or string with output for running checks """ + + dt = datetime.now() + ts = datetime.timestamp(dt) + dir_name = "scan_run_" + str(ts) + + os.mkdir(dir_name) + + compatible_checks = self.checker.get_all_compatible_checks(self.iac_dir) + if scan_response_type == ScanResponseType.json: scan_output = {} else: @@ -129,20 +166,36 @@ class ScanRunner: if selected_checks: for selected_check in selected_checks: check = self.iac_checks[selected_check] + if check.enabled: - check_output = check.run(self.iac_dir) - if scan_response_type == ScanResponseType.json: - scan_output[selected_check] = check_output.to_dict() - else: - scan_output += f'### {selected_check} ###\n{check_output.to_string()}\n\n' + if selected_check in compatible_checks: + print("Selected:") + print(selected_check) + check_output = check.run(self.iac_dir) + print("compatible------") + if scan_response_type == ScanResponseType.json: + scan_output[selected_check] = check_output.to_dict() + else: + scan_output += f"### {selected_check} ###\n{check_output.to_string()}\n\n" + + write_string_to_file( + check.name, dir_name, scan_output[check.name]["output"] + ) else: for iac_check in self.iac_checks.values(): + if iac_check.enabled: check_output = iac_check.run(self.iac_dir) if scan_response_type == ScanResponseType.json: scan_output[iac_check.name] = check_output.to_dict() else: - scan_output += f'### {iac_check.name} ###\n{check_output.to_string()}\n\n' + scan_output += ( + f"### {iac_check.name} ###\n{check_output.to_string()}\n\n" + ) + + write_string_to_file( + iac_check.name, dir_name, scan_output[iac_heck.name]["output"] + ) return scan_output @@ -156,11 +209,11 @@ class ScanRunner: check = self.iac_checks[check_name] if not check.enabled: check.enabled = True - return f'Check: {check_name} is now enabled and available to use.' + return f"Check: {check_name} is now enabled and available to use." else: - raise Exception(f'Check: {check_name} is already enabled.') + raise Exception(f"Check: {check_name} is already enabled.") else: - raise Exception(f'Nonexistent check: {check_name}') + raise Exception(f"Nonexistent check: {check_name}") def disable_check(self, check_name: str) -> str: """ @@ -172,13 +225,18 @@ class ScanRunner: check = self.iac_checks[check_name] if check.enabled: check.enabled = False - return f'Check: {check_name} is now disabled and cannot be used.' + return f"Check: {check_name} is now disabled and cannot be used." else: - raise Exception(f'Check: {check_name} is already disabled.') + raise Exception(f"Check: {check_name} is already disabled.") else: - raise Exception(f'Nonexistent check: {check_name}') + raise Exception(f"Nonexistent check: {check_name}") - def configure_check(self, check_name: str, config_file: Optional[UploadFile], secret: Optional[SecretStr]) -> str: + def configure_check( + self, + check_name: str, + config_file: Optional[UploadFile], + secret: Optional[SecretStr], + ) -> str: """ Configures the selected check with the supplied optional configuration file or/and secret :param check_name: Name of the check @@ -191,19 +249,27 @@ class ScanRunner: if check.enabled: config_filename_local = None if config_file: - config_filename_local = generate_random_pathname("", "-" + config_file.filename) - with open(f'{env.CONFIG_DIR}/{config_filename_local}', 'wb+') as config_file_local: + config_filename_local = generate_random_pathname( + "", "-" + config_file.filename + ) + with open( + f"{env.CONFIG_DIR}/{config_filename_local}", "wb+" + ) as config_file_local: config_file_local.write(config_file.file.read()) config_file_local.close() check_output = check.configure(config_filename_local, secret) check.configured = True return check_output.output else: - raise Exception(f'Check: {check_name} is disabled. You need to enable it first.') + raise Exception( + f"Check: {check_name} is disabled. You need to enable it first." + ) else: - raise Exception(f'Nonexistent check: {check_name}') + raise Exception(f"Nonexistent check: {check_name}") - def scan_iac(self, iac_file: UploadFile, checks: List, scan_response_type: ScanResponseType) -> Union[dict, str]: + def scan_iac( + self, iac_file: UploadFile, checks: List, scan_response_type: ScanResponseType + ) -> Union[dict, str]: """ Run IaC scanning process (initiate IaC dir, run checks and cleanup IaC dir) :param iac_file: IaC file that will be scanned @@ -211,11 +277,22 @@ class ScanRunner: :param scan_response_type: Scan response type (JSON or HTML) :return: Dict or string with scan result """ - nonexistent_checks = list(set(checks) - set( - map(lambda check: check.name, - filter(lambda check: check.enabled and check.configured, self.iac_checks.values())))) + nonexistent_checks = list( + set(checks) + - set( + map( + lambda check: check.name, + filter( + lambda check: check.enabled and check.configured, + self.iac_checks.values(), + ), + ) + ) + ) if nonexistent_checks: - raise Exception(f'Nonexistent, disabled or un-configured checks: {nonexistent_checks}.') + raise Exception( + f"Nonexistent, disabled or un-configured checks: {nonexistent_checks}." + ) self._init_iac_dir(iac_file) scan_output = self._run_checks(checks, scan_response_type) diff --git a/src/iac_scan_runner/test/test_scanner_simple.py b/src/iac_scan_runner/test/test_scanner_simple.py new file mode 100644 index 0000000000000000000000000000000000000000..3614bec2211ba53584a5f36a841db72b2873c55d --- /dev/null +++ b/src/iac_scan_runner/test/test_scanner_simple.py @@ -0,0 +1,13 @@ +import requests + +URL = "http://127.0.0.1:8000/scan" +multipart_form_data = { + "iac": ("hello-world.zip", open("hello-world.zip", "rb")), + "checks": (None, "git-leaks,tfsec,tflint,shellcheck"), +} +response = requests.post(URL, files=multipart_form_data) +print(response.json()) + +scan_result = response.json() + +print(scan_result) diff --git a/src/iac_scan_runner/utils.py b/src/iac_scan_runner/utils.py index 3f078c6160560b64390774f7dbf79bcc46afcb07..865a34a5c6bb31de3ea7b48e80895261792947e4 100644 --- a/src/iac_scan_runner/utils.py +++ b/src/iac_scan_runner/utils.py @@ -17,9 +17,14 @@ def run_command(command: str, directory: str = ".") -> CheckOutput: :return: CheckOutput object """ try: - return CheckOutput(check_output(command, cwd=directory, shell=True, stderr=STDOUT).decode('utf-8'), 0) + return CheckOutput( + check_output(command, cwd=directory, shell=True, stderr=STDOUT).decode( + "utf-8" + ), + 0, + ) except CalledProcessError as e: - return CheckOutput(str(e.output.decode('utf-8')), e.returncode) + return CheckOutput(str(e.output.decode("utf-8")), e.returncode) def determine_archive_format(archive_path: str) -> str: @@ -34,7 +39,9 @@ def determine_archive_format(archive_path: str) -> str: return "tar" else: raise Exception( - "Unsupported archive format: '{}'. The packaging format should be one of: zip, tar.".format(archive_path) + "Unsupported archive format: '{}'. The packaging format should be one of: zip, tar.".format( + archive_path + ) ) @@ -67,4 +74,16 @@ def unpack_archive_to_dir(archive_path: str, output_dir: Optional[str]) -> str: unpack_archive(archive_path, output_dir, iac_format) return output_dir except Exception as e: - raise Exception(f'Nonexistent check: {str(e)}') + raise Exception(f"Nonexistent check: {str(e)}") + + +def write_string_to_file(check_name: str, dir_name: str, output_value: str): + """ + Writes string to given file inside specified directory + :param check_name: Name of the check + :param output_dir: Directory where log will be stored + :param output_value: Content written to given file + """ + file_name = dir_name + "/" + check_name + ".txt" + with open(file_name, "w") as text_file: + text_file.write(output_value)