Skip to content
Snippets Groups Projects
Commit 7db02b4d authored by penenadpi's avatar penenadpi Committed by Anze Luzar
Browse files

Compatibility matrix support for scans

Within this commit we're applying the following changes:
- Compatibility class encapsulating dictionary of (scan_type, file_type) pairs
parent 7296d89f
Branches
Tags
No related merge requests found
import os
class Compatibility:
def __init__(self, matrix: dict):
"""
Initialize new IaC Compatibility matrix
:param matrix: dictionary of available checks for given Iac type
"""
self.compatibility_matrix = matrix
def get_check_list(self, iac_type: str) -> list:
"""
Returns the list of available scanner check tools for given type of IaC archive
:return: list object conatining string names of checks
"""
return self.compatibility_matrix[iac_type]
def check_iac_type(self, iac_directory: str):
"""Check the type of iac archive
:param iac_dircetory: Extracted iac archive path"
:return: Specific type of iac"
"""
terraform = False
shell = False
py = False
yaml = False
types = list()
try:
for filename in os.listdir(iac_directory):
f = os.path.join(iac_directory, filename)
if os.path.isfile(f):
if f.find(".tf") > -1 and (terraform is False):
types.append("terraform")
terraform = True
if f.find(".sh") > -1 and (shell is False):
types.append("shell")
shell = True
if f.find(".py") > -1 and (py is False):
types.append("python")
py = True
if f.find(".yaml") > -1 and (yaml is False):
types.append("yaml")
yaml = True
return types
except Exception as e:
raise Exception(f"Error when checking directory type: {str(e)}.")
def get_all_compatible_checks(self, iac_directory: str) -> list:
"""
Returns the list of available scanner check tools for given type of IaC archive
:return: list object conatining string names of checks
"""
checks_list = list()
types_list = self.check_iac_type(iac_directory)
for iac_type in types_list:
type_checks = self.compatibility_matrix[iac_type]
for check in type_checks:
checks_list.append(check)
print(checks_list)
return checks_list
......@@ -4,6 +4,9 @@ from typing import Optional, List, Union
import iac_scan_runner.vars as env
from fastapi import UploadFile
from iac_scan_runner.compatibility import Compatibility
from iac_scan_runner.checks.ansible_lint import AnsibleLintCheck
from iac_scan_runner.checks.bandit import BanditCheck
from iac_scan_runner.checks.checkstyle import CheckStyle
......@@ -29,10 +32,18 @@ from iac_scan_runner.checks.tfsec import TfsecCheck
from iac_scan_runner.checks.ts_lint import TSLintCheck
from iac_scan_runner.checks.yamllint import YamlLintCheck
from iac_scan_runner.scan_response_type import ScanResponseType
from iac_scan_runner.utils import generate_random_pathname, unpack_archive_to_dir
from iac_scan_runner.utils import (
generate_random_pathname,
unpack_archive_to_dir,
write_string_to_file,
)
from pydantic import SecretStr
from datetime import datetime
import os
class ScanRunner:
def __init__(self):
"""Initialize new scan runner that can perform IaC scanning with multiple IaC checks"""
......@@ -66,6 +77,20 @@ class ScanRunner:
snyk = SnykCheck()
sonar_scanner = SonarScannerCheck()
init_dict = {
"terraform": ["tfsec", "tflint", "terrascan", "git-leaks", "git-secrets"],
"yaml": ["git-leaks", "yamllint", "git-leaks", "git-secrets"],
"shell": ["shellcheck", "git-leaks", "git-secrets"],
"python": ["pylint", "bandit", "pyup-safety"],
"ansible": ["ansible-lint", "steanounk-scanner"],
"java": ["checkstyle"],
"js": ["es-lint"],
"html": ["htmlhint"],
"docker": ["hadolint"],
}
self.checker = Compatibility(init_dict)
self.iac_checks = {
opera_tosca_parser.name: opera_tosca_parser,
ansible_lint.name: ansible_lint,
......@@ -90,7 +115,7 @@ class ScanRunner:
cloc.name: cloc,
checkstyle.name: checkstyle,
snyk.name: snyk,
sonar_scanner.name: sonar_scanner
sonar_scanner.name: sonar_scanner,
}
def _init_iac_dir(self, iac_file: UploadFile):
......@@ -100,28 +125,40 @@ class ScanRunner:
"""
try:
iac_filename_local = generate_random_pathname(iac_file.filename)
with open(iac_filename_local, 'wb+') as iac_file_local:
with open(iac_filename_local, "wb+") as iac_file_local:
iac_file_local.write(iac_file.file.read())
iac_file_local.close()
self.iac_dir = unpack_archive_to_dir(iac_filename_local, None)
# print(self.compatiblity.check_iac_type(self.iac_dir))
remove(iac_filename_local)
except Exception as e:
raise Exception(f'Error when initializing IaC directory: {str(e)}.')
raise Exception(f"Error when initializing IaC directory: {str(e)}.")
def _cleanup_iac_dir(self):
"""Remove the created IaC directory"""
try:
rmtree(self.iac_dir, True)
except Exception as e:
raise Exception(f'Error when cleaning IaC directory: {str(e)}.')
raise Exception(f"Error when cleaning IaC directory: {str(e)}.")
def _run_checks(self, selected_checks: Optional[List], scan_response_type: ScanResponseType) -> Union[dict, str]:
def _run_checks(
self, selected_checks: Optional[List], scan_response_type: ScanResponseType
) -> Union[dict, str]:
"""
Run the specified IaC checks
:param selected_checks: List of selected checks to be executed on IaC
:param scan_response_type: Scan response type (JSON or HTML)
:return: Dict or string with output for running checks
"""
dt = datetime.now()
ts = datetime.timestamp(dt)
dir_name = "scan_run_" + str(ts)
os.mkdir(dir_name)
compatible_checks = self.checker.get_all_compatible_checks(self.iac_dir)
if scan_response_type == ScanResponseType.json:
scan_output = {}
else:
......@@ -129,20 +166,36 @@ class ScanRunner:
if selected_checks:
for selected_check in selected_checks:
check = self.iac_checks[selected_check]
if check.enabled:
if selected_check in compatible_checks:
print("Selected:")
print(selected_check)
check_output = check.run(self.iac_dir)
print("compatible------")
if scan_response_type == ScanResponseType.json:
scan_output[selected_check] = check_output.to_dict()
else:
scan_output += f'### {selected_check} ###\n{check_output.to_string()}\n\n'
scan_output += f"### {selected_check} ###\n{check_output.to_string()}\n\n"
write_string_to_file(
check.name, dir_name, scan_output[check.name]["output"]
)
else:
for iac_check in self.iac_checks.values():
if iac_check.enabled:
check_output = iac_check.run(self.iac_dir)
if scan_response_type == ScanResponseType.json:
scan_output[iac_check.name] = check_output.to_dict()
else:
scan_output += f'### {iac_check.name} ###\n{check_output.to_string()}\n\n'
scan_output += (
f"### {iac_check.name} ###\n{check_output.to_string()}\n\n"
)
write_string_to_file(
iac_check.name, dir_name, scan_output[iac_heck.name]["output"]
)
return scan_output
......@@ -156,11 +209,11 @@ class ScanRunner:
check = self.iac_checks[check_name]
if not check.enabled:
check.enabled = True
return f'Check: {check_name} is now enabled and available to use.'
return f"Check: {check_name} is now enabled and available to use."
else:
raise Exception(f'Check: {check_name} is already enabled.')
raise Exception(f"Check: {check_name} is already enabled.")
else:
raise Exception(f'Nonexistent check: {check_name}')
raise Exception(f"Nonexistent check: {check_name}")
def disable_check(self, check_name: str) -> str:
"""
......@@ -172,13 +225,18 @@ class ScanRunner:
check = self.iac_checks[check_name]
if check.enabled:
check.enabled = False
return f'Check: {check_name} is now disabled and cannot be used.'
return f"Check: {check_name} is now disabled and cannot be used."
else:
raise Exception(f'Check: {check_name} is already disabled.')
raise Exception(f"Check: {check_name} is already disabled.")
else:
raise Exception(f'Nonexistent check: {check_name}')
raise Exception(f"Nonexistent check: {check_name}")
def configure_check(self, check_name: str, config_file: Optional[UploadFile], secret: Optional[SecretStr]) -> str:
def configure_check(
self,
check_name: str,
config_file: Optional[UploadFile],
secret: Optional[SecretStr],
) -> str:
"""
Configures the selected check with the supplied optional configuration file or/and secret
:param check_name: Name of the check
......@@ -191,19 +249,27 @@ class ScanRunner:
if check.enabled:
config_filename_local = None
if config_file:
config_filename_local = generate_random_pathname("", "-" + config_file.filename)
with open(f'{env.CONFIG_DIR}/{config_filename_local}', 'wb+') as config_file_local:
config_filename_local = generate_random_pathname(
"", "-" + config_file.filename
)
with open(
f"{env.CONFIG_DIR}/{config_filename_local}", "wb+"
) as config_file_local:
config_file_local.write(config_file.file.read())
config_file_local.close()
check_output = check.configure(config_filename_local, secret)
check.configured = True
return check_output.output
else:
raise Exception(f'Check: {check_name} is disabled. You need to enable it first.')
raise Exception(
f"Check: {check_name} is disabled. You need to enable it first."
)
else:
raise Exception(f'Nonexistent check: {check_name}')
raise Exception(f"Nonexistent check: {check_name}")
def scan_iac(self, iac_file: UploadFile, checks: List, scan_response_type: ScanResponseType) -> Union[dict, str]:
def scan_iac(
self, iac_file: UploadFile, checks: List, scan_response_type: ScanResponseType
) -> Union[dict, str]:
"""
Run IaC scanning process (initiate IaC dir, run checks and cleanup IaC dir)
:param iac_file: IaC file that will be scanned
......@@ -211,11 +277,22 @@ class ScanRunner:
:param scan_response_type: Scan response type (JSON or HTML)
:return: Dict or string with scan result
"""
nonexistent_checks = list(set(checks) - set(
map(lambda check: check.name,
filter(lambda check: check.enabled and check.configured, self.iac_checks.values()))))
nonexistent_checks = list(
set(checks)
- set(
map(
lambda check: check.name,
filter(
lambda check: check.enabled and check.configured,
self.iac_checks.values(),
),
)
)
)
if nonexistent_checks:
raise Exception(f'Nonexistent, disabled or un-configured checks: {nonexistent_checks}.')
raise Exception(
f"Nonexistent, disabled or un-configured checks: {nonexistent_checks}."
)
self._init_iac_dir(iac_file)
scan_output = self._run_checks(checks, scan_response_type)
......
import requests
URL = "http://127.0.0.1:8000/scan"
multipart_form_data = {
"iac": ("hello-world.zip", open("hello-world.zip", "rb")),
"checks": (None, "git-leaks,tfsec,tflint,shellcheck"),
}
response = requests.post(URL, files=multipart_form_data)
print(response.json())
scan_result = response.json()
print(scan_result)
......@@ -17,9 +17,14 @@ def run_command(command: str, directory: str = ".") -> CheckOutput:
:return: CheckOutput object
"""
try:
return CheckOutput(check_output(command, cwd=directory, shell=True, stderr=STDOUT).decode('utf-8'), 0)
return CheckOutput(
check_output(command, cwd=directory, shell=True, stderr=STDOUT).decode(
"utf-8"
),
0,
)
except CalledProcessError as e:
return CheckOutput(str(e.output.decode('utf-8')), e.returncode)
return CheckOutput(str(e.output.decode("utf-8")), e.returncode)
def determine_archive_format(archive_path: str) -> str:
......@@ -34,7 +39,9 @@ def determine_archive_format(archive_path: str) -> str:
return "tar"
else:
raise Exception(
"Unsupported archive format: '{}'. The packaging format should be one of: zip, tar.".format(archive_path)
"Unsupported archive format: '{}'. The packaging format should be one of: zip, tar.".format(
archive_path
)
)
......@@ -67,4 +74,16 @@ def unpack_archive_to_dir(archive_path: str, output_dir: Optional[str]) -> str:
unpack_archive(archive_path, output_dir, iac_format)
return output_dir
except Exception as e:
raise Exception(f'Nonexistent check: {str(e)}')
raise Exception(f"Nonexistent check: {str(e)}")
def write_string_to_file(check_name: str, dir_name: str, output_value: str):
"""
Writes string to given file inside specified directory
:param check_name: Name of the check
:param output_dir: Directory where log will be stored
:param output_value: Content written to given file
"""
file_name = dir_name + "/" + check_name + ".txt"
with open(file_name, "w") as text_file:
text_file.write(output_value)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment