diff --git a/src/iac_scan_runner/api.py b/src/iac_scan_runner/api.py index acb57574b7b17d9f50904fd8f194404094cc3503..9c29ecd2a36dce3d1e67fd85e4c35a1b72d20c38 100644 --- a/src/iac_scan_runner/api.py +++ b/src/iac_scan_runner/api.py @@ -13,6 +13,7 @@ from iac_scan_runner.scan_response_type import ScanResponseType from iac_scan_runner.scan_runner import ScanRunner from pydantic import SecretStr +# create an API instance app = FastAPI( docs_url="/swagger", title="IaC Scan Runner REST API", @@ -31,7 +32,7 @@ scan_runner.init_checks() def openapi_yaml() -> str: """ Return OpenAPI specification as YAML string - @return: string with YAML + :return: string with YAML """ openapi_json = app.openapi() yaml_str = io.StringIO() @@ -44,7 +45,7 @@ def openapi_yaml() -> str: def get_openapi_yml() -> Response: """ GET OpenAPI specification in YAML format (.yml) - @return: Response object + :return: Response object """ return Response(openapi_yaml(), media_type='text/yml') @@ -54,7 +55,7 @@ def get_openapi_yml() -> Response: def get_openapi_yaml() -> Response: """ GET OpenAPI specification in YAML format (.yaml) - @return: Response object + :return: Response object """ return Response(openapi_yaml(), media_type='text/yaml') @@ -64,11 +65,11 @@ async def get_checks(keyword: Optional[str] = None, enabled: Optional[bool] = No target_entity_type: Optional[CheckTargetEntityType] = None) -> JSONResponse: """ Retrieve and filter checks (GET method) - @param keyword: substring for filtering - @param enabled: bool saying whether check is enabled or disabled - @param configured: bool saying whether check is configured or not - @param target_entity_type: CheckTargetEntityType object - IaC, component or both - @return: JSONResponse object (with status code 200 or 400) + :param keyword: substring for filtering + :param enabled: bool saying whether check is enabled or disabled + :param configured: bool saying whether check is configured or not + :param target_entity_type: CheckTargetEntityType object - IaC, component or both + :return: JSONResponse object (with status code 200 or 400) """ try: filtered_checks = scan_runner.iac_checks.values() @@ -94,8 +95,8 @@ async def get_checks(keyword: Optional[str] = None, enabled: Optional[bool] = No async def put_enable_checks(check_name: str) -> JSONResponse: """ Enable check for running (PUT method) - @param check_name: Unique name of check to be enabled - @return: JSONResponse object (with status code 200 or 400) + :param check_name: Unique name of check to be enabled + :return: JSONResponse object (with status code 200 or 400) """ try: return JSONResponse(status_code=status.HTTP_200_OK, content=scan_runner.enable_check(check_name)) @@ -108,8 +109,8 @@ async def put_enable_checks(check_name: str) -> JSONResponse: async def put_disable_checks(check_name: str) -> JSONResponse: """ Disable check for running (PUT method) - @param check_name: Unique name of check to be disabled - @return: JSONResponse object (with status code 200 or 400) + :param check_name: Unique name of check to be disabled + :return: JSONResponse object (with status code 200 or 400) """ try: return JSONResponse(status_code=status.HTTP_200_OK, content=scan_runner.disable_check(check_name)) @@ -127,10 +128,10 @@ async def put_configure_check(check_name: str, 'etc.)')) -> JSONResponse: """ Configure check for running (PUT method) - @param check_name: Unique name of check to be configured - @param config_file: Check configuration file - @param secret: Secret needed for configuration (e.g., API key, token, password, cloud credentials, etc.) - @return: JSONResponse object (with status code 200 or 400) + :param check_name: Unique name of check to be configured + :param config_file: Check configuration file + :param secret: Secret needed for configuration (e.g., API key, token, password, cloud credentials, etc.) + :return: JSONResponse object (with status code 200 or 400) """ try: return JSONResponse(status_code=status.HTTP_200_OK, @@ -147,10 +148,10 @@ async def post_scan(iac: UploadFile = File(..., description='IaC file (zip or ta scan_response_type: ScanResponseType = ScanResponseType.json) -> Union[JSONResponse, HTMLResponse]: """ Run IaC scan (POST method) - @param iac: IaC file (zip or tar compressed) that will be scanned' - @param checks: List of selected checks to be executed on IaC - @param scan_response_type: Scan response type (JSON or HTML) - @return: JSONResponse or HTMLResponse object (with status code 200 or 400) + :param iac: IaC file (zip or tar compressed) that will be scanned' + :param checks: List of selected checks to be executed on IaC + :param scan_response_type: Scan response type (JSON or HTML) + :return: JSONResponse or HTMLResponse object (with status code 200 or 400) """ try: if not checks or checks == ['']: diff --git a/src/iac_scan_runner/check.py b/src/iac_scan_runner/check.py index ef236bd82203a3a15c13848e2af5a9662bc19acc..45c8c99aff23eaed4c866f90d9793d97d122702f 100644 --- a/src/iac_scan_runner/check.py +++ b/src/iac_scan_runner/check.py @@ -11,9 +11,9 @@ class Check(ABC): target_entity_type: Optional[CheckTargetEntityType] = None): """ Initialize new IaC check - @param name: Name of the check - @param description: Check description - @param target_entity_type: CheckTargetEntityType object - IaC, component or both + :param name: Name of the check + :param description: Check description + :param target_entity_type: CheckTargetEntityType object - IaC, component or both """ self.name = name self.description = description @@ -25,8 +25,8 @@ class Check(ABC): def configure(self, config_filename: Optional[str], secret: Optional[SecretStr]): """ Initiate check configuration (override this method if configuration is needed) - @param config_filename: Name of the check configuration file - @param secret: Secret needed for configuration (e.g. API key, token, password, cloud credentials, etc.) + :param config_filename: Name of the check configuration file + :param secret: Secret needed for configuration (e.g. API key, token, password, cloud credentials, etc.) """ pass @@ -34,7 +34,7 @@ class Check(ABC): def run(self, directory: str) -> CheckOutput: """ Initiate check run (this method has to be implemented for every subclass) - @param directory: Target directory where the check will be executed - @return: CheckOutput object + :param directory: Target directory where the check will be executed + :return: CheckOutput object """ pass diff --git a/src/iac_scan_runner/check_output.py b/src/iac_scan_runner/check_output.py index 995448bccfac5a5f468c2a8f470493f6a5795be7..5f8fabfe7702320bf24bfd630d3351c20a27f702 100644 --- a/src/iac_scan_runner/check_output.py +++ b/src/iac_scan_runner/check_output.py @@ -2,8 +2,8 @@ class CheckOutput: def __init__(self, output: str, rc: int): """ Initialize new IaC check output - @param output: Returned check output - @param rc: Check return code + :param output: Returned check output + :param rc: Check return code """ self.output = output self.rc = rc @@ -11,13 +11,13 @@ class CheckOutput: def to_dict(self) -> dict: """ Transform CheckOutput object to dict (for JSON response) - @return: dict with check output and return code + :return: dict with check output and return code """ return {"output": self.output, "rc": self.rc} def to_string(self) -> str: """ Transform CheckOutput object to string (for HTML response) - @return: string with result for check output + :return: string with result for check output """ return f'Return code: {self.rc}\nOutput: {self.output}' diff --git a/src/iac_scan_runner/cli.py b/src/iac_scan_runner/cli.py index 29d692ba43ad663819b10ad74b11949898397fee..c828d88d3e05d36688b696abfeaa420191534b75 100644 --- a/src/iac_scan_runner/cli.py +++ b/src/iac_scan_runner/cli.py @@ -22,8 +22,8 @@ def openapi( output: str = typer.Option(None, "--output", "-o", help="Output file path")): """ Get OpenAPI specification - @param output_format: OpenAPI output format (JSON or YAML) - @param output: Output file path name, where OpenAPI specification will be written to + :param output_format: OpenAPI output format (JSON or YAML) + :param output: Output file path name, where OpenAPI specification will be written to """ try: if output_format == OpenApiFormat.json: diff --git a/src/iac_scan_runner/scan_runner.py b/src/iac_scan_runner/scan_runner.py index 4d69b53f23cc66b0e3c97cf70c79a7461dcc49ed..f6358cb1d1e700927e892bd09efdae1883f3f959 100644 --- a/src/iac_scan_runner/scan_runner.py +++ b/src/iac_scan_runner/scan_runner.py @@ -93,7 +93,7 @@ class ScanRunner: def _init_iac_dir(self, iac_file: UploadFile): """ Initiate new unique IaC directory for scanning - @param iac_file: IaC file + :param iac_file: IaC file """ try: iac_filename_local = generate_random_pathname(iac_file.filename) @@ -115,9 +115,9 @@ class ScanRunner: def _run_checks(self, selected_checks: Optional[List], scan_response_type: ScanResponseType) -> Union[dict, str]: """ Run the specified IaC checks - @param selected_checks: List of selected checks to be executed on IaC - @param scan_response_type: Scan response type (JSON or HTML) - @return: Dict or string with output for running checks + :param selected_checks: List of selected checks to be executed on IaC + :param scan_response_type: Scan response type (JSON or HTML) + :return: Dict or string with output for running checks """ if scan_response_type == ScanResponseType.json: scan_output = {} @@ -146,8 +146,8 @@ class ScanRunner: def enable_check(self, check_name: str) -> str: """ Enables the specified check and makes it available to be used - @param check_name: Name of the check - @return: String with result for enabling check + :param check_name: Name of the check + :return: String with result for enabling check """ if check_name in self.iac_checks.keys(): check = self.iac_checks[check_name] @@ -162,8 +162,8 @@ class ScanRunner: def disable_check(self, check_name: str) -> str: """ Disables the specified check and makes it unavailable to be used - @param check_name: Name of the check - @return: String with result for disabling check + :param check_name: Name of the check + :return: String with result for disabling check """ if check_name in self.iac_checks.keys(): check = self.iac_checks[check_name] @@ -178,10 +178,10 @@ class ScanRunner: def configure_check(self, check_name: str, config_file: Optional[UploadFile], secret: Optional[SecretStr]) -> str: """ Configures the selected check with the supplied optional configuration file or/and secret - @param check_name: Name of the check - @param config_file: Check configuration file - @param secret: Secret needed for configuration (e.g. API key, token, password etc.) - @return: String with check configuration output + :param check_name: Name of the check + :param config_file: Check configuration file + :param secret: Secret needed for configuration (e.g. API key, token, password etc.) + :return: String with check configuration output """ if check_name in self.iac_checks.keys(): check = self.iac_checks[check_name] @@ -203,10 +203,10 @@ class ScanRunner: def scan_iac(self, iac_file: UploadFile, checks: List, scan_response_type: ScanResponseType) -> Union[dict, str]: """ Run IaC scanning process (initiate IaC dir, run checks and cleanup IaC dir) - @param iac_file: IaC file that will be scanned - @param checks: List of selected checks to be executed on IaC - @param scan_response_type: Scan response type (JSON or HTML) - @return: Dict or string with scan result + :param iac_file: IaC file that will be scanned + :param checks: List of selected checks to be executed on IaC + :param scan_response_type: Scan response type (JSON or HTML) + :return: Dict or string with scan result """ nonexistent_checks = list(set(checks) - set( map(lambda check: check.name, diff --git a/src/iac_scan_runner/utils.py b/src/iac_scan_runner/utils.py index 9abf9d04c3a30228b4ad21953420ad7c9154f028..3f078c6160560b64390774f7dbf79bcc46afcb07 100644 --- a/src/iac_scan_runner/utils.py +++ b/src/iac_scan_runner/utils.py @@ -12,9 +12,9 @@ from iac_scan_runner.check_output import CheckOutput def run_command(command: str, directory: str = ".") -> CheckOutput: """ Run command with arguments in directory and return the output and return code - @param command: A command to run - @param directory: Target directory where the command will be executed (default is current dir) - @return: CheckOutput object + :param command: A command to run + :param directory: Target directory where the command will be executed (default is current dir) + :return: CheckOutput object """ try: return CheckOutput(check_output(command, cwd=directory, shell=True, stderr=STDOUT).decode('utf-8'), 0) @@ -25,8 +25,8 @@ def run_command(command: str, directory: str = ".") -> CheckOutput: def determine_archive_format(archive_path: str) -> str: """ Figures out the format of the supplied archive file - @param archive_path: Path to the archive file - @return: String with archive format (zip or tar) + :param archive_path: Path to the archive file + :return: String with archive format (zip or tar) """ if is_zipfile(archive_path): return "zip" @@ -41,9 +41,9 @@ def determine_archive_format(archive_path: str) -> str: def generate_random_pathname(prefix: str = "", suffix: str = "") -> str: """ Creates a unique random pathname and select last 6 characters - @param prefix: Pathname prefix - @param suffix: Pathname suffix - @return: String with random pathname + :param prefix: Pathname prefix + :param suffix: Pathname suffix + :return: String with random pathname """ pathname = prefix + str(uuid4().hex)[-6:] + suffix if path.exists(pathname): @@ -55,9 +55,9 @@ def generate_random_pathname(prefix: str = "", suffix: str = "") -> str: def unpack_archive_to_dir(archive_path: str, output_dir: Optional[str]) -> str: """ Unpacks archive to a specified directory - @param archive_path: Path to the archive file - @param output_dir: Directory where IaC will be unpacked to - @return: String output dir name + :param archive_path: Path to the archive file + :param output_dir: Directory where IaC will be unpacked to + :return: String output dir name """ try: if not output_dir: