Skip to content

Commit c8fbe63

Browse files
authored
Merge pull request #192 from SANDAG/LEHD-Employment
[PULL REQUEST] Add Employment Estimates
2 parents 60a34d2 + 86544f5 commit c8fbe63

File tree

16 files changed

+764
-42
lines changed

16 files changed

+764
-42
lines changed

config.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,4 +28,5 @@ housing_and_households = false
2828
population = false
2929
population_by_ase = false
3030
household_characteristics = false
31+
employment = false
3132
staging = false

main.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import python.pop_type as pop
1414
import python.ase as ase
1515
import python.hh_characteristics as hh_characteristics
16+
import python.employment as employment
1617
import python.staging as staging
1718

1819
import python.utils as utils
@@ -57,6 +58,11 @@
5758
logger.info("Running Household Characteristics module...")
5859
hh_characteristics.run_hh_characteristics(year)
5960

61+
# Employment module
62+
if utils.RUN_INSTRUCTIONS["employment"]:
63+
logger.info("Running Employment module...")
64+
employment.run_employment(year)
65+
6066
# Diagnostic print for this year
6167
logger.info(f"Finished running {year}\n")
6268

python/ase.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ def _get_controls_inputs(year: int) -> dict[str, pd.DataFrame]:
9898

9999
# Get regional age/sex/ethnicity group quarters distributions
100100
with open(utils.SQL_FOLDER / "ase/get_region_gq_ase_dist.sql") as file:
101-
region_gq_ase_dist = utils.read_sql_query_acs(
101+
region_gq_ase_dist = utils.read_sql_query_fallback(
102102
sql=sql.text(file.read()),
103103
con=con,
104104
params={
@@ -262,7 +262,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]:
262262
with utils.ESTIMATES_ENGINE.connect() as con:
263263
# Get the Age/Sex B010001 table data
264264
with open(utils.SQL_FOLDER / "ase/get_B01001.sql") as file:
265-
b01001 = utils.read_sql_query_acs(
265+
b01001 = utils.read_sql_query_fallback(
266266
sql=sql.text(file.read()),
267267
con=con,
268268
params={
@@ -272,7 +272,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]:
272272

273273
# Get the Ethnicity B03002 table data
274274
with open(utils.SQL_FOLDER / "ase/get_B03002.sql") as file:
275-
b03002 = utils.read_sql_query_acs(
275+
b03002 = utils.read_sql_query_fallback(
276276
sql=sql.text(file.read()),
277277
con=con,
278278
params={
@@ -282,7 +282,7 @@ def _get_seed_inputs(year: int) -> dict[str, pd.DataFrame]:
282282

283283
# Get Age/Sex/Ethnicity data from B01001(B-I) table data
284284
with open(utils.SQL_FOLDER / "ase/get_B01001(B-I).sql") as file:
285-
b01001_b_i = utils.read_sql_query_acs(
285+
b01001_b_i = utils.read_sql_query_fallback(
286286
sql=sql.text(file.read()),
287287
con=con,
288288
params={

python/employment.py

Lines changed: 288 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,288 @@
1+
# Container for the Employment module. See the Estimates-Program wiki page for
2+
# more details: Wiki page TBD
3+
4+
import numpy as np
5+
import pandas as pd
6+
import sqlalchemy as sql
7+
8+
import python.tests as tests
9+
import python.utils as utils
10+
11+
generator = np.random.default_rng(utils.RANDOM_SEED)
12+
13+
14+
def run_employment(year: int):
    """Create MGRA-level jobs data by NAICS code for one estimates year.

    Pipeline: pull LEHD LODES data and aggregate it to MGRAs via the
    block-to-MGRA crosswalk, validate those inputs, control the MGRA totals
    to regional QCEW figures with integerization, validate the result, and
    persist inputs and outputs to the database.

    Steps are split into private helpers for encapsulation:
        _get_jobs_inputs - fetch LODES data, the block-to-MGRA crosswalk,
            and QCEW control totals; aggregate LODES jobs to MGRA level
        _validate_jobs_inputs - validate the input tables
        _create_jobs_output - apply control totals via utils.integerize_1d()
        _validate_jobs_outputs - validate the output table
        _insert_jobs - store input and output tables in the database

    Args:
        year: estimates year
    """
    inputs = _get_jobs_inputs(year)
    _validate_jobs_inputs(inputs)

    outputs = _create_jobs_output(inputs)
    _validate_jobs_outputs(outputs)

    _insert_jobs(inputs, outputs)
41+
42+
43+
def _get_lodes_data(year: int) -> pd.DataFrame:
    """Pull LEHD LODES jobs by Census block and split NAICS 72 into 721/722.

    LODES reports sector 72 as a single NAICS code; block-level split
    percentages from GIS apportion it into subsectors 721 and 722.

    Args:
        year: The year for which to retrieve LEHD LODES data.
    Returns:
        combined LEHD LODES data with naics
    """
    # Block-level LODES jobs, falling back up to two prior years if needed
    with utils.ESTIMATES_ENGINE.connect() as con:
        with open(utils.SQL_FOLDER / "employment/get_lodes_data.sql") as file:
            lodes_data = utils.read_sql_query_fallback(
                max_lookback=2,
                sql=sql.text(file.read()),
                con=con,
                params={"year": year},
            )

    # Block-level 721/722 split percentages, falling back up to three years
    with utils.GIS_ENGINE.connect() as con:
        with open(utils.SQL_FOLDER / "employment/get_naics72_split.sql") as file:
            split_naics_72 = utils.read_sql_query_fallback(
                max_lookback=3,
                sql=sql.text(file.read()),
                con=con,
                params={"year": year},
            )

    # Separate sector 72 rows and attach their split percentages
    is_72 = lodes_data["naics_code"] == "72"
    sector_72 = lodes_data[is_72].merge(split_naics_72, on="block", how="left")

    # Apportion sector 72 jobs into 721/722 and recombine with other sectors
    frames = [lodes_data[~is_72]]
    for code, pct_col in (("721", "pct_721"), ("722", "pct_722")):
        frames.append(
            sector_72.assign(
                naics_code=code, jobs=sector_72["jobs"] * sector_72[pct_col]
            )
        )

    return pd.concat(frames, ignore_index=True)[
        ["year", "block", "naics_code", "jobs"]
    ]
90+
91+
92+
def _aggregate_lodes_to_mgra(
    combined_data: pd.DataFrame, xref: pd.DataFrame, year: int
) -> pd.DataFrame:
    """Allocate block-level LODES jobs to MGRAs using the crosswalk.

    Every (mgra, naics_code) combination for the run's MGRA list appears in
    the result, with a value of zero where no jobs were allocated.

    Args:
        combined_data: LODES data with columns: year, block, naics_code, jobs
        xref: Crosswalk with columns: block, mgra, allocation_pct
        year: The year for which to aggregate data

    Returns:
        Aggregated data at MGRA level with columns: run_id, year, mgra,
        naics_code, value
    """
    # Pull the full MGRA list for this run so zero-job MGRAs are retained
    with utils.ESTIMATES_ENGINE.connect() as con:
        mgra_data = pd.read_sql_query(
            sql=sql.text(
                """
                SELECT DISTINCT [mgra]
                FROM [inputs].[mgra]
                WHERE run_id = :run_id
                ORDER BY [mgra]
                """
            ),
            con=con,
            params={"run_id": utils.RUN_ID},
        )

    # Allocate block jobs to MGRAs, then sum to (year, mgra, naics_code)
    allocated = (
        combined_data.merge(xref, on="block", how="inner")
        .assign(value=lambda df: df["jobs"] * df["allocation_pct"])
        .groupby(["year", "mgra", "naics_code"], as_index=False)["value"]
        .sum()
    )

    # Cross join every MGRA with every industry so missing cells become 0
    industries = pd.DataFrame({"naics_code": combined_data["naics_code"].unique()})
    scaffold = mgra_data.merge(industries, how="cross").assign(year=year)

    jobs = (
        scaffold.merge(allocated, on=["year", "mgra", "naics_code"], how="left")
        .fillna({"value": 0})
        .assign(run_id=utils.RUN_ID)
    )
    return jobs[["run_id", "year", "mgra", "naics_code", "value"]]
139+
140+
141+
def _get_jobs_inputs(year: int) -> dict[str, pd.DataFrame]:
    """Gather all employment input tables for a specified year.

    Args:
        year: The year for which to retrieve input data.
    Returns:
        input DataFrames related to jobs.
    """
    # Block-level LODES jobs (NAICS 72 already split into 721/722)
    jobs_inputs = {"lodes_data": _get_lodes_data(year)}

    with utils.ESTIMATES_ENGINE.connect() as con:
        # Crosswalk from Census blocks to MGRAs
        with open(utils.SQL_FOLDER / "employment/xref_block_to_mgra.sql") as file:
            jobs_inputs["xref_block_to_mgra"] = pd.read_sql_query(
                sql=sql.text(file.read()),
                con=con,
                params={"mgra_version": utils.MGRA_VERSION},
            )

        # Regional employment control totals from QCEW
        with open(utils.SQL_FOLDER / "employment/QCEW_control.sql") as file:
            jobs_inputs["control_totals"] = utils.read_sql_query_fallback(
                sql=sql.text(file.read()),
                con=con,
                params={"year": year},
            )
        # Tag the controls with the current run for later insertion
        jobs_inputs["control_totals"]["run_id"] = utils.RUN_ID

    # LODES jobs aggregated from blocks to the MGRA level
    jobs_inputs["lehd_jobs"] = _aggregate_lodes_to_mgra(
        jobs_inputs["lodes_data"], jobs_inputs["xref_block_to_mgra"], year
    )

    return jobs_inputs
179+
180+
181+
def _validate_jobs_inputs(jobs_inputs: dict[str, pd.DataFrame]) -> None:
    """Validate the jobs input data"""
    # Each entry: (label, input key, row_count key columns or None).
    # LODES only includes blocks with jobs, and the xref is many-to-many,
    # so no row count validation is performed for those two tables. See
    # https://lehd.ces.census.gov/data/lehd-code-samples/sections/lodes/basic_examples.html
    checks = [
        ("LEHD LODES data", "lodes_data", None),
        ("xref", "xref_block_to_mgra", None),
        ("QCEW control totals", "control_totals", {"naics_code"}),
        ("LEHD jobs at MGRA level", "lehd_jobs", {"mgra", "naics_code"}),
    ]
    for label, key, row_keys in checks:
        kwargs = {"negative": {}, "null": {}}
        if row_keys is not None:
            kwargs["row_count"] = {"key_columns": row_keys}
        tests.validate_data(label, jobs_inputs[key], **kwargs)
213+
214+
215+
def _create_jobs_output(
    jobs_inputs: dict[str, pd.DataFrame],
) -> dict[str, pd.DataFrame]:
    """Control MGRA-level LEHD jobs to regional QCEW totals.

    For each NAICS code, the MGRA job values are integerized with
    utils.integerize_1d() so they sum to that code's regional control total.

    Args:
        jobs_inputs: A dictionary containing input DataFrames related to jobs

    Returns:
        Controlled employment data.
    """
    # Deterministic ordering so weighted-random integerization is repeatable
    sorted_jobs = jobs_inputs["lehd_jobs"].sort_values(by=["mgra", "naics_code"])
    controls = jobs_inputs["control_totals"]

    controlled_frames = []
    for code in sorted_jobs["naics_code"].unique():
        industry_rows = sorted_jobs[sorted_jobs["naics_code"] == code]

        # Regional control total for this industry.
        # NOTE(review): assumes every LODES NAICS code appears in the QCEW
        # controls; a missing code raises IndexError here — confirm upstream
        control_value = controls.loc[controls["naics_code"] == code, "value"].iloc[0]

        # Integerize the MGRA values so they sum exactly to the control
        controlled_frames.append(
            industry_rows.assign(
                value=utils.integerize_1d(
                    data=industry_rows["value"],
                    control=control_value,
                    methodology="weighted_random",
                    generator=generator,
                )
            )
        )

    return {"results": pd.concat(controlled_frames, ignore_index=True)}
257+
258+
259+
def _validate_jobs_outputs(jobs_outputs: dict[str, pd.DataFrame]) -> None:
    """Validate the jobs output data"""
    # One row expected per (mgra, naics_code); no negative or null job counts
    results = jobs_outputs["results"]
    tests.validate_data(
        "Controlled jobs data",
        results,
        row_count={"key_columns": {"mgra", "naics_code"}},
        negative={},
        null={},
    )
268+
269+
270+
def _insert_jobs(
    jobs_inputs: dict[str, pd.DataFrame], jobs_outputs: dict[str, pd.DataFrame]
) -> None:
    """Insert input and output data related to jobs to the database."""
    # NOTE(review): rows are appended on a raw connection; confirm the
    # engine/driver autocommits, otherwise an explicit commit may be needed
    with utils.ESTIMATES_ENGINE.connect() as con:
        # QCEW control totals are stored alongside other run inputs
        jobs_inputs["control_totals"].to_sql(
            name="controls_jobs",
            con=con,
            schema="inputs",
            if_exists="append",
            index=False,
        )

        # Controlled MGRA-level jobs are the module's final output
        jobs_outputs["results"].to_sql(
            name="jobs",
            con=con,
            schema="outputs",
            if_exists="append",
            index=False,
        )

python/hh_characteristics.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,12 @@ def _get_hh_income_inputs(year: int) -> dict[str, pd.DataFrame]:
8686
with open(
8787
utils.SQL_FOLDER / "hh_characteristics" / "get_tract_controls_hh_income.sql"
8888
) as file:
89-
hh_income_inputs["hh_income_tract_controls"] = utils.read_sql_query_acs(
90-
sql=sql.text(file.read()), # type: ignore
91-
con=con, # type: ignore
92-
params={"run_id": utils.RUN_ID, "year": year},
89+
hh_income_inputs["hh_income_tract_controls"] = (
90+
utils.read_sql_query_fallback(
91+
sql=sql.text(file.read()), # type: ignore
92+
con=con, # type: ignore
93+
params={"run_id": utils.RUN_ID, "year": year},
94+
)
9395
)
9496

9597
return hh_income_inputs
@@ -120,7 +122,7 @@ def _get_hh_size_inputs(year: int) -> dict[str, pd.DataFrame]:
120122
/ "hh_characteristics"
121123
/ "get_tract_controls_hh_by_size.sql"
122124
) as file:
123-
hh_char_inputs["hhs_tract_controls"] = utils.read_sql_query_acs(
125+
hh_char_inputs["hhs_tract_controls"] = utils.read_sql_query_fallback(
124126
sql=sql.text(file.read()), # type: ignore
125127
con=con, # type: ignore
126128
params={"run_id": utils.RUN_ID, "year": year},

python/hs_hh.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ def _get_hs_hh_inputs(year: int) -> dict[str, pd.DataFrame]:
105105

106106
# Get tract occupancy controls
107107
with open(utils.SQL_FOLDER / "hs_hh/get_tract_controls_hh.sql") as file:
108-
hs_hh_inputs["tract_controls"] = utils.read_sql_query_acs(
108+
hs_hh_inputs["tract_controls"] = utils.read_sql_query_fallback(
109109
sql=sql.text(file.read()),
110110
con=con,
111111
params={

0 commit comments

Comments
 (0)