diff --git a/README.md b/README.md index 0b8e84b..c3f95be 100644 --- a/README.md +++ b/README.md @@ -29,4 +29,14 @@ ### Шаги для запуска локально 1. Выполнить п.1-3 из предыдущего списка 2. Выполнить команду `pip install -r requirements.txt` -3. Запускать main.py из директории src \ No newline at end of file +3. Запускать main.py из директории src + + +## Повторная отправка пользователя +Запустить `manage.py` из директории src c опцией `resend` и атрибутами `iblock-id` и `element-id`. +Подробее `manage.py resend --help` +### Примеры: +### Повторная отправка пользователя локально +`python manage.py resend --iblock-id 14 --element-id 2981` +### Повторная отправка пользователя в Docker +`docker exec etl python manage.py resend --iblock-id 14 --element-id 2981` diff --git a/requirements.txt b/requirements.txt index 6cc690d..8678369 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,9 @@ -pydantic==1.10.7 -environs==9.5.0 -requests==2.30.0 +aiogram~=2.25.1 aiohttp~=3.8.4 backoff~=2.2.1 -aiogram~=2.25.1 \ No newline at end of file +environs==9.5.0 +pydantic==1.10.7 +requests==2.30.0 +typer==0.9.0 + + diff --git a/src/extractor.py b/src/extractor.py index a8595da..2ab3e42 100644 --- a/src/extractor.py +++ b/src/extractor.py @@ -42,7 +42,7 @@ class ApiExtractor: aiohttp.ServerDisconnectedError), base=2, factor=1, max_value=int(misc_settings.max_wait_size), max_tries=None) async def get_extract_data(self, state: State, iblock_id: int, storage: BaseStorage, fields: list = None, - **kwargs) -> list[Abitr]: + resend: bool = False, element_id: int = 0, **kwargs) -> list[Abitr]: data = { 'iblockId': iblock_id, @@ -52,10 +52,13 @@ class ApiExtractor: data['fields'] = json.dumps(self.fields + fields, ensure_ascii=False) else: data['fields'] = json.dumps(self.fields + ["PROPERTY_*"], ensure_ascii=False) - min_id = state.get_state(f'iblock_{iblock_id}') or os.getenv('LAST_ID') or 0 - if os.getenv('LAST_ID').isdigit(): - if int(min_id) < int(os.getenv('LAST_ID')): - min_id = os.getenv('LAST_ID') + if not resend: + min_id = state.get_state(f'iblock_{iblock_id}') or os.getenv('LAST_ID') or 0 + if os.getenv('LAST_ID').isdigit(): + if int(min_id) < int(os.getenv('LAST_ID')): + min_id = os.getenv('LAST_ID') + else: + min_id = element_id data['bitrFilter'] = json.dumps({'>ID': str(min_id)}, ensure_ascii=False) abitrs = [] @@ -86,12 +89,14 @@ class ApiExtractor: try: abitrs.append(Abitr(**{key: value for key, value in abitr.items()})) except pydantic.error_wrappers.ValidationError as e: - logging.info(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}') - tgbot = TgBot() - await tgbot.send_notify(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}') - state.set_state(f'iblock_{iblock_id}', abitr["ID"]) - storage.save_state(state.local_state) - + if not resend: + logging.info(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}') + tgbot = TgBot() + await tgbot.send_notify(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}') + state.set_state(f'iblock_{iblock_id}', abitr["ID"]) + storage.save_state(state.local_state) + else: + raise ValueError(f'Данные абитуриента {abitr["ID"]} невалидны. {e}') if len(abitrs) > 0: logging.info(f'Получено абитуриентов - {len(abitrs)}') diff --git a/src/loader.py b/src/loader.py index 13a6a47..078be05 100644 --- a/src/loader.py +++ b/src/loader.py @@ -20,7 +20,7 @@ class EtlLoader: self.etl = EtlConfig() self.api_config = ApiConfig() - async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage): + async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage, resend: bool = False): async with aiohttp.ClientSession() as session: logging.info(f"Информация об абитуриенте: {abitr.FIO}") data_dict = {k: v for k, v in abitr.dict(exclude_none=True).items() if v != '' and v != '-'} @@ -36,8 +36,9 @@ class EtlLoader: logging.info('Данные успешно доставлены!') except (asyncio.TimeoutError, aiohttp.ClientConnectorError, aiohttp.ServerDisconnectedError): logging.error('Данные доставлены с ошибкой') - state.set_state(f'iblock_{iblock_id}', abitr.ID) - storage.save_state(state.local_state) + if not resend: + state.set_state(f'iblock_{iblock_id}', abitr.ID) + storage.save_state(state.local_state) await asyncio.sleep(1) diff --git a/src/manage.py b/src/manage.py new file mode 100644 index 0000000..e35f643 --- /dev/null +++ b/src/manage.py @@ -0,0 +1,50 @@ +import asyncio +import typer + +from extractor import ApiExtractor +from loader import EtlLoader +from state import JsonFileStorage, State + +app = typer.Typer() + + +async def resend_user(iblock_id: int, element_id: int): + extractor = ApiExtractor() + storage = JsonFileStorage() + state = State(storage=storage) + loader = EtlLoader() + abitrs = await extractor.get_extract_data( + iblock_id=iblock_id, + element_id=element_id - 1, + state=state, + storage=storage, + resend=True + ) + if len(abitrs) == 0: + raise ValueError('Не нашлось абитуриентов с заданным условием') + if element_id not in [abitr.ID for abitr in abitrs]: + raise ValueError('Не нашлось абитуриентов с заданным условием') + for abitr in abitrs: + if abitr.ID != element_id: + continue + await loader.load_data(state=state, abitr=abitr, storage=storage, iblock_id=int(iblock_id), resend=True) + + +@app.command() +def resend( + iblock_id: int = typer.Option(..., help="ID инфоблока с абитуриентами"), + element_id: int = typer.Option(..., help="ID абитуриента для переотправки") +): + try: + asyncio.run(resend_user(iblock_id=iblock_id, element_id=element_id)) + except ValueError as e: + print(e) + + +@app.command() +def main(): + pass + + +if __name__ == "__main__": + asyncio.run(app())