Skip to content

Code Docs

fastapi_accelerator.auth_jwt

Модуль для аутентификации по JWT

BaseAuthJWT

Пример:

class AuthJWT(BaseAuthJWT):
    async def check_auth(username: str, password: str) -> bool:
        """Проверка введенного логина и пароля."""
        return username == "admin" and password == "admin"

    async def add_jwt_body(username: str) -> dict:
        """Функция для добавление дополнительных данных в JWT токен пользователя"""
        return {"version": username.title()}


# Подключить аутентификацию по JWT
AuthJWT.mount_auth(app)

Пример защиты API метода:

@app.get("/cheack_protected", summary="Проверить аутентификацию по JWT")
async def protected_route(jwt: dict = Depends(jwt_auth)):
    return {"message": "This is a protected route", "user": jwt}
Source code in fastapi_accelerator/auth_jwt.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class BaseAuthJWT:
    '''
    Пример:

    ```python
    class AuthJWT(BaseAuthJWT):
        async def check_auth(username: str, password: str) -> bool:
            """Проверка введенного логина и пароля."""
            return username == "admin" and password == "admin"

        async def add_jwt_body(username: str) -> dict:
            """Функция для добавление дополнительных данных в JWT токен пользователя"""
            return {"version": username.title()}


    # Подключить аутентификацию по JWT
    AuthJWT.mount_auth(app)
    ```

    Пример защиты API метода:

    ```python
    @app.get("/cheack_protected", summary="Проверить аутентификацию по JWT")
    async def protected_route(jwt: dict = Depends(jwt_auth)):
        return {"message": "This is a protected route", "user": jwt}
    ```
    '''

    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30
    # Установиться автоматически в mount_auth
    secret_key = None

    @abc.abstractmethod
    async def check_auth(username: str, password: str) -> bool:
        """Проверка введенного логина и пароля."""
        raise NotImplementedError()

    @abc.abstractmethod
    async def add_jwt_body(username: str) -> dict:
        """Функция для добавление дополнительных данных в JWT токен пользователя"""

    @classmethod
    def mount_auth(cls, app: FastAPI):
        """Подключение аутентификации по JWT"""
        # Установить класс для аутентификации
        app.state.auth_jwt = cls
        cls.secret_key = app.state.SECRET_KEY

        @app.post("/token", summary="Аутентификация по JWT", tags=["common"])
        async def login(
            user: Annotated[OAuth2PasswordRequestForm, Depends()],
        ) -> Token:
            if await cls.check_auth(user.username, user.password):
                return Token(
                    access_token=cls._create_access_token(
                        data={
                            "sub": user.username,
                            **await cls.add_jwt_body(user.username),
                        },
                    ),
                    token_type="bearer",
                )
            else:
                raise HTTPException(status_code=401, detail="Invalid credentials")

        @app.get(
            "/check_protected",
            summary="Проверить аутентификацию по JWT",
            tags=["common"],
        )
        async def protected_route(request: Request, jwt: dict = Depends(jwt_auth)):
            return {"message": "This is a protected route", "user": jwt}

        return login, protected_route

    @classmethod
    def _create_access_token(
        cls,
        data: dict,
        expires_delta: Union[timedelta, None] = None,
    ) -> str:
        """Создание JWT токена"""
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(
                minutes=cls.ACCESS_TOKEN_EXPIRE_MINUTES
            )
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, cls.secret_key, algorithm=cls.ALGORITHM)
        return encoded_jwt

    @classmethod
    def _verify_token(cls, token: str) -> Optional[dict]:
        """Проверка валидности JWT токена"""
        try:
            payload = jwt.decode(token, cls.secret_key, algorithms=[cls.ALGORITHM])
            return payload
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None

add_jwt_body(username) abstractmethod async

Функция для добавление дополнительных данных в JWT токен пользователя

Source code in fastapi_accelerator/auth_jwt.py
60
61
62
@abc.abstractmethod
async def add_jwt_body(username: str) -> dict:
    """Функция для добавление дополнительных данных в JWT токен пользователя"""

check_auth(username, password) abstractmethod async

Проверка введенного логина и пароля.

Source code in fastapi_accelerator/auth_jwt.py
55
56
57
58
@abc.abstractmethod
async def check_auth(username: str, password: str) -> bool:
    """Проверка введенного логина и пароля."""
    raise NotImplementedError()

mount_auth(app) classmethod

Подключение аутентификации по JWT

Source code in fastapi_accelerator/auth_jwt.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@classmethod
def mount_auth(cls, app: FastAPI):
    """Подключение аутентификации по JWT"""
    # Установить класс для аутентификации
    app.state.auth_jwt = cls
    cls.secret_key = app.state.SECRET_KEY

    @app.post("/token", summary="Аутентификация по JWT", tags=["common"])
    async def login(
        user: Annotated[OAuth2PasswordRequestForm, Depends()],
    ) -> Token:
        if await cls.check_auth(user.username, user.password):
            return Token(
                access_token=cls._create_access_token(
                    data={
                        "sub": user.username,
                        **await cls.add_jwt_body(user.username),
                    },
                ),
                token_type="bearer",
            )
        else:
            raise HTTPException(status_code=401, detail="Invalid credentials")

    @app.get(
        "/check_protected",
        summary="Проверить аутентификацию по JWT",
        tags=["common"],
    )
    async def protected_route(request: Request, jwt: dict = Depends(jwt_auth)):
        return {"message": "This is a protected route", "user": jwt}

    return login, protected_route

jwt_auth(request, token=Depends(oauth2_scheme))

Depends для проверки JWT

Source code in fastapi_accelerator/auth_jwt.py
128
129
130
131
132
133
134
135
136
137
138
def jwt_auth(request: Request, token: str = Depends(oauth2_scheme)) -> dict:
    """Depends для проверки JWT"""
    auth_jwt: BaseAuthJWT = AUTH_JWT() or AUTH_JWT(request.app)
    payload = auth_jwt._verify_token(token)
    if payload is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return payload

fastapi_accelerator.cache

Работа с кеш Redis

cache_redis(cache_class, cache_ttl, cache=True)

Декоратор для кеширования ответов API в Redis.

Parameters:

Name Type Description Default
cache_class BaseCache

Класс кеша для взаимодействия с Redis.

required
cache_ttl timedelta

Время жизни кеша.

required
cache bool

Флаг, включающий или отключающий кеширование.

True
class ViewSetRetrieve(BaseViewSet):
    def register_routes(self):
        '''Регистраций API обработчиков'''

        @self.router.get(f"{self.prefix}/{{item_uid}}", tags=self.tags)
        @cache_redis(self.cache, self.cache_class, self.cache_ttl)
        async def get_item(
            request: Request,
            item_uid: str = Path(...),
            aorm: OrmAsync = Depends(AppOrm.aget_orm),
        ) -> self.pydantic_model:
            response = await aorm.get(
                select(self.db_model).filter(self._name_pk == item_uid)
            )
            return response

        return get_item
Source code in fastapi_accelerator/cache.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
def cache_redis(cache_class: BaseCache, cache_ttl: timedelta, cache: bool = True):
    """
    Декоратор для кеширования ответов API в Redis.

    Args:
        cache_class: Класс кеша для взаимодействия с Redis.
        cache_ttl: Время жизни кеша.
        cache: Флаг, включающий или отключающий кеширование.

    ```python
    class ViewSetRetrieve(BaseViewSet):
        def register_routes(self):
            '''Регистраций API обработчиков'''

            @self.router.get(f"{self.prefix}/{{item_uid}}", tags=self.tags)
            @cache_redis(self.cache, self.cache_class, self.cache_ttl)
            async def get_item(
                request: Request,
                item_uid: str = Path(...),
                aorm: OrmAsync = Depends(AppOrm.aget_orm),
            ) -> self.pydantic_model:
                response = await aorm.get(
                    select(self.db_model).filter(self._name_pk == item_uid)
                )
                return response

            return get_item
    ```
    """

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            request: Request = kwargs["request"]

            cache_status = CACHE_STATUS() or CACHE_STATUS(request.app)

            if not cache_status or not cache:
                # кеш отключен
                return await func(*args, **kwargs)

            key_cache: str = f"{request.url.path}?{request.url.query}"
            cache_response = await cache_class.get(key_cache)
            if cache_response:
                # Создаем ответ с заголовком, указывающим, что данные из кеша
                return JSONResponse(
                    content=json.loads(cache_response), headers={"X-Cache": "HIT"}
                )

            response = await func(*args, **kwargs)

            json_data = jsonable_encoder(response)
            await cache_class.set(key_cache, json.dumps(json_data), ex=cache_ttl)
            # Возвращаем результат с заголовком
            return JSONResponse(content=json_data, headers={"X-Cache": "MISS"})

        return wrapper

    return decorator

fastapi_accelerator.exception

custom_http_exception_handler(request, exc) async

Кастомный обработки исключений

Подключение:

from starlette.exceptions import HTTPException as StarletteHTTPException
app.exception_handler(StarletteHTTPException)(custom_http_exception_handler)
Source code in fastapi_accelerator/exception.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
async def custom_http_exception_handler(request: Request, exc: StarletteHTTPException):
    """Кастомный обработки исключений

    Подключение:

        from starlette.exceptions import HTTPException as StarletteHTTPException
        app.exception_handler(StarletteHTTPException)(custom_http_exception_handler)
    """
    exc_status_code = exc.status_code
    if exc_status_code in (400, 401, 403, 404, 429, 503, 504):
        debug = DEBUG() or DEBUG(request.app)
        timezone = TIMEZONE() or TIMEZONE(request.app)

        content = {}
        content["detail"] = exc.detail
        content["context"] = request_log_format(request, exc_status_code)
        content["erorr_id"] = str(uuid.uuid4())
        content["datetime"] = get_datetime_now(timezone).isoformat()
        content["host"] = request.headers["host"]
        content["http_method"] = request.method
        content["request_path"] = request.url.path
        content["query_params"] = dict(request.query_params)
        content["user-agent"] = request.headers.get("User-Agent", "unknown")

        # Добавляем трассировку стека в debug режиме
        if debug:
            content["traceback"] = traceback.format_exc()
        return JSONResponse(
            status_code=exc_status_code, content=content, headers=exc.headers
        )
    return await http_exception_handler(request, exc)

fastapi_accelerator.middleware

log_request_response(request, call_next) async

Логировать время выполение API запроса

Подключение:

app.middleware('http')(log_request_response)

Source code in fastapi_accelerator/middleware.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
async def log_request_response(request: Request, call_next):
    """Логировать время выполение API запроса

    Подключение:

    app.middleware('http')(log_request_response)
    """
    start_time = time.perf_counter()
    response = await call_next(request)
    process_time = (time.perf_counter() - start_time) * 1000

    debug = DEBUG() or DEBUG(request.app)
    if debug:
        logger = logging.getLogger("uvicorn")
        logger.info(request_log_format(request, response.status_code, process_time))
    response.headers["X-Process-Time"] = f"{process_time:.2f} ms"
    return response

request_log_format(request, status_code, process_time=None)

Форматировать запрос в лог строку

Source code in fastapi_accelerator/middleware.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def request_log_format(
    request: Request, status_code: int, process_time: float = None
) -> str:
    """Форматировать запрос в лог строку"""
    query = request.url.query
    return (
        '{host}:{port} - "{method} {path}{query}" - {status_code}{process_time}'.format(
            host=request.client.host,
            port=request.client.port,
            method=request.method,
            path=request.url.path,
            query=f"?{query}" if query else "",
            status_code=status_code,
            process_time=f" - [{process_time:.2f} ms]" if process_time else "",
        )
    )

options: show_source: true heading_level: 3

fastapi_accelerator.paginator

Модуль дял пагиации овтета

options: show_source: true heading_level: 3

fastapi_accelerator.timezone

Модуль для работы со временем и временными зонами

get_datetime_now(timezone)

Получить текущие время с учетом тайм зоны

Установка врменной зоны для проекта

app.state.TIMEZONE = moscow_tz

Source code in fastapi_accelerator/timezone.py
13
14
15
16
17
18
19
def get_datetime_now(timezone: pytz.timezone) -> datetime:
    """Получить текущие время с учетом тайм зоны

    # Установка врменной зоны для проекта
    app.state.TIMEZONE = moscow_tz
    """
    return datetime.now(timezone)

fastapi_accelerator.utils

NoInstanceMeta

Bases: type

Нельзя создавать экземпляры этого класса

Source code in fastapi_accelerator/utils.py
44
45
46
47
48
class NoInstanceMeta(type):
    """Нельзя создавать экземпляры этого класса"""

    def __call__(cls, *args, **kwargs):
        raise TypeError("You cannot create instances of this class")

SingletonMeta

Bases: type

Мета класс для реализации паттерна Одиночка

Source code in fastapi_accelerator/utils.py
 6
 7
 8
 9
10
11
12
13
14
class SingletonMeta(type):
    """Мета класс для реализации паттерна Одиночка"""

    instance = None

    def __call__(cls, *args, **kwargs):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kwargs)
        return cls.instance

run_async(async_function)

Синхронная обертка для асинхронного вызова

Source code in fastapi_accelerator/utils.py
56
57
58
def run_async(async_function):
    """Синхронная обертка для асинхронного вызова"""
    return asyncio.get_event_loop().run_until_complete(async_function)

singleton(func)

Декоратор для реализации паттерна Одиночка

Пример использования:

@singleton
def CACHE_STATUS(app=None) -> bool:
    return app.state.CACHE_STATUS

cache_status = CACHE_STATUS() or CACHE_STATUS(request.app)
Source code in fastapi_accelerator/utils.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def singleton(func):
    """Декоратор для реализации паттерна Одиночка

    Пример использования:

        @singleton
        def CACHE_STATUS(app=None) -> bool:
            return app.state.CACHE_STATUS

        cache_status = CACHE_STATUS() or CACHE_STATUS(request.app)
    """

    instance = None

    def wrapper(*args, **kwargs):
        nonlocal instance
        if instance is not None:
            return instance
        if instance is None and (not args and not kwargs):
            return False
        if instance is None:
            instance = func(*args, **kwargs)
        return instance

    return wrapper

to_namedtuple(**kwargs)

Вернуть именованный кортеж

Source code in fastapi_accelerator/utils.py
51
52
53
def to_namedtuple(**kwargs: dict[str, Any]) -> NamedTuple:
    """Вернуть именованный кортеж"""
    return namedtuple("CommonNameTuple", kwargs.keys())(**kwargs)

fastapi_accelerator.viewset

Модуль для высокоуровневой работы с API обработками используя подход ViewSet которые автоматизируют базовый CRUD функционал, и поваляют гибко переопределять логику ViewSet

AppOrm

Брать методы получения сессии из настроек APP DATABASE_MANAGER

Source code in fastapi_accelerator/viewset.py
30
31
32
33
34
35
36
37
38
39
40
class AppOrm:
    """Брать методы получения сессии из настроек APP DATABASE_MANAGER"""

    @classmethod
    async def aget_orm(cls, request: Request):
        """Метод получения сессии берем из настоек APP"""
        database_manager: MainDatabaseManager = DATABASE_MANAGER() or DATABASE_MANAGER(
            request.app
        )
        async for orm in database_manager.aget_orm():
            yield orm

aget_orm(request) async classmethod

Метод получения сессии берем из настоек APP

Source code in fastapi_accelerator/viewset.py
33
34
35
36
37
38
39
40
@classmethod
async def aget_orm(cls, request: Request):
    """Метод получения сессии берем из настоек APP"""
    database_manager: MainDatabaseManager = DATABASE_MANAGER() or DATABASE_MANAGER(
        request.app
    )
    async for orm in database_manager.aget_orm():
        yield orm

BaseViewSet

Bases: ABC

Source code in fastapi_accelerator/viewset.py
25
26
27
class BaseViewSet(abc.ABC):
    def _register_endpoint(self):
        """Регистраций API обработчиков"""

GenericViewSet

from app.api.v1.schemas.file import File from app.models.file import File as FileDb

router = APIRouter()

class FileViewSet(FullViewSet): db_model = FileDb pydantic_model = File

router.views = [ FileViewSet().as_view(router, prefix="/file"), ]

