251 changes: 251 additions & 0 deletions examples/function_calling_openai_compliant.py
@@ -0,0 +1,251 @@
#!/usr/bin/env python3
"""
OpenAI-Compliant Function Calling Example for EXO

This example demonstrates the server-side tool calling implementation
that returns OpenAI-compliant responses with tool_calls arrays.

The server now:
1. Parses tool calls from model output automatically
2. Returns properly formatted tool_calls array
3. Sets finish_reason to "tool_calls" when tools are invoked
4. Handles both streaming and non-streaming responses

No client-side parsing needed anymore!
"""

import json
import requests

def get_current_weather(location: str, unit: str = "celsius"):
"""Mock weather data function"""
return {
"location": location,
"temperature": 22 if unit == "celsius" else 72,
"unit": unit,
"forecast": "Sunny with light clouds"
}


def chat_completion(messages, tools=None, stream=False):
"""Send chat completion request to EXO server"""
payload = {
"model": "llama-3.2-1b", # or your preferred model
"messages": messages,
"temperature": 0.7,
"stream": stream
}

if tools:
payload["tools"] = tools

response = requests.post(
"http://localhost:52415/v1/chat/completions",
json=payload,
stream=stream
)

if stream:
return response
else:
return response.json()
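

# Minimal sketch of consuming a streaming response. This assumes EXO emits
# OpenAI-style server-sent events ("data: {json}" lines terminated by
# "data: [DONE]"), matching the OpenAI wire format this API mirrors; verify
# against your server version before relying on it.
def stream_chat_completion(messages, tools=None):
    """Print streamed content deltas as they arrive."""
    response = chat_completion(messages, tools=tools, stream=True)
    for line in response.iter_lines():
        if not line:
            continue
        decoded = line.decode("utf-8")
        if not decoded.startswith("data: "):
            continue
        data = decoded[len("data: "):]
        if data == "[DONE]":
            break
        chunk = json.loads(data)
        delta = chunk["choices"][0].get("delta", {})
        if delta.get("content"):
            print(delta["content"], end="", flush=True)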


def main():
"""
Demonstrates OpenAI-compliant tool calling workflow.

The server now returns responses in proper OpenAI format:
{
"choices": [{
"message": {
"role": "assistant",
"content": "...", # content before tool calls (or null)
"tool_calls": [{ # OpenAI-formatted tool calls
"id": "call_xyz123",
"type": "function",
"function": {
"name": "get_current_weather",
"arguments": "{\"location\": \"Boston, MA\"}" # JSON string
}
}]
},
"finish_reason": "tool_calls" # Set when tools are called
}]
}
"""

# Define tools in OpenAI format
tools = [{
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "The city and state, e.g. San Francisco, CA"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "Temperature unit"
}
},
"required": ["location"]
}
}
}]

# Initial conversation
messages = [{
"role": "user",
"content": "Hi there, what's the weather in Boston?"
}]

print("User: Hi there, what's the weather in Boston?\n")

# Get initial response with tools
print("Sending request to EXO server...")
response = chat_completion(messages, tools=tools)

print(f"\nServer Response:")
print(json.dumps(response, indent=2))

# Extract assistant message
assistant_message = response["choices"][0]["message"]
messages.append(assistant_message)

# Check if assistant called any tools
if "tool_calls" in assistant_message:
print(f"\n✅ Tool calls detected! The server parsed them automatically.")
print(f"Number of tool calls: {len(assistant_message['tool_calls'])}")
print(f"Finish reason: {response['choices'][0]['finish_reason']}")

# Execute each tool call
for tool_call in assistant_message["tool_calls"]:
function_name = tool_call["function"]["name"]
function_args = json.loads(tool_call["function"]["arguments"])

print(f"\nExecuting tool: {function_name}")
print(f"Arguments: {function_args}")

# Call the actual function
if function_name == "get_current_weather":
result = get_current_weather(**function_args)
else:
result = {"error": f"Unknown function: {function_name}"}

print(f"Result: {result}")

# Add tool response to conversation
messages.append({
"role": "tool",
"tool_call_id": tool_call["id"], # Link back to the tool call
"name": function_name,
"content": json.dumps(result)
})

# Get final response with tool results
print("\nSending tool results back to model...")
final_response = chat_completion(messages, tools=tools)

final_message = final_response["choices"][0]["message"]
print(f"\nAssistant: {final_message.get('content', '')}")

messages.append(final_message)

else:
print(f"\nAssistant: {assistant_message.get('content', '')}")
print("\n(Model chose not to call any tools)")

# Print full conversation
print("\n" + "="*60)
print("Full Conversation History:")
print("="*60)
for msg in messages:
role = msg["role"].upper()
if "tool_calls" in msg:
print(f"\n{role}: [Called {len(msg['tool_calls'])} tool(s)]")
for tc in msg["tool_calls"]:
print(f" - {tc['function']['name']}({tc['function']['arguments']})")
elif role == "TOOL":
print(f"\n{role} ({msg['name']}): {msg['content']}")
else:
print(f"\n{role}: {msg.get('content', '')}")


def demo_parallel_tools():
"""
Demonstrates parallel tool calling (multiple tools in one response).

The server can detect and return multiple tool calls from a single
model response, enabling efficient parallel execution.
"""
print("\n" + "="*60)
print("DEMO: Parallel Tool Calling")
print("="*60)

tools = [{
"type": "function",
"function": {
"name": "get_current_weather",
"description": "Get the current weather in a given location",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string"}
},
"required": ["location"]
}
}
}]

messages = [{
"role": "user",
"content": "What's the weather in Boston, New York, and San Francisco?"
}]

response = chat_completion(messages, tools=tools)
assistant_message = response["choices"][0]["message"]

if "tool_calls" in assistant_message:
print(f"\n✅ Parallel tool calls detected!")
print(f"Number of simultaneous calls: {len(assistant_message['tool_calls'])}")

for i, tc in enumerate(assistant_message["tool_calls"], 1):
args = json.loads(tc["function"]["arguments"])
print(f"{i}. {tc['function']['name']}(location={args.get('location')})")
else:
print("\n⚠️ Model did not make parallel tool calls")


if __name__ == "__main__":
print("="*60)
print(" EXO OpenAI-Compliant Tool Calling Demo")
print("="*60)
print("\nThis demonstrates server-side tool calling with OpenAI format.")
print("No client-side parsing required!\n")

try:
main()
demo_parallel_tools()

print("\n" + "="*60)
print("Key Implementation Features:")
print("="*60)
print("✅ Server-side parsing of tool calls from model output")
print("✅ OpenAI-compliant response format with tool_calls array")
print("✅ Proper finish_reason='tool_calls' when tools are invoked")
print("✅ Support for parallel tool calling")
print("✅ Works with both streaming and non-streaming")
print("✅ Arguments always returned as JSON strings (not objects)")
print("✅ Unique tool_call IDs generated server-side")

except requests.exceptions.ConnectionError:
print("\n❌ Error: Could not connect to EXO server")
print("Make sure EXO is running on http://localhost:52415")
print("\nStart the server with:")
print(" exo --inference-engine mlx")
90 changes: 86 additions & 4 deletions exo/api/chatgpt_api.py
@@ -3,6 +3,7 @@
import asyncio
import json
import os
import re
from pathlib import Path
from transformers import AutoTokenizer
from typing import List, Literal, Union, Dict, Optional
@@ -33,6 +34,61 @@
import numpy as mx


def parse_tool_calls(content: str) -> tuple[Optional[str], Optional[List[Dict]], Optional[str]]:
"""
Parse tool calls from model output in XML format.

Returns:
tuple of (content_before_tools, tool_calls_list, finish_reason)
- content_before_tools: Text content before first tool call (or None if no tools)
- tool_calls_list: List of tool call dicts with OpenAI format (or None if no tools)
- finish_reason: "tool_calls" if tools found, None otherwise
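
  Example (illustrative): given model output such as
    'OK.\n<tool_call>\n{"name": "get_current_weather", "arguments": {"location": "Boston"}}\n</tool_call>'
  this would return
    ("OK.",
     [{"id": "call_<24 hex chars>",
       "type": "function",
       "function": {"name": "get_current_weather",
                    "arguments": "{\"location\": \"Boston\"}"}}],
     "tool_calls")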
"""
tool_calls = []

  # Find every <tool_call>...</tool_call> block emitted by the model's chat template
  matches = list(re.finditer(r"<tool_call>\n(.+?)\n</tool_call>", content, re.DOTALL))

if not matches:
return None, None, None

# Get content before first tool call
first_match_start = matches[0].start()
  content_before = content[:first_match_start].strip() or None  # a whitespace-only prefix becomes None

# Parse each tool call
for match in matches:
try:
tool_call_json = json.loads(match.group(1))

# Ensure arguments is a JSON string (not an object)
if "arguments" in tool_call_json and isinstance(tool_call_json["arguments"], dict):
tool_call_json["arguments"] = json.dumps(tool_call_json["arguments"])

# Generate unique call ID
call_id = f"call_{uuid.uuid4().hex[:24]}"

# Format according to OpenAI spec
tool_calls.append({
"id": call_id,
"type": "function",
"function": {
"name": tool_call_json.get("name", ""),
"arguments": tool_call_json.get("arguments", "{}")
}
})
except json.JSONDecodeError as e:
if DEBUG >= 2:
print(f"Failed to parse tool call JSON: {match.group(1)}")
print(f"Error: {e}")
continue

if tool_calls:
return content_before, tool_calls, "tool_calls"

return None, None, None


class Message:
def __init__(self, role: str, content: Union[str, List[Dict[str, Union[str, Dict[str, str]]]]], tools: Optional[List[Dict]] = None):
self.role = role
@@ -64,9 +120,23 @@ def generate_completion(
request_id: str,
tokens: List[int],
stream: bool,
finish_reason: Union[Literal["length", "stop"], None],
finish_reason: Union[Literal["length", "stop", "tool_calls"], None],
object_type: Literal["chat.completion", "text_completion"],
) -> dict:
decoded_content = tokenizer.decode(tokens)

# Parse tool calls from content if tools were provided in request
content_before_tools = None
tool_calls = None
tool_finish_reason = None

if chat_request.tools:
content_before_tools, tool_calls, tool_finish_reason = parse_tool_calls(decoded_content)

# Override finish_reason if tool calls were detected
if tool_finish_reason:
finish_reason = tool_finish_reason

completion = {
"id": f"chatcmpl-{request_id}",
"object": object_type,
@@ -75,7 +145,7 @@
"system_fingerprint": f"exo_{VERSION}",
"choices": [{
"index": 0,
"message": {"role": "assistant", "content": tokenizer.decode(tokens)},
"message": {"role": "assistant", "content": decoded_content},
"logprobs": None,
"finish_reason": finish_reason,
}],
@@ -91,9 +161,21 @@
choice = completion["choices"][0]
if object_type.startswith("chat.completion"):
key_name = "delta" if stream else "message"
choice[key_name] = {"role": "assistant", "content": tokenizer.decode(tokens)}

# Build message/delta content
message_content = {
"role": "assistant",
"content": content_before_tools if tool_calls else decoded_content
}

# Add tool_calls array if tools were called
if tool_calls:
message_content["tool_calls"] = tool_calls

choice[key_name] = message_content

elif object_type == "text_completion":
choice["text"] = tokenizer.decode(tokens)
choice["text"] = decoded_content
else:
ValueError(f"Unsupported response type: {object_type}")
