1
0
mirror of https://github.com/ijaric/voice_assistant.git synced 2025-05-24 14:33:26 +00:00

feat: Настройки БД & async клиент

This commit is contained in:
Artem Litvinov 2023-09-23 21:17:45 +01:00
parent 852242efd4
commit b43ac44da9
5 changed files with 44 additions and 27 deletions

View File

@ -8,7 +8,7 @@ import lib.app.split_settings as app_split_settings
class Settings(pydantic_settings.BaseSettings): class Settings(pydantic_settings.BaseSettings):
api: app_split_settings.ApiSettings = pydantic.Field(default_factory=lambda: app_split_settings.ApiSettings()) api: app_split_settings.ApiSettings = pydantic.Field(default_factory=lambda: app_split_settings.ApiSettings())
postgres: app_split_settings.PostgresSettings = pydantic.Field( db: app_split_settings.PostgresSettings = pydantic.Field(
default_factory=lambda: app_split_settings.PostgresSettings() default_factory=lambda: app_split_settings.PostgresSettings()
) )
logger: app_split_settings.LoggingSettings = pydantic.Field( logger: app_split_settings.LoggingSettings = pydantic.Field(

View File

@ -1,12 +1,15 @@
from .api import * from .api import *
from .logger import * from .logger import *
from .postgres import * from .postgres import *
from .postgres import DBSettings, PostgresSettings
from .project import * from .project import *
__all__ = [ __all__ = [
"ApiSettings", "ApiSettings",
"DBSettings",
"LoggingSettings", "LoggingSettings",
"get_logging_config", "PostgresSettings",
"PostgresSettings", "PostgresSettings",
"ProjectSettings", "ProjectSettings",
"get_logging_config",
] ]

View File

@ -4,7 +4,23 @@ import pydantic_settings
import lib.app.split_settings.utils as app_split_settings_utils import lib.app.split_settings.utils as app_split_settings_utils
class PostgresSettings(pydantic_settings.BaseSettings): class DBSettings(pydantic_settings.BaseSettings):
"""Abstract class for database settings."""
protocol: str
name: str
host: str
port: int
user: str
password: pydantic.SecretStr
@property
def dsn(self) -> str:
"""Get database DSN."""
return f"{self.protocol}://{self.user}:{self.password}@{self.host}:{self.port}/{self.name}"
class PostgresSettings(DBSettings):
model_config = pydantic_settings.SettingsConfigDict( model_config = pydantic_settings.SettingsConfigDict(
env_file=app_split_settings_utils.ENV_PATH, env_file=app_split_settings_utils.ENV_PATH,
env_prefix="POSTGRES_", env_prefix="POSTGRES_",
@ -12,6 +28,7 @@ class PostgresSettings(pydantic_settings.BaseSettings):
extra="ignore", extra="ignore",
) )
protocol: str = "postgresql+asyncpg"
name: str = "database_name" name: str = "database_name"
host: str = "localhost" host: str = "localhost"
port: int = 5432 port: int = 5432

View File

@ -1,3 +1,7 @@
# from libs.api.models.movies import * import sqlalchemy.orm
# from libs.api.models.notification_templates import *
# from libs.api.models.users import *
class Base(sqlalchemy.orm.DeclarativeBase):
"""Base Model Class for ORM Models and Alembic."""
pass

View File

@ -1,38 +1,31 @@
import typing import typing
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine import sqlalchemy.ext.asyncio as sa_asyncio
from sqlalchemy.orm import DeclarativeBase
import lib.app.settings as app_settings import lib.app.settings as app_settings
settings = app_settings.settings # settings = app_settings.settings
# Создаём базовый класс для будущих моделей
class Base(DeclarativeBase):
pass
# Создаём движок
# Настройки подключения к БД передаём из переменных окружения, которые заранее загружены в файл настроек
class AsyncDB: class AsyncDB:
def __init__(self): """Async DB connection."""
self.database_dsn = (
f"postgresql+asyncpg://{settings.db.user}:{settings.db.password}" def __init__(self, settings: app_settings.Settings):
f"@{settings.db.host}:{settings.db.port}/{settings.db.name}" self.database_dsn = settings.db.dsn
self.engine = sa_asyncio.create_async_engine(self.database_dsn, echo=settings.project.debug, future=True)
self.async_session = sa_asyncio.async_sessionmaker(
self.engine, class_=sa_asyncio.AsyncSession, expire_on_commit=False
) )
self.engine = create_async_engine(self.database_dsn, echo=settings.project.debug, future=True)
self.async_session = async_sessionmaker(self.engine, class_=AsyncSession, expire_on_commit=False)
db = AsyncDB() async def get_session(settings: app_settings.Settings) -> typing.AsyncGenerator[sa_asyncio.AsyncSession, typing.Any]:
db = AsyncDB(settings)
async def get_session() -> typing.AsyncGenerator[AsyncSession, typing.Any]:
async with db.async_session() as session: async with db.async_session() as session:
try: try:
yield session yield session
except Exception: except Exception:
await session.rollback() await session.rollback()
raise raise
finally:
await session.close()
await db.engine.dispose()