Compare commits

..

23 Commits

Author SHA1 Message Date
4d53a73e3c Update README.md 2023-07-18 13:31:34 +03:00
d82d80aced Скрипт повторной отправки пользователя 2023-07-18 13:19:56 +03:00
e5794029ab Игнор ошибки при валидации 2023-07-18 11:00:36 +03:00
f51a3ca403 Поправлен вывод ошибки 2023-07-05 10:40:30 +03:00
0dfe1d8b7e Фикс ошибки неправильной загрузки обработки данных из getnativeiblockelementslist 2023-07-04 15:10:15 +03:00
9121013eb8 Данные повторно отправляться не будут, если доставка была с ошибкой 2023-06-27 12:29:32 +03:00
76fafc90e1 Теперь по умолчанию поле со льготами отправляет значение 8 2023-06-26 11:48:51 +03:00
grucshetskyaleksei
8b55667c94 Добавил обработку ServerDisconnectedError в backoff 2023-06-22 02:48:20 +03:00
grucshetskyaleksei
d0f18393ab Убрал предварительное форматирование в json 2023-06-21 12:14:52 +03:00
grucshetskyaleksei
c5be39f749 Добавил отображение в json данных, которые отправляются принимающей стороне 2023-06-08 13:33:28 +03:00
grucshetskyaleksei
b3dd87811c Вынес определение TG-бота только в случае, если он используется для уведомлений 2023-06-08 13:32:29 +03:00
ab67698655 Реализовл подгрузку .env через docker-compose 2023-06-06 14:40:06 +03:00
2c501cb59e Убрал лишний print 2023-06-06 14:23:38 +03:00
80f6f04e38 Обновил Readme и env.dist для возможности отправки уведомлений 2023-06-06 14:22:37 +03:00
ee19e9be1e Добавил возможность получения уведомления о старте обработчика в ТГ 2023-06-06 14:16:32 +03:00
d3432ebd25 Вынес часть настроек в MiscSettings 2023-06-06 14:15:25 +03:00
81dc7a1527 Изменил(а) на 'README.md' 2023-06-06 10:09:04 +00:00
7a10529009 Изменил(а) на 'README.md' 2023-06-06 10:03:11 +00:00
fadb275242 Убрал ненужную обработку Benefits 2023-06-06 11:50:22 +03:00
b5c046ea25 Добавил исключение ClientConnectorError 2023-06-06 11:28:44 +03:00
b7272a6e1e Вывел получение данных в единую функцию 2023-06-06 11:28:20 +03:00
4fc9b0fc64 Изменил время ожидания aiohttp 2023-06-06 10:20:45 +03:00
ca8860b4c4 Изменил время ожидания aiohttp 2023-06-05 17:52:59 +03:00
12 changed files with 206 additions and 69 deletions

View File

@@ -6,7 +6,11 @@ DESTINATION_PORT=8000
DESTINATION_PROTOCOL=http
SIGNED=ERIGJHEJKRGH
# ETL_PROCESS
LAST_ID=0
MAX_WAIT_SIZE=60
# Notify
USE_NOTIFY=False
BOT_TOKEN=2512356:SEFijdsfiojAEFDMSD
CHAT_ID=53242346

View File

@@ -11,6 +11,4 @@ RUN python -m pip install --no-cache-dir --upgrade pip \
COPY /src .
COPY .env .
CMD ["python", "main.py"]

View File

@@ -8,7 +8,7 @@
- Docker-compose
- Git
### Шаги для запуска
### Шаги для запуска в Docker
1. Клонируйте репозиторий на вашу машину при помощи команды `git clone https://git.jsdio.ru/jsdio/ips_etl.git`.
2. Переименуйте файл `.env.dist` в `.env`.
3. Откройте файл `.env` и замените значения переменных на свои:
@@ -18,6 +18,28 @@
- `DESTINATION_HOST`: адрес хоста назначения.
- `DESTINATION_PORT`: порт назначения.
- `DESTINATION_PROTOCOL`: протокол назначения (http или https).
- `SIGNED`: ключ подписи, должен совпадать на обеих сторонах.
- `LAST_ID`: последний успешно обработанный ID.
- `MAX_WAIT_SIZE`: максимальное время ожидания backoff.
- `USE_NOTIFY`: уведомлять ли о ходе работы обработчика в ТГ, `True` либо `False`
- `BOT_TOKEN`: токен Telegram-бота.
- `CHAT_ID`: ID чата, куда отсылать уведомление.
4. После настройки переменных, запустите проект при помощи docker-compose командой `docker-compose up -d`.
### Шаги для запуска локально
1. Выполнить п.1-3 из предыдущего списка
2. Выполнить команду `pip install -r requirements.txt`
3. Запускать main.py из директории src
## Повторная отправка пользователя
Запустить `manage.py` из директории src c опцией `resend` и атрибутами `iblock-id` и `element-id`.
Подробее `manage.py resend --help`
### Примеры:
### Повторная отправка пользователя локально
`python manage.py resend --iblock-id 14 --element-id 2981`
### Повторная отправка пользователя в Docker
`docker exec etl python manage.py resend --iblock-id 14 --element-id 2981`
**Внимание!** Данные всех абитуриентов, следующих за исправляемым ID, должны быть предварительно исправлены
либо ненужные абитуриенты должны быть отключены через административную панель.

View File

@@ -4,3 +4,5 @@ services:
build: .
container_name: etl
restart: always
env_file:
- .env

View File

@@ -1,5 +1,9 @@
pydantic==1.10.7
environs==9.5.0
requests==2.30.0
aiogram~=2.25.1
aiohttp~=3.8.4
backoff~=2.2.1
environs==9.5.0
pydantic==1.10.7
requests==2.30.0
typer==0.9.0

View File

@@ -4,26 +4,45 @@ import os
import aiohttp
import backoff
import pydantic.error_wrappers
from environs import load_dotenv
from models import Abitr
from settings import ApiConfig, EtlConfig
from state import State
from settings import ApiConfig, EtlConfig, MiscSettings
from state import State, BaseStorage
from tg_bot import TgBot
load_dotenv()
etl_config = EtlConfig()
misc_settings = MiscSettings()
class ApiExtractor:
def __init__(self):
self.api_config = ApiConfig()
self.headers = {"Authorization-Token": self.api_config.token}
self.headers = {
"Authorization-Token": self.api_config.token,
}
self.fields = ["ID", "IBLOCK_ID", "NAME", "CODE", "SECTION_ID"]
async def __get_data(self, url, data):
async with aiohttp.ClientSession() as session:
async with session.get(url=url, headers=self.headers,
data=data) as resp:
try:
return json.loads(await resp.text())
except json.JSONDecodeError:
logging.error(f'Получены неверные данные - {await resp.text()}')
return {}
@backoff.on_exception(backoff.expo, (aiohttp.ClientResponseError, aiohttp.ClientConnectorError,
aiohttp.ServerDisconnectedError), base=2, factor=1,
max_value=etl_config.max_wait_size, max_tries=None)
async def get_extract_data(self, state: State, iblock_id: int, fields: list = None, **kwargs) -> list[Abitr]:
max_value=int(misc_settings.max_wait_size), max_tries=None)
async def get_extract_data(self, state: State, iblock_id: int, storage: BaseStorage, fields: list = None,
resend: bool = False, element_id: int = 0, **kwargs) -> list[Abitr]:
data = {
'iblockId': iblock_id,
@@ -33,25 +52,27 @@ class ApiExtractor:
data['fields'] = json.dumps(self.fields + fields, ensure_ascii=False)
else:
data['fields'] = json.dumps(self.fields + ["PROPERTY_*"], ensure_ascii=False)
if not resend:
min_id = state.get_state(f'iblock_{iblock_id}') or os.getenv('LAST_ID') or 0
if os.getenv('LAST_ID').isdigit():
if int(min_id) < int(os.getenv('LAST_ID')):
min_id = os.getenv('LAST_ID')
else:
min_id = element_id
data['bitrFilter'] = json.dumps({'>ID': str(min_id)}, ensure_ascii=False)
abitrs = []
async with aiohttp.ClientSession() as session:
async with session.get(f'{self.api_config.host}/getnativeiblockelementslist', headers=self.headers,
data=data) as resp:
results = json.loads(await resp.text())
# Получаем всех новых абитуриентов
results = await self.__get_data(f'{self.api_config.host}/getnativeiblockelementslist', data)
if len(results) > 0:
logging.info(f'Получение абитуриентов ID > {min_id}. Iblock - {iblock_id}')
for result in results:
data = {
'elementId': result,
}
async with session.get(f'{self.api_config.host}/getiblockelement/', headers=self.headers,
data=data) as resp:
res = json.loads(await resp.text())
# Получаем информацию об конкретном абитуриенте
res = await self.__get_data(f'{self.api_config.host}/getiblockelement', data)
res = res[str(result)]
class_attrs = vars(Abitr)['__annotations__']
try:
@@ -63,14 +84,20 @@ class ApiExtractor:
for key, value in class_attrs.items():
if key in res:
abitr[key] = res[key]['VALUE']
if res['BENEFITS']['VALUE'] != '-' or res['BENEFITS2']['VALUE'] != '-':
if res['BENEFITS']['VALUE'] != '-':
abitr['BENEFITS'] = res['BENEFITS']['VALUE']
abitr['PRIVILEGES_QUESTION'] = 2
if res['BENEFITS2']['VALUE'] != '-':
abitr['BENEFITS'] = res['BENEFITS2']['VALUE']
abitr['PRIVILEGES_QUESTION'] = 1
if abitr['BENEFITS'] == '' or abitr['BENEFITS'] is None:
abitr['BENEFITS'] = 8
try:
abitrs.append(Abitr(**{key: value for key, value in abitr.items()}))
except pydantic.error_wrappers.ValidationError as e:
if not resend:
logging.info(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}')
tgbot = TgBot()
await tgbot.send_notify(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}')
state.set_state(f'iblock_{iblock_id}', abitr["ID"])
storage.save_state(state.local_state)
else:
raise ValueError(f'Данные абитуриента {abitr["ID"]} невалидны. {e}')
if len(abitrs) > 0:
logging.info(f'Получено абитуриентов - {len(abitrs)}')
return abitrs

View File

@@ -1,18 +1,17 @@
import asyncio
import json
import logging
import time
import aiohttp
import backoff as backoff
from environs import load_dotenv
from models import Abitr
from settings import ApiConfig, EtlConfig
from settings import ApiConfig, EtlConfig, MiscSettings
from state import State, BaseStorage
load_dotenv()
etl_config = EtlConfig()
misc_settings = MiscSettings()
class EtlLoader:
@@ -21,23 +20,25 @@ class EtlLoader:
self.etl = EtlConfig()
self.api_config = ApiConfig()
@backoff.on_exception(backoff.expo, (asyncio.TimeoutError, ), base=2, factor=1,
max_value=etl_config.max_wait_size, max_tries=None)
async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage):
async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage, resend: bool = False):
async with aiohttp.ClientSession() as session:
logging.info(f"Информация об абитуриенте: {abitr.FIO}")
data_dict = {k: v for k, v in abitr.dict(exclude_none=True).items() if v != '' and v != '-'}
data_dict.pop('ID')
logging.info('Отправка данных к принимающей стороне... ')
data_dict = json.loads(json.dumps(data_dict, ensure_ascii=False))
logging.info(f'Данные: \n{json.dumps(data_dict, ensure_ascii=False)}')
try:
async with session.post(
url=f"{self.etl.protocol}://{self.etl.host}:{self.etl.port}",
data=data_dict,
timeout=aiohttp.ClientTimeout(total=15)
timeout=aiohttp.ClientTimeout(total=int(misc_settings.max_wait_size))
) as response:
pass
logging.info('Данные успешно доставлены!')
except (asyncio.TimeoutError, aiohttp.ClientConnectorError, aiohttp.ServerDisconnectedError):
logging.error('Данные доставлены с ошибкой')
if not resend:
state.set_state(f'iblock_{iblock_id}', abitr.ID)
storage.save_state(state.local_state)
logging.info('Данные успешно доставлены!')
await asyncio.sleep(1)

View File

@@ -6,8 +6,9 @@ from environs import load_dotenv
from extractor import ApiExtractor
from loader import EtlLoader
from settings import ApiConfig
from settings import ApiConfig, MiscSettings
from state import JsonFileStorage, State
from tg_bot import TgBot
load_dotenv()
@@ -20,10 +21,16 @@ async def main():
api = ApiConfig()
state = State(storage=storage)
loader = EtlLoader()
misc_settings = MiscSettings()
logging.info('Обработчик готов к работе.')
if misc_settings.use_notify:
tgbot = TgBot()
await tgbot.send_notify('Обработчик запущен.')
while True:
for iblock_id in api.iblocks:
abitrs = await extractor.get_extract_data(iblock_id=int(iblock_id), state=state)
abitrs = await extractor.get_extract_data(iblock_id=int(iblock_id), state=state, storage=storage)
for abitr in abitrs:
await loader.load_data(state=state, abitr=abitr, storage=storage, iblock_id=int(iblock_id))
await asyncio.sleep(10)

50
src/manage.py Normal file
View File

@@ -0,0 +1,50 @@
import asyncio
import typer
from extractor import ApiExtractor
from loader import EtlLoader
from state import JsonFileStorage, State
app = typer.Typer()
async def resend_user(iblock_id: int, element_id: int):
extractor = ApiExtractor()
storage = JsonFileStorage()
state = State(storage=storage)
loader = EtlLoader()
abitrs = await extractor.get_extract_data(
iblock_id=iblock_id,
element_id=element_id - 1,
state=state,
storage=storage,
resend=True
)
if len(abitrs) == 0:
raise ValueError('Не нашлось абитуриентов с заданным условием')
if element_id not in [abitr.ID for abitr in abitrs]:
raise ValueError('Не нашлось абитуриентов с заданным условием')
for abitr in abitrs:
if abitr.ID != element_id:
continue
await loader.load_data(state=state, abitr=abitr, storage=storage, iblock_id=int(iblock_id), resend=True)
@app.command()
def resend(
iblock_id: int = typer.Option(..., help="ID инфоблока с абитуриентами"),
element_id: int = typer.Option(..., help="ID абитуриента для переотправки")
):
try:
asyncio.run(resend_user(iblock_id=iblock_id, element_id=element_id))
except ValueError as e:
print(e)
@app.command()
def main():
pass
if __name__ == "__main__":
asyncio.run(app())

View File

@@ -47,7 +47,7 @@ class Abitr(BaseModel):
PRIORUTY2: Optional[Any]
PRIORUTY3: Optional[Any]
PRIVILEGES_QUESTION: Optional[Any]
BENEFITS: Optional[Any]
BENEFITS: Optional[Any] = 8
MOVA_ID: int
MATH_EN_ID: int
FIZ_ID: int

View File

@@ -18,4 +18,8 @@ class EtlConfig:
host: str = os.environ.get('DESTINATION_HOST')
port: int = int(os.environ.get('DESTINATION_PORT'))
protocol: str = os.environ.get('DESTINATION_PROTOCOL')
@dataclass
class MiscSettings:
max_wait_size: int = os.environ.get('MAX_WAIT_SIZE', 60)
use_notify: bool = os.environ.get('USE_NOTIFY', False) == 'True'

18
src/tg_bot.py Normal file
View File

@@ -0,0 +1,18 @@
import os
from aiogram import Bot
from environs import load_dotenv
load_dotenv()
class TgBot:
def __init__(self):
self.token = os.getenv('BOT_TOKEN')
self.chat_id = os.getenv('CHAT_ID')
self.bot = Bot(token=self.token)
async def send_notify(self, text):
await self.bot.send_message(self.chat_id, text)