|
# /// script |
|
# requires-python = ">=3.9" |
|
# dependencies = [ |
|
# "streamlit", |
|
# "google-genai", |
|
# "pandas" |
|
# ] |
|
# /// |
|
|
|
import streamlit as st |
|
import time |
|
import tempfile |
|
import os |
|
import pandas as pd |
|
from google import genai |
|
from google.genai import types |
|
|
|
# --- Configuration ---
# Model name passed to generate_content for chat queries.
MODEL_ID = "gemini-3-flash-preview"

st.set_page_config(page_title="Gemini File Search Manager", page_icon="ποΈ", layout="wide")

# --- Session State Initialization ---
# Seed each key only when it is absent, so values already held in the
# session are left untouched.
for _state_key, _initial_value in (
    ("messages", []),
    ("active_store_id", None),
    ("active_store_name", None),
):
    if _state_key not in st.session_state:
        st.session_state[_state_key] = _initial_value
|
|
|
# --- Helper Functions --- |
|
|
|
def get_client():
    """Build a GenAI client from the API key held in session state.

    Returns:
        A ``genai.Client`` created with the stored key, or ``None`` when the
        ``api_key`` entry is missing or empty.
    """
    key = st.session_state.get("api_key")
    return genai.Client(api_key=key) if key else None
|
|
|
def list_stores(client):
    """Return the existing File Search Stores as a list of plain dicts.

    Args:
        client: GenAI client exposing ``file_search_stores.list()``.

    Returns:
        One dict per store with the keys ``display_name``, ``name`` and
        ``create_time``. If listing raises, the error is shown with
        ``st.error`` and an empty list is returned.
    """
    try:
        # list() is iterated lazily; copy only the fields the UI displays.
        return [
            {
                "display_name": item.display_name,
                "name": item.name,  # resource name, used as the store ID
                "create_time": item.create_time,
            }
            for item in client.file_search_stores.list()
        ]
    except Exception as err:
        st.error(f"Failed to list stores: {err}")
        return []
|
|
|
def create_store(client, display_name):
    """Create a new, empty File Search Store.

    Args:
        client: GenAI client exposing ``file_search_stores.create()``.
        display_name: Human-readable name given to the new store.

    Returns:
        Whatever the SDK's create call returns, or ``None`` if it raised
        (the error is shown with ``st.error``).
    """
    store_config = {"display_name": display_name}
    try:
        return client.file_search_stores.create(config=store_config)
    except Exception as err:
        st.error(f"Failed to create store: {err}")
        return None
|
|
|
def delete_store(client, store_name):
    """Delete a File Search Store, passing ``force=True`` in the config.

    Args:
        client: GenAI client exposing ``file_search_stores.delete()``.
        store_name: Resource name of the store to delete.

    Returns:
        ``True`` when the delete call completes, ``False`` if it raised
        (the error is shown with ``st.error``).
    """
    try:
        client.file_search_stores.delete(name=store_name, config={"force": True})
    except Exception as err:
        st.error(f"Failed to delete store: {err}")
        return False
    else:
        return True
