Код перемещён в директорию src

This commit is contained in:
2023-06-05 15:51:59 +03:00
parent fdd40e53e1
commit 44c6262990
6 changed files with 1 additions and 1 deletions

74
src/extractor.py Normal file
View File

@@ -0,0 +1,74 @@
import logging
import json
import os
import aiohttp
import backoff
from environs import load_dotenv
from models import Abitr
from settings import ApiConfig
from state import State
load_dotenv()
class ApiExtractor:
def __init__(self):
self.api_config = ApiConfig()
self.headers = {"Authorization-Token": self.api_config.token}
self.fields = ["ID", "IBLOCK_ID", "NAME", "CODE", "SECTION_ID"]
@backoff.on_exception(backoff.expo, (aiohttp.ClientResponseError, aiohttp.ClientConnectorError), base=2, factor=1,
max_value=5, max_tries=None)
async def get_extract_data(self, state: State, iblock_id: int, fields: list = None, **kwargs) -> list[Abitr]:
data = {
'iblockId': iblock_id,
'order': json.dumps({'ID': 'ASC'}, ensure_ascii=False)
}
if fields is not None:
data['fields'] = json.dumps(self.fields + fields, ensure_ascii=False)
else:
data['fields'] = json.dumps(self.fields + ["PROPERTY_*"], ensure_ascii=False)
min_id = state.get_state(f'iblock_{iblock_id}') or os.getenv('LAST_ID') or 0
data['bitrFilter'] = json.dumps({'>ID': str(min_id)}, ensure_ascii=False)
abitrs = []
async with aiohttp.ClientSession() as session:
async with session.get(f'{self.api_config.host}/getnativeiblockelementslist', headers=self.headers,
data=data) as resp:
results = json.loads(await resp.text())
if len(results) > 0:
logging.info(f'Получение абитуриентов ID > {min_id}. Iblock - {iblock_id}')
for result in results:
data = {
'elementId': result,
}
async with session.get(f'{self.api_config.host}/getiblockelement/', headers=self.headers,
data=data) as resp:
res = json.loads(await resp.text())
res = res[str(result)]
class_attrs = vars(Abitr)['__annotations__']
try:
class_attrs.pop('ID')
except KeyError:
pass
abitr = {}
abitr['ID'] = res['ID']
for key, value in class_attrs.items():
if key in res:
abitr[key] = res[key]['VALUE']
if res['BENEFITS']['VALUE'] != '-' or res['BENEFITS2']['VALUE'] != '-':
if res['BENEFITS']['VALUE'] != '-':
abitr['BENEFITS'] = res['BENEFITS']['VALUE']
abitr['PRIVILEGES_QUESTION'] = 2
if res['BENEFITS2']['VALUE'] != '-':
abitr['BENEFITS'] = res['BENEFITS2']['VALUE']
abitr['PRIVILEGES_QUESTION'] = 1
abitrs.append(Abitr(**{key: value for key, value in abitr.items()}))
if len(abitrs) > 0:
logging.info(f'Получено абитуриентов - {len(abitrs)}')
return abitrs

42
src/loader.py Normal file
View File

@@ -0,0 +1,42 @@
import asyncio
import json
import logging
import time
import aiohttp
import backoff as backoff
from environs import load_dotenv
from models import Abitr
from settings import ApiConfig, EtlConfig
from state import State, BaseStorage
load_dotenv()
class EtlLoader:
def __init__(self):
self.etl = EtlConfig()
self.api_config = ApiConfig()
@backoff.on_exception(backoff.expo, (asyncio.TimeoutError, ), base=2, factor=1,
max_value=5, max_tries=None)
async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage):
async with aiohttp.ClientSession() as session:
logging.info(f"Информация об абитуриенте: {abitr.FIO}")
data_dict = {k: v for k, v in abitr.dict(exclude_none=True).items() if v != '' and v != '-'}
data_dict.pop('ID')
logging.info('Отправка данных к принимающей стороне... ')
data_dict = json.loads(json.dumps(data_dict, ensure_ascii=False))
async with session.post(
url=f"{self.etl.protocol}://{self.etl.host}:{self.etl.port}",
data=data_dict,
timeout=aiohttp.ClientTimeout(total=15)
) as response:
pass
state.set_state(f'iblock_{iblock_id}', abitr.ID)
storage.save_state(state.local_state)
logging.info('Данные успешно доставлены!')
await asyncio.sleep(1)

