Compare commits

..

25 Commits

Author SHA1 Message Date
4d53a73e3c Update README.md 2023-07-18 13:31:34 +03:00
d82d80aced Скрипт повторной отправки пользователя 2023-07-18 13:19:56 +03:00
e5794029ab Игнор ошибки при валидации 2023-07-18 11:00:36 +03:00
f51a3ca403 Поправлен вывод ошибки 2023-07-05 10:40:30 +03:00
0dfe1d8b7e Фикс ошибки неправильной загрузки обработки данных из getnativeiblockelementslist 2023-07-04 15:10:15 +03:00
9121013eb8 Данные повторно отправляться не будут, если доставка была с ошибкой 2023-06-27 12:29:32 +03:00
76fafc90e1 Теперь по умолчанию поле со льготами отправляет значение 8 2023-06-26 11:48:51 +03:00
grucshetskyaleksei
8b55667c94 Добавил обработку ServerDisconnectedError в backoff 2023-06-22 02:48:20 +03:00
grucshetskyaleksei
d0f18393ab Убрал предварительное форматирование в json 2023-06-21 12:14:52 +03:00
grucshetskyaleksei
c5be39f749 Добавил отображение в json данных, которые отправляются принимающей стороне 2023-06-08 13:33:28 +03:00
grucshetskyaleksei
b3dd87811c Вынес определение TG-бота только в случае, если он используется для уведомлений 2023-06-08 13:32:29 +03:00
ab67698655 Реализовл подгрузку .env через docker-compose 2023-06-06 14:40:06 +03:00
2c501cb59e Убрал лишний print 2023-06-06 14:23:38 +03:00
80f6f04e38 Обновил Readme и env.dist для возможности отправки уведомлений 2023-06-06 14:22:37 +03:00
ee19e9be1e Добавил возможность получения уведомления о старте обработчика в ТГ 2023-06-06 14:16:32 +03:00
d3432ebd25 Вынес часть настроек в MiscSettings 2023-06-06 14:15:25 +03:00
81dc7a1527 Изменил(а) на 'README.md' 2023-06-06 10:09:04 +00:00
7a10529009 Изменил(а) на 'README.md' 2023-06-06 10:03:11 +00:00
fadb275242 Убрал ненужную обработку Benefits 2023-06-06 11:50:22 +03:00
b5c046ea25 Добавил исключение ClientConnectorError 2023-06-06 11:28:44 +03:00
b7272a6e1e Вывел получение данных в единую функцию 2023-06-06 11:28:20 +03:00
4fc9b0fc64 Изменил время ожидания aiohttp 2023-06-06 10:20:45 +03:00
ca8860b4c4 Изменил время ожидания aiohttp 2023-06-05 17:52:59 +03:00
264c9ed484 Изменил время ожидания backoff 2023-06-05 17:44:20 +03:00
a5827f9168 Изменил время ожидания backoff 2023-06-05 17:44:09 +03:00
12 changed files with 213 additions and 68 deletions

View File

@@ -4,5 +4,13 @@ IBLOCKS=1,2,3
DESTINATION_HOST=127.0.0.1 DESTINATION_HOST=127.0.0.1
DESTINATION_PORT=8000 DESTINATION_PORT=8000
DESTINATION_PROTOCOL=http DESTINATION_PROTOCOL=http
LAST_ID=0
SIGNED=ERIGJHEJKRGH SIGNED=ERIGJHEJKRGH
# ETL_PROCESS
LAST_ID=0
MAX_WAIT_SIZE=60
# Notify
USE_NOTIFY=False
BOT_TOKEN=2512356:SEFijdsfiojAEFDMSD
CHAT_ID=53242346

View File

@@ -11,6 +11,4 @@ RUN python -m pip install --no-cache-dir --upgrade pip \
COPY /src . COPY /src .
COPY .env .
CMD ["python", "main.py"] CMD ["python", "main.py"]

View File

@@ -8,7 +8,7 @@
- Docker-compose - Docker-compose
- Git - Git
### Шаги для запуска ### Шаги для запуска в Docker
1. Клонируйте репозиторий на вашу машину при помощи команды `git clone https://git.jsdio.ru/jsdio/ips_etl.git`. 1. Клонируйте репозиторий на вашу машину при помощи команды `git clone https://git.jsdio.ru/jsdio/ips_etl.git`.
2. Переименуйте файл `.env.dist` в `.env`. 2. Переименуйте файл `.env.dist` в `.env`.
3. Откройте файл `.env` и замените значения переменных на свои: 3. Откройте файл `.env` и замените значения переменных на свои:
@@ -18,5 +18,28 @@
- `DESTINATION_HOST`: адрес хоста назначения. - `DESTINATION_HOST`: адрес хоста назначения.
- `DESTINATION_PORT`: порт назначения. - `DESTINATION_PORT`: порт назначения.
- `DESTINATION_PROTOCOL`: протокол назначения (http или https). - `DESTINATION_PROTOCOL`: протокол назначения (http или https).
- `SIGNED`: ключ подписи, должен совпадать на обеих сторонах.
- `LAST_ID`: последний успешно обработанный ID. - `LAST_ID`: последний успешно обработанный ID.
- `MAX_WAIT_SIZE`: максимальное время ожидания backoff.
- `USE_NOTIFY`: уведомлять ли о ходе работы обработчика в ТГ, `True` либо `False`
- `BOT_TOKEN`: токен Telegram-бота.
- `CHAT_ID`: ID чата, куда отсылать уведомление.
4. После настройки переменных, запустите проект при помощи docker-compose командой `docker-compose up -d`. 4. После настройки переменных, запустите проект при помощи docker-compose командой `docker-compose up -d`.
### Шаги для запуска локально
1. Выполнить п.1-3 из предыдущего списка
2. Выполнить команду `pip install -r requirements.txt`
3. Запускать main.py из директории src
## Повторная отправка пользователя
Запустить `manage.py` из директории src c опцией `resend` и атрибутами `iblock-id` и `element-id`.
Подробее `manage.py resend --help`
### Примеры:
### Повторная отправка пользователя локально
`python manage.py resend --iblock-id 14 --element-id 2981`
### Повторная отправка пользователя в Docker
`docker exec etl python manage.py resend --iblock-id 14 --element-id 2981`
**Внимание!** Данные всех абитуриентов, следующих за исправляемым ID, должны быть предварительно исправлены
либо ненужные абитуриенты должны быть отключены через административную панель.

View File

@@ -4,3 +4,5 @@ services:
build: . build: .
container_name: etl container_name: etl
restart: always restart: always
env_file:
- .env

View File

@@ -1,5 +1,9 @@
pydantic==1.10.7 aiogram~=2.25.1
environs==9.5.0
requests==2.30.0
aiohttp~=3.8.4 aiohttp~=3.8.4
backoff~=2.2.1 backoff~=2.2.1
environs==9.5.0
pydantic==1.10.7
requests==2.30.0
typer==0.9.0

View File

