chore: autoformatting code and sorting import

This commit is contained in:
Danil 2025-06-06 15:54:11 +03:00
parent cd40332ef6
commit 3d1bacf85b
11 changed files with 177 additions and 229 deletions

12
bot.py
View file

@ -1,13 +1,13 @@
import yaml
from aiogram import Bot from aiogram import Bot
from aiogram.client.default import DefaultBotProperties from aiogram.client.default import DefaultBotProperties
from aiogram.enums import ParseMode from aiogram.enums import ParseMode
import yaml
from database.server import Database from database.server import Database
db = Database('/data/shirino.db') db = Database("/data/shirino.db")
config = yaml.safe_load(open('../config.yaml', 'r', encoding='utf-8')) config = yaml.safe_load(open("../config.yaml", "r", encoding="utf-8"))
bot = Bot( bot = Bot(
token=config['telegram_token'], token=config["telegram_token"],
default=DefaultBotProperties(parse_mode=ParseMode.HTML) default=DefaultBotProperties(parse_mode=ParseMode.HTML),
) )

View file

@ -1,14 +1,14 @@
import hashlib import hashlib
from aiogram import types, Router from aiogram import Router, types
from aiogram.filters import Command from aiogram.filters import Command
from bot import bot, db from bot import bot, db
from functions.convert import Converter from functions.convert import Converter
from functions.create_chart import create_chart from functions.create_chart import create_chart
from i18n.localization import I18n
from utils.format_number import format_number from utils.format_number import format_number
from utils.inline_query import reply from utils.inline_query import reply
from i18n.localization import I18n
router = Router() router = Router()
i18n = I18n() i18n = I18n()
@ -21,45 +21,38 @@ async def currency(query: types.InlineQuery) -> None:
result_id = hashlib.md5(text.encode()).hexdigest() result_id = hashlib.md5(text.encode()).hexdigest()
get_bot = await bot.get_me() get_bot = await bot.get_me()
data = await db.fetch( data = await db.fetch("SELECT * FROM users WHERE user_id = ?", query.from_user.id)
'SELECT * FROM users WHERE user_id = ?',
query.from_user.id
)
lang = data.get('lang') lang = data.get("lang")
locale = i18n.get_locale(lang) locale = i18n.get_locale(lang)
currency_example = locale["currency_example"].format( currency_example = locale["currency_example"].format(bot_username=get_bot.username)
bot_username=get_bot.username
)
if len(args) < 2: if len(args) < 2:
await reply( await reply(
result_id, result_id,
[(locale["error_not_enough_args"], currency_example, None, None)], [(locale["error_not_enough_args"], currency_example, None, None)],
query query,
) )
return return
conv = Converter() conv = Converter()
from_currency, conv_currency = '', '' from_currency, conv_currency = "", ""
if len(args) == 3: if len(args) == 3:
try: try:
conv.amount = float(args[0].replace(',', '.')) conv.amount = float(args[0].replace(",", "."))
if conv.amount < 0: if conv.amount < 0:
await reply( await reply(
result_id, result_id, [(locale["error_negative_amount"], None, None)], query
[(locale["error_negative_amount"], None, None)],
query
) )
return return
except ValueError: except ValueError:
await reply( await reply(
result_id, result_id,
[(locale["error_invalid_number"], currency_example, None, None)], [(locale["error_invalid_number"], currency_example, None, None)],
query query,
) )
return return
from_currency = args[1] from_currency = args[1]
@ -81,21 +74,17 @@ async def currency(query: types.InlineQuery) -> None:
try: try:
await conv.convert() await conv.convert()
except RuntimeError: except RuntimeError:
await reply( await reply(result_id, [(locale["error_currency_rate"], None, None)], query)
result_id,
[(locale["error_currency_rate"], None, None)],
query
)
return return
chart = None chart = None
if bool(data.get('chart', 1)): if bool(data.get("chart", 1)):
chart = await create_chart( chart = await create_chart(
from_currency, from_currency,
conv_currency, conv_currency,
data.get('chart_period', 'month'), data.get("chart_period", "month"),
data.get('chart_backend', 'matplotlib') data.get("chart_backend", "matplotlib"),
) )
message = ( message = (
@ -105,13 +94,6 @@ async def currency(query: types.InlineQuery) -> None:
results = [(message, None, None)] results = [(message, None, None)]
if chart: if chart:
results.insert( results.insert(0, (message, None, chart))
0,
(
message,
None,
chart
)
)
await reply(result_id, results, query) await reply(result_id, results, query)

View file

@ -1,12 +1,10 @@
from typing import Optional, Tuple, List import json
from typing import List, Optional, Tuple
from aiogram import Router, types from aiogram import Router, types
from aiogram.filters import Command from aiogram.filters import Command
from aiogram.types import ( from aiogram.types import (CallbackQuery, InlineKeyboardButton,
InlineKeyboardMarkup, InlineKeyboardMarkup)
InlineKeyboardButton,
CallbackQuery,
)
import json
from bot import db from bot import db
from i18n.localization import I18n from i18n.localization import I18n
@ -31,17 +29,11 @@ PERIOD_OPTIONS: List[PeriodOption] = [
async def get_user_locale(user_id: int) -> dict: async def get_user_locale(user_id: int) -> dict:
data = await db.fetch( data = await db.fetch("SELECT lang FROM users WHERE user_id = $1", user_id)
'SELECT lang FROM users WHERE user_id = $1', user_id
)
if not data: if not data:
await db.insert( await db.insert("INSERT INTO users (user_id) VALUES (?)", user_id)
'INSERT INTO users (user_id) VALUES (?)', user_id data = await db.fetch("SELECT lang FROM users WHERE user_id = $1", user_id)
) return i18n.get_locale(data.get("lang", "en"))
data = await db.fetch(
'SELECT lang FROM users WHERE user_id = $1', user_id
)
return i18n.get_locale(data.get('lang', 'en'))
def build_options_keyboard( def build_options_keyboard(
@ -57,13 +49,9 @@ def build_options_keyboard(
for code, label_key in options: for code, label_key in options:
label = locale.get(label_key, label_key) label = locale.get(label_key, label_key)
text = ( text = f"[X] {label}" if code == current_value else label
f"[X] {label}" if code == current_value else label
)
row.append( row.append(
InlineKeyboardButton( InlineKeyboardButton(text=text, callback_data=f"{callback_prefix}_{code}")
text=text, callback_data=f"{callback_prefix}_{code}"
)
) )
if len(row) == buttons_per_row: if len(row) == buttons_per_row:
buttons.append(row) buttons.append(row)
@ -85,17 +73,11 @@ def build_options_keyboard(
def get_chart_toggle_keyboard( def get_chart_toggle_keyboard(
chart_enabled: bool, locale: dict chart_enabled: bool, locale: dict
) -> InlineKeyboardMarkup: ) -> InlineKeyboardMarkup:
toggle_text = ( toggle_text = locale["chart_disable"] if chart_enabled else locale["chart_enable"]
locale["chart_disable"]
if chart_enabled
else locale["chart_enable"]
)
return InlineKeyboardMarkup( return InlineKeyboardMarkup(
inline_keyboard=[ inline_keyboard=[
[ [
InlineKeyboardButton( InlineKeyboardButton(text=toggle_text, callback_data="chart_toggle"),
text=toggle_text, callback_data="chart_toggle"
),
InlineKeyboardButton( InlineKeyboardButton(
text=locale["chart_period"], text=locale["chart_period"],
callback_data="chart_period", callback_data="chart_period",
@ -134,9 +116,8 @@ async def safe_edit_message_text(
new_text_clean = new_text.strip() new_text_clean = new_text.strip()
is_text_same = current_text == new_text_clean is_text_same = current_text == new_text_clean
is_markup_same = ( is_markup_same = markup_to_json(message.reply_markup) == markup_to_json(
markup_to_json(message.reply_markup) new_reply_markup
== markup_to_json(new_reply_markup)
) )
if is_text_same and is_markup_same: if is_text_same and is_markup_same:
@ -168,20 +149,17 @@ async def settings_handler(message: types.Message):
] ]
) )
await message.answer( await message.answer(locale["settings_title"], reply_markup=settings_keyboard)
locale["settings_title"],
reply_markup=settings_keyboard
)
@router.callback_query(lambda c: c.data == "setting_lang") @router.callback_query(lambda c: c.data == "setting_lang")
async def show_language_menu(callback: CallbackQuery): async def show_language_menu(callback: CallbackQuery):
locale = await get_user_locale(callback.from_user.id) locale = await get_user_locale(callback.from_user.id)
data = await db.fetch( data = await db.fetch(
'SELECT lang FROM users WHERE user_id = $1', "SELECT lang FROM users WHERE user_id = $1", callback.from_user.id
callback.from_user.id
) )
current_lang = data.get('lang', 'en') current_lang = data.get("lang", "en")
keyboard = build_options_keyboard( keyboard = build_options_keyboard(
options=LANG_OPTIONS, options=LANG_OPTIONS,
@ -191,15 +169,14 @@ async def show_language_menu(callback: CallbackQuery):
back_callback="back_to_settings", back_callback="back_to_settings",
) )
await safe_edit_message_text( await safe_edit_message_text(callback, locale["choose_language"], keyboard)
callback, locale["choose_language"], keyboard
)
@router.callback_query(lambda c: c.data and c.data.startswith("lang_")) @router.callback_query(lambda c: c.data and c.data.startswith("lang_"))
async def language_selected(callback: CallbackQuery): async def language_selected(callback: CallbackQuery):
lang = callback.data.split("_")[1] lang = callback.data.split("_")[1]
await db.update( await db.update(
'UPDATE users SET lang = $1 WHERE user_id = $2', "UPDATE users SET lang = $1 WHERE user_id = $2",
lang, lang,
callback.from_user.id, callback.from_user.id,
) )
@ -213,12 +190,8 @@ async def language_selected(callback: CallbackQuery):
back_callback="back_to_settings", back_callback="back_to_settings",
) )
await safe_edit_message_text( await safe_edit_message_text(callback, locale["choose_language"], keyboard)
callback, locale["choose_language"], keyboard await callback.answer(locale["language_set"].format(lang=lang))
)
await callback.answer(
locale["language_set"].format(lang=lang)
)
@router.callback_query(lambda c: c.data == "setting_backend") @router.callback_query(lambda c: c.data == "setting_backend")
@ -226,10 +199,10 @@ async def show_backend_settings(callback: CallbackQuery):
locale = await get_user_locale(callback.from_user.id) locale = await get_user_locale(callback.from_user.id)
data = await db.fetch( data = await db.fetch(
'SELECT chart_backend, lang FROM users WHERE user_id = $1', "SELECT chart_backend, lang FROM users WHERE user_id = $1",
callback.from_user.id callback.from_user.id,
) )
current_backend = data['chart_backend'] current_backend = data["chart_backend"]
backend_label = locale.get(current_backend, current_backend) backend_label = locale.get(current_backend, current_backend)
keyboard = build_options_keyboard( keyboard = build_options_keyboard(
@ -241,9 +214,7 @@ async def show_backend_settings(callback: CallbackQuery):
) )
await safe_edit_message_text( await safe_edit_message_text(
callback, callback, f"{locale['choose_chart_backend']}", keyboard
f"{locale['choose_chart_backend']}",
keyboard
) )
@ -252,8 +223,9 @@ async def set_backend(callback: CallbackQuery):
backend = callback.data.split("_")[1] backend = callback.data.split("_")[1]
await db.update( await db.update(
'UPDATE users SET chart_backend = $1 WHERE user_id = $2', "UPDATE users SET chart_backend = $1 WHERE user_id = $2",
backend, callback.from_user.id backend,
callback.from_user.id,
) )
locale = await get_user_locale(callback.from_user.id) locale = await get_user_locale(callback.from_user.id)
@ -266,11 +238,7 @@ async def set_backend(callback: CallbackQuery):
back_callback="back_to_settings", back_callback="back_to_settings",
) )
await safe_edit_message_text( await safe_edit_message_text(callback, locale["choose_chart_backend"], keyboard)
callback,
locale["choose_chart_backend"],
keyboard
)
@router.callback_query(lambda c: c.data == "back_to_settings") @router.callback_query(lambda c: c.data == "back_to_settings")
@ -304,7 +272,7 @@ async def back_to_settings(callback: CallbackQuery):
@router.callback_query(lambda c: c.data == "setting_chart") @router.callback_query(lambda c: c.data == "setting_chart")
async def show_chart_settings(callback: CallbackQuery): async def show_chart_settings(callback: CallbackQuery):
data = await db.fetch( data = await db.fetch(
'SELECT * FROM users WHERE user_id = $1', "SELECT * FROM users WHERE user_id = $1",
callback.from_user.id, callback.from_user.id,
) )
lang = data.get("lang", "en") lang = data.get("lang", "en")
@ -313,9 +281,11 @@ async def show_chart_settings(callback: CallbackQuery):
chart_status = bool(data.get("chart", 1)) chart_status = bool(data.get("chart", 1))
period = data.get("chart_period") period = data.get("chart_period")
status_text = locale.get("enabled", "Enabled") \ status_text = (
if chart_status \ locale.get("enabled", "Enabled")
else locale.get("disabled", "Disabled") if chart_status
else locale.get("disabled", "Disabled")
)
period_text = locale.get(period, period) period_text = locale.get(period, period)
text = ( text = (
@ -332,8 +302,7 @@ async def show_chart_settings(callback: CallbackQuery):
@router.callback_query(lambda c: c.data == "chart_toggle") @router.callback_query(lambda c: c.data == "chart_toggle")
async def toggle_chart(callback: CallbackQuery): async def toggle_chart(callback: CallbackQuery):
data = await db.fetch( data = await db.fetch(
'SELECT chart, lang FROM users WHERE user_id = $1', "SELECT chart, lang FROM users WHERE user_id = $1", callback.from_user.id
callback.from_user.id
) )
lang = data.get("lang", "en") lang = data.get("lang", "en")
locale = i18n.get_locale(lang) locale = i18n.get_locale(lang)
@ -342,8 +311,9 @@ async def toggle_chart(callback: CallbackQuery):
new_status = not current_status new_status = not current_status
await db.update( await db.update(
'UPDATE users SET chart = $1 WHERE user_id = $2', "UPDATE users SET chart = $1 WHERE user_id = $2",
new_status, callback.from_user.id new_status,
callback.from_user.id,
) )
await show_chart_settings(callback) await show_chart_settings(callback)
@ -352,8 +322,7 @@ async def toggle_chart(callback: CallbackQuery):
@router.callback_query(lambda c: c.data == "chart_period") @router.callback_query(lambda c: c.data == "chart_period")
async def change_chart_period(callback: CallbackQuery): async def change_chart_period(callback: CallbackQuery):
data = await db.fetch( data = await db.fetch(
'SELECT chart_period, lang FROM users WHERE user_id = $1', "SELECT chart_period, lang FROM users WHERE user_id = $1", callback.from_user.id
callback.from_user.id
) )
lang = data.get("lang", "en") lang = data.get("lang", "en")
locale = i18n.get_locale(lang) locale = i18n.get_locale(lang)
@ -368,19 +337,16 @@ async def change_chart_period(callback: CallbackQuery):
back_callback="setting_chart", back_callback="setting_chart",
) )
await safe_edit_message_text( await safe_edit_message_text(callback, locale["choose_period"], keyboard)
callback,
locale["choose_period"],
keyboard
)
@router.callback_query(lambda c: c.data and c.data.startswith("period_")) @router.callback_query(lambda c: c.data and c.data.startswith("period_"))
async def set_chart_period(callback: CallbackQuery): async def set_chart_period(callback: CallbackQuery):
period = callback.data.split("_")[1] period = callback.data.split("_")[1]
await db.update( await db.update(
'UPDATE users SET chart_period = $1 WHERE user_id = $2', "UPDATE users SET chart_period = $1 WHERE user_id = $2",
period, callback.from_user.id period,
callback.from_user.id,
) )
locale = await get_user_locale(callback.from_user.id) locale = await get_user_locale(callback.from_user.id)
@ -393,8 +359,4 @@ async def set_chart_period(callback: CallbackQuery):
back_callback="setting_chart", back_callback="setting_chart",
) )
await safe_edit_message_text( await safe_edit_message_text(callback, locale["choose_period"], keyboard)
callback,
locale["choose_period"],
keyboard
)

View file

@ -1,26 +1,28 @@
from aiogram import types, Router
from aiogram.filters import CommandStart
import re import re
from aiogram import Router, types
from aiogram.filters import CommandStart
from bot import bot, db from bot import bot, db
from i18n.localization import I18n from i18n.localization import I18n
router = Router() router = Router()
i18n = I18n() i18n = I18n()
def escape_md_v2(text: str) -> str: def escape_md_v2(text: str) -> str:
return re.sub(r'([_*\[\]()~#+\-=|{}.!\\])', r'\\\1', text) return re.sub(r"([_*\[\]()~#+\-=|{}.!\\])", r"\\\1", text)
@router.message(CommandStart()) @router.message(CommandStart())
async def start(message: types.Message) -> None: async def start(message: types.Message) -> None:
get_bot = await bot.get_me() get_bot = await bot.get_me()
data = await db.fetch( data = await db.fetch(
'SELECT lang FROM users WHERE user_id = $1', "SELECT lang FROM users WHERE user_id = $1", message.from_user.id
message.from_user.id
) )
locale = i18n.get_locale(data.get('lang')) locale = i18n.get_locale(data.get("lang"))
raw_template = locale.get("start_message") raw_template = locale.get("start_message")
raw_text = raw_template.format(bot_username=get_bot.username) raw_text = raw_template.format(bot_username=get_bot.username)
@ -28,15 +30,14 @@ async def start(message: types.Message) -> None:
button_text = locale.get("source_code_button") button_text = locale.get("source_code_button")
keyboard = types.InlineKeyboardMarkup(inline_keyboard=[ keyboard = types.InlineKeyboardMarkup(
[types.InlineKeyboardButton( inline_keyboard=[
text=button_text, [
url="https://github.com/redume/shirino") types.InlineKeyboardButton(
text=button_text, url="https://github.com/redume/shirino"
)
]
] ]
])
await message.reply(
text,
parse_mode="MarkdownV2",
reply_markup=keyboard
) )
await message.reply(text, parse_mode="MarkdownV2", reply_markup=keyboard)

View file

@ -1,12 +1,13 @@
import json import json
from datetime import date, datetime from datetime import date, datetime
from pathlib import Path from pathlib import Path
from typing import Optional, List, Dict, Any from typing import Any, Dict, List, Optional
import aiosqlite import aiosqlite
import yaml import yaml
config = yaml.safe_load(open('../config.yaml', 'r', encoding='utf-8')) config = yaml.safe_load(open("../config.yaml", "r", encoding="utf-8"))
def custom_encoder(obj): def custom_encoder(obj):
""" """
@ -19,6 +20,7 @@ def custom_encoder(obj):
return obj.isoformat() return obj.isoformat()
raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable") raise TypeError(f"Object of type {obj.__class__.__name__} is not JSON serializable")
class Database: class Database:
""" """
Asynchronous SQLite database handler using aiosqlite. Asynchronous SQLite database handler using aiosqlite.
@ -65,16 +67,15 @@ class Database:
self.db_path = db_path self.db_path = db_path
self.conn: Optional[aiosqlite.Connection] = None self.conn: Optional[aiosqlite.Connection] = None
async def _create_table(self) -> None: async def _create_table(self) -> None:
""" """
Create table from SQL file using aiosqlite. Create table from SQL file using aiosqlite.
Reads SQL commands from 'schemas/data.sql' Reads SQL commands from 'schemas/data.sql'
and executes them as a script. and executes them as a script.
""" """
sql_file = Path(__file__).parent / "schemas" / "data.sql" sql_file = Path(__file__).parent / "schemas" / "data.sql"
sql = sql_file.read_text(encoding='utf-8') sql = sql_file.read_text(encoding="utf-8")
async with self.conn.execute("BEGIN"): async with self.conn.execute("BEGIN"):
await self.conn.executescript(sql) await self.conn.executescript(sql)
@ -139,12 +140,13 @@ class Database:
async with self.conn.execute(query, args) as cursor: async with self.conn.execute(query, args) as cursor:
rows = await cursor.fetchall() rows = await cursor.fetchall()
return json.loads( return (
json.dumps( json.loads(
[dict(row) for row in rows], json.dumps([dict(row) for row in rows], default=custom_encoder)
default=custom_encoder
) )
) if rows else [] if rows
else []
)
async def insert(self, query: str, *args) -> Dict[str, Any]: async def insert(self, query: str, *args) -> Dict[str, Any]:
""" """

View file

@ -9,14 +9,15 @@ import yaml
from utils.format_number import format_number from utils.format_number import format_number
config = yaml.safe_load(open('../config.yaml', 'r', encoding='utf-8')) config = yaml.safe_load(open("../config.yaml", "r", encoding="utf-8"))
class Converter: class Converter:
def __init__(self): def __init__(self):
self.amount: float = 1.0 self.amount: float = 1.0
self.conv_amount: float = 0.0 self.conv_amount: float = 0.0
self.from_currency: str = '' self.from_currency: str = ""
self.conv_currency: str = '' self.conv_currency: str = ""
async def convert(self) -> None: async def convert(self) -> None:
if not await self.kekkai(): if not await self.kekkai():
@ -27,68 +28,66 @@ class Converter:
async def get_lastdate(self) -> str: async def get_lastdate(self) -> str:
async with aiohttp.ClientSession( async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=3) timeout=aiohttp.ClientTimeout(total=3)
) as session: ) as session:
async with session.get( async with session.get(f"{config['kekkai_instance']}/api/metadata") as res:
f"{config['kekkai_instance']}/api/metadata"
) as res:
if not HTTPStatus(res.status).is_success: if not HTTPStatus(res.status).is_success:
return ( return (datetime.now() - timedelta(1)).strftime("%Y-%m-%d")
datetime.now() - timedelta(1)
).strftime('%Y-%m-%d')
data = await res.json() data = await res.json()
return data.get( return data.get(
'last_date', "last_date", (datetime.now() - timedelta(1)).strftime("%Y-%m-%d")
(datetime.now() - timedelta(1)).strftime('%Y-%m-%d') )
)
async def kekkai(self) -> bool: async def kekkai(self) -> bool:
date = await self.get_lastdate() date = await self.get_lastdate()
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3)) as session: async with aiohttp.ClientSession(
async with session.get(f'{config['kekkai_instance']}/api/getRate/', params={ timeout=aiohttp.ClientTimeout(total=3)
'from_currency': self.from_currency, ) as session:
'conv_currency': self.conv_currency, async with session.get(
'date': date, f"{config['kekkai_instance']}/api/getRate/",
'conv_amount': self.amount params={
}) as res: "from_currency": self.from_currency,
"conv_currency": self.conv_currency,
"date": date,
"conv_amount": self.amount,
},
) as res:
if not HTTPStatus(res.status).is_success: if not HTTPStatus(res.status).is_success:
return False return False
data = await res.json() data = await res.json()
self.conv_amount = data.get('conv_amount', 0.0) self.conv_amount = data.get("conv_amount", 0.0)
return True return True
async def ddg(self) -> None: async def ddg(self) -> None:
async with aiohttp.ClientSession( async with aiohttp.ClientSession(
timeout=aiohttp.ClientTimeout(total=3) timeout=aiohttp.ClientTimeout(total=3)
) as session: ) as session:
async with session.get( async with session.get(
'https://duckduckgo.com/js/spice/currency/' "https://duckduckgo.com/js/spice/currency/"
f'{self.amount}/{self.from_currency}/{self.conv_currency}' f"{self.amount}/{self.from_currency}/{self.conv_currency}"
) as res: ) as res:
data_text = await res.text() data_text = await res.text()
data = json.loads(re.findall(r'\(\s*(.*)\s*\);$', data_text)[0]) data = json.loads(re.findall(r"\(\s*(.*)\s*\);$", data_text)[0])
for key in ['terms', 'privacy', 'timestamp']: for key in ["terms", "privacy", "timestamp"]:
data.pop(key, None) data.pop(key, None)
if not data.get('to'): if not data.get("to"):
raise RuntimeError( raise RuntimeError(
'Failed to get the exchange rate from DuckDuckGo' "Failed to get the exchange rate from DuckDuckGo"
) )
conv = data.get('to')[0] conv = data.get("to")[0]
conv_amount = conv.get('mid') conv_amount = conv.get("mid")
if conv_amount is None: if conv_amount is None:
raise RuntimeError( raise RuntimeError("Error when converting currency via DuckDuckGo")
'Error when converting currency via DuckDuckGo'
)
self.conv_amount = float(conv_amount) self.conv_amount = float(conv_amount)

View file

@ -1,35 +1,34 @@
import time
from http import HTTPStatus from http import HTTPStatus
from urllib.parse import urlencode from urllib.parse import urlencode
import time
import yaml
import aiohttp import aiohttp
import yaml
config = yaml.safe_load(open("../config.yaml", "r", encoding="utf-8"))
config = yaml.safe_load(open('../config.yaml', 'r', encoding='utf-8'))
async def create_chart( async def create_chart(
from_currency: str, from_currency: str, conv_currency: str, period: str, backend: str
conv_currency: str, ) -> (str, None):
period: str,
backend: str) -> (str, None):
params = { params = {
'from_currency': from_currency, "from_currency": from_currency,
'conv_currency': conv_currency, "conv_currency": conv_currency,
'period': period, "period": period,
'backend': backend, "backend": backend,
'time_unique': time.time() "time_unique": time.time(),
} }
# Without time_unqiue Telegram does not want to load the image # Without time_unqiue Telegram does not want to load the image
# Probably because of some kind of caching, but it's disabled. # Probably because of some kind of caching, but it's disabled.
base_url = f'{config["kekkai_instance"]}/api/getChart/' base_url = f'{config["kekkai_instance"]}/api/getChart/'
query_string = urlencode(params) query_string = urlencode(params)
full_url = f'{base_url}?{query_string}' full_url = f"{base_url}?{query_string}"
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3)) as session: async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=3)) as session:
async with session.get(full_url) as res: async with session.get(full_url) as res:
if not HTTPStatus(res.status).is_success: if not HTTPStatus(res.status).is_success:
return None return None
return full_url return full_url

View file

@ -1,6 +1,7 @@
import yaml
from pathlib import Path from pathlib import Path
import yaml
class I18n: class I18n:
"""Load every YAML file in i18n/locales and let you pull out a single-language dict.""" """Load every YAML file in i18n/locales and let you pull out a single-language dict."""
@ -21,7 +22,9 @@ class I18n:
def get_locale(self, lang: str | None = None) -> dict: def get_locale(self, lang: str | None = None) -> dict:
"""Return the whole dictionary for one language (fallback → default_lang).""" """Return the whole dictionary for one language (fallback → default_lang)."""
lang = (lang or self.default_lang).lower()[:2] lang = (lang or self.default_lang).lower()[:2]
lang_dict = self.translations.get(lang, self.translations.get(self.default_lang, {})) lang_dict = self.translations.get(
lang, self.translations.get(self.default_lang, {})
)
fallback_dict = self.translations.get(self.default_lang, {}) fallback_dict = self.translations.get(self.default_lang, {})
merged_dict = { merged_dict = {
@ -29,4 +32,4 @@ class I18n:
for key in set(lang_dict) | set(fallback_dict) for key in set(lang_dict) | set(fallback_dict)
} }
return merged_dict return merged_dict

28
main.py
View file

@ -1,23 +1,23 @@
import yaml import yaml
from aiogram import Dispatcher
from aiogram.webhook.aiohttp_server import (SimpleRequestHandler,
setup_application)
from aiohttp import web from aiohttp import web
from aiogram import Dispatcher
from aiogram.webhook.aiohttp_server import SimpleRequestHandler, setup_application
from commands import currency, start, settings
from bot import bot, db from bot import bot, db
from commands import currency, settings, start
config = yaml.safe_load(open("../config.yaml", "r", encoding="utf-8"))
config = yaml.safe_load(open('../config.yaml', 'r', encoding='utf-8'))
async def on_startup(bot: bot) -> None: async def on_startup(bot: bot) -> None:
await db.connect() await db.connect()
await db._create_table() await db._create_table()
await bot.set_webhook( await bot.set_webhook(
f"{config['webhook']['base_url']}{config['webhook']['path']}", f"{config['webhook']['base_url']}{config['webhook']['path']}",
secret_token=config['webhook']['secret_token'], secret_token=config["webhook"]["secret_token"],
allowed_updates=['inline_query', 'message', 'callback_query'] allowed_updates=["inline_query", "message", "callback_query"],
) )
async def on_shutdown(): async def on_shutdown():
@ -36,16 +36,14 @@ def main() -> None:
app = web.Application() app = web.Application()
webhook_requests_handler = SimpleRequestHandler( webhook_requests_handler = SimpleRequestHandler(
dispatcher=dp, dispatcher=dp, bot=bot, secret_token=config["webhook"]["secret_token"]
bot=bot,
secret_token=config['webhook']['secret_token']
) )
webhook_requests_handler.register(app, path=config['webhook']['path']) webhook_requests_handler.register(app, path=config["webhook"]["path"])
setup_application(app, dp, bot=bot) setup_application(app, dp, bot=bot)
web.run_app(app, host='0.0.0.0', port=443) web.run_app(app, host="0.0.0.0", port=443)
if __name__ == '__main__': if __name__ == "__main__":
main() main()

View file

@ -1,22 +1,22 @@
from decimal import Decimal from decimal import Decimal
def format_number(number): def format_number(number):
number = Decimal(str(number)) number = Decimal(str(number))
integer_part = number // 1 integer_part = number // 1
fractional_part = number - integer_part fractional_part = number - integer_part
formatted_integer = '{:,.0f}'.format(integer_part).replace(',', ' ') formatted_integer = "{:,.0f}".format(integer_part).replace(",", " ")
if fractional_part == 0: if fractional_part == 0:
return formatted_integer return formatted_integer
fractional_str = f"{fractional_part:.30f}".split('.')[1] fractional_str = f"{fractional_part:.30f}".split(".")[1]
first_non_zero = next( first_non_zero = next(
(i for i, char in enumerate(fractional_str) if char != '0'), (i for i, char in enumerate(fractional_str) if char != "0"), len(fractional_str)
len(fractional_str) )
) result_fractional = fractional_str[: first_non_zero + 3]
result_fractional = fractional_str[:first_non_zero + 3] result_fractional = result_fractional.rstrip("0")
result_fractional = result_fractional.rstrip('0')
if not result_fractional: if not result_fractional:
return formatted_integer return formatted_integer

View file

@ -2,8 +2,10 @@ import re
from aiogram import types from aiogram import types
def esc_md(text: str) -> str: def esc_md(text: str) -> str:
return re.sub(r'([_*\[\]()~`>#+\-=|{}.!\\])', r'\\\1', text) return re.sub(r"([_*\[\]()~`>#+\-=|{}.!\\])", r"\\\1", text)
async def reply(result_id: str, args: list, query: types.InlineQuery) -> None: async def reply(result_id: str, args: list, query: types.InlineQuery) -> None:
if not args: if not args:
@ -24,7 +26,7 @@ async def reply(result_id: str, args: list, query: types.InlineQuery) -> None:
title=title, title=title,
description=description, description=description,
caption=esc_md(title), caption=esc_md(title),
parse_mode="MarkdownV2" parse_mode="MarkdownV2",
) )
else: else:
article = types.InlineQueryResultArticle( article = types.InlineQueryResultArticle(