Skip to content

Commit fc51ba2

Browse files
authored
Merge pull request #119 from pinecone-io/batch-upsert
Batch upsert
2 parents 2588016 + f82f5e8 commit fc51ba2

File tree

5 files changed

+339
-11
lines changed

5 files changed

+339
-11
lines changed

pinecone/core/grpc/index_grpc.py

Lines changed: 42 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@
1010
import grpc
1111
from google.protobuf import json_format
1212
from grpc._channel import _InactiveRpcError, _MultiThreadedRendezvous
13+
from tqdm import tqdm
14+
1315
from pinecone import FetchResponse, QueryResponse, ScoredVector, SingleQueryResults, DescribeIndexStatsResponse
1416
from pinecone.config import Config
1517
from pinecone.core.client.model.namespace_summary import NamespaceSummary
@@ -256,6 +258,8 @@ def upsert(self,
256258
vectors: Union[List[GRPCVector], List[Tuple]],
257259
async_req: bool = False,
258260
namespace: Optional[str] = None,
261+
batch_size: Optional[int] = None,
262+
show_progress: bool = True,
259263
**kwargs) -> Union[UpsertResponse, PineconeGrpcFuture]:
260264
"""
261265
The upsert operation writes vectors into a namespace.
@@ -282,13 +286,21 @@ def upsert(self,
282286
283287
Note: the dimension of each vector must match the dimension of the index.
284288
async_req (bool): If True, the upsert operation will be performed asynchronously.
289+
Cannot be used with batch_size.
285290
Defaults to False. See: https://docs.pinecone.io/docs/performance-tuning [optional]
286291
namespace (str): The namespace to write to. If not specified, the default namespace is used. [optional]
292+
batch_size (int): The number of vectors to upsert in each batch.
293+
Cannot be used with async_req=True.
294+
If not specified, all vectors will be upserted in a single batch. [optional]
295+
show_progress (bool): Whether to show a progress bar using tqdm.
296+
Applied only if batch_size is provided. Default is True.
287297
288298
Returns: UpsertResponse, contains the number of vectors upserted
289299
"""
290-
291-
args_dict = self._parse_non_empty_args([('namespace', namespace)])
300+
if async_req and batch_size is not None:
301+
raise ValueError('async_req is not supported when batch_size is provided. '
302+
'To upsert in parallel, please follow: '
303+
'https://docs.pinecone.io/docs/performance-tuning')
292304

293305
def _vector_transform(item):
294306
if isinstance(item, GRPCVector):
@@ -300,12 +312,37 @@ def _vector_transform(item):
300312

301313
timeout = kwargs.pop('timeout', None)
302314

303-
request = UpsertRequest(vectors=list(map(_vector_transform, vectors)), **args_dict, **kwargs)
315+
vectors = list(map(_vector_transform, vectors))
304316
if async_req:
317+
args_dict = self._parse_non_empty_args([('namespace', namespace)])
318+
request = UpsertRequest(vectors=vectors, **args_dict, **kwargs)
305319
future = self._wrap_grpc_call(self.stub.Upsert.future, request, timeout=timeout)
306320
return PineconeGrpcFuture(future)
307-
else:
308-
return self._wrap_grpc_call(self.stub.Upsert, request, timeout=timeout)
321+
322+
if batch_size is None:
323+
return self._upsert_batch(vectors, namespace, timeout=timeout, **kwargs)
324+
325+
if not isinstance(batch_size, int) or batch_size <= 0:
326+
raise ValueError('batch_size must be a positive integer')
327+
328+
pbar = tqdm(total=len(vectors), disable=not show_progress, desc='Upserted vectors')
329+
total_upserted = 0
330+
for i in range(0, len(vectors), batch_size):
331+
batch_result = self._upsert_batch(vectors[i:i + batch_size], namespace, timeout=timeout, **kwargs)
332+
pbar.update(batch_result.upserted_count)
333+
# we can't rely on pbar.n here because pbar is disabled when show_progress=False
334+
total_upserted += batch_result.upserted_count
335+
336+
return UpsertResponse(upserted_count=total_upserted)
337+
338+
def _upsert_batch(self,
339+
vectors: List[GRPCVector],
340+
namespace: Optional[str],
341+
timeout: Optional[float],
342+
**kwargs) -> UpsertResponse:
343+
args_dict = self._parse_non_empty_args([('namespace', namespace)])
344+
request = UpsertRequest(vectors=vectors, **args_dict)
345+
return self._wrap_grpc_call(self.stub.Upsert, request, timeout=timeout, **kwargs)
309346

310347
def delete(self,
311348
ids: Optional[List[str]] = None,

pinecone/index.py

Lines changed: 36 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
#
22
# Copyright (c) 2020-2021 Pinecone Systems Inc. All right reserved.
33
#
4-
4+
from tqdm import tqdm
55
from collections.abc import Iterable
66
from typing import Union, List, Tuple, Optional, Dict, Any
77

@@ -64,6 +64,8 @@ def __init__(self, index_name: str, pool_threads=1):
6464
def upsert(self,
6565
vectors: Union[List[Vector], List[Tuple]],
6666
namespace: Optional[str] = None,
67+
batch_size: Optional[int] = None,
68+
show_progress: bool = True,
6769
**kwargs) -> UpsertResponse:
6870
"""
6971
The upsert operation writes vectors into a namespace.
@@ -95,16 +97,47 @@ def upsert(self,
9597
Note: the dimension of each vector must match the dimension of the index.
9698
9799
namespace (str): The namespace to write to. If not specified, the default namespace is used. [optional]
98-
100+
batch_size (int): The number of vectors to upsert in each batch.
101+
If not specified, all vectors will be upserted in a single batch. [optional]
102+
show_progress (bool): Whether to show a progress bar using tqdm.
103+
Applied only if batch_size is provided. Default is True.
99104
Keyword Args:
100105
Supports OpenAPI client keyword arguments. See pinecone.core.client.models.UpsertRequest for more details.
101106
102107
Returns: UpsertResponse, includes the number of vectors upserted.
103108
"""
104109
_check_type = kwargs.pop('_check_type', False)
110+
111+
if kwargs.get('async_req', False) and batch_size is not None:
112+
raise ValueError('async_req is not supported when batch_size is provided.'
113+
'To upsert in parallel, please follow: '
114+
'https://docs.pinecone.io/docs/insert-data#sending-upserts-in-parallel')
115+
116+
if batch_size is None:
117+
return self._upsert_batch(vectors, namespace, _check_type, **kwargs)
118+
119+
if not isinstance(batch_size, int) or batch_size <= 0:
120+
raise ValueError('batch_size must be a positive integer')
121+
122+
pbar = tqdm(total=len(vectors), disable=not show_progress, desc='Upserted vectors')
123+
total_upserted = 0
124+
for i in range(0, len(vectors), batch_size):
125+
batch_result = self._upsert_batch(vectors[i:i + batch_size], namespace, _check_type, **kwargs)
126+
pbar.update(batch_result.upserted_count)
127+
# we can't rely on pbar.n here because pbar is disabled when show_progress=False
128+
total_upserted += batch_result.upserted_count
129+
130+
return UpsertResponse(upserted_count=total_upserted)
131+
132+
def _upsert_batch(self,
133+
vectors: List[Vector],
134+
namespace: Optional[str],
135+
_check_type: bool,
136+
**kwargs) -> UpsertResponse:
137+
105138
args_dict = self._parse_non_empty_args([('namespace', namespace)])
106139

107-
def _vector_transform(item):
140+
def _vector_transform(item: Union[Vector, Tuple]):
108141
if isinstance(item, Vector):
109142
return item
110143
if isinstance(item, tuple):

requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,4 +5,5 @@ typing-extensions>=3.7.4
55
dnspython>=2.0.0
66
# openapi generated client:
77
python_dateutil >= 2.5.3
8-
urllib3 >= 1.21.1
8+
urllib3 >= 1.21.1
9+
tqdm >= 4.64.1

tests/unit/test_grpc_index.py

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
import pytest
2+
13
import pinecone
24
from core.utils import dict_to_proto_struct
35
from pinecone import DescribeIndexStatsRequest
46
from pinecone.core.grpc.protos.vector_service_pb2 import Vector, DescribeIndexStatsRequest, UpdateRequest, \
5-
UpsertRequest, FetchRequest, QueryRequest, DeleteRequest, QueryVector
7+
UpsertRequest, FetchRequest, QueryRequest, DeleteRequest, QueryVector, UpsertResponse
68

79

810
class TestGrpcIndex:
@@ -75,6 +77,128 @@ def test_upsert_async_upsertInputVectorsAsync(self, mocker):
7577
namespace='ns'),
7678
timeout=None)
7779

80+
def test_upsert_vectorListIsMultiplyOfBatchSize_vectorsUpsertedInBatches(self, mocker):
81+
mocker.patch.object(self.index, '_wrap_grpc_call', autospec=True,
82+
side_effect=lambda stub, upsert_request, timeout: UpsertResponse(
83+
upserted_count=len(upsert_request.vectors)))
84+
85+
result = self.index.upsert([Vector(id='vec1', values=self.vals1, metadata=dict_to_proto_struct(self.md1)),
86+
Vector(id='vec2', values=self.vals2, metadata=dict_to_proto_struct(self.md2))],
87+
namespace='ns',
88+
batch_size=1,
89+
show_progress=False)
90+
self.index._wrap_grpc_call.assert_any_call(
91+
self.index.stub.Upsert,
92+
UpsertRequest(
93+
vectors=[
94+
Vector(id='vec1', values=self.vals1, metadata=dict_to_proto_struct(self.md1))],
95+
namespace='ns'),
96+
timeout=None)
97+
98+
self.index._wrap_grpc_call.assert_any_call(
99+
self.index.stub.Upsert,
100+
UpsertRequest(
101+
vectors=[Vector(id='vec2', values=self.vals2, metadata=dict_to_proto_struct(self.md2))],
102+
namespace='ns'),
103+
timeout=None)
104+
105+
assert result.upserted_count == 2
106+
107+
def test_upsert_vectorListNotMultiplyOfBatchSize_vectorsUpsertedInBatches(self, mocker):
108+
mocker.patch.object(self.index, '_wrap_grpc_call', autospec=True,
109+
side_effect=lambda stub, upsert_request, timeout: UpsertResponse(
110+
upserted_count=len(upsert_request.vectors)))
111+
112+
result = self.index.upsert([Vector(id='vec1', values=self.vals1, metadata=dict_to_proto_struct(self.md1)),
113+
Vector(id='vec2', values=self.vals2, metadata=dict_to_proto_struct(self.md2)),
114+
Vector(id='vec3', values=self.vals1, metadata=dict_to_proto_struct(self.md1))],
115+
namespace='ns',
116+
batch_size=2)
117+
self.index._wrap_grpc_call.assert_any_call(
118+
self.index.stub.Upsert,
119+
UpsertRequest(
120+
vectors=[
121+
Vector(id='vec1', values=self.vals1, metadata=dict_to_proto_struct(self.md1)),
122+
Vector(id='vec2', values=self.vals2, metadata=dict_to_proto_struct(self.md2))],
123+
namespace='ns'),
124+
timeout=None)
125+
126+
self.index._wrap_grpc_call.assert_any_call(
127+
self.index.stub.Upsert,
128+
UpsertRequest(
129+
vectors=[Vector(id='vec3', values=self.vals1, metadata=dict_to_proto_struct(self.md1))],
130+
namespace='ns'),
131+
timeout=None)
132+
133+
assert result.upserted_count == 3
134+
135+
def test_upsert_vectorListSmallerThanBatchSize_vectorsUpsertedInBatches(self, mocker):
136+
mocker.patch.object(self.index, '_wrap_grpc_call', autospec=True,
137+
side_effect=lambda stub, upsert_request, timeout: UpsertResponse(
138+
upserted_count=len(upsert_request.vectors)))
139+
140+
result = self.index.upsert([Vector(id='vec1', values=self.vals1, metadata=dict_to_proto_struct(self.md1)),
141+
Vector(id='vec2', values=self.vals2, metadata=dict_to_proto_struct(self.md2))],
142+
namespace='ns',
143+
batch_size=5)
144+
145+
self.index._wrap_grpc_call.assert_called_once_with(
146+
self.index.stub.Upsert,
147+
UpsertRequest(
148+
vectors=[
149+
Vector(id='vec1', values=self.vals1, metadata=dict_to_proto_struct(self.md1)),
150+
Vector(id='vec2', values=self.vals2, metadata=dict_to_proto_struct(self.md2))],
151+
namespace='ns'),
152+
timeout=None)
153+
154+
assert result.upserted_count == 2
155+
156+
def test_upsert_tuplesList_vectorsUpsertedInBatches(self, mocker):
157+
mocker.patch.object(self.index, '_wrap_grpc_call', autospec=True,
158+
side_effect=lambda stub, upsert_request, timeout: UpsertResponse(
159+
upserted_count=len(upsert_request.vectors)))
160+
161+
result = self.index.upsert([('vec1', self.vals1, self.md1),
162+
('vec2', self.vals2, self.md2),
163+
('vec3', self.vals1, self.md1)],
164+
namespace='ns',
165+
batch_size=2)
166+
self.index._wrap_grpc_call.assert_any_call(
167+
self.index.stub.Upsert,
168+
UpsertRequest(
169+
vectors=[
170+
Vector(id='vec1', values=self.vals1, metadata=dict_to_proto_struct(self.md1)),
171+
Vector(id='vec2', values=self.vals2, metadata=dict_to_proto_struct(self.md2))],
172+
namespace='ns'),
173+
timeout=None)
174+
175+
self.index._wrap_grpc_call.assert_any_call(
176+
self.index.stub.Upsert,
177+
UpsertRequest(
178+
vectors=[Vector(id='vec3', values=self.vals1, metadata=dict_to_proto_struct(self.md1))],
179+
namespace='ns'),
180+
timeout=None)
181+
182+
assert result.upserted_count == 3
183+
184+
def test_upsert_batchSizeIsNotPositive_errorIsRaised(self):
185+
with pytest.raises(ValueError, match='batch_size must be a positive integer'):
186+
self.index.upsert([Vector(id='vec1', values=self.vals1, metadata=dict_to_proto_struct(self.md1))],
187+
namespace='ns',
188+
batch_size=0)
189+
190+
with pytest.raises(ValueError, match='batch_size must be a positive integer'):
191+
self.index.upsert([Vector(id='vec1', values=self.vals1, metadata=dict_to_proto_struct(self.md1))],
192+
namespace='ns',
193+
batch_size=-1)
194+
195+
def test_upsert_useBatchSizeAndAsyncReq_valueErrorRaised(self):
196+
with pytest.raises(ValueError, match='async_req is not supported when batch_size is provided.'):
197+
self.index.upsert([Vector(id='vec1', values=self.vals1, metadata=dict_to_proto_struct(self.md1))],
198+
namespace='ns',
199+
batch_size=2,
200+
async_req=True)
201+
78202
# endregion
79203

80204
# region: query tests

0 commit comments

Comments
 (0)