Shirino/main.py

249 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
import json
import hashlib
import logging
import string
from typing import Any, Dict, List, Optional
import requests
from pydantic import BaseSettings
from aiogram import Bot, types # type: ignore
from aiogram.dispatcher import Dispatcher # type: ignore
from aiogram.utils import executor # type: ignore
from aiogram.types import InlineQuery # type: ignore
from aiogram.types import InlineQueryResultArticle
from aiogram.types import InputTextMessageContent
# Constants
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate'
# ---
# Config from .env
class Settings(BaseSettings):
debug: bool
timeout: int = 2
ndigits: int = 3
coinapi_keys: List[str]
telegram_token: str
class Config:
env_file = '.env'
env_file_encoding = 'utf-8'
settings = Settings() # type: ignore
# ---
# Logging
log = logging.getLogger('shirino')
handler = logging.StreamHandler()
fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handler.setFormatter(fmt)
log.addHandler(handler)
if settings.debug:
handler.setLevel(logging.DEBUG)
log.setLevel(logging.DEBUG)
# ---
coinapi_len = len(settings.coinapi_keys)
coinapi_active = [0] # API key index
bot = Bot(token=settings.telegram_token)
dp = Dispatcher(bot)
class CurrencyConverter:
def __init__(self) -> None:
self.amount: float = 1.0
self.conv_amount: float = 0.0
self.from_currency = ''
self.conv_currency = ''
def convert(self) -> None:
"""Currency conversion"""
if not self.ddgapi():
self.coinapi()
str_amount = f'{self.conv_amount}'
point = str_amount.find(".")
after_point = str_amount[point + 1:]
fnz = min( # index of first non-zero digit
(
after_point.index(i)
for i in string.digits[1:]
if i in after_point
),
default=-1,
)
if fnz == -1:
# it is an integer like 81.0
return
# how many digits should be after the point:
# ndigits (3 by default) after first non-zero
ndigits = fnz + settings.ndigits
self.conv_amount = round(self.conv_amount, ndigits)
def ddgapi(self) -> bool:
"""Get data from DuckDuckGo's currency API
Returns:
`False` if the currency does not exist,
`True` on successful conversion
"""
# API request
resp = requests.get(
(
f'{DDG_URL}/{self.amount}/{self.from_currency}'
f'/{self.conv_currency}'
),
timeout=settings.timeout,
)
log.debug(resp.text)
# Parsing JSON data
data: Dict[str, Any] = json.loads(
resp.text
.replace('ddg_spice_currency(', '')
.replace(');', '')
)
log.debug(data)
# If the currency does not exist
descr = data.get('headers', {}).get('description', '')
if descr.find('ERROR') != -1:
return False
# Otherwise
conv: Dict[str, str] = data.get('conversion', {})
conv_amount = conv.get('converted-amount')
if conv_amount is None:
raise RuntimeError('Ошибка при конвертации через DDG')
self.conv_amount = float(conv_amount)
log.debug(conv)
return True
def coinapi(self, depth: int = coinapi_len) -> None:
"""Get data from CoinAPI (for cryptocurrencies)
Args:
depth (int, optional): Counter protecting from infinite recursion
"""
if depth <= 0:
raise RecursionError('Рейтлимит на всех токенах')
resp = requests.get(
(
f'{COINAPI_URL}/{self.from_currency}'
f'/{self.conv_currency}'
),
headers={
'X-CoinAPI-Key': settings.coinapi_keys[coinapi_active[0]],
},
timeout=settings.timeout,
)
if resp.status_code == 429:
log.warning('CoinAPI ratelimited, rotating token')
rotate_token(settings.coinapi_keys, coinapi_active)
self.coinapi(depth - 1)
data: Dict[str, Any] = resp.json()
rate = data.get('rate')
if rate is None:
raise RuntimeError('Не удалось получить курс валюты от CoinAPI')
self.conv_amount = float(rate * self.amount)
def rotate_token(lst: List[str], active: List[int]) -> None:
"""Rotates API key to prevent ratelimits
Args:
lst (List[str]): Keys list
active (List[str]): Mutable object with current key index
"""
active[0] = (active[0] + 1) % len(lst)
@dp.inline_handler()
async def currency(inline_query: InlineQuery) -> None:
query = inline_query.query
article: List[Optional[InlineQueryResultArticle]] = [None]
text = query.split()
len_ = len(text)
result_id = hashlib.md5(query.encode()).hexdigest()
conv = CurrencyConverter()
try:
if len_ == 3:
conv.amount = float(text[0])
conv.from_currency = text[1].upper()
conv.conv_currency = text[2].upper()
conv.convert()
elif len_ == 2:
conv.from_currency = text[0].upper()
conv.conv_currency = text[1].upper()
conv.convert()
else:
raise ValueError('Надо 2 или 3 аргумента')
result = (
f'{conv.amount} {conv.from_currency} = '
f'{conv.conv_amount} {conv.conv_currency}'
)
except Exception as ex:
result = f'{type(ex).__name__}: {ex}'
article[0] = InlineQueryResultArticle(
id=result_id,
title=result,
input_message_content=InputTextMessageContent(
message_text=result,
),
)
await inline_query.answer(
article, # type: ignore
cache_time=1,
is_personal=True,
)
@dp.message_handler(commands=['start'])
async def start(message: types.Message):
await message.answer("Hi! Bot can show the exchange rate of crypto and fiat currency. "
"The bot is used through the inline commands "
"`@shirino_bot 12 usd rub` or `@shirino_bot usd rub`"
"\n\nThe bot is open source on [Github](https://github.com/redume/shirino)",
parse_mode="markdown")
executor.start_polling(dp, skip_updates=True)