Commit 7ed007c

first commit

0 parents · commit 7ed007c

File tree: .env, .gitignore, README.md, assistant.py, requirements.txt, sample.py

6 files changed: +341 −0 lines changed

.env

Lines changed: 5 additions & 0 deletions
```
LIVEKIT_URL=wss://assistant-fo94nojo.livekit.cloud
LIVEKIT_API_KEY=APIKxvX5Ab5zkDn
LIVEKIT_API_SECRET=BE9uqAxeTEyKTCrX04lHre6Y9lFKCge3N1WvDGfabeSA
DEEPGRAM_API_KEY=f63176125b952fee10ecf06e0b2dc06b48672e6a
OPENAI_API_KEY=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
```

.gitignore

Lines changed: 1 addition & 0 deletions
```
.venv
```

README.md

Lines changed: 29 additions & 0 deletions
````markdown
# LiveKit Assistant

First, create a virtual environment, update pip, and install the required packages:

```
$ python3 -m venv .venv
$ source .venv/bin/activate
$ pip install -U pip
$ pip install -r requirements.txt
```

You need to set up the following environment variables:

```
LIVEKIT_URL=...
LIVEKIT_API_KEY=...
LIVEKIT_API_SECRET=...
DEEPGRAM_API_KEY=...
OPENAI_API_KEY=...
```

Then, run the assistant:

```
$ python3 assistant.py download-files
$ python3 assistant.py start
```

Finally, you can load the [hosted playground](https://agents-playground.livekit.io/).
````
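The README assumes the five variables are exported in the shell before starting the agent. Since requirements.txt also includes python-dotenv, a minimal sketch of loading them from the committed .env file instead is shown below; the placement (top of assistant.py, before the CLI starts) and the sanity check are assumptions, not part of this commit.

```python
# Sketch (assumption): load KEY=VALUE pairs from .env into os.environ
# with python-dotenv, instead of exporting them manually in the shell.
import os

from dotenv import load_dotenv

load_dotenv()

# Fail fast if any of the keys the agent relies on is missing.
for key in (
    "LIVEKIT_URL",
    "LIVEKIT_API_KEY",
    "LIVEKIT_API_SECRET",
    "DEEPGRAM_API_KEY",
    "OPENAI_API_KEY",
):
    if not os.environ.get(key):
        raise RuntimeError(f"Missing environment variable: {key}")
```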

assistant.py

Lines changed: 141 additions & 0 deletions
```python
import asyncio
from typing import Annotated

from livekit import agents, rtc
from livekit.agents import JobContext, JobRequest, WorkerOptions, cli, tokenize, tts
from livekit.agents.llm import (
    ChatContext,
    ChatImage,
    ChatMessage,
    ChatRole,
)
from livekit.agents.voice_assistant import AssistantContext, VoiceAssistant
from livekit.plugins import deepgram, openai, silero


class AssistantFunction(agents.llm.FunctionContext):
    """This class is used to define functions that will be called by the assistant."""

    @agents.llm.ai_callable(
        desc=(
            "Called when asked to evaluate something that would require vision capabilities, "
            "for example, an image, video, or the webcam feed."
        )
    )
    async def image(
        self,
        user_msg: Annotated[
            str,
            agents.llm.TypeInfo(desc="The user message that triggered this function"),
        ],
    ):
        print(f"Message triggering vision capabilities: {user_msg}")
        context = AssistantContext.get_current()
        context.store_metadata("user_msg", user_msg)


async def get_video_track(room: rtc.Room):
    """Get the first video track from the room. We'll use this track to process images."""

    video_track = asyncio.Future[rtc.RemoteVideoTrack]()

    for _, participant in room.participants.items():
        for _, track_publication in participant.tracks.items():
            if track_publication.track is not None and isinstance(
                track_publication.track, rtc.RemoteVideoTrack
            ):
                video_track.set_result(track_publication.track)
                print(f"Using video track {track_publication.track.sid}")
                break

    return await video_track


async def entrypoint(ctx: JobContext):
    print(f"Room name: {ctx.room.name}")

    chat_context = ChatContext(
        messages=[
            ChatMessage(
                role=ChatRole.SYSTEM,
                text=(
                    "Your name is Alloy. You are a funny, witty bot. Your interface with users will be voice and vision. "
                    "Respond with short and concise answers. Avoid using unpronounceable punctuation or emojis."
                ),
            )
        ]
    )

    gpt = openai.LLM(model="gpt-4o")

    # Since OpenAI does not support streaming TTS, we'll use it with a StreamAdapter
    # to make it compatible with the VoiceAssistant.
    openai_tts = tts.StreamAdapter(
        tts=openai.TTS(voice="alloy"),
        sentence_tokenizer=tokenize.basic.SentenceTokenizer(),
    )

    latest_image: rtc.VideoFrame | None = None

    assistant = VoiceAssistant(
        vad=silero.VAD(),  # We'll use Silero's Voice Activity Detector (VAD)
        stt=deepgram.STT(),  # We'll use Deepgram's Speech To Text (STT)
        llm=gpt,
        tts=openai_tts,  # We'll use OpenAI's Text To Speech (TTS)
        fnc_ctx=AssistantFunction(),
        chat_ctx=chat_context,
    )

    chat = rtc.ChatManager(ctx.room)

    async def _answer(text: str, use_image: bool = False):
        """
        Answer the user's message with the given text and optionally the latest
        image captured from the video track.
        """
        args = {}
        if use_image and latest_image:
            args["images"] = [ChatImage(image=latest_image)]

        chat_context.messages.append(ChatMessage(role=ChatRole.USER, text=text, **args))

        stream = await gpt.chat(chat_context)
        await assistant.say(stream, allow_interruptions=True)

    @chat.on("message_received")
    def on_message_received(msg: rtc.ChatMessage):
        """This event triggers whenever we get a new message from the user."""

        if msg.message:
            asyncio.create_task(_answer(msg.message, use_image=False))

    @assistant.on("function_calls_finished")
    def on_function_calls_finished(ctx: AssistantContext):
        """This event triggers when an assistant's function call completes."""

        user_msg = ctx.get_metadata("user_msg")
        if user_msg:
            asyncio.create_task(_answer(user_msg, use_image=True))

    assistant.start(ctx.room)

    await asyncio.sleep(1)
    await assistant.say("Hi there! How can I help?", allow_interruptions=True)

    while ctx.room.connection_state == rtc.ConnectionState.CONN_CONNECTED:
        video_track = await get_video_track(ctx.room)

        async for event in rtc.VideoStream(video_track):
            # We'll continually grab the latest image from the video track
            # and store it in a variable.
            latest_image = event.frame


async def request_fnc(req: JobRequest) -> None:
    await req.accept(entrypoint)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(request_fnc))
```

requirements.txt

Lines changed: 6 additions & 0 deletions
```
python-dotenv
livekit-agents
livekit-plugins-deepgram
livekit-plugins-openai
livekit-plugins-elevenlabs
livekit-plugins-silero
```

sample.py

Lines changed: 159 additions & 0 deletions
```python
import asyncio
import copy
import logging
from collections import deque
from typing import Annotated, List

from livekit import agents, rtc
from livekit.agents import JobContext, JobRequest, WorkerOptions, cli, tokenize, tts
from livekit.agents.llm import (
    ChatContext,
    ChatMessage,
    ChatRole,
)
from livekit.agents.voice_assistant import AssistantContext, VoiceAssistant
from livekit.plugins import deepgram, openai, silero

MAX_IMAGES = 3
NO_IMAGE_MESSAGE_GENERIC = (
    "I'm sorry, I don't have an image to process. Are you publishing your video?"
)


class AssistantFnc(agents.llm.FunctionContext):
    @agents.llm.ai_callable(
        desc="Called when asked to evaluate something that would require vision capabilities."
    )
    async def image(
        self,
        user_msg: Annotated[
            str,
            agents.llm.TypeInfo(desc="The user message that triggered this function"),
        ],
    ):
        ctx = AssistantContext.get_current()
        ctx.store_metadata("user_msg", user_msg)


async def get_human_video_track(room: rtc.Room):
    track_future = asyncio.Future[rtc.RemoteVideoTrack]()

    def on_sub(track: rtc.Track, *_):
        if isinstance(track, rtc.RemoteVideoTrack):
            track_future.set_result(track)

    room.on("track_subscribed", on_sub)

    remote_video_tracks: List[rtc.RemoteVideoTrack] = []
    for _, p in room.participants.items():
        for _, t_pub in p.tracks.items():
            if t_pub.track is not None and isinstance(
                t_pub.track, rtc.RemoteVideoTrack
            ):
                remote_video_tracks.append(t_pub.track)

    if len(remote_video_tracks) > 0:
        track_future.set_result(remote_video_tracks[0])

    video_track = await track_future
    room.off("track_subscribed", on_sub)
    return video_track


async def entrypoint(ctx: JobContext):
    sip = ctx.room.name.startswith("sip")
    initial_ctx = ChatContext(
        messages=[
            ChatMessage(
                role=ChatRole.SYSTEM,
                text=(
                    "You are a funny bot created by LiveKit. Your interface with users will be voice. "
                    "You should use short and concise responses, and avoid usage of unpronounceable punctuation."
                ),
            )
        ]
    )

    gpt = openai.LLM(
        model="gpt-4o",
    )

    # Since OpenAI does not support streaming TTS, we'll use it with a StreamAdapter
    # to make it compatible with the VoiceAssistant.
    openai_tts = tts.StreamAdapter(
        tts=openai.TTS(voice="alloy"),
        sentence_tokenizer=tokenize.basic.SentenceTokenizer(),
    )

    latest_image: rtc.VideoFrame | None = None
    img_msg_queue: deque[agents.llm.ChatMessage] = deque()
    assistant = VoiceAssistant(
        vad=silero.VAD(),
        stt=deepgram.STT(),
        llm=gpt,
        tts=openai_tts,
        fnc_ctx=None if sip else AssistantFnc(),
        chat_ctx=initial_ctx,
    )

    chat = rtc.ChatManager(ctx.room)

    async def _answer_from_text(text: str):
        chat_ctx = copy.deepcopy(assistant.chat_context)
        chat_ctx.messages.append(ChatMessage(role=ChatRole.USER, text=text))

        stream = await gpt.chat(chat_ctx)
        await assistant.say(stream)

    @chat.on("message_received")
    def on_chat_received(msg: rtc.ChatMessage):
        if not msg.message:
            return

        asyncio.create_task(_answer_from_text(msg.message))

    async def respond_to_image(user_msg: str):
        nonlocal latest_image, img_msg_queue, initial_ctx
        if not latest_image:
            await assistant.say(NO_IMAGE_MESSAGE_GENERIC)
            return

        initial_ctx.messages.append(
            agents.llm.ChatMessage(
                role=agents.llm.ChatRole.USER,
                text=user_msg,
                images=[agents.llm.ChatImage(image=latest_image)],
            )
        )
        img_msg_queue.append(initial_ctx.messages[-1])
        if len(img_msg_queue) >= MAX_IMAGES:
            # Keep at most MAX_IMAGES images in the context by dropping the oldest one.
            msg = img_msg_queue.popleft()
            msg.images = []

        stream = await gpt.chat(initial_ctx)
        await assistant.say(stream, allow_interruptions=True)

    @assistant.on("function_calls_finished")
    def _function_calls_done(ctx: AssistantContext):
        user_msg = ctx.get_metadata("user_msg")
        if not user_msg:
            return
        asyncio.ensure_future(respond_to_image(user_msg))

    assistant.start(ctx.room)

    await asyncio.sleep(0.5)
    await assistant.say("Hey, how can I help you today?", allow_interruptions=True)
    while ctx.room.connection_state == rtc.ConnectionState.CONN_CONNECTED:
        video_track = await get_human_video_track(ctx.room)
        async for event in rtc.VideoStream(video_track):
            latest_image = event.frame


async def request_fnc(req: JobRequest) -> None:
    logging.info("received request %s", req)
    await req.accept(entrypoint)


if __name__ == "__main__":
    cli.run_app(WorkerOptions(request_fnc))
```
