Skip to content

Instantly share code, notes, and snippets.

@lemassykoi
Last active July 13, 2024 19:49
Show Gist options
  • Save lemassykoi/afd5ac3329cc6e08fd9351a30616af9a to your computer and use it in GitHub Desktop.
Save lemassykoi/afd5ac3329cc6e08fd9351a30616af9a to your computer and use it in GitHub Desktop.
Ollama with custom Tools and Domoticz Interaction, on Telegram
import logging
import time
import asyncio
import aiohttp
import requests
import json
from langchain_experimental.llms.ollama_functions import OllamaFunctions
from langchain_core.messages import AIMessage, HumanMessage
from aiogram import Bot, Dispatcher, F
from aiogram.enums import ParseMode
from aiogram.filters.command import Command, CommandStart
from aiogram.types import Message, CallbackQuery, BotCommand, InlineKeyboardButton
from aiogram.utils.keyboard import InlineKeyboardBuilder
from asyncio import Lock
from colorama import Back, Style, Fore
from langchain_core.chat_history import BaseChatMessageHistory
from langchain_community.chat_message_histories import ChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain_community.document_loaders import TextLoader
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_chroma import Chroma
from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.agent_toolkits.load_tools import load_tools
from langchain.agents.agent_toolkits import create_retriever_tool
from langchain.memory import ConversationBufferMemory
from langchain.agents import AgentExecutor, create_structured_chat_agent
from typing import TypedDict, Annotated, Sequence
from langchain_core.messages import BaseMessage
from langgraph.graph import add_messages
from langchain.tools.render import render_text_description
from langchain_core.tools import tool
from langchain import hub
import whisper
from datetime import datetime
from functools import wraps
from pydub import AudioSegment
from deep_translator import single_detection, GoogleTranslator
import PyPDF2
import unicodedata
import re
import os
# Process-wide settings handed to third-party libraries through the environment.
# NOTE(review): the Serper API key is hard-coded (placeholder value here);
# presumably it is read by the Serper search tool loaded elsewhere - confirm.
os.environ["SERPER_API_KEY"] = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
# NOTE(review): presumably switches off Chroma's anonymized telemetry - confirm
# which library actually reads this variable.
os.environ["ANONYMIZED_TELEMETRY"] = 'False'
class AgentState(TypedDict):
    """Typed dictionary describing the agent state: a single ``messages`` entry."""

    # Sequence of messages, annotated with langgraph's ``add_messages``.
    messages: Annotated[Sequence[BaseMessage], add_messages]
class contextLock:
    """Async context manager wrapping one asyncio ``Lock``.

    The lock is a class attribute, so it is created once when the class is
    defined and every instance acquires/releases that same object.
    """

    lock = Lock()

    async def __aenter__(self):
        """Wait for the shared lock; nothing is bound by ``async with ... as``."""
        guard = self.lock
        await guard.acquire()

    async def __aexit__(self, exc_type, exc_value, exc_traceback):
        """Release the shared lock; a pending exception is not suppressed."""
        guard = self.lock
        guard.release()
# ---------------------------------------------------------------------------
# Module configuration: model selection, Domoticz endpoints, API keys,
# LLM / embedding clients, bot objects and logging.
# NOTE(review): credentials and API keys are hard-coded below (placeholders in
# this copy); consider loading them from the environment.
# ---------------------------------------------------------------------------

# Initialize Ollama with the desired model
current_model_name = "gemma2" # num_ctx = 4096 for Gemma2
#current_model_name = "llama3-gradient:8b-instruct-1048k-q8_0" # num_ctx = 1024*14 // maximum num_ctx = 256000
#current_model_name = "mistral:7b-instruct-v0.3-q8_0" # num_ctx = 1024*8 // tools ok
#current_model_name = "llama3:8b-instruct-q8_0" # tool call ok // num_ctx = 1024*8

# Domoticz connection settings. ``idx`` is empty when the module-level switch
# URLs below are formatted, so those three URLs carry no device index.
idx = ''
domoticz_user = 'user'
domoticz_pass = 'xxxxxxxxxxxxxxx!'
domoticz_ip = '192.168.10.120'
domoticz_port = '80'
domoticz_plan_name = 'LLM' # In Domoticz, create a Room Plan and assign devices to it
url_get_devices = f'http://{domoticz_user}:{domoticz_pass}@{domoticz_ip}:{domoticz_port}/json.htm?type=command&param=getdevices'
url_get_lights = f'http://{domoticz_user}:{domoticz_pass}@{domoticz_ip}:{domoticz_port}/json.htm?type=command&param=getdevices&filter=lights&used=true&order=Name'
url_get_status = f'http://{domoticz_user}:{domoticz_pass}@{domoticz_ip}:{domoticz_port}/json.htm?type=command&param=getversion'
url_switch_on = f'http://{domoticz_user}:{domoticz_pass}@{domoticz_ip}:{domoticz_port}/json.htm?type=command&param=switchlight&idx={idx}&switchcmd=On'
url_switch_off = f'http://{domoticz_user}:{domoticz_pass}@{domoticz_ip}:{domoticz_port}/json.htm?type=command&param=switchlight&idx={idx}&switchcmd=Off'
url_switch_toggle = f'http://{domoticz_user}:{domoticz_pass}@{domoticz_ip}:{domoticz_port}/json.htm?type=command&param=switchlight&idx={idx}&switchcmd=Toggle'
url_get_plans = f'http://{domoticz_user}:{domoticz_pass}@{domoticz_ip}:{domoticz_port}/json.htm?type=command&param=getplans&order=name&used=true'

# Third-party API keys and the Telegram bot token.
LANGDETECT_API_KEY = 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
SERPER_API_KEY = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
GOOGLE_API_KEY = 'xxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxxxxxxx'
GOOGLE_CSE_ID = 'xxxxxxxxxxxxxxxxx'
TAVILY_API_KEY = 'tvly-xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'
token = "1234567890:xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

# Ollama endpoint and embedding model.
ollama_base_url = 'http://127.0.0.1:11434'
current_embed_model = "snowflake-arctic-embed:latest" # 1024
#current_embed_model = "nomic-embed-text:latest" # 8192
#current_embed_model = "mxbai-embed-large:latest" # 512
embed_ctx = 1024
file_path = 'vault.txt' # Initial file to embed
download_folder = 'downloads'

# LLM and embedding clients, both pointed at the local Ollama server.
llm_func = OllamaFunctions(model=current_model_name, base_url=ollama_base_url, format="json", num_ctx=1024*8, temperature=0.6, verbose=True, debug=True)
embedding_function = OllamaEmbeddings(base_url=ollama_base_url, model=current_embed_model, num_ctx=embed_ctx, show_progress=True,)
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s \t (%(filename)s:%(lineno)d)"

# In-memory state shared by the handlers.
store = {}  # session id -> chat history (filled by get_session_history)
ACTIVE_CHATS = {}
ACTIVE_CHATS_LOCK = contextLock()
my_filename = "temp.txt" # Temp File used to embed files sent by telegram
current_chat_language = 'EN'
chat_languages = ['EN', 'FR']
chat_history_EN = []
chat_history_FR = []
mention = None  # "@<bot username>", filled lazily by get_bot_info()
CHAT_TYPE_GROUP = "group"
CHAT_TYPE_SUPERGROUP = "supergroup"

# Telegram user ids, given as comma-separated strings.
allowed_ids = list(map(int, "xxxxxxxxx,xxxxxxxxx,xxxxxxxxx".split(",")))
# Fixed: the original omitted ``.split(",")`` and therefore converted every
# single character of the string, turning one id into a list of its digits.
admin_ids = list(map(int, "xxxxxxxxx".split(",")))

bot = Bot(token)
dp = Dispatcher(name='TG_dispatcher')
commands = [
    BotCommand(command="start", description="Start"),
    BotCommand(command="help", description="Get Help about Bot Usage"),
    BotCommand(command="llm", description="Change the current LLM used"),
    BotCommand(command="lang", description="Change the tchat language"),
    BotCommand(command="reset", description="Reset the current chat"),
]

logging.basicConfig(
    format=log_format,
    level='INFO'
)
# Only errors are reported for the web-research retriever logger.
logging.getLogger("langchain_community.retrievers.web_research").setLevel(logging.ERROR)
logger = logging.getLogger(__name__)
def perms_allowed(func):
    """Decorator: run *func* only for users in ``admin_ids`` or ``allowed_ids``.

    The wrapped coroutine accepts either a ``message`` or a callback ``query``.
    Authorised callers are forwarded to *func* (``func(message)`` or
    ``func(query=query)``). Everybody else gets an "Access Denied" answer,
    except in group / supergroup chats where the event is dropped silently.
    """
    group_chat_types = ["supergroup", "group"]

    @wraps(func)
    async def wrapper(message: Message = None, query: CallbackQuery = None):
        user_id = message.from_user.id if message else query.from_user.id
        if user_id in admin_ids or user_id in allowed_ids:
            if message:
                return await func(message)
            if query:
                return await func(query=query)
            return None

        if message:
            if message.chat.type in group_chat_types:
                return None
            await message.answer("Access Denied")
        elif query:
            # Fixed: the original tested ``message`` here, which is always
            # falsy in this branch, so the group check could never trigger.
            origin = query.message
            if origin and origin.chat.type in group_chat_types:
                return None
            await query.answer("Access Denied")
        return None

    return wrapper
def perms_admins(func):
    """Decorator: run *func* only for users listed in ``admin_ids``.

    The wrapped coroutine accepts either a ``message`` or a callback ``query``.
    Admins are forwarded to *func* (``func(message)`` or ``func(query=query)``).
    Other users get an "Access Denied" answer and the attempt is logged,
    except in group / supergroup chats where the event is dropped silently.
    """
    group_chat_types = ["supergroup", "group"]

    @wraps(func)
    async def wrapper(message: Message = None, query: CallbackQuery = None):
        user_id = message.from_user.id if message else query.from_user.id
        if user_id in admin_ids:
            if message:
                return await func(message)
            if query:
                return await func(query=query)
            return None

        if message:
            if message.chat.type in group_chat_types:
                return None
            await message.answer("Access Denied")
            logger.info(f"[MSG] {message.from_user.full_name} ({message.from_user.id}) is not allowed to use this bot.")
        elif query:
            # Fixed: the original tested ``message`` here (always falsy in
            # this branch), so the group check could never trigger.
            origin = query.message
            if origin and origin.chat.type in group_chat_types:
                return None
            await query.answer("Access Denied")
            # Fixed: the original read ``message.from_user`` although
            # ``message`` is None here, raising AttributeError.
            logger.info(f"[QUERY] {query.from_user.full_name} ({query.from_user.id}) is not allowed to use this bot.")
        return None

    return wrapper
def translate_to_language(text: str, lang: str) -> str:
    """Translate *text* into the language identified by *lang*.

    The source language is auto-detected by ``GoogleTranslator``; *lang* is
    lower-cased before being used as the target code.
    """
    target_code = lang.lower()
    translator = GoogleTranslator(source='auto', target=target_code)
    return translator.translate(text=text)
def determine_language(string: str) -> str:
    """Detect the language of *string* and return its code in upper case.

    Detection is delegated to ``single_detection`` using ``LANGDETECT_API_KEY``;
    the detected code is also written to the log.
    """
    detected = single_detection(string, api_key=LANGDETECT_API_KEY)
    lang = detected.upper()
    logger.info(f'Language detected : {lang}')
    return lang
async def get_bot_info():
    """Return the bot's ``@username`` mention, fetching it on first use.

    The value is cached in the module-level ``mention`` so ``bot.get_me()`` is
    only awaited while ``mention`` is still ``None``.
    """
    global mention
    if mention is not None:
        return mention
    me = await bot.get_me()
    mention = f"@{me.username}"
    return mention
async def model_list():
    """Fetch the models known to the Ollama server.

    Issues ``GET {ollama_base_url}/api/tags`` and returns the ``"models"``
    entry of the JSON reply, or an empty list when the status is not 200.
    """
    url = f"{ollama_base_url}/api/tags"
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            if response.status != 200:
                return []
            payload = await response.json()
            return payload["models"]
def is_mentioned_in_group_or_supergroup(message):
    """Tell whether a group/supergroup message starts with the bot mention.

    Either the text or the caption may carry the mention. Relies on the
    module-level ``mention`` already being set (see ``get_bot_info``).
    """
    if message.chat.type not in [CHAT_TYPE_GROUP, CHAT_TYPE_SUPERGROUP]:
        return False
    if message.text is not None and message.text.startswith(mention):
        return True
    return message.caption is not None and message.caption.startswith(mention)
@dp.message(CommandStart())
@perms_allowed
async def command_start_handler(message: Message) -> None:
    """Handle /start: greet the user and open the chat session.

    A greeting in the current chat language (French for 'FR', English
    otherwise) is sent with HTML parse mode. A short user introduction and the
    greeting are recorded in the session history, which creates the session.
    """
    import html  # local import: used only to escape the user-controlled name

    full_name = message.from_user.full_name
    logger.info(Back.RED + f'START Asked for {full_name}' + Style.RESET_ALL)
    session_id = f"{message.chat.id}"

    # Fixed: the display name is chosen by the user and was inserted verbatim
    # into an HTML-parsed message; characters such as "<" or "&" made the
    # Telegram API reject the reply.
    safe_name = html.escape(full_name, quote=False)

    if current_chat_language == 'FR':
        start_message = (
            f"Bonjour {safe_name}, bienvenue sur ce Tchat !\n"
            "Mon nom est Yabo.\n"
            "Je ferais mon maximum pour vous aider. Demandez-moi ce que vous voulez !\n\n"
            f"Langue Actuelle : <code>{current_chat_language}</code>\n"
            f"LLM Actuel : <code>{current_model_name}</code>\n"
            "Pour changer de modèle : /llm\n"
            "Pour changer de langue : /lang"
        )
        user_intro = f"Salut! Mon nom est {full_name}."
    else:
        start_message = (
            f"Hi {safe_name}, welcome to this Chat!\n"
            "My name is Yabo.\n"
            "I will do my best to help you. Ask me anything!\n\n"
            f"Current language : <code>{current_chat_language}</code>\n"
            f"Current LLM : <code>{current_model_name}</code>\n"
            "To change current model : /llm\n"
            "To change current language : /lang"
        )
        user_intro = f"Hi! My name is {full_name}."

    history = get_session_history(session_id)
    history.add_user_message(HumanMessage(content=user_intro))
    history.add_ai_message(AIMessage(start_message))

    await message.answer(
        text=start_message,
        parse_mode=ParseMode.HTML,
        disable_web_page_preview=True,
    )
@dp.message(Command("help"))
async def command_help_handler(message: Message) -> None:
    """Handle /help: send a short capability summary.

    The summary is French when the current chat language is 'FR', English
    otherwise. A matching question/answer pair is appended to the session
    history before the reply is sent.
    """
    logger.info(Back.RED + f'HELP Asked for {message.from_user.full_name}' + Style.RESET_ALL)
    help_message_FR = (
        "Je suis un Agent IA, disponible pour répondre à vos questions. "
        "J'ai accès à un moteur de recherche web, et je peux répondre en fonction du contexte trouvé. "
        "Je suis aussi capable de conserver l'historique de la conversation."
    )
    help_message_EN = (
        "I am an AI Agent, available to answer your questions. "
        "I have access to a web search engine, and I can respond based on the context found.\n "
        "I'm also able to keep an history for this chat."
    )
    session_id = f"{message.chat.id}"

    use_french = current_chat_language == 'FR'
    help_message = help_message_FR if use_french else help_message_EN
    user_question = "A l'aide! Que sais-tu faire?" if use_french else "Help! What can you do?"

    get_session_history(f"{session_id}").add_user_message(HumanMessage(content=user_question))
    get_session_history(f"{session_id}").add_ai_message(AIMessage(help_message))

    await message.answer(
        help_message,
        parse_mode=ParseMode.HTML,
        disable_web_page_preview=True,
    )
@dp.message(Command("reset"))
async def command_reset_handler(message: Message) -> None:
    """Handle /reset: replace the chat's history with an empty one.

    Sends a confirmation when a history existed for this chat, otherwise logs
    a warning and tells the user there was nothing to reset.
    """
    logger.info(Back.RED + f'RESET Asked for {message.from_user.full_name}' + Style.RESET_ALL)
    chat_id = message.chat.id
    # Fixed: ``store`` is keyed by the chat id rendered as a string (every
    # other handler builds ``f"{message.chat.id}"``); the original looked up
    # the raw integer, so an existing session was never found.
    session_id = f"{chat_id}"
    if session_id in store:
        store[session_id] = ChatMessageHistory()
        await bot.send_message(chat_id=chat_id, text=f'Chat history has been reset for user {session_id}')
    else:
        logger.warning(f'RESET : No chat history for user {session_id}')
        await bot.send_message(chat_id=chat_id, text=f'No Chat history for user {session_id}')
@dp.message(Command("llm"))
async def command_llm_handler(message: Message) -> None:
    """Handle /llm: show an inline keyboard listing the selectable models.

    Models whose name contains "embed" or "code" are skipped. Each remaining
    model gets one button whose callback data is ``model_<name>``; the label
    carries icons for the model families, or a sparkle when any family is
    unknown.
    """
    logger.info(Back.RED + f'LLM change Asked for {message.from_user.full_name}' + Style.RESET_ALL)
    models = await model_list()
    family_icons = {"llama": "🦙", "clip": "📷"}
    switchllm_builder = InlineKeyboardBuilder()

    # Fixed: the counter used to be reset to 0 on every iteration (so it never
    # exceeded 1) and was undefined when the model list was empty.
    available = 0
    for model in models:
        modelname = model["name"]
        # bypass ollama models which are not made for chat
        if any(marker in modelname for marker in ("embed", "code")):
            continue
        available += 1

        families = (model.get("details") or {}).get("families")
        modelfamilies = ""
        if families:
            try:
                modelfamilies = "".join(family_icons[family] for family in families)
            except KeyError:
                # At least one family has no icon: fall back to a generic one.
                modelfamilies = "✨"

        switchllm_builder.row(
            InlineKeyboardButton(
                text=f"{modelname} {modelfamilies}", callback_data=f"model_{modelname}"
            )
        )

    await message.answer(
        text=f"{len(models)} models, {available} available.\n🦙 = Regular\n🦙📷 = Multimodal",
        reply_markup=switchllm_builder.as_markup(),
    )
@dp.message(Command("lang"))
async def command_lang_handler(message: Message) -> None:
    """Handle /lang: show an inline keyboard with the available chat languages.

    One button per entry of ``chat_languages`` (callback data ``lang_<code>``);
    the language currently in use is marked with an arrow.
    """
    logger.info(Back.RED + f'LANGUAGE change Asked for {message.from_user.full_name}' + Style.RESET_ALL)
    switchlang_builder = InlineKeyboardBuilder()
    for lang in chat_languages:
        label = f"{lang} <--" if lang == current_chat_language else f"{lang}"
        switchlang_builder.row(
            InlineKeyboardButton(text=label, callback_data=f"lang_{lang}")
        )
    await message.answer(
        text="Pick Up a Language :",
        reply_markup=switchlang_builder.as_markup(),
    )
@dp.callback_query(lambda query: query.data.startswith("model_"))
async def model_callback_handler(query: CallbackQuery):
    """Apply the model picked on the /llm keyboard.

    Stores the chosen name in the module-level ``current_model_name``, removes
    the keyboard message and confirms the choice. If anything in that sequence
    raises, the error is logged and an error notice is sent instead.
    """
    logger.info('CALLBACK HANDLER : LLM')
    global current_model_name
    chat_id = query.message.chat.id
    message_id = query.message.message_id
    try:
        # Fixed: split only once, so a model whose own name contains "model_"
        # is no longer truncated at the second occurrence.
        current_model_name = query.data.split("model_", 1)[1]
        await bot.delete_message(chat_id, message_id)
        await bot.send_message(text=f"SUCCESS : Chosen model: {current_model_name}", chat_id=chat_id)
    except Exception:
        # failed to switch llm
        logger.exception('CALLBACK HANDLER : LLM switch failed')
        await bot.delete_message(chat_id, message_id)
        await bot.send_message(text=f"ERROR : Chosen model NOT applied.\nModel in use : {current_model_name}", chat_id=chat_id)
@dp.callback_query(lambda query: query.data.startswith("lang_"))
async def language_callback_handler(query: CallbackQuery):
    """Apply the language picked on the /lang keyboard.

    Stores the chosen code in the module-level ``current_chat_language``,
    removes the keyboard message and confirms the choice; on any exception the
    keyboard message is removed and an error notice is sent instead.
    """
    global current_chat_language
    logger.info('CALLBACK HANDLER : LANGUAGE')
    try:
        _, chosen = query.data.split("lang_")[:2]
        current_chat_language = chosen
        await bot.delete_message(query.message.chat.id, query.message.message_id)
        await bot.send_message(
            text=f"SUCCESS : Chosen language : {current_chat_language}",
            chat_id=query.message.chat.id,
        )
    except Exception:
        # failed to switch language
        await bot.delete_message(query.message.chat.id, query.message.message_id)
        await bot.send_message(
            text=f"ERROR : Chosen language NOT applied.\nLanguage in use : {current_chat_language}",
            chat_id=query.message.chat.id,
        )
@dp.message(F.content_type.in_({'text'}))
async def handle_message(message: Message) -> None:
    """Answer a text message with the LLM agent.

    Flow:
    1. Without an existing session the user is asked to send /start first.
    2. Private chats use the whole text as the prompt; group / supergroup
       messages are only handled when they start with the bot mention, which
       is stripped from the prompt.
    3. The prompt is translated to the current chat language when its detected
       language differs, sent to the agent, and the timed answer is delivered
       through ``handle_response``.
    Messages that are neither private nor addressed to the bot are ignored.
    """
    await get_bot_info()
    session_id = f"{message.chat.id}"
    sender = message.from_user.full_name

    # Check if session exists
    if not session_exists(session_id):
        logger.info(f'INCOMING Message before /start : {session_id} - {sender} :\n{Fore.RED + message.text + Style.RESET_ALL}')
        # The result is not used here; the call only logs the detected language.
        determine_language(message.text)
        if current_chat_language == 'FR':
            text = "Salut! Il faut commencer la conversation avec /start"
        else:
            text = "Hi! Please start the conversation with /start"
        await message.answer(
            text=text,
            parse_mode=ParseMode.HTML,
            disable_web_page_preview=True,
        )
        return

    if message.chat.type == "private":
        prompt = message.text
        logger.info(f'INCOMING Private Chat from {session_id} - {sender} :\n{Fore.MAGENTA + prompt + Style.RESET_ALL}')
        await bot.send_chat_action(message.chat.id, "typing")
        lang = determine_language(prompt)
        if current_chat_language != lang:
            logger.info(f'String Language {lang} is different from current language {current_chat_language}')
            prompt = translate_to_language(prompt, current_chat_language)
    elif is_mentioned_in_group_or_supergroup(message):
        addressed_text = message.text if message.text is not None else message.caption
        prompt = addressed_text.replace(mention, "").strip()
        lang = determine_language(prompt)
        if current_chat_language != lang:
            logger.info(f'String Language {lang} is different from current language {current_chat_language}')
            prompt = translate_to_language(prompt, current_chat_language)
        logger.info(f'INCOMING Group Chat from {session_id} - {sender} :\n{Fore.MAGENTA + prompt + Style.RESET_ALL}')
        await bot.send_chat_action(message.chat.id, "typing")
    else:
        # Fixed: such messages used to fall through to the timing/response
        # code with ``tic``, ``toc`` and ``answer`` undefined (NameError).
        return

    tic = time.perf_counter()
    answer = chain_to_llm_EN(prompt, session_id)
    toc = time.perf_counter()
    query_duration = f"{toc - tic:0.4f}"
    await handle_response(message, answer, query_duration, session_id)
@dp.message(F.content_type.in_({'audio'}))
async def handle_audio_message(message: Message) -> None:
    """Download an incoming audio attachment to ``audio.mp3``.

    Nothing is done with the file afterwards: processing is not implemented
    at the moment.
    """
    chat_session = f"{message.chat.id}"
    logger.info(f'INCOMING AUDIO Message : {chat_session} - {message.from_user.full_name}' + Style.RESET_ALL)
    await get_bot_info()
    logger.info(message)
    remote_file = await bot.get_file(message.audio.file_id)
    await bot.download_file(remote_file.file_path, "audio.mp3")
    ## Not implemented at the moment
@dp.message(F.content_type.in_({'document'}))
async def handle_document_message(message: Message) -> None:
    """Download a supported document and ask whether it should be embedded.

    Supported MIME types are PDF, plain text, CSV, HTML and JSON; anything
    else is refused with a notice. The file is stored under
    ``download_folder`` and a yes/no inline keyboard is sent whose "yes"
    callback data is ``_doc:<TYPE>:True|<path>``.
    """
    import html  # local import: used to escape the MIME type in the HTML notice

    session_id = f"{message.chat.id}"
    logger.info(f'INCOMING DOCUMENT Message : {session_id} - {message.from_user.full_name}' + Style.RESET_ALL)
    await get_bot_info()
    logger.debug(message)
    document = message.document
    file_id = document.file_id

    # The name comes from the sender: keep only its last path component so it
    # cannot point outside the download folder, then reduce it to ASCII.
    raw_name = (document.file_name or "document").replace("\\", "/")
    file_name = os.path.basename(raw_name).replace(" ", "_")
    file_name = unicodedata.normalize('NFD', file_name).encode('ascii', 'ignore').decode('utf-8')
    if file_name in ("", ".", ".."):
        file_name = "document"
    logger.info(f'File Name : {file_name}')

    supported_types = {
        'application/pdf': 'PDF',
        'text/plain': 'TXT',
        'text/csv': 'CSV',
        'text/html': 'HTML',
        'application/json': 'JSON',
    }
    doc_type = supported_types.get(document.mime_type)
    if doc_type is None:
        TG_answer = f'<b>Document is not supported !</b>\nNo Handler for <code>{html.escape(str(document.mime_type))}</code>'
        log_answer = Fore.RED + f'Document is not supported ! No Handler for {document.mime_type}' + Style.RESET_ALL
        logger.warning(log_answer)
        # Fixed: the notice contains HTML tags but was sent without a parse
        # mode, so the tags were displayed literally.
        await bot.send_message(chat_id=message.chat.id, text=TG_answer, parse_mode=ParseMode.HTML)
        return
    logger.info(f'Document is {doc_type}')

    # Telegram limits callback data to 64 bytes. The original only shortened
    # PDF names; the budget is now derived from the actual prefix and applied
    # to every document type, keeping the extension.
    callback_limit = 64
    accept_prefix = f"_doc:{doc_type}:True|"
    name_budget = callback_limit - len(accept_prefix) - len(os.path.join(download_folder, ""))
    if len(file_name) > name_budget:
        stem, extension = os.path.splitext(file_name)
        file_name = (stem[:max(name_budget - len(extension), 0)] + extension)[:name_budget]
        logger.info(f'File Name : {file_name}')

    file_path = os.path.join(download_folder, file_name)
    logger.debug(f"embed_doc:{doc_type}:True|{file_path}")

    embed_document_kb = InlineKeyboardBuilder()
    embed_document_kb.row(
        InlineKeyboardButton(text="✔️ OUI", callback_data=f"{accept_prefix}{file_path}"),
        InlineKeyboardButton(text="❌ NON", callback_data=f"_doc:{doc_type}:False"),
    )

    # Create the download directory if it does not exist yet
    os.makedirs(download_folder, exist_ok=True)
    logger.info(f'Downloading File {file_name}')
    file_info = await bot.get_file(file_id)
    await bot.download_file(file_info.file_path, file_path)

    # Ask the user whether the document should be embedded
    logger.info('Asking to User if he wants to embed file to Vault')
    text = f"Do you want to embed the provided {doc_type} Document?"
    await bot.send_message(chat_id=message.chat.id, text=text, reply_markup=embed_document_kb.as_markup())
