29
29
'input_audio_buffer.speech_stopped' , 'input_audio_buffer.speech_started' ,
30
30
'session.created'
31
31
]
32
- SHOW_TIMING_MATH = False
33
32
34
33
# In-memory storage for set_memory tool
35
34
memory_storage = {}
@@ -66,7 +65,6 @@ def __init__(
66
65
self .temperature = temperature
67
66
self .base_url = "wss://api.openai.com/v1/realtime"
68
67
self .turn_detection_mode = turn_detection_mode
69
- # Track current response state
70
68
self ._current_response_id = None
71
69
self ._current_item_id = None
72
70
self ._is_responding = False
@@ -132,21 +130,43 @@ async def initialize_session(self) -> None:
132
130
print ('Sending session update:' , json .dumps (session_update ))
133
131
await self .ws .send (json .dumps (session_update ))
134
132
135
- async def send_text (self , text : str ) -> None :
136
- """Send text message to the API."""
137
- event = {
138
- "type" : "conversation.item.create" ,
139
- "item" : {
140
- "type" : "message" ,
141
- "role" : "user" ,
142
- "content" : [{
143
- "type" : "input_text" ,
144
- "text" : text
145
- }]
146
- }
147
- }
148
- await self .ws .send (json .dumps (event ))
149
- await self .create_response ()
133
async def get_weather(self, parameters: Dict[str, Any]) -> Dict[str, Any]:
    """Fetch the current weather for a coordinate pair from Open-Meteo.

    Args:
        parameters: Tool-call arguments. ``lat`` and ``lng`` are required;
            any other keys are ignored.

    Returns:
        On success, a dict with ``temperature``, ``wind_speed`` and a
        ``units`` mapping describing both values. If a coordinate is
        missing, the HTTP status is not 200, or the request/decoding
        fails, ``{"error": <message>}`` is returned instead of raising.
    """
    lat = parameters.get("lat")
    lng = parameters.get("lng")

    # Compare against None (not truthiness) so that 0 is a valid coordinate.
    if lat is None or lng is None:
        return {"error": "Latitude and longitude are required."}

    url = "https://api.open-meteo.com/v1/forecast"
    # The coordinates come from model-generated arguments, so let aiohttp
    # build and escape the query string instead of interpolating them
    # into the URL by hand.
    query = {
        "latitude": str(lat),
        "longitude": str(lng),
        "current_weather": "true",
    }

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, params=query) as response:
                if response.status != 200:
                    return {"error": f"Failed to fetch weather (HTTP {response.status})."}

                data = await response.json()
                weather = data.get("current_weather", {})
                return {
                    "temperature": weather.get("temperature"),
                    "wind_speed": weather.get("windspeed"),
                    "units": {
                        "temperature": "°C",
                        # Open-Meteo reports wind speed in km/h unless a
                        # different unit is requested; none is requested here.
                        "wind_speed": "km/h",
                    },
                }
    except Exception as e:
        # Tool boundary: report the failure to the caller as data rather
        # than letting it propagate into the message loop.
        return {"error": str(e)}
161
+
162
# NOTE(review): the original author was not sure that dispatching on output
# items with 'type': 'function_call' and 'name': 'get_weather' works as
# intended. Verify against the Realtime API documentation (and add those
# docs to the editor's documentation index).
async def handle_function_call(self, function_name: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
    """Dispatch a requested tool call to its implementation.

    Args:
        function_name: Name of the function to run.
        parameters: Decoded arguments for that function.

    Returns:
        The result dict of the called function. For a name that is not
        recognised, ``{"error": "Unknown function: <name>"}`` so that the
        caller can tell a failed dispatch apart from an empty result.
    """
    if function_name == "get_weather":
        return await self.get_weather(parameters)

    message = f"Unknown function: {function_name}"
    print(message)
    return {"error": message}
150
170
151
171
async def handle_messages (self ) -> None :
152
172
try :
@@ -155,108 +175,33 @@ async def handle_messages(self) -> None:
155
175
event_type = event .get ("type" )
156
176
if event_type in LOG_EVENT_TYPES :
157
177
print (f"Received event: { event_type } " , event )
158
- if event_type == "response.text.delta" and self .on_text_delta :
159
- self .on_text_delta (event ["delta" ])
160
- elif event_type == "response.audio.delta" and self .on_audio_delta :
161
- audio_bytes = base64 .b64decode (event ["delta" ])
162
- self .on_audio_delta (audio_bytes )
178
+
179
+ if event_type == "response.done" :
180
+ # Parse the output to find the function call
181
+ output = event .get ("response" , {}).get ("output" , [])
182
+ for item in output :
183
+ if item .get ("type" ) == "function_call" :
184
+ function_name = item .get ("name" )
185
+ call_arguments = json .loads (item .get ("arguments" , "{}" ))
186
+ result = await self .handle_function_call (function_name , call_arguments )
187
+
188
+ # Send the function response back to the WebSocket
189
+ response = {
190
+ "type" : "function.response" ,
191
+ "name" : function_name ,
192
+ "result" : result
193
+ }
194
+ await self .ws .send (json .dumps (response ))
163
195
except websockets .exceptions .ConnectionClosed :
164
196
print ("Connection closed" )
165
197
except Exception as e :
166
- print (f"Error in message handling: { str (e )} " )
167
-
168
- async def close (self ) -> None :
169
- """Close the WebSocket connection."""
170
- if self .ws :
171
- await self .ws .close ()
198
+ print (f"Error: { str (e )} " )
172
199
173
200
app = FastAPI ()
174
201
175
- if not OPENAI_API_KEY :
176
- raise ValueError ('Missing the OpenAI API key. Please set it in the .env file.' )
177
-
178
- @app .get ("/" , response_class = JSONResponse )
179
- async def index_page ():
180
- return {"message" : "Twilio Media Stream Server is running!" }
181
-
182
- @app .api_route ("/incoming-call" , methods = ["GET" , "POST" ])
183
- async def handle_incoming_call (request : Request ):
184
- """Handle incoming call and return TwiML response to connect to Media Stream."""
185
- response = VoiceResponse ()
186
- host = request .url .hostname
187
- connect = Connect ()
188
- connect .stream (url = f'wss://{ host } /media-stream' )
189
- response .append (connect )
190
- return HTMLResponse (content = str (response ), media_type = "application/xml" )
191
-
192
202
@app .websocket ("/media-stream" )
193
203
async def handle_media_stream (websocket : WebSocket ):
194
- """Handle WebSocket connections between Twilio and OpenAI."""
195
- print ("Client connected" )
196
204
await websocket .accept ()
197
-
198
- # Initialize variables
199
- stream_sid = None
200
- latest_media_timestamp = 0
201
-
202
- # Set up OpenAI Realtime client
203
205
client = RealtimeClient (api_key = OPENAI_API_KEY )
204
206
await client .connect ()
205
-
206
- async def receive_from_twilio ():
207
- """Receive audio data from Twilio and send it to the OpenAI Realtime API."""
208
- nonlocal stream_sid , latest_media_timestamp
209
- try :
210
- async for message in websocket .iter_text ():
211
- data = json .loads (message )
212
- if data ['event' ] == 'media' and client .ws .open :
213
- latest_media_timestamp = int (data ['media' ]['timestamp' ])
214
- audio_append = {
215
- "type" : "input_audio_buffer.append" ,
216
- "audio" : data ['media' ]['payload' ]
217
- }
218
- await client .ws .send (json .dumps (audio_append ))
219
- elif data ['event' ] == 'start' :
220
- stream_sid = data ['start' ]['streamSid' ]
221
- print (f"Incoming stream has started { stream_sid } " )
222
- except WebSocketDisconnect :
223
- print ("Client disconnected." )
224
- await client .close ()
225
-
226
- async def send_to_twilio ():
227
- """Receive events from the OpenAI Realtime API and handle audio responses."""
228
- nonlocal stream_sid
229
- try :
230
- async for openai_message in client .ws :
231
- response = json .loads (openai_message )
232
-
233
- # Log received event
234
- if response .get ("type" ):
235
- print (f"Received event: { response ['type' ]} " , response )
236
-
237
- # Process audio responses
238
- if response .get ("type" ) == "response.audio.delta" and "delta" in response :
239
- # Decode and prepare the audio payload
240
- audio_payload = base64 .b64encode (
241
- base64 .b64decode (response ["delta" ])
242
- ).decode ("utf-8" )
243
- audio_delta = {
244
- "event" : "media" ,
245
- "streamSid" : stream_sid ,
246
- "media" : {"payload" : audio_payload },
247
- }
248
- # Send the audio payload back to Twilio
249
- await websocket .send_json (audio_delta )
250
-
251
- # Handle other response types as needed
252
- if response .get ("type" ) == "response.done" :
253
- print (f"Response completed: { response } " )
254
-
255
- except Exception as e :
256
- print (f"Error in send_to_twilio: { e } " )
257
-
258
- await asyncio .gather (receive_from_twilio (), send_to_twilio ())
259
-
260
- if __name__ == "__main__" :
261
- import uvicorn
262
- uvicorn .run (app , host = "0.0.0.0" , port = PORT )
207
+ await asyncio .gather (client .handle_messages ())
0 commit comments