|
|
|
def upload_files_to_store(client, store_name, uploaded_files):
    """Upload files into a File Search Store and wait for each to be indexed.

    Each file is written to a temporary path (the SDK call is given a path),
    uploaded with an explicit chunking config, and its operation is polled
    once per second until it reports done. The temporary file is removed
    afterwards whether or not the upload succeeded.

    A file counts as failed when any step raises or when the finished
    operation carries an ``error``. Failures are shown with ``st.error`` and
    do not stop the remaining files. The final status message reports
    success only when every file succeeded.

    Args:
        client: GenAI client used for the upload and for polling operations.
        store_name: Resource name of the destination File Search Store.
        uploaded_files: Sequence of uploaded-file objects exposing ``name``
            and ``getvalue()``.

    Returns:
        None. Progress and results are reported through Streamlit widgets.
    """
    status_msg = st.empty()
    progress_bar = st.progress(0)
    total = len(uploaded_files)
    failed = 0

    for idx, up_file in enumerate(uploaded_files):
        # Use only the base name so separators in the upload name cannot
        # end up inside the temp-file suffix.
        safe_name = os.path.basename(up_file.name)
        tmp_path = None
        failure = None

        try:
            status_msg.info(f"Uploading and indexing: {up_file.name}...")

            # delete=False: the file must outlive this `with` so the SDK can
            # read it by path; it is removed in `finally` below.
            with tempfile.NamedTemporaryFile(delete=False, suffix=f"_{safe_name}") as tmp:
                tmp_path = tmp.name
                tmp.write(up_file.getvalue())

            # Combined upload + import into the store.
            operation = client.file_search_stores.upload_to_file_search_store(
                file=tmp_path,
                file_search_store_name=store_name,
                config={
                    'display_name': up_file.name,
                    # Optional: explicit chunking config
                    'chunking_config': {
                        'white_space_config': {
                            'max_tokens_per_chunk': 500,
                            'max_overlap_tokens': 50,
                        }
                    },
                },
            )

            # Poll for completion.
            while not operation.done:
                time.sleep(1)
                operation = client.operations.get(operation)

            # "done" does not imply success: a finished operation may carry
            # an error payload instead of a result.
            failure = getattr(operation, "error", None)
        except Exception as e:
            failure = e
        finally:
            if tmp_path is not None:
                os.remove(tmp_path)

        if failure:
            failed += 1
            st.error(f"Error processing {up_file.name}: {failure}")
        progress_bar.progress((idx + 1) / total)

    if failed:
        status_msg.warning(
            f"Processed {total} file(s); {failed} failed. See the error messages for details."
        )
    else:
        status_msg.success("All files processed!")
    time.sleep(1)
    status_msg.empty()
    progress_bar.empty()
|
|
|
# --- Sidebar ---
# Collects the API key and shows which store, if any, is currently active.
with st.sidebar:
    st.title("βοΈ Setup")
    entered_key = st.text_input("Gemini API Key", type="password")
    st.session_state["api_key"] = entered_key

    st.divider()

    if not st.session_state.active_store_name:
        st.warning("No store selected. Go to 'Store Management' tab.")
    else:
        st.success(f"**Active Store:**\n{st.session_state.active_store_name}")
        if st.button("Unselect Store"):
            # Clear both fields together, then rerun so the UI reflects it.
            st.session_state.active_store_name = None
            st.session_state.active_store_id = None
            st.rerun()
|
|
|
# --- Main Layout ---
# Everything below uses the client, so stop the script run when there is none.
client = get_client()
if not client:
    st.info("π Please enter your Google Gemini API Key in the sidebar to continue.")
    st.stop()

tab_labels = ["π¬ Chat & Upload", "ποΈ Store Management"]
tab1, tab2 = st.tabs(tab_labels)
|
|
|
# ==========================================
# TAB 1: Chat and Upload
# ==========================================


def _render_citations(citations):
    """Show citation titles in an expander; render nothing when there are none."""
    if not citations:
        return
    with st.expander("π Citations"):
        for title in citations:
            st.markdown(f"- {title}")


def _extract_citations(response):
    """Return unique source titles from a response's grounding metadata.

    Only the first candidate is inspected. For each grounding chunk that has
    a ``retrieved_context``, the title is used, falling back to the URI and
    then to "Unknown Source". Duplicates are dropped while keeping
    first-seen order, so the list is stable between runs.
    """
    candidates = getattr(response, "candidates", None)
    if not candidates:
        return []
    metadata = getattr(candidates[0], "grounding_metadata", None)
    chunks = getattr(metadata, "grounding_chunks", None) if metadata else None

    titles = []
    for chunk in chunks or []:
        context = getattr(chunk, "retrieved_context", None)
        if context:
            titles.append(context.title or context.uri or "Unknown Source")
    # dict.fromkeys de-duplicates without the arbitrary ordering of a set.
    return list(dict.fromkeys(titles))


