
Commit 555b4ae

experiments with qdrant

1 parent 5b2fb6a, commit 555b4ae

File tree: 7 files changed, +41 -24 lines

benchmark/dataset.py

Lines changed: 3 additions & 1 deletion
@@ -1,4 +1,5 @@
 import os
+import shutil
 from dataclasses import dataclass
 from typing import Optional
 import tarfile
@@ -49,7 +50,8 @@ def download(self):
         else:
             print(f"Moving: {tmp_path} -> {target_path}")
             (DATASETS_DIR / self.config.path).parent.mkdir(exist_ok=True)
-            os.rename(tmp_path, target_path)
+            shutil.copy2(tmp_path, target_path)
+            os.remove(tmp_path)
 
     def get_reader(self) -> BaseReader:
         reader_class = READER_TYPE[self.config.type]
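The os.rename call is replaced by shutil.copy2 followed by os.remove. A likely motivation (not stated in the commit message) is that os.rename cannot move a file across filesystems, for example from a temporary download directory onto a mounted data volume. A minimal sketch of the pattern, with a hypothetical helper name:

import os
import shutil


def move_across_filesystems(tmp_path: str, target_path: str) -> None:
    # Hypothetical helper, not part of the repository.
    # os.rename() raises OSError (errno EXDEV, "Invalid cross-device link")
    # when source and target live on different filesystems; copying the file
    # with its metadata and then deleting the source works in both cases.
    shutil.copy2(tmp_path, target_path)
    os.remove(tmp_path)

shutil.move() implements the same fallback internally (rename first, copy-and-delete if that fails), so it would be an equivalent one-liner.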

dataset_reader/ann_h5_reader.py

Lines changed: 2 additions & 1 deletion
@@ -12,11 +12,12 @@ def __init__(self, path):
 
     def read_queries(self) -> Iterator[Query]:
         data = h5py.File(self.path)
-        for vector, expected_result in zip(data['test'], data['neighbors']):
+        for vector, expected_result, expected_scores in zip(data['test'], data['neighbors'], data['distances']):
             yield Query(
                 vector=vector.tolist(),
                 meta_conditions=None,
                 expected_result=expected_result.tolist(),
+                expected_scores=expected_scores.tolist()
             )
 
     def read_data(self) -> Iterator[Record]:
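The reader now also pulls the distances dataset, so each Query carries ground-truth scores next to the ground-truth ids. A self-contained sketch of the HDF5 layout this assumes (the ann-benchmarks convention; the file name below is only an example):

import h5py

data = h5py.File("glove-100-angular.hdf5", "r")  # example file name
print(data["train"].shape)      # vectors to be uploaded
print(data["test"].shape)       # query vectors
print(data["neighbors"].shape)  # ids of the true nearest neighbours per query
print(data["distances"].shape)  # matching ground-truth distances ("scores")

# zip() walks the three per-query datasets row by row, so every yielded Query
# gets both expected_result (ids) and expected_scores (distances).
for vector, ids, scores in zip(data["test"], data["neighbors"], data["distances"]):
    print(vector.shape, ids[:3].tolist(), scores[:3].tolist())
    break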

dataset_reader/base_reader.py

Lines changed: 1 addition & 0 deletions
@@ -14,6 +14,7 @@ class Query:
     vector: List[float]
     meta_conditions: Optional[dict]
     expected_result: Optional[List[int]]
+    expected_scores: Optional[List[float]] = None
 
 
 class BaseReader:

engine/base_client/client.py

Lines changed: 3 additions & 0 deletions
@@ -47,15 +47,18 @@ def save_upload_results(self, dataset_name: str, results: dict):
         )
 
     def run_experiment(self, dataset: Dataset):
+        print("Experiment stage: Configure")
         self.configurator.configure(
             distance=dataset.config.distance,
             vector_size=dataset.config.vector_size,
         )
 
         reader = dataset.get_reader()
+        print("Experiment stage: Upload")
         upload_stats = self.uploader.upload(reader.read_data())
         self.save_upload_results(dataset.config.name, upload_stats)
 
+        print("Experiment stage: Search")
         for search_id, searcher in enumerate(self.searchers):
            search_params = {**searcher.search_params}
            search_stats = searcher.search_all(reader.read_queries())

engine/base_client/upload.py

Lines changed: 13 additions & 4 deletions
@@ -2,6 +2,8 @@
 from multiprocessing import get_context
 from typing import List, Optional, Iterable
 
+import tqdm
+
 from dataset_reader.base_reader import Record
 from engine.base_client.utils import iter_batches
 
@@ -30,7 +32,7 @@ def upload(
 
         if parallel == 1:
             self.init_client(self.host, self.connection_params, self.upload_params)
-            for ids, vectors, metadata in iter_batches(records, batch_size):
+            for ids, vectors, metadata in iter_batches(tqdm.tqdm(records), batch_size):
                 latencies.append(self._upload_batch(ids, vectors, metadata))
 
         else:
@@ -41,15 +43,22 @@ def upload(
                 initargs=(self.host, self.connection_params, self.upload_params),
             ) as pool:
                 latencies = pool.imap(
-                    self.__class__._upload_batch, iter_batches(records, batch_size)
+                    self.__class__._upload_batch, iter_batches(tqdm.tqdm(records), batch_size)
                 )
 
+        upload_time = time.perf_counter() - start
+
+        print("Upload time: {}".format(upload_time))
+
         post_upload_stats = self.post_upload()
 
+        total_time = time.perf_counter() - start
+
         return {
-            "latencies": latencies,
             "post_upload": post_upload_stats,
-            "total_time": time.perf_counter() - start
+            "upload_time": upload_time,
+            "total_time": total_time,
+            "latencies": latencies,
         }
 
     @classmethod
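The upload loop now wraps the record stream in tqdm for a progress bar and records upload_time separately from total_time, so raw ingestion is reported apart from whatever post_upload() does afterwards. A minimal, self-contained sketch of that timing pattern (the record generator below is a stand-in, not project code):

import time

import tqdm


def fake_records(n: int = 1000):
    # Stand-in for reader.read_data(); yields dummy items.
    for i in range(n):
        yield i


start = time.perf_counter()
for _ in tqdm.tqdm(fake_records()):
    pass  # stand-in for _upload_batch()
upload_time = time.perf_counter() - start

# ... post-upload work (e.g. waiting for the index to build) happens here ...

total_time = time.perf_counter() - start
print({"upload_time": upload_time, "total_time": total_time})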

engine/clients/qdrant/upload.py

Lines changed: 19 additions & 2 deletions
@@ -1,7 +1,8 @@
+import time
 from typing import Optional, List
 
 from qdrant_client import QdrantClient
-from qdrant_client.http.models import Batch
+from qdrant_client.http.models import Batch, CollectionStatus
 
 from engine.base_client.upload import BaseUploader
 from engine.clients.qdrant.config import QDRANT_COLLECTION_NAME
@@ -25,5 +26,21 @@ def upload_batch(
     ):
         cls.client.upsert(
             collection_name=QDRANT_COLLECTION_NAME,
-            points=Batch(ids=ids, vectors=vectors, payloads=metadata),
+            points=Batch(ids=ids, vectors=vectors, payloads=[payload or {} for payload in metadata]),
         )
+
+    @classmethod
+    def post_upload(cls):
+        cls.wait_collection_green()
+        return {}
+
+    @classmethod
+    def wait_collection_green(cls):
+        wait_time = 1.0
+        total = 0
+        collection_info = cls.client.get_collection(QDRANT_COLLECTION_NAME)
+        while collection_info.status != CollectionStatus.GREEN:
+            time.sleep(wait_time)
+            total += wait_time
+            collection_info = cls.client.get_collection(QDRANT_COLLECTION_NAME)
+        return total
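post_upload() now blocks until the collection reports GREEN, which keeps index building out of the per-batch upload latencies. A standalone sketch of the same polling loop against qdrant-client; the host, the collection name, and the timeout cap are assumptions added for the example, not part of the commit:

import time

from qdrant_client import QdrantClient
from qdrant_client.http.models import CollectionStatus


def wait_collection_green(client: QdrantClient, collection_name: str,
                          wait_time: float = 1.0, timeout: float = 300.0) -> float:
    # Poll the collection until all segments are optimized (status GREEN).
    total = 0.0
    info = client.get_collection(collection_name)
    while info.status != CollectionStatus.GREEN:
        if total >= timeout:  # cap added for the sketch; the commit loops indefinitely
            raise TimeoutError(f"{collection_name} not green after {timeout:.0f}s")
        time.sleep(wait_time)
        total += wait_time
        info = client.get_collection(collection_name)
    return total


# Example usage (assumed local Qdrant on the default port):
# client = QdrantClient(host="localhost", port=6333)
# wait_collection_green(client, "benchmark")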

experiments/configurations/qdrant-single-node.json

Lines changed: 0 additions & 16 deletions
@@ -42,21 +42,5 @@
       {"parallel": 4},
       {"parallel": 100}
     ]
-  },
-  {
-    "name": "qdrant-distributed-3-node",
-    "engine": "qdrant-distributed",
-    "connection_params": {},
-    "collection_params": {
-      "hnsw_config": {
-        "m": 64
-      }
-    },
-    "search_params": [
-      {"parallel": 1},
-      {"parallel": 2},
-      {"parallel": 4},
-      {"parallel": 100}
-    ]
   }
 ]
