From 7a67b96bcd15fa640c06a07dbc10a190c9266dcc Mon Sep 17 00:00:00 2001 From: Antoni Baum Date: Mon, 18 Nov 2024 16:51:23 -0800 Subject: [PATCH 1/4] Allow multi-node commands in async pipeline --- redis/asyncio/cluster.py | 55 +++++++++++++++++++++++++++++----------- 1 file changed, 40 insertions(+), 15 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 4e82e5448f..1f548e3d56 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1070,12 +1070,13 @@ async def execute_pipeline(self, commands: List["PipelineCommand"]) -> bool: ret = False for cmd in commands: try: - cmd.result = await self.parse_response( + result = await self.parse_response( connection, cmd.args[0], **cmd.kwargs ) except Exception as e: - cmd.result = e + result = e ret = True + cmd.set_node_result(self.name, result) # Release connection self._free.append(connection) @@ -1530,12 +1531,11 @@ async def _execute( raise RedisClusterException( f"No targets were found to execute {cmd.args} command on" ) - if len(target_nodes) > 1: - raise RedisClusterException(f"Too many targets for command {cmd.args}") - node = target_nodes[0] - if node.name not in nodes: - nodes[node.name] = (node, []) - nodes[node.name][1].append(cmd) + cmd.target_nodes = target_nodes + for node in target_nodes: + if node.name not in nodes: + nodes[node.name] = (node, []) + nodes[node.name][1].append(cmd) errors = await asyncio.gather( *( @@ -1550,20 +1550,27 @@ async def _execute( for cmd in todo: if isinstance(cmd.result, (TryAgainError, MovedError, AskError)): try: - cmd.result = await client.execute_command( + result = await client.execute_command( *cmd.args, **cmd.kwargs ) except Exception as e: - cmd.result = e + result = e + + if isinstance(result, dict): + cmd.result = result + else: + cmd.set_node_result(cmd.target_nodes[0].name, result) if raise_on_error: for cmd in todo: - result = cmd.result - if isinstance(result, Exception): + name_exc = cmd.get_first_exception() + if name_exc: + name, exc = name_exc command = " ".join(map(safe_str, cmd.args)) msg = ( f"Command # {cmd.position + 1} ({command}) of pipeline " - f"caused error: {result.args}" + f"caused error on node {name}: " + f"{result.args}" ) result.args = (msg,) + result.args[1:] raise result @@ -1581,7 +1588,7 @@ async def _execute( client.replace_default_node() break - return [cmd.result for cmd in stack] + return [cmd.unwrap_result() for cmd in stack] def _split_command_across_slots( self, command: str, *keys: KeyT @@ -1620,7 +1627,25 @@ def __init__(self, position: int, *args: Any, **kwargs: Any) -> None: self.args = args self.kwargs = kwargs self.position = position - self.result: Union[Any, Exception] = None + self.result: Dict[str, Union[Any, Exception]] = {} + self.target_nodes = None + + def set_node_result(self, node_name: str, result: Union[Any, Exception]): + self.result[node_name] = result + + def unwrap_result( + self, + ) -> Optional[Union[Union[Any, Exception], Dict[str, Union[Any, Exception]]]]: + if len(self.result) == 0: + return None + if len(self.result) == 1: + return next(iter(self.result.values())) + return self.result + + def get_first_exception(self) -> Optional[Tuple[str, Exception]]: + return next( + ((n, r) for n, r in self.result.items() if isinstance(r, Exception)), None + ) def __repr__(self) -> str: return f"[{self.position}] {self.args} ({self.kwargs})" From da5378c32ddcbed6344b5d9c093041b542a92657 Mon Sep 17 00:00:00 2001 From: Antoni Baum Date: Mon, 18 Nov 2024 16:58:49 -0800 Subject: [PATCH 2/4] Nits --- redis/asyncio/cluster.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 1f548e3d56..0db833860c 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1567,6 +1567,8 @@ async def _execute( if name_exc: name, exc = name_exc command = " ".join(map(safe_str, cmd.args)) + # Note: this will only raise the first exception, but that is + # consistent with RedisCluster.execute_command. msg = ( f"Command # {cmd.position + 1} ({command}) of pipeline " f"caused error on node {name}: " @@ -1635,7 +1637,7 @@ def set_node_result(self, node_name: str, result: Union[Any, Exception]): def unwrap_result( self, - ) -> Optional[Union[Union[Any, Exception], Dict[str, Union[Any, Exception]]]]: + ) -> Optional[Union[Any, Exception, Dict[str, Union[Any, Exception]]]]: if len(self.result) == 0: return None if len(self.result) == 1: From e7fcd7b3925dddc52289df5b52a827350372259d Mon Sep 17 00:00:00 2001 From: Antoni Baum Date: Tue, 10 Dec 2024 11:55:47 -0800 Subject: [PATCH 3/4] Better exception handling --- redis/asyncio/cluster.py | 40 ++++++++++++++++++++++++++-------------- 1 file changed, 26 insertions(+), 14 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index 0db833860c..ac13b06bf1 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1515,7 +1515,7 @@ async def _execute( allow_redirections: bool = True, ) -> List[Any]: todo = [ - cmd for cmd in stack if not cmd.result or isinstance(cmd.result, Exception) + cmd for cmd in stack if not cmd.unwrap_result() or cmd.get_all_exceptions() ] nodes = {} @@ -1548,18 +1548,22 @@ async def _execute( if allow_redirections: # send each errored command individually for cmd in todo: - if isinstance(cmd.result, (TryAgainError, MovedError, AskError)): - try: - result = await client.execute_command( - *cmd.args, **cmd.kwargs - ) - except Exception as e: - result = e + for name, exc in cmd.get_all_exceptions(): + if isinstance(exc, (TryAgainError, MovedError, AskError)): + try: + result = await client.execute_command( + *cmd.args, **cmd.kwargs + ) + except Exception as e: + result = e - if isinstance(result, dict): - cmd.result = result - else: - cmd.set_node_result(cmd.target_nodes[0].name, result) + if isinstance(result, dict): + cmd.result = result + else: + cmd.set_node_result(name, result) + + # We have already retried the command on all nodes. + break if raise_on_error: for cmd in todo: @@ -1583,11 +1587,16 @@ async def _execute( # to replace it. # Note: when the error is raised we'll reset the default node in the # caller function. + has_exc = False for cmd in default_node[1]: # Check if it has a command that failed with a relevant # exception - if type(cmd.result) in self.__class__.ERRORS_ALLOW_RETRY: - client.replace_default_node() + for name, exc in cmd.get_all_exceptions(): + if type(exc) in self.__class__.ERRORS_ALLOW_RETRY: + client.replace_default_node() + has_exc = True + break + if has_exc: break return [cmd.unwrap_result() for cmd in stack] @@ -1649,5 +1658,8 @@ def get_first_exception(self) -> Optional[Tuple[str, Exception]]: ((n, r) for n, r in self.result.items() if isinstance(r, Exception)), None ) + def get_all_exceptions(self) -> List[Tuple[str, Exception]]: + return [(n, r) for n, r in self.result.items() if isinstance(r, Exception)] + def __repr__(self) -> str: return f"[{self.position}] {self.args} ({self.kwargs})" From cf1875daea6d0e28f62d68bb63298fb3b29dfcc1 Mon Sep 17 00:00:00 2001 From: Antoni Baum Date: Tue, 17 Dec 2024 09:54:04 -0800 Subject: [PATCH 4/4] Fix unbound variable Co-authored-by: Edmund L. Wong --- redis/asyncio/cluster.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/redis/asyncio/cluster.py b/redis/asyncio/cluster.py index ac13b06bf1..492e36c85d 100644 --- a/redis/asyncio/cluster.py +++ b/redis/asyncio/cluster.py @@ -1576,10 +1576,10 @@ async def _execute( msg = ( f"Command # {cmd.position + 1} ({command}) of pipeline " f"caused error on node {name}: " - f"{result.args}" + f"{exc.args}" ) - result.args = (msg,) + result.args[1:] - raise result + exc.args = (msg,) + exc.args[1:] + raise exc default_node = nodes.get(client.get_default_node().name) if default_node is not None: