Refactor Separation of embedding logic through the DocumentTransformer #1239
This PR aims to achieve two objectives through the proposed changes:
spring-ai/vector-stores/spring-ai-weaviate-store/src/main/java/org/springframework/ai/vectorstore/WeaviateVectorStore.java
Line 327 in 10e1e13
In the example code, the map operation synchronously performs the next task only after the previous task has been completed.

spring-ai/vector-stores/spring-ai-weaviate-store/src/main/java/org/springframework/ai/vectorstore/WeaviateVectorStore.java
Lines 363 to 368 in 10e1e13
spring-ai/spring-ai-core/src/main/java/org/springframework/ai/embedding/EmbeddingModel.java
Lines 55 to 62 in 10e1e13
The call method synchronously requests an EmbeddingResponse object, creating a significant bottleneck due to the sequential execution of these blocking methods.

For comparison, when embedding and inserting the same 100 Document objects into a vector database, the original code took 106 seconds.
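As a rough illustration of why the sequential path is slow, here is a self-contained sketch (not the actual Spring AI code; embed() is a hypothetical stand-in for the blocking EmbeddingModel.call()). Because each map step waits for the previous one, N documents cost roughly N times the per-call latency:

```java
import java.util.List;
import java.util.stream.IntStream;

public class SequentialEmbedding {

    // Hypothetical stand-in for a blocking embedding call (~50 ms each).
    static float[] embed(String document) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new float[] { document.length() };
    }

    public static void main(String[] args) {
        List<String> documents = IntStream.range(0, 10)
                .mapToObj(i -> "doc-" + i)
                .toList();

        long start = System.nanoTime();
        // Same shape as documents.stream().map(...): each element blocks
        // until the previous embedding request has completed.
        List<float[]> embeddings = documents.stream()
                .map(SequentialEmbedding::embed)
                .toList();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        System.out.println("embedded " + embeddings.size()
                + " documents in ~" + elapsedMs + " ms (sequential)");
    }
}
```

With 10 documents at ~50 ms each, wall-clock time is around 500 ms; scale that pattern up to 100 real model calls and the sequential cost dominates.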
spring-ai/spring-ai-core/src/main/java/org/springframework/ai/transformer/DocumentEmbeddingTransformer.java
Lines 49 to 59 in eb58cf4
To reduce this bottleneck, the changed code internally uses Reactor objects to execute these blocking methods asynchronously, minimizing the need for major code modifications.
After modifying the code to process the tasks on a separate asynchronous scheduler, the execution time dropped to 8.6 seconds, a roughly 92% reduction in processing time.
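The PR itself uses Reactor for this, but the underlying idea can be sketched with the JDK alone: run the blocking calls concurrently on a separate pool instead of one after another (analogous to offloading onto a scheduler such as Reactor's bounded-elastic one). All names below are illustrative, not the PR's actual code:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;

public class ParallelEmbedding {

    // Hypothetical stand-in for a blocking embedding call (~50 ms each).
    static float[] embed(String document) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new float[] { document.length() };
    }

    // Submit every document to a dedicated pool, then join the results,
    // so the blocking calls overlap instead of running sequentially.
    static List<float[]> embedAll(List<String> documents, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            List<CompletableFuture<float[]>> futures = documents.stream()
                    .map(d -> CompletableFuture.supplyAsync(() -> embed(d), pool))
                    .toList();
            return futures.stream().map(CompletableFuture::join).toList();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> documents = IntStream.range(0, 10)
                .mapToObj(i -> "doc-" + i)
                .toList();

        long start = System.nanoTime();
        List<float[]> embeddings = embedAll(documents, 10);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;

        // With 10 workers the 10 blocking calls overlap, so wall-clock
        // time is close to one call's latency rather than 10x it.
        System.out.println("embedded " + embeddings.size()
                + " documents in ~" + elapsedMs + " ms (parallel)");
    }
}
```

The same shift from "wait per element" to "overlap the waits" is what accounts for the 106 s to 8.6 s improvement reported above.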
This PR aimed to optimize performance with minimal changes to the existing code.
However, in the long term, I think that expressing the ETL pipeline as a stream rather than batch processing through a List would be more appropriate.
I have created an issue( #1219 ) related to this topic. I would appreciate any insights or thoughts you might have.
It would be great if you could take a look at the issue when you have time.
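For what the stream-shaped pipeline could look like, here is a minimal hypothetical sketch (all names are illustrative; the actual design is what the linked issue discusses). Documents flow through read, transform, and write one at a time, so the pipeline never needs to materialize the whole batch as a List:

```java
import java.util.stream.Stream;

public class StreamingEtlSketch {

    // Simplified document: raw text plus an optional embedding.
    record Document(String text, float[] embedding) {}

    // Source stage: emits documents lazily, one at a time.
    static Stream<Document> read() {
        return Stream.of("a", "bb", "ccc").map(t -> new Document(t, null));
    }

    // Transform stage: attaches a (fake) embedding to each document.
    static Document transform(Document d) {
        return new Document(d.text(), new float[] { d.text().length() });
    }

    // Sink stage: would write to a vector store; prints here instead.
    static void write(Document d) {
        System.out.println(d.text() + " -> dim " + d.embedding().length);
    }

    public static void main(String[] args) {
        // Each document passes through the whole pipeline individually,
        // instead of completing each stage for the entire List first.
        read().map(StreamingEtlSketch::transform)
              .forEach(StreamingEtlSketch::write);
    }
}
```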
Thanks 🧑🏼💻