chore(ci): enabling A & B rules in Ruff #305
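For reference, the A and B selectors in the title map to Ruff's flake8-builtins and flake8-bugbear rule families. The repository's actual Ruff configuration is not part of this capture, but enabling those families typically looks like this in pyproject.toml:

```toml
[tool.ruff.lint]
# A: flake8-builtins  - names that shadow Python builtins
# B: flake8-bugbear   - likely bugs and design problems
extend-select = ["A", "B"]
```

The diffs below are the code changes those two rule families prompted.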
@@ -38,15 +38,13 @@ def __init__(self, options: AnthropicAgentOptions):
             if self.streaming:
                 if not isinstance(options.client, AsyncAnthropic):
                     raise ValueError("If streaming is enabled, the provided client must be an AsyncAnthropic client")
-            else:
-                if not isinstance(options.client, Anthropic):
-                    raise ValueError("If streaming is disabled, the provided client must be an Anthropic client")
+            elif not isinstance(options.client, Anthropic):
+                raise ValueError("If streaming is disabled, the provided client must be an Anthropic client")
             self.client = options.client
+        elif self.streaming:
+            self.client = AsyncAnthropic(api_key=options.api_key)
         else:
-            if self.streaming:
-                self.client = AsyncAnthropic(api_key=options.api_key)
-            else:
-                self.client = Anthropic(api_key=options.api_key)
+            self.client = Anthropic(api_key=options.api_key)

         self.system_prompt = ''
         self.custom_variables = {}

Review comment: Simplifying logic.
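The rewrite collapses an `else:` whose only statement is another `if` into a single `elif`, removing one nesting level without changing behavior. A minimal sketch of the pattern, using a hypothetical function unrelated to this codebase:

```python
def sign(n: int) -> str:
    # Before: the else branch wraps a second if, adding indentation.
    #     if n > 0:
    #         return "positive"
    #     else:
    #         if n == 0:
    #             return "zero"
    #         else:
    #             return "negative"
    # After: elif expresses the same decision chain flat.
    if n > 0:
        return "positive"
    elif n == 0:
        return "zero"
    else:
        return "negative"
```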
@@ -146,7 +144,7 @@ def _build_input(
         system_prompt: str
     ) -> dict:
         """Build the conversation command with all necessary configurations."""
-        input = {
+        json_input = {
             "model": self.model_id,
             "max_tokens": self.inference_config.get('maxTokens'),
             "messages": messages,

Review comment: Avoiding variable names that shadow built-in Python functions.
@@ -157,9 +155,9 @@ def _build_input(
         }

         if self.tool_config:
-            input["tools"] = self._prepare_tool_config()
+            json_input["tools"] = self._prepare_tool_config()

-        return input
+        return json_input

     def _get_max_recursions(self) -> int:
         """Get the maximum number of recursions based on tool configuration."""
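The rename of `input` to `json_input` addresses Ruff's A001 rule (builtin shadowing): once a local binding reuses a builtin's name, the builtin is unreachable for the rest of the scope. A small self-contained illustration with hypothetical names, not from this codebase:

```python
def build_payload(messages: list[dict]) -> dict:
    # Naming this dict `input` would shadow the builtin input() inside
    # this function, so a later call such as input("continue? ") would
    # fail with TypeError: 'dict' object is not callable.
    json_input = {"model": "example-model", "messages": messages}
    return json_input

print(build_payload([{"role": "user", "content": "hi"}]))
```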
@@ -169,7 +167,7 @@ def _get_max_recursions(self) -> int:

     async def _handle_streaming(
         self,
-        input: dict,
+        payload_input: dict,
         messages: list[Any],
         max_recursions: int
     ) -> AsyncIterable[Any]:

Review comment: Avoiding parameter names that shadow built-in Python functions.
@@ -181,7 +179,7 @@ async def stream_generator():
             nonlocal continue_with_tools, final_response, max_recursions

             while continue_with_tools and max_recursions > 0:
-                response = self.handle_streaming_response(input)
+                response = self.handle_streaming_response(payload_input)

                 async for chunk in response:
                     if chunk.final_message:
@@ -190,9 +188,9 @@ async def stream_generator():
                     yield chunk

                 if any('tool_use' in content.type for content in final_response.content):
-                    input['messages'].append({"role": "assistant", "content": final_response.content})
+                    payload_input['messages'].append({"role": "assistant", "content": final_response.content})
                     tool_response = await self._process_tool_block(final_response, messages)
-                    input['messages'].append(tool_response)
+                    payload_input['messages'].append(tool_response)
                 else:
                     continue_with_tools = False
                     # yield last message
@@ -205,32 +203,30 @@ async def stream_generator():
     async def _process_with_strategy(
         self,
         streaming: bool,
-        input: dict,
+        payload_input: dict,
         messages: list[Any]
     ) -> ConversationMessage | AsyncIterable[Any]:
         """Process the request using the specified strategy."""

         max_recursions = self._get_max_recursions()

         if streaming:
-            return await self._handle_streaming(input, messages, max_recursions)
-        return await self._handle_single_response_loop(input, messages, max_recursions)
+            return await self._handle_streaming(payload_input, messages, max_recursions)
+        return await self._handle_single_response_loop(payload_input, messages, max_recursions)

-    async def _process_tool_block(self, llm_response: Any, conversation: list[Any]) -> (Any):
+    async def _process_tool_block(self, llm_response: Any, conversation: list[Any]) -> Any:
         if 'useToolHandler' in self.tool_config:
             # tool process logic is handled elsewhere
             tool_response = await self.tool_config['useToolHandler'](llm_response, conversation)
+        elif isinstance(self.tool_config['tool'], AgentTools):
+            tool_response = await self.tool_config['tool'].tool_handler(AgentProviderType.ANTHROPIC.value, llm_response, conversation)
         else:
-            # tool process logic is handled in AgentTools class
-            if isinstance(self.tool_config['tool'], AgentTools):
-                tool_response = await self.tool_config['tool'].tool_handler(AgentProviderType.ANTHROPIC.value, llm_response, conversation)
-            else:
-                raise ValueError("You must use class when not providing a custom tool handler")
+            raise ValueError("You must use class when not providing a custom tool handler")
         return tool_response

     async def _handle_single_response_loop(
         self,
-        input: Any,
+        payload_input: Any,
         messages: list[Any],
         max_recursions: int
     ) -> ConversationMessage:

Review comment (on both renamed parameters): Avoiding parameter names that shadow built-in Python functions.
@@ -240,11 +236,11 @@ async def _handle_single_response_loop(
         llm_response = None

         while continue_with_tools and max_recursions > 0:
-            llm_response = await self.handle_single_response(input)
+            llm_response = await self.handle_single_response(payload_input)
             if any('tool_use' in content.type for content in llm_response.content):
-                input['messages'].append({"role": "assistant", "content": llm_response.content})
+                payload_input['messages'].append({"role": "assistant", "content": llm_response.content})
                 tool_response = await self._process_tool_block(llm_response, messages)
-                input['messages'].append(tool_response)
+                payload_input['messages'].append(tool_response)
             else:
                 continue_with_tools = False
@@ -263,23 +259,19 @@ async def process_request(

         messages = self._prepare_conversation(input_text, chat_history)
         system_prompt = await self._prepare_system_prompt(input_text)
-        input = self._build_input(messages, system_prompt)
+        json_input = self._build_input(messages, system_prompt)

-        return await self._process_with_strategy(self.streaming, input, messages)
+        return await self._process_with_strategy(self.streaming, json_input, messages)

     async def handle_single_response(self, input_data: dict) -> Any:
         try:
-            response = self.client.messages.create(**input_data)
-            return response
+            return self.client.messages.create(**input_data)
         except Exception as error:
             Logger.error(f"Error invoking Anthropic: {error}")
             raise error

-    async def handle_streaming_response(self, input) -> AsyncGenerator[AgentStreamResponse, None]:
-        message = {}
-        content = []
-        accumulated = {}
-        message['content'] = content
+    async def handle_streaming_response(self, input) -> AsyncGenerator[AgentStreamResponse, None]:  # noqa: A002
+        accumulated: dict[str, Any] = {}

         try:
             async with self.client.messages.stream(**input) as stream:

Review comment: Avoiding variable names that shadow built-in Python functions.
Review comment: Adding # noqa: A002 to suppress the builtin-shadowing rule on this parameter.
Review comment: content and message were not being used.
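Here the PR suppresses the finding rather than renaming, presumably because changing `handle_streaming_response`'s parameter name would alter an existing signature for callers; `# noqa: A002` silences exactly that rule on that one line. A generic sketch of the trade-off, with hypothetical names:

```python
from typing import Any

# The parameter keeps its published name for backward compatibility;
# the builtin-shadowing warning (A002) is suppressed on this line only.
def handle(input: dict[str, Any]) -> int:  # noqa: A002
    return len(input["messages"])

print(handle({"messages": [{"role": "user", "content": "hi"}]}))
```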
--- next file ---
@@ -62,7 +62,7 @@ async def process_request(

         except Exception as error:
             Logger.error(f"Error processing request with agent {agent.name}:{str(error)}")
-            raise f"Error processing request with agent {agent.name}:{str(error)}"
+            raise f"Error processing request with agent {agent.name}:{str(error)}" from error

         return final_response

Review comment: Exceptions should propagate the original error for a complete stack trace.
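This is Ruff's B904: inside an `except` block, `raise ... from error` chains the original exception as `__cause__`, so the full traceback survives. (Strictly, Python can only raise exception instances, not strings, so the sketch below wraps the message in a hypothetical `AgentError` rather than mirroring the f-string raise above.)

```python
class AgentError(Exception):
    """Hypothetical wrapper for agent failures."""

def dispatch() -> None:
    raise TimeoutError("upstream model timed out")

try:
    try:
        dispatch()
    except Exception as error:
        # `from error` records the original exception as __cause__.
        raise AgentError(f"Error processing request: {error}") from error
except AgentError as wrapped:
    print(wrapped)            # Error processing request: upstream model timed out
    print(wrapped.__cause__)  # upstream model timed out
```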
@@ -83,4 +83,4 @@ def create_default_response(self) -> ConversationMessage:
         return ConversationMessage(
             role=ParticipantRole.ASSISTANT.value,
             content=[{"text": self.default_output}]
-        )
\ No newline at end of file
+        )
--- next file ---
@@ -253,7 +253,7 @@ def _format_agents_memory(self, agents_history: list[ConversationMessage]) -> str:
         return ''.join(
             f"{user_msg.role}:{user_msg.content[0].get('text','')}\n"
             f"{asst_msg.role}:{asst_msg.content[0].get('text','')}\n"
-            for user_msg, asst_msg in zip(agents_history[::2], agents_history[1::2])
+            for user_msg, asst_msg in zip(agents_history[::2], agents_history[1::2], strict=True)
             if self.id not in asst_msg.content[0].get('text', '')
         )

Review comment: Adding the strict parameter to make sure both iterables have the same length.
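This is Ruff's B905: `zip` without `strict` silently truncates to the shortest iterable, while `strict=True` (Python 3.10+) raises `ValueError` on a length mismatch. In this method it guards the assumption that the history strictly alternates user and assistant turns. A quick demonstration:

```python
history = ["u1", "a1", "u2", "a2", "u3"]  # odd length: dangling user turn

# Without strict, the trailing "u3" is silently dropped.
print(list(zip(history[::2], history[1::2])))  # [('u1', 'a1'), ('u2', 'a2')]

# With strict=True, the mismatch becomes a loud error.
try:
    list(zip(history[::2], history[1::2], strict=True))
except ValueError as err:
    print(err)  # zip() argument 2 is shorter than argument 1
```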
@@ -287,4 +287,4 @@ async def process_request(

         except Exception as e:
             Logger.error(f"Error in process_request: {e}")
-            raise e
\ No newline at end of file
+            raise e
--- next file ---
@@ -137,10 +137,13 @@ async def agent_process_request(self,
         user_id: str,
         session_id: str,
         classifier_result: ClassifierResult,
-        additional_params: dict[str, str] = {},
+        additional_params: dict[str, str] | None = None,
         stream_response: bool | None = False  # whether to stream back the response from the agent
     ) -> AgentResponse:
         """Process agent response and handle chat storage."""

+        if additional_params is None:
+            additional_params = {}
         try:
             agent_response = await self.dispatch_to_agent({
                 "user_input": user_input,

Review comment: Should not use mutable default arguments in the method signature. Moving the check inside the function.
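This is Ruff's B006 (mutable default argument): a default like `{}` is evaluated once, at function definition time, and then shared by every call that omits the argument, so mutations leak across calls. The `None` sentinel in the diff is the standard fix. A self-contained demonstration:

```python
def buggy(params: dict[str, str] = {}) -> dict[str, str]:
    params["seen"] = "yes"          # mutates the one shared default dict
    return params

print(buggy() is buggy())           # True: both calls return the same object

def fixed(params: dict[str, str] | None = None) -> dict[str, str]:
    if params is None:
        params = {}                 # a fresh dict on every call
    params["seen"] = "yes"
    return params

print(fixed() is fixed())           # False: each call gets its own dict
```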
@@ -180,7 +183,6 @@ async def process_stream():
                     yield chunk
                 else:
                     Logger.error("Invalid response type from agent. Expected AgentStreamResponse")
-                    pass

             if full_message:
                 await self.save_message(full_message,
@@ -199,7 +201,7 @@ async def process_stream() -> ConversationMessage:
                     full_message = chunk.final_message
                 else:
                     Logger.error("Invalid response type from agent. Expected AgentStreamResponse")
-                    pass
+

             if full_message:
                 await self.save_message(full_message,
@@ -227,13 +229,10 @@ async def process_stream() -> ConversationMessage:
             self.logger.error(f"Error during agent processing: {str(error)}")
             raise error

-    async def route_request(self,
-                            user_input: str,
-                            user_id: str,
-                            session_id: str,
-                            additional_params: dict[str, str] = {},
-                            stream_response: bool | None = False) -> AgentResponse:
+    async def route_request(self, user_input: str, user_id: str, session_id: str, additional_params: dict[str, str] | None = None, stream_response: bool | None = False) -> AgentResponse:
         """Route user request to appropriate agent."""
+        if additional_params is None:
+            additional_params = {}
         self.execution_times.clear()

         try:

Review comment: Should not use mutable default arguments in the method signature. Moving the check inside the function.
--- next file ---
@@ -84,7 +84,7 @@ def _extract_properties(self, func: Callable) -> dict[str, dict[str, Any]]:
             param_descriptions[param_name] = description

         properties = {}
-        for param_name, param in sig.parameters.items():
+        for param_name, _param in sig.parameters.items():
             # Skip 'self' parameter for class methods
             if param_name == 'self':
                 continue

Review comment: The variable is not being used, so add an underscore prefix.