-
Notifications
You must be signed in to change notification settings - Fork 2.6k
[Async] Multi Exec on cluster #3649
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
except Exception as e: | ||
error = e | ||
|
||
thread = threading.Thread(target=runner) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@petyaslavova @elena-kolevska ClusterPipeline
provides only synchronous API for execute_command
method, so because of this limitation the only way I found to run asynchronous code inside of synchronous method is to run it inside of short-living thread with it's own event loop. During testing I didn't find a significant performance impact of this (basic set/get test takes around 15ms on my machine).
I think the reason why API is synchronous is the fact that in case of Pipeline execute_command
is just about adding command into queue, but for Transactions we need much more that requires asynchronous code execution. Let me know WDYT about this
redis/asyncio/cluster.py
Outdated
|
||
def __init__(self, client: RedisCluster) -> None: | ||
self._client = client | ||
__slots__ = ("cluster_client",) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When you use slots, you should add all properties that the objects needs. Here you should add "_transaction" and "_execution_strategy"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
redis/asyncio/cluster.py
Outdated
self._pipe: ClusterPipeline = pipe | ||
self._command_queue: List["PipelineCommand"] = [] | ||
|
||
async def __aenter__(self) -> "ClusterPipeline": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that those methods are not needed. We only use the strategies internally within the pipeline, so there's no need to support context manager behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
return self | ||
|
||
async def __aenter__(self) -> "ClusterPipeline": | ||
return await self.initialize() | ||
|
||
async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> None: | ||
self._command_stack = [] | ||
self._execution_strategy._command_queue = [] | ||
|
||
def __await__(self) -> Generator[Any, None, "ClusterPipeline"]: | ||
return self.initialize().__await__() | ||
|
||
def __enter__(self) -> "ClusterPipeline": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think it is a meaningful use case to use the async ClusterPipeline as a synchronous context manager.
Can we remove those enter and exit methods, or it will be considered a breaking change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be a breaking, not sure how does it works if someone uses it
redis/asyncio/cluster.py
Outdated
return self | ||
|
||
async def __aenter__(self) -> "ClusterPipeline": | ||
return await self.initialize() | ||
|
||
async def __aexit__(self, exc_type: None, exc_value: None, traceback: None) -> None: | ||
self._command_stack = [] | ||
self._execution_strategy._command_queue = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.reset() would probably be a better choice here - it will clear the state of the transactional pipeline as well when the ClusterPipeline should be closed.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
if self._client._initialize: | ||
await self._client.initialize() | ||
self._command_stack = [] | ||
if self.cluster_client._initialize: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can use the _execution_strategy's initialize method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
return self | ||
|
||
def __exit__(self, exc_type: None, exc_value: None, traceback: None) -> None: | ||
self._command_stack = [] | ||
self._execution_strategy._command_queue = [] | ||
|
||
def __bool__(self) -> bool: | ||
"Pipeline instances should always evaluate to True on Python 3+" | ||
return True | ||
|
||
def __len__(self) -> int: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can expose a len method in the ExecutionStrategy and then call len(_execution_strategy) - this way you won't need to access the private property in the ClusterPipeline class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
redis/asyncio/cluster.py
Outdated
await asyncio.sleep(0.25) | ||
else: | ||
# All other errors should be raised. | ||
raise e | ||
finally: | ||
self._command_stack = [] | ||
self._command_queue = [] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
self.reset()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces asynchronous support for cluster transactions, enhancing error handling during operations such as migrations and connection issues.
- Added comprehensive async tests for cluster transactions covering AskError, ExecAbortError, and connection errors.
- Updated behavior of the cluster pipeline tests by removing the deprecated transaction flag check.
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
tests/test_asyncio/test_cluster_transaction.py | Added extensive async transaction tests with various error cases. |
tests/test_asyncio/test_cluster.py | Removed deprecated transaction check and adjusted the behavior. |
Pull Request check-list
Please make sure to review and check all of these items:
NOTE: these things are not required to open a PR and can be done
afterwards / while the PR is open.
Description of change
Async implementation of #3611