Source code in fastapi_accelerator/viewset.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
class GenericViewSet:
    """
    from app.api.v1.schemas.file import File
    from app.models.file import File as FileDb

    router = APIRouter()

    class FileViewSet(FullViewSet):
        db_model = FileDb
        pydantic_model = File

    router.views = [
        FileViewSet().as_view(router, prefix="/file"),
    ]
    """

    # Модель БД SqlAlchemy
    db_model: Type
    # Модель Схемы
    pydantic_model: Type[BaseModel]

    # Теги для URL
    tags: Optional[List[str]]

    # Пагинация для List
    paginator_class: Optional[BasePaginatorClass]

    # Класс для кешировать ответ List, Retrieve
    cache_class: Optional[BaseCache]
    # Время жизни ответа в кеше
    cache_ttl: Optional[timedelta]

    # Включить поддержку вложенных схем pydantic
    # это значит что будет происходить рекурсивное создание, обновление, удаление связанных записей
    deep_schema: Optional[bool]

    # Зависимости на весь ViewSet
    # Например можно указать аутентификацию по JWT `[Depends(jwt_auth)]`
    dependencies: Optional[list[Callable]]

    # Какой тип у первичного ключа для Api метода
    type_item_id: Optional[type]

    def as_view(self, router: APIRouter, prefix: str):
        self._mro_class = self.__class__.mro()
        # Модель БД SqlAlchemy
        self.router: APIRouter = router
        # Модель Схемы
        self.db_model: Type = self.db_model
        self.pydantic_model: Type[BaseModel] = self.pydantic_model
        # Префикс для URL
        self.prefix: str = prefix
        # Теги
        self.tags: List[str] = getattr(self, "tags", [self._mro_class[0].__name__])
        # Имя столбца с первичным ключом
        self.name_pk: str = get_pk(self.db_model)
        # Включить поддержку вложенных схем
        self.deep_schema = getattr(self, "deep_schema", False)
        # Зависимости на весь ViewSet
        self.dependencies = getattr(self, "dependencies", [])
        # Пагинация для List
        self.paginator_class = getattr(self, "paginator_class", None)
        # Какой тип у первичного ключа для Api метода
        self.type_item_id = getattr(
            self,
            "type_item_id",
            # Попытаться определить тип автоматически по типу первичного ключа в модели
            SQL_TO_PYTHON_TYPE.get(
                class_mapper(self.db_model).primary_key[0].type.__class__,
                Union[str, int, UUID],
            ),
        )

        # Логика кеширования
        self.cache = False
        if cache_class := getattr(self, "cache_class", False):
            self.cache = True
            self.cache_class = cache_class
            self.cache_ttl = getattr(self, "cache_ttl", timedelta(seconds=3))
        else:
            self.cache_class = None
            self.cache_ttl = None

        # Подключить API обработчики
        for cls in self._mro_class:
            if cls in (
                ViewSetList,
                ViewSetCreate,
                ViewSetRetrieve,
                ViewSetUpdate,
                ViewSetDelete,
            ):
                cls._register_endpoint(self)

        # Документирование тегов
        self.openapi_tag = {"name": self.tags[0], "description": str(self)}
        return self

    def __str__(self) -> str:
        """Сформировать документацию для ViewSet"""
        description = "ViewSet"
        if doc := self._mro_class[0].__doc__:
            description = "{doc}: {db_model}{dependencies}{cache}{deep_schema}".format(
                doc=doc.strip(),
                db_model=f" **db_model**={self.db_model.__name__}",
                dependencies=(
                    f" **dependencies**=[{', '.join(d.dependency.__name__ for d in self.dependencies)}]"
                    if self.dependencies
                    else ""
                ),
                cache=(" **cache**=On" if self.cache else ""),
                deep_schema=(" **deep_schema**=On" if self.deep_schema else ""),
            )
        return description

__str__()

Сформировать документацию для ViewSet

Source code in fastapi_accelerator/viewset.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
def __str__(self) -> str:
    """Сформировать документацию для ViewSet"""
    description = "ViewSet"
    if doc := self._mro_class[0].__doc__:
        description = "{doc}: {db_model}{dependencies}{cache}{deep_schema}".format(
            doc=doc.strip(),
            db_model=f" **db_model**={self.db_model.__name__}",
            dependencies=(
                f" **dependencies**=[{', '.join(d.dependency.__name__ for d in self.dependencies)}]"
                if self.dependencies
                else ""
            ),
            cache=(" **cache**=On" if self.cache else ""),
            deep_schema=(" **deep_schema**=On" if self.deep_schema else ""),
        )
    return description

ViewSetCreate

Bases: BaseViewSet

Source code in fastapi_accelerator/viewset.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
class ViewSetCreate(BaseViewSet):
    def _register_endpoint(self):
        self.create()

    def _to_db_item(self, item: Type[BaseModel]) -> T:
        """Преобразовать элемент pydantic в db_item"""
        db_item = (
            deep_instance(self.db_model, item)
            if self.deep_schema
            else self.db_model(**item.dict())
        )
        return db_item

    def create(self):
        """API обработки"""

        @self.router.post(
            f"{self.prefix}", tags=self.tags, dependencies=self.dependencies
        )
        async def create_item(
            item: self.pydantic_model, aorm: OrmAsync = Depends(AppOrm.aget_orm)
        ) -> self.pydantic_model:
            return await self.db_create(item, aorm)

        return create_item

    async def db_create(self, item: Type[BaseModel], aorm: OrmAsync) -> object:
        """Функция для создание записи в БД."""
        return await aorm.create_item(self._to_db_item(item), deep=self.deep_schema)

create()

API обработки

Source code in fastapi_accelerator/viewset.py
134
135
136
137
138
139
140
141
142
143
144
145
def create(self):
    """API обработки"""

    @self.router.post(
        f"{self.prefix}", tags=self.tags, dependencies=self.dependencies
    )
    async def create_item(
        item: self.pydantic_model, aorm: OrmAsync = Depends(AppOrm.aget_orm)
    ) -> self.pydantic_model:
        return await self.db_create(item, aorm)

    return create_item

db_create(item, aorm) async

Функция для создание записи в БД.

Source code in fastapi_accelerator/viewset.py
147
148
149
async def db_create(self, item: Type[BaseModel], aorm: OrmAsync) -> object:
    """Функция для создание записи в БД."""
    return await aorm.create_item(self._to_db_item(item), deep=self.deep_schema)

ViewSetDelete

Bases: BaseViewSet

Source code in fastapi_accelerator/viewset.py
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
class ViewSetDelete(BaseViewSet):
    def _register_endpoint(self):
        self.delete()

    def delete(self):
        """API обработки"""

        @self.router.delete(
            f"{self.prefix}/{{item_id}}", tags=self.tags, dependencies=self.dependencies
        )
        async def delete_item(
            item_id: self.type_item_id, aorm: OrmAsync = Depends(AppOrm.aget_orm)
        ):
            return await self.db_delete(item_id, aorm)

        return delete_item

    async def db_delete(self, item_id: Union[str, int, UUID], aorm: OrmAsync):
        """Функция для удаления записи в БД"""
        return await aorm.delete_item(self.db_model, item_id, deep=self.deep_schema)

db_delete(item_id, aorm) async

Функция для удаления записи в БД

Source code in fastapi_accelerator/viewset.py
197
198
199
async def db_delete(self, item_id: Union[str, int, UUID], aorm: OrmAsync):
    """Функция для удаления записи в БД"""
    return await aorm.delete_item(self.db_model, item_id, deep=self.deep_schema)

delete()

API обработки

Source code in fastapi_accelerator/viewset.py
184
185
186
187
188
189
190
191
192
193
194
195
def delete(self):
    """API обработки"""

    @self.router.delete(
        f"{self.prefix}/{{item_id}}", tags=self.tags, dependencies=self.dependencies
    )
    async def delete_item(
        item_id: self.type_item_id, aorm: OrmAsync = Depends(AppOrm.aget_orm)
    ):
        return await self.db_delete(item_id, aorm)

    return delete_item

ViewSetList

Bases: BaseViewSet

Source code in fastapi_accelerator/viewset.py
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class ViewSetList(BaseViewSet):
    def _register_endpoint(self):
        self.list_paginator() if self.paginator_class else self.list()

    @staticmethod
    def get_offset(page: int, size: int) -> int:
        return (page - 1) * size

    def list_paginator(self):
        """Обработчик API с пагинацией"""

        @self.router.get(
            f"{self.prefix}", tags=self.tags, dependencies=self.dependencies
        )
        @cache_redis(self.cache_class, self.cache_ttl, self.cache)
        async def get_list_items_paginator(
            request: Request,
            page: int = Query(1, gt=0),
            size: int = Query(10, gt=0),
            aorm: OrmAsync = Depends(AppOrm.aget_orm),
        ) -> self.paginator_class.Schema:
            offset = self.get_offset(page, size)
            response = await aorm.get_list(
                select(self.db_model).order_by(self.name_pk).offset(offset).limit(size),
                deep=self.deep_schema,
                db_model=self.db_model,
            )
            return self.paginator_class.json(page, size, response)

        return get_list_items_paginator

    def list(self):
        """Обработчик API"""

        @self.router.get(
            f"{self.prefix}", tags=self.tags, dependencies=self.dependencies
        )
        @cache_redis(self.cache_class, self.cache_ttl, self.cache)
        async def get_list_items(
            request: Request,
            skip: int = Query(0, gte=0),
            limit: int = Query(100, gt=0),
            aorm: OrmAsync = Depends(AppOrm.aget_orm),
        ) -> List[self.pydantic_model]:
            return await aorm.get_list(
                select(self.db_model).offset(skip).limit(limit),
                deep=self.deep_schema,
                db_model=self.db_model,
            )

        return get_list_items

list()

Обработчик API

Source code in fastapi_accelerator/viewset.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
def list(self):
    """Обработчик API"""

    @self.router.get(
        f"{self.prefix}", tags=self.tags, dependencies=self.dependencies
    )
    @cache_redis(self.cache_class, self.cache_ttl, self.cache)
    async def get_list_items(
        request: Request,
        skip: int = Query(0, gte=0),
        limit: int = Query(100, gt=0),
        aorm: OrmAsync = Depends(AppOrm.aget_orm),
    ) -> List[self.pydantic_model]:
        return await aorm.get_list(
            select(self.db_model).offset(skip).limit(limit),
            deep=self.deep_schema,
            db_model=self.db_model,
        )

    return get_list_items

list_paginator()

Обработчик API с пагинацией

Source code in fastapi_accelerator/viewset.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def list_paginator(self):
    """Обработчик API с пагинацией"""

    @self.router.get(
        f"{self.prefix}", tags=self.tags, dependencies=self.dependencies
    )
    @cache_redis(self.cache_class, self.cache_ttl, self.cache)
    async def get_list_items_paginator(
        request: Request,
        page: int = Query(1, gt=0),
        size: int = Query(10, gt=0),
        aorm: OrmAsync = Depends(AppOrm.aget_orm),
    ) -> self.paginator_class.Schema:
        offset = self.get_offset(page, size)
        response = await aorm.get_list(
            select(self.db_model).order_by(self.name_pk).offset(offset).limit(size),
            deep=self.deep_schema,
            db_model=self.db_model,
        )
        return self.paginator_class.json(page, size, response)

    return get_list_items_paginator

ViewSetUpdate

Bases: BaseViewSet

Source code in fastapi_accelerator/viewset.py
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
class ViewSetUpdate(BaseViewSet):
    def _register_endpoint(self):
        self.update()

    def update(self):
        """API обработки"""

        @self.router.put(
            f"{self.prefix}/{{item_id}}", tags=self.tags, dependencies=self.dependencies
        )
        async def update_item(
            item_id: self.type_item_id,
            item: self.pydantic_model,
            aorm: OrmAsync = Depends(AppOrm.aget_orm),
        ) -> self.pydantic_model:
            return await self.db_update(item_id, item, aorm)

        return update_item

    async def db_update(
        self, item_id: Union[str, int, UUID], item: Type[BaseModel], aorm: OrmAsync
    ) -> object:
        """Функция для обновление записи в БД"""
        return await aorm.update_item(
            self.db_model, item_id, item.dict(exclude_unset=True), deep=self.deep_schema
        )

db_update(item_id, item, aorm) async

Функция для обновление записи в БД

Source code in fastapi_accelerator/viewset.py
171
172
173
174
175
176
177
async def db_update(
    self, item_id: Union[str, int, UUID], item: Type[BaseModel], aorm: OrmAsync
) -> object:
    """Функция для обновление записи в БД"""
    return await aorm.update_item(
        self.db_model, item_id, item.dict(exclude_unset=True), deep=self.deep_schema
    )

update()

API обработки

Source code in fastapi_accelerator/viewset.py
156
157
158
159
160
161
162
163
164
165
166
167
168
169
def update(self):
    """API обработки"""

    @self.router.put(
        f"{self.prefix}/{{item_id}}", tags=self.tags, dependencies=self.dependencies
    )
    async def update_item(
        item_id: self.type_item_id,
        item: self.pydantic_model,
        aorm: OrmAsync = Depends(AppOrm.aget_orm),
    ) -> self.pydantic_model:
        return await self.db_update(item_id, item, aorm)

    return update_item

fastapi_accelerator.db

BaseDatabaseManager

Базовый класс для работы с РСУБД

Source code in fastapi_accelerator/db/dbsession.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class BaseDatabaseManager(metaclass=SingletonMeta):
    """Базовый класс для работы с РСУБД"""

    def __init__(
        self,
        database_url: str,
        *,
        pool_size=10,
        max_overflow=0,
        echo=True,
        DEV_STATUS: bool = False,
    ):
        self.database_url = None
        self.engine = None
        self.session = None
        self.Base = None
        self.adatabase_url = None
        self.aengine = None
        self.asession = None
        self._aBase = None
        self.DEV_STATUS = None

    def get_session() -> Generator[Session, None, None]:
        pass

    def get_session_transaction() -> Generator[Session, None, None]:
        pass

    async def aget_session() -> AsyncGenerator[AsyncSession, None]:
        pass

    async def aget_session_transaction() -> AsyncGenerator[AsyncSession, None]:
        pass

    def check_dev(self):
        """Проверять включенный режим разработки"""
        if not self.DEV_STATUS:
            raise ValueError("Такое действие можно выполнять только в DEV режиме.")

    def create_all(self):
        pass

    def drop_all(self):
        pass

    async def acreate_all(self):
        pass

    async def adrop_all(self):
        pass

check_dev()

Проверять включенный режим разработки

Source code in fastapi_accelerator/db/dbsession.py
49
50
51
52
def check_dev(self):
    """Проверять включенный режим разработки"""
    if not self.DEV_STATUS:
        raise ValueError("Такое действие можно выполнять только в DEV режиме.")

BaseOrm

Source code in fastapi_accelerator/db/dborm.py
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
class BaseOrm:

    def __init__(self, asession: AsyncSession):
        self.asession = asession

    async def _execute(
        self, query: Select, deep: bool = False, db_model: Type[T] | None = None
    ):
        """Выполнить Select запрос"""
        if not deep:
            response = await self.asession.execute(query)
            return response
        else:
            if not db_model:
                raise TypeError("Not set db_model")
            relationships = class_mapper(db_model).relationships
            options = [joinedload(getattr(db_model, rel.key)) for rel in relationships]
            response = await self.asession.execute(query.options(*options))
            return response

    async def get(
        self,
        query: Select,
        deep: bool = False,
        db_model: Type[T] | None = None,
    ) -> Optional[T]:
        """Получить объект по запросу"""
        response = await self._execute(query, deep, db_model)
        return response.scalar_one_or_none()

    async def get_list(
        self,
        query: Select,
        deep: bool = False,
        db_model: Type[T] | None = None,
    ) -> list[T]:
        """Получить список объектов по запросу"""
        response = await self._execute(query, deep, db_model)
        return response.scalars().all()

    async def update(self, query: Update, update_data: dict) -> T:
        """Обновить объекты по запросу"""
        query = query.values(**update_data).returning(query.table)
        result = await self.asession.execute(query)
        await self.asession.commit()
        return result.scalars().first()

    async def delete(self, query: Delete) -> bool:
        """Удалить объекты по запросу"""
        result = await self.asession.execute(query)
        await self.asession.commit()
        return result.rowcount > 0

delete(query) async

Удалить объекты по запросу

Source code in fastapi_accelerator/db/dborm.py
106
107
108
109
110
async def delete(self, query: Delete) -> bool:
    """Удалить объекты по запросу"""
    result = await self.asession.execute(query)
    await self.asession.commit()
    return result.rowcount > 0

get(query, deep=False, db_model=None) async

Получить объект по запросу

Source code in fastapi_accelerator/db/dborm.py
79
80
81
82
83
84
85
86
87
async def get(
    self,
    query: Select,
    deep: bool = False,
    db_model: Type[T] | None = None,
) -> Optional[T]:
    """Получить объект по запросу"""
    response = await self._execute(query, deep, db_model)
    return response.scalar_one_or_none()

get_list(query, deep=False, db_model=None) async

Получить список объектов по запросу

Source code in fastapi_accelerator/db/dborm.py
89
90
91
92
93
94
95
96
97
async def get_list(
    self,
    query: Select,
    deep: bool = False,
    db_model: Type[T] | None = None,
) -> list[T]:
    """Получить список объектов по запросу"""
    response = await self._execute(query, deep, db_model)
    return response.scalars().all()

update(query, update_data) async

Обновить объекты по запросу

Source code in fastapi_accelerator/db/dborm.py
 99
100
101
102
103
104
async def update(self, query: Update, update_data: dict) -> T:
    """Обновить объекты по запросу"""
    query = query.values(**update_data).returning(query.table)
    result = await self.asession.execute(query)
    await self.asession.commit()
    return result.scalars().first()

DatabaseAsyncSessionMixin

Bases: BaseDatabaseManager

Класс для асинхронной работы с РСУБД используя пул соединений

Source code in fastapi_accelerator/db/dbsession.py
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
class DatabaseAsyncSessionMixin(BaseDatabaseManager):
    """Класс для асинхронной работы с РСУБД используя пул соединений"""

    @classmethod
    async def aget_session(cls) -> AsyncGenerator[AsyncSession, None]:
        """Зависимость для получения сессии базы данных

        Получить сессию:

        async for asession in aget_orm():
            yield asession
        """
        async with cls.instance.asession() as asession:
            yield asession

    @classmethod
    async def aget_orm(cls) -> AsyncGenerator[OrmAsync, None]:
        """Зависимость для получения сессии базы данных с транзакцией

        Получить сессию:

        async for orm in aget_orm():
            yield orm
        """
        async for asession in cls.aget_session():
            yield OrmAsync(asession)

    @classmethod
    async def aget_session_transaction(cls) -> AsyncGenerator[AsyncSession, None]:
        """Зависимость для получения сессию в транзакции

        Получить сессию:

        async for asession in aget_session_transaction():
            yield asession
        """
        async with cls.instance.asession() as asession:
            async with asession.begin():
                yield asession

    @classmethod
    async def acreate_all(cls):
        cls.instance.check_dev()
        async with cls.instance.aengine.begin() as aconn:
            # await conn.run_sync(Base.metadata.drop_all)
            await aconn.run_sync(cls.instance._aBase.metadata.create_all)

    @classmethod
    async def dispose(cls):
        """Метод dispose() закрывает все неиспользуемые соединения
        в пуле соединений, связанном с данным engine.
        Это позволяет освободить ресурсы базы данных, когда они больше не нужны
        """
        await cls.instance.aengine.dispose()

aget_orm() async classmethod

Зависимость для получения сессии базы данных с транзакцией

Получить сессию:

async for orm in aget_orm(): yield orm

Source code in fastapi_accelerator/db/dbsession.py
141
142
143
144
145
146
147
148
149
150
151
@classmethod
async def aget_orm(cls) -> AsyncGenerator[OrmAsync, None]:
    """Зависимость для получения сессии базы данных с транзакцией

    Получить сессию:

    async for orm in aget_orm():
        yield orm
    """
    async for asession in cls.aget_session():
        yield OrmAsync(asession)

aget_session() async classmethod

Зависимость для получения сессии базы данных

Получить сессию:

async for asession in aget_orm(): yield asession

Source code in fastapi_accelerator/db/dbsession.py
129
130
131
132
133
134
135
136
137
138
139
@classmethod
async def aget_session(cls) -> AsyncGenerator[AsyncSession, None]:
    """Зависимость для получения сессии базы данных

    Получить сессию:

    async for asession in aget_orm():
        yield asession
    """
    async with cls.instance.asession() as asession:
        yield asession

aget_session_transaction() async classmethod

Зависимость для получения сессию в транзакции

Получить сессию:

async for asession in aget_session_transaction(): yield asession

Source code in fastapi_accelerator/db/dbsession.py
153
154
155
156
157
158
159
160
161
162
163
164
@classmethod
async def aget_session_transaction(cls) -> AsyncGenerator[AsyncSession, None]:
    """Зависимость для получения сессию в транзакции

    Получить сессию:

    async for asession in aget_session_transaction():
        yield asession
    """
    async with cls.instance.asession() as asession:
        async with asession.begin():
            yield asession

dispose() async classmethod

Метод dispose() закрывает все неиспользуемые соединения в пуле соединений, связанном с данным engine. Это позволяет освободить ресурсы базы данных, когда они больше не нужны

Source code in fastapi_accelerator/db/dbsession.py
173
174
175
176
177
178
179
@classmethod
async def dispose(cls):
    """Метод dispose() закрывает все неиспользуемые соединения
    в пуле соединений, связанном с данным engine.
    Это позволяет освободить ресурсы базы данных, когда они больше не нужны
    """
    await cls.instance.aengine.dispose()

DatabaseSyncSessionMixin

Bases: BaseDatabaseManager

Класс для синхронной работы с РСУБД

Source code in fastapi_accelerator/db/dbsession.py
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
class DatabaseSyncSessionMixin(BaseDatabaseManager):
    """Класс для синхронной работы с РСУБД"""

    @classmethod
    def get_session(cls) -> Generator[Session, None, None]:
        """Зависимость для получения сессии базы данных"""
        session = cls.instance.session()
        try:
            yield session
        finally:
            session.close()

    @classmethod
    def get_session_transaction(cls) -> Generator[Session, None, None]:
        """Зависимость для получения сессии базы данных с транзакцией"""
        session = cls.instance.session()
        try:
            with session.begin():
                yield session
        except Exception:
            session.rollback()
            raise
        finally:
            session.close()

    @classmethod
    def get_metadata(cls) -> MetaData:
        """Получаем обновленные метаданные"""
        metadata = MetaData()
        metadata.reflect(bind=cls.instance.engine)
        return metadata

    @classmethod
    def create_all(cls):
        """Создать все таблицы в БД"""
        cls.instance.check_dev()
        cls.instance.Base.metadata.create_all(bind=cls.instance.engine)

    @classmethod
    def drop_all(cls):
        """Удалить все таблицы из БД"""
        cls.instance.check_dev()
        cls.instance.get_metadata().drop_all(bind=cls.instance.engine)

    @classmethod
    def clear_all(cls, exclude_tables_name: list[str] = None):
        """Отчистить данные во всех таблицах

        exclude_tables_name: Список имён таблиц которые не нужно отчищать
        """
        cls.instance.check_dev()
        with cls.instance.session() as session:
            with session.begin():
                for table in reversed(cls.instance.get_metadata().sorted_tables):
                    if table.name in exclude_tables_name:
                        continue
                    session.execute(table.delete())

clear_all(exclude_tables_name=None) classmethod

Отчистить данные во всех таблицах

exclude_tables_name: Список имён таблиц которые не нужно отчищать

Source code in fastapi_accelerator/db/dbsession.py
111
112
113
114
115
116
117
118
119
120
121
122
123
@classmethod
def clear_all(cls, exclude_tables_name: list[str] = None):
    """Отчистить данные во всех таблицах

    exclude_tables_name: Список имён таблиц которые не нужно отчищать
    """
    cls.instance.check_dev()
    with cls.instance.session() as session:
        with session.begin():
            for table in reversed(cls.instance.get_metadata().sorted_tables):
                if table.name in exclude_tables_name:
                    continue
                session.execute(table.delete())

create_all() classmethod

Создать все таблицы в БД

Source code in fastapi_accelerator/db/dbsession.py
 99
100
101
102
103
@classmethod
def create_all(cls):
    """Создать все таблицы в БД"""
    cls.instance.check_dev()
    cls.instance.Base.metadata.create_all(bind=cls.instance.engine)

drop_all() classmethod

Удалить все таблицы из БД

Source code in fastapi_accelerator/db/dbsession.py
105
106
107
108
109
@classmethod
def drop_all(cls):
    """Удалить все таблицы из БД"""
    cls.instance.check_dev()
    cls.instance.get_metadata().drop_all(bind=cls.instance.engine)

get_metadata() classmethod

Получаем обновленные метаданные

Source code in fastapi_accelerator/db/dbsession.py
92
93
94
95
96
97
@classmethod
def get_metadata(cls) -> MetaData:
    """Получаем обновленные метаданные"""
    metadata = MetaData()
    metadata.reflect(bind=cls.instance.engine)
    return metadata

get_session() classmethod

Зависимость для получения сессии базы данных

Source code in fastapi_accelerator/db/dbsession.py
70
71
72
73
74
75
76
77
@classmethod
def get_session(cls) -> Generator[Session, None, None]:
    """Зависимость для получения сессии базы данных"""
    session = cls.instance.session()
    try:
        yield session
    finally:
        session.close()

get_session_transaction() classmethod

Зависимость для получения сессии базы данных с транзакцией

Source code in fastapi_accelerator/db/dbsession.py
79
80
81
82
83
84
85
86
87
88
89
90
@classmethod
def get_session_transaction(cls) -> Generator[Session, None, None]:
    """Зависимость для получения сессии базы данных с транзакцией"""
    session = cls.instance.session()
    try:
        with session.begin():
            yield session
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

OrmAsync

Bases: OrmAsyncItem

Взаимодействие с БД

Source code in fastapi_accelerator/db/dborm.py
265
266
267
268
class OrmAsync(OrmAsyncItem):
    """Взаимодействие с БД"""

    ...

OrmAsyncItem

Bases: BaseOrm

Логика для работы с одним элементом

Source code in fastapi_accelerator/db/dborm.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
class OrmAsyncItem(BaseOrm):
    """Логика для работы с одним элементом"""

    async def get_item(
        self, db_model: Type[T], item_id: Union[str, int, uuid.UUID], deep: bool = False
    ) -> Optional[T]:
        """Получить объект по PK"""
        if not deep:
            # Получить имя первичного ключа
            name_pk: str = get_pk(db_model)
            return await self.get(select(db_model).filter(name_pk == item_id))
        else:
            """Получить вложенный объект"""
            return await self.asession.get(
                db_model, item_id, options=[selectinload("*")]
            )

    async def create_item(self, obj: T, deep: bool = False) -> T:
        """Создать объект"""
        if not deep:
            self.asession.add(obj)
            await self.asession.commit()
            await self.asession.refresh(obj)
            return obj
        else:
            """Создать вложенный объект"""
            self.asession.add(obj)
            await self.asession.commit()
            return await self.eager_refresh(obj)

    async def update_item(
        self,
        db_model: Type[T],
        item_id: Union[str, int, uuid.UUID],
        update_item: dict,
        deep: bool = False,
    ) -> T:
        """Обновить объект по PK"""

        if not deep:
            # Получить имя первичного ключа
            name_pk: str = get_pk(db_model)
            return await self.update(
                update(db_model).filter(name_pk == item_id), update_item
            )
        else:
            """Обновить вложенный объект"""

            async def update_nested(obj: T, update_item: dict) -> T:
                """Функция рекурсивного обновления"""
                db_model = obj.__class__
                mapper = class_mapper(db_model)
                # Связи объекта с другими таблицами
                relationships_keys = {
                    r.local_remote_pairs[0][0].key: r.key for r in mapper.relationships
                }
                # Получить имя первичного ключа
                name_pk: str = get_pk(db_model)
                # Исключаем колонки с первичными ключами
                columns = [c.key for c in mapper.column_attrs if c.key != name_pk.name]
                for column_name in columns:
                    # Взять значение входного
                    update_value = update_item.get(column_name)
                    # Если это связь с другой таблицей
                    if overwrite_column_name := relationships_keys.get(column_name):
                        related_obj = getattr(obj, overwrite_column_name)
                        nested_obj = await update_nested(
                            related_obj, update_item.get(overwrite_column_name)
                        )
                        update_value = nested_obj
                        column_name = overwrite_column_name

                    setattr(obj, column_name, update_value)
                return obj

            # Получить объект
            obj = await self.asession.get(
                db_model, item_id, options=[selectinload("*")]
            )
            if not obj:
                return NoResultFound()  # Если объект не найден, возвращаем ошибку

            # Обновить объект
            update_obj = await update_nested(obj, update_item)
            # Применить обновление в БД
            self.asession.add(update_obj)
            await self.asession.commit()
            return update_obj

    async def delete_item(
        self, db_model: Type[T], item_id: Union[str, int, uuid.UUID], deep: bool = False
    ) -> bool:
        """Удалить объект по PK"""
        if not deep:
            # Получить имя первичного ключа
            name_pk: str = get_pk(db_model)
            return await self.delete(delete(db_model).filter(name_pk == item_id))
        else:
            """Удалить вложенный объект и его зависимости."""

            # Получаем объект с его зависимостями через selectinload
            obj = await self.asession.get(
                db_model, item_id, options=[selectinload("*")]
            )
            if not obj:
                return False  # Если объект не найден, возвращаем False

            async def delete_nested(
                tmp_obj: T,
                pre_model: Optional[Type[T]] = None,
            ):
                """Рекурсивное удаление всех зависимостей объекта."""
                mapper = class_mapper(tmp_obj.__class__)

                # Находим все связи объекта с другими таблицами
                for relationship in mapper.relationships:
                    # Не пытаемся удалить запись из предыдущей итерации стека
                    if (
                        pre_model
                        and relationship.target.name == pre_model.__table__.name
                    ):
                        continue
                    related_objs = getattr(tmp_obj, relationship.key)
                    if related_objs:
                        if relationship.uselist:
                            # Если это список объектов, удаляем их рекурсивно
                            for related_obj in related_objs:
                                await delete_nested(
                                    related_obj, pre_model=tmp_obj.__class__
                                )
                        else:
                            # Если это один объект, удаляем его рекурсивно
                            await delete_nested(
                                related_objs, pre_model=tmp_obj.__class__
                            )

                # Удаляем сам объект
                await self.asession.delete(tmp_obj)

            # Начинаем рекурсивное удаление с корневого объекта
            await delete_nested(obj)
            # Коммит транзакции
            await self.asession.commit()
            return True

    async def eager_refresh(self, obj: Type[T]) -> Type[T]:
        """Жадно загрузить все связанные записи для данного объекта."""
        return await self.asession.get(
            type(obj), get_pk(obj), options=[selectinload("*")]
        )

create_item(obj, deep=False) async

Создать объект

Source code in fastapi_accelerator/db/dborm.py
130
131
132
133
134
135
136
137
138
139
140
141
async def create_item(self, obj: T, deep: bool = False) -> T:
    """Создать объект"""
    if not deep:
        self.asession.add(obj)
        await self.asession.commit()
        await self.asession.refresh(obj)
        return obj
    else:
        """Создать вложенный объект"""
        self.asession.add(obj)
        await self.asession.commit()
        return await self.eager_refresh(obj)

delete_item(db_model, item_id, deep=False) async

Удалить объект по PK

Source code in fastapi_accelerator/db/dborm.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
async def delete_item(
    self, db_model: Type[T], item_id: Union[str, int, uuid.UUID], deep: bool = False
) -> bool:
    """Удалить объект по PK"""
    if not deep:
        # Получить имя первичного ключа
        name_pk: str = get_pk(db_model)
        return await self.delete(delete(db_model).filter(name_pk == item_id))
    else:
        """Удалить вложенный объект и его зависимости."""

        # Получаем объект с его зависимостями через selectinload
        obj = await self.asession.get(
            db_model, item_id, options=[selectinload("*")]
        )
        if not obj:
            return False  # Если объект не найден, возвращаем False

        async def delete_nested(
            tmp_obj: T,
            pre_model: Optional[Type[T]] = None,
        ):
            """Рекурсивное удаление всех зависимостей объекта."""
            mapper = class_mapper(tmp_obj.__class__)

            # Находим все связи объекта с другими таблицами
            for relationship in mapper.relationships:
                # Не пытаемся удалить запись из предыдущей итерации стека
                if (
                    pre_model
                    and relationship.target.name == pre_model.__table__.name
                ):
                    continue
                related_objs = getattr(tmp_obj, relationship.key)
                if related_objs:
                    if relationship.uselist:
                        # Если это список объектов, удаляем их рекурсивно
                        for related_obj in related_objs:
                            await delete_nested(
                                related_obj, pre_model=tmp_obj.__class__
                            )
                    else:
                        # Если это один объект, удаляем его рекурсивно
                        await delete_nested(
                            related_objs, pre_model=tmp_obj.__class__
                        )

            # Удаляем сам объект
            await self.asession.delete(tmp_obj)

        # Начинаем рекурсивное удаление с корневого объекта
        await delete_nested(obj)
        # Коммит транзакции
        await self.asession.commit()
        return True

eager_refresh(obj) async

Жадно загрузить все связанные записи для данного объекта.

Source code in fastapi_accelerator/db/dborm.py
258
259
260
261
262
async def eager_refresh(self, obj: Type[T]) -> Type[T]:
    """Жадно загрузить все связанные записи для данного объекта."""
    return await self.asession.get(
        type(obj), get_pk(obj), options=[selectinload("*")]
    )

get_item(db_model, item_id, deep=False) async

Получить объект по PK

Source code in fastapi_accelerator/db/dborm.py
116
117
118
119
120
121
122
123
124
125
126
127
128
async def get_item(
    self, db_model: Type[T], item_id: Union[str, int, uuid.UUID], deep: bool = False
) -> Optional[T]:
    """Получить объект по PK"""
    if not deep:
        # Получить имя первичного ключа
        name_pk: str = get_pk(db_model)
        return await self.get(select(db_model).filter(name_pk == item_id))
    else:
        """Получить вложенный объект"""
        return await self.asession.get(
            db_model, item_id, options=[selectinload("*")]
        )

update_item(db_model, item_id, update_item, deep=False) async

Обновить объект по PK

Source code in fastapi_accelerator/db/dborm.py
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
async def update_item(
    self,
    db_model: Type[T],
    item_id: Union[str, int, uuid.UUID],
    update_item: dict,
    deep: bool = False,
) -> T:
    """Обновить объект по PK"""

    if not deep:
        # Получить имя первичного ключа
        name_pk: str = get_pk(db_model)
        return await self.update(
            update(db_model).filter(name_pk == item_id), update_item
        )
    else:
        """Обновить вложенный объект"""

        async def update_nested(obj: T, update_item: dict) -> T:
            """Функция рекурсивного обновления"""
            db_model = obj.__class__
            mapper = class_mapper(db_model)
            # Связи объекта с другими таблицами
            relationships_keys = {
                r.local_remote_pairs[0][0].key: r.key for r in mapper.relationships
            }
            # Получить имя первичного ключа
            name_pk: str = get_pk(db_model)
            # Исключаем колонки с первичными ключами
            columns = [c.key for c in mapper.column_attrs if c.key != name_pk.name]
            for column_name in columns:
                # Взять значение входного
                update_value = update_item.get(column_name)
                # Если это связь с другой таблицей
                if overwrite_column_name := relationships_keys.get(column_name):
                    related_obj = getattr(obj, overwrite_column_name)
                    nested_obj = await update_nested(
                        related_obj, update_item.get(overwrite_column_name)
                    )
                    update_value = nested_obj
                    column_name = overwrite_column_name

                setattr(obj, column_name, update_value)
            return obj

        # Получить объект
        obj = await self.asession.get(
            db_model, item_id, options=[selectinload("*")]
        )
        if not obj:
            return NoResultFound()  # Если объект не найден, возвращаем ошибку

        # Обновить объект
        update_obj = await update_nested(obj, update_item)
        # Применить обновление в БД
        self.asession.add(update_obj)
        await self.asession.commit()
        return update_obj

SingletonMeta

Bases: type

Мета класс для реализации паттерна Одиночка

Source code in fastapi_accelerator/utils.py
 6
 7
 8
 9
10
11
12
13
14
class SingletonMeta(type):
    """Мета класс для реализации паттерна Одиночка"""

    instance = None

    def __call__(cls, *args, **kwargs):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kwargs)
        return cls.instance

deep_instance(db_model, pydantic_data)

Создает экземпляр SQLAlchemy модели из словаря. Если словарь содержит вложенные данные для связанных объектов, они также будут созданы.

Source code in fastapi_accelerator/db/dborm.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
def deep_instance(db_model: Type[T], pydantic_data: BaseModel) -> T:
    """
    Создает экземпляр SQLAlchemy модели из словаря.
    Если словарь содержит вложенные данные для связанных объектов, они также будут созданы.
    """
    kwargs = {}
    for field, value in pydantic_data:
        # Если значение также является словарем, создаем вложенный объект
        if isinstance(value, BaseModel):
            # Получаем тип модели для связанного объекта
            related_model = getattr(db_model, field).property.mapper.class_
            value = deep_instance(related_model, value)
        kwargs[field] = value
    return db_model(**kwargs)

get_pk(db_model)

Получить первичный ключ у модели

Source code in fastapi_accelerator/db/dborm.py
54
55
56
def get_pk(db_model: Type[T]) -> Any:
    """Получить первичный ключ у модели"""
    return getattr(db_model, db_model.__table__.primary_key.columns.values()[0].name)

fastapi_accelerator.integration

ApiHTTP

Bases: NamedTuple

Stores API connection details and client.

Source code in fastapi_accelerator/integration/http_integration.py
31
32
33
34
35
36
37
38
class ApiHTTP(NamedTuple):
    """Stores API connection details and client."""

    credentials: dict | None  # Authentication credentials
    url: ParseResult  # Parsed URL of the API endpoint
    version: str  # API version
    httpmethod: str  # Use HTTP method
    client: httpx.AsyncClient | None  # HTTP client for making requests (None in tests)

BaseIntegration

Базовый класс для интеграций

Source code in fastapi_accelerator/integration/base_integration.py
12
13
14
15
class BaseIntegration:
    """Базовый класс для интеграций"""

    ...

BaseStabilityPattern

Базовый класс для реализации паттерна стабильности

Source code in fastapi_accelerator/integration/stability_patterns.py
64
65
66
67
68
69
70
71
72
73
74
75
76
class BaseStabilityPattern:
    """Базовый класс для реализации паттерна стабильности"""

    async def run(self, func: Callable[..., Coroutine]) -> Any: ...

    def __call__(
        self, func: Callable[..., Awaitable[R]]
    ) -> Callable[..., Awaitable[R]]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> R:
            return await self.run(lambda: func(*args, **kwargs))

        return wrapper

CircuitBreakerError

Bases: StabilityError

Ошибка, возникающая при срабатывании предохранителя.

Source code in fastapi_accelerator/integration/stability_patterns.py
29
30
31
32
33
34
35
36
37
class CircuitBreakerError(StabilityError):
    """Ошибка, возникающая при срабатывании предохранителя."""

    # Этот статус сигнализирует, что сервер временно не может обработать запрос
    # из-за перегрузки или проведения технических работ.
    http_status = 503

    def __init__(self, message="Circuit Breaker в состоянии OPEN."):
        super().__init__(message)

EndpointsDeclaration

Базовый класс для объявления интеграций с внешними API.

Этот класс предоставляет платформу для определения и управления интеграциями API, включая обработку аутентификации и настройку базового URL.

Source code in fastapi_accelerator/integration/http_integration.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class EndpointsDeclaration:
    """
    Базовый класс для объявления интеграций с внешними API.

    Этот класс предоставляет платформу для определения и управления интеграциями API,
    включая обработку аутентификации и настройку базового URL.
    """

    integration: IntegrationHTTP | None = None

    def __init__(self, base_url: URL = "", credentials: dict | None = None):
        """
        Аргументы:
            base_url (URL): Базовый URL для API. Это может быть доменное имя или адрес в формате host:port.
            credentials (dict | None): Доступные учетные данные для аутентификации в внешней системе.

        Примечание:
            Рекомендуется задавать base_url через переменные окружения, чтобы избежать проблем с конфигурацией
            на разных уровнях (например, на production-среде) из-за межсетевых экранов или других факторов.
        """
        self.base_url = base_url
        self.credentials = credentials

    class Schema:
        """Схема Pydantic для успешных ответов.

        Эта схема описывает структуру данных, которые ожидаются в успешных ответах от REST API.
        """

    class SchemaError:
        """Схема Pydantic для неуспешных ответов.

        Эта схема описывает структуру данных, которые могут быть возвращены
        в случае ошибки при взаимодействии с REST API.
        """

Schema

Схема Pydantic для успешных ответов.

Эта схема описывает структуру данных, которые ожидаются в успешных ответах от REST API.

Source code in fastapi_accelerator/integration/http_integration.py
194
195
196
197
198
class Schema:
    """Схема Pydantic для успешных ответов.

    Эта схема описывает структуру данных, которые ожидаются в успешных ответах от REST API.
    """

SchemaError

Схема Pydantic для неуспешных ответов.

Эта схема описывает структуру данных, которые могут быть возвращены в случае ошибки при взаимодействии с REST API.

Source code in fastapi_accelerator/integration/http_integration.py
200
201
202
203
204
205
class SchemaError:
    """Схема Pydantic для неуспешных ответов.

    Эта схема описывает структуру данных, которые могут быть возвращены
    в случае ошибки при взаимодействии с REST API.
    """

__init__(base_url='', credentials=None)

Аргументы

base_url (URL): Базовый URL для API. Это может быть доменное имя или адрес в формате host:port. credentials (dict | None): Доступные учетные данные для аутентификации в внешней системе.

Примечание

Рекомендуется задавать base_url через переменные окружения, чтобы избежать проблем с конфигурацией на разных уровнях (например, на production-среде) из-за межсетевых экранов или других факторов.

Source code in fastapi_accelerator/integration/http_integration.py
181
182
183
184
185
186
187
188
189
190
191
192
def __init__(self, base_url: URL = "", credentials: dict | None = None):
    """
    Аргументы:
        base_url (URL): Базовый URL для API. Это может быть доменное имя или адрес в формате host:port.
        credentials (dict | None): Доступные учетные данные для аутентификации в внешней системе.

    Примечание:
        Рекомендуется задавать base_url через переменные окружения, чтобы избежать проблем с конфигурацией
        на разных уровнях (например, на production-среде) из-за межсетевых экранов или других факторов.
    """
    self.base_url = base_url
    self.credentials = credentials

HTTPMethod

Bases: str, Enum

Доступные HTTP методы

Source code in fastapi_accelerator/integration/http_integration.py
21
22
23
24
25
26
27
28
class HTTPMethod(str, Enum):
    """Доступные HTTP методы"""

    get = "GET"
    post = "POST"
    put = "PUT"
    patch = "PATCH"
    delete = "DELETE"

IntegrationHTTP

Bases: BaseIntegration

Класс для интеграции с сервисами REST API.

Этот класс используется для регистрации конечных точек в классе EndpointsDeclaration используя декоратор @endpoint. Это обеспечивает структурированный способ определения и интеграция с API.

Source code in fastapi_accelerator/integration/http_integration.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
class IntegrationHTTP(BaseIntegration):
    """
    Класс для интеграции с сервисами REST API.

    Этот класс используется для регистрации конечных точек в классе `EndpointsDeclaration`
    используя декоратор `@endpoint`. Это обеспечивает структурированный способ определения
    и интеграция с API.
    """

    def __init__(self, name: str, doc: str):
        # Документаци интеграции
        self._doc = doc
        # Информация о классе
        self._class_info = {
            "name": self.__class__.__name__,
            "docstring": self._doc,
            "methods": [],
        }
        # Человеоко понятное имя
        self.name = name

    def _add_integrations_method(
        self,
        func: Callable,
        path: str,
        version: str,
        docurl: str,
        httpmethod: HTTPMethod,
    ):
        """Добавьте документацию по методу интеграции."""
        # Добавить документацию функции
        self._class_info["methods"].append(
            {
                "name": func.__name__,
                "docstring": getattr(func, "__doc__", ""),
                "annotations": {
                    k: v.__name__ if hasattr(v, "__name__") else str(v)
                    for k, v in func.__annotations__.items()
                    # Исключаем аргумент api
                    if k not in ("api")
                },
                "path": path,
                "httpmethod": httpmethod,
                "version": version,
                "docurl": docurl,
            }
        )

    @property
    def docs(self) -> dict:
        """Получите документацию по этой интеграции."""
        return self._class_info

    def endpoint(
        self, httpmethod: HTTPMethod, path: str, version: str, docurl: str
    ) -> Callable[[Callable[..., Awaitable[R]]], Callable[..., Awaitable[R]]]:
        """
        Декоратор для интеграции REST-эндпоинтов с HTTP-запросами.

        Этот декоратор применяется к методам класса, наследующего от `IntegrationHTTP`,
        и обеспечивает интеграцию с внешними REST-эндпоинтами через HTTP-запросы.

        Args:
            path (str): Путь, на который отправляется запрос.
            version (str): Версия этого эндпоинта.
            docurl (str): Ссылка на документацию эндпоинта.
            httpmethod (HTTPMethod): Какой HTTP метод испольузеится.

        Returns:
            Callable[..., Awaitable[R]]: Декорированная функция, готовая для интеграции с REST-эндпоинтом.

        Примечания:
            - Метод, к которому применяется декоратор, должен быть определен в классе, наследующем от `IntegrationHTTP`
            - Для декорируемого метода должен быть указан возвращаемый тип аннотации.
            - Обязательные аргументы должны быть указаны в декораторе.

        Raises:
            ValueError: Если для интеграционной функции не указан тип возвращаемого значения.
            TypeError: Вызыван метод не у экземпляра класса

        """

        def decorator(func: Callable[..., Awaitable[R]]) -> Callable[..., Awaitable[R]]:

            # Проверить что у функции указан тип ответа
            return_type = func.__annotations__.get("return")
            if not return_type:
                raise ValueError(
                    "Return type must be specified for integration function"
                )

            # Добавить метод в хранилище
            self._add_integrations_method(func, path, version, docurl, httpmethod)

            @wraps(func)
            async def wrapper(
                self_endpoint: EndpointsDeclaration, *args, **kwargs
            ) -> R:
                if not isinstance(self_endpoint, EndpointsDeclaration):
                    raise TypeError(
                        "Method must be called on an instance, not the class itself"
                    )

                # Полный путь до endpoint
                url: ParseResult = urlparse(self_endpoint.base_url + path)
                try:

                    response = await wraper_endpoint(
                        self_endpoint,
                        func,
                        url,
                        version,
                        httpmethod,
                        *args,
                        **kwargs,
                    )
                    # Конвертировать ответ в ожидаемый тип
                    return convert_response(return_type, response)
                except StabilityError as e:
                    # Если возникло исключение в обработчиках стабльности
                    raise HTTPException(
                        status_code=getattr(e, "http_status", 500),
                        detail=f"{e.__class__.__name__}: {self.__class__.__name__}.{func.__name__}: {e.message}",
                    )

            return wrapper

        return decorator

docs: dict property

Получите документацию по этой интеграции.

endpoint(httpmethod, path, version, docurl)

Декоратор для интеграции REST-эндпоинтов с HTTP-запросами.

Этот декоратор применяется к методам класса, наследующего от IntegrationHTTP, и обеспечивает интеграцию с внешними REST-эндпоинтами через HTTP-запросы.

Parameters:

Name Type Description Default
path str

Путь, на который отправляется запрос.

required
version str

Версия этого эндпоинта.

required
docurl str

Ссылка на документацию эндпоинта.

required
httpmethod HTTPMethod

Какой HTTP метод испольузеится.

required

Returns:

Type Description
Callable[[Callable[..., Awaitable[R]]], Callable[..., Awaitable[R]]]

Callable[..., Awaitable[R]]: Декорированная функция, готовая для интеграции с REST-эндпоинтом.

Примечания
  • Метод, к которому применяется декоратор, должен быть определен в классе, наследующем от IntegrationHTTP
  • Для декорируемого метода должен быть указан возвращаемый тип аннотации.
  • Обязательные аргументы должны быть указаны в декораторе.

Raises:

Type Description
ValueError

Если для интеграционной функции не указан тип возвращаемого значения.

TypeError

Вызыван метод не у экземпляра класса

Source code in fastapi_accelerator/integration/http_integration.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
def endpoint(
    self, httpmethod: HTTPMethod, path: str, version: str, docurl: str
) -> Callable[[Callable[..., Awaitable[R]]], Callable[..., Awaitable[R]]]:
    """
    Декоратор для интеграции REST-эндпоинтов с HTTP-запросами.

    Этот декоратор применяется к методам класса, наследующего от `IntegrationHTTP`,
    и обеспечивает интеграцию с внешними REST-эндпоинтами через HTTP-запросы.

    Args:
        path (str): Путь, на который отправляется запрос.
        version (str): Версия этого эндпоинта.
        docurl (str): Ссылка на документацию эндпоинта.
        httpmethod (HTTPMethod): Какой HTTP метод испольузеится.

    Returns:
        Callable[..., Awaitable[R]]: Декорированная функция, готовая для интеграции с REST-эндпоинтом.

    Примечания:
        - Метод, к которому применяется декоратор, должен быть определен в классе, наследующем от `IntegrationHTTP`
        - Для декорируемого метода должен быть указан возвращаемый тип аннотации.
        - Обязательные аргументы должны быть указаны в декораторе.

    Raises:
        ValueError: Если для интеграционной функции не указан тип возвращаемого значения.
        TypeError: Вызыван метод не у экземпляра класса

    """

    def decorator(func: Callable[..., Awaitable[R]]) -> Callable[..., Awaitable[R]]:

        # Проверить что у функции указан тип ответа
        return_type = func.__annotations__.get("return")
        if not return_type:
            raise ValueError(
                "Return type must be specified for integration function"
            )

        # Добавить метод в хранилище
        self._add_integrations_method(func, path, version, docurl, httpmethod)

        @wraps(func)
        async def wrapper(
            self_endpoint: EndpointsDeclaration, *args, **kwargs
        ) -> R:
            if not isinstance(self_endpoint, EndpointsDeclaration):
                raise TypeError(
                    "Method must be called on an instance, not the class itself"
                )

            # Полный путь до endpoint
            url: ParseResult = urlparse(self_endpoint.base_url + path)
            try:

                response = await wraper_endpoint(
                    self_endpoint,
                    func,
                    url,
                    version,
                    httpmethod,
                    *args,
                    **kwargs,
                )
                # Конвертировать ответ в ожидаемый тип
                return convert_response(return_type, response)
            except StabilityError as e:
                # Если возникло исключение в обработчиках стабльности
                raise HTTPException(
                    status_code=getattr(e, "http_status", 500),
                    detail=f"{e.__class__.__name__}: {self.__class__.__name__}.{func.__name__}: {e.message}",
                )

        return wrapper

    return decorator

MaxRetriesExceededError

Bases: StabilityError

Ошибка, возникающая при превышении максимального числа попыток.

Source code in fastapi_accelerator/integration/stability_patterns.py
40
41
42
43
44
45
46
47
48
class MaxRetriesExceededError(StabilityError):
    """Ошибка, возникающая при превышении максимального числа попыток."""

    # Этот статус указывает, что клиент отправил слишком много запросов за короткий период времени.
    http_status = 429

    def __init__(self, max_attempts: int):
        message = f"Превышено максимальное число попыток: {max_attempts}."
        super().__init__(message)

StabilityError

Bases: Exception

Базовый класс для ошибок стабильности.

Source code in fastapi_accelerator/integration/stability_patterns.py
 9
10
11
12
13
14
15
class StabilityError(Exception):
    """Базовый класс для ошибок стабильности."""

    http_status = None

    def __init__(self, message) -> None:
        self.message = message

StabilityTimeoutError

Bases: StabilityError

Ошибка, возникающая при превышении времени ожидания.

Source code in fastapi_accelerator/integration/stability_patterns.py
18
19
20
21
22
23
24
25
26
class StabilityTimeoutError(StabilityError):
    """Ошибка, возникающая при превышении времени ожидания."""

    # Этот статус указывает, что сервер, выступающий в роли шлюза или прокси,
    # не получил своевременный ответ от вышестоящего сервера.
    http_status = 504

    def __init__(self, message="Время ожидания истекло."):
        super().__init__(message)

ThrottlingError

Bases: StabilityError

Ошибка, возникающая при превышении лимита запросов.

Source code in fastapi_accelerator/integration/stability_patterns.py
51
52
53
54
55
56
57
58
class ThrottlingError(StabilityError):
    """Ошибка, возникающая при превышении лимита запросов."""

    # Этот статус указывает, что клиент отправил слишком много запросов за короткий период времени.
    http_status = 429

    def __init__(self, message="Превышен лимит запросов в секунду."):
        super().__init__(message)

sp

Содержит в себе классы реализующие паттерны стабильности.

Рекомендуемый порядок применения паттернов стабильности:

Fallback должен быть самым внутренним, так как он предоставляет альтернативное поведение в случае сбоя основной функции.

Fallback(alternative_func)

Timeout следует применять сразу после Fallback, чтобы ограничить время выполнения как основной функции, так и резервной.

Timeout(seconds=timeout_seconds)

CircuitBreaker идет следующим, чтобы предотвратить повторные вызовы, если функция постоянно завершается неудачно или по таймауту.

CircuitBreaker(fail_threshold, reset_timeout)

Retry следует за CircuitBreaker, чтобы попытаться выполнить операцию несколько раз, если CircuitBreaker позволяет это.

RetryPattern(max_attempts=max_attempts, delay=timedelta(seconds=delay_seconds))

Throttling применяется в последнюю очередь, чтобы ограничить частоту вызовов всей обёрнутой функциональности.

Throttling(calls_per_second=calls_per_second)

Source code in fastapi_accelerator/integration/stability_patterns.py
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
class sp:
    """
    Содержит в себе классы реализующие паттерны стабильности.

    Рекомендуемый порядок применения паттернов стабильности:

    Fallback должен быть самым внутренним, так как он предоставляет
    альтернативное поведение в случае сбоя основной функции.
    > Fallback(alternative_func)

    Timeout следует применять сразу после Fallback, чтобы ограничить время выполнения как
    основной функции, так и резервной.
    > Timeout(seconds=timeout_seconds)

    CircuitBreaker идет следующим, чтобы предотвратить повторные вызовы, если функция постоянно
    завершается неудачно или по таймауту.
    > CircuitBreaker(fail_threshold, reset_timeout)

    Retry следует за CircuitBreaker, чтобы попытаться выполнить
    операцию несколько раз, если CircuitBreaker позволяет это.
    > RetryPattern(max_attempts=max_attempts, delay=timedelta(seconds=delay_seconds))

    Throttling применяется в последнюю очередь, чтобы ограничить частоту
    вызовов всей обёрнутой функциональности.
    > Throttling(calls_per_second=calls_per_second)
    """

    class Fallback(BaseStabilityPattern):
        """(Резервный вариант) - Предоставляет альтернативный путь выполнения в случае сбоя основного.
        Позволяет системе деградировать контролируемо, а не падать с ошибкой."""

        def __init__(self, alternative_func: Coroutine) -> None:
            """
            alternative_func: Функция которая вызолится при возникновение исключения
            """
            self.alternative_func = alternative_func

        async def run(self, func: Callable[..., Coroutine]) -> Any:
            try:
                return await func()
            except Exception:
                return await self.alternative_func()

    class Timeout(BaseStabilityPattern):
        """(Тайм-аут) - Ограничивает время ожидания ответа от внешнего сервиса. Предотвращает блокировку ресурсов при
        зависании вызова."""

        def __init__(self, seconds: int = 10) -> None:
            """
            seconds: Через сколько секунд прервать выполнение запроса
            """
            self.seconds = seconds

        async def run(self, func: Callable[..., Coroutine]) -> Any:
            try:
                return await asyncio.wait_for(func(), timeout=self.seconds)
            except asyncio.TimeoutError:
                raise StabilityTimeoutError("Function call timed out")

    class CircuitBreaker(BaseStabilityPattern):
        """(Предохранитель) - Отслеживает количество ошибок при вызове внешнего сервиса. При превышении лимита временно
        блокирует вызов, предотвращая каскадные сбои.


        Состояния Circuit Breaker:

        -   **Close** - Идет передача запросов между сервисами и подсчет количества сбоев.
            Если число сбоев за заданный интервал времени превышает пороговое значение,
            выключатель переводится в состояние Open.

        -   **Open** - Запросы от исходного сервиса немедленно возвращаются с ошибкой.
            По истечении заданного тайм-аута выключатель переводится в состояние Half-Open.

        -   **Half-open** - Выключатель пропускает ограниченное количество запросов от исходного сервиса и
            подсчитывает число успешных запросов. Если необходимое количество достигнуто, выключатель переходит
            в состояние Closed, если нет — возвращается в статус Open.
        """

        def __init__(self, fail_threshold: int = 3, reset_timeout: float = 10):
            """
            fail_threshold: Пороговое значения числа сбоев
            reset_timeout: Период сброса подсчета количества сбоев
            """
            self.fail_threshold = fail_threshold
            self.reset_timeout = reset_timeout
            self._failures = 0
            self._last_failure_time = None
            self._state = "CLOSED"

        async def run(self, func: Callable[..., Coroutine]) -> Any:
            if self._state == "OPEN":
                if (
                    asyncio.get_event_loop().time() - self._last_failure_time
                    > self.reset_timeout
                ):
                    self._state = "HALF-OPEN"
                else:
                    raise CircuitBreakerError("Circuit is OPEN")

            try:
                result = await func()
                if self._state == "HALF-OPEN":
                    self._state = "CLOSED"
                    self._failures = 0
                return result
            except Exception as e:
                self._failures += 1
                if self._failures >= self.fail_threshold:
                    self._state = "OPEN"
                    self._last_failure_time = asyncio.get_event_loop().time()
                raise e

    class RetryPattern(BaseStabilityPattern):
        """(Паттерн повторения) - Автоматически повторяет запрос при возникновении временной ошибки."""

        def __init__(
            self, max_attempts: int = 3, delay: timedelta = timedelta(seconds=1)
        ) -> None:
            """
            max_attempts: Сколько раз попытаться повторить запрос
            delay: Задержка между попытками запросов
            """
            self.max_attempts = max_attempts
            self.delay = delay

        async def run(self, func: Callable[..., Coroutine]) -> Any:
            attempts = 0
            while attempts < self.max_attempts:
                try:
                    return await func()
                except Exception as e:
                    attempts += 1
                    if attempts == self.max_attempts:
                        raise MaxRetriesExceededError(self.max_attempts) from e
                    await asyncio.sleep(self.delay.total_seconds())

    class Throttling(BaseStabilityPattern):
        """(Регулирование) - Ограничивает количество запросов к ресурсу для предотвращения его перегрузки.
        Защищает систему от шторма запросов."""

        def __init__(self, calls_per_second: int = 1_000):
            """
            calls_per_second: Сколько разрешено запросов в секунду
            """
            self.calls_per_second = calls_per_second
            self._last_called = 0
            self._interval = 1 / calls_per_second

        async def run(self, func: Callable[..., Coroutine]) -> Any:
            current_time = asyncio.get_event_loop().time()
            time_passed = current_time - self._last_called
            # Вычисляем, сколько времени нужно подождать
            if time_passed < self._interval:
                raise ThrottlingError("Превышен лимит запросов в секунду.")
            # Обновляем время последнего вызова
            self._last_called = asyncio.get_event_loop().time()
            # Вызываем переданную функцию
            return await func()

CircuitBreaker

Bases: BaseStabilityPattern

(Предохранитель) - Отслеживает количество ошибок при вызове внешнего сервиса. При превышении лимита временно блокирует вызов, предотвращая каскадные сбои.

Состояния Circuit Breaker:

  • Close - Идет передача запросов между сервисами и подсчет количества сбоев. Если число сбоев за заданный интервал времени превышает пороговое значение, выключатель переводится в состояние Open.

  • Open - Запросы от исходного сервиса немедленно возвращаются с ошибкой. По истечении заданного тайм-аута выключатель переводится в состояние Half-Open.

  • Half-open - Выключатель пропускает ограниченное количество запросов от исходного сервиса и подсчитывает число успешных запросов. Если необходимое количество достигнуто, выключатель переходит в состояние Closed, если нет — возвращается в статус Open.

Source code in fastapi_accelerator/integration/stability_patterns.py
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
class CircuitBreaker(BaseStabilityPattern):
    """(Предохранитель) - Отслеживает количество ошибок при вызове внешнего сервиса. При превышении лимита временно
    блокирует вызов, предотвращая каскадные сбои.


    Состояния Circuit Breaker:

    -   **Close** - Идет передача запросов между сервисами и подсчет количества сбоев.
        Если число сбоев за заданный интервал времени превышает пороговое значение,
        выключатель переводится в состояние Open.

    -   **Open** - Запросы от исходного сервиса немедленно возвращаются с ошибкой.
        По истечении заданного тайм-аута выключатель переводится в состояние Half-Open.

    -   **Half-open** - Выключатель пропускает ограниченное количество запросов от исходного сервиса и
        подсчитывает число успешных запросов. Если необходимое количество достигнуто, выключатель переходит
        в состояние Closed, если нет — возвращается в статус Open.
    """

    def __init__(self, fail_threshold: int = 3, reset_timeout: float = 10):
        """
        fail_threshold: Пороговое значения числа сбоев
        reset_timeout: Период сброса подсчета количества сбоев
        """
        self.fail_threshold = fail_threshold
        self.reset_timeout = reset_timeout
        self._failures = 0
        self._last_failure_time = None
        self._state = "CLOSED"

    async def run(self, func: Callable[..., Coroutine]) -> Any:
        if self._state == "OPEN":
            if (
                asyncio.get_event_loop().time() - self._last_failure_time
                > self.reset_timeout
            ):
                self._state = "HALF-OPEN"
            else:
                raise CircuitBreakerError("Circuit is OPEN")

        try:
            result = await func()
            if self._state == "HALF-OPEN":
                self._state = "CLOSED"
                self._failures = 0
            return result
        except Exception as e:
            self._failures += 1
            if self._failures >= self.fail_threshold:
                self._state = "OPEN"
                self._last_failure_time = asyncio.get_event_loop().time()
            raise e
__init__(fail_threshold=3, reset_timeout=10)

fail_threshold: Пороговое значения числа сбоев reset_timeout: Период сброса подсчета количества сбоев

Source code in fastapi_accelerator/integration/stability_patterns.py
157
158
159
160
161
162
163
164
165
166
def __init__(self, fail_threshold: int = 3, reset_timeout: float = 10):
    """
    fail_threshold: Пороговое значения числа сбоев
    reset_timeout: Период сброса подсчета количества сбоев
    """
    self.fail_threshold = fail_threshold
    self.reset_timeout = reset_timeout
    self._failures = 0
    self._last_failure_time = None
    self._state = "CLOSED"

Fallback

Bases: BaseStabilityPattern

(Резервный вариант) - Предоставляет альтернативный путь выполнения в случае сбоя основного. Позволяет системе деградировать контролируемо, а не падать с ошибкой.

Source code in fastapi_accelerator/integration/stability_patterns.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
class Fallback(BaseStabilityPattern):
    """(Резервный вариант) - Предоставляет альтернативный путь выполнения в случае сбоя основного.
    Позволяет системе деградировать контролируемо, а не падать с ошибкой."""

    def __init__(self, alternative_func: Coroutine) -> None:
        """
        alternative_func: Функция которая вызолится при возникновение исключения
        """
        self.alternative_func = alternative_func

    async def run(self, func: Callable[..., Coroutine]) -> Any:
        try:
            return await func()
        except Exception:
            return await self.alternative_func()
__init__(alternative_func)

alternative_func: Функция которая вызолится при возникновение исключения

Source code in fastapi_accelerator/integration/stability_patterns.py
110
111
112
113
114
def __init__(self, alternative_func: Coroutine) -> None:
    """
    alternative_func: Функция которая вызолится при возникновение исключения
    """
    self.alternative_func = alternative_func

RetryPattern

Bases: BaseStabilityPattern

(Паттерн повторения) - Автоматически повторяет запрос при возникновении временной ошибки.

Source code in fastapi_accelerator/integration/stability_patterns.py
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
class RetryPattern(BaseStabilityPattern):
    """(Паттерн повторения) - Автоматически повторяет запрос при возникновении временной ошибки."""

    def __init__(
        self, max_attempts: int = 3, delay: timedelta = timedelta(seconds=1)
    ) -> None:
        """
        max_attempts: Сколько раз попытаться повторить запрос
        delay: Задержка между попытками запросов
        """
        self.max_attempts = max_attempts
        self.delay = delay

    async def run(self, func: Callable[..., Coroutine]) -> Any:
        attempts = 0
        while attempts < self.max_attempts:
            try:
                return await func()
            except Exception as e:
                attempts += 1
                if attempts == self.max_attempts:
                    raise MaxRetriesExceededError(self.max_attempts) from e
                await asyncio.sleep(self.delay.total_seconds())
__init__(max_attempts=3, delay=timedelta(seconds=1))

max_attempts: Сколько раз попытаться повторить запрос delay: Задержка между попытками запросов

Source code in fastapi_accelerator/integration/stability_patterns.py
194
195
196
197
198
199
200
201
202
def __init__(
    self, max_attempts: int = 3, delay: timedelta = timedelta(seconds=1)
) -> None:
    """
    max_attempts: Сколько раз попытаться повторить запрос
    delay: Задержка между попытками запросов
    """
    self.max_attempts = max_attempts
    self.delay = delay

Throttling

Bases: BaseStabilityPattern

(Регулирование) - Ограничивает количество запросов к ресурсу для предотвращения его перегрузки. Защищает систему от шторма запросов.

Source code in fastapi_accelerator/integration/stability_patterns.py
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
class Throttling(BaseStabilityPattern):
    """(Регулирование) - Ограничивает количество запросов к ресурсу для предотвращения его перегрузки.
    Защищает систему от шторма запросов."""

    def __init__(self, calls_per_second: int = 1_000):
        """
        calls_per_second: Сколько разрешено запросов в секунду
        """
        self.calls_per_second = calls_per_second
        self._last_called = 0
        self._interval = 1 / calls_per_second

    async def run(self, func: Callable[..., Coroutine]) -> Any:
        current_time = asyncio.get_event_loop().time()
        time_passed = current_time - self._last_called
        # Вычисляем, сколько времени нужно подождать
        if time_passed < self._interval:
            raise ThrottlingError("Превышен лимит запросов в секунду.")
        # Обновляем время последнего вызова
        self._last_called = asyncio.get_event_loop().time()
        # Вызываем переданную функцию
        return await func()
__init__(calls_per_second=1000)

calls_per_second: Сколько разрешено запросов в секунду

Source code in fastapi_accelerator/integration/stability_patterns.py
219
220
221
222
223
224
225
def __init__(self, calls_per_second: int = 1_000):
    """
    calls_per_second: Сколько разрешено запросов в секунду
    """
    self.calls_per_second = calls_per_second
    self._last_called = 0
    self._interval = 1 / calls_per_second

Timeout

Bases: BaseStabilityPattern

(Тайм-аут) - Ограничивает время ожидания ответа от внешнего сервиса. Предотвращает блокировку ресурсов при зависании вызова.

Source code in fastapi_accelerator/integration/stability_patterns.py
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class Timeout(BaseStabilityPattern):
    """(Тайм-аут) - Ограничивает время ожидания ответа от внешнего сервиса. Предотвращает блокировку ресурсов при
    зависании вызова."""

    def __init__(self, seconds: int = 10) -> None:
        """
        seconds: Через сколько секунд прервать выполнение запроса
        """
        self.seconds = seconds

    async def run(self, func: Callable[..., Coroutine]) -> Any:
        try:
            return await asyncio.wait_for(func(), timeout=self.seconds)
        except asyncio.TimeoutError:
            raise StabilityTimeoutError("Function call timed out")
__init__(seconds=10)

seconds: Через сколько секунд прервать выполнение запроса

Source code in fastapi_accelerator/integration/stability_patterns.py
126
127
128
129
130
def __init__(self, seconds: int = 10) -> None:
    """
    seconds: Через сколько секунд прервать выполнение запроса
    """
    self.seconds = seconds

convert_response(rtypes, data)

Конвертирует входные данные в модель Pydantic или список моделей.

Эта функция пытается преобразовать входные данные в указанный тип или типы. Она поддерживает одиночные типы, Union типы, и списки типов.

Parameters:

Name Type Description Default
rtypes GenericAlias

Ожидаемый тип или типы ответа. Может быть одиночным типом или Union из нескольких типов.

required
data Union[dict, list]

Данные для конвертации. Могут быть словарем или списком.

required

Returns:

Type Description
Union[Any, BaseModel, List[BaseModel]]

Union[dict, list, BaseModel, List[BaseModel]]: Сконвертированные данные.

Union[Any, BaseModel, List[BaseModel]]

Если конвертация не удалась или не требовалась, возвращаются исходные данные.

Raises:

Type Description
ValidationError

Если все попытки конвертации завершились неудачно.

Source code in fastapi_accelerator/integration/base_integration.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def convert_response(
    rtypes: GenericAlias, data: Union[Any, dict, list]
) -> Union[Any, BaseModel, List[BaseModel]]:
    """
    Конвертирует входные данные в модель Pydantic или список моделей.

    Эта функция пытается преобразовать входные данные в указанный тип или типы.
    Она поддерживает одиночные типы, Union типы, и списки типов.

    Args:
        rtypes (GenericAlias):
            Ожидаемый тип или типы ответа. Может быть одиночным типом или Union из нескольких типов.
        data (Union[dict, list]):
            Данные для конвертации. Могут быть словарем или списком.

    Returns:
        Union[dict, list, BaseModel, List[BaseModel]]: Сконвертированные данные.
        Если конвертация не удалась или не требовалась, возвращаются исходные данные.

    Raises:
        ValidationError: Если все попытки конвертации завершились неудачно.
    """
    # Если rtypes - это Union, разворачиваем его в список типов.
    # В противном случае, создаем список из одного элемента.
    rtypes = rtypes.__args__ if isinstance(rtypes, UnionType) else [rtypes]

    # Определяем, сколько ошибок можно пропустить.
    # Это позволяет попробовать все типы из Union перед тем, как выбросить исключение.
    skip_error = len(rtypes) - 1

    for rtype in rtypes:
        # Проверяем, ожидается ли список элементов
        origin = get_origin(rtype)
        many = origin is list
        if many:
            # Если ожидается список, извлекаем тип элементов списка
            rtype = get_args(rtype)[0]

        # Проверяем, является ли ожидаемый тип Pydantic моделью
        if isinstance(rtype, type) and issubclass(rtype, BaseModel):
            try:
                if many:
                    # Если ожидается список моделей, применяем parse_obj к каждому элементу
                    return [rtype.model_validate(d) for d in data]
                else:
                    # Если ожидается одна модель, применяем parse_obj ко всем данным
                    return rtype.model_validate(data)
            except ValidationError as e:
                # Если возникла ошибка валидации, проверяем, можно ли её пропустить
                if skip_error:
                    # Если можно пропустить, уменьшаем счетчик и продолжаем цикл
                    skip_error -= 1
                else:
                    # Если пропустить нельзя, выбрасываем исключение
                    raise e
        # Есои не указана схема, то не пытаемся делать конвертацию типов
        else:
            break

    # Если ни одна конвертация не удалась или не требовалась, возвращаем исходные данные
    return data

wraper_endpoint(self_endpoint, func, url, version, httpmethod, *args, **kwargs) async

Выполнить метод с интеграцией

Примечание

Эта функция можно будет заменена во вемя тестирования, чтобы не делать реальные запросы. Для этого используйте фикстуру patch_integration

Source code in fastapi_accelerator/integration/http_integration.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
async def wraper_endpoint(
    self_endpoint: EndpointsDeclaration,
    func: Callable,
    url: ParseResult,
    version: str,
    httpmethod: HTTPMethod,
    *args,
    **kwargs,
) -> R:
    """Выполнить метод с интеграцией

    Примечание:
        Эта функция можно будет заменена во вемя тестирования,
        чтобы не делать реальные запросы.
        Для этого используйте фикстуру patch_integration
    """

    async with httpx.AsyncClient() as client:
        response = await func(
            ApiHTTP(self_endpoint.credentials, url, version, httpmethod.name, client),
            *args,
            **kwargs,
        )
    return response

fastapi_accelerator.testutils

Пакет с логикой тестовых фикстур, который упрощают создания и поддержание тестов в FastAPi проекте

ApiHTTP

Bases: NamedTuple

Stores API connection details and client.

Source code in fastapi_accelerator/integration/http_integration.py
31
32
33
34
35
36
37
38
class ApiHTTP(NamedTuple):
    """Stores API connection details and client."""

    credentials: dict | None  # Authentication credentials
    url: ParseResult  # Parsed URL of the API endpoint
    version: str  # API version
    httpmethod: str  # Use HTTP method
    client: httpx.AsyncClient | None  # HTTP client for making requests (None in tests)

BaseAuthJWT

Пример:

class AuthJWT(BaseAuthJWT):
    async def check_auth(username: str, password: str) -> bool:
        """Проверка введенного логина и пароля."""
        return username == "admin" and password == "admin"

    async def add_jwt_body(username: str) -> dict:
        """Функция для добавление дополнительных данных в JWT токен пользователя"""
        return {"version": username.title()}


# Подключить аутентификацию по JWT
AuthJWT.mount_auth(app)

Пример защиты API метода:

@app.get("/cheack_protected", summary="Проверить аутентификацию по JWT")
async def protected_route(jwt: dict = Depends(jwt_auth)):
    return {"message": "This is a protected route", "user": jwt}
Source code in fastapi_accelerator/auth_jwt.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
class BaseAuthJWT:
    '''
    Пример:

    ```python
    class AuthJWT(BaseAuthJWT):
        async def check_auth(username: str, password: str) -> bool:
            """Проверка введенного логина и пароля."""
            return username == "admin" and password == "admin"

        async def add_jwt_body(username: str) -> dict:
            """Функция для добавление дополнительных данных в JWT токен пользователя"""
            return {"version": username.title()}


    # Подключить аутентификацию по JWT
    AuthJWT.mount_auth(app)
    ```

    Пример защиты API метода:

    ```python
    @app.get("/cheack_protected", summary="Проверить аутентификацию по JWT")
    async def protected_route(jwt: dict = Depends(jwt_auth)):
        return {"message": "This is a protected route", "user": jwt}
    ```
    '''

    ALGORITHM = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES = 30
    # Установиться автоматически в mount_auth
    secret_key = None

    @abc.abstractmethod
    async def check_auth(username: str, password: str) -> bool:
        """Проверка введенного логина и пароля."""
        raise NotImplementedError()

    @abc.abstractmethod
    async def add_jwt_body(username: str) -> dict:
        """Функция для добавление дополнительных данных в JWT токен пользователя"""

    @classmethod
    def mount_auth(cls, app: FastAPI):
        """Подключение аутентификации по JWT"""
        # Установить класс для аутентификации
        app.state.auth_jwt = cls
        cls.secret_key = app.state.SECRET_KEY

        @app.post("/token", summary="Аутентификация по JWT", tags=["common"])
        async def login(
            user: Annotated[OAuth2PasswordRequestForm, Depends()],
        ) -> Token:
            if await cls.check_auth(user.username, user.password):
                return Token(
                    access_token=cls._create_access_token(
                        data={
                            "sub": user.username,
                            **await cls.add_jwt_body(user.username),
                        },
                    ),
                    token_type="bearer",
                )
            else:
                raise HTTPException(status_code=401, detail="Invalid credentials")

        @app.get(
            "/check_protected",
            summary="Проверить аутентификацию по JWT",
            tags=["common"],
        )
        async def protected_route(request: Request, jwt: dict = Depends(jwt_auth)):
            return {"message": "This is a protected route", "user": jwt}

        return login, protected_route

    @classmethod
    def _create_access_token(
        cls,
        data: dict,
        expires_delta: Union[timedelta, None] = None,
    ) -> str:
        """Создание JWT токена"""
        to_encode = data.copy()
        if expires_delta:
            expire = datetime.utcnow() + expires_delta
        else:
            expire = datetime.utcnow() + timedelta(
                minutes=cls.ACCESS_TOKEN_EXPIRE_MINUTES
            )
        to_encode.update({"exp": expire})
        encoded_jwt = jwt.encode(to_encode, cls.secret_key, algorithm=cls.ALGORITHM)
        return encoded_jwt

    @classmethod
    def _verify_token(cls, token: str) -> Optional[dict]:
        """Проверка валидности JWT токена"""
        try:
            payload = jwt.decode(token, cls.secret_key, algorithms=[cls.ALGORITHM])
            return payload
        except jwt.ExpiredSignatureError:
            return None
        except jwt.InvalidTokenError:
            return None

add_jwt_body(username) abstractmethod async

Функция для добавление дополнительных данных в JWT токен пользователя

Source code in fastapi_accelerator/auth_jwt.py
60
61
62
@abc.abstractmethod
async def add_jwt_body(username: str) -> dict:
    """Функция для добавление дополнительных данных в JWT токен пользователя"""

check_auth(username, password) abstractmethod async

Проверка введенного логина и пароля.

Source code in fastapi_accelerator/auth_jwt.py
55
56
57
58
@abc.abstractmethod
async def check_auth(username: str, password: str) -> bool:
    """Проверка введенного логина и пароля."""
    raise NotImplementedError()

mount_auth(app) classmethod

Подключение аутентификации по JWT

Source code in fastapi_accelerator/auth_jwt.py
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
@classmethod
def mount_auth(cls, app: FastAPI):
    """Подключение аутентификации по JWT"""
    # Установить класс для аутентификации
    app.state.auth_jwt = cls
    cls.secret_key = app.state.SECRET_KEY

    @app.post("/token", summary="Аутентификация по JWT", tags=["common"])
    async def login(
        user: Annotated[OAuth2PasswordRequestForm, Depends()],
    ) -> Token:
        if await cls.check_auth(user.username, user.password):
            return Token(
                access_token=cls._create_access_token(
                    data={
                        "sub": user.username,
                        **await cls.add_jwt_body(user.username),
                    },
                ),
                token_type="bearer",
            )
        else:
            raise HTTPException(status_code=401, detail="Invalid credentials")

    @app.get(
        "/check_protected",
        summary="Проверить аутентификацию по JWT",
        tags=["common"],
    )
    async def protected_route(request: Request, jwt: dict = Depends(jwt_auth)):
        return {"message": "This is a protected route", "user": jwt}

    return login, protected_route

BaseAuthJwtPytest

Bases: BasePytest

Базовый класс для тестов с использованием pytest, который выполняет логику аутентификации для клиента по JWT

Source code in fastapi_accelerator/testutils/utils.py
170
171
172
173
174
175
176
177
178
179
180
181
182
class BaseAuthJwtPytest(BasePytest):
    """
    Базовый класс для тестов с использованием pytest,
    который выполняет логику аутентификации для клиента по JWT
    """

    @pytest.fixture(autouse=True)
    def setup_method(self, client):
        @client_auth_jwt()
        def inner(self, client):
            return super().setup_method()

        return inner(self, client=client)

BasePytest

Базовый класс для тестов с использованием pytest.

Данный класс предоставляет методы для настройки и очистки состояния тестов. Наследуйте этот класс и называйте дочерний класс в формате:

class TestИмяКласса(BasePytest):

    def setUp(self):
        ...

    def test_метод_1(self):
        ...
Source code in fastapi_accelerator/testutils/utils.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
class BasePytest:
    """
    Базовый класс для тестов с использованием pytest.

    Данный класс предоставляет методы для настройки и очистки состояния тестов.
    Наследуйте этот класс и называйте дочерний класс в формате:

    ```python
    class TestИмяКласса(BasePytest):

        def setUp(self):
            ...

        def test_метод_1(self):
            ...
    ```
    """

    # Данные для входа тестового пользователя
    TEST_USER = {"username": "test", "password": "qwerty"}

    def setup_method(self, method=None):
        """
        Вызывается перед выполнением каждого тестового метода.

        Этот метод вызывает setUp(), который может быть переопределен в дочернем классе
        для выполнения необходимой настройки перед тестами.

        :param method: Метод, который будет выполняться (тест).
        """
        self.setUp()

    def setUp(self):
        """
        Метод для выполнения необходимой настройки перед каждым тестом.

        Этот метод может быть переопределен в дочернем классе для выполнения специфической
        настройки, необходимой для тестов.
        """

    def teardown_method(self, method=None):
        """
        Вызывается после выполнения каждого тестового метода.

        Этот метод вызывает tearDown(), который может быть переопределен в дочернем классе
        для выполнения необходимой очистки после тестов.

        :param method: Метод, который был выполнен (тест).
        """
        self.tearDown()

    def tearDown(self):
        """
        Метод для выполнения необходимой очистки после каждого теста.

        Этот метод может быть переопределен в дочернем классе для выполнения специфической
        очистки, необходимой после тестов.
        """

    @classmethod
    def setup_class(cls):
        """
        Вызывается перед выполнением всех тестовых методов в классе.

        Этот метод вызывает setUpClass(), который может быть переопределен в дочернем классе
        для выполнения необходимой настройки перед всеми тестами в классе.
        """
        cls.setUpClass()

    @classmethod
    def setUpClass(cls):
        """
        Метод для выполнения необходимой настройки перед всеми тестами в классе.

        Этот метод может быть переопределен в дочернем классе для выполнения специфической
        настройки, необходимой для всех тестов в классе.
        """

    @classmethod
    def teardown_class(cls):
        """
        Вызывается после выполнения всех тестовых методов в классе.

        Этот метод может быть переопределен в дочернем классе для выполнения необходимой
        очистки после всех тестов в классе.
        """
        cls.tearDownClass()

    @classmethod
    def tearDownClass(cls):
        """
        Метод для выполнения необходимой очистки после всех тестов в классе.

        Этот метод может быть переопределен в дочернем классе для выполнения специфической
        очистки, необходимой после всех тестов в классе.
        """

setUp()

Метод для выполнения необходимой настройки перед каждым тестом.

Этот метод может быть переопределен в дочернем классе для выполнения специфической настройки, необходимой для тестов.

Source code in fastapi_accelerator/testutils/utils.py
104
105
106
107
108
109
110
def setUp(self):
    """
    Метод для выполнения необходимой настройки перед каждым тестом.

    Этот метод может быть переопределен в дочернем классе для выполнения специфической
    настройки, необходимой для тестов.
    """

setUpClass() classmethod

Метод для выполнения необходимой настройки перед всеми тестами в классе.

Этот метод может быть переопределен в дочернем классе для выполнения специфической настройки, необходимой для всех тестов в классе.

Source code in fastapi_accelerator/testutils/utils.py
141
142
143
144
145
146
147
148
@classmethod
def setUpClass(cls):
    """
    Метод для выполнения необходимой настройки перед всеми тестами в классе.

    Этот метод может быть переопределен в дочернем классе для выполнения специфической
    настройки, необходимой для всех тестов в классе.
    """

setup_class() classmethod

Вызывается перед выполнением всех тестовых методов в классе.

Этот метод вызывает setUpClass(), который может быть переопределен в дочернем классе для выполнения необходимой настройки перед всеми тестами в классе.

Source code in fastapi_accelerator/testutils/utils.py
131
132
133
134
135
136
137
138
139
@classmethod
def setup_class(cls):
    """
    Вызывается перед выполнением всех тестовых методов в классе.

    Этот метод вызывает setUpClass(), который может быть переопределен в дочернем классе
    для выполнения необходимой настройки перед всеми тестами в классе.
    """
    cls.setUpClass()

setup_method(method=None)

Вызывается перед выполнением каждого тестового метода.

Этот метод вызывает setUp(), который может быть переопределен в дочернем классе для выполнения необходимой настройки перед тестами.

:param method: Метод, который будет выполняться (тест).

Source code in fastapi_accelerator/testutils/utils.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
def setup_method(self, method=None):
    """
    Вызывается перед выполнением каждого тестового метода.

    Этот метод вызывает setUp(), который может быть переопределен в дочернем классе
    для выполнения необходимой настройки перед тестами.

    :param method: Метод, который будет выполняться (тест).
    """
    self.setUp()

tearDown()

Метод для выполнения необходимой очистки после каждого теста.

Этот метод может быть переопределен в дочернем классе для выполнения специфической очистки, необходимой после тестов.

Source code in fastapi_accelerator/testutils/utils.py
123
124
125
126
127
128
129
def tearDown(self):
    """
    Метод для выполнения необходимой очистки после каждого теста.

    Этот метод может быть переопределен в дочернем классе для выполнения специфической
    очистки, необходимой после тестов.
    """

tearDownClass() classmethod

Метод для выполнения необходимой очистки после всех тестов в классе.

Этот метод может быть переопределен в дочернем классе для выполнения специфической очистки, необходимой после всех тестов в классе.

Source code in fastapi_accelerator/testutils/utils.py
160
161
162
163
164
165
166
167
@classmethod
def tearDownClass(cls):
    """
    Метод для выполнения необходимой очистки после всех тестов в классе.

    Этот метод может быть переопределен в дочернем классе для выполнения специфической
    очистки, необходимой после всех тестов в классе.
    """

teardown_class() classmethod

Вызывается после выполнения всех тестовых методов в классе.

Этот метод может быть переопределен в дочернем классе для выполнения необходимой очистки после всех тестов в классе.

Source code in fastapi_accelerator/testutils/utils.py
150
151
152
153
154
155
156
157
158
@classmethod
def teardown_class(cls):
    """
    Вызывается после выполнения всех тестовых методов в классе.

    Этот метод может быть переопределен в дочернем классе для выполнения необходимой
    очистки после всех тестов в классе.
    """
    cls.tearDownClass()

teardown_method(method=None)

Вызывается после выполнения каждого тестового метода.

Этот метод вызывает tearDown(), который может быть переопределен в дочернем классе для выполнения необходимой очистки после тестов.

:param method: Метод, который был выполнен (тест).

Source code in fastapi_accelerator/testutils/utils.py
112
113
114
115
116
117
118
119
120
121
def teardown_method(self, method=None):
    """
    Вызывается после выполнения каждого тестового метода.

    Этот метод вызывает tearDown(), который может быть переопределен в дочернем классе
    для выполнения необходимой очистки после тестов.

    :param method: Метод, который был выполнен (тест).
    """
    self.tearDown()

EndpointsDeclaration

Базовый класс для объявления интеграций с внешними API.

Этот класс предоставляет платформу для определения и управления интеграциями API, включая обработку аутентификации и настройку базового URL.

Source code in fastapi_accelerator/integration/http_integration.py
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
class EndpointsDeclaration:
    """
    Базовый класс для объявления интеграций с внешними API.

    Этот класс предоставляет платформу для определения и управления интеграциями API,
    включая обработку аутентификации и настройку базового URL.
    """

    integration: IntegrationHTTP | None = None

    def __init__(self, base_url: URL = "", credentials: dict | None = None):
        """
        Аргументы:
            base_url (URL): Базовый URL для API. Это может быть доменное имя или адрес в формате host:port.
            credentials (dict | None): Доступные учетные данные для аутентификации в внешней системе.

        Примечание:
            Рекомендуется задавать base_url через переменные окружения, чтобы избежать проблем с конфигурацией
            на разных уровнях (например, на production-среде) из-за межсетевых экранов или других факторов.
        """
        self.base_url = base_url
        self.credentials = credentials

    class Schema:
        """Схема Pydantic для успешных ответов.

        Эта схема описывает структуру данных, которые ожидаются в успешных ответах от REST API.
        """

    class SchemaError:
        """Схема Pydantic для неуспешных ответов.

        Эта схема описывает структуру данных, которые могут быть возвращены
        в случае ошибки при взаимодействии с REST API.
        """

Schema

Схема Pydantic для успешных ответов.

Эта схема описывает структуру данных, которые ожидаются в успешных ответах от REST API.

Source code in fastapi_accelerator/integration/http_integration.py
194
195
196
197
198
class Schema:
    """Схема Pydantic для успешных ответов.

    Эта схема описывает структуру данных, которые ожидаются в успешных ответах от REST API.
    """

SchemaError

Схема Pydantic для неуспешных ответов.

Эта схема описывает структуру данных, которые могут быть возвращены в случае ошибки при взаимодействии с REST API.

Source code in fastapi_accelerator/integration/http_integration.py
200
201
202
203
204
205
class SchemaError:
    """Схема Pydantic для неуспешных ответов.

    Эта схема описывает структуру данных, которые могут быть возвращены
    в случае ошибки при взаимодействии с REST API.
    """

__init__(base_url='', credentials=None)

Аргументы

base_url (URL): Базовый URL для API. Это может быть доменное имя или адрес в формате host:port. credentials (dict | None): Доступные учетные данные для аутентификации в внешней системе.

Примечание

Рекомендуется задавать base_url через переменные окружения, чтобы избежать проблем с конфигурацией на разных уровнях (например, на production-среде) из-за межсетевых экранов или других факторов.

Source code in fastapi_accelerator/integration/http_integration.py
181
182
183
184
185
186
187
188
189
190
191
192
def __init__(self, base_url: URL = "", credentials: dict | None = None):
    """
    Аргументы:
        base_url (URL): Базовый URL для API. Это может быть доменное имя или адрес в формате host:port.
        credentials (dict | None): Доступные учетные данные для аутентификации в внешней системе.

    Примечание:
        Рекомендуется задавать base_url через переменные окружения, чтобы избежать проблем с конфигурацией
        на разных уровнях (например, на production-среде) из-за межсетевых экранов или других факторов.
    """
    self.base_url = base_url
    self.credentials = credentials

HTTPMethod

Bases: str, Enum

Доступные HTTP методы

Source code in fastapi_accelerator/integration/http_integration.py
21
22
23
24
25
26
27
28
class HTTPMethod(str, Enum):
    """Доступные HTTP методы"""

    get = "GET"
    post = "POST"
    put = "PUT"
    patch = "PATCH"
    delete = "DELETE"

MockRules

Source code in fastapi_accelerator/testutils/fixture_integration.py
15
16
17
18
19
20
21
22
23
24
25
class MockRules:
    def __init__(self, mock_rules: dict[Callable, Callable]) -> None:
        """

        Аргументы:
            mock_rules (dict[Callable, Callable]): Правила подмены методов интеграции на mock.
            Если в коде вызывается интеграция, которая не указана в mock_rules, возникает исключение.
            Это предотвращает случайные реальные запросы, если вы забыли указать mock.

        """
        self._rules = mock_rules

__init__(mock_rules)

Аргументы

mock_rules (dict[Callable, Callable]): Правила подмены методов интеграции на mock. Если в коде вызывается интеграция, которая не указана в mock_rules, возникает исключение. Это предотвращает случайные реальные запросы, если вы забыли указать mock.

Source code in fastapi_accelerator/testutils/fixture_integration.py
16
17
18
19
20
21
22
23
24
25
def __init__(self, mock_rules: dict[Callable, Callable]) -> None:
    """

    Аргументы:
        mock_rules (dict[Callable, Callable]): Правила подмены методов интеграции на mock.
        Если в коде вызывается интеграция, которая не указана в mock_rules, возникает исключение.
        Это предотвращает случайные реальные запросы, если вы забыли указать mock.

    """
    self._rules = mock_rules

SQLQueryTracker

Хранит трекер запросов в РСУБД

Source code in fastapi_accelerator/testutils/fixture_db/trace_sql.py
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
class SQLQueryTracker:
    """Хранит трекер запросов в РСУБД"""

    def __init__(self):
        self.queries: list[NamedTuple] = []

    def add(self, statement: str, params: tuple, executemany: bool):
        """Добавить SQL команду в трекер"""
        self.queries.append(TrackerNameTuple(statement, params, executemany))

    @property
    def count(self) -> int:
        return len(self.queries)

    def __str__(self) -> str:
        return f"{self.queries}"

add(statement, params, executemany)

Добавить SQL команду в трекер

Source code in fastapi_accelerator/testutils/fixture_db/trace_sql.py
27
28
29
def add(self, statement: str, params: tuple, executemany: bool):
    """Добавить SQL команду в трекер"""
    self.queries.append(TrackerNameTuple(statement, params, executemany))

SettingTest

Настройки для тестов

Пример app/conftest.py:

from app.core.config import TEST_DATABASE_URL
from fastapi_accelerator.db.dbsession import MainDatabaseManager

# Вы можете указать точный список импорта, это для простоты мы импортируем все
from fastapi_accelerator.testutils import *  # noqa E402
from fastapi_accelerator.testutils import SettingTest

# Нужно создать менеджер БД до импорта APP
# чтобы паттерн одиночка создал только тестовое instance
# а в приложение уже взялся тестовый instance
TestDatabaseManager = MainDatabaseManager(
    TEST_DATABASE_URL, echo=False, DEV_STATUS=True
)

from main import app  # noqa E402

# Отключить кеширование во время тестов
app.state.CACHE_STATUS = False

SettingTest(TestDatabaseManager, app, alembic_migrate=True, keepdb=True)
Source code in fastapi_accelerator/testutils/fixture_base.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
class SettingTest(metaclass=SingletonMeta):
    """Настройки для тестов

    Пример `app/conftest.py`:

    ```python
    from app.core.config import TEST_DATABASE_URL
    from fastapi_accelerator.db.dbsession import MainDatabaseManager

    # Вы можете указать точный список импорта, это для простоты мы импортируем все
    from fastapi_accelerator.testutils import *  # noqa E402
    from fastapi_accelerator.testutils import SettingTest

    # Нужно создать менеджер БД до импорта APP
    # чтобы паттерн одиночка создал только тестовое instance
    # а в приложение уже взялся тестовый instance
    TestDatabaseManager = MainDatabaseManager(
        TEST_DATABASE_URL, echo=False, DEV_STATUS=True
    )

    from main import app  # noqa E402

    # Отключить кеширование во время тестов
    app.state.CACHE_STATUS = False

    SettingTest(TestDatabaseManager, app, alembic_migrate=True, keepdb=True)
    ```

    """

    def __init__(
        self,
        DatabaseManager: MainDatabaseManager,
        app: FastAPI,
        alembic_migrate: bool = False,
        keepdb: bool = True,
    ) -> Self:
        """
        alembic_migrate:
            Использовать ли для создания таблиц, миграции alembic, или сразу создавать конечный вариант таблиц
            лучше использовать alembic миграции, чтобы сразу проверять и их  вов ремя тестирования
            - Если True -> использовать alembic миграции
            - Если False -> использовать create_all()

        keepdb:
            - Если True -> не удалять тестовую БД после тестов
            - Если False -> удалить тестовую БД после тестов
        """

        # Проверить что в имени БД есть подстрока test
        if (
            DatabaseManager.database_url.split("/")[-1].find("test") == -1
            or DatabaseManager.adatabase_url.split("/")[-1].find("test") == -1
        ):
            raise ValueError("Имя БД должно иметь подстроку test")

        self.DatabaseManager = DatabaseManager
        self.app = app
        self.alembic_migrate = alembic_migrate
        self.keepdb = keepdb

__init__(DatabaseManager, app, alembic_migrate=False, keepdb=True)

alembic_migrate

