18
18
# Runtime settings are read from the process environment.
# OPENAI_API_KEY is None when the variable is unset; PORT falls back to 5050.
OPENAI_API_KEY = os.environ.get('OPENAI_API_KEY')
PORT = int(os.environ.get('PORT', 5050))
20
20
# Persona prompt text for the assistant.
# The fragments below are joined by implicit string-literal concatenation, so
# every fragment except the last must end with a space; otherwise two words
# fuse together (the previous revision produced "chatanything").
SYSTEM_MESSAGE = (
    "You are a helpful and bubbly AI assistant who loves to chat about "
    "anything the user is interested in and is prepared to offer them facts. "
    "Always stay positive.."
)
29
29
'input_audio_buffer.speech_stopped' , 'input_audio_buffer.speech_started' ,
30
30
'session.created'
31
31
]
32
# Boolean switch, off by default (no reader of this flag is visible in this excerpt).
SHOW_TIMING_MATH = False

# In-memory storage for set_memory tool
memory_storage = dict()
@@ -65,6 +66,7 @@ def __init__(
65
66
self .temperature = temperature
66
67
self .base_url = "wss://api.openai.com/v1/realtime"
67
68
self .turn_detection_mode = turn_detection_mode
69
+ # Track current response state
68
70
self ._current_response_id = None
69
71
self ._current_item_id = None
70
72
self ._is_responding = False
@@ -79,6 +81,24 @@ async def connect(self) -> None:
79
81
self .ws = await websockets .connect (url , extra_headers = headers )
80
82
await self .initialize_session ()
81
83
84
+
85
async def send_initial_conversation_item(self, ws=None):
    """Send the scripted greeting prompt as a user message.

    Emits a single ``conversation.item.create`` event whose text asks the
    model to greet the caller.

    Args:
        ws: Optional socket-like object exposing an awaitable ``send``.
            Defaults to ``self.ws``. The parameter exists because
            ``initialize_session`` calls this method as
            ``self.send_initial_conversation_item(self.ws)``; with the former
            zero-argument signature that call raised ``TypeError``.

    NOTE(review): only the conversation item is sent here. Presumably a
    ``response.create`` event must follow before the model speaks the
    greeting -- confirm against the Realtime API docs and the caller.
    """
    target = self.ws if ws is None else ws
    initial_conversation_item = {
        "type": "conversation.item.create",
        "item": {
            "type": "message",
            "role": "user",
            "content": [
                {
                    "type": "input_text",
                    "text": "Greet the user with 'Hello there! I am an AI voice assistant powered by Twilio and the OpenAI Realtime API. How can I help you?'",
                }
            ],
        },
    }
    await target.send(json.dumps(initial_conversation_item))
101
+
82
102
async def initialize_session (self ) -> None :
83
103
"""Control initial session with OpenAI and register tools."""
84
104
tools = [
@@ -130,43 +150,23 @@ async def initialize_session(self) -> None:
130
150
print ('Sending session update:' , json .dumps (session_update ))
131
151
await self .ws .send (json .dumps (session_update ))
132
152
133
- async def get_weather (self , parameters : Dict [str , Any ]) -> Dict [str , Any ]:
134
- """Fetch weather information."""
135
- lat = parameters .get ("lat" )
136
- lng = parameters .get ("lng" )
137
- location = parameters .get ("location" , "the specified location" )
153
+ await self .send_initial_conversation_item (self .ws )
138
154
139
- if lat is None or lng is None :
140
- return {"error" : "Latitude and longitude are required." }
141
-
142
- try :
143
- url = f"https://api.open-meteo.com/v1/forecast?latitude={ lat } &longitude={ lng } ¤t_weather=true"
144
- async with aiohttp .ClientSession () as session :
145
- async with session .get (url ) as response :
146
- if response .status == 200 :
147
- data = await response .json ()
148
- weather = data .get ("current_weather" , {})
149
- return {
150
- "temperature" : weather .get ("temperature" ),
151
- "wind_speed" : weather .get ("windspeed" ),
152
- "units" : {
153
- "temperature" : "°C" ,
154
- "wind_speed" : "m/s"
155
- }
156
- }
157
- else :
158
- return {"error" : f"Failed to fetch weather (HTTP { response .status } )." }
159
- except Exception as e :
160
- return {"error" : str (e )}
161
-
162
- # i dont think this works 'type': 'function_call' 'name': 'get_weather'
163
- # add cursor documentation -> realtime docs
164
- async def handle_function_call (self , function_name : str , parameters : Dict [str , Any ]) -> Dict [str , Any ]:
165
- if function_name == "get_weather" :
166
- return await self .get_weather (parameters )
167
- else :
168
- print (f"Unknown function: { function_name } " )
169
- return {}
155
async def send_text(self, text: str) -> None:
    """Submit *text* as a user message and then request a response.

    Sends one ``conversation.item.create`` event over ``self.ws`` and
    afterwards awaits ``self.create_response()``.
    """
    text_part = {"type": "input_text", "text": text}
    user_item = {"type": "message", "role": "user", "content": [text_part]}
    payload = json.dumps({"type": "conversation.item.create", "item": user_item})
    await self.ws.send(payload)
    await self.create_response()
170
170
171
171
async def handle_messages (self ) -> None :
172
172
try :
@@ -175,33 +175,108 @@ async def handle_messages(self) -> None:
175
175
event_type = event .get ("type" )
176
176
if event_type in LOG_EVENT_TYPES :
177
177
print (f"Received event: { event_type } " , event )
178
-
179
- if event_type == "response.done" :
180
- # Parse the output to find the function call
181
- output = event .get ("response" , {}).get ("output" , [])
182
- for item in output :
183
- if item .get ("type" ) == "function_call" :
184
- function_name = item .get ("name" )
185
- call_arguments = json .loads (item .get ("arguments" , "{}" ))
186
- result = await self .handle_function_call (function_name , call_arguments )
187
-
188
- # Send the function response back to the WebSocket
189
- response = {
190
- "type" : "function.response" ,
191
- "name" : function_name ,
192
- "result" : result
193
- }
194
- await self .ws .send (json .dumps (response ))
178
+ if event_type == "response.text.delta" and self .on_text_delta :
179
+ self .on_text_delta (event ["delta" ])
180
+ elif event_type == "response.audio.delta" and self .on_audio_delta :
181
+ audio_bytes = base64 .b64decode (event ["delta" ])
182
+ self .on_audio_delta (audio_bytes )
195
183
except websockets .exceptions .ConnectionClosed :
196
184
print ("Connection closed" )
197
185
except Exception as e :
198
- print (f"Error: { str (e )} " )
186
+ print (f"Error in message handling: { str (e )} " )
187
+
188
async def close(self) -> None:
    """Close the connection held in ``self.ws``; do nothing when it is falsy."""
    socket = self.ws
    if not socket:
        return
    await socket.close()
199
192
200
193
# ASGI application object that the route decorators below attach to.
app = FastAPI()

# Refuse to start without credentials: raising at import time surfaces the
# misconfiguration immediately.
if not OPENAI_API_KEY:
    raise ValueError('Missing the OpenAI API key. Please set it in the .env file.')
197
+
198
@app.get("/", response_class=JSONResponse)
async def index_page():
    """Root endpoint: return a fixed JSON status message."""
    status = {"message": "Twilio Media Stream Server is running!"}
    return status
201
+
202
@app.api_route("/incoming-call", methods=["GET", "POST"])
async def handle_incoming_call(request: Request):
    """Answer an incoming call with TwiML that connects it to the media stream.

    The stream URL is built from the hostname of the incoming request and
    points at this application's ``/media-stream`` WebSocket route.
    """
    stream_url = f'wss://{request.url.hostname}/media-stream'

    media_connect = Connect()
    media_connect.stream(url=stream_url)

    twiml = VoiceResponse()
    twiml.append(media_connect)

    # The TwiML document is served as XML.
    return HTMLResponse(content=str(twiml), media_type="application/xml")
211
+
202
212
@app.websocket("/media-stream")
async def handle_media_stream(websocket: WebSocket):
    """Handle WebSocket connections between Twilio and OpenAI.

    Accepts the incoming socket, opens a ``RealtimeClient`` connection, then
    runs two coroutines concurrently: one forwards Twilio ``media`` payloads to
    the client socket, the other relays ``response.audio.delta`` events back to
    Twilio as ``media`` messages.
    """
    print("Client connected")
    await websocket.accept()

    # Initialize variables
    stream_sid = None
    latest_media_timestamp = 0

    # Set up OpenAI Realtime client
    client = RealtimeClient(api_key=OPENAI_API_KEY)
    await client.connect()

    async def receive_from_twilio():
        """Receive audio data from Twilio and send it to the OpenAI Realtime API."""
        nonlocal stream_sid, latest_media_timestamp
        try:
            async for message in websocket.iter_text():
                data = json.loads(message)
                if data['event'] == 'media' and client.ws.open:
                    latest_media_timestamp = int(data['media']['timestamp'])
                    audio_append = {
                        "type": "input_audio_buffer.append",
                        "audio": data['media']['payload'],
                    }
                    await client.ws.send(json.dumps(audio_append))
                elif data['event'] == 'start':
                    stream_sid = data['start']['streamSid']
                    print(f"Incoming stream has started {stream_sid}")
        except WebSocketDisconnect:
            print("Client disconnected.")
        finally:
            # Starlette's iter_text() ends the loop without raising when the
            # peer disconnects, so closing only in the except branch left the
            # upstream socket open and send_to_twilio() waiting forever.
            # Closing here covers every exit path.
            await client.close()

    async def send_to_twilio():
        """Receive events from the OpenAI Realtime API and handle audio responses."""
        nonlocal stream_sid
        try:
            async for openai_message in client.ws:
                response = json.loads(openai_message)

                # Log received event
                if response.get("type"):
                    print(f"Received event: {response['type']}", response)

                # Process audio responses
                if response.get("type") == "response.audio.delta" and "delta" in response:
                    # Round-trip through base64 so a malformed payload raises
                    # here instead of being forwarded to Twilio.
                    audio_payload = base64.b64encode(
                        base64.b64decode(response["delta"])
                    ).decode("utf-8")
                    audio_delta = {
                        "event": "media",
                        "streamSid": stream_sid,
                        "media": {"payload": audio_payload},
                    }
                    # Send the audio payload back to Twilio
                    await websocket.send_json(audio_delta)

                # Handle other response types as needed
                if response.get("type") == "response.done":
                    print(f"Response completed: {response}")

        except Exception as e:
            # Boundary handler: report the failure and let this relay end.
            print(f"Error in send_to_twilio: {e}")

    await asyncio.gather(receive_from_twilio(), send_to_twilio())
279
+
280
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the file is run as a script.
    import uvicorn

    # Listen on all interfaces at the configured port.
    uvicorn.run(app, host="0.0.0.0", port=PORT)
0 commit comments