Skip to content
Snippets Groups Projects
Unverified Commit 839f3c03 authored by Anze Luzar's avatar Anze Luzar
Browse files

Change docstrings style to reStructuredText

parent 3bcb1c02
Branches
Tags
No related merge requests found
......@@ -13,6 +13,7 @@ from iac_scan_runner.scan_response_type import ScanResponseType
from iac_scan_runner.scan_runner import ScanRunner
from pydantic import SecretStr
# create an API instance
app = FastAPI(
docs_url="/swagger",
title="IaC Scan Runner REST API",
......@@ -31,7 +32,7 @@ scan_runner.init_checks()
def openapi_yaml() -> str:
"""
Return OpenAPI specification as YAML string
@return: string with YAML
:return: string with YAML
"""
openapi_json = app.openapi()
yaml_str = io.StringIO()
......@@ -44,7 +45,7 @@ def openapi_yaml() -> str:
def get_openapi_yml() -> Response:
"""
GET OpenAPI specification in YAML format (.yml)
@return: Response object
:return: Response object
"""
return Response(openapi_yaml(), media_type='text/yml')
......@@ -54,7 +55,7 @@ def get_openapi_yml() -> Response:
def get_openapi_yaml() -> Response:
"""
GET OpenAPI specification in YAML format (.yaml)
@return: Response object
:return: Response object
"""
return Response(openapi_yaml(), media_type='text/yaml')
......@@ -64,11 +65,11 @@ async def get_checks(keyword: Optional[str] = None, enabled: Optional[bool] = No
target_entity_type: Optional[CheckTargetEntityType] = None) -> JSONResponse:
"""
Retrieve and filter checks (GET method)
@param keyword: substring for filtering
@param enabled: bool saying whether check is enabled or disabled
@param configured: bool saying whether check is configured or not
@param target_entity_type: CheckTargetEntityType object - IaC, component or both
@return: JSONResponse object (with status code 200 or 400)
:param keyword: substring for filtering
:param enabled: bool saying whether check is enabled or disabled
:param configured: bool saying whether check is configured or not
:param target_entity_type: CheckTargetEntityType object - IaC, component or both
:return: JSONResponse object (with status code 200 or 400)
"""
try:
filtered_checks = scan_runner.iac_checks.values()
......@@ -94,8 +95,8 @@ async def get_checks(keyword: Optional[str] = None, enabled: Optional[bool] = No
async def put_enable_checks(check_name: str) -> JSONResponse:
"""
Enable check for running (PUT method)
@param check_name: Unique name of check to be enabled
@return: JSONResponse object (with status code 200 or 400)
:param check_name: Unique name of check to be enabled
:return: JSONResponse object (with status code 200 or 400)
"""
try:
return JSONResponse(status_code=status.HTTP_200_OK, content=scan_runner.enable_check(check_name))
......@@ -108,8 +109,8 @@ async def put_enable_checks(check_name: str) -> JSONResponse:
async def put_disable_checks(check_name: str) -> JSONResponse:
"""
Disable check for running (PUT method)
@param check_name: Unique name of check to be disabled
@return: JSONResponse object (with status code 200 or 400)
:param check_name: Unique name of check to be disabled
:return: JSONResponse object (with status code 200 or 400)
"""
try:
return JSONResponse(status_code=status.HTTP_200_OK, content=scan_runner.disable_check(check_name))
......@@ -127,10 +128,10 @@ async def put_configure_check(check_name: str,
'etc.)')) -> JSONResponse:
"""
Configure check for running (PUT method)
@param check_name: Unique name of check to be configured
@param config_file: Check configuration file
@param secret: Secret needed for configuration (e.g., API key, token, password, cloud credentials, etc.)
@return: JSONResponse object (with status code 200 or 400)
:param check_name: Unique name of check to be configured
:param config_file: Check configuration file
:param secret: Secret needed for configuration (e.g., API key, token, password, cloud credentials, etc.)
:return: JSONResponse object (with status code 200 or 400)
"""
try:
return JSONResponse(status_code=status.HTTP_200_OK,
......@@ -147,10 +148,10 @@ async def post_scan(iac: UploadFile = File(..., description='IaC file (zip or ta
scan_response_type: ScanResponseType = ScanResponseType.json) -> Union[JSONResponse, HTMLResponse]:
"""
Run IaC scan (POST method)
@param iac: IaC file (zip or tar compressed) that will be scanned'
@param checks: List of selected checks to be executed on IaC
@param scan_response_type: Scan response type (JSON or HTML)
@return: JSONResponse or HTMLResponse object (with status code 200 or 400)
:param iac: IaC file (zip or tar compressed) that will be scanned'
:param checks: List of selected checks to be executed on IaC
:param scan_response_type: Scan response type (JSON or HTML)
:return: JSONResponse or HTMLResponse object (with status code 200 or 400)
"""
try:
if not checks or checks == ['']:
......
......@@ -11,9 +11,9 @@ class Check(ABC):
target_entity_type: Optional[CheckTargetEntityType] = None):
"""
Initialize new IaC check
@param name: Name of the check
@param description: Check description
@param target_entity_type: CheckTargetEntityType object - IaC, component or both
:param name: Name of the check
:param description: Check description
:param target_entity_type: CheckTargetEntityType object - IaC, component or both
"""
self.name = name
self.description = description
......@@ -25,8 +25,8 @@ class Check(ABC):
def configure(self, config_filename: Optional[str], secret: Optional[SecretStr]):
"""
Initiate check configuration (override this method if configuration is needed)
@param config_filename: Name of the check configuration file
@param secret: Secret needed for configuration (e.g. API key, token, password, cloud credentials, etc.)
:param config_filename: Name of the check configuration file
:param secret: Secret needed for configuration (e.g. API key, token, password, cloud credentials, etc.)
"""
pass
......@@ -34,7 +34,7 @@ class Check(ABC):
def run(self, directory: str) -> CheckOutput:
"""
Initiate check run (this method has to be implemented for every subclass)
@param directory: Target directory where the check will be executed
@return: CheckOutput object
:param directory: Target directory where the check will be executed
:return: CheckOutput object
"""
pass
......@@ -2,8 +2,8 @@ class CheckOutput:
def __init__(self, output: str, rc: int):
"""
Initialize new IaC check output
@param output: Returned check output
@param rc: Check return code
:param output: Returned check output
:param rc: Check return code
"""
self.output = output
self.rc = rc
......@@ -11,13 +11,13 @@ class CheckOutput:
def to_dict(self) -> dict:
"""
Transform CheckOutput object to dict (for JSON response)
@return: dict with check output and return code
:return: dict with check output and return code
"""
return {"output": self.output, "rc": self.rc}
def to_string(self) -> str:
"""
Transform CheckOutput object to string (for HTML response)
@return: string with result for check output
:return: string with result for check output
"""
return f'Return code: {self.rc}\nOutput: {self.output}'
......@@ -22,8 +22,8 @@ def openapi(
output: str = typer.Option(None, "--output", "-o", help="Output file path")):
"""
Get OpenAPI specification
@param output_format: OpenAPI output format (JSON or YAML)
@param output: Output file path name, where OpenAPI specification will be written to
:param output_format: OpenAPI output format (JSON or YAML)
:param output: Output file path name, where OpenAPI specification will be written to
"""
try:
if output_format == OpenApiFormat.json:
......
......@@ -93,7 +93,7 @@ class ScanRunner:
def _init_iac_dir(self, iac_file: UploadFile):
"""
Initiate new unique IaC directory for scanning
@param iac_file: IaC file
:param iac_file: IaC file
"""
try:
iac_filename_local = generate_random_pathname(iac_file.filename)
......@@ -115,9 +115,9 @@ class ScanRunner:
def _run_checks(self, selected_checks: Optional[List], scan_response_type: ScanResponseType) -> Union[dict, str]:
"""
Run the specified IaC checks
@param selected_checks: List of selected checks to be executed on IaC
@param scan_response_type: Scan response type (JSON or HTML)
@return: Dict or string with output for running checks
:param selected_checks: List of selected checks to be executed on IaC
:param scan_response_type: Scan response type (JSON or HTML)
:return: Dict or string with output for running checks
"""
if scan_response_type == ScanResponseType.json:
scan_output = {}
......@@ -146,8 +146,8 @@ class ScanRunner:
def enable_check(self, check_name: str) -> str:
"""
Enables the specified check and makes it available to be used
@param check_name: Name of the check
@return: String with result for enabling check
:param check_name: Name of the check
:return: String with result for enabling check
"""
if check_name in self.iac_checks.keys():
check = self.iac_checks[check_name]
......@@ -162,8 +162,8 @@ class ScanRunner:
def disable_check(self, check_name: str) -> str:
"""
Disables the specified check and makes it unavailable to be used
@param check_name: Name of the check
@return: String with result for disabling check
:param check_name: Name of the check
:return: String with result for disabling check
"""
if check_name in self.iac_checks.keys():
check = self.iac_checks[check_name]
......@@ -178,10 +178,10 @@ class ScanRunner:
def configure_check(self, check_name: str, config_file: Optional[UploadFile], secret: Optional[SecretStr]) -> str:
"""
Configures the selected check with the supplied optional configuration file or/and secret
@param check_name: Name of the check
@param config_file: Check configuration file
@param secret: Secret needed for configuration (e.g. API key, token, password etc.)
@return: String with check configuration output
:param check_name: Name of the check
:param config_file: Check configuration file
:param secret: Secret needed for configuration (e.g. API key, token, password etc.)
:return: String with check configuration output
"""
if check_name in self.iac_checks.keys():
check = self.iac_checks[check_name]
......@@ -203,10 +203,10 @@ class ScanRunner:
def scan_iac(self, iac_file: UploadFile, checks: List, scan_response_type: ScanResponseType) -> Union[dict, str]:
"""
Run IaC scanning process (initiate IaC dir, run checks and cleanup IaC dir)
@param iac_file: IaC file that will be scanned
@param checks: List of selected checks to be executed on IaC
@param scan_response_type: Scan response type (JSON or HTML)
@return: Dict or string with scan result
:param iac_file: IaC file that will be scanned
:param checks: List of selected checks to be executed on IaC
:param scan_response_type: Scan response type (JSON or HTML)
:return: Dict or string with scan result
"""
nonexistent_checks = list(set(checks) - set(
map(lambda check: check.name,
......
......@@ -12,9 +12,9 @@ from iac_scan_runner.check_output import CheckOutput
def run_command(command: str, directory: str = ".") -> CheckOutput:
"""
Run command with arguments in directory and return the output and return code
@param command: A command to run
@param directory: Target directory where the command will be executed (default is current dir)
@return: CheckOutput object
:param command: A command to run
:param directory: Target directory where the command will be executed (default is current dir)
:return: CheckOutput object
"""
try:
return CheckOutput(check_output(command, cwd=directory, shell=True, stderr=STDOUT).decode('utf-8'), 0)
......@@ -25,8 +25,8 @@ def run_command(command: str, directory: str = ".") -> CheckOutput:
def determine_archive_format(archive_path: str) -> str:
"""
Figures out the format of the supplied archive file
@param archive_path: Path to the archive file
@return: String with archive format (zip or tar)
:param archive_path: Path to the archive file
:return: String with archive format (zip or tar)
"""
if is_zipfile(archive_path):
return "zip"
......@@ -41,9 +41,9 @@ def determine_archive_format(archive_path: str) -> str:
def generate_random_pathname(prefix: str = "", suffix: str = "") -> str:
"""
Creates a unique random pathname and select last 6 characters
@param prefix: Pathname prefix
@param suffix: Pathname suffix
@return: String with random pathname
:param prefix: Pathname prefix
:param suffix: Pathname suffix
:return: String with random pathname
"""
pathname = prefix + str(uuid4().hex)[-6:] + suffix
if path.exists(pathname):
......@@ -55,9 +55,9 @@ def generate_random_pathname(prefix: str = "", suffix: str = "") -> str:
def unpack_archive_to_dir(archive_path: str, output_dir: Optional[str]) -> str:
"""
Unpacks archive to a specified directory
@param archive_path: Path to the archive file
@param output_dir: Directory where IaC will be unpacked to
@return: String output dir name
:param archive_path: Path to the archive file
:param output_dir: Directory where IaC will be unpacked to
:return: String output dir name
"""
try:
if not output_dir:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment