
Refactor Separation of embedding logic through the DocumentTransformer #1239

Open
wants to merge 1 commit into
base: main

Conversation

youngmoneee
Contributor

This PR aims to achieve two objectives through the proposed changes:

  1. Separate the common embedding logic that is currently duplicated across the VectorStore implementations behind the DocumentTransformer interface. Isolating the logic that adds embedding data before Documents are inserted into the VectorStore improves maintainability and testability (see the sketch below this list).
  2. Improve batch processing performance by executing the blocking operations asynchronously. The sequential, synchronous Embedding Request tasks are executed on a separate Scheduler using Reactor, which improves throughput.
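For illustration, a minimal sketch of the first objective is shown below. The class name EmbeddingTransformer is hypothetical (it is not part of this PR), and the sketch assumes the current Spring AI ETL contracts as I understand them: DocumentTransformer operating on a List<Document> via apply, and EmbeddingModel exposing embed(Document) returning a float[], as in the snippets further down.

import java.util.List;

import org.springframework.ai.document.Document;
import org.springframework.ai.document.DocumentTransformer;
import org.springframework.ai.embedding.EmbeddingModel;

// Hypothetical sketch, not part of this PR: a transformer that fills in missing
// embeddings so that VectorStore implementations no longer need this logic.
public class EmbeddingTransformer implements DocumentTransformer {

    private final EmbeddingModel embeddingModel;

    public EmbeddingTransformer(EmbeddingModel embeddingModel) {
        this.embeddingModel = embeddingModel;
    }

    @Override
    public List<Document> apply(List<Document> documents) {
        for (Document document : documents) {
            // Only embed documents that do not already carry an embedding.
            if (document.getEmbedding() == null || document.getEmbedding().length == 0) {
                document.setEmbedding(this.embeddingModel.embed(document));
            }
        }
        return documents;
    }
}

With something along these lines, a VectorStore could assume incoming documents are already embedded, or apply the transformer itself right before writing.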

List<WeaviateObject> weaviateObjects = documents.stream().map(this::toWeaviateObject).toList();

In this code, the map operation processes documents strictly one at a time: each toWeaviateObject call starts only after the previous one has completed.

private WeaviateObject toWeaviateObject(Document document) {
    if (document.getEmbedding() == null || document.getEmbedding().length == 0) {
        float[] embedding = this.embeddingModel.embed(document);
        document.setEmbedding(embedding);
    }
    // ... builds and returns the WeaviateObject for this document
}

default List<float[]> embed(List<String> texts) {
    Assert.notNull(texts, "Texts must not be null");
    return this.call(new EmbeddingRequest(texts, EmbeddingOptionsBuilder.builder().build()))
        .getResults()
        .stream()
        .map(Embedding::getOutput)
        .toList();
}

The call method synchronously requests an EmbeddingResponse object, creating a significant bottleneck due to the sequential execution of these blocking methods.

For comparison, embedding and inserting the same 100 Document objects into a vector database with the original code took 106 seconds, i.e. roughly one second of blocking embedding work per document.

[Screenshot: timing of the original synchronous run, 106 seconds]

The modified code instead performs the embedding step through Reactor:

return Flux.fromIterable(documents).flatMap(document -> {
    if (document.getEmbedding() == null || document.getEmbedding().length == 0) {
        return Mono
            .zip(Mono.just(document), Mono.fromCallable(() -> embeddingModel.embed(document)), (doc, embed) -> {
                doc.setEmbedding(embed);
                return doc;
            })
            .subscribeOn(Schedulers.boundedElastic());
    }
    return Mono.just(document);
}).collectList().block();

To reduce this bottleneck, the code wraps these blocking calls in Reactor types and executes them asynchronously, which keeps the required changes to the existing code small.
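A brief note on the design choice, based on Reactor's documented behavior: Schedulers.boundedElastic() is the scheduler intended for wrapping blocking work, and flatMap subscribes to the per-document Monos concurrently, so the individual embedding requests overlap instead of queueing behind one another.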

[Screenshot: timing of the modified asynchronous run, 8.6 seconds]

After modifying the code to run these tasks on a separate asynchronous scheduler, the execution time dropped to 8.6 seconds, a 92% reduction in processing time.


This PR aimed to optimize performance with minimal changes to the existing code.
However, in the long term, I think expressing the ETL pipeline as a stream, rather than as batch processing over a List, would be more appropriate (a rough sketch of that idea follows below).
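Purely to illustrate that direction (this is not an existing Spring AI API; the variable names documentReader, embeddingModel and vectorStore, and the batch size of 50, are assumptions for the sketch), a stream-shaped pipeline could look roughly like this, using the same Flux/Mono/Schedulers types as the snippet above:

// Hypothetical sketch only: documentReader, embeddingModel and vectorStore are assumed to
// follow the existing ETL contracts (DocumentReader as a Supplier<List<Document>>,
// VectorStore.add(List<Document>)); the streaming shape itself is the proposal, not an existing API.
Flux.fromIterable(documentReader.get())
    .flatMap(document -> Mono.fromCallable(() -> {
            // blocking embedding call moved off the caller thread
            document.setEmbedding(embeddingModel.embed(document));
            return document;
        })
        .subscribeOn(Schedulers.boundedElastic()))
    .buffer(50)                                   // write in small batches instead of one large List
    .concatMap(batch -> Mono.<Void>fromRunnable(() -> vectorStore.add(batch))
        .subscribeOn(Schedulers.boundedElastic()))
    .then()
    .block();

The point is the shape rather than the exact operators: each document flows through the transform stage as it arrives, and batching for the write becomes an explicit, tunable step.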

I have created an issue (#1219) related to this topic. I would appreciate any insights or thoughts you might have.

It would be great if you could take a look at the issue when you have time.

Thanks 🧑🏼‍💻

@markpollack
Member

review in light of 087de16
