Skip to content

Commit 999986a

Browse files
committed
Merge branch 'dev' of github.com:pinecone-io/pinecone-python-client into upsert_numpy_values
# Conflicts: # pinecone/index.py
2 parents 5754639 + 322f90e commit 999986a

File tree

2 files changed

+126
-14
lines changed

2 files changed

+126
-14
lines changed

pinecone/core/grpc/index_grpc.py

Lines changed: 89 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,16 @@
55
import numbers
66
from abc import ABC, abstractmethod
77
from functools import wraps
8+
from importlib.util import find_spec
89
from typing import NamedTuple, Optional, Dict, Iterable, Union, List, Tuple, Any
910
from collections.abc import Mapping
1011

1112
import certifi
1213
import grpc
1314
from google.protobuf import json_format
1415
from grpc._channel import _InactiveRpcError, _MultiThreadedRendezvous
15-
from tqdm import tqdm
16+
from tqdm.autonotebook import tqdm
17+
import json
1618

1719
from pinecone import FetchResponse, QueryResponse, ScoredVector, SingleQueryResults, DescribeIndexStatsResponse
1820
from pinecone.config import Config
@@ -84,9 +86,38 @@ def __init__(self, index_name: str, channel=None, grpc_config: GRPCClientConfig
8486
"client-version": CLIENT_VERSION
8587
}
8688
self._endpoint_override = _endpoint_override
89+
90+
self.method_config = json.dumps(
91+
{
92+
"methodConfig": [
93+
{
94+
"name": [{"service": "VectorService.Upsert"}],
95+
"retryPolicy": {
96+
"maxAttempts": 5,
97+
"initialBackoff": "0.1s",
98+
"maxBackoff": "1s",
99+
"backoffMultiplier": 2,
100+
"retryableStatusCodes": ["UNAVAILABLE"],
101+
},
102+
},
103+
{
104+
"name": [{"service": "VectorService"}],
105+
"retryPolicy": {
106+
"maxAttempts": 5,
107+
"initialBackoff": "0.1s",
108+
"maxBackoff": "1s",
109+
"backoffMultiplier": 2,
110+
"retryableStatusCodes": ["UNAVAILABLE"],
111+
},
112+
}
113+
]
114+
}
115+
)
116+
87117
self._channel = channel or self._gen_channel()
88118
self.stub = self.stub_class(self._channel)
89119

120+
90121
@property
91122
@abstractmethod
92123
def stub_class(self):
@@ -100,7 +131,9 @@ def _gen_channel(self, options=None):
100131
target = self._endpoint()
101132
default_options = {
102133
"grpc.max_send_message_length": MAX_MSG_SIZE,
103-
"grpc.max_receive_message_length": MAX_MSG_SIZE
134+
"grpc.max_receive_message_length": MAX_MSG_SIZE,
135+
"grpc.service_config": self.method_config,
136+
"grpc.enable_retries": True
104137
}
105138
if self.grpc_client_config.secure:
106139
default_options['grpc.ssl_target_name_override'] = target.split(':')[0]
@@ -114,8 +147,8 @@ def _gen_channel(self, options=None):
114147
root_cas = open(certifi.where(), "rb").read()
115148
tls = grpc.ssl_channel_credentials(root_certificates=root_cas)
116149
channel = grpc.secure_channel(target, tls, options=_options)
117-
interceptor = RetryOnRpcErrorClientInterceptor(self.retry_config)
118-
return grpc.intercept_channel(channel, interceptor)
150+
151+
return channel
119152

120153
@property
121154
def channel(self):
@@ -246,7 +279,7 @@ def add_done_callback(self, fun):
246279

247280
def result(self, timeout=None):
    """Block until the wrapped gRPC future resolves and return its value.

    Low-level ``_MultiThreadedRendezvous`` failures raised by the grpc
    channel are translated into ``PineconeException`` so callers only have
    to handle the client's public exception type.
    """
    try:
        return self._delegate.result(timeout=timeout)
    except _MultiThreadedRendezvous as err:
        raise PineconeException(err._state.debug_error_string) from err
252285

@@ -345,19 +378,19 @@ def _dict_to_grpc_vector(item):
345378
sparse_values = None
346379
if 'sparse_values' in item:
347380
if not isinstance(item['sparse_values'], Mapping):
348-
raise ValueError(f"Column `sparse_values` is expected to be a dictionary, found {type(item['sparse_values'])}")
381+
raise TypeError(f"Column `sparse_values` is expected to be a dictionary, found {type(item['sparse_values'])}")
349382
indices = item['sparse_values'].get('indices', None)
350383
values = item['sparse_values'].get('values', None)
351384
try:
352385
sparse_values = GRPCSparseValues(indices=indices, values=values)
353386
except TypeError as e:
354-
raise ValueError("Found unexpected data in column `sparse_values`. "
387+
raise TypeError("Found unexpected data in column `sparse_values`. "
355388
"Expected format is `'sparse_values': {'indices': List[int], 'values': List[float]}`."
356389
) from e
357390

358-
metadata = item.get('metadata', None)
359-
if metadata is not None and not isinstance(metadata, Mapping):
360-
raise TypeError(f"Column `metadata` is expected to be a dictionary, found {type(metadata)}")
391+
metadata = item.get('metadata', None)
392+
if metadata is not None and not isinstance(metadata, Mapping):
393+
raise TypeError(f"Column `metadata` is expected to be a dictionary, found {type(metadata)}")
361394

362395
try:
363396
return GRPCVector(id=item['id'], values=item['values'], sparse_values=sparse_values,
@@ -417,6 +450,52 @@ def _upsert_batch(self,
417450
request = UpsertRequest(vectors=vectors, **args_dict)
418451
return self._wrap_grpc_call(self.stub.Upsert, request, timeout=timeout, **kwargs)
419452

453+
def upsert_from_dataframe(self,
                          df,
                          namespace: str = None,
                          batch_size: int = 500,
                          use_async_requests: bool = True,
                          show_progress: bool = True) -> UpsertResponse:
    """Upserts a dataframe into the index.

    Args:
        df: A pandas dataframe with the following columns: id, values, and metadata.
        namespace: The namespace to upsert into.
        batch_size: The number of rows to upsert in a single batch.
        use_async_requests: Whether to upsert multiple requests at the same time
            using the asynchronous request mechanism. Set to `False` to send each
            batch synchronously.
        show_progress: Whether to show a progress bar.

    Returns:
        An UpsertResponse whose `upserted_count` is the total across all batches.

    Raises:
        RuntimeError: If the optional `pandas` dependency is not installed.
        ValueError: If `df` is not a pandas DataFrame.
    """
    # pandas is an optional dependency, so it is imported lazily here.
    try:
        import pandas as pd
    except ImportError as e:
        raise RuntimeError(
            "The `pandas` package is not installed. "
            "Please install pandas to use `upsert_from_dataframe()`"
        ) from e

    if not isinstance(df, pd.DataFrame):
        raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")

    pbar = tqdm(total=len(df), disable=not show_progress, desc="sending upsert requests")
    results = []
    try:
        for chunk in self._iter_dataframe(df, batch_size=batch_size):
            res = self.upsert(vectors=chunk, namespace=namespace, async_req=use_async_requests)
            pbar.update(len(chunk))
            results.append(res)
    finally:
        # Always release the progress bar, even if an upsert raises.
        pbar.close()

    if use_async_requests:
        # Each result is a future; block until all responses have arrived.
        results = [async_result.result()
                   for async_result in tqdm(results, desc="collecting async responses")]

    # Aggregate the per-batch counts into a single response.
    upserted_count = sum(res.upserted_count for res in results)
    return UpsertResponse(upserted_count=upserted_count)
@staticmethod
494+
def _iter_dataframe(df, batch_size):
495+
for i in range(0, len(df), batch_size):
496+
batch = df.iloc[i:i + batch_size].to_dict(orient="records")
497+
yield batch
498+
420499
def delete(self,
421500
ids: Optional[List[str]] = None,
422501
delete_all: Optional[bool] = None,

pinecone/index.py

Lines changed: 37 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
#
22
# Copyright (c) 2020-2021 Pinecone Systems Inc. All right reserved.
33
#
4+
from tqdm.autonotebook import tqdm
5+
from importlib.util import find_spec
46
import numbers
57
import numpy as np
6-
from tqdm import tqdm
8+
79
from collections.abc import Iterable, Mapping
810
from typing import Union, List, Tuple, Optional, Dict, Any
911

@@ -195,9 +197,9 @@ def _dict_to_vector(item):
195197
"Expected format is `'sparse_values': {'indices': List[int], 'values': List[float]}`."
196198
) from e
197199

198-
metadata = item.get('metadata') or {}
199-
if not isinstance(metadata, Mapping):
200-
raise TypeError(f"Column `metadata` is expected to be a dictionary, found {type(metadata)}")
200+
metadata = item.get('metadata') or {}
201+
if not isinstance(metadata, Mapping):
202+
raise TypeError(f"Column `metadata` is expected to be a dictionary, found {type(metadata)}")
201203

202204
if isinstance(item['values'], np.ndarray):
203205
item['values'] = item['values'].tolist()
@@ -236,6 +238,37 @@ def _vector_transform(item: Union[Vector, Tuple]):
236238
**{k: v for k, v in kwargs.items() if k in _OPENAPI_ENDPOINT_PARAMS}
237239
)
238240

241+
def upsert_from_dataframe(self,
                          df,
                          namespace: str = None,
                          batch_size: int = 500,
                          show_progress: bool = True) -> UpsertResponse:
    """Upserts a dataframe into the index.

    Args:
        df: A pandas dataframe with the following columns: id, values, and metadata.
        namespace: The namespace to upsert into.
        batch_size: The number of rows to upsert in a single batch.
        show_progress: Whether to show a progress bar.

    Returns:
        An UpsertResponse whose `upserted_count` is the total across all batches.

    Raises:
        RuntimeError: If the optional `pandas` dependency is not installed.
        ValueError: If `df` is not a pandas DataFrame.
    """
    # pandas is an optional dependency, so it is imported lazily here.
    try:
        import pandas as pd
    except ImportError as e:
        raise RuntimeError(
            "The `pandas` package is not installed. "
            "Please install pandas to use `upsert_from_dataframe()`"
        ) from e

    if not isinstance(df, pd.DataFrame):
        raise ValueError(f"Only pandas dataframes are supported. Found: {type(df)}")

    upserted_count = 0
    pbar = tqdm(total=len(df), disable=not show_progress)
    try:
        for i in range(0, len(df), batch_size):
            batch = df.iloc[i:i + batch_size].to_dict(orient="records")
            res = self.upsert(batch, namespace=namespace)
            upserted_count += res.upserted_count
            pbar.update(len(batch))
    finally:
        # Always release the progress bar, even if an upsert raises.
        pbar.close()

    return UpsertResponse(upserted_count=upserted_count)
271+
239272
@validate_and_convert_errors
240273
def delete(self,
241274
ids: Optional[List[str]] = None,

0 commit comments

Comments
 (0)