-
Notifications
You must be signed in to change notification settings - Fork 731
Open
Labels
Description
When using AgentOps with Agent Lightning, the library sometimes captures incomplete message objects. These incomplete messages are passed to convert_to_openai_messages, which expects required keys such as "role" and "tool_calls" to always exist.
def convert_to_openai_messages(prompt_completion_list: List[_RawSpanInfo]) -> Generator[OpenAIMessages, None, None]:
"""Convert raw trace payloads into OpenAI-style chat messages.
The function consumes an iterable produced by
[`TraceToMessages.adapt()`][agentlightning.TraceToMessages.adapt] and yields
structures that match the OpenAI fine-tuning JSONL schema, including tool definitions.
Args:
prompt_completion_list: Raw prompt/completion/tool payloads extracted from a trace.
Returns:
A generator that yields [`OpenAIMessages`][agentlightning.adapter.messages.OpenAIMessages]
entries compatible with the OpenAI Functions fine-tuning format.
"""
# Import locally to avoid legacy OpenAI version type import errors
from openai.types.chat import (
ChatCompletionAssistantMessageParam,
ChatCompletionFunctionToolParam,
ChatCompletionMessageFunctionToolCallParam,
ChatCompletionMessageParam,
)
for pc_entry in prompt_completion_list:
messages: List[ChatCompletionMessageParam] = []
# Extract messages
for msg in pc_entry["prompt"]:
**role = msg["role"]**
if role == "assistant" and "tool_calls" in msg:
# Use the tool_calls directly
# This branch is usually not used in the wild.
tool_calls: List[ChatCompletionMessageFunctionToolCallParam] = [
ChatCompletionMessageFunctionToolCallParam(
**id=call["id"],**
type="function",
function={"name": call["name"], "arguments": call["arguments"]},
)
for call in msg["tool_calls"]
]
messages.append(
ChatCompletionAssistantMessageParam(role="assistant", content=None, tool_calls=tool_calls)
)
else:
# Normal user/system/tool content
message = cast(
ChatCompletionMessageParam,
TypeAdapter(ChatCompletionMessageParam).validate_python(
dict(role=role, content=msg.get("content", ""), tool_call_id=msg.get("tool_call_id", None))
),
)
messages.append(message)
# Extract completions (assistant outputs after tool responses)
for comp in pc_entry["completion"]:
if comp.get("role") == "assistant":
content = comp.get("content")
if pc_entry["tools"]:
tool_calls = [
ChatCompletionMessageFunctionToolCallParam(
id=tool["call"]["id"],
type=tool["call"]["type"],
function={"name": tool["name"], "arguments": tool["parameters"]},
)
for tool in pc_entry["tools"]
]
messages.append(
ChatCompletionAssistantMessageParam(role="assistant", content=content, tool_calls=tool_calls)
)
else:
messages.append(ChatCompletionAssistantMessageParam(role="assistant", content=content))
# Build tools definitions (if available)
if "functions" in pc_entry["request"]:
tools = [
ChatCompletionFunctionToolParam(
type="function",
function={
"name": fn["name"],
"description": fn.get("description", ""),
"parameters": (
json.loads(fn["parameters"]) if isinstance(fn["parameters"], str) else fn["parameters"]
),
},
)
for fn in pc_entry["request"]["functions"]
]
yield OpenAIMessages(messages=messages, tools=tools)
else:
yield OpenAIMessages(messages=messages, tools=None)