@@ -4,25 +4,45 @@ import os
import aiohttp import aiohttp
import backoff import backoff
import pydantic.error_wrappers
from environs import load_dotenv from environs import load_dotenv
from models import Abitr from models import Abitr
from settings import ApiConfig from settings import ApiConfig, EtlConfig, MiscSettings
from state import State from state import State, BaseStorage
from tg_bot import TgBot
load_dotenv() load_dotenv()
etl_config = EtlConfig()
misc_settings = MiscSettings()
class ApiExtractor: class ApiExtractor:
def __init__(self): def __init__(self):
self.api_config = ApiConfig() self.api_config = ApiConfig()
self.headers = {"Authorization-Token": self.api_config.token} self.headers = {
"Authorization-Token": self.api_config.token,
}
self.fields = ["ID", "IBLOCK_ID", "NAME", "CODE", "SECTION_ID"] self.fields = ["ID", "IBLOCK_ID", "NAME", "CODE", "SECTION_ID"]
async def __get_data(self, url, data):
async with aiohttp.ClientSession() as session:
async with session.get(url=url, headers=self.headers,
data=data) as resp:
try:
return json.loads(await resp.text())
except json.JSONDecodeError:
logging.error(f'Получены неверные данные - {await resp.text()}')
return {}
@backoff.on_exception(backoff.expo, (aiohttp.ClientResponseError, aiohttp.ClientConnectorError, @backoff.on_exception(backoff.expo, (aiohttp.ClientResponseError, aiohttp.ClientConnectorError,
aiohttp.ServerDisconnectedError), base=2, factor=1, aiohttp.ServerDisconnectedError), base=2, factor=1,
max_value=5, max_tries=None) max_value=int(misc_settings.max_wait_size), max_tries=None)
async def get_extract_data(self, state: State, iblock_id: int, fields: list = None, **kwargs) -> list[Abitr]: async def get_extract_data(self, state: State, iblock_id: int, storage: BaseStorage, fields: list = None,
resend: bool = False, element_id: int = 0, **kwargs) -> list[Abitr]:
data = { data = {
'iblockId': iblock_id, 'iblockId': iblock_id,
@@ -32,25 +52,27 @@ class ApiExtractor:
data['fields'] = json.dumps(self.fields + fields, ensure_ascii=False) data['fields'] = json.dumps(self.fields + fields, ensure_ascii=False)
else: else:
data['fields'] = json.dumps(self.fields + ["PROPERTY_*"], ensure_ascii=False) data['fields'] = json.dumps(self.fields + ["PROPERTY_*"], ensure_ascii=False)
if not resend:
min_id = state.get_state(f'iblock_{iblock_id}') or os.getenv('LAST_ID') or 0 min_id = state.get_state(f'iblock_{iblock_id}') or os.getenv('LAST_ID') or 0
if os.getenv('LAST_ID').isdigit():
if int(min_id) < int(os.getenv('LAST_ID')):
min_id = os.getenv('LAST_ID')
else:
min_id = element_id
data['bitrFilter'] = json.dumps({'>ID': str(min_id)}, ensure_ascii=False) data['bitrFilter'] = json.dumps({'>ID': str(min_id)}, ensure_ascii=False)
abitrs = [] abitrs = []
async with aiohttp.ClientSession() as session: # Получаем всех новых абитуриентов
async with session.get(f'{self.api_config.host}/getnativeiblockelementslist', headers=self.headers, results = await self.__get_data(f'{self.api_config.host}/getnativeiblockelementslist', data)
data=data) as resp:
results = json.loads(await resp.text())
if len(results) > 0: if len(results) > 0:
logging.info(f'Получение абитуриентов ID > {min_id}. Iblock - {iblock_id}') logging.info(f'Получение абитуриентов ID > {min_id}. Iblock - {iblock_id}')
for result in results: for result in results:
data = { data = {
'elementId': result, 'elementId': result,
} }
async with session.get(f'{self.api_config.host}/getiblockelement/', headers=self.headers,
data=data) as resp: # Получаем информацию об конкретном абитуриенте
res = json.loads(await resp.text()) res = await self.__get_data(f'{self.api_config.host}/getiblockelement', data)
res = res[str(result)] res = res[str(result)]
class_attrs = vars(Abitr)['__annotations__'] class_attrs = vars(Abitr)['__annotations__']
try: try:
@@ -62,14 +84,20 @@ class ApiExtractor:
for key, value in class_attrs.items(): for key, value in class_attrs.items():
if key in res: if key in res:
abitr[key] = res[key]['VALUE'] abitr[key] = res[key]['VALUE']
if res['BENEFITS']['VALUE'] != '-' or res['BENEFITS2']['VALUE'] != '-': if abitr['BENEFITS'] == '' or abitr['BENEFITS'] is None:
if res['BENEFITS']['VALUE'] != '-': abitr['BENEFITS'] = 8
abitr['BENEFITS'] = res['BENEFITS']['VALUE'] try:
abitr['PRIVILEGES_QUESTION'] = 2
if res['BENEFITS2']['VALUE'] != '-':
abitr['BENEFITS'] = res['BENEFITS2']['VALUE']
abitr['PRIVILEGES_QUESTION'] = 1
abitrs.append(Abitr(**{key: value for key, value in abitr.items()})) abitrs.append(Abitr(**{key: value for key, value in abitr.items()}))
except pydantic.error_wrappers.ValidationError as e:
if not resend:
logging.info(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}')
tgbot = TgBot()
await tgbot.send_notify(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}')
state.set_state(f'iblock_{iblock_id}', abitr["ID"])
storage.save_state(state.local_state)
else:
raise ValueError(f'Данные абитуриента {abitr["ID"]} невалидны. {e}')
if len(abitrs) > 0: if len(abitrs) > 0:
logging.info(f'Получено абитуриентов - {len(abitrs)}') logging.info(f'Получено абитуриентов - {len(abitrs)}')
return abitrs return abitrs

View File

@@ -1,17 +1,17 @@
import asyncio import asyncio
import json import json
import logging import logging
import time
import aiohttp import aiohttp
import backoff as backoff
from environs import load_dotenv from environs import load_dotenv
from models import Abitr from models import Abitr
from settings import ApiConfig, EtlConfig from settings import ApiConfig, EtlConfig, MiscSettings
from state import State, BaseStorage from state import State, BaseStorage
load_dotenv() load_dotenv()
etl_config = EtlConfig()
misc_settings = MiscSettings()
class EtlLoader: class EtlLoader:
@@ -20,23 +20,25 @@ class EtlLoader:
self.etl = EtlConfig() self.etl = EtlConfig()
self.api_config = ApiConfig() self.api_config = ApiConfig()
@backoff.on_exception(backoff.expo, (asyncio.TimeoutError, ), base=2, factor=1, async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage, resend: bool = False):
max_value=5, max_tries=None)
async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage):
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
logging.info(f"Информация об абитуриенте: {abitr.FIO}") logging.info(f"Информация об абитуриенте: {abitr.FIO}")
data_dict = {k: v for k, v in abitr.dict(exclude_none=True).items() if v != '' and v != '-'} data_dict = {k: v for k, v in abitr.dict(exclude_none=True).items() if v != '' and v != '-'}
data_dict.pop('ID') data_dict.pop('ID')
logging.info('Отправка данных к принимающей стороне... ') logging.info('Отправка данных к принимающей стороне... ')
data_dict = json.loads(json.dumps(data_dict, ensure_ascii=False)) logging.info(f'Данные: \n{json.dumps(data_dict, ensure_ascii=False)}')
try:
async with session.post( async with session.post(
url=f"{self.etl.protocol}://{self.etl.host}:{self.etl.port}", url=f"{self.etl.protocol}://{self.etl.host}:{self.etl.port}",
data=data_dict, data=data_dict,
timeout=aiohttp.ClientTimeout(total=15) timeout=aiohttp.ClientTimeout(total=int(misc_settings.max_wait_size))
) as response: ) as response:
pass logging.info('Данные успешно доставлены!')
except (asyncio.TimeoutError, aiohttp.ClientConnectorError, aiohttp.ServerDisconnectedError):
logging.error('Данные доставлены с ошибкой')
if not resend:
state.set_state(f'iblock_{iblock_id}', abitr.ID) state.set_state(f'iblock_{iblock_id}', abitr.ID)
storage.save_state(state.local_state) storage.save_state(state.local_state)
logging.info('Данные успешно доставлены!')
await asyncio.sleep(1) await asyncio.sleep(1)

