-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlambda_function.py
More file actions
255 lines (219 loc) · 8.82 KB
/
Copy pathlambda_function.py
File metadata and controls
255 lines (219 loc) · 8.82 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
import json
import os
import hmac
import hashlib
import base64
import sys
import pymongo
import certifi
import logging
from datetime import datetime, UTC
# Module-wide logger: the root logger, with its threshold set to INFO.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

# Runtime configuration read once from the process environment at import time.
# MONGO_URI and HMAC_SECRET are None when unset; PYTHON_ENV falls back to
# 'development' when unset or empty.
MONGO_URI = os.getenv("MONGO_URI")
HMAC_SECRET = os.getenv("HMAC_SECRET")
PYTHON_ENV = os.getenv("PYTHON_ENV") or 'development'
def find_missing_fields(obj, required):
    """
    Return the names in `required` that `obj` does not supply a usable value for.

    A field counts as missing when the key is absent, its value is None, or its
    value is an empty string/bytes/list/tuple/dict/set. Falsy-but-meaningful
    values such as 0 or False are treated as present, so a score of 0 is not
    reported as missing.

    Args:
        obj: The mapping to inspect (typically a parsed JSON object). If it has
            no `.get` method (e.g. the JSON body was a list or a scalar), every
            required field is reported as missing instead of raising.
        required: Iterable of field names that must be present.

    Returns:
        A list of the missing field names, in the order given by `required`.
    """
    # A non-mapping cannot contain any of the fields.
    if not hasattr(obj, "get"):
        return list(required)

    def _is_missing(value):
        if value is None:
            return True
        # Only emptiness of text/containers means "no value"; numbers and
        # booleans are always real values, even when falsy.
        if isinstance(value, (str, bytes, list, tuple, dict, set)):
            return len(value) == 0
        return False

    return [field for field in required if _is_missing(obj.get(field))]
def get_mongo_client():
    """
    Build a new `pymongo.MongoClient` for the URI in `MONGO_URI`, using the
    CA bundle path reported by `certifi.where()` as the TLS CA file.
    """
    ca_bundle_path = certifi.where()
    return pymongo.MongoClient(MONGO_URI, tlsCAFile=ca_bundle_path)
def generate_hmac(secret_key, message):
    """
    Sign `message` with `secret_key` using HMAC-SHA256 and return the digest
    as a base64 string (base64 is used for compatibility with the C# side).

    Both arguments are text and are encoded as UTF-8 before signing.
    """
    key_bytes = secret_key.encode('utf-8')
    message_bytes = message.encode('utf-8')
    mac = hmac.new(key_bytes, message_bytes, digestmod=hashlib.sha256)
    encoded_digest = base64.b64encode(mac.digest())
    return encoded_digest.decode('utf-8')
def get_scores(game: str):
    """
    Fetch all saved scores for `game` and return them as a response dict.

    The scores are read from the 'top-scores-collection' collection of the
    database named "<game>-<PYTHON_ENV>-db", sorted by score descending and
    then by date descending.

    Args:
        game: Game identifier used to build the database name.

    Returns:
        A dict with 'statusCode' and 'body':
        - 200 with a JSON array of documents (each `_id` converted to str),
        - 404 when no database exists for the game,
        - 500 when anything goes wrong while talking to MongoDB or
          serialising the result.
    """
    client = None
    try:
        # Connect to MongoDB and retrieve the database list.
        logger.info("Connecting to MongoDB.")
        client = get_mongo_client()
        db_name = f"{game}-{PYTHON_ENV}-db"

        # Check if the database exists.
        if db_name not in client.list_database_names():
            return {
                'statusCode': 404,
                'body': f"No scores saved for game: {game}."
            }

        col = client[db_name]['top-scores-collection']

        # Sort by 'score' descending, then by 'date' descending
        documents = list(col.find().sort([
            ('score', pymongo.DESCENDING),
            ('date', pymongo.DESCENDING)
        ]))

        # ObjectId is not JSON serializable; expose it as a string.
        for doc in documents:
            doc['_id'] = str(doc['_id'])

        return {
            'statusCode': 200,
            'headers': {'Content-Type': 'application/json'},
            'body': json.dumps(documents)
        }
    except Exception as e:
        # Report the failure to the caller as a 500 response (as set_scores
        # does) rather than terminating the interpreter with sys.exit().
        logger.error("An error occurred while retrieving the scores: %s", e, exc_info=True)
        return {
            'statusCode': 500,
            'body': "Internal server error."
        }
    finally:
        # Release the connection pool opened for this call.
        if client is not None:
            client.close()
def _verify_signed_request(event, body, current_time):
    """
    Validate the HMAC signature and timestamp headers of a request.

    Returns an error response dict when validation fails, or None when the
    request is authentic and its timestamp is within the 5-minute tolerance.
    """
    if not HMAC_SECRET:
        logger.error("HMAC secret not configured in the environment.")
        return {
            "statusCode": 500,
            "body": "HMAC secret not configured in the environment."
        }

    # HTTP header names are case-insensitive, and "headers" may be absent or
    # null in the event; normalise before looking anything up.
    raw_headers = event.get("headers") or {}
    headers = {str(name).lower(): value for name, value in raw_headers.items()}
    signature = headers.get("x-hmac-signature")
    timestamp = headers.get("x-timestamp")
    if not signature or not timestamp:
        logger.warning("Missing HMAC signature or timestamp in headers.")
        return {
            "statusCode": 403,
            "body": "Missing HMAC signature or timestamp."
        }

    # Recreate the HMAC signature on the server side using the body and timestamp
    try:
        # `body` is normally the dict already parsed by lambda_handler; raw
        # JSON text is still accepted. Calling json.loads() on a dict raises
        # TypeError, which used to turn every production request into a 500.
        if isinstance(body, (str, bytes, bytearray)):
            body_dict = json.loads(body)
        else:
            body_dict = body
        # Normalize the JSON to a compact format (no spaces)
        normalized_body = json.dumps(body_dict, separators=(',', ':'))
        logger.info("Normalized body: %s", normalized_body)
        calculated_signature = generate_hmac(HMAC_SECRET, normalized_body + timestamp)
    except Exception as e:
        logger.error("Error generating HMAC: %s", e)
        return {
            "statusCode": 500,
            "body": "Internal server error while generating HMAC."
        }

    # Compare as bytes: compare_digest() rejects non-ASCII str arguments.
    # The expected signature is deliberately not logged, since anyone able to
    # read the logs could otherwise reuse it.
    if not hmac.compare_digest(calculated_signature.encode('utf-8'),
                               str(signature).encode('utf-8')):
        logger.warning("Invalid HMAC signature.")
        return {
            "statusCode": 403,
            "body": "Invalid HMAC signature."
        }

    # Validate the timestamp (must be within 5 minutes of the current time)
    try:
        timestamp_dt = datetime.fromisoformat(timestamp)
        # A timestamp without an offset cannot be subtracted from the aware
        # `current_time`; reject it instead of failing with TypeError.
        if timestamp_dt.utcoffset() is None:
            raise ValueError("timestamp has no timezone offset")
    except ValueError as e:
        logger.error("Invalid timestamp format: %s", e)
        return {
            "statusCode": 400,
            "body": "Invalid timestamp format."
        }

    time_diff = current_time - timestamp_dt
    if abs(time_diff.total_seconds()) > 300:  # 5-minute tolerance
        logger.warning("Timestamp out of range. Current time: %s, Timestamp: %s", current_time, timestamp_dt)
        return {
            "statusCode": 403,
            "body": "Timestamp out of range."
        }

    return None


def set_scores(event, body):
    """
    Store a new score for a game and return the resulting top-10 list.

    When PYTHON_ENV is 'production', the request must carry valid
    'x-hmac-signature' and 'x-timestamp' headers; the signature is HMAC-SHA256
    over the compact JSON body concatenated with the timestamp. In any other
    environment the signature check is skipped.

    After inserting the score into 'top-scores-collection' of the database
    "<game>-<PYTHON_ENV>-db", every document outside the 10 highest scores is
    deleted from that collection.

    Args:
        event: The Lambda event; only its "headers" entry is read.
        body: The parsed request body. Must contain "game", "player", "score"
            and "date".

    Returns:
        A dict with "statusCode" and "body": 200 with the JSON top-10 list,
        400 for missing fields or a bad timestamp, 403 for a missing/invalid
        signature or stale timestamp, 500 for configuration or database errors.
    """
    current_time = datetime.now(UTC)

    if PYTHON_ENV == 'production':
        auth_error = _verify_signed_request(event, body, current_time)
        if auth_error is not None:
            return auth_error

    # Validate required fields
    missing_params = find_missing_fields(body, ["player", "score", "date"])
    if missing_params:
        logger.warning("Missing parameters in the request: %s", missing_params)
        return {
            "statusCode": 400,
            "body": f"Missing parameters: {missing_params}"
        }

    client = None
    try:
        # Connect to MongoDB
        logger.info("Connecting to MongoDB.")
        client = get_mongo_client()
        db = client[f'{body["game"]}-{PYTHON_ENV}-db']
        col = db["top-scores-collection"]

        # Insert the new document
        new_doc = {
            "player": body["player"],
            "score": body["score"],
            "date": body["date"],
        }
        logger.info("Inserting new document: %s", new_doc)
        col.insert_one(new_doc)

        # Retrieve the top 10 documents, sorted by "score" descending
        logger.info("Retrieving top 10 scores from the database.")
        top_10_list = list(col.find().sort("score", pymongo.DESCENDING).limit(10))

        # Trim the collection: drop everything that is not in the top 10
        top_10_ids = [doc["_id"] for doc in top_10_list]
        logger.info("Top 10 document IDs: %s", top_10_ids)
        col.delete_many({"_id": {"$nin": top_10_ids}})

        # Convert ObjectIds to strings for JSON serialization
        for doc in top_10_list:
            doc["_id"] = str(doc["_id"])

        logger.info("Returning updated top 10 scores.")
        return {
            "statusCode": 200,
            "headers": {"Content-Type": "application/json"},
            "body": json.dumps(top_10_list),
        }
    except Exception as e:
        logger.error("An error occurred while processing the request: %s", e, exc_info=True)
        return {
            "statusCode": 500,
            "body": "Internal server error."
        }
    finally:
        # Release the connection pool opened for this call.
        if client is not None:
            client.close()
def lambda_handler(event, context):
logger.info(f"Lambda invoked with event: {json.dumps(event)}")
# Validate MongoDB URI
if not MONGO_URI:
logger.error("MongoDB URI not configured in the environment.")
return {
"statusCode": 500,
"body": "MongoDB URI not configured in the environment."
}
try:
# Parse the request body
body = event["body"]
body = json.loads(body)
logger.info("Parsed request body: %s", body)
except json.JSONDecodeError as e:
logger.error("Error parsing request body: %s", e)
return {
"statusCode": 400,
"body": "Invalid JSON format in the request body."
}
# Validate required fields
missing_params = find_missing_fields(body, ["action", "game"])
if missing_params:
logger.warning("Missing parameters in the request: %s", missing_params)
return {
"statusCode": 400,
"body": f"Missing parameters: {missing_params}"
}
# Process the request based on the action
match body["action"]:
case "get":
return get_scores(body["game"])
case "set":
return set_scores(event, body)
case _:
logger.warning(f"Invalid action: {body['action']} for function.")
return {
"statusCode": 400,
"body": f"Invalid action: {body['action']} for function."
}