From 980eea316b190207ddb8de37c72bfe3f453080f5 Mon Sep 17 00:00:00 2001 From: jsdio Date: Mon, 5 Jun 2023 14:32:09 +0300 Subject: [PATCH] =?UTF-8?q?=D0=9D=D0=B0=D0=BF=D0=B8=D1=81=D0=B0=D0=BD=20?= =?UTF-8?q?=D0=BA=D0=BB=D0=B0=D1=81=D1=81=20=D0=B7=D0=B0=D0=BF=D0=B8=D1=81?= =?UTF-8?q?=D0=B8=20=D0=B4=D0=B0=D0=BD=D0=BD=D1=8B=D1=85=20=D0=B2=20=D0=BE?= =?UTF-8?q?=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82=D1=87=D0=B8=D0=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- loader.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 loader.py diff --git a/loader.py b/loader.py new file mode 100644 index 0000000..0db2232 --- /dev/null +++ b/loader.py @@ -0,0 +1,42 @@ +import asyncio +import json +import logging +import time + +import aiohttp +import backoff as backoff +from environs import load_dotenv + +from models import Abitr +from settings import ApiConfig, EtlConfig +from state import State, BaseStorage + +load_dotenv() + + +class EtlLoader: + + def __init__(self): + self.etl = EtlConfig() + self.api_config = ApiConfig() + + @backoff.on_exception(backoff.expo, (asyncio.TimeoutError, ), base=2, factor=1, + max_value=5, max_tries=None) + async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage): + async with aiohttp.ClientSession() as session: + logging.info(f"Информация об абитуриенте: {abitr.FIO}") + data_dict = {k: v for k, v in abitr.dict(exclude_none=True).items() if v != '' and v != '-'} + data_dict.pop('ID') + logging.info('Отправка данных к принимающей стороне... ') + data_dict = json.loads(json.dumps(data_dict, ensure_ascii=False)) + async with session.post( + url=f"{self.etl.protocol}://{self.etl.host}:{self.etl.port}", + data=data_dict, + timeout=aiohttp.ClientTimeout(total=15) + ) as response: + pass + state.set_state(f'iblock_{iblock_id}', abitr.ID) + storage.save_state(state.local_state) + logging.info('Данные успешно доставлены!') + await asyncio.sleep(1) +