Использовать ли для создания таблиц, миграции alembic, или сразу создавать конечный вариант таблиц лучше использовать alembic миграции, чтобы сразу проверять и их вов ремя тестирования - Если True -> использовать alembic миграции - Если False -> использовать create_all()

keepdb
  • Если True -> не удалять тестовую БД после тестов
  • Если False -> удалить тестовую БД после тестов
Source code in fastapi_accelerator/testutils/fixture_base.py
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def __init__(
    self,
    DatabaseManager: MainDatabaseManager,
    app: FastAPI,
    alembic_migrate: bool = False,
    keepdb: bool = True,
) -> Self:
    """
    alembic_migrate:
        Использовать ли для создания таблиц, миграции alembic, или сразу создавать конечный вариант таблиц
        лучше использовать alembic миграции, чтобы сразу проверять и их  вов ремя тестирования
        - Если True -> использовать alembic миграции
        - Если False -> использовать create_all()

    keepdb:
        - Если True -> не удалять тестовую БД после тестов
        - Если False -> удалить тестовую БД после тестов
    """

    # Проверить что в имени БД есть подстрока test
    if (
        DatabaseManager.database_url.split("/")[-1].find("test") == -1
        or DatabaseManager.adatabase_url.split("/")[-1].find("test") == -1
    ):
        raise ValueError("Имя БД должно иметь подстроку test")

    self.DatabaseManager = DatabaseManager
    self.app = app
    self.alembic_migrate = alembic_migrate
    self.keepdb = keepdb

SingletonMeta

Bases: type

Мета класс для реализации паттерна Одиночка

Source code in fastapi_accelerator/utils.py
 6
 7
 8
 9
10
11
12
13
14
class SingletonMeta(type):
    """Мета класс для реализации паттерна Одиночка"""

    instance = None

    def __call__(cls, *args, **kwargs):
        if cls.instance is None:
            cls.instance = super(SingletonMeta, cls).__call__(*args, **kwargs)
        return cls.instance

TrackerNameTuple

Bases: NamedTuple

Хранит SQL команду

Source code in fastapi_accelerator/testutils/fixture_db/trace_sql.py
11
12
13
14
15
16
17
18
class TrackerNameTuple(NamedTuple):
    """Хранит SQL команду"""

    # Текст SQL
    statement: str
    # Параметры в SQL
    params: list | dict
    executemany: bool

apply_fixture_db(export_func, flush=False)

Декоратор, который добавляет фикстуры в БД перед тестом и удаляет их после теста.

:param export_func: Функция, возвращающая объекты для добавления в БД. :param flush: Удалить данные после выполнения теста, если установлено в True. Не нужно указывать если вы используете фикстуры client, так как он уже выполняет отчистку всех данных в common_setup_table

Пример:

def export_fixture_task() -> NamedTuple: return dict_to_namedtuple( file1=FileDb( uid=uuid.UUID("469d4176-98f3-48a2-8794-0e2472bc2b7e"), filename="file1.txt", size=100, format="text/plain", extension=".txt", ) )

@apply_fixture_db(export_fixture_task) def test_base(client: TestClient, fixtures: NamedTuple): response = client.get('url') assert response.status_code == 200 assert response.json() == {"uid": fixtures.file1.uid}

Source code in fastapi_accelerator/testutils/fixture_db/apply_fixture.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def apply_fixture_db(  # noqa C901
    export_func: Callable[[], NamedTuple], flush: bool = False
):
    """Декоратор, который добавляет фикстуры в БД перед тестом и удаляет их после теста.

    :param export_func: Функция, возвращающая объекты для добавления в БД.
    :param flush: Удалить данные после выполнения теста, если установлено в True.
        Не нужно указывать если вы используете фикстуры `client`,
        так как он уже выполняет отчистку всех данных в `common_setup_table`

    Пример:

    def export_fixture_task() -> NamedTuple:
        return dict_to_namedtuple(
            file1=FileDb(
                uid=uuid.UUID("469d4176-98f3-48a2-8794-0e2472bc2b7e"),
                filename="file1.txt",
                size=100,
                format="text/plain",
                extension=".txt",
            )
        )

    @apply_fixture_db(export_fixture_task)
    def test_base(client: TestClient, fixtures: NamedTuple):
        response = client.get('url')
        assert response.status_code == 200
        assert response.json() == {"uid": fixtures.file1.uid}
    """

    def up(fixtures: NamedTuple) -> NamedTuple:
        # Накатить фикстуры
        with SettingTest.instance.DatabaseManager.session() as session:
            session: Session
            try:
                session.add_all(fixtures)
                session.commit()
                for item in fixtures:
                    session.refresh(item)
            except IntegrityError as e:
                session.rollback()
                raise e

        return fixtures

    def down(fixtures: NamedTuple):
        if flush:
            # Удалить фикстуры
            with SettingTest.instance.DatabaseManager.session() as session:
                session: Session
                try:
                    for item in fixtures:
                        session.delete(item)
                    session.commit()
                except IntegrityError as e:
                    session.rollback()
                    raise e

    def decor(func):
        @wraps(func)
        def wrap(*args, **kwargs):
            fixtures = export_func()
            try:
                up(fixtures)
                # Если ожидается аргумент fixtures
                if "fixtures" in inspect.getfullargspec(func).args:
                    # то подменяем его
                    kwargs["fixtures"] = fixtures
                return func(*args, **kwargs)
            finally:
                down(fixtures)

        return wrap

    return decor

check_response_json(response, exp_status_code, exp_json, exclude_list=None)

Проверка json API ответа

Пример:

def test_get_item(self, client: TestClient): response = client.get(self.url) check_response_json(response, 200, {...})

Source code in fastapi_accelerator/testutils/utils.py
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def check_response_json(
    response: Response,
    exp_status_code: int,
    exp_json: Any,
    exclude_list: Optional[list[str]] = None,
) -> bool:
    """Проверка json API ответа

    Пример:

    def test_get_item(self, client: TestClient):
        response = client.get(self.url)
        check_response_json(response, 200, {...})
    """
    assert response.status_code == exp_status_code
    response_json = response.json()
    if exclude_list:
        rm_key_from_deep_dict(response_json, exclude_list)
    assert response_json == exp_json
    return True

client(common_client, common_clean_table)

Вернуть тестовый клиент FastAPI.

Source code in fastapi_accelerator/testutils/fixture_db/db.py
86
87
88
89
90
91
@pytest.fixture(scope="function")
def client(
    common_client: TestClient, common_clean_table
) -> Generator[TestClient, Any, None]:
    """Вернуть тестовый клиент FastAPI."""
    yield common_client

client_auth_jwt(username=None)

Декоратор который аутентифицирует тестового клиента по JWT.

Использование в функции:

@client_auth_jwt(username="test"}) def test_get_list(client: TestClient): response = client.get(self.url) assert response.status_code == 200 assert response.json() == []

Использование в классе:

class TestTaskExecution(BasePytest):

@client_auth_jwt() # Данные для входа возьмутся из self.TEST_USER
def test_get_list(self, client: TestClient):
    response = client.get(self.url)
    assert response.status_code == 200
    assert response.json() == []
Source code in fastapi_accelerator/testutils/fixture_auth.py
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def client_auth_jwt(username: str = None):
    """Декоратор который аутентифицирует тестового клиента по JWT.

    Использование в функции:

    @client_auth_jwt(username="test"})
    def test_get_list(client: TestClient):
        response = client.get(self.url)
        assert response.status_code == 200
        assert response.json() == []

    Использование в классе:

    class TestTaskExecution(BasePytest):

        @client_auth_jwt() # Данные для входа возьмутся из self.TEST_USER
        def test_get_list(self, client: TestClient):
            response = client.get(self.url)
            assert response.status_code == 200
            assert response.json() == []

    """
    auth_jwt: BaseAuthJWT | None = SettingTest.instance.app.state.auth_jwt

    if not auth_jwt:
        raise ValueError("No found state - auth_jwt.")

    def decor(func):

        @wraps(func)
        def wrap(*args, **kwargs):
            # username можно передать в аргументе,
            # иначе он возьмется из self.TEST_USER класса
            # в котором обвялен тестовый метод
            current_username = username or args[0].TEST_USER["username"]
            access_token: str = auth_jwt._create_access_token(
                data={
                    "sub": current_username,
                    **run_async(auth_jwt.add_jwt_body(current_username)),
                },
            )
            kwargs["client"].headers["authorization"] = f"Bearer {access_token}"
            return func(*args, **kwargs)

        return wrap

    return decor

common_clean_table(common_setup_database)

Отчистка данных в таблицах, выполняется после каждого теста.

Source code in fastapi_accelerator/testutils/fixture_db/db.py
74
75
76
77
78
79
80
81
82
83
@pytest.fixture(scope="function")
def common_clean_table(common_setup_database) -> Generator:
    """Отчистка данных в таблицах, выполняется после каждого теста."""
    try:
        yield
    finally:
        # Отчистить данные в таблицах
        SettingTest.instance.DatabaseManager.clear_all(["alembic_version"])
        # Синхронный вызов асинхронного метода dispose()
        run_async(SettingTest.instance.DatabaseManager.dispose())

common_client()

Создает тестовый клиент FastAPI.

Source code in fastapi_accelerator/testutils/fixture_db/db.py
66
67
68
69
70
71
@pytest.fixture(scope="session")
def common_client() -> Generator[TestClient, None, None]:
    """Создает тестовый клиент FastAPI."""

    with TestClient(SettingTest.instance.app) as test_client:
        yield test_client

common_setup_database(engine)

Создает и настраивает тестовую базу данных один раз, на протяжение всех тестов.

Source code in fastapi_accelerator/testutils/fixture_db/db.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
@pytest.fixture(scope="session")
def common_setup_database(engine) -> Generator[None, None, None]:
    """Создает и настраивает тестовую базу данных один раз, на протяжение всех тестов."""
    # Создать БД
    if not database_exists(engine.url):
        create_database(engine.url)

    if SettingTest.instance.alembic_migrate:
        # Использовать alembic для создания таблиц через миграции
        from alembic import command
        from alembic.config import Config

        alembic_cfg = Config("alembic.ini")
        alembic_cfg.set_main_option(
            "sqlalchemy.url", SettingTest.instance.DatabaseManager.database_url
        )
        command.upgrade(alembic_cfg, "head")
    else:
        # Создать сразу конечный вариант таблиц
        SettingTest.instance.DatabaseManager.create_all()
    try:
        yield
    finally:
        if not SettingTest.instance.keepdb:
            # Удалить таблицы после тестов
            SettingTest.instance.DatabaseManager.drop_all()

db_session()

Получить сессию к тестовой БД

Source code in fastapi_accelerator/testutils/fixture_db/db.py
94
95
96
97
98
@pytest.fixture(scope="function")
def db_session() -> Generator[Session, Any, None]:
    """Получить сессию к тестовой БД"""
    for session in SettingTest.instance.DatabaseManager.get_session():
        yield session

fixtures()

Пустая фикстура в которую будет вставлены значения из декоратора apply_fixture_db

Source code in fastapi_accelerator/testutils/fixture_db/apply_fixture.py
14
15
16
17
@pytest.fixture(scope="session")
def fixtures():
    """Пустая фикстура в которую будет вставлены значения из декоратора apply_fixture_db"""
    yield NotImplementedError()

patch_integration(mock_rules)

Декоратор для подмены методов интеграции на mock.

Примечание

Вы можете хранить mock_rules в отдельных переменных и переиспользовать их для разных функций/методов тестирования.

Source code in fastapi_accelerator/testutils/fixture_integration.py
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
def patch_integration(mock_rules: MockRules):
    """Декоратор для подмены методов интеграции на mock.

    Примечание:
        Вы можете хранить mock_rules в отдельных переменных и
        переиспользовать их для разных функций/методов тестирования.
    """

    def decor(func):
        @wraps(func)
        def wrap(*args, **kwargs):
            with patch(
                "fastapi_accelerator.integration.http_integration.wraper_endpoint"
            ) as wraper_endpoint:
                # Подмена методов интеграций на mock
                i = _IntegrationAsyncMock(wraper_endpoint)
                for real_func, mock_func in mock_rules._rules.items():
                    i.overwrite_method(real_func, mock_func)

                return func(*args, **kwargs)

        return wrap

    return decor

rm_key_from_deep_dict(data, keys)

Удалить ключи из dict или из списка словарей рекурсивно.

data: Данные для отчистки keys: Ключи которые нужно удалить из данных

Пример:

rm_key_from_deep_dict({"date": "...", "user": "..."}, ["data"])

{"user": "..."}

Source code in fastapi_accelerator/testutils/utils.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def rm_key_from_deep_dict(data: dict | list, keys: list[str]):
    """Удалить ключи из dict или из списка словарей рекурсивно.

    data: Данные для отчистки
    keys: Ключи которые нужно удалить из данных

    Пример:

    rm_key_from_deep_dict({"date": "...", "user": "..."}, ["data"])
    >>> {"user": "..."}
    """
    if isinstance(data, dict):
        # Удаляем ключи из словаря
        for key in keys:
            data.pop(key, None)  # Используем pop с None, чтобы избежать KeyError
        # Рекурсивно обрабатываем значения в словаре
        for value in data.values():
            rm_key_from_deep_dict(value, keys)
    elif isinstance(data, list):
        # Рекурсивно обрабатываем каждый элемент списка
        for item in data:
            rm_key_from_deep_dict(item, keys)

    return data

run_async(async_function)

Синхронная обертка для асинхронного вызова

Source code in fastapi_accelerator/utils.py
56
57
58
def run_async(async_function):
    """Синхронная обертка для асинхронного вызова"""
    return asyncio.get_event_loop().run_until_complete(async_function)

track_queries(db_manager, expected_count=None)

Перехват SQL команд, для их анализа

expected_count: Сколько ожидается выполниться SQL команд

Пример использования:

def test_get_list(self, client: TestClient, db_manager: MainDatabaseManager): with track_queries(db_manager, expected_count=2) as tracker: response = client.get(self.url)

print(tracker.queries)
Source code in fastapi_accelerator/testutils/fixture_db/trace_sql.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
@contextmanager
def track_queries(
    db_manager: MainDatabaseManager, expected_count: int = None
) -> Generator[SQLQueryTracker, None, None]:
    """
    Перехват SQL команд, для их анализа

    expected_count: Сколько ожидается выполниться SQL команд


    Пример использования:

    def test_get_list(self, client: TestClient, db_manager: MainDatabaseManager):
        with track_queries(db_manager, expected_count=2) as tracker:
            response = client.get(self.url)

        print(tracker.queries)
    """

    tracker = SQLQueryTracker()

    def _before_cursor_execute(
        conn, cursor, statement: str, params: tuple, context, executemany: bool
    ):
        tracker.add(statement, params, executemany)

    # Отлеживаем события как в синхронном так и в асинхронном engine
    event.listen(
        db_manager.aengine.sync_engine, "before_cursor_execute", _before_cursor_execute
    )
    event.listen(db_manager.engine, "before_cursor_execute", _before_cursor_execute)
    try:
        # Трекер который можно анализировать в тестах
        yield tracker
    finally:
        # Отключить отслеживание
        event.remove(
            db_manager.aengine.sync_engine,
            "before_cursor_execute",
            _before_cursor_execute,
        )
        event.remove(db_manager.engine, "before_cursor_execute", _before_cursor_execute)

        if expected_count:
            # Проверить количество sql команд
            if tracker.count != expected_count:
                raise ValueError(
                    f"{tracker.queries}\n\n{tracker.count} != {expected_count}"
                )

url_path_for()

Функция чтобы получить полный URL путь по названию функции

Пример:

def test_base(client: TestClient, url_path_for: Callable):
    response = client.get(url_path_for("ИмяФункции"))
Source code in fastapi_accelerator/testutils/utils.py
58
59
60
61
62
63
64
65
66
67
68
69
@pytest.fixture(scope="function")
def url_path_for() -> Generator[Callable[[str], str], None, None]:
    """Функция чтобы получить полный URL путь по названию функции

    Пример:

    ```python
    def test_base(client: TestClient, url_path_for: Callable):
        response = client.get(url_path_for("ИмяФункции"))
    ```
    """
    yield lambda name_url: SettingTest.instance.app.url_path_for(name_url)