Скрипт повторной отправки пользователя

This commit is contained in:
Григорич 2023-07-18 13:19:56 +03:00
parent e5794029ab
commit d82d80aced
5 changed files with 88 additions and 19 deletions

View File

@ -30,3 +30,13 @@
1. Выполнить п.1-3 из предыдущего списка 1. Выполнить п.1-3 из предыдущего списка
2. Выполнить команду `pip install -r requirements.txt` 2. Выполнить команду `pip install -r requirements.txt`
3. Запускать main.py из директории src 3. Запускать main.py из директории src
## Повторная отправка пользователя
Запустить `manage.py` из директории src c опцией `resend` и атрибутами `iblock-id` и `element-id`.
Подробее `manage.py resend --help`
### Примеры:
### Повторная отправка пользователя локально
`python manage.py resend --iblock-id 14 --element-id 2981`
### Повторная отправка пользователя в Docker
`docker exec etl python manage.py resend --iblock-id 14 --element-id 2981`

View File

@ -1,6 +1,9 @@
pydantic==1.10.7 aiogram~=2.25.1
environs==9.5.0
requests==2.30.0
aiohttp~=3.8.4 aiohttp~=3.8.4
backoff~=2.2.1 backoff~=2.2.1
aiogram~=2.25.1 environs==9.5.0
pydantic==1.10.7
requests==2.30.0
typer==0.9.0

View File

@ -42,7 +42,7 @@ class ApiExtractor:
aiohttp.ServerDisconnectedError), base=2, factor=1, aiohttp.ServerDisconnectedError), base=2, factor=1,
max_value=int(misc_settings.max_wait_size), max_tries=None) max_value=int(misc_settings.max_wait_size), max_tries=None)
async def get_extract_data(self, state: State, iblock_id: int, storage: BaseStorage, fields: list = None, async def get_extract_data(self, state: State, iblock_id: int, storage: BaseStorage, fields: list = None,
**kwargs) -> list[Abitr]: resend: bool = False, element_id: int = 0, **kwargs) -> list[Abitr]:
data = { data = {
'iblockId': iblock_id, 'iblockId': iblock_id,
@ -52,10 +52,13 @@ class ApiExtractor:
data['fields'] = json.dumps(self.fields + fields, ensure_ascii=False) data['fields'] = json.dumps(self.fields + fields, ensure_ascii=False)
else: else:
data['fields'] = json.dumps(self.fields + ["PROPERTY_*"], ensure_ascii=False) data['fields'] = json.dumps(self.fields + ["PROPERTY_*"], ensure_ascii=False)
min_id = state.get_state(f'iblock_{iblock_id}') or os.getenv('LAST_ID') or 0 if not resend:
if os.getenv('LAST_ID').isdigit(): min_id = state.get_state(f'iblock_{iblock_id}') or os.getenv('LAST_ID') or 0
if int(min_id) < int(os.getenv('LAST_ID')): if os.getenv('LAST_ID').isdigit():
min_id = os.getenv('LAST_ID') if int(min_id) < int(os.getenv('LAST_ID')):
min_id = os.getenv('LAST_ID')
else:
min_id = element_id
data['bitrFilter'] = json.dumps({'>ID': str(min_id)}, ensure_ascii=False) data['bitrFilter'] = json.dumps({'>ID': str(min_id)}, ensure_ascii=False)
abitrs = [] abitrs = []
@ -86,12 +89,14 @@ class ApiExtractor:
try: try:
abitrs.append(Abitr(**{key: value for key, value in abitr.items()})) abitrs.append(Abitr(**{key: value for key, value in abitr.items()}))
except pydantic.error_wrappers.ValidationError as e: except pydantic.error_wrappers.ValidationError as e:
logging.info(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}') if not resend:
tgbot = TgBot() logging.info(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}')
await tgbot.send_notify(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}') tgbot = TgBot()
state.set_state(f'iblock_{iblock_id}', abitr["ID"]) await tgbot.send_notify(f'Ошибка валидации ID {abitr["ID"]} - {str(e)}')
storage.save_state(state.local_state) state.set_state(f'iblock_{iblock_id}', abitr["ID"])
storage.save_state(state.local_state)
else:
raise ValueError(f'Данные абитуриента {abitr["ID"]} невалидны. {e}')
if len(abitrs) > 0: if len(abitrs) > 0:
logging.info(f'Получено абитуриентов - {len(abitrs)}') logging.info(f'Получено абитуриентов - {len(abitrs)}')

View File

@ -20,7 +20,7 @@ class EtlLoader:
self.etl = EtlConfig() self.etl = EtlConfig()
self.api_config = ApiConfig() self.api_config = ApiConfig()
async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage): async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage, resend: bool = False):
async with aiohttp.ClientSession() as session: async with aiohttp.ClientSession() as session:
logging.info(f"Информация об абитуриенте: {abitr.FIO}") logging.info(f"Информация об абитуриенте: {abitr.FIO}")
data_dict = {k: v for k, v in abitr.dict(exclude_none=True).items() if v != '' and v != '-'} data_dict = {k: v for k, v in abitr.dict(exclude_none=True).items() if v != '' and v != '-'}
@ -36,8 +36,9 @@ class EtlLoader:
logging.info('Данные успешно доставлены!') logging.info('Данные успешно доставлены!')
except (asyncio.TimeoutError, aiohttp.ClientConnectorError, aiohttp.ServerDisconnectedError): except (asyncio.TimeoutError, aiohttp.ClientConnectorError, aiohttp.ServerDisconnectedError):
logging.error('Данные доставлены с ошибкой') logging.error('Данные доставлены с ошибкой')
state.set_state(f'iblock_{iblock_id}', abitr.ID) if not resend:
storage.save_state(state.local_state) state.set_state(f'iblock_{iblock_id}', abitr.ID)
storage.save_state(state.local_state)
await asyncio.sleep(1) await asyncio.sleep(1)

50
src/manage.py Normal file
View File

@ -0,0 +1,50 @@
import asyncio
import typer
from extractor import ApiExtractor
from loader import EtlLoader
from state import JsonFileStorage, State
app = typer.Typer()
async def resend_user(iblock_id: int, element_id: int):
extractor = ApiExtractor()
storage = JsonFileStorage()
state = State(storage=storage)
loader = EtlLoader()
abitrs = await extractor.get_extract_data(
iblock_id=iblock_id,
element_id=element_id - 1,
state=state,
storage=storage,
resend=True
)
if len(abitrs) == 0:
raise ValueError('Не нашлось абитуриентов с заданным условием')
if element_id not in [abitr.ID for abitr in abitrs]:
raise ValueError('Не нашлось абитуриентов с заданным условием')
for abitr in abitrs:
if abitr.ID != element_id:
continue
await loader.load_data(state=state, abitr=abitr, storage=storage, iblock_id=int(iblock_id), resend=True)
@app.command()
def resend(
iblock_id: int = typer.Option(..., help="ID инфоблока с абитуриентами"),
element_id: int = typer.Option(..., help="ID абитуриента для переотправки")
):
try:
asyncio.run(resend_user(iblock_id=iblock_id, element_id=element_id))
except ValueError as e:
print(e)
@app.command()
def main():
pass
if __name__ == "__main__":
asyncio.run(app())