33
src/main.py Normal file
View File

@@ -0,0 +1,33 @@
import logging
import asyncio
from environs import load_dotenv
from extractor import ApiExtractor
from loader import EtlLoader
from settings import ApiConfig
from state import JsonFileStorage, State
load_dotenv()
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(funcName)s - %(message)s')
async def main():
extractor = ApiExtractor()
storage = JsonFileStorage()
api = ApiConfig()
state = State(storage=storage)
loader = EtlLoader()
logging.info('Обработчик готов к работе.')
while True:
for iblock_id in api.iblocks:
abitrs = await extractor.get_extract_data(iblock_id=int(iblock_id), state=state)
for abitr in abitrs:
await loader.load_data(state=state, abitr=abitr, storage=storage, iblock_id=int(iblock_id))
await asyncio.sleep(10)
if __name__ == '__main__':
asyncio.run(main())

5
src/requirements.txt Normal file
View File

@@ -0,0 +1,5 @@
pydantic==1.10.7
environs==9.5.0
requests==2.30.0
aiohttp~=3.8.4
backoff~=2.2.1

20
src/settings.py Normal file
View File

@@ -0,0 +1,20 @@
from dataclasses import dataclass, field
import os
from environs import load_dotenv
from typing import List
load_dotenv()
@dataclass
class ApiConfig:
host: str = os.getenv('API_HOST')
token: str = os.getenv('API_TOKEN')
iblocks: List[str] = field(default_factory=lambda: os.getenv('IBLOCKS').split(','))
@dataclass
class EtlConfig:
host: str = os.environ.get('DESTINATION_HOST')
port: int = int(os.environ.get('DESTINATION_PORT'))
protocol: str = os.environ.get('DESTINATION_PROTOCOL')

65
src/state.py Normal file
View File

@@ -0,0 +1,65 @@
import abc
from typing import Any, Dict
import json
import os
class BaseStorage(abc.ABC):
"""Абстрактное хранилище состояния.
Позволяет сохранять и получать состояние.
"""
@abc.abstractmethod
def save_state(self, state: Dict[str, Any]) -> None:
"""Сохранить состояние в хранилище."""
raise NotImplementedError
@abc.abstractmethod
def retrieve_state(self) -> Dict[str, Any]:
"""Получить состояние из хранилища."""
raise NotImplementedError
class JsonFileStorage(BaseStorage):
"""Реализация хранилища, использующего локальный файл.
Формат хранения: JSON
"""
def __init__(self, file_path: str = 'state.json') -> None:
self.file_path = file_path
try:
with open(file_path, 'r', encoding='UTF-8') as f:
self.local_storage = json.load(f)
except FileNotFoundError:
self.local_storage = {}
def save_state(self, state: Dict[str, Any]) -> None:
"""Сохранить состояние в хранилище."""
with open(self.file_path, 'w+', encoding='UTF-8') as f:
json.dump(state, f, ensure_ascii=False)
def retrieve_state(self) -> Dict[str, Any]:
"""Получить состояние из хранилища."""
if not os.path.exists(self.file_path):
return {}
with open(self.file_path, 'r') as file:
return json.load(file)
class State:
"""Класс для работы с состояниями."""
def __init__(self, storage: BaseStorage) -> None:
self.storage = storage
self.local_state = storage.retrieve_state()
def set_state(self, key: str, value: Any) -> None:
"""Установить состояние для определённого ключа."""
self.local_state[key] = value
def get_state(self, key: str) -> Any:
"""Получить состояние по определённому ключу."""
return self.local_state.get(key)