
Commit 4fc741c

Authored by nj1973, helensilva14 and sundar-mudupalli-work
feat: Support custom BigQuery storage api endpoint (#1501)
* feat: Experiment with definition of custom BigQuery Storage API client
* chore: Reformat
* feat: Support custom BigQuery storage api endpoint
* chore: reformat
* Updated and wrote a small section on running DVT on-prem
* chore: Add type hints
* feat: Add --secret-manager-api-endpoint option
* Update connections.md: removed references to ENDPOINT_URI, since it is not a URI but an IP address based on my testing. I left the references to Secret Manager in, due to my discussion in the issue.
* Update README.md: updated the on-prem section to explain using endpoints from on-prem.
* docs: Docs updates
* Revert "feat: Add --secret-manager-api-endpoint option". This reverts commit 7e3d8fb.

Co-authored-by: Helen Cristina <[email protected]>
Co-authored-by: Sundar Mudupalli <[email protected]>
1 parent: 1c2d215, commit: 4fc741c

File tree

13 files changed: +241 -36 lines changed


README.md

Lines changed: 9 additions & 2 deletions
@@ -66,8 +66,10 @@ please review the [Connections](https://github.com/GoogleCloudPlatform/professio
 ### Running Validations
 
 The CLI is the main interface to use this tool and it has several different
-commands which can be used to create and run validations. Below are the command
-syntax and options for running validations.
+commands which can be used to create and run validations. DVT is designed to run in
+an environment connected to GCP services, specifically, BigQuery, GCS and Secret manager.
+If DVT is being run on-premises or in an environment with restricted access to GCP services, see
+[running DVT at on-prem](#running-dvt-at-on-prem). Below are the command syntax and options for running validations.
 
 Alternatives to running DVT in the CLI include deploying DVT to Cloud Run, Cloud Functions, or Airflow
 ([Examples Here](https://github.com/GoogleCloudPlatform/professional-services-data-validator/tree/develop/samples)). See the [Validation Logic](https://github.com/GoogleCloudPlatform/professional-services-data-validator#validation-logic) section
@@ -525,6 +527,11 @@ For example, this flag can be used as follows:
   "target_query": "SELECT `hash__all`, `station_id`\nFROM ..."
 }
 ```
+#### Running DVT at on-prem
+On-premises environments can have limited access to GCP services. DVT supports using BigQuery for storing validation results, GCS for storage and
+the Secret Manager for secrets. You may also use BigQuery and Spanner as a source or target for validation. Service
+APIs (i.e. bigquery.googleapis.com) may not always be accessible due to firewall restrictions. Work with your network
+administrator to identify the way to access these services. They may set up a [Private Service Connect Endpoint](https://cloud.google.com/vpc/docs/about-accessing-google-apis-endpoints). DVT supports accessing source and target tables in Spanner and BigQuery via endpoints set up in your network. The connection parameters for [Spanner](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/docs/connections.md#google-spanner) and [BigQuery](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/docs/connections.md#google-bigquery) outline how to specify endpoints.
 
 ### Running DVT with YAML Configuration Files
 
data_validation/cli_tools.py

Lines changed: 9 additions & 2 deletions
@@ -76,7 +76,11 @@
         ["google_service_account_key_path", "(Optional) GCP SA Key Path"],
         [
             "api_endpoint",
-            '(Optional) GCP BigQuery API endpoint (e.g. "https://mybq.p.googleapis.com")',
+            '(Optional) GCP BigQuery API endpoint (e.g. "https://bigquery-mypsc.p.googleapis.com")',
+        ],
+        [
+            "storage_api_endpoint",
+            '(Optional) GCP BigQuery Storage API endpoint (e.g. "https://bigquerystorage-mypsc.p.googleapis.com")',
         ],
     ],
     consts.SOURCE_TYPE_TERADATA: [
@@ -140,7 +144,7 @@
         ["google_service_account_key_path", "(Optional) GCP SA Key Path"],
         [
             "api_endpoint",
-            '(Optional) GCP Spanner API endpoint (e.g. "https://mycs.p.googleapis.com")',
+            '(Optional) GCP Spanner API endpoint (e.g. "https://spanner-mypsc.p.googleapis.com")',
         ],
     ],
     consts.SOURCE_TYPE_FILESYSTEM: [
@@ -1284,6 +1288,9 @@ def _get_result_handler(rc_value: str, sa_file=None) -> dict:
             consts.PROJECT_ID: conn_from_file["project_id"],
             consts.TABLE_ID: config[1],
             consts.API_ENDPOINT: conn_from_file.get("api_endpoint", None),
+            consts.STORAGE_API_ENDPOINT: conn_from_file.get(
+                "storage_api_endpoint", None
+            ),
         }
     elif conn_from_file[consts.SOURCE_TYPE] == consts.SOURCE_TYPE_POSTGRES:
         result_handler = {
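For context, the result-handler path through `_get_result_handler` now carries the Storage API endpoint from a saved BigQuery connection file alongside the existing API endpoint. A minimal sketch of the dict it builds, with illustrative connection values (the endpoints and results table below are placeholders, not taken from this commit):

```python
from data_validation import consts

# Illustrative contents of a saved BigQuery connection file.
conn_from_file = {
    "project_id": "my-project",
    "api_endpoint": "https://bigquery-mypsc.p.googleapis.com",
    "storage_api_endpoint": "https://bigquerystorage-mypsc.p.googleapis.com",
}

# Shape of the result-handler config assembled from that connection;
# both endpoint keys fall back to None when absent from the file.
result_handler = {
    consts.PROJECT_ID: conn_from_file["project_id"],
    consts.TABLE_ID: "pso_data_validator.results",
    consts.API_ENDPOINT: conn_from_file.get("api_endpoint", None),
    consts.STORAGE_API_ENDPOINT: conn_from_file.get("storage_api_endpoint", None),
}
```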

data_validation/clients.py

Lines changed: 27 additions & 7 deletions
@@ -27,6 +27,8 @@
 
 from data_validation import client_info, consts, exceptions
 from data_validation.secret_manager import SecretManagerBuilder
+
+from third_party.ibis.ibis_bigquery.api import bigquery_connect
 from third_party.ibis.ibis_cloud_spanner.api import spanner_connect
 from third_party.ibis.ibis_impala.api import impala_connect
 from third_party.ibis.ibis_mssql.api import mssql_connect
@@ -111,24 +113,42 @@ def get_google_bigquery_client(
     )
 
 
+def _get_google_bqstorage_client(credentials=None, api_endpoint: str = None):
+    options = None
+    if api_endpoint:
+        options = client_options.ClientOptions(api_endpoint=api_endpoint)
+    from google.cloud import bigquery_storage_v1 as bigquery_storage
+
+    return bigquery_storage.BigQueryReadClient(
+        credentials=credentials,
+        client_options=options,
+    )
+
+
 def get_bigquery_client(
-    project_id: str, dataset_id: str = "", credentials=None, api_endpoint: str = None
+    project_id: str,
+    dataset_id: str = "",
+    credentials=None,
+    api_endpoint: str = None,
+    storage_api_endpoint: str = None,
 ):
     google_client = get_google_bigquery_client(
         project_id, credentials=credentials, api_endpoint=api_endpoint
     )
+    bqstorage_client = None
+    if storage_api_endpoint:
+        bqstorage_client = _get_google_bqstorage_client(
+            credentials=credentials, api_endpoint=storage_api_endpoint
+        )
 
-    ibis_client = ibis.bigquery.connect(
+    return bigquery_connect(
         project_id=project_id,
         dataset_id=dataset_id,
         credentials=credentials,
+        bigquery_client=google_client,
+        bqstorage_client=bqstorage_client,
     )
 
-    # Override the BigQuery client object to ensure the correct user agent is
-    # included and any api_endpoint is used.
-    ibis_client.client = google_client
-    return ibis_client
-
 
 def get_pandas_client(table_name, file_path, file_type):
     """Return pandas client and env with file loaded into DataFrame

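The new `_get_google_bqstorage_client` helper above only overrides the Storage API endpoint when one is supplied; otherwise the client keeps the library default. A standalone sketch of the same pattern using the public google-cloud-bigquery-storage API (the endpoint value is a placeholder and default credentials are assumed):

```python
from google.api_core import client_options
from google.cloud import bigquery_storage_v1


def make_bqstorage_client(api_endpoint: str = None, credentials=None):
    # Only pass ClientOptions when a custom (e.g. Private Service Connect)
    # endpoint is requested; None keeps the default endpoint.
    options = (
        client_options.ClientOptions(api_endpoint=api_endpoint)
        if api_endpoint
        else None
    )
    return bigquery_storage_v1.BigQueryReadClient(
        credentials=credentials,
        client_options=options,
    )


# Placeholder PSC-style endpoint, given in gRPC form without a URI scheme.
read_client = make_bqstorage_client("bigquerystorage-mypsc.p.googleapis.com")
```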
data_validation/consts.py

Lines changed: 1 addition & 0 deletions
@@ -143,6 +143,7 @@
 TABLE_ID = "table_id"
 GOOGLE_SERVICE_ACCOUNT_KEY_PATH = "google_service_account_key_path"
 API_ENDPOINT = "api_endpoint"
+STORAGE_API_ENDPOINT = "storage_api_endpoint"
 
 # Result Handler Output Table Fields
 VALIDATION_TYPE = "validation_type"

data_validation/result_handlers/bigquery.py

Lines changed: 1 addition & 1 deletion
@@ -65,7 +65,7 @@ def get_handler_for_project(
             Explicit credentials to use in case default credentials
             aren't working properly.
         status_list (list): provided status to filter the results with
-        api_endpoint (str): BigQuery API endpoint (e.g. https://mybq.p.googleapis.com)
+        api_endpoint (str): BigQuery API endpoint (e.g. https://bigquery-mypsc.p.googleapis.com)
         text_format (str, optional):
             This allows the user to influence the text results written via logger.debug.
             See: https://github.com/GoogleCloudPlatform/professional-services-data-validator/issues/871

docs/connections.md

Lines changed: 8 additions & 4 deletions
@@ -123,8 +123,12 @@ data-validation connections add
    --connection-name CONN_NAME   BigQuery Connection name
    --project-id MY_PROJECT       Project ID where BQ data resides
    [--google-service-account-key-path PATH_TO_SA_KEY]   Path to SA key
-   [--api-endpoint ENDPOINT_URI]   BigQuery API endpoint (e.g.
-                                   "https://mybq.p.googleapis.com)
+   [--api-endpoint API_ENDPOINT]   BigQuery API endpoint (e.g.
+                                   "https://bigquery-mypsc.p.googleapis.com")
+   [--storage-api-endpoint STORAGE_API_ENDPOINT]   BigQuery Storage API endpoint (e.g.
+                                   "bigquerystorage-mypsc.p.googleapis.com").
+                                   Note this is a gRPC endpoint and does not
+                                   include a URI scheme.
 ```
 
 ### User/Service account needs following BigQuery permissions to run DVT
@@ -150,8 +154,8 @@ data-validation connections add
    --instance-id MY_INSTANCE   Spanner instance to connect to
    --database-id MY-DB         Spanner database (schema) to connect to
    [--google-service-account-key-path PATH_TO_SA_KEY]   Path to SA key
-   [--api-endpoint ENDPOINT_URI]   Spanner API endpoint (e.g.
-                                   "https://mycs.p.googleapis.com)
+   [--api-endpoint API_ENDPOINT]   Spanner API endpoint (e.g.
+                                   "https://spanner-mypsc.p.googleapis.com")
 ```
 
 ### User/Service account needs following Spanner role to run DVT
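Programmatically, both endpoints flow through `get_bigquery_client` in data_validation/clients.py (changed above). A hedged usage sketch with placeholder project, dataset and endpoint values:

```python
from data_validation import clients

# Both endpoints are optional; omit them to use the public Google APIs.
# Per the note above, the Storage API endpoint is gRPC and has no URI scheme.
ibis_client = clients.get_bigquery_client(
    project_id="my-project",
    dataset_id="my_dataset",
    api_endpoint="https://bigquery-mypsc.p.googleapis.com",
    storage_api_endpoint="bigquerystorage-mypsc.p.googleapis.com",
)
```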

tests/unit/test__main.py

Lines changed: 3 additions & 2 deletions
@@ -152,7 +152,8 @@
     consts.PROJECT_ID: "dummy-gcp-project",
     consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH: None,
     "connection_name": "dummy-bq-connection",
-    "api_endpoint": None,
+    consts.API_ENDPOINT: None,
+    consts.STORAGE_API_ENDPOINT: None,
 }
 CONNECTION_DESCRIBE_ARGS = {
     "verbose": False,
@@ -180,7 +181,7 @@
     consts.PROJECT_ID: "dummy-gcp-project",
     consts.GOOGLE_SERVICE_ACCOUNT_KEY_PATH: None,
     "connection_name": "dummy-bq-connection",
-    "api_endpoint": None,
+    consts.API_ENDPOINT: None,
 }  # same as CONNECTION_ADD_ARGS but with the command item replaced
 FIND_TABLES_ARGS = {
     "verbose": False,

tests/unit/test_cli_tools.py

Lines changed: 5 additions & 2 deletions
@@ -83,7 +83,9 @@
     "--project-id",
     "example-project",
     "--api-endpoint",
-    "https://mybq.p.googleapis.com",
+    "https://bigquery-mypsc.p.googleapis.com",
+    "--storage-api-endpoint",
+    "https://bigquerystorage-mypsc.p.googleapis.com",
 ]
 
 
@@ -309,7 +311,7 @@ def test_create_bq_connection(caplog, fs):
     assert bq_conn[consts.SOURCE_TYPE] == consts.SOURCE_TYPE_BIGQUERY
 
     conn_from_file = cli_tools.get_connection(args.connection_name)
-    assert conn_from_file["api_endpoint"] == "https://mybq.p.googleapis.com"
+    assert conn_from_file["api_endpoint"] == "https://bigquery-mypsc.p.googleapis.com"
 
 
 @mock.patch(
@@ -625,6 +627,7 @@ def test_get_result_handler_by_conn_file(fs):
         consts.PROJECT_ID: args.project_id,
         consts.TABLE_ID: "dataset.table",
         consts.API_ENDPOINT: args.api_endpoint,
+        consts.STORAGE_API_ENDPOINT: args.storage_api_endpoint,
     }
 
     # Plus check standard format still works.

third_party/ibis/ibis_addon/operations.py

Lines changed: 1 addition & 1 deletion
@@ -61,7 +61,7 @@
 from ibis.expr.types import BinaryValue, NumericValue, StringValue, TemporalValue
 
 # Do not remove these lines, they trigger patching of Ibis code.
-import third_party.ibis.ibis_biquery.api  # noqa
+import third_party.ibis.ibis_bigquery.api  # noqa
 import third_party.ibis.ibis_mysql.compiler  # noqa
 from third_party.ibis.ibis_mssql import registry as mssql_registry
 from third_party.ibis.ibis_postgres import registry as postgres_registry
third_party/ibis/ibis_bigquery/api.py (new file)

Lines changed: 142 additions & 0 deletions
@@ -0,0 +1,142 @@
+# Copyright 2024 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import TYPE_CHECKING
+
+import google.auth.credentials
+import google.cloud.bigquery as bq
+import pydata_google_auth
+from pydata_google_auth import cache
+
+from ibis.backends.bigquery import (
+    Backend as BigQueryBackend,
+    _create_client_info,
+    parse_project_and_dataset,
+    CLIENT_ID,
+    CLIENT_SECRET,
+    EXTERNAL_DATA_SCOPES,
+    SCOPES,
+)
+
+if TYPE_CHECKING:
+    import google.cloud.bigquery_storage_v1
+
+
+class Backend(BigQueryBackend):
+    def __init__(self):
+        super().__init__()
+        self.storage_client = None
+
+    def do_connect(
+        self,
+        project_id: str = None,
+        dataset_id: str = "",
+        credentials: google.auth.credentials.Credentials = None,
+        application_name: str = None,
+        auth_local_webserver: bool = True,
+        auth_external_data: bool = False,
+        auth_cache: str = "default",
+        partition_column: str = "PARTITIONTIME",
+        # Custom DVT arguments:
+        bigquery_client: bq.Client = None,
+        bqstorage_client: "google.cloud.bigquery_storage_v1.BigQueryReadClient" = None,
+    ):
+        """Copy of Ibis v5 BigQuery do_connect() customized for DVT, see original method for docs."""
+        default_project_id = ""
+
+        if credentials is None:
+            scopes = SCOPES
+            if auth_external_data:
+                scopes = EXTERNAL_DATA_SCOPES
+
+            if auth_cache == "default":
+                credentials_cache = cache.ReadWriteCredentialsCache(
+                    filename="ibis.json"
+                )
+            elif auth_cache == "reauth":
+                credentials_cache = cache.WriteOnlyCredentialsCache(
+                    filename="ibis.json"
+                )
+            elif auth_cache == "none":
+                credentials_cache = cache.NOOP
+            else:
+                raise ValueError(
+                    f"Got unexpected value for auth_cache = '{auth_cache}'. "
+                    "Expected one of 'default', 'reauth', or 'none'."
+                )
+
+            credentials, default_project_id = pydata_google_auth.default(
+                scopes,
+                client_id=CLIENT_ID,
+                client_secret=CLIENT_SECRET,
+                credentials_cache=credentials_cache,
+                use_local_webserver=auth_local_webserver,
+            )
+
+        project_id = project_id or default_project_id
+
+        (
+            self.data_project,
+            self.billing_project,
+            self.dataset,
+        ) = parse_project_and_dataset(project_id, dataset_id)
+
+        if bigquery_client is None:
+            self.client = bq.Client(
+                project=self.billing_project,
+                credentials=credentials,
+                client_info=_create_client_info(application_name),
+            )
+        else:
+            self.client = bigquery_client
+        self.partition_column = partition_column
+        self.storage_client = bqstorage_client
+
+    def _cursor_to_arrow(
+        self,
+        cursor,
+        *,
+        method=None,
+        chunk_size: int = None,
+    ):
+        """Copy of Ibis v5 BigQuery _cursor_to_arrow() except can use custom DVT storage client"""
+        if method is None:
+
+            def method(result, storage_client=self.storage_client):
+                return result.to_arrow(
+                    progress_bar_type=None,
+                    # Include DVT specific storage client.
+                    bqstorage_client=storage_client,
+                    create_bqstorage_client=bool(not self.storage_client),
+                )
+
+        query = cursor.query
+        query_result = query.result(page_size=chunk_size)
+        # workaround potentially not having the ability to create read sessions
+        # in the dataset project
+        orig_project = query_result._project
+        query_result._project = self.billing_project
+        try:
+            arrow_obj = method(query_result)
+        finally:
+            query_result._project = orig_project
+        return arrow_obj
+
+    def list_primary_key_columns(self, database: str, table: str) -> list:
+        """Return a list of primary key column names."""
+        # TODO: Related to issue-1253, it's not clear if this is possible, we should revisit if it becomes a requirement.
+        return None
+
+    def dvt_list_tables(self, like=None, database=None):
+        return self.list_tables(like=like, database=database)
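`bigquery_connect`, which data_validation/clients.py imports from this module, is not part of the hunk shown above. Assuming it is a thin wrapper that instantiates this Backend and forwards the pre-built clients, a plausible minimal shape (hypothetical, not the committed code) would be:

```python
def bigquery_connect(
    project_id=None,
    dataset_id="",
    credentials=None,
    bigquery_client=None,
    bqstorage_client=None,
):
    # Hypothetical sketch: hand the ready-made google-cloud clients to
    # do_connect() so no new bq.Client is constructed and _cursor_to_arrow()
    # can reuse the custom BigQueryReadClient.
    backend = Backend()
    backend.do_connect(
        project_id=project_id,
        dataset_id=dataset_id,
        credentials=credentials,
        bigquery_client=bigquery_client,
        bqstorage_client=bqstorage_client,
    )
    return backend
```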
