69 changes: 69 additions & 0 deletions genai/live/live_code_exec_with_txt.py
@@ -0,0 +1,69 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio


async def generate_content() -> list[str]:
# [START googlegenaisdk_live_code_exec_with_txt]
from google import genai
from google.genai.types import (
LiveConnectConfig,
Modality,
Tool,
ToolCodeExecution,
Content,
Part,
)

client = genai.Client()
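# Note: with no arguments, genai.Client() picks up GOOGLE_CLOUD_PROJECT,
# GOOGLE_CLOUD_LOCATION and GOOGLE_GENAI_USE_VERTEXAI from the environment
# when targeting Vertex AI.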
# model_id = "gemini-live-2.5-flash" #todo
GEMINI_MODEL_NAME = "gemini-2.0-flash-live-preview-04-09"
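# Enable the code-execution tool so the model can write and run Python
# server-side to answer computational questions.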
config = LiveConnectConfig(
response_modalities=[Modality.TEXT],
tools=[Tool(code_execution=ToolCodeExecution())],
)
async with client.aio.live.connect(model=model_id, config=config) as session:
Review comment (Member). Suggested change (model_id is undefined here; the constant defined above is GEMINI_MODEL_NAME):
- async with client.aio.live.connect(model=model_id, config=config) as session:
+ async with client.aio.live.connect(model=GEMINI_MODEL_NAME, config=config) as session:

text_input = "Compute the largest prime palindrome under 10"
print("> ", text_input, "\n")
await session.send_client_content(
turns=Content(role="user", parts=[Part(text=text_input)])
)

response = []

async for chunk in session.receive():
if chunk.server_content:
if chunk.text is not None:
response.append(chunk.text)

# Code the model writes and runs is streamed back as model_turn parts.
model_turn = chunk.server_content.model_turn
if model_turn:
for part in model_turn.parts:
if part.executable_code is not None:
print(part.executable_code.code)

if part.code_execution_result is not None:
print(part.code_execution_result.output)

print("".join(response))
# Example output:
# > Compute the largest prime palindrome under 10
# Final Answer: The final answer is $\boxed{7}$
# [END googlegenaisdk_live_code_exec_with_txt]
return response


if __name__ == "__main__":
asyncio.run(generate_content())
72 changes: 72 additions & 0 deletions genai/live/live_func_call_with_txt.py
@@ -0,0 +1,72 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio


async def generate_content() -> list:
# [START googlegenaisdk_live_func_call_with_txt]
from google import genai
from google.genai.types import (
LiveConnectConfig,
Modality,
Tool,
FunctionDeclaration,
FunctionResponse,
)

client = genai.Client()
# model = "gemini-live-2.5-flash"
model_id = "gemini-2.0-flash-live-preview-04-09"

turn_on_the_lights = FunctionDeclaration(name="turn_on_the_lights")
turn_off_the_lights = FunctionDeclaration(name="turn_off_the_lights")

config = LiveConnectConfig(
response_modalities=[Modality.TEXT],
tools=[Tool(function_declarations=[turn_on_the_lights, turn_off_the_lights])],
)
async with client.aio.live.connect(model=model_id, config=config) as session:
text_input = "Turn on the lights please"
print("> ", text_input, "\n")
await session.send_client_content(turns={"parts": [{"text": text_input}]})

# Collect tool responses here so the sample returns an empty list
# if no tool call arrives.
function_responses = []

async for chunk in session.receive():
if chunk.server_content:
if chunk.text is not None:
print(chunk.text)

elif chunk.tool_call:
for fc in chunk.tool_call.function_calls:
function_response = FunctionResponse(
name=fc.name,
response={
"result": "ok"
}, # simple, hard-coded function response
)
function_responses.append(function_response)
print(function_response.response["result"])

await session.send_tool_response(function_responses=function_responses)
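# In a real app you would dispatch on fc.name instead of hard-coding the
# result. A hypothetical sketch (handler names are illustrative only, not
# part of the API):
#   handlers = {
#       "turn_on_the_lights": lambda: "lights are on",
#       "turn_off_the_lights": lambda: "lights are off",
#   }
#   result = handlers[fc.name]()
#   function_response = FunctionResponse(name=fc.name, response={"result": result})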

# Example output:
# > Turn on the lights please
# ok
# [END googlegenaisdk_live_func_call_with_txt]
return function_responses


if __name__ == "__main__":
asyncio.run(generate_content())
70 changes: 70 additions & 0 deletions genai/live/live_ground_googsearch_with_txt.py
@@ -0,0 +1,70 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import asyncio


async def generate_content() -> list[str]:
# [START googlegenaisdk_live_ground_googsearch_with_txt]
from google import genai
from google.genai.types import (
LiveConnectConfig,
Modality,
Tool,
GoogleSearch,
Content,
Part,
)

client = genai.Client()
# model = "gemini-live-2.5-flash" #todo
model_id = "gemini-2.0-flash-live-preview-04-09"
config = LiveConnectConfig(
response_modalities=[Modality.TEXT],
tools=[Tool(google_search=GoogleSearch())],
)
async with client.aio.live.connect(model=model_id, config=config) as session:
text_input = "When did the last Brazil vs. Argentina soccer match happen?"
await session.send_client_content(
turns=Content(role="user", parts=[Part(text=text_input)])
)

response = []

async for chunk in session.receive():
if chunk.server_content:
if chunk.text is not None:
response.append(chunk.text)

# The model might generate and execute Python code to use Search
model_turn = chunk.server_content.model_turn
if model_turn:
for part in model_turn.parts:
if part.executable_code is not None:
print(part.executable_code.code)

if part.code_execution_result is not None:
print(part.code_execution_result.output)

print("".join(response))
# Example output:
# > When did the last Brazil vs. Argentina soccer match happen?
# The last Brazil vs. Argentina soccer match was on March 25, 2025, a 2026 World Cup qualifier, where Argentina defeated Brazil 4-1.
# [END googlegenaisdk_live_ground_googsearch_with_txt]
return response


if __name__ == "__main__":
asyncio.run(generate_content())
73 changes: 73 additions & 0 deletions genai/live/live_ground_ragengine_with_txt.py
@@ -0,0 +1,73 @@
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio


async def generate_content(memory_corpus: str) -> list[str]:
# [START googlegenaisdk_live_ground_ragengine_with_txt]
from google import genai
from google.genai.types import (
Content,
LiveConnectConfig,
Modality,
Part,
Tool,
Retrieval,
VertexRagStore,
VertexRagStoreRagResource,
)

client = genai.Client()
# model_id = "gemini-live-2.5-flash"
model_id = "gemini-2.0-flash-live-preview-04-09"

rag_store = VertexRagStore(
rag_resources=[
VertexRagStoreRagResource(
rag_corpus=memory_corpus  # Use a memory corpus if you want to store context.
)
],
# Set `store_context` to True to let the Live API sink context into your memory corpus.
store_context=True,
)
config = LiveConnectConfig(
response_modalities=[Modality.TEXT],
tools=[Tool(retrieval=Retrieval(vertex_rag_store=rag_store))],
)

async with client.aio.live.connect(model=model_id, config=config) as session:
text_input = "What year did Mariusz Pudzianowski win World's Strongest Man?"
print("> ", text_input, "\n")

await session.send_client_content(
turns=Content(role="user", parts=[Part(text=text_input)])
)

response = []

async for message in session.receive():
if message.text:
response.append(message.text)
continue

print("".join(response))
# Example output:
# > What year did Mariusz Pudzianowski win World's Strongest Man?
# Mariusz Pudzianowski won World's Strongest Man in 2002, 2003, 2005, 2007, and 2008.
# [END googlegenaisdk_live_ground_ragengine_with_txt]
return response


if __name__ == "__main__":
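# "memory_corpus" is a placeholder; a real corpus resource name is expected,
# typically of the form projects/{project}/locations/{location}/ragCorpora/{id}.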
asyncio.run(generate_content("memory_corpus"))
28 changes: 17 additions & 11 deletions genai/live/live_websocket_audiogen_with_txt.py
@@ -20,7 +20,9 @@ def get_bearer_token() -> str:
import google.auth
from google.auth.transport.requests import Request

- creds, _ = google.auth.default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
+ creds, _ = google.auth.default(
+ scopes=["https://www.googleapis.com/auth/cloud-platform"]
+ )
auth_req = Request()
creds.refresh(auth_req)
bearer_token = creds.token
@@ -55,9 +57,7 @@ async def generate_content() -> str:

# Websocket Configuration
WEBSOCKET_HOST = "us-central1-aiplatform.googleapis.com"
- WEBSOCKET_SERVICE_URL = (
- f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"
- )
+ WEBSOCKET_SERVICE_URL = f"wss://{WEBSOCKET_HOST}/ws/google.cloud.aiplatform.v1.LlmBidiService/BidiGenerateContent"

# Websocket Authentication
headers = {
@@ -66,9 +66,7 @@ async def generate_content() -> str:
}

# Model Configuration
- model_path = (
- f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}"
- )
+ model_path = f"projects/{PROJECT_ID}/locations/{LOCATION}/publishers/google/models/{GEMINI_MODEL_NAME}"
model_generation_config = {
"response_modalities": ["AUDIO"],
"speech_config": {
@@ -77,7 +75,9 @@ async def generate_content() -> str:
},
}

- async with connect(WEBSOCKET_SERVICE_URL, additional_headers=headers) as websocket_session:
+ async with connect(
+ WEBSOCKET_SERVICE_URL, additional_headers=headers
+ ) as websocket_session:
# 1. Send setup configuration
websocket_config = {
"setup": {
@@ -120,7 +120,9 @@ async def generate_content() -> str:
server_content = response_chunk.get("serverContent")
if not server_content:
# This might indicate an error or an unexpected message format
print(f"Received non-serverContent message or empty content: {response_chunk}")
print(
f"Received non-serverContent message or empty content: {response_chunk}"
)
break

# Collect audio chunks
@@ -129,15 +131,19 @@ async def generate_content() -> str:
for part in model_turn["parts"]:
if part["inlineData"]["mimeType"] == "audio/pcm":
audio_chunk = base64.b64decode(part["inlineData"]["data"])
- aggregated_response_parts.append(np.frombuffer(audio_chunk, dtype=np.int16))
+ aggregated_response_parts.append(
+ np.frombuffer(audio_chunk, dtype=np.int16)
+ )

# End of response
if server_content.get("turnComplete"):
break

# Save audio to a file
if aggregated_response_parts:
wavfile.write("output.wav", 24000, np.concatenate(aggregated_response_parts))
wavfile.write(
"output.wav", 24000, np.concatenate(aggregated_response_parts)
)
# Example response:
# Setup Response: {'setupComplete': {}}
# Input: Hello? Gemini are you there?