Compare commits
25 Commits
361afad6fe
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 4d53a73e3c | |||
| d82d80aced | |||
| e5794029ab | |||
| f51a3ca403 | |||
| 0dfe1d8b7e | |||
| 9121013eb8 | |||
| 76fafc90e1 | |||
|
|
8b55667c94 | ||
|
|
d0f18393ab | ||
|
|
c5be39f749 | ||
|
|
b3dd87811c | ||
| ab67698655 | |||
| 2c501cb59e | |||
| 80f6f04e38 | |||
| ee19e9be1e | |||
| d3432ebd25 | |||
| 81dc7a1527 | |||
| 7a10529009 | |||
| fadb275242 | |||
| b5c046ea25 | |||
| b7272a6e1e | |||
| 4fc9b0fc64 | |||
| ca8860b4c4 | |||
| 264c9ed484 | |||
| a5827f9168 |
10
.env.dist
10
.env.dist
@@ -4,5 +4,13 @@ IBLOCKS=1,2,3
|
||||
DESTINATION_HOST=127.0.0.1
|
||||
DESTINATION_PORT=8000
|
||||
DESTINATION_PROTOCOL=http
|
||||
LAST_ID=0
|
||||
SIGNED=ERIGJHEJKRGH
|
||||
|
||||
# ETL_PROCESS
|
||||
LAST_ID=0
|
||||
MAX_WAIT_SIZE=60
|
||||
|
||||
# Notify
|
||||
USE_NOTIFY=False
|
||||
BOT_TOKEN=2512356:SEFijdsfiojAEFDMSD
|
||||
CHAT_ID=53242346
|
||||
@@ -11,6 +11,4 @@ RUN python -m pip install --no-cache-dir --upgrade pip \
|
||||
|
||||
COPY /src .
|
||||
|
||||
COPY .env .
|
||||
|
||||
CMD ["python", "main.py"]
|
||||
25
README.md
25
README.md
@@ -8,7 +8,7 @@
|
||||
- Docker-compose
|
||||
- Git
|
||||
|
||||
### Шаги для запуска
|
||||
### Шаги для запуска в Docker
|
||||
1. Клонируйте репозиторий на вашу машину при помощи команды `git clone https://git.jsdio.ru/jsdio/ips_etl.git`.
|
||||
2. Переименуйте файл `.env.dist` в `.env`.
|
||||
3. Откройте файл `.env` и замените значения переменных на свои:
|
||||
@@ -18,5 +18,28 @@
|
||||
- `DESTINATION_HOST`: адрес хоста назначения.
|
||||
- `DESTINATION_PORT`: порт назначения.
|
||||
- `DESTINATION_PROTOCOL`: протокол назначения (http или https).
|
||||
- `SIGNED`: ключ подписи, должен совпадать на обеих сторонах.
|
||||
- `LAST_ID`: последний успешно обработанный ID.
|
||||
- `MAX_WAIT_SIZE`: максимальное время ожидания backoff.
|
||||
- `USE_NOTIFY`: уведомлять ли о ходе работы обработчика в ТГ, `True` либо `False`
|
||||
- `BOT_TOKEN`: токен Telegram-бота.
|
||||
- `CHAT_ID`: ID чата, куда отсылать уведомление.
|
||||
4. После настройки переменных, запустите проект при помощи docker-compose командой `docker-compose up -d`.
|
||||
|
||||
### Шаги для запуска локально
|
||||
1. Выполнить п.1-3 из предыдущего списка
|
||||
2. Выполнить команду `pip install -r requirements.txt`
|
||||
3. Запускать main.py из директории src
|
||||
|
||||
|
||||
## Повторная отправка пользователя
|
||||
Запустить `manage.py` из директории src с командой `resend` и параметрами `--iblock-id` и `--element-id`.
|
||||
Подробнее: `manage.py resend --help`
|
||||
### Примеры:
|
||||
### Повторная отправка пользователя локально
|
||||
`python manage.py resend --iblock-id 14 --element-id 2981`
|
||||
### Повторная отправка пользователя в Docker
|
||||
`docker exec etl python manage.py resend --iblock-id 14 --element-id 2981`
|
||||
|
||||
**Внимание!** Данные всех абитуриентов, следующих за исправляемым ID, должны быть предварительно исправлены
|
||||
либо ненужные абитуриенты должны быть отключены через административную панель.
|
||||
@@ -4,3 +4,5 @@ services:
|
||||
build: .
|
||||
container_name: etl
|
||||
restart: always
|
||||
env_file:
|
||||
- .env
|
||||
@@ -1,5 +1,9 @@
|
||||
pydantic==1.10.7
|
||||
environs==9.5.0
|
||||
requests==2.30.0
|
||||
aiogram~=2.25.1
|
||||
aiohttp~=3.8.4
|
||||
backoff~=2.2.1
|
||||
environs==9.5.0
|
||||
pydantic==1.10.7
|
||||
requests==2.30.0
|
||||
typer==0.9.0
|
||||
|
||||
|
||||
|
||||
@@ -4,25 +4,45 @@ import os
|
||||
|
||||
import aiohttp
|
||||
import backoff
|
||||
import pydantic.error_wrappers
|
||||
from environs import load_dotenv
|
||||
|
||||
from models import Abitr
|
||||
from settings import ApiConfig
|
||||
from state import State
|
||||
from settings import ApiConfig, EtlConfig, MiscSettings
|
||||
from state import State, BaseStorage
|
||||
from tg_bot import TgBot
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
etl_config = EtlConfig()
|
||||
misc_settings = MiscSettings()
|
||||
|
||||
|
||||
class ApiExtractor:
|
||||
|
||||
def __init__(self):
|
||||
self.api_config = ApiConfig()
|
||||
self.headers = {"Authorization-Token": self.api_config.token}
|
||||
self.headers = {
|
||||
"Authorization-Token": self.api_config.token,
|
||||
}
|
||||
self.fields = ["ID", "IBLOCK_ID", "NAME", "CODE", "SECTION_ID"]
|
||||
|
||||
async def __get_data(self, url, data):
    """Send a GET request to ``url`` and return the decoded JSON body.

    The request carries ``self.headers`` and ``data``. When the response
    body is not valid JSON, the raw body is logged as an error and an
    empty dict is returned instead.
    """
    async with aiohttp.ClientSession() as http:
        async with http.get(url=url, headers=self.headers, data=data) as resp:
            try:
                payload = json.loads(await resp.text())
            except json.JSONDecodeError:
                logging.error(f'Получены неверные данные - {await resp.text()}')
                payload = {}
            return payload
|
||||
|
||||
@backoff.on_exception(backoff.expo, (aiohttp.ClientResponseError, aiohttp.ClientConnectorError,
|
||||
aiohttp.ServerDisconnectedError), base=2, factor=1,
|
||||
max_value=5, max_tries=None)
|
||||
async def get_extract_data(self, state: State, iblock_id: int, fields: list = None, **kwargs) -> list[Abitr]:
|
||||
max_value=int(misc_settings.max_wait_size), max_tries=None)
|
||||
async def get_extract_data(self, state: State, iblock_id: int, storage: BaseStorage, fields: list = None,
|
||||
resend: bool = False, element_id: int = 0, **kwargs) -> list[Abitr]:
|
||||
|
||||
data = {
|
||||
'iblockId': iblock_id,
|
||||
@@ -32,25 +52,27 @@ class ApiExtractor:
|
||||
data['fields'] = json.dumps(self.fields + fields, ensure_ascii=False)
|
||||
else:
|
||||
data['fields'] = json.dumps(self.fields + ["PROPERTY_*"], ensure_ascii=False)
|
||||
|
||||
if not resend:
|
||||
min_id = state.get_state(f'iblock_{iblock_id}') or os.getenv('LAST_ID') or 0
|
||||
if os.getenv('LAST_ID').isdigit():
|
||||
if int(min_id) < int(os.getenv('LAST_ID')):
|
||||
min_id = os.getenv('LAST_ID')
|
||||
else:
|
||||
min_id = element_id
|
||||
data['bitrFilter'] = json.dumps({'>ID': str(min_id)}, ensure_ascii=False)
|
||||
abitrs = []
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(f'{self.api_config.host}/getnativeiblockelementslist', headers=self.headers,
|
||||
data=data) as resp:
|
||||
|
||||
results = json.loads(await resp.text())
|
||||
# Получаем всех новых абитуриентов
|
||||
results = await self.__get_data(f'{self.api_config.host}/getnativeiblockelementslist', data)
|
||||
if len(results) > 0:
|
||||
logging.info(f'Получение абитуриентов ID > {min_id}. Iblock - {iblock_id}')
|
||||
for result in results:
|
||||
data = {
|
||||
'elementId': result,
|
||||
}
|
||||
async with session.get(f'{self.api_config.host}/getiblockelement/', headers=self.headers,
|
||||
data=data) as resp:
|
||||
res = json.loads(await resp.text())
|
||||
|
||||
# Получаем информацию об конкретном абитуриенте
|
||||
res = await self.__get_data(f'{self.api_config.host}/getiblockelement', data)
|
||||
res = res[str(result)]
|
||||
class_attrs = vars(Abitr)['__annotations__']
|
||||
try:
|
||||
@@ -62,14 +84,20 @@ class ApiExtractor:
|
||||
for key, value in class_attrs.items():
|
||||
if key in res:
|
||||
abitr[key] = res[key]['VALUE']
|
||||
if res['BENEFITS']['VALUE'] != '-' or res['BENEFITS2']['VALUE'] != '-':
|
||||
if res['BENEFITS']['VALUE'] != '-':
|
||||
abitr['BENEFITS'] = res['BENEFITS']['VALUE']
|
||||
abitr['PRIVILEGES_QUESTION'] = 2
|
||||
if res['BENEFITS2']['VALUE'] != '-':
|
||||
abitr['BENEFITS'] = res['BENEFITS2']['VALUE']
|
||||
abitr['PRIVILEGES_QUESTION'] = 1
|
||||
if abitr['BENEFITS'] == '' or abitr['BENEFITS'] is None:
|
||||
abitr['BENEFITS'] = 8
|
||||
try:
|
||||
abitrs.append(Abitr(**{key: value for key, value in abitr.items()}))
|
||||
except pydantic.error_wrappers.ValidationError as e:
|
||||
if not resend:
|
||||
logging.info(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}')
|
||||
tgbot = TgBot()
|
||||
await tgbot.send_notify(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}')
|
||||
state.set_state(f'iblock_{iblock_id}', abitr["ID"])
|
||||
storage.save_state(state.local_state)
|
||||
else:
|
||||
raise ValueError(f'Данные абитуриента {abitr["ID"]} невалидны. {e}')
|
||||
|
||||
if len(abitrs) > 0:
|
||||
logging.info(f'Получено абитуриентов - {len(abitrs)}')
|
||||
return abitrs
|
||||
|
||||
@@ -1,17 +1,17 @@
|
||||
import asyncio
|
||||
import json
|
||||
import logging
|
||||
import time
|
||||
|
||||
import aiohttp
|
||||
import backoff as backoff
|
||||
from environs import load_dotenv
|
||||
|
||||
from models import Abitr
|
||||
from settings import ApiConfig, EtlConfig
|
||||
from settings import ApiConfig, EtlConfig, MiscSettings
|
||||
from state import State, BaseStorage
|
||||
|
||||
load_dotenv()
|
||||
etl_config = EtlConfig()
|
||||
misc_settings = MiscSettings()
|
||||
|
||||
|
||||
class EtlLoader:
|
||||
@@ -20,23 +20,25 @@ class EtlLoader:
|
||||
self.etl = EtlConfig()
|
||||
self.api_config = ApiConfig()
|
||||
|
||||
@backoff.on_exception(backoff.expo, (asyncio.TimeoutError, ), base=2, factor=1,
|
||||
max_value=5, max_tries=None)
|
||||
async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage):
|
||||
async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage, resend: bool = False):
|
||||
async with aiohttp.ClientSession() as session:
|
||||
logging.info(f"Информация об абитуриенте: {abitr.FIO}")
|
||||
data_dict = {k: v for k, v in abitr.dict(exclude_none=True).items() if v != '' and v != '-'}
|
||||
data_dict.pop('ID')
|
||||
logging.info('Отправка данных к принимающей стороне... ')
|
||||
data_dict = json.loads(json.dumps(data_dict, ensure_ascii=False))
|
||||
logging.info(f'Данные: \n{json.dumps(data_dict, ensure_ascii=False)}')
|
||||
try:
|
||||
async with session.post(
|
||||
url=f"{self.etl.protocol}://{self.etl.host}:{self.etl.port}",
|
||||
data=data_dict,
|
||||
timeout=aiohttp.ClientTimeout(total=15)
|
||||
timeout=aiohttp.ClientTimeout(total=int(misc_settings.max_wait_size))
|
||||
) as response:
|
||||
pass
|
||||
logging.info('Данные успешно доставлены!')
|
||||
except (asyncio.TimeoutError, aiohttp.ClientConnectorError, aiohttp.ServerDisconnectedError):
|
||||
logging.error('Данные доставлены с ошибкой')
|
||||
if not resend:
|
||||
state.set_state(f'iblock_{iblock_id}', abitr.ID)
|
||||
storage.save_state(state.local_state)
|
||||
logging.info('Данные успешно доставлены!')
|
||||
|
||||
await asyncio.sleep(1)
|
||||
|
||||
|
||||
11
src/main.py
11
src/main.py
@@ -6,8 +6,9 @@ from environs import load_dotenv
|
||||
|
||||
from extractor import ApiExtractor
|
||||
from loader import EtlLoader
|
||||
from settings import ApiConfig
|
||||
from settings import ApiConfig, MiscSettings
|
||||
from state import JsonFileStorage, State
|
||||
from tg_bot import TgBot
|
||||
|
||||
load_dotenv()
|
||||
|
||||
@@ -20,10 +21,16 @@ async def main():
|
||||
api = ApiConfig()
|
||||
state = State(storage=storage)
|
||||
loader = EtlLoader()
|
||||
misc_settings = MiscSettings()
|
||||
logging.info('Обработчик готов к работе.')
|
||||
|
||||
if misc_settings.use_notify:
|
||||
tgbot = TgBot()
|
||||
await tgbot.send_notify('Обработчик запущен.')
|
||||
|
||||
while True:
|
||||
for iblock_id in api.iblocks:
|
||||
abitrs = await extractor.get_extract_data(iblock_id=int(iblock_id), state=state)
|
||||
abitrs = await extractor.get_extract_data(iblock_id=int(iblock_id), state=state, storage=storage)
|
||||
for abitr in abitrs:
|
||||
await loader.load_data(state=state, abitr=abitr, storage=storage, iblock_id=int(iblock_id))
|
||||
await asyncio.sleep(10)
|
||||
|
||||
50
src/manage.py
Normal file
50
src/manage.py
Normal file
@@ -0,0 +1,50 @@
|
||||
import asyncio
|
||||
import typer
|
||||
|
||||
from extractor import ApiExtractor
|
||||
from loader import EtlLoader
|
||||
from state import JsonFileStorage, State
|
||||
|
||||
app = typer.Typer()
|
||||
|
||||
|
||||
async def resend_user(iblock_id: int, element_id: int):
    """Re-extract one element from an iblock and push it to the loader again.

    Args:
        iblock_id: ID of the iblock that holds the element.
        element_id: ID of the element to resend.

    Raises:
        ValueError: if the extractor returns nothing, or nothing whose ``ID``
            equals ``element_id``.
    """
    extractor = ApiExtractor()
    storage = JsonFileStorage()
    state = State(storage=storage)
    loader = EtlLoader()

    # In resend mode the extractor filters on ``>ID``, so the lower bound is
    # shifted down by one to make the requested element part of the result.
    fetched = await extractor.get_extract_data(
        iblock_id=iblock_id,
        element_id=element_id - 1,
        state=state,
        storage=storage,
        resend=True,
    )

    # Everything after the requested element also comes back; keep only the
    # requested one.
    targets = [candidate for candidate in fetched if candidate.ID == element_id]
    if not targets:
        raise ValueError('Не нашлось абитуриентов с заданным условием')

    for candidate in targets:
        await loader.load_data(
            state=state,
            abitr=candidate,
            storage=storage,
            iblock_id=int(iblock_id),
            resend=True,
        )
