Skip to content
Snippets Groups Projects
Commit 4022c7af authored by penenadpi's avatar penenadpi Committed by Anze Luzar
Browse files

Auto output directory, refactoring, TODOs, try-catch

Within this commit we're applying the following changes:
-HTML code formatting and additional docstring comments
-Automatic outputs directory creation fix
-Resolving formatting issues and TODOs
-Try-catch blocks for file writing
-Adding TODOs
parent 96a1e480
No related branches found
No related tags found
No related merge requests found
......@@ -5,6 +5,10 @@
export ROOT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
export VIRTUALENV_DIR="${ROOT_DIR}/.venv"
export TOOLS_DIR="${ROOT_DIR}/tools"
export OUT_DIR="${ROOT_DIR}/outputs"
export HTML_DIR="${OUT_DIR}/generated_html"
export JSON_DIR="${OUT_DIR}/json_dumps"
export LOG_DIR="${OUT_DIR}/logs"
export TMP_DIR="${TOOLS_DIR}/tmp"
export NODE_MODULES_DIR="${ROOT_DIR}/node_modules"
export CONFIG_DIR="${ROOT_DIR}/config"
......@@ -153,6 +157,10 @@ createDirIfNot "${TOOLS_DIR}"
createDirIfNot "${TMP_DIR}"
createDirIfNot "${NODE_MODULES_DIR}"
createDirIfNot "${CONFIG_DIR}"
createDirIfNot "${OUT_DIR}"
createDirIfNot "${HTML_DIR}"
createDirIfNot "${JSON_DIR}"
createDirIfNot "${LOG_DIR}"
installPythonModules
installRequiredNpmModulesIfNot
downloadCheckStyleJarIfNot
......
Generated HTML pages are stored here
JSON scan summaries are stored here
Individual scan tool logs are dumped here
import os
from typing import List
class Compatibility:
# TODO: This matrix should be revised and extended, it is just a proof of concept here as for now
......@@ -10,7 +10,7 @@ class Compatibility:
"python": ["pylint", "bandit", "pyup-safety"],
"ansible": ["ansible-lint", "steampunk-scanner"],
"java": ["checkstyle"],
"js": ["es-lint"],
"js": ["es-lint", "ts-lint"],
"html": ["htmlhint"],
"docker": ["hadolint"],
}
......@@ -22,7 +22,7 @@ class Compatibility:
"""
self.scanned_files = dict()
def get_check_list(self, iac_type: str) -> list:
def get_check_list(self, iac_type: str) -> List[str]:
"""
Returns the list of available scanner check tools for given type of IaC archive
:iac_type: Type of IaC file for which we consider the list of compatible scans
......@@ -30,7 +30,7 @@ class Compatibility:
"""
return self.compatibility_matrix[iac_type]
def check_iac_type(self, iac_directory: str) -> list:
def check_iac_type(self, iac_directory: str) -> List[str]:
"""Check the type of IaC archive
:param iac_dircetory: Extracted IaC archive path
:return: List of specific file types within the given IaC directory
......@@ -46,6 +46,7 @@ class Compatibility:
scanned_html = []
# TODO: List of supported file types should be extended
# TODO: Remove hardcoded check names
try:
for filename in os.listdir(iac_directory):
f = os.path.join(iac_directory, filename)
......@@ -84,7 +85,7 @@ class Compatibility:
except Exception as e:
raise Exception(f"Error when checking directory type: {str(e)}.")
def get_all_compatible_checks(self, iac_directory: str) -> list:
def get_all_compatible_checks(self, iac_directory: str) -> List[str]:
"""
Returns the list of available scanner check tools for given type of IaC archive
:param iac_dircetory: Extracted IaC archive path
......
......@@ -32,7 +32,7 @@ class ResultsSummary:
) -> str:
"""Summarize the check result to True/False depending on the return tool output
:param check: Name of the considered check of interest
:return: Whether the check passed (True) or failed (False)
:return: Whether the check passed or problems were detected
"""
self.outcomes[check] = {}
self.outcomes[check]["log"] = outcome
......@@ -44,7 +44,8 @@ class ResultsSummary:
self.outcomes[check]["files"] = file_list
# TODO: This part should be extended to cover all relevant cases
# TODO: This part should be extended to cover all relevant cases and code refactored
# TODO: The check names hould not be hardcoded but replaced with parametrized values instead
if check == "tfsec":
if outcome.find("No problems detected!") > -1:
self.outcomes[check]["status"] = "Passed"
......@@ -74,6 +75,7 @@ class ResultsSummary:
Sets the outcome of the selected check to "no files" case
:param check: Name of the considered check of interest
"""
# TODO: Fields should be extracted into new Python object, probably extending the existing CheckOutput
self.outcomes[check] = {}
self.outcomes[check]["status"] = "No files"
self.outcomes[check]["log"] = ""
......@@ -90,30 +92,35 @@ class ResultsSummary:
Summarizes scan results into JSON file
:param file_name: Name of the generated JSON file containing the scan summary
"""
#TODO: Replace hardcoded path with parameter
file_path = "../outputs/json_dumps/" + file_name + ".json"
try:
with open(file_path, "w") as fp:
json.dump(self.outcomes, fp)
except Exception as e:
raise Exception(f"Error dumping json of scan results: {str(e)}.")
def generate_html_prioritized(self, file_name: str):
"""
Summarizes scan results into HTML file
Summarizes scan results (status, list of scanned files and scan tool log) into HTML file with the following visualization ordering: 1) problems detected 2) passed 3) no files scanned
:param file_name: Name of the generated HTML file containing the page summary
"""
html_page = "<!DOCTYPE html> <html> <style> table, th, td { border:1px solid black;}</style> <body> <h2>Scan results</h2> <table style='width:100%'> <tr> <th>Scan</th><th>Status</th><th>Files</th><th>Log</th> </tr>"
# parse scans
html_page = (
"<!DOCTYPE html> <html> <style> table, th, td { border:1px solid black;}</style>"
+ "<body> <h2>Scan results</h2>"
+ "<table style='width:100%'>"
+ "<tr> <th>Scan</th> <th>Status</th> <th>Files</th> <th>Log</th> </tr>"
)
for scan in self.outcomes:
if self.outcomes[scan]["status"] == "Problems":
html_page = html_page + "<tr>"
html_page = html_page + "<td>" + scan + "</td>"
html_page = (
html_page
+ "<td bgcolor='red'>"
+ str(self.outcomes[scan]["status"])
+ "</td>"
)
html_page = html_page + "<td bgcolor='red'>" + str(self.outcomes[scan]["status"]) + "</td>"
html_page = html_page + "<td>" + self.outcomes[scan]["files"] + "</td>"
html_page = html_page + "<td>" + self.outcomes[scan]["log"] + "</td>"
......@@ -124,12 +131,7 @@ class ResultsSummary:
if self.outcomes[scan]["status"] == "Passed":
html_page = html_page + "<tr>"
html_page = html_page + "<td>" + scan + "</td>"
html_page = (
html_page
+ "<td bgcolor='green'>"
+ str(self.outcomes[scan]["status"])
+ "</td>"
)
html_page = html_page + "<td bgcolor='green'>" + str(self.outcomes[scan]["status"]) + "</td>"
html_page = html_page + "<td>" + self.outcomes[scan]["files"] + "</td>"
html_page = html_page + "<td>" + self.outcomes[scan]["log"] + "</td>"
......@@ -140,12 +142,7 @@ class ResultsSummary:
if self.outcomes[scan]["status"] == "No files":
html_page = html_page + "<tr>"
html_page = html_page + "<td>" + scan + "</td>"
html_page = (
html_page
+ "<td bgcolor='gray'>"
+ str(self.outcomes[scan]["status"])
+ "</td>"
)
html_page = html_page + "<td bgcolor='gray'>" + str(self.outcomes[scan]["status"]) + "</td>"
html_page = html_page + "<td>" + self.outcomes[scan]["files"] + "</td>"
html_page = html_page + "<td>" + self.outcomes[scan]["log"] + "</td>"
html_page = html_page + "</tr>"
......
......@@ -39,17 +39,16 @@ from iac_scan_runner.utils import (
write_string_to_file,
)
from pydantic import SecretStr
import uuid
import os
class ScanRunner:
def __init__(self):
"""Initialize new scan runner that can perform IaC scanning with multiple IaC checks"""
self.iac_checks = {}
self.iac_dir = None
self.compatibility_matrix = Compatibility()
self.results_summary = ResultsSummary()
def init_checks(self):
"""Initiate predefined check objects"""
......@@ -78,9 +77,6 @@ class ScanRunner:
snyk = SnykCheck()
sonar_scanner = SonarScannerCheck()
self.checker = Compatibility()
self.results_summary = ResultsSummary()
self.iac_checks = {
opera_tosca_parser.name: opera_tosca_parser,
ansible_lint.name: ansible_lint,
......@@ -105,7 +101,7 @@ class ScanRunner:
cloc.name: cloc,
checkstyle.name: checkstyle,
snyk.name: snyk,
sonar_scanner.name: sonar_scanner,
sonar_scanner.name: sonar_scanner
}
def _init_iac_dir(self, iac_file: UploadFile):
......@@ -130,9 +126,7 @@ class ScanRunner:
except Exception as e:
raise Exception(f"Error when cleaning IaC directory: {str(e)}.")
def _run_checks(
self, selected_checks: Optional[List], scan_response_type: ScanResponseType
) -> Union[dict, str]:
def _run_checks(self, selected_checks: Optional[List], scan_response_type: ScanResponseType) -> Union[dict, str]:
"""
Run the specified IaC checks
:param selected_checks: List of selected checks to be executed on IaC
......@@ -140,12 +134,13 @@ class ScanRunner:
:return: Dict or string with output for running checks
"""
random_uuid = str(uuid.uuid4())
# TODO: Replace this hardcoded path with a parameter
dir_name = "../outputs/logs/scan_run_" + random_uuid
os.mkdir(dir_name)
compatible_checks = self.checker.get_all_compatible_checks(self.iac_dir)
non_compatible_checks = list()
compatible_checks = self.compatibility_matrix.get_all_compatible_checks(self.iac_dir)
non_compatible_checks = []
if scan_response_type == ScanResponseType.json:
scan_output = {}
......@@ -156,47 +151,36 @@ class ScanRunner:
check = self.iac_checks[selected_check]
if check.enabled:
if selected_check in compatible_checks:
check_output = check.run(self.iac_dir)
if scan_response_type == ScanResponseType.json:
scan_output[selected_check] = check_output.to_dict()
else:
# TODO: Discuss the format of this output
scan_output += f"### {selected_check} ###\n{check_output.to_string()}\n\n"
write_string_to_file(
check.name, dir_name, scan_output[check.name]["output"]
)
write_string_to_file(check.name, dir_name, scan_output[check.name]["output"])
self.results_summary.summarize_outcome(selected_check, scan_output[check.name]["output"], self.compatibility_matrix.scanned_files, Compatibility.compatibility_matrix)
self.results_summary.summarize_outcome(
selected_check,
scan_output[check.name]["output"],
self.checker.scanned_files,
Compatibility.compatibility_matrix,
)
else:
non_compatible_checks.append(check.name)
write_string_to_file(check.name, dir_name, "No files to scan")
self.results_summary.summarize_no_files(check.name)
self.results_summary.dump_outcomes(random_uuid)
self.results_summary.generate_html_prioritized(random_uuid)
else:
for iac_check in self.iac_checks.values():
if iac_check.enabled:
check_output = iac_check.run(self.iac_dir)
if scan_response_type == ScanResponseType.json:
scan_output[iac_check.name] = check_output.to_dict()
else:
# TODO: Discuss the format of this output
scan_output += (
f"### {iac_check.name} ###\n{check_output.to_string()}\n\n"
)
# TODO: Discuss the format of this output
write_string_to_file(
iac_check.name, dir_name, scan_output[iac_check.name]["output"]
)
......@@ -235,12 +219,7 @@ class ScanRunner:
else:
raise Exception(f"Nonexistent check: {check_name}")
def configure_check(
self,
check_name: str,
config_file: Optional[UploadFile],
secret: Optional[SecretStr],
) -> str:
def configure_check(self, check_name: str, config_file: Optional[UploadFile], secret: Optional[SecretStr]) -> str:
"""
Configures the selected check with the supplied optional configuration file or/and secret
:param check_name: Name of the check
......@@ -253,9 +232,7 @@ class ScanRunner:
if check.enabled:
config_filename_local = None
if config_file:
config_filename_local = generate_random_pathname(
"", "-" + config_file.filename
)
config_filename_local = generate_random_pathname("", "-" + config_file.filename)
with open(
f"{env.CONFIG_DIR}/{config_filename_local}", "wb+"
) as config_file_local:
......@@ -265,11 +242,9 @@ class ScanRunner:
check.configured = True
return check_output.output
else:
raise Exception(
f"Check: {check_name} is disabled. You need to enable it first."
)
raise Exception(f'Check: {check_name} is disabled. You need to enable it first.')
else:
raise Exception(f"Nonexistent check: {check_name}")
raise Exception(f'Nonexistent check: {check_name}')
def scan_iac(
self, iac_file: UploadFile, checks: List, scan_response_type: ScanResponseType
......@@ -281,22 +256,11 @@ class ScanRunner:
:param scan_response_type: Scan response type (JSON or HTML)
:return: Dict or string with scan result
"""
nonexistent_checks = list(
set(checks)
- set(
map(
lambda check: check.name,
filter(
lambda check: check.enabled and check.configured,
self.iac_checks.values(),
),
)
)
)
nonexistent_checks = list(set(checks) - set(
map(lambda check: check.name,
filter(lambda check: check.enabled and check.configured, self.iac_checks.values()))))
if nonexistent_checks:
raise Exception(
f"Nonexistent, disabled or un-configured checks: {nonexistent_checks}."
)
raise Exception(f'Nonexistent, disabled or un-configured checks: {nonexistent_checks}.')
self._init_iac_dir(iac_file)
scan_output = self._run_checks(checks, scan_response_type)
......
......@@ -17,12 +17,7 @@ def run_command(command: str, directory: str = ".") -> CheckOutput:
:return: CheckOutput object
"""
try:
return CheckOutput(
check_output(command, cwd=directory, shell=True, stderr=STDOUT).decode(
"utf-8"
),
0,
)
return CheckOutput(check_output(command, cwd=directory, shell=True, stderr=STDOUT).decode('utf-8'), 0)
except CalledProcessError as e:
return CheckOutput(str(e.output.decode("utf-8")), e.returncode)
......@@ -39,9 +34,7 @@ def determine_archive_format(archive_path: str) -> str:
return "tar"
else:
raise Exception(
"Unsupported archive format: '{}'. The packaging format should be one of: zip, tar.".format(
archive_path
)
"Unsupported archive format: '{}'. The packaging format should be one of: zip, tar.".format(archive_path)
)
......@@ -85,8 +78,11 @@ def write_string_to_file(check_name: str, dir_name: str, output_value: str):
:param output_value: Content written to given file
"""
file_name = dir_name + "/" + check_name + ".txt"
try:
with open(file_name, "w") as text_file:
text_file.write(output_value)
except Exception as e:
raise Exception(f"Error while writing string to file: {str(e)}.")
def write_html_to_file(file_name: str, output_value: str):
"""
......@@ -96,6 +92,8 @@ def write_html_to_file(file_name: str, output_value: str):
:param output_value: Content written to given file
"""
file_name = "../outputs/generated_html/" + file_name + ".html"
try:
with open(file_name, "w") as text_file:
text_file.write(output_value)
except Exception as e:
raise Exception(f"Error storing HTML to file: {str(e)}.")
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment