A Telegram chatbot that replies using a GGUF Intel Neural Chat LLM running on the local machine, with inline buttons for list items in the answers.
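The "buttons for list items" are Telegram inline keyboards. As a minimal, self-contained sketch (the token and chat id below are placeholders, not values from this gist), sending a message with such a keyboard looks like this; the script's get_reply_markup() builds the same structure from the model's answers:

# Illustrative only: a standalone inline-keyboard message via the Telegram Bot API.
import json
import requests

TOKEN = "<Telegram token from TG BotFather>"   # placeholder
CHAT_ID = "<numeric chat id>"                  # placeholder

keyboard = {
    "inline_keyboard": [
        [{"text": "Explain me Solar?", "callback_data": "btn_1"}],
        [{"text": "Explain me Wind?", "callback_data": "btn_2"}],
    ]
}
requests.get(
    "https://api.telegram.org/bot{}/sendMessage".format(TOKEN),
    params={
        "chat_id": CHAT_ID,
        "text": "Pick an item to expand:",
        "reply_markup": json.dumps(keyboard),  # reply_markup must be JSON-serialized
    },
)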
# llama_cpp_python==0.2.20
# requests==2.23.0
# timeout_decorator==0.5.0
import json
import re
import uuid
import requests
import time
import urllib.parse
from llama_cpp import Llama
import threading
from urllib.parse import quote
from timeout_decorator import timeout, TimeoutError

TOKEN = "<Telegram token from TG BotFather>"
URL = "https://api.telegram.org/bot{}/".format(TOKEN)

chat_history_map = {}
llm: Llama = None
def get_url(url):
    response = requests.get(url)
    content = response.content.decode("utf8")
    return content


def get_json_from_url(url):
    content = get_url(url)
    js = json.loads(content)
    return js


def get_updates(offset=None):
    # Long-poll the Bot API; "offset" acknowledges already-processed updates.
    url = URL + "getUpdates?timeout=100"
    if offset:
        url += "&offset={}".format(offset)
    js = get_json_from_url(url)
    return js


def get_last_update_id(updates):
    update_ids = []
    for update in updates["result"]:
        update_ids.append(int(update["update_id"]))
    return max(update_ids)
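# call_llm() rebuilds the chat as a few-shot Q/A transcript: hard-coded seed
# examples, then "Q: <user message>? " / "A: <bot reply> <END> " pairs, and a
# dangling "A: " for the model to complete. For a one-message history like
# ["What is the boiling point of water"] the prompt ends with
#   "... <END> Q: What is the boiling point of water? A: "
# and generation stops at "Q:" or "<END>". (The sample question is illustrative.)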
@timeout(800)
def call_llm(message_list):
    global llm
    q = "Q: {{question_without_mark}}? "
    a = "A: {{answer}} <END> "
    a_placeholder = "A: "
    prompt = "Q: What do you do? A: I'm an intelligent machine that responds to questions by being concise and I try to respond in points. <END> Q: What is 2+2? A: 4 <END> Q: What is the state of water in room temperature? A: Liquid. <END> "

    # Alternate between user questions and bot answers to rebuild the transcript.
    mode = 'q'
    for message in message_list:
        if mode == 'q':
            question_without_mark = message.strip()
            if question_without_mark.endswith("?"):
                question_without_mark = question_without_mark[:-1]
            pq = q.replace("{{question_without_mark}}", question_without_mark)
            prompt = prompt + pq
            mode = 'a'
        else:
            pa = a.replace("{{answer}}", message)
            prompt = prompt + pa
            mode = 'q'

    if mode == 'a':
        prompt = prompt + a_placeholder

    output = llm(
        prompt,
        max_tokens=4000,
        stop=["Q:", "<END>"],
        echo=True
    )
    # try:
    #     output = llm(
    #         prompt,
    #         max_tokens=4000,
    #         stop=["Q:", "<END>"],
    #         echo=True
    #     )
    # except Exception as e:
    #     return repr(e)

    # echo=True returns prompt + completion, so strip the prompt prefix.
    result = output["choices"][0]["text"][len(prompt):].strip()
    return result
class log_thread(threading.Thread):
    """Background thread that periodically tells the chat the bot is still working."""

    def __init__(self, thread_name, thread_ID, tgchat):
        threading.Thread.__init__(self)
        self.thread_name = thread_name
        self.thread_ID = thread_ID
        self.tgchat = tgchat
        self.should_stop = False

    def run(self):
        while not self.should_stop:
            print(str(self.thread_name) + " " + str(self.thread_ID))
            send_message("[INFO] Working", self.tgchat)
            time.sleep(120)

    def stop(self):
        print("Stopping")
        self.should_stop = True
def process_conversation(text_messages, tgchat):
    # Keep the user informed while the (potentially slow) local LLM call runs.
    chat_logger = log_thread(f"tgchat:{tgchat}", 1000, tgchat)
    chat_logger.start()
    result = None
    excp = None
    try:
        result = call_llm(text_messages)
    except TimeoutError as e:
        excp = e
    finally:
        chat_logger.stop()
        del chat_logger
    if excp:
        raise excp
    return result
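# echo_all() handles the two Telegram update shapes this bot uses:
#   {"message": {"chat": {"id": ...}, "text": "..."}}      -> a typed message
#   {"callback_query": {"data": ..., "message": {...}}}    -> an inline-button press
# For button presses, the pressed button's label is looked up by its
# callback_data in the original reply_markup and replayed as the next user turn.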
def echo_all(updates):
    global chat_history_map
    for update in updates["result"]:
        if "message" in update and "text" in update["message"]:
            text = update["message"]["text"]
            tgchat = update["message"]["chat"]["id"]
            chat_history = chat_history_map.get(tgchat, [])
            chat_history.append(text)
            try:
                result = process_conversation(chat_history, tgchat)
                chat_history.append(result)
                chat_history = chat_history[-4:]  # Last 2 message pairs.
                chat_history_map[tgchat] = chat_history
                reply_markup = get_reply_markup(result)
                send_message(result, tgchat, reply_markup)
            except TimeoutError:
                print("Process timed out")
                send_message("[ERROR] Process timed out", tgchat)
        elif "callback_query" in update:
            callback_query = update["callback_query"]
            message = callback_query["message"]
            tgchat = message["chat"]["id"]
            chat_history = chat_history_map.get(tgchat, [])
            button_id = callback_query["data"]
            inline_keyboard = message["reply_markup"]["inline_keyboard"]
            # Recover the pressed button's label from its callback_data.
            button_text = None
            for callback_list in inline_keyboard:
                for callback in callback_list:
                    if callback["callback_data"] == button_id:
                        button_text = callback["text"]
            if button_text:
                chat_history.append(button_text)
                try:
                    result = process_conversation(chat_history, tgchat)
                    if result.strip() != "":
                        chat_history.append(result)
                        chat_history = chat_history[-4:]  # Last 2 message pairs.
                        chat_history_map[tgchat] = chat_history
                        reply_markup = get_reply_markup(result)
                        send_message(result, tgchat, reply_markup)
                    else:
                        send_message("[ERROR] We don't have any response for you", tgchat)
                except TimeoutError:
                    print("Process timed out")
                    send_message("[ERROR] Process timed out", tgchat)
            else:
                send_message("[ERROR] Unable to understand input", tgchat)
        else:
            pass
def get_last_chat_id_and_text(updates):
    num_updates = len(updates["result"])
    last_update = num_updates - 1
    text = updates["result"][last_update]["message"]["text"]
    chat_id = updates["result"][last_update]["message"]["chat"]["id"]
    return (text, chat_id)


def send_message(text, chat_id, reply_markup=None):
    text = urllib.parse.quote_plus(text)
    url = URL + "sendMessage?text={}&chat_id={}".format(text, chat_id)
    if reply_markup:
        url += "&reply_markup={}".format(reply_markup)
    get_url(url)
def get_reply_markup_basic_newline(input_string):
    # Fallback: turn short, period-free lines of the answer into "Explain me ...?" buttons.
    input_string = input_string.strip()
    lines = input_string.split("\n")
    if len(lines) < 2:
        print("Found lines less than 2")
        return None
    question_list = [line.strip() for line in lines if 3 < len(line) < 45]
    question_list = [line[:-1] if line.endswith(".") else line for line in question_list]
    question_list = [line for line in question_list if line.find('.') == -1]
    question_list = [f"Explain me {line.strip()}?" for line in question_list]
    callbacks = [[{'text': quote(question), 'callback_data': str(uuid.uuid4()).replace('-', '_')}] for question in question_list]
    keyboard = {
        'inline_keyboard': callbacks
    }
    return json.dumps(keyboard)
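# Example for get_reply_markup_basic_newline(): an answer such as
#   "Some renewable energy sources worth considering for a home installation:
#    Solar
#    Wind
#    www.example.org"
# drops the first line (too long) and the URL (contains a dot), yielding the
# buttons "Explain me Solar?" and "Explain me Wind?". (Sample text is illustrative.)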
def get_reply_markup_numbered_list(input_string):
    # Check if the input contains a numbered list ("1. ...", "2. ...").
    matches = re.findall(r'\d+\.\s+(.*?)(?=\d+\.|\Z)', input_string, re.DOTALL)
    if matches:
        # Append "?"; the next line collapses "??" when an item already ends with one.
        question_list = [f"What is {match.strip()}?" for match in matches]
        question_list = [q.strip() if not q.strip().endswith("??") else q.strip()[:len(q.strip()) - 1] for q in question_list]
        callbacks = [[{'text': quote(question), 'callback_data': str(uuid.uuid4()).replace('-', '_')}] for question in question_list]
        keyboard = {
            'inline_keyboard': callbacks
        }
        return json.dumps(keyboard)
    else:
        return None


def get_reply_markup(input_string):
    # Prefer numbered-list buttons; fall back to one button per short line.
    val = get_reply_markup_numbered_list(input_string)
    if val:
        return val
    val = get_reply_markup_basic_newline(input_string)
    if val:
        return val
    return None
def main():
    global llm
    last_update_id = None
    llm = Llama(model_path="./neural-chat-7b-v3-1.Q8_0.gguf", n_ctx=4000)
    while True:
        updates = get_updates(last_update_id)
        if len(updates["result"]) > 0:
            last_update_id = get_last_update_id(updates) + 1
            echo_all(updates)
        time.sleep(0.5)


if __name__ == '__main__':
    # message = '''
    # 1. 889+111
    # 2. sum of 9 and 11
    # 3. 7+9?
    # '''
    # reply_markup = get_reply_markup(message)
    # send_message(message.strip(), "6832486947", reply_markup)
    main()
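To run the bot: install the pinned packages from the header comments (llama_cpp_python==0.2.20, requests==2.23.0, timeout_decorator==0.5.0), place neural-chat-7b-v3-1.Q8_0.gguf next to the script so the model_path in main() resolves, put your BotFather token in TOKEN, and start the script with Python 3. It then long-polls Telegram and replies in the chat.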