This repository was archived by the owner on Nov 19, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathalphavantagetickers.py
More file actions
88 lines (75 loc) · 3 KB
/
alphavantagetickers.py
File metadata and controls
88 lines (75 loc) · 3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import requests
import pandas as pd
import logging
from dotenv import load_dotenv
import os
from pymongo import MongoClient, UpdateOne
from datetime import datetime
# Load environment variables (API key, Mongo connection string) from a .env file.
load_dotenv()
# Log to a local file with timestamp and severity on every record.
logging.basicConfig(filename='logfile.log', level=logging.INFO,
                    format='%(asctime)s:%(levelname)s:%(message)s')
# MongoDB connection setup; the connection string is read from the environment.
MONGO_DB_CONN_STRING = os.getenv("MONGO_DB_CONN_STRING")
DB_NAME = "stock_data"  # Target database name
COLLECTION_NAME = "tickers"  # Target collection name
def fetch_tickers(api_key):
    """
    Fetch the LISTING_STATUS ticker dump (raw CSV text) from Alpha Vantage.

    Parameters
    ----------
    api_key : str
        Alpha Vantage API key.

    Returns
    -------
    str or None
        The raw CSV response body on success, or None on any request failure
        (connection error, timeout, or non-2xx status).
    """
    url = f'https://www.alphavantage.co/query?function=LISTING_STATUS&apikey={api_key}'
    try:
        # Fix: requests has no default timeout, so a stalled connection would
        # hang this job indefinitely. 30 s is generous for this endpoint; a
        # Timeout is a RequestException, so the except below still catches it.
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        logging.info("Connected to Alpha Vantage API successfully.")
        return response.text
    except requests.exceptions.RequestException as e:
        logging.error(f"Error connecting to Alpha Vantage API: {e}")
        return None
def parse_tickers(data):
    """
    Parse the raw LISTING_STATUS CSV payload into a DataFrame.

    Parameters
    ----------
    data : str
        Raw CSV text as returned by fetch_tickers (header row first).

    Returns
    -------
    pandas.DataFrame
        One row per ticker; columns are taken from the CSV header row.
    """
    # Local stdlib imports keep the module's top-level import block untouched.
    import csv
    from io import StringIO

    # Fix: the previous naive str.split(",") parsing broke on quoted fields
    # that contain commas (e.g. a company name like "Foo, Inc."), silently
    # shifting every subsequent column. csv.DictReader handles RFC-4180
    # quoting, embedded commas, and \r\n line endings correctly.
    reader = csv.DictReader(StringIO(data.strip()))
    return pd.DataFrame(list(reader))
def save_to_mongodb(dataframe, connection_string, db_name, collection_name):
    """
    Upsert the DataFrame rows into a MongoDB collection.

    Each document is keyed on its 'symbol' field and stamped with a
    datetime_imported field, then written in a single bulk_write so repeated
    runs update existing tickers instead of accumulating duplicates.

    Parameters
    ----------
    dataframe : pandas.DataFrame
        Ticker rows; must contain a 'symbol' column.
    connection_string : str
        MongoDB connection URI.
    db_name : str
        Target database name.
    collection_name : str
        Target collection name.
    """
    client = MongoClient(connection_string)
    try:
        collection = client[db_name][collection_name]
        # Prepare one upsert per row, all sharing a single batch timestamp so
        # every document from this run carries the same import time.
        # NOTE(review): datetime.now() is timezone-naive (local time);
        # confirm whether downstream consumers expect UTC.
        operations = []
        datetime_imported = datetime.now()
        for record in dataframe.to_dict('records'):
            record['datetime_imported'] = datetime_imported
            # 'symbol' is assumed to be the unique identifier for each ticker.
            filter_ = {'symbol': record['symbol']}
            update_ = {'$set': record}
            operations.append(UpdateOne(filter_, update_, upsert=True))
        try:
            # Execute all the operations in one round trip.
            if operations:
                result = collection.bulk_write(operations)
                logging.info(f"Bulk write completed with {result.modified_count} modifications.")
            else:
                logging.info("No operations to perform.")
        except Exception as e:
            logging.error(f"Error saving data to MongoDB with bulk write: {e}")
    finally:
        # Fix: the original never closed the MongoClient, leaking the
        # connection pool; always release it, even if bulk_write raises.
        client.close()
def update_tickers():
    """
    Fetch the ticker listing from Alpha Vantage and persist it to MongoDB.

    Reads the API key from the ALPHA_VANTAGE_API environment variable; does
    nothing further if the fetch fails or returns an empty payload.
    """
    raw_csv = fetch_tickers(os.getenv("ALPHA_VANTAGE_API"))
    # Guard clause: fetch_tickers returns None (or empty text) on failure.
    if not raw_csv:
        return
    tickers_df = parse_tickers(raw_csv)
    save_to_mongodb(tickers_df, MONGO_DB_CONN_STRING, DB_NAME, COLLECTION_NAME)
# Script entry point: run a one-off ticker refresh when executed directly.
if __name__ == "__main__":
    update_tickers()