Skip to content
Snippets Groups Projects
Commit 96a1e480 authored by nenad.petrovic@xlab.si's avatar nenad.petrovic@xlab.si Committed by Anze Luzar
Browse files

Code refactoring and adding docstrings

parent 64e7c0ce
No related branches found
No related tags found
No related merge requests found
......@@ -2,80 +2,76 @@ import os
class Compatibility:
def __init__(self, matrix: dict):
# TODO: This matrix should be revised and extended, it is just a proof of concept here as for now
compatibility_matrix = {
"terraform": ["tfsec", "tflint", "terrascan", "git-leaks", "git-secrets"],
"yaml": ["git-leaks", "yamllint", "git-leaks", "git-secrets"],
"shell": ["shellcheck", "git-leaks", "git-secrets"],
"python": ["pylint", "bandit", "pyup-safety"],
"ansible": ["ansible-lint", "steampunk-scanner"],
"java": ["checkstyle"],
"js": ["es-lint"],
"html": ["htmlhint"],
"docker": ["hadolint"],
}
def __init__(self):
"""
Initialize new IaC Compatibility matrix
:param matrix: dictionary of available checks for given Iac type
:param matrix: Dictionary of available checks for given IaC type
"""
self.compatibility_matrix = matrix
self.scanned_files = dict()
def get_check_list(self, iac_type: str) -> list:
"""
Returns the list of available scanner check tools for given type of IaC archive
:return: list object conatining string names of checks
:iac_type: Type of IaC file for which we consider the list of compatible scans
:return: List with names of checks as strings
"""
return self.compatibility_matrix[iac_type]
def check_iac_type(self, iac_directory: str):
"""Check the type of iac archive
:param iac_dircetory: Extracted iac archive path"
:return: Specific type of iac"
def check_iac_type(self, iac_directory: str) -> list:
"""Check the type of IaC archive
:param iac_dircetory: Extracted IaC archive path
:return: List of specific file types within the given IaC directory
"""
terraform = False
shell = False
py = False
yaml = False
java = False
html = False
types = list()
scanned_terraform = list()
scanned_shell = list()
scanned_py = list()
scanned_yaml = list()
scanned_java = list()
scanned_html = list()
types = []
scanned_terraform = []
scanned_shell = []
scanned_py = []
scanned_yaml = []
scanned_java = []
scanned_html = []
# TODO: List of supported file types should be extended
try:
for filename in os.listdir(iac_directory):
f = os.path.join(iac_directory, filename)
if os.path.isfile(f):
if f.find(".tf") > -1:
# and (terraform is False):
types.append("terraform")
terraform = True
scanned_terraform.append(filename)
if f.find(".sh") > -1:
# and (shell is False):
types.append("shell")
shell = True
scanned_shell.append(filename)
if f.find(".py") > -1:
# and (py is False):
types.append("python")
py = True
scanned_py.append(filename)
if f.find(".yaml") > -1:
# and (yaml is False):
types.append("yaml")
yaml = True
scanned_yaml.append(filename)
if f.find(".java") > -1:
# and (yaml is False):
types.append("java")
java = True
scanned_java.append(filename)
if f.find(".html") > -1:
# and (yaml is False):
types.append("html")
html = True
scanned_html.append(filename)
self.scanned_files["terraform"] = str(scanned_terraform)
......@@ -91,14 +87,14 @@ class Compatibility:
def get_all_compatible_checks(self, iac_directory: str) -> list:
"""
Returns the list of available scanner check tools for given type of IaC archive
:return: list object conatining string names of checks
:param iac_dircetory: Extracted IaC archive path
:return: List with names of compatible checks as strings
"""
checks_list = list()
checks_list = []
types_list = self.check_iac_type(iac_directory)
for iac_type in types_list:
type_checks = self.compatibility_matrix[iac_type]
for check in type_checks:
checks_list.append(check)
print(checks_list)
return checks_list
import os
import json
from iac_scan_runner.utils import write_html_to_file
class ResultsSummary:
def __init__(self):
"""
Initialize new IaC Compatibility matrix
:param matrix: dictionary of available checks for given Iac type
:param matrix: dictionary of available checks for given IaC type
"""
self.outcomes = dict()
......@@ -22,11 +18,14 @@ class ResultsSummary:
def set_check_outcome(self, check_name: str, outcome: str, file_list: str):
"""
Returns the list of available scanner check tools for given type of IaC archive
:return: list object conatining string names of checks
Sets the outcome and list of scanned files for specific scanning tool
:param check_name: Name of a scan tool
:param outcome: Scan verdict - passed, failed or no files scanned
:param file_list: List of files that were scanned using the particular tool
"""
self.outcomes[check] = {}
self.outcomes[check_name] = {}
outcomes[check_name]["status"] = outcome
outcomes[check_name]["files"] = outcome
def summarize_outcome(
self, check: str, outcome: str, scanned_files: dict, compatibility_matrix: dict
......@@ -45,6 +44,7 @@ class ResultsSummary:
self.outcomes[check]["files"] = file_list
# TODO: This part should be extended to cover all relevant cases
if check == "tfsec":
if outcome.find("No problems detected!") > -1:
self.outcomes[check]["status"] = "Passed"
......@@ -70,21 +70,36 @@ class ResultsSummary:
return "Problems"
def summarize_no_files(self, check: str):
"""
Sets the outcome of the selected check to "no files" case
:param check: Name of the considered check of interest
"""
self.outcomes[check] = {}
self.outcomes[check]["status"] = "No files"
self.outcomes[check]["log"] = ""
self.outcomes[check]["files"] = ""
def show_outcomes(self):
"""
Prints out current summary of the performed checks containing the log and list of scanned files
"""
print(self.outcomes)
def dump_outcomes(self, file_name: str):
"""
Summarizes scan results into JSON file
:param file_name: Name of the generated JSON file containing the scan summary
"""
file_path = "../outputs/json_dumps/" + file_name + ".json"
with open(file_path, "w") as fp:
json.dump(self.outcomes, fp)
def generate_html_prioritized(self, file_name: str):
"""
Summarizes scan results into HTML file
:param file_name: Name of the generated HTML file containing the page summary
"""
html_page = "<!DOCTYPE html> <html> <style> table, th, td { border:1px solid black;}</style> <body> <h2>Scan results</h2> <table style='width:100%'> <tr> <th>Scan</th><th>Status</th><th>Files</th><th>Log</th> </tr>"
# parse scans
for scan in self.outcomes:
......
......@@ -78,20 +78,7 @@ class ScanRunner:
snyk = SnykCheck()
sonar_scanner = SonarScannerCheck()
# This matrix should be revised and extended, it is just a proof of concept here as for now
init_dict = {
"terraform": ["tfsec", "tflint", "terrascan", "git-leaks", "git-secrets"],
"yaml": ["git-leaks", "yamllint", "git-leaks", "git-secrets"],
"shell": ["shellcheck", "git-leaks", "git-secrets"],
"python": ["pylint", "bandit", "pyup-safety"],
"ansible": ["ansible-lint", "steampunk-scanner"],
"java": ["checkstyle"],
"js": ["es-lint"],
"html": ["htmlhint"],
"docker": ["hadolint"],
}
self.checker = Compatibility(init_dict)
self.checker = Compatibility()
self.results_summary = ResultsSummary()
self.iac_checks = {
......@@ -167,8 +154,6 @@ class ScanRunner:
if selected_checks:
for selected_check in selected_checks:
check = self.iac_checks[selected_check]
print("NEW CHECK")
print(check)
if check.enabled:
if selected_check in compatible_checks:
......@@ -187,22 +172,16 @@ class ScanRunner:
selected_check,
scan_output[check.name]["output"],
self.checker.scanned_files,
self.checker.compatibility_matrix,
Compatibility.compatibility_matrix,
)
else:
non_compatible_checks.append(check.name)
write_string_to_file(check.name, dir_name, "No files to scan")
print("NO SCAN")
self.results_summary.summarize_no_files(check.name)
print(self.checker.scanned_files)
print("Non executed checks")
print(non_compatible_checks)
print(self.results_summary.show_outcomes())
self.results_summary.dump_outcomes(random_uuid)
self.results_summary.generate_html_prioritized(random_uuid)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment