46 lines
1.8 KiB
Python
46 lines
1.8 KiB
Python
import asyncio
|
||
import json
|
||
import logging
|
||
import time
|
||
|
||
import aiohttp
|
||
import backoff as backoff
|
||
from environs import load_dotenv
|
||
|
||
from models import Abitr
|
||
from settings import ApiConfig, EtlConfig, MiscSettings
|
||
from state import State, BaseStorage
|
||
|
||
# Load variables from a .env file before the settings objects are created
# (presumably the settings classes read their values from the environment —
# TODO confirm against the `settings` module).
load_dotenv()
# Module-level settings instances, created once at import time.
etl_config = EtlConfig()
# `misc_settings.max_wait_size` is read by EtlLoader.load_data for both the
# retry back-off cap and the HTTP request timeout.
misc_settings = MiscSettings()
|
||
|
||
|
||
class EtlLoader:
    """Send applicant records to the receiving service over HTTP.

    Each record is POSTed to the endpoint described by ``EtlConfig``
    (``protocol``, ``host``, ``port``). After a successful delivery the
    record's ID is written to the state and the state is persisted.
    """

    def __init__(self) -> None:
        """Create the endpoint settings and the API settings objects."""
        self.etl = EtlConfig()
        # NOTE(review): api_config is not used by any method visible in this
        # class; kept because other code may read it from the instance.
        self.api_config = ApiConfig()

    @backoff.on_exception(
        backoff.expo,
        (asyncio.TimeoutError, aiohttp.ClientConnectorError),
        base=2,
        factor=1,
        max_value=int(misc_settings.max_wait_size),
        max_tries=None,
    )
    async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage) -> None:
        """POST one applicant record and persist the progress marker.

        The whole method is retried with exponential back-off (no limit on
        the number of attempts) on ``asyncio.TimeoutError`` and
        ``aiohttp.ClientConnectorError``.

        Args:
            state: State object; receives ``abitr.ID`` under the key
                ``iblock_<iblock_id>`` after a successful delivery.
            abitr: Applicant record to send. Fields that are ``None``,
                ``''`` or ``'-'`` and the ``ID`` field are not sent.
            iblock_id: Identifier used to build the state key.
            storage: Storage that persists ``state.local_state``.

        Raises:
            aiohttp.ClientResponseError: The receiver answered with an HTTP
                status >= 400. This error is not retried by the decorator,
                and the state is left untouched so the record is not marked
                as delivered.
        """
        logging.info("Информация об абитуриенте: %s", abitr.FIO)

        payload = {
            key: value
            for key, value in abitr.dict(exclude_none=True).items()
            if value not in ('', '-')
        }
        # ID may already have been filtered out above (None, '' or '-'),
        # so a missing key must not abort the delivery.
        payload.pop('ID', None)

        logging.info('Отправка данных к принимающей стороне... ')
        # Serialise once: the string is logged, and the round-trip through
        # JSON keeps the payload restricted to plain JSON-compatible values.
        payload_json = json.dumps(payload, ensure_ascii=False)
        payload = json.loads(payload_json)
        logging.info('Данные: \n%s', payload_json)

        endpoint = f"{self.etl.protocol}://{self.etl.host}:{self.etl.port}"
        request_timeout = aiohttp.ClientTimeout(total=int(misc_settings.max_wait_size))

        async with aiohttp.ClientSession() as session:
            async with session.post(url=endpoint, data=payload, timeout=request_timeout) as response:
                # Without this check a 4xx/5xx answer would still advance the
                # state and be reported as a successful delivery.
                response.raise_for_status()

        # Record progress only after the receiver accepted the request.
        state.set_state(f'iblock_{iblock_id}', abitr.ID)
        storage.save_state(state.local_state)
        logging.info('Данные успешно доставлены!')
        # Short pause between consecutive deliveries.
        await asyncio.sleep(1)
|
||
|