|
||||
|
||||
|
||||
@app.command()
def resend(
    iblock_id: int = typer.Option(..., help="ID инфоблока с абитуриентами"),
    element_id: int = typer.Option(..., help="ID абитуриента для переотправки"),
):
    # CLI command: resend a single element of an iblock to the destination.
    #
    # Parameters:
    #   iblock_id  - ID of the iblock that holds the element (required option).
    #   element_id - ID of the element to resend (required option).
    #
    # Exits with status 1 when resend_user() raises ValueError.
    #
    # Documented with comments rather than a docstring on purpose: Typer would
    # show a docstring as the command's --help text.
    try:
        asyncio.run(resend_user(iblock_id=iblock_id, element_id=element_id))
    except ValueError as error:
        # Report on stderr and exit non-zero so that shells, scripts and
        # `docker exec` can tell a failed resend from a successful one.
        typer.echo(str(error), err=True)
        raise typer.Exit(code=1) from error
|
||||
|
||||
|
||||
@app.command()
def main():
    # Placeholder command that does nothing.
    # NOTE(review): presumably registered only so the Typer app has more than
    # one command and `resend` has to be invoked by name - TODO confirm.
    # Kept docstring-free because Typer would show a docstring as help text.
    return None
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # A Typer app is a plain synchronous callable, not a coroutine, so it must
    # not be wrapped in asyncio.run(). The commands that need an event loop
    # start one themselves.
    app()
|
||||
@@ -47,7 +47,7 @@ class Abitr(BaseModel):
|
||||
PRIORUTY2: Optional[Any]
|
||||
PRIORUTY3: Optional[Any]
|
||||
PRIVILEGES_QUESTION: Optional[Any]
|
||||
BENEFITS: Optional[Any]
|
||||
BENEFITS: Optional[Any] = 8
|
||||
MOVA_ID: int
|
||||
MATH_EN_ID: int
|
||||
FIZ_ID: int
|
||||
|
||||
@@ -18,3 +18,8 @@ class EtlConfig:
|
||||
host: str = os.environ.get('DESTINATION_HOST')
|
||||
port: int = int(os.environ.get('DESTINATION_PORT'))
|
||||
protocol: str = os.environ.get('DESTINATION_PROTOCOL')
|
||||
|
||||
@dataclass
class MiscSettings:
    """Miscellaneous settings read from environment variables.

    The defaults are computed once, when this class body is executed, so the
    environment must already be populated at that point.
    """

    # Value of MAX_WAIT_SIZE (default 60). Converted here so the attribute
    # really is an int, as annotated, and not the raw environment string.
    max_wait_size: int = int(os.environ.get('MAX_WAIT_SIZE', 60))
    # True only when USE_NOTIFY is exactly the string 'True'.
    use_notify: bool = os.environ.get('USE_NOTIFY', 'False') == 'True'
|
||||
|
||||
18
src/tg_bot.py
Normal file
18
src/tg_bot.py
Normal file
@@ -0,0 +1,18 @@
|
||||
import os
|
||||
|
||||
from aiogram import Bot
|
||||
from environs import load_dotenv
|
||||
|
||||
|
||||
load_dotenv()
|
||||
|
||||
|
||||
class TgBot:
    """Small wrapper that sends text messages to one Telegram chat.

    The bot token and the target chat ID are read from the ``BOT_TOKEN`` and
    ``CHAT_ID`` environment variables when the object is created.
    """

    def __init__(self):
        environment = os.environ
        self.token = environment.get('BOT_TOKEN')
        self.chat_id = environment.get('CHAT_ID')
        # NOTE(review): every instance builds its own Bot and nothing here
        # closes it - confirm whether callers are expected to clean up.
        self.bot = Bot(token=self.token)

    async def send_notify(self, text):
        """Send ``text`` to the configured chat."""
        await self.bot.send_message(chat_id=self.chat_id, text=text)
|
||||
Reference in New Issue
Block a user