@dp.message(F.content_type.in_({'voice'}))
async def handle_voice_message(message: Message) -> None:
    """Transcribe a voice note with Whisper and answer it through the agent.

    The note is saved as ``voice.ogg``, converted to ``voice.mp3`` and
    transcribed with the Whisper "base" model. English transcripts go to
    ``chain_to_llm_EN``, every other language to ``chain_to_llm_FR``. The
    reported duration covers the whole handling, download included.
    """
    started = time.perf_counter()
    session_id = f"{message.chat.id}"
    logger.info(f'INCOMING VOICE Message : {session_id} - {message.from_user.full_name}' + Style.RESET_ALL)
    await get_bot_info()

    ogg_file, mp3_file = "voice.ogg", "voice.mp3"
    remote_file = await bot.get_file(message.voice.file_id)
    await bot.download_file(remote_file.file_path, "voice.ogg")
    AudioSegment.from_ogg(ogg_file).export(mp3_file, format="mp3")

    # Call to whisper
    transcription = whisper.load_model("base").transcribe(mp3_file)
    query_text = transcription["text"]
    query_lang = transcription["language"].upper()

    answer = (
        chain_to_llm_EN(query_text, session_id)
        if query_lang == 'EN'
        else chain_to_llm_FR(query_text, session_id, query_lang)
    )

    finished = time.perf_counter()
    query_duration = f"{finished - started:0.4f}"
    await handle_response(message, answer, query_duration, session_id)
# Maximum chunk length (in characters) used when pre-splitting each document
# type; the values are the ones the handler used before the refactoring.
_DOC_CHUNK_LIMITS = {'PDF': 8192, 'TXT': 1024, 'JSON': 1024}
# Wording used in the "appended" log line for each document type.
_DOC_LOG_LABELS = {'PDF': 'PDF', 'TXT': 'Text file', 'JSON': 'JSON file'}


def _split_into_chunks(text: str, max_chars: int) -> list[str]:
    """Collapse whitespace in *text* and pack whole sentences into chunks shorter than *max_chars*."""
    text = re.sub(r'\s+', ' ', text).strip()
    # split on spaces following sentence-ending punctuation
    sentences = re.split(r'(?<=[.!?]) +', text)
    chunks = []
    current = ""
    for sentence in sentences:
        # Fixed: sentences are now joined with a space; the original stripped
        # the separator and glued consecutive sentences together.
        candidate = f"{current} {sentence}" if current else sentence
        if len(candidate) < max_chars:
            current = candidate
            continue
        # Fixed: no empty chunk is emitted when the very first sentence is
        # already longer than the limit.
        if current:
            chunks.append(current)
        current = sentence
    if current:
        chunks.append(current)
    return chunks


def _read_document_text(doc_type: str, file_path: str):
    """Return the raw text of a PDF/TXT/JSON file, or None for any other type."""
    if doc_type == 'PDF':
        with open(file_path, 'rb') as pdf_file:
            logger.info('Read PDF')
            reader = PyPDF2.PdfReader(pdf_file)
            logger.info('Count PDF Pages')
            logger.info(f"Nombre de pages : {len(reader.pages)}")
            pages_text = []
            for page_num, page in enumerate(reader.pages, start=1):
                logger.info(Fore.CYAN + f"Processing page number: {page_num}" + Style.RESET_ALL)
                page_text = page.extract_text()  # extracted once per page
                if page_text:
                    pages_text.append(page_text)
        return " ".join(pages_text)
    if doc_type == 'TXT':
        with open(file_path, 'r', encoding="utf-8") as txt_file:
            return txt_file.read()
    if doc_type == 'JSON':
        with open(file_path, 'r', encoding="utf-8") as json_file:
            # Flatten the JSON data into a single string
            return json.dumps(json.load(json_file), ensure_ascii=False)
    return None


def _append_chunks_to_vault(chunks, vault_path: str) -> None:
    """Append every chunk to *vault_path*, one chunk per line."""
    with open(vault_path, "a", encoding="utf-8") as vault_file:
        vault_file.writelines(chunk.strip() + "\n" for chunk in chunks)


@dp.callback_query(lambda query: query.data.startswith("_doc:"))
async def embed_document_callback_handler(query: CallbackQuery):
    """Handle the yes/no answer of the "embed this document?" keyboard.

    Callback data is ``_doc:<TYPE>:False`` or ``_doc:<TYPE>:True|<file_path>``.
    On "yes" the document text is extracted, split into sentence-based chunks,
    appended to ``my_filename`` and the content of that file is added to the
    Chroma vector store. Only PDF, TXT and JSON are implemented; other types
    are reported as unsupported instead of being announced as embedded.
    """
    logger.info(f"Query Data : {query.data}")  ## _doc:PDF:True|{file_path} // _doc:PDF:False
    chat_id = query.message.chat.id
    message_id = query.message.message_id

    # Only the part before the first "|" holds the colon-separated fields, so
    # a file name containing ":" or "|" cannot corrupt them.
    header, _, file_path = query.data.partition('|')
    fields = header.split(':')
    if len(fields) < 3:
        logger.warning(f"Malformed callback data ignored : {query.data}")
        return
    doc_type, do_embed = fields[1], fields[2]
    logger.info(f"Doc Type : {doc_type}")
    logger.info(f"Do Embed ? : {do_embed}")

    if do_embed == 'False':
        logger.info('Embedding DOC : NO')
        await bot.edit_message_reply_markup(chat_id=chat_id, message_id=message_id, reply_markup=None)
        await bot.send_message(chat_id=chat_id, text="Intégration annulée.")
        return

    logger.info('Embedding DOC : YES')
    await bot.edit_message_reply_markup(chat_id=chat_id, message_id=message_id, reply_markup=None)
    await bot.send_message(chat_id=chat_id, text="Patientez, intégration en cours...")

    # The path travels through callback data: accept it only if it designates
    # an existing file inside the download folder.
    downloads_root = os.path.abspath(download_folder)
    target_path = os.path.abspath(file_path) if file_path else ""
    if not target_path.startswith(downloads_root + os.sep) or not os.path.isfile(target_path):
        logger.warning(f'Embedding refused, invalid file path : {file_path!r}')
        await bot.send_message(chat_id=chat_id, text="Fichier introuvable, intégration annulée.")
        return

    if doc_type not in _DOC_CHUNK_LIMITS:
        # Fixed: CSV/HTML (not implemented) used to fall through to the
        # embedding stage and were reported as successfully embedded.
        logger.info(f'Embedding {doc_type}: {file_path} - Not Implemented yet')
        await bot.send_message(chat_id=chat_id, text="Type de document non pris en charge pour l'intégration.")
        return

    logger.info(f'Embedding {doc_type}: {file_path}')
    text = _read_document_text(doc_type, file_path)
    chunks = _split_into_chunks(text, _DOC_CHUNK_LIMITS[doc_type])
    _append_chunks_to_vault(chunks, my_filename)
    logger.info(f"{_DOC_LOG_LABELS[doc_type]} content appended to {my_filename} with each chunk on a separate line.")

    ## Embed the result
    # NOTE(review): ``my_filename`` is opened in append mode and loaded in
    # full here, so content from earlier uploads is embedded again - confirm
    # whether that accumulation is intended.
    logger.info(f'Reading content of downloaded file {my_filename}')
    text_loader = TextLoader(my_filename, encoding='UTF-8')
    docs = text_loader.load()
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=embed_ctx, chunk_overlap=128)
    splits = text_splitter.split_documents(docs)
    logger.info('Embedding...')
    # Add documents to existing vectorstore
    chroma_vector_store_docs.add_documents(documents=splits)
    logger.info('End.')
    await bot.send_message(chat_id=chat_id, text="Intégration OK !\nVous pouvez maintenant poser une question en rapport avec le document envoyé.")
async def handle_response(message, full_response, query_duration: float = None, session_id: str = None) -> bool:
    """Send the agent's answer, suffixed with the model name and timing.

    Returns ``False`` without sending anything when the answer is empty after
    stripping, ``True`` otherwise. When ``ACTIVE_CHATS`` holds an entry for
    the sender, the answer is also appended to its ``"messages"`` list while
    holding ``ACTIVE_CHATS_LOCK``. ``session_id`` is accepted but not used.
    """
    reply = full_response.strip()
    if not reply:
        return False

    text = f"{reply}\n\n⚙️ {current_model_name}\nGenerated in {query_duration}s."
    await send_response(message, text)

    async with ACTIVE_CHATS_LOCK:
        chat_state = ACTIVE_CHATS.get(message.from_user.id)
        if chat_state is not None:
            chat_state["messages"].append({"role": "assistant", "content": reply})

    logger.debug(Fore.MAGENTA +
        f"[Response]: '{reply}' for {message.from_user.full_name}" + Style.RESET_ALL
    )
    return True
async def send_response(message, text) -> None:
    """Deliver *text* to the chat *message* came from.

    A new message is sent when the chat id is negative or equals the sender's
    id (presumably group chats and the user's own private chat - confirm);
    otherwise the incoming message is edited in place.
    """
    chat_id = message.chat.id
    send_as_new = chat_id < 0 or chat_id == message.from_user.id
    if not send_as_new:
        await bot.edit_message_text(
            chat_id=chat_id,
            message_id=message.message_id,
            text=text,
        )
        return
    await bot.send_message(chat_id=chat_id, text=text)
def get_session_history(session_id: str) -> BaseChatMessageHistory:
    """Return the chat history stored for *session_id*, creating it on first use."""
    try:
        return store[session_id]
    except KeyError:
        # First access for this session: register an empty history.
        history = store[session_id] = ChatMessageHistory()
        return history
def session_exists(session_id: str) -> bool:
    """Tell whether a chat history is already registered for *session_id*."""
    known_sessions = store.keys()
    return session_id in known_sessions
def filter_device_data(device):
    """Reduce a device mapping to its Name, idx, Data and SwitchType entries.

    Entries missing from *device* are returned as ``None``.
    """
    wanted_fields = ("Name", "idx", "Data", "SwitchType")
    return {field: device.get(field) for field in wanted_fields}
def filter_device_from_plan(device, idx):
    """Return *device* when *idx* is listed in its ``"PlanIDs"``, else ``None``."""
    plan_ids = device["PlanIDs"]
    if idx not in plan_ids:
        return None
    return device
def get_domoticz_plan_idx(name = domoticz_plan_name) -> int:
    """Look up the idx of the Domoticz room plan called *name*.

    Queries ``url_get_plans`` and returns the plan's ``idx`` as an int, or
    ``None`` when no plan carries that name. The default *name* is the value
    ``domoticz_plan_name`` had when this function was defined.
    """
    response = requests.get(url_get_plans).json()
    # Fixed: a reply without a "result" entry used to raise TypeError when the
    # missing value (None) was iterated.
    plans = response.get('result') or []
    name_to_idx = {plan["Name"]: plan["idx"] for plan in plans}
    if name in name_to_idx:
        return int(name_to_idx[name])
    return None
def chain_to_llm_EN(query: str, session_id: str) -> str:
    """Send *query* to the agent built for *session_id* and return its answer.

    Any exception raised while building or invoking the agent is logged with
    its traceback and replaced by the fixed answer "AN ERROR OCCURRED".
    """
    logger.info(f'LLM EN Query : {query}')
    config = {"configurable": {"thread_id": "chat_EN", "session_id": session_id}}
    try:
        agent_with_chat_history = construct_chain(session_id)
        response = agent_with_chat_history.invoke({"input": query}, config=config)
        answer = response["output"]
    except Exception:
        # Best-effort boundary: the user still gets a reply. The failure is
        # now logged with its traceback instead of a bare print().
        logger.exception('=== ERROR EN')
        answer = "AN ERROR OCCURRED"
    logger.info(f'OUTGOING Answer for {session_id} :\n{answer}')
    return answer
def chain_to_llm_FR(query: str, session_id: str, language: str = None) -> str:
    """Send *query* to the agent and return its answer translated to French.

    When *language* differs from ``current_chat_language`` the query is first
    translated to the current chat language. Any exception raised while
    building or invoking the agent is logged with its traceback and replaced
    by the fixed answer "AN ERROR OCCURRED" (which is then translated too).
    """
    logger.info(f'LLM FR Query : {query}')
    config = {"configurable": {"thread_id": "chat_FR", "session_id": session_id}}
    # The query is aligned with the current chat language before it is sent
    # to the LLM (the previous comment said "english", which the code never
    # enforced).
    if language != current_chat_language:
        logger.info(f'chain_to_llm_FR : Language {language} is different from {current_chat_language}')
        query = translate_to_language(query, current_chat_language)
    try:
        agent_with_chat_history = construct_chain(session_id)
        response = agent_with_chat_history.invoke({"input": query}, config=config)
        answer = response["output"]
    except Exception:
        # Best-effort boundary: the user still gets a reply. The failure is
        # now logged with its traceback instead of a bare print().
        logger.exception('=== ERROR FR')
        answer = "AN ERROR OCCURRED"
    logger.info(f'OUTGOING Answer for {session_id} :\n{answer}')
    logger.info('Translation to FR')
    # Translate Answer from LLM back in French
    answer = translate_to_language(answer, 'FR')
    return answer
# ---------------------------------------------------------------------------
# Start-up sequence, executed once at import time: log the active settings,
# embed the initial text file into a Chroma vector store and expose that store
# to the agent as a retriever tool.
# ---------------------------------------------------------------------------
logger.info(Back.RED + 'Start...' + Style.RESET_ALL)
logger.info(Back.RED + 'Model :' + Style.RESET_ALL + ' ' + f'{Back.GREEN + current_model_name}' + Style.RESET_ALL)
logger.info(Back.RED + 'Language :' + Style.RESET_ALL + ' ' + f'{Back.GREEN + current_chat_language}' + Style.RESET_ALL)
# Reading initial Text file
logger.info(f'Reading content of file {file_path}')
text_loader = TextLoader(file_path, encoding='UTF-8')
docs = text_loader.load()
# Split the loaded document into 1024-character pieces overlapping by 128.
text_splitter = RecursiveCharacterTextSplitter(chunk_size=1024, chunk_overlap=128)
splits = text_splitter.split_documents(docs)
# Initialize Embedding
logger.info('Embedding...')
# No persistence directory is passed here; the document handler later adds
# uploaded documents to this same store object.
chroma_vector_store_docs = Chroma.from_documents(documents=splits, embedding=embedding_function)
# Build Retriever
# k=3: the retriever is asked for three documents per query.
retriever_docs = chroma_vector_store_docs.as_retriever(search_kwargs={'k': 3})
logger.info('End.')
# Tools Part
# Retriever tool named "search_docs"; the last argument is the description
# handed to the agent.
tool_parse_txt = create_retriever_tool(
retriever_docs,
"search_docs",
"Searches and returns documents from the internal memo. Useful when you need to answer questions about Jouques, Claude, or something related to local IT Assistance.",
)
@tool("get_domoticz_status", return_direct=False)
def get_domoticz_status(input: str) -> str:
    """Get the current status of the local Home Automation (Domoticz) instance. Returns a JSON dict with 'status' key."""
    # The docstring above doubles as the tool description given to the agent,
    # so it is kept verbatim. ``input`` is not used: the tool only issues a
    # GET on ``url_get_status`` and hands back the decoded JSON reply.
    reply = requests.get(url_get_status)
    return reply.json()
@tool("get_domoticz_devices", return_direct=False)
def get_domoticz_devices(input: str) -> str:
    """Query all Devices and their associated state, which are present in the local Domoticz Home Automation system.
Returns a filtered JSON list with all switchlight devices name and their IDX number, needed to initiate an action."""
    # The docstring above doubles as the tool description given to the agent,
    # so it is kept verbatim. ``input`` is not used.
    response = requests.get(url_get_devices).json()
    # A reply without a "result" entry is treated as an empty device list
    # instead of raising TypeError while iterating None.
    json_devices = response.get('result') or []
    idx = get_domoticz_plan_idx()

    # Keep only the devices attached to the configured room plan. Each filter
    # now runs once per device (the original evaluated both twice, and tested
    # filter_device_data() against None although it always returns a dict).
    filtered_devices_from_plan = [
        device for device in json_devices
        if filter_device_from_plan(device, idx) is not None
    ]
    filtered_devices = [filter_device_data(device) for device in filtered_devices_from_plan]

    # Write file to disk
    with open('output.json', 'w') as f:
        json.dump(filtered_devices, f)
    with open('output_from_plan.json', 'w') as f:
        json.dump(filtered_devices_from_plan, f)
    with open('output_pretty.json', 'w') as f:
        json.dump(filtered_devices, f, indent=4)
    with open('output_full.json', 'w') as f:
        json.dump(json_devices, f, indent=4)  ## 490k tokens...
    return str(filtered_devices)
@tool("domoticz_switch_on", return_direct=False)
def domoticz_switch_on(input: str) -> dict:
    """To switch On a Domoticz device, with it's IDX as an integer input. You might need to get the corresponding IDX with get_domoticz_devices tool before. Returns a JSON dict with 'status' key of the power on command."""
    # NOTE: the docstring above is the tool description shown to the LLM;
    # keep its wording stable.
    # `input` comes from the LLM: accept only an integer so arbitrary text
    # (e.g. "12&param=...") cannot be injected into the query string.
    try:
        idx = int(str(input).strip())
    except ValueError:
        # Hand the agent a readable error instead of sending a malformed
        # request to Domoticz.
        return {'status': 'ERR', 'message': f'IDX must be an integer, got {input!r}'}
    url_switch_on = f'http://{domoticz_user}:{domoticz_pass}@{domoticz_ip}:{domoticz_port}/json.htm?type=command&param=switchlight&idx={idx}&switchcmd=On'
    # Bounded wait: requests has no default timeout.
    return requests.get(url_switch_on, timeout=10).json()
@tool("domoticz_switch_off", return_direct=False)
def domoticz_switch_off(input: str) -> dict:
    """To switch Off a Domoticz device, with it's IDX as an integer input. You might need to get the corresponding IDX with get_domoticz_devices tool before. Returns a JSON dict with 'status' key of the power off command."""
    # NOTE: the docstring above is the tool description shown to the LLM;
    # keep its wording stable.
    # `input` comes from the LLM: accept only an integer so arbitrary text
    # (e.g. "12&param=...") cannot be injected into the query string.
    try:
        idx = int(str(input).strip())
    except ValueError:
        # Hand the agent a readable error instead of sending a malformed
        # request to Domoticz.
        return {'status': 'ERR', 'message': f'IDX must be an integer, got {input!r}'}
    url_switch_off = f'http://{domoticz_user}:{domoticz_pass}@{domoticz_ip}:{domoticz_port}/json.htm?type=command&param=switchlight&idx={idx}&switchcmd=Off'
    # Bounded wait: requests has no default timeout.
    return requests.get(url_switch_off, timeout=10).json()
@tool("get_current_date_time", return_direct=False)
def get_current_date_time(input: str) -> str:
    """Use the running system's default time zone to return the current date and time in ISO format. You need to format it to a Human consumable form."""
    # `input` is ignored; the tool interface simply requires one argument.
    now = datetime.now()  # naive timestamp in the host's local time
    # ISO 8601 text, e.g. 'YYYY-MM-DDTHH:MM:SS' plus microseconds when non-zero.
    return now.isoformat()
@tool("convert_fahrenheit_to_celsius", return_direct=False)
def convert_fahrenheit_to_celsius(input: str) -> int:
    """Convert a temperature (numeric input) from fahrenheit to celsius. Returns an integer."""
    # NOTE: the docstring above is the tool description shown to the LLM.
    # Parse with float() so decimal readings such as "98.6" are accepted;
    # int() alone raises ValueError on them. Whole-number inputs give the
    # same result as before.
    fahrenheit = float(input)
    # int() truncates toward zero, matching the documented integer result.
    return int((fahrenheit - 32) * 5.0 / 9.0)
@tool("convert_celsius_to_fahrenheit", return_direct=False)
def convert_celsius_to_fahrenheit(input: str) -> int:
    """Convert a temperature (numeric input) from celsius to fahrenheit. Returns an integer."""
    # NOTE: the docstring above is the tool description shown to the LLM.
    # Parse with float() so decimal readings such as "36.6" are accepted;
    # int() alone raises ValueError on them. Whole-number inputs give the
    # same result as before.
    celsius = float(input)
    # int() truncates toward zero, matching the documented integer result.
    return int(9.0 / 5.0 * celsius + 32)
@tool("magic_function", return_direct=False)
def magic_function(input: str) -> str:
    """Applies a magic function to a numeric only input."""
    # Demo tool: parse the input as an integer and add two.
    number = int(input)
    return number + 2
# Start from the stock LangChain tools, then register the custom tools
# defined in this file in a single call (same order as before).
all_tools = load_tools(['wikipedia', 'google-serper'], llm=llm_func)
all_tools.extend([
    get_current_date_time,
    convert_fahrenheit_to_celsius,
    convert_celsius_to_fahrenheit,
    get_domoticz_devices,
    get_domoticz_status,
    domoticz_switch_on,
    domoticz_switch_off,
    tool_parse_txt,
])
############
## PROMPT ##
############
# Pull the structured-chat prompt from the LangChain hub and pre-fill its
# `tools` / `tool_names` variables from the registered tools.
prompt = hub.pull("hwchase17/structured-chat-agent").partial(
    tools=render_text_description(all_tools),
    tool_names=", ".join(t.name for t in all_tools),
)
## Show prompt
#print(prompt.pretty_print())
############
## AGENTS ##
############
# One agent definition shared by every chat session.
structured_agent = create_structured_chat_agent(
    llm=llm_func,
    tools=all_tools,
    prompt=prompt,
)
def construct_chain(session_id: int):
    """Build the history-aware agent runnable for one chat session.

    Creates a conversation memory backed by the session's message history,
    wraps the shared structured agent and tools in an ``AgentExecutor`` and
    returns that executor wrapped in ``RunnableWithMessageHistory``.

    Args:
        session_id: Identifier handed to ``get_session_history`` to fetch
            the message history for this session.

    Returns:
        A ``RunnableWithMessageHistory`` reading the user text from the
        "input" key and exposing past messages under "chat_history".
    """
    # NOTE(review): the memory below and the RunnableWithMessageHistory
    # wrapper both rely on get_session_history -- confirm that messages are
    # not recorded twice per turn.
    session_memory = ConversationBufferMemory(
        chat_memory=get_session_history(session_id),
        memory_key="chat_history",
        input_key="input",
        output_key="output",
        return_messages=True,
    )
    executor = AgentExecutor(
        agent=structured_agent,
        tools=all_tools,
        memory=session_memory,
        handle_parsing_errors=False,
        return_intermediate_steps=True,
        verbose=True,
    )
    history_aware_chain = RunnableWithMessageHistory(
        executor,
        get_session_history,
        input_messages_key="input",
        history_messages_key="chat_history",
    )
    return history_aware_chain


# Disabled manual test #1. The snippet is wrapped in a bare triple-quoted
# string, so it is only an unused string expression and never executes. It
# builds an executor for a fixed session and asks three questions, the last
# one checking whether the name given in the first is remembered.
""" session_id = 'xxxxxxxxx'
chat_memory = ConversationBufferMemory(chat_memory=get_session_history('xxxxxxxxx'), memory_key="chat_history", input_key="input", output_key="output", return_messages=True)
agent_executor = AgentExecutor(agent=structured_agent, tools=all_tools, memory=chat_memory, handle_parsing_errors=False, return_intermediate_steps=True, verbose=False)
agent_with_chat_history = RunnableWithMessageHistory(
agent_executor,
get_session_history,
input_messages_key="input",
history_messages_key="chat_history",
)
config_EN = {"configurable": {"thread_id": "chat_EN", "session_id": session_id}}
chat_history = []
query = "Hi! My name is Bob TRENTON. How are you?"
print('\n=======\n')
print(query)
print('\n=======\n')
chain_to_llm_EN(query, session_id)
query = "What is the current political situation in France?"
print(query)
message = agent_with_chat_history.invoke({"input": query}, config=config_EN)
print(message["output"])
query = "Did I tell you my name? What is it?"
print(query)
message = agent_with_chat_history.invoke({"input": query}, config=config_EN)
print(message["output"])
exit(0) """
# Disabled manual test #2 (also a bare string, never executed): feeds a list
# of questions through chain_to_llm_EN for a fixed session.
""" ## DEBUG
questions = [
"Hi, my name is Bob. Here is a secret code : 2E3C449555OFC@B6C6E27AE0C3. Please keep it in memory for later use.",
"What happened during the latest legislative elections in France? When was that?",
"Why the far left camp should not get into responsibilities?",
"Isn't it a situation which looks like Germany's in 1936?",
"Please tell me my name and the secret code I told you earlier."
]
session_id = 'xxxxxxxxx'
for question in questions:
print(question + '\n=======\n')
chain_to_llm_EN(question, session_id)
print('\n=======\n')
exit(0) """
async def main():
    """Register the bot's command menu, drop stale updates and start polling.

    Runs until polling stops.
    """
    await bot.set_my_commands(commands)
    # aiogram 3's Dispatcher.start_polling() has no `skip_update` option:
    # unknown keyword arguments are only forwarded to handlers as context
    # data, so the former `skip_update=True` never skipped anything.
    # Dropping pending updates is done through the Bot API instead.
    await bot.delete_webhook(drop_pending_updates=True)
    await dp.start_polling(bot)
if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        # Ctrl-C: dump `store` (defined elsewhere; presumably the in-memory
        # per-session chat histories) for inspection before leaving.
        print('\n=======\n')
        import pprint
        pprint.pp(store)
        print('\n=======\n')
        print('End.')
    # Single explicit exit with status 0 for both the normal and the
    # interrupted path. The previous `exit(0)` after `raise SystemExit` could
    # never run, and `exit` is a helper injected by the `site` module for
    # interactive use rather than something programs should rely on.
    raise SystemExit(0)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment