Skip to content

Commit 07353b6

Browse files
feat: adding delete_all_docs to ChromaDB document store (#2399)
* feat: adding delete_all_docs to Qdrant document store * feat: add support for recreating index in delete_all_documents methods * fix: await _ensure_initialized_async in delete_all_documents to ensure proper async handling * Wrapped the code in try/except --------- Co-authored-by: David S. Batista <[email protected]>
1 parent eb39cb0 commit 07353b6

File tree

3 files changed

+174
-0
lines changed

3 files changed

+174
-0
lines changed

integrations/chroma/src/haystack_integrations/document_stores/chroma/document_store.py

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from chromadb.api.types import GetResult, QueryResult
1010
from haystack import default_from_dict, default_to_dict, logging
1111
from haystack.dataclasses import Document
12+
from haystack.document_stores.errors import DocumentStoreError
1213
from haystack.document_stores.types import DuplicatePolicy
1314

1415
from .filters import _convert_filters
@@ -113,6 +114,8 @@ def _ensure_initialized(self):
113114
# Local persistent storage
114115
client = chromadb.PersistentClient(path=self._persist_path)
115116

117+
self._client = client # store client for potential future use
118+
116119
self._metadata = self._metadata or {}
117120
if "hnsw:space" not in self._metadata:
118121
self._metadata["hnsw:space"] = self._distance_function
@@ -149,6 +152,8 @@ async def _ensure_initialized_async(self):
149152
port=self._port,
150153
)
151154

155+
self._async_client = client # store client for potential future use
156+
152157
self._metadata = self._metadata or {}
153158
if "hnsw:space" not in self._metadata:
154159
self._metadata["hnsw:space"] = self._distance_function
@@ -408,6 +413,86 @@ async def delete_documents_async(self, document_ids: List[str]) -> None:
408413

409414
await self._async_collection.delete(ids=document_ids)
410415

416+
def delete_all_documents(self, *, recreate_index: bool = False) -> None:
    """
    Deletes all documents in the document store.

    By default the documents are removed by ID and the existing collection is kept as is.
    With `recreate_index=True` the collection itself is dropped and created again under the same name,
    reusing the metadata and embedding function of the collection that was dropped.

    :param recreate_index: Whether to drop and recreate the collection instead of deleting the documents
        from the existing collection.
    :raises DocumentStoreError: If any step of the deletion or of the recreation fails.
    """
    self._ensure_initialized()  # _ensure_initialized ensures _client is not None and a collection exists
    assert self._collection is not None

    try:
        if recreate_index:
            # Capture the settings before the collection is dropped so that it can be recreated with them.
            metadata = self._collection.metadata
            embedding_function = self._collection._embedding_function
            collection_name = self._collection_name

            # Delete the collection
            self._client.delete_collection(name=collection_name)

            # Recreate the collection with previous metadata
            self._collection = self._client.create_collection(
                name=collection_name,
                metadata=metadata,
                embedding_function=embedding_function,
            )

        else:
            collection = self._collection.get()
            ids = collection.get("ids", [])
            # Chroma may reject a delete request that carries an empty ID list, so clearing an
            # already-empty collection is treated as a no-op instead of being reported as a failure.
            if ids:
                self._collection.delete(ids=ids)  # type: ignore
            logger.info(
                "Deleted all the {n_docs} documents from the collection '{name}'.",
                name=self._collection_name,
                n_docs=len(ids),
            )
    except Exception as e:
        msg = f"Failed to delete all documents from ChromaDB: {e!s}"
        raise DocumentStoreError(msg) from e
455+
456+
async def delete_all_documents_async(self, *, recreate_index: bool = False) -> None:
    """
    Asynchronously deletes all documents in the document store.

    By default the documents are removed by ID and the existing collection is kept as is.
    With `recreate_index=True` the collection itself is dropped and created again under the same name,
    reusing the metadata and embedding function of the collection that was dropped.

    :param recreate_index: Whether to drop and recreate the collection instead of deleting the documents
        from the existing collection.
    :raises DocumentStoreError: If any step of the deletion or of the recreation fails.
    """
    await self._ensure_initialized_async()  # ensures _async_client is not None
    assert self._async_collection is not None

    try:
        if recreate_index:
            # Capture the settings before the collection is dropped so that it can be recreated with them.
            metadata = self._async_collection.metadata
            embedding_function = self._async_collection._embedding_function
            collection_name = self._collection_name

            # Delete the collection
            await self._async_client.delete_collection(name=collection_name)

            # Recreate the collection with previous metadata
            self._async_collection = await self._async_client.create_collection(
                name=collection_name,
                metadata=metadata,
                embedding_function=embedding_function,
            )
        else:
            collection = await self._async_collection.get()
            ids = collection.get("ids", [])
            # Chroma may reject a delete request that carries an empty ID list, so clearing an
            # already-empty collection is treated as a no-op instead of being reported as a failure.
            if ids:
                await self._async_collection.delete(ids=ids)  # type: ignore
            logger.info(
                "Deleted all the {n_docs} documents from the collection '{name}'.",
                name=self._collection_name,
                n_docs=len(ids),
            )

    except Exception as e:
        msg = f"Failed to delete all documents from ChromaDB: {e!s}"
        raise DocumentStoreError(msg) from e
495+
411496
def search(
412497
self,
413498
queries: List[str],

integrations/chroma/tests/test_document_store.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import logging
66
import operator
7+
import time
78
import uuid
89
from typing import List
910
from unittest import mock
@@ -381,3 +382,41 @@ def test_search(self):
381382
# check that empty filters behave as no filters
382383
result_empty_filters = document_store.search(["Third"], filters={}, top_k=1)
383384
assert result == result_empty_filters
385+
386+
def test_delete_all_documents_index_recreation(self, document_store: ChromaDocumentStore):
    """Recreating the index must empty the store and keep the collection's name and metadata."""
    # write some documents
    docs = [Document(id="1", content="A first document"), Document(id="2", content="Second document")]
    document_store.write_documents(docs)

    # Snapshot the collection settings. `Collection.get()` looks up documents by ID, so it cannot be
    # used to read the configuration: passing the collection name to it only compares two empty results.
    name_before = document_store._collection.name
    metadata_before = document_store._collection.metadata

    # delete all documents with recreating the index
    document_store.delete_all_documents(recreate_index=True)
    assert document_store.count_documents() == 0

    # the recreated collection must carry the same settings as the one that was dropped
    assert document_store._collection.name == name_before
    assert document_store._collection.metadata == metadata_before

    # ensure the collection still exists by writing documents again
    document_store.write_documents(docs)
    assert document_store.count_documents() == 2
406+
407+
def test_delete_all_documents_no_index_recreation(self, document_store: ChromaDocumentStore):
    """Deleting without index recreation empties the store and leaves it usable for new writes."""
    initial_docs = [
        Document(id="1", content="A first document"),
        Document(id="2", content="Second document"),
    ]
    document_store.write_documents(initial_docs)
    assert document_store.count_documents() == 2

    document_store.delete_all_documents()
    time.sleep(2)  # need to wait for the deletion to be reflected in count_documents
    assert document_store.count_documents() == 0

    # the existing collection must still accept and return documents
    replacement = Document(id="3", content="New document after delete all")
    document_store.write_documents([replacement])
    assert document_store.count_documents() == 1

    remaining = document_store.filter_documents()
    assert len(remaining) == 1
    assert remaining[0].content == "New document after delete all"

integrations/chroma/tests/test_document_store_async.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,3 +120,53 @@ async def test_search_async(self):
120120
# check that empty filters behave as no filters
121121
result_empty_filters = document_store.search(["Third"], filters={}, top_k=1)
122122
assert result == result_empty_filters
123+
124+
@pytest.mark.asyncio
async def test_delete_all_documents_index_recreation(self, document_store: ChromaDocumentStore):
    """Recreating the index asynchronously must empty the store and keep the collection's name and metadata."""
    # write some documents
    docs = [
        Document(id="1", content="First document", meta={"category": "test"}),
        Document(id="2", content="Second document", meta={"category": "test"}),
        Document(id="3", content="Third document", meta={"category": "other"}),
    ]
    await document_store.write_documents_async(docs)

    # Snapshot the collection settings. `get()` on a collection looks up documents by ID, so it cannot be
    # used to read the configuration: passing the collection name to it only compares two empty results.
    name_before = document_store._async_collection.name
    metadata_before = document_store._async_collection.metadata

    # delete all documents with recreating the index
    await document_store.delete_all_documents_async(recreate_index=True)
    assert await document_store.count_documents_async() == 0

    # the recreated collection must carry the same settings as the one that was dropped
    assert document_store._async_collection.name == name_before
    assert document_store._async_collection.metadata == metadata_before

    # ensure the collection still exists by writing documents again
    await document_store.write_documents_async(docs)
    assert await document_store.count_documents_async() == 3
149+
150+
@pytest.mark.asyncio
async def test_delete_all_documents_async(self, document_store):
    """Deleting asynchronously without index recreation empties the store and keeps it writable."""
    seed_docs = [
        Document(id=doc_id, content=text, meta={"category": category})
        for doc_id, text, category in (
            ("1", "First document", "test"),
            ("2", "Second document", "test"),
            ("3", "Third document", "other"),
        )
    ]
    await document_store.write_documents_async(seed_docs)
    assert await document_store.count_documents_async() == 3

    # delete all documents
    await document_store.delete_all_documents_async()
    assert await document_store.count_documents_async() == 0

    # verify index still exists and can accept new documents and retrieve
    replacement = Document(id="4", content="New document after delete all")
    await document_store.write_documents_async([replacement])
    assert await document_store.count_documents_async() == 1

    remaining = await document_store.filter_documents_async()
    assert len(remaining) == 1
    only_doc = remaining[0]
    assert only_doc.id == "4"
    assert only_doc.content == "New document after delete all"

0 commit comments

Comments
 (0)