with tab1:
    col_upload, col_chat = st.columns([1, 2])

    # Left column: upload documents into the active store.
    with col_upload:
        st.subheader("π€ Add Documents")
        if not st.session_state.active_store_id:
            st.info("Please select or create a store in the 'Store Management' tab first.")
        else:
            uploaded_files = st.file_uploader(
                "Upload files to active store",
                accept_multiple_files=True,
            )
            if uploaded_files and st.button("Process Files", type="primary"):
                upload_files_to_store(client, st.session_state.active_store_id, uploaded_files)

    # Right column: chat interface.
    with col_chat:
        st.subheader("π¬ RAG Chat")

        # Replay the stored conversation.
        for msg in st.session_state.messages:
            with st.chat_message(msg["role"]):
                st.markdown(msg["content"])
                _render_citations(msg.get("citations"))

        prompt = st.chat_input("Ask about your documents...")
        if prompt:
            st.session_state.messages.append({"role": "user", "content": prompt})
            with st.chat_message("user"):
                st.markdown(prompt)

            with st.chat_message("assistant"):
                message_placeholder = st.empty()

                if not st.session_state.active_store_id:
                    response_text = "β οΈ Please select a File Search Store to query."
                    citations = []
                else:
                    try:
                        with st.spinner("Searching..."):
                            response = client.models.generate_content(
                                model=MODEL_ID,
                                contents=prompt,
                                config=types.GenerateContentConfig(
                                    tools=[
                                        types.Tool(
                                            file_search=types.FileSearch(
                                                file_search_store_names=[
                                                    st.session_state.active_store_id
                                                ]
                                            )
                                        )
                                    ]
                                ),
                            )
                        # response.text can be None when the reply has no text
                        # parts; show a message instead of the literal "None".
                        response_text = (
                            response.text or "The model returned no text for this query."
                        )
                        citations = _extract_citations(response)
                    except Exception as e:
                        response_text = f"Error: {str(e)}"
                        citations = []

                message_placeholder.markdown(response_text)
                _render_citations(citations)

            st.session_state.messages.append({
                "role": "assistant",
                "content": response_text,
                "citations": citations,
            })
|
|
|
# ==========================================
# TAB 2: Store Management
# ==========================================
with tab2:
    st.header("Manage File Search Stores")
    st.markdown("""
    File Search Stores persist your data. You can create different stores for different projects
    (e.g., "Finance Docs", "Technical Manuals").
    """)

    # 1. Create a new store from a small form inside a collapsed expander.
    with st.expander("β Create New Store"):
        with st.form("create_store_form"):
            new_store_name = st.text_input("Display Name (e.g., 'Project Alpha')")
            submitted = st.form_submit_button("Create")
            # Submitting with an empty name does nothing.
            if submitted and new_store_name:
                with st.spinner("Creating..."):
                    created = create_store(client, new_store_name)
                    if created:
                        st.success(f"Created store: {created.name}")
                        time.sleep(1)  # keep the confirmation visible briefly
                        st.rerun()

    # 2. List existing stores, one row each with Select / Delete actions.
    st.subheader("Existing Stores")
    stores = list_stores(client)

    if stores:
        df = pd.DataFrame(stores)

        for index, row in df.iterrows():
            store_id = row["name"]
            store_label = row["display_name"]
            info_col, created_col, select_col, delete_col = st.columns([3, 2, 1, 1])

            with info_col:
                st.markdown(f"**{store_label}**")
                st.caption(f"ID: `{store_id}`")

            with created_col:
                st.text(f"Created: {row['create_time']}")

            with select_col:
                if st.button("Select", key=f"sel_{index}"):
                    st.session_state.active_store_id = store_id
                    st.session_state.active_store_name = store_label
                    st.session_state.messages = []  # clear chat on switch
                    st.rerun()

            with delete_col:
                if st.button("ποΈ Delete", key=f"del_{index}", type="primary"):
                    with st.spinner("Deleting..."):
                        if delete_store(client, store_id):
                            # Drop the selection if the active store was deleted.
                            if st.session_state.active_store_id == store_id:
                                st.session_state.active_store_id = None
                                st.session_state.active_store_name = None
                            st.success("Deleted.")
                            time.sleep(1)
                            st.rerun()

            st.divider()
    else:
        st.info("No stores found.")