diff --git a/python/ruff.toml b/python/ruff.toml
index d15a705e..e5b0ec88 100644
--- a/python/ruff.toml
+++ b/python/ruff.toml
@@ -1,7 +1,7 @@
 # Enable rules.
 lint.select = [
-    #"A", # flake8-builtins - https://docs.astral.sh/ruff/rules/#flake8-builtins-a
-    #"B", # flake8-bugbear-b - https://docs.astral.sh/ruff/rules/#flake8-bugbear-b
+    "A", # flake8-builtins - https://docs.astral.sh/ruff/rules/#flake8-builtins-a
+    "B", # flake8-bugbear-b - https://docs.astral.sh/ruff/rules/#flake8-bugbear-b
     #"C4", # flake8-comprehensions - https://docs.astral.sh/ruff/rules/#flake8-comprehensions-c4
     #"C90", # mccabe - https://docs.astral.sh/ruff/rules/#mccabe-c90
     #"COM", # flak8-commas - https://docs.astral.sh/ruff/rules/#flake8-commas-com
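The two rule families enabled above drive most of the changes in this patch: flake8-builtins (A) flags names that shadow Python builtins, and flake8-bugbear (B) flags likely bugs such as mutable default arguments and un-chained re-raises. A minimal sketch of the kind of code each family rejects (hypothetical snippets, not from this repo):

```python
def read_prompt(input):  # A002: argument `input` shadows the input() builtin
    return input.strip()

def route(params={}):  # B006: the default dict is created once and shared by every call
    params["seen"] = True
    return params

def load(path):
    try:
        return open(path).read()
    except OSError:
        raise ValueError(f"cannot load {path}")  # B904: missing `from` discards the cause
```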
diff --git a/python/src/multi_agent_orchestrator/agents/anthropic_agent.py b/python/src/multi_agent_orchestrator/agents/anthropic_agent.py
index e33ce5f9..a86ed3a3 100644
--- a/python/src/multi_agent_orchestrator/agents/anthropic_agent.py
+++ b/python/src/multi_agent_orchestrator/agents/anthropic_agent.py
@@ -38,15 +38,13 @@ def __init__(self, options: AnthropicAgentOptions):
             if self.streaming:
                 if not isinstance(options.client, AsyncAnthropic):
                     raise ValueError("If streaming is enabled, the provided client must be an AsyncAnthropic client")
-            else:
-                if not isinstance(options.client, Anthropic):
-                    raise ValueError("If streaming is disabled, the provided client must be an Anthropic client")
+            elif not isinstance(options.client, Anthropic):
+                raise ValueError("If streaming is disabled, the provided client must be an Anthropic client")
             self.client = options.client
+        elif self.streaming:
+            self.client = AsyncAnthropic(api_key=options.api_key)
         else:
-            if self.streaming:
-                self.client = AsyncAnthropic(api_key=options.api_key)
-            else:
-                self.client = Anthropic(api_key=options.api_key)
+            self.client = Anthropic(api_key=options.api_key)
 
         self.system_prompt = ''
         self.custom_variables = {}
@@ -146,7 +144,7 @@ def _build_input(
         system_prompt: str
     ) -> dict:
         """Build the conversation command with all necessary configurations."""
-        input = {
+        json_input = {
             "model": self.model_id,
             "max_tokens": self.inference_config.get('maxTokens'),
             "messages": messages,
@@ -157,9 +155,9 @@
         }
 
         if self.tool_config:
-            input["tools"] = self._prepare_tool_config()
+            json_input["tools"] = self._prepare_tool_config()
 
-        return input
+        return json_input
 
     def _get_max_recursions(self) -> int:
         """Get the maximum number of recursions based on tool configuration."""
@@ -169,7 +167,7 @@
     async def _handle_streaming(
         self,
-        input: dict,
+        payload_input: dict,
         messages: list[Any],
         max_recursions: int
     ) -> AsyncIterable[Any]:
@@ -181,7 +179,7 @@ async def stream_generator():
             nonlocal continue_with_tools, final_response, max_recursions
 
             while continue_with_tools and max_recursions > 0:
-                response = self.handle_streaming_response(input)
+                response = self.handle_streaming_response(payload_input)
 
                 async for chunk in response:
                     if chunk.final_message:
@@ -190,9 +188,9 @@ async def stream_generator():
                         final_response = chunk.final_message
                     yield chunk
                 if any('tool_use' in content.type for content in final_response.content):
-                    input['messages'].append({"role": "assistant", "content": final_response.content})
+                    payload_input['messages'].append({"role": "assistant", "content": final_response.content})
                     tool_response = await self._process_tool_block(final_response, messages)
-                    input['messages'].append(tool_response)
+                    payload_input['messages'].append(tool_response)
                 else:
                     continue_with_tools = False
             # yield last message
@@ -205,7 +203,7 @@
     async def _process_with_strategy(
         self,
         streaming: bool,
-        input: dict,
+        payload_input: dict,
         messages: list[Any]
     ) -> ConversationMessage | AsyncIterable[Any]:
         """Process the request using the specified strategy."""
@@ -213,24 +211,22 @@
         max_recursions = self._get_max_recursions()
 
         if streaming:
-            return await self._handle_streaming(input, messages, max_recursions)
-        return await self._handle_single_response_loop(input, messages, max_recursions)
+            return await self._handle_streaming(payload_input, messages, max_recursions)
+        return await self._handle_single_response_loop(payload_input, messages, max_recursions)
 
-    async def _process_tool_block(self, llm_response: Any, conversation: list[Any]) -> (Any):
+    async def _process_tool_block(self, llm_response: Any, conversation: list[Any]) -> Any:
         if 'useToolHandler' in self.tool_config:
             # tool process logic is handled elsewhere
             tool_response = await self.tool_config['useToolHandler'](llm_response, conversation)
+        elif isinstance(self.tool_config['tool'], AgentTools):
+            tool_response = await self.tool_config['tool'].tool_handler(AgentProviderType.ANTHROPIC.value, llm_response, conversation)
         else:
-            # tool process logic is handled in AgentTools class
-            if isinstance(self.tool_config['tool'], AgentTools):
-                tool_response = await self.tool_config['tool'].tool_handler(AgentProviderType.ANTHROPIC.value, llm_response, conversation)
-            else:
-                raise ValueError("You must use class when not providing a custom tool handler")
+            raise ValueError("You must use class when not providing a custom tool handler")
         return tool_response
 
     async def _handle_single_response_loop(
         self,
-        input: Any,
+        payload_input: Any,
         messages: list[Any],
         max_recursions: int
     ) -> ConversationMessage:
@@ -240,11 +236,11 @@
         llm_response = None
 
         while continue_with_tools and max_recursions > 0:
-            llm_response = await self.handle_single_response(input)
+            llm_response = await self.handle_single_response(payload_input)
 
             if any('tool_use' in content.type for content in llm_response.content):
-                input['messages'].append({"role": "assistant", "content": llm_response.content})
+                payload_input['messages'].append({"role": "assistant", "content": llm_response.content})
                 tool_response = await self._process_tool_block(llm_response, messages)
-                input['messages'].append(tool_response)
+                payload_input['messages'].append(tool_response)
             else:
                 continue_with_tools = False
@@ -263,23 +259,19 @@ async def process_request(
         messages = self._prepare_conversation(input_text, chat_history)
         system_prompt = await self._prepare_system_prompt(input_text)
 
-        input = self._build_input(messages, system_prompt)
+        json_input = self._build_input(messages, system_prompt)
 
-        return await self._process_with_strategy(self.streaming, input, messages)
+        return await self._process_with_strategy(self.streaming, json_input, messages)
 
     async def handle_single_response(self, input_data: dict) -> Any:
         try:
-            response = self.client.messages.create(**input_data)
-            return response
+            return self.client.messages.create(**input_data)
         except Exception as error:
             Logger.error(f"Error invoking Anthropic: {error}")
             raise error
 
-    async def handle_streaming_response(self, input) -> AsyncGenerator[AgentStreamResponse, None]:
-        message = {}
-        content = []
-        accumulated = {}
-        message['content'] = content
+    async def handle_streaming_response(self, input) -> AsyncGenerator[AgentStreamResponse, None]:  # noqa: A002
+        accumulated: dict[str, Any] = {}
         try:
             async with self.client.messages.stream(**input) as stream:
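The renames in this file (`input` to `json_input`/`payload_input`, plus the `# noqa: A002` escape hatch on the public `handle_streaming_response` signature) all serve flake8-builtins. The constructor change is a pure control-flow flattening; here is a standalone sketch of the resulting selection logic, assuming the real `anthropic` package classes and a free-function form for illustration:

```python
from anthropic import Anthropic, AsyncAnthropic

def resolve_client(client: object | None, streaming: bool, api_key: str | None):
    """Validate a caller-supplied client against the streaming mode, else build one."""
    if client is not None:
        if streaming and not isinstance(client, AsyncAnthropic):
            raise ValueError("If streaming is enabled, the provided client must be an AsyncAnthropic client")
        if not streaming and not isinstance(client, Anthropic):
            raise ValueError("If streaming is disabled, the provided client must be an Anthropic client")
        return client
    # No client supplied: construct the flavor that matches the streaming mode.
    return AsyncAnthropic(api_key=api_key) if streaming else Anthropic(api_key=api_key)
```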
diff --git a/python/src/multi_agent_orchestrator/agents/chain_agent.py b/python/src/multi_agent_orchestrator/agents/chain_agent.py
index 8139ad7b..2bbbf3e8 100644
--- a/python/src/multi_agent_orchestrator/agents/chain_agent.py
+++ b/python/src/multi_agent_orchestrator/agents/chain_agent.py
@@ -62,7 +62,7 @@ async def process_request(
 
             except Exception as error:
                 Logger.error(f"Error processing request with agent {agent.name}:{str(error)}")
-                raise f"Error processing request with agent {agent.name}:{str(error)}"
+                raise f"Error processing request with agent {agent.name}:{str(error)}" from error
 
         return final_response
 
@@ -83,4 +83,4 @@ def create_default_response(self) -> ConversationMessage:
         return ConversationMessage(
             role=ParticipantRole.ASSISTANT.value,
             content=[{"text": self.default_output}]
-        )
\ No newline at end of file
+        )
diff --git a/python/src/multi_agent_orchestrator/agents/supervisor_agent.py b/python/src/multi_agent_orchestrator/agents/supervisor_agent.py
index 0f7418d5..90a999a7 100644
--- a/python/src/multi_agent_orchestrator/agents/supervisor_agent.py
+++ b/python/src/multi_agent_orchestrator/agents/supervisor_agent.py
@@ -253,7 +253,7 @@ def _format_agents_memory(self, agents_history: list[ConversationMessage]) -> str:
         return ''.join(
             f"{user_msg.role}:{user_msg.content[0].get('text','')}\n"
             f"{asst_msg.role}:{asst_msg.content[0].get('text','')}\n"
-            for user_msg, asst_msg in zip(agents_history[::2], agents_history[1::2])
+            for user_msg, asst_msg in zip(agents_history[::2], agents_history[1::2], strict=True)
             if self.id not in asst_msg.content[0].get('text', '')
         )
 
@@ -287,4 +287,4 @@ async def process_request(
 
         except Exception as e:
             Logger.error(f"Error in process_request: {e}")
-            raise e
\ No newline at end of file
+            raise e
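Two of the bugbear-driven edits above warrant care. In supervisor_agent.py, `zip(..., strict=True)` (B905) turns a silently truncated pairing into a `ValueError`, which assumes `agents_history` always holds complete user/assistant pairs. In chain_agent.py, B904 wants `raise ... from error`, but `raise` also requires a `BaseException` instance, so the f-string kept on the `+` line would itself fail with a `TypeError` when reached; the usual pattern wraps the message in an exception type. A sketch, with `AgentProcessingError` as a hypothetical name:

```python
class AgentProcessingError(Exception):
    """Hypothetical wrapper; `raise` needs an exception instance, not a str."""

def call_agent(name: str) -> str:
    raise TimeoutError("upstream call timed out")  # stand-in failure

try:
    call_agent("tech-agent")
except Exception as error:
    # B904-compliant: chaining with `from error` preserves the original traceback.
    raise AgentProcessingError(f"Error processing request with agent tech-agent: {error}") from error
```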
diff --git a/python/src/multi_agent_orchestrator/orchestrator.py b/python/src/multi_agent_orchestrator/orchestrator.py
index 78a9771f..65d4742b 100644
--- a/python/src/multi_agent_orchestrator/orchestrator.py
+++ b/python/src/multi_agent_orchestrator/orchestrator.py
@@ -137,10 +137,13 @@ async def agent_process_request(self,
                                     user_id: str,
                                     session_id: str,
                                     classifier_result: ClassifierResult,
-                                    additional_params: dict[str, str] = {},
+                                    additional_params: dict[str, str] | None = None,
                                     stream_response: bool | None = False # wether to stream back the response from the agent
                                     ) -> AgentResponse:
         """Process agent response and handle chat storage."""
+
+        if additional_params is None:
+            additional_params = {}
         try:
             agent_response = await self.dispatch_to_agent({
                 "user_input": user_input,
@@ -180,7 +183,6 @@ async def process_stream():
                         yield chunk
                     else:
                         Logger.error("Invalid response type from agent. Expected AgentStreamResponse")
-                        pass
 
                 if full_message:
                     await self.save_message(full_message,
@@ -199,7 +201,7 @@ async def process_stream() -> ConversationMessage:
                             full_message = chunk.final_message
                     else:
                         Logger.error("Invalid response type from agent. Expected AgentStreamResponse")
-                        pass
+
 
                 if full_message:
                     await self.save_message(full_message,
@@ -227,13 +229,10 @@ async def process_stream() -> ConversationMessage:
             self.logger.error(f"Error during agent processing: {str(error)}")
             raise error
 
-    async def route_request(self,
-                            user_input: str,
-                            user_id: str,
-                            session_id: str,
-                            additional_params: dict[str, str] = {},
-                            stream_response: bool | None = False) -> AgentResponse:
+    async def route_request(self, user_input: str, user_id: str, session_id: str, additional_params: dict[str, str] | None = None, stream_response: bool | None = False) -> AgentResponse:
         """Route user request to appropriate agent."""
+        if additional_params is None:
+            additional_params = {}
         self.execution_times.clear()
 
         try:
diff --git a/python/src/multi_agent_orchestrator/utils/tool.py b/python/src/multi_agent_orchestrator/utils/tool.py
index 659d8f06..1febbe9c 100644
--- a/python/src/multi_agent_orchestrator/utils/tool.py
+++ b/python/src/multi_agent_orchestrator/utils/tool.py
@@ -84,7 +84,7 @@ def _extract_properties(self, func: Callable) -> dict[str, dict[str, Any]]:
             param_descriptions[param_name] = description
 
         properties = {}
-        for param_name, param in sig.parameters.items():
+        for param_name, _param in sig.parameters.items():
             # Skip 'self' parameter for class methods
             if param_name == 'self':
                 continue
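Both orchestrator signatures replace the `additional_params: dict[str, str] = {}` default with the standard None-sentinel pattern for B006, so each call gets a fresh dict instead of mutating one instance shared across calls. A minimal self-contained sketch of the pattern:

```python
def route_request(user_input: str, additional_params: dict[str, str] | None = None) -> dict[str, str]:
    if additional_params is None:
        additional_params = {}  # fresh per call; `= {}` in the signature would be shared
    additional_params["handled"] = "true"
    return additional_params

assert route_request("hi") == {"handled": "true"}
assert route_request("again") == {"handled": "true"}  # no state leaked from the first call
```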