-
Notifications
You must be signed in to change notification settings - Fork 330
Expand file tree
/
Copy pathlambda_s3_public.py
More file actions
152 lines (136 loc) · 5.29 KB
/
lambda_s3_public.py
File metadata and controls
152 lines (136 loc) · 5.29 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
# -*- coding: utf-8 -*-
__author__ = "Chirag Rathod (Srce Cde)"
__license__ = "MIT"
__email__ = "chiragr83@gmail.com"
__maintainer__ = "Chirag Rathod (Srce Cde)"
import json
import boto3
def sendMessage(
    eventDetails,
    subject=None,
    message=None,
    topicArn="arn:aws:sns:us-east-1:052674914236:xxx",
):
    """Publish an alert about a bucket permission change to an SNS topic.

    Args:
        eventDetails: Mapping of parsed event fields. The keys read here are
            ``bucketName``, ``UserType``, ``eventTime``, ``region``,
            ``sourceIP``, ``userAgent`` and ``parameters``; missing keys are
            rendered as ``None``.
        subject: Subject line to use. When falsy, a default subject naming
            the bucket is generated.
        message: Message body to use. When falsy, a default body listing the
            event details is generated.
        topicArn: ARN of the SNS topic to publish to. Defaults to the topic
            this module was originally wired to, so existing callers are
            unaffected.

    Returns:
        The response returned by the SNS client's ``publish`` call.
    """
    # configure subject
    if not subject:
        subject = "S3 Public permission Alert for {} bucket".format(
            eventDetails.get("bucketName")
        )
    # configure message
    if not message:
        message = f"""
The bucket permission is modified. Details below.
UserType : {eventDetails.get('UserType')}
eventTime : {eventDetails.get('eventTime')}
region : {eventDetails.get('region')}
sourceIP: {eventDetails.get('sourceIP')}
userAgent: {eventDetails.get('userAgent')}
Other crucial information: {eventDetails.get('parameters')}
"""
    sns = boto3.client("sns")
    # send message
    return sns.publish(
        TopicArn=topicArn,
        Message=message,
        Subject=subject,
    )
def getEventDetails(eventData):
    """Extract the fields used for alerting from an event record.

    Args:
        eventData: Mapping holding the event record. The keys read are
            ``userIdentity``, ``eventTime``, ``awsRegion``,
            ``sourceIPAddress``, ``userAgent`` and ``requestParameters``.

    Returns:
        A dict with the keys ``UserType``, ``eventTime``, ``region``,
        ``sourceIP``, ``userAgent``, ``parameters`` and ``bucketName``.
        ``bucketName`` is ``None`` when the record carries no request
        parameters or no ``bucketName`` entry.
    """
    requestParameters = eventData.get("requestParameters")
    return {
        "UserType": userDetails(eventData.get("userIdentity")),
        "eventTime": eventData.get("eventTime"),
        "region": eventData.get("awsRegion"),
        "sourceIP": eventData.get("sourceIPAddress"),
        "userAgent": eventData.get("userAgent"),
        "parameters": requestParameters,
        # requestParameters may be absent or null; fall back to an empty
        # mapping so the lookup yields None instead of raising.
        "bucketName": (requestParameters or {}).get("bucketName"),
    }
def userDetails(userData):
    """Translate the ``type`` of a user-identity mapping into a readable label.

    Args:
        userData: Mapping with a ``type`` entry, or ``None``.

    Returns:
        The label for a recognised type (for example ``"Root User"`` for
        ``"Root"``). For an unrecognised type the raw type value is returned
        unchanged, and ``"Unknown"`` is returned when ``userData`` is empty,
        ``None`` or has no ``type``.
    """
    labels = {
        "Root": "Root User",
        "IAMUser": "IAM User",
        "AssumedRole": "Assumed Role",
        "FederatedUser": "Federated User",
        "AWSAccount": "User from another AWS account",
        "AWSService": "AWS Service",
    }
    userType = (userData or {}).get("type")
    # Fall back to the raw type so identity types not listed above still
    # produce a usable value instead of leaving the result unbound.
    return labels.get(userType, userType or "Unknown")
def _publicAccessBlockWeakened(requestParameters):
    """Return True unless all four public-access-block flags are truthy."""
    config = (requestParameters or {}).get("PublicAccessBlockConfiguration") or {}
    # A missing configuration or a missing flag counts as "not blocked".
    return not all(
        config.get(flag)
        for flag in (
            "BlockPublicAcls",
            "IgnorePublicAcls",
            "BlockPublicPolicy",
            "RestrictPublicBuckets",
        )
    )


def _aclGrantsPublicAccess(requestParameters):
    """Return True if any ACL grant gives a permission to a public group."""
    publicGroups = (
        # URI for all user group
        "http://acs.amazonaws.com/groups/global/AllUsers",
        # URI for auth user group
        "http://acs.amazonaws.com/groups/global/AuthenticatedUsers",
    )
    permissions = ("READ", "WRITE", "READ_ACP", "WRITE_ACP", "FULL_CONTROL")

    policy = (requestParameters or {}).get("AccessControlPolicy") or {}
    grants = (policy.get("AccessControlList") or {}).get("Grant") or []
    # NOTE(review): a lone grant may arrive as a single mapping rather than a
    # list - confirm against real events. Normalising keeps the scan uniform.
    if isinstance(grants, dict):
        grants = [grants]

    # Every grant is inspected rather than assuming the first entry belongs
    # to the owner: a grant that names neither public group can never match,
    # so including it is harmless, while a public grant is caught wherever it
    # appears in the list.
    return any(
        (grant.get("Grantee") or {}).get("URI") in publicGroups
        and grant.get("Permission") in permissions
        for grant in grants
    )


def lambda_handler(event, context):
    """Lambda handler reacting to S3 bucket permission events.

    Reads the record under ``event["detail"]`` and acts on its ``eventName``:

    * ``PutBucketPublicAccessBlock``: sends an alert unless all four flags
      (``BlockPublicAcls``, ``IgnorePublicAcls``, ``BlockPublicPolicy``,
      ``RestrictPublicBuckets``) are enabled in the request.
    * ``PutBucketAcl``: sends an alert when a grant gives a permission to the
      AllUsers or AuthenticatedUsers group.
    * ``CreateBucket``: enables all four public-access-block flags on the
      bucket named in the request.

    Args:
        event: Invocation payload; must contain a ``detail`` mapping.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` 200 and a JSON-encoded ``body``.
    """
    print(event)
    # parsing event
    eventData = event.get("detail")
    # parsing and fetching required information
    eventDetails = getEventDetails(eventData)
    eventName = eventData.get("eventName")
    requestParameters = eventData.get("requestParameters")

    if eventName == "PutBucketPublicAccessBlock":
        if _publicAccessBlockWeakened(requestParameters):
            # sending message/email
            sendMessage(eventDetails, subject=None, message=None)
    elif eventName == "PutBucketAcl":
        if _aclGrantsPublicAccess(requestParameters):
            sendMessage(eventDetails, subject=None, message=None)
    elif eventName == "CreateBucket":
        # lock the new bucket down by turning every public-access flag on
        boto3.client("s3").put_public_access_block(
            Bucket=eventDetails.get("bucketName"),
            PublicAccessBlockConfiguration={
                "BlockPublicAcls": True,
                "IgnorePublicAcls": True,
                "BlockPublicPolicy": True,
                "RestrictPublicBuckets": True,
            },
        )
    return {"statusCode": 200, "body": json.dumps("Thanks from Srce Cde!")}