import warnings
import logging
import json
- from typing import Union, List, Optional, Dict, Any, Literal, TYPE_CHECKING
+ from typing import Union, List, Optional, Dict, Any, Literal, Iterator, TYPE_CHECKING

from pinecone.config import ConfigBuilder

from .dataclasses import Vector, SparseValues, FetchResponse, SearchQuery, SearchRerank
from .interfaces import IndexInterface
from .request_factory import IndexRequestFactory
- from .features.bulk_import import ImportFeatureMixin
from .types import (
    SparseVectorTypedDict,
    VectorTypedDict,

if TYPE_CHECKING:
    from pinecone.config import Config, OpenApiConfiguration
+     from .resources.sync.bulk_import import BulkImportResource
+
+     from pinecone.core.openapi.db_data.models import (
+         StartImportResponse,
+         ListImportsResponse,
+         ImportModel,
+     )
+
+     from .resources.sync.bulk_import import ImportErrorMode

logger = logging.getLogger(__name__)
""" @private """
@@ -58,12 +66,15 @@ def parse_query_response(response: QueryResponse):
    return response


- class Index(PluginAware, IndexInterface, ImportFeatureMixin):
+ class Index(PluginAware, IndexInterface):
    """
    A client for interacting with a Pinecone index via REST API.
    For improved performance, use the Pinecone GRPC index client.
    """

+     _bulk_import_resource: Optional["BulkImportResource"]
+     """ @private """
+
    def __init__(
        self,
        api_key: str,
@@ -101,6 +112,9 @@ def __init__(

        self._api_client = self._vector_api.api_client

+         self._bulk_import_resource = None
+         """ @private """
+
        # Pass the same api_client to the ImportFeatureMixin
        super().__init__(api_client=self._api_client)
@@ -129,6 +143,15 @@ def pool_threads(self) -> int:
            )
        return self._pool_threads

+     @property
+     def bulk_import(self) -> "BulkImportResource":
+         """@private"""
+         if self._bulk_import_resource is None:
+             from .resources.sync.bulk_import import BulkImportResource
+
+             self._bulk_import_resource = BulkImportResource(api_client=self._api_client)
+         return self._bulk_import_resource
+
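The `bulk_import` property added above constructs the resource lazily, on first access, and caches it. A minimal standalone sketch of the same pattern (the class names here are stand-ins for illustration, not the library's API):

```python
from typing import Optional


class BulkImportResource:
    """Stand-in for the real resource class; holds a reference to the shared api_client."""

    def __init__(self, api_client) -> None:
        self.api_client = api_client


class Client:
    def __init__(self, api_client) -> None:
        self._api_client = api_client
        self._bulk_import_resource: Optional[BulkImportResource] = None

    @property
    def bulk_import(self) -> BulkImportResource:
        # Build the resource once, on first access, then cache it so every
        # call shares the same underlying api_client.
        if self._bulk_import_resource is None:
            self._bulk_import_resource = BulkImportResource(api_client=self._api_client)
        return self._bulk_import_resource
```

In the diff itself, the `from .resources.sync.bulk_import import BulkImportResource` statement is also deferred into the property body, so callers that never touch bulk import never pay for that import at module load time.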
    def _openapi_kwargs(self, kwargs: Dict[str, Any]) -> Dict[str, Any]:
        return filter_dict(kwargs, OPENAPI_ENDPOINT_PARAMS)
@@ -457,3 +480,124 @@ def list(self, **kwargs):
                kwargs.update({"pagination_token": results.pagination.next})
            else:
                done = True
+
+     @validate_and_convert_errors
+     def start_import(
+         self,
+         uri: str,
+         integration_id: Optional[str] = None,
+         error_mode: Optional[
+             Union["ImportErrorMode", Literal["CONTINUE", "ABORT"], str]
+         ] = "CONTINUE",
+     ) -> "StartImportResponse":
+         """
+         Import data from a storage provider into an index. The uri must start with the scheme of a supported
+         storage provider. For buckets that are not publicly readable, you will also need to separately configure
+         a storage integration and pass the integration id.
+
+         Args:
+             uri (str): The URI of the data to import. The URI must start with the scheme of a supported storage provider.
+             integration_id (Optional[str], optional): If your bucket requires authentication to access, pass the id of your storage integration using this property. Defaults to None.
+             error_mode: Defaults to "CONTINUE". If set to "CONTINUE", the import operation will continue even if some
+                 records fail to import. Pass "ABORT" to stop the import operation if any records fail to import.
+
+         Returns:
+             `StartImportResponse`: Contains the id of the import operation.
+
+         Examples:
+             >>> from pinecone import Pinecone
+             >>> index = Pinecone().Index('my-index')
+             >>> index.start_import(uri="s3://bucket-name/path/to/data.parquet")
+             { id: "1" }
+         """
+         return self.bulk_import.start(uri=uri, integration_id=integration_id, error_mode=error_mode)
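A hedged usage sketch of the delegating method above; the bucket URI and integration id are placeholders, not real resources:

```python
# Illustrative values only: the bucket URI and integration id are placeholders.
from pinecone import Pinecone

index = Pinecone().Index("my-index")

response = index.start_import(
    uri="s3://example-bucket/path/to/data.parquet",
    integration_id="my-storage-integration-id",  # omit for publicly readable buckets
    error_mode="ABORT",  # stop on the first failed record instead of the default "CONTINUE"
)
print(response.id)  # keep this id for describe_import / cancel_import
```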
+
+     @validate_and_convert_errors
+     def list_imports(self, **kwargs) -> Iterator["ImportModel"]:
+         """
+         Args:
+             limit (Optional[int]): The maximum number of operations to fetch in each network call. If unspecified, the server will use a default value. [optional]
+             pagination_token (Optional[str]): When there are multiple pages of results, a pagination token is returned in the response. The token can be used
+                 to fetch the next page of results. [optional]
+
+         Returns:
+             A generator that yields each import operation. It automatically handles pagination tokens on your behalf, so you can
+             easily iterate over all results. The `list_imports` method accepts all of the same arguments as `list_imports_paginated`.
+
+         ```python
+         for op in index.list_imports():
+             print(op)
+         ```
+
+         You can convert the generator into a list by wrapping it in a call to the built-in `list` function:
+
+         ```python
+         operations = list(index.list_imports())
+         ```
+
+         Be cautious with this approach, because fetching all operations at once may require a large number
+         of network calls and a lot of memory to hold the results.
+         """
+         for i in self.bulk_import.list(**kwargs):
+             yield i
+
+     @validate_and_convert_errors
+     def list_imports_paginated(
+         self, limit: Optional[int] = None, pagination_token: Optional[str] = None, **kwargs
+     ) -> "ListImportsResponse":
+         """
+         The list_imports_paginated operation returns information about import operations.
+         It returns operations in a paginated form, with a pagination token to fetch the next page of results.
+
+         Consider using the `list_imports` method to avoid having to handle pagination tokens manually.
+
+         Args:
+             limit (Optional[int]): The maximum number of operations to return. If unspecified, the server will use a default value. [optional]
+             pagination_token (Optional[str]): A token needed to fetch the next page of results. This token is returned
+                 in the response if additional results are available. [optional]
+
+         Returns:
+             `ListImportsResponse`: Contains the list of operations as `ImportModel` objects, pagination information,
+             and usage showing the number of read_units consumed.
+
+         Examples:
+             >>> results = index.list_imports_paginated(limit=5)
+             >>> results.pagination.next
+             eyJza2lwX3Bhc3QiOiI5OTMiLCJwcmVmaXgiOiI5OSJ9
+             >>> results.data[0]
+             {
+                 "id": "6",
+                 "uri": "s3://dev-bulk-import-datasets-pub/10-records-dim-10/",
+                 "status": "Completed",
+                 "percent_complete": 100.0,
+                 "records_imported": 10,
+                 "created_at": "2024-09-06T14:52:02.567776+00:00",
+                 "finished_at": "2024-09-06T14:52:28.130717+00:00"
+             }
+             >>> next_results = index.list_imports_paginated(limit=5, pagination_token=results.pagination.next)
+         """
+         return self.bulk_import.list_paginated(
+             limit=limit, pagination_token=pagination_token, **kwargs
+         )
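For contrast with the `list_imports` generator, a sketch of driving the paginated form by hand, assuming the response omits `pagination` on the last page:

```python
# Walk every page manually; the list_imports generator does this loop for you.
token = None
while True:
    results = index.list_imports_paginated(limit=100, pagination_token=token)
    for op in results.data:
        print(op.id, op.status)
    if results.pagination and results.pagination.next:
        token = results.pagination.next
    else:
        break
```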
+
+     @validate_and_convert_errors
+     def describe_import(self, id: str) -> "ImportModel":
+         """
+         describe_import is used to get detailed information about a specific import operation.
+
+         Args:
+             id (str): The id of the import operation. This value is returned when
+                 starting an import, and can be looked up using list_imports.
+
+         Returns:
+             `ImportModel`: An object containing operation id, status, and other details.
+         """
+         return self.bulk_import.describe(id=id)
+
+     @validate_and_convert_errors
+     def cancel_import(self, id: str):
+         """Cancel an import operation.
+
+         Args:
+             id (str): The id of the import operation to cancel.
+         """
+         return self.bulk_import.cancel(id=id)
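Finally, a hedged sketch tying the new surface together: poll an operation with `describe_import` until it reaches a terminal state, with `cancel_import` available as an escape hatch. The "Completed" status string appears in the docstring example above; the other terminal values are assumptions.

```python
import time

op = index.start_import(uri="s3://example-bucket/path/to/data.parquet")

while True:
    status = index.describe_import(id=op.id)
    print(f"{status.percent_complete:.1f}% complete")
    # "Completed" is shown in the docstring example above; "Failed" and
    # "Cancelled" are assumed terminal states, not confirmed by this diff.
    if status.status in ("Completed", "Failed", "Cancelled"):
        break
    time.sleep(30)

# To abandon a long-running import instead:
# index.cancel_import(id=op.id)
```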