Skip to content

Commit ddef712

Browse files
committed
WIP
1 parent 3780924 commit ddef712

File tree

10 files changed

+308
-11
lines changed

10 files changed

+308
-11
lines changed

app.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from pinecone.grpc import PineconeGRPC, GRPCClientConfig
2+
3+
# Initialize a client. An API key must be passed, but the
4+
# value does not matter.
5+
pc = PineconeGRPC(api_key="test_api_key")
6+
7+
# Target the indexes. Use the host and port number along with disabling tls.
8+
index1 = pc.Index(host="localhost:5081", grpc_config=GRPCClientConfig(secure=False))
9+
index2 = pc.Index(host="localhost:5082", grpc_config=GRPCClientConfig(secure=False))
10+
11+
# You can now perform data plane operations with index1 and index2
12+
13+
dimension = 3
14+
15+
16+
def upserts():
17+
vectors = []
18+
for i in range(0, 100):
19+
vectors.append((f"vec{i}", [i] * dimension))
20+
21+
print(len(vectors))
22+
23+
index1.upsert(vectors=vectors, namespace="ns2")
24+
index2.upsert(vectors=vectors, namespace="ns2")
25+
26+
27+
upserts()
28+
print(index1.describe_index_stats())
29+
30+
print(index1.query(id="vec1", top_k=2, namespace="ns2", include_values=True))
31+
print(index1.query(id="vec1", top_k=10, namespace="", include_values=True))

app2.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from pinecone.grpc import PineconeGRPC
2+
from pinecone import Pinecone
3+
4+
pc = Pinecone(api_key="b1cb8ba4-b3d1-458f-9c32-8dd10813459a")
5+
pcg = PineconeGRPC(api_key="b1cb8ba4-b3d1-458f-9c32-8dd10813459a")
6+
7+
index = pc.Index("jen2")
8+
indexg = pcg.Index(name="jen2", use_asyncio=True)
9+
10+
# Rest call fails
11+
# print(index.upsert(vectors=[("vec1", [1, 2])]))
12+
13+
# GRPC succeeds
14+
print(indexg.upsert(vectors=[("vec1", [1, 2])]))
15+
16+
# print(index.fetch(ids=['vec1']))

app3.py

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import asyncio
2+
from pinecone.grpc import PineconeGRPC as Pinecone, Vector
3+
4+
import time
5+
import random
6+
import pandas as pd
7+
8+
9+
# Enable gRPC tracing and verbosity for more detailed logs
10+
# os.environ["GRPC_VERBOSITY"] = "DEBUG"
11+
# os.environ["GRPC_TRACE"] = "all"
12+
13+
14+
# Generate a large set of vectors (as an example)
15+
def generate_vectors(num_vectors, dimension):
16+
return [
17+
Vector(id=f"vector_{i}", values=[random.random()] * dimension) for i in range(num_vectors)
18+
]
19+
20+
21+
def load_vectors():
22+
df = pd.read_parquet("test_records_100k_dim1024.parquet")
23+
df["values"] = df["values"].apply(lambda x: [float(v) for v in x])
24+
25+
vectors = [Vector(id=row.id, values=list(row.values)) for row in df.itertuples()]
26+
return vectors
27+
28+
29+
async def main():
30+
# Create a semaphore to limit concurrency (e.g., max 5 concurrent requests)
31+
s = time.time()
32+
# all_vectors = load_vectors()
33+
all_vectors = generate_vectors(1000, 1024)
34+
f = time.time()
35+
print(f"Loaded {len(all_vectors)} vectors in {f-s:.2f} seconds")
36+
37+
start_time = time.time()
38+
39+
# Same setup as before...
40+
pc = Pinecone(api_key="b1cb8ba4-b3d1-458f-9c32-8dd10813459a")
41+
index = pc.Index(
42+
# index_host="jen2-dojoi3u.svc.aped-4627-b74a.pinecone.io"
43+
host="jen1024-dojoi3u.svc.apw5-4e34-81fa.pinecone.io",
44+
use_asyncio=True,
45+
)
46+
47+
batch_size = 150
48+
namespace = "asyncio-py7"
49+
res = await index.upsert(
50+
vectors=all_vectors, batch_size=batch_size, namespace=namespace, show_progress=True
51+
)
52+
53+
print(res)
54+
55+
end_time = time.time()
56+
57+
total_time = end_time - start_time
58+
print(f"All tasks completed in {total_time:.2f} seconds")
59+
print(f"Namespace: {namespace}")
60+
61+
62+
asyncio.run(main())

pinecone/grpc/base.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,32 @@
99
from pinecone import Config
1010
from .config import GRPCClientConfig
1111
from .grpc_runner import GrpcRunner
12+
from .utils import normalize_endpoint
1213

1314

1415
class GRPCIndexBase(ABC):
1516
"""
1617
Base class for grpc-based interaction with Pinecone indexes
1718
"""
1819

19-
_pool = None
20-
2120
def __init__(
2221
self,
2322
index_name: str,
2423
config: Config,
2524
channel: Optional[Channel] = None,
2625
grpc_config: Optional[GRPCClientConfig] = None,
2726
_endpoint_override: Optional[str] = None,
27+
use_asyncio: Optional[bool] = False,
2828
):
2929
self.config = config
3030
self.grpc_client_config = grpc_config or GRPCClientConfig()
31-
3231
self._endpoint_override = _endpoint_override
3332

3433
self.runner = GrpcRunner(
3534
index_name=index_name, config=config, grpc_config=self.grpc_client_config
3635
)
3736
self.channel_factory = GrpcChannelFactory(
38-
config=self.config, grpc_client_config=self.grpc_client_config, use_asyncio=False
37+
config=self.config, grpc_client_config=self.grpc_client_config, use_asyncio=use_asyncio
3938
)
4039
self._channel = channel or self._gen_channel()
4140
self.stub = self.stub_class(self._channel)
@@ -46,9 +45,7 @@ def stub_class(self):
4645
pass
4746

4847
def _endpoint(self):
49-
grpc_host = self.config.host.replace("https://", "")
50-
if ":" not in grpc_host:
51-
grpc_host = f"{grpc_host}:443"
48+
grpc_host = normalize_endpoint(self.config.host)
5249
return self._endpoint_override if self._endpoint_override else grpc_host
5350

5451
def _gen_channel(self):
@@ -83,3 +80,10 @@ def __enter__(self):
8380

8481
def __exit__(self, exc_type, exc_value, traceback):
8582
self.close()
83+
84+
async def __aenter__(self):
85+
return self
86+
87+
async def __aexit__(self, exc_type, exc_value, traceback):
88+
self.close()
89+
return True

pinecone/grpc/index_grpc.py

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@
3939
from .base import GRPCIndexBase
4040
from .future import PineconeGrpcFuture
4141

42+
from .config import GRPCClientConfig
43+
from pinecone.config import Config
44+
from grpc._channel import Channel
45+
4246

4347
__all__ = ["GRPCIndex", "GRPCVector", "GRPCQueryVector", "GRPCSparseValues"]
4448

@@ -53,6 +57,23 @@ class SparseVectorTypedDict(TypedDict):
5357
class GRPCIndex(GRPCIndexBase):
5458
"""A client for interacting with a Pinecone index via GRPC API."""
5559

60+
def __init__(
61+
self,
62+
index_name: str,
63+
config: Config,
64+
channel: Optional[Channel] = None,
65+
grpc_config: Optional[GRPCClientConfig] = None,
66+
_endpoint_override: Optional[str] = None,
67+
):
68+
super().__init__(
69+
index_name=index_name,
70+
config=config,
71+
channel=channel,
72+
grpc_config=grpc_config,
73+
_endpoint_override=_endpoint_override,
74+
use_asyncio=False,
75+
)
76+
5677
@property
5778
def stub_class(self):
5879
return VectorServiceStub

pinecone/grpc/index_grpc_asyncio.py

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
from typing import Optional, Union, List, Awaitable
2+
3+
from tqdm.asyncio import tqdm
4+
from asyncio import Semaphore
5+
6+
from .vector_factory_grpc import VectorFactoryGRPC
7+
8+
from pinecone.core.grpc.protos.vector_service_pb2 import (
9+
Vector as GRPCVector,
10+
QueryVector as GRPCQueryVector,
11+
UpsertRequest,
12+
UpsertResponse,
13+
SparseValues as GRPCSparseValues,
14+
)
15+
from .base import GRPCIndexBase
16+
from pinecone import Vector as NonGRPCVector
17+
from pinecone.core.grpc.protos.vector_service_pb2_grpc import VectorServiceStub
18+
from pinecone.utils import parse_non_empty_args
19+
20+
from .config import GRPCClientConfig
21+
from pinecone.config import Config
22+
from grpc._channel import Channel
23+
24+
__all__ = ["GRPCIndexAsyncio", "GRPCVector", "GRPCQueryVector", "GRPCSparseValues"]
25+
26+
27+
class GRPCIndexAsyncio(GRPCIndexBase):
28+
"""A client for interacting with a Pinecone index over GRPC with asyncio."""
29+
30+
def __init__(
31+
self,
32+
index_name: str,
33+
config: Config,
34+
channel: Optional[Channel] = None,
35+
grpc_config: Optional[GRPCClientConfig] = None,
36+
_endpoint_override: Optional[str] = None,
37+
):
38+
super().__init__(
39+
index_name=index_name,
40+
config=config,
41+
channel=channel,
42+
grpc_config=grpc_config,
43+
_endpoint_override=_endpoint_override,
44+
use_asyncio=True,
45+
)
46+
47+
@property
48+
def stub_class(self):
49+
return VectorServiceStub
50+
51+
async def upsert(
52+
self,
53+
vectors: Union[List[GRPCVector], List[NonGRPCVector], List[tuple], List[dict]],
54+
namespace: Optional[str] = None,
55+
batch_size: Optional[int] = None,
56+
show_progress: bool = True,
57+
**kwargs,
58+
) -> Awaitable[UpsertResponse]:
59+
timeout = kwargs.pop("timeout", None)
60+
vectors = list(map(VectorFactoryGRPC.build, vectors))
61+
62+
if batch_size is None:
63+
return await self._upsert_batch(vectors, namespace, timeout=timeout, **kwargs)
64+
65+
else:
66+
if not isinstance(batch_size, int) or batch_size <= 0:
67+
raise ValueError("batch_size must be a positive integer")
68+
69+
semaphore = Semaphore(25)
70+
vector_batches = [
71+
vectors[i : i + batch_size] for i in range(0, len(vectors), batch_size)
72+
]
73+
tasks = [
74+
self._upsert_batch(
75+
vectors=batch, namespace=namespace, timeout=100, semaphore=semaphore
76+
)
77+
for batch in vector_batches
78+
]
79+
80+
return await tqdm.gather(*tasks, disable=not show_progress, desc="Upserted batches")
81+
82+
async def _upsert_batch(
83+
self,
84+
vectors: List[GRPCVector],
85+
namespace: Optional[str],
86+
timeout: Optional[int] = None,
87+
semaphore: Optional[Semaphore] = None,
88+
**kwargs,
89+
) -> Awaitable[UpsertResponse]:
90+
args_dict = parse_non_empty_args([("namespace", namespace)])
91+
request = UpsertRequest(vectors=vectors, **args_dict)
92+
if semaphore is not None:
93+
async with semaphore:
94+
return await self.runner.run_asyncio(
95+
self.stub.Upsert, request, timeout=timeout, **kwargs
96+
)
97+
else:
98+
return await self.runner.run_asyncio(
99+
self.stub.Upsert, request, timeout=timeout, **kwargs
100+
)

pinecone/grpc/pinecone.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from ..control.pinecone import Pinecone
22
from ..config.config import ConfigBuilder
33
from .index_grpc import GRPCIndex
4+
from .index_grpc_asyncio import GRPCIndexAsyncio
45

56

67
class PineconeGRPC(Pinecone):
@@ -47,7 +48,7 @@ class PineconeGRPC(Pinecone):
4748
4849
"""
4950

50-
def Index(self, name: str = "", host: str = "", **kwargs):
51+
def Index(self, name: str = "", host: str = "", use_asyncio=False, **kwargs):
5152
"""
5253
Target an index for data operations.
5354
@@ -131,4 +132,8 @@ def Index(self, name: str = "", host: str = "", **kwargs):
131132
proxy_url=self.config.proxy_url,
132133
ssl_ca_certs=self.config.ssl_ca_certs,
133134
)
134-
return GRPCIndex(index_name=name, config=config, **kwargs)
135+
136+
if use_asyncio:
137+
return GRPCIndexAsyncio(index_name=name, config=config, **kwargs)
138+
else:
139+
return GRPCIndex(index_name=name, config=config, **kwargs)

pinecone/grpc/utils.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@ def _generate_request_id() -> str:
1919
return str(uuid.uuid4())
2020

2121

22+
def normalize_endpoint(endpoint: str) -> str:
23+
grpc_host = endpoint.replace("https://", "")
24+
if ":" not in grpc_host:
25+
grpc_host = f"{grpc_host}:443"
26+
27+
2228
def dict_to_proto_struct(d: Optional[dict]) -> "Struct":
2329
if not d:
2430
d = {}

0 commit comments

Comments
 (0)