View File

@@ -6,8 +6,9 @@ from environs import load_dotenv
from extractor import ApiExtractor from extractor import ApiExtractor
from loader import EtlLoader from loader import EtlLoader
from settings import ApiConfig from settings import ApiConfig, MiscSettings
from state import JsonFileStorage, State from state import JsonFileStorage, State
from tg_bot import TgBot
load_dotenv() load_dotenv()
@@ -20,10 +21,16 @@ async def main():
api = ApiConfig() api = ApiConfig()
state = State(storage=storage) state = State(storage=storage)
loader = EtlLoader() loader = EtlLoader()
misc_settings = MiscSettings()
logging.info('Обработчик готов к работе.') logging.info('Обработчик готов к работе.')
if misc_settings.use_notify:
tgbot = TgBot()
await tgbot.send_notify('Обработчик запущен.')
while True: while True:
for iblock_id in api.iblocks: for iblock_id in api.iblocks:
abitrs = await extractor.get_extract_data(iblock_id=int(iblock_id), state=state) abitrs = await extractor.get_extract_data(iblock_id=int(iblock_id), state=state, storage=storage)
for abitr in abitrs: for abitr in abitrs:
await loader.load_data(state=state, abitr=abitr, storage=storage, iblock_id=int(iblock_id)) await loader.load_data(state=state, abitr=abitr, storage=storage, iblock_id=int(iblock_id))
await asyncio.sleep(10) await asyncio.sleep(10)

50
src/manage.py Normal file
View File

@@ -0,0 +1,50 @@
import asyncio
import typer
from extractor import ApiExtractor
from loader import EtlLoader
from state import JsonFileStorage, State
app = typer.Typer()
async def resend_user(iblock_id: int, element_id: int):
extractor = ApiExtractor()
storage = JsonFileStorage()
state = State(storage=storage)
loader = EtlLoader()
abitrs = await extractor.get_extract_data(
iblock_id=iblock_id,
element_id=element_id - 1,
state=state,
storage=storage,
resend=True
)
if len(abitrs) == 0:
raise ValueError('Не нашлось абитуриентов с заданным условием')
if element_id not in [abitr.ID for abitr in abitrs]:
raise ValueError('Не нашлось абитуриентов с заданным условием')
for abitr in abitrs:
if abitr.ID != element_id:
continue
await loader.load_data(state=state, abitr=abitr, storage=storage, iblock_id=int(iblock_id), resend=True)
@app.command()
def resend(
iblock_id: int = typer.Option(..., help="ID инфоблока с абитуриентами"),
element_id: int = typer.Option(..., help="ID абитуриента для переотправки")
):
try:
asyncio.run(resend_user(iblock_id=iblock_id, element_id=element_id))
except ValueError as e:
print(e)
@app.command()
def main():
pass
if __name__ == "__main__":
asyncio.run(app())

View File

@@ -47,7 +47,7 @@ class Abitr(BaseModel):
PRIORUTY2: Optional[Any] PRIORUTY2: Optional[Any]
PRIORUTY3: Optional[Any] PRIORUTY3: Optional[Any]
PRIVILEGES_QUESTION: Optional[Any] PRIVILEGES_QUESTION: Optional[Any]
BENEFITS: Optional[Any] BENEFITS: Optional[Any] = 8
MOVA_ID: int MOVA_ID: int
MATH_EN_ID: int MATH_EN_ID: int
FIZ_ID: int FIZ_ID: int

View File

@@ -18,3 +18,8 @@ class EtlConfig:
host: str = os.environ.get('DESTINATION_HOST') host: str = os.environ.get('DESTINATION_HOST')
port: int = int(os.environ.get('DESTINATION_PORT')) port: int = int(os.environ.get('DESTINATION_PORT'))
protocol: str = os.environ.get('DESTINATION_PROTOCOL') protocol: str = os.environ.get('DESTINATION_PROTOCOL')
@dataclass
class MiscSettings:
max_wait_size: int = os.environ.get('MAX_WAIT_SIZE', 60)
use_notify: bool = os.environ.get('USE_NOTIFY', False) == 'True'

18
src/tg_bot.py Normal file
View File

@@ -0,0 +1,18 @@
import os
from aiogram import Bot
from environs import load_dotenv
load_dotenv()
class TgBot:
def __init__(self):
self.token = os.getenv('BOT_TOKEN')
self.chat_id = os.getenv('CHAT_ID')
self.bot = Bot(token=self.token)
async def send_notify(self, text):
await self.bot.send_message(self.chat_id, text)