diff --git a/dojo/settings/settings.dist.py b/dojo/settings/settings.dist.py index 60579577150..96fa2fdf3ac 100644 --- a/dojo/settings/settings.dist.py +++ b/dojo/settings/settings.dist.py @@ -1587,6 +1587,9 @@ def saml2_attrib_map_format(din): "Kubescape JSON Importer": DEDUPE_ALGO_HASH_CODE, "Kiuwan SCA Scan": DEDUPE_ALGO_HASH_CODE, "Rapplex Scan": DEDUPE_ALGO_HASH_CODE, + "Wizcli Img Scan": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL, + "Wizcli Dir Scan": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL, + "Wizcli IAC Scan": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL, "AppCheck Web Application Scanner": DEDUPE_ALGO_HASH_CODE, "AWS Inspector2 Scan": DEDUPE_ALGO_UNIQUE_ID_FROM_TOOL_OR_HASH_CODE, "Legitify Scan": DEDUPE_ALGO_HASH_CODE, diff --git a/dojo/tools/wizcli_common_parsers/parsers.py b/dojo/tools/wizcli_common_parsers/parsers.py index e80dada5378..e20c7a9dec3 100644 --- a/dojo/tools/wizcli_common_parsers/parsers.py +++ b/dojo/tools/wizcli_common_parsers/parsers.py @@ -1,190 +1,441 @@ +import hashlib +import logging +import re + from dojo.models import Finding +logger = logging.getLogger(__name__) + +# Mapping from Wiz severities to DefectDojo severities +SEVERITY_MAPPING = { + "CRITICAL": "Critical", + "HIGH": "High", + "MEDIUM": "Medium", + "LOW": "Low", + "INFORMATIONAL": "Info", + "INFO": "Info", + "UNKNOWN": "Info", # Default for unknown severities +} + class WizcliParsers: @staticmethod - def parse_libraries(libraries, test): - findings = [] - if libraries: - for library in libraries: - lib_name = library.get("name", "N/A") - lib_version = library.get("version", "N/A") - lib_path = library.get("path", "N/A") - vulnerabilities = library.get("vulnerabilities", []) - - for vulnerability in vulnerabilities: - vuln_name = vulnerability.get("name", "N/A") - severity = vulnerability.get("severity", "low").lower().capitalize() - fixed_version = vulnerability.get("fixedVersion", "N/A") - source = vulnerability.get("source", "N/A") - description = vulnerability.get("description", "N/A") - score = 
vulnerability.get("score", "N/A") - exploitability_score = vulnerability.get("exploitabilityScore", "N/A") - has_exploit = vulnerability.get("hasExploit", False) - has_cisa_kev_exploit = vulnerability.get("hasCisaKevExploit", False) - - finding_description = ( - f"**Library Name**: {lib_name}\n" - f"**Library Version**: {lib_version}\n" - f"**Library Path**: {lib_path}\n" - f"**Vulnerability Name**: {vuln_name}\n" - f"**Fixed Version**: {fixed_version}\n" - f"**Source**: {source}\n" - f"**Description**: {description}\n" - f"**Score**: {score}\n" - f"**Exploitability Score**: {exploitability_score}\n" - f"**Has Exploit**: {has_exploit}\n" - f"**Has CISA KEV Exploit**: {has_cisa_kev_exploit}\n" - ) - - finding = Finding( - title=f"{lib_name} - {vuln_name}", - description=finding_description, - file_path=lib_path, - severity=severity, - static_finding=True, - dynamic_finding=False, - mitigation=None, - test=test, - ) - findings.append(finding) - return findings + def get_severity(severity_str): + """Maps Wiz severity strings to DefectDojo standard TitleCase.""" + if severity_str: + return SEVERITY_MAPPING.get(severity_str.upper(), "Info") + return "Info" # Default if severity is missing or None + + @staticmethod + def extract_reference_link(text): + """Extracts potential URL from remediation instructions.""" + if not text: + return None + # Basic regex to find URLs, might need refinement + match = re.search(r"(https?://[^\s)]+)", text) + return match.group(1) if match else None @staticmethod - def parse_secrets(secrets, test): - findings = [] - if secrets: - for secret in secrets: - secret_id = secret.get("id", "N/A") - desc = secret.get("description", "N/A") - severity = "High" - file_name = secret.get("path", "N/A") - line_number = secret.get("lineNumber", "N/A") - match_content = secret.get("type", "N/A") - - description = ( - f"**Secret ID**: {secret_id}\n" - f"**Description**: {desc}\n" - f"**File Name**: {file_name}\n" - f"**Line Number**: {line_number}\n" - 
f"**Match Content**: {match_content}\n" + def _generate_unique_id(components: list) -> str: + """ + Generates a stable unique ID for findings. + + Args: + components: List of components to use for ID generation + + """ + # Filter out None and empty values + filtered_components = [str(c).strip() for c in components if c is not None and str(c).strip()] + + # Sort components for consistent order regardless of input order + filtered_components = sorted(filtered_components) + + id_string = "|".join(filtered_components) + hash_object = hashlib.sha256(id_string.encode("utf-8")) + return hash_object.hexdigest() + + @staticmethod + def parse_libraries(libraries_data, test): + """ + Parses library vulnerability data into granular DefectDojo findings. + Creates one finding per unique vulnerability (CVE/ID) per library instance (name/version/path). + """ + findings_list = [] + if not libraries_data: + return findings_list + + for lib_item in libraries_data: + lib_name = lib_item.get("name", "N/A") + lib_version = lib_item.get("version", "N/A") + lib_path = lib_item.get("path", "N/A") + lib_line = lib_item.get("startLine") + + vulnerabilities_in_lib_instance = lib_item.get("vulnerabilities", []) + if not vulnerabilities_in_lib_instance: + continue + + for vuln_data in vulnerabilities_in_lib_instance: + vuln_name = vuln_data.get("name", "N/A") + severity_str = vuln_data.get("severity") + severity = WizcliParsers.get_severity(severity_str) + fixed_version = vuln_data.get("fixedVersion") + source_url = vuln_data.get("source", "N/A") + vuln_description_from_wiz = vuln_data.get("description") + score_str = vuln_data.get("score") + has_exploit = vuln_data.get("hasExploit", False) + has_cisa_kev_exploit = vuln_data.get("hasCisaKevExploit", False) + + title = f"{lib_name} {lib_version} - {vuln_name}" + + description_parts = [ + f"**Vulnerability**: `{vuln_name}`", + f"**Severity**: {severity}", + f"**Library**: `{lib_name}`", + f"**Version**: `{lib_version}`", + f"**Path/Manifest**: 
`{lib_path}`", + ] + if lib_line is not None: + description_parts.append(f"**Line in Manifest**: {lib_line}") + + if fixed_version: + description_parts.append(f"**Fixed Version**: {fixed_version}") + mitigation = f"Update `{lib_name}` to version `{fixed_version}` or later in path/manifest `{lib_path}`." + else: + description_parts.append("**Fixed Version**: N/A") + mitigation = f"No fixed version available from Wiz. Investigate `{vuln_name}` for `{lib_name}` in `{lib_path}` and apply vendor guidance or risk acceptance." + + description_parts.append(f"**Source**: {source_url}") + if vuln_description_from_wiz: + description_parts.append(f"\n**Details from Wiz**:\n{vuln_description_from_wiz}\n") + if score_str is not None: + description_parts.append(f"**CVSS Score (from Wiz)**: {score_str}") + description_parts.extend([ + f"**Has Exploit (Known)**: {has_exploit}", + f"**In CISA KEV**: {has_cisa_kev_exploit}", + ]) + + failed_policies = vuln_data.get("failedPolicyMatches", []) + if failed_policies: + description_parts.append("\n**Failed Policies**:") + for match in failed_policies: + policy = match.get("policy", {}) + description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {policy.get('id', 'N/A')})") + ignored_policies = vuln_data.get("ignoredPolicyMatches", []) + if ignored_policies: + description_parts.append("\n**Ignored Policies**:") + for match in ignored_policies: + policy = match.get("policy", {}) + reason = match.get("ignoreReason", "N/A") + description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {policy.get('id', 'N/A')}), Reason: {reason}") + + full_description = "\n".join(description_parts) + references = source_url if source_url != "N/A" else None + + # Generate unique ID using stable components including file path + unique_id = WizcliParsers._generate_unique_id( + [lib_name, lib_version, vuln_name, lib_path], ) finding = Finding( - title=f"Secret: {desc}", - description=description, + test=test, + title=title, + description=full_description, 
severity=severity, - file_path=file_name, - line=line_number, + mitigation=mitigation, + file_path=lib_path, + line=lib_line if lib_line is not None else 0, + component_name=lib_name, + component_version=lib_version, static_finding=True, dynamic_finding=False, - mitigation=None, - test=test, + unique_id_from_tool=unique_id, + vuln_id_from_tool=vuln_name, + references=references, + active=True, # Always set as active since we don't have status from Wiz ) - findings.append(finding) - return findings + if score_str is not None: + try: + finding.cvssv3_score = float(score_str) + except (ValueError, TypeError): + logger.warning(f"Could not convert score '{score_str}' to float for finding '{title}'.") + if isinstance(vuln_name, str) and vuln_name.upper().startswith("CVE-"): + finding.cve = vuln_name + findings_list.append(finding) + return findings_list @staticmethod - def parse_rule_matches(rule_matches, test): - findings = [] - if rule_matches: - for rule_match in rule_matches: - rule = rule_match.get("rule", {}) - rule_id = rule.get("id", "N/A") - rule_name = rule.get("name", "N/A") - severity = rule_match.get("severity", "low").lower().capitalize() - - matches = rule_match.get("matches", []) - if matches: - for match in matches: - resource_name = match.get("resourceName", "N/A") - file_name = match.get("fileName", "N/A") - line_number = match.get("lineNumber", "N/A") - match_content = match.get("matchContent", "N/A") - expected = match.get("expected", "N/A") - found = match.get("found", "N/A") - file_type = match.get("fileType", "N/A") - - description = ( - f"**Rule ID**: {rule_id}\n" - f"**Rule Name**: {rule_name}\n" - f"**Resource Name**: {resource_name}\n" - f"**File Name**: {file_name}\n" - f"**Line Number**: {line_number}\n" - f"**Match Content**: {match_content}\n" - f"**Expected**: {expected}\n" - f"**Found**: {found}\n" - f"**File Type**: {file_type}\n" - ) - - finding = Finding( - title=f"{rule_name} - {resource_name}", - description=description, - 
severity=severity, - file_path=file_name, - line=line_number, - static_finding=True, - dynamic_finding=False, - mitigation=None, - test=test, - ) - findings.append(finding) - return findings + def parse_secrets(secrets_data, test): + """Parses secret findings into granular DefectDojo findings.""" + findings_list = [] + if not secrets_data: + return findings_list + for secret in secrets_data: + secret_description = secret.get("description", "Secret detected") + secret_type = secret.get("type", "UNKNOWN_TYPE") + file_path = secret.get("path", "N/A") + line_number = secret.get("lineNumber") + severity_str = secret.get("severity") + severity = WizcliParsers.get_severity(severity_str) + title = f"Secret Detected: {secret_description} ({secret_type})" + description_parts = [ + f"**Type**: `{secret_type}`", + f"**Description**: {secret_description}", + f"**File**: `{file_path}`", + ] + if line_number is not None: + description_parts.append(f"**Line**: {line_number}") + details = secret.get("details", {}) + detail_type = details.get("__typename") + if detail_type == "DiskScanSecretDetailsPassword": + description_parts.append("\n**Password Details**:") + if (pw_len := details.get("length")) is not None: + description_parts.append(f"- Length: {pw_len}") + if (is_complex := details.get("isComplex")) is not None: + description_parts.append(f"- Complex: {is_complex}") + elif detail_type == "DiskScanSecretDetailsCloudKey": + description_parts.append("\n**Cloud Key Details**:") + if (provider_id := details.get("providerUniqueID")): + description_parts.append(f"- Provider Unique ID: {provider_id}") + if (key_type_num := details.get("keyType")) is not None: + description_parts.append(f"- Key Type Code: {key_type_num}") + if (is_long_term := details.get("isLongTerm")) is not None: + description_parts.append(f"- Long Term Key: {is_long_term}") + + failed_policies = secret.get("failedPolicyMatches", []) + if failed_policies: + description_parts.append("\n**Failed Policies**:") + for 
match in failed_policies: + policy = match.get("policy", {}) + description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {policy.get('id', 'N/A')})") + + full_description = "\n".join(description_parts) + mitigation = "Rotate the exposed secret immediately. Remove the secret from the specified file path and line. Store secrets securely using a secrets management solution. Review commit history." + + # Generate unique ID using stable components + unique_id = WizcliParsers._generate_unique_id( + [secret_type, file_path, str(line_number) if line_number is not None else "0"], + ) + + finding = Finding( + test=test, + title=title, + description=full_description, + severity=severity, + mitigation=mitigation, + file_path=file_path, + line=line_number if line_number is not None else 0, + static_finding=True, + dynamic_finding=False, + unique_id_from_tool=unique_id, + active=True, # Always set as active since we don't have status from Wiz + ) + findings_list.append(finding) + return findings_list @staticmethod - def parse_os_packages(os_packages, test): - findings = [] - if os_packages: - for osPackage in os_packages: - pkg_name = osPackage.get("name", "N/A") - pkg_version = osPackage.get("version", "N/A") - vulnerabilities = osPackage.get("vulnerabilities", []) - - for vulnerability in vulnerabilities: - vuln_name = vulnerability.get("name", "N/A") - severity = vulnerability.get("severity", "low").lower().capitalize() - fixed_version = vulnerability.get("fixedVersion", "N/A") - source = vulnerability.get("source", "N/A") - description = vulnerability.get("description", "N/A") - score = vulnerability.get("score", "N/A") - exploitability_score = vulnerability.get("exploitabilityScore", "N/A") - has_exploit = vulnerability.get("hasExploit", False) - has_cisa_kev_exploit = vulnerability.get("hasCisaKevExploit", False) - - finding_description = ( - f"**OS Package Name**: {pkg_name}\n" - f"**OS Package Version**: {pkg_version}\n" - f"**Vulnerability Name**: {vuln_name}\n" - 
f"**Fixed Version**: {fixed_version}\n" - f"**Source**: {source}\n" - f"**Description**: {description}\n" - f"**Score**: {score}\n" - f"**Exploitability Score**: {exploitability_score}\n" - f"**Has Exploit**: {has_exploit}\n" - f"**Has CISA KEV Exploit**: {has_cisa_kev_exploit}\n" - ) - - finding = Finding( - title=f"{pkg_name} - {vuln_name}", - description=finding_description, - severity=severity, - static_finding=True, - dynamic_finding=False, - mitigation=None, - test=test, - ) - findings.append(finding) - return findings + def parse_os_packages(os_packages_data, test): + """Parses OS package vulnerabilities into granular DefectDojo findings.""" + findings_list = [] + if not os_packages_data: + return findings_list + for os_pkg in os_packages_data: + pkg_name = os_pkg.get("name", "N/A") + pkg_version = os_pkg.get("version", "N/A") + vulnerabilities = os_pkg.get("vulnerabilities", []) + if not vulnerabilities: + continue + for vuln_data in vulnerabilities: + vuln_name = vuln_data.get("name", "N/A") + severity_str = vuln_data.get("severity") + severity = WizcliParsers.get_severity(severity_str) + fixed_version = vuln_data.get("fixedVersion") + source_url = vuln_data.get("source", "N/A") + vuln_description_from_wiz = vuln_data.get("description") + score_str = vuln_data.get("score") + has_exploit = vuln_data.get("hasExploit", False) + has_cisa_kev_exploit = vuln_data.get("hasCisaKevExploit", False) + title = f"OS Pkg: {pkg_name} {pkg_version} - {vuln_name}" + description_parts = [ + f"**Vulnerability**: `{vuln_name}`", + f"**Severity**: {severity}", + f"**OS Package**: `{pkg_name}`", + f"**Version**: `{pkg_version}`", + ] + if fixed_version: + description_parts.append(f"**Fixed Version**: {fixed_version}") + mitigation = f"Update OS package `{pkg_name}` to version `{fixed_version}` or later." + else: + description_parts.append("**Fixed Version**: N/A") + mitigation = f"Patch or update OS package `{pkg_name}` as per vendor advisory for `{vuln_name}`." 
+ description_parts.append(f"**Source**: {source_url}") + if vuln_description_from_wiz: + description_parts.append(f"\n**Details from Wiz**:\n{vuln_description_from_wiz}\n") + if score_str is not None: + description_parts.append(f"**CVSS Score (from Wiz)**: {score_str}") + description_parts.extend([ + f"**Has Exploit (Known)**: {has_exploit}", + f"**In CISA KEV**: {has_cisa_kev_exploit}", + ]) + failed_policies = vuln_data.get("failedPolicyMatches", []) + if failed_policies: + description_parts.append("\n**Failed Policies**:") + for match in failed_policies: + policy = match.get("policy", {}) + description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {policy.get('id', 'N/A')})") + ignored_policies = vuln_data.get("ignoredPolicyMatches", []) + if ignored_policies: + description_parts.append("\n**Ignored Policies**:") + for match in ignored_policies: + policy = match.get("policy", {}) + description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {policy.get('id', 'N/A')})") + + full_description = "\n".join(description_parts) + references = source_url if source_url != "N/A" else None + + # Generate unique ID using stable components + unique_id = WizcliParsers._generate_unique_id( + [pkg_name, pkg_version, vuln_name], + ) + + finding = Finding( + test=test, + title=title, + description=full_description, + severity=severity, + mitigation=mitigation, + static_finding=True, + dynamic_finding=False, + unique_id_from_tool=unique_id, + vuln_id_from_tool=vuln_name, + references=references, + active=True, # Always set as active since we don't have status from Wiz + ) + if score_str is not None: + try: + finding.cvssv3_score = float(score_str) + except (ValueError, TypeError): + logger.warning(f"Could not convert score '{score_str}' to float for finding '{title}'.") + if isinstance(vuln_name, str) and vuln_name.upper().startswith("CVE-"): + finding.cve = vuln_name + findings_list.append(finding) + return findings_list @staticmethod - def convert_status(wiz_status) -> 
dict: + def parse_rule_matches(rule_matches_data, test): """ - Convert the Wiz Status to a dict of Finding status flags. - - - Open-> Active = True - - Other statuses that may exist... + Parses IaC rule match data into granular DefectDojo findings. + Creates one finding per rule match instance on a specific resource. """ + findings_list = [] + if not rule_matches_data: + logger.debug("No ruleMatches data found to parse.") + return findings_list + + for rule_match in rule_matches_data: + rule = rule_match.get("rule", {}) + rule_id = rule.get("id", "N/A") + rule_name = rule.get("name", "Unnamed Rule") + # Use the severity from the ruleMatch level + severity_str = rule_match.get("severity") + severity = WizcliParsers.get_severity(severity_str) + + matches = rule_match.get("matches", []) + if not matches: + continue + + for match in matches: + resource_name = match.get("resourceName", "N/A") + file_name = match.get("fileName", "N/A") + line_number = match.get("lineNumber") # Can be None or int + match_content = match.get("matchContent", "N/A") # Code snippet + expected = match.get("expected", "N/A") + found = match.get("found", "N/A") + file_type = match.get("fileType", "IaC") # e.g., TERRAFORM, KUBERNETES + remediation = match.get("remediationInstructions") # Can be None + + # Title: IaC: Rule Name - Resource Name (e.g., IaC: S3 Bucket Logging Disabled - my-bucket) + title = f"{rule_name} - {resource_name}" + + # Description + description_parts = [ + f"**Rule**: {rule_name} (ID: `{rule_id}`)", + f"**Severity**: {severity}", + f"**Resource**: `{resource_name}`", + f"**File**: `{file_name}`", + ] + if line_number is not None: + description_parts.append(f"**Line**: {line_number}") + if match_content and match_content != "N/A": + description_parts.append(f"**Code Snippet**: ```\n{match_content}\n```") # Use markdown code block + + description_parts.extend([ + "\n**Finding Details**:", + f"- **Expected**: {expected}", + f"- **Found**: {found}", + f"- **File Type**: 
{file_type}", + ]) + + # Use remediationInstructions as mitigation and potentially extract reference + mitigation = remediation or "Refer to Wiz rule details and vendor documentation." + references = WizcliParsers.extract_reference_link(remediation) + + # Policy Information (from match level first, then rule level) + match_failed_policies = match.get("failedPolicies", []) + rule_failed_policies = rule_match.get("failedPolicyMatches", []) # Top level rule match policies + if match_failed_policies or rule_failed_policies: + description_parts.append("\n**Failed Policies**:") + processed_policy_ids = set() + for pol_match in match_failed_policies + rule_failed_policies: + policy = pol_match.get("policy", {}) + pol_id = policy.get("id") + if pol_id and pol_id not in processed_policy_ids: + description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {pol_id})") + processed_policy_ids.add(pol_id) + + match_ignored_policies = match.get("ignoredPolicyMatches", []) + rule_ignored_policies = [] # Ignored policies seem to only be at the match level in the sample + if match_ignored_policies or rule_ignored_policies: + description_parts.append("\n**Ignored Policies**:") + processed_policy_ids = set() + for pol_match in match_ignored_policies + rule_ignored_policies: + policy = pol_match.get("policy", {}) + pol_id = policy.get("id") + reason = pol_match.get("ignoreReason", "N/A") + if pol_id and pol_id not in processed_policy_ids: + description_parts.append(f"- {policy.get('name', 'N/A')} (ID: {pol_id}), Reason: {reason}") + processed_policy_ids.add(pol_id) + + full_description = "\n".join(description_parts) + + # Generate unique ID using stable components for IAC + unique_id = WizcliParsers._generate_unique_id( + [rule_id, resource_name, file_name, str(line_number) if line_number is not None else "0"], # Dedupe per rule ID, resource, file and line so each match instance stays distinct + ) + + finding = Finding( + test=test, + title=title, + description=full_description, + severity=severity, + 
mitigation=mitigation, + file_path=file_name, + line=line_number if line_number is not None else 0, + component_name=resource_name, # Use resource name as component + static_finding=True, + dynamic_finding=False, + unique_id_from_tool=unique_id, + vuln_id_from_tool=rule_id, # Use rule ID as the identifier + references=references, + active=True, # Always set as active since we don't have status from Wiz + ) + findings_list.append(finding) + + return findings_list + + @staticmethod + def convert_status(wiz_status) -> dict: + """Convert the Wiz Status to a dict of Finding status flags.""" if (status := wiz_status) is not None: if status.upper() == "OPEN": return {"active": True} @@ -194,5 +445,4 @@ def convert_status(wiz_status) -> dict: return {"active": False, "out_of_scope": True} if status.upper() == "IN_PROGRESS": return {"active": True} - # Return the default status of active return {"active": True} diff --git a/dojo/tools/wizcli_dir/parser.py b/dojo/tools/wizcli_dir/parser.py index 1beb0b68dbb..4abb0899543 100644 --- a/dojo/tools/wizcli_dir/parser.py +++ b/dojo/tools/wizcli_dir/parser.py @@ -1,36 +1,66 @@ import json +import logging from dojo.tools.wizcli_common_parsers.parsers import WizcliParsers +logger = logging.getLogger(__name__) + class WizcliDirParser: - """Wizcli Dir Scan results in JSON file format.""" + """Wiz CLI Directory/IaC Scan results in JSON file format.""" def get_scan_types(self): return ["Wizcli Dir Scan"] def get_label_for_scan_types(self, scan_type): - return "Wizcli Dir Scan" + return "Wiz CLI Scan (Directory)" def get_description_for_scan_types(self, scan_type): - return "Wizcli Dir Scan results in JSON file format." + return "Parses Wiz CLI Directory/IaC scan results in JSON format, creating granular findings for vulnerabilities and secrets." 
- def get_findings(self, filename, test): - scan_data = filename.read() + def get_findings(self, file, test): + """Processes the JSON report and returns a list of DefectDojo Finding objects.""" try: - data = json.loads(scan_data.decode("utf-8")) - except Exception: + scan_data = file.read() + if isinstance(scan_data, bytes): + # Try decoding common encodings + try: + scan_data = scan_data.decode("utf-8-sig") # Handles BOM + except UnicodeDecodeError: + scan_data = scan_data.decode("utf-8") # Fallback data = json.loads(scan_data) + except json.JSONDecodeError as e: + msg = f"Invalid JSON format: {e}" + logger.error(msg) + raise ValueError(msg) from e + except Exception as e: + msg = f"Error processing report file: {e}" + logger.error(msg) + raise ValueError(msg) from e + findings = [] - results = data.get("result", {}) + results_data = data.get("result", {}) + + if not results_data: + logger.warning("No 'result' key found in the Wiz report. Unable to parse findings.") + return findings - libraries = results.get("libraries", None) + # Parse Libraries (Vulnerabilities) + libraries = results_data.get("libraries") if libraries: + logger.debug(f"Parsing {len(libraries)} library entries.") findings.extend(WizcliParsers.parse_libraries(libraries, test)) + else: + logger.debug("No 'libraries' data found in results.") - secrets = results.get("secrets", None) + # Parse Secrets + secrets = results_data.get("secrets") if secrets: + logger.debug(f"Parsing {len(secrets)} secret entries.") findings.extend(WizcliParsers.parse_secrets(secrets, test)) + else: + logger.debug("No 'secrets' data found in results.") + logger.info(f"WizcliDirParser processed {len(findings)} findings.") return findings diff --git a/dojo/tools/wizcli_iac/parser.py b/dojo/tools/wizcli_iac/parser.py index 40a53d62dfb..09ab717d8d4 100644 --- a/dojo/tools/wizcli_iac/parser.py +++ b/dojo/tools/wizcli_iac/parser.py @@ -1,36 +1,64 @@ import json +import logging -from dojo.tools.wizcli_common_parsers.parsers import 
WizcliParsers +from dojo.tools.wizcli_common_parsers.parsers import WizcliParsers # Adjust import path +logger = logging.getLogger(__name__) -class WizcliIaCParser: - """Wizcli IaC Scan results in JSON file format.""" +class WizcliIacParser: + + """Wiz CLI IaC Scan results in JSON file format.""" def get_scan_types(self): return ["Wizcli IaC Scan"] def get_label_for_scan_types(self, scan_type): - return "Wizcli IaC Scan" + return "Wiz CLI Scan (IaC)" def get_description_for_scan_types(self, scan_type): - return "Wizcli IaC Scan results in JSON file format." + return "Parses Wiz CLI Infrastructure as Code (IaC) scan results in JSON format." - def get_findings(self, filename, test): - scan_data = filename.read() + def get_findings(self, file, test): try: - data = json.loads(scan_data.decode("utf-8")) - except Exception: + scan_data = file.read() + if isinstance(scan_data, bytes): + try: + scan_data = scan_data.decode("utf-8-sig") + except UnicodeDecodeError: + scan_data = scan_data.decode("utf-8") data = json.loads(scan_data) + except json.JSONDecodeError as e: + msg = f"Invalid JSON format: {e}" + logger.error(msg) + raise ValueError(msg) from e + except Exception as e: + msg = f"Error processing report file: {e}" + logger.error(msg) + raise ValueError(msg) from e + findings = [] - results = data.get("result", {}) + results_data = data.get("result", {}) + + if not results_data: + logger.warning("No 'result' key found in the Wiz report.") + return findings - rule_matches = results.get("ruleMatches", None) + # Parse Rule Matches (IaC findings) + rule_matches = results_data.get("ruleMatches") if rule_matches: + logger.debug(f"Parsing {len(rule_matches)} rule match entries.") findings.extend(WizcliParsers.parse_rule_matches(rule_matches, test)) + else: + logger.debug("No 'ruleMatches' data found in results.") - secrets = results.get("secrets", None) + # Parse Secrets (if present in IaC scans) + secrets = results_data.get("secrets") if secrets: + logger.debug(f"Parsing 
{len(secrets)} secret entries.") findings.extend(WizcliParsers.parse_secrets(secrets, test)) + else: + logger.debug("No 'secrets' data found in results.") + logger.info(f"WizcliIacParser processed {len(findings)} findings.") return findings diff --git a/dojo/tools/wizcli_img/parser.py b/dojo/tools/wizcli_img/parser.py index 3b8a898b4d0..31b1317d5b2 100644 --- a/dojo/tools/wizcli_img/parser.py +++ b/dojo/tools/wizcli_img/parser.py @@ -1,40 +1,73 @@ import json +import logging -from dojo.tools.wizcli_common_parsers.parsers import WizcliParsers +from dojo.tools.wizcli_common_parsers.parsers import WizcliParsers # Adjust import path + +logger = logging.getLogger(__name__) class WizcliImgParser: - """Wizcli Image Scan results in JSON file format.""" + """Wiz CLI Container Image Scan results in JSON file format.""" def get_scan_types(self): + # Use a distinct name for image scans return ["Wizcli Img Scan"] def get_label_for_scan_types(self, scan_type): - return "Wizcli Img Scan" + return "Wiz CLI Scan (Image)" def get_description_for_scan_types(self, scan_type): - return "Wizcli Img report file can be imported in JSON format." + return "Parses Wiz CLI Container Image scan results in JSON format." 
- def get_findings(self, filename, test): - scan_data = filename.read() + def get_findings(self, file, test): try: - data = json.loads(scan_data.decode("utf-8")) - except Exception: + scan_data = file.read() + if isinstance(scan_data, bytes): + try: + scan_data = scan_data.decode("utf-8-sig") + except UnicodeDecodeError: + scan_data = scan_data.decode("utf-8") data = json.loads(scan_data) + except json.JSONDecodeError as e: + msg = f"Invalid JSON format: {e}" + logger.error(msg) + raise ValueError(msg) from e + except Exception as e: + msg = f"Error processing report file: {e}" + logger.error(msg) + raise ValueError(msg) from e + findings = [] - results = data.get("result", {}) + results_data = data.get("result", {}) + + if not results_data: + logger.warning("No 'result' key found in the Wiz report.") + return findings - osPackages = results.get("osPackages", None) - if osPackages: - findings.extend(WizcliParsers.parse_os_packages(osPackages, test)) + # Parse OS Packages - Key difference for image scans + os_packages = results_data.get("osPackages") + if os_packages: + logger.debug(f"Parsing {len(os_packages)} OS package entries.") + findings.extend(WizcliParsers.parse_os_packages(os_packages, test)) + else: + logger.debug("No 'osPackages' data found in results.") - libraries = results.get("libraries", None) + # Parse Libraries (if present in image scans) + libraries = results_data.get("libraries") if libraries: + logger.debug(f"Parsing {len(libraries)} library entries.") findings.extend(WizcliParsers.parse_libraries(libraries, test)) + else: + logger.debug("No 'libraries' data found in results.") - secrets = results.get("secrets", None) + # Parse Secrets (if present in image scans) + secrets = results_data.get("secrets") if secrets: + logger.debug(f"Parsing {len(secrets)} secret entries.") findings.extend(WizcliParsers.parse_secrets(secrets, test)) + else: + logger.debug("No 'secrets' data found in results.") + logger.info(f"WizcliImgParser processed 
{len(findings)} findings.") return findings