mirror of
https://github.com/ijaric/voice_assistant.git
synced 2025-05-24 14:33:26 +00:00
24 lines
686 B
Python
24 lines
686 B
Python
import contextlib
|
|
import typing
|
|
|
|
import lib.api.schemas as api_schemas
|
|
|
|
|
|
class BrokerPublisher:
|
|
def __init__(self, broker_class: typing.Type, settings: object):
|
|
self.broker = broker_class(settings)
|
|
|
|
async def connect(self):
|
|
await self.broker.connect()
|
|
|
|
async def dispose(self):
|
|
await self.broker.dispose()
|
|
|
|
async def publish_message(self, message_body: api_schemas.BrokerMessage, routing_key: str):
|
|
await self.broker.publish_message(message_body, routing_key)
|
|
|
|
@contextlib.asynccontextmanager
|
|
async def get_connection(self) -> typing.AsyncGenerator:
|
|
async with self.broker.get_connection() as conn:
|
|
yield conn
|