Compare commits
2 Commits
361afad6fe
...
264c9ed484
Author | SHA1 | Date | |
---|---|---|---|
264c9ed484 | |||
a5827f9168 |
|
@ -4,5 +4,9 @@ IBLOCKS=1,2,3
|
||||||
DESTINATION_HOST=127.0.0.1
|
DESTINATION_HOST=127.0.0.1
|
||||||
DESTINATION_PORT=8000
|
DESTINATION_PORT=8000
|
||||||
DESTINATION_PROTOCOL=http
|
DESTINATION_PROTOCOL=http
|
||||||
LAST_ID=0
|
|
||||||
SIGNED=ERIGJHEJKRGH
|
SIGNED=ERIGJHEJKRGH
|
||||||
|
|
||||||
|
|
||||||
|
# ETL_PROCESS
|
||||||
|
LAST_ID=0
|
||||||
|
MAX_WAIT_SIZE=60
|
|
@ -19,4 +19,5 @@
|
||||||
- `DESTINATION_PORT`: порт назначения.
|
- `DESTINATION_PORT`: порт назначения.
|
||||||
- `DESTINATION_PROTOCOL`: протокол назначения (http или https).
|
- `DESTINATION_PROTOCOL`: протокол назначения (http или https).
|
||||||
- `LAST_ID`: последний успешно обработанный ID.
|
- `LAST_ID`: последний успешно обработанный ID.
|
||||||
|
- `MAX_WAIT_SIZE`: максимальное время ожидания backoff.
|
||||||
4. После настройки переменных, запустите проект при помощи docker-compose командой `docker-compose up -d`.
|
4. После настройки переменных, запустите проект при помощи docker-compose командой `docker-compose up -d`.
|
|
@ -7,10 +7,11 @@ import backoff
|
||||||
from environs import load_dotenv
|
from environs import load_dotenv
|
||||||
|
|
||||||
from models import Abitr
|
from models import Abitr
|
||||||
from settings import ApiConfig
|
from settings import ApiConfig, EtlConfig
|
||||||
from state import State
|
from state import State
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
etl_config = EtlConfig()
|
||||||
|
|
||||||
class ApiExtractor:
|
class ApiExtractor:
|
||||||
|
|
||||||
|
@ -21,7 +22,7 @@ class ApiExtractor:
|
||||||
|
|
||||||
@backoff.on_exception(backoff.expo, (aiohttp.ClientResponseError, aiohttp.ClientConnectorError,
|
@backoff.on_exception(backoff.expo, (aiohttp.ClientResponseError, aiohttp.ClientConnectorError,
|
||||||
aiohttp.ServerDisconnectedError), base=2, factor=1,
|
aiohttp.ServerDisconnectedError), base=2, factor=1,
|
||||||
max_value=5, max_tries=None)
|
max_value=etl_config.max_wait_size, max_tries=None)
|
||||||
async def get_extract_data(self, state: State, iblock_id: int, fields: list = None, **kwargs) -> list[Abitr]:
|
async def get_extract_data(self, state: State, iblock_id: int, fields: list = None, **kwargs) -> list[Abitr]:
|
||||||
|
|
||||||
data = {
|
data = {
|
||||||
|
|
|
@ -12,6 +12,7 @@ from settings import ApiConfig, EtlConfig
|
||||||
from state import State, BaseStorage
|
from state import State, BaseStorage
|
||||||
|
|
||||||
load_dotenv()
|
load_dotenv()
|
||||||
|
etl_config = EtlConfig()
|
||||||
|
|
||||||
|
|
||||||
class EtlLoader:
|
class EtlLoader:
|
||||||
|
@ -21,7 +22,7 @@ class EtlLoader:
|
||||||
self.api_config = ApiConfig()
|
self.api_config = ApiConfig()
|
||||||
|
|
||||||
@backoff.on_exception(backoff.expo, (asyncio.TimeoutError, ), base=2, factor=1,
|
@backoff.on_exception(backoff.expo, (asyncio.TimeoutError, ), base=2, factor=1,
|
||||||
max_value=5, max_tries=None)
|
max_value=etl_config.max_wait_size, max_tries=None)
|
||||||
async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage):
|
async def load_data(self, state: State, abitr: Abitr, iblock_id: int, storage: BaseStorage):
|
||||||
async with aiohttp.ClientSession() as session:
|
async with aiohttp.ClientSession() as session:
|
||||||
logging.info(f"Информация об абитуриенте: {abitr.FIO}")
|
logging.info(f"Информация об абитуриенте: {abitr.FIO}")
|
||||||
|
|
|
@ -18,3 +18,4 @@ class EtlConfig:
|
||||||
host: str = os.environ.get('DESTINATION_HOST')
|
host: str = os.environ.get('DESTINATION_HOST')
|
||||||
port: int = int(os.environ.get('DESTINATION_PORT'))
|
port: int = int(os.environ.get('DESTINATION_PORT'))
|
||||||
protocol: str = os.environ.get('DESTINATION_PROTOCOL')
|
protocol: str = os.environ.get('DESTINATION_PROTOCOL')
|
||||||
|
max_wait_size: int = os.environ.get('MAX_WAIT_SIZE', 60)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user