28
28
29
29
SECURITY_HUB_CLIENT = boto3 .client ('securityhub' , region_name = REGION )
30
30
31
+
31
32
class AwsService (Enum ):
32
33
"""AWS service supported by function"""
33
34
@@ -141,21 +142,18 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A
141
142
compliance_status = finding ["Compliance" ].get ("Status" , "UNKNOWN" )
142
143
workflow_status = finding ["Workflow" ].get ("Status" , "UNKNOWN" )
143
144
if compliance_status == "FAILED" and workflow_status == "NEW" :
144
- notified = SECURITY_HUB_CLIENT .batch_update_findings (
145
- FindingIdentifiers = [{
146
- 'Id' : finding .get ('Id' ),
147
- 'ProductArn' : finding .get ("ProductArn" )
148
- }],
149
- Workflow = {"Status" : "NOTIFIED" }
150
- )
151
- logging .warning (f"Successfully updated finding status to NOTIFIED: { json .dumps (notified )} " )
145
+ notified = SECURITY_HUB_CLIENT .batch_update_findings (
146
+ FindingIdentifiers = [{
147
+ 'Id' : finding .get ('Id' ),
148
+ 'ProductArn' : finding .get ("ProductArn" )
149
+ }],
150
+ Workflow = {"Status" : "NOTIFIED" }
151
+ )
152
+ logging .warning (f"Successfully updated finding status to NOTIFIED: { json .dumps (notified )} " )
152
153
except Exception as e :
153
154
logging .error (f"Failed to update finding status: { str (e )} " )
154
155
pass
155
156
156
-
157
-
158
-
159
157
if finding .get ("ProductName" ) == "Inspector" :
160
158
severity = finding ["Severity" ].get ("Label" , "INFORMATIONAL" )
161
159
compliance_status = finding ["Compliance" ].get ("Status" , "UNKNOWN" )
@@ -178,7 +176,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A
178
176
179
177
color = SecurityHubSeverity .get (severity .upper (), SecurityHubSeverity .INFORMATIONAL ).value
180
178
if compliance_status == "PASSED" :
181
- color = "#4BB543"
179
+ color = "#4BB543"
182
180
183
181
slack_message = {
184
182
"color" : color ,
@@ -225,7 +223,7 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A
225
223
226
224
color = SecurityHubSeverity .get (severity .upper (), SecurityHubSeverity .INFORMATIONAL ).value
227
225
if compliance_status == "PASSED" :
228
- color = "#4BB543"
226
+ color = "#4BB543"
229
227
230
228
slack_message = {
231
229
"color" : color ,
@@ -250,9 +248,9 @@ def format_aws_security_hub(message: Dict[str, Any], region: str) -> Dict[str, A
250
248
251
249
return slack_message
252
250
253
-
254
251
return format_default (message = message )
255
252
253
+
256
254
class SecurityHubSeverity (Enum ):
257
255
"""Maps Security Hub finding severity to Slack message format color"""
258
256
@@ -269,13 +267,15 @@ def get(name, default):
269
267
except KeyError :
270
268
return default
271
269
270
+
272
271
class GuardDutyFindingSeverity (Enum ):
273
272
"""Maps GuardDuty finding severity to Slack message format color"""
274
273
275
274
Low = "#777777"
276
275
Medium = "warning"
277
276
High = "danger"
278
277
278
+
279
279
def format_guardduty_finding (message : Dict [str , Any ], region : str ) -> Dict [str , Any ]:
280
280
"""
281
281
Format GuardDuty finding event into Slack message format
0 commit comments