Last active
March 15, 2023 18:08
-
-
Save arynyklas/e1070e9c0dbbe0b7bdc43bd8a67b796e to your computer and use it in GitHub Desktop.
Python aiogram 3.0.07b multibot to communicate with OpenAI's text-davinci-003 AI
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from typing import Dict | |
TEXTS: Dict[str, str] = { | |
"start": "Hello {mention} 👋\n\nAsk me anything :)", | |
"wait": "Wait please...", | |
"error": "Something went wrong." | |
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from pydantic import BaseModel | |
from yaml import load as load_yaml, Loader | |
from typing import List | |
class WebConfig(BaseModel): | |
host: str | |
port: int | |
path: str | |
base_url: str | |
class Config(BaseModel): | |
main_bot_token: str | |
admins: List[int] | |
openai_token: str | |
web: WebConfig | |
CONFIG_FILENAME: str = "config.yml" | |
with open(CONFIG_FILENAME, "r", encoding="utf-8") as file: | |
data: dict = load_yaml( | |
stream = file, | |
Loader = Loader | |
) | |
config: Config = Config( | |
web = WebConfig(**data.pop("web")), | |
**data | |
) | |
__all__ = ( | |
"config", | |
) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
main_bot_token: "..." | |
admins: | |
- 794823214 | |
openai_token: "..." | |
web: | |
host: "127.0.0.1" | |
port: 8080 | |
path: "/webhook/telegram/bot/{bot_token}" | |
base_url: "https://..." |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from aiogram import Bot, Dispatcher, F, Router, filters, types, enums, exceptions | |
from aiogram.utils.text_decorations import html_decoration | |
from aiogram.client.session.aiohttp import AiohttpSession | |
from aiogram.fsm.storage.memory import MemoryStorage | |
from aiogram.utils.token import TokenValidationError, validate_token | |
from aiogram.webhook.aiohttp_server import SimpleRequestHandler, TokenBasedRequestHandler, setup_application | |
from aiohttp.web import Application as WebApplication, run_app as run_web_app | |
from functools import partial | |
from asyncio import AbstractEventLoop, get_event_loop | |
from magic_filter.magic import MagicT | |
import openai | |
from basic_data import TEXTS | |
from config import config | |
from typing import Callable, Dict, List, Coroutine, Any, Optional, Union | |
quote_html: Callable[[str], str] = html_decoration.quote | |
openai.api_key = config.openai_token | |
MAIN_BOT_PATH: str = config.web.path.format( | |
bot_token = config.main_bot_token | |
) | |
ADDITIONAL_BOTS_URL: str = config.web.base_url + config.web.path | |
waitings: Dict[int, List[int]] = [] | |
def wrap_openai_call(func: Callable[..., Any]) -> Callable[..., Coroutine[Any, Any, Any]]: | |
async def run(*args, loop: Optional[AbstractEventLoop]=None, executor=None, **kwargs): | |
if loop is None: | |
loop: AbstractEventLoop = get_event_loop() | |
return await loop.run_in_executor( | |
executor = executor, | |
func = partial(func, *args, **kwargs) | |
) | |
return run | |
def is_bot_token(bot_token: str) -> bool: | |
try: | |
validate_token( | |
token = bot_token | |
) | |
except TokenValidationError: | |
return False | |
return True | |
async def get_bot_id(bot: Bot) -> int: | |
return (await bot.me()).id | |
admins_filter: MagicT = F.from_user.id.in_(config.admins) | |
private_chat_filter: MagicT = F.chat.type == enums.ChatType.PRIVATE | |
main_router: Router = Router() | |
additional_router: Router = Router() | |
@main_router.message(admins_filter, filters.Command("start")) | |
async def main_router_start_command_handler(message: types.Message, bot: Bot) -> None: | |
await message.answer( | |
text = "Pong!" | |
) | |
@main_router.message(admins_filter, filters.Command("add", magic=F.args.func(is_bot_token))) | |
async def main_router_add_command_handler(message: types.Message, command: filters.CommandObject, bot: Bot) -> None: | |
new_additional_bot_token: str = command.args | |
new_additional_bot: Bot = Bot( | |
token = new_additional_bot_token, | |
session = bot.session | |
) | |
try: | |
new_additional_bot_user: types.User = await new_additional_bot.get_me() | |
except exceptions.TelegramUnauthorizedError: | |
await message.answer( | |
text = "Invalid token" | |
) | |
return | |
await new_additional_bot.delete_webhook( | |
drop_pending_updates = True | |
) | |
await new_additional_bot.set_webhook( | |
url = ADDITIONAL_BOTS_URL.format( | |
bot_token = new_additional_bot_token | |
) | |
) | |
await message.reply( | |
text = "Bot @{bot_username} successful added!".format( | |
bot_username = new_additional_bot_user.username | |
) | |
) | |
@additional_router.message(filters.Command("start")) | |
async def command_start_handler(message: types.Message) -> None: | |
await message.answer( | |
text = TEXTS["start"].format( | |
mention = html_decoration.link( | |
value = message.from_user.full_name, | |
link = message.from_user.url | |
) | |
) | |
) | |
@additional_router.message(F.chat.type == enums.ChatType.PRIVATE, F.content_type == enums.ContentType.TEXT) | |
@additional_router.message(filters.Command("ask")) | |
async def text_type_handler(message: types.Message, bot: Bot) -> None: | |
bot_id: int = get_bot_id( | |
bot = bot | |
) | |
user_id: int = message.from_user.id | |
if user_id in waitings[bot_id]: | |
return | |
waitings[bot_id].append(user_id) | |
bot_message: types.Message = await message.reply( | |
text = TEXTS["wait"] | |
) | |
query: str = message.text | |
command: Union[str, None] = ( | |
message.get_full_command( | |
pure = True | |
) or [None] | |
)[0] | |
if command == "/ask": | |
query = query.split(" ", 1)[1] | |
try: | |
response: Any = await wrap_openai_call(openai.Completion.create)( | |
model = "text-davinci-003", | |
prompt = query, | |
temperature = 0.9, | |
max_tokens = 500, | |
top_p = 1, | |
frequency_penalty = 0, | |
presence_penalty = 0.6 | |
) | |
await bot_message.edit_text( | |
text = quote_html(response.choices[0].text.strip()) | |
) | |
except: | |
await bot_message.edit_text( | |
text = TEXTS["error"] | |
) | |
waitings[bot_id].remove(user_id) | |
async def on_bot_startup(webhook_url: str, dispatcher: Dispatcher, bot: Bot) -> None: | |
await bot.set_webhook( | |
url = webhook_url, | |
drop_pending_updates = True | |
) | |
async def on_startup(bot: Bot) -> None: | |
await bot.set_webhook( | |
url = config.web.base_url + MAIN_BOT_PATH | |
) | |
def main() -> None: | |
bot_settings: dict = { | |
"session": AiohttpSession(), | |
"parse_mode": enums.ParseMode.HTML | |
} | |
main_bot: Bot = Bot( | |
token = config.main_bot_token, | |
**bot_settings | |
) | |
storage: MemoryStorage = MemoryStorage() | |
main_dispatcher: Dispatcher = Dispatcher( | |
storage = storage | |
) | |
main_dispatcher.include_router( | |
router = main_router | |
) | |
main_dispatcher.startup.register( | |
callback = on_startup | |
) | |
additional_dispatcher: Dispatcher = Dispatcher( | |
storage = storage | |
) | |
additional_dispatcher.include_router( | |
router = additional_router | |
) | |
web_app: WebApplication = WebApplication() | |
SimpleRequestHandler( | |
dispatcher = main_dispatcher, | |
bot = main_bot | |
).register( | |
web_app, | |
path = MAIN_BOT_PATH | |
) | |
TokenBasedRequestHandler( | |
dispatcher=additional_dispatcher, | |
bot_settings=bot_settings, | |
).register( | |
web_app, | |
path = config.web.path | |
) | |
setup_application( | |
web_app, | |
main_dispatcher, | |
bot = main_bot | |
) | |
setup_application( | |
web_app, | |
additional_dispatcher | |
) | |
run_web_app( | |
app = web_app, | |
host = config.web.host, | |
port = config.web